"""
Cluster set container.
This module provides a container for managing groups of tracks
that move together (formations, convoys, etc.).
"""
from __future__ import annotations
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Tuple,
Union,
)
import numpy as np
from numpy.typing import ArrayLike, NDArray
from pytcl.clustering.dbscan import dbscan
from pytcl.clustering.kmeans import kmeans
from pytcl.containers.track_list import TrackList
from pytcl.trackers.multi_target import Track
[docs]
class TrackCluster(NamedTuple):
"""
A cluster of related tracks.
Attributes
----------
id : int
Unique cluster identifier.
track_ids : Tuple[int, ...]
Immutable tuple of track IDs in this cluster.
centroid : ndarray
Cluster center position.
covariance : ndarray
Cluster spread covariance matrix.
time : float
Time at which cluster was computed.
"""
id: int
track_ids: Tuple[int, ...]
centroid: NDArray[np.float64]
covariance: NDArray[np.float64]
time: float
[docs]
class ClusterStats(NamedTuple):
"""
Statistics for a cluster.
Attributes
----------
n_tracks : int
Number of tracks in the cluster.
mean_separation : float
Average distance between tracks and centroid.
max_separation : float
Maximum distance from any track to centroid.
velocity_coherence : float
Measure of how aligned velocities are (0-1).
1.0 means perfectly aligned, 0.0 means random directions.
"""
n_tracks: int
mean_separation: float
max_separation: float
velocity_coherence: float
[docs]
def compute_cluster_centroid(
tracks: Iterable[Track],
state_indices: Tuple[int, int] = (0, 2),
) -> NDArray[np.float64]:
"""
Compute the centroid of a group of tracks.
Parameters
----------
tracks : Iterable[Track]
Tracks to compute centroid for.
state_indices : tuple of int, optional
Indices of x and y in state vector (default: (0, 2)).
Returns
-------
centroid : ndarray
Centroid position [x, y].
Examples
--------
>>> from pytcl.trackers.multi_target import Track
>>> import numpy as np
>>> # Create sample tracks with [x, vx, y, vy] state vectors
>>> track1 = Track(state=np.array([0.0, 1.0, 0.0, 1.0]))
>>> track2 = Track(state=np.array([2.0, 1.0, 2.0, 1.0]))
>>> track3 = Track(state=np.array([4.0, 1.0, 4.0, 1.0]))
>>> tracks = [track1, track2, track3]
>>> centroid = compute_cluster_centroid(tracks)
>>> centroid
array([2., 2.])
See Also
--------
compute_cluster_covariance : Compute covariance of track positions.
"""
track_list = list(tracks)
if len(track_list) == 0:
return np.array([0.0, 0.0])
ix, iy = state_indices
positions = np.array([[t.state[ix], t.state[iy]] for t in track_list])
return np.mean(positions, axis=0)
def compute_cluster_covariance(
tracks: Iterable[Track],
state_indices: Tuple[int, int] = (0, 2),
) -> NDArray[np.float64]:
"""
Compute the covariance of track positions in a cluster.
Parameters
----------
tracks : Iterable[Track]
Tracks to compute covariance for.
state_indices : tuple of int, optional
Indices of x and y in state vector (default: (0, 2)).
Returns
-------
covariance : ndarray
Position covariance matrix (2x2).
Examples
--------
>>> from pytcl.trackers.multi_target import Track
>>> import numpy as np
>>> # Create collinear tracks (high variance along x-axis)
>>> track1 = Track(state=np.array([0.0, 1.0, 0.0, 0.0]))
>>> track2 = Track(state=np.array([1.0, 1.0, 0.0, 0.0]))
>>> track3 = Track(state=np.array([2.0, 1.0, 0.0, 0.0]))
>>> tracks = [track1, track2, track3]
>>> cov = compute_cluster_covariance(tracks)
>>> cov.shape
(2, 2)
>>> cov[0, 0] > cov[1, 1] # More spread in x than y
True
See Also
--------
compute_cluster_centroid : Compute centroid of track positions.
"""
track_list = list(tracks)
if len(track_list) < 2:
return np.eye(2)
ix, iy = state_indices
positions = np.array([[t.state[ix], t.state[iy]] for t in track_list])
return np.cov(positions.T)
[docs]
def cluster_tracks_dbscan(
tracks: TrackList,
eps: float,
min_samples: int = 2,
state_indices: Tuple[int, int] = (0, 2),
) -> "ClusterSet":
"""
Cluster tracks using DBSCAN algorithm.
Parameters
----------
tracks : TrackList
Tracks to cluster.
eps : float
Maximum distance between two tracks to be considered neighbors.
min_samples : int, optional
Minimum number of tracks to form a cluster (default: 2).
state_indices : tuple of int, optional
Indices of x and y in state vector (default: (0, 2)).
Returns
-------
ClusterSet
Set of track clusters.
"""
if len(tracks) == 0:
return ClusterSet()
# Extract positions
positions = tracks.positions(indices=state_indices)
# Run DBSCAN
result = dbscan(positions, eps=eps, min_samples=min_samples)
# Build clusters from labels
clusters = []
track_list = list(tracks)
time = track_list[0].time if track_list else 0.0
for cluster_id in range(result.n_clusters):
mask = result.labels == cluster_id
cluster_tracks = [track_list[i] for i in range(len(track_list)) if mask[i]]
track_ids = tuple(t.id for t in cluster_tracks)
centroid = compute_cluster_centroid(cluster_tracks, state_indices)
covariance = compute_cluster_covariance(cluster_tracks, state_indices)
clusters.append(
TrackCluster(
id=cluster_id,
track_ids=track_ids,
centroid=centroid,
covariance=covariance,
time=time,
)
)
return ClusterSet(clusters)
[docs]
def cluster_tracks_kmeans(
tracks: TrackList,
n_clusters: int,
state_indices: Tuple[int, int] = (0, 2),
rng: Optional[np.random.Generator] = None,
) -> "ClusterSet":
"""
Cluster tracks using K-means algorithm.
Parameters
----------
tracks : TrackList
Tracks to cluster.
n_clusters : int
Number of clusters to form.
state_indices : tuple of int, optional
Indices of x and y in state vector (default: (0, 2)).
rng : numpy.random.Generator, optional
Random number generator for reproducibility.
Returns
-------
ClusterSet
Set of track clusters.
"""
if len(tracks) == 0:
return ClusterSet()
if n_clusters > len(tracks):
n_clusters = len(tracks)
# Extract positions
positions = tracks.positions(indices=state_indices)
# Run K-means
result = kmeans(positions, n_clusters=n_clusters, rng=rng)
# Build clusters from labels
clusters = []
track_list = list(tracks)
time = track_list[0].time if track_list else 0.0
for cluster_id in range(n_clusters):
mask = result.labels == cluster_id
cluster_tracks = [track_list[i] for i in range(len(track_list)) if mask[i]]
if len(cluster_tracks) == 0:
continue
track_ids = tuple(t.id for t in cluster_tracks)
centroid = compute_cluster_centroid(cluster_tracks, state_indices)
covariance = compute_cluster_covariance(cluster_tracks, state_indices)
clusters.append(
TrackCluster(
id=cluster_id,
track_ids=track_ids,
centroid=centroid,
covariance=covariance,
time=time,
)
)
return ClusterSet(clusters)
[docs]
class ClusterSet:
"""
Collection of track clusters.
Provides:
- Cluster creation from tracks
- Cluster queries
- Cluster merging/splitting
Parameters
----------
clusters : Iterable[TrackCluster], optional
Initial clusters to add.
Examples
--------
>>> import numpy as np
>>> from pytcl.trackers.multi_target import Track, TrackStatus
>>> from pytcl.containers.track_list import TrackList
>>> # Create some tracks in two groups
>>> t1 = Track(id=0, state=np.array([0, 0, 0, 0]),
... covariance=np.eye(4), status=TrackStatus.CONFIRMED,
... hits=5, misses=0, time=1.0)
>>> t2 = Track(id=1, state=np.array([1, 0, 1, 0]),
... covariance=np.eye(4), status=TrackStatus.CONFIRMED,
... hits=5, misses=0, time=1.0)
>>> t3 = Track(id=2, state=np.array([10, 0, 10, 0]),
... covariance=np.eye(4), status=TrackStatus.CONFIRMED,
... hits=5, misses=0, time=1.0)
>>> tracks = TrackList([t1, t2, t3])
>>> # Cluster using DBSCAN
>>> clusters = cluster_tracks_dbscan(tracks, eps=5.0, min_samples=2)
"""
[docs]
def __init__(self, clusters: Optional[Iterable[TrackCluster]] = None) -> None:
"""Initialize cluster set."""
if clusters is None:
self._clusters: List[TrackCluster] = []
else:
self._clusters = list(clusters)
# Build lookups
self._id_to_idx: Dict[int, int] = {
c.id: i for i, c in enumerate(self._clusters)
}
self._track_to_cluster: Dict[int, int] = {}
for cluster in self._clusters:
for track_id in cluster.track_ids:
self._track_to_cluster[track_id] = cluster.id
[docs]
@classmethod
def from_tracks(
cls,
tracks: TrackList,
method: str = "dbscan",
**kwargs: Any,
) -> ClusterSet:
"""
Create a ClusterSet by clustering tracks.
Parameters
----------
tracks : TrackList
Tracks to cluster.
method : str
Clustering method: 'dbscan' or 'kmeans'.
**kwargs
Additional arguments passed to the clustering function.
For DBSCAN: eps, min_samples, state_indices
For K-means: n_clusters, state_indices, rng
Returns
-------
ClusterSet
New ClusterSet containing the computed clusters.
Examples
--------
>>> clusters = ClusterSet.from_tracks(tracks, method='dbscan', eps=2.0)
>>> clusters = ClusterSet.from_tracks(tracks, method='kmeans', n_clusters=3)
"""
if method == "dbscan":
return cluster_tracks_dbscan(tracks, **kwargs)
elif method == "kmeans":
return cluster_tracks_kmeans(tracks, **kwargs)
else:
raise ValueError(f"Unknown clustering method: {method}")
[docs]
def __len__(self) -> int:
"""Return number of clusters."""
return len(self._clusters)
[docs]
def __iter__(self) -> Iterator[TrackCluster]:
"""Iterate over clusters."""
return iter(self._clusters)
[docs]
def __getitem__(self, idx: Union[int, slice]) -> Union[TrackCluster, "ClusterSet"]:
"""
Get cluster by index or slice.
Parameters
----------
idx : int or slice
Index or slice to retrieve.
Returns
-------
TrackCluster or ClusterSet
Single cluster if int, ClusterSet if slice.
"""
if isinstance(idx, int):
return self._clusters[idx]
else:
return ClusterSet(self._clusters[idx])
[docs]
def __contains__(self, cluster_id: int) -> bool:
"""Check if cluster ID exists in set."""
return cluster_id in self._id_to_idx
[docs]
def __repr__(self) -> str:
"""String representation."""
return f"ClusterSet(n_clusters={len(self)})"
[docs]
def get_cluster(self, cluster_id: int) -> Optional[TrackCluster]:
"""
Get cluster by ID.
Parameters
----------
cluster_id : int
Cluster ID to find.
Returns
-------
TrackCluster or None
The cluster if found, None otherwise.
"""
idx = self._id_to_idx.get(cluster_id)
if idx is not None:
return self._clusters[idx]
return None
[docs]
def get_cluster_for_track(self, track_id: int) -> Optional[TrackCluster]:
"""
Get the cluster containing a specific track.
Parameters
----------
track_id : int
Track ID to find.
Returns
-------
TrackCluster or None
The cluster containing the track, or None if not found.
"""
cluster_id = self._track_to_cluster.get(track_id)
if cluster_id is not None:
return self.get_cluster(cluster_id)
return None
[docs]
def clusters_in_region(
self, center: ArrayLike, radius: float
) -> List[TrackCluster]:
"""
Get clusters with centroids within a spatial region.
Parameters
----------
center : array_like
Center point [x, y].
radius : float
Radius of the region.
Returns
-------
list of TrackCluster
Clusters within the region.
"""
center = np.asarray(center, dtype=np.float64)
result = []
for cluster in self._clusters:
dist = np.linalg.norm(cluster.centroid - center)
if dist <= radius:
result.append(cluster)
return result
@property
def cluster_ids(self) -> List[int]:
"""Get list of all cluster IDs."""
return [c.id for c in self._clusters]
@property
def n_tracks_total(self) -> int:
"""Get total number of tracks across all clusters."""
return sum(len(c.track_ids) for c in self._clusters)
[docs]
def cluster_stats(
self,
cluster_id: int,
tracks: Optional[TrackList] = None,
state_indices: Tuple[int, int] = (0, 2),
velocity_indices: Tuple[int, int] = (1, 3),
) -> Optional[ClusterStats]:
"""
Compute statistics for a cluster.
Parameters
----------
cluster_id : int
ID of the cluster.
tracks : TrackList, optional
TrackList containing the tracks. Required for velocity coherence.
state_indices : tuple of int, optional
Indices of x and y in state vector (default: (0, 2)).
velocity_indices : tuple of int, optional
Indices of vx and vy in state vector (default: (1, 3)).
Returns
-------
ClusterStats or None
Statistics for the cluster, or None if cluster not found.
"""
cluster = self.get_cluster(cluster_id)
if cluster is None:
return None
n_tracks = len(cluster.track_ids)
if n_tracks == 0:
return ClusterStats(
n_tracks=0,
mean_separation=0.0,
max_separation=0.0,
velocity_coherence=0.0,
)
# Compute separations if we have track data
mean_separation = 0.0
max_separation = 0.0
velocity_coherence = 0.0
if tracks is not None:
ix, iy = state_indices
positions = []
velocities = []
for tid in cluster.track_ids:
track = tracks.get_by_id(tid)
if track is not None:
positions.append([track.state[ix], track.state[iy]])
ivx, ivy = velocity_indices
if len(track.state) > max(ivx, ivy):
velocities.append([track.state[ivx], track.state[ivy]])
if len(positions) > 0:
positions = np.array(positions)
centroid = cluster.centroid
# Compute separations
separations = np.sqrt(np.sum((positions - centroid) ** 2, axis=1))
mean_separation = float(np.mean(separations))
max_separation = float(np.max(separations))
# Compute velocity coherence
if len(velocities) > 1:
velocities = np.array(velocities)
# Normalize velocities
norms = np.linalg.norm(velocities, axis=1, keepdims=True)
norms = np.maximum(norms, 1e-10)
unit_velocities = velocities / norms
# Mean velocity direction
mean_vel = np.mean(unit_velocities, axis=0)
mean_vel_norm = np.linalg.norm(mean_vel)
# Coherence is magnitude of mean unit velocity
velocity_coherence = float(mean_vel_norm)
return ClusterStats(
n_tracks=n_tracks,
mean_separation=mean_separation,
max_separation=max_separation,
velocity_coherence=velocity_coherence,
)
[docs]
def all_stats(
self,
tracks: Optional[TrackList] = None,
state_indices: Tuple[int, int] = (0, 2),
velocity_indices: Tuple[int, int] = (1, 3),
) -> Dict[int, ClusterStats]:
"""
Compute statistics for all clusters.
Parameters
----------
tracks : TrackList, optional
TrackList containing the tracks.
state_indices : tuple of int, optional
Indices of x and y in state vector (default: (0, 2)).
velocity_indices : tuple of int, optional
Indices of vx and vy in state vector (default: (1, 3)).
Returns
-------
dict
Mapping from cluster ID to ClusterStats.
"""
result = {}
for cluster in self._clusters:
stats = self.cluster_stats(
cluster.id, tracks, state_indices, velocity_indices
)
if stats is not None:
result[cluster.id] = stats
return result
[docs]
def add_cluster(self, cluster: TrackCluster) -> ClusterSet:
"""
Add a cluster and return a new ClusterSet.
Parameters
----------
cluster : TrackCluster
Cluster to add.
Returns
-------
ClusterSet
New ClusterSet with the cluster added.
"""
return ClusterSet(self._clusters + [cluster])
[docs]
def remove_cluster(self, cluster_id: int) -> ClusterSet:
"""
Remove a cluster by ID and return a new ClusterSet.
Parameters
----------
cluster_id : int
ID of cluster to remove.
Returns
-------
ClusterSet
New ClusterSet without the specified cluster.
"""
return ClusterSet([c for c in self._clusters if c.id != cluster_id])
[docs]
def merge_clusters(
self,
id1: int,
id2: int,
new_id: Optional[int] = None,
) -> ClusterSet:
"""
Merge two clusters into one.
Parameters
----------
id1 : int
ID of first cluster.
id2 : int
ID of second cluster.
new_id : int, optional
ID for the merged cluster. Defaults to id1.
Returns
-------
ClusterSet
New ClusterSet with merged cluster.
Raises
------
ValueError
If either cluster ID is not found.
"""
c1 = self.get_cluster(id1)
c2 = self.get_cluster(id2)
if c1 is None:
raise ValueError(f"Cluster {id1} not found")
if c2 is None:
raise ValueError(f"Cluster {id2} not found")
if new_id is None:
new_id = id1
# Merge track IDs
merged_track_ids = tuple(set(c1.track_ids) | set(c2.track_ids))
# Compute new centroid (weighted average)
n1, n2 = len(c1.track_ids), len(c2.track_ids)
new_centroid = (n1 * c1.centroid + n2 * c2.centroid) / (n1 + n2)
# Combine covariances (simple average for now)
new_covariance = (c1.covariance + c2.covariance) / 2
merged = TrackCluster(
id=new_id,
track_ids=merged_track_ids,
centroid=new_centroid,
covariance=new_covariance,
time=max(c1.time, c2.time),
)
# Build new cluster list
new_clusters = [c for c in self._clusters if c.id not in (id1, id2)]
new_clusters.append(merged)
return ClusterSet(new_clusters)
[docs]
def split_cluster(
self,
cluster_id: int,
track_ids_1: Iterable[int],
track_ids_2: Iterable[int],
new_id_1: Optional[int] = None,
new_id_2: Optional[int] = None,
tracks: Optional[TrackList] = None,
state_indices: Tuple[int, int] = (0, 2),
) -> ClusterSet:
"""
Split a cluster into two.
Parameters
----------
cluster_id : int
ID of cluster to split.
track_ids_1 : Iterable[int]
Track IDs for first new cluster.
track_ids_2 : Iterable[int]
Track IDs for second new cluster.
new_id_1 : int, optional
ID for first new cluster. Defaults to cluster_id.
new_id_2 : int, optional
ID for second new cluster. Defaults to max(cluster_ids) + 1.
tracks : TrackList, optional
TrackList for computing centroids. If None, uses existing centroid.
state_indices : tuple of int, optional
Indices of x and y in state vector (default: (0, 2)).
Returns
-------
ClusterSet
New ClusterSet with split clusters.
Raises
------
ValueError
If cluster ID is not found.
"""
original = self.get_cluster(cluster_id)
if original is None:
raise ValueError(f"Cluster {cluster_id} not found")
ids_1 = tuple(track_ids_1)
ids_2 = tuple(track_ids_2)
if new_id_1 is None:
new_id_1 = cluster_id
if new_id_2 is None:
new_id_2 = max(self.cluster_ids) + 1 if self.cluster_ids else 0
# Compute centroids
if tracks is not None:
tracks_1 = [tracks.get_by_id(tid) for tid in ids_1]
tracks_1 = [t for t in tracks_1 if t is not None]
tracks_2 = [tracks.get_by_id(tid) for tid in ids_2]
tracks_2 = [t for t in tracks_2 if t is not None]
centroid_1 = compute_cluster_centroid(tracks_1, state_indices)
centroid_2 = compute_cluster_centroid(tracks_2, state_indices)
cov_1 = compute_cluster_covariance(tracks_1, state_indices)
cov_2 = compute_cluster_covariance(tracks_2, state_indices)
else:
# Use original centroid for both
centroid_1 = original.centroid.copy()
centroid_2 = original.centroid.copy()
cov_1 = original.covariance.copy()
cov_2 = original.covariance.copy()
cluster_1 = TrackCluster(
id=new_id_1,
track_ids=ids_1,
centroid=centroid_1,
covariance=cov_1,
time=original.time,
)
cluster_2 = TrackCluster(
id=new_id_2,
track_ids=ids_2,
centroid=centroid_2,
covariance=cov_2,
time=original.time,
)
# Build new cluster list
new_clusters = [c for c in self._clusters if c.id != cluster_id]
new_clusters.extend([cluster_1, cluster_2])
return ClusterSet(new_clusters)
[docs]
def copy(self) -> ClusterSet:
"""
Create a copy of this ClusterSet.
Returns
-------
ClusterSet
A new ClusterSet with the same clusters.
"""
return ClusterSet(self._clusters)
__all__ = [
"TrackCluster",
"ClusterSet",
"ClusterStats",
"cluster_tracks_dbscan",
"cluster_tracks_kmeans",
"compute_cluster_centroid",
]