Signal Processing

This example demonstrates digital filter design, matched filtering, and CFAR detection.

Overview

Signal processing fundamentals for radar and tracking:

  • Digital filters: FIR and IIR filter design

  • Matched filtering: Optimal detection in noise

  • CFAR detection: Constant False Alarm Rate processing

  • Spectral analysis: FFT, power spectrum, spectrograms

Digital Filters

FIR Filters
  • Finite Impulse Response

  • Linear phase available

  • Always stable

IIR Filters (Butterworth)
  • Infinite Impulse Response

  • Efficient implementation

  • Maximally flat passband

Matched Filtering

Matched filters maximize SNR for known waveforms:

  • Correlates signal with template

  • Optimal for white Gaussian noise

  • Pulse compression for radar

CFAR Detection

CFAR maintains constant false alarm rate:

  • CA-CFAR: Cell-averaging

  • GO-CFAR: Greatest-of

  • OS-CFAR: Ordered-statistic

  • Adaptive threshold estimation

Code Highlights

The example demonstrates:

  • Butterworth filter design with butter()

  • FIR filter design with firwin()

  • Matched filtering with matched_filter()

  • CA-CFAR with cfar_ca()

  • Power spectrum estimation

Source Code

  1"""
  2Signal Processing Example.
  3
  4This example demonstrates:
  51. Digital filter design (Butterworth, Chebyshev, FIR)
  62. Matched filtering for pulse detection
  73. CFAR (Constant False Alarm Rate) detection
  84. Power spectrum analysis
  9
 10Run with: python examples/signal_processing.py
 11"""
 12
 13import sys
 14from pathlib import Path
 15
 16sys.path.insert(0, str(Path(__file__).parent.parent))
 17
 18import numpy as np  # noqa: E402
 19import plotly.graph_objects as go  # noqa: E402
 20from plotly.subplots import make_subplots  # noqa: E402
 21
 22from pytcl.mathematical_functions.signal_processing import (  # noqa: E402
 23    apply_filter,
 24    butter_design,
 25    cfar_ca,
 26    cfar_go,
 27    cfar_os,
 28    cfar_so,
 29    cheby1_design,
 30    detection_probability,
 31    filtfilt,
 32    fir_design,
 33    frequency_response,
 34    matched_filter,
 35    pulse_compression,
 36    threshold_factor,
 37)
 38
 39
 40def filter_design_demo() -> None:
 41    """Demonstrate digital filter design."""
 42    print("=" * 60)
 43    print("1. DIGITAL FILTER DESIGN")
 44    print("=" * 60)
 45
 46    # Sample rate
 47    fs = 1000.0  # Hz
 48
 49    # Design Butterworth lowpass filter
 50    print("\nButterworth Lowpass Filter (order=4, cutoff=50 Hz):")
 51    butter_filt = butter_design(order=4, cutoff=50.0, fs=fs, btype="low")
 52    print(f"  Numerator coefficients (b): {butter_filt.b[:3]}...")
 53    print(f"  Denominator coefficients (a): {butter_filt.a[:3]}...")
 54
 55    # Design Chebyshev Type I filter
 56    print("\nChebyshev Type I Lowpass (order=4, ripple=1dB, cutoff=50 Hz):")
 57    cheby_filt = cheby1_design(order=4, ripple=1.0, cutoff=50.0, fs=fs, btype="low")
 58    print(f"  Numerator coefficients (b): {cheby_filt.b[:3]}...")
 59
 60    # Design FIR filter
 61    print("\nFIR Lowpass Filter (numtaps=51, cutoff=50 Hz):")
 62    fir_coeffs = fir_design(numtaps=51, cutoff=50.0, fs=fs, window="hamming")
 63    print(f"  Number of taps: {len(fir_coeffs)}")
 64    print(f"  Center tap value: {fir_coeffs[25]:.6f}")
 65
 66    # Compare frequency responses
 67    print("\nFrequency Response Comparison:")
 68    butter_resp = frequency_response(butter_filt.b, butter_filt.a, fs)
 69    cheby_resp = frequency_response(cheby_filt.b, cheby_filt.a, fs)
 70
 71    # Find -3dB point for Butterworth
 72    butter_mag_db = 20 * np.log10(np.maximum(butter_resp.magnitude, 1e-10))
 73    idx_3db = np.argmin(np.abs(butter_mag_db + 3))
 74    print(f"  Butterworth -3dB frequency: {butter_resp.frequencies[idx_3db]:.1f} Hz")
 75
 76    # Find -3dB point for Chebyshev
 77    cheby_mag_db = 20 * np.log10(np.maximum(cheby_resp.magnitude, 1e-10))
 78    idx_3db = np.argmin(np.abs(cheby_mag_db + 3))
 79    print(f"  Chebyshev -3dB frequency: {cheby_resp.frequencies[idx_3db]:.1f} Hz")
 80
 81
 82def filtering_demo() -> None:
 83    """Demonstrate signal filtering."""
 84    print("\n" + "=" * 60)
 85    print("2. SIGNAL FILTERING")
 86    print("=" * 60)
 87
 88    np.random.seed(42)
 89
 90    # Create a test signal: 10 Hz sine + 60 Hz noise
 91    fs = 1000.0
 92    t = np.linspace(0, 1, int(fs), endpoint=False)
 93    signal_clean = np.sin(2 * np.pi * 10 * t)  # 10 Hz signal
 94    noise = 0.5 * np.sin(2 * np.pi * 60 * t)  # 60 Hz noise
 95    signal_noisy = signal_clean + noise + 0.1 * np.random.randn(len(t))
 96
 97    print("\nTest signal: 10 Hz sine wave + 60 Hz interference + white noise")
 98    print(f"  Sample rate: {fs} Hz")
 99    print(f"  Duration: 1 second ({len(t)} samples)")
