Custom Filter Implementation

Extend the Tracker Component Library with custom filtering algorithms. This guide covers designing, implementing, and integrating new filters while leveraging existing infrastructure for testing, logging, and integration.

Why Implement Custom Filters

Scenarios:

  1. Proprietary Algorithms: Company-specific filter designs not publicly available

  2. Research Extensions: Novel variants for academic work or specialized applications

  3. Legacy Integration: Wrapping existing MATLAB/C++ filter implementations

  4. Performance Optimization: Heavily optimized versions for specific hardware

  5. Domain-Specific Logic: Filters incorporating application-specific constraints

Benefits of integrating with TCL:

  • ✅ Leverage existing coordinate systems, dynamic models, data association

  • ✅ Use standard testing framework and diagnostics tools

  • ✅ Automatic documentation generation

  • ✅ Community review and potential contribution back to library

Design Patterns: Base Class Architecture

TCL uses abstract base classes to define filter interfaces. Understanding these patterns helps create compatible custom filters.

Filter Base Class Structure

Most filters in TCL follow this pattern:

from abc import ABC, abstractmethod
import numpy as np
from typing import Tuple, Optional, Dict, Any
from numpy.typing import NDArray

class BaseFilter(ABC):
    """
    Abstract base class for filtering algorithms.

    All filters should implement:
    - predict(): Propagate state forward
    - update(): Incorporate measurement
    - get_state(): Return current estimate
    """

    def __init__(self, state_dim: int, meas_dim: int):
        """
        Parameters
        ----------
        state_dim : int
            Dimension of state vector
        meas_dim : int
            Dimension of measurement vector
        """
        self.n = state_dim
        self.m = meas_dim

        # Subclasses should initialize:
        self.x: Optional[NDArray[np.floating[Any]]] = None  # State estimate
        self.P: Optional[NDArray[np.floating[Any]]] = None  # Covariance

    @abstractmethod
    def predict(self, **kwargs: Any) -> None:
        """
        Prediction step.

        Subclasses must implement. Typical signature:
            predict(F, Q, dt, ...)
        where F is state transition, Q is process noise covariance.
        """
        pass

    @abstractmethod
    def update(self, z: NDArray[np.floating[Any]],
              **kwargs: Any) -> None:
        """
        Measurement update step.

        Subclasses must implement. Typical signature:
            update(z, H, R, ...)
        where z is measurement, H is measurement matrix, R is noise covariance.
        """
        pass

    @abstractmethod
    def get_state(self) -> Tuple[NDArray[np.floating[Any]],
                                NDArray[np.floating[Any]]]:
        """
        Return current state estimate and covariance.

        Returns
        -------
        (x, P) tuple
            x: State vector (n,)
            P: Covariance matrix (n, n)
        """
        pass

    @property
    def state_dim(self) -> int:
        """State dimension."""
        return self.n

    @property
    def meas_dim(self) -> int:
        """Measurement dimension."""
        return self.m

Example 1: Custom Adaptive Constant Velocity Filter

Create a specialized filter for tracking objects moving at constant velocity with adaptive noise estimation.

import numpy as np
from scipy.linalg import cholesky
from typing import Dict, Any, Tuple, Optional
from numpy.typing import NDArray

