Kalman Filter Comparison

This example demonstrates different Kalman filter variants for target tracking.

Overview

This example compares three Kalman filter implementations:

  1. Linear Kalman Filter (KF) - For linear state-space models with Gaussian noise

  2. Extended Kalman Filter (EKF) - Linearizes nonlinear models around current estimate

  3. Unscented Kalman Filter (UKF) - Uses sigma points for better nonlinear approximation

Key Concepts

  • State estimation: Estimating position and velocity from noisy measurements

  • Filter consistency: NEES/NIS statistics for filter tuning validation

  • Measurement models: Linear vs nonlinear (range-bearing) measurements

  • Process noise: Modeling uncertainty in the motion model

Code Highlights

The example demonstrates:

  • Creating state transition matrices with f_constant_velocity()

  • Process noise covariance with q_constant_velocity()

  • Sigma point generation with sigma_points_merwe()

  • Filter predict/update cycles with kf_predict(), kf_update()

  • UKF operations with ukf_predict(), ukf_update()

Source Code

  1"""
  2Kalman Filter Comparison Example.
  3
  4This example demonstrates:
  51. Linear Kalman Filter for constant velocity tracking
  62. Extended Kalman Filter (EKF) for nonlinear measurements
  73. Unscented Kalman Filter (UKF) for highly nonlinear systems
  84. Filter consistency checking with NEES/NIS
  95. Comparison of filter performance
 10
 11Run with: python examples/kalman_filter_comparison.py
 12"""
 13
 14import sys
 15from pathlib import Path
 16
 17sys.path.insert(0, str(Path(__file__).parent.parent))
 18
 19# Output directory for generated plots
 20OUTPUT_DIR = Path(__file__).parent.parent / "docs" / "_static" / "images" / "examples"
 21OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
 22
 23from typing import List, Tuple  # noqa: E402
 24
 25import numpy as np  # noqa: E402
 26import plotly.graph_objects as go  # noqa: E402
 27from plotly.subplots import make_subplots  # noqa: E402
 28
 29from pytcl.dynamic_estimation import (  # noqa: E402; Linear Kalman Filter; Unscented Kalman Filter
 30    kf_predict,
 31    kf_update,
 32    sigma_points_merwe,
 33    ukf_predict,
 34    ukf_update,
 35)
 36from pytcl.dynamic_models import (  # noqa: E402
 37    f_constant_velocity,
 38    q_constant_velocity,
 39)
 40
 41
 42def generate_trajectory(
 43    n_steps: int = 100,
 44    dt: float = 1.0,
 45) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
 46    """
 47    Generate a 2D constant velocity trajectory with nonlinear measurements.
 48
 49    Returns
 50    -------
 51    true_states : ndarray, shape (n_steps, 4)
 52        True states [x, vx, y, vy] at each time step.
 53    linear_measurements : ndarray, shape (n_steps, 2)
 54        Linear measurements [x, y] with noise.
 55    nonlinear_measurements : ndarray, shape (n_steps, 2)
 56        Nonlinear measurements [range, bearing] with noise.
 57    """
 58    # Initial state: position (100, 50), velocity (2, 1) m/s
 59    x0 = np.array([100.0, 2.0, 50.0, 1.0])
 60
 61    # State transition
 62    F = f_constant_velocity(dt, 2)
 63
 64    # Generate true trajectory
 65    true_states = np.zeros((n_steps, 4))
 66    true_states[0] = x0
 67
 68    for k in range(1, n_steps):
 69        true_states[k] = F @ true_states[k - 1]
 70
 71    # Measurement noise
 72    R_linear = np.diag([5.0**2, 5.0**2])  # Position noise std = 5m
 73    R_nonlinear = np.diag([10.0**2, np.radians(2.0) ** 2])  # Range 10m, bearing 2 deg
 74
 75    # Generate measurements
 76    linear_measurements = np.zeros((n_steps, 2))
 77    nonlinear_measurements = np.zeros((n_steps, 2))
 78
 79    for k in range(n_steps):
 80        # True position
 81        x, y = true_states[k, 0], true_states[k, 2]
 82
 83        # Linear measurement: direct position
 84        linear_measurements[k] = np.array([x, y]) + np.random.multivariate_normal(
 85            [0, 0], R_linear
 86        )
 87
 88        # Nonlinear measurement: range and bearing from origin
 89        r = np.sqrt(x**2 + y**2)
 90        theta = np.arctan2(y, x)
 91        nonlinear_measurements[k] = np.array(
 92            [r, theta]
 93        ) + np.random.multivariate_normal([0, 0], R_nonlinear)
 94
 95    return true_states, linear_measurements, nonlinear_measurements
 96
 97
 98def run_linear_kf(
 99    measurements: np.ndarray,
100    dt: float = 1.0,
101    process_noise: float = 0.1,
102) -> Tuple[np.ndarray, np.ndarray, List[float], List[float]]:
103    """
104    Run linear Kalman filter on position measurements.
105
106    Returns state estimates, covariances, NEES values, and NIS values.
107    """
108    n_steps = len(measurements)
109
110    # System matrices
111    F = f_constant_velocity(dt, 2)
112    Q = q_constant_velocity(dt, process_noise, 2)
113    H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])  # Measure x, y
114    R = np.diag([5.0**2, 5.0**2])
115
116    # Initial state and covariance
117    x = np.array([measurements[0, 0], 0.0, measurements[0, 1], 0.0])
118    P = np.diag([25.0, 10.0, 25.0, 10.0])
119
120    # Storage
121    estimates = np.zeros((n_steps, 4))
122    covariances = np.zeros((n_steps, 4, 4))
123    nis_values = []
124
125    estimates[0] = x
126    covariances[0] = P
127
128    for k in range(1, n_steps):
129        # Predict
130        x, P = kf_predict(x, P, F, Q)
131
132        # Update
133        z = measurements[k]
134        result = kf_update(x, P, z, H, R)
135        x, P = result.x, result.P
136
137        estimates[k] = x
138        covariances[k] = P
139
140        # NIS: innovation squared normalized by innovation covariance
141        nis_val = float(result.y.T @ np.linalg.solve(result.S, result.y))
142        nis_values.append(nis_val)
143
144    return estimates, covariances, nis_values
145
146
147def nonlinear_measurement(x: np.ndarray) -> np.ndarray:
148    """Nonlinear measurement function: h(x) = [range, bearing]."""
149    px, py = x[0], x[2]
150    r = np.sqrt(px**2 + py**2)
151    theta = np.arctan2(py, px)
152    return np.array([r, theta])
153
154
155def measurement_jacobian(x: np.ndarray) -> np.ndarray:
156    """Jacobian of nonlinear measurement function."""
157    px, py = x[0], x[2]
158    r = np.sqrt(px**2 + py**2)
159    r2 = r**2
160
161    # H = dh/dx
162    H = np.zeros((2, 4))
163    H[0, 0] = px / r  # dr/dx
164    H[0, 2] = py / r  # dr/dy
165    H[1, 0] = -py / r2  # dtheta/dx
166    H[1, 2] = px / r2  # dtheta/dy
167
168    return H
169
170
171def run_ekf(
172    measurements: np.ndarray,
173    dt: float = 1.0,
174    process_noise: float = 0.1,
175) -> Tuple[np.ndarray, np.ndarray, List[float]]:
176    """
177    Run Extended Kalman Filter on range-bearing measurements.
178
179    Returns state estimates, covariances, and NIS values.
180    """
181    n_steps = len(measurements)
182
183    # System matrices (linear dynamics, nonlinear measurements)
184    F = f_constant_velocity(dt, 2)
185    Q = q_constant_velocity(dt, process_noise, 2)
186    R = np.diag([10.0**2, np.radians(2.0) ** 2])
187
188    # Initialize from first measurement
189    r0, theta0 = measurements[0]
190    x0 = r0 * np.cos(theta0)
191    y0 = r0 * np.sin(theta0)
192    x = np.array([x0, 0.0, y0, 0.0])
193    P = np.diag([100.0, 10.0, 100.0, 10.0])
194
195    # Storage
196    estimates = np.zeros((n_steps, 4))
197    covariances = np.zeros((n_steps, 4, 4))
198    nis_values = []
199
200    estimates[0] = x
201    covariances[0] = P
202
203    for k in range(1, n_steps):
204        # Predict (linear dynamics - use standard KF predict)
205        x, P = kf_predict(x, P, F, Q)
206
207        # Update with nonlinear measurement (manual EKF update)
208        z = measurements[k]
209        H = measurement_jacobian(x)
210        z_pred = nonlinear_measurement(x)
211
212        # Angle wrapping for bearing innovation
213        y = z - z_pred
214        y[1] = np.arctan2(np.sin(y[1]), np.cos(y[1]))  # Wrap to [-pi, pi]
215
216        # Innovation covariance
217        S = H @ P @ H.T + R
218
219        # Kalman gain
220        K = P @ H.T @ np.linalg.inv(S)
221
222        # Update
223        x = x + K @ y
224        P = (np.eye(4) - K @ H) @ P
225
226        estimates[k] = x
227        covariances[k] = P
228
229        # NIS
230        nis_val = float(y.T @ np.linalg.solve(S, y))
231        nis_values.append(nis_val)
232
233    return estimates, covariances, nis_values
234
235
236def run_ukf(
237    measurements: np.ndarray,
238    dt: float = 1.0,
239    process_noise: float = 0.1,
240) -> Tuple[np.ndarray, np.ndarray, List[float]]:
241    """
242    Run Unscented Kalman Filter on range-bearing measurements.
243
244    Returns state estimates, covariances, and NIS values.
245    """
246    n_steps = len(measurements)
247
248    # System matrices
249    F = f_constant_velocity(dt, 2)
250    Q = q_constant_velocity(dt, process_noise, 2)
251    R = np.diag([10.0**2, np.radians(2.0) ** 2])
252
253    # UKF parameters
254    alpha = 1e-3
255    beta = 2.0
256    kappa = 0.0
257
258    # Initialize from first measurement
259    r0, theta0 = measurements[0]
260    x0 = r0 * np.cos(theta0)
261    y0 = r0 * np.sin(theta0)
262    x = np.array([x0, 0.0, y0, 0.0])
263    P = np.diag([100.0, 10.0, 100.0, 10.0])
264
265    # Storage
266    estimates = np.zeros((n_steps, 4))
267    covariances = np.zeros((n_steps, 4, 4))
268    nis_values = []
269
270    estimates[0] = x
271    covariances[0] = P
272
273    # State transition function (linear, but UKF uses it as a function)
274    def f(x):
275        return F @ x
276
277    for k in range(1, n_steps):
278        # Generate sigma points
279        sigma_pts, Wm, Wc = sigma_points_merwe(x, P, alpha, beta, kappa)
280
281        # Predict step using UKF
282        x, P = ukf_predict(x, P, f, Q, alpha, beta, kappa)
283
284        # Update with nonlinear measurement
285        z = measurements[k]
286        result = ukf_update(
287            x,
288            P,
289            z,
290            nonlinear_measurement,
291            R,
292            alpha,
293            beta,
294            kappa,
295        )
296        x, P = result.x, result.P
297
298        estimates[k] = x
299        covariances[k] = P
300
301        # NIS
302        nis_val = float(result.y.T @ np.linalg.solve(result.S, result.y))
303        nis_values.append(nis_val)
304
305    return estimates, covariances, nis_values
306
307
308def compute_metrics(
309    true_states: np.ndarray,
310    estimates: np.ndarray,
311    covariances: np.ndarray,
312) -> dict:
313    """Compute performance metrics."""
314    # Position RMSE
315    pos_errors = estimates[:, [0, 2]] - true_states[:, [0, 2]]
316    pos_rmse = np.sqrt(np.mean(np.sum(pos_errors**2, axis=1)))
317
318    # Velocity RMSE
319    vel_errors = estimates[:, [1, 3]] - true_states[:, [1, 3]]
320    vel_rmse = np.sqrt(np.mean(np.sum(vel_errors**2, axis=1)))
321
322    # NEES (Normalized Estimation Error Squared)
323    nees_values = []
324    for k in range(len(true_states)):
325        err = estimates[k] - true_states[k]
326        P = covariances[k]
327        nees_val = float(err.T @ np.linalg.solve(P, err))
328        nees_values.append(nees_val)
329
330    avg_nees = np.mean(nees_values)
331
332    return {
333        "pos_rmse": pos_rmse,
334        "vel_rmse": vel_rmse,
335        "avg_nees": avg_nees,
336        "nees_values": nees_values,
337    }
338
339
340def plot_results(
341    true_states: np.ndarray,
342    linear_meas: np.ndarray,
343    kf_est: np.ndarray,
344    ekf_est: np.ndarray,
345    ukf_est: np.ndarray,
346    kf_metrics: dict,
347    ekf_metrics: dict,
348    ukf_metrics: dict,
349) -> None:
350    """Create comparison plots."""
351    fig = make_subplots(
352        rows=2,
353        cols=2,
354        subplot_titles=(
355            "Trajectory Comparison",
356            "Position Error Over Time",
357            "NEES Comparison",
358            "Filter Performance Summary",
359        ),
360    )
361
362    # Trajectory plot
363    fig.add_trace(
364        go.Scatter(
365            x=true_states[:, 0],
366            y=true_states[:, 2],
367            mode="lines",
368            name="True",
369            line=dict(color="black", width=2),
370        ),
371        row=1,
372        col=1,
373    )
374    fig.add_trace(
375        go.Scatter(
376            x=linear_meas[:, 0],
377            y=linear_meas[:, 1],
378            mode="markers",
379            name="Measurements",
380            marker=dict(color="gray", size=3, opacity=0.5),
381        ),
382        row=1,
383        col=1,
384    )
385    fig.add_trace(
386        go.Scatter(
387            x=kf_est[:, 0],
388            y=kf_est[:, 2],
389            mode="lines",
390            name="KF",
391            line=dict(color="blue", width=1.5),
392        ),
393        row=1,
394        col=1,
395    )
396    fig.add_trace(
397        go.Scatter(
398            x=ekf_est[:, 0],
399            y=ekf_est[:, 2],
400            mode="lines",
401            name="EKF",
402            line=dict(color="red", width=1.5),
403        ),
404        row=1,
405        col=1,
406    )
407    fig.add_trace(
408        go.Scatter(
409            x=ukf_est[:, 0],
410            y=ukf_est[:, 2],
411            mode="lines",
412            name="UKF",
413            line=dict(color="green", width=1.5),
414        ),
415        row=1,
416        col=1,
417    )
418
419    # Position error over time
420    time = np.arange(len(true_states))
421    kf_pos_err = np.sqrt(
422        (kf_est[:, 0] - true_states[:, 0]) ** 2
423        + (kf_est[:, 2] - true_states[:, 2]) ** 2
424    )
425    ekf_pos_err = np.sqrt(
426        (ekf_est[:, 0] - true_states[:, 0]) ** 2
427        + (ekf_est[:, 2] - true_states[:, 2]) ** 2
428    )
429    ukf_pos_err = np.sqrt(
430        (ukf_est[:, 0] - true_states[:, 0]) ** 2
431        + (ukf_est[:, 2] - true_states[:, 2]) ** 2
432    )
433
434    fig.add_trace(
435        go.Scatter(x=time, y=kf_pos_err, name="KF Error", line=dict(color="blue")),
436        row=1,
437        col=2,
438    )
439    fig.add_trace(
440        go.Scatter(x=time, y=ekf_pos_err, name="EKF Error", line=dict(color="red")),
441        row=1,
442        col=2,
443    )
444    fig.add_trace(
445        go.Scatter(x=time, y=ukf_pos_err, name="UKF Error", line=dict(color="green")),
446        row=1,
447        col=2,
448    )
449
450    # NEES comparison
451    fig.add_trace(
452        go.Scatter(
453            x=time,
454            y=kf_metrics["nees_values"],
455            name="KF NEES",
456            line=dict(color="blue"),
457        ),
458        row=2,
459        col=1,
460    )
461    fig.add_trace(
462        go.Scatter(
463            x=time,
464            y=ekf_metrics["nees_values"],
465            name="EKF NEES",
466            line=dict(color="red"),
467        ),
468        row=2,
469        col=1,
470    )
471    fig.add_trace(
472        go.Scatter(
473            x=time,
474            y=ukf_metrics["nees_values"],
475            name="UKF NEES",
476            line=dict(color="green"),
477        ),
478        row=2,
479        col=1,
480    )
481    # NEES expected value line (state dimension = 4)
482    fig.add_trace(
483        go.Scatter(
484            x=[0, len(time)],
485            y=[4, 4],
486            name="Expected NEES",
487            line=dict(color="black", dash="dash"),
488        ),
489        row=2,
490        col=1,
491    )
492
493    # Summary bar chart
494    filters = ["KF (linear)", "EKF", "UKF"]
495    pos_rmses = [
496        kf_metrics["pos_rmse"],
497        ekf_metrics["pos_rmse"],
498        ukf_metrics["pos_rmse"],
499    ]
500
501    fig.add_trace(
502        go.Bar(
503            x=filters,
504            y=pos_rmses,
505            name="Position RMSE (m)",
506            marker_color=["blue", "red", "green"],
507        ),
508        row=2,
509        col=2,
510    )
511
512    fig.update_layout(
513        title="Kalman Filter Comparison: KF vs EKF vs UKF",
514        height=800,
515        width=1200,
516        showlegend=True,
517    )
518
519    fig.update_xaxes(title_text="X Position (m)", row=1, col=1)
520    fig.update_yaxes(title_text="Y Position (m)", row=1, col=1)
521    fig.update_xaxes(title_text="Time Step", row=1, col=2)
522    fig.update_yaxes(title_text="Position Error (m)", row=1, col=2)
523    fig.update_xaxes(title_text="Time Step", row=2, col=1)
524    fig.update_yaxes(title_text="NEES", row=2, col=1)
525    fig.update_xaxes(title_text="Filter Type", row=2, col=2)
526    fig.update_yaxes(title_text="RMSE (m)", row=2, col=2)
527
528    output_path = OUTPUT_DIR / "kalman_filter_comparison.html"
529    fig.write_html(str(output_path))
530    print(f"\nInteractive plot saved to {output_path}")
531    fig.show()
532
533
534def main() -> None:
535    """Run Kalman filter comparison."""
536    print("Kalman Filter Comparison Example")
537    print("=" * 60)
538
539    np.random.seed(42)
540
541    # Generate trajectory and measurements
542    print("\nGenerating trajectory and measurements...")
543    n_steps = 100
544    dt = 1.0
545    true_states, linear_meas, nonlinear_meas = generate_trajectory(n_steps, dt)
546
547    print(f"  {n_steps} time steps, dt = {dt}s")
548    print(f"  Initial position: ({true_states[0, 0]:.1f}, {true_states[0, 2]:.1f}) m")
549    print(f"  Final position: ({true_states[-1, 0]:.1f}, {true_states[-1, 2]:.1f}) m")
550
551    # Run filters
552    print("\nRunning filters...")
553
554    print("  Linear Kalman Filter (position measurements)...")
555    kf_est, kf_cov, kf_nis = run_linear_kf(linear_meas, dt)
556
557    print("  Extended Kalman Filter (range-bearing measurements)...")
558    ekf_est, ekf_cov, ekf_nis = run_ekf(nonlinear_meas, dt)
559
560    print("  Unscented Kalman Filter (range-bearing measurements)...")
561    ukf_est, ukf_cov, ukf_nis = run_ukf(nonlinear_meas, dt)
562
563    # Compute metrics
564    print("\nComputing performance metrics...")
565    kf_metrics = compute_metrics(true_states, kf_est, kf_cov)
566    ekf_metrics = compute_metrics(true_states, ekf_est, ekf_cov)
567    ukf_metrics = compute_metrics(true_states, ukf_est, ukf_cov)
568
569    # Print results
570    print("\n" + "=" * 60)
571    print("RESULTS")
572    print("=" * 60)
573
574    print("\nLinear Kalman Filter (with position measurements):")
575    print(f"  Position RMSE: {kf_metrics['pos_rmse']:.2f} m")
576    print(f"  Velocity RMSE: {kf_metrics['vel_rmse']:.2f} m/s")
577    print(f"  Average NEES:  {kf_metrics['avg_nees']:.2f} (expected: 4.0)")
578
579    print("\nExtended Kalman Filter (with range-bearing measurements):")
580    print(f"  Position RMSE: {ekf_metrics['pos_rmse']:.2f} m")
581    print(f"  Velocity RMSE: {ekf_metrics['vel_rmse']:.2f} m/s")
582    print(f"  Average NEES:  {ekf_metrics['avg_nees']:.2f} (expected: 4.0)")
583
584    print("\nUnscented Kalman Filter (with range-bearing measurements):")
585    print(f"  Position RMSE: {ukf_metrics['pos_rmse']:.2f} m")
586    print(f"  Velocity RMSE: {ukf_metrics['vel_rmse']:.2f} m/s")
587    print(f"  Average NEES:  {ukf_metrics['avg_nees']:.2f} (expected: 4.0)")
588
589    print("\n" + "-" * 60)
590    print("Note: KF uses linear (x,y) measurements, while EKF/UKF use")
591    print("nonlinear (range, bearing) measurements. EKF and UKF should")
592    print("perform similarly for this mildly nonlinear problem.")
593    print("-" * 60)
594
595    # Plot results
596    try:
597        plot_results(
598            true_states,
599            linear_meas,
600            kf_est,
601            ekf_est,
602            ukf_est,
603            kf_metrics,
604            ekf_metrics,
605            ukf_metrics,
606        )
607    except Exception as e:
608        print(f"\nCould not generate plot: {e}")
609
610    print("\nDone!")
611
612
613if __name__ == "__main__":
614    main()

Running the Example

python examples/kalman_filter_comparison.py

See Also