[classifier] working but not really...

Jan Grewe 2025-02-25 18:45:37 +01:00
parent f1a4f4dc84
commit 430ee4fac7


@@ -12,6 +12,17 @@ from fixtracks.utils.trackingdata import TrackingData
from IPython import embed
class Detection():
def __init__(self, id, frame, track, position, orientation, length, userlabeled):
self.id = id
self.frame = frame
self.track = track
self.position = position
self.score = 0.0
self.angle = orientation
self.length = length
self.userlabeled = userlabeled
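The new Detection class is a plain record of per-detection state. As a side note, Python's dataclasses would express the same container more compactly; a minimal sketch, with field meanings inferred from the diff (the comments are assumptions, not documented semantics):

from dataclasses import dataclass
import numpy as np

@dataclass
class Detection:
    id: int                # index into the flat per-detection arrays
    frame: int             # frame number the detection belongs to
    track: int             # track id, 1 or 2 (-1 once marked undecidable)
    position: np.ndarray   # (x, y) position, presumably the center of gravity
    angle: float           # orientation, in degrees judging by the 180/360 wrapping below
    length: float          # body-length estimate
    userlabeled: bool      # True if a human confirmed this assignment
    score: float = 0.0     # filled in later, defaults to 0.0 as in the diff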
class WorkerSignals(QObject):
error = Signal(str)
running = Signal(bool)
@@ -24,7 +35,8 @@ class ConsitencyDataLoader(QRunnable):
super().__init__()
self.signals = WorkerSignals()
self.data = data
self.bendedness = self.positions = None
self.bendedness = None
self.positions = None
self.lengths = None
self.orientations = None
self.userlabeled = None
@@ -70,6 +82,18 @@ class ConsistencyWorker(QRunnable):
@Slot()
def run(self):
def get_detections(frame, indices):
detections = []
for i in indices:
if np.any(self.positions[i] < 0.1):
logging.debug("Encountered probably invalid position %s", str(self.positions[i]))
continue
d = Detection(i, frame, self.tracks[i], self.positions[i],
self.orientations[i], self.lengths[i],
self.userlabeled[i])
detections.append(d)
return detections
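get_detections silently drops detections whose coordinates sit (almost) at the origin, treating them as tracker glitches. A standalone sketch of that filter, with the 0.1 threshold taken from the diff and the sample coordinates invented:

import numpy as np

def is_probably_invalid(position, threshold=0.1):
    # coordinates collapsing toward zero usually mean the tracker lost the target
    return bool(np.any(np.asarray(position) < threshold))

assert is_probably_invalid((0.0, 523.4))        # x stuck at the origin
assert not is_probably_invalid((311.2, 523.4))  # plausible position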
def needs_checking(original, new):
res = False
for n, o in zip(new, original):
@@ -82,112 +106,135 @@ class ConsistencyWorker(QRunnable):
print("all detections would be assigned to one track!")
return res
def assign_by_distance(f, p):
t1_step = f - last_frame[0]
t2_step = f - last_frame[1]
def assign_by_distance(d):
t1_step = d.frame - last_detections[1].frame
t2_step = d.frame - last_detections[2].frame
if t1_step == 0 or t2_step == 0:
print(f"framecount is zero! current frame {f}, last frame {last_frame[0]} and {last_frame[1]}")
distance_to_trackone = np.linalg.norm(p - last_pos[0])/t1_step
distance_to_tracktwo = np.linalg.norm(p - last_pos[1])/t2_step
print(f"framecount is zero! current frame {f}, last frame {last_detections[1].frame} and {last_detections[2].frame}")
distance_to_trackone = np.linalg.norm(d.position - last_detections[1].position)/t1_step
distance_to_tracktwo = np.linalg.norm(d.position - last_detections[2].position)/t2_step
most_likely_track = np.argmin([distance_to_trackone, distance_to_tracktwo]) + 1
distances = np.zeros(2)
distances[0] = distance_to_trackone
distances[1] = distance_to_tracktwo
return most_likely_track, distances
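assign_by_distance normalizes the displacement by the number of elapsed frames, so a track that has not been seen for a while is not penalized for the larger gap; the detection goes to whichever track it drifted away from most slowly. A condensed sketch of the idea, with invented coordinates:

import numpy as np

def distance_rate(pos, frame, last_pos, last_frame):
    # displacement per elapsed frame; the caller must ensure frame != last_frame
    return np.linalg.norm(np.asarray(pos) - np.asarray(last_pos)) / (frame - last_frame)

rates = [distance_rate((10.0, 12.0), 5, (9.5, 11.0), 3),   # candidate: track 1
         distance_rate((10.0, 12.0), 5, (40.0, 2.0), 4)]   # candidate: track 2
most_likely_track = int(np.argmin(rates)) + 1              # -> 1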
def assign_by_orientation(f, o):
t1_step = f - last_frame[0]
t2_step = f - last_frame[1]
orientationchange = (last_angle - o)
orientationchange[orientationchange > 180] = 360 - orientationchange[orientationchange > 180]
orientationchange /= np.array([t1_step, t2_step])
# orientationchange = np.abs(np.unwrap((last_angle - o)/np.array([t1_step, t2_step])))
most_likely_track = np.argmin(np.abs(orientationchange)) + 1
return most_likely_track, orientationchange
def assign_by_length(o):
length_difference = np.abs((last_length - o))
most_likely_track = np.argmin(length_difference) + 1
return most_likely_track, length_difference
def do_assignment(f, indices, assignments):
for i, idx in enumerate(indices):
self.tracks[idx] = assignments[i]
last_pos[assignments[i]-1] = pp[i]
last_frame[assignments[i]-1] = f
last_angle[assignments[i]-1] = self.orientations[idx]
last_length[assignments[i]-1] += ((self.lengths[idx] - last_length[assignments[i]-1])/processed)
# self.userlabeled
last_pos = [self.positions[(self.tracks == 1) & (self.frames <= self._startframe)][-1],
self.positions[(self.tracks == 2) & (self.frames <= self._startframe)][-1]]
last_frame = [self.frames[(self.tracks == 1) & (self.frames <= self._startframe)][-1],
self.frames[(self.tracks == 2) & (self.frames <= self._startframe)][-1]]
last_angle = [self.orientations[(self.tracks == 1) & (self.frames <= self._startframe)][-1],
self.orientations[(self.tracks == 2) & (self.frames <= self._startframe)][-1]]
last_length = [self.lengths[(self.tracks == 1) & (self.frames <= self._startframe)][-1],
self.lengths[(self.tracks == 2) & (self.frames <= self._startframe)][-1]]
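The removed initialization seeds each track's last-known state by masking for detections of that track at or before the start frame and taking the newest one. A sketch of that lookup, including the failure mode the FIXME further down in this commit points at:

import numpy as np

def last_value_before(values, tracks, frames, track_id, startframe):
    # newest entry for track_id at or before startframe; raises IndexError
    # if the track has no detection in that range (the FIXME case)
    mask = (tracks == track_id) & (frames <= startframe)
    return values[mask][-1]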
def assign_by_orientation(d):
t1_step = d.frame - last_detections[1].frame
t2_step = d.frame - last_detections[2].frame
orientationchanges = np.zeros(2)
for i in [1, 2]:
orientationchanges[i-1] = (last_detections[i].angle - d.angle)
orientationchanges[orientationchanges > 180] = 360 - orientationchanges[orientationchanges > 180]
orientationchanges /= np.array([t1_step, t2_step])
most_likely_track = np.argmin(np.abs(orientationchanges)) + 1
return most_likely_track, orientationchanges
def assign_by_length(d):
length_differences = np.zeros(2)
length_differences[0] = np.abs((last_detections[1].length - d.length))
length_differences[1] = np.abs((last_detections[2].length - d.length))
most_likely_track = np.argmin(length_differences) + 1
return most_likely_track, length_differences
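All three assigners share one contract: compute one cost per candidate track (distance rate, orientation change, or length difference) and return np.argmin(cost) + 1 together with the raw costs. A toy illustration of that contract, with made-up costs:

import numpy as np

def vote(costs):
    # costs[k] is the cost of putting the detection on track k + 1
    costs = np.asarray(costs, dtype=float)
    return int(np.argmin(costs)) + 1, costs

track, costs = vote([0.4, 2.6])   # -> (1, array([0.4, 2.6]))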
unique_frames = np.unique(self.frames)
steps = int((len(unique_frames) - self._startframe) // 100)
errors = 0
processed = 1
progress = 0
self._stoprequest = False
maxframes = np.max(self.frames)
startframe = np.max(last_frame)
steps = int((maxframes - startframe) // 200)
last_detections = {1: None, 2: None, -1: None}
for f in np.unique(self.frames[self.frames > startframe]):
processed += 1
self.signals.currentframe.emit(f)
for f in unique_frames[unique_frames >= self._startframe]:
if self._stoprequest:
break
error = False
self.signals.currentframe.emit(f)
indices = np.where(self.frames == f)[0]
pp = self.positions[indices]
originaltracks = self.tracks[indices]
dist_assignments = np.zeros_like(originaltracks)
angle_assignments = np.zeros_like(originaltracks)
length_assignments = np.zeros_like(originaltracks)
userlabeled = np.zeros_like(originaltracks)
distances = np.zeros((len(originaltracks), 2))
detections = get_detections(f, indices)
done = [False, False]
if len(detections) == 0:
continue
if len(detections) > 1 and np.any([detections[0].userlabeled, detections[1].userlabeled]):
# more than one detection
if detections[0].userlabeled and detections[1].userlabeled:
if detections[0].track == detections[1].track:
error = True
logging.info("Classification error both detections in the same frame are assigned to the same track!")
elif detections[0].userlabeled and not detections[1].userlabeled:
detections[1].track = 1 if detections[0].track == 2 else 2
elif not detections[0].userlabeled and detections[1].userlabeled:
detections[0].track = 1 if detections[1].track == 2 else 2
if not error:
last_detections[detections[0].track] = detections[0]
last_detections[detections[1].track] = detections[1]
self.tracks[detections[0].id] = detections[0].track
self.tracks[detections[1].id] = detections[1].track
done[0] = True
done[1] = True
elif len(detections) == 1 and detections[0].userlabeled: # only one detection and it is user-labeled
last_detections[detections[0].track] = detections[0]
done[0] = True
if np.sum(done) == len(detections):
continue
# if f == 2088:
# embed()
# return
if error and self._stoponerror:
self.signals.error.emit("Classification error both detections in the same frame are assigned to the same track!")
break
dist_assignments = np.zeros(2, dtype=int)
orientation_assignments = np.zeros_like(dist_assignments)
length_assignments = np.zeros_like(dist_assignments)
distances = np.zeros((2, 2))
orientations = np.zeros_like(distances)
lengths = np.zeros_like(distances)
assignments = np.zeros((2, 2))
for i, d in enumerate(detections):
dist_assignments[i], distances[i, :] = assign_by_distance(d)
orientation_assignments[i], orientations[i,:] = assign_by_orientation(d)
length_assignments[i], lengths[i, :] = assign_by_length(d)
assignments[i, :] = dist_assignments # (dist_assignments * 10 + orientation_assignments + length_assignments) / 3
diffs = np.diff(assignments, axis=1)
error = False
temp = {}
message = ""
for i, d in enumerate(detections):
if diffs[i] == 0: # both detections were assigned to the same track
d.track = -1
error = True
message = "Classification error both detections in the same frame are assigned to the same track!"
break
if diffs[i] < 0:
d.track = 1
else:
d.track = 2
self.tracks[d.id] = d.track
if d.track not in temp:
temp[d.track] = d
else:
error = True
message = "Double assignment to the same track!"
break
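The temp dict acts as a per-frame uniqueness guard: each track may claim at most one detection, and a second claim is flagged as a classification error. Reduced to its core, the guard looks like this:

def claim(claimed, detection):
    # False means the track was already taken in this frame -> error
    if detection.track in claimed:
        return False
    claimed[detection.track] = detection
    return True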
for i, (idx, p) in enumerate(zip(indices, pp)):
if self.userlabeled[idx]:
print("user")
userlabeled[i] = True
last_pos[originaltracks[i]-1] = pp[i]
last_frame[originaltracks[i]-1] = f
last_angle[originaltracks[i]-1] = self.orientations[idx]
last_length[originaltracks[i]-1] += ((self.lengths[idx] - last_length[originaltracks[i]-1]) / processed)
continue
dist_assignments[i], distances[i, :] = assign_by_distance(f, p)
angle_assignments[i], orientations[i,:] = assign_by_orientation(f, self.orientations[idx])
length_assignments[i], lengths[i, :] = assign_by_length(self.lengths[idx])
if np.any(userlabeled):
continue
# check (re) assignment, update, and proceed
if not needs_checking(originaltracks, dist_assignments):
do_assignment(f, indices, dist_assignments)
if not error:
for k in temp:
last_detections[temp[k].track] = temp[k]
else:
if not (np.all(length_assignments == 1) or np.all(length_assignments == 2)): # if I find a solution by body length
logging.debug("frame %i: Decision based on body length", f)
do_assignment(f, indices, length_assignments)
elif not (np.all(angle_assignments == 1) or np.all(angle_assignments == 2)): # else there is a solution based on orientation
logging.info("frame %i: Decision based on orientation", f)
do_assignment(f, indices, angle_assignments)
else:
logging.info("frame %i: Cannot decide who is who")
for idx in indices:
self.tracks[idx] = -1
errors += 1
if self._stoponerror:
break
logging.info("frame %i: Cannot decide who is who! %s", f, message)
for idx in indices:
self.tracks[idx] = -1
errors += 1
if self._stoponerror:
self.signals.error.emit(message)
break
processed += 1
if steps > 0 and f % steps == 0:
progress += 1
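Progress is reported in roughly one-percent steps rather than once per frame. Note that the diff keys the modulo on the frame number f, so gaps in the frame numbering can make updates irregular; a sketch of the same throttling pattern keyed on the loop index instead (emit stands in for the progress signal):

def process_frames(frames, emit, chunks=100):
    steps = max(1, len(frames) // chunks)
    for n, f in enumerate(frames):
        # ... per-frame assignment work ...
        if n % steps == 0:
            emit(100 * n / len(frames))   # about `chunks` updates in total

process_frames(list(range(1234)), print)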
@@ -486,18 +533,25 @@ class ConsistencyClassifier(QWidget):
self._all_scores = self._dataworker.scores
self._frames = self._dataworker.frames
self._tracks = self._dataworker.tracks
self._dataworker = None
if np.sum(self._userlabeled) < 1:
logging.error("ConsistencyTracker: I need at least 1 user-labeled frame to start with!")
self.setEnabled(False)
else:
t1_userlabeled = self._frames[self._userlabeled & (self._tracks == 1)]
t2_userlabeled = self._frames[self._userlabeled & (self._tracks == 2)]
max_startframe = np.min([t1_userlabeled[-1], t2_userlabeled[-1]])
min_startframe = np.max([t1_userlabeled[0], t2_userlabeled[0]])
self._maxframes = np.max(self._frames)
# FIXME the following line causes an error when there are no detections in the range
min_frame = max([self._frames[self._tracks == 1][0], self._frames[self._tracks == 2][0]]) + 1
self._maxframeslabel.setText(str(self._maxframes))
self._startframe_spinner.setMinimum(min_frame)
self._startframe_spinner.setMaximum(self._frames[-1])
self._startframe_spinner.setValue(self._frames[0] + 1)
self._startframe_spinner.setMinimum(min_startframe)
self._startframe_spinner.setMaximum(max_startframe)
self._startframe_spinner.setValue(min_startframe)
self._startframe_spinner.setSingleStep(20)
self._startbtn.setEnabled(True)
self._assignedlabel.setText("0")
self._errorlabel.setText("0")
self._dataworker = None
self.setEnabled(True)
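The new spinner bounds encode the requirement that both tracks carry a user label at or before the chosen start frame: the minimum is the later of the two first labels, the maximum is the earlier of the two last labels. A standalone sketch of that computation:

import numpy as np

def startframe_bounds(frames, tracks, userlabeled):
    # frames of user-labeled detections, per track
    t1 = frames[userlabeled & (tracks == 1)]
    t2 = frames[userlabeled & (tracks == 2)]
    # both tracks need a label at or before the start frame
    return max(t1[0], t2[0]), min(t1[-1], t2[-1])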
@Slot(float)
def on_progress(self, value):
@@ -612,16 +666,14 @@ def main():
import pickle
from fixtracks.info import PACKAGE_ROOT
datafile = PACKAGE_ROOT / "data/merged2.pkl"
datafile = PACKAGE_ROOT / "data/merged_small_beginning.pkl"
with open(datafile, "rb") as f:
df = pickle.load(f)
data = TrackingData()
data.setData(as_dict(df))
data = TrackingData(as_dict(df))
coords = data.coordinates()
cogs = data.centerOfGravity()
userlabeled = data["userlabeled"]
embed()
app = QApplication([])
window = QWidget()
window.setMinimumSize(200, 200)