Source code for pytcl.gpu.kalman

"""
GPU-accelerated Linear Kalman Filter using CuPy.

This module provides GPU-accelerated implementations of the linear Kalman filter
for batch processing of multiple tracks. The implementations achieve 5-10x
speedup compared to CPU for batch sizes > 100.

Key Features
------------
- Batch processing of multiple tracks in parallel
- Memory-efficient operations using CuPy's memory pool
- Compatible API with CPU implementations
- Automatic fallback to CPU if GPU unavailable

Examples
--------
Batch predict for 1000 tracks:

>>> from pytcl.gpu.kalman import batch_kf_predict
>>> import numpy as np
>>> n_tracks = 1000
>>> state_dim = 4
>>> x = np.random.randn(n_tracks, state_dim)
>>> P = np.tile(np.eye(state_dim), (n_tracks, 1, 1))
>>> F = np.array([[1, 1, 0, 0], [0, 1, 0, 0], [0, 0, 1, 1], [0, 0, 0, 1]])
>>> Q = np.eye(state_dim) * 0.1
>>> x_pred, P_pred = batch_kf_predict(x, P, F, Q)

See Also
--------
pytcl.dynamic_estimation.kalman.linear : CPU Kalman filter
pytcl.gpu.ekf : GPU Extended Kalman filter
"""

from typing import NamedTuple, Optional, Tuple

import numpy as np
from numpy.typing import ArrayLike, NDArray

from pytcl.core.optional_deps import import_optional, requires
from pytcl.gpu.utils import ensure_gpu_array


class BatchKalmanPrediction(NamedTuple):
    """Result of batch Kalman filter prediction.

    Attributes
    ----------
    x : ndarray
        Predicted state estimates, shape (n_tracks, state_dim).
    P : ndarray
        Predicted covariances, shape (n_tracks, state_dim, state_dim).
    """

    x: NDArray[np.floating]
    P: NDArray[np.floating]


class BatchKalmanUpdate(NamedTuple):
    """Result of batch Kalman filter update.

    Attributes
    ----------
    x : ndarray
        Updated state estimates, shape (n_tracks, state_dim).
    P : ndarray
        Updated covariances, shape (n_tracks, state_dim, state_dim).
    y : ndarray
        Innovations, shape (n_tracks, meas_dim).
    S : ndarray
        Innovation covariances, shape (n_tracks, meas_dim, meas_dim).
    K : ndarray
        Kalman gains, shape (n_tracks, state_dim, meas_dim).
    likelihood : ndarray
        Measurement likelihoods, shape (n_tracks,).
    """

    x: NDArray[np.floating]
    P: NDArray[np.floating]
    y: NDArray[np.floating]
    S: NDArray[np.floating]
    K: NDArray[np.floating]
    likelihood: NDArray[np.floating]


