forked from awendt/pyrelacs
107 lines
3.5 KiB
Python
107 lines
3.5 KiB
Python
from datetime import datetime
|
|
import itertools
|
|
import string
|
|
import time
|
|
from dataclasses import asdict
|
|
|
|
from IPython import embed
|
|
from PyQt6.QtCore import QMutex
|
|
import nixio
|
|
|
|
from pyrelacs.dataio.circbuffer import CircBuffer
|
|
from pyrelacs.util.logging import config_logging
|
|
|
|
# Module-level logger shared by everything in this file; it is obtained from
# the project's logging helper (pyrelacs.util.logging.config_logging).
log = config_logging()
|
|
|
|
|
|
class NixWriter:
    """Copy samples from a ``CircBuffer`` into NIX data arrays.

    The writer owns a reference to the circular buffer and the configuration
    object it was constructed with.  ``create_nix_file`` (or the simpler
    ``_write_header``) opens the NIX file and creates the data arrays;
    ``write_nix`` then drains one buffer channel into one data array until
    ``stop_writing`` is called.
    """

    def __init__(self, buffer: CircBuffer, config) -> None:
        """Store the buffer to read from and the configuration object.

        Parameters
        ----------
        buffer
            Circular buffer the samples are read from.
        config
            Configuration object; only stored, not used inside this class.
        """
        self.buffer = buffer
        self.config = config
        # Loop flag for ``write_nix``.  Defined here so the attribute exists
        # (and ``stop_writing`` is meaningful) before the first write starts.
        self.write = False

    def write_nix(
        self,
        data_array: nixio.DataArray,
        mutex: QMutex,
        channel: int = 0,
        chunk_size=1000,
        *args,
        **kwargs,
    ):
        """Write samples of one buffer channel into ``data_array`` in chunks.

        Runs until ``stop_writing`` clears ``self.write``.  While running, a
        chunk is written whenever at least ``chunk_size`` unwritten samples
        are available; otherwise the loop sleeps for one millisecond.  After
        the loop ends, the samples that remain in the buffer are appended.

        Parameters
        ----------
        data_array
            NIX data array receiving the samples.  The very first chunk is
            written with ``write_direct``; all later data is appended.
        mutex
            Mutex held around every buffer read and NIX write.
        channel
            Buffer channel to read from.
        chunk_size
            Number of samples read and written per iteration.
        *args, **kwargs
            Accepted and ignored.
        """
        index = 0
        log.debug("Starting the writing")
        self.write = True
        while self.write:
            total_count = self.buffer.totalcount(channel=channel)
            if total_count - index < chunk_size:
                # Not enough new samples for a full chunk yet.
                time.sleep(0.001)
                continue

            read_failed = False
            mutex.lock()
            try:
                log.debug(index)
                _, data = self.buffer.read(
                    index, extend=chunk_size, channel=channel
                )
                if index == 0:
                    data_array.write_direct(data)
                else:
                    data_array.append(data)
                index += chunk_size
            except IndexError as e:
                log.debug(f"{e}")
                read_failed = True
            finally:
                # Always release the mutex, whatever the read/write raised,
                # so the other users of the mutex are never blocked forever.
                mutex.unlock()

            if read_failed:
                # Back off briefly, but only after the lock was released.
                time.sleep(0.001)

        # Flush whatever arrived after the last complete chunk.
        # NOTE(review): if the loop never wrote a chunk (index == 0) the data
        # is appended rather than written directly -- confirm this is intended
        # for a data array that was created with a non-empty shape.
        total_count = self.buffer.totalcount(channel=channel)
        remaining = total_count - index
        mutex.lock()
        try:
            _, data = self.buffer.read(index, extend=remaining, channel=channel)
            data_array.append(data)
            index += remaining
        except IndexError as e:
            log.error(f"Could not read the last samples, {e}")
        finally:
            # Previously the mutex stayed locked when the read failed.
            mutex.unlock()

        log.debug("Stoppint the writing")
        log.debug(f"Samples written {index}")

    def _write_header(self):
        """Open ``data.nix`` in overwrite mode and create one data array.

        Sets ``self.nix_file``, ``self.block`` and ``self.data_array``.
        """
        self.nix_file = nixio.File.open(path="data.nix", mode=nixio.FileMode.Overwrite)
        self.block = self.nix_file.create_block("recording", "testfile")
        self.data_array = self.block.create_data_array(
            "Analog1", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
        )

    def generate_letter_sequence(self):
        """Yield every two-letter lowercase string from ``"aa"`` to ``"zz"``."""
        alphabet = string.ascii_lowercase
        for size in range(2, 3):
            for combo in itertools.product(alphabet, repeat=size):
                yield "".join(combo)

    @staticmethod
    def _file_timestamp() -> str:
        """Return the current local time as ``YYYY-MM-DD_HH-MM-SS``."""
        # ``%d`` is the day of the month.  The former ``%t`` is not a
        # day directive, so the day was missing from the file name.
        return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    def create_nix_file(self, file_path, metadata):
        """Create a time-stamped NIX file with metadata and two data arrays.

        The file is opened in overwrite mode as
        ``<file_path>/<timestamp>_<sequence>.nix`` where ``sequence`` is the
        first item of ``generate_letter_sequence``.

        Parameters
        ----------
        file_path
            Directory part of the file name.
        metadata
            Dataclass instance; each of its fields (via ``asdict``) is stored
            in the ``metadata`` section of the file.

        Returns
        -------
        tuple
            The ``Analog1`` and ``Analog2`` data arrays, in that order.
        """
        data_time = self._file_timestamp()
        sequence = next(self.generate_letter_sequence())

        self.nix_file = nixio.File.open(
            path=f"{file_path}/{data_time}_{sequence}.nix",
            mode=nixio.FileMode.Overwrite,
        )
        self.block = self.nix_file.create_block("recording", "testfile")
        self.section = self.nix_file.create_section("metadata", "config.yaml")
        for key, value in asdict(metadata).items():
            self.section[key] = value

        self.data_array_analog1 = self.block.create_data_array(
            "Analog1", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
        )
        self.data_array_analog2 = self.block.create_data_array(
            "Analog2", "ndarray", shape=(1000,), dtype=nixio.DataType.Double
        )
        return self.data_array_analog1, self.data_array_analog2

    def stop_writing(self):
        """Ask a running ``write_nix`` loop to finish."""
        self.write = False
|