# Added complete "rect-lp" analysis except figure. Added multiple appendix figs.
# Overhauled normalization options across all condense scripts.
# Co-authored-by: Copilot <copilot@github.com>
import numpy as np
|
|
from thunderhopper.filetools import search_files, crop_paths
|
|
from thunderhopper.modeltools import load_data, save_data
|
|
from IPython import embed
|
|
|
|
def sort_files_by_rec(paths, sources=('JJ', 'SLO')):
    """Group song-file paths by recording session.

    Paths are first separated by source tag, then grouped by the global
    time stamp embedded in the cropped file name. The source level is
    discarded again in the returned value.

    Parameters
    ----------
    paths : list of str
        Song-file paths to sort.
    sources : sequence of str, optional
        Source tags searched for within each path. Defaults to
        ('JJ', 'SLO'). Uses a tuple default to avoid the shared
        mutable-default-argument pitfall.

    Returns
    -------
    list of list of str
        One inner list of file paths per recording.
    """
    # Separate by source:
    sorted_paths = {}
    for source in sources:

        # Check for any source-specific song files:
        source_paths = [path for path in paths if source in path]
        if not source_paths:
            continue

        # Separate by recording:
        recordings = {}
        for path, name in zip(source_paths, crop_paths(source_paths)):

            # Find global time stamp behind source tag.
            # NOTE(review): assumes the source tag occurs in the cropped
            # name as well; name.find() would return -1 otherwise —
            # confirm the upstream naming convention.
            ind = name.find(source) + len(source) + 1
            time_stamps = name[ind:].split('_')[-1]
            global_time = '-'.join(time_stamps.split('-')[:2])

            # Append to known recording, or open a novel one:
            recordings.setdefault(global_time, []).append(path)
        sorted_paths[source] = recordings

    # Re-sort song files by recording only (discarding source separation):
    return [rec_paths
            for recordings in sorted_paths.values()
            for rec_paths in recordings.values()]
|
|
|
|
|
|
# GENERAL SETTINGS:
# Species to process, data mode, and the processing stages expected in
# every invariance data file.
target_species = ['Pseudochorthippus_parallelus']
mode = ['song', 'noise'][0]
stages = ['raw', 'filt', 'env', 'log', 'inv', 'conv', 'feat']

# Input/output locations (mode-dependent):
search_path = f'../data/inv/field/{mode}/'
ref_path = '../data/inv/field/ref_measures.npz'
save_path = f'../data/inv/field/{mode}/condensed/'
sources = [
    'JJ',
    'SLO',
]

# ANALYSIS SETTINGS:
# Per-measure normalization; noise mode always runs unnormalized.
normalization = 'none'
if mode == 'song':
    # One of: 'none', 'min', 'max', 'base', 'range'
    normalization = 'min'

# File-name suffix tagging the chosen normalization:
suffix = {
    'none': '_unnormed',
    'min': '_norm-min',
    'max': '_norm-max',
    'base': '_norm-base',
    'range': '_norm-range',
}[normalization]

if normalization == 'base':
    # Baseline measures required for noise-baseline normalization:
    ref_data = dict(np.load(ref_path))
|
|
|
|
# EXECUTION:
# Condense invariance measures per species: load every song file, optionally
# normalize each stage's measure array, stack files per recording, and save
# per-recording mean/sd statistics.
# NOTE(review): indentation was reconstructed from a whitespace-flattened
# paste — confirm the nesting of the allocation branches against the original.
for i, species in enumerate(target_species):
    print(f'Processing {species}')

    # Fetch all species-specific song files:
    all_paths = search_files(species, excl='merged_noise', ext='npz', dir=search_path)
    if not all_paths:
        continue

    # Sort song files by recording (one or more per source):
    sorted_paths = sort_files_by_rec(all_paths, sources)

    # Condense across song files per recording:
    for j, rec_paths in enumerate(sorted_paths):
        for k, path in enumerate(rec_paths):

            # Load invariance data:
            data, config = load_data(path, 'distances', 'measure')

            if k == 0:
                # Prepare song file-specific storage: one array per stage,
                # last axis indexing the files of this recording.
                # NOTE(review): assumes all files of a recording share the
                # first file's measure shape — confirm.
                file_data = {}
                for stage in stages:
                    shape = data[f'measure_{stage}'].shape + (len(rec_paths),)
                    file_data[stage] = np.zeros(shape, dtype=float)

                if j == 0:
                    # Prepare recording-specific storage once (first file of
                    # first recording), last axis indexing recordings:
                    rec_mean, rec_sd = {}, {}
                    for stage in stages:
                        shape = data[f'measure_{stage}'].shape + (len(sorted_paths),)
                        rec_mean[f'mean_{stage}'] = np.zeros(shape, dtype=float)
                        rec_sd[f'sd_{stage}'] = np.zeros(shape, dtype=float)

            # Log song file data (normalized in place as configured):
            for stage in stages:
                mkey = f'measure_{stage}'

                if normalization == 'min':
                    # Minimum normalization (along first axis):
                    data[mkey] /= data[mkey].min(axis=0, keepdims=True)
                elif normalization == 'max':
                    # Maximum normalization (along first axis):
                    data[mkey] /= data[mkey].max(axis=0, keepdims=True)
                elif normalization == 'base':
                    # Noise baseline normalization (per-stage reference):
                    data[mkey] /= ref_data[stage]
                    # data[mkey] /= data[mkey][0]
                elif normalization == 'range':
                    # Min-max normalization:
                    min_measure = data[mkey].min(axis=0, keepdims=True)
                    max_measure = data[mkey].max(axis=0, keepdims=True)
                    data[mkey] = (data[mkey] - min_measure) / (max_measure - min_measure)

                file_data[stage][..., k] = data[mkey]

        # Get recording statistics (NaN-tolerant across the file axis):
        for stage in stages:
            rec_mean[f'mean_{stage}'][..., j] = np.nanmean(file_data[stage], axis=-1)
            rec_sd[f'sd_{stage}'][..., j] = np.nanstd(file_data[stage], axis=-1)
            if len(sorted_paths) == 1:
                # Prune recording dimension for single recording:
                rec_mean[f'mean_{stage}'] = rec_mean[f'mean_{stage}'][..., 0]
                rec_sd[f'sd_{stage}'] = rec_sd[f'sd_{stage}'][..., 0]

    # Save condensed recording data.
    # NOTE(review): 'distances' is taken from the last file loaded —
    # presumably identical across all files of a species; confirm.
    archive = dict(distances=data['distances'])
    archive.update(rec_mean)
    archive.update(rec_sd)
    save_data(save_path + species + suffix, archive, config, overwrite=True)

print('Done.')
|