debug and add unittests 1

This commit is contained in:
a.ott 2020-02-27 09:28:34 +01:00
parent 234f58404e
commit 170ecab8f4
8 changed files with 419 additions and 113 deletions

View File

@ -10,8 +10,12 @@ import functions as fu
import time import time
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
def main(): def main():
run_test_with_fixed_model()
def run_with_real_data():
for celldata in icelldata_of_dir("./data/"): for celldata in icelldata_of_dir("./data/"):
start_time = time.time() start_time = time.time()
fitter = Fitter(celldata) fitter = Fitter(celldata)
@ -26,14 +30,39 @@ def main():
pass pass
def run_test_with_fixed_model():
a_tau = 10
a_delta = 0.08
parameters = {'mem_tau': 5, 'delta_a': a_delta, 'input_scaling': 100,
'v_offset': 50, 'threshold': 1, 'v_base': 0, 'step_size': 0.05, 'tau_a': a_tau,
'a_zero': 0, 'v_zero': 0, 'noise_strength': 0.5}
model = LifacNoiseModel(parameters)
eod_freq = 750
contrasts = np.arange(0.5, 1.51, 0.1)
modulation_freq = 10
print(contrasts)
baseline_freq, vector_strength, serial_correlation = model.calculate_baseline_markers(eod_freq)
f_infinities, f_infinities_slope = model.calculate_fi_markers(contrasts, eod_freq, modulation_freq)
fitter = Fitter()
fmin, fit_parameters = fitter.fit_model_to_values(eod_freq, baseline_freq, serial_correlation, vector_strength, contrasts, f_infinities, f_infinities_slope, a_delta, a_tau)
print("calculated parameters:")
print(fit_parameters)
print("ref parameters:")
print(parameters)
class Fitter: class Fitter:
def __init__(self, data: CellData, step_size=None): def __init__(self, step_size=None):
if step_size is not None: if step_size is not None:
self.model = LifacNoiseModel({"step_size": step_size}) self.model = LifacNoiseModel({"step_size": step_size})
else: else:
self.model = LifacNoiseModel({"step_size": 0.05}) self.model = LifacNoiseModel({"step_size": 0.05})
self.data = data # self.data = data
self.fi_contrasts = [] self.fi_contrasts = []
self.eod_freq = 0 self.eod_freq = 0
@ -53,16 +82,15 @@ class Fitter:
self.a_delta = 0 self.a_delta = 0
self.counter = 0 self.counter = 0
self.calculate_needed_values_from_data()
def calculate_needed_values_from_data(self): def calculate_needed_values_from_data(self, data: CellData):
self.eod_freq = self.data.get_eod_frequency() self.eod_freq = data.get_eod_frequency()
self.baseline_freq = self.data.get_base_frequency() self.baseline_freq = data.get_base_frequency()
self.vector_strength = self.data.get_vector_strength() self.vector_strength = data.get_vector_strength()
self.serial_correlation = self.data.get_serial_correlation(self.sc_max_lag) self.serial_correlation = data.get_serial_correlation(self.sc_max_lag)
fi_curve = FICurve(self.data, contrast=True) fi_curve = FICurve(data, contrast=True)
self.fi_contrasts = fi_curve.stimulus_value self.fi_contrasts = fi_curve.stimulus_value
self.f_infinities = fi_curve.f_infinities self.f_infinities = fi_curve.f_infinities
self.f_infinities_slope = fi_curve.get_f_infinity_slope() self.f_infinities_slope = fi_curve.get_f_infinity_slope()
@ -70,11 +98,12 @@ class Fitter:
f_zero_slope = fi_curve.get_fi_curve_slope_of_straight() f_zero_slope = fi_curve.get_fi_curve_slope_of_straight()
self.a_delta = f_zero_slope / self.f_infinities_slope self.a_delta = f_zero_slope / self.f_infinities_slope
adaption = Adaption(self.data, fi_curve) adaption = Adaption(data, fi_curve)
self.a_tau = adaption.get_tau_real() self.a_tau = adaption.get_tau_real()
# mem_tau, (threshold?), (v_offset), noise_strength, input_scaling # mem_tau, (threshold?), (v_offset), noise_strength, input_scaling
def cost_function(self, X, tau_a=10, delta_a=3, error_scaling=()): def cost_function(self, X, tau_a=10, delta_a=3, error_scaling=()):
freq_sampling_rate = 0.005
# set model parameters to the given ones: # set model parameters to the given ones:
self.model.set_variable("mem_tau", X[0]) self.model.set_variable("mem_tau", X[0])
self.model.set_variable("noise_strength", X[1]) self.model.set_variable("noise_strength", X[1])
@ -83,18 +112,19 @@ class Fitter:
self.model.set_variable("delta_a", delta_a) self.model.set_variable("delta_a", delta_a)
# minimize the difference in baseline_freq first by fitting v_offset # minimize the difference in baseline_freq first by fitting v_offset
v_offset = self.__fit_v_offset_to_baseline_frequency__() # v_offset = self.__fit_v_offset_to_baseline_frequency__()
v_offset = self.model.find_v_offset(self.baseline_freq, self.eod_freq)
self.model.set_variable("v_offset", v_offset) self.model.set_variable("v_offset", v_offset)
# only eod with amplitude 1 and no modulation # only eod with amplitude 1 and no modulation
base_stimulus = SinusAmplitudeModulationStimulus(self.eod_freq, 0, 0) base_stimulus = SinusAmplitudeModulationStimulus(self.eod_freq, 0, 0)
_, spiketimes = self.model.simulate_fast(base_stimulus, 30) _, spiketimes = self.model.simulate_fast(base_stimulus, 30)
baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 5) baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, freq_sampling_rate, 5)
# print("model:", baseline_freq, "data:", self.baseline_freq) # print("model:", baseline_freq, "data:", self.baseline_freq)
relative_spiketimes = np.array([s % (1/self.eod_freq) for s in spiketimes]) relative_spiketimes = np.array([s % (1/self.eod_freq) for s in spiketimes if s > 0])
eod_durations = np.full((len(spiketimes)), 1/self.eod_freq) eod_durations = np.full((len(relative_spiketimes)), 1/self.eod_freq)
vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations) vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations)
serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), self.sc_max_lag) serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), self.sc_max_lag)
@ -106,7 +136,7 @@ class Fitter:
if len(spiketimes) < 2: if len(spiketimes) < 2:
f_infinities.append(0) f_infinities.append(0)
else: else:
f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 0.4) f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, freq_sampling_rate, 0.4)
f_infinities.append(f_infinity) f_infinities.append(f_infinity)
popt, pcov = curve_fit(fu.line, self.fi_contrasts, f_infinities, maxfev=10000) popt, pcov = curve_fit(fu.line, self.fi_contrasts, f_infinities, maxfev=10000)
@ -127,7 +157,8 @@ class Fitter:
error_f_inf = error_f_inf / len(f_infinities) error_f_inf = error_f_inf / len(f_infinities)
self.counter += 1 self.counter += 1
# print("mem_tau:", X[0], "noise:", X[0], "input_scaling:", X[2]) # print("mem_tau:", X[0], "noise:", X[0], "input_scaling:", X[2])
print("Cost function run times:", self.counter, "errors:", [error_bf, error_vs, error_sc, error_f_inf_slope, error_f_inf]) errors = [error_bf, error_vs, error_sc, error_f_inf_slope, error_f_inf]
print("Cost function run times:", self.counter, "error sum:", sum(errors), errors)
return error_bf + error_vs + error_sc + error_f_inf_slope + error_f_inf return error_bf + error_vs + error_sc + error_f_inf_slope + error_f_inf
def __fit_v_offset_to_baseline_frequency__(self): def __fit_v_offset_to_baseline_frequency__(self):
@ -201,7 +232,7 @@ class Fitter:
else: else:
baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, simulation_time/2) baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, simulation_time/2)
if abs(baseline_freq - self.baseline_freq) < 1: if abs(baseline_freq - self.baseline_freq) < 5:
# print("close enough:", baseline_freq, self.baseline_freq, abs(baseline_freq - self.baseline_freq)) # print("close enough:", baseline_freq, self.baseline_freq, abs(baseline_freq - self.baseline_freq))
break break
elif baseline_freq < self.baseline_freq: elif baseline_freq < self.baseline_freq:
@ -211,14 +242,28 @@ class Fitter:
return middle return middle
def fit_model_to_data(self): def fit_model_to_data(self, data: CellData):
self.calculate_needed_values_from_data(data)
return self.fit_model()
def fit_model_to_values(self, eod_freq, baseline_freq, sc, vs, fi_contrasts, fi_inf_values, fi_inf_slope, a_delta, a_tau):
self.eod_freq = eod_freq
self.baseline_freq = baseline_freq
self.serial_correlation = sc
self.vector_strength = vs
self.fi_contrasts = fi_contrasts
self.f_infinities = fi_inf_values
self.f_infinities_slope = fi_inf_slope
self.a_delta = a_delta
self.a_tau = a_tau
return self.fit_model()
def fit_model(self):
x0 = np.array([20, 15, 75]) x0 = np.array([20, 15, 75])
init_simplex = np.array([np.array([2, 1, 10]), np.array([40, 100, 140]), np.array([20, 50, 70]), np.array([150, 1, 200])]) init_simplex = np.array([np.array([2, 1, 10]), np.array([40, 100, 140]), np.array([20, 50, 70]), np.array([150, 1, 200])])
fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="Nelder-Mead", options={"initial_simplex": init_simplex}) fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="Nelder-Mead", options={"initial_simplex": init_simplex})
#fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="BFGS")
return fmin, self.model.get_parameters() return fmin, self.model.get_parameters()

