diff --git a/code/chirpdetection.py b/code/chirpdetection.py index 533fd90..1fe2372 100755 --- a/code/chirpdetection.py +++ b/code/chirpdetection.py @@ -18,6 +18,7 @@ from modules.datahandling import ( purge_duplicates, group_timestamps, instantaneous_frequency, + instantaneous_frequency2, minmaxnorm, ) @@ -517,6 +518,27 @@ def has_chirp(baseline_frequency: np.ndarray, peak_height: float) -> bool: return False +def mask_low_amplitudes(envelope, threshold): + """ + Mask low amplitudes in the envelope. + + Parameters + ---------- + envelope : np.ndarray + Envelope of the signal. + threshold : float + Threshold to mask low amplitudes. + + Returns + ------- + np.ndarray + + """ + mask = np.ones_like(envelope, dtype=bool) + mask[envelope < threshold] = np.nan + return mask + + def find_searchband( current_frequency: np.ndarray, percentiles_ids: np.ndarray, @@ -740,15 +762,6 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: current_frequencies = data.freq[track_window_index] current_powers = data.powers[track_window_index, :] - # approximate sampling rate to compute expected durations if there - # is data available for this time window for this fish id - - # track_samplerate = np.mean(1 / np.diff(data.time)) - # expected_duration = ( - # (window_start_seconds + window_duration_seconds) - # - window_start_seconds - # ) * track_samplerate - # check if tracked data available in this window if len(current_frequencies) < 3: logger.warning( @@ -832,17 +845,17 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: highf=config.baseline_envelope_bandpass_highf, ) - # highbass filter introduced filter effects, i.e. oscillations - # around peaks. Compute the envelope of the highpass filtered - # and inverted baseline envelope to remove these oscillations + # create a mask that removes areas where amplitudes are very + # because the instantaneous frequency is not reliable there - baseline_envelope = -baseline_envelope + amplitude_mask = mask_low_amplitudes( + baseline_envelope, + config.baseline_min_amplitude + ) - # baseline_envelope = envelope( - # signal=baseline_envelope, - # samplerate=data.raw_rate, - # cutoff_frequency=config.baseline_envelope_envelope_cutoff, - # ) + # invert baseline envelope to find troughs in the baseline + + baseline_envelope = -baseline_envelope # compute the envelope of the search band. Peaks in the search # band envelope correspond to troughs in the baseline envelope @@ -863,6 +876,8 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: # chirp. This phenomenon is only observed on chirps on a narrow # filtered baseline such as the one we are working with. + baseline_frequency = instantaneous_frequency2(baselineband, data.raw_rate) + ( baseline_frequency_time, baseline_frequency, @@ -872,7 +887,6 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: smoothing_window=config.baseline_frequency_smoothing, ) - # Take the absolute of the instantaneous frequency to invert # troughs into peaks. This is nessecary since the narrow # pass band introduces these anomalies. Also substract by the @@ -882,16 +896,14 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: baseline_frequency - np.median(baseline_frequency) ) - # Now check if there are strong dips in the signal amplitude - # in the rolling window. These result in frequency anomalies, - # which would be detected as chirps on the frequency trace. - # check if there is at least one superthreshold peak on the # instantaneous and exit the loop if not. This is used to # prevent windows that do definetely not include a chirp # to enter normalization, where small changes due to noise # would be amplified + # if not has_chirp(baseline_frequency[amplitude_mask], config.baseline_frequency_peakheight): + # continue if not has_chirp(baseline_frequency, config.baseline_frequency_peakheight): continue diff --git a/code/chirpdetector_conf.yml b/code/chirpdetector_conf.yml index e52c59e..803ed8e 100755 --- a/code/chirpdetector_conf.yml +++ b/code/chirpdetector_conf.yml @@ -27,6 +27,7 @@ baseline_frequency_smoothing: 3 # instantaneous frequency smoothing # Feature processing parameters ----------------------------------------------- baseline_frequency_peakheight: 5 # the min peak height of the baseline instfreq +baseline_min_amplitude: 0.1 # the minimal value of the baseline envelope baseline_envelope_cutoff: 25 # envelope estimation cutoff baseline_envelope_bandpass_lowf: 2 # envelope badpass lower cutoff baseline_envelope_bandpass_highf: 100 # envelope bandbass higher cutoff diff --git a/code/modules/datahandling.py b/code/modules/datahandling.py index f375d44..ca9e693 100644 --- a/code/modules/datahandling.py +++ b/code/modules/datahandling.py @@ -2,6 +2,7 @@ import numpy as np from typing import List, Any from scipy.ndimage import gaussian_filter1d from scipy.stats import gamma, norm +from scipy.signal import resample def minmaxnorm(data): @@ -21,6 +22,45 @@ def minmaxnorm(data): """ return (data - np.min(data)) / (np.max(data) - np.min(data)) +def instantaneous_frequency2(signal: np.ndarray, fs: float, interpolation: str = 'linear') -> np.ndarray: + """ + Compute the instantaneous frequency of a periodic signal using zero crossings and resample the frequency using linear + or cubic interpolation to match the dimensions of the input array. + + Parameters + ---------- + signal : np.ndarray + Input signal. + fs : float + Sampling frequency of the input signal. + interpolation : str, optional + Interpolation method to use during resampling. Should be either 'linear' or 'cubic'. Default is 'linear'. + + Returns + ------- + freq : np.ndarray + Instantaneous frequency of the input signal, resampled to match the dimensions of the input array. + """ + # Find zero crossings + zero_crossings = np.where(np.diff(np.sign(signal)))[0] + + # Compute time differences between zero crossings + time_diff = np.diff(zero_crossings) / fs + + # Compute instantaneous frequency as inverse of time differences + freq = 1 / time_diff + + # Resample the frequency using specified interpolation method to match the dimensions of the input array + orig_len = len(signal) + freq = resample(freq, orig_len) + + if interpolation == 'linear': + freq = np.interp(np.arange(0, orig_len), np.arange(0, orig_len), freq) + elif interpolation == 'cubic': + freq = resample(freq, orig_len, window='cubic') + + return freq + def instantaneous_frequency( signal: np.ndarray,