Particle Filters: Sequential Monte Carlo Methods

This notebook covers particle filtering techniques for nonlinear, non-Gaussian state estimation. We explore:

  1. Bootstrap Particle Filter - The fundamental SIR algorithm

  2. Resampling Strategies - Multinomial, systematic, residual, stratified

  3. Degeneracy and Effective Sample Size - Diagnosing filter health

  4. Effect of Particle Count - Accuracy vs computation trade-offs

Prerequisites

pip install nrl-tracker plotly numpy
[2]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import time

from pytcl.dynamic_estimation.particle_filters import (
    bootstrap_pf_predict, bootstrap_pf_update, bootstrap_pf_step,
    resample_multinomial, resample_systematic, resample_residual,
    effective_sample_size, particle_mean, particle_covariance,
    initialize_particles, gaussian_likelihood,
)

print("✓ PyTCL particle filter modules and Plotly imported successfully")

np.random.seed(42)

# Plotly dark theme template
dark_template = go.layout.Template()
dark_template.layout = go.Layout(
    paper_bgcolor='#0d1117',
    plot_bgcolor='#0d1117',
    font=dict(color='#e6edf3'),
    xaxis=dict(gridcolor='#30363d', zerolinecolor='#30363d'),
    yaxis=dict(gridcolor='#30363d', zerolinecolor='#30363d'),
)
✓ PyTCL particle filter modules and Plotly imported successfully

1. Bootstrap Particle Filter

The bootstrap particle filter (Sequential Importance Resampling) represents the posterior distribution using a set of weighted samples:

\[p(x_k | z_{1:k}) \approx \sum_{i=1}^{N} w_k^{(i)} \delta(x - x_k^{(i)})\]

Algorithm:

  1. Prediction: Propagate particles through dynamics

  2. Update: Compute likelihood weights

  3. Resample: Eliminate low-weight particles

Example: Highly Nonlinear System

[3]:
# Nonlinear state-space model
def f_nonlinear(x, k):
    """Highly nonlinear dynamics."""
    return x / 2 + 25 * x / (1 + x**2) + 8 * np.cos(1.2 * k)

def h_nonlinear(x):
    """Nonlinear measurement."""
    return x**2 / 20

# Noise parameters
Q = 10.0  # Process noise variance
R = 1.0   # Measurement noise variance

# Generate true trajectory and measurements
n_steps = 100
true_states = [0.0]
measurements = []

for k in range(n_steps):
    # Propagate state
    x_new = f_nonlinear(true_states[-1], k) + np.random.normal(0, np.sqrt(Q))
    true_states.append(x_new)

    # Generate measurement
    z = h_nonlinear(x_new) + np.random.normal(0, np.sqrt(R))
    measurements.append(z)

true_states = np.array(true_states)
measurements = np.array(measurements)

print(f"State range: [{true_states.min():.1f}, {true_states.max():.1f}]")
State range: [-24.8, 19.4]
[4]:
# Particle filter parameters
N_particles = 500

# Initialize particles from prior
particles = np.random.normal(0, np.sqrt(10), N_particles)  # Initial uncertainty
weights = np.ones(N_particles) / N_particles

# Storage for results
pf_estimates = [np.average(particles, weights=weights)]
pf_variances = [np.average((particles - pf_estimates[0])**2, weights=weights)]
ess_history = [effective_sample_size(weights)]

print(f"Initialized {N_particles} particles")
print(f"Initial ESS: {ess_history[0]:.1f}")
Initialized 500 particles
Initial ESS: 500.0
[6]:
# Run particle filter
ESS_threshold = N_particles / 2  # Resample when ESS drops below this
rng = np.random.default_rng(42)

