purge duplicates
This commit is contained in:
@@ -1,11 +1,15 @@
|
||||
import numpy as np
|
||||
from typing import List
|
||||
from typing import List, Union, Any
|
||||
|
||||
|
||||
def purge_duplicates(timestamps: List[float], threshold: float = 0.5) -> List[float]:
|
||||
def purge_duplicates(
|
||||
timestamps: List[float], threshold: float = 0.5
|
||||
) -> List[float]:
|
||||
"""
|
||||
Compute the mean of groups of timestamps that are closer to the previous or consecutive timestamp than the threshold,
|
||||
and return all timestamps that are further apart from the previous or consecutive timestamp than the threshold in a single list.
|
||||
Compute the mean of groups of timestamps that are closer to the previous
|
||||
or consecutive timestamp than the threshold, and return all timestamps that
|
||||
are further apart from the previous or consecutive timestamp than the
|
||||
threshold in a single list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
@@ -17,10 +21,12 @@ def purge_duplicates(timestamps: List[float], threshold: float = 0.5) -> List[fl
|
||||
Returns
|
||||
-------
|
||||
List[float]
|
||||
A list containing a list of timestamps that are further apart than the threshold
|
||||
and a list of means of the groups of timestamps that are closer to the previous or consecutive timestamp than the threshold.
|
||||
A list containing a list of timestamps that are further apart than
|
||||
the threshold and a list of means of the groups of timestamps that
|
||||
are closer to the previous or consecutive timestamp than the threshold.
|
||||
"""
|
||||
# Initialize an empty list to store the groups of timestamps that are closer to the previous or consecutive timestamp than the threshold
|
||||
# Initialize an empty list to store the groups of timestamps that are
|
||||
# closer to the previous or consecutive timestamp than the threshold
|
||||
groups = []
|
||||
|
||||
# initialize the first group with the first timestamp
|
||||
@@ -28,8 +34,9 @@ def purge_duplicates(timestamps: List[float], threshold: float = 0.5) -> List[fl
|
||||
|
||||
for i in range(1, len(timestamps)):
|
||||
|
||||
# check the difference between current timestamp and previous timestamp is less than the threshold
|
||||
if timestamps[i] - timestamps[i-1] < threshold:
|
||||
# check the difference between current timestamp and previous
|
||||
# timestamp is less than the threshold
|
||||
if timestamps[i] - timestamps[i - 1] < threshold:
|
||||
# add the current timestamp to the current group
|
||||
group.append(timestamps[i])
|
||||
else:
|
||||
@@ -40,22 +47,28 @@ def purge_duplicates(timestamps: List[float], threshold: float = 0.5) -> List[fl
|
||||
# start a new group with the current timestamp
|
||||
group = [timestamps[i]]
|
||||
|
||||
# after iterating through all the timestamps, add the last group to the groups list
|
||||
# after iterating through all the timestamps, add the last group to the
|
||||
# groups list
|
||||
groups.append(group)
|
||||
|
||||
# get the mean of each group and only include the ones that have more than 1 timestamp
|
||||
# get the mean of each group and only include the ones that have more
|
||||
# than 1 timestamp
|
||||
means = [np.mean(group) for group in groups if len(group) > 1]
|
||||
|
||||
# get the timestamps that are outliers, i.e. the ones that are alone in a group
|
||||
# get the timestamps that are outliers, i.e. the ones that are alone
|
||||
# in a group
|
||||
outliers = [ts for group in groups for ts in group if len(group) == 1]
|
||||
|
||||
# return the outliers and means in a single list
|
||||
return outliers + means
|
||||
|
||||
|
||||
def group_timestamps(sublists: List[List[float]], n: int, threshold: float) -> List[float]:
|
||||
def group_timestamps(
|
||||
sublists: List[List[float]], n: int, threshold: float
|
||||
) -> List[float]:
|
||||
"""
|
||||
Groups timestamps that are less than `threshold` milliseconds apart from at least `n` other sublists.
|
||||
Groups timestamps that are less than `threshold` milliseconds apart from
|
||||
at least `n` other sublists.
|
||||
Returns a list of the mean of each group.
|
||||
If any of the sublists is empty, it will be ignored.
|
||||
|
||||
@@ -64,9 +77,11 @@ def group_timestamps(sublists: List[List[float]], n: int, threshold: float) -> L
|
||||
sublists : List[List[float]]
|
||||
a list of sublists, each containing timestamps
|
||||
n : int
|
||||
minimum number of sublists that a timestamp must be close to in order to be grouped
|
||||
minimum number of sublists that a timestamp must be close to in order
|
||||
to be grouped
|
||||
threshold : float
|
||||
the maximum difference in milliseconds between timestamps to be considered a match
|
||||
the maximum difference in milliseconds between timestamps to be
|
||||
considered a match
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -76,7 +91,8 @@ def group_timestamps(sublists: List[List[float]], n: int, threshold: float) -> L
|
||||
"""
|
||||
# Flatten the sublists and sort the timestamps
|
||||
timestamps = [
|
||||
timestamp for sublist in sublists if sublist for timestamp in sublist]
|
||||
timestamp for sublist in sublists if sublist for timestamp in sublist
|
||||
]
|
||||
timestamps.sort()
|
||||
|
||||
groups = []
|
||||
@@ -84,7 +100,7 @@ def group_timestamps(sublists: List[List[float]], n: int, threshold: float) -> L
|
||||
|
||||
# Group timestamps that are less than threshold milliseconds apart
|
||||
for i in range(1, len(timestamps)):
|
||||
if timestamps[i] - timestamps[i-1] < threshold:
|
||||
if timestamps[i] - timestamps[i - 1] < threshold:
|
||||
current_group.append(timestamps[i])
|
||||
else:
|
||||
groups.append(current_group)
|
||||
@@ -104,10 +120,32 @@ def group_timestamps(sublists: List[List[float]], n: int, threshold: float) -> L
|
||||
return means
|
||||
|
||||
|
||||
def flatten(list: List[List[Any]]) -> List:
|
||||
"""
|
||||
Flattens a list / array of lists.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
l : array or list of lists
|
||||
The list to be flattened
|
||||
|
||||
Returns
|
||||
-------
|
||||
list
|
||||
The flattened list
|
||||
"""
|
||||
return [item for sublist in list for item in sublist]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
timestamps = [[1.2, 1.5, 1.3], [],
|
||||
[1.21, 1.51, 1.31], [1.19, 1.49, 1.29], [1.22, 1.52, 1.32], [1.2, 1.5, 1.3]]
|
||||
timestamps = [
|
||||
[1.2, 1.5, 1.3],
|
||||
[],
|
||||
[1.21, 1.51, 1.31],
|
||||
[1.19, 1.49, 1.29],
|
||||
[1.22, 1.52, 1.32],
|
||||
[1.2, 1.5, 1.3],
|
||||
]
|
||||
print(group_timestamps(timestamps, 2, 0.05))
|
||||
print(purge_duplicates(
|
||||
[1, 2, 3, 4, 5, 6, 6.02, 7, 8, 8.02], 0.05))
|
||||
print(purge_duplicates([1, 2, 3, 4, 5, 6, 6.02, 7, 8, 8.02], 0.05))
|
||||
Reference in New Issue
Block a user