# Utility helpers: array manipulation, song-file sorting, threshold lookup,
# and histogram/KDE distribution plotting.
import numpy as np
|
|
from scipy.stats import gaussian_kde
|
|
from itertools import product
|
|
from thunderhopper.filetools import crop_paths
|
|
from IPython import embed
|
|
|
|
def shorten_species(name):
    """Abbreviate an underscore-separated species name.

    'Genus_species' -> 'G. species'. Splits on the first underscore only,
    so names with additional underscores ('Genus_species_subsp') keep the
    remainder intact instead of raising a ValueError.
    """
    genus, species = name.split('_', 1)
    return genus[0] + '. ' + species
|
|
|
|
def unsort_unique(array):
    """Return the unique values of *array*, ordered by first appearance."""
    uniques, first_positions = np.unique(array, return_index=True)
    appearance_order = np.argsort(first_positions)
    return uniques[appearance_order]
|
|
|
|
def draw_noise_segment(noise, n, rng=None):
    """Randomly crop a contiguous segment of *n* samples along axis 0.

    Parameters
    ----------
    noise : ndarray
        Source array; must have at least n entries along axis 0.
    n : int
        Length of the segment to draw.
    rng : np.random.Generator, optional
        Generator to draw from, enabling reproducible segments. A fresh
        default generator is created when omitted (original behavior).

    Returns
    -------
    ndarray
        Copy of noise[start:start + n, ...] for a uniformly random start.
    """
    if rng is None:
        rng = np.random.default_rng()
    # endpoint=True makes the right-flush start (shape[0] - n) reachable.
    start = rng.integers(0, noise.shape[0] - n, endpoint=True)
    return np.take(noise, np.arange(start, start + n), axis=0)
|
|
|
|
def divide_by_zero(num, denom, replace=np.nan):
    """Element-wise division with non-finite results replaced.

    Parameters
    ----------
    num, denom : array_like
        Numerator and denominator (broadcastable).
    replace : float
        Value substituted wherever the quotient is inf or nan.

    Returns
    -------
    ndarray or scalar
        num / denom with non-finite entries set to *replace*. Scalar
        inputs yield a scalar.
    """
    with np.errstate(divide='ignore', invalid='ignore'):
        result = np.true_divide(num, denom)
    # BUGFIX: for scalar inputs true_divide returns a 0-d value that does
    # not support boolean-mask assignment -- promote to an array first.
    result = np.asarray(result)
    result[~np.isfinite(result)] = replace
    # Hand scalars back as scalars.
    return result[()] if result.ndim == 0 else result
|
|
|
|
def exclude_zero_scale(data, keys=None, combis=None):
    """Drop entries with non-positive scale from a data dictionary.

    Keeps only the rows where data['scales'] is strictly positive and
    applies the same row selection (along axis 0) to every array named in
    *keys* and to every '<key1>_<key2>' array addressed by the cartesian
    product of *combis*. Mutates and returns *data*.
    """
    keep = np.nonzero(data['scales'] > 0)[0]
    data['scales'] = data['scales'][keep]
    if keys is not None:
        for name in keys:
            data[name] = data[name][keep, ...]
    if combis is not None:
        for first, second in product(*combis):
            combined = f'{first}_{second}'
            data[combined] = data[combined][keep, ...]
    return data
|
|
|
|
def reduce_kernel_set(data, inds, keys=None, combis=None):
    """Select a subset of kernels (second axis) from a data dictionary.

    Applies the index selection *inds* along axis 1 of every array named
    in *keys* and of every '<key1>_<key2>' array addressed by the
    cartesian product of *combis*. Mutates and returns *data*.
    """
    targets = [] if keys is None else list(keys)
    if combis is not None:
        targets.extend(f'{first}_{second}'
                       for first, second in product(*combis))
    for name in targets:
        data[name] = data[name][:, inds, ...]
    return data
