Tracking Containers
This example demonstrates the track, measurement, and cluster container data structures.
Overview
Efficient tracking systems require organized data structures:
TrackList: Collection of tracks with spatial queries
MeasurementSet: Organized measurement storage
Track state: Position, velocity, covariance, metadata
Key Concepts
Track ID management: Unique identifiers for each track
Temporal indexing: Accessing data by time step
Spatial queries: Finding tracks in a region
Track history: Storing past states for smoothing
Spatial Indexing: KD-trees enable efficient nearest-neighbor queries for track-to-measurement association.
Range Queries: R-trees support efficient rectangular range queries for gating operations.
Code Highlights
The example demonstrates:
Creating and populating TrackList containers
Adding tracks with state and covariance
Querying tracks by ID, time, or spatial region
Iterating over tracks for batch processing
Source Code
1"""
2Tracking Containers Example
3===========================
4
5This example demonstrates the tracking container classes in PyTCL:
6- TrackList: Collection of tracks with filtering and batch operations
7- MeasurementSet: Time-indexed measurements with spatial queries
8- ClusterSet: Track clustering for formation detection
9
10These containers provide efficient data management for multi-target tracking
11applications with immutable design patterns and lazy spatial indexing.
12"""
13
14import numpy as np
15import plotly.graph_objects as go
16from plotly.subplots import make_subplots
17
18from pytcl.containers import (
19 ClusterSet,
20 MeasurementSet,
21 TrackList,
22)
23from pytcl.containers.cluster_set import cluster_tracks_dbscan, cluster_tracks_kmeans
24from pytcl.containers.measurement_set import Measurement
25from pytcl.containers.track_list import Track, TrackStatus
26
27
def create_sample_tracks(n_tracks: int = 10, seed: int = 42) -> TrackList:
    """Build a reproducible TrackList of randomly generated 2D tracks.

    Every track carries a state laid out as [x, vx, y, vy], a diagonal
    covariance, hit/miss counters, a status derived from those counters,
    and a timestamp of 10.0 plus a uniform draw from [0, 5).

    Args:
        n_tracks: Number of tracks to generate; IDs are 0 .. n_tracks - 1.
        seed: Seed for the NumPy random generator.

    Returns:
        A TrackList wrapping the generated tracks.
    """
    rng = np.random.default_rng(seed)

    def _status_for(hit_count, miss_count):
        # Enough hits confirms a track no matter how many misses it has.
        if hit_count >= 5:
            return TrackStatus.CONFIRMED
        if miss_count >= 3:
            return TrackStatus.DELETED
        return TrackStatus.TENTATIVE

    def _random_track(track_id):
        # The order of the draws below defines the random stream for a given
        # seed, so it must not be rearranged.
        pos_x = rng.uniform(-100, 100)
        pos_y = rng.uniform(-100, 100)
        vel_x = rng.uniform(-5, 5)
        vel_y = rng.uniform(-5, 5)

        position_variance = rng.uniform(1, 5)
        velocity_variance = rng.uniform(0.1, 0.5)

        hit_count = rng.integers(1, 20)
        miss_count = rng.integers(0, 5)
        timestamp = 10.0 + rng.uniform(0, 5)

        return Track(
            id=track_id,
            # State layout: [x, vx, y, vy]
            state=np.array([pos_x, vel_x, pos_y, vel_y]),
            # Diagonal covariance in the same order as the state vector.
            covariance=np.diag(
                [
                    position_variance,
                    velocity_variance,
                    position_variance,
                    velocity_variance,
                ]
            ),
            status=_status_for(hit_count, miss_count),
            hits=hit_count,
            misses=miss_count,
            time=timestamp,
        )

    return TrackList([_random_track(track_id) for track_id in range(n_tracks)])
