Source code for pytcl.gpu.matrix_utils

"""
GPU matrix utilities for numerical linear algebra.

This module provides GPU-accelerated matrix operations commonly used in
tracking algorithms, including:
- Cholesky decomposition
- QR factorization
- Matrix inversion and solving
- Memory pool management

Examples
--------
>>> from pytcl.gpu.matrix_utils import gpu_cholesky, gpu_solve
>>> import numpy as np
>>>
>>> # Compute Cholesky decomposition on GPU
>>> A = np.eye(4) + np.random.randn(4, 4) * 0.1
>>> A = A @ A.T  # Make positive definite
>>> L = gpu_cholesky(A)
>>>
>>> # Solve linear system
>>> b = np.random.randn(4)
>>> x = gpu_solve(A, b)
"""

import logging
from contextlib import contextmanager
from typing import Any, Generator, Optional, Tuple

import numpy as np
from numpy.typing import ArrayLike, NDArray

from pytcl.core.optional_deps import import_optional, is_available, requires
from pytcl.gpu.utils import ensure_gpu_array

# Module logger
_logger = logging.getLogger("pytcl.gpu.matrix_utils")


[docs] @requires("cupy", extra="gpu", feature="GPU matrix utilities") def gpu_cholesky(A: ArrayLike, lower: bool = True) -> NDArray[np.floating[Any]]: """ GPU-accelerated Cholesky decomposition. Computes L such that A = L @ L.T (lower=True) or A = U.T @ U (lower=False). Parameters ---------- A : array_like Symmetric positive definite matrix, shape (n, n) or batch (k, n, n). lower : bool If True, return lower triangular. If False, return upper triangular. Returns ------- L : ndarray Cholesky factor, same shape as A. Raises ------ numpy.linalg.LinAlgError If matrix is not positive definite. Examples -------- >>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_cholesky >>> A = np.array([[4, 2], [2, 3]]) >>> L = gpu_cholesky(A) >>> np.allclose(L @ L.T, A) True """ cp = import_optional("cupy", extra="gpu", feature="GPU matrix utilities") A_gpu = ensure_gpu_array(A, dtype=cp.float64) L = cp.linalg.cholesky(A_gpu) if not lower: if A_gpu.ndim == 2: L = L.T else: L = cp.swapaxes(L, -2, -1) return L
[docs] @requires("cupy", extra="gpu", feature="GPU matrix utilities") def gpu_cholesky_safe( A: ArrayLike, lower: bool = True, regularization: float = 1e-10, ) -> Tuple[NDArray[np.floating[Any]], bool]: """ GPU Cholesky decomposition with fallback for non-positive-definite matrices. If standard Cholesky fails, adds regularization to diagonal and retries. Parameters ---------- A : array_like Symmetric matrix, shape (n, n) or batch (k, n, n). lower : bool Return lower (True) or upper (False) triangular factor. regularization : float Amount to add to diagonal if matrix is not positive definite. Returns ------- L : ndarray Cholesky factor. success : bool True if succeeded without regularization. Examples -------- >>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_cholesky_safe >>> A = np.array([[1, 2], [2, 1]]) # Not positive definite >>> L, success = gpu_cholesky_safe(A) >>> success False """ cp = import_optional("cupy", extra="gpu", feature="GPU matrix utilities") A_gpu = ensure_gpu_array(A, dtype=cp.float64) try: L = cp.linalg.cholesky(A_gpu) success = True except cp.linalg.LinAlgError: # Add regularization if A_gpu.ndim == 2: A_reg = A_gpu + regularization * cp.eye(A_gpu.shape[0], dtype=cp.float64) else: # Batch case n = A_gpu.shape[-1] eye = cp.eye(n, dtype=cp.float64) A_reg = A_gpu + regularization * eye L = cp.linalg.cholesky(A_reg) success = False _logger.warning("Cholesky decomposition required regularization") if not lower: if A_gpu.ndim == 2: L = L.T else: L = cp.swapaxes(L, -2, -1) return L, success
[docs] @requires("cupy", extra="gpu", feature="GPU matrix utilities") def gpu_qr( A: ArrayLike, mode: str = "reduced" ) -> Tuple[NDArray[np.floating[Any]], NDArray[np.floating[Any]]]: """ GPU-accelerated QR decomposition. Computes A = Q @ R where Q is orthogonal and R is upper triangular. Parameters ---------- A : array_like Matrix to decompose, shape (m, n) or batch (k, m, n). mode : str 'reduced' (default) or 'complete'. Returns ------- Q : ndarray Orthogonal matrix. R : ndarray Upper triangular matrix. Examples -------- >>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_qr >>> A = np.random.randn(4, 3) >>> Q, R = gpu_qr(A) >>> np.allclose(Q @ R, A) True """ cp = import_optional("cupy", extra="gpu", feature="GPU matrix utilities") A_gpu = ensure_gpu_array(A, dtype=cp.float64) Q, R = cp.linalg.qr(A_gpu, mode=mode) return Q, R
[docs] @requires("cupy", extra="gpu", feature="GPU matrix utilities") def gpu_solve(A: ArrayLike, b: ArrayLike) -> NDArray[np.floating[Any]]: """ GPU-accelerated linear system solve. Solves A @ x = b for x. Parameters ---------- A : array_like Coefficient matrix, shape (n, n) or batch (k, n, n). b : array_like Right-hand side, shape (n,) or (n, m) or batch (k, n). Returns ------- x : ndarray Solution vector/matrix. Examples -------- >>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_solve >>> A = np.array([[3, 1], [1, 2]]) >>> b = np.array([9, 8]) >>> x = gpu_solve(A, b) >>> np.allclose(A @ x, b) True """ cp = import_optional("cupy", extra="gpu", feature="GPU matrix utilities") A_gpu = ensure_gpu_array(A, dtype=cp.float64) b_gpu = ensure_gpu_array(b, dtype=cp.float64) x = cp.linalg.solve(A_gpu, b_gpu) return x
[docs] @requires("cupy", extra="gpu", feature="GPU matrix utilities") def gpu_inv(A: ArrayLike) -> NDArray[np.floating[Any]]: """ GPU-accelerated matrix inversion. Parameters ---------- A : array_like Matrix to invert, shape (n, n) or batch (k, n, n). Returns ------- A_inv : ndarray Inverse matrix. Examples -------- >>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_inv >>> A = np.array([[1, 2], [3, 4]]) >>> A_inv = gpu_inv(A) >>> np.allclose(A @ A_inv, np.eye(2)) True """ cp = import_optional("cupy", extra="gpu", feature="GPU matrix utilities") A_gpu = ensure_gpu_array(A, dtype=cp.float64) A_inv = cp.linalg.inv(A_gpu) return A_inv
[docs] @requires("cupy", extra="gpu", feature="GPU matrix utilities") def gpu_eigh( A: ArrayLike, ) -> Tuple[NDArray[np.floating[Any]], NDArray[np.floating[Any]]]: """ GPU-accelerated eigendecomposition for symmetric matrices. Computes eigenvalues and eigenvectors of symmetric matrix A. Parameters ---------- A : array_like Symmetric matrix, shape (n, n) or batch (k, n, n). Returns ------- eigenvalues : ndarray Eigenvalues in ascending order. eigenvectors : ndarray Corresponding eigenvectors as columns. Examples -------- >>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_eigh >>> A = np.array([[2, 1], [1, 2]]) >>> eigvals, eigvecs = gpu_eigh(A) >>> eigvals array([1., 3.]) """ cp = import_optional("cupy", extra="gpu", feature="GPU matrix utilities") A_gpu = ensure_gpu_array(A, dtype=cp.float64) eigvals, eigvecs = cp.linalg.eigh(A_gpu) return eigvals, eigvecs
[docs] @requires("cupy", extra="gpu", feature="GPU matrix utilities") def gpu_matrix_sqrt(A: ArrayLike) -> NDArray[np.floating[Any]]: """ GPU-accelerated matrix square root for positive definite matrices. Computes S such that S @ S = A using eigendecomposition. Parameters ---------- A : array_like Symmetric positive definite matrix. Returns ------- S : ndarray Matrix square root. Examples -------- >>> import numpy as np >>> from pytcl.gpu.matrix_utils import gpu_matrix_sqrt >>> A = np.array([[4, 0], [0, 9]]) >>> S = gpu_matrix_sqrt(A) >>> np.allclose(S @ S, A) True """ cp = import_optional("cupy", extra="gpu", feature="GPU matrix utilities") A_gpu = ensure_gpu_array(A, dtype=cp.float64) # Eigendecomposition eigvals, eigvecs = cp.linalg.eigh(A_gpu) # Ensure non-negative eigenvalues eigvals = cp.maximum(eigvals, 0) # Compute sqrt sqrt_eigvals = cp.sqrt(eigvals) # Reconstruct: S = V @ diag(sqrt(lambda)) @ V' if A_gpu.ndim == 2: S = eigvecs @ cp.diag(sqrt_eigvals) @ eigvecs.T else: # Batch case S = cp.einsum("...ij,...j,...kj->...ik", eigvecs, sqrt_eigvals, eigvecs) return S
[docs] class MemoryPool: """ GPU memory pool manager for efficient memory allocation. This class provides convenient access to CuPy's memory pool with additional monitoring and management utilities. Examples -------- >>> from pytcl.gpu.matrix_utils import MemoryPool >>> pool = MemoryPool() >>> print(pool.get_stats()) {'used': 0, 'total': 0, 'free': ...} >>> >>> # Allocate some arrays >>> import cupy as cp >>> x = cp.zeros((1000, 1000)) >>> print(pool.get_stats()) {'used': 8000000, ...} >>> >>> # Free cached memory >>> pool.free_all() """
[docs] def __init__(self) -> None: """Initialize memory pool manager.""" if not is_available("cupy"): _logger.warning("CuPy not available, MemoryPool is a no-op") self._pool = None self._pinned_pool = None else: import cupy as cp self._pool = cp.get_default_memory_pool() self._pinned_pool = cp.get_default_pinned_memory_pool()
[docs] def get_stats(self) -> dict[str, int]: """ Get memory pool statistics. Returns ------- stats : dict Dictionary with 'used', 'total', and 'free' bytes. Examples -------- >>> from pytcl.gpu.matrix_utils import get_memory_pool >>> pool = get_memory_pool() >>> stats = pool.get_stats() >>> stats.keys() dict_keys(['used', 'total', 'free', 'device_total']) >>> stats['used'] >= 0 True """ if self._pool is None: return {"used": 0, "total": 0, "free": 0} import cupy as cp free, total = cp.cuda.Device().mem_info return { "used": self._pool.used_bytes(), "total": self._pool.total_bytes(), "free": free, "device_total": total, }
[docs] def free_all(self) -> None: """ Free all cached memory blocks. Clears the memory pool cache, which can help free up GPU memory when operations are complete. Examples -------- >>> from pytcl.gpu.matrix_utils import get_memory_pool >>> pool = get_memory_pool() >>> # After allocations >>> pool.free_all() # Clear cached blocks """ if self._pool is not None: self._pool.free_all_blocks() if self._pinned_pool is not None: self._pinned_pool.free_all_blocks()
[docs] def set_limit(self, limit: Optional[int] = None) -> None: """ Set memory pool limit. Parameters ---------- limit : int or None Maximum bytes to allocate. None for no limit. Examples -------- >>> from pytcl.gpu.matrix_utils import get_memory_pool >>> pool = get_memory_pool() >>> # Limit to 2 GB >>> pool.set_limit(2 * 1024**3) >>> # Reset to unlimited >>> pool.set_limit(None) """ if self._pool is not None: if limit is None: self._pool.set_limit(size=0) # 0 means no limit else: self._pool.set_limit(size=limit)
[docs] @contextmanager def limit_memory(self, max_bytes: int) -> Generator[None, None, None]: """ Context manager for temporary memory limit. Parameters ---------- max_bytes : int Maximum bytes allowed during context. Examples -------- >>> pool = MemoryPool() >>> with pool.limit_memory(1e9): # 1GB limit ... # Operations here have limited memory ... pass """ if self._pool is None: yield return old_limit = self._pool.get_limit() self._pool.set_limit(size=max_bytes) try: yield finally: self._pool.set_limit(size=old_limit)
# Global memory pool instance _memory_pool: Optional[MemoryPool] = None def get_memory_pool() -> MemoryPool: """ Get the global GPU memory pool manager. Returns ------- pool : MemoryPool Global memory pool instance. Examples -------- >>> from pytcl.gpu.matrix_utils import get_memory_pool >>> pool = get_memory_pool() >>> # Get current memory stats >>> stats = pool.get_stats() >>> print(f"Used: {stats['used']} bytes") >>> # Set memory limit >>> pool.set_limit(1024**3) # 1 GB limit >>> # Free cached blocks >>> pool.free_all() """ global _memory_pool if _memory_pool is None: _memory_pool = MemoryPool() return _memory_pool __all__ = [ "gpu_cholesky", "gpu_cholesky_safe", "gpu_qr", "gpu_solve", "gpu_inv", "gpu_eigh", "gpu_matrix_sqrt", "MemoryPool", "get_memory_pool", ]