# Source code for pytcl.trackers.hypothesis

"""
Hypothesis management for Multiple Hypothesis Tracking (MHT).

This module provides data structures and algorithms for managing
hypothesis trees in track-oriented MHT implementations.

References
----------
.. [1] S. Blackman and R. Popoli, "Design and Analysis of Modern
       Tracking Systems," Artech House, 1999.
.. [2] D. Reid, "An Algorithm for Tracking Multiple Targets,"
       IEEE Trans. Automatic Control, 1979.
"""

from enum import Enum
from typing import Dict, List, NamedTuple, Optional, Set, Tuple

import numpy as np
from numpy.typing import NDArray


class MHTTrackStatus(Enum):
    """Status label attached to a track within MHT.

    Each member's value is the lowercase string form of its name.
    """

    TENTATIVE = "tentative"
    CONFIRMED = "confirmed"
    DELETED = "deleted"
class MHTTrack(NamedTuple):
    """Immutable record describing one track branch inside MHT.

    Attributes
    ----------
    id : int
        Unique identifier of the track.
    state : ndarray
        Estimated state vector.
    covariance : ndarray
        Covariance matrix of the state estimate.
    score : float
        Track score expressed as a log-likelihood ratio.
    status : MHTTrackStatus
        Current status of the track.
    history : list of int
        Indices of the measurements associated with this branch;
        an entry of -1 marks a missed detection.
    parent_id : int
        Identifier of the parent track, or -1 for a root track.
    scan_created : int
        Scan number at which this branch was created.
    n_hits : int
        Count of measurement updates.
    n_misses : int
        Count of consecutive missed detections.
    """

    # Identity
    id: int

    # Kinematic estimate
    state: NDArray[np.floating]
    covariance: NDArray[np.floating]

    # Quality / lifecycle
    score: float
    status: MHTTrackStatus

    # Genealogy and bookkeeping
    history: List[int]
    parent_id: int
    scan_created: int
    n_hits: int
    n_misses: int
class Hypothesis(NamedTuple):
    """One global hypothesis maintained by MHT.

    A global hypothesis is a consistent assignment of measurements to
    tracks over multiple scans; it groups a set of mutually compatible
    track branches.

    Attributes
    ----------
    id : int
        Unique identifier of the hypothesis.
    probability : float
        Posterior probability of the hypothesis.
    track_ids : list of int
        Identifiers of the tracks that make up the hypothesis.
    scan_created : int
        Scan number at which the hypothesis was created.
    parent_id : int
        Identifier of the parent hypothesis, or -1 for the initial one.
    """

    id: int
    probability: float
    track_ids: List[int]
    scan_created: int
    parent_id: int
class HypothesisAssignment(NamedTuple):
    """Pairing of a single track with a single measurement in a hypothesis.

    Attributes
    ----------
    track_id : int
        Identifier of the track.
    measurement_idx : int
        Index of the assigned measurement, or -1 for a missed detection.
    likelihood : float
        Likelihood of the assignment.
    """

    track_id: int
    measurement_idx: int
    likelihood: float
def generate_joint_associations(
    gated: NDArray[np.bool_],
    n_tracks: int,
    n_meas: int,
) -> List[Dict[int, int]]:
    """
    Generate all valid joint measurement-to-track associations.

    A valid association satisfies:

    - Each measurement is assigned to at most one track
    - Each track is assigned to at most one measurement
    - Only gated track-measurement pairs are considered

    Parameters
    ----------
    gated : ndarray
        Boolean gating matrix, shape (n_tracks, n_meas).
        gated[i, j] = True if track i can be associated with measurement j.
    n_tracks : int
        Number of tracks.
    n_meas : int
        Number of measurements.

    Returns
    -------
    associations : list of dict
        List of valid associations. Each dict maps track index
        (0 .. n_tracks - 1) to meas_idx; meas_idx = -1 indicates a
        missed detection. For ``n_tracks <= 0`` the result is ``[{}]``
        (the single empty association).

    Notes
    -----
    The enumeration is exhaustive, so the number of associations grows
    combinatorially with the number of tracks and gated measurements.
    Associations are produced depth-first; for every track the missed
    detection branch comes first, followed by gated measurements in
    increasing index order.

    Examples
    --------
    >>> gated = np.array([[True, True], [True, True]])
    >>> associations = generate_joint_associations(gated, 2, 2)
    >>> len(associations)  # 1 all-miss + 4 single + 2 full assignments
    7
    >>> associations[0]
    {0: -1, 1: -1}
    """
    associations: List[Dict[int, int]] = []

    def _enumerate(
        track_idx: int,
        current: Dict[int, int],
        used_meas: Set[int],
    ) -> None:
        """Recursively extend ``current`` with a choice for ``track_idx``."""
        if track_idx == n_tracks:
            # Every track has a decision: snapshot the shared working dict.
            associations.append(current.copy())
            return

        # Option 1: track has no measurement (missed detection)
        current[track_idx] = -1
        _enumerate(track_idx + 1, current, used_meas)

        # Option 2: track takes a gated measurement that is still free
        for j in range(n_meas):
            if gated[track_idx, j] and j not in used_meas:
                current[track_idx] = j
                used_meas.add(j)
                _enumerate(track_idx + 1, current, used_meas)
                # Backtrack so sibling branches may use measurement j.
                used_meas.remove(j)

    if n_tracks > 0:
        _enumerate(0, {}, set())
    else:
        # No tracks: the only association is the empty one.
        associations.append({})

    return associations
