from IPython import embed
import nixio

from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.util.logging import config_logging

log = config_logging()


class NixWriter:
    """Copy samples from a ``CircBuffer`` into a NIX file in fixed-size chunks.

    ``write_nix`` loops until ``stop_writing`` is called, so the two methods
    are presumably meant to be driven from different threads -- confirm
    against the caller.
    """

    def __init__(self, buffer: CircBuffer) -> None:
        """Store the buffer that ``write_nix`` will read from."""
        self.buffer = buffer

    def write_nix(self, *args, **kwargs):
        """Create ``data.nix`` and append buffer chunks until stopped.

        Positional and keyword arguments are accepted but ignored.

        The loop reads ``chunk`` samples starting at index ``items`` and
        appends them to the data array. An ``IndexError`` from the read or
        the append is logged and the same position is retried. The file is
        closed when the loop ends, including when another exception
        propagates out of it.
        """
        self._write_header()
        items = 0
        chunk = 1000
        log.debug("Starting the writing")
        self.write = True
        try:
            while self.write:
                log.debug(items)
                try:
                    data, _ = self.buffer.read(items, extend=chunk)
                    self.data_array.append(data)
                except IndexError as e:
                    # Presumably the requested range is not available yet;
                    # retry the same position on the next iteration.
                    # NOTE(review): this retries without any delay, so it
                    # spins while waiting -- confirm that is acceptable.
                    log.debug(f"{e}")
                    continue
                items += chunk
            log.debug("Stoppint the writing")
            log.debug(f"Samples written {items}")
        finally:
            # Close on every exit path so an unexpected error does not
            # leave the file handle open.
            self.nix_file.close()

    def _write_header(self):
        """Open ``data.nix`` in overwrite mode and create block and data array.

        Sets ``self.nix_file``, ``self.block`` and ``self.data_array``. If
        creating the block or the data array fails, the file is closed
        before the exception is re-raised.
        """
        self.nix_file = nixio.File.open(path="data.nix", mode=nixio.FileMode.Overwrite)
        try:
            self.block = self.nix_file.create_block("recording", "testfile")
            # NOTE(review): the array is created with shape (1000,) and the
            # samples are appended afterwards -- confirm the initial 1000
            # entries are intended to be part of the recording.
            self.data_array = self.block.create_data_array(
                "Analog1", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
            )
        except Exception:
            self.nix_file.close()
            raise

    def stop_writing(self):
        """Clear the flag polled by ``write_nix`` so its loop exits."""
        self.write = False