"""
GPU-accelerated Extended Kalman Filter using CuPy.
This module provides GPU-accelerated implementations of the Extended Kalman
Filter (EKF) for batch processing of multiple tracks with nonlinear dynamics.
The EKF handles nonlinear systems by linearizing around the current estimate:
x_k = f(x_{k-1}) + w (nonlinear dynamics)
z_k = h(x_k) + v (nonlinear measurement)
Key Features
------------
- Batch processing of multiple tracks with same or different dynamics
- Support for user-provided Jacobian functions
- Numerical Jacobian computation when analytic unavailable
- Memory-efficient operations using CuPy
Examples
--------
>>> from pytcl.gpu.ekf import batch_ekf_predict, batch_ekf_update
>>> import numpy as np
>>>
>>> # Define nonlinear dynamics (on CPU, applied per-particle)
>>> def f_dynamics(x):
... return np.array([x[0] + x[1], x[1] * 0.99])
>>>
>>> def F_jacobian(x):
... return np.array([[1, 1], [0, 0.99]])
>>>
>>> # Batch prediction
>>> x_pred, P_pred = batch_ekf_predict(x, P, f_dynamics, F_jacobian, Q)
"""
from typing import Any, Callable, NamedTuple, Optional
import numpy as np
from numpy.typing import ArrayLike, NDArray
from pytcl.core.optional_deps import import_optional, requires
from pytcl.gpu.utils import ensure_gpu_array, to_cpu
class BatchEKFPrediction(NamedTuple):
"""Result of batch EKF prediction.
Attributes
----------
x : ndarray
Predicted state estimates, shape (n_tracks, state_dim).
P : ndarray
Predicted covariances, shape (n_tracks, state_dim, state_dim).
"""
x: NDArray[np.floating]
P: NDArray[np.floating]
class BatchEKFUpdate(NamedTuple):
"""Result of batch EKF update.
Attributes
----------
x : ndarray
Updated state estimates.
P : ndarray
Updated covariances.
y : ndarray
Innovations.
S : ndarray
Innovation covariances.
K : ndarray
Kalman gains.
likelihood : ndarray
Measurement likelihoods.
"""
x: NDArray[np.floating]
P: NDArray[np.floating]
y: NDArray[np.floating]
S: NDArray[np.floating]
K: NDArray[np.floating]
likelihood: NDArray[np.floating]
def _compute_numerical_jacobian(
f: Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]],
x: NDArray[np.floating[Any]],
eps: float = 1e-7,
) -> NDArray[np.floating[Any]]:
"""
Compute numerical Jacobian using central differences.
Parameters
----------
f : callable
Function to differentiate.
x : ndarray
Point at which to evaluate Jacobian.
eps : float
Finite difference step size.
Returns
-------
J : ndarray
Jacobian matrix, shape (output_dim, input_dim).
"""
x = np.asarray(x).flatten()
n = len(x)
f0 = np.asarray(f(x)).flatten()
m = len(f0)
J = np.zeros((m, n))
for i in range(n):
x_plus = x.copy()
x_minus = x.copy()
x_plus[i] += eps
x_minus[i] -= eps
f_plus = np.asarray(f(x_plus)).flatten()
f_minus = np.asarray(f(x_minus)).flatten()
J[:, i] = (f_plus - f_minus) / (2 * eps)
return J
[docs]
@requires("cupy", extra="gpu", feature="GPU Extended Kalman filter")
def batch_ekf_predict(
x: ArrayLike,
P: ArrayLike,
f: Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]],
F_jacobian: Optional[
Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]]
],
Q: ArrayLike,
) -> BatchEKFPrediction:
"""
Batch EKF prediction for multiple tracks.
Parameters
----------
x : array_like
Current state estimates, shape (n_tracks, state_dim).
P : array_like
Current covariances, shape (n_tracks, state_dim, state_dim).
f : callable
Nonlinear dynamics function f(x) -> x_next.
Applied to each track's state vector.
F_jacobian : callable or None
Jacobian of dynamics df/dx. If None, computed numerically.
Q : array_like
Process noise covariance, shape (state_dim, state_dim)
or (n_tracks, state_dim, state_dim).
Returns
-------
result : BatchEKFPrediction
Predicted states and covariances.
Examples
--------
>>> import numpy as np
>>> from pytcl.gpu.ekf import batch_ekf_predict
>>> # Nonlinear dynamics: coordinated turn
>>> def f_turn(x):
... w = 0.01 # Turn rate
... return np.array([x[0] + np.cos(w)*x[2],
... x[1] + np.sin(w)*x[3],
... x[2], x[3]])
>>> def F_jacobian(x):
... w = 0.01
... return np.array([[1, 0, np.cos(w), 0],
... [0, 1, np.sin(w), 0],
... [0, 0, 1, 0],
... [0, 0, 0, 1]])
>>> n_tracks = 30
>>> x = np.random.randn(n_tracks, 4) * 0.1
>>> P = np.tile(np.eye(4) * 0.01, (n_tracks, 1, 1))
>>> Q = np.eye(4) * 0.001
>>> result = batch_ekf_predict(x, P, f_turn, F_jacobian, Q)
>>> result.x.shape
(30, 4)
Notes
-----
The nonlinear dynamics are applied on CPU (Python function), then
covariance propagation is performed on GPU. This is efficient when
the number of tracks is large relative to the cost of the dynamics.
"""
cp = import_optional("cupy", extra="gpu", feature="GPU Extended Kalman filter")
# Convert to numpy for dynamics evaluation
x_np = np.asarray(x)
P_gpu = ensure_gpu_array(P, dtype=cp.float64)
Q_gpu = ensure_gpu_array(Q, dtype=cp.float64)
n_tracks = x_np.shape[0]
state_dim = x_np.shape[1]
# Apply nonlinear dynamics to each track (on CPU)
x_pred_np = np.zeros_like(x_np)
F_matrices = np.zeros((n_tracks, state_dim, state_dim))
for i in range(n_tracks):
x_i = x_np[i]
x_pred_np[i] = f(x_i)
# Compute Jacobian
if F_jacobian is not None:
F_matrices[i] = F_jacobian(x_i)
else:
F_matrices[i] = _compute_numerical_jacobian(f, x_i)
# Move to GPU
x_pred_gpu = ensure_gpu_array(x_pred_np, dtype=cp.float64)
F_gpu = ensure_gpu_array(F_matrices, dtype=cp.float64)
# Handle Q dimensions
if Q_gpu.ndim == 2:
Q_batch = cp.broadcast_to(Q_gpu, (n_tracks, state_dim, state_dim))
else:
Q_batch = Q_gpu
# Covariance prediction on GPU: P_pred = F @ P @ F' + Q
FP = cp.einsum("nij,njk->nik", F_gpu, P_gpu)
P_pred = cp.einsum("nij,nkj->nik", FP, F_gpu) + Q_batch
# Ensure symmetry
P_pred = (P_pred + cp.swapaxes(P_pred, -2, -1)) / 2
return BatchEKFPrediction(x=x_pred_gpu, P=P_pred)
[docs]
@requires("cupy", extra="gpu", feature="GPU Extended Kalman filter")
def batch_ekf_update(
x: ArrayLike,
P: ArrayLike,
z: ArrayLike,
h: Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]],
H_jacobian: Optional[
Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]]
],
R: ArrayLike,
) -> BatchEKFUpdate:
"""
Batch EKF update for multiple tracks.
Parameters
----------
x : array_like
Predicted state estimates, shape (n_tracks, state_dim).
P : array_like
Predicted covariances, shape (n_tracks, state_dim, state_dim).
z : array_like
Measurements, shape (n_tracks, meas_dim).
h : callable
Nonlinear measurement function h(x) -> z_predicted.
H_jacobian : callable or None
Jacobian of measurement function dh/dx. If None, computed numerically.
R : array_like
Measurement noise covariance.
Returns
-------
result : BatchEKFUpdate
Update results including states, covariances, and statistics.
Examples
--------
>>> import numpy as np
>>> from pytcl.gpu.ekf import batch_ekf_update
>>> # Polar measurement from Cartesian state
>>> def h_polar(x):
... r = np.sqrt(x[0]**2 + x[1]**2)
... theta = np.arctan2(x[1], x[0])
... return np.array([r, theta])
>>> def H_jacobian(x):
... r = np.sqrt(x[0]**2 + x[1]**2)
... return np.array([[x[0]/r, x[1]/r],
... [-x[1]/r**2, x[0]/r**2]])
>>> n_tracks = 20
>>> x = np.random.randn(n_tracks, 2)
>>> P = np.tile(np.eye(2), (n_tracks, 1, 1))
>>> z = np.random.randn(n_tracks, 2) * [100, 0.1] # r, theta
>>> R = np.diag([10.0, 0.01])
>>> result = batch_ekf_update(x, P, z, h_polar, H_jacobian, R)
>>> result.x.shape
(20, 2)
"""
cp = import_optional("cupy", extra="gpu", feature="GPU Extended Kalman filter")
# Convert to numpy for measurement evaluation
x_np = np.asarray(to_cpu(x))
z_np = np.asarray(z)
P_gpu = ensure_gpu_array(P, dtype=cp.float64)
z_gpu = ensure_gpu_array(z, dtype=cp.float64)
R_gpu = ensure_gpu_array(R, dtype=cp.float64)
n_tracks = x_np.shape[0]
state_dim = x_np.shape[1]
meas_dim = z_np.shape[1]
# Evaluate measurement function and Jacobian for each track
z_pred_np = np.zeros((n_tracks, meas_dim))
H_matrices = np.zeros((n_tracks, meas_dim, state_dim))
for i in range(n_tracks):
x_i = x_np[i]
z_pred_np[i] = h(x_i)
if H_jacobian is not None:
H_matrices[i] = H_jacobian(x_i)
else:
H_matrices[i] = _compute_numerical_jacobian(h, x_i)
# Move to GPU
x_gpu = ensure_gpu_array(x_np, dtype=cp.float64)
z_pred_gpu = ensure_gpu_array(z_pred_np, dtype=cp.float64)
H_gpu = ensure_gpu_array(H_matrices, dtype=cp.float64)
# Handle R dimensions
if R_gpu.ndim == 2:
R_batch = cp.broadcast_to(R_gpu, (n_tracks, meas_dim, meas_dim))
else:
R_batch = R_gpu
# Innovation
y = z_gpu - z_pred_gpu
# Innovation covariance: S = H @ P @ H' + R
HP = cp.einsum("nij,njk->nik", H_gpu, P_gpu)
S = cp.einsum("nij,nkj->nik", HP, H_gpu) + R_batch
# Kalman gain: K = P @ H' @ S^{-1}
PHT = cp.einsum("nij,nkj->nik", P_gpu, H_gpu)
S_inv = cp.linalg.inv(S)
K = cp.einsum("nij,njk->nik", PHT, S_inv)
# Updated state
x_upd = x_gpu + cp.einsum("nij,nj->ni", K, y)
# Updated covariance (Joseph form)
eye = cp.eye(state_dim, dtype=cp.float64)
I_KH = eye - cp.einsum("nij,njk->nik", K, H_gpu)
P_upd = cp.einsum("nij,njk->nik", I_KH, P_gpu)
P_upd = cp.einsum("nij,nkj->nik", P_upd, I_KH)
KRK = cp.einsum("nij,njk,nlk->nil", K, R_batch, K)
P_upd = P_upd + KRK
# Ensure symmetry
P_upd = (P_upd + cp.swapaxes(P_upd, -2, -1)) / 2
# Likelihoods
mahal_sq = cp.einsum("ni,nij,nj->n", y, S_inv, y)
sign, logdet = cp.linalg.slogdet(S)
log_likelihood = -0.5 * (mahal_sq + logdet + meas_dim * np.log(2 * np.pi))
likelihood = cp.exp(log_likelihood)
return BatchEKFUpdate(
x=x_upd,
P=P_upd,
y=y,
S=S,
K=K,
likelihood=likelihood,
)
[docs]
class CuPyExtendedKalmanFilter:
"""
GPU-accelerated Extended Kalman Filter for batch processing.
Parameters
----------
state_dim : int
Dimension of state vector.
meas_dim : int
Dimension of measurement vector.
f : callable
Nonlinear dynamics function f(x) -> x_next.
h : callable
Nonlinear measurement function h(x) -> z.
F_jacobian : callable, optional
Jacobian of dynamics. If None, computed numerically.
H_jacobian : callable, optional
Jacobian of measurement. If None, computed numerically.
Q : array_like, optional
Process noise covariance.
R : array_like, optional
Measurement noise covariance.
Examples
--------
>>> import numpy as np
>>> from pytcl.gpu.ekf import CuPyExtendedKalmanFilter
>>>
>>> # Nonlinear dynamics
>>> def f(x):
... return np.array([x[0] + x[1], x[1] * 0.99])
>>>
>>> def h(x):
... return np.array([np.sqrt(x[0]**2 + x[1]**2)])
>>>
>>> ekf = CuPyExtendedKalmanFilter(
... state_dim=2, meas_dim=1,
... f=f, h=h,
... Q=np.eye(2) * 0.01,
... R=np.array([[0.1]]),
... )
"""
[docs]
@requires("cupy", extra="gpu", feature="GPU Extended Kalman filter")
def __init__(
self,
state_dim: int,
meas_dim: int,
f: Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]],
h: Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]],
F_jacobian: Optional[
Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]]
] = None,
H_jacobian: Optional[
Callable[[NDArray[np.floating[Any]]], NDArray[np.floating[Any]]]
] = None,
Q: Optional[ArrayLike] = None,
R: Optional[ArrayLike] = None,
):
cp = import_optional("cupy", extra="gpu", feature="GPU Extended Kalman filter")
self.state_dim = state_dim
self.meas_dim = meas_dim
self.f = f
self.h = h
self.F_jacobian = F_jacobian
self.H_jacobian = H_jacobian
if Q is None:
self.Q = cp.eye(state_dim, dtype=cp.float64) * 0.01
else:
self.Q = ensure_gpu_array(Q, dtype=cp.float64)
if R is None:
self.R = cp.eye(meas_dim, dtype=cp.float64)
else:
self.R = ensure_gpu_array(R, dtype=cp.float64)
[docs]
def predict(
self,
x: ArrayLike,
P: ArrayLike,
) -> BatchEKFPrediction:
"""Perform batch EKF prediction."""
return batch_ekf_predict(x, P, self.f, self.F_jacobian, self.Q)
[docs]
def update(
self,
x: ArrayLike,
P: ArrayLike,
z: ArrayLike,
) -> BatchEKFUpdate:
"""Perform batch EKF update."""
return batch_ekf_update(x, P, z, self.h, self.H_jacobian, self.R)
[docs]
def predict_update(
self,
x: ArrayLike,
P: ArrayLike,
z: ArrayLike,
) -> BatchEKFUpdate:
"""Combined prediction and update."""
pred = self.predict(x, P)
return self.update(pred.x, pred.P, z)
__all__ = [
"BatchEKFPrediction",
"BatchEKFUpdate",
"batch_ekf_predict",
"batch_ekf_update",
"CuPyExtendedKalmanFilter",
]