def compute_association_likelihood(
    association: Dict[int, int],
    likelihood_matrix: NDArray[np.floating],
    detection_prob: float,
    clutter_density: float,
    n_meas: int,
) -> float:
    """
    Evaluate the joint likelihood of one track-to-measurement association.

    Every track contributes one factor: ``1 - detection_prob`` when it
    is marked as missed, otherwise ``detection_prob`` times its entry in
    ``likelihood_matrix``. Each measurement not claimed by any track
    contributes one factor of ``clutter_density``.

    Parameters
    ----------
    association : dict
        Mapping from track_id to measurement_idx (-1 for miss).
    likelihood_matrix : ndarray
        Likelihood values, indexed as ``[track_id, measurement_idx]``.
    detection_prob : float
        Probability of detection.
    clutter_density : float
        Spatial density of clutter (false alarms).
    n_meas : int
        Total number of measurements.

    Returns
    -------
    likelihood : float
        Joint likelihood of the association.

    Examples
    --------
    >>> import numpy as np
    >>> likelihood_matrix = np.array([[0.9, 0.1],
    ...                               [0.1, 0.8]])
    >>> full = compute_association_likelihood(
    ...     {0: 0, 1: 1}, likelihood_matrix,
    ...     detection_prob=0.9, clutter_density=1e-6, n_meas=2
    ... )
    >>> with_miss = compute_association_likelihood(
    ...     {0: 0, 1: -1}, likelihood_matrix,
    ...     detection_prob=0.9, clutter_density=1e-6, n_meas=2
    ... )
    >>> full > with_miss  # detecting both tracks is more likely here
    True
    """
    joint = 1.0
    claimed = set()

    for track_id, meas_idx in association.items():
        if meas_idx != -1:
            # Detected track: detection probability times measurement likelihood
            joint *= detection_prob * likelihood_matrix[track_id, meas_idx]
            claimed.add(meas_idx)
            continue
        # Missed detection
        joint *= 1.0 - detection_prob

    # Measurements left unclaimed are explained as clutter
    unclaimed = n_meas - len(claimed)
    return joint * clutter_density**unclaimed