68
69
def create_sample_measurements(n_times: int = 5, n_per_time: int = 8, seed: int = 42):
    """Build a reproducible MeasurementSet spanning several time steps.

    Time steps are the floats 0.0, 1.0, ..., n_times - 1. At each step,
    n_per_time 2D position measurements are drawn uniformly from
    [-50, 50), each with an isotropic covariance and a sensor ID in {0, 1, 2}.

    Args:
        n_times: Number of time steps.
        n_per_time: Measurements generated per time step.
        seed: Seed for the NumPy random generator.

    Returns:
        A MeasurementSet holding all generated measurements; measurement IDs
        are consecutive integers starting at 0.
    """
    rng = np.random.default_rng(seed)

    collected = []
    for step in range(n_times):
        timestamp = float(step)
        for _ in range(n_per_time):
            # Draw order (value, covariance scale, sensor) fixes the stream.
            position = rng.uniform(-50, 50, size=2)
            noise_cov = np.eye(2) * rng.uniform(0.5, 2.0)
            sensor = rng.integers(0, 3)  # 3 sensors

            collected.append(
                Measurement(
                    value=position,
                    time=timestamp,
                    covariance=noise_cov,
                    sensor_id=sensor,
                    # The running count doubles as the next sequential ID.
                    id=len(collected),
                )
            )

    return MeasurementSet(collected)
95
96
def demo_track_list():
    """Walk through the TrackList API and print the result of each step.

    Covers summary statistics, status/region/time/predicate filters, batch
    state extraction, lookup by ID, and the non-mutating add/merge operations.
    """
    rule = "=" * 70
    print(rule)
    print("TrackList Container Demo")
    print(rule)

    # Sample data
    track_list = create_sample_tracks(n_tracks=15)
    print(f"\nCreated TrackList with {len(track_list)} tracks")

    # Summary statistics
    summary = track_list.stats()
    print("\nTrack Statistics:")
    print(f" Total tracks: {summary.n_tracks}")
    print(f" Confirmed: {summary.n_confirmed}")
    print(f" Tentative: {summary.n_tentative}")
    print(f" Deleted: {summary.n_deleted}")
    print(f" Mean hits: {summary.mean_hits:.1f}")
    print(f" Mean misses: {summary.mean_misses:.1f}")

    # Status filters, explicit call first and then the shortcut properties
    confirmed_tracks = track_list.filter_by_status(TrackStatus.CONFIRMED)
    tentative_tracks = track_list.filter_by_status(TrackStatus.TENTATIVE)
    print("\nFiltered by status:")
    print(f" Confirmed tracks: {len(confirmed_tracks)}")
    print(f" Tentative tracks: {len(tentative_tracks)}")
    print(f" Using .confirmed property: {len(track_list.confirmed)}")
    print(f" Using .tentative property: {len(track_list.tentative)}")

    # Region filter around the origin; indices (0, 2) select x and y
    origin = np.array([0.0, 0.0])
    near_origin = track_list.filter_by_region(
        origin, radius=50.0, state_indices=(0, 2)
    )
    print(f"\nTracks within 50 units of origin: {len(near_origin)}")

    # Time filter
    recently_updated = track_list.filter_by_time(min_time=12.0)
    print(f"Tracks updated after t=12.0: {len(recently_updated)}")

    # Arbitrary predicate filter
    def _has_ten_hits(candidate):
        return candidate.hits >= 10

    well_supported = track_list.filter_by_predicate(_has_ten_hits)
    print(f"Tracks with 10+ hits: {len(well_supported)}")

    # Batch extraction is only shown when there is something to extract
    if len(confirmed_tracks):
        state_matrix = confirmed_tracks.states()
        position_matrix = confirmed_tracks.positions(indices=(0, 2))
        print("\nBatch extraction from confirmed tracks:")
        print(f" States shape: {state_matrix.shape}")
        print(f" Positions shape: {position_matrix.shape}")

    # Lookup by ID, using the first ID the container reports
    known_ids = track_list.track_ids
    if known_ids:
        first = track_list.get_by_id(known_ids[0])
        print(f"\nTrack {first.id}:")
        print(f" Position: ({first.state[0]:.1f}, {first.state[2]:.1f})")
        print(f" Velocity: ({first.state[1]:.1f}, {first.state[3]:.1f})")
        print(f" Status: {first.status.name}")

    # add() returns a new list; both lengths are printed for comparison
    extra_track = Track(
        id=100,
        state=np.array([0, 0, 0, 0]),
        covariance=np.eye(4),
        status=TrackStatus.TENTATIVE,
        hits=1,
        misses=0,
        time=15.0,
    )
    extended = track_list.add(extra_track)
    print("\nAfter adding track:")
    print(f" Original TrackList: {len(track_list)} tracks")
    print(f" New TrackList: {len(extended)} tracks")

    # Merge two filtered lists
    combined = confirmed_tracks.merge(tentative_tracks)
    print(f"\nMerged confirmed + tentative: {len(combined)} tracks")