class AdaptiveConstantVelocityFilter:
    """
    Specialized filter for constant velocity targets.

    Features:
    - Assumes constant velocity motion model
    - Adaptively estimates measurement noise
    - Detects target acceleration (maneuvers)
    - Simplified interface for CV-only scenarios
    """

    def __init__(self, pos_init: NDArray[np.floating[Any]],
                 vel_init: Optional[NDArray[np.floating[Any]]] = None,
                 P_pos: float = 1.0, P_vel: float = 0.1):
        """
        Parameters
        ----------
        pos_init : (d,) array
            Initial position (d = 2 or 3 for 2D/3D)
        vel_init : (d,) array, optional
            Initial velocity (assumed zero if not provided)
        P_pos : float
            Initial position uncertainty
        P_vel : float
            Initial velocity uncertainty
        """
        self.d = len(pos_init)  # Dimension (2D, 3D, etc.)
        self.n = 2 * self.d     # State: [pos, vel]

        # Initialize state [x, y, vx, vy] (in 2D)
        self.x = np.zeros(self.n)
        self.x[:self.d] = pos_init
        if vel_init is not None:
            self.x[self.d:] = vel_init

        # Initialize covariance
        self.P = np.diag(np.concatenate([
            np.full(self.d, P_pos),
            np.full(self.d, P_vel)
        ]))

        # Adaptive noise estimation
        self.R_est = 1.0  # Will be updated
        self.innovation_history = []
        self.forgetting_factor = 0.95

        # Maneuver detection
        self.maneuver_prob = 0.0
        self.Q_base = np.eye(self.n) * 0.01

        # Diagnostics
        self.update_count = 0

    def predict(self, dt: float) -> None:
        """
        Predict using constant velocity model.

        Parameters
        ----------
        dt : float
            Time step
        """
        # State transition matrix F = [[I, I*dt], [0, I]]
        F = np.eye(self.n)
        F[:self.d, self.d:] = np.eye(self.d) * dt

        # Predictor-corrector: adjust Q based on maneuver detection
        Q = self.Q_base.copy()
        if self.maneuver_prob > 0.5:
            # Increase process noise if maneuvering likely
            Q *= (1.0 + 2.0 * self.maneuver_prob)

        # Propagate
        self.x = F @ self.x
        self.P = F @ self.P @ F.T + Q

    def update(self, z: NDArray[np.floating[Any]]) -> None:
        """
        Update with position measurement.

        Parameters
        ----------
        z : (d,) array
            Measured position
        """
        # Measurement matrix (observe position only)
        H = np.hstack([np.eye(self.d), np.zeros((self.d, self.d))])

        # Innovation
        z_pred = H @ self.x
        innovation = z - z_pred

        # Innovation covariance
        S = H @ self.P @ H.T + self.R_est * np.eye(self.d)

        # Kalman gain
        K = self.P @ H.T @ np.linalg.inv(S)

        # Update
        self.x = self.x + K @ innovation
        self.P = (np.eye(self.n) - K @ H) @ self.P

        # Adaptive R estimation
        innov_squared = (innovation ** 2).sum()
        self.innovation_history.append(innov_squared)

        if len(self.innovation_history) > 1:
            # Exponential moving average
            self.R_est = (self.forgetting_factor * self.R_est +
                         (1 - self.forgetting_factor) * innov_squared)

            # Limit R to prevent numerical issues
            self.R_est = np.clip(self.R_est, 0.01, 100.0)

        # Detect maneuvers: innovation should be small for CV model
        expected_innov = self.d * self.R_est
        innov_ratio = innov_squared / (expected_innov + 1e-6)

        # Update maneuver probability
        self.maneuver_prob = 0.9 * self.maneuver_prob + 0.1 * (innov_ratio > 2.0)

        self.update_count += 1

    def get_state(self) -> Tuple[NDArray[np.floating[Any]],
                                 NDArray[np.floating[Any]]]:
        """Return state and covariance."""
        return self.x.copy(), self.P.copy()

    def get_position(self) -> NDArray[np.floating[Any]]:
        """Return position estimate only."""
        return self.x[:self.d].copy()

    def get_velocity(self) -> NDArray[np.floating[Any]]:
        """Return velocity estimate only."""
        return self.x[self.d:].copy()

    def get_diagnostics(self) -> Dict[str, Any]:
        """Return diagnostic information."""
        return {
            'updates': self.update_count,
            'R_estimate': self.R_est,
            'maneuver_probability': self.maneuver_prob,
            'innovations_recent_mean': (np.mean(self.innovation_history[-10:])
                                       if self.innovation_history else 0.0)
        }

Usage:

# 2D tracking scenario
pos_init = np.array([0.0, 0.0])
cvf = AdaptiveConstantVelocityFilter(pos_init)

for k in range(100):
    # Predict
    cvf.predict(dt=0.1)

    # Measurement
    true_pos = np.array([k * 0.1, 0.05 * k])
    z = true_pos + 0.5 * np.random.randn(2)

    # Update
    cvf.update(z)

    if k % 20 == 0:
        pos, vel = cvf.get_position(), cvf.get_velocity()
        diag = cvf.get_diagnostics()
        print(f"Step {k}: pos={pos}, vel={vel:.3f}, "
              f"R_est={diag['R_estimate']:.2f}, "
              f"maneuver_prob={diag['maneuver_probability']:.2f}")

