import numpy as np import matplotlib.pyplot as plt from thunderhopper.modeltools import load_data, save_data from thunderhopper.filetools import search_files, crop_paths from thunderhopper.filters import sosfilter from thunderhopper.filtertools import find_kern_specs from thunderhopper.model import convolve_kernels from IPython import embed # GENERAL SETTINGS: target = ['Omocestus_rufipes', '*'][0] data_paths = search_files(target, dir='../data/processed/') noise_path = '../data/processed/white_noise_sd-1.npz' save_path = '../data/inv/thresh_lp/' # ANALYSIS SETTINGS: add_noise = True save_snippets = add_noise and True plot_results = False example_scales = np.array([0, 1, 10, 30, 100]) scales = np.geomspace(0.01, 10000, 100) scales = np.unique(np.concatenate((scales, example_scales))) thresh_rel = np.array([0.5, 1, 3]) kern_specs = np.array([ [1, 0.008], [2, 0.004], [3, 0.002], ]) # PREPARATION: pure_noise = np.load(noise_path)['inv'] # EXECUTION: for data_path, name in zip(data_paths, crop_paths(data_paths)): print(f'Processing {name}') save_name = save_path + name # Get adapted envelope (prior to convolution): data, config = load_data(data_path, files='inv') song, rate = data['inv'], data['inv_rate'] # Get song segment to be analyzed: time = np.arange(song.shape[0]) / rate start, end = data['songs_0'].ravel() segment = (time >= start) & (time <= end) # Reduce to kernel subset: kern_inds = find_kern_specs(config['k_specs'], kerns=kern_specs) config['kernels'] = config['kernels'][:, kern_inds] config['k_specs'] = config['k_specs'][kern_inds, :] config['k_props'] = [config['k_props'][i] for i in kern_inds] # Normalize song component: song /= song[segment].std() # Get normalized noise component: noise = pure_noise[:song.shape[0]] noise /= noise[segment].std() # Define kernel-specific threshold values based on pure-noise response SD: ref_conv = convolve_kernels(noise, config['kernels'], config['k_specs']) thresh_abs = ref_conv[segment, :].std(axis=0, keepdims=True) * thresh_rel[:, None] # Prepare measure storage: shape = (scales.size, kern_specs.shape[0], thresh_rel.size) measure_feat = np.zeros(shape, dtype=float) if save_snippets: # Prepare snippet storage: snip_inv = np.zeros((song.size, example_scales.size), dtype=float) shape = (song.size, kern_specs.shape[0], example_scales.size, thresh_rel.size) snip_conv = np.zeros(shape[:-1], dtype=float) snip_bi = np.zeros(shape, dtype=float) snip_feat = np.zeros(shape, dtype=float) # Execute piecewise: for i, scale in enumerate(scales): print('Simulating scale', scale) # Rescale song component: scaled_song = song * scale if add_noise: # Add noise: scaled_song += noise # Process mixture: scaled_conv = convolve_kernels(scaled_song, config['kernels'], config['k_specs']) # Log threshold-independent snippet data: if save_snippets and scale in example_scales: save_ind = np.nonzero(example_scales == scale)[0][0] snip_inv[:, save_ind] = scaled_song snip_conv[:, :, save_ind] = scaled_conv # Execute piecewise again: for j, thresholds in enumerate(thresh_abs): # Process mixture further: scaled_bi = (scaled_conv > thresholds).astype(float) scaled_feat = sosfilter(scaled_bi, rate, config['feat_fcut'], 'lp', padtype='fixed', padlen=config['padlen']) # Log threshold-dependent snippet data: if save_snippets and scale in example_scales: snip_bi[:, :, save_ind, j] = scaled_bi snip_feat[:, :, save_ind, j] = scaled_feat # Log intensity measure: measure_feat[i, :, j] = scaled_feat[segment, :].mean(axis=0) # Overview plot: if plot_results: fig, axes = plt.subplots(thresh_rel.size, kern_specs.shape[0], figsize=(16, 9), layout='constrained', sharex=True, sharey=True, squeeze=True) axes[0, 0].set_xscale('symlog', linthresh=scales[scales>0].min(), linscale=0.25) axes[0, 0].set_ylim(0, 1) for i, thresh in enumerate(thresh_rel): for j, kernel in enumerate(kern_specs): ax = axes[i, j] ax.plot(scales, measure_feat[:, j, i], 'k') if i == 0: ax.set_title(f'Kernel {kernel}') if j == 0: ax.set_ylabel(f'{thresh} * SD') plt.show() # Save analysis results: if save_path is not None: data = dict( scales=scales, example_scales=example_scales, measure_feat=measure_feat, thresh_rel=thresh_rel, thresh_abs=thresh_abs, ) if save_snippets: data.update(dict( snip_inv=snip_inv, snip_conv=snip_conv, snip_bi=snip_bi, snip_feat=snip_feat, )) if add_noise: save_name += '_noise' else: save_name += '_pure' save_data(save_name, data, config, overwrite=True) print('Done.') embed()