Source code for pytcl.magnetism.emm

"""
Enhanced Magnetic Model (EMM) and World Magnetic Model High Resolution (WMMHR).

This module provides support for high-degree magnetic field models:
- EMM2017: Degree 790, resolves anomalies down to 51 km wavelength
- WMMHR2025: Degree 133, resolves anomalies down to ~300 km wavelength

These models provide higher spatial resolution than the standard WMM (degree 12)
by including crustal magnetic field contributions.

References
----------
.. [1] Maus, S., et al. "EMAG2: A 2-arc min resolution Earth Magnetic
       Anomaly Grid compiled from satellite, airborne, and marine
       magnetic measurements." Geochemistry, Geophysics, Geosystems 10.8 (2009).
.. [2] NOAA National Centers for Environmental Information,
       "Enhanced Magnetic Model (EMM)."
       https://www.ncei.noaa.gov/products/enhanced-magnetic-model
.. [3] NOAA National Centers for Environmental Information,
       "World Magnetic Model High Resolution (WMMHR)."
       https://www.ncei.noaa.gov/products/world-magnetic-model-high-resolution
"""

from functools import lru_cache
from pathlib import Path
from typing import Any, NamedTuple, Optional, Tuple, Union

import numpy as np
from numpy.typing import NDArray

from pytcl.core.paths import get_data_dir

from .wmm import MagneticResult

# Model parameters
EMM_PARAMETERS: dict[str, dict[str, Any]] = {
    "EMM2017": {
        "n_max": 790,
        "epoch": 2017.0,
        "valid_start": 2000.0,
        "valid_end": 2022.0,
        "reference_radius": 6371.2,  # km
        "file_size_mb": 24.5,
    },
    "WMMHR2025": {
        "n_max": 133,
        "n_max_sv": 15,  # Secular variation only to degree 15
        "epoch": 2025.0,
        "valid_start": 2025.0,
        "valid_end": 2030.0,
        "reference_radius": 6371.2,  # km
        "file_size_kb": 534,
    },
}