Example 2: Wrapping External C++ Filter

Integrate a pre-existing C++ filter implementation via ctypes or Cython.

import ctypes
import numpy as np
from pathlib import Path
from typing import Tuple, Any
from numpy.typing import NDArray

class ExternalCppFilterWrapper:
    """
    Wrapper for external C++ filter implementation.

    Assumes compiled library with C interface:
    - kf_init(n, m)
    - kf_predict(F, Q, dt)
    - kf_update(z, H, R)
    - kf_get_state(x_out, P_out)
    """

    def __init__(self, state_dim: int, meas_dim: int,
                 lib_path: str = None):
        """
        Parameters
        ----------
        state_dim : int
            State space dimension
        meas_dim : int
            Measurement space dimension
        lib_path : str, optional
            Path to compiled .so/.dll file
            If None, looks for libcppfilter.so in current directory
        """
        self.n = state_dim
        self.m = meas_dim

        # Load external library
        if lib_path is None:
            lib_path = "./libcppfilter.so"

        try:
            self.lib = ctypes.CDLL(lib_path)
        except OSError as e:
            raise RuntimeError(f"Failed to load C++ library: {lib_path}") from e

        # Define C function signatures
        self._define_c_functions()

        # Initialize filter handle
        init_func = self.lib.kf_init
        init_func.argtypes = [ctypes.c_int, ctypes.c_int]
        init_func.restype = ctypes.c_void_p

        self.handle = init_func(ctypes.c_int(state_dim),
                               ctypes.c_int(meas_dim))

        if not self.handle:
            raise RuntimeError("Failed to initialize C++ filter")

        # Cache current state (C++ ownership)
        self.x = np.zeros(state_dim)
        self.P = np.eye(state_dim)

    def _define_c_functions(self) -> None:
        """Define ctypes signatures for C functions."""
        # kf_predict(handle, F, dt, Q)
        predict_func = self.lib.kf_predict
        predict_func.argtypes = [
            ctypes.c_void_p,
            np.ctypeslib.ndpointer(ctypes.c_double, flags='C_CONTIGUOUS'),
            ctypes.c_double,
            np.ctypeslib.ndpointer(ctypes.c_double, flags='C_CONTIGUOUS')
        ]
        predict_func.restype = ctypes.c_int
        self._predict_c = predict_func

        # kf_update(handle, z, H, R)
        update_func = self.lib.kf_update
        update_func.argtypes = [
            ctypes.c_void_p,
            np.ctypeslib.ndpointer(ctypes.c_double, flags='C_CONTIGUOUS'),
            np.ctypeslib.ndpointer(ctypes.c_double, flags='C_CONTIGUOUS'),
            np.ctypeslib.ndpointer(ctypes.c_double, flags='C_CONTIGUOUS')
        ]
        update_func.restype = ctypes.c_int
        self._update_c = update_func

        # kf_get_state(handle, x_out, P_out)
        get_state_func = self.lib.kf_get_state
        get_state_func.argtypes = [
            ctypes.c_void_p,
            np.ctypeslib.ndpointer(ctypes.c_double, flags='C_CONTIGUOUS'),
            np.ctypeslib.ndpointer(ctypes.c_double, flags='C_CONTIGUOUS')
        ]
        get_state_func.restype = ctypes.c_int
        self._get_state_c = get_state_func

    def predict(self, F: NDArray[np.floating[Any]],
               Q: NDArray[np.floating[Any]],
               dt: float = 1.0) -> None:
        """
        Call C++ predict function.

        Parameters
        ----------
        F : (n, n) array
            State transition matrix
        Q : (n, n) array
            Process noise covariance
        dt : float
            Time step
        """
        # Ensure arrays are contiguous and double precision
        F_c = np.ascontiguousarray(F, dtype=np.float64)
        Q_c = np.ascontiguousarray(Q, dtype=np.float64)

        ret = self._predict_c(self.handle, F_c, dt, Q_c)
        if ret != 0:
            raise RuntimeError(f"C++ predict failed with code {ret}")

    def update(self, z: NDArray[np.floating[Any]],
              H: NDArray[np.floating[Any]],
              R: NDArray[np.floating[Any]]) -> None:
        """Call C++ update function."""
        z_c = np.ascontiguousarray(z, dtype=np.float64)
        H_c = np.ascontiguousarray(H, dtype=np.float64)
        R_c = np.ascontiguousarray(R, dtype=np.float64)

        ret = self._update_c(self.handle, z_c, H_c, R_c)
        if ret != 0:
            raise RuntimeError(f"C++ update failed with code {ret}")

        # Sync state from C++ to Python
        self._sync_state()

    def _sync_state(self) -> None:
        """Fetch current state from C++ implementation."""
        x_out = np.zeros(self.n, dtype=np.float64)
        P_out = np.zeros((self.n, self.n), dtype=np.float64)

        ret = self._get_state_c(self.handle, x_out, P_out)
        if ret != 0:
            raise RuntimeError(f"C++ get_state failed with code {ret}")

        self.x = x_out.copy()
        self.P = P_out.copy()

    def get_state(self) -> Tuple[NDArray[np.floating[Any]],
                                NDArray[np.floating[Any]]]:
        """Return state estimate and covariance."""
        return self.x.copy(), self.P.copy()

    def __del__(self) -> None:
        """Clean up C++ resources."""
        if hasattr(self, 'lib') and hasattr(self, 'handle'):
            try:
                cleanup = self.lib.kf_cleanup
                cleanup.argtypes = [ctypes.c_void_p]
                cleanup(self.handle)
            except:
                pass

