import glob import numpy as np import matplotlib.pyplot as plt from thunderhopper.modeltools import load_data, save_data from thunderhopper.filetools import crop_paths from thunderhopper.filtertools import find_kern_specs from thunderhopper.model import process_signal from IPython import embed # GENERAL SETTINGS: target = 'Omocestus_rufipes' data_paths = glob.glob(f'../data/processed/{target}*.npz') stages = ['raw', 'filt', 'env', 'log', 'inv', 'conv', 'bi', 'feat'] save_path = '../data/inv/full/' # ANALYSIS SETTINGS: example_scales = np.array([0, 1, 10, 50]) scales = np.geomspace(0.01, 100, 100) scales = np.unique(np.concatenate((scales, example_scales))) kernels = np.array([ [1, 0.002], [-1, 0.002], [2, 0.004], [-2, 0.004], [3, 0.032], [-3, 0.032] ]) kernels = None types = None#np.array([-1]) sigmas = None#np.array([0.001, 0.002, 0.004, 0.008, 0.016, 0.032]) # EXECUTION: for data_path, name in zip(data_paths, crop_paths(data_paths)): print(f'Processing {name}') # Get song recording: data, config = load_data(data_path, files='raw') song, rate = data['raw'], config['rate'] # Reduce to kernel subset: kern_inds = find_kern_specs(config['k_specs'], kernels, types, sigmas) config['kernels'] = config['kernels'][:, kern_inds] config['k_specs'] = config['k_specs'][kern_inds, :] config['k_props'] = [config['k_props'][i] for i in kern_inds] config['feat_thresh'] = config['feat_thresh'][kern_inds] # Get song segment to be analyzed: time = np.arange(song.shape[0]) / rate start, end = data['songs_0'].ravel() segment = (time >= start) & (time <= end) # Normalize song component: song /= song[segment].std(axis=0) # Get normalized noise: rng = np.random.default_rng() noise = rng.normal(size=song.shape[0]) noise /= noise[segment].std() # Prepare snippet storage: shape_low = (song.shape[0], example_scales.size) shape_high = (song.shape[0], config['k_specs'].shape[0], example_scales.size) snippets = dict( snip_raw=np.zeros(shape_low, dtype=float), snip_filt=np.zeros(shape_low, dtype=float), snip_env=np.zeros(shape_low, dtype=float), snip_log=np.zeros(shape_low, dtype=float), snip_inv=np.zeros(shape_low, dtype=float), snip_conv=np.zeros(shape_high, dtype=float), snip_bi=np.zeros(shape_high, dtype=float), snip_feat=np.zeros(shape_high, dtype=float) ) # Prepare measure storage: shape_low = (scales.size,) shape_high = (scales.size, config['k_specs'].shape[0]) measures = dict( measure_raw=np.zeros(shape_low, dtype=float), measure_filt=np.zeros(shape_low, dtype=float), measure_env=np.zeros(shape_low, dtype=float), measure_log=np.zeros(shape_low, dtype=float), measure_inv=np.zeros(shape_low, dtype=float), measure_conv=np.zeros(shape_high, dtype=float), measure_feat=np.zeros(shape_high, dtype=float) ) # Execute piecewise: for i, scale in enumerate(scales): print('Simulating scale ', scale) # Rescale song and add noise: scaled = song * scale + noise # Process mixture: signals, rates = process_signal(config, returns=stages, signal=scaled, rate=rate) # Store results: for stage in stages: key = f'measure_{stage}' # Log snippet data: if scale in example_scales: scale_ind = np.nonzero(example_scales == scale)[0][0] snippets[f'snip_{stage}'][:, ..., scale_ind] = signals[stage] # Log intensity measure per stage (excluding binary): if stage in ['raw', 'filt', 'env', 'log', 'inv', 'conv']: measures[key][i] = signals[stage][segment, ...].std(axis=0) elif stage == 'feat': measures[key][i] = signals[stage][segment, :].mean(axis=0) # thresh_y = np.percentile(measures['measure_feat'], 99, axis=0) # kern_types = np.unique() # thresh_x = np.zeros(thresh_y.shape, dtype=float) # for i, thresh in enumerate(thresh_y): # if thresh < 0.1: # thresh_x[i] = scales[-1] # continue # mask = (measures['measure_feat'][:, i] < thresh) # thresh_x[i] = scales[np.nonzero(mask)[0][-1]] # inds = np.argsort(thresh_x) # print(config['k_specs'][inds, :]) # fig, axes = plt.subplots(1, 2) # axes[0].plot(snippets['snip_feat'][:, inds, -1]) # axes[1].plot(scales, measures['measure_feat'][:, inds]) # plt.show() # embed() # Save analysis results: if save_path is not None: data = dict( scales=scales, example_scales=example_scales, ) data.update(snippets) data.update(measures) save_data(save_path + name, data, config, overwrite=True) print('Done.') embed()