"""
GPU-accelerated Linear Kalman Filter using CuPy.
This module provides GPU-accelerated implementations of the linear Kalman filter
for batch processing of multiple tracks. The implementations achieve 5-10x
speedup compared to CPU for batch sizes > 100.
Key Features
------------
- Batch processing of multiple tracks in parallel
- Memory-efficient operations using CuPy's memory pool
- Compatible API with CPU implementations
- Automatic fallback to CPU if GPU unavailable
Examples
--------
Batch predict for 1000 tracks:
>>> from pytcl.gpu.kalman import batch_kf_predict
>>> import numpy as np
>>> n_tracks = 1000
>>> state_dim = 4
>>> x = np.random.randn(n_tracks, state_dim)
>>> P = np.tile(np.eye(state_dim), (n_tracks, 1, 1))
>>> F = np.array([[1, 1, 0, 0], [0, 1, 0, 0], [0, 0, 1, 1], [0, 0, 0, 1]])
>>> Q = np.eye(state_dim) * 0.1
>>> x_pred, P_pred = batch_kf_predict(x, P, F, Q)
See Also
--------
pytcl.dynamic_estimation.kalman.linear : CPU Kalman filter
pytcl.gpu.ekf : GPU Extended Kalman filter
"""
from typing import NamedTuple, Optional, Tuple
import numpy as np
from numpy.typing import ArrayLike, NDArray
from pytcl.core.optional_deps import import_optional, requires
from pytcl.gpu.utils import ensure_gpu_array
class BatchKalmanPrediction(NamedTuple):
"""Result of batch Kalman filter prediction.
Attributes
----------
x : ndarray
Predicted state estimates, shape (n_tracks, state_dim).
P : ndarray
Predicted covariances, shape (n_tracks, state_dim, state_dim).
"""
x: NDArray[np.floating]
P: NDArray[np.floating]
class BatchKalmanUpdate(NamedTuple):
"""Result of batch Kalman filter update.
Attributes
----------
x : ndarray
Updated state estimates, shape (n_tracks, state_dim).
P : ndarray
Updated covariances, shape (n_tracks, state_dim, state_dim).
y : ndarray
Innovations, shape (n_tracks, meas_dim).
S : ndarray
Innovation covariances, shape (n_tracks, meas_dim, meas_dim).
K : ndarray
Kalman gains, shape (n_tracks, state_dim, meas_dim).
likelihood : ndarray
Measurement likelihoods, shape (n_tracks,).
"""
x: NDArray[np.floating]
P: NDArray[np.floating]
y: NDArray[np.floating]
S: NDArray[np.floating]
K: NDArray[np.floating]
likelihood: NDArray[np.floating]
[docs]
@requires("cupy", extra="gpu", feature="GPU Kalman filter")
def batch_kf_predict(
x: ArrayLike,
P: ArrayLike,
F: ArrayLike,
Q: ArrayLike,
B: Optional[ArrayLike] = None,
u: Optional[ArrayLike] = None,
) -> BatchKalmanPrediction:
"""
Batch Kalman filter prediction for multiple tracks.
Performs the prediction step for N tracks in parallel on GPU:
x_pred[i] = F @ x[i] + B @ u[i] (if B, u provided)
P_pred[i] = F @ P[i] @ F' + Q
Parameters
----------
x : array_like
Current state estimates, shape (n_tracks, state_dim).
P : array_like
Current covariances, shape (n_tracks, state_dim, state_dim).
F : array_like
State transition matrix, shape (state_dim, state_dim).
Can also be (n_tracks, state_dim, state_dim) for track-specific matrices.
Q : array_like
Process noise covariance, shape (state_dim, state_dim).
Can also be (n_tracks, state_dim, state_dim) for track-specific noise.
B : array_like, optional
Control input matrix, shape (state_dim, control_dim).
u : array_like, optional
Control inputs, shape (n_tracks, control_dim).
Returns
-------
result : BatchKalmanPrediction
Named tuple with predicted states and covariances.
Examples
--------
>>> import numpy as np
>>> from pytcl.gpu.kalman import batch_kf_predict
>>> n_tracks = 100
>>> x = np.random.randn(n_tracks, 4)
>>> P = np.tile(np.eye(4) * 0.1, (n_tracks, 1, 1))
>>> F = np.array([[1, 1, 0, 0], [0, 1, 0, 0],
... [0, 0, 1, 1], [0, 0, 0, 1]])
>>> Q = np.eye(4) * 0.01
>>> pred = batch_kf_predict(x, P, F, Q)
>>> pred.x.shape
(100, 4)
"""
cp = import_optional("cupy", extra="gpu", feature="GPU Kalman filter")
# Move arrays to GPU
x_gpu = ensure_gpu_array(x, dtype=cp.float64)
P_gpu = ensure_gpu_array(P, dtype=cp.float64)
F_gpu = ensure_gpu_array(F, dtype=cp.float64)
Q_gpu = ensure_gpu_array(Q, dtype=cp.float64)
n_tracks = x_gpu.shape[0]
state_dim = x_gpu.shape[1]
# Handle F matrix dimensions
if F_gpu.ndim == 2:
# Broadcast F to all tracks: (n, n) -> (n_tracks, n, n)
F_batch = cp.broadcast_to(F_gpu, (n_tracks, state_dim, state_dim))
else:
F_batch = F_gpu
# Handle Q matrix dimensions
if Q_gpu.ndim == 2:
Q_batch = cp.broadcast_to(Q_gpu, (n_tracks, state_dim, state_dim))
else:
Q_batch = Q_gpu
# Batch prediction: x_pred = F @ x
# Use einsum for batched matrix-vector multiplication
x_pred = cp.einsum("nij,nj->ni", F_batch, x_gpu)
# Add control input if provided
if B is not None and u is not None:
B_gpu = ensure_gpu_array(B, dtype=cp.float64)
u_gpu = ensure_gpu_array(u, dtype=cp.float64)
if B_gpu.ndim == 2:
# Broadcast B
x_pred += cp.einsum("ij,nj->ni", B_gpu, u_gpu)
else:
x_pred += cp.einsum("nij,nj->ni", B_gpu, u_gpu)
# Batch covariance prediction: P_pred = F @ P @ F' + Q
# Step 1: FP = F @ P
FP = cp.einsum("nij,njk->nik", F_batch, P_gpu)
# Step 2: P_pred = FP @ F' + Q
P_pred = cp.einsum("nij,nkj->nik", FP, F_batch) + Q_batch
# Ensure symmetry
P_pred = (P_pred + cp.swapaxes(P_pred, -2, -1)) / 2
return BatchKalmanPrediction(x=x_pred, P=P_pred)
[docs]
@requires("cupy", extra="gpu", feature="GPU Kalman filter")
def batch_kf_update(
x: ArrayLike,
P: ArrayLike,
z: ArrayLike,
H: ArrayLike,
R: ArrayLike,
) -> BatchKalmanUpdate:
"""
Batch Kalman filter update for multiple tracks.
Performs the update step for N tracks in parallel on GPU:
y[i] = z[i] - H @ x[i] (innovation)
S[i] = H @ P[i] @ H' + R (innovation covariance)
K[i] = P[i] @ H' @ S[i]^{-1} (Kalman gain)
x_upd[i] = x[i] + K[i] @ y[i] (updated state)
P_upd[i] = (I - K[i] @ H) @ P[i] (updated covariance)
Parameters
----------
x : array_like
Predicted state estimates, shape (n_tracks, state_dim).
P : array_like
Predicted covariances, shape (n_tracks, state_dim, state_dim).
z : array_like
Measurements, shape (n_tracks, meas_dim).
H : array_like
Measurement matrix, shape (meas_dim, state_dim).
Can also be (n_tracks, meas_dim, state_dim).
R : array_like
Measurement noise covariance, shape (meas_dim, meas_dim).
Can also be (n_tracks, meas_dim, meas_dim).
Returns
-------
result : BatchKalmanUpdate
Named tuple with updated states, covariances, and statistics.
Examples
--------
>>> import numpy as np
>>> from pytcl.gpu.kalman import batch_kf_update
>>> n_tracks = 100
>>> x = np.random.randn(n_tracks, 4)
>>> P = np.tile(np.eye(4) * 0.1, (n_tracks, 1, 1))
>>> z = np.random.randn(n_tracks, 2) # position measurements
>>> H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
>>> R = np.eye(2) * 0.5
>>> upd = batch_kf_update(x, P, z, H, R)
>>> upd.x.shape
(100, 4)
"""
cp = import_optional("cupy", extra="gpu", feature="GPU Kalman filter")
# Move arrays to GPU
x_gpu = ensure_gpu_array(x, dtype=cp.float64)
P_gpu = ensure_gpu_array(P, dtype=cp.float64)
z_gpu = ensure_gpu_array(z, dtype=cp.float64)
H_gpu = ensure_gpu_array(H, dtype=cp.float64)
R_gpu = ensure_gpu_array(R, dtype=cp.float64)
n_tracks = x_gpu.shape[0]
state_dim = x_gpu.shape[1]
meas_dim = z_gpu.shape[1]
# Handle H matrix dimensions
if H_gpu.ndim == 2:
H_batch = cp.broadcast_to(H_gpu, (n_tracks, meas_dim, state_dim))
else:
H_batch = H_gpu
# Handle R matrix dimensions
if R_gpu.ndim == 2:
R_batch = cp.broadcast_to(R_gpu, (n_tracks, meas_dim, meas_dim))
else:
R_batch = R_gpu
# Innovation: y = z - H @ x
z_pred = cp.einsum("nij,nj->ni", H_batch, x_gpu)
y = z_gpu - z_pred
# Innovation covariance: S = H @ P @ H' + R
HP = cp.einsum("nij,njk->nik", H_batch, P_gpu)
S = cp.einsum("nij,nkj->nik", HP, H_batch) + R_batch
# Kalman gain: K = P @ H' @ S^{-1}
# First compute P @ H'
PHT = cp.einsum("nij,nkj->nik", P_gpu, H_batch)
# Batch matrix inverse using batched solve
# K = PHT @ S^{-1} is equivalent to solving S @ K' = PHT' for K
# But for efficiency, we solve S @ X = I for S^{-1}, then compute K = PHT @ S^{-1}
S_inv = cp.linalg.inv(S)
K = cp.einsum("nij,njk->nik", PHT, S_inv)
# Updated state: x_upd = x + K @ y
x_upd = x_gpu + cp.einsum("nij,nj->ni", K, y)
# Updated covariance using Joseph form: P_upd = (I - K @ H) @ P @ (I - K @ H)' + K @ R @ K'
eye = cp.eye(state_dim, dtype=cp.float64)
I_KH = eye - cp.einsum("nij,njk->nik", K, H_batch)
# Joseph form for numerical stability
P_upd = cp.einsum("nij,njk->nik", I_KH, P_gpu)
P_upd = cp.einsum("nij,nkj->nik", P_upd, I_KH)
KRK = cp.einsum("nij,njk,nlk->nil", K, R_batch, K)
P_upd = P_upd + KRK
# Ensure symmetry
P_upd = (P_upd + cp.swapaxes(P_upd, -2, -1)) / 2
# Compute likelihoods
# log(L) = -0.5 * (y' @ S^{-1} @ y + log(det(S)) + m*log(2*pi))
mahal_sq = cp.einsum("ni,nij,nj->n", y, S_inv, y)
sign, logdet = cp.linalg.slogdet(S)
log_likelihood = -0.5 * (mahal_sq + logdet + meas_dim * np.log(2 * np.pi))
likelihood = cp.exp(log_likelihood)
return BatchKalmanUpdate(
x=x_upd,
P=P_upd,
y=y,
S=S,
K=K,
likelihood=likelihood,
)
@requires("cupy", extra="gpu", feature="GPU Kalman filter")
def batch_kf_predict_update(
x: ArrayLike,
P: ArrayLike,
z: ArrayLike,
F: ArrayLike,
Q: ArrayLike,
H: ArrayLike,
R: ArrayLike,
B: Optional[ArrayLike] = None,
u: Optional[ArrayLike] = None,
) -> BatchKalmanUpdate:
"""
Combined batch Kalman filter prediction and update.
Parameters
----------
x : array_like
Current state estimates, shape (n_tracks, state_dim).
P : array_like
Current covariances, shape (n_tracks, state_dim, state_dim).
z : array_like
Measurements, shape (n_tracks, meas_dim).
F : array_like
State transition matrix.
Q : array_like
Process noise covariance.
H : array_like
Measurement matrix.
R : array_like
Measurement noise covariance.
B : array_like, optional
Control input matrix.
u : array_like, optional
Control inputs.
Returns
-------
result : BatchKalmanUpdate
Named tuple with updated states, covariances, and statistics.
Examples
--------
>>> import numpy as np
>>> from pytcl.gpu.kalman import batch_kf_predict_update
>>> n_tracks = 50
>>> x = np.random.randn(n_tracks, 2)
>>> P = np.tile(np.eye(2), (n_tracks, 1, 1))
>>> z = np.random.randn(n_tracks, 2) # Measurements
>>> F = np.array([[1, 0.1], [0, 1]]) # Constant velocity
>>> Q = np.eye(2) * 0.01
>>> H = np.eye(2)
>>> R = np.eye(2) * 0.1
>>> result = batch_kf_predict_update(x, P, z, F, Q, H, R)
>>> result.x.shape
(50, 2)
"""
pred = batch_kf_predict(x, P, F, Q, B, u)
return batch_kf_update(pred.x, pred.P, z, H, R)
[docs]
class CuPyKalmanFilter:
"""
GPU-accelerated Linear Kalman Filter for batch processing.
This class provides a stateful interface for processing multiple tracks
in parallel on the GPU. It maintains the filter matrices and provides
methods for prediction and update.
Parameters
----------
state_dim : int
Dimension of the state vector.
meas_dim : int
Dimension of the measurement vector.
F : array_like, optional
State transition matrix. If None, uses identity.
H : array_like, optional
Measurement matrix. If None, measures first meas_dim states.
Q : array_like, optional
Process noise covariance. If None, uses 0.01 * I.
R : array_like, optional
Measurement noise covariance. If None, uses 1.0 * I.
Examples
--------
>>> import numpy as np
>>> from pytcl.gpu.kalman import CuPyKalmanFilter
>>>
>>> # Create filter for 2D constant velocity model
>>> kf = CuPyKalmanFilter(
... state_dim=4, # [x, vx, y, vy]
... meas_dim=2, # [x, y]
... F=np.array([[1, 1, 0, 0], [0, 1, 0, 0],
... [0, 0, 1, 1], [0, 0, 0, 1]]),
... H=np.array([[1, 0, 0, 0], [0, 0, 1, 0]]),
... Q=np.eye(4) * 0.1,
... R=np.eye(2) * 1.0,
... )
>>>
>>> # Process batch of tracks
>>> n_tracks = 1000
>>> x = np.random.randn(n_tracks, 4)
>>> P = np.tile(np.eye(4), (n_tracks, 1, 1))
>>> z = np.random.randn(n_tracks, 2)
>>>
>>> # Predict and update
>>> x_pred, P_pred = kf.predict(x, P)
>>> result = kf.update(x_pred, P_pred, z)
"""
[docs]
@requires("cupy", extra="gpu", feature="GPU Kalman filter")
def __init__(
self,
state_dim: int,
meas_dim: int,
F: Optional[ArrayLike] = None,
H: Optional[ArrayLike] = None,
Q: Optional[ArrayLike] = None,
R: Optional[ArrayLike] = None,
):
cp = import_optional("cupy", extra="gpu", feature="GPU Kalman filter")
self.state_dim = state_dim
self.meas_dim = meas_dim
# Initialize matrices on GPU
if F is None:
self.F = cp.eye(state_dim, dtype=cp.float64)
else:
self.F = ensure_gpu_array(F, dtype=cp.float64)
if H is None:
self.H = cp.zeros((meas_dim, state_dim), dtype=cp.float64)
self.H[:meas_dim, :meas_dim] = cp.eye(meas_dim, dtype=cp.float64)
else:
self.H = ensure_gpu_array(H, dtype=cp.float64)
if Q is None:
self.Q = cp.eye(state_dim, dtype=cp.float64) * 0.01
else:
self.Q = ensure_gpu_array(Q, dtype=cp.float64)
if R is None:
self.R = cp.eye(meas_dim, dtype=cp.float64)
else:
self.R = ensure_gpu_array(R, dtype=cp.float64)
[docs]
def predict(
self,
x: ArrayLike,
P: ArrayLike,
B: Optional[ArrayLike] = None,
u: Optional[ArrayLike] = None,
) -> Tuple[NDArray[np.floating], NDArray[np.floating]]:
"""
Perform batch prediction.
Parameters
----------
x : array_like
State estimates, shape (n_tracks, state_dim).
P : array_like
Covariances, shape (n_tracks, state_dim, state_dim).
B : array_like, optional
Control input matrix.
u : array_like, optional
Control inputs.
Returns
-------
x_pred : ndarray
Predicted states.
P_pred : ndarray
Predicted covariances.
"""
result = batch_kf_predict(x, P, self.F, self.Q, B, u)
return result.x, result.P
[docs]
def update(
self,
x: ArrayLike,
P: ArrayLike,
z: ArrayLike,
) -> BatchKalmanUpdate:
"""
Perform batch update.
Parameters
----------
x : array_like
Predicted state estimates.
P : array_like
Predicted covariances.
z : array_like
Measurements.
Returns
-------
result : BatchKalmanUpdate
Update results including states, covariances, and statistics.
"""
return batch_kf_update(x, P, z, self.H, self.R)
[docs]
def predict_update(
self,
x: ArrayLike,
P: ArrayLike,
z: ArrayLike,
B: Optional[ArrayLike] = None,
u: Optional[ArrayLike] = None,
) -> BatchKalmanUpdate:
"""
Combined batch prediction and update.
Parameters
----------
x : array_like
Current state estimates.
P : array_like
Current covariances.
z : array_like
Measurements.
B : array_like, optional
Control input matrix.
u : array_like, optional
Control inputs.
Returns
-------
result : BatchKalmanUpdate
Update results.
"""
return batch_kf_predict_update(x, P, z, self.F, self.Q, self.H, self.R, B, u)
__all__ = [
"BatchKalmanPrediction",
"BatchKalmanUpdate",
"batch_kf_predict",
"batch_kf_update",
"batch_kf_predict_update",
"CuPyKalmanFilter",
]