Integration with TCL Components

Integrate custom filters with existing TCL infrastructure.

Using Dynamic Models

from pytcl.dynamic_models import f_constant_velocity, q_constant_velocity

class FilterWithTCLModels:
    """
    Custom filter using TCL dynamic models.
    """

    def __init__(self, x0: NDArray[np.floating[Any]],
                 P0: NDArray[np.floating[Any]],
                 num_dims: int = 2):
        self.x = x0.copy()
        self.P = P0.copy()
        self.num_dims = num_dims
        self.n = len(x0)

    def predict(self, dt: float, Q_sigma_a: float = 0.1) -> None:
        """
        Predict using TCL constant velocity model.
        """
        # Get TCL motion models
        F = f_constant_velocity(T=dt, num_dims=self.num_dims)
        Q = q_constant_velocity(T=dt, sigma_a=Q_sigma_a,
                               num_dims=self.num_dims)

        # Propagate
        self.x = F @ self.x
        self.P = F @ self.P @ F.T + Q

    def update(self, z: NDArray[np.floating[Any]],
              R: float) -> None:
        """Update with position measurement."""
        # Measurement matrix (position only)
        H = np.zeros((self.num_dims, self.n))
        H[:self.num_dims, :self.num_dims] = np.eye(self.num_dims)

        # Standard Kalman update
        S = H @ self.P @ H.T + R * np.eye(self.num_dims)
        K = self.P @ H.T @ np.linalg.inv(S)

        self.x = self.x + K @ (z - H @ self.x)
        self.P = (np.eye(self.n) - K @ H) @ self.P

    def get_state(self) -> Tuple[NDArray[np.floating[Any]],
                                NDArray[np.floating[Any]]]:
        return self.x.copy(), self.P.copy()

Using Coordinate Transformations

from pytcl.coordinate_systems import cart2sphere, sphere2cart

