added second inst freq function

This commit is contained in:
weygoldt 2023-04-07 12:49:37 +02:00
parent 4076cf074e
commit e1e1e75cd2
No known key found for this signature in database
3 changed files with 76 additions and 23 deletions

View File

@ -18,6 +18,7 @@ from modules.datahandling import (
purge_duplicates,
group_timestamps,
instantaneous_frequency,
instantaneous_frequency2,
minmaxnorm,
)
@ -517,6 +518,27 @@ def has_chirp(baseline_frequency: np.ndarray, peak_height: float) -> bool:
return False
def mask_low_amplitudes(envelope, threshold):
"""
Mask low amplitudes in the envelope.
Parameters
----------
envelope : np.ndarray
Envelope of the signal.
threshold : float
Threshold to mask low amplitudes.
Returns
-------
np.ndarray
"""
mask = np.ones_like(envelope, dtype=bool)
mask[envelope < threshold] = np.nan
return mask
def find_searchband(
current_frequency: np.ndarray,
percentiles_ids: np.ndarray,
@ -740,15 +762,6 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None:
current_frequencies = data.freq[track_window_index]
current_powers = data.powers[track_window_index, :]
# approximate sampling rate to compute expected durations if there
# is data available for this time window for this fish id
# track_samplerate = np.mean(1 / np.diff(data.time))
# expected_duration = (
# (window_start_seconds + window_duration_seconds)
# - window_start_seconds
# ) * track_samplerate
# check if tracked data available in this window
if len(current_frequencies) < 3:
logger.warning(
@ -832,17 +845,17 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None:
highf=config.baseline_envelope_bandpass_highf,
)
# highbass filter introduced filter effects, i.e. oscillations
# around peaks. Compute the envelope of the highpass filtered
# and inverted baseline envelope to remove these oscillations
# create a mask that removes areas where amplitudes are very
# because the instantaneous frequency is not reliable there
baseline_envelope = -baseline_envelope
amplitude_mask = mask_low_amplitudes(
baseline_envelope,
config.baseline_min_amplitude
)
# baseline_envelope = envelope(
# signal=baseline_envelope,
# samplerate=data.raw_rate,
# cutoff_frequency=config.baseline_envelope_envelope_cutoff,
# )
# invert baseline envelope to find troughs in the baseline
baseline_envelope = -baseline_envelope
# compute the envelope of the search band. Peaks in the search
# band envelope correspond to troughs in the baseline envelope
@ -863,6 +876,8 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None:
# chirp. This phenomenon is only observed on chirps on a narrow
# filtered baseline such as the one we are working with.
baseline_frequency = instantaneous_frequency2(baselineband, data.raw_rate)
(
baseline_frequency_time,
baseline_frequency,
@ -872,7 +887,6 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None:
smoothing_window=config.baseline_frequency_smoothing,
)
# Take the absolute of the instantaneous frequency to invert
# troughs into peaks. This is nessecary since the narrow
# pass band introduces these anomalies. Also substract by the
@ -882,16 +896,14 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None:
baseline_frequency - np.median(baseline_frequency)
)
# Now check if there are strong dips in the signal amplitude
# in the rolling window. These result in frequency anomalies,
# which would be detected as chirps on the frequency trace.
# check if there is at least one superthreshold peak on the
# instantaneous and exit the loop if not. This is used to
# prevent windows that do definetely not include a chirp
# to enter normalization, where small changes due to noise
# would be amplified
# if not has_chirp(baseline_frequency[amplitude_mask], config.baseline_frequency_peakheight):
# continue
if not has_chirp(baseline_frequency, config.baseline_frequency_peakheight):
continue

View File

@ -27,6 +27,7 @@ baseline_frequency_smoothing: 3 # instantaneous frequency smoothing
# Feature processing parameters -----------------------------------------------
baseline_frequency_peakheight: 5 # the min peak height of the baseline instfreq
baseline_min_amplitude: 0.1 # the minimal value of the baseline envelope
baseline_envelope_cutoff: 25 # envelope estimation cutoff
baseline_envelope_bandpass_lowf: 2 # envelope badpass lower cutoff
baseline_envelope_bandpass_highf: 100 # envelope bandbass higher cutoff

View File

@ -2,6 +2,7 @@ import numpy as np
from typing import List, Any
from scipy.ndimage import gaussian_filter1d
from scipy.stats import gamma, norm
from scipy.signal import resample
def minmaxnorm(data):
@ -21,6 +22,45 @@ def minmaxnorm(data):
"""
return (data - np.min(data)) / (np.max(data) - np.min(data))
def instantaneous_frequency2(signal: np.ndarray, fs: float, interpolation: str = 'linear') -> np.ndarray:
"""
Compute the instantaneous frequency of a periodic signal using zero crossings and resample the frequency using linear
or cubic interpolation to match the dimensions of the input array.
Parameters
----------
signal : np.ndarray
Input signal.
fs : float
Sampling frequency of the input signal.
interpolation : str, optional
Interpolation method to use during resampling. Should be either 'linear' or 'cubic'. Default is 'linear'.
Returns
-------
freq : np.ndarray
Instantaneous frequency of the input signal, resampled to match the dimensions of the input array.
"""
# Find zero crossings
zero_crossings = np.where(np.diff(np.sign(signal)))[0]
# Compute time differences between zero crossings
time_diff = np.diff(zero_crossings) / fs
# Compute instantaneous frequency as inverse of time differences
freq = 1 / time_diff
# Resample the frequency using specified interpolation method to match the dimensions of the input array
orig_len = len(signal)
freq = resample(freq, orig_len)
if interpolation == 'linear':
freq = np.interp(np.arange(0, orig_len), np.arange(0, orig_len), freq)
elif interpolation == 'cubic':
freq = resample(freq, orig_len, window='cubic')
return freq
def instantaneous_frequency(
signal: np.ndarray,