"""Descomposicion STL de una serie temporal en tendencia/estacional/resto (grupo eda). Funcion pura y determinista que aplica STL (Seasonal-Trend decomposition using Loess, Cleveland et al. 1990) via statsmodels y reporta las tres componentes mas las medidas de fuerza de tendencia y de estacionalidad de Hyndman ("Forecasting: Principles and Practice", seccion de feature extraction). Util en EDA para entender que parte de la variacion de una serie es tendencia, ciclo estacional o ruido antes de modelar o desestacionalizar. """ from __future__ import annotations import math import numpy as np from statsmodels.tsa.seasonal import STL def _clean(values: list) -> list[float]: """Conserva solo valores numericos finitos, descartando None/NaN/no-numericos.""" out: list[float] = [] for v in values: if v is None or isinstance(v, bool): continue if not isinstance(v, (int, float)): continue x = float(v) if math.isnan(x) or math.isinf(x): continue out.append(x) return out def _infer_period(arr: np.ndarray, max_period: int) -> int | None: """Infiere el periodo estacional dominante via autocorrelacion del residuo detrended. Busca el retardo (entre 2 y ``max_period``) que maximiza la autocorrelacion de la serie tras restarle su tendencia lineal. El detrend es clave: sobre una serie con tendencia la autocorrelacion cruda decae monotonamente y el retardo minimo (2) gana siempre, produciendo un ``period=2`` espurio que enmascara la estacionalidad real (falso negativo). Quitando primero la recta de mejor ajuste por minimos cuadrados, el lag ganador refleja el ciclo estacional y no la deriva. Devuelve None si no encuentra un pico claro (autocorrelacion maxima por debajo de un umbral pequeno). """ n = len(arr) if n < 6: return None # Detrend lineal: resta la recta de mejor ajuste para que la tendencia no # domine la autocorrelacion (si no, lag=2 gana siempre en series con deriva). t = np.arange(n, dtype=float) try: slope, intercept = np.polyfit(t, arr, 1) detrended = arr - (slope * t + intercept) except (np.linalg.LinAlgError, ValueError): detrended = arr - arr.mean() x = detrended - detrended.mean() denom = float(np.dot(x, x)) if denom == 0.0: return None best_lag = None best_corr = 0.0 upper = min(max_period, n // 2) for lag in range(2, upper + 1): corr = float(np.dot(x[:-lag], x[lag:]) / denom) if corr > best_corr: best_corr = corr best_lag = lag if best_lag is None or best_corr < 0.2: return None return best_lag def _summarize(component: list[float], max_inline: int = 200) -> dict: """Resume una componente: la incluye entera si es corta, si no estadisticos.""" arr = np.asarray(component, dtype=float) summary = { "min": float(arr.min()), "max": float(arr.max()), "mean": float(arr.mean()), "std": float(arr.std(ddof=0)), } if len(component) <= max_inline: summary["values"] = [float(v) for v in component] else: summary["values"] = None summary["note"] = f"serie larga ({len(component)} puntos): solo estadisticos" return summary def stl_decompose(values: list, period: int = None, robust: bool = True) -> dict: """Descompone una serie temporal en tendencia, estacional y resto via STL. Aplica STL (Seasonal-Trend decomposition using Loess) sobre ``values`` y devuelve las tres componentes (resumidas si la serie es larga) junto a la fuerza de tendencia y la fuerza estacional de Hyndman:: F_trend = max(0, 1 - Var(resto) / Var(resto + tendencia)) F_seasonal = max(0, 1 - Var(resto) / Var(resto + estacional)) Ambas en ``[0, 1]``: cercano a 1 indica una componente fuerte y bien definida; cercano a 0 indica que esa componente apenas existe. Funcion pura y determinista: no hace I/O, no muta los inputs. Args: values: serie temporal de valores numericos en orden cronologico. None/NaN/infinitos/no-numericos se descartan antes de descomponer. period: periodo estacional (numero de observaciones por ciclo, p.ej. 12 para datos mensuales con estacionalidad anual). Si es ``None`` se intenta inferir por autocorrelacion; si no se halla un periodo claro, se devuelve una nota. robust: si ``True`` (default) usa el ajuste robusto de STL, que reduce el efecto de outliers sobre tendencia y estacionalidad. Returns: Con menos de ``2 * period`` puntos (o <8 si no hay periodo) devuelve un dict con ``note`` explicando por que no se pudo descomponer y ``trend_strength``/``seasonal_strength`` en ``None``. En otro caso un dict con:: { "n": int, "period": int, # periodo usado (inferido o dado) "period_inferred": bool, # True si se infirio automaticamente "robust": bool, "trend": {min,max,mean,std, values|note}, "seasonal": {...}, "resid": {...}, "trend_strength": float, # F_trend de Hyndman en [0,1] "seasonal_strength": float, # F_seasonal de Hyndman en [0,1] } """ clean = _clean(values) n = len(clean) if n < 8: return { "n": n, "note": "datos insuficientes", "trend_strength": None, "seasonal_strength": None, } arr = np.asarray(clean, dtype=float) inferred = False if period is None: period = _infer_period(arr, max_period=max(2, n // 2)) inferred = True if period is None: return { "n": n, "note": "no se pudo inferir un periodo estacional; pasa period explicito", "trend_strength": None, "seasonal_strength": None, } period = int(period) if period < 2: return { "n": n, "note": "period debe ser >= 2", "trend_strength": None, "seasonal_strength": None, } # STL exige al menos dos ciclos completos. if n < 2 * period: return { "n": n, "period": period, "note": f"serie corta: STL necesita >= 2*period ({2 * period}) puntos", "trend_strength": None, "seasonal_strength": None, } result = STL(arr, period=period, robust=robust).fit() trend = np.asarray(result.trend, dtype=float) seasonal = np.asarray(result.seasonal, dtype=float) resid = np.asarray(result.resid, dtype=float) # Fuerza de tendencia y estacional (Hyndman). Var con ddof=0. var_resid = float(np.var(resid, ddof=0)) var_resid_trend = float(np.var(resid + trend, ddof=0)) var_resid_seasonal = float(np.var(resid + seasonal, ddof=0)) trend_strength = ( max(0.0, 1.0 - var_resid / var_resid_trend) if var_resid_trend > 0 else 0.0 ) seasonal_strength = ( max(0.0, 1.0 - var_resid / var_resid_seasonal) if var_resid_seasonal > 0 else 0.0 ) return { "n": n, "period": period, "period_inferred": bool(inferred), "robust": bool(robust), "trend": _summarize(trend.tolist()), "seasonal": _summarize(seasonal.tolist()), "resid": _summarize(resid.tolist()), "trend_strength": float(trend_strength), "seasonal_strength": float(seasonal_strength), }