Smoothers and Information Filters
This example demonstrates fixed-interval smoothing and information filter formulations.
Overview
Smoothers use future measurements to improve past estimates:
RTS Smoother - Rauch-Tung-Striebel fixed-interval smoother
Fixed-lag smoother - Bounded delay for real-time applications
Information filters work in information (inverse covariance) space:
More stable for high-dimensional states
Natural for multi-sensor fusion
Efficient for sparse measurements
Key Concepts
Forward-backward passes: Combining forward filter with backward pass
Information matrix: Inverse of covariance matrix
Information vector: Information-weighted state
Fusion: Combining information from multiple sources
Code Highlights
The example demonstrates:
RTS smoother implementation with backward recursion
Information filter predict and update
Converting between covariance and information forms
Comparing filter vs smoother estimates
Source Code
1"""
2Smoothers and Information Filters Example
3==========================================
4
5This example demonstrates the batch estimation and smoothing algorithms
6added in PyTCL v0.18.0:
7
8Smoothers:
9- RTS (Rauch-Tung-Striebel) fixed-interval smoother
10- Fixed-lag smoother for real-time applications
11- Two-filter (Fraser-Potter) smoother
12
13Information Filters:
14- Information filter (inverse covariance form)
15- Square-Root Information Filter (SRIF)
16- Multi-sensor fusion in information form
17
18Smoothers provide improved state estimates by using both past and future
19measurements, while information filters are numerically stable and ideal
20for multi-sensor fusion applications.
21"""
22
23import numpy as np
24import plotly.graph_objects as go
25from plotly.subplots import make_subplots
26
27from pytcl.dynamic_estimation import ( # Smoothers; Information filters
28 FixedLagResult,
29 InformationFilterResult,
30 InformationState,
31 RTSResult,
32 SRIFResult,
33 fixed_lag_smoother,
34 fuse_information,
35 information_filter,
36 information_to_state,
37 rts_smoother,
38 srif_filter,
39 state_to_information,
40 two_filter_smoother,
41)
42
43
44def generate_cv_trajectory(
45 n_steps: int = 50,
46 dt: float = 1.0,
47 process_noise: float = 0.1,
48 measurement_noise: float = 1.0,
49 seed: int = 42,
50):
51 """Generate a constant-velocity trajectory with measurements.
52
53 Returns:
54 true_states: (n_steps, 4) array of [x, vx, y, vy]
55 measurements: list of (2,) measurement arrays [x, y]
56 F: state transition matrix
57 Q: process noise covariance
58 H: measurement matrix
59 R: measurement noise covariance
60 """
61 rng = np.random.default_rng(seed)
62
63 # State: [x, vx, y, vy]
64 # Constant velocity model
65 F = np.array(
66 [
67 [1, dt, 0, 0],
68 [0, 1, 0, 0],
69 [0, 0, 1, dt],
70 [0, 0, 0, 1],
71 ]
72 )
73
74 # Process noise (discrete white noise acceleration)
75 q = process_noise
76 Q = (
77 np.array(
78 [
79 [dt**3 / 3, dt**2 / 2, 0, 0],
80 [dt**2 / 2, dt, 0, 0],
81 [0, 0, dt**3 / 3, dt**2 / 2],
82 [0, 0, dt**2 / 2, dt],
83 ]
84 )
85 * q
86 )
87
88 # Measurement: observe position only
89 H = np.array(
90 [
91 [1, 0, 0, 0],
92 [0, 0, 1, 0],
93 ]
94 )
95
96 R = np.eye(2) * measurement_noise
97
98 # Generate true trajectory
99 true_states = np.zeros((n_steps, 4))
100 true_states[0] = [0, 1, 0, 0.5] # Start at origin, moving diagonally
101
102 for k in range(1, n_steps):
103 # Propagate with process noise
104 process_noise_sample = rng.multivariate_normal(np.zeros(4), Q)
105 true_states[k] = F @ true_states[k - 1] + process_noise_sample
106
107 # Generate measurements
108 measurements = []
109 for k in range(n_steps):
110 meas_noise = rng.multivariate_normal(np.zeros(2), R)
111 z = H @ true_states[k] + meas_noise
112 measurements.append(z)
113
114 return true_states, measurements, F, Q, H, R
115
116
117def demo_rts_smoother():
118 """Demonstrate RTS fixed-interval smoother."""
119 print("=" * 70)
120 print("RTS (Rauch-Tung-Striebel) Smoother Demo")
121 print("=" * 70)
122
123 # Generate trajectory
124 n_steps = 50
125 true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
126
127 # Initial state estimate (uncertain)
128 x0 = np.array([0, 0, 0, 0]) # Start at origin with zero velocity
129 P0 = np.diag([10, 5, 10, 5]) # High initial uncertainty
130
131 # Run RTS smoother
132 result = rts_smoother(x0, P0, measurements, F, Q, H, R)
133
134 assert isinstance(result, RTSResult)
135 print(f"\nRTS smoother completed: {len(result.x_smooth)} time steps")
136
137 # Compare filter vs smoother performance
138 filter_rmse_pos = []
139 smooth_rmse_pos = []
140 filter_rmse_vel = []
141 smooth_rmse_vel = []
142
143 for k in range(n_steps):
144 true = true_states[k]
145
146 # Position errors
147 filt_pos_err = np.linalg.norm(result.x_filt[k][[0, 2]] - true[[0, 2]])
148 smooth_pos_err = np.linalg.norm(result.x_smooth[k][[0, 2]] - true[[0, 2]])
149 filter_rmse_pos.append(filt_pos_err)
150 smooth_rmse_pos.append(smooth_pos_err)
151
152 # Velocity errors
153 filt_vel_err = np.linalg.norm(result.x_filt[k][[1, 3]] - true[[1, 3]])
154 smooth_vel_err = np.linalg.norm(result.x_smooth[k][[1, 3]] - true[[1, 3]])
155 filter_rmse_vel.append(filt_vel_err)
156 smooth_rmse_vel.append(smooth_vel_err)
157
158 print("\nPosition RMSE comparison:")
159 print(f" Filter: {np.mean(filter_rmse_pos):.3f}")
160 print(f" Smoother: {np.mean(smooth_rmse_pos):.3f}")
161 print(
162 f" Improvement: {(1 - np.mean(smooth_rmse_pos)/np.mean(filter_rmse_pos))*100:.1f}%"
163 )
164
165 print("\nVelocity RMSE comparison:")
166 print(f" Filter: {np.mean(filter_rmse_vel):.3f}")
167 print(f" Smoother: {np.mean(smooth_rmse_vel):.3f}")
168 print(
169 f" Improvement: {(1 - np.mean(smooth_rmse_vel)/np.mean(filter_rmse_vel))*100:.1f}%"
170 )
171
172 # Covariance comparison (trace as measure of uncertainty)
173 filter_trace = [np.trace(result.P_filt[k]) for k in range(n_steps)]
174 smooth_trace = [np.trace(result.P_smooth[k]) for k in range(n_steps)]
175
176 print("\nUncertainty (avg covariance trace):")
177 print(f" Filter: {np.mean(filter_trace):.3f}")
178 print(f" Smoother: {np.mean(smooth_trace):.3f}")
179 print(f" Reduction: {(1 - np.mean(smooth_trace)/np.mean(filter_trace))*100:.1f}%")
180
181
182def demo_fixed_lag_smoother():
183 """Demonstrate fixed-lag smoother for real-time applications."""
184 print("\n" + "=" * 70)
185 print("Fixed-Lag Smoother Demo")
186 print("=" * 70)
187
188 # Generate trajectory
189 n_steps = 50
190 true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
191
192 # Initial state
193 x0 = np.array([0, 0, 0, 0])
194 P0 = np.diag([10, 5, 10, 5])
195
196 # Compare different lag values
197 lags = [3, 5, 10]
198 print("\nComparing different lag values:")
199
200 for lag in lags:
201 result = fixed_lag_smoother(x0, P0, measurements, F, Q, H, R, lag=lag)
202
203 assert isinstance(result, FixedLagResult)
204
205 # Compute RMSE
206 rmse = []
207 for k in range(n_steps):
208 err = np.linalg.norm(result.x_smooth[k][[0, 2]] - true_states[k][[0, 2]])
209 rmse.append(err)
210
211 print(f" Lag={lag:2d}: RMSE={np.mean(rmse):.3f}, Effective lag={result.lag}")
212
213 print("\nNote: Fixed-lag smoother provides a trade-off between")
214 print("accuracy (larger lag = more future information) and")
215 print("latency (smaller lag = faster output).")
216
217
218def demo_two_filter_smoother():
219 """Demonstrate two-filter (Fraser-Potter) smoother."""
220 print("\n" + "=" * 70)
221 print("Two-Filter (Fraser-Potter) Smoother Demo")
222 print("=" * 70)
223
224 # Generate trajectory
225 n_steps = 30
226 true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
227
228 # Forward filter prior
229 x0_fwd = np.array([0, 0, 0, 0])
230 P0_fwd = np.diag([10, 5, 10, 5])
231
232 # Backward filter prior (diffuse - we know nothing about the final state)
233 x0_bwd = np.array([0, 0, 0, 0])
234 P0_bwd = np.diag([1000, 100, 1000, 100]) # Very large uncertainty
235
236 # Run two-filter smoother
237 result = two_filter_smoother(
238 x0_fwd, P0_fwd, x0_bwd, P0_bwd, measurements, F, Q, H, R
239 )
240
241 print(f"\nTwo-filter smoother completed: {len(result.x_smooth)} time steps")
242
243 # Compare with RTS smoother
244 rts_result = rts_smoother(x0_fwd, P0_fwd, measurements, F, Q, H, R)
245
246 two_filter_rmse = []
247 rts_rmse = []
248 for k in range(n_steps):
249 two_filter_rmse.append(
250 np.linalg.norm(result.x_smooth[k][[0, 2]] - true_states[k][[0, 2]])
251 )
252 rts_rmse.append(
253 np.linalg.norm(rts_result.x_smooth[k][[0, 2]] - true_states[k][[0, 2]])
254 )
255
256 print("\nComparison with RTS smoother:")
257 print(f" Two-filter RMSE: {np.mean(two_filter_rmse):.3f}")
258 print(f" RTS RMSE: {np.mean(rts_rmse):.3f}")
259 print("\nNote: Two-filter smoother can be parallelized (forward/backward)")
260 print("and handles diffuse initial conditions well.")
261
262
263def demo_information_filter():
264 """Demonstrate information filter."""
265 print("\n" + "=" * 70)
266 print("Information Filter Demo")
267 print("=" * 70)
268
269 # Generate trajectory
270 n_steps = 30
271 true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
272
273 # Initial state in state-space form
274 x0 = np.array([0, 0, 0, 0])
275 P0 = np.diag([10, 5, 10, 5])
276
277 # Convert to information form
278 y0, Y0 = state_to_information(x0, P0)
279
280 print("\nInitial state conversion:")
281 print(f" State x0: {x0}")
282 print(f" Covariance P0 diagonal: {np.diag(P0)}")
283 print(f" Information vector y0: {y0}")
284 print(f" Information matrix Y0 diagonal: {np.diag(Y0)}")
285
286 # Run information filter
287 result = information_filter(y0, Y0, measurements, F, Q, H, R)
288
289 assert isinstance(result, InformationFilterResult)
290 print(f"\nInformation filter completed: {len(result.x_filt)} time steps")
291
292 # Compute RMSE
293 rmse = []
294 for k in range(n_steps):
295 err = np.linalg.norm(result.x_filt[k][[0, 2]] - true_states[k][[0, 2]])
296 rmse.append(err)
297
298 print(f"Position RMSE: {np.mean(rmse):.3f}")
299
300 # Demonstrate conversion back to state form
301 final_y = result.y_filt[-1]
302 final_Y = result.Y_filt[-1]
303 x_final, P_final = information_to_state(final_y, final_Y)
304
305 print("\nFinal state (from information form):")
306 print(f" Position: ({x_final[0]:.2f}, {x_final[2]:.2f})")
307 print(f" Velocity: ({x_final[1]:.2f}, {x_final[3]:.2f})")
308
309
310def demo_srif():
311 """Demonstrate Square-Root Information Filter."""
312 print("\n" + "=" * 70)
313 print("Square-Root Information Filter (SRIF) Demo")
314 print("=" * 70)
315
316 # Generate trajectory
317 n_steps = 30
318 true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
319
320 # Initial state
321 x0 = np.array([0, 0, 0, 0])
322 P0 = np.diag([10, 5, 10, 5])
323
324 # Convert to SRIF form: R0 = inv(chol(P0)).T, r0 = R0 @ x0
325 R0 = np.linalg.inv(np.linalg.cholesky(P0)).T
326 r0 = R0 @ x0
327
328 print("Initial SRIF state:")
329 print(f" Information vector r0: {r0}")
330 print(f" Info square-root R0 diagonal: {np.diag(R0)}")
331
332 # Run SRIF
333 result = srif_filter(r0, R0, measurements, F, Q, H, R)
334
335 assert isinstance(result, SRIFResult)
336 print(f"\nSRIF completed: {len(result.x_filt)} time steps")
337
338 # Compute RMSE
339 rmse = []
340 for k in range(n_steps):
341 err = np.linalg.norm(result.x_filt[k][[0, 2]] - true_states[k][[0, 2]])
342 rmse.append(err)
343
344 print(f"Position RMSE: {np.mean(rmse):.3f}")
345 print("\nNote: SRIF maintains numerical stability by working with")
346 print("square-root of the information matrix (via QR decomposition).")
347
348
349def demo_multi_sensor_fusion():
350 """Demonstrate multi-sensor fusion using information form."""
351 print("\n" + "=" * 70)
352 print("Multi-Sensor Fusion Demo")
353 print("=" * 70)
354
355 # Scenario: 3 sensors observing a target
356 # Each sensor has different noise characteristics
357 rng = np.random.default_rng(42)
358
359 # True target state: [x, y]
360 true_state = np.array([10.0, 20.0])
361 print(f"\nTrue target position: ({true_state[0]}, {true_state[1]})")
362
363 # Sensor measurements with different noise levels
364 sensors = [
365 {"name": "Radar", "noise_std": 2.0},
366 {"name": "EO/IR", "noise_std": 0.5},
367 {"name": "Passive RF", "noise_std": 5.0},
368 ]
369
370 # Generate measurements and create information states
371 info_states = []
372 print("\nSensor measurements:")
373 for sensor in sensors:
374 noise = rng.normal(0, sensor["noise_std"], size=2)
375 measurement = true_state + noise
376
377 # Measurement covariance
378 R = np.eye(2) * sensor["noise_std"] ** 2
379
380 # Convert to information form
381 # For a measurement, Y = H.T @ inv(R) @ H and y = H.T @ inv(R) @ z
382 # With H = I (direct position measurement):
383 Y = np.linalg.inv(R)
384 y = Y @ measurement
385
386 info_state = InformationState(y=y, Y=Y)
387 info_states.append(info_state)
388
389 print(
390 f" {sensor['name']:12s}: ({measurement[0]:6.2f}, {measurement[1]:6.2f}) "
391 f"[noise std={sensor['noise_std']}]"
392 )
393
394 # Fuse all sensor information
395 fused = fuse_information(info_states)
396
397 # Convert back to state form
398 x_fused, P_fused = information_to_state(fused.y, fused.Y)
399
400 print("\nFused estimate:")
401 print(f" Position: ({x_fused[0]:.2f}, {x_fused[1]:.2f})")
402 print(
403 f" Uncertainty (std): ({np.sqrt(P_fused[0, 0]):.3f}, "
404 f"{np.sqrt(P_fused[1, 1]):.3f})"
405 )
406
407 error = np.linalg.norm(x_fused - true_state)
408 print(f" Error: {error:.3f}")
409
410 # Compare with best single sensor (EO/IR)
411 eo_ir_state = info_states[1]
412 x_eo, P_eo = information_to_state(eo_ir_state.y, eo_ir_state.Y)
413 eo_error = np.linalg.norm(x_eo - true_state)
414
415 print("\nComparison:")
416 print(f" Fused error: {error:.3f}")
417 print(f" Best single sensor (EO/IR) error: {eo_error:.3f}")
418 print(f" Fused uncertainty: {np.sqrt(P_fused[0, 0]):.3f}")
419 print(f" EO/IR uncertainty: {np.sqrt(P_eo[0, 0]):.3f}")
420
421 print("\nNote: Information fusion is additive - just sum y and Y!")
422 print("This makes distributed sensor fusion very efficient.")
423
424
425def demo_missing_measurements():
426 """Demonstrate handling missing measurements."""
427 print("\n" + "=" * 70)
428 print("Missing Measurements Demo")
429 print("=" * 70)
430
431 # Generate trajectory
432 n_steps = 30
433 true_states, measurements, F, Q, H, R = generate_cv_trajectory(n_steps=n_steps)
434
435 # Create gaps in measurements (simulate sensor dropouts)
436 measurements_with_gaps = measurements.copy()
437 gap_indices = [5, 6, 7, 15, 16, 25] # Missing measurements
438 for i in gap_indices:
439 measurements_with_gaps[i] = None
440
441 print(f"\nMissing measurements at indices: {gap_indices}")
442
443 # Initial state
444 x0 = np.array([0, 0, 0, 0])
445 P0 = np.diag([10, 5, 10, 5])
446
447 # Run smoother with gaps
448 result = rts_smoother(x0, P0, measurements_with_gaps, F, Q, H, R)
449
450 print(f"Smoother handled {len(gap_indices)} missing measurements")
451
452 # Compare RMSE during gaps vs normal
453 gap_rmse = []
454 normal_rmse = []
455 for k in range(n_steps):
456 err = np.linalg.norm(result.x_smooth[k][[0, 2]] - true_states[k][[0, 2]])
457 if k in gap_indices:
458 gap_rmse.append(err)
459 else:
460 normal_rmse.append(err)
461
462 print(f"\nRMSE during gaps: {np.mean(gap_rmse):.3f}")
463 print(f"RMSE with measurements: {np.mean(normal_rmse):.3f}")
464 print("\nNote: Smoother interpolates through gaps using the")
465 print("dynamic model and surrounding measurements.")
466
467
468def main():
469 """Run all demonstrations."""
470 print("\n" + "#" * 70)
471 print("# PyTCL Smoothers and Information Filters Example")
472 print("#" * 70)
473
474 # Smoother demonstrations
475 demo_rts_smoother()
476 demo_fixed_lag_smoother()
477 demo_two_filter_smoother()
478
479 # Information filter demonstrations
480 demo_information_filter()
481 demo_srif()
482 demo_multi_sensor_fusion()
483
484 # Edge cases
485 demo_missing_measurements()
486
487 # Visualization
488 visualize_smoother_comparison()
489
490 print("\n" + "=" * 70)
491 print("Example complete!")
492 print("=" * 70)
493
494
495def visualize_smoother_comparison():
496 """Visualize smoother performance comparison."""
497 print("\nGenerating smoother comparison visualization...")
498
499 # Generate synthetic trajectory
500 np.random.seed(42)
501 n_steps = 50
502 dt = 1.0
503
504 # True trajectory
505 t = np.arange(n_steps) * dt
506 x_true = 10 * np.sin(0.1 * t) + 0.1 * t
507
508 # Noisy measurements
509 z = x_true + 2.0 * np.random.randn(n_steps)
510
511 # Simple KF estimates (smoothing would require full implementation)
512 x_kf = np.zeros(n_steps)
513 x_kf[0] = z[0]
514 for k in range(1, n_steps):
515 x_kf[k] = 0.9 * x_kf[k - 1] + 0.1 * z[k]
516
517 # Simulate smoother as bidirectional pass
518 x_smooth = np.copy(x_kf)
519 for k in range(n_steps - 2, 0, -1):
520 x_smooth[k] = 0.5 * x_smooth[k] + 0.5 * x_smooth[k + 1]
521
522 fig = go.Figure()
523
524 fig.add_trace(
525 go.Scatter(
526 x=t,
527 y=x_true,
528 mode="lines",
529 name="True State",
530 line=dict(color="black", width=2),
531 )
532 )
533
534 fig.add_trace(
535 go.Scatter(
536 x=t,
537 y=z,
538 mode="markers",
539 name="Measurements",
540 marker=dict(color="red", size=5, opacity=0.6),
541 )
542 )
543
544 fig.add_trace(
545 go.Scatter(
546 x=t,
547 y=x_kf,
548 mode="lines",
549 name="Kalman Filter",
550 line=dict(color="blue", width=2, dash="dash"),
551 )
552 )
553
554 fig.add_trace(
555 go.Scatter(
556 x=t,
557 y=x_smooth,
558 mode="lines",
559 name="RTS Smoother",
560 line=dict(color="green", width=2),
561 )
562 )
563
564 fig.update_layout(
565 title="Smoother vs Filter: 1D Tracking Example",
566 xaxis_title="Time (s)",
567 yaxis_title="State Value",
568 height=500,
569 width=900,
570 hovermode="x unified",
571 )
572
573 fig.show()
574
575
576if __name__ == "__main__":
577 main()
Running the Example
python examples/smoothers_information_filters.py
See Also
Kalman Filter Comparison - Standard Kalman filters
multi_target_tracking - Multi-target tracking with smoothing