def n_scan_prune(
    hypotheses: List[Hypothesis],
    tracks: Dict[int, MHTTrack],
    n_scan: int,
    current_scan: int,
) -> Tuple[List[Hypothesis], Set[int]]:
    """
    N-scan pruning of hypotheses.

    The most probable hypothesis is used as the reference. Its tracks
    that were created at or before ``current_scan - n_scan`` form the
    "committed" set. Any hypothesis whose own set of such tracks differs
    from the committed set is dropped, and the survivors are
    renormalized.

    Parameters
    ----------
    hypotheses : list of Hypothesis
        Current hypotheses.
    tracks : dict
        Mapping from track_id to MHTTrack. Track IDs that are missing
        from this mapping are ignored.
    n_scan : int
        Number of scans to look back.
    current_scan : int
        Current scan number.

    Returns
    -------
    pruned_hypotheses : list of Hypothesis
        Hypotheses surviving pruning. When ``hypotheses`` is empty or
        ``n_scan <= 0`` the input list is returned unchanged.
    committed_track_ids : set
        Track IDs of the best hypothesis created at or before the
        cutoff scan (empty when no pruning was attempted).

    Notes
    -----
    - If the committed set is empty, every hypothesis is kept.
    - Probabilities are rescaled to sum to one only when the surviving
      probability mass is positive.

    Examples
    --------
    >>> import numpy as np
    >>> from pytcl.trackers.hypothesis import (
    ...     Hypothesis, MHTTrack, MHTTrackStatus, n_scan_prune
    ... )
    >>> track1 = MHTTrack(id=0, state=np.zeros(2), covariance=np.eye(2),
    ...                   score=1.0, status=MHTTrackStatus.CONFIRMED,
    ...                   history=[0], parent_id=-1, scan_created=0,
    ...                   n_hits=3, n_misses=0)
    >>> track2 = MHTTrack(id=1, state=np.zeros(2), covariance=np.eye(2),
    ...                   score=0.5, status=MHTTrackStatus.TENTATIVE,
    ...                   history=[1], parent_id=-1, scan_created=2,
    ...                   n_hits=1, n_misses=0)
    >>> tracks = {0: track1, 1: track2}
    >>> hyp1 = Hypothesis(id=0, probability=0.8, track_ids=[0],
    ...                   scan_created=0, parent_id=-1)
    >>> hyp2 = Hypothesis(id=1, probability=0.2, track_ids=[1],
    ...                   scan_created=2, parent_id=-1)
    >>> pruned, committed = n_scan_prune([hyp1, hyp2], tracks, n_scan=2,
    ...                                  current_scan=3)
    >>> len(pruned)
    1
    >>> committed
    {0}
    """
    if not hypotheses or n_scan <= 0:
        return hypotheses, set()

    cutoff_scan = current_scan - n_scan

    def _ids_at_cutoff(hyp):
        """Known track IDs of ``hyp`` created no later than the cutoff."""
        return {
            tid
            for tid in hyp.track_ids
            if tid in tracks and tracks[tid].scan_created <= cutoff_scan
        }

    # The most probable hypothesis dictates the committed decision.
    reference = max(hypotheses, key=lambda h: h.probability)
    committed = _ids_at_cutoff(reference)

    if committed:
        survivors = [h for h in hypotheses if _ids_at_cutoff(h) == committed]
    else:
        # Nothing old enough to commit to: every hypothesis stays.
        survivors = list(hypotheses)

    # Rescale the surviving probabilities so that they sum to one.
    mass = sum(h.probability for h in survivors)
    if mass > 0:
        survivors = [
            Hypothesis(
                id=h.id,
                probability=h.probability / mass,
                track_ids=h.track_ids,
                scan_created=h.scan_created,
                parent_id=h.parent_id,
            )
            for h in survivors
        ]

    return survivors, committed
def prune_hypotheses_by_probability(
    hypotheses: List[Hypothesis],
    max_hypotheses: int,
    min_probability: float = 1e-6,
) -> List[Hypothesis]:
    """
    Prune hypotheses by probability threshold and count limit.

    Hypotheses below ``min_probability`` are discarded (the single most
    probable one is retained if none pass), the rest are ranked by
    decreasing probability, truncated to ``max_hypotheses`` entries and
    renormalized.

    Parameters
    ----------
    hypotheses : list of Hypothesis
        Current hypotheses. The input list is not modified.
    max_hypotheses : int
        Maximum number of hypotheses to retain.
    min_probability : float
        Minimum probability threshold, compared against the
        probabilities exactly as supplied.

    Returns
    -------
    pruned : list of Hypothesis
        Retained hypotheses in order of decreasing probability, rescaled
        to sum to one when their total probability is positive.

    Examples
    --------
    >>> from pytcl.trackers.hypothesis import Hypothesis, prune_hypotheses_by_probability
    >>> hyps = [
    ...     Hypothesis(id=0, probability=0.5, track_ids=[0], scan_created=0, parent_id=-1),
    ...     Hypothesis(id=1, probability=0.3, track_ids=[1], scan_created=0, parent_id=-1),
    ...     Hypothesis(id=2, probability=0.1, track_ids=[2], scan_created=0, parent_id=-1),
    ...     Hypothesis(id=3, probability=0.05, track_ids=[3], scan_created=0, parent_id=-1),
    ...     Hypothesis(id=4, probability=1e-8, track_ids=[4], scan_created=0, parent_id=-1),
    ... ]
    >>> pruned = prune_hypotheses_by_probability(hyps, max_hypotheses=3)
    >>> [h.id for h in pruned]  # Only top 3 kept
    [0, 1, 2]
    >>> abs(sum(h.probability for h in pruned) - 1.0) < 1e-9  # Renormalized
    True
    """
    if not hypotheses:
        return []

    # Threshold first; never return an empty set for non-empty input.
    candidates = [h for h in hypotheses if h.probability >= min_probability]
    if not candidates:
        candidates = [max(hypotheses, key=lambda h: h.probability)]

    # Rank by decreasing probability and apply the count limit.
    ranked = sorted(candidates, key=lambda h: h.probability, reverse=True)
    kept = ranked[:max_hypotheses]

    mass = sum(h.probability for h in kept)
    if not mass > 0:
        # No positive probability mass to rescale by.
        return kept

    return [
        Hypothesis(
            id=h.id,
            probability=h.probability / mass,
            track_ids=h.track_ids,
            scan_created=h.scan_created,
            parent_id=h.parent_id,
        )
        for h in kept
    ]
