diff --git a/bin/pam_ball_trajectories.py b/bin/pam_ball_trajectories.py index 50e1154..089b19d 100755 --- a/bin/pam_ball_trajectories.py +++ b/bin/pam_ball_trajectories.py @@ -66,7 +66,7 @@ def _add_tennicam(hdf5_path: pathlib.Path, group_name: str): with bt.MutableRecordedBallTrajectories(path=hdf5_path) as rbt: if group_name in rbt.get_groups(): raise ValueError("group {} already present in the file") - nb_added = rbt.add_tennicam_trajectories(group_name, pathlib.Path.cwd()) + nb_added = rbt.add_tennicam_trajectories(group_name, pathlib.Path.cwd()) logging.info("added {} trajectories".format(nb_added)) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index e88ca64..f620b8e 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -8,6 +8,8 @@ from __future__ import annotations import typing import nptyping as npt +from typing import Union, Tuple + import random import math @@ -17,7 +19,8 @@ import o80 import pam_configuration -import tennicam_client + +from context import LowPassFilter assert int(npt.__version__[0]) >= 2, "Need nptyping >=2." @@ -83,7 +86,7 @@ def to_stamped_trajectory(input: DurationTrajectory) -> StampedTrajectory: return stamps, positions -def to_duration_trajectory(input: StampedTrajectory) -> DurationTrajectory: +def to_duration_trajectory(input: StampedTrajectory, vel_filter_window_size: int = 1) -> DurationTrajectory: """ Converts a StampedTrajectories to a DurationTrajectory. The velocities are estimated by performing finite differences. @@ -92,6 +95,12 @@ def to_duration_trajectory(input: StampedTrajectory) -> DurationTrajectory: dp = np.diff(input[1], axis=0) velocities = (dp.T / (dt * 1e-6)).T positions = input[1][:-1, :] + + #velocity filtering + if vel_filter_window_size > 1: + filter_lp = [LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size)] + velocities = [[filter_lp[i].get(v[i]) for i in range(3)] for v in velocities] + return dt, positions, velocities @@ -290,6 +299,8 @@ def add_tennicam_trajectories( The number of trajectories added to the file. """ + import tennicam_client + def _read_trajectory(tennicam_file: pathlib.Path) -> StampedTrajectory: """ Parse the file and returned the corresponding @@ -475,12 +486,60 @@ def get_trajectory(self, index: int) -> StampedTrajectory: """ return self._data[index] - def random_trajectory(self) -> StampedTrajectory: + def get_trajectory_with_translation( + self, index: int, translation: typing.Sequence[float] + ) -> StampedTrajectory: + """ + Returns the trajectory at the requested index, translated + by the provided translation vector. + """ + if len(translation) != 3: + raise ValueError("Translation must be a 3d vector") + trajectory = self._data[index] + positions = trajectory[1] + np.array(translation, np.float32) + return trajectory[0], positions + + def get_trajectory_with_random_translation( + self, index: int, translation_range: typing.Sequence[float] + ) -> StampedTrajectory: + """ + Returns the trajectory at the requested index, translated + by a random vector in the provided range. + The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max]. + """ + if len(translation_range) != 6: + raise ValueError("Translation range must be a 6d vector") + translation = [ + random.uniform(translation_range[i*2], translation_range[i*2 + 1]) + for i in range(3) + ] + return self.get_trajectory_with_translation(index, translation) + + def random_trajectory(self, return_index: bool = False) -> Union[StampedTrajectory, Tuple[StampedTrajectory, int]]: """ Returns one of the trajectory, randomly selected. + If return_index is True, also returns the selected index. """ index = random.choice(list(range(len(self._data.keys())))) - return self._data[index] + trajectory = self._data[index] + + return (trajectory, index) if return_index else trajectory + + def random_trajectory_with_random_translation( + self, translation_range: typing.Sequence[float], return_index: bool = False + ) -> Union[StampedTrajectory, Tuple[StampedTrajectory, int]]: + """ + Returns one of the trajectory, randomly selected, + translated by a random vector in the provided range. + The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max]. + If return_index is True, also returns the selected index. + """ + index = random.choice(list(range(len(self._data.keys())))) + translated_trajectory = self.get_trajectory_with_random_translation( + index, translation_range + ) + + return (translated_trajectory, index) if return_index else translated_trajectory def get_different_random_trajectories( self, nb_trajectories: int @@ -498,23 +557,44 @@ def get_different_random_trajectories( random.shuffle(indexes) return [self._data[index] for index in indexes[:nb_trajectories]] + def get_different_random_trajectories_with_random_translation( + self, nb_trajectories: int, translation_range: typing.Sequence[float] + ) -> StampedTrajectories: + """ + Returns a list of trajectories, randomly + ordered and selected, translated by a random vector + in the provided range. + The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max]. + """ + if nb_trajectories > self.size(): + raise ValueError( + "BallTrajectories: only {} trajectories " + "available ({} requested)".format(self.size(), nb_trajectories) + ) + indexes = list(self._data.keys()) + random.shuffle(indexes) + return [ + self.get_trajectory_with_random_translation(index, translation_range) + for index in indexes[:nb_trajectories] + ] + @staticmethod - def to_duration(input: StampedTrajectory) -> DurationTrajectory: + def to_duration(input: StampedTrajectory, vel_filter_window_size: int = 1) -> DurationTrajectory: """ Returns a corresponding duration trajectory """ - return to_duration_trajectory(input) + return to_duration_trajectory(input, vel_filter_window_size) @classmethod def iterate( - cls, input: StampedTrajectory + cls, input: StampedTrajectory, vel_filter_window_size: int = 1 ) -> typing.Generator[DurationPoint, None, None]: """ Generator over the trajectory. Yields tuples (duration in microseconds, state), state having a position and a velocity attribute. """ - durations, positions, velocities = cls.to_duration(input) + durations, positions, velocities = cls.to_duration(input, vel_filter_window_size) for d, p, v in zip(durations, positions, velocities): yield d, o80.Item3dState(p, v) return