class PolarCoordinateFilter:
    """
    Filter that works in polar coordinates natively.
    Useful for radar systems.
    """

    def __init__(self, r_init: float, theta_init: float,
                 P_r: float = 1.0, P_theta: float = 0.1):
        """
        Parameters
        ----------
        r_init, theta_init : float
            Initial range and bearing
        P_r, P_theta : float
            Uncertainties
        """
        # State: [r, theta, dr, dtheta]
        self.x = np.array([r_init, theta_init, 0.0, 0.0])
        self.P = np.diag([P_r, P_theta, 0.1, 0.01])

    def predict(self, dt: float) -> None:
        """Predict in polar coordinates."""
        F = np.eye(4)
        F[0, 2] = dt  # r += dr*dt
        F[1, 3] = dt  # theta += dtheta*dt

        Q = np.eye(4) * 0.01

        self.x = F @ self.x
        self.P = F @ self.P @ F.T + Q

    def update_from_cartesian(self, z_cart: NDArray[np.floating[Any]]) -> None:
        """
        Update with Cartesian measurement.
        Convert to polar, then update.
        """
        # Convert measurement to polar
        r_meas, theta_meas = cart2sphere(z_cart)

        # Use simplified update (nonlinear)
        H = np.array([[1, 0, 0, 0],  # Measure range
                     [0, 1, 0, 0]])  # Measure bearing

        z_pred = H @ self.x
        z_meas = np.array([r_meas, theta_meas])

        innovation = z_meas - z_pred

        # Handle angle wrapping
        innovation[1] = np.arctan2(np.sin(innovation[1]),
                                  np.cos(innovation[1]))

        R = np.diag([0.5, 0.1])

        S = H @ self.P @ H.T + R
        K = self.P @ H.T @ np.linalg.inv(S)

        self.x = self.x + K @ innovation
        self.P = (np.eye(4) - K @ H) @ self.P

    def get_state_cartesian(self) -> Tuple[NDArray[np.floating[Any]],
                                           NDArray[np.floating[Any]]]:
        """Return state in Cartesian coordinates."""
        # Convert position estimate to Cartesian
        r, theta = self.x[:2]
        x_cart = np.array([r * np.cos(theta),
                          r * np.sin(theta)])
        return x_cart, self.P[:2, :2]

Testing Custom Filters

Implement comprehensive tests for your filter.

import unittest
import numpy as np
from numpy.testing import assert_allclose, assert_array_less

class TestCustomFilter(unittest.TestCase):
    """Unit tests for custom filter."""

    def setUp(self) -> None:
        """Initialize test fixtures."""
        self.x0 = np.array([0.0, 0.0])
        self.P0 = np.eye(2) * 0.1
        self.filter = AdaptiveConstantVelocityFilter(self.x0)

    def test_initialization(self) -> None:
        """Test filter initialization."""
        x, P = self.filter.get_state()
        assert_allclose(x[:2], self.x0)
        assert_allclose(np.diag(P)[:2], 1.0, rtol=0.1)

    def test_predict_updates_covariance(self) -> None:
        """Covariance should increase during prediction."""
        self.filter.predict(dt=0.1)
        x, P = self.filter.get_state()

        # Covariance norm should increase
        P_norm = np.linalg.norm(P)
        P0_norm = np.linalg.norm(self.P0)
        self.assertGreater(P_norm, P0_norm)

    def test_update_reduces_uncertainty(self) -> None:
        """Update should reduce uncertainty."""
        self.filter.predict(dt=0.1)
        P_pred = self.filter.get_state()[1]

        # Measurement
        z = np.array([1.0, 1.0])
        self.filter.update(z)
        P_post = self.filter.get_state()[1]

        # Trace of covariance should decrease
        self.assertLess(np.trace(P_post), np.trace(P_pred))

    def test_constant_velocity_tracking(self) -> None:
        """Test steady-state tracking of constant velocity."""
        # Truth: constant velocity
        v_true = np.array([1.0, 0.5])

        errors = []
        for k in range(100):
            pos_true = v_true * 0.1 * k
            z = pos_true + 0.1 * np.random.randn(2)

            self.filter.predict(dt=0.1)
            self.filter.update(z)

            x, _ = self.filter.get_state()
            error = np.linalg.norm(x[:2] - pos_true)
            errors.append(error)

        # Steady-state error should be bounded
        steady_state_error = np.mean(errors[-10:])
        self.assertLess(steady_state_error, 0.5)

    def test_filter_interface_compatibility(self) -> None:
        """Verify filter implements expected interface."""
        # Should have these methods
        self.assertTrue(hasattr(self.filter, 'predict'))
        self.assertTrue(hasattr(self.filter, 'update'))
        self.assertTrue(hasattr(self.filter, 'get_state'))

    def test_numerical_stability(self) -> None:
        """Test for NaN/Inf propagation."""
        for k in range(50):
            self.filter.predict(dt=0.1)
            z = np.array([1e6 * np.random.randn(),
                         1e6 * np.random.randn()])
            self.filter.update(z)

            x, P = self.filter.get_state()

            # Check no NaNs or Infs
            self.assertTrue(np.all(np.isfinite(x)))
            self.assertTrue(np.all(np.isfinite(P)))

