Tracking Containers

This example demonstrates track and measurement container data structures.

Overview

Efficient tracking systems require organized data structures:

  • TrackList: Collection of tracks with spatial queries

  • MeasurementSet: Organized measurement storage

  • Track state: Position, velocity, covariance, metadata

Key Concepts

  • Track ID management: Unique identifiers for each track

  • Temporal indexing: Accessing data by time step

  • Spatial queries: Finding tracks in a region

  • Track history: Storing past states for smoothing

Spatial Indexing: KD-trees enable efficient nearest-neighbor queries for track-to-measurement association.

Range Queries: R-trees support efficient rectangular range queries for gating operations.

Code Highlights

The example demonstrates:

  • Creating and populating TrackList containers

  • Adding tracks with state and covariance

  • Querying tracks by ID, time, or spatial region

  • Iterating over tracks for batch processing

Source Code

  1"""
  2Tracking Containers Example
  3===========================
  4
  5This example demonstrates the tracking container classes in PyTCL:
  6- TrackList: Collection of tracks with filtering and batch operations
  7- MeasurementSet: Time-indexed measurements with spatial queries
  8- ClusterSet: Track clustering for formation detection
  9
 10These containers provide efficient data management for multi-target tracking
 11applications with immutable design patterns and lazy spatial indexing.
 12"""
 13
 14import numpy as np
 15import plotly.graph_objects as go
 16from plotly.subplots import make_subplots
 17
 18from pytcl.containers import (
 19    ClusterSet,
 20    MeasurementSet,
 21    TrackList,
 22)
 23from pytcl.containers.cluster_set import cluster_tracks_dbscan, cluster_tracks_kmeans
 24from pytcl.containers.measurement_set import Measurement
 25from pytcl.containers.track_list import Track, TrackStatus
 26
 27
 28def create_sample_tracks(n_tracks: int = 10, seed: int = 42) -> TrackList:
 29    """Create sample tracks for demonstration."""
 30    rng = np.random.default_rng(seed)
 31
 32    tracks = []
 33    for i in range(n_tracks):
 34        # State: [x, vx, y, vy] - 2D position and velocity
 35        x = rng.uniform(-100, 100)
 36        y = rng.uniform(-100, 100)
 37        vx = rng.uniform(-5, 5)
 38        vy = rng.uniform(-5, 5)
 39        state = np.array([x, vx, y, vy])
 40
 41        # Covariance matrix
 42        pos_var = rng.uniform(1, 5)
 43        vel_var = rng.uniform(0.1, 0.5)
 44        P = np.diag([pos_var, vel_var, pos_var, vel_var])
 45
 46        # Random status based on hits
 47        hits = rng.integers(1, 20)
 48        misses = rng.integers(0, 5)
 49        if hits >= 5:
 50            status = TrackStatus.CONFIRMED
 51        elif misses >= 3:
 52            status = TrackStatus.DELETED
 53        else:
 54            status = TrackStatus.TENTATIVE
 55
 56        track = Track(
 57            id=i,
 58            state=state,
 59            covariance=P,
 60            status=status,
 61            hits=hits,
 62            misses=misses,
 63            time=10.0 + rng.uniform(0, 5),
 64        )
 65        tracks.append(track)
 66
 67    return TrackList(tracks)
 68
 69
 70def create_sample_measurements(n_times: int = 5, n_per_time: int = 8, seed: int = 42):
 71    """Create sample measurements across multiple time steps."""
 72    rng = np.random.default_rng(seed)
 73
 74    measurements = []
 75    meas_id = 0
 76    for t in range(n_times):
 77        time = float(t)
 78        for _ in range(n_per_time):
 79            # 2D position measurement
 80            value = rng.uniform(-50, 50, size=2)
 81            covariance = np.eye(2) * rng.uniform(0.5, 2.0)
 82            sensor_id = rng.integers(0, 3)  # 3 sensors
 83
 84            meas = Measurement(
 85                value=value,
 86                time=time,
 87                covariance=covariance,
 88                sensor_id=sensor_id,
 89                id=meas_id,
 90            )
 91            measurements.append(meas)
 92            meas_id += 1
 93
 94    return MeasurementSet(measurements)
 95
 96
 97def demo_track_list():
 98    """Demonstrate TrackList container operations."""
 99    print("=" * 70)
