pyrelacs/pyrelacs/dataio/nix_writer.py

73 lines
2.2 KiB
Python

import time
from IPython import embed
from PyQt6.QtCore import QMutex
import nixio
from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.util.logging import config_logging
log = config_logging()
class NixWriter:
def __init__(self, buffer: CircBuffer) -> None:
self.buffer = buffer
def write_nix(
self,
data_array: nixio.DataArray,
mutex: QMutex,
channel: int = 0,
chunk_size=1000,
*args,
**kwargs,
):
index = 0
log.debug("Starting the writing")
self.write = True
while self.write:
total_count = self.buffer.totalcount(channel=channel)
if total_count - index >= chunk_size:
mutex.lock()
log.debug(index)
try:
_, data = self.buffer.read(
index, extend=chunk_size, channel=channel
)
if index == 0:
data_array.write_direct(data)
else:
data_array.append(data)
index += chunk_size
except IndexError as e:
time.sleep(0.001)
log.debug(f"{e}")
mutex.unlock()
else:
time.sleep(0.001)
continue
total_count = self.buffer.totalcount(channel=channel)
try:
mutex.lock()
_, data = self.buffer.read(
index, extend=total_count - index, channel=channel
)
data_array.append(data)
mutex.unlock()
index += total_count - index
except IndexError as e:
log.error(f"Could not read the last samples, {e}")
log.debug("Stoppint the writing")
log.debug(f"Samples written {index}")
def _write_header(self):
self.nix_file = nixio.File.open(path="data.nix", mode=nixio.FileMode.Overwrite)
self.block = self.nix_file.create_block("recording", "testfile")
self.data_array = self.block.create_data_array(
"Analog1", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
)
def stop_writing(self):
self.write = False