Transforms
This example demonstrates FFT, power spectrum, wavelets, and other transforms.
Overview
Transform methods convert signals between domains:
Fourier Transform: Time to frequency domain
Short-Time Fourier: Time-frequency analysis
Wavelets: Multi-resolution analysis
Power Spectrum: Signal power distribution
Fourier Analysis
- FFT (Fast Fourier Transform)
O(n log n) algorithm
Frequency content of signals
Foundation for spectral analysis
- Power Spectrum
Signal power vs frequency
Periodogram estimation
Welch’s method for noise reduction
- Spectrogram
Time-frequency representation
Short-time Fourier Transform
Frequency changes over time
Wavelet Analysis
- Continuous Wavelet Transform (CWT)
Multi-scale analysis
Good time-frequency localization
Various mother wavelets
- Discrete Wavelet Transform (DWT)
Efficient decomposition
Signal compression
Denoising applications
Code Highlights
The example demonstrates:
FFT with
fft()andifft()Power spectrum with
power_spectrum()Spectrogram with
spectrogram()Wavelet transforms with
cwt()anddwt()
Source Code
1"""
2Transforms Example.
3
4This example demonstrates:
51. FFT and power spectrum analysis
62. Short-time Fourier Transform (STFT)
73. Spectrogram visualization
84. Wavelet transforms (CWT, DWT)
9
10Run with: python examples/transforms.py
11"""
12
13import sys
14from pathlib import Path
15
16sys.path.insert(0, str(Path(__file__).parent.parent))
17
18import numpy as np # noqa: E402
19import plotly.graph_objects as go # noqa: E402
20from plotly.subplots import make_subplots # noqa: E402
21
22from pytcl.mathematical_functions.transforms import ( # noqa: E402
23 coherence,
24 cross_spectrum,
25 cwt,
26 dwt,
27 fft,
28 idwt,
29 ifft,
30 istft,
31 morlet_wavelet,
32 power_spectrum,
33 rfft,
34 ricker_wavelet,
35 spectrogram,
36 stft,
37)
38
39
40def fft_demo() -> None:
41 """Demonstrate FFT operations."""
42 print("=" * 60)
43 print("1. FFT AND SPECTRAL ANALYSIS")
44 print("=" * 60)
45
46 np.random.seed(42)
47
48 # Create a test signal with known frequency content
49 fs = 1000.0 # Sample rate
50 t = np.linspace(0, 1, int(fs), endpoint=False)
51
52 # Multi-tone signal
53 f1, f2, f3 = 50, 120, 200 # Hz
54 signal = np.sin(2 * np.pi * f1 * t) + 0.5 * np.sin(2 * np.pi * f2 * t)
55 signal += 0.25 * np.sin(2 * np.pi * f3 * t)
56
57 print(f"\nTest signal: sum of sinusoids at {f1}, {f2}, {f3} Hz")
58 print(f" Sample rate: {fs} Hz")
59 print(f" Duration: 1 second ({len(t)} samples)")
60
61 # Compute FFT
62 X = fft(signal)
63 freqs = np.fft.fftfreq(len(signal), 1 / fs)
64
65 print("\nFFT Results:")
66 print(f" FFT length: {len(X)}")
67 print(f" Frequency resolution: {fs / len(signal):.2f} Hz")
68
69 # Find peaks in positive frequencies
70 pos_mask = freqs >= 0
71 pos_freqs = freqs[pos_mask]
72 pos_mag = np.abs(X[pos_mask])
73
74 # Find significant peaks
75 peak_threshold = 0.1 * np.max(pos_mag)
76 print("\nSignificant frequency peaks:")
77 for i in range(1, len(pos_mag) - 1):
78 if pos_mag[i] > pos_mag[i - 1] and pos_mag[i] > pos_mag[i + 1]:
79 if pos_mag[i] > peak_threshold:
80 print(f" {pos_freqs[i]:.1f} Hz: magnitude = {pos_mag[i]:.2f}")
81
82 # Verify inverse FFT
83 signal_recovered = ifft(X).real
84 reconstruction_error = np.max(np.abs(signal - signal_recovered))
85 print(f"\nIFFT reconstruction error: {reconstruction_error:.2e}")
86
87 # Real FFT (more efficient for real signals)
88 print("\nReal FFT (rfft):")
89 X_real = rfft(signal)
90 print(f" rfft length: {len(X_real)} (vs {len(X)} for full fft)")
91 print(f" Memory savings: {100 * (1 - len(X_real) / len(X)):.1f}%")
92
93
94def power_spectrum_demo() -> None:
95 """Demonstrate power spectrum estimation."""
96 print("\n" + "=" * 60)
97 print("2. POWER SPECTRUM ESTIMATION")
98 print("=" * 60)
99
100 np.random.seed(42)
101
102 # Create a noisy signal with known spectrum
103 fs = 1000.0
104 t = np.linspace(0, 2, int(2 * fs), endpoint=False)
105
106 # Signal with two frequency components plus noise
107 signal = np.sin(2 * np.pi * 50 * t) + 0.5 * np.sin(2 * np.pi * 120 * t)
108 signal += 0.5 * np.random.randn(len(t))
109
110 print("\nSignal: 50 Hz + 120 Hz sinusoids in noise")
111 print(f" SNR (approx): {10 * np.log10(1.25 / 0.25):.1f} dB")
112
113 # Welch's method power spectrum
114 ps = power_spectrum(signal, fs, window="hann", nperseg=256)
115
116 print("\nPower Spectrum (Welch's method):")
117 print(f" Number of frequency bins: {len(ps.frequencies)}")
118 print(f" Frequency resolution: {ps.frequencies[1] - ps.frequencies[0]:.2f} Hz")
119
120 # Find peaks
121 psd_db = 10 * np.log10(np.maximum(ps.psd, 1e-10))
122 print("\nPeak frequencies:")
123 for i in range(1, len(psd_db) - 1):
124 if psd_db[i] > psd_db[i - 1] and psd_db[i] > psd_db[i + 1]:
125 if psd_db[i] > np.max(psd_db) - 20:
126 print(f" {ps.frequencies[i]:.1f} Hz: {psd_db[i]:.1f} dB")
127
128
129def cross_spectrum_demo() -> None:
130 """Demonstrate cross-spectrum and coherence."""
131 print("\n" + "=" * 60)
132 print("3. CROSS-SPECTRUM AND COHERENCE")
133 print("=" * 60)
134
135 np.random.seed(42)
136
137 # Create two related signals
138 fs = 1000.0
139 t = np.linspace(0, 2, int(2 * fs), endpoint=False)
140
141 # Common component
142 common = np.sin(2 * np.pi * 50 * t)
143
144 # Independent noise
145 noise1 = 0.5 * np.random.randn(len(t))
146 noise2 = 0.5 * np.random.randn(len(t))
147
148 # Signal 1: common + independent noise
149 x = common + noise1
150 # Signal 2: common (delayed) + independent noise
151 delay_samples = 10
152 y = np.roll(common, delay_samples) + noise2
153
154 print("\nTwo signals with common 50 Hz component:")
155 print(" Signal x: 50 Hz + noise")
156 print(f" Signal y: 50 Hz (delayed {delay_samples} samples) + noise")
157
158 # Cross-spectrum
159 csd = cross_spectrum(x, y, fs, window="hann")
160 print("\nCross-spectrum:")
161 print(f" Frequency bins: {len(csd.frequencies)}")
162
163 # Find phase at 50 Hz
164 idx_50hz = np.argmin(np.abs(csd.frequencies - 50))
165 phase_50hz = np.angle(csd.csd[idx_50hz])
166 expected_phase = -2 * np.pi * 50 * delay_samples / fs
167 print("\nPhase at 50 Hz:")
168 print(f" Measured: {np.degrees(phase_50hz):.1f} deg")
169 print(f" Expected: {np.degrees(expected_phase):.1f} deg")
170
171 # Coherence
172 coh = coherence(x, y, fs, nperseg=256)
173 print("\nCoherence at 50 Hz:")
174 idx_coh = np.argmin(np.abs(coh.frequencies - 50))
175 print(f" Coherence: {coh.coherence[idx_coh]:.3f}")
176 print(" (1.0 = perfectly correlated, 0.0 = uncorrelated)")
177
178
179def stft_demo() -> None:
180 """Demonstrate Short-Time Fourier Transform."""
181 print("\n" + "=" * 60)
182 print("4. SHORT-TIME FOURIER TRANSFORM (STFT)")
183 print("=" * 60)
184
185 np.random.seed(42)
186
187 # Create a chirp signal (frequency varies with time)
188 fs = 1000.0
189 duration = 2.0
190 t = np.linspace(0, duration, int(duration * fs), endpoint=False)
191
192 # Linear chirp from 50 Hz to 200 Hz
193 f0, f1 = 50, 200
194 chirp = np.sin(2 * np.pi * (f0 + (f1 - f0) / (2 * duration) * t) * t)
195
196 print(f"\nChirp signal: frequency sweeps from {f0} to {f1} Hz")
197 print(f" Duration: {duration} seconds")
198 print(f" Sample rate: {fs} Hz")
199
200 # Compute STFT
201 nperseg = 128
202 noverlap = nperseg // 2
203 result = stft(chirp, fs, window="hann", nperseg=nperseg, noverlap=noverlap)
204
205 print("\nSTFT Results:")
206 print(f" Segment length: {nperseg} samples")
207 print(f" Overlap: {noverlap} samples")
208 print(f" Number of time frames: {len(result.times)}")
209 print(f" Number of frequency bins: {len(result.frequencies)}")
210 print(f" Time resolution: {result.times[1] - result.times[0]:.3f} s")
211 print(
212 f" Frequency resolution: {result.frequencies[1] - result.frequencies[0]:.2f} Hz"
213 )
214
215 # Verify reconstruction with inverse STFT
216 t_rec, reconstructed = istft(
217 result.Zxx, fs, window="hann", nperseg=nperseg, noverlap=noverlap
218 )
219 min_len = min(len(chirp), len(reconstructed))
220 recon_error = np.sqrt(np.mean((chirp[:min_len] - reconstructed[:min_len]) ** 2))
221 print(f"\nReconstruction RMS error: {recon_error:.6f}")
222
223 # Track instantaneous frequency over time
224 print("\nInstantaneous frequency tracking:")
225 for i in range(0, len(result.times), len(result.times) // 5):
226 # Find peak frequency at this time
227 mag = np.abs(result.Zxx[:, i])
228 peak_idx = np.argmax(mag)
229 peak_freq = result.frequencies[peak_idx]
230 expected_freq = f0 + (f1 - f0) * result.times[i] / duration
231 print(
232 f" t={result.times[i]:.2f}s: measured={peak_freq:.1f} Hz, "
233 f"expected={expected_freq:.1f} Hz"
234 )
235
236
237def spectrogram_demo() -> None:
238 """Demonstrate spectrogram computation."""
239 print("\n" + "=" * 60)
240 print("5. SPECTROGRAM")
241 print("=" * 60)
242
243 np.random.seed(42)
244
245 # Create a signal with time-varying frequency content
246 fs = 1000.0
247 duration = 3.0
248 t = np.linspace(0, duration, int(duration * fs), endpoint=False)
249
250 # Three segments with different frequencies
251 signal = np.zeros_like(t)
252 signal[t < 1] = np.sin(2 * np.pi * 50 * t[t < 1]) # 50 Hz
253 signal[(t >= 1) & (t < 2)] = np.sin(
254 2 * np.pi * 100 * t[(t >= 1) & (t < 2)]
255 ) # 100 Hz
256 signal[t >= 2] = np.sin(2 * np.pi * 150 * t[t >= 2]) # 150 Hz
257
258 print("\nTime-varying signal:")
259 print(" 0-1 s: 50 Hz")
260 print(" 1-2 s: 100 Hz")
261 print(" 2-3 s: 150 Hz")
262
263 # Compute spectrogram
264 spec = spectrogram(signal, fs, window="hann", nperseg=256, noverlap=128)
265
266 print("\nSpectrogram dimensions:")
267 print(f" Time bins: {len(spec.times)}")
268 print(f" Frequency bins: {len(spec.frequencies)}")
269
270 # Find dominant frequency in each time segment
271 print("\nDominant frequencies by segment:")
272 time_points = [0.5, 1.5, 2.5] # Center of each segment
273 for tp in time_points:
274 idx = np.argmin(np.abs(spec.times - tp))
275 power_slice = spec.power[:, idx]
276 peak_idx = np.argmax(power_slice)
277 print(f" t={tp}s: {spec.frequencies[peak_idx]:.1f} Hz")
278
279
280def wavelet_demo() -> None:
281 """Demonstrate wavelet transforms."""
282 print("\n" + "=" * 60)
283 print("6. WAVELET TRANSFORMS")
284 print("=" * 60)
285
286 np.random.seed(42)
287
288 # Create a signal with a transient event
289 fs = 1000.0
290 t = np.linspace(0, 1, int(fs), endpoint=False)
291
292 # Background oscillation + transient pulse
293 signal = 0.5 * np.sin(2 * np.pi * 10 * t) # 10 Hz background
294 # Add transient at t=0.5s
295 transient_center = 0.5
296 transient_width = 0.02
297 transient = np.exp(-((t - transient_center) ** 2) / (2 * transient_width**2))
298 signal += transient
299
300 print("\nTest signal: 10 Hz oscillation + Gaussian pulse at t=0.5s")
301
302 # Generate wavelet shapes
303 print("\nWavelet shapes:")
304 morlet = morlet_wavelet(64, w=5.0)
305 ricker = ricker_wavelet(64, a=8.0)
306 print(f" Morlet wavelet: {len(morlet)} points")
307 print(f" Ricker (Mexican hat) wavelet: {len(ricker)} points")
308
309 # Continuous Wavelet Transform
310 scales = np.arange(1, 64)
311 cwt_result = cwt(signal, scales, wavelet="morlet", fs=fs)
312
313 print("\nContinuous Wavelet Transform (CWT):")
314 print(f" Number of scales: {len(cwt_result.scales)}")
315 print(
316 f" Frequency range: {cwt_result.frequencies[-1]:.1f} to {cwt_result.frequencies[0]:.1f} Hz"
317 )
318
319 # Find the transient in CWT
320 cwt_mag = np.abs(cwt_result.coefficients)
321 max_idx = np.unravel_index(np.argmax(cwt_mag), cwt_mag.shape)
322 peak_time = t[max_idx[1]]
323 peak_freq = cwt_result.frequencies[max_idx[0]]
324 print("\nTransient detection:")
325 print(f" Peak at time: {peak_time:.3f} s (true: {transient_center} s)")
326 print(f" Peak frequency: {peak_freq:.1f} Hz")
327
328 # Discrete Wavelet Transform
329 print("\nDiscrete Wavelet Transform (DWT):")
330 dwt_result = dwt(signal, wavelet="db4", level=4)
331 print(" Wavelet: Daubechies 4 (db4)")
332 print(f" Decomposition levels: {dwt_result.levels}")
333 print(f" Approximation coefficients: {len(dwt_result.cA)} samples")
334 for i, cD in enumerate(dwt_result.cD):
335 print(f" Detail level {i + 1}: {len(cD)} samples")
336
337 # Verify reconstruction
338 reconstructed = idwt(dwt_result)
339 min_len = min(len(signal), len(reconstructed))
340 recon_error = np.sqrt(np.mean((signal[:min_len] - reconstructed[:min_len]) ** 2))
341 print(f"\nDWT reconstruction RMS error: {recon_error:.6f}")
342
343
344def main() -> None:
345 """Run transform demonstrations."""
346 print("\nTransforms Examples")
347 print("=" * 60)
348 print("Demonstrating pytcl transform capabilities")
349
350 fft_demo()
351 power_spectrum_demo()
352 cross_spectrum_demo()
353 stft_demo()
354 spectrogram_demo()
355 wavelet_demo()
356
357 # Visualization
358 visualize_fft_analysis()
359
360 print("\n" + "=" * 60)
361 print("Done!")
362 print("=" * 60)
363
364
365def visualize_fft_analysis() -> None:
366 """Visualize FFT analysis of multi-frequency signal."""
367 print("\nGenerating FFT analysis visualization...")
368
369 # Create test signal
370 fs = 1000.0
371 t = np.linspace(0, 1, int(fs), endpoint=False)
372 f1, f2, f3 = 50, 120, 200
373
374 signal = (
375 np.sin(2 * np.pi * f1 * t)
376 + 0.5 * np.sin(2 * np.pi * f2 * t)
377 + 0.25 * np.sin(2 * np.pi * f3 * t)
378 )
379 signal += 0.1 * np.random.randn(len(t))
380
381 # Compute FFT
382 X = fft(signal)
383 freqs = np.fft.fftfreq(len(signal), 1 / fs)
384
385 # Only positive frequencies
386 pos_mask = freqs >= 0
387 pos_freqs = freqs[pos_mask]
388 pos_mag = np.abs(X[pos_mask])
389
390 fig = make_subplots(
391 rows=1,
392 cols=2,
393 subplot_titles=("Time Domain Signal", "Frequency Domain (FFT)"),
394 )
395
396 # Time domain
397 fig.add_trace(
398 go.Scatter(
399 x=t,
400 y=signal,
401 mode="lines",
402 name="Signal",
403 line=dict(color="blue", width=1),
404 ),
405 row=1,
406 col=1,
407 )
408
409 # Frequency domain
410 fig.add_trace(
411 go.Scatter(
412 x=pos_freqs[:500],
413 y=pos_mag[:500],
414 mode="lines",
415 name="Magnitude",
416 line=dict(color="red", width=1),
417 ),
418 row=1,
419 col=2,
420 )
421
422 fig.update_xaxes(title_text="Time (s)", row=1, col=1)
423 fig.update_yaxes(title_text="Amplitude", row=1, col=1)
424 fig.update_xaxes(title_text="Frequency (Hz)", row=1, col=2)
425 fig.update_yaxes(title_text="Magnitude", row=1, col=2)
426
427 fig.update_layout(
428 title="FFT Analysis: Multi-Frequency Signal",
429 height=500,
430 width=1000,
431 showlegend=False,
432 )
433
434 fig.show()
435
436
437if __name__ == "__main__":
438 main()
Running the Example
python examples/transforms.py
See Also
Signal Processing - Filter design and detection