tests, correct stimulus-type for fi-curve, general changes

This commit is contained in:
a.ott 2020-03-11 18:03:21 +01:00
parent 15166042be
commit 6012927416
14 changed files with 466 additions and 155 deletions

View File

@ -24,13 +24,14 @@ class Adaption:
self.fit_exponential()
self.calculate_tau_from_tau_eff()
def fit_exponential(self, length_of_fit=0.05):
def fit_exponential(self, length_of_fit=0.1):
mean_frequencies = self.cell_data.get_mean_isi_frequencies()
time_axes = self.cell_data.get_time_axes_mean_frequencies()
for i in range(len(mean_frequencies)):
start_idx = self.__find_start_idx_for_exponential_fit(i)
if start_idx == -1:
print("start index negative")
self.exponential_fit_vars.append([])
continue
@ -60,6 +61,7 @@ class Adaption:
# Obviously a bad fit - time constant, expected in range 3-10ms, has value over 1 second or is negative
if abs(popt[1] > 1) or popt[1] < 0:
print("detected an obviously bad fit")
self.exponential_fit_vars.append([])
else:
self.exponential_fit_vars.append(popt)
@ -81,7 +83,8 @@ class Adaption:
return tau
def __find_start_idx_for_exponential_fit(self, mean_freq_idx):
stimulus_start_idx = int((self.cell_data.get_delay() + self.cell_data.get_stimulus_start()) / self.cell_data.get_sampling_interval())
time_axes = self.cell_data.get_time_axes_mean_frequencies()[mean_freq_idx]
stimulus_start_idx = int((self.cell_data.get_stimulus_start() + time_axes[0]) / self.cell_data.get_sampling_interval())
if self.fi_curve.f_infinities[mean_freq_idx] > self.fi_curve.f_baselines[mean_freq_idx] * 1.1:
# start setting starting variables for the fit
# search for the start_index by searching for the max
@ -124,24 +127,32 @@ class Adaption:
return start_idx
def calculate_tau_from_tau_eff(self):
taus = []
tau_effs = []
for i in range(len(self.exponential_fit_vars)):
if len(self.exponential_fit_vars[i]) == 0:
continue
tau_eff = self.exponential_fit_vars[i][1]*1000 # tau_eff in ms
# intensity = self.fi_curve.stimulus_value[i]
tau_effs.append(self.exponential_fit_vars[i][1])
f_infinity_slope = self.fi_curve.get_f_infinity_slope()
fi_curve_slope = self.fi_curve.get_fi_curve_slope_of_straight()
# --- old way to calculate with the fi slope at middle of the fi curve
# fi_curve_slope = self.fi_curve.get_fi_curve_slope_of_straight()
# self.tau_real = np.median(tau_effs) * (fi_curve_slope / f_infinity_slope)
taus.append(tau_eff*(fi_curve_slope/f_infinity_slope))
# print((fi_curve_slope/f_infinity_slope))
# print(tau_eff*(fi_curve_slope/f_infinity_slope), "=", tau_eff, "*", (fi_curve_slope/f_infinity_slope))
# print("fi_slope to f_inf slope:", fi_curve_slope/f_infinity_slope)
# print("fi_slope:", fi_curve_slope, "f_inf slope:", f_infinity_slope)
# print("current tau: {:.1f}ms".format(np.median(tau_effs) * (fi_curve_slope / f_infinity_slope) * 1000))
self.tau_real = np.median(taus)
# new way to calculate with the fi curve slope at the intersection point of it and the f_inf line
factor = self.fi_curve.get_fi_curve_slope_at_f_zero_intersection() / f_infinity_slope
self.tau_real = np.median(tau_effs) * factor
print("###### tau: {:.1f}ms".format(self.tau_real*1000), "other f_0 slope:", self.fi_curve.get_fi_curve_slope_at_f_zero_intersection())
def get_tau_real(self):
return np.median(self.tau_real)
def get_tau_effs(self):
return [ex_vars[1] for ex_vars in self.exponential_fit_vars if ex_vars != []]
def plot_exponential_fits(self, save_path: str = None, indices: list = None, delete_previous: bool = False):
if delete_previous:
for val in self.cell_data.get_fi_contrasts():
@ -152,7 +163,11 @@ class Adaption:
os.remove(prev_path)
for i in range(len(self.cell_data.get_fi_contrasts())):
if indices is not None and i not in indices:
continue
if self.exponential_fit_vars[i] == []:
print("no fit vars for index!")
continue
plt.plot(self.cell_data.get_time_axes_mean_frequencies()[i], self.cell_data.get_mean_isi_frequencies()[i])

View File

@ -144,7 +144,7 @@ class CellData:
times = self.get_base_traces(self.TIME)
eods = self.get_base_traces(self.EOD)
v1_traces = self.get_base_traces(self.V1)
return hf.calculate_vector_strength(times, eods, v1_traces)
return hf.calculate_vector_strength_from_v1_trace(times, eods, v1_traces)
def get_serial_correlation(self, max_lag):
serial_cors = []

View File

