This commit is contained in:
wendtalexander
2025-10-06 15:57:36 +02:00
commit 423754ece3
8 changed files with 1436 additions and 0 deletions

18
oephys2nix/logging.py Normal file
View File

@@ -0,0 +1,18 @@
import logging
from rich.logging import RichHandler
DEFAULT_LOG_LEVEL = "DEBUG"
def setup_logging(logger: logging.Logger, level=DEFAULT_LOG_LEVEL):
if logger.hasHandlers():
logger.handlers.clear()
stream_handler = RichHandler(rich_tracebacks=True, show_path=False)
stream_handler.setLevel(level)
fmt_shell = "%(filename)s:%(lineno)d - %(message)s"
shell_formatter = logging.Formatter(fmt_shell)
stream_handler.setFormatter(shell_formatter)
logger.addHandler(stream_handler)
logger.setLevel(level)
logging.getLogger(__name__).info(f"Logging configured with level {level}.")

76
oephys2nix/main.py Normal file
View File

@@ -0,0 +1,76 @@
import logging
from pathlib import Path
import nixio
import typer
from IPython import embed
from rich.console import Console
from oephys2nix.logging import setup_logging
app = typer.Typer()
log = logging.getLogger(__name__)
setup_logging(log, level="DEBUG")
console = Console()
@app.command()
def main(
data_path: Path = typer.Argument(
...,
help="The source directory containing the Open Ephys data.",
exists=True,
file_okay=False,
dir_okay=True,
readable=True,
resolve_path=True,
),
ttl: bool = typer.Option(
default=True, help="For recordings that did not have a ttl pulse"
),
overwrite: bool = typer.Option(default=True, help="Overwrites nix file"),
):
"""
Combines open ephys data with relacs data from data_path to a new nix file
"""
log.info(f"Selected data_path is {data_path}")
open_ephys_data_paths = list(Path(data_path).rglob("*open-ephys"))
relacs_data_paths = list(Path(data_path).rglob("*relacs"))
if not len(open_ephys_data_paths) == len(relacs_data_paths):
log.error(
f"Missmatch in data directory found open-ephys data {len(open_ephys_data_paths)} and relacs data {len(relacs_data_paths)}"
)
log.error("Please check if both are present")
raise typer.Exit()
for open_ephys, relacs in zip(
open_ephys_data_paths, relacs_data_paths, strict=True
):
nix_file_name = relacs.name.split("_")[0] + "-recording.nix"
console.print(
f"Converting [bold cyan]{open_ephys.name}[/bold cyan] and "
f"[bold magenta]{relacs.name}[/bold magenta] -> "
f"[green]{nix_file_name}[/green]"
)
nix_path = relacs.parent / nix_file_name
if nix_path.exists and nix_path.is_file():
if overwrite:
log.warning("Overwriting nix file")
nix_files = nixio.File(str(nix_file_name), nixio.FileMode.Overwrite)
else:
log.error(
"Converted nix file already exits, and Overwrite is not enabled"
)
raise typer.Exit()
else:
log.debug("Creating nix file")
nix_files = nixio.File(str(nix_file_name), nixio.FileMode.Overwrite)
if __name__ == "__main__":
app()

28
oephys2nix/metadata.py Normal file
View File

@@ -0,0 +1,28 @@
import nixio
def create_metadata_from_dict(d: dict, section: nixio.Section) -> None:
for key, value in d.items():
if isinstance(value, dict):
new_sec = section.create_section(key, f"{type(key)}")
create_metadata_from_dict(value, new_sec)
else:
try:
section.create_property(key, values_or_dtype=value)
except TypeError:
if isinstance(value, list):
value = [str(i) for i in value]
else:
value = str(value)
section.create_property(key, values_or_dtype=value)
def create_dict_from_section(section: nixio.Section) -> dict:
d = {}
for key, value in section.items():
if isinstance(value, nixio.Section):
subdict = create_dict_from_section(value)
d[key] = subdict
else:
d[key] = value.values
return d

711
oephys2nix/tonix.py Normal file
View File