|
|
|
|
def sort_files_by_rec(paths, sources=('BM04', 'BM93', 'DJN', 'GBC', 'FTN')):
    """Group song file paths by recording.

    Paths are first grouped by source tag, then split into per-recording
    lists using the numerical sub-ID that follows the source tag in the
    file name. A sub-ID field containing 's' is treated as a time stamp,
    i.e. a single recording. The source grouping is discarded before
    returning.

    Parameters
    ----------
    paths : list of str
        Song file paths to sort.
    sources : sequence of str
        Source tags to look for in the paths (tuple default avoids the
        shared-mutable-default pitfall).

    Returns
    -------
    list of list of str
        One list of paths per recording, ordered by source then sub-ID.
    """
    # Separate by source:
    sorted_paths = {}
    for source in sources:

        # Check for any source-specific song files:
        source_paths = [path for path in paths if source in path]
        if not source_paths:
            continue

        # Separate by recording:
        sorted_paths[source] = [[]]
        for path, name in zip(source_paths, crop_paths(source_paths)):

            # Find numerical ID behind source tag:
            id_ind = name.find(source) + len(source) + 1

            # Get segment where sub-ID would be:
            # NOTE(review): assumes names look like '<source>-<id>-<sub>...';
            # an IndexError is raised if no second '-'-separated field exists.
            sub_id = name[id_ind:].split('-')[1]
            if 's' in sub_id:
                # Found time stamp (single recording):
                sorted_paths[source][0].append(path)
                continue
            sub_id = int(sub_id)

            # Found sub-ID (multiple recordings). BUGFIX: open as many new
            # recording-specific slots as needed ('while' instead of 'if'),
            # so out-of-order or gapped sub-IDs cannot index past the end.
            while sub_id > len(sorted_paths[source]):
                sorted_paths[source].append([])
            sorted_paths[source][sub_id - 1].append(path)

    # Re-sort song files by recording only (discarding source separation):
    return [recording for recordings in sorted_paths.values()
            for recording in recordings]
|
|
|
|
def get_thresholds(data=None, path=None, perc=None, factor=None,
                   direct=False, which=None):
    """Look up detection thresholds as SD scaling factors.

    Thresholds are derived either from a requested SD scaling *factor* or
    from a requested supra-threshold proportion *perc*, matched against
    the tabulated 'factors'/'percs' in *data* (loaded from *path* when
    *data* is None).

    Parameters
    ----------
    data : dict, optional
        Must provide 'sds', 'factors', and 'percs' arrays.
        NOTE(review): exact shapes are not visible here -- 'factors'
        appears to be 1D and 'percs' 2D (rows align with 'factors');
        confirm against the file written to *path*.
    path : str, optional
        npz file to load when *data* is not given.
    perc : array_like, optional
        Target supra-threshold proportion(s); used when *factor* is None.
    factor : array_like, optional
        Target SD scaling factor(s); takes precedence over *perc*.
    direct : bool
        With *factor*: scale SDs directly instead of snapping to the
        nearest tabulated factor.
    which : str or None
        Matching mode: 'floor' (largest candidate <= target), 'ceil'
        (smallest candidate >= target), else nearest by absolute distance.

    Returns
    -------
    tuple
        (scaled SDs, matched factor(s), matched percs row(s) or None).
    """

    def get_inds(nearest, which):
        # Pick, per column, the row of 'nearest' (candidate minus target)
        # closest to zero. 'floor' masks out negative differences,
        # 'ceil' masks out positive ones.
        # NOTE: mutates 'nearest' in place; callers discard it afterwards.
        if which == 'floor':
            nearest[nearest < 0] = np.inf
            return nearest.argmin(axis=0)
        elif which == 'ceil':
            nearest[nearest > 0] = -np.inf
            return nearest.argmax(axis=0)
        return np.abs(nearest).argmin(axis=0)

    if data is None:
        # Load threshold data:
        data = dict(np.load(path))

    # From SD scaling factor:
    if factor is not None:
        if direct:
            # Scale SDs directly by factor (no tabulated percs to report):
            return data['sds'] * factor, factor, None

        # Link to supra-thresh proportion: difference of each tabulated
        # factor (rows) to each requested factor (columns).
        nearest = np.atleast_2d(factor) - data['factors'][:, None]
        inds = get_inds(nearest, which)
        factors = data['factors'][inds]
        return data['sds'] * factors, factors, data['percs'][inds, :]

    # From supra-thresh proportion:
    nearest = perc - data['percs']
    inds = get_inds(nearest, which)
    factors = data['factors'][inds]
    return data['sds'] * factors, factors, data['percs'][inds, :]