100
101    # Design 30 Hz lowpass filter to remove 60 Hz noise
102    filt = butter_design(order=4, cutoff=30.0, fs=fs, btype="low")
103
104    # Apply filter using different methods
105    print("\nFiltering methods:")
106
107    # Standard filter (has phase shift)
108    _filtered_standard = apply_filter(filt, signal_noisy)  # noqa: F841
109    print("  Standard filtering: introduces phase shift")
110
111    # Zero-phase filter (no phase shift)
112    filtered_zerophase = filtfilt(filt, signal_noisy)
113    print("  Zero-phase filtering: no phase shift (filtfilt)")
114
115    # Compute SNR improvement
116    noise_power_before = np.var(signal_noisy - signal_clean)
117    noise_power_after = np.var(filtered_zerophase - signal_clean)
118    snr_improvement = 10 * np.log10(noise_power_before / noise_power_after)
119    print(f"\n  SNR improvement: {snr_improvement:.1f} dB")
120
121    # RMS error
122    rms_before = np.sqrt(np.mean((signal_noisy - signal_clean) ** 2))
123    rms_after = np.sqrt(np.mean((filtered_zerophase - signal_clean) ** 2))
124    print(f"  RMS error before filtering: {rms_before:.4f}")
125    print(f"  RMS error after filtering:  {rms_after:.4f}")
126
127
128def matched_filter_demo() -> None:
129    """Demonstrate matched filtering for pulse detection."""
130    print("\n" + "=" * 60)
131    print("3. MATCHED FILTERING")
132    print("=" * 60)
133
134    np.random.seed(42)
135
136    # Create a chirp pulse
137    fs = 10000.0  # Sample rate
138    T = 0.01  # Pulse duration (10 ms)
139    f0 = 500  # Start frequency
140    f1 = 2000  # End frequency
141
142    t_pulse = np.linspace(0, T, int(T * fs), endpoint=False)
143    # Linear frequency sweep (chirp)
144    chirp = np.sin(2 * np.pi * (f0 + (f1 - f0) / (2 * T) * t_pulse) * t_pulse)
145
146    print("\nChirp pulse parameters:")
147    print(f"  Duration: {T * 1000:.1f} ms")
148    print(f"  Frequency sweep: {f0} Hz to {f1} Hz")
149    print(f"  Bandwidth: {f1 - f0} Hz")
150    print(f"  Time-bandwidth product: {(f1 - f0) * T:.1f}")
151
152    # Create received signal with delayed pulse in noise
153    n_samples = int(0.1 * fs)  # 100 ms of data
154    received = 0.5 * np.random.randn(n_samples)  # Noise
155
156    # Add pulse at known location with attenuation
157    pulse_location = 500
158    pulse_amplitude = 0.3
159    received[pulse_location : pulse_location + len(chirp)] += pulse_amplitude * chirp
160
161    # Matched filter
162    result = matched_filter(received, chirp, normalize=True)
163
164    print("\nMatched filter results:")
165    print(f"  True pulse location: sample {pulse_location}")
166    print(f"  Detected peak location: sample {result.peak_index}")
167    print(f"  Detection error: {abs(result.peak_index - pulse_location)} samples")
168    print(f"  Peak correlation value: {result.peak_value:.4f}")
169    print(f"  SNR gain: {result.snr_gain:.1f} dB")
170
171    # Pulse compression
172    pc_result = pulse_compression(received, chirp)
173    compressed_peak = np.argmax(np.abs(pc_result.output))
174    print("\nPulse compression:")
175    print(f"  Compressed pulse peak: sample {compressed_peak}")
176    print(f"  Compression ratio: {pc_result.compression_ratio:.0f}:1")
177
178
179def cfar_detection_demo() -> None:
180    """Demonstrate CFAR detection algorithms."""
181    print("\n" + "=" * 60)
182    print("4. CFAR DETECTION")
183    print("=" * 60)
184
185    np.random.seed(42)
186
187    # Create a range profile with targets
188    n_cells = 200
189    noise_power = 1.0
190    noise = np.sqrt(noise_power) * np.abs(np.random.randn(n_cells))
191
192    # Add targets at known locations
193    targets = [
194        (50, 15.0),  # Location, amplitude (strong target)
195        (100, 8.0),  # Medium target
196        (150, 5.0),  # Weak target
197    ]
198
199    signal = noise.copy()
200    for loc, amp in targets:
201        signal[loc] = amp
202
203    print("\nSimulated range profile:")
204    print(f"  {n_cells} range cells")
205    print(f"  Noise power: {noise_power:.1f}")
206    print(f"  Targets at cells: {[t[0] for t in targets]}")
207    print(f"  Target amplitudes: {[t[1] for t in targets]}")
208
209    # CFAR parameters
210    guard_cells = 2
211    ref_cells = 8
212    pfa = 1e-4  # Probability of false alarm
213
214    print("\nCFAR parameters:")
215    print(f"  Guard cells: {guard_cells}")
216    print(f"  Reference cells: {ref_cells}")
217    print(f"  Pfa: {pfa}")
218
219    # Compute threshold factor
220    alpha = threshold_factor(pfa, ref_cells, method="ca")
221    print(f"  Threshold factor (CA-CFAR): {alpha:.2f}")
222
223    # Run different CFAR algorithms
224    print("\nCFAR Detection Results:")
225    print("-" * 60)
226
227    # Cell-Averaging CFAR
228    ca_result = cfar_ca(signal, guard_cells, ref_cells, pfa)
229    ca_detections = ca_result.detection_indices
230    print("\nCA-CFAR (Cell-Averaging):")
231    print(f"  Detections: {ca_detections.tolist()}")
232    print(
233        f"  Targets detected: {len(set(ca_detections) & {t[0] for t in targets})}/{len(targets)}"
234    )
235
236    # Greatest-Of CFAR (good at clutter edges)
237    go_result = cfar_go(signal, guard_cells, ref_cells, pfa)
238    go_detections = go_result.detection_indices
239    print("\nGO-CFAR (Greatest-Of):")
240    print(f"  Detections: {go_detections.tolist()}")
241
242    # Smallest-Of CFAR (good in clutter)
243    so_result = cfar_so(signal, guard_cells, ref_cells, pfa)
244    so_detections = so_result.detection_indices
245    print("\nSO-CFAR (Smallest-Of):")
246    print(f"  Detections: {so_detections.tolist()}")
247
248    # Order-Statistic CFAR
249    k = int(0.75 * ref_cells)  # Use 75th percentile
250    os_result = cfar_os(signal, guard_cells, ref_cells, pfa, k)
251    os_detections = os_result.detection_indices
252    print(f"\nOS-CFAR (Order-Statistic, k={k}):")
253    print(f"  Detections: {os_detections.tolist()}")
254
255    # Detection probability analysis
256    print("\nDetection Probability vs SNR:")
257    print("-" * 40)
258    snr_values = [5, 10, 15, 20]
259    for snr in snr_values:
260        pd = detection_probability(snr, pfa, ref_cells, method="ca")
261        print(f"  SNR = {snr:2d} dB: Pd = {pd:.4f}")
262
263
264def spectrum_analysis_demo() -> None:
265    """Demonstrate power spectrum analysis."""
266    print("\n" + "=" * 60)
267    print("5. SPECTRUM ANALYSIS")
268    print("=" * 60)
269
270    np.random.seed(42)
271
272    # Create a multi-tone signal
273    fs = 1000.0
274    t = np.linspace(0, 1, int(fs), endpoint=False)
275
276    # Signal with known frequency components
277    frequencies = [50, 120, 200]  # Hz
278    amplitudes = [1.0, 0.5, 0.3]
279
280    signal = np.zeros_like(t)
281    for f, a in zip(frequencies, amplitudes):
282        signal += a * np.sin(2 * np.pi * f * t)
283
284    # Add noise
285    signal += 0.2 * np.random.randn(len(t))
286
287    print("\nTest signal:")
288    print(f"  Sample rate: {fs} Hz")
289    print("  Duration: 1 second")
290    print(f"  Frequency components: {frequencies} Hz")
291    print(f"  Amplitudes: {amplitudes}")
292
293    # Compute power spectrum using FFT
294    from pytcl.mathematical_functions.transforms import power_spectrum
295
296    ps = power_spectrum(signal, fs, window="hann", nperseg=256)
297
298    print("\nPower spectrum analysis:")
299    print(f"  Frequency resolution: {fs / 256:.2f} Hz")
300
301    # Find peaks in spectrum
302    psd_db = 10 * np.log10(np.maximum(ps.psd, 1e-10))
303    peak_threshold = np.max(psd_db) - 20  # Peaks within 20 dB of max
304
305    print(f"\nDetected frequency peaks (>{peak_threshold:.1f} dB):")
306    for i in range(1, len(psd_db) - 1):
307        if psd_db[i] > psd_db[i - 1] and psd_db[i] > psd_db[i + 1]:
308            if psd_db[i] > peak_threshold:
309                print(f"  {ps.frequencies[i]:.1f} Hz: {psd_db[i]:.1f} dB")
310
311
312def main() -> None:
313    """Run signal processing demonstrations."""
314    print("\nSignal Processing Examples")
315    print("=" * 60)
316    print("Demonstrating pytcl signal processing capabilities")
317
318    filter_design_demo()
319    filtering_demo()
320    matched_filter_demo()
321    cfar_detection_demo()
322    spectrum_analysis_demo()
323
324    # Visualization
325    visualize_filter_response()
326
327    print("\n" + "=" * 60)
328    print("Done!")
329    print("=" * 60)
330
331
332def visualize_filter_response() -> None:
333    """Visualize digital filter frequency response."""
334    print("\nGenerating filter response visualization...")
335
336    fs = 1000.0
337    butter_filt = butter_design(order=4, cutoff=50.0, fs=fs, btype="low")
338    fir_filt_coeffs = fir_design(numtaps=64, cutoff=50.0, fs=fs)
339
340    # Compute magnitude responses
341    butter_resp = frequency_response(butter_filt, fs)
342    fir_resp = frequency_response(fir_filt_coeffs, fs)
343
344    fig = make_subplots(specs=[[{"secondary_y": False}]])
345
346    fig.add_trace(
347        go.Scatter(
348            x=butter_resp.frequencies,
349            y=20 * np.log10(np.maximum(butter_resp.magnitude, 1e-10)),
350            mode="lines",
351            name="Butterworth (4th order)",
352            line=dict(color="blue", width=2),
353        )
354    )
355
356    fig.add_trace(
357        go.Scatter(
358            x=fir_resp.frequencies,
359            y=20 * np.log10(np.maximum(fir_resp.magnitude, 1e-10)),
360            mode="lines",
361            name="FIR (order 64)",
362            line=dict(color="red", width=2, dash="dash"),
363        )
364    )
365
366    fig.update_layout(
367        title="Digital Filter Frequency Response Comparison",
368        xaxis_title="Frequency (Hz)",
369        yaxis_title="Magnitude (dB)",
370        height=500,
371        width=800,
372        hovermode="x unified",
373    )
374
375    fig.show()
376
377
378if __name__ == "__main__":
379    main()

Running the Example

python examples/signal_processing.py

See Also