import numpy as np
from typing import List


def _cluster_sorted(timestamps, threshold: float) -> List[list]:
    """Split an ordered sequence of timestamps into runs of close neighbours.

    A timestamp joins the current run when its distance to the immediately
    preceding timestamp is strictly smaller than ``threshold``; otherwise it
    starts a new run. An empty input yields an empty list of runs.
    """
    clusters: List[list] = []
    for ts in timestamps:
        # Compare against the previous timestamp, which is always the last
        # element of the most recent run.
        if clusters and ts - clusters[-1][-1] < threshold:
            clusters[-1].append(ts)
        else:
            clusters.append([ts])
    return clusters


def purge_duplicates(timestamps: List[float], threshold: float = 0.5) -> List[float]:
    """
    Collapse runs of near-duplicate timestamps into their mean.

    Consecutive timestamps whose difference is strictly smaller than
    `threshold` are put into the same group. Groups with more than one member
    are replaced by their mean; timestamps that end up alone in a group are
    kept unchanged.

    Parameters
    ----------
    timestamps : List[float]
        Timestamps in ascending order. The input is not sorted here; only
        neighbouring elements are compared.
    threshold : float, optional
        Maximum (exclusive) distance between neighbouring timestamps for them
        to be treated as duplicates, in the same unit as the timestamps.
        Default is 0.5.

    Returns
    -------
    List[float]
        All isolated timestamps first (in input order), followed by the means
        of the multi-member groups (in input order). Note that the combined
        list is therefore not necessarily sorted. An empty input gives an
        empty list.
    """
    groups = _cluster_sorted(timestamps, threshold)

    # Timestamps that have no close neighbour are passed through untouched.
    singles = [group[0] for group in groups if len(group) == 1]

    # Every run of near-duplicates is represented by a single mean value.
    means = [float(np.mean(group)) for group in groups if len(group) > 1]

    return singles + means


def group_timestamps(sublists: List[List[float]], n: int, threshold: float) -> List[float]:
    """
    Merge timestamps from several lists and average the well-supported groups.

    All timestamps from all sublists are pooled and sorted. Neighbouring
    timestamps whose difference is strictly smaller than `threshold` are put
    into the same group, and only groups with at least `n` members are kept.
    Empty sublists contribute nothing.

    Note that group size is counted in timestamps, not in distinct sublists:
    several close timestamps from the same sublist all count towards `n`.

    Parameters
    ----------
    sublists : List[List[float]]
        A list of sublists, each containing timestamps (in any order).
    n : int
        Minimum number of timestamps a group must contain to be kept.
    threshold : float
        Maximum (exclusive) distance between neighbouring timestamps for them
        to fall into the same group, in the same unit as the timestamps.

    Returns
    -------
    List[float]
        The mean of each retained group, in ascending order. Empty when there
        are no timestamps at all or no group reaches `n` members.
    """
    # Pool every timestamp; empty sublists simply add nothing.
    pooled = sorted(ts for sublist in sublists for ts in sublist)

    groups = _cluster_sorted(pooled, threshold)

    return [float(np.mean(group)) for group in groups if len(group) >= n]


if __name__ == "__main__":
    timestamps = [[1.2, 1.5, 1.3], [],
                  [1.21, 1.51, 1.31], [1.19, 1.49, 1.29], [1.22, 1.52, 1.32], [1.2, 1.5, 1.3]]
    print(group_timestamps(timestamps, 2, 0.05))
    print(purge_duplicates(
        [1, 2, 3, 4, 5, 6, 6.02, 7, 8, 8.02], 0.05))