[docs] class HighResCoefficients(NamedTuple): """High-resolution magnetic model coefficients. Extends MagneticCoefficients with additional metadata for high-degree models like EMM and WMMHR. Attributes ---------- g : ndarray Main field cosine coefficients (nT), shape (n_max+1, n_max+1). h : ndarray Main field sine coefficients (nT), shape (n_max+1, n_max+1). g_dot : ndarray Secular variation of g (nT/year), shape (n_max_sv+1, n_max_sv+1). h_dot : ndarray Secular variation of h (nT/year), shape (n_max_sv+1, n_max_sv+1). epoch : float Reference epoch (decimal year). n_max : int Maximum degree for main field. n_max_sv : int Maximum degree for secular variation (may be < n_max). model_name : str Model identifier ("EMM2017" or "WMMHR2025"). """ g: NDArray[np.floating] h: NDArray[np.floating] g_dot: NDArray[np.floating] h_dot: NDArray[np.floating] epoch: float n_max: int n_max_sv: int model_name: str
[docs] def parse_emm_file( filepath: Path, n_max: Optional[int] = None, ) -> tuple[ NDArray[np.floating], NDArray[np.floating], NDArray[np.floating], NDArray[np.floating], float, int, ]: """Parse an EMM/WMMHR coefficient file. The file format is similar to WMM but with more coefficients: n m g_nm h_nm g_dot_nm h_dot_nm Parameters ---------- filepath : Path Path to the coefficient file. n_max : int, optional Maximum degree to load. If None, loads all coefficients. Returns ------- g : ndarray Cosine coefficients. h : ndarray Sine coefficients. g_dot : ndarray Secular variation of g. h_dot : ndarray Secular variation of h. epoch : float Model epoch. actual_n_max : int Maximum degree loaded. """ epoch = 2017.0 # Default max_n_in_file = 0 max_n_sv = 0 # First pass: determine dimensions and epoch with open(filepath, "r") as f: for line_num, line in enumerate(f): line = line.strip() if not line: continue # Parse header line (first line with epoch info) if line_num == 0: parts = line.split() if len(parts) >= 1: try: epoch = float(parts[0]) except ValueError: pass continue # Skip comment lines if line.startswith("#") or line.startswith("9999"): continue parts = line.split() if len(parts) >= 4: try: n = int(parts[0]) max_n_in_file = max(max_n_in_file, n) # Check if secular variation is present if len(parts) >= 6: g_dot_val = float(parts[4]) h_dot_val = float(parts[5]) if g_dot_val != 0 or h_dot_val != 0: max_n_sv = max(max_n_sv, n) except (ValueError, IndexError): continue # Determine actual limits if n_max is None: actual_n_max = max_n_in_file else: actual_n_max = min(n_max, max_n_in_file) actual_n_max_sv = min(max_n_sv, actual_n_max) # Initialize coefficient arrays g = np.zeros((actual_n_max + 1, actual_n_max + 1)) h = np.zeros((actual_n_max + 1, actual_n_max + 1)) g_dot = np.zeros((actual_n_max_sv + 1, actual_n_max_sv + 1)) h_dot = np.zeros((actual_n_max_sv + 1, actual_n_max_sv + 1)) # Second pass: read coefficients with open(filepath, "r") as f: for line_num, line in enumerate(f): line = line.strip() if not line or line_num == 0: continue if line.startswith("#") or line.startswith("9999"): continue parts = line.split() if len(parts) < 4: continue try: n = int(parts[0]) m = int(parts[1]) if n > actual_n_max or m > n: continue g[n, m] = float(parts[2]) h[n, m] = float(parts[3]) # Secular variation (if present) if len(parts) >= 6 and n <= actual_n_max_sv: g_dot[n, m] = float(parts[4]) h_dot[n, m] = float(parts[5]) except (ValueError, IndexError): continue return g, h, g_dot, h_dot, epoch, actual_n_max
[docs] def create_test_coefficients(n_max: int = 36) -> HighResCoefficients: """Create test coefficients for low-degree testing. Creates a simplified set of coefficients for testing purposes, using the same low-degree terms as WMM2020 extended with synthetic crustal field terms. Parameters ---------- n_max : int Maximum degree (default 36). Returns ------- HighResCoefficients Test coefficient set. """ n_max_sv = min(n_max, 12) # SV only to degree 12 g = np.zeros((n_max + 1, n_max + 1)) h = np.zeros((n_max + 1, n_max + 1)) g_dot = np.zeros((n_max_sv + 1, n_max_sv + 1)) h_dot = np.zeros((n_max_sv + 1, n_max_sv + 1)) # Core field coefficients from WMM2020 (degrees 1-12) # n=1 (dipole) g[1, 0] = -29404.5 g[1, 1] = -1450.7 h[1, 1] = 4652.9 # n=2 g[2, 0] = -2500.0 g[2, 1] = 2982.0 g[2, 2] = 1676.8 h[2, 1] = -2991.6 h[2, 2] = -734.8 # n=3 g[3, 0] = 1363.9 g[3, 1] = -2381.0 g[3, 2] = 1236.2 g[3, 3] = 525.7 h[3, 1] = -82.2 h[3, 2] = 241.8 h[3, 3] = -542.9 # n=4 g[4, 0] = 903.1 g[4, 1] = 809.4 g[4, 2] = 86.2 g[4, 3] = -309.4 g[4, 4] = 47.9 h[4, 1] = 282.0 h[4, 2] = -158.4 h[4, 3] = 199.8 h[4, 4] = -350.1 # n=5 g[5, 0] = -234.4 g[5, 1] = 363.1 g[5, 2] = 47.7 g[5, 3] = 187.8 g[5, 4] = -140.7 g[5, 5] = -151.2 h[5, 1] = 46.7 h[5, 2] = 196.9 h[5, 3] = -119.4 h[5, 4] = 16.0 h[5, 5] = 100.1 # n=6 g[6, 0] = 65.6 g[6, 1] = 65.5 g[6, 2] = 51.9 g[6, 3] = -61.1 g[6, 4] = 16.9 g[6, 5] = 0.7 g[6, 6] = -91.1 h[6, 1] = -77.0 h[6, 2] = 25.0 h[6, 3] = 26.4 h[6, 4] = -16.3 h[6, 5] = 28.9 h[6, 6] = 4.1 # Secular variation (degrees 1-6 only for test) g_dot[1, 0] = 6.7 g_dot[1, 1] = 7.7 h_dot[1, 1] = -25.1 g_dot[2, 0] = -11.5 g_dot[2, 1] = -7.1 g_dot[2, 2] = -2.2 h_dot[2, 1] = -30.2 h_dot[2, 2] = -23.9 # Add synthetic crustal field for higher degrees (small values) # These simulate the crustal field contribution np.random.seed(42) # Reproducible for n in range(13, n_max + 1): # Crustal field decays with increasing degree amplitude = 10.0 / n # nT for m in range(n + 1): g[n, m] = amplitude * (2 * np.random.random() - 1) h[n, m] = amplitude * (2 * np.random.random() - 1) return HighResCoefficients( g=g, h=h, g_dot=g_dot, h_dot=h_dot, epoch=2020.0, n_max=n_max, n_max_sv=n_max_sv, model_name="EMM_TEST", )
@lru_cache(maxsize=4) def _load_coefficients_cached( model: str, n_max: Optional[int], ) -> HighResCoefficients: """Cached coefficient loading (internal function). Parameters ---------- model : str Model name ("EMM2017" or "WMMHR2025"). n_max : int or None Maximum degree. Returns ------- HighResCoefficients """ if model not in EMM_PARAMETERS: raise ValueError(f"Unknown model: {model}. Use 'EMM2017' or 'WMMHR2025'.") params = EMM_PARAMETERS[model] # Determine coefficient file path data_dir = get_data_dir() # Try different file extensions for ext in [".cof", ".COF", ".txt", ".dat"]: filepath = data_dir / f"{model}{ext}" if filepath.exists(): break else: ncei_base = "https://www.ncei.noaa.gov/products" emm_url = f"{ncei_base}/enhanced-magnetic-model" wmmhr_url = f"{ncei_base}/world-magnetic-model-high-resolution" raise FileNotFoundError( f"Coefficient file not found for {model}\n" f"Please download the {model} coefficients from:\n" f" {emm_url} (EMM)\n" f" {wmmhr_url} (WMMHR)\n" f"and save to: {data_dir}/{model}.cof\n" f"Or use create_test_coefficients() for testing." ) # Parse the file actual_n_max = n_max if n_max is not None else params["n_max"] g, h, g_dot, h_dot, epoch, loaded_n_max = parse_emm_file(filepath, actual_n_max) n_max_sv = params.get("n_max_sv", loaded_n_max) n_max_sv = min(n_max_sv, g_dot.shape[0] - 1) return HighResCoefficients( g=g, h=h, g_dot=g_dot, h_dot=h_dot, epoch=epoch, n_max=loaded_n_max, n_max_sv=n_max_sv, model_name=model, )
[docs] def load_emm_coefficients( model: str = "EMM2017", n_max: Optional[int] = None, ) -> HighResCoefficients: """Load high-resolution magnetic model coefficients. Parameters ---------- model : str Model name, either "EMM2017" or "WMMHR2025". n_max : int, optional Maximum degree to load. If None, loads the full model. Returns ------- HighResCoefficients Loaded coefficient structure. Raises ------ FileNotFoundError If the coefficient file is not found. ValueError If an unknown model is specified. Examples -------- >>> coef = load_emm_coefficients("WMMHR2025", n_max=50) >>> coef.n_max 50 """ return _load_coefficients_cached(model, n_max)
def _high_res_field_spherical( lat: float, lon: float, r: float, year: float, coeffs: HighResCoefficients, n_max_eval: Optional[int] = None, ) -> Tuple[float, float, float]: """Compute magnetic field using high-resolution coefficients. Similar to magnetic_field_spherical but handles different n_max for main field and secular variation. Parameters ---------- lat : float Geocentric latitude in radians. lon : float Longitude in radians. r : float Radial distance from Earth's center in km. year : float Decimal year. coeffs : HighResCoefficients Model coefficients. n_max_eval : int, optional Maximum degree to evaluate. Default uses full model. Returns ------- B_r : float Radial component in nT. B_theta : float Colatitude component in nT. B_phi : float Longitude component in nT. """ from pytcl.gravity.spherical_harmonics import associated_legendre if n_max_eval is None: n_max_eval = coeffs.n_max else: n_max_eval = min(n_max_eval, coeffs.n_max) N = n_max_eval a = 6371.2 # Reference radius in km # Time adjustment — vectorized over lower-left triangle dt = year - coeffs.epoch g = coeffs.g[: N + 1, : N + 1].copy() h = coeffs.h[: N + 1, : N + 1].copy() n_max_sv = min(coeffs.n_max_sv, N) sv_n = n_max_sv + 1 g[:sv_n, :sv_n] += dt * coeffs.g_dot[:sv_n, :sv_n] h[:sv_n, :sv_n] += dt * coeffs.h_dot[:sv_n, :sv_n] # Colatitude theta = np.pi / 2 - lat cos_theta = np.cos(theta) sin_theta = np.sin(theta) # Compute associated Legendre functions P = associated_legendre(N, N, cos_theta, normalized=True) # Compute dP/dtheta — vectorized dP = np.zeros((N + 1, N + 1)) if abs(sin_theta) > 1e-10: cot = cos_theta / sin_theta csc = 1.0 / sin_theta for n in range(1, N + 1): # Diagonal: m == n dP[n, n] = n * cot * P[n, n] # Off-diagonal: m < n ms = np.arange(0, n) factors = np.sqrt((n - ms) * (n + ms + 1)) dP[n, :n] = n * cot * P[n, :n] - factors * P[n, 1 : n + 1] * csc # Precompute radial power: (a/r)^(n+2) for n = 0..N r_ratio = a / r ns = np.arange(N + 1) r_powers = r_ratio ** (ns + 2) # shape (N+1,) # Precompute trig: cos(m*lon), sin(m*lon) for m = 0..N ms = np.arange(N + 1) cos_m_lon = np.cos(ms * lon) # shape (N+1,) sin_m_lon = np.sin(ms * lon) # shape (N+1,) # Build lower-triangular mask (n, m) where m <= n, n >= 1 n_idx, m_idx = np.meshgrid(ns, ms, indexing="ij") mask = (m_idx <= n_idx) & (n_idx >= 1) # Compute field terms for all (n, m) at once gh_cos = g * cos_m_lon[np.newaxis, :] + h * sin_m_lon[np.newaxis, :] # (N+1, N+1) gh_sin = g * sin_m_lon[np.newaxis, :] - h * cos_m_lon[np.newaxis, :] # (N+1, N+1) rp = r_powers[:, np.newaxis] # (N+1, 1) np1 = (ns + 1)[:, np.newaxis] # (N+1, 1) B_r_terms = np1 * rp * P * gh_cos B_theta_terms = -rp * dP * gh_cos B_r = np.sum(B_r_terms[mask]) B_theta = np.sum(B_theta_terms[mask]) if abs(sin_theta) > 1e-10: B_phi_terms = rp * m_idx * P / sin_theta * gh_sin B_phi = np.sum(B_phi_terms[mask]) else: B_phi = 0.0 return B_r, B_theta, B_phi
[docs] def emm( lat: float, lon: float, h: float = 0.0, year: float = 2020.0, model: str = "EMM2017", n_max: Optional[int] = None, coefficients: Optional[HighResCoefficients] = None, ) -> MagneticResult: """Compute magnetic field using Enhanced Magnetic Model. The EMM provides higher spatial resolution than WMM by including crustal magnetic field contributions up to degree 790. Parameters ---------- lat : float Geodetic latitude in radians. lon : float Longitude in radians. h : float, optional Height above WGS84 ellipsoid in km. Default 0. year : float, optional Decimal year. Default 2020.0. model : str, optional Model name ("EMM2017" or "WMMHR2025"). Default "EMM2017". n_max : int, optional Maximum degree to evaluate. Default uses full model. coefficients : HighResCoefficients, optional Pre-loaded coefficients. If None, loads from file. Returns ------- result : MagneticResult Magnetic field components and derived quantities. Examples -------- >>> import numpy as np >>> # Use test coefficients for demonstration >>> coef = create_test_coefficients(n_max=36) >>> result = emm(np.radians(40), np.radians(-105), 1.0, 2020.0, coefficients=coef) >>> print(f"Declination: {np.degrees(result.D):.2f}°") >>> print(f"Total intensity: {result.F:.0f} nT") """ if coefficients is None: coefficients = load_emm_coefficients(model, n_max) # Handle array inputs by iterating (spherical harmonic sum is scalar) lat_arr = np.asarray(lat) if lat_arr.ndim > 0: lon_arr = np.broadcast_to(np.asarray(lon), lat_arr.shape) h_arr = np.broadcast_to(np.asarray(h), lat_arr.shape) results = [ emm(float(la), float(lo), float(hi), year, model, n_max, coefficients) for la, lo, hi in zip(lat_arr.ravel(), lon_arr.ravel(), h_arr.ravel()) ] shape = lat_arr.shape return MagneticResult( X=np.array([r.X for r in results]).reshape(shape), Y=np.array([r.Y for r in results]).reshape(shape), Z=np.array([r.Z for r in results]).reshape(shape), H=np.array([r.H for r in results]).reshape(shape), F=np.array([r.F for r in results]).reshape(shape), I=np.array([r.I for r in results]).reshape(shape), D=np.array([r.D for r in results]).reshape(shape), ) # Geocentric approximation a = 6371.2 # km lat_gc = lat r = a + h # Compute field in spherical coordinates B_r, B_theta, B_phi = _high_res_field_spherical( lat_gc, lon, r, year, coefficients, n_max ) # Convert to geodetic coordinates X = -B_theta # North Y = B_phi # East Z = -B_r # Down # Derived quantities H = np.sqrt(X * X + Y * Y) F = np.sqrt(H * H + Z * Z) incl = np.arctan2(Z, H) D = np.arctan2(Y, X) return MagneticResult(X=X, Y=Y, Z=Z, H=H, F=F, I=incl, D=D)
[docs] def wmmhr( lat: float, lon: float, h: float = 0.0, year: float = 2025.0, n_max: Optional[int] = None, coefficients: Optional[HighResCoefficients] = None, ) -> MagneticResult: """Compute magnetic field using World Magnetic Model High Resolution. WMMHR2025 extends WMM to degree 133, providing higher spatial resolution for applications requiring more accurate local field values. Parameters ---------- lat : float Geodetic latitude in radians. lon : float Longitude in radians. h : float, optional Height above WGS84 ellipsoid in km. Default 0. year : float, optional Decimal year. Default 2025.0. n_max : int, optional Maximum degree to evaluate. Default uses full model (133). coefficients : HighResCoefficients, optional Pre-loaded coefficients. If None, loads from file. Returns ------- result : MagneticResult Magnetic field components and derived quantities. Notes ----- WMMHR2025 is valid for 2025.0 - 2030.0. For dates outside this range, results may be less accurate. Secular variation is only computed for degrees 1-15. Higher degree terms represent static crustal field. """ return emm( lat, lon, h, year, model="WMMHR2025", n_max=n_max, coefficients=coefficients, )
_ScalarOrArray = Union[float, NDArray[np.floating]]
[docs] def emm_declination( lat: _ScalarOrArray, lon: _ScalarOrArray, h: _ScalarOrArray = 0.0, year: float = 2020.0, model: str = "EMM2017", n_max: Optional[int] = None, coefficients: Optional[HighResCoefficients] = None, ) -> _ScalarOrArray: """Compute magnetic declination using EMM/WMMHR. Parameters ---------- lat : float or ndarray Geodetic latitude in radians. lon : float or ndarray Longitude in radians. h : float or ndarray, optional Height above WGS84 ellipsoid in km. year : float, optional Decimal year. model : str, optional Model name. n_max : int, optional Maximum degree to evaluate. coefficients : HighResCoefficients, optional Pre-loaded coefficients. Returns ------- float or ndarray Magnetic declination in radians. """ result = emm(lat, lon, h, year, model, n_max, coefficients) return result.D
[docs] def emm_inclination( lat: _ScalarOrArray, lon: _ScalarOrArray, h: _ScalarOrArray = 0.0, year: float = 2020.0, model: str = "EMM2017", n_max: Optional[int] = None, coefficients: Optional[HighResCoefficients] = None, ) -> _ScalarOrArray: """Compute magnetic inclination using EMM/WMMHR. Parameters ---------- lat : float or ndarray Geodetic latitude in radians. lon : float or ndarray Longitude in radians. h : float or ndarray, optional Height above WGS84 ellipsoid in km. year : float, optional Decimal year. model : str, optional Model name. n_max : int, optional Maximum degree to evaluate. coefficients : HighResCoefficients, optional Pre-loaded coefficients. Returns ------- float or ndarray Magnetic inclination (dip) in radians. """ result = emm(lat, lon, h, year, model, n_max, coefficients) return result.I
[docs] def emm_intensity( lat: _ScalarOrArray, lon: _ScalarOrArray, h: _ScalarOrArray = 0.0, year: float = 2020.0, model: str = "EMM2017", n_max: Optional[int] = None, coefficients: Optional[HighResCoefficients] = None, ) -> _ScalarOrArray: """Compute total magnetic field intensity using EMM/WMMHR. Parameters ---------- lat : float or ndarray Geodetic latitude in radians. lon : float or ndarray Longitude in radians. h : float or ndarray, optional Height above WGS84 ellipsoid in km. year : float, optional Decimal year. model : str, optional Model name. n_max : int, optional Maximum degree to evaluate. coefficients : HighResCoefficients, optional Pre-loaded coefficients. Returns ------- float or ndarray Total magnetic field intensity in nT. """ result = emm(lat, lon, h, year, model, n_max, coefficients) return result.F
__all__ = [ "HighResCoefficients", "EMM_PARAMETERS", "get_data_dir", "parse_emm_file", "create_test_coefficients", "load_emm_coefficients", "emm", "wmmhr", "emm_declination", "emm_inclination", "emm_intensity", ]