[trackingdata] improvements, some docstrings

This commit is contained in:
Jan Grewe 2025-02-10 11:08:02 +01:00
parent 6244f7fdbe
commit 96e4b0b2c5

View File

@ -1,10 +1,10 @@
import pickle import pickle
import logging import logging
import numpy as np import numpy as np
import pandas as pd
from PySide6.QtCore import QObject from PySide6.QtCore import QObject
class TrackingData(QObject): class TrackingData(QObject):
def __init__(self, parent=None): def __init__(self, parent=None):
super().__init__(parent) super().__init__(parent)
@ -62,12 +62,39 @@ class TrackingData(QObject):
return self._data[col][self._indices] return self._data[col][self._indices]
def setUserSelection(self, ids): def setUserSelection(self, ids):
"""
Set the user selections. That is, e.g. when the user selected a number of ids.
Parameters
----------
ids : array-like
An array-like object containing the IDs to be set as user selections.
The IDs will be converted to integers.
"""
self._user_selections = ids.astype(int) self._user_selections = ids.astype(int)
def assignUserSelection(self, track_id): def assignUserSelection(self, track_id:int)-> None:
"""Assign a new track_id to the user-selected detections
Parameters
----------
track_id : int
The new track id for the user-selected detections
"""
self._data["track"][self._user_selections] = track_id self._data["track"][self._user_selections] = track_id
def assignTracks(self, tracks): def assignTracks(self, tracks):
"""assignTracks _summary_
Parameters
----------
tracks : _type_
_description_
Returns
-------
_type_
_description_
"""
if len(tracks) != self.numDetections: if len(tracks) != self.numDetections:
logging.error("DataController: Size of passed tracks does not match data!") logging.error("DataController: Size of passed tracks does not match data!")
return return
@ -87,8 +114,54 @@ class TrackingData(QObject):
return self._data["keypoints"][0].shape[0] return self._data["keypoints"][0].shape[0]
def coordinates(self): def coordinates(self):
"""
Returns the coordinates of all keypoints as a NumPy array.
Returns:
np.ndarray: A NumPy array of shape (N, M, 2) where N is the number of detections,
and M is number of keypoints
"""
return np.stack(self._data["keypoints"]).astype(np.float32) return np.stack(self._data["keypoints"]).astype(np.float32)
def keypointScores(self):
"""
Returns the keypoint scores as a NumPy array of type float32.
Returns
-------
numpy.ndarray
A NumPy array of type float32 containing the keypoint scores of the shape (N, M)
with N the number of detections and M the number of keypoints.
"""
return np.stack(self._data["keypoint_score"]).astype(np.float32)
def centerOfGravity(self, threshold=0.8):
"""
Calculate the center of gravity of keypoints weighted by their scores. Ignores keypoints that have a score
less than threshold.
Parameters:
-----------
threshold: float
keypoints with a score less than threshold are ignored
Returns:
--------
np.ndarray:
A NumPy array of shape (N, 2) containing the center of gravity for each detection.
"""
scores = self.keypointScores()
scores[scores < threshold] = 0.0
weighted_coords = self.coordinates() * scores[:, :, np.newaxis]
sum_scores = np.sum(scores, axis=1, keepdims=True)
center_of_gravity = np.sum(weighted_coords, axis=1) / sum_scores
return center_of_gravity
def __getitem__(self, key):
return self._data[key]
# def __setitem__(self, key, value):
# self._data[key] = value
""" """
self._data.setSelectionRange("index", 0, self._data.numDetections) self._data.setSelectionRange("index", 0, self._data.numDetections)
self._data.assignTracks(tracks) self._data.assignTracks(tracks)
@ -99,22 +172,65 @@ class TrackingData(QObject):
def main(): def main():
import pandas as pd import pandas as pd
from IPython import embed from IPython import embed
import matplotlib.pyplot as plt
from fixtracks.info import PACKAGE_ROOT from fixtracks.info import PACKAGE_ROOT
from scipy.spatial.distance import cdist
def as_dict(df:pd.DataFrame): def as_dict(df:pd.DataFrame):
d = {c: df[c].values for c in df.columns} d = {c: df[c].values for c in df.columns}
d["index"] = df.index.values d["index"] = df.index.values
return d return d
def neighborDistances(x, n=5, symmetric=True):
pad_shape = list(x.shape)
pad_shape[0] = 5
pad = np.zeros(pad_shape)
if symmetric:
padded_x = np.vstack((pad, x, pad))
else:
padded_x = np.vstack((pad, x))
dists = np.zeros((padded_x.shape[0], 2*n))
count = 0
r = range(-n, n+1) if symmetric else range(-n, 0)
for i in r:
if i == 0:
continue
shifted_x = np.roll(padded_x, i)
dists[:, count] = np.sqrt(np.sum((padded_x - shifted_x)**2, axis=1))
count += 1
return dists
datafile = PACKAGE_ROOT / "data/merged_small.pkl" datafile = PACKAGE_ROOT / "data/merged_small.pkl"
with open(datafile, "rb") as f: with open(datafile, "rb") as f:
df = pickle.load(f) df = pickle.load(f)
data = TrackingData() data = TrackingData()
data.setData(as_dict(df)) data.setData(as_dict(df))
all_cogs = data.centerOfGravity()
tracks = data["track"]
cogs = all_cogs[tracks==1]
all_dists = neighborDistances(cogs, 2, False)
plt.hist(all_dists[1:, 0], bins=1000)
print(np.percentile(all_dists[1:, 0], 99))
print(np.percentile(all_dists[1:, 0], 1))
plt.gca().set_xscale("log")
plt.gca().set_yscale("log")
# plt.hist(all_dists[1:, 1], bins=100)
plt.show()
# def compute_neighbor_distances(cogs, window=10):
# distances = []
# for i in range(len(cogs)):
# start = max(0, i - window)
# stop = min(len(cogs), i + window + 1)
# neighbors = cogs[start:stop]
# dists = cdist([cogs[i]], neighbors)[0]
# distances.append(dists)
# return distances
# print("estimating neighorhood distances")
# neighbor_distances = compute_neighbor_distances(cogs)
embed() embed()
pass
if __name__ == "__main__": if __name__ == "__main__":