Signal Processing Fundamentals

Overview

The Tracker Component Library provides radar detection, signal filtering, FFT, and wavelet functionality for processing sensor data.

Key Modules:

  • signal_processing.detection - CFAR algorithm (Constant False Alarm Rate)

  • signal_processing.filtering - FIR/IIR digital filters

  • signal_processing.optimal - Matched filtering, likelihood ratio tests

  • signal_processing.transforms - FFT, wavelets, spectral analysis

CFAR Detection (Constant False Alarm Rate)

Why CFAR?

Raw radar returns contain both signal and noise. CFAR adaptively sets detection threshold based on local clutter level to maintain constant false alarm probability.

Raw Signal
^
|     ┌─┐
|     │ │  ┌─ Detection threshold
|   ┌─┘ └──┐ (adaptive)
| ┌─┘      └─┐
|_┴─────────┴─────→ Range

Green: Signal + Noise
Red line: CFAR adaptive threshold

1D CFAR (along range/time axis)

from pytcl.signal_processing.detection import cfar_1d
import numpy as np

# Simulated radar return
signal = np.zeros(1000)
signal[500:510] = 10.0  # Target return
noise = np.random.randn(1000) * 1.0
radar_return = signal + noise

# CFAR detection
detections, threshold = cfar_1d(
    radar_return,
    window_size=32,      # Training window size
    guard_size=4,        # Guard band around test cell
    pfa=1e-6,            # Probability of false alarm
    method='cagoesulien'  # Algorithm variant
)

# Detections is binary array (1 = detection, 0 = no detection)
detection_indices = np.where(detections)[0]
print(f"Detections at indices: {detection_indices}")

# Plot
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 4))
plt.plot(radar_return, label='Signal+Noise', alpha=0.7)
plt.plot(threshold, label='CFAR Threshold', color='red')
plt.scatter(detection_indices, radar_return[detection_indices],
            color='green', s=50, label='Detections')
plt.legend()
plt.show()

CFAR Methods

Different algorithms for different clutter types:

# CA-CFAR (Cell Averaging): Simple, works well in uniform clutter
detections, th = cfar_1d(radar_data, method='cell_averaging')

# OS-CFAR (Order Statistics): Robust to multiple targets
detections, th = cfar_1d(radar_data, method='order_statistics', rank=4)

# SO-CFAR (Smallest Of): Conservative (few false alarms)
detections, th = cfar_1d(radar_data, method='smallest_of')

# GO-CFAR (Greatest Of): Aggressive (more detections)
detections, th = cfar_1d(radar_data, method='greatest_of')

2D CFAR (range × Doppler)

from pytcl.signal_processing.detection import cfar_2d

# Simulated range-Doppler map
# Dimensions: [n_range, n_doppler]
range_doppler = np.random.randn(256, 128) * 1.0  # Background clutter

# Add target
range_doppler[100:110, 50:55] += 5.0

# 2D CFAR
detections_2d, threshold_2d = cfar_2d(
    range_doppler,
    window_size=(32, 32),
    guard_size=(4, 4),
    pfa=1e-6
)

# Plot
plt.figure(figsize=(10, 8))
plt.subplot(2, 1, 1)
plt.imshow(range_doppler, aspect='auto', origin='lower', cmap='viridis')
plt.title('Range-Doppler Map')
plt.colorbar()

plt.subplot(2, 1, 2)
plt.imshow(detections_2d, aspect='auto', origin='lower', cmap='RdYl')
plt.title('CFAR Detections')
plt.colorbar()
plt.show()

Probability of Detection vs False Alarm

from pytcl.signal_processing.detection import cfar_1d

def compute_roc_curve(signal_snr_db, noise_std, pfa_values):
    """Receiver Operating Characteristic curve"""

    # Create test signal
    signal = np.zeros(1000)
    signal[500:510] = 10 ** (signal_snr_db / 20) * noise_std  # Target at SNR

    detections_per_pfa = []

    for pfa in pfa_values:
        radar_return = signal + np.random.randn(1000) * noise_std
        detections, _ = cfar_1d(radar_return, pfa=pfa)
        pd = np.sum(detections[500:510] == 1) / 10  # Prob of detection
        detections_per_pfa.append(pd)

    return detections_per_pfa

# Compute ROC
snr_db = 6  # 6 dB SNR
pfa_vals = np.logspace(-8, -3, 10)
pd_list = compute_roc_curve(snr_db, 1.0, pfa_vals)