Run tests:

# Run all tests
python -m pytest test_custom_filter.py -v

# Run specific test
python -m pytest test_custom_filter.py::TestCustomFilter::test_constant_velocity_tracking -v

Performance Optimization

Optimize custom filter for production use.

Profiling

import cProfile
import pstats
from io import StringIO

def profile_filter() -> None:
    """Profile filter performance."""
    pr = cProfile.Profile()
    pr.enable()

    # Run filter for 1000 steps
    filt = AdaptiveConstantVelocityFilter(np.array([0.0, 0.0]))
    for k in range(1000):
        filt.predict(dt=0.1)
        z = np.array([k * 0.1, 0.05 * k]) + 0.1 * np.random.randn(2)
        filt.update(z)

    pr.disable()

    # Print stats
    s = StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats(10)  # Top 10 functions
    print(s.getvalue())

Vectorization with NumPy

def batch_filter_update(filter_obj, measurements: NDArray[np.floating[Any]]) -> None:
    """
    Update filter with batch of measurements.

    More efficient than loop for large batches.
    """
    for z in measurements:
        filter_obj.predict(dt=0.1)
        filter_obj.update(z)

# Benchmark
import timeit

measurements = np.random.randn(10000, 2)

t = timeit.timeit(
    lambda: batch_filter_update(AdaptiveConstantVelocityFilter(np.array([0.0, 0.0])),
                               measurements[:100]),
    number=100
)
print(f"Average time per update: {t/100/100*1000:.2f} ms")

Numba JIT Compilation

For compute-intensive filters, use Numba JIT:

from numba import njit

@njit
def kalman_gain_update(P: NDArray[np.floating[Any]],
                      H: NDArray[np.floating[Any]],
                      R: float) -> NDArray[np.floating[Any]]:
    """
    Compute Kalman gain (JIT compiled).

    Much faster for repeated calls.
    """
    S = H @ P @ H.T + R
    K = P @ H.T / S  # Scalar R
    return K

Practical Workflow: Algorithm to Integration

Step-by-step example from research paper to production.

Scenario: Implement a novel “Adaptive Fading Memory Filter” from a paper.

  1. Understand Algorithm

    • Read paper carefully

    • Note all equations and parameters

    • Identify state/measurement dimensions

    • Check numerical considerations

  2. Prototype (NumPy)

class AdaptiveFadingMemoryFilter:
    """
    From: Smith et al. (2024) "Adaptive Fading Memory for Tracking"

    Key equations:
    - Fading factor λ_k controls memory length
    - λ_k = 1 - α * innovation_normalized
    - Covariance scaling: P_k *= λ_k
    """

    def __init__(self, x0, P0, alpha: float = 0.01):
        self.x = x0.copy()
        self.P = P0.copy()
        self.alpha = alpha
        self.lambda_k = 1.0

    def predict(self, F, Q):
        P_prev = self.P.copy()

        self.x = F @ self.x
        self.P = self.lambda_k * (F @ self.P @ F.T + Q)

    def update(self, z, H, R):
        innovation = z - H @ self.x
        S = H @ self.P @ H.T + R
        K = self.P @ H.T @ np.linalg.inv(S)

        # Compute innovation normalized squared
        innov_norm_sq = innovation.T @ np.linalg.inv(S) @ innovation

        # Update fading factor
        # λ_k = 1 - α * (γ_k - m) where m = measurement_dim
        m = len(z)
        self.lambda_k = max(0.5, 1.0 - self.alpha * (innov_norm_sq - m))

        self.x = self.x + K @ innovation
        self.P = (np.eye(len(self.x)) - K @ H) @ self.P
  1. Test (Unit Tests)

