"""Import fish-recording datasets into the `fish_book` DataJoint schema.

Scans dataset folders, parses their ``info.dat`` metadata and registers each
dataset (plus whether a NIX file accompanies it) in the ``Dataset`` table.
"""
import os
import glob
import warnings
import datetime as dt

import numpy as np
import datajoint as dj
import nixio as nix
from IPython import embed

from util import read_info_file, find_key_recursive, deep_get

data_dir = 'data'
schema = dj.schema("fish_book", locals())


@schema
class Dataset(dj.Manual):
    """One recorded dataset: where it lives on disk, who recorded it, when,
    and whether a NIX data file exists for it."""

    definition = """ # Datasets
    dataset_id : varchar(256)
    ----
    data_source : varchar(512) # path to the dataset
    experimenter : varchar(512)
    recording_date : date
    has_nix : bool
    """

    @staticmethod
    def get_template_tuple(id=None):
        """Return a dict for building an insert tuple.

        If *id* is given, fetch and return the existing row for that
        dataset_id; otherwise return a blank template with all fields empty.
        (Parameter name ``id`` kept for caller compatibility even though it
        shadows the builtin.)
        """
        if id is not None:
            return dict((Dataset() & {"dataset_id": id}).fetch1())
        return dict(dataset_id=None, data_source="", experimenter="",
                    recording_date=None, has_nix=False)

    @staticmethod
    def get_nix_file(key):
        """Return the path of the dataset's NIX file, or None.

        Returns None when the dataset is flagged to ignore, the file does
        not exist, or it fails the integrity check.
        """
        dset = (Dataset() & key).fetch1()
        # NOTE(review): "ignore" is not declared in this table's definition
        # above — confirm the fetched tuple actually carries it, otherwise
        # this raises KeyError.
        if dset["ignore"]:
            return None
        file_path = os.path.join(dset["data_source"], dset["dataset_id"] + ".nix")
        if not os.path.exists(file_path):
            print("\t No nix file found for path: %s" % dset["data_source"])
            return None
        if not Dataset.check_file_integrity(file_path):
            return None
        return file_path

    @staticmethod
    def check_file_integrity(nix_file):
        """Open *nix_file* read-only and verify it has a first block with a
        "Recording" metadata section. Returns True when the file looks sane.
        """
        sane = True
        f = None
        try:
            f = nix.File.open(nix_file, nix.FileMode.ReadOnly)
            b = f.blocks[0]
            m = b.metadata
            if "Recording" not in m.sections:
                # BUG FIX: original built a Warning object without emitting it.
                warnings.warn("\t Could not find Recording section in dataset: %s" % nix_file)
                sane = False
        except Exception:
            # BUG FIX: original `except ():` caught nothing (empty tuple),
            # and the message lacked its %-argument, printing a literal %s.
            print("file: %s is NOT SANE!" % nix_file)
            sane = False
        finally:
            # BUG FIX: close even when an exception fires after open()
            # (the original leaked the file handle on error).
            if f is not None:
                f.close()
        return sane


def read_info(info_file):
    """Parse an ``info.dat`` metadata file.

    Returns a tuple ``(experimenter, recording_date, has_nix)``;
    ``(None, None, False)`` when the file does not exist, and ``None`` for
    any field whose key is missing from the file.
    """
    if not os.path.exists(info_file):
        return None, None, False
    # A *.nix file next to info.dat marks the dataset as nix-based.
    has_nix = len(glob.glob(os.path.join(os.path.dirname(info_file), "*.nix"))) > 0
    info = read_info_file(info_file)
    # BUG FIX: these were unbound (NameError at return) when the
    # "Experimenter" / "Date" keys were absent from the info file.
    exp = None
    rec_date = None
    path = []
    find_key_recursive(info, "Experimenter", path)
    if path:
        exp = deep_get(info, path)
    path = []
    find_key_recursive(info, "Date", path)
    if path:
        rec_date = dt.date.fromisoformat(deep_get(info, path))
    return exp, rec_date, has_nix


def populate_datasets(data_path):
    """Register the dataset folder *data_path* in the Dataset table.

    Silently skips folders that do not exist or whose info.dat has no
    experimenter entry.
    """
    print("Importing dataset %s" % data_path)
    if not os.path.exists(data_path):
        return
    dset_name = os.path.split(data_path)[-1]
    experimenter, rec_date, has_nix = read_info(os.path.join(data_path, 'info.dat'))
    if not experimenter:
        return
    inserts = Dataset.get_template_tuple()
    inserts["dataset_id"] = dset_name
    inserts["data_source"] = data_path
    inserts["experimenter"] = experimenter
    inserts["recording_date"] = rec_date
    inserts["has_nix"] = has_nix
    Dataset().insert1(inserts, skip_duplicates=True)


if __name__ == "__main__":
    datasets = glob.glob('/data/apteronotus/2018-05-08*')
    for d in datasets:
        populate_datasets(d)