From 4f7ebbe8c3abb2bc846b7cac535fc4ebe737ec06 Mon Sep 17 00:00:00 2001 From: wendtalexander Date: Tue, 22 Oct 2024 10:10:09 +0200 Subject: [PATCH] [dataio/nix] adding mutex, writing data_array, and the last chunk --- pyrelacs/dataio/nix_writer.py | 49 ++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/pyrelacs/dataio/nix_writer.py b/pyrelacs/dataio/nix_writer.py index 6b437f0..decc2c7 100644 --- a/pyrelacs/dataio/nix_writer.py +++ b/pyrelacs/dataio/nix_writer.py @@ -1,5 +1,6 @@ import time from IPython import embed +from PyQt6.QtCore import QMutex import nixio from pyrelacs.dataio.circbuffer import CircBuffer @@ -12,29 +13,53 @@ class NixWriter: def __init__(self, buffer: CircBuffer) -> None: self.buffer = buffer - def write_nix(self, *args, **kwargs): - self._write_header() - items = 0 - chunk = 1000 + def write_nix( + self, + data_array: nixio.DataArray, + mutex: QMutex, + channel: int = 0, + chunk_size=1000, + *args, + **kwargs, + ): + index = 0 log.debug("Starting the writing") self.write = True while self.write: - total_count = self.buffer.totalcount() - if total_count - items >= chunk: - # log.debug(items) + total_count = self.buffer.totalcount(channel=channel) + if total_count - index >= chunk_size: + mutex.lock() + log.debug(index) try: - data, _ = self.buffer.read(items, extend=chunk) - self.data_array.append(data) + _, data = self.buffer.read( + index, extend=chunk_size, channel=channel + ) + if index == 0: + data_array.write_direct(data) + else: + data_array.append(data) + index += chunk_size except IndexError as e: time.sleep(0.001) log.debug(f"{e}") - items += chunk + mutex.unlock() else: time.sleep(0.001) continue + total_count = self.buffer.totalcount(channel=channel) + try: + mutex.lock() + _, data = self.buffer.read( + index, extend=total_count - index, channel=channel + ) + data_array.append(data) + mutex.unlock() + index += total_count - index + except IndexError as e: + log.error(f"Could not read the last samples, {e}") + log.debug("Stoppint the writing") - log.debug(f"Samples written {items}") - self.nix_file.close() + log.debug(f"Samples written {index}") def _write_header(self): self.nix_file = nixio.File.open(path="data.nix", mode=nixio.FileMode.Overwrite)