import glob import numpy as np import matplotlib.pyplot as plt from thunderhopper.modeltools import load_data, save_data from thunderhopper.filetools import crop_paths from thunderhopper.filters import sosfilter from thunderhopper.filtertools import find_kern_specs from IPython import embed # GENERAL SETTINGS: target = 'Omocestus_rufipes' data_paths = glob.glob(f'../data/processed/{target}*.npz') save_path = '../data/inv/thresh_lp/' # ANALYSIS SETTINGS: add_noise = True thresh_percent = np.array([50, 75, 100]) example_scales = np.array([0, 1, 10, 50]) scales = np.geomspace(0.01, 100, 100) scales = np.unique(np.concatenate((scales, example_scales))) plot_results = False kernels = np.array([ [2, 0.008], [4, 0.008], ]) # EXECUTION: for data_path, name in zip(data_paths, crop_paths(data_paths)): print(f'Processing {name}') save_name = save_path + name + '_subset' # Get pure-song kernel responses: data, config = load_data(data_path, files='conv') conv, rate = data['conv'], data['conv_rate'] # Get song segment to be analyzed: time = np.arange(conv.shape[0]) / rate start, end = data['songs_0'].ravel() segment = (time >= start) & (time <= end) # Reduce to kernel subset: kern_inds = find_kern_specs(config['k_specs'], kerns=kernels) config['kernels'] = config['kernels'][:, kern_inds] config['k_specs'] = config['k_specs'][kern_inds] config['k_props'] = [config['k_props'][i] for i in kern_inds] conv = conv[:, kern_inds] # Normalize kernel responses: conv /= conv[segment, :].std(axis=0) if add_noise: # Get normalized noise: rng = np.random.default_rng() noise = rng.normal(size=(conv.shape[0], 1)) noise /= noise[segment].std() # Prepare snippet storage: shape = conv.shape + (example_scales.size, thresh_percent.size) snip_conv = np.zeros(shape, dtype=float) snip_bi = np.zeros(shape, dtype=float) snip_feat = np.zeros(shape, dtype=float) # Prepare measure storage: shape = (scales.size, conv.shape[1], thresh_percent.size) measure_conv = np.zeros(shape, dtype=float) measure_feat = np.zeros(shape, dtype=float) # Compute kernel-specific thresholds (thresholds, kernels): thresholds = np.percentile(conv, thresh_percent, axis=0) # Execute piecewise analysis: for i, threshs in enumerate(thresholds): print('\nSimulating threshold ', thresh_percent[i]) for j, scale in enumerate(scales): print('Simulating scale ', scale) # Rescale conv component: scaled_conv = conv * scale if add_noise: # Add noise: scaled_conv += noise # Process mixture: scaled_bi = (scaled_conv > threshs).astype(float) scaled_feat = sosfilter(scaled_bi, rate, config['feat_fcut'], 'lp', padtype='fixed', padlen=config['padlen']) # Log snippet data: if scale in example_scales: scale_ind = np.nonzero(example_scales == scale)[0][0] snip_conv[:, :, scale_ind, i] = scaled_conv snip_bi[:, :, scale_ind, i] = scaled_bi snip_feat[:, :, scale_ind, i] = scaled_feat # Get intensity measure per stage: measure_conv[j, :, i] = scaled_conv[segment, :].std(axis=0) measure_feat[j, :, i] = scaled_feat[segment, :].mean(axis=0) if plot_results: fig, axes = plt.subplots(thresh_percent.size, kernels.shape[0], figsize=(16, 9), layout='constrained', sharex=True, sharey=True, squeeze=True) axes[0, 0].set_xscale('symlog', linthresh=scales[scales>0].min(), linscale=0.25) for i, thresh in enumerate(thresh_percent): for j, kernel in enumerate(kernels): ax = axes[i, j] ax.plot(scales, measure_feat[:, j, i], 'k') if i == 0: ax.set_title(f'Kernel {kernel}') if j == 0: ax.set_ylabel(f'{thresh}%') plt.show() # Save analysis results: if save_path is not None: data = dict( scales=scales, example_scales=example_scales, snip_conv=snip_conv, snip_bi=snip_bi, snip_feat=snip_feat, measure_conv=measure_conv, measure_feat=measure_feat, thresh_perc=thresh_percent, threshs=thresholds, ) if add_noise: save_name += '_noise' save_data(save_name, data, config, overwrite=True) print('Done.') embed()