[docs] @requires("cupy", extra="gpu", feature="GPU Kalman filter") def batch_kf_predict( x: ArrayLike, P: ArrayLike, F: ArrayLike, Q: ArrayLike, B: Optional[ArrayLike] = None, u: Optional[ArrayLike] = None, ) -> BatchKalmanPrediction: """ Batch Kalman filter prediction for multiple tracks. Performs the prediction step for N tracks in parallel on GPU: x_pred[i] = F @ x[i] + B @ u[i] (if B, u provided) P_pred[i] = F @ P[i] @ F' + Q Parameters ---------- x : array_like Current state estimates, shape (n_tracks, state_dim). P : array_like Current covariances, shape (n_tracks, state_dim, state_dim). F : array_like State transition matrix, shape (state_dim, state_dim). Can also be (n_tracks, state_dim, state_dim) for track-specific matrices. Q : array_like Process noise covariance, shape (state_dim, state_dim). Can also be (n_tracks, state_dim, state_dim) for track-specific noise. B : array_like, optional Control input matrix, shape (state_dim, control_dim). u : array_like, optional Control inputs, shape (n_tracks, control_dim). Returns ------- result : BatchKalmanPrediction Named tuple with predicted states and covariances. Examples -------- >>> import numpy as np >>> from pytcl.gpu.kalman import batch_kf_predict >>> n_tracks = 100 >>> x = np.random.randn(n_tracks, 4) >>> P = np.tile(np.eye(4) * 0.1, (n_tracks, 1, 1)) >>> F = np.array([[1, 1, 0, 0], [0, 1, 0, 0], ... [0, 0, 1, 1], [0, 0, 0, 1]]) >>> Q = np.eye(4) * 0.01 >>> pred = batch_kf_predict(x, P, F, Q) >>> pred.x.shape (100, 4) """ cp = import_optional("cupy", extra="gpu", feature="GPU Kalman filter") # Move arrays to GPU x_gpu = ensure_gpu_array(x, dtype=cp.float64) P_gpu = ensure_gpu_array(P, dtype=cp.float64) F_gpu = ensure_gpu_array(F, dtype=cp.float64) Q_gpu = ensure_gpu_array(Q, dtype=cp.float64) n_tracks = x_gpu.shape[0] state_dim = x_gpu.shape[1] # Handle F matrix dimensions if F_gpu.ndim == 2: # Broadcast F to all tracks: (n, n) -> (n_tracks, n, n) F_batch = cp.broadcast_to(F_gpu, (n_tracks, state_dim, state_dim)) else: F_batch = F_gpu # Handle Q matrix dimensions if Q_gpu.ndim == 2: Q_batch = cp.broadcast_to(Q_gpu, (n_tracks, state_dim, state_dim)) else: Q_batch = Q_gpu # Batch prediction: x_pred = F @ x # Use einsum for batched matrix-vector multiplication x_pred = cp.einsum("nij,nj->ni", F_batch, x_gpu) # Add control input if provided if B is not None and u is not None: B_gpu = ensure_gpu_array(B, dtype=cp.float64) u_gpu = ensure_gpu_array(u, dtype=cp.float64) if B_gpu.ndim == 2: # Broadcast B x_pred += cp.einsum("ij,nj->ni", B_gpu, u_gpu) else: x_pred += cp.einsum("nij,nj->ni", B_gpu, u_gpu) # Batch covariance prediction: P_pred = F @ P @ F' + Q # Step 1: FP = F @ P FP = cp.einsum("nij,njk->nik", F_batch, P_gpu) # Step 2: P_pred = FP @ F' + Q P_pred = cp.einsum("nij,nkj->nik", FP, F_batch) + Q_batch # Ensure symmetry P_pred = (P_pred + cp.swapaxes(P_pred, -2, -1)) / 2 return BatchKalmanPrediction(x=x_pred, P=P_pred)
[docs] @requires("cupy", extra="gpu", feature="GPU Kalman filter") def batch_kf_update( x: ArrayLike, P: ArrayLike, z: ArrayLike, H: ArrayLike, R: ArrayLike, ) -> BatchKalmanUpdate: """ Batch Kalman filter update for multiple tracks. Performs the update step for N tracks in parallel on GPU: y[i] = z[i] - H @ x[i] (innovation) S[i] = H @ P[i] @ H' + R (innovation covariance) K[i] = P[i] @ H' @ S[i]^{-1} (Kalman gain) x_upd[i] = x[i] + K[i] @ y[i] (updated state) P_upd[i] = (I - K[i] @ H) @ P[i] (updated covariance) Parameters ---------- x : array_like Predicted state estimates, shape (n_tracks, state_dim). P : array_like Predicted covariances, shape (n_tracks, state_dim, state_dim). z : array_like Measurements, shape (n_tracks, meas_dim). H : array_like Measurement matrix, shape (meas_dim, state_dim). Can also be (n_tracks, meas_dim, state_dim). R : array_like Measurement noise covariance, shape (meas_dim, meas_dim). Can also be (n_tracks, meas_dim, meas_dim). Returns ------- result : BatchKalmanUpdate Named tuple with updated states, covariances, and statistics. Examples -------- >>> import numpy as np >>> from pytcl.gpu.kalman import batch_kf_update >>> n_tracks = 100 >>> x = np.random.randn(n_tracks, 4) >>> P = np.tile(np.eye(4) * 0.1, (n_tracks, 1, 1)) >>> z = np.random.randn(n_tracks, 2) # position measurements >>> H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]]) >>> R = np.eye(2) * 0.5 >>> upd = batch_kf_update(x, P, z, H, R) >>> upd.x.shape (100, 4) """ cp = import_optional("cupy", extra="gpu", feature="GPU Kalman filter") # Move arrays to GPU x_gpu = ensure_gpu_array(x, dtype=cp.float64) P_gpu = ensure_gpu_array(P, dtype=cp.float64) z_gpu = ensure_gpu_array(z, dtype=cp.float64) H_gpu = ensure_gpu_array(H, dtype=cp.float64) R_gpu = ensure_gpu_array(R, dtype=cp.float64) n_tracks = x_gpu.shape[0] state_dim = x_gpu.shape[1] meas_dim = z_gpu.shape[1] # Handle H matrix dimensions if H_gpu.ndim == 2: H_batch = cp.broadcast_to(H_gpu, (n_tracks, meas_dim, state_dim)) else: H_batch = H_gpu # Handle R matrix dimensions if R_gpu.ndim == 2: R_batch = cp.broadcast_to(R_gpu, (n_tracks, meas_dim, meas_dim)) else: R_batch = R_gpu # Innovation: y = z - H @ x z_pred = cp.einsum("nij,nj->ni", H_batch, x_gpu) y = z_gpu - z_pred # Innovation covariance: S = H @ P @ H' + R HP = cp.einsum("nij,njk->nik", H_batch, P_gpu) S = cp.einsum("nij,nkj->nik", HP, H_batch) + R_batch # Kalman gain: K = P @ H' @ S^{-1} # First compute P @ H' PHT = cp.einsum("nij,nkj->nik", P_gpu, H_batch) # Batch matrix inverse using batched solve # K = PHT @ S^{-1} is equivalent to solving S @ K' = PHT' for K # But for efficiency, we solve S @ X = I for S^{-1}, then compute K = PHT @ S^{-1} S_inv = cp.linalg.inv(S) K = cp.einsum("nij,njk->nik", PHT, S_inv) # Updated state: x_upd = x + K @ y x_upd = x_gpu + cp.einsum("nij,nj->ni", K, y) # Updated covariance using Joseph form: P_upd = (I - K @ H) @ P @ (I - K @ H)' + K @ R @ K' eye = cp.eye(state_dim, dtype=cp.float64) I_KH = eye - cp.einsum("nij,njk->nik", K, H_batch) # Joseph form for numerical stability P_upd = cp.einsum("nij,njk->nik", I_KH, P_gpu) P_upd = cp.einsum("nij,nkj->nik", P_upd, I_KH) KRK = cp.einsum("nij,njk,nlk->nil", K, R_batch, K) P_upd = P_upd + KRK # Ensure symmetry P_upd = (P_upd + cp.swapaxes(P_upd, -2, -1)) / 2 # Compute likelihoods # log(L) = -0.5 * (y' @ S^{-1} @ y + log(det(S)) + m*log(2*pi)) mahal_sq = cp.einsum("ni,nij,nj->n", y, S_inv, y) sign, logdet = cp.linalg.slogdet(S) log_likelihood = -0.5 * (mahal_sq + logdet + meas_dim * np.log(2 * np.pi)) likelihood = cp.exp(log_likelihood) return BatchKalmanUpdate( x=x_upd, P=P_upd, y=y, S=S, K=K, likelihood=likelihood, )
@requires("cupy", extra="gpu", feature="GPU Kalman filter") def batch_kf_predict_update( x: ArrayLike, P: ArrayLike, z: ArrayLike, F: ArrayLike, Q: ArrayLike, H: ArrayLike, R: ArrayLike, B: Optional[ArrayLike] = None, u: Optional[ArrayLike] = None, ) -> BatchKalmanUpdate: """ Combined batch Kalman filter prediction and update. Parameters ---------- x : array_like Current state estimates, shape (n_tracks, state_dim). P : array_like Current covariances, shape (n_tracks, state_dim, state_dim). z : array_like Measurements, shape (n_tracks, meas_dim). F : array_like State transition matrix. Q : array_like Process noise covariance. H : array_like Measurement matrix. R : array_like Measurement noise covariance. B : array_like, optional Control input matrix. u : array_like, optional Control inputs. Returns ------- result : BatchKalmanUpdate Named tuple with updated states, covariances, and statistics. Examples -------- >>> import numpy as np >>> from pytcl.gpu.kalman import batch_kf_predict_update >>> n_tracks = 50 >>> x = np.random.randn(n_tracks, 2) >>> P = np.tile(np.eye(2), (n_tracks, 1, 1)) >>> z = np.random.randn(n_tracks, 2) # Measurements >>> F = np.array([[1, 0.1], [0, 1]]) # Constant velocity >>> Q = np.eye(2) * 0.01 >>> H = np.eye(2) >>> R = np.eye(2) * 0.1 >>> result = batch_kf_predict_update(x, P, z, F, Q, H, R) >>> result.x.shape (50, 2) """ pred = batch_kf_predict(x, P, F, Q, B, u) return batch_kf_update(pred.x, pred.P, z, H, R)
[docs] class CuPyKalmanFilter: """ GPU-accelerated Linear Kalman Filter for batch processing. This class provides a stateful interface for processing multiple tracks in parallel on the GPU. It maintains the filter matrices and provides methods for prediction and update. Parameters ---------- state_dim : int Dimension of the state vector. meas_dim : int Dimension of the measurement vector. F : array_like, optional State transition matrix. If None, uses identity. H : array_like, optional Measurement matrix. If None, measures first meas_dim states. Q : array_like, optional Process noise covariance. If None, uses 0.01 * I. R : array_like, optional Measurement noise covariance. If None, uses 1.0 * I. Examples -------- >>> import numpy as np >>> from pytcl.gpu.kalman import CuPyKalmanFilter >>> >>> # Create filter for 2D constant velocity model >>> kf = CuPyKalmanFilter( ... state_dim=4, # [x, vx, y, vy] ... meas_dim=2, # [x, y] ... F=np.array([[1, 1, 0, 0], [0, 1, 0, 0], ... [0, 0, 1, 1], [0, 0, 0, 1]]), ... H=np.array([[1, 0, 0, 0], [0, 0, 1, 0]]), ... Q=np.eye(4) * 0.1, ... R=np.eye(2) * 1.0, ... ) >>> >>> # Process batch of tracks >>> n_tracks = 1000 >>> x = np.random.randn(n_tracks, 4) >>> P = np.tile(np.eye(4), (n_tracks, 1, 1)) >>> z = np.random.randn(n_tracks, 2) >>> >>> # Predict and update >>> x_pred, P_pred = kf.predict(x, P) >>> result = kf.update(x_pred, P_pred, z) """
[docs] @requires("cupy", extra="gpu", feature="GPU Kalman filter") def __init__( self, state_dim: int, meas_dim: int, F: Optional[ArrayLike] = None, H: Optional[ArrayLike] = None, Q: Optional[ArrayLike] = None, R: Optional[ArrayLike] = None, ): cp = import_optional("cupy", extra="gpu", feature="GPU Kalman filter") self.state_dim = state_dim self.meas_dim = meas_dim # Initialize matrices on GPU if F is None: self.F = cp.eye(state_dim, dtype=cp.float64) else: self.F = ensure_gpu_array(F, dtype=cp.float64) if H is None: self.H = cp.zeros((meas_dim, state_dim), dtype=cp.float64) self.H[:meas_dim, :meas_dim] = cp.eye(meas_dim, dtype=cp.float64) else: self.H = ensure_gpu_array(H, dtype=cp.float64) if Q is None: self.Q = cp.eye(state_dim, dtype=cp.float64) * 0.01 else: self.Q = ensure_gpu_array(Q, dtype=cp.float64) if R is None: self.R = cp.eye(meas_dim, dtype=cp.float64) else: self.R = ensure_gpu_array(R, dtype=cp.float64)
[docs] def predict( self, x: ArrayLike, P: ArrayLike, B: Optional[ArrayLike] = None, u: Optional[ArrayLike] = None, ) -> Tuple[NDArray[np.floating], NDArray[np.floating]]: """ Perform batch prediction. Parameters ---------- x : array_like State estimates, shape (n_tracks, state_dim). P : array_like Covariances, shape (n_tracks, state_dim, state_dim). B : array_like, optional Control input matrix. u : array_like, optional Control inputs. Returns ------- x_pred : ndarray Predicted states. P_pred : ndarray Predicted covariances. """ result = batch_kf_predict(x, P, self.F, self.Q, B, u) return result.x, result.P
[docs] def update( self, x: ArrayLike, P: ArrayLike, z: ArrayLike, ) -> BatchKalmanUpdate: """ Perform batch update. Parameters ---------- x : array_like Predicted state estimates. P : array_like Predicted covariances. z : array_like Measurements. Returns ------- result : BatchKalmanUpdate Update results including states, covariances, and statistics. """ return batch_kf_update(x, P, z, self.H, self.R)
[docs] def predict_update( self, x: ArrayLike, P: ArrayLike, z: ArrayLike, B: Optional[ArrayLike] = None, u: Optional[ArrayLike] = None, ) -> BatchKalmanUpdate: """ Combined batch prediction and update. Parameters ---------- x : array_like Current state estimates. P : array_like Current covariances. z : array_like Measurements. B : array_like, optional Control input matrix. u : array_like, optional Control inputs. Returns ------- result : BatchKalmanUpdate Update results. """ return batch_kf_predict_update(x, P, z, self.F, self.Q, self.H, self.R, B, u)
__all__ = [ "BatchKalmanPrediction", "BatchKalmanUpdate", "batch_kf_predict", "batch_kf_update", "batch_kf_predict_update", "CuPyKalmanFilter", ]