import numpy as np import matplotlib.pyplot as plt from thunderhopper.modeltools import load_data, save_data from thunderhopper.filetools import search_files, crop_paths from thunderhopper.filters import sosfilter from thunderhopper.filtertools import find_kern_specs, pdf_proportion from IPython import embed # GENERAL SETTINGS: target = ['Omocestus_rufipes', '*'][0] data_paths = search_files(target, dir='../data/processed/') save_path = '../data/inv/thresh_lp/' # ANALYSIS SETTINGS: add_noise = False save_snippets = True example_scales = np.array([0, 1, 10, 50]) scales = np.geomspace(0.01, 1000, 100) scales = np.unique(np.concatenate((scales, example_scales))) thresh_percent = np.array([0.6, 0.75, 0.999]) thresholds = pdf_proportion(thresh_percent, sd=1, mu=0) plot_results = False kernels = np.array([ [1, 0.008], [2, 0.004], [3, 0.002], ]) # EXECUTION: for data_path, name in zip(data_paths, crop_paths(data_paths)): print(f'Processing {name}') save_name = save_path + name + '_subset' # Get pure-song kernel responses: data, config = load_data(data_path, files='conv') conv, rate = data['conv'], data['conv_rate'] # Get song segment to be analyzed: time = np.arange(conv.shape[0]) / rate start, end = data['songs_0'].ravel() segment = (time >= start) & (time <= end) # Reduce to kernel subset: kern_inds = find_kern_specs(config['k_specs'], kerns=kernels) config['kernels'] = config['kernels'][:, kern_inds] config['k_specs'] = config['k_specs'][kern_inds] config['k_props'] = [config['k_props'][i] for i in kern_inds] conv = conv[:, kern_inds] # Normalize kernel responses: conv /= conv[segment, :].std(axis=0) if add_noise: # Get normalized noise: rng = np.random.default_rng() noise = rng.normal(size=(conv.shape[0], 1)) noise /= noise[segment].std() if save_snippets: # Prepare snippet storage: shape = conv.shape + (example_scales.size, thresh_percent.size) snip_conv = np.zeros(shape, dtype=float) snip_bi = np.zeros(shape, dtype=float) snip_feat = np.zeros(shape, dtype=float) # Prepare measure storage: shape = (scales.size, conv.shape[1], thresh_percent.size) # measure_conv = np.zeros(shape, dtype=float) measure_feat = np.zeros(shape, dtype=float) # Execute piecewise analysis: for i, thresh in enumerate(thresholds): print('\nSimulating threshold ', thresh_percent[i]) for j, scale in enumerate(scales): print('Simulating scale ', scale) # Rescale conv component: scaled_conv = conv * scale if add_noise: # Add noise: scaled_conv += noise # Process mixture: scaled_bi = (scaled_conv > thresh).astype(float) scaled_feat = sosfilter(scaled_bi, rate, config['feat_fcut'], 'lp', padtype='fixed', padlen=config['padlen']) # Log snippet data: if save_snippets and scale in example_scales: scale_ind = np.nonzero(example_scales == scale)[0][0] snip_conv[:, :, scale_ind, i] = scaled_conv snip_bi[:, :, scale_ind, i] = scaled_bi snip_feat[:, :, scale_ind, i] = scaled_feat # Get intensity measure per stage: measure_feat[j, :, i] = scaled_feat[segment, :].mean(axis=0) # measure_conv[j, :, i] = scaled_conv[segment, :].std(axis=0) if plot_results: fig, axes = plt.subplots(thresh_percent.size, kernels.shape[0], figsize=(16, 9), layout='constrained', sharex=True, sharey=True, squeeze=True) axes[0, 0].set_xscale('symlog', linthresh=scales[scales>0].min(), linscale=0.25) axes[0, 0].set_ylim(0, 1) for i, thresh in enumerate(thresh_percent): for j, kernel in enumerate(kernels): ax = axes[i, j] ax.plot(scales, measure_feat[:, j, i], 'k') if i == 0: ax.set_title(f'Kernel {kernel}') if j == 0: ax.set_ylabel(f'{100 * thresh}%') plt.show() # Save analysis results: if save_path is not None: data = dict( scales=scales, example_scales=example_scales, # measure_conv=measure_conv, measure_feat=measure_feat, thresh_perc=thresh_percent, threshs=thresholds, ) if save_snippets: data.update(dict( snip_conv=snip_conv, snip_bi=snip_bi, snip_feat=snip_feat, )) if add_noise: save_name += '_noise' save_data(save_name, data, config, overwrite=True) print('Done.') embed()