import numpy as np from thunderhopper.modeltools import load_data, save_data from thunderhopper.filetools import search_files, crop_paths from thunderhopper.filtertools import find_kern_specs from thunderhopper.model import process_signal from IPython import embed # GENERAL SETTINGS: mode = ['song', 'noise'][0] example_file = dict( song='Pseudochorthippus_parallelus_micarray-short_JJ_20240815T160355-20240815T160755-1m10s690ms-1m13s614ms', noise='merged_noise' )[mode] search_path = f'../data/field/processed/{mode}/' data_paths = search_files('*', ext='npz', dir=search_path) ref_path = '../data/inv/field/ref_measures.npz' stages = ['raw', 'filt', 'env', 'log', 'inv', 'conv', 'feat'] save_path = f'../data/inv/field/{mode}/' # ANALYSIS SETTINGS: distances = np.load('../data/field/recording_distances.npy')[::-1] thresh_rel = 0.5 # SUBSET SETTINGS: kernels = np.array([ [1, 0.002], [-1, 0.002], [2, 0.004], [-2, 0.004], [3, 0.032], [-3, 0.032] ]) kernels = None types = None#np.array([-1]) sigmas = None#np.array([0.001, 0.002, 0.004, 0.008, 0.016, 0.032]) # PREPARATION: if thresh_rel is not None: # Get threshold values from pure-noise response SD: thresh_abs = np.load(ref_path)['conv'] * thresh_rel # EXECUTION: for data_path, name in zip(data_paths, crop_paths(data_paths)): save_detailed = example_file in name print(f'Processing {name}') # Get song recording (prior to anything): data, config = load_data(data_path, files='raw') song, rate = data['raw'], config['rate'] if thresh_rel is not None: # Set kernel-specific thresholds: config['feat_thresh'] = thresh_abs # Reduce to kernel subset: if any(var is not None for var in [kernels, types, sigmas]): kern_inds = find_kern_specs(config['k_specs'], kernels, types, sigmas) config['kernels'] = config['kernels'][:, kern_inds] config['k_specs'] = config['k_specs'][kern_inds, :] config['k_props'] = [config['k_props'][i] for i in kern_inds] config['feat_thresh'] = config['feat_thresh'][kern_inds] # Get song segment to be analyzed: time = np.arange(song.shape[0]) / rate start, end = data['songs_0'].ravel() segment = (time >= start) & (time <= end) # Prepare storage: measures = {} if save_detailed: snippets = {} # Process snippet: signals, rates = process_signal(config, returns=stages, signal=song, rate=rate) for stage in stages: # Sort largest to smallest distance: signals[stage] = signals[stage][..., ::-1] # Store results: for stage in stages: # Log intensity measures: mkey = f'measure_{stage}' if stage == 'feat': measures[mkey] = signals[stage][segment, ...].mean(axis=0) else: measures[mkey] = signals[stage][segment, ...].std(axis=0) if measures[mkey].ndim == 2: # Make shape (distances, kernels): measures[mkey] = np.moveaxis(measures[mkey], 1, 0) # Log optional snippet data: if save_detailed: snippets[f'snip_{stage}'] = signals[stage] # Save analysis results: if save_path is not None: data = dict( distances=distances, ) data.update(measures) if save_detailed: data.update(snippets) save_data(save_path + name, data, config, overwrite=True) print('Done.') embed()