import numpy as np from thunderhopper.modeltools import load_data, save_data from thunderhopper.filetools import search_files, crop_paths from thunderhopper.filtertools import find_kern_specs from thunderhopper.model import process_signal from thunderhopper.filters import sosfilter from misc_functions import draw_noise_segment from IPython import embed # GENERAL SETTINGS: target_species = [ 'Chorthippus_biguttulus', 'Chorthippus_mollis', # 'Chrysochraon_dispar', # 'Euchorthippus_declivus', # 'Gomphocerippus_rufus', # 'Omocestus_rufipes', # 'Pseudochorthippus_parallelus', ][1] example_file = { 'Chorthippus_biguttulus': 'Chorthippus_biguttulus_GBC_94-17s73.1ms-19s977ms', 'Chorthippus_mollis': 'Chorthippus_mollis_DJN_41_T28C-46s4.58ms-1m15s697ms', 'Chrysochraon_dispar': 'Chrysochraon_dispar_DJN_26_T28C_DT-32s134ms-34s432ms', 'Euchorthippus_declivus': 'Euchorthippus_declivus_FTN_79-2s167ms-2s563ms', 'Gomphocerippus_rufus': 'Gomphocerippus_rufus_FTN_91-3-884ms-10s427ms', 'Omocestus_rufipes': 'Omocestus_rufipes_DJN_32-40s724ms-48s779ms', 'Pseudochorthippus_parallelus': 'Pseudochorthippus_parallelus_GBC_88-6s678ms-9s32.3ms' }[target_species] data_paths = search_files(target_species, dir='../data/processed/') noise_path = '../data/processed/white_noise_sd-1.npz' thresh_path = '../data/inv/full/thresholds.npz' stages = ['filt', 'env', 'log', 'inv', 'conv', 'feat'] pre_stages = stages[:-1] save_path = '../data/inv/full/' # ANALYSIS SETTINGS: example_scales = np.array([0.1, 1, 10, 30, 100, 300]) scales = np.geomspace(0.01, 10000, 500) scales = np.unique(np.concatenate(([0], scales, example_scales))) thresh_rel = np.array([0, 0.5, 1, 1.5, 2, 2.5, 3]) # SUBSET SETTINGS: kernels = None types = None sigmas = None # PREPARATION: pure_noise = np.load(noise_path)['raw'] thresh_data = dict(np.load(thresh_path)) thresh_abs = thresh_rel[:, None] * thresh_data['sds'][None, :] # EXECUTION: for data_path, name in zip(data_paths, crop_paths(data_paths)): save_detailed = example_file in name print(f'Processing {name}') # Get song recording (prior to anything): data, config = load_data(data_path, files='raw') song, rate = data['raw'], config['rate'] # Reduce to kernel subset: if any(var is not None for var in [kernels, types, sigmas]): kern_inds = find_kern_specs(config['k_specs'], kernels, types, sigmas) config['kernels'] = config['kernels'][:, kern_inds] config['k_specs'] = config['k_specs'][kern_inds, :] config['k_props'] = [config['k_props'][i] for i in kern_inds] thresh_abs = thresh_abs[:, kern_inds] # Get song segment to be analyzed: time = np.arange(song.shape[0]) / rate start, end = data['songs_0'].ravel() segment = (time >= start) & (time <= end) # Normalize song component: song /= song[segment].std(axis=0) # Get normalized noise component: noise = draw_noise_segment(pure_noise, song.shape[0]) noise /= noise[segment].std() # Prepare storage: shape_low = (scales.size,) shape_high = (scales.size, config['k_specs'].shape[0]) measures = dict( measure_filt=np.zeros(shape_low, dtype=float), measure_env=np.zeros(shape_low, dtype=float), measure_log=np.zeros(shape_low, dtype=float), measure_inv=np.zeros(shape_low, dtype=float), measure_conv=np.zeros(shape_high, dtype=float), measure_feat=np.zeros(shape_high + (thresh_rel.size,), dtype=float) ) if save_detailed: # Prepare optional storage: shape_low = (song.shape[0], example_scales.size) shape_high = (song.shape[0], config['k_specs'].shape[0], example_scales.size) snippets = dict( snip_filt=np.zeros(shape_low, dtype=float), snip_env=np.zeros(shape_low, dtype=float), snip_log=np.zeros(shape_low, dtype=float), snip_inv=np.zeros(shape_low, dtype=float), snip_conv=np.zeros(shape_high, dtype=float), snip_feat=np.zeros(shape_high + (thresh_rel.size,), dtype=float) ) # Execute piecewise: for i, scale in enumerate(scales): print('Simulating scale ', scale) # Rescale song and add noise: scaled = song * scale + noise # Process mixture (excluding features): signals, rates = process_signal(config, returns=pre_stages, signal=scaled, rate=rate) # Store non-feature results: for stage in pre_stages: # Log intensity measures: measures[f'measure_{stage}'][i] = signals[stage][segment, ...].std(axis=0) # Log optional snippet data: if save_detailed and scale in example_scales: scale_ind = np.nonzero(example_scales == scale)[0][0] snippets[f'snip_{stage}'][:, ..., scale_ind] = signals[stage] # Execute piecewise again: for j, thresholds in enumerate(thresh_abs): # Finalize processing: feat = sosfilter((signals['conv'] > thresholds).astype(float), rate, config['feat_fcut'], 'lp', padtype='fixed', padlen=config['padlen']) # Log intensity measure: measures['measure_feat'][i, :, j] = feat[segment, :].mean(axis=0) # Log optional snippet data: if save_detailed and scale in example_scales: snippets['snip_feat'][:, :, scale_ind, j] = feat # Save analysis results: if save_path is not None: data = dict( scales=scales, example_scales=example_scales, thresh_rel=thresh_rel, thresh_abs=thresh_abs, ) data.update(measures) if save_detailed: data.update(snippets) save_data(save_path + name, data, config, overwrite=True) print('Done.') embed()