P-unit_model/cell_overview.py
2020-08-28 17:40:47 +02:00

208 lines
7.6 KiB
Python

from CellData import icelldata_of_dir, CellData
from Baseline import BaselineCellData
from FiCurve import FICurveCellData
from DataParserFactory import DatParser
import os
import numpy as np
import matplotlib.pyplot as plt
def main():
    """Run the currently selected analysis step.

    The alternative entry points are kept as commented-out calls so that a
    different step can be enabled by hand.
    """
    # plot_visualizations("cells/")
    # full_overview("cells/master_table.csv", "cells/")
    # recalculate_saved_preanalysis("data/final/")
    metadata_analysis("data/final/")
def metadata_analysis(data_folder, filter_double_fish=False):
    """Print summary statistics over the metadata of every cell in a folder.

    Each entry of ``data_folder`` is treated as one cell directory. It is
    opened both with ``DatParser`` (species, gender, sampling interval, fish
    size) and with ``CellData`` (recording times and fi-curve contrasts).
    The collected values are only printed; nothing is returned.

    :param data_folder: directory containing one sub-directory per cell.
        A trailing path separator is optional.
    :param filter_double_fish: if True, only the first cell for each distinct
        10-character name prefix is analysed (the prefix is presumably the
        recording date and is used as a stand-in for "one fish" - verify
        against the naming scheme of the data).
    :return: None
    """
    gender = {}
    species = {}
    sampling_interval = {}
    eod_freqs = []  # only filled by the disabled EOD-frequency code below
    sizes = []
    dates = {}
    fi_curve_stimulus = []
    fi_curve_contrasts = []
    fi_curve_c_trials = []

    for cell in os.listdir(data_folder):
        # The first 10 characters of the directory name identify the fish.
        if filter_double_fish and cell[:10] in dates:
            continue
        dates[cell[:10]] = 1

        # os.path.join works with and without a trailing separator on
        # data_folder; plain string concatenation did not.
        cell_path = os.path.join(data_folder, cell)
        parser = DatParser(cell_path)
        cell_data = CellData(cell_path)

        species = count_with_dict(species, parser.get_species())
        gender = count_with_dict(gender, parser.get_gender())
        sampling_interval = count_with_dict(sampling_interval, parser.get_sampling_interval())
        sizes.append(float(parser.get_fish_size()))
        # eod_freqs.append(cell_data.get_eod_frequency())

        fi_curve_stimulus.append(cell_data.get_recording_times())

        # Keep only the (contrast, trial count) pairs whose second element
        # is at least 3.
        contrasts_with_trials = cell_data.get_fi_curve_contrasts_with_trial_number()
        fi_curve_contrasts.append([c[0] for c in contrasts_with_trials if c[1] >= 3])
        fi_curve_c_trials.append([c[1] for c in contrasts_with_trials if c[1] >= 3])

    for k in sampling_interval:
        # The reciprocal of the sampling interval is printed, rounded.
        print("sampling:", np.rint(1.0/float(k)), "count:", sampling_interval[k])
    print("# of Fish (by dates):", len(dates))
    print("Fish genders:", gender)
    # print("EOD-freq: min {:.2f}, mean {:.2f}, max {:.2f}, std {:.2f}".format(min(eod_freqs), np.mean(eod_freqs), max(eod_freqs), np.std(eod_freqs)))
    if sizes:
        print("Sizes: min {:.2f}, mean {:.2f}, max {:.2f}, std {:.2f}".format(min(sizes), np.mean(sizes), max(sizes), np.std(sizes)))
    else:
        # min()/max() raise ValueError on an empty list, so report the
        # missing data instead of crashing on an empty folder.
        print("Sizes: no cells found")

    print("\n\nFi-Curve Stimulus:")
    print("Delay:", np.unique([r[0] for r in fi_curve_stimulus]))
    print("starts:", np.unique([r[1] for r in fi_curve_stimulus]))
    # print("ends:", np.unique([r[0] for r in fi_curve_stimulus]))
    print("duration:", np.unique([r[2] for r in fi_curve_stimulus], return_counts=True))
    print("after:", np.unique([r[3] for r in fi_curve_stimulus]))

    # print("Contrasts:")
    # for c in fi_curve_contrasts:
    #     print("min: {:.1f}, max: {:.1f}, count: {},".format(c[0], c[-1], len(c)))

    # bins = np.arange(-1, 1.01, 0.05)
    # all_contrasts = []
    # for c in fi_curve_contrasts:
    #     all_contrasts.extend(c)
    # plt.hist([c[0] for c in fi_curve_contrasts], bins=bins, label="mins", color="blue", alpha=0.5)
    # plt.hist([c[-1] for c in fi_curve_contrasts], bins=bins, label="maxs", color="red", alpha=0.5)
    # plt.hist(all_contrasts, bins=bins, label="all", color="black", alpha=0.5)
    # plt.show()
    print("done")
