"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown. Builds the *Modelos* chapter of an AutomaticEDA document from the ``models`` block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers, normality}``). It renders, as structured markdown/tables/figures that the core paginator never cuts: 1. **Normalization note** — every multivariate model below standardizes the columns with z-score first; the chapter explains why (different scales would otherwise dominate distance/variance). 2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus variance and top-loadings tables. 3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own page/slide), the cluster-size table, and a per-cluster LLM micro-analysis with a title for each segment. 4. **Isolation Forest outliers** — a short explanation of how anomalous rows are isolated multivariately and how the threshold is chosen, plus the counts. 5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts. The raw numeric data needed to colour the cluster scatter is **not** in the TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` — this chapter looks for the cluster projection / raw numeric columns in ``ctx`` (or in ``profile``) and degrades honestly when they are absent: it falls back to the uncoloured ``pca.projection`` with a note, or omits the scatter entirely. ctx keys this chapter consumes (all optional): cluster_projection : dict — a pre-computed ``project_clusters_2d`` result (``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used directly when present (forward-compatible with the calculation phase). raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present and ``cluster_projection`` is not, the chapter calls ``project_clusters_2d`` live to build points + aligned labels. 
cluster_titles : list — pre-computed ``[{cluster, title, description}]`` (a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster micro-analysis without an LLM call (offline/tests). run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call ``describe_clusters_llm`` live on the cluster profiles. cluster_llm_model : str — model id for the live LLM call. Contract: build_(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z". """ from __future__ import annotations from .. import model CHAPTER_VERSION = "1.0.0" CHAPTER_ID = "modelos" CHAPTER_TITLE = "Modelos" # Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib # scatter and to keep the legend/colours stable per cluster index. _CLUSTER_COLORS = [ "#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f", "#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac", ] # --------------------------------------------------------------------------- # # Formatting helpers (mirror the overview chapter's defensive style). 
# --------------------------------------------------------------------------- #


def _fmt_num(value, decimals: int = 3) -> str:
    """Format a scalar for display in a table cell.

    Args:
        value: The value to render. ``None`` becomes an em dash, booleans
            become ``"sí"``/``"no"``, integers get ``.`` as thousands
            separator, and floats are rounded to ``decimals`` places with
            trailing zeros removed. Anything else is delegated to
            ``model._safe_str``.
        decimals: Maximum number of decimal places kept for floats.

    Returns:
        The formatted text.
    """
    if value is None:
        return "—"
    # bool is a subclass of int, so it has to be tested first.
    if isinstance(value, bool):
        return "sí" if value else "no"
    if isinstance(value, int):
        # NOTE(review): ints use "." for thousands while floats use "." for
        # decimals, so 1234 and 1.234 both render as "1.234" — confirm this
        # is the intended house style.
        return f"{value:,}".replace(",", ".")
    if isinstance(value, float):
        if value != value:  # NaN is the only float that differs from itself.
            return "NaN"
        if value in (float("inf"), float("-inf")):
            return str(value)
        text = f"{value:.{decimals}f}"
        # Only strip zeros that follow a decimal point: with ``decimals=0``
        # there is no point and "100" must not collapse to "1".
        if "." in text:
            text = text.rstrip("0").rstrip(".")
        # A tiny negative rounds to "-0"; show it as plain zero.
        return "0" if text in ("", "-0") else text
    return model._safe_str(value)


def _fmt_pct_ratio(value, decimals: int = 1) -> str:
    """Format a 0..1 ratio as a percentage.

    ``None`` becomes an em dash; a value that cannot be converted to float
    is rendered with ``model._safe_str`` instead of raising.
    """
    if value is None:
        return "—"
    try:
        return f"{float(value) * 100:.{decimals}f}%"
    except (TypeError, ValueError):
        return model._safe_str(value)


def _fmt_pct_already(value, decimals: int = 2) -> str:
    """Format a value that is *already* a 0..100 percentage.

    ``None`` becomes an em dash; a value that cannot be converted to float
    is rendered with ``model._safe_str`` instead of raising.
    """
    if value is None:
        return "—"
    try:
        return f"{float(value):.{decimals}f}%"
    except (TypeError, ValueError):
        return model._safe_str(value)


def _is_dict(v) -> bool:
    """Return True when ``v`` is a ``dict`` (or a subclass of it)."""
    return isinstance(v, dict)


# --------------------------------------------------------------------------- #
# Cluster projection: prefer a pre-computed result, else compute it live, else
# report that none is available (the KMeans section then shows a note).
# --------------------------------------------------------------------------- #
def _resolve_cluster_projection(profile: dict, ctx: dict):
    """Return ``(projection_dict_or_None, source_label)``.

    Lookup order:
      1. ``ctx['cluster_projection']``, then ``profile['cluster_projection']``,
         then ``profile['models']['cluster_projection']`` — accepted only if
         it is a dict with a non-empty ``points`` entry (label
         ``"precomputed"``).
      2. A live ``project_clusters_2d`` call on ``ctx['raw_numeric']`` or
         ``profile['raw_numeric']`` (label ``"live"``).
      3. ``(None, "none")``.

    Any exception raised by the live computation is swallowed (and logged at
    debug level) so that the chapter is never broken by it.
    """
    pre = ctx.get("cluster_projection") or profile.get("cluster_projection")
    if not pre:
        models = profile.get("models")
        if _is_dict(models):
            pre = models.get("cluster_projection")
    if _is_dict(pre) and pre.get("points"):
        return pre, "precomputed"

    raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
    if _is_dict(raw) and raw:
        try:
            # Import the submodule's function explicitly (avoid the package
            # attribute shadowing the function with the same-named module).
            from datascience.project_clusters_2d import project_clusters_2d

            proj = project_clusters_2d(raw)
            # Kept inside the ``try``: the truthiness test on ``points`` may
            # itself raise for array-like results.
            if _is_dict(proj) and proj.get("points"):
                return proj, "live"
        except Exception:  # noqa: BLE001 — never break the chapter.
            import logging

            logging.getLogger(__name__).debug(
                "Live cluster projection failed; scatter will be omitted.",
                exc_info=True)
            return None, "none"
    return None, "none"


def _cluster_titles(profile: dict, ctx: dict, projection: dict):
    """Return a list of ``{cluster, title, description}`` dicts, or None.

    Lookup order:
      1. ``ctx['cluster_titles']`` when it is a non-empty list (non-dict
         entries are dropped). No LLM call is made in this case.
      2. A live ``describe_clusters_llm`` call when ``ctx['run_cluster_llm']``
         is truthy and ``projection`` carries ``cluster_profiles``.
      3. Titles derived from each profile's first two ``distinctive`` entries.
      4. ``None``.

    ``profile`` is accepted for signature symmetry but is not read here.
    """
    pre = ctx.get("cluster_titles")
    if isinstance(pre, list) and pre:
        return [c for c in pre if _is_dict(c)]

    profiles = (projection or {}).get("cluster_profiles") or []
    feats = (projection or {}).get("feature_names") or []
    if ctx.get("run_cluster_llm") and profiles:
        try:
            from datascience.describe_clusters_llm import describe_clusters_llm

            out = describe_clusters_llm(
                profiles, feats,
                model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001"))
            clusters = (out or {}).get("clusters")
            if isinstance(clusters, list) and clusters:
                return [c for c in clusters if _is_dict(c)]
        except Exception:  # noqa: BLE001 — best effort, fall back below.
            import logging

            logging.getLogger(__name__).debug(
                "Live cluster description failed; using derived titles.",
                exc_info=True)

    # Derived fallback: name each cluster by its distinctive features.
    if profiles:
        derived = []
        for p in profiles:
            if not _is_dict(p):
                continue
            cid = p.get("cluster", len(derived))
            dist = p.get("distinctive") or []
            # Slicing a bare string would split it into characters and
            # slicing a mapping raises, so normalise to a list first.
            if isinstance(dist, str):
                dist = [dist]
            elif not isinstance(dist, (list, tuple)):
                dist = []
            label = ", ".join(model._safe_str(d) for d in dist[:2])
            title = f"Segmento {cid}" + (f" — {label}" if label else "")
            derived.append({"cluster": cid, "title": title, "description": ""})
        if derived:
            return derived
    return None


# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _make_scree(pca: dict):
    """Return a zero-arg callable drawing the PCA scree plot, or None.

    None is returned when ``pca`` has no ``explained_variance_ratio``. The
    callable returns the matplotlib figure it created.
    """
    evr = pca.get("explained_variance_ratio") or []
    cum = pca.get("cumulative") or []
    if not evr:
        return None

    def _draw():
        import matplotlib
        matplotlib.use("Agg")
        import matplotlib.pyplot as plt

        comps = list(range(1, len(evr) + 1))
        fig, ax = plt.subplots(figsize=(7.0, 4.2))
        ax.bar(comps, evr, color="#4e79a7", alpha=0.85,
               label="Varianza explicada")
        if cum:
            # x and y must have the same length for ``plot``; truncate both
            # in case ``cumulative`` and the ratios disagree in size.
            n_cum = min(len(comps), len(cum))
            ax.plot(comps[:n_cum], cum[:n_cum], color="#e15759", marker="o",
                    linewidth=1.8, label="Acumulada")
        ax.set_xlabel("Componente principal")
        ax.set_ylabel("Proporción de varianza")
        ax.set_xticks(comps)
        ax.set_ylim(0, 1.0)
        ax.grid(axis="y", color="#dddddd", linewidth=0.6)
        ax.legend(loc="best", fontsize=8, frameon=False)
        ax.set_title("Varianza explicada por componente (PCA)", fontsize=10)
        fig.tight_layout()
        return fig

    return _draw


def _make_cluster_scatter(projection: dict):
    """Return a zero-arg callable drawing the cluster scatter, or None.

    None is returned when ``points`` is empty or its length differs from
    ``labels``. The callable returns the matplotlib figure it created.
    """
    points = projection.get("points") or []
    labels = projection.get("labels") or []
    if not points or len(points) != len(labels):
        return None
    centers = projection.get("centers_2d") or []
    explained = projection.get("explained_2d") or []

    def _draw():
        import matplotlib
        matplotlib.use("Agg")
        import matplotlib.pyplot as plt

        fig, ax = plt.subplots(figsize=(7.0, 5.2))

        # Group the coordinates per cluster in a single pass.
        groups: dict = {}
        for point, label in zip(points, labels):
            xs, ys = groups.setdefault(int(label), ([], []))
            xs.append(point[0])
            ys.append(point[1])

        for cl in sorted(groups):
            xs, ys = groups[cl]
            color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
            ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0,
                       label=f"Cluster {cl} (n={len(xs)})")
        # Centroids are coloured by their position in ``centers_2d``.
        # NOTE(review): assumes that position equals the cluster label —
        # confirm against project_clusters_2d.
        for cl, c in enumerate(centers):
            color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
            ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X",
                       edgecolors="black", linewidths=1.2, zorder=5)
        xlab, ylab = "PC1", "PC2"
        if len(explained) >= 2:
            xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)"
            ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)"
        ax.set_xlabel(xlab)
        ax.set_ylabel(ylab)
        ax.set_title("Segmentos KMeans proyectados sobre el plano PCA",
                     fontsize=10)
        ax.grid(color="#eeeeee", linewidth=0.5)
        ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9)
        fig.tight_layout()
        return fig

    return _draw


