Source code for pytcl.assignment_algorithms.network_flow

"""
Network flow solutions for assignment problems.

This module provides min-cost flow formulations for assignment problems,
offering an alternative to Hungarian algorithm and relaxation methods.

A min-cost flow approach:
1. Models assignment as flow network
2. Uses cost edges for penalties
3. Enforces supply/demand constraints
4. Finds minimum-cost flow solution
5. Extracts assignment from flow

References
----------
.. [1] Ahuja, R. K., Magnanti, T. L., & Orlin, J. B. (1993). Network Flows:
       Theory, Algorithms, and Applications. Prentice-Hall.
.. [2] Costain, G., & Liang, H. (2012). An Auction Algorithm for the
       Minimum Cost Flow Problem. CoRR, abs/1208.4859.
"""

from enum import Enum
from typing import Any, NamedTuple, Tuple

import numpy as np
from numpy.typing import NDArray


[docs] class FlowStatus(Enum): """Status of min-cost flow computation.""" OPTIMAL = 0 UNBOUNDED = 1 INFEASIBLE = 2 TIMEOUT = 3
[docs] class MinCostFlowResult(NamedTuple): """Result of min-cost flow computation. Attributes ---------- flow : ndarray Flow values on each edge, shape (n_edges,). cost : float Total flow cost. status : FlowStatus Optimization status. iterations : int Number of iterations used. """ flow: NDArray[np.float64] cost: float status: FlowStatus iterations: int
class FlowEdge(NamedTuple): """Edge in a flow network. Attributes ---------- from_node : int Source node index. to_node : int Destination node index. capacity : float Maximum flow on edge (default 1.0 for assignment). cost : float Cost per unit flow. """ from_node: int to_node: int capacity: float cost: float
[docs] def assignment_to_flow_network( cost_matrix: NDArray[np.float64], ) -> Tuple[list[FlowEdge], NDArray[np.floating], NDArray[Any]]: """ Convert 2D assignment problem to min-cost flow network. Network structure: - Source node (0) supplies all workers - Worker nodes (1 to m) demand 1 unit each - Task nodes (m+1 to m+n) supply 1 unit each - Sink node (m+n+1) collects all completed tasks Parameters ---------- cost_matrix : ndarray Cost matrix of shape (m, n) where cost[i,j] is cost of assigning worker i to task j. Returns ------- edges : list[FlowEdge] List of edges in the flow network. supplies : ndarray Supply/demand at each node (shape n_nodes,). Positive = supply, negative = demand. node_names : ndarray Names of nodes for reference. Examples -------- >>> import numpy as np >>> # 2 workers, 3 tasks >>> cost = np.array([[1.0, 2.0, 3.0], ... [4.0, 5.0, 6.0]]) >>> edges, supplies, names = assignment_to_flow_network(cost) >>> len(edges) # source->workers + workers->tasks + tasks->sink 11 >>> supplies[0] # source supplies 2 (num workers) 2.0 >>> names[0] 'source' """ m, n = cost_matrix.shape # Node numbering: # 0: source # 1 to m: workers # m+1 to m+n: tasks # m+n+1: sink n_nodes = m + n + 2 source = 0 sink = m + n + 1 edges = [] # Source to workers: capacity 1, cost 0 for i in range(1, m + 1): edges.append(FlowEdge(from_node=source, to_node=i, capacity=1.0, cost=0.0)) # Workers to tasks: capacity 1, cost = assignment cost for i in range(m): for j in range(n): worker_node = i + 1 task_node = m + 1 + j edges.append( FlowEdge( from_node=worker_node, to_node=task_node, capacity=1.0, cost=cost_matrix[i, j], ) ) # Tasks to sink: capacity 1, cost 0 for j in range(1, n + 1): task_node = m + j edges.append( FlowEdge(from_node=task_node, to_node=sink, capacity=1.0, cost=0.0) ) # Supply/demand: source supplies m units, sink demands m units supplies = np.zeros(n_nodes) supplies[source] = float(m) supplies[sink] = float(-m) node_names = np.array( ["source"] + [f"worker_{i}" for i in range(m)] + [f"task_{j}" for j in range(n)] + ["sink"] ) return edges, supplies, node_names
[docs] def min_cost_flow_successive_shortest_paths( edges: list[FlowEdge], supplies: NDArray[np.float64], max_iterations: int = 1000, ) -> MinCostFlowResult: """ Solve min-cost flow using successive shortest paths with cost scaling. Algorithm: 1. Initialize potentials using Bellman-Ford 2. While there is excess supply: - Find shortest path using reduced costs (Dijkstra with potentials) - Push unit flow along path - Update node potentials - Recompute shortest paths to maintain optimality This is the standard min-cost flow algorithm that guarantees optimality and convergence. It uses Dijkstra's algorithm with potentials, which maintains the dual feasibility (reduced cost property). Parameters ---------- edges : list[FlowEdge] List of edges with capacities and costs. supplies : ndarray Supply/demand at each node. max_iterations : int, optional Maximum iterations (default 1000). Returns ------- MinCostFlowResult Solution with flow values, cost, status, and iterations. Examples -------- >>> import numpy as np >>> from pytcl.assignment_algorithms.network_flow import ( ... FlowEdge, assignment_to_flow_network ... ) >>> cost = np.array([[1.0, 5.0], [4.0, 2.0]]) >>> edges, supplies, _ = assignment_to_flow_network(cost) >>> result = min_cost_flow_successive_shortest_paths(edges, supplies) >>> result.status == FlowStatus.OPTIMAL True >>> result.cost # Optimal is 1+2=3 (diagonal) 3.0 Notes ----- This implementation uses successive shortest paths with potentials. The algorithm is guaranteed to find the optimal solution for any feasible min-cost flow problem. For rectangular assignment problems (m < n or m > n), all m units of flow must be satisfied. The algorithm ensures this by finding augmenting paths until all supply is routed. """ n_nodes = len(supplies) n_edges = len(edges) # Initialize flow and residual capacity flow = np.zeros(n_edges) residual_capacity = np.array([e.capacity for e in edges]) # Initialize node potentials using Bellman-Ford from a dummy source # This ensures all potentials are finite and maintains dual feasibility potential = np.zeros(n_nodes) # Build adjacency list representation # Each entry: (to_node, edge_idx, is_reverse, cost) graph: list[list[tuple[int, int, int, float]]] = [[] for _ in range(n_nodes)] for edge_idx, edge in enumerate(edges): # Forward edge graph[edge.from_node].append((edge.to_node, edge_idx, 0, edge.cost)) # Reverse edge (for flow cancellation) graph[edge.to_node].append((edge.from_node, edge_idx, 1, -edge.cost)) current_supplies = supplies.copy() iteration = 0 # Main algorithm loop while iteration < max_iterations: # Find excess and deficit nodes excess_node = -1 deficit_node = -1 for node in range(n_nodes): if current_supplies[node] > 1e-10: excess_node = node break if excess_node < 0: break # No more excess nodes for node in range(n_nodes): if current_supplies[node] < -1e-10: deficit_node = node break if deficit_node < 0: break # No deficit nodes # Find shortest path using Dijkstra with potentials # Reduced cost: c_reduced(u,v) = c(u,v) + π(u) - π(v) dist = np.full(n_nodes, np.inf) dist[excess_node] = 0.0 parent = np.full(n_nodes, -1, dtype=int) parent_edge = np.full(n_nodes, -1, dtype=int) parent_reverse = np.full(n_nodes, 0, dtype=int) visited = np.zeros(n_nodes, dtype=bool) # Dijkstra's algorithm for _ in range(n_nodes): # Find unvisited node with minimum distance u = -1 min_dist = np.inf for node in range(n_nodes): if not visited[node] and dist[node] < min_dist: u = node min_dist = dist[node] if u < 0 or dist[u] == np.inf: break visited[u] = True # Relax edges from u for v, eidx, is_rev, cost in graph[u]: if residual_capacity[eidx] > 1e-10: # Compute reduced cost reduced_cost = cost + potential[u] - potential[v] new_dist = dist[u] + reduced_cost if new_dist < dist[v] - 1e-10: dist[v] = new_dist parent[v] = u parent_edge[v] = eidx parent_reverse[v] = is_rev if dist[deficit_node] >= np.inf: # No path found break # Update potentials to maintain dual feasibility for node in range(n_nodes): if dist[node] < np.inf: potential[node] += dist[node] # Extract path by backtracking path_edges = [] path_reverse_flags = [] node = deficit_node path_length = 0 visited_set = set() while parent[node] >= 0: if path_length >= n_nodes: break # Safety check if node in visited_set: break # Cycle detected visited_set.add(node) path_edges.append(parent_edge[node]) path_reverse_flags.append(parent_reverse[node]) node = parent[node] path_length += 1 if not path_edges: iteration += 1 continue path_edges.reverse() path_reverse_flags.reverse() # Find bottleneck capacity min_flow = min(residual_capacity[e] for e in path_edges) min_flow = min( min_flow, current_supplies[excess_node], -current_supplies[deficit_node], ) # Push flow along path for edge_idx, is_reverse in zip(path_edges, path_reverse_flags): if is_reverse == 0: # Forward edge: increase flow flow[edge_idx] += min_flow residual_capacity[edge_idx] -= min_flow else: # Reverse edge: decrease flow (cancel) flow[edge_idx] -= min_flow residual_capacity[edge_idx] += min_flow current_supplies[excess_node] -= min_flow current_supplies[deficit_node] += min_flow iteration += 1 # Compute total cost: include all flows (including negative which cancel) total_cost = 0.0 for i, edge in enumerate(edges): total_cost += flow[i] * edge.cost # Determine status if np.allclose(current_supplies, 0, atol=1e-6): status = FlowStatus.OPTIMAL elif iteration >= max_iterations: status = FlowStatus.TIMEOUT else: status = FlowStatus.INFEASIBLE return MinCostFlowResult( flow=flow, cost=total_cost, status=status, iterations=iteration, )
def min_cost_flow_simplex( edges: list[FlowEdge], supplies: NDArray[np.float64], max_iterations: int = 10000, ) -> MinCostFlowResult: """ Solve min-cost flow using Dijkstra-based successive shortest paths. This optimized version uses: - Dijkstra's algorithm (O(E log V)) instead of Bellman-Ford (O(VE)) - Node potentials to maintain non-negative edge costs - Johnson's technique for cost adjustment This is significantly faster than Bellman-Ford while maintaining guaranteed correctness and optimality. Time complexity: O(K * E log V) where K = number of shortest paths Space complexity: O(V + E) Parameters ---------- edges : list[FlowEdge] List of edges with capacities and costs. supplies : ndarray Supply/demand at each node. max_iterations : int, optional Maximum iterations (default 10000). Returns ------- MinCostFlowResult Solution with flow values, cost, status, and iterations. References ---------- .. [1] Ahuja, R. K., Magnanti, T. L., & Orlin, J. B. (1993). Network Flows: Theory, Algorithms, and Applications. (Chapter on successive shortest paths with potentials) .. [2] Johnson, D. B. (1977). Efficient All-Pairs Shortest Paths in Weighted Graphs. """ from pytcl.assignment_algorithms.dijkstra_min_cost import ( min_cost_flow_dijkstra_potentials, ) n_nodes = len(supplies) # Convert FlowEdge objects to tuples edge_tuples = [(e.from_node, e.to_node, e.capacity, e.cost) for e in edges] # Run optimized Dijkstra-based algorithm flow, total_cost, iterations = min_cost_flow_dijkstra_potentials( n_nodes, edge_tuples, supplies, max_iterations ) # Check feasibility residual_supplies = supplies.copy() for i, edge in enumerate(edges): residual_supplies[edge.from_node] -= flow[i] residual_supplies[edge.to_node] += flow[i] if np.allclose(residual_supplies, 0, atol=1e-6): status = FlowStatus.OPTIMAL elif iterations >= max_iterations: status = FlowStatus.TIMEOUT else: status = FlowStatus.INFEASIBLE return MinCostFlowResult( flow=flow, cost=total_cost, status=status, iterations=iterations, ) def assignment_from_flow_solution( flow: NDArray[np.float64], edges: list[FlowEdge], cost_matrix_shape: Tuple[int, int], ) -> Tuple[NDArray[np.intp], float]: """ Extract assignment from flow network solution. A valid flow solution for assignment should have: - Exactly 1 unit of flow from each worker to some task - Exactly 1 unit of flow to each task from some worker - No negative flows on worker->task edges (those are cancellations) This function extracts the actual assignment by identifying which worker->task edges carry the net positive flow. Parameters ---------- flow : ndarray Flow values on each edge. edges : list[FlowEdge] List of edges used in network. cost_matrix_shape : tuple Shape of original cost matrix (m, n). Returns ------- assignment : ndarray Assignment array of shape (n_assignments, 2) with [worker, task]. cost : float Total assignment cost. """ m, n = cost_matrix_shape assignment = [] cost = 0.0 # Build source node (node 0) and sink node (node m+n+1) indices source = 0 sink = m + n + 1 # For a valid assignment solution: # - Count flow out of source to each worker # - Count flow into sink from each task worker_outflow = np.zeros(m) task_inflow = np.zeros(n) # Collect all worker->task edges and their flows worker_task_edges = [] for edge_idx, edge in enumerate(edges): # Worker edges: from source (0) to worker nodes (1..m) if edge.from_node == source and 1 <= edge.to_node <= m: worker_id = edge.to_node - 1 worker_outflow[worker_id] += flow[edge_idx] # Task edges: from task nodes (m+1..m+n) to sink if m + 1 <= edge.from_node <= m + n and edge.to_node == sink: task_id = edge.from_node - (m + 1) task_inflow[task_id] += flow[edge_idx] # Worker-to-task edges if 1 <= edge.from_node <= m and m + 1 <= edge.to_node <= m + n: worker_id = edge.from_node - 1 task_id = edge.to_node - (m + 1) if flow[edge_idx] > 0.5: # Positive flow means this edge is used worker_task_edges.append( { "worker": worker_id, "task": task_id, "flow": flow[edge_idx], "cost": edge.cost, "edge_idx": edge_idx, } ) # For assignment problems, each worker should have exactly 1 outgoing flow # and each task should have exactly 1 incoming flow # Extract the assignment from worker->task edges with positive flow for edge_info in worker_task_edges: assignment.append([edge_info["worker"], edge_info["task"]]) cost += edge_info["flow"] * edge_info["cost"] assignment = ( np.array(assignment, dtype=np.intp) if assignment else np.empty((0, 2), dtype=np.intp) ) return assignment, cost
[docs] def min_cost_assignment_via_flow( cost_matrix: NDArray[np.float64], use_simplex: bool = True, ) -> Tuple[NDArray[np.intp], float]: """ Solve 2D assignment problem via min-cost flow network. Uses Dijkstra-optimized successive shortest paths (Phase 1B) by default. Falls back to Bellman-Ford if needed. Parameters ---------- cost_matrix : ndarray Cost matrix of shape (m, n). use_simplex : bool, optional Use Dijkstra-optimized algorithm (default True) or Bellman-Ford based successive shortest paths (False). Returns ------- assignment : ndarray Assignment array of shape (n_assignments, 2). total_cost : float Total assignment cost. Examples -------- >>> import numpy as np >>> cost = np.array([[1.0, 5.0, 9.0], ... [3.0, 2.0, 8.0], ... [7.0, 6.0, 4.0]]) >>> assignment, total_cost = min_cost_assignment_via_flow(cost) >>> total_cost # Optimal assignment: (0,0), (1,1), (2,2) = 1+2+4 = 7 7.0 >>> len(assignment) 3 Notes ----- Phase 1B: Dijkstra-based optimization provides O(K*E log V) vs Bellman-Ford O(K*V*E), where K is number of shortest paths needed. """ edges, supplies, _ = assignment_to_flow_network(cost_matrix) if use_simplex: result = min_cost_flow_simplex(edges, supplies) else: result = min_cost_flow_successive_shortest_paths(edges, supplies) assignment, cost = assignment_from_flow_solution( result.flow, edges, cost_matrix.shape ) return assignment, cost