# fishbook/database.py
#
# 683 lines
# 22 KiB
# Python

import numpy as np
import datajoint as dj
import nixio as nix
import os
import glob
import util as ut
import uuid
import yaml
from IPython import embed
# Schema handle for the "fish_book_new" database; it is applied as the
# ``@schema`` decorator to the table classes declared in this module.
# NOTE(review): ``locals()`` is presumably handed over so that DataJoint can
# resolve the table names referenced in the definitions -- confirm against
# the DataJoint documentation.
schema = dj.schema("fish_book_new", locals())
@schema
class Datasets(dj.Manual):
    """Table with one entry per recorded dataset.

    Apart from the table definition, the class bundles helpers that create
    insert templates, locate and sanity-check the nix file of a dataset, and
    query the table for ``Dataset`` wrapper objects.
    """

    # The definition text is kept character for character (flush-left lines).
    definition = """ # _Dataset
dataset_id : varchar(256)
----
data_source : varchar(512) # path to the dataset
experimenter : varchar(512)
setup : varchar(128)
recording_date : date
quality : varchar(512)
comment : varchar(1024)
duration : float
has_nix : bool
"""

    @staticmethod
    def get_template_tuple(id=None):
        """Return a row of this table as a plain dict.

        Args:
            id: ``dataset_id`` of an existing entry.  When given, the stored
                row is fetched and returned.  When ``None``, a template with
                default values is returned.  (The name shadows the builtin
                but is part of the public signature and therefore kept.)

        Returns:
            dict with the column names of the table as keys.
        """
        if id is not None:
            return dict((Datasets() & {"dataset_id": id}).fetch1())
        return dict(dataset_id=None, data_source="", experimenter="", setup="",
                    recording_date=None, quality="", comment="", duration=0.0,
                    has_nix=False)

    @staticmethod
    def get_nix_file(key):
        """Return the path of the nix file belonging to the dataset *key*.

        The path is ``<data_source>/<dataset_id>.nix``.

        Args:
            key: restriction that selects exactly one row of the table.

        Returns:
            The file path, or ``None`` when the row carries a truthy
            ``ignore`` entry, the file does not exist, or the file fails
            ``check_file_integrity``.
        """
        dset = dict((Datasets() & key).fetch1())
        # The table definition has no "ignore" column, so plain item access
        # raised KeyError for every row; a missing flag means "do not ignore".
        if dset.get("ignore", False):
            return None
        file_path = os.path.join(dset["data_source"], dset["dataset_id"] + ".nix")
        if not os.path.exists(file_path):
            print("\t No nix file found for path: %s" % dset["data_source"])
            return None
        if not Datasets.check_file_integrity(file_path):
            return None
        return file_path

    @staticmethod
    def check_file_integrity(nix_file):
        """Check that *nix_file* can be opened and has a "Recording" section.

        Args:
            nix_file: path of the nix file to check.

        Returns:
            ``True`` when the file opens, has at least one block and the
            metadata of the first block contains a "Recording" section;
            ``False`` otherwise.  Problems are reported on stdout.
        """
        nf = None
        try:
            nf = nix.File.open(nix_file, nix.FileMode.ReadOnly)
            metadata = nf.blocks[0].metadata
            if "Recording" not in metadata.sections:
                print("\t Could not find Recording section in dataset: %s" % nix_file)
                return False
            return True
        except Exception as exc:
            # Any failure while opening/reading means the file is unusable;
            # the former ``except ():`` caught nothing at all.
            print("file: %s is NOT SANE! (%s)" % (nix_file, exc))
            return False
        finally:
            # Close the file on every path, including the error path.
            if nf is not None:
                nf.close()

    @staticmethod
    def datasets(min_duration=None, experimenter=None, quality=None):
        """Return the datasets matching the given criteria.

        Args:
            min_duration: keep only datasets whose duration exceeds this
                value; ignored when falsy.
            experimenter: keep only datasets of this experimenter; ignored
                when falsy.
            quality: SQL ``like`` pattern applied to the quality column;
                ignored when falsy.

        Returns:
            list of ``Dataset`` objects, one per matching row.
        """
        dsets = Datasets()
        if min_duration:
            dsets = dsets & ("duration > %.2f" % min_duration)
        if experimenter:
            dsets = dsets & dict(experimenter=experimenter)
        if quality:
            dsets = dsets & "quality like '{0:s}'".format(quality)
        return [Dataset(tuple=d) for d in dsets]