@ -42,7 +42,6 @@ class FICurve:
for i in range(len(mean_frequencies)):
if time_axes[i][0] > self.cell_data.get_stimulus_start():
# TODO
warn("TODO: Deal with too strongly cut frequency traces in cell data!")
self.f_zeros.append(-1)
self.f_baselines.append(-1)
@ -83,7 +82,6 @@ class FICurve:
end_idx = int((stim_start-buffer)/sampling_interval)
f_baseline = np.mean(frequency[start_idx:end_idx])
plt.plot((start_idx, end_idx), (f_baseline, f_baseline), label="f_baseline")
return f_baseline
def __calculate_f_zero__(self, time, frequency, peak_buffer_percent=0.05, buffer=0.025):
@ -113,9 +111,6 @@ class FICurve:
end_idx = start_idx + int((end_idx-start_idx)/2)
f_zero = np.mean(frequency[start_idx:end_idx])
plt.plot(frequency)
plt.plot((start_idx, end_idx), (f_zero, f_zero), label="f_zero, {:.2f}".format(peak_buffer))
return f_zero
# start_idx = int(stimulus_start / sampling_interval)
@ -143,18 +138,18 @@ class FICurve:
start_idx = int((stimulus_end_time - length - buffer) / self.cell_data.get_sampling_interval())
end_idx = int((stimulus_end_time - buffer) / self.cell_data.get_sampling_interval())
x = np.arange(start_idx, end_idx, 1) # time[start_idx:end_idx]
slope, intercept, r_value, p_value, std_err = linregress(x, frequency[start_idx:end_idx])
if p_value < 0.0001:
plt.title("significant slope: {:.2f}, p: {:.5f}, r: {:.5f}".format(slope, p_value, r_value))
plt.plot(x, [i*slope + intercept for i in x], color="black")
plt.plot((start_idx, end_idx), (np.mean(frequency[start_idx:end_idx]), np.mean(frequency[start_idx:end_idx])), label="f_inf")
plt.legend()
plt.show()
plt.close()
# TODO add way to plot detected f_zero, f_inf, f_base. With detection of remaining slope?
# x = np.arange(start_idx, end_idx, 1) # time[start_idx:end_idx]
# slope, intercept, r_value, p_value, std_err = linregress(x, frequency[start_idx:end_idx])
# if p_value < 0.0001:
# plt.title("significant slope: {:.2f}, p: {:.5f}, r: {:.5f}".format(slope, p_value, r_value))
# plt.plot(x, [i*slope + intercept for i in x], color="black")
#
#
# plt.plot((start_idx, end_idx), (np.mean(frequency[start_idx:end_idx]), np.mean(frequency[start_idx:end_idx])), label="f_inf")
# plt.legend()
# plt.show()
# plt.close()
return np.mean(frequency[start_idx:end_idx])
@ -177,6 +172,36 @@ class FICurve:
fit_vars = self.boltzmann_fit_vars
return fu.full_boltzmann_straight_slope(fit_vars[0], fit_vars[1], fit_vars[2], fit_vars[3])
def get_f_zero_and_f_inf_intersection(self):
x_values = np.arange(min(self.stimulus_value), max(self.stimulus_value), 0.0001)
fit_vars = self.boltzmann_fit_vars
f_zero = fu.full_boltzmann(x_values, fit_vars[0], fit_vars[1], fit_vars[2], fit_vars[3])
f_inf = fu.clipped_line(x_values, self.f_infinity_fit[0], self.f_infinity_fit[1])
intersection_indicies = np.argwhere(np.diff(np.sign(f_zero - f_inf))).flatten()
# print("fi-curve calc intersection:", intersection_indicies, x_values[intersection_indicies])
if len(intersection_indicies) > 1:
f_baseline = np.median(self.f_baselines)
best_dist = np.inf
best_idx = -1
for idx in intersection_indicies:
dist = abs(fu.clipped_line(x_values[idx], self.f_infinity_fit[0], self.f_infinity_fit[1]) - f_baseline)
if dist < best_dist:
best_dist = dist
best_idx = idx
return x_values[best_idx]
elif len(intersection_indicies) == 0:
raise ValueError("No intersection found!")
else:
return x_values[intersection_indicies[0]]
def get_fi_curve_slope_at_f_zero_intersection(self):
x = self.get_f_zero_and_f_inf_intersection()
fit_vars = self.boltzmann_fit_vars
return fu.derivative_full_boltzmann(x, fit_vars[0], fit_vars[1], fit_vars[2], fit_vars[3])
def plot_fi_curve(self, savepath: str = None):
min_x = min(self.stimulus_value)
max_x = max(self.stimulus_value)
@ -210,7 +235,6 @@ class FICurve:
mean_frequencies = np.array(self.cell_data.get_mean_isi_frequencies())
time_axes = self.cell_data.get_time_axes_mean_frequencies()
for i in range(len(mean_frequencies)):
fig, axes = plt.subplots(1, 1, sharex="all")
axes.plot(time_axes[i], mean_frequencies[i], label="voltage")

View File

