Source code for pytcl.assignment_algorithms.jpda

"""
Joint Probabilistic Data Association (JPDA) algorithm.

JPDA computes association probabilities between tracks and measurements
by considering all possible joint association hypotheses, then performs
a probabilistically weighted update for each track.

This is more sophisticated than GNN which makes hard assignment decisions,
as JPDA can handle measurement origin uncertainty in cluttered environments.
"""

from typing import Any, List, NamedTuple, Optional

import numpy as np
from numba import njit
from numpy.typing import ArrayLike, NDArray
from scipy.stats import chi2

from pytcl.assignment_algorithms.gating import mahalanobis_distance


[docs] class JPDAResult(NamedTuple): """Result of JPDA algorithm. Attributes ---------- association_probs : ndarray[Any] Association probability matrix of shape (n_tracks, n_measurements + 1). association_probs[i, j] is the probability that track i is associated with measurement j. The last column (j = n_measurements) represents the probability that track i has no measurement. marginal_probs : list of ndarray List of marginal association probabilities for each track. marginal_probs[i][j] = P(measurement j originated from track i). likelihood_matrix : ndarray[Any] Measurement likelihood matrix of shape (n_tracks, n_measurements). gated : ndarray[Any] Boolean matrix indicating which track-measurement pairs passed gating. """ association_probs: NDArray[np.floating] marginal_probs: List[NDArray[np.floating]] likelihood_matrix: NDArray[np.floating] gated: NDArray[np.bool_]
[docs] class JPDAUpdate(NamedTuple): """Result of JPDA-based track update. Attributes ---------- states : list of ndarray Updated state estimates for each track. covariances : list of ndarray Updated covariances for each track (includes spread of means). association_probs : ndarray[Any] Association probability matrix. innovations : list of ndarray Combined weighted innovations for each track. """ states: List[NDArray[np.floating]] covariances: List[NDArray[np.floating]] association_probs: NDArray[np.floating] innovations: List[NDArray[np.floating]]
[docs] def compute_measurement_likelihood( innovation: NDArray[Any], innovation_cov: NDArray[Any], detection_prob: float = 1.0, ) -> float: """ Compute measurement likelihood for a track-measurement pair. Parameters ---------- innovation : ndarray[Any] Measurement innovation (residual), shape (m,). innovation_cov : ndarray[Any] Innovation covariance, shape (m, m). detection_prob : float Probability of detection (Pd). Returns ------- likelihood : float Measurement likelihood. """ m = len(innovation) det_S = np.linalg.det(innovation_cov) if det_S <= 0: return 0.0 mahal_sq = innovation @ np.linalg.solve(innovation_cov, innovation) likelihood = ( detection_prob * np.exp(-0.5 * mahal_sq) / np.sqrt((2 * np.pi) ** m * det_S) ) return likelihood
[docs] def compute_likelihood_matrix( track_states: list[NDArray[Any]], track_covariances: list[NDArray[Any]], measurements: NDArray[Any], H: NDArray[Any], R: NDArray[Any], detection_prob: float = 1.0, gate_threshold: Optional[float] = None, ) -> tuple[NDArray[Any], NDArray[Any]]: """ Compute likelihood matrix for all track-measurement pairs. Parameters ---------- track_states : list of ndarray State estimates for each track. track_covariances : list of ndarray Covariances for each track. measurements : ndarray[Any] Measurements, shape (n_meas, m). H : ndarray[Any] Measurement matrix, shape (m, n). R : ndarray[Any] Measurement noise covariance, shape (m, m). detection_prob : float Probability of detection. gate_threshold : float, optional Mahalanobis distance threshold for gating. Returns ------- likelihood_matrix : ndarray[Any] Likelihood values, shape (n_tracks, n_meas). gated : ndarray[Any] Boolean gating matrix, shape (n_tracks, n_meas). Examples -------- >>> import numpy as np >>> # Two tracks, two measurements >>> states = [np.array([0.0, 1.0]), np.array([5.0, 0.0])] >>> covs = [np.eye(2) * 0.5, np.eye(2) * 0.5] >>> measurements = np.array([[0.1], [5.2]]) >>> H = np.array([[1, 0]]) >>> R = np.array([[0.1]]) >>> L, gated = compute_likelihood_matrix(states, covs, measurements, H, R) >>> L.shape (2, 2) >>> # Track 0 should have high likelihood for measurement 0 >>> L[0, 0] > L[0, 1] True """ n_tracks = len(track_states) n_meas = len(measurements) likelihood_matrix = np.zeros((n_tracks, n_meas)) gated = np.zeros((n_tracks, n_meas), dtype=bool) for i in range(n_tracks): # Predicted measurement and innovation covariance z_pred = H @ track_states[i] S = H @ track_covariances[i] @ H.T + R for j in range(n_meas): innovation = measurements[j] - z_pred mahal_dist = mahalanobis_distance(innovation, S) # Check gate if gate_threshold is None or mahal_dist <= gate_threshold: gated[i, j] = True likelihood_matrix[i, j] = compute_measurement_likelihood( innovation, S, detection_prob ) return likelihood_matrix, gated
[docs] def jpda_probabilities( likelihood_matrix: NDArray[Any], gated: NDArray[Any], detection_prob: float = 1.0, clutter_density: float = 1e-6, ) -> NDArray[Any]: """ Compute JPDA association probabilities. Uses an efficient algorithm that avoids explicit enumeration of all joint hypotheses when possible. Parameters ---------- likelihood_matrix : ndarray[Any] Likelihood values, shape (n_tracks, n_meas). gated : ndarray[Any] Boolean gating matrix, shape (n_tracks, n_meas). detection_prob : float Probability of detection (Pd). clutter_density : float Spatial density of clutter (lambda_c). Returns ------- beta : ndarray[Any] Association probability matrix, shape (n_tracks, n_meas + 1). beta[i, j] = P(measurement j is from track i) for j < n_meas. beta[i, n_meas] = P(track i has no measurement). Examples -------- >>> import numpy as np >>> # Likelihood matrix: 2 tracks, 2 measurements >>> # Track 0 has high likelihood for meas 0 >>> # Track 1 has high likelihood for meas 1 >>> likelihood = np.array([[0.9, 0.1], ... [0.1, 0.8]]) >>> gated = np.array([[True, True], ... [True, True]]) >>> beta = jpda_probabilities(likelihood, gated, detection_prob=0.9) >>> beta.shape # 2 tracks, 3 columns (2 meas + 1 miss) (2, 3) >>> # Track 0 most likely associated with measurement 0 >>> np.argmax(beta[0, :2]) 0 """ n_tracks, n_meas = likelihood_matrix.shape # Initialize beta matrix (last column is for "no measurement") beta = np.zeros((n_tracks, n_meas + 1)) if n_meas == 0: # No measurements - all tracks have no association beta[:, 0] = 1.0 return beta if n_tracks == 0: return beta # For small problems, use exact enumeration # For larger problems, use approximate method if n_tracks <= 5 and n_meas <= 5: beta = _jpda_exact(likelihood_matrix, gated, detection_prob, clutter_density) else: beta = _jpda_approximate( likelihood_matrix, gated, detection_prob, clutter_density ) return beta
def _jpda_exact( likelihood_matrix: NDArray[Any], gated: NDArray[Any], detection_prob: float, clutter_density: float, ) -> NDArray[Any]: """ Exact JPDA computation via hypothesis enumeration. This is exponential in complexity but exact for small problems. """ n_tracks, n_meas = likelihood_matrix.shape beta = np.zeros((n_tracks, n_meas + 1)) # Generate all valid joint association hypotheses # A hypothesis is a mapping from measurements to tracks (or clutter) # measurement j -> track assignment[j] where -1 means clutter # We enumerate by considering all possible measurement-to-track assignments # where each measurement can go to at most one track that gates it def generate_hypotheses( meas_idx: int, current_assignment: List[int], used_tracks: set[Any], ) -> Any: """Recursively generate valid hypotheses.""" if meas_idx == n_meas: yield current_assignment.copy() return # Option 1: measurement is clutter current_assignment.append(-1) yield from generate_hypotheses(meas_idx + 1, current_assignment, used_tracks) current_assignment.pop() # Option 2: measurement is from a track (if gated and track not used) for track_idx in range(n_tracks): if gated[track_idx, meas_idx] and track_idx not in used_tracks: current_assignment.append(track_idx) used_tracks.add(track_idx) yield from generate_hypotheses( meas_idx + 1, current_assignment, used_tracks ) used_tracks.remove(track_idx) current_assignment.pop() # Compute hypothesis probabilities hypothesis_probs = [] hypothesis_assignments = [] for assignment in generate_hypotheses(0, [], set[Any]()): # Compute probability of this hypothesis prob = 1.0 detected_tracks = set[Any]() for j, track_idx in enumerate(assignment): if track_idx == -1: # Measurement j is clutter prob *= clutter_density else: # Measurement j is from track track_idx prob *= likelihood_matrix[track_idx, j] detected_tracks.add(track_idx) # Account for non-detected tracks for i in range(n_tracks): if i in detected_tracks: prob *= detection_prob else: prob *= 1.0 - detection_prob hypothesis_probs.append(prob) hypothesis_assignments.append(assignment) # Normalize total_prob = sum(hypothesis_probs) if total_prob > 0: hypothesis_probs = [p / total_prob for p in hypothesis_probs] # Compute marginal association probabilities for h_idx, (assignment, prob) in enumerate( zip(hypothesis_assignments, hypothesis_probs) ): detected_tracks = set[Any]() for j, track_idx in enumerate(assignment): if track_idx >= 0: beta[track_idx, j] += prob detected_tracks.add(track_idx) # Tracks with no measurement for i in range(n_tracks): if i not in detected_tracks: beta[i, n_meas] += prob return beta @njit(cache=True) def _jpda_approximate_core( likelihood_matrix: np.ndarray[Any, Any], gated: np.ndarray[Any, Any], detection_prob: float, clutter_density: float, ) -> np.ndarray[Any, Any]: """JIT-compiled core of approximate JPDA computation.""" n_tracks = likelihood_matrix.shape[0] n_meas = likelihood_matrix.shape[1] beta = np.zeros((n_tracks, n_meas + 1), dtype=np.float64) # Likelihood ratio for each measurement given each track L = np.zeros((n_tracks, n_meas), dtype=np.float64) for i in range(n_tracks): for j in range(n_meas): if gated[i, j] and clutter_density > 0: L[i, j] = likelihood_matrix[i, j] / clutter_density elif gated[i, j]: L[i, j] = likelihood_matrix[i, j] * 1e10 # Compute delta factors (accounts for other tracks) delta = np.ones((n_tracks, n_meas), dtype=np.float64) for j in range(n_meas): sum_L = 0.0 for i in range(n_tracks): sum_L += L[i, j] for i in range(n_tracks): if sum_L > 0: delta[i, j] = 1.0 / (1.0 + sum_L - L[i, j]) else: delta[i, j] = 1.0 # Compute association probabilities for i in range(n_tracks): denom = 1.0 - detection_prob for j in range(n_meas): if gated[i, j]: beta[i, j] = detection_prob * L[i, j] * delta[i, j] denom += beta[i, j] if denom > 0: for j in range(n_meas): beta[i, j] /= denom beta[i, n_meas] = (1.0 - detection_prob) / denom else: beta[i, n_meas] = 1.0 return beta def _jpda_approximate( likelihood_matrix: NDArray[Any], gated: NDArray[Any], detection_prob: float, clutter_density: float, ) -> NDArray[Any]: """ Approximate JPDA using parametric approach. Uses the approach from [1] which is O(n_tracks * n_meas^2). References ---------- .. [1] Fitzgerald, R.J., "Development of Practical PDA Logic for Multitarget Tracking by Microprocessor", American Control Conference, 1986. """ return _jpda_approximate_core( likelihood_matrix.astype(np.float64), gated.astype(np.bool_), detection_prob, clutter_density, )
[docs] def jpda_update( track_states: List[ArrayLike], track_covariances: List[ArrayLike], measurements: ArrayLike, H: ArrayLike, R: ArrayLike, detection_prob: float = 0.9, clutter_density: float = 1e-6, gate_probability: float = 0.99, ) -> JPDAUpdate: """ Perform JPDA-based track update. Parameters ---------- track_states : list of array_like Predicted state estimates for each track. track_covariances : list of array_like Predicted covariances for each track. measurements : array_like Measurements, shape (n_meas, m). H : array_like Measurement matrix, shape (m, n). R : array_like Measurement noise covariance, shape (m, m). detection_prob : float Probability of detection (Pd). Default 0.9. clutter_density : float Spatial density of clutter. Default 1e-6. gate_probability : float Probability for chi-squared gate. Default 0.99. Returns ------- result : JPDAUpdate Updated states and covariances with association probabilities. Examples -------- >>> import numpy as np >>> # Two tracks, three measurements >>> x1 = np.array([0., 1.]) # [position, velocity] >>> x2 = np.array([5., -1.]) >>> P = np.eye(2) * 0.1 >>> measurements = np.array([[0.1], [5.2], [10.0]]) >>> H = np.array([[1., 0.]]) # Measure position >>> R = np.array([[0.1]]) >>> result = jpda_update([x1, x2], [P, P], measurements, H, R) >>> len(result.states) 2 Notes ----- The JPDA update consists of: 1. Compute measurement likelihoods and gating 2. Compute association probabilities 3. Compute combined innovations for each track 4. Update each track with weighted innovation 5. Compute covariance with spread of means term References ---------- .. [1] Bar-Shalom, Y. and Fortmann, T.E., "Tracking and Data Association", Academic Press, 1988. """ # Convert inputs track_states = [np.asarray(x, dtype=np.float64).flatten() for x in track_states] track_covariances = [np.asarray(P, dtype=np.float64) for P in track_covariances] measurements = np.asarray(measurements, dtype=np.float64) H = np.asarray(H, dtype=np.float64) R = np.asarray(R, dtype=np.float64) n_tracks = len(track_states) n_meas = len(measurements) n_state = len(track_states[0]) if n_tracks > 0 else 0 m = H.shape[0] # Compute gate threshold gate_threshold = chi2.ppf(gate_probability, df=m) # Compute likelihood matrix and gating likelihood_matrix, gated = compute_likelihood_matrix( track_states, track_covariances, measurements, H, R, detection_prob, gate_threshold, ) # Compute association probabilities beta = jpda_probabilities(likelihood_matrix, gated, detection_prob, clutter_density) # Update each track updated_states = [] updated_covs = [] innovations = [] for i in range(n_tracks): x = track_states[i] P = track_covariances[i] # Innovation covariance S = H @ P @ H.T + R # Kalman gain K = P @ H.T @ np.linalg.inv(S) # Predicted measurement z_pred = H @ x # Compute combined innovation (weighted sum of innovations) combined_innovation = np.zeros(m) for j in range(n_meas): if gated[i, j]: innovation_j = measurements[j] - z_pred combined_innovation += beta[i, j] * innovation_j innovations.append(combined_innovation) # Updated state x_upd = x + K @ combined_innovation # Updated covariance with spread of means term # P_upd = beta_0 * P + (1 - beta_0) * P_c + P_spread # where P_c = P - K @ S @ K.T (covariance after update) # and P_spread = K @ E[y*y'] @ K.T - K @ E[y] @ E[y]' @ K.T beta_0 = beta[i, n_meas] # Probability of no measurement # Covariance reduction from update P_c = P - K @ S @ K.T # Spread of means P_spread = np.zeros((n_state, n_state)) for j in range(n_meas): if gated[i, j]: innovation_j = measurements[j] - z_pred y_weighted = innovation_j - combined_innovation P_spread += beta[i, j] * K @ np.outer(y_weighted, y_weighted) @ K.T # Combined covariance P_upd = beta_0 * P + (1 - beta_0) * P_c + P_spread # Ensure symmetry P_upd = (P_upd + P_upd.T) / 2 updated_states.append(x_upd) updated_covs.append(P_upd) return JPDAUpdate( states=updated_states, covariances=updated_covs, association_probs=beta, innovations=innovations, )
[docs] def jpda( track_states: List[ArrayLike], track_covariances: List[ArrayLike], measurements: ArrayLike, H: ArrayLike, R: ArrayLike, detection_prob: float = 0.9, clutter_density: float = 1e-6, gate_probability: float = 0.99, ) -> JPDAResult: """ Compute JPDA association probabilities. This is a convenience function that computes association probabilities without performing the state update. Parameters ---------- track_states : list of array_like Predicted state estimates for each track. track_covariances : list of array_like Predicted covariances for each track. measurements : array_like Measurements, shape (n_meas, m). H : array_like Measurement matrix, shape (m, n). R : array_like Measurement noise covariance, shape (m, m). detection_prob : float Probability of detection. Default 0.9. clutter_density : float Spatial density of clutter. Default 1e-6. gate_probability : float Probability for chi-squared gate. Default 0.99. Returns ------- result : JPDAResult Association probabilities and related information. Examples -------- Compute association probabilities for 2 tracks and 3 measurements: >>> import numpy as np >>> # Two tracks with [x, vx] state >>> states = [np.array([0.0, 1.0]), np.array([10.0, -0.5])] >>> covariances = [np.eye(2) * 0.5, np.eye(2) * 0.5] >>> # Three position measurements >>> measurements = np.array([[0.1], [9.8], [5.0]]) >>> H = np.array([[1, 0]]) # measure position only >>> R = np.array([[0.1]]) >>> result = jpda(states, covariances, measurements, H, R) >>> result.association_probs.shape # (2 tracks, 4 columns: 3 meas + miss) (2, 4) >>> # Track 0 should have high prob for measurement 0 >>> result.association_probs[0, 0] > 0.5 True See Also -------- jpda_update : JPDA with state update. """ # Convert inputs track_states = [np.asarray(x, dtype=np.float64).flatten() for x in track_states] track_covariances = [np.asarray(P, dtype=np.float64) for P in track_covariances] measurements = np.asarray(measurements, dtype=np.float64) H = np.asarray(H, dtype=np.float64) R = np.asarray(R, dtype=np.float64) m = H.shape[0] # Compute gate threshold gate_threshold = chi2.ppf(gate_probability, df=m) # Compute likelihood matrix and gating likelihood_matrix, gated = compute_likelihood_matrix( track_states, track_covariances, measurements, H, R, detection_prob, gate_threshold, ) # Compute association probabilities beta = jpda_probabilities(likelihood_matrix, gated, detection_prob, clutter_density) # Compute marginal probabilities for each track n_tracks = len(track_states) n_meas = len(measurements) marginal_probs = [] for i in range(n_tracks): marginal_probs.append(beta[i, :n_meas].copy()) return JPDAResult( association_probs=beta, marginal_probs=marginal_probs, likelihood_matrix=likelihood_matrix, gated=gated, )
__all__ = [ "JPDAResult", "JPDAUpdate", "compute_measurement_likelihood", "compute_likelihood_matrix", "jpda_probabilities", "jpda_update", "jpda", ]