class Dataset:
    """Read-only convenience wrapper around one row of the ``Datasets`` table."""

    def __init__(self, dataset_id=None, exact=False, tuple=None):
        """Create the wrapper from a row or by looking up a dataset id.

        Args:
            dataset_id: (part of) the id of the dataset to look up; only used
                when *tuple* is not given.
            exact: when ``False`` the id is matched as a substring
                (``%id%``), otherwise it has to match as given.
            tuple: an already fetched row (mapping of column name to value).
                Takes precedence over *dataset_id*.

        Raises:
            AssertionError: the lookup by *dataset_id* did not match exactly
                one entry.
        """
        # Always defined, so an empty object fails with KeyError on property
        # access instead of an AttributeError about a mangled name.
        self.__tuple = {}
        if tuple:
            self.__tuple = tuple
        elif dataset_id:
            dsets = Datasets & self._id_pattern(dataset_id, exact)
            assert len(dsets) == 1, "Dataset name is not unique!"
            # Fetch as dict so the stored row has the same type as rows
            # passed in through ``tuple``.
            self.__tuple = dsets.fetch(limit=1, as_dict=True)[0]
        else:
            print("Empty dataset, not linked to any database entry!")

    @staticmethod
    def _id_pattern(dataset_id, exact=False):
        """Build the SQL ``like`` restriction used to look up *dataset_id*."""
        wildcard = "" if exact else "%"
        # Index 1 is the id itself; the previous template used index 0 three
        # times and therefore never contained the id.
        return "dataset_id like '{0:s}{1:s}{0:s}'".format(wildcard, dataset_id)

    @property
    def dataset_id(self):
        """Value of the ``dataset_id`` column."""
        return self.__tuple["dataset_id"]

    @property
    def experimenter(self):
        """Value of the ``experimenter`` column."""
        return self.__tuple["experimenter"]

    @property
    def recording_date(self):
        """Value of the ``recording_date`` column."""
        return self.__tuple["recording_date"]

    @property
    def recording_duration(self):
        """Value of the ``duration`` column."""
        return self.__tuple["duration"]

    @property
    def quality(self):
        """Value of the ``quality`` column."""
        return self.__tuple["quality"]

    @property
    def has_nix(self):
        """Value of the ``has_nix`` column."""
        return self.__tuple["has_nix"]

    @property
    def comment(self):
        """Value of the ``comment`` column."""
        return self.__tuple["comment"]

    @property
    def data_source(self):
        """Value of the ``data_source`` column."""
        return self.__tuple["data_source"]

    @property
    def setup(self):
        """Value of the ``setup`` column."""
        return self.__tuple["setup"]

    @property
    def cells(self):
        """List of ``Cell`` objects linked to this dataset via ``CellDatasetMap``."""
        cs = Cells * (CellDatasetMap & self.__tuple)
        return [Cell(tuple=c) for c in cs]

    @property
    def subjects(self):
        """List of ``Subject`` objects linked to this dataset via ``SubjectDatasetMap``."""
        subjs = Subjects * (SubjectDatasetMap & self.__tuple)
        return [Subject(tuple=s) for s in subjs]
@schema
class Subjects(dj.Manual):
    """Table of recorded subjects (id and species)."""

    # The definition text is kept character for character (flush-left lines).
    definition = """
# Subjects
subject_id : varchar(256)
----
species : varchar(256)
"""

    @staticmethod
    def get_template_tuple(subject_id=None):
        """Return a row of this table as a plain dict.

        Args:
            subject_id: id of an existing subject whose row is fetched; when
                ``None`` an empty template is returned instead.
        """
        if subject_id is not None:
            return dict((Subjects() & {"subject_id": subject_id}).fetch1())
        return dict(subject_id=None, species="")

    def make(self, key):
        """Insert the subject found in the nix file of the dataset *key*.

        Nothing is inserted when ``Datasets.get_nix_file`` returns ``None``.

        NOTE(review): this table is declared as ``dj.Manual``; presumably
        nothing calls ``make`` automatically -- confirm whether it is still
        needed.
        """
        file_path = Datasets.get_nix_file(key)
        if file_path is None:
            return
        nix_file = nix.File.open(file_path, nix.FileMode.ReadOnly)
        try:
            subj_info = nix_file.blocks[0].metadata["Recording"]["Subject"]
            inserts = Subjects.get_template_tuple()
            inserts["subject_id"] = subj_info["Identifier"]
            inserts["species"] = subj_info["Species"][0]
            # weight, size and eod_frequency were added here as well, but
            # they are columns of SubjectProperties and not of this table,
            # so only the columns declared above are inserted.
            inserts.update({k: v for k, v in key.items() if k in inserts})
            self.insert1(inserts, skip_duplicates=True)
        finally:
            # Close the file even if reading the metadata or inserting fails.
            nix_file.close()

    @staticmethod
    def subjects(species=None):
        """Return the subjects as a list of ``Subject`` objects.

        Args:
            species: when truthy, only subjects whose species contains this
                string are returned.
        """
        if species:
            subjs = Subjects & "species like '%{0:s}%'".format(species)
        else:
            subjs = Subjects & True
        return [Subject(tuple=s) for s in subjs]

    @staticmethod
    def unique_species():
        """Return the distinct values of the ``species`` column."""
        all_species = (Subjects & True).fetch("species")
        return np.unique(all_species)

    @property
    def properties(self):
        """The single ``SubjectProperties`` row matching this restriction."""
        return (SubjectProperties & self).fetch1()

    # TODO: add a ``datasets`` property (an unfinished, commented-out stub
    # stood here).
