Source code for pytcl.containers.measurement_set

"""
Measurement set container.

This module provides a time-indexed collection of measurements
with spatial query support.
"""

from __future__ import annotations

from typing import Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union

import numpy as np
from numpy.typing import ArrayLike, NDArray

from pytcl.containers.kd_tree import KDTree


[docs] class Measurement(NamedTuple): """ Single measurement with metadata. Attributes ---------- value : ndarray Measurement vector. time : float Time of measurement. covariance : ndarray, optional Measurement covariance matrix. sensor_id : int ID of the sensor that produced this measurement. id : int Unique measurement identifier. """ value: NDArray[np.float64] time: float covariance: Optional[NDArray[np.float64]] = None sensor_id: int = 0 id: int = -1
[docs] class MeasurementQuery(NamedTuple): """ Result of a measurement set query. Attributes ---------- measurements : List[Measurement] List of measurements matching the query. indices : List[int] Original indices of the matching measurements. """ measurements: List[Measurement] indices: List[int]
[docs] class MeasurementSet: """ Collection of measurements with time and spatial indexing. Provides: - Time-windowed queries - Spatial region queries - Sensor filtering - Batch value extraction Parameters ---------- measurements : Iterable[Measurement], optional Initial measurements to add. Examples -------- >>> import numpy as np >>> # Create measurements >>> m1 = Measurement(value=np.array([1.0, 2.0]), time=0.0, id=0) >>> m2 = Measurement(value=np.array([3.0, 4.0]), time=0.0, id=1) >>> m3 = Measurement(value=np.array([5.0, 6.0]), time=1.0, id=2) >>> # Create measurement set >>> mset = MeasurementSet([m1, m2, m3]) >>> len(mset) 3 >>> # Filter by time >>> at_t0 = mset.at_time(0.0) >>> len(at_t0) 2 >>> # Get values >>> values = mset.values() >>> values.shape (3, 2) """
[docs] def __init__(self, measurements: Optional[Iterable[Measurement]] = None) -> None: """Initialize measurement set.""" if measurements is None: self._measurements: List[Measurement] = [] else: self._measurements = list(measurements) # Spatial index (built lazily) self._spatial_index: Optional[KDTree] = None self._index_valid: bool = False
[docs] @classmethod def from_arrays( cls, values: ArrayLike, times: ArrayLike, covariances: Optional[ArrayLike] = None, sensor_ids: Optional[ArrayLike] = None, ) -> MeasurementSet: """ Create MeasurementSet from arrays. Parameters ---------- values : array_like Array of shape (n_meas, meas_dim) containing measurement values. times : array_like Array of length n_meas containing measurement times. covariances : array_like, optional Array of shape (n_meas, meas_dim, meas_dim) containing covariances. sensor_ids : array_like, optional Array of length n_meas containing sensor IDs. Returns ------- MeasurementSet New MeasurementSet containing the measurements. """ values = np.asarray(values, dtype=np.float64) times = np.asarray(times, dtype=np.float64) n_meas = len(values) if covariances is not None: covariances = np.asarray(covariances, dtype=np.float64) if sensor_ids is not None: sensor_ids = np.asarray(sensor_ids, dtype=np.int64) else: sensor_ids = np.zeros(n_meas, dtype=np.int64) measurements = [] for i in range(n_meas): cov = covariances[i] if covariances is not None else None m = Measurement( value=values[i].copy(), time=float(times[i]), covariance=cov.copy() if cov is not None else None, sensor_id=int(sensor_ids[i]), id=i, ) measurements.append(m) return cls(measurements)
[docs] def __len__(self) -> int: """Return number of measurements.""" return len(self._measurements)
[docs] def __iter__(self) -> Iterator[Measurement]: """Iterate over measurements.""" return iter(self._measurements)
[docs] def __getitem__(self, idx: Union[int, slice]) -> Union[Measurement, MeasurementSet]: """ Get measurement by index or slice. Parameters ---------- idx : int or slice Index or slice to retrieve. Returns ------- Measurement or MeasurementSet Single measurement if int, MeasurementSet if slice. """ if isinstance(idx, int): return self._measurements[idx] else: return MeasurementSet(self._measurements[idx])
[docs] def __repr__(self) -> str: """String representation.""" return f"MeasurementSet(n_meas={len(self)})"
[docs] def at_time(self, time: float, tolerance: float = 1e-9) -> MeasurementSet: """ Get measurements at a specific time. Parameters ---------- time : float Time to query. tolerance : float, optional Tolerance for time matching (default: 1e-9). Returns ------- MeasurementSet Measurements at the specified time. """ measurements = [ m for m in self._measurements if abs(m.time - time) <= tolerance ] return MeasurementSet(measurements)
[docs] def in_time_window(self, start: float, end: float) -> MeasurementSet: """ Get measurements in a time window. Parameters ---------- start : float Start time (inclusive). end : float End time (inclusive). Returns ------- MeasurementSet Measurements within the time window. """ measurements = [m for m in self._measurements if start <= m.time <= end] return MeasurementSet(measurements)
[docs] def in_region(self, center: ArrayLike, radius: float) -> MeasurementSet: """ Get measurements within a spatial region. Parameters ---------- center : array_like Center point of the region. radius : float Radius of the region. Returns ------- MeasurementSet Measurements within the region. """ center = np.asarray(center, dtype=np.float64) measurements = [] for m in self._measurements: # Use only dimensions that match the center meas_val = m.value[: len(center)] dist = np.linalg.norm(meas_val - center) if dist <= radius: measurements.append(m) return MeasurementSet(measurements)
[docs] def by_sensor(self, sensor_id: int) -> MeasurementSet: """ Get measurements from a specific sensor. Parameters ---------- sensor_id : int Sensor ID to filter by. Returns ------- MeasurementSet Measurements from the specified sensor. """ measurements = [m for m in self._measurements if m.sensor_id == sensor_id] return MeasurementSet(measurements)
[docs] def nearest_to(self, point: ArrayLike, k: int = 1) -> MeasurementQuery: """ Find k nearest measurements to a point. Parameters ---------- point : array_like Query point. k : int, optional Number of nearest neighbors (default: 1). Returns ------- MeasurementQuery Query result with measurements and indices. Notes ----- This method builds a spatial index on first call if not already built. """ if len(self._measurements) == 0: return MeasurementQuery(measurements=[], indices=[]) self.build_spatial_index() point = np.asarray(point, dtype=np.float64).reshape(1, -1) result = self._spatial_index.query(point, k=min(k, len(self))) indices = result.indices[0].tolist() measurements = [self._measurements[i] for i in indices] return MeasurementQuery(measurements=measurements, indices=indices)
@property def times(self) -> NDArray[np.float64]: """Get unique measurement times.""" if len(self._measurements) == 0: return np.array([]) return np.unique([m.time for m in self._measurements]) @property def sensors(self) -> List[int]: """Get unique sensor IDs.""" return list(set(m.sensor_id for m in self._measurements)) @property def time_range(self) -> Tuple[float, float]: """ Get time range of measurements. Returns ------- tuple of float (min_time, max_time) or (0.0, 0.0) if empty. """ if len(self._measurements) == 0: return (0.0, 0.0) times = [m.time for m in self._measurements] return (min(times), max(times))
[docs] def values(self) -> NDArray[np.float64]: """ Extract all measurement values as array. Returns ------- ndarray Array of shape (n_meas, meas_dim). """ if len(self._measurements) == 0: return np.zeros((0, 0)) return np.array([m.value for m in self._measurements])
[docs] def values_at_time( self, time: float, tolerance: float = 1e-9 ) -> NDArray[np.float64]: """ Extract measurement values at a specific time. Parameters ---------- time : float Time to query. tolerance : float, optional Tolerance for time matching (default: 1e-9). Returns ------- ndarray Array of shape (n_meas_at_time, meas_dim). """ return self.at_time(time, tolerance).values()
[docs] def add(self, measurement: Measurement) -> MeasurementSet: """ Add a measurement and return a new MeasurementSet. Parameters ---------- measurement : Measurement Measurement to add. Returns ------- MeasurementSet New MeasurementSet with the measurement added. """ return MeasurementSet(self._measurements + [measurement])
[docs] def add_batch(self, measurements: Iterable[Measurement]) -> MeasurementSet: """ Add multiple measurements and return a new MeasurementSet. Parameters ---------- measurements : Iterable[Measurement] Measurements to add. Returns ------- MeasurementSet New MeasurementSet with the measurements added. """ return MeasurementSet(self._measurements + list(measurements))
[docs] def merge(self, other: MeasurementSet) -> MeasurementSet: """ Merge with another MeasurementSet. Parameters ---------- other : MeasurementSet MeasurementSet to merge with. Returns ------- MeasurementSet New MeasurementSet containing measurements from both. """ return MeasurementSet(list(self._measurements) + list(other._measurements))
[docs] def copy(self) -> MeasurementSet: """ Create a copy of this MeasurementSet. Returns ------- MeasurementSet A new MeasurementSet with the same measurements. """ return MeasurementSet(self._measurements)
[docs] def build_spatial_index(self) -> None: """ Build spatial index for efficient nearest neighbor queries. The index is built from measurement values. This is called automatically by nearest_to() if needed. """ if self._index_valid and self._spatial_index is not None: return if len(self._measurements) == 0: self._spatial_index = None self._index_valid = True return # Build KDTree from measurement values values = self.values() self._spatial_index = KDTree(values) self._index_valid = True
__all__ = [ "Measurement", "MeasurementSet", "MeasurementQuery", ]