Assignment Algorithms
This example demonstrates various assignment algorithms for data association.
Overview
Assignment algorithms solve the measurement-to-track association problem:
2D Assignment: One-to-one matching (Hungarian, Auction)
Multi-dimensional: S-D assignment for multi-sensor fusion
K-best: Finding multiple good assignments (Murty’s algorithm)
Algorithms
- Hungarian Algorithm (Kuhn-Munkres)
Optimal O(n³) solution for 2D assignment
Guaranteed minimum cost assignment
- Auction Algorithm
Bertsekas’ auction-based approach
Good for sparse cost matrices
Assignment in Action: The result of optimal assignment showing tracks correctly associated with measurements over time.
- Global Nearest Neighbor (GNN)
Fast suboptimal assignment
Uses gating for efficiency
- JPDA
Joint Probabilistic Data Association
Maintains association probabilities
- Murty’s K-best
Finds k best assignments
Used in MHT hypothesis management
Multi-Sensor Fusion: S-D assignment extends 2D methods to fuse measurements from multiple sensors.
Key Concepts
Cost matrix: Negative log-likelihood of associations
Gating: Statistical test to reduce candidates
Dummy assignments: Handling missed detections and clutter
Code Highlights
The example demonstrates:
Constructing cost matrices from track-measurement distances
Using
hungarian()for optimal assignmentauction_algorithm()for iterative biddingmurty_kbest()for ranked assignmentsjpda()for association probabilities
Source Code
1"""
2Assignment Algorithms Example
3=============================
4
5This example demonstrates the assignment algorithms in PyTCL:
6
72D Assignment:
8- Hungarian algorithm (optimal)
9- Auction algorithm
10- Linear sum assignment wrapper
11
12K-Best 2D Assignment (v0.17.0):
13- Murty's algorithm for finding k-best assignments
14- Ranked assignment enumeration with cost thresholds
15
163D Assignment (v0.17.0):
17- Lagrangian relaxation
18- Auction-based 3D assignment
19- Greedy assignment
20- 2D decomposition method
21
22Data Association:
23- Global Nearest Neighbor (GNN)
24- Gating (ellipsoidal and rectangular)
25- JPDA (Joint Probabilistic Data Association)
26
27These algorithms are fundamental for multi-target tracking, where
28measurements must be assigned to tracks optimally.
29"""
30
31import numpy as np
32import plotly.graph_objects as go
33from plotly.subplots import make_subplots
34
35from pytcl.assignment_algorithms import ( # 2D Assignment
36 assign2d,
37 assign3d,
38 assign3d_auction,
39 assign3d_lagrangian,
40 auction,
41 chi2_gate_threshold,
42 compute_association_cost,
43 decompose_to_2d,
44 ellipsoidal_gate,
45 gated_gnn_association,
46 gnn_association,
47 greedy_3d,
48 hungarian,
49 jpda,
50 kbest_assign2d,
51 mahalanobis_distance,
52 murty,
53 ranked_assignments,
54 rectangular_gate,
55)
56
57
58def demo_2d_assignment():
59 """Demonstrate basic 2D assignment algorithms."""
60 print("=" * 70)
61 print("2D Assignment Algorithms Demo")
62 print("=" * 70)
63
64 # Create a cost matrix: 4 tracks, 5 measurements
65 # Lower cost = better match
66 np.random.seed(42)
67 cost = np.array(
68 [
69 [10, 5, 13, 4, 8], # Track 0: best match is measurement 3
70 [3, 15, 8, 12, 6], # Track 1: best match is measurement 0
71 [12, 7, 9, 5, 11], # Track 2: best match is measurement 3 or 1
72 [8, 6, 4, 10, 3], # Track 3: best match is measurement 4
73 ],
74 dtype=float,
75 )
76
77 print("\nCost matrix (4 tracks x 5 measurements):")
78 print(cost)
79
80 # Hungarian algorithm (optimal)
81 row_h, col_h, cost_h = hungarian(cost)
82 print("\n--- Hungarian Algorithm (Optimal) ---")
83 print(f"Assignments: {list(zip(row_h, col_h))}")
84 print(f"Total cost: {cost_h}")
85
86 # Auction algorithm
87 row_a, col_a, cost_a = auction(cost)
88 print("\n--- Auction Algorithm ---")
89 print(f"Assignments: {list(zip(row_a, col_a))}")
90 print(f"Total cost: {cost_a}")
91
92 # Using assign2d unified interface
93 result = assign2d(cost)
94 print("\n--- assign2d() Interface ---")
95 print(f"Row indices: {result.row_indices}")
96 print(f"Col indices: {result.col_indices}")
97 print(f"Total cost: {result.cost}")
98
99 # Rectangular (non-square) assignment
100 print("\n--- Rectangular Assignment ---")
101 rect_cost = np.random.rand(3, 6) * 10 # 3 tracks, 6 measurements
102 row_rect, col_rect, cost_rect = hungarian(rect_cost)
103 print("3 tracks assigned to 6 measurements")
104 print(f"Assignments: {list(zip(row_rect, col_rect))}")
105 assigned_cols = set(col_rect)
106 unassigned_cols = [i for i in range(6) if i not in assigned_cols]
107 print(f"Unassigned measurements: {unassigned_cols}")
108
109 # With cost of non-assignment (allows skipping bad matches)
110 print("\n--- With Cost of Non-Assignment ---")
111 cost_with_skip = np.array(
112 [
113 [10, 100, 100],
114 [100, 5, 100],
115 [100, 100, 100], # Track 2 has no good matches
116 ],
117 dtype=float,
118 )
119 result_skip = assign2d(cost_with_skip, cost_of_non_assignment=20)
120 print("Cost matrix with one track having no good matches:")
121 print(cost_with_skip)
122 print(f"Assignments: {list(zip(result_skip.row_indices, result_skip.col_indices))}")
123 print(f"Unassigned rows: {list(result_skip.unassigned_rows)}")
124
125
126def demo_kbest_assignment():
127 """Demonstrate k-best 2D assignment (Murty's algorithm)."""
128 print("\n" + "=" * 70)
129 print("K-Best 2D Assignment Demo (Murty's Algorithm)")
130 print("=" * 70)
131
132 # Cost matrix
133 cost = np.array(
134 [
135 [10, 5, 13],
136 [3, 15, 8],
137 [12, 7, 9],
138 ],
139 dtype=float,
140 )
141
142 print("\nCost matrix (3x3):")
143 print(cost)
144
145 # Find k best assignments
146 k = 5
147 result = murty(cost, k=k)
148
149 print(f"\n--- Finding {k} Best Assignments ---")
150 print(f"Found: {result.n_found} assignments")
151
152 for i, (assignment, cost_val) in enumerate(zip(result.assignments, result.costs)):
153 row_ind = assignment.row_indices
154 col_ind = assignment.col_indices
155 print(f"\n Solution {i + 1} (cost={cost_val:.1f}):")
156 print(f" Assignments: {list(zip(row_ind, col_ind))}")
157
158 # With cost threshold
159 print("\n--- With Cost Threshold ---")
160 result_thresh = kbest_assign2d(cost, k=10, cost_threshold=20)
161 print(f"Assignments with cost <= 20: {result_thresh.n_found}")
162 for i, c in enumerate(result_thresh.costs):
163 print(f" Solution {i + 1}: cost={c:.1f}")
164
165 # Ranked assignments (convenience function)
166 print("\n--- Ranked Assignment Enumeration ---")
167 ranked = ranked_assignments(cost, max_assignments=6)
168 print(f"Enumerated {ranked.n_found} assignments in order of increasing cost")
169 print(f"Cost range: [{ranked.costs[0]:.1f}, {ranked.costs[-1]:.1f}]")
170
171
172def demo_3d_assignment():
173 """Demonstrate 3D assignment algorithms."""
174 print("\n" + "=" * 70)
175 print("3D Assignment Algorithms Demo")
176 print("=" * 70)
177
178 # 3D assignment: associate measurements across 3 scans
179 # Cost tensor: cost[i, j, k] = cost of associating
180 # measurement i from scan 1, j from scan 2, k from scan 3
181 np.random.seed(42)
182 n = 5 # 5 measurements per scan
183 cost = np.random.rand(n, n, n) * 10
184
185 # Add some low-cost "true" associations on the diagonal
186 for i in range(n):
187 cost[i, i, i] = np.random.rand() * 0.5
188
189 print(f"\nCost tensor shape: {cost.shape}")
190 print("(5 measurements per scan, 3 scans)")
191
192 # Greedy algorithm (fast but suboptimal)
193 print("\n--- Greedy Algorithm ---")
194 result_greedy = greedy_3d(cost)
195 print(f"Assignments found: {result_greedy.tuples.shape[0]}")
196 print(f"Total cost: {result_greedy.cost:.3f}")
197
198 # 2D decomposition method
199 print("\n--- 2D Decomposition Method ---")
200 result_decomp = decompose_to_2d(cost)
201 print(f"Assignments found: {result_decomp.tuples.shape[0]}")
202 print(f"Total cost: {result_decomp.cost:.3f}")
203
204 # Lagrangian relaxation (better quality)
205 print("\n--- Lagrangian Relaxation ---")
206 result_lagr = assign3d_lagrangian(cost, max_iter=100, tol=0.01)
207 print(f"Assignments found: {result_lagr.tuples.shape[0]}")
208 print(f"Total cost: {result_lagr.cost:.3f}")
209 print(f"Iterations: {result_lagr.n_iterations}")
210 print(f"Duality gap: {result_lagr.gap:.4f}")
211
212 # Auction algorithm
213 print("\n--- Auction Algorithm ---")
214 result_auct = assign3d_auction(cost, max_iter=500)
215 print(f"Assignments found: {result_auct.tuples.shape[0]}")
216 print(f"Total cost: {result_auct.cost:.3f}")
217 print(f"Iterations: {result_auct.n_iterations}")
218
219 # Unified interface
220 print("\n--- Unified assign3d() Interface ---")
221 for method in ["greedy", "decompose", "lagrangian"]:
222 result = assign3d(cost, method=method)
223 print(
224 f" {method:12s}: cost={result.cost:.3f}, "
225 f"assignments={result.tuples.shape[0]}"
226 )
227
228 # Show the actual assignments from the best method
229 print("\n--- Best Solution Details ---")
230 best = result_lagr
231 print("Assignment tuples (scan1, scan2, scan3):")
232 for row in best.tuples:
233 i, j, k = row
234 print(f" ({i}, {j}, {k}) -> cost={cost[i, j, k]:.3f}")
235
236
237def demo_gating():
238 """Demonstrate measurement gating."""
239 print("\n" + "=" * 70)
240 print("Measurement Gating Demo")
241 print("=" * 70)
242
243 # Track predicted state and covariance
244 track_pred = np.array([10.0, 20.0]) # 2D position
245 innovation_cov = np.array([[4.0, 0.5], [0.5, 2.0]]) # Innovation covariance
246
247 # Measurements (some close, some far)
248 measurements = np.array(
249 [
250 [10.5, 20.2], # Very close
251 [11.0, 19.5], # Close
252 [12.5, 22.0], # Moderate
253 [8.0, 18.0], # Moderate
254 [20.0, 30.0], # Far
255 [5.0, 25.0], # Far
256 ]
257 )
258
259 print(f"\nTrack predicted position: {track_pred}")
260 print(f"Innovation covariance:\n{innovation_cov}")
261 print(f"\nMeasurements:\n{measurements}")
262
263 # Compute Mahalanobis distances
264 print("\n--- Mahalanobis Distances ---")
265 for i, meas in enumerate(measurements):
266 innovation = meas - track_pred
267 dist = mahalanobis_distance(innovation, innovation_cov)
268 print(f" Measurement {i}: distance = {dist:.3f}")
269
270 # Chi-squared gate threshold
271 n_dims = 2
272 gate_prob = 0.99
273 threshold = chi2_gate_threshold(gate_prob, n_dims)
274 print(f"\nGate threshold (99% probability, 2D): {threshold:.3f}")
275
276 # Ellipsoidal gating
277 print("\n--- Ellipsoidal Gating ---")
278 gated_indices = []
279 for i, meas in enumerate(measurements):
280 innovation = meas - track_pred
281 in_gate = ellipsoidal_gate(innovation, innovation_cov, threshold)
282 status = "PASS" if in_gate else "FAIL"
283 if in_gate:
284 gated_indices.append(i)
285 print(f" Measurement {i}: {status}")
286 print(f"Gated measurement indices: {gated_indices}")
287
288 # Rectangular gating (simpler, less accurate)
289 print("\n--- Rectangular Gating ---")
290 for i, meas in enumerate(measurements):
291 innovation = meas - track_pred
292 in_gate = rectangular_gate(innovation, innovation_cov, num_sigmas=3.0)
293 status = "PASS" if in_gate else "FAIL"
294 print(f" Measurement {i}: {status}")
295
296
297def demo_gnn_association():
298 """Demonstrate Global Nearest Neighbor association."""
299 print("\n" + "=" * 70)
300 print("Global Nearest Neighbor (GNN) Association Demo")
301 print("=" * 70)
302
303 # Scenario: 3 tracks, 4 measurements
304 np.random.seed(42)
305
306 # Track predicted positions (2D)
307 track_preds = np.array(
308 [
309 [10.0, 20.0],
310 [30.0, 40.0],
311 [50.0, 60.0],
312 ]
313 )
314
315 # Track covariances (stacked)
316 track_covs = np.array([np.eye(2) * 4.0 for _ in range(3)])
317
318 # Measurements (3 close to tracks, 1 false alarm)
319 measurements = np.array(
320 [
321 [10.5, 19.8], # Close to track 0
322 [30.2, 40.5], # Close to track 1
323 [49.5, 60.2], # Close to track 2
324 [100.0, 100.0], # False alarm (far from all tracks)
325 ]
326 )
327
328 print("Track positions:")
329 for i, pred in enumerate(track_preds):
330 print(f" Track {i}: {pred}")
331
332 print("\nMeasurements:")
333 for i, meas in enumerate(measurements):
334 print(f" Measurement {i}: {meas}")
335
336 # Compute cost matrix
337 print("\n--- Computing Cost Matrix ---")
338 cost_matrix = compute_association_cost(
339 track_preds,
340 track_covs,
341 measurements,
342 )
343 print("Cost matrix (tracks x measurements):")
344 print(np.array2string(cost_matrix, precision=2, suppress_small=True))
345
346 # GNN association using the cost matrix
347 print("\n--- GNN Association ---")
348 gate_threshold = chi2_gate_threshold(0.99, 2)
349 result = gnn_association(cost_matrix, gate_threshold=gate_threshold)
350
351 print(f"Track -> Measurement mapping: {list(result.track_to_measurement)}")
352 print(f"Measurement -> Track mapping: {list(result.measurement_to_track)}")
353 print(f"Total cost: {result.total_cost:.3f}")
354
355 # Interpret results
356 print("\nInterpretation:")
357 for track_idx, meas_idx in enumerate(result.track_to_measurement):
358 if meas_idx >= 0:
359 print(f" Track {track_idx} <- Measurement {meas_idx}")
360 else:
361 print(f" Track {track_idx} <- (no measurement)")
362
363 unassigned_meas = [i for i, t in enumerate(result.measurement_to_track) if t < 0]
364 if unassigned_meas:
365 print(f" Unassigned measurements (false alarms): {unassigned_meas}")
366
367 # Gated GNN (combined gating + association)
368 print("\n--- Gated GNN Association ---")
369 result_gated = gated_gnn_association(
370 track_preds,
371 track_covs,
372 measurements,
373 gate_probability=0.99,
374 )
375 print(f"Track -> Measurement: {list(result_gated.track_to_measurement)}")
376 print(f"Total cost: {result_gated.total_cost:.3f}")
377
378
379def demo_jpda():
380 """Demonstrate Joint Probabilistic Data Association."""
381 print("\n" + "=" * 70)
382 print("JPDA (Joint Probabilistic Data Association) Demo")
383 print("=" * 70)
384
385 # Scenario: 2 closely-spaced tracks with ambiguous measurements
386 track_states = [
387 np.array([10.0, 0.0, 20.0, 0.0]), # [x, vx, y, vy]
388 np.array([12.0, 0.0, 21.0, 0.0]), # Close to track 0
389 ]
390
391 track_covs = [np.diag([2.0, 0.1, 2.0, 0.1]) for _ in range(2)]
392
393 # Measurements in the ambiguous region
394 measurements = np.array(
395 [
396 [10.5, 20.2], # Could belong to track 0 or 1
397 [11.5, 20.8], # Could belong to track 0 or 1
398 [50.0, 50.0], # False alarm
399 ]
400 )
401
402 # Measurement model: H extracts [x, y] from state
403 H = np.array(
404 [
405 [1, 0, 0, 0],
406 [0, 0, 1, 0],
407 ]
408 )
409 R = np.eye(2) * 1.0
410
411 print("Track positions (closely spaced):")
412 for i, state in enumerate(track_states):
413 print(f" Track {i}: position=({state[0]:.1f}, {state[2]:.1f})")
414
415 print("\nMeasurements:")
416 for i, meas in enumerate(measurements):
417 print(f" Measurement {i}: {meas}")
418
419 # JPDA computes association probabilities
420 print("\n--- JPDA Association Probabilities ---")
421 result = jpda(
422 track_states,
423 track_covs,
424 measurements,
425 H=H,
426 R=R,
427 detection_prob=0.9,
428 clutter_density=1e-6,
429 gate_probability=0.99,
430 )
431
432 print("Association probability matrix (tracks x [measurements..., no-detect]):")
433 print(" Rows: tracks, Columns: [meas 0, meas 1, meas 2, no-detection]")
434 print(np.array2string(result.association_probs, precision=3, suppress_small=True))
435
436 print("\nInterpretation:")
437 n_tracks, n_cols = result.association_probs.shape
438 n_meas = n_cols - 1 # Last column is no-detection probability
439 for i in range(n_tracks):
440 print(f" Track {i}:")
441 for j in range(n_meas):
442 prob = result.association_probs[i, j]
443 if prob > 0.01: # Only show significant probabilities
444 print(f" P(measurement {j}) = {prob:.3f}")
445 print(f" P(no detection) = {result.association_probs[i, -1]:.3f}")
446
447
448def demo_tracking_scenario():
449 """Demonstrate a complete tracking scenario."""
450 print("\n" + "=" * 70)
451 print("Complete Tracking Scenario Demo")
452 print("=" * 70)
453
454 np.random.seed(42)
455
456 # Simulation: 3 targets, 10 time steps
457 n_targets = 3
458 n_steps = 10
459
460 # Initial target positions
461 targets = np.array(
462 [
463 [0.0, 0.0],
464 [50.0, 0.0],
465 [25.0, 43.3], # Equilateral triangle
466 ]
467 )
468
469 # Target velocities
470 velocities = np.array(
471 [
472 [2.0, 1.0],
473 [-1.0, 2.0],
474 [0.0, -1.5],
475 ]
476 )
477
478 print(f"Simulating {n_targets} targets over {n_steps} time steps")
479 print("Initial positions:", targets.tolist())
480
481 # Track states (initially equal to true positions)
482 track_states = targets.copy()
483 track_covs = np.array([np.eye(2) * 10.0 for _ in range(n_targets)])
484
485 # Measurement noise
486 meas_std = 2.0
487
488 # Run simulation
489 assignment_history = []
490 for t in range(n_steps):
491 # Move targets
492 targets = targets + velocities
493
494 # Generate measurements (with some noise and false alarms)
495 measurements = targets + np.random.randn(n_targets, 2) * meas_std
496
497 # Add false alarms
498 n_false = np.random.poisson(1) # Average 1 false alarm per scan
499 if n_false > 0:
500 false_alarms = np.random.rand(n_false, 2) * 100 - 25
501 measurements = np.vstack([measurements, false_alarms])
502
503 # Use gated_gnn_association which handles gating internally
504 result = gated_gnn_association(
505 track_states,
506 track_covs,
507 measurements,
508 gate_probability=0.99,
509 )
510
511 # Use track_to_measurement directly
512 track_to_meas = list(result.track_to_measurement)
513 assignment_history.append(track_to_meas)
514
515 # Update track states (simple: just use measurement if assigned)
516 for i, meas_idx in enumerate(track_to_meas):
517 if meas_idx >= 0:
518 # Blend prediction with measurement
519 alpha = 0.7 # Measurement weight
520 track_states[i] = alpha * measurements[meas_idx] + (1 - alpha) * (
521 track_states[i] + velocities[i]
522 )
523 else:
524 # No measurement, just predict
525 track_states[i] = track_states[i] + velocities[i]
526
527 print("\nAssignment history (track -> measurement index):")
528 for t, assignments in enumerate(assignment_history):
529 print(f" Step {t}: {assignments}")
530
531 print("\nFinal track positions:")
532 for i, state in enumerate(track_states):
533 true_pos = targets[i]
534 error = np.linalg.norm(state - true_pos)
535 print(f" Track {i}: {state} (error: {error:.2f})")
536
537
538def main():
539 """Run all demonstrations."""
540 print("\n" + "#" * 70)
541 print("# PyTCL Assignment Algorithms Example")
542 print("#" * 70)
543
544 # Basic 2D assignment
545 demo_2d_assignment()
546
547 # K-best assignment (Murty)
548 demo_kbest_assignment()
549
550 # 3D assignment
551 demo_3d_assignment()
552
553 # Gating
554 demo_gating()
555
556 # Data association
557 demo_gnn_association()
558
559 # JPDA
560 demo_jpda()
561
562 # Complete scenario
563 demo_tracking_scenario()
564
565 # Visualization
566 visualize_assignment_problem()
567
568 print("\n" + "=" * 70)
569 print("Example complete!")
570 print("=" * 70)
571
572
573def visualize_assignment_problem():
574 """Visualize a 2D assignment problem with cost heatmap."""
575 print("\nGenerating cost matrix visualization...")
576
577 # Create a cost matrix
578 np.random.seed(42)
579 n_tracks = 5
580 n_measurements = 6
581 cost = np.random.randint(1, 20, (n_tracks, n_measurements)).astype(float)
582
583 # Solve assignment
584 track_indices, measurement_indices, assignment_cost = hungarian(cost)
585
586 # Create heatmap
587 fig = go.Figure(data=go.Heatmap(z=cost, colorscale="Viridis"))
588
589 # Add assignment lines
590 for t_idx, m_idx in zip(track_indices, measurement_indices):
591 fig.add_annotation(
592 x=m_idx,
593 y=t_idx,
594 text="✓",
595 showarrow=False,
596 font=dict(color="red", size=16),
597 )
598
599 fig.update_layout(
600 title="2D Assignment Problem: Cost Matrix with Optimal Assignments",
601 xaxis_title="Measurement Index",
602 yaxis_title="Track Index",
603 height=400,
604 width=600,
605 )
606
607 fig.show()
608
609
610if __name__ == "__main__":
611 main()
Running the Example
python examples/assignment_algorithms.py
See Also
Multi-Target Tracking - Using assignment in MTT
Performance Evaluation - Evaluating association quality