class Subject:
    """Read-only convenience wrapper around one row of the ``Subjects`` table."""

    def __init__(self, subject_id=None, tuple=None):
        """Create the wrapper from a row or by looking up a subject id.

        Args:
            subject_id: SQL ``like`` pattern for the subject id; only used
                when *tuple* is not given.  The first match is used.
            tuple: an already fetched row (mapping of column name to value).
                Takes precedence over *subject_id*.
        """
        # Per-instance default instead of a mutable class attribute shared by
        # all instances.
        self.__tuple = {}
        if tuple:
            self.__tuple = tuple
        elif subject_id:
            # The restriction has to be parenthesised: previously ``.fetch()``
            # was called on the formatted string, not on the query.
            matches = Subjects & "subject_id like '{0:s}'".format(subject_id)
            self.__tuple = matches.fetch(as_dict=True)[0]
        else:
            print("Empty Subject, not linked to any database entry!")

    @property
    def subject_id(self):
        """Value of the ``subject_id`` column."""
        return self.__tuple["subject_id"]

    @property
    def species(self):
        """Value of the ``species`` column."""
        return self.__tuple["species"]

    @property
    def cells(self):
        """List of ``Cell`` objects recorded from this subject."""
        cs = Cells & self.__tuple
        return [Cell(tuple=c) for c in cs]
@schema
class SubjectDatasetMap(dj.Manual):
    """Association table that references both ``Subjects`` and ``Datasets``."""

    # Same text as before, spelled out line by line.
    definition = (
        "\n"
        "# SubjectDatasetMap\n"
        "-> Subjects\n"
        "-> Datasets\n"
    )
@schema
class SubjectProperties(dj.Manual):
    """Table of per-recording subject measurements (weight, size, EOD frequency)."""

    # The definition text is kept character for character (flush-left lines).
    definition = """
# _SubjectProperties
id : int auto_increment
----
-> Subjects
recording_date : date
weight : float
size : float
eod_frequency : float
"""

    # Declared static like the ``get_template_tuple`` helpers of the other
    # tables; without the decorator a call on an instance would pass the
    # instance as ``id``.
    @staticmethod
    def get_template_tuple(id=None):
        """Return a row of this table as a plain dict.

        Args:
            id: value of the ``id`` column of an existing entry whose row is
                fetched; when ``None`` a template with default values is
                returned.  (The name shadows the builtin but is part of the
                public signature and therefore kept.)
        """
        if id is not None:
            return dict((SubjectProperties() & {"id": id}).fetch1())
        return dict(id=None, subject_id=None, recording_date=None, weight=0.0,
                    size=0.0, eod_frequency=0.0)
