Files
fn_registry/python/functions/datascience/stl_decompose.py
T
Egutierrez e142ef026d fix(eda): hallazgos de comportamiento del benchmark (H2,H3,H6,H7,H8,H10,H11)
Ronda 4 (verificada con re-corrida sobre los datasets afectados):
- H2: stl_decompose deriva periodo de la frecuencia del indice (seattle period=365
  seasonal_strength=0.84; fin del period=2 espurio)
- H3+H10: infer_fk por senal de nombre (<X>Id->X.<X>Id) + excluir no-clave -> chinook
  111->9 FK, todas reales, cero absurdas, 16-27x mas rapido; base intacta (flag off->111)
- H6: association no computa eta2 si cardinalidad~=n (Ticket-Fare espurio fuera)
- H7: id secuencial monotono excluido de correlacion y PCA/KMeans (PassengerId fuera)
- H8: correlacion de series no estacionarias marcada espuria / sobre retornos
- H11: distribution_type usa modos/cardinalidad/normalidad (quality->discrete)
- 66 tests verdes

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 06:37:47 +02:00

209 lines
7.5 KiB
Python

"""Descomposicion STL de una serie temporal en tendencia/estacional/resto (grupo eda).
Funcion pura y determinista que aplica STL (Seasonal-Trend decomposition using
Loess, Cleveland et al. 1990) via statsmodels y reporta las tres componentes mas
las medidas de fuerza de tendencia y de estacionalidad de Hyndman ("Forecasting:
Principles and Practice", seccion de feature extraction). Util en EDA para
entender que parte de la variacion de una serie es tendencia, ciclo estacional o
ruido antes de modelar o desestacionalizar.
"""
from __future__ import annotations
import math
import numpy as np
from statsmodels.tsa.seasonal import STL
def _clean(values: list) -> list[float]:
"""Conserva solo valores numericos finitos, descartando None/NaN/no-numericos."""
out: list[float] = []
for v in values:
if v is None or isinstance(v, bool):
continue
if not isinstance(v, (int, float)):
continue
x = float(v)
if math.isnan(x) or math.isinf(x):
continue
out.append(x)
return out
def _infer_period(arr: np.ndarray, max_period: int) -> int | None:
"""Infiere el periodo estacional dominante via autocorrelacion del residuo detrended.
Busca el retardo (entre 2 y ``max_period``) que maximiza la autocorrelacion
de la serie tras restarle su tendencia lineal. El detrend es clave: sobre una
serie con tendencia la autocorrelacion cruda decae monotonamente y el retardo
minimo (2) gana siempre, produciendo un ``period=2`` espurio que enmascara la
estacionalidad real (falso negativo). Quitando primero la recta de mejor ajuste
por minimos cuadrados, el lag ganador refleja el ciclo estacional y no la deriva.
Devuelve None si no encuentra un pico claro (autocorrelacion maxima por debajo
de un umbral pequeno).
"""
n = len(arr)
if n < 6:
return None
# Detrend lineal: resta la recta de mejor ajuste para que la tendencia no
# domine la autocorrelacion (si no, lag=2 gana siempre en series con deriva).
t = np.arange(n, dtype=float)
try:
slope, intercept = np.polyfit(t, arr, 1)
detrended = arr - (slope * t + intercept)
except (np.linalg.LinAlgError, ValueError):
detrended = arr - arr.mean()
x = detrended - detrended.mean()
denom = float(np.dot(x, x))
if denom == 0.0:
return None
best_lag = None
best_corr = 0.0
upper = min(max_period, n // 2)
for lag in range(2, upper + 1):
corr = float(np.dot(x[:-lag], x[lag:]) / denom)
if corr > best_corr:
best_corr = corr
best_lag = lag
if best_lag is None or best_corr < 0.2:
return None
return best_lag
def _summarize(component: list[float], max_inline: int = 200) -> dict:
"""Resume una componente: la incluye entera si es corta, si no estadisticos."""
arr = np.asarray(component, dtype=float)
summary = {
"min": float(arr.min()),
"max": float(arr.max()),
"mean": float(arr.mean()),
"std": float(arr.std(ddof=0)),
}
if len(component) <= max_inline:
summary["values"] = [float(v) for v in component]
else:
summary["values"] = None
summary["note"] = f"serie larga ({len(component)} puntos): solo estadisticos"
return summary
def stl_decompose(values: list, period: int = None, robust: bool = True) -> dict:
"""Descompone una serie temporal en tendencia, estacional y resto via STL.
Aplica STL (Seasonal-Trend decomposition using Loess) sobre ``values`` y
devuelve las tres componentes (resumidas si la serie es larga) junto a la
fuerza de tendencia y la fuerza estacional de Hyndman::
F_trend = max(0, 1 - Var(resto) / Var(resto + tendencia))
F_seasonal = max(0, 1 - Var(resto) / Var(resto + estacional))
Ambas en ``[0, 1]``: cercano a 1 indica una componente fuerte y bien
definida; cercano a 0 indica que esa componente apenas existe.
Funcion pura y determinista: no hace I/O, no muta los inputs.
Args:
values: serie temporal de valores numericos en orden cronologico.
None/NaN/infinitos/no-numericos se descartan antes de descomponer.
period: periodo estacional (numero de observaciones por ciclo, p.ej. 12
para datos mensuales con estacionalidad anual). Si es ``None`` se
intenta inferir por autocorrelacion; si no se halla un periodo
claro, se devuelve una nota.
robust: si ``True`` (default) usa el ajuste robusto de STL, que reduce el
efecto de outliers sobre tendencia y estacionalidad.
Returns:
Con menos de ``2 * period`` puntos (o <8 si no hay periodo) devuelve un
dict con ``note`` explicando por que no se pudo descomponer y
``trend_strength``/``seasonal_strength`` en ``None``.
En otro caso un dict con::
{
"n": int,
"period": int, # periodo usado (inferido o dado)
"period_inferred": bool, # True si se infirio automaticamente
"robust": bool,
"trend": {min,max,mean,std, values|note},
"seasonal": {...},
"resid": {...},
"trend_strength": float, # F_trend de Hyndman en [0,1]
"seasonal_strength": float, # F_seasonal de Hyndman en [0,1]
}
"""
clean = _clean(values)
n = len(clean)
if n < 8:
return {
"n": n,
"note": "datos insuficientes",
"trend_strength": None,
"seasonal_strength": None,
}
arr = np.asarray(clean, dtype=float)
inferred = False
if period is None:
period = _infer_period(arr, max_period=max(2, n // 2))
inferred = True
if period is None:
return {
"n": n,
"note": "no se pudo inferir un periodo estacional; pasa period explicito",
"trend_strength": None,
"seasonal_strength": None,
}
period = int(period)
if period < 2:
return {
"n": n,
"note": "period debe ser >= 2",
"trend_strength": None,
"seasonal_strength": None,
}
# STL exige al menos dos ciclos completos.
if n < 2 * period:
return {
"n": n,
"period": period,
"note": f"serie corta: STL necesita >= 2*period ({2 * period}) puntos",
"trend_strength": None,
"seasonal_strength": None,
}
result = STL(arr, period=period, robust=robust).fit()
trend = np.asarray(result.trend, dtype=float)
seasonal = np.asarray(result.seasonal, dtype=float)
resid = np.asarray(result.resid, dtype=float)
# Fuerza de tendencia y estacional (Hyndman). Var con ddof=0.
var_resid = float(np.var(resid, ddof=0))
var_resid_trend = float(np.var(resid + trend, ddof=0))
var_resid_seasonal = float(np.var(resid + seasonal, ddof=0))
trend_strength = (
max(0.0, 1.0 - var_resid / var_resid_trend) if var_resid_trend > 0 else 0.0
)
seasonal_strength = (
max(0.0, 1.0 - var_resid / var_resid_seasonal)
if var_resid_seasonal > 0
else 0.0
)
return {
"n": n,
"period": period,
"period_inferred": bool(inferred),
"robust": bool(robust),
"trend": _summarize(trend.tolist()),
"seasonal": _summarize(seasonal.tolist()),
"resid": _summarize(resid.tolist()),
"trend_strength": float(trend_strength),
"seasonal_strength": float(seasonal_strength),
}