763e06c127
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
119 lines
4.6 KiB
Python
119 lines
4.6 KiB
Python
"""Deteccion de outliers multivariante con Isolation Forest.
|
|
|
|
Detecta filas anomalas considerando TODAS las columnas a la vez (no columna a
|
|
columna): una fila puede ser normal en cada variable por separado y aun asi ser
|
|
un outlier por la combinacion de sus valores. Pura y determinista
|
|
(`random_state=0`).
|
|
"""
|
|
|
|
import numpy as np
|
|
from sklearn.ensemble import IsolationForest
|
|
from sklearn.preprocessing import StandardScaler
|
|
|
|
|
|
def isolation_forest_outliers(
|
|
columns: dict,
|
|
contamination: float = 0.05,
|
|
max_report: int = 50,
|
|
) -> dict:
|
|
"""Detecta outliers multivariante con Isolation Forest.
|
|
|
|
Args:
|
|
columns: dict {nombre_columna: [valores numericos]}. Todas las listas se
|
|
asumen alineadas por fila (misma longitud, la fila i de cada columna
|
|
forma una observacion). Solo se usan columnas cuyos valores sean
|
|
numericos; las demas se ignoran.
|
|
contamination: proporcion esperada de outliers en [0, 0.5], pasada a
|
|
IsolationForest. Default 0.05.
|
|
max_report: numero maximo de filas anomalas a devolver en
|
|
outlier_rows, las mas anomalas primero. Default 50.
|
|
|
|
Returns:
|
|
dict con:
|
|
n_outliers: numero total de filas marcadas como outlier.
|
|
outlier_pct: porcentaje de outliers sobre filas validas (0-100).
|
|
outlier_rows: lista de {row_index, score} de los outliers, mas
|
|
anomalo primero, truncada a max_report.
|
|
threshold: umbral de decision del modelo (offset_). Una fila es
|
|
outlier cuando su score (decision_function) es < threshold.
|
|
n_rows_used: filas validas usadas (tras descartar filas con None).
|
|
n_features: numero de columnas numericas usadas.
|
|
|
|
IMPORTANTE: row_index es el indice contando SOLO las filas validas (las
|
|
que no tenian ningun None en las columnas numericas usadas), empezando
|
|
en 0 en orden de aparicion. No es el indice en las listas originales si
|
|
se descarto alguna fila por contener None.
|
|
|
|
Si hay menos de 2 columnas numericas o menos de 10 filas validas,
|
|
devuelve {n_outliers: 0, note: "datos insuficientes"} sin petar.
|
|
"""
|
|
# Selecciona solo columnas con todos los valores numericos (ints/floats,
|
|
# bool no cuenta). None se permite a nivel de fila y se filtra despues.
|
|
numeric_cols: dict[str, list] = {}
|
|
for name, values in columns.items():
|
|
if not isinstance(values, (list, tuple)):
|
|
continue
|
|
ok = True
|
|
for v in values:
|
|
if v is None:
|
|
continue
|
|
if isinstance(v, bool) or not isinstance(v, (int, float)):
|
|
ok = False
|
|
break
|
|
if isinstance(v, float) and (np.isnan(v) or np.isinf(v)):
|
|
ok = False
|
|
break
|
|
if ok:
|
|
numeric_cols[name] = list(values)
|
|
|
|
if len(numeric_cols) < 2:
|
|
return {"n_outliers": 0, "note": "datos insuficientes"}
|
|
|
|
col_names = list(numeric_cols.keys())
|
|
n_rows_total = min(len(numeric_cols[c]) for c in col_names)
|
|
|
|
# Construye matriz fila a fila, descartando filas con None en cualquier
|
|
# columna usada. row_index = posicion entre las filas validas.
|
|
rows: list[list[float]] = []
|
|
for i in range(n_rows_total):
|
|
row = [numeric_cols[c][i] for c in col_names]
|
|
if any(v is None for v in row):
|
|
continue
|
|
rows.append([float(v) for v in row])
|
|
|
|
if len(rows) < 10:
|
|
return {"n_outliers": 0, "note": "datos insuficientes"}
|
|
|
|
matrix = np.asarray(rows, dtype=float)
|
|
n_rows_used = matrix.shape[0]
|
|
n_features = matrix.shape[1]
|
|
|
|
# Estandariza para que ninguna columna domine por escala.
|
|
scaled = StandardScaler().fit_transform(matrix)
|
|
|
|
model = IsolationForest(contamination=contamination, random_state=0)
|
|
labels = model.fit_predict(scaled) # -1 = outlier, 1 = inlier
|
|
# decision_function: cuanto menor, mas anomalo. Outlier <=> score < 0
|
|
# tras el ajuste de offset_ que aplica sklearn (score = raw - offset_).
|
|
scores = model.decision_function(scaled)
|
|
threshold = float(model.offset_)
|
|
|
|
outlier_idx = [i for i, lab in enumerate(labels) if lab == -1]
|
|
# Mas anomalo primero (score mas bajo primero).
|
|
outlier_idx.sort(key=lambda i: scores[i])
|
|
|
|
n_outliers = len(outlier_idx)
|
|
outlier_rows = [
|
|
{"row_index": int(i), "score": float(scores[i])}
|
|
for i in outlier_idx[:max_report]
|
|
]
|
|
|
|
return {
|
|
"n_outliers": n_outliers,
|
|
"outlier_pct": round(100.0 * n_outliers / n_rows_used, 4),
|
|
"outlier_rows": outlier_rows,
|
|
"threshold": threshold,
|
|
"n_rows_used": n_rows_used,
|
|
"n_features": n_features,
|
|
}
|