Source code for pytcl.dynamic_estimation.smoothers

"""
Smoothing algorithms for state estimation.

This module provides optimal fixed-interval, fixed-lag, and fixed-point
smoothers for linear and nonlinear systems. Smoothers use both past and
future measurements to produce optimal state estimates.

The main algorithms are:
- RTS (Rauch-Tung-Striebel) smoother for linear systems
- Fixed-lag smoother for real-time applications
- Fixed-interval smoother for batch processing
- Two-filter smoother for parallel processing
"""

from typing import List, NamedTuple, Optional

import numpy as np
from numpy.typing import ArrayLike, NDArray

from pytcl.dynamic_estimation.kalman.linear import kf_predict, kf_smooth, kf_update


[docs] class SmoothedState(NamedTuple): """Smoothed state estimate. Attributes ---------- x : ndarray Smoothed state estimate. P : ndarray Smoothed state covariance. """ x: NDArray[np.floating] P: NDArray[np.floating]
[docs] class RTSResult(NamedTuple): """Result of RTS (Rauch-Tung-Striebel) smoother. Attributes ---------- x_smooth : list of ndarray Smoothed state estimates at each time step. P_smooth : list of ndarray Smoothed covariances at each time step. x_filt : list of ndarray Filtered state estimates (forward pass). P_filt : list of ndarray Filtered covariances (forward pass). """ x_smooth: List[NDArray[np.floating]] P_smooth: List[NDArray[np.floating]] x_filt: List[NDArray[np.floating]] P_filt: List[NDArray[np.floating]]
[docs] class FixedLagResult(NamedTuple): """Result of fixed-lag smoother. Attributes ---------- x_smooth : list of ndarray Smoothed state estimates with lag L. P_smooth : list of ndarray Smoothed covariances with lag L. lag : int Smoothing lag used. """ x_smooth: List[NDArray[np.floating]] P_smooth: List[NDArray[np.floating]] lag: int
[docs] def rts_smoother( x0: ArrayLike, P0: ArrayLike, measurements: List[ArrayLike], F: ArrayLike, Q: ArrayLike, H: ArrayLike, R: ArrayLike, F_list: Optional[List[ArrayLike]] = None, Q_list: Optional[List[ArrayLike]] = None, H_list: Optional[List[ArrayLike]] = None, R_list: Optional[List[ArrayLike]] = None, ) -> RTSResult: """ Rauch-Tung-Striebel (RTS) fixed-interval smoother. Runs a forward Kalman filter pass followed by a backward smoothing pass to produce optimal smoothed estimates using all measurements. Parameters ---------- x0 : array_like Initial state estimate, shape (n,). P0 : array_like Initial state covariance, shape (n, n). measurements : list of array_like List of measurements at each time step. Use None for missing. F : array_like State transition matrix, shape (n, n). Used if F_list not provided. Q : array_like Process noise covariance. Used if Q_list not provided. H : array_like Measurement matrix. Used if H_list not provided. R : array_like Measurement noise covariance. Used if R_list not provided. F_list : list of array_like, optional Time-varying state transition matrices. Q_list : list of array_like, optional Time-varying process noise covariances. H_list : list of array_like, optional Time-varying measurement matrices. R_list : list of array_like, optional Time-varying measurement noise covariances. Returns ------- result : RTSResult Named tuple containing smoothed states/covariances and filtered states/covariances. Examples -------- >>> import numpy as np >>> # Simple 1D position tracking with velocity >>> x0 = np.array([0.0, 0.0]) # [position, velocity] >>> P0 = np.eye(2) * 10.0 >>> F = np.array([[1, 1], [0, 1]]) # CV model >>> Q = np.array([[0.25, 0.5], [0.5, 1.0]]) * 0.1 >>> H = np.array([[1, 0]]) # Position measurement >>> R = np.array([[1.0]]) >>> measurements = [np.array([1.1]), np.array([2.3]), np.array([3.2])] >>> result = rts_smoother(x0, P0, measurements, F, Q, H, R) >>> len(result.x_smooth) 3 Notes ----- The RTS smoother provides the optimal linear estimate using all available data. It consists of two passes: Forward pass (Kalman filter): x_k|k, P_k|k from measurements z_1, ..., z_k Backward pass: G_k = P_k|k @ F_k' @ P_{k+1|k}^{-1} x_k|N = x_k|k + G_k @ (x_{k+1|N} - x_{k+1|k}) P_k|N = P_k|k + G_k @ (P_{k+1|N} - P_{k+1|k}) @ G_k' where N is the total number of measurements. References ---------- .. [1] Rauch, H.E., Tung, F., and Striebel, C.T., "Maximum likelihood estimates of linear dynamic systems", AIAA Journal, 1965. """ x0 = np.asarray(x0, dtype=np.float64).flatten() P0 = np.asarray(P0, dtype=np.float64) F = np.asarray(F, dtype=np.float64) Q = np.asarray(Q, dtype=np.float64) H = np.asarray(H, dtype=np.float64) R = np.asarray(R, dtype=np.float64) n_steps = len(measurements) # Storage for forward pass x_filt: List[NDArray[np.floating]] = [] P_filt: List[NDArray[np.floating]] = [] x_pred: List[NDArray[np.floating]] = [] P_pred: List[NDArray[np.floating]] = [] F_used: List[NDArray[np.floating]] = [] # Forward pass (Kalman filter) x = x0.copy() P = P0.copy() for k in range(n_steps): # Get time-varying matrices if provided F_k = np.asarray(F_list[k], dtype=np.float64) if F_list else F Q_k = np.asarray(Q_list[k], dtype=np.float64) if Q_list else Q H_k = np.asarray(H_list[k], dtype=np.float64) if H_list else H R_k = np.asarray(R_list[k], dtype=np.float64) if R_list else R # Predict pred = kf_predict(x, P, F_k, Q_k) x_pred.append(pred.x) P_pred.append(pred.P) F_used.append(F_k) # Update (if measurement available) z = measurements[k] if z is not None: z = np.asarray(z, dtype=np.float64).flatten() upd = kf_update(pred.x, pred.P, z, H_k, R_k) x = upd.x P = upd.P else: x = pred.x P = pred.P x_filt.append(x.copy()) P_filt.append(P.copy()) # Backward pass (RTS smoother) x_smooth: List[NDArray[np.floating]] = [None] * n_steps # type: ignore P_smooth: List[NDArray[np.floating]] = [None] * n_steps # type: ignore # Initialize with last filtered estimate x_smooth[n_steps - 1] = x_filt[n_steps - 1] P_smooth[n_steps - 1] = P_filt[n_steps - 1] # Backward recursion for k in range(n_steps - 2, -1, -1): x_s, P_s = kf_smooth( x_filt[k], P_filt[k], x_pred[k + 1], P_pred[k + 1], x_smooth[k + 1], P_smooth[k + 1], F_used[k], ) x_smooth[k] = x_s P_smooth[k] = P_s return RTSResult( x_smooth=x_smooth, P_smooth=P_smooth, x_filt=x_filt, P_filt=P_filt, )
[docs] def fixed_lag_smoother( x0: ArrayLike, P0: ArrayLike, measurements: List[ArrayLike], F: ArrayLike, Q: ArrayLike, H: ArrayLike, R: ArrayLike, lag: int = 5, ) -> FixedLagResult: """ Fixed-lag smoother. Produces smoothed estimates with a fixed delay of L time steps. At time k, outputs the smoothed estimate for time k-L using measurements up to time k. Parameters ---------- x0 : array_like Initial state estimate. P0 : array_like Initial state covariance. measurements : list of array_like List of measurements. F : array_like State transition matrix. Q : array_like Process noise covariance. H : array_like Measurement matrix. R : array_like Measurement noise covariance. lag : int, optional Smoothing lag in time steps (default: 5). Returns ------- result : FixedLagResult Smoothed estimates with specified lag. Examples -------- >>> import numpy as np >>> x0 = np.array([0.0, 0.0]) >>> P0 = np.eye(2) * 10.0 >>> F = np.array([[1, 1], [0, 1]]) >>> Q = np.eye(2) * 0.1 >>> H = np.array([[1, 0]]) >>> R = np.array([[1.0]]) >>> measurements = [np.array([i + 0.1*np.random.randn()]) for i in range(10)] >>> result = fixed_lag_smoother(x0, P0, measurements, F, Q, H, R, lag=3) >>> len(result.x_smooth) 10 Notes ----- The fixed-lag smoother is suitable for real-time applications where a delay of L steps is acceptable. It requires storing only the last L filter results instead of the entire sequence. At each time k: - Run forward filter to get x_{k|k}, P_{k|k} - Apply backward smoothing for L steps - Output x_{k-L|k}, P_{k-L|k} """ x0 = np.asarray(x0, dtype=np.float64).flatten() P0 = np.asarray(P0, dtype=np.float64) F = np.asarray(F, dtype=np.float64) Q = np.asarray(Q, dtype=np.float64) H = np.asarray(H, dtype=np.float64) R = np.asarray(R, dtype=np.float64) n_steps = len(measurements) lag = min(lag, n_steps) # Ring buffer for filter results (size = lag + 1) buffer_size = lag + 1 x_buffer: List[Optional[NDArray[np.floating]]] = [None] * buffer_size P_buffer: List[Optional[NDArray[np.floating]]] = [None] * buffer_size x_pred_buffer: List[Optional[NDArray[np.floating]]] = [None] * buffer_size P_pred_buffer: List[Optional[NDArray[np.floating]]] = [None] * buffer_size x_smooth: List[NDArray[np.floating]] = [] P_smooth: List[NDArray[np.floating]] = [] x = x0.copy() P = P0.copy() for k in range(n_steps): # Store index in ring buffer buf_idx = k % buffer_size # Predict pred = kf_predict(x, P, F, Q) x_pred_buffer[buf_idx] = pred.x.copy() P_pred_buffer[buf_idx] = pred.P.copy() # Update z = measurements[k] if z is not None: z = np.asarray(z, dtype=np.float64).flatten() upd = kf_update(pred.x, pred.P, z, H, R) x = upd.x P = upd.P else: x = pred.x P = pred.P x_buffer[buf_idx] = x.copy() P_buffer[buf_idx] = P.copy() # Compute smoothed estimate for time k-lag (if available) if k >= lag: # Run backward smoothing from k to k-lag x_s = x.copy() P_s = P.copy() for j in range(lag): back_k = k - j back_idx = back_k % buffer_size prev_idx = (back_k - 1) % buffer_size if x_buffer[prev_idx] is None: break x_s, P_s = kf_smooth( x_buffer[prev_idx], P_buffer[prev_idx], x_pred_buffer[back_idx], P_pred_buffer[back_idx], x_s, P_s, F, ) x_smooth.append(x_s) P_smooth.append(P_s) else: # Not enough data for full lag, use filtered estimate x_smooth.append(x.copy()) P_smooth.append(P.copy()) return FixedLagResult( x_smooth=x_smooth, P_smooth=P_smooth, lag=lag, )
[docs] def fixed_interval_smoother( x0: ArrayLike, P0: ArrayLike, measurements: List[ArrayLike], F: ArrayLike, Q: ArrayLike, H: ArrayLike, R: ArrayLike, ) -> RTSResult: """ Fixed-interval smoother (alias for RTS smoother). Produces optimal smoothed estimates over a fixed time interval using all measurements in that interval. Parameters ---------- x0 : array_like Initial state estimate. P0 : array_like Initial state covariance. measurements : list of array_like List of measurements in the interval. F : array_like State transition matrix. Q : array_like Process noise covariance. H : array_like Measurement matrix. R : array_like Measurement noise covariance. Returns ------- result : RTSResult Smoothed estimates over the interval. Examples -------- >>> import numpy as np >>> # 1D constant velocity model >>> x0 = np.array([0.0, 1.0]) # [position, velocity] >>> P0 = np.eye(2) * 5.0 >>> F = np.array([[1, 1], [0, 1]]) >>> Q = np.array([[0.25, 0.5], [0.5, 1.0]]) * 0.01 >>> H = np.array([[1, 0]]) >>> R = np.array([[0.5]]) >>> measurements = [np.array([0.9]), np.array([2.1]), np.array([3.0]), ... np.array([4.2]), np.array([4.9])] >>> result = fixed_interval_smoother(x0, P0, measurements, F, Q, H, R) >>> len(result.x_smooth) 5 >>> # Smoothed estimates have lower uncertainty >>> np.trace(result.P_smooth[2]) < np.trace(result.P_filt[2]) True See Also -------- rts_smoother : Full RTS smoother with time-varying parameters. Notes ----- This is equivalent to the RTS smoother but with a simpler interface for the common case of time-invariant system matrices. """ return rts_smoother(x0, P0, measurements, F, Q, H, R)
[docs] def two_filter_smoother( x0_fwd: ArrayLike, P0_fwd: ArrayLike, x0_bwd: ArrayLike, P0_bwd: ArrayLike, measurements: List[ArrayLike], F: ArrayLike, Q: ArrayLike, H: ArrayLike, R: ArrayLike, ) -> RTSResult: """ Two-filter smoother (Fraser-Potter form). Combines forward and backward filter passes to produce smoothed estimates. Useful for parallel implementation. Parameters ---------- x0_fwd : array_like Initial state for forward filter. P0_fwd : array_like Initial covariance for forward filter. x0_bwd : array_like Initial state for backward filter (typically diffuse). P0_bwd : array_like Initial covariance for backward filter (typically large). measurements : list of array_like List of measurements. F : array_like State transition matrix. Q : array_like Process noise covariance. H : array_like Measurement matrix. R : array_like Measurement noise covariance. Returns ------- result : RTSResult Smoothed estimates. Examples -------- >>> import numpy as np >>> # 1D position-velocity system >>> x0_fwd = np.array([0.0, 1.0]) >>> P0_fwd = np.eye(2) * 5.0 >>> # Backward filter starts with diffuse (high uncertainty) prior >>> x0_bwd = np.array([5.0, 1.0]) # approximate final state >>> P0_bwd = np.eye(2) * 100.0 # diffuse prior >>> F = np.array([[1, 1], [0, 1]]) >>> Q = np.eye(2) * 0.1 >>> H = np.array([[1, 0]]) >>> R = np.array([[1.0]]) >>> measurements = [np.array([0.8]), np.array([1.9]), np.array([3.1]), ... np.array([4.0]), np.array([5.2])] >>> result = two_filter_smoother(x0_fwd, P0_fwd, x0_bwd, P0_bwd, ... measurements, F, Q, H, R) >>> len(result.x_smooth) 5 Notes ----- The two-filter smoother runs two independent Kalman filters: - Forward filter: uses measurements z_1, ..., z_k - Backward filter: uses measurements z_N, ..., z_{k+1} The smoothed estimate combines both filters using information fusion: Y_k|N = Y_k|k^{fwd} + Y_k|k^{bwd} y_k|N = y_k|k^{fwd} + y_k|k^{bwd} This form is useful for parallel computation since both filters can run simultaneously. References ---------- .. [1] Fraser, D.C. and Potter, J.E., "The optimum linear smoother as a combination of two optimum linear filters", IEEE Trans. Automatic Control, 1969. """ x0_fwd = np.asarray(x0_fwd, dtype=np.float64).flatten() P0_fwd = np.asarray(P0_fwd, dtype=np.float64) x0_bwd = np.asarray(x0_bwd, dtype=np.float64).flatten() P0_bwd = np.asarray(P0_bwd, dtype=np.float64) F = np.asarray(F, dtype=np.float64) Q = np.asarray(Q, dtype=np.float64) H = np.asarray(H, dtype=np.float64) R = np.asarray(R, dtype=np.float64) n_steps = len(measurements) # Forward filter pass x_fwd: List[NDArray[np.floating]] = [] P_fwd: List[NDArray[np.floating]] = [] x = x0_fwd.copy() P = P0_fwd.copy() for k in range(n_steps): pred = kf_predict(x, P, F, Q) z = measurements[k] if z is not None: z = np.asarray(z, dtype=np.float64).flatten() upd = kf_update(pred.x, pred.P, z, H, R) x = upd.x P = upd.P else: x = pred.x P = pred.P x_fwd.append(x.copy()) P_fwd.append(P.copy()) # Backward filter pass (using inverse transition) x_bwd: List[NDArray[np.floating]] = [None] * n_steps # type: ignore P_bwd: List[NDArray[np.floating]] = [None] * n_steps # type: ignore x = x0_bwd.copy() P = P0_bwd.copy() # Inverse transition for backward filter try: F_inv = np.linalg.inv(F) except np.linalg.LinAlgError: # Fall back to pseudo-inverse F_inv = np.linalg.pinv(F) for k in range(n_steps - 1, -1, -1): # Backward prediction (using inverse dynamics) x_pred = F_inv @ x P_pred = F_inv @ P @ F_inv.T + F_inv @ Q @ F_inv.T # Backward update z = measurements[k] if z is not None: z = np.asarray(z, dtype=np.float64).flatten() upd = kf_update(x_pred, P_pred, z, H, R) x = upd.x P = upd.P else: x = x_pred P = P_pred x_bwd[k] = x.copy() P_bwd[k] = P.copy() # Fuse forward and backward estimates using information form x_smooth: List[NDArray[np.floating]] = [] P_smooth: List[NDArray[np.floating]] = [] for k in range(n_steps): # Convert to information form try: Y_fwd = np.linalg.inv(P_fwd[k]) y_fwd = Y_fwd @ x_fwd[k] except np.linalg.LinAlgError: Y_fwd = np.linalg.pinv(P_fwd[k]) y_fwd = Y_fwd @ x_fwd[k] try: Y_bwd = np.linalg.inv(P_bwd[k]) y_bwd = Y_bwd @ x_bwd[k] except np.linalg.LinAlgError: Y_bwd = np.linalg.pinv(P_bwd[k]) y_bwd = Y_bwd @ x_bwd[k] # Fuse Y_smooth = Y_fwd + Y_bwd y_smooth = y_fwd + y_bwd # Convert back to state form try: P_s = np.linalg.inv(Y_smooth) except np.linalg.LinAlgError: P_s = np.linalg.pinv(Y_smooth) x_s = P_s @ y_smooth # Ensure symmetry P_s = (P_s + P_s.T) / 2 x_smooth.append(x_s) P_smooth.append(P_s) return RTSResult( x_smooth=x_smooth, P_smooth=P_smooth, x_filt=x_fwd, P_filt=P_fwd, )
[docs] def rts_smoother_single_step( x_filt: ArrayLike, P_filt: ArrayLike, x_pred_next: ArrayLike, P_pred_next: ArrayLike, x_smooth_next: ArrayLike, P_smooth_next: ArrayLike, F: ArrayLike, ) -> SmoothedState: """ Single backward step of RTS smoother. This is a convenience wrapper around kf_smooth that returns a SmoothedState named tuple. Parameters ---------- x_filt : array_like Filtered state at current time. P_filt : array_like Filtered covariance at current time. x_pred_next : array_like Predicted state at next time. P_pred_next : array_like Predicted covariance at next time. x_smooth_next : array_like Smoothed state at next time. P_smooth_next : array_like Smoothed covariance at next time. F : array_like State transition matrix. Returns ------- result : SmoothedState Smoothed state and covariance at current time. Examples -------- >>> import numpy as np >>> # After running forward filter and getting smoothed estimate at k+1 >>> x_filt = np.array([1.0, 0.5]) # filtered state at k >>> P_filt = np.eye(2) * 0.5 # filtered covariance at k >>> x_pred_next = np.array([1.5, 0.5]) # predicted state at k+1 >>> P_pred_next = np.eye(2) * 0.6 # predicted covariance at k+1 >>> x_smooth_next = np.array([1.4, 0.6]) # smoothed state at k+1 >>> P_smooth_next = np.eye(2) * 0.3 # smoothed covariance at k+1 >>> F = np.array([[1, 1], [0, 1]]) >>> result = rts_smoother_single_step(x_filt, P_filt, x_pred_next, ... P_pred_next, x_smooth_next, ... P_smooth_next, F) >>> result.x.shape (2,) >>> result.P.shape (2, 2) """ x_s, P_s = kf_smooth( x_filt, P_filt, x_pred_next, P_pred_next, x_smooth_next, P_smooth_next, F ) return SmoothedState(x=x_s, P=P_s)
__all__ = [ "SmoothedState", "RTSResult", "FixedLagResult", "rts_smoother", "fixed_lag_smoother", "fixed_interval_smoother", "two_filter_smoother", "rts_smoother_single_step", ]