def count_with_dict(dictionary, key):
    """Increment the counter stored under ``key`` and return the dictionary.

    A key that is not present yet starts at zero, so its first occurrence
    yields a count of 1. The dictionary is modified in place and the same
    object is returned.

    :param dictionary: mapping from key to occurrence count
    :param key: the key whose count is increased by one
    :return: the updated ``dictionary``
    """
    dictionary[key] = dictionary.get(key, 0) + 1
    return dictionary
def recalculate_saved_preanalysis(data_folder):
    """Redo the stored baseline and fi-curve analysis of every cell.

    Iterates over the cells yielded by ``icelldata_of_dir`` for
    ``data_folder``. For each cell the baseline values are saved into the
    cell's data path, and the fi-curve is rebuilt with ``recalculate=True``
    and the data path as ``save_dir``. If the f_inf slope obtained with the
    original contrasts is negative, the contrasts are negated first.

    :param data_folder: directory that is handed to ``icelldata_of_dir``
    :return: None
    """
    for cell in icelldata_of_dir(data_folder, test_for_v1_trace=True):
        print(cell.get_data_path())

        BaselineCellData(cell).save_values(cell.get_data_path())

        contrasts = cell.get_fi_contrasts()
        # A first, unsaved fi-curve is only needed to check the slope sign.
        if FICurveCellData(cell, contrasts).get_f_inf_slope() < 0:
            contrasts = np.array(cell.get_fi_contrasts()) * -1
            # NOTE(review): the flattened source does not show whether this
            # print sits inside or after the if-branch - confirm.
            print(contrasts)

        FICurveCellData(cell, contrasts, save_dir=cell.get_data_path(), recalculate=True)
        # fi_curve.plot_fi_curve()
def move_rejected_cell_data():
    """Move all cells with a negative f_inf slope out of ``invivo_data/``.

    Every cell yielded by ``icelldata_of_dir("invivo_data/")`` is checked.
    Cells whose fi-curve reports a negative f_inf slope are collected and
    afterwards renamed into ``rejected_cells/negative_slope_f_inf/``
    (relative to the current working directory). The number of rejected
    cells is printed at the end.

    :return: None
    """
    count = 0
    jump_to = 0  # manual knob: cells with a running index below this are skipped
    negative_contrast_rel = 0
    cell_list = []

    for d in icelldata_of_dir("invivo_data/"):
        count += 1
        if count < jump_to:
            continue

        print(d.get_data_path())
        base = BaselineCellData(d)
        base.load_values(d.get_data_path())

        ficurve = FICurveCellData(d, d.get_fi_contrasts(), d.get_data_path())
        if ficurve.get_f_inf_slope() < 0:
            negative_contrast_rel += 1
            print("negative f_inf slope")
            cell_list.append(os.path.abspath(d.get_data_path()))

    # The moves are done after the iteration so that directories are not
    # renamed while the cells are still being enumerated.
    for c in cell_list:
        if os.path.exists(c):
            print("Source: ", c)
            destination = os.path.abspath("rejected_cells/negative_slope_f_inf/" + os.path.basename(c))
            print("destination: ", destination)
            print()
            # os.rename does not create missing parent directories and
            # fails with FileNotFoundError without them.
            os.makedirs(os.path.dirname(destination), exist_ok=True)
            os.rename(c, destination)

    print("Number: " + str(negative_contrast_rel))
