diff --git a/code/chirpdetection.py b/code/chirpdetection.py index 1fe2372..95800df 100755 --- a/code/chirpdetection.py +++ b/code/chirpdetection.py @@ -535,7 +535,7 @@ def mask_low_amplitudes(envelope, threshold): """ mask = np.ones_like(envelope, dtype=bool) - mask[envelope < threshold] = np.nan + mask[envelope < threshold] = False return mask @@ -835,6 +835,14 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: cutoff_frequency=config.baseline_envelope_cutoff, ) + # create a mask that removes areas where amplitudes are very + # because the instantaneous frequency is not reliable there + + amplitude_mask = mask_low_amplitudes( + baseline_envelope_unfiltered, + config.baseline_min_amplitude + ) + # highpass filter baseline envelope to remove slower # fluctuations e.g. due to motion envelope @@ -845,14 +853,6 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: highf=config.baseline_envelope_bandpass_highf, ) - # create a mask that removes areas where amplitudes are very - # because the instantaneous frequency is not reliable there - - amplitude_mask = mask_low_amplitudes( - baseline_envelope, - config.baseline_min_amplitude - ) - # invert baseline envelope to find troughs in the baseline baseline_envelope = -baseline_envelope @@ -876,15 +876,10 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: # chirp. This phenomenon is only observed on chirps on a narrow # filtered baseline such as the one we are working with. - baseline_frequency = instantaneous_frequency2(baselineband, data.raw_rate) - - ( - baseline_frequency_time, - baseline_frequency, - ) = instantaneous_frequency( - signal=baselineband, - samplerate=data.raw_rate, - smoothing_window=config.baseline_frequency_smoothing, + baseline_frequency = instantaneous_frequency( + baselineband, + data.raw_rate, + config.baseline_frequency_smoothing ) # Take the absolute of the instantaneous frequency to invert @@ -902,9 +897,7 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: # to enter normalization, where small changes due to noise # would be amplified - # if not has_chirp(baseline_frequency[amplitude_mask], config.baseline_frequency_peakheight): - # continue - if not has_chirp(baseline_frequency, config.baseline_frequency_peakheight): + if not has_chirp(baseline_frequency_filtered[amplitude_mask], config.baseline_frequency_peakheight): continue # CUT OFF OVERLAP --------------------------------------------- @@ -925,20 +918,24 @@ def chirpdetection(datapath: str, plot: str, debug: str = "false") -> None: search_envelope_unfiltered = search_envelope_unfiltered[no_edges] search_envelope = search_envelope[no_edges] - # get instantaneous frequency withoup edges - no_edges_t0 = int(window_edge) / data.raw_rate - no_edges_t1 = baseline_frequency_time[-1] - ( - int(window_edge) / data.raw_rate - ) - no_edges = (baseline_frequency_time >= no_edges_t0) & ( - baseline_frequency_time <= no_edges_t1 - ) - - baseline_frequency_filtered = baseline_frequency_filtered[no_edges] baseline_frequency = baseline_frequency[no_edges] - baseline_frequency_time = ( - baseline_frequency_time[no_edges] + window_start_seconds - ) + baseline_frequency_filtered = baseline_frequency_filtered[no_edges] + baseline_frequency_time = current_raw_time + + # # get instantaneous frequency withoup edges + # no_edges_t0 = int(window_edge) / data.raw_rate + # no_edges_t1 = baseline_frequency_time[-1] - ( + # int(window_edge) / data.raw_rate + # ) + # no_edges = (baseline_frequency_time >= no_edges_t0) & ( + # baseline_frequency_time <= no_edges_t1 + # ) + + # baseline_frequency_filtered = baseline_frequency_filtered[no_edges] + # baseline_frequency = baseline_frequency[no_edges] + # baseline_frequency_time = ( + # baseline_frequency_time[no_edges] + window_start_seconds + # ) # NORMALIZE --------------------------------------------------- diff --git a/code/chirpdetector_conf.yml b/code/chirpdetector_conf.yml index 803ed8e..371326e 100755 --- a/code/chirpdetector_conf.yml +++ b/code/chirpdetector_conf.yml @@ -27,7 +27,7 @@ baseline_frequency_smoothing: 3 # instantaneous frequency smoothing # Feature processing parameters ----------------------------------------------- baseline_frequency_peakheight: 5 # the min peak height of the baseline instfreq -baseline_min_amplitude: 0.1 # the minimal value of the baseline envelope +baseline_min_amplitude: 0.0001 # the minimal value of the baseline envelope baseline_envelope_cutoff: 25 # envelope estimation cutoff baseline_envelope_bandpass_lowf: 2 # envelope badpass lower cutoff baseline_envelope_bandpass_highf: 100 # envelope bandbass higher cutoff diff --git a/code/modules/datahandling.py b/code/modules/datahandling.py index ca9e693..0a240ab 100644 --- a/code/modules/datahandling.py +++ b/code/modules/datahandling.py @@ -22,6 +22,7 @@ def minmaxnorm(data): """ return (data - np.min(data)) / (np.max(data) - np.min(data)) + def instantaneous_frequency2(signal: np.ndarray, fs: float, interpolation: str = 'linear') -> np.ndarray: """ Compute the instantaneous frequency of a periodic signal using zero crossings and resample the frequency using linear @@ -66,7 +67,8 @@ def instantaneous_frequency( signal: np.ndarray, samplerate: int, smoothing_window: int, -) -> tuple[np.ndarray, np.ndarray]: + interpolation: str = 'linear', +) -> np.ndarray: """ Compute the instantaneous frequency of a signal that is approximately sinusoidal and symmetric around 0. @@ -79,6 +81,8 @@ def instantaneous_frequency( Samplerate of the signal. smoothing_window : int Window size for the gaussian filter. + interpolation : str, optional + Interpolation method to use during resampling. Should be either 'linear' or 'cubic'. Default is 'linear'. Returns ------- @@ -112,7 +116,17 @@ def instantaneous_frequency( 1 / np.diff(true_zero), smoothing_window ) - return instantaneous_frequency_time, instantaneous_frequency + # Resample the frequency using specified interpolation method to match the dimensions of the input array + orig_len = len(signal) + freq = resample(instantaneous_frequency, orig_len) + + if interpolation == 'linear': + freq = np.interp(np.arange(0, orig_len), np.arange(0, orig_len), freq) + elif interpolation == 'cubic': + freq = resample(freq, orig_len, window='cubic') + + + return freq def purge_duplicates(