@schema
class Cells(dj.Manual):
    """Table describing the recorded cells and where they were recorded."""

    # Same text as before, spelled out line by line.
    definition = (
        "\n"
        "# Table that stores information about recorded cells.\n"
        "cell_id : varchar(256)\n"
        "----\n"
        "-> Subjects\n"
        "cell_type : varchar(256)\n"
        "firing_rate : float\n"
        "structure : varchar(256)\n"
        "region : varchar(256)\n"
        "subregion : varchar(256)\n"
        "depth : float\n"
        "lateral_pos : float\n"
        "transversal_section : float\n"
    )

    @staticmethod
    def get_template_tuple(cell_id=None):
        """Return the stored row for *cell_id*, or an empty template dict."""
        if cell_id is None:
            return dict(cell_id=None, subject_id=None, cell_type="", firing_rate=0.0,
                        depth=0.0, region="", subregion="", structure="",
                        lateral_pos=0.0, transversal_section=0.0)
        stored = (Cells() & {"cell_id": cell_id}).fetch1()
        return dict(stored)

    @property
    def subject(self):
        """``Subjects`` restricted by this cell restriction."""
        return Subjects & self

    @staticmethod
    def celltypes():
        """Return the distinct values of the ``cell_type`` column."""
        stored_types = Cells.fetch("cell_type")
        return np.unique(stored_types)

    @staticmethod
    def cells(celltype=None, species=None, quality="good"):
        """Return the joined cell/dataset/subject query, optionally filtered.

        Each truthy argument adds one SQL ``like`` restriction; the species
        is matched as a substring.
        """
        query = Cells * CellDatasetMap * Datasets * Subjects
        filters = (
            (celltype, "cell_type like '{0:s}'"),
            (species, "species like '%{0:s}%'"),
            (quality, "quality like '{0:s}'"),
        )
        for value, template in filters:
            if value:
                query = query & template.format(value)
        return query
class Cell:
    """Read-only convenience wrapper around one row of the ``Cells`` table."""

    def __init__(self, cell_id=None, tuple=None):
        """Create the wrapper from a row or by looking up a cell id.

        Args:
            cell_id: SQL ``like`` pattern for the cell id; only used when
                *tuple* is not given.
            tuple: an already fetched row (mapping of column name to value).
                Takes precedence over *cell_id*.

        Raises:
            AssertionError: the lookup by *cell_id* did not match exactly one
                entry.
        """
        # Always defined, so that an empty Cell can be queried: ``cell_id``
        # is written to fall back to "" but used to fail with AttributeError
        # because the attribute was never set.
        self.__tuple = {}
        if tuple:
            self.__tuple = tuple
        elif cell_id:
            pattern = "cell_id like '{0:s}'".format(cell_id)
            cells = Cells & pattern
            assert len(cells) == 1, "Cell id is not unique!"
            self.__tuple = cells.fetch(as_dict=True)[0]
        else:
            print("Empty Cell, not linke to any database entry!")

    @property
    def cell_id(self):
        """Value of the ``cell_id`` column, or "" when the row has none."""
        return self.__tuple["cell_id"] if "cell_id" in self.__tuple.keys() else ""
@schema
class CellDatasetMap(dj.Manual):
    """Association table that references both ``Datasets`` and ``Cells``."""

    # Same text as before, spelled out line by line.
    definition = (
        "\n"
        "# Table that maps recorded cells to datasets\n"
        "-> Datasets\n"
        "-> Cells\n"
    )
@schema
class Repros(dj.Manual):
    """Table of RePro runs recorded for a cell."""

    # Same text as before, spelled out line by line.
    definition = (
        "\n"
        "repro_id : varchar(512) # The name that was given to the RePro run by relacs\n"
        "run : smallint # A counter counting the runs of the ReProp in this dataset\n"
        "-> Cells #\n"
        "----\n"
        "repro_name : varchar(512) # The original name of the RePro itself, not any given name by user or relacs\n"
        "settings : varchar(3000) # Yaml formatted string containing the repro settings (tag.metadata in case of a nix file)\n"
        "start : float # The start time of the repro\n"
        "duration : float # The duration of the repro\n"
    )

    @staticmethod
    def get_template_tuple(repro_id=None):
        """Return the stored row for *repro_id*, or an empty template dict."""
        if repro_id is None:
            return dict(repro_id=None, cell_id=None, run=0, repro_name="",
                        settings=None, start=None, duration=None)
        stored = (Repros() & {"repro_id": repro_id}).fetch1()
        return dict(stored)
@schema
class Stimuli(dj.Manual):
    """Table of the stimuli presented during a RePro run."""

    # Same text as before, spelled out line by line.
    definition = (
        "\n"
        "stimulus_id : varchar(50)\n"
        "-> Repros\n"
        "---\n"
        "stimulus_index : int\n"
        "stimulus_name : varchar(512)\n"
        "mtag_id : varchar(50)\n"
        "start_time : float\n"
        "start_index : int\n"
        "duration : float\n"
        "settings : varchar(3000)\n"
    )

    @staticmethod
    def get_template_tuple(stimulus_id=None):
        """Return the stored row for *stimulus_id*, or an empty template dict."""
        if stimulus_id is None:
            return dict(stimulus_id=None, stimulus_index=None, stimulus_name="",
                        start_index=0, start_time=0.0, duration=0.0, settings=None)
        stored = (Stimuli & {"stimulus_id": stimulus_id}).fetch1()
        return dict(stored)
