"""
Gaussian Mixture operations for tracking applications.
This module provides Gaussian mixture representation, moment matching,
and mixture reduction algorithms (Runnalls', West's) commonly used in
multi-target tracking for hypothesis pruning.
References
----------
.. [1] A. R. Runnalls, "Kullback-Leibler Approach to Gaussian Mixture Reduction,"
IEEE Trans. Aerospace and Electronic Systems, vol. 43, no. 3, 2007.
.. [2] M. West, "Approximating posterior distributions by mixture,"
Journal of the Royal Statistical Society, Series B, vol. 55, no. 2, 1993.
"""
from typing import List, NamedTuple, Optional, Tuple
import numpy as np
from numpy.typing import ArrayLike, NDArray
[docs]
class GaussianComponent(NamedTuple):
"""A single Gaussian component in a mixture.
Attributes
----------
weight : float
Component weight (0, 1], must sum to 1 across mixture.
mean : ndarray
Mean vector, shape (n,).
covariance : ndarray
Covariance matrix, shape (n, n).
"""
weight: float
mean: NDArray[np.floating]
covariance: NDArray[np.floating]
[docs]
class MergeResult(NamedTuple):
"""Result of merging two Gaussian components.
Attributes
----------
component : GaussianComponent
The merged Gaussian component.
cost : float
KL-divergence-based merge cost.
"""
component: GaussianComponent
cost: float
[docs]
class ReductionResult(NamedTuple):
"""Result of mixture reduction.
Attributes
----------
components : list of GaussianComponent
Reduced set of components.
n_original : int
Original number of components.
n_reduced : int
Reduced number of components.
total_cost : float
Total cost incurred by merging.
"""
components: List[GaussianComponent]
n_original: int
n_reduced: int
total_cost: float
[docs]
def moment_match(
weights: ArrayLike,
means: List[ArrayLike],
covariances: List[ArrayLike],
) -> Tuple[NDArray[np.floating], NDArray[np.floating]]:
"""
Compute moment-matched mean and covariance from mixture components.
Given a Gaussian mixture, computes a single Gaussian that matches
the first two moments (mean and covariance) of the mixture.
Parameters
----------
weights : array_like
Component weights, shape (k,). Must sum to 1.
means : list of array_like
Component means, each shape (n,).
covariances : list of array_like
Component covariances, each shape (n, n).
Returns
-------
mean : ndarray
Moment-matched mean, shape (n,).
covariance : ndarray
Moment-matched covariance, shape (n, n).
Examples
--------
>>> import numpy as np
>>> weights = [0.5, 0.5]
>>> means = [np.array([0., 0.]), np.array([2., 0.])]
>>> covs = [np.eye(2) * 0.1, np.eye(2) * 0.1]
>>> m, P = moment_match(weights, means, covs)
>>> m # Should be [1., 0.]
array([1., 0.])
"""
weights = np.asarray(weights, dtype=np.float64)
means = [np.asarray(m, dtype=np.float64).flatten() for m in means]
covariances = [np.asarray(P, dtype=np.float64) for P in covariances]
k = len(weights)
n = len(means[0])
# Normalize weights
w_sum = np.sum(weights)
if w_sum > 0:
weights = weights / w_sum
# Combined mean: x = sum_i w_i * x_i
mean = np.zeros(n)
for i in range(k):
mean += weights[i] * means[i]
# Combined covariance: P = sum_i w_i * (P_i + (x_i - x)(x_i - x)^T)
covariance = np.zeros((n, n))
for i in range(k):
diff = means[i] - mean
covariance += weights[i] * (covariances[i] + np.outer(diff, diff))
# Ensure symmetry
covariance = (covariance + covariance.T) / 2
return mean, covariance
[docs]
def runnalls_merge_cost(
c1: GaussianComponent,
c2: GaussianComponent,
) -> float:
"""
Compute Runnalls' KL-divergence-based merge cost for two components.
The cost approximates the increase in KL-divergence when merging
components c1 and c2 into a single moment-matched Gaussian.
Parameters
----------
c1 : GaussianComponent
First component.
c2 : GaussianComponent
Second component.
Returns
-------
cost : float
Merge cost (non-negative). Lower cost means components are
more similar and merging causes less information loss.
Notes
-----
The cost function is:
cost = 0.5 * [w_m * log|P_m| - w_1 * log|P_1| - w_2 * log|P_2|]
where w_m = w_1 + w_2, P_m is the moment-matched covariance.
Examples
--------
>>> c1 = GaussianComponent(0.3, np.array([0., 0.]), np.eye(2) * 0.1)
>>> c2 = GaussianComponent(0.2, np.array([0.1, 0.]), np.eye(2) * 0.1) # Close
>>> c3 = GaussianComponent(0.2, np.array([5., 5.]), np.eye(2) * 0.1) # Far
>>> cost_close = runnalls_merge_cost(c1, c2)
>>> cost_far = runnalls_merge_cost(c1, c3)
>>> cost_close < cost_far # Closer components have lower merge cost
True
"""
w1, w2 = c1.weight, c2.weight
w_merged = w1 + w2
if w_merged < 1e-15:
return 0.0
# Merged mean
mean_merged = (w1 * c1.mean + w2 * c2.mean) / w_merged
# Merged covariance
diff1 = c1.mean - mean_merged
diff2 = c2.mean - mean_merged
P_merged = (
w1 * (c1.covariance + np.outer(diff1, diff1))
+ w2 * (c2.covariance + np.outer(diff2, diff2))
) / w_merged
# Ensure numerical stability
P_merged = (P_merged + P_merged.T) / 2
# Compute log determinants with numerical safeguards
try:
log_det_merged = np.linalg.slogdet(P_merged)[1]
log_det_1 = np.linalg.slogdet(c1.covariance)[1]
log_det_2 = np.linalg.slogdet(c2.covariance)[1]
except np.linalg.LinAlgError:
return np.inf
# Cost: 0.5 * (w_m * log|P_m| - w_1 * log|P_1| - w_2 * log|P_2|)
cost = 0.5 * (w_merged * log_det_merged - w1 * log_det_1 - w2 * log_det_2)
return max(0.0, cost) # Ensure non-negative
[docs]
def merge_gaussians(
c1: GaussianComponent,
c2: GaussianComponent,
) -> MergeResult:
"""
Merge two Gaussian components via moment matching.
Parameters
----------
c1 : GaussianComponent
First component.
c2 : GaussianComponent
Second component.
Returns
-------
result : MergeResult
Merged component and merge cost.
Examples
--------
>>> c1 = GaussianComponent(0.3, np.array([0., 0.]), np.eye(2) * 0.1)
>>> c2 = GaussianComponent(0.2, np.array([1., 0.]), np.eye(2) * 0.1)
>>> result = merge_gaussians(c1, c2)
>>> result.component.weight
0.5
"""
w_merged = c1.weight + c2.weight
cost = runnalls_merge_cost(c1, c2)
if w_merged < 1e-15:
# Return first component if both have zero weight
return MergeResult(c1, 0.0)
# Merged mean
mean_merged = (c1.weight * c1.mean + c2.weight * c2.mean) / w_merged
# Merged covariance
diff1 = c1.mean - mean_merged
diff2 = c2.mean - mean_merged
P_merged = (
c1.weight * (c1.covariance + np.outer(diff1, diff1))
+ c2.weight * (c2.covariance + np.outer(diff2, diff2))
) / w_merged
# Ensure symmetry
P_merged = (P_merged + P_merged.T) / 2
merged = GaussianComponent(w_merged, mean_merged, P_merged)
return MergeResult(merged, cost)
[docs]
def prune_mixture(
components: List[GaussianComponent],
weight_threshold: float = 1e-5,
) -> List[GaussianComponent]:
"""
Remove components with weights below threshold and renormalize.
Parameters
----------
components : list of GaussianComponent
Input components.
weight_threshold : float
Components with weight below this are removed.
Returns
-------
pruned : list of GaussianComponent
Pruned and renormalized components.
Examples
--------
>>> comps = [
... GaussianComponent(0.9, np.array([0.]), np.array([[1.]])),
... GaussianComponent(1e-6, np.array([10.]), np.array([[1.]])),
... ]
>>> pruned = prune_mixture(comps, weight_threshold=1e-5)
>>> len(pruned)
1
>>> pruned[0].weight # Renormalized
1.0
"""
# Filter by threshold
surviving = [c for c in components if c.weight >= weight_threshold]
if not surviving:
# Keep highest weight component if all would be pruned
surviving = [max(components, key=lambda c: c.weight)]
# Renormalize weights
total_weight = sum(c.weight for c in surviving)
if total_weight > 0:
surviving = [
GaussianComponent(c.weight / total_weight, c.mean, c.covariance)
for c in surviving
]
return surviving
[docs]
def reduce_mixture_runnalls(
components: List[GaussianComponent],
max_components: int,
weight_threshold: float = 1e-5,
) -> ReductionResult:
"""
Reduce mixture using Runnalls' greedy algorithm.
Iteratively merges the pair of components with the smallest
KL-divergence merge cost until the target number is reached.
Parameters
----------
components : list of GaussianComponent
Input mixture components.
max_components : int
Maximum number of components in output.
weight_threshold : float
Components below this weight are pruned first.
Returns
-------
result : ReductionResult
Reduced mixture with cost information.
References
----------
.. [1] A. R. Runnalls, "Kullback-Leibler Approach to Gaussian Mixture
Reduction," IEEE Trans. Aerospace and Electronic Systems, 2007.
Examples
--------
>>> import numpy as np
>>> # 4 components, reduce to 2
>>> comps = [
... GaussianComponent(0.25, np.array([0., 0.]), np.eye(2) * 0.1),
... GaussianComponent(0.25, np.array([0.1, 0.]), np.eye(2) * 0.1),
... GaussianComponent(0.25, np.array([5., 5.]), np.eye(2) * 0.1),
... GaussianComponent(0.25, np.array([5.1, 5.]), np.eye(2) * 0.1),
... ]
>>> result = reduce_mixture_runnalls(comps, max_components=2)
>>> len(result.components)
2
"""
n_original = len(components)
if n_original == 0:
return ReductionResult([], 0, 0, 0.0)
# First, prune low-weight components
working = prune_mixture(components, weight_threshold)
# If already at or below target, return
if len(working) <= max_components:
return ReductionResult(working, n_original, len(working), 0.0)
total_cost = 0.0
# Greedy merging
while len(working) > max_components:
n = len(working)
min_cost = np.inf
best_i, best_j = 0, 1
# Find pair with minimum merge cost
for i in range(n):
for j in range(i + 1, n):
cost = runnalls_merge_cost(working[i], working[j])
if cost < min_cost:
min_cost = cost
best_i, best_j = i, j
# Merge the best pair
merged = merge_gaussians(working[best_i], working[best_j])
total_cost += merged.cost
# Build new list: remove i and j, add merged
new_working = []
for k in range(n):
if k != best_i and k != best_j:
new_working.append(working[k])
new_working.append(merged.component)
working = new_working
# Renormalize final weights
total_weight = sum(c.weight for c in working)
if total_weight > 0:
working = [
GaussianComponent(c.weight / total_weight, c.mean, c.covariance)
for c in working
]
return ReductionResult(working, n_original, len(working), total_cost)
[docs]
def west_merge_cost(
c1: GaussianComponent,
c2: GaussianComponent,
) -> float:
"""
Compute West's merge cost for two components.
West's algorithm uses a simpler cost based on weighted Mahalanobis
distance between component means.
Parameters
----------
c1 : GaussianComponent
First component.
c2 : GaussianComponent
Second component.
Returns
-------
cost : float
Merge cost based on weighted mean separation.
Examples
--------
>>> c1 = GaussianComponent(0.3, np.array([0., 0.]), np.eye(2) * 0.1)
>>> c2 = GaussianComponent(0.2, np.array([0.1, 0.]), np.eye(2) * 0.1) # Close
>>> c3 = GaussianComponent(0.2, np.array([5., 5.]), np.eye(2) * 0.1) # Far
>>> cost_close = west_merge_cost(c1, c2)
>>> cost_far = west_merge_cost(c1, c3)
>>> cost_close < cost_far # Closer components have lower merge cost
True
"""
w1, w2 = c1.weight, c2.weight
w_merged = w1 + w2
if w_merged < 1e-15:
return 0.0
# Mean difference
diff = c1.mean - c2.mean
# Use average covariance for distance computation
P_avg = (w1 * c1.covariance + w2 * c2.covariance) / w_merged
try:
# Mahalanobis distance squared
P_inv = np.linalg.inv(P_avg)
mahal_sq = diff @ P_inv @ diff
except np.linalg.LinAlgError:
return np.inf
# West's cost: (w1 * w2 / w_merged) * d^2
cost = (w1 * w2 / w_merged) * mahal_sq
return max(0.0, cost)
[docs]
def reduce_mixture_west(
components: List[GaussianComponent],
max_components: int,
weight_threshold: float = 1e-5,
) -> ReductionResult:
"""
Reduce mixture using West's algorithm.
Similar to Runnalls' but uses a simpler cost function based on
weighted Mahalanobis distance between means.
Parameters
----------
components : list of GaussianComponent
Input mixture components.
max_components : int
Maximum number of components in output.
weight_threshold : float
Components below this weight are pruned first.
Returns
-------
result : ReductionResult
Reduced mixture with cost information.
References
----------
.. [1] M. West, "Approximating posterior distributions by mixture,"
Journal of the Royal Statistical Society, Series B, 1993.
Examples
--------
>>> import numpy as np
>>> comps = [
... GaussianComponent(0.25, np.array([0., 0.]), np.eye(2) * 0.1),
... GaussianComponent(0.25, np.array([0.1, 0.]), np.eye(2) * 0.1),
... GaussianComponent(0.25, np.array([5., 5.]), np.eye(2) * 0.1),
... GaussianComponent(0.25, np.array([5.1, 5.]), np.eye(2) * 0.1),
... ]
>>> result = reduce_mixture_west(comps, max_components=2)
>>> len(result.components)
2
"""
n_original = len(components)
if n_original == 0:
return ReductionResult([], 0, 0, 0.0)
# First, prune low-weight components
working = prune_mixture(components, weight_threshold)
# If already at or below target, return
if len(working) <= max_components:
return ReductionResult(working, n_original, len(working), 0.0)
total_cost = 0.0
# Greedy merging using West's cost
while len(working) > max_components:
n = len(working)
min_cost = np.inf
best_i, best_j = 0, 1
# Find pair with minimum merge cost
for i in range(n):
for j in range(i + 1, n):
cost = west_merge_cost(working[i], working[j])
if cost < min_cost:
min_cost = cost
best_i, best_j = i, j
# Merge the best pair
merged = merge_gaussians(working[best_i], working[best_j])
total_cost += min_cost
# Build new list: remove i and j, add merged
new_working = []
for k in range(n):
if k != best_i and k != best_j:
new_working.append(working[k])
new_working.append(merged.component)
working = new_working
# Renormalize final weights
total_weight = sum(c.weight for c in working)
if total_weight > 0:
working = [
GaussianComponent(c.weight / total_weight, c.mean, c.covariance)
for c in working
]
return ReductionResult(working, n_original, len(working), total_cost)
[docs]
class GaussianMixture:
"""
Gaussian Mixture representation with reduction operations.
Provides an object-oriented interface for Gaussian mixture operations
including density evaluation, sampling, and reduction.
Parameters
----------
components : list of GaussianComponent, optional
Initial components. If None, creates empty mixture.
Attributes
----------
components : list of GaussianComponent
Current mixture components.
Examples
--------
>>> import numpy as np
>>> gm = GaussianMixture()
>>> gm.add_component(0.5, np.array([0., 0.]), np.eye(2) * 0.1)
>>> gm.add_component(0.5, np.array([2., 2.]), np.eye(2) * 0.1)
>>> len(gm)
2
>>> gm.mean # Mixture mean
array([1., 1.])
"""
[docs]
def __init__(self, components: Optional[List[GaussianComponent]] = None):
if components is None:
self.components: List[GaussianComponent] = []
else:
self.components = list(components)
def __len__(self) -> int:
return len(self.components)
[docs]
def add_component(
self,
weight: float,
mean: ArrayLike,
covariance: ArrayLike,
) -> None:
"""
Add a component to the mixture.
Parameters
----------
weight : float
Component weight.
mean : array_like
Mean vector.
covariance : array_like
Covariance matrix.
"""
mean = np.asarray(mean, dtype=np.float64).flatten()
covariance = np.asarray(covariance, dtype=np.float64)
self.components.append(GaussianComponent(weight, mean, covariance))
[docs]
def normalize_weights(self) -> None:
"""Normalize component weights to sum to 1."""
total = sum(c.weight for c in self.components)
if total > 0:
self.components = [
GaussianComponent(c.weight / total, c.mean, c.covariance)
for c in self.components
]
@property
def weights(self) -> NDArray[np.floating]:
"""Component weights as array."""
return np.array([c.weight for c in self.components])
@property
def means(self) -> List[NDArray[np.floating]]:
"""List of component means."""
return [c.mean for c in self.components]
@property
def covariances(self) -> List[NDArray[np.floating]]:
"""List of component covariances."""
return [c.covariance for c in self.components]
@property
def mean(self) -> NDArray[np.floating]:
"""Mixture mean (moment-matched)."""
if not self.components:
raise ValueError("Mixture is empty")
m, _ = moment_match(self.weights, self.means, self.covariances)
return m
@property
def covariance(self) -> NDArray[np.floating]:
"""Mixture covariance (moment-matched)."""
if not self.components:
raise ValueError("Mixture is empty")
_, P = moment_match(self.weights, self.means, self.covariances)
return P
@property
def dim(self) -> int:
"""Dimension of the state space."""
if not self.components:
return 0
return len(self.components[0].mean)
[docs]
def pdf(self, x: ArrayLike) -> float:
"""
Evaluate mixture probability density at x.
Parameters
----------
x : array_like
Point at which to evaluate density.
Returns
-------
density : float
Mixture probability density.
"""
x = np.asarray(x, dtype=np.float64).flatten()
if not self.components:
return 0.0
density = 0.0
for c in self.components:
density += c.weight * self._gaussian_pdf(x, c.mean, c.covariance)
return density
def _gaussian_pdf(
self,
x: NDArray[np.floating],
mean: NDArray[np.floating],
cov: NDArray[np.floating],
) -> float:
"""Evaluate single Gaussian PDF."""
n = len(x)
diff = x - mean
try:
cov_inv = np.linalg.inv(cov)
sign, log_det = np.linalg.slogdet(cov)
if sign <= 0:
return 0.0
mahal_sq = diff @ cov_inv @ diff
log_pdf = -0.5 * (n * np.log(2 * np.pi) + log_det + mahal_sq)
return np.exp(log_pdf)
except np.linalg.LinAlgError:
return 0.0
[docs]
def sample(
self,
n_samples: int = 1,
rng: Optional[np.random.Generator] = None,
) -> NDArray[np.floating]:
"""
Draw samples from the mixture.
Parameters
----------
n_samples : int
Number of samples to draw.
rng : numpy.random.Generator, optional
Random number generator.
Returns
-------
samples : ndarray
Samples, shape (n_samples, n) or (n,) if n_samples=1.
"""
if not self.components:
raise ValueError("Mixture is empty")
if rng is None:
rng = np.random.default_rng()
# Choose components based on weights
weights = self.weights
weights = weights / weights.sum() # Normalize
component_indices = rng.choice(len(self.components), size=n_samples, p=weights)
# Sample from chosen components
samples = []
for idx in component_indices:
c = self.components[idx]
sample = rng.multivariate_normal(c.mean, c.covariance)
samples.append(sample)
samples = np.array(samples)
if n_samples == 1:
return samples[0]
return samples
[docs]
def prune(self, weight_threshold: float = 1e-5) -> "GaussianMixture":
"""
Remove low-weight components.
Parameters
----------
weight_threshold : float
Minimum weight to retain.
Returns
-------
pruned : GaussianMixture
New mixture with low-weight components removed.
"""
pruned = prune_mixture(self.components, weight_threshold)
return GaussianMixture(pruned)
[docs]
def reduce_runnalls(self, max_components: int) -> "GaussianMixture":
"""
Reduce mixture using Runnalls' algorithm.
Parameters
----------
max_components : int
Maximum number of components.
Returns
-------
reduced : GaussianMixture
Reduced mixture.
"""
result = reduce_mixture_runnalls(self.components, max_components)
return GaussianMixture(result.components)
[docs]
def reduce_west(self, max_components: int) -> "GaussianMixture":
"""
Reduce mixture using West's algorithm.
Parameters
----------
max_components : int
Maximum number of components.
Returns
-------
reduced : GaussianMixture
Reduced mixture.
"""
result = reduce_mixture_west(self.components, max_components)
return GaussianMixture(result.components)
[docs]
def copy(self) -> "GaussianMixture":
"""Create a deep copy of the mixture."""
return GaussianMixture(
[
GaussianComponent(c.weight, c.mean.copy(), c.covariance.copy())
for c in self.components
]
)
__all__ = [
"GaussianComponent",
"MergeResult",
"ReductionResult",
"moment_match",
"runnalls_merge_cost",
"merge_gaussians",
"prune_mixture",
"reduce_mixture_runnalls",
"west_merge_cost",
"reduce_mixture_west",
"GaussianMixture",
]