Assignment Algorithms

This example demonstrates various assignment algorithms for data association.

Overview

Assignment algorithms solve the measurement-to-track association problem:

  • 2D Assignment: One-to-one matching (Hungarian, Auction)

  • Multi-dimensional: S-D assignment for multi-sensor fusion

  • K-best: Finding multiple good assignments (Murty’s algorithm)

Algorithms

Hungarian Algorithm (Kuhn-Munkres)
  • Optimal O(n³) solution for 2D assignment

  • Guaranteed minimum cost assignment

Auction Algorithm
  • Bertsekas’ auction-based approach

  • Good for sparse cost matrices

Assignment in Action: The result of optimal assignment showing tracks correctly associated with measurements over time.

Global Nearest Neighbor (GNN)
  • Fast suboptimal assignment

  • Uses gating for efficiency

JPDA
  • Joint Probabilistic Data Association

  • Maintains association probabilities

Murty’s K-best
  • Finds k best assignments

  • Used in MHT hypothesis management

Multi-Sensor Fusion: S-D assignment extends 2D methods to fuse measurements from multiple sensors.

Key Concepts

  • Cost matrix: Negative log-likelihood of associations

  • Gating: Statistical test to reduce candidates

  • Dummy assignments: Handling missed detections and clutter

Code Highlights

The example demonstrates:

  • Constructing cost matrices from track-measurement distances

  • Using hungarian() for optimal assignment

  • auction_algorithm() for iterative bidding

  • murty_kbest() for ranked assignments

  • jpda() for association probabilities

Source Code

  1"""
  2Assignment Algorithms Example
  3=============================
  4
  5This example demonstrates the assignment algorithms in PyTCL:
  6
  72D Assignment:
  8- Hungarian algorithm (optimal)
  9- Auction algorithm
 10- Linear sum assignment wrapper
 11
 12K-Best 2D Assignment (v0.17.0):
 13- Murty's algorithm for finding k-best assignments
 14- Ranked assignment enumeration with cost thresholds
 15
 163D Assignment (v0.17.0):
 17- Lagrangian relaxation
 18- Auction-based 3D assignment
 19- Greedy assignment
 20- 2D decomposition method
 21
 22Data Association:
 23- Global Nearest Neighbor (GNN)
 24- Gating (ellipsoidal and rectangular)
 25- JPDA (Joint Probabilistic Data Association)
 26
 27These algorithms are fundamental for multi-target tracking, where
 28measurements must be assigned to tracks optimally.
 29"""
 30
 31import numpy as np
 32import plotly.graph_objects as go
 33from plotly.subplots import make_subplots
 34
 35from pytcl.assignment_algorithms import (  # 2D Assignment
 36    assign2d,
 37    assign3d,
 38    assign3d_auction,
 39    assign3d_lagrangian,
 40    auction,
 41    chi2_gate_threshold,
 42    compute_association_cost,
 43    decompose_to_2d,
 44    ellipsoidal_gate,
 45    gated_gnn_association,
 46    gnn_association,
 47    greedy_3d,
 48    hungarian,
 49    jpda,
 50    kbest_assign2d,
 51    mahalanobis_distance,
 52    murty,
 53    ranked_assignments,
 54    rectangular_gate,
 55)
 56
 57
 58def demo_2d_assignment():
 59    """Demonstrate basic 2D assignment algorithms."""
 60    print("=" * 70)
 61    print("2D Assignment Algorithms Demo")
 62    print("=" * 70)
 63
 64    # Create a cost matrix: 4 tracks, 5 measurements
 65    # Lower cost = better match
 66    np.random.seed(42)
 67    cost = np.array(
 68        [
 69            [10, 5, 13, 4, 8],  # Track 0: best match is measurement 3
 70            [3, 15, 8, 12, 6],  # Track 1: best match is measurement 0
 71            [12, 7, 9, 5, 11],  # Track 2: best match is measurement 3 or 1
 72            [8, 6, 4, 10, 3],  # Track 3: best match is measurement 4
 73        ],
 74        dtype=float,
 75    )
 76
 77    print("\nCost matrix (4 tracks x 5 measurements):")
 78    print(cost)
 79
 80    # Hungarian algorithm (optimal)
 81    row_h, col_h, cost_h = hungarian(cost)
 82    print("\n--- Hungarian Algorithm (Optimal) ---")
 83    print(f"Assignments: {list(zip(row_h, col_h))}")
 84    print(f"Total cost: {cost_h}")
 85
 86    # Auction algorithm
 87    row_a, col_a, cost_a = auction(cost)
 88    print("\n--- Auction Algorithm ---")
 89    print(f"Assignments: {list(zip(row_a, col_a))}")
 90    print(f"Total cost: {cost_a}")
 91
 92    # Using assign2d unified interface
 93    result = assign2d(cost)
 94    print("\n--- assign2d() Interface ---")
 95    print(f"Row indices: {result.row_indices}")
 96    print(f"Col indices: {result.col_indices}")
 97    print(f"Total cost: {result.cost}")
 98
 99    # Rectangular (non-square) assignment