def test_fading_factor_adaptation():
    """Fading factor should respond to anomalies."""
    filt = AdaptiveFadingMemoryFilter(np.array([0.0, 0.0]), np.eye(2))

    # Normal measurement
    filt.predict(np.eye(2), np.eye(2) * 0.01)
    filt.update(np.array([0.0, 0.0]), np.eye(2), np.eye(2) * 0.1)
    lambda1 = filt.lambda_k

    # Large residual -> fading factor should decrease
    filt.predict(np.eye(2), np.eye(2) * 0.01)
    filt.update(np.array([10.0, 10.0]), np.eye(2), np.eye(2) * 0.1)
    lambda2 = filt.lambda_k

    assert lambda2 < lambda1  # Fading factor decreases with anomaly
  1. Integrate with TCL

from pytcl.dynamic_models import f_constant_velocity

class TCLAdaptiveFadingMemoryFilter(AdaptiveFadingMemoryFilter):
    """Integration with TCL models."""

    def predict_cv(self, dt: float, sigma_a: float = 0.1):
        """Predict using TCL constant velocity model."""
        from pytcl.dynamic_models import q_constant_velocity

        F = f_constant_velocity(T=dt, num_dims=2)
        Q = q_constant_velocity(T=dt, sigma_a=sigma_a, num_dims=2)

        self.predict(F, Q)
  1. Validate (Benchmarks)

def benchmark_against_standard_kf():
    """Compare performance against baseline."""
    from pytcl.dynamic_estimation.kalman import KalmanFilter

    # Scenario: tracking with variable noise
    errors_adaptive = []
    errors_standard = []

    for trial in range(100):
        adaptive = TCLAdaptiveFadingMemoryFilter(
            np.array([0.0, 0.0, 1.0, 0.0]),
            np.eye(4) * 0.1
        )

        # ... simulate and compare ...

    print(f"Adaptive RMSE: {np.mean(errors_adaptive):.4f}")
    print(f"Standard RMSE: {np.mean(errors_standard):.4f}")
  1. Document & Release

"""
Adaptive Fading Memory Filter
=============================

Implementation of the algorithm from Smith et al. (2024).

Features:
- Automatic memory length adaptation
- Response to target maneuvers
- No parameter tuning required

Example:
    >>> filt = TCLAdaptiveFadingMemoryFilter(x0, P0)
    >>> for t in range(N):
    ...     filt.predict_cv(dt=0.1)
    ...     filt.update(z, H, R)
    ...     x, P = filt.get_state()

References:
    Smith et al. (2024). Adaptive Fading Memory for Tracking.
"""

Documentation and Type Hints

Ensure proper type hints and documentation.

from typing import Tuple, Optional, Union, Dict, Any
from numpy.typing import NDArray
import numpy as np

