From f9cf2459c3590a4924d38636cc8a439819929dbb Mon Sep 17 00:00:00 2001 From: Simon Guist Date: Wed, 5 Oct 2022 12:20:00 +0200 Subject: [PATCH 1/7] add ball velocity filtering for recorded trajectories --- python/context/ball_trajectories.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index e88ca64..7abf631 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -19,6 +19,8 @@ import pam_configuration import tennicam_client +from context import LowPassFilter + assert int(npt.__version__[0]) >= 2, "Need nptyping >=2." @@ -83,7 +85,7 @@ def to_stamped_trajectory(input: DurationTrajectory) -> StampedTrajectory: return stamps, positions -def to_duration_trajectory(input: StampedTrajectory) -> DurationTrajectory: +def to_duration_trajectory(input: StampedTrajectory, vel_filter_window_size: int = 1) -> DurationTrajectory: """ Converts a StampedTrajectories to a DurationTrajectory. The velocities are estimated by performing finite differences. @@ -92,6 +94,12 @@ def to_duration_trajectory(input: StampedTrajectory) -> DurationTrajectory: dp = np.diff(input[1], axis=0) velocities = (dp.T / (dt * 1e-6)).T positions = input[1][:-1, :] + + #velocity filtering + window_size = 5 + filter_lp = [LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size)] + velocities = [[filter_lp[i].get(v[i]) for i in range(3)] for v in velocities] + return dt, positions, velocities @@ -455,7 +463,7 @@ def __init__(self, group: str, hdf5_path: pathlib.Path = None): self._data: typing.Dict[ int, StampedTrajectory ] = rbt.get_stamped_trajectories(group, direct=True) - + def size(self) -> int: """ Returns the number of trajectories that have been loaded. @@ -499,22 +507,22 @@ def get_different_random_trajectories( return [self._data[index] for index in indexes[:nb_trajectories]] @staticmethod - def to_duration(input: StampedTrajectory) -> DurationTrajectory: + def to_duration(input: StampedTrajectory, vel_filter_window_size: int = 1) -> DurationTrajectory: """ Returns a corresponding duration trajectory """ - return to_duration_trajectory(input) + return to_duration_trajectory(input, vel_filter_window_size) @classmethod def iterate( - cls, input: StampedTrajectory + cls, input: StampedTrajectory, vel_filter_window_size: int = 1 ) -> typing.Generator[DurationPoint, None, None]: """ Generator over the trajectory. Yields tuples (duration in microseconds, state), state having a position and a velocity attribute. """ - durations, positions, velocities = cls.to_duration(input) + durations, positions, velocities = cls.to_duration(input, vel_filter_window_size) for d, p, v in zip(durations, positions, velocities): yield d, o80.Item3dState(p, v) return From fe99901b6eca3b663f9ad5bc8eff5b9175d0534b Mon Sep 17 00:00:00 2001 From: Simon Guist Date: Thu, 23 Jan 2025 11:46:31 +0100 Subject: [PATCH 2/7] bug fix --- bin/pam_ball_trajectories.py | 2 +- python/context/ball_trajectories.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/bin/pam_ball_trajectories.py b/bin/pam_ball_trajectories.py index 50e1154..089b19d 100755 --- a/bin/pam_ball_trajectories.py +++ b/bin/pam_ball_trajectories.py @@ -66,7 +66,7 @@ def _add_tennicam(hdf5_path: pathlib.Path, group_name: str): with bt.MutableRecordedBallTrajectories(path=hdf5_path) as rbt: if group_name in rbt.get_groups(): raise ValueError("group {} already present in the file") - nb_added = rbt.add_tennicam_trajectories(group_name, pathlib.Path.cwd()) + nb_added = rbt.add_tennicam_trajectories(group_name, pathlib.Path.cwd()) logging.info("added {} trajectories".format(nb_added)) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index 7abf631..d15a3cc 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -96,7 +96,6 @@ def to_duration_trajectory(input: StampedTrajectory, vel_filter_window_size: int positions = input[1][:-1, :] #velocity filtering - window_size = 5 filter_lp = [LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size)] velocities = [[filter_lp[i].get(v[i]) for i in range(3)] for v in velocities] From 24c9284812b3865a4b8e3e181cc8aa9f9dd138ef Mon Sep 17 00:00:00 2001 From: Simon Guist Date: Thu, 23 Jan 2025 11:52:47 +0100 Subject: [PATCH 3/7] nit: no filtering with window size 1 --- python/context/ball_trajectories.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index d15a3cc..091ce39 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -96,8 +96,9 @@ def to_duration_trajectory(input: StampedTrajectory, vel_filter_window_size: int positions = input[1][:-1, :] #velocity filtering - filter_lp = [LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size)] - velocities = [[filter_lp[i].get(v[i]) for i in range(3)] for v in velocities] + if vel_filter_window_size > 1: + filter_lp = [LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size)] + velocities = [[filter_lp[i].get(v[i]) for i in range(3)] for v in velocities] return dt, positions, velocities From 060de1d5067c4a04304623db72d9150fa4af13ec Mon Sep 17 00:00:00 2001 From: Simon Guist Date: Sun, 6 Apr 2025 13:39:15 +0200 Subject: [PATCH 4/7] optionally return index of random trajectory --- python/context/ball_trajectories.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index 091ce39..3147dc4 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -483,12 +483,15 @@ def get_trajectory(self, index: int) -> StampedTrajectory: """ return self._data[index] - def random_trajectory(self) -> StampedTrajectory: + def random_trajectory(self, return_index: bool = False) -> Union[StampedTrajectory, Tuple[StampedTrajectory, int]]: """ Returns one of the trajectory, randomly selected. + If return_index is True, also returns the selected index. """ index = random.choice(list(range(len(self._data.keys())))) - return self._data[index] + trajectory = self._data[index] + + return (trajectory, index) if return_index else trajectory def get_different_random_trajectories( self, nb_trajectories: int From e7597cc4af769a2f497705663e17fbe511b312e5 Mon Sep 17 00:00:00 2001 From: Simon Guist Date: Sun, 6 Apr 2025 13:40:24 +0200 Subject: [PATCH 5/7] optionally return index of random trajectory --- python/context/ball_trajectories.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index 3147dc4..2fe74bb 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -8,6 +8,8 @@ from __future__ import annotations import typing import nptyping as npt +from typing import Union, Tuple + import random import math From 35f669d8655dc08351d3cdcc6aec2780254325a6 Mon Sep 17 00:00:00 2001 From: Simon Guist Date: Sun, 6 Apr 2025 13:43:32 +0200 Subject: [PATCH 6/7] tennicam_client import in function --- python/context/ball_trajectories.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index 2fe74bb..7fe97a6 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -19,7 +19,6 @@ import o80 import pam_configuration -import tennicam_client from context import LowPassFilter @@ -300,6 +299,8 @@ def add_tennicam_trajectories( The number of trajectories added to the file. """ + import tennicam_client + def _read_trajectory(tennicam_file: pathlib.Path) -> StampedTrajectory: """ Parse the file and returned the corresponding From 46dc7a76f3d11005303be1b479df9bebc7f211bd Mon Sep 17 00:00:00 2001 From: Simon Guist Date: Mon, 14 Jul 2025 11:25:00 +0200 Subject: [PATCH 7/7] random trajectory translations --- python/context/ball_trajectories.py | 70 ++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index 7fe97a6..f620b8e 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -466,7 +466,7 @@ def __init__(self, group: str, hdf5_path: pathlib.Path = None): self._data: typing.Dict[ int, StampedTrajectory ] = rbt.get_stamped_trajectories(group, direct=True) - + def size(self) -> int: """ Returns the number of trajectories that have been loaded. @@ -486,6 +486,35 @@ def get_trajectory(self, index: int) -> StampedTrajectory: """ return self._data[index] + def get_trajectory_with_translation( + self, index: int, translation: typing.Sequence[float] + ) -> StampedTrajectory: + """ + Returns the trajectory at the requested index, translated + by the provided translation vector. + """ + if len(translation) != 3: + raise ValueError("Translation must be a 3d vector") + trajectory = self._data[index] + positions = trajectory[1] + np.array(translation, np.float32) + return trajectory[0], positions + + def get_trajectory_with_random_translation( + self, index: int, translation_range: typing.Sequence[float] + ) -> StampedTrajectory: + """ + Returns the trajectory at the requested index, translated + by a random vector in the provided range. + The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max]. + """ + if len(translation_range) != 6: + raise ValueError("Translation range must be a 6d vector") + translation = [ + random.uniform(translation_range[i*2], translation_range[i*2 + 1]) + for i in range(3) + ] + return self.get_trajectory_with_translation(index, translation) + def random_trajectory(self, return_index: bool = False) -> Union[StampedTrajectory, Tuple[StampedTrajectory, int]]: """ Returns one of the trajectory, randomly selected. @@ -493,9 +522,25 @@ def random_trajectory(self, return_index: bool = False) -> Union[StampedTrajecto """ index = random.choice(list(range(len(self._data.keys())))) trajectory = self._data[index] - + return (trajectory, index) if return_index else trajectory + def random_trajectory_with_random_translation( + self, translation_range: typing.Sequence[float], return_index: bool = False + ) -> Union[StampedTrajectory, Tuple[StampedTrajectory, int]]: + """ + Returns one of the trajectory, randomly selected, + translated by a random vector in the provided range. + The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max]. + If return_index is True, also returns the selected index. + """ + index = random.choice(list(range(len(self._data.keys())))) + translated_trajectory = self.get_trajectory_with_random_translation( + index, translation_range + ) + + return (translated_trajectory, index) if return_index else translated_trajectory + def get_different_random_trajectories( self, nb_trajectories: int ) -> StampedTrajectories: @@ -512,6 +557,27 @@ def get_different_random_trajectories( random.shuffle(indexes) return [self._data[index] for index in indexes[:nb_trajectories]] + def get_different_random_trajectories_with_random_translation( + self, nb_trajectories: int, translation_range: typing.Sequence[float] + ) -> StampedTrajectories: + """ + Returns a list of trajectories, randomly + ordered and selected, translated by a random vector + in the provided range. + The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max]. + """ + if nb_trajectories > self.size(): + raise ValueError( + "BallTrajectories: only {} trajectories " + "available ({} requested)".format(self.size(), nb_trajectories) + ) + indexes = list(self._data.keys()) + random.shuffle(indexes) + return [ + self.get_trajectory_with_random_translation(index, translation_range) + for index in indexes[:nb_trajectories] + ] + @staticmethod def to_duration(input: StampedTrajectory, vel_filter_window_size: int = 1) -> DurationTrajectory: """