diff --git a/AdaptionCurrent.py b/AdaptionCurrent.py index f94b215..944425f 100644 --- a/AdaptionCurrent.py +++ b/AdaptionCurrent.py @@ -24,13 +24,14 @@ class Adaption: self.fit_exponential() self.calculate_tau_from_tau_eff() - def fit_exponential(self, length_of_fit=0.05): + def fit_exponential(self, length_of_fit=0.1): mean_frequencies = self.cell_data.get_mean_isi_frequencies() time_axes = self.cell_data.get_time_axes_mean_frequencies() for i in range(len(mean_frequencies)): start_idx = self.__find_start_idx_for_exponential_fit(i) if start_idx == -1: + print("start index negative") self.exponential_fit_vars.append([]) continue @@ -60,6 +61,7 @@ class Adaption: # Obviously a bad fit - time constant, expected in range 3-10ms, has value over 1 second or is negative if abs(popt[1] > 1) or popt[1] < 0: + print("detected an obviously bad fit") self.exponential_fit_vars.append([]) else: self.exponential_fit_vars.append(popt) @@ -81,7 +83,8 @@ class Adaption: return tau def __find_start_idx_for_exponential_fit(self, mean_freq_idx): - stimulus_start_idx = int((self.cell_data.get_delay() + self.cell_data.get_stimulus_start()) / self.cell_data.get_sampling_interval()) + time_axes = self.cell_data.get_time_axes_mean_frequencies()[mean_freq_idx] + stimulus_start_idx = int((self.cell_data.get_stimulus_start() + time_axes[0]) / self.cell_data.get_sampling_interval()) if self.fi_curve.f_infinities[mean_freq_idx] > self.fi_curve.f_baselines[mean_freq_idx] * 1.1: # start setting starting variables for the fit # search for the start_index by searching for the max @@ -124,24 +127,32 @@ class Adaption: return start_idx def calculate_tau_from_tau_eff(self): - taus = [] + tau_effs = [] for i in range(len(self.exponential_fit_vars)): if len(self.exponential_fit_vars[i]) == 0: continue - tau_eff = self.exponential_fit_vars[i][1]*1000 # tau_eff in ms - # intensity = self.fi_curve.stimulus_value[i] - f_infinity_slope = self.fi_curve.get_f_infinity_slope() - fi_curve_slope = self.fi_curve.get_fi_curve_slope_of_straight() + tau_effs.append(self.exponential_fit_vars[i][1]) - taus.append(tau_eff*(fi_curve_slope/f_infinity_slope)) - # print((fi_curve_slope/f_infinity_slope)) - # print(tau_eff*(fi_curve_slope/f_infinity_slope), "=", tau_eff, "*", (fi_curve_slope/f_infinity_slope)) + f_infinity_slope = self.fi_curve.get_f_infinity_slope() + # --- old way to calculate with the fi slope at middle of the fi curve + # fi_curve_slope = self.fi_curve.get_fi_curve_slope_of_straight() + # self.tau_real = np.median(tau_effs) * (fi_curve_slope / f_infinity_slope) - self.tau_real = np.median(taus) + # print("fi_slope to f_inf slope:", fi_curve_slope/f_infinity_slope) + # print("fi_slope:", fi_curve_slope, "f_inf slope:", f_infinity_slope) + # print("current tau: {:.1f}ms".format(np.median(tau_effs) * (fi_curve_slope / f_infinity_slope) * 1000)) + + # new way to calculate with the fi curve slope at the intersection point of it and the f_inf line + factor = self.fi_curve.get_fi_curve_slope_at_f_zero_intersection() / f_infinity_slope + self.tau_real = np.median(tau_effs) * factor + print("###### tau: {:.1f}ms".format(self.tau_real*1000), "other f_0 slope:", self.fi_curve.get_fi_curve_slope_at_f_zero_intersection()) def get_tau_real(self): return np.median(self.tau_real) + def get_tau_effs(self): + return [ex_vars[1] for ex_vars in self.exponential_fit_vars if ex_vars != []] + def plot_exponential_fits(self, save_path: str = None, indices: list = None, delete_previous: bool = False): if delete_previous: for val in self.cell_data.get_fi_contrasts(): @@ -152,7 +163,11 @@ class Adaption: os.remove(prev_path) for i in range(len(self.cell_data.get_fi_contrasts())): + if indices is not None and i not in indices: + continue + if self.exponential_fit_vars[i] == []: + print("no fit vars for index!") continue plt.plot(self.cell_data.get_time_axes_mean_frequencies()[i], self.cell_data.get_mean_isi_frequencies()[i]) diff --git a/CellData.py b/CellData.py index ad0d165..6fe085e 100644 --- a/CellData.py +++ b/CellData.py @@ -144,7 +144,7 @@ class CellData: times = self.get_base_traces(self.TIME) eods = self.get_base_traces(self.EOD) v1_traces = self.get_base_traces(self.V1) - return hf.calculate_vector_strength(times, eods, v1_traces) + return hf.calculate_vector_strength_from_v1_trace(times, eods, v1_traces) def get_serial_correlation(self, max_lag): serial_cors = [] diff --git a/FiCurve.py b/FiCurve.py index 74a1c6c..e81cc76 100644 --- a/FiCurve.py +++ b/FiCurve.py @@ -42,7 +42,6 @@ class FICurve: for i in range(len(mean_frequencies)): if time_axes[i][0] > self.cell_data.get_stimulus_start(): - # TODO warn("TODO: Deal with to strongly cut frequency traces in cell data! ") self.f_zeros.append(-1) self.f_baselines.append(-1) @@ -83,7 +82,6 @@ class FICurve: end_idx = int((stim_start-buffer)/sampling_interval) f_baseline = np.mean(frequency[start_idx:end_idx]) - plt.plot((start_idx, end_idx), (f_baseline, f_baseline), label="f_baseline") return f_baseline def __calculate_f_zero__(self, time, frequency, peak_buffer_percent=0.05, buffer=0.025): @@ -113,9 +111,6 @@ class FICurve: end_idx = start_idx + int((end_idx-start_idx)/2) f_zero = np.mean(frequency[start_idx:end_idx]) - plt.plot(frequency) - plt.plot((start_idx, end_idx), (f_zero, f_zero), label="f_zero, {:.2f}".format(peak_buffer)) - return f_zero # start_idx = int(stimulus_start / sampling_interval) @@ -143,18 +138,18 @@ class FICurve: start_idx = int((stimulus_end_time - length - buffer) / self.cell_data.get_sampling_interval()) end_idx = int((stimulus_end_time - buffer) / self.cell_data.get_sampling_interval()) - x = np.arange(start_idx, end_idx, 1) # time[start_idx:end_idx] - slope, intercept, r_value, p_value, std_err = linregress(x, frequency[start_idx:end_idx]) - - if p_value < 0.0001: - plt.title("significant slope: {:.2f}, p: {:.5f}, r: {:.5f}".format(slope, p_value, r_value)) - plt.plot(x, [i*slope + intercept for i in x], color="black") - - - plt.plot((start_idx, end_idx), (np.mean(frequency[start_idx:end_idx]), np.mean(frequency[start_idx:end_idx])), label="f_inf") - plt.legend() - plt.show() - plt.close() + # TODO add way to plot detected f_zero, f_inf, f_base. With detection of remaining slope? + # x = np.arange(start_idx, end_idx, 1) # time[start_idx:end_idx] + # slope, intercept, r_value, p_value, std_err = linregress(x, frequency[start_idx:end_idx]) + # if p_value < 0.0001: + # plt.title("significant slope: {:.2f}, p: {:.5f}, r: {:.5f}".format(slope, p_value, r_value)) + # plt.plot(x, [i*slope + intercept for i in x], color="black") + # + # + # plt.plot((start_idx, end_idx), (np.mean(frequency[start_idx:end_idx]), np.mean(frequency[start_idx:end_idx])), label="f_inf") + # plt.legend() + # plt.show() + # plt.close() return np.mean(frequency[start_idx:end_idx]) @@ -177,6 +172,36 @@ class FICurve: fit_vars = self.boltzmann_fit_vars return fu.full_boltzmann_straight_slope(fit_vars[0], fit_vars[1], fit_vars[2], fit_vars[3]) + def get_f_zero_and_f_inf_intersection(self): + x_values = np.arange(min(self.stimulus_value), max(self.stimulus_value), 0.0001) + fit_vars = self.boltzmann_fit_vars + f_zero = fu.full_boltzmann(x_values, fit_vars[0], fit_vars[1], fit_vars[2], fit_vars[3]) + f_inf = fu.clipped_line(x_values, self.f_infinity_fit[0], self.f_infinity_fit[1]) + + intersection_indicies = np.argwhere(np.diff(np.sign(f_zero - f_inf))).flatten() + # print("fi-curve calc intersection:", intersection_indicies, x_values[intersection_indicies]) + if len(intersection_indicies) > 1: + f_baseline = np.median(self.f_baselines) + best_dist = np.inf + best_idx = -1 + for idx in intersection_indicies: + dist = abs(fu.clipped_line(x_values[idx], self.f_infinity_fit[0], self.f_infinity_fit[1]) - f_baseline) + if dist < best_dist: + best_dist = dist + best_idx = idx + + return x_values[best_idx] + + elif len(intersection_indicies) == 0: + raise ValueError("No intersection found!") + else: + return x_values[intersection_indicies[0]] + + def get_fi_curve_slope_at_f_zero_intersection(self): + x = self.get_f_zero_and_f_inf_intersection() + fit_vars = self.boltzmann_fit_vars + return fu.derivative_full_boltzmann(x, fit_vars[0], fit_vars[1], fit_vars[2], fit_vars[3]) + def plot_fi_curve(self, savepath: str = None): min_x = min(self.stimulus_value) max_x = max(self.stimulus_value) @@ -210,7 +235,6 @@ class FICurve: mean_frequencies = np.array(self.cell_data.get_mean_isi_frequencies()) time_axes = self.cell_data.get_time_axes_mean_frequencies() - for i in range(len(mean_frequencies)): fig, axes = plt.subplots(1, 1, sharex="all") axes.plot(time_axes[i], mean_frequencies[i], label="voltage") diff --git a/RelationAdaptionVariables.py b/RelationAdaptionVariables.py index 55fc47f..b157cba 100644 --- a/RelationAdaptionVariables.py +++ b/RelationAdaptionVariables.py @@ -1,6 +1,6 @@ from models.FirerateModel import FirerateModel -from models.LIFAC import LIFACModel +from models.LIFACnoise import LifacNoiseModel from stimuli.StepStimulus import StepStimulus import helperFunctions as hf import numpy as np @@ -13,7 +13,7 @@ def main(): values = [1] # np.arange(5, 40, 1) parameter = "currently not active" for value in values: - lifac_model = LIFACModel({"delta_a": 0}) + lifac_model = LifacNoiseModel({"delta_a": 0}) # lifac_model.set_variable(parameter, value) stimulus_strengths = np.arange(50, 60, 1) @@ -36,9 +36,12 @@ def find_fitting_line(lifac_model, stimulus_strengths): if len(spiketimes) == 0: frequencies.append(0) continue - time, freq = hf.calculate_isi_frequency_trace(spiketimes, 0, lifac_model.get_sampling_interval() / 1000) + time, freq = hf.calculate_time_and_frequency_trace(spiketimes, lifac_model.get_sampling_interval()) - frequencies.append(freq[-1]) + if len(freq) == 0: + frequencies.append(0) + else: + frequencies.append(freq[-1]) popt, pcov = curve_fit(fu.line, stimulus_strengths, frequencies) print("line:", popt) @@ -72,10 +75,14 @@ def find_relation(lifac, line_vars, stimulus_strengths, parameter="", value=0, c stimulus = StepStimulus(0, duration, stim) lifac.simulate(stimulus, duration) spiketimes = lifac.get_spiketimes() - time, freq = hf.calculate_isi_frequency_trace(spiketimes, 0, lifac.get_sampling_interval() / 1000) - - adapted_frequencies.append(freq[-1]) - goal_adapted_freq = freq[-1] + time, freq = hf.calculate_time_and_frequency_trace(spiketimes, lifac.get_sampling_interval()) + + if len(freq) == 0: + adapted_frequencies.append(0) + goal_adapted_freq = 0 + else: + adapted_frequencies.append(freq[-1]) + goal_adapted_freq = freq[-1] # assume fitted linear firing rate as basis of the fire-rate model: stimulus_strength_after_adaption = fu.inverse_line(goal_adapted_freq, line_vars[0], line_vars[1]) diff --git a/fit_lifacnoise.py b/fit_lifacnoise.py index 21e4e19..7f080ed 100644 --- a/fit_lifacnoise.py +++ b/fit_lifacnoise.py @@ -2,7 +2,7 @@ from models.LIFACnoise import LifacNoiseModel from CellData import CellData, icelldata_of_dir from FiCurve import FICurve from AdaptionCurrent import Adaption -from stimuli.SinusAmplitudeModulation import SinusAmplitudeModulationStimulus +from stimuli.SinusoidalStepStimulus import SinusoidalStepStimulus import helperFunctions as hF import numpy as np from scipy.optimize import curve_fit, minimize @@ -15,23 +15,39 @@ def main(): # run_test_with_fixed_model() # quit() - fitter = Fitter() - fmin, params = fitter.fit_model_to_values(700, 1400, [-0.3], 1, [0.7, 0.8, 0.9, 1, 1.1, 1.2, 1.3], [1370, 1380, 1390, 1400, 1410, 1420, 1430], 100, 0.02, 0.01) + # fitter = Fitter() + # fmin, params = fitter.fit_model_to_values(700, 1400, [-0.3], 1, [0.7, 0.8, 0.9, 1, 1.1, 1.2, 1.3], [1370, 1380, 1390, 1400, 1410, 1420, 1430], 100, 0.02, 0.01) + # print("calculated parameters:") + # print(params) + run_with_real_data() - print("calculated parameters:") - print(params) def run_with_real_data(): for celldata in icelldata_of_dir("./data/"): start_time = time.time() - fitter = Fitter(celldata) - fmin, parameters = fitter.fit_model_to_data() + fitter = Fitter() + fmin, parameters = fitter.fit_model_to_data(celldata) print(fmin) print(parameters) end_time = time.time() print('Fitting of cell took function took {:.3f} s'.format((end_time - start_time))) + res_model = LifacNoiseModel(parameters) + m_bf, m_vs, m_sc = res_model.calculate_baseline_markers(celldata.get_base_frequency()) + m_f_values, m_f_slope = res_model.calculate_fi_markers(celldata.get_fi_contrasts(), celldata.get_base_frequency(), 10) + + c_bf = celldata.get_base_frequency() + c_vs = celldata.get_vector_strength() + c_sc = celldata.get_serial_correlation(1) + fi_curve = FICurve(celldata) + c_f_slope = fi_curve.get_f_infinity_slope() + c_f_values = fi_curve.f_infinities + print("bf: cell - {:.2f} vs model {:.2f}".format(c_bf, m_bf)) + print("vs: cell - {:.2f} vs model {:.2f}".format(c_vs, m_vs)) + print("sc: cell - {:.2f} vs model {:.2f}".format(c_sc[0], m_sc[0])) + print("f_slope: cell - {:.2f} vs model {:.2f}".format(c_f_slope, m_f_slope)) + print("f values:\n cell -", c_f_values, "\n model -", m_f_values) pass @@ -97,46 +113,53 @@ class Fitter: fi_curve = FICurve(data, contrast=True) self.fi_contrasts = fi_curve.stimulus_value + print("Fitter: fi-contrasts", self.fi_contrasts) self.f_infinities = fi_curve.f_infinities self.f_infinities_slope = fi_curve.get_f_infinity_slope() f_zero_slope = fi_curve.get_fi_curve_slope_of_straight() - self.a_delta = f_zero_slope / self.f_infinities_slope + self.a_delta = (f_zero_slope / self.f_infinities_slope) adaption = Adaption(data, fi_curve) self.a_tau = adaption.get_tau_real() # mem_tau, (threshold?), (v_offset), noise_strength, input_scaling - def cost_function(self, X, tau_a=10, delta_a=3, error_scaling=()): - freq_sampling_rate = 0.005 + def cost_function(self, X, tau_a=0.001, delta_a=3, error_scaling=()): # set model parameters to the given ones: self.model.set_variable("mem_tau", X[0]) self.model.set_variable("noise_strength", X[1]) self.model.set_variable("input_scaling", X[2]) - self.model.set_variable("tau_a", tau_a) - self.model.set_variable("delta_a", delta_a) + self.model.set_variable("tau_a", X[3]) + self.model.set_variable("delta_a", X[4]) + # self.model.set_variable("tau_a", tau_a) + # self.model.set_variable("delta_a", delta_a) # minimize the difference in baseline_freq first by fitting v_offset # v_offset = self.__fit_v_offset_to_baseline_frequency__() - base_stimulus = SinusAmplitudeModulationStimulus(self.eod_freq, 0, 0) + base_stimulus = SinusoidalStepStimulus(self.eod_freq, 0) v_offset = self.model.find_v_offset(self.baseline_freq, base_stimulus) self.model.set_variable("v_offset", v_offset) + baseline_freq, vector_strength, serial_correlation = self.model.calculate_baseline_markers(self.baseline_freq, self.sc_max_lag) # only eod with amplitude 1 and no modulation - _, spiketimes = self.model.simulate_fast(base_stimulus, 30) - - baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 5) - # print("model:", baseline_freq, "data:", self.baseline_freq) - - relative_spiketimes = np.array([s % (1/self.eod_freq) for s in spiketimes if s > 0]) - eod_durations = np.full((len(relative_spiketimes)), 1/self.eod_freq) - vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations) - serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), self.sc_max_lag) + # _, spiketimes = self.model.simulate_fast(base_stimulus, 30) + + # baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 5) + # # print("model:", baseline_freq, "data:", self.baseline_freq) + # + # if len(spiketimes) > 10: + # relative_spiketimes = np.array([s % (1/self.eod_freq) for s in spiketimes if s > 0]) + # eod_durations = np.full((len(relative_spiketimes)), 1/self.eod_freq) + # vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations) + # serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), self.sc_max_lag) + # else: + # vector_strength = 0 + # serial_correlation = [0]*self.sc_max_lag f_infinities = [] for contrast in self.fi_contrasts: - stimulus = SinusAmplitudeModulationStimulus(self.eod_freq, contrast, self.modulation_frequency) + stimulus = SinusoidalStepStimulus(self.eod_freq, contrast) _, spiketimes = self.model.simulate_fast(stimulus, 1) if len(spiketimes) < 2: @@ -148,7 +171,8 @@ class Fitter: popt, pcov = curve_fit(fu.line, self.fi_contrasts, f_infinities, maxfev=10000) f_infinities_slope = popt[0] - + if self.counter == 600: + bf, vs, sc = self.model.calculate_baseline_markers(self.eod_freq) error_bf = abs((baseline_freq - self.baseline_freq) / self.baseline_freq) error_vs = abs((vector_strength - self.vector_strength) / self.vector_strength) error_sc = abs((serial_correlation[0] - self.serial_correlation[0]) / self.serial_correlation[0]) @@ -161,14 +185,18 @@ class Fitter: error_f_inf += abs((f_infinities[i] - self.f_infinities[i]) / f_infinities[i]) error_f_inf = error_f_inf / len(f_infinities) - self.counter += 1 + # print("mem_tau:", X[0], "noise:", X[0], "input_scaling:", X[2]) errors = [error_bf, error_vs, error_sc, error_f_inf_slope, error_f_inf] - print("Cost function run times:", self.counter, "error sum:", sum(errors), errors) + + self.counter += 1 + if self.counter % 10 == 0: + print("Cost function run times:", self.counter, "error sum:", sum(errors), errors) return error_bf + error_vs + error_sc + error_f_inf_slope + error_f_inf def fit_model_to_data(self, data: CellData): self.calculate_needed_values_from_data(data) + self.counter = 0 return self.fit_model() def fit_model_to_values(self, eod_freq, baseline_freq, sc, vs, fi_contrasts, fi_inf_values, fi_inf_slope, a_delta, a_tau): @@ -185,9 +213,9 @@ class Fitter: return self.fit_model() def fit_model(self): - x0 = np.array([20, 15, 75]) - init_simplex = np.array([np.array([2, 1, 10]), np.array([40, 100, 140]), np.array([20, 50, 70]), np.array([150, 1, 200])]) - fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="Nelder-Mead", options={"initial_simplex": init_simplex}) + x0 = np.array([0.02, 3, 75, 0.05, 20]) + init_simplex = np.array([np.array([2, 1, 10]), np.array([40, 10, 140]), np.array([20, 20, 70]), np.array([150, 1, 200])]) + fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="Nelder-Mead") #, options={"initial_simplex": init_simplex}) return fmin, self.model.get_parameters() diff --git a/generalTests.py b/generalTests.py index 8815002..6139142 100644 --- a/generalTests.py +++ b/generalTests.py @@ -11,6 +11,9 @@ from thunderfish.eventdetection import detect_peaks from stimuli.SinusAmplitudeModulation import SinusAmplitudeModulationStimulus from models.LIFACnoise import LifacNoiseModel from FiCurve import FICurve +from AdaptionCurrent import Adaption +from stimuli.StepStimulus import StepStimulus +from stimuli.SinusoidalStepStimulus import SinusoidalStepStimulus def time_test_function(): @@ -101,6 +104,7 @@ def test_simulation_speed(): def test_fi_curve_class(): for cell_data in icelldata_of_dir("./data/"): fi_curve = FICurve(cell_data) + fi_curve.get_f_zero_and_f_inf_intersection() fi_curve.plot_fi_curve() # fi_curve.plot_f_point_detections() @@ -108,6 +112,20 @@ def test_fi_curve_class(): pass +def test_adaption_class(): + for cell_data in icelldata_of_dir("./data/"): + print() + print(cell_data.get_data_path()) + fi_curve = FICurve(cell_data) + adaption = Adaption(cell_data, fi_curve) + + adaption.plot_exponential_fits() + + print("tau_effs:", adaption.get_tau_effs()) + print("tau_real:", adaption.get_tau_real()) + fi_curve.plot_fi_curve() + + def test_parameters(): parameters = {'mem_tau': 21., 'delta_a': 0.1, 'input_scaling': 400., 'v_offset': 85.25, 'threshold': 0.1, 'v_base': 0, 'step_size': 0.00005, 'tau_a': 0.01, @@ -132,11 +150,28 @@ def test_parameters(): print("f infinities: \n", f_infs) +def test_vector_strength_calculation(): + model = LifacNoiseModel({"noise_strength": 0}) + + + bf, vs1, sc = model.calculate_baseline_markers(600) + + base_stim = SinusAmplitudeModulationStimulus(600, 0, 0) + _, spiketimes = model.simulate_fast(base_stim, 30) + stimulus_trace = base_stim.as_array(0, 30, model.get_sampling_interval()) + time_trace = np.arange(0, 30, model.get_sampling_interval()) + + vs2 = hf.calculate_vector_strength_from_spiketimes(time_trace, stimulus_trace, spiketimes, model.get_sampling_interval()) + + print("with assumed eod durations vs: {:.3f}".format(vs1)) + print("with detected eod durations vs: {:.3f}".format(vs2)) + + def plot_model_during_stimulus(model: LifacNoiseModel, stimulus:SinusAmplitudeModulationStimulus, total_time): - model.simulate_fast(stimulus, total_time) + _, spiketimes = model.simulate_fast(stimulus, total_time) time = np.arange(0, total_time, model.get_sampling_interval()) - fig, axes = plt.subplots(4, 1, figsize=(9, 4*2), sharex="all") + fig, axes = plt.subplots(5, 1, figsize=(9, 4*2), sharex="all") stimulus_array = stimulus.as_array(0, total_time, model.get_sampling_interval()) axes[0].plot(time, stimulus_array) @@ -147,6 +182,11 @@ def plot_model_during_stimulus(model: LifacNoiseModel, stimulus:SinusAmplitudeMo axes[2].set_title("Voltage") axes[3].plot(time, model.get_adaption_trace()) axes[3].set_title("Adaption") + + f_time, f = hf.calculate_time_and_frequency_trace(spiketimes, model.get_sampling_interval()) + axes[4].plot(f_time, f) + + axes[4].set_title("Frequency") plt.show() @@ -155,11 +195,23 @@ def rectify_stimulus_array(stimulus_array: np.ndarray): if __name__ == '__main__': + + model = LifacNoiseModel() + stim = SinusoidalStepStimulus(700, 0.2, start_time=1, duration=1) + bf, vs, sc = model.calculate_baseline_markers(700) + print(bf, vs, "\n", sc) + plot_model_during_stimulus(model, stim, 3) + + quit() + + # time_test_function() # test_cell_data() # test_peak_detection() # test_simulation_speed() # test_parameters() - test_fi_curve_class() + # test_fi_curve_class() + # test_adaption_class() + test_vector_strength_calculation() pass diff --git a/helperFunctions.py b/helperFunctions.py index fff59f1..68f1dcd 100644 --- a/helperFunctions.py +++ b/helperFunctions.py @@ -102,8 +102,9 @@ def calculate_isi_frequency_trace(spiketimes, sampling_interval, time_in_ms=Fals return [] isis = np.diff(spiketimes) - if sampling_interval > min(isis): - raise ValueError("The sampling interval is bigger than the some isis! cannot accurately compute the trace.") + if sampling_interval > round(min(isis), 7): + raise ValueError("The sampling interval is bigger than the some isis! cannot accurately compute the trace.\n" + "Sampling interval {:.5f}, smallest isi: {:.5f}".format(sampling_interval, min(isis))) if time_in_ms: isis = isis / 1000 @@ -125,10 +126,18 @@ def calculate_isi_frequency_trace(spiketimes, sampling_interval, time_in_ms=Fals def calculate_time_and_frequency_trace(spiketimes, sampling_interval, time_in_ms=False): + if len(spiketimes) < 2: + pass # raise ValueError("Cannot compute a time and frequency vector with fewer than 2 spikes") + frequency = calculate_isi_frequency_trace(spiketimes, sampling_interval, time_in_ms) time = np.arange(spiketimes[0], spiketimes[-1], sampling_interval) + if len(time) != len(frequency): + if len(time) > len(frequency): + time = time[:len(frequency)] + + return time, frequency @@ -245,7 +254,7 @@ def calculate_eod_frequency(time, eod): return 1/mean_duration -def calculate_vector_strength(times, eods, v1_traces): +def calculate_vector_strength_from_v1_trace(times, eods, v1_traces): # Vectorstaerke (use EOD frequency from header (metadata)) VS > 0.8 # dl.iload_traces(repro='BaselineActivity') @@ -260,7 +269,7 @@ def calculate_vector_strength(times, eods, v1_traces): rel_spikes, eod_durs = eods_around_spikes(times[recording], eods[recording], spiketime_idices) relative_spike_times.extend(rel_spikes) eod_durations.extend(eod_durs) - print(__vector_strength__(np.array(rel_spikes), np.array(eod_durs))) + # print(__vector_strength__(np.array(rel_spikes), np.array(eod_durs))) relative_spike_times = np.array(relative_spike_times) eod_durations = np.array(eod_durations) @@ -268,6 +277,13 @@ def calculate_vector_strength(times, eods, v1_traces): return __vector_strength__(relative_spike_times, eod_durations) +def calculate_vector_strength_from_spiketimes(time, eod, spiketimes, sampling_interval): + spiketime_indices = np.array(np.around((np.array(spiketimes) + time[0]) / sampling_interval), dtype=int) + rel_spikes, eod_durs = eods_around_spikes(time, eod, spiketime_indices) + + return __vector_strength__(rel_spikes, eod_durs) + + def detect_spikes(v1, split=20, threshold=3): total = len(v1) all_peaks = [] @@ -299,15 +315,31 @@ def eods_around_spikes(time, eod, spiketime_idices): eod_durations = [] relative_spike_times = [] - for spike_idx in spiketime_idices: - - start_time, end_time = search_eod_start_and_end_times(time, eod, spike_idx) + sign_changes = np.sign(eod[:-1]) != np.sign(eod[1:]) + eod_trace_increasing = eod[:-1] < eod[1:] - eod_durations.append(end_time-start_time) - spiketime = time[spike_idx] - relative_spike_times.append(spiketime - start_time) - - return relative_spike_times, eod_durations + eod_zero_crossings_indices = np.where(sign_changes & eod_trace_increasing)[0] + for spike_idx in spiketime_idices: + # test if it is inside two detected crossings + if eod_zero_crossings_indices[0] > spike_idx > eod_zero_crossings_indices[-1]: + continue + zero_crossing_index_of_eod_end = np.argmax(eod_zero_crossings_indices > spike_idx) + end_time_idx = eod_zero_crossings_indices[zero_crossing_index_of_eod_end] + start_time_idx = eod_zero_crossings_indices[zero_crossing_index_of_eod_end - 1] + + eod_durations.append(time[end_time_idx] - time[start_time_idx]) + relative_spike_times.append(time[spike_idx] - time[start_time_idx]) + + # try: + # start_time, end_time = search_eod_start_and_end_times(time, eod, spike_idx) + # + # eod_durations.append(end_time-start_time) + # spiketime = time[spike_idx] + # relative_spike_times.append(spiketime - start_time) + # except IndexError as e: + # continue + + return np.array(relative_spike_times), np.array(eod_durations) def search_eod_start_and_end_times(time, eod, index): diff --git a/introduction/test.py b/introduction/test.py index e120870..73110b1 100644 --- a/introduction/test.py +++ b/introduction/test.py @@ -4,6 +4,26 @@ import helperFunctions as hF import time def main(): + + + time = np.arange(-1, 30, 0.0001) + eod = np.sin(2*np.pi * 600 * time) + + + signs = np.sign(eod[:-1]) != np.sign(eod[1:]) + delta = eod[:-1] < eod[1:] + + sign_changes = np.where(signs & delta)[0] + + plt.plot(time, eod) + plt.plot([time[i] for i in sign_changes], [0]*len(sign_changes), 'o') + plt.show() + + + print(sign_changes) + quit() + + for freq in [700, 50, 100, 500, 1000]: reps = 1000 start = time.time() @@ -57,9 +77,4 @@ def abs_phase_diff(rel_phases:list, ref_phase:float): if __name__ == '__main__': - print(-2.4%0.35, (int(-2.4/0.35)-1)*0.35) - - hF.calculate_isi_frequency_trace([0, 2, 1, 3], 0.5) - - - #main() + main() diff --git a/models/LIFACnoise.py b/models/LIFACnoise.py index 3ebd88e..cedefb0 100644 --- a/models/LIFACnoise.py +++ b/models/LIFACnoise.py @@ -5,7 +5,7 @@ import numpy as np import functions as fu from numba import jit import helperFunctions as hF -from stimuli.SinusAmplitudeModulation import SinusAmplitudeModulationStimulus +from stimuli.SinusoidalStepStimulus import SinusoidalStepStimulus from scipy.optimize import curve_fit from warnings import warn @@ -13,22 +13,22 @@ from warnings import warn class LifacNoiseModel(AbstractModel): # all times in milliseconds # possible mem_res: 100 * 1000000 exact value unknown in p-units - DEFAULT_VALUES = {"mem_tau": 20, + DEFAULT_VALUES = {"mem_tau": 0.015, "v_base": 0, "v_zero": 0, "threshold": 1, - "v_offset": 50, - "input_scaling": 1, - "delta_a": 0.4, - "tau_a": 0.04, - "a_zero": 0, - "noise_strength": 3, + "v_offset": -10, + "input_scaling": 60, + "delta_a": 0.08, + "tau_a": 0.1, + "a_zero": 10, + "noise_strength": 0.05, "step_size": 0.00005} def __init__(self, params: dict = None): super().__init__(params) - if self.parameters["step_size"] >= 0.0001: + if self.parameters["step_size"] > 0.0001: warn("LifacNoiseModel: The step size is quite big simulation could fail.") self.voltage_trace = [] self.adaption_trace = [] @@ -60,7 +60,7 @@ class LifacNoiseModel(AbstractModel): if v_next > self.parameters["threshold"]: v_next = self.parameters["v_base"] spiketimes.append(time_point) - a_next += self.parameters["delta_a"] / (self.parameters["tau_a"]) + a_next += self.parameters["delta_a"] / self.parameters["tau_a"] output_voltage[i] = v_next adaption[i] = a_next @@ -164,17 +164,22 @@ class LifacNoiseModel(AbstractModel): :return: baseline_freq, vs, sc """ - base_stimulus = SinusAmplitudeModulationStimulus(base_stimulus_freq, 0, 0) + base_stimulus = SinusoidalStepStimulus(base_stimulus_freq, 0) _, spiketimes = self.simulate_fast(base_stimulus, 30) time_x = 5 baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, time_x) - relative_spiketimes = np.array([s % (1 / base_stimulus_freq) for s in spiketimes]) - eod_durations = np.full((len(spiketimes)), 1 / base_stimulus_freq) - vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations) - serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), max_lag) + if baseline_freq < 1: + return baseline_freq, 0, [0]*max_lag - return baseline_freq, vector_strength, serial_correlation + else: + time_trace = np.arange(0, 30, self.get_sampling_interval()) + stimulus_array = base_stimulus.as_array(0, 30, self.get_sampling_interval()) + + vector_strength = hF.calculate_vector_strength_from_spiketimes(time_trace, stimulus_array, spiketimes, self.get_sampling_interval()) + serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), max_lag) + + return baseline_freq, vector_strength, serial_correlation def calculate_fi_markers(self, contrasts, base_freq, modulation_frequency): """ @@ -185,7 +190,7 @@ class LifacNoiseModel(AbstractModel): f_infinities = [] for contrast in contrasts: - stimulus = SinusAmplitudeModulationStimulus(base_freq, contrast, modulation_frequency) + stimulus = SinusoidalStepStimulus(base_freq, contrast) _, spiketimes = self.simulate_fast(stimulus, 1) f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 0.3) @@ -197,7 +202,7 @@ class LifacNoiseModel(AbstractModel): return f_infinities, f_infinities_slope - def find_v_offset(self, goal_baseline_frequency, base_stimulus, threshold=10): + def find_v_offset(self, goal_baseline_frequency, base_stimulus, threshold=10, border=50000): test_model = self.get_model_copy() simulation_length = 5 @@ -208,9 +213,14 @@ class LifacNoiseModel(AbstractModel): current_freq = test_v_offset(test_model, current_v_offset, base_stimulus, simulation_length) while current_freq < goal_baseline_frequency: + if current_v_offset >= border: + return border current_v_offset += v_search_step_size current_freq = test_v_offset(test_model, current_v_offset, base_stimulus, simulation_length) + if current_v_offset == 0: + return -1000 + lower_bound = current_v_offset - v_search_step_size upper_bound = current_v_offset @@ -218,13 +228,15 @@ class LifacNoiseModel(AbstractModel): def binary_search_base_freq(model: LifacNoiseModel, base_stimulus, goal_frequency, simulation_length, lower_bound, upper_bound, threshold): - + counter = 0 if threshold <= 0: raise ValueError("binary_search_base_freq() - LifacNoiseModel: threshold is not allowed to be negative!") while True: + counter += 1 middle = upper_bound - (upper_bound - lower_bound)/2 frequency = test_v_offset(model, middle, base_stimulus, simulation_length) - + if counter > 100: + print("meep") # print('{:.1f}, {:.1f}, {:.1f}, {:.1f} vs {:.1f} '.format(lower_bound, middle, upper_bound, frequency, goal_frequency)) if abs(frequency - goal_frequency) < threshold: return middle @@ -255,7 +267,6 @@ def rectify_stimulus_array(stimulus_array: np.ndarray): @jit(nopython=True) def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray): - v_zero = parameters[0] a_zero = parameters[1] step_size = parameters[2] @@ -273,7 +284,6 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray length = len(time) output_voltage = np.zeros(length) adaption = np.zeros(length) - stimulus_values = rectified_stimulus_array spiketimes = [] output_voltage[0] = v_zero @@ -284,7 +294,7 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray noise_value = np.random.normal() noise = noise_strength * noise_value / np.sqrt(step_size) - output_voltage[i] = output_voltage[i-1] + ((v_base - output_voltage[i-1] + v_offset + (stimulus_values[i]*input_scaling) - adaption[i-1] + noise) / mem_tau) * step_size + output_voltage[i] = output_voltage[i-1] + ((v_base - output_voltage[i-1] + v_offset + (rectified_stimulus_array[i] * input_scaling) - adaption[i-1] + noise) / mem_tau) * step_size adaption[i] = adaption[i-1] + ((-adaption[i-1]) / tau_a) * step_size if output_voltage[i] > threshold: diff --git a/stimuli/SinusAmplitudeModulation.py b/stimuli/SinusAmplitudeModulation.py index 3ff27aa..3157f35 100644 --- a/stimuli/SinusAmplitudeModulation.py +++ b/stimuli/SinusAmplitudeModulation.py @@ -81,43 +81,43 @@ def convert_to_array(carrier_freq, amplitude, modulation_freq, contrast, start_t return values - # if the whole stimulus time has the amplitude modulation just built it at once; - if time_start >= start_time and start_time+duration < time_start+total_time: - carrier = np.sin(2 * np.pi * carrier_freq * np.arange(start_time, total_time - start_time, step_size_s)) - modulation = 1 + contrast * np.sin(2 * np.pi * modulation_freq * np.arange(start_time, total_time - start_time, step_size_s)) - values = amplitude * carrier * modulation - return values - - # if it is split into parts with and without amplitude modulation built it in parts: - values = np.array([]) - - # there is some time before the modulation starts: - if time_start < start_time: - carrier_before_am = np.sin(2 * np.pi * carrier_freq * np.arange(time_start, start_time, step_size_s)) - values = np.concatenate((values, amplitude * carrier_before_am)) - - # there is at least a second part of the stimulus that contains the amplitude: - # time starts before the end of the am and ends after it was started - if time_start < start_time+duration and time_start+total_time > start_time: - if duration is np.inf: - - carrier_during_am = np.sin( - 2 * np.pi * carrier_freq * np.arange(start_time, time_start + total_time, step_size_s)) - am = 1 + contrast * np.sin( - 2 * np.pi * modulation_freq * np.arange(start_time, time_start + total_time, step_size_s)) - else: - carrier_during_am = np.sin( - 2 * np.pi * carrier_freq * np.arange(start_time, start_time + duration, step_size_s)) - am = 1 + contrast * np.sin( - 2 * np.pi * modulation_freq * np.arange(start_time, start_time + duration, step_size_s)) - values = np.concatenate((values, amplitude * am * carrier_during_am)) - - else: - if contrast != 0: - print("Given stimulus time parameters (start, total) result in no part of it containing the amplitude modulation!") - - if time_start+total_time > start_time+duration: - carrier_after_am = np.sin(2 * np.pi * carrier_freq * np.arange(start_time + duration, time_start + total_time, step_size_s)) - values = np.concatenate((values, amplitude*carrier_after_am)) - - return values + # # if the whole stimulus time has the amplitude modulation just built it at once; + # if time_start >= start_time and start_time+duration < time_start+total_time: + # carrier = np.sin(2 * np.pi * carrier_freq * np.arange(start_time, total_time - start_time, step_size_s)) + # modulation = 1 + contrast * np.sin(2 * np.pi * modulation_freq * np.arange(start_time, total_time - start_time, step_size_s)) + # values = amplitude * carrier * modulation + # return values + # + # # if it is split into parts with and without amplitude modulation built it in parts: + # values = np.array([]) + # + # # there is some time before the modulation starts: + # if time_start < start_time: + # carrier_before_am = np.sin(2 * np.pi * carrier_freq * np.arange(time_start, start_time, step_size_s)) + # values = np.concatenate((values, amplitude * carrier_before_am)) + # + # # there is at least a second part of the stimulus that contains the amplitude: + # # time starts before the end of the am and ends after it was started + # if time_start < start_time+duration and time_start+total_time > start_time: + # if duration is np.inf: + # + # carrier_during_am = np.sin( + # 2 * np.pi * carrier_freq * np.arange(start_time, time_start + total_time, step_size_s)) + # am = 1 + contrast * np.sin( + # 2 * np.pi * modulation_freq * np.arange(start_time, time_start + total_time, step_size_s)) + # else: + # carrier_during_am = np.sin( + # 2 * np.pi * carrier_freq * np.arange(start_time, start_time + duration, step_size_s)) + # am = 1 + contrast * np.sin( + # 2 * np.pi * modulation_freq * np.arange(start_time, start_time + duration, step_size_s)) + # values = np.concatenate((values, amplitude * am * carrier_during_am)) + # + # else: + # if contrast != 0: + # print("Given stimulus time parameters (start, total) result in no part of it containing the amplitude modulation!") + # + # if time_start+total_time > start_time+duration: + # carrier_after_am = np.sin(2 * np.pi * carrier_freq * np.arange(start_time + duration, time_start + total_time, step_size_s)) + # values = np.concatenate((values, amplitude*carrier_after_am)) + # + # return values diff --git a/stimuli/SinusoidalStepStimulus.py b/stimuli/SinusoidalStepStimulus.py new file mode 100644 index 0000000..bd383b9 --- /dev/null +++ b/stimuli/SinusoidalStepStimulus.py @@ -0,0 +1,75 @@ +from stimuli.AbstractStimulus import AbstractStimulus +import numpy as np +from numba import jit, njit + + +class SinusoidalStepStimulus(AbstractStimulus): + + def __init__(self, frequency, contrast, start_time=0, duration=np.inf, amplitude=1): + self.contrast = 1 + contrast + self.amplitude = amplitude + self.frequency = frequency + self.start_time = start_time + self.duration = duration + + def value_at_time_in_s(self, time_point): + carrier = np.sin(2 * np.pi * self.frequency * time_point) + + if time_point < self.start_time or time_point > self.start_time + self.duration: + return self.amplitude * carrier * self.contrast + + return self.amplitude * carrier + + def get_stimulus_start_s(self): + return self.start_time + + def get_stimulus_duration_s(self): + return self.duration + + def get_amplitude(self): + return self.contrast + + def as_array(self, time_start, total_time, step_size): + frequency = self.frequency + amp = self.amplitude + contrast = self.contrast + start_time = self.start_time + duration = self.duration + + values = convert_to_array(frequency, amp, contrast, start_time, duration, time_start, total_time, step_size) + + return values + + +# @jit(nopython=True) # makes it slower? +def convert_to_array(frequency, amplitude, contrast, start_time, duration, time_start, total_time, step_size_s): + full_time = np.arange(time_start, time_start + total_time, step_size_s) + full_carrier = np.sin(2 * np.pi * frequency * full_time) + if start_time > time_start+duration or start_time+duration < time_start: + return full_carrier * amplitude + else: + if start_time >= time_start: + am_start = start_time + else: + am_start = time_start + + if time_start + total_time >= start_time + duration: + am_end = start_time + duration + else: + am_end = time_start + total_time + + + idx_start = (am_start - time_start) / step_size_s + idx_end = (am_end - time_start) / step_size_s + + if idx_start != round(idx_start) or idx_end != round(idx_end): + raise ValueError("Didn't calculate integers when searching the start and end index. start:", idx_start, "end:", idx_end) + # print("am_start: {:.0f}, am_end: {:.0f}, length: {:.0f}".format(am_start, am_end, am_end-am_start)) + + idx_start = int(idx_start) + idx_end = int(idx_end) + + values = full_carrier * amplitude + values[idx_start:idx_end] = values[idx_start:idx_end]*contrast + + return values diff --git a/stimuli/StepStimulus.py b/stimuli/StepStimulus.py index 34a6ac4..1112776 100644 --- a/stimuli/StepStimulus.py +++ b/stimuli/StepStimulus.py @@ -1,6 +1,6 @@ from stimuli.AbstractStimulus import AbstractStimulus - +import numpy as np class StepStimulus(AbstractStimulus): @@ -31,3 +31,36 @@ class StepStimulus(AbstractStimulus): def get_amplitude(self): return self.value - self.base_value + + def as_array(self, time_start, total_time, step_size): + values = np.full(int(total_time/step_size), self.base_value) + + if self.start > time_start + self.duration or self.start + self.duration < time_start: + return values + else: + if self.start >= time_start: + am_start = self.start + else: + am_start = time_start + + if time_start + total_time >= self.start + self.duration: + am_end = self.start + self.duration + else: + am_end = time_start + total_time + + idx_start = (am_start - time_start) / step_size + idx_end = (am_end - time_start) / step_size + + if idx_start != round(idx_start) or idx_end != round(idx_end): + raise ValueError("Didn't calculate integers when searching the start and end index. start:", idx_start, + "end:", idx_end) + # print("am_start: {:.0f}, am_end: {:.0f}, length: {:.0f}".format(am_start, am_end, am_end-am_start)) + + idx_start = int(idx_start) + idx_end = int(idx_end) + + values[idx_start:idx_end+1] = self.value + + return values + + diff --git a/unittests/testFrequencyFunctions.py b/unittests/testFrequencyFunctions.py index bb03e45..c5d8ac1 100644 --- a/unittests/testFrequencyFunctions.py +++ b/unittests/testFrequencyFunctions.py @@ -126,7 +126,7 @@ class FrequencyFunctionsTester(unittest.TestCase): freq_traces = [test1_f, test2_f] time, mean = hF.calculate_mean_of_frequency_traces(time_traces, freq_traces, 0.5) - expected_time = np.arange(0.5, 7.5, 0.5) + expected_time = np.arange(0.5, 7, 0.5) expected_mean = [0.75, 1.25, 1.25, 2, 2, 2.5] time_equal = np.all([time[i] == expected_time[i] for i in range(len(time))]) @@ -137,7 +137,11 @@ class FrequencyFunctionsTester(unittest.TestCase): self.assertEqual(len(expected_time), len(time), msg="expected:\n" + str(expected_time) + "\n actual: \n" + str(time)) # TODO: - # all_calculate_mean_isi_frequency_traces(spiketimes, sampling_interval, time_in_ms=True): + # all_calculate_mean_isi_frequency_traces(spiketimes, sampling_interval, time_in_ms=False): + + + #def test_all_calculate_mean_isi_frequency_traces(self): + # hF.all_calculate_mean_isi_frequency_traces(, def generate_jittered_spiketimes(frequency, noise_level=0., start=0, end=5, method='normal'): diff --git a/unittests/testHelperFunctions.py b/unittests/testHelperFunctions.py index 7618176..76576e2 100644 --- a/unittests/testHelperFunctions.py +++ b/unittests/testHelperFunctions.py @@ -30,7 +30,23 @@ class HelperFunctionsTester(unittest.TestCase): self.assertEqual(0, round(hF.__vector_strength__(rel_spike_times, eod_durations), 5)) - + # def test_eods_around_spikes(self): + # + # time = np.arange(0, 3, 0.01) + # eod = np.sin(2*np.pi * 2 * time) + # + # spikes = [0.2, 0.5, 0.6] + # indices = np.searchsorted(time, spikes) + # + # rel_spike_time, eod_duration = hF.eods_around_spikes(time, eod, indices) + # + # print("meep") + + + # todo + # search_eod_start_and_end_times ? (not used anymore ?) + # eods_around_spikes + # calculate_phases # def test(self): # test_distribution()