class WellDocumentedFilter:
    """
    Example of properly documented custom filter.

    This filter demonstrates best practices for documentation,
    type hints, and code organization.

    Attributes
    ----------
    x : NDArray[np.floating[Any]]
        Current state estimate (n,)
    P : NDArray[np.floating[Any]]
        Current covariance matrix (n, n)
    n : int
        State dimension
    m : int
        Measurement dimension
    """

    def __init__(self,
                x0: NDArray[np.floating[Any]],
                P0: NDArray[np.floating[Any]]) -> None:
        """
        Initialize filter.

        Parameters
        ----------
        x0 : (n,) array
            Initial state estimate
        P0 : (n, n) array
            Initial error covariance matrix
            Must be positive definite.

        Raises
        ------
        ValueError
            If P0 is not positive definite

        Examples
        --------
        >>> x0 = np.array([0.0, 1.0])
        >>> P0 = np.eye(2) * 0.1
        >>> filt = WellDocumentedFilter(x0, P0)
        """
        self.n = len(x0)
        self.m: Optional[int] = None

        # Validate inputs
        if P0.shape != (self.n, self.n):
            raise ValueError("P0 must be square with shape (n, n)")

        self.x = x0.copy()
        self.P = P0.copy()

    def predict(self,
               F: NDArray[np.floating[Any]],
               Q: Optional[NDArray[np.floating[Any]]] = None) -> None:
        """
        Predict state forward using linear transition.

        Updates state estimate and covariance according to:
            x_pred = F @ x
            P_pred = F @ P @ F.T + Q

        Parameters
        ----------
        F : (n, n) array
            State transition matrix
        Q : (n, n) array, optional
            Process noise covariance
            Default: zero process noise

        Raises
        ------
        ValueError
            If F has wrong shape

        Notes
        -----
        The update is performed in-place. For numerical stability,
        ensure Q is positive semi-definite.
        """
        if F.shape != (self.n, self.n):
            raise ValueError(f"F must have shape ({self.n}, {self.n})")

        if Q is None:
            Q = np.zeros((self.n, self.n))

        self.x = F @ self.x
        self.P = F @ self.P @ F.T + Q

    def update(self,
              z: NDArray[np.floating[Any]],
              H: NDArray[np.floating[Any]],
              R: NDArray[np.floating[Any]]) -> Tuple[float, float]:
        """
        Update state using measurement.

        Standard Kalman measurement update:
            K = P @ H.T @ inv(H @ P @ H.T + R)
            x = x + K @ (z - H @ x)
            P = (I - K @ H) @ P

        Parameters
        ----------
        z : (m,) array
            Measurement vector
        H : (m, n) array
            Measurement matrix
        R : (m, m) array
            Measurement noise covariance

        Returns
        -------
        innovation : float
            Innovation (measurement residual) norm
        gain : float
            Kalman gain magnitude (Frobenius norm)

        Raises
        ------
        ValueError
            If dimensions don't match

        Examples
        --------
        >>> innov, gain = filt.update(z, H, R)
        >>> print(f"Innovation: {innov:.4f}, Gain: {gain:.4f}")
        """
        m = len(z)

        if H.shape != (m, self.n):
            raise ValueError(f"H must have shape ({m}, {self.n})")
        if R.shape != (m, m):
            raise ValueError(f"R must have shape ({m}, {m})")

        self.m = m

        # Innovation covariance
        S = H @ self.P @ H.T + R

        # Kalman gain
        try:
            K = self.P @ H.T @ np.linalg.inv(S)
        except np.linalg.LinAlgError:
            K = self.P @ H.T @ np.linalg.pinv(S)

        # Innovation
        innov = z - H @ self.x

        # Update
        self.x = self.x + K @ innov
        self.P = (np.eye(self.n) - K @ H) @ self.P

        return float(np.linalg.norm(innov)), float(np.linalg.norm(K))

    def get_state(self) -> Tuple[NDArray[np.floating[Any]],
                                NDArray[np.floating[Any]]]:
        """
        Return current state estimate and covariance.

        Returns
        -------
        x : (n,) array
            State estimate
        P : (n, n) array
            Error covariance matrix
        """
        return self.x.copy(), self.P.copy()

Common Pitfalls and Solutions

Pitfall 1: Modifying Input Arrays

# ❌ Bad: modifies input
def predict_bad(self, F):
    self.x = F @ self.x  # If F is mutable reference...

# ✅ Good: makes copy
def predict_good(self, F):
    F = np.asarray(F, dtype=np.float64)
    self.x = F @ self.x

Pitfall 2: Not Checking Matrix Dimensions

# ❌ Bad: silent failure on dimension mismatch
def update_bad(self, z, H, R):
    S = H @ self.P @ H.T + R  # No dimension check

# ✅ Good: validate dimensions
def update_good(self, z, H, R):
    if H.shape[1] != self.n:
        raise ValueError(f"H must have {self.n} columns")

Pitfall 3: Numerical Instability

# ❌ Bad: unstable covariance update
self.P = (np.eye(self.n) - K @ H) @ self.P

# ✅ Good: Joseph form (more stable)
I_KH = np.eye(self.n) - K @ H
self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T

  • [Kalman Filter Tuning](kalman_filter_tuning.rst) — Tuning custom filters

  • [Advanced KF Variants](advanced_kf_variants.rst) — Other filter types to extend

  • [Performance Optimization](performance_optimization.rst) — Profiling and optimization

  • [Troubleshooting](troubleshooting.rst) — Debugging filter issues

Resources: