Source code for pytcl.dynamic_estimation.rbpf

"""Rao-Blackwellized Particle Filter (RBPF).

The RBPF partitions the state into a nonlinear part (handled by particles) and
a linear part (handled by Kalman filters for each particle). This provides
better estimation quality than plain particle filters for systems with both
nonlinear and linear dynamics.

The algorithm:
1. Maintain N particles, each with:
   - Position in nonlinear state space (y)
   - Kalman filter state (x, P) for linear subspace
   - Weight w based on measurement likelihood
2. For each time step:
   - Predict: Propagate nonlinear particles, update KF for each
   - Update: Compute measurement likelihood, adapt weights
   - Resample: When effective sample size is low, draw new particles
   - Merge: Combine nearby particles to reduce variance

References:
- Doucet et al., "On Sequential Monte Carlo Sampling with Adaptive Weights"
  (Doucet & Tadic, 2003)
- Andrieu et al., "Particle Methods for Change Detection, System Identification"
  (IEEE SPM, 2004)
"""

from typing import Any, Callable, NamedTuple

import numpy as np
from numpy.typing import NDArray

from pytcl.dynamic_estimation.kalman.extended import ekf_predict, ekf_update


[docs] class RBPFParticle(NamedTuple): """Rao-Blackwellized particle with nonlinear and linear components. Parameters ---------- y : NDArray Nonlinear state component (propagated by particle transition) x : NDArray Linear state component (estimated by Kalman filter for this particle) P : NDArray Covariance of linear state component w : float Particle weight (typically normalized to sum to 1) """ y: NDArray[Any] x: NDArray[Any] P: NDArray[Any] w: float
[docs] class RBPFFilter: """Rao-Blackwellized Particle Filter. Combines particle filtering for nonlinear states with Kalman filtering for conditionally-linear states. For a system partitioned as: - y: nonlinear state (particles) - x: linear state given y (Kalman filter) Attributes ---------- particles : list[RBPFParticle] Current particles with nonlinear/linear states and weights max_particles : int Maximum number of particles (default 100) resample_threshold : float Resample when N_eff < resample_threshold * N (default 0.5) merge_threshold : float Merge nearby particles when KL divergence < threshold (default 0.5) """
[docs] def __init__( self, max_particles: int = 100, resample_threshold: float = 0.5, merge_threshold: float = 0.5, ): """Initialize RBPF. Parameters ---------- max_particles : int Maximum number of particles to maintain resample_threshold : float Resample threshold as fraction of max particles merge_threshold : float KL divergence threshold for merging particles """ self.particles: list[RBPFParticle] = [] self.max_particles = max_particles self.resample_threshold = resample_threshold self.merge_threshold = merge_threshold
[docs] def initialize( self, y0: NDArray[Any], x0: NDArray[Any], P0: NDArray[Any], num_particles: int = 100, ) -> None: """Initialize particles. Parameters ---------- y0 : NDArray Initial nonlinear state (broadcasted to all particles) x0 : NDArray Initial linear state (broadcasted to all particles) P0 : NDArray Initial linear state covariance (same for all particles) num_particles : int Number of particles to initialize """ self.particles = [] weight = 1.0 / num_particles # Add small noise to particle y values to break ties ny = y0.shape[0] for i in range(num_particles): # Nonlinear component: small perturbation around y0 y = y0 + np.random.randn(ny) * 1e-6 # Linear component: same for all particles (improved by update) x = x0.copy() P = P0.copy() particle = RBPFParticle(y=y, x=x, P=P, w=weight) self.particles.append(particle)
[docs] def predict( self, g: Callable[[NDArray[Any]], NDArray[Any]], G: NDArray[Any], Qy: NDArray[Any], f: Callable[[NDArray[Any], NDArray[Any]], NDArray[Any]], F: NDArray[Any], Qx: NDArray[Any], ) -> None: """Predict step: propagate particles and linear states. Parameters ---------- g : callable Nonlinear state transition: y[k+1] = g(y[k]) G : NDArray Jacobian of g with respect to y (for covariance propagation) Qy : NDArray Process noise covariance for nonlinear state f : callable Linear transition: x[k+1] = f(x[k], y[k]) F : NDArray Jacobian matrix dF/dx (linearized around y) Qx : NDArray Process noise covariance for linear state """ new_particles = [] for particle in self.particles: # Predict nonlinear component y_pred = g(particle.y) # Add process noise y_pred = y_pred + np.random.multivariate_normal( np.zeros(y_pred.shape[0]), Qy ) # Create wrapper for linear dynamics with current y_pred def f_wrapper(x: NDArray[Any]) -> NDArray[Any]: return f(x, y_pred) # Predict linear component using EKF pred = ekf_predict(particle.x, particle.P, f_wrapper, F, Qx) new_particle = RBPFParticle( y=y_pred, x=pred.x, P=pred.P, w=particle.w, ) new_particles.append(new_particle) self.particles = new_particles
[docs] def update( self, z: NDArray[Any], h: Callable[[NDArray[Any], NDArray[Any]], NDArray[Any]], H: NDArray[Any], R: NDArray[Any], ) -> None: """Update step: adapt particle weights based on measurement. Parameters ---------- z : NDArray Measurement vector h : callable Measurement function: z = h(x, y) H : NDArray Jacobian matrix dH/dx (measurement sensitivity) R : NDArray Measurement noise covariance """ weights = np.zeros(len(self.particles)) new_particles = [] for i, particle in enumerate(self.particles): # Create wrapper for measurement function with current y def h_wrapper(x: NDArray[Any]) -> NDArray[Any]: return h(x, particle.y) # Update linear component (Kalman update) upd = ekf_update(particle.x, particle.P, z, h_wrapper, H, R) # Weight: measurement likelihood from Kalman update likelihood = upd.likelihood # Unnormalized weight weights[i] = particle.w * likelihood new_particle = RBPFParticle( y=particle.y, x=upd.x, P=upd.P, w=particle.w, # Will renormalize below ) new_particles.append(new_particle) # Normalize weights w_sum = np.sum(weights) if w_sum > 0: weights = weights / w_sum else: # Uniform weights if all likelihoods are zero weights = np.ones(len(self.particles)) / len(self.particles) # Update particles with new weights self.particles = [ RBPFParticle(y=p.y, x=p.x, P=p.P, w=w) for p, w in zip(new_particles, weights) ] # Resample if needed self._resample_if_needed() # Merge if too many particles self._merge_particles()
[docs] def estimate(self) -> tuple[NDArray[Any], NDArray[Any], NDArray[Any]]: """Estimate state as weighted mean and covariance. Returns ------- y_est : NDArray Weighted mean of nonlinear components x_est : NDArray Weighted mean of linear components P_est : NDArray Weighted covariance (includes mixture and linear uncertainties) """ if not self.particles: raise ValueError("No particles to estimate") weights = np.array([p.w for p in self.particles]) # Nonlinear state: weighted mean y_particles = np.array([p.y for p in self.particles]) y_est = np.average(y_particles, axis=0, weights=weights) # Linear state: weighted mean and covariance x_particles = np.array([p.x for p in self.particles]) x_est = np.average(x_particles, axis=0, weights=weights) # Covariance: E[(x - x_est)(x - x_est)^T] = E[Cov[x|y]] + Cov[E[x|y]] # = weighted_mean(P) + weighted_cov(x) # Weighted mean of covariances P_mean = np.zeros((self.particles[0].P.shape[0], self.particles[0].P.shape[1])) for p in self.particles: P_mean += p.w * p.P # Weighted covariance of means P_cov = np.zeros((self.particles[0].P.shape[0], self.particles[0].P.shape[1])) for p in self.particles: dx = p.x - x_est P_cov += p.w * np.outer(dx, dx) P_est = P_mean + P_cov return y_est, x_est, P_est
[docs] def get_particles(self) -> list[RBPFParticle]: """Get current particles. Returns ------- list[RBPFParticle] Current particle list """ return self.particles.copy()
def _resample_if_needed(self) -> None: """Resample particles if effective sample size is too low. Uses systematic resampling to reduce variance. """ weights = np.array([p.w for p in self.particles]) # Effective sample size N_eff = 1.0 / np.sum(weights**2) threshold = self.resample_threshold * len(self.particles) if N_eff < threshold: self._systematic_resample() def _systematic_resample(self) -> None: """Perform systematic resampling.""" weights = np.array([p.w for p in self.particles]) n = len(self.particles) # Cumulative sum cs = np.cumsum(weights) # Resample indices indices = [] u = np.random.uniform(0, 1.0 / n) j = 0 for i in range(n): while u > cs[j]: j += 1 indices.append(j) u += 1.0 / n # Create new particles with uniform weights new_particles = [] weight = 1.0 / n for idx in indices: p = self.particles[idx] new_particles.append( RBPFParticle(y=p.y.copy(), x=p.x.copy(), P=p.P.copy(), w=weight) ) self.particles = new_particles def _merge_particles(self) -> None: """Merge nearby particles to reduce variance.""" if len(self.particles) <= 1: return # Find closest pair by KL divergence max_iter = len(self.particles) - self.max_particles for _ in range(max_iter): if len(self.particles) <= self.max_particles: break best_div = np.inf best_i, best_j = 0, 1 # Find closest pair for i in range(len(self.particles)): for j in range(i + 1, len(self.particles)): div = self._kl_divergence( self.particles[i].P, self.particles[j].P, self.particles[i].x, self.particles[j].x, ) if div < best_div: best_div = div best_i, best_j = i, j if best_div < self.merge_threshold: # Merge particles i and j p_i = self.particles[best_i] p_j = self.particles[best_j] # Weighted merge w_total = p_i.w + p_j.w w_i = p_i.w / w_total w_j = p_j.w / w_total # Merged nonlinear state y_merged = w_i * p_i.y + w_j * p_j.y # Merged linear state and covariance x_merged = w_i * p_i.x + w_j * p_j.x # Merged covariance P_merged = ( w_i * p_i.P + w_j * p_j.P + w_i * np.outer(p_i.x - x_merged, p_i.x - x_merged) + w_j * np.outer(p_j.x - x_merged, p_j.x - x_merged) ) merged_particle = RBPFParticle( y=y_merged, x=x_merged, P=P_merged, w=w_total ) # Replace particles if best_i < best_j: self.particles[best_i] = merged_particle self.particles.pop(best_j) else: self.particles[best_j] = merged_particle self.particles.pop(best_i) else: break # Renormalize weights w_sum = sum(p.w for p in self.particles) if w_sum > 0: self.particles = [ RBPFParticle(y=p.y, x=p.x, P=p.P, w=p.w / w_sum) for p in self.particles ] @staticmethod def _kl_divergence( P1: NDArray[Any], P2: NDArray[Any], x1: NDArray[Any], x2: NDArray[Any] ) -> float: """Compute KL divergence between two Gaussians. KL(N(x1, P1) || N(x2, P2)) = 0.5 * [ trace(P2^-1 @ P1) + (x2-x1)^T @ P2^-1 @ (x2-x1) - n + ln(|P2|/|P1|) ] Parameters ---------- P1 : NDArray Covariance of first Gaussian P2 : NDArray Covariance of second Gaussian x1 : NDArray Mean of first Gaussian x2 : NDArray Mean of second Gaussian Returns ------- float KL divergence (always >= 0) """ try: P2_inv = np.linalg.inv(P2) n = P1.shape[0] # Trace term trace_term = np.trace(P2_inv @ P1) # Mean difference term dx = x2 - x1 mean_term = dx @ P2_inv @ dx # Determinant term det_term = np.linalg.slogdet(P2)[1] - np.linalg.slogdet(P1)[1] kl = 0.5 * (trace_term + mean_term - n + det_term) return float(np.maximum(kl, 0.0)) # Ensure non-negative except (np.linalg.LinAlgError, ValueError): return np.inf
# Convenience functions for functional interface
[docs] def rbpf_predict( particles: list[RBPFParticle], g: Callable[[NDArray[Any]], NDArray[Any]], G: NDArray[Any], Qy: NDArray[Any], f: Callable[[NDArray[Any], NDArray[Any]], NDArray[Any]], F: NDArray[Any], Qx: NDArray[Any], ) -> list[RBPFParticle]: """Predict step for RBPF particles. Parameters ---------- particles : list[RBPFParticle] Current particles g : callable Nonlinear state transition G : NDArray Jacobian of nonlinear transition Qy : NDArray Process noise covariance for nonlinear state f : callable Linear state transition F : NDArray Jacobian of linear transition Qx : NDArray Process noise covariance for linear state Returns ------- list[RBPFParticle] Predicted particles Examples -------- >>> import numpy as np >>> from pytcl.dynamic_estimation.rbpf import RBPFParticle >>> np.random.seed(42) >>> # 3 particles with nonlinear bearing and linear position >>> particles = [ ... RBPFParticle(y=np.array([0.1]), x=np.array([0.0, 1.0]), ... P=np.eye(2) * 0.5, w=1/3), ... RBPFParticle(y=np.array([0.0]), x=np.array([0.0, 1.0]), ... P=np.eye(2) * 0.5, w=1/3), ... RBPFParticle(y=np.array([-0.1]), x=np.array([0.0, 1.0]), ... P=np.eye(2) * 0.5, w=1/3), ... ] >>> # Nonlinear dynamics for bearing >>> g = lambda y: y # bearing stays constant >>> G = np.eye(1) >>> Qy = np.eye(1) * 0.01 >>> # Linear dynamics for position >>> f = lambda x, y: np.array([x[0] + x[1] * 0.1, x[1]]) >>> F = np.array([[1, 0.1], [0, 1]]) >>> Qx = np.eye(2) * 0.01 >>> predicted = rbpf_predict(particles, g, G, Qy, f, F, Qx) >>> len(predicted) 3 """ new_particles = [] for particle in particles: # Predict nonlinear component y_pred = g(particle.y) y_pred = y_pred + np.random.multivariate_normal(np.zeros(y_pred.shape[0]), Qy) # Create wrapper for linear dynamics with current y_pred def f_wrapper(x: NDArray[Any]) -> NDArray[Any]: return f(x, y_pred) # Predict linear component pred = ekf_predict(particle.x, particle.P, f_wrapper, F, Qx) new_particle = RBPFParticle( y=y_pred, x=pred.x, P=pred.P, w=particle.w, ) new_particles.append(new_particle) return new_particles
[docs] def rbpf_update( particles: list[RBPFParticle], z: NDArray[Any], h: Callable[[NDArray[Any], NDArray[Any]], NDArray[Any]], H: NDArray[Any], R: NDArray[Any], ) -> list[RBPFParticle]: """Update step for RBPF particles. Parameters ---------- particles : list[RBPFParticle] Predicted particles z : NDArray Measurement h : callable Measurement function H : NDArray Jacobian of measurement function R : NDArray Measurement noise covariance Returns ------- list[RBPFParticle] Updated particles with adapted weights Examples -------- >>> import numpy as np >>> from pytcl.dynamic_estimation.rbpf import RBPFParticle >>> # 3 particles at different bearings >>> particles = [ ... RBPFParticle(y=np.array([0.5]), x=np.array([1.0, 0.0]), ... P=np.eye(2) * 0.5, w=1/3), ... RBPFParticle(y=np.array([0.0]), x=np.array([1.0, 0.0]), ... P=np.eye(2) * 0.5, w=1/3), ... RBPFParticle(y=np.array([-0.5]), x=np.array([1.0, 0.0]), ... P=np.eye(2) * 0.5, w=1/3), ... ] >>> # Position measurement >>> z = np.array([1.1]) >>> h = lambda x, y: np.array([x[0]]) # measure position >>> H = np.array([[1, 0]]) >>> R = np.array([[0.1]]) >>> updated = rbpf_update(particles, z, h, H, R) >>> len(updated) 3 >>> # Weights should sum to 1 >>> abs(sum(p.w for p in updated) - 1.0) < 1e-10 True """ weights = np.zeros(len(particles)) new_particles = [] for i, particle in enumerate(particles): # Create wrapper for measurement function with current y def h_wrapper(x: NDArray[Any]) -> NDArray[Any]: return h(x, particle.y) # Update linear component upd = ekf_update(particle.x, particle.P, z, h_wrapper, H, R) # Weight by measurement likelihood weights[i] = particle.w * upd.likelihood new_particle = RBPFParticle( y=particle.y, x=upd.x, P=upd.P, w=particle.w, ) new_particles.append(new_particle) # Normalize weights w_sum = np.sum(weights) if w_sum > 0: weights = weights / w_sum else: weights = np.ones(len(particles)) / len(particles) # Update with new weights return [ RBPFParticle(y=p.y, x=p.x, P=p.P, w=w) for p, w in zip(new_particles, weights) ]