Smoothing Algorithms & Offline Estimation
Comprehensive guide to batch and fixed-lag smoothing for improved state estimation using forward and backward passes.
Smoothing differs from filtering: while filters use only past measurements, smoothers use all available measurements (past and future) for better accuracy. Ideal for offline analysis, trajectory refinement, and post-processing.
Table of Contents:
Smoothing Fundamentals
Rauch-Tsch iersky-Striebel (RTS) Smoother
Fixed-Lag Smoothing
Two-Filter Approach
Fixed-Interval Smoothing
Performance Comparison
Practical Implementation
Applications
Common Issues & Solutions
Best Practices
Smoothing Fundamentals
Filtering vs Smoothing:
Filter: $p(x_k | z_{1:k})$ - estimate at time $k$ using measurements up to $k$
Smoother: $p(x_k | z_{1:N})$ - estimate at time $k$ using all $N$ measurements
Why Smoothing?
Better accuracy: ~30-50% lower RMS error vs filtering
No real-time constraint: can use all data
Ideal for trajectory analysis and post-processing
Symmetric uncertainty (less bias than filtering)
Trade-off:
Filtering: Real-time, lower latency
Smoothing: Offline only, requires complete dataset
Mathematical Concept:
Smoothing likelihood: $$p(x_k|z_{1:N}) propto p(z_{k+1:N}|x_k) p(x_k|z_{1:k})$$
Where: - $p(x_k|z_{1:k})$: forward filter estimate - $p(z_{k+1:N}|x_k)$: backward filter estimate
Rauch-Tschiersky-Striebel (RTS) Smoother
Algorithm: Forward-Backward Smoothing
The RTS smoother combines forward (filtering) and backward (smoothing) passes.
Step 1: Forward Pass (Standard Kalman Filter)
def forward_pass(measurements, F, H, Q, R, x0, P0):
"""
Standard Kalman filter forward pass.
Args:
measurements: N × m array of measurements
F: n × n state transition matrix
H: m × n measurement matrix
Q: n × n process noise covariance
R: m × m measurement noise covariance
x0: initial state estimate
P0: initial covariance
Returns:
x_f: N × n forward estimates
P_f: N × n × n forward covariances
x_pred: N × n predicted states
P_pred: N × n × n predicted covariances
"""
N = len(measurements)
n = len(x0)
x_f = np.zeros((N, n))
P_f = np.zeros((N, n, n))
x_pred = np.zeros((N, n))
P_pred = np.zeros((N, n, n))
x = x0.copy()
P = P0.copy()
for k in range(N):
# Predict
x_pred[k] = F @ x
P_pred[k] = F @ P @ F.T + Q
# Update
y = measurements[k] - H @ x_pred[k]
S = H @ P_pred[k] @ H.T + R
K = P_pred[k] @ H.T @ np.linalg.inv(S)
x = x_pred[k] + K @ y
P = (np.eye(n) - K @ H) @ P_pred[k]
x_f[k] = x
P_f[k] = P
return x_f, P_f, x_pred, P_pred
Step 2: Backward Pass (RTS Smoother)
def rts_backward_pass(x_f, P_f, x_pred, P_pred, F):
"""
RTS backward smoothing pass.
Args:
x_f: N × n forward filter estimates
P_f: N × n × n forward covariances
x_pred: N × n predicted states
P_pred: N × n × n predicted covariances
F: state transition matrix
Returns:
x_s: N × n smoothed estimates (RTS smoother)
P_s: N × n × n smoothed covariances
"""
N = len(x_f)
n = x_f.shape[1]
x_s = np.zeros_like(x_f)
P_s = np.zeros_like(P_f)
# Initialize: last estimate is same as filtering
x_s[-1] = x_f[-1]
P_s[-1] = P_f[-1]
# Backward pass
for k in range(N - 2, -1, -1):
# Smoother gain
D = P_f[k] @ F.T @ np.linalg.inv(P_pred[k+1])
# Smoothed estimate
x_s[k] = x_f[k] + D @ (x_s[k+1] - x_pred[k+1])
# Smoothed covariance
P_s[k] = P_f[k] + D @ (P_s[k+1] - P_pred[k+1]) @ D.T
return x_s, P_s
Complete RTS Smoother:
class RTSSmoother:
"""Rauch-Tschiersky-Striebel backward smoother."""
def __init__(self, F, H, Q, R):
"""
Args:
F: state transition matrix
H: measurement matrix
Q: process noise covariance
R: measurement noise covariance
"""
self.F = F
self.H = H
self.Q = Q
self.R = R
def smooth(self, measurements, x0, P0):
"""
Perform complete RTS smoothing.
Returns:
x_smooth: smoothed trajectory
P_smooth: smoothed covariances
x_filter: for comparison (forward pass only)
P_filter: for comparison
"""
# Forward pass
x_f, P_f, x_pred, P_pred = forward_pass(
measurements, self.F, self.H, self.Q, self.R, x0, P0
)
# Backward pass
x_s, P_s = rts_backward_pass(x_f, P_f, x_pred, P_pred, self.F)
return x_s, P_s, x_f, P_f
def improvement(self, P_filter, P_smooth):
"""Quantify smoothing improvement."""
# Ratio of filter to smoother uncertainty
ratio = np.trace(P_filter) / np.trace(P_smooth)
percent_improvement = (1 - 1/ratio) * 100
return percent_improvement
Practical Example:
def example_rts_smoothing():
"""Simple 1D constant velocity model."""
# System
dt = 0.1
F = np.array([[1, dt], [0, 1]]) # state transition
H = np.array([[1, 0]]) # measure position only
Q = np.eye(2) * 0.01 # process noise
R = np.array([[1.0]]) # measurement noise
# Measurement data (noisy observations)
true_pos = np.linspace(0, 10, 100)
measurements = true_pos + np.random.normal(0, 1, 100)
# Initial estimates
x0 = np.array([measurements[0], 0])
P0 = np.eye(2)
# Smooth
smoother = RTSSmoother(F, H, Q, R)
x_smooth, P_smooth, x_filter, P_filter = smoother.smooth(
measurements[:, np.newaxis], x0, P0
)
# Compare
improvement = smoother.improvement(P_filter, P_smooth)
print(f"Smoothing improvement: {improvement:.1f}%")
return x_smooth, P_smooth, x_filter, P_filter
Fixed-Lag Smoothing
Real-Time Smoothing with Bounded Delay
Fixed-lag smoother maintains a sliding window of past estimates, providing lower latency than batch smoothing.
class FixedLagSmoother:
"""
Fixed-lag smoother: combines current filter update with lag-step
past estimates for near-real-time smoothing.
"""
def __init__(self, F, H, Q, R, lag=5):
"""
Args:
lag: number of timesteps to remember (fixed lag)
"""
self.F = F
self.H = H
self.Q = Q
self.R = R
self.lag = lag
# Buffers
self.history_x = [] # state estimates
self.history_P = [] # covariances
self.history_x_pred = []
self.history_P_pred = []
def predict_update(self, measurement):
"""One step: predict, update current, smooth past."""
n = self.F.shape[0]
if len(self.history_x) == 0:
# Initialize
x = np.zeros(n)
P = np.eye(n)
else:
# Predict from last
x = self.F @ self.history_x[-1]
P = self.F @ self.history_P[-1] @ self.F.T + self.Q
# Store predicted
self.history_x_pred.append(x.copy())
self.history_P_pred.append(P.copy())
# Update with measurement
y = measurement - self.H @ x
S = self.H @ P @ self.H.T + self.R
K = P @ self.H.T @ np.linalg.inv(S)
x = x + K @ y
P = (np.eye(n) - K @ self.H) @ P
# Store updated
self.history_x.append(x)
self.history_P.append(P)
# Smooth lag steps back
if len(self.history_x) > self.lag:
self._smooth_backward(lag_steps=self.lag)
# Return current estimate (already smoothed if possible)
return x, P
def _smooth_backward(self, lag_steps):
"""Smooth past estimates using RTS-like update."""
# Smooth back lag_steps
idx = len(self.history_x) - lag_steps - 1
if idx < 0:
return
# Multiple RTS iterations for the lag
for _ in range(lag_steps):
if idx < 0:
break
# RTS smoother gain
D = (self.history_P[idx] @ self.F.T @
np.linalg.inv(self.history_P_pred[idx+1]))
# Update laggy estimate
self.history_x[idx] = (self.history_x[idx] +
D @ (self.history_x[idx+1] - self.history_x_pred[idx+1]))
self.history_P[idx] = (self.history_P[idx] +
D @ (self.history_P[idx+1] - self.history_P_pred[idx+1]) @ D.T)
idx -= 1
def get_delayed_estimate(self, steps_back):
"""Get smoothed estimate from steps_back ago."""
if steps_back < len(self.history_x):
idx = len(self.history_x) - steps_back - 1
return self.history_x[idx], self.history_P[idx]
return None, None
Two-Filter Approach
Forward and Backward Filtering
Alternative to RTS: run forward and backward filters independently, then combine.
class TwoFilterSmoother:
"""Combine forward and backward filters for smoothing."""
def __init__(self, F, H, Q, R):
self.F = F
self.H = H
self.Q = Q
self.R = R
def backward_filter(self, measurements, x0_back, P0_back):
"""
Run Kalman filter backward (in reverse time).
Reverse dynamics: x_{k-1} = F^{-T} x_k
"""
N = len(measurements)
n = len(x0_back)
x_b = np.zeros((N, n))
P_b = np.zeros((N, n, n))
F_inv = np.linalg.inv(self.F)
x = x0_back.copy()
P = P0_back.copy()
# Process in reverse
for k in range(N - 1, -1, -1):
# Update at current time
y = measurements[k] - self.H @ x
S = self.H @ P @ self.H.T + self.R
K = P @ self.H.T @ np.linalg.inv(S)
x = x + K @ y
P = (np.eye(n) - K @ self.H) @ P
x_b[k] = x
P_b[k] = P
# Predict backward
if k > 0:
x = F_inv.T @ x
P = F_inv.T @ P @ F_inv + self.Q
return x_b, P_b
def combine_estimates(self, x_f, P_f, x_b, P_b):
"""
Combine forward and backward filter estimates.
Information filter form: combine inverses of covariances.
"""
N = len(x_f)
x_combined = np.zeros_like(x_f)
P_combined = np.zeros_like(P_f)
for k in range(N):
# Information form combination
P_f_inv = np.linalg.inv(P_f[k])
P_b_inv = np.linalg.inv(P_b[k])
# Combined information and state
P_combined[k] = np.linalg.inv(P_f_inv + P_b_inv)
x_combined[k] = P_combined[k] @ (
P_f_inv @ x_f[k] + P_b_inv @ x_b[k]
)
return x_combined, P_combined
def smooth(self, measurements, x0_f, P0_f, x0_b, P0_b):
"""Complete two-filter smoothing."""
# Forward filter
x_f, P_f, _, _ = forward_pass(
measurements, self.F, self.H, self.Q, self.R, x0_f, P0_f
)
# Backward filter
x_b, P_b = self.backward_filter(measurements, x0_b, P0_b)
# Combine
x_smooth, P_smooth = self.combine_estimates(x_f, P_f, x_b, P_b)
return x_smooth, P_smooth
Fixed-Interval Smoothing
Batch Smoothing with Fixed Time Window
Process complete measurement epochs, then refine.
def fixed_interval_smooth(measurements, batch_size, overlap=0):
"""
Smooth in batches with optional overlap.
Args:
measurements: all measurements
batch_size: number of measurements per batch
overlap: number of overlapping measurements between batches
"""
N = len(measurements)
smoothed_trajectory = np.zeros_like(measurements)
stride = batch_size - overlap
for start_idx in range(0, N, stride):
end_idx = min(start_idx + batch_size, N)
batch = measurements[start_idx:end_idx]
if len(batch) < 2:
continue
# Smooth this batch
smoother = RTSSmoother(F, H, Q, R)
x_smooth, _ = smoother.forward_backward(batch)
# Store (with handling for overlap)
smoothed_trajectory[start_idx:end_idx] = x_smooth
return smoothed_trajectory
Performance Comparison
Accuracy Metrics:
def compare_filter_vs_smoother(x_true, x_filter, x_smooth, P_filter, P_smooth):
"""Quantify smoothing improvement."""
# RMS errors
error_filter = np.sqrt(np.mean((x_filter - x_true)**2, axis=0))
error_smooth = np.sqrt(np.mean((x_smooth - x_true)**2, axis=0))
# Uncertainty reduction
uncertainty_filter = np.sqrt(np.mean(np.diagonal(P_filter, axis1=1, axis2=2)[:, 0]))
uncertainty_smooth = np.sqrt(np.mean(np.diagonal(P_smooth, axis1=1, axis2=2)[:, 0]))
print("=== Filter vs Smoother Comparison ===")
print(f"Filter RMS error: {error_filter[0]:.4f}")
print(f"Smoother RMS error: {error_smooth[0]:.4f}")
print(f"Improvement: {(1 - error_smooth[0]/error_filter[0])*100:.1f}%")
print()
print(f"Filter uncertainty: {uncertainty_filter:.4f}")
print(f"Smoother uncertainty: {uncertainty_smooth:.4f}")
print(f"Reduction: {(1 - uncertainty_smooth/uncertainty_filter)*100:.1f}%")
Practical Implementation
Complete Smoothing System:
class TrajectorySmoother:
"""Full trajectory smoothing pipeline."""
def __init__(self, process_model, measurement_model, process_noise,
measurement_noise):
self.F = process_model
self.H = measurement_model
self.Q = process_noise
self.R = measurement_noise
def smooth_trajectory(self, measurements, x0, P0):
"""Smooth complete trajectory."""
smoother = RTSSmoother(self.F, self.H, self.Q, self.R)
x_smooth, P_smooth, x_filter, P_filter = smoother.smooth(
measurements, x0, P0
)
return {
'smoothed': x_smooth,
'filtered': x_filter,
'smoother_cov': P_smooth,
'filter_cov': P_filter
}
def estimate_uncertainties(self, P_smooth):
"""Extract uncertainty bounds."""
std_devs = np.sqrt(np.diagonal(P_smooth, axis1=1, axis2=2))
return std_devs
def compute_likelihoods(self, measurements, x_smooth, P_smooth):
"""Evaluate measurement likelihoods at smoothed trajectory."""
N = len(measurements)
likelihoods = np.zeros(N)
for k in range(N):
# Predicted measurement
z_pred = self.H @ x_smooth[k]
# Innovation
y = measurements[k] - z_pred
# Innovation covariance
S = self.H @ P_smooth[k] @ self.H.T + self.R
# Gaussian likelihood
likelihoods[k] = np.exp(-0.5 * y @ np.linalg.inv(S) @ y)
return likelihoods
Applications
1. Post-Mission Analysis
Analyze complete flight/mission after recording:
def analyze_mission_post_flight(telemetry_log):
"""Process complete mission using batch smoothing."""
measurements = telemetry_log['measurements']
timestamps = telemetry_log['timestamps']
smoother = RTSSmoother(F, H, Q, R)
x_smooth, P_smooth = smoother.smooth(measurements, x0, P0)
# Report
report = {
'trajectory': x_smooth,
'uncertainties': np.sqrt(np.diagonal(P_smooth, axis1=1, axis2=2)),
'timestamps': timestamps,
'filter_vs_smooth_improvement': compute_improvement(P_filter, P_smooth)
}
return report
2. Multi-Sensor Fusion with Latency
Resync delayed measurements:
def fuse_delayed_measurements(primary_measurements, delayed_measurements_with_time,
delay_samples):
"""
Insert delayed measurement into smoothed trajectory.
Smoother handles time-correlation naturally.
"""
smoother = RTSSmoother(F, H, Q, R)
x_smooth, P_smooth = smoother.smooth(primary_measurements, x0, P0)
# Align delayed measurement
insertion_idx = len(primary_measurements) - delay_samples
proposed_state = x_smooth[insertion_idx]
return proposed_state
3. GPS/INS Trajectory Refinement
Smooth GPS-corrected inertial trajectory:
def refine_inertial_trajectory(ins_trajectory, gps_fixes, gps_times):
"""
INS provides high-rate, GPS provides occasional corrections.
Smoother integrates both optimally.
"""
# Blend measurements
measurements = np.zeros_like(ins_trajectory)
for i, (time, fix) in enumerate(zip(gps_times, gps_fixes)):
idx = int(time / dt)
measurements[idx] = fix
smoother = RTSSmoother(F_ins, H_gps, Q_ins, R_gps)
x_refined, P_refined = smoother.smooth(measurements, x0, P0)
return x_refined
Common Issues & Solutions
Problem: Backward Pass Diverges
Solution: Check matrix invertibility in RTS gain:
def stable_rts_gain(P_f, P_pred, F):
"""Compute RTS gain with numerical stability."""
try:
# Standard RTS
D = P_f @ F.T @ np.linalg.inv(P_pred)
except np.linalg.LinAlgError:
# Use pseudo-inverse if singular
D = P_f @ F.T @ np.linalg.pinv(P_pred)
return D
Problem: Backward Covariance Grows
Indicates filter was too optimistic (P too small). Solutions:
Increase process noise Q
Reduce measurement noise R (if overestimated)
Use Joseph form covariance update
def joseph_covariance_update(P, K, H):
"""Joseph stabilized covariance (numerically stable)."""
n = P.shape[0]
I_KH = np.eye(n) - K @ H
return I_KH @ P @ I_KH.T + K @ R @ K.T
Problem: Smoother Solution Violates Constraints
Use constrained smoothing:
def constrained_smooth(x_filter, P_filter, constraints):
"""Project smoothed estimates into constraint set."""
# Quadratic programming to find closest feasible point
from scipy.optimize import minimize
def constraint_cost(x):
# Minimize distance from filter estimate
return np.sum((x - x_filter)**2)
# ... set up constraints, solve
Best Practices
Batch Smoothing (Offline Analysis) - Use RTS smoother for complete datasets - Provides optimal estimates given all data - ~30-50% accuracy improvement over filtering
Fixed-Lag (Near Real-Time) - When you need low latency but can wait for lag - Typical lag: 5-10 timesteps - Good balance: ~20% improvement, low delay
Monitor Consistency - Check that P_smooth < P_filter everywhere (if not, filter issue) - Verify backward pass using simulated data - Compare forward-only vs full smoother
Memory Efficient - Don’t store full history; process in batches - Use fixed-interval smoothing for long missions - Overlap batches to avoid edge artifacts
Numerical Stability - Use Joseph covariance form - Check matrix condition numbers - Use pseudo-inverse for near-singular matrices
Parameter Tuning - Validate process/measurement noise with smoothed residuals - Residuals should be white noise (check autocorrelation) - Use smoother to detect parameter mismatches
Troubleshooting
Diagnostic Checklist:
def diagnose_smoother(measurements, x_filter, x_smooth, P_filter, P_smooth):
"""Check smoother quality."""
issues = []
# Check 1: P_smooth < P_filter
ratio = np.trace(P_filter) / np.trace(P_smooth)
if ratio < 1.1:
issues.append("Minimal improvement - check data quality")
# Check 2: Backward covariance growth
if P_smooth[0, 0, 0] > P_smooth[-1, 0, 0]:
issues.append("Backward covariance grows - check Q/R tuning")
# Check 3: Smoothed state sanity
max_vel = np.max(np.abs(x_smooth[:, 1]))
if max_vel > 1000:
issues.append("Unrealistic velocities in smoother")
# Check 4: Residual whiteness
residuals = measurements - (H @ x_smooth.T).T
acf = np.correlate(residuals[:, 0], residuals[:, 0], mode='same')
if acf[len(acf)//2 + 1] > 0.3 * acf[len(acf)//2]:
issues.append("Residuals auto-correlated - check measurements")
return issues if issues else ["✓ Smoother diagnostics OK"]
See Also
Common Use Cases & Recipes - Ready-to-use smoothing implementations
Kalman Filter Tuning Guide - Parameter selection for smoothers
Troubleshooting Guide - Smoothing issues and debugging
API: tcl.smoothers module for RTS, fixed-lag implementations