diff --git a/fit_lifacnoise.py b/fit_lifacnoise.py index 5549c52..bf52acb 100644 --- a/fit_lifacnoise.py +++ b/fit_lifacnoise.py @@ -10,8 +10,12 @@ import functions as fu import time import matplotlib.pyplot as plt + def main(): + run_test_with_fixed_model() + +def run_with_real_data(): for celldata in icelldata_of_dir("./data/"): start_time = time.time() fitter = Fitter(celldata) @@ -26,14 +30,39 @@ def main(): pass +def run_test_with_fixed_model(): + a_tau = 10 + a_delta = 0.08 + + parameters = {'mem_tau': 5, 'delta_a': a_delta, 'input_scaling': 100, + 'v_offset': 50, 'threshold': 1, 'v_base': 0, 'step_size': 0.05, 'tau_a': a_tau, + 'a_zero': 0, 'v_zero': 0, 'noise_strength': 0.5} + + model = LifacNoiseModel(parameters) + eod_freq = 750 + contrasts = np.arange(0.5, 1.51, 0.1) + modulation_freq = 10 + print(contrasts) + baseline_freq, vector_strength, serial_correlation = model.calculate_baseline_markers(eod_freq) + f_infinities, f_infinities_slope = model.calculate_fi_markers(contrasts, eod_freq, modulation_freq) + + fitter = Fitter() + fmin, fit_parameters = fitter.fit_model_to_values(eod_freq, baseline_freq, serial_correlation, vector_strength, contrasts, f_infinities, f_infinities_slope, a_delta, a_tau) + print("calculated parameters:") + print(fit_parameters) + + print("ref parameters:") + print(parameters) + + class Fitter: - def __init__(self, data: CellData, step_size=None): + def __init__(self, step_size=None): if step_size is not None: self.model = LifacNoiseModel({"step_size": step_size}) else: self.model = LifacNoiseModel({"step_size": 0.05}) - self.data = data + # self.data = data self.fi_contrasts = [] self.eod_freq = 0 @@ -53,16 +82,15 @@ class Fitter: self.a_delta = 0 self.counter = 0 - self.calculate_needed_values_from_data() - def calculate_needed_values_from_data(self): - self.eod_freq = self.data.get_eod_frequency() + def calculate_needed_values_from_data(self, data: CellData): + self.eod_freq = data.get_eod_frequency() - self.baseline_freq = self.data.get_base_frequency() - self.vector_strength = self.data.get_vector_strength() - self.serial_correlation = self.data.get_serial_correlation(self.sc_max_lag) + self.baseline_freq = data.get_base_frequency() + self.vector_strength = data.get_vector_strength() + self.serial_correlation = data.get_serial_correlation(self.sc_max_lag) - fi_curve = FICurve(self.data, contrast=True) + fi_curve = FICurve(data, contrast=True) self.fi_contrasts = fi_curve.stimulus_value self.f_infinities = fi_curve.f_infinities self.f_infinities_slope = fi_curve.get_f_infinity_slope() @@ -70,11 +98,12 @@ class Fitter: f_zero_slope = fi_curve.get_fi_curve_slope_of_straight() self.a_delta = f_zero_slope / self.f_infinities_slope - adaption = Adaption(self.data, fi_curve) + adaption = Adaption(data, fi_curve) self.a_tau = adaption.get_tau_real() # mem_tau, (threshold?), (v_offset), noise_strength, input_scaling def cost_function(self, X, tau_a=10, delta_a=3, error_scaling=()): + freq_sampling_rate = 0.005 # set model parameters to the given ones: self.model.set_variable("mem_tau", X[0]) self.model.set_variable("noise_strength", X[1]) @@ -83,18 +112,19 @@ class Fitter: self.model.set_variable("delta_a", delta_a) # minimize the difference in baseline_freq first by fitting v_offset - v_offset = self.__fit_v_offset_to_baseline_frequency__() + # v_offset = self.__fit_v_offset_to_baseline_frequency__() + v_offset = self.model.find_v_offset(self.baseline_freq, self.eod_freq) self.model.set_variable("v_offset", v_offset) # only eod with amplitude 1 and no modulation base_stimulus = SinusAmplitudeModulationStimulus(self.eod_freq, 0, 0) _, spiketimes = self.model.simulate_fast(base_stimulus, 30) - baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 5) + baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, freq_sampling_rate, 5) # print("model:", baseline_freq, "data:", self.baseline_freq) - relative_spiketimes = np.array([s % (1/self.eod_freq) for s in spiketimes]) - eod_durations = np.full((len(spiketimes)), 1/self.eod_freq) + relative_spiketimes = np.array([s % (1/self.eod_freq) for s in spiketimes if s > 0]) + eod_durations = np.full((len(relative_spiketimes)), 1/self.eod_freq) vector_strength = hF.__vector_strength__(relative_spiketimes, eod_durations) serial_correlation = hF.calculate_serial_correlation(np.array(spiketimes), self.sc_max_lag) @@ -106,7 +136,7 @@ class Fitter: if len(spiketimes) < 2: f_infinities.append(0) else: - f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 0.4) + f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, freq_sampling_rate, 0.4) f_infinities.append(f_infinity) popt, pcov = curve_fit(fu.line, self.fi_contrasts, f_infinities, maxfev=10000) @@ -127,7 +157,8 @@ class Fitter: error_f_inf = error_f_inf / len(f_infinities) self.counter += 1 # print("mem_tau:", X[0], "noise:", X[0], "input_scaling:", X[2]) - print("Cost function run times:", self.counter, "errors:", [error_bf, error_vs, error_sc, error_f_inf_slope, error_f_inf]) + errors = [error_bf, error_vs, error_sc, error_f_inf_slope, error_f_inf] + print("Cost function run times:", self.counter, "error sum:", sum(errors), errors) return error_bf + error_vs + error_sc + error_f_inf_slope + error_f_inf def __fit_v_offset_to_baseline_frequency__(self): @@ -201,7 +232,7 @@ class Fitter: else: baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, simulation_time/2) - if abs(baseline_freq - self.baseline_freq) < 1: + if abs(baseline_freq - self.baseline_freq) < 5: # print("close enough:", baseline_freq, self.baseline_freq, abs(baseline_freq - self.baseline_freq)) break elif baseline_freq < self.baseline_freq: @@ -211,14 +242,28 @@ class Fitter: return middle - def fit_model_to_data(self): + def fit_model_to_data(self, data: CellData): + self.calculate_needed_values_from_data(data) + return self.fit_model() + + def fit_model_to_values(self, eod_freq, baseline_freq, sc, vs, fi_contrasts, fi_inf_values, fi_inf_slope, a_delta, a_tau): + self.eod_freq = eod_freq + self.baseline_freq = baseline_freq + self.serial_correlation = sc + self.vector_strength = vs + self.fi_contrasts = fi_contrasts + self.f_infinities = fi_inf_values + self.f_infinities_slope = fi_inf_slope + self.a_delta = a_delta + self.a_tau = a_tau + + return self.fit_model() + + def fit_model(self): x0 = np.array([20, 15, 75]) init_simplex = np.array([np.array([2, 1, 10]), np.array([40, 100, 140]), np.array([20, 50, 70]), np.array([150, 1, 200])]) fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="Nelder-Mead", options={"initial_simplex": init_simplex}) - - #fmin = minimize(fun=self.cost_function, x0=x0, args=(self.a_tau, self.a_delta), method="BFGS") - return fmin, self.model.get_parameters() diff --git a/generalTests.py b/generalTests.py index f6cec94..385a4f5 100644 --- a/generalTests.py +++ b/generalTests.py @@ -73,19 +73,27 @@ def test_peak_detection(): def test_simulation_speed(): - parameters = {'mem_tau': 21.348990483539083, 'delta_a': 20.41809814660199, 'input_scaling': 3.0391541280864196, 'v_offset': 26.25, 'threshold': 1, 'v_base': 0, 'step_size': 0.01, 'tau_a': 158.0404259501454, 'a_zero': 0, 'v_zero': 0, 'noise_strength': 2.87718460648148} + parameters = {'mem_tau': 21.348990483539083, 'delta_a': 20.41809814660199, 'input_scaling': 3.0391541280864196, 'v_offset': 26.25, 'threshold': 1, 'v_base': 0, 'step_size': 0.00005, 'tau_a': 158.0404259501454, 'a_zero': 0, 'v_zero': 0, 'noise_strength': 2.87718460648148} model = LifacNoiseModel(parameters) - repetitions = 20 + repetitions = 1 seconds = 10 stimulus = SinusAmplitudeModulationStimulus(750, 1, 10, 1, 8) time_start = 0 t_start = time.time() for i in range(repetitions): - v, spikes = model.simulate_fast(stimulus, seconds, time_start) + v, spikes = model.simulate_fast(stimulus, seconds, time_start) + print(len(v)) + print(len(spikes)) + + #time_v = np.arange(time_start, seconds, model.get_sampling_interval()) + #plt.plot(time_v, v, '.') + #plt.show() + #freq = hf.mean_freq_of_spiketimes_after_time_x(spikes, parameters["step_size"], 0) + #print(freq) t_end = time.time() - print("baseline markers:",model.calculate_baseline_markers(750, 3)) - print("took:", round((t_end-t_start)/repetitions, 2), "seconds for " + str(seconds) + "s simulation", "step size:", parameters["step_size"]) + #print("baseline markers:", model.calculate_baseline_markers(750, 3)) + print("took:", round((t_end-t_start)/repetitions, 5), "seconds for " + str(seconds) + "s simulation", "step size:", parameters["step_size"]*1000, "ms") if __name__ == '__main__': diff --git a/helperFunctions.py b/helperFunctions.py index bef6538..c97906d 100644 --- a/helperFunctions.py +++ b/helperFunctions.py @@ -74,37 +74,32 @@ def all_calculate_mean_isi_frequencies(spiketimes, time_start, sampling_interval return times, mean_frequencies -# TODO remove additional time vector calculation! -def calculate_isi_frequency(spiketimes, time_start, sampling_interval): - first_isi = spiketimes[0] - time_start - isis = [first_isi] - isis.extend(np.diff(spiketimes)) - time = np.arange(time_start, spiketimes[-1], sampling_interval) - - full_frequency = [] - i = 0 +def calculate_isi_frequency(spiketimes, sampling_interval, time_in_ms=True): + """ + Calculates the frequency over time according to the inter spike intervals. + + :param spiketimes: time points spikes were measured array_like + :param sampling_interval: the sampling interval in which the frequency should be given back + :param time_in_ms: whether the time is in ms or in s for BOTH the spiketimes and the sampling interval + :return: an np.array with the isi frequency starting at the time of first spike and ending at the time of the last spike + """ + isis = np.diff(spiketimes) + + if time_in_ms: + isis = isis / 1000 + sampling_interval = sampling_interval / 1000 + + full_frequency = np.array([]) for isi in isis: if isi == 0: warn("An ISI was zero in FiCurve:__calculate_mean_isi_frequency__()") + print("ISI was zero:", spiketimes) continue freq = 1 / isi - frequency_step = int(round(isi * (1 / sampling_interval))) * [freq] - full_frequency.extend(frequency_step) - i += 1 - if len(full_frequency) != len(time): - if abs(len(full_frequency) - len(time)) == 1: - warn("FiCurve:__calculate_mean_isi_frequency__():\nFrequency and time were one of in length!") - if len(full_frequency) < len(time): - time = time[:len(full_frequency)] - else: - full_frequency = full_frequency[:len(time)] - else: - print("ERROR PRINT:") - print("freq:", len(full_frequency), "time:", len(time), "diff:", len(full_frequency) - len(time)) - raise RuntimeError("FiCurve:__calculate_mean_isi_frequency__():\n" - "Frequency and time are not the same length!") + frequency_step = np.full(int(round(isi * (1 / sampling_interval))), freq) + full_frequency = np.concatenate((full_frequency, frequency_step)) - return time, full_frequency + return full_frequency def calculate_mean_frequency(trial_times, trial_freqs): @@ -118,27 +113,20 @@ def calculate_mean_frequency(trial_times, trial_freqs): return time, mean_freq -def mean_freq_of_spiketimes_after_time_x(spiketimes, time_x): +def mean_freq_of_spiketimes_after_time_x(spiketimes, sampling_interval, time_x, time_in_ms=False): """ Calculates the mean frequency of the portion of spiketimes that is after last_x_time """ - idx = -1 - if time_x < spiketimes[int(len(spiketimes)/2)]: - for i in range(len(spiketimes)): - if spiketimes[i] > time_x: - idx = i + 1 - break - else: - for i in range(len(spiketimes) - 1, -1, -1): - if spiketimes[i] < time_x: - idx = i + 1 - break - - all_isi = np.diff(spiketimes[idx:]) / 1000 - if len(all_isi) < 5: + if len(spiketimes) <= 1: return 0 - mean_freq = np.mean([1 / isi for isi in all_isi]) + + freq = calculate_isi_frequency(spiketimes, sampling_interval, time_in_ms) + # returned frequency starts at the + idx = int((time_x-spiketimes[0]) / sampling_interval) + + mean_freq = np.mean(freq[idx:]) return mean_freq + # @jit(nopython=True) # only faster at around 30 000 calls def calculate_coefficient_of_variation(spiketimes: np.ndarray) -> float: # CV (stddev of ISI divided by mean ISI (np.diff(spiketimes)) @@ -167,10 +155,13 @@ def calculate_serial_correlation(spiketimes: np.ndarray, max_lag: int) -> np.nda def calculate_eod_frequency(time, eod): - + # TODO for few samples very volatile measure! up_indicies, down_indicies = threshold_crossings(eod, 0) up_times, down_times = threshold_crossing_times(time, eod, 0, up_indicies, down_indicies) + if len(up_times) == 0: + return 0 + durations = np.diff(up_times) mean_duration = np.mean(durations) @@ -297,3 +288,4 @@ def __vector_strength__(relative_spike_times: np.ndarray, eod_durations: np.ndar vs = np.sqrt((1 / n * np.sum(np.cos(phase_times))) ** 2 + (1 / n * np.sum(np.sin(phase_times))) ** 2) return vs + diff --git a/introduction/test.py b/introduction/test.py new file mode 100644 index 0000000..5792884 --- /dev/null +++ b/introduction/test.py @@ -0,0 +1,10 @@ +import numpy as np +import matplotlib.pyplot as plt + + +def main(): + pass + + +if __name__ == '__main__': + main() diff --git a/models/LIFACnoise.py b/models/LIFACnoise.py index b738e5f..a3eef19 100644 --- a/models/LIFACnoise.py +++ b/models/LIFACnoise.py @@ -19,10 +19,10 @@ class LifacNoiseModel(AbstractModel): "v_offset": 50, "input_scaling": 1, "delta_a": 0.4, - "tau_a": 40, + "tau_a": 0.04, "a_zero": 0, "noise_strength": 3, - "step_size": 0.01} + "step_size": 0.00005} def __init__(self, params: dict = None): super().__init__(params) @@ -36,26 +36,31 @@ class LifacNoiseModel(AbstractModel): def simulate(self, stimulus: AbstractStimulus, total_time_s): self.stimulus = stimulus - output_voltage = [] - adaption = [] + time = np.arange(0, total_time_s, self.parameters["step_size"]) + output_voltage = np.zeros(len(time), dtype='float64') + adaption = np.zeros(len(time), dtype='float64') spiketimes = [] + current_v = self.parameters["v_zero"] current_a = self.parameters["a_zero"] + output_voltage[0] = current_v + adaption[0] = current_a - for time_point in np.arange(0, total_time_s*1000, self.parameters["step_size"]): + for i in range(1, len(time), 1): + time_point = time[i] # rectified input: - stimulus_strength = fu.rectify(stimulus.value_at_time_in_ms(time_point)) + stimulus_strength = fu.rectify(stimulus.value_at_time_in_s(time_point)) v_next = self._calculate_voltage_step(current_v, stimulus_strength - current_a) a_next = self._calculate_adaption_step(current_a) if v_next > self.parameters["threshold"]: v_next = self.parameters["v_base"] - spiketimes.append(time_point/1000) - a_next += self.parameters["delta_a"] / (self.parameters["tau_a"] / 1000) + spiketimes.append(time_point) + a_next += self.parameters["delta_a"] / (self.parameters["tau_a"]) - output_voltage.append(v_next) - adaption.append(a_next) + output_voltage[i] = v_next + adaption[i] = a_next current_v = v_next current_a = a_next @@ -66,6 +71,22 @@ class LifacNoiseModel(AbstractModel): return output_voltage, spiketimes + def _calculate_voltage_step(self, current_v, input_v): + v_base = self.parameters["v_base"] + step_size = self.parameters["step_size"] + v_offset = self.parameters["v_offset"] + mem_tau = self.parameters["mem_tau"] + + noise_strength = self.parameters["noise_strength"] + noise_value = np.random.normal() + noise = noise_strength * noise_value / np.sqrt(step_size) + + return current_v + step_size * ((v_base - current_v + v_offset + input_v + noise) / mem_tau) + + def _calculate_adaption_step(self, current_a): + step_size = self.parameters["step_size"] + return current_a + (step_size * (-current_a)) / self.parameters["tau_a"] + def simulate_fast(self, stimulus: AbstractStimulus, total_time_s, time_start=0): v_zero = self.parameters["v_zero"] @@ -79,10 +100,8 @@ class LifacNoiseModel(AbstractModel): mem_tau = self.parameters["mem_tau"] noise_strength = self.parameters["noise_strength"] - stimulus_array = stimulus.as_array(time_start, total_time_s, step_size) - rectified_stimulus = rectify_stimulus_array(stimulus_array) - - parameters = np.array([v_zero, a_zero, step_size, threshold, v_base, delta_a, tau_a, v_offset, mem_tau, noise_strength]) + rectified_stimulus = rectify_stimulus_array(stimulus.as_array(time_start, total_time_s, step_size)) + parameters = np.array([v_zero, a_zero, step_size, threshold, v_base, delta_a, tau_a, v_offset, mem_tau, noise_strength, time_start]) voltage_trace, adaption, spiketimes = simulate_fast(rectified_stimulus, total_time_s, parameters) @@ -93,22 +112,6 @@ class LifacNoiseModel(AbstractModel): return voltage_trace, spiketimes - def _calculate_voltage_step(self, current_v, input_v): - v_base = self.parameters["v_base"] - step_size = self.parameters["step_size"] - v_offset = self.parameters["v_offset"] - mem_tau = self.parameters["mem_tau"] - - noise_strength = self.parameters["noise_strength"] - noise_value = np.random.normal() - noise = noise_strength * noise_value / np.sqrt(step_size) - - return current_v + step_size * ((v_base - current_v + v_offset + input_v + noise) / mem_tau) - - def _calculate_adaption_step(self, current_a): - step_size = self.parameters["step_size"] - return current_a + (step_size * (-current_a)) / self.parameters["tau_a"] - def min_stimulus_strength_to_spike(self): return self.parameters["threshold"] - self.parameters["v_base"] @@ -159,8 +162,8 @@ class LifacNoiseModel(AbstractModel): """ base_stimulus = SinusAmplitudeModulationStimulus(base_stimulus_freq, 0, 0) _, spiketimes = self.simulate_fast(base_stimulus, 30) - - baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 5) + time_x = 5 + baseline_freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, self.get_sampling_interval(), time_x) relative_spiketimes = np.array([s % (1 / base_stimulus_freq) for s in spiketimes]) eod_durations = np.full((len(spiketimes)), 1 / base_stimulus_freq) @@ -179,13 +182,10 @@ class LifacNoiseModel(AbstractModel): f_infinities = [] for contrast in contrasts: stimulus = SinusAmplitudeModulationStimulus(base_freq, contrast, modulation_frequency) - _, spiketimes = self.simulate_fast(stimulus, 0.5) + _, spiketimes = self.simulate_fast(stimulus, 1) - if len(spiketimes) < 2: - f_infinities.append(0) - else: - f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 0.4) - f_infinities.append(f_infinity) + f_infinity = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, self.get_sampling_interval(), 0.3) + f_infinities.append(f_infinity) popt, pcov = curve_fit(fu.line, contrasts, f_infinities, maxfev=10000) @@ -193,9 +193,58 @@ class LifacNoiseModel(AbstractModel): return f_infinities, f_infinities_slope + def find_v_offset(self, goal_baseline_frequency, base_stimulus, threshold=10): + test_model = self.get_model_copy() + simulation_length = 5 + + v_search_step_size = 1000 + + current_v_offset = 0 + + current_freq = test_v_offset(test_model, current_v_offset, base_stimulus, simulation_length) + + while current_freq < goal_baseline_frequency: + current_v_offset += v_search_step_size + current_freq = test_v_offset(test_model, current_v_offset, base_stimulus, simulation_length) + + lower_bound = current_v_offset - v_search_step_size + upper_bound = current_v_offset + + return binary_search_base_freq(test_model, base_stimulus, goal_baseline_frequency, simulation_length, lower_bound, upper_bound, threshold) + + +def binary_search_base_freq(model: LifacNoiseModel, base_stimulus, goal_frequency, simulation_length, lower_bound, upper_bound, threshold): + + if threshold <= 0: + raise ValueError("binary_search_base_freq() - LifacNoiseModel: threshold is not allowed to be negative!") + while True: + middle = upper_bound - (upper_bound - lower_bound)/2 + frequency = test_v_offset(model, middle, base_stimulus, simulation_length) + + # print('{:.1f}, {:.1f}, {:.1f}, {:.1f} vs {:.1f} '.format(lower_bound, middle, upper_bound, frequency, goal_frequency)) + if abs(frequency - goal_frequency) < threshold: + return middle + elif frequency < goal_frequency: + lower_bound = middle + elif frequency > goal_frequency: + upper_bound = middle + elif abs(upper_bound-lower_bound) < 0.01 * threshold: + return middle + else: + print('lower bound: {:.1f}, middle: {:.1f}, upper_bound: {:.1f}, frequency: {:.1f} vs goal: {:.1f} '.format(lower_bound, middle, upper_bound, frequency, goal_frequency)) + raise ValueError("binary_search_base_freq() - LifacNoiseModel: Goal frequency might be nan?") + + +def test_v_offset(model: LifacNoiseModel, v_offset, base_stimulus, simulation_length): + model.set_variable("v_offset", v_offset) + _, spiketimes = model.simulate_fast(base_stimulus, simulation_length) + freq = hF.mean_freq_of_spiketimes_after_time_x(spiketimes, 0.005, simulation_length/3) + + return freq + @jit(nopython=True) -def rectify_stimulus_array(stimulus_array:np.ndarray): +def rectify_stimulus_array(stimulus_array: np.ndarray): return np.array([x if x > 0 else 0 for x in stimulus_array]) @@ -212,8 +261,9 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray v_offset = parameters[7] mem_tau = parameters[8] noise_strength = parameters[9] + time_start = parameters[10] - time = np.arange(0, total_time_s * 1000, step_size) + time = np.arange(time_start, total_time_s, step_size) length = len(time) output_voltage = np.zeros(length) adaption = np.zeros(length) @@ -223,7 +273,8 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray output_voltage[0] = v_zero adaption[0] = a_zero - for i in range(len(time)-1): + for i in range(1, len(time), 1): + noise_value = np.random.normal() noise = noise_strength * noise_value / np.sqrt(step_size) @@ -233,7 +284,7 @@ def simulate_fast(rectified_stimulus_array, total_time_s, parameters: np.ndarray if output_voltage[i] > threshold: output_voltage[i] = v_base spiketimes.append(i*step_size) - adaption[i] += delta_a / (tau_a / 1000) + adaption[i] += delta_a / tau_a return output_voltage, adaption, spiketimes diff --git a/stimuli/SinusAmplitudeModulation.py b/stimuli/SinusAmplitudeModulation.py index fe23e77..d4e2cc0 100644 --- a/stimuli/SinusAmplitudeModulation.py +++ b/stimuli/SinusAmplitudeModulation.py @@ -40,12 +40,12 @@ class SinusAmplitudeModulationStimulus(AbstractStimulus): start_time = self.start_time duration = self.duration - values = convert_to_array(carrier, amp, mod_freq, contrast, start_time, duration, time_start, total_time, step_size/1000) + values = convert_to_array(carrier, amp, mod_freq, contrast, start_time, duration, time_start, total_time, step_size) return values -#@jit(nopython=True) # makes it slower? +# @jit(nopython=True) # makes it slower? def convert_to_array(carrier_freq, amplitude, modulation_freq, contrast, start_time, duration, time_start, total_time, step_size_s): # if the whole stimulus time has the amplitude modulation just built it at once; if time_start >= start_time and start_time+duration < time_start+total_time: @@ -55,7 +55,7 @@ def convert_to_array(carrier_freq, amplitude, modulation_freq, contrast, start_t return values # if it is split into parts with and without amplitude modulation built it in parts: - values = np.empty(1) + values = np.array([], dtype='float32') if time_start < start_time: carrier_before_am = np.sin(2 * np.pi * carrier_freq * np.arange(time_start, start_time, step_size_s)) diff --git a/unittests/testHelperFunctions.py b/unittests/testHelperFunctions.py new file mode 100644 index 0000000..47351e0 --- /dev/null +++ b/unittests/testHelperFunctions.py @@ -0,0 +1,130 @@ +import unittest +import numpy as np +import helperFunctions as hF +import matplotlib.pyplot as plt + + +class HelperFunctionsTester(unittest.TestCase): + + noise_levels = [0, 0.05, 0.1, 0.2] + frequencies = [0, 1, 5, 30, 100, 500, 750, 1000] + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_calculate_eod_frequency(self): + start = 0 + end = 5 + step = 0.1 / 1000 + freqs = [0, 1, 10, 500, 700, 1000] + for freq in freqs: + time = np.arange(start, end, step) + eod = np.sin(freq*(2*np.pi) * time) + self.assertEqual(freq, round(hF.calculate_eod_frequency(time, eod), 2)) + + def test__vector_strength__is_1(self): + length = 2000 + rel_spike_times = np.full(length, 0.3) + eod_durations = np.full(length, 0.14) + + self.assertEqual(1, round(hF.__vector_strength__(rel_spike_times,eod_durations), 2)) + + def test__vector_strength__is_0(self): + length = 2000 + period = 0.14 + rel_spike_times = np.arange(0, period, period/length) + eod_durations = np.full(length, period) + + self.assertEqual(0, round(hF.__vector_strength__(rel_spike_times, eod_durations), 5)) + + def test_mean_freq_of_spiketimes_after_time_x(self): + simulation_time = 8 + for freq in self.frequencies: + for n in self.noise_levels: + spikes = generate_jittered_spiketimes(freq, n, end=simulation_time) + sim_freq = hF.mean_freq_of_spiketimes_after_time_x(spikes, 0.00005, simulation_time/4, time_in_ms=False) + + max_diff = round(n*(10+0.7*np.sqrt(freq)), 2) + # print("noise: {:.2f}".format(n), "\texpected: {:.2f}".format(freq), "\tgotten: {:.2f}".format(round(sim_freq, 2)), "\tfreq diff: {:.2f}".format(abs(freq-round(sim_freq, 2))), "\tmax_diff:", max_diff) + self.assertTrue(abs(freq-round(sim_freq)) <= max_diff, msg="expected freq: {:.2f} vs calculated: {:.2f}. max diff was {:.2f}".format(freq, sim_freq, max_diff)) + + def test_calculate_isi_frequency(self): + simulation_time = 1 + sampling_interval = 0.00005 + + for freq in self.frequencies: + for n in self.noise_levels: + spikes = generate_jittered_spiketimes(freq, n, end=simulation_time) + sim_freq = hF.calculate_isi_frequency(spikes, sampling_interval, time_in_ms=False) + + isis = np.diff(spikes) + step_length = isis / sampling_interval + rounded_step_length = np.around(step_length) + expected_length = sum(rounded_step_length) + + length = len(sim_freq) + self.assertEqual(expected_length, length) + + + + # def test(self): + # test_distribution() + +def generate_jittered_spiketimes(frequency, noise_level, start=0, end=5): + if frequency == 0: + return [] + + mean_isi = 1 / frequency + if noise_level == 0: + return np.arange(start, end, mean_isi) + + spikes = [start] + count = 0 + while True: + next_isi = np.random.normal(mean_isi, noise_level*mean_isi) + if next_isi <= 0: + count += 1 + continue + next_spike = spikes[-1] + next_isi + if next_spike > end: + break + spikes.append(spikes[-1] + next_isi) + + # print("count: {:} percentage of missed: {:.2f}".format(count, count/len(spikes))) + if count > 0.01*len(spikes): + print("!!! Danger of lowering actual simulated frequency") + pass + return spikes + + +def test_distribution(): + simulation_time = 5 + freqs = [5, 30, 100, 500, 1000] + noise_level = [0.05, 0.1, 0.2, 0.3] + repetitions = 1000 + for freq in freqs: + diffs_per_noise = [] + for n in noise_level: + diffs = [] + print("#### - freq:", freq, "noise level:", n ) + for reps in range(repetitions): + spikes = generate_jittered_spiketimes(freq, n, end=simulation_time) + sim_freq = hF.mean_freq_of_spiketimes_after_time_x(spikes, 0.0002, simulation_time / 4, time_in_ms=False) + diffs.append(sim_freq-freq) + + diffs_per_noise.append(diffs) + + fig, axs = plt.subplots(1, len(noise_level), figsize=(3.5*len(noise_level), 4), sharex='all') + + for i in range(len(diffs_per_noise)): + max_diff = np.max(np.abs(diffs_per_noise[i])) + print("Freq: ", freq, "noise: {:.2f}".format(noise_level[i]), "mean: {:.2f}".format(np.mean(diffs_per_noise[i])), "max_diff: {:.4f}".format(max_diff)) + bins = np.arange(-max_diff, max_diff, 2*max_diff/100) + axs[i].hist(diffs_per_noise[i], bins=bins) + axs[i].set_title('Noise level: {:.2f}'.format(noise_level[i])) + + plt.show() + plt.close() \ No newline at end of file diff --git a/unittests/testLifacNoise.py b/unittests/testLifacNoise.py new file mode 100644 index 0000000..e78d4c9 --- /dev/null +++ b/unittests/testLifacNoise.py @@ -0,0 +1,70 @@ + +import unittest +import numpy as np +import helperFunctions as hF +import matplotlib.pyplot as plt +from models.LIFACnoise import LifacNoiseModel +from stimuli.SinusAmplitudeModulation import SinusAmplitudeModulationStimulus + + +class HelperFunctionsTester(unittest.TestCase): + + # TODO specify exact parameters for all test (multiple sets?) + def setUp(self) -> None: + self.model = LifacNoiseModel() + self.model.set_variable("noise_strength", 0) + + self.step_sinus = SinusAmplitudeModulationStimulus(700, 0.5, 10, start_time=0.5, duration=0.5) + self.base_sinus = SinusAmplitudeModulationStimulus(700, 0, 0) + self.none_stimulus = SinusAmplitudeModulationStimulus(700, 0, 0, amplitude=0) + self.simulation_time = 1.5 + self.voltage_precision = 0.000001 + + def test_simulate_fast_step_sinus(self): + v1, spikes = self.model.simulate(self.step_sinus, self.simulation_time) + v_fast, spikes_fast = self.model.simulate_fast(self.step_sinus, self.simulation_time) + test = [abs(v1[i] - v_fast[i]) > self.voltage_precision for i in range(len(v1))] + + self.assertTrue(sum(test) == 0, msg="The voltage traces between the fast and slow simulation aren't equal diff: {:}".format(sum(test))) + self.assertTrue(np.array_equal(spikes, spikes_fast), msg="The spike times between the fast and slow simulation aren't equal") + + def test_simulate_fast_base_sinus(self): + v1, spikes = self.model.simulate(self.base_sinus, self.simulation_time) + v_fast, spikes_fast = self.model.simulate_fast(self.base_sinus, self.simulation_time) + + test = [abs(v1[i] - v_fast[i]) > self.voltage_precision for i in range(len(v1))] + + self.assertTrue(sum(test) == 0, msg="The voltage traces between the fast and slow simulation aren't equal diff: {:}".format(sum(test))) + self.assertTrue(np.array_equal(spikes, spikes_fast), msg="The spike times between the fast and slow simulation aren't equal") + + def test_simulate_fast_no_stimulus(self): + v1, spikes = self.model.simulate(self.none_stimulus, self.simulation_time) + v_fast, spikes_fast = self.model.simulate_fast(self.none_stimulus, self.simulation_time) + + test = [abs(v1[i] - v_fast[i]) > self.voltage_precision for i in range(len(v1))] + + self.assertTrue(sum(test) == 0, msg="The voltage traces between the fast and slow simulation aren't equal diff: {:}".format(sum(test))) + self.assertTrue(np.array_equal(spikes, spikes_fast), msg="The spike times between the fast and slow simulation aren't equal") + + def test_find_v_offset(self): + v_offsets = [50, 100, 150, 250, 1000] + threshold = 0.01 + test_model = self.model.get_model_copy() + stimulus = SinusAmplitudeModulationStimulus(700, 0, 0, amplitude=0) # no stimulus + for offset in v_offsets: + test_model.set_variable("v_offset", offset) + + _, spikes = test_model.simulate_fast(stimulus, 5) + goal_freq = hF.mean_freq_of_spiketimes_after_time_x(spikes, 0.0005, 1) + + if goal_freq <= threshold: + print("test Offset ({:.1f}) generates a too low frequency: {:.2f}".format(offset, goal_freq)) + continue + + measured_offset = self.model.find_v_offset(goal_freq, stimulus, threshold) + # print(offset, measured_offset) + self.assertTrue(abs(offset - measured_offset) < 1) + + def test_fin_v_offset_threshold_limit(self): + for threshold in [0, -0.0001, -2, -500]: + self.assertRaises(ValueError, self.model.find_v_offset, 700, self.base_sinus, threshold)