def populate_datasets(data_path, update=False):
    """Insert the dataset stored in the folder *data_path* into ``Datasets``.

    The last path component is used as the dataset id and the remaining
    columns are read from ``<data_path>/info.dat``.

    Args:
        data_path: folder of the dataset.
        update: when ``True`` the "already in database" check is bypassed.
            NOTE(review): the insert still uses ``skip_duplicates=True``, so
            an existing row is presumably left untouched -- confirm whether a
            real update was intended.

    Returns:
        ``True`` when the insert was attempted; ``False`` when the folder does
        not exist, no experimenter could be read, or the entry is already
        present and *update* is false.
    """
    if not os.path.exists(data_path):
        # Was a bare ``return`` (None); every other exit returns a bool.
        return False
    dset_name = os.path.split(data_path)[-1]
    info_file = os.path.join(data_path, 'info.dat')
    (experimenter, rec_date, quality, comment,
     has_nix, rec_duration, setup) = ut.read_dataset_info(info_file)
    if not experimenter:
        return False

    inserts = Datasets.get_template_tuple()
    inserts["dataset_id"] = dset_name
    inserts["data_source"] = data_path
    inserts["experimenter"] = experimenter
    inserts["recording_date"] = rec_date
    # A dict in place of a value is replaced by an empty string.
    inserts["quality"] = quality if not isinstance(quality, dict) else ""
    inserts["comment"] = comment if not isinstance(comment, dict) else ""
    inserts["duration"] = rec_duration
    inserts["setup"] = setup
    inserts["has_nix"] = has_nix

    if len(Datasets & inserts) > 0 and not update:
        print('\t\t %s is already in database!' % dset_name)
        return False
    Datasets().insert1(inserts, skip_duplicates=True)
    return True
def populate_subjects(data_path):
    """Import the subject described in ``<data_path>/info.dat``.

    Inserts the subject into ``Subjects``, links it to the dataset in
    ``SubjectDatasetMap`` and stores weight, size and EOD frequency in
    ``SubjectProperties`` unless an identical entry already exists.

    Args:
        data_path: folder of the dataset; its last component is the dataset
            id, which must already be present in ``Datasets``.

    Returns:
        ``(None, None, False)`` when there is no ``info.dat``; ``None``
        otherwise.
    """
    print("\tImporting subject(s) of %s" % data_path)
    dset_name = os.path.split(data_path)[-1]
    info_file = os.path.join(data_path, 'info.dat')
    if not os.path.exists(info_file):
        return None, None, False

    info = ut.read_info_file(info_file)
    path = []
    ut.find_key_recursive(info, "Subject", path)
    subj = ut.deep_get(info, path) if len(path) > 0 else {}

    # The identifier is taken from the Subject section, then from the top
    # level of the info; a missing or dict-valued identifier is replaced by
    # a name derived from the dataset.
    fallback_id = "unspecified_" + dset_name
    if "Identifier" in subj:
        candidate = subj["Identifier"]
    elif "Identifier" in info:
        candidate = info["Identifier"]
    else:
        candidate = fallback_id
    subj_id = fallback_id if isinstance(candidate, dict) else candidate

    inserts = Subjects.get_template_tuple()
    inserts["subject_id"] = subj_id
    # Without a Subject section ``subj`` is empty; keep the template default
    # instead of failing with KeyError.
    inserts["species"] = subj.get("Species", inserts["species"])
    Subjects().insert1(inserts, skip_duplicates=True)

    # multi match entry
    dataset = dict((Datasets() & {"dataset_id": dset_name}).fetch1())
    mm = dict(dataset_id=dataset["dataset_id"], subject_id=inserts["subject_id"])
    SubjectDatasetMap.insert1(mm, skip_duplicates=True)

    # subject properties
    props = SubjectProperties.get_template_tuple()
    props["subject_id"] = inserts["subject_id"]
    props["recording_date"] = dataset["recording_date"]
    # The slices cut trailing characters off the values -- presumably the
    # unit suffix; verify against the info files.
    if "Weight" in subj:
        props["weight"] = np.round(float(subj["Weight"][:-1]), 1)
    if "Size" in subj:
        props["size"] = np.round(float(subj["Size"][:-2]), 1)
    if "EOD Frequency" in subj:
        props["eod_frequency"] = np.round(float(subj["EOD Frequency"][:-2]))

    # Compare everything except the auto-increment id to detect duplicates.
    lookup = props.copy()
    lookup.pop("id")
    if len(SubjectProperties & lookup) == 0:
        SubjectProperties.insert1(props, skip_duplicates=True)
