"""Scale-robustness analysis of song processing against additive white noise.

For each recording of the target species, the song is mixed with
unit-variance white noise at a range of signal scales, run through the
processing pipeline, and per-stage intensity measures (plus full snippets
at a few example scales) are recorded and saved per recording.
"""
import glob

import numpy as np
import matplotlib.pyplot as plt

from thunderhopper.modeltools import load_data, save_data
from thunderhopper.filetools import crop_paths
from thunderhopper.filtertools import find_kern_specs
from thunderhopper.model import process_signal, convolve_kernels
from IPython import embed

# GENERAL SETTINGS:
target = 'Omocestus_rufipes'
data_paths = glob.glob(f'../data/processed/{target}*.npz')
noise_path = '../data/processed/white_noise_sd-1.npz'
stages = ['filt', 'env', 'log', 'inv', 'conv', 'feat']
save_path = '../data/inv/full/'

# ANALYSIS SETTINGS:
# Scales at which full per-stage snippets are stored. Merging them into the
# simulated scales below guarantees `scale in example_scales` matches exactly.
example_scales = np.array([0.1, 1, 10, 30, 100, 300])
scales = np.geomspace(0.01, 10000, 100)
scales = np.unique(np.concatenate((scales, example_scales)))
# Feature threshold in multiples of the noise SD (None: keep loaded config):
thresh_rel = 3

# SUBSET SETTINGS (all None: keep full kernel set):
kernels = np.array([
    [1, 0.002], [-1, 0.002],
    [2, 0.004], [-2, 0.004],
    [3, 0.032], [-3, 0.032]
])
kernels = None
types = None  # np.array([-1])
sigmas = None  # np.array([0.001, 0.002, 0.004, 0.008, 0.016, 0.032])

# PREPARATION:
noise_data = np.load(noise_path)
pure_noise = noise_data['raw']

# EXECUTION:
for data_path, name in zip(data_paths, crop_paths(data_paths)):
    print(f'Processing {name}')

    # Get song recording (prior to anything):
    data, config = load_data(data_path, files='raw')
    song, rate = data['raw'], config['rate']

    if thresh_rel is not None:
        # Get noise-bound kernel-specific thresholds:
        config['feat_thresh'] = noise_data['conv'].std(axis=0) * thresh_rel

    # Reduce to kernel subset:
    if any(var is not None for var in [kernels, types, sigmas]):
        kern_inds = find_kern_specs(config['k_specs'], kernels, types, sigmas)
        config['kernels'] = config['kernels'][:, kern_inds]
        config['k_specs'] = config['k_specs'][kern_inds, :]
        config['k_props'] = [config['k_props'][i] for i in kern_inds]
        # BUGFIX: 'feat_thresh' is only written above when thresh_rel is not
        # None -- guard to avoid a KeyError when it is absent from config:
        if 'feat_thresh' in config:
            config['feat_thresh'] = config['feat_thresh'][kern_inds]

    # Get song segment to be analyzed (assumes data['songs_0'] holds exactly
    # one [start, end] pair -- TODO confirm against the processing step):
    time = np.arange(song.shape[0]) / rate
    start, end = data['songs_0'].ravel()
    segment = (time >= start) & (time <= end)

    # Normalize song component (in-place is safe: song is reloaded per file):
    song /= song[segment].std(axis=0)

    # Get normalized noise component. BUGFIX: the slice is a view into the
    # shared pure_noise array, so in-place division without a copy would
    # cumulatively rescale the noise across recordings:
    noise = pure_noise[:song.shape[0]].copy()
    noise /= noise[segment].std()

    # Prepare snippet storage (time x [kernels x] example scales):
    shape_low = (song.shape[0], example_scales.size)
    shape_high = (song.shape[0], config['k_specs'].shape[0],
                  example_scales.size)
    snippets = dict(
        snip_filt=np.zeros(shape_low, dtype=float),
        snip_env=np.zeros(shape_low, dtype=float),
        snip_log=np.zeros(shape_low, dtype=float),
        snip_inv=np.zeros(shape_low, dtype=float),
        snip_conv=np.zeros(shape_high, dtype=float),
        snip_feat=np.zeros(shape_high, dtype=float)
    )

    # Prepare measure storage (scales x [kernels]):
    shape_low = (scales.size,)
    shape_high = (scales.size, config['k_specs'].shape[0])
    measures = dict(
        measure_filt=np.zeros(shape_low, dtype=float),
        measure_env=np.zeros(shape_low, dtype=float),
        measure_log=np.zeros(shape_low, dtype=float),
        measure_inv=np.zeros(shape_low, dtype=float),
        measure_conv=np.zeros(shape_high, dtype=float),
        measure_feat=np.zeros(shape_high, dtype=float)
    )

    # Execute piecewise:
    for i, scale in enumerate(scales):
        print('Simulating scale ', scale)

        # Rescale song and add noise:
        scaled = song * scale + noise

        # Process mixture:
        signals, rates = process_signal(config, returns=stages,
                                        signal=scaled, rate=rate)

        # Store results:
        for stage in stages:
            mkey, skey = f'measure_{stage}', f'snip_{stage}'

            # Log full snippet data at the designated example scales
            # (exact membership holds because scales includes them):
            if scale in example_scales:
                scale_ind = np.nonzero(example_scales == scale)[0][0]
                snippets[skey][..., scale_ind] = signals[stage]

            # Log intensity measure per stage (excluding binary output;
            # 'raw' is kept for compatibility although not in stages):
            if stage in ['raw', 'filt', 'env', 'log', 'inv', 'conv']:
                measures[mkey][i] = signals[stage][segment, ...].std(axis=0)
            elif stage == 'feat':
                measures[mkey][i] = signals[stage][segment, :].mean(axis=0)

    # Save analysis results:
    if save_path is not None:
        data = dict(
            scales=scales,
            example_scales=example_scales,
        )
        data.update(snippets)
        data.update(measures)
        save_data(save_path + name, data, config, overwrite=True)

print('Done.')
embed()