379 lines
11 KiB
Python
379 lines
11 KiB
Python
|
|
from parser.CellData import icelldata_of_dir, CellData
|
|
from parser.DataParserFactory import DatParser
|
|
import numpy as np
|
|
import os
|
|
import matplotlib.pyplot as plt
|
|
import pyrelacs.DataLoader as Dl
|
|
from experiments.Baseline import BaselineCellData
|
|
from experiments.FiCurve import FICurveCellData
|
|
|
|
# Directory where test_kraken_files() caches its intermediate numpy arrays.
data_save_path = "test_routines/test_files/"
# Toggle for test_kraken_files(): True re-scans the mounted data directory,
# False loads the previously cached .npy files from data_save_path.
read = False
|
|
|
|
|
|
def main():
    """Entry point: run whichever one-off curation routine is currently enabled.

    The commented-out calls are alternative routines that are toggled by hand
    between runs; exactly one is normally active at a time.
    """
    # test_kraken_files()
    # test_for_species()
    # test_for_vector_strength()

    # test_cells()

    read_test_cells_tsv()
    # inspect_fi_curve()
|
|
|
|
|
|
def inspect_fi_curve():
    """Plot the FI curve of every cell in data/final/.

    Cells are visited in sorted order; everything sorted before the hard-coded
    start cell is skipped, so the inspection can resume mid-way through the
    data set.
    """
    data = "data/final/"
    reached_start = False
    for cell_name in sorted(os.listdir(data)):
        if cell_name == "2015-01-15-ab-invivo-1":
            reached_start = True
        if not reached_start:
            continue
        path = data + cell_name

        cell = CellData(path)
        print(path)
        fi_curve = FICurveCellData(cell, cell.get_fi_contrasts())

        fi_curve.plot_fi_curve()
|
|
|
|
|
|
def read_test_cells_tsv():
    """Cross-check data/test_cells.tsv against the cells present on disk.

    Counts accepted/rejected rows of the tsv, and for each accepted cell
    reports whether it is already in data/final/, already moved to
    rejected_cells/, or missing entirely (printed sorted at the end).
    """
    file_path = "data/test_cells.tsv"
    data_path = "data/final/"
    rejected = os.listdir("rejected_cells/")
    cells = os.listdir(data_path)

    count_rejected = 0
    count_accepted = 0
    missing_cells = []

    with open(file_path, 'r') as file:
        for line in file:
            parts = line.strip().split('\t')

            # first column is the acceptance flag written by test_cells()
            if parts[0] != "True":
                count_rejected += 1
                continue

            count_accepted += 1
            cell_name = parts[1].split("/")[-1]
            if cell_name in cells:
                continue
            if cell_name not in rejected:
                missing_cells.append(cell_name)
            else:
                print("already thrown:", cell_name)

    print("accepted:", count_accepted)
    print("rejected:", count_rejected)
    print("Total:", count_rejected + count_accepted)

    for c in sorted(missing_cells):
        print(c)
|
|
|
|
|
|
def test_cells():
    """Scan every recording directory under /mnt/invivo_data/ and write
    data/test_cells.tsv with one row per parseable cell:
    accepted-flag, path, longest baseline length, usable contrast count.

    Acceptance requires: a long-enough baseline recording, quality in
    {good, bursty, fair}, species containing "lepto", cell type "p-unit",
    and at least fi_curve_min_contrasts contrasts with enough trials.
    """
    directory = "/mnt/invivo_data/"

    # acceptance thresholds
    fi_curve_min_contrasts = 7
    fi_curve_min_trials = 7

    # presumably seconds (i.e. ~30 s of baseline) — TODO confirm units
    baseline_min_duration = 29.9

    # parallel per-cell result lists, written out as tsv columns below
    accepted_ls = []
    files = []
    baseline_lengths = []
    ficurve_contrasts = []
    count_errors = 0
    for cell in os.listdir(directory):
        accepted = True
        cell_path = os.path.join(directory, cell)
        if not os.path.isdir(cell_path):
            continue

        # a directory without info.dat cannot be parsed at all
        if not os.path.exists(cell_path + "/info.dat"):
            count_errors += 1
            continue
        print(cell_path)
        try:
            parser = DatParser(cell_path)

            # Test length of baseline recording
            cell_baseline_lengths = parser.get_baseline_length()
            long_enough = False
            for baseline_len in cell_baseline_lengths:
                if baseline_len >= baseline_min_duration:
                    long_enough = True
                    break

            if not long_enough:
                accepted = False

            # test for recording quality
            quality = parser.get_quality()
            if quality.lower() not in ["good", "bursty", "fair"]:
                print("bad quality")
                accepted = False

            # test for species
            if "lepto" not in parser.get_species().lower():
                accepted = False

            if not parser.get_cell_type().lower() == "p-unit":
                print(parser.get_cell_type())
                accepted = False
            # Test for enough trials and tested contrasts
            contrasts = parser.get_fi_curve_contrasts()
            count = 0
            # c is presumably a (contrast, n_trials) pair — TODO confirm
            for c in contrasts:
                if c[1] >= fi_curve_min_trials:
                    count += 1

            if count < fi_curve_min_contrasts:
                accepted = False

            accepted_ls.append(accepted)
            files.append(cell_path)
            baseline_lengths.append(max(cell_baseline_lengths))
            ficurve_contrasts.append(count)

        except FileNotFoundError as e:
            # NOTE(review): unlike the UnicodeDecodeError branch this one
            # does not increment count_errors — the bookkeeping here was
            # deliberately commented out.
            print("parser didn't work: File not found")
            # accepted_ls.append(False)
            # files.append(cell_path)
            # baseline_lengths.append(-1)
            # ficurve_contrasts.append(-1)
            # count_errors += 1
        except UnicodeDecodeError as e:
            print("parser didn't work: UnicodeError")
            # accepted_ls.append(False)
            # files.append(cell_path)
            # baseline_lengths.append(-1)
            # ficurve_contrasts.append(-1)
            count_errors += 1
    print("Error:", count_errors)

    # dump the parallel lists as tab-separated rows
    with open("data/test_cells.tsv", 'w') as file:

        for i in range(len(accepted_ls)):
            file.write("{}\t{}\t{}\t{}\n".format(accepted_ls[i], files[i], baseline_lengths[i], ficurve_contrasts[i]))