@@ -0,0 +1,711 @@
import logging
import pathlib
import tomllib
import matplotlib.pyplot as plt
import nixio
import numpy as np
import rlxnix as rlx
from IPython import embed
from neo.io import OpenEphysBinaryIO
from nixio.exceptions import DuplicateName
from scipy import signal
log = logging.getLogger(__name__)
setup_logging(log, level="DEBUG")
class CreateNix:
def __init__(self, open_ephys_path, relacs_nix_path, nix_file):
self.relacs_nix_file = nixio.File.open(relacs_nix_path, nixio.FileMode.ReadOnly)
self.dataset = rlx.Dataset(relacs_nix_path)
self.relacs_block = self.relacs_nix_file.blocks[0]
self.relacs_sections = self.relacs_nix_file.sections
self.spike_sorter_path = str(self.recording_path.resolve() / "spikesorter.nix")
self.neo_data = OpenEphysBinaryIO(open_ephys_path).read(lazy=True)
self.nix_file = nix_file
self.block = self.nix_file.blocks[0]
def _load_config(self):
config_path = self.recording_path / "stimulus_config.toml"
if not config_path.is_file():
log.debug("Stimulus config was not found")
with open(config_path, "rb") as config_file:
try:
config = tomllib.load(config_file)
config_file.close()
return config
except tomllib.TOMLDecodeError as e:
config_file.close()
raise tomllib.TOMLDecodeError(f"Error parsing TOML file: {e}")
def _append_relacs_tag_mtags(self):
for t in self.relacs_block.tags:
log.debug(f"Appending relacs tags {t.name}")
tag = self.block.create_tag(f"relacs_{t.name}", t.type, position=t.position)
tag.extent = t.extent
tag.references.extend(self.block.groups["relacs"].data_arrays)
sec = self.relacs_sections[f"{t.name}"]
d = create_dict_from_section(sec)
try:
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
except DuplicateName:
pass
tag.metadata = self.nix_file.sections[sec.name]
for t in self.relacs_block.multi_tags:
log.debug(f"Appending relacs multi-tags {t.name}")
mtag = self.block.create_multi_tag(
f"relacs_{t.name}",
t.type,
positions=t.positions[:],
extents=t.extents[:],
)
mtag.references.extend(self.block.groups["relacs"].data_arrays)
sec = self.relacs_sections[f"{t.name}"]
d = create_dict_from_section(sec)
try:
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
except DuplicateName:
pass
mtag.metadata = self.nix_file.sections[sec.name]
def _find_peak_ttl(self, time_ttl, peaks_ttl, lower, upper):
peak = time_ttl[
peaks_ttl[(time_ttl[peaks_ttl] > lower) & (time_ttl[peaks_ttl] < upper)]
]
if not peak.size > 0:
log.error("No peaks found")
elif peak.size > 1:
if np.all(np.diff(peak) > 0.5):
log.error("Peaks are further aways than 0.5 seconds")
log.error(f"Peaks {peak}, Furthest aways: {np.max(peak)}")
peak = np.mean(peak)
return peak
def _find_peak_ttl_index(self, time_ttl, peaks_ttl, current_position):
new_repro_start_index = peaks_ttl[
(time_ttl[peaks_ttl] > current_position - 0.1)
& (time_ttl[peaks_ttl] < current_position + 0.1)
]
# if multiple take the last one
if new_repro_start_index.size > 1:
log.warning("Multiple current positions taking the last index")
new_repro_start_index = new_repro_start_index[-1]
# log.debug(f"Time current repro/trial {time_ttl[new_repro_start_index]}")
if np.where(new_repro_start_index == peaks_ttl)[0] + 1 == len(peaks_ttl):
return np.array([])
else:
next_repro_start = peaks_ttl[
np.where(new_repro_start_index == peaks_ttl)[0] + 1
]
start_next_repro = time_ttl[next_repro_start]
log.debug(f"Start of new repro/trial {start_next_repro}")
return start_next_repro
def _reference_groups(self) -> list[nixio.Group]:
return [
self.block.groups["neuronal-data"],
# self.block.groups["spike-data"],
self.block.groups["efish"],
self.block.groups["relacs"],
]
def _append_mtag(self, repro, positions, extents):
try:
nix_mtag = self.block.create_multi_tag(
f"{repro.name}",
"relacs.stimulus",
positions=positions,
extents=extents,
)
except DuplicateName:
del self.block.multi_tags[repro.name]
nix_mtag = self.block.create_multi_tag(
f"{repro.name}",
"relacs.stimulus",
positions=positions,
extents=extents,
)
sec = self.relacs_sections[f"{repro.name}"]
d = create_dict_from_section(sec)
try:
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
except DuplicateName:
del self.nix_file.sections[sec.name]
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
nix_mtag.metadata = self.nix_file.sections[repro.name]
[
nix_mtag.references.extend(ref_groups.data_arrays)
for ref_groups in self._reference_groups
if ref_groups.data_arrays
]
try:
nix_group = self.block.create_group(repro.name, repro.type)
except DuplicateName:
nix_group = self.nix_file.blocks[0].groups[repro.name]
nix_group.multi_tags.append(nix_mtag)
def _append_tag(self, repro, position, extent):
try:
nix_tag = self.block.create_tag(
f"{repro.name}",
"relacs.repro_run",
position=np.array(position).flatten(),
)
nix_tag.extent = extent.flatten()
except DuplicateName:
del self.block.tags[repro.name]
nix_tag = self.block.create_tag(
f"{repro.name}",
"relacs.repro_run",
position=np.array(position).flatten(),
)
nix_tag.extent = extent.flatten()
sec = self.relacs_sections[f"{repro.name}"]
d = create_dict_from_section(sec)
try:
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
except DuplicateName:
del self.nix_file.sections[sec.name]
new_sec = self.nix_file.create_section(sec.name, sec.type)
create_metadata_from_dict(d, new_sec)
nix_tag.metadata = self.nix_file.sections[repro.name]
# NOTE: adding refs to tag
[
nix_tag.references.extend(ref_groups.data_arrays)
for ref_groups in self._reference_groups
if ref_groups.data_arrays
]
try:
nix_group = self.block.create_group(repro.name, repro.type)
except DuplicateName:
nix_group = self.nix_file.blocks[0].groups[repro.name]
nix_group.tags.append(nix_tag)
def create_repros_automatically(self):
ttl_oeph = self.block.data_arrays["ttl-line"][:]
time_ttl = np.arange(len(ttl_oeph)) / self.cfg.open_ephys.samplerate
time_index = np.arange(len(ttl_oeph))
# Threshold for TTL peak
threshold = 2
peaks_ttl = time_index[
(np.roll(ttl_oeph, 1) < threshold) & (ttl_oeph > threshold)
]
# WARNING:Check if peaks are duplicates or near each other
close_peaks = np.where(np.diff(peaks_ttl) == 1)[0]
if close_peaks.size > 0:
peaks_ttl = np.delete(peaks_ttl, close_peaks)
# NOTE: Get the first peak from the toml file
first_peak = self._find_peak_ttl(
time_ttl,
peaks_ttl,
self.stimulus_config["stimulus"]["start_repro_low"],
self.stimulus_config["stimulus"]["start_repro_high"],
)
current_position = np.asarray(first_peak.reshape(1))
# NOTE: Start defined by config
for i, repro in enumerate(
self.dataset.repro_runs()[
self.stimulus_config["stimulus"]["repro_number_start"] :
],
start=self.stimulus_config["stimulus"]["repro_number_start"],
):
log.debug(repro.name)
log.debug(f"Current Position {current_position.item()}")
if repro.duration < 1.0:
log.warning(f"Skipping repro {repro.name} because it is two short")
continue
if repro.stimuli:
log.debug("Processing MultiTag")
repetition = len(repro.stimuli)
extents_mtag = np.zeros((repetition, 1))
position_mtags = np.zeros((repetition, 1))
for trial, stimulus in enumerate(repro.stimuli):
duration_trial = stimulus.duration
extents_mtag[trial] = duration_trial
position_mtags[trial] = current_position
current_position = self._find_peak_ttl_index(
time_ttl, peaks_ttl, current_position
)
# current_position = position_mtags[-1]
self._append_mtag(repro, position_mtags, extents_mtag)
extent = position_mtags[-1] + extents_mtag[-1] - position_mtags[0]
self._append_tag(repro, position_mtags[0], extent)
if not current_position.size > 0:
log.info("Finishing writing")
log.info("Closing nix files")
self.close()
exit()
else:
if i == 0 and "BaselineActivity" in repro.name:
self._append_tag(repro, 0.0, current_position)
continue
last_repro_name = self.dataset.repro_runs()[i - 1].name
last_repro_position = (
self.block.groups[last_repro_name].tags[0].position[0]
+ self.block.groups[last_repro_name].tags[0].extent[0]
)
self._append_tag(
repro,
last_repro_position.reshape(-1, 1),
(current_position - last_repro_position).reshape(-1, 1),
)
self.close()
#
#
# # NOTE: Create a repro tag
# t = self.block.create_tag(
# f"{repro.name}",
# f"{repro.type}",
# position=current_position,
# )
# t.metadata = repro.metadata
#
# for i, group in enumerate(
# list(self.relacs_block.groups)[
# self.stimulus_config["stimulus"]["repro_number_start"] :
# ],
# start=self.stimulus_config["stimulus"]["repro_number_start"],
# ):
# log.debug(group.name)
# log.debug(current_position)
#
# nix_group = self.block.groups[group.name]
# if group.name == "BaselineActivity_1":
# first_peak = 0.0
# current_position = np.array(first_peak).reshape(1)
#
# # if no multi_tag exists just a tag
# if not group.multi_tags:
# log.debug(f"Creating Tag {group.name}")
# t = self.block.create_tag(
# f"{nix_group.name}",
# f"{nix_group.type}",
# position=current_position,
# )
# t.metadata = self.nix_file.sections[group.name]
#
# # searching for the end of the repro in the ttl channel
# start_next_repro = time_ttl[
# peaks_ttl[
# (
# time_ttl[peaks_ttl]
# > group.tags[0].extent + current_position - 1
# )
# & (
# time_ttl[peaks_ttl]
# < group.tags[0].extent + current_position + 1
# )
# ]
# ]
# log.debug(start_next_repro)
# if start_next_repro.size > 1:
# if np.all(np.diff(start_next_repro) > 0.5):
# log.error("Wrong end point in end of repro")
# log.error(f"{start_next_repro}, {np.max(start_next_repro)}")
# start_next_repro = np.mean(start_next_repro)
#
# if not start_next_repro:
# log.error(f"No Endpoint found for repro {group.name}")
# embed()
# exit()
# t.extent = (start_next_repro - t.position[0]).reshape(1)
# # add references to the tag
# [
# t.references.extend(ref_group.data_arrays)
# for ref_group in referencs_groups
# if ref_group.data_arrays
# ]
# nix_group.tags.append(t)
# current_position = start_next_repro.reshape(1)
#
# else:
# # NOTE: If repro has multitags
# log.debug(f"Creating Multi Tag {group.name}")
# test_mul_tags = group.multi_tags[0].positions[:]
# test_position_tag, test_extent_tag = (
# group.tags[0].position[0],
# group.tags[0].extent[0],
# )
# len_mtag = len(
# test_mul_tags[
# (test_mul_tags >= test_position_tag)
# & (test_mul_tags <= (test_position_tag + test_extent_tag))
# ]
# )
# extents_mtag = group.multi_tags[0].extents[:][
# (test_mul_tags >= test_position_tag)
# & (test_mul_tags <= (test_position_tag + test_extent_tag))
# ]
# position_mtags = np.zeros((len_mtag, 1))
# position_mtags[0] = current_position
#
# for j, extents in enumerate(extents_mtag[1:], start=1):
# next_repro_position = time_ttl[
# peaks_ttl[
# (
# time_ttl[peaks_ttl]
# > extents + current_position - extents / 2
# )
# & (time_ttl[peaks_ttl] < extents + current_position + 2)
# ]
# ]
# if next_repro_position.size == 0:
# log.error(f"next start of MultiTag not found")
# embed()
# exit()
#
# if next_repro_position.size > 1:
# if np.all(np.diff(next_repro_position) > 0.005):
# log.error("Wrong end point in end of repro")
# log.error(
# f"{next_repro_position}, {np.max(next_repro_position)}"
# )
# embed()
# exit()
# exit(1)
# next_repro_position = np.mean(next_repro_position)
# log.debug(f"{j}, {next_repro_position}")
#
# position_mtags[j] = next_repro_position
# current_position = next_repro_position
#
# start_next_repro = time_ttl[
# peaks_ttl[
# (time_ttl[peaks_ttl] > current_position + nix_mtag.extents[-1])
# & (
# time_ttl[peaks_ttl]
# < current_position + nix_mtag.extents[-1] + 2
# )
# ]
# ]
# if not start_next_repro.size == 0:
# if start_next_repro.size > 1:
# if np.all(np.diff(start_next_repro) > 0.005):
# log.error("Wrong end point in end of repro")
# log.error(f"{start_next_repro}, {np.max(start_next_repro)}")
# embed()
# exit(1)
#
# start_next_repro = np.mean(start_next_repro)
#
# else:
# log.debug(
# "No ttl pluse foud for the end of the repro create time point"
# )
# log.debug("Taking the next ttl pulse")
# new_repro_start_index = peaks_ttl[
# (time_ttl[peaks_ttl] > current_position - 1)
# & (time_ttl[peaks_ttl] < current_position + 2)
# ]
#
# # if multiple take the last one
# if new_repro_start_index.size > 1:
# new_repro_start_index = new_repro_start_index[-1]
#
# log.debug(time_ttl[new_repro_start_index])
# try:
# next_repro_start = peaks_ttl[
# np.where(new_repro_start_index == peaks_ttl)[0] + 1
# ]
# except IndexError:
# if np.where(new_repro_start_index == peaks_ttl)[0] + 1 == len(
# peaks_ttl
# ):
# log.debug("Finishing writing")
# self.nix_file.close()
# self.relacs_nix_file.close()
# exit(1)
# if next_repro_start.size > 1:
# log.debug("Start of next Repro has multiple ttl pulse")
# embed()
# exit()
#
# # check if the time difference is greater and 5 seconds
# if (
# not time_ttl[next_repro_start] - time_ttl[new_repro_start_index]
# > 5
# ):
# log.error("Wrong endpoint for repro")
# exit(1)
# start_next_repro = time_ttl[next_repro_start]
# log.info(f"Start of new Repro {start_next_repro}")
#
# current_position = start_next_repro
#
# # NOTE: If SAM or Baseline we dont have to find the next current position
# if (
# group.name.split("_")[0] == "SAM"
# or group.name.split("_")[0] == "BaselineActivity"
# # or group.name.split("_")[0] == "FileStimulus"
# ):
# current_position = current_position.reshape(1)
# log.info(f"Start of new Repro {current_position}")
# continue
#
# # peaks_repro_index = peaks_ttl[
# # (time_ttl[peaks_ttl] > current_position - 1)
# # & (time_ttl[peaks_ttl] < current_position + 2)
# # ]
# #
# # # if multiple take the last one
# # if peaks_repro_index.size > 1:
# # peaks_repro_index = peaks_repro_index[-1]
# #
# # log.debug(time_ttl[peaks_repro_index])
# # next_repro_start = peaks_ttl[
# # np.where(peaks_repro_index == peaks_ttl)[0] + 1
# # ]
# # if next_repro_start.size > 1:
# # embed()
# #
# # # check if the time difference is greater and 5 seconds
# # if not time_ttl[next_repro_start] - time_ttl[peaks_repro_index] > 5:
# # log.error("Wrong endpoint for repro")
# # embed()
# # exit()
# # current_position = time_ttl[next_repro_start]
# # log.info(f"Start of new Repro {current_position}")
# #
# # current_position = current_position.reshape(1)
#
# self.nix_file.close()
# self.relacs_nix_file.close()
def create_repros_from_config_file(
self,
):
ttl_oeph = self.block.data_arrays["ttl-line"][:]
peaks_ttl = signal.find_peaks(
ttl_oeph.flatten(),
**self.stimulus_config["stimulus"]["peak_detection"],
)[0]
time_ttl = np.arange(len(ttl_oeph)) / self.cfg.open_ephys.samplerate
number_of_repros = len(self.stimulus_config["repros"]["name"])
referencs_groups = [
self.block.groups["neuronal-data"],
self.block.groups["spike-data"],
self.block.groups["efish"],
]
for repro_index in range(number_of_repros):
name = self.stimulus_config["repros"]["name"][repro_index]
log.debug(name)
start = np.array(self.stimulus_config["repros"]["start"][repro_index])
end = np.array(self.stimulus_config["repros"]["end"][repro_index])
nix_group = self.block.groups[name]
if start.size > 1:
start_repro = time_ttl[
peaks_ttl[
(time_ttl[peaks_ttl] > start[0])
& (time_ttl[peaks_ttl] < start[1])
]
]
if start_repro.size > 1:
if np.all(np.diff(start_repro) > 0.005):
log.error("Wrong end point in end of repro")
log.error(f"{name[repro_index]}, {np.max(start_repro)}")
exit(1)
start_repro = np.mean(start_repro)
else:
start_repro = start_repro[0]
else:
start_repro = start[0]
if end.size > 1:
end_repro = time_ttl[
peaks_ttl[
(time_ttl[peaks_ttl] > end[0]) & (time_ttl[peaks_ttl] < end[1])
]
]
if end_repro.size > 1:
if np.all(np.diff(end_repro) > 0.005):
log.error("Wrong end point in end of repro")
log.error(f"{name[repro_index]}, {np.max(end_repro)}")
exit(1)
end_repro = np.mean(end_repro)
else:
end_repro = end[0]
nix_tag = self.block.create_tag(
f"{nix_group.name}",
f"{nix_group.type}",
position=[start_repro],
)
nix_tag.extent = [end_repro - start_repro]
nix_tag.metadata = self.nix_file.sections[name]
# NOTE: adding refs to tag
[
nix_tag.references.extend(ref_groups.data_arrays)
for ref_groups in referencs_groups
if ref_groups.data_arrays
]
nix_group.tags.append(nix_tag)
if not self.relacs_block.groups[name].multi_tags:
log.debug(f"no multitags in repro {name}, skipping")
continue
start_repro_multi_tag = start_repro.copy()
positions = []
positions.append(np.array(start_repro))
extents = []
extents_relacsed_nix = (
self.relacs_block.groups[name].multi_tags[0].extents[:]
)
extents.append(np.array(extents_relacsed_nix[0]))
index_multi_tag = 0
while start_repro_multi_tag < end_repro:
log.debug(f"{start_repro_multi_tag}")
start_repro_multi_tag = time_ttl[
peaks_ttl[
(
time_ttl[peaks_ttl]
> start_repro_multi_tag
+ extents_relacsed_nix[index_multi_tag]
)
& (
time_ttl[peaks_ttl]
< start_repro_multi_tag
+ extents_relacsed_nix[index_multi_tag]
+ 2
)
]
]
if start_repro_multi_tag.size == 0:
log.debug("Did not find any peaks for new start multi tag")
log.debug(
f"Differenz to end of repro {end_repro - (positions[-1] + extents_relacsed_nix[index_multi_tag])}"
)
break
if start_repro_multi_tag.size > 1:
if np.all(np.diff(start_repro_multi_tag) > 0.005):
log.error("Wrong end point in end of repro")
log.error(f"Repro_name: {name[repro_index]}")
log.error(f"multitag_index: {index_multi_tag}")
log.error(f"max_value {np.max(start_repro_multi_tag)}")
exit(1)
start_repro_multi_tag = np.mean(start_repro_multi_tag)
else:
start_repro_multi_tag = start_repro_multi_tag[0]
if end_repro - start_repro_multi_tag < 1:
log.debug("Posssible endpoint detected")
log.debug(
f"Differenz to repro end: {end_repro - start_repro_multi_tag}"
)
break
positions.append(np.array(start_repro_multi_tag))
extents.append(extents_relacsed_nix[index_multi_tag])
positions = np.array(positions).reshape(len(positions), 1)
extents = np.array(extents)
if positions.size != extents.size:
log.error("Calcualted positions and extents do not match ")
log.error(
f"Shape positions {positions.shape}, shape extents {extents.shape}"
)
if positions.shape != extents.shape:
log.error("Shape of Calculated positions and extents do not match")
log.error(
f"Shape positions {positions.shape}, shape extents {extents.shape}"
)
embed()
exit()
nix_mtag = self.block.create_multi_tag(
f"{nix_group.name}",
f"{nix_group.type}",
positions=positions,
extents=extents,
)
nix_mtag.metadata = self.nix_file.sections[name]
# NOTE: adding refs to mtag
[
nix_mtag.references.extend(ref_groups.data_arrays)
for ref_groups in referencs_groups
if ref_groups.data_arrays
]
nix_group.multi_tags.append(nix_mtag)
def plot_stimulus(self):
ttl_oeph = self.block.data_arrays["ttl-line"][:]
time_index = np.arange(len(ttl_oeph))
peaks_ttl = time_index[(np.roll(ttl_oeph, 1) < 2) & (ttl_oeph > 2)]
stimulus_oeph = self.block.data_arrays["stimulus"]
stimulus = self.relacs_block.data_arrays["GlobalEFieldStimulus"]
plt.plot(np.arange(stimulus.size) / 20_000.0, stimulus, label="relacs-stimulus")
plt.plot(
np.arange(len(stimulus_oeph)) / 30_000.0,
stimulus_oeph[:],
label="open-ephys",
)
plt.plot(
np.arange(len(ttl_oeph)) / 30_000.0,
ttl_oeph[:],
label="ttl-line",
)
plt.scatter(
np.arange(len(ttl_oeph))[peaks_ttl] / 30_000.0,
ttl_oeph[peaks_ttl],
label="detected peaks",
color="red",
zorder=100,
)
plt.legend(loc="upper right")
plt.show()
def close(self):
self.dataset.close()
self.nix_file.close()
self.relacs_nix_file.close()
# if __name__ == "__main__":
# create_nix = CreateNix()
# create_nix.create_repros_automatically()
# create_nix.plot_stimulus()
# # create_nix.create_repros_from_config_file()