plt.semilogx(pfa_vals, pd_list, 'o-', label=f'SNR={snr_db} dB')
plt.xlabel('Probability of False Alarm')
plt.ylabel('Probability of Detection')
plt.grid(True)
plt.legend()
plt.show()

Matched Filtering

Why Matched Filters?

Maximize signal-to-noise ratio (SNR) for known signal shape. Optimal for white Gaussian noise.

from pytcl.signal_processing.optimal import matched_filter
import numpy as np

# Known transmit signal (e.g., chirp pulse)
duration = 1e-6  # 1 microsecond
fs = 1e6  # 1 MHz sampling
t = np.arange(0, duration, 1/fs)

# Chirp signal (frequency sweeps 1-10 MHz)
f_start = 1e6
f_end = 10e6
known_signal = np.exp(2j * np.pi * (f_start + (f_end-f_start) * t / duration) * t)

# Received signal = transmitted + target return + noise
delay = 2e-7  # 2-way propagation delay
target_return = 0.1 * np.roll(known_signal, int(delay * fs))  # Weak return
noise = (np.random.randn(len(known_signal)) + 1j * np.random.randn(len(known_signal))) * 0.01

received = target_return + noise

# Apply matched filter
filtered, peaks = matched_filter(received, known_signal)

print(f"SNR improvement: {20*np.log10(np.abs(peaks).max()):.1f} dB")

# Detect range
detection_index = np.argmax(np.abs(filtered))
estimated_distance = detection_index / fs * 3e8 / 2  # c/2 for round trip

print(f"Estimated range: {estimated_distance:.1f} m")

Pulse Compression (matched filtering for radar pulses)

def pulse_compression_example():
    """Demonstrate range resolution improvement via pulse compression"""

    # LFM chirp: linear frequency modulation (common in modern radars)
    from pytcl.signal_processing.optimal import matched_filter

    # Parameters
    pulse_duration = 10e-6  # 10 μs
    bandwidth = 100e6       # 100 MHz
    fs = 1e9               # 1 GHz sampling
    c = 3e8                # Speed of light

    # Uncompressed range resolution
    tau_uncompressed = pulse_duration
    res_uncompressed = c * tau_uncompressed / 2

    # Compressed range resolution (determined by bandwidth)
    res_compressed = c / (2 * bandwidth)

    print(f"Uncompressed resolution: {res_uncompressed*100:.1f} m")
    print(f"Compressed resolution: {res_compressed*100:.3f} m")
    print(f"Improvement factor: {tau_uncompressed / (1/bandwidth):.0f}x")

Digital Filtering

FIR (Finite Impulse Response) Filter Design

from pytcl.signal_processing.filtering import design_fir_filter

# Design a low-pass FIR filter
fsample = 1000  # Sampling frequency (Hz)
fpass = 100     # Passband edge (Hz)
fstop = 150     # Stopband edge (Hz)
gpass = 1       # Passband ripple (dB)
gstop = 60      # Stopband attenuation (dB)

b = design_fir_filter(
    order=100,          # Filter order (number of taps)
    frequencies=[fpass, fstop],
    gains=[1, 0],
    fs=fsample
)

print(f"FIR coefficient length: {len(b)}")

# Apply filter to data
from pytcl.signal_processing.filtering import apply_filter

# Generate test signal: 50 Hz + 200 Hz components
t = np.linspace(0, 1, fsample)
signal = np.sin(2*np.pi*50*t) + 0.5*np.sin(2*np.pi*200*t)

# Filter
filtered = apply_filter(b, signal)

# Plot
plt.figure(figsize=(12, 4))
plt.plot(t, signal, label='Original (50+200 Hz)', alpha=0.7)
plt.plot(t, filtered, label='Filtered (50 Hz LPF)', linewidth=2)
plt.xlim([0, 0.1])
plt.legend()
plt.grid(True)
plt.show()

IIR (Infinite Impulse Response) Filter Design

from pytcl.signal_processing.filtering import (
    design_butterworth, apply_filter_iir
)

# Butterworth filter (maximally flat response)
fsample = 1000
fpass = 100   # Hz
order = 4

b, a = design_butterworth(
    order=order,
    cutoff_frequency=fpass,
    fs=fsample,
    filter_type='lowpass'
)

print(f"Numerator coefficients: {b}")
print(f"Denominator coefficients: {a}")

# Apply IIR filter
signal = np.sin(2*np.pi*50*t) + 0.5*np.sin(2*np.pi*200*t)
filtered = apply_filter_iir(b, a, signal)

FIR vs IIR Comparison

Property              FIR              IIR

