From 96e4b0b2c544aad0b7daa144b5ae14864b97b91c Mon Sep 17 00:00:00 2001 From: Jan Grewe Date: Mon, 10 Feb 2025 11:08:02 +0100 Subject: [PATCH] [trackingdata] improvements, some docstrings --- fixtracks/utils/trackingdata.py | 126 ++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 5 deletions(-) diff --git a/fixtracks/utils/trackingdata.py b/fixtracks/utils/trackingdata.py index 9853938..20d7183 100644 --- a/fixtracks/utils/trackingdata.py +++ b/fixtracks/utils/trackingdata.py @@ -1,10 +1,10 @@ import pickle import logging import numpy as np +import pandas as pd from PySide6.QtCore import QObject - class TrackingData(QObject): def __init__(self, parent=None): super().__init__(parent) @@ -62,12 +62,39 @@ class TrackingData(QObject): return self._data[col][self._indices] def setUserSelection(self, ids): + """ + Set the user selections. That is, e.g. when the user selected a number of ids. + Parameters + ---------- + ids : array-like + An array-like object containing the IDs to be set as user selections. + The IDs will be converted to integers. + """ self._user_selections = ids.astype(int) - def assignUserSelection(self, track_id): + def assignUserSelection(self, track_id:int)-> None: + """Assign a new track_id to the user-selected detections + + Parameters + ---------- + track_id : int + The new track id for the user-selected detections + """ self._data["track"][self._user_selections] = track_id def assignTracks(self, tracks): + """assignTracks _summary_ + + Parameters + ---------- + tracks : _type_ + _description_ + + Returns + ------- + _type_ + _description_ + """ if len(tracks) != self.numDetections: logging.error("DataController: Size of passed tracks does not match data!") return @@ -87,8 +114,54 @@ class TrackingData(QObject): return self._data["keypoints"][0].shape[0] def coordinates(self): - return np.stack(self._data["keypoints"]).astype(np.float32) + """ + Returns the coordinates of all keypoints as a NumPy array. + + Returns: + np.ndarray: A NumPy array of shape (N, M, 2) where N is the number of detections, + and M is number of keypoints + """ + return np.stack(self._data["keypoints"]).astype(np.float32) + def keypointScores(self): + """ + Returns the keypoint scores as a NumPy array of type float32. + + Returns + ------- + numpy.ndarray + A NumPy array of type float32 containing the keypoint scores of the shape (N, M) + with N the number of detections and M the number of keypoints. + """ + return np.stack(self._data["keypoint_score"]).astype(np.float32) + + def centerOfGravity(self, threshold=0.8): + """ + Calculate the center of gravity of keypoints weighted by their scores. Ignores keypoints that have a score + less than threshold. + + Parameters: + ----------- + threshold: float + keypoints with a score less than threshold are ignored + + Returns: + -------- + np.ndarray: + A NumPy array of shape (N, 2) containing the center of gravity for each detection. + """ + scores = self.keypointScores() + scores[scores < threshold] = 0.0 + weighted_coords = self.coordinates() * scores[:, :, np.newaxis] + sum_scores = np.sum(scores, axis=1, keepdims=True) + center_of_gravity = np.sum(weighted_coords, axis=1) / sum_scores + return center_of_gravity + + def __getitem__(self, key): + return self._data[key] + + # def __setitem__(self, key, value): + # self._data[key] = value """ self._data.setSelectionRange("index", 0, self._data.numDetections) self._data.assignTracks(tracks) @@ -99,22 +172,65 @@ class TrackingData(QObject): def main(): import pandas as pd from IPython import embed + import matplotlib.pyplot as plt from fixtracks.info import PACKAGE_ROOT + from scipy.spatial.distance import cdist def as_dict(df:pd.DataFrame): d = {c: df[c].values for c in df.columns} d["index"] = df.index.values return d + + def neighborDistances(x, n=5, symmetric=True): + pad_shape = list(x.shape) + pad_shape[0] = 5 + pad = np.zeros(pad_shape) + if symmetric: + padded_x = np.vstack((pad, x, pad)) + else: + padded_x = np.vstack((pad, x)) + dists = np.zeros((padded_x.shape[0], 2*n)) + count = 0 + r = range(-n, n+1) if symmetric else range(-n, 0) + for i in r: + if i == 0: + continue + shifted_x = np.roll(padded_x, i) + dists[:, count] = np.sqrt(np.sum((padded_x - shifted_x)**2, axis=1)) + count += 1 + return dists + datafile = PACKAGE_ROOT / "data/merged_small.pkl" with open(datafile, "rb") as f: df = pickle.load(f) data = TrackingData() data.setData(as_dict(df)) + all_cogs = data.centerOfGravity() + tracks = data["track"] + cogs = all_cogs[tracks==1] + all_dists = neighborDistances(cogs, 2, False) + plt.hist(all_dists[1:, 0], bins=1000) + print(np.percentile(all_dists[1:, 0], 99)) + print(np.percentile(all_dists[1:, 0], 1)) + plt.gca().set_xscale("log") + plt.gca().set_yscale("log") + # plt.hist(all_dists[1:, 1], bins=100) + plt.show() + # def compute_neighbor_distances(cogs, window=10): + # distances = [] + # for i in range(len(cogs)): + # start = max(0, i - window) + # stop = min(len(cogs), i + window + 1) + # neighbors = cogs[start:stop] + # dists = cdist([cogs[i]], neighbors)[0] + # distances.append(dists) + # return distances + # print("estimating neighorhood distances") + # neighbor_distances = compute_neighbor_distances(cogs) embed() - - pass + if __name__ == "__main__":