diff --git a/examples/motion_pvt_sequence_generation/Pipfile b/examples/motion_pvt_sequence_generation/Pipfile index 02cddde..87b79e0 100644 --- a/examples/motion_pvt_sequence_generation/Pipfile +++ b/examples/motion_pvt_sequence_generation/Pipfile @@ -7,6 +7,7 @@ name = "pypi" matplotlib = "*" numpy = "*" scipy = "*" +zaber-motion = "*" [dev-packages] black = "*" diff --git a/examples/motion_pvt_sequence_generation/Pipfile.lock b/examples/motion_pvt_sequence_generation/Pipfile.lock index a884eab..6fcb68d 100644 --- a/examples/motion_pvt_sequence_generation/Pipfile.lock +++ b/examples/motion_pvt_sequence_generation/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "30726dd61b9bda3032ca111bccf5f956d28b1202e4a9913c58d90abe0aa3077f" + "sha256": "68cecb6b0ff5eeadd07dbb2e9be1d5ee5f879962ee0c30b025c4ecfd4312a481" }, "pipfile-spec": 6, "requires": { @@ -394,6 +394,14 @@ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==2.8.2" }, + "reactivex": { + "hashes": [ + "sha256:0004796c420bd9e68aad8e65627d85a8e13f293de76656165dffbcb3a0e3fb6a", + "sha256:e912e6591022ab9176df8348a653fe8c8fa7a301f26f9931c9d8c78a650e04e8" + ], + "markers": "python_version >= '3.7' and python_version < '4.0'", + "version": "==4.0.4" + }, "scipy": { "hashes": [ "sha256:00150c5eae7b610c32589dda259eacc7c4f1665aedf25d921907f4d08a951b1c", @@ -433,6 +441,29 @@ ], "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.16.0" + }, + "typing-extensions": { + "hashes": [ + "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", + "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548" + ], + "markers": "python_version >= '3.9'", + "version": "==4.15.0" + }, + "zaber-motion": { + "hashes": [ + "sha256:005497c65edb5b0c44ac85154891b3cd206ff69c5cd758085d1617a8ea8686bc", + "sha256:1876ccb177d666590cc504e6a8725113fbed031380c3e0715f59ddac4ae05a67", + "sha256:2a5f3ed5bc8cfa41e01d2090cc8e74f80c5b84c5d2a47f3e23177d23e36bebe8", + "sha256:34fed8f32d156073178d3c90069a52ef9e10d7f5da7eecc49452d835c65aae1d", + "sha256:850e46bd010111af2bd85f5f34eb0f421029453326b79a8a054581dd7f038bbf", + "sha256:8fb809f567ad80e9ca534d7bfb72429849bc9495a3faa9616b711964eb4c3f90", + "sha256:abd7de381feaaee4c4bdec0b3ee4859fba89bfb68fafdb42311869e951241f74", + "sha256:f7a5495883d02ac856d367ff5f3676ffeff164dfc685fc666bcf58f26a54595b" + ], + "index": "pypi", + "markers": "python_version >= '3.8'", + "version": "==7.12.0" } }, "develop": { diff --git a/examples/motion_pvt_sequence_generation/README.md b/examples/motion_pvt_sequence_generation/README.md index 161426f..9c17da3 100644 --- a/examples/motion_pvt_sequence_generation/README.md +++ b/examples/motion_pvt_sequence_generation/README.md @@ -2,10 +2,7 @@ *By Jeff Homer, Firmware Team* -This repository contains code to complement our article [Motion Planning with Position-Velocity-Time](https://www.zaber.com/articles/motion-planning-with-position-velocity-time). The cubic polynomial PVT algorithm -described in the article is implemented in Python, along with helper functions to automatically -generate missing parameters, plot the generated path and trajectory, and save the results -to a CSV file that is compatible with [Zaber Launcher](https://zaber.com/zaber-launcher#download)'s PVT Viewer App. +This repository contains code to complement our article [Motion Planning with Position-Velocity-Time](https://www.zaber.com/articles/motion-planning-with-position-velocity-time). The cubic polynomial PVT algorithm described in the article is implemented in Python, along with helper functions to automatically generate missing parameters, plot the generated path and trajectory, and save the results to a CSV file that is compatible with [Zaber Launcher](https://zaber.com/zaber-launcher#download)'s PVT Viewer App. ## Hardware Requirements @@ -69,7 +66,7 @@ The repository also contains some helper files that may be useful for extending ## The PVT File -The [pvt.py](pvt.py) file contains several helper classes and functions for creating and manipulating PVT sequences. +The [pvt.py](pvt.py) file contains several helper classes and functions for sampling and plotting PVT sequences. Import all the helper classes and functions in this file under the `pvt` namespace by adding the following import to your file: @@ -174,58 +171,99 @@ sequence.append_point(point4) sequence.append_point(point5) ``` -##### 2. Generate from a CSV file +##### 2. Generate from PvtSequenceData loaded from a CSV file -Initialize an instance of this class by providing a CSV file with position data, position-time data, or position-velocity-time data, by using the static class method `pvt.Sequence.from_csv()`. +Zaber Motion Library (ZML) provides a utility function for loading a CSV file with position data, position-time data, or position-velocity-time data, by using the static class method `PvtSequence.load_sequence_data()`. For example, generate a PVT sequence from the sample position-only data file [spiral_2d.csv](sample_data/position_data/spiral_2d.csv): ```python -sequence = pvt.Sequence.from_csv("sample_data/position_data/spiral_2d.csv") +from zaber_motion.ascii import PvtSequence +import pvt + +csv_data = PvtSequence.load_sequence_data("sample_data/position_data/spiral_2d.csv") +sequence = pvt.Sequence.from_sequence_data(csv_data.sequence_data) ``` -##### 3. Generate from position data +##### 3. Generate using ZML's PVT Sequence Generation API + +Note that because the `Sequence` class doesn't attempt to do any scaling based on units, all units used for position, velocity and acceleration must be of the same scale (i.e, [cm, cm/s, cm/s²], [°, °/s, °/s²]) and time must be specified in seconds. + +For more information on ZML's PVT sequence generation API, please see our [how-to guide](https://software.zaber.com/motion-library/docs/guides/device_features/pvt_sequence_generation) and [API documentation](https://software.zaber.com/motion-library/api/py/ascii/pvtsequence#generatepositions). -Initialize an instance of this class by providing position data, by using the static class method `pvt.Sequence.generate_times_and_velocities()`. +###### a. From position data + +Initialize an instance of this class from a `PvtSequenceData` object generated from position data, by using the static class method `PvtSequence.generate_velocities_and_times()`. This method works by creating a [geometric path](#the-geometricpath-class) from the provided position data, which parameterizes the curve as a function of distance. The missing velocity and time parameters are then generated by traversing this geometric path using a trapezoidal speed profile. For example, generate a 2-D PVT sequence from some x and y position vectors: ```python -x_positions = [1, 2, 3] -y_positions = [4, 5, 6] -sequence = pvt.Sequence.generate_times_and_velocities([x_positions, y_positions]) +from zaber_motion import Units, Measurement, MeasurementSequence +from zaber_motion.ascii import PvtSequence +import pvt + +target_speed = Measurement(1, Units.VELOCITY_CENTIMETRES_PER_SECOND) +target_accel = Measurement(0.5, Units.ACCELERATION_CENTIMETRES_PER_SECOND_SQUARED) +position_sequences = [ + MeasurementSequence([1, 2, 3], Units.LENGTH_CENTIMETRES), + MeasurementSequence([4, 5, 6], Units.LENGTH_CENTIMETRES) +] +sequence_data = PvtSequence.generate_velocities_and_times(position_sequences, target_speed, target_accel) +sequence = pvt.Sequence.from_sequence_data(sequence_data) ``` -##### 4. Generate from position-time or position-velocity-time data +###### b. From position-time or partial position-velocity-time data -Initialize an instance of this class by providing position-time or position-velocity-time data, by using the static class method `pvt.Sequence.generate_velocities()`. +Initialize an instance of this class from a `PvtSequenceData` object generated from position-time data, by using the static class method `PvtSequence.generate_velocities()`. This method works by generating velocities such that acceleration is continuous at segment transitions. The velocity at the first and last point in the sequence will be set to zero if unspecified. For example, generate a 1-D PVT sequence from position and time data: ```python -time = [0, 1, 2] -position = [4, 5, 6] -sequence = pvt.Sequence.generate_velocities(time, position) +from zaber_motion import Units, Measurement, MeasurementSequence +from zaber_motion.ascii import PvtSequence +import pvt + +times = MeasurementSequence([0, 1, 2], Units.TIME_SECONDS) +positions = [MeasurementSequence([4, 5, 6], Units.LENGTH_CENTIMETRES)] +sequence_data = PvtSequence.generate_velocities(positions, times, times_relative=False) +sequence = pvt.Sequence.from_sequence_data(sequence_data) +``` + +Or with partially specified velocities: +```python +from zaber_motion import Units, Measurement, MeasurementSequence, OptionalMeasurementSequence +from zaber_motion.ascii import PvtSequence +import pvt + +times = MeasurementSequence([0, 1, 2], Units.TIME_SECONDS) +positions = [MeasurementSequence([4, 5, 6, 4, 5], Units.LENGTH_CENTIMETRES)] +velocities = [OptionalMeasurementSequence([0, 1, None, None, 0], Units.VELOCITY_CENTIMETRES_PER_SECOND)] +sequence_data = PvtSequence.generate_velocities(positions, times, velocities, times_relative=False) +sequence = pvt.Sequence.from_sequence_data(sequence_data) ``` -##### 5. Generate from velocity-time data +###### c. Generate from velocity-time data -Initialize an instance of this class by providing velocity-time data, by using the static class method `pvt.Sequence.generate_positions()`. +Initialize an instance of this class from a `PvtSequenceData` object generated from velocity-time data, by using the static class method `PvtSequence.generate_positions()`. This method works by generating velocities such that acceleration is -continuous at segment transitions. The velocity at the first and last -point in the sequence will be set to zero if unspecified. +continuous at segment transitions. For example, generate a 1-D PVT sequence from position and time data: ```python -time = [0, 1, 2] -velocities = [4, 5, 6] -sequence = pvt.Sequence.generate_positions(time, velocities) +from zaber_motion import Units, Measurement, MeasurementSequence +from zaber_motion.ascii import PvtSequence +import pvt + +times = MeasurementSequence([0, 1, 2], Units.TIME_SECONDS) +velocities = [MeasurementSequence([4, 5, 6], Units.VELOCITY_CENTIMETRES_PER_SECOND)] +sequence_data = PvtSequence.generate_positions(velocities, times) +sequence = pvt.Sequence.from_sequence_data(sequence_data) ``` #### Class Properties @@ -247,6 +285,32 @@ The class has the following methods: - `acceleration(time)` - Return the position at any time in the sequence. - `save_to_file(filename)` - Save the sequence to a CSV file. +#### Class Methods + +- `position(u)` - The position at the given parameterization length. +- `direction(u)` - The unit vector describing the direction or tangent of the path at the given parameterization length. +- `segment_length(u0, uf)` - The arc length between some starting and some final parameterization length. + +## The Visualization File + +The [visualization.py](visualization.py) file contains several functions for plotting PVT sequence trajectories and paths: + +- `plot_pvt_trajectory(sequence, ...)` - Plots the position, velocity, and acceleration profiles for a given PVT sequence. +- `plot_pvt_path(sequence, ...)` - Plots the 2-D or 3-D geometric path of a given PVT sequence and scaled arrows representing the velocity vector (or tangent) at each point. +- `plot_path_and_trajectory(sequence, ...)` - Combines the previous two functions and plots both the PVT trajectories and the geometric path for the given PVT sequence. + +Import the functions in this file by adding the following import to your file: + +```python +from visualization import plot_pvt_trajectory, plot_pvt_path, plot_path_and_trajectory +``` + +## The Sequence Generators File + +The functions and utility classes in this file have now been ported directly to Zaber Motion Library. They remain here in case anyone is interested in reading and studying their implementation: we assume that even the most technically proficient user may find it easier to read and understand their implementation in python than in golang. + +This file contains the original implementations of the functions, `PvtSequence.generate_velocities()`, `PvtSequence.generate_positions()` and `PvtSequence.generate_velocities_and_times()`, the `GeometricPath` helper class, and also a function for interpolating velocities using finite difference. + ### The `GeometricPath` Class A geometric path ties together a sequence of N-D position vectors, creating a continuous positional path between them. The generated path does not contain any velocity or time information, it creates a purely positional relationship between the points. @@ -255,7 +319,7 @@ The following image shows a geometric path consisting of six points. Note that t ![A geometric path and its defining points](img/geometric-path.png "Geometric Path") -Under the hood, the geometric path is represented by an multi-dimensional B-spline. The generated curve is parameterized by a normalized parameterization length `u` that goes from 0, defining the first point, to 1, defining the last. +Under the hood, the geometric path is represented by an multi-dimensional B-spline (not-a-knot cubic splines). The generated curve is parameterized by a normalized parameterization length `u` that goes from 0, defining the first point, to 1, defining the last. For more information about B-splines, see . @@ -277,23 +341,3 @@ The class has the following read-only properties: - `length` - The length of the path, from start to finish. - `parameterized_lengths` - The normalized lengths corresponding to each of the points used to construct the path. These range from 0, for the first point, to 1, for the final point. - -#### Class Methods - -- `position(u)` - The position at the given parameterization length. -- `direction(u)` - The unit vector describing the direction or tangent of the path at the given parameterization length. -- `segment_length(u0, uf)` - The arc length between some starting and some final parameterization length. - -## The Visualization File - -The [visualization.py](visualization.py) file contains several functions for plotting PVT sequence trajectories and paths: - -- `plot_pvt_trajectory(sequence, ...)` - Plots the position, velocity, and acceleration profiles for a given PVT sequence. -- `plot_pvt_path(sequence, ...)` - Plots the 2-D or 3-D geometric path of a given PVT sequence and scaled arrows representing the velocity vector (or tangent) at each point. -- `plot_path_and_trajectory(sequence, ...)` - Combines the previous two functions and plots both the PVT trajectories and the geometric path for the given PVT sequence. - -Import the functions in this file by adding the following import to your file: - -```python -from visualization import plot_pvt_trajectory, plot_pvt_path, plot_path_and_trajectory -``` diff --git a/examples/motion_pvt_sequence_generation/article.yml b/examples/motion_pvt_sequence_generation/article.yml index 4a97258..d67a69c 100644 --- a/examples/motion_pvt_sequence_generation/article.yml +++ b/examples/motion_pvt_sequence_generation/article.yml @@ -1,7 +1,7 @@ authors: - Jeff Homer date: 2023-12-12 -updated_date: 2023-12-12 +updated_date: 2025-10-07 category: Motion tags: - Python diff --git a/examples/motion_pvt_sequence_generation/generate_pvt_sequence.py b/examples/motion_pvt_sequence_generation/generate_pvt_sequence.py index 01c0f3c..aef9555 100644 --- a/examples/motion_pvt_sequence_generation/generate_pvt_sequence.py +++ b/examples/motion_pvt_sequence_generation/generate_pvt_sequence.py @@ -2,8 +2,7 @@ Generate the missing parameters for a given PVT sequence. This script can be used to generate a fully-defined PVT sequence -from position data, position-time data, velocity-time data, or position-velocity-time -data (with some velocity values missing). +from position data, position-time data or velocity-time data. Use the script settings to toggle visualization of the generated sequence, and writing it to a CSV file compatible with Zaber @@ -11,18 +10,14 @@ Input data must be given in CSV form, with columns for time, position in each dimension, and velocity in each dimension. Missing parameters -can only be generated in the following two cases: -- No velocity or time information is provided (i.e., there are - no such columns or all values in corresponding columns are - empty.) In this case, all velocity and time parameters will - be generated. -- No position information is provided (i.e., there are no such columns - or all values in the corresponding columns are empty.) In this case, - all position values will be generated. -- Some or all velocity information is missing (i.e., there are no - velocity columns at all, or there is a velocity column for - each position column, and some or all values are empty.) In this - case only the missing velocity parameters will be generated. +can only be generated in the following three cases: +- No velocity or time information is provided (i.e., there are no such + columns). In this case, all velocity and time parameters will be + generated. +- No position information is provided (i.e., there is no such column). + In this case, all position values will be generated. +- No velocity information is provided (i.e. there is no such column). + In this case, all velocity values will be generated. 1-D, 2-D, and 3-D sample data is also provided and can be found in the subdirectory "sample_data". @@ -32,6 +27,8 @@ import pvt from visualization import plot_path_and_trajectory +from zaber_motion import Measurement, Units +from zaber_motion.ascii import PvtSequence # ------------------- Script Settings ---------------------- @@ -39,9 +36,9 @@ """The directory of the input file(s)""" FILENAMES = ["wave_1d.csv", "spiral_2d.csv", "spiral_3d.csv"] """The names of the input files to read.""" -TARGET_SPEED = 6 +TARGET_SPEED = Measurement(6, Units.VELOCITY_CENTIMETRES_PER_SECOND) """The target speed to use when generating velocities and times.""" -TARGET_ACCEL = 10 +TARGET_ACCEL = Measurement(10, Units.ACCELERATION_CENTIMETRES_PER_SECOND_SQUARED) """The target aceleration to use when generating velocities and times.""" SHOW_PLOTS = True """Whether to plot the generated sequences.""" @@ -60,17 +57,20 @@ def main() -> None: """Generate complete PVT sequences from underdefined input data.""" for filename in FILENAMES: # Generate the sequence - pvt_sequence = pvt.Sequence.from_csv( + sequence_data = pvt.sequence_data_from_csv( os.path.join(DATA_DIRECTORY, filename), TARGET_SPEED, TARGET_ACCEL ) + if sequence_data is None: + return + if SHOW_PLOTS: # Plot the sequence - plot_path_and_trajectory(pvt_sequence) + plot_path_and_trajectory(sequence_data, times_relative=True) if OUTPUT_DIRECTORY is not None: # Write the file with the same name plus a _generated suffix base, extension = filename.rsplit(".", 1) output_filename = f"{base}_generated.{extension}" - pvt_sequence.save_to_file(os.path.join(OUTPUT_DIRECTORY, output_filename)) + PvtSequence.save_sequence_data(sequence_data, os.path.join(OUTPUT_DIRECTORY, output_filename)) if __name__ == "__main__": diff --git a/examples/motion_pvt_sequence_generation/pvt.py b/examples/motion_pvt_sequence_generation/pvt.py index f71deda..820b257 100644 --- a/examples/motion_pvt_sequence_generation/pvt.py +++ b/examples/motion_pvt_sequence_generation/pvt.py @@ -11,18 +11,117 @@ import csv from dataclasses import dataclass from enum import Enum, auto -from functools import partial -from itertools import accumulate -import math import numpy as np -from numpy import float64 -from numpy.typing import NDArray -from scipy.integrate import quad # type: ignore -from scipy.interpolate import splev, splprep # type: ignore -from scipy.linalg import solve_banded # type: ignore -from scipy.optimize import bisect, newton # type: ignore +from zaber_motion import Measurement +from zaber_motion.ascii import PvtSequenceData, PvtSequence + +@staticmethod +def sequence_data_from_csv( + filename: str, target_speed: Measurement | None = None, target_accel: Measurement | None = None +) -> PvtSequenceData | None: + """ + Return a PVT sequence data object loaded from CSV. + + This function will load all the given data and attempt to + generate any missing parameters. Generation is only possible + in the following cases: + - No velocity or time information is provided (i.e., there are + no such columns). In this case, all velocity and time parameters + will be generated. + - No position information is provided (i.e., there is no such column). + In this case, all position values will be generated. + - No velocity information is provided (i.e., there is no such column). + In this case, all velocity values will be generated. + + This function assumes the file has a header row with all + necessary columns, and that position and velocity columns + are ordered according to their axis number (i.e., if position + column 1 comes first, then so must velocity column 1.) + Additionally: + - Position columns must have a header containing the keyword "pos" + - Velocity columns must have a header containing the keyword "vel" + - The time column must have a header containing the keyword "time" + No information is read from the header names except their identifying + keyword. + + :param filename: The name of the csv file to load. + :param target_speed: The target speed used for generating velocities and times. + :param target_accel: The target acceleration used for generating velocities and times. + :return: The generated PVT sequence. + """ + + class GenerationType(Enum): + """An enum describing what parameters to generate.""" + + NONE = auto() + """Don't generate any parameters.""" + TIME_AND_VELOCITY = auto() + """Generate time and velocity.""" + POSITION = auto() + """Generate position only.""" + VELOCITY = auto() + """Generate velocity only.""" + + # Read the data and do some sanity checks + data = PvtSequence.load_sequence_data(filename) + + contains_time_data = len(data.sequence_data.times.values) > 0 + contains_position_data = ( + len(data.sequence_data.positions) > 0 and len(data.sequence_data.positions[0].values) > 0 + ) + contains_velocity_data = ( + len(data.sequence_data.velocities) > 0 and len(data.sequence_data.velocities[0].values) > 0 + ) + + gen_type = GenerationType.NONE + if not contains_time_data: + gen_type = GenerationType.TIME_AND_VELOCITY + assert not contains_velocity_data and contains_position_data, ( + "Invalid csv structure. Time can only be generated if position is specified and " + "velocity is unspecified." + ) + elif not contains_velocity_data: + gen_type = GenerationType.VELOCITY + assert contains_position_data, ( + "Invalid csv structure. If velocity is unspecified, position must be specified." + ) + elif not contains_position_data: + gen_type = GenerationType.POSITION + assert contains_velocity_data, ( + "Invalid csv structure. If position is unspecified, " + "velocity and time must both be specified" + ) + + # Call the appropriate generation function + match gen_type: + case GenerationType.NONE: + return data.sequence_data + case GenerationType.TIME_AND_VELOCITY: + assert ( + target_speed is not None and target_accel is not None + ), "Target speed and accel must be defined to generate velocities and times" + return PvtSequence.generate_velocities_and_times( + data.sequence_data.positions, + target_speed, + target_accel, + ) + case GenerationType.POSITION: + return PvtSequence.generate_positions( + data.sequence_data.velocities, + data.sequence_data.times, + times_relative=True, + ) + case GenerationType.VELOCITY: + return PvtSequence.generate_velocities( + data.sequence_data.positions, + data.sequence_data.times, + velocities=None, + times_relative=True, + ) + case _: + return data.sequence_data @dataclass(frozen=True) class Point: @@ -153,394 +252,6 @@ def _validate_time(self, time: float) -> None: ), f"Time {time} is outside of segment range ({time_min}, {time_max})" -class GeometricPath: - """An N-directional geometric path constructed from a sequence of position keypoints.""" - - def __init__(self, position_sequences: list[list[float]]): - """ - Initialize an N-dimensional geometric path from a list of position sequences. - - The path is generated as a B-Spline, and is parametrized by - a normalized length variable u. That is, the path starts at - u=0 and ends at u=1. - - :param position_sequences: A list of position sequences, one - for each dimension. - """ - self._dim = len(position_sequences) - tck, u = splprep( # pylint: disable=unbalanced-tuple-unpacking - position_sequences, s=0, full_output=0 - ) - self._tck: tuple[NDArray[float64], list[NDArray[float64]], int] = tck - self._u: list[float] = list(u) - self._length_at_u = list( - accumulate( - ( - self._calculate_segment_length(u0, uf) - for u0, uf in zip(self._u[:-1], self._u[1:]) - ), - initial=0, - ) - ) - - @property - def length(self) -> float: - """The length of the path.""" - return self._length_at_u[-1] - - @property - def parameterized_lengths(self) -> list[float]: - """ - The sequence of parameterized lengths. - - These lengths correspond to the position keypoints - used to construct the path. - """ - return self._u.copy() - - def position(self, u: float) -> tuple[float, ...]: - """ - Return the N-D path position at parameterized length u. - - :param u: The parameterized length u. - """ - # By default, splev returns an array for each element. Pick - # the one and only value by flattening the arrays. - val = splev(u, self._tck) - return tuple(val[i].flat[0] for i in range(self._dim)) - - def direction(self, u: float) -> tuple[float, ...]: - """ - Return the N-D path direction, as a unit vector, at parameterized distance u. - - :param u: The parameterized length u. - """ - dx_du = self.dx_du(u) - norm = sum(val**2 for val in dx_du) ** 0.5 - if norm == 0: - assert all(dx_du_i == 0 for dx_du_i in dx_du) - return tuple(0 for _ in dx_du) - return tuple(val / norm for val in dx_du) - - def velocity(self, u: float, speed: float) -> tuple[float, ...]: - """ - Return the velocity at the given parameterized length. - - :param u: The parameterized length u. - :param speed: The tangential speed along the path. - """ - dl_du = self.dl_du(u) - return tuple(speed * dx_du / dl_du for dx_du in self.dx_du(u)) - - def acceleration(self, u: float, speed: float, accel: float) -> tuple[float, ...]: - """ - Return the acceleration at the given parameterized length. - - :param u: The parameterized length u. - :param speed: The tangential speed along the path. - :param accel: The tangential acceleration along the path. - """ - dx_dl = self.dx_dl(u) - d2x_dl2 = self.d2x_dl2(u) - return tuple( - speed**2 * d2x_dl2_i + accel * dx_dl_i for dx_dl_i, d2x_dl2_i in zip(dx_dl, d2x_dl2) - ) - - def _calculate_segment_length(self, u0: float, uf: float) -> float: - """ - Calculate the path length between u0 and uf. - - :param u0: The start point of the measurement, in parameterized units. - :param uf: The end point of the measurement, in parameterized units. - """ - return float(quad(self.dl_du, u0, uf)[0]) - - def segment_length(self, u0: float, uf: float) -> float: - """ - Return the path length between u0 and uf. - - :param u0: The start point of the measurement, in parameterized units. - :param uf: The end point of the measurement, in parameterized units. - """ - first_index_after_u0 = next((i for i, u in enumerate(self._u) if u >= u0), len(self._u) - 1) - last_index_before_uf = next( - (i - 1 for i, u in enumerate(self._u) if u > uf), len(self._u) - 1 - ) - if last_index_before_uf >= first_index_after_u0 - 1 or last_index_before_uf < 0: - return self._calculate_segment_length(u0, uf) - - length = self._calculate_segment_length(u0, self._u[first_index_after_u0]) - length += self._length_at_u[last_index_before_uf] - self._length_at_u[first_index_after_u0] - length += self._calculate_segment_length(self._u[last_index_before_uf], uf) - assert (uf >= u0) == ( - length >= 0 - ), f"{u0} {uf} {first_index_after_u0} {last_index_before_uf}" - return length - - def calc_u_at_length(self, length: float) -> float: - """ - Return the parameterization length for a given real length. - - :param length: The length at which we want to calculate u. - """ - # Find an estimate of u via linear interpolation - u_estimate = np.interp(length, self._length_at_u, self._u) - i0 = next((i - 1 for i, u in enumerate(self._u) if u > u_estimate), len(self._u) - 1) - u0 = self._u[i0] - - # Calculate u using an optimization algorithm - delta_len = length - self._length_at_u[i0] - u: float = newton( - lambda u: self.segment_length(u0, u) - delta_len, - u_estimate, - self.dl_du, - fprime2=self.d2l_du2, - ) - return u - - def dx_du(self, u: float, derivative_number: float = 1) -> tuple[float, ...]: - """ - Return the derivative of N-D path position with respect to u. - - :param u: The parameterized length u. - :param derivative_number: The derivative (defaults to the first derivative). - """ - # By default, splev returns an array for each element. Pick - # the one and only value by flattening the arrays. - val = splev(u, self._tck, derivative_number) - return tuple(val[i].flat[0] for i in range(self._dim)) - - def dl_du(self, u: float) -> float: - """ - Return the derivative of path length with respect to the parameterization variable at u. - - :param u: The parameterized length u. - """ - dx_du = self.dx_du(u) - return float(sum(val**2 for val in dx_du) ** 0.5) - - def d2l_du2(self, u: float) -> float: - """ - Return the second derivative of path length with respect to the parameterization variable. - - :param u: The parameterized length u. - """ - dl_du = self.dl_du(u) - dx_du = self.dx_du(u) - d2p_du2 = self.dx_du(u, 2) - if dl_du == 0: - assert sum(2 * dx_du[i] * d2p_du2[i] for i in range(self._dim)) == 0 - return 0 - return sum(2 * dx_du[i] * d2p_du2[i] for i in range(self._dim)) / (2 * dl_du) - - def dx_dl(self, u: float) -> tuple[float, ...]: - """ - Return the first derivative of x with respect to path length at u. - - :param u: The parameterized length u. - """ - dl_du = self.dl_du(u) - dx_du = self.dx_du(u) - if dl_du == 0: - assert all(dx_du_i == 0 for dx_du_i in dx_du) - return tuple(0 for _ in dx_du) - return tuple(dx_du_i / dl_du for dx_du_i in dx_du) - - def d2x_dl2(self, u: float) -> tuple[float, ...]: - """ - Return the second derivative of x with respect to path length at u. - - :param u: The parameterized length u. - """ - dl_du = self.dl_du(u) - d2x_du2 = self.dx_du(u, 2) - d2l_du2 = self.d2l_du2(u) - dx_du = self.dx_du(u) - if dl_du == 0: - assert all(dx_du_i == 0 for dx_du_i in dx_du) - return tuple(0 for _ in dx_du) - d2u_dl2 = -d2l_du2 / dl_du**3 - return tuple( - d2x_du2_i / dl_du**2 + dx_du_i * d2u_dl2 for dx_du_i, d2x_du2_i in zip(dx_du, d2x_du2) - ) - - -def generate_velocities_continuous_acceleration( - position_sequence: list[float], - time_sequence: list[float], - vel_start: float = 0, - vel_end: float = 0, -) -> list[float]: - """ - Generate velocities such that acceleration is continuous at each transition. - - This function solves for the velocities at each point that make the acceleration - at the end of each segment equal to the acceleration at the start of the next. - - Each segment's trajectory is represented by a cubic polynomial, where: - Δpᵢ(t) = c1ᵢ * t + c2ᵢ * t² + c3ᵢ * t³, and - vᵢ(t) = c1ᵢ + 2 * c2ᵢ * t + 3 * c3ᵢ * t² - aᵢ(t) = 2 * c2ᵢ + 6 * c3ᵢ * t - - Geneneration is done by solving the system Ax = b for x, where x is an array of PVT - coefficients for each segment (i.e., [c1₁, c2₁, c3₁, ... c1ₙ, c2ₙ, c3ₙ]. A is - a tridiagonal matrix, and b is an array. The system is formed by combining all of - the following constraint equations: - 1. position at the start and end of each segment is as specified by the user - 2. velocity is continous at each segment transition - 3. acceleration is continuous at each segment transition - 4. velocity at the start and end of the sequence is as specified by the user - """ - num_segments = len(time_sequence) - 1 - A = np.zeros((num_segments * 3, num_segments * 3)) # pylint: disable=invalid-name - b = np.zeros((num_segments * 3, 1)) - # Initial boundary condition - A[0, 0] = 1 - b[0] = vel_start - for segment_index in range(num_segments): - delta_time = time_sequence[segment_index + 1] - time_sequence[segment_index] - delta_pos = position_sequence[segment_index + 1] - position_sequence[segment_index] - # Position equation for segment - row_start = segment_index * 3 + 1 - col_start = segment_index * 3 - A[row_start, col_start : col_start + 3] = [delta_time, delta_time**2, delta_time**3] - b[row_start] = delta_pos - if segment_index < num_segments - 1: - # Middle segments, where velocity is unknown and accelerations are continuous - A[row_start + 1, col_start + 1 : col_start + 4] = [delta_time, 2 * delta_time**2, -1] - b[row_start + 1] = -delta_pos / delta_time - A[row_start + 2, col_start + 2 : col_start + 5] = [delta_time, 1 / delta_time, -1] - b[row_start + 2] = delta_pos / delta_time**2 - else: - # Final segment - A[row_start + 1, col_start + 1 : col_start + 3] = [delta_time, 2 * delta_time**2] - b[row_start + 1] = vel_end - delta_pos / delta_time - # Get banded form of matrix - ab = np.array( - [ - [0] + [A[i, i + 1] for i in range(num_segments * 3 - 1)], - [A[i, i] for i in range(num_segments * 3)], - [A[i + 1, i] for i in range(num_segments * 3 - 1)] + [0], - ] - ) - coefficients = solve_banded((1, 1), ab, b) - # Generate velocities - vel_sequence = [vel_start] - for segment_index in range(1, num_segments): - vel_sequence.append(float(coefficients[segment_index * 3])) - vel_sequence.append(vel_end) - return vel_sequence - - -def generate_positions_continuous_acceleration( - velocity_sequence: list[float], - time_sequence: list[float], - pos_start: float = 0, - pos_end: float = 0, -) -> list[float]: - """ - Generate positions such that acceleration is continuous at each transition. - - This function solves for the positions at each point that make the acceleration - at the end of each segment equal to the acceleration at the start of the next. - - Each segment's trajectory is represented by a cubic polynomial, where: - Δpᵢ(t) = vᵢ(0) * t + c2ᵢ * t² + c3ᵢ * t³, and - vᵢ(t) = vᵢ(0) + 2 * c2ᵢ * t + 3 * c3ᵢ * t² - aᵢ(t) = 2 * c2ᵢ + 6 * c3ᵢ * t - - Geneneration is done by solving the system Ax = b for x, where x is an array of PVT - coefficients for each segment (i.e., [c2₁, c3₁, ... c2ₙ, c3ₙ]. A is - a tridiagonal matrix, and b is an array. The system is formed by combining all of - the following constraint equations: - 1. velocity at the start and end of each segment is as specified by the user - 2. acceleration is continuous at each segment transition - 3. acceleration at the start of the sequence is zero - """ - - def calculate_delta_position(c1: float, c2: float, c3: float, delta_time: float) -> float: - """Calculate the position delta of a segment from its coefficients.""" - return float(c1 * delta_time + c2 * delta_time**2 + c3 * delta_time**3) - - num_segments = len(time_sequence) - 1 - A = np.zeros((num_segments * 2, num_segments * 2)) # pylint: disable=invalid-name - b = np.zeros((num_segments * 2, 1)) - # Initial condition acceleration 0 is zero - A[0, 0] = 1 - for segment_index in range(num_segments): - delta_time = time_sequence[segment_index + 1] - time_sequence[segment_index] - delta_vel = velocity_sequence[segment_index + 1] - velocity_sequence[segment_index] - # Position equation for segment - block_index = segment_index * 2 - A[block_index + 1, block_index : block_index + 2] = [2 * delta_time, 3 * delta_time**2] - b[block_index + 1] = delta_vel - if segment_index < num_segments - 1: - # Middle segments, where accelerations are continuous - A[block_index + 2, block_index + 1 : block_index + 3] = [3 * delta_time / 2, -1] - b[block_index + 2] = -delta_vel / (2 * delta_time) - # Get banded form of matrix - ab = np.array( - [ - [A[i, i] for i in range(num_segments * 2)], - [A[i + 1, i] for i in range(num_segments * 2 - 1)] + [0], - ] - ) - coefficients = solve_banded((1, 0), ab, b) - # Generate positions - pos_sequence = [pos_start] - for segment_index in range(num_segments): - pos_sequence.append( - pos_sequence[-1] - + calculate_delta_position( - velocity_sequence[segment_index], - coefficients[segment_index * 2], - coefficients[segment_index * 2 + 1], - time_sequence[segment_index + 1] - time_sequence[segment_index], - ) - ) - pos_sequence.append(pos_end) - return pos_sequence - - -def interpolate_velocity_finite_difference( - position_sequence: list[float], time_sequence: list[float] -) -> float: - """ - Interpolate the velocity at a point. - - This functions uses the positions and time at the generation point and the preceding and - proceeding points to interpolate an appropriate velocity value. - - Generation is done using a finite different scheme, using the positions and times of - the point in question and the preceding and proceeding points in the sequence. - - See https://en.wikipedia.org/wiki/Cubic_Hermite_spline#Finite_difference. - - :param position_sequence: An array containing the position before, at, and after, the - generation point. - :param time_sequence: An array containing the time before, at, and after, the generation - point. - :return: The interpolated velocity. - """ - # Setup - sum_differences = 0.0 - num_differences = 0.0 - - # Calculate the finite differences for the two segments involving the - # point at which to generate a velocity. - for index in range(2): - delta_time = time_sequence[index + 1] - time_sequence[index] - if delta_time > 0: - num_differences += 1 - sum_differences += ( - position_sequence[index + 1] - position_sequence[index] - ) / delta_time - assert num_differences > 0, "All three points must not have the same time" - - # Return the average difference - return sum_differences / num_differences - - class CSVData: """A helper class to read sequences from CSV files.""" @@ -653,7 +364,11 @@ def _read_row(self, row: list[str]) -> None: class Sequence: - """A PVT sequence, formed from one or more PVT points.""" + """ + A PVT sequence, formed from one or more PVT points. + + Used for plotting paths and trajectories in visualization.py + """ def __init__(self, points: list[Point] | None = None) -> None: """ @@ -690,7 +405,7 @@ def end_time(self) -> float: assert len(self._points) > 0, "There are no points in the sequence." return self._points[-1].time - def append_point(self, point: Point) -> None: + def append_point(self, point: Point = False) -> None: """ Append a point to the PVT sequence. @@ -781,396 +496,22 @@ def _get_segment_at_time(self, time: float) -> Segment: return self._segments[index] @staticmethod - def from_parameter_sequences( - time_sequence: list[float], - position_sequences: list[list[float]], - velocity_sequences: list[list[float]], - ) -> Sequence: + def from_sequence_data(data: PvtSequenceData, times_relative: bool): """ - Return a PVT sequence from parameter sequences. + Return a PVT sequence from sequence data. - This function generates a PVT sequence from time, - position, and velocity arrays. + This function generates a PVT sequence from a ZML PVT sequence data object. - :param time_sequence: The sequence of time values. - :param position_sequences: An array of position sequences, one - for each dimension. - :param velocity_sequences: An array of velocity sequences, one - for each dimension. + :param data: The sequence data object. :return: The PVT sequence with generated parameters. """ - sequence = Sequence() - dim = len(position_sequences) - for point_index, time in enumerate(time_sequence): - positions = tuple(position_sequences[i][point_index] for i in range(dim)) - velocities = tuple(velocity_sequences[i][point_index] for i in range(dim)) - sequence.append_point(Point(positions, velocities, time)) - return sequence - - @staticmethod - def from_csv( - filename: str, target_speed: float | None = None, target_accel: float | None = None - ) -> Sequence: - """ - Return a PVT sequence with data loaded from a CSV file. - - This function will load all the given data and attempt to - generate any missing parameters. Generation is only possible - in the following cases: - - No velocity or time information is provided (i.e., there are - no such columns or all values in corresponding columns are - empty.) In this case, all velocity and time parameters will - be generated. - - No position information is provided (i.e., there are no such - columns or all values in the corresponding columns are empty.) - In this case, all position values will be generated. - - Some or all velocity information is missing (i.e., there are no - velocity columns at all, or there is a velocity column for - each position column, and some or all values are empty.) In this - case only the missing velocity parameters will be generated. - - This function assumes the file has a header row with all - necessary columns, and that position and velocity columns - are ordered according to their axis number (i.e., if position - column 1 comes first, then so must velocity column 1.) - Additionally: - - Position columns must have a header containing the keyword "pos" - - Velocity columns must have a header containing the keyword "vel" - - The time column must have a header containing the keyword "time" - No information is read from the header names except their identifying - keyword. - - The function also assumes units are consistent with one another. - That is, if position is given in units P / T, then time is given in - units of T, and velocity is given in units of P / T². - - :param filename: The name of the csv file to load. - :param target_speed: The target speed used for generating velocities and times. - :param target_accel: The target acceleration used for generating velocities and times. - :return: The generated PVT sequence. - """ - - class GenerationType(Enum): - """An enum describing what parameters to generate.""" - - NONE = auto() - """Don't generate any parameters.""" - TIME_AND_VELOCITY = auto() - """Generate time and velocity.""" - POSITION = auto() - """Generate position only.""" - VELOCITY = auto() - """Generate velocity only.""" - - # Read the data and do some sanity checks - data = CSVData(filename) - gen_type = GenerationType.NONE - if not data.contains_time_data: - gen_type = GenerationType.TIME_AND_VELOCITY - assert not data.contains_velocity_data, ( - "Invalid csv structure. Time can only be generated if" - "velocity is also unspecified." - ) - elif not data.contains_velocity_data: - gen_type = GenerationType.VELOCITY - assert data.contains_position_data, ( - "Invalid csv structure. If velocity is unspecified, " "position must be specified." - ) - elif not data.contains_complete_velocity_data: - gen_type = GenerationType.VELOCITY - if not data.contains_position_data: - gen_type = GenerationType.POSITION - assert data.contains_time_data and data.contains_complete_velocity_data, ( - "Invalid csv structure. If position is unspecified, " - "velocity and time must both be specified" - ) - - # Call the appropriate generation function - match gen_type: - case GenerationType.NONE: - return Sequence.from_parameter_sequences( - data.time_sequence, - data.position_sequences, - data.velocity_sequences, # type: ignore - ) - case GenerationType.TIME_AND_VELOCITY: - assert ( - target_speed is not None and target_accel is not None - ), "Target speed and accel must be defined to generate velocities and times" - return Sequence.generate_times_and_velocities( - data.position_sequences, target_speed, target_accel - ) - case GenerationType.POSITION: - return Sequence.generate_positions( - data.time_sequence, - data.velocity_sequences, # type: ignore - ) - case GenerationType.VELOCITY: - return Sequence.generate_velocities( - data.time_sequence, - data.position_sequences, - data.velocity_sequences if data.contains_velocity_data else None, - ) - - @staticmethod - def generate_times_and_velocities( # pylint: disable=too-many-locals - position_sequences: list[list[float]], - target_speed: float, - target_accel: float, - resample_number: int | None = None, - ) -> Sequence: - """ - Return a PVT sequence from a sequence of position keypoints. - - This function generates the velocity and time parameters, - using a target speed and acceleration - - This function fits a geometric spline over the position - information, and then calculates the velocity and time - information by traversing it using a trapezoidal motion - profile. - - This generation scheme attempts to keep speed and acceleration - less than the specified target values, but does not guarantee it. - Generally speaking, a higher resample number will bring the - generated trajectory closer to respecting these limits. - - :param position_sequences: The position sequences for each axis. - :param target_speed: The target speed used for generating velocities and times. - :param target_accel: The target acceleration used for generating velocities and times. - :param resample_num: The number of points to resample the sequence by, or None to use - the specified points. - :return: The generated PVT sequence. - """ - # Setup - dim = len(position_sequences) - generated_sequence = Sequence() - geo_path = GeometricPath(position_sequences) - u_sample = ( - geo_path.parameterized_lengths - if resample_number is None - else ( - [0] - + [ - geo_path.calc_u_at_length(i * geo_path.length / (resample_number - 1)) - for i in range(1, resample_number - 1) - ] - + [1] - ) - ) - - def generate_calculation_points( - u_sample: list[float], geo_path: GeometricPath - ) -> tuple[list[float], dict[int, list[int]]]: - """ - Generate a list of critical points used for calculating the speed profile. - - This function generates a list of calculation points by: - - Adding intermediate points between the sample points, and - - Adding critical points, or points where one or more axis changes direction - """ - # For each sample point, use N points in calculations - u_calc = [0.0] - for u_prev, u_next in zip(u_sample[:-1], u_sample[1:]): - parameterized_sublength = (u_next - u_prev) / 10 - for i in range(1, 10): - u_calc.append(u_prev + parameterized_sublength * i) - u_calc.append(u_next) - - # Find points where axes change directions - reversals: dict[int, list[int]] = {} - segment_index = 0 - while segment_index < len(u_calc) - 1: - initial_direction = geo_path.direction(u_calc[segment_index]) - final_direction = geo_path.direction(u_calc[segment_index + 1]) - u_inserts = {} - for d in range(dim): - if initial_direction[d] * final_direction[d] < 0: - if dim > 1: - u_inserts[d] = newton( - partial(lambda u, d: geo_path.dx_du(u)[d], d=d), - (u_calc[segment_index] + u_calc[segment_index + 1]) / 2, - partial(lambda u, d: geo_path.dx_du(u, 2)[d], d=d), - fprime2=partial(lambda u, d: geo_path.dx_du(u, 3)[d], d=d), - ) - else: - u_inserts[d] = bisect( - partial(lambda u, d: geo_path.dx_du(u)[d], d=d), - u_calc[segment_index], - u_calc[segment_index + 1], - ) - for d, u_insert in sorted(u_inserts.items(), key=lambda item: item[1]): - if math.isclose(u_insert, u_calc[segment_index]): - reversals.setdefault(segment_index, []) - reversals[segment_index].append(d) - else: - segment_index += 1 - reversals.setdefault(segment_index, []) - reversals[segment_index].append(d) - u_calc = u_calc[:segment_index] + [u_insert] + u_calc[segment_index:] - segment_index += 1 - return u_calc, reversals - - u_calc, reversals = generate_calculation_points(u_sample, geo_path) - - # Calculate speed limits from end point - segment_lengths = [ - geo_path.segment_length(u_calc[i], u_calc[i + 1]) for i in range(len(u_calc) - 1) - ] - speed_limits = [target_speed for _ in u_calc] - speed_limits[0] = speed_limits[-1] = 0 - for i in reversed(range(1, len(speed_limits) - 1)): - # Calculate the speed limit from max deceleration over the path length - speed_limits[i] = min( - speed_limits[i], - (speed_limits[i + 1] ** 2 + 2 * target_accel * segment_lengths[i]) ** 0.5, - ) - # Calculate the speed limit from total acceleration - if i in reversals and len(reversals[i]) == dim: - speed_limits[i] = min(speed_limits[i], 0) - elif ( - denominator := sum(d2xi_dl2**2 for d2xi_dl2 in geo_path.d2x_dl2(u_calc[i])) - ) != 0: - speed_limits[i] = min(speed_limits[i], (target_accel**2 / denominator) ** (1 / 4)) - - # Calculate speed limits from start point and assemble sequence - time = 0.0 - generated_sequence.append_point( - Point( - geo_path.position(u_sample[0]), - tuple(speed_limits[0] * d for d in geo_path.direction(u_sample[0])), - time, + points = [] + absolute_times = np.cumsum(data.times.values) if times_relative else data.times.values + for i in range(len(data.times.values)): + point: Point = Point( + position=tuple(ms.values[i] for ms in data.positions), + velocity=tuple(ms.values[i] for ms in data.velocities), + time=absolute_times[i], ) - ) - next_sample_index = 1 - for i in range(1, len(speed_limits)): - # Calculate the speed limit from max acceleration over the path length - speed_limits[i] = min( - speed_limits[i], - (speed_limits[i - 1] ** 2 + 2 * target_accel * segment_lengths[i - 1]) ** 0.5, - ) - if (average_speed := sum(speed_limits[i - 1 : i + 1]) / 2) == 0: - time += (segment_lengths[i - 1] / target_accel) ** 0.5 - else: - time += segment_lengths[i - 1] / average_speed - if u_calc[i] >= u_sample[next_sample_index]: - generated_sequence.append_point( - Point( - geo_path.position(u_calc[i]), - tuple(speed_limits[i] * d for d in geo_path.direction(u_calc[i])), - time, - ) - ) - next_sample_index += 1 - - return generated_sequence - - @staticmethod - def generate_velocities( - time_sequence: list[float], - position_sequences: list[list[float]], - velocity_sequences: list[list[float | None]] | None, - ) -> Sequence: - """ - Return a PVT sequence from position-time data or position-velocity-time data. - - This function calculates velocities by enforcing acceleration be - continuous at each segment transition. For more information, see - the function generate_velocities_continuous_acceleration(). - - :param time_sequence: The sequence of time values. - :param position_sequences: An array of position sequences, one - for each dimension. - :param velocity_sequences: An array of velocity sequences, one - for each dimension. This must either have the same size as - position_sequences, or be None to denote all values be generated. - :return: The PVT sequence with generated parameters. - """ - # Setup - sequence_dim = len(position_sequences) - sequence_length = len(time_sequence) - generated_sequence = Sequence() - - # Generate velocities - if velocity_sequences is None: - # Generate all velocities - velocity_sequences = [] - for dim in range(sequence_dim): - velocity_sequences.append( - generate_velocities_continuous_acceleration( # type: ignore - position_sequences[dim], time_sequence, 0, 0 - ) - ) - else: - # Generate some velocities - for dim in range(sequence_dim): - # Set zero velocity at endpoints - for endpoint_index in (0, sequence_length - 1): - if velocity_sequences[dim][endpoint_index] is None: - velocity_sequences[dim][endpoint_index] = 0 - # Generate the rest - gen_start_index = None - for point_index in range(1, sequence_length - 1): - # Check for the first undefined velocity in a sequence of undefined velocities - if gen_start_index is None and velocity_sequences[dim][point_index] is None: - gen_start_index = point_index - 1 - # Check for the end of a sequence of undefined velocities - if ( - gen_start_index is not None - and velocity_sequences[dim][point_index + 1] is not None - ): - velocity_sequences[dim][ - gen_start_index : point_index + 2 - ] = generate_velocities_continuous_acceleration( - position_sequences[dim][gen_start_index : point_index + 2], - time_sequence[gen_start_index : point_index + 2], - velocity_sequences[dim][gen_start_index], # type: ignore - velocity_sequences[dim][point_index + 1], # type: ignore - ) - gen_start_index = None - # Append the points - for point_index in range(sequence_length): - positions = [position_sequences[i][point_index] for i in range(sequence_dim)] - velocities = [velocity_sequences[i][point_index] for i in range(sequence_dim)] - time = time_sequence[point_index] - generated_sequence.append_point(Point(positions, velocities, time)) # type: ignore - return generated_sequence - - @staticmethod - def generate_positions( - time_sequence: list[float], - velocity_sequences: list[list[float]], - ) -> Sequence: - """ - Return a PVT sequence from position-time data. - - This function calculates positions by enforcing acceleration be - continuous at each segment transition. For more information, see - the function generate_positions_continuous_acceleration(). - - :param time_sequence: The sequence of time values. - :param velocity_sequences: An array of velocity sequences, one - for each dimension. - :return: The PVT sequence with generated parameters. - """ - # Setup - sequence_dim = len(velocity_sequences) - sequence_length = len(time_sequence) - generated_sequence = Sequence() - - # Generate positions - position_sequences = [] - for dim in range(sequence_dim): - position_sequences.append( - generate_positions_continuous_acceleration( - velocity_sequences[dim], time_sequence, 0, 0 - ) - ) - - # Append the points - for point_index in range(sequence_length): - positions = tuple(position_sequences[i][point_index] for i in range(sequence_dim)) - velocities = tuple(velocity_sequences[i][point_index] for i in range(sequence_dim)) - time = time_sequence[point_index] - generated_sequence.append_point(Point(positions, velocities, time)) - return generated_sequence + points.append(point) + return Sequence(points) diff --git a/examples/motion_pvt_sequence_generation/sample_data/position_time_data/spiral_2d.csv b/examples/motion_pvt_sequence_generation/sample_data/position_time_data/spiral_2d.csv index 5ec75ea..2ed3f52 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/position_time_data/spiral_2d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/position_time_data/spiral_2d.csv @@ -1,7 +1,7 @@ Time (s),Axis 1 Position (cm),Axis 2 Position (cm) 0.0,0.0,0.0 3.3333333333333335,2.8867513459481287,-1.6666666666666676 -6.666666666666667,-5.77350269189626,-3.3333333333333313 -10.0,-2.4492935982947065e-15,10.0 -13.333333333333334,11.547005383792513,-6.6666666666666785 -16.666666666666668,-14.433756729740645,-8.33333333333333 +3.3333333333333335,-5.77350269189626,-3.3333333333333313 +3.3333333333333335,-2.4492935982947065e-15,10.0 +3.3333333333333335,11.547005383792513,-6.6666666666666785 +3.3333333333333335,-14.433756729740645,-8.33333333333333 diff --git a/examples/motion_pvt_sequence_generation/sample_data/position_time_data/spiral_3d.csv b/examples/motion_pvt_sequence_generation/sample_data/position_time_data/spiral_3d.csv index b6b32ff..e8450b7 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/position_time_data/spiral_3d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/position_time_data/spiral_3d.csv @@ -1,7 +1,7 @@ Time (s),Axis 1 Position (cm),Axis 2 Position (cm),Axis 3 Position (cm) 0.0,0.0,0.0,0.0 3.3333333333333335,2.8867513459481287,-1.6666666666666676,0.669872981077807 -6.666666666666667,-5.77350269189626,-3.3333333333333313,2.5000000000000004 -10.0,-2.4492935982947065e-15,10.0,4.999999999999999 -13.333333333333334,11.547005383792513,-6.6666666666666785,7.500000000000001 -16.666666666666668,-14.433756729740645,-8.33333333333333,9.330127018922195 +3.3333333333333335,-5.77350269189626,-3.3333333333333313,2.5000000000000004 +3.3333333333333335,-2.4492935982947065e-15,10.0,4.999999999999999 +3.3333333333333335,11.547005383792513,-6.6666666666666785,7.500000000000001 +3.3333333333333335,-14.433756729740645,-8.33333333333333,9.330127018922195 diff --git a/examples/motion_pvt_sequence_generation/sample_data/position_time_data/wave_1d.csv b/examples/motion_pvt_sequence_generation/sample_data/position_time_data/wave_1d.csv index c5efb92..e13076d 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/position_time_data/wave_1d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/position_time_data/wave_1d.csv @@ -1,7 +1,7 @@ Time (s),Axis 1 Position (cm) 0.0,0.0 3.3333333333333335,8.660254037844386 -6.666666666666667,-8.660254037844389 -10.0,-2.4492935982947065e-15 -13.333333333333334,8.660254037844382 -16.666666666666668,-8.660254037844387 +3.3333333333333335,-8.660254037844389 +3.3333333333333335,-2.4492935982947065e-15 +3.3333333333333335,8.660254037844382 +3.3333333333333335,-8.660254037844387 diff --git a/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/spiral_2d.csv b/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/spiral_2d.csv index f5a62cb..0930274 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/spiral_2d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/spiral_2d.csv @@ -1,7 +1,7 @@ Time (s),Axis 0 Position (cm),Axis 0 Velocity (cm/s),Axis 1 Position (cm),Axis 1 Velocity (cm/s) 0.0,0.0,0.0,0.0,1.0 3.3333333333333335,2.8867513459481287,-0.1811721474121598,-1.6666666666666676,-2.313799364234218 -6.666666666666667,-5.77350269189626,-2.9604205061776327,-3.3333333333333313,3.1275987284684375 -10.0,-2.4492935982947065e-15,6.283185307179586,10.0,1.0000000000000016 -13.333333333333334,11.547005383792513,-3.3227648010019593,-6.6666666666666785,-7.75519745693687 -16.666666666666668,-14.433756729740645,-6.102013159767425,-8.33333333333333,8.56899682117109 +3.3333333333333335,-5.77350269189626,-2.9604205061776327,-3.3333333333333313,3.1275987284684375 +3.3333333333333335,-2.4492935982947065e-15,6.283185307179586,10.0,1.0000000000000016 +3.3333333333333335,11.547005383792513,-3.3227648010019593,-6.6666666666666785,-7.75519745693687 +3.3333333333333335,-14.433756729740645,-6.102013159767425,-8.33333333333333,8.56899682117109 diff --git a/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/spiral_3d.csv b/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/spiral_3d.csv index 82c18aa..1e24b15 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/spiral_3d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/spiral_3d.csv @@ -1,7 +1,7 @@ Time (s),Axis 0 Position (cm),Axis 0 Velocity (cm/s),Axis 1 Position (cm),Axis 1 Velocity (cm/s),Axis 2 Position (cm),Axis 2 Velocity (cm/s) 0.0,0.0,0.0,0.0,1.0,0.0,0.0 3.3333333333333335,2.8867513459481287,-0.1811721474121598,-1.6666666666666676,-2.313799364234218,0.669872981077807,0.39269908169872414 -6.666666666666667,-5.77350269189626,-2.9604205061776327,-3.3333333333333313,3.1275987284684375,2.5000000000000004,0.6801747615878317 -10.0,-2.4492935982947065e-15,6.283185307179586,10.0,1.0000000000000016,4.999999999999999,0.7853981633974483 -13.333333333333334,11.547005383792513,-3.3227648010019593,-6.6666666666666785,-7.75519745693687,7.500000000000001,0.6801747615878315 -16.666666666666668,-14.433756729740645,-6.102013159767425,-8.33333333333333,8.56899682117109,9.330127018922195,0.3926990816987241 +3.3333333333333335,-5.77350269189626,-2.9604205061776327,-3.3333333333333313,3.1275987284684375,2.5000000000000004,0.6801747615878317 +3.3333333333333335,-2.4492935982947065e-15,6.283185307179586,10.0,1.0000000000000016,4.999999999999999,0.7853981633974483 +3.3333333333333335,11.547005383792513,-3.3227648010019593,-6.6666666666666785,-7.75519745693687,7.500000000000001,0.6801747615878315 +3.3333333333333335,-14.433756729740645,-6.102013159767425,-8.33333333333333,8.56899682117109,9.330127018922195,0.3926990816987241 diff --git a/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/wave_1d.csv b/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/wave_1d.csv index 9a612c9..74346d9 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/wave_1d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/position_velocity_time_data/wave_1d.csv @@ -1,7 +1,7 @@ Time (s),Axis 0 Position (cm),Axis 0 Velocity (cm/s) 0.0,0.0,6.283185307179586 3.3333333333333335,8.660254037844386,-3.1415926535897944 -6.666666666666667,-8.660254037844389,-3.141592653589791 -10.0,-2.4492935982947065e-15,6.283185307179586 -13.333333333333334,8.660254037844382,-3.141592653589798 -16.666666666666668,-8.660254037844387,-3.1415926535897922 +3.3333333333333335,-8.660254037844389,-3.141592653589791 +3.3333333333333335,-2.4492935982947065e-15,6.283185307179586 +3.3333333333333335,8.660254037844382,-3.141592653589798 +3.3333333333333335,-8.660254037844387,-3.1415926535897922 diff --git a/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/spiral_2d.csv b/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/spiral_2d.csv index 7fc2e34..a9d2c4a 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/spiral_2d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/spiral_2d.csv @@ -1,7 +1,7 @@ Time (s),Axis 1 Velocity (cm/s),Axis 2 Velocity (cm/s) 0.0,0.0,1.0 3.3333333333333335,-0.1811721474121598,-2.313799364234218 -6.666666666666667,-2.9604205061776327,3.1275987284684375 -10.0,6.283185307179586,1.0000000000000016 -13.333333333333334,-3.3227648010019593,-7.75519745693687 -16.666666666666668,-6.102013159767425,8.56899682117109 +3.3333333333333335,-2.9604205061776327,3.1275987284684375 +3.3333333333333335,6.283185307179586,1.0000000000000016 +3.3333333333333335,-3.3227648010019593,-7.75519745693687 +3.3333333333333335,-6.102013159767425,8.56899682117109 diff --git a/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/spiral_3d.csv b/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/spiral_3d.csv index 95c334e..1e8b31e 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/spiral_3d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/spiral_3d.csv @@ -1,7 +1,7 @@ Time (s),Axis 1 Velocity (cm/s),Axis 2 Velocity (cm/s),Axis 3 Velocity (cm/s) 0.0,0.0,1.0,0.0 3.3333333333333335,-0.1811721474121598,-2.313799364234218,0.39269908169872414 -6.666666666666667,-2.9604205061776327,3.1275987284684375,0.6801747615878317 -10.0,6.283185307179586,1.0000000000000016,0.7853981633974483 -13.333333333333334,-3.3227648010019593,-7.75519745693687,0.6801747615878315 -16.666666666666668,-6.102013159767425,8.56899682117109,0.3926990816987241 +3.3333333333333335,-2.9604205061776327,3.1275987284684375,0.6801747615878317 +3.3333333333333335,6.283185307179586,1.0000000000000016,0.7853981633974483 +3.3333333333333335,-3.3227648010019593,-7.75519745693687,0.6801747615878315 +3.3333333333333335,-6.102013159767425,8.56899682117109,0.3926990816987241 diff --git a/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/wave_1d.csv b/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/wave_1d.csv index 99d92ae..f94be74 100644 --- a/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/wave_1d.csv +++ b/examples/motion_pvt_sequence_generation/sample_data/velocity_time_data/wave_1d.csv @@ -1,7 +1,7 @@ Time (s),Axis 1 Velocity (cm/s) 0.0,6.283185307179586 3.3333333333333335,-3.1415926535897944 -6.666666666666667,-3.141592653589791 -10.0,6.283185307179586 -13.333333333333334,-3.141592653589798 -16.666666666666668,-3.1415926535897922 +3.3333333333333335,-3.141592653589791 +3.3333333333333335,6.283185307179586 +3.3333333333333335,-3.141592653589798 +3.3333333333333335,-3.1415926535897922 diff --git a/examples/motion_pvt_sequence_generation/sequence_generators.py b/examples/motion_pvt_sequence_generation/sequence_generators.py new file mode 100644 index 0000000..4c95cf7 --- /dev/null +++ b/examples/motion_pvt_sequence_generation/sequence_generators.py @@ -0,0 +1,680 @@ +""" +These functions have now been ported to Zaber Motion Library, +so the calls to them in this example code have been replaced by +equivalent calls to equivalent functions on zaber_motion.ascii.PvtSequence +class. + +They remain here in case anyone is interested in reading and studying +their implementation: we assume that even the most technically proficient +user may find it easier to read and understand their implementation in +python than in golang. + +That said, if anyone is curious, they can find the production go code here: +https://gitlab.com/ZaberTech/zaber-motion-lib/-/blob/master/internal/devices/stream_pvt_sequence_generators.go + +The only real differences between these functions and their go +implementation is that in go, generate_times_and_velocities handles +sequences of 2 and 3 points whereas the python implementation can only +handle sequences of 4 or more. +""" + +from functools import partial +from itertools import accumulate +import math + +import numpy as np +from numpy import float64 +from numpy.typing import NDArray +from scipy.integrate import quad # type: ignore +from scipy.interpolate import splev, splprep # type: ignore +from scipy.linalg import solve_banded # type: ignore +from scipy.optimize import bisect, newton # type: ignore + +import pvt + +def generate_times_and_velocities( # pylint: disable=too-many-locals + position_sequences: list[list[float]], + target_speed: float, + target_accel: float, + resample_number: int | None = None, +) -> pvt.Sequence: + """ + Return a PVT sequence from a sequence of position keypoints. + + This function generates the velocity and time parameters, + using a target speed and acceleration + + This function fits a geometric spline over the position + information, and then calculates the velocity and time + information by traversing it using a trapezoidal motion + profile. + + This generation scheme attempts to keep speed and acceleration + less than the specified target values, but does not guarantee it. + Generally speaking, a higher resample number will bring the + generated trajectory closer to respecting these limits. + + :param position_sequences: The position sequences for each axis. + :param target_speed: The target speed used for generating velocities and times. + :param target_accel: The target acceleration used for generating velocities and times. + :param resample_num: The number of points to resample the sequence by, or None to use + the specified points. + :return: The generated PVT sequence. + """ + # Setup + dim = len(position_sequences) + generated_sequence = pvt.Sequence() + geo_path = GeometricPath(position_sequences) + u_sample = ( + geo_path.parameterized_lengths + if resample_number is None + else ( + [0] + + [ + geo_path.calc_u_at_length(i * geo_path.length / (resample_number - 1)) + for i in range(1, resample_number - 1) + ] + + [1] + ) + ) + + def generate_calculation_points( + u_sample: list[float], geo_path: GeometricPath + ) -> tuple[list[float], dict[int, list[int]]]: + """ + Generate a list of critical points used for calculating the speed profile. + + This function generates a list of calculation points by: + - Adding intermediate points between the sample points, and + - Adding critical points, or points where one or more axis changes direction + """ + # For each sample point, use N points in calculations + u_calc = [0.0] + for u_prev, u_next in zip(u_sample[:-1], u_sample[1:]): + parameterized_sublength = (u_next - u_prev) / 10 + for i in range(1, 10): + u_calc.append(u_prev + parameterized_sublength * i) + u_calc.append(u_next) + + # Find points where axes change directions + reversals: dict[int, list[int]] = {} + segment_index = 0 + while segment_index < len(u_calc) - 1: + initial_direction = geo_path.direction(u_calc[segment_index]) + final_direction = geo_path.direction(u_calc[segment_index + 1]) + u_inserts = {} + for d in range(dim): + if initial_direction[d] * final_direction[d] < 0: + if dim > 1: + u_inserts[d] = newton( + partial(lambda u, d: geo_path.dx_du(u)[d], d=d), + (u_calc[segment_index] + u_calc[segment_index + 1]) / 2, + partial(lambda u, d: geo_path.dx_du(u, 2)[d], d=d), + fprime2=partial(lambda u, d: geo_path.dx_du(u, 3)[d], d=d), + ) + else: + u_inserts[d] = bisect( + partial(lambda u, d: geo_path.dx_du(u)[d], d=d), + u_calc[segment_index], + u_calc[segment_index + 1], + ) + for d, u_insert in sorted(u_inserts.items(), key=lambda item: item[1]): + if math.isclose(u_insert, u_calc[segment_index]): + reversals.setdefault(segment_index, []) + reversals[segment_index].append(d) + else: + segment_index += 1 + reversals.setdefault(segment_index, []) + reversals[segment_index].append(d) + u_calc = u_calc[:segment_index] + [u_insert] + u_calc[segment_index:] + segment_index += 1 + return u_calc, reversals + + u_calc, reversals = generate_calculation_points(u_sample, geo_path) + + # Calculate speed limits from end point + segment_lengths = [ + geo_path.segment_length(u_calc[i], u_calc[i + 1]) for i in range(len(u_calc) - 1) + ] + speed_limits = [target_speed for _ in u_calc] + speed_limits[0] = speed_limits[-1] = 0 + for i in reversed(range(1, len(speed_limits) - 1)): + # Calculate the speed limit from max deceleration over the path length + speed_limits[i] = min( + speed_limits[i], + (speed_limits[i + 1] ** 2 + 2 * target_accel * segment_lengths[i]) ** 0.5, + ) + # Calculate the speed limit from total acceleration + if i in reversals and len(reversals[i]) == dim: + speed_limits[i] = min(speed_limits[i], 0) + elif ( + denominator := sum(d2xi_dl2**2 for d2xi_dl2 in geo_path.d2x_dl2(u_calc[i])) + ) != 0: + speed_limits[i] = min(speed_limits[i], (target_accel**2 / denominator) ** (1 / 4)) + + # Calculate speed limits from start point and assemble sequence + time = 0.0 + generated_sequence.append_point( + pvt.Point( + geo_path.position(u_sample[0]), + tuple(speed_limits[0] * d for d in geo_path.direction(u_sample[0])), + time, + ) + ) + next_sample_index = 1 + for i in range(1, len(speed_limits)): + # Calculate the speed limit from max acceleration over the path length + speed_limits[i] = min( + speed_limits[i], + (speed_limits[i - 1] ** 2 + 2 * target_accel * segment_lengths[i - 1]) ** 0.5, + ) + if (average_speed := sum(speed_limits[i - 1 : i + 1]) / 2) == 0: + time += (segment_lengths[i - 1] / target_accel) ** 0.5 + else: + time += segment_lengths[i - 1] / average_speed + if u_calc[i] >= u_sample[next_sample_index]: + generated_sequence.append_point( + pvt.Point( + geo_path.position(u_calc[i]), + tuple(speed_limits[i] * d for d in geo_path.direction(u_calc[i])), + time, + ) + ) + next_sample_index += 1 + + return generated_sequence + +def generate_velocities( + time_sequence: list[float], + position_sequences: list[list[float]], + velocity_sequences: list[list[float | None]] | None, +) -> pvt.Sequence: + """ + Return a PVT sequence from position-time data or position-velocity-time data. + + This function calculates velocities by enforcing acceleration be + continuous at each segment transition. For more information, see + the function generate_velocities_continuous_acceleration(). + + :param time_sequence: The sequence of time values. + :param position_sequences: An array of position sequences, one + for each dimension. + :param velocity_sequences: An array of velocity sequences, one + for each dimension. This must either have the same size as + position_sequences, or be None to denote all values be generated. + :return: The PVT sequence with generated parameters. + """ + # Setup + sequence_dim = len(position_sequences) + sequence_length = len(time_sequence) + generated_sequence = pvt.Sequence() + + # Generate velocities + if velocity_sequences is None: + # Generate all velocities + velocity_sequences = [] + for dim in range(sequence_dim): + velocity_sequences.append( + generate_velocities_continuous_acceleration( # type: ignore + position_sequences[dim], time_sequence, 0, 0 + ) + ) + else: + # Generate some velocities + for dim in range(sequence_dim): + # Set zero velocity at endpoints + for endpoint_index in (0, sequence_length - 1): + if velocity_sequences[dim][endpoint_index] is None: + velocity_sequences[dim][endpoint_index] = 0 + # Generate the rest + gen_start_index = None + for point_index in range(1, sequence_length - 1): + # Check for the first undefined velocity in a sequence of undefined velocities + if gen_start_index is None and velocity_sequences[dim][point_index] is None: + gen_start_index = point_index - 1 + # Check for the end of a sequence of undefined velocities + if ( + gen_start_index is not None + and velocity_sequences[dim][point_index + 1] is not None + ): + velocity_sequences[dim][ + gen_start_index : point_index + 2 + ] = generate_velocities_continuous_acceleration( + position_sequences[dim][gen_start_index : point_index + 2], + time_sequence[gen_start_index : point_index + 2], + velocity_sequences[dim][gen_start_index], # type: ignore + velocity_sequences[dim][point_index + 1], # type: ignore + ) + gen_start_index = None + # Append the points + for point_index in range(sequence_length): + positions = [position_sequences[i][point_index] for i in range(sequence_dim)] + velocities = [velocity_sequences[i][point_index] for i in range(sequence_dim)] + time = time_sequence[point_index] + generated_sequence.append_point(pvt.Point(positions, velocities, time)) # type: ignore + return generated_sequence + +def generate_positions( + time_sequence: list[float], + velocity_sequences: list[list[float]], +) -> pvt.Sequence: + """ + Return a PVT sequence from position-time data. + + This function calculates positions by enforcing acceleration be + continuous at each segment transition. For more information, see + the function generate_positions_continuous_acceleration(). + + :param time_sequence: The sequence of time values. + :param velocity_sequences: An array of velocity sequences, one + for each dimension. + :return: The PVT sequence with generated parameters. + """ + # Setup + sequence_dim = len(velocity_sequences) + sequence_length = len(time_sequence) + generated_sequence = pvt.Sequence() + + # Generate positions + position_sequences = [] + for dim in range(sequence_dim): + position_sequences.append( + generate_positions_continuous_acceleration( + velocity_sequences[dim], time_sequence, 0, 0 + ) + ) + + # Append the points + for point_index in range(sequence_length): + positions = tuple(position_sequences[i][point_index] for i in range(sequence_dim)) + velocities = tuple(velocity_sequences[i][point_index] for i in range(sequence_dim)) + time = time_sequence[point_index] + generated_sequence.append_point(pvt.Point(positions, velocities, time)) + return generated_sequence + +class GeometricPath: + """An N-directional geometric path constructed from a sequence of position keypoints.""" + + def __init__(self, position_sequences: list[list[float]]): + """ + Initialize an N-dimensional geometric path from a list of position sequences. + + The path is generated as a B-Spline, and is parametrized by + a normalized length variable u. That is, the path starts at + u=0 and ends at u=1. + + :param position_sequences: A list of position sequences, one + for each dimension. + """ + self._dim = len(position_sequences) + tck, u = splprep( # pylint: disable=unbalanced-tuple-unpacking + position_sequences, s=0, full_output=0 + ) + self._tck: tuple[NDArray[float64], list[NDArray[float64]], int] = tck + self._u: list[float] = list(u) + self._length_at_u = list( + accumulate( + ( + self._calculate_segment_length(u0, uf) + for u0, uf in zip(self._u[:-1], self._u[1:]) + ), + initial=0, + ) + ) + + @property + def length(self) -> float: + """The length of the path.""" + return self._length_at_u[-1] + + @property + def parameterized_lengths(self) -> list[float]: + """ + The sequence of parameterized lengths. + + These lengths correspond to the position keypoints + used to construct the path. + """ + return self._u.copy() + + def position(self, u: float) -> tuple[float, ...]: + """ + Return the N-D path position at parameterized length u. + + :param u: The parameterized length u. + """ + # By default, splev returns an array for each element. Pick + # the one and only value by flattening the arrays. + val = splev(u, self._tck) + return tuple(val[i].flat[0] for i in range(self._dim)) + + def direction(self, u: float) -> tuple[float, ...]: + """ + Return the N-D path direction, as a unit vector, at parameterized distance u. + + :param u: The parameterized length u. + """ + dx_du = self.dx_du(u) + norm = sum(val**2 for val in dx_du) ** 0.5 + if norm == 0: + assert all(dx_du_i == 0 for dx_du_i in dx_du) + return tuple(0 for _ in dx_du) + return tuple(val / norm for val in dx_du) + + def velocity(self, u: float, speed: float) -> tuple[float, ...]: + """ + Return the velocity at the given parameterized length. + + :param u: The parameterized length u. + :param speed: The tangential speed along the path. + """ + dl_du = self.dl_du(u) + return tuple(speed * dx_du / dl_du for dx_du in self.dx_du(u)) + + def acceleration(self, u: float, speed: float, accel: float) -> tuple[float, ...]: + """ + Return the acceleration at the given parameterized length. + + :param u: The parameterized length u. + :param speed: The tangential speed along the path. + :param accel: The tangential acceleration along the path. + """ + dx_dl = self.dx_dl(u) + d2x_dl2 = self.d2x_dl2(u) + return tuple( + speed**2 * d2x_dl2_i + accel * dx_dl_i for dx_dl_i, d2x_dl2_i in zip(dx_dl, d2x_dl2) + ) + + def _calculate_segment_length(self, u0: float, uf: float) -> float: + """ + Calculate the path length between u0 and uf. + + :param u0: The start point of the measurement, in parameterized units. + :param uf: The end point of the measurement, in parameterized units. + """ + return float(quad(self.dl_du, u0, uf)[0]) + + def segment_length(self, u0: float, uf: float) -> float: + """ + Return the path length between u0 and uf. + + :param u0: The start point of the measurement, in parameterized units. + :param uf: The end point of the measurement, in parameterized units. + """ + first_index_after_u0 = next((i for i, u in enumerate(self._u) if u >= u0), len(self._u) - 1) + last_index_before_uf = next( + (i - 1 for i, u in enumerate(self._u) if u > uf), len(self._u) - 1 + ) + if last_index_before_uf >= first_index_after_u0 - 1 or last_index_before_uf < 0: + return self._calculate_segment_length(u0, uf) + + length = self._calculate_segment_length(u0, self._u[first_index_after_u0]) + length += self._length_at_u[last_index_before_uf] - self._length_at_u[first_index_after_u0] + length += self._calculate_segment_length(self._u[last_index_before_uf], uf) + assert (uf >= u0) == ( + length >= 0 + ), f"{u0} {uf} {first_index_after_u0} {last_index_before_uf}" + return length + + def calc_u_at_length(self, length: float) -> float: + """ + Return the parameterization length for a given real length. + + :param length: The length at which we want to calculate u. + """ + # Find an estimate of u via linear interpolation + u_estimate = np.interp(length, self._length_at_u, self._u) + i0 = next((i - 1 for i, u in enumerate(self._u) if u > u_estimate), len(self._u) - 1) + u0 = self._u[i0] + + # Calculate u using an optimization algorithm + delta_len = length - self._length_at_u[i0] + u: float = newton( + lambda u: self.segment_length(u0, u) - delta_len, + u_estimate, + self.dl_du, + fprime2=self.d2l_du2, + ) + return u + + def dx_du(self, u: float, derivative_number: float = 1) -> tuple[float, ...]: + """ + Return the derivative of N-D path position with respect to u. + + :param u: The parameterized length u. + :param derivative_number: The derivative (defaults to the first derivative). + """ + # By default, splev returns an array for each element. Pick + # the one and only value by flattening the arrays. + val = splev(u, self._tck, derivative_number) + return tuple(val[i].flat[0] for i in range(self._dim)) + + def dl_du(self, u: float) -> float: + """ + Return the derivative of path length with respect to the parameterization variable at u. + + :param u: The parameterized length u. + """ + dx_du = self.dx_du(u) + return float(sum(val**2 for val in dx_du) ** 0.5) + + def d2l_du2(self, u: float) -> float: + """ + Return the second derivative of path length with respect to the parameterization variable. + + :param u: The parameterized length u. + """ + dl_du = self.dl_du(u) + dx_du = self.dx_du(u) + d2p_du2 = self.dx_du(u, 2) + if dl_du == 0: + assert sum(2 * dx_du[i] * d2p_du2[i] for i in range(self._dim)) == 0 + return 0 + return sum(2 * dx_du[i] * d2p_du2[i] for i in range(self._dim)) / (2 * dl_du) + + def dx_dl(self, u: float) -> tuple[float, ...]: + """ + Return the first derivative of x with respect to path length at u. + + :param u: The parameterized length u. + """ + dl_du = self.dl_du(u) + dx_du = self.dx_du(u) + if dl_du == 0: + assert all(dx_du_i == 0 for dx_du_i in dx_du) + return tuple(0 for _ in dx_du) + return tuple(dx_du_i / dl_du for dx_du_i in dx_du) + + def d2x_dl2(self, u: float) -> tuple[float, ...]: + """ + Return the second derivative of x with respect to path length at u. + + :param u: The parameterized length u. + """ + dl_du = self.dl_du(u) + d2x_du2 = self.dx_du(u, 2) + d2l_du2 = self.d2l_du2(u) + dx_du = self.dx_du(u) + if dl_du == 0: + assert all(dx_du_i == 0 for dx_du_i in dx_du) + return tuple(0 for _ in dx_du) + d2u_dl2 = -d2l_du2 / dl_du**3 + return tuple( + d2x_du2_i / dl_du**2 + dx_du_i * d2u_dl2 for dx_du_i, d2x_du2_i in zip(dx_du, d2x_du2) + ) + + +def generate_velocities_continuous_acceleration( + position_sequence: list[float], + time_sequence: list[float], + vel_start: float = 0, + vel_end: float = 0, +) -> list[float]: + """ + Generate velocities such that acceleration is continuous at each transition. + + This function solves for the velocities at each point that make the acceleration + at the end of each segment equal to the acceleration at the start of the next. + + Each segment's trajectory is represented by a cubic polynomial, where: + Δpᵢ(t) = c1ᵢ * t + c2ᵢ * t² + c3ᵢ * t³, and + vᵢ(t) = c1ᵢ + 2 * c2ᵢ * t + 3 * c3ᵢ * t² + aᵢ(t) = 2 * c2ᵢ + 6 * c3ᵢ * t + + Geneneration is done by solving the system Ax = b for x, where x is an array of PVT + coefficients for each segment (i.e., [c1₁, c2₁, c3₁, ... c1ₙ, c2ₙ, c3ₙ]. A is + a tridiagonal matrix, and b is an array. The system is formed by combining all of + the following constraint equations: + 1. position at the start and end of each segment is as specified by the user + 2. velocity is continous at each segment transition + 3. acceleration is continuous at each segment transition + 4. velocity at the start and end of the sequence is as specified by the user + """ + num_segments = len(time_sequence) - 1 + A = np.zeros((num_segments * 3, num_segments * 3)) # pylint: disable=invalid-name + b = np.zeros((num_segments * 3, 1)) + # Initial boundary condition + A[0, 0] = 1 + b[0] = vel_start + for segment_index in range(num_segments): + delta_time = time_sequence[segment_index + 1] - time_sequence[segment_index] + delta_pos = position_sequence[segment_index + 1] - position_sequence[segment_index] + # Position equation for segment + row_start = segment_index * 3 + 1 + col_start = segment_index * 3 + A[row_start, col_start : col_start + 3] = [delta_time, delta_time**2, delta_time**3] + b[row_start] = delta_pos + if segment_index < num_segments - 1: + # Middle segments, where velocity is unknown and accelerations are continuous + A[row_start + 1, col_start + 1 : col_start + 4] = [delta_time, 2 * delta_time**2, -1] + b[row_start + 1] = -delta_pos / delta_time + A[row_start + 2, col_start + 2 : col_start + 5] = [delta_time, 1 / delta_time, -1] + b[row_start + 2] = delta_pos / delta_time**2 + else: + # Final segment + A[row_start + 1, col_start + 1 : col_start + 3] = [delta_time, 2 * delta_time**2] + b[row_start + 1] = vel_end - delta_pos / delta_time + # Get banded form of matrix + ab = np.array( + [ + [0] + [A[i, i + 1] for i in range(num_segments * 3 - 1)], + [A[i, i] for i in range(num_segments * 3)], + [A[i + 1, i] for i in range(num_segments * 3 - 1)] + [0], + ] + ) + coefficients = solve_banded((1, 1), ab, b) + # Generate velocities + vel_sequence = [vel_start] + for segment_index in range(1, num_segments): + vel_sequence.append(float(coefficients[segment_index * 3])) + vel_sequence.append(vel_end) + return vel_sequence + + +def generate_positions_continuous_acceleration( + velocity_sequence: list[float], + time_sequence: list[float], + pos_start: float = 0, + pos_end: float = 0, +) -> list[float]: + """ + Generate positions such that acceleration is continuous at each transition. + + This function solves for the positions at each point that make the acceleration + at the end of each segment equal to the acceleration at the start of the next. + + Each segment's trajectory is represented by a cubic polynomial, where: + Δpᵢ(t) = vᵢ(0) * t + c2ᵢ * t² + c3ᵢ * t³, and + vᵢ(t) = vᵢ(0) + 2 * c2ᵢ * t + 3 * c3ᵢ * t² + aᵢ(t) = 2 * c2ᵢ + 6 * c3ᵢ * t + + Geneneration is done by solving the system Ax = b for x, where x is an array of PVT + coefficients for each segment (i.e., [c2₁, c3₁, ... c2ₙ, c3ₙ]. A is + a tridiagonal matrix, and b is an array. The system is formed by combining all of + the following constraint equations: + 1. velocity at the start and end of each segment is as specified by the user + 2. acceleration is continuous at each segment transition + 3. acceleration at the start of the sequence is zero + """ + + def calculate_delta_position(c1: float, c2: float, c3: float, delta_time: float) -> float: + """Calculate the position delta of a segment from its coefficients.""" + return float(c1 * delta_time + c2 * delta_time**2 + c3 * delta_time**3) + + num_segments = len(time_sequence) - 1 + A = np.zeros((num_segments * 2, num_segments * 2)) # pylint: disable=invalid-name + b = np.zeros((num_segments * 2, 1)) + # Initial condition acceleration 0 is zero + A[0, 0] = 1 + for segment_index in range(num_segments): + delta_time = time_sequence[segment_index + 1] - time_sequence[segment_index] + delta_vel = velocity_sequence[segment_index + 1] - velocity_sequence[segment_index] + # Position equation for segment + block_index = segment_index * 2 + A[block_index + 1, block_index : block_index + 2] = [2 * delta_time, 3 * delta_time**2] + b[block_index + 1] = delta_vel + if segment_index < num_segments - 1: + # Middle segments, where accelerations are continuous + A[block_index + 2, block_index + 1 : block_index + 3] = [3 * delta_time / 2, -1] + b[block_index + 2] = -delta_vel / (2 * delta_time) + # Get banded form of matrix + ab = np.array( + [ + [A[i, i] for i in range(num_segments * 2)], + [A[i + 1, i] for i in range(num_segments * 2 - 1)] + [0], + ] + ) + coefficients = solve_banded((1, 0), ab, b) + # Generate positions + pos_sequence = [pos_start] + for segment_index in range(num_segments): + pos_sequence.append( + pos_sequence[-1] + + calculate_delta_position( + velocity_sequence[segment_index], + coefficients[segment_index * 2], + coefficients[segment_index * 2 + 1], + time_sequence[segment_index + 1] - time_sequence[segment_index], + ) + ) + pos_sequence.append(pos_end) + return pos_sequence + + +def interpolate_velocity_finite_difference( + position_sequence: list[float], time_sequence: list[float] +) -> float: + """ + Interpolate the velocity at a point. + + This functions uses the positions and time at the generation point and the preceding and + proceeding points to interpolate an appropriate velocity value. + + Generation is done using a finite different scheme, using the positions and times of + the point in question and the preceding and proceeding points in the sequence. + + See https://en.wikipedia.org/wiki/Cubic_Hermite_spline#Finite_difference. + + :param position_sequence: An array containing the position before, at, and after, the + generation point. + :param time_sequence: An array containing the time before, at, and after, the generation + point. + :return: The interpolated velocity. + """ + # Setup + sum_differences = 0.0 + num_differences = 0.0 + + # Calculate the finite differences for the two segments involving the + # point at which to generate a velocity. + for index in range(2): + delta_time = time_sequence[index + 1] - time_sequence[index] + if delta_time > 0: + num_differences += 1 + sum_differences += ( + position_sequence[index + 1] - position_sequence[index] + ) / delta_time + assert num_differences > 0, "All three points must not have the same time" + + # Return the average difference + return sum_differences / num_differences \ No newline at end of file diff --git a/examples/motion_pvt_sequence_generation/visualization.py b/examples/motion_pvt_sequence_generation/visualization.py index 0b56b4a..6604afa 100644 --- a/examples/motion_pvt_sequence_generation/visualization.py +++ b/examples/motion_pvt_sequence_generation/visualization.py @@ -4,6 +4,7 @@ from matplotlib.lines import Line2D import matplotlib.pyplot as plt import numpy as np +from zaber_motion.ascii import PvtSequenceData import pvt @@ -247,7 +248,10 @@ def plot_pvt_path( def plot_path_and_trajectory( - sequence: pvt.Sequence, axis_indices: list[int] | None = None, num_samples: int | None = None + sequence_data: PvtSequenceData, + times_relative: bool = True, + axis_indices: list[int] | None = None, + num_samples: int | None = None, ) -> None: """ Plot the per-axis trajectories and path view on one figure. @@ -260,6 +264,7 @@ def plot_path_and_trajectory( y, and z axes (as applicable). :param num_samples: The number of samples to use, or unspecified to use a default value. """ + sequence = pvt.Sequence.from_sequence_data(sequence_data, times_relative) # Defer to plot_trajectory if sequence has only one dimension if sequence.dim == 1: plot_pvt_trajectory(sequence, num_samples, show=True)