[tonix] adding script that adds all the data arrays to nix

This commit is contained in:
wendtalexander 2025-10-07 16:07:51 +02:00
parent e61172227e
commit 7c9323311c

View File

@ -1,711 +1,138 @@
import logging import logging
import pathlib import pathlib
import tomllib
import matplotlib.pyplot as plt
import nixio import nixio
import numpy as np import numpy as np
import rlxnix as rlx import rlxnix as rlx
from IPython import embed from IPython import embed
from neo.io import OpenEphysBinaryIO from neo.io import OpenEphysBinaryIO
from nixio.exceptions import DuplicateName
from scipy import signal from oephys2nix.logging import setup_logging
from oephys2nix.metadata import create_dict_from_section, create_metadata_from_dict
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
setup_logging(log, level="DEBUG") setup_logging(log, level="DEBUG")
class CreateNix: class RawToNix:
def __init__(self, open_ephys_path, relacs_nix_path, nix_file): def __init__(self, open_ephys_path: pathlib.Path, relacs_nix_path: str, nix_file: str):
self.relacs_nix_file = nixio.File.open(relacs_nix_path, nixio.FileMode.ReadOnly) self.relacs_nix_file = nixio.File.open(relacs_nix_path, nixio.FileMode.ReadOnly)
self.dataset = rlx.Dataset(relacs_nix_path) self.dataset = rlx.Dataset(relacs_nix_path)
self.relacs_block = self.relacs_nix_file.blocks[0] self.relacs_block = self.relacs_nix_file.blocks[0]
self.relacs_sections = self.relacs_nix_file.sections self.relacs_sections = self.relacs_nix_file.sections
self.spike_sorter_path = str(self.recording_path.resolve() / "spikesorter.nix")
self.neo_data = OpenEphysBinaryIO(open_ephys_path).read(lazy=True) self.neo_data = OpenEphysBinaryIO(open_ephys_path).read(lazy=True)
self.nix_file = nix_file self.nix_file = nixio.File(nix_file, nixio.FileMode.ReadWrite)
self.nix_file.create_block("open-ephys.data", "open-ephys.sampled")
self.block = self.nix_file.blocks[0] self.block = self.nix_file.blocks[0]
def _load_config(self): def append_section(self):
config_path = self.recording_path / "stimulus_config.toml" sec = self.nix_file.create_section(
if not config_path.is_file(): self.relacs_sections[0].name, self.relacs_sections[0].type
log.debug("Stimulus config was not found")
with open(config_path, "rb") as config_file:
try:
config = tomllib.load(config_file)
config_file.close()
return config
except tomllib.TOMLDecodeError as e:
config_file.close()
raise tomllib.TOMLDecodeError(f"Error parsing TOML file: {e}")
def _append_relacs_tag_mtags(self):
for t in self.relacs_block.tags:
log.debug(f"Appending relacs tags {t.name}")
tag = self.block.create_tag(f"relacs_{t.name}", t.type, position=t.position)
tag.extent = t.extent
tag.references.extend(self.block.groups["relacs"].data_arrays)
sec = self.relacs_sections[f"{t.name}"]
d = create_dict_from_section(sec)
try:
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
except DuplicateName:
pass
tag.metadata = self.nix_file.sections[sec.name]
for t in self.relacs_block.multi_tags:
log.debug(f"Appending relacs multi-tags {t.name}")
mtag = self.block.create_multi_tag(
f"relacs_{t.name}",
t.type,
positions=t.positions[:],
extents=t.extents[:],
) )
mtag.references.extend(self.block.groups["relacs"].data_arrays) d = create_dict_from_section(self.relacs_sections[0])
create_metadata_from_dict(d, sec)
self.block.metadata = sec
sec = self.relacs_sections[f"{t.name}"] def append_fish_lines(self):
d = create_dict_from_section(sec) efishs = [
try: "ttl-line",
new_sec = self.nix_file.create_section(sec.name, sec.type) "global-eod",
create_metadata_from_dict(d, new_sec) "stimulus",
except DuplicateName: "local-eod",
pass
mtag.metadata = self.nix_file.sections[sec.name]
def _find_peak_ttl(self, time_ttl, peaks_ttl, lower, upper):
peak = time_ttl[
peaks_ttl[(time_ttl[peaks_ttl] > lower) & (time_ttl[peaks_ttl] < upper)]
] ]
if not peak.size > 0: efish_types = [
log.error("No peaks found") "open-ephys.data.sampled",
elif peak.size > 1: "open-ephys.data.sampled",
if np.all(np.diff(peak) > 0.5): "open-ephys.data.sampled",
log.error("Peaks are further aways than 0.5 seconds") "open-ephys.data.sampled",
log.error(f"Peaks {peak}, Furthest aways: {np.max(peak)}")
peak = np.mean(peak)
return peak
def _find_peak_ttl_index(self, time_ttl, peaks_ttl, current_position):
new_repro_start_index = peaks_ttl[
(time_ttl[peaks_ttl] > current_position - 0.1)
& (time_ttl[peaks_ttl] < current_position + 0.1)
] ]
# if multiple take the last one efish_group = self.block.create_group("efish", "open-ephys.sampled")
if new_repro_start_index.size > 1:
log.warning("Multiple current positions taking the last index")
new_repro_start_index = new_repro_start_index[-1]
# log.debug(f"Time current repro/trial {time_ttl[new_repro_start_index]}") efish_neo_data = self.neo_data[0].segments[0].analogsignals[1].load()
if np.where(new_repro_start_index == peaks_ttl)[0] + 1 == len(peaks_ttl):
return np.array([])
else:
next_repro_start = peaks_ttl[
np.where(new_repro_start_index == peaks_ttl)[0] + 1
]
start_next_repro = time_ttl[next_repro_start]
log.debug(f"Start of new repro/trial {start_next_repro}")
return start_next_repro
def _reference_groups(self) -> list[nixio.Group]: for i in np.arange(len(efishs)):
return [ log.debug(f"Appending efish traces {efishs[i]}")
self.block.groups["neuronal-data"], efish_neo_data_array = efish_neo_data[:, i]
# self.block.groups["spike-data"], data_array = self.block.create_data_array(
self.block.groups["efish"], f"{efishs[i]}",
self.block.groups["relacs"], f"{efish_types[i]}",
data=efish_neo_data_array.magnitude.flatten(),
label="voltage",
unit="V",
)
data_array.append_sampled_dimension(
1 / efish_neo_data.sampling_rate.magnitude, label="time", unit="s"
)
efish_group.data_arrays.append(data_array)
def append_relacs_lines(self):
relacs = [
"V-1",
"EOD",
"LocalEOD-1",
"GlobalEFieldStimulus",
"Spikes-1",
"Chirps",
"LocalBeat-1-1",
] ]
def _append_mtag(self, repro, positions, extents): relacs_types = [
try: "relacs.data.sampled.V-1",
nix_mtag = self.block.create_multi_tag( "relacs.data.sampled.EOD",
f"{repro.name}", "relacs.data.sampled.LocalEOD-1",
"relacs.stimulus", "relacs.data.sampled.GlobalEFieldStimulus",
positions=positions, "relacs.data.events.Spikes-1",
extents=extents, "relacs.data.events.Chirps",
) "relacs.data.events.LocalBeat-1-1",
except DuplicateName:
del self.block.multi_tags[repro.name]
nix_mtag = self.block.create_multi_tag(
f"{repro.name}",
"relacs.stimulus",
positions=positions,
extents=extents,
)
sec = self.relacs_sections[f"{repro.name}"]
d = create_dict_from_section(sec)
try:
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
except DuplicateName:
del self.nix_file.sections[sec.name]
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
nix_mtag.metadata = self.nix_file.sections[repro.name]
[
nix_mtag.references.extend(ref_groups.data_arrays)
for ref_groups in self._reference_groups
if ref_groups.data_arrays
] ]
try: efish_group = self.block.create_group("relacs", "relacs.sampled")
nix_group = self.block.create_group(repro.name, repro.type)
except DuplicateName:
nix_group = self.nix_file.blocks[0].groups[repro.name]
nix_group.multi_tags.append(nix_mtag)
def _append_tag(self, repro, position, extent): for i, re in enumerate(relacs):
try: log.debug(f"Appending relacs efish traces {re}")
nix_tag = self.block.create_tag( efish_relacs_data_array = self.relacs_block.data_arrays[re]
f"{repro.name}", data_array = self.block.create_data_array(
"relacs.repro_run", f"{re}",
position=np.array(position).flatten(), f"{relacs_types[i]}",
data=efish_relacs_data_array[:],
label=efish_relacs_data_array.label,
unit=efish_relacs_data_array.unit,
) )
nix_tag.extent = extent.flatten() if efish_relacs_data_array.dimensions[0].dimension_type == nixio.DimensionType.Sample:
except DuplicateName: data_array.append_sampled_dimension(
del self.block.tags[repro.name] efish_relacs_data_array.dimensions[0].sampling_interval,
nix_tag = self.block.create_tag( label="time",
f"{repro.name}", unit="s",
"relacs.repro_run",
position=np.array(position).flatten(),
) )
nix_tag.extent = extent.flatten() elif efish_relacs_data_array.dimensions[0].dimension_type == nixio.DimensionType.Range:
data_array.append_range_dimension(
sec = self.relacs_sections[f"{repro.name}"] np.sort(efish_relacs_data_array.dimensions[0].ticks),
d = create_dict_from_section(sec) label="time",
unit="s",
try:
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
except DuplicateName:
del self.nix_file.sections[sec.name]
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
nix_tag.metadata = self.nix_file.sections[repro.name]
# NOTE: adding refs to tag
[
nix_tag.references.extend(ref_groups.data_arrays)
for ref_groups in self._reference_groups
if ref_groups.data_arrays
]
try:
nix_group = self.block.create_group(repro.name, repro.type)
except DuplicateName:
nix_group = self.nix_file.blocks[0].groups[repro.name]
nix_group.tags.append(nix_tag)
def create_repros_automatically(self):
ttl_oeph = self.block.data_arrays["ttl-line"][:]
time_ttl = np.arange(len(ttl_oeph)) / self.cfg.open_ephys.samplerate
time_index = np.arange(len(ttl_oeph))
# Threshold for TTL peak
threshold = 2
peaks_ttl = time_index[
(np.roll(ttl_oeph, 1) < threshold) & (ttl_oeph > threshold)
]
# WARNING:Check if peaks are duplicates or near each other
close_peaks = np.where(np.diff(peaks_ttl) == 1)[0]
if close_peaks.size > 0:
peaks_ttl = np.delete(peaks_ttl, close_peaks)
# NOTE: Get the first peak from the toml file
first_peak = self._find_peak_ttl(
time_ttl,
peaks_ttl,
self.stimulus_config["stimulus"]["start_repro_low"],
self.stimulus_config["stimulus"]["start_repro_high"],
) )
current_position = np.asarray(first_peak.reshape(1)) efish_group.data_arrays.append(data_array)
# NOTE: Start defined by config
for i, repro in enumerate(
self.dataset.repro_runs()[
self.stimulus_config["stimulus"]["repro_number_start"] :
],
start=self.stimulus_config["stimulus"]["repro_number_start"],
):
log.debug(repro.name)
log.debug(f"Current Position {current_position.item()}")
if repro.duration < 1.0:
log.warning(f"Skipping repro {repro.name} because it is two short")
continue
if repro.stimuli: def append_raw_data(self):
log.debug("Processing MultiTag") gr = self.block.create_group("neuronal-data", "open-epyhs.sampled")
repetition = len(repro.stimuli) raw_neo_data = self.neo_data[0].segments[0].analogsignals[0].load()
extents_mtag = np.zeros((repetition, 1))
position_mtags = np.zeros((repetition, 1))
for trial, stimulus in enumerate(repro.stimuli): log.debug("Appending raw data")
duration_trial = stimulus.duration nix_data_array = self.block.create_data_array(
extents_mtag[trial] = duration_trial name="data",
position_mtags[trial] = current_position array_type="open-ephys.data.sampled",
dtype=nixio.DataType.Int16,
current_position = self._find_peak_ttl_index( data=raw_neo_data,
time_ttl, peaks_ttl, current_position unit="uV",
) )
nix_data_array.append_sampled_dimension(
# current_position = position_mtags[-1] 1 / raw_neo_data.sampling_rate.magnitude, label="time", unit="s"
self._append_mtag(repro, position_mtags, extents_mtag)
extent = position_mtags[-1] + extents_mtag[-1] - position_mtags[0]
self._append_tag(repro, position_mtags[0], extent)
if not current_position.size > 0:
log.info("Finishing writing")
log.info("Closing nix files")
self.close()
exit()
else:
if i == 0 and "BaselineActivity" in repro.name:
self._append_tag(repro, 0.0, current_position)
continue
last_repro_name = self.dataset.repro_runs()[i - 1].name
last_repro_position = (
self.block.groups[last_repro_name].tags[0].position[0]
+ self.block.groups[last_repro_name].tags[0].extent[0]
) )
self._append_tag( gr.data_arrays.append(nix_data_array)
repro,
last_repro_position.reshape(-1, 1),
(current_position - last_repro_position).reshape(-1, 1),
)
self.close()
#
#
# # NOTE: Create a repro tag
# t = self.block.create_tag(
# f"{repro.name}",
# f"{repro.type}",
# position=current_position,
# )
# t.metadata = repro.metadata
#
# for i, group in enumerate(
# list(self.relacs_block.groups)[
# self.stimulus_config["stimulus"]["repro_number_start"] :
# ],
# start=self.stimulus_config["stimulus"]["repro_number_start"],
# ):
# log.debug(group.name)
# log.debug(current_position)
#
# nix_group = self.block.groups[group.name]
# if group.name == "BaselineActivity_1":
# first_peak = 0.0
# current_position = np.array(first_peak).reshape(1)
#
# # if no multi_tag exists just a tag
# if not group.multi_tags:
# log.debug(f"Creating Tag {group.name}")
# t = self.block.create_tag(
# f"{nix_group.name}",
# f"{nix_group.type}",
# position=current_position,
# )
# t.metadata = self.nix_file.sections[group.name]
#
# # searching for the end of the repro in the ttl channel
# start_next_repro = time_ttl[
# peaks_ttl[
# (
# time_ttl[peaks_ttl]
# > group.tags[0].extent + current_position - 1
# )
# & (
# time_ttl[peaks_ttl]
# < group.tags[0].extent + current_position + 1
# )
# ]
# ]
# log.debug(start_next_repro)
# if start_next_repro.size > 1:
# if np.all(np.diff(start_next_repro) > 0.5):
# log.error("Wrong end point in end of repro")
# log.error(f"{start_next_repro}, {np.max(start_next_repro)}")
# start_next_repro = np.mean(start_next_repro)
#
# if not start_next_repro:
# log.error(f"No Endpoint found for repro {group.name}")
# embed()
# exit()
# t.extent = (start_next_repro - t.position[0]).reshape(1)
# # add references to the tag
# [
# t.references.extend(ref_group.data_arrays)
# for ref_group in referencs_groups
# if ref_group.data_arrays
# ]
# nix_group.tags.append(t)
# current_position = start_next_repro.reshape(1)
#
# else:
# # NOTE: If repro has multitags
# log.debug(f"Creating Multi Tag {group.name}")
# test_mul_tags = group.multi_tags[0].positions[:]
# test_position_tag, test_extent_tag = (
# group.tags[0].position[0],
# group.tags[0].extent[0],
# )
# len_mtag = len(
# test_mul_tags[
# (test_mul_tags >= test_position_tag)
# & (test_mul_tags <= (test_position_tag + test_extent_tag))
# ]
# )
# extents_mtag = group.multi_tags[0].extents[:][
# (test_mul_tags >= test_position_tag)
# & (test_mul_tags <= (test_position_tag + test_extent_tag))
# ]
# position_mtags = np.zeros((len_mtag, 1))
# position_mtags[0] = current_position
#
# for j, extents in enumerate(extents_mtag[1:], start=1):
# next_repro_position = time_ttl[
# peaks_ttl[
# (
# time_ttl[peaks_ttl]
# > extents + current_position - extents / 2
# )
# & (time_ttl[peaks_ttl] < extents + current_position + 2)
# ]
# ]
# if next_repro_position.size == 0:
# log.error(f"next start of MultiTag not found")
# embed()
# exit()
#
# if next_repro_position.size > 1:
# if np.all(np.diff(next_repro_position) > 0.005):
# log.error("Wrong end point in end of repro")
# log.error(
# f"{next_repro_position}, {np.max(next_repro_position)}"
# )
# embed()
# exit()
# exit(1)
# next_repro_position = np.mean(next_repro_position)
# log.debug(f"{j}, {next_repro_position}")
#
# position_mtags[j] = next_repro_position
# current_position = next_repro_position
#
# start_next_repro = time_ttl[
# peaks_ttl[
# (time_ttl[peaks_ttl] > current_position + nix_mtag.extents[-1])
# & (
# time_ttl[peaks_ttl]
# < current_position + nix_mtag.extents[-1] + 2
# )
# ]
# ]
# if not start_next_repro.size == 0:
# if start_next_repro.size > 1:
# if np.all(np.diff(start_next_repro) > 0.005):
# log.error("Wrong end point in end of repro")
# log.error(f"{start_next_repro}, {np.max(start_next_repro)}")
# embed()
# exit(1)
#
# start_next_repro = np.mean(start_next_repro)
#
# else:
# log.debug(
# "No ttl pluse foud for the end of the repro create time point"
# )
# log.debug("Taking the next ttl pulse")
# new_repro_start_index = peaks_ttl[
# (time_ttl[peaks_ttl] > current_position - 1)
# & (time_ttl[peaks_ttl] < current_position + 2)
# ]
#
# # if multiple take the last one
# if new_repro_start_index.size > 1:
# new_repro_start_index = new_repro_start_index[-1]
#
# log.debug(time_ttl[new_repro_start_index])
# try:
# next_repro_start = peaks_ttl[
# np.where(new_repro_start_index == peaks_ttl)[0] + 1
# ]
# except IndexError:
# if np.where(new_repro_start_index == peaks_ttl)[0] + 1 == len(
# peaks_ttl
# ):
# log.debug("Finishing writing")
# self.nix_file.close()
# self.relacs_nix_file.close()
# exit(1)
# if next_repro_start.size > 1:
# log.debug("Start of next Repro has multiple ttl pulse")
# embed()
# exit()
#
# # check if the time difference is greater and 5 seconds
# if (
# not time_ttl[next_repro_start] - time_ttl[new_repro_start_index]
# > 5
# ):
# log.error("Wrong endpoint for repro")
# exit(1)
# start_next_repro = time_ttl[next_repro_start]
# log.info(f"Start of new Repro {start_next_repro}")
#
# current_position = start_next_repro
#
# # NOTE: If SAM or Baseline we dont have to find the next current position
# if (
# group.name.split("_")[0] == "SAM"
# or group.name.split("_")[0] == "BaselineActivity"
# # or group.name.split("_")[0] == "FileStimulus"
# ):
# current_position = current_position.reshape(1)
# log.info(f"Start of new Repro {current_position}")
# continue
#
# # peaks_repro_index = peaks_ttl[
# # (time_ttl[peaks_ttl] > current_position - 1)
# # & (time_ttl[peaks_ttl] < current_position + 2)
# # ]
# #
# # # if multiple take the last one
# # if peaks_repro_index.size > 1:
# # peaks_repro_index = peaks_repro_index[-1]
# #
# # log.debug(time_ttl[peaks_repro_index])
# # next_repro_start = peaks_ttl[
# # np.where(peaks_repro_index == peaks_ttl)[0] + 1
# # ]
# # if next_repro_start.size > 1:
# # embed()
# #
# # # check if the time difference is greater and 5 seconds
# # if not time_ttl[next_repro_start] - time_ttl[peaks_repro_index] > 5:
# # log.error("Wrong endpoint for repro")
# # embed()
# # exit()
# # current_position = time_ttl[next_repro_start]
# # log.info(f"Start of new Repro {current_position}")
# #
# # current_position = current_position.reshape(1)
#
# self.nix_file.close()
# self.relacs_nix_file.close()
def create_repros_from_config_file(
self,
):
ttl_oeph = self.block.data_arrays["ttl-line"][:]
peaks_ttl = signal.find_peaks(
ttl_oeph.flatten(),
**self.stimulus_config["stimulus"]["peak_detection"],
)[0]
time_ttl = np.arange(len(ttl_oeph)) / self.cfg.open_ephys.samplerate
number_of_repros = len(self.stimulus_config["repros"]["name"])
referencs_groups = [
self.block.groups["neuronal-data"],
self.block.groups["spike-data"],
self.block.groups["efish"],
]
for repro_index in range(number_of_repros):
name = self.stimulus_config["repros"]["name"][repro_index]
log.debug(name)
start = np.array(self.stimulus_config["repros"]["start"][repro_index])
end = np.array(self.stimulus_config["repros"]["end"][repro_index])
nix_group = self.block.groups[name]
if start.size > 1:
start_repro = time_ttl[
peaks_ttl[
(time_ttl[peaks_ttl] > start[0])
& (time_ttl[peaks_ttl] < start[1])
]
]
if start_repro.size > 1:
if np.all(np.diff(start_repro) > 0.005):
log.error("Wrong end point in end of repro")
log.error(f"{name[repro_index]}, {np.max(start_repro)}")
exit(1)
start_repro = np.mean(start_repro)
else:
start_repro = start_repro[0]
else:
start_repro = start[0]
if end.size > 1:
end_repro = time_ttl[
peaks_ttl[
(time_ttl[peaks_ttl] > end[0]) & (time_ttl[peaks_ttl] < end[1])
]
]
if end_repro.size > 1:
if np.all(np.diff(end_repro) > 0.005):
log.error("Wrong end point in end of repro")
log.error(f"{name[repro_index]}, {np.max(end_repro)}")
exit(1)
end_repro = np.mean(end_repro)
else:
end_repro = end[0]
nix_tag = self.block.create_tag(
f"{nix_group.name}",
f"{nix_group.type}",
position=[start_repro],
)
nix_tag.extent = [end_repro - start_repro]
nix_tag.metadata = self.nix_file.sections[name]
# NOTE: adding refs to tag
[
nix_tag.references.extend(ref_groups.data_arrays)
for ref_groups in referencs_groups
if ref_groups.data_arrays
]
nix_group.tags.append(nix_tag)
if not self.relacs_block.groups[name].multi_tags:
log.debug(f"no multitags in repro {name}, skipping")
continue
start_repro_multi_tag = start_repro.copy()
positions = []
positions.append(np.array(start_repro))
extents = []
extents_relacsed_nix = (
self.relacs_block.groups[name].multi_tags[0].extents[:]
)
extents.append(np.array(extents_relacsed_nix[0]))
index_multi_tag = 0
while start_repro_multi_tag < end_repro:
log.debug(f"{start_repro_multi_tag}")
start_repro_multi_tag = time_ttl[
peaks_ttl[
(
time_ttl[peaks_ttl]
> start_repro_multi_tag
+ extents_relacsed_nix[index_multi_tag]
)
& (
time_ttl[peaks_ttl]
< start_repro_multi_tag
+ extents_relacsed_nix[index_multi_tag]
+ 2
)
]
]
if start_repro_multi_tag.size == 0:
log.debug("Did not find any peaks for new start multi tag")
log.debug(
f"Differenz to end of repro {end_repro - (positions[-1] + extents_relacsed_nix[index_multi_tag])}"
)
break
if start_repro_multi_tag.size > 1:
if np.all(np.diff(start_repro_multi_tag) > 0.005):
log.error("Wrong end point in end of repro")
log.error(f"Repro_name: {name[repro_index]}")
log.error(f"multitag_index: {index_multi_tag}")
log.error(f"max_value {np.max(start_repro_multi_tag)}")
exit(1)
start_repro_multi_tag = np.mean(start_repro_multi_tag)
else:
start_repro_multi_tag = start_repro_multi_tag[0]
if end_repro - start_repro_multi_tag < 1:
log.debug("Posssible endpoint detected")
log.debug(
f"Differenz to repro end: {end_repro - start_repro_multi_tag}"
)
break
positions.append(np.array(start_repro_multi_tag))
extents.append(extents_relacsed_nix[index_multi_tag])
positions = np.array(positions).reshape(len(positions), 1)
extents = np.array(extents)
if positions.size != extents.size:
log.error("Calcualted positions and extents do not match ")
log.error(
f"Shape positions {positions.shape}, shape extents {extents.shape}"
)
if positions.shape != extents.shape:
log.error("Shape of Calculated positions and extents do not match")
log.error(
f"Shape positions {positions.shape}, shape extents {extents.shape}"
)
embed()
exit()
nix_mtag = self.block.create_multi_tag(
f"{nix_group.name}",
f"{nix_group.type}",
positions=positions,
extents=extents,
)
nix_mtag.metadata = self.nix_file.sections[name]
# NOTE: adding refs to mtag
[
nix_mtag.references.extend(ref_groups.data_arrays)
for ref_groups in referencs_groups
if ref_groups.data_arrays
]
nix_group.multi_tags.append(nix_mtag)
def plot_stimulus(self):
ttl_oeph = self.block.data_arrays["ttl-line"][:]
time_index = np.arange(len(ttl_oeph))
peaks_ttl = time_index[(np.roll(ttl_oeph, 1) < 2) & (ttl_oeph > 2)]
stimulus_oeph = self.block.data_arrays["stimulus"]
stimulus = self.relacs_block.data_arrays["GlobalEFieldStimulus"]
plt.plot(np.arange(stimulus.size) / 20_000.0, stimulus, label="relacs-stimulus")
plt.plot(
np.arange(len(stimulus_oeph)) / 30_000.0,
stimulus_oeph[:],
label="open-ephys",
)
plt.plot(
np.arange(len(ttl_oeph)) / 30_000.0,
ttl_oeph[:],
label="ttl-line",
)
plt.scatter(
np.arange(len(ttl_oeph))[peaks_ttl] / 30_000.0,
ttl_oeph[peaks_ttl],
label="detected peaks",
color="red",
zorder=100,
)
plt.legend(loc="upper right")
plt.show()
def close(self): def close(self):
self.dataset.close()
self.nix_file.close() self.nix_file.close()
self.relacs_nix_file.close() self.relacs_nix_file.close()
# if __name__ == "__main__":
# create_nix = CreateNix()
# create_nix.create_repros_automatically()
# create_nix.plot_stimulus()
# # create_nix.create_repros_from_config_file()