|
|
|
|
def y_dist(ax, values, edges=None, nbins=50, limits=None, log=False, cap=0.01,
           density=True, line_kwargs=None, fill_kwargs=None):
    """Plot a vertical distribution of *values* (pdf along x, bins along y).

    Parameters
    ----------
    ax : matplotlib Axes
        Target axes; fill_betweenx/plot/set_xlim are called on it.
    values : array_like
        Samples to histogram (NaNs are ignored for the automatic limits).
    edges : ndarray, optional
        Precomputed bin edges; computed from *limits* when omitted.
    nbins : int
        Number of bins when edges are computed.
    limits : array_like of length 2, optional
        Bin range; derived (and padded) from the data when omitted.
    log : bool
        Use log-spaced bins; the lower limit is capped at *cap*.
    cap : float
        Smallest allowed lower limit for log-spaced bins.
    density : bool
        Normalize the histogram to a probability density.
    line_kwargs, fill_kwargs : dict, optional
        Forwarded to ax.plot / ax.fill_betweenx. (None defaults replace
        the original mutable-default dicts.)

    Returns
    -------
    tuple
        (pdf, bin centers, line handle, fill handle).
    """
    line_kwargs = {} if line_kwargs is None else line_kwargs
    fill_kwargs = {} if fill_kwargs is None else fill_kwargs

    # Get distribution:
    if edges is None:
        if limits is None:
            # BUGFIX: force float dtype so the in-place padding below
            # cannot fail on integer-valued data (unsafe cast on +=).
            limits = np.array([np.nanmin(values), np.nanmax(values)],
                              dtype=float)
            # NOTE(review): pads by 110% of the data range on each side,
            # which looks large -- confirm 1.1 vs an intended 0.1.
            limits += np.array([-1.1, 1.1]) * (limits[1] - limits[0])
        else:
            # Work on a float copy so the caller's array is never mutated.
            limits = np.asarray(limits, dtype=float).copy()
        if log:
            # Keep the lower limit positive for log-spaced bins:
            limits[0] = max(limits[0], cap)
            edges = np.geomspace(*limits, nbins + 1)
        else:
            edges = np.linspace(*limits, nbins + 1)
    centers = edges[:-1] + np.diff(edges) / 2
    pdf, _ = np.histogram(values, bins=edges, density=density)

    # Plot distribution:
    fill_handle = ax.fill_betweenx(centers, pdf.min(), pdf, **fill_kwargs)
    line_handle = ax.plot(pdf, centers, **line_kwargs)[0]
    ax.set_xlim(0, pdf.max() * 1.05)
    return pdf, centers, line_handle, fill_handle
|
|
|
|
def x_dist(ax, values, edges=None, nbins=50, limits=None, log=False, cap=0.01,
           density=True, line_kwargs=None, fill_kwargs=None):
    """Plot a horizontal distribution of *values* (bins along x, pdf along y).

    Parameters
    ----------
    ax : matplotlib Axes
        Target axes; fill_between/plot/set_ylim are called on it.
    values : array_like
        Samples to histogram (NaNs are ignored for the automatic limits).
    edges : ndarray, optional
        Precomputed bin edges; computed from *limits* when omitted.
    nbins : int
        Number of bins when edges are computed.
    limits : array_like of length 2, optional
        Bin range; derived (and padded) from the data when omitted.
    log : bool
        Use log-spaced bins; the lower limit is capped at *cap*.
    cap : float
        Smallest allowed lower limit for log-spaced bins.
    density : bool
        Normalize the histogram to a probability density.
    line_kwargs, fill_kwargs : dict, optional
        Forwarded to ax.plot / ax.fill_between. (None defaults replace
        the original mutable-default dicts.)

    Returns
    -------
    tuple
        (pdf, bin centers, line handle, fill handle).
    """
    line_kwargs = {} if line_kwargs is None else line_kwargs
    fill_kwargs = {} if fill_kwargs is None else fill_kwargs

    # Get distribution:
    if edges is None:
        if limits is None:
            # BUGFIX: force float dtype so the in-place padding below
            # cannot fail on integer-valued data (unsafe cast on +=).
            limits = np.array([np.nanmin(values), np.nanmax(values)],
                              dtype=float)
            # NOTE(review): pads by 110% of the data range on each side,
            # which looks large -- confirm 1.1 vs an intended 0.1.
            limits += np.array([-1.1, 1.1]) * (limits[1] - limits[0])
        else:
            # Work on a float copy so the caller's array is never mutated.
            limits = np.asarray(limits, dtype=float).copy()
        if log:
            # Keep the lower limit positive for log-spaced bins:
            limits[0] = max(limits[0], cap)
            edges = np.geomspace(*limits, nbins + 1)
        else:
            edges = np.linspace(*limits, nbins + 1)
    centers = edges[:-1] + np.diff(edges) / 2
    pdf, _ = np.histogram(values, bins=edges, density=density)

    # Plot distribution:
    fill_handle = ax.fill_between(centers, pdf.min(), pdf, **fill_kwargs)
    line_handle = ax.plot(centers, pdf, **line_kwargs)[0]
    ax.set_ylim(0, pdf.max() * 1.05)
    return pdf, centers, line_handle, fill_handle
