"""
Data association algorithms for multi-target tracking.
This module provides algorithms for associating measurements to tracks,
including Global Nearest Neighbor (GNN) and related methods.
"""
from typing import List, NamedTuple, Optional
import numpy as np
from numpy.typing import ArrayLike, NDArray
from pytcl.assignment_algorithms.gating import mahalanobis_batch, mahalanobis_distance
from pytcl.assignment_algorithms.two_dimensional import assign2d
[docs]
class AssociationResult(NamedTuple):
"""Result of data association.
Attributes
----------
track_to_measurement : ndarray
Array of shape (n_tracks,). track_to_measurement[i] gives the
measurement index assigned to track i, or -1 if unassigned.
measurement_to_track : ndarray
Array of shape (n_measurements,). measurement_to_track[j] gives the
track index assigned to measurement j, or -1 if unassigned.
costs : ndarray
Array of association costs (squared Mahalanobis distances) for
each assigned pair.
total_cost : float
Total assignment cost.
"""
track_to_measurement: NDArray[np.intp]
measurement_to_track: NDArray[np.intp]
costs: NDArray[np.float64]
total_cost: float
[docs]
def compute_association_cost(
track_predictions: ArrayLike,
track_covariances: ArrayLike,
measurements: ArrayLike,
measurement_models: Optional[ArrayLike] = None,
) -> NDArray[np.float64]:
"""
Compute cost matrix for track-to-measurement association.
Parameters
----------
track_predictions : array_like
Predicted track states of shape (n_tracks, n_state).
track_covariances : array_like
Track covariance matrices of shape (n_tracks, n_state, n_state).
measurements : array_like
Measurements of shape (n_measurements, n_meas).
measurement_models : array_like, optional
Measurement matrices of shape (n_tracks, n_meas, n_state) or
(n_meas, n_state) if same for all tracks. If None, assumes
direct measurement of first n_meas states.
Returns
-------
cost_matrix : ndarray
Cost matrix of shape (n_tracks, n_measurements).
cost_matrix[i, j] is the squared Mahalanobis distance from
track i to measurement j.
Examples
--------
>>> # 2 tracks, 3 measurements, 2D state [x, vx]
>>> predictions = np.array([[0.0, 1.0], [5.0, -1.0]])
>>> covariances = np.array([np.eye(2), np.eye(2)])
>>> measurements = np.array([[0.1], [4.9], [10.0]])
>>> H = np.array([[1.0, 0.0]]) # Measure position only
>>> costs = compute_association_cost(predictions, covariances, measurements, H)
"""
X = np.asarray(track_predictions, dtype=np.float64)
P = np.asarray(track_covariances, dtype=np.float64)
Z = np.asarray(measurements, dtype=np.float64)
n_tracks = X.shape[0]
n_meas = Z.shape[0]
n_state = X.shape[1]
meas_dim = Z.shape[1]
# Handle measurement model
if measurement_models is None:
# Default: measure first meas_dim states
H = np.zeros((meas_dim, n_state))
H[:meas_dim, :meas_dim] = np.eye(meas_dim)
H_per_track = np.tile(H, (n_tracks, 1, 1))
else:
H_arr = np.asarray(measurement_models, dtype=np.float64)
if H_arr.ndim == 2:
# Same H for all tracks
H_per_track = np.tile(H_arr, (n_tracks, 1, 1))
else:
H_per_track = H_arr
# Compute cost matrix using batch Mahalanobis distance for performance
cost_matrix = np.full((n_tracks, n_meas), np.inf, dtype=np.float64)
for i in range(n_tracks):
H = H_per_track[i]
z_pred = H @ X[i]
S = H @ P[i] @ H.T # Innovation covariance
# Compute innovations for all measurements at once
innovations = Z - z_pred # (n_meas, meas_dim)
# Use batch Mahalanobis distance (JIT-compiled)
try:
S_inv = np.linalg.inv(S)
mahalanobis_batch(innovations, S_inv, cost_matrix[i])
except np.linalg.LinAlgError:
# Fallback if S is singular
for j in range(n_meas):
cost_matrix[i, j] = mahalanobis_distance(innovations[j], S)
return cost_matrix
[docs]
def nearest_neighbor(
cost_matrix: ArrayLike,
gate_threshold: float = np.inf,
) -> AssociationResult:
"""
Simple nearest neighbor data association.
Each track is assigned to its closest measurement, but a measurement
can only be assigned to one track. Greedy assignment, not globally optimal.
Parameters
----------
cost_matrix : array_like
Cost matrix of shape (n_tracks, n_measurements).
gate_threshold : float, optional
Maximum cost for valid assignment. Assignments with cost above
this threshold are rejected.
Returns
-------
result : AssociationResult
Association result with track and measurement assignments.
Notes
-----
This is a greedy algorithm that processes tracks in order of their
minimum cost. It does not guarantee globally optimal assignment.
Use gnn_association for globally optimal assignment.
Examples
--------
>>> cost = np.array([[1.0, 5.0], [4.0, 2.0]])
>>> result = nearest_neighbor(cost, gate_threshold=10.0)
>>> result.track_to_measurement
array([0, 1])
"""
C = np.asarray(cost_matrix, dtype=np.float64)
n_tracks, n_meas = C.shape
track_to_meas = np.full(n_tracks, -1, dtype=np.intp)
meas_to_track = np.full(n_meas, -1, dtype=np.intp)
costs: List[float] = []
# Mask for available measurements
available = np.ones(n_meas, dtype=bool)
# Process tracks in order of minimum cost
min_costs = np.min(C, axis=1)
track_order = np.argsort(min_costs)
for i in track_order:
# Find minimum cost to available measurements
available_costs = np.where(available, C[i], np.inf)
j = np.argmin(available_costs)
cost = available_costs[j]
if cost <= gate_threshold:
track_to_meas[i] = j
meas_to_track[j] = i
available[j] = False
costs.append(cost)
return AssociationResult(
track_to_measurement=track_to_meas,
measurement_to_track=meas_to_track,
costs=np.array(costs, dtype=np.float64),
total_cost=float(np.sum(costs)),
)
[docs]
def gnn_association(
cost_matrix: ArrayLike,
gate_threshold: float = np.inf,
cost_of_non_assignment: Optional[float] = None,
) -> AssociationResult:
"""
Global Nearest Neighbor (GNN) data association.
Finds the globally optimal assignment of tracks to measurements
that minimizes the total association cost.
Parameters
----------
cost_matrix : array_like
Cost matrix of shape (n_tracks, n_measurements).
cost_matrix[i, j] is the cost of assigning track i to measurement j.
gate_threshold : float, optional
Maximum cost for valid assignment. Entries above this threshold
are set to infinity before optimization.
cost_of_non_assignment : float, optional
Cost for not assigning a track or measurement. If None, all
tracks/measurements that can be assigned will be assigned.
Returns
-------
result : AssociationResult
Association result with track and measurement assignments.
Examples
--------
>>> cost = np.array([[1.0, 5.0, 2.0],
... [4.0, 2.0, 3.0]])
>>> result = gnn_association(cost, gate_threshold=10.0)
>>> result.track_to_measurement
array([0, 1])
>>> result.total_cost
3.0
Notes
-----
GNN uses the Hungarian algorithm to find the globally optimal assignment.
It is more computationally expensive than nearest neighbor but guarantees
optimality.
The algorithm handles rectangular cost matrices (different numbers of
tracks and measurements) and allows tracks/measurements to remain
unassigned if cost_of_non_assignment is specified.
References
----------
.. [1] Blackman, S.S. and Popoli, R., "Design and Analysis of Modern
Tracking Systems", Artech House, 1999.
"""
C = np.asarray(cost_matrix, dtype=np.float64).copy()
n_tracks, n_meas = C.shape
# Apply gate threshold
C[C > gate_threshold] = np.inf
# Determine cost of non-assignment
if cost_of_non_assignment is None:
non_assign_cost = np.inf
else:
non_assign_cost = cost_of_non_assignment
# Solve assignment problem
assignment_result = assign2d(C, cost_of_non_assignment=non_assign_cost)
# Build output
track_to_meas = np.full(n_tracks, -1, dtype=np.intp)
meas_to_track = np.full(n_meas, -1, dtype=np.intp)
costs: List[float] = []
for i, j in zip(assignment_result.row_indices, assignment_result.col_indices):
track_to_meas[i] = j
meas_to_track[j] = i
costs.append(C[i, j])
return AssociationResult(
track_to_measurement=track_to_meas,
measurement_to_track=meas_to_track,
costs=np.array(costs, dtype=np.float64),
total_cost=float(np.sum(costs)),
)
[docs]
def gated_gnn_association(
track_predictions: ArrayLike,
track_covariances: ArrayLike,
measurements: ArrayLike,
measurement_models: Optional[ArrayLike] = None,
gate_probability: float = 0.99,
cost_of_non_assignment: Optional[float] = None,
) -> AssociationResult:
"""
GNN association with automatic gating.
Combines gating and GNN association in a single function for convenience.
Parameters
----------
track_predictions : array_like
Predicted track states of shape (n_tracks, n_state).
track_covariances : array_like
Track covariance matrices of shape (n_tracks, n_state, n_state).
measurements : array_like
Measurements of shape (n_measurements, n_meas).
measurement_models : array_like, optional
Measurement matrices. See compute_association_cost for details.
gate_probability : float, optional
Probability for chi-squared gate threshold (default: 0.99).
cost_of_non_assignment : float, optional
Cost for not assigning a track or measurement.
Returns
-------
result : AssociationResult
Association result.
Examples
--------
>>> predictions = np.array([[0.0, 1.0], [5.0, -1.0]])
>>> covariances = np.array([0.1 * np.eye(2), 0.1 * np.eye(2)])
>>> measurements = np.array([[0.1], [4.9]])
>>> H = np.array([[1.0, 0.0]])
>>> result = gated_gnn_association(
... predictions, covariances, measurements, H,
... gate_probability=0.99
... )
"""
from scipy.stats import chi2
Z = np.asarray(measurements, dtype=np.float64)
meas_dim = Z.shape[1]
# Compute gate threshold
gate_threshold = chi2.ppf(gate_probability, df=meas_dim)
# Compute cost matrix
cost_matrix = compute_association_cost(
track_predictions,
track_covariances,
measurements,
measurement_models,
)
# Run GNN with gating
return gnn_association(
cost_matrix,
gate_threshold=gate_threshold,
cost_of_non_assignment=cost_of_non_assignment,
)
__all__ = [
"AssociationResult",
"compute_association_cost",
"nearest_neighbor",
"gnn_association",
"gated_gnn_association",
]