[sorting] adding sorting and small fixes

This commit is contained in:
wendtalexander
2026-05-06 16:30:24 +02:00
parent 616dd721f5
commit 56e6d1c589
4 changed files with 139 additions and 6 deletions

View File

@@ -1,4 +1,6 @@
import logging
import sys
from os import path
from pathlib import Path
from typing import Annotated
@@ -9,6 +11,7 @@ from IPython import embed
from rich.console import Console
from oephys2nix.logging import setup_logging
from oephys2nix.sorting import AppendSorting
from oephys2nix.stimulus_recreation import StimulusToNix
from oephys2nix.tonix import RawToNix
@@ -17,6 +20,42 @@ log = logging.getLogger(__name__)
console = Console()
@app.command()
def append_sorting(
    sorter_name: str = typer.Argument(
        # NOTE(review): "analyzser" looks like a typo, but the default must match
        # the on-disk sorter directory name -- confirm before renaming it.
        "sorting_analyzser",
        help="The sorter name that should be appended to the generated nix file",
    ),
    data_path: Path | None = typer.Argument(
        None,
        help="The source directory containing the generated recording.",
        exists=True,
        file_okay=False,
        dir_okay=True,
        readable=True,
        resolve_path=True,
    ),
    # NOTE(review): `overwrite` is accepted but currently unused in the body --
    # kept for interface compatibility; confirm intended semantics.
    overwrite: bool = typer.Option(default=True, help="Overwrites the sorter"),
    verbose: Annotated[int, typer.Option("--verbose", "-v", count=True)] = 0,
) -> None:
    """Append spike-sorting results to every ``*recording.nix`` below data_path.

    Recursively searches data_path (defaults to the current working directory)
    for generated ``*recording.nix`` files and, for each one, appends the sorter
    found in the sibling directory named ``sorter_name``. Exits with status 1
    if a recording lacks that sorter directory.
    """
    setup_logging(logging.getLogger("oephys2nix"), verbosity=verbose)
    if data_path is None:
        data_path = Path.cwd()
    log.info(f"Selected path is {data_path}")
    rec_data_paths = list(Path(data_path).rglob("*recording.nix"))
    log.debug(rec_data_paths)
    for recording in rec_data_paths:
        parent = recording.parent
        sorter_path = parent / sorter_name
        # BUG FIX: `is_dir` is a method; without the call the bound method was
        # always truthy, so `not ...` was always False and the check never fired.
        if not sorter_path.is_dir():
            log.error(f"Could not find the sorter that was specified in {parent}")
            sys.exit(1)
        sorter = AppendSorting(sorter_path, recording)
        # BUG FIX: release the nix file handle AppendSorting opened read-write;
        # it was previously left open for every recording.
        sorter.close()
