import numpy as np import matplotlib.pyplot as plt from thunderhopper.modeltools import load_data, save_data from thunderhopper.filetools import search_files, crop_paths from thunderhopper.filtertools import find_kern_specs from thunderhopper.filters import sosfilter from thunderhopper.model import process_signal from IPython import embed # GENERAL SETTINGS: mode = ['song', 'noise'][0] example_file = dict( song='Pseudochorthippus_parallelus_micarray-short_JJ_20240815T160355-20240815T160755-1m10s690ms-1m13s614ms', noise='merged_noise' )[mode] search_path = f'../data/field/processed/{mode}/' data_paths = search_files('*', ext='npz', dir=search_path) thresh_path = '../data/inv/field/thresholds.npz' stages = ['raw', 'filt', 'env', 'log', 'inv', 'conv', 'feat'] pre_stages = stages[:-1] save_path = f'../data/inv/field/{mode}/' # ANALYSIS SETTINGS: distances = np.load('../data/field/recording_distances.npy')[::-1] thresh_rel = np.array([0, 0.5, 1, 1.5, 2, 2.5, 3]) init_scale = 10000 # SUBSET SETTINGS: kernels = None types = None sigmas = None # PREPARATION: thresh_data = dict(np.load(thresh_path)) thresh_abs = thresh_rel[:, None] * thresh_data['sds'][None, :] # EXECUTION: for data_path, name in zip(data_paths, crop_paths(data_paths)): save_detailed = example_file in name print(f'Processing {name}') # Get song recording (prior to anything): data, config = load_data(data_path, files='raw') song, rate = data['raw'], config['rate'] # Sort max to min distance: song = song[:, ::-1] * init_scale # Reduce to kernel subset: if any(var is not None for var in [kernels, types, sigmas]): kern_inds = find_kern_specs(config['k_specs'], kernels, types, sigmas) config['kernels'] = config['kernels'][:, kern_inds] config['k_specs'] = config['k_specs'][kern_inds, :] config['k_props'] = [config['k_props'][i] for i in kern_inds] thresh_abs = thresh_abs[:, kern_inds] # Get song segment to be analyzed: time = np.arange(song.shape[0]) / rate start, end = data['songs_0'].ravel() segment = (time >= start) & (time <= end) # Prepare storage: shape = (distances.size, config['k_specs'].shape[0], thresh_rel.size) measures = dict(measure_feat=np.zeros(shape, dtype=float)) if save_detailed: shape = (song.shape[0], config['k_specs'].shape[0], distances.size, thresh_rel.size) snippets = dict(snip_feat=np.zeros(shape, dtype=float)) # Process snippet (excluding features): signals, rates = process_signal(config, returns=pre_stages, signal=song, rate=rate) # Store non-feature results: for stage in pre_stages: mkey = f'measure_{stage}' # Log intensity measures: measures[mkey] = signals[stage][segment, ...].std(axis=0) if stage == 'conv': # Make shape (distances, kernels) for consistency: measures[mkey] = np.moveaxis(measures[mkey], 1, 0) if save_detailed: # Log optional snippet data: snippets[f'snip_{stage}'] = signals[stage] conv = signals['conv'] # Execute piecewise per threshold: for i, thresholds in enumerate(thresh_abs): # Execute piecewise per distance: for j in range(conv.shape[-1]): feat = sosfilter((conv[:, :, j] > thresholds).astype(float), rate, config['feat_fcut'], 'lp', padtype='fixed', padlen=config['padlen']) # Log intensity measure: measure = feat[segment, ...].mean(axis=0) measures['measure_feat'][j, :, i] = measure if save_detailed: # Log optional snippet data: snippets['snip_feat'][:, :, j, i] = feat # # Log intensity measure, ensuring shape (distances, kernels, thresholds): # measures['measure_feat'][:, :, i] = np.moveaxis(feat[segment, ...].mean(axis=0), 1, 0) # if save_detailed: # # Log optional snippet data: # snippets['snip_feat'][:, :, :, i] = feat # thresholds = thresholds[None, :, None] # embed() # # Finalize processing: # feat = sosfilter((signals['conv'] > thresholds).astype(float), # rate, config['feat_fcut'], 'lp', # padtype='fixed', padlen=config['padlen']) # if i == thresholds.shape[0] - 1: # fig, axes = plt.subplots(1, 8, sharex=True, sharey=True, figsize=(16, 9)) # for j, ax in enumerate(axes): # ax.plot(time, feat[..., j]) # plt.show() # embed() # # Log intensity measure, ensuring shape (distances, kernels, thresholds): # measures['measure_feat'][:, :, i] = np.moveaxis(feat[segment, ...].mean(axis=0), 1, 0) # if save_detailed: # # Log optional snippet data: # snippets['snip_feat'][:, :, :, i] = feat # Save analysis results: if save_path is not None: data = dict( distances=distances, thresh_rel=thresh_rel, thresh_abs=thresh_abs, ) data.update(measures) if save_detailed: data.update(snippets) save_data(save_path + name, data, config, overwrite=True) print('Done.') embed()