Source code for pytcl.static_estimation.robust

"""
Robust estimation methods.

This module provides robust estimators that are resistant to outliers,
including M-estimators (Huber, Tukey bisquare) and RANSAC.

References
----------
.. [1] P. J. Huber, "Robust Statistics," Wiley, 1981.
.. [2] M. A. Fischler and R. C. Bolles, "Random Sample Consensus: A Paradigm
       for Model Fitting with Applications to Image Analysis and Automated
       Cartography," Communications of the ACM, 1981.
"""

from typing import Any, Callable, NamedTuple, Optional

import numpy as np
from numpy.typing import ArrayLike, NDArray


[docs] class RobustResult(NamedTuple): """Result of robust estimation. Attributes ---------- x : ndarray Estimated parameters. residuals : ndarray Residual vector. weights : ndarray Final weights for each observation. scale : float Estimated scale of residuals. n_iter : int Number of iterations performed. converged : bool Whether the algorithm converged. """ x: NDArray[np.floating] residuals: NDArray[np.floating] weights: NDArray[np.floating] scale: float n_iter: int converged: bool
[docs] class RANSACResult(NamedTuple): """Result of RANSAC estimation. Attributes ---------- x : ndarray Estimated parameters from best model. inliers : ndarray Boolean mask of inlier points. n_inliers : int Number of inliers. residuals : ndarray Residuals for all points. n_iter : int Number of iterations performed. best_score : float Score of the best model found. """ x: NDArray[np.floating] inliers: NDArray[np.bool_] n_inliers: int residuals: NDArray[np.floating] n_iter: int best_score: float
# ============================================================================= # Weight Functions for M-estimators # =============================================================================
[docs] def huber_weight(r: ArrayLike, c: float = 1.345) -> NDArray[np.floating]: """ Huber weight function. Parameters ---------- r : array_like Standardized residuals. c : float, optional Tuning constant. Default 1.345 gives 95% efficiency for normal distribution. Returns ------- weights : ndarray Weights for each residual. Notes ----- The Huber weight function is: w(r) = 1 if |r| <= c w(r) = c / |r| if |r| > c Examples -------- >>> r = np.array([0.5, 1.0, 2.0, 5.0]) # Standardized residuals >>> w = huber_weight(r, c=1.345) >>> w[0] # Small residual gets weight 1 1.0 >>> w[3] < 0.5 # Large residual gets reduced weight True """ r = np.asarray(r, dtype=np.float64) abs_r = np.abs(r) weights = np.ones_like(r) mask = abs_r > c weights[mask] = c / abs_r[mask] return weights
[docs] def huber_rho(r: ArrayLike, c: float = 1.345) -> NDArray[np.floating]: """ Huber rho (loss) function. Parameters ---------- r : array_like Standardized residuals. c : float, optional Tuning constant. Returns ------- rho : ndarray Loss values. Notes ----- The Huber rho function is: rho(r) = r^2 / 2 if |r| <= c rho(r) = c * |r| - c^2/2 if |r| > c Examples -------- >>> r = np.array([0.5, 1.0, 2.0]) >>> rho = huber_rho(r, c=1.345) >>> rho[0] # Small residual: r^2/2 0.125 """ r = np.asarray(r, dtype=np.float64) abs_r = np.abs(r) rho = np.where(abs_r <= c, r**2 / 2, c * abs_r - c**2 / 2) return rho
[docs] def tukey_weight(r: ArrayLike, c: float = 4.685) -> NDArray[np.floating]: """ Tukey bisquare weight function. Parameters ---------- r : array_like Standardized residuals. c : float, optional Tuning constant. Default 4.685 gives 95% efficiency for normal distribution. Returns ------- weights : ndarray Weights for each residual. Notes ----- The Tukey bisquare weight function is: w(r) = (1 - (r/c)^2)^2 if |r| <= c w(r) = 0 if |r| > c This provides complete rejection of large outliers. Examples -------- >>> r = np.array([0.5, 2.0, 5.0, 10.0]) >>> w = tukey_weight(r, c=4.685) >>> w[0] > 0.9 # Small residual gets high weight True >>> w[3] # Large residual completely rejected 0.0 """ r = np.asarray(r, dtype=np.float64) abs_r = np.abs(r) weights = np.zeros_like(r) mask = abs_r <= c weights[mask] = (1 - (r[mask] / c) ** 2) ** 2 return weights
[docs] def tukey_rho(r: ArrayLike, c: float = 4.685) -> NDArray[np.floating]: """ Tukey bisquare rho (loss) function. Parameters ---------- r : array_like Standardized residuals. c : float, optional Tuning constant. Returns ------- rho : ndarray Loss values. Notes ----- The Tukey rho function is: rho(r) = c^2/6 * (1 - (1 - (r/c)^2)^3) if |r| <= c rho(r) = c^2/6 if |r| > c Examples -------- >>> r = np.array([0.0, 2.0, 10.0]) >>> rho = tukey_rho(r, c=4.685) >>> rho[0] # Zero residual 0.0 >>> rho[2] == rho[2] # Large residuals saturate at c^2/6 True """ r = np.asarray(r, dtype=np.float64) abs_r = np.abs(r) rho = np.full_like(r, c**2 / 6) mask = abs_r <= c rho[mask] = c**2 / 6 * (1 - (1 - (r[mask] / c) ** 2) ** 3) return rho
[docs] def cauchy_weight(r: ArrayLike, c: float = 2.385) -> NDArray[np.floating]: """ Cauchy weight function. Parameters ---------- r : array_like Standardized residuals. c : float, optional Tuning constant. Returns ------- weights : ndarray Weights for each residual. Notes ----- The Cauchy weight function is: w(r) = 1 / (1 + (r/c)^2) Examples -------- >>> r = np.array([0.0, 1.0, 5.0]) >>> w = cauchy_weight(r, c=2.385) >>> w[0] # Zero residual gets weight 1 1.0 >>> 0 < w[2] < 1 # Large residuals get reduced weight (but never zero) True """ r = np.asarray(r, dtype=np.float64) return 1 / (1 + (r / c) ** 2)
# ============================================================================= # Scale Estimators # =============================================================================
[docs] def mad(residuals: ArrayLike, c: float = 1.4826) -> float: """ Median Absolute Deviation (MAD) scale estimator. Parameters ---------- residuals : array_like Residual vector. c : float, optional Consistency constant. Default 1.4826 makes MAD consistent for normal distribution. Returns ------- scale : float Estimated scale. Notes ----- MAD = c * median(|r - median(r)|) This is a robust scale estimator with 50% breakdown point. Examples -------- >>> residuals = np.array([1.0, 1.1, 0.9, 1.0, 100.0]) # One outlier >>> scale = mad(residuals) >>> scale < 1.0 # Robust to the outlier True """ r = np.asarray(residuals, dtype=np.float64) return c * float(np.median(np.abs(r - np.median(r))))
[docs] def tau_scale( residuals: ArrayLike, c1: float = 4.5, c2: float = 3.0, ) -> float: """ Tau scale estimator. Parameters ---------- residuals : array_like Residual vector. c1, c2 : float Tuning constants. Returns ------- scale : float Estimated scale. Notes ----- Tau scale combines high breakdown point with efficiency. Examples -------- >>> residuals = np.array([1.0, 1.1, 0.9, 1.0, 1.2, 100.0]) # One outlier >>> scale = tau_scale(residuals) >>> scale < 10.0 # Robust to the outlier True """ r = np.asarray(residuals, dtype=np.float64) n = len(r) # Initial scale from MAD s0 = mad(r) if s0 < 1e-10: return s0 # Standardize u = r / s0 # Compute weights w1 = tukey_rho(u, c1) w2 = tukey_rho(u, c2) # Tau scale num = np.sum(w1) denom = np.sum(w2) if denom < 1e-10: return s0 return s0 * np.sqrt(num / n) * np.sqrt(n / denom)
# ============================================================================= # M-estimators # =============================================================================
[docs] def irls( A: ArrayLike, b: ArrayLike, weight_func: Callable[[np.ndarray[Any, Any]], np.ndarray[Any, Any]] = huber_weight, scale_func: Callable[[np.ndarray[Any, Any]], float] = mad, max_iter: int = 50, tol: float = 1e-6, ) -> RobustResult: """ Iteratively Reweighted Least Squares (IRLS). General M-estimator using IRLS algorithm. Parameters ---------- A : array_like Design matrix of shape (m, n). b : array_like Observation vector of shape (m,). weight_func : callable, optional Weight function w(r) for standardized residuals. Default is Huber weight. scale_func : callable, optional Scale estimation function. Default is MAD. max_iter : int, optional Maximum number of iterations. Default 50. tol : float, optional Convergence tolerance for parameter change. Default 1e-6. Returns ------- result : RobustResult Robust estimation result. Examples -------- >>> A = np.array([[1, 1], [1, 2], [1, 3], [1, 4], [1, 10]]) >>> b = np.array([2, 3, 4, 5, 100]) # Last point is outlier >>> result = irls(A, b) >>> result.x # Should be close to [1, 1] ignoring outlier Notes ----- IRLS iteratively solves weighted least squares problems, updating weights based on the current residuals. """ A = np.asarray(A, dtype=np.float64) b = np.asarray(b, dtype=np.float64) m, n = A.shape # Initial estimate from OLS x, _, _, _ = np.linalg.lstsq(A, b, rcond=None) converged = False for iteration in range(max_iter): x_old = x.copy() # Compute residuals residuals = b - A @ x # Estimate scale scale = scale_func(residuals) if scale < 1e-10: scale = 1.0 # Standardize residuals r_std = residuals / scale # Compute weights weights = weight_func(r_std) weights = np.maximum(weights, 1e-10) # Avoid zero weights # Weighted least squares W = np.diag(weights) AtWA = A.T @ W @ A AtWb = A.T @ W @ b try: x = np.linalg.solve(AtWA, AtWb) except np.linalg.LinAlgError: x = np.linalg.lstsq(AtWA, AtWb, rcond=None)[0] # Check convergence if np.linalg.norm(x - x_old) < tol * (1 + np.linalg.norm(x_old)): converged = True break # Final residuals residuals = b - A @ x scale = scale_func(residuals) return RobustResult( x=x, residuals=residuals, weights=weights, scale=float(scale), n_iter=iteration + 1, converged=converged, )
[docs] def huber_regression( A: ArrayLike, b: ArrayLike, c: float = 1.345, max_iter: int = 50, tol: float = 1e-6, ) -> RobustResult: """ Huber robust regression. M-estimation using Huber's weight function. Parameters ---------- A : array_like Design matrix of shape (m, n). b : array_like Observation vector of shape (m,). c : float, optional Tuning constant. Default 1.345 gives 95% efficiency. max_iter : int, optional Maximum iterations. Default 50. tol : float, optional Convergence tolerance. Default 1e-6. Returns ------- result : RobustResult Robust estimation result. Examples -------- >>> A = np.array([[1, 1], [1, 2], [1, 3]]) >>> b = np.array([2.1, 2.9, 100]) # Outlier in last observation >>> result = huber_regression(A, b) Notes ----- Huber regression provides a balance between efficiency for Gaussian errors and resistance to outliers. """ def weight_func(r: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]: return huber_weight(r, c) return irls(A, b, weight_func=weight_func, max_iter=max_iter, tol=tol)
[docs] def tukey_regression( A: ArrayLike, b: ArrayLike, c: float = 4.685, max_iter: int = 50, tol: float = 1e-6, ) -> RobustResult: """ Tukey bisquare robust regression. M-estimation using Tukey's bisquare weight function. Parameters ---------- A : array_like Design matrix of shape (m, n). b : array_like Observation vector of shape (m,). c : float, optional Tuning constant. Default 4.685 gives 95% efficiency. max_iter : int, optional Maximum iterations. Default 50. tol : float, optional Convergence tolerance. Default 1e-6. Returns ------- result : RobustResult Robust estimation result. Examples -------- >>> A = np.array([[1, 1], [1, 2], [1, 3]]) >>> b = np.array([2.1, 2.9, 100]) # Outlier in last observation >>> result = tukey_regression(A, b) Notes ----- Tukey bisquare provides complete rejection of large outliers (zero weight for residuals > c), making it more robust than Huber for gross outliers. """ def weight_func(r: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]: return tukey_weight(r, c) return irls(A, b, weight_func=weight_func, max_iter=max_iter, tol=tol)
# ============================================================================= # RANSAC # =============================================================================
[docs] def ransac( A: ArrayLike, b: ArrayLike, min_samples: Optional[int] = None, residual_threshold: Optional[float] = None, max_trials: int = 100, stop_n_inliers: Optional[int] = None, stop_score: Optional[float] = None, random_state: Optional[int] = None, ) -> RANSACResult: """ RANdom SAmple Consensus (RANSAC) regression. Robust regression by iteratively selecting random subsets, fitting models, and identifying inliers. Parameters ---------- A : array_like Design matrix of shape (m, n). b : array_like Observation vector of shape (m,). min_samples : int, optional Minimum number of samples for fitting. Default is n (number of features). residual_threshold : float, optional Maximum residual for a point to be considered inlier. Default is MAD of initial OLS residuals. max_trials : int, optional Maximum number of random trials. Default 100. stop_n_inliers : int, optional Stop early if this many inliers found. Default None. stop_score : float, optional Stop early if score exceeds this. Default None. random_state : int, optional Random seed for reproducibility. Returns ------- result : RANSACResult RANSAC result with best model and inlier mask. Examples -------- >>> rng = np.random.default_rng(42) >>> A = np.column_stack([np.ones(100), np.arange(100)]) >>> b = 2 + 3 * np.arange(100) + rng.normal(0, 1, 100) >>> # Add outliers >>> b[90:] = 1000 >>> result = ransac(A, b, random_state=42) >>> result.x # Should be close to [2, 3] Notes ----- RANSAC is particularly effective when the fraction of outliers is large (>25%), where M-estimators may struggle. The algorithm: 1. Randomly select min_samples points 2. Fit model to selected points 3. Count inliers (points with residual < threshold) 4. If best model found, save it 5. Repeat for max_trials 6. Refit model using all inliers of best model References ---------- .. [1] M. A. Fischler and R. C. Bolles, "Random Sample Consensus," Communications of the ACM, 1981. """ A = np.asarray(A, dtype=np.float64) b = np.asarray(b, dtype=np.float64) m, n = A.shape if min_samples is None: min_samples = n if min_samples > m: raise ValueError(f"min_samples ({min_samples}) > n_samples ({m})") # Initialize threshold from OLS if residual_threshold is None: x_ols, _, _, _ = np.linalg.lstsq(A, b, rcond=None) residuals_ols = b - A @ x_ols residual_threshold = 2.5 * mad(residuals_ols) if residual_threshold < 1e-10: residual_threshold = 1.0 rng = np.random.default_rng(random_state) best_x = None best_inliers = np.zeros(m, dtype=bool) best_n_inliers = 0 best_score = -np.inf for trial in range(max_trials): # Random sample sample_idx = rng.choice(m, size=min_samples, replace=False) A_sample = A[sample_idx] b_sample = b[sample_idx] # Fit to sample try: x_sample, _, _, _ = np.linalg.lstsq(A_sample, b_sample, rcond=None) except np.linalg.LinAlgError: continue # Compute residuals for all points residuals = np.abs(b - A @ x_sample) # Find inliers inliers = residuals < residual_threshold n_inliers = np.sum(inliers) # Score (number of inliers) score = float(n_inliers) # Update best if improved if score > best_score: best_score = score best_x = x_sample best_inliers = inliers best_n_inliers = n_inliers # Early stopping if stop_n_inliers is not None and n_inliers >= stop_n_inliers: break if stop_score is not None and score >= stop_score: break # Refit using all inliers if best_n_inliers >= n: A_inliers = A[best_inliers] b_inliers = b[best_inliers] best_x, _, _, _ = np.linalg.lstsq(A_inliers, b_inliers, rcond=None) # Handle case where no good model found if best_x is None: best_x, _, _, _ = np.linalg.lstsq(A, b, rcond=None) best_inliers = np.ones(m, dtype=bool) best_n_inliers = m # Final residuals residuals = b - A @ best_x return RANSACResult( x=best_x, inliers=best_inliers, n_inliers=best_n_inliers, residuals=residuals, n_iter=trial + 1, best_score=best_score, )
[docs] def ransac_n_trials( n_samples: int, n_outliers: int, min_samples: int, probability: float = 0.99, ) -> int: """ Compute number of RANSAC trials needed. Parameters ---------- n_samples : int Total number of samples. n_outliers : int Expected number of outliers. min_samples : int Number of samples per trial. probability : float, optional Desired probability of success. Default 0.99. Returns ------- n_trials : int Number of trials needed. Examples -------- >>> ransac_n_trials(100, 30, 2) # 30% outliers, 2 samples per trial 10 Notes ----- Formula: k = log(1 - p) / log(1 - (1 - e)^n) where: - p = probability of success - e = outlier ratio - n = min_samples """ if n_samples <= n_outliers: return 1 outlier_ratio = n_outliers / n_samples inlier_ratio = 1 - outlier_ratio if inlier_ratio**min_samples < 1e-10: return 10000 # Very high outlier rate prob_all_inliers = inlier_ratio**min_samples prob_failure = 1 - prob_all_inliers if prob_failure < 1e-10: return 1 n_trials = np.log(1 - probability) / np.log(prob_failure) return max(1, int(np.ceil(n_trials)))
__all__ = [ "RobustResult", "RANSACResult", "huber_weight", "huber_rho", "tukey_weight", "tukey_rho", "cauchy_weight", "mad", "tau_scale", "irls", "huber_regression", "tukey_regression", "ransac", "ransac_n_trials", ]