|
|
|
|
|
|
|
|
|
|
|
|
def test_for_vector_strength():
    """Sort the cells in invivo_data/ into three destination folders.

    Based on the baseline analysis, each cell directory is moved on disk:
    vector strength < 0.5 -> data/invivo_no_vs/, otherwise burstiness > 0.2
    -> data/invivo_bursty/, otherwise -> data/invivo/. A summary of the
    three groups is printed at the end.
    """
    directory = "invivo_data/"
    bursty = []
    p_units = []
    no_vs = []

    for cell in os.listdir(directory):
        if "thresh" in cell:
            continue
        cell_path = os.path.join(directory, cell)
        print(cell_path)

        baseline = BaselineCellData(CellData(cell_path))

        if baseline.get_vector_strength() < 0.5:
            no_vs.append(cell_path)
            os.rename(cell_path, "data/invivo_no_vs/" + cell)
        elif baseline.get_burstiness() > 0.2:
            # vs > 0.5
            bursty.append(cell_path)
            os.rename(cell_path, "data/invivo_bursty/" + cell)
        else:
            # vs > 0.5 and not bursty
            p_units.append(cell_path)
            os.rename(cell_path, "data/invivo/" + cell)

    print("done")
    print("bursty:", len(bursty), bursty)
    print("no vs:", len(no_vs), no_vs)
    print("p-units:", len(p_units), p_units)
|
|
|
|
|
|
def test_for_species():
    """Group the cells in invivo_data/ by the species stored in their
    info.dat metadata and print a per-species count.

    Directories whose name contains "thresh" are skipped. Depending on the
    metadata layout the species is found either top-level under "Species"
    or nested under "Subject" -> "Species".
    """
    directory = "invivo_data/"
    sorted_cells = {}
    # NOTE(review): error_cells is printed below but never populated anywhere
    # in this function; kept so the printed output stays unchanged.
    error_cells = []
    for cell in os.listdir(directory):
        if "thresh" in cell:
            continue
        cell_path = os.path.join(directory, cell)
        info_file = os.path.join(cell_path, "info.dat")

        for metadata in Dl.load(info_file):
            if "Species" in metadata:
                species = metadata["Species"]
            else:
                species = metadata["Subject"]["Species"]
            # setdefault replaces the manual "init empty list on first
            # sight" dance from the original
            sorted_cells.setdefault(species, []).append(cell_path)

    print("Errors:", len(error_cells))
    for species in sorted_cells:
        print("{}: {}".format(species, len(sorted_cells[species])))

    print()
    print("errors:")
    for cell in error_cells:
        print(cell)
