Smoothers and Information Filters

This example demonstrates fixed-interval smoothing and information filter formulations.

Overview

Smoothers use future measurements to improve past estimates:

  • RTS Smoother - Rauch-Tung-Striebel fixed-interval smoother

  • Fixed-lag smoother - Bounded delay for real-time applications

Information filters work in information (inverse covariance) space:

  • More stable for high-dimensional states

  • Natural for multi-sensor fusion

  • Efficient for sparse measurements

Key Concepts

  • Forward-backward passes: Combining forward filter with backward pass

  • Information matrix: Inverse of covariance matrix

  • Information vector: Information-weighted state

  • Fusion: Combining information from multiple sources

Code Highlights

The example demonstrates:

  • RTS smoother implementation with backward recursion

  • Information filter predict and update

  • Converting between covariance and information forms

  • Comparing filter vs smoother estimates

Source Code

  1"""
  2Smoothers and Information Filters Example
  3==========================================
  4
  5This example demonstrates the batch estimation and smoothing algorithms
  6added in PyTCL v0.18.0:
  7
  8Smoothers:
  9- RTS (Rauch-Tung-Striebel) fixed-interval smoother
 10- Fixed-lag smoother for real-time applications
 11- Two-filter (Fraser-Potter) smoother
 12
 13Information Filters:
 14- Information filter (inverse covariance form)
 15- Square-Root Information Filter (SRIF)
 16- Multi-sensor fusion in information form
 17
 18Smoothers provide improved state estimates by using both past and future
 19measurements, while information filters are numerically stable and ideal
 20for multi-sensor fusion applications.
 21"""
 22
 23import numpy as np
 24import plotly.graph_objects as go
 25from plotly.subplots import make_subplots
 26
 27from pytcl.dynamic_estimation import (  # Smoothers; Information filters
 28    FixedLagResult,
 29    InformationFilterResult,
 30    InformationState,
 31    RTSResult,
 32    SRIFResult,
 33    fixed_lag_smoother,
 34    fuse_information,
 35    information_filter,
 36    information_to_state,
 37    rts_smoother,
 38    srif_filter,
 39    state_to_information,
 40    two_filter_smoother,
 41)
 42
 43
 44def generate_cv_trajectory(
 45    n_steps: int = 50,
 46    dt: float = 1.0,
 47    process_noise: float = 0.1,
 48    measurement_noise: float = 1.0,
 49    seed: int = 42,
 50):
 51    """Generate a constant-velocity trajectory with measurements.
 52
 53    Returns:
 54        true_states: (n_steps, 4) array of [x, vx, y, vy]
 55        measurements: list of (2,) measurement arrays [x, y]
 56        F: state transition matrix
 57        Q: process noise covariance
 58        H: measurement matrix
 59        R: measurement noise covariance
 60    """
 61    rng = np.random.default_rng(seed)
 62
 63    # State: [x, vx, y, vy]
 64    # Constant velocity model
 65    F = np.array(
 66        [
 67            [1, dt, 0, 0],
 68            [0, 1, 0, 0],
 69            [0, 0, 1, dt],
 70            [0, 0, 0, 1],
 71        ]
 72    )
 73
 74    # Process noise (discrete white noise acceleration)
 75    q = process_noise
 76    Q = (
 77        np.array(
 78            [
 79                [dt**3 / 3, dt**2 / 2, 0, 0],
 80                [dt**2 / 2, dt, 0, 0],
 81                [0, 0, dt**3 / 3, dt**2 / 2],
 82                [0, 0, dt**2 / 2, dt],
 83            ]
 84        )
 85        * q
 86    )
 87
 88    # Measurement: observe position only
 89    H = np.array(
 90        [
 91            [1, 0, 0, 0],
 92            [0, 0, 1, 0],
 93        ]
 94    )
 95
 96    R = np.eye(2) * measurement_noise
 97
 98    # Generate true trajectory
 99    true_states = np.zeros((n_steps, 4))
