From 5d58ce791bd5a701b57d5f57c366f97c8075ce64 Mon Sep 17 00:00:00 2001 From: Simon Bachhuber Date: Fri, 18 Apr 2025 14:55:48 +0200 Subject: [PATCH 1/3] v0.3.0; added new 3D connection constraint in `qmt.headingCorrection` --- qmt/functions/heading_correction.py | 883 ++++++++++++++++++++-------- setup.py | 90 +-- 2 files changed, 697 insertions(+), 276 deletions(-) diff --git a/qmt/functions/heading_correction.py b/qmt/functions/heading_correction.py index 94bc205..5365dcd 100644 --- a/qmt/functions/heading_correction.py +++ b/qmt/functions/heading_correction.py @@ -3,17 +3,30 @@ # # SPDX-License-Identifier: MIT import numpy as np -import scipy.signal as signal -from scipy.linalg import lstsq from scipy import interpolate +from scipy.linalg import lstsq +import scipy.signal as signal import qmt from qmt.utils.misc import setDefaults from qmt.utils.plot import AutoFigure -def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings=None, verbose=False, - debug=False, plot=False): +def headingCorrection( + gyr1, + gyr2, + acc1, + acc2, + quat1, + quat2, + t, + joint, + jointInfo, + estSettings=None, + verbose=False, + debug=False, + plot=False, +): """ This function corrects the heading of a kinematic chain of two segments whose orientations are estimated without the use of magnetometers. It corrects the heading of the second segment in a way that its orientation is estimated @@ -25,14 +38,18 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings :param gyr1: Nx3 array of angular velocities in rad/s :param gyr2: Nx3 array of angular velocities in rad/s + :param acc1: Nx3 array of accelerations in m/s^2 + :param acc2: Nx3 array of accelerations in m/s^2 :param quat1: Nx4 array of orientation quaternions :param quat2: Nx4 array of orientation quaternions :param t: Nx1 vector of the equidistant sampled time signal :param joint: Dofx3 array containing the axes of the joint. :param jointInfo: Dictionary containing additional joint information. Only needed for 3D-joints. Keys: - - **convention**: String of the chosen euler angles convention, e.g. 'xyz' - - **angle_ranges**: 3x2 array with the minimum and maximum value for each joint angle in radians + - **convention**: String of the chosen euler angles convention, e.g. 'xyz' (for `rom` constraint) + - **angle_ranges**: 3x2 array with the minimum and maximum value for each joint angle in radians (for `rom` constraint) + - **r1**: 3D vector specificing IMU1-to-jointcenter vector in meters (for `conn` constraint) + - **r2**: 3D vector specificing IMU2-to-jointcenter vector in meters (for `conn` constraint) :param verbose: show all logs in function. Disabled by Default. :param estSettings: structure containing the needed settings for the estimation: @@ -56,7 +73,7 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings - Constraints for 1D: (proj, euler, euler_2d, 1d_corr). - Constraints for 2D: (euler, gyro, combined). - - Constraints for 3D: (default) + - Constraints for 3D: (rom, conn) - **optimizer_steps**: Number of Gauss-Newton steps during optimization. @@ -75,12 +92,12 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings # Input Checking joint = np.array(joint).copy() if len(joint.shape) < 2: - assert joint.shape == (3,), 'Wrong dimension of joint matrix' + assert joint.shape == (3,), "Wrong dimension of joint matrix" dof = 1 else: N = joint.shape[0] - assert N <= 3, 'Wrong dimension of joint matrix' - assert joint.shape == (N, 3), 'Wrong dimension of joint matrix' + assert N <= 3, "Wrong dimension of joint matrix" + assert joint.shape == (N, 3), "Wrong dimension of joint matrix" dof = N # Parameter loading @@ -93,7 +110,7 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings tauDelta=5.0, tauBias=5.0, ratingMin=0.4, - alignment='backward', + alignment="backward", enableStillness=True, optimizerSteps=5, stillnessTime=3.0, @@ -102,52 +119,61 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings startRating=1, deltaRange=np.deg2rad(np.linspace(0, 359, 360)), angleRanges=np.ones((3, 1)) * np.array([[-np.pi, np.pi]]), - convention='xyz', + convention="xyz", constraint=None, ) estSettings = setDefaults(estSettings, defaults) - if estSettings['constraint'] is None: - del estSettings['constraint'] # default values are later handled depending on DoF - - useRomConstraints = estSettings['useRomConstraints'] - windowTime = estSettings['windowTime'] - estimationRate = estSettings['estimationRate'] - dataRate = estSettings['dataRate'] - tauDelta = estSettings['tauDelta'] - tauBias = estSettings['tauBias'] - ratingMin = estSettings['ratingMin'] - alignment = estSettings['alignment'] - enableStillness = estSettings['enableStillness'] - optimizerSteps = estSettings['optimizerSteps'] - stillnessTime = estSettings['stillnessTime'] - stillnessThreshold = estSettings['stillnessThreshold'] - stillnessRating = estSettings['stillnessRating'] - startRating = estSettings['startRating'] - deltaRange = estSettings['deltaRange'] - angleRanges = np.asarray(estSettings['angleRanges'], float) - convention = estSettings['convention'] + if estSettings["constraint"] is None: + del estSettings[ + "constraint" + ] # default values are later handled depending on DoF + + useRomConstraints = estSettings["useRomConstraints"] + windowTime = estSettings["windowTime"] + estimationRate = estSettings["estimationRate"] + dataRate = estSettings["dataRate"] + tauDelta = estSettings["tauDelta"] + tauBias = estSettings["tauBias"] + ratingMin = estSettings["ratingMin"] + alignment = estSettings["alignment"] + enableStillness = estSettings["enableStillness"] + optimizerSteps = estSettings["optimizerSteps"] + stillnessTime = estSettings["stillnessTime"] + stillnessThreshold = estSettings["stillnessThreshold"] + stillnessRating = estSettings["stillnessRating"] + startRating = estSettings["startRating"] + deltaRange = estSettings["deltaRange"] + angleRanges = np.asarray(estSettings["angleRanges"], float) + convention = estSettings["convention"] if dof == 3: - angleRanges = np.asarray(jointInfo['angle_ranges'], float) - convention = jointInfo['convention'] - constraint = estSettings.get('constraint', 'default') + constraint = estSettings.get("constraint", "rom") + angleRanges = jointInfo.get("angle_ranges", None) + if angleRanges is not None: + angleRanges = np.asarray(angleRanges, float) + convention = jointInfo.get("convention", None) + conn_constr_r1 = jointInfo.get("r1", None) + conn_constr_r2 = jointInfo.get("r2", None) elif dof == 2: - constraint = estSettings.get('constraint', 'euler') + constraint = estSettings.get("constraint", "euler") else: - constraint = estSettings.get('constraint', 'euler_1d') + constraint = estSettings.get("constraint", "euler_1d") if verbose: - print('parameters: ') + print("parameters: ") for key, val in estSettings.items(): - print(f'{key}: {val}') + print(f"{key}: {val}") # Determine data rate from one of the time signals.Assumption: constant timeDiff = np.diff(t) rate = 1 / timeDiff[1] - assert np.allclose(timeDiff, 1 / rate) # make sure the sampling time is constant to avoid surprises - assert np.isclose(rate % dataRate, 0) or np.isclose(rate % dataRate, dataRate), f'rate should be divisible by ' \ - f'given dataRate: {dataRate}' + assert np.allclose( + timeDiff, 1 / rate + ) # make sure the sampling time is constant to avoid surprises + assert np.isclose(rate % dataRate, 0) or np.isclose(rate % dataRate, dataRate), ( + f"rate should be divisible by " f"given dataRate: {dataRate}" + ) windowSteps = round(windowTime * rate) estimationSteps = round(1 / estimationRate * rate) @@ -157,26 +183,34 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings # number of total time steps in the time series N = len(t) - if alignment == 'backward': + if alignment == "backward": starts = np.arange(windowSteps, N, estimationSteps, dtype=int) - elif alignment == 'center': - starts = np.arange(windowSteps / 2, N - windowSteps / 2, estimationSteps, dtype=int) - elif alignment == 'forward': + elif alignment == "center": + starts = np.arange( + windowSteps / 2, N - windowSteps / 2, estimationSteps, dtype=int + ) + elif alignment == "forward": starts = np.arange(1, N - windowSteps, estimationSteps, dtype=int) else: - raise ValueError('Wrong alignment type') + raise ValueError("Wrong alignment type") # to have a smoother start up, # add estimations at each data step in the beginning until a complete time window has passed regularStart = starts[0] # start of the regular estimation without smooth startup - starts = np.concatenate((np.arange(dataSteps, starts[0], dataSteps, dtype=int), starts)) + starts = np.concatenate( + (np.arange(dataSteps, starts[0], dataSteps, dtype=int), starts) + ) estimations = len(starts) # number of performed estimations # Initialize the result vectors - delta = np.zeros((estimations, 1)) # delta is the heading offset between the first and second segment - rating = np.zeros((estimations, 1)) # the rating indicates the quality of the estimation + delta = np.zeros( + (estimations, 1) + ) # delta is the heading offset between the first and second segment + rating = np.zeros( + (estimations, 1) + ) # the rating indicates the quality of the estimation stateOut = np.zeros((estimations, 1)) cost = np.zeros((estimations, 1)) # Initialize the filtering time constants @@ -188,23 +222,23 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings for k in range(1, estimations): # set default values to variables stillnessTrigger = False - state = 'none' + state = "none" # get the current index in the data index = starts[k] # check whether the smooth startup is active # If so, the used data, for the estimation is from index 1 to the current index if index < regularStart: - state = 'startup' + state = "startup" indexStart = 0 indexEnd = index else: # during regular estimation, the start and end index are determined based on # the current index and the chosen aligment type - if alignment == 'center': + if alignment == "center": indexStart = index - int(windowSteps / 2) indexEnd = index + int(windowSteps / 2) - elif alignment == 'forward': + elif alignment == "forward": indexStart = index indexEnd = index + windowSteps else: # alignment == 'backward' @@ -221,67 +255,128 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings # only do the stillness detection of the setting is set to true and do # not do stillness detection during startup - if enableStillness and state != 'startup': + if enableStillness and state != "startup": # check whether the number of passed samples is larger than the detection peroiod for the stillness # detection if index > stillnessSteps: - stillness = checkStillness(gyr1[index - stillnessSteps:index + 1, :], - gyr2[index - stillnessSteps:index + 1, :], stillnessThreshold) + stillness = checkStillness( + gyr1[index - stillnessSteps : index + 1, :], + gyr2[index - stillnessSteps : index + 1, :], + stillnessThreshold, + ) if stillness: - if stateOut[k-1] == 1: # regular + if stateOut[k - 1] == 1: # regular stillnessTrigger = True - state = 'stillness' + state = "stillness" else: - state = 'regular' + state = "regular" else: - state = 'regular' + state = "regular" else: - if state != 'startup': - state = 'regular' + if state != "startup": + state = "regular" # Stillness correction - if state == 'stillness': - delta[k, :] = stillnessCorrector.stillnessCorrection(quat1[index, :], quat2[index, :], delta[k - 1, :], - stillnessTrigger) + if state == "stillness": + delta[k, :] = stillnessCorrector.stillnessCorrection( + quat1[index, :], quat2[index, :], delta[k - 1, :], stillnessTrigger + ) # Estimation # the estimation will only be performed in startup and regular state, # not in stillness state - if state == 'startup' or state == 'regular': + if state == "startup" or state == "regular": # % for convenience extract the necessary data windows from both # % segments - q1 = quat1[indexStart:indexEnd + 1:dataSteps] - q2 = quat2[indexStart:indexEnd + 1:dataSteps] - g1 = gyr1[indexStart:indexEnd + 1:dataSteps] - g2 = gyr2[indexStart:indexEnd + 1:dataSteps] - time = t[indexStart:indexEnd + 1:dataSteps] + q1 = quat1[indexStart : indexEnd + 1 : dataSteps] + q2 = quat2[indexStart : indexEnd + 1 : dataSteps] + g1 = gyr1[indexStart : indexEnd + 1 : dataSteps] + g2 = gyr2[indexStart : indexEnd + 1 : dataSteps] + a1 = acc1[indexStart : indexEnd + 1 : dataSteps] + a2 = acc2[indexStart : indexEnd + 1 : dataSteps] + time = t[indexStart : indexEnd + 1 : dataSteps] # execute a dedicated estimation algorithm for each type of joint. # for more detailed explanation look in the descriptions of the # corresponding method if dof == 1: - delta[k, :], rating[k, :], cost[k, :] = estimateDelta1d(q1, q2, joint, delta[k - 1, :], constraint, - optimizerSteps) + delta[k, :], rating[k, :], cost[k, :] = estimateDelta1d( + q1, q2, joint, delta[k - 1, :], constraint, optimizerSteps + ) elif dof == 2: - delta[k, :], rating[k, :], cost[k, :] = estimateDelta2d(q1, q2, g1, g2, time, joint, delta[k - 1, :], - constraint, optimizerSteps) + delta[k, :], rating[k, :], cost[k, :] = estimateDelta2d( + q1, + q2, + g1, + g2, + time, + joint, + delta[k - 1, :], + constraint, + optimizerSteps, + ) else: - delta[k, :], rating[k, :], cost[k, :] = estimateDelta3d(q1, q2, angleRanges, convention, deltaRange, - delta[k - 1, :]) + delta[k, :], rating[k, :], cost[k, :] = estimateDelta3d( + q1, + q2, + angleRanges, + convention, + deltaRange, + delta[k - 1, :], + constraint, + conn_constr_r1, + conn_constr_r2, + a1, + a2, + g1, + g2, + optimizerSteps, + 1 / rate, + ) # during startup estimation a second estimation is performed from a # different starting value to ensure that the global minimum is found - if state == 'startup': + if state == "startup": if dof == 1: - delta2, _, cost2 = estimateDelta1d(q1, q2, joint, delta[k - 1, :] + np.pi, constraint, - optimizerSteps) + delta2, _, cost2 = estimateDelta1d( + q1, + q2, + joint, + delta[k - 1, :] + np.pi, + constraint, + optimizerSteps, + ) elif dof == 2: - delta2, _, cost2 = estimateDelta2d(q1, q2, g1, g2, time, joint, delta[k - 1, :] + np.pi, - constraint, optimizerSteps) + delta2, _, cost2 = estimateDelta2d( + q1, + q2, + g1, + g2, + time, + joint, + delta[k - 1, :] + np.pi, + constraint, + optimizerSteps, + ) else: - delta2, _, cost2 = estimateDelta3d(q1, q2, angleRanges, convention, deltaRange, - delta[k - 1, :] + np.pi) + delta2, _, cost2 = estimateDelta3d( + q1, + q2, + angleRanges, + convention, + deltaRange, + delta[k - 1, :] + np.pi, + constraint, + conn_constr_r1, + conn_constr_r2, + a1, + a2, + g1, + g2, + optimizerSteps, + 1 / rate, + ) if useRomConstraints: costRom = romCost(angleRanges, convention, q1, q2, delta[k, :]) costRom2 = romCost(angleRanges, convention, q1, q2, delta2) @@ -292,20 +387,20 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings # adapt the rating in the two special states startup and stillness and set it to 1 in order # to enable fast convergence - if state == 'startup': + if state == "startup": rating[k, :] = startRating tauDelta[k, :] = 0.4 tauBias[k, :] = 1 - if state == 'stillness': + if state == "stillness": rating[k, :] = stillnessRating - if state == 'regular': + if state == "regular": stateOut[k, :] = 1 - elif state == 'startup': + elif state == "startup": stateOut[k, :] = 2 - elif state == 'stillness': + elif state == "stillness": stateOut[k, :] = 3 else: - raise ValueError('Wrong state') + raise ValueError("Wrong state") # Filtering # use a filter to smooth the trajectory of the estimated delta. @@ -314,19 +409,41 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings # the filter only extrapolates the estimated slope and dismisses new estimates until rating >= rating_min # shape(Nx1) for delta, rating tauDelta, tauBias inputs - deltaFilt, bias = headingFilter(delta, rating, stateOut, estimationRate, tauDelta, tauBias, ratingMin, windowTime, - alignment) + deltaFilt, bias = headingFilter( + delta, + rating, + stateOut, + estimationRate, + tauDelta, + tauBias, + ratingMin, + windowTime, + alignment, + ) if debug or plot: # uninterpolated debug data - uninterpolated = {'delta': delta, 'deltaFilt': deltaFilt, 'rating': rating, 'stateOut': stateOut} + uninterpolated = { + "delta": delta, + "deltaFilt": deltaFilt, + "rating": rating, + "stateOut": stateOut, + } # Interpolation # Interpolate the data to the given time signal - fDelta = interpolate.interp1d(t[starts], np.unwrap(delta.ravel()), kind='linear', fill_value='extrapolate') - fDeltaFilt = interpolate.interp1d(t[starts], deltaFilt.ravel(), kind='linear', fill_value='extrapolate') - fRating = interpolate.interp1d(t[starts], rating.ravel(), kind='linear', fill_value='extrapolate') - fStateOut = interpolate.interp1d(t[starts], stateOut.ravel(), kind='linear', fill_value='extrapolate') + fDelta = interpolate.interp1d( + t[starts], np.unwrap(delta.ravel()), kind="linear", fill_value="extrapolate" + ) + fDeltaFilt = interpolate.interp1d( + t[starts], deltaFilt.ravel(), kind="linear", fill_value="extrapolate" + ) + fRating = interpolate.interp1d( + t[starts], rating.ravel(), kind="linear", fill_value="extrapolate" + ) + fStateOut = interpolate.interp1d( + t[starts], stateOut.ravel(), kind="linear", fill_value="extrapolate" + ) delta = fDelta(t) deltaFilt = fDeltaFilt(t) rating = fRating(t) @@ -367,7 +484,7 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings angleRanges=angleRanges, convention=convention, joint=joint, - ) + ), ) if plot: headingCorrection_debugPlot(debugData, plot) @@ -380,24 +497,28 @@ def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings def headingCorrection_debugPlot(debug, fig=None): with AutoFigure(fig) as fig: (ax1, ax2), (ax3, ax4) = fig.subplots(2, 2, sharex=True) - fig.suptitle(AutoFigure.title('headingCorrection')) + fig.suptitle(AutoFigure.title("headingCorrection")) - ax1.plot(np.rad2deg(debug['delta'])[:, np.newaxis]) - ax1.set_ylabel('[°]') - ax1.set_xlabel('index') + ax1.plot(np.rad2deg(debug["delta"])[:, np.newaxis]) + ax1.set_ylabel("[°]") + ax1.set_xlabel("index") ax1.set_title(f'delta, {debug["delta"].shape} {debug["delta"].dtype}') - ax2.plot(np.rad2deg(debug['delta_filt'])[:, np.newaxis]) - ax2.set_ylabel('[°]') - ax2.set_xlabel('index') - ax2.set_title(f'delta_filtered, {debug["delta_filt"].shape} {debug["delta_filt"].dtype}') + ax2.plot(np.rad2deg(debug["delta_filt"])[:, np.newaxis]) + ax2.set_ylabel("[°]") + ax2.set_xlabel("index") + ax2.set_title( + f'delta_filtered, {debug["delta_filt"].shape} {debug["delta_filt"].dtype}' + ) - ax3.plot(debug['rating'][:, np.newaxis]) + ax3.plot(debug["rating"][:, np.newaxis]) ax3.set_title(f'rating, {debug["rating"].shape} {debug["rating"].dtype}') - ax4.plot(debug['state_out'][:, np.newaxis]) - ax4.set_ylabel('[1: regular, 2: startup, 3: stillness]') - ax4.set_title(f'state_out, {debug["state_out"].shape} {debug["state_out"].dtype}') + ax4.plot(debug["state_out"][:, np.newaxis]) + ax4.set_ylabel("[1: regular, 2: startup, 3: stillness]") + ax4.set_title( + f'state_out, {debug["state_out"].shape} {debug["state_out"].dtype}' + ) for ax in (ax1, ax2, ax3, ax4): ax.grid() @@ -409,8 +530,10 @@ def headingCorrection_debugPlot(debug, fig=None): def checkStillness(gyr1, gyr2, still_threshold): # based on Matlab implementation in matlab/lib/HeadingCorrection/heading_corection.m - if np.mean(np.linalg.norm(gyr1, axis=1)) < still_threshold and \ - np.mean(np.linalg.norm(gyr2, axis=1)) < still_threshold: + if ( + np.mean(np.linalg.norm(gyr1, axis=1)) < still_threshold + and np.mean(np.linalg.norm(gyr2, axis=1)) < still_threshold + ): stillness = True else: stillness = False @@ -430,14 +553,19 @@ def stillnessCorrection(self, quat1, quat2, lastDelta, stillnessTrigger): # % stillness phase. It is reset after each rising edge of the stillness # % detection. if self.quatRelRef is None: - self.quatRelRef = qmt.qmult(qmt.qmult(qmt.qinv(quat1), qmt.quatFromAngleAxis( - lastDelta, [0, 0, 1])), quat2) + self.quatRelRef = qmt.qmult( + qmt.qmult(qmt.qinv(quat1), qmt.quatFromAngleAxis(lastDelta, [0, 0, 1])), + quat2, + ) self.deltaStill = lastDelta else: if stillnessTrigger: self.quatRelRef = qmt.qmult( - qmt.qmult(qmt.qinv(quat1), qmt.quatFromAngleAxis(lastDelta, [0, 0, 1])), - quat2) + qmt.qmult( + qmt.qinv(quat1), qmt.quatFromAngleAxis(lastDelta, [0, 0, 1]) + ), + quat2, + ) self.deltaStill = lastDelta qE2E1 = qmt.qmult(qmt.qmult(quat1, self.quatRelRef), qmt.qinv(quat2)) qRel = qmt.qrel(qmt.quatFromAngleAxis(self.deltaStill, [0, 0, 1]), qE2E1) @@ -448,7 +576,17 @@ def stillnessCorrection(self, quat1, quat2, lastDelta, stillnessTrigger): return delta -def headingFilter(delta, rating, state, estimationRate, tauBias, tauDelta, minRating, windowTime, alignment): +def headingFilter( + delta, + rating, + state, + estimationRate, + tauBias, + tauDelta, + minRating, + windowTime, + alignment, +): # based on Matlab implementation in matlab/lib/HeadingCorrection/heading_corection.m N = delta.shape[0] @@ -484,12 +622,22 @@ def headingFilter(delta, rating, state, estimationRate, tauBias, tauDelta, minRa kDeltaEff = max(rating[i, :] * kDelta[i, :], 1 / (j + 1)) j += 1 - deltaDiff = np.clip(qmt.wrapToPi(delta[i, :] - delta[i - 1, :]), -biasClip, biasClip) - bias[i, :] = np.clip(bias[i - 1, :] + kBiasEff * (deltaDiff - bias[i - 1, :]), -biasClip, biasClip) - out[i, :] = out[i - 1, :] + bias[i, :] + kDeltaEff * (qmt.wrapToPi(delta[i, :] - out[i - 1, :]) - bias[i, :]) - if alignment == 'backward': + deltaDiff = np.clip( + qmt.wrapToPi(delta[i, :] - delta[i - 1, :]), -biasClip, biasClip + ) + bias[i, :] = np.clip( + bias[i - 1, :] + kBiasEff * (deltaDiff - bias[i - 1, :]), + -biasClip, + biasClip, + ) + out[i, :] = ( + out[i - 1, :] + + bias[i, :] + + kDeltaEff * (qmt.wrapToPi(delta[i, :] - out[i - 1, :]) - bias[i, :]) + ) + if alignment == "backward": delta_out = out + windowSize / 2 * bias - elif alignment == 'forward': + elif alignment == "forward": delta_out = out - windowSize / 2 * bias else: # center delta_out = out @@ -512,7 +660,9 @@ def estimateDelta1d(quat1, quat2, joint, deltaStart, constraint, optimizationSte # Rating calculation v1 = qmt.rotate(quat1, j) v2 = qmt.rotate(quat2, j) - weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2) + weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt( + v2[:, 0] ** 2 + v2[:, 1] ** 2 + ) rating = qmt.rms(weight) return delta, rating, cost @@ -521,7 +671,9 @@ def estimateDelta1d(quat1, quat2, joint, deltaStart, constraint, optimizationSte def gaussNewtonStep(quat1, quat2, jB1, jB2, delta, constraint): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m errors, jacobi = errorAndJac1D(quat1, quat2, jB1, jB2, delta, constraint) - deltaParams = -1 * lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors))[0] + deltaParams = ( + -1 * lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors))[0] + ) deltaParams = np.squeeze(deltaParams) totalError = np.linalg.norm(errors) @@ -530,16 +682,16 @@ def gaussNewtonStep(quat1, quat2, jB1, jB2, delta, constraint): def errorAndJac1D(q1, q2, jB1, jB2, delta, constraint): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m - if constraint == 'proj': + if constraint == "proj": errors, jacobi = getErrorAndJacProj(delta, q1, q2, jB1) - elif constraint == '1d_corr': + elif constraint == "1d_corr": errors, jacobi = getErrorAndJac1dCorr(delta, q1, q2, jB1, jB2) - elif constraint == 'euler_1d': + elif constraint == "euler_1d": errors, jacobi = getErrorAndJacEuler1D(delta, q1, q2, jB1, jB2) - elif constraint == 'euler_2d': + elif constraint == "euler_2d": errors, jacobi = getErrorAndJacEuler2D(delta, q1, q2, jB1, jB2) else: - raise ValueError('Wrong constrain') + raise ValueError("Wrong constrain") return errors, jacobi @@ -556,18 +708,22 @@ def getErrorAndJacEuler1D(delta, q1, q2, j, jAlt): def CostEuler1D(delta, q1, q2, jB1, jB2): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) - qRot1 = qmt.quatFromAngleAxis(np.arccos(np.array([0, 0, 1]) @ jB1), np.cross(np.array([0, 0, 1]), jB1)) - qRot2 = qmt.quatFromAngleAxis(np.arccos(np.array([0, 0, 1]) @ jB2), np.cross(np.array([0, 0, 1]), jB2)) + qRot1 = qmt.quatFromAngleAxis( + np.arccos(np.array([0, 0, 1]) @ jB1), np.cross(np.array([0, 0, 1]), jB1) + ) + qRot2 = qmt.quatFromAngleAxis( + np.arccos(np.array([0, 0, 1]) @ jB2), np.cross(np.array([0, 0, 1]), jB2) + ) qB1E1 = qmt.qmult(q1, qRot1) qB2E2 = qmt.qmult(q2, qRot2) qB2B1 = qmt.qmult(qmt.qmult(qmt.qinv(qB1E1), qE2E1), qB2E2) - angles = qmt.eulerAngles(qB2B1, 'zxy', True) + angles = qmt.eulerAngles(qB2B1, "zxy", True) secondAngle = angles[:, 1] thirdAngle = angles[:, 2] - error = getWeight(q1, q2, jB1, jB2) * np.sqrt(secondAngle ** 2 + thirdAngle ** 2) + error = getWeight(q1, q2, jB1, jB2) * np.sqrt(secondAngle**2 + thirdAngle**2) return error @@ -576,7 +732,9 @@ def getWeight(q1, q2, jB1, jB2): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m v1 = qmt.rotate(q1, jB1) v2 = qmt.rotate(q2, jB2) - weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2) + weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt( + v2[:, 0] ** 2 + v2[:, 1] ** 2 + ) return weight @@ -601,8 +759,10 @@ def CostProj(delta, q1, q2, j): v1 = qmt.rotate(q1, j) v2 = qmt.rotate(q2, j) # weight = sqrt(v1(:, 1).^ 2 + v1(:, 2).^ 2).*sqrt(v2(:, 1).^ 2 + v2(:, 2).^ 2); - weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2) - error = weight * 2. * np.arccos(qRes[:, 0]) + weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt( + v2[:, 0] ** 2 + v2[:, 1] ** 2 + ) + error = weight * 2.0 * np.arccos(qRes[:, 0]) return error @@ -613,7 +773,12 @@ def getErrorAndJac1dCorr(delta, q1, q2, jB1, jB2): weight = getWeight(q1, q2, jB1, jB2) - error = qmt.wrapToPi(np.arctan2(v2[:, 1], v2[:, 0]) - np.arctan2(v1[:, 1], v1[:, 0]) + delta) * weight + error = ( + qmt.wrapToPi( + np.arctan2(v2[:, 1], v2[:, 0]) - np.arctan2(v1[:, 1], v1[:, 0]) + delta + ) + * weight + ) jac = weight return error, jac @@ -622,8 +787,12 @@ def getErrorAndJacEuler2D(delta, q1, q2, j1, j2): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m # % Calculate the error qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) - qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2)) - qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1)) + qB2S2 = qmt.quatFromAngleAxis( + np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2) + ) + qB1S1 = qmt.quatFromAngleAxis( + np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1) + ) qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1)) qE2B1 = qmt.qmult(qE1B1, qE2E1) @@ -644,9 +813,13 @@ def getErrorAndJacEuler2D(delta, q1, q2, j1, j2): dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2) dQ = dQB - jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] + - dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3]) - jac = jac / np.sqrt(1 - arcsinArg ** 2) + jac = 2 * ( + dQ[:, 1] * qB2B1[:, 0] + + qB2B1[:, 1] * dQ[:, 0] + + dQ[:, 2] * qB2B1[:, 3] + + qB2B1[:, 2] * dQ[:, 3] + ) + jac = jac / np.sqrt(1 - arcsinArg**2) return error, jac @@ -654,14 +827,14 @@ def getErrorAndJacEuler2D(delta, q1, q2, j1, j2): # 2D functions def errorAndJac2D(q1, q2, gyr1, gyr2, t, j1, j2, deltaStart, constraint): # based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_2d.m - if constraint == 'euler': + if constraint == "euler": errors, jacobi = getJacEulerCons(deltaStart[0], q1, q2, j1, j2) - elif constraint == 'gyro': + elif constraint == "gyro": errors, jacobi = getJacGyroCons(j1, j2, q1, q2, gyr1, gyr2, deltaStart) - elif constraint == 'combined': + elif constraint == "combined": errors, jacobi = getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, deltaStart) else: - raise ValueError(f'Wrong constraint: {constraint}') + raise ValueError(f"Wrong constraint: {constraint}") return errors, jacobi @@ -669,8 +842,12 @@ def getJacEulerCons(delta, q1, q2, j1, j2): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m # % Calculate the error qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) - qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2)) - qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1)) + qB2S2 = qmt.quatFromAngleAxis( + np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2) + ) + qB1S1 = qmt.quatFromAngleAxis( + np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1) + ) qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1)) qE2B1 = qmt.qmult(qE1B1, qE2E1) @@ -691,9 +868,13 @@ def getJacEulerCons(delta, q1, q2, j1, j2): dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2) dQ = dQB # - jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] + - dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3]) - jac = jac / np.sqrt(1 - arcsinArg ** 2) + jac = 2 * ( + dQ[:, 1] * qB2B1[:, 0] + + qB2B1[:, 1] * dQ[:, 0] + + dQ[:, 2] * qB2B1[:, 3] + + qB2B1[:, 2] * dQ[:, 3] + ) + jac = jac / np.sqrt(1 - arcsinArg**2) return error, jac @@ -704,11 +885,15 @@ def get2DStaticWeight(jointAxes, quat1, quat2): axis2 = jointAxes[1] j1 = qmt.rotate(quat1, axis1) j2 = qmt.rotate(quat2, axis2) - weight = np.sqrt(j1[:, 0] ** 2 + j1[:, 1] ** 2) * np.sqrt(j2[:, 0] ** 2 + j2[:, 1] ** 2) + weight = np.sqrt(j1[:, 0] ** 2 + j1[:, 1] ** 2) * np.sqrt( + j2[:, 0] ** 2 + j2[:, 1] ** 2 + ) return weight -def estimateDelta2d(quat1, quat2, gyr1, gyr2, time, joint, deltaStart, constraint, optimizationSteps): +def estimateDelta2d( + quat1, quat2, gyr1, gyr2, time, joint, deltaStart, constraint, optimizationSteps +): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m j1 = joint[0, :] j2 = joint[1, :] @@ -716,24 +901,35 @@ def estimateDelta2d(quat1, quat2, gyr1, gyr2, time, joint, deltaStart, constrain cost = 0 for _ in range(optimizationSteps): - deltaInc, cost = gaussNewtonStep2d(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint) + deltaInc, cost = gaussNewtonStep2d( + quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint + ) delta = delta + deltaInc - if constraint == 'euler_lin': + if constraint == "euler_lin": delta = delta[0] * time + delta[1] delta = qmt.wrapTo2Pi(delta) # %% Rating calculation j1Global = qmt.rotate(quat1, j1) j2Global = qmt.rotate(quat2, j2) - rating = qmt.rms(np.sqrt(j1Global[:, 0] ** 2 + j1Global[:, 1] ** 2) - * np.sqrt(j2Global[:, 0] ** 2 + j2Global[:, 1] ** 2)) + rating = qmt.rms( + np.sqrt(j1Global[:, 0] ** 2 + j1Global[:, 1] ** 2) + * np.sqrt(j2Global[:, 0] ** 2 + j2Global[:, 1] ** 2) + ) return delta, rating, cost def gaussNewtonStep2d(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m - errors, jacobi = errorAndJac2D(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint) - deltaParams = -1 * np.linalg.lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors), rcond=None)[0] + errors, jacobi = errorAndJac2D( + quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint + ) + deltaParams = ( + -1 + * np.linalg.lstsq( + np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors), rcond=None + )[0] + ) deltaParams = np.squeeze(deltaParams) totalError = np.linalg.norm(errors) return deltaParams, totalError @@ -757,11 +953,19 @@ def getJacGyroCons(j1, j2, q1, q2, gyr1, gyr2, delta): errors = np.sum(wD * jN, axis=1) # % Calculate one Row of the Jacobian - dj2B = -j2E2 * np.sin(delta) + np.cross(eZ, j2E2, axis=1) * np.cos(delta) \ + dj2B = ( + -j2E2 * np.sin(delta) + + np.cross(eZ, j2E2, axis=1) * np.cos(delta) + eZ * (np.sum(eZ * j2E2, axis=1) * np.sin(delta))[:, np.newaxis] - dwdB = -1 * (-w2E2 * np.sin(delta) + np.cross(eZ, w2E2, axis=1) * np.cos(delta) - + eZ * (np.sum(eZ, w2E2, axis=1) * np.sin(delta))[:, np.newaxis]) - dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum(wD * np.cross(j1E1, dj2B, axis=1), axis=1) + ) + dwdB = -1 * ( + -w2E2 * np.sin(delta) + + np.cross(eZ, w2E2, axis=1) * np.cos(delta) + + eZ * (np.sum(eZ, w2E2, axis=1) * np.sin(delta))[:, np.newaxis] + ) + dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum( + wD * np.cross(j1E1, dj2B, axis=1), axis=1 + ) jac = dB return errors, jac @@ -784,17 +988,29 @@ def getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, delta): errorsGyro = np.sum(wD * jN, axis=1) # % Calculate one Row of the Jacobian - dj2B = -j2E2 * np.sin(delta) + np.cross(eE, j2E2, axis=1) * np.cos(delta) + \ - eE * (np.sum(eE * j2E2, axis=1) * np.sin(delta))[:, np.newaxis] - dwdB = -(-w2E2 * np.sin(delta) + np.cross(eE, w2E2, axis=1) * np.cos(delta) + - eE * (np.sum(eE * w2E2, axis=1) * np.sin(delta))[:, np.newaxis]) - dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum(wD * np.cross(j1E1, dj2B, axis=1), axis=1) + dj2B = ( + -j2E2 * np.sin(delta) + + np.cross(eE, j2E2, axis=1) * np.cos(delta) + + eE * (np.sum(eE * j2E2, axis=1) * np.sin(delta))[:, np.newaxis] + ) + dwdB = -( + -w2E2 * np.sin(delta) + + np.cross(eE, w2E2, axis=1) * np.cos(delta) + + eE * (np.sum(eE * w2E2, axis=1) * np.sin(delta))[:, np.newaxis] + ) + dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum( + wD * np.cross(j1E1, dj2B, axis=1), axis=1 + ) jacGyro = dB # % Calculate the error qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) - qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2)) - qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1)) + qB2S2 = qmt.quatFromAngleAxis( + np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2) + ) + qB1S1 = qmt.quatFromAngleAxis( + np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1) + ) qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1)) qE2B1 = qmt.qmult(qE1B1, qE2E1) @@ -816,9 +1032,13 @@ def getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, delta): dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2) dQ = dQB - jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] + - dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3]) - jacEuler = jac / np.sqrt(1 - arcsinArg ** 2) + jac = 2 * ( + dQ[:, 1] * qB2B1[:, 0] + + qB2B1[:, 1] * dQ[:, 0] + + dQ[:, 2] * qB2B1[:, 3] + + qB2B1[:, 2] * dQ[:, 3] + ) + jacEuler = jac / np.sqrt(1 - arcsinArg**2) errors = errorsGyro + errorsEuler jac = jacEuler + jacGyro @@ -826,20 +1046,177 @@ def getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, delta): return errors, jac +def estimateDelta3d( + quat1, + quat2, + angleRanges, + convention, + deltaRange, + deltaStart, + constraint, + r1, + r2, + acc1, + acc2, + gyr1, + gyr2, + optimizationSteps, + Ts, +): + if constraint == "rom": + return estimateDelta3d_rom( + quat1, quat2, angleRanges, convention, deltaRange, deltaStart + ) + elif constraint == "conn": + return estimateDelta3d_conn( + quat1, + quat2, + r1, + r2, + acc1, + acc2, + gyr1, + gyr2, + deltaStart, + optimizationSteps, + Ts, + ) + else: + raise NotImplementedError + + +def estimateDelta3d_conn( + quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, deltaStart, optimizationSteps, Ts +): + delta = deltaStart + cost = 0.0 + + for _ in range(optimizationSteps): + deltaInc, cost = gaussNewtonStep_conn( + delta, quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, Ts + ) + delta = delta + deltaInc + + delta = qmt.wrapTo2Pi(delta) + + ratings = Cost3DConn( + delta, quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, Ts, ratings=True + ) + rating = qmt.rms(ratings) + + return delta, rating, cost + + +def gaussNewtonStep_conn(delta, quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, Ts): + # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m + errors, jacobi = getErrorAndJac3DConn( + delta, quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, Ts + ) + deltaParams = ( + -1 * lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors))[0] + ) + deltaParams = np.squeeze(deltaParams) + totalError = np.linalg.norm(errors) + + return deltaParams, totalError + + +def getErrorAndJac3DConn(delta, quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, Ts): + # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m + errors = Cost3DConn(delta, quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, Ts) + eps = 1e-8 + errorsEps = Cost3DConn(delta, quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, Ts) + jacobi = (errorsEps - errors) / eps + return errors, jacobi + + +def Cost3DConn(delta, quat1, quat2, r1, r2, acc1, acc2, gyr1, gyr2, Ts, ratings=False): + # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m + qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) + # Body 2 realtive to Earth1 + quat2 = qmt.qmult(qE2E1, quat2) + + # calculate both acc1 and acc2 in the common joint center, then they are only + # in different CS, but vector should be the same + acc1_jc = _shift_operator_acc(gyr1, acc1, Ts, r1) + acc2_jc = _shift_operator_acc(gyr2, acc2, Ts, r2) + + acc1_jc_E1 = qmt.rotate(quat1, acc1_jc) + acc2_jc_E1 = qmt.rotate(quat2, acc2_jc) + + error = np.linalg.norm(acc1_jc_E1 - acc2_jc_E1, axis=-1) + + if ratings: + return getRatingCost3DConn(acc1_jc_E1, acc2_jc_E1) + return error + + +def getRatingCost3DConn(acc1_jc_E1, acc2_jc_E1): + # we have two rating measures for this constraint + # 1) both acceleration norms should be identical + # 2) both accelerations without gravity should be > 0, we need excitation + gravity = np.array([0, 0, 9.81]) + a1, a2 = acc1_jc_E1 - gravity, acc2_jc_E1 - gravity + a1_norm, a2_norm = np.linalg.norm(a1, axis=-1), np.linalg.norm(a2, axis=-1) + + rating1 = 1 / (1 + np.abs(a1_norm - a2_norm)) + rating2 = (a1_norm + a2_norm) / 2 + return rating1 * rating2 + + +def _shift_operator_acc(gyr, acc, Ts, r): + gyrdot = _gyrdot_fifth_order(gyr, Ts) + centripetal = np.cross(gyr, np.cross(gyr, r)) + tangential = np.cross(gyrdot, r) + joint_center_acc = acc + centripetal + tangential + return joint_center_acc + + +def _gyrdot_fifth_order(gyr, Ts): + gyrdot = np.zeros_like(gyr) + h = Ts + + gyrdot[2:-2] = (-gyr[4:] + 8 * gyr[3:-1] - 8 * gyr[1:-3] + gyr[:-4]) / (12 * h) + + gyrdot[0] = (gyr[1] - gyr[0]) / h + gyrdot[1] = (gyr[2] - gyr[0]) / (2 * h) + gyrdot[-2] = (gyr[-1] - gyr[-3]) / (2 * h) + gyrdot[-1] = (gyr[-1] - gyr[-2]) / h + return gyrdot + + # 3-D joint -def estimateDelta3d(quat1, quat2, angleRanges, convention, deltaRange, deltaStart): +def estimateDelta3d_rom(quat1, quat2, angleRanges, convention, deltaRange, deltaStart): # based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m # generate the distribution of values for delta that produce a valid relative orientation - deltaProbability = getPossibleAngles(quat1, quat2, angleRanges[0, :], angleRanges[1, :], angleRanges[2, :], - convention, deltaRange, 'max') + deltaProbability = getPossibleAngles( + quat1, + quat2, + angleRanges[0, :], + angleRanges[1, :], + angleRanges[2, :], + convention, + deltaRange, + "max", + ) # get the maximum maxVal = max(deltaProbability) - distributionLastDelta = quat1.shape[0] / np.pi * abs(qmt.wrapToPi(deltaRange - deltaStart)) + distributionLastDelta = ( + quat1.shape[0] / np.pi * abs(qmt.wrapToPi(deltaRange - deltaStart)) + ) - deltaProbabilityMin = getPossibleAngles(quat1, quat2, angleRanges[0, :], angleRanges[1, :], angleRanges[2, :], - convention, deltaRange, 'min') + deltaProbabilityMin = getPossibleAngles( + quat1, + quat2, + angleRanges[0, :], + angleRanges[1, :], + angleRanges[2, :], + convention, + deltaRange, + "min", + ) distribNewMin = deltaProbabilityMin + distributionLastDelta # % get the minimum @@ -852,8 +1229,10 @@ def estimateDelta3d(quat1, quat2, angleRanges, convention, deltaRange, deltaStar # %% Rating calculation # % calculate the standard deviation of the distribution in order to quantify the quality of the estimation - _, stdConstraint = stdProbAngles(deltaRange[deltaProbability > maxVal / 2], - deltaProbability[deltaProbability > maxVal / 2]) + _, stdConstraint = stdProbAngles( + deltaRange[deltaProbability > maxVal / 2], + deltaProbability[deltaProbability > maxVal / 2], + ) # % scale the standard deviation to a range of [0.1] rating = map_(stdConstraint, 0, np.deg2rad(20), 1, 0) # % clip the window rating to values of [0,1] @@ -861,7 +1240,9 @@ def estimateDelta3d(quat1, quat2, angleRanges, convention, deltaRange, deltaStar return delta, rating, cost -def getPossibleAngles(quat1, quat2, angle1Range, angle2Range, angle3Range, convention, deltaRange, mode): +def getPossibleAngles( + quat1, quat2, angle1Range, angle2Range, angle3Range, convention, deltaRange, mode +): # based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m qHand = quat1[~np.isnan(quat1[:, 0]), :] qMeta = quat2[~np.isnan(quat2[:, 0]), :] @@ -869,17 +1250,25 @@ def getPossibleAngles(quat1, quat2, angle1Range, angle2Range, angle3Range, conve # qMeta: Mx4 # qHand: Mx4 qE1E2 = qmt.quatFromAngleAxis(deltaRange, [0, 0, 1]) - qE1E2Broadcast, qMetaBroadcast, qHandBroadcast = np.broadcast_arrays(qE1E2[None], qMeta[:, None], qHand[:, None]) + qE1E2Broadcast, qMetaBroadcast, qHandBroadcast = np.broadcast_arrays( + qE1E2[None], qMeta[:, None], qHand[:, None] + ) N = qE1E2.shape[0] M = qMeta.shape[0] - qMetaCorr = qmt.qmult(qE1E2Broadcast.reshape(M * N, 4), qMetaBroadcast.reshape(M * N, 4)).reshape(M, N, 4) + qMetaCorr = qmt.qmult( + qE1E2Broadcast.reshape(M * N, 4), qMetaBroadcast.reshape(M * N, 4) + ).reshape(M, N, 4) # Mx360x4 - qRel = qmt.qrel(qHandBroadcast.reshape(M * N, 4), qMetaCorr.reshape(M * N, 4)).reshape(M, N, 4) + qRel = qmt.qrel( + qHandBroadcast.reshape(M * N, 4), qMetaCorr.reshape(M * N, 4) + ).reshape(M, N, 4) angles = qmt.eulerAngles(qRel.reshape(M * N, 4), convention, True).reshape(M * N, 3) - angleDiff = getAngleDiff(angles.reshape(M * N, 3), angle1Range, angle2Range, angle3Range) + angleDiff = getAngleDiff( + angles.reshape(M * N, 3), angle1Range, angle2Range, angle3Range + ) angleDiff[angleDiff > 0] = 1 angleDiff = angleDiff.astype(bool).reshape(M, N) - if mode == 'min': + if mode == "min": possibleAngles = angleDiff else: possibleAngles = ~angleDiff @@ -893,10 +1282,14 @@ def getAngleDiff(angles, angle1Range, angle2Range, angle3Range): out = abs(np.minimum(-np.sign(d1[:, 0] * d1[:, 1]) * np.min(np.abs(d1), axis=1), 0)) d2 = angle2Range[np.newaxis, :] - angles[:, 1][:, np.newaxis] - out = out + abs(np.minimum(-np.sign(d2[:, 0] * d2[:, 1]) * np.min(np.abs(d2), axis=1), 0)) + out = out + abs( + np.minimum(-np.sign(d2[:, 0] * d2[:, 1]) * np.min(np.abs(d2), axis=1), 0) + ) d3 = angle3Range[np.newaxis, :] - angles[:, 2][:, np.newaxis] - out = out + abs(np.minimum(-np.sign(d3[:, 0] * d3[:, 1]) * np.min(np.abs(d3), axis=1), 0)) + out = out + abs( + np.minimum(-np.sign(d3[:, 0] * d3[:, 1]) * np.min(np.abs(d3), axis=1), 0) + ) return out @@ -922,7 +1315,10 @@ def stdProbAngles(val=None, prob=None): def angularMean(angles, probability): # based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m N = len(angles) - mean = np.arctan2(1 / N * sum(probability * np.sin(angles)), 1 / N * sum(probability * np.cos(angles))) + mean = np.arctan2( + 1 / N * sum(probability * np.sin(angles)), + 1 / N * sum(probability * np.cos(angles)), + ) mean = qmt.wrapTo2Pi(mean) return mean @@ -936,7 +1332,7 @@ def map_(val, inMin, inMax, outMin, outMax): def movMean1D(x, windowSize): windowSize = np.ones(windowSize) / windowSize x = np.array(x, dtype=float) - xFilt = signal.correlate(x, windowSize, mode='same') + xFilt = signal.correlate(x, windowSize, mode="same") return xFilt @@ -969,7 +1365,9 @@ def romCost(ROM, convention, q1, q2, delta): return rating -def removeHeadingDriftForStraightWalk(t, quat, winlength, stepthreshold, movmeanwidth, plot=False, debug=False): +def removeHeadingDriftForStraightWalk( + t, quat, winlength, stepthreshold, movmeanwidth, plot=False, debug=False +): """ A function that takes a quaternion with slow heading drift and removes that heading drift by assumimg that the heading remains constant except for a few large steps (turns) by multiples of pi. @@ -992,14 +1390,14 @@ def removeHeadingDriftForStraightWalk(t, quat, winlength, stepthreshold, movmean assert np.allclose(timediff, 1 / rate) qrel = qmt.qmult(quat, qmt.qinv(quat[199, :])) - h = qmt.eulerAngles(qrel, 'zyx', intrinsic=True) + h = qmt.eulerAngles(qrel, "zyx", intrinsic=True) h = np.unwrap(h, axis=0) h = h[:, 0] hsteps1 = np.zeros(h.shape) hsteps2 = np.zeros(h.shape) hsteps3 = np.zeros(h.shape) for i in range(winlength, (h.shape[0] - winlength)): - hsteps1[i] = np.mean(h[i:i + winlength]) - np.mean(h[i - winlength:i]) + hsteps1[i] = np.mean(h[i : i + winlength]) - np.mean(h[i - winlength : i]) hsteps1 = hsteps1 - np.mean(hsteps1[np.abs(hsteps1) < np.pi / 4]) for i in range(winlength, (h.shape[0] - winlength)): @@ -1021,26 +1419,33 @@ def removeHeadingDriftForStraightWalk(t, quat, winlength, stepthreshold, movmean hnice[driInd] = drift[driInd] hnice[~driInd] = hstepsfree[~driInd] drift = movMean1D(hnice, movmeanwidth) - corrquat = np.concatenate((np.cos(-drift / 2)[:, None], - np.sin(-drift / 2)[:, None] * np.array([0, 0, 1])), axis=1) + corrquat = np.concatenate( + ( + np.cos(-drift / 2)[:, None], + np.sin(-drift / 2)[:, None] * np.array([0, 0, 1]), + ), + axis=1, + ) quatCorrected = qmt.qmult(corrquat, quat) qrelCorrected = qmt.qmult(quatCorrected, qmt.qinv(quatCorrected[200, :])) - hCorrected = np.unwrap(qmt.eulerAngles(qrelCorrected, 'zyx', 1), 30.0 * np.pi) - hOld = np.unwrap(qmt.eulerAngles(qrel, 'zyx', 1), 3.0 * np.pi) + hCorrected = np.unwrap(qmt.eulerAngles(qrelCorrected, "zyx", 1), 30.0 * np.pi) + hOld = np.unwrap(qmt.eulerAngles(qrel, "zyx", 1), 3.0 * np.pi) if debug or plot: - debugData = dict(h=h, - hsteps1=hsteps1, - hsteps2=hsteps2, - hsteps3=hsteps3, - hsteps4=hsteps4, - hstepsfree=hstepsfree, - drift=drift, - hnice=hnice, - t=t, - h_corrected=hCorrected[:, 0], - h_old=hOld[:, 0]) + debugData = dict( + h=h, + hsteps1=hsteps1, + hsteps2=hsteps2, + hsteps3=hsteps3, + hsteps4=hsteps4, + hstepsfree=hstepsfree, + drift=drift, + hnice=hnice, + t=t, + h_corrected=hCorrected[:, 0], + h_old=hOld[:, 0], + ) if plot: removeHeadingDriftForStraightWalk_debugPlot(debugData, plot) if debug: @@ -1052,35 +1457,35 @@ def removeHeadingDriftForStraightWalk(t, quat, winlength, stepthreshold, movmean def removeHeadingDriftForStraightWalk_debugPlot(debug, fig=None): with AutoFigure(fig) as fig: (ax1, ax2), (ax3, ax4) = fig.subplots(2, 2) - fig.suptitle('removeHeadDrift debug plot') - - ax1.plot(debug['t'], np.rad2deg(debug['h']), '.-') - ax1.plot(debug['t'], np.rad2deg(debug['hsteps1']), '.-') - ax1.plot(debug['t'], np.rad2deg(debug['hsteps2']), '.-') - ax1.plot(debug['t'], np.rad2deg(debug['hsteps3']), '.-') - ax1.legend(['h0', 'h1', 'h2', 'h3']) - ax1.set_ylabel('[°]') - ax1.set_title('h0~h3') - - ax2.plot(debug['t'], np.rad2deg(debug['h']), '.-') - ax2.plot(debug['t'], np.rad2deg(debug['hstepsfree']), '.-') - ax2.plot(debug['t'], np.rad2deg(debug['drift']), '.-') - ax2.legend(['h', 'hstepsfree', 'drift']) - ax2.set_title('h , hstepsfree, drift') - ax2.set_ylabel('[°]') - - ax3.plot(debug['t'], np.rad2deg(debug['hstepsfree']), '.-') - ax3.plot(debug['t'], np.rad2deg(debug['hnice']), '.-') - ax3.plot(debug['t'], np.rad2deg(debug['drift']), '.-') - ax3.legend(['hstepsfree', 'hnice', 'drift']) - ax3.set_title('hstepsfree, hnice, drift') - ax3.set_ylabel('[°]') - - ax4.plot(debug['t'], np.rad2deg(debug['h_old']), '.-') - ax4.plot(debug['t'], np.rad2deg(debug['h_corrected']), '.-') - ax4.legend(['h_old', 'h_corrected']) - ax4.set_title('eulerAngle z-axis') - ax4.set_ylabel('[°]') + fig.suptitle("removeHeadDrift debug plot") + + ax1.plot(debug["t"], np.rad2deg(debug["h"]), ".-") + ax1.plot(debug["t"], np.rad2deg(debug["hsteps1"]), ".-") + ax1.plot(debug["t"], np.rad2deg(debug["hsteps2"]), ".-") + ax1.plot(debug["t"], np.rad2deg(debug["hsteps3"]), ".-") + ax1.legend(["h0", "h1", "h2", "h3"]) + ax1.set_ylabel("[°]") + ax1.set_title("h0~h3") + + ax2.plot(debug["t"], np.rad2deg(debug["h"]), ".-") + ax2.plot(debug["t"], np.rad2deg(debug["hstepsfree"]), ".-") + ax2.plot(debug["t"], np.rad2deg(debug["drift"]), ".-") + ax2.legend(["h", "hstepsfree", "drift"]) + ax2.set_title("h , hstepsfree, drift") + ax2.set_ylabel("[°]") + + ax3.plot(debug["t"], np.rad2deg(debug["hstepsfree"]), ".-") + ax3.plot(debug["t"], np.rad2deg(debug["hnice"]), ".-") + ax3.plot(debug["t"], np.rad2deg(debug["drift"]), ".-") + ax3.legend(["hstepsfree", "hnice", "drift"]) + ax3.set_title("hstepsfree, hnice, drift") + ax3.set_ylabel("[°]") + + ax4.plot(debug["t"], np.rad2deg(debug["h_old"]), ".-") + ax4.plot(debug["t"], np.rad2deg(debug["h_corrected"]), ".-") + ax4.legend(["h_old", "h_corrected"]) + ax4.set_title("eulerAngle z-axis") + ax4.set_ylabel("[°]") for ax in (ax1, ax2, ax3, ax4): ax.grid() diff --git a/setup.py b/setup.py index 12e26a1..51c72e9 100644 --- a/setup.py +++ b/setup.py @@ -5,6 +5,7 @@ Recompile extension modules python3 setup.py build_ext --inplace -f """ + # SPDX-FileCopyrightText: 2021 Daniel Laidig # # SPDX-License-Identifier: MIT @@ -13,69 +14,84 @@ # https://stackoverflow.com/a/60740179 # (note that even with pyproject.toml this is still useful to make `python setup.py sdist` work out-of-the-box) from setuptools import dist -dist.Distribution().fetch_build_eggs(['Cython', 'numpy']) + +dist.Distribution().fetch_build_eggs(["Cython", "numpy"]) import site import sys -from setuptools import setup, find_packages + from Cython.Build import cythonize import numpy as np +from setuptools import find_packages +from setuptools import setup # workaround for develop mode (pip install -e) with PEP517/pyproject.toml cf. https://github.com/pypa/pip/issues/7953 -site.ENABLE_USER_SITE = '--user' in sys.argv[1:] +site.ENABLE_USER_SITE = "--user" in sys.argv[1:] -ext_modules = cythonize([ - 'qmt/cpp/oriestimu.pyx', - 'qmt/cpp/madgwick.pyx', - 'qmt/cpp/quaternion.pyx', -]) +ext_modules = cythonize( + [ + "qmt/cpp/oriestimu.pyx", + "qmt/cpp/madgwick.pyx", + "qmt/cpp/quaternion.pyx", + ] +) for m in ext_modules: m.include_dirs.insert(0, np.get_include()) setup( - name='qmt', - version='0.2.4', - - description='Quaternion-based Inertial Motion Tracking Toolbox', - long_description=open('README.rst', encoding='utf-8').read(), + name="qmt", + version="0.3.0", + description="Quaternion-based Inertial Motion Tracking Toolbox", + long_description=open("README.rst", encoding="utf-8").read(), long_description_content_type="text/x-rst", - url='https://github.com/dlaidig/qmt/', + url="https://github.com/dlaidig/qmt/", project_urls={ - 'Documentation': 'https://qmt.readthedocs.io/', + "Documentation": "https://qmt.readthedocs.io/", }, - - author='Daniel Laidig', - author_email='laidig@control.tu-berlin.de', - license='MIT', + author="Daniel Laidig", + author_email="laidig@control.tu-berlin.de", + license="MIT", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Science/Research', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3', + "Development Status :: 4 - Beta", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", ], - - packages=find_packages(exclude=['contrib', 'docs', 'tests']), + packages=find_packages(exclude=["contrib", "docs", "tests"]), include_package_data=True, zip_safe=False, - - python_requires='>=3.8', - install_requires=['numpy', 'scipy>=1.8.0', 'matplotlib', 'PyYAML', - 'transplant>=0.8.11', # 0.8.11 fixes https://github.com/bastibe/transplant/issues/81 - 'aiohttp>=3.8.1', 'aiofiles', 'orjson', 'qasync', 'vqf>=2.0.0'], + python_requires=">=3.8", + install_requires=[ + "numpy", + "scipy>=1.8.0", + "matplotlib", + "PyYAML", + "transplant>=0.8.11", # 0.8.11 fixes https://github.com/bastibe/transplant/issues/81 + "aiohttp>=3.8.1", + "aiofiles", + "orjson", + "qasync", + "vqf>=2.0.0", + ], extras_require={ # pip3 install --user -e ".[dev]" - 'dev': ['pytest', 'flake8', - 'reuse', 'Cython', 'sphinx', 'recommonmark', 'sphinx-rtd-theme', 'sphinxcontrib-matlabdomain'], + "dev": [ + "pytest", + "flake8", + "reuse", + "Cython", + "sphinx", + "recommonmark", + "sphinx-rtd-theme", + "sphinxcontrib-matlabdomain", + ], }, - entry_points={ - 'console_scripts': [ - 'qmt-webapp = qmt.utils.webapp_cli:main', + "console_scripts": [ + "qmt-webapp = qmt.utils.webapp_cli:main", ], }, - ext_modules=ext_modules, - include_dirs=[np.get_include()], ) From ed1746a200aab86b32b30ec753910d7251d94634 Mon Sep 17 00:00:00 2001 From: Simon Bachhuber Date: Fri, 18 Apr 2025 15:08:33 +0200 Subject: [PATCH 2/3] fixes minus sign error --- qmt/functions/heading_correction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qmt/functions/heading_correction.py b/qmt/functions/heading_correction.py index 5365dcd..d954c79 100644 --- a/qmt/functions/heading_correction.py +++ b/qmt/functions/heading_correction.py @@ -1168,7 +1168,7 @@ def _shift_operator_acc(gyr, acc, Ts, r): gyrdot = _gyrdot_fifth_order(gyr, Ts) centripetal = np.cross(gyr, np.cross(gyr, r)) tangential = np.cross(gyrdot, r) - joint_center_acc = acc + centripetal + tangential + joint_center_acc = acc - centripetal - tangential return joint_center_acc From fdb31607ae0aa09293a86774eb0ae5f8bcbb72cc Mon Sep 17 00:00:00 2001 From: Simon Bachhuber Date: Tue, 22 Apr 2025 10:42:21 +0200 Subject: [PATCH 3/3] undo all black auto-formatting related edits --- qmt/functions/heading_correction.py | 725 +++++++++------------------- setup.py | 92 ++-- 2 files changed, 274 insertions(+), 543 deletions(-) diff --git a/qmt/functions/heading_correction.py b/qmt/functions/heading_correction.py index d954c79..b06fbf9 100644 --- a/qmt/functions/heading_correction.py +++ b/qmt/functions/heading_correction.py @@ -3,30 +3,17 @@ # # SPDX-License-Identifier: MIT import numpy as np -from scipy import interpolate -from scipy.linalg import lstsq import scipy.signal as signal +from scipy.linalg import lstsq +from scipy import interpolate import qmt from qmt.utils.misc import setDefaults from qmt.utils.plot import AutoFigure -def headingCorrection( - gyr1, - gyr2, - acc1, - acc2, - quat1, - quat2, - t, - joint, - jointInfo, - estSettings=None, - verbose=False, - debug=False, - plot=False, -): +def headingCorrection(gyr1, gyr2, acc1, acc2, quat1, quat2, t, joint, jointInfo, estSettings=None, verbose=False, + debug=False, plot=False): """ This function corrects the heading of a kinematic chain of two segments whose orientations are estimated without the use of magnetometers. It corrects the heading of the second segment in a way that its orientation is estimated @@ -92,12 +79,12 @@ def headingCorrection( # Input Checking joint = np.array(joint).copy() if len(joint.shape) < 2: - assert joint.shape == (3,), "Wrong dimension of joint matrix" + assert joint.shape == (3,), 'Wrong dimension of joint matrix' dof = 1 else: N = joint.shape[0] - assert N <= 3, "Wrong dimension of joint matrix" - assert joint.shape == (N, 3), "Wrong dimension of joint matrix" + assert N <= 3, 'Wrong dimension of joint matrix' + assert joint.shape == (N, 3), 'Wrong dimension of joint matrix' dof = N # Parameter loading @@ -110,7 +97,7 @@ def headingCorrection( tauDelta=5.0, tauBias=5.0, ratingMin=0.4, - alignment="backward", + alignment='backward', enableStillness=True, optimizerSteps=5, stillnessTime=3.0, @@ -119,33 +106,31 @@ def headingCorrection( startRating=1, deltaRange=np.deg2rad(np.linspace(0, 359, 360)), angleRanges=np.ones((3, 1)) * np.array([[-np.pi, np.pi]]), - convention="xyz", + convention='xyz', constraint=None, ) estSettings = setDefaults(estSettings, defaults) - if estSettings["constraint"] is None: - del estSettings[ - "constraint" - ] # default values are later handled depending on DoF - - useRomConstraints = estSettings["useRomConstraints"] - windowTime = estSettings["windowTime"] - estimationRate = estSettings["estimationRate"] - dataRate = estSettings["dataRate"] - tauDelta = estSettings["tauDelta"] - tauBias = estSettings["tauBias"] - ratingMin = estSettings["ratingMin"] - alignment = estSettings["alignment"] - enableStillness = estSettings["enableStillness"] - optimizerSteps = estSettings["optimizerSteps"] - stillnessTime = estSettings["stillnessTime"] - stillnessThreshold = estSettings["stillnessThreshold"] - stillnessRating = estSettings["stillnessRating"] - startRating = estSettings["startRating"] - deltaRange = estSettings["deltaRange"] - angleRanges = np.asarray(estSettings["angleRanges"], float) - convention = estSettings["convention"] + if estSettings['constraint'] is None: + del estSettings['constraint'] # default values are later handled depending on DoF + + useRomConstraints = estSettings['useRomConstraints'] + windowTime = estSettings['windowTime'] + estimationRate = estSettings['estimationRate'] + dataRate = estSettings['dataRate'] + tauDelta = estSettings['tauDelta'] + tauBias = estSettings['tauBias'] + ratingMin = estSettings['ratingMin'] + alignment = estSettings['alignment'] + enableStillness = estSettings['enableStillness'] + optimizerSteps = estSettings['optimizerSteps'] + stillnessTime = estSettings['stillnessTime'] + stillnessThreshold = estSettings['stillnessThreshold'] + stillnessRating = estSettings['stillnessRating'] + startRating = estSettings['startRating'] + deltaRange = estSettings['deltaRange'] + angleRanges = np.asarray(estSettings['angleRanges'], float) + convention = estSettings['convention'] if dof == 3: constraint = estSettings.get("constraint", "rom") @@ -156,24 +141,21 @@ def headingCorrection( conn_constr_r1 = jointInfo.get("r1", None) conn_constr_r2 = jointInfo.get("r2", None) elif dof == 2: - constraint = estSettings.get("constraint", "euler") + constraint = estSettings.get('constraint', 'euler') else: - constraint = estSettings.get("constraint", "euler_1d") + constraint = estSettings.get('constraint', 'euler_1d') if verbose: - print("parameters: ") + print('parameters: ') for key, val in estSettings.items(): - print(f"{key}: {val}") + print(f'{key}: {val}') # Determine data rate from one of the time signals.Assumption: constant timeDiff = np.diff(t) rate = 1 / timeDiff[1] - assert np.allclose( - timeDiff, 1 / rate - ) # make sure the sampling time is constant to avoid surprises - assert np.isclose(rate % dataRate, 0) or np.isclose(rate % dataRate, dataRate), ( - f"rate should be divisible by " f"given dataRate: {dataRate}" - ) + assert np.allclose(timeDiff, 1 / rate) # make sure the sampling time is constant to avoid surprises + assert np.isclose(rate % dataRate, 0) or np.isclose(rate % dataRate, dataRate), f'rate should be divisible by ' \ + f'given dataRate: {dataRate}' windowSteps = round(windowTime * rate) estimationSteps = round(1 / estimationRate * rate) @@ -183,34 +165,26 @@ def headingCorrection( # number of total time steps in the time series N = len(t) - if alignment == "backward": + if alignment == 'backward': starts = np.arange(windowSteps, N, estimationSteps, dtype=int) - elif alignment == "center": - starts = np.arange( - windowSteps / 2, N - windowSteps / 2, estimationSteps, dtype=int - ) - elif alignment == "forward": + elif alignment == 'center': + starts = np.arange(windowSteps / 2, N - windowSteps / 2, estimationSteps, dtype=int) + elif alignment == 'forward': starts = np.arange(1, N - windowSteps, estimationSteps, dtype=int) else: - raise ValueError("Wrong alignment type") + raise ValueError('Wrong alignment type') # to have a smoother start up, # add estimations at each data step in the beginning until a complete time window has passed regularStart = starts[0] # start of the regular estimation without smooth startup - starts = np.concatenate( - (np.arange(dataSteps, starts[0], dataSteps, dtype=int), starts) - ) + starts = np.concatenate((np.arange(dataSteps, starts[0], dataSteps, dtype=int), starts)) estimations = len(starts) # number of performed estimations # Initialize the result vectors - delta = np.zeros( - (estimations, 1) - ) # delta is the heading offset between the first and second segment - rating = np.zeros( - (estimations, 1) - ) # the rating indicates the quality of the estimation + delta = np.zeros((estimations, 1)) # delta is the heading offset between the first and second segment + rating = np.zeros((estimations, 1)) # the rating indicates the quality of the estimation stateOut = np.zeros((estimations, 1)) cost = np.zeros((estimations, 1)) # Initialize the filtering time constants @@ -222,23 +196,23 @@ def headingCorrection( for k in range(1, estimations): # set default values to variables stillnessTrigger = False - state = "none" + state = 'none' # get the current index in the data index = starts[k] # check whether the smooth startup is active # If so, the used data, for the estimation is from index 1 to the current index if index < regularStart: - state = "startup" + state = 'startup' indexStart = 0 indexEnd = index else: # during regular estimation, the start and end index are determined based on # the current index and the chosen aligment type - if alignment == "center": + if alignment == 'center': indexStart = index - int(windowSteps / 2) indexEnd = index + int(windowSteps / 2) - elif alignment == "forward": + elif alignment == 'forward': indexStart = index indexEnd = index + windowSteps else: # alignment == 'backward' @@ -255,128 +229,71 @@ def headingCorrection( # only do the stillness detection of the setting is set to true and do # not do stillness detection during startup - if enableStillness and state != "startup": + if enableStillness and state != 'startup': # check whether the number of passed samples is larger than the detection peroiod for the stillness # detection if index > stillnessSteps: - stillness = checkStillness( - gyr1[index - stillnessSteps : index + 1, :], - gyr2[index - stillnessSteps : index + 1, :], - stillnessThreshold, - ) + stillness = checkStillness(gyr1[index - stillnessSteps:index + 1, :], + gyr2[index - stillnessSteps:index + 1, :], stillnessThreshold) if stillness: - if stateOut[k - 1] == 1: # regular + if stateOut[k-1] == 1: # regular stillnessTrigger = True - state = "stillness" + state = 'stillness' else: - state = "regular" + state = 'regular' else: - state = "regular" + state = 'regular' else: - if state != "startup": - state = "regular" + if state != 'startup': + state = 'regular' # Stillness correction - if state == "stillness": - delta[k, :] = stillnessCorrector.stillnessCorrection( - quat1[index, :], quat2[index, :], delta[k - 1, :], stillnessTrigger - ) + if state == 'stillness': + delta[k, :] = stillnessCorrector.stillnessCorrection(quat1[index, :], quat2[index, :], delta[k - 1, :], + stillnessTrigger) # Estimation # the estimation will only be performed in startup and regular state, # not in stillness state - if state == "startup" or state == "regular": + if state == 'startup' or state == 'regular': # % for convenience extract the necessary data windows from both # % segments - q1 = quat1[indexStart : indexEnd + 1 : dataSteps] - q2 = quat2[indexStart : indexEnd + 1 : dataSteps] - g1 = gyr1[indexStart : indexEnd + 1 : dataSteps] - g2 = gyr2[indexStart : indexEnd + 1 : dataSteps] + q1 = quat1[indexStart:indexEnd + 1:dataSteps] + q2 = quat2[indexStart:indexEnd + 1:dataSteps] + g1 = gyr1[indexStart:indexEnd + 1:dataSteps] + g2 = gyr2[indexStart:indexEnd + 1:dataSteps] a1 = acc1[indexStart : indexEnd + 1 : dataSteps] a2 = acc2[indexStart : indexEnd + 1 : dataSteps] - time = t[indexStart : indexEnd + 1 : dataSteps] + time = t[indexStart:indexEnd + 1:dataSteps] # execute a dedicated estimation algorithm for each type of joint. # for more detailed explanation look in the descriptions of the # corresponding method if dof == 1: - delta[k, :], rating[k, :], cost[k, :] = estimateDelta1d( - q1, q2, joint, delta[k - 1, :], constraint, optimizerSteps - ) + delta[k, :], rating[k, :], cost[k, :] = estimateDelta1d(q1, q2, joint, delta[k - 1, :], constraint, + optimizerSteps) elif dof == 2: - delta[k, :], rating[k, :], cost[k, :] = estimateDelta2d( - q1, - q2, - g1, - g2, - time, - joint, - delta[k - 1, :], - constraint, - optimizerSteps, - ) + delta[k, :], rating[k, :], cost[k, :] = estimateDelta2d(q1, q2, g1, g2, time, joint, delta[k - 1, :], + constraint, optimizerSteps) else: - delta[k, :], rating[k, :], cost[k, :] = estimateDelta3d( - q1, - q2, - angleRanges, - convention, - deltaRange, - delta[k - 1, :], - constraint, - conn_constr_r1, - conn_constr_r2, - a1, - a2, - g1, - g2, - optimizerSteps, - 1 / rate, - ) + delta[k, :], rating[k, :], cost[k, :] = estimateDelta3d(q1, q2, angleRanges, convention, deltaRange, + delta[k - 1, :], constraint, conn_constr_r1, + conn_constr_r2, a1, a2, g1, g2, optimizerSteps, 1 / rate) # during startup estimation a second estimation is performed from a # different starting value to ensure that the global minimum is found - if state == "startup": + if state == 'startup': if dof == 1: - delta2, _, cost2 = estimateDelta1d( - q1, - q2, - joint, - delta[k - 1, :] + np.pi, - constraint, - optimizerSteps, - ) + delta2, _, cost2 = estimateDelta1d(q1, q2, joint, delta[k - 1, :] + np.pi, constraint, + optimizerSteps) elif dof == 2: - delta2, _, cost2 = estimateDelta2d( - q1, - q2, - g1, - g2, - time, - joint, - delta[k - 1, :] + np.pi, - constraint, - optimizerSteps, - ) + delta2, _, cost2 = estimateDelta2d(q1, q2, g1, g2, time, joint, delta[k - 1, :] + np.pi, + constraint, optimizerSteps) else: - delta2, _, cost2 = estimateDelta3d( - q1, - q2, - angleRanges, - convention, - deltaRange, - delta[k - 1, :] + np.pi, - constraint, - conn_constr_r1, - conn_constr_r2, - a1, - a2, - g1, - g2, - optimizerSteps, - 1 / rate, - ) + delta2, _, cost2 = estimateDelta3d(q1, q2, angleRanges, convention, deltaRange, + delta[k - 1, :] + np.pi, constraint, conn_constr_r1, + conn_constr_r2, a1, a2, g1, g2, optimizerSteps, 1 / rate) if useRomConstraints: costRom = romCost(angleRanges, convention, q1, q2, delta[k, :]) costRom2 = romCost(angleRanges, convention, q1, q2, delta2) @@ -387,20 +304,20 @@ def headingCorrection( # adapt the rating in the two special states startup and stillness and set it to 1 in order # to enable fast convergence - if state == "startup": + if state == 'startup': rating[k, :] = startRating tauDelta[k, :] = 0.4 tauBias[k, :] = 1 - if state == "stillness": + if state == 'stillness': rating[k, :] = stillnessRating - if state == "regular": + if state == 'regular': stateOut[k, :] = 1 - elif state == "startup": + elif state == 'startup': stateOut[k, :] = 2 - elif state == "stillness": + elif state == 'stillness': stateOut[k, :] = 3 else: - raise ValueError("Wrong state") + raise ValueError('Wrong state') # Filtering # use a filter to smooth the trajectory of the estimated delta. @@ -409,41 +326,19 @@ def headingCorrection( # the filter only extrapolates the estimated slope and dismisses new estimates until rating >= rating_min # shape(Nx1) for delta, rating tauDelta, tauBias inputs - deltaFilt, bias = headingFilter( - delta, - rating, - stateOut, - estimationRate, - tauDelta, - tauBias, - ratingMin, - windowTime, - alignment, - ) + deltaFilt, bias = headingFilter(delta, rating, stateOut, estimationRate, tauDelta, tauBias, ratingMin, windowTime, + alignment) if debug or plot: # uninterpolated debug data - uninterpolated = { - "delta": delta, - "deltaFilt": deltaFilt, - "rating": rating, - "stateOut": stateOut, - } + uninterpolated = {'delta': delta, 'deltaFilt': deltaFilt, 'rating': rating, 'stateOut': stateOut} # Interpolation # Interpolate the data to the given time signal - fDelta = interpolate.interp1d( - t[starts], np.unwrap(delta.ravel()), kind="linear", fill_value="extrapolate" - ) - fDeltaFilt = interpolate.interp1d( - t[starts], deltaFilt.ravel(), kind="linear", fill_value="extrapolate" - ) - fRating = interpolate.interp1d( - t[starts], rating.ravel(), kind="linear", fill_value="extrapolate" - ) - fStateOut = interpolate.interp1d( - t[starts], stateOut.ravel(), kind="linear", fill_value="extrapolate" - ) + fDelta = interpolate.interp1d(t[starts], np.unwrap(delta.ravel()), kind='linear', fill_value='extrapolate') + fDeltaFilt = interpolate.interp1d(t[starts], deltaFilt.ravel(), kind='linear', fill_value='extrapolate') + fRating = interpolate.interp1d(t[starts], rating.ravel(), kind='linear', fill_value='extrapolate') + fStateOut = interpolate.interp1d(t[starts], stateOut.ravel(), kind='linear', fill_value='extrapolate') delta = fDelta(t) deltaFilt = fDeltaFilt(t) rating = fRating(t) @@ -484,7 +379,7 @@ def headingCorrection( angleRanges=angleRanges, convention=convention, joint=joint, - ), + ) ) if plot: headingCorrection_debugPlot(debugData, plot) @@ -497,28 +392,24 @@ def headingCorrection( def headingCorrection_debugPlot(debug, fig=None): with AutoFigure(fig) as fig: (ax1, ax2), (ax3, ax4) = fig.subplots(2, 2, sharex=True) - fig.suptitle(AutoFigure.title("headingCorrection")) + fig.suptitle(AutoFigure.title('headingCorrection')) - ax1.plot(np.rad2deg(debug["delta"])[:, np.newaxis]) - ax1.set_ylabel("[°]") - ax1.set_xlabel("index") + ax1.plot(np.rad2deg(debug['delta'])[:, np.newaxis]) + ax1.set_ylabel('[°]') + ax1.set_xlabel('index') ax1.set_title(f'delta, {debug["delta"].shape} {debug["delta"].dtype}') - ax2.plot(np.rad2deg(debug["delta_filt"])[:, np.newaxis]) - ax2.set_ylabel("[°]") - ax2.set_xlabel("index") - ax2.set_title( - f'delta_filtered, {debug["delta_filt"].shape} {debug["delta_filt"].dtype}' - ) + ax2.plot(np.rad2deg(debug['delta_filt'])[:, np.newaxis]) + ax2.set_ylabel('[°]') + ax2.set_xlabel('index') + ax2.set_title(f'delta_filtered, {debug["delta_filt"].shape} {debug["delta_filt"].dtype}') - ax3.plot(debug["rating"][:, np.newaxis]) + ax3.plot(debug['rating'][:, np.newaxis]) ax3.set_title(f'rating, {debug["rating"].shape} {debug["rating"].dtype}') - ax4.plot(debug["state_out"][:, np.newaxis]) - ax4.set_ylabel("[1: regular, 2: startup, 3: stillness]") - ax4.set_title( - f'state_out, {debug["state_out"].shape} {debug["state_out"].dtype}' - ) + ax4.plot(debug['state_out'][:, np.newaxis]) + ax4.set_ylabel('[1: regular, 2: startup, 3: stillness]') + ax4.set_title(f'state_out, {debug["state_out"].shape} {debug["state_out"].dtype}') for ax in (ax1, ax2, ax3, ax4): ax.grid() @@ -530,10 +421,8 @@ def headingCorrection_debugPlot(debug, fig=None): def checkStillness(gyr1, gyr2, still_threshold): # based on Matlab implementation in matlab/lib/HeadingCorrection/heading_corection.m - if ( - np.mean(np.linalg.norm(gyr1, axis=1)) < still_threshold - and np.mean(np.linalg.norm(gyr2, axis=1)) < still_threshold - ): + if np.mean(np.linalg.norm(gyr1, axis=1)) < still_threshold and \ + np.mean(np.linalg.norm(gyr2, axis=1)) < still_threshold: stillness = True else: stillness = False @@ -553,19 +442,14 @@ def stillnessCorrection(self, quat1, quat2, lastDelta, stillnessTrigger): # % stillness phase. It is reset after each rising edge of the stillness # % detection. if self.quatRelRef is None: - self.quatRelRef = qmt.qmult( - qmt.qmult(qmt.qinv(quat1), qmt.quatFromAngleAxis(lastDelta, [0, 0, 1])), - quat2, - ) + self.quatRelRef = qmt.qmult(qmt.qmult(qmt.qinv(quat1), qmt.quatFromAngleAxis( + lastDelta, [0, 0, 1])), quat2) self.deltaStill = lastDelta else: if stillnessTrigger: self.quatRelRef = qmt.qmult( - qmt.qmult( - qmt.qinv(quat1), qmt.quatFromAngleAxis(lastDelta, [0, 0, 1]) - ), - quat2, - ) + qmt.qmult(qmt.qinv(quat1), qmt.quatFromAngleAxis(lastDelta, [0, 0, 1])), + quat2) self.deltaStill = lastDelta qE2E1 = qmt.qmult(qmt.qmult(quat1, self.quatRelRef), qmt.qinv(quat2)) qRel = qmt.qrel(qmt.quatFromAngleAxis(self.deltaStill, [0, 0, 1]), qE2E1) @@ -576,17 +460,7 @@ def stillnessCorrection(self, quat1, quat2, lastDelta, stillnessTrigger): return delta -def headingFilter( - delta, - rating, - state, - estimationRate, - tauBias, - tauDelta, - minRating, - windowTime, - alignment, -): +def headingFilter(delta, rating, state, estimationRate, tauBias, tauDelta, minRating, windowTime, alignment): # based on Matlab implementation in matlab/lib/HeadingCorrection/heading_corection.m N = delta.shape[0] @@ -622,22 +496,12 @@ def headingFilter( kDeltaEff = max(rating[i, :] * kDelta[i, :], 1 / (j + 1)) j += 1 - deltaDiff = np.clip( - qmt.wrapToPi(delta[i, :] - delta[i - 1, :]), -biasClip, biasClip - ) - bias[i, :] = np.clip( - bias[i - 1, :] + kBiasEff * (deltaDiff - bias[i - 1, :]), - -biasClip, - biasClip, - ) - out[i, :] = ( - out[i - 1, :] - + bias[i, :] - + kDeltaEff * (qmt.wrapToPi(delta[i, :] - out[i - 1, :]) - bias[i, :]) - ) - if alignment == "backward": + deltaDiff = np.clip(qmt.wrapToPi(delta[i, :] - delta[i - 1, :]), -biasClip, biasClip) + bias[i, :] = np.clip(bias[i - 1, :] + kBiasEff * (deltaDiff - bias[i - 1, :]), -biasClip, biasClip) + out[i, :] = out[i - 1, :] + bias[i, :] + kDeltaEff * (qmt.wrapToPi(delta[i, :] - out[i - 1, :]) - bias[i, :]) + if alignment == 'backward': delta_out = out + windowSize / 2 * bias - elif alignment == "forward": + elif alignment == 'forward': delta_out = out - windowSize / 2 * bias else: # center delta_out = out @@ -660,9 +524,7 @@ def estimateDelta1d(quat1, quat2, joint, deltaStart, constraint, optimizationSte # Rating calculation v1 = qmt.rotate(quat1, j) v2 = qmt.rotate(quat2, j) - weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt( - v2[:, 0] ** 2 + v2[:, 1] ** 2 - ) + weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2) rating = qmt.rms(weight) return delta, rating, cost @@ -671,9 +533,7 @@ def estimateDelta1d(quat1, quat2, joint, deltaStart, constraint, optimizationSte def gaussNewtonStep(quat1, quat2, jB1, jB2, delta, constraint): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m errors, jacobi = errorAndJac1D(quat1, quat2, jB1, jB2, delta, constraint) - deltaParams = ( - -1 * lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors))[0] - ) + deltaParams = -1 * lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors))[0] deltaParams = np.squeeze(deltaParams) totalError = np.linalg.norm(errors) @@ -682,16 +542,16 @@ def gaussNewtonStep(quat1, quat2, jB1, jB2, delta, constraint): def errorAndJac1D(q1, q2, jB1, jB2, delta, constraint): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m - if constraint == "proj": + if constraint == 'proj': errors, jacobi = getErrorAndJacProj(delta, q1, q2, jB1) - elif constraint == "1d_corr": + elif constraint == '1d_corr': errors, jacobi = getErrorAndJac1dCorr(delta, q1, q2, jB1, jB2) - elif constraint == "euler_1d": + elif constraint == 'euler_1d': errors, jacobi = getErrorAndJacEuler1D(delta, q1, q2, jB1, jB2) - elif constraint == "euler_2d": + elif constraint == 'euler_2d': errors, jacobi = getErrorAndJacEuler2D(delta, q1, q2, jB1, jB2) else: - raise ValueError("Wrong constrain") + raise ValueError('Wrong constrain') return errors, jacobi @@ -708,22 +568,18 @@ def getErrorAndJacEuler1D(delta, q1, q2, j, jAlt): def CostEuler1D(delta, q1, q2, jB1, jB2): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) - qRot1 = qmt.quatFromAngleAxis( - np.arccos(np.array([0, 0, 1]) @ jB1), np.cross(np.array([0, 0, 1]), jB1) - ) - qRot2 = qmt.quatFromAngleAxis( - np.arccos(np.array([0, 0, 1]) @ jB2), np.cross(np.array([0, 0, 1]), jB2) - ) + qRot1 = qmt.quatFromAngleAxis(np.arccos(np.array([0, 0, 1]) @ jB1), np.cross(np.array([0, 0, 1]), jB1)) + qRot2 = qmt.quatFromAngleAxis(np.arccos(np.array([0, 0, 1]) @ jB2), np.cross(np.array([0, 0, 1]), jB2)) qB1E1 = qmt.qmult(q1, qRot1) qB2E2 = qmt.qmult(q2, qRot2) qB2B1 = qmt.qmult(qmt.qmult(qmt.qinv(qB1E1), qE2E1), qB2E2) - angles = qmt.eulerAngles(qB2B1, "zxy", True) + angles = qmt.eulerAngles(qB2B1, 'zxy', True) secondAngle = angles[:, 1] thirdAngle = angles[:, 2] - error = getWeight(q1, q2, jB1, jB2) * np.sqrt(secondAngle**2 + thirdAngle**2) + error = getWeight(q1, q2, jB1, jB2) * np.sqrt(secondAngle ** 2 + thirdAngle ** 2) return error @@ -732,9 +588,7 @@ def getWeight(q1, q2, jB1, jB2): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m v1 = qmt.rotate(q1, jB1) v2 = qmt.rotate(q2, jB2) - weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt( - v2[:, 0] ** 2 + v2[:, 1] ** 2 - ) + weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2) return weight @@ -759,10 +613,8 @@ def CostProj(delta, q1, q2, j): v1 = qmt.rotate(q1, j) v2 = qmt.rotate(q2, j) # weight = sqrt(v1(:, 1).^ 2 + v1(:, 2).^ 2).*sqrt(v2(:, 1).^ 2 + v2(:, 2).^ 2); - weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt( - v2[:, 0] ** 2 + v2[:, 1] ** 2 - ) - error = weight * 2.0 * np.arccos(qRes[:, 0]) + weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2) + error = weight * 2. * np.arccos(qRes[:, 0]) return error @@ -773,12 +625,7 @@ def getErrorAndJac1dCorr(delta, q1, q2, jB1, jB2): weight = getWeight(q1, q2, jB1, jB2) - error = ( - qmt.wrapToPi( - np.arctan2(v2[:, 1], v2[:, 0]) - np.arctan2(v1[:, 1], v1[:, 0]) + delta - ) - * weight - ) + error = qmt.wrapToPi(np.arctan2(v2[:, 1], v2[:, 0]) - np.arctan2(v1[:, 1], v1[:, 0]) + delta) * weight jac = weight return error, jac @@ -787,12 +634,8 @@ def getErrorAndJacEuler2D(delta, q1, q2, j1, j2): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m # % Calculate the error qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) - qB2S2 = qmt.quatFromAngleAxis( - np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2) - ) - qB1S1 = qmt.quatFromAngleAxis( - np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1) - ) + qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2)) + qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1)) qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1)) qE2B1 = qmt.qmult(qE1B1, qE2E1) @@ -813,13 +656,9 @@ def getErrorAndJacEuler2D(delta, q1, q2, j1, j2): dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2) dQ = dQB - jac = 2 * ( - dQ[:, 1] * qB2B1[:, 0] - + qB2B1[:, 1] * dQ[:, 0] - + dQ[:, 2] * qB2B1[:, 3] - + qB2B1[:, 2] * dQ[:, 3] - ) - jac = jac / np.sqrt(1 - arcsinArg**2) + jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] + + dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3]) + jac = jac / np.sqrt(1 - arcsinArg ** 2) return error, jac @@ -827,14 +666,14 @@ def getErrorAndJacEuler2D(delta, q1, q2, j1, j2): # 2D functions def errorAndJac2D(q1, q2, gyr1, gyr2, t, j1, j2, deltaStart, constraint): # based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_2d.m - if constraint == "euler": + if constraint == 'euler': errors, jacobi = getJacEulerCons(deltaStart[0], q1, q2, j1, j2) - elif constraint == "gyro": + elif constraint == 'gyro': errors, jacobi = getJacGyroCons(j1, j2, q1, q2, gyr1, gyr2, deltaStart) - elif constraint == "combined": + elif constraint == 'combined': errors, jacobi = getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, deltaStart) else: - raise ValueError(f"Wrong constraint: {constraint}") + raise ValueError(f'Wrong constraint: {constraint}') return errors, jacobi @@ -842,12 +681,8 @@ def getJacEulerCons(delta, q1, q2, j1, j2): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m # % Calculate the error qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) - qB2S2 = qmt.quatFromAngleAxis( - np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2) - ) - qB1S1 = qmt.quatFromAngleAxis( - np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1) - ) + qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2)) + qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1)) qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1)) qE2B1 = qmt.qmult(qE1B1, qE2E1) @@ -868,13 +703,9 @@ def getJacEulerCons(delta, q1, q2, j1, j2): dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2) dQ = dQB # - jac = 2 * ( - dQ[:, 1] * qB2B1[:, 0] - + qB2B1[:, 1] * dQ[:, 0] - + dQ[:, 2] * qB2B1[:, 3] - + qB2B1[:, 2] * dQ[:, 3] - ) - jac = jac / np.sqrt(1 - arcsinArg**2) + jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] + + dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3]) + jac = jac / np.sqrt(1 - arcsinArg ** 2) return error, jac @@ -885,15 +716,11 @@ def get2DStaticWeight(jointAxes, quat1, quat2): axis2 = jointAxes[1] j1 = qmt.rotate(quat1, axis1) j2 = qmt.rotate(quat2, axis2) - weight = np.sqrt(j1[:, 0] ** 2 + j1[:, 1] ** 2) * np.sqrt( - j2[:, 0] ** 2 + j2[:, 1] ** 2 - ) + weight = np.sqrt(j1[:, 0] ** 2 + j1[:, 1] ** 2) * np.sqrt(j2[:, 0] ** 2 + j2[:, 1] ** 2) return weight -def estimateDelta2d( - quat1, quat2, gyr1, gyr2, time, joint, deltaStart, constraint, optimizationSteps -): +def estimateDelta2d(quat1, quat2, gyr1, gyr2, time, joint, deltaStart, constraint, optimizationSteps): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m j1 = joint[0, :] j2 = joint[1, :] @@ -901,35 +728,24 @@ def estimateDelta2d( cost = 0 for _ in range(optimizationSteps): - deltaInc, cost = gaussNewtonStep2d( - quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint - ) + deltaInc, cost = gaussNewtonStep2d(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint) delta = delta + deltaInc - if constraint == "euler_lin": + if constraint == 'euler_lin': delta = delta[0] * time + delta[1] delta = qmt.wrapTo2Pi(delta) # %% Rating calculation j1Global = qmt.rotate(quat1, j1) j2Global = qmt.rotate(quat2, j2) - rating = qmt.rms( - np.sqrt(j1Global[:, 0] ** 2 + j1Global[:, 1] ** 2) - * np.sqrt(j2Global[:, 0] ** 2 + j2Global[:, 1] ** 2) - ) + rating = qmt.rms(np.sqrt(j1Global[:, 0] ** 2 + j1Global[:, 1] ** 2) + * np.sqrt(j2Global[:, 0] ** 2 + j2Global[:, 1] ** 2)) return delta, rating, cost def gaussNewtonStep2d(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint): # based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m - errors, jacobi = errorAndJac2D( - quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint - ) - deltaParams = ( - -1 - * np.linalg.lstsq( - np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors), rcond=None - )[0] - ) + errors, jacobi = errorAndJac2D(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint) + deltaParams = -1 * np.linalg.lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors), rcond=None)[0] deltaParams = np.squeeze(deltaParams) totalError = np.linalg.norm(errors) return deltaParams, totalError @@ -953,19 +769,11 @@ def getJacGyroCons(j1, j2, q1, q2, gyr1, gyr2, delta): errors = np.sum(wD * jN, axis=1) # % Calculate one Row of the Jacobian - dj2B = ( - -j2E2 * np.sin(delta) - + np.cross(eZ, j2E2, axis=1) * np.cos(delta) + dj2B = -j2E2 * np.sin(delta) + np.cross(eZ, j2E2, axis=1) * np.cos(delta) \ + eZ * (np.sum(eZ * j2E2, axis=1) * np.sin(delta))[:, np.newaxis] - ) - dwdB = -1 * ( - -w2E2 * np.sin(delta) - + np.cross(eZ, w2E2, axis=1) * np.cos(delta) - + eZ * (np.sum(eZ, w2E2, axis=1) * np.sin(delta))[:, np.newaxis] - ) - dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum( - wD * np.cross(j1E1, dj2B, axis=1), axis=1 - ) + dwdB = -1 * (-w2E2 * np.sin(delta) + np.cross(eZ, w2E2, axis=1) * np.cos(delta) + + eZ * (np.sum(eZ, w2E2, axis=1) * np.sin(delta))[:, np.newaxis]) + dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum(wD * np.cross(j1E1, dj2B, axis=1), axis=1) jac = dB return errors, jac @@ -988,29 +796,17 @@ def getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, delta): errorsGyro = np.sum(wD * jN, axis=1) # % Calculate one Row of the Jacobian - dj2B = ( - -j2E2 * np.sin(delta) - + np.cross(eE, j2E2, axis=1) * np.cos(delta) - + eE * (np.sum(eE * j2E2, axis=1) * np.sin(delta))[:, np.newaxis] - ) - dwdB = -( - -w2E2 * np.sin(delta) - + np.cross(eE, w2E2, axis=1) * np.cos(delta) - + eE * (np.sum(eE * w2E2, axis=1) * np.sin(delta))[:, np.newaxis] - ) - dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum( - wD * np.cross(j1E1, dj2B, axis=1), axis=1 - ) + dj2B = -j2E2 * np.sin(delta) + np.cross(eE, j2E2, axis=1) * np.cos(delta) + \ + eE * (np.sum(eE * j2E2, axis=1) * np.sin(delta))[:, np.newaxis] + dwdB = -(-w2E2 * np.sin(delta) + np.cross(eE, w2E2, axis=1) * np.cos(delta) + + eE * (np.sum(eE * w2E2, axis=1) * np.sin(delta))[:, np.newaxis]) + dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum(wD * np.cross(j1E1, dj2B, axis=1), axis=1) jacGyro = dB # % Calculate the error qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1]) - qB2S2 = qmt.quatFromAngleAxis( - np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2) - ) - qB1S1 = qmt.quatFromAngleAxis( - np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1) - ) + qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2)) + qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1)) qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1)) qE2B1 = qmt.qmult(qE1B1, qE2E1) @@ -1032,13 +828,9 @@ def getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, delta): dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2) dQ = dQB - jac = 2 * ( - dQ[:, 1] * qB2B1[:, 0] - + qB2B1[:, 1] * dQ[:, 0] - + dQ[:, 2] * qB2B1[:, 3] - + qB2B1[:, 2] * dQ[:, 3] - ) - jacEuler = jac / np.sqrt(1 - arcsinArg**2) + jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] + + dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3]) + jacEuler = jac / np.sqrt(1 - arcsinArg ** 2) errors = errorsGyro + errorsEuler jac = jacEuler + jacGyro @@ -1046,6 +838,7 @@ def getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, delta): return errors, jac + def estimateDelta3d( quat1, quat2, @@ -1189,34 +982,16 @@ def _gyrdot_fifth_order(gyr, Ts): def estimateDelta3d_rom(quat1, quat2, angleRanges, convention, deltaRange, deltaStart): # based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m # generate the distribution of values for delta that produce a valid relative orientation - deltaProbability = getPossibleAngles( - quat1, - quat2, - angleRanges[0, :], - angleRanges[1, :], - angleRanges[2, :], - convention, - deltaRange, - "max", - ) + deltaProbability = getPossibleAngles(quat1, quat2, angleRanges[0, :], angleRanges[1, :], angleRanges[2, :], + convention, deltaRange, 'max') # get the maximum maxVal = max(deltaProbability) - distributionLastDelta = ( - quat1.shape[0] / np.pi * abs(qmt.wrapToPi(deltaRange - deltaStart)) - ) + distributionLastDelta = quat1.shape[0] / np.pi * abs(qmt.wrapToPi(deltaRange - deltaStart)) - deltaProbabilityMin = getPossibleAngles( - quat1, - quat2, - angleRanges[0, :], - angleRanges[1, :], - angleRanges[2, :], - convention, - deltaRange, - "min", - ) + deltaProbabilityMin = getPossibleAngles(quat1, quat2, angleRanges[0, :], angleRanges[1, :], angleRanges[2, :], + convention, deltaRange, 'min') distribNewMin = deltaProbabilityMin + distributionLastDelta # % get the minimum @@ -1229,10 +1004,8 @@ def estimateDelta3d_rom(quat1, quat2, angleRanges, convention, deltaRange, delta # %% Rating calculation # % calculate the standard deviation of the distribution in order to quantify the quality of the estimation - _, stdConstraint = stdProbAngles( - deltaRange[deltaProbability > maxVal / 2], - deltaProbability[deltaProbability > maxVal / 2], - ) + _, stdConstraint = stdProbAngles(deltaRange[deltaProbability > maxVal / 2], + deltaProbability[deltaProbability > maxVal / 2]) # % scale the standard deviation to a range of [0.1] rating = map_(stdConstraint, 0, np.deg2rad(20), 1, 0) # % clip the window rating to values of [0,1] @@ -1240,9 +1013,7 @@ def estimateDelta3d_rom(quat1, quat2, angleRanges, convention, deltaRange, delta return delta, rating, cost -def getPossibleAngles( - quat1, quat2, angle1Range, angle2Range, angle3Range, convention, deltaRange, mode -): +def getPossibleAngles(quat1, quat2, angle1Range, angle2Range, angle3Range, convention, deltaRange, mode): # based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m qHand = quat1[~np.isnan(quat1[:, 0]), :] qMeta = quat2[~np.isnan(quat2[:, 0]), :] @@ -1250,25 +1021,17 @@ def getPossibleAngles( # qMeta: Mx4 # qHand: Mx4 qE1E2 = qmt.quatFromAngleAxis(deltaRange, [0, 0, 1]) - qE1E2Broadcast, qMetaBroadcast, qHandBroadcast = np.broadcast_arrays( - qE1E2[None], qMeta[:, None], qHand[:, None] - ) + qE1E2Broadcast, qMetaBroadcast, qHandBroadcast = np.broadcast_arrays(qE1E2[None], qMeta[:, None], qHand[:, None]) N = qE1E2.shape[0] M = qMeta.shape[0] - qMetaCorr = qmt.qmult( - qE1E2Broadcast.reshape(M * N, 4), qMetaBroadcast.reshape(M * N, 4) - ).reshape(M, N, 4) + qMetaCorr = qmt.qmult(qE1E2Broadcast.reshape(M * N, 4), qMetaBroadcast.reshape(M * N, 4)).reshape(M, N, 4) # Mx360x4 - qRel = qmt.qrel( - qHandBroadcast.reshape(M * N, 4), qMetaCorr.reshape(M * N, 4) - ).reshape(M, N, 4) + qRel = qmt.qrel(qHandBroadcast.reshape(M * N, 4), qMetaCorr.reshape(M * N, 4)).reshape(M, N, 4) angles = qmt.eulerAngles(qRel.reshape(M * N, 4), convention, True).reshape(M * N, 3) - angleDiff = getAngleDiff( - angles.reshape(M * N, 3), angle1Range, angle2Range, angle3Range - ) + angleDiff = getAngleDiff(angles.reshape(M * N, 3), angle1Range, angle2Range, angle3Range) angleDiff[angleDiff > 0] = 1 angleDiff = angleDiff.astype(bool).reshape(M, N) - if mode == "min": + if mode == 'min': possibleAngles = angleDiff else: possibleAngles = ~angleDiff @@ -1282,14 +1045,10 @@ def getAngleDiff(angles, angle1Range, angle2Range, angle3Range): out = abs(np.minimum(-np.sign(d1[:, 0] * d1[:, 1]) * np.min(np.abs(d1), axis=1), 0)) d2 = angle2Range[np.newaxis, :] - angles[:, 1][:, np.newaxis] - out = out + abs( - np.minimum(-np.sign(d2[:, 0] * d2[:, 1]) * np.min(np.abs(d2), axis=1), 0) - ) + out = out + abs(np.minimum(-np.sign(d2[:, 0] * d2[:, 1]) * np.min(np.abs(d2), axis=1), 0)) d3 = angle3Range[np.newaxis, :] - angles[:, 2][:, np.newaxis] - out = out + abs( - np.minimum(-np.sign(d3[:, 0] * d3[:, 1]) * np.min(np.abs(d3), axis=1), 0) - ) + out = out + abs(np.minimum(-np.sign(d3[:, 0] * d3[:, 1]) * np.min(np.abs(d3), axis=1), 0)) return out @@ -1315,10 +1074,7 @@ def stdProbAngles(val=None, prob=None): def angularMean(angles, probability): # based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m N = len(angles) - mean = np.arctan2( - 1 / N * sum(probability * np.sin(angles)), - 1 / N * sum(probability * np.cos(angles)), - ) + mean = np.arctan2(1 / N * sum(probability * np.sin(angles)), 1 / N * sum(probability * np.cos(angles))) mean = qmt.wrapTo2Pi(mean) return mean @@ -1332,7 +1088,7 @@ def map_(val, inMin, inMax, outMin, outMax): def movMean1D(x, windowSize): windowSize = np.ones(windowSize) / windowSize x = np.array(x, dtype=float) - xFilt = signal.correlate(x, windowSize, mode="same") + xFilt = signal.correlate(x, windowSize, mode='same') return xFilt @@ -1365,9 +1121,7 @@ def romCost(ROM, convention, q1, q2, delta): return rating -def removeHeadingDriftForStraightWalk( - t, quat, winlength, stepthreshold, movmeanwidth, plot=False, debug=False -): +def removeHeadingDriftForStraightWalk(t, quat, winlength, stepthreshold, movmeanwidth, plot=False, debug=False): """ A function that takes a quaternion with slow heading drift and removes that heading drift by assumimg that the heading remains constant except for a few large steps (turns) by multiples of pi. @@ -1390,14 +1144,14 @@ def removeHeadingDriftForStraightWalk( assert np.allclose(timediff, 1 / rate) qrel = qmt.qmult(quat, qmt.qinv(quat[199, :])) - h = qmt.eulerAngles(qrel, "zyx", intrinsic=True) + h = qmt.eulerAngles(qrel, 'zyx', intrinsic=True) h = np.unwrap(h, axis=0) h = h[:, 0] hsteps1 = np.zeros(h.shape) hsteps2 = np.zeros(h.shape) hsteps3 = np.zeros(h.shape) for i in range(winlength, (h.shape[0] - winlength)): - hsteps1[i] = np.mean(h[i : i + winlength]) - np.mean(h[i - winlength : i]) + hsteps1[i] = np.mean(h[i:i + winlength]) - np.mean(h[i - winlength:i]) hsteps1 = hsteps1 - np.mean(hsteps1[np.abs(hsteps1) < np.pi / 4]) for i in range(winlength, (h.shape[0] - winlength)): @@ -1419,33 +1173,26 @@ def removeHeadingDriftForStraightWalk( hnice[driInd] = drift[driInd] hnice[~driInd] = hstepsfree[~driInd] drift = movMean1D(hnice, movmeanwidth) - corrquat = np.concatenate( - ( - np.cos(-drift / 2)[:, None], - np.sin(-drift / 2)[:, None] * np.array([0, 0, 1]), - ), - axis=1, - ) + corrquat = np.concatenate((np.cos(-drift / 2)[:, None], + np.sin(-drift / 2)[:, None] * np.array([0, 0, 1])), axis=1) quatCorrected = qmt.qmult(corrquat, quat) qrelCorrected = qmt.qmult(quatCorrected, qmt.qinv(quatCorrected[200, :])) - hCorrected = np.unwrap(qmt.eulerAngles(qrelCorrected, "zyx", 1), 30.0 * np.pi) - hOld = np.unwrap(qmt.eulerAngles(qrel, "zyx", 1), 3.0 * np.pi) + hCorrected = np.unwrap(qmt.eulerAngles(qrelCorrected, 'zyx', 1), 30.0 * np.pi) + hOld = np.unwrap(qmt.eulerAngles(qrel, 'zyx', 1), 3.0 * np.pi) if debug or plot: - debugData = dict( - h=h, - hsteps1=hsteps1, - hsteps2=hsteps2, - hsteps3=hsteps3, - hsteps4=hsteps4, - hstepsfree=hstepsfree, - drift=drift, - hnice=hnice, - t=t, - h_corrected=hCorrected[:, 0], - h_old=hOld[:, 0], - ) + debugData = dict(h=h, + hsteps1=hsteps1, + hsteps2=hsteps2, + hsteps3=hsteps3, + hsteps4=hsteps4, + hstepsfree=hstepsfree, + drift=drift, + hnice=hnice, + t=t, + h_corrected=hCorrected[:, 0], + h_old=hOld[:, 0]) if plot: removeHeadingDriftForStraightWalk_debugPlot(debugData, plot) if debug: @@ -1457,35 +1204,35 @@ def removeHeadingDriftForStraightWalk( def removeHeadingDriftForStraightWalk_debugPlot(debug, fig=None): with AutoFigure(fig) as fig: (ax1, ax2), (ax3, ax4) = fig.subplots(2, 2) - fig.suptitle("removeHeadDrift debug plot") - - ax1.plot(debug["t"], np.rad2deg(debug["h"]), ".-") - ax1.plot(debug["t"], np.rad2deg(debug["hsteps1"]), ".-") - ax1.plot(debug["t"], np.rad2deg(debug["hsteps2"]), ".-") - ax1.plot(debug["t"], np.rad2deg(debug["hsteps3"]), ".-") - ax1.legend(["h0", "h1", "h2", "h3"]) - ax1.set_ylabel("[°]") - ax1.set_title("h0~h3") - - ax2.plot(debug["t"], np.rad2deg(debug["h"]), ".-") - ax2.plot(debug["t"], np.rad2deg(debug["hstepsfree"]), ".-") - ax2.plot(debug["t"], np.rad2deg(debug["drift"]), ".-") - ax2.legend(["h", "hstepsfree", "drift"]) - ax2.set_title("h , hstepsfree, drift") - ax2.set_ylabel("[°]") - - ax3.plot(debug["t"], np.rad2deg(debug["hstepsfree"]), ".-") - ax3.plot(debug["t"], np.rad2deg(debug["hnice"]), ".-") - ax3.plot(debug["t"], np.rad2deg(debug["drift"]), ".-") - ax3.legend(["hstepsfree", "hnice", "drift"]) - ax3.set_title("hstepsfree, hnice, drift") - ax3.set_ylabel("[°]") - - ax4.plot(debug["t"], np.rad2deg(debug["h_old"]), ".-") - ax4.plot(debug["t"], np.rad2deg(debug["h_corrected"]), ".-") - ax4.legend(["h_old", "h_corrected"]) - ax4.set_title("eulerAngle z-axis") - ax4.set_ylabel("[°]") + fig.suptitle('removeHeadDrift debug plot') + + ax1.plot(debug['t'], np.rad2deg(debug['h']), '.-') + ax1.plot(debug['t'], np.rad2deg(debug['hsteps1']), '.-') + ax1.plot(debug['t'], np.rad2deg(debug['hsteps2']), '.-') + ax1.plot(debug['t'], np.rad2deg(debug['hsteps3']), '.-') + ax1.legend(['h0', 'h1', 'h2', 'h3']) + ax1.set_ylabel('[°]') + ax1.set_title('h0~h3') + + ax2.plot(debug['t'], np.rad2deg(debug['h']), '.-') + ax2.plot(debug['t'], np.rad2deg(debug['hstepsfree']), '.-') + ax2.plot(debug['t'], np.rad2deg(debug['drift']), '.-') + ax2.legend(['h', 'hstepsfree', 'drift']) + ax2.set_title('h , hstepsfree, drift') + ax2.set_ylabel('[°]') + + ax3.plot(debug['t'], np.rad2deg(debug['hstepsfree']), '.-') + ax3.plot(debug['t'], np.rad2deg(debug['hnice']), '.-') + ax3.plot(debug['t'], np.rad2deg(debug['drift']), '.-') + ax3.legend(['hstepsfree', 'hnice', 'drift']) + ax3.set_title('hstepsfree, hnice, drift') + ax3.set_ylabel('[°]') + + ax4.plot(debug['t'], np.rad2deg(debug['h_old']), '.-') + ax4.plot(debug['t'], np.rad2deg(debug['h_corrected']), '.-') + ax4.legend(['h_old', 'h_corrected']) + ax4.set_title('eulerAngle z-axis') + ax4.set_ylabel('[°]') for ax in (ax1, ax2, ax3, ax4): - ax.grid() + ax.grid() \ No newline at end of file diff --git a/setup.py b/setup.py index 51c72e9..e433d97 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,6 @@ Recompile extension modules python3 setup.py build_ext --inplace -f """ - # SPDX-FileCopyrightText: 2021 Daniel Laidig # # SPDX-License-Identifier: MIT @@ -14,84 +13,69 @@ # https://stackoverflow.com/a/60740179 # (note that even with pyproject.toml this is still useful to make `python setup.py sdist` work out-of-the-box) from setuptools import dist - -dist.Distribution().fetch_build_eggs(["Cython", "numpy"]) +dist.Distribution().fetch_build_eggs(['Cython', 'numpy']) import site import sys - +from setuptools import setup, find_packages from Cython.Build import cythonize import numpy as np -from setuptools import find_packages -from setuptools import setup # workaround for develop mode (pip install -e) with PEP517/pyproject.toml cf. https://github.com/pypa/pip/issues/7953 -site.ENABLE_USER_SITE = "--user" in sys.argv[1:] +site.ENABLE_USER_SITE = '--user' in sys.argv[1:] -ext_modules = cythonize( - [ - "qmt/cpp/oriestimu.pyx", - "qmt/cpp/madgwick.pyx", - "qmt/cpp/quaternion.pyx", - ] -) +ext_modules = cythonize([ + 'qmt/cpp/oriestimu.pyx', + 'qmt/cpp/madgwick.pyx', + 'qmt/cpp/quaternion.pyx', +]) for m in ext_modules: m.include_dirs.insert(0, np.get_include()) setup( - name="qmt", - version="0.3.0", - description="Quaternion-based Inertial Motion Tracking Toolbox", - long_description=open("README.rst", encoding="utf-8").read(), + name='qmt', + version='0.3.0', + + description='Quaternion-based Inertial Motion Tracking Toolbox', + long_description=open('README.rst', encoding='utf-8').read(), long_description_content_type="text/x-rst", - url="https://github.com/dlaidig/qmt/", + url='https://github.com/dlaidig/qmt/', project_urls={ - "Documentation": "https://qmt.readthedocs.io/", + 'Documentation': 'https://qmt.readthedocs.io/', }, - author="Daniel Laidig", - author_email="laidig@control.tu-berlin.de", - license="MIT", + + author='Daniel Laidig', + author_email='laidig@control.tu-berlin.de', + license='MIT', classifiers=[ - "Development Status :: 4 - Beta", - "Intended Audience :: Science/Research", - "License :: OSI Approved :: MIT License", - "Programming Language :: Python :: 3", + 'Development Status :: 4 - Beta', + 'Intended Audience :: Science/Research', + 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python :: 3', ], - packages=find_packages(exclude=["contrib", "docs", "tests"]), + + packages=find_packages(exclude=['contrib', 'docs', 'tests']), include_package_data=True, zip_safe=False, - python_requires=">=3.8", - install_requires=[ - "numpy", - "scipy>=1.8.0", - "matplotlib", - "PyYAML", - "transplant>=0.8.11", # 0.8.11 fixes https://github.com/bastibe/transplant/issues/81 - "aiohttp>=3.8.1", - "aiofiles", - "orjson", - "qasync", - "vqf>=2.0.0", - ], + + python_requires='>=3.8', + install_requires=['numpy', 'scipy>=1.8.0', 'matplotlib', 'PyYAML', + 'transplant>=0.8.11', # 0.8.11 fixes https://github.com/bastibe/transplant/issues/81 + 'aiohttp>=3.8.1', 'aiofiles', 'orjson', 'qasync', 'vqf>=2.0.0'], extras_require={ # pip3 install --user -e ".[dev]" - "dev": [ - "pytest", - "flake8", - "reuse", - "Cython", - "sphinx", - "recommonmark", - "sphinx-rtd-theme", - "sphinxcontrib-matlabdomain", - ], + 'dev': ['pytest', 'flake8', + 'reuse', 'Cython', 'sphinx', 'recommonmark', 'sphinx-rtd-theme', 'sphinxcontrib-matlabdomain'], }, + entry_points={ - "console_scripts": [ - "qmt-webapp = qmt.utils.webapp_cli:main", + 'console_scripts': [ + 'qmt-webapp = qmt.utils.webapp_cli:main', ], }, + ext_modules=ext_modules, + include_dirs=[np.get_include()], -) +) \ No newline at end of file