Transforms

This example demonstrates FFT, power spectrum, wavelets, and other transforms.

Overview

Transform methods convert signals between domains:

  • Fourier Transform: Time to frequency domain

  • Short-Time Fourier: Time-frequency analysis

  • Wavelets: Multi-resolution analysis

  • Power Spectrum: Signal power distribution

Fourier Analysis

FFT (Fast Fourier Transform)
  • O(n log n) algorithm

  • Frequency content of signals

  • Foundation for spectral analysis

Power Spectrum
  • Signal power vs frequency

  • Periodogram estimation

  • Welch’s method for noise reduction

Spectrogram
  • Time-frequency representation

  • Short-time Fourier Transform

  • Frequency changes over time

Wavelet Analysis

Continuous Wavelet Transform (CWT)
  • Multi-scale analysis

  • Good time-frequency localization

  • Various mother wavelets

Discrete Wavelet Transform (DWT)
  • Efficient decomposition

  • Signal compression

  • Denoising applications

Code Highlights

The example demonstrates:

  • FFT with fft() and ifft()

  • Power spectrum with power_spectrum()

  • Spectrogram with spectrogram()

  • Wavelet transforms with cwt() and dwt()

Source Code

  1"""
  2Transforms Example.
  3
  4This example demonstrates:
  51. FFT and power spectrum analysis
  62. Short-time Fourier Transform (STFT)
  73. Spectrogram visualization
  84. Wavelet transforms (CWT, DWT)
  9
 10Run with: python examples/transforms.py
 11"""
 12
 13import sys
 14from pathlib import Path
 15
 16sys.path.insert(0, str(Path(__file__).parent.parent))
 17
 18import numpy as np  # noqa: E402
 19import plotly.graph_objects as go  # noqa: E402
 20from plotly.subplots import make_subplots  # noqa: E402
 21
 22from pytcl.mathematical_functions.transforms import (  # noqa: E402
 23    coherence,
 24    cross_spectrum,
 25    cwt,
 26    dwt,
 27    fft,
 28    idwt,
 29    ifft,
 30    istft,
 31    morlet_wavelet,
 32    power_spectrum,
 33    rfft,
 34    ricker_wavelet,
 35    spectrogram,
 36    stft,
 37)
 38
 39
 40def fft_demo() -> None:
 41    """Demonstrate FFT operations."""
 42    print("=" * 60)
 43    print("1. FFT AND SPECTRAL ANALYSIS")
 44    print("=" * 60)
 45
 46    np.random.seed(42)
 47
 48    # Create a test signal with known frequency content
 49    fs = 1000.0  # Sample rate
 50    t = np.linspace(0, 1, int(fs), endpoint=False)
 51
 52    # Multi-tone signal
 53    f1, f2, f3 = 50, 120, 200  # Hz
 54    signal = np.sin(2 * np.pi * f1 * t) + 0.5 * np.sin(2 * np.pi * f2 * t)
 55    signal += 0.25 * np.sin(2 * np.pi * f3 * t)
 56
 57    print(f"\nTest signal: sum of sinusoids at {f1}, {f2}, {f3} Hz")
 58    print(f"  Sample rate: {fs} Hz")
 59    print(f"  Duration: 1 second ({len(t)} samples)")
 60
 61    # Compute FFT
 62    X = fft(signal)
 63    freqs = np.fft.fftfreq(len(signal), 1 / fs)
 64
 65    print("\nFFT Results:")
 66    print(f"  FFT length: {len(X)}")
 67    print(f"  Frequency resolution: {fs / len(signal):.2f} Hz")
 68
 69    # Find peaks in positive frequencies
 70    pos_mask = freqs >= 0
 71    pos_freqs = freqs[pos_mask]
 72    pos_mag = np.abs(X[pos_mask])
 73
 74    # Find significant peaks
 75    peak_threshold = 0.1 * np.max(pos_mag)
 76    print("\nSignificant frequency peaks:")
 77    for i in range(1, len(pos_mag) - 1):
 78        if pos_mag[i] > pos_mag[i - 1] and pos_mag[i] > pos_mag[i + 1]:
 79            if pos_mag[i] > peak_threshold:
 80                print(f"  {pos_freqs[i]:.1f} Hz: magnitude = {pos_mag[i]:.2f}")
 81
 82    # Verify inverse FFT
 83    signal_recovered = ifft(X).real
 84    reconstruction_error = np.max(np.abs(signal - signal_recovered))
 85    print(f"\nIFFT reconstruction error: {reconstruction_error:.2e}")
 86
 87    # Real FFT (more efficient for real signals)
 88    print("\nReal FFT (rfft):")
 89    X_real = rfft(signal)
 90    print(f"  rfft length: {len(X_real)} (vs {len(X)} for full fft)")
 91    print(f"  Memory savings: {100 * (1 - len(X_real) / len(X)):.1f}%")
 92
 93
 94def power_spectrum_demo() -> None:
 95    """Demonstrate power spectrum estimation."""
 96    print("\n" + "=" * 60)
 97    print("2. POWER SPECTRUM ESTIMATION")
 98    print("=" * 60)
 99