# --------------------------------------------------------------------------- #
# Section builders. Each returns a list of blocks (possibly empty).
# --------------------------------------------------------------------------- #
def _normalization_intro() -> list:
    """Return the chapter heading plus the z-score normalization paragraph."""
    text = (
        "Estos modelos son **no supervisados**: buscan estructura latente sin "
        "una variable objetivo. Antes de aplicarlos, todas las columnas "
        "numéricas se **estandarizan con z-score** (cada valor menos la media, "
        "dividido por la desviación típica). Sin esta normalización, una "
        "variable con escala grande (p.ej. ingresos en euros) dominaría las "
        "distancias y la varianza frente a otra de escala pequeña (p.ej. un "
        "ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la "
        "estandarización todas las variables pesan por igual."
    )
    return [model.Heading(text="Modelos no supervisados", level=1),
            model.Markdown(text=text)]


def _pc_label(comp) -> str:
    """Return the ``PC<n>`` label for a component id.

    The id is converted with ``int(comp) + 1``; ``None`` becomes an em dash
    and an id that cannot be converted is shown as-is instead of raising.
    """
    if comp is None:
        return "—"
    try:
        return f"PC{int(comp) + 1}"
    except (TypeError, ValueError):
        return model._safe_str(comp)


def _pca_section(pca: dict) -> list:
    """Build the PCA blocks: heading, intro, scree figure and tables.

    Returns an empty list when ``pca`` is not a dict or has no
    ``explained_variance_ratio``.
    """
    if not _is_dict(pca) or not pca.get("explained_variance_ratio"):
        return []
    blocks = [model.Heading(text="PCA — varianza explicada", level=2)]
    n_used = pca.get("n_rows_used")
    n_feat = pca.get("n_features")
    intro = (
        f"El PCA resume {_fmt_num(n_feat)} variables numéricas en componentes "
        "ortogonales ordenados por la varianza que capturan "
        f"({_fmt_num(n_used)} filas usadas tras eliminar nulos). El gráfico de "
        "sedimentación (scree) muestra cuánta varianza aporta cada componente y "
        "su acumulado: un codo marca cuántos componentes bastan."
    )
    blocks.append(model.Markdown(text=intro))

    scree = _make_scree(pca)
    if scree is not None:
        blocks.append(model.Figure(
            make=scree,
            caption="Varianza explicada y acumulada por componente."))

    evr = pca.get("explained_variance_ratio") or []
    cum = pca.get("cumulative") or []
    rows = []
    for i, v in enumerate(evr):
        acc = cum[i] if i < len(cum) else None
        rows.append([f"PC{i + 1}", _fmt_pct_ratio(v), _fmt_pct_ratio(acc)])
    if rows:
        blocks.append(model.DataTable(
            header=["Componente", "Varianza", "Acumulada"],
            rows=rows, title="Varianza por componente"))

    # Top loadings: keep at most four entries per component, in input order.
    # NOTE(review): this presumes ``top_loadings`` already arrives sorted by
    # absolute loading — nothing here re-sorts it.
    loadings = pca.get("top_loadings") or []
    if loadings:
        per_comp: dict = {}
        for ld in loadings:
            if not _is_dict(ld):
                continue
            bucket = per_comp.setdefault(ld.get("component"), [])
            if len(bucket) < 4:
                bucket.append(ld)
        rows = []
        # ``None`` component ids sort last.
        for comp in sorted(per_comp.keys(), key=lambda x: (x is None, x)):
            for ld in per_comp[comp]:
                rows.append([_pc_label(comp),
                             model._safe_str(ld.get("feature")),
                             _fmt_num(ld.get("loading"))])
        if rows:
            blocks.append(model.DataTable(
                header=["Componente", "Variable", "Carga"],
                rows=rows, title="Cargas principales (top por componente)",
                note="Cargas con mayor valor absoluto: qué variables definen "
                     "cada eje."))
    return blocks


