fixtracks/fixtracks/util.py

327 lines
12 KiB
Python

import logging
import pandas as pd
import cv2 as cv
import time
import pickle
import numpy as np
from PySide6.QtCore import QRunnable, Signal, QObject, Slot
from IPython import embed
class ProducerSignals(QObject):
finished = Signal(bool)
error = Signal(str)
# start = pyqtSignal(float)
# running = pyqtSignal()
progress = Signal(float)
progress2 = Signal((str, float, float))
class ImageReader(QRunnable):
finished = Signal(bool)
def __init__(self, filename, frame=1000) -> None:
super().__init__()
self._filename = filename
self._framenumber = frame
self._signals = ProducerSignals()
self._frame = None
@Slot()
def run(self):
'''
Your code goes in this function
'''
logging.debug("ImageReader: trying to open file %s", self._filename)
cap = cv.VideoCapture(self._filename)
framecount = int(cap.get(cv.CAP_PROP_FRAME_COUNT))
if self._framenumber >= framecount:
logging.warning("ImageReader: desired frame number (%i) exceeds the frame count (%i)! Redefined to frame count." % (self._framenumber, framecount))
if not cap.isOpened():
logging.debug("ImageReader: failed to open file %s", self._filename)
self._signals.finished.emit(False)
fn = 0
while cap.isOpened() and fn < self._framenumber:
ret, frame = cap.read()
if not ret:
logging.warning("ImageReader: failed to read frame %i", fn)
self._signals.finished.emit(False)
break
fn += 1
self._frame = frame # cv.cvtColor(frame, cv.COLOR_BGR2RGB)
cap.release()
self._signals.finished.emit(True)
@property
def signals(self):
return self._signals
@property
def frame(self):
return self._frame
class DataFrameReader(QRunnable):
finished = Signal(bool)
def __init__(self, filename) -> None:
super().__init__()
self._filename = filename
self._signals = ProducerSignals()
self._dataframe = None
@Slot()
def run(self):
'''
Your code goes in this function
'''
logging.debug("DataFrameReader: trying to open file %s", self._filename)
self._dataframe = pd.read_csv(self._filename, sep=";", index_col=0)
self._signals.finished.emit(True)
@property
def signals(self):
return self._signals
@property
def dataframe(self):
return self._dataframe
class Merger(QRunnable):
def __init__(self, left_data, right_data, left_cut, right_cut) -> None:
super().__init__()
self._signals = ProducerSignals()
self._left_data = left_data
self._right_data = right_data
self._left_cut = left_cut
self._right_cut = right_cut
self._result = None
self._stopRequest = False
self._merged = None
self._current_task = ""
self._mergeprogress = 0.0
for df in [self._left_data, self._right_data]:
if not self.check_dataframe(df):
self.signals.error.emit("Merger.__init__: Error checking DataFrame structure!")
def check_dataframe(self, df):
"""Perform some sanity checks on the dataframe.
Parameters
----------
df : pandas.DataFrame
the DataFrame conataining the detections of the left or right camera.
Returns
-------
bool
True, if everything's all right, False otherwise.
"""
return True
def to_numpy(self, df):
"""Convert some columns of the DataFrame to numpy arrays.
Parameters
----------
df : pandas.DataFrame
The DataFrame containing the detections.
Returns
-------
numpy.ndarray
3D array containing the x,y coordinates of each detection in each frame. Shape (num_detections, num_keypoints, 2)
numpy.ndarray
2D array with visibility score for each of the keypoints in each frame. Shape (num_detections, num_keypoints)
numpy.ndarray
2D array, Coordinates of the bounding box for each detection. Shape: (num_detections, 4) x1, y1, x2, y2
"""
logging.info("Converting to numpy ...")
key_columns = [c for c in df.columns if "key_" in c]
box_columns = [c for c in df.columns if "box_" in c]
num_frames = len(df)
num_keypoints = len(key_columns)
dimensions = 2
keypoints = np.empty((num_frames, num_keypoints, dimensions))
visibility = np.empty((num_frames, num_keypoints))
boxcoordinates = np.empty((num_frames, 4))
for i, row in df.iterrows():
for j, k in enumerate(key_columns):
key_data = row[k]
l = list(map(float, list(key_data[1:-1].split(","))))
keypoints[i, j, :] = l
for j, b in enumerate(box_columns):
boxcoordinates[i, j] = row[b]
if isinstance(row["visible"], str):
vis = list(map(float, row["visible"][1:-1].split()))
visibility[i, :] = vis
else:
visibility[i, :] = row["visible"]
logging.info("Converting to numpy done!")
return keypoints, visibility, boxcoordinates
def sort_detections(self, keypoints, threshold, left=True):
"""Categorize the detections into those that are easy (not in the visual overlap zone) and those that are tricky, i.e. right across the threshold.
Detections beyond threshold are ignored, those across the threshold need to be treated separately.
Parameters
----------
keypoints : np.ndarray
3d array of keypoint coordinates (num detections, num keypoints, (x,y))
threshold : int
the threshold line at which the data should be merged
left : bool, optional
whether or not the data is from the left side, controls how the threshold is interpreted, by default True
Returns
-------
np.ndarray
The indices of the easy detections
np.ndarray
The tricky detections
"""
logging.info("Sorting detections")
if left:
easyindeces = np.where(np.all(keypoints[:,:,0] < threshold, axis=1))[0]
trickyindices = np.where(np.any((keypoints[:,:,0] >= threshold) &
(keypoints[:,:,0] < threshold), axis=1))[0]
else:
easyindeces = np.where(np.all(keypoints[:,:,0] >= threshold, axis=1))[0]
trickyindices = np.where(np.any((keypoints[:,:,0] < threshold) &
(keypoints[:,:,0] >= threshold), axis=1))[0]
return easyindeces, trickyindices
def select_and_transform(self, df, keypoints, boxes, quality, frames, valid_detections,
left_threshold=None, right_threshold=None):
keypoints = keypoints[valid_detections, :, :]
boxes = boxes[valid_detections, :]
quality = quality[valid_detections, :]
frames = frames[valid_detections]
df = df.iloc[valid_detections]
if all([left_threshold, right_threshold]):
keypoints[:, :, 0] += (left_threshold - right_threshold)
boxes[:, [0, 2]] += (left_threshold - right_threshold)
return df, keypoints, quality, boxes, frames
def to_dataframe(self, old_left, old_right, lkeypoints, rkeypoints, lboxes, rboxes,
lqualities, rqualities, lframes, rframes):
frames = np.concatenate([lframes, rframes])
sorting = np.argsort(frames)
frames = frames[sorting]
confidences = np.concatenate([old_left.confidence.values, old_right.confidence.values])
confidences = confidences[sorting]
classes = np.concatenate([old_left.cls.values, old_right.cls.values])
classes = classes[sorting]
names = np.concatenate([old_left.name.values, old_right.name.values])
names = names[sorting]
keypoints = np.concatenate([lkeypoints, rkeypoints], axis=0)
keypoints = keypoints[sorting, :, :]
boxes = np.concatenate([lboxes, rboxes], axis=0)
boxes = boxes[sorting, :]
qualities = np.concatenate([lqualities, rqualities], axis=0)
qualities = qualities[sorting, :]
tracks = np.concatenate([old_left.track_id.values, old_right.track_id.values], axis=0)
tracks = tracks[sorting]
# sort before converting to df
q = []; b=[]; k = []
for i in range(len(frames)):
q.append(qualities[i, :])
b.append(boxes[i, :])
k.append(keypoints[i, :])
d = {"frame":frames, "cls": classes, "name":names, "keypoint_score": q, "track": tracks,
"keypoints": k, "box":b, "confidence":confidences}
df = pd.DataFrame(d)
return df
def save(self, filename):
if self._merged is None:
logging.error("Saving/pickling merged dataFrame is None!")
return
logging.info("Saving/pickling merged file to %s" % filename)
with open(filename, 'wb') as f:
pickle.dump(self._merged, f)
@Slot()
def stop_request(self):
self._stopRequest = True
@Slot()
def run(self):
logging.info("Cutting left detections to limit %i", self._left_cut)
self.signals.progress.emit(0.0)
self.signals.progress2.emit("Merging", self._mergeprogress, 0.)
if not self.check_dataframe(self._left_data) or not self.check_dataframe(self._right_data):
logging.error("Left or right dataframe structure does not match my expectations")
return None
self.signals.progress.emit(0.05)
if not self._stopRequest:
logging.info("Converting to numpy... %s", "Left camera")
lkeypoints, lquality, lbox = self.to_numpy(self._left_data)
lframes = self._left_data.frame.values
self.signals.progress.emit(0.3)
else:
self.signals.finished(False)
return
if not self._stopRequest:
logging.info("Converting to numpy... %s", "Right camera")
rkeypoints, rquality, rbox = self.to_numpy(self._right_data)
rframes = self._right_data.frame.values
self.signals.progress.emit(0.6)
else:
self.signals.finished(False)
return
logging.info("Filtering detections")
left_easy, _ = self.sort_detections(lkeypoints, self._left_cut, left=True)
right_easy, _ = self.sort_detections(rkeypoints, self._right_cut, left=False)
self.signals.progress.emit(0.7)
logging.info("Merging and transformation")
ldf, lkeypoints, lquality, lboxes, lframes = self.select_and_transform(self._left_data, lkeypoints, lbox,
lquality, lframes, left_easy)
self.signals.progress.emit(0.8)
rdf, rkeypoints, rquality, rboxes, rframes = self.select_and_transform(self._right_data, rkeypoints, rbox,
rquality, rframes, right_easy,
self._left_cut, self._right_cut)
self.signals.progress.emit(0.9)
if not self._stopRequest:
self._merged = self.to_dataframe(ldf, rdf, lkeypoints, rkeypoints, lboxes, rboxes, lquality, rquality,
lframes, rframes)
self.signals.progress.emit(1.0)
else:
self.signals.finished(False)
return
logging.info("Merging done!")
self._signals.finished.emit(True and (not self._stopRequest))
@property
def signals(self):
return self._signals
@property
def result(self):
return self._result
# TEST code
def main():
logging.info("Loading data left")
left = pd.read_csv("../data/left_tracks.csv", sep=";", index_col=0)
logging.info("Loading data right")
right = pd.read_csv("../data/right_tracks.csv", sep=";", index_col=0)
# merge_detections(left, right, 2000, 300)
merger = Merger(left, right, 2000, 300 )
merger.run()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, force=True)
main()