176
177
def demo_measurement_set():
    """Demonstrate MeasurementSet container operations.

    Prints the results of time, sensor, and spatial queries, batch value
    extraction, and construction of a MeasurementSet from raw arrays.
    """
    print("\n" + "=" * 70)
    print("MeasurementSet Container Demo")
    print("=" * 70)

    # Create sample measurements
    meas_set = create_sample_measurements(n_times=5, n_per_time=8)
    print(f"\nCreated MeasurementSet with {len(meas_set)} measurements")

    # Time properties
    times = meas_set.times
    time_range = meas_set.time_range
    print("\nTime information:")
    print(f" Unique times: {times}")
    print(f" Time range: {time_range}")

    # Query by time
    at_t2 = meas_set.at_time(2.0)
    print(f"\nMeasurements at t=2.0: {len(at_t2)}")

    # Query time window
    window = meas_set.in_time_window(1.0, 3.0)
    print(f"Measurements in window [1.0, 3.0]: {len(window)}")

    # Query by sensor
    sensors = meas_set.sensors
    print(f"\nSensors: {sensors}")
    for sensor_id in sensors:
        sensor_meas = meas_set.by_sensor(sensor_id)
        print(f" Sensor {sensor_id}: {len(sensor_meas)} measurements")

    # Spatial queries
    center = np.array([0.0, 0.0])
    nearby = meas_set.in_region(center, radius=25.0)
    print(f"\nMeasurements within 25 units of origin: {len(nearby)}")

    # K-nearest neighbors
    query_point = np.array([10.0, 10.0])
    nearest = meas_set.nearest_to(query_point, k=3)
    print("\n3 nearest measurements to (10, 10):")
    for meas in nearest.measurements:
        dist = np.linalg.norm(meas.value - query_point)
        print(f" ID {meas.id}: value={meas.value}, distance={dist:.2f}")

    # Batch extraction
    values = meas_set.values()
    print("\nBatch extraction:")
    print(f" All values shape: {values.shape}")

    values_at_t1 = meas_set.values_at_time(1.0)
    print(f" Values at t=1.0 shape: {values_at_t1.shape}")

    # Create from arrays. A seeded Generator is used instead of the legacy
    # global np.random.randn so that this data is reproducible, in line with
    # the seeded default_rng used by the other sample-data builders here.
    rng = np.random.default_rng(42)
    new_values = rng.standard_normal((5, 2)) * 10
    new_times = np.array([10.0, 10.0, 10.1, 10.1, 10.2])
    meas_from_arrays = MeasurementSet.from_arrays(new_values, new_times)
    print(f"\nCreated from arrays: {len(meas_from_arrays)} measurements")
