from functools import reduce
import numpy as np
import nixio as nix
import re
import os
import glob
import datetime as dt
import subprocess

from IPython import embed


def read_info_file(file_name):
    """
    Reads the info file and returns the stored metadata in a dictionary. The dictionary may be nested.

    @param file_name: The name of the info file.
    @return: dictionary, the stored information.
    """
    root = {}
    try:
        with open(file_name, 'r') as f:
            lines = f.readlines()
    except UnicodeDecodeError:
        print("Replacing experimenter!!!")
        command = "sudo sed -i '/Experimenter/c\\# Experimenter: Anna Stoeckl' %s" % file_name
        subprocess.check_call(command, shell=True)
        with open(file_name, 'r') as f:
            lines = f.readlines()
    sec = root  # key-value pairs that appear before the first subsection go to the root
    for l in lines:
        if not l.startswith("#"):
            continue
        l = l.strip("#").strip()
        if len(l) == 0:
            continue
        if ": " not in l:  # subsection
            sec = {}
            root[l[:-1] if l.endswith(":") else l] = sec
        else:
            parts = l.split(': ', 1)  # split only once, values may contain the separator
            sec[parts[0].strip()] = parts[1].strip('"').strip()
    return root


def parse_metadata_line(line):
    if not line.startswith("#"):
        return None, None
    line = line.strip("#").strip()
    parts = line.split(":", 1)  # split only once, values such as times contain colons
    if len(parts) == 1 or len(parts[-1].strip()) == 0:
        return parts[0].strip(), None
    return parts[0].strip(), parts[-1].strip()


def has_signal(line, col_names):
    """
    Checks whether a signal/stimulus was given in the line.

    :param line: the current line of the data table
    :param col_names: the names of the table header columns
    :return: whether any of the signal entries is not empty ("-")
    """
    values = line.split()
    for i, n in enumerate(col_names):
        if n.lower() == "signal" and i < len(values):
            if len(values[i].strip()) > 0 and values[i].strip()[0] != "-":
                return True
    return False


def parse_table(lines, start_index):
    """
    Parses a stimulus table and collects the data index of each stimulus presentation.

    :param lines: the lines of the stimuli file
    :param start_index: the index of the line at which the table (the "#Key" marker) starts
    :return: dictionary mapping stimulus count to data index, the line index at which parsing stopped
    """
    data_indices = {}
    stim_count = 0
    names = re.split(r'\s{2,}', lines[start_index + 3][1:].strip())  # the column names follow three lines below the "#Key" marker
    while start_index < len(lines):
        l = lines[start_index].strip()
        if l.startswith("#"):  # ignore header/comment lines
            start_index += 1
        elif len(l) > 0:
            if stim_count == 0 and has_signal(l, names):
                data_indices[stim_count] = l.split()[0]
                stim_count += 1
            elif stim_count > 0:
                data_indices[stim_count] = l.split()[0]
                stim_count += 1
            start_index += 1
        else:  # an empty line ends the table
            start_index += 1
            break
    return data_indices, start_index


def read_stimuli_file(dataset):
    """
    Reads the stimuli.dat file of a dataset and returns the repro settings and the
    stimulus indices of each repro run.
    """
    repro_settings = []
    stimulus_indices = []
    settings = {}
    with open(os.path.join(dataset, 'stimuli.dat'), 'r') as f:
        lines = f.readlines()

    index = 0
    current_section = None
    current_section_name = ""
    while index < len(lines):
        l = lines[index].strip()
        if len(l) == 0:
            index += 1
        elif l.startswith("#") and "key" not in l.lower():
            name, value = parse_metadata_line(l)
            index += 1  # advance before any continue, a bare "#" line would otherwise loop forever
            if not name:
                continue
            if name and not value:  # a new subsection begins
                if current_section:
                    settings[current_section_name] = current_section.copy()
                current_section = {}
                current_section_name = name
            else:
                if current_section is None:  # key-value pair before any section header
                    current_section = {}
                current_section[name] = value
        elif l.lower().startswith("#key"):  # table data coming
            data, index = parse_table(lines, index)
            # we are done with this repro run
            stimulus_indices.append(data)
            if current_section is not None:
                settings[current_section_name] = current_section.copy()
            repro_settings.append(settings.copy())
            current_section = None
            settings = {}
        else:  # data lines, ignore them here
            index += 1
    return repro_settings, stimulus_indices
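# A minimal usage sketch for the stimuli parser above. The dataset path is
# hypothetical; any dataset folder that contains a "stimuli.dat" file in the
# format parsed here should work:
#
#     repro_settings, stimulus_indices = read_stimuli_file("/data/2012-03-23-ad")
#     for settings, indices in zip(repro_settings, stimulus_indices):
#         print(list(settings.keys()), len(indices))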
def find_key_recursive(dictionary, key, path=None):
    assert isinstance(dictionary, dict)
    if path is None:  # avoid the mutable default argument pitfall
        path = []
    if key in dictionary.keys():
        path.append(key)
        return True
    for k in dictionary.keys():
        if isinstance(dictionary[k], dict):
            if find_key_recursive(dictionary[k], key, path):
                path.insert(0, k)  # prepend so the path stays in root-to-leaf order
                break
    return len(path) > 0


def deep_get(dictionary, keys, default=None):
    assert isinstance(dictionary, dict)
    assert isinstance(keys, list)
    return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys, dictionary)


def _get_string(dictionary: dict, key: str, alt_key=None, default=None):
    p = []
    value = default
    find_key_recursive(dictionary, key, p)
    if len(p) > 0:
        value = deep_get(dictionary, p, default)
    elif alt_key:
        find_key_recursive(dictionary, alt_key, p)
        if len(p) > 0:  # deep_get with an empty path would return the whole dictionary
            value = deep_get(dictionary, p, default)
    if default and value != default and isinstance(value, dict):
        value = default
    return value


def _get_date(dictionary: dict, key: str, alt_key=None, default=None):
    p = []
    value = default
    find_key_recursive(dictionary, key, p)
    if len(p) == 0 and alt_key:
        find_key_recursive(dictionary, alt_key, p)
    if len(p) > 0:
        date_str = deep_get(dictionary, p, default)
        if isinstance(date_str, str):  # guard against missing entries and nested sections
            value = dt.date.fromisoformat(date_str)
    return value


def read_dataset_info(info_file):
    exp = ""
    quality = ""
    comment = ""
    rec_date = None
    has_nix = False
    setup = ""
    rec_duration = 0.0
    if not os.path.exists(info_file):
        return exp, rec_date, quality, comment, has_nix, rec_duration, setup
    has_nix = len(glob.glob(os.path.join(os.path.dirname(info_file), "*.nix"))) > 0
    info = read_info_file(info_file)
    exp = _get_string(info, "Experimenter")
    rec_date = _get_date(info, "Date")
    quality = _get_string(info, "Recording quality")
    comment = _get_string(info, "Comment", default="")
    rec_duration = _get_string(info, "Recording duration", "Recording duratio", default=0.0)
    if isinstance(rec_duration, str) and "min" in rec_duration:
        rec_duration = rec_duration[:-3].strip()  # strip the "min" unit
    elif isinstance(rec_duration, dict):
        rec_duration = 0.0
    setup_info = _get_string(info, "Setup", default=None)
    if setup_info and isinstance(setup_info, dict):
        setup = _get_string(setup_info, "Identifier")
    return exp, rec_date, quality, comment, has_nix, rec_duration, setup


def nix_metadata_to_dict(section):
    info = {}
    for p in section.props:
        info[p.name] = [v.value for v in p.values]
    for s in section.sections:
        info[s.name] = nix_metadata_to_dict(s)
    return info


def nix_metadata_to_yaml(section, cur_depth=0, val_count=1):
    assert isinstance(section, nix.section.SectionMixin)
    yaml = "%s%s:\n" % ("\t" * cur_depth, section.name)
    for p in section.props:
        val_str = ""
        if val_count > 1 and len(p.values) > 1:
            val_str = "[" + ', '.join([v.to_string() for v in p.values]) + "]"
        elif len(p.values) == 1:
            val_str = p.values[0].to_string()
        yaml += "%s%s: %s\n" % ("\t" * (cur_depth + 1), p.name, val_str)
    for s in section.sections:
        yaml += nix_metadata_to_yaml(s, cur_depth + 1, val_count)  # pass val_count down so nested sections format lists the same way
    return yaml
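# A small example of the recursive lookup helpers above (the data is made up):
#
#     info = {"Recording": {"Date": "2012-03-23", "Setup": {"Identifier": "setup 1"}}}
#     path = []
#     find_key_recursive(info, "Identifier", path)  # True, path == ["Recording", "Setup", "Identifier"]
#     deep_get(info, path)                          # "setup 1"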
def find_mtags_for_tag(block, tag):
    """
    Finds the multi tags, and the respective positions within them, that fall into a certain repro run (tag).

    @param block: the nix block to search in
    @param tag: the nix tag that covers the repro run
    @return: list of mtags, list of mtag positions
    """
    assert isinstance(block, nix.pycore.block.Block)
    assert isinstance(tag, nix.pycore.tag.Tag)
    mtags = []
    indices = []
    tag_start = np.atleast_1d(tag.position)
    tag_end = tag_start + np.atleast_1d(tag.extent)
    for mt in block.multi_tags:
        position_count = mt.positions.shape[0]
        in_tag_positions = []
        for i in range(position_count):
            mt_start = np.atleast_1d(mt.positions[i, :])
            mt_end = mt_start + np.atleast_1d(mt.extents[i, :])
            # a position counts as inside only if it falls into the tag's extent in every dimension
            in_tag = True
            for j in range(len(tag_start)):
                if mt_start[j] < tag_start[j] or mt_end[j] > tag_end[j]:
                    in_tag = False
                    break
            if in_tag:
                in_tag_positions.append(i)
        if len(in_tag_positions) > 0:
            mtags.append(mt)
            indices.append(in_tag_positions)
    return mtags, indices


def mtag_settings_to_yaml(mtag, pos_index):
    assert isinstance(mtag, nix.pycore.multi_tag.MultiTag)
    assert 0 <= pos_index < mtag.positions.shape[0]
    yaml = ""
    if mtag.metadata is not None:
        yaml = nix_metadata_to_yaml(mtag.metadata)
    for i in range(len(mtag.features)):
        feat = mtag.features[i]
        feat_data = mtag.retrieve_feature_data(pos_index, i)
        if len(feat_data.shape) == 1:
            feat_name = feat.data.label if feat.data.label and len(feat.data.label) > 0 else feat.data.name
            feat_unit = feat.data.unit if feat.data.unit and len(feat.data.unit) > 0 else ""
            if feat_data.shape[0] == 1:
                feat_content = "%s %s" % (feat_data[0], feat_unit)
            else:
                feat_content = "[" + ','.join(map(str, feat_data[:])) + "] %s" % feat_unit
            yaml += "\t%s: %s\n" % (feat_name, feat_content)
    return yaml


if __name__ == "__main__":
    """
    nix_file = "../../science/high_freq_chirps/data/2018-11-09-aa-invivo-1/2018-11-09-aa-invivo-1.nix"
    f = nix.File.open(nix_file, nix.FileMode.ReadOnly)
    b = f.blocks[0]
    yml = nix_metadata_to_yaml(b.tags[0].metadata)
    print(yml)
    print("-" * 80)
    print(nix_metadata_to_yaml(b.metadata))
    embed()
    f.close()
    """
    dataset = "/Users/jan/zwischenlager/2012-03-23-ad"
    settings = read_stimuli_file(dataset)  # the function appends "stimuli.dat" itself
    embed()
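# A rough sketch of how the NIX helpers above could be combined; the file name
# is hypothetical and the tag/position indices depend on the recording:
#
#     f = nix.File.open("2012-03-23-ad.nix", nix.FileMode.ReadOnly)
#     b = f.blocks[0]
#     mtags, indices = find_mtags_for_tag(b, b.tags[0])
#     if mtags:
#         print(mtag_settings_to_yaml(mtags[0], indices[0][0]))
#     f.close()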