[reproclasses] new FI-Curve class, reading from nix works

This commit is contained in:
Jan Grewe 2019-10-02 18:10:47 +02:00
parent c03f84c2ee
commit 8d253c9c7a

View File

@ -1,4 +1,4 @@
from fishbook.fishbook import Dataset, RePro
from fishbook.fishbook import Dataset, RePro, Stimulus
import numpy as np
import nixio as nix
from scipy.stats import circstd
@ -276,8 +276,127 @@ class BaselineData:
return np.asarray(data)
class FIData:
def __init__(self, dataset:Dataset):
self.__spike_data = []
self.__contrasts = []
self.__eod_data = []
self.__eod_times = []
self.__dataset = dataset
self.__repros = None
self.__cell = dataset.cells[0] # Beware: Assumption that there is only a single cell
self._get_data()
pass
def _get_data(self):
if not self.__dataset:
return
self.__repros = RePro.find("FICurve", cell_id=self.__cell.id)
for r in self.__repros:
sd, c, eods, time = self.__read_spike_data(r)
if sd is not None and len(sd) > 1:
self.__spike_data.extend(sd)
self.__eod_data.extend(eods)
self.__contrasts.extend(c)
self.__eod_times.extend(time)
else:
continue
def __read_spike_data(self, repro:RePro):
"""
:param repro:
:return: spike data and the respective contrasts
"""
if self.__dataset.has_nix:
return self.__read_spikes_from_nix(repro)
else:
print("Sorry, so far only from nix!!!")
pass
return None, None, None, None
def __do_read_spike_data_from_nix(self, mtag:nix.pycore.MultiTag, stimulus:Stimulus, repro: RePro):
r_settings = repro.settings.split("\n")
s_settings = stimulus.settings.split("\n")
delay = 0.0
contrast = 0.0
for s in r_settings:
if "delay:" in s:
delay = float(s.split(":")[-1])
break
for s in s_settings:
if "Contrast:" in s and "PreContrast" not in s and "\t\t" not in s and "+-" not in s:
contrast = float(s.split(":")[-1])
break
start_time = stimulus.start_time - delay
end_time = stimulus.start_time + stimulus.duration
spikes_da = mtag.references["Spikes-1"]
eod_da = mtag.references["LocalEOD-1"]
start_index_spikes = spikes_da.dimensions[0].index_of(start_time)
end_index_spikes = spikes_da.dimensions[0].index_of(end_time)
start_index_eod = eod_da.dimensions[0].index_of(start_time)
end_index_eod = eod_da.dimensions[0].index_of(end_time)
local_eod = eod_da[start_index_eod:end_index_eod]
spikes = spikes_da[start_index_spikes:end_index_spikes] - start_time
time = np.asarray(eod_da.dimensions[0].axis(len(local_eod))) - delay
return spikes, local_eod, time, contrast
def __read_spikes_from_nix(self, repro:RePro):
spikes = []
eods = []
time = []
contrasts = []
stimuli = Stimulus.find(cell_id=repro.cell_id, repro_id=repro.id)
if len(stimuli) == 0:
return spikes, contrasts, eods, time
data_source = os.path.join(self.__dataset.data_source, self.__dataset.id + ".nix")
if not os.path.exists(data_source):
print("Data not found! Trying from directory")
return self.__read_spike_data_from_directory(repro)
f = nix.File.open(data_source, nix.FileMode.ReadOnly)
b = f.blocks[0]
mt = None
for s in stimuli:
if not mt or mt.id != s.multi_tag_id:
mt = b.multi_tags[s.multi_tag_id]
sp, eod, t, c = self.__do_read_spike_data_from_nix(mt, s, repro)
spikes.append(sp)
eods.append(eod)
time.append(t)
contrasts.append(c)
f.close()
return spikes, contrasts, eods, time
@property
def size(self):
return len(self.__spike_data)
def spikes(self, index=-1):
if 0 < index < self.size:
return self.__spike_data[index]
else:
return self.__spike_data
def eod(self, index=-1):
if 0 < index < self.size:
return self.__eod_times[index], self.__eod_data[index]
else:
return self.__eod_times, self.__eod_data
def contrast(self, index=-1):
if 0 < index < self.size:
return self.__contrasts[index]
else:
return self.__contrasts
if __name__ == "__main__":
dataset = Dataset(dataset_id='2011-06-14-ag')
#dataset = Dataset(dataset_id='2011-06-14-ag')
dataset = Dataset(dataset_id="2018-01-19-ac-invivo-1")
# dataset = Dataset(dataset_id='2018-11-09-aa-invivo-1')
baseline = BaselineData(dataset)
embed()