def populate_cells(data_path):
    """Import the cell described in ``<data_path>/info.dat``.

    Inserts the cell into ``Cells`` and links it to the dataset in
    ``CellDatasetMap``.  The dataset and the subject must already be present
    in ``Datasets`` and ``Subjects``.

    Args:
        data_path: folder of the dataset; its last component is the dataset
            id.

    Returns:
        ``(None, None, False)`` when there is no ``info.dat``; ``None``
        otherwise.
    """
    print("\tImporting cell(s) of %s" % data_path)
    dset_name = os.path.split(data_path)[-1]
    info_file = os.path.join(data_path, 'info.dat')
    if not os.path.exists(info_file):
        return None, None, False

    info = ut.read_info_file(info_file)
    path = []
    ut.find_key_recursive(info, "Subject", path)
    subject_info = ut.deep_get(info, path)
    path = []
    ut.find_key_recursive(info, "Cell", path)
    cell_info = ut.deep_get(info, path)
    path = []
    ut.find_key_recursive(info, "Firing Rate1", path)
    firing_rate = ut.deep_get(info, path, default=0.0)
    if isinstance(firing_rate, str):
        # Cuts the last two characters -- presumably the unit; verify.
        firing_rate = float(firing_rate[:-2])

    # The identifier is taken from the subject info, then from the top level
    # of the info; a missing or dict-valued identifier is replaced by a name
    # derived from the dataset.
    fallback_id = "unspecified_" + dset_name
    if "Identifier" in subject_info.keys():
        candidate = subject_info["Identifier"]
    elif "Identifier" in info.keys():
        candidate = info["Identifier"]
    else:
        candidate = fallback_id
    subj_id = fallback_id if isinstance(candidate, dict) else candidate

    dataset = dict((Datasets & {"dataset_id": dset_name}).fetch1())
    subject = dict((Subjects & {"subject_id": subj_id}).fetch1())
    dataset_id = dataset["dataset_id"]
    # The cell id is made of the first four dash-separated parts of the
    # dataset id.  The former ``len(dataset_id) > 4`` guard compared the
    # character count and was redundant: joining at most four parts returns
    # shorter ids unchanged.
    cell_id = "-".join(dataset_id.split("-")[:4])

    cell_props = Cells.get_template_tuple()
    cell_props["subject_id"] = subject["subject_id"]
    cell_props["cell_id"] = cell_id
    cell_props["cell_type"] = cell_info["CellType"]
    cell_props["firing_rate"] = firing_rate

    # Optional entries copied verbatim.
    for source, column in (("Structure", "structure"),
                           ("BrainRegion", "region"),
                           ("BrainSubRegion", "subregion")):
        if source in cell_info.keys():
            cell_props[column] = cell_info[source]
    # Optional numeric entries whose last two characters are cut off before
    # the conversion -- presumably a unit suffix; verify.
    for source, column in (("Depth", "depth"),
                           ("Lateral position", "lateral_pos")):
        if source in cell_info.keys():
            cell_props[column] = float(cell_info[source][:-2])
    if "Transverse section" in cell_info.keys():
        cell_props["transversal_section"] = float(cell_info["Transverse section"])
    Cells.insert1(cell_props, skip_duplicates=True)

    # multi match entry
    mm = dict(dataset_id=dataset["dataset_id"], cell_id=cell_props["cell_id"])
    CellDatasetMap.insert1(mm, skip_duplicates=True)
def scan_nix_file_for_repros(dataset):
    """Import the RePro runs and stimuli found in the nix files of *dataset*.

    Every ``*.nix`` file in ``dataset["data_source"]`` that passes
    ``Datasets.check_file_integrity`` is scanned; tags whose type contains
    "relacs.repro_run" are stored in ``Repros`` and the stimuli returned by
    ``ut.find_mtags_for_tag`` in ``Stimuli``.

    Args:
        dataset: row of ``Datasets`` as a dict; ``dataset_id`` and
            ``data_source`` are read.

    Raises:
        IndexError: no cell is linked to the dataset.
    """
    print("\t\tscanning nix file")
    cell_id = (Cells * CellDatasetMap * (Datasets & "dataset_id = '%s'" % dataset["dataset_id"])).fetch("cell_id", limit=1)[0]
    nix_files = glob.glob(os.path.join(dataset["data_source"], "*.nix"))
    for nf in nix_files:
        if not Datasets.check_file_integrity(nf):
            print("\t\tfile is not sane!!!")
            continue
        f = nix.File.open(nf, nix.FileMode.ReadOnly)
        try:
            b = f.blocks[0]
            for t in [t for t in b.tags if "relacs.repro_run" in t.type]:
                repro = _import_nix_repro_run(t, cell_id)
                if repro is not None:
                    _import_nix_stimuli(b, t, repro)
        finally:
            # Close the file even when reading or inserting raises.
            f.close()


