Kalman Filter Comparison
This example demonstrates different Kalman filter variants for target tracking.
Overview
This example compares three Kalman filter implementations:
Linear Kalman Filter (KF) - For linear state-space models with Gaussian noise
Extended Kalman Filter (EKF) - Linearizes nonlinear models around current estimate
Unscented Kalman Filter (UKF) - Uses sigma points for better nonlinear approximation
Key Concepts
State estimation: Estimating position and velocity from noisy measurements
Filter consistency: NEES/NIS statistics for filter tuning validation
Measurement models: Linear vs nonlinear (range-bearing) measurements
Process noise: Modeling uncertainty in the motion model
Code Highlights
The example demonstrates:
Creating state transition matrices with
f_constant_velocity()Process noise covariance with
q_constant_velocity()Sigma point generation with
sigma_points_merwe()Filter predict/update cycles with
kf_predict(),kf_update()UKF operations with
ukf_predict(),ukf_update()
Source Code
1"""
2Kalman Filter Comparison Example.
3
4This example demonstrates:
51. Linear Kalman Filter for constant velocity tracking
62. Extended Kalman Filter (EKF) for nonlinear measurements
73. Unscented Kalman Filter (UKF) for highly nonlinear systems
84. Filter consistency checking with NEES/NIS
95. Comparison of filter performance
10
11Run with: python examples/kalman_filter_comparison.py
12"""
13
14import sys
15from pathlib import Path
16
17sys.path.insert(0, str(Path(__file__).parent.parent))
18
19# Output directory for generated plots
20OUTPUT_DIR = Path(__file__).parent.parent / "docs" / "_static" / "images" / "examples"
21OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
22
23from typing import List, Tuple # noqa: E402
24
25import numpy as np # noqa: E402
26import plotly.graph_objects as go # noqa: E402
27from plotly.subplots import make_subplots # noqa: E402
28
29from pytcl.dynamic_estimation import ( # noqa: E402; Linear Kalman Filter; Unscented Kalman Filter
30 kf_predict,
31 kf_update,
32 sigma_points_merwe,
33 ukf_predict,
34 ukf_update,
35)
36from pytcl.dynamic_models import ( # noqa: E402
37 f_constant_velocity,
38 q_constant_velocity,
39)
40
41
42def generate_trajectory(
43 n_steps: int = 100,
44 dt: float = 1.0,
45) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
46 """
47 Generate a 2D constant velocity trajectory with nonlinear measurements.
48
49 Returns
50 -------
51 true_states : ndarray, shape (n_steps, 4)
52 True states [x, vx, y, vy] at each time step.
53 linear_measurements : ndarray, shape (n_steps, 2)
54 Linear measurements [x, y] with noise.
55 nonlinear_measurements : ndarray, shape (n_steps, 2)
56 Nonlinear measurements [range, bearing] with noise.
57 """
58 # Initial state: position (100, 50), velocity (2, 1) m/s
59 x0 = np.array([100.0, 2.0, 50.0, 1.0])
60
61 # State transition
62 F = f_constant_velocity(dt, 2)
63
64 # Generate true trajectory
65 true_states = np.zeros((n_steps, 4))
66 true_states[0] = x0
67
68 for k in range(1, n_steps):
69 true_states[k] = F @ true_states[k - 1]
70
71 # Measurement noise
72 R_linear = np.diag([5.0**2, 5.0**2]) # Position noise std = 5m
73 R_nonlinear = np.diag([10.0**2, np.radians(2.0) ** 2]) # Range 10m, bearing 2 deg
74
75 # Generate measurements
76 linear_measurements = np.zeros((n_steps, 2))
77 nonlinear_measurements = np.zeros((n_steps, 2))
78
79 for k in range(n_steps):
80 # True position
81 x, y = true_states[k, 0], true_states[k, 2]
82
83 # Linear measurement: direct position
84 linear_measurements[k] = np.array([x, y]) + np.random.multivariate_normal(
85 [0, 0], R_linear
86 )
87
88 # Nonlinear measurement: range and bearing from origin
89 r = np.sqrt(x**2 + y**2)
90 theta = np.arctan2(y, x)
91 nonlinear_measurements[k] = np.array(
92 [r, theta]
93 ) + np.random.multivariate_normal([0, 0], R_nonlinear)
94
95 return true_states, linear_measurements, nonlinear_measurements
96
97
98def run_linear_kf(
99 measurements: np.ndarray,
100 dt: float = 1.0,
101 process_noise: float = 0.1,
102) -> Tuple[np.ndarray, np.ndarray, List[float], List[float]]:
103 """
104 Run linear Kalman filter on position measurements.
105
106 Returns state estimates, covariances, NEES values, and NIS values.
107 """
108 n_steps = len(measurements)
109
110 # System matrices
111 F = f_constant_velocity(dt, 2)
112 Q = q_constant_velocity(dt, process_noise, 2)
113 H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]]) # Measure x, y
114 R = np.diag([5.0**2, 5.0**2])
115
116 # Initial state and covariance
117 x = np.array([measurements[0, 0], 0.0, measurements[0, 1], 0.0])
118 P = np.diag([25.0, 10.0, 25.0, 10.0])
119
120 # Storage
121 estimates = np.zeros((n_steps, 4))
122 covariances = np.zeros((n_steps, 4, 4))
123 nis_values = []
124
125 estimates[0] = x
126 covariances[0] = P
127
128 for k in range(1, n_steps):
129 # Predict
130 x, P = kf_predict(x, P, F, Q)
131
132 # Update
133 z = measurements[k]
134 result = kf_update(x, P, z, H, R)
135 x, P = result.x, result.P
136
137 estimates[k] = x
138 covariances[k] = P
139
140 # NIS: innovation squared normalized by innovation covariance
141 nis_val = float(result.y.T @ np.linalg.solve(result.S, result.y))
142 nis_values.append(nis_val)
143
144 return estimates, covariances, nis_values
145
146
147def nonlinear_measurement(x: np.ndarray) -> np.ndarray:
148 """Nonlinear measurement function: h(x) = [range, bearing]."""
149 px, py = x[0], x[2]
150 r = np.sqrt(px**2 + py**2)
151 theta = np.arctan2(py, px)
152 return np.array([r, theta])
153
154
155def measurement_jacobian(x: np.ndarray) -> np.ndarray:
156 """Jacobian of nonlinear measurement function."""
157 px, py = x[0], x[2]
158 r = np.sqrt(px**2 + py**2)
159 r2 = r**2
160
161 # H = dh/dx
162 H = np.zeros((2, 4))
163 H[0, 0] = px / r # dr/dx
164 H[0, 2] = py / r # dr/dy
165 H[1, 0] = -py / r2 # dtheta/dx
166 H[1, 2] = px / r2 # dtheta/dy
167
168 return H
169
170
171def run_ekf(
172 measurements: np.ndarray,
173 dt: float = 1.0,
174 process_noise: float = 0.1,
175) -> Tuple[np.ndarray, np.ndarray, List[float]]:
176 """
177 Run Extended Kalman Filter on range-bearing measurements.
178
179 Returns state estimates, covariances, and NIS values.
180 """
181 n_steps = len(measurements)
182
183 # System matrices (linear dynamics, nonlinear measurements)
184 F = f_constant_velocity(dt, 2)
185 Q = q_constant_velocity(dt, process_noise, 2)
186 R = np.diag([10.0**2, np.radians(2.0) ** 2])
187
188 # Initialize from first measurement
189 r0, theta0 = measurements[0]
190 x0 = r0 * np.cos(theta0)
191 y0 = r0 * np.sin(theta0)
192 x = np.array([x0, 0.0, y0, 0.0])
193 P = np.diag([100.0, 10.0, 100.0, 10.0])
194
195 # Storage
196 estimates = np.zeros((n_steps, 4))
197 covariances = np.zeros((n_steps, 4, 4))
198 nis_values = []
199
200 estimates[0] = x
201 covariances[0] = P
202
203 for k in range(1, n_steps):
204 # Predict (linear dynamics - use standard KF predict)
205 x, P = kf_predict(x, P, F, Q)
206
207 # Update with nonlinear measurement (manual EKF update)
208 z = measurements[k]
209 H = measurement_jacobian(x)
210 z_pred = nonlinear_measurement(x)
211
212 # Angle wrapping for bearing innovation
213 y = z - z_pred
214 y[1] = np.arctan2(np.sin(y[1]), np.cos(y[1])) # Wrap to [-pi, pi]
215
216 # Innovation covariance
217 S = H @ P @ H.T + R
218
219 # Kalman gain
220 K = P @ H.T @ np.linalg.inv(S)
221
222 # Update
223 x = x + K @ y
224 P = (np.eye(4) - K @ H) @ P
225
226 estimates[k] = x
227 covariances[k] = P
228
229 # NIS
230 nis_val = float(y.T @ np.linalg.solve(S, y))
231 nis_values.append(nis_val)
232
233 return estimates, covariances, nis_values
234
235
236def run_ukf(
237 measurements: np.ndarray,
238 dt: float = 1.0,
239 process_noise: float = 0.1,
240) -> Tuple[np.ndarray, np.ndarray, List[float]]:
241 """
242 Run Unscented Kalman Filter on range-bearing measurements.
243
244 Returns state estimates, covariances, and NIS values.
245 """
246 n_steps = len(measurements)
247
248 # System matrices
249 F = f_constant_velocity(dt, 2)
250 Q = q_constant_velocity(dt, process_noise, 2)
251 R = np.diag([10.0**2, np.radians(2.0) ** 2])
252
253 # UKF parameters
254 alpha = 1e-3
255 beta = 2.0
256 kappa = 0.0
257
258 # Initialize from first measurement
259 r0, theta0 = measurements[0]
260 x0 = r0 * np.cos(theta0)
261 y0 = r0 * np.sin(theta0)
262 x = np.array([x0, 0.0, y0, 0.0])
263 P = np.diag([100.0, 10.0, 100.0, 10.0])
264
265 # Storage
266 estimates = np.zeros((n_steps, 4))
267 covariances = np.zeros((n_steps, 4, 4))
268 nis_values = []
269
270 estimates[0] = x
271 covariances[0] = P
272
273 # State transition function (linear, but UKF uses it as a function)
274 def f(x):
275 return F @ x
276
277 for k in range(1, n_steps):
278 # Generate sigma points
279 sigma_pts, Wm, Wc = sigma_points_merwe(x, P, alpha, beta, kappa)
280
281 # Predict step using UKF
282 x, P = ukf_predict(x, P, f, Q, alpha, beta, kappa)
283
284 # Update with nonlinear measurement
285 z = measurements[k]
286 result = ukf_update(
287 x,
288 P,
289 z,
290 nonlinear_measurement,
291 R,
292 alpha,
293 beta,
294 kappa,
295 )
296 x, P = result.x, result.P
297
298 estimates[k] = x
299 covariances[k] = P
300
301 # NIS
302 nis_val = float(result.y.T @ np.linalg.solve(result.S, result.y))
303 nis_values.append(nis_val)
304
305 return estimates, covariances, nis_values
306
307
308def compute_metrics(
309 true_states: np.ndarray,
310 estimates: np.ndarray,
311 covariances: np.ndarray,
312) -> dict:
313 """Compute performance metrics."""
314 # Position RMSE
315 pos_errors = estimates[:, [0, 2]] - true_states[:, [0, 2]]
316 pos_rmse = np.sqrt(np.mean(np.sum(pos_errors**2, axis=1)))
317
318 # Velocity RMSE
319 vel_errors = estimates[:, [1, 3]] - true_states[:, [1, 3]]
320 vel_rmse = np.sqrt(np.mean(np.sum(vel_errors**2, axis=1)))
321
322 # NEES (Normalized Estimation Error Squared)
323 nees_values = []
324 for k in range(len(true_states)):
325 err = estimates[k] - true_states[k]
326 P = covariances[k]
327 nees_val = float(err.T @ np.linalg.solve(P, err))
328 nees_values.append(nees_val)
329
330 avg_nees = np.mean(nees_values)
331
332 return {
333 "pos_rmse": pos_rmse,
334 "vel_rmse": vel_rmse,
335 "avg_nees": avg_nees,
336 "nees_values": nees_values,
337 }
338
339
340def plot_results(
341 true_states: np.ndarray,
342 linear_meas: np.ndarray,
343 kf_est: np.ndarray,
344 ekf_est: np.ndarray,
345 ukf_est: np.ndarray,
346 kf_metrics: dict,
347 ekf_metrics: dict,
348 ukf_metrics: dict,
349) -> None:
350 """Create comparison plots."""
351 fig = make_subplots(
352 rows=2,
353 cols=2,
354 subplot_titles=(
355 "Trajectory Comparison",
356 "Position Error Over Time",
357 "NEES Comparison",
358 "Filter Performance Summary",
359 ),
360 )
361
362 # Trajectory plot
363 fig.add_trace(
364 go.Scatter(
365 x=true_states[:, 0],
366 y=true_states[:, 2],
367 mode="lines",
368 name="True",
369 line=dict(color="black", width=2),
370 ),
371 row=1,
372 col=1,
373 )
374 fig.add_trace(
375 go.Scatter(
376 x=linear_meas[:, 0],
377 y=linear_meas[:, 1],
378 mode="markers",
379 name="Measurements",
380 marker=dict(color="gray", size=3, opacity=0.5),
381 ),
382 row=1,
383 col=1,
384 )
385 fig.add_trace(
386 go.Scatter(
387 x=kf_est[:, 0],
388 y=kf_est[:, 2],
389 mode="lines",
390 name="KF",
391 line=dict(color="blue", width=1.5),
392 ),
393 row=1,
394 col=1,
395 )
396 fig.add_trace(
397 go.Scatter(
398 x=ekf_est[:, 0],
399 y=ekf_est[:, 2],
400 mode="lines",
401 name="EKF",
402 line=dict(color="red", width=1.5),
403 ),
404 row=1,
405 col=1,
406 )
407 fig.add_trace(
408 go.Scatter(
409 x=ukf_est[:, 0],
410 y=ukf_est[:, 2],
411 mode="lines",
412 name="UKF",
413 line=dict(color="green", width=1.5),
414 ),
415 row=1,
416 col=1,
417 )
418
419 # Position error over time
420 time = np.arange(len(true_states))
421 kf_pos_err = np.sqrt(
422 (kf_est[:, 0] - true_states[:, 0]) ** 2
423 + (kf_est[:, 2] - true_states[:, 2]) ** 2
424 )
425 ekf_pos_err = np.sqrt(
426 (ekf_est[:, 0] - true_states[:, 0]) ** 2
427 + (ekf_est[:, 2] - true_states[:, 2]) ** 2
428 )
429 ukf_pos_err = np.sqrt(
430 (ukf_est[:, 0] - true_states[:, 0]) ** 2
431 + (ukf_est[:, 2] - true_states[:, 2]) ** 2
432 )
433
434 fig.add_trace(
435 go.Scatter(x=time, y=kf_pos_err, name="KF Error", line=dict(color="blue")),
436 row=1,
437 col=2,
438 )
439 fig.add_trace(
440 go.Scatter(x=time, y=ekf_pos_err, name="EKF Error", line=dict(color="red")),
441 row=1,
442 col=2,
443 )
444 fig.add_trace(
445 go.Scatter(x=time, y=ukf_pos_err, name="UKF Error", line=dict(color="green")),
446 row=1,
447 col=2,
448 )
449
450 # NEES comparison
451 fig.add_trace(
452 go.Scatter(
453 x=time,
454 y=kf_metrics["nees_values"],
455 name="KF NEES",
456 line=dict(color="blue"),
457 ),
458 row=2,
459 col=1,
460 )
461 fig.add_trace(
462 go.Scatter(
463 x=time,
464 y=ekf_metrics["nees_values"],
465 name="EKF NEES",
466 line=dict(color="red"),
467 ),
468 row=2,
469 col=1,
470 )
471 fig.add_trace(
472 go.Scatter(
473 x=time,
474 y=ukf_metrics["nees_values"],
475 name="UKF NEES",
476 line=dict(color="green"),
477 ),
478 row=2,
479 col=1,
480 )
481 # NEES expected value line (state dimension = 4)
482 fig.add_trace(
483 go.Scatter(
484 x=[0, len(time)],
485 y=[4, 4],
486 name="Expected NEES",
487 line=dict(color="black", dash="dash"),
488 ),
489 row=2,
490 col=1,
491 )
492
493 # Summary bar chart
494 filters = ["KF (linear)", "EKF", "UKF"]
495 pos_rmses = [
496 kf_metrics["pos_rmse"],
497 ekf_metrics["pos_rmse"],
498 ukf_metrics["pos_rmse"],
499 ]
500
501 fig.add_trace(
502 go.Bar(
503 x=filters,
504 y=pos_rmses,
505 name="Position RMSE (m)",
506 marker_color=["blue", "red", "green"],
507 ),
508 row=2,
509 col=2,
510 )
511
512 fig.update_layout(
513 title="Kalman Filter Comparison: KF vs EKF vs UKF",
514 height=800,
515 width=1200,
516 showlegend=True,
517 )
518
519 fig.update_xaxes(title_text="X Position (m)", row=1, col=1)
520 fig.update_yaxes(title_text="Y Position (m)", row=1, col=1)
521 fig.update_xaxes(title_text="Time Step", row=1, col=2)
522 fig.update_yaxes(title_text="Position Error (m)", row=1, col=2)
523 fig.update_xaxes(title_text="Time Step", row=2, col=1)
524 fig.update_yaxes(title_text="NEES", row=2, col=1)
525 fig.update_xaxes(title_text="Filter Type", row=2, col=2)
526 fig.update_yaxes(title_text="RMSE (m)", row=2, col=2)
527
528 output_path = OUTPUT_DIR / "kalman_filter_comparison.html"
529 fig.write_html(str(output_path))
530 print(f"\nInteractive plot saved to {output_path}")
531 fig.show()
532
533
534def main() -> None:
535 """Run Kalman filter comparison."""
536 print("Kalman Filter Comparison Example")
537 print("=" * 60)
538
539 np.random.seed(42)
540
541 # Generate trajectory and measurements
542 print("\nGenerating trajectory and measurements...")
543 n_steps = 100
544 dt = 1.0
545 true_states, linear_meas, nonlinear_meas = generate_trajectory(n_steps, dt)
546
547 print(f" {n_steps} time steps, dt = {dt}s")
548 print(f" Initial position: ({true_states[0, 0]:.1f}, {true_states[0, 2]:.1f}) m")
549 print(f" Final position: ({true_states[-1, 0]:.1f}, {true_states[-1, 2]:.1f}) m")
550
551 # Run filters
552 print("\nRunning filters...")
553
554 print(" Linear Kalman Filter (position measurements)...")
555 kf_est, kf_cov, kf_nis = run_linear_kf(linear_meas, dt)
556
557 print(" Extended Kalman Filter (range-bearing measurements)...")
558 ekf_est, ekf_cov, ekf_nis = run_ekf(nonlinear_meas, dt)
559
560 print(" Unscented Kalman Filter (range-bearing measurements)...")
561 ukf_est, ukf_cov, ukf_nis = run_ukf(nonlinear_meas, dt)
562
563 # Compute metrics
564 print("\nComputing performance metrics...")
565 kf_metrics = compute_metrics(true_states, kf_est, kf_cov)
566 ekf_metrics = compute_metrics(true_states, ekf_est, ekf_cov)
567 ukf_metrics = compute_metrics(true_states, ukf_est, ukf_cov)
568
569 # Print results
570 print("\n" + "=" * 60)
571 print("RESULTS")
572 print("=" * 60)
573
574 print("\nLinear Kalman Filter (with position measurements):")
575 print(f" Position RMSE: {kf_metrics['pos_rmse']:.2f} m")
576 print(f" Velocity RMSE: {kf_metrics['vel_rmse']:.2f} m/s")
577 print(f" Average NEES: {kf_metrics['avg_nees']:.2f} (expected: 4.0)")
578
579 print("\nExtended Kalman Filter (with range-bearing measurements):")
580 print(f" Position RMSE: {ekf_metrics['pos_rmse']:.2f} m")
581 print(f" Velocity RMSE: {ekf_metrics['vel_rmse']:.2f} m/s")
582 print(f" Average NEES: {ekf_metrics['avg_nees']:.2f} (expected: 4.0)")
583
584 print("\nUnscented Kalman Filter (with range-bearing measurements):")
585 print(f" Position RMSE: {ukf_metrics['pos_rmse']:.2f} m")
586 print(f" Velocity RMSE: {ukf_metrics['vel_rmse']:.2f} m/s")
587 print(f" Average NEES: {ukf_metrics['avg_nees']:.2f} (expected: 4.0)")
588
589 print("\n" + "-" * 60)
590 print("Note: KF uses linear (x,y) measurements, while EKF/UKF use")
591 print("nonlinear (range, bearing) measurements. EKF and UKF should")
592 print("perform similarly for this mildly nonlinear problem.")
593 print("-" * 60)
594
595 # Plot results
596 try:
597 plot_results(
598 true_states,
599 linear_meas,
600 kf_est,
601 ekf_est,
602 ukf_est,
603 kf_metrics,
604 ekf_metrics,
605 ukf_metrics,
606 )
607 except Exception as e:
608 print(f"\nCould not generate plot: {e}")
609
610 print("\nDone!")
611
612
613if __name__ == "__main__":
614 main()
Running the Example
python examples/kalman_filter_comparison.py
See Also
Filter Uncertainty Visualization - Covariance ellipse visualization
Advanced Filters Comparison - EKF, Gaussian Sum, Rao-Blackwellized PF
Smoothers and Information Filters - RTS smoother and information filters