"""
Hierarchical (Agglomerative) Clustering.
Hierarchical clustering builds a tree of clusters by iteratively
merging the closest pairs. This is useful for track fusion and
understanding cluster relationships.
References
----------
.. [1] S. C. Johnson, "Hierarchical clustering schemes,"
Psychometrika, 1967.
"""
from enum import Enum
from typing import Any, List, Literal, NamedTuple, Optional
import numpy as np
from numba import njit
from numpy.typing import ArrayLike, NDArray
[docs]
class LinkageType(Enum):
"""Linkage methods for hierarchical clustering."""
SINGLE = "single" # Minimum distance
COMPLETE = "complete" # Maximum distance
AVERAGE = "average" # Average distance
WARD = "ward" # Ward's minimum variance
[docs]
class DendrogramNode(NamedTuple):
"""A node in the dendrogram (merge tree).
Attributes
----------
left : int
Index of left child (negative for original samples).
right : int
Index of right child (negative for original samples).
distance : float
Distance/dissimilarity at which merge occurred.
count : int
Number of original samples in this cluster.
"""
left: int
right: int
distance: float
count: int
[docs]
class HierarchicalResult(NamedTuple):
"""Result of hierarchical clustering.
Attributes
----------
labels : ndarray
Cluster labels for each point, shape (n_samples,).
n_clusters : int
Number of clusters.
linkage_matrix : ndarray
Linkage matrix of shape (n_samples-1, 4).
Each row [i, j, dist, count] represents a merge.
dendrogram : list of DendrogramNode
List of merge operations.
"""
labels: NDArray[np.intp]
n_clusters: int
linkage_matrix: NDArray[np.floating]
dendrogram: List[DendrogramNode]
@njit(cache=True)
def _compute_distance_matrix_jit(X: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
"""JIT-compiled pairwise Euclidean distance computation."""
n = X.shape[0]
n_features = X.shape[1]
distances = np.zeros((n, n), dtype=np.float64)
for i in range(n):
for j in range(i + 1, n):
d = 0.0
for k in range(n_features):
diff = X[i, k] - X[j, k]
d += diff * diff
d = np.sqrt(d)
distances[i, j] = d
distances[j, i] = d
return distances
[docs]
def compute_distance_matrix(
X: NDArray[np.floating],
) -> NDArray[np.floating]:
"""
Compute pairwise Euclidean distance matrix.
Parameters
----------
X : ndarray
Data points, shape (n_samples, n_features).
Returns
-------
distances : ndarray
Distance matrix, shape (n_samples, n_samples).
Examples
--------
>>> X = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0]])
>>> D = compute_distance_matrix(X)
>>> D.shape
(3, 3)
>>> D[0, 1] # Distance between points 0 and 1
1.0
"""
X = np.asarray(X, dtype=np.float64)
return _compute_distance_matrix_jit(X)
def _single_linkage(
dist_i: NDArray[Any],
dist_j: NDArray[Any],
size_i: int,
size_j: int,
) -> NDArray[Any]:
"""Single linkage: minimum of distances."""
return np.minimum(dist_i, dist_j)
def _complete_linkage(
dist_i: NDArray[Any],
dist_j: NDArray[Any],
size_i: int,
size_j: int,
) -> NDArray[Any]:
"""Complete linkage: maximum of distances."""
return np.maximum(dist_i, dist_j)
def _average_linkage(
dist_i: NDArray[Any],
dist_j: NDArray[Any],
size_i: int,
size_j: int,
) -> NDArray[Any]:
"""Average linkage: weighted average of distances."""
return (size_i * dist_i + size_j * dist_j) / (size_i + size_j)
def _ward_linkage(
dist_i: NDArray[Any],
dist_j: NDArray[Any],
size_i: int,
size_j: int,
size_k: NDArray[Any],
dist_ij: float,
) -> NDArray[Any]:
"""Ward's linkage: minimum variance merge."""
total = size_i + size_j + size_k
return np.sqrt(
(
(size_i + size_k) * dist_i**2
+ (size_j + size_k) * dist_j**2
- size_k * dist_ij**2
)
/ total
)
[docs]
def agglomerative_clustering(
X: ArrayLike,
n_clusters: Optional[int] = None,
distance_threshold: Optional[float] = None,
linkage: Literal["single", "complete", "average", "ward"] = "ward",
) -> HierarchicalResult:
"""
Agglomerative hierarchical clustering.
Recursively merges pairs of clusters that minimize the linkage
criterion until the desired number of clusters is reached.
Parameters
----------
X : array_like
Data points, shape (n_samples, n_features).
n_clusters : int, optional
Number of clusters to find. If None, uses distance_threshold.
distance_threshold : float, optional
Distance threshold for cluster merging. Merging stops when
the minimum linkage distance exceeds this. If None, uses n_clusters.
linkage : {'single', 'complete', 'average', 'ward'}
Linkage criterion. Default 'ward'.
Returns
-------
result : HierarchicalResult
Clustering result with labels and dendrogram.
Examples
--------
>>> import numpy as np
>>> rng = np.random.default_rng(42)
>>> X = np.vstack([rng.normal(0, 0.5, (20, 2)),
... rng.normal(3, 0.5, (20, 2))])
>>> result = agglomerative_clustering(X, n_clusters=2)
>>> result.n_clusters
2
Notes
-----
Linkage methods:
- 'single': Distance between closest points (can create chains)
- 'complete': Distance between farthest points (tends to create compact clusters)
- 'average': Average distance between all pairs
- 'ward': Minimizes within-cluster variance (requires Euclidean distance)
"""
X = np.asarray(X, dtype=np.float64)
n_samples = X.shape[0]
if n_samples == 0:
return HierarchicalResult(
labels=np.array([], dtype=np.intp),
n_clusters=0,
linkage_matrix=np.zeros((0, 4)),
dendrogram=[],
)
if n_samples == 1:
return HierarchicalResult(
labels=np.array([0], dtype=np.intp),
n_clusters=1,
linkage_matrix=np.zeros((0, 4)),
dendrogram=[],
)
if n_clusters is None and distance_threshold is None:
n_clusters = 2
# Compute initial distance matrix
distances = compute_distance_matrix(X)
# Initialize clusters
# cluster_id -> list of original sample indices
clusters = {i: [i] for i in range(n_samples)}
cluster_sizes = {i: 1 for i in range(n_samples)}
# Active cluster IDs
active = set(range(n_samples))
# Linkage matrix and dendrogram
linkage_matrix = []
dendrogram = []
# Current cluster distance matrix (upper triangular, inf on diagonal)
cluster_dist = distances.copy()
np.fill_diagonal(cluster_dist, np.inf)
next_cluster_id = n_samples
# Merge until stopping criterion
while len(active) > 1:
# Check stopping criteria
if n_clusters is not None and len(active) <= n_clusters:
break
# Find minimum distance pair among active clusters
min_dist = np.inf
merge_i, merge_j = -1, -1
active_list = list(active)
for idx_a, i in enumerate(active_list):
for j in active_list[idx_a + 1 :]:
if cluster_dist[i, j] < min_dist:
min_dist = cluster_dist[i, j]
merge_i, merge_j = i, j
if merge_i == -1 or min_dist == np.inf:
break
if distance_threshold is not None and min_dist > distance_threshold:
break
# Ensure merge_i < merge_j for consistency
if merge_i > merge_j:
merge_i, merge_j = merge_j, merge_i
# Record merge
size_new = cluster_sizes[merge_i] + cluster_sizes[merge_j]
linkage_matrix.append([merge_i, merge_j, min_dist, size_new])
dendrogram.append(
DendrogramNode(
left=merge_i,
right=merge_j,
distance=min_dist,
count=size_new,
)
)
# Create new cluster
new_cluster_id = next_cluster_id
next_cluster_id += 1
clusters[new_cluster_id] = clusters[merge_i] + clusters[merge_j]
cluster_sizes[new_cluster_id] = size_new
# Update distance matrix for new cluster
# Expand matrix if needed
if new_cluster_id >= cluster_dist.shape[0]:
new_size = new_cluster_id + 1
new_dist = np.full((new_size, new_size), np.inf)
old_size = cluster_dist.shape[0]
new_dist[:old_size, :old_size] = cluster_dist
cluster_dist = new_dist
# Compute distances from new cluster to all other active clusters
for k in active:
if k == merge_i or k == merge_j:
continue
if linkage == "single":
new_d = _single_linkage(
cluster_dist[merge_i, k],
cluster_dist[merge_j, k],
cluster_sizes[merge_i],
cluster_sizes[merge_j],
)
elif linkage == "complete":
new_d = _complete_linkage(
cluster_dist[merge_i, k],
cluster_dist[merge_j, k],
cluster_sizes[merge_i],
cluster_sizes[merge_j],
)
elif linkage == "average":
new_d = _average_linkage(
cluster_dist[merge_i, k],
cluster_dist[merge_j, k],
cluster_sizes[merge_i],
cluster_sizes[merge_j],
)
elif linkage == "ward":
new_d = _ward_linkage(
cluster_dist[merge_i, k],
cluster_dist[merge_j, k],
cluster_sizes[merge_i],
cluster_sizes[merge_j],
np.array([cluster_sizes[k]]),
cluster_dist[merge_i, merge_j],
)[0]
else:
raise ValueError(f"Unknown linkage: {linkage}")
cluster_dist[new_cluster_id, k] = new_d
cluster_dist[k, new_cluster_id] = new_d
# Update active set
active.remove(merge_i)
active.remove(merge_j)
active.add(new_cluster_id)
# Clean up old clusters
del clusters[merge_i]
del clusters[merge_j]
del cluster_sizes[merge_i]
del cluster_sizes[merge_j]
# Assign labels based on final clusters
labels = np.zeros(n_samples, dtype=np.intp)
for cluster_label, cluster_id in enumerate(active):
for sample_idx in clusters[cluster_id]:
labels[sample_idx] = cluster_label
# Convert linkage matrix to array
if linkage_matrix:
linkage_array = np.array(linkage_matrix)
else:
linkage_array = np.zeros((0, 4))
return HierarchicalResult(
labels=labels,
n_clusters=len(active),
linkage_matrix=linkage_array,
dendrogram=dendrogram,
)
[docs]
def cut_dendrogram(
linkage_matrix: ArrayLike,
n_samples: int,
n_clusters: Optional[int] = None,
distance_threshold: Optional[float] = None,
) -> NDArray[np.intp]:
"""
Cut a dendrogram to obtain cluster labels.
Parameters
----------
linkage_matrix : array_like
Linkage matrix from agglomerative_clustering, shape (n-1, 4).
n_samples : int
Number of original samples.
n_clusters : int, optional
Number of clusters to form.
distance_threshold : float, optional
Maximum linkage distance.
Returns
-------
labels : ndarray
Cluster labels, shape (n_samples,).
Examples
--------
>>> import numpy as np
>>> rng = np.random.default_rng(42)
>>> X = np.vstack([rng.normal(0, 0.5, (10, 2)), rng.normal(3, 0.5, (10, 2))])
>>> result = agglomerative_clustering(X)
>>> labels = cut_dendrogram(result.linkage_matrix, n_samples=20, n_clusters=2)
>>> len(np.unique(labels))
2
"""
linkage_matrix = np.asarray(linkage_matrix)
if len(linkage_matrix) == 0:
return np.zeros(n_samples, dtype=np.intp)
if n_clusters is None and distance_threshold is None:
n_clusters = 2
# Build cluster membership by replaying merges
# Initially each sample is its own cluster
cluster_members = {i: {i} for i in range(n_samples)}
next_id = n_samples
n_current_clusters = n_samples
for row in linkage_matrix:
left, right, dist, _ = int(row[0]), int(row[1]), row[2], int(row[3])
# Check stopping criteria
if n_clusters is not None and n_current_clusters <= n_clusters:
break
if distance_threshold is not None and dist > distance_threshold:
break
# Merge clusters
new_members = cluster_members[left] | cluster_members[right]
cluster_members[next_id] = new_members
del cluster_members[left]
del cluster_members[right]
next_id += 1
n_current_clusters -= 1
# Assign labels
labels = np.zeros(n_samples, dtype=np.intp)
for label, (cluster_id, members) in enumerate(cluster_members.items()):
for sample_idx in members:
labels[sample_idx] = label
return labels
[docs]
def fcluster(
linkage_matrix: ArrayLike,
n_samples: int,
t: float,
criterion: Literal["distance", "maxclust"] = "distance",
) -> NDArray[np.intp]:
"""
Form flat clusters from hierarchical clustering.
Compatible interface with scipy.cluster.hierarchy.fcluster.
Parameters
----------
linkage_matrix : array_like
Linkage matrix from agglomerative_clustering.
n_samples : int
Number of original samples.
t : float
Threshold for forming clusters.
criterion : {'distance', 'maxclust'}
'distance': t is maximum cophenetic distance
'maxclust': t is maximum number of clusters
Returns
-------
labels : ndarray
Cluster labels (1-indexed for scipy compatibility).
Examples
--------
>>> import numpy as np
>>> rng = np.random.default_rng(42)
>>> X = np.vstack([rng.normal(0, 0.5, (10, 2)), rng.normal(3, 0.5, (10, 2))])
>>> result = agglomerative_clustering(X)
>>> labels = fcluster(result.linkage_matrix, n_samples=20, t=2, criterion='maxclust')
>>> labels.min() # 1-indexed
1
"""
if criterion == "distance":
labels = cut_dendrogram(linkage_matrix, n_samples, distance_threshold=t)
elif criterion == "maxclust":
labels = cut_dendrogram(linkage_matrix, n_samples, n_clusters=int(t))
else:
raise ValueError(f"Unknown criterion: {criterion}")
# Convert to 1-indexed for scipy compatibility
return labels + 1
__all__ = [
"LinkageType",
"DendrogramNode",
"HierarchicalResult",
"compute_distance_matrix",
"agglomerative_clustering",
"cut_dendrogram",
"fcluster",
]