"""Matriz de asociacion unificada para una tabla con columnas de tipos mezclados. Funcion pura del grupo eda. Para cada par de columnas elige la metrica de asociacion adecuada al par de tipos (Pearson/Spearman para num-num, Cramer's V para cat-cat, correlation ratio para num-cat) y, ademas, calcula informacion mutua normalizada como medida comun no-lineal para todos los pares. Devuelve la lista de pares evaluados, el subconjunto de pares fuertes y una leyenda de los metodos. Compone las funciones atomicas del registry; no reimplementa metricas. """ import math from collections import Counter, defaultdict from scipy.stats import chi2_contingency, f_oneway, pearsonr, spearmanr from datascience import ( correlation_ratio, cramers_v, mutual_info_columns, pearson, spearman_corr, theils_u, ) # Modulo hoja directo: no depende de que el paquete reexporte la funcion en su # __init__ (lo integra el orquestador al cerrar el grupo eda). from datascience.fdr_correction import fdr_correction # Tipos que, para efectos de asociacion, se tratan como categoricos. _CATEGORICAL_LIKE = {"categorical", "datetime", "boolean", "text"} def _is_num(v) -> bool: """True si v es un numero real (int/float) que no es bool ni NaN.""" return ( isinstance(v, (int, float)) and not isinstance(v, bool) and not (isinstance(v, float) and math.isnan(v)) ) def _is_numeric_type(t: str) -> bool: return t == "numeric" def _valid_count(values: list, numeric: bool) -> int: """Numero de valores validos: numericos finitos si numeric, no-None si cat.""" if numeric: return sum(1 for v in values if _is_num(v)) return sum(1 for v in values if v is not None) def _cardinality(values: list) -> int: """Numero de valores distintos no-None.""" return len({v for v in values if v is not None}) def _clean_numeric_pairs(xs: list, ys: list) -> tuple[list, list]: """Empareja por indice y conserva solo pares con ambos lados numericos.""" cx: list[float] = [] cy: list[float] = [] for x, y in zip(xs, ys): if _is_num(x) and _is_num(y): cx.append(float(x)) cy.append(float(y)) return cx, cy def _safe_pvalue(value) -> float | None: """Convierte un p-valor de scipy a float, devolviendo None si es NaN/invalido.""" if value is None: return None try: pv = float(value) except (TypeError, ValueError): return None if math.isnan(pv) or math.isinf(pv): return None return pv def _pearson_pvalue(cx: list, cy: list) -> float | None: """p-valor del test de correlacion de Pearson (H0: r == 0). None si degenerado.""" if len(cx) < 3 or len(set(cx)) < 2 or len(set(cy)) < 2: return None try: return _safe_pvalue(pearsonr(cx, cy).pvalue) except Exception: return None def _spearman_pvalue(cx: list, cy: list) -> float | None: """p-valor del test de correlacion de Spearman (H0: rho == 0). None si degenerado.""" if len(cx) < 3 or len(set(cx)) < 2 or len(set(cy)) < 2: return None try: return _safe_pvalue(spearmanr(cx, cy).pvalue) except Exception: return None def _chi2_pvalue(a_vals: list, b_vals: list) -> float | None: """p-valor del test chi-cuadrado de independencia (cat-cat). None si degenerado.""" pairs = [(x, y) for x, y in zip(a_vals, b_vals) if x is not None and y is not None] if len(pairs) < 2: return None rows = sorted({x for x, _ in pairs}, key=repr) cols = sorted({y for _, y in pairs}, key=repr) if len(rows) < 2 or len(cols) < 2: return None row_idx = {v: i for i, v in enumerate(rows)} col_idx = {v: j for j, v in enumerate(cols)} counts = Counter((row_idx[x], col_idx[y]) for x, y in pairs) table = [ [counts.get((i, j), 0) for j in range(len(cols))] for i in range(len(rows)) ] try: return _safe_pvalue(chi2_contingency(table).pvalue) except Exception: return None def _anova_pvalue(cat_vals: list, num_vals: list) -> float | None: """p-valor del ANOVA de una via (H0: misma media numerica por categoria). None si degenerado.""" groups: dict = defaultdict(list) for c, x in zip(cat_vals, num_vals): if c is None or not _is_num(x): continue groups[c].append(float(x)) valid = [g for g in groups.values() if len(g) >= 2] if len(valid) < 2: return None try: return _safe_pvalue(f_oneway(*valid).pvalue) except Exception: return None def association_matrix( columns: dict, strong_threshold: float = 0.5, top_n: int = 20, alpha: float = 0.05, fdr_method: str = "bh", ) -> dict: """Construye la matriz de asociacion de una tabla con tipos mezclados. Para cada par de columnas (i < j) selecciona la metrica adecuada al par de tipos y calcula tambien informacion mutua normalizada como medida comun: - num-num: `pearson` (lineal) y `spearman_corr` (monotonica). El `value` principal es el de mayor valor absoluto; ambos se guardan en `extra`. - cat-cat: `cramers_v` (simetrica) como `value`; `theils_u` en ambas direcciones en `extra` (u_ab = U(a|b), u_ba = U(b|a)). - num-cat: `correlation_ratio(categorias, valores)` como `value`. - Todos los pares: `mutual_info_columns` normalizada en `extra["mi"]`. Se saltan los pares donde alguna columna tenga menos de 3 valores validos o sea de tipo `text` con cardinalidad cercana al numero de filas (ruido sin asociacion util). Es una funcion pura: no falla con dict vacio o una sola columna (devuelve `pairs=[]`, `strong=[]`). Ademas de la magnitud de la asociacion, cada par evaluado lleva un p-valor del test de hipotesis adecuado a su metodo (Pearson/Spearman: test de correlacion; Cramer's V: chi-cuadrado de independencia; correlation ratio: ANOVA de una via; informacion mutua: sin test, p-valor None). Como se evaluan todos los pares a la vez, esos p-valores se corrigen por comparaciones multiples con `fdr_correction` (data-mining bias, Aronson cap. 6) y el subconjunto `strong` se basa en la **significancia corregida**, no solo en superar el umbral de magnitud: un par con magnitud alta pero p-valor ajustado > alpha NO entra en `strong`. Args: columns: dict {nombre_columna: {"values": list, "type": str}} donde type es uno de "numeric", "categorical", "datetime", "boolean", "text". Los tipos datetime/boolean/text se tratan como categoricos. strong_threshold: umbral en [0, 1]. Condicion de magnitud para ser "fuerte": abs(value) >= umbral o extra["mi"] >= umbral. Necesaria pero ya no suficiente (ver alpha). top_n: numero maximo de pares fuertes a devolver, ordenados por relevancia (max(abs(value), mi)) descendente. alpha: nivel de significancia tras la correccion FDR (default 0.05). Un par con p-valor disponible solo es fuerte si ademas su p-valor ajustado <= alpha. fdr_method: metodo de correccion de comparaciones multiples, "bh" (Benjamini-Hochberg, FDR; default) o "bonferroni" (FWER). Returns: dict con claves: pairs: lista de todos los pares evaluados, cada uno {a, b, a_type, b_type, method, value, extra, p_value, p_value_adjusted, significant}. `p_value` es el del test del metodo principal (None si no aplica / degenerado); `p_value_adjusted` el p-valor tras FDR; `significant` True si p_value_adjusted <= alpha. strong: subconjunto de pairs que cumplen magnitud >= umbral Y son significativos tras la correccion (los pares sin test disponible se admiten por magnitud), ordenado por relevancia descendente y truncado a top_n. methods_legend: dict {metodo: descripcion}. n_tests: numero total de pares evaluados (== len(pairs)). multiple_testing: dict {method, alpha, n_tests, n_rejected} con el resumen de la correccion (n_tests aqui = p-valores validos corregidos, puede ser < len(pairs) si algun par no tiene test). """ legend = { "pearson": "num-num lineal (Pearson r), signo indica direccion, [-1, 1]", "spearman": "num-num monotonica (Spearman rho), robusta a outliers, [-1, 1]", "cramers_v": "cat-cat simetrica (Cramer's V, sesgo-corregido), [0, 1]", "theils_u": "cat-cat direccional (Theil's U), incertidumbre explicada, [0, 1]", "correlation_ratio": "num-cat (eta), varianza numerica explicada por la categoria, [0, 1]", "mutual_info": "general no-lineal (NMI normalizada) para cualquier par de tipos, [0, 1]", } names = list(columns.keys()) if len(names) < 2: return {"pairs": [], "strong": [], "methods_legend": legend} n_rows = max( (len(columns[name].get("values", [])) for name in names), default=0, ) def _skip(name: str) -> bool: """True si la columna no aporta asociacion util (pocos validos o text ruidoso).""" col = columns[name] vals = col.get("values", []) ctype = col.get("type", "categorical") numeric = _is_numeric_type(ctype) if _valid_count(vals, numeric) < 3: return True # Texto de cardinalidad ~ n: identificadores/free-text, sin asociacion util. if ctype == "text" and n_rows > 0 and _cardinality(vals) >= 0.9 * n_rows: return True return False pairs: list[dict] = [] for i in range(len(names)): a_name = names[i] if _skip(a_name): continue a_col = columns[a_name] a_vals = a_col.get("values", []) a_type = a_col.get("type", "categorical") a_numeric = _is_numeric_type(a_type) for j in range(i + 1, len(names)): b_name = names[j] if _skip(b_name): continue b_col = columns[b_name] b_vals = b_col.get("values", []) b_type = b_col.get("type", "categorical") b_numeric = _is_numeric_type(b_type) extra: dict = {} # Medida comun no-lineal para todos los pares. mi = mutual_info_columns( a_vals, b_vals, a_numeric=a_numeric, b_numeric=b_numeric, normalized=True, ) extra["mi"] = mi if a_numeric and b_numeric: method = "pearson/spearman" cx, cy = _clean_numeric_pairs(a_vals, b_vals) p = pearson(cx, cy) s = spearman_corr(a_vals, b_vals) extra["pearson"] = p extra["spearman"] = s pearson_p = _pearson_pvalue(cx, cy) spearman_p = _spearman_pvalue(cx, cy) extra["pearson_p"] = pearson_p extra["spearman_p"] = spearman_p if abs(p) >= abs(s): value = p p_value = pearson_p else: value = s p_value = spearman_p elif (not a_numeric) and (not b_numeric): method = "cramers_v" value = cramers_v(a_vals, b_vals) extra["u_ab"] = theils_u(a_vals, b_vals) extra["u_ba"] = theils_u(b_vals, a_vals) p_value = _chi2_pvalue(a_vals, b_vals) else: method = "correlation_ratio" if a_numeric: # a numerica, b categorica. value = correlation_ratio(b_vals, a_vals) p_value = _anova_pvalue(b_vals, a_vals) else: # a categorica, b numerica. value = correlation_ratio(a_vals, b_vals) p_value = _anova_pvalue(a_vals, b_vals) pairs.append( { "a": a_name, "b": b_name, "a_type": a_type, "b_type": b_type, "method": method, "value": value, "extra": extra, "p_value": p_value, } ) # Correccion de comparaciones multiples sobre los p-valores disponibles. # Se pasa la lista completa (incluidos los None de pares sin test): la # correccion devuelve un mapeo alineado 1:1 y los None no cuentan como prueba. fdr = fdr_correction( [pair["p_value"] for pair in pairs], alpha=alpha, method=fdr_method, ) for pair, padj, rej in zip( pairs, fdr["p_values_adjusted"], fdr["reject"] ): pair["p_value_adjusted"] = padj pair["significant"] = bool(rej) def _relevance(pair: dict) -> float: return max(abs(pair["value"]), pair["extra"].get("mi", 0.0)) def _is_strong(pair: dict) -> bool: # Condicion 1: magnitud por encima del umbral (necesaria). magnitude_ok = ( abs(pair["value"]) >= strong_threshold or pair["extra"].get("mi", 0.0) >= strong_threshold ) if not magnitude_ok: return False # Condicion 2: significancia tras la correccion FDR. Los pares sin test # disponible (p_value None, p.ej. informacion mutua o caso degenerado) se # admiten por magnitud, ya que no hay p-valor que corregir. if pair["p_value"] is None: return True return pair["significant"] strong = [pair for pair in pairs if _is_strong(pair)] strong.sort(key=_relevance, reverse=True) strong = strong[:top_n] return { "pairs": pairs, "strong": strong, "methods_legend": legend, "n_tests": len(pairs), "multiple_testing": { "method": fdr_method, "alpha": alpha, "n_tests": fdr["n_tests"], "n_rejected": fdr["n_rejected"], }, }