def plot_visualizations(folder_path):
    """Create the baseline and fi-curve plots for every cell.

    The cells are always read from ``invivo_data/``. For each cell a
    sub-directory named after the last component of its data path is
    created below ``folder_path`` (if it does not exist yet) and handed to
    the plot methods as the save location.

    :param folder_path: existing output directory; it is concatenated with
        the cell name directly, so it has to end with a path separator
    :return: None
    """
    for cell in icelldata_of_dir("invivo_data/"):
        cell_name = os.path.basename(cell.get_data_path())
        print(cell_name)

        out_dir = folder_path + cell_name + "/"
        if not os.path.exists(out_dir):
            os.mkdir(out_dir)

        base = BaselineCellData(cell)
        base.plot_baseline(out_dir)
        base.plot_serial_correlation(10, out_dir)
        base.plot_polar_vector_strength(out_dir)
        base.plot_interspike_interval_histogram(out_dir)

        FICurveCellData(cell, cell.get_fi_contrasts()).plot_fi_curve(out_dir)
def full_overview(save_path_table, folder_path):
    """Write a CSV overview table of all cells and create their plots.

    The cells are always read from ``invivo_data/``. For each cell one row
    with eleven comma-separated columns (matching the header line) is
    written to ``save_path_table``. Afterwards the baseline and fi-curve
    plots are saved into a sub-directory of ``folder_path`` named after the
    cell.

    Baseline values are loaded from the cell's data path; if
    ``load_values`` reports failure they are saved there instead.

    :param save_path_table: path of the CSV file to (over)write
    :param folder_path: existing output directory for the plots; it is
        concatenated with the cell name directly, so it has to end with a
        path separator
    :return: None
    """
    with open(save_path_table, "w") as table:
        table.write("Name, Path, Baseline Frequency Hz,Vector Strength, serial correlation lag=1,"
                    " serial correlation lag=2, burstiness, coefficient of variation,"
                    " fi-curve inf slope, fi-curve zero slope at straight, contrast at fi-curve zero straight\n")
        # add contrasts, f-inf values, f_zero_values

        count = 0
        start = 0  # manual knob: cells with a running index below this are skipped
        for cell_data in icelldata_of_dir("invivo_data/"):
            count += 1
            if count < start:
                continue

            save_dir = cell_data.get_data_path()
            name = os.path.split(cell_data.get_data_path())[-1]
            fields = [name, cell_data.get_data_path()]

            baseline = BaselineCellData(cell_data)
            if not baseline.load_values(save_dir):
                baseline.save_values(save_dir)

            fields.append("{:.1f}".format(baseline.get_baseline_frequency()))
            fields.append("{:.2f}".format(baseline.get_vector_strength()))
            sc = baseline.get_serial_correlation(2)
            fields.append("{:.2f}".format(sc[0]))
            fields.append("{:.2f}".format(sc[1]))
            fields.append("{:.2f}".format(baseline.get_burstiness()))
            fields.append("{:.2f}".format(baseline.get_coefficient_of_variation()))

            ficurve = FICurveCellData(cell_data, cell_data.get_fi_contrasts(), save_dir)
            fields.append("{:.2f}".format(ficurve.get_f_inf_slope()))
            fields.append("{:.2f}".format(ficurve.get_f_zero_fit_slope_at_straight()))
            fields.append("{:.2f}".format(ficurve.f_zero_fit[3]))

            # Exactly one row per cell: the line break comes only after the
            # last column, so every row lines up with the header.
            table.write(",".join(fields) + "\n")

            print(name)
            save_path = folder_path + name + "/"
            if not os.path.exists(save_path):
                os.mkdir(save_path)

            baseline.plot_baseline(save_path)
            baseline.plot_serial_correlation(10, save_path)
            baseline.plot_polar_vector_strength(save_path)
            baseline.plot_interspike_interval_histogram(save_path)

            ficurve.plot_fi_curve(save_path)
# Run the selected analysis only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()