# Scale-sweep analysis: mixes grasshopper song recordings with white noise at
# varying signal scales and logs intensity measures through the thunderhopper
# model pipeline (filter, envelope, inverse, convolution, feature stages).
# Third-party packages:
import numpy as np
import matplotlib.pyplot as plt
from IPython import embed

# Project modules:
from thunderhopper.model import convolve_kernels, process_signal
from thunderhopper.modeltools import load_data, save_data
from thunderhopper.filetools import search_files, crop_paths
from thunderhopper.filtertools import find_kern_specs
from thunderhopper.filters import sosfilter
from misc_functions import draw_noise_segment
# GENERAL SETTINGS:

# Species menu — the trailing index selects among the *uncommented* entries
# only (index 0 = 'Chorthippus_mollis', index 1 = 'Gomphocerippus_rufus'):
target_species = [
    # 'Chorthippus_biguttulus',
    'Chorthippus_mollis',
    # 'Chrysochraon_dispar',
    # 'Euchorthippus_declivus',
    'Gomphocerippus_rufus',
    # 'Omocestus_rufipes',
    # 'Pseudochorthippus_parallelus',
][1]

# Recording (per species) that additionally gets detailed snippet logging:
example_file = {
    'Chorthippus_biguttulus': 'Chorthippus_biguttulus_GBC_94-17s73.1ms-19s977ms',
    'Chorthippus_mollis': 'Chorthippus_mollis_DJN_41_T28C-46s4.58ms-1m15s697ms',
    'Chrysochraon_dispar': 'Chrysochraon_dispar_DJN_26_T28C_DT-32s134ms-34s432ms',
    'Euchorthippus_declivus': 'Euchorthippus_declivus_FTN_79-2s167ms-2s563ms',
    'Gomphocerippus_rufus': 'Gomphocerippus_rufus_FTN_91-3-884ms-10s427ms',
    'Omocestus_rufipes': 'Omocestus_rufipes_DJN_32-40s724ms-48s779ms',
    'Pseudochorthippus_parallelus': 'Pseudochorthippus_parallelus_GBC_88-6s678ms-9s32.3ms'
}[target_species]

# Input and output locations:
data_paths = search_files(target_species, dir='../data/processed/')
noise_path = '../data/processed/white_noise_sd-1.npz'
thresh_path = '../data/inv/short/thresholds.npz'
save_path = '../data/inv/short/'

# Pipeline stages returned by process_signal, plus the ones computed here:
pre_stages = ['filt', 'env']
stages = pre_stages + ['inv', 'conv', 'feat']


# ANALYSIS SETTINGS:

# Song scales that also get detailed snippet logging:
example_scales = np.array([0.1, 1, 10, 30, 100, 300])

# Full sweep: log-spaced grid plus zero plus the example scales (deduplicated):
scales = np.unique(np.concatenate(
    ([0], np.geomspace(0.01, 10000, 500), example_scales)))

# Thresholds expressed in multiples of the per-kernel noise SD:
thresh_rel = np.array([0, 0.5, 1, 1.5, 2, 2.5, 3])


# SUBSET SETTINGS:

# None keeps all kernels; otherwise passed to find_kern_specs for selection:
kernels = None
types = None
sigmas = None
# PREPARATION:

# Raw white-noise waveform that gets mixed into every song:
pure_noise = np.load(noise_path)['raw']

# Per-kernel noise SDs -> absolute thresholds, one row per relative threshold
# (result shape: (thresh_rel.size, number of kernels)):
thresh_data = dict(np.load(thresh_path))
noise_sds = thresh_data['sds']
thresh_abs = thresh_rel[:, None] * noise_sds[None, :]
# EXECUTION:

for data_path, name in zip(data_paths, crop_paths(data_paths)):
    # Detailed per-sample snippets are only stored for the example recording:
    save_detailed = example_file in name
    print(f'Processing {name}')

    # Get song recording (prior to anything):
    data, config = load_data(data_path, files='raw')
    song, rate = data['raw'], config['rate']

    # Reduce to kernel subset. NOTE: use a loop-local threshold array —
    # slicing the module-level thresh_abs in place (as before) would re-slice
    # an already-reduced array on every subsequent file (latent bug):
    file_thresh = thresh_abs
    if any(var is not None for var in [kernels, types, sigmas]):
        kern_inds = find_kern_specs(config['k_specs'], kernels, types, sigmas)
        config['kernels'] = config['kernels'][:, kern_inds]
        config['k_specs'] = config['k_specs'][kern_inds, :]
        config['k_props'] = [config['k_props'][i] for i in kern_inds]
        file_thresh = thresh_abs[:, kern_inds]

    # Get song segment to be analyzed:
    time = np.arange(song.shape[0]) / rate
    # Assumes 'songs_0' holds exactly one (start, end) pair — TODO confirm:
    start, end = data['songs_0'].ravel()
    segment = (time >= start) & (time <= end)

    # Normalize song component (in place — data['raw'] shares this memory):
    song /= song[segment].std(axis=0)

    # Get normalized noise component of matching length:
    noise = draw_noise_segment(pure_noise, song.shape[0])
    noise /= noise[segment].std()

    # Prepare storage for intensity measures per scale (and per kernel /
    # threshold for the later stages):
    shape_low = (scales.size,)
    shape_high = (scales.size, config['k_specs'].shape[0])
    measures = dict(
        measure_filt=np.zeros(shape_low, dtype=float),
        measure_env=np.zeros(shape_low, dtype=float),
        measure_inv=np.zeros(shape_low, dtype=float),
        measure_conv=np.zeros(shape_high, dtype=float),
        measure_feat=np.zeros(shape_high + (thresh_rel.size,), dtype=float)
    )
    if save_detailed:
        # Prepare optional storage for full time-resolved snippets at the
        # example scales only:
        shape_low = (song.shape[0], example_scales.size)
        shape_high = (song.shape[0], config['k_specs'].shape[0],
                      example_scales.size)
        snippets = dict(
            snip_filt=np.zeros(shape_low, dtype=float),
            snip_env=np.zeros(shape_low, dtype=float),
            snip_inv=np.zeros(shape_low, dtype=float),
            snip_conv=np.zeros(shape_high, dtype=float),
            snip_feat=np.zeros(shape_high + (thresh_rel.size,), dtype=float)
        )

    # Execute piecewise over all song scales:
    for i, scale in enumerate(scales):
        print('Simulating scale ', scale)

        # Rescale song and add noise:
        scaled = song * scale + noise

        # Process mixture through the first model stages (rates unused here):
        signals, _rates = process_signal(config, returns=pre_stages,
                                         signal=scaled, rate=rate)

        # Process mixture further (high-pass "inverse" stage, then kernel
        # convolution):
        signals['inv'] = sosfilter(signals['env'], rate, config['inv_fcut'],
                                   'hp', padtype='constant',
                                   padlen=config['padlen'])
        signals['conv'] = convolve_kernels(signals['inv'], config['kernels'],
                                           config['k_specs'])

        # Resolve the snippet column once per scale (previously assigned
        # inside the stage loop and relied upon later via scope leakage):
        log_snips = save_detailed and scale in example_scales
        if log_snips:
            scale_ind = np.nonzero(example_scales == scale)[0][0]

        # Store non-feature results:
        for stage in stages[:-1]:
            # Log intensity measures (SD over the analyzed segment):
            measures[f'measure_{stage}'][i] = \
                signals[stage][segment, ...].std(axis=0)

            # Log optional snippet data:
            if log_snips:
                snippets[f'snip_{stage}'][:, ..., scale_ind] = signals[stage]

        # Execute piecewise again over all threshold rows:
        for j, thresholds in enumerate(file_thresh):
            # Finalize processing: threshold crossings, low-pass smoothed:
            feat = sosfilter((signals['conv'] > thresholds).astype(float),
                             rate, config['feat_fcut'], 'lp',
                             padtype='fixed', padlen=config['padlen'])

            # Log intensity measure (mean, not SD, over the segment —
            # presumably a duty-cycle-like measure; differs from the
            # earlier stages by design):
            measures['measure_feat'][i, :, j] = feat[segment, :].mean(axis=0)

            # Log optional snippet data:
            if log_snips:
                snippets['snip_feat'][:, :, scale_ind, j] = feat

    # Save analysis results (separate name so the loaded input 'data' is
    # not shadowed):
    if save_path is not None:
        results = dict(
            scales=scales,
            example_scales=example_scales,
            thresh_rel=thresh_rel,
            thresh_abs=file_thresh,
        )
        results.update(measures)
        if save_detailed:
            results.update(snippets)
        save_data(save_path + name, results, config, overwrite=True)

print('Done.')

# Drop into an interactive shell for inspection:
embed()