[reproclasses] new FileStimulusData class

This commit is contained in:
Jan Grewe 2019-11-18 16:52:01 +01:00
parent 9136b4b042
commit e0cd2a0538

View File

@ -653,9 +653,109 @@ class FIData:
return boltzmann_fit
class FileStimulusData:
def __init__(self, dataset: Dataset):
"""
Constructor.
:param dataset: The dataset entity for which the filestimulus repro data should be loaded.
"""
self.__spike_data = []
self.__contrasts = []
self.__stimuli = []
self.__dataset = dataset
self.__repros = None
self.__cell = dataset.cells[0] # Beware: Assumption that there is only a single cell
self._get_data()
def _get_data(self):
if not self.__dataset:
return
self.__repros = RePro.find("FileStimulus", cell_id=self.__cell.id)
for r in self.__repros:
sd, c, stims = self.__read_spike_data_from_nix(r) if self.__dataset.has_nix else self.__read_spike_data_from_directory(r)
if sd is not None and len(sd) > 1:
self.__spike_data.extend(sd)
self.__contrasts.extend(c)
self.__stimuli.extend(stims)
else:
continue
def __do_read_spike_data_from_nix(self, mt: nix.pycore.MultiTag, stimulus: Stimulus, repro: RePro):
spikes = None
contrast = 0.0
stim_file = ""
r_settings = repro.settings.split("\n")
s_settings = stimulus.settings.split("\n")
delay = 0.0
for s in r_settings:
if "delay:" in s:
delay = float(s.split(":")[-1])
break
start_time = stimulus.start_time - delay
end_time = stimulus.start_time + mt.extents[stimulus.index]
contrast = 0.0 # this is a quick fix!!!
embed()
for s in s_settings:
if "Contrast:" in s and "PreContrast" not in s and "\t\t" not in s and "+-" not in s:
contrast = float(s.split(":")[-1])
break
return spikes, contrast, stim_file
local_eod = eod_da[start_index_eod:end_index_eod]
spikes = self.__all_spikes[(self.__all_spikes >= start_time) & (self.__all_spikes < end_time)] - start_time - delay
time = np.asarray(eod_da.dimensions[0].axis(end_index_eod - start_index_eod)) - delay
return spikes, local_eod, time, contrast
return spikes, contrast, stim_file
def __read_spike_data_from_nix(self, repro: RePro):
spikes = []
contrasts = []
stim_files = []
stimuli = Stimulus.find(cell_id=repro.cell_id, repro_id=repro.id)
if len(stimuli) == 0:
return spikes, contrasts, stim_files
data_source = os.path.join(self.__dataset.data_source, self.__dataset.id + ".nix")
if not os.path.exists(data_source):
print("Data not found! Trying from directory")
return self.__read_spike_data_from_directory(repro)
f = nix.File.open(data_source, nix.FileMode.ReadOnly)
b = f.blocks[0]
self.__all_spikes = b.data_arrays["Spikes-1"][:]
mt = None
for i in tqdm(range(len(stimuli)), desc="Loading data"):
s = stimuli[i]
if not mt or mt.id != s.multi_tag_id:
mt = b.multi_tags[s.multi_tag_id]
sp, c, stim = self.__do_read_spike_data_from_nix(mt, s, repro)
spikes.append(sp)
contrasts.append(c)
stim_files.append(stim)
f.close()
return spikes, contrasts, stim_files
def __read_spike_data_from_directory(self, repro: RePro):
print("not yet my friend!")
spikes = []
contrast = 0.0
stim = None
return spikes, contrast, stim
def read_stimulus(self, index=0):
pass
if __name__ == "__main__":
# dataset = Dataset(dataset_id='2011-06-14-ag')
# dataset = Dataset(dataset_id="2018-01-19-ac-invivo-1")
dataset = Dataset(dataset_id='2013-04-18-ac')
fi_curve = FIData(dataset)
dataset = Dataset(dataset_id="2018-09-13-ac-invivo-1")
# dataset = Dataset(dataset_id='2013-04-18-ac')
fi_curve = FileStimulusData(dataset)
embed()