Source code for pytcl.dynamic_estimation.kalman.extended

"""
Extended Kalman Filter (EKF) implementation.

The EKF handles nonlinear dynamics and/or measurements by linearizing
around the current state estimate using Jacobians.
"""

from typing import Any, Callable

import numpy as np
from numpy.typing import ArrayLike, NDArray

from pytcl.dynamic_estimation.kalman.linear import KalmanPrediction, KalmanUpdate


[docs] def ekf_predict( x: ArrayLike, P: ArrayLike, f: Callable[[NDArray[Any]], NDArray[Any]], F: ArrayLike, Q: ArrayLike, ) -> KalmanPrediction: """ Extended Kalman filter prediction step. Uses nonlinear dynamics f(x) with Jacobian F for covariance propagation. Parameters ---------- x : array_like Current state estimate, shape (n,). P : array_like Current state covariance, shape (n, n). f : callable Nonlinear state transition function f(x) -> x_next. F : array_like Jacobian of f evaluated at x, shape (n, n). Q : array_like Process noise covariance, shape (n, n). Returns ------- result : KalmanPrediction Predicted state and covariance. Examples -------- >>> import numpy as np >>> # Coordinated turn dynamics >>> def f_turn(x, omega=0.1, T=1.0): ... px, vx, py, vy = x ... return np.array([ ... px + np.sin(omega*T)/omega * vx - (1-np.cos(omega*T))/omega * vy, ... np.cos(omega*T) * vx - np.sin(omega*T) * vy, ... py + (1-np.cos(omega*T))/omega * vx + np.sin(omega*T)/omega * vy, ... np.sin(omega*T) * vx + np.cos(omega*T) * vy, ... ]) >>> x = np.array([0, 10, 0, 0]) >>> P = np.eye(4) >>> F = np.eye(4) # Jacobian (simplified) >>> Q = np.eye(4) * 0.1 >>> pred = ekf_predict(x, P, f_turn, F, Q) See Also -------- ekf_update : EKF measurement update. """ x = np.asarray(x, dtype=np.float64).flatten() P = np.asarray(P, dtype=np.float64) F = np.asarray(F, dtype=np.float64) Q = np.asarray(Q, dtype=np.float64) # Predicted state using nonlinear dynamics x_pred = np.asarray(f(x), dtype=np.float64).flatten() # Predicted covariance using linearized dynamics P_pred = F @ P @ F.T + Q # Ensure symmetry P_pred = (P_pred + P_pred.T) / 2 return KalmanPrediction(x=x_pred, P=P_pred)
[docs] def ekf_update( x: ArrayLike, P: ArrayLike, z: ArrayLike, h: Callable[[NDArray[Any]], NDArray[Any]], H: ArrayLike, R: ArrayLike, ) -> KalmanUpdate: """ Extended Kalman filter update step. Uses nonlinear measurement function h(x) with Jacobian H. Parameters ---------- x : array_like Predicted state estimate, shape (n,). P : array_like Predicted state covariance, shape (n, n). z : array_like Measurement, shape (m,). h : callable Nonlinear measurement function h(x) -> z. H : array_like Jacobian of h evaluated at x, shape (m, n). R : array_like Measurement noise covariance, shape (m, m). Returns ------- result : KalmanUpdate Updated state, covariance, and innovation statistics. Examples -------- >>> import numpy as np >>> # Range-bearing measurement >>> def h_rb(x): ... px, vx, py, vy = x ... r = np.sqrt(px**2 + py**2) ... theta = np.arctan2(py, px) ... return np.array([r, theta]) >>> x = np.array([100, 10, 50, 5]) >>> P = np.eye(4) >>> z = np.array([112, 0.46]) >>> H = np.array([[0.89, 0, 0.45, 0], [-0.004, 0, 0.008, 0]]) >>> R = np.diag([1.0, 0.01]) >>> upd = ekf_update(x, P, z, h_rb, H, R) See Also -------- ekf_predict : EKF prediction step. """ x = np.asarray(x, dtype=np.float64).flatten() P = np.asarray(P, dtype=np.float64) z = np.asarray(z, dtype=np.float64).flatten() H = np.asarray(H, dtype=np.float64) R = np.asarray(R, dtype=np.float64) # Predicted measurement using nonlinear function z_pred = np.asarray(h(x), dtype=np.float64).flatten() # Innovation y = z - z_pred # Innovation covariance S = H @ P @ H.T + R # Kalman gain K = np.linalg.solve(S.T, H @ P.T).T # Updated state x_upd = x + K @ y # Updated covariance (Joseph form) I_KH = np.eye(len(x)) - K @ H P_upd = I_KH @ P @ I_KH.T + K @ R @ K.T # Ensure symmetry P_upd = (P_upd + P_upd.T) / 2 # Likelihood m = len(z) det_S = np.linalg.det(S) if det_S > 0: mahal_sq = y @ np.linalg.solve(S, y) likelihood = np.exp(-0.5 * mahal_sq) / np.sqrt((2 * np.pi) ** m * det_S) else: likelihood = 0.0 return KalmanUpdate( x=x_upd, P=P_upd, y=y, S=S, K=K, likelihood=likelihood, )
[docs] def numerical_jacobian( f: Callable[[NDArray[Any]], NDArray[Any]], x: ArrayLike, dx: float = 1e-7, ) -> NDArray[np.floating]: """ Compute Jacobian numerically using central differences. Parameters ---------- f : callable Function f(x) -> y. x : array_like Point at which to evaluate Jacobian. dx : float, optional Step size for finite differences. Returns ------- J : ndarray Jacobian matrix of shape (m, n) where m = len(f(x)), n = len(x). Examples -------- >>> import numpy as np >>> def f(x): ... return np.array([x[0]**2, x[0]*x[1]]) >>> x = np.array([2.0, 3.0]) >>> J = numerical_jacobian(f, x) >>> J # [[2*x[0], 0], [x[1], x[0]]] = [[4, 0], [3, 2]] array([[4., 0.], [3., 2.]]) """ x = np.asarray(x, dtype=np.float64).flatten() f0 = np.asarray(f(x), dtype=np.float64).flatten() n = len(x) m = len(f0) J = np.zeros((m, n), dtype=np.float64) for i in range(n): x_plus = x.copy() x_minus = x.copy() x_plus[i] += dx x_minus[i] -= dx f_plus = np.asarray(f(x_plus), dtype=np.float64).flatten() f_minus = np.asarray(f(x_minus), dtype=np.float64).flatten() J[:, i] = (f_plus - f_minus) / (2 * dx) return J
[docs] def ekf_predict_auto( x: ArrayLike, P: ArrayLike, f: Callable[[NDArray[Any]], NDArray[Any]], Q: ArrayLike, dx: float = 1e-7, ) -> KalmanPrediction: """ EKF prediction with automatic Jacobian computation. Parameters ---------- x : array_like Current state estimate. P : array_like Current state covariance. f : callable Nonlinear state transition function. Q : array_like Process noise covariance. dx : float, optional Step size for numerical Jacobian. Returns ------- result : KalmanPrediction Predicted state and covariance. Examples -------- >>> import numpy as np >>> # Simple nonlinear dynamics >>> def f(x): ... return np.array([x[0] + x[1], 0.9 * x[1]]) >>> x = np.array([0.0, 1.0]) >>> P = np.eye(2) * 0.1 >>> Q = np.eye(2) * 0.01 >>> pred = ekf_predict_auto(x, P, f, Q) >>> pred.x array([1., 0.9]) See Also -------- ekf_predict : EKF prediction with explicit Jacobian. """ x = np.asarray(x, dtype=np.float64).flatten() F = numerical_jacobian(f, x, dx) return ekf_predict(x, P, f, F, Q)
[docs] def ekf_update_auto( x: ArrayLike, P: ArrayLike, z: ArrayLike, h: Callable[[NDArray[Any]], NDArray[Any]], R: ArrayLike, dx: float = 1e-7, ) -> KalmanUpdate: """ EKF update with automatic Jacobian computation. Parameters ---------- x : array_like Predicted state estimate. P : array_like Predicted state covariance. z : array_like Measurement. h : callable Nonlinear measurement function. R : array_like Measurement noise covariance. dx : float, optional Step size for numerical Jacobian. Returns ------- result : KalmanUpdate Updated state and covariance. Examples -------- >>> import numpy as np >>> # Range measurement h(x) = sqrt(x[0]^2 + x[1]^2) >>> def h(x): ... return np.array([np.sqrt(x[0]**2 + x[1]**2)]) >>> x = np.array([3.0, 4.0]) # predicted position >>> P = np.eye(2) * 0.5 >>> z = np.array([5.1]) # measured range >>> R = np.array([[0.1]]) >>> upd = ekf_update_auto(x, P, z, h, R) >>> np.linalg.norm(upd.x) # estimate moves toward measurement 5.0... See Also -------- ekf_update : EKF update with explicit Jacobian. """ x = np.asarray(x, dtype=np.float64).flatten() H = numerical_jacobian(h, x, dx) return ekf_update(x, P, z, h, H, R)
[docs] def iterated_ekf_update( x: ArrayLike, P: ArrayLike, z: ArrayLike, h: Callable[[NDArray[Any]], NDArray[Any]], H_func: Callable[[NDArray[Any]], NDArray[Any]], R: ArrayLike, max_iter: int = 10, tol: float = 1e-6, ) -> KalmanUpdate: """ Iterated Extended Kalman Filter (IEKF) update. The IEKF iteratively relinearizes around the updated estimate to improve accuracy for highly nonlinear measurements. Parameters ---------- x : array_like Predicted state estimate. P : array_like Predicted state covariance. z : array_like Measurement. h : callable Nonlinear measurement function h(x) -> z. H_func : callable Function returning Jacobian H(x) at given state. R : array_like Measurement noise covariance. max_iter : int, optional Maximum iterations. tol : float, optional Convergence tolerance. Returns ------- result : KalmanUpdate Updated state and covariance after convergence. Examples -------- >>> import numpy as np >>> # Bearing-only measurement (highly nonlinear) >>> def h(x): ... return np.array([np.arctan2(x[1], x[0])]) >>> def H_func(x): ... r2 = x[0]**2 + x[1]**2 ... return np.array([[-x[1]/r2, x[0]/r2]]) >>> x = np.array([10.0, 10.0]) >>> P = np.eye(2) * 5.0 >>> z = np.array([np.radians(50)]) # measured bearing >>> R = np.array([[0.01]]) >>> upd = iterated_ekf_update(x, P, z, h, H_func, R) >>> upd.x.shape (2,) See Also -------- ekf_update : Standard (non-iterated) EKF update. """ x = np.asarray(x, dtype=np.float64).flatten() P = np.asarray(P, dtype=np.float64) z = np.asarray(z, dtype=np.float64).flatten() R = np.asarray(R, dtype=np.float64) x_iter = x.copy() for _ in range(max_iter): H = np.asarray(H_func(x_iter), dtype=np.float64) z_pred = np.asarray(h(x_iter), dtype=np.float64).flatten() # Innovation at linearization point y = z - z_pred - H @ (x - x_iter) # Innovation covariance S = H @ P @ H.T + R # Kalman gain K = np.linalg.solve(S.T, H @ P.T).T # Updated state x_new = x + K @ y # Check convergence if np.linalg.norm(x_new - x_iter) < tol: x_iter = x_new break x_iter = x_new # Final update with converged linearization H = np.asarray(H_func(x_iter), dtype=np.float64) z_pred = np.asarray(h(x_iter), dtype=np.float64).flatten() y = z - z_pred S = H @ P @ H.T + R K = np.linalg.solve(S.T, H @ P.T).T I_KH = np.eye(len(x)) - K @ H P_upd = I_KH @ P @ I_KH.T + K @ R @ K.T P_upd = (P_upd + P_upd.T) / 2 m = len(z) det_S = np.linalg.det(S) if det_S > 0: mahal_sq = y @ np.linalg.solve(S, y) likelihood = np.exp(-0.5 * mahal_sq) / np.sqrt((2 * np.pi) ** m * det_S) else: likelihood = 0.0 return KalmanUpdate( x=x_iter, P=P_upd, y=y, S=S, K=K, likelihood=likelihood, )
__all__ = [ "ekf_predict", "ekf_update", "numerical_jacobian", "ekf_predict_auto", "ekf_update_auto", "iterated_ekf_update", ]