reformat
This commit is contained in:
@@ -59,14 +59,14 @@ def instantaneous_frequency(
|
||||
def inst_freq(signal, fs):
|
||||
"""
|
||||
Computes the instantaneous frequency of a periodic signal using zero-crossings.
|
||||
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
signal : array-like
|
||||
The input signal.
|
||||
fs : float
|
||||
The sampling frequency of the input signal.
|
||||
|
||||
|
||||
Returns:
|
||||
--------
|
||||
freq : array-like
|
||||
@@ -74,29 +74,30 @@ def inst_freq(signal, fs):
|
||||
"""
|
||||
# Compute the sign of the signal
|
||||
sign = np.sign(signal)
|
||||
|
||||
|
||||
# Compute the crossings of the sign signal with a zero line
|
||||
crossings = np.where(np.diff(sign))[0]
|
||||
|
||||
|
||||
# Compute the time differences between zero crossings
|
||||
dt = np.diff(crossings) / fs
|
||||
|
||||
|
||||
# Compute the instantaneous frequency as the reciprocal of the time differences
|
||||
freq = 1 / dt
|
||||
|
||||
# Gaussian filter the signal
|
||||
# Gaussian filter the signal
|
||||
freq = gaussian_filter1d(freq, 10)
|
||||
|
||||
|
||||
# Pad the frequency vector with zeros to match the length of the input signal
|
||||
freq = np.pad(freq, (0, len(signal) - len(freq)))
|
||||
|
||||
|
||||
return freq
|
||||
|
||||
|
||||
def bandpass_filter(
|
||||
signal: np.ndarray,
|
||||
samplerate: float,
|
||||
lowf: float,
|
||||
highf: float,
|
||||
signal: np.ndarray,
|
||||
samplerate: float,
|
||||
lowf: float,
|
||||
highf: float,
|
||||
) -> np.ndarray:
|
||||
"""Bandpass filter a signal.
|
||||
|
||||
@@ -150,9 +151,7 @@ def highpass_filter(
|
||||
|
||||
|
||||
def lowpass_filter(
|
||||
signal: np.ndarray,
|
||||
samplerate: float,
|
||||
cutoff: float
|
||||
signal: np.ndarray, samplerate: float, cutoff: float
|
||||
) -> np.ndarray:
|
||||
"""Lowpass filter a signal.
|
||||
|
||||
@@ -176,10 +175,9 @@ def lowpass_filter(
|
||||
return filtered_signal
|
||||
|
||||
|
||||
def envelope(signal: np.ndarray,
|
||||
samplerate: float,
|
||||
cutoff_frequency: float
|
||||
) -> np.ndarray:
|
||||
def envelope(
|
||||
signal: np.ndarray, samplerate: float, cutoff_frequency: float
|
||||
) -> np.ndarray:
|
||||
"""Calculate the envelope of a signal using a lowpass filter.
|
||||
|
||||
Parameters
|
||||
|
||||
@@ -384,16 +384,14 @@ def chirps(
|
||||
frequency = eodf * np.ones(n)
|
||||
am = np.ones(n)
|
||||
|
||||
for time, width, size, kurtosis, contrast in zip(chirp_times, chirp_width, chirp_size, chirp_kurtosis, chirp_contrast):
|
||||
|
||||
for time, width, size, kurtosis, contrast in zip(
|
||||
chirp_times, chirp_width, chirp_size, chirp_kurtosis, chirp_contrast
|
||||
):
|
||||
# chirp frequency waveform:
|
||||
chirp_t = np.arange(-2.0 * width, 2.0 * width, 1.0 / samplerate)
|
||||
chirp_sig = (
|
||||
0.5 * width / (2.0 * np.log(10.0)) ** (0.5 / kurtosis)
|
||||
)
|
||||
chirp_sig = 0.5 * width / (2.0 * np.log(10.0)) ** (0.5 / kurtosis)
|
||||
gauss = np.exp(-0.5 * ((chirp_t / chirp_sig) ** 2.0) ** kurtosis)
|
||||
|
||||
|
||||
# add chirps on baseline eodf:
|
||||
index = int(time * samplerate)
|
||||
i0 = index - len(gauss) // 2
|
||||
@@ -433,7 +431,7 @@ def rises(
|
||||
Sampling rate in Hertz.
|
||||
duration: float
|
||||
Duration of the generated data in seconds.
|
||||
rise_times: list
|
||||
rise_times: list
|
||||
Timestamp of each of the rises in seconds.
|
||||
rise_size: list
|
||||
Size of the respective rise (frequency increase above eodf) in Hertz.
|
||||
@@ -452,15 +450,12 @@ def rises(
|
||||
# baseline eod frequency:
|
||||
frequency = eodf * np.ones(n)
|
||||
|
||||
for time, size, riset, decayt in zip(rise_times, rise_size, rise_tau, decay_tau):
|
||||
|
||||
for time, size, riset, decayt in zip(
|
||||
rise_times, rise_size, rise_tau, decay_tau
|
||||
):
|
||||
# rise frequency waveform:
|
||||
rise_t = np.arange(0.0, 5.0 * decayt, 1.0 / samplerate)
|
||||
rise = (
|
||||
size
|
||||
* (1.0 - np.exp(-rise_t / riset))
|
||||
* np.exp(-rise_t / decayt)
|
||||
)
|
||||
rise = size * (1.0 - np.exp(-rise_t / riset)) * np.exp(-rise_t / decayt)
|
||||
|
||||
# add rises on baseline eodf:
|
||||
index = int(time * samplerate)
|
||||
@@ -472,13 +467,14 @@ def rises(
|
||||
frequency[index : index + len(rise)] += rise
|
||||
return frequency
|
||||
|
||||
|
||||
class FishSignal:
|
||||
def __init__(self, samplerate, duration, eodf, nchirps, nrises):
|
||||
time = np.arange(0, duration, 1 / samplerate)
|
||||
chirp_times = np.random.uniform(0, duration, nchirps)
|
||||
rise_times = np.random.uniform(0, duration, nrises)
|
||||
|
||||
# pick random parameters for chirps
|
||||
# pick random parameters for chirps
|
||||
chirp_size = np.random.uniform(60, 200, nchirps)
|
||||
chirp_width = np.random.uniform(0.01, 0.1, nchirps)
|
||||
chirp_kurtosis = np.random.uniform(1, 1, nchirps)
|
||||
@@ -534,7 +530,6 @@ class FishSignal:
|
||||
self.eodf = eodf
|
||||
|
||||
def visualize(self):
|
||||
|
||||
spec, freqs, spectime = ps.spectrogram(
|
||||
data=self.signal,
|
||||
ratetime=self.samplerate,
|
||||
@@ -549,7 +544,12 @@ class FishSignal:
|
||||
ax1.set_xlabel("Time (s)")
|
||||
ax1.set_title("EOD signal")
|
||||
|
||||
ax2.imshow(ps.decibel(spec), origin='lower', aspect="auto", extent=[spectime[0], spectime[-1], freqs[0], freqs[-1]])
|
||||
ax2.imshow(
|
||||
ps.decibel(spec),
|
||||
origin="lower",
|
||||
aspect="auto",
|
||||
extent=[spectime[0], spectime[-1], freqs[0], freqs[-1]],
|
||||
)
|
||||
ax2.set_ylabel("Frequency (Hz)")
|
||||
ax2.set_xlabel("Time (s)")
|
||||
ax2.set_title("Spectrogram")
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import numpy as np
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
from fish_signal import chirps, wavefish_eods
|
||||
from filters import bandpass_filter, instantaneous_frequency, inst_freq
|
||||
@@ -6,18 +6,18 @@ from IPython import embed
|
||||
|
||||
|
||||
def switch_test(test, defaultparams, testparams):
|
||||
if test == 'width':
|
||||
defaultparams['chirp_width'] = testparams['chirp_width']
|
||||
key = 'chirp_width'
|
||||
elif test == 'size':
|
||||
defaultparams['chirp_size'] = testparams['chirp_size']
|
||||
key = 'chirp_size'
|
||||
elif test == 'kurtosis':
|
||||
defaultparams['chirp_kurtosis'] = testparams['chirp_kurtosis']
|
||||
key = 'chirp_kurtosis'
|
||||
elif test == 'contrast':
|
||||
defaultparams['chirp_contrast'] = testparams['chirp_contrast']
|
||||
key = 'chirp_contrast'
|
||||
if test == "width":
|
||||
defaultparams["chirp_width"] = testparams["chirp_width"]
|
||||
key = "chirp_width"
|
||||
elif test == "size":
|
||||
defaultparams["chirp_size"] = testparams["chirp_size"]
|
||||
key = "chirp_size"
|
||||
elif test == "kurtosis":
|
||||
defaultparams["chirp_kurtosis"] = testparams["chirp_kurtosis"]
|
||||
key = "chirp_kurtosis"
|
||||
elif test == "contrast":
|
||||
defaultparams["chirp_contrast"] = testparams["chirp_contrast"]
|
||||
key = "chirp_contrast"
|
||||
else:
|
||||
raise ValueError("Test not recognized")
|
||||
|
||||
@@ -29,31 +29,40 @@ def extract_dict(dict, index):
|
||||
|
||||
|
||||
def main(test1, test2, resolution=10):
|
||||
assert test1 in [
|
||||
"width",
|
||||
"size",
|
||||
"kurtosis",
|
||||
"contrast",
|
||||
], "Test1 not recognized"
|
||||
assert test2 in [
|
||||
"width",
|
||||
"size",
|
||||
"kurtosis",
|
||||
"contrast",
|
||||
], "Test2 not recognized"
|
||||
|
||||
assert test1 in ['width', 'size', 'kurtosis', 'contrast'], "Test1 not recognized"
|
||||
assert test2 in ['width', 'size', 'kurtosis', 'contrast'], "Test2 not recognized"
|
||||
|
||||
# Define the parameters for the chirp simulations
|
||||
# Define the parameters for the chirp simulations
|
||||
ntest = resolution
|
||||
|
||||
defaultparams = dict(
|
||||
chirp_size = np.ones(ntest) * 100,
|
||||
chirp_width = np.ones(ntest) * 0.1,
|
||||
chirp_kurtosis = np.ones(ntest) * 1.0,
|
||||
chirp_contrast = np.ones(ntest) * 0.5,
|
||||
chirp_size=np.ones(ntest) * 100,
|
||||
chirp_width=np.ones(ntest) * 0.1,
|
||||
chirp_kurtosis=np.ones(ntest) * 1.0,
|
||||
chirp_contrast=np.ones(ntest) * 0.5,
|
||||
)
|
||||
|
||||
testparams = dict(
|
||||
chirp_width = np.linspace(0.01, 0.2, ntest),
|
||||
chirp_size = np.linspace(50, 300, ntest),
|
||||
chirp_kurtosis = np.linspace(0.5, 1.5, ntest),
|
||||
chirp_contrast = np.linspace(0.01, 1.0, ntest),
|
||||
chirp_width=np.linspace(0.01, 0.2, ntest),
|
||||
chirp_size=np.linspace(50, 300, ntest),
|
||||
chirp_kurtosis=np.linspace(0.5, 1.5, ntest),
|
||||
chirp_contrast=np.linspace(0.01, 1.0, ntest),
|
||||
)
|
||||
|
||||
key1, chirp_params = switch_test(test1, defaultparams, testparams)
|
||||
key2, chirp_params = switch_test(test2, chirp_params, testparams)
|
||||
|
||||
# make the chirp trace
|
||||
# make the chirp trace
|
||||
eodf = 500
|
||||
samplerate = 20000
|
||||
duration = 2
|
||||
@@ -63,40 +72,60 @@ def main(test1, test2, resolution=10):
|
||||
tight_cutoffs = 10
|
||||
|
||||
distances = np.full((ntest, ntest), np.nan)
|
||||
|
||||
fig, axs = plt.subplots(ntest, ntest, figsize = (10, 10), sharex = True, sharey = True)
|
||||
|
||||
fig, axs = plt.subplots(
|
||||
ntest, ntest, figsize=(10, 10), sharex=True, sharey=True
|
||||
)
|
||||
axs = axs.flatten()
|
||||
|
||||
iter0 = 0
|
||||
for iter1, test1_param in enumerate(chirp_params[key1]):
|
||||
for iter2, test2_param in enumerate(chirp_params[key2]):
|
||||
|
||||
# get the chirp parameters for the current test
|
||||
inner_chirp_params = extract_dict(chirp_params, iter2)
|
||||
inner_chirp_params[key1] = test1_param
|
||||
inner_chirp_params[key2] = test2_param
|
||||
|
||||
# make the chirp trace for the current chirp parameters
|
||||
sizes = np.ones(len(chirp_times)) * inner_chirp_params['chirp_size']
|
||||
widths = np.ones(len(chirp_times)) * inner_chirp_params['chirp_width']
|
||||
kurtosis = np.ones(len(chirp_times)) * inner_chirp_params['chirp_kurtosis']
|
||||
contrast = np.ones(len(chirp_times)) * inner_chirp_params['chirp_contrast']
|
||||
sizes = np.ones(len(chirp_times)) * inner_chirp_params["chirp_size"]
|
||||
widths = (
|
||||
np.ones(len(chirp_times)) * inner_chirp_params["chirp_width"]
|
||||
)
|
||||
kurtosis = (
|
||||
np.ones(len(chirp_times)) * inner_chirp_params["chirp_kurtosis"]
|
||||
)
|
||||
contrast = (
|
||||
np.ones(len(chirp_times)) * inner_chirp_params["chirp_contrast"]
|
||||
)
|
||||
|
||||
# make the chirp trace
|
||||
chirp_trace, ampmod = chirps(eodf, samplerate, duration, chirp_times, sizes, widths, kurtosis, contrast)
|
||||
chirp_trace, ampmod = chirps(
|
||||
eodf,
|
||||
samplerate,
|
||||
duration,
|
||||
chirp_times,
|
||||
sizes,
|
||||
widths,
|
||||
kurtosis,
|
||||
contrast,
|
||||
)
|
||||
signal = wavefish_eods(
|
||||
fish="Alepto",
|
||||
frequency=chirp_trace,
|
||||
samplerate=samplerate,
|
||||
duration=duration,
|
||||
phase0=0.0,
|
||||
noise_std=0.05
|
||||
)
|
||||
fish="Alepto",
|
||||
frequency=chirp_trace,
|
||||
samplerate=samplerate,
|
||||
duration=duration,
|
||||
phase0=0.0,
|
||||
noise_std=0.05,
|
||||
)
|
||||
signal = signal * ampmod
|
||||
|
||||
# apply broadband filter
|
||||
wide_signal = bandpass_filter(signal, samplerate, eodf - wide_cutoffs, eodf + wide_cutoffs)
|
||||
tight_signal = bandpass_filter(signal, samplerate, eodf - tight_cutoffs, eodf + tight_cutoffs)
|
||||
# apply broadband filter
|
||||
wide_signal = bandpass_filter(
|
||||
signal, samplerate, eodf - wide_cutoffs, eodf + wide_cutoffs
|
||||
)
|
||||
tight_signal = bandpass_filter(
|
||||
signal, samplerate, eodf - tight_cutoffs, eodf + tight_cutoffs
|
||||
)
|
||||
|
||||
# get the instantaneous frequency
|
||||
wide_frequency = inst_freq(wide_signal, samplerate)
|
||||
@@ -111,8 +140,9 @@ def main(test1, test2, resolution=10):
|
||||
iter0 += 1
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
ax.imshow(distances, cmap = 'jet')
|
||||
ax.imshow(distances, cmap="jet")
|
||||
plt.show()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main('width', 'size')
|
||||
main("width", "size")
|
||||
|
||||
Reference in New Issue
Block a user