Source code for pytcl.dynamic_estimation.kalman.constrained

"""
Constrained Extended Kalman Filter (CEKF).

Extends the Extended Kalman Filter to enforce constraints on the state
estimate. Uses Lagrange multiplier method to project onto constraint manifold
while maintaining positive definite covariance.

References
----------
.. [1] Simon, D. (2006). Optimal State Estimation: Kalman, H∞, and Nonlinear
       Approaches. Wiley-Interscience.
.. [2] Simon, D. & Simon, D. L. (2010). Constrained Kalman filtering via
       density function truncation. Journal of Guidance, Control, and Dynamics.
"""

from typing import Any, Callable, Optional

import numpy as np
from numpy.typing import ArrayLike, NDArray

from pytcl.dynamic_estimation.kalman.extended import ekf_predict, ekf_update
from pytcl.dynamic_estimation.kalman.linear import KalmanPrediction, KalmanUpdate


[docs] class ConstraintFunction: """Base class for state constraints."""
[docs] def __init__( self, g: Callable[[NDArray[Any]], NDArray[Any]], G: Optional[Callable[[NDArray[Any]], NDArray[Any]]] = None, constraint_type: str = "inequality", ): """ Define a constraint: g(x) ≤ 0 (inequality) or g(x) = 0 (equality). Parameters ---------- g : callable Constraint function: g(x) -> scalar or array Inequality: g(x) ≤ 0 Equality: g(x) = 0 G : callable, optional Jacobian of g with respect to x: ∂g/∂x If None, computed numerically. constraint_type : {'inequality', 'equality'} Constraint type. """ self.g = g self.G = G self.constraint_type = constraint_type
[docs] def evaluate(self, x: NDArray[Any]) -> NDArray[Any]: """Evaluate constraint at state x.""" return np.atleast_1d(np.asarray(self.g(x), dtype=np.float64))
[docs] def jacobian(self, x: NDArray[Any]) -> NDArray[Any]: """Compute constraint Jacobian at x.""" if self.G is not None: return np.atleast_2d(np.asarray(self.G(x), dtype=np.float64)) else: # Numerical differentiation eps = 1e-6 n = len(x) g_x = self.evaluate(x) m = len(g_x) J = np.zeros((m, n)) for i in range(n): x_plus = x.copy() x_plus[i] += eps g_plus = self.evaluate(x_plus) J[:, i] = (g_plus - g_x) / eps return J
[docs] def is_satisfied(self, x: NDArray[Any], tol: float = 1e-6) -> bool: """Check if constraint is satisfied.""" g_val = self.evaluate(x) if self.constraint_type == "inequality": return np.all(g_val <= tol) else: # equality return np.allclose(g_val, 0, atol=tol)
[docs] class ConstrainedEKF: """ Extended Kalman Filter with state constraints. Enforces linear and/or nonlinear constraints on state estimate using Lagrange multiplier method with covariance projection. Attributes ---------- constraints : list of ConstraintFunction List of active constraints. """
[docs] def __init__(self) -> None: """Initialize Constrained EKF.""" self.constraints: list[ConstraintFunction] = []
[docs] def add_constraint(self, constraint: ConstraintFunction) -> None: """ Add a constraint to the filter. Parameters ---------- constraint : ConstraintFunction Constraint to enforce. """ self.constraints.append(constraint)
[docs] def predict( self, x: ArrayLike, P: ArrayLike, f: Callable[[NDArray[Any]], NDArray[Any]], F: ArrayLike, Q: ArrayLike, ) -> KalmanPrediction: """ Constrained EKF prediction step. Performs standard EKF prediction (constraints not enforced here, only checked). Constraint enforcement happens in update step. Parameters ---------- x : array_like Current state estimate, shape (n,). P : array_like Current state covariance, shape (n, n). f : callable Nonlinear state transition function. F : array_like Jacobian of f at current state. Q : array_like Process noise covariance, shape (n, n). Returns ------- result : KalmanPrediction Predicted state and covariance. """ return ekf_predict(x, P, f, F, Q)
[docs] def update( self, x: ArrayLike, P: ArrayLike, z: ArrayLike, h: Callable[[NDArray[Any]], NDArray[Any]], H: ArrayLike, R: ArrayLike, ) -> KalmanUpdate: """ Constrained EKF update step. Performs standard EKF update, then projects result onto constraint manifold. Parameters ---------- x : array_like Predicted state estimate, shape (n,). P : array_like Predicted state covariance, shape (n, n). z : array_like Measurement, shape (m,). h : callable Nonlinear measurement function. H : array_like Jacobian of h at current state. R : array_like Measurement noise covariance, shape (m, m). Returns ------- result : KalmanUpdate Updated state and covariance (constrained). """ # Standard EKF update result = ekf_update(x, P, z, h, H, R) x_upd = result.x P_upd = result.P # Apply constraint projection if self.constraints: x_upd, P_upd = self._project_onto_constraints(x_upd, P_upd) return KalmanUpdate( x=x_upd, P=P_upd, y=result.y, S=result.S, K=result.K, likelihood=result.likelihood, )
def _project_onto_constraints( self, x: NDArray[Any], P: NDArray[Any], max_iter: int = 10, tol: float = 1e-6, ) -> tuple[NDArray[Any], NDArray[Any]]: """ Project state and covariance onto constraint manifold. Uses iterative Lagrange multiplier method with covariance projection to enforce constraints while maintaining positive definiteness. Parameters ---------- x : ndarray State estimate, shape (n,). P : ndarray Covariance matrix, shape (n, n). max_iter : int Maximum iterations for constraint projection. tol : float Convergence tolerance. Returns ------- x_proj : ndarray Constrained state estimate. P_proj : ndarray Projected covariance. """ x_proj = x.copy() P_proj = P.copy() # Check which constraints are violated violated: list[ConstraintFunction] = [ c for c in self.constraints if not c.is_satisfied(x_proj) ] if not violated: return x_proj, P_proj # Iterative projection for iteration in range(max_iter): converged = True for constraint in violated: g_val = constraint.evaluate(x_proj) G = constraint.jacobian(x_proj) # Only process violated constraints if constraint.constraint_type == "inequality": mask = g_val > tol else: mask = np.abs(g_val) > tol if not np.any(mask): continue converged = False # Lagrange multiplier for this constraint # λ = -(G P Gᵀ + μ)⁻¹ G (x - x_ref) # where x_ref is desired state (we use x) GP = G @ P_proj GPGt = GP @ G.T # Add small regularization for numerical stability mu = np.eye(GPGt.shape[0]) * 1e-6 try: GPGt_inv = np.linalg.inv(GPGt + mu) lam = -GPGt_inv @ (G @ x_proj + g_val) # State correction x_corr = P_proj @ G.T @ lam x_proj = x_proj + x_corr # Covariance projection # P_proj = P - P G^T (G P G^T)^{-1} G P P_proj = P_proj - GP.T @ GPGt_inv @ GP # Ensure symmetry P_proj = (P_proj + P_proj.T) / 2 # Enforce positive definiteness eigvals, eigvecs = np.linalg.eigh(P_proj) if np.any(eigvals < 0): eigvals[eigvals < 1e-10] = 1e-10 P_proj = eigvecs @ np.diag(eigvals) @ eigvecs.T except np.linalg.LinAlgError: # If inversion fails, use pseudoinverse GPGt_pinv = np.linalg.pinv(GPGt) lam = -GPGt_pinv @ (G @ x_proj + g_val) x_proj = x_proj + P_proj @ G.T @ lam if converged: break return x_proj, P_proj
[docs] def constrained_ekf_predict( x: ArrayLike, P: ArrayLike, f: Callable[[NDArray[Any]], NDArray[Any]], F: ArrayLike, Q: ArrayLike, ) -> KalmanPrediction: """ Convenience function for constrained EKF prediction. Parameters ---------- x : array_like Current state estimate. P : array_like Current covariance. f : callable Nonlinear dynamics function. F : array_like Jacobian of f. Q : array_like Process noise covariance. Returns ------- result : KalmanPrediction Predicted state and covariance. """ cekf = ConstrainedEKF() return cekf.predict(x, P, f, F, Q)
[docs] def constrained_ekf_update( x: ArrayLike, P: ArrayLike, z: ArrayLike, h: Callable[[NDArray[Any]], NDArray[Any]], H: ArrayLike, R: ArrayLike, constraints: Optional[list[ConstraintFunction]] = None, ) -> KalmanUpdate: """ Convenience function for constrained EKF update. Parameters ---------- x : array_like Predicted state. P : array_like Predicted covariance. z : array_like Measurement. h : callable Nonlinear measurement function. H : array_like Jacobian of h. R : array_like Measurement noise covariance. constraints : list, optional List of ConstraintFunction objects. Returns ------- result : KalmanUpdate Updated state and covariance. """ cekf = ConstrainedEKF() if constraints: for c in constraints: cekf.add_constraint(c) return cekf.update(x, P, z, h, H, R)
__all__ = [ "ConstraintFunction", "ConstrainedEKF", "constrained_ekf_predict", "constrained_ekf_update", ]