def _kmeans_section(kmeans: dict, projection: dict, titles) -> list:
    """Build the KMeans blocks: intro, scatter (or note), sizes and titles.

    Args:
        kmeans: The ``kmeans`` entry of the models block, or None.
        projection: The resolved cluster projection, or None.
        titles: A list of ``{cluster, title, description}`` dicts, or None.

    Returns:
        A list of blocks; empty when there is neither a ``best_k`` in
        ``kmeans`` nor any ``points`` in ``projection``.
    """
    has_km = _is_dict(kmeans) and kmeans.get("best_k")
    has_proj = _is_dict(projection) and projection.get("points")
    if not has_km and not has_proj:
        return []
    blocks = [model.Heading(text="Segmentación (KMeans)", level=2)]

    # The projection's figures take precedence over the kmeans block's.
    best_k = (projection or {}).get("best_k") or (kmeans or {}).get("best_k")
    sil = (projection or {}).get("silhouette")
    if sil is None:
        sil = (kmeans or {}).get("silhouette")
    intro = (
        f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos "
        "automáticamente maximizando el coeficiente de *silhouette* "
        f"(**{_fmt_num(sil)}**, rango −1 a 1: cuanto más alto, segmentos más "
        "compactos y separados). Los segmentos se proyectan sobre el plano de "
        "los dos primeros componentes principales para visualizarlos."
    )
    blocks.append(model.Markdown(text=intro))

    if has_proj:
        scatter = _make_cluster_scatter(projection)
        if scatter is not None:
            blocks.append(model.Figure(
                make=scatter,
                caption="Cada punto es una fila coloreada por su segmento "
                        "KMeans; las «X» son los centroides."))
        else:
            blocks.append(model.Note(
                "Proyección de clusters no dibujable (puntos y etiquetas "
                "desalineados)."))
    else:
        # We have kmeans stats but no aligned points+labels to colour by.
        blocks.append(model.Note(
            "Scatter coloreado por segmento no disponible: el perfil no incluye "
            "la proyección con etiquetas alineadas (pásala en "
            "ctx['cluster_projection'] o las columnas crudas en "
            "ctx['raw_numeric'] para colorear el plano PCA)."))

    # Cluster sizes table.
    sizes = ((projection or {}).get("cluster_sizes")
             or (kmeans or {}).get("cluster_sizes") or [])
    total = sum(s for s in sizes if isinstance(s, (int, float))) or 0
    if sizes:
        rows = []
        for i, s in enumerate(sizes):
            # Non-numeric sizes are skipped by the total above, so they must
            # not be divided either.
            if total and isinstance(s, (int, float)):
                pct = s / total
            else:
                pct = None
            rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(pct)])
        blocks.append(model.DataTable(
            header=["Segmento", "Tamaño", "% del total"],
            rows=rows, title="Tamaño de cada segmento"))

    # Per-cluster LLM micro-analysis (each entry kept indivisible as one block).
    if titles:
        blocks.append(model.Heading(text="Interpretación de los segmentos",
                                    level=3))
        for t in titles:
            if not _is_dict(t):
                continue
            cid = t.get("cluster")
            title = model._safe_str(t.get("title")) or f"Cluster {cid}"
            desc = model._safe_str(t.get("description"))
            line = f"**Cluster {cid} — {title}.**"
            if desc:
                line += " " + desc
            blocks.append(model.Markdown(text=line))
    return blocks


