Network Flow for Assignment Problems
This notebook demonstrates network flow techniques for solving assignment problems in tracking applications. We cover:
Assignment Problem Basics - Matching measurements to tracks
Network Flow Formulation - Converting assignments to flow networks
Successive Shortest Paths - Finding optimal flow
Dijkstra-Optimized Algorithm - High-performance implementation
Comparison with Hungarian Algorithm - Performance trade-offs
Prerequisites
pip install nrl-tracker plotly numpy scipy networkx
[1]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import networkx as nx
import time
from pytcl.assignment_algorithms.network_flow import (
FlowEdge, FlowStatus,
assignment_to_flow_network,
min_cost_flow_successive_shortest_paths,
min_cost_flow_simplex,
assignment_from_flow_solution,
min_cost_assignment_via_flow,
)
from pytcl.assignment_algorithms import hungarian
np.random.seed(42)
# Plotly dark theme template
dark_template = go.layout.Template()
dark_template.layout = go.Layout(
paper_bgcolor='#0d1117',
plot_bgcolor='#0d1117',
font=dict(color='#e6edf3'),
xaxis=dict(gridcolor='#30363d', zerolinecolor='#30363d'),
yaxis=dict(gridcolor='#30363d', zerolinecolor='#30363d'),
)
1. The Assignment Problem in Tracking
In multi-target tracking, each time step requires associating:
Tracks: Existing estimates of target states
Measurements: New sensor observations
The assignment problem finds the optimal one-to-one matching that minimizes total cost.
Problem Formulation
Given:
\(m\) tracks and \(n\) measurements
Cost matrix \(C\) where \(C_{ij}\) is cost of assigning track \(i\) to measurement \(j\)
Find: Assignment matrix \(X\) where \(X_{ij} \in \{0, 1\}\) minimizing:
Subject to:
[2]:
# Example: 3 tracks, 4 measurements
# Cost based on distance/likelihood
# Track positions
tracks = np.array([
[0, 0], # Track 0 at origin
[5, 0], # Track 1 at (5, 0)
[2.5, 4], # Track 2 at (2.5, 4)
])
# Measurement positions
measurements = np.array([
[0.1, 0.2], # Measurement 0 near Track 0
[4.8, -0.1], # Measurement 1 near Track 1
[2.6, 4.1], # Measurement 2 near Track 2
[10, 10], # Measurement 3 (spurious/clutter)
])
# Compute cost matrix (squared distance)
n_tracks = len(tracks)
n_meas = len(measurements)
cost_matrix = np.zeros((n_tracks, n_meas))
for i in range(n_tracks):
for j in range(n_meas):
cost_matrix[i, j] = np.sum((tracks[i] - measurements[j])**2)
print("Cost Matrix (squared distance):")
print(f"{'':10s}", end='')
for j in range(n_meas):
print(f"Meas {j:2d} ", end='')
print()
for i in range(n_tracks):
print(f"Track {i:2d} ", end='')
for j in range(n_meas):
print(f"{cost_matrix[i, j]:7.2f} ", end='')
print()
Cost Matrix (squared distance):
Meas 0 Meas 1 Meas 2 Meas 3
Track 0 0.05 23.05 23.57 200.00
Track 1 24.05 0.05 22.57 125.00
Track 2 20.20 22.10 0.02 92.25
[3]:
# Visualize tracks and measurements
fig = go.Figure()
# Plot tracks
fig.add_trace(
go.Scatter(x=tracks[:, 0], y=tracks[:, 1], mode='markers+text',
marker=dict(size=16, color='#00d4ff', symbol='square'),
text=[f'T{i}' for i in range(len(tracks))],
textposition='top right', textfont=dict(color='#00d4ff'),
name='Tracks')
)
# Plot measurements
fig.add_trace(
go.Scatter(x=measurements[:, 0], y=measurements[:, 1], mode='markers+text',
marker=dict(size=16, color='#ff4757', symbol='circle'),
text=[f'M{j}' for j in range(len(measurements))],
textposition='bottom right', textfont=dict(color='#ff4757'),
name='Measurements')
)
# Draw all possible associations (faint lines)
for i in range(n_tracks):
for j in range(n_meas):
fig.add_trace(
go.Scatter(x=[tracks[i, 0], measurements[j, 0]],
y=[tracks[i, 1], measurements[j, 1]],
mode='lines', line=dict(color='gray', width=0.5),
opacity=0.2, showlegend=False, hoverinfo='skip')
)
fig.update_layout(
template=dark_template,
title='Track-to-Measurement Assignment Problem',
xaxis_title='X',
yaxis_title='Y',
height=500,
xaxis=dict(scaleanchor='y', scaleratio=1),
)
fig.show()
Data type cannot be displayed: application/vnd.plotly.v1+json
2. Network Flow Formulation
The assignment problem can be converted to a min-cost max-flow network:
c[0,0] c[0,1] c[0,2] c[0,3]
Worker 0 ─────> Task 0 ──> Task 1 ──> Task 2 ──> Task 3
/ \ /
/ c[1,0] c[1,1] c[1,2] / c[1,3]
Source ─> W1 ─────────────────────────────────────────────
\ c[2,0] c[2,1] c[2,2] c[2,3]
\ Worker 2 ────────────────────────────────────> Sink
Network Structure
Source: Supplies \(m\) units of flow (one per track)
Worker nodes: One per track, connected to source with capacity 1
Task nodes: One per measurement, connected to all workers with capacity 1
Sink: Demands \(m\) units of flow
Edge costs: Assignment costs \(C_{ij}\)
[4]:
# Convert to flow network
edges, supplies, node_names = assignment_to_flow_network(cost_matrix)
print("Network Structure")
print("=" * 60)
print(f"Number of nodes: {len(supplies)}")
print(f"Number of edges: {len(edges)}")
print(f"\nNodes: {list(node_names)}")
print(f"\nSupplies/Demands:")
for i, (name, supply) in enumerate(zip(node_names, supplies)):
if supply > 0:
print(f" {name}: supplies {supply:.0f}")
elif supply < 0:
print(f" {name}: demands {-supply:.0f}")
print(f"\nEdges with costs:")
for edge in edges[:8]: # Show first 8 edges
from_name = node_names[edge.from_node]
to_name = node_names[edge.to_node]
print(f" {from_name} -> {to_name}: capacity={edge.capacity:.0f}, cost={edge.cost:.2f}")
print(f" ... ({len(edges) - 8} more edges)")
Network Structure
============================================================
Number of nodes: 9
Number of edges: 19
Nodes: [np.str_('source'), np.str_('worker_0'), np.str_('worker_1'), np.str_('worker_2'), np.str_('task_0'), np.str_('task_1'), np.str_('task_2'), np.str_('task_3'), np.str_('sink')]
Supplies/Demands:
source: supplies 3
sink: demands 3
Edges with costs:
source -> worker_0: capacity=1, cost=0.00
source -> worker_1: capacity=1, cost=0.00
source -> worker_2: capacity=1, cost=0.00
worker_0 -> task_0: capacity=1, cost=0.05
worker_0 -> task_1: capacity=1, cost=23.05
worker_0 -> task_2: capacity=1, cost=23.57
worker_0 -> task_3: capacity=1, cost=200.00
worker_1 -> task_0: capacity=1, cost=24.05
... (11 more edges)
[5]:
# Visualize the flow network
fig = go.Figure()
# Node positions
pos = {}
pos['source'] = (-2, 2)
pos['sink'] = (8, 2)
# Worker nodes on left
for i in range(n_tracks):
pos[f'worker_{i}'] = (0, 4 - i * 2)
# Task nodes on right
for j in range(n_meas):
pos[f'task_{j}'] = (6, 4.5 - j * 1.5)
# Draw edges first (so nodes appear on top)
for edge in edges:
from_name = node_names[edge.from_node]
to_name = node_names[edge.to_node]
x1, y1 = pos[from_name]
x2, y2 = pos[to_name]
fig.add_trace(
go.Scatter(x=[x1, x2], y=[y1, y2], mode='lines',
line=dict(color='gray', width=1), opacity=0.5,
showlegend=False, hoverinfo='skip')
)
# Add cost label for worker->task edges
if 'worker' in from_name and 'task' in to_name:
mid_x, mid_y = (x1 + x2) / 2, (y1 + y2) / 2
fig.add_annotation(x=mid_x, y=mid_y, text=f'{edge.cost:.1f}',
showarrow=False, font=dict(size=8, color='#00d4ff'))
# Draw nodes
node_colors = {'source': '#00ff88', 'sink': '#ffb800'}
for name in node_names:
x, y = pos[name]
color = node_colors.get(name, '#00d4ff' if 'worker' in name else '#ff4757')
display_name = name.replace('worker_', 'W').replace('task_', 'T')
fig.add_trace(
go.Scatter(x=[x], y=[y], mode='markers+text',
marker=dict(size=30, color=color, line=dict(color='white', width=2)),
text=[display_name], textposition='middle center',
textfont=dict(size=10, color='white'),
name=name, showlegend=False)
)
# Add annotations
fig.add_annotation(x=-2.5, y=5, text='Supply=3', showarrow=False,
font=dict(color='#00ff88', size=12))
fig.add_annotation(x=8, y=5, text='Demand=3', showarrow=False,
font=dict(color='#ffb800', size=12))
fig.update_layout(
template=dark_template,
title='Min-Cost Flow Network for Assignment',
xaxis=dict(range=[-3.5, 10], showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(range=[-1, 6], showgrid=False, zeroline=False, showticklabels=False),
height=500,
)
fig.show()
Data type cannot be displayed: application/vnd.plotly.v1+json
3. Successive Shortest Paths Algorithm
The successive shortest paths algorithm finds the minimum cost flow:
Algorithm
while supply nodes have remaining supply:
1. Find shortest path from supply to demand node
2. Compute maximum flow that can be pushed
3. Push flow and update residual capacities
4. Update node supplies/demands
Complexity
Bellman-Ford: \(O(k \cdot V \cdot E)\) where \(k\) = number of paths
Dijkstra with potentials: \(O(k \cdot E \log V)\)
[6]:
# Solve using successive shortest paths
result_bf = min_cost_flow_successive_shortest_paths(edges, supplies)
print("Bellman-Ford Based Solution")
print("=" * 50)
print(f"Status: {result_bf.status.name}")
print(f"Total Cost: {result_bf.cost:.2f}")
print(f"Iterations: {result_bf.iterations}")
# Extract assignment
assignment_bf, cost_bf = assignment_from_flow_solution(
result_bf.flow, edges, cost_matrix.shape
)
print(f"\nOptimal Assignment:")
for row_idx, col_idx in assignment_bf:
print(f" Track {row_idx} -> Measurement {col_idx} (cost: {cost_matrix[row_idx, col_idx]:.2f})")
Bellman-Ford Based Solution
==================================================
Status: OPTIMAL
Total Cost: -272.98
Iterations: 3
Optimal Assignment:
Track 0 -> Measurement 0 (cost: 0.05)
Track 0 -> Measurement 1 (cost: 23.05)
Track 0 -> Measurement 2 (cost: 23.57)
Track 1 -> Measurement 3 (cost: 125.00)
Track 2 -> Measurement 2 (cost: 0.02)
[7]:
# Solve using Dijkstra-optimized algorithm
result_dijkstra = min_cost_flow_simplex(edges, supplies)
print("Dijkstra-Optimized Solution")
print("=" * 50)
print(f"Status: {result_dijkstra.status.name}")
print(f"Total Cost: {result_dijkstra.cost:.2f}")
print(f"Iterations: {result_dijkstra.iterations}")
# Extract assignment
assignment_dij, cost_dij = assignment_from_flow_solution(
result_dijkstra.flow, edges, cost_matrix.shape
)
print(f"\nOptimal Assignment:")
for row_idx, col_idx in assignment_dij:
print(f" Track {row_idx} -> Measurement {col_idx} (cost: {cost_matrix[row_idx, col_idx]:.2f})")
Dijkstra-Optimized Solution
==================================================
Status: OPTIMAL
Total Cost: 0.12
Iterations: 4
Optimal Assignment:
Track 0 -> Measurement 0 (cost: 0.05)
Track 1 -> Measurement 1 (cost: 0.05)
Track 2 -> Measurement 2 (cost: 0.02)
[8]:
# Visualize the optimal assignment
fig = go.Figure()
# Plot tracks
fig.add_trace(
go.Scatter(x=tracks[:, 0], y=tracks[:, 1], mode='markers+text',
marker=dict(size=16, color='#00d4ff', symbol='square'),
text=[f'T{i}' for i in range(len(tracks))],
textposition='top right', textfont=dict(color='#00d4ff'),
name='Tracks')
)
# Plot measurements
fig.add_trace(
go.Scatter(x=measurements[:, 0], y=measurements[:, 1], mode='markers+text',
marker=dict(size=16, color='#ff4757', symbol='circle'),
text=[f'M{j}' for j in range(len(measurements))],
textposition='bottom right', textfont=dict(color='#ff4757'),
name='Measurements')
)
# Draw optimal assignments with thick green lines
for row_idx, col_idx in assignment_dij:
fig.add_trace(
go.Scatter(x=[tracks[row_idx, 0], measurements[col_idx, 0]],
y=[tracks[row_idx, 1], measurements[col_idx, 1]],
mode='lines', line=dict(color='#00ff88', width=4),
name='Assignment' if row_idx == assignment_dij[0, 0] else None,
showlegend=bool(row_idx == assignment_dij[0, 0]))
)
# Highlight unassigned measurement
assigned_meas = set(assignment_dij[:, 1])
for j in range(n_meas):
if j not in assigned_meas:
fig.add_trace(
go.Scatter(x=[measurements[j, 0]], y=[measurements[j, 1]],
mode='markers',
marker=dict(size=25, color='rgba(0,0,0,0)',
line=dict(color='#a855f7', width=3)),
name='Unassigned')
)
fig.update_layout(
template=dark_template,
title=f'Optimal Assignment (Total Cost: {cost_dij:.2f})',
xaxis_title='X',
yaxis_title='Y',
height=500,
xaxis=dict(scaleanchor='y', scaleratio=1),
)
fig.show()
Data type cannot be displayed: application/vnd.plotly.v1+json
4. Performance Comparison
Let’s compare the performance of different algorithms:
Network flow (Bellman-Ford)
Network flow (Dijkstra-optimized)
Hungarian algorithm
[9]:
# Performance benchmark
sizes = [10, 20, 50, 100, 200]
bf_times = []
dijkstra_times = []
hungarian_times = []
n_trials = 5
for size in sizes:
# Generate random cost matrix
cost = np.random.rand(size, size) * 100
# Bellman-Ford based (limit to smaller sizes)
if size <= 50:
edges_tmp, supplies_tmp, _ = assignment_to_flow_network(cost)
times = []
for _ in range(n_trials):
start = time.time()
result = min_cost_flow_successive_shortest_paths(edges_tmp, supplies_tmp)
times.append(time.time() - start)
bf_times.append(np.mean(times))
else:
bf_times.append(np.nan)
# Dijkstra-optimized
edges_tmp, supplies_tmp, _ = assignment_to_flow_network(cost)
times = []
for _ in range(n_trials):
start = time.time()
result = min_cost_flow_simplex(edges_tmp, supplies_tmp)
times.append(time.time() - start)
dijkstra_times.append(np.mean(times))
# Hungarian algorithm
times = []
for _ in range(n_trials):
start = time.time()
row_ind, col_ind, total_cost = hungarian(cost)
times.append(time.time() - start)
hungarian_times.append(np.mean(times))
print(f"Size {size:3d}x{size:3d}: BF={bf_times[-1]*1e3:7.2f}ms, "
f"Dijkstra={dijkstra_times[-1]*1e3:7.2f}ms, "
f"Hungarian={hungarian_times[-1]*1e3:7.2f}ms")
Size 10x 10: BF= 1.40ms, Dijkstra= 0.95ms, Hungarian= 0.03ms
Size 20x 20: BF= 8.95ms, Dijkstra= 5.37ms, Hungarian= 0.04ms
Size 50x 50: BF= 119.24ms, Dijkstra= 68.82ms, Hungarian= 0.03ms
Size 100x100: BF= nanms, Dijkstra= 529.98ms, Hungarian= 0.10ms
Size 200x200: BF= nanms, Dijkstra=4025.68ms, Hungarian= 0.96ms
[10]:
# Visualize performance comparison
fig = go.Figure()
fig.add_trace(
go.Scatter(x=sizes, y=np.array(bf_times)*1e3, mode='lines+markers',
name='Bellman-Ford', line=dict(color='#ff4757', width=2),
marker=dict(size=8))
)
fig.add_trace(
go.Scatter(x=sizes, y=np.array(dijkstra_times)*1e3, mode='lines+markers',
name='Dijkstra-optimized', line=dict(color='#00ff88', width=2),
marker=dict(size=8, symbol='square'))
)
fig.add_trace(
go.Scatter(x=sizes, y=np.array(hungarian_times)*1e3, mode='lines+markers',
name='Hungarian', line=dict(color='#00d4ff', width=2),
marker=dict(size=8, symbol='triangle-up'))
)
fig.update_layout(
template=dark_template,
title='Assignment Algorithm Performance Comparison',
xaxis_title='Problem Size (n x n)',
yaxis_title='Time (ms)',
yaxis_type='log',
height=450,
)
fig.show()
Data type cannot be displayed: application/vnd.plotly.v1+json
5. Application: Multi-Target Tracking
Let’s demonstrate using network flow for a complete tracking scenario with:
Gate-based cost computation
Missed detection handling
New track initiation
[11]:
# Simulate tracking scenario
n_timesteps = 10
n_tracks = 5
# True target trajectories
np.random.seed(42)
true_positions = []
positions = np.random.randn(n_tracks, 2) * 5 + 10
velocities = np.random.randn(n_tracks, 2) * 0.5
for t in range(n_timesteps):
true_positions.append(positions.copy())
positions += velocities + np.random.randn(n_tracks, 2) * 0.1
# Generate measurements with noise and clutter
detection_prob = 0.9
clutter_rate = 2 # average clutter per frame
measurement_noise = 0.3
all_measurements = []
all_truth_associations = []
for t in range(n_timesteps):
frame_meas = []
frame_assoc = []
# Target detections
for i in range(n_tracks):
if np.random.rand() < detection_prob:
meas = true_positions[t][i] + np.random.randn(2) * measurement_noise
frame_meas.append(meas)
frame_assoc.append(i)
# Clutter
n_clutter = np.random.poisson(clutter_rate)
for _ in range(n_clutter):
clutter = np.random.rand(2) * 30 # Uniform in surveillance region
frame_meas.append(clutter)
frame_assoc.append(-1) # -1 indicates clutter
all_measurements.append(np.array(frame_meas))
all_truth_associations.append(np.array(frame_assoc))
print(f"Generated {n_timesteps} frames with {n_tracks} targets")
for t in range(3):
n_det = np.sum(all_truth_associations[t] >= 0)
n_clut = np.sum(all_truth_associations[t] < 0)
print(f" Frame {t}: {n_det} detections, {n_clut} clutter")
Generated 10 frames with 5 targets
Frame 0: 5 detections, 2 clutter
Frame 1: 5 detections, 4 clutter
Frame 2: 5 detections, 2 clutter
[12]:
def compute_gated_cost_matrix(track_positions, measurements, gate_threshold=10.0):
"""
Compute cost matrix with gating.
Parameters
----------
track_positions : ndarray
Track predicted positions [n_tracks, 2].
measurements : ndarray
Measurement positions [n_meas, 2].
gate_threshold : float
Maximum squared distance for valid association.
Returns
-------
cost_matrix : ndarray
Cost matrix with inf for gated-out associations.
"""
n_tracks = len(track_positions)
n_meas = len(measurements)
cost_matrix = np.full((n_tracks, n_meas), np.inf)
for i in range(n_tracks):
for j in range(n_meas):
dist_sq = np.sum((track_positions[i] - measurements[j])**2)
if dist_sq < gate_threshold:
cost_matrix[i, j] = dist_sq
return cost_matrix
# Run tracking with network flow assignment
# Initialize tracks at frame 0
track_estimates = true_positions[0].copy()
assignment_results = []
total_correct = 0
total_assignments = 0
for t in range(1, n_timesteps):
meas = all_measurements[t]
truth_assoc = all_truth_associations[t]
if len(meas) == 0:
continue
# Compute gated cost matrix
cost = compute_gated_cost_matrix(track_estimates, meas, gate_threshold=4.0)
# Replace inf with large value for network flow
max_cost = 1e6
cost_finite = np.where(np.isinf(cost), max_cost, cost)
# Solve assignment
assignment, total_cost = min_cost_assignment_via_flow(cost_finite)
# Evaluate accuracy
for row_idx, col_idx in assignment:
if cost[row_idx, col_idx] < max_cost: # Valid assignment
total_assignments += 1
if truth_assoc[col_idx] == row_idx: # Correct assignment
total_correct += 1
# Update track estimates with assigned measurements
for row_idx, col_idx in assignment:
if cost[row_idx, col_idx] < max_cost:
track_estimates[row_idx] = meas[col_idx]
assignment_results.append((t, assignment, cost))
accuracy = total_correct / total_assignments if total_assignments > 0 else 0
print(f"\nTracking Results:")
print(f" Total assignments: {total_assignments}")
print(f" Correct associations: {total_correct}")
print(f" Accuracy: {accuracy*100:.1f}%")
Tracking Results:
Total assignments: 32
Correct associations: 32
Accuracy: 100.0%
[13]:
# Visualize tracking results
fig = go.Figure()
colors = ['#00d4ff', '#ff4757', '#00ff88', '#ffb800', '#a855f7']
# Plot true trajectories
for i in range(n_tracks):
traj = np.array([true_positions[t][i] for t in range(n_timesteps)])
fig.add_trace(
go.Scatter(x=traj[:, 0], y=traj[:, 1], mode='lines',
line=dict(color=colors[i % len(colors)], width=2),
name=f'Target {i}' if i < 3 else None,
showlegend=(i < 3), opacity=0.7)
)
# Start and end markers
fig.add_trace(
go.Scatter(x=[traj[0, 0]], y=[traj[0, 1]], mode='markers',
marker=dict(size=10, color=colors[i % len(colors)], symbol='circle'),
showlegend=False)
)
fig.add_trace(
go.Scatter(x=[traj[-1, 0]], y=[traj[-1, 1]], mode='markers',
marker=dict(size=10, color=colors[i % len(colors)], symbol='square'),
showlegend=False)
)
# Plot measurements (detections and clutter)
all_det_x, all_det_y = [], []
all_clut_x, all_clut_y = [], []
for t in range(n_timesteps):
meas = all_measurements[t]
truth = all_truth_associations[t]
for j in range(len(meas)):
if truth[j] >= 0:
all_det_x.append(meas[j, 0])
all_det_y.append(meas[j, 1])
else:
all_clut_x.append(meas[j, 0])
all_clut_y.append(meas[j, 1])
fig.add_trace(
go.Scatter(x=all_det_x, y=all_det_y, mode='markers',
marker=dict(size=4, color='white', opacity=0.4),
name='Detections')
)
fig.add_trace(
go.Scatter(x=all_clut_x, y=all_clut_y, mode='markers',
marker=dict(size=4, color='gray', symbol='x', opacity=0.5),
name='Clutter')
)
fig.update_layout(
template=dark_template,
title=f'Multi-Target Tracking with Network Flow Assignment (Accuracy: {accuracy*100:.1f}%)',
xaxis_title='X',
yaxis_title='Y',
height=550,
)
fig.show()
Data type cannot be displayed: application/vnd.plotly.v1+json
Summary & Learning Path
Key Concepts
Key takeaways:
Network flow formulation converts assignment to min-cost flow
Successive shortest paths finds optimal solution iteratively
Dijkstra optimization provides O(kE log V) vs O(kV*E)
Hungarian algorithm is often faster for dense problems
Network flow scales better for sparse/gated problems
When to Use Each Algorithm
Scenario |
Recommended Algorithm |
Complexity |
Trade-off |
|---|---|---|---|
Small dense (n < 100) |
Hungarian |
O(n³) |
Simple, proven optimal |
Large dense (n > 500) |
Dijkstra-flow |
O(k·E log V) |
Better scaling |
Sparse/gated |
Flow network |
O(k·V·E) |
Flexible constraints |
Rectangular |
Network flow |
O(min(V,E)³) |
Handles non-square |
Dynamic/streaming |
Auction |
O(n·m·ε⁻¹) |
Parallelizable |
4-Week Learning Curriculum
Week 1: Foundations
Day 1-2: Bipartite matching and assignment problem formulation
Day 3-4: Min-cost flow network structure (source, sink, workers, tasks)
Day 5: Edge costs, capacities, and supply/demand balance
Week 2: Classic Algorithms
Day 6-7: Hungarian algorithm derivation and complexity analysis
Day 8-9: Auction algorithm iterative price updates
Day 10: Comparative benchmarking for small problems
Week 3: Network Flow Methods
Day 11-12: Successive shortest paths with Bellman-Ford
Day 13-14: Dijkstra-optimized with potentials (O(k·E log V))
Day 15: Simplex method visualization and convergence
Week 4: Advanced Applications
Day 16-17: Sparse and gated network formulations
Day 18-19: 3D assignment reduction to flow networks
Day 20: Dynamic networks and streaming applications
Advanced Topics
1. Auction Algorithm for Assignment
Distributed algorithm suitable for parallel computation
Each “bidder” competes for “objects” through price mechanism
Convergence: O(n·m·ε⁻¹ log(nC)) iterations
Often faster than Hungarian for sparse problems
2. Successive Shortest Paths (SSP)
Iteratively finds augmenting paths with min-cost
Uses Bellman-Ford (handles negative costs): O(k·V·E)
Or uses Dijkstra with potentials: O(k·E log V)
Better for dynamic networks (tracks appear/disappear)
3. Network Simplex
Primal simplex method specialized for networks
Highly efficient in practice despite O(V²·E) worst-case
Used in production tracking systems
Handles rectangular, sparse, gated problems naturally
4. Min-Cost Max-Flow Relaxations
3D/4D assignment reduced to min-cost max-flow
Achieved by decomposing high-order tensor into constraints
Often better than Lagrangian relaxation for small instances
5. Dynamic/Streaming Networks
Networks where tracks/measurements appear and disappear
Incremental Hungarian using basis updates
Real-time tracking: reuse previous solution, update diffs only
Complexity: O(n²) update instead of O(n³) from scratch
6 Progressive Exercises
⭐ Beginner: Cost Matrix to Flow Network
# Implement assignment_to_flow_network from scratch
# 1. Create nodes: source, workers, tasks, sink
# 2. Add edges with costs and capacities
# 3. Set supplies/demands correctly
⭐⭐ Intermediate: Implement Successive Shortest Paths
# Implement min_cost_flow_successive_shortest_paths using Bellman-Ford
# 1. Initialize residual graph
# 2. While supply remains: find shortest path
# 3. Push flow and update supplies
# Compare timing with Hungarian algorithm
⭐⭐ Intermediate: Gating Integration
# Extend network flow to include gating constraints
# 1. Remove edges where cost > gate_threshold
# 2. Compare results: full network vs gated vs Hungarian + gating
# 3. Measure performance and optimality gap
⭐⭐⭐ Advanced: 3D Assignment via Network Flow
# Reduce 3D assignment (measurements × tracks × hypotheses) to min-cost flow
# 1. Create additional constraint nodes for 3rd dimension
# 2. Set supply/demand to enforce exactly-once selection
# 3. Compare with Lagrangian relaxation optimization
⭐⭐⭐ Advanced: Dynamic Networks
# Implement incremental assignment updates
# 1. Reuse previous solution basis
# 2. Add/remove nodes (new tracks or measurements)
# 3. Measure speedup vs cold start (target: 10-100x faster)
⭐⭐⭐⭐ Expert: Streaming Auction Algorithm
# Implement parallel auction algorithm for large-scale tracking
# 1. Multiple workers bid simultaneously for objects
# 2. Price update rules ensure convergence
# 3. Benchmark vs network simplex for 1000+ tracks
# 4. Analyze scaling and parallelism efficiency
Complete References
Core References:
Ahuja, R.K., Magnanti, T.L., & Orlin, J.B. (1993). Network Flows: Theory, Algorithms, and Applications. Prentice Hall.
Comprehensive treatment of min-cost flow algorithms
Successive shortest paths and simplex methods
Foundation for understanding flow networks in tracking
Blackman, S., & Popoli, R. (1999). Design and Analysis of Modern Tracking Systems. Artech House.
Applies network flow concepts to target tracking
Best practices for gating and assignment
Performance analysis of different algorithms
Bar-Shalom, Y., & Li, X.R. (1995). Multitarget-Multisensor Tracking: Principles and Techniques. Artech House.
Foundational MHT framework
Assignment problem formulation in tracking context
Connection between network flow and data association
Advanced References:
Bertsekas, D.P. (1988). “The Auction Algorithm: A Distributed Relaxation Method for the Assignment Problem”. Annals of Operations Research, 14(1), 105-123.
Original auction algorithm paper
Distributed and parallel variants
Complexity analysis and convergence proofs
Kuhn, H.W. (1955). “The Hungarian Method for the Assignment Problem”. Naval Research Logistics Quarterly, 2(1-2), 83-97.
Classic Hungarian algorithm (provides baseline for comparison)
Historical context for modern approaches
Fundamental pruning techniques still used today
Castañón, D.A. (1997). “Efficient Algorithms for Nonlinear Smoothing”. IEEE Transactions on Signal Processing, 45(10), 2425-2438.
Advanced filtering techniques compatible with flow networks
Extension to nonlinear systems
Connection between smoothing and assignment
PyTCL API Reference
Key functions and their integration:
# Main network flow functions
from pytcl.assignment_algorithms.network_flow import (
FlowEdge, FlowStatus,
assignment_to_flow_network, # Convert assignment → flow network
min_cost_flow_successive_shortest_paths, # SSP with Bellman-Ford
min_cost_flow_simplex, # Dijkstra-optimized
assignment_from_flow_solution, # Extract assignment from flow
min_cost_assignment_via_flow, # End-to-end flow assignment
)
# Supporting algorithms for comparison
from pytcl.assignment_algorithms import (
hungarian, # Baseline algorithm
assign2d, # Generalized 2D assignment
auction, # Auction algorithm
gnn_association, # GNN for association
jpda, # JPDA with flow networks
)