Signal Processing Fundamentals
Overview
The Tracker Component Library provides radar detection, signal filtering, FFT, and wavelet functionality for processing sensor data.
Key Modules:
signal_processing.detection- CFAR algorithm (Constant False Alarm Rate)signal_processing.filtering- FIR/IIR digital filterssignal_processing.optimal- Matched filtering, likelihood ratio testssignal_processing.transforms- FFT, wavelets, spectral analysis
CFAR Detection (Constant False Alarm Rate)
Why CFAR?
Raw radar returns contain both signal and noise. CFAR adaptively sets detection threshold based on local clutter level to maintain constant false alarm probability.
Raw Signal
^
| ┌─┐
| │ │ ┌─ Detection threshold
| ┌─┘ └──┐ (adaptive)
| ┌─┘ └─┐
|_┴─────────┴─────→ Range
Green: Signal + Noise
Red line: CFAR adaptive threshold
1D CFAR (along range/time axis)
from pytcl.signal_processing.detection import cfar_1d
import numpy as np
# Simulated radar return
signal = np.zeros(1000)
signal[500:510] = 10.0 # Target return
noise = np.random.randn(1000) * 1.0
radar_return = signal + noise
# CFAR detection
detections, threshold = cfar_1d(
radar_return,
window_size=32, # Training window size
guard_size=4, # Guard band around test cell
pfa=1e-6, # Probability of false alarm
method='cagoesulien' # Algorithm variant
)
# Detections is binary array (1 = detection, 0 = no detection)
detection_indices = np.where(detections)[0]
print(f"Detections at indices: {detection_indices}")
# Plot
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 4))
plt.plot(radar_return, label='Signal+Noise', alpha=0.7)
plt.plot(threshold, label='CFAR Threshold', color='red')
plt.scatter(detection_indices, radar_return[detection_indices],
color='green', s=50, label='Detections')
plt.legend()
plt.show()
CFAR Methods
Different algorithms for different clutter types:
# CA-CFAR (Cell Averaging): Simple, works well in uniform clutter
detections, th = cfar_1d(radar_data, method='cell_averaging')
# OS-CFAR (Order Statistics): Robust to multiple targets
detections, th = cfar_1d(radar_data, method='order_statistics', rank=4)
# SO-CFAR (Smallest Of): Conservative (few false alarms)
detections, th = cfar_1d(radar_data, method='smallest_of')
# GO-CFAR (Greatest Of): Aggressive (more detections)
detections, th = cfar_1d(radar_data, method='greatest_of')
2D CFAR (range × Doppler)
from pytcl.signal_processing.detection import cfar_2d
# Simulated range-Doppler map
# Dimensions: [n_range, n_doppler]
range_doppler = np.random.randn(256, 128) * 1.0 # Background clutter
# Add target
range_doppler[100:110, 50:55] += 5.0
# 2D CFAR
detections_2d, threshold_2d = cfar_2d(
range_doppler,
window_size=(32, 32),
guard_size=(4, 4),
pfa=1e-6
)
# Plot
plt.figure(figsize=(10, 8))
plt.subplot(2, 1, 1)
plt.imshow(range_doppler, aspect='auto', origin='lower', cmap='viridis')
plt.title('Range-Doppler Map')
plt.colorbar()
plt.subplot(2, 1, 2)
plt.imshow(detections_2d, aspect='auto', origin='lower', cmap='RdYl')
plt.title('CFAR Detections')
plt.colorbar()
plt.show()
Probability of Detection vs False Alarm
from pytcl.signal_processing.detection import cfar_1d
def compute_roc_curve(signal_snr_db, noise_std, pfa_values):
"""Receiver Operating Characteristic curve"""
# Create test signal
signal = np.zeros(1000)
signal[500:510] = 10 ** (signal_snr_db / 20) * noise_std # Target at SNR
detections_per_pfa = []
for pfa in pfa_values:
radar_return = signal + np.random.randn(1000) * noise_std
detections, _ = cfar_1d(radar_return, pfa=pfa)
pd = np.sum(detections[500:510] == 1) / 10 # Prob of detection
detections_per_pfa.append(pd)
return detections_per_pfa
# Compute ROC
snr_db = 6 # 6 dB SNR
pfa_vals = np.logspace(-8, -3, 10)
pd_list = compute_roc_curve(snr_db, 1.0, pfa_vals)
plt.semilogx(pfa_vals, pd_list, 'o-', label=f'SNR={snr_db} dB')
plt.xlabel('Probability of False Alarm')
plt.ylabel('Probability of Detection')
plt.grid(True)
plt.legend()
plt.show()
Matched Filtering
Why Matched Filters?
Maximize signal-to-noise ratio (SNR) for known signal shape. Optimal for white Gaussian noise.
from pytcl.signal_processing.optimal import matched_filter
import numpy as np
# Known transmit signal (e.g., chirp pulse)
duration = 1e-6 # 1 microsecond
fs = 1e6 # 1 MHz sampling
t = np.arange(0, duration, 1/fs)
# Chirp signal (frequency sweeps 1-10 MHz)
f_start = 1e6
f_end = 10e6
known_signal = np.exp(2j * np.pi * (f_start + (f_end-f_start) * t / duration) * t)
# Received signal = transmitted + target return + noise
delay = 2e-7 # 2-way propagation delay
target_return = 0.1 * np.roll(known_signal, int(delay * fs)) # Weak return
noise = (np.random.randn(len(known_signal)) + 1j * np.random.randn(len(known_signal))) * 0.01
received = target_return + noise
# Apply matched filter
filtered, peaks = matched_filter(received, known_signal)
print(f"SNR improvement: {20*np.log10(np.abs(peaks).max()):.1f} dB")
# Detect range
detection_index = np.argmax(np.abs(filtered))
estimated_distance = detection_index / fs * 3e8 / 2 # c/2 for round trip
print(f"Estimated range: {estimated_distance:.1f} m")
Pulse Compression (matched filtering for radar pulses)
def pulse_compression_example():
"""Demonstrate range resolution improvement via pulse compression"""
# LFM chirp: linear frequency modulation (common in modern radars)
from pytcl.signal_processing.optimal import matched_filter
# Parameters
pulse_duration = 10e-6 # 10 μs
bandwidth = 100e6 # 100 MHz
fs = 1e9 # 1 GHz sampling
c = 3e8 # Speed of light
# Uncompressed range resolution
tau_uncompressed = pulse_duration
res_uncompressed = c * tau_uncompressed / 2
# Compressed range resolution (determined by bandwidth)
res_compressed = c / (2 * bandwidth)
print(f"Uncompressed resolution: {res_uncompressed*100:.1f} m")
print(f"Compressed resolution: {res_compressed*100:.3f} m")
print(f"Improvement factor: {tau_uncompressed / (1/bandwidth):.0f}x")
Digital Filtering
FIR (Finite Impulse Response) Filter Design
from pytcl.signal_processing.filtering import design_fir_filter
# Design a low-pass FIR filter
fsample = 1000 # Sampling frequency (Hz)
fpass = 100 # Passband edge (Hz)
fstop = 150 # Stopband edge (Hz)
gpass = 1 # Passband ripple (dB)
gstop = 60 # Stopband attenuation (dB)
b = design_fir_filter(
order=100, # Filter order (number of taps)
frequencies=[fpass, fstop],
gains=[1, 0],
fs=fsample
)
print(f"FIR coefficient length: {len(b)}")
# Apply filter to data
from pytcl.signal_processing.filtering import apply_filter
# Generate test signal: 50 Hz + 200 Hz components
t = np.linspace(0, 1, fsample)
signal = np.sin(2*np.pi*50*t) + 0.5*np.sin(2*np.pi*200*t)
# Filter
filtered = apply_filter(b, signal)
# Plot
plt.figure(figsize=(12, 4))
plt.plot(t, signal, label='Original (50+200 Hz)', alpha=0.7)
plt.plot(t, filtered, label='Filtered (50 Hz LPF)', linewidth=2)
plt.xlim([0, 0.1])
plt.legend()
plt.grid(True)
plt.show()
IIR (Infinite Impulse Response) Filter Design
from pytcl.signal_processing.filtering import (
design_butterworth, apply_filter_iir
)
# Butterworth filter (maximally flat response)
fsample = 1000
fpass = 100 # Hz
order = 4
b, a = design_butterworth(
order=order,
cutoff_frequency=fpass,
fs=fsample,
filter_type='lowpass'
)
print(f"Numerator coefficients: {b}")
print(f"Denominator coefficients: {a}")
# Apply IIR filter
signal = np.sin(2*np.pi*50*t) + 0.5*np.sin(2*np.pi*200*t)
filtered = apply_filter_iir(b, a, signal)
FIR vs IIR Comparison
Property FIR IIR
Stability Always stable Can be unstable (careful design)
Order Higher Lower (for same stopband atten.)
Delay Linear phase Nonlinear phase
Group Delay Constant Varying
Real-time More samples Fewer computations
Numerical Better Prone to roundoff errors
Implementation Simple More complex
Use FIR for: Low latency, linear phase, when order not critical
Use IIR for: Low computation cost, tight resource constraints
FFT and Spectral Analysis
FFT for Radar Processing
from pytcl.signal_processing.transforms import fft_1d, fft_2d
import numpy as np
# 1D FFT: Time to frequency domain
signal = np.sin(2*np.pi*10*t) + 0.5*np.sin(2*np.pi*25*t) + noise
X = fft_1d(signal)
magnitude = np.abs(X)
# Plot magnitude spectrum
freqs = np.linspace(0, fsample/2, len(X)//2)
plt.plot(freqs, magnitude[:len(X)//2])
plt.xlabel('Frequency (Hz)')
plt.ylabel('Magnitude')
plt.show()
Doppler Processing (FFT along time dimension)
def doppler_processing(range_time_matrix, prf, c=3e8):
"""
Extract Doppler velocity from radar returns
Args:
range_time_matrix: (n_range, n_pulses) complex radar data
prf: Pulse repetition frequency (Hz)
c: Speed of light
Returns:
range_doppler: (n_range, n_doppler) magnitude spectrum
"""
# FFT along time dimension (pulses)
range_doppler = fft_1d(range_time_matrix, axis=1)
# Get magnitude
magnitude = np.abs(range_doppler)
# Create velocity axis
n_doppler = range_doppler.shape[1]
doppler_freqs = np.fft.fftfreq(n_doppler, 1/prf)
# Convert to velocity
# v = (f_doppler * c) / (2 * fc)
# For simplicity, use proportional to Doppler frequency
return magnitude
Time-Frequency Analysis (when target Doppler changes with time)
from pytcl.signal_processing.transforms import spectrogram
# Signal with changing frequency (like moving target)
t = np.linspace(0, 1, 1000)
# Frequency increases from 10 to 50 Hz
freq_t = 10 + 40 * t
signal = np.sin(2*np.pi*np.cumsum(freq_t)/1000)
# Spectrogram (FFT in windows)
T, F, Sxx = spectrogram(
signal,
fs=1000,
nperseg=256, # Window length
noverlap=128 # Overlap
)
# Plot
plt.pcolormesh(T, F, 10*np.log10(Sxx), shading='auto')
plt.ylabel('Frequency (Hz)')
plt.xlabel('Time (s)')
plt.colorbar(label='Power (dB)')
plt.show()
Wavelets (Time-Frequency Analysis)
Wavelet Transform for analyzing non-stationary signals
from pytcl.signal_processing.transforms import continuous_wavelet_transform
# Signal: sum of two transients
t = np.linspace(0, 1, 1000)
transient1 = np.exp(-20*(t-0.2)**2) * np.sin(2*np.pi*100*t)
transient2 = np.exp(-20*(t-0.8)**2) * np.sin(2*np.pi*200*t)
signal = transient1 + transient2
# Continuous wavelet transform
scales = np.arange(1, 128)
coefficients, frequencies = continuous_wavelet_transform(
signal,
scales,
mother_wavelet='morlet',
fs=1000
)
# Plot scalogram
plt.contourf(t, frequencies, np.abs(coefficients), 64)
plt.colorbar(label='|Coefficient|')
plt.ylabel('Frequency')
plt.xlabel('Time')
plt.title('Scalogram')
plt.show()
Signal Detection Workflow
Complete Radar Detection Pipeline
class RadarProcessor:
"""End-to-end radar signal processing"""
def __init__(self, fs, prf, c=3e8):
self.fs = fs
self.prf = prf
self.c = c
self.ca_alarm_rate = 1e-5
def process_frame(self, raw_iq_data):
"""
Process one radar frame
Args:
raw_iq_data: (n_range, n_pulses) complex I/Q samples
Returns:
detections: List of detected targets
"""
from pytcl.signal_processing.detection import cfar_2d
# Step 1: Pulse compression (matched filtering)
# (compute reference signal from transmit waveform)
# compressed = matched_filter_all_ranges(raw_iq_data)
# Step 2: Doppler processing (FFT)
range_doppler = np.fft.fft(raw_iq_data, axis=1)
magnitude = np.abs(range_doppler)
# Step 3: CFAR detection
detections_2d, threshold = cfar_2d(
magnitude,
window_size=(8, 8),
guard_size=(2, 2),
pfa=self.ca_alarm_rate
)
# Step 4: Extract target parameters
detection_cells = np.argwhere(detections_2d)
targets = []
for range_idx, doppler_idx in detection_cells:
# Range computation
range_m = range_idx * self.c / (2 * self.fs)
# Doppler frequency
doppler_idx_shifted = doppler_idx if doppler_idx < magnitude.shape[1]/2 else doppler_idx - magnitude.shape[1]
doppler_freq = doppler_idx_shifted * self.prf / magnitude.shape[1]
# Relative velocity
velocity = doppler_freq * self.c / (2 * 10e9) # Assumes 10 GHz carrier
# Signal-to-noise ratio
snr = magnitude[range_idx, doppler_idx] / threshold[range_idx, doppler_idx]
targets.append({
'range': range_m,
'velocity': velocity,
'snr': 10*np.log10(snr),
'range_cell': range_idx,
'doppler_cell': doppler_idx
})
return targets
def track_detections(self, detections_sequence):
"""Track detections across frames"""
# Simple association: nearest neighbor
tracks = []
for detections in detections_sequence:
# Associate with existing tracks
# Update or create new track
pass
return tracks
Multi-Channel Processing
Beamforming (coherent combination of multiple antenna elements)
def phased_array_beamform(signal_matrix, angles):
"""
Conventional beamforming (delay-and-sum)
Args:
signal_matrix: (n_channels, n_samples) received signals
angles: Array of angles to beamform toward
Returns:
beam_output: Beamformed signals at each angle
"""
n_channels = signal_matrix.shape[0]
beam_output = []
for angle in angles:
# Phase shifts for each channel to steer beam
# For uniform linear array: phase[ch] = 2π * ch * sin(angle) / wavelength
phase_shifts = 2*np.pi * np.arange(n_channels) * np.sin(angle) / 1.0
weights = np.exp(1j * phase_shifts)
# Beamform: weighted sum across channels
beam = np.dot(weights, signal_matrix)
beam_output.append(beam)
return np.array(beam_output)
Range Resolution and Ambiguity
Range Resolution = c × τ / 2, where τ is pulse duration
# Higher resolution requires shorter pulses
# But shorter pulses have:
# - Lower average power → worse SNR
# - Pulse compression solution: long pulse + matched filter
pulse_durations = [1e-6, 10e-6, 100e-6] # 1, 10, 100 μs
c = 3e8
for tau in pulse_durations:
resolution = c * tau / 2
print(f"τ={tau*1e6:.0f}μs → Resolution = {resolution:.1f}m")
Doppler Ambiguity (max unambiguous Doppler)
# Max unambiguous Doppler velocity
# v_max = PRF * λ / 4
prf = 10e3 # 10 kHz
wavelength = 3e8 / 10e9 # 3 cm for 10 GHz carrier
v_max = prf * wavelength / 4
print(f"Max unambiguous velocity: ±{v_max:.1f} m/s")
Performance Considerations
Computation Cost
Operation Complexity Speed
1D CFAR O(N) Fast ~ms
2D CFAR O(N×M) Medium ~10ms
Matched filter O(N²) →O(N) Fast with FFT
FFT (N samples) O(N log N) Fast
Butterworth IIR O(N) Fastest
FIR filter O(N × L) Medium (L=order)
Wavelets O(N log N) Medium
Streaming Processing (on-line)
class StreamingCFARDetector:
"""CFAR detector for continuous streaming data"""
def __init__(self, window_size=32, guard_size=4):
self.buffer = np.zeros(window_size + 2*guard_size)
self.window_size = window_size
self.guard_size = guard_size
def process_sample(self, sample):
"""Process one sample, return detection"""
# Shift buffer
self.buffer = np.roll(self.buffer, -1)
self.buffer[-1] = np.abs(sample)
# Compute local mean (training window)
mean_value = (np.sum(self.buffer[:self.guard_size]) +
np.sum(self.buffer[-self.guard_size:])) / (2*self.guard_size)
# Test center sample
threshold = mean_value * 10 # Adjust threshold factor for desired PFA
detection = self.buffer[self.guard_size + self.window_size//2] > threshold
return detection
See Also
Library Architecture - Module organization
API Navigation Guide - Finding signal processing functions
Performance Optimization Guide - Optimization techniques
examples/signal_processing.py- Signal processing examples