100    print("\n--- Rectangular Assignment ---")
101    rect_cost = np.random.rand(3, 6) * 10  # 3 tracks, 6 measurements
102    row_rect, col_rect, cost_rect = hungarian(rect_cost)
103    print("3 tracks assigned to 6 measurements")
104    print(f"Assignments: {list(zip(row_rect, col_rect))}")
105    assigned_cols = set(col_rect)
106    unassigned_cols = [i for i in range(6) if i not in assigned_cols]
107    print(f"Unassigned measurements: {unassigned_cols}")
108
109    # With cost of non-assignment (allows skipping bad matches)
110    print("\n--- With Cost of Non-Assignment ---")
111    cost_with_skip = np.array(
112        [
113            [10, 100, 100],
114            [100, 5, 100],
115            [100, 100, 100],  # Track 2 has no good matches
116        ],
117        dtype=float,
118    )
119    result_skip = assign2d(cost_with_skip, cost_of_non_assignment=20)
120    print("Cost matrix with one track having no good matches:")
121    print(cost_with_skip)
122    print(f"Assignments: {list(zip(result_skip.row_indices, result_skip.col_indices))}")
123    print(f"Unassigned rows: {list(result_skip.unassigned_rows)}")
124
125
126def demo_kbest_assignment():
127    """Demonstrate k-best 2D assignment (Murty's algorithm)."""
128    print("\n" + "=" * 70)
129    print("K-Best 2D Assignment Demo (Murty's Algorithm)")
130    print("=" * 70)
131
132    # Cost matrix
133    cost = np.array(
134        [
135            [10, 5, 13],
136            [3, 15, 8],
137            [12, 7, 9],
138        ],
139        dtype=float,
140    )
141
142    print("\nCost matrix (3x3):")
143    print(cost)
144
145    # Find k best assignments
146    k = 5
147    result = murty(cost, k=k)
148
149    print(f"\n--- Finding {k} Best Assignments ---")
150    print(f"Found: {result.n_found} assignments")
151
152    for i, (assignment, cost_val) in enumerate(zip(result.assignments, result.costs)):
153        row_ind = assignment.row_indices
154        col_ind = assignment.col_indices
155        print(f"\n  Solution {i + 1} (cost={cost_val:.1f}):")
156        print(f"    Assignments: {list(zip(row_ind, col_ind))}")
157
158    # With cost threshold
159    print("\n--- With Cost Threshold ---")
160    result_thresh = kbest_assign2d(cost, k=10, cost_threshold=20)
161    print(f"Assignments with cost <= 20: {result_thresh.n_found}")
162    for i, c in enumerate(result_thresh.costs):
163        print(f"  Solution {i + 1}: cost={c:.1f}")
164
165    # Ranked assignments (convenience function)
166    print("\n--- Ranked Assignment Enumeration ---")
167    ranked = ranked_assignments(cost, max_assignments=6)
168    print(f"Enumerated {ranked.n_found} assignments in order of increasing cost")
169    print(f"Cost range: [{ranked.costs[0]:.1f}, {ranked.costs[-1]:.1f}]")
170
171
172def demo_3d_assignment():
173    """Demonstrate 3D assignment algorithms."""
174    print("\n" + "=" * 70)
175    print("3D Assignment Algorithms Demo")
176    print("=" * 70)
177
178    # 3D assignment: associate measurements across 3 scans
179    # Cost tensor: cost[i, j, k] = cost of associating
180    #   measurement i from scan 1, j from scan 2, k from scan 3
181    np.random.seed(42)
182    n = 5  # 5 measurements per scan
183    cost = np.random.rand(n, n, n) * 10
184
185    # Add some low-cost "true" associations on the diagonal
186    for i in range(n):
187        cost[i, i, i] = np.random.rand() * 0.5
188
189    print(f"\nCost tensor shape: {cost.shape}")
190    print("(5 measurements per scan, 3 scans)")
191
192    # Greedy algorithm (fast but suboptimal)
193    print("\n--- Greedy Algorithm ---")
194    result_greedy = greedy_3d(cost)
195    print(f"Assignments found: {result_greedy.tuples.shape[0]}")
196    print(f"Total cost: {result_greedy.cost:.3f}")
197
198    # 2D decomposition method
199    print("\n--- 2D Decomposition Method ---")
200    result_decomp = decompose_to_2d(cost)
201    print(f"Assignments found: {result_decomp.tuples.shape[0]}")
202    print(f"Total cost: {result_decomp.cost:.3f}")
203
204    # Lagrangian relaxation (better quality)
205    print("\n--- Lagrangian Relaxation ---")
206    result_lagr = assign3d_lagrangian(cost, max_iter=100, tol=0.01)
207    print(f"Assignments found: {result_lagr.tuples.shape[0]}")
208    print(f"Total cost: {result_lagr.cost:.3f}")
209    print(f"Iterations: {result_lagr.n_iterations}")
210    print(f"Duality gap: {result_lagr.gap:.4f}")
211
212    # Auction algorithm
213    print("\n--- Auction Algorithm ---")
214    result_auct = assign3d_auction(cost, max_iter=500)
215    print(f"Assignments found: {result_auct.tuples.shape[0]}")
216    print(f"Total cost: {result_auct.cost:.3f}")
217    print(f"Iterations: {result_auct.n_iterations}")
218
219    # Unified interface
220    print("\n--- Unified assign3d() Interface ---")
221    for method in ["greedy", "decompose", "lagrangian"]:
222        result = assign3d(cost, method=method)
223        print(
224            f"  {method:12s}: cost={result.cost:.3f}, "
225            f"assignments={result.tuples.shape[0]}"
226        )
227
228    # Show the actual assignments from the best method
229    print("\n--- Best Solution Details ---")
230    best = result_lagr
231    print("Assignment tuples (scan1, scan2, scan3):")
232    for row in best.tuples:
233        i, j, k = row
234        print(f"  ({i}, {j}, {k}) -> cost={cost[i, j, k]:.3f}")
235
236
237def demo_gating():
238    """Demonstrate measurement gating."""
239    print("\n" + "=" * 70)
240    print("Measurement Gating Demo")
241    print("=" * 70)
242
243    # Track predicted state and covariance
244    track_pred = np.array([10.0, 20.0])  # 2D position
245    innovation_cov = np.array([[4.0, 0.5], [0.5, 2.0]])  # Innovation covariance
246
247    # Measurements (some close, some far)
248    measurements = np.array(
249        [
250            [10.5, 20.2],  # Very close
251            [11.0, 19.5],  # Close
252            [12.5, 22.0],  # Moderate
253            [8.0, 18.0],  # Moderate
254            [20.0, 30.0],  # Far
255            [5.0, 25.0],  # Far
256        ]
257    )
258
259    print(f"\nTrack predicted position: {track_pred}")
260    print(f"Innovation covariance:\n{innovation_cov}")
261    print(f"\nMeasurements:\n{measurements}")
262
263    # Compute Mahalanobis distances
264    print("\n--- Mahalanobis Distances ---")
265    for i, meas in enumerate(measurements):
266        innovation = meas - track_pred
267        dist = mahalanobis_distance(innovation, innovation_cov)
268        print(f"  Measurement {i}: distance = {dist:.3f}")
269
270    # Chi-squared gate threshold
271    n_dims = 2
272    gate_prob = 0.99
273    threshold = chi2_gate_threshold(gate_prob, n_dims)
274    print(f"\nGate threshold (99% probability, 2D): {threshold:.3f}")
275
276    # Ellipsoidal gating
277    print("\n--- Ellipsoidal Gating ---")
278    gated_indices = []
279    for i, meas in enumerate(measurements):
280        innovation = meas - track_pred
281        in_gate = ellipsoidal_gate(innovation, innovation_cov, threshold)
282        status = "PASS" if in_gate else "FAIL"
283        if in_gate:
284            gated_indices.append(i)
285        print(f"  Measurement {i}: {status}")
286    print(f"Gated measurement indices: {gated_indices}")
287
288    # Rectangular gating (simpler, less accurate)
289    print("\n--- Rectangular Gating ---")
290    for i, meas in enumerate(measurements):
291        innovation = meas - track_pred
292        in_gate = rectangular_gate(innovation, innovation_cov, num_sigmas=3.0)
293        status = "PASS" if in_gate else "FAIL"
294        print(f"  Measurement {i}: {status}")
295
296
297def demo_gnn_association():
298    """Demonstrate Global Nearest Neighbor association."""
299    print("\n" + "=" * 70)
300    print("Global Nearest Neighbor (GNN) Association Demo")
301    print("=" * 70)
302
303    # Scenario: 3 tracks, 4 measurements
304    np.random.seed(42)
305
306    # Track predicted positions (2D)
307    track_preds = np.array(
308        [
309            [10.0, 20.0],
310            [30.0, 40.0],
311            [50.0, 60.0],
312        ]
313    )
314
315    # Track covariances (stacked)
316    track_covs = np.array([np.eye(2) * 4.0 for _ in range(3)])
317
318    # Measurements (3 close to tracks, 1 false alarm)
319    measurements = np.array(
320        [
321            [10.5, 19.8],  # Close to track 0
322            [30.2, 40.5],  # Close to track 1
323            [49.5, 60.2],  # Close to track 2
324            [100.0, 100.0],  # False alarm (far from all tracks)
325        ]
326    )
327
328    print("Track positions:")
329    for i, pred in enumerate(track_preds):
330        print(f"  Track {i}: {pred}")
331
332    print("\nMeasurements:")
333    for i, meas in enumerate(measurements):
334        print(f"  Measurement {i}: {meas}")
335
336    # Compute cost matrix
337    print("\n--- Computing Cost Matrix ---")
338    cost_matrix = compute_association_cost(
339        track_preds,
340        track_covs,
341        measurements,
342    )
343    print("Cost matrix (tracks x measurements):")
344    print(np.array2string(cost_matrix, precision=2, suppress_small=True))
345
346    # GNN association using the cost matrix
347    print("\n--- GNN Association ---")
348    gate_threshold = chi2_gate_threshold(0.99, 2)
349    result = gnn_association(cost_matrix, gate_threshold=gate_threshold)
350
351    print(f"Track -> Measurement mapping: {list(result.track_to_measurement)}")
352    print(f"Measurement -> Track mapping: {list(result.measurement_to_track)}")
353    print(f"Total cost: {result.total_cost:.3f}")
354
355    # Interpret results
356    print("\nInterpretation:")
357    for track_idx, meas_idx in enumerate(result.track_to_measurement):
358        if meas_idx >= 0:
359            print(f"  Track {track_idx} <- Measurement {meas_idx}")
360        else:
361            print(f"  Track {track_idx} <- (no measurement)")
362
363    unassigned_meas = [i for i, t in enumerate(result.measurement_to_track) if t < 0]
364    if unassigned_meas:
365        print(f"  Unassigned measurements (false alarms): {unassigned_meas}")
366
367    # Gated GNN (combined gating + association)
368    print("\n--- Gated GNN Association ---")
369    result_gated = gated_gnn_association(
370        track_preds,
371        track_covs,
372        measurements,
373        gate_probability=0.99,
374    )
375    print(f"Track -> Measurement: {list(result_gated.track_to_measurement)}")
376    print(f"Total cost: {result_gated.total_cost:.3f}")
377
378
379def demo_jpda():
380    """Demonstrate Joint Probabilistic Data Association."""
381    print("\n" + "=" * 70)
382    print("JPDA (Joint Probabilistic Data Association) Demo")
383    print("=" * 70)
384
385    # Scenario: 2 closely-spaced tracks with ambiguous measurements
386    track_states = [
387        np.array([10.0, 0.0, 20.0, 0.0]),  # [x, vx, y, vy]
388        np.array([12.0, 0.0, 21.0, 0.0]),  # Close to track 0
389    ]
390
391    track_covs = [np.diag([2.0, 0.1, 2.0, 0.1]) for _ in range(2)]
392
393    # Measurements in the ambiguous region
394    measurements = np.array(
395        [
396            [10.5, 20.2],  # Could belong to track 0 or 1
397            [11.5, 20.8],  # Could belong to track 0 or 1
398            [50.0, 50.0],  # False alarm
399        ]
400    )
401
402    # Measurement model: H extracts [x, y] from state
403    H = np.array(
404        [
405            [1, 0, 0, 0],
406            [0, 0, 1, 0],
407        ]
408    )
409    R = np.eye(2) * 1.0
410
411    print("Track positions (closely spaced):")
412    for i, state in enumerate(track_states):
413        print(f"  Track {i}: position=({state[0]:.1f}, {state[2]:.1f})")
414
415    print("\nMeasurements:")
416    for i, meas in enumerate(measurements):
417        print(f"  Measurement {i}: {meas}")
418
419    # JPDA computes association probabilities
420    print("\n--- JPDA Association Probabilities ---")
421    result = jpda(
422        track_states,
423        track_covs,
424        measurements,
425        H=H,
426        R=R,
427        detection_prob=0.9,
428        clutter_density=1e-6,
429        gate_probability=0.99,
430    )
431
432    print("Association probability matrix (tracks x [measurements..., no-detect]):")
433    print("  Rows: tracks, Columns: [meas 0, meas 1, meas 2, no-detection]")
434    print(np.array2string(result.association_probs, precision=3, suppress_small=True))
435
436    print("\nInterpretation:")
437    n_tracks, n_cols = result.association_probs.shape
438    n_meas = n_cols - 1  # Last column is no-detection probability
439    for i in range(n_tracks):
440        print(f"  Track {i}:")
441        for j in range(n_meas):
442            prob = result.association_probs[i, j]
443            if prob > 0.01:  # Only show significant probabilities
444                print(f"    P(measurement {j}) = {prob:.3f}")
445        print(f"    P(no detection) = {result.association_probs[i, -1]:.3f}")
446
447
448def demo_tracking_scenario():
449    """Demonstrate a complete tracking scenario."""
450    print("\n" + "=" * 70)
451    print("Complete Tracking Scenario Demo")
452    print("=" * 70)
453
454    np.random.seed(42)
455
456    # Simulation: 3 targets, 10 time steps
457    n_targets = 3
458    n_steps = 10
459
460    # Initial target positions
461    targets = np.array(
462        [
463            [0.0, 0.0],
464            [50.0, 0.0],
465            [25.0, 43.3],  # Equilateral triangle
466        ]
467    )
468
469    # Target velocities
470    velocities = np.array(
471        [
472            [2.0, 1.0],
473            [-1.0, 2.0],
474            [0.0, -1.5],
475        ]
476    )
477
478    print(f"Simulating {n_targets} targets over {n_steps} time steps")
479    print("Initial positions:", targets.tolist())
480
481    # Track states (initially equal to true positions)
482    track_states = targets.copy()
483    track_covs = np.array([np.eye(2) * 10.0 for _ in range(n_targets)])
484
485    # Measurement noise
486    meas_std = 2.0
487
488    # Run simulation
489    assignment_history = []
490    for t in range(n_steps):
491        # Move targets
492        targets = targets + velocities
493
494        # Generate measurements (with some noise and false alarms)
495        measurements = targets + np.random.randn(n_targets, 2) * meas_std
496
497        # Add false alarms
498        n_false = np.random.poisson(1)  # Average 1 false alarm per scan
499        if n_false > 0:
500            false_alarms = np.random.rand(n_false, 2) * 100 - 25
501            measurements = np.vstack([measurements, false_alarms])
502
503        # Use gated_gnn_association which handles gating internally
504        result = gated_gnn_association(
505            track_states,
506            track_covs,
507            measurements,
508            gate_probability=0.99,
509        )
510
511        # Use track_to_measurement directly
512        track_to_meas = list(result.track_to_measurement)
513        assignment_history.append(track_to_meas)
514
515        # Update track states (simple: just use measurement if assigned)
516        for i, meas_idx in enumerate(track_to_meas):
517            if meas_idx >= 0:
518                # Blend prediction with measurement
519                alpha = 0.7  # Measurement weight
520                track_states[i] = alpha * measurements[meas_idx] + (1 - alpha) * (
521                    track_states[i] + velocities[i]
522                )
523            else:
524                # No measurement, just predict
525                track_states[i] = track_states[i] + velocities[i]
526
527    print("\nAssignment history (track -> measurement index):")
528    for t, assignments in enumerate(assignment_history):
529        print(f"  Step {t}: {assignments}")
530
531    print("\nFinal track positions:")
532    for i, state in enumerate(track_states):
533        true_pos = targets[i]
534        error = np.linalg.norm(state - true_pos)
535        print(f"  Track {i}: {state} (error: {error:.2f})")
536
537
538def main():
539    """Run all demonstrations."""
540    print("\n" + "#" * 70)
541    print("# PyTCL Assignment Algorithms Example")
542    print("#" * 70)
543
544    # Basic 2D assignment
545    demo_2d_assignment()
546
547    # K-best assignment (Murty)
548    demo_kbest_assignment()
549
550    # 3D assignment
551    demo_3d_assignment()
552
553    # Gating
554    demo_gating()
555
556    # Data association
557    demo_gnn_association()
558
559    # JPDA
560    demo_jpda()
561
562    # Complete scenario
563    demo_tracking_scenario()
564
565    # Visualization
566    visualize_assignment_problem()
567
568    print("\n" + "=" * 70)
569    print("Example complete!")
570    print("=" * 70)
571
572
573def visualize_assignment_problem():
574    """Visualize a 2D assignment problem with cost heatmap."""
575    print("\nGenerating cost matrix visualization...")
576
577    # Create a cost matrix
578    np.random.seed(42)
579    n_tracks = 5
580    n_measurements = 6
581    cost = np.random.randint(1, 20, (n_tracks, n_measurements)).astype(float)
582
583    # Solve assignment
584    track_indices, measurement_indices, assignment_cost = hungarian(cost)
585
586    # Create heatmap
587    fig = go.Figure(data=go.Heatmap(z=cost, colorscale="Viridis"))
588
589    # Add assignment lines
590    for t_idx, m_idx in zip(track_indices, measurement_indices):
591        fig.add_annotation(
592            x=m_idx,
593            y=t_idx,
594            text="✓",
595            showarrow=False,
596            font=dict(color="red", size=16),
597        )
598
599    fig.update_layout(
600        title="2D Assignment Problem: Cost Matrix with Optimal Assignments",
601        xaxis_title="Measurement Index",
602        yaxis_title="Track Index",
603        height=400,
604        width=600,
605    )
606
607    fig.show()
608
609
610if __name__ == "__main__":
611    main()

Running the Example

python examples/assignment_algorithms.py

See Also