def _import_nix_repro_run(t, cell_id):
    """Store the RePro run of tag *t*; return its key dict or None if skipped."""
    rs = t.metadata.find_sections(lambda x: "Run" in x.props)
    if len(rs) == 0:
        return None
    rs = rs[0]
    print("\t\t%s" % rs["RePro"])

    rp = Repros.get_template_tuple()
    rp["run"] = rs["Run"]
    rp["repro_name"] = rs["RePro"]
    rp["cell_id"] = cell_id
    rp["repro_id"] = t.name
    settings = t.metadata.find_sections(lambda x: "settings" in x.type)
    if len(settings) > 0:
        rp["settings"] = ut.nix_metadata_to_yaml(settings[0])
    else:
        rp["settings"] = ut.nix_metadata_to_yaml(t.metadata)
    rp["start"] = t.position[0]
    rp["duration"] = t.extent[0]
    Repros.insert1(rp, skip_duplicates=True)

    # Read the row back and strip the columns that must not be copied into
    # the stimulus rows.
    repro = dict((Repros & dict(repro_id=rp["repro_id"], cell_id=cell_id)).fetch1())
    for column in ("settings", "repro_name", "start", "duration"):
        repro.pop(column)
    return repro


def _import_nix_stimuli(b, t, repro):
    """Store the stimuli that belong to tag *t* of block *b* for run *repro*."""
    mtags, positions = ut.find_mtags_for_tag(b, t)
    for i, mt in enumerate(mtags):
        mt_positions = np.atleast_2d(mt.positions[:]).T
        mt_extents = np.atleast_2d(mt.extents[:]).T
        for p in positions[i]:
            stim = Stimuli.get_template_tuple()
            # NOTE(review): a fresh uuid is drawn on every scan and
            # stimulus_id is part of the primary key, so skip_duplicates
            # presumably cannot recognise stimuli imported by an earlier
            # scan -- confirm.
            stim["stimulus_id"] = str(uuid.uuid1())
            stim["stimulus_index"] = p
            stim["start_time"] = mt_positions[p, 0]
            stim["start_index"] = -1
            stim["duration"] = mt_extents[p, 0]
            stim["settings"] = ut.mtag_settings_to_yaml(mt, p)
            stim["mtag_id"] = mt.id
            stim["stimulus_name"] = mt.name
            stim.update(repro)
            Stimuli.insert1(stim, skip_duplicates=True)
