# fixtracks/fixtracks/utils/trackingdata.py


import pickle
import logging
import numpy as np
import pandas as pd
from PySide6.QtCore import QObject


class TrackingData(QObject):
    def __init__(self, parent=None):
        super().__init__(parent)
        self._data = None
        self._columns = []
        self._start = 0
        self._stop = 0
        self._indices = None
        self._selection_column = None
        self._user_selections = None

    def setData(self, datadict):
        assert isinstance(datadict, dict)
        self._data = datadict
        self._data["userlabeled"] = np.zeros_like(self["frame"], dtype=bool)
        self._columns = list(self._data.keys())
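
    # Usage sketch (hypothetical example data; any dict of equal-length,
    # per-detection arrays works, but the methods below expect at least the
    # "index", "frame", "track", "keypoints", and "keypoint_score" columns):
    #   td = TrackingData()
    #   td.setData({"index": np.arange(4), "frame": np.array([0, 0, 1, 1]),
    #               "track": np.full(4, -1), "keypoints": keypoints,
    #               "keypoint_score": scores})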

    @property
    def data(self):
        return self._data

    @property
    def columns(self):
        return self._columns

    def max(self, col):
        if col in self.columns:
            return np.max(self._data[col])
        logging.error("Column %s not in dictionary", col)
        return np.nan

    @property
    def numDetections(self):
        return self._data["track"].shape[0]

    @property
    def selectionRange(self):
        return self._start, self._stop

    @property
    def selectionRangeColumn(self):
        return self._selection_column

    @property
    def selectionIndices(self):
        return self._indices

    def setSelectionRange(self, col, start, stop):
        logging.debug("Trackingdata: set selection range based on column %s to %.2f - %.2f",
                      col, start, stop)
        self._start = start
        self._stop = stop
        self._selection_column = col
        self._indices = np.where((self._data[col] >= self._start) & (self._data[col] < self._stop))[0]
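
    # Usage sketch (assuming a per-detection "frame" column):
    #   td.setSelectionRange("frame", 100, 200)   # half-open interval [100, 200)
    #   values = td.selectedData("frame")         # column values of the selection
    #   rows = td.selectionIndices                # positional indices into the arrays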

    def selectedData(self, col):
        return self[col][self._indices]

    def setUserSelection(self, ids):
        """
        Set the user selection, e.g. the detection ids (i.e. the index of the
        original data frame entries) that the user selected.

        Parameters
        ----------
        ids : array-like
            An array-like object containing the IDs to be set as user selections.
            The IDs will be converted to integers.
        """
        self._user_selections = ids.astype(int)
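
    # Usage sketch, assuming the "index" column coincides with the row position in
    # the data arrays (the ids are used for fancy indexing below):
    #   td.setUserSelection(td.selectedData("index"))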

    def assignUserSelection(self, track_id: int, userFlag: bool = True) -> None:
        """Assign a new track_id to the user-selected detections.

        Parameters
        ----------
        track_id : int
            The new track id for the user-selected detections.
        userFlag : bool
            Whether the "userlabeled" state of the detections should be set to True or False.
        """
        self["track"][self._user_selections] = track_id
        self.setAssignmentStatus(userFlag)
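
    # Usage sketch: relabel the current user selection as track 1 and mark it as
    # user-labeled; revertAssignmentStatus() clears all "userlabeled" flags again:
    #   td.setUserSelection(ids)
    #   td.assignUserSelection(1)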

    def setAssignmentStatus(self, isTrue: bool):
        logging.debug("TrackingData: Re-setting assignment status of user-selected data to %s", str(isTrue))
        self["userlabeled"][self._user_selections] = isTrue

    def revertAssignmentStatus(self):
        logging.debug("TrackingData: Un-setting assignment status of all data!")
        self["userlabeled"][:] = False

    def revertTrackAssignments(self):
        logging.debug("TrackingData: Reverting all track assignments!")
        self["track"][:] = -1

    def deleteDetections(self):
        # Not implemented yet: deleting the user-selected detections would require
        # dropping the corresponding entries from every column in self._data.
        pass

    def assignTracks(self, tracks: np.ndarray):
        """Assign the given track ids to the detections. If the size of the provided
        tracks does not match the number of detections, an error is logged and the
        tracks are not set.

        Parameters
        ----------
        tracks : np.ndarray
            The track information, one track id per detection.

        Returns
        -------
        None
        """
        if len(tracks) != self.numDetections:
            logging.error("Trackingdata: Size of passed tracks does not match data!")
            return
        self._data["track"] = tracks

    def save(self, filename):
        export_columns = self._columns.copy()
        export_columns.remove("index")
        dictionary = {c: self._data[c] for c in export_columns}
        df = pd.DataFrame(dictionary, index=self._data["index"])
        with open(filename, "wb") as f:
            pickle.dump(df, f)
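
    # The file written here is a pickled pandas DataFrame; a round-trip sketch
    # (file name is hypothetical):
    #   td.save("tracking_fixed.pkl")
    #   with open("tracking_fixed.pkl", "rb") as f:
    #       df = pickle.load(f)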

    def numKeypoints(self):
        if len(self._data["keypoints"]) == 0:
            return 0
        return self._data["keypoints"][0].shape[0]

    def coordinates(self, selection=False):
        """
        Returns the coordinates of all keypoints as a NumPy array.

        Parameters
        ----------
        selection : bool
            If True, only the detections of the current selection range are returned.

        Returns
        -------
        np.ndarray
            A NumPy array of shape (N, M, 2) where N is the number of detections
            and M is the number of keypoints.
        """
        if selection and self._indices is not None:
            # restrict to the detections selected via setSelectionRange
            return np.stack(self._data["keypoints"][self._indices]).astype(np.float32)
        return np.stack(self._data["keypoints"]).astype(np.float32)
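
    # Shape sketch: with N detections and M keypoints per detection,
    #   td.coordinates().shape     -> (N, M, 2)  x/y coordinates
    #   td.keypointScores().shape  -> (N, M)     per-keypoint confidence scores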

    def keypointScores(self):
        """
        Returns the keypoint scores as a NumPy array of type float32.

        Returns
        -------
        numpy.ndarray
            A NumPy array of type float32 containing the keypoint scores of shape (N, M),
            with N the number of detections and M the number of keypoints.
        """
        return np.stack(self._data["keypoint_score"]).astype(np.float32)

    def centerOfGravity(self, threshold=0.8):
        """
        Calculate the center of gravity of the keypoints weighted by their scores.
        Keypoints with a score below threshold are ignored.

        Parameters
        ----------
        threshold : float
            Keypoints with a score less than threshold are ignored.

        Returns
        -------
        np.ndarray
            A NumPy array of shape (N, 2) containing the center of gravity for each detection.
        """
        scores = self.keypointScores()
        scores[scores < threshold] = 0.0
        weighted_coords = self.coordinates() * scores[:, :, np.newaxis]
        sum_scores = np.sum(scores, axis=1, keepdims=True)
        center_of_gravity = np.sum(weighted_coords, axis=1) / sum_scores
        return center_of_gravity
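
    # In effect this computes, per detection i, the score-weighted mean position
    #   cog_i = sum_k(score_ik * xy_ik) / sum_k(score_ik)
    # with scores below threshold set to zero. If all scores of a detection fall
    # below threshold, the denominator is zero and the result is NaN for that row.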

    def animalLength(self, bodyaxis=None):
        if bodyaxis is None:
            bodyaxis = [0, 1, 2, 5]
        bodycoords = self.coordinates()[:, bodyaxis, :]
        # body length as the sum of the segment lengths along the body-axis keypoints
        lengths = np.sum(np.sqrt(np.sum(np.diff(bodycoords, axis=1)**2, axis=2)), axis=1)
        return lengths

    def orientation(self, head_node=0, tail_node=5):
        bodycoords = self.coordinates()[:, [head_node, tail_node], :]
        vectors = bodycoords[:, 1, :] - bodycoords[:, 0, :]
        # angle of the head-to-tail vector relative to the y-axis, in degrees, mapped to [0, 360)
        orientations = np.arctan2(vectors[:, 0], vectors[:, 1]) * 180 / np.pi
        orientations[orientations < 0] += 360
        return orientations

    def bendedness(self, bodyaxis=None):
        """
        Calculate the bendedness of the body axis.

        Parameters
        ----------
        bodyaxis : list of int, optional
            Indices of the body axis coordinates to consider. If None, defaults to [0, 1, 2, 5].

        Returns
        -------
        numpy.ndarray
            Array of mean absolute deviations of the body axis points from the head-tail vector.
        """
        if bodyaxis is None:
            bodyaxis = [0, 1, 2, 5]
        bodycoords = self.coordinates()[:, bodyaxis, :]
        # append a zero z-coordinate so that np.cross yields the z-component directly
        bodycoords = np.concatenate((bodycoords, np.zeros((bodycoords.shape[0], len(bodyaxis), 1))), axis=2)
        head_tail_vector = bodycoords[:, -1, :] - bodycoords[:, 0, :]
        point_axis_vector = bodycoords[:, :, :] - bodycoords[:, 0, :][:, np.newaxis, :]
        htv = head_tail_vector[:, np.newaxis, :]
        # length of the head-tail connection
        head_tail_length = np.linalg.norm(head_tail_vector, axis=1, keepdims=True)
        # signed perpendicular distance of each body point from the head-tail line
        deviations = np.cross(htv, point_axis_vector)[:, :, -1] / head_tail_length
        deviations = np.mean(np.abs(deviations), axis=1)
        return deviations
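
    # Geometry sketch: for each body point p, the z-component of
    # cross(head_tail_vector, p - head) divided by |head_tail_vector| is the signed
    # perpendicular distance of p from the head-tail line; bendedness is the mean
    # absolute distance over the body-axis points.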

    def __getitem__(self, key):
        return self._data[key]


def main():
    import matplotlib.pyplot as plt
    from IPython import embed

    from fixtracks.info import PACKAGE_ROOT

    logging.basicConfig(level=logging.DEBUG, force=True)

    def as_dict(df: pd.DataFrame):
        d = {c: df[c].values for c in df.columns}
        d["index"] = df.index.values
        return d

    def neighborDistances(x, n=5, symmetric=True):
        pad_shape = list(x.shape)
        pad_shape[0] = n
        pad = np.zeros(pad_shape)
        if symmetric:
            padded_x = np.vstack((pad, x, pad))
        else:
            padded_x = np.vstack((pad, x))
        dists = np.zeros((padded_x.shape[0], 2 * n if symmetric else n))
        count = 0
        r = range(-n, n + 1) if symmetric else range(-n, 0)
        for i in r:
            if i == 0:
                continue
            # shift along the detection axis and measure the per-row Euclidean distance
            shifted_x = np.roll(padded_x, i, axis=0)
            dists[:, count] = np.sqrt(np.sum((padded_x - shifted_x)**2, axis=1))
            count += 1
        return dists
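
    # neighborDistances pads the trajectory with n zero-rows (on both sides if
    # symmetric) and fills one column of Euclidean distances per non-zero offset
    # in the neighborhood, e.g.
    #   dists = neighborDistances(cogs, n=2)   # shape (len(cogs) + 2*n, 2*n)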

    def plot_skeleton(positions):
        skeleton_grid = [(0, 1), (1, 2), (1, 3), (1, 4), (2, 5)]
        colors = ["tab:red"]
        colors.extend(["tab:blue"] * 5)
        plt.scatter(positions[:, 0], positions[:, 1], c=colors)
        for si, ei in skeleton_grid:
            plt.plot([positions[si, 0], positions[ei, 0]],
                     [positions[si, 1], positions[ei, 1]], color="tab:green")

    datafile = PACKAGE_ROOT / "data/merged_small.pkl"
    with open(datafile, "rb") as f:
        df = pickle.load(f)

    data = TrackingData()
    data.setData(as_dict(df))
    all_cogs = data.centerOfGravity()
    orientations = data.orientation()
    lengths = data.animalLength()
    frames = data["frame"]
    tracks = data["track"]
    bendedness = data.bendedness()
    positions = data.coordinates()[[160388, 160389]]
    embed()

    cogs = all_cogs[tracks == 1]
    all_dists = neighborDistances(cogs, 2, False)
    # plt.hist(all_dists[1:, 0], bins=1000)
    # print(np.percentile(all_dists[1:, 0], 99))
    # print(np.percentile(all_dists[1:, 0], 1))
    # plt.gca().set_xscale("log")
    # plt.gca().set_yscale("log")
    # plt.hist(all_dists[1:, 1], bins=100)
    # plt.show()

    # def compute_neighbor_distances(cogs, window=10):
    #     distances = []
    #     for i in range(len(cogs)):
    #         start = max(0, i - window)
    #         stop = min(len(cogs), i + window + 1)
    #         neighbors = cogs[start:stop]
    #         dists = cdist([cogs[i]], neighbors)[0]
    #         distances.append(dists)
    #     return distances
    # print("estimating neighborhood distances")
    # neighbor_distances = compute_neighbor_distances(cogs)
    embed()


if __name__ == "__main__":
    main()