added chirp inst freq exporation
This commit is contained in:
parent
21c74b3cb0
commit
12b1fdccae
202
chirp_instantaneous_freq/filters.py
Normal file
202
chirp_instantaneous_freq/filters.py
Normal file
@ -0,0 +1,202 @@
|
||||
from scipy.signal import butter, sosfiltfilt
|
||||
from scipy.ndimage import gaussian_filter1d
|
||||
import numpy as np
|
||||
|
||||
|
||||
def instantaneous_frequency(
|
||||
signal: np.ndarray,
|
||||
samplerate: int,
|
||||
smoothing_window: int,
|
||||
) -> tuple[np.ndarray, np.ndarray]:
|
||||
"""
|
||||
Compute the instantaneous frequency of a signal that is approximately
|
||||
sinusoidal and symmetric around 0.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
signal : np.ndarray
|
||||
Signal to compute the instantaneous frequency from.
|
||||
samplerate : int
|
||||
Samplerate of the signal.
|
||||
smoothing_window : int
|
||||
Window size for the gaussian filter.
|
||||
|
||||
Returns
|
||||
-------
|
||||
tuple[np.ndarray, np.ndarray]
|
||||
|
||||
"""
|
||||
# calculate instantaneous frequency with zero crossings
|
||||
roll_signal = np.roll(signal, shift=1)
|
||||
time_signal = np.arange(len(signal)) / samplerate
|
||||
period_index = np.arange(len(signal))[(roll_signal < 0) & (signal >= 0)][
|
||||
1:-1
|
||||
]
|
||||
|
||||
upper_bound = np.abs(signal[period_index])
|
||||
lower_bound = np.abs(signal[period_index - 1])
|
||||
upper_time = np.abs(time_signal[period_index])
|
||||
lower_time = np.abs(time_signal[period_index - 1])
|
||||
|
||||
# create ratio
|
||||
lower_ratio = lower_bound / (lower_bound + upper_bound)
|
||||
|
||||
# appy to time delta
|
||||
time_delta = upper_time - lower_time
|
||||
true_zero = lower_time + lower_ratio * time_delta
|
||||
|
||||
# create new time array
|
||||
instantaneous_frequency_time = true_zero[:-1] + 0.5 * np.diff(true_zero)
|
||||
|
||||
# compute frequency
|
||||
instantaneous_frequency = gaussian_filter1d(
|
||||
1 / np.diff(true_zero), smoothing_window
|
||||
)
|
||||
|
||||
return instantaneous_frequency_time, instantaneous_frequency
|
||||
|
||||
|
||||
def inst_freq(signal, fs):
|
||||
"""
|
||||
Computes the instantaneous frequency of a periodic signal using zero-crossings.
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
signal : array-like
|
||||
The input signal.
|
||||
fs : float
|
||||
The sampling frequency of the input signal.
|
||||
|
||||
Returns:
|
||||
--------
|
||||
freq : array-like
|
||||
The instantaneous frequency of the input signal.
|
||||
"""
|
||||
# Compute the sign of the signal
|
||||
sign = np.sign(signal)
|
||||
|
||||
# Compute the crossings of the sign signal with a zero line
|
||||
crossings = np.where(np.diff(sign))[0]
|
||||
|
||||
# Compute the time differences between zero crossings
|
||||
dt = np.diff(crossings) / fs
|
||||
|
||||
# Compute the instantaneous frequency as the reciprocal of the time differences
|
||||
freq = 1 / dt
|
||||
|
||||
# Gaussian filter the signal
|
||||
freq = gaussian_filter1d(freq, 10)
|
||||
|
||||
# Pad the frequency vector with zeros to match the length of the input signal
|
||||
freq = np.pad(freq, (0, len(signal) - len(freq)))
|
||||
|
||||
return freq
|
||||
|
||||
def bandpass_filter(
|
||||
signal: np.ndarray,
|
||||
samplerate: float,
|
||||
lowf: float,
|
||||
highf: float,
|
||||
) -> np.ndarray:
|
||||
"""Bandpass filter a signal.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
signal : np.ndarray
|
||||
The data to be filtered
|
||||
rate : float
|
||||
The sampling rate
|
||||
lowf : float
|
||||
The low cutoff frequency
|
||||
highf : float
|
||||
The high cutoff frequency
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.ndarray
|
||||
The filtered data
|
||||
"""
|
||||
sos = butter(2, (lowf, highf), "bandpass", fs=samplerate, output="sos")
|
||||
filtered_signal = sosfiltfilt(sos, signal)
|
||||
|
||||
return filtered_signal
|
||||
|
||||
|
||||
def highpass_filter(
|
||||
signal: np.ndarray,
|
||||
samplerate: float,
|
||||
cutoff: float,
|
||||
) -> np.ndarray:
|
||||
"""Highpass filter a signal.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
signal : np.ndarray
|
||||
The data to be filtered
|
||||
rate : float
|
||||
The sampling rate
|
||||
cutoff : float
|
||||
The cutoff frequency
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.ndarray
|
||||
The filtered data
|
||||
"""
|
||||
sos = butter(2, cutoff, "highpass", fs=samplerate, output="sos")
|
||||
filtered_signal = sosfiltfilt(sos, signal)
|
||||
|
||||
return filtered_signal
|
||||
|
||||
|
||||
def lowpass_filter(
|
||||
signal: np.ndarray,
|
||||
samplerate: float,
|
||||
cutoff: float
|
||||
) -> np.ndarray:
|
||||
"""Lowpass filter a signal.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
data : np.ndarray
|
||||
The data to be filtered
|
||||
rate : float
|
||||
The sampling rate
|
||||
cutoff : float
|
||||
The cutoff frequency
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.ndarray
|
||||
The filtered data
|
||||
"""
|
||||
sos = butter(2, cutoff, "lowpass", fs=samplerate, output="sos")
|
||||
filtered_signal = sosfiltfilt(sos, signal)
|
||||
|
||||
return filtered_signal
|
||||
|
||||
|
||||
def envelope(signal: np.ndarray,
|
||||
samplerate: float,
|
||||
cutoff_frequency: float
|
||||
) -> np.ndarray:
|
||||
"""Calculate the envelope of a signal using a lowpass filter.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
signal : np.ndarray
|
||||
The signal to calculate the envelope of
|
||||
samplingrate : float
|
||||
The sampling rate of the signal
|
||||
cutoff_frequency : float
|
||||
The cutoff frequency of the lowpass filter
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.ndarray
|
||||
The envelope of the signal
|
||||
"""
|
||||
sos = butter(2, cutoff_frequency, "lowpass", fs=samplerate, output="sos")
|
||||
envelope = np.sqrt(2) * sosfiltfilt(sos, np.abs(signal))
|
||||
|
||||
return envelope
|
557
chirp_instantaneous_freq/fish_signal.py
Normal file
557
chirp_instantaneous_freq/fish_signal.py
Normal file
@ -0,0 +1,557 @@
|
||||
import sys
|
||||
from IPython import embed
|
||||
import thunderfish.powerspectrum as ps
|
||||
import numpy as np
|
||||
|
||||
species_name = dict(
|
||||
Sine="Sinewave",
|
||||
Alepto="Apteronotus leptorhynchus",
|
||||
Arostratus="Apteronotus rostratus",
|
||||
Eigenmannia="Eigenmannia spec.",
|
||||
Sternarchella="Sternarchella terminalis",
|
||||
Sternopygus="Sternopygus dariensis",
|
||||
)
|
||||
"""Translate species ids used by wavefish_harmonics and pulsefish_eodpeaks to full species names.
|
||||
"""
|
||||
|
||||
|
||||
def abbrv_genus(name):
|
||||
"""Abbreviate genus in a species name.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: string
|
||||
Full species name of the form 'Genus species subspecies'
|
||||
|
||||
Returns
|
||||
-------
|
||||
name: string
|
||||
The species name with abbreviated genus, i.e. 'G. species subspecies'
|
||||
"""
|
||||
ns = name.split()
|
||||
return ns[0][0] + ". " + " ".join(ns[1:])
|
||||
|
||||
|
||||
# Amplitudes and phases of various wavefish species:
|
||||
|
||||
Sine_harmonics = dict(amplitudes=(1.0,), phases=(0.5 * np.pi,))
|
||||
|
||||
Apteronotus_leptorhynchus_harmonics = dict(
|
||||
amplitudes=(0.90062, 0.15311, 0.072049, 0.012609, 0.011708),
|
||||
phases=(1.3623, 2.3246, 0.9869, 2.6492, -2.6885),
|
||||
)
|
||||
|
||||
Apteronotus_rostratus_harmonics = dict(
|
||||
amplitudes=(
|
||||
0.64707,
|
||||
0.43874,
|
||||
0.063592,
|
||||
0.07379,
|
||||
0.040199,
|
||||
0.023073,
|
||||
0.0097678,
|
||||
),
|
||||
phases=(2.2988, 0.78876, -1.316, 2.2416, 2.0413, 1.1022, -2.0513),
|
||||
)
|
||||
|
||||
Eigenmannia_harmonics = dict(
|
||||
amplitudes=(1.0087, 0.23201, 0.060524, 0.020175, 0.010087, 0.0080699),
|
||||
phases=(1.3414, 1.3228, 2.9242, 2.8157, 2.6871, -2.8415),
|
||||
)
|
||||
|
||||
Sternarchella_terminalis_harmonics = dict(
|
||||
amplitudes=(
|
||||
0.11457,
|
||||
0.4401,
|
||||
0.41055,
|
||||
0.20132,
|
||||
0.061364,
|
||||
0.011389,
|
||||
0.0057985,
|
||||
),
|
||||
phases=(-2.7106, 2.4472, 1.6829, 0.79085, 0.119, -0.82355, -1.9956),
|
||||
)
|
||||
|
||||
Sternopygus_dariensis_harmonics = dict(
|
||||
amplitudes=(
|
||||
0.98843,
|
||||
0.41228,
|
||||
0.047848,
|
||||
0.11048,
|
||||
0.022801,
|
||||
0.030706,
|
||||
0.019018,
|
||||
),
|
||||
phases=(1.4153, 1.3141, 3.1062, -2.3961, -1.9524, 0.54321, 1.6844),
|
||||
)
|
||||
|
||||
wavefish_harmonics = dict(
|
||||
Sine=Sine_harmonics,
|
||||
Alepto=Apteronotus_leptorhynchus_harmonics,
|
||||
Arostratus=Apteronotus_rostratus_harmonics,
|
||||
Eigenmannia=Eigenmannia_harmonics,
|
||||
Sternarchella=Sternarchella_terminalis_harmonics,
|
||||
Sternopygus=Sternopygus_dariensis_harmonics,
|
||||
)
|
||||
"""Amplitudes and phases of EOD waveforms of various species of wave-type electric fish."""
|
||||
|
||||
|
||||
def wavefish_spectrum(fish):
|
||||
"""Amplitudes and phases of a wavefish EOD.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fish: string, dict or tuple of lists/arrays
|
||||
Specify relative amplitudes and phases of the fundamental and its harmonics.
|
||||
If string then take amplitudes and phases from the `wavefish_harmonics` dictionary.
|
||||
If dictionary then take amplitudes and phases from the 'amlitudes' and 'phases' keys.
|
||||
If tuple then the first element is the list of amplitudes and
|
||||
the second one the list of relative phases in radians.
|
||||
|
||||
Returns
|
||||
-------
|
||||
amplitudes: array of floats
|
||||
Amplitudes of the fundamental and its harmonics.
|
||||
phases: array of floats
|
||||
Phases in radians of the fundamental and its harmonics.
|
||||
|
||||
Raises
|
||||
------
|
||||
KeyError:
|
||||
Unknown fish.
|
||||
IndexError:
|
||||
Amplitudes and phases differ in length.
|
||||
"""
|
||||
if isinstance(fish, (tuple, list)):
|
||||
amplitudes = fish[0]
|
||||
phases = fish[1]
|
||||
elif isinstance(fish, dict):
|
||||
amplitudes = fish["amplitudes"]
|
||||
phases = fish["phases"]
|
||||
else:
|
||||
if fish not in wavefish_harmonics:
|
||||
raise KeyError(
|
||||
"unknown wavefish. Choose one of "
|
||||
+ ", ".join(wavefish_harmonics.keys())
|
||||
)
|
||||
amplitudes = wavefish_harmonics[fish]["amplitudes"]
|
||||
phases = wavefish_harmonics[fish]["phases"]
|
||||
if len(amplitudes) != len(phases):
|
||||
raise IndexError("need exactly as many phases as amplitudes")
|
||||
# remove NaNs:
|
||||
for k in reversed(range(len(amplitudes))):
|
||||
if np.isfinite(amplitudes[k]) or np.isfinite(phases[k]):
|
||||
amplitudes = amplitudes[: k + 1]
|
||||
phases = phases[: k + 1]
|
||||
break
|
||||
return amplitudes, phases
|
||||
|
||||
|
||||
def wavefish_eods(
|
||||
fish="Eigenmannia",
|
||||
frequency=100.0,
|
||||
samplerate=44100.0,
|
||||
duration=1.0,
|
||||
phase0=0.0,
|
||||
noise_std=0.05,
|
||||
):
|
||||
"""Simulate EOD waveform of a wave-type fish.
|
||||
|
||||
The waveform is constructed by superimposing sinewaves of integral
|
||||
multiples of the fundamental frequency - the fundamental and its
|
||||
harmonics. The fundamental frequency of the EOD (EODf) is given by
|
||||
`frequency`. With `fish` relative amplitudes and phases of the
|
||||
fundamental and its harmonics are specified.
|
||||
|
||||
The generated waveform is `duration` seconds long and is sampled with
|
||||
`samplerate` Hertz. Gaussian white noise with a standard deviation of
|
||||
`noise_std` is added to the generated waveform.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fish: string, dict or tuple of lists/arrays
|
||||
Specify relative amplitudes and phases of the fundamental and its harmonics.
|
||||
If string then take amplitudes and phases from the `wavefish_harmonics` dictionary.
|
||||
If dictionary then take amplitudes and phases from the 'amlitudes' and 'phases' keys.
|
||||
If tuple then the first element is the list of amplitudes and
|
||||
the second one the list of relative phases in radians.
|
||||
frequency: float or array of floats
|
||||
EOD frequency of the fish in Hertz. Either fixed number or array for
|
||||
time-dependent frequencies.
|
||||
samplerate: float
|
||||
Sampling rate in Hertz.
|
||||
duration: float
|
||||
Duration of the generated data in seconds. Only used if frequency is scalar.
|
||||
phase0: float
|
||||
Phase offset of the EOD waveform in radians.
|
||||
noise_std: float
|
||||
Standard deviation of additive Gaussian white noise.
|
||||
|
||||
Returns
|
||||
-------
|
||||
data: array of floats
|
||||
Generated data of a wave-type fish.
|
||||
|
||||
Raises
|
||||
------
|
||||
KeyError:
|
||||
Unknown fish.
|
||||
IndexError:
|
||||
Amplitudes and phases differ in length.
|
||||
"""
|
||||
# get relative amplitude and phases:
|
||||
amplitudes, phases = wavefish_spectrum(fish)
|
||||
# compute phase:
|
||||
if np.isscalar(frequency):
|
||||
phase = np.arange(0, duration, 1.0 / samplerate)
|
||||
phase *= frequency
|
||||
else:
|
||||
phase = np.cumsum(frequency) / samplerate
|
||||
# generate EOD:
|
||||
data = np.zeros(len(phase))
|
||||
for har, (ampl, phi) in enumerate(zip(amplitudes, phases)):
|
||||
if np.isfinite(ampl) and np.isfinite(phi):
|
||||
data += ampl * np.sin(
|
||||
2 * np.pi * (har + 1) * phase + phi - (har + 1) * phase0
|
||||
)
|
||||
# add noise:
|
||||
data += noise_std * np.random.randn(len(data))
|
||||
return data
|
||||
|
||||
|
||||
def normalize_wavefish(fish, mode="peak"):
|
||||
"""Normalize amplitudes and phases of wave-type EOD waveform.
|
||||
|
||||
The amplitudes and phases of the Fourier components are adjusted
|
||||
such that the resulting EOD waveform has a peak-to-peak amplitude
|
||||
of two and the peak of the waveform is at time zero (mode is set
|
||||
to 'peak') or that the fundamental has an amplitude of one and a
|
||||
phase of 0 (mode is set to 'zero').
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fish: string, dict or tuple of lists/arrays
|
||||
Specify relative amplitudes and phases of the fundamental and its harmonics.
|
||||
If string then take amplitudes and phases from the `wavefish_harmonics` dictionary.
|
||||
If dictionary then take amplitudes and phases from the 'amlitudes' and 'phases' keys.
|
||||
If tuple then the first element is the list of amplitudes and
|
||||
the second one the list of relative phases in radians.
|
||||
mode: 'peak' or 'zero'
|
||||
How to normalize amplitude and phases:
|
||||
- 'peak': normalize waveform to a peak-to-peak amplitude of two
|
||||
and shift it such that its peak is at time zero.
|
||||
- 'zero': scale amplitude of fundamental to one and its phase to zero.
|
||||
|
||||
Returns
|
||||
-------
|
||||
amplitudes: array of floats
|
||||
Adjusted amplitudes of the fundamental and its harmonics.
|
||||
phases: array of floats
|
||||
Adjusted phases in radians of the fundamental and its harmonics.
|
||||
|
||||
"""
|
||||
# get relative amplitude and phases:
|
||||
amplitudes, phases = wavefish_spectrum(fish)
|
||||
if mode == "zero":
|
||||
newamplitudes = np.array(amplitudes) / amplitudes[0]
|
||||
newphases = np.array(
|
||||
[p + (k + 1) * (-phases[0]) for k, p in enumerate(phases)]
|
||||
)
|
||||
newphases %= 2.0 * np.pi
|
||||
newphases[newphases > np.pi] -= 2.0 * np.pi
|
||||
return newamplitudes, newphases
|
||||
else:
|
||||
# generate waveform:
|
||||
eodf = 100.0
|
||||
rate = 100000.0
|
||||
data = wavefish_eods(fish, eodf, rate, 2.0 / eodf, noise_std=0.0)
|
||||
# normalize amplitudes:
|
||||
ampl = 0.5 * (np.max(data) - np.min(data))
|
||||
newamplitudes = np.array(amplitudes) / ampl
|
||||
# shift phases:
|
||||
deltat = np.argmax(data[: int(rate / eodf)]) / rate
|
||||
deltap = 2.0 * np.pi * deltat * eodf
|
||||
newphases = np.array(
|
||||
[p + (k + 1) * deltap for k, p in enumerate(phases)]
|
||||
)
|
||||
newphases %= 2.0 * np.pi
|
||||
newphases[newphases > np.pi] -= 2.0 * np.pi
|
||||
# return:
|
||||
return newamplitudes, newphases
|
||||
|
||||
|
||||
def export_wavefish(fish, name="Unknown_harmonics", file=None):
|
||||
"""Serialize wavefish parameter to python code.
|
||||
|
||||
Add output to the wavefish_harmonics dictionary!
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fish: string, dict or tuple of lists/arrays
|
||||
Specify relative amplitudes and phases of the fundamental and its harmonics.
|
||||
If string then take amplitudes and phases from the `wavefish_harmonics` dictionary.
|
||||
If dictionary then take amplitudes and phases from the 'amlitudes' and 'phases' keys.
|
||||
If tuple then the first element is the list of amplitudes and
|
||||
the second one the list of relative phases in radians.
|
||||
name: string
|
||||
Name of the dictionary to be written.
|
||||
file: string or file or None
|
||||
File name or open file object where to write wavefish dictionary.
|
||||
|
||||
Returns
|
||||
-------
|
||||
fish: dict
|
||||
Dictionary with amplitudes and phases.
|
||||
"""
|
||||
# get relative amplitude and phases:
|
||||
amplitudes, phases = wavefish_spectrum(fish)
|
||||
# write out dictionary:
|
||||
if file is None:
|
||||
file = sys.stdout
|
||||
try:
|
||||
file.write("")
|
||||
closeit = False
|
||||
except AttributeError:
|
||||
file = open(file, "w")
|
||||
closeit = True
|
||||
n = 6
|
||||
file.write(name + " = \\\n")
|
||||
file.write(" dict(amplitudes=(")
|
||||
file.write(", ".join(["%.5g" % a for a in amplitudes[:n]]))
|
||||
for k in range(n, len(amplitudes), n):
|
||||
file.write(",\n")
|
||||
file.write(" " * (9 + 12))
|
||||
file.write(", ".join(["%.5g" % a for a in amplitudes[k : k + n]]))
|
||||
file.write("),\n")
|
||||
file.write(" " * 9 + "phases=(")
|
||||
file.write(", ".join(["%.5g" % p for p in phases[:n]]))
|
||||
for k in range(n, len(phases), n):
|
||||
file.write(",\n")
|
||||
file.write(" " * (9 + 8))
|
||||
file.write(", ".join(["%.5g" % p for p in phases[k : k + n]]))
|
||||
file.write("))\n")
|
||||
if closeit:
|
||||
file.close()
|
||||
# return dictionary:
|
||||
harmonics = dict(amplitudes=amplitudes, phases=phases)
|
||||
return harmonics
|
||||
|
||||
|
||||
def chirps(
|
||||
eodf=100.0,
|
||||
samplerate=44100.0,
|
||||
duration=1.0,
|
||||
chirp_times=[0.5],
|
||||
chirp_size=[100.0],
|
||||
chirp_width=[0.01],
|
||||
chirp_kurtosis=[1.0],
|
||||
chirp_contrast=[0.05],
|
||||
):
|
||||
"""Simulate frequency trace with chirps.
|
||||
|
||||
A chirp is modeled as a Gaussian frequency modulation.
|
||||
The first chirp is placed at 0.5/chirp_freq.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
eodf: float
|
||||
EOD frequency of the fish in Hertz.
|
||||
samplerate: float
|
||||
Sampling rate in Hertz.
|
||||
duration: float
|
||||
Duration of the generated data in seconds.
|
||||
chirp_times: float
|
||||
Timestamps of every single chirp in seconds.
|
||||
chirp_size: list
|
||||
Size of each chirp (maximum frequency increase above eodf) in Hertz.
|
||||
chirp_width: list
|
||||
Width of every single chirp at 10% height in seconds.
|
||||
chirp_kurtosis: list:
|
||||
Shape of every single chirp. =1: Gaussian, >1: more rectangular, <1: more peaked.
|
||||
chirp_contrast: float
|
||||
Maximum amplitude reduction of EOD during every respective chirp.
|
||||
|
||||
Returns
|
||||
-------
|
||||
frequency: array of floats
|
||||
Generated frequency trace that can be passed on to wavefish_eods().
|
||||
amplitude: array of floats
|
||||
Generated amplitude modulation that can be used to multiply the trace generated by
|
||||
wavefish_eods().
|
||||
"""
|
||||
# baseline eod frequency and amplitude modulation:
|
||||
n = len(np.arange(0, duration, 1.0 / samplerate))
|
||||
frequency = eodf * np.ones(n)
|
||||
am = np.ones(n)
|
||||
|
||||
for time, width, size, kurtosis, contrast in zip(chirp_times, chirp_width, chirp_size, chirp_kurtosis, chirp_contrast):
|
||||
|
||||
# chirp frequency waveform:
|
||||
chirp_t = np.arange(-2.0 * width, 2.0 * width, 1.0 / samplerate)
|
||||
chirp_sig = (
|
||||
0.5 * width / (2.0 * np.log(10.0)) ** (0.5 / kurtosis)
|
||||
)
|
||||
gauss = np.exp(-0.5 * ((chirp_t / chirp_sig) ** 2.0) ** kurtosis)
|
||||
|
||||
|
||||
# add chirps on baseline eodf:
|
||||
index = int(time * samplerate)
|
||||
i0 = index - len(gauss) // 2
|
||||
i1 = i0 + len(gauss)
|
||||
gi0 = 0
|
||||
gi1 = len(gauss)
|
||||
if i0 < 0:
|
||||
gi0 -= i0
|
||||
i0 = 0
|
||||
if i1 >= len(frequency):
|
||||
gi1 -= i1 - len(frequency)
|
||||
i1 = len(frequency)
|
||||
frequency[i0:i1] += size * gauss[gi0:gi1]
|
||||
am[i0:i1] -= contrast * gauss[gi0:gi1]
|
||||
|
||||
return frequency, am
|
||||
|
||||
|
||||
def rises(
|
||||
eodf,
|
||||
samplerate,
|
||||
duration,
|
||||
rise_times,
|
||||
rise_size,
|
||||
rise_tau,
|
||||
decay_tau,
|
||||
):
|
||||
"""Simulate frequency trace with rises.
|
||||
|
||||
A rise is modeled as a double exponential frequency modulation.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
eodf: float
|
||||
EOD frequency of the fish in Hertz.
|
||||
samplerate: float
|
||||
Sampling rate in Hertz.
|
||||
duration: float
|
||||
Duration of the generated data in seconds.
|
||||
rise_times: list
|
||||
Timestamp of each of the rises in seconds.
|
||||
rise_size: list
|
||||
Size of the respective rise (frequency increase above eodf) in Hertz.
|
||||
rise_tau: list
|
||||
Time constant of the frequency increase of the respective rise in seconds.
|
||||
decay_tau: list
|
||||
Time constant of the frequency decay of the respective rise in seconds.
|
||||
|
||||
Returns
|
||||
-------
|
||||
data: array of floats
|
||||
Generate frequency trace that can be passed on to wavefish_eods().
|
||||
"""
|
||||
n = len(np.arange(0, duration, 1.0 / samplerate))
|
||||
|
||||
# baseline eod frequency:
|
||||
frequency = eodf * np.ones(n)
|
||||
|
||||
for time, size, riset, decayt in zip(rise_times, rise_size, rise_tau, decay_tau):
|
||||
|
||||
# rise frequency waveform:
|
||||
rise_t = np.arange(0.0, 5.0 * decayt, 1.0 / samplerate)
|
||||
rise = (
|
||||
size
|
||||
* (1.0 - np.exp(-rise_t / riset))
|
||||
* np.exp(-rise_t / decayt)
|
||||
)
|
||||
|
||||
# add rises on baseline eodf:
|
||||
index = int(time * samplerate)
|
||||
if index + len(rise) > len(frequency):
|
||||
rise_index = len(frequency) - index
|
||||
frequency[index : index + rise_index] += rise[:rise_index]
|
||||
break
|
||||
else:
|
||||
frequency[index : index + len(rise)] += rise
|
||||
return frequency
|
||||
|
||||
class FishSignal:
|
||||
def __init__(self, samplerate, duration, eodf, nchirps, nrises):
|
||||
time = np.arange(0, duration, 1 / samplerate)
|
||||
chirp_times = np.random.uniform(0, duration, nchirps)
|
||||
rise_times = np.random.uniform(0, duration, nrises)
|
||||
|
||||
# pick random parameters for chirps
|
||||
chirp_size = np.random.uniform(60, 200, nchirps)
|
||||
chirp_width = np.random.uniform(0.01, 0.1, nchirps)
|
||||
chirp_kurtosis = np.random.uniform(1, 1, nchirps)
|
||||
chirp_contrast = np.random.uniform(0.1, 0.5, nchirps)
|
||||
|
||||
# pick random parameters for rises
|
||||
rise_size = np.random.uniform(10, 100, nrises)
|
||||
rise_tau = np.random.uniform(0.5, 1.5, nrises)
|
||||
decay_tau = np.random.uniform(5, 15, nrises)
|
||||
|
||||
# generate frequency trace with chirps
|
||||
chirp_trace, chirp_amp = chirps(
|
||||
eodf=0.0,
|
||||
samplerate=samplerate,
|
||||
duration=duration,
|
||||
chirp_times=chirp_times,
|
||||
chirp_size=chirp_size,
|
||||
chirp_width=chirp_width,
|
||||
chirp_kurtosis=chirp_kurtosis,
|
||||
chirp_contrast=chirp_contrast,
|
||||
)
|
||||
|
||||
# generate frequency trace with rises
|
||||
rise_trace = rises(
|
||||
eodf=0.0,
|
||||
samplerate=samplerate,
|
||||
duration=duration,
|
||||
rise_times=rise_times,
|
||||
rise_size=rise_size,
|
||||
rise_tau=rise_tau,
|
||||
decay_tau=decay_tau,
|
||||
)
|
||||
|
||||
# combine traces to one
|
||||
full_trace = chirp_trace + rise_trace + eodf
|
||||
|
||||
# make the EOD from the frequency trace
|
||||
fish = wavefish_eods(
|
||||
fish="Alepto",
|
||||
frequency=full_trace,
|
||||
samplerate=samplerate,
|
||||
duration=duration,
|
||||
phase0=0.0,
|
||||
noise_std=0.05,
|
||||
)
|
||||
|
||||
signal = fish * chirp_amp
|
||||
|
||||
self.signal = signal
|
||||
self.trace = full_trace
|
||||
self.time = time
|
||||
self.samplerate = samplerate
|
||||
self.eodf = eodf
|
||||
|
||||
def visualize(self):
|
||||
|
||||
spec, freqs, spectime = ps.spectrogram(
|
||||
data=self.signal,
|
||||
ratetime=self.samplerate,
|
||||
freq_resolution=0.5,
|
||||
overlap_frac=0.5,
|
||||
)
|
||||
|
||||
fig, (ax1, ax2) = plt.subplots(2, 1, height_ratios=[1, 4], sharex=True)
|
||||
|
||||
ax1.plot(self.time, self.signal)
|
||||
ax1.set_ylabel("Amplitude")
|
||||
ax1.set_xlabel("Time (s)")
|
||||
ax1.set_title("EOD signal")
|
||||
|
||||
ax2.imshow(ps.decibel(spec), origin='lower', aspect="auto", extent=[spectime[0], spectime[-1], freqs[0], freqs[-1]])
|
||||
ax2.set_ylabel("Frequency (Hz)")
|
||||
ax2.set_xlabel("Time (s)")
|
||||
ax2.set_title("Spectrogram")
|
||||
ax2.set_ylim(0, 2000)
|
||||
plt.show()
|
118
chirp_instantaneous_freq/test_parameters.py
Normal file
118
chirp_instantaneous_freq/test_parameters.py
Normal file
@ -0,0 +1,118 @@
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
from fish_signal import chirps, wavefish_eods
|
||||
from filters import bandpass_filter, instantaneous_frequency, inst_freq
|
||||
from IPython import embed
|
||||
|
||||
|
||||
def switch_test(test, defaultparams, testparams):
|
||||
if test == 'width':
|
||||
defaultparams['chirp_width'] = testparams['chirp_width']
|
||||
key = 'chirp_width'
|
||||
elif test == 'size':
|
||||
defaultparams['chirp_size'] = testparams['chirp_size']
|
||||
key = 'chirp_size'
|
||||
elif test == 'kurtosis':
|
||||
defaultparams['chirp_kurtosis'] = testparams['chirp_kurtosis']
|
||||
key = 'chirp_kurtosis'
|
||||
elif test == 'contrast':
|
||||
defaultparams['chirp_contrast'] = testparams['chirp_contrast']
|
||||
key = 'chirp_contrast'
|
||||
else:
|
||||
raise ValueError("Test not recognized")
|
||||
|
||||
return key, defaultparams
|
||||
|
||||
|
||||
def extract_dict(dict, index):
|
||||
return {key: value[index] for key, value in dict.items()}
|
||||
|
||||
|
||||
def main(test1, test2, resolution=10):
|
||||
|
||||
assert test1 in ['width', 'size', 'kurtosis', 'contrast'], "Test1 not recognized"
|
||||
assert test2 in ['width', 'size', 'kurtosis', 'contrast'], "Test2 not recognized"
|
||||
|
||||
# Define the parameters for the chirp simulations
|
||||
ntest = resolution
|
||||
|
||||
defaultparams = dict(
|
||||
chirp_size = np.ones(ntest) * 100,
|
||||
chirp_width = np.ones(ntest) * 0.1,
|
||||
chirp_kurtosis = np.ones(ntest) * 1.0,
|
||||
chirp_contrast = np.ones(ntest) * 0.5,
|
||||
)
|
||||
|
||||
testparams = dict(
|
||||
chirp_width = np.linspace(0.01, 0.2, ntest),
|
||||
chirp_size = np.linspace(50, 300, ntest),
|
||||
chirp_kurtosis = np.linspace(0.5, 1.5, ntest),
|
||||
chirp_contrast = np.linspace(0.01, 1.0, ntest),
|
||||
)
|
||||
|
||||
key1, chirp_params = switch_test(test1, defaultparams, testparams)
|
||||
key2, chirp_params = switch_test(test2, chirp_params, testparams)
|
||||
|
||||
# make the chirp trace
|
||||
eodf = 500
|
||||
samplerate = 20000
|
||||
duration = 2
|
||||
chirp_times = [0.5, 1, 1.5]
|
||||
|
||||
wide_cutoffs = 200
|
||||
tight_cutoffs = 10
|
||||
|
||||
distances = np.full((ntest, ntest), np.nan)
|
||||
|
||||
fig, axs = plt.subplots(ntest, ntest, figsize = (10, 10), sharex = True, sharey = True)
|
||||
axs = axs.flatten()
|
||||
|
||||
iter0 = 0
|
||||
for iter1, test1_param in enumerate(chirp_params[key1]):
|
||||
for iter2, test2_param in enumerate(chirp_params[key2]):
|
||||
|
||||
# get the chirp parameters for the current test
|
||||
inner_chirp_params = extract_dict(chirp_params, iter2)
|
||||
inner_chirp_params[key1] = test1_param
|
||||
inner_chirp_params[key2] = test2_param
|
||||
|
||||
# make the chirp trace for the current chirp parameters
|
||||
sizes = np.ones(len(chirp_times)) * inner_chirp_params['chirp_size']
|
||||
widths = np.ones(len(chirp_times)) * inner_chirp_params['chirp_width']
|
||||
kurtosis = np.ones(len(chirp_times)) * inner_chirp_params['chirp_kurtosis']
|
||||
contrast = np.ones(len(chirp_times)) * inner_chirp_params['chirp_contrast']
|
||||
|
||||
# make the chirp trace
|
||||
chirp_trace, ampmod = chirps(eodf, samplerate, duration, chirp_times, sizes, widths, kurtosis, contrast)
|
||||
signal = wavefish_eods(
|
||||
fish="Alepto",
|
||||
frequency=chirp_trace,
|
||||
samplerate=samplerate,
|
||||
duration=duration,
|
||||
phase0=0.0,
|
||||
noise_std=0.05
|
||||
)
|
||||
signal = signal * ampmod
|
||||
|
||||
# apply broadband filter
|
||||
wide_signal = bandpass_filter(signal, samplerate, eodf - wide_cutoffs, eodf + wide_cutoffs)
|
||||
tight_signal = bandpass_filter(signal, samplerate, eodf - tight_cutoffs, eodf + tight_cutoffs)
|
||||
|
||||
# get the instantaneous frequency
|
||||
wide_frequency = inst_freq(wide_signal, samplerate)
|
||||
tight_frequency = inst_freq(tight_signal, samplerate)
|
||||
|
||||
bool_mask = wide_frequency != 0
|
||||
axs[iter0].plot(wide_frequency[bool_mask])
|
||||
axs[iter0].plot(tight_frequency[bool_mask])
|
||||
fig.supylabel(key1)
|
||||
fig.supxlabel(key2)
|
||||
|
||||
iter0 += 1
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
ax.imshow(distances, cmap = 'jet')
|
||||
plt.show()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main('width', 'size')
|
@ -1,4 +1,3 @@
|
||||
audioio==0.10.0
|
||||
cmocean==3.0.3
|
||||
cycler==0.11.0
|
||||
ipython==8.12.0
|
||||
@ -9,5 +8,4 @@ paramiko==3.1.0
|
||||
PyYAML==6.0
|
||||
scipy==1.10.1
|
||||
scp==0.14.5
|
||||
thunderfish==1.9.10
|
||||
tqdm==4.65.0
|
||||
|
Loading…
Reference in New Issue
Block a user