"""Explica que dimensiones (columnas) hacen rara cada fila anomala. Toma la salida multivariante de `isolation_forest_outliers` (lista de `{row_index, score}`) y, para cada outlier, devuelve las columnas con mayor |z-score| respecto a la distribucion de las filas validas. Es la capa de "explicabilidad" del paso de outliers multivariante en la fase EDA: el Isolation Forest dice QUE filas son raras, esta funcion dice POR QUE (en que columnas se desvian mas). Pura y determinista: reconstruye EXACTAMENTE las mismas "filas validas" que usa `isolation_forest_outliers` (mismo filtro de columnas numericas y mismo descarte de filas con None), de modo que el `row_index` apunta a la misma fila en ambas funciones. No hace I/O ni depende de estado. """ import math import numpy as np def _is_finite_number(v) -> bool: """True si v es int/float finito. bool NO cuenta; NaN/Inf tampoco.""" if isinstance(v, bool): return False if not isinstance(v, (int, float)): return False if isinstance(v, float) and (math.isnan(v) or math.isinf(v)): return False return True def summarize_outlier_dims( raw_numeric: dict, outlier_rows: list, top_k: int = 3, ) -> list: """Resume las dimensiones que mas desvian a cada fila anomala. Args: raw_numeric: dict {nombre_columna: [valores]} alineado por fila (como ctx['raw_numeric'] del motor AutomaticEDA). Solo se usan columnas cuyos valores sean todos numericos (None permitido por fila; bool, str, NaN e Inf descartan la columna entera) — filtro identico al de isolation_forest_outliers. outlier_rows: lista de {row_index, score} tal como la devuelve isolation_forest_outliers. row_index cuenta SOLO las filas validas (sin None) en orden de aparicion, empezando en 0. top_k: numero de columnas (las de mayor |z-score|) a reportar por cada outlier. Default 3. Valores invalidos caen a 3. Returns: Lista paralela a outlier_rows (mismo orden) de dicts {row_index, score, dims}, donde dims es la lista de hasta top_k columnas ordenadas por |z| descendente: [{col, value, z}, ...] con z redondeado a 3 decimales. Las entradas de outlier_rows fuera de rango o malformadas se omiten (defensivo). Ante raw_numeric vacio/no-dict, outlier_rows no-lista, 0 columnas numericas o 0 filas validas devuelve []. """ # Validacion defensiva de los argumentos principales. if not isinstance(raw_numeric, dict) or not isinstance(outlier_rows, list): return [] if not isinstance(top_k, int) or isinstance(top_k, bool) or top_k < 1: top_k = 3 # Seleccion de columnas numericas: identica a isolation_forest_outliers. # Una columna entra solo si todos sus valores son numericos (None permitido # por fila); cualquier bool/str/NaN/Inf descarta la columna completa. numeric_cols: dict[str, list] = {} for name, values in raw_numeric.items(): if not isinstance(values, (list, tuple)): continue ok = True for v in values: if v is None: continue if not _is_finite_number(v): ok = False break if ok: numeric_cols[name] = list(values) if len(numeric_cols) < 1: return [] col_names = list(numeric_cols.keys()) try: n_rows_total = min(len(numeric_cols[c]) for c in col_names) except ValueError: return [] # Reconstruye las filas validas con el MISMO criterio que el detector: la # fila i toma un valor por columna; si cualquier valor es None, la fila se # descarta y NO incrementa el indice valido. Asi row_index de outlier_rows # apunta a esta misma secuencia (base 0, orden de aparicion). valid_rows: list[list[float]] = [] for i in range(n_rows_total): row = [numeric_cols[c][i] for c in col_names] if any(v is None for v in row): continue valid_rows.append([float(v) for v in row]) if not valid_rows: return [] matrix = np.asarray(valid_rows, dtype=float) n_valid = matrix.shape[0] means = matrix.mean(axis=0) stds = matrix.std(axis=0, ddof=0) # poblacional (ddof=0) out: list = [] for entry in outlier_rows: if not isinstance(entry, dict): continue ri = entry.get("row_index") # bool es subclase de int: lo excluimos explicitamente. if not isinstance(ri, int) or isinstance(ri, bool): continue if ri < 0 or ri >= n_valid: continue try: score = float(entry.get("score")) except (TypeError, ValueError): score = 0.0 row = matrix[ri] dims = [] for j, name in enumerate(col_names): std = stds[j] if std == 0.0: z = 0.0 else: z = float((row[j] - means[j]) / std) dims.append({"col": name, "value": float(row[j]), "z": z}) # Mayor |z| primero; sort estable, empates por orden de columna. dims.sort(key=lambda d: abs(d["z"]), reverse=True) dims = dims[:top_k] for d in dims: d["z"] = round(d["z"], 3) out.append({"row_index": int(ri), "score": score, "dims": dims}) return out