Compare commits

..

2 Commits

Author SHA1 Message Date
egutierrez 81e8597d21 feat(eda): capitulo MODELOS de AutomaticEDA (markdown, scatter PCA+clusters, micro-LLM)
Implementa chapters/modelos.py (build_modelos / CHAPTER_VERSION) consumiendo
profile['models'] {pca,kmeans,outliers,normality} de run_eda_models. Render
markdown estructurado con bloques anti-corte:

- Intro de normalizacion z-score: por que se estandariza antes de PCA/KMeans (MUST-8.3).
- PCA: scree plot (varianza explicada + acumulada, un solo eje Y) + tablas de
  varianza y cargas principales (SHOULD-8.4).
- Segmentacion KMeans: scatter PCA coloreado por cluster con centroides, en su
  propia pagina/slide (MUST-8.1); tabla de tamaños; micro-analisis LLM por
  cluster con titulo, cada entrada indivisible (MUST-8.2).
- Isolation Forest: explicacion de la deteccion multivariante de outliers y del
  umbral + conteos (MUST-8.3).
- Normalidad: tabla por columna (Jarque-Bera / D'Agostino / Shapiro), pagina sola.

El scatter coloreado y los titulos LLM no estan en el TableProfile, asi que el
capitulo los toma de ctx (cluster_projection precomputado, o raw_numeric para
calcular project_clusters_2d en vivo, o cluster_titles/run_cluster_llm para el
micro-analisis), igual que overview lee head_rows; degrada honesto con una Note
cuando faltan. Devuelve None si el profile no trae bloque models renderizable.

Tests self-contained (sin DuckDB/sklearn/LLM/red): golden PDF+PPTX, edges
(profile None/vacio/insuficiente, kmeans sin proyeccion), anti-corte (tabla de
normalidad de 40 columnas parte repitiendo cabecera sin perder ninguna). 8/8.
Suite del nucleo render_automatic_eda_pdf/pptx sigue verde.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 14:57:43 +02:00
egutierrez 4de071f2f9 feat(eda): project_clusters_2d + describe_clusters_llm para el capitulo MODELOS
project_clusters_2d (pura): PCA(2)+KMeans sobre el MISMO subset estandarizado,
devolviendo proyeccion 2D y labels alineados por fila + centroides en espacio PCA
+ perfiles de cluster desestandarizados. Es la pieza que garantiza la alineacion
points<->labels que pca_explained y kmeans_segments no cubren (estandarizan por
separado y kmeans descarta los labels). Habilita el scatter PCA coloreado por
cluster (MUST-8.1).

describe_clusters_llm (impura): micro-analisis LLM de los clusters en una sola
llamada a ask_llm (grupo claude-direct), devuelve titulo + descripcion por cluster
con degradacion dict-no-throw a titulos genericos si el LLM no responde (MUST-8.2).

Ambas re-exportadas en datascience/__init__.py. Tests: 6/6 y 9/9 (sin red).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 14:57:27 +02:00
9 changed files with 1688 additions and 0 deletions
+4
View File
@@ -42,6 +42,8 @@ from .isolation_forest_outliers import isolation_forest_outliers
from .normality_tests import normality_tests
from .trend_slope import trend_slope
from .run_eda_models import run_eda_models
from .project_clusters_2d import project_clusters_2d
from .describe_clusters_llm import describe_clusters_llm
from .eda_llm_insights import eda_llm_insights
from .build_eda_notebook import build_eda_notebook
from .decode_qr_image import decode_qr_image
@@ -86,6 +88,8 @@ __all__ = [
"normality_tests",
"trend_slope",
"run_eda_models",
"project_clusters_2d",
"describe_clusters_llm",
"eda_llm_insights",
"build_eda_notebook",
"describe_numeric",
@@ -0,0 +1,498 @@
"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown.
Builds the *Modelos* chapter of an AutomaticEDA document from the ``models``
block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers,
normality}``). It renders, as structured markdown/tables/figures that the core
paginator never cuts:
1. **Normalization note** — every multivariate model below standardizes the
columns with z-score first; the chapter explains why (different scales would
otherwise dominate distance/variance).
2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus
variance and top-loadings tables.
3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own
page/slide), the cluster-size table, and a per-cluster LLM micro-analysis
with a title for each segment.
4. **Isolation Forest outliers** — a short explanation of how anomalous rows are
isolated multivariately and how the threshold is chosen, plus the counts.
5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts.
The raw numeric data needed to colour the cluster scatter is **not** in the
TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` —
this chapter looks for the cluster projection / raw numeric columns in ``ctx``
(or in ``profile``) and degrades honestly when they are absent: it falls back to
the uncoloured ``pca.projection`` with a note, or omits the scatter entirely.
ctx keys this chapter consumes (all optional):
cluster_projection : dict — a pre-computed ``project_clusters_2d`` result
(``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used
directly when present (forward-compatible with the calculation phase).
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
and ``cluster_projection`` is not, the chapter calls
``project_clusters_2d`` live to build points + aligned labels.
cluster_titles : list — pre-computed ``[{cluster, title, description}]``
(a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster
micro-analysis without an LLM call (offline/tests).
run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call
``describe_clusters_llm`` live on the cluster profiles.
cluster_llm_model : str — model id for the live LLM call.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "modelos"
CHAPTER_TITLE = "Modelos"
# Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib
# scatter and to keep the legend/colours stable per cluster index.
_CLUSTER_COLORS = [
"#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f",
"#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac",
]
# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the overview chapter's defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _fmt_pct_ratio(value, decimals: int = 1) -> str:
"""Format a 0..1 ratio as a percentage."""
if value is None:
return ""
try:
return f"{float(value) * 100:.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _fmt_pct_already(value, decimals: int = 2) -> str:
"""Format a value that is *already* a 0..100 percentage."""
if value is None:
return ""
try:
return f"{float(value):.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _is_dict(v) -> bool:
return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Cluster projection: prefer a pre-computed result, else compute it live, else
# fall back to the uncoloured PCA projection.
# --------------------------------------------------------------------------- #
def _resolve_cluster_projection(profile: dict, ctx: dict):
"""Return (projection_dict_or_None, source_label).
Order: ctx/profile['cluster_projection'] (pre-computed) → live
project_clusters_2d on ctx/profile['raw_numeric'] → None.
"""
pre = ctx.get("cluster_projection") or profile.get("cluster_projection")
models = profile.get("models") if _is_dict(profile.get("models")) else {}
if not pre and _is_dict(models):
pre = models.get("cluster_projection")
if _is_dict(pre) and pre.get("points"):
return pre, "precomputed"
raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
if _is_dict(raw) and raw:
try:
# Import the submodule's function explicitly (avoid the package
# attribute shadowing the function with the same-named module).
from datascience.project_clusters_2d import project_clusters_2d
proj = project_clusters_2d(raw)
if _is_dict(proj) and proj.get("points"):
return proj, "live"
except Exception: # noqa: BLE001 — never break the chapter.
return None, "none"
return None, "none"
def _cluster_titles(profile: dict, ctx: dict, projection: dict):
"""Return a list of {cluster, title, description} for the segments.
Order: ctx['cluster_titles'] (pre-computed) → live describe_clusters_llm when
ctx['run_cluster_llm'] and we have cluster_profiles → derived titles from the
distinctive features → None.
"""
pre = ctx.get("cluster_titles")
if isinstance(pre, list) and pre:
return [c for c in pre if _is_dict(c)]
profiles = (projection or {}).get("cluster_profiles") or []
feats = (projection or {}).get("feature_names") or []
if ctx.get("run_cluster_llm") and profiles:
try:
from datascience.describe_clusters_llm import describe_clusters_llm
out = describe_clusters_llm(
profiles, feats,
model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001"))
clusters = (out or {}).get("clusters")
if isinstance(clusters, list) and clusters:
return [c for c in clusters if _is_dict(c)]
except Exception: # noqa: BLE001
pass
# Derived fallback: name each cluster by its distinctive features.
if profiles:
derived = []
for p in profiles:
if not _is_dict(p):
continue
cid = p.get("cluster", len(derived))
dist = p.get("distinctive") or []
label = ", ".join(model._safe_str(d) for d in dist[:2]) if dist else ""
title = f"Segmento {cid}" + (f"{label}" if label else "")
derived.append({"cluster": cid, "title": title, "description": ""})
if derived:
return derived
return None
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _make_scree(pca: dict):
"""Return a zero-arg callable drawing the PCA scree plot, or None."""
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
if not evr:
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
comps = list(range(1, len(evr) + 1))
fig, ax = plt.subplots(figsize=(7.0, 4.2))
ax.bar(comps, evr, color="#4e79a7", alpha=0.85,
label="Varianza explicada")
if cum:
ax.plot(comps[:len(cum)], cum, color="#e15759", marker="o",
linewidth=1.8, label="Acumulada")
ax.set_xlabel("Componente principal")
ax.set_ylabel("Proporción de varianza")
ax.set_xticks(comps)
ax.set_ylim(0, 1.0)
ax.grid(axis="y", color="#dddddd", linewidth=0.6)
ax.legend(loc="best", fontsize=8, frameon=False)
ax.set_title("Varianza explicada por componente (PCA)", fontsize=10)
fig.tight_layout()
return fig
return _draw
def _make_cluster_scatter(projection: dict):
"""Return a zero-arg callable drawing the cluster scatter, or None."""
points = projection.get("points") or []
labels = projection.get("labels") or []
if not points or len(points) != len(labels):
return None
centers = projection.get("centers_2d") or []
explained = projection.get("explained_2d") or []
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(7.0, 5.2))
uniq = sorted(set(int(l) for l in labels))
for cl in uniq:
xs = [p[0] for p, l in zip(points, labels) if int(l) == cl]
ys = [p[1] for p, l in zip(points, labels) if int(l) == cl]
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0,
label=f"Cluster {cl} (n={len(xs)})")
for cl, c in enumerate(centers):
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X",
edgecolors="black", linewidths=1.2, zorder=5)
xlab, ylab = "PC1", "PC2"
if len(explained) >= 2:
xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)"
ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)"
ax.set_xlabel(xlab)
ax.set_ylabel(ylab)
ax.set_title("Segmentos KMeans proyectados sobre el plano PCA",
fontsize=10)
ax.grid(color="#eeeeee", linewidth=0.5)
ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Section builders. Each returns a list of blocks (possibly empty).
# --------------------------------------------------------------------------- #
def _normalization_intro() -> list:
text = (
"Estos modelos son **no supervisados**: buscan estructura latente sin "
"una variable objetivo. Antes de aplicarlos, todas las columnas "
"numéricas se **estandarizan con z-score** (cada valor menos la media, "
"dividido por la desviación típica). Sin esta normalización, una "
"variable con escala grande (p.ej. ingresos en euros) dominaría las "
"distancias y la varianza frente a otra de escala pequeña (p.ej. un "
"ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la "
"estandarización todas las variables pesan por igual."
)
return [model.Heading(text="Modelos no supervisados", level=1),
model.Markdown(text=text)]
def _pca_section(pca: dict) -> list:
if not _is_dict(pca) or not pca.get("explained_variance_ratio"):
return []
blocks = [model.Heading(text="PCA — varianza explicada", level=2)]
n_used = pca.get("n_rows_used")
n_feat = pca.get("n_features")
intro = (
f"El PCA resume {_fmt_num(n_feat)} variables numéricas en componentes "
f"ortogonales ordenados por la varianza que capturan "
f"({_fmt_num(n_used)} filas usadas tras eliminar nulos). El gráfico de "
"sedimentación (scree) muestra cuánta varianza aporta cada componente y "
"su acumulado: un codo marca cuántos componentes bastan."
)
blocks.append(model.Markdown(text=intro))
scree = _make_scree(pca)
if scree is not None:
blocks.append(model.Figure(
make=scree, caption="Varianza explicada y acumulada por componente."))
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
rows = []
for i, v in enumerate(evr):
acc = cum[i] if i < len(cum) else None
rows.append([f"PC{i + 1}", _fmt_pct_ratio(v), _fmt_pct_ratio(acc)])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Varianza", "Acumulada"], rows=rows,
title="Varianza por componente"))
# Top loadings: keep the strongest features per component (capped).
loadings = pca.get("top_loadings") or []
if loadings:
per_comp: dict = {}
for ld in loadings:
if not _is_dict(ld):
continue
comp = ld.get("component")
per_comp.setdefault(comp, [])
if len(per_comp[comp]) < 4:
per_comp[comp].append(ld)
rows = []
for comp in sorted(per_comp.keys(), key=lambda x: (x is None, x)):
for ld in per_comp[comp]:
rows.append([f"PC{int(comp) + 1}" if comp is not None else "",
model._safe_str(ld.get("feature")),
_fmt_num(ld.get("loading"))])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Variable", "Carga"], rows=rows,
title="Cargas principales (top por componente)",
note="Cargas con mayor valor absoluto: qué variables definen "
"cada eje."))
return blocks
def _kmeans_section(kmeans: dict, projection: dict, titles) -> list:
has_km = _is_dict(kmeans) and kmeans.get("best_k")
has_proj = _is_dict(projection) and projection.get("points")
if not has_km and not has_proj:
return []
blocks = [model.Heading(text="Segmentación (KMeans)", level=2)]
best_k = (projection or {}).get("best_k") or (kmeans or {}).get("best_k")
sil = (projection or {}).get("silhouette")
if sil is None:
sil = (kmeans or {}).get("silhouette")
intro = (
f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos "
"automáticamente maximizando el coeficiente de *silhouette* "
f"(**{_fmt_num(sil)}**, rango 1 a 1: cuanto más alto, segmentos más "
"compactos y separados). Los segmentos se proyectan sobre el plano de "
"los dos primeros componentes principales para visualizarlos."
)
blocks.append(model.Markdown(text=intro))
if has_proj:
scatter = _make_cluster_scatter(projection)
if scatter is not None:
blocks.append(model.Figure(
make=scatter,
caption="Cada punto es una fila coloreada por su segmento "
"KMeans; las «X» son los centroides."))
else:
blocks.append(model.Note(
"Proyección de clusters no dibujable (puntos y etiquetas "
"desalineados)."))
else:
# We have kmeans stats but no aligned points+labels to colour by.
blocks.append(model.Note(
"Scatter coloreado por segmento no disponible: el perfil no incluye "
"la proyección con etiquetas alineadas (pásala en "
"ctx['cluster_projection'] o las columnas crudas en "
"ctx['raw_numeric'] para colorear el plano PCA)."))
# Cluster sizes table.
sizes = (projection or {}).get("cluster_sizes") or (kmeans or {}).get("cluster_sizes") or []
total = sum(s for s in sizes if isinstance(s, (int, float))) or 0
if sizes:
rows = []
for i, s in enumerate(sizes):
pct = (s / total) if total else None
rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(pct)])
blocks.append(model.DataTable(
header=["Segmento", "Tamaño", "% del total"], rows=rows,
title="Tamaño de cada segmento"))
# Per-cluster LLM micro-analysis (each entry kept indivisible as one block).
if titles:
blocks.append(model.Heading(text="Interpretación de los segmentos",
level=3))
for t in titles:
if not _is_dict(t):
continue
cid = t.get("cluster")
title = model._safe_str(t.get("title")) or f"Cluster {cid}"
desc = model._safe_str(t.get("description"))
line = f"**Cluster {cid}{title}.**"
if desc:
line += " " + desc
blocks.append(model.Markdown(text=line))
return blocks
def _outliers_section(outliers: dict) -> list:
if not _is_dict(outliers) or outliers.get("n_outliers") is None:
return []
if outliers.get("note") and not outliers.get("n_rows_used"):
# insufficient data — nothing meaningful to show.
return []
blocks = [model.Heading(text="Detección de anomalías (Isolation Forest)",
level=2)]
explain = (
"**Isolation Forest** detecta filas anómalas de forma *multivariante*: "
"construye árboles que parten el espacio con cortes aleatorios y mide "
"cuántos cortes hacen falta para aislar cada fila. Las filas raras "
"(combinaciones de valores poco frecuentes considerando **todas las "
"columnas a la vez**, no una sola) se aíslan con muy pocos cortes y "
"obtienen un score bajo. El **umbral** de decisión separa las filas "
"normales de las anómalas según la contaminación esperada del modelo: "
"una fila es outlier cuando su score queda por debajo de ese umbral."
)
blocks.append(model.Markdown(text=explain))
blocks.append(model.KVTable(rows=[
("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))),
("Outliers detectados", _fmt_num(outliers.get("n_outliers"))),
("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))),
("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)),
], title="Anomalías multivariantes"))
return blocks
def _normality_section(normality: dict) -> list:
if not _is_dict(normality) or not normality:
return []
header = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)",
"¿Normal?"]
rows = []
for col, res in normality.items():
if not _is_dict(res):
continue
jb = res.get("jarque_bera") if _is_dict(res.get("jarque_bera")) else {}
da = res.get("dagostino") if _is_dict(res.get("dagostino")) else {}
sh = res.get("shapiro") if _is_dict(res.get("shapiro")) else {}
is_norm = res.get("is_normal")
if res.get("note") and is_norm is None and not jb:
rows.append([model._safe_str(col), "", "", "",
model._safe_str(res.get("note"))])
continue
rows.append([
model._safe_str(col),
_fmt_num(jb.get("p"), 4) if jb else "",
_fmt_num(da.get("p"), 4) if da else "",
_fmt_num(sh.get("p"), 4) if sh else "",
"" if is_norm else ("no" if is_norm is not None else ""),
])
if not rows:
return []
return [
model.Heading(text="Normalidad de las variables", level=2),
model.Markdown(text=(
"Tests de hipótesis de normalidad por columna (hipótesis nula: la "
"muestra proviene de una distribución normal). Se marca **normal** "
"cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas "
"variables reales son estrictamente normales; esto orienta qué "
"transformaciones o tests robustos aplicar después.")),
model.DataTable(header=header, rows=rows,
title="Pruebas de normalidad"),
]
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_modelos(profile: dict, ctx: dict):
"""Build the MODELOS Chapter, or None if there are no models to show."""
profile = profile or {}
ctx = ctx or {}
if not isinstance(profile, dict):
return None
models = profile.get("models")
if not _is_dict(models):
return None
pca = models.get("pca") if _is_dict(models.get("pca")) else None
kmeans = models.get("kmeans") if _is_dict(models.get("kmeans")) else None
outliers = models.get("outliers") if _is_dict(models.get("outliers")) else None
normality = models.get("normality") if _is_dict(models.get("normality")) else None
projection, _src = _resolve_cluster_projection(profile, ctx)
titles = _cluster_titles(profile, ctx, projection) if (
(kmeans and kmeans.get("best_k")) or (projection and projection.get("points"))
) else None
sections = []
sections += _pca_section(pca) if pca else []
sections += _kmeans_section(kmeans, projection, titles)
sections += _outliers_section(outliers) if outliers else []
sections += _normality_section(normality) if normality else []
if not sections:
return None # models block present but nothing renderable.
blocks = _normalization_intro() + sections
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,259 @@
"""Tests for the MODELOS chapter — DoD: golden + edges + anti-cut.
Self-contained: builds a synthetic TableProfile with a ``models`` block (no
DuckDB, no sklearn, no LLM, no network). The cluster scatter is fed a synthetic
pre-computed ``cluster_projection`` via ``ctx`` and the per-cluster titles via
``ctx['cluster_titles']`` so the suite is fast and deterministic. The live paths
(``project_clusters_2d`` / ``describe_clusters_llm``) are exercised against the
real wine dataset in the work report, not here.
Verifies: the chapter renders to PDF *and* PPTX showing the user-required pieces
(markdown text, PCA scree, cluster scatter, per-cluster LLM micro-analysis,
outlier + normalization explanations); that an inapplicable profile yields None
without raising; and that a long normality table is split without losing any
column (anti-cut).
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.chapters.modelos import build_modelos
from datascience.automatic_eda.model import Figure, DataTable, Markdown
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
# --------------------------------------------------------------------------- #
# Synthetic fixtures.
# --------------------------------------------------------------------------- #
def _models_block(n_norm_cols: int = 4) -> dict:
feats = ["fixed_acidity", "alcohol", "ph", "sulphates"]
normality = {}
for i in range(n_norm_cols):
normality[f"col_{i}"] = {
"n": 500,
"jarque_bera": {"stat": 12.3, "p": 0.002 + i * 0.0001, "normal": False},
"dagostino": {"stat": 9.1, "p": 0.01, "normal": False},
"shapiro": {"stat": 0.98, "p": 0.04, "normal": False},
"is_normal": False,
}
return {
"n_numeric_cols": 4,
"pca": {
"n_components": 2, "n_rows_used": 1599, "n_features": 4,
"explained_variance_ratio": [0.41, 0.22],
"cumulative": [0.41, 0.63],
"top_loadings": [
{"component": 0, "feature": "alcohol", "loading": 0.62},
{"component": 0, "feature": "fixed_acidity", "loading": -0.48},
{"component": 1, "feature": "ph", "loading": 0.71},
{"component": 1, "feature": "sulphates", "loading": 0.33},
],
"projection": [[0.1, 0.2], [0.3, -0.1]],
},
"kmeans": {
"best_k": 3, "silhouette": 0.27,
"scores_by_k": [{"k": 2, "silhouette": 0.21}, {"k": 3, "silhouette": 0.27}],
"cluster_sizes": [700, 500, 399],
"centers": [[0.1, 0.2, 0.3, 0.4]],
"n_rows_used": 1599, "n_features": 4,
},
"outliers": {
"n_outliers": 80, "outlier_pct": 5.0, "threshold": -0.0123,
"n_rows_used": 1599,
},
"normality": normality,
"note": "",
"_feats": feats,
}
def _cluster_projection() -> dict:
# 30 points across 3 clusters, aligned points<->labels.
points, labels = [], []
centers = [(-2.0, -2.0), (2.0, 0.0), (0.0, 2.5)]
for cl, (cx, cy) in enumerate(centers):
for j in range(10):
points.append([cx + (j - 5) * 0.05, cy + (j - 5) * 0.05])
labels.append(cl)
return {
"points": points, "labels": labels,
"centers_2d": [list(c) for c in centers],
"best_k": 3, "silhouette": 0.27,
"explained_2d": [0.41, 0.22],
"cluster_sizes": [10, 10, 10],
"cluster_profiles": [
{"cluster": 0, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 9.5, "ph": 3.5},
"distinctive": ["alcohol", "ph"], "centroid_z": {"alcohol": -1.2}},
{"cluster": 1, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 12.0, "ph": 3.1},
"distinctive": ["alcohol"], "centroid_z": {"alcohol": 1.4}},
{"cluster": 2, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 10.5, "ph": 3.8},
"distinctive": ["ph"], "centroid_z": {"ph": 1.6}},
],
"feature_names": ["alcohol", "ph", "fixed_acidity", "sulphates"],
"n_used": 1599, "note": "",
}
def _ctx_full() -> dict:
return {
"cluster_projection": _cluster_projection(),
"cluster_titles": [
{"cluster": 0, "title": "Vinos suaves de baja graduación",
"description": "Alcohol bajo y pH alto; perfil ligero."},
{"cluster": 1, "title": "Vinos potentes",
"description": "Alta graduación alcohólica."},
{"cluster": 2, "title": "Vinos de pH elevado",
"description": "Acidez baja relativa al resto."},
],
}
def _profile() -> dict:
return {"table": "wine", "n_rows": 1599, "n_cols": 12,
"models": _models_block()}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
def _pptx_text(path: str) -> str:
prs = Presentation(path)
out = []
for slide in prs.slides:
for shape in slide.shapes:
if shape.has_text_frame:
out.append(shape.text_frame.text)
return re.sub(r"\s+", " ", " ".join(out))
# --------------------------------------------------------------------------- #
# Golden.
# --------------------------------------------------------------------------- #
def test_golden_build_modelos_bloques_requeridos():
ch = build_modelos(_profile(), _ctx_full())
assert ch is not None
assert ch.id == "modelos" and ch.version
# Both figures present: scree plot + cluster scatter.
n_figures = sum(1 for b in ch.blocks if isinstance(b, Figure))
assert n_figures >= 2
# Tables present (variance, loadings, sizes, normality).
assert sum(1 for b in ch.blocks if isinstance(b, DataTable)) >= 3
# Markdown carries the required explanations.
md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown))
assert "z-score" in md # normalization explained
assert "Isolation Forest" in md # outlier generation explained
assert "silhouette" in md # kmeans
# Per-cluster micro-analysis titles present.
assert "Vinos potentes" in md
assert "Cluster 1" in md
def test_golden_render_pdf_muestra_lo_exigido():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "modelos.pdf")
res = render_automatic_eda_pdf(
_profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()})
assert res["path"] == out and os.path.exists(out)
ids = [c["id"] for c in res["chapters"]]
assert "modelos" in ids
txt = _pdf_text(out)
for needle in ("Modelos no supervisados", "z-score", "PCA",
"Segmentación", "Isolation Forest", "Normalidad",
"Vinos potentes"):
assert needle in txt, f"falta en PDF: {needle}"
def test_golden_render_pptx_muestra_lo_exigido():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "modelos.pptx")
res = render_automatic_eda_pptx(
_profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()})
assert res["path"] == out and os.path.exists(out)
assert res["n_slides"] >= 1
txt = _pptx_text(out)
for needle in ("Modelos no supervisados", "z-score", "Isolation Forest",
"Vinos potentes"):
assert needle in txt, f"falta en PPTX: {needle}"
# --------------------------------------------------------------------------- #
# Edges.
# --------------------------------------------------------------------------- #
def test_edge_profile_none_o_vacio_devuelve_none():
assert build_modelos(None, {}) is None
assert build_modelos({}, {}) is None
assert build_modelos({"n_rows": 5}, None) is None # no 'models' key
def test_edge_models_insuficiente_devuelve_none():
prof = {"table": "tiny", "models": {
"n_numeric_cols": 1,
"pca": {"n_components": 0, "explained_variance_ratio": [],
"note": "datos insuficientes"},
"kmeans": {"best_k": 0, "note": "datos insuficientes"},
"outliers": {"n_outliers": 0, "note": "datos insuficientes"},
"normality": None,
"note": "insuficientes columnas numericas para modelos multivariantes",
}}
assert build_modelos(prof, {}) is None
def test_edge_solo_normalidad_si_genera_capitulo():
# A single numeric column: only normality applies. Chapter must still build.
prof = {"table": "one", "models": {
"n_numeric_cols": 1, "pca": None, "kmeans": None, "outliers": None,
"normality": {"x": {"n": 500, "jarque_bera": {"stat": 1.0, "p": 0.2,
"normal": True}, "dagostino": {"stat": 1.0, "p": 0.3,
"normal": True}, "shapiro": {"stat": 0.99, "p": 0.4,
"normal": True}, "is_normal": True}},
}}
ch = build_modelos(prof, {})
assert ch is not None
md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown))
assert "z-score" in md # normalization intro still present
def test_edge_kmeans_sin_proyeccion_degrada_sin_romper():
# kmeans stats present but no cluster_projection / raw_numeric to colour by.
prof = _profile()
ch = build_modelos(prof, {}) # no ctx projection
assert ch is not None
# No scatter figure for clusters, but a Note explaining the degradation.
notes = [b.text for b in ch.blocks if b.kind == "note"]
assert any("ctx['raw_numeric']" in n or "cluster_projection" in n
for n in notes)
# PDF still renders fine.
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "deg.pdf")
res = render_automatic_eda_pdf(prof, out, {"write_manifest": False})
assert res["path"] == out and os.path.exists(out)
# --------------------------------------------------------------------------- #
# Anti-cut.
# --------------------------------------------------------------------------- #
def test_anticortes_tabla_normalidad_larga_no_corta():
# 40 numeric columns → the normality DataTable must split across pages,
# repeating the header, without losing any column name.
prof = {"table": "wide", "models": _models_block(n_norm_cols=40)}
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "wide.pdf")
render_automatic_eda_pdf(prof, out, {"write_manifest": False,
"ctx": _ctx_full()})
reader = PdfReader(out)
n_pages = len(reader.pages)
assert n_pages > 1
txt = "".join((pg.extract_text() or "") for pg in reader.pages)
# Every column name survives (wrapped/split, never truncated).
for i in (0, 19, 39):
assert f"col_{i}" in txt
@@ -0,0 +1,97 @@
---
name: describe_clusters_llm
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def describe_clusters_llm(cluster_profiles: list, feature_names: list, model: str = \"claude-haiku-4-5-20251001\") -> dict"
description: "Micro-analisis LLM de clusters de KMeans (grupo eda). Toma los perfiles AGREGADOS de cada cluster (los que produce project_clusters_2d: tamano, centroide en escala original, features distintivas y centroide en z-score) y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una descripcion de 1-2 frases en espanol. Clave de coste/privacidad: NO envia filas crudas, solo el resumen agregado de cada grupo (tamano, % del total y la media de las features distintivas con su signo respecto a la media global). Reusa ask_llm del grupo claude-direct (API directa con token OAuth de Claude). Impura, dict-no-throw: nunca lanza, degrada a titulos genericos 'Cluster N' si el LLM no responde o el parseo falla."
tags: [eda, clustering, llm, claude-direct, datascience, kmeans]
params:
- name: cluster_profiles
desc: "Lista de perfiles de cluster con la forma que produce project_clusters_2d: cada uno {cluster:int, size:int, pct:float, centroid_original:{feature: media en escala original}, distinctive:[features distintivas], centroid_z:{feature: z-score}}. Solo se le envia al LLM un resumen agregado; nunca filas crudas. Lista vacia o no-lista -> clusters=[] sin llamar al LLM."
- name: feature_names
desc: "Nombres de las features del dataset. Se incluyen como contexto en el prompt para que el LLM pueda nombrar los clusters; no es obligatorio que coincida con las features distintivas de cada perfil."
- name: model
desc: "id del modelo Anthropic a usar. Default 'claude-haiku-4-5-20251001' (haiku, coste bajo, ~2-3s). Para titulos/descripciones mas finas, pasar p.ej. 'claude-opus-4-8'."
output: "dict dict-no-throw: {clusters:[{cluster:int, title:str, description:str}], model:str, note:str}. note=='' si todo fue bien. Si el LLM no respondio (note='LLM no disponible') o el parseo fallo (note='parse fallido'), clusters trae titulos genericos 'Cluster N' con description vacia. Si cluster_profiles esta vacio o no es lista: {clusters:[], model, note:'sin clusters'}. NUNCA lanza."
uses_functions: [ask_llm_py_core]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_parse_clusters_json_valid_array", "test_parse_clusters_json_wrapped_in_junk_text", "test_parse_clusters_json_non_json_returns_none", "test_parse_clusters_json_fills_missing_cluster_by_index", "test_describe_clusters_llm_ok_with_monkeypatched_llm", "test_describe_clusters_llm_degrades_on_empty_response", "test_describe_clusters_llm_degrades_on_unparseable_response", "test_describe_clusters_llm_empty_list_skips_llm", "test_describe_clusters_llm_non_list_input_skips_llm"]
test_file_path: "python/functions/datascience/describe_clusters_llm_test.py"
file_path: "python/functions/datascience/describe_clusters_llm.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.describe_clusters_llm import describe_clusters_llm
# Perfiles agregados producidos por project_clusters_2d (no hay filas crudas).
cluster_profiles = [
{
"cluster": 0, "size": 60, "pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1, "size": 40, "pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
feature_names = ["acidez", "alcohol", "azucar"]
out = describe_clusters_llm(cluster_profiles, feature_names) # haiku por defecto
# out = describe_clusters_llm(cluster_profiles, feature_names, model="claude-opus-4-8")
if not out["note"]:
for c in out["clusters"]:
print(f"Cluster {c['cluster']}: {c['title']}")
print(" ", c["description"])
else:
# Degradacion: titulos genericos "Cluster N".
print("LLM no usado:", out["note"])
for c in out["clusters"]:
print(c["cluster"], c["title"])
```
## Cuando usarla
Cuando ya has clusterizado un dataset (KMeans + `project_clusters_2d`) y quieres
poner NOMBRE y descripcion legible a cada grupo en vez de dejar "Cluster 0/1/2".
Es el paso interpretativo que sigue al perfilado de clusters: `project_clusters_2d`
calcula tamano, centroides y features distintivas, y `describe_clusters_llm` los
traduce a un titulo corto + 1-2 frases por cluster. Usala al cerrar un EDA con
segmentacion para el resumen final o el report. Una sola llamada al LLM describe
todos los clusters a la vez (barato).
## Gotchas
- **Impura: hace 1 llamada de red al LLM.** No es determinista ni gratis. Latencia
tipica ~2-3s con haiku.
- **Requiere token OAuth de Claude** en `~/.claude/.credentials.json` (via `ask_llm`
/ grupo `claude-direct`). Sin token / sin red, NO lanza: degrada a titulos
genericos `Cluster N` con `note="LLM no disponible"`.
- **NO envia filas crudas al LLM**, solo el resumen AGREGADO de cada cluster
(tamano, % del total y la media de las features distintivas con su signo respecto
a la media global). Privacidad y coste minimos por diseno — pero requiere que los
perfiles vengan ya calculados por `project_clusters_2d`.
- **Modelo `haiku` por defecto** para coste bajo; sube a `claude-opus-4-8` si
necesitas titulos/descripciones mas finas (mas caro y lento).
- **dict-no-throw**: si el modelo no devuelve un JSON array parseable, retorna
titulos genericos con `note="parse fallido"`. Comprueba siempre `out["note"]`
antes de fiarte de los titulos.
- El LLM puede sobre-interpretar: el system prompt le pide ser sobrio y no inventar
causas, pero revisa los titulos antes de publicarlos en un report.
@@ -0,0 +1,240 @@
"""describe_clusters_llm — micro-analisis LLM de clusters de KMeans (grupo `eda`).
Toma los PERFILES AGREGADOS de cada cluster (los que produce `project_clusters_2d`:
tamano, centroide en escala original, features distintivas y centroide en z-score)
y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una
descripcion de 1-2 frases, en espanol.
Clave de coste y privacidad: NO se envian filas crudas al LLM. Solo viaja el
perfil AGREGADO de cada grupo (tamano, % del total y la media de las features
distintivas con su signo respecto a la media global). El coste es minimo y ningun
dato fila-a-fila sale del proceso.
Reusa `ask_llm` del registry (grupo claude-direct, API directa con el token OAuth
de Claude en ~/.claude/.credentials.json, arranque 0). Impura: una llamada de red.
Estilo dict-no-throw: NUNCA lanza; ante cualquier fallo (red, LLM caido, parseo)
degrada a titulos genericos "Cluster N" + una nota explicando el motivo.
"""
import json
from core.ask_llm import ask_llm
_SYSTEM = (
"Eres un analista de datos. Recibes los PERFILES AGREGADOS de los clusters de "
"un KMeans (por cada grupo: su tamano y la media de sus features distintivas, "
"con el signo respecto a la media global; nunca filas crudas) y los describes "
"de forma sobria y util. Para cada cluster generas un titulo corto y "
"descriptivo (por ejemplo 'Vinos de alta acidez y baja graduacion') y una "
"descripcion de 1-2 frases. NO inventes causas ni sobre-interpretes: limitate a "
"lo que dicen los numeros. Responde en espanol. Responde SIEMPRE y SOLO con un "
"unico JSON array valido, sin texto alrededor y sin fences de markdown, con "
'EXACTAMENTE la forma [{"cluster": <int>, "title": "<titulo corto>", '
'"description": "<1-2 frases>"}], un objeto por cluster.'
)
def _fmt_num(value) -> str:
"""Formatea un numero de forma compacta para el prompt (None -> '?')."""
if value is None:
return "?"
if isinstance(value, bool):
return str(value)
if isinstance(value, float):
if value == int(value):
return str(int(value))
return f"{value:.4g}"
return str(value)
def _cluster_id(profile: dict, index: int) -> int:
"""Devuelve el id del cluster del perfil, o el indice si no es un int valido."""
raw = (profile or {}).get("cluster")
if isinstance(raw, bool):
return index
if isinstance(raw, int):
return raw
try:
return int(raw)
except (TypeError, ValueError):
return index
def _build_prompt(cluster_profiles: list, feature_names: list) -> str:
"""Construye un resumen textual compacto de los perfiles para el LLM.
Funcion interna PURA: no toca red ni disco, es testeable sin credenciales.
Por cada cluster incluye su numero, tamano (size + pct%) y, para cada feature
distintiva, el valor del centroide en escala original mas si esta por encima o
por debajo de la media (signo del z-score en centroid_z). Pasa AGREGADOS, nunca
dato crudo de filas.
Args:
cluster_profiles: lista de perfiles de cluster (forma de project_clusters_2d).
feature_names: nombres de las features del dataset (solo contexto).
Returns:
El texto del prompt.
"""
cluster_profiles = cluster_profiles or []
feature_names = feature_names if isinstance(feature_names, list) else []
lines = [
"Perfiles AGREGADOS de clusters de KMeans. No hay filas crudas, solo medias por grupo.",
f"Numero de clusters: {len(cluster_profiles)}",
]
if feature_names:
lines.append("Features del dataset: " + ", ".join(str(f) for f in feature_names))
lines.append("")
for i, prof in enumerate(cluster_profiles):
prof = prof or {}
cid = _cluster_id(prof, i)
size = prof.get("size")
pct = prof.get("pct")
pct_str = f"{pct:.1f}%" if isinstance(pct, (int, float)) and not isinstance(pct, bool) else "?"
lines.append(f"Cluster {cid}: tamano={_fmt_num(size)} ({pct_str} del total)")
distinctive = prof.get("distinctive") or []
centroid_o = prof.get("centroid_original") or {}
centroid_z = prof.get("centroid_z") or {}
if distinctive:
lines.append(" Features distintivas (media del grupo):")
for feat in distinctive:
val = centroid_o.get(feat)
z = centroid_z.get(feat)
direction = ""
if isinstance(z, (int, float)) and not isinstance(z, bool):
if z > 0:
direction = "por encima de la media"
elif z < 0:
direction = "por debajo de la media"
else:
direction = "en la media"
if direction:
lines.append(f" - {feat}: {_fmt_num(val)} ({direction})")
else:
lines.append(f" - {feat}: {_fmt_num(val)}")
else:
lines.append(" (sin features distintivas marcadas)")
lines.append("")
lines.append(
"Devuelve SOLO el JSON array descrito en las instrucciones del sistema, "
"sin texto antes ni despues."
)
return "\n".join(lines)
def _parse_clusters_json(text: str, n: int):
"""Extrae y normaliza el array JSON de la respuesta del LLM.
Funcion interna testeable sin red. Localiza el primer '[' y el ultimo ']' del
texto (tolerando texto basura alrededor o fences de markdown), hace json.loads
y normaliza cada entrada a {cluster:int, title:str, description:str}, rellenando
el cluster por indice si falta. NUNCA lanza: ante cualquier fallo devuelve None
(senal de degradacion para el caller).
Args:
text: respuesta cruda del LLM.
n: numero de perfiles esperados (referencia; la longitud real la marca el array).
Returns:
Lista normalizada de dicts, o None si no se pudo parsear un array valido.
"""
if not text or not isinstance(text, str):
return None
start = text.find("[")
end = text.rfind("]")
if start == -1 or end == -1 or end <= start:
return None
try:
data = json.loads(text[start : end + 1])
except (ValueError, TypeError):
return None
if not isinstance(data, list):
return None
out = []
for i, item in enumerate(data):
if not isinstance(item, dict):
out.append({"cluster": i, "title": f"Cluster {i}", "description": ""})
continue
raw_cluster = item.get("cluster")
if isinstance(raw_cluster, bool):
cluster = i
elif isinstance(raw_cluster, int):
cluster = raw_cluster
else:
try:
cluster = int(raw_cluster)
except (TypeError, ValueError):
cluster = i
title = item.get("title")
title = str(title) if title is not None else f"Cluster {cluster}"
desc = item.get("description")
desc = str(desc) if desc is not None else ""
out.append({"cluster": cluster, "title": title, "description": desc})
return out
def _generic_clusters(cluster_profiles: list) -> list:
"""Titulos genericos por cluster para la degradacion (sin LLM)."""
out = []
for i, prof in enumerate(cluster_profiles):
cid = _cluster_id(prof or {}, i)
out.append({"cluster": cid, "title": f"Cluster {cid}", "description": ""})
return out
def describe_clusters_llm(
cluster_profiles: list,
feature_names: list,
model: str = "claude-haiku-4-5-20251001",
) -> dict:
"""Describe los clusters de un KMeans con UNA sola llamada al LLM.
Args:
cluster_profiles: lista de perfiles de cluster (la forma que produce
project_clusters_2d): cada uno {"cluster": int, "size": int,
"pct": float, "centroid_original": {feature: media},
"distinctive": [features], "centroid_z": {feature: z}}. Solo se le
envia al LLM el resumen agregado, nunca filas crudas.
feature_names: nombres de las features del dataset (contexto para el LLM).
model: id del modelo Anthropic. Default claude-haiku-4-5-20251001
(haiku, coste bajo).
Returns:
dict dict-no-throw: {"clusters": [{cluster:int, title:str, description:str}],
"model": str, "note": str}. note == "" si todo fue bien; si el LLM no
respondio o el parseo fallo, clusters trae titulos genericos "Cluster N" y
note explica el motivo ("LLM no disponible" / "parse fallido"). Si
cluster_profiles esta vacio o no es lista, devuelve clusters=[] sin llamar
al LLM (note "sin clusters"). NUNCA lanza.
"""
if not isinstance(cluster_profiles, list) or not cluster_profiles:
return {"clusters": [], "model": model, "note": "sin clusters"}
n = len(cluster_profiles)
prompt = _build_prompt(cluster_profiles, feature_names)
try:
text = ask_llm(prompt, model=model, system=_SYSTEM, echo=False)
except Exception: # noqa: BLE001 — degradacion: cualquier fallo de red/LLM.
text = ""
parsed = _parse_clusters_json(text, n)
if parsed:
return {"clusters": parsed, "model": model, "note": ""}
note = "LLM no disponible" if not text else "parse fallido"
return {"clusters": _generic_clusters(cluster_profiles), "model": model, "note": note}
@@ -0,0 +1,160 @@
"""Tests para describe_clusters_llm.
NO acceden a red ni a credenciales: _parse_clusters_json es testeable aislada y la
unica via que llamaria al LLM (describe_clusters_llm) se prueba monkeypatcheando
ask_llm con respuestas simuladas. Cubre golden (LLM ok), edge (cluster faltante,
array envuelto en basura, lista vacia / input no-lista) y error (LLM caido, texto
no parseable) — todos sin tocar la red.
"""
import importlib
import json
from datascience.describe_clusters_llm import (
_parse_clusters_json,
describe_clusters_llm,
)
# Perfiles de ejemplo con la forma que produce project_clusters_2d.
_PROFILES = [
{
"cluster": 0,
"size": 60,
"pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1,
"size": 40,
"pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
_FEATURES = ["acidez", "alcohol", "azucar"]
def _patch_ask_llm(monkeypatch, returner):
"""Monkeypatchea ask_llm en el modulo bajo prueba con un callable simulado."""
mod = importlib.import_module("datascience.describe_clusters_llm")
monkeypatch.setattr(
mod, "ask_llm", lambda prompt, model="x", system="", echo=True: returner
)
# --- _parse_clusters_json (parser puro, sin red) ---
def test_parse_clusters_json_valid_array():
text = json.dumps(
[
{"cluster": 0, "title": "A", "description": "desc a"},
{"cluster": 1, "title": "B", "description": "desc b"},
]
)
parsed = _parse_clusters_json(text, 2)
assert parsed == [
{"cluster": 0, "title": "A", "description": "desc a"},
{"cluster": 1, "title": "B", "description": "desc b"},
]
def test_parse_clusters_json_wrapped_in_junk_text():
payload = [{"cluster": 0, "title": "Solo uno", "description": "d"}]
text = "Claro, aqui tienes el resultado:\n" + json.dumps(payload) + "\nEspero que sirva."
parsed = _parse_clusters_json(text, 1)
assert parsed[0]["title"] == "Solo uno"
assert parsed[0]["cluster"] == 0
def test_parse_clusters_json_non_json_returns_none():
# Texto sin array JSON -> degradacion (None) sin lanzar.
assert _parse_clusters_json("no hay json aqui", 2) is None
assert _parse_clusters_json("", 2) is None
assert _parse_clusters_json("{solo un objeto}", 2) is None
def test_parse_clusters_json_fills_missing_cluster_by_index():
text = json.dumps(
[
{"title": "A", "description": "d"},
{"title": "B", "description": "e"},
]
)
parsed = _parse_clusters_json(text, 2)
assert parsed[0]["cluster"] == 0
assert parsed[1]["cluster"] == 1
assert parsed[0]["title"] == "A"
# --- describe_clusters_llm (con ask_llm monkeypatcheado, sin red) ---
def test_describe_clusters_llm_ok_with_monkeypatched_llm(monkeypatch):
fake = json.dumps(
[
{
"cluster": 0,
"title": "Vinos de alta acidez",
"description": "Acidez por encima de la media y graduacion baja.",
},
{
"cluster": 1,
"title": "Vinos de alta graduacion",
"description": "Alcohol claramente por encima de la media.",
},
]
)
_patch_ask_llm(monkeypatch, fake)
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["note"] == ""
assert out["model"] == "claude-haiku-4-5-20251001"
assert len(out["clusters"]) == 2
assert out["clusters"][0]["title"] == "Vinos de alta acidez"
assert set(out["clusters"][0].keys()) == {"cluster", "title", "description"}
def test_describe_clusters_llm_degrades_on_empty_response(monkeypatch):
# ask_llm devuelve "" (error/red caida) -> titulos genericos + note.
_patch_ask_llm(monkeypatch, "")
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["clusters"][0]["title"] == "Cluster 0"
assert out["clusters"][1]["title"] == "Cluster 1"
assert out["clusters"][0]["description"] == ""
assert out["note"] == "LLM no disponible"
assert out["model"] == "claude-haiku-4-5-20251001"
def test_describe_clusters_llm_degrades_on_unparseable_response(monkeypatch):
_patch_ask_llm(monkeypatch, "lo siento, no puedo ayudarte con eso")
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["clusters"][0]["title"] == "Cluster 0"
assert out["clusters"][1]["title"] == "Cluster 1"
assert out["note"] == "parse fallido"
def test_describe_clusters_llm_empty_list_skips_llm(monkeypatch):
# Con lista vacia NO debe llamarse al LLM en absoluto.
def boom(*args, **kwargs):
raise AssertionError("ask_llm no debe llamarse con lista vacia")
mod = importlib.import_module("datascience.describe_clusters_llm")
monkeypatch.setattr(mod, "ask_llm", boom)
out = describe_clusters_llm([], _FEATURES)
assert out["clusters"] == []
assert out["note"] == "sin clusters"
def test_describe_clusters_llm_non_list_input_skips_llm():
# Input no-lista (None) -> clusters vacio sin tocar la red.
out = describe_clusters_llm(None, _FEATURES)
assert out["clusters"] == []
assert out["note"] == "sin clusters"
assert out["model"] == "claude-haiku-4-5-20251001"
@@ -0,0 +1,95 @@
---
name: project_clusters_2d
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def project_clusters_2d(columns: dict, k_min: int = 2, k_max: int = 8, max_points: int = 2000) -> dict"
description: "PCA a 2D + KMeans sobre el MISMO subset numerico estandarizado, devolviendo proyeccion 2D y labels de cluster ALINEADOS por fila para pintar un scatter PCA coloreado por cluster. Estandariza una sola vez, elige k por silhouette y proyecta centroides al espacio PCA. Determinista."
tags: [eda, models, clustering, pca, kmeans, scatter, dimensionality-reduction, datascience, sklearn]
params:
- name: columns
desc: "Mapa {nombre_columna: [valores numericos]}. Listas alineadas por fila (misma longitud). Columnas no numericas o con <2 valores distintos se descartan; None/NaN descartan la fila completa (listwise)."
- name: k_min
desc: "Numero minimo de clusters a probar por silhouette (default 2). El minimo de filas validas requerido es max(3, k_min*2)."
- name: k_max
desc: "Numero maximo de clusters a probar (default 8). Se acota a min(k_max, n_filas_validas-1)."
- name: max_points
desc: "Tope de puntos devueltos en points/labels (default 2000). Si n_used lo supera, points y labels se submuestrean CONJUNTAMENTE con paso determinista para seguir alineados; el fit usa siempre todas las filas."
output: "dict con points (proyeccion 2D, posiblemente submuestreada a max_points), labels (cluster de cada point, alineado con points), centers_2d (centroides en espacio PCA, len==best_k), best_k, silhouette, explained_2d ([var PC1, var PC2]), cluster_sizes (sobre n_used total), cluster_profiles (lista de {cluster, size, pct, centroid_original, distinctive top-3 por |z|, centroid_z}), feature_names, n_used (filas del fit antes de muestreo) y note (\"\" si ok). Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve best_k=0, listas vacias y note 'datos insuficientes' sin lanzar excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [numpy, scikit-learn]
tested: true
tests: ["test_golden_three_blobs_aligned_projection_and_clusters", "test_edge_subsampling_keeps_points_labels_aligned", "test_edge_single_numeric_column_insufficient", "test_edge_too_few_rows_insufficient", "test_edge_non_numeric_column_dropped_without_error", "test_edge_constant_column_dropped"]
test_file_path: "python/functions/datascience/project_clusters_2d_test.py"
file_path: "python/functions/datascience/project_clusters_2d.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.project_clusters_2d import project_clusters_2d
# Tres grupos gaussianos bien separados sobre 4 features.
import numpy as np
rng = np.random.default_rng(0)
rows = []
for center in (np.full(4, 0.0), np.full(4, 12.0), np.array([0.0, 12.0, 0.0, 12.0])):
rows.extend(rng.normal(loc=center, scale=0.4, size=(50, 4)))
mat = np.array(rows)
columns = {f"f{j}": [float(v) for v in mat[:, j]] for j in range(4)}
res = project_clusters_2d(columns, k_min=2, k_max=8)
print(res["best_k"]) # 3
print(len(res["points"]), len(res["labels"])) # 150 150 (alineados)
print(len(res["centers_2d"])) # == best_k
print([round(v, 2) for v in res["explained_2d"]]) # varianza de PC1, PC2
# Pintar: scatter(points[:,0], points[:,1], c=labels) + marcar centers_2d.
```
## Cuando usarla
Cuando, durante un EDA, quieres un scatter 2D de un dataset tabular numerico
coloreado por segmento descubierto automaticamente, y necesitas que cada punto
de la proyeccion lleve su etiqueta de cluster correcta. Usala en vez de
combinar `pca_explained` + `kmeans_segments` a mano: esas estandarizan por
separado y descartan los labels, asi que sus salidas no se pueden cruzar fila a
fila. Esta funcion garantiza esa alineacion (mismo X estandarizado para PCA y
KMeans) y ademas proyecta los centroides KMeans al espacio PCA para dibujarlos.
## Gotchas
- Funcion pura y determinista (StandardScaler + PCA random_state=0 + KMeans
random_state=0, n_init=10), pero requiere `numpy` y `scikit-learn` instalados.
- `points`/`labels` pueden venir submuestreados si `n_used > max_points` (paso
determinista `[::ceil(n_used/max_points)]`); `n_used`, `centers_2d`,
`cluster_sizes` y `cluster_profiles` se calculan SIEMPRE sobre todas las filas.
Cuando hay submuestreo, `note` lo indica.
- `centroid_z` y `distinctive` estan en z-score (espacio escalado);
`centroid_original` esta en las unidades originales (via
`scaler.inverse_transform`). No mezcles ambos al interpretar.
- `centers_2d` esta en el espacio PCA (coordenadas del scatter), no en unidades
originales: pintalo sobre el mismo eje que `points`.
- Silhouette baja con best_k alto sugiere que no hay estructura de cluster real;
el scatter puede no mostrar grupos separados.
## Notas
Pieza de composicion que `pca_explained` + `kmeans_segments` no cubren: ambas
estandarizan internamente por separado (cada una su propio `StandardScaler`) y
`kmeans_segments` no expone los labels por fila, por lo que no se pueden cruzar
con la `projection` de `pca_explained`. Esta funcion usa `sklearn` directo
(StandardScaler una sola vez compartido por PCA y KMeans) para garantizar la
alineacion `points[i] <-> labels[i]` y proyectar los centroides KMeans al
espacio PCA. Coercion y listwise deletion siguen el estilo de `pca_explained`
(None/NaN -> fila descartada, columnas no parseables o constantes descartadas).
Degrada con gracia: con <2 columnas numericas o <max(3, k_min*2) filas validas
devuelve `note: "datos insuficientes"` sin lanzar excepcion (try/except
defensivo en todo el cuerpo).
@@ -0,0 +1,208 @@
"""Proyeccion PCA-2D + KMeans sobre el mismo subset, con puntos y labels alineados.
Estandariza una sola vez las columnas numericas (z-score), proyecta a 2D con PCA
y clusteriza con KMeans sobre EXACTAMENTE la misma matriz escalada, de modo que
la proyeccion 2D (`points`) y la etiqueta de cluster (`labels`) quedan alineadas
fila a fila. Es la pieza que `pca_explained` + `kmeans_segments` no cubren: esas
dos estandarizan por separado y descartan los labels, asi que sus salidas no se
pueden cruzar para pintar un scatter PCA coloreado por cluster. Determinista.
"""
import math
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
def project_clusters_2d(
columns: dict,
k_min: int = 2,
k_max: int = 8,
max_points: int = 2000,
) -> dict:
"""Proyecta a 2D (PCA) y clusteriza (KMeans) el mismo subset estandarizado.
PCA a 2D y KMeans se ajustan sobre la MISMA matriz estandarizada, por lo que
`points` (proyeccion 2D) y `labels` (cluster por fila) quedan alineados por
indice. El k se elige automaticamente por silhouette en el rango
[k_min, min(k_max, n_rows-1)], igual criterio que `kmeans_segments`.
Determinista: StandardScaler + PCA(random_state=0) + KMeans(random_state=0,
n_init=10).
Args:
columns: mapa {nombre_columna: [valores numericos]}. Listas alineadas por
fila (misma longitud). Columnas no numericas o con menos de 2 valores
distintos se descartan. None/NaN marcan filas a descartar listwise
(una fila se elimina si cualquier feature falta).
k_min: numero minimo de clusters a probar (default 2).
k_max: numero maximo de clusters a probar (default 8). Se acota a
min(k_max, n_rows_validas-1).
max_points: tope de puntos devueltos en `points`/`labels`. Si las filas
usadas superan este tope, se submuestrea points y labels CONJUNTAMENTE
con paso determinista para mantenerlos alineados. El fit (best_k,
silhouette, centroides, perfiles) usa SIEMPRE todas las filas.
Returns:
dict con points (proyeccion 2D, posiblemente submuestreada a max_points),
labels (cluster de cada point, alineado con points), centers_2d
(centroides en espacio PCA, len == best_k), best_k, silhouette,
explained_2d (varianza de PC1 y PC2), cluster_sizes (sobre n_used total),
cluster_profiles (ver abajo), feature_names, n_used (filas del fit antes
de muestreo) y note ("" si ok). Cada entrada de cluster_profiles:
{cluster, size, pct, centroid_original (medias en escala original),
centroid_z (z del centroide), distinctive (top 3 features por |z|)}.
Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve
best_k=0 y note "datos insuficientes" sin lanzar excepcion.
"""
feature_names: list[str] = []
def insufficient(names: list[str], n_used: int) -> dict:
return {
"best_k": 0,
"points": [],
"labels": [],
"centers_2d": [],
"cluster_profiles": [],
"feature_names": names,
"n_used": int(n_used),
"note": "datos insuficientes",
}
try:
if not isinstance(columns, dict) or not columns:
return insufficient([], 0)
# 1. Coerce a numerico, descartando columnas no parseables o constantes.
numeric_cols: dict[str, list] = {}
for name, values in columns.items():
if not isinstance(values, (list, tuple)):
continue
coerced: list[float] = []
usable = True
for v in values:
if v is None:
coerced.append(math.nan)
continue
try:
coerced.append(float(v))
except (TypeError, ValueError):
usable = False
break
if not usable:
continue
# Menos de 2 valores distintos no aporta varianza -> descartar.
distinct = {x for x in coerced if not math.isnan(x)}
if len(distinct) < 2:
continue
numeric_cols[name] = coerced
feature_names = list(numeric_cols.keys())
if len(feature_names) < 2:
return insufficient(feature_names, 0)
# 2. Matriz alineada por fila + listwise deletion (cualquier NaN -> fuera).
matrix = np.array(
[numeric_cols[n] for n in feature_names], dtype=float
).T
valid_mask = ~np.isnan(matrix).any(axis=1)
data = matrix[valid_mask]
n_used = int(data.shape[0])
min_rows = max(3, k_min * 2)
if n_used < min_rows:
return insufficient(feature_names, n_used)
# 3. Estandarizar UNA sola vez (guardamos el scaler para desestandarizar).
scaler = StandardScaler()
X_scaled = scaler.fit_transform(data)
# 4. PCA a 2D sobre la matriz escalada.
pca = PCA(n_components=2, random_state=0)
pca.fit(X_scaled)
proj = pca.transform(X_scaled)
# 5. KMeans con seleccion automatica de k por silhouette (mismo X_scaled).
upper_k = min(k_max, n_used - 1)
if upper_k < k_min:
return insufficient(feature_names, n_used)
best = None # (silhouette, k, model, labels)
for k in range(k_min, upper_k + 1):
model = KMeans(n_clusters=k, n_init=10, random_state=0)
labels_k = model.fit_predict(X_scaled)
if len(set(labels_k)) < 2:
sil = -1.0
else:
sil = float(silhouette_score(X_scaled, labels_k))
if best is None or sil > best[0]:
best = (sil, k, model, labels_k)
best_sil, best_k, best_model, labels = best
# 6. Centroides KMeans (espacio escalado) proyectados al espacio PCA.
centers_2d = pca.transform(best_model.cluster_centers_)
# 7. Perfiles por cluster sobre TODAS las filas usadas.
centroids_original = scaler.inverse_transform(best_model.cluster_centers_)
cluster_sizes: list[int] = []
cluster_profiles: list[dict] = []
for c in range(best_k):
size = int(np.sum(labels == c))
cluster_sizes.append(size)
z_vec = best_model.cluster_centers_[c]
orig_vec = centroids_original[c]
centroid_z = {
feature_names[j]: float(z_vec[j]) for j in range(len(feature_names))
}
centroid_original = {
feature_names[j]: float(orig_vec[j])
for j in range(len(feature_names))
}
order = np.argsort(np.abs(z_vec))[::-1]
distinctive = [feature_names[int(j)] for j in order[:3]]
cluster_profiles.append(
{
"cluster": int(c),
"size": size,
"pct": float(size / n_used) if n_used else 0.0,
"centroid_original": centroid_original,
"distinctive": distinctive,
"centroid_z": centroid_z,
}
)
# 8. Muestreo determinista CONJUNTO de points + labels (mantiene alineacion).
note = ""
if n_used > max_points and max_points > 0:
step = math.ceil(n_used / max_points)
proj_out = proj[::step]
labels_out = labels[::step]
note = f"submuestreado a {len(proj_out)} de {n_used} puntos para visualizacion"
else:
proj_out = proj
labels_out = labels
points = [[float(row[0]), float(row[1])] for row in proj_out]
labels_list = [int(v) for v in labels_out]
centers_list = [[float(row[0]), float(row[1])] for row in centers_2d]
explained_2d = [float(x) for x in pca.explained_variance_ratio_]
return {
"points": points,
"labels": labels_list,
"centers_2d": centers_list,
"best_k": int(best_k),
"silhouette": float(best_sil),
"explained_2d": explained_2d,
"cluster_sizes": cluster_sizes,
"cluster_profiles": cluster_profiles,
"feature_names": feature_names,
"n_used": n_used,
"note": note,
}
except Exception:
# Lectura defensiva: nunca propagar excepciones al caller del EDA.
return insufficient(feature_names, 0)
@@ -0,0 +1,127 @@
"""Tests para project_clusters_2d."""
import numpy as np
from project_clusters_2d import project_clusters_2d
def _three_blobs(seed: int = 0, per_blob: int = 50, n_features: int = 4):
"""Genera 3 gaussianas bien separadas en n_features dims, alineadas por fila.
Devuelve un dict {col: [valores]} con las columnas alineadas por fila.
"""
rng = np.random.default_rng(seed)
base_centers = [
np.full(n_features, 0.0),
np.full(n_features, 12.0),
np.array([0.0, 12.0, 0.0, 12.0][:n_features] + [0.0] * max(0, n_features - 4)),
]
rows: list[np.ndarray] = []
for center in base_centers:
pts = rng.normal(loc=center, scale=0.4, size=(per_blob, n_features))
rows.extend(pts)
mat = np.array(rows)
return {f"f{j}": [float(v) for v in mat[:, j]] for j in range(n_features)}
def test_golden_three_blobs_aligned_projection_and_clusters():
columns = _three_blobs(seed=0, per_blob=50, n_features=4)
result = project_clusters_2d(columns, k_min=2, k_max=8)
n_used = result["n_used"]
assert n_used == 150
assert result["note"] == ""
best_k = result["best_k"]
assert 2 <= best_k <= 4
# points y labels alineados por fila.
assert len(result["points"]) == len(result["labels"])
assert len(result["points"]) == n_used # sin submuestreo (150 < 2000)
# Cada punto es un par (x, y).
assert all(len(p) == 2 for p in result["points"])
# Labels dentro del rango [0, best_k).
assert all(0 <= lbl < best_k for lbl in result["labels"])
# Centroides 2D: uno por cluster.
assert len(result["centers_2d"]) == best_k
assert all(len(c) == 2 for c in result["centers_2d"])
# Varianza explicada de los 2 componentes.
assert len(result["explained_2d"]) == 2
# cluster_sizes cubre todas las filas usadas.
assert sum(result["cluster_sizes"]) == n_used
assert len(result["cluster_sizes"]) == best_k
# cluster_profiles: una entrada por cluster, con centroid_original poblado.
assert len(result["cluster_profiles"]) == best_k
for prof in result["cluster_profiles"]:
assert set(prof["centroid_original"].keys()) == set(result["feature_names"])
assert set(prof["centroid_z"].keys()) == set(result["feature_names"])
assert 1 <= len(prof["distinctive"]) <= 3
assert prof["size"] >= 0
assert 0.0 <= prof["pct"] <= 1.0
def test_edge_subsampling_keeps_points_labels_aligned():
# max_points pequeño fuerza submuestreo conjunto de points + labels.
columns = _three_blobs(seed=1, per_blob=50, n_features=3)
result = project_clusters_2d(columns, k_min=2, k_max=6, max_points=40)
n_used = result["n_used"]
assert n_used == 150 # el fit usa todas las filas
# points y labels submuestreados pero siempre con la misma longitud.
assert len(result["points"]) == len(result["labels"])
assert len(result["points"]) <= 40
# centers/sizes/profiles se calculan sobre TODOS los puntos.
assert sum(result["cluster_sizes"]) == n_used
assert len(result["centers_2d"]) == result["best_k"]
assert result["note"] != "" # senala el submuestreo
def test_edge_single_numeric_column_insufficient():
columns = {"x": [float(i) for i in range(50)]}
result = project_clusters_2d(columns, k_min=2, k_max=8)
assert result["best_k"] == 0
assert result["note"] == "datos insuficientes"
assert result["points"] == []
assert result["labels"] == []
assert result["centers_2d"] == []
assert result["cluster_profiles"] == []
def test_edge_too_few_rows_insufficient():
# Solo 2 filas validas, min_rows = max(3, k_min*2) = 4 -> insuficiente.
columns = {"x": [1.0, 5.0], "y": [2.0, 9.0]}
result = project_clusters_2d(columns, k_min=2, k_max=8)
assert result["best_k"] == 0
assert result["note"] == "datos insuficientes"
def test_edge_non_numeric_column_dropped_without_error():
# La columna de strings se descarta; quedan 3 numericas -> funciona.
columns = _three_blobs(seed=2, per_blob=50, n_features=3)
columns["label"] = ["a"] * len(columns["f0"])
result = project_clusters_2d(columns, k_min=2, k_max=6)
assert result["best_k"] >= 2
assert "label" not in result["feature_names"]
assert set(result["feature_names"]) == {"f0", "f1", "f2"}
assert len(result["points"]) == len(result["labels"])
def test_edge_constant_column_dropped():
# Una columna constante (0 varianza) se descarta por <2 valores distintos.
columns = _three_blobs(seed=3, per_blob=50, n_features=3)
columns["const"] = [7.0] * len(columns["f0"])
result = project_clusters_2d(columns, k_min=2, k_max=6)
assert "const" not in result["feature_names"]
assert result["best_k"] >= 2