Signal Processing
This example demonstrates digital filter design, matched filtering, and CFAR detection.
Overview
Signal processing fundamentals for radar and tracking:
Digital filters: FIR and IIR filter design
Matched filtering: Optimal detection in noise
CFAR detection: Constant False Alarm Rate processing
Spectral analysis: FFT, power spectrum, spectrograms
Digital Filters
- FIR Filters
Finite Impulse Response
Linear phase available
Always stable
- IIR Filters (Butterworth)
Infinite Impulse Response
Efficient implementation
Maximally flat passband
Matched Filtering
Matched filters maximize SNR for known waveforms:
Correlates signal with template
Optimal for white Gaussian noise
Pulse compression for radar
CFAR Detection
CFAR maintains constant false alarm rate:
CA-CFAR: Cell-averaging
GO-CFAR: Greatest-of
OS-CFAR: Ordered-statistic
Adaptive threshold estimation
Code Highlights
The example demonstrates:
Butterworth filter design with
butter()FIR filter design with
firwin()Matched filtering with
matched_filter()CA-CFAR with
cfar_ca()Power spectrum estimation
Source Code
1"""
2Signal Processing Example.
3
4This example demonstrates:
51. Digital filter design (Butterworth, Chebyshev, FIR)
62. Matched filtering for pulse detection
73. CFAR (Constant False Alarm Rate) detection
84. Power spectrum analysis
9
10Run with: python examples/signal_processing.py
11"""
12
13import sys
14from pathlib import Path
15
16sys.path.insert(0, str(Path(__file__).parent.parent))
17
18import numpy as np # noqa: E402
19import plotly.graph_objects as go # noqa: E402
20from plotly.subplots import make_subplots # noqa: E402
21
22from pytcl.mathematical_functions.signal_processing import ( # noqa: E402
23 apply_filter,
24 butter_design,
25 cfar_ca,
26 cfar_go,
27 cfar_os,
28 cfar_so,
29 cheby1_design,
30 detection_probability,
31 filtfilt,
32 fir_design,
33 frequency_response,
34 matched_filter,
35 pulse_compression,
36 threshold_factor,
37)
38
39
40def filter_design_demo() -> None:
41 """Demonstrate digital filter design."""
42 print("=" * 60)
43 print("1. DIGITAL FILTER DESIGN")
44 print("=" * 60)
45
46 # Sample rate
47 fs = 1000.0 # Hz
48
49 # Design Butterworth lowpass filter
50 print("\nButterworth Lowpass Filter (order=4, cutoff=50 Hz):")
51 butter_filt = butter_design(order=4, cutoff=50.0, fs=fs, btype="low")
52 print(f" Numerator coefficients (b): {butter_filt.b[:3]}...")
53 print(f" Denominator coefficients (a): {butter_filt.a[:3]}...")
54
55 # Design Chebyshev Type I filter
56 print("\nChebyshev Type I Lowpass (order=4, ripple=1dB, cutoff=50 Hz):")
57 cheby_filt = cheby1_design(order=4, ripple=1.0, cutoff=50.0, fs=fs, btype="low")
58 print(f" Numerator coefficients (b): {cheby_filt.b[:3]}...")
59
60 # Design FIR filter
61 print("\nFIR Lowpass Filter (numtaps=51, cutoff=50 Hz):")
62 fir_coeffs = fir_design(numtaps=51, cutoff=50.0, fs=fs, window="hamming")
63 print(f" Number of taps: {len(fir_coeffs)}")
64 print(f" Center tap value: {fir_coeffs[25]:.6f}")
65
66 # Compare frequency responses
67 print("\nFrequency Response Comparison:")
68 butter_resp = frequency_response(butter_filt.b, butter_filt.a, fs)
69 cheby_resp = frequency_response(cheby_filt.b, cheby_filt.a, fs)
70
71 # Find -3dB point for Butterworth
72 butter_mag_db = 20 * np.log10(np.maximum(butter_resp.magnitude, 1e-10))
73 idx_3db = np.argmin(np.abs(butter_mag_db + 3))
74 print(f" Butterworth -3dB frequency: {butter_resp.frequencies[idx_3db]:.1f} Hz")
75
76 # Find -3dB point for Chebyshev
77 cheby_mag_db = 20 * np.log10(np.maximum(cheby_resp.magnitude, 1e-10))
78 idx_3db = np.argmin(np.abs(cheby_mag_db + 3))
79 print(f" Chebyshev -3dB frequency: {cheby_resp.frequencies[idx_3db]:.1f} Hz")
80
81
82def filtering_demo() -> None:
83 """Demonstrate signal filtering."""
84 print("\n" + "=" * 60)
85 print("2. SIGNAL FILTERING")
86 print("=" * 60)
87
88 np.random.seed(42)
89
90 # Create a test signal: 10 Hz sine + 60 Hz noise
91 fs = 1000.0
92 t = np.linspace(0, 1, int(fs), endpoint=False)
93 signal_clean = np.sin(2 * np.pi * 10 * t) # 10 Hz signal
94 noise = 0.5 * np.sin(2 * np.pi * 60 * t) # 60 Hz noise
95 signal_noisy = signal_clean + noise + 0.1 * np.random.randn(len(t))
96
97 print("\nTest signal: 10 Hz sine wave + 60 Hz interference + white noise")
98 print(f" Sample rate: {fs} Hz")
99 print(f" Duration: 1 second ({len(t)} samples)")
100
101 # Design 30 Hz lowpass filter to remove 60 Hz noise
102 filt = butter_design(order=4, cutoff=30.0, fs=fs, btype="low")
103
104 # Apply filter using different methods
105 print("\nFiltering methods:")
106
107 # Standard filter (has phase shift)
108 _filtered_standard = apply_filter(filt, signal_noisy) # noqa: F841
109 print(" Standard filtering: introduces phase shift")
110
111 # Zero-phase filter (no phase shift)
112 filtered_zerophase = filtfilt(filt, signal_noisy)
113 print(" Zero-phase filtering: no phase shift (filtfilt)")
114
115 # Compute SNR improvement
116 noise_power_before = np.var(signal_noisy - signal_clean)
117 noise_power_after = np.var(filtered_zerophase - signal_clean)
118 snr_improvement = 10 * np.log10(noise_power_before / noise_power_after)
119 print(f"\n SNR improvement: {snr_improvement:.1f} dB")
120
121 # RMS error
122 rms_before = np.sqrt(np.mean((signal_noisy - signal_clean) ** 2))
123 rms_after = np.sqrt(np.mean((filtered_zerophase - signal_clean) ** 2))
124 print(f" RMS error before filtering: {rms_before:.4f}")
125 print(f" RMS error after filtering: {rms_after:.4f}")
126
127
128def matched_filter_demo() -> None:
129 """Demonstrate matched filtering for pulse detection."""
130 print("\n" + "=" * 60)
131 print("3. MATCHED FILTERING")
132 print("=" * 60)
133
134 np.random.seed(42)
135
136 # Create a chirp pulse
137 fs = 10000.0 # Sample rate
138 T = 0.01 # Pulse duration (10 ms)
139 f0 = 500 # Start frequency
140 f1 = 2000 # End frequency
141
142 t_pulse = np.linspace(0, T, int(T * fs), endpoint=False)
143 # Linear frequency sweep (chirp)
144 chirp = np.sin(2 * np.pi * (f0 + (f1 - f0) / (2 * T) * t_pulse) * t_pulse)
145
146 print("\nChirp pulse parameters:")
147 print(f" Duration: {T * 1000:.1f} ms")
148 print(f" Frequency sweep: {f0} Hz to {f1} Hz")
149 print(f" Bandwidth: {f1 - f0} Hz")
150 print(f" Time-bandwidth product: {(f1 - f0) * T:.1f}")
151
152 # Create received signal with delayed pulse in noise
153 n_samples = int(0.1 * fs) # 100 ms of data
154 received = 0.5 * np.random.randn(n_samples) # Noise
155
156 # Add pulse at known location with attenuation
157 pulse_location = 500
158 pulse_amplitude = 0.3
159 received[pulse_location : pulse_location + len(chirp)] += pulse_amplitude * chirp
160
161 # Matched filter
162 result = matched_filter(received, chirp, normalize=True)
163
164 print("\nMatched filter results:")
165 print(f" True pulse location: sample {pulse_location}")
166 print(f" Detected peak location: sample {result.peak_index}")
167 print(f" Detection error: {abs(result.peak_index - pulse_location)} samples")
168 print(f" Peak correlation value: {result.peak_value:.4f}")
169 print(f" SNR gain: {result.snr_gain:.1f} dB")
170
171 # Pulse compression
172 pc_result = pulse_compression(received, chirp)
173 compressed_peak = np.argmax(np.abs(pc_result.output))
174 print("\nPulse compression:")
175 print(f" Compressed pulse peak: sample {compressed_peak}")
176 print(f" Compression ratio: {pc_result.compression_ratio:.0f}:1")
177
178
179def cfar_detection_demo() -> None:
180 """Demonstrate CFAR detection algorithms."""
181 print("\n" + "=" * 60)
182 print("4. CFAR DETECTION")
183 print("=" * 60)
184
185 np.random.seed(42)
186
187 # Create a range profile with targets
188 n_cells = 200
189 noise_power = 1.0
190 noise = np.sqrt(noise_power) * np.abs(np.random.randn(n_cells))
191
192 # Add targets at known locations
193 targets = [
194 (50, 15.0), # Location, amplitude (strong target)
195 (100, 8.0), # Medium target
196 (150, 5.0), # Weak target
197 ]
198
199 signal = noise.copy()
200 for loc, amp in targets:
201 signal[loc] = amp
202
203 print("\nSimulated range profile:")
204 print(f" {n_cells} range cells")
205 print(f" Noise power: {noise_power:.1f}")
206 print(f" Targets at cells: {[t[0] for t in targets]}")
207 print(f" Target amplitudes: {[t[1] for t in targets]}")
208
209 # CFAR parameters
210 guard_cells = 2
211 ref_cells = 8
212 pfa = 1e-4 # Probability of false alarm
213
214 print("\nCFAR parameters:")
215 print(f" Guard cells: {guard_cells}")
216 print(f" Reference cells: {ref_cells}")
217 print(f" Pfa: {pfa}")
218
219 # Compute threshold factor
220 alpha = threshold_factor(pfa, ref_cells, method="ca")
221 print(f" Threshold factor (CA-CFAR): {alpha:.2f}")
222
223 # Run different CFAR algorithms
224 print("\nCFAR Detection Results:")
225 print("-" * 60)
226
227 # Cell-Averaging CFAR
228 ca_result = cfar_ca(signal, guard_cells, ref_cells, pfa)
229 ca_detections = ca_result.detection_indices
230 print("\nCA-CFAR (Cell-Averaging):")
231 print(f" Detections: {ca_detections.tolist()}")
232 print(
233 f" Targets detected: {len(set(ca_detections) & {t[0] for t in targets})}/{len(targets)}"
234 )
235
236 # Greatest-Of CFAR (good at clutter edges)
237 go_result = cfar_go(signal, guard_cells, ref_cells, pfa)
238 go_detections = go_result.detection_indices
239 print("\nGO-CFAR (Greatest-Of):")
240 print(f" Detections: {go_detections.tolist()}")
241
242 # Smallest-Of CFAR (good in clutter)
243 so_result = cfar_so(signal, guard_cells, ref_cells, pfa)
244 so_detections = so_result.detection_indices
245 print("\nSO-CFAR (Smallest-Of):")
246 print(f" Detections: {so_detections.tolist()}")
247
248 # Order-Statistic CFAR
249 k = int(0.75 * ref_cells) # Use 75th percentile
250 os_result = cfar_os(signal, guard_cells, ref_cells, pfa, k)
251 os_detections = os_result.detection_indices
252 print(f"\nOS-CFAR (Order-Statistic, k={k}):")
253 print(f" Detections: {os_detections.tolist()}")
254
255 # Detection probability analysis
256 print("\nDetection Probability vs SNR:")
257 print("-" * 40)
258 snr_values = [5, 10, 15, 20]
259 for snr in snr_values:
260 pd = detection_probability(snr, pfa, ref_cells, method="ca")
261 print(f" SNR = {snr:2d} dB: Pd = {pd:.4f}")
262
263
264def spectrum_analysis_demo() -> None:
265 """Demonstrate power spectrum analysis."""
266 print("\n" + "=" * 60)
267 print("5. SPECTRUM ANALYSIS")
268 print("=" * 60)
269
270 np.random.seed(42)
271
272 # Create a multi-tone signal
273 fs = 1000.0
274 t = np.linspace(0, 1, int(fs), endpoint=False)
275
276 # Signal with known frequency components
277 frequencies = [50, 120, 200] # Hz
278 amplitudes = [1.0, 0.5, 0.3]
279
280 signal = np.zeros_like(t)
281 for f, a in zip(frequencies, amplitudes):
282 signal += a * np.sin(2 * np.pi * f * t)
283
284 # Add noise
285 signal += 0.2 * np.random.randn(len(t))
286
287 print("\nTest signal:")
288 print(f" Sample rate: {fs} Hz")
289 print(" Duration: 1 second")
290 print(f" Frequency components: {frequencies} Hz")
291 print(f" Amplitudes: {amplitudes}")
292
293 # Compute power spectrum using FFT
294 from pytcl.mathematical_functions.transforms import power_spectrum
295
296 ps = power_spectrum(signal, fs, window="hann", nperseg=256)
297
298 print("\nPower spectrum analysis:")
299 print(f" Frequency resolution: {fs / 256:.2f} Hz")
300
301 # Find peaks in spectrum
302 psd_db = 10 * np.log10(np.maximum(ps.psd, 1e-10))
303 peak_threshold = np.max(psd_db) - 20 # Peaks within 20 dB of max
304
305 print(f"\nDetected frequency peaks (>{peak_threshold:.1f} dB):")
306 for i in range(1, len(psd_db) - 1):
307 if psd_db[i] > psd_db[i - 1] and psd_db[i] > psd_db[i + 1]:
308 if psd_db[i] > peak_threshold:
309 print(f" {ps.frequencies[i]:.1f} Hz: {psd_db[i]:.1f} dB")
310
311
312def main() -> None:
313 """Run signal processing demonstrations."""
314 print("\nSignal Processing Examples")
315 print("=" * 60)
316 print("Demonstrating pytcl signal processing capabilities")
317
318 filter_design_demo()
319 filtering_demo()
320 matched_filter_demo()
321 cfar_detection_demo()
322 spectrum_analysis_demo()
323
324 # Visualization
325 visualize_filter_response()
326
327 print("\n" + "=" * 60)
328 print("Done!")
329 print("=" * 60)
330
331
332def visualize_filter_response() -> None:
333 """Visualize digital filter frequency response."""
334 print("\nGenerating filter response visualization...")
335
336 fs = 1000.0
337 butter_filt = butter_design(order=4, cutoff=50.0, fs=fs, btype="low")
338 fir_filt_coeffs = fir_design(numtaps=64, cutoff=50.0, fs=fs)
339
340 # Compute magnitude responses
341 butter_resp = frequency_response(butter_filt, fs)
342 fir_resp = frequency_response(fir_filt_coeffs, fs)
343
344 fig = make_subplots(specs=[[{"secondary_y": False}]])
345
346 fig.add_trace(
347 go.Scatter(
348 x=butter_resp.frequencies,
349 y=20 * np.log10(np.maximum(butter_resp.magnitude, 1e-10)),
350 mode="lines",
351 name="Butterworth (4th order)",
352 line=dict(color="blue", width=2),
353 )
354 )
355
356 fig.add_trace(
357 go.Scatter(
358 x=fir_resp.frequencies,
359 y=20 * np.log10(np.maximum(fir_resp.magnitude, 1e-10)),
360 mode="lines",
361 name="FIR (order 64)",
362 line=dict(color="red", width=2, dash="dash"),
363 )
364 )
365
366 fig.update_layout(
367 title="Digital Filter Frequency Response Comparison",
368 xaxis_title="Frequency (Hz)",
369 yaxis_title="Magnitude (dB)",
370 height=500,
371 width=800,
372 hovermode="x unified",
373 )
374
375 fig.show()
376
377
378if __name__ == "__main__":
379 main()
Running the Example
python examples/signal_processing.py
See Also
Transforms - FFT and spectral analysis