236
237
def demo_cluster_set():
    """Walk through ClusterSet: DBSCAN, k-means, statistics, and queries.

    Builds twelve confirmed tracks (five scattered around (50, 50), four
    around (-30, -30), and three drawn uniformly as noise), clusters them in
    several ways, and prints the outcome of each step.
    """
    print("\n" + "=" * 70)
    print("ClusterSet Container Demo")
    print("=" * 70)

    rng = np.random.default_rng(42)

    def _scattered_state(mean_x, mean_vx, mean_y, mean_vy):
        # Elements are drawn in [x, vx, y, vy] order; positions use a
        # standard deviation of 3 and velocities of 0.5.
        return np.array(
            [
                mean_x + rng.normal(0, 3),
                mean_vx + rng.normal(0, 0.5),
                mean_y + rng.normal(0, 3),
                mean_vy + rng.normal(0, 0.5),
            ]
        )

    def _uniform_state():
        return np.array(
            [
                rng.uniform(-100, 100),
                rng.uniform(-3, 3),
                rng.uniform(-100, 100),
                rng.uniform(-3, 3),
            ]
        )

    def _confirmed_track(track_id, state, hit_count, miss_count):
        return Track(
            id=track_id,
            state=state,
            covariance=np.eye(4),
            status=TrackStatus.CONFIRMED,
            hits=hit_count,
            misses=miss_count,
            time=0.0,
        )

    members = []

    # Group 1: IDs 0-4 near (50, 50)
    for track_id in range(5):
        members.append(_confirmed_track(track_id, _scattered_state(50, 2, 50, 1), 10, 0))

    # Group 2: IDs 5-8 near (-30, -30)
    for track_id in range(5, 9):
        members.append(_confirmed_track(track_id, _scattered_state(-30, -1, -30, 2), 8, 1))

    # Noise: IDs 9-11 placed uniformly at random
    for track_id in range(9, 12):
        members.append(_confirmed_track(track_id, _uniform_state(), 5, 2))

    track_list = TrackList(members)
    print(f"\nCreated {len(track_list)} tracks with 2 clusters + noise")

    # --- DBSCAN on x/y positions ---
    print("\n--- DBSCAN Clustering ---")
    dbscan_clusters = cluster_tracks_dbscan(
        track_list,
        eps=10.0,  # Max distance between neighbors
        min_samples=3,  # Minimum cluster size
        state_indices=(0, 2),  # Use x, y positions
    )
    print(f"Found {len(dbscan_clusters)} clusters")

    for found in dbscan_clusters:
        centroid = found.centroid
        print(f"\n Cluster {found.id}:")
        print(f" Track IDs: {found.track_ids}")
        print(f" Centroid: ({centroid[0]:.1f}, {centroid[1]:.1f})")
        print(f" Covariance diagonal: {np.diag(found.covariance)}")

    # --- Per-cluster statistics ---
    print("\n--- Cluster Statistics ---")
    stats_by_cluster = dbscan_clusters.all_stats(
        tracks=track_list,
        state_indices=(0, 2),
        velocity_indices=(1, 3),
    )
    for cluster_id, cluster_stats in stats_by_cluster.items():
        print(f"\n Cluster {cluster_id}:")
        print(f" Tracks: {cluster_stats.n_tracks}")
        print(f" Mean separation: {cluster_stats.mean_separation:.2f}")
        print(f" Max separation: {cluster_stats.max_separation:.2f}")
        print(f" Velocity coherence: {cluster_stats.velocity_coherence:.2f}")

    # --- K-means with a seeded generator ---
    print("\n--- K-Means Clustering ---")
    kmeans_clusters = cluster_tracks_kmeans(
        track_list,
        n_clusters=3,
        state_indices=(0, 2),
        rng=np.random.default_rng(42),
    )
    print(f"Created {len(kmeans_clusters)} clusters")

    for found in kmeans_clusters:
        print(
            f" Cluster {found.id}: {len(found.track_ids)} tracks at "
            f"({found.centroid[0]:.1f}, {found.centroid[1]:.1f})"
        )

    # --- Factory constructor ---
    print("\n--- Factory Method ---")
    cluster_set = ClusterSet.from_tracks(
        track_list,
        method="dbscan",
        eps=10.0,
        min_samples=2,
    )
    print(f"Created ClusterSet with {len(cluster_set)} clusters")

    # Region query over clusters
    probe = np.array([50.0, 50.0])
    clusters_near_probe = cluster_set.clusters_in_region(probe, radius=30.0)
    print(f"\nClusters within 30 units of (50, 50): {len(clusters_near_probe)}")

    # Reverse lookup: which cluster holds track 0?
    if len(cluster_set) > 0:
        track_id = 0
        owner = cluster_set.get_cluster_for_track(track_id)
        if owner:
            print(f"Track {track_id} belongs to cluster {owner.id}")

    # Merging returns a new set; both sizes are printed for comparison
    if len(cluster_set) >= 2:
        ids = cluster_set.cluster_ids
        merged_set = cluster_set.merge_clusters(ids[0], ids[1])
        print(f"\nAfter merging clusters {ids[0]} and {ids[1]}:")
        print(f" Original: {len(cluster_set)} clusters")
        print(f" After merge: {len(merged_set)} clusters")
