Common Use Cases & Recipes

This guide provides ready-to-use code for common tracking and estimation scenarios. Each recipe is self-contained and can be adapted to your needs.

See Library Architecture for module organization and API Navigation Guide for function discovery.

Basic Single-Target Kalman Filtering

Problem: Track a single object with position and velocity in 2D, given range/bearing measurements.

Recipe:

import numpy as np
from pytcl.dynamic_estimation.kalman import kf_predict, kf_update
from pytcl.dynamic_models import f_constant_velocity, q_constant_velocity
from pytcl.coordinate_systems.conversions import sphere2cart

class SimpleTracker:
    def __init__(self, T=0.1, sigma_a=0.1, meas_std=0.5):
        """
        T: time step (seconds)
        sigma_a: acceleration standard deviation (m/s²)
        meas_std: measurement standard deviation (meters)
        """
        self.T = T

        # State: [x, vx, y, vy]
        self.x = np.array([0.0, 0.0, 0.0, 0.0])
        self.P = np.eye(4) * 1.0  # Initial uncertainty

        # Motion model (constant velocity)
        self.F = f_constant_velocity(T, num_dims=2)
        self.Q = q_constant_velocity(T, sigma_a, num_dims=2)

        # Measurement model (observe position only)
        self.H = np.array([[1, 0, 0, 0],
                           [0, 0, 1, 0]])
        self.R = np.eye(2) * meas_std**2

    def process_measurement(self, range_bearing):
        """
        Process a range/bearing measurement (from polar: range, azimuth)
        Convert to Cartesian and update filter
        """
        # Predict
        x_pred, P_pred = kf_predict(self.x, self.P, self.F, self.Q)

        # Convert measurement from polar to Cartesian
        # Input: [range, azimuth, elevation] → convert to [x, y, z]
        cart_meas = sphere2cart(range_bearing)[:2]  # Take x, y only

        # Update
        self.x, self.P = kf_update(x_pred, P_pred, cart_meas, self.H, self.R)

        return self.x[:2], np.sqrt(np.diag(self.P)[:2])

# Usage
tracker = SimpleTracker()

# Process measurements over time
measurements = [
    np.array([100.0, 0.0, 0.0]),      # range=100, az=0°, el=0°
    np.array([100.5, 0.01, 0.0]),
    np.array([101.2, 0.02, 0.0]),
]

for meas in measurements:
    pos, std = tracker.process_measurement(meas)
    print(f"Position: {pos}, Std: {std}")

Multi-Target Tracking with Assignment

Problem: Track multiple targets with ambiguous measurements using Global Nearest Neighbor (GNN).

Recipe:

import numpy as np
from pytcl.trackers.multi_tracker_gnn import GNNTracker
from pytcl.containers import TrackSet, Track
from pytcl.coordinate_systems.conversions import sphere2cart

class MultiTargetSystem:
    def __init__(self, gate_threshold=5.0):
        self.tracker = GNNTracker(
            gate_threshold=gate_threshold,
            min_observations=2,  # Need 2 observations to confirm track
            max_age=10           # Delete track if not observed for 10 steps
        )

    def process_frame(self, measurements_polar):
        """
        Process one frame of measurements
        Input: measurements_polar - list of [range, azimuth, elevation]
        """
        # Convert measurements to Cartesian
        measurements_cart = np.array([
            sphere2cart(m)[:2] for m in measurements_polar
        ])

        # Update tracker
        self.tracker.update(measurements_cart)

        # Get track information
        confirmed_tracks = self.tracker.tracks['confirmed']

        results = []
        for track_id, track in confirmed_tracks.items():
            pos = track.state[:2]
            vel = track.state[2:4]
            results.append({
                'id': track_id,
                'position': pos,
                'velocity': vel,
                'age': track.age
            })

        return results

# Usage
mtt = MultiTargetSystem()

# Process multiple frames per second
for frame_idx in range(100):
    # Simulate measurements (3 targets)
    measurements = np.array([
        [100 + frame_idx*0.5, 0.1, 0.0],      # Target 1
        [150 - frame_idx*0.3, 1.5, 0.0],      # Target 2
        [120, 0.5, 0.0],                       # False alarm
    ])

    tracks = mtt.process_frame(measurements)
    print(f"Frame {frame_idx}: {len(tracks)} confirmed tracks")

Extended Kalman Filter with Nonlinear Dynamics

Problem: Track with nonlinear motion (e.g., coordinated turn model).

Recipe:

import numpy as np
from pytcl.dynamic_estimation.kalman import extended_kalman_filter
from pytcl.coordinate_systems.jacobians import jacobian_cart2sphere

def coordinated_turn_motion(x, T, omega):
    """
    Coordinated turn model transition
    x = [x, vx, y, vy, omega]
    """
    x_pred = x.copy()
    if abs(omega) > 1e-6:
        x_pred[0] = x[0] + x[1]/omega * (np.sin(omega*T)) + x[2]/omega * (np.cos(omega*T) - 1)
        x_pred[1] = x[1] * np.cos(omega*T) - x[2] * np.sin(omega*T)
        x_pred[2] = x[2] + x[1]/omega * (1 - np.cos(omega*T)) + x[2]/omega * np.sin(omega*T)
        x_pred[3] = x[1] * np.sin(omega*T) + x[2] * np.cos(omega*T)
    else:
        # Straight line if omega ≈ 0
        x_pred[:4] += x[1:5] * T

    return x_pred

def jacobian_motion(x, T, omega):
    """Jacobian of motion model"""
    F = np.eye(5)
    if abs(omega) > 1e-6:
        F[0, 1] = np.sin(omega * T) / omega
        F[0, 3] = (np.cos(omega * T) - 1) / omega
        F[1, 1] = np.cos(omega * T)
        F[1, 3] = -np.sin(omega * T)
        F[2, 1] = (1 - np.cos(omega * T)) / omega
        F[2, 3] = np.sin(omega * T) / omega
        F[3, 1] = np.sin(omega * T)
        F[3, 3] = np.cos(omega * T)
    else:
        F[:4, 1:3] = T * np.eye(2)

    return F

class EKFTracker:
    def __init__(self, T=0.1, omega=0.1):
        self.T = T
        self.omega = omega  # Turn rate

        # State: [x, vx, y, vy, omega]
        self.x = np.array([0.0, 1.0, 0.0, 0.0, 0.0])
        self.P = np.eye(5) * 0.1

        self.Q = np.eye(5) * 0.01
        self.H = np.eye(2, 5)  # Measure position
        self.R = np.eye(2) * 0.5**2

    def update(self, measurement):
        # Motion model
        def f_motion(x):
            return coordinated_turn_motion(x, self.T, self.omega)

        def F_jacobian(x):
            return jacobian_motion(x, self.T, self.omega)

        # Measurement model
        def h_measure(x):
            return x[:2]

        def H_jacobian(x):
            return self.H

        # Extended Kalman filter step
        result = extended_kalman_filter(
            self.x, self.P, measurement, self.Q, self.R,
            f_motion, F_jacobian, h_measure, H_jacobian
        )

        self.x = result.x
        self.P = result.P

        return self.x[:2]

# Usage
ekf = EKFTracker()
for t in range(100):
    # Simulated measurement
    z = np.array([100 + t*0.5, 50 + t*0.2]) + np.random.randn(2) * 0.5
    pos = ekf.update(z)
    print(f"Position: {pos}")

Unscented Kalman Filter

Problem: Track nonlinear system without computing Jacobians.

Recipe:

import numpy as np
from pytcl.dynamic_estimation.kalman import unscented_kalman_filter

class UKFTracker:
    def __init__(self, T=0.1, alpha=1e-3, beta=2.0, kappa=1.0):
        """
        Unscented Kalman Filter (no Jacobians needed!)

        alpha, beta, kappa: UKF parameters
        """
        self.T = T
        self.alpha = alpha
        self.beta = beta
        self.kappa = kappa

        # State: [x, vx, y, vy]
        self.x = np.array([0.0, 1.0, 0.0, 0.0])
        self.P = np.eye(4) * 0.1

        self.Q = np.eye(4) * 0.01
        self.R = np.eye(2) * 0.5**2

    def motion_model(self, x, dt):
        """Nonlinear motion: could be any complex model"""
        x_new = x.copy()
        x_new[0] += x[1] * dt
        x_new[2] += x[3] * dt
        return x_new

    def measurement_model(self, x):
        """Measure position and speed"""
        return np.array([x[0], np.sqrt(x[1]**2 + x[3]**2)])

    def update(self, measurement):
        # Predict
        def f(x):
            return self.motion_model(x, self.T)

        # UKF predict
        result = unscented_kalman_filter(
            self.x, self.P, measurement,
            self.Q, self.R, f,
            measurement_model=self.measurement_model,
            alpha=self.alpha, beta=self.beta, kappa=self.kappa
        )

        self.x = result.x
        self.P = result.P

        return self.x[:2]

# Usage
ukf = UKFTracker()
for measurement in measurements:
    pos = ukf.update(measurement)
    print(f"Position: {pos}")

INS/GNSS Navigation

Problem: Fuse inertial measurements (IMU) with occasional GNSS updates.

Recipe:

import numpy as np
from pytcl.navigation.ins_gnss import ins_update_discrete, gnss_update_discrete
from pytcl.coordinate_systems.conversions import geodetic2ecef

class INSGNSSFusion:
    def __init__(self):
        """Initialize navigation state"""
        # State: position (lat, lon, alt), velocity (NED), attitude (quaternion)
        self.lat = 40.0  # degrees
        self.lon = -74.0
        self.alt = 0.0

        self.v_north = 0.0  # m/s
        self.v_east = 0.0
        self.v_down = 0.0

        # Covariance
        self.P_pos = np.eye(3) * 10**2  # 10m std
        self.P_vel = np.eye(3) * 1**2   # 1 m/s std

    def process_imu(self, accel, gyro, dt):
        """Process IMU data (high rate, ~100 Hz)"""
        # Simple integration (real system would use quaternion kinematics)
        self.v_north += accel[0] * dt
        self.v_east += accel[1] * dt
        self.v_down += accel[2] * dt

        # Update position
        self.lat += self.v_north * dt / 111320  # approx meters to degrees
        self.lon += self.v_east * dt / (111320 * np.cos(np.radians(self.lat)))
        self.alt += -self.v_down * dt

        # Increase position uncertainty (process noise)
        self.P_pos += np.eye(3) * 0.01
        self.P_vel += np.eye(3) * 0.1

    def process_gnss(self, gnss_pos_ecef, gnss_std=5.0):
        """Process GNSS update (low rate, ~1-10 Hz)"""
        # Measurement: position in ECEF
        H = np.eye(3)
        R = np.eye(3) * gnss_std**2

        # Simple update (combine with ECEF state in real system)
        innovation = gnss_pos_ecef - self.get_ecef_position()
        K = self.P_pos @ H.T @ np.linalg.inv(H @ self.P_pos @ H.T + R)

        # Update position covariance
        self.P_pos = (np.eye(3) - K @ H) @ self.P_pos

        # In real system, would update full state

    def get_ecef_position(self):
        """Convert geodetic to ECEF"""
        geodetic = np.array([self.lat, self.lon, self.alt])
        return geodetic2ecef(geodetic)

# Usage
fusion = INSGNSSFusion()

# High-rate IMU (100 Hz)
imu_data = [(accel, gyro) for accel, gyro in imu_buffer]
for accel, gyro in imu_data:
    fusion.process_imu(accel, gyro, dt=0.01)

# Low-rate GNSS (1 Hz)
if gnss_available:
    fusion.process_gnss(gnss_position)

Particle Filter for Nonlinear Non-Gaussian Systems

Problem: Track system with non-Gaussian noise or highly nonlinear dynamics.

Recipe:

import numpy as np
from pytcl.dynamic_estimation.particle_filters import particle_filter

class ParticleFilterTracker:
    def __init__(self, n_particles=1000):
        """Initialize particle filter"""
        self.n_particles = n_particles

        # Particles: [x, vx, y, vy] for each particle
        self.particles = np.random.randn(n_particles, 4) * 0.1
        self.weights = np.ones(n_particles) / n_particles

    def motion_model(self, x, noise_scale=0.1):
        """Nonlinear motion with Cauchy-distributed noise (heavy tails)"""
        # Standard motion
        x_pred = x.copy()
        x_pred[0] += x[1] * 0.1
        x_pred[2] += x[3] * 0.1

        # Add heavy-tailed noise (Cauchy instead of Gaussian)
        x_pred += np.random.standard_cauchy(4) * noise_scale

        return x_pred

    def measurement_model(self, x):
        """Nonlinear measurement: distance from origin"""
        return np.sqrt(x[0]**2 + x[2]**2)

    def update(self, measurement, meas_std=1.0):
        """Particle filter update step"""
        # Predict all particles
        self.particles = np.array([
            self.motion_model(p) for p in self.particles
        ])

        # Compute likelihood for each particle
        predicted_meas = np.array([
            self.measurement_model(p) for p in self.particles
        ])

        # Gaussian likelihood
        likelihood = np.exp(-(predicted_meas - measurement)**2 / (2 * meas_std**2))

        # Update weights
        self.weights = likelihood * self.weights
        self.weights /= np.sum(self.weights)  # Normalize

        # Resample if necessary (effective sample size)
        n_eff = 1 / np.sum(self.weights**2)
        if n_eff < self.n_particles / 2:
            # Low effective sample size → resample
            indices = np.random.choice(self.n_particles, size=self.n_particles,
                                      p=self.weights)
            self.particles = self.particles[indices]
            self.weights = np.ones(self.n_particles) / self.n_particles

        # Estimate state as weighted average
        return np.average(self.particles, axis=0, weights=self.weights)

# Usage
pf = ParticleFilterTracker(n_particles=2000)

for measurement in measurements:
    state_estimate = pf.update(measurement)
    print(f"Position: {state_estimate[:2]}")

Adaptive Kalman Filtering

Problem: Track when measurement noise σ varies over time.

Recipe:

import numpy as np
from pytcl.dynamic_estimation.kalman import kf_predict, kf_update

class AdaptiveKalmanFilter:
    def __init__(self, T=0.1, sigma_a=0.1):
        self.x = np.array([0.0, 1.0, 0.0, 1.0])
        self.P = np.eye(4) * 0.1

        self.F = np.array([
            [1, T, 0, 0],
            [0, 1, 0, 0],
            [0, 0, 1, T],
            [0, 0, 0, 1]
        ])
        self.Q = np.eye(4) * sigma_a**2
        self.H = np.eye(2, 4)

        # Measurement noise adaptation
        self.innovation_history = []
        self.max_history = 30

    def update(self, measurement):
        # Predict
        x_pred, P_pred = kf_predict(self.x, self.P, self.F, self.Q)

        # Compute innovation
        innovation = measurement - self.H @ x_pred
        self.innovation_history.append(np.linalg.norm(innovation))

        # Keep history bounded
        if len(self.innovation_history) > self.max_history:
            self.innovation_history = self.innovation_history[1:]

        # Adapt measurement noise based on innovation
        if len(self.innovation_history) >= 10:
            mean_innovation = np.mean(self.innovation_history)
            std_innovation = np.std(self.innovation_history)

            # Increase R if innovations getting larger (measurement unreliable)
            if std_innovation > mean_innovation * 2:
                self.R = np.eye(2) * (mean_innovation ** 2)
            else:
                self.R = np.eye(2) * 0.5**2  # Default
        else:
            self.R = np.eye(2) * 0.5**2

        # Update with adapted R
        self.x, self.P = kf_update(x_pred, P_pred, measurement, self.H, self.R)

        return self.x[:2]

# Usage
adaptive_kf = AdaptiveKalmanFilter()

for measurement in measurements:
    pos = adaptive_kf.update(measurement)
    print(f"Position: {pos}")

Batch Processing / Smoothing

Problem: Process all data offline to get the best parameter estimates.

Recipe:

import numpy as np
from pytcl.dynamic_estimation.batch_estimation import least_squares_batch
from pytcl.dynamic_models import f_constant_velocity, q_constant_velocity

class BatchProcessor:
    def __init__(self, measurements, T=0.1):
        """
        Batch process all measurements at once (offline smoothing)
        """
        self.measurements = measurements
        self.T = T

    def process(self):
        """Batch least-squares estimation"""
        n = len(self.measurements)
        n_states = n * 4  # [x, vx, y, vy] for each time step

        # Build state transition matrix
        F = f_constant_velocity(self.T, num_dims=2)

        # Build full system for batch
        Phi = np.eye(n_states)
        for i in range(1, n):
            Phi[i*4:(i+1)*4, (i-1)*4:i*4] = F

        # Build measurement matrix
        H_full = np.zeros((n*2, n*4))
        for i in range(n):
            H_full[i*2:(i+1)*2, i*4:(i+1)*4] = np.eye(2, 4)

        # Build measurement vector
        z = self.measurements.flatten()

        # Solve: x = (H^T R^-1 H)^-1 H^T R^-1 z
        R_inv = np.eye(n*2) * (1 / 0.5**2)
        HTR = H_full.T @ R_inv

        x_est = np.linalg.solve(HTR @ H_full, HTR @ z)

        return x_est.reshape(n, 4)

# Usage
all_measurements = np.array([
    [100.0, 50.0], [100.5, 50.2], [101.1, 50.4], ...
])

batch = BatchProcessor(all_measurements)
trajectory = batch.process()
print(f"Estimated trajectory shape: {trajectory.shape}")

Data Association with Gating

Problem: Only associate measurements within a statistical gate.

Recipe:

import numpy as np
from scipy.spatial.distance import mahalanobis, cdist

class GatedAssociation:
    def __init__(self, gate_chi2=12.592):
        """
        Gate based on chi-square test
        gate_chi2=12.592 → 99.5% confidence interval for 2-DOF
        """
        self.gate_chi2 = gate_chi2

    def associate_measurements(self, tracks, measurements, P_innovation_list):
        """
        Associate using Mahalanobis distance with gating
        """
        n_tracks = len(tracks)
        n_meas = len(measurements)

        # Cost matrix
        cost = np.full((n_tracks, n_meas), np.inf)

        for i, track in enumerate(tracks):
            for j, meas in enumerate(measurements):
                # Predicted position
                pred = track[:2]

                # Innovation covariance
                S = P_innovation_list[i]

                # Mahalanobis distance
                innovation = meas - pred
                dist_squared = innovation @ np.linalg.inv(S) @ innovation

                # Gate check: chi-square test
                if dist_squared <= self.gate_chi2:
                    cost[i, j] = dist_squared

        return cost

# Usage
gating = GatedAssociation()

cost_matrix = gating.associate_measurements(
    track_predictions, measurements, S_list
)

# Now solve assignment on gated cost matrix
from pytcl.assignment.optimization import assignment_nd
assignments = assignment_nd(cost_matrix)

See Also