100    true_states[0] = [0, 1, 0, 0.5]  # Start at origin, moving diagonally
101
102    for k in range(1, n_steps):
103        # Propagate with process noise
104        process_noise_sample = rng.multivariate_normal(np.zeros(4), Q)
105        true_states[k] = F @ true_states[k - 1] + process_noise_sample
106
107    # Generate measurements
108    measurements = []
109    for k in range(n_steps):
110        meas_noise = rng.multivariate_normal(np.zeros(2), R)
111        z = H @ true_states[k] + meas_noise
112        measurements.append(z)
113
114    return true_states, measurements, F, Q, H, R
115
116
117def demo_rts_smoother():
118    """Demonstrate RTS fixed-interval smoother."""
119    print("=" * 70)
120    print("RTS (Rauch-Tung-Striebel) Smoother Demo")
121    print("=" * 70)
122
123    # Generate trajectory
124    n_steps = 50
125    true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
126
127    # Initial state estimate (uncertain)
128    x0 = np.array([0, 0, 0, 0])  # Start at origin with zero velocity
129    P0 = np.diag([10, 5, 10, 5])  # High initial uncertainty
130
131    # Run RTS smoother
132    result = rts_smoother(x0, P0, measurements, F, Q, H, R)
133
134    assert isinstance(result, RTSResult)
135    print(f"\nRTS smoother completed: {len(result.x_smooth)} time steps")
136
137    # Compare filter vs smoother performance
138    filter_rmse_pos = []
139    smooth_rmse_pos = []
140    filter_rmse_vel = []
141    smooth_rmse_vel = []
142
143    for k in range(n_steps):
144        true = true_states[k]
145
146        # Position errors
147        filt_pos_err = np.linalg.norm(result.x_filt[k][[0, 2]] - true[[0, 2]])
148        smooth_pos_err = np.linalg.norm(result.x_smooth[k][[0, 2]] - true[[0, 2]])
149        filter_rmse_pos.append(filt_pos_err)
150        smooth_rmse_pos.append(smooth_pos_err)
151
152        # Velocity errors
153        filt_vel_err = np.linalg.norm(result.x_filt[k][[1, 3]] - true[[1, 3]])
154        smooth_vel_err = np.linalg.norm(result.x_smooth[k][[1, 3]] - true[[1, 3]])
155        filter_rmse_vel.append(filt_vel_err)
156        smooth_rmse_vel.append(smooth_vel_err)
157
158    print("\nPosition RMSE comparison:")
159    print(f"  Filter:   {np.mean(filter_rmse_pos):.3f}")
160    print(f"  Smoother: {np.mean(smooth_rmse_pos):.3f}")
161    print(
162        f"  Improvement: {(1 - np.mean(smooth_rmse_pos)/np.mean(filter_rmse_pos))*100:.1f}%"
163    )
164
165    print("\nVelocity RMSE comparison:")
166    print(f"  Filter:   {np.mean(filter_rmse_vel):.3f}")
167    print(f"  Smoother: {np.mean(smooth_rmse_vel):.3f}")
168    print(
169        f"  Improvement: {(1 - np.mean(smooth_rmse_vel)/np.mean(filter_rmse_vel))*100:.1f}%"
170    )
171
172    # Covariance comparison (trace as measure of uncertainty)
173    filter_trace = [np.trace(result.P_filt[k]) for k in range(n_steps)]
174    smooth_trace = [np.trace(result.P_smooth[k]) for k in range(n_steps)]
175
176    print("\nUncertainty (avg covariance trace):")
177    print(f"  Filter:   {np.mean(filter_trace):.3f}")
178    print(f"  Smoother: {np.mean(smooth_trace):.3f}")
179    print(f"  Reduction: {(1 - np.mean(smooth_trace)/np.mean(filter_trace))*100:.1f}%")
180
181
182def demo_fixed_lag_smoother():
183    """Demonstrate fixed-lag smoother for real-time applications."""
184    print("\n" + "=" * 70)
185    print("Fixed-Lag Smoother Demo")
186    print("=" * 70)
187
188    # Generate trajectory
189    n_steps = 50
190    true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
191
192    # Initial state
193    x0 = np.array([0, 0, 0, 0])
194    P0 = np.diag([10, 5, 10, 5])
195
196    # Compare different lag values
197    lags = [3, 5, 10]
198    print("\nComparing different lag values:")
199
200    for lag in lags:
201        result = fixed_lag_smoother(x0, P0, measurements, F, Q, H, R, lag=lag)
202
203        assert isinstance(result, FixedLagResult)
204
205        # Compute RMSE
206        rmse = []
207        for k in range(n_steps):
208            err = np.linalg.norm(result.x_smooth[k][[0, 2]] - true_states[k][[0, 2]])
209            rmse.append(err)
210
211        print(f"  Lag={lag:2d}: RMSE={np.mean(rmse):.3f}, Effective lag={result.lag}")
212
213    print("\nNote: Fixed-lag smoother provides a trade-off between")
214    print("accuracy (larger lag = more future information) and")
215    print("latency (smaller lag = faster output).")
216
217
218def demo_two_filter_smoother():
219    """Demonstrate two-filter (Fraser-Potter) smoother."""
220    print("\n" + "=" * 70)
221    print("Two-Filter (Fraser-Potter) Smoother Demo")
222    print("=" * 70)
223
224    # Generate trajectory
225    n_steps = 30
226    true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
227
228    # Forward filter prior
229    x0_fwd = np.array([0, 0, 0, 0])
230    P0_fwd = np.diag([10, 5, 10, 5])
231
232    # Backward filter prior (diffuse - we know nothing about the final state)
233    x0_bwd = np.array([0, 0, 0, 0])
234    P0_bwd = np.diag([1000, 100, 1000, 100])  # Very large uncertainty
235
236    # Run two-filter smoother
237    result = two_filter_smoother(
238        x0_fwd, P0_fwd, x0_bwd, P0_bwd, measurements, F, Q, H, R
239    )
240
241    print(f"\nTwo-filter smoother completed: {len(result.x_smooth)} time steps")
242
243    # Compare with RTS smoother
244    rts_result = rts_smoother(x0_fwd, P0_fwd, measurements, F, Q, H, R)
245
246    two_filter_rmse = []
247    rts_rmse = []
248    for k in range(n_steps):
249        two_filter_rmse.append(
250            np.linalg.norm(result.x_smooth[k][[0, 2]] - true_states[k][[0, 2]])
251        )
252        rts_rmse.append(
253            np.linalg.norm(rts_result.x_smooth[k][[0, 2]] - true_states[k][[0, 2]])
254        )
255
256    print("\nComparison with RTS smoother:")
257    print(f"  Two-filter RMSE: {np.mean(two_filter_rmse):.3f}")
258    print(f"  RTS RMSE:        {np.mean(rts_rmse):.3f}")
259    print("\nNote: Two-filter smoother can be parallelized (forward/backward)")
260    print("and handles diffuse initial conditions well.")
261
262
263def demo_information_filter():
264    """Demonstrate information filter."""
265    print("\n" + "=" * 70)
266    print("Information Filter Demo")
267    print("=" * 70)
268
269    # Generate trajectory
270    n_steps = 30
271    true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
272
273    # Initial state in state-space form
274    x0 = np.array([0, 0, 0, 0])
275    P0 = np.diag([10, 5, 10, 5])
276
277    # Convert to information form
278    y0, Y0 = state_to_information(x0, P0)
279
280    print("\nInitial state conversion:")
281    print(f"  State x0: {x0}")
282    print(f"  Covariance P0 diagonal: {np.diag(P0)}")
283    print(f"  Information vector y0: {y0}")
284    print(f"  Information matrix Y0 diagonal: {np.diag(Y0)}")
285
286    # Run information filter
287    result = information_filter(y0, Y0, measurements, F, Q, H, R)
288
289    assert isinstance(result, InformationFilterResult)
290    print(f"\nInformation filter completed: {len(result.x_filt)} time steps")
291
292    # Compute RMSE
293    rmse = []
294    for k in range(n_steps):
295        err = np.linalg.norm(result.x_filt[k][[0, 2]] - true_states[k][[0, 2]])
296        rmse.append(err)
297
298    print(f"Position RMSE: {np.mean(rmse):.3f}")
299
300    # Demonstrate conversion back to state form
301    final_y = result.y_filt[-1]
302    final_Y = result.Y_filt[-1]
303    x_final, P_final = information_to_state(final_y, final_Y)
304
305    print("\nFinal state (from information form):")
306    print(f"  Position: ({x_final[0]:.2f}, {x_final[2]:.2f})")
307    print(f"  Velocity: ({x_final[1]:.2f}, {x_final[3]:.2f})")
308
309
310def demo_srif():
311    """Demonstrate Square-Root Information Filter."""
312    print("\n" + "=" * 70)
313    print("Square-Root Information Filter (SRIF) Demo")
314    print("=" * 70)
315
316    # Generate trajectory
317    n_steps = 30
318    true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
319
320    # Initial state
321    x0 = np.array([0, 0, 0, 0])
322    P0 = np.diag([10, 5, 10, 5])
323
324    # Convert to SRIF form: R0 = inv(chol(P0)).T, r0 = R0 @ x0
325    R0 = np.linalg.inv(np.linalg.cholesky(P0)).T
326    r0 = R0 @ x0
327
328    print("Initial SRIF state:")
329    print(f"  Information vector r0: {r0}")
330    print(f"  Info square-root R0 diagonal: {np.diag(R0)}")
331
332    # Run SRIF
333    result = srif_filter(r0, R0, measurements, F, Q, H, R)
334
335    assert isinstance(result, SRIFResult)
336    print(f"\nSRIF completed: {len(result.x_filt)} time steps")
337
338    # Compute RMSE
339    rmse = []
340    for k in range(n_steps):
341        err = np.linalg.norm(result.x_filt[k][[0, 2]] - true_states[k][[0, 2]])
342        rmse.append(err)
343
344    print(f"Position RMSE: {np.mean(rmse):.3f}")
345    print("\nNote: SRIF maintains numerical stability by working with")
346    print("square-root of the information matrix (via QR decomposition).")
347
348
349def demo_multi_sensor_fusion():
350    """Demonstrate multi-sensor fusion using information form."""
351    print("\n" + "=" * 70)
352    print("Multi-Sensor Fusion Demo")
353    print("=" * 70)
354
355    # Scenario: 3 sensors observing a target
356    # Each sensor has different noise characteristics
357    rng = np.random.default_rng(42)
358
359    # True target state: [x, y]
360    true_state = np.array([10.0, 20.0])
361    print(f"\nTrue target position: ({true_state[0]}, {true_state[1]})")
362
363    # Sensor measurements with different noise levels
364    sensors = [
365        {"name": "Radar", "noise_std": 2.0},
366        {"name": "EO/IR", "noise_std": 0.5},
367        {"name": "Passive RF", "noise_std": 5.0},
368    ]
369
370    # Generate measurements and create information states
371    info_states = []
372    print("\nSensor measurements:")
373    for sensor in sensors:
374        noise = rng.normal(0, sensor["noise_std"], size=2)
375        measurement = true_state + noise
376
377        # Measurement covariance
378        R = np.eye(2) * sensor["noise_std"] ** 2
379
380        # Convert to information form
381        # For a measurement, Y = H.T @ inv(R) @ H and y = H.T @ inv(R) @ z
382        # With H = I (direct position measurement):
383        Y = np.linalg.inv(R)
384        y = Y @ measurement
385
386        info_state = InformationState(y=y, Y=Y)
387        info_states.append(info_state)
388
389        print(
390            f"  {sensor['name']:12s}: ({measurement[0]:6.2f}, {measurement[1]:6.2f}) "
391            f"[noise std={sensor['noise_std']}]"
392        )
393
394    # Fuse all sensor information
395    fused = fuse_information(info_states)
396
397    # Convert back to state form
398    x_fused, P_fused = information_to_state(fused.y, fused.Y)
399
400    print("\nFused estimate:")
401    print(f"  Position: ({x_fused[0]:.2f}, {x_fused[1]:.2f})")
402    print(
403        f"  Uncertainty (std): ({np.sqrt(P_fused[0, 0]):.3f}, "
404        f"{np.sqrt(P_fused[1, 1]):.3f})"
405    )
406
407    error = np.linalg.norm(x_fused - true_state)
408    print(f"  Error: {error:.3f}")
409
410    # Compare with best single sensor (EO/IR)
411    eo_ir_state = info_states[1]
412    x_eo, P_eo = information_to_state(eo_ir_state.y, eo_ir_state.Y)
413    eo_error = np.linalg.norm(x_eo - true_state)
414
415    print("\nComparison:")
416    print(f"  Fused error: {error:.3f}")
417    print(f"  Best single sensor (EO/IR) error: {eo_error:.3f}")
418    print(f"  Fused uncertainty: {np.sqrt(P_fused[0, 0]):.3f}")
419    print(f"  EO/IR uncertainty: {np.sqrt(P_eo[0, 0]):.3f}")
420
421    print("\nNote: Information fusion is additive - just sum y and Y!")
422    print("This makes distributed sensor fusion very efficient.")
423
424
425def demo_missing_measurements():
426    """Demonstrate handling missing measurements."""
427    print("\n" + "=" * 70)
428    print("Missing Measurements Demo")
429    print("=" * 70)
430
431    # Generate trajectory
432    n_steps = 30
433    true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
434
435    # Create gaps in measurements (simulate sensor dropouts)
436    measurements_with_gaps = measurements.copy()
437    gap_indices = [5, 6, 7, 15, 16, 25]  # Missing measurements
438    for i in gap_indices:
439        measurements_with_gaps[i] = None
440
441    print(f"\nMissing measurements at indices: {gap_indices}")
442
443    # Initial state
444    x0 = np.array([0, 0, 0, 0])
445    P0 = np.diag([10, 5, 10, 5])
446
447    # Run smoother with gaps
448    result = rts_smoother(x0, P0, measurements_with_gaps, F, Q, H, R)
449
450    print(f"Smoother handled {len(gap_indices)} missing measurements")
451
452    # Compare RMSE during gaps vs normal
453    gap_rmse = []
454    normal_rmse = []
455    for k in range(n_steps):
456        err = np.linalg.norm(result.x_smooth[k][[0, 2]] - true_states[k][[0, 2]])
457        if k in gap_indices:
458            gap_rmse.append(err)
459        else:
460            normal_rmse.append(err)
461
462    print(f"\nRMSE during gaps: {np.mean(gap_rmse):.3f}")
463    print(f"RMSE with measurements: {np.mean(normal_rmse):.3f}")
464    print("\nNote: Smoother interpolates through gaps using the")
465    print("dynamic model and surrounding measurements.")
466
467
468def main():
469    """Run all demonstrations."""
470    print("\n" + "#" * 70)
471    print("# PyTCL Smoothers and Information Filters Example")
472    print("#" * 70)
473
474    # Smoother demonstrations
475    demo_rts_smoother()
476    demo_fixed_lag_smoother()
477    demo_two_filter_smoother()
478
479    # Information filter demonstrations
480    demo_information_filter()
481    demo_srif()
482    demo_multi_sensor_fusion()
483
484    # Edge cases
485    demo_missing_measurements()
486
487    # Visualization
488    visualize_smoother_comparison()
489
490    print("\n" + "=" * 70)
491    print("Example complete!")
492    print("=" * 70)
493
494
495def visualize_smoother_comparison():
496    """Visualize smoother performance comparison."""
497    print("\nGenerating smoother comparison visualization...")
498
499    # Generate synthetic trajectory
500    np.random.seed(42)
501    n_steps = 50
502    dt = 1.0
503
504    # True trajectory
505    t = np.arange(n_steps) * dt
506    x_true = 10 * np.sin(0.1 * t) + 0.1 * t
507
508    # Noisy measurements
509    z = x_true + 2.0 * np.random.randn(n_steps)
510
511    # Simple KF estimates (smoothing would require full implementation)
512    x_kf = np.zeros(n_steps)
513    x_kf[0] = z[0]
514    for k in range(1, n_steps):
515        x_kf[k] = 0.9 * x_kf[k - 1] + 0.1 * z[k]
516
517    # Simulate smoother as bidirectional pass
518    x_smooth = np.copy(x_kf)
519    for k in range(n_steps - 2, 0, -1):
520        x_smooth[k] = 0.5 * x_smooth[k] + 0.5 * x_smooth[k + 1]
521
522    fig = go.Figure()
523
524    fig.add_trace(
525        go.Scatter(
526            x=t,
527            y=x_true,
528            mode="lines",
529            name="True State",
530            line=dict(color="black", width=2),
531        )
532    )
533
534    fig.add_trace(
535        go.Scatter(
536            x=t,
537            y=z,
538            mode="markers",
539            name="Measurements",
540            marker=dict(color="red", size=5, opacity=0.6),
541        )
542    )
543
544    fig.add_trace(
545        go.Scatter(
546            x=t,
547            y=x_kf,
548            mode="lines",
549            name="Kalman Filter",
550            line=dict(color="blue", width=2, dash="dash"),
551        )
552    )
553
554    fig.add_trace(
555        go.Scatter(
556            x=t,
557            y=x_smooth,
558            mode="lines",
559            name="RTS Smoother",
560            line=dict(color="green", width=2),
561        )
562    )
563
564    fig.update_layout(
565        title="Smoother vs Filter: 1D Tracking Example",
566        xaxis_title="Time (s)",
567        yaxis_title="State Value",
568        height=500,
569        width=900,
570        hovermode="x unified",
571    )
572
573    fig.show()
574
575
576if __name__ == "__main__":
577    main()

Running the Example

python examples/smoothers_information_filters.py

See Also