feat(eda): capitulo MODELOS de AutomaticEDA (markdown, scatter PCA+clusters, micro-LLM)

Implementa chapters/modelos.py (build_modelos / CHAPTER_VERSION) consumiendo
profile['models'] {pca,kmeans,outliers,normality} de run_eda_models. Render
markdown estructurado con bloques anti-corte:

- Intro de normalizacion z-score: por que se estandariza antes de PCA/KMeans (MUST-8.3).
- PCA: scree plot (varianza explicada + acumulada, un solo eje Y) + tablas de
  varianza y cargas principales (SHOULD-8.4).
- Segmentacion KMeans: scatter PCA coloreado por cluster con centroides, en su
  propia pagina/slide (MUST-8.1); tabla de tamaños; micro-analisis LLM por
  cluster con titulo, cada entrada indivisible (MUST-8.2).
- Isolation Forest: explicacion de la deteccion multivariante de outliers y del
  umbral + conteos (MUST-8.3).
- Normalidad: tabla por columna (Jarque-Bera / D'Agostino / Shapiro), pagina sola.

El scatter coloreado y los titulos LLM no estan en el TableProfile, asi que el
capitulo los toma de ctx (cluster_projection precomputado, o raw_numeric para
calcular project_clusters_2d en vivo, o cluster_titles/run_cluster_llm para el
micro-analisis), igual que overview lee head_rows; degrada honesto con una Note
cuando faltan. Devuelve None si el profile no trae bloque models renderizable.

Tests self-contained (sin DuckDB/sklearn/LLM/red): golden PDF+PPTX, edges
(profile None/vacio/insuficiente, kmeans sin proyeccion), anti-corte (tabla de
normalidad de 40 columnas parte repitiendo cabecera sin perder ninguna). 8/8.
Suite del nucleo render_automatic_eda_pdf/pptx sigue verde.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-30 14:57:43 +02:00
parent 4de071f2f9
commit 81e8597d21
2 changed files with 757 additions and 0 deletions
@@ -0,0 +1,498 @@
"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown.
Builds the *Modelos* chapter of an AutomaticEDA document from the ``models``
block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers,
normality}``). It renders, as structured markdown/tables/figures that the core
paginator never cuts:
1. **Normalization note** — every multivariate model below standardizes the
columns with z-score first; the chapter explains why (different scales would
otherwise dominate distance/variance).
2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus
variance and top-loadings tables.
3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own
page/slide), the cluster-size table, and a per-cluster LLM micro-analysis
with a title for each segment.
4. **Isolation Forest outliers** — a short explanation of how anomalous rows are
isolated multivariately and how the threshold is chosen, plus the counts.
5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts.
The raw numeric data needed to colour the cluster scatter is **not** in the
TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` —
this chapter looks for the cluster projection / raw numeric columns in ``ctx``
(or in ``profile``) and degrades honestly when they are absent: it falls back to
the uncoloured ``pca.projection`` with a note, or omits the scatter entirely.
ctx keys this chapter consumes (all optional):
cluster_projection : dict — a pre-computed ``project_clusters_2d`` result
(``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used
directly when present (forward-compatible with the calculation phase).
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
and ``cluster_projection`` is not, the chapter calls
``project_clusters_2d`` live to build points + aligned labels.
cluster_titles : list — pre-computed ``[{cluster, title, description}]``
(a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster
micro-analysis without an LLM call (offline/tests).
run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call
``describe_clusters_llm`` live on the cluster profiles.
cluster_llm_model : str — model id for the live LLM call.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "modelos"
CHAPTER_TITLE = "Modelos"
# Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib
# scatter and to keep the legend/colours stable per cluster index.
_CLUSTER_COLORS = [
"#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f",
"#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac",
]
# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the overview chapter's defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _fmt_pct_ratio(value, decimals: int = 1) -> str:
"""Format a 0..1 ratio as a percentage."""
if value is None:
return ""
try:
return f"{float(value) * 100:.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _fmt_pct_already(value, decimals: int = 2) -> str:
"""Format a value that is *already* a 0..100 percentage."""
if value is None:
return ""
try:
return f"{float(value):.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _is_dict(v) -> bool:
return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Cluster projection: prefer a pre-computed result, else compute it live, else
# fall back to the uncoloured PCA projection.
# --------------------------------------------------------------------------- #
def _resolve_cluster_projection(profile: dict, ctx: dict):
"""Return (projection_dict_or_None, source_label).
Order: ctx/profile['cluster_projection'] (pre-computed) → live
project_clusters_2d on ctx/profile['raw_numeric'] → None.
"""
pre = ctx.get("cluster_projection") or profile.get("cluster_projection")
models = profile.get("models") if _is_dict(profile.get("models")) else {}
if not pre and _is_dict(models):
pre = models.get("cluster_projection")
if _is_dict(pre) and pre.get("points"):
return pre, "precomputed"
raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
if _is_dict(raw) and raw:
try:
# Import the submodule's function explicitly (avoid the package
# attribute shadowing the function with the same-named module).
from datascience.project_clusters_2d import project_clusters_2d
proj = project_clusters_2d(raw)
if _is_dict(proj) and proj.get("points"):
return proj, "live"
except Exception: # noqa: BLE001 — never break the chapter.
return None, "none"
return None, "none"
def _cluster_titles(profile: dict, ctx: dict, projection: dict):
"""Return a list of {cluster, title, description} for the segments.
Order: ctx['cluster_titles'] (pre-computed) → live describe_clusters_llm when
ctx['run_cluster_llm'] and we have cluster_profiles → derived titles from the
distinctive features → None.
"""
pre = ctx.get("cluster_titles")
if isinstance(pre, list) and pre:
return [c for c in pre if _is_dict(c)]
profiles = (projection or {}).get("cluster_profiles") or []
feats = (projection or {}).get("feature_names") or []
if ctx.get("run_cluster_llm") and profiles:
try:
from datascience.describe_clusters_llm import describe_clusters_llm
out = describe_clusters_llm(
profiles, feats,
model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001"))
clusters = (out or {}).get("clusters")
if isinstance(clusters, list) and clusters:
return [c for c in clusters if _is_dict(c)]
except Exception: # noqa: BLE001
pass
# Derived fallback: name each cluster by its distinctive features.
if profiles:
derived = []
for p in profiles:
if not _is_dict(p):
continue
cid = p.get("cluster", len(derived))
dist = p.get("distinctive") or []
label = ", ".join(model._safe_str(d) for d in dist[:2]) if dist else ""
title = f"Segmento {cid}" + (f"{label}" if label else "")
derived.append({"cluster": cid, "title": title, "description": ""})
if derived:
return derived
return None
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _make_scree(pca: dict):
"""Return a zero-arg callable drawing the PCA scree plot, or None."""
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
if not evr:
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
comps = list(range(1, len(evr) + 1))
fig, ax = plt.subplots(figsize=(7.0, 4.2))
ax.bar(comps, evr, color="#4e79a7", alpha=0.85,
label="Varianza explicada")
if cum:
ax.plot(comps[:len(cum)], cum, color="#e15759", marker="o",
linewidth=1.8, label="Acumulada")
ax.set_xlabel("Componente principal")
ax.set_ylabel("Proporción de varianza")
ax.set_xticks(comps)
ax.set_ylim(0, 1.0)
ax.grid(axis="y", color="#dddddd", linewidth=0.6)
ax.legend(loc="best", fontsize=8, frameon=False)
ax.set_title("Varianza explicada por componente (PCA)", fontsize=10)
fig.tight_layout()
return fig
return _draw
def _make_cluster_scatter(projection: dict):
"""Return a zero-arg callable drawing the cluster scatter, or None."""
points = projection.get("points") or []
labels = projection.get("labels") or []
if not points or len(points) != len(labels):
return None
centers = projection.get("centers_2d") or []
explained = projection.get("explained_2d") or []
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(7.0, 5.2))
uniq = sorted(set(int(l) for l in labels))
for cl in uniq:
xs = [p[0] for p, l in zip(points, labels) if int(l) == cl]
ys = [p[1] for p, l in zip(points, labels) if int(l) == cl]
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0,
label=f"Cluster {cl} (n={len(xs)})")
for cl, c in enumerate(centers):
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X",
edgecolors="black", linewidths=1.2, zorder=5)
xlab, ylab = "PC1", "PC2"
if len(explained) >= 2:
xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)"
ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)"
ax.set_xlabel(xlab)
ax.set_ylabel(ylab)
ax.set_title("Segmentos KMeans proyectados sobre el plano PCA",
fontsize=10)
ax.grid(color="#eeeeee", linewidth=0.5)
ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Section builders. Each returns a list of blocks (possibly empty).
# --------------------------------------------------------------------------- #
def _normalization_intro() -> list:
text = (
"Estos modelos son **no supervisados**: buscan estructura latente sin "
"una variable objetivo. Antes de aplicarlos, todas las columnas "
"numéricas se **estandarizan con z-score** (cada valor menos la media, "
"dividido por la desviación típica). Sin esta normalización, una "
"variable con escala grande (p.ej. ingresos en euros) dominaría las "
"distancias y la varianza frente a otra de escala pequeña (p.ej. un "
"ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la "
"estandarización todas las variables pesan por igual."
)
return [model.Heading(text="Modelos no supervisados", level=1),
model.Markdown(text=text)]
def _pca_section(pca: dict) -> list:
if not _is_dict(pca) or not pca.get("explained_variance_ratio"):
return []
blocks = [model.Heading(text="PCA — varianza explicada", level=2)]
n_used = pca.get("n_rows_used")
n_feat = pca.get("n_features")
intro = (
f"El PCA resume {_fmt_num(n_feat)} variables numéricas en componentes "
f"ortogonales ordenados por la varianza que capturan "
f"({_fmt_num(n_used)} filas usadas tras eliminar nulos). El gráfico de "
"sedimentación (scree) muestra cuánta varianza aporta cada componente y "
"su acumulado: un codo marca cuántos componentes bastan."
)
blocks.append(model.Markdown(text=intro))
scree = _make_scree(pca)
if scree is not None:
blocks.append(model.Figure(
make=scree, caption="Varianza explicada y acumulada por componente."))
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
rows = []
for i, v in enumerate(evr):
acc = cum[i] if i < len(cum) else None
rows.append([f"PC{i + 1}", _fmt_pct_ratio(v), _fmt_pct_ratio(acc)])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Varianza", "Acumulada"], rows=rows,
title="Varianza por componente"))
# Top loadings: keep the strongest features per component (capped).
loadings = pca.get("top_loadings") or []
if loadings:
per_comp: dict = {}
for ld in loadings:
if not _is_dict(ld):
continue
comp = ld.get("component")
per_comp.setdefault(comp, [])
if len(per_comp[comp]) < 4:
per_comp[comp].append(ld)
rows = []
for comp in sorted(per_comp.keys(), key=lambda x: (x is None, x)):
for ld in per_comp[comp]:
rows.append([f"PC{int(comp) + 1}" if comp is not None else "",
model._safe_str(ld.get("feature")),
_fmt_num(ld.get("loading"))])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Variable", "Carga"], rows=rows,
title="Cargas principales (top por componente)",
note="Cargas con mayor valor absoluto: qué variables definen "
"cada eje."))
return blocks
def _kmeans_section(kmeans: dict, projection: dict, titles) -> list:
has_km = _is_dict(kmeans) and kmeans.get("best_k")
has_proj = _is_dict(projection) and projection.get("points")
if not has_km and not has_proj:
return []
blocks = [model.Heading(text="Segmentación (KMeans)", level=2)]
best_k = (projection or {}).get("best_k") or (kmeans or {}).get("best_k")
sil = (projection or {}).get("silhouette")
if sil is None:
sil = (kmeans or {}).get("silhouette")
intro = (
f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos "
"automáticamente maximizando el coeficiente de *silhouette* "
f"(**{_fmt_num(sil)}**, rango 1 a 1: cuanto más alto, segmentos más "
"compactos y separados). Los segmentos se proyectan sobre el plano de "
"los dos primeros componentes principales para visualizarlos."
)
blocks.append(model.Markdown(text=intro))
if has_proj:
scatter = _make_cluster_scatter(projection)
if scatter is not None:
blocks.append(model.Figure(
make=scatter,
caption="Cada punto es una fila coloreada por su segmento "
"KMeans; las «X» son los centroides."))
else:
blocks.append(model.Note(
"Proyección de clusters no dibujable (puntos y etiquetas "
"desalineados)."))
else:
# We have kmeans stats but no aligned points+labels to colour by.
blocks.append(model.Note(
"Scatter coloreado por segmento no disponible: el perfil no incluye "
"la proyección con etiquetas alineadas (pásala en "
"ctx['cluster_projection'] o las columnas crudas en "
"ctx['raw_numeric'] para colorear el plano PCA)."))
# Cluster sizes table.
sizes = (projection or {}).get("cluster_sizes") or (kmeans or {}).get("cluster_sizes") or []
total = sum(s for s in sizes if isinstance(s, (int, float))) or 0
if sizes:
rows = []
for i, s in enumerate(sizes):
pct = (s / total) if total else None
rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(pct)])
blocks.append(model.DataTable(
header=["Segmento", "Tamaño", "% del total"], rows=rows,
title="Tamaño de cada segmento"))
# Per-cluster LLM micro-analysis (each entry kept indivisible as one block).
if titles:
blocks.append(model.Heading(text="Interpretación de los segmentos",
level=3))
for t in titles:
if not _is_dict(t):
continue
cid = t.get("cluster")
title = model._safe_str(t.get("title")) or f"Cluster {cid}"
desc = model._safe_str(t.get("description"))
line = f"**Cluster {cid}{title}.**"
if desc:
line += " " + desc
blocks.append(model.Markdown(text=line))
return blocks
def _outliers_section(outliers: dict) -> list:
if not _is_dict(outliers) or outliers.get("n_outliers") is None:
return []
if outliers.get("note") and not outliers.get("n_rows_used"):
# insufficient data — nothing meaningful to show.
return []
blocks = [model.Heading(text="Detección de anomalías (Isolation Forest)",
level=2)]
explain = (
"**Isolation Forest** detecta filas anómalas de forma *multivariante*: "
"construye árboles que parten el espacio con cortes aleatorios y mide "
"cuántos cortes hacen falta para aislar cada fila. Las filas raras "
"(combinaciones de valores poco frecuentes considerando **todas las "
"columnas a la vez**, no una sola) se aíslan con muy pocos cortes y "
"obtienen un score bajo. El **umbral** de decisión separa las filas "
"normales de las anómalas según la contaminación esperada del modelo: "
"una fila es outlier cuando su score queda por debajo de ese umbral."
)
blocks.append(model.Markdown(text=explain))
blocks.append(model.KVTable(rows=[
("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))),
("Outliers detectados", _fmt_num(outliers.get("n_outliers"))),
("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))),
("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)),
], title="Anomalías multivariantes"))
return blocks
def _normality_section(normality: dict) -> list:
if not _is_dict(normality) or not normality:
return []
header = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)",
"¿Normal?"]
rows = []
for col, res in normality.items():
if not _is_dict(res):
continue
jb = res.get("jarque_bera") if _is_dict(res.get("jarque_bera")) else {}
da = res.get("dagostino") if _is_dict(res.get("dagostino")) else {}
sh = res.get("shapiro") if _is_dict(res.get("shapiro")) else {}
is_norm = res.get("is_normal")
if res.get("note") and is_norm is None and not jb:
rows.append([model._safe_str(col), "", "", "",
model._safe_str(res.get("note"))])
continue
rows.append([
model._safe_str(col),
_fmt_num(jb.get("p"), 4) if jb else "",
_fmt_num(da.get("p"), 4) if da else "",
_fmt_num(sh.get("p"), 4) if sh else "",
"" if is_norm else ("no" if is_norm is not None else ""),
])
if not rows:
return []
return [
model.Heading(text="Normalidad de las variables", level=2),
model.Markdown(text=(
"Tests de hipótesis de normalidad por columna (hipótesis nula: la "
"muestra proviene de una distribución normal). Se marca **normal** "
"cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas "
"variables reales son estrictamente normales; esto orienta qué "
"transformaciones o tests robustos aplicar después.")),
model.DataTable(header=header, rows=rows,
title="Pruebas de normalidad"),
]
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_modelos(profile: dict, ctx: dict):
"""Build the MODELOS Chapter, or None if there are no models to show."""
profile = profile or {}
ctx = ctx or {}
if not isinstance(profile, dict):
return None
models = profile.get("models")
if not _is_dict(models):
return None
pca = models.get("pca") if _is_dict(models.get("pca")) else None
kmeans = models.get("kmeans") if _is_dict(models.get("kmeans")) else None
outliers = models.get("outliers") if _is_dict(models.get("outliers")) else None
normality = models.get("normality") if _is_dict(models.get("normality")) else None
projection, _src = _resolve_cluster_projection(profile, ctx)
titles = _cluster_titles(profile, ctx, projection) if (
(kmeans and kmeans.get("best_k")) or (projection and projection.get("points"))
) else None
sections = []
sections += _pca_section(pca) if pca else []
sections += _kmeans_section(kmeans, projection, titles)
sections += _outliers_section(outliers) if outliers else []
sections += _normality_section(normality) if normality else []
if not sections:
return None # models block present but nothing renderable.
blocks = _normalization_intro() + sections
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,259 @@
"""Tests for the MODELOS chapter — DoD: golden + edges + anti-cut.
Self-contained: builds a synthetic TableProfile with a ``models`` block (no
DuckDB, no sklearn, no LLM, no network). The cluster scatter is fed a synthetic
pre-computed ``cluster_projection`` via ``ctx`` and the per-cluster titles via
``ctx['cluster_titles']`` so the suite is fast and deterministic. The live paths
(``project_clusters_2d`` / ``describe_clusters_llm``) are exercised against the
real wine dataset in the work report, not here.
Verifies: the chapter renders to PDF *and* PPTX showing the user-required pieces
(markdown text, PCA scree, cluster scatter, per-cluster LLM micro-analysis,
outlier + normalization explanations); that an inapplicable profile yields None
without raising; and that a long normality table is split without losing any
column (anti-cut).
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.chapters.modelos import build_modelos
from datascience.automatic_eda.model import Figure, DataTable, Markdown
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
# --------------------------------------------------------------------------- #
# Synthetic fixtures.
# --------------------------------------------------------------------------- #
def _models_block(n_norm_cols: int = 4) -> dict:
feats = ["fixed_acidity", "alcohol", "ph", "sulphates"]
normality = {}
for i in range(n_norm_cols):
normality[f"col_{i}"] = {
"n": 500,
"jarque_bera": {"stat": 12.3, "p": 0.002 + i * 0.0001, "normal": False},
"dagostino": {"stat": 9.1, "p": 0.01, "normal": False},
"shapiro": {"stat": 0.98, "p": 0.04, "normal": False},
"is_normal": False,
}
return {
"n_numeric_cols": 4,
"pca": {
"n_components": 2, "n_rows_used": 1599, "n_features": 4,
"explained_variance_ratio": [0.41, 0.22],
"cumulative": [0.41, 0.63],
"top_loadings": [
{"component": 0, "feature": "alcohol", "loading": 0.62},
{"component": 0, "feature": "fixed_acidity", "loading": -0.48},
{"component": 1, "feature": "ph", "loading": 0.71},
{"component": 1, "feature": "sulphates", "loading": 0.33},
],
"projection": [[0.1, 0.2], [0.3, -0.1]],
},
"kmeans": {
"best_k": 3, "silhouette": 0.27,
"scores_by_k": [{"k": 2, "silhouette": 0.21}, {"k": 3, "silhouette": 0.27}],
"cluster_sizes": [700, 500, 399],
"centers": [[0.1, 0.2, 0.3, 0.4]],
"n_rows_used": 1599, "n_features": 4,
},
"outliers": {
"n_outliers": 80, "outlier_pct": 5.0, "threshold": -0.0123,
"n_rows_used": 1599,
},
"normality": normality,
"note": "",
"_feats": feats,
}
def _cluster_projection() -> dict:
# 30 points across 3 clusters, aligned points<->labels.
points, labels = [], []
centers = [(-2.0, -2.0), (2.0, 0.0), (0.0, 2.5)]
for cl, (cx, cy) in enumerate(centers):
for j in range(10):
points.append([cx + (j - 5) * 0.05, cy + (j - 5) * 0.05])
labels.append(cl)
return {
"points": points, "labels": labels,
"centers_2d": [list(c) for c in centers],
"best_k": 3, "silhouette": 0.27,
"explained_2d": [0.41, 0.22],
"cluster_sizes": [10, 10, 10],
"cluster_profiles": [
{"cluster": 0, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 9.5, "ph": 3.5},
"distinctive": ["alcohol", "ph"], "centroid_z": {"alcohol": -1.2}},
{"cluster": 1, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 12.0, "ph": 3.1},
"distinctive": ["alcohol"], "centroid_z": {"alcohol": 1.4}},
{"cluster": 2, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 10.5, "ph": 3.8},
"distinctive": ["ph"], "centroid_z": {"ph": 1.6}},
],
"feature_names": ["alcohol", "ph", "fixed_acidity", "sulphates"],
"n_used": 1599, "note": "",
}
def _ctx_full() -> dict:
return {
"cluster_projection": _cluster_projection(),
"cluster_titles": [
{"cluster": 0, "title": "Vinos suaves de baja graduación",
"description": "Alcohol bajo y pH alto; perfil ligero."},
{"cluster": 1, "title": "Vinos potentes",
"description": "Alta graduación alcohólica."},
{"cluster": 2, "title": "Vinos de pH elevado",
"description": "Acidez baja relativa al resto."},
],
}
def _profile() -> dict:
return {"table": "wine", "n_rows": 1599, "n_cols": 12,
"models": _models_block()}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
def _pptx_text(path: str) -> str:
prs = Presentation(path)
out = []
for slide in prs.slides:
for shape in slide.shapes:
if shape.has_text_frame:
out.append(shape.text_frame.text)
return re.sub(r"\s+", " ", " ".join(out))
# --------------------------------------------------------------------------- #
# Golden.
# --------------------------------------------------------------------------- #
def test_golden_build_modelos_bloques_requeridos():
ch = build_modelos(_profile(), _ctx_full())
assert ch is not None
assert ch.id == "modelos" and ch.version
# Both figures present: scree plot + cluster scatter.
n_figures = sum(1 for b in ch.blocks if isinstance(b, Figure))
assert n_figures >= 2
# Tables present (variance, loadings, sizes, normality).
assert sum(1 for b in ch.blocks if isinstance(b, DataTable)) >= 3
# Markdown carries the required explanations.
md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown))
assert "z-score" in md # normalization explained
assert "Isolation Forest" in md # outlier generation explained
assert "silhouette" in md # kmeans
# Per-cluster micro-analysis titles present.
assert "Vinos potentes" in md
assert "Cluster 1" in md
def test_golden_render_pdf_muestra_lo_exigido():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "modelos.pdf")
res = render_automatic_eda_pdf(
_profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()})
assert res["path"] == out and os.path.exists(out)
ids = [c["id"] for c in res["chapters"]]
assert "modelos" in ids
txt = _pdf_text(out)
for needle in ("Modelos no supervisados", "z-score", "PCA",
"Segmentación", "Isolation Forest", "Normalidad",
"Vinos potentes"):
assert needle in txt, f"falta en PDF: {needle}"
def test_golden_render_pptx_muestra_lo_exigido():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "modelos.pptx")
res = render_automatic_eda_pptx(
_profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()})
assert res["path"] == out and os.path.exists(out)
assert res["n_slides"] >= 1
txt = _pptx_text(out)
for needle in ("Modelos no supervisados", "z-score", "Isolation Forest",
"Vinos potentes"):
assert needle in txt, f"falta en PPTX: {needle}"
# --------------------------------------------------------------------------- #
# Edges.
# --------------------------------------------------------------------------- #
def test_edge_profile_none_o_vacio_devuelve_none():
assert build_modelos(None, {}) is None
assert build_modelos({}, {}) is None
assert build_modelos({"n_rows": 5}, None) is None # no 'models' key
def test_edge_models_insuficiente_devuelve_none():
prof = {"table": "tiny", "models": {
"n_numeric_cols": 1,
"pca": {"n_components": 0, "explained_variance_ratio": [],
"note": "datos insuficientes"},
"kmeans": {"best_k": 0, "note": "datos insuficientes"},
"outliers": {"n_outliers": 0, "note": "datos insuficientes"},
"normality": None,
"note": "insuficientes columnas numericas para modelos multivariantes",
}}
assert build_modelos(prof, {}) is None
def test_edge_solo_normalidad_si_genera_capitulo():
# A single numeric column: only normality applies. Chapter must still build.
prof = {"table": "one", "models": {
"n_numeric_cols": 1, "pca": None, "kmeans": None, "outliers": None,
"normality": {"x": {"n": 500, "jarque_bera": {"stat": 1.0, "p": 0.2,
"normal": True}, "dagostino": {"stat": 1.0, "p": 0.3,
"normal": True}, "shapiro": {"stat": 0.99, "p": 0.4,
"normal": True}, "is_normal": True}},
}}
ch = build_modelos(prof, {})
assert ch is not None
md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown))
assert "z-score" in md # normalization intro still present
def test_edge_kmeans_sin_proyeccion_degrada_sin_romper():
# kmeans stats present but no cluster_projection / raw_numeric to colour by.
prof = _profile()
ch = build_modelos(prof, {}) # no ctx projection
assert ch is not None
# No scatter figure for clusters, but a Note explaining the degradation.
notes = [b.text for b in ch.blocks if b.kind == "note"]
assert any("ctx['raw_numeric']" in n or "cluster_projection" in n
for n in notes)
# PDF still renders fine.
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "deg.pdf")
res = render_automatic_eda_pdf(prof, out, {"write_manifest": False})
assert res["path"] == out and os.path.exists(out)
# --------------------------------------------------------------------------- #
# Anti-cut.
# --------------------------------------------------------------------------- #
def test_anticortes_tabla_normalidad_larga_no_corta():
# 40 numeric columns → the normality DataTable must split across pages,
# repeating the header, without losing any column name.
prof = {"table": "wide", "models": _models_block(n_norm_cols=40)}
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "wide.pdf")
render_automatic_eda_pdf(prof, out, {"write_manifest": False,
"ctx": _ctx_full()})
reader = PdfReader(out)
n_pages = len(reader.pages)
assert n_pages > 1
txt = "".join((pg.extract_text() or "") for pg in reader.pages)
# Every column name survives (wrapped/split, never truncated).
for i in (0, 19, 39):
assert f"col_{i}" in txt