GPU Acceleration
The GPU module provides hardware-accelerated implementations of key tracking algorithms using CuPy (NVIDIA CUDA) or MLX (Apple Silicon). These implementations offer significant speedups (5-15x) for batch processing of multiple tracks.
The module automatically selects the best available backend:
On Apple Silicon (M1/M2/M3): Uses MLX if installed
On systems with NVIDIA GPUs: Uses CuPy if installed
Falls back to CPU (numpy) if no GPU backend is available
Installation
For NVIDIA CUDA GPUs:
pip install nrl-tracker[gpu]
# or directly:
pip install cupy-cuda12x
For Apple Silicon (M1/M2/M3):
pip install nrl-tracker[gpu-apple]
# or directly:
pip install mlx
Quick Start
Check GPU availability and backend:
from pytcl.gpu import is_gpu_available, get_backend, is_apple_silicon
if is_gpu_available():
print(f"GPU available, using {get_backend()} backend")
if is_apple_silicon():
print("Running on Apple Silicon")
Transfer arrays between CPU and GPU:
from pytcl.gpu import to_gpu, to_cpu
import numpy as np
# CPU array
x = np.random.randn(100, 4)
# Transfer to GPU (uses best available backend)
x_gpu = to_gpu(x)
# Transfer back to CPU
x_cpu = to_cpu(x_gpu)
Platform Detection
- pytcl.gpu.utils.is_apple_silicon()[source]
Check if running on Apple Silicon (ARM64 Mac).
- Returns:
True if running on Apple Silicon (M1, M2, M3, etc.).
- Return type:
Examples
>>> from pytcl.gpu.utils import is_apple_silicon >>> if is_apple_silicon(): ... print("Running on Apple Silicon")
- pytcl.gpu.utils.is_mlx_available()[source]
Check if MLX acceleration is available (Apple Silicon).
Returns True if: - Running on Apple Silicon (ARM64 Mac) - MLX is installed
- Returns:
True if MLX acceleration is available.
- Return type:
Examples
>>> from pytcl.gpu.utils import is_mlx_available >>> if is_mlx_available(): ... print("MLX acceleration enabled")
- pytcl.gpu.utils.is_cupy_available()[source]
Check if CuPy (CUDA) acceleration is available.
Returns True if: - CuPy is installed - A CUDA-capable GPU is detected - CUDA runtime is functional
- Returns:
True if CuPy acceleration is available.
- Return type:
Examples
>>> from pytcl.gpu.utils import is_cupy_available >>> if is_cupy_available(): ... print("CUDA GPU available")
- pytcl.gpu.utils.get_backend()[source]
Get the best available GPU backend for the current platform.
Priority: 1. MLX on Apple Silicon 2. CuPy on systems with NVIDIA GPUs 3. numpy (CPU fallback)
- Returns:
One of “mlx”, “cupy”, or “numpy”.
- Return type:
Examples
>>> from pytcl.gpu.utils import get_backend >>> backend = get_backend() >>> print(f"Using {backend} backend")
- pytcl.gpu.utils.is_gpu_available()[source]
Check if GPU acceleration is available.
Returns True if either: - MLX is available (Apple Silicon) - CuPy is available with a CUDA GPU
- Returns:
True if GPU acceleration is available.
- Return type:
Examples
>>> from pytcl.gpu.utils import is_gpu_available >>> if is_gpu_available(): ... print("GPU acceleration enabled") ... else: ... print("Falling back to CPU")
Notes
The result is cached after the first call for performance. Use get_backend() to determine which backend is being used.
Array Operations
- pytcl.gpu.utils.to_gpu(arr, dtype=None, backend=None)[source]
Transfer an array to GPU memory.
Automatically selects the best available backend (MLX on Apple Silicon, CuPy on NVIDIA GPUs) unless a specific backend is requested.
- Parameters:
arr (array_like) – Input array (typically numpy).
dtype (dtype, optional) – Data type for the GPU array. If None, uses the input dtype.
backend (str, optional) – Specific backend to use (“mlx”, “cupy”). If None, auto-selects.
- Returns:
Array in GPU memory (cupy.ndarray or mlx.array).
- Return type:
GPUArray
- Raises:
DependencyError – If required backend is not installed.
RuntimeError – If no GPU is available.
Examples
>>> import numpy as np >>> from pytcl.gpu.utils import to_gpu, is_gpu_available >>> x = np.array([1.0, 2.0, 3.0]) >>> if is_gpu_available(): ... x_gpu = to_gpu(x) ... print(type(x_gpu).__name__) 'ndarray' # cupy.ndarray or 'array' for mlx
Notes
If the input is already a GPU array, it is returned as-is (or converted to the requested dtype).
- pytcl.gpu.utils.to_cpu(arr)[source]
Transfer an array from GPU to CPU memory.
- Parameters:
arr (array_like, cupy.ndarray, or mlx.array) – Input array (numpy, cupy, or mlx).
- Returns:
Array in CPU memory.
- Return type:
Examples
>>> import numpy as np >>> from pytcl.gpu.utils import to_gpu, to_cpu, is_gpu_available >>> x = np.array([1.0, 2.0, 3.0]) >>> if is_gpu_available(): ... x_gpu = to_gpu(x) ... x_cpu = to_cpu(x_gpu) ... np.allclose(x, x_cpu) True
Notes
If the input is already a numpy array, it is returned as-is.
- pytcl.gpu.utils.get_array_module(arr)[source]
Get the array module (numpy, cupy, or mlx.core) for the given array.
This function enables writing code that works with numpy, cupy, and mlx arrays by returning the appropriate module.
- Parameters:
arr (array_like) – Input array (numpy, cupy, or mlx).
- Returns:
numpy, cupy, or mlx.core module, depending on the input array type.
- Return type:
module
Examples
>>> import numpy as np >>> from pytcl.gpu.utils import get_array_module >>> x = np.array([1, 2, 3]) >>> xp = get_array_module(x) >>> xp is np True
>>> # With CuPy array >>> import cupy as cp >>> x_gpu = cp.array([1, 2, 3]) >>> xp = get_array_module(x_gpu) >>> xp is cp True
>>> # With MLX array >>> import mlx.core as mx >>> x_mlx = mx.array([1, 2, 3]) >>> xp = get_array_module(x_mlx) >>> xp.__name__ 'mlx.core'
- pytcl.gpu.utils.ensure_gpu_array(arr, dtype=<class 'numpy.float64'>, backend=None)[source]
Ensure an array is on the GPU with the specified dtype.
- Parameters:
arr (array_like) – Input array.
dtype (dtype) – Desired data type.
backend (str, optional) – Specific backend to use (“mlx”, “cupy”). If None, auto-selects.
- Returns:
Array on GPU with specified dtype (cupy.ndarray or mlx.array).
- Return type:
GPUArray
Examples
>>> import numpy as np >>> from pytcl.gpu.utils import ensure_gpu_array, is_gpu_available >>> x = np.array([1, 2, 3]) >>> if is_gpu_available(): ... x_gpu = ensure_gpu_array(x, dtype=np.float32) ... print(x_gpu.dtype)
Memory Management
- pytcl.gpu.utils.sync_gpu()[source]
Synchronize GPU operations.
This blocks until all pending GPU operations are complete. Useful for accurate timing measurements.
Examples
>>> import time >>> from pytcl.gpu.utils import sync_gpu, is_gpu_available >>> if is_gpu_available(): ... # ... perform GPU operations ... ... sync_gpu() # Wait for completion ... elapsed = time.time() - start
- pytcl.gpu.utils.get_gpu_memory_info()[source]
Get GPU memory usage information.
- Returns:
Dictionary with keys: - ‘backend’: Backend in use (“mlx”, “cupy”, or “numpy”) - ‘free’: Free memory in bytes (if available) - ‘total’: Total memory in bytes (if available) - ‘used’: Used memory in bytes (if available)
- Return type:
Examples
>>> from pytcl.gpu.utils import get_gpu_memory_info, is_gpu_available >>> if is_gpu_available(): ... info = get_gpu_memory_info() ... print(f"Backend: {info['backend']}")
- pytcl.gpu.utils.clear_gpu_memory()[source]
Clear GPU memory pools.
This frees cached memory blocks held by the GPU backend. Call this when you need to free GPU memory for other operations.
Examples
>>> from pytcl.gpu.utils import clear_gpu_memory, is_gpu_available >>> if is_gpu_available(): ... # ... perform GPU operations ... ... clear_gpu_memory() # Free cached memory
Batch Kalman Filter
GPU-accelerated batch Kalman filter operations for processing multiple tracks in parallel. These functions provide 5-10x speedup compared to sequential CPU processing.
- pytcl.gpu.kalman.batch_kf_predict(x, P, F, Q, B=None, u=None)[source]
Batch Kalman filter prediction for multiple tracks.
- Performs the prediction step for N tracks in parallel on GPU:
x_pred[i] = F @ x[i] + B @ u[i] (if B, u provided) P_pred[i] = F @ P[i] @ F’ + Q
- Parameters:
x (array_like) – Current state estimates, shape (n_tracks, state_dim).
P (array_like) – Current covariances, shape (n_tracks, state_dim, state_dim).
F (array_like) – State transition matrix, shape (state_dim, state_dim). Can also be (n_tracks, state_dim, state_dim) for track-specific matrices.
Q (array_like) – Process noise covariance, shape (state_dim, state_dim). Can also be (n_tracks, state_dim, state_dim) for track-specific noise.
B (array_like, optional) – Control input matrix, shape (state_dim, control_dim).
u (array_like, optional) – Control inputs, shape (n_tracks, control_dim).
- Returns:
result – Named tuple with predicted states and covariances.
- Return type:
BatchKalmanPrediction
Examples
>>> import numpy as np >>> from pytcl.gpu.kalman import batch_kf_predict >>> n_tracks = 100 >>> x = np.random.randn(n_tracks, 4) >>> P = np.tile(np.eye(4) * 0.1, (n_tracks, 1, 1)) >>> F = np.array([[1, 1, 0, 0], [0, 1, 0, 0], ... [0, 0, 1, 1], [0, 0, 0, 1]]) >>> Q = np.eye(4) * 0.01 >>> pred = batch_kf_predict(x, P, F, Q) >>> pred.x.shape (100, 4)
- pytcl.gpu.kalman.batch_kf_update(x, P, z, H, R)[source]
Batch Kalman filter update for multiple tracks.
- Performs the update step for N tracks in parallel on GPU:
y[i] = z[i] - H @ x[i] (innovation) S[i] = H @ P[i] @ H’ + R (innovation covariance) K[i] = P[i] @ H’ @ S[i]^{-1} (Kalman gain) x_upd[i] = x[i] + K[i] @ y[i] (updated state) P_upd[i] = (I - K[i] @ H) @ P[i] (updated covariance)
- Parameters:
x (array_like) – Predicted state estimates, shape (n_tracks, state_dim).
P (array_like) – Predicted covariances, shape (n_tracks, state_dim, state_dim).
z (array_like) – Measurements, shape (n_tracks, meas_dim).
H (array_like) – Measurement matrix, shape (meas_dim, state_dim). Can also be (n_tracks, meas_dim, state_dim).
R (array_like) – Measurement noise covariance, shape (meas_dim, meas_dim). Can also be (n_tracks, meas_dim, meas_dim).
- Returns:
result – Named tuple with updated states, covariances, and statistics.
- Return type:
BatchKalmanUpdate
Examples
>>> import numpy as np >>> from pytcl.gpu.kalman import batch_kf_update >>> n_tracks = 100 >>> x = np.random.randn(n_tracks, 4) >>> P = np.tile(np.eye(4) * 0.1, (n_tracks, 1, 1)) >>> z = np.random.randn(n_tracks, 2) # position measurements >>> H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]]) >>> R = np.eye(2) * 0.5 >>> upd = batch_kf_update(x, P, z, H, R) >>> upd.x.shape (100, 4)
- class pytcl.gpu.kalman.CuPyKalmanFilter(state_dim, meas_dim, F=None, H=None, Q=None, R=None)[source]
GPU-accelerated Linear Kalman Filter for batch processing.
This class provides a stateful interface for processing multiple tracks in parallel on the GPU. It maintains the filter matrices and provides methods for prediction and update.
- Parameters:
state_dim (int) – Dimension of the state vector.
meas_dim (int) – Dimension of the measurement vector.
F (array_like, optional) – State transition matrix. If None, uses identity.
H (array_like, optional) – Measurement matrix. If None, measures first meas_dim states.
Q (array_like, optional) – Process noise covariance. If None, uses 0.01 * I.
R (array_like, optional) – Measurement noise covariance. If None, uses 1.0 * I.
Examples
>>> import numpy as np >>> from pytcl.gpu.kalman import CuPyKalmanFilter >>> >>> # Create filter for 2D constant velocity model >>> kf = CuPyKalmanFilter( ... state_dim=4, # [x, vx, y, vy] ... meas_dim=2, # [x, y] ... F=np.array([[1, 1, 0, 0], [0, 1, 0, 0], ... [0, 0, 1, 1], [0, 0, 0, 1]]), ... H=np.array([[1, 0, 0, 0], [0, 0, 1, 0]]), ... Q=np.eye(4) * 0.1, ... R=np.eye(2) * 1.0, ... ) >>> >>> # Process batch of tracks >>> n_tracks = 1000 >>> x = np.random.randn(n_tracks, 4) >>> P = np.tile(np.eye(4), (n_tracks, 1, 1)) >>> z = np.random.randn(n_tracks, 2) >>> >>> # Predict and update >>> x_pred, P_pred = kf.predict(x, P) >>> result = kf.update(x_pred, P_pred, z)
- predict(x, P, B=None, u=None)[source]
Perform batch prediction.
- Parameters:
x (array_like) – State estimates, shape (n_tracks, state_dim).
P (array_like) – Covariances, shape (n_tracks, state_dim, state_dim).
B (array_like, optional) – Control input matrix.
u (array_like, optional) – Control inputs.
- Returns:
x_pred (ndarray) – Predicted states.
P_pred (ndarray) – Predicted covariances.
- Return type:
Tuple[ndarray[tuple[Any, …], dtype[floating]], ndarray[tuple[Any, …], dtype[floating]]]
- update(x, P, z)[source]
Perform batch update.
- Parameters:
x (array_like) – Predicted state estimates.
P (array_like) – Predicted covariances.
z (array_like) – Measurements.
- Returns:
result – Update results including states, covariances, and statistics.
- Return type:
BatchKalmanUpdate
- predict_update(x, P, z, B=None, u=None)[source]
Combined batch prediction and update.
- Parameters:
x (array_like) – Current state estimates.
P (array_like) – Current covariances.
z (array_like) – Measurements.
B (array_like, optional) – Control input matrix.
u (array_like, optional) – Control inputs.
- Returns:
result – Update results.
- Return type:
BatchKalmanUpdate
Batch Extended Kalman Filter
GPU-accelerated Extended Kalman Filter for nonlinear dynamics.
- pytcl.gpu.ekf.batch_ekf_predict(x, P, f, F_jacobian, Q)[source]
Batch EKF prediction for multiple tracks.
- Parameters:
x (array_like) – Current state estimates, shape (n_tracks, state_dim).
P (array_like) – Current covariances, shape (n_tracks, state_dim, state_dim).
f (callable) – Nonlinear dynamics function f(x) -> x_next. Applied to each track’s state vector.
F_jacobian (callable or None) – Jacobian of dynamics df/dx. If None, computed numerically.
Q (array_like) – Process noise covariance, shape (state_dim, state_dim) or (n_tracks, state_dim, state_dim).
- Returns:
result – Predicted states and covariances.
- Return type:
BatchEKFPrediction
Examples
>>> import numpy as np >>> from pytcl.gpu.ekf import batch_ekf_predict >>> # Nonlinear dynamics: coordinated turn >>> def f_turn(x): ... w = 0.01 # Turn rate ... return np.array([x[0] + np.cos(w)*x[2], ... x[1] + np.sin(w)*x[3], ... x[2], x[3]]) >>> def F_jacobian(x): ... w = 0.01 ... return np.array([[1, 0, np.cos(w), 0], ... [0, 1, np.sin(w), 0], ... [0, 0, 1, 0], ... [0, 0, 0, 1]]) >>> n_tracks = 30 >>> x = np.random.randn(n_tracks, 4) * 0.1 >>> P = np.tile(np.eye(4) * 0.01, (n_tracks, 1, 1)) >>> Q = np.eye(4) * 0.001 >>> result = batch_ekf_predict(x, P, f_turn, F_jacobian, Q) >>> result.x.shape (30, 4)
Notes
The nonlinear dynamics are applied on CPU (Python function), then covariance propagation is performed on GPU. This is efficient when the number of tracks is large relative to the cost of the dynamics.
- pytcl.gpu.ekf.batch_ekf_update(x, P, z, h, H_jacobian, R)[source]
Batch EKF update for multiple tracks.
- Parameters:
x (array_like) – Predicted state estimates, shape (n_tracks, state_dim).
P (array_like) – Predicted covariances, shape (n_tracks, state_dim, state_dim).
z (array_like) – Measurements, shape (n_tracks, meas_dim).
h (callable) – Nonlinear measurement function h(x) -> z_predicted.
H_jacobian (callable or None) – Jacobian of measurement function dh/dx. If None, computed numerically.
R (array_like) – Measurement noise covariance.
- Returns:
result – Update results including states, covariances, and statistics.
- Return type:
BatchEKFUpdate
Examples
>>> import numpy as np >>> from pytcl.gpu.ekf import batch_ekf_update >>> # Polar measurement from Cartesian state >>> def h_polar(x): ... r = np.sqrt(x[0]**2 + x[1]**2) ... theta = np.arctan2(x[1], x[0]) ... return np.array([r, theta]) >>> def H_jacobian(x): ... r = np.sqrt(x[0]**2 + x[1]**2) ... return np.array([[x[0]/r, x[1]/r], ... [-x[1]/r**2, x[0]/r**2]]) >>> n_tracks = 20 >>> x = np.random.randn(n_tracks, 2) >>> P = np.tile(np.eye(2), (n_tracks, 1, 1)) >>> z = np.random.randn(n_tracks, 2) * [100, 0.1] # r, theta >>> R = np.diag([10.0, 0.01]) >>> result = batch_ekf_update(x, P, z, h_polar, H_jacobian, R) >>> result.x.shape (20, 2)
- class pytcl.gpu.ekf.CuPyExtendedKalmanFilter(state_dim, meas_dim, f, h, F_jacobian=None, H_jacobian=None, Q=None, R=None)[source]
GPU-accelerated Extended Kalman Filter for batch processing.
- Parameters:
state_dim (int) – Dimension of state vector.
meas_dim (int) – Dimension of measurement vector.
f (callable) – Nonlinear dynamics function f(x) -> x_next.
h (callable) – Nonlinear measurement function h(x) -> z.
F_jacobian (callable, optional) – Jacobian of dynamics. If None, computed numerically.
H_jacobian (callable, optional) – Jacobian of measurement. If None, computed numerically.
Q (array_like, optional) – Process noise covariance.
R (array_like, optional) – Measurement noise covariance.
Examples
>>> import numpy as np >>> from pytcl.gpu.ekf import CuPyExtendedKalmanFilter >>> >>> # Nonlinear dynamics >>> def f(x): ... return np.array([x[0] + x[1], x[1] * 0.99]) >>> >>> def h(x): ... return np.array([np.sqrt(x[0]**2 + x[1]**2)]) >>> >>> ekf = CuPyExtendedKalmanFilter( ... state_dim=2, meas_dim=1, ... f=f, h=h, ... Q=np.eye(2) * 0.01, ... R=np.array([[0.1]]), ... )
Batch Unscented Kalman Filter
GPU-accelerated Unscented Kalman Filter for highly nonlinear systems.
- pytcl.gpu.ukf.batch_ukf_predict(x, P, f, Q, alpha=0.001, beta=2.0, kappa=0.0)[source]
Batch UKF prediction for multiple tracks.
- Parameters:
x (array_like) – Current state estimates, shape (n_tracks, state_dim).
P (array_like) – Current covariances, shape (n_tracks, state_dim, state_dim).
f (callable) – Nonlinear dynamics function f(x) -> x_next.
Q (array_like) – Process noise covariance.
alpha (float) – Sigma point parameters.
beta (float) – Sigma point parameters.
kappa (float) – Sigma point parameters.
- Returns:
result – Predicted states and covariances.
- Return type:
BatchUKFPrediction
Examples
>>> import numpy as np >>> from pytcl.gpu.ukf import batch_ukf_predict >>> # Nonlinear dynamics example >>> def f_dynamics(x): ... return np.array([x[0] + 0.1*x[1], x[1] * 0.99]) >>> n_tracks = 50 >>> x = np.random.randn(n_tracks, 2) >>> P = np.tile(np.eye(2) * 0.01, (n_tracks, 1, 1)) >>> Q = np.eye(2) * 0.001 >>> result = batch_ukf_predict(x, P, f_dynamics, Q) >>> result.x.shape (50, 2)
- pytcl.gpu.ukf.batch_ukf_update(x, P, z, h, R, alpha=0.001, beta=2.0, kappa=0.0)[source]
Batch UKF update for multiple tracks.
- Parameters:
x (array_like) – Predicted state estimates, shape (n_tracks, state_dim).
P (array_like) – Predicted covariances, shape (n_tracks, state_dim, state_dim).
z (array_like) – Measurements, shape (n_tracks, meas_dim).
h (callable) – Nonlinear measurement function h(x) -> z.
R (array_like) – Measurement noise covariance.
alpha (float) – Sigma point parameters.
beta (float) – Sigma point parameters.
kappa (float) – Sigma point parameters.
- Returns:
result – Update results.
- Return type:
BatchUKFUpdate
Examples
>>> import numpy as np >>> from pytcl.gpu.ukf import batch_ukf_update >>> # Nonlinear measurement example >>> def h_measurement(x): ... return np.sqrt(x[0]**2 + x[1]**2) # Range-only >>> n_tracks = 40 >>> x = np.random.randn(n_tracks, 2) >>> P = np.tile(np.eye(2), (n_tracks, 1, 1)) >>> z = np.random.randn(n_tracks, 1) * 10 + 100 >>> R = np.array([[1.0]]) >>> result = batch_ukf_update(x, P, z, h_measurement, R) >>> result.x.shape (40, 2)
- class pytcl.gpu.ukf.CuPyUnscentedKalmanFilter(state_dim, meas_dim, f, h, Q=None, R=None, alpha=0.001, beta=2.0, kappa=0.0)[source]
GPU-accelerated Unscented Kalman Filter for batch processing.
- Parameters:
state_dim (int) – Dimension of state vector.
meas_dim (int) – Dimension of measurement vector.
f (callable) – Nonlinear dynamics function.
h (callable) – Nonlinear measurement function.
Q (array_like, optional) – Process noise covariance.
R (array_like, optional) – Measurement noise covariance.
alpha (float) – Spread of sigma points (default 1e-3).
beta (float) – Prior knowledge parameter (default 2.0).
kappa (float) – Secondary scaling (default 0.0).
Examples
>>> import numpy as np >>> from pytcl.gpu.ukf import CuPyUnscentedKalmanFilter >>> >>> def f(x): ... return np.array([x[0] + x[1], x[1]]) >>> >>> def h(x): ... return np.array([np.sqrt(x[0]**2 + x[1]**2)]) >>> >>> ukf = CuPyUnscentedKalmanFilter( ... state_dim=2, meas_dim=1, ... f=f, h=h, ... )
GPU Particle Filter
GPU-accelerated particle filtering with efficient resampling algorithms.
- pytcl.gpu.particle_filter.gpu_resample_systematic(weights)[source]
GPU-accelerated systematic resampling.
Systematic resampling uses a single random number to select particles, resulting in low variance and O(N) complexity.
- Parameters:
weights (array_like) – Normalized particle weights, shape (n_particles,).
- Returns:
indices – Resampled particle indices, shape (n_particles,).
- Return type:
ndarray
Examples
>>> import numpy as np >>> from pytcl.gpu.particle_filter import gpu_resample_systematic >>> weights = np.array([0.1, 0.3, 0.4, 0.2]) >>> indices = gpu_resample_systematic(weights) >>> # Particles 1 and 2 will be selected more often
- pytcl.gpu.particle_filter.gpu_resample_multinomial(weights)[source]
GPU-accelerated multinomial resampling.
Multinomial resampling samples particles independently according to their weights.
- Parameters:
weights (array_like) – Normalized particle weights, shape (n_particles,).
- Returns:
indices – Resampled particle indices, shape (n_particles,).
- Return type:
ndarray
Examples
>>> import numpy as np >>> from pytcl.gpu.particle_filter import gpu_resample_multinomial >>> weights = np.array([0.1, 0.4, 0.5]) >>> indices = gpu_resample_multinomial(weights) >>> indices.shape (3,) >>> np.all(indices < 3) True
Notes
Multinomial resampling has higher variance than systematic resampling but is simpler and can be more efficient on GPU for certain sizes.
- pytcl.gpu.particle_filter.gpu_resample_stratified(weights)[source]
GPU-accelerated stratified resampling.
Stratified resampling divides the CDF into N equal strata and samples one particle from each stratum.
- Parameters:
weights (array_like) – Normalized particle weights, shape (n_particles,).
- Returns:
indices – Resampled particle indices, shape (n_particles,).
- Return type:
ndarray
- pytcl.gpu.particle_filter.gpu_effective_sample_size(weights)[source]
Compute effective sample size on GPU.
ESS = 1 / sum(w_i^2)
- Parameters:
weights (array_like) – Normalized particle weights.
- Returns:
ess – Effective sample size.
- Return type:
Examples
>>> import numpy as np >>> from pytcl.gpu.particle_filter import gpu_effective_sample_size >>> weights = np.array([0.1, 0.2, 0.3, 0.4]) >>> ess = gpu_effective_sample_size(weights) >>> ess > 0 True >>> ess <= len(weights) True
- pytcl.gpu.particle_filter.gpu_normalize_weights(log_weights)[source]
Normalize log weights to proper weights on GPU.
Uses log-sum-exp trick for numerical stability.
- Parameters:
log_weights (array_like) – Unnormalized log weights, shape (n_particles,).
- Returns:
weights (ndarray) – Normalized weights, shape (n_particles,).
log_likelihood (float) – Log of the normalization constant.
- Return type:
Examples
>>> import numpy as np >>> from pytcl.gpu.particle_filter import gpu_normalize_weights >>> log_w = np.array([-1.0, -0.5, -2.0]) >>> weights, log_likelihood = gpu_normalize_weights(log_w) >>> np.allclose(weights.sum(), 1.0) True >>> log_likelihood < 0 True
- class pytcl.gpu.particle_filter.CuPyParticleFilter(n_particles, state_dim, resample_method='systematic', resample_threshold=0.5)[source]
GPU-accelerated Bootstrap Particle Filter.
This class implements the Sequential Importance Resampling (SIR) particle filter with GPU acceleration.
- Parameters:
- particles
Current particle states, shape (n_particles, state_dim).
- Type:
cupy.ndarray
- weights
Current particle weights, shape (n_particles,).
- Type:
cupy.ndarray
Examples
>>> import numpy as np >>> from pytcl.gpu.particle_filter import CuPyParticleFilter >>> >>> # Initialize filter >>> pf = CuPyParticleFilter(n_particles=10000, state_dim=4) >>> pf.initialize(initial_state, initial_cov) >>> >>> # Run filter >>> for measurement in measurements: ... pf.predict(dynamics_fn) ... pf.update(measurement, likelihood_fn) ... state_estimate = pf.get_estimate()
- initialize(mean, cov)[source]
Initialize particles from Gaussian distribution.
- Parameters:
mean (array_like) – Mean state, shape (state_dim,).
cov (array_like) – Covariance matrix, shape (state_dim, state_dim).
- initialize_uniform(low, high)[source]
Initialize particles from uniform distribution.
- Parameters:
low (array_like) – Lower bounds, shape (state_dim,).
high (array_like) – Upper bounds, shape (state_dim,).
- predict(dynamics_fn, *args, **kwargs)[source]
Propagate particles through dynamics.
- Parameters:
Notes
The dynamics function receives CuPy arrays if GPU is available. It should return arrays of the same type.
- update(measurement, likelihood_fn)[source]
Update weights based on measurement likelihood.
- Parameters:
measurement (array_like) – Measurement vector.
likelihood_fn (callable) – Function that computes likelihood for each particle. Takes (particles, measurement) and returns likelihoods (n_particles,).
- Returns:
log_likelihood – Log of the marginal likelihood (normalization constant).
- Return type:
- get_estimate()[source]
Compute weighted mean estimate.
- Returns:
estimate – Weighted mean state, shape (state_dim,).
- Return type:
ndarray
- get_covariance()[source]
Compute weighted covariance estimate.
- Returns:
cov – Weighted covariance, shape (state_dim, state_dim).
- Return type:
ndarray
GPU Matrix Utilities
GPU-accelerated matrix operations commonly used in tracking algorithms.
- pytcl.gpu.matrix_utils.gpu_cholesky(A, lower=True)[source]
GPU-accelerated Cholesky decomposition.
Computes L such that A = L @ L.T (lower=True) or A = U.T @ U (lower=False).
- Parameters:
A (array_like) – Symmetric positive definite matrix, shape (n, n) or batch (k, n, n).
lower (bool) – If True, return lower triangular. If False, return upper triangular.
- Returns:
L – Cholesky factor, same shape as A.
- Return type:
ndarray
- Raises:
numpy.linalg.LinAlgError – If matrix is not positive definite.
Examples
>>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_cholesky >>> A = np.array([[4, 2], [2, 3]]) >>> L = gpu_cholesky(A) >>> np.allclose(L @ L.T, A) True
- pytcl.gpu.matrix_utils.gpu_cholesky_safe(A, lower=True, regularization=1e-10)[source]
GPU Cholesky decomposition with fallback for non-positive-definite matrices.
If standard Cholesky fails, adds regularization to diagonal and retries.
- Parameters:
- Returns:
L (ndarray) – Cholesky factor.
success (bool) – True if succeeded without regularization.
- Return type:
Examples
>>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_cholesky_safe >>> A = np.array([[1, 2], [2, 1]]) # Not positive definite >>> L, success = gpu_cholesky_safe(A) >>> success False
- pytcl.gpu.matrix_utils.gpu_qr(A, mode='reduced')[source]
GPU-accelerated QR decomposition.
Computes A = Q @ R where Q is orthogonal and R is upper triangular.
- Parameters:
A (array_like) – Matrix to decompose, shape (m, n) or batch (k, m, n).
mode (str) – ‘reduced’ (default) or ‘complete’.
- Returns:
Q (ndarray) – Orthogonal matrix.
R (ndarray) – Upper triangular matrix.
- Return type:
Tuple[ndarray[tuple[Any, …], dtype[floating[Any]]], ndarray[tuple[Any, …], dtype[floating[Any]]]]
Examples
>>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_qr >>> A = np.random.randn(4, 3) >>> Q, R = gpu_qr(A) >>> np.allclose(Q @ R, A) True
- pytcl.gpu.matrix_utils.gpu_solve(A, b)[source]
GPU-accelerated linear system solve.
Solves A @ x = b for x.
- Parameters:
A (array_like) – Coefficient matrix, shape (n, n) or batch (k, n, n).
b (array_like) – Right-hand side, shape (n,) or (n, m) or batch (k, n).
- Returns:
x – Solution vector/matrix.
- Return type:
ndarray
Examples
>>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_solve >>> A = np.array([[3, 1], [1, 2]]) >>> b = np.array([9, 8]) >>> x = gpu_solve(A, b) >>> np.allclose(A @ x, b) True
- pytcl.gpu.matrix_utils.gpu_inv(A)[source]
GPU-accelerated matrix inversion.
- Parameters:
A (array_like) – Matrix to invert, shape (n, n) or batch (k, n, n).
- Returns:
A_inv – Inverse matrix.
- Return type:
ndarray
Examples
>>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_inv >>> A = np.array([[1, 2], [3, 4]]) >>> A_inv = gpu_inv(A) >>> np.allclose(A @ A_inv, np.eye(2)) True
- pytcl.gpu.matrix_utils.gpu_eigh(A)[source]
GPU-accelerated eigendecomposition for symmetric matrices.
Computes eigenvalues and eigenvectors of symmetric matrix A.
- Parameters:
A (array_like) – Symmetric matrix, shape (n, n) or batch (k, n, n).
- Returns:
eigenvalues (ndarray) – Eigenvalues in ascending order.
eigenvectors (ndarray) – Corresponding eigenvectors as columns.
- Return type:
Tuple[ndarray[tuple[Any, …], dtype[floating[Any]]], ndarray[tuple[Any, …], dtype[floating[Any]]]]
Examples
>>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_eigh >>> A = np.array([[2, 1], [1, 2]]) >>> eigvals, eigvecs = gpu_eigh(A) >>> eigvals array([1., 3.])
- pytcl.gpu.matrix_utils.gpu_matrix_sqrt(A)[source]
GPU-accelerated matrix square root for positive definite matrices.
Computes S such that S @ S = A using eigendecomposition.
- Parameters:
A (array_like) – Symmetric positive definite matrix.
- Returns:
S – Matrix square root.
- Return type:
ndarray
Examples
>>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_matrix_sqrt >>> A = np.array([[4, 0], [0, 9]]) >>> S = gpu_matrix_sqrt(A) >>> np.allclose(S @ S, A) True
- class pytcl.gpu.matrix_utils.MemoryPool[source]
GPU memory pool manager for efficient memory allocation.
This class provides convenient access to CuPy’s memory pool with additional monitoring and management utilities.
Examples
>>> from pytcl.gpu.matrix_utils import MemoryPool >>> pool = MemoryPool() >>> print(pool.get_stats()) {'used': 0, 'total': 0, 'free': ...} >>> >>> # Allocate some arrays >>> import cupy as cp >>> x = cp.zeros((1000, 1000)) >>> print(pool.get_stats()) {'used': 8000000, ...} >>> >>> # Free cached memory >>> pool.free_all()
- get_stats()[source]
Get memory pool statistics.
- Returns:
stats – Dictionary with ‘used’, ‘total’, and ‘free’ bytes.
- Return type:
Examples
>>> from pytcl.gpu.matrix_utils import get_memory_pool >>> pool = get_memory_pool() >>> stats = pool.get_stats() >>> stats.keys() dict_keys(['used', 'total', 'free', 'device_total']) >>> stats['used'] >= 0 True
- free_all()[source]
Free all cached memory blocks.
Clears the memory pool cache, which can help free up GPU memory when operations are complete.
Examples
>>> from pytcl.gpu.matrix_utils import get_memory_pool >>> pool = get_memory_pool() >>> # After allocations >>> pool.free_all() # Clear cached blocks
- set_limit(limit=None)[source]
Set memory pool limit.
- Parameters:
limit (int or None) – Maximum bytes to allocate. None for no limit.
Examples
>>> from pytcl.gpu.matrix_utils import get_memory_pool >>> pool = get_memory_pool() >>> # Limit to 2 GB >>> pool.set_limit(2 * 1024**3) >>> # Reset to unlimited >>> pool.set_limit(None)
Example: Batch Track Processing
Process multiple tracks in parallel using GPU acceleration:
import numpy as np
from pytcl.gpu import (
is_gpu_available,
to_gpu,
to_cpu,
batch_kf_predict,
batch_kf_update,
)
if not is_gpu_available():
raise RuntimeError("GPU not available")
# Simulate 1000 tracks with 4D state (x, vx, y, vy)
n_tracks = 1000
state_dim = 4
meas_dim = 2
# Initial states and covariances
x = np.random.randn(n_tracks, state_dim)
P = np.tile(np.eye(state_dim), (n_tracks, 1, 1))
# System matrices
dt = 0.1
F = np.array([
[1, dt, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, dt],
[0, 0, 0, 1]
])
Q = np.eye(state_dim) * 0.1
H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
R = np.eye(meas_dim) * 0.5
# Transfer to GPU
x_gpu = to_gpu(x)
P_gpu = to_gpu(P)
# Batch predict (all 1000 tracks at once!)
pred_result = batch_kf_predict(x_gpu, P_gpu, F, Q)
# Generate measurements
z = np.random.randn(n_tracks, meas_dim)
# Batch update
upd_result = batch_kf_update(
pred_result.x, pred_result.P, z, H, R
)
# Transfer results back to CPU
x_updated = to_cpu(upd_result.x)
P_updated = to_cpu(upd_result.P)
print(f"Processed {n_tracks} tracks in batch")
Performance Notes
The GPU implementations achieve significant speedups for:
Large batch sizes: Processing 100+ tracks simultaneously
Large particle counts: Particle filters with 1000+ particles
Matrix operations: Cholesky, QR, and eigendecompositions
For small batch sizes (< 10 tracks), CPU implementations may be faster due to GPU transfer overhead.
Backend Differences
CuPy (NVIDIA CUDA): - Full float64 (double precision) support - Explicit memory pool management - CUDA stream synchronization
MLX (Apple Silicon):
- Optimized for float32 (single precision)
- Automatic memory management
- Lazy evaluation with explicit sync via mx.eval()