[dataio/nix] adding a simple nix writer

This commit is contained in:
wendtalexander 2024-10-11 14:08:35 +02:00
parent 0f09c948ec
commit 7510d04f8d

View File

@ -0,0 +1,43 @@
from IPython import embed
import nixio
from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.util.logging import config_logging
log = config_logging()
class NixWriter:
def __init__(self, buffer: CircBuffer) -> None:
self.buffer = buffer
def write_nix(self, *args, **kwargs):
self._write_header()
items = 0
chunk = 1000
log.debug("Starting the writing")
self.write = True
while self.write:
log.debug(items)
try:
data, _ = self.buffer.read(items, extend=chunk)
self.data_array.append(data)
except IndexError as e:
log.debug(f"{e}")
continue
items += chunk
log.debug("Stoppint the writing")
log.debug(f"Samples written {items}")
embed()
exit()
self.nix_file.close()
def _write_header(self):
self.nix_file = nixio.File.open(path="data.nix", mode=nixio.FileMode.Overwrite)
self.block = self.nix_file.create_block("recording", "testfile")
self.data_array = self.block.create_data_array(
"Analog1", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
)
def stop_writing(self):
self.write = False