diff --git a/python/functions/datascience/automatic_eda/chapters/modelos.py b/python/functions/datascience/automatic_eda/chapters/modelos.py new file mode 100644 index 00000000..ffc43346 --- /dev/null +++ b/python/functions/datascience/automatic_eda/chapters/modelos.py @@ -0,0 +1,498 @@ +"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown. + +Builds the *Modelos* chapter of an AutomaticEDA document from the ``models`` +block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers, +normality}``). It renders, as structured markdown/tables/figures that the core +paginator never cuts: + +1. **Normalization note** — every multivariate model below standardizes the + columns with z-score first; the chapter explains why (different scales would + otherwise dominate distance/variance). +2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus + variance and top-loadings tables. +3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own + page/slide), the cluster-size table, and a per-cluster LLM micro-analysis + with a title for each segment. +4. **Isolation Forest outliers** — a short explanation of how anomalous rows are + isolated multivariately and how the threshold is chosen, plus the counts. +5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts. + +The raw numeric data needed to colour the cluster scatter is **not** in the +TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` — +this chapter looks for the cluster projection / raw numeric columns in ``ctx`` +(or in ``profile``) and degrades honestly when they are absent: it falls back to +the uncoloured ``pca.projection`` with a note, or omits the scatter entirely. + +ctx keys this chapter consumes (all optional): + cluster_projection : dict — a pre-computed ``project_clusters_2d`` result + (``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used + directly when present (forward-compatible with the calculation phase). + raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present + and ``cluster_projection`` is not, the chapter calls + ``project_clusters_2d`` live to build points + aligned labels. + cluster_titles : list — pre-computed ``[{cluster, title, description}]`` + (a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster + micro-analysis without an LLM call (offline/tests). + run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call + ``describe_clusters_llm`` live on the cluster profiles. + cluster_llm_model : str — model id for the live LLM call. + +Contract: build_(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z". +""" + +from __future__ import annotations + +from .. import model + +CHAPTER_VERSION = "1.0.0" +CHAPTER_ID = "modelos" +CHAPTER_TITLE = "Modelos" + +# Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib +# scatter and to keep the legend/colours stable per cluster index. +_CLUSTER_COLORS = [ + "#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f", + "#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac", +] + + +# --------------------------------------------------------------------------- # +# Formatting helpers (mirror the overview chapter's defensive style). +# --------------------------------------------------------------------------- # +def _fmt_num(value, decimals: int = 3) -> str: + if value is None: + return "—" + if isinstance(value, bool): + return "sí" if value else "no" + if isinstance(value, int): + return f"{value:,}".replace(",", ".") + if isinstance(value, float): + if value != value: # NaN + return "NaN" + if value in (float("inf"), float("-inf")): + return str(value) + text = f"{value:.{decimals}f}".rstrip("0").rstrip(".") + return text if text else "0" + return model._safe_str(value) + + +def _fmt_pct_ratio(value, decimals: int = 1) -> str: + """Format a 0..1 ratio as a percentage.""" + if value is None: + return "—" + try: + return f"{float(value) * 100:.{decimals}f}%" + except (TypeError, ValueError): + return model._safe_str(value) + + +def _fmt_pct_already(value, decimals: int = 2) -> str: + """Format a value that is *already* a 0..100 percentage.""" + if value is None: + return "—" + try: + return f"{float(value):.{decimals}f}%" + except (TypeError, ValueError): + return model._safe_str(value) + + +def _is_dict(v) -> bool: + return isinstance(v, dict) + + +# --------------------------------------------------------------------------- # +# Cluster projection: prefer a pre-computed result, else compute it live, else +# fall back to the uncoloured PCA projection. +# --------------------------------------------------------------------------- # +def _resolve_cluster_projection(profile: dict, ctx: dict): + """Return (projection_dict_or_None, source_label). + + Order: ctx/profile['cluster_projection'] (pre-computed) → live + project_clusters_2d on ctx/profile['raw_numeric'] → None. + """ + pre = ctx.get("cluster_projection") or profile.get("cluster_projection") + models = profile.get("models") if _is_dict(profile.get("models")) else {} + if not pre and _is_dict(models): + pre = models.get("cluster_projection") + if _is_dict(pre) and pre.get("points"): + return pre, "precomputed" + + raw = ctx.get("raw_numeric") or profile.get("raw_numeric") + if _is_dict(raw) and raw: + try: + # Import the submodule's function explicitly (avoid the package + # attribute shadowing the function with the same-named module). + from datascience.project_clusters_2d import project_clusters_2d + proj = project_clusters_2d(raw) + if _is_dict(proj) and proj.get("points"): + return proj, "live" + except Exception: # noqa: BLE001 — never break the chapter. + return None, "none" + return None, "none" + + +def _cluster_titles(profile: dict, ctx: dict, projection: dict): + """Return a list of {cluster, title, description} for the segments. + + Order: ctx['cluster_titles'] (pre-computed) → live describe_clusters_llm when + ctx['run_cluster_llm'] and we have cluster_profiles → derived titles from the + distinctive features → None. + """ + pre = ctx.get("cluster_titles") + if isinstance(pre, list) and pre: + return [c for c in pre if _is_dict(c)] + + profiles = (projection or {}).get("cluster_profiles") or [] + feats = (projection or {}).get("feature_names") or [] + if ctx.get("run_cluster_llm") and profiles: + try: + from datascience.describe_clusters_llm import describe_clusters_llm + out = describe_clusters_llm( + profiles, feats, + model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001")) + clusters = (out or {}).get("clusters") + if isinstance(clusters, list) and clusters: + return [c for c in clusters if _is_dict(c)] + except Exception: # noqa: BLE001 + pass + + # Derived fallback: name each cluster by its distinctive features. + if profiles: + derived = [] + for p in profiles: + if not _is_dict(p): + continue + cid = p.get("cluster", len(derived)) + dist = p.get("distinctive") or [] + label = ", ".join(model._safe_str(d) for d in dist[:2]) if dist else "" + title = f"Segmento {cid}" + (f" — {label}" if label else "") + derived.append({"cluster": cid, "title": title, "description": ""}) + if derived: + return derived + return None + + +# --------------------------------------------------------------------------- # +# Figure builders (lazy: matplotlib only imported when the renderer draws them). +# --------------------------------------------------------------------------- # +def _make_scree(pca: dict): + """Return a zero-arg callable drawing the PCA scree plot, or None.""" + evr = pca.get("explained_variance_ratio") or [] + cum = pca.get("cumulative") or [] + if not evr: + return None + + def _draw(): + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + + comps = list(range(1, len(evr) + 1)) + fig, ax = plt.subplots(figsize=(7.0, 4.2)) + ax.bar(comps, evr, color="#4e79a7", alpha=0.85, + label="Varianza explicada") + if cum: + ax.plot(comps[:len(cum)], cum, color="#e15759", marker="o", + linewidth=1.8, label="Acumulada") + ax.set_xlabel("Componente principal") + ax.set_ylabel("Proporción de varianza") + ax.set_xticks(comps) + ax.set_ylim(0, 1.0) + ax.grid(axis="y", color="#dddddd", linewidth=0.6) + ax.legend(loc="best", fontsize=8, frameon=False) + ax.set_title("Varianza explicada por componente (PCA)", fontsize=10) + fig.tight_layout() + return fig + + return _draw + + +def _make_cluster_scatter(projection: dict): + """Return a zero-arg callable drawing the cluster scatter, or None.""" + points = projection.get("points") or [] + labels = projection.get("labels") or [] + if not points or len(points) != len(labels): + return None + centers = projection.get("centers_2d") or [] + explained = projection.get("explained_2d") or [] + + def _draw(): + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + + fig, ax = plt.subplots(figsize=(7.0, 5.2)) + uniq = sorted(set(int(l) for l in labels)) + for cl in uniq: + xs = [p[0] for p, l in zip(points, labels) if int(l) == cl] + ys = [p[1] for p, l in zip(points, labels) if int(l) == cl] + color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)] + ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0, + label=f"Cluster {cl} (n={len(xs)})") + for cl, c in enumerate(centers): + color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)] + ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X", + edgecolors="black", linewidths=1.2, zorder=5) + xlab, ylab = "PC1", "PC2" + if len(explained) >= 2: + xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)" + ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)" + ax.set_xlabel(xlab) + ax.set_ylabel(ylab) + ax.set_title("Segmentos KMeans proyectados sobre el plano PCA", + fontsize=10) + ax.grid(color="#eeeeee", linewidth=0.5) + ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9) + fig.tight_layout() + return fig + + return _draw + + +# --------------------------------------------------------------------------- # +# Section builders. Each returns a list of blocks (possibly empty). +# --------------------------------------------------------------------------- # +def _normalization_intro() -> list: + text = ( + "Estos modelos son **no supervisados**: buscan estructura latente sin " + "una variable objetivo. Antes de aplicarlos, todas las columnas " + "numéricas se **estandarizan con z-score** (cada valor menos la media, " + "dividido por la desviación típica). Sin esta normalización, una " + "variable con escala grande (p.ej. ingresos en euros) dominaría las " + "distancias y la varianza frente a otra de escala pequeña (p.ej. un " + "ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la " + "estandarización todas las variables pesan por igual." + ) + return [model.Heading(text="Modelos no supervisados", level=1), + model.Markdown(text=text)] + + +def _pca_section(pca: dict) -> list: + if not _is_dict(pca) or not pca.get("explained_variance_ratio"): + return [] + blocks = [model.Heading(text="PCA — varianza explicada", level=2)] + + n_used = pca.get("n_rows_used") + n_feat = pca.get("n_features") + intro = ( + f"El PCA resume {_fmt_num(n_feat)} variables numéricas en componentes " + f"ortogonales ordenados por la varianza que capturan " + f"({_fmt_num(n_used)} filas usadas tras eliminar nulos). El gráfico de " + "sedimentación (scree) muestra cuánta varianza aporta cada componente y " + "su acumulado: un codo marca cuántos componentes bastan." + ) + blocks.append(model.Markdown(text=intro)) + + scree = _make_scree(pca) + if scree is not None: + blocks.append(model.Figure( + make=scree, caption="Varianza explicada y acumulada por componente.")) + + evr = pca.get("explained_variance_ratio") or [] + cum = pca.get("cumulative") or [] + rows = [] + for i, v in enumerate(evr): + acc = cum[i] if i < len(cum) else None + rows.append([f"PC{i + 1}", _fmt_pct_ratio(v), _fmt_pct_ratio(acc)]) + if rows: + blocks.append(model.DataTable( + header=["Componente", "Varianza", "Acumulada"], rows=rows, + title="Varianza por componente")) + + # Top loadings: keep the strongest features per component (capped). + loadings = pca.get("top_loadings") or [] + if loadings: + per_comp: dict = {} + for ld in loadings: + if not _is_dict(ld): + continue + comp = ld.get("component") + per_comp.setdefault(comp, []) + if len(per_comp[comp]) < 4: + per_comp[comp].append(ld) + rows = [] + for comp in sorted(per_comp.keys(), key=lambda x: (x is None, x)): + for ld in per_comp[comp]: + rows.append([f"PC{int(comp) + 1}" if comp is not None else "—", + model._safe_str(ld.get("feature")), + _fmt_num(ld.get("loading"))]) + if rows: + blocks.append(model.DataTable( + header=["Componente", "Variable", "Carga"], rows=rows, + title="Cargas principales (top por componente)", + note="Cargas con mayor valor absoluto: qué variables definen " + "cada eje.")) + return blocks + + +def _kmeans_section(kmeans: dict, projection: dict, titles) -> list: + has_km = _is_dict(kmeans) and kmeans.get("best_k") + has_proj = _is_dict(projection) and projection.get("points") + if not has_km and not has_proj: + return [] + + blocks = [model.Heading(text="Segmentación (KMeans)", level=2)] + + best_k = (projection or {}).get("best_k") or (kmeans or {}).get("best_k") + sil = (projection or {}).get("silhouette") + if sil is None: + sil = (kmeans or {}).get("silhouette") + intro = ( + f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos " + "automáticamente maximizando el coeficiente de *silhouette* " + f"(**{_fmt_num(sil)}**, rango −1 a 1: cuanto más alto, segmentos más " + "compactos y separados). Los segmentos se proyectan sobre el plano de " + "los dos primeros componentes principales para visualizarlos." + ) + blocks.append(model.Markdown(text=intro)) + + if has_proj: + scatter = _make_cluster_scatter(projection) + if scatter is not None: + blocks.append(model.Figure( + make=scatter, + caption="Cada punto es una fila coloreada por su segmento " + "KMeans; las «X» son los centroides.")) + else: + blocks.append(model.Note( + "Proyección de clusters no dibujable (puntos y etiquetas " + "desalineados).")) + else: + # We have kmeans stats but no aligned points+labels to colour by. + blocks.append(model.Note( + "Scatter coloreado por segmento no disponible: el perfil no incluye " + "la proyección con etiquetas alineadas (pásala en " + "ctx['cluster_projection'] o las columnas crudas en " + "ctx['raw_numeric'] para colorear el plano PCA).")) + + # Cluster sizes table. + sizes = (projection or {}).get("cluster_sizes") or (kmeans or {}).get("cluster_sizes") or [] + total = sum(s for s in sizes if isinstance(s, (int, float))) or 0 + if sizes: + rows = [] + for i, s in enumerate(sizes): + pct = (s / total) if total else None + rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(pct)]) + blocks.append(model.DataTable( + header=["Segmento", "Tamaño", "% del total"], rows=rows, + title="Tamaño de cada segmento")) + + # Per-cluster LLM micro-analysis (each entry kept indivisible as one block). + if titles: + blocks.append(model.Heading(text="Interpretación de los segmentos", + level=3)) + for t in titles: + if not _is_dict(t): + continue + cid = t.get("cluster") + title = model._safe_str(t.get("title")) or f"Cluster {cid}" + desc = model._safe_str(t.get("description")) + line = f"**Cluster {cid} — {title}.**" + if desc: + line += " " + desc + blocks.append(model.Markdown(text=line)) + return blocks + + +def _outliers_section(outliers: dict) -> list: + if not _is_dict(outliers) or outliers.get("n_outliers") is None: + return [] + if outliers.get("note") and not outliers.get("n_rows_used"): + # insufficient data — nothing meaningful to show. + return [] + blocks = [model.Heading(text="Detección de anomalías (Isolation Forest)", + level=2)] + explain = ( + "**Isolation Forest** detecta filas anómalas de forma *multivariante*: " + "construye árboles que parten el espacio con cortes aleatorios y mide " + "cuántos cortes hacen falta para aislar cada fila. Las filas raras " + "(combinaciones de valores poco frecuentes considerando **todas las " + "columnas a la vez**, no una sola) se aíslan con muy pocos cortes y " + "obtienen un score bajo. El **umbral** de decisión separa las filas " + "normales de las anómalas según la contaminación esperada del modelo: " + "una fila es outlier cuando su score queda por debajo de ese umbral." + ) + blocks.append(model.Markdown(text=explain)) + blocks.append(model.KVTable(rows=[ + ("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))), + ("Outliers detectados", _fmt_num(outliers.get("n_outliers"))), + ("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))), + ("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)), + ], title="Anomalías multivariantes")) + return blocks + + +def _normality_section(normality: dict) -> list: + if not _is_dict(normality) or not normality: + return [] + header = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)", + "¿Normal?"] + rows = [] + for col, res in normality.items(): + if not _is_dict(res): + continue + jb = res.get("jarque_bera") if _is_dict(res.get("jarque_bera")) else {} + da = res.get("dagostino") if _is_dict(res.get("dagostino")) else {} + sh = res.get("shapiro") if _is_dict(res.get("shapiro")) else {} + is_norm = res.get("is_normal") + if res.get("note") and is_norm is None and not jb: + rows.append([model._safe_str(col), "—", "—", "—", + model._safe_str(res.get("note"))]) + continue + rows.append([ + model._safe_str(col), + _fmt_num(jb.get("p"), 4) if jb else "—", + _fmt_num(da.get("p"), 4) if da else "—", + _fmt_num(sh.get("p"), 4) if sh else "—", + "sí" if is_norm else ("no" if is_norm is not None else "—"), + ]) + if not rows: + return [] + return [ + model.Heading(text="Normalidad de las variables", level=2), + model.Markdown(text=( + "Tests de hipótesis de normalidad por columna (hipótesis nula: la " + "muestra proviene de una distribución normal). Se marca **normal** " + "cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas " + "variables reales son estrictamente normales; esto orienta qué " + "transformaciones o tests robustos aplicar después.")), + model.DataTable(header=header, rows=rows, + title="Pruebas de normalidad"), + ] + + +# --------------------------------------------------------------------------- # +# Entry point. +# --------------------------------------------------------------------------- # +def build_modelos(profile: dict, ctx: dict): + """Build the MODELOS Chapter, or None if there are no models to show.""" + profile = profile or {} + ctx = ctx or {} + if not isinstance(profile, dict): + return None + models = profile.get("models") + if not _is_dict(models): + return None + + pca = models.get("pca") if _is_dict(models.get("pca")) else None + kmeans = models.get("kmeans") if _is_dict(models.get("kmeans")) else None + outliers = models.get("outliers") if _is_dict(models.get("outliers")) else None + normality = models.get("normality") if _is_dict(models.get("normality")) else None + + projection, _src = _resolve_cluster_projection(profile, ctx) + titles = _cluster_titles(profile, ctx, projection) if ( + (kmeans and kmeans.get("best_k")) or (projection and projection.get("points")) + ) else None + + sections = [] + sections += _pca_section(pca) if pca else [] + sections += _kmeans_section(kmeans, projection, titles) + sections += _outliers_section(outliers) if outliers else [] + sections += _normality_section(normality) if normality else [] + + if not sections: + return None # models block present but nothing renderable. + + blocks = _normalization_intro() + sections + return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE, + version=CHAPTER_VERSION, blocks=blocks) diff --git a/python/functions/datascience/automatic_eda/chapters/modelos_test.py b/python/functions/datascience/automatic_eda/chapters/modelos_test.py new file mode 100644 index 00000000..9d2597a5 --- /dev/null +++ b/python/functions/datascience/automatic_eda/chapters/modelos_test.py @@ -0,0 +1,259 @@ +"""Tests for the MODELOS chapter — DoD: golden + edges + anti-cut. + +Self-contained: builds a synthetic TableProfile with a ``models`` block (no +DuckDB, no sklearn, no LLM, no network). The cluster scatter is fed a synthetic +pre-computed ``cluster_projection`` via ``ctx`` and the per-cluster titles via +``ctx['cluster_titles']`` so the suite is fast and deterministic. The live paths +(``project_clusters_2d`` / ``describe_clusters_llm``) are exercised against the +real wine dataset in the work report, not here. + +Verifies: the chapter renders to PDF *and* PPTX showing the user-required pieces +(markdown text, PCA scree, cluster scatter, per-cluster LLM micro-analysis, +outlier + normalization explanations); that an inapplicable profile yields None +without raising; and that a long normality table is split without losing any +column (anti-cut). +""" + +import os +import re +import tempfile + +from pypdf import PdfReader +from pptx import Presentation + +from datascience.automatic_eda.chapters.modelos import build_modelos +from datascience.automatic_eda.model import Figure, DataTable, Markdown +from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf +from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx + + +# --------------------------------------------------------------------------- # +# Synthetic fixtures. +# --------------------------------------------------------------------------- # +def _models_block(n_norm_cols: int = 4) -> dict: + feats = ["fixed_acidity", "alcohol", "ph", "sulphates"] + normality = {} + for i in range(n_norm_cols): + normality[f"col_{i}"] = { + "n": 500, + "jarque_bera": {"stat": 12.3, "p": 0.002 + i * 0.0001, "normal": False}, + "dagostino": {"stat": 9.1, "p": 0.01, "normal": False}, + "shapiro": {"stat": 0.98, "p": 0.04, "normal": False}, + "is_normal": False, + } + return { + "n_numeric_cols": 4, + "pca": { + "n_components": 2, "n_rows_used": 1599, "n_features": 4, + "explained_variance_ratio": [0.41, 0.22], + "cumulative": [0.41, 0.63], + "top_loadings": [ + {"component": 0, "feature": "alcohol", "loading": 0.62}, + {"component": 0, "feature": "fixed_acidity", "loading": -0.48}, + {"component": 1, "feature": "ph", "loading": 0.71}, + {"component": 1, "feature": "sulphates", "loading": 0.33}, + ], + "projection": [[0.1, 0.2], [0.3, -0.1]], + }, + "kmeans": { + "best_k": 3, "silhouette": 0.27, + "scores_by_k": [{"k": 2, "silhouette": 0.21}, {"k": 3, "silhouette": 0.27}], + "cluster_sizes": [700, 500, 399], + "centers": [[0.1, 0.2, 0.3, 0.4]], + "n_rows_used": 1599, "n_features": 4, + }, + "outliers": { + "n_outliers": 80, "outlier_pct": 5.0, "threshold": -0.0123, + "n_rows_used": 1599, + }, + "normality": normality, + "note": "", + "_feats": feats, + } + + +def _cluster_projection() -> dict: + # 30 points across 3 clusters, aligned points<->labels. + points, labels = [], [] + centers = [(-2.0, -2.0), (2.0, 0.0), (0.0, 2.5)] + for cl, (cx, cy) in enumerate(centers): + for j in range(10): + points.append([cx + (j - 5) * 0.05, cy + (j - 5) * 0.05]) + labels.append(cl) + return { + "points": points, "labels": labels, + "centers_2d": [list(c) for c in centers], + "best_k": 3, "silhouette": 0.27, + "explained_2d": [0.41, 0.22], + "cluster_sizes": [10, 10, 10], + "cluster_profiles": [ + {"cluster": 0, "size": 10, "pct": 0.33, + "centroid_original": {"alcohol": 9.5, "ph": 3.5}, + "distinctive": ["alcohol", "ph"], "centroid_z": {"alcohol": -1.2}}, + {"cluster": 1, "size": 10, "pct": 0.33, + "centroid_original": {"alcohol": 12.0, "ph": 3.1}, + "distinctive": ["alcohol"], "centroid_z": {"alcohol": 1.4}}, + {"cluster": 2, "size": 10, "pct": 0.33, + "centroid_original": {"alcohol": 10.5, "ph": 3.8}, + "distinctive": ["ph"], "centroid_z": {"ph": 1.6}}, + ], + "feature_names": ["alcohol", "ph", "fixed_acidity", "sulphates"], + "n_used": 1599, "note": "", + } + + +def _ctx_full() -> dict: + return { + "cluster_projection": _cluster_projection(), + "cluster_titles": [ + {"cluster": 0, "title": "Vinos suaves de baja graduación", + "description": "Alcohol bajo y pH alto; perfil ligero."}, + {"cluster": 1, "title": "Vinos potentes", + "description": "Alta graduación alcohólica."}, + {"cluster": 2, "title": "Vinos de pH elevado", + "description": "Acidez baja relativa al resto."}, + ], + } + + +def _profile() -> dict: + return {"table": "wine", "n_rows": 1599, "n_cols": 12, + "models": _models_block()} + + +def _pdf_text(path: str) -> str: + txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages) + return re.sub(r"\s+", " ", txt) + + +def _pptx_text(path: str) -> str: + prs = Presentation(path) + out = [] + for slide in prs.slides: + for shape in slide.shapes: + if shape.has_text_frame: + out.append(shape.text_frame.text) + return re.sub(r"\s+", " ", " ".join(out)) + + +# --------------------------------------------------------------------------- # +# Golden. +# --------------------------------------------------------------------------- # +def test_golden_build_modelos_bloques_requeridos(): + ch = build_modelos(_profile(), _ctx_full()) + assert ch is not None + assert ch.id == "modelos" and ch.version + # Both figures present: scree plot + cluster scatter. + n_figures = sum(1 for b in ch.blocks if isinstance(b, Figure)) + assert n_figures >= 2 + # Tables present (variance, loadings, sizes, normality). + assert sum(1 for b in ch.blocks if isinstance(b, DataTable)) >= 3 + # Markdown carries the required explanations. + md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown)) + assert "z-score" in md # normalization explained + assert "Isolation Forest" in md # outlier generation explained + assert "silhouette" in md # kmeans + # Per-cluster micro-analysis titles present. + assert "Vinos potentes" in md + assert "Cluster 1" in md + + +def test_golden_render_pdf_muestra_lo_exigido(): + with tempfile.TemporaryDirectory() as d: + out = os.path.join(d, "modelos.pdf") + res = render_automatic_eda_pdf( + _profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()}) + assert res["path"] == out and os.path.exists(out) + ids = [c["id"] for c in res["chapters"]] + assert "modelos" in ids + txt = _pdf_text(out) + for needle in ("Modelos no supervisados", "z-score", "PCA", + "Segmentación", "Isolation Forest", "Normalidad", + "Vinos potentes"): + assert needle in txt, f"falta en PDF: {needle}" + + +def test_golden_render_pptx_muestra_lo_exigido(): + with tempfile.TemporaryDirectory() as d: + out = os.path.join(d, "modelos.pptx") + res = render_automatic_eda_pptx( + _profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()}) + assert res["path"] == out and os.path.exists(out) + assert res["n_slides"] >= 1 + txt = _pptx_text(out) + for needle in ("Modelos no supervisados", "z-score", "Isolation Forest", + "Vinos potentes"): + assert needle in txt, f"falta en PPTX: {needle}" + + +# --------------------------------------------------------------------------- # +# Edges. +# --------------------------------------------------------------------------- # +def test_edge_profile_none_o_vacio_devuelve_none(): + assert build_modelos(None, {}) is None + assert build_modelos({}, {}) is None + assert build_modelos({"n_rows": 5}, None) is None # no 'models' key + + +def test_edge_models_insuficiente_devuelve_none(): + prof = {"table": "tiny", "models": { + "n_numeric_cols": 1, + "pca": {"n_components": 0, "explained_variance_ratio": [], + "note": "datos insuficientes"}, + "kmeans": {"best_k": 0, "note": "datos insuficientes"}, + "outliers": {"n_outliers": 0, "note": "datos insuficientes"}, + "normality": None, + "note": "insuficientes columnas numericas para modelos multivariantes", + }} + assert build_modelos(prof, {}) is None + + +def test_edge_solo_normalidad_si_genera_capitulo(): + # A single numeric column: only normality applies. Chapter must still build. + prof = {"table": "one", "models": { + "n_numeric_cols": 1, "pca": None, "kmeans": None, "outliers": None, + "normality": {"x": {"n": 500, "jarque_bera": {"stat": 1.0, "p": 0.2, + "normal": True}, "dagostino": {"stat": 1.0, "p": 0.3, + "normal": True}, "shapiro": {"stat": 0.99, "p": 0.4, + "normal": True}, "is_normal": True}}, + }} + ch = build_modelos(prof, {}) + assert ch is not None + md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown)) + assert "z-score" in md # normalization intro still present + + +def test_edge_kmeans_sin_proyeccion_degrada_sin_romper(): + # kmeans stats present but no cluster_projection / raw_numeric to colour by. + prof = _profile() + ch = build_modelos(prof, {}) # no ctx projection + assert ch is not None + # No scatter figure for clusters, but a Note explaining the degradation. + notes = [b.text for b in ch.blocks if b.kind == "note"] + assert any("ctx['raw_numeric']" in n or "cluster_projection" in n + for n in notes) + # PDF still renders fine. + with tempfile.TemporaryDirectory() as d: + out = os.path.join(d, "deg.pdf") + res = render_automatic_eda_pdf(prof, out, {"write_manifest": False}) + assert res["path"] == out and os.path.exists(out) + + +# --------------------------------------------------------------------------- # +# Anti-cut. +# --------------------------------------------------------------------------- # +def test_anticortes_tabla_normalidad_larga_no_corta(): + # 40 numeric columns → the normality DataTable must split across pages, + # repeating the header, without losing any column name. + prof = {"table": "wide", "models": _models_block(n_norm_cols=40)} + with tempfile.TemporaryDirectory() as d: + out = os.path.join(d, "wide.pdf") + render_automatic_eda_pdf(prof, out, {"write_manifest": False, + "ctx": _ctx_full()}) + reader = PdfReader(out) + n_pages = len(reader.pages) + assert n_pages > 1 + txt = "".join((pg.extract_text() or "") for pg in reader.pages) + # Every column name survives (wrapped/split, never truncated). + for i in (0, 19, 39): + assert f"col_{i}" in txt