P-unit_model/test_for_cells.py
2021-01-09 23:59:34 +01:00

379 lines
11 KiB
Python

from parser.CellData import icelldata_of_dir, CellData
from parser.DataParserFactory import DatParser
import numpy as np
import os
import matplotlib.pyplot as plt
import pyrelacs.DataLoader as Dl
from experiments.Baseline import BaselineCellData
from experiments.FiCurve import FICurveCellData
# Path prefix under which test_kraken_files() stores/loads its cached numpy
# arrays (files.npy, baseline.npy, ficurve.npy, accepted.npy).
data_save_path = "test_routines/test_files/"
# Switch for test_kraken_files(): True re-scans the mounted data directory and
# rewrites the cache, False loads the previously saved arrays instead.
read = False
def main():
    """Entry point: run the one check that is currently switched on.

    The other routines of this file are kept as commented-out calls so that
    they can be re-enabled by hand.
    """
    # Disabled alternatives:
    #   test_kraken_files()
    #   test_for_species()
    #   test_for_vector_strength()
    #   test_cells()
    #   inspect_fi_curve()
    read_test_cells_tsv()
def inspect_fi_curve():
    """Plot the f-I curve of each cell in ``data/final/``, beginning at a fixed cell.

    The directory listing is processed in sorted order. Every entry that sorts
    before the start cell is skipped; the start cell itself and everything
    after it is loaded and plotted. If the start cell is not present, nothing
    is plotted.
    """
    data_dir = "data/final/"
    first_cell = "2015-01-15-ab-invivo-1"

    cell_names = sorted(os.listdir(data_dir))
    # Position of the start cell; an absent start cell yields an empty slice.
    if first_cell in cell_names:
        start = cell_names.index(first_cell)
    else:
        start = len(cell_names)

    for name in cell_names[start:]:
        path = data_dir + name
        loaded = CellData(path)
        print(path)
        FICurveCellData(loaded, loaded.get_fi_contrasts()).plot_fi_curve()
def read_test_cells_tsv():
    """Summarise ``data/test_cells.tsv`` and list accepted cells that are missing locally.

    Every row of the tab-separated file is read as ``<flag>\\t<cell path>\\t...``
    where the flag is the text ``True`` for an accepted cell. For each
    accepted row the last path component is looked up in ``data/final/`` and
    ``rejected_cells/``:

    * present in ``data/final/``: nothing to do,
    * present only in ``rejected_cells/``: reported as "already thrown",
    * present in neither: collected and printed (sorted) at the end.

    Blank lines are ignored and accepted rows without a path column are
    reported and skipped, so neither distorts the printed counts.
    """
    file_path = "data/test_cells.tsv"
    data_path = "data/final/"

    # Sets make the per-row membership tests independent of the directory size.
    rejected = set(os.listdir("rejected_cells/"))
    cells = set(os.listdir(data_path))

    count_rejected = 0
    count_accepted = 0
    missing_cells = []

    with open(file_path, 'r') as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line:
                # An empty line is not a table row; counting it as a rejected
                # cell would inflate the statistics.
                continue

            parts = line.split('\t')
            if parts[0] != "True":
                count_rejected += 1
                continue

            if len(parts) < 2:
                # Accepted flag without a path: nothing to look up.
                print("skipping malformed line:", line)
                continue

            count_accepted += 1
            cell_name = parts[1].split("/")[-1]
            if cell_name in cells:
                continue
            if cell_name in rejected:
                print("already thrown:", cell_name)
            else:
                missing_cells.append(cell_name)

    print("accepted:", count_accepted)
    print("rejected:", count_rejected)
    print("Total:", count_rejected + count_accepted)

    for c in sorted(missing_cells):
        print(c)