100    print("TrackList Container Demo")
101    print("=" * 70)
102
103    # Create sample tracks
104    tracks = create_sample_tracks(n_tracks=15)
105    print(f"\nCreated TrackList with {len(tracks)} tracks")
106
107    # Get statistics
108    stats = tracks.stats()
109    print("\nTrack Statistics:")
110    print(f"  Total tracks: {stats.n_tracks}")
111    print(f"  Confirmed: {stats.n_confirmed}")
112    print(f"  Tentative: {stats.n_tentative}")
113    print(f"  Deleted: {stats.n_deleted}")
114    print(f"  Mean hits: {stats.mean_hits:.1f}")
115    print(f"  Mean misses: {stats.mean_misses:.1f}")
116
117    # Filter by status
118    confirmed = tracks.filter_by_status(TrackStatus.CONFIRMED)
119    tentative = tracks.filter_by_status(TrackStatus.TENTATIVE)
120    print("\nFiltered by status:")
121    print(f"  Confirmed tracks: {len(confirmed)}")
122    print(f"  Tentative tracks: {len(tentative)}")
123
124    # Shortcut properties
125    print(f"  Using .confirmed property: {len(tracks.confirmed)}")
126    print(f"  Using .tentative property: {len(tracks.tentative)}")
127
128    # Filter by region (tracks near origin)
129    center = np.array([0.0, 0.0])
130    nearby = tracks.filter_by_region(center, radius=50.0, state_indices=(0, 2))
131    print(f"\nTracks within 50 units of origin: {len(nearby)}")
132
133    # Filter by time
134    recent = tracks.filter_by_time(min_time=12.0)
135    print(f"Tracks updated after t=12.0: {len(recent)}")
136
137    # Custom predicate filter
138    high_confidence = tracks.filter_by_predicate(lambda t: t.hits >= 10)
139    print(f"Tracks with 10+ hits: {len(high_confidence)}")
140
141    # Batch data extraction
142    if len(confirmed) > 0:
143        states = confirmed.states()
144        positions = confirmed.positions(indices=(0, 2))
145        print("\nBatch extraction from confirmed tracks:")
146        print(f"  States shape: {states.shape}")
147        print(f"  Positions shape: {positions.shape}")
148
149    # Access by ID
150    track_ids = tracks.track_ids
151    if track_ids:
152        track = tracks.get_by_id(track_ids[0])
153        print(f"\nTrack {track.id}:")
154        print(f"  Position: ({track.state[0]:.1f}, {track.state[2]:.1f})")
155        print(f"  Velocity: ({track.state[1]:.1f}, {track.state[3]:.1f})")
156        print(f"  Status: {track.status.name}")
157
158    # Immutable operations
159    new_track = Track(
160        id=100,
161        state=np.array([0, 0, 0, 0]),
162        covariance=np.eye(4),
163        status=TrackStatus.TENTATIVE,
164        hits=1,
165        misses=0,
166        time=15.0,
167    )
168    tracks_with_new = tracks.add(new_track)
169    print("\nAfter adding track:")
170    print(f"  Original TrackList: {len(tracks)} tracks")
171    print(f"  New TrackList: {len(tracks_with_new)} tracks")
172
173    # Merge two track lists
174    merged = confirmed.merge(tentative)
175    print(f"\nMerged confirmed + tentative: {len(merged)} tracks")
176
177
178def demo_measurement_set():
179    """Demonstrate MeasurementSet container operations."""
180    print("\n" + "=" * 70)
181    print("MeasurementSet Container Demo")
182    print("=" * 70)
183
184    # Create sample measurements
185    meas_set = create_sample_measurements(n_times=5, n_per_time=8)
186    print(f"\nCreated MeasurementSet with {len(meas_set)} measurements")
187
188    # Time properties
189    times = meas_set.times
190    time_range = meas_set.time_range
191    print("\nTime information:")
192    print(f"  Unique times: {times}")
193    print(f"  Time range: {time_range}")
194
195    # Query by time
196    at_t2 = meas_set.at_time(2.0)
197    print(f"\nMeasurements at t=2.0: {len(at_t2)}")
198
199    # Query time window
200    window = meas_set.in_time_window(1.0, 3.0)
201    print(f"Measurements in window [1.0, 3.0]: {len(window)}")
202
203    # Query by sensor
204    sensors = meas_set.sensors
205    print(f"\nSensors: {sensors}")
206    for sensor_id in sensors:
207        sensor_meas = meas_set.by_sensor(sensor_id)
208        print(f"  Sensor {sensor_id}: {len(sensor_meas)} measurements")
209
210    # Spatial queries
211    center = np.array([0.0, 0.0])
212    nearby = meas_set.in_region(center, radius=25.0)
213    print(f"\nMeasurements within 25 units of origin: {len(nearby)}")
214
215    # K-nearest neighbors
216    query_point = np.array([10.0, 10.0])
217    nearest = meas_set.nearest_to(query_point, k=3)
218    print("\n3 nearest measurements to (10, 10):")
219    for meas in nearest.measurements:
220        dist = np.linalg.norm(meas.value - query_point)
221        print(f"  ID {meas.id}: value={meas.value}, distance={dist:.2f}")
222
223    # Batch extraction
224    values = meas_set.values()
225    print("\nBatch extraction:")
226    print(f"  All values shape: {values.shape}")
227
228    values_at_t1 = meas_set.values_at_time(1.0)
229    print(f"  Values at t=1.0 shape: {values_at_t1.shape}")
230
231    # Create from arrays
232    new_values = np.random.randn(5, 2) * 10
233    new_times = np.array([10.0, 10.0, 10.1, 10.1, 10.2])
234    meas_from_arrays = MeasurementSet.from_arrays(new_values, new_times)
235    print(f"\nCreated from arrays: {len(meas_from_arrays)} measurements")
236
237
238def demo_cluster_set():
239    """Demonstrate ClusterSet container operations."""
240    print("\n" + "=" * 70)
241    print("ClusterSet Container Demo")
242    print("=" * 70)
243
244    # Create tracks with some spatial clustering
245    rng = np.random.default_rng(42)
246    tracks = []
247
248    # Cluster 1: tracks near (50, 50)
249    for i in range(5):
250        state = np.array(
251            [
252                50 + rng.normal(0, 3),  # x
253                2 + rng.normal(0, 0.5),  # vx
254                50 + rng.normal(0, 3),  # y
255                1 + rng.normal(0, 0.5),  # vy
256            ]
257        )
258        tracks.append(
259            Track(
260                id=i,
261                state=state,
262                covariance=np.eye(4),
263                status=TrackStatus.CONFIRMED,
264                hits=10,
265                misses=0,
266                time=0.0,
267            )
268        )
269
270    # Cluster 2: tracks near (-30, -30)
271    for i in range(4):
272        state = np.array(
273            [
274                -30 + rng.normal(0, 3),
275                -1 + rng.normal(0, 0.5),
276                -30 + rng.normal(0, 3),
277                2 + rng.normal(0, 0.5),
278            ]
279        )
280        tracks.append(
281            Track(
282                id=5 + i,
283                state=state,
284                covariance=np.eye(4),
285                status=TrackStatus.CONFIRMED,
286                hits=8,
287                misses=1,
288                time=0.0,
289            )
290        )
291
292    # Isolated tracks (noise)
293    for i in range(3):
294        state = np.array(
295            [
296                rng.uniform(-100, 100),
297                rng.uniform(-3, 3),
298                rng.uniform(-100, 100),
299                rng.uniform(-3, 3),
300            ]
301        )
302        tracks.append(
303            Track(
304                id=9 + i,
305                state=state,
306                covariance=np.eye(4),
307                status=TrackStatus.CONFIRMED,
308                hits=5,
309                misses=2,
310                time=0.0,
311            )
312        )
313
314    track_list = TrackList(tracks)
315    print(f"\nCreated {len(track_list)} tracks with 2 clusters + noise")
316
317    # DBSCAN clustering
318    print("\n--- DBSCAN Clustering ---")
319    clusters_dbscan = cluster_tracks_dbscan(
320        track_list,
321        eps=10.0,  # Max distance between neighbors
322        min_samples=3,  # Minimum cluster size
323        state_indices=(0, 2),  # Use x, y positions
324    )
325    print(f"Found {len(clusters_dbscan)} clusters")
326
327    for cluster in clusters_dbscan:
328        print(f"\n  Cluster {cluster.id}:")
329        print(f"    Track IDs: {cluster.track_ids}")
330        print(f"    Centroid: ({cluster.centroid[0]:.1f}, {cluster.centroid[1]:.1f})")
331        print(f"    Covariance diagonal: {np.diag(cluster.covariance)}")
332
333    # Cluster statistics
334    print("\n--- Cluster Statistics ---")
335    all_stats = clusters_dbscan.all_stats(
336        tracks=track_list,
337        state_indices=(0, 2),
338        velocity_indices=(1, 3),
339    )
340    for cluster_id, stats in all_stats.items():
341        print(f"\n  Cluster {cluster_id}:")
342        print(f"    Tracks: {stats.n_tracks}")
343        print(f"    Mean separation: {stats.mean_separation:.2f}")
344        print(f"    Max separation: {stats.max_separation:.2f}")
345        print(f"    Velocity coherence: {stats.velocity_coherence:.2f}")
346
347    # K-means clustering
348    print("\n--- K-Means Clustering ---")
349    clusters_kmeans = cluster_tracks_kmeans(
350        track_list,
351        n_clusters=3,
352        state_indices=(0, 2),
353        rng=np.random.default_rng(42),
354    )
355    print(f"Created {len(clusters_kmeans)} clusters")
356
357    for cluster in clusters_kmeans:
358        print(
359            f"  Cluster {cluster.id}: {len(cluster.track_ids)} tracks at "
360            f"({cluster.centroid[0]:.1f}, {cluster.centroid[1]:.1f})"
361        )
362
363    # Using ClusterSet.from_tracks factory
364    print("\n--- Factory Method ---")
365    clusters = ClusterSet.from_tracks(
366        track_list,
367        method="dbscan",
368        eps=10.0,
369        min_samples=2,
370    )
371    print(f"Created ClusterSet with {len(clusters)} clusters")
372
373    # Spatial query on clusters
374    center = np.array([50.0, 50.0])
375    nearby_clusters = clusters.clusters_in_region(center, radius=30.0)
376    print(f"\nClusters within 30 units of (50, 50): {len(nearby_clusters)}")
377
378    # Track to cluster lookup
379    if len(clusters) > 0:
380        track_id = 0
381        cluster = clusters.get_cluster_for_track(track_id)
382        if cluster:
383            print(f"Track {track_id} belongs to cluster {cluster.id}")
384
385    # Cluster manipulation (immutable)
386    if len(clusters) >= 2:
387        cluster_ids = clusters.cluster_ids
388        merged = clusters.merge_clusters(cluster_ids[0], cluster_ids[1])
389        print(f"\nAfter merging clusters {cluster_ids[0]} and {cluster_ids[1]}:")
390        print(f"  Original: {len(clusters)} clusters")
391        print(f"  After merge: {len(merged)} clusters")
392
393
394def demo_integration():
395    """Demonstrate integration between containers."""
396    print("\n" + "=" * 70)
397    print("Container Integration Demo")
398    print("=" * 70)
399
400    # Create tracks and measurements
401    tracks = create_sample_tracks(n_tracks=20)
402    measurements = create_sample_measurements(n_times=10, n_per_time=15)
403
404    print(f"\nDataset: {len(tracks)} tracks, {len(measurements)} measurements")
405
406    # Filter to confirmed tracks
407    confirmed = tracks.confirmed
408    print(f"\nConfirmed tracks: {len(confirmed)}")
409
410    # For each confirmed track, find nearby measurements
411    print("\nMatching tracks to nearby measurements:")
412    for track in list(confirmed)[:3]:  # Show first 3
413        pos = track.state[[0, 2]]  # x, y position
414        nearby_meas = measurements.in_region(pos, radius=20.0)
415        print(
416            f"  Track {track.id} at ({pos[0]:.1f}, {pos[1]:.1f}): "
417            f"{len(nearby_meas)} nearby measurements"
418        )
419
420    # Cluster confirmed tracks
421    if len(confirmed) >= 3:
422        clusters = ClusterSet.from_tracks(
423            confirmed,
424            method="dbscan",
425            eps=50.0,
426            min_samples=2,
427        )
428        print(f"\nClustered confirmed tracks: {len(clusters)} formations")
429
430        # For each cluster, find measurements near centroid
431        for cluster in clusters:
432            nearby = measurements.in_region(cluster.centroid, radius=30.0)
433            print(
434                f"  Cluster {cluster.id} ({len(cluster.track_ids)} tracks): "
435                f"{len(nearby)} measurements near centroid"
436            )
437
438    # Time-synchronized analysis
439    print("\n--- Time-Synchronized Analysis ---")
440    for t in [0.0, 2.0, 4.0]:
441        meas_at_t = measurements.at_time(t)
442        tracks_at_t = tracks.filter_by_time(max_time=t + 1.0)
443        print(
444            f"  t={t}: {len(meas_at_t)} measurements, "
445            f"{len(tracks_at_t)} tracks updated before t={t + 1}"
446        )
447
448
449def main():
450    """Run all demonstrations."""
451    print("\n" + "#" * 70)
452    print("# PyTCL Tracking Containers Example")
453    print("#" * 70)
454
455    demo_track_list()
456    demo_measurement_set()
457    demo_cluster_set()
458    demo_integration()
459
460    # Visualization
461    visualize_track_distribution()
462
463    print("\n" + "=" * 70)
464    print("Example complete!")
465    print("=" * 70)
466
467
468def visualize_track_distribution():
469    """Visualize track spatial distribution."""
470    print("\nGenerating track distribution visualization...")
471
472    # Create sample tracks
473    tracks = create_sample_tracks(n_tracks=15)
474
475    # Extract positions
476    positions = []
477    for track in tracks:
478        if track.state is not None:
479            # Assuming state is [x, vx, y, vy]
480            pos = track.state[[0, 2]]
481            positions.append(pos)
482
483    if positions:
484        positions = np.array(positions)
485
486        # Create scatter plot
487        fig = go.Figure()
488
489        fig.add_trace(
490            go.Scatter(
491                x=positions[:, 0],
492                y=positions[:, 1],
493                mode="markers+text",
494                text=[f"T{i}" for i in range(len(positions))],
495                marker=dict(size=10, color="blue", opacity=0.7),
496                textposition="top center",
497                name="Track Positions",
498            )
499        )
500
501        fig.update_layout(
502            title="Track Spatial Distribution",
503            xaxis_title="X Position (m)",
504            yaxis_title="Y Position (m)",
505            height=600,
506            width=700,
507            showlegend=False,
508        )
509
510        fig.show()
511
512
513if __name__ == "__main__":
514    main()

Running the Example

python examples/tracking_containers.py

See Also