@app.command()
def convert(
data_path: Path = typer.Argument(

92
oephys2nix/sorting.py Normal file
View File

@@ -0,0 +1,92 @@
import logging
import pathlib
import sys
import matplotlib.pyplot as plt
import nixio
import numpy as np
import rlxnix as rlx
import spikeinterface.core as si
from IPython import embed
from neo.io import OpenEphysBinaryIO
from nixio.exceptions import DuplicateName
from rich.console import Console
from rich.table import Table
from rlxnix.plugins.efish.utils import extract_am
from scipy import signal
from oephys2nix.metadata import create_dict_from_section, create_metadata_from_dict
log = logging.getLogger(__name__)
console = Console()
class AppendSorting:
    """Append a spikeinterface sorting analyzer to the created nix file.

    The nix file is opened read-write and the units are written immediately
    as a side effect of ``__init__``; call :meth:`close` when done.

    Parameters
    ----------
    sorter_path : pathlib.Path
        Path to the spikeinterface sorting-analyzer directory.
    recording_path : pathlib.Path
        Path to the generated recording nix file the units are appended to.
    """

    def __init__(self, sorter_path: pathlib.Path, recording_path: pathlib.Path):
        self.sorter_path = sorter_path
        self.recording_path = recording_path
        # Sorting analyzer previously saved by spikeinterface.
        self.sorting = si.load_sorting_analyzer(self.sorter_path)
        # Open the existing recording file for in-place modification.
        self.nixfile = nixio.File.open(str(self.recording_path), nixio.FileMode.ReadWrite)
        self.block = self.nixfile.blocks[0]
        self.das = self.block.data_arrays
        # Per-unit index of the channel with the strongest negative template peak.
        self.channel_ids = si.get_template_extremum_channel(
            self.sorting, mode="extremum", peak_sign="neg", outputs="index"
        )
        # Raw recording data array the per-unit multi tags will reference.
        self.data = self.block.data_arrays["data"]
        # NOTE(review): writing happens as a constructor side effect; callers
        # only need to call close() afterwards.
        self.append_sorting_to_recording()

    def append_sorting_to_recording(self) -> None:
        """Write the spike times of every sorted unit into the nix block.

        Creates (or recreates) a ``units`` group and, per unit, one data array
        of spike times plus one multi tag positioned at (time, channel) rows
        that references the raw ``data`` array. Entities left over from an
        earlier run are deleted and rewritten (DuplicateName handling).
        """
        try:
            gr = self.block.create_group("units", "sorting.group")
        except DuplicateName:
            # Group exists from a previous run: recreate it empty.
            del self.block.groups["units"]
            gr = self.block.create_group("units", "sorting.group")
        for unit in self.sorting.unit_ids:
            # The analyzer wraps the underlying Sorting object; times in seconds.
            spike_times = self.sorting.sorting.get_unit_spike_train_in_seconds(
                unit, segment_index=0
            )
            unit_channel = self.channel_ids[unit]
            # One (time, channel) row per spike for the multi tag positions.
            channel_tag = np.repeat(unit_channel, spike_times.shape[0])
            multi_tag_positions = np.column_stack((spike_times, channel_tag))
            try:
                positions = self.block.create_data_array(
                    f"unit-{unit}", "sorting.spike_index", data=spike_times
                )
                positions.append_range_dimension_using_self()
            except DuplicateName:
                # Overwrite the spike-time array from an earlier run.
                del self.das[f"unit-{unit}"]
                positions = self.block.create_data_array(
                    f"unit-{unit}", "sorting.spike_index", data=spike_times
                )
                positions.append_range_dimension_using_self()
            gr.data_arrays.append(positions)
            try:
                multi_tag = self.block.create_multi_tag(
                    f"unit-{unit}", "sorting.spike_index", multi_tag_positions
                )
                multi_tag.references.append(self.data)
            except DuplicateName:
                del self.block.multi_tags[f"unit-{unit}"]
                # presumably "unit-<id>-positions" is the positions array that
                # create_multi_tag auto-creates from the raw position data --
                # TODO confirm against nixio's naming convention.
                del self.das[f"unit-{unit}-positions"]
                multi_tag = self.block.create_multi_tag(
                    f"unit-{unit}", "sorting.spike_index", multi_tag_positions
                )
                multi_tag.references.append(self.data)

    def close(self) -> None:
        """Close the nix file handle opened in ``__init__``."""
        self.nixfile.close()

View File

@@ -407,7 +407,8 @@ class StimulusToNix:
)
if "FICurve_" in repro.name:
position_mtags += 0.2
delay = repro.stimuli[0].feature_data(1).item()
extents_mtag += delay
self._append_mtag(repro, position_mtags, extents_mtag)
extent = position_mtags[-1] + extents_mtag[-1] - position_mtags[0]

View File

@@ -79,7 +79,7 @@ class RawToNix:
efish_group = self.block.create_group("efish", "open-ephys.sampled")
efish_neo_data = self._load_neo_object("Data_ADC")
efish_neo_data = self._load_neo_object(["Data_ADC", "acquisition_board_ADC"])
for i in np.arange(len(efishs)):
log.debug(f"Appending efish traces {efishs[i]}")
@@ -96,12 +96,12 @@ class RawToNix:
)
efish_group.data_arrays.append(data_array)
def _load_neo_object(self, name: str):
def _load_neo_object(self, names: list[str]):
for sig in self.neo_data[0].segments[0].analogsignals:
if sig.name.endswith(name):
if any([sig.name.endswith(n) for n in names]):
return sig.load()
log.error(f"No {name} found in open ephys data")
log.error(f"No {names} found in open ephys data")
sys.exit(1)
def append_relacs_lines(self) -> None:
@@ -162,7 +162,7 @@ class RawToNix:
def append_raw_data(self) -> None:
"""Append Open-Ephys Raw data."""
gr = self.block.create_group("neuronal-data", "open-epyhs.sampled")
raw_neo_data = self._load_neo_object("Data")
raw_neo_data = self._load_neo_object(["Data", "acquisition_board"])
log.debug("Appending raw data")
nix_data_array = self.block.create_data_array(
@@ -175,6 +175,7 @@ class RawToNix:
nix_data_array.append_sampled_dimension(
1 / raw_neo_data.sampling_rate.magnitude, label="time", unit="s"
)
nix_data_array.append_sampled_dimension(1, label="channel")
gr.data_arrays.append(nix_data_array)
def close(self) -> None: