Source code for pytcl.trackers.mht

"""
Multiple Hypothesis Tracking (MHT) implementation.

MHT maintains multiple hypotheses about measurement-to-track associations,
deferring hard decisions until more information is available. This allows
the tracker to recover from association errors.

This implementation uses track-oriented MHT with N-scan pruning.

References
----------
.. [1] S. Blackman and R. Popoli, "Design and Analysis of Modern
       Tracking Systems," Artech House, 1999.
.. [2] D. Reid, "An Algorithm for Tracking Multiple Targets,"
       IEEE Trans. Automatic Control, 1979.
"""

from typing import Callable, Dict, List, NamedTuple, Optional

import numpy as np
from numpy.typing import ArrayLike, NDArray
from scipy.stats import chi2

from pytcl.assignment_algorithms.gating import mahalanobis_distance
from pytcl.trackers.hypothesis import (
    Hypothesis,
    HypothesisTree,
    MHTTrack,
    MHTTrackStatus,
    generate_joint_associations,
)


[docs] class MHTConfig(NamedTuple): """Configuration for MHT tracker. Attributes ---------- n_scan : int Number of scans for N-scan pruning. Default 3. max_hypotheses : int Maximum number of hypotheses to maintain. Default 100. detection_prob : float Probability of detection (Pd). Default 0.9. clutter_density : float Spatial density of false alarms. Default 1e-6. gate_probability : float Gating probability for chi-squared test. Default 0.99. confirm_threshold : int Number of hits to confirm a track. Default 3. delete_threshold : int Number of consecutive misses to delete a track. Default 5. min_hypothesis_prob : float Minimum hypothesis probability. Default 1e-6. new_track_weight : float Prior weight for new track hypothesis. Default 0.1. """ n_scan: int = 3 max_hypotheses: int = 100 detection_prob: float = 0.9 clutter_density: float = 1e-6 gate_probability: float = 0.99 confirm_threshold: int = 3 delete_threshold: int = 5 min_hypothesis_prob: float = 1e-6 new_track_weight: float = 0.1
[docs] class MHTResult(NamedTuple): """Result of MHT processing step. Attributes ---------- confirmed_tracks : list of MHTTrack Tracks that are confirmed. tentative_tracks : list of MHTTrack Tracks that are tentative. all_tracks : list of MHTTrack All active tracks from best hypothesis. n_hypotheses : int Number of active hypotheses. best_hypothesis_prob : float Probability of the best hypothesis. """ confirmed_tracks: List[MHTTrack] tentative_tracks: List[MHTTrack] all_tracks: List[MHTTrack] n_hypotheses: int best_hypothesis_prob: float
[docs] class MHTTracker: """ Multiple Hypothesis Tracking (MHT) tracker. Maintains multiple hypotheses about measurement-to-track associations, with N-scan pruning for complexity control. Parameters ---------- state_dim : int Dimension of state vector. meas_dim : int Dimension of measurement vector. F : callable or ndarray State transition matrix or function F(dt) -> ndarray. H : ndarray Measurement matrix. Q : callable or ndarray Process noise covariance or function Q(dt) -> ndarray. R : ndarray Measurement noise covariance. config : MHTConfig, optional Tracker configuration. Uses defaults if not provided. init_covariance : ndarray, optional Initial covariance for new tracks. Examples -------- >>> import numpy as np >>> # Constant velocity model >>> F = lambda dt: np.array([[1, dt, 0, 0], ... [0, 1, 0, 0], ... [0, 0, 1, dt], ... [0, 0, 0, 1]]) >>> H = np.array([[1, 0, 0, 0], ... [0, 0, 1, 0]]) >>> Q = lambda dt: 0.1 * np.eye(4) >>> R = np.eye(2) * 0.5 >>> tracker = MHTTracker(4, 2, F, H, Q, R) >>> # Process measurements >>> measurements = [np.array([1, 2]), np.array([5, 6])] >>> result = tracker.process(measurements, dt=1.0) """
[docs] def __init__( self, state_dim: int, meas_dim: int, F: Callable[[float], NDArray[np.floating]] | NDArray[np.floating], H: NDArray[np.floating], Q: Callable[[float], NDArray[np.floating]] | NDArray[np.floating], R: NDArray[np.floating], config: Optional[MHTConfig] = None, init_covariance: Optional[NDArray[np.floating]] = None, ): self.state_dim = state_dim self.meas_dim = meas_dim self._F = F if callable(F) else lambda dt: np.asarray(F, dtype=np.float64) self.H = np.asarray(H, dtype=np.float64) self._Q = Q if callable(Q) else lambda dt: np.asarray(Q, dtype=np.float64) self.R = np.asarray(R, dtype=np.float64) self.config = config or MHTConfig() if init_covariance is not None: self.init_covariance = np.asarray(init_covariance, dtype=np.float64) else: self.init_covariance = np.eye(state_dim) * 100.0 # Compute gate threshold self.gate_threshold = chi2.ppf(self.config.gate_probability, df=meas_dim) # Initialize hypothesis tree self.hypothesis_tree = HypothesisTree( max_hypotheses=self.config.max_hypotheses, n_scan=self.config.n_scan, min_probability=self.config.min_hypothesis_prob, ) self.hypothesis_tree.initialize() self._time = 0.0 self._scan = 0
[docs] def process( self, measurements: List[ArrayLike], dt: float, ) -> MHTResult: """ Process measurements at new time step. Parameters ---------- measurements : list of array_like List of measurement vectors. dt : float Time step since last update. Returns ------- result : MHTResult Tracking result with confirmed and tentative tracks. """ self._time += dt self._scan += 1 # Convert measurements if len(measurements) == 0: Z = np.zeros((0, self.meas_dim)) else: Z = np.array([np.asarray(m, dtype=np.float64) for m in measurements]) n_meas = len(Z) # Get current tracks from all hypotheses all_track_ids = set() for hyp in self.hypothesis_tree.hypotheses: all_track_ids.update(hyp.track_ids) current_tracks = { tid: self.hypothesis_tree.tracks[tid] for tid in all_track_ids if tid in self.hypothesis_tree.tracks } # Predict all tracks F = self._F(dt) Q = self._Q(dt) predicted_tracks = self._predict_tracks(current_tracks, F, Q) # Compute gating and likelihoods gated, likelihood_matrix = self._compute_gating_and_likelihoods( predicted_tracks, Z ) # Generate associations for each hypothesis track_id_list = list(predicted_tracks.keys()) n_tracks = len(track_id_list) if n_tracks > 0: # Create gating matrix indexed by position gated_matrix = np.zeros((n_tracks, n_meas), dtype=bool) likelihood_mat = np.zeros((n_tracks, n_meas)) for i, tid in enumerate(track_id_list): for j in range(n_meas): if (tid, j) in gated: gated_matrix[i, j] = True likelihood_mat[i, j] = likelihood_matrix.get((tid, j), 0.0) # Generate joint associations associations = generate_joint_associations(gated_matrix, n_tracks, n_meas) else: associations = [{}] # Compute likelihood for each association assoc_likelihoods = [] for assoc in associations: # Convert position-based to track_id-based track_assoc = {} for pos_idx, meas_idx in assoc.items(): track_id = track_id_list[pos_idx] if n_tracks > 0 else -1 track_assoc[track_id] = meas_idx # Compute likelihood lik = self._compute_association_likelihood( track_assoc, predicted_tracks, Z, likelihood_matrix ) assoc_likelihoods.append(lik) # Normalize likelihoods total_lik = sum(assoc_likelihoods) if total_lik > 0: assoc_likelihoods = [lik / total_lik for lik in assoc_likelihoods] elif len(assoc_likelihoods) > 0: assoc_likelihoods = [1.0 / len(assoc_likelihoods)] * len(assoc_likelihoods) # Update tracks based on associations new_tracks_per_assoc: Dict[int, List[MHTTrack]] = {} updated_tracks: Dict[int, Dict[int, MHTTrack]] = ( {} ) # assoc_idx -> track_id -> track for assoc_idx, assoc in enumerate(associations): updated_tracks[assoc_idx] = {} new_tracks_per_assoc[assoc_idx] = [] # Update existing tracks for pos_idx, meas_idx in assoc.items(): if n_tracks == 0: continue track_id = track_id_list[pos_idx] pred_track = predicted_tracks[track_id] if meas_idx >= 0: # Track with measurement upd_track = self._update_track(pred_track, Z[meas_idx], meas_idx) else: # Missed detection upd_track = self._miss_track(pred_track) updated_tracks[assoc_idx][track_id] = upd_track # Handle unassigned measurements -> new tracks assigned_meas = set( meas_idx for meas_idx in assoc.values() if meas_idx >= 0 ) for j in range(n_meas): if j not in assigned_meas: new_track = self._initiate_track(Z[j], j) new_tracks_per_assoc[assoc_idx].append(new_track) # Store updated tracks for assoc_idx, track_dict in updated_tracks.items(): for track_id, track in track_dict.items(): self.hypothesis_tree.tracks[track_id] = track # Expand hypotheses self._expand_hypotheses( associations, assoc_likelihoods, new_tracks_per_assoc, track_id_list, ) # Build result return self._build_result()
def _predict_tracks( self, tracks: Dict[int, MHTTrack], F: NDArray[np.floating], Q: NDArray[np.floating], ) -> Dict[int, MHTTrack]: """Predict all tracks forward in time.""" predicted = {} for tid, track in tracks.items(): if track.status == MHTTrackStatus.DELETED: continue x_pred = F @ track.state P_pred = F @ track.covariance @ F.T + Q predicted[tid] = MHTTrack( id=track.id, state=x_pred, covariance=P_pred, score=track.score, status=track.status, history=track.history, parent_id=track.parent_id, scan_created=track.scan_created, n_hits=track.n_hits, n_misses=track.n_misses, ) return predicted def _compute_gating_and_likelihoods( self, tracks: Dict[int, MHTTrack], Z: NDArray[np.floating], ) -> tuple[set[tuple[int, int]], dict[tuple[int, int], float]]: """Compute gating matrix and likelihood values.""" gated = set() likelihood_matrix = {} for tid, track in tracks.items(): z_pred = self.H @ track.state S = self.H @ track.covariance @ self.H.T + self.R for j in range(len(Z)): innovation = Z[j] - z_pred mahal_dist = mahalanobis_distance(innovation, S) if mahal_dist <= self.gate_threshold: gated.add((tid, j)) # Compute likelihood det_S = np.linalg.det(S) if det_S > 0: m = len(innovation) mahal_sq = innovation @ np.linalg.solve(S, innovation) likelihood = ( self.config.detection_prob * np.exp(-0.5 * mahal_sq) / np.sqrt((2 * np.pi) ** m * det_S) ) likelihood_matrix[(tid, j)] = likelihood return gated, likelihood_matrix def _compute_association_likelihood( self, association: Dict[int, int], tracks: Dict[int, MHTTrack], Z: NDArray[np.floating], likelihood_matrix: dict[tuple[int, int], float], ) -> float: """Compute likelihood of a joint association.""" likelihood = 1.0 used_meas = set() for track_id, meas_idx in association.items(): if meas_idx == -1: # Missed detection likelihood *= 1.0 - self.config.detection_prob else: # Detection if (track_id, meas_idx) in likelihood_matrix: likelihood *= likelihood_matrix[(track_id, meas_idx)] else: likelihood *= 1e-10 # Very small for ungated used_meas.add(meas_idx) # Clutter and new track terms for unassigned measurements n_unassigned = len(Z) - len(used_meas) likelihood *= ( self.config.clutter_density + self.config.new_track_weight ) ** n_unassigned return likelihood def _update_track( self, track: MHTTrack, measurement: NDArray[np.floating], meas_idx: int, ) -> MHTTrack: """Update a track with a measurement.""" # Innovation z_pred = self.H @ track.state innovation = measurement - z_pred S = self.H @ track.covariance @ self.H.T + self.R # Kalman gain K = track.covariance @ self.H.T @ np.linalg.inv(S) # Update state and covariance x_upd = track.state + K @ innovation P_upd = (np.eye(self.state_dim) - K @ self.H) @ track.covariance # Update score (log-likelihood ratio) det_S = np.linalg.det(S) if det_S > 0: mahal_sq = innovation @ np.linalg.solve(S, innovation) score_update = 0.5 * ( np.log(self.config.detection_prob) - np.log(self.config.clutter_density) - 0.5 * mahal_sq - 0.5 * np.log(det_S) ) else: score_update = 0.0 new_score = track.score + score_update # Update status n_hits = track.n_hits + 1 n_misses = 0 status = track.status if status == MHTTrackStatus.TENTATIVE: if n_hits >= self.config.confirm_threshold: status = MHTTrackStatus.CONFIRMED # Update history new_history = track.history + [meas_idx] return MHTTrack( id=track.id, state=x_upd, covariance=P_upd, score=new_score, status=status, history=new_history, parent_id=track.parent_id, scan_created=track.scan_created, n_hits=n_hits, n_misses=n_misses, ) def _miss_track(self, track: MHTTrack) -> MHTTrack: """Handle missed detection for a track.""" # Update score for missed detection score_update = np.log(1.0 - self.config.detection_prob) new_score = track.score + score_update # Update status n_misses = track.n_misses + 1 status = track.status if n_misses >= self.config.delete_threshold: status = MHTTrackStatus.DELETED # Update history new_history = track.history + [-1] return MHTTrack( id=track.id, state=track.state, covariance=track.covariance, score=new_score, status=status, history=new_history, parent_id=track.parent_id, scan_created=track.scan_created, n_hits=track.n_hits, n_misses=n_misses, ) def _initiate_track( self, measurement: NDArray[np.floating], meas_idx: int, ) -> MHTTrack: """Initiate a new track from a measurement.""" # Initialize state from measurement H_pinv = np.linalg.pinv(self.H) state = H_pinv @ measurement return MHTTrack( id=-1, # Will be assigned by hypothesis tree state=state, covariance=self.init_covariance.copy(), score=np.log(self.config.new_track_weight), status=MHTTrackStatus.TENTATIVE, history=[meas_idx], parent_id=-1, scan_created=self._scan, n_hits=1, n_misses=0, ) def _expand_hypotheses( self, associations: List[Dict[int, int]], likelihoods: List[float], new_tracks: Dict[int, List[MHTTrack]], track_id_list: List[int], ) -> None: """Expand hypotheses with new associations.""" new_hypotheses = [] for hyp in self.hypothesis_tree.hypotheses: for assoc_idx, (assoc, likelihood) in enumerate( zip(associations, likelihoods) ): # Compute new hypothesis probability new_prob = hyp.probability * likelihood if new_prob < self.config.min_hypothesis_prob: continue # Determine track IDs for new hypothesis new_track_ids = [] # Keep surviving tracks from original hypothesis for track_id in hyp.track_ids: if track_id in self.hypothesis_tree.tracks: track = self.hypothesis_tree.tracks[track_id] if track.status != MHTTrackStatus.DELETED: new_track_ids.append(track_id) # Add new tracks from this association if assoc_idx in new_tracks: for new_track in new_tracks[assoc_idx]: tid = self.hypothesis_tree.add_track(new_track) new_track_ids.append(tid) # Create new hypothesis new_hyp = Hypothesis( id=self.hypothesis_tree._get_next_hypothesis_id(), probability=new_prob, track_ids=new_track_ids, scan_created=self._scan, parent_id=hyp.id, ) new_hypotheses.append(new_hyp) # Update hypotheses if new_hypotheses: self.hypothesis_tree.hypotheses = new_hypotheses else: # Keep at least one hypothesis if self.hypothesis_tree.hypotheses: best = max(self.hypothesis_tree.hypotheses, key=lambda h: h.probability) self.hypothesis_tree.hypotheses = [best] # Prune self.hypothesis_tree.prune() def _build_result(self) -> MHTResult: """Build result from current state.""" best_tracks = self.hypothesis_tree.get_best_tracks() confirmed = [t for t in best_tracks if t.status == MHTTrackStatus.CONFIRMED] tentative = [t for t in best_tracks if t.status == MHTTrackStatus.TENTATIVE] best_hyp = self.hypothesis_tree.get_best_hypothesis() best_prob = best_hyp.probability if best_hyp else 0.0 return MHTResult( confirmed_tracks=confirmed, tentative_tracks=tentative, all_tracks=best_tracks, n_hypotheses=len(self.hypothesis_tree.hypotheses), best_hypothesis_prob=best_prob, ) @property def tracks(self) -> List[MHTTrack]: """Get all tracks from best hypothesis.""" return self.hypothesis_tree.get_best_tracks() @property def confirmed_tracks(self) -> List[MHTTrack]: """Get confirmed tracks from best hypothesis.""" return self.hypothesis_tree.get_confirmed_tracks() @property def n_hypotheses(self) -> int: """Number of active hypotheses.""" return len(self.hypothesis_tree.hypotheses)
__all__ = [ "MHTConfig", "MHTResult", "MHTTracker", ]