def test_cells():
    """Screen all recordings in ``/mnt/invivo_data/`` and write the verdicts to a TSV file.

    A cell directory is accepted only if all of the following hold:

    * at least one baseline recording lasts ``baseline_min_duration`` seconds,
    * the recording quality is one of "good", "bursty" or "fair",
    * the species string contains "lepto",
    * the cell type is "p-unit",
    * at least ``fi_curve_min_contrasts`` f-I contrasts were recorded with
      ``fi_curve_min_trials`` or more trials each.

    For every cell that could be parsed, one line
    ``accepted \\t path \\t longest baseline \\t usable contrasts`` is written
    to ``data/test_cells.tsv``. Directories without ``info.dat`` and cells
    whose files cannot be decoded are counted as errors; cells that raise
    ``FileNotFoundError`` are only reported, not counted (as before).
    """
    directory = "/mnt/invivo_data/"
    fi_curve_min_contrasts = 7
    fi_curve_min_trials = 7
    baseline_min_duration = 29.9
    accepted_qualities = ("good", "bursty", "fair")

    # One tuple per parsed cell: (accepted, path, longest baseline, usable contrasts)
    rows = []
    count_errors = 0

    for cell in os.listdir(directory):
        cell_path = os.path.join(directory, cell)
        if not os.path.isdir(cell_path):
            continue

        if not os.path.exists(os.path.join(cell_path, "info.dat")):
            count_errors += 1
            continue
        print(cell_path)

        try:
            parser = DatParser(cell_path)
            accepted = True

            # Test length of baseline recording
            cell_baseline_lengths = parser.get_baseline_length()
            if not any(length >= baseline_min_duration for length in cell_baseline_lengths):
                accepted = False
            # A cell without any baseline recording must not abort the whole
            # scan: max() of an empty sequence would raise ValueError.
            longest_baseline = max(cell_baseline_lengths, default=0)

            # test for recording quality
            if parser.get_quality().lower() not in accepted_qualities:
                print("bad quality")
                accepted = False

            # test for species
            if "lepto" not in parser.get_species().lower():
                accepted = False

            cell_type = parser.get_cell_type()
            if cell_type.lower() != "p-unit":
                print(cell_type)
                accepted = False

            # Test for enough trials and tested contrasts; the second entry of
            # each contrast row is compared against the required trial count.
            contrasts = parser.get_fi_curve_contrasts()
            usable_contrasts = sum(1 for c in contrasts if c[1] >= fi_curve_min_trials)
            if usable_contrasts < fi_curve_min_contrasts:
                accepted = False

            rows.append((accepted, cell_path, longest_baseline, usable_contrasts))

        except FileNotFoundError:
            # Reported only; such cells get no row and are not counted as errors.
            print("parser didn't work: File not found")
        except UnicodeDecodeError:
            print("parser didn't work: UnicodeError")
            count_errors += 1

    print("Error:", count_errors)

    with open("data/test_cells.tsv", 'w') as file:
        for row in rows:
            file.write("{}\t{}\t{}\t{}\n".format(*row))
def test_for_vector_strength():
    """Sort the cells of ``invivo_data/`` into folders by vector strength and burstiness.

    Entries whose name contains "thresh" are ignored. Every other cell
    directory is moved (``os.rename``) into exactly one target folder:

    * vector strength below 0.5  -> ``data/invivo_no_vs/``
    * otherwise burstiness > 0.2 -> ``data/invivo_bursty/``
    * otherwise                  -> ``data/invivo/``

    The burstiness is only queried when the vector strength test did not
    already decide. A summary of the three groups is printed at the end.
    """
    directory = "invivo_data/"

    bursty, p_units, no_vs = [], [], []

    for cell in os.listdir(directory):
        if "thresh" in cell:
            continue

        cell_path = os.path.join(directory, cell)
        print(cell_path)

        base = BaselineCellData(CellData(cell_path))

        # Pick the bookkeeping list and the destination folder in one place;
        # the move itself is identical for all three groups.
        if base.get_vector_strength() < 0.5:
            group, target_dir = no_vs, "data/invivo_no_vs/"
        elif base.get_burstiness() > 0.2:
            group, target_dir = bursty, "data/invivo_bursty/"
        else:
            group, target_dir = p_units, "data/invivo/"

        group.append(cell_path)
        os.rename(cell_path, target_dir + cell)

    print("done")

    print("bursty:", len(bursty), bursty)
    print("no vs:", len(no_vs), no_vs)
    print("p-units:", len(p_units), p_units)
def test_for_species():
    """Group the cells of ``invivo_data/`` by the species named in their ``info.dat``.

    Entries whose name contains "thresh" are ignored. For every metadata
    block yielded by the loader, the species is read from the top-level
    "Species" key if present, otherwise from ``["Subject"]["Species"]``, and
    the cell path is filed under that species. Afterwards the number of cells
    per species is printed, followed by the (currently never filled) list of
    error cells.
    """
    directory = "invivo_data/"

    cells_by_species = {}
    # Nothing appends to this list at the moment; it is still printed below.
    error_cells = []

    for name in os.listdir(directory):
        if "thresh" in name:
            continue

        cell_path = os.path.join(directory, name)
        info_file = os.path.join(cell_path, "info.dat")

        for metadata in Dl.load(info_file):
            species = (metadata["Species"] if "Species" in metadata.keys()
                       else metadata["Subject"]["Species"])
            cells_by_species.setdefault(species, []).append(cell_path)

    print("Errors:", len(error_cells))
    for species, paths in cells_by_species.items():
        print("{}: {}".format(species, len(paths)))

    print()
    print("errors:")
    for cell in error_cells:
        print(cell)