|
|
|
|
|
|
# temp ssh mount command:
|
|
# sudo sshfs -o allow_other,default_permissions efish@kraken:/home/efish/ephys/invivo-1/intra/ /mnt/invivo_data/
|
|
|
|
def test_kraken_files():
    """Scan (or reload a cached scan of) the kraken-mounted data and report
    how many cells satisfy the baseline-length and FI-curve criteria.

    Controlled by the module-level flag `read`: True re-parses everything
    under /mnt/invivo_data/ and caches numpy arrays in data_save_path;
    False reloads those caches. Accepted paths are written to
    test_routines/data_files.txt, after which the function quit()s —
    everything below the quit() is an older analysis kept for reference.
    """
    if read:
        directory = "/mnt/invivo_data/"
        fi_curve_min_contrasts = 7
        fi_curve_min_trials = 7

        # presumably seconds of baseline recording — TODO confirm units
        baseline_min_duration = 30
        # parallel per-cell lists, saved as .npy below
        files = []
        baseline = []
        ficurve = []
        accepted = []
        count = 0
        for data_dir in os.listdir(directory):
            data_dir = os.path.join(directory, data_dir)
            if not os.path.isdir(data_dir):
                continue
            try:
                parser = DatParser(data_dir)

                print(data_dir)
                baseline_lengths = parser.get_baseline_length()
                baseline_good = max(baseline_lengths) >= baseline_min_duration
                # contrasts is indexed [:, 0]/[:, 1] below, so it is a 2-column
                # array — presumably (contrast, n_trials) rows; TODO confirm
                contrasts = parser.get_fi_curve_contrasts()
                if len(contrasts) < fi_curve_min_contrasts:
                    fi_curve_good = False
                else:
                    # keep only contrasts that were repeated often enough
                    intensities_with_enough_trials = contrasts[:, 0][contrasts[:, 1] >= fi_curve_min_trials]

                    fi_curve_good = len(intensities_with_enough_trials) >= fi_curve_min_contrasts

                if fi_curve_good and baseline_good:
                    count += 1
                    print("good")
                    accepted.append(True)
                else:
                    print("bad")
                    accepted.append(False)
                files.append(data_dir)
                baseline.append(baseline_lengths)
                ficurve.append(contrasts)
            except RuntimeError as e:
                # unparseable cell: record it as rejected with empty data so
                # the four lists stay aligned
                print(data_dir)
                print("bad")
                accepted.append(False)
                files.append(data_dir)
                baseline.append([])
                ficurve.append([])

        files = np.array(files)
        baseline = np.array(baseline)
        ficurve = np.array(ficurve)
        accepted = np.array(accepted)

        # cache the scan so later runs can set read = False
        np.save(data_save_path + "files", files)
        np.save(data_save_path + "baseline", baseline)
        np.save(data_save_path + "ficurve", ficurve)
        np.save(data_save_path + "accepted", accepted)
        print("Total good:", count)

    else:
        # allow_pickle is required because baseline/ficurve hold ragged
        # object arrays
        files = np.load(data_save_path + "files.npy", allow_pickle=True)
        baseline = np.load(data_save_path + "baseline.npy", allow_pickle=True)
        ficurve = np.load(data_save_path + "ficurve.npy", allow_pickle=True)
        accepted = np.load(data_save_path + "accepted.npy", allow_pickle=True)

    print(np.sum(accepted))
    with open("test_routines/data_files.txt", "w") as file:

        for i in range(len(files)):
            if accepted[i]:
                file.write(files[i] + "\n")

    quit()

    # ---- everything below is unreachable (dead code after quit()) ----
    min_contrasts = 7
    min_trials = 7
    min_baseline = 30
    print("min_baseline: {:}, min_contrasts: {:}, min_trials: {:}".format(min_baseline, min_contrasts, min_trials))
    # bins = np.arange(0, 100, 1)
    # plt.hist([max(x) for x in baseline if len(x) > 0], bins=bins)
    # plt.show()
    # plt.close()
    good_cells = []
    ints_with_enough_trials = []
    for i, contrasts in enumerate(ficurve):
        if len(baseline[i]) <= 0 or max(baseline[i]) < min_baseline:
            continue
        count = 0
        if len(contrasts) == 0:
            continue

        for intensity in contrasts:
            if intensity[1] >= min_trials:
                count += 1
        ints_with_enough_trials.append(count)

    bins = np.arange(0.5, 20.5, 1)
    points = plt.hist(ints_with_enough_trials, bins=bins)
    # sum the histogram counts at or above the contrast threshold
    print(sum(points[0][min_contrasts-1:]))
    #plt.show()
    #plt.close()

    count = 0
    all_cells = 0
    for cell_data in icelldata_of_dir("data/", False):
        all_cells += 1
        if max(cell_data.get_baseline_length()) < min_baseline:
            continue

        contrasts = cell_data.get_fi_curve_contrasts_with_trial_number()
        c_count = 0
        for c in contrasts:
            if c[1] >= min_trials:
                c_count += 1

        if c_count < min_contrasts:
            continue

        count += 1

    print("Fullfilled by {:} of {:} test cells".format(count, all_cells))
|
|
|
|
|
|
# Script entry point: run the currently enabled routine from main().
if __name__ == '__main__':
    main()
|