def scan_folder_for_repros(dataset):
    """Import RePro runs and stimuli of a dataset that has no nix file.

    The information comes from ``ut.read_stimuli_file``; every entry is
    stored in ``Repros`` and each of its stimulus indices in ``Stimuli``.

    Args:
        dataset: row of ``Datasets`` as a dict; ``dataset_id`` and
            ``data_source`` are read.

    Raises:
        IndexError: no cell is linked to the dataset.
    """
    print("\t\tNo nix-file, scanning directory!")
    repro_settings, stim_indices = ut.read_stimuli_file(dataset["data_source"])
    repro_counts = {}
    cell_id = (Cells * CellDatasetMap * (Datasets & "dataset_id = '%s'" % dataset["dataset_id"])).fetch("cell_id", limit=1)[0]
    for rs, si in zip(repro_settings, stim_indices):
        rp = Repros.get_template_tuple()

        path = []
        if not ut.find_key_recursive(rs, "run", path):
            ut.find_key_recursive(rs, "Run", path)
        rp["run"] = ut.deep_get(rs, path, 0) if len(path) > 0 else -1

        path = []
        if not ut.find_key_recursive(rs, "repro", path):
            ut.find_key_recursive(rs, "RePro", path)
        repro_name = ut.deep_get(rs, path, "None")
        print("\t\t %s" % repro_name)
        rp["repro_name"] = repro_name

        # The repro id is the name followed by a running count per name.
        repro_counts[repro_name] = repro_counts.get(repro_name, 0) + 1
        rp["cell_id"] = cell_id
        rp["repro_id"] = repro_name + str(repro_counts[repro_name])
        rp["start"] = 0.
        rp["duration"] = 0.
        # Dumped once and reused for every stimulus of this run.
        settings_yaml = yaml.dump(rs)
        rp["settings"] = settings_yaml
        Repros.insert1(rp, skip_duplicates=True)

        # import stimuli
        repro = dict((Repros & dict(repro_id=rp["repro_id"], cell_id=cell_id)).fetch1())
        for column in ("settings", "repro_name", "start", "duration"):
            repro.pop(column)

        if not si:
            # No stimuli: the duration is not needed and is not parsed.
            continue
        # The duration depends on the run settings only, so it is worked out
        # once per run instead of once per stimulus.
        stim_duration = _folder_stimulus_duration(rs)
        for j, start_index in enumerate(si.values()):
            stim = Stimuli.get_template_tuple()
            stim["stimulus_id"] = str(uuid.uuid1())
            stim["stimulus_index"] = j
            stim["start_time"] = 0.
            stim["start_index"] = int(start_index)
            stim["duration"] = stim_duration
            stim["settings"] = settings_yaml
            stim["mtag_id"] = ""
            stim["stimulus_name"] = ""
            stim.update(repro)
            Stimuli.insert1(stim, skip_duplicates=True)


def _folder_stimulus_duration(rs):
    """Return the number in the "duration"/"Duration" setting of *rs*, or 0.0."""
    path = []
    if not ut.find_key_recursive(rs, "duration", path):
        ut.find_key_recursive(rs, "Duration", path)
    if len(path) == 0:
        return 0.0
    raw = ut.deep_get(rs, path, None)
    # NOTE(review): a value given in "ms" is stored as is, i.e. it is not
    # converted to seconds -- confirm the unit expected by Stimuli.duration.
    unit = "ms" if "ms" in raw else "s"
    return float(raw[:raw.index(unit)])
def populate_repros(data_path):
    """Import the RePro runs of the dataset stored in *data_path*.

    The last path component is the dataset id.  Datasets with a nix file are
    handed to ``scan_nix_file_for_repros``, all others to
    ``scan_folder_for_repros``.

    Returns:
        ``False`` unless exactly one ``Datasets`` entry has that id, ``True``
        after the scan.
    """
    print("\tImporting RePro(s) of %s" % data_path)
    _, name = os.path.split(data_path)
    matches = Datasets & {"dataset_id": name}
    if len(matches) != 1:
        return False
    entry = dict(matches.fetch1())
    scanner = scan_nix_file_for_repros if entry["has_nix"] else scan_folder_for_repros
    scanner(entry)
    return True
def drop_tables():
    """Drop the ``Datasets`` table and then the ``Subjects`` table."""
    for table in (Datasets, Subjects):
        table.drop()
def populate(datasets, update=False):
    """Import every dataset folder in *datasets* into the database.

    For each folder the dataset itself is imported first; subjects, cells and
    RePro runs follow only when ``populate_datasets`` returns a truthy value.

    Args:
        datasets: sized sequence of dataset folder paths.
        update: passed on to ``populate_datasets``.
    """
    total = len(datasets)
    # Count from one so the progress reads "1 of N" ... "N of N".
    for i, d in enumerate(datasets, start=1):
        print("Importing %i of %i: %s" % (i, total, d))
        if not populate_datasets(d, update):
            continue
        populate_subjects(d)
        populate_cells(d)
        try:
            populate_repros(d)
        except Exception as exc:
            # The former ``except ():`` caught nothing, so this report could
            # never be reached; a failing RePro import no longer aborts the
            # remaining datasets.
            print("\t\tsomething went wrong! %s (%s)" % (d, exc))
if __name__ == "__main__":
    # Script entry point: import all 2018 datasets found below ``data_dir``.
    data_dir = "/data/apteronotus"
    # Alternative locations used before:
    # data_dir = "../high_freq_chirps/data"
    # datasets = glob.glob("/Users/jan/zwischenlager/2012-*")
    # Uncomment to start from empty tables:
    # drop_tables()
    # The pattern is relative to ``data_dir``.  It used to be an absolute
    # path, which made os.path.join discard ``data_dir`` altogether.
    datasets = glob.glob(os.path.join(data_dir, "2018-*"))
    populate(datasets, update=False)