Performance Optimization: Profiling and Benchmarking
This notebook covers performance optimization techniques for tracking applications. We explore:
Profiling Basics - Identifying bottlenecks
Numba JIT Compilation - Accelerating numerical code
Vectorization - Efficient NumPy operations
Caching Strategies - Avoiding redundant computations
Memory Optimization - Reducing allocations
Benchmarking Best Practices - Reliable measurements
Prerequisites
pip install nrl-tracker plotly numpy numba line_profiler memory_profiler
[2]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import time
import cProfile
import pstats
from io import StringIO
from functools import lru_cache
try:
from numba import njit, prange
NUMBA_AVAILABLE = True
except ImportError:
NUMBA_AVAILABLE = False
print("Numba not available. Install with: pip install numba")
# Import tracking functions
from pytcl.dynamic_estimation.kalman import kf_predict, kf_update
from pytcl.assignment_algorithms import hungarian
from pytcl.dynamic_estimation.particle_filters import (
resample_systematic, effective_sample_size
)
np.random.seed(42)
# Plotly dark theme template
dark_template = go.layout.Template()
dark_template.layout = go.Layout(
paper_bgcolor='#0d1117',
plot_bgcolor='#0d1117',
font=dict(color='#e6edf3'),
xaxis=dict(gridcolor='#30363d', zerolinecolor='#30363d'),
yaxis=dict(gridcolor='#30363d', zerolinecolor='#30363d'),
)
1. Profiling Basics
Before optimizing, identify where time is actually spent.
Profiling Tools
Tool |
Purpose |
Overhead |
|---|---|---|
|
Quick timing |
Minimal |
|
Function-level profiling |
Moderate |
|
Line-by-line profiling |
High |
|
Memory usage |
High |
[3]:
def simple_timer(func, *args, n_runs=10, **kwargs):
"""
Time a function call with multiple runs.
Parameters
----------
func : callable
Function to time.
*args : tuple
Arguments to pass to function.
n_runs : int
Number of timing runs.
**kwargs : dict
Keyword arguments to pass to function.
Returns
-------
dict
Timing statistics.
"""
# Warmup
func(*args, **kwargs)
times = []
for _ in range(n_runs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
times.append(end - start)
return {
'mean': np.mean(times),
'std': np.std(times),
'min': np.min(times),
'max': np.max(times),
'n_runs': n_runs
}
# Example: Time matrix multiplication
n = 500
A = np.random.randn(n, n)
B = np.random.randn(n, n)
stats = simple_timer(np.dot, A, B, n_runs=20)
print(f"Matrix multiplication ({n}x{n}):")
print(f" Mean: {stats['mean']*1e3:.2f} ms")
print(f" Std: {stats['std']*1e3:.2f} ms")
Matrix multiplication (500x500):
Mean: 0.42 ms
Std: 0.08 ms
[6]:
# Profile a tracking scenario
def run_kalman_tracking(n_steps=100):
"""Run Kalman filter tracking."""
# System matrices
dt = 1.0
F = np.array([[1, dt, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, dt],
[0, 0, 0, 1]])
H = np.array([[1, 0, 0, 0],
[0, 0, 1, 0]])
Q = np.eye(4) * 0.1
R = np.eye(2) * 1.0
x = np.zeros(4)
P = np.eye(4) * 100
for _ in range(n_steps):
# Predict
x = F @ x
P = F @ P @ F.T + Q
# Update
z = np.random.randn(2) * np.sqrt(R[0, 0])
y = z - H @ x
S = H @ P @ H.T + R
K = P @ H.T @ np.linalg.inv(S)
x = x + K @ y
P = (np.eye(4) - K @ H) @ P
return x, P
# Profile with cProfile
profiler = cProfile.Profile()
profiler.enable()
for _ in range(100):
run_kalman_tracking(100)
profiler.disable()
# Print top 10 functions by cumulative time
s = StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats(10)
print(s.getvalue())
271792 function calls (271789 primitive calls) in 0.213 seconds
Ordered by: cumulative time
List reduced from 102 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
100 0.135 0.001 0.212 0.002 /var/folders/01/nxrmp0tn3tn73c5ty1j1ywm40000gn/T/ipykernel_51637/3480273987.py:2(run_kalman_tracking)
1 0.000 0.000 0.104 0.104 /Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/events.py:87(_run)
1 0.000 0.000 0.104 0.104 {method 'run' of '_contextvars.Context' objects}
1 0.000 0.000 0.104 0.104 /Users/nedonatelli/Library/Python/3.13/lib/python/site-packages/tornado/platform/asyncio.py:206(_handle_events)
1 0.000 0.000 0.104 0.104 /Users/nedonatelli/Library/Python/3.13/lib/python/site-packages/zmq/eventloop/zmqstream.py:573(_handle_events)
1 0.000 0.000 0.104 0.104 /Users/nedonatelli/Library/Python/3.13/lib/python/site-packages/zmq/eventloop/zmqstream.py:614(_handle_recv)
1 0.000 0.000 0.104 0.104 /Users/nedonatelli/Library/Python/3.13/lib/python/site-packages/zmq/eventloop/zmqstream.py:546(_run_callback)
1 0.000 0.000 0.104 0.104 /Users/nedonatelli/Library/Python/3.13/lib/python/site-packages/ipykernel/iostream.py:157(_handle_event)
1 0.000 0.000 0.104 0.104 /Users/nedonatelli/Library/Python/3.13/lib/python/site-packages/ipykernel/iostream.py:276(<lambda>)
1 0.000 0.000 0.104 0.104 /Users/nedonatelli/Library/Python/3.13/lib/python/site-packages/ipykernel/iostream.py:278(_really_send)
2. Numba JIT Compilation
Numba compiles Python functions to optimized machine code at runtime.
Key Features
@njit- Compile without Python interpreter (“nopython” mode)@njit(parallel=True)- Automatic parallelizationprange- Parallel range for loopscache=True- Cache compiled code between runs
[7]:
# Pure Python implementation
def compute_distances_python(points, center):
"""Compute distances from points to center (pure Python)."""
n = len(points)
distances = np.zeros(n)
for i in range(n):
d = 0.0
for j in range(len(center)):
d += (points[i, j] - center[j]) ** 2
distances[i] = np.sqrt(d)
return distances
# Vectorized NumPy implementation
def compute_distances_numpy(points, center):
"""Compute distances from points to center (NumPy)."""
return np.sqrt(np.sum((points - center) ** 2, axis=1))
if NUMBA_AVAILABLE:
# Numba JIT implementation
@njit(cache=True)
def compute_distances_numba(points, center):
"""Compute distances from points to center (Numba)."""
n = len(points)
dim = len(center)
distances = np.zeros(n)
for i in range(n):
d = 0.0
for j in range(dim):
d += (points[i, j] - center[j]) ** 2
distances[i] = np.sqrt(d)
return distances
# Parallel Numba implementation
@njit(parallel=True, cache=True)
def compute_distances_numba_parallel(points, center):
"""Compute distances from points to center (parallel Numba)."""
n = len(points)
dim = len(center)
distances = np.zeros(n)
for i in prange(n):
d = 0.0
for j in range(dim):
d += (points[i, j] - center[j]) ** 2
distances[i] = np.sqrt(d)
return distances
print("Distance functions defined.")
Distance functions defined.
[8]:
# Benchmark different implementations
n_points_list = [1000, 10000, 100000, 1000000]
dim = 3
python_times = []
numpy_times = []
numba_times = []
numba_parallel_times = []
for n_points in n_points_list:
points = np.random.randn(n_points, dim)
center = np.random.randn(dim)
# Python (only for smaller sizes)
if n_points <= 10000:
stats = simple_timer(compute_distances_python, points, center, n_runs=3)
python_times.append(stats['mean'])
else:
python_times.append(np.nan)
# NumPy
stats = simple_timer(compute_distances_numpy, points, center, n_runs=10)
numpy_times.append(stats['mean'])
if NUMBA_AVAILABLE:
# Numba (warmup first)
_ = compute_distances_numba(points, center)
stats = simple_timer(compute_distances_numba, points, center, n_runs=10)
numba_times.append(stats['mean'])
# Parallel Numba
_ = compute_distances_numba_parallel(points, center)
stats = simple_timer(compute_distances_numba_parallel, points, center, n_runs=10)
numba_parallel_times.append(stats['mean'])
print(f"n={n_points:7d}: NumPy={numpy_times[-1]*1e3:7.2f}ms", end='')
if NUMBA_AVAILABLE:
print(f", Numba={numba_times[-1]*1e3:7.2f}ms, "
f"Parallel={numba_parallel_times[-1]*1e3:7.2f}ms")
else:
print()
n= 1000: NumPy= 0.04ms, Numba= 0.00ms, Parallel= 0.10ms
n= 10000: NumPy= 0.10ms, Numba= 0.02ms, Parallel= 0.11ms
n= 100000: NumPy= 0.91ms, Numba= 0.20ms, Parallel= 0.15ms
n=1000000: NumPy= 9.38ms, Numba= 2.08ms, Parallel= 0.33ms
[9]:
# Visualize performance comparison
fig = go.Figure()
fig.add_trace(
go.Scatter(x=n_points_list, y=np.array(numpy_times)*1e3, mode='lines+markers',
name='NumPy', line=dict(color='#00d4ff', width=2),
marker=dict(size=8))
)
if NUMBA_AVAILABLE:
fig.add_trace(
go.Scatter(x=n_points_list, y=np.array(numba_times)*1e3, mode='lines+markers',
name='Numba', line=dict(color='#00ff88', width=2),
marker=dict(size=8, symbol='square'))
)
fig.add_trace(
go.Scatter(x=n_points_list, y=np.array(numba_parallel_times)*1e3, mode='lines+markers',
name='Numba Parallel', line=dict(color='#ff4757', width=2),
marker=dict(size=8, symbol='triangle-up'))
)
fig.update_layout(
template=dark_template,
title='Distance Computation Performance',
xaxis_title='Number of Points',
yaxis_title='Time (ms)',
xaxis_type='log',
yaxis_type='log',
height=450,
)
fig.show()
Data type cannot be displayed: application/vnd.plotly.v1+json
3. Vectorization Patterns
Replacing loops with vectorized NumPy operations can provide significant speedups.
[10]:
# Example: Compute pairwise distances
def pairwise_distances_loop(X):
"""Compute pairwise distances using loops."""
n = len(X)
D = np.zeros((n, n))
for i in range(n):
for j in range(i+1, n):
d = np.sqrt(np.sum((X[i] - X[j])**2))
D[i, j] = d
D[j, i] = d
return D
def pairwise_distances_broadcast(X):
"""Compute pairwise distances using broadcasting."""
# Shape: (n, 1, d) - (1, n, d) = (n, n, d)
diff = X[:, np.newaxis, :] - X[np.newaxis, :, :]
return np.sqrt(np.sum(diff**2, axis=-1))
def pairwise_distances_einsum(X):
"""Compute pairwise distances using einsum."""
# D[i,j] = ||X[i] - X[j]||² = ||X[i]||² + ||X[j]||² - 2*X[i]·X[j]
norms_sq = np.sum(X**2, axis=1)
dot_products = np.einsum('ik,jk->ij', X, X)
D_sq = norms_sq[:, np.newaxis] + norms_sq[np.newaxis, :] - 2 * dot_products
D_sq = np.maximum(D_sq, 0) # Handle numerical errors
return np.sqrt(D_sq)
# Benchmark
n = 500
X = np.random.randn(n, 3)
print(f"Pairwise distances for {n} points:")
stats = simple_timer(pairwise_distances_loop, X, n_runs=3)
print(f" Loop: {stats['mean']*1e3:8.2f} ms")
stats = simple_timer(pairwise_distances_broadcast, X, n_runs=10)
print(f" Broadcast: {stats['mean']*1e3:8.2f} ms")
stats = simple_timer(pairwise_distances_einsum, X, n_runs=10)
print(f" Einsum: {stats['mean']*1e3:8.2f} ms")
# Verify results match
D1 = pairwise_distances_loop(X)
D2 = pairwise_distances_broadcast(X)
D3 = pairwise_distances_einsum(X)
print(f"\nMax difference: {max(np.max(np.abs(D1-D2)), np.max(np.abs(D1-D3))):.2e}")
Pairwise distances for 500 points:
Loop: 245.64 ms
Broadcast: 3.34 ms
Einsum: 1.00 ms
Max difference: 5.96e-08
[4]:
# Batch Kalman filter - vectorized version
def batch_kf_predict_loop(x_batch, P_batch, F, Q):
"""Predict step using loop."""
n_batch = len(x_batch)
x_pred = np.zeros_like(x_batch)
P_pred = np.zeros_like(P_batch)
for i in range(n_batch):
x_pred[i] = F @ x_batch[i]
P_pred[i] = F @ P_batch[i] @ F.T + Q
return x_pred, P_pred
def batch_kf_predict_vectorized(x_batch, P_batch, F, Q):
"""Predict step using vectorized operations."""
# x_pred[i] = F @ x_batch[i] -> use einsum
x_pred = np.einsum('ij,bj->bi', F, x_batch)
# P_pred[i] = F @ P_batch[i] @ F.T + Q
FP = np.einsum('ij,bjk->bik', F, P_batch) # F @ P for each batch
FPFt = np.einsum('bij,kj->bik', FP, F) # ... @ F.T
P_pred = FPFt + Q
return x_pred, P_pred
# Benchmark
n_batch = 1000
state_dim = 6
x_batch = np.random.randn(n_batch, state_dim)
P_batch = np.tile(np.eye(state_dim), (n_batch, 1, 1))
F = np.random.randn(state_dim, state_dim)
Q = np.eye(state_dim) * 0.1
print(f"Batch KF predict ({n_batch} tracks, {state_dim}D state):")
stats = simple_timer(batch_kf_predict_loop, x_batch, P_batch, F, Q, n_runs=10)
print(f" Loop: {stats['mean']*1e3:8.2f} ms")
stats = simple_timer(batch_kf_predict_vectorized, x_batch, P_batch, F, Q, n_runs=10)
print(f" Vectorized: {stats['mean']*1e3:8.2f} ms")
Batch KF predict (1000 tracks, 6D state):
Loop: 2.39 ms
Vectorized: 0.49 ms
4. Caching Strategies
Avoid recomputing expensive results that are used multiple times.
[5]:
# Example: Spherical harmonic coefficients
def compute_legendre_uncached(n_max, theta):
"""Compute associated Legendre polynomials without caching."""
x = np.cos(theta)
P = np.zeros((n_max + 1, n_max + 1))
P[0, 0] = 1.0
if n_max > 0:
P[1, 0] = x
P[1, 1] = -np.sin(theta)
for n in range(2, n_max + 1):
for m in range(n + 1):
if m == n:
P[n, m] = -(2*n - 1) * np.sin(theta) * P[n-1, m-1]
elif m == n - 1:
P[n, m] = (2*n - 1) * x * P[n-1, m]
else:
P[n, m] = ((2*n - 1) * x * P[n-1, m] - (n + m - 1) * P[n-2, m]) / (n - m)
return P
@lru_cache(maxsize=128)
def compute_legendre_cached(n_max, theta_index):
"""Compute associated Legendre polynomials with caching.
Note: Using theta_index (discretized) for hashability.
"""
theta = theta_index * 0.01 # Convert index back to angle
return compute_legendre_uncached(n_max, theta)
# Simulate repeated evaluation at same locations (common in tracking)
n_max = 20
n_evals = 1000
n_unique_locations = 50 # Many evaluations at same locations
# Random angles, but with repetition
theta_unique = np.random.rand(n_unique_locations) * np.pi
theta_indices = np.random.randint(0, n_unique_locations, n_evals)
# Uncached
start = time.time()
for i in range(n_evals):
_ = compute_legendre_uncached(n_max, theta_unique[theta_indices[i]])
uncached_time = time.time() - start
# Cached (clear cache first)
compute_legendre_cached.cache_clear()
start = time.time()
for i in range(n_evals):
# Convert theta to discrete index for caching
theta_idx = int(theta_unique[theta_indices[i]] * 100)
_ = compute_legendre_cached(n_max, theta_idx)
cached_time = time.time() - start
print(f"Legendre polynomials (n_max={n_max}, {n_evals} evaluations):")
print(f" Uncached: {uncached_time*1e3:.2f} ms")
print(f" Cached: {cached_time*1e3:.2f} ms")
print(f" Speedup: {uncached_time/cached_time:.1f}x")
print(f" Cache hits: {compute_legendre_cached.cache_info().hits}")
print(f" Cache misses: {compute_legendre_cached.cache_info().misses}")
Legendre polynomials (n_max=20, 1000 evaluations):
Uncached: 79.07 ms
Cached: 3.56 ms
Speedup: 22.2x
Cache hits: 952
Cache misses: 48
5. Memory Optimization
Reducing memory allocations can significantly improve performance.
[6]:
# Pre-allocation vs. dynamic allocation
def simulate_with_append(n_steps):
"""Simulate trajectory using list append."""
trajectory = []
x = np.zeros(4)
for _ in range(n_steps):
x = x + np.random.randn(4) * 0.1
trajectory.append(x.copy())
return np.array(trajectory)
def simulate_with_prealloc(n_steps):
"""Simulate trajectory with pre-allocated array."""
trajectory = np.zeros((n_steps, 4))
x = np.zeros(4)
for i in range(n_steps):
x = x + np.random.randn(4) * 0.1
trajectory[i] = x
return trajectory
# Benchmark
n_steps = 10000
stats1 = simple_timer(simulate_with_append, n_steps, n_runs=20)
stats2 = simple_timer(simulate_with_prealloc, n_steps, n_runs=20)
print(f"Trajectory simulation ({n_steps} steps):")
print(f" List append: {stats1['mean']*1e3:.2f} ms")
print(f" Pre-allocated: {stats2['mean']*1e3:.2f} ms")
print(f" Speedup: {stats1['mean']/stats2['mean']:.1f}x")
Trajectory simulation (10000 steps):
List append: 11.78 ms
Pre-allocated: 10.15 ms
Speedup: 1.2x
[7]:
# In-place operations
def update_with_copy(x, dx):
"""Update using copy."""
return x + dx
def update_in_place(x, dx):
"""Update in place."""
x += dx
return x
# Benchmark with many iterations
n_iter = 100000
x = np.zeros(100)
dx = np.random.randn(100) * 0.001
# With copy
x_copy = x.copy()
start = time.time()
for _ in range(n_iter):
x_copy = update_with_copy(x_copy, dx)
copy_time = time.time() - start
# In place
x_inplace = x.copy()
start = time.time()
for _ in range(n_iter):
update_in_place(x_inplace, dx)
inplace_time = time.time() - start
print(f"Update operations ({n_iter} iterations):")
print(f" With copy: {copy_time*1e3:.2f} ms")
print(f" In place: {inplace_time*1e3:.2f} ms")
print(f" Speedup: {copy_time/inplace_time:.1f}x")
Update operations (100000 iterations):
With copy: 28.16 ms
In place: 24.99 ms
Speedup: 1.1x
6. Benchmarking Best Practices
Guidelines for reliable performance measurements.
[8]:
def reliable_benchmark(func, *args, n_warmup=5, n_runs=50, **kwargs):
"""
Perform a reliable benchmark with proper warmup and statistics.
Parameters
----------
func : callable
Function to benchmark.
n_warmup : int
Number of warmup runs.
n_runs : int
Number of measurement runs.
Returns
-------
dict
Benchmark results with statistics.
"""
import gc
# Disable garbage collection during timing
gc_was_enabled = gc.isenabled()
gc.disable()
try:
# Warmup
for _ in range(n_warmup):
func(*args, **kwargs)
# Measurement
times = []
for _ in range(n_runs):
start = time.perf_counter_ns()
result = func(*args, **kwargs)
end = time.perf_counter_ns()
times.append((end - start) / 1e6) # Convert to ms
finally:
if gc_was_enabled:
gc.enable()
times = np.array(times)
return {
'mean': np.mean(times),
'std': np.std(times),
'median': np.median(times),
'min': np.min(times),
'max': np.max(times),
'p25': np.percentile(times, 25),
'p75': np.percentile(times, 75),
'n_runs': n_runs,
'times': times
}
def print_benchmark_results(name, results):
"""Print formatted benchmark results."""
print(f"{name}:")
print(f" Mean: {results['mean']:.3f} ms (±{results['std']:.3f})")
print(f" Median: {results['median']:.3f} ms")
print(f" Range: [{results['min']:.3f}, {results['max']:.3f}] ms")
print(f" IQR: [{results['p25']:.3f}, {results['p75']:.3f}] ms")
[10]:
# Run comprehensive benchmark
print("Comprehensive Benchmark Results")
print("=" * 60)
# Matrix operations
n = 500
A = np.random.randn(n, n)
B = np.random.randn(n, n)
results = reliable_benchmark(np.dot, A, B)
print_benchmark_results(f"Matrix multiply ({n}x{n})", results)
print()
# Matrix inversion
results = reliable_benchmark(np.linalg.inv, A)
print_benchmark_results(f"Matrix inversion ({n}x{n})", results)
print()
# Hungarian assignment
m = 100
cost = np.random.rand(m, m) * 100
results = reliable_benchmark(hungarian, cost)
print_benchmark_results(f"Hungarian assignment ({m}x{m})", results)
Comprehensive Benchmark Results
============================================================
Matrix multiply (500x500):
Mean: 0.378 ms (±0.051)
Median: 0.362 ms
Range: [0.345, 0.648] ms
IQR: [0.353, 0.381] ms
Matrix inversion (500x500):
Mean: 2.401 ms (±0.077)
Median: 2.389 ms
Range: [2.302, 2.764] ms
IQR: [2.352, 2.434] ms
Hungarian assignment (100x100):
Mean: 0.067 ms (±0.006)
Median: 0.064 ms
Range: [0.063, 0.085] ms
IQR: [0.063, 0.068] ms
[11]:
# Visualize benchmark distribution
# Run multiple sizes
sizes = [100, 200, 300, 400, 500]
means = []
stds = []
for n in sizes:
A = np.random.randn(n, n)
B = np.random.randn(n, n)
results = reliable_benchmark(np.dot, A, B, n_runs=50)
means.append(results['mean'])
stds.append(results['std'])
means = np.array(means)
stds = np.array(stds)
fig = go.Figure()
# Error bars with shaded region
fig.add_trace(
go.Scatter(x=sizes, y=means + stds, mode='lines',
line=dict(width=0), showlegend=False, hoverinfo='skip')
)
fig.add_trace(
go.Scatter(x=sizes, y=means - stds, mode='lines',
fill='tonexty', fillcolor='rgba(0, 212, 255, 0.3)',
line=dict(width=0), name='±1σ')
)
fig.add_trace(
go.Scatter(x=sizes, y=means, mode='lines+markers',
name='Mean time', line=dict(color='#00d4ff', width=2),
marker=dict(size=10), error_y=dict(type='data', array=stds,
visible=True, color='#00d4ff'))
)
# O(n³) reference
n3_fit = means[0] * (np.array(sizes) / sizes[0])**3
fig.add_trace(
go.Scatter(x=sizes, y=n3_fit, mode='lines',
name='O(n³) reference', line=dict(color='#ff4757', width=1.5, dash='dash'))
)
fig.update_layout(
template=dark_template,
title='Matrix Multiplication Benchmark',
xaxis_title='Matrix Size',
yaxis_title='Time (ms)',
height=450,
)
fig.show()
Data type cannot be displayed: application/vnd.plotly.v1+json
Summary & Learning Path
Key Optimization Strategies
Key optimization strategies (in priority order):
Profile first - Identify actual bottlenecks (not guesses)
Vectorize - Replace explicit loops with NumPy operations
Use Numba - JIT compile hot spots to machine code
Cache results - Memoize expensive pure functions
Pre-allocate - Use np.zeros instead of append loops
In-place operations - Use += instead of temp = x + y
Data types - Use float32 if precision allows (2x memory/speed)
Parallelization - Use Numba prange or multiprocessing
Optimization Checklist
Check |
Tool |
Speedup |
Effort |
|---|---|---|---|
Profile |
cProfile, line_profiler |
0x (info) |
Low |
Vectorize |
NumPy broadcasting |
10-100x |
Medium |
Numba |
@njit, @njit(parallel=True) |
100-1000x |
Medium |
Cache |
@lru_cache, functools |
5-1000x |
Low |
Pre-allocate |
np.zeros, np.empty |
2-10x |
Low |
In-place |
+=, @=, reductions |
1.5-3x |
Trivial |
Data types |
float32 vs float64 |
2x |
Low† |
Parallel |
prange, multiprocessing |
N·(speedup) |
High |
4-Week Learning Curriculum
Week 1: Profiling & Analysis
Day 1-2: cProfile for function-level analysis
Day 3-4: line_profiler for line-by-line bottlenecks
Day 5: memory_profiler and tracemalloc for memory leaks
Week 2: NumPy Vectorization
Day 6-7: Broadcasting rules and shapes
Day 8-9: einsum for advanced tensor operations
Day 10: Avoiding common pitfalls (memory layout, dtype conversions)
Week 3: Numba JIT Compilation
Day 11-12: @njit decorator and type inference
Day 13-14: Automatic parallelization with prange
Day 15: Debugging Numba code (limitations and workarounds)
Week 4: Integration & Advanced Patterns
Day 16-17: Hybrid CPython-Numba code (when to use each)
Day 18-19: Multi-level optimization (profile → vectorize → Numba)
Day 20: Real-world tracking pipeline optimization (target: 10-100x speedup)
Advanced Topics
1. Numba Futures & Parallel Programming
Fine-grained control over parallelization with prange
Thread pool vs process pool trade-offs
GPU acceleration with CUDA (via CuPy or Numba CUDA)
Scaling from single machine to distributed processing
2. Memory Layout & Cache Efficiency
Row-major (C) vs column-major (Fortran) memory layout
L1/L2/L3 cache line utilization (typically 64 bytes)
Memory bandwidth bottlenecks for large matrices
Profiling with perf (Linux) or Instruments (macOS)
3. Algorithmic Improvements
Reducing algorithmic complexity (O(n²) → O(n log n))
Approximate vs exact algorithms (trade accuracy for speed)
Early termination and pruning strategies
Lazy evaluation and streaming algorithms
4. Advanced Caching Strategies
@lru_cache vs @cache (Python 3.9+)
Custom cache implementations (LRU, LFU, ARC)
Cache-aware algorithm design
Memoization of stateful computations
5. Batch Processing & Vectorization
Kalman filter batch operations (vectorized over 100+ tracks)
Association algorithm batch matrices
Resampling algorithms with particle batches
Efficient tensor operations for multi-dimensional data
8 Progressive Exercises
⭐ Beginner: Profile a Tracking Pipeline
import cProfile, pstats
# 1. Create tracking loop with initialization, prediction, update
# 2. Profile with cProfile
# 3. Identify top 3 functions by cumulative time
# 4. Measure total runtime
⭐ Beginner: Vectorize Pairwise Distances
# Replace nested loop with broadcasting:
# Loop: for i: for j: distances[i,j] = ||X[i] - X[j]||
# Vectorized: diff = X[:, np.newaxis] - X[np.newaxis, :]
# Measure speedup for n=1000 points
⭐⭐ Intermediate: Numba JIT Compilation
from numba import njit
# @njit a distance computation function
# Time: Python vs Numba vs NumPy
# Expected: Numba ≈ NumPy > Python (by 10-100x)
⭐⭐ Intermediate: Batch Kalman Filter
# Replace loop-based Kalman for each track with:
# einsum('ij,bj->bi', F, x_batch) # x_pred = F @ x for all batches
# einsum('bij,jk->bik', FP, F) # P_pred = FPF.T for all batches
# Measure speedup for 1000 tracks
⭐⭐⭐ Advanced: Memory-Efficient Particle Filter
# Implement resampling with:
# - In-place operations where possible
# - Minimal temporary array allocations
# - Numba prange for parallel resampling
# Profile memory usage and execution time
⭐⭐⭐ Advanced: Cached Matrix Operations
# Implement @lru_cache for:
# - Covariance computations from fixed state
# - Gate threshold calculations
# - Measurement likelihood computations
# Measure cache hit rate and throughput improvement
⭐⭐⭐⭐ Expert: GPU Acceleration with CuPy
# Port Kalman filter prediction to GPU:
# - Use CuPy arrays (cupy.array)
# - Implement batch matrix operations on GPU
# - Measure speedup vs NumPy (typical: 10-50x for large batches)
# - Optimize GPU memory transfers
⭐⭐⭐⭐ Expert: End-to-End Pipeline Optimization
# Multi-target tracking optimization strategy:
# 1. Profile: Identify bottleneck (e.g., Hungarian assignment)
# 2. Vectorize: Batch-process distance/cost matrices
# 3. Numba: JIT compile inner loops (+100x)
# 4. Cache: Memoize repeated gating computations (+5-10x)
# 5. Goal: 10-100x total speedup on 1000+ track scenario
Complete References
Core References:
VanderPlas, J. (2016). Python Data Science Handbook: Essential Tools for Working with Data. O’Reilly Media.
NumPy vectorization patterns and best practices
Memory layout and performance considerations
Practical examples for data science workflows
Numba Official Documentation. https://numba.pydata.org/
CUDA for GPU acceleration
Performance tips and optimization guide
Limitations and workarounds reference
Gorelick, M., & Ozsvald, I. (2020). High Performance Python: Practical Performant Programming for Humans (2nd ed.). O’Reilly Media.
Comprehensive performance profiling strategies
Pure Python, NumPy, Cython, Numba comparisons
Real-world case studies and optimization tales
Advanced References:
Harris, M., et al. (2017). “High-Performance Computing Pattern Recognition in GPU Architectures”. IEEE Computing in Science & Engineering, 19(4), 34-41.
GPU memory hierarchy and optimization
Batch matrix operations on accelerators
Tracking algorithm GPU implementation patterns
Oliphant, T. (2015). “Guide to NumPy” (2nd ed.). https://numpy.org/doc/stable/
Internal NumPy architecture and broadcasting
Einsum and advanced indexing
Memory layout and performance implications
Intel MKL Documentation. https://software.intel.com/en-us/mkl-developer-reference-c
BLAS/LAPACK optimizations (used by NumPy)
Batch linear algebra operations
Performance tuning for matrix operations
PyTCL API Reference & Performance Characteristics
Key functions optimized for performance:
# Kalman filter (vectorizable)
from pytcl.dynamic_estimation.kalman import (
kf_predict, # O(n³) matrix operations
kf_update, # O(n³) matrix inversion
)
# Profile: inversion dominates; can batch with einsum
# Assignment algorithms (critical path)
from pytcl.assignment_algorithms import (
hungarian, # O(n³) optimal
auction, # O(n²·m·ε⁻¹) often faster in practice
gnn_association, # O(n·m) nearest neighbor (suboptimal)
jpda, # O(2^n) hypothesis enumeration (n=associations)
)
# Profile: Hungarian sparse for large n; use GNN for real-time
# Particle filters (highly vectorizable)
from pytcl.dynamic_estimation.particle_filters import (
resample_systematic, # O(n) but loop-intensive
effective_sample_size, # O(n) lightweight computation
)
# Numba acceleration: 100-1000x for large particle counts (N > 10k)
# Resampling with Numba prange:
# @njit(parallel=True)
# def resample_umkf(weights): ... # Parallelizable resampling
Performance Targets by Component
Component |
Operation |
Current |
Target |
Method |
|---|---|---|---|---|
Kalman |
1000 tracks × 1 step |
50 ms |
5 ms |
Batch einsum |
Hungarian |
500×500 assignment |
100 ms |
5 ms |
Numba or Auction |
Particle Filter |
Resample 100k particles |
200 ms |
10 ms |
Numba prange |
JPDA |
Association for 100 tracks |
500 ms |
50 ms |
GNN or Numba |
Complete Pipeline |
1000 tracks × 100 steps |
1000s |
100s |
All above |