class HypothesisTree:
    """
    Manages hypothesis tree for MHT.

    The hypothesis tree represents all possible interpretations of
    measurement-to-track associations across multiple scans.

    Parameters
    ----------
    max_hypotheses : int
        Maximum number of hypotheses to maintain.
    n_scan : int
        Number of scans for N-scan pruning.
    min_probability : float
        Minimum hypothesis probability threshold.

    Attributes
    ----------
    hypotheses : list of Hypothesis
        Current set of hypotheses.
    tracks : dict
        Mapping from track_id to MHTTrack.
    current_scan : int
        Current scan number.
    """

    def __init__(
        self,
        max_hypotheses: int = 100,
        n_scan: int = 3,
        min_probability: float = 1e-6,
    ) -> None:
        self.max_hypotheses = max_hypotheses
        self.n_scan = n_scan
        self.min_probability = min_probability

        self.hypotheses: List[Hypothesis] = []
        self.tracks: Dict[int, MHTTrack] = {}
        self.current_scan = 0

        # Monotonic counters used to hand out unique IDs.
        self._next_hypothesis_id = 0
        self._next_track_id = 0

    def initialize(
        self,
        initial_tracks: Optional[List[MHTTrack]] = None,
    ) -> None:
        """
        Initialize the hypothesis tree.

        Resets all state and creates a single root hypothesis with
        probability 1.0 that contains every initial track.

        Parameters
        ----------
        initial_tracks : list of MHTTrack, optional
            Initial tracks to include. They are stored under their own
            ``id``; later generated track IDs start above the largest one.
        """
        self.hypotheses = []
        self.tracks = {}
        self.current_scan = 0
        self._next_hypothesis_id = 0
        self._next_track_id = 0

        for track in initial_tracks or []:
            self.tracks[track.id] = track
            # Keep generated IDs clear of the caller-supplied ones.
            self._next_track_id = max(self._next_track_id, track.id + 1)

        # Create initial hypothesis
        initial_hyp = Hypothesis(
            id=self._get_next_hypothesis_id(),
            probability=1.0,
            track_ids=list(self.tracks),
            scan_created=0,
            parent_id=-1,
        )
        self.hypotheses = [initial_hyp]

    def _get_next_hypothesis_id(self) -> int:
        """Get next hypothesis ID."""
        hyp_id = self._next_hypothesis_id
        self._next_hypothesis_id += 1
        return hyp_id

    def _get_next_track_id(self) -> int:
        """Get next track ID."""
        track_id = self._next_track_id
        self._next_track_id += 1
        return track_id

    def add_track(self, track: MHTTrack) -> int:
        """
        Add a new track to the tree.

        The track is stored as a copy of ``track`` whose ``id`` is
        replaced by a freshly generated one; the ``id`` carried by the
        argument is ignored.

        Parameters
        ----------
        track : MHTTrack
            Track to add.

        Returns
        -------
        track_id : int
            ID assigned to the track.
        """
        track_id = self._get_next_track_id()
        self.tracks[track_id] = track._replace(id=track_id)
        return track_id

    def expand_hypotheses(
        self,
        associations: List[Dict[int, int]],
        likelihoods: List[float],
        new_tracks: Dict[int, List[MHTTrack]],
    ) -> None:
        """
        Expand hypotheses with new associations.

        Every current hypothesis is combined with every association,
        producing one child hypothesis per pair. Child probabilities
        (parent probability times association likelihood) are normalized
        across all children before pruning, so ``min_probability`` is
        compared against posterior probabilities rather than raw
        likelihood products.

        Parameters
        ----------
        associations : list of dict
            Valid joint associations.
        likelihoods : list of float
            Likelihood of each association.
        new_tracks : dict
            Mapping from association_idx to list of new tracks
            created by that association.

        Raises
        ------
        ValueError
            If ``associations`` and ``likelihoods`` differ in length.
        """
        # Validate before touching any state so a bad call has no effect.
        if len(associations) != len(likelihoods):
            raise ValueError(
                "associations and likelihoods must have the same length "
                f"(got {len(associations)} and {len(likelihoods)})"
            )

        self.current_scan += 1
        new_hypotheses: List[Hypothesis] = []

        for hyp in self.hypotheses:
            for assoc_idx, (assoc, likelihood) in enumerate(
                zip(associations, likelihoods)
            ):
                # Existing tracks continue only if the association covers them.
                child_track_ids = [
                    track_id for track_id in hyp.track_ids if track_id in assoc
                ]

                # Tracks born from this association get fresh IDs per child.
                for new_track in new_tracks.get(assoc_idx, ()):
                    child_track_ids.append(self.add_track(new_track))

                new_hypotheses.append(
                    Hypothesis(
                        id=self._get_next_hypothesis_id(),
                        probability=hyp.probability * likelihood,
                        track_ids=child_track_ids,
                        scan_created=self.current_scan,
                        parent_id=hyp.id,
                    )
                )

        # Normalize first: raw products can all be far below
        # min_probability, which would collapse the tree to one hypothesis.
        total = sum(h.probability for h in new_hypotheses)
        if total > 0 and np.isfinite(total):
            new_hypotheses = [
                h._replace(probability=h.probability / total)
                for h in new_hypotheses
            ]

        # Replace hypotheses
        self.hypotheses = new_hypotheses

        # Prune
        self.prune()

    def prune(self) -> None:
        """Apply all pruning strategies."""
        # Probability-based pruning
        self.hypotheses = prune_hypotheses_by_probability(
            self.hypotheses,
            self.max_hypotheses,
            self.min_probability,
        )

        # N-scan pruning (the committed track IDs are not needed here)
        self.hypotheses, _ = n_scan_prune(
            self.hypotheses,
            self.tracks,
            self.n_scan,
            self.current_scan,
        )

        # Remove orphaned tracks
        self._remove_orphaned_tracks()

    def _remove_orphaned_tracks(self) -> None:
        """Remove tracks not referenced by any hypothesis."""
        referenced = set()
        for hyp in self.hypotheses:
            referenced.update(hyp.track_ids)

        # Delete in place so references to ``self.tracks`` stay valid.
        for track_id in set(self.tracks) - referenced:
            del self.tracks[track_id]

    def get_best_hypothesis(self) -> Optional[Hypothesis]:
        """Get the most probable hypothesis, or None if there is none."""
        if not self.hypotheses:
            return None
        return max(self.hypotheses, key=lambda h: h.probability)

    def get_best_tracks(self) -> List[MHTTrack]:
        """Get tracks from the best hypothesis."""
        best = self.get_best_hypothesis()
        if best is None:
            return []
        return [self.tracks[tid] for tid in best.track_ids if tid in self.tracks]

    def get_confirmed_tracks(self) -> List[MHTTrack]:
        """Get confirmed tracks from the best hypothesis."""
        return [
            t for t in self.get_best_tracks() if t.status == MHTTrackStatus.CONFIRMED
        ]
# Names exported by ``from <module> import *``.
__all__ = [
    "MHTTrackStatus",
    "MHTTrack",
    "Hypothesis",
    "HypothesisAssignment",
    "generate_joint_associations",
    "compute_association_likelihood",
    "n_scan_prune",
    "prune_hypotheses_by_probability",
    "HypothesisTree",
]