@ -1,6 +1,6 @@
from models.FirerateModel import FirerateModel
from models.LIFAC import LIFACModel
from models.LIFACnoise import LifacNoiseModel
from stimuli.StepStimulus import StepStimulus
import helperFunctions as hf
import numpy as np
@ -13,7 +13,7 @@ def main():
values = [1] # np.arange(5, 40, 1)
parameter = "currently not active"
for value in values:
lifac_model = LIFACModel({"delta_a": 0})
lifac_model = LifacNoiseModel({"delta_a": 0})
# lifac_model.set_variable(parameter, value)
stimulus_strengths = np.arange(50, 60, 1)
@ -36,8 +36,11 @@ def find_fitting_line(lifac_model, stimulus_strengths):
if len(spiketimes) == 0:
frequencies.append(0)
continue
time, freq = hf.calculate_isi_frequency_trace(spiketimes, 0, lifac_model.get_sampling_interval() / 1000)
time, freq = hf.calculate_time_and_frequency_trace(spiketimes, lifac_model.get_sampling_interval())
if len(freq) == 0:
frequencies.append(0)
else:
frequencies.append(freq[-1])
popt, pcov = curve_fit(fu.line, stimulus_strengths, frequencies)
@ -72,8 +75,12 @@ def find_relation(lifac, line_vars, stimulus_strengths, parameter="", value=0, c
stimulus = StepStimulus(0, duration, stim)
lifac.simulate(stimulus, duration)
spiketimes = lifac.get_spiketimes()
time, freq = hf.calculate_isi_frequency_trace(spiketimes, 0, lifac.get_sampling_interval() / 1000)
time, freq = hf.calculate_time_and_frequency_trace(spiketimes, lifac.get_sampling_interval())
if len(freq) == 0:
adapted_frequencies.append(0)
goal_adapted_freq = 0
else:
adapted_frequencies.append(freq[-1])
goal_adapted_freq = freq[-1]

View File

@ -2,7 +2,7 @@ from models.LIFACnoise import LifacNoiseModel
from CellData import CellData, icelldata_of_dir
from FiCurve import FICurve
from AdaptionCurrent import Adaption
from stimuli.SinusAmplitudeModulation import SinusAmplitudeModulationStimulus
from stimuli.SinusoidalStepStimulus import SinusoidalStepStimulus
import helperFunctions as hF
import numpy as np
from scipy.optimize import curve_fit, minimize
@ -15,23 +15,39 @@ def main():
# run_test_with_fixed_model()
# quit()
fitter = Fitter()
fmin, params = fitter.fit_model_to_values(700, 1400, [-0.3], 1, [0.7, 0.8, 0.9, 1, 1.1, 1.2, 1.3], [1370, 1380, 1390, 1400, 1410, 1420, 1430], 100, 0.02, 0.01)
# fitter = Fitter()
# fmin, params = fitter.fit_model_to_values(700, 1400, [-0.3], 1, [0.7, 0.8, 0.9, 1, 1.1, 1.2, 1.3], [1370, 1380, 1390, 1400, 1410, 1420, 1430], 100, 0.02, 0.01)
# print("calculated parameters:")
# print(params)
run_with_real_data()
print("calculated parameters:")
print(params)
def run_with_real_data():
for celldata in icelldata_of_dir("./data/"):
start_time = time.time()
fitter = Fitter(celldata)
fmin, parameters = fitter.fit_model_to_data()
fitter = Fitter()
fmin, parameters = fitter.fit_model_to_data(celldata)
print(fmin)
print(parameters)
end_time = time.time()
print('Fitting of cell took {:.3f} s'.format(end_time - start_time))
res_model = LifacNoiseModel(parameters)
m_bf, m_vs, m_sc = res_model.calculate_baseline_markers(celldata.get_base_frequency())
m_f_values, m_f_slope = res_model.calculate_fi_markers(celldata.get_fi_contrasts(), celldata.get_base_frequency(), 10)
c_bf = celldata.get_base_frequency()
c_vs = celldata.get_vector_strength()
c_sc = celldata.get_serial_correlation(1)
fi_curve = FICurve(celldata)
c_f_slope = fi_curve.get_f_infinity_slope()
c_f_values = fi_curve.f_infinities
print("bf: cell - {:.2f} vs model {:.2f}".format(c_bf, m_bf))
print("vs: cell - {:.2f} vs model {:.2f}".format(c_vs, m_vs))
print("sc: cell - {:.2f} vs model {:.2f}".format(c_sc[0], m_sc[0]))
print("f_slope: cell - {:.2f} vs model {:.2f}".format(c_f_slope, m_f_slope))
print("f values:\n cell -", c_f_values, "\n model -", m_f_values)
pass
@ -97,46 +113,53 @@ class Fitter:
fi_curve = FICurve(data, contrast=True)
self.fi_contrasts = fi_curve.stimulus_value
print("Fitter: fi-contrasts", self.fi_contrasts)
self.f_infinities = fi_curve.f_infinities
self.f_infinities_slope = fi_curve.get_f_infinity_slope()
f_zero_slope = fi_curve.get_fi_curve_slope_of_straight()
self.a_delta = f_zero_slope / self.f_infinities_slope
self.a_delta = (f_zero_slope / self.f_infinities_slope)
adaption = Adaption(data, fi_curve)
self.a_tau = adaption.get_tau_real()
# mem_tau, (threshold?), (v_offset), noise_strength, input_scaling
def cost_function(self, X, tau_a=10, delta_a=3, error_scaling=()):
freq_sampling_rate = 0.005
def cost_function(self, X, tau_a=0.001, delta_a=3, error_scaling=()):
# set model parameters to the given ones:
self.model.set_variable("mem_tau", X[0])
self.model.set_variable("noise_strength", X[1])
self.model.set_variable("input_scaling", X[2])
self.model.set_variable("tau_a", tau_a)
self.model.set_variable("delta_a", delta_a)
self.model.set_variable("tau_a", X[3])
self.model.set_variable("delta_a", X[4])
# self.model.set_variable("tau_a", tau_a)
# self.model.set_variable("delta_a", delta_a)
# minimize the difference in baseline_freq first by fitting v_offset
# v_offset = self.__fit_v_offset_to_baseline_frequency__()
base_stimulus = SinusAmplitudeModulationStimulus(self.eod_freq, 0, 0)
base_stimulus = SinusoidalStepStimulus(self.eod_freq, 0)
v_offset = self.model.find_v_offset(self.baseline_freq, base_stimulus)
self.model.set_variable("v_offset", v_offset)
baseline_freq, vector_strength, serial_correlation = self.model.calculate_baseline_markers(self.baseline_freq, self.sc_max_lag)
# only eod with amplitude 1 and no modulation
_, spiketimes = self.model.simulate_fast(base_stimulus, 30)
baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 5)
# print("model:", baseline_freq, "data:", self.baseline_freq)
relative_spiketimes = np.array([s % (1/self.eod_freq) for s in spiketimes if s > 0])
eod_durations = np.full((len(relative_spiketimes)), 1/self.eod_freq)
vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations)
serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), self.sc_max_lag)
# _, spiketimes = self.model.simulate_fast(base_stimulus, 30)
# baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 5)
# # print("model:", baseline_freq, "data:", self.baseline_freq)
#
# if len(spiketimes) > 10:
# relative_spiketimes = np.array([s % (1/self.eod_freq) for s in spiketimes if s > 0])
# eod_durations = np.full((len(relative_spiketimes)), 1/self.eod_freq)
# vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations)
# serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), self.sc_max_lag)
# else:
# vector_strength = 0
# serial_correlation = [0]*self.sc_max_lag
f_infinities = []
for contrast in self.fi_contrasts:
stimulus = SinusAmplitudeModulationStimulus(self.eod_freq, contrast, self.modulation_frequency)
stimulus = SinusoidalStepStimulus(self.eod_freq, contrast)
_, spiketimes = self.model.simulate_fast(stimulus, 1)
if len(spiketimes) < 2:
@ -148,7 +171,8 @@ class Fitter:
popt, pcov = curve_fit(fu.line, self.fi_contrasts, f_infinities, maxfev=10000)
f_infinities_slope = popt[0]
if self.counter == 600:
bf, vs, sc = self.model.calculate_baseline_markers(self.eod_freq)
error_bf = abs((baseline_freq - self.baseline_freq) / self.baseline_freq)
error_vs = abs((vector_strength - self.vector_strength) / self.vector_strength)
error_sc = abs((serial_correlation[0] - self.serial_correlation[0]) / self.serial_correlation[0])
@ -161,14 +185,18 @@ class Fitter:
error_f_inf += abs((f_infinities[i] - self.f_infinities[i]) / f_infinities[i])
error_f_inf = error_f_inf / len(f_infinities)
self.counter += 1
# print("mem_tau:", X[0], "noise:", X[0], "input_scaling:", X[2])
errors = [error_bf, error_vs, error_sc, error_f_inf_slope, error_f_inf]
self.counter += 1
if self.counter % 10 == 0:
print("Cost function run times:", self.counter, "error sum:", sum(errors), errors)
return error_bf + error_vs + error_sc + error_f_inf_slope + error_f_inf
def fit_model_to_data(self, data: CellData):
self.calculate_needed_values_from_data(data)
self.counter = 0
return self.fit_model()
def fit_model_to_values(self, eod_freq, baseline_freq, sc, vs, fi_contrasts, fi_inf_values, fi_inf_slope, a_delta, a_tau):
@ -185,9 +213,9 @@ class Fitter:
return self.fit_model()
def fit_model(self):
x0 = np.array([20, 15, 75])
init_simplex = np.array([np.array([2, 1, 10]), np.array([40, 100, 140]), np.array([20, 50, 70]), np.array([150, 1, 200])])
fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="Nelder-Mead", options={"initial_simplex": init_simplex})
x0 = np.array([0.02, 3, 75, 0.05, 20])
init_simplex = np.array([np.array([2, 1, 10]), np.array([40, 10, 140]), np.array([20, 20, 70]), np.array([150, 1, 200])])
fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="Nelder-Mead") #, options={"initial_simplex": init_simplex})
return fmin, self.model.get_parameters()