100    np.random.seed(42)
101
102    # Create a noisy signal with known spectrum
103    fs = 1000.0
104    t = np.linspace(0, 2, int(2 * fs), endpoint=False)
105
106    # Signal with two frequency components plus noise
107    signal = np.sin(2 * np.pi * 50 * t) + 0.5 * np.sin(2 * np.pi * 120 * t)
108    signal += 0.5 * np.random.randn(len(t))
109
110    print("\nSignal: 50 Hz + 120 Hz sinusoids in noise")
111    print(f"  SNR (approx): {10 * np.log10(1.25 / 0.25):.1f} dB")
112
113    # Welch's method power spectrum
114    ps = power_spectrum(signal, fs, window="hann", nperseg=256)
115
116    print("\nPower Spectrum (Welch's method):")
117    print(f"  Number of frequency bins: {len(ps.frequencies)}")
118    print(f"  Frequency resolution: {ps.frequencies[1] - ps.frequencies[0]:.2f} Hz")
119
120    # Find peaks
121    psd_db = 10 * np.log10(np.maximum(ps.psd, 1e-10))
122    print("\nPeak frequencies:")
123    for i in range(1, len(psd_db) - 1):
124        if psd_db[i] > psd_db[i - 1] and psd_db[i] > psd_db[i + 1]:
125            if psd_db[i] > np.max(psd_db) - 20:
126                print(f"  {ps.frequencies[i]:.1f} Hz: {psd_db[i]:.1f} dB")
127
128
129def cross_spectrum_demo() -> None:
130    """Demonstrate cross-spectrum and coherence."""
131    print("\n" + "=" * 60)
132    print("3. CROSS-SPECTRUM AND COHERENCE")
133    print("=" * 60)
134
135    np.random.seed(42)
136
137    # Create two related signals
138    fs = 1000.0
139    t = np.linspace(0, 2, int(2 * fs), endpoint=False)
140
141    # Common component
142    common = np.sin(2 * np.pi * 50 * t)
143
144    # Independent noise
145    noise1 = 0.5 * np.random.randn(len(t))
146    noise2 = 0.5 * np.random.randn(len(t))
147
148    # Signal 1: common + independent noise
149    x = common + noise1
150    # Signal 2: common (delayed) + independent noise
151    delay_samples = 10
152    y = np.roll(common, delay_samples) + noise2
153
154    print("\nTwo signals with common 50 Hz component:")
155    print("  Signal x: 50 Hz + noise")
156    print(f"  Signal y: 50 Hz (delayed {delay_samples} samples) + noise")
157
158    # Cross-spectrum
159    csd = cross_spectrum(x, y, fs, window="hann")
160    print("\nCross-spectrum:")
161    print(f"  Frequency bins: {len(csd.frequencies)}")
162
163    # Find phase at 50 Hz
164    idx_50hz = np.argmin(np.abs(csd.frequencies - 50))
165    phase_50hz = np.angle(csd.csd[idx_50hz])
166    expected_phase = -2 * np.pi * 50 * delay_samples / fs
167    print("\nPhase at 50 Hz:")
168    print(f"  Measured: {np.degrees(phase_50hz):.1f} deg")
169    print(f"  Expected: {np.degrees(expected_phase):.1f} deg")
170
171    # Coherence
172    coh = coherence(x, y, fs, nperseg=256)
173    print("\nCoherence at 50 Hz:")
174    idx_coh = np.argmin(np.abs(coh.frequencies - 50))
175    print(f"  Coherence: {coh.coherence[idx_coh]:.3f}")
176    print("  (1.0 = perfectly correlated, 0.0 = uncorrelated)")
177
178
179def stft_demo() -> None:
180    """Demonstrate Short-Time Fourier Transform."""
181    print("\n" + "=" * 60)
182    print("4. SHORT-TIME FOURIER TRANSFORM (STFT)")
183    print("=" * 60)
184
185    np.random.seed(42)
186
187    # Create a chirp signal (frequency varies with time)
188    fs = 1000.0
189    duration = 2.0
190    t = np.linspace(0, duration, int(duration * fs), endpoint=False)
191
192    # Linear chirp from 50 Hz to 200 Hz
193    f0, f1 = 50, 200
194    chirp = np.sin(2 * np.pi * (f0 + (f1 - f0) / (2 * duration) * t) * t)
195
196    print(f"\nChirp signal: frequency sweeps from {f0} to {f1} Hz")
197    print(f"  Duration: {duration} seconds")
198    print(f"  Sample rate: {fs} Hz")
199
200    # Compute STFT
201    nperseg = 128
202    noverlap = nperseg // 2
203    result = stft(chirp, fs, window="hann", nperseg=nperseg, noverlap=noverlap)
204
205    print("\nSTFT Results:")
206    print(f"  Segment length: {nperseg} samples")
207    print(f"  Overlap: {noverlap} samples")
208    print(f"  Number of time frames: {len(result.times)}")
209    print(f"  Number of frequency bins: {len(result.frequencies)}")
210    print(f"  Time resolution: {result.times[1] - result.times[0]:.3f} s")
211    print(
212        f"  Frequency resolution: {result.frequencies[1] - result.frequencies[0]:.2f} Hz"
213    )
214
215    # Verify reconstruction with inverse STFT
216    t_rec, reconstructed = istft(
217        result.Zxx, fs, window="hann", nperseg=nperseg, noverlap=noverlap
218    )
219    min_len = min(len(chirp), len(reconstructed))
220    recon_error = np.sqrt(np.mean((chirp[:min_len] - reconstructed[:min_len]) ** 2))
221    print(f"\nReconstruction RMS error: {recon_error:.6f}")
222
223    # Track instantaneous frequency over time
224    print("\nInstantaneous frequency tracking:")
225    for i in range(0, len(result.times), len(result.times) // 5):
226        # Find peak frequency at this time
227        mag = np.abs(result.Zxx[:, i])
228        peak_idx = np.argmax(mag)
229        peak_freq = result.frequencies[peak_idx]
230        expected_freq = f0 + (f1 - f0) * result.times[i] / duration
231        print(
232            f"  t={result.times[i]:.2f}s: measured={peak_freq:.1f} Hz, "
233            f"expected={expected_freq:.1f} Hz"
234        )
235
236
237def spectrogram_demo() -> None:
238    """Demonstrate spectrogram computation."""
239    print("\n" + "=" * 60)
240    print("5. SPECTROGRAM")
241    print("=" * 60)
242
243    np.random.seed(42)
244
245    # Create a signal with time-varying frequency content
246    fs = 1000.0
247    duration = 3.0
248    t = np.linspace(0, duration, int(duration * fs), endpoint=False)
249
250    # Three segments with different frequencies
251    signal = np.zeros_like(t)
252    signal[t < 1] = np.sin(2 * np.pi * 50 * t[t < 1])  # 50 Hz
253    signal[(t >= 1) & (t < 2)] = np.sin(
254        2 * np.pi * 100 * t[(t >= 1) & (t < 2)]
255    )  # 100 Hz
256    signal[t >= 2] = np.sin(2 * np.pi * 150 * t[t >= 2])  # 150 Hz
257
258    print("\nTime-varying signal:")
259    print("  0-1 s: 50 Hz")
260    print("  1-2 s: 100 Hz")
261    print("  2-3 s: 150 Hz")
262
263    # Compute spectrogram
264    spec = spectrogram(signal, fs, window="hann", nperseg=256, noverlap=128)
265
266    print("\nSpectrogram dimensions:")
267    print(f"  Time bins: {len(spec.times)}")
268    print(f"  Frequency bins: {len(spec.frequencies)}")
269
270    # Find dominant frequency in each time segment
271    print("\nDominant frequencies by segment:")
272    time_points = [0.5, 1.5, 2.5]  # Center of each segment
273    for tp in time_points:
274        idx = np.argmin(np.abs(spec.times - tp))
275        power_slice = spec.power[:, idx]
276        peak_idx = np.argmax(power_slice)
277        print(f"  t={tp}s: {spec.frequencies[peak_idx]:.1f} Hz")
278
279
280def wavelet_demo() -> None:
281    """Demonstrate wavelet transforms."""
282    print("\n" + "=" * 60)
283    print("6. WAVELET TRANSFORMS")
284    print("=" * 60)
285
286    np.random.seed(42)
287
288    # Create a signal with a transient event
289    fs = 1000.0
290    t = np.linspace(0, 1, int(fs), endpoint=False)
291
292    # Background oscillation + transient pulse
293    signal = 0.5 * np.sin(2 * np.pi * 10 * t)  # 10 Hz background
294    # Add transient at t=0.5s
295    transient_center = 0.5
296    transient_width = 0.02
297    transient = np.exp(-((t - transient_center) ** 2) / (2 * transient_width**2))
298    signal += transient
299
300    print("\nTest signal: 10 Hz oscillation + Gaussian pulse at t=0.5s")
301
302    # Generate wavelet shapes
303    print("\nWavelet shapes:")
304    morlet = morlet_wavelet(64, w=5.0)
305    ricker = ricker_wavelet(64, a=8.0)
306    print(f"  Morlet wavelet: {len(morlet)} points")
307    print(f"  Ricker (Mexican hat) wavelet: {len(ricker)} points")
308
309    # Continuous Wavelet Transform
310    scales = np.arange(1, 64)
311    cwt_result = cwt(signal, scales, wavelet="morlet", fs=fs)
312
313    print("\nContinuous Wavelet Transform (CWT):")
314    print(f"  Number of scales: {len(cwt_result.scales)}")
315    print(
316        f"  Frequency range: {cwt_result.frequencies[-1]:.1f} to {cwt_result.frequencies[0]:.1f} Hz"
317    )
318
319    # Find the transient in CWT
320    cwt_mag = np.abs(cwt_result.coefficients)
321    max_idx = np.unravel_index(np.argmax(cwt_mag), cwt_mag.shape)
322    peak_time = t[max_idx[1]]
323    peak_freq = cwt_result.frequencies[max_idx[0]]
324    print("\nTransient detection:")
325    print(f"  Peak at time: {peak_time:.3f} s (true: {transient_center} s)")
326    print(f"  Peak frequency: {peak_freq:.1f} Hz")
327
328    # Discrete Wavelet Transform
329    print("\nDiscrete Wavelet Transform (DWT):")
330    dwt_result = dwt(signal, wavelet="db4", level=4)
331    print("  Wavelet: Daubechies 4 (db4)")
332    print(f"  Decomposition levels: {dwt_result.levels}")
333    print(f"  Approximation coefficients: {len(dwt_result.cA)} samples")
334    for i, cD in enumerate(dwt_result.cD):
335        print(f"  Detail level {i + 1}: {len(cD)} samples")
336
337    # Verify reconstruction
338    reconstructed = idwt(dwt_result)
339    min_len = min(len(signal), len(reconstructed))
340    recon_error = np.sqrt(np.mean((signal[:min_len] - reconstructed[:min_len]) ** 2))
341    print(f"\nDWT reconstruction RMS error: {recon_error:.6f}")
342
343
344def main() -> None:
345    """Run transform demonstrations."""
346    print("\nTransforms Examples")
347    print("=" * 60)
348    print("Demonstrating pytcl transform capabilities")
349
350    fft_demo()
351    power_spectrum_demo()
352    cross_spectrum_demo()
353    stft_demo()
354    spectrogram_demo()
355    wavelet_demo()
356
357    # Visualization
358    visualize_fft_analysis()
359
360    print("\n" + "=" * 60)
361    print("Done!")
362    print("=" * 60)
363
364
365def visualize_fft_analysis() -> None:
366    """Visualize FFT analysis of multi-frequency signal."""
367    print("\nGenerating FFT analysis visualization...")
368
369    # Create test signal
370    fs = 1000.0
371    t = np.linspace(0, 1, int(fs), endpoint=False)
372    f1, f2, f3 = 50, 120, 200
373
374    signal = (
375        np.sin(2 * np.pi * f1 * t)
376        + 0.5 * np.sin(2 * np.pi * f2 * t)
377        + 0.25 * np.sin(2 * np.pi * f3 * t)
378    )
379    signal += 0.1 * np.random.randn(len(t))
380
381    # Compute FFT
382    X = fft(signal)
383    freqs = np.fft.fftfreq(len(signal), 1 / fs)
384
385    # Only positive frequencies
386    pos_mask = freqs >= 0
387    pos_freqs = freqs[pos_mask]
388    pos_mag = np.abs(X[pos_mask])
389
390    fig = make_subplots(
391        rows=1,
392        cols=2,
393        subplot_titles=("Time Domain Signal", "Frequency Domain (FFT)"),
394    )
395
396    # Time domain
397    fig.add_trace(
398        go.Scatter(
399            x=t,
400            y=signal,
401            mode="lines",
402            name="Signal",
403            line=dict(color="blue", width=1),
404        ),
405        row=1,
406        col=1,
407    )
408
409    # Frequency domain
410    fig.add_trace(
411        go.Scatter(
412            x=pos_freqs[:500],
413            y=pos_mag[:500],
414            mode="lines",
415            name="Magnitude",
416            line=dict(color="red", width=1),
417        ),
418        row=1,
419        col=2,
420    )
421
422    fig.update_xaxes(title_text="Time (s)", row=1, col=1)
423    fig.update_yaxes(title_text="Amplitude", row=1, col=1)
424    fig.update_xaxes(title_text="Frequency (Hz)", row=1, col=2)
425    fig.update_yaxes(title_text="Magnitude", row=1, col=2)
426
427    fig.update_layout(
428        title="FFT Analysis: Multi-Frequency Signal",
429        height=500,
430        width=1000,
431        showlegend=False,
432    )
433
434    fig.show()
435
436
437if __name__ == "__main__":
438    main()

Running the Example

python examples/transforms.py

See Also