From 430ee4fac7e62a73ebd3daa634792e397a74e10d Mon Sep 17 00:00:00 2001 From: Jan Grewe Date: Tue, 25 Feb 2025 18:45:37 +0100 Subject: [PATCH] [classifier] working but not really... --- fixtracks/widgets/classifier.py | 250 +++++++++++++++++++------------- 1 file changed, 151 insertions(+), 99 deletions(-) diff --git a/fixtracks/widgets/classifier.py b/fixtracks/widgets/classifier.py index c107d6f..406d114 100644 --- a/fixtracks/widgets/classifier.py +++ b/fixtracks/widgets/classifier.py @@ -12,6 +12,17 @@ from fixtracks.utils.trackingdata import TrackingData from IPython import embed +class Detection(): + def __init__(self, id, frame, track, position, orientation, length, userlabeled): + self.id = id + self.frame = frame + self.track = track + self.position = position + self.score = 0.0 + self.angle = orientation + self.length = length + self.userlabeled = userlabeled + class WorkerSignals(QObject): error = Signal(str) running = Signal(bool) @@ -24,7 +35,8 @@ class ConsitencyDataLoader(QRunnable): super().__init__() self.signals = WorkerSignals() self.data = data - self.bendedness = self.positions = None + self.bendedness = None + self.positions = None self.lengths = None self.orientations = None self.userlabeled = None @@ -70,6 +82,18 @@ class ConsistencyWorker(QRunnable): @Slot() def run(self): + def get_detections(frame, indices): + detections = [] + for i in indices: + if np.any(self.positions[i] < 0.1): + logging.debug("Encountered probably invalid position %s", str(self.positions[i])) + continue + d = Detection(i, frame, self.tracks[i], self.positions[i], + self.orientations[i], self.lengths[i], + self.userlabeled[i]) + detections.append(d) + return detections + def needs_checking(original, new): res = False for n, o in zip(new, original): @@ -82,112 +106,135 @@ class ConsistencyWorker(QRunnable): print("all detections would be assigned to one track!") return res - def assign_by_distance(f, p): - t1_step = f - last_frame[0] - t2_step = f - last_frame[1] + def assign_by_distance(d): + t1_step = d.frame - last_detections[1].frame + t2_step = d.frame - last_detections[2].frame if t1_step == 0 or t2_step == 0: - print(f"framecount is zero! current frame {f}, last frame {last_frame[0]} and {last_frame[1]}") - - distance_to_trackone = np.linalg.norm(p - last_pos[0])/t1_step - distance_to_tracktwo = np.linalg.norm(p - last_pos[1])/t2_step + print(f"framecount is zero! current frame {f}, last frame {last_detections[1].frame} and {last_detections[2].frame}") + distance_to_trackone = np.linalg.norm(d.position - last_detections[1].position)/t1_step + distance_to_tracktwo = np.linalg.norm(d.position - last_detections[2].position)/t2_step most_likely_track = np.argmin([distance_to_trackone, distance_to_tracktwo]) + 1 distances = np.zeros(2) distances[0] = distance_to_trackone distances[1] = distance_to_tracktwo return most_likely_track, distances - def assign_by_orientation(f, o): - t1_step = f - last_frame[0] - t2_step = f - last_frame[1] - orientationchange = (last_angle - o) - orientationchange[orientationchange > 180] = 360 - orientationchange[orientationchange > 180] - orientationchange /= np.array([t1_step, t2_step]) - # orientationchange = np.abs(np.unwrap((last_angle - o)/np.array([t1_step, t2_step]))) - most_likely_track = np.argmin(np.abs(orientationchange)) + 1 - return most_likely_track, orientationchange - - def assign_by_length(o): - length_difference = np.abs((last_length - o)) - most_likely_track = np.argmin(length_difference) + 1 - return most_likely_track, length_difference - - def do_assignment(f, indices, assignments): - for i, idx in enumerate(indices): - self.tracks[idx] = assignments[i] - last_pos[assignments[i]-1] = pp[i] - last_frame[assignments[i]-1] = f - last_angle[assignments[i]-1] = self.orientations[idx] - last_length[assignments[i]-1] += ((self.lengths[idx] - last_length[assignments[i]-1])/processed) - - # self.userlabeled - - - - - last_pos = [self.positions[(self.tracks == 1) & (self.frames <= self._startframe)][-1], - self.positions[(self.tracks == 2) & (self.frames <= self._startframe)][-1]] - last_frame = [self.frames[(self.tracks == 1) & (self.frames <= self._startframe)][-1], - self.frames[(self.tracks == 2) & (self.frames <= self._startframe)][-1]] - last_angle = [self.orientations[(self.tracks == 1) & (self.frames <= self._startframe)][-1], - self.orientations[(self.tracks == 2) & (self.frames <= self._startframe)][-1]] - last_length = [self.lengths[(self.tracks == 1) & (self.frames <= self._startframe)][-1], - self.lengths[(self.tracks == 2) & (self.frames <= self._startframe)][-1]] - + def assign_by_orientation(d): + t1_step = d.frame - last_detections[1].frame + t2_step = d.frame - last_detections[2].frame + orientationchanges = np.zeros(2) + for i in [1, 2]: + orientationchanges[i-1] = (last_detections[i].angle - d.angle) + + orientationchanges[orientationchanges > 180] = 360 - orientationchanges[orientationchanges > 180] + orientationchanges /= np.array([t1_step, t2_step]) + most_likely_track = np.argmin(np.abs(orientationchanges)) + 1 + return most_likely_track, orientationchanges + + def assign_by_length(d): + length_differences = np.zeros(2) + length_differences[0] = np.abs((last_detections[1].length - d.length)) + length_differences[1] = np.abs((last_detections[2].length - d.length)) + most_likely_track = np.argmin(length_differences) + 1 + return most_likely_track, length_differences + + unique_frames = np.unique(self.frames) + steps = int((len(unique_frames) - self._startframe) // 100) errors = 0 processed = 1 progress = 0 self._stoprequest = False - maxframes = np.max(self.frames) - startframe = np.max(last_frame) - steps = int((maxframes - startframe) // 200) + last_detections = {1: None, 2: None, -1: None} - for f in np.unique(self.frames[self.frames > startframe]): - processed += 1 - self.signals.currentframe.emit(f) + for f in unique_frames[unique_frames >= self._startframe]: if self._stoprequest: break + error = False + self.signals.currentframe.emit(f) indices = np.where(self.frames == f)[0] - pp = self.positions[indices] - originaltracks = self.tracks[indices] - dist_assignments = np.zeros_like(originaltracks) - angle_assignments = np.zeros_like(originaltracks) - length_assignments = np.zeros_like(originaltracks) - userlabeled = np.zeros_like(originaltracks) - distances = np.zeros((len(originaltracks), 2)) + detections = get_detections(f, indices) + done = [False, False] + if len(detections) == 0: + continue + + if len(detections) > 1 and np.any([detections[0].userlabeled, detections[1].userlabeled]): + # more than one detection + if detections[0].userlabeled and detections[1].userlabeled: + if detections[0].track == detections[1].track: + error = True + logging.info("Classification error both detections in the same frame are assigned to the same track!") + elif detections[0].userlabeled and not detections[1].userlabeled: + detections[1].track = 1 if detections[0].track == 2 else 2 + elif not detections[0].userlabeled and detections[1].userlabeled: + detections[0].track = 1 if detections[1].track == 2 else 2 + + if not error: + last_detections[detections[0].track] = detections[0] + last_detections[detections[1].track] = detections[1] + self.tracks[detections[0].id] = detections[0].track + self.tracks[detections[1].id] = detections[1].track + done[0] = True + done[1] = True + elif len(detections) == 1 and detections[0].userlabeled: # ony one detection and labeled + last_detections[detections[0].track] = detections[0] + done[0] = True + + if np.sum(done) == len(detections): + continue + # if f == 2088: + # embed() + # return + if error and self._stoponerror: + self.signals.error.emit("Classification error both detections in the same frame are assigned to the same track!") + break + dist_assignments = np.zeros(2, dtype=int) + orientation_assignments = np.zeros_like(dist_assignments) + length_assignments = np.zeros_like(dist_assignments) + distances = np.zeros((2, 2)) orientations = np.zeros_like(distances) lengths = np.zeros_like(distances) + assignments = np.zeros((2, 2)) + for i, d in enumerate(detections): + dist_assignments[i], distances[i, :] = assign_by_distance(d) + orientation_assignments[i], orientations[i,:] = assign_by_orientation(d) + length_assignments[i], lengths[i, :] = assign_by_length(d) + assignments[i, :] = dist_assignments # (dist_assignments * 10 + orientation_assignments + length_assignments) / 3 + + diffs = np.diff(assignments, axis=1) + error = False + temp = {} + message = "" + for i, d in enumerate(detections): + temp = {} + if diffs[i] == 0: # both are equally likely + d.track = -1 + error = True + message = "Classification error both detections in the same frame are assigned to the same track!" + break + if diffs[i] < 0: + d.track = 1 + else: + d.track = 2 + self.tracks[d.id] = d.track + if d.track not in temp: + temp[d.track] = d + else: + error = True + message = "Double assignment to the same track!" + break - for i, (idx, p) in enumerate(zip(indices, pp)): - if self.userlabeled[idx]: - print("user") - userlabeled[i] = True - last_pos[originaltracks[i]-1] = pp[i] - last_frame[originaltracks[i]-1] = f - last_angle[originaltracks[i]-1] = self.orientations[idx] - last_length[originaltracks[i]-1] += ((self.lengths[idx] - last_length[originaltracks[i]-1]) / processed) - continue - dist_assignments[i], distances[i, :] = assign_by_distance(f, p) - angle_assignments[i], orientations[i,:] = assign_by_orientation(f, self.orientations[idx]) - length_assignments[i], lengths[i, :] = assign_by_length(self.lengths[idx]) - if np.any(userlabeled): - continue - # check (re) assignment, update, and proceed - if not needs_checking(originaltracks, dist_assignments): - do_assignment(f, indices, dist_assignments) + if not error: + for k in temp: + last_detections[temp[k].track] = temp[k] else: - if not (np.all(length_assignments == 1) or np.all(length_assignments == 2)): # if I find a solution by body length - logging.debug("frame %i: Decision based on body length", f) - do_assignment(f, indices, length_assignments) - elif not (np.all(angle_assignments == 1) or np.all(angle_assignments == 2)): # else there is a solution based on orientation - logging.info("frame %i: Decision based on orientation", f) - do_assignment(f, indices, angle_assignments) - else: - logging.info("frame %i: Cannot decide who is who") - for idx in indices: - self.tracks[idx] = -1 - errors += 1 - if self._stoponerror: - break + logging.info("frame %i: Cannot decide who is who! %s", f, message) + for idx in indices: + self.tracks[idx] = -1 + errors += 1 + if self._stoponerror: + self.signals.error.emit(message) + break + processed += 1 if steps > 0 and f % steps == 0: progress += 1 @@ -486,18 +533,25 @@ class ConsistencyClassifier(QWidget): self._all_scores = self._dataworker.scores self._frames = self._dataworker.frames self._tracks = self._dataworker.tracks + self._dataworker = None + if np.sum(self._userlabeled) < 1: + logging.error("ConsistencyTracker: I need at least 1 user-labeled frame to start with!") + self.setEnabled(False) + else: + t1_userlabeled = self._frames[self._userlabeled & (self._tracks == 1)] + t2_userlabeled = self._frames[self._userlabeled & (self._tracks == 2)] + max_startframe = np.min([t1_userlabeled[-1], t2_userlabeled[-1]]) + min_startframe = np.max([t1_userlabeled[0], t2_userlabeled[0]]) self._maxframes = np.max(self._frames) - # FIXME the following line causes an error when there are no detections in the range - min_frame = max([self._frames[self._tracks == 1][0], self._frames[self._tracks == 2][0]]) + 1 self._maxframeslabel.setText(str(self._maxframes)) - self._startframe_spinner.setMinimum(min_frame) - self._startframe_spinner.setMaximum(self._frames[-1]) - self._startframe_spinner.setValue(self._frames[0] + 1) + self._startframe_spinner.setMinimum(min_startframe) + self._startframe_spinner.setMaximum(max_startframe) + self._startframe_spinner.setValue(min_startframe) + self._startframe_spinner.setSingleStep(20) self._startbtn.setEnabled(True) self._assignedlabel.setText("0") self._errorlabel.setText("0") - self._dataworker = None - self.setEnabled(True) + self.setEnabled(True) @Slot(float) def on_progress(self, value): @@ -612,16 +666,14 @@ def main(): import pickle from fixtracks.info import PACKAGE_ROOT - datafile = PACKAGE_ROOT / "data/merged2.pkl" + datafile = PACKAGE_ROOT / "data/merged_small_beginning.pkl" with open(datafile, "rb") as f: df = pickle.load(f) - data = TrackingData() - data.setData(as_dict(df)) + data = TrackingData(as_dict(df)) coords = data.coordinates() cogs = data.centerOfGravity() userlabeled = data["userlabeled"] - embed() app = QApplication([]) window = QWidget() window.setMinimumSize(200, 200)