View File

@ -11,6 +11,9 @@ from thunderfish.eventdetection import detect_peaks
from stimuli.SinusAmplitudeModulation import SinusAmplitudeModulationStimulus
from models.LIFACnoise import LifacNoiseModel
from FiCurve import FICurve
from AdaptionCurrent import Adaption
from stimuli.StepStimulus import StepStimulus
from stimuli.SinusoidalStepStimulus import SinusoidalStepStimulus
def time_test_function():
@ -101,6 +104,7 @@ def test_simulation_speed():
def test_fi_curve_class():
for cell_data in icelldata_of_dir("./data/"):
fi_curve = FICurve(cell_data)
fi_curve.get_f_zero_and_f_inf_intersection()
fi_curve.plot_fi_curve()
# fi_curve.plot_f_point_detections()
@ -108,6 +112,20 @@ def test_fi_curve_class():
pass
def test_adaption_class():
for cell_data in icelldata_of_dir("./data/"):
print()
print(cell_data.get_data_path())
fi_curve = FICurve(cell_data)
adaption = Adaption(cell_data, fi_curve)
adaption.plot_exponential_fits()
print("tau_effs:", adaption.get_tau_effs())
print("tau_real:", adaption.get_tau_real())
fi_curve.plot_fi_curve()
def test_parameters():
parameters = {'mem_tau': 21., 'delta_a': 0.1, 'input_scaling': 400.,
'v_offset': 85.25, 'threshold': 0.1, 'v_base': 0, 'step_size': 0.00005, 'tau_a': 0.01,
@ -132,11 +150,28 @@ def test_parameters():
print("f infinities: \n", f_infs)
def test_vector_strength_calculation():
model = LifacNoiseModel({"noise_strength": 0})
bf, vs1, sc = model.calculate_baseline_markers(600)
base_stim = SinusAmplitudeModulationStimulus(600, 0, 0)
_, spiketimes = model.simulate_fast(base_stim, 30)
stimulus_trace = base_stim.as_array(0, 30, model.get_sampling_interval())
time_trace = np.arange(0, 30, model.get_sampling_interval())
vs2 = hf.calculate_vector_strength_from_spiketimes(time_trace, stimulus_trace, spiketimes, model.get_sampling_interval())
print("with assumed eod durations vs: {:.3f}".format(vs1))
print("with detected eod durations vs: {:.3f}".format(vs2))
def plot_model_during_stimulus(model: LifacNoiseModel, stimulus:SinusAmplitudeModulationStimulus, total_time):
model.simulate_fast(stimulus, total_time)
_, spiketimes = model.simulate_fast(stimulus, total_time)
time = np.arange(0, total_time, model.get_sampling_interval())
fig, axes = plt.subplots(4, 1, figsize=(9, 4*2), sharex="all")
fig, axes = plt.subplots(5, 1, figsize=(9, 4*2), sharex="all")
stimulus_array = stimulus.as_array(0, total_time, model.get_sampling_interval())
axes[0].plot(time, stimulus_array)
@ -147,6 +182,11 @@ def plot_model_during_stimulus(model: LifacNoiseModel, stimulus:SinusAmplitudeMo
axes[2].set_title("Voltage")
axes[3].plot(time, model.get_adaption_trace())
axes[3].set_title("Adaption")
f_time, f = hf.calculate_time_and_frequency_trace(spiketimes, model.get_sampling_interval())
axes[4].plot(f_time, f)
axes[4].set_title("Frequency")
plt.show()
@ -155,11 +195,23 @@ def rectify_stimulus_array(stimulus_array: np.ndarray):
if __name__ == '__main__':
model = LifacNoiseModel()
stim = SinusoidalStepStimulus(700, 0.2, start_time=1, duration=1)
bf, vs, sc = model.calculate_baseline_markers(700)
print(bf, vs, "\n", sc)
plot_model_during_stimulus(model, stim, 3)
quit()
# time_test_function()
# test_cell_data()
# test_peak_detection()
# test_simulation_speed()
# test_parameters()
test_fi_curve_class()
# test_fi_curve_class()
# test_adaption_class()
test_vector_strength_calculation()
pass

View File

@ -102,8 +102,9 @@ def calculate_isi_frequency_trace(spiketimes, sampling_interval, time_in_ms=Fals
return []
isis = np.diff(spiketimes)
if sampling_interval > min(isis):
raise ValueError("The sampling interval is bigger than some of the ISIs! Cannot accurately compute the trace.")
if sampling_interval > round(min(isis), 7):
raise ValueError("The sampling interval is bigger than some of the ISIs! Cannot accurately compute the trace.\n"
"Sampling interval {:.5f}, smallest isi: {:.5f}".format(sampling_interval, min(isis)))
if time_in_ms:
isis = isis / 1000
@ -125,10 +126,18 @@ def calculate_isi_frequency_trace(spiketimes, sampling_interval, time_in_ms=Fals
def calculate_time_and_frequency_trace(spiketimes, sampling_interval, time_in_ms=False):
if len(spiketimes) < 2:
pass # raise ValueError("Cannot compute a time and frequency vector with fewer than 2 spikes")
frequency = calculate_isi_frequency_trace(spiketimes, sampling_interval, time_in_ms)
time = np.arange(spiketimes[0], spiketimes[-1], sampling_interval)
if len(time) != len(frequency):
if len(time) > len(frequency):
time = time[:len(frequency)]
return time, frequency
@ -245,7 +254,7 @@ def calculate_eod_frequency(time, eod):
return 1/mean_duration
def calculate_vector_strength(times, eods, v1_traces):
def calculate_vector_strength_from_v1_trace(times, eods, v1_traces):
# Vector strength (use EOD frequency from header (metadata)) VS > 0.8
# dl.iload_traces(repro='BaselineActivity')
@ -260,7 +269,7 @@ def calculate_vector_strength(times, eods, v1_traces):
rel_spikes, eod_durs = eods_around_spikes(times[recording], eods[recording], spiketime_idices)
relative_spike_times.extend(rel_spikes)
eod_durations.extend(eod_durs)
print(__vector_strength__(np.array(rel_spikes), np.array(eod_durs)))
# print(__vector_strength__(np.array(rel_spikes), np.array(eod_durs)))
relative_spike_times = np.array(relative_spike_times)
eod_durations = np.array(eod_durations)
@ -268,6 +277,13 @@ def calculate_vector_strength(times, eods, v1_traces):
return __vector_strength__(relative_spike_times, eod_durations)
def calculate_vector_strength_from_spiketimes(time, eod, spiketimes, sampling_interval):
spiketime_indices = np.array(np.around((np.array(spiketimes) + time[0]) / sampling_interval), dtype=int)
rel_spikes, eod_durs = eods_around_spikes(time, eod, spiketime_indices)
return __vector_strength__(rel_spikes, eod_durs)
def detect_spikes(v1, split=20, threshold=3):
total = len(v1)
all_peaks = []
@ -299,15 +315,31 @@ def eods_around_spikes(time, eod, spiketime_idices):
eod_durations = []
relative_spike_times = []
for spike_idx in spiketime_idices:
start_time, end_time = search_eod_start_and_end_times(time, eod, spike_idx)
sign_changes = np.sign(eod[:-1]) != np.sign(eod[1:])
eod_trace_increasing = eod[:-1] < eod[1:]
eod_durations.append(end_time-start_time)
spiketime = time[spike_idx]
relative_spike_times.append(spiketime - start_time)
return relative_spike_times, eod_durations
eod_zero_crossings_indices = np.where(sign_changes & eod_trace_increasing)[0]
for spike_idx in spiketime_idices:
# test if it is inside two detected crossings
if eod_zero_crossings_indices[0] > spike_idx > eod_zero_crossings_indices[-1]:
continue
zero_crossing_index_of_eod_end = np.argmax(eod_zero_crossings_indices > spike_idx)
end_time_idx = eod_zero_crossings_indices[zero_crossing_index_of_eod_end]
start_time_idx = eod_zero_crossings_indices[zero_crossing_index_of_eod_end - 1]
eod_durations.append(time[end_time_idx] - time[start_time_idx])
relative_spike_times.append(time[spike_idx] - time[start_time_idx])
# try:
# start_time, end_time = search_eod_start_and_end_times(time, eod, spike_idx)
#
# eod_durations.append(end_time-start_time)
# spiketime = time[spike_idx]
# relative_spike_times.append(spiketime - start_time)
# except IndexError as e:
# continue
return np.array(relative_spike_times), np.array(eod_durations)
def search_eod_start_and_end_times(time, eod, index):

View File

@ -4,6 +4,26 @@ import helperFunctions as hF
import time
def main():
time = np.arange(-1, 30, 0.0001)
eod = np.sin(2*np.pi * 600 * time)
signs = np.sign(eod[:-1]) != np.sign(eod[1:])
delta = eod[:-1] < eod[1:]
sign_changes = np.where(signs & delta)[0]
plt.plot(time, eod)
plt.plot([time[i] for i in sign_changes], [0]*len(sign_changes), 'o')
plt.show()
print(sign_changes)
quit()
for freq in [700, 50, 100, 500, 1000]:
reps = 1000
start = time.time()
@ -57,9 +77,4 @@ def abs_phase_diff(rel_phases:list, ref_phase:float):
if __name__ == '__main__':
print(-2.4%0.35, (int(-2.4/0.35)-1)*0.35)
hF.calculate_isi_frequency_trace([0, 2, 1, 3], 0.5)
#main()
main()

View File

@ -5,7 +5,7 @@ import numpy as np
import functions as fu
from numba import jit
import helperFunctions as hF
from stimuli.SinusAmplitudeModulation import SinusAmplitudeModulationStimulus
from stimuli.SinusoidalStepStimulus import SinusoidalStepStimulus
from scipy.optimize import curve_fit
from warnings import warn
@ -13,22 +13,22 @@ from warnings import warn
class LifacNoiseModel(AbstractModel):
# all times in milliseconds
# possible mem_res: 100 * 1000000 exact value unknown in p-units
DEFAULT_VALUES = {"mem_tau": 20,
DEFAULT_VALUES = {"mem_tau": 0.015,
"v_base": 0,
"v_zero": 0,
"threshold": 1,
"v_offset": 50,
"input_scaling": 1,
"delta_a": 0.4,
"tau_a": 0.04,
"a_zero": 0,
"noise_strength": 3,
"v_offset": -10,
"input_scaling": 60,
"delta_a": 0.08,
"tau_a": 0.1,
"a_zero": 10,
"noise_strength": 0.05,
"step_size": 0.00005}
def __init__(self, params: dict = None):
super().__init__(params)
if self.parameters["step_size"] >= 0.0001:
if self.parameters["step_size"] > 0.0001:
warn("LifacNoiseModel: The step size is quite big; the simulation could fail.")
self.voltage_trace = []
self.adaption_trace = []
@ -60,7 +60,7 @@ class LifacNoiseModel(AbstractModel):
if v_next > self.parameters["threshold"]:
v_next = self.parameters["v_base"]
spiketimes.append(time_point)
a_next += self.parameters["delta_a"] / (self.parameters["tau_a"])
a_next += self.parameters["delta_a"] / self.parameters["tau_a"]
output_voltage[i] = v_next
adaption[i] = a_next
@ -164,14 +164,19 @@ class LifacNoiseModel(AbstractModel):
:return: baseline_freq, vs, sc
"""
base_stimulus = SinusAmplitudeModulationStimulus(base_stimulus_freq, 0, 0)
base_stimulus = SinusoidalStepStimulus(base_stimulus_freq, 0)
_, spiketimes = self.simulate_fast(base_stimulus, 30)
time_x = 5
baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, time_x)
relative_spiketimes = np.array([s % (1 / base_stimulus_freq) for s in spiketimes])
eod_durations = np.full((len(spiketimes)), 1 / base_stimulus_freq)
vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations)
if baseline_freq < 1:
return baseline_freq, 0, [0]*max_lag
else:
time_trace = np.arange(0, 30, self.get_sampling_interval())
stimulus_array = base_stimulus.as_array(0, 30, self.get_sampling_interval())
vector_strength = hF.calculate_vector_strength_from_spiketimes(time_trace, stimulus_array, spiketimes, self.get_sampling_interval())
serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), max_lag)
return baseline_freq, vector_strength, serial_correlation
@ -185,7 +190,7 @@ class LifacNoiseModel(AbstractModel):
f_infinities = []
for contrast in contrasts:
stimulus = SinusAmplitudeModulationStimulus(base_freq, contrast, modulation_frequency)
stimulus = SinusoidalStepStimulus(base_freq, contrast)
_, spiketimes = self.simulate_fast(stimulus, 1)
f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 0.3)
@ -197,7 +202,7 @@ class LifacNoiseModel(AbstractModel):
return f_infinities, f_infinities_slope
def find_v_offset(self, goal_baseline_frequency, base_stimulus, threshold=10):
def find_v_offset(self, goal_baseline_frequency, base_stimulus, threshold=10, border=50000):
test_model = self.get_model_copy()
simulation_length = 5
@ -208,9 +213,14 @@ class LifacNoiseModel(AbstractModel):
current_freq = test_v_offset(test_model, current_v_offset, base_stimulus, simulation_length)
while current_freq < goal_baseline_frequency:
if current_v_offset >= border:
return border
current_v_offset += v_search_step_size
current_freq = test_v_offset(test_model, current_v_offset, base_stimulus, simulation_length)
if current_v_offset == 0:
return -1000
lower_bound = current_v_offset - v_search_step_size
upper_bound = current_v_offset
@ -218,13 +228,15 @@ class LifacNoiseModel(AbstractModel):
def binary_search_base_freq(model: LifacNoiseModel, base_stimulus, goal_frequency, simulation_length, lower_bound, upper_bound, threshold):
counter = 0
if threshold <= 0:
raise ValueError("binary_search_base_freq() - LifacNoiseModel: threshold must be positive!")
while True:
counter += 1
middle = upper_bound - (upper_bound - lower_bound)/2
frequency = test_v_offset(model, middle, base_stimulus, simulation_length)
if counter > 100:
print("meep")
# print('{:.1f}, {:.1f}, {:.1f}, {:.1f} vs {:.1f} '.format(lower_bound, middle, upper_bound, frequency, goal_frequency))
if abs(frequency - goal_frequency) < threshold:
return middle
@ -255,7 +267,6 @@ def rectify_stimulus_array(stimulus_array: np.ndarray):
@jit(nopython=True)
def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray):
v_zero = parameters[0]
a_zero = parameters[1]
step_size = parameters[2]
@ -273,7 +284,6 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray
length = len(time)
output_voltage = np.zeros(length)
adaption = np.zeros(length)
stimulus_values = rectified_stimulus_array
spiketimes = []
output_voltage[0] = v_zero
@ -284,7 +294,7 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray
noise_value = np.random.normal()
noise = noise_strength * noise_value / np.sqrt(step_size)
output_voltage[i] = output_voltage[i-1] + ((v_base - output_voltage[i-1] + v_offset + (stimulus_values[i]*input_scaling) - adaption[i-1] + noise) / mem_tau) * step_size
output_voltage[i] = output_voltage[i-1] + ((v_base - output_voltage[i-1] + v_offset + (rectified_stimulus_array[i] * input_scaling) - adaption[i-1] + noise) / mem_tau) * step_size
adaption[i] = adaption[i-1] + ((-adaption[i-1]) / tau_a) * step_size
if output_voltage[i] > threshold:

View File

@ -81,43 +81,43 @@ def convert_to_array(carrier_freq, amplitude, modulation_freq, contrast, start_t
return values
# if the whole stimulus time has the amplitude modulation just built it at once;
if time_start >= start_time and start_time+duration < time_start+total_time:
carrier = np.sin(2 * np.pi * carrier_freq * np.arange(start_time, total_time - start_time, step_size_s))
modulation = 1 + contrast * np.sin(2 * np.pi * modulation_freq * np.arange(start_time, total_time - start_time, step_size_s))
values = amplitude * carrier * modulation
return values
# if it is split into parts with and without amplitude modulation built it in parts:
values = np.array([])
# there is some time before the modulation starts:
if time_start < start_time:
carrier_before_am = np.sin(2 * np.pi * carrier_freq * np.arange(time_start, start_time, step_size_s))
values = np.concatenate((values, amplitude * carrier_before_am))
# there is at least a second part of the stimulus that contains the amplitude:
# time starts before the end of the am and ends after it was started
if time_start < start_time+duration and time_start+total_time > start_time:
if duration is np.inf:
carrier_during_am = np.sin(
2 * np.pi * carrier_freq * np.arange(start_time, time_start + total_time, step_size_s))
am = 1 + contrast * np.sin(
2 * np.pi * modulation_freq * np.arange(start_time, time_start + total_time, step_size_s))
else:
carrier_during_am = np.sin(
2 * np.pi * carrier_freq * np.arange(start_time, start_time + duration, step_size_s))
am = 1 + contrast * np.sin(
2 * np.pi * modulation_freq * np.arange(start_time, start_time + duration, step_size_s))
values = np.concatenate((values, amplitude * am * carrier_during_am))
else:
if contrast != 0:
print("Given stimulus time parameters (start, total) result in no part of it containing the amplitude modulation!")
if time_start+total_time > start_time+duration:
carrier_after_am = np.sin(2 * np.pi * carrier_freq * np.arange(start_time + duration, time_start + total_time, step_size_s))
values = np.concatenate((values, amplitude*carrier_after_am))
return values
# # if the whole stimulus time has the amplitude modulation just built it at once;
# if time_start >= start_time and start_time+duration < time_start+total_time:
# carrier = np.sin(2 * np.pi * carrier_freq * np.arange(start_time, total_time - start_time, step_size_s))
# modulation = 1 + contrast * np.sin(2 * np.pi * modulation_freq * np.arange(start_time, total_time - start_time, step_size_s))
# values = amplitude * carrier * modulation
# return values
#
# # if it is split into parts with and without amplitude modulation built it in parts:
# values = np.array([])
#
# # there is some time before the modulation starts:
# if time_start < start_time:
# carrier_before_am = np.sin(2 * np.pi * carrier_freq * np.arange(time_start, start_time, step_size_s))
# values = np.concatenate((values, amplitude * carrier_before_am))
#
# # there is at least a second part of the stimulus that contains the amplitude:
# # time starts before the end of the am and ends after it was started
# if time_start < start_time+duration and time_start+total_time > start_time:
# if duration is np.inf:
#
# carrier_during_am = np.sin(
# 2 * np.pi * carrier_freq * np.arange(start_time, time_start + total_time, step_size_s))
# am = 1 + contrast * np.sin(
# 2 * np.pi * modulation_freq * np.arange(start_time, time_start + total_time, step_size_s))
# else:
# carrier_during_am = np.sin(
# 2 * np.pi * carrier_freq * np.arange(start_time, start_time + duration, step_size_s))
# am = 1 + contrast * np.sin(
# 2 * np.pi * modulation_freq * np.arange(start_time, start_time + duration, step_size_s))
# values = np.concatenate((values, amplitude * am * carrier_during_am))
#
# else:
# if contrast != 0:
# print("Given stimulus time parameters (start, total) result in no part of it containing the amplitude modulation!")
#
# if time_start+total_time > start_time+duration:
# carrier_after_am = np.sin(2 * np.pi * carrier_freq * np.arange(start_time + duration, time_start + total_time, step_size_s))
# values = np.concatenate((values, amplitude*carrier_after_am))
#
# return values

View File

@ -0,0 +1,75 @@
from stimuli.AbstractStimulus import AbstractStimulus
import numpy as np
from numba import jit, njit
class SinusoidalStepStimulus(AbstractStimulus):
    """Sinusoidal carrier whose amplitude is stepped up by a contrast factor
    during the window [start_time, start_time + duration].

    The given contrast is stored as the multiplicative factor (1 + contrast)
    that is applied to the carrier inside the step window.
    """

    def __init__(self, frequency, contrast, start_time=0, duration=np.inf, amplitude=1):
        # stored pre-incremented: the factor actually multiplied onto the carrier
        self.contrast = 1 + contrast
        self.amplitude = amplitude
        self.frequency = frequency
        self.start_time = start_time
        self.duration = duration

    def value_at_time_in_s(self, time_point):
        """Return the stimulus value at a single point in time (in seconds)."""
        carrier = np.sin(2 * np.pi * self.frequency * time_point)
        # Bug fix: the contrast factor belongs INSIDE the step window
        # [start_time, start_time + duration] — consistent with as_array(),
        # which scales exactly that interval. The original condition was
        # inverted and applied the contrast outside the window instead.
        if self.start_time <= time_point <= self.start_time + self.duration:
            return self.amplitude * carrier * self.contrast
        return self.amplitude * carrier

    def get_stimulus_start_s(self):
        """Return the start time of the step window in seconds."""
        return self.start_time

    def get_stimulus_duration_s(self):
        """Return the duration of the step window in seconds."""
        return self.duration

    def get_amplitude(self):
        # NOTE(review): returns the stored contrast factor (1 + contrast),
        # not self.amplitude — presumably intentional; verify against callers.
        return self.contrast

    def as_array(self, time_start, total_time, step_size):
        """Sample the stimulus on [time_start, time_start + total_time)
        with the given step size (all in seconds)."""
        frequency = self.frequency
        amp = self.amplitude
        contrast = self.contrast
        start_time = self.start_time
        duration = self.duration

        values = convert_to_array(frequency, amp, contrast, start_time, duration, time_start, total_time, step_size)
        return values
# @jit(nopython=True) # makes it slower?
def convert_to_array(frequency, amplitude, contrast, start_time, duration, time_start, total_time, step_size_s):
    """Sample the sinusoidal step stimulus on a regular time grid.

    Builds the carrier sin(2*pi*frequency*t) for t in
    [time_start, time_start + total_time) with step step_size_s, scales it by
    amplitude, and multiplies the samples inside the step window
    [start_time, start_time + duration) by the contrast factor.

    Raises:
        ValueError: if the window edges do not fall exactly on grid points.
    """
    full_time = np.arange(time_start, time_start + total_time, step_size_s)
    full_carrier = np.sin(2 * np.pi * frequency * full_time)

    # Bug fix: the step window misses the sampled interval only when it starts
    # after the interval ENDS (time_start + total_time — the original wrongly
    # compared against time_start + duration) or ends before it starts.
    if start_time > time_start + total_time or start_time + duration < time_start:
        return full_carrier * amplitude

    # Clip the step window to the sampled interval:
    if start_time >= time_start:
        am_start = start_time
    else:
        am_start = time_start

    if time_start + total_time >= start_time + duration:
        am_end = start_time + duration
    else:
        am_end = time_start + total_time

    idx_start = (am_start - time_start) / step_size_s
    idx_end = (am_end - time_start) / step_size_s
    if idx_start != round(idx_start) or idx_end != round(idx_end):
        raise ValueError("Didn't calculate integers when searching the start and end index. start:", idx_start, "end:", idx_end)
    idx_start = int(idx_start)
    idx_end = int(idx_end)

    values = full_carrier * amplitude
    # scale only the samples inside the (clipped) step window
    values[idx_start:idx_end] = values[idx_start:idx_end]*contrast

    return values

View File

@ -1,6 +1,6 @@
from stimuli.AbstractStimulus import AbstractStimulus
import numpy as np
class StepStimulus(AbstractStimulus):
@ -31,3 +31,36 @@ class StepStimulus(AbstractStimulus):
def get_amplitude(self):
    """Return the height of the step above the baseline value."""
    step_height = self.value - self.base_value
    return step_height
def as_array(self, time_start, total_time, step_size):
    """Sample the step stimulus on a regular time grid.

    Returns an array of length int(total_time / step_size) filled with
    base_value, where the samples inside the step window
    [start, start + duration] (end sample inclusive) are set to value.

    Raises:
        ValueError: if the window edges do not fall exactly on grid points.
    """
    values = np.full(int(total_time/step_size), self.base_value)
    # Bug fix: the step window misses the sampled interval only when it starts
    # after the interval ENDS (time_start + total_time — the original wrongly
    # compared against time_start + self.duration) or ends before it starts.
    if self.start > time_start + total_time or self.start + self.duration < time_start:
        return values
    else:
        # Clip the step window to the sampled interval:
        if self.start >= time_start:
            am_start = self.start
        else:
            am_start = time_start

        if time_start + total_time >= self.start + self.duration:
            am_end = self.start + self.duration
        else:
            am_end = time_start + total_time

        idx_start = (am_start - time_start) / step_size
        idx_end = (am_end - time_start) / step_size
        if idx_start != round(idx_start) or idx_end != round(idx_end):
            raise ValueError("Didn't calculate integers when searching the start and end index. start:", idx_start,
                             "end:", idx_end)

        idx_start = int(idx_start)
        idx_end = int(idx_end)
        # end index inclusive: the sample at am_end itself carries the step value
        values[idx_start:idx_end+1] = self.value
        return values

View File

@ -126,7 +126,7 @@ class FrequencyFunctionsTester(unittest.TestCase):
freq_traces = [test1_f, test2_f]
time, mean = hF.calculate_mean_of_frequency_traces(time_traces, freq_traces, 0.5)
expected_time = np.arange(0.5, 7.5, 0.5)
expected_time = np.arange(0.5, 7, 0.5)
expected_mean = [0.75, 1.25, 1.25, 2, 2, 2.5]
time_equal = np.all([time[i] == expected_time[i] for i in range(len(time))])
@ -137,7 +137,11 @@ class FrequencyFunctionsTester(unittest.TestCase):
self.assertEqual(len(expected_time), len(time), msg="expected:\n" + str(expected_time) + "\n actual: \n" + str(time))
# TODO:
# all_calculate_mean_isi_frequency_traces(spiketimes, sampling_interval, time_in_ms=True):
# all_calculate_mean_isi_frequency_traces(spiketimes, sampling_interval, time_in_ms=False):
#def test_all_calculate_mean_isi_frequency_traces(self):
# hF.all_calculate_mean_isi_frequency_traces(,
def generate_jittered_spiketimes(frequency, noise_level=0., start=0, end=5, method='normal'):

View File

@ -30,7 +30,23 @@ class HelperFunctionsTester(unittest.TestCase):
self.assertEqual(0, round(hF.__vector_strength__(rel_spike_times, eod_durations), 5))
# def test_eods_around_spikes(self):
#
# time = np.arange(0, 3, 0.01)
# eod = np.sin(2*np.pi * 2 * time)
#
# spikes = [0.2, 0.5, 0.6]
# indices = np.searchsorted(time, spikes)
#
# rel_spike_time, eod_duration = hF.eods_around_spikes(time, eod, indices)
#
# print("meep")
# todo
# search_eod_start_and_end_times ? (not used anymore ?)
# eods_around_spikes
# calculate_phases
# def test(self):
# test_distribution()