392
393
def demo_integration():
    """Show TrackList, MeasurementSet, and ClusterSet working together.

    Matches confirmed tracks to nearby measurements, clusters the confirmed
    tracks and queries measurements around each centroid, then compares
    measurement and track counts at a few time stamps.
    """
    print("\n" + "=" * 70)
    print("Container Integration Demo")
    print("=" * 70)

    # Sample data for both containers
    all_tracks = create_sample_tracks(n_tracks=20)
    all_measurements = create_sample_measurements(n_times=10, n_per_time=15)

    print(f"\nDataset: {len(all_tracks)} tracks, {len(all_measurements)} measurements")

    # Restrict to confirmed tracks
    confirmed_tracks = all_tracks.confirmed
    print(f"\nConfirmed tracks: {len(confirmed_tracks)}")

    # Track-to-measurement matching, limited to the first three tracks
    print("\nMatching tracks to nearby measurements:")
    for trk in list(confirmed_tracks)[:3]:
        xy = trk.state[[0, 2]]  # x, y position
        close_measurements = all_measurements.in_region(xy, radius=20.0)
        print(
            f" Track {trk.id} at ({xy[0]:.1f}, {xy[1]:.1f}): "
            f"{len(close_measurements)} nearby measurements"
        )

    # Clustering is only attempted with at least three confirmed tracks
    if len(confirmed_tracks) >= 3:
        formations = ClusterSet.from_tracks(
            confirmed_tracks,
            method="dbscan",
            eps=50.0,
            min_samples=2,
        )
        print(f"\nClustered confirmed tracks: {len(formations)} formations")

        # Measurements around each formation's centroid
        for formation in formations:
            around_centroid = all_measurements.in_region(
                formation.centroid, radius=30.0
            )
            print(
                f" Cluster {formation.id} ({len(formation.track_ids)} tracks): "
                f"{len(around_centroid)} measurements near centroid"
            )

    # NOTE(review): create_sample_tracks assigns times of 10.0 plus a draw
    # from [0, 5), so cut-offs of 1.0, 3.0, and 5.0 may match no tracks,
    # depending on how filter_by_time interprets max_time. Confirm intended.
    print("\n--- Time-Synchronized Analysis ---")
    for t in (0.0, 2.0, 4.0):
        measurements_now = all_measurements.at_time(t)
        tracks_so_far = all_tracks.filter_by_time(max_time=t + 1.0)
        print(
            f" t={t}: {len(measurements_now)} measurements, "
            f"{len(tracks_so_far)} tracks updated before t={t + 1}"
        )
447
448
def main():
    """Run each container demo in turn, then the visualization.

    An opening banner is printed before the demos and a closing banner after
    the visualization.
    """
    title_rule = "#" * 70
    print("\n" + title_rule)
    print("# PyTCL Tracking Containers Example")
    print(title_rule)

    # Demos run in the same fixed order every time.
    demos = (
        demo_track_list,
        demo_measurement_set,
        demo_cluster_set,
        demo_integration,
    )
    for run_demo in demos:
        run_demo()

    # Visualization
    visualize_track_distribution()

    closing_rule = "=" * 70
    print("\n" + closing_rule)
    print("Example complete!")
    print(closing_rule)
466
467
def visualize_track_distribution():
    """Plot the x/y positions of a set of sample tracks as a labeled scatter.

    Generates 15 sample tracks, takes the position components (indices 0 and
    2 of the [x, vx, y, vy] state), and shows a Plotly scatter plot with each
    marker labeled "T<track id>". Nothing is plotted if no track has a state.
    """
    print("\nGenerating track distribution visualization...")

    # Create sample tracks
    tracks = create_sample_tracks(n_tracks=15)

    # Collect the ID together with the position so that every label stays
    # attached to its own track. Labeling by list index would drift away from
    # the real IDs as soon as a track without a state is skipped.
    plotted = [
        (track.id, track.state[[0, 2]])
        for track in tracks
        if track.state is not None
    ]
    if not plotted:
        return

    labels = [f"T{track_id}" for track_id, _ in plotted]
    positions = np.array([xy for _, xy in plotted])

    # Create scatter plot
    fig = go.Figure()

    fig.add_trace(
        go.Scatter(
            x=positions[:, 0],
            y=positions[:, 1],
            mode="markers+text",
            text=labels,
            marker=dict(size=10, color="blue", opacity=0.7),
            textposition="top center",
            name="Track Positions",
        )
    )

    fig.update_layout(
        title="Track Spatial Distribution",
        xaxis_title="X Position (m)",
        yaxis_title="Y Position (m)",
        height=600,
        width=700,
        showlegend=False,
    )

    fig.show()
511
512
# Run the demonstrations only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Running the Example
python examples/tracking_containers.py
See Also
Multi-Target Tracking - Using containers in MTT
Spatial Data Structures - Spatial indexing