Files
fn_registry/python/functions/datascience/automatic_eda/chapters/modelos.py
T
egutierrez 81e8597d21 feat(eda): capitulo MODELOS de AutomaticEDA (markdown, scatter PCA+clusters, micro-LLM)
Implementa chapters/modelos.py (build_modelos / CHAPTER_VERSION) consumiendo
profile['models'] {pca,kmeans,outliers,normality} de run_eda_models. Render
markdown estructurado con bloques anti-corte:

- Intro de normalizacion z-score: por que se estandariza antes de PCA/KMeans (MUST-8.3).
- PCA: scree plot (varianza explicada + acumulada, un solo eje Y) + tablas de
  varianza y cargas principales (SHOULD-8.4).
- Segmentacion KMeans: scatter PCA coloreado por cluster con centroides, en su
  propia pagina/slide (MUST-8.1); tabla de tamaños; micro-analisis LLM por
  cluster con titulo, cada entrada indivisible (MUST-8.2).
- Isolation Forest: explicacion de la deteccion multivariante de outliers y del
  umbral + conteos (MUST-8.3).
- Normalidad: tabla por columna (Jarque-Bera / D'Agostino / Shapiro), pagina sola.

El scatter coloreado y los titulos LLM no estan en el TableProfile, asi que el
capitulo los toma de ctx (cluster_projection precomputado, o raw_numeric para
calcular project_clusters_2d en vivo, o cluster_titles/run_cluster_llm para el
micro-analisis), igual que overview lee head_rows; degrada honesto con una Note
cuando faltan. Devuelve None si el profile no trae bloque models renderizable.

Tests self-contained (sin DuckDB/sklearn/LLM/red): golden PDF+PPTX, edges
(profile None/vacio/insuficiente, kmeans sin proyeccion), anti-corte (tabla de
normalidad de 40 columnas parte repitiendo cabecera sin perder ninguna). 8/8.
Suite del nucleo render_automatic_eda_pdf/pptx sigue verde.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 14:57:43 +02:00

499 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown.
Builds the *Modelos* chapter of an AutomaticEDA document from the ``models``
block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers,
normality}``). It renders, as structured markdown/tables/figures that the core
paginator never cuts:
1. **Normalization note** — every multivariate model below standardizes the
columns with z-score first; the chapter explains why (different scales would
otherwise dominate distance/variance).
2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus
variance and top-loadings tables.
3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own
page/slide), the cluster-size table, and a per-cluster LLM micro-analysis
with a title for each segment.
4. **Isolation Forest outliers** — a short explanation of how anomalous rows are
isolated multivariately and how the threshold is chosen, plus the counts.
5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts.
The raw numeric data needed to colour the cluster scatter is **not** in the
TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` —
this chapter looks for the cluster projection / raw numeric columns in ``ctx``
(or in ``profile``) and degrades honestly when they are absent: it falls back to
the uncoloured ``pca.projection`` with a note, or omits the scatter entirely.
ctx keys this chapter consumes (all optional):
cluster_projection : dict — a pre-computed ``project_clusters_2d`` result
(``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used
directly when present (forward-compatible with the calculation phase).
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
and ``cluster_projection`` is not, the chapter calls
``project_clusters_2d`` live to build points + aligned labels.
cluster_titles : list — pre-computed ``[{cluster, title, description}]``
(a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster
micro-analysis without an LLM call (offline/tests).
run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call
``describe_clusters_llm`` live on the cluster profiles.
cluster_llm_model : str — model id for the live LLM call.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "modelos"
CHAPTER_TITLE = "Modelos"
# Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib
# scatter and to keep the legend/colours stable per cluster index.
_CLUSTER_COLORS = [
"#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f",
"#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac",
]
# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the overview chapter's defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _fmt_pct_ratio(value, decimals: int = 1) -> str:
"""Format a 0..1 ratio as a percentage."""
if value is None:
return ""
try:
return f"{float(value) * 100:.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _fmt_pct_already(value, decimals: int = 2) -> str:
"""Format a value that is *already* a 0..100 percentage."""
if value is None:
return ""
try:
return f"{float(value):.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _is_dict(v) -> bool:
return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Cluster projection: prefer a pre-computed result, else compute it live, else
# fall back to the uncoloured PCA projection.
# --------------------------------------------------------------------------- #
def _resolve_cluster_projection(profile: dict, ctx: dict):
"""Return (projection_dict_or_None, source_label).
Order: ctx/profile['cluster_projection'] (pre-computed) → live
project_clusters_2d on ctx/profile['raw_numeric'] → None.
"""
pre = ctx.get("cluster_projection") or profile.get("cluster_projection")
models = profile.get("models") if _is_dict(profile.get("models")) else {}
if not pre and _is_dict(models):
pre = models.get("cluster_projection")
if _is_dict(pre) and pre.get("points"):
return pre, "precomputed"
raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
if _is_dict(raw) and raw:
try:
# Import the submodule's function explicitly (avoid the package
# attribute shadowing the function with the same-named module).
from datascience.project_clusters_2d import project_clusters_2d
proj = project_clusters_2d(raw)
if _is_dict(proj) and proj.get("points"):
return proj, "live"
except Exception: # noqa: BLE001 — never break the chapter.
return None, "none"
return None, "none"
def _cluster_titles(profile: dict, ctx: dict, projection: dict):
"""Return a list of {cluster, title, description} for the segments.
Order: ctx['cluster_titles'] (pre-computed) → live describe_clusters_llm when
ctx['run_cluster_llm'] and we have cluster_profiles → derived titles from the
distinctive features → None.
"""
pre = ctx.get("cluster_titles")
if isinstance(pre, list) and pre:
return [c for c in pre if _is_dict(c)]
profiles = (projection or {}).get("cluster_profiles") or []
feats = (projection or {}).get("feature_names") or []
if ctx.get("run_cluster_llm") and profiles:
try:
from datascience.describe_clusters_llm import describe_clusters_llm
out = describe_clusters_llm(
profiles, feats,
model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001"))
clusters = (out or {}).get("clusters")
if isinstance(clusters, list) and clusters:
return [c for c in clusters if _is_dict(c)]
except Exception: # noqa: BLE001
pass
# Derived fallback: name each cluster by its distinctive features.
if profiles:
derived = []
for p in profiles:
if not _is_dict(p):
continue
cid = p.get("cluster", len(derived))
dist = p.get("distinctive") or []
label = ", ".join(model._safe_str(d) for d in dist[:2]) if dist else ""
title = f"Segmento {cid}" + (f"{label}" if label else "")
derived.append({"cluster": cid, "title": title, "description": ""})
if derived:
return derived
return None
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _make_scree(pca: dict):
"""Return a zero-arg callable drawing the PCA scree plot, or None."""
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
if not evr:
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
comps = list(range(1, len(evr) + 1))
fig, ax = plt.subplots(figsize=(7.0, 4.2))
ax.bar(comps, evr, color="#4e79a7", alpha=0.85,
label="Varianza explicada")
if cum:
ax.plot(comps[:len(cum)], cum, color="#e15759", marker="o",
linewidth=1.8, label="Acumulada")
ax.set_xlabel("Componente principal")
ax.set_ylabel("Proporción de varianza")
ax.set_xticks(comps)
ax.set_ylim(0, 1.0)
ax.grid(axis="y", color="#dddddd", linewidth=0.6)
ax.legend(loc="best", fontsize=8, frameon=False)
ax.set_title("Varianza explicada por componente (PCA)", fontsize=10)
fig.tight_layout()
return fig
return _draw
def _make_cluster_scatter(projection: dict):
"""Return a zero-arg callable drawing the cluster scatter, or None."""
points = projection.get("points") or []
labels = projection.get("labels") or []
if not points or len(points) != len(labels):
return None
centers = projection.get("centers_2d") or []
explained = projection.get("explained_2d") or []
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(7.0, 5.2))
uniq = sorted(set(int(l) for l in labels))
for cl in uniq:
xs = [p[0] for p, l in zip(points, labels) if int(l) == cl]
ys = [p[1] for p, l in zip(points, labels) if int(l) == cl]
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0,
label=f"Cluster {cl} (n={len(xs)})")
for cl, c in enumerate(centers):
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X",
edgecolors="black", linewidths=1.2, zorder=5)
xlab, ylab = "PC1", "PC2"
if len(explained) >= 2:
xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)"
ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)"
ax.set_xlabel(xlab)
ax.set_ylabel(ylab)
ax.set_title("Segmentos KMeans proyectados sobre el plano PCA",
fontsize=10)
ax.grid(color="#eeeeee", linewidth=0.5)
ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Section builders. Each returns a list of blocks (possibly empty).
# --------------------------------------------------------------------------- #
def _normalization_intro() -> list:
text = (
"Estos modelos son **no supervisados**: buscan estructura latente sin "
"una variable objetivo. Antes de aplicarlos, todas las columnas "
"numéricas se **estandarizan con z-score** (cada valor menos la media, "
"dividido por la desviación típica). Sin esta normalización, una "
"variable con escala grande (p.ej. ingresos en euros) dominaría las "
"distancias y la varianza frente a otra de escala pequeña (p.ej. un "
"ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la "
"estandarización todas las variables pesan por igual."
)
return [model.Heading(text="Modelos no supervisados", level=1),
model.Markdown(text=text)]
def _pca_section(pca: dict) -> list:
if not _is_dict(pca) or not pca.get("explained_variance_ratio"):
return []
blocks = [model.Heading(text="PCA — varianza explicada", level=2)]
n_used = pca.get("n_rows_used")
n_feat = pca.get("n_features")
intro = (
f"El PCA resume {_fmt_num(n_feat)} variables numéricas en componentes "
f"ortogonales ordenados por la varianza que capturan "
f"({_fmt_num(n_used)} filas usadas tras eliminar nulos). El gráfico de "
"sedimentación (scree) muestra cuánta varianza aporta cada componente y "
"su acumulado: un codo marca cuántos componentes bastan."
)
blocks.append(model.Markdown(text=intro))
scree = _make_scree(pca)
if scree is not None:
blocks.append(model.Figure(
make=scree, caption="Varianza explicada y acumulada por componente."))
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
rows = []
for i, v in enumerate(evr):
acc = cum[i] if i < len(cum) else None
rows.append([f"PC{i + 1}", _fmt_pct_ratio(v), _fmt_pct_ratio(acc)])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Varianza", "Acumulada"], rows=rows,
title="Varianza por componente"))
# Top loadings: keep the strongest features per component (capped).
loadings = pca.get("top_loadings") or []
if loadings:
per_comp: dict = {}
for ld in loadings:
if not _is_dict(ld):
continue
comp = ld.get("component")
per_comp.setdefault(comp, [])
if len(per_comp[comp]) < 4:
per_comp[comp].append(ld)
rows = []
for comp in sorted(per_comp.keys(), key=lambda x: (x is None, x)):
for ld in per_comp[comp]:
rows.append([f"PC{int(comp) + 1}" if comp is not None else "",
model._safe_str(ld.get("feature")),
_fmt_num(ld.get("loading"))])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Variable", "Carga"], rows=rows,
title="Cargas principales (top por componente)",
note="Cargas con mayor valor absoluto: qué variables definen "
"cada eje."))
return blocks
def _kmeans_section(kmeans: dict, projection: dict, titles) -> list:
has_km = _is_dict(kmeans) and kmeans.get("best_k")
has_proj = _is_dict(projection) and projection.get("points")
if not has_km and not has_proj:
return []
blocks = [model.Heading(text="Segmentación (KMeans)", level=2)]
best_k = (projection or {}).get("best_k") or (kmeans or {}).get("best_k")
sil = (projection or {}).get("silhouette")
if sil is None:
sil = (kmeans or {}).get("silhouette")
intro = (
f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos "
"automáticamente maximizando el coeficiente de *silhouette* "
f"(**{_fmt_num(sil)}**, rango 1 a 1: cuanto más alto, segmentos más "
"compactos y separados). Los segmentos se proyectan sobre el plano de "
"los dos primeros componentes principales para visualizarlos."
)
blocks.append(model.Markdown(text=intro))
if has_proj:
scatter = _make_cluster_scatter(projection)
if scatter is not None:
blocks.append(model.Figure(
make=scatter,
caption="Cada punto es una fila coloreada por su segmento "
"KMeans; las «X» son los centroides."))
else:
blocks.append(model.Note(
"Proyección de clusters no dibujable (puntos y etiquetas "
"desalineados)."))
else:
# We have kmeans stats but no aligned points+labels to colour by.
blocks.append(model.Note(
"Scatter coloreado por segmento no disponible: el perfil no incluye "
"la proyección con etiquetas alineadas (pásala en "
"ctx['cluster_projection'] o las columnas crudas en "
"ctx['raw_numeric'] para colorear el plano PCA)."))
# Cluster sizes table.
sizes = (projection or {}).get("cluster_sizes") or (kmeans or {}).get("cluster_sizes") or []
total = sum(s for s in sizes if isinstance(s, (int, float))) or 0
if sizes:
rows = []
for i, s in enumerate(sizes):
pct = (s / total) if total else None
rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(pct)])
blocks.append(model.DataTable(
header=["Segmento", "Tamaño", "% del total"], rows=rows,
title="Tamaño de cada segmento"))
# Per-cluster LLM micro-analysis (each entry kept indivisible as one block).
if titles:
blocks.append(model.Heading(text="Interpretación de los segmentos",
level=3))
for t in titles:
if not _is_dict(t):
continue
cid = t.get("cluster")
title = model._safe_str(t.get("title")) or f"Cluster {cid}"
desc = model._safe_str(t.get("description"))
line = f"**Cluster {cid}{title}.**"
if desc:
line += " " + desc
blocks.append(model.Markdown(text=line))
return blocks
def _outliers_section(outliers: dict) -> list:
if not _is_dict(outliers) or outliers.get("n_outliers") is None:
return []
if outliers.get("note") and not outliers.get("n_rows_used"):
# insufficient data — nothing meaningful to show.
return []
blocks = [model.Heading(text="Detección de anomalías (Isolation Forest)",
level=2)]
explain = (
"**Isolation Forest** detecta filas anómalas de forma *multivariante*: "
"construye árboles que parten el espacio con cortes aleatorios y mide "
"cuántos cortes hacen falta para aislar cada fila. Las filas raras "
"(combinaciones de valores poco frecuentes considerando **todas las "
"columnas a la vez**, no una sola) se aíslan con muy pocos cortes y "
"obtienen un score bajo. El **umbral** de decisión separa las filas "
"normales de las anómalas según la contaminación esperada del modelo: "
"una fila es outlier cuando su score queda por debajo de ese umbral."
)
blocks.append(model.Markdown(text=explain))
blocks.append(model.KVTable(rows=[
("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))),
("Outliers detectados", _fmt_num(outliers.get("n_outliers"))),
("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))),
("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)),
], title="Anomalías multivariantes"))
return blocks
def _normality_section(normality: dict) -> list:
if not _is_dict(normality) or not normality:
return []
header = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)",
"¿Normal?"]
rows = []
for col, res in normality.items():
if not _is_dict(res):
continue
jb = res.get("jarque_bera") if _is_dict(res.get("jarque_bera")) else {}
da = res.get("dagostino") if _is_dict(res.get("dagostino")) else {}
sh = res.get("shapiro") if _is_dict(res.get("shapiro")) else {}
is_norm = res.get("is_normal")
if res.get("note") and is_norm is None and not jb:
rows.append([model._safe_str(col), "", "", "",
model._safe_str(res.get("note"))])
continue
rows.append([
model._safe_str(col),
_fmt_num(jb.get("p"), 4) if jb else "",
_fmt_num(da.get("p"), 4) if da else "",
_fmt_num(sh.get("p"), 4) if sh else "",
"" if is_norm else ("no" if is_norm is not None else ""),
])
if not rows:
return []
return [
model.Heading(text="Normalidad de las variables", level=2),
model.Markdown(text=(
"Tests de hipótesis de normalidad por columna (hipótesis nula: la "
"muestra proviene de una distribución normal). Se marca **normal** "
"cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas "
"variables reales son estrictamente normales; esto orienta qué "
"transformaciones o tests robustos aplicar después.")),
model.DataTable(header=header, rows=rows,
title="Pruebas de normalidad"),
]
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_modelos(profile: dict, ctx: dict):
"""Build the MODELOS Chapter, or None if there are no models to show."""
profile = profile or {}
ctx = ctx or {}
if not isinstance(profile, dict):
return None
models = profile.get("models")
if not _is_dict(models):
return None
pca = models.get("pca") if _is_dict(models.get("pca")) else None
kmeans = models.get("kmeans") if _is_dict(models.get("kmeans")) else None
outliers = models.get("outliers") if _is_dict(models.get("outliers")) else None
normality = models.get("normality") if _is_dict(models.get("normality")) else None
projection, _src = _resolve_cluster_projection(profile, ctx)
titles = _cluster_titles(profile, ctx, projection) if (
(kmeans and kmeans.get("best_k")) or (projection and projection.get("points"))
) else None
sections = []
sections += _pca_section(pca) if pca else []
sections += _kmeans_section(kmeans, projection, titles)
sections += _outliers_section(outliers) if outliers else []
sections += _normality_section(normality) if normality else []
if not sections:
return None # models block present but nothing renderable.
blocks = _normalization_intro() + sections
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)