import itertools
import string
import time
from dataclasses import asdict
from datetime import datetime

import nixio
from IPython import embed
from PyQt6.QtCore import QMutex

from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.util.logging import config_logging

log = config_logging()


class NixWriter:
    """Stream samples from a ``CircBuffer`` into NIX data arrays."""

    def __init__(self, buffer: CircBuffer, config) -> None:
        """Store the source buffer and the configuration object.

        Args:
            buffer: Circular buffer the samples are read from.
            config: Configuration object; only stored on the instance here.
        """
        self.buffer = buffer
        self.config = config
        # Loop flag for write_nix(); defined up front so the attribute always
        # exists, even if stop_writing() is called before write_nix().
        self.write = False

    def write_nix(
        self,
        data_array: nixio.DataArray,
        mutex: QMutex,
        channel: int = 0,
        chunk_size=1000,
        *args,
        **kwargs,
    ):
        """Copy samples of one buffer channel into ``data_array`` until stopped.

        Polls the buffer and, whenever at least ``chunk_size`` unwritten
        samples are available, reads one chunk while holding ``mutex`` and
        stores it in ``data_array``. The loop runs until ``stop_writing()``
        clears ``self.write``; afterwards the samples that are still pending
        are appended in one final read.

        Args:
            data_array: Target NIX data array.
            mutex: Mutex held around every buffer read / array write.
            channel: Buffer channel to read from.
            chunk_size: Number of samples written per iteration.
            *args: Ignored.
            **kwargs: Ignored.
        """
        index = 0
        log.debug("Starting the writing")
        self.write = True
        while self.write:
            total_count = self.buffer.totalcount(channel=channel)
            if total_count - index < chunk_size:
                # Not enough new samples for a full chunk yet.
                time.sleep(0.001)
                continue

            read_failed = False
            mutex.lock()
            try:
                log.debug(index)
                _, data = self.buffer.read(
                    index, extend=chunk_size, channel=channel
                )
                # The very first chunk is written in place, every later
                # chunk is appended to the array.
                if index == 0:
                    data_array.write_direct(data)
                else:
                    data_array.append(data)
                index += chunk_size
            except IndexError as e:
                log.debug(f"{e}")
                read_failed = True
            finally:
                # Always release the mutex, whatever happened above.
                mutex.unlock()

            if read_failed:
                # Back off briefly before retrying, without holding the mutex.
                time.sleep(0.001)

        # Flush whatever arrived after the last full chunk.
        remaining = self.buffer.totalcount(channel=channel) - index
        if remaining > 0:
            mutex.lock()
            try:
                _, data = self.buffer.read(
                    index, extend=remaining, channel=channel
                )
                data_array.append(data)
                index += remaining
            except IndexError as e:
                log.error(f"Could not read the last samples, {e}")
            finally:
                mutex.unlock()

        log.debug("Stoppint the writing")
        log.debug(f"Samples written {index}")

    def _write_header(self):
        """Create ``data.nix`` with one block and one ``Analog1`` data array.

        The file is opened in overwrite mode; file, block and data array are
        stored on the instance as ``nix_file``, ``block`` and ``data_array``.
        """
        self.nix_file = nixio.File.open(
            path="data.nix", mode=nixio.FileMode.Overwrite
        )
        self.block = self.nix_file.create_block("recording", "testfile")
        self.data_array = self.block.create_data_array(
            "Analog1", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
        )

    def generate_letter_sequence(self):
        """Yield every two-letter lowercase ASCII string from "aa" to "zz"."""
        alphabet = string.ascii_lowercase
        for size in range(2, 3):
            for combo in itertools.product(alphabet, repeat=size):
                yield "".join(combo)

    def create_nix_file(self, file_path, metadata):
        """Create a timestamped NIX file holding metadata and two data arrays.

        The file is named ``<YYYY-mm-dd_HH-MM-SS>_<letters>.nix`` inside
        ``file_path``, where ``<letters>`` is the first value produced by
        ``generate_letter_sequence()``. It is opened in overwrite mode.

        Args:
            file_path: Directory the file is created in.
            metadata: Dataclass instance; each of its fields is stored as a
                property of the "metadata" section.

        Returns:
            Tuple ``(Analog1, Analog2)`` of the two created data arrays.
        """
        # "%d" is the day of the month ("%t" is not a day directive).
        data_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        sequence = next(self.generate_letter_sequence())

        self.nix_file = nixio.File.open(
            path=f"{file_path}/{data_time}_{sequence}.nix",
            mode=nixio.FileMode.Overwrite,
        )
        self.block = self.nix_file.create_block("recording", "testfile")
        self.section = self.nix_file.create_section("metadata", "config.yaml")
        for key, value in asdict(metadata).items():
            self.section[key] = value

        self.data_array_analog1 = self.block.create_data_array(
            "Analog1", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
        )
        self.data_array_analog2 = self.block.create_data_array(
            "Analog2", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
        )
        return self.data_array_analog1, self.data_array_analog2

    def stop_writing(self):
        """Ask the ``write_nix()`` loop to stop after its current iteration."""
        self.write = False