-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathpda.py
More file actions
181 lines (147 loc) · 6.07 KB
/
pda.py
File metadata and controls
181 lines (147 loc) · 6.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from typing import TypeVar, Optional, Dict, Any, List, Generic
from dataclasses import dataclass, field
import numpy as np
import scipy
import scipy.special
from estimatorduck import StateEstimator
from mixturedata import MixtureParameters
from gaussparams import GaussParams
ET = TypeVar("ET")
@dataclass
class PDA(Generic[ET]): # Probabilistic Data Association
state_filter: StateEstimator[ET]
clutter_intensity: float
PD: float
gate_size: float
def predict(self, filter_state: ET, Ts: float) -> ET:
"""Predict state estimate Ts time units ahead"""
return self.state_filter.predict(filter_state, Ts)
def gate(
self,
# measurements of shape=(M, m)=(#measurements, dim)
Z: np.ndarray,
filter_state: ET,
*,
sensor_state: Optional[Dict[str, Any]] = None,
) -> List[bool]: # gated (m x 1): gated(j) = true if measurement j is within gate
"""Gate/validate measurements: (z-h(x))'S^(-1)(z-h(x)) <= g^2."""
M = Z.shape[0]
g_squared = self.gate_size ** 2
# The loop can be done using ether of these: normal loop, list comprehension or map
gated = list(map(
lambda z: self.state_filter.gate(z, filter_state, gate_size_square=g_squared, sensor_state=sensor_state),
Z))
# debug
# count = np.sum(gated)
# print(f"{count} out of {M} measurements used")
return gated
def loglikelihood_ratios(
self, # measurements of shape=(M, m)=(#measurements, dim)
Z: np.ndarray,
filter_state: ET,
*,
sensor_state: Optional[Dict[str, Any]] = None,
) -> np.ndarray: # shape=(M + 1,), first element for no detection
""" Calculates the posterior event loglikelihood ratios."""
log_PD = np.log(self.PD)
log_PND = np.log(1 - self.PD) # P_ND = 1 - P_D
log_clutter = np.log(self.clutter_intensity)
# allocate
ll = np.empty(Z.shape[0] + 1)
# calculate log likelihood ratios
ll[0] = log_clutter + log_PND
for i, z in enumerate(Z):
ak = i+1
ll[ak] = log_PD + self.state_filter.loglikelihood(z, filter_state, sensor_state=sensor_state)
return ll
def association_probabilities(
self,
# measurements of shape=(M, m)=(#measurements, dim)
Z: np.ndarray,
filter_state: ET,
*,
sensor_state: Optional[Dict[str, Any]] = None,
) -> np.ndarray: # beta, shape=(M + 1,): the association probabilities (normalized likelihood ratios)
"""calculate the poseterior event/association probabilities."""
# log likelihoods
lls = self.loglikelihood_ratios(Z, filter_state, sensor_state=sensor_state)
# lls_normalized[ak] = log(alpha) + lls[ak]
# log(alpha) = -logsumexp(ll[:])
log_a = -scipy.special.logsumexp(lls)
lls_normalized = log_a + lls
# probabilities
beta = np.exp(lls_normalized)
return beta
def conditional_update(
self,
# measurements of shape=(M, m)=(#measurements, dim)
Z: np.ndarray,
filter_state: ET,
*,
sensor_state: Optional[Dict[str, Any]] = None,
) -> List[
ET
]: # Updated filter_state for all association events, first element is no detection
"""Update the state with all possible measurement associations."""
conditional_update = []
conditional_update.append(
filter_state
)
conditional_update.extend(
[self.state_filter.update(z, filter_state, sensor_state=sensor_state) for z in Z]
)
return conditional_update
def reduce_mixture(
self, mixture_filter_state: MixtureParameters[ET]
) -> ET: # the two first moments of the mixture
"""Reduce a Gaussian mixture to a single Gaussian."""
return self.state_filter.reduce_mixture(mixture_filter_state)
def update(
self,
# measurements of shape=(M, m)=(#measurements, dim)
Z: np.ndarray,
filter_state: ET,
*,
sensor_state: Optional[Dict[str, Any]] = None,
) -> ET: # The filter_state updated by approximating the data association
"""
Perform the PDA update cycle.
Gate -> association probabilities -> conditional update -> reduce mixture.
"""
# remove the not gated measurements from consideration
gated = self.gate(Z, filter_state, sensor_state=sensor_state)
Zg = Z[gated]
# find association probabilities
beta = self.association_probabilities(Zg, filter_state, sensor_state=sensor_state)
# find the mixture components
filter_state_updated_mixture_components = self.conditional_update(Zg, filter_state, sensor_state=sensor_state)
# make mixture
filter_state_update_mixture = MixtureParameters(
beta, filter_state_updated_mixture_components
)
# reduce mixture
filter_state_updated_reduced = self.reduce_mixture(filter_state_update_mixture)
return filter_state_updated_reduced
def step(
self,
# measurements of shape=(M, m)=(#measurements, dim)
Z: np.ndarray,
filter_state: ET,
Ts: float,
*,
sensor_state: Optional[Dict[str, Any]] = None,
) -> ET:
"""Perform a predict update cycle with Ts time units and measurements Z in sensor_state"""
filter_state_predicted = self.predict(filter_state, Ts, sensor_state=sensor_state)
filter_state_updated = self.update(Z, filter_state_predicted, sensor_state=sensor_state)
return filter_state_updated
def estimate(self, filter_state: ET) -> GaussParams:
"""Get an estimate with its covariance from the filter state."""
return self.state_filter.estimate(filter_state)
def init_filter_state(
self,
# need to have the type required by the specified state_filter
init_state: "ET_like",
) -> ET:
"""Initialize a filter state to proper form."""
return self.state_filter.init_filter_state(init_state)