"""Simulate thresholding and filtering of rescaled kernel responses.

For every processed recording of the target that matches the glob pattern,
the stored kernel responses ('conv') are normalized per kernel, multiplied by
a range of scale factors, binarized against a kernel-specific threshold, and
passed through `sosfilter` (called with 'lp' and the configured 'feat_fcut';
presumably a low-pass filter -- confirm against thunderhopper.filters).
An intensity measure per processing stage is then related to the smallest
scale, condensed across kernels, and saved.
"""
import glob
import numpy as np
from thunderhopper.modeltools import load_data, save_data
from thunderhopper.filetools import crop_paths
from thunderhopper.filters import sosfilter
from IPython import embed

# GENERAL SETTINGS:
target = 'Omocestus_rufipes'
data_paths = glob.glob(f'../data/processed/{target}*.npz')
save_path = '../data/inv/thresh_lp/'

# ANALYSIS SETTINGS:
add_noise = False
# Relative threshold, expressed as a fraction of each kernel's maximum
# normalized response. Kept as a scalar setting for the whole run.
threshold = 0.5
example_scales = np.array([threshold, 0.6, 1, 10, 50, 100])
scales = np.linspace(threshold + 0.1, 100, 100)
if not add_noise:
    # Without noise, only scales above the relative threshold are kept
    # as examples:
    example_scales = example_scales[example_scales > threshold]
# Ensure that every example scale is part of the simulated scales:
scales = np.unique(np.concatenate((scales, example_scales)))

# EXECUTION:
for data_path, name in zip(data_paths, crop_paths(data_paths)):
    print(f'Processing {name}')

    # Get normalized pure-song kernel responses:
    data, config = load_data(data_path, files='conv')
    song, rate = data['conv'], data['conv_rate']
    song /= song.std(axis=0)

    # Prepare kernel-specific thresholds. These are computed into a per-file
    # variable so that the relative threshold setting is not overwritten;
    # updating `threshold` in place would compound the thresholds of all
    # previously processed files into those of the current one.
    kernel_thresholds = threshold * song.max(axis=0, keepdims=True)

    if add_noise:
        # Get normalized noise (single column, broadcast across kernels):
        rng = np.random.default_rng()
        noise = rng.normal(size=(song.shape[0], 1))
        noise /= noise.std()

    # Prepare snippet storage (one slice per example scale):
    shape = song.shape + (example_scales.size,)
    conv = np.zeros(shape, dtype=float)
    bi = np.zeros(shape, dtype=float)
    feat = np.zeros(shape, dtype=float)

    # Prepare measure storage (one row per scale, one column per kernel):
    shape = (scales.size, song.shape[1])
    measure_conv = np.zeros(shape, dtype=float)
    measure_feat = np.zeros(shape, dtype=float)

    # Execute piecewise:
    for i, scale in enumerate(scales):
        print('Simulating scale ', scale)

        # Rescale song component:
        scaled_conv = song * scale
        if add_noise:
            # Add noise:
            scaled_conv += noise

        # Process mixture:
        scaled_bi = (scaled_conv > kernel_thresholds).astype(float)
        scaled_feat = sosfilter(scaled_bi, rate, config['feat_fcut'], 'lp',
                                padtype='fixed', padlen=config['padlen'])

        # Log snippet data. Exact float comparison is valid here because
        # the example scales were concatenated into `scales` unchanged:
        if scale in example_scales:
            scale_ind = np.nonzero(example_scales == scale)[0][0]
            conv[:, :, scale_ind] = scaled_conv
            bi[:, :, scale_ind] = scaled_bi
            feat[:, :, scale_ind] = scaled_feat

        # Get "intensity measure" per stage:
        measure_conv[i] = scaled_conv.std(axis=0)
        measure_feat[i] = scaled_feat.mean(axis=0)

    # Relate to smallest scale:
    base_ind = np.argmin(scales)
    measure_conv /= measure_conv[base_ind, :]
    measure_feat /= measure_feat[base_ind, :]

    # Condense measures across kernels (rows: 25th and 75th percentile):
    spread_conv = np.zeros((2, scales.size))
    spread_conv[0] = np.percentile(measure_conv, 25, axis=1)
    spread_conv[1] = np.percentile(measure_conv, 75, axis=1)
    measure_conv = np.median(measure_conv, axis=1)
    spread_feat = np.zeros((2, scales.size))
    spread_feat[0] = np.percentile(measure_feat, 25, axis=1)
    spread_feat[1] = np.percentile(measure_feat, 75, axis=1)
    measure_feat = np.median(measure_feat, axis=1)

    # Save analysis results:
    if save_path is not None:
        data = dict(
            scales=scales,
            example_scales=example_scales,
            conv=conv,
            bi=bi,
            feat=feat,
            measure_conv=measure_conv,
            spread_conv=spread_conv,
            measure_feat=measure_feat,
            spread_feat=spread_feat,
        )
        file_name = save_path + name
        if add_noise:
            file_name += '_noise'
        save_data(file_name, data, config, overwrite=True)
print('Done.')
# Drop into an interactive shell for inspecting the results of the last file:
embed()