def _outliers_section(outliers: dict) -> list:
    """Build the Isolation Forest blocks: explanation plus a counts table.

    Returns an empty list when ``outliers`` is not a dict, when it has no
    ``n_outliers``, or when it carries a ``note`` without ``n_rows_used``.
    """
    if not _is_dict(outliers) or outliers.get("n_outliers") is None:
        return []
    if outliers.get("note") and not outliers.get("n_rows_used"):
        # insufficient data — nothing meaningful to show.
        return []
    blocks = [model.Heading(text="Detección de anomalías (Isolation Forest)",
                            level=2)]
    explain = (
        "**Isolation Forest** detecta filas anómalas de forma *multivariante*: "
        "construye árboles que parten el espacio con cortes aleatorios y mide "
        "cuántos cortes hacen falta para aislar cada fila. Las filas raras "
        "(combinaciones de valores poco frecuentes considerando **todas las "
        "columnas a la vez**, no una sola) se aíslan con muy pocos cortes y "
        "obtienen un score bajo. El **umbral** de decisión separa las filas "
        "normales de las anómalas según la contaminación esperada del modelo: "
        "una fila es outlier cuando su score queda por debajo de ese umbral."
    )
    blocks.append(model.Markdown(text=explain))
    blocks.append(model.KVTable(rows=[
        ("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))),
        ("Outliers detectados", _fmt_num(outliers.get("n_outliers"))),
        ("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))),
        ("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)),
    ], title="Anomalías multivariantes"))
    return blocks


def _normality_section(normality: dict) -> list:
    """Build the normality blocks: heading, explanation and per-column table.

    Each value of ``normality`` is read for the ``jarque_bera``,
    ``dagostino`` and ``shapiro`` sub-dicts (their ``p`` entry), plus
    ``is_normal`` and ``note``. Returns an empty list when ``normality`` is
    empty, not a dict, or yields no rows.
    """
    if not _is_dict(normality) or not normality:
        return []
    header = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)",
              "¿Normal?"]
    rows = []
    for col, res in normality.items():
        if not _is_dict(res):
            continue
        jb = res.get("jarque_bera") if _is_dict(res.get("jarque_bera")) else {}
        da = res.get("dagostino") if _is_dict(res.get("dagostino")) else {}
        sh = res.get("shapiro") if _is_dict(res.get("shapiro")) else {}
        is_norm = res.get("is_normal")
        # A column with only a note (no verdict, no test) shows the note in
        # the verdict cell.
        if res.get("note") and is_norm is None and not jb:
            rows.append([model._safe_str(col), "—", "—", "—",
                         model._safe_str(res.get("note"))])
            continue
        rows.append([
            model._safe_str(col),
            _fmt_num(jb.get("p"), 4) if jb else "—",
            _fmt_num(da.get("p"), 4) if da else "—",
            _fmt_num(sh.get("p"), 4) if sh else "—",
            "sí" if is_norm else ("no" if is_norm is not None else "—"),
        ])
    if not rows:
        return []
    return [
        model.Heading(text="Normalidad de las variables", level=2),
        model.Markdown(text=(
            "Tests de hipótesis de normalidad por columna (hipótesis nula: la "
            "muestra proviene de una distribución normal). Se marca **normal** "
            "cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas "
            "variables reales son estrictamente normales; esto orienta qué "
            "transformaciones o tests robustos aplicar después.")),
        model.DataTable(header=header, rows=rows,
                        title="Pruebas de normalidad"),
    ]


# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_modelos(profile: dict, ctx: dict):
    """Build the MODELOS Chapter, or None if there are no models to show.

    Args:
        profile: The table profile; only its ``models`` dict (and the
            optional projection/raw-data keys) are read. ``None`` or a
            non-dict yields ``None``.
        ctx: Optional rendering context; ``None`` or a non-dict is treated
            as empty, since every key it may carry is optional.

    Returns:
        A ``model.Chapter`` made of the normalization intro followed by the
        PCA, KMeans, outliers and normality sections, or ``None`` when no
        section produced any block.
    """
    profile = profile or {}
    ctx = ctx or {}
    if not isinstance(profile, dict):
        return None
    if not isinstance(ctx, dict):
        # A malformed context must not break the chapter.
        ctx = {}
    models = profile.get("models")
    if not _is_dict(models):
        return None

    pca = models.get("pca") if _is_dict(models.get("pca")) else None
    kmeans = models.get("kmeans") if _is_dict(models.get("kmeans")) else None
    outliers = (models.get("outliers")
                if _is_dict(models.get("outliers")) else None)
    normality = (models.get("normality")
                 if _is_dict(models.get("normality")) else None)

    projection, _src = _resolve_cluster_projection(profile, ctx)
    # Titles are only worth resolving when the KMeans section will render.
    has_segments = (
        (kmeans and kmeans.get("best_k"))
        or (projection and projection.get("points"))
    )
    titles = _cluster_titles(profile, ctx, projection) if has_segments else None

    sections = []
    sections += _pca_section(pca) if pca else []
    sections += _kmeans_section(kmeans, projection, titles)
    sections += _outliers_section(outliers) if outliers else []
    sections += _normality_section(normality) if normality else []

    if not sections:
        return None  # models block present but nothing renderable.

    blocks = _normalization_intro() + sections
    return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
                         version=CHAPTER_VERSION, blocks=blocks)