for k, z in enumerate(measurements):
    # Prediction: propagate particles
    particles_pred = np.array([f_nonlinear(p, k) for p in particles])
    particles_pred += np.random.normal(0, np.sqrt(Q), N_particles)

    # Update: compute likelihood weights
    z_pred = np.array([h_nonlinear(p) for p in particles_pred])
    likelihoods = np.exp(-0.5 * (z - z_pred)**2 / R)
    weights = weights * likelihoods
    weights = weights / np.sum(weights)  # Normalize

    # Compute ESS
    ess = effective_sample_size(weights)
    ess_history.append(ess)

    # Resample if needed
    if ess < ESS_threshold:
        particles = resample_systematic(particles_pred, weights, rng)
        weights = np.ones(N_particles) / N_particles
    else:
        particles = particles_pred

    # Store estimates
    mean = np.average(particles, weights=weights)
    var = np.average((particles - mean)**2, weights=weights)
    pf_estimates.append(mean)
    pf_variances.append(var)

pf_estimates = np.array(pf_estimates)
pf_variances = np.array(pf_variances)

print(f"Minimum ESS: {min(ess_history):.1f}")
print(f"Resampling events: {sum(1 for e in ess_history if e < ESS_threshold)}")
Minimum ESS: 2.0
Resampling events: 78
[7]:
# Visualization
fig = make_subplots(
    rows=3, cols=1,
    subplot_titles=('Particle Filter State Estimation',
                    f'Absolute Error (RMSE: {np.sqrt(np.mean((pf_estimates - true_states)**2)):.3f})',
                    'Effective Sample Size'),
    vertical_spacing=0.1
)

time = np.arange(len(true_states))

# State estimation
fig.add_trace(
    go.Scatter(x=time, y=pf_estimates + 2*np.sqrt(pf_variances), mode='lines',
               line=dict(width=0), showlegend=False, hoverinfo='skip'),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time, y=pf_estimates - 2*np.sqrt(pf_variances), mode='lines',
               fill='tonexty', fillcolor='rgba(0, 212, 255, 0.3)',
               line=dict(width=0), name='±2σ'),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time, y=true_states, mode='lines',
               name='True state', line=dict(color='#00ff88', width=2)),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time, y=pf_estimates, mode='lines',
               name='PF estimate', line=dict(color='#00d4ff', width=1.5, dash='dash')),
    row=1, col=1
)

# Estimation error
error = np.abs(pf_estimates - true_states)
fig.add_trace(
    go.Scatter(x=time, y=error, mode='lines',
               name='|Error|', line=dict(color='#ff4757', width=1.5)),
    row=2, col=1
)

# Effective sample size
fig.add_trace(
    go.Scatter(x=time, y=ess_history, mode='lines',
               name='ESS', line=dict(color='#a855f7', width=1.5)),
    row=3, col=1
)
fig.add_trace(
    go.Scatter(x=[time[0], time[-1]], y=[ESS_threshold, ESS_threshold], mode='lines',
               name=f'Threshold ({ESS_threshold:.0f})', line=dict(color='#ff4757', dash='dash')),
    row=3, col=1
)

fig.update_layout(
    template=dark_template,
    height=700,
    showlegend=True,
    legend=dict(x=1.02, y=0.5)
)
fig.update_yaxes(title_text='State', row=1, col=1)
fig.update_yaxes(title_text='|Error|', row=2, col=1)
fig.update_yaxes(title_text='ESS', row=3, col=1)
fig.update_xaxes(title_text='Time step', row=3, col=1)

fig.show()

Data type cannot be displayed: application/vnd.plotly.v1+json

2. Resampling Strategies Comparison

Different resampling methods have different variance and computation properties:

\[\begin{split}\begin{array}{l|l|l|l} \text{Method} & \text{Variance} & \text{Computation} & \text{Notes} \\ \hline \text{Multinomial} & \text{High} & O(N \log N) & \text{Simple, but high variance} \\ \text{Systematic} & \text{Low} & O(N) & \text{Single random number, best balance} \\ \text{Residual} & \text{Very Low} & O(N) & \text{Deterministic + random hybrid} \\ \end{array}\end{split}\]

Key Properties

  • Multinomial: Most straightforward approach but suffers from high variance

  • Systematic: Uses a single random number for all particles; preserves good diversity with low variance

  • Residual: Combines deterministic copying of high-weight particles with random resampling of the remainder

[10]:
# Compare resampling methods
def run_pf_with_resampler(resampler_func, name):
    """Run particle filter with specified resampling method."""
    np.random.seed(42)
    n_pf = 500
    particles = np.random.normal(0, np.sqrt(10), (n_pf, 1))  # Shape (N, 1) for resampler
    weights = np.ones(n_pf) / n_pf
    estimates = [np.average(particles, axis=0, weights=weights)[0]]

    rng = np.random.default_rng(42)
    for k, z in enumerate(measurements):
        # Predict
        particles_pred = np.array([[f_nonlinear(p[0], k)] for p in particles])
        particles_pred += np.random.normal(0, np.sqrt(Q), (n_pf, 1))

        # Update weights
        z_pred = np.array([h_nonlinear(p[0]) for p in particles_pred])
        likelihoods = np.exp(-0.5 * (z - z_pred)**2 / R)
        weights = weights * likelihoods
        weights = weights / np.sum(weights)

        # Resample
        ess = effective_sample_size(weights)
        if ess < n_pf / 2:
            particles = resampler_func(particles_pred, weights, rng)
            weights = np.ones(n_pf) / n_pf
        else:
            particles = particles_pred

        estimates.append(np.average(particles, axis=0, weights=weights)[0])

    rmse = np.sqrt(np.mean((np.array(estimates) - true_states)**2))
    return np.array(estimates), rmse

# Run with each resampler
resamplers = [
    (resample_multinomial, 'Multinomial'),
    (resample_systematic, 'Systematic'),
    (resample_residual, 'Residual'),
]

results = {}
for resampler_func, name in resamplers:
    est, rmse = run_pf_with_resampler(resampler_func, name)
    results[name] = {'estimates': est, 'rmse': rmse}
    print(f"{name:12s}: RMSE = {rmse:.4f}")
Multinomial : RMSE = 4.7336
Systematic  : RMSE = 4.9128
Residual    : RMSE = 4.7257
[11]:
# Visualize differences (use subset for clarity)
fig = go.Figure()

fig.add_trace(
    go.Scatter(x=time, y=true_states, mode='lines',
               name='True', line=dict(color='white', width=2))
)

colors = ['#00d4ff', '#ff4757', '#00ff88', '#ffb800']
for (name, data), color in zip(results.items(), colors):
    fig.add_trace(
        go.Scatter(x=time, y=data['estimates'], mode='lines',
                   name=f"{name} (RMSE={data['rmse']:.3f})",
                   line=dict(color=color, width=1.5, dash='dash'), opacity=0.8)
    )

fig.update_layout(
    template=dark_template,
    title='Resampling Method Comparison',
    xaxis_title='Time step',
    yaxis_title='State',
    xaxis=dict(range=[0, 30]),  # Zoom in for detail
    height=450
)
fig.show()

Data type cannot be displayed: application/vnd.plotly.v1+json

3. Particle Degeneracy and Weight Distribution

A key challenge in particle filtering is degeneracy: after a few iterations, most particles have negligible weight. The Effective Sample Size (ESS) measures this:

\[\text{ESS} = \frac{1}{\sum_{i=1}^{N} (w^{(i)})^2}\]

When ESS drops significantly below N, resampling is needed.

[12]:
# Demonstrate weight degeneracy without resampling
np.random.seed(123)
particles = np.random.normal(0, np.sqrt(10), N_particles)
weights = np.ones(N_particles) / N_particles

ess_no_resample = [effective_sample_size(weights)]
weight_entropy = []

for k, z in enumerate(measurements[:50]):  # First 50 steps
    # Predict
    particles = np.array([f_nonlinear(p, k) for p in particles])
    particles += np.random.normal(0, np.sqrt(Q), N_particles)

    # Update WITHOUT resampling
    z_pred = np.array([h_nonlinear(p) for p in particles])
    likelihoods = np.exp(-0.5 * (z - z_pred)**2 / R)
    weights = weights * likelihoods
    weights = weights / np.sum(weights)

    ess_no_resample.append(effective_sample_size(weights))

    # Entropy of weights (measure of spread)
    w_nonzero = weights[weights > 1e-300]
    entropy = -np.sum(w_nonzero * np.log(w_nonzero))
    weight_entropy.append(entropy)

fig = make_subplots(rows=1, cols=2,
                    subplot_titles=('ESS Collapse Without Resampling', 'Weight Distribution Entropy'))

fig.add_trace(
    go.Scatter(x=list(range(len(ess_no_resample))), y=ess_no_resample, mode='lines',
               name='ESS', line=dict(color='#00d4ff', width=2)),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=[0, len(ess_no_resample)], y=[1, 1], mode='lines',
               name='Complete degeneracy', line=dict(color='#ff4757', dash='dash')),
    row=1, col=1
)

fig.add_trace(
    go.Scatter(x=list(range(len(weight_entropy))), y=weight_entropy, mode='lines',
               name='Entropy', line=dict(color='#00ff88', width=2)),
    row=1, col=2
)

fig.update_layout(
    template=dark_template,
    height=350,
    showlegend=True
)
fig.update_yaxes(type='log', title_text='ESS (log scale)', row=1, col=1)
fig.update_yaxes(title_text='Weight Entropy', row=1, col=2)
fig.update_xaxes(title_text='Time step', row=1, col=1)
fig.update_xaxes(title_text='Time step', row=1, col=2)

fig.show()

print(f"ESS at step 0: {ess_no_resample[0]:.1f}")
print(f"ESS at step 10: {ess_no_resample[10]:.2f}")
print(f"ESS at step 50: {ess_no_resample[50]:.2f}")

Data type cannot be displayed: application/vnd.plotly.v1+json

ESS at step 0: 500.0
ESS at step 10: 1.00
ESS at step 50: 1.00

4. Effect of Number of Particles

More particles generally lead to better estimation, but with diminishing returns. The computational cost scales linearly with particle count.

[14]:
import time

particle_counts = [50, 100, 200, 500, 1000, 2000]
rmse_vs_particles = []
time_vs_particles = []

for N in particle_counts:
    np.random.seed(42)
    particles = np.random.normal(0, np.sqrt(10), (N, 1))
    weights = np.ones(N) / N
    estimates = []

    start = time.time()
    rng = np.random.default_rng(42)
    for k, z in enumerate(measurements):
        particles_pred = np.array([[f_nonlinear(p[0], k)] for p in particles])
        particles_pred += np.random.normal(0, np.sqrt(Q), (N, 1))

        z_pred = np.array([h_nonlinear(p[0]) for p in particles_pred])
        likelihoods = np.exp(-0.5 * (z - z_pred)**2 / R)
        weights = weights * likelihoods
        weights = weights / np.sum(weights)

        if effective_sample_size(weights) < N/2:
            particles = resample_systematic(particles_pred, weights, rng)
            weights = np.ones(N) / N
        else:
            particles = particles_pred

        estimates.append(np.average(particles, axis=0, weights=weights)[0])

    elapsed = time.time() - start
    rmse = np.sqrt(np.mean((np.array(estimates) - true_states[1:])**2))
    rmse_vs_particles.append(rmse)
    time_vs_particles.append(elapsed)
    print(f"N={N:5d}: RMSE={rmse:.4f}, Time={elapsed:.3f}s")
N=   50: RMSE=8.2356, Time=0.012s
N=  100: RMSE=5.2342, Time=0.015s
N=  200: RMSE=5.0700, Time=0.023s
N=  500: RMSE=4.9373, Time=0.048s
N= 1000: RMSE=4.8166, Time=0.094s
N= 2000: RMSE=4.7384, Time=0.187s
[15]:
fig = make_subplots(rows=1, cols=2,
                    subplot_titles=('Accuracy vs Particle Count', 'Computation Time vs Particle Count'))

fig.add_trace(
    go.Scatter(x=particle_counts, y=rmse_vs_particles, mode='lines+markers',
               name='RMSE', line=dict(color='#00d4ff', width=2),
               marker=dict(size=8)),
    row=1, col=1
)

fig.add_trace(
    go.Scatter(x=particle_counts, y=time_vs_particles, mode='lines+markers',
               name='Time', line=dict(color='#ff4757', width=2),
               marker=dict(size=8)),
    row=1, col=2
)

fig.update_layout(
    template=dark_template,
    height=350,
    showlegend=False
)
fig.update_xaxes(type='log', title_text='Number of Particles', row=1, col=1)
fig.update_xaxes(type='log', title_text='Number of Particles', row=1, col=2)
fig.update_yaxes(title_text='RMSE', row=1, col=1)
fig.update_yaxes(type='log', title_text='Computation Time (s)', row=1, col=2)

fig.show()

Data type cannot be displayed: application/vnd.plotly.v1+json

Summary

Key takeaways:

  1. Particle filters handle arbitrary nonlinear and non-Gaussian systems

  2. Resampling is critical to prevent weight degeneracy

  3. Systematic resampling offers the best variance-computation trade-off

  4. ESS monitoring helps detect filter health issues

  5. More particles improve accuracy but with diminishing returns

Exercises

Exercise 1: Adaptive Resampling Threshold

Modify the filter to adjust the ESS threshold dynamically based on recent history. Track how it affects total resampling events.

Exercise 2: Outlier Robustness

Add outlier measurements to the stream and observe:

  • How filter estimates diverge when outliers appear

  • Recovery time after outlier disappears

  • Compare with Kalman filters on same data

Exercise 3: Multi-Modal Distributions

Implement a mixture model as the proposal distribution to better capture multimodal posterior distributions. Measure improvements in likelihood.

Exercise 4: GPU Acceleration

Modify the particle propagation loop to use CuPy (if GPU available) for batch operations on large particle sets (10,000+).

Exercise 5: Gaussian Process Proposal

Replace the simple dynamics propagation with a learned Gaussian process model for the proposal distribution.

Advanced Topics

Approximate Bayesian Computation (ABC)

When the likelihood is intractable, ABC methods use summary statistics:

  • Define acceptable tolerance distance

  • Run forward simulations

  • Keep particles where simulated data ≈ observed data

Sequential Monte Carlo Samplers (SMCS)

Alternative to time-series filtering; sample from evolving posterior distributions:

  • Temperature annealing: \(p_t(x) \propto p(x)^{t/T}\)

  • Tempering resample particles according to target likelihood changes

Rao-Blackwellized Particle Filter (RBPF)

Marginalize linear subspace to reduce particle requirement:

  • Split state: \(\mathbf{x} = [\mathbf{x}_{nl}, \mathbf{x}_l]\)

  • Use Kalman filter for \(\mathbf{x}_{l} | \mathbf{x}_{nl}, \mathbf{z}_{1:t}\)

  • Only resample \(\mathbf{x}_{nl}\)

References

Core References

  1. Arulampalam, M. S., et al. (2002). “A Tutorial on Particle Filters for Online Nonlinear/Non-Gaussian Bayesian Tracking.” IEEE Transactions on Signal Processing, 50(2), 174-188.

    • Best for: Foundational tutorial with excellent derivations

  2. Doucet, A., & Johansen, A. M. (2009). “A Tutorial on Particle Filtering and Smoothing: Fifteen Years Later.” Handbook of Nonlinear Filtering, 12(656-704).

    • Best for: Comprehensive modern reference with applications

  3. Ristic, B., Arulampalam, S., & Gordon, N. (2004). Beyond the Kalman Filter: Particle Filters for Tracking Applications. Artech House.

    • Best for: Practical implementation guidance

Advanced Methods

  1. Persson, A., Mehta, N., & Hendeby, G. (2020). “Detecting Particle Filter Divergence.” IEEE Conference on Decision and Control.

    • Coverage: Divergence detection, N_eff monitoring

  2. Andrieu, C., Doucet, A., & Holenstein, R. (2010). “Particle Markov chain Monte Carlo methods.” Journal of the Royal Statistical Society, 72(3), 269-342.

    • Best for: PMCMC methods extending particle filters

PyTCL Documentation

  • Particle Filter Module: pytcl.dynamic_estimation.particle_filters

  • GPU Acceleration: pytcl.gpu.particle_filter (CuPy backend)

  • Example Gallery: See examples/particle_filters.py for applications