Stability            Always stable    Can be unstable (careful design)
Order                Higher           Lower (for same stopband atten.)
Delay                Linear phase     Nonlinear phase
Group Delay          Constant         Varying
Real-time            More samples     Fewer computations
Numerical            Better           Prone to roundoff errors
Implementation       Simple           More complex

Use FIR for:      Low latency, linear phase, when order not critical
Use IIR for:      Low computation cost, tight resource constraints

```

FFT and Spectral Analysis

FFT for Radar Processing

from pytcl.signal_processing.transforms import fft_1d, fft_2d
import numpy as np

# 1D FFT: Time to frequency domain
signal = np.sin(2*np.pi*10*t) + 0.5*np.sin(2*np.pi*25*t) + noise

X = fft_1d(signal)
magnitude = np.abs(X)

# Plot magnitude spectrum
freqs = np.linspace(0, fsample/2, len(X)//2)
plt.plot(freqs, magnitude[:len(X)//2])
plt.xlabel('Frequency (Hz)')
plt.ylabel('Magnitude')
plt.show()

Doppler Processing (FFT along time dimension)

def doppler_processing(range_time_matrix, prf, c=3e8):
    """
    Extract Doppler velocity from radar returns

    Args:
        range_time_matrix: (n_range, n_pulses) complex radar data
        prf: Pulse repetition frequency (Hz)
        c: Speed of light

    Returns:
        range_doppler: (n_range, n_doppler) magnitude spectrum
    """
    # FFT along time dimension (pulses)
    range_doppler = fft_1d(range_time_matrix, axis=1)

    # Get magnitude
    magnitude = np.abs(range_doppler)

    # Create velocity axis
    n_doppler = range_doppler.shape[1]
    doppler_freqs = np.fft.fftfreq(n_doppler, 1/prf)

    # Convert to velocity
    # v = (f_doppler * c) / (2 * fc)
    # For simplicity, use proportional to Doppler frequency

    return magnitude

Time-Frequency Analysis (when target Doppler changes with time)

from pytcl.signal_processing.transforms import spectrogram

# Signal with changing frequency (like moving target)
t = np.linspace(0, 1, 1000)
# Frequency increases from 10 to 50 Hz
freq_t = 10 + 40 * t
signal = np.sin(2*np.pi*np.cumsum(freq_t)/1000)

# Spectrogram (FFT in windows)
T, F, Sxx = spectrogram(
    signal,
    fs=1000,
    nperseg=256,  # Window length
    noverlap=128   # Overlap
)

# Plot
plt.pcolormesh(T, F, 10*np.log10(Sxx), shading='auto')
plt.ylabel('Frequency (Hz)')
plt.xlabel('Time (s)')
plt.colorbar(label='Power (dB)')
plt.show()

Wavelets (Time-Frequency Analysis)

Wavelet Transform for analyzing non-stationary signals

from pytcl.signal_processing.transforms import continuous_wavelet_transform

# Signal: sum of two transients
t = np.linspace(0, 1, 1000)
transient1 = np.exp(-20*(t-0.2)**2) * np.sin(2*np.pi*100*t)
transient2 = np.exp(-20*(t-0.8)**2) * np.sin(2*np.pi*200*t)
signal = transient1 + transient2

# Continuous wavelet transform
scales = np.arange(1, 128)
coefficients, frequencies = continuous_wavelet_transform(
    signal,
    scales,
    mother_wavelet='morlet',
    fs=1000
)

# Plot scalogram
plt.contourf(t, frequencies, np.abs(coefficients), 64)
plt.colorbar(label='|Coefficient|')
plt.ylabel('Frequency')
plt.xlabel('Time')
plt.title('Scalogram')
plt.show()

Signal Detection Workflow

Complete Radar Detection Pipeline

class RadarProcessor:
    """End-to-end radar signal processing"""

    def __init__(self, fs, prf, c=3e8):
        self.fs = fs
        self.prf = prf
        self.c = c
        self.ca_alarm_rate = 1e-5

    def process_frame(self, raw_iq_data):
        """
        Process one radar frame

        Args:
            raw_iq_data: (n_range, n_pulses) complex I/Q samples

        Returns:
            detections: List of detected targets
        """
        from pytcl.signal_processing.detection import cfar_2d

        # Step 1: Pulse compression (matched filtering)
        # (compute reference signal from transmit waveform)
        # compressed = matched_filter_all_ranges(raw_iq_data)

        # Step 2: Doppler processing (FFT)
        range_doppler = np.fft.fft(raw_iq_data, axis=1)
        magnitude = np.abs(range_doppler)

        # Step 3: CFAR detection
        detections_2d, threshold = cfar_2d(
            magnitude,
            window_size=(8, 8),
            guard_size=(2, 2),
            pfa=self.ca_alarm_rate
        )

        # Step 4: Extract target parameters
        detection_cells = np.argwhere(detections_2d)

        targets = []
        for range_idx, doppler_idx in detection_cells:
            # Range computation
            range_m = range_idx * self.c / (2 * self.fs)

            # Doppler frequency
            doppler_idx_shifted = doppler_idx if doppler_idx < magnitude.shape[1]/2 else doppler_idx - magnitude.shape[1]
            doppler_freq = doppler_idx_shifted * self.prf / magnitude.shape[1]

            # Relative velocity
            velocity = doppler_freq * self.c / (2 * 10e9)  # Assumes 10 GHz carrier

            # Signal-to-noise ratio
            snr = magnitude[range_idx, doppler_idx] / threshold[range_idx, doppler_idx]

            targets.append({
                'range': range_m,
                'velocity': velocity,
                'snr': 10*np.log10(snr),
                'range_cell': range_idx,
                'doppler_cell': doppler_idx
            })

        return targets

    def track_detections(self, detections_sequence):
        """Track detections across frames"""
        # Simple association: nearest neighbor
        tracks = []

        for detections in detections_sequence:
            # Associate with existing tracks
            # Update or create new track
            pass

        return tracks

Multi-Channel Processing

Beamforming (coherent combination of multiple antenna elements)

def phased_array_beamform(signal_matrix, angles):
    """
    Conventional beamforming (delay-and-sum)

    Args:
        signal_matrix: (n_channels, n_samples) received signals
        angles: Array of angles to beamform toward

    Returns:
        beam_output: Beamformed signals at each angle
    """
    n_channels = signal_matrix.shape[0]
    beam_output = []

    for angle in angles:
        # Phase shifts for each channel to steer beam
        # For uniform linear array: phase[ch] = 2π * ch * sin(angle) / wavelength

        phase_shifts = 2*np.pi * np.arange(n_channels) * np.sin(angle) / 1.0
        weights = np.exp(1j * phase_shifts)

        # Beamform: weighted sum across channels
        beam = np.dot(weights, signal_matrix)
        beam_output.append(beam)

    return np.array(beam_output)

Range Resolution and Ambiguity

Range Resolution = c × τ / 2, where τ is pulse duration

# Higher resolution requires shorter pulses
# But shorter pulses have:
# - Lower average power → worse SNR
# - Pulse compression solution: long pulse + matched filter

pulse_durations = [1e-6, 10e-6, 100e-6]  # 1, 10, 100 μs
c = 3e8

for tau in pulse_durations:
    resolution = c * tau / 2
    print(f"τ={tau*1e6:.0f}μs → Resolution = {resolution:.1f}m")

Doppler Ambiguity (max unambiguous Doppler)

# Max unambiguous Doppler velocity
# v_max = PRF * λ / 4

prf = 10e3      # 10 kHz
wavelength = 3e8 / 10e9  # 3 cm for 10 GHz carrier

v_max = prf * wavelength / 4
print(f"Max unambiguous velocity: ±{v_max:.1f} m/s")

Performance Considerations

Computation Cost

Operation               Complexity      Speed

1D CFAR                O(N)            Fast ~ms
2D CFAR                O(N×M)          Medium ~10ms
Matched filter         O(N²) →O(N)     Fast with FFT
FFT (N samples)         O(N log N)      Fast
Butterworth IIR        O(N)            Fastest
FIR filter             O(N × L)        Medium (L=order)
Wavelets               O(N log N)      Medium

Streaming Processing (on-line)

class StreamingCFARDetector:
    """CFAR detector for continuous streaming data"""

    def __init__(self, window_size=32, guard_size=4):
        self.buffer = np.zeros(window_size + 2*guard_size)
        self.window_size = window_size
        self.guard_size = guard_size

    def process_sample(self, sample):
        """Process one sample, return detection"""
        # Shift buffer
        self.buffer = np.roll(self.buffer, -1)
        self.buffer[-1] = np.abs(sample)

        # Compute local mean (training window)
        mean_value = (np.sum(self.buffer[:self.guard_size]) +
                     np.sum(self.buffer[-self.guard_size:])) / (2*self.guard_size)

        # Test center sample
        threshold = mean_value * 10  # Adjust threshold factor for desired PFA
        detection = self.buffer[self.guard_size + self.window_size//2] > threshold

        return detection

See Also