|
|
|
|
def get_histogram(data, edges=None, nbins=50, pad=0.1, shared=True):
    """Histogram data column-wise with automatically padded bin edges.

    Parameters
    ----------
    data : ndarray
        1D samples, or 2D array of shape (samples, columns).
    edges : ndarray, optional
        Precomputed bin edges; 1D (shared) or (nbins + 1, columns).
        When given, nbins/pad are ignored for edge construction.
    nbins : int
        Number of bins.
    pad : float
        Fraction of the data range added on each side of the edges.
    shared : bool
        Use one common set of edges for all columns (True) or
        per-column edges (False).

    Returns
    -------
    hists : ndarray
        Density-normalized histogram(s), 1D or (nbins, columns).
    centers : ndarray
        Bin centers matching the edge layout.
    """
    if edges is None:
        axis = None if shared else 0
        min_data, max_data = data.min(axis=axis), data.max(axis=axis)
        pad = pad * (max_data - min_data)
        if shared or data.ndim == 1:
            edges = np.linspace(min_data - pad, max_data + pad, nbins + 1)
        else:
            # Per-column edges. BUGFIX: the original unpacked
            # 'i, mini, maxi, padi' straight from enumerate(zip(...)),
            # which raises ValueError -- the zipped triple must be
            # unpacked as a tuple.
            edges = np.zeros((nbins + 1, data.shape[1]))
            for i, (mini, maxi, padi) in enumerate(zip(min_data, max_data,
                                                       pad)):
                edges[:, i] = np.linspace(mini - padi, maxi + padi, nbins + 1)

    centers = edges[:-1] + np.diff(edges, axis=0) / 2
    if data.ndim == 1:
        hists, _ = np.histogram(data, bins=edges, density=True)
    else:
        hists = np.zeros((nbins, data.shape[1]))
        for i in range(data.shape[1]):
            bins = edges if shared else edges[:, i]
            hists[:, i], _ = np.histogram(data[:, i], bins=bins, density=True)
    return hists, centers
|
|
|
|
def get_kde(data, sigma, axis=None, n=1000, pad=10):
    """Gaussian kernel density estimate of *data*.

    Evaluates a gaussian_kde with bandwidth method *sigma* on *axis*, or,
    when *axis* is None, on a linear grid of *n* points spanning the data
    range padded by pad * sigma on both sides.

    Returns (pdf, axis).
    """
    if axis is None:
        lower = data.min() - pad * sigma
        upper = data.max() + pad * sigma
        axis = np.linspace(lower, upper, n)
    estimator = gaussian_kde(data, sigma)
    return estimator(axis), axis
|
|
|
|
def get_saturation(sigmoid, low=0.05, high=0.95, first=True, last=True,
                   condense=None):
    """Locate where a sigmoid curve leaves its low and high plateaus.

    Computes the last index at which the curve is still at or below
    min + low * span and min + high * span, respectively, where span is
    the curve's value range. np.nan marks criteria that are never met.
    For 2D input (samples x channels): condense='norm' first collapses
    channels via the Euclidean norm, condense='all' requires every
    channel to satisfy the criterion, any other value yields per-channel
    index lists.

    Returns (low_ind, high_ind).
    """

    def last_hit(mask):
        # Last True position along axis 0, or NaN if the mask is empty.
        hits = np.nonzero(mask)[0]
        return np.nan if hits.size == 0 else hits[-1]

    if condense == 'norm' and sigmoid.ndim == 2:
        sigmoid = np.linalg.norm(sigmoid, axis=1)

    # Plateau levels: curve endpoints by default, else global extrema.
    min_value = sigmoid[0] if first else np.nanmin(sigmoid, axis=0)
    max_value = sigmoid[-1] if last else np.nanmax(sigmoid, axis=0)

    span = max_value - min_value
    low_value = min_value + low * span
    high_value = min_value + high * span

    low_mask = sigmoid <= low_value
    high_mask = sigmoid <= high_value
    if sigmoid.ndim == 1:
        return last_hit(low_mask), last_hit(high_mask)
    if condense == 'all':
        return (last_hit(low_mask.all(axis=1)),
                last_hit(high_mask.all(axis=1)))
    low_ind = [last_hit(low_mask[:, col]) for col in range(sigmoid.shape[1])]
    high_ind = [last_hit(high_mask[:, col]) for col in range(sigmoid.shape[1])]
    return low_ind, high_ind
|