# print()
# print("eigen:")
# for cell in sorted_cells["Eigenmannia virescens"]:
# print(cell)
#
# print()
# print("albi:")
# for cell in sorted_cells["Apteronotus albifrons"]:
# print(cell)
# temp ssh mount command:
# sudo sshfs -o allow_other,default_permissions efish@kraken:/home/efish/ephys/invivo-1/intra/ /mnt/invivo_data/
def test_kraken_files():
    """Find the recordings on the mounted data server that are usable and list them.

    With the module flag ``read`` set, every directory below
    ``/mnt/invivo_data/`` is parsed. A cell is "good" if its longest baseline
    recording lasts at least ``baseline_min_duration`` and at least
    ``fi_curve_min_contrasts`` f-I contrasts have ``fi_curve_min_trials`` or
    more trials. The per-cell results are cached as ``.npy`` files under
    ``data_save_path``. Without the flag, the cached arrays are loaded instead.

    In both cases the paths of all accepted cells are written to
    ``test_routines/data_files.txt`` and the interpreter is then terminated
    with ``quit()``.
    """
    if read:
        directory = "/mnt/invivo_data/"
        fi_curve_min_contrasts = 7
        fi_curve_min_trials = 7
        baseline_min_duration = 30

        files = []
        baseline = []
        ficurve = []
        accepted = []
        count = 0

        for entry in os.listdir(directory):
            data_dir = os.path.join(directory, entry)
            if not os.path.isdir(data_dir):
                continue

            try:
                parser = DatParser(data_dir)
                print(data_dir)

                baseline_lengths = parser.get_baseline_length()
                # Guard the empty case: max() of an empty sequence raises
                # ValueError, which the handler below would not catch.
                baseline_good = (len(baseline_lengths) > 0
                                 and max(baseline_lengths) >= baseline_min_duration)

                contrasts = parser.get_fi_curve_contrasts()
                if len(contrasts) < fi_curve_min_contrasts:
                    fi_curve_good = False
                else:
                    # Column 1 is compared against the required trial count.
                    enough_trials = contrasts[:, 1] >= fi_curve_min_trials
                    fi_curve_good = np.count_nonzero(enough_trials) >= fi_curve_min_contrasts

                if fi_curve_good and baseline_good:
                    count += 1
                    print("good")
                    accepted.append(True)
                else:
                    print("bad")
                    accepted.append(False)
                files.append(data_dir)
                baseline.append(baseline_lengths)
                ficurve.append(contrasts)

            except RuntimeError:
                print(data_dir)
                print("bad")
                accepted.append(False)
                files.append(data_dir)
                baseline.append([])
                ficurve.append([])

        files = np.array(files)
        # The per-cell entries differ in length, so they have to be stored as
        # object arrays; plain np.array() rejects ragged input on current NumPy.
        baseline = _as_object_array(baseline)
        ficurve = _as_object_array(ficurve)
        accepted = np.array(accepted)

        np.save(data_save_path + "files", files)
        np.save(data_save_path + "baseline", baseline)
        np.save(data_save_path + "ficurve", ficurve)
        np.save(data_save_path + "accepted", accepted)
        print("Total good:", count)
    else:
        # NOTE(review): allow_pickle=True executes pickled content; only load
        # cache files that were written by this script.
        files = np.load(data_save_path + "files.npy", allow_pickle=True)
        baseline = np.load(data_save_path + "baseline.npy", allow_pickle=True)
        ficurve = np.load(data_save_path + "ficurve.npy", allow_pickle=True)
        accepted = np.load(data_save_path + "accepted.npy", allow_pickle=True)

    print(np.sum(accepted))
    with open("test_routines/data_files.txt", "w") as file:
        for path, is_accepted in zip(files, accepted):
            if is_accepted:
                file.write(path + "\n")

    # NOTE(review): quit() ends the program here, so the analysis below is
    # currently unreachable. It is kept unchanged in behaviour so that it can
    # be re-enabled by removing this call.
    quit()

    min_contrasts = 7
    min_trials = 7
    min_baseline = 30
    print("min_baseline: {:}, min_contrasts: {:}, min_trials: {:}".format(min_baseline, min_contrasts, min_trials))

    # Number of contrasts with enough trials, for each cell whose baseline is long enough.
    ints_with_enough_trials = []
    for i, contrasts in enumerate(ficurve):
        if len(baseline[i]) <= 0 or max(baseline[i]) < min_baseline:
            continue
        if len(contrasts) == 0:
            continue
        ints_with_enough_trials.append(
            sum(1 for intensity in contrasts if intensity[1] >= min_trials))

    bins = np.arange(0.5, 20.5, 1)
    points = plt.hist(ints_with_enough_trials, bins=bins)
    print(sum(points[0][min_contrasts-1:]))

    # Apply the same criteria to the local test cells.
    count = 0
    all_cells = 0
    for cell_data in icelldata_of_dir("data/", False):
        all_cells += 1
        if max(cell_data.get_baseline_length()) < min_baseline:
            continue

        contrasts = cell_data.get_fi_curve_contrasts_with_trial_number()
        c_count = sum(1 for c in contrasts if c[1] >= min_trials)
        if c_count < min_contrasts:
            continue

        count += 1

    print("Fullfilled by {:} of {:} test cells".format(count, all_cells))


def _as_object_array(items):
    """Pack *items* into a 1-D object array holding one element per item."""
    packed = np.empty(len(items), dtype=object)
    # Element-wise assignment keeps each item intact, even when all items
    # happen to have the same length.
    for index, item in enumerate(items):
        packed[index] = item
    return packed
# Run the selected check only when this file is executed as a script.
if __name__ == "__main__":
    main()