View File

@ -73,19 +73,27 @@ def test_peak_detection():
def test_simulation_speed(): def test_simulation_speed():
parameters = {'mem_tau': 21.348990483539083, 'delta_a': 20.41809814660199, 'input_scaling': 3.0391541280864196, 'v_offset': 26.25, 'threshold': 1, 'v_base': 0, 'step_size': 0.01, 'tau_a': 158.0404259501454, 'a_zero': 0, 'v_zero': 0, 'noise_strength': 2.87718460648148} parameters = {'mem_tau': 21.348990483539083, 'delta_a': 20.41809814660199, 'input_scaling': 3.0391541280864196, 'v_offset': 26.25, 'threshold': 1, 'v_base': 0, 'step_size': 0.00005, 'tau_a': 158.0404259501454, 'a_zero': 0, 'v_zero': 0, 'noise_strength': 2.87718460648148}
model = LifacNoiseModel(parameters) model = LifacNoiseModel(parameters)
repetitions = 20 repetitions = 1
seconds = 10 seconds = 10
stimulus = SinusAmplitudeModulationStimulus(750, 1, 10, 1, 8) stimulus = SinusAmplitudeModulationStimulus(750, 1, 10, 1, 8)
time_start = 0 time_start = 0
t_start = time.time() t_start = time.time()
for i in range(repetitions): for i in range(repetitions):
v, spikes = model.simulate_fast(stimulus, seconds, time_start)
v, spikes = model.simulate_fast(stimulus, seconds, time_start)
print(len(v))
print(len(spikes))
#time_v = np.arange(time_start, seconds, model.get_sampling_interval())
#plt.plot(time_v, v, '.')
#plt.show()
#freq = hf.mean_freq_of_spiketimes_after_time_x(spikes, parameters["step_size"], 0)
#print(freq)
t_end = time.time() t_end = time.time()
print("baseline markers:",model.calculate_baseline_markers(750, 3)) #print("baseline markers:", model.calculate_baseline_markers(750, 3))
print("took:", round((t_end-t_start)/repetitions, 2), "seconds for " + str(seconds) + "s simulation", "step size:", parameters["step_size"]) print("took:", round((t_end-t_start)/repetitions, 5), "seconds for " + str(seconds) + "s simulation", "step size:", parameters["step_size"]*1000, "ms")
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -74,37 +74,32 @@ def all_calculate_mean_isi_frequencies(spiketimes, time_start, sampling_interval
return times, mean_frequencies return times, mean_frequencies
# TODO remove additional time vector calculation! def calculate_isi_frequency(spiketimes, sampling_interval, time_in_ms=True):
def calculate_isi_frequency(spiketimes, time_start, sampling_interval): """
first_isi = spiketimes[0] - time_start Calculates the frequency over time according to the inter spike intervals.
isis = [first_isi]
isis.extend(np.diff(spiketimes)) :param spiketimes: time points spikes were measured array_like
time = np.arange(time_start, spiketimes[-1], sampling_interval) :param sampling_interval: the sampling interval in which the frequency should be given back
:param time_in_ms: whether the time is in ms or in s for BOTH the spiketimes and the sampling interval
full_frequency = [] :return: an np.array with the isi frequency starting at the time of first spike and ending at the time of the last spike
i = 0 """
isis = np.diff(spiketimes)
if time_in_ms:
isis = isis / 1000
sampling_interval = sampling_interval / 1000
full_frequency = np.array([])
for isi in isis: for isi in isis:
if isi == 0: if isi == 0:
warn("An ISI was zero in FiCurve:__calculate_mean_isi_frequency__()") warn("An ISI was zero in FiCurve:__calculate_mean_isi_frequency__()")
print("ISI was zero:", spiketimes)
continue continue
freq = 1 / isi freq = 1 / isi
frequency_step = int(round(isi * (1 / sampling_interval))) * [freq] frequency_step = np.full(int(round(isi * (1 / sampling_interval))), freq)
full_frequency.extend(frequency_step) full_frequency = np.concatenate((full_frequency, frequency_step))
i += 1
if len(full_frequency) != len(time):
if abs(len(full_frequency) - len(time)) == 1:
warn("FiCurve:__calculate_mean_isi_frequency__():\nFrequency and time were one of in length!")
if len(full_frequency) < len(time):
time = time[:len(full_frequency)]
else:
full_frequency = full_frequency[:len(time)]
else:
print("ERROR PRINT:")
print("freq:", len(full_frequency), "time:", len(time), "diff:", len(full_frequency) - len(time))
raise RuntimeError("FiCurve:__calculate_mean_isi_frequency__():\n"
"Frequency and time are not the same length!")
return time, full_frequency return full_frequency
def calculate_mean_frequency(trial_times, trial_freqs): def calculate_mean_frequency(trial_times, trial_freqs):
@ -118,27 +113,20 @@ def calculate_mean_frequency(trial_times, trial_freqs):
return time, mean_freq return time, mean_freq
def mean_freq_of_spiketimes_after_time_x(spiketimes, time_x): def mean_freq_of_spiketimes_after_time_x(spiketimes, sampling_interval, time_x, time_in_ms=False):
""" Calculates the mean frequency of the portion of spiketimes that is after last_x_time """ """ Calculates the mean frequency of the portion of spiketimes that is after last_x_time """
idx = -1 if len(spiketimes) <= 1:
if time_x < spiketimes[int(len(spiketimes)/2)]:
for i in range(len(spiketimes)):
if spiketimes[i] > time_x:
idx = i + 1
break
else:
for i in range(len(spiketimes) - 1, -1, -1):
if spiketimes[i] < time_x:
idx = i + 1
break
all_isi = np.diff(spiketimes[idx:]) / 1000
if len(all_isi) < 5:
return 0 return 0
mean_freq = np.mean([1 / isi for isi in all_isi])
freq = calculate_isi_frequency(spiketimes, sampling_interval, time_in_ms)
# returned frequency starts at the
idx = int((time_x-spiketimes[0]) / sampling_interval)
mean_freq = np.mean(freq[idx:])
return mean_freq return mean_freq
# @jit(nopython=True) # only faster at around 30 000 calls # @jit(nopython=True) # only faster at around 30 000 calls
def calculate_coefficient_of_variation(spiketimes: np.ndarray) -> float: def calculate_coefficient_of_variation(spiketimes: np.ndarray) -> float:
# CV (stddev of ISI divided by mean ISI (np.diff(spiketimes)) # CV (stddev of ISI divided by mean ISI (np.diff(spiketimes))
@ -167,10 +155,13 @@ def calculate_serial_correlation(spiketimes: np.ndarray, max_lag: int) -> np.nda
def calculate_eod_frequency(time, eod): def calculate_eod_frequency(time, eod):
# TODO for few samples very volatile measure!
up_indicies, down_indicies = threshold_crossings(eod, 0) up_indicies, down_indicies = threshold_crossings(eod, 0)
up_times, down_times = threshold_crossing_times(time, eod, 0, up_indicies, down_indicies) up_times, down_times = threshold_crossing_times(time, eod, 0, up_indicies, down_indicies)
if len(up_times) == 0:
return 0
durations = np.diff(up_times) durations = np.diff(up_times)
mean_duration = np.mean(durations) mean_duration = np.mean(durations)
@ -297,3 +288,4 @@ def __vector_strength__(relative_spike_times: np.ndarray, eod_durations: np.ndar
vs = np.sqrt((1 / n * np.sum(np.cos(phase_times))) ** 2 + (1 / n * np.sum(np.sin(phase_times))) ** 2) vs = np.sqrt((1 / n * np.sum(np.cos(phase_times))) ** 2 + (1 / n * np.sum(np.sin(phase_times))) ** 2)
return vs return vs

10
introduction/test.py Normal file
View File

@ -0,0 +1,10 @@
import numpy as np
import matplotlib.pyplot as plt
def main():
pass
if __name__ == '__main__':
main()

View File

@ -19,10 +19,10 @@ class LifacNoiseModel(AbstractModel):
"v_offset": 50, "v_offset": 50,
"input_scaling": 1, "input_scaling": 1,
"delta_a": 0.4, "delta_a": 0.4,
"tau_a": 40, "tau_a": 0.04,
"a_zero": 0, "a_zero": 0,
"noise_strength": 3, "noise_strength": 3,
"step_size": 0.01} "step_size": 0.00005}
def __init__(self, params: dict = None): def __init__(self, params: dict = None):
super().__init__(params) super().__init__(params)
@ -36,26 +36,31 @@ class LifacNoiseModel(AbstractModel):
def simulate(self, stimulus: AbstractStimulus, total_time_s): def simulate(self, stimulus: AbstractStimulus, total_time_s):
self.stimulus = stimulus self.stimulus = stimulus
output_voltage = [] time = np.arange(0, total_time_s, self.parameters["step_size"])
adaption = [] output_voltage = np.zeros(len(time), dtype='float64')
adaption = np.zeros(len(time), dtype='float64')
spiketimes = [] spiketimes = []
current_v = self.parameters["v_zero"] current_v = self.parameters["v_zero"]
current_a = self.parameters["a_zero"] current_a = self.parameters["a_zero"]
output_voltage[0] = current_v
adaption[0] = current_a
for time_point in np.arange(0, total_time_s*1000, self.parameters["step_size"]): for i in range(1, len(time), 1):
time_point = time[i]
# rectified input: # rectified input:
stimulus_strength = fu.rectify(stimulus.value_at_time_in_ms(time_point)) stimulus_strength = fu.rectify(stimulus.value_at_time_in_s(time_point))
v_next = self._calculate_voltage_step(current_v, stimulus_strength - current_a) v_next = self._calculate_voltage_step(current_v, stimulus_strength - current_a)
a_next = self._calculate_adaption_step(current_a) a_next = self._calculate_adaption_step(current_a)
if v_next > self.parameters["threshold"]: if v_next > self.parameters["threshold"]:
v_next = self.parameters["v_base"] v_next = self.parameters["v_base"]
spiketimes.append(time_point/1000) spiketimes.append(time_point)
a_next += self.parameters["delta_a"] / (self.parameters["tau_a"] / 1000) a_next += self.parameters["delta_a"] / (self.parameters["tau_a"])
output_voltage.append(v_next) output_voltage[i] = v_next
adaption.append(a_next) adaption[i] = a_next
current_v = v_next current_v = v_next
current_a = a_next current_a = a_next
@ -66,6 +71,22 @@ class LifacNoiseModel(AbstractModel):
return output_voltage, spiketimes return output_voltage, spiketimes
def _calculate_voltage_step(self, current_v, input_v):
v_base = self.parameters["v_base"]
step_size = self.parameters["step_size"]
v_offset = self.parameters["v_offset"]
mem_tau = self.parameters["mem_tau"]
noise_strength = self.parameters["noise_strength"]
noise_value = np.random.normal()
noise = noise_strength * noise_value / np.sqrt(step_size)
return current_v + step_size * ((v_base - current_v + v_offset + input_v + noise) / mem_tau)
def _calculate_adaption_step(self, current_a):
step_size = self.parameters["step_size"]
return current_a + (step_size * (-current_a)) / self.parameters["tau_a"]
def simulate_fast(self, stimulus: AbstractStimulus, total_time_s, time_start=0): def simulate_fast(self, stimulus: AbstractStimulus, total_time_s, time_start=0):
v_zero = self.parameters["v_zero"] v_zero = self.parameters["v_zero"]
@ -79,10 +100,8 @@ class LifacNoiseModel(AbstractModel):
mem_tau = self.parameters["mem_tau"] mem_tau = self.parameters["mem_tau"]
noise_strength = self.parameters["noise_strength"] noise_strength = self.parameters["noise_strength"]
stimulus_array = stimulus.as_array(time_start, total_time_s, step_size) rectified_stimulus = rectify_stimulus_array(stimulus.as_array(time_start, total_time_s, step_size))
rectified_stimulus = rectify_stimulus_array(stimulus_array) parameters = np.array([v_zero, a_zero, step_size, threshold, v_base, delta_a, tau_a, v_offset, mem_tau, noise_strength, time_start])
parameters = np.array([v_zero, a_zero, step_size, threshold, v_base, delta_a, tau_a, v_offset, mem_tau, noise_strength])
voltage_trace, adaption, spiketimes = simulate_fast(rectified_stimulus, total_time_s, parameters) voltage_trace, adaption, spiketimes = simulate_fast(rectified_stimulus, total_time_s, parameters)
@ -93,22 +112,6 @@ class LifacNoiseModel(AbstractModel):
return voltage_trace, spiketimes return voltage_trace, spiketimes
def _calculate_voltage_step(self, current_v, input_v):
v_base = self.parameters["v_base"]
step_size = self.parameters["step_size"]
v_offset = self.parameters["v_offset"]
mem_tau = self.parameters["mem_tau"]
noise_strength = self.parameters["noise_strength"]
noise_value = np.random.normal()
noise = noise_strength * noise_value / np.sqrt(step_size)
return current_v + step_size * ((v_base - current_v + v_offset + input_v + noise) / mem_tau)
def _calculate_adaption_step(self, current_a):
step_size = self.parameters["step_size"]
return current_a + (step_size * (-current_a)) / self.parameters["tau_a"]
def min_stimulus_strength_to_spike(self): def min_stimulus_strength_to_spike(self):
return self.parameters["threshold"] - self.parameters["v_base"] return self.parameters["threshold"] - self.parameters["v_base"]
@ -159,8 +162,8 @@ class LifacNoiseModel(AbstractModel):
""" """
base_stimulus = SinusAmplitudeModulationStimulus(base_stimulus_freq, 0, 0) base_stimulus = SinusAmplitudeModulationStimulus(base_stimulus_freq, 0, 0)
_, spiketimes = self.simulate_fast(base_stimulus, 30) _, spiketimes = self.simulate_fast(base_stimulus, 30)
time_x = 5
baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 5) baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, self.get_sampling_interval(), time_x)
relative_spiketimes = np.array([s % (1 / base_stimulus_freq) for s in spiketimes]) relative_spiketimes = np.array([s % (1 / base_stimulus_freq) for s in spiketimes])
eod_durations = np.full((len(spiketimes)), 1 / base_stimulus_freq) eod_durations = np.full((len(spiketimes)), 1 / base_stimulus_freq)
@ -179,12 +182,9 @@ class LifacNoiseModel(AbstractModel):
f_infinities = [] f_infinities = []
for contrast in contrasts: for contrast in contrasts:
stimulus = SinusAmplitudeModulationStimulus(base_freq, contrast, modulation_frequency) stimulus = SinusAmplitudeModulationStimulus(base_freq, contrast, modulation_frequency)
_, spiketimes = self.simulate_fast(stimulus, 0.5) _, spiketimes = self.simulate_fast(stimulus, 1)
if len(spiketimes) < 2: f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, self.get_sampling_interval(), 0.3)
f_infinities.append(0)
else:
f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 0.4)
f_infinities.append(f_infinity) f_infinities.append(f_infinity)
popt, pcov = curve_fit(fu.line, contrasts, f_infinities, maxfev=10000) popt, pcov = curve_fit(fu.line, contrasts, f_infinities, maxfev=10000)
@ -193,9 +193,58 @@ class LifacNoiseModel(AbstractModel):
return f_infinities, f_infinities_slope return f_infinities, f_infinities_slope
def find_v_offset(self, goal_baseline_frequency, base_stimulus, threshold=10):
test_model = self.get_model_copy()
simulation_length = 5
v_search_step_size = 1000
current_v_offset = 0
current_freq = test_v_offset(test_model, current_v_offset, base_stimulus, simulation_length)
while current_freq < goal_baseline_frequency:
current_v_offset += v_search_step_size
current_freq = test_v_offset(test_model, current_v_offset, base_stimulus, simulation_length)
lower_bound = current_v_offset - v_search_step_size
upper_bound = current_v_offset
return binary_search_base_freq(test_model, base_stimulus, goal_baseline_frequency, simulation_length, lower_bound, upper_bound, threshold)
def binary_search_base_freq(model: LifacNoiseModel, base_stimulus, goal_frequency, simulation_length, lower_bound, upper_bound, threshold):
if threshold <= 0:
raise ValueError("binary_search_base_freq() - LifacNoiseModel: threshold is not allowed to be negative!")
while True:
middle = upper_bound - (upper_bound - lower_bound)/2
frequency = test_v_offset(model, middle, base_stimulus, simulation_length)
# print('{:.1f}, {:.1f}, {:.1f}, {:.1f} vs {:.1f} '.format(lower_bound, middle, upper_bound, frequency, goal_frequency))
if abs(frequency - goal_frequency) < threshold:
return middle
elif frequency < goal_frequency:
lower_bound = middle
elif frequency > goal_frequency:
upper_bound = middle
elif abs(upper_bound-lower_bound) < 0.01 * threshold:
return middle
else:
print('lower bound: {:.1f}, middle: {:.1f}, upper_bound: {:.1f}, frequency: {:.1f} vs goal: {:.1f} '.format(lower_bound, middle, upper_bound, frequency, goal_frequency))
raise ValueError("binary_search_base_freq() - LifacNoiseModel: Goal frequency might be nan?")
def test_v_offset(model: LifacNoiseModel, v_offset, base_stimulus, simulation_length):
model.set_variable("v_offset", v_offset)
_, spiketimes = model.simulate_fast(base_stimulus, simulation_length)
freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 0.005, simulation_length/3)
return freq
@jit(nopython=True) @jit(nopython=True)
def rectify_stimulus_array(stimulus_array:np.ndarray): def rectify_stimulus_array(stimulus_array: np.ndarray):
return np.array([x if x > 0 else 0 for x in stimulus_array]) return np.array([x if x > 0 else 0 for x in stimulus_array])
@ -212,8 +261,9 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray
v_offset = parameters[7] v_offset = parameters[7]
mem_tau = parameters[8] mem_tau = parameters[8]
noise_strength = parameters[9] noise_strength = parameters[9]
time_start = parameters[10]
time = np.arange(0, total_time_s * 1000, step_size) time = np.arange(time_start, total_time_s, step_size)
length = len(time) length = len(time)
output_voltage = np.zeros(length) output_voltage = np.zeros(length)
adaption = np.zeros(length) adaption = np.zeros(length)
@ -223,7 +273,8 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray
output_voltage[0] = v_zero output_voltage[0] = v_zero
adaption[0] = a_zero adaption[0] = a_zero
for i in range(len(time)-1): for i in range(1, len(time), 1):
noise_value = np.random.normal() noise_value = np.random.normal()
noise = noise_strength * noise_value / np.sqrt(step_size) noise = noise_strength * noise_value / np.sqrt(step_size)
@ -233,7 +284,7 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray
if output_voltage[i] > threshold: if output_voltage[i] > threshold:
output_voltage[i] = v_base output_voltage[i] = v_base
spiketimes.append(i*step_size) spiketimes.append(i*step_size)
adaption[i] += delta_a / (tau_a / 1000) adaption[i] += delta_a / tau_a
return output_voltage, adaption, spiketimes return output_voltage, adaption, spiketimes

View File

@ -40,12 +40,12 @@ class SinusAmplitudeModulationStimulus(AbstractStimulus):
start_time = self.start_time start_time = self.start_time
duration = self.duration duration = self.duration
values = convert_to_array(carrier, amp, mod_freq, contrast, start_time, duration, time_start, total_time, step_size/1000) values = convert_to_array(carrier, amp, mod_freq, contrast, start_time, duration, time_start, total_time, step_size)
return values return values
#@jit(nopython=True) # makes it slower? # @jit(nopython=True) # makes it slower?
def convert_to_array(carrier_freq, amplitude, modulation_freq, contrast, start_time, duration, time_start, total_time, step_size_s): def convert_to_array(carrier_freq, amplitude, modulation_freq, contrast, start_time, duration, time_start, total_time, step_size_s):
# if the whole stimulus time has the amplitude modulation just built it at once; # if the whole stimulus time has the amplitude modulation just built it at once;
if time_start >= start_time and start_time+duration < time_start+total_time: if time_start >= start_time and start_time+duration < time_start+total_time:
@ -55,7 +55,7 @@ def convert_to_array(carrier_freq, amplitude, modulation_freq, contrast, start_t
return values return values
# if it is split into parts with and without amplitude modulation built it in parts: # if it is split into parts with and without amplitude modulation built it in parts:
values = np.empty(1) values = np.array([], dtype='float32')
if time_start < start_time: if time_start < start_time:
carrier_before_am = np.sin(2 * np.pi * carrier_freq * np.arange(time_start, start_time, step_size_s)) carrier_before_am = np.sin(2 * np.pi * carrier_freq * np.arange(time_start, start_time, step_size_s))

View File

@ -0,0 +1,130 @@
import unittest
import numpy as np
import helperFunctions as hF
import matplotlib.pyplot as plt
class HelperFunctionsTester(unittest.TestCase):
noise_levels = [0, 0.05, 0.1, 0.2]
frequencies = [0, 1, 5, 30, 100, 500, 750, 1000]
def setUp(self):
pass
def tearDown(self):
pass
def test_calculate_eod_frequency(self):
start = 0
end = 5
step = 0.1 / 1000
freqs = [0, 1, 10, 500, 700, 1000]
for freq in freqs:
time = np.arange(start, end, step)
eod = np.sin(freq*(2*np.pi) * time)
self.assertEqual(freq, round(hF.calculate_eod_frequency(time, eod), 2))
def test__vector_strength__is_1(self):
length = 2000
rel_spike_times = np.full(length, 0.3)
eod_durations = np.full(length, 0.14)
self.assertEqual(1, round(hF.__vector_strength__(rel_spike_times,eod_durations), 2))
def test__vector_strength__is_0(self):
length = 2000
period = 0.14
rel_spike_times = np.arange(0, period, period/length)
eod_durations = np.full(length, period)
self.assertEqual(0, round(hF.__vector_strength__(rel_spike_times, eod_durations), 5))
def test_mean_freq_of_spiketimes_after_time_x(self):
simulation_time = 8
for freq in self.frequencies:
for n in self.noise_levels:
spikes = generate_jittered_spiketimes(freq, n, end=simulation_time)
sim_freq = hF.mean_freq_of_spiketimes_after_time_x(spikes, 0.00005, simulation_time/4, time_in_ms=False)
max_diff = round(n*(10+0.7*np.sqrt(freq)), 2)
# print("noise: {:.2f}".format(n), "\texpected: {:.2f}".format(freq), "\tgotten: {:.2f}".format(round(sim_freq, 2)), "\tfreq diff: {:.2f}".format(abs(freq-round(sim_freq, 2))), "\tmax_diff:", max_diff)
self.assertTrue(abs(freq-round(sim_freq)) <= max_diff, msg="expected freq: {:.2f} vs calculated: {:.2f}. max diff was {:.2f}".format(freq, sim_freq, max_diff))
def test_calculate_isi_frequency(self):
simulation_time = 1
sampling_interval = 0.00005
for freq in self.frequencies:
for n in self.noise_levels:
spikes = generate_jittered_spiketimes(freq, n, end=simulation_time)
sim_freq = hF.calculate_isi_frequency(spikes, sampling_interval, time_in_ms=False)
isis = np.diff(spikes)
step_length = isis / sampling_interval
rounded_step_length = np.around(step_length)
expected_length = sum(rounded_step_length)
length = len(sim_freq)
self.assertEqual(expected_length, length)
# def test(self):
# test_distribution()
def generate_jittered_spiketimes(frequency, noise_level, start=0, end=5):
if frequency == 0:
return []
mean_isi = 1 / frequency
if noise_level == 0:
return np.arange(start, end, mean_isi)
spikes = [start]
count = 0
while True:
next_isi = np.random.normal(mean_isi, noise_level*mean_isi)
if next_isi <= 0:
count += 1
continue
next_spike = spikes[-1] + next_isi
if next_spike > end:
break
spikes.append(spikes[-1] + next_isi)
# print("count: {:} percentage of missed: {:.2f}".format(count, count/len(spikes)))
if count > 0.01*len(spikes):
print("!!! Danger of lowering actual simulated frequency")
pass
return spikes
def test_distribution():
simulation_time = 5
freqs = [5, 30, 100, 500, 1000]
noise_level = [0.05, 0.1, 0.2, 0.3]
repetitions = 1000
for freq in freqs:
diffs_per_noise = []
for n in noise_level:
diffs = []
print("#### - freq:", freq, "noise level:", n )
for reps in range(repetitions):
spikes = generate_jittered_spiketimes(freq, n, end=simulation_time)
sim_freq = hF.mean_freq_of_spiketimes_after_time_x(spikes, 0.0002, simulation_time / 4, time_in_ms=False)
diffs.append(sim_freq-freq)
diffs_per_noise.append(diffs)
fig, axs = plt.subplots(1, len(noise_level), figsize=(3.5*len(noise_level), 4), sharex='all')
for i in range(len(diffs_per_noise)):
max_diff = np.max(np.abs(diffs_per_noise[i]))
print("Freq: ", freq, "noise: {:.2f}".format(noise_level[i]), "mean: {:.2f}".format(np.mean(diffs_per_noise[i])), "max_diff: {:.4f}".format(max_diff))
bins = np.arange(-max_diff, max_diff, 2*max_diff/100)
axs[i].hist(diffs_per_noise[i], bins=bins)
axs[i].set_title('Noise level: {:.2f}'.format(noise_level[i]))
plt.show()
plt.close()

View File

@ -0,0 +1,70 @@
import unittest
import numpy as np
import helperFunctions as hF
import matplotlib.pyplot as plt
from models.LIFACnoise import LifacNoiseModel
from stimuli.SinusAmplitudeModulation import SinusAmplitudeModulationStimulus
class HelperFunctionsTester(unittest.TestCase):
# TODO specify exact parameters for all test (multiple sets?)
def setUp(self) -> None:
self.model = LifacNoiseModel()
self.model.set_variable("noise_strength", 0)
self.step_sinus = SinusAmplitudeModulationStimulus(700, 0.5, 10, start_time=0.5, duration=0.5)
self.base_sinus = SinusAmplitudeModulationStimulus(700, 0, 0)
self.none_stimulus = SinusAmplitudeModulationStimulus(700, 0, 0, amplitude=0)
self.simulation_time = 1.5
self.voltage_precision = 0.000001
def test_simulate_fast_step_sinus(self):
v1, spikes = self.model.simulate(self.step_sinus, self.simulation_time)
v_fast, spikes_fast = self.model.simulate_fast(self.step_sinus, self.simulation_time)
test = [abs(v1[i] - v_fast[i]) > self.voltage_precision for i in range(len(v1))]
self.assertTrue(sum(test) == 0, msg="The voltage traces between the fast and slow simulation aren't equal diff: {:}".format(sum(test)))
self.assertTrue(np.array_equal(spikes, spikes_fast), msg="The spike times between the fast and slow simulation aren't equal")
def test_simulate_fast_base_sinus(self):
v1, spikes = self.model.simulate(self.base_sinus, self.simulation_time)
v_fast, spikes_fast = self.model.simulate_fast(self.base_sinus, self.simulation_time)
test = [abs(v1[i] - v_fast[i]) > self.voltage_precision for i in range(len(v1))]
self.assertTrue(sum(test) == 0, msg="The voltage traces between the fast and slow simulation aren't equal diff: {:}".format(sum(test)))
self.assertTrue(np.array_equal(spikes, spikes_fast), msg="The spike times between the fast and slow simulation aren't equal")
def test_simulate_fast_no_stimulus(self):
v1, spikes = self.model.simulate(self.none_stimulus, self.simulation_time)
v_fast, spikes_fast = self.model.simulate_fast(self.none_stimulus, self.simulation_time)
test = [abs(v1[i] - v_fast[i]) > self.voltage_precision for i in range(len(v1))]
self.assertTrue(sum(test) == 0, msg="The voltage traces between the fast and slow simulation aren't equal diff: {:}".format(sum(test)))
self.assertTrue(np.array_equal(spikes, spikes_fast), msg="The spike times between the fast and slow simulation aren't equal")
def test_find_v_offset(self):
v_offsets = [50, 100, 150, 250, 1000]
threshold = 0.01
test_model = self.model.get_model_copy()
stimulus = SinusAmplitudeModulationStimulus(700, 0, 0, amplitude=0) # no stimulus
for offset in v_offsets:
test_model.set_variable("v_offset", offset)
_, spikes = test_model.simulate_fast(stimulus, 5)
goal_freq = hF.mean_freq_of_spiketimes_after_time_x(spikes, 0.0005, 1)
if goal_freq <= threshold:
print("test Offset ({:.1f}) generates a too low frequency: {:.2f}".format(offset, goal_freq))
continue
measured_offset = self.model.find_v_offset(goal_freq, stimulus, threshold)
# print(offset, measured_offset)
self.assertTrue(abs(offset - measured_offset) < 1)
def test_fin_v_offset_threshold_limit(self):
for threshold in [0, -0.0001, -2, -500]:
self.assertRaises(ValueError, self.model.find_v_offset, 700, self.base_sinus, threshold)