Files
paper_2025/python/save_inv_data_thresh-lp.py

160 lines
5.7 KiB
Python

import numpy as np
import matplotlib.pyplot as plt
from thunderhopper.modeltools import load_data, save_data
from thunderhopper.filetools import search_files, crop_paths
from thunderhopper.filters import sosfilter
from thunderhopper.filtertools import find_kern_specs
from thunderhopper.model import convolve_kernels
from misc_functions import draw_noise_segment
from IPython import embed
# GENERAL SETTINGS:
example_file = 'Omocestus_rufipes_DJN_32-40s724ms-48s779ms'
search_target = ['*', example_file][0]
data_paths = search_files(search_target, excl='noise', dir='../data/processed/')
noise_path = '../data/processed/white_noise_sd-1.npz'
ref_path = '../data/inv/thresh_lp/ref_measures.npz'
save_path = '../data/inv/thresh_lp/'
# ANALYSIS SETTINGS:
add_noise = False
plot_results = False
example_scales = np.array([0, 1, 10, 30, 100])
scales = np.geomspace(0.01, 10000, 1000)
scales = np.unique(np.concatenate(([0], scales, example_scales)))
thresh_rel = np.array([0.5, 1, 3])
kern_specs = np.array([
[1, 0.008],
[2, 0.004],
[3, 0.002],
])
# PREPARATION:
if add_noise:
pure_noise = np.load(noise_path)['inv']
# Define kernel-specific threshold values based on pure-noise response SD:
thresh_abs = np.load(ref_path)['conv'][None, :] * thresh_rel[:, None]
# EXECUTION:
for data_path, name in zip(data_paths, crop_paths(data_paths)):
save_detailed = example_file in name
print(f'Processing {name}')
# Get adapted envelope (prior to convolution):
data, config = load_data(data_path, files='inv')
song, rate = data['inv'], data['inv_rate']
# Get song segment to be analyzed:
time = np.arange(song.shape[0]) / rate
start, end = data['songs_0'].ravel()
segment = (time >= start) & (time <= end)
# Normalize song component:
song /= song[segment].std()
# Reduce to kernel subset:
kern_inds = find_kern_specs(config['k_specs'], kerns=kern_specs)
config['kernels'] = config['kernels'][:, kern_inds]
config['k_specs'] = config['k_specs'][kern_inds, :]
config['k_props'] = [config['k_props'][i] for i in kern_inds]
if add_noise:
# Get normalized noise component:
noise = draw_noise_segment(pure_noise, song.shape[0])
noise /= noise[segment].std()
# Prepare storage:
measure_feat = np.zeros((scales.size, kern_specs.shape[0], thresh_rel.size), dtype=float)
if save_detailed:
# Prepare optional storage:
measure_inv = np.zeros((scales.size,), dtype=float)
snip_inv = np.zeros((song.size, example_scales.size), dtype=float)
shape = (song.size, kern_specs.shape[0], example_scales.size, thresh_rel.size)
snip_conv = np.zeros(shape[:-1], dtype=float)
snip_bi = np.zeros(shape, dtype=float)
snip_feat = np.zeros(shape, dtype=float)
# Execute piecewise:
for i, scale in enumerate(scales):
print('Simulating scale', scale)
# Rescale song component:
scaled_song = song * scale
if add_noise:
# Add noise:
scaled_song += noise
if save_detailed:
# Log input intensity measure:
measure_inv[i] = scaled_song[segment].std()
# Process mixture:
scaled_conv = convolve_kernels(scaled_song, config['kernels'], config['k_specs'])
# Log threshold-independent snippet data:
if save_detailed and scale in example_scales:
save_ind = np.nonzero(example_scales == scale)[0][0]
snip_inv[:, save_ind] = scaled_song
snip_conv[:, :, save_ind] = scaled_conv
# Execute piecewise again:
for j, thresholds in enumerate(thresh_abs[:, kern_inds]):
# Process mixture further:
scaled_bi = (scaled_conv > thresholds).astype(float)
scaled_feat = sosfilter(scaled_bi, rate, config['feat_fcut'], 'lp',
padtype='fixed', padlen=config['padlen'])
# Log threshold-dependent snippet data:
if save_detailed and scale in example_scales:
snip_bi[:, :, save_ind, j] = scaled_bi
snip_feat[:, :, save_ind, j] = scaled_feat
# Log output intensity measure:
measure_feat[i, :, j] = scaled_feat[segment, :].mean(axis=0)
# Overview plot:
if plot_results:
fig, axes = plt.subplots(thresh_rel.size, kern_specs.shape[0],
figsize=(16, 9), layout='constrained',
sharex=True, sharey=True, squeeze=True)
axes[0, 0].set_xscale('symlog', linthresh=scales[scales>0].min(),
linscale=0.25)
axes[0, 0].set_ylim(0, 1)
for i, thresh in enumerate(thresh_rel):
for j, kernel in enumerate(kern_specs):
ax = axes[i, j]
ax.plot(scales, measure_feat[:, j, i], 'k')
if i == 0:
ax.set_title(f'Kernel {kernel}')
if j == 0:
ax.set_ylabel(f'{thresh} * SD')
plt.show()
# Save analysis results:
if save_path is not None:
data = dict(
scales=scales,
example_scales=example_scales,
measure_feat=measure_feat,
thresh_rel=thresh_rel,
thresh_abs=thresh_abs,
)
if save_detailed:
data.update(dict(
measure_inv=measure_inv,
snip_inv=snip_inv,
snip_conv=snip_conv,
snip_bi=snip_bi,
snip_feat=snip_feat,
))
save_name = save_path + name
if add_noise:
save_name += '_noise'
else:
save_name += '_pure'
save_data(save_name, data, config, overwrite=True)
print('Done.')
embed()