Compare commits

..

2 Commits

Author SHA1 Message Date
egutierrez 81e8597d21 feat(eda): capitulo MODELOS de AutomaticEDA (markdown, scatter PCA+clusters, micro-LLM)
Implementa chapters/modelos.py (build_modelos / CHAPTER_VERSION) consumiendo
profile['models'] {pca,kmeans,outliers,normality} de run_eda_models. Render
markdown estructurado con bloques anti-corte:

- Intro de normalizacion z-score: por que se estandariza antes de PCA/KMeans (MUST-8.3).
- PCA: scree plot (varianza explicada + acumulada, un solo eje Y) + tablas de
  varianza y cargas principales (SHOULD-8.4).
- Segmentacion KMeans: scatter PCA coloreado por cluster con centroides, en su
  propia pagina/slide (MUST-8.1); tabla de tamaños; micro-analisis LLM por
  cluster con titulo, cada entrada indivisible (MUST-8.2).
- Isolation Forest: explicacion de la deteccion multivariante de outliers y del
  umbral + conteos (MUST-8.3).
- Normalidad: tabla por columna (Jarque-Bera / D'Agostino / Shapiro), pagina sola.

El scatter coloreado y los titulos LLM no estan en el TableProfile, asi que el
capitulo los toma de ctx (cluster_projection precomputado, o raw_numeric para
calcular project_clusters_2d en vivo, o cluster_titles/run_cluster_llm para el
micro-analisis), igual que overview lee head_rows; degrada honesto con una Note
cuando faltan. Devuelve None si el profile no trae bloque models renderizable.

Tests self-contained (sin DuckDB/sklearn/LLM/red): golden PDF+PPTX, edges
(profile None/vacio/insuficiente, kmeans sin proyeccion), anti-corte (tabla de
normalidad de 40 columnas parte repitiendo cabecera sin perder ninguna). 8/8.
Suite del nucleo render_automatic_eda_pdf/pptx sigue verde.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 14:57:43 +02:00
egutierrez 4de071f2f9 feat(eda): project_clusters_2d + describe_clusters_llm para el capitulo MODELOS
project_clusters_2d (pura): PCA(2)+KMeans sobre el MISMO subset estandarizado,
devolviendo proyeccion 2D y labels alineados por fila + centroides en espacio PCA
+ perfiles de cluster desestandarizados. Es la pieza que garantiza la alineacion
points<->labels que pca_explained y kmeans_segments no cubren (estandarizan por
separado y kmeans descarta los labels). Habilita el scatter PCA coloreado por
cluster (MUST-8.1).

describe_clusters_llm (impura): micro-analisis LLM de los clusters en una sola
llamada a ask_llm (grupo claude-direct), devuelve titulo + descripcion por cluster
con degradacion dict-no-throw a titulos genericos si el LLM no responde (MUST-8.2).

Ambas re-exportadas en datascience/__init__.py. Tests: 6/6 y 9/9 (sin red).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 14:57:27 +02:00
17 changed files with 1688 additions and 1493 deletions
+4
View File
@@ -42,6 +42,8 @@ from .isolation_forest_outliers import isolation_forest_outliers
from .normality_tests import normality_tests
from .trend_slope import trend_slope
from .run_eda_models import run_eda_models
from .project_clusters_2d import project_clusters_2d
from .describe_clusters_llm import describe_clusters_llm
from .eda_llm_insights import eda_llm_insights
from .build_eda_notebook import build_eda_notebook
from .decode_qr_image import decode_qr_image
@@ -86,6 +88,8 @@ __all__ = [
"normality_tests",
"trend_slope",
"run_eda_models",
"project_clusters_2d",
"describe_clusters_llm",
"eda_llm_insights",
"build_eda_notebook",
"describe_numeric",
@@ -1,402 +0,0 @@
"""Categorical distributions chapter (CAT DISTR).
Third reference chapter for AutomaticEDA. For every categorical column it shows,
fulfilling the user's request:
1. A short opening explanation of **Shannon entropy** (what it measures, its 0
and log2(k) bounds, the normalized 01 version) and the dataset row total used
as a comparison baseline.
2. Per column, a cardinality key/value table: distinct values, ``% distinct``
(distinct / total rows), total dataset rows, singleton values (frequency 1),
entropy with its theoretical maximum and the normalized ratio, mode, imbalance
and string-length stats.
3. A short note flagging problematic cardinality (id-like ≈100% distinct, or a
single dominating category).
4. A ``top-k`` table (value / count / %).
5. A **donut pie chart** of the most common categories (top-k + an "Otros"
bucket), drawn lazily so the renderers scale it to fit entirely.
Data comes from the ``eda`` group: each ``columns[i]['categorical']`` is the
output of ``summarize_categorical`` (``top[{value,count,pct}]``, ``mode``,
``n_distinct``, ``entropy``, ``imbalance``, ``len_min/mean/max``). The derived
cardinality metrics and the pie figure are delegated to two registry functions
(``categorical_cardinality_block`` and ``categorical_top_pie_figure``); both are
imported lazily and degrade to a minimal inline fallback so this chapter never
raises even if they are unavailable.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
import math
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "cat_distr"
CHAPTER_TITLE = "Distribuciones categóricas"
# Cap the number of categorical columns rendered to keep the document bounded;
# the rest are summarized in a closing note (no silent truncation).
MAX_COLS = 40
# Rows shown in each top-k table and explicit slices in the pie.
TOP_TABLE_ROWS = 15
PIE_TOP_K = 6
# Truncate very long category labels in tables (the renderer also wraps).
LABEL_MAX = 48
def _fmt_int(value) -> str:
if value is None:
return ""
try:
return f"{int(value):,}".replace(",", ".")
except (TypeError, ValueError):
return str(value)
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return str(value)
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return str(value)
def _fmt_pct_value(value, decimals: int = 1) -> str:
"""Format an already-in-percent value (0100). None -> placeholder."""
if value is None:
return ""
try:
return f"{float(value):.{decimals}f}%"
except (TypeError, ValueError):
return str(value)
def _pct_from_maybe_fraction(value, decimals: int = 1) -> str:
"""Format a percentage that may arrive as a 01 fraction or a 0100 number."""
if value is None:
return ""
try:
v = float(value)
except (TypeError, ValueError):
return str(value)
if v <= 1.0:
v *= 100.0
return f"{v:.{decimals}f}%"
def _truncate(text: str, limit: int = LABEL_MAX) -> str:
s = model._safe_str(text)
if len(s) <= limit:
return s
return s[: max(1, limit - 1)].rstrip() + ""
def _is_categorical(col: dict) -> bool:
"""A column is treated as categorical when it carries a non-empty top list
and is not a pure numeric column (numeric columns may still expose a top)."""
if not isinstance(col, dict):
return False
cat = col.get("categorical")
if not (isinstance(cat, dict) and cat.get("top")):
return False
if col.get("inferred_type") == "numeric":
return False
return True
def _cardinality(cat: dict, n_rows) -> dict:
"""Derive cardinality metrics for a column, via the registry function when
available, otherwise a minimal inline fallback. Never raises."""
try:
from datascience.categorical_cardinality_block import (
categorical_cardinality_block,
)
out = categorical_cardinality_block(cat=cat, n_rows=n_rows)
if isinstance(out, dict):
return out
except Exception: # noqa: BLE001 — fall back to the inline derivation.
pass
return _fallback_cardinality(cat, n_rows)
def _fallback_cardinality(cat: dict, n_rows) -> dict:
cat = cat or {}
top = cat.get("top") or []
n_distinct = cat.get("n_distinct")
entropy = cat.get("entropy")
try:
nr = int(n_rows) if n_rows is not None else None
except (TypeError, ValueError):
nr = None
pct_distinct = None
if isinstance(n_distinct, (int, float)) and nr:
pct_distinct = float(n_distinct) / nr * 100.0
entropy_max = None
if isinstance(n_distinct, (int, float)):
entropy_max = math.log2(n_distinct) if n_distinct > 1 else 0.0
entropy_norm = None
if isinstance(entropy, (int, float)) and entropy_max:
entropy_norm = max(0.0, min(1.0, float(entropy) / entropy_max))
mode_pct = cat.get("mode_pct")
if mode_pct is None and top and isinstance(top[0], dict):
mode_pct = top[0].get("pct")
# Normalize to a 0100 scale: summarize_categorical emits a 01 fraction.
if isinstance(mode_pct, (int, float)) and not isinstance(mode_pct, bool):
mode_pct = float(mode_pct) * 100.0 if mode_pct <= 1.0 else float(mode_pct)
else:
mode_pct = None
n_singletons = None
if top:
n_singletons = sum(
1 for t in top if isinstance(t, dict) and t.get("count") == 1)
return {
"n_distinct": n_distinct,
"n_rows": nr,
"pct_distinct": pct_distinct,
"entropy": entropy,
"entropy_max": entropy_max,
"entropy_norm": entropy_norm,
"mode": cat.get("mode"),
"mode_pct": mode_pct,
"imbalance": cat.get("imbalance"),
"n_singletons": n_singletons,
"n_singletons_partial": (
isinstance(n_distinct, (int, float)) and n_distinct > len(top)),
"len_min": cat.get("len_min"),
"len_mean": cat.get("len_mean"),
"len_max": cat.get("len_max"),
"id_like": pct_distinct is not None and pct_distinct >= 99.0,
"dominated": mode_pct is not None and mode_pct >= 90.0,
}
def _pie_make(top, n_distinct, title, n_rows):
"""Return a zero-arg callable that builds the donut figure lazily."""
def make():
try:
from datascience.categorical_top_pie_figure import (
categorical_top_pie_figure,
)
return categorical_top_pie_figure(
top=top, n_distinct=n_distinct or 0, title=title,
top_k=PIE_TOP_K, n_rows=n_rows)
except Exception: # noqa: BLE001 — minimal local fallback figure.
return _fallback_pie(top, title)
return make
def _fallback_pie(top, title):
"""Minimal donut figure used only if the registry function is unavailable."""
import matplotlib
matplotlib.use("Agg")
from matplotlib.figure import Figure
fig = Figure(figsize=(5.0, 3.2))
ax = fig.add_subplot(111)
items = [t for t in (top or [])
if isinstance(t, dict) and isinstance(t.get("count"), (int, float))]
items = sorted(items, key=lambda t: t.get("count") or 0, reverse=True)
head = items[:PIE_TOP_K]
rest = items[PIE_TOP_K:]
labels = [_truncate(t.get("value"), 20) for t in head]
sizes = [float(t.get("count") or 0) for t in head]
if rest:
labels.append(f"Otros ({len(rest)})")
sizes.append(sum(float(t.get("count") or 0) for t in rest))
if not sizes or sum(sizes) <= 0:
ax.text(0.5, 0.5, "sin datos categóricos", ha="center", va="center")
ax.axis("off")
return fig
ax.pie(sizes, labels=None, wedgeprops={"width": 0.42},
autopct=lambda p: f"{p:.0f}%" if p >= 4 else "")
ax.legend(labels, loc="center left", bbox_to_anchor=(1.0, 0.5),
fontsize=7, frameon=False)
ax.set_title(_truncate(title, 40))
fig.tight_layout()
return fig
def _normalize_card(card: dict) -> dict:
"""Make the cardinality dict robust regardless of the upstream scale.
``summarize_categorical`` emits ``mode_pct`` as a 01 fraction; bring it to a
0100 scale and recompute the ``dominated`` flag here so the chapter is
correct whether it consumed the registry function or the inline fallback.
"""
card = dict(card or {})
mp = card.get("mode_pct")
if isinstance(mp, (int, float)) and not isinstance(mp, bool):
mp = float(mp) * 100.0 if mp <= 1.0 else float(mp)
else:
mp = None
card["mode_pct"] = mp
card["dominated"] = mp is not None and mp >= 90.0
pd = card.get("pct_distinct")
card["id_like"] = isinstance(pd, (int, float)) and pd >= 99.0
return card
def _cardinality_block(card: dict):
"""KVTable with the cardinality / entropy metrics for one column."""
n_singletons = card.get("n_singletons")
if n_singletons is not None and card.get("n_singletons_partial"):
singletons = f"{_fmt_int(n_singletons)} (en top mostrado)"
elif n_singletons is not None:
singletons = _fmt_int(n_singletons)
else:
singletons = ""
entropy_ref = _fmt_num(card.get("entropy"))
emax = card.get("entropy_max")
if emax is not None:
entropy_ref = f"{entropy_ref} (máx {_fmt_num(emax)})"
mode = card.get("mode")
mode_pct = card.get("mode_pct")
mode_str = "" if mode is None else model._safe_str(mode)
if mode is not None and mode_pct is not None:
mode_str = f"{mode_str} ({_fmt_pct_value(mode_pct)})"
rows = [
("Valores distintos", _fmt_int(card.get("n_distinct"))),
("% distintos", _fmt_pct_value(card.get("pct_distinct"))),
("Total filas (dataset)", _fmt_int(card.get("n_rows"))),
("Valores únicos (frecuencia 1)", singletons),
("Entropía (bits)", entropy_ref),
("Entropía normalizada (01)", _fmt_num(card.get("entropy_norm"))),
("Moda", mode_str),
]
imbalance = card.get("imbalance")
if imbalance is not None:
rows.append(("Desbalance", _fmt_num(imbalance)))
lm = card.get("len_min")
lmean = card.get("len_mean")
lmax = card.get("len_max")
if any(v is not None for v in (lm, lmean, lmax)):
rows.append((
"Longitud (mín/media/máx)",
f"{_fmt_num(lm)} / {_fmt_num(lmean)} / {_fmt_num(lmax)}"))
return model.KVTable(rows=rows, title="Cardinalidad")
def _flag_note(card: dict):
"""Return a Note flagging problematic cardinality, or None."""
if card.get("id_like"):
return model.Note(
"Casi todos los valores son distintos (≈100% distintos): la columna "
"se comporta como un identificador y aporta poco para agrupar o "
"comparar categorías.")
if card.get("dominated"):
mp = card.get("mode_pct")
mp_str = _fmt_pct_value(mp) if mp is not None else "muy alta"
return model.Note(
f"Una sola categoría domina la columna (moda {mp_str}): la "
"distribución está muy desbalanceada.")
return None
def _topk_table(cat: dict):
"""DataTable value / count / % for the top categories."""
top = cat.get("top") or []
n_distinct = cat.get("n_distinct")
header = ["Valor", "Conteo", "%"]
rows = []
for t in top[:TOP_TABLE_ROWS]:
if not isinstance(t, dict):
continue
rows.append([
model._safe_str(t.get("value")),
_fmt_int(t.get("count")),
_pct_from_maybe_fraction(t.get("pct")),
])
if not rows:
return None
shown = len(rows)
if isinstance(n_distinct, (int, float)) and n_distinct > shown:
note = f"top {shown} de {_fmt_int(n_distinct)} categorías distintas"
else:
note = f"{shown} categorías"
return model.DataTable(header=header, rows=rows, title="Top categorías",
note=note)
def _intro_blocks(n_rows):
total = _fmt_int(n_rows)
text = (
"La **entropía de Shannon** mide cómo de repartidos están los valores de "
"una columna categórica, en bits. Vale 0 cuando una sola categoría "
"concentra todas las filas (máxima previsibilidad) y alcanza su máximo, "
"log2(k) para k categorías distintas, cuando todas aparecen por igual "
"(máxima diversidad). La **entropía normalizada** (entropía dividida por "
"su máximo) la lleva al rango 01 para comparar columnas con distinto "
"número de categorías. Para cada columna se muestran los valores "
"distintos, el porcentaje que representan sobre el total de filas, los "
"valores únicos (que aparecen una sola vez), la tabla de las categorías "
"más frecuentes y un gráfico de tarta (donut) de las más comunes."
)
if n_rows is not None:
text += f" El dataset tiene {total} filas en total como referencia."
return [
model.Heading(text="Entropía y cardinalidad", level=2),
model.Markdown(text=text),
]
def build_cat_distr(profile: dict, ctx: dict):
"""Build the categorical-distributions Chapter, or None if the dataset has
no categorical columns."""
profile = profile or {}
ctx = ctx or {}
cols = profile.get("columns") or []
cat_cols = [c for c in cols if _is_categorical(c)]
if not cat_cols:
return None
n_rows = profile.get("n_rows")
blocks = list(_intro_blocks(n_rows))
rendered = cat_cols[:MAX_COLS]
for col in rendered:
name = col.get("name") or "(columna)"
cat = col.get("categorical") or {}
card = _normalize_card(_cardinality(cat, n_rows))
blocks.append(model.Heading(text=str(name), level=2))
blocks.append(_cardinality_block(card))
note = _flag_note(card)
if note is not None:
blocks.append(note)
topk = _topk_table(cat)
if topk is not None:
blocks.append(topk)
blocks.append(model.Figure(
make=_pie_make(cat.get("top") or [], card.get("n_distinct"),
str(name), n_rows),
caption=(f"Categorías más comunes de «{_truncate(name, 32)}» "
"(donut: top-k + «Otros»)")))
if len(cat_cols) > len(rendered):
omitted = len(cat_cols) - len(rendered)
blocks.append(model.Note(
f"Se muestran las primeras {len(rendered)} columnas categóricas; "
f"quedan {omitted} sin mostrar para mantener acotado el informe."))
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -1,186 +0,0 @@
"""Tests for the CAT DISTR chapter — DoD: golden + edges + anti-cut.
Self-contained: builds synthetic TableProfiles (no DuckDB) so the suite is fast
and deterministic. Verifies that ``build_cat_distr`` emits the blocks the user
asked for (entropy intro, distinct/total/%-distinct/unique metrics, top-k table
and a donut figure), that the chapter renders inside the full document to both
PDF and PPTX showing that content, that a profile with no categorical columns
yields ``None`` without raising, and that long labels / many columns are never
cut in either output.
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.model import (
DataTable, Figure, Heading, KVTable, Note,
)
from datascience.automatic_eda.chapters.cat_distr import (
CHAPTER_ID, CHAPTER_VERSION, build_cat_distr,
)
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
def _profile() -> dict:
return {
"table": "productos",
"source": "/data/productos.csv",
"profiled_at": "2026-06-30T10:00:00+00:00",
"n_rows": 1000,
"n_cols": 3,
"quality_score": 90.0,
"columns": [
{"name": "precio", "inferred_type": "numeric", "null_pct": 0.0,
"null_count": 0,
"numeric": {"mean": 42.5, "median": 40.0, "min": 1.0,
"max": 100.0, "std": 12.3}},
{"name": "categoria", "inferred_type": "categorical",
"null_pct": 0.0, "null_count": 0, "distinct_count": 8,
"categorical": {
"top": [
{"value": "neumaticos", "count": 500, "pct": 0.5},
{"value": "aceite", "count": 300, "pct": 0.3},
{"value": "filtros", "count": 120, "pct": 0.12},
{"value": "frenos", "count": 80, "pct": 0.08},
],
"mode": "neumaticos", "n_distinct": 8, "entropy": 1.6,
"imbalance": 6.25, "len_min": 6, "len_mean": 7.5,
"len_max": 10}},
{"name": "uuid", "inferred_type": "categorical",
"null_pct": 0.0, "null_count": 0, "distinct_count": 1000,
"categorical": {
"top": [{"value": f"id-{i}", "count": 1} for i in range(5)],
"mode": "id-0", "n_distinct": 1000, "entropy": 9.97,
"imbalance": 1.0}},
],
}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
def _pptx_text(path: str) -> str:
prs = Presentation(path)
parts = []
for sl in prs.slides:
for sh in sl.shapes:
if sh.has_text_frame:
parts.append(sh.text_frame.text)
if sh.has_table:
tb = sh.table
for r in range(len(tb.rows)):
for c in range(len(tb.columns)):
parts.append(tb.cell(r, c).text)
return re.sub(r"\s+", " ", " ".join(parts))
def _kinds(chapter):
return [b.kind for b in chapter.blocks]
def test_golden_build_cat_distr_emite_bloques_pedidos():
ch = build_cat_distr(_profile(), {})
assert ch is not None
assert ch.id == CHAPTER_ID
assert ch.version == CHAPTER_VERSION
kinds = _kinds(ch)
# Entropy intro present.
headings = [b.text for b in ch.blocks if isinstance(b, Heading)]
assert any("Entrop" in h for h in headings)
md = next(b for b in ch.blocks if b.kind == "markdown")
assert "entropía" in md.text.lower() and "log2" in md.text
# Cardinality metrics: distinct, total rows, %-distinct, unique values.
kv = next(b for b in ch.blocks if isinstance(b, KVTable))
labels = [r[0] for r in kv.rows]
assert "Valores distintos" in labels
assert "% distintos" in labels
assert "Total filas (dataset)" in labels
assert "Valores únicos (frecuencia 1)" in labels
assert any("Entropía" in lbl for lbl in labels)
# Top-k table + pie figure.
dt = next(b for b in ch.blocks if isinstance(b, DataTable))
assert dt.header == ["Valor", "Conteo", "%"]
assert any("neumaticos" in str(cell) for row in dt.rows for cell in row)
assert any(isinstance(b, Figure) for b in ch.blocks)
# id-like column flagged with a Note.
assert any(isinstance(b, Note) and "identificador" in b.text
for b in ch.blocks)
def test_golden_render_pdf_muestra_categoricas():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "eda.pdf")
res = render_automatic_eda_pdf(_profile(), out, {"title": "EDA"})
assert res["path"] == out and os.path.exists(out)
assert CHAPTER_ID in [c["id"] for c in res["chapters"]]
txt = _pdf_text(out)
assert "Entrop" in txt
assert "distintos" in txt
assert "categoria" in txt and "neumaticos" in txt
assert "donut" in txt # figure caption rendered as text.
assert "identificador" in txt # id-like note rendered.
def test_golden_render_pptx_muestra_categoricas():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "eda.pptx")
res = render_automatic_eda_pptx(_profile(), out, {"title": "EDA"})
assert res["path"] == out and os.path.exists(out)
assert CHAPTER_ID in [c["id"] for c in res["chapters"]]
txt = _pptx_text(out)
assert "Entrop" in txt
assert "categoria" in txt and "neumaticos" in txt
assert "distintos" in txt
def test_edge_sin_categoricas_devuelve_none():
only_numeric = {
"n_rows": 10, "columns": [
{"name": "x", "inferred_type": "numeric",
"numeric": {"mean": 1.0}}]}
assert build_cat_distr(only_numeric, {}) is None
# None / empty / no-columns never raise and yield None.
assert build_cat_distr(None, None) is None
assert build_cat_distr({}, {}) is None
assert build_cat_distr({"columns": []}, {}) is None
def test_anti_corte_label_largo_y_muchas_columnas():
long_label = ("Lorem ipsum dolor sit amet consectetur adipiscing elit sed "
"do eiusmod tempor incididunt ut labore reprehenderit voluptate")
cols = []
for i in range(30):
cols.append({
"name": f"cat_{i}", "inferred_type": "categorical",
"distinct_count": 3,
"categorical": {
"top": [{"value": long_label, "count": 60},
{"value": "b", "count": 30},
{"value": "c", "count": 10}],
"mode": long_label, "n_distinct": 3, "entropy": 1.2}})
profile = {"table": "t", "source": "t.csv", "n_rows": 100,
"n_cols": len(cols), "columns": cols}
ch = build_cat_distr(profile, {})
assert ch is not None
with tempfile.TemporaryDirectory() as d:
pdf = os.path.join(d, "anti.pdf")
res = render_automatic_eda_pdf(profile, pdf, {"write_manifest": False})
assert res["path"] == pdf
assert res["n_pages"] > 1 # many columns spilled across pages, OK.
txt = _pdf_text(pdf)
# Long label wrapped (not truncated): every word survives.
for word in ("Lorem", "incididunt", "reprehenderit", "voluptate"):
assert word in txt
# PPTX path must not raise either.
pptx = os.path.join(d, "anti.pptx")
res2 = render_automatic_eda_pptx(profile, pptx,
{"write_manifest": False})
assert res2["path"] == pptx and os.path.exists(pptx)
@@ -0,0 +1,498 @@
"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown.
Builds the *Modelos* chapter of an AutomaticEDA document from the ``models``
block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers,
normality}``). It renders, as structured markdown/tables/figures that the core
paginator never cuts:
1. **Normalization note** — every multivariate model below standardizes the
columns with z-score first; the chapter explains why (different scales would
otherwise dominate distance/variance).
2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus
variance and top-loadings tables.
3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own
page/slide), the cluster-size table, and a per-cluster LLM micro-analysis
with a title for each segment.
4. **Isolation Forest outliers** — a short explanation of how anomalous rows are
isolated multivariately and how the threshold is chosen, plus the counts.
5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts.
The raw numeric data needed to colour the cluster scatter is **not** in the
TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` —
this chapter looks for the cluster projection / raw numeric columns in ``ctx``
(or in ``profile``) and degrades honestly when they are absent: it falls back to
the uncoloured ``pca.projection`` with a note, or omits the scatter entirely.
ctx keys this chapter consumes (all optional):
cluster_projection : dict — a pre-computed ``project_clusters_2d`` result
(``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used
directly when present (forward-compatible with the calculation phase).
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
and ``cluster_projection`` is not, the chapter calls
``project_clusters_2d`` live to build points + aligned labels.
cluster_titles : list — pre-computed ``[{cluster, title, description}]``
(a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster
micro-analysis without an LLM call (offline/tests).
run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call
``describe_clusters_llm`` live on the cluster profiles.
cluster_llm_model : str — model id for the live LLM call.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "modelos"
CHAPTER_TITLE = "Modelos"
# Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib
# scatter and to keep the legend/colours stable per cluster index.
_CLUSTER_COLORS = [
"#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f",
"#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac",
]
# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the overview chapter's defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _fmt_pct_ratio(value, decimals: int = 1) -> str:
"""Format a 0..1 ratio as a percentage."""
if value is None:
return ""
try:
return f"{float(value) * 100:.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _fmt_pct_already(value, decimals: int = 2) -> str:
"""Format a value that is *already* a 0..100 percentage."""
if value is None:
return ""
try:
return f"{float(value):.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _is_dict(v) -> bool:
return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Cluster projection: prefer a pre-computed result, else compute it live, else
# fall back to the uncoloured PCA projection.
# --------------------------------------------------------------------------- #
def _resolve_cluster_projection(profile: dict, ctx: dict):
"""Return (projection_dict_or_None, source_label).
Order: ctx/profile['cluster_projection'] (pre-computed) → live
project_clusters_2d on ctx/profile['raw_numeric'] → None.
"""
pre = ctx.get("cluster_projection") or profile.get("cluster_projection")
models = profile.get("models") if _is_dict(profile.get("models")) else {}
if not pre and _is_dict(models):
pre = models.get("cluster_projection")
if _is_dict(pre) and pre.get("points"):
return pre, "precomputed"
raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
if _is_dict(raw) and raw:
try:
# Import the submodule's function explicitly (avoid the package
# attribute shadowing the function with the same-named module).
from datascience.project_clusters_2d import project_clusters_2d
proj = project_clusters_2d(raw)
if _is_dict(proj) and proj.get("points"):
return proj, "live"
except Exception: # noqa: BLE001 — never break the chapter.
return None, "none"
return None, "none"
def _cluster_titles(profile: dict, ctx: dict, projection: dict):
"""Return a list of {cluster, title, description} for the segments.
Order: ctx['cluster_titles'] (pre-computed) → live describe_clusters_llm when
ctx['run_cluster_llm'] and we have cluster_profiles → derived titles from the
distinctive features → None.
"""
pre = ctx.get("cluster_titles")
if isinstance(pre, list) and pre:
return [c for c in pre if _is_dict(c)]
profiles = (projection or {}).get("cluster_profiles") or []
feats = (projection or {}).get("feature_names") or []
if ctx.get("run_cluster_llm") and profiles:
try:
from datascience.describe_clusters_llm import describe_clusters_llm
out = describe_clusters_llm(
profiles, feats,
model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001"))
clusters = (out or {}).get("clusters")
if isinstance(clusters, list) and clusters:
return [c for c in clusters if _is_dict(c)]
except Exception: # noqa: BLE001
pass
# Derived fallback: name each cluster by its distinctive features.
if profiles:
derived = []
for p in profiles:
if not _is_dict(p):
continue
cid = p.get("cluster", len(derived))
dist = p.get("distinctive") or []
label = ", ".join(model._safe_str(d) for d in dist[:2]) if dist else ""
title = f"Segmento {cid}" + (f"{label}" if label else "")
derived.append({"cluster": cid, "title": title, "description": ""})
if derived:
return derived
return None
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _make_scree(pca: dict):
"""Return a zero-arg callable drawing the PCA scree plot, or None."""
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
if not evr:
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
comps = list(range(1, len(evr) + 1))
fig, ax = plt.subplots(figsize=(7.0, 4.2))
ax.bar(comps, evr, color="#4e79a7", alpha=0.85,
label="Varianza explicada")
if cum:
ax.plot(comps[:len(cum)], cum, color="#e15759", marker="o",
linewidth=1.8, label="Acumulada")
ax.set_xlabel("Componente principal")
ax.set_ylabel("Proporción de varianza")
ax.set_xticks(comps)
ax.set_ylim(0, 1.0)
ax.grid(axis="y", color="#dddddd", linewidth=0.6)
ax.legend(loc="best", fontsize=8, frameon=False)
ax.set_title("Varianza explicada por componente (PCA)", fontsize=10)
fig.tight_layout()
return fig
return _draw
def _make_cluster_scatter(projection: dict):
"""Return a zero-arg callable drawing the cluster scatter, or None."""
points = projection.get("points") or []
labels = projection.get("labels") or []
if not points or len(points) != len(labels):
return None
centers = projection.get("centers_2d") or []
explained = projection.get("explained_2d") or []
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(7.0, 5.2))
uniq = sorted(set(int(l) for l in labels))
for cl in uniq:
xs = [p[0] for p, l in zip(points, labels) if int(l) == cl]
ys = [p[1] for p, l in zip(points, labels) if int(l) == cl]
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0,
label=f"Cluster {cl} (n={len(xs)})")
for cl, c in enumerate(centers):
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X",
edgecolors="black", linewidths=1.2, zorder=5)
xlab, ylab = "PC1", "PC2"
if len(explained) >= 2:
xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)"
ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)"
ax.set_xlabel(xlab)
ax.set_ylabel(ylab)
ax.set_title("Segmentos KMeans proyectados sobre el plano PCA",
fontsize=10)
ax.grid(color="#eeeeee", linewidth=0.5)
ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Section builders. Each returns a list of blocks (possibly empty).
# --------------------------------------------------------------------------- #
def _normalization_intro() -> list:
text = (
"Estos modelos son **no supervisados**: buscan estructura latente sin "
"una variable objetivo. Antes de aplicarlos, todas las columnas "
"numéricas se **estandarizan con z-score** (cada valor menos la media, "
"dividido por la desviación típica). Sin esta normalización, una "
"variable con escala grande (p.ej. ingresos en euros) dominaría las "
"distancias y la varianza frente a otra de escala pequeña (p.ej. un "
"ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la "
"estandarización todas las variables pesan por igual."
)
return [model.Heading(text="Modelos no supervisados", level=1),
model.Markdown(text=text)]
def _pca_section(pca: dict) -> list:
if not _is_dict(pca) or not pca.get("explained_variance_ratio"):
return []
blocks = [model.Heading(text="PCA — varianza explicada", level=2)]
n_used = pca.get("n_rows_used")
n_feat = pca.get("n_features")
intro = (
f"El PCA resume {_fmt_num(n_feat)} variables numéricas en componentes "
f"ortogonales ordenados por la varianza que capturan "
f"({_fmt_num(n_used)} filas usadas tras eliminar nulos). El gráfico de "
"sedimentación (scree) muestra cuánta varianza aporta cada componente y "
"su acumulado: un codo marca cuántos componentes bastan."
)
blocks.append(model.Markdown(text=intro))
scree = _make_scree(pca)
if scree is not None:
blocks.append(model.Figure(
make=scree, caption="Varianza explicada y acumulada por componente."))
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
rows = []
for i, v in enumerate(evr):
acc = cum[i] if i < len(cum) else None
rows.append([f"PC{i + 1}", _fmt_pct_ratio(v), _fmt_pct_ratio(acc)])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Varianza", "Acumulada"], rows=rows,
title="Varianza por componente"))
# Top loadings: keep the strongest features per component (capped).
loadings = pca.get("top_loadings") or []
if loadings:
per_comp: dict = {}
for ld in loadings:
if not _is_dict(ld):
continue
comp = ld.get("component")
per_comp.setdefault(comp, [])
if len(per_comp[comp]) < 4:
per_comp[comp].append(ld)
rows = []
for comp in sorted(per_comp.keys(), key=lambda x: (x is None, x)):
for ld in per_comp[comp]:
rows.append([f"PC{int(comp) + 1}" if comp is not None else "",
model._safe_str(ld.get("feature")),
_fmt_num(ld.get("loading"))])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Variable", "Carga"], rows=rows,
title="Cargas principales (top por componente)",
note="Cargas con mayor valor absoluto: qué variables definen "
"cada eje."))
return blocks
def _kmeans_section(kmeans: dict, projection: dict, titles) -> list:
has_km = _is_dict(kmeans) and kmeans.get("best_k")
has_proj = _is_dict(projection) and projection.get("points")
if not has_km and not has_proj:
return []
blocks = [model.Heading(text="Segmentación (KMeans)", level=2)]
best_k = (projection or {}).get("best_k") or (kmeans or {}).get("best_k")
sil = (projection or {}).get("silhouette")
if sil is None:
sil = (kmeans or {}).get("silhouette")
intro = (
f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos "
"automáticamente maximizando el coeficiente de *silhouette* "
f"(**{_fmt_num(sil)}**, rango 1 a 1: cuanto más alto, segmentos más "
"compactos y separados). Los segmentos se proyectan sobre el plano de "
"los dos primeros componentes principales para visualizarlos."
)
blocks.append(model.Markdown(text=intro))
if has_proj:
scatter = _make_cluster_scatter(projection)
if scatter is not None:
blocks.append(model.Figure(
make=scatter,
caption="Cada punto es una fila coloreada por su segmento "
"KMeans; las «X» son los centroides."))
else:
blocks.append(model.Note(
"Proyección de clusters no dibujable (puntos y etiquetas "
"desalineados)."))
else:
# We have kmeans stats but no aligned points+labels to colour by.
blocks.append(model.Note(
"Scatter coloreado por segmento no disponible: el perfil no incluye "
"la proyección con etiquetas alineadas (pásala en "
"ctx['cluster_projection'] o las columnas crudas en "
"ctx['raw_numeric'] para colorear el plano PCA)."))
# Cluster sizes table.
sizes = (projection or {}).get("cluster_sizes") or (kmeans or {}).get("cluster_sizes") or []
total = sum(s for s in sizes if isinstance(s, (int, float))) or 0
if sizes:
rows = []
for i, s in enumerate(sizes):
pct = (s / total) if total else None
rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(pct)])
blocks.append(model.DataTable(
header=["Segmento", "Tamaño", "% del total"], rows=rows,
title="Tamaño de cada segmento"))
# Per-cluster LLM micro-analysis (each entry kept indivisible as one block).
if titles:
blocks.append(model.Heading(text="Interpretación de los segmentos",
level=3))
for t in titles:
if not _is_dict(t):
continue
cid = t.get("cluster")
title = model._safe_str(t.get("title")) or f"Cluster {cid}"
desc = model._safe_str(t.get("description"))
line = f"**Cluster {cid}{title}.**"
if desc:
line += " " + desc
blocks.append(model.Markdown(text=line))
return blocks
def _outliers_section(outliers: dict) -> list:
if not _is_dict(outliers) or outliers.get("n_outliers") is None:
return []
if outliers.get("note") and not outliers.get("n_rows_used"):
# insufficient data — nothing meaningful to show.
return []
blocks = [model.Heading(text="Detección de anomalías (Isolation Forest)",
level=2)]
explain = (
"**Isolation Forest** detecta filas anómalas de forma *multivariante*: "
"construye árboles que parten el espacio con cortes aleatorios y mide "
"cuántos cortes hacen falta para aislar cada fila. Las filas raras "
"(combinaciones de valores poco frecuentes considerando **todas las "
"columnas a la vez**, no una sola) se aíslan con muy pocos cortes y "
"obtienen un score bajo. El **umbral** de decisión separa las filas "
"normales de las anómalas según la contaminación esperada del modelo: "
"una fila es outlier cuando su score queda por debajo de ese umbral."
)
blocks.append(model.Markdown(text=explain))
blocks.append(model.KVTable(rows=[
("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))),
("Outliers detectados", _fmt_num(outliers.get("n_outliers"))),
("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))),
("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)),
], title="Anomalías multivariantes"))
return blocks
def _normality_section(normality: dict) -> list:
if not _is_dict(normality) or not normality:
return []
header = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)",
"¿Normal?"]
rows = []
for col, res in normality.items():
if not _is_dict(res):
continue
jb = res.get("jarque_bera") if _is_dict(res.get("jarque_bera")) else {}
da = res.get("dagostino") if _is_dict(res.get("dagostino")) else {}
sh = res.get("shapiro") if _is_dict(res.get("shapiro")) else {}
is_norm = res.get("is_normal")
if res.get("note") and is_norm is None and not jb:
rows.append([model._safe_str(col), "", "", "",
model._safe_str(res.get("note"))])
continue
rows.append([
model._safe_str(col),
_fmt_num(jb.get("p"), 4) if jb else "",
_fmt_num(da.get("p"), 4) if da else "",
_fmt_num(sh.get("p"), 4) if sh else "",
"" if is_norm else ("no" if is_norm is not None else ""),
])
if not rows:
return []
return [
model.Heading(text="Normalidad de las variables", level=2),
model.Markdown(text=(
"Tests de hipótesis de normalidad por columna (hipótesis nula: la "
"muestra proviene de una distribución normal). Se marca **normal** "
"cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas "
"variables reales son estrictamente normales; esto orienta qué "
"transformaciones o tests robustos aplicar después.")),
model.DataTable(header=header, rows=rows,
title="Pruebas de normalidad"),
]
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_modelos(profile: dict, ctx: dict):
"""Build the MODELOS Chapter, or None if there are no models to show."""
profile = profile or {}
ctx = ctx or {}
if not isinstance(profile, dict):
return None
models = profile.get("models")
if not _is_dict(models):
return None
pca = models.get("pca") if _is_dict(models.get("pca")) else None
kmeans = models.get("kmeans") if _is_dict(models.get("kmeans")) else None
outliers = models.get("outliers") if _is_dict(models.get("outliers")) else None
normality = models.get("normality") if _is_dict(models.get("normality")) else None
projection, _src = _resolve_cluster_projection(profile, ctx)
titles = _cluster_titles(profile, ctx, projection) if (
(kmeans and kmeans.get("best_k")) or (projection and projection.get("points"))
) else None
sections = []
sections += _pca_section(pca) if pca else []
sections += _kmeans_section(kmeans, projection, titles)
sections += _outliers_section(outliers) if outliers else []
sections += _normality_section(normality) if normality else []
if not sections:
return None # models block present but nothing renderable.
blocks = _normalization_intro() + sections
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,259 @@
"""Tests for the MODELOS chapter — DoD: golden + edges + anti-cut.
Self-contained: builds a synthetic TableProfile with a ``models`` block (no
DuckDB, no sklearn, no LLM, no network). The cluster scatter is fed a synthetic
pre-computed ``cluster_projection`` via ``ctx`` and the per-cluster titles via
``ctx['cluster_titles']`` so the suite is fast and deterministic. The live paths
(``project_clusters_2d`` / ``describe_clusters_llm``) are exercised against the
real wine dataset in the work report, not here.
Verifies: the chapter renders to PDF *and* PPTX showing the user-required pieces
(markdown text, PCA scree, cluster scatter, per-cluster LLM micro-analysis,
outlier + normalization explanations); that an inapplicable profile yields None
without raising; and that a long normality table is split without losing any
column (anti-cut).
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.chapters.modelos import build_modelos
from datascience.automatic_eda.model import Figure, DataTable, Markdown
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
# --------------------------------------------------------------------------- #
# Synthetic fixtures.
# --------------------------------------------------------------------------- #
def _models_block(n_norm_cols: int = 4) -> dict:
feats = ["fixed_acidity", "alcohol", "ph", "sulphates"]
normality = {}
for i in range(n_norm_cols):
normality[f"col_{i}"] = {
"n": 500,
"jarque_bera": {"stat": 12.3, "p": 0.002 + i * 0.0001, "normal": False},
"dagostino": {"stat": 9.1, "p": 0.01, "normal": False},
"shapiro": {"stat": 0.98, "p": 0.04, "normal": False},
"is_normal": False,
}
return {
"n_numeric_cols": 4,
"pca": {
"n_components": 2, "n_rows_used": 1599, "n_features": 4,
"explained_variance_ratio": [0.41, 0.22],
"cumulative": [0.41, 0.63],
"top_loadings": [
{"component": 0, "feature": "alcohol", "loading": 0.62},
{"component": 0, "feature": "fixed_acidity", "loading": -0.48},
{"component": 1, "feature": "ph", "loading": 0.71},
{"component": 1, "feature": "sulphates", "loading": 0.33},
],
"projection": [[0.1, 0.2], [0.3, -0.1]],
},
"kmeans": {
"best_k": 3, "silhouette": 0.27,
"scores_by_k": [{"k": 2, "silhouette": 0.21}, {"k": 3, "silhouette": 0.27}],
"cluster_sizes": [700, 500, 399],
"centers": [[0.1, 0.2, 0.3, 0.4]],
"n_rows_used": 1599, "n_features": 4,
},
"outliers": {
"n_outliers": 80, "outlier_pct": 5.0, "threshold": -0.0123,
"n_rows_used": 1599,
},
"normality": normality,
"note": "",
"_feats": feats,
}
def _cluster_projection() -> dict:
# 30 points across 3 clusters, aligned points<->labels.
points, labels = [], []
centers = [(-2.0, -2.0), (2.0, 0.0), (0.0, 2.5)]
for cl, (cx, cy) in enumerate(centers):
for j in range(10):
points.append([cx + (j - 5) * 0.05, cy + (j - 5) * 0.05])
labels.append(cl)
return {
"points": points, "labels": labels,
"centers_2d": [list(c) for c in centers],
"best_k": 3, "silhouette": 0.27,
"explained_2d": [0.41, 0.22],
"cluster_sizes": [10, 10, 10],
"cluster_profiles": [
{"cluster": 0, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 9.5, "ph": 3.5},
"distinctive": ["alcohol", "ph"], "centroid_z": {"alcohol": -1.2}},
{"cluster": 1, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 12.0, "ph": 3.1},
"distinctive": ["alcohol"], "centroid_z": {"alcohol": 1.4}},
{"cluster": 2, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 10.5, "ph": 3.8},
"distinctive": ["ph"], "centroid_z": {"ph": 1.6}},
],
"feature_names": ["alcohol", "ph", "fixed_acidity", "sulphates"],
"n_used": 1599, "note": "",
}
def _ctx_full() -> dict:
return {
"cluster_projection": _cluster_projection(),
"cluster_titles": [
{"cluster": 0, "title": "Vinos suaves de baja graduación",
"description": "Alcohol bajo y pH alto; perfil ligero."},
{"cluster": 1, "title": "Vinos potentes",
"description": "Alta graduación alcohólica."},
{"cluster": 2, "title": "Vinos de pH elevado",
"description": "Acidez baja relativa al resto."},
],
}
def _profile() -> dict:
return {"table": "wine", "n_rows": 1599, "n_cols": 12,
"models": _models_block()}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
def _pptx_text(path: str) -> str:
prs = Presentation(path)
out = []
for slide in prs.slides:
for shape in slide.shapes:
if shape.has_text_frame:
out.append(shape.text_frame.text)
return re.sub(r"\s+", " ", " ".join(out))
# --------------------------------------------------------------------------- #
# Golden.
# --------------------------------------------------------------------------- #
def test_golden_build_modelos_bloques_requeridos():
ch = build_modelos(_profile(), _ctx_full())
assert ch is not None
assert ch.id == "modelos" and ch.version
# Both figures present: scree plot + cluster scatter.
n_figures = sum(1 for b in ch.blocks if isinstance(b, Figure))
assert n_figures >= 2
# Tables present (variance, loadings, sizes, normality).
assert sum(1 for b in ch.blocks if isinstance(b, DataTable)) >= 3
# Markdown carries the required explanations.
md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown))
assert "z-score" in md # normalization explained
assert "Isolation Forest" in md # outlier generation explained
assert "silhouette" in md # kmeans
# Per-cluster micro-analysis titles present.
assert "Vinos potentes" in md
assert "Cluster 1" in md
def test_golden_render_pdf_muestra_lo_exigido():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "modelos.pdf")
res = render_automatic_eda_pdf(
_profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()})
assert res["path"] == out and os.path.exists(out)
ids = [c["id"] for c in res["chapters"]]
assert "modelos" in ids
txt = _pdf_text(out)
for needle in ("Modelos no supervisados", "z-score", "PCA",
"Segmentación", "Isolation Forest", "Normalidad",
"Vinos potentes"):
assert needle in txt, f"falta en PDF: {needle}"
def test_golden_render_pptx_muestra_lo_exigido():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "modelos.pptx")
res = render_automatic_eda_pptx(
_profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()})
assert res["path"] == out and os.path.exists(out)
assert res["n_slides"] >= 1
txt = _pptx_text(out)
for needle in ("Modelos no supervisados", "z-score", "Isolation Forest",
"Vinos potentes"):
assert needle in txt, f"falta en PPTX: {needle}"
# --------------------------------------------------------------------------- #
# Edges.
# --------------------------------------------------------------------------- #
def test_edge_profile_none_o_vacio_devuelve_none():
assert build_modelos(None, {}) is None
assert build_modelos({}, {}) is None
assert build_modelos({"n_rows": 5}, None) is None # no 'models' key
def test_edge_models_insuficiente_devuelve_none():
prof = {"table": "tiny", "models": {
"n_numeric_cols": 1,
"pca": {"n_components": 0, "explained_variance_ratio": [],
"note": "datos insuficientes"},
"kmeans": {"best_k": 0, "note": "datos insuficientes"},
"outliers": {"n_outliers": 0, "note": "datos insuficientes"},
"normality": None,
"note": "insuficientes columnas numericas para modelos multivariantes",
}}
assert build_modelos(prof, {}) is None
def test_edge_solo_normalidad_si_genera_capitulo():
# A single numeric column: only normality applies. Chapter must still build.
prof = {"table": "one", "models": {
"n_numeric_cols": 1, "pca": None, "kmeans": None, "outliers": None,
"normality": {"x": {"n": 500, "jarque_bera": {"stat": 1.0, "p": 0.2,
"normal": True}, "dagostino": {"stat": 1.0, "p": 0.3,
"normal": True}, "shapiro": {"stat": 0.99, "p": 0.4,
"normal": True}, "is_normal": True}},
}}
ch = build_modelos(prof, {})
assert ch is not None
md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown))
assert "z-score" in md # normalization intro still present
def test_edge_kmeans_sin_proyeccion_degrada_sin_romper():
# kmeans stats present but no cluster_projection / raw_numeric to colour by.
prof = _profile()
ch = build_modelos(prof, {}) # no ctx projection
assert ch is not None
# No scatter figure for clusters, but a Note explaining the degradation.
notes = [b.text for b in ch.blocks if b.kind == "note"]
assert any("ctx['raw_numeric']" in n or "cluster_projection" in n
for n in notes)
# PDF still renders fine.
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "deg.pdf")
res = render_automatic_eda_pdf(prof, out, {"write_manifest": False})
assert res["path"] == out and os.path.exists(out)
# --------------------------------------------------------------------------- #
# Anti-cut.
# --------------------------------------------------------------------------- #
def test_anticortes_tabla_normalidad_larga_no_corta():
# 40 numeric columns → the normality DataTable must split across pages,
# repeating the header, without losing any column name.
prof = {"table": "wide", "models": _models_block(n_norm_cols=40)}
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "wide.pdf")
render_automatic_eda_pdf(prof, out, {"write_manifest": False,
"ctx": _ctx_full()})
reader = PdfReader(out)
n_pages = len(reader.pages)
assert n_pages > 1
txt = "".join((pg.extract_text() or "") for pg in reader.pages)
# Every column name survives (wrapped/split, never truncated).
for i in (0, 19, 39):
assert f"col_{i}" in txt
@@ -1,115 +0,0 @@
---
id: categorical_cardinality_block_py_datascience
name: categorical_cardinality_block
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def categorical_cardinality_block(cat: dict, n_rows: int) -> dict"
description: "Deriva métricas de cardinalidad listas para renderizar a partir de la salida de summarize_categorical para UNA columna categórica más el número total de filas. Calcula pct_distinct, entropy_max=log2(n_distinct), entropy_norm (recortada a [0,1]), n_singletons (sobre el top visible) y los flags id_like / dominated. NO recalcula la entropía ni reimplementa summarize_categorical: la consume. Estilo dict-no-throw del grupo eda — nunca lanza."
tags: [eda, categorical, cardinality, entropy, profiling, datascience, pure]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
example: |
from categorical_cardinality_block import categorical_cardinality_block
cat = {"top": [{"value": "a", "count": 5, "pct": 0.5}], "mode": "a",
"mode_pct": 0.5, "n_distinct": 4, "entropy": 1.685, "imbalance": 5.0,
"len_min": 1, "len_mean": 1.0, "len_max": 1}
block = categorical_cardinality_block(cat, n_rows=10)
tested: true
tests:
- "test_normal_case"
- "test_empty_cat_does_not_raise"
- "test_none_cat_does_not_raise"
- "test_n_rows_zero_no_zero_division"
- "test_id_like_when_distinct_near_rows"
- "test_dominated_when_mode_pct_high"
- "test_mode_pct_fallback_from_top_fraction"
- "test_n_singletons_partial_when_top_truncated"
- "test_single_distinct_value_entropy_norm_none"
test_file_path: "python/functions/datascience/categorical_cardinality_block_test.py"
file_path: "python/functions/datascience/categorical_cardinality_block.py"
params:
- name: cat
desc: "Dict producido por summarize_categorical para UNA columna categórica. Claves leídas (todas opcionales, lectura defensiva): top (list de {value,count,pct}), mode, mode_pct (puede faltar), n_distinct, entropy (Shannon en bits), imbalance, len_min, len_mean, len_max. None o no-dict se tratan como {}."
- name: n_rows
desc: "Número total de filas del dataset. Usado para pct_distinct. Si es 0 o None, pct_distinct sale None (sin ZeroDivisionError)."
output: "Dict con exactamente 16 claves, todas siempre presentes: n_distinct, n_rows, pct_distinct, entropy, entropy_max, entropy_norm, mode, mode_pct, imbalance, n_singletons, n_singletons_partial, len_min, len_mean, len_max, id_like, dominated. Valores None/False cuando no son derivables; la función nunca lanza. pct_distinct en escala 0-100. entropy_max=log2(n_distinct) (0.0 si n_distinct in {0,1}). entropy_norm=entropy/entropy_max recortada a [0,1]. n_singletons = nº de elementos de top con count==1 (None si top vacío). n_singletons_partial=True si n_distinct>len(top). id_like=pct_distinct>=99. dominated=mode_pct>=90."
---
## Ejemplo
```python
from categorical_cardinality_block import categorical_cardinality_block
# Salida típica de summarize_categorical para una columna, con n_rows del dataset.
cat = {
"top": [
{"value": "a", "count": 5, "pct": 0.5},
{"value": "b", "count": 3, "pct": 0.3},
{"value": "c", "count": 1, "pct": 0.1},
{"value": "d", "count": 1, "pct": 0.1},
],
"mode": "a",
"mode_pct": 0.5,
"n_distinct": 4,
"entropy": 1.685, # Shannon en bits (<= log2(4) = 2.0)
"imbalance": 5.0,
"len_min": 1, "len_mean": 1.0, "len_max": 1,
}
categorical_cardinality_block(cat, n_rows=10)
# {
# "n_distinct": 4, "n_rows": 10,
# "pct_distinct": 40.0, # 4 / 10 * 100
# "entropy": 1.685,
# "entropy_max": 2.0, # log2(4)
# "entropy_norm": 0.8425, # 1.685 / 2.0, recortado a [0,1]
# "mode": "a", "mode_pct": 0.5,
# "imbalance": 5.0,
# "n_singletons": 2, # c y d con count == 1
# "n_singletons_partial": False, # top cubre los 4 distintos
# "len_min": 1, "len_mean": 1.0, "len_max": 1,
# "id_like": False, # pct_distinct 40 < 99
# "dominated": False, # mode_pct 0.5 < 90
# }
```
## Cuando usarla
Úsala justo después de `summarize_categorical`, cuando vayas a renderizar el
bloque de cardinalidad de una columna categórica en un EDA: necesitas el ratio
de valores distintos (`pct_distinct`), la entropía normalizada al rango `[0,1]`
para comparar columnas con cardinalidades distintas, el conteo de singletons, y
las banderas heurísticas `id_like` (la columna parece un identificador) y
`dominated` (una sola categoría domina). Pásale el dict crudo de
`summarize_categorical` para esa columna y el `n_rows` total del dataset. No
reimplementa nada: solo deriva métricas de presentación a partir de lo ya
calculado.
## Gotchas
- **`mode_pct` se pasa tal cual viene en `cat`.** `summarize_categorical`
produce `mode_pct` como **fracción** (01), no como porcentaje. El flag
`dominated` compara `mode_pct >= 90.0`, así que con la salida cruda de
`summarize_categorical` (fracciones) `dominated` no se dispara: aliméntalo con
`mode_pct` en escala 0100 si quieres usar esa bandera. Solo el camino de
*fallback* (cuando `cat` no trae `mode_pct` y se deriva de `top[0]['pct']`)
normaliza una fracción `<= 1` multiplicándola por 100.
- **`n_singletons` solo cubre el `top` visible.** Si `summarize_categorical` se
llamó con `top_k` pequeño, hay valores fuera del top; en ese caso
`n_singletons_partial` es `True` para avisar de que el conteo es parcial.
- **`pct_distinct` es `None` si `n_rows` es 0 o `None`** (no lanza
`ZeroDivisionError`); por tanto `id_like` queda `False` en ese caso.
- **`entropy_norm` es `None` cuando `entropy_max <= 0`** (columna constante,
`n_distinct in {0,1}`): no hay división por cero y no se inventa un 0/1.
- **No recalcula la entropía.** Si `cat['entropy']` es incoherente con
`n_distinct`, `entropy_norm` se recorta a `[0,1]` pero el valor de entrada no
se corrige.
- **`bool` no cuenta como número.** Un `True`/`False` en una clave numérica de
`cat` se trata como ausente (`None`), por la guarda defensiva.
@@ -1,132 +0,0 @@
"""Pure EDA helper: cardinality metrics block from a `summarize_categorical` output.
Part of the `eda` capability group. Consumes the per-column dict produced by
``summarize_categorical`` (for a single categorical/text column) plus the total
row count of the dataset and derives render-ready cardinality metrics: distinct
ratio, normalized entropy, singleton count, and the ``id_like`` / ``dominated``
flags.
It does NOT recompute the entropy nor reimplement ``summarize_categorical`` — it
only reads that function's output. Dict-no-throw style of the `eda` group: it
never raises. Missing or malformed inputs yield ``None``/``False``/``0`` for the
affected keys, never an exception. Stdlib only (``math.log2``).
"""
from math import log2
def _num(value):
"""Return ``value`` unchanged if it is a real (non-bool) number, else ``None``.
``bool`` is rejected on purpose: in Python ``True`` is an ``int`` but it is
never a meaningful count/ratio here.
"""
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return value
return None
def categorical_cardinality_block(cat: dict, n_rows: int) -> dict:
"""Derive cardinality metrics for one categorical column.
Args:
cat: The per-column dict produced by ``summarize_categorical`` for a
single categorical/text column. Expected (all optional, read
defensively) keys: ``top`` (list of ``{value, count, pct}``),
``mode``, ``mode_pct``, ``n_distinct``, ``entropy`` (Shannon, bits),
``imbalance``, ``len_min``, ``len_mean``, ``len_max``. ``None`` or a
non-dict is treated as ``{}``.
n_rows: Total number of rows in the dataset (used for ``pct_distinct``).
Returns:
Dict with exactly these keys, every one always present:
``n_distinct``, ``n_rows``, ``pct_distinct``, ``entropy``,
``entropy_max``, ``entropy_norm``, ``mode``, ``mode_pct``,
``imbalance``, ``n_singletons``, ``n_singletons_partial``, ``len_min``,
``len_mean``, ``len_max``, ``id_like``, ``dominated``. Values are
``None``/``False`` when not derivable; the function never raises.
"""
cat = cat if isinstance(cat, dict) else {}
# --- passthroughs (numeric-validated, type preserved) ---
n_distinct = _num(cat.get("n_distinct"))
n_rows_out = _num(n_rows)
entropy = _num(cat.get("entropy"))
imbalance = _num(cat.get("imbalance"))
len_min = _num(cat.get("len_min"))
len_mean = _num(cat.get("len_mean"))
len_max = _num(cat.get("len_max"))
mode = cat.get("mode") # any value (or None); passthrough as-is
# --- pct_distinct ---
if n_distinct is None or n_rows_out is None or n_rows_out == 0:
pct_distinct = None
else:
pct_distinct = n_distinct / n_rows_out * 100.0
# --- entropy_max = log2(n_distinct) ---
if n_distinct is None:
entropy_max = None
elif n_distinct > 1:
entropy_max = log2(n_distinct)
else: # n_distinct in {0, 1}
entropy_max = 0.0
# --- entropy_norm = entropy / entropy_max, clipped to [0, 1] ---
if entropy_max is not None and entropy_max > 0 and entropy is not None:
entropy_norm = entropy / entropy_max
entropy_norm = max(0.0, min(1.0, entropy_norm))
else:
entropy_norm = None
# --- mode_pct: prefer cat['mode_pct']; else derive from top[0].pct ---
mode_pct = _num(cat.get("mode_pct"))
top = cat.get("top")
has_top = isinstance(top, (list, tuple)) and len(top) > 0
if mode_pct is None and has_top:
first = top[0]
if isinstance(first, dict):
first_pct = _num(first.get("pct"))
if first_pct is not None:
# Normalize to 0-100: a fraction (<= 1) becomes a percentage.
mode_pct = first_pct * 100.0 if first_pct <= 1 else first_pct
# --- singletons (count == 1) within the visible top ---
if has_top:
n_singletons = sum(
1
for item in top
if isinstance(item, dict) and _num(item.get("count")) == 1
)
else:
n_singletons = None
# The singleton count only covers the visible top; there may be more
# distinct values (and thus more singletons) outside it.
top_len = len(top) if isinstance(top, (list, tuple)) else 0
n_singletons_partial = bool(n_distinct is not None and n_distinct > top_len)
# --- derived flags ---
id_like = pct_distinct is not None and pct_distinct >= 99.0
dominated = mode_pct is not None and mode_pct >= 90.0
return {
"n_distinct": n_distinct,
"n_rows": n_rows_out,
"pct_distinct": pct_distinct,
"entropy": entropy,
"entropy_max": entropy_max,
"entropy_norm": entropy_norm,
"mode": mode,
"mode_pct": mode_pct,
"imbalance": imbalance,
"n_singletons": n_singletons,
"n_singletons_partial": n_singletons_partial,
"len_min": len_min,
"len_mean": len_mean,
"len_max": len_max,
"id_like": id_like,
"dominated": dominated,
}
@@ -1,216 +0,0 @@
"""Tests para categorical_cardinality_block."""
import sys
import os
from math import log2
sys.path.insert(0, os.path.dirname(__file__))
from categorical_cardinality_block import categorical_cardinality_block
# Output contract: every call returns exactly these 16 keys.
EXPECTED_KEYS = {
"n_distinct",
"n_rows",
"pct_distinct",
"entropy",
"entropy_max",
"entropy_norm",
"mode",
"mode_pct",
"imbalance",
"n_singletons",
"n_singletons_partial",
"len_min",
"len_mean",
"len_max",
"id_like",
"dominated",
}
def _sample_cat():
"""A realistic summarize_categorical output for one column."""
return {
"top": [
{"value": "a", "count": 5, "pct": 0.5},
{"value": "b", "count": 3, "pct": 0.3},
{"value": "c", "count": 1, "pct": 0.1},
{"value": "d", "count": 1, "pct": 0.1},
],
"mode": "a",
"mode_pct": 0.5,
"n_distinct": 4,
"entropy": 1.685, # <= log2(4) = 2.0
"imbalance": 5.0,
"len_min": 1,
"len_mean": 1.0,
"len_max": 1,
}
def test_normal_case():
"""Caso normal: pct_distinct, entropy_max=log2(n_distinct), entropy_norm in [0,1], n_singletons."""
cat = _sample_cat()
result = categorical_cardinality_block(cat, n_rows=10)
assert set(result.keys()) == EXPECTED_KEYS
# passthroughs
assert result["n_distinct"] == 4
assert result["n_rows"] == 10
assert result["entropy"] == 1.685
assert result["imbalance"] == 5.0
assert result["mode"] == "a"
assert result["mode_pct"] == 0.5 # passthrough, not normalized
assert result["len_min"] == 1
assert result["len_max"] == 1
# pct_distinct = 4 / 10 * 100
assert abs(result["pct_distinct"] - 40.0) < 1e-12
# entropy_max = log2(4) = 2.0
assert abs(result["entropy_max"] - log2(4)) < 1e-12
assert abs(result["entropy_max"] - 2.0) < 1e-12
# entropy_norm = 1.685 / 2.0 = 0.8425, within [0, 1]
assert abs(result["entropy_norm"] - 1.685 / 2.0) < 1e-12
assert 0.0 <= result["entropy_norm"] <= 1.0
# singletons: c and d have count == 1
assert result["n_singletons"] == 2
# top covers all distinct values (4 == 4)
assert result["n_singletons_partial"] is False
# neither id-like (40%) nor dominated (mode_pct 0.5)
assert result["id_like"] is False
assert result["dominated"] is False
def test_empty_cat_does_not_raise():
"""Caso cat={}: no lanza, claves derivadas None y flags False."""
result = categorical_cardinality_block({}, n_rows=100)
assert set(result.keys()) == EXPECTED_KEYS
for key in (
"n_distinct",
"pct_distinct",
"entropy",
"entropy_max",
"entropy_norm",
"mode",
"mode_pct",
"imbalance",
"n_singletons",
"len_min",
"len_mean",
"len_max",
):
assert result[key] is None
assert result["n_singletons_partial"] is False
assert result["id_like"] is False
assert result["dominated"] is False
# n_rows is a passthrough of the argument, still coherent.
assert result["n_rows"] == 100
def test_none_cat_does_not_raise():
"""Caso cat=None: tratado como {}, mismas garantias que el dict vacio."""
result = categorical_cardinality_block(None, n_rows=None)
assert set(result.keys()) == EXPECTED_KEYS
assert result["n_distinct"] is None
assert result["pct_distinct"] is None
assert result["entropy_max"] is None
assert result["entropy_norm"] is None
assert result["id_like"] is False
assert result["dominated"] is False
def test_n_rows_zero_no_zero_division():
"""Caso n_rows=0: pct_distinct None sin ZeroDivisionError."""
cat = _sample_cat()
result = categorical_cardinality_block(cat, n_rows=0)
assert result["pct_distinct"] is None
# n_distinct still passes through.
assert result["n_distinct"] == 4
assert result["id_like"] is False
def test_id_like_when_distinct_near_rows():
"""id_like True cuando n_distinct ~ n_rows (pct_distinct >= 99)."""
cat = {"n_distinct": 99, "entropy": 6.6, "top": [], "mode": None}
result = categorical_cardinality_block(cat, n_rows=100)
assert abs(result["pct_distinct"] - 99.0) < 1e-12
assert result["id_like"] is True
# exact identity column: 100 / 100 = 100%
cat_full = {"n_distinct": 100, "top": []}
result_full = categorical_cardinality_block(cat_full, n_rows=100)
assert result_full["id_like"] is True
def test_dominated_when_mode_pct_high():
"""dominated True cuando mode_pct alto (>= 90)."""
cat = {
"n_distinct": 3,
"entropy": 0.3,
"mode": "x",
"mode_pct": 95.0,
"top": [
{"value": "x", "count": 95, "pct": 0.95},
{"value": "y", "count": 3, "pct": 0.03},
{"value": "z", "count": 2, "pct": 0.02},
],
"imbalance": 47.5,
}
result = categorical_cardinality_block(cat, n_rows=100)
assert result["mode_pct"] == 95.0
assert result["dominated"] is True
def test_mode_pct_fallback_from_top_fraction():
"""Sin mode_pct: deriva del pct del primer top, fraccion <=1 escala a 0-100."""
cat = {
"n_distinct": 3,
"top": [
{"value": "x", "count": 95, "pct": 0.95},
{"value": "y", "count": 5, "pct": 0.05},
],
}
result = categorical_cardinality_block(cat, n_rows=100)
# 0.95 (fraction) -> 95.0 (percentage)
assert abs(result["mode_pct"] - 95.0) < 1e-12
assert result["dominated"] is True
def test_n_singletons_partial_when_top_truncated():
"""n_distinct > len(top): n_singletons cubre solo el top visible, partial True."""
cat = {
"n_distinct": 10,
"top": [
{"value": "a", "count": 4, "pct": 0.4},
{"value": "b", "count": 1, "pct": 0.1},
{"value": "c", "count": 1, "pct": 0.1},
],
"entropy": 2.5,
}
result = categorical_cardinality_block(cat, n_rows=12)
assert result["n_singletons"] == 2 # only b, c visible
assert result["n_singletons_partial"] is True
def test_single_distinct_value_entropy_norm_none():
"""n_distinct=1: entropy_max=0.0 -> entropy_norm None (no division by zero)."""
cat = {
"n_distinct": 1,
"entropy": 0.0,
"mode": "only",
"mode_pct": 1.0,
"top": [{"value": "only", "count": 7, "pct": 1.0}],
"imbalance": 1.0,
}
result = categorical_cardinality_block(cat, n_rows=7)
assert result["entropy_max"] == 0.0
assert result["entropy_norm"] is None
assert result["n_singletons"] == 0
@@ -1,108 +0,0 @@
---
id: categorical_top_pie_figure_py_datascience
name: categorical_top_pie_figure
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def categorical_top_pie_figure(top: list, n_distinct: int = 0, title: str = \"\", top_k: int = 6, n_rows=None) -> \"matplotlib.figure.Figure\""
description: "Construye una figura matplotlib tipo donut (pie con agujero central) de las top_k categorías más frecuentes de una columna categórica, agregando el resto en un sector gris \"Otros (N categorías)\". Consume el bloque `top` de summarize_categorical y devuelve un matplotlib.figure.Figure listo para rasterizar por el renderer del informe EDA. Backend Agg sin pyplot global; defensivo ante top vacío/None."
tags: [eda, categorical, pie, donut, matplotlib, figure, visualization, datascience, impure]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [matplotlib]
example: |
from categorical_top_pie_figure import categorical_top_pie_figure
top = [
{"value": "rojo", "count": 40, "pct": 0.4},
{"value": "azul", "count": 30, "pct": 0.3},
{"value": "verde", "count": 20, "pct": 0.2},
]
fig = categorical_top_pie_figure(top, n_distinct=12, title="color", top_k=6, n_rows=100)
tested: true
tests:
- "test_returns_figure"
- "test_ten_items_topk_six_yields_seven_wedges"
- "test_empty_top_does_not_raise_and_returns_figure"
- "test_long_value_truncated_in_legend"
- "test_none_value_and_none_count_are_handled"
- "test_n_rows_adds_exact_others_slice"
test_file_path: "python/functions/datascience/categorical_top_pie_figure_test.py"
file_path: "python/functions/datascience/categorical_top_pie_figure.py"
params:
- name: top
desc: "Lista de dicts {value, count, pct} ordenada de mayor a menor por count (salida del bloque `top` de summarize_categorical). Puede venir vacía o con dicts incompletos: items no-dict, sin count, con count None o count <= 0 se descartan. value None se admite (sin etiqueta)."
- name: n_distinct
desc: "Nº total de categorías distintas de la columna. Etiqueta el sector agregado como \"Otros (n_distinct - top_k)\" (mínimo 0). Si no supera el nº de sectores mostrados, se usa el overflow real de `top` como nº de categorías agregadas. Default 0."
- name: title
desc: "Título de la figura (nombre de la columna). Se trunca a ~48 chars con elipsis si es muy largo. Default \"\" (sin título)."
- name: top_k
desc: "Nº máximo de sectores explícitos. Default 6. El sector \"Otros\" no cuenta contra este límite. Con top_k <= 0 se muestra al menos la categoría mayor."
- name: n_rows
desc: "Opcional. Total de filas del dataset. Si se da y la suma de counts mostrados < n_rows, el sector \"Otros\" usa (n_rows - suma_mostrada) como count para que los ángulos sean exactos respecto al total real. Si se omite, \"Otros\" usa la suma de counts fuera del top_k mostrado (solo cuando top trae más de top_k items). Default None."
output: "Un matplotlib.figure.Figure (figsize 6.4x4.0, dpi 150) con un Axes donut (wedgeprops width 0.42) más una leyenda lateral con value truncado a 20 chars + count; el sector \"Otros\" en gris. Anotación central con el total n. Si no hay counts válidos, devuelve igualmente una Figure con un texto centrado \"sin datos categóricos\" (nunca lanza). El caller rasteriza/cierra la figura; la función no la muestra ni la guarda."
---
## Ejemplo
```python
from categorical_top_pie_figure import categorical_top_pie_figure
# `top` es la salida del bloque "top" de summarize_categorical (ya ordenado desc).
top = [
{"value": "rojo", "count": 40, "pct": 0.40},
{"value": "azul", "count": 30, "pct": 0.30},
{"value": "verde", "count": 20, "pct": 0.20},
{"value": "amarillo", "count": 5, "pct": 0.05},
]
fig = categorical_top_pie_figure(
top,
n_distinct=12, # 12 categorías distintas en total
title="color_producto",
top_k=6, # hasta 6 sectores explícitos
n_rows=100, # "Otros" = 100 - 95 = 5, sobre 8 categorías agregadas
)
# El renderer del informe lo rasteriza; aquí solo persistimos para inspección.
fig.savefig("/tmp/donut_color.png")
```
## Cuando usarla
Úsala dentro de un informe EDA cuando quieras visualizar la composición de una
columna categórica de un vistazo: cuántas filas caen en las categorías
dominantes frente a la cola larga. Pásale directamente el bloque `top` de
`summarize_categorical` (ya ordenado de mayor a menor) más `n_distinct` para que
el sector "Otros" indique cuántas categorías quedan agrupadas. Es la pareja
"composición" del gráfico de barras top-k: el donut comunica proporciones del
total, las barras comunican magnitudes comparables.
## Gotchas
- **Impura por matplotlib.** Toca la maquinaria de render. Usa el backend `Agg`
y la API orientada a objetos `Figure`/`add_subplot` — NUNCA `pyplot.*` aquí,
para no tocar el estado global ni filtrar figuras entre llamadas. `pyplot` NO
es thread-safe; esta función evita ese riesgo construyendo el `Figure`
directamente, así que es segura de llamar en bucle desde el renderer.
- **El caller cierra la figura.** La función devuelve el `Figure` pero no lo
muestra ni lo guarda. Quien la consume debe rasterizarla y luego liberarla
(`fig.clf()` / `matplotlib.pyplot.close(fig)` si se usó pyplot en el caller)
para no acumular memoria en lotes grandes de columnas.
- **Pie engaña con muchos sectores.** Por eso `top_k` por defecto es 6 y el
resto se agrega en "Otros": donuts con 15+ sectores son ilegibles. Para
cardinalidad muy alta el donut solo muestra la cabeza de la distribución; la
cola vive en el sector gris.
- **Ángulos exactos solo con `n_rows`.** Sin `n_rows`, el sector "Otros" se
calcula con el overflow presente en `top`; si `top` ya viene recortado a
`top_k` por el productor, no habrá "Otros" aunque existan más categorías. Pasa
`n_rows` (total de filas del dataset) para ángulos correctos respecto al total
real.
- **Defensiva, nunca lanza.** `top=[]`, `value=None`, `count=None` o counts no
numéricos se manejan sin error: en el peor caso devuelve una `Figure` con
"sin datos categóricos". No envuelvas la llamada en try/except por miedo a un
raise — no lo hay.
@@ -1,230 +0,0 @@
"""Impure EDA helper: donut figure of the most common categories (`eda` group).
Builds a matplotlib donut (pie with a central hole) of the ``top_k`` most
frequent categories of a categorical column, folding everything else into a
single "Otros (N categorías)" slice. Returns a ready-to-rasterize
``matplotlib.figure.Figure``; it never shows nor saves it.
Impure because it touches matplotlib's rendering machinery. It uses the headless
Agg backend and the object-oriented ``Figure`` API (no ``pyplot``) so it leaks no
global state and is safe to call repeatedly from a report renderer.
"""
import matplotlib
matplotlib.use("Agg")
from matplotlib.figure import Figure # noqa: E402
# Gray reserved for the aggregated "Otros" slice.
_OTHER_COLOR = "#9e9e9e"
# Muted gray for secondary text (title fallback, center annotation, no-data).
_MUTED_TEXT = "#5f6b7a"
# Pleasant, colour-blind-friendly qualitative palette for the explicit slices.
_PALETTE = [
"#4C72B0",
"#DD8452",
"#55A868",
"#C44E52",
"#8172B3",
"#937860",
"#DA8BC3",
"#8C8C8C",
"#CCB974",
"#64B5CD",
]
def _truncate(text, width: int = 20) -> str:
"""Truncate ``text`` to ``width`` chars, appending an ellipsis if cut."""
s = "" if text is None else str(text)
if len(s) <= width:
return s
if width <= 1:
return s[:width]
return s[: width - 1] + ""
def categorical_top_pie_figure(
top: list,
n_distinct: int = 0,
title: str = "",
top_k: int = 6,
n_rows=None,
) -> "matplotlib.figure.Figure":
"""Build a donut figure of the most common categories of a column.
Renders the ``top_k`` most frequent categories as explicit donut slices and
aggregates every remaining category into a single gray "Otros (N
categorías)" slice. Category names are not painted on the wedges; they are
listed in a lateral legend (truncated value + count) to avoid overlap on
narrow (mobile) figures.
The function is fully defensive: empty input, missing/``None`` values or
counts never raise. When there is nothing valid to draw it still returns a
``Figure`` carrying a centered "sin datos categóricos" message.
Args:
top: List of ``{value, count, pct}`` dicts, already sorted by ``count``
descending (the ``top`` block of ``summarize_categorical``). May be
empty or carry incomplete/``None`` entries; non-dict items, items
without a positive numeric ``count`` and ``None`` counts are skipped.
n_distinct: Total number of distinct categories in the column. Used to
label the aggregated slice as "Otros (n_distinct - top_k)" (floored
at 0). Ignored when it does not exceed the number of shown slices.
title: Figure title (the column name). Truncated when too long.
top_k: Maximum number of explicit slices. Default 6. The "Otros" slice
does not count against this limit.
n_rows: Optional total row count of the dataset. When given and the sum
of shown counts is below ``n_rows``, the "Otros" slice uses
``n_rows - sum_shown`` as its count so the wedge angles are exact
with respect to the real total. When omitted, "Otros" uses the sum
of the counts that fall outside the shown ``top_k`` (only when
``top`` carries more than ``top_k`` items).
Returns:
A ``matplotlib.figure.Figure`` with a single donut Axes plus a lateral
legend. The caller is responsible for rasterizing/closing it.
"""
fig = Figure(figsize=(6.4, 4.0), dpi=150)
ax = fig.add_subplot(111)
safe_title = _truncate(title, 48)
# --- Defensive parse: keep only well-formed {value, count} with count > 0.
cleaned = []
if isinstance(top, list):
for item in top:
if not isinstance(item, dict):
continue
count = item.get("count")
if count is None:
continue
try:
count = float(count)
except (TypeError, ValueError):
continue
if count <= 0:
continue
cleaned.append((item.get("value"), count))
if not cleaned:
ax.axis("off")
ax.text(
0.5,
0.5,
"sin datos categóricos",
ha="center",
va="center",
fontsize=12,
color=_MUTED_TEXT,
transform=ax.transAxes,
)
if safe_title:
ax.set_title(safe_title, fontsize=12, loc="center", pad=8)
fig.tight_layout()
return fig
# --- Split into shown slices and the aggregated remainder.
shown = cleaned[: max(int(top_k), 0)]
if not shown: # top_k <= 0 — show at least the largest category.
shown = cleaned[:1]
sum_shown = sum(c for _, c in shown)
overflow_count = sum(c for _, c in cleaned[len(shown):])
# How many categories are folded into "Otros".
try:
nd = int(n_distinct)
except (TypeError, ValueError):
nd = 0
others_categories = max(nd - len(shown), 0)
# If n_distinct is unknown/too small, fall back to the overflow we actually
# have in `top` beyond the shown slices.
overflow_items = len(cleaned) - len(shown)
if others_categories == 0 and overflow_items > 0:
others_categories = overflow_items
# Count attributed to the "Otros" slice for exact angles.
others_count = 0.0
if n_rows is not None:
try:
total_rows = float(n_rows)
except (TypeError, ValueError):
total_rows = None
if total_rows is not None and total_rows > sum_shown:
others_count = total_rows - sum_shown
if others_count <= 0:
others_count = overflow_count
labels = [v for v, _ in shown]
values = [c for _, c in shown]
colors = [_PALETTE[i % len(_PALETTE)] for i in range(len(shown))]
has_others = others_count > 0 and others_categories > 0
if has_others:
values.append(others_count)
labels.append("Otros")
colors.append(_OTHER_COLOR)
total = sum(values)
def _autopct(pct: float) -> str:
# Hide tiny labels to avoid crowding the wedges.
return f"{pct:.0f}%" if pct >= 5 else ""
wedges, _texts, autotexts = ax.pie(
values,
colors=colors,
startangle=90,
counterclock=False,
wedgeprops={"width": 0.42, "edgecolor": "white", "linewidth": 1.0},
autopct=_autopct,
pctdistance=0.79,
textprops={"fontsize": 8},
)
for at in autotexts:
at.set_color("white")
at.set_fontweight("bold")
ax.set_aspect("equal")
# --- Lateral legend: truncated value + count (+ "(N categorías)" for Otros).
legend_labels = []
for idx, (lab, val) in enumerate(zip(labels, values)):
if has_others and idx == len(labels) - 1:
legend_labels.append(
f"Otros ({others_categories} categorías) — {int(round(val))}"
)
else:
legend_labels.append(f"{_truncate(lab, 20)}{int(round(val))}")
ax.legend(
wedges,
legend_labels,
title="Categorías",
loc="center left",
bbox_to_anchor=(1.02, 0.5),
fontsize=8,
title_fontsize=9,
frameon=False,
)
if safe_title:
ax.set_title(safe_title, fontsize=13, loc="left", pad=10)
# Center annotation: total count covered by the donut.
ax.text(
0,
0,
f"n={int(round(total))}",
ha="center",
va="center",
fontsize=11,
color=_MUTED_TEXT,
fontweight="bold",
)
# Leave room on the right for the legend (avoid clipping it).
fig.subplots_adjust(left=0.02, right=0.62, top=0.88, bottom=0.06)
return fig
@@ -1,104 +0,0 @@
"""Tests para categorical_top_pie_figure (donut de categorías top, grupo eda).
Usa el backend Agg sin pyplot; no muestra ni guarda figuras. Cada test cierra
explícitamente la Figure construida (matplotlib.pyplot.close) para no acumular
estado entre tests.
"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt # noqa: E402
from matplotlib.figure import Figure # noqa: E402
from categorical_top_pie_figure import categorical_top_pie_figure
def _make_top(n):
"""n items {value, count, pct} ordenados desc por count."""
return [
{"value": f"cat_{i}", "count": n - i, "pct": (n - i) / sum(range(1, n + 1))}
for i in range(n)
]
def _wedges(ax):
"""Devuelve los wedges (sectores) de un Axes con un pie."""
from matplotlib.patches import Wedge
return [p for p in ax.patches if isinstance(p, Wedge)]
def test_returns_figure():
fig = categorical_top_pie_figure(_make_top(3), n_distinct=3, title="col")
assert isinstance(fig, Figure)
plt.close(fig)
def test_ten_items_topk_six_yields_seven_wedges():
top = _make_top(10)
fig = categorical_top_pie_figure(top, n_distinct=10, title="muchas", top_k=6)
ax = fig.axes[0]
wedges = _wedges(ax)
# 6 categorías explícitas + 1 sector "Otros".
assert len(wedges) == 7
plt.close(fig)
def test_empty_top_does_not_raise_and_returns_figure():
fig = categorical_top_pie_figure([], n_distinct=0, title="vacía")
assert isinstance(fig, Figure)
# Sin datos: no debe haber sectores de pie.
assert len(_wedges(fig.axes[0])) == 0
plt.close(fig)
def test_long_value_truncated_in_legend():
long_value = "una_categoria_con_un_nombre_larguisimo_que_excede_el_limite"
top = [
{"value": long_value, "count": 10, "pct": 0.5},
{"value": "corta", "count": 10, "pct": 0.5},
]
fig = categorical_top_pie_figure(top, n_distinct=2, title="col", top_k=6)
ax = fig.axes[0]
legend = ax.get_legend()
assert legend is not None
texts = [t.get_text() for t in legend.get_texts()]
# El valor largo aparece truncado con elipsis y NO en su forma completa.
assert any("" in t for t in texts)
assert long_value not in " ".join(texts)
plt.close(fig)
def test_none_value_and_none_count_are_handled():
top = [
{"value": None, "count": 5, "pct": 0.5},
{"value": "b", "count": None, "pct": 0.0}, # count None -> se descarta
{"value": "c", "count": 5, "pct": 0.5},
]
fig = categorical_top_pie_figure(top, n_distinct=2, title="con nones", top_k=6)
assert isinstance(fig, Figure)
# Solo 2 items válidos, sin overflow -> 2 wedges, sin "Otros".
assert len(_wedges(fig.axes[0])) == 2
plt.close(fig)
def test_n_rows_adds_exact_others_slice():
# 3 categorías mostradas suman 30, dataset real 100 -> "Otros" = 70.
top = _make_top(3) # counts 3,2,1 -> reescalamos abajo
top = [
{"value": "a", "count": 15, "pct": 0.15},
{"value": "b", "count": 10, "pct": 0.10},
{"value": "c", "count": 5, "pct": 0.05},
]
fig = categorical_top_pie_figure(
top, n_distinct=20, title="col", top_k=3, n_rows=100
)
ax = fig.axes[0]
# 3 explícitas + Otros.
assert len(_wedges(ax)) == 4
legend_texts = [t.get_text() for t in ax.get_legend().get_texts()]
# El sector Otros refleja n_distinct - top_k = 17 categorías y count 70.
assert any("Otros (17 categorías)" in t and "70" in t for t in legend_texts)
plt.close(fig)
@@ -0,0 +1,97 @@
---
name: describe_clusters_llm
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def describe_clusters_llm(cluster_profiles: list, feature_names: list, model: str = \"claude-haiku-4-5-20251001\") -> dict"
description: "Micro-analisis LLM de clusters de KMeans (grupo eda). Toma los perfiles AGREGADOS de cada cluster (los que produce project_clusters_2d: tamano, centroide en escala original, features distintivas y centroide en z-score) y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una descripcion de 1-2 frases en espanol. Clave de coste/privacidad: NO envia filas crudas, solo el resumen agregado de cada grupo (tamano, % del total y la media de las features distintivas con su signo respecto a la media global). Reusa ask_llm del grupo claude-direct (API directa con token OAuth de Claude). Impura, dict-no-throw: nunca lanza, degrada a titulos genericos 'Cluster N' si el LLM no responde o el parseo falla."
tags: [eda, clustering, llm, claude-direct, datascience, kmeans]
params:
- name: cluster_profiles
desc: "Lista de perfiles de cluster con la forma que produce project_clusters_2d: cada uno {cluster:int, size:int, pct:float, centroid_original:{feature: media en escala original}, distinctive:[features distintivas], centroid_z:{feature: z-score}}. Solo se le envia al LLM un resumen agregado; nunca filas crudas. Lista vacia o no-lista -> clusters=[] sin llamar al LLM."
- name: feature_names
desc: "Nombres de las features del dataset. Se incluyen como contexto en el prompt para que el LLM pueda nombrar los clusters; no es obligatorio que coincida con las features distintivas de cada perfil."
- name: model
desc: "id del modelo Anthropic a usar. Default 'claude-haiku-4-5-20251001' (haiku, coste bajo, ~2-3s). Para titulos/descripciones mas finas, pasar p.ej. 'claude-opus-4-8'."
output: "dict dict-no-throw: {clusters:[{cluster:int, title:str, description:str}], model:str, note:str}. note=='' si todo fue bien. Si el LLM no respondio (note='LLM no disponible') o el parseo fallo (note='parse fallido'), clusters trae titulos genericos 'Cluster N' con description vacia. Si cluster_profiles esta vacio o no es lista: {clusters:[], model, note:'sin clusters'}. NUNCA lanza."
uses_functions: [ask_llm_py_core]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_parse_clusters_json_valid_array", "test_parse_clusters_json_wrapped_in_junk_text", "test_parse_clusters_json_non_json_returns_none", "test_parse_clusters_json_fills_missing_cluster_by_index", "test_describe_clusters_llm_ok_with_monkeypatched_llm", "test_describe_clusters_llm_degrades_on_empty_response", "test_describe_clusters_llm_degrades_on_unparseable_response", "test_describe_clusters_llm_empty_list_skips_llm", "test_describe_clusters_llm_non_list_input_skips_llm"]
test_file_path: "python/functions/datascience/describe_clusters_llm_test.py"
file_path: "python/functions/datascience/describe_clusters_llm.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.describe_clusters_llm import describe_clusters_llm
# Perfiles agregados producidos por project_clusters_2d (no hay filas crudas).
cluster_profiles = [
{
"cluster": 0, "size": 60, "pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1, "size": 40, "pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
feature_names = ["acidez", "alcohol", "azucar"]
out = describe_clusters_llm(cluster_profiles, feature_names) # haiku por defecto
# out = describe_clusters_llm(cluster_profiles, feature_names, model="claude-opus-4-8")
if not out["note"]:
for c in out["clusters"]:
print(f"Cluster {c['cluster']}: {c['title']}")
print(" ", c["description"])
else:
# Degradacion: titulos genericos "Cluster N".
print("LLM no usado:", out["note"])
for c in out["clusters"]:
print(c["cluster"], c["title"])
```
## Cuando usarla
Cuando ya has clusterizado un dataset (KMeans + `project_clusters_2d`) y quieres
poner NOMBRE y descripcion legible a cada grupo en vez de dejar "Cluster 0/1/2".
Es el paso interpretativo que sigue al perfilado de clusters: `project_clusters_2d`
calcula tamano, centroides y features distintivas, y `describe_clusters_llm` los
traduce a un titulo corto + 1-2 frases por cluster. Usala al cerrar un EDA con
segmentacion para el resumen final o el report. Una sola llamada al LLM describe
todos los clusters a la vez (barato).
## Gotchas
- **Impura: hace 1 llamada de red al LLM.** No es determinista ni gratis. Latencia
tipica ~2-3s con haiku.
- **Requiere token OAuth de Claude** en `~/.claude/.credentials.json` (via `ask_llm`
/ grupo `claude-direct`). Sin token / sin red, NO lanza: degrada a titulos
genericos `Cluster N` con `note="LLM no disponible"`.
- **NO envia filas crudas al LLM**, solo el resumen AGREGADO de cada cluster
(tamano, % del total y la media de las features distintivas con su signo respecto
a la media global). Privacidad y coste minimos por diseno — pero requiere que los
perfiles vengan ya calculados por `project_clusters_2d`.
- **Modelo `haiku` por defecto** para coste bajo; sube a `claude-opus-4-8` si
necesitas titulos/descripciones mas finas (mas caro y lento).
- **dict-no-throw**: si el modelo no devuelve un JSON array parseable, retorna
titulos genericos con `note="parse fallido"`. Comprueba siempre `out["note"]`
antes de fiarte de los titulos.
- El LLM puede sobre-interpretar: el system prompt le pide ser sobrio y no inventar
causas, pero revisa los titulos antes de publicarlos en un report.
@@ -0,0 +1,240 @@
"""describe_clusters_llm — micro-analisis LLM de clusters de KMeans (grupo `eda`).
Toma los PERFILES AGREGADOS de cada cluster (los que produce `project_clusters_2d`:
tamano, centroide en escala original, features distintivas y centroide en z-score)
y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una
descripcion de 1-2 frases, en espanol.
Clave de coste y privacidad: NO se envian filas crudas al LLM. Solo viaja el
perfil AGREGADO de cada grupo (tamano, % del total y la media de las features
distintivas con su signo respecto a la media global). El coste es minimo y ningun
dato fila-a-fila sale del proceso.
Reusa `ask_llm` del registry (grupo claude-direct, API directa con el token OAuth
de Claude en ~/.claude/.credentials.json, arranque 0). Impura: una llamada de red.
Estilo dict-no-throw: NUNCA lanza; ante cualquier fallo (red, LLM caido, parseo)
degrada a titulos genericos "Cluster N" + una nota explicando el motivo.
"""
import json
from core.ask_llm import ask_llm
_SYSTEM = (
"Eres un analista de datos. Recibes los PERFILES AGREGADOS de los clusters de "
"un KMeans (por cada grupo: su tamano y la media de sus features distintivas, "
"con el signo respecto a la media global; nunca filas crudas) y los describes "
"de forma sobria y util. Para cada cluster generas un titulo corto y "
"descriptivo (por ejemplo 'Vinos de alta acidez y baja graduacion') y una "
"descripcion de 1-2 frases. NO inventes causas ni sobre-interpretes: limitate a "
"lo que dicen los numeros. Responde en espanol. Responde SIEMPRE y SOLO con un "
"unico JSON array valido, sin texto alrededor y sin fences de markdown, con "
'EXACTAMENTE la forma [{"cluster": <int>, "title": "<titulo corto>", '
'"description": "<1-2 frases>"}], un objeto por cluster.'
)
def _fmt_num(value) -> str:
"""Formatea un numero de forma compacta para el prompt (None -> '?')."""
if value is None:
return "?"
if isinstance(value, bool):
return str(value)
if isinstance(value, float):
if value == int(value):
return str(int(value))
return f"{value:.4g}"
return str(value)
def _cluster_id(profile: dict, index: int) -> int:
"""Devuelve el id del cluster del perfil, o el indice si no es un int valido."""
raw = (profile or {}).get("cluster")
if isinstance(raw, bool):
return index
if isinstance(raw, int):
return raw
try:
return int(raw)
except (TypeError, ValueError):
return index
def _build_prompt(cluster_profiles: list, feature_names: list) -> str:
"""Construye un resumen textual compacto de los perfiles para el LLM.
Funcion interna PURA: no toca red ni disco, es testeable sin credenciales.
Por cada cluster incluye su numero, tamano (size + pct%) y, para cada feature
distintiva, el valor del centroide en escala original mas si esta por encima o
por debajo de la media (signo del z-score en centroid_z). Pasa AGREGADOS, nunca
dato crudo de filas.
Args:
cluster_profiles: lista de perfiles de cluster (forma de project_clusters_2d).
feature_names: nombres de las features del dataset (solo contexto).
Returns:
El texto del prompt.
"""
cluster_profiles = cluster_profiles or []
feature_names = feature_names if isinstance(feature_names, list) else []
lines = [
"Perfiles AGREGADOS de clusters de KMeans. No hay filas crudas, solo medias por grupo.",
f"Numero de clusters: {len(cluster_profiles)}",
]
if feature_names:
lines.append("Features del dataset: " + ", ".join(str(f) for f in feature_names))
lines.append("")
for i, prof in enumerate(cluster_profiles):
prof = prof or {}
cid = _cluster_id(prof, i)
size = prof.get("size")
pct = prof.get("pct")
pct_str = f"{pct:.1f}%" if isinstance(pct, (int, float)) and not isinstance(pct, bool) else "?"
lines.append(f"Cluster {cid}: tamano={_fmt_num(size)} ({pct_str} del total)")
distinctive = prof.get("distinctive") or []
centroid_o = prof.get("centroid_original") or {}
centroid_z = prof.get("centroid_z") or {}
if distinctive:
lines.append(" Features distintivas (media del grupo):")
for feat in distinctive:
val = centroid_o.get(feat)
z = centroid_z.get(feat)
direction = ""
if isinstance(z, (int, float)) and not isinstance(z, bool):
if z > 0:
direction = "por encima de la media"
elif z < 0:
direction = "por debajo de la media"
else:
direction = "en la media"
if direction:
lines.append(f" - {feat}: {_fmt_num(val)} ({direction})")
else:
lines.append(f" - {feat}: {_fmt_num(val)}")
else:
lines.append(" (sin features distintivas marcadas)")
lines.append("")
lines.append(
"Devuelve SOLO el JSON array descrito en las instrucciones del sistema, "
"sin texto antes ni despues."
)
return "\n".join(lines)
def _parse_clusters_json(text: str, n: int):
"""Extrae y normaliza el array JSON de la respuesta del LLM.
Funcion interna testeable sin red. Localiza el primer '[' y el ultimo ']' del
texto (tolerando texto basura alrededor o fences de markdown), hace json.loads
y normaliza cada entrada a {cluster:int, title:str, description:str}, rellenando
el cluster por indice si falta. NUNCA lanza: ante cualquier fallo devuelve None
(senal de degradacion para el caller).
Args:
text: respuesta cruda del LLM.
n: numero de perfiles esperados (referencia; la longitud real la marca el array).
Returns:
Lista normalizada de dicts, o None si no se pudo parsear un array valido.
"""
if not text or not isinstance(text, str):
return None
start = text.find("[")
end = text.rfind("]")
if start == -1 or end == -1 or end <= start:
return None
try:
data = json.loads(text[start : end + 1])
except (ValueError, TypeError):
return None
if not isinstance(data, list):
return None
out = []
for i, item in enumerate(data):
if not isinstance(item, dict):
out.append({"cluster": i, "title": f"Cluster {i}", "description": ""})
continue
raw_cluster = item.get("cluster")
if isinstance(raw_cluster, bool):
cluster = i
elif isinstance(raw_cluster, int):
cluster = raw_cluster
else:
try:
cluster = int(raw_cluster)
except (TypeError, ValueError):
cluster = i
title = item.get("title")
title = str(title) if title is not None else f"Cluster {cluster}"
desc = item.get("description")
desc = str(desc) if desc is not None else ""
out.append({"cluster": cluster, "title": title, "description": desc})
return out
def _generic_clusters(cluster_profiles: list) -> list:
"""Titulos genericos por cluster para la degradacion (sin LLM)."""
out = []
for i, prof in enumerate(cluster_profiles):
cid = _cluster_id(prof or {}, i)
out.append({"cluster": cid, "title": f"Cluster {cid}", "description": ""})
return out
def describe_clusters_llm(
cluster_profiles: list,
feature_names: list,
model: str = "claude-haiku-4-5-20251001",
) -> dict:
"""Describe los clusters de un KMeans con UNA sola llamada al LLM.
Args:
cluster_profiles: lista de perfiles de cluster (la forma que produce
project_clusters_2d): cada uno {"cluster": int, "size": int,
"pct": float, "centroid_original": {feature: media},
"distinctive": [features], "centroid_z": {feature: z}}. Solo se le
envia al LLM el resumen agregado, nunca filas crudas.
feature_names: nombres de las features del dataset (contexto para el LLM).
model: id del modelo Anthropic. Default claude-haiku-4-5-20251001
(haiku, coste bajo).
Returns:
dict dict-no-throw: {"clusters": [{cluster:int, title:str, description:str}],
"model": str, "note": str}. note == "" si todo fue bien; si el LLM no
respondio o el parseo fallo, clusters trae titulos genericos "Cluster N" y
note explica el motivo ("LLM no disponible" / "parse fallido"). Si
cluster_profiles esta vacio o no es lista, devuelve clusters=[] sin llamar
al LLM (note "sin clusters"). NUNCA lanza.
"""
if not isinstance(cluster_profiles, list) or not cluster_profiles:
return {"clusters": [], "model": model, "note": "sin clusters"}
n = len(cluster_profiles)
prompt = _build_prompt(cluster_profiles, feature_names)
try:
text = ask_llm(prompt, model=model, system=_SYSTEM, echo=False)
except Exception: # noqa: BLE001 — degradacion: cualquier fallo de red/LLM.
text = ""
parsed = _parse_clusters_json(text, n)
if parsed:
return {"clusters": parsed, "model": model, "note": ""}
note = "LLM no disponible" if not text else "parse fallido"
return {"clusters": _generic_clusters(cluster_profiles), "model": model, "note": note}
@@ -0,0 +1,160 @@
"""Tests para describe_clusters_llm.
NO acceden a red ni a credenciales: _parse_clusters_json es testeable aislada y la
unica via que llamaria al LLM (describe_clusters_llm) se prueba monkeypatcheando
ask_llm con respuestas simuladas. Cubre golden (LLM ok), edge (cluster faltante,
array envuelto en basura, lista vacia / input no-lista) y error (LLM caido, texto
no parseable) — todos sin tocar la red.
"""
import importlib
import json
from datascience.describe_clusters_llm import (
_parse_clusters_json,
describe_clusters_llm,
)
# Perfiles de ejemplo con la forma que produce project_clusters_2d.
_PROFILES = [
{
"cluster": 0,
"size": 60,
"pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1,
"size": 40,
"pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
_FEATURES = ["acidez", "alcohol", "azucar"]
def _patch_ask_llm(monkeypatch, returner):
"""Monkeypatchea ask_llm en el modulo bajo prueba con un callable simulado."""
mod = importlib.import_module("datascience.describe_clusters_llm")
monkeypatch.setattr(
mod, "ask_llm", lambda prompt, model="x", system="", echo=True: returner
)
# --- _parse_clusters_json (parser puro, sin red) ---
def test_parse_clusters_json_valid_array():
text = json.dumps(
[
{"cluster": 0, "title": "A", "description": "desc a"},
{"cluster": 1, "title": "B", "description": "desc b"},
]
)
parsed = _parse_clusters_json(text, 2)
assert parsed == [
{"cluster": 0, "title": "A", "description": "desc a"},
{"cluster": 1, "title": "B", "description": "desc b"},
]
def test_parse_clusters_json_wrapped_in_junk_text():
payload = [{"cluster": 0, "title": "Solo uno", "description": "d"}]
text = "Claro, aqui tienes el resultado:\n" + json.dumps(payload) + "\nEspero que sirva."
parsed = _parse_clusters_json(text, 1)
assert parsed[0]["title"] == "Solo uno"
assert parsed[0]["cluster"] == 0
def test_parse_clusters_json_non_json_returns_none():
# Texto sin array JSON -> degradacion (None) sin lanzar.
assert _parse_clusters_json("no hay json aqui", 2) is None
assert _parse_clusters_json("", 2) is None
assert _parse_clusters_json("{solo un objeto}", 2) is None
def test_parse_clusters_json_fills_missing_cluster_by_index():
text = json.dumps(
[
{"title": "A", "description": "d"},
{"title": "B", "description": "e"},
]
)
parsed = _parse_clusters_json(text, 2)
assert parsed[0]["cluster"] == 0
assert parsed[1]["cluster"] == 1
assert parsed[0]["title"] == "A"
# --- describe_clusters_llm (con ask_llm monkeypatcheado, sin red) ---
def test_describe_clusters_llm_ok_with_monkeypatched_llm(monkeypatch):
fake = json.dumps(
[
{
"cluster": 0,
"title": "Vinos de alta acidez",
"description": "Acidez por encima de la media y graduacion baja.",
},
{
"cluster": 1,
"title": "Vinos de alta graduacion",
"description": "Alcohol claramente por encima de la media.",
},
]
)
_patch_ask_llm(monkeypatch, fake)
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["note"] == ""
assert out["model"] == "claude-haiku-4-5-20251001"
assert len(out["clusters"]) == 2
assert out["clusters"][0]["title"] == "Vinos de alta acidez"
assert set(out["clusters"][0].keys()) == {"cluster", "title", "description"}
def test_describe_clusters_llm_degrades_on_empty_response(monkeypatch):
# ask_llm devuelve "" (error/red caida) -> titulos genericos + note.
_patch_ask_llm(monkeypatch, "")
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["clusters"][0]["title"] == "Cluster 0"
assert out["clusters"][1]["title"] == "Cluster 1"
assert out["clusters"][0]["description"] == ""
assert out["note"] == "LLM no disponible"
assert out["model"] == "claude-haiku-4-5-20251001"
def test_describe_clusters_llm_degrades_on_unparseable_response(monkeypatch):
_patch_ask_llm(monkeypatch, "lo siento, no puedo ayudarte con eso")
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["clusters"][0]["title"] == "Cluster 0"
assert out["clusters"][1]["title"] == "Cluster 1"
assert out["note"] == "parse fallido"
def test_describe_clusters_llm_empty_list_skips_llm(monkeypatch):
# Con lista vacia NO debe llamarse al LLM en absoluto.
def boom(*args, **kwargs):
raise AssertionError("ask_llm no debe llamarse con lista vacia")
mod = importlib.import_module("datascience.describe_clusters_llm")
monkeypatch.setattr(mod, "ask_llm", boom)
out = describe_clusters_llm([], _FEATURES)
assert out["clusters"] == []
assert out["note"] == "sin clusters"
def test_describe_clusters_llm_non_list_input_skips_llm():
# Input no-lista (None) -> clusters vacio sin tocar la red.
out = describe_clusters_llm(None, _FEATURES)
assert out["clusters"] == []
assert out["note"] == "sin clusters"
assert out["model"] == "claude-haiku-4-5-20251001"
@@ -0,0 +1,95 @@
---
name: project_clusters_2d
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def project_clusters_2d(columns: dict, k_min: int = 2, k_max: int = 8, max_points: int = 2000) -> dict"
description: "PCA a 2D + KMeans sobre el MISMO subset numerico estandarizado, devolviendo proyeccion 2D y labels de cluster ALINEADOS por fila para pintar un scatter PCA coloreado por cluster. Estandariza una sola vez, elige k por silhouette y proyecta centroides al espacio PCA. Determinista."
tags: [eda, models, clustering, pca, kmeans, scatter, dimensionality-reduction, datascience, sklearn]
params:
- name: columns
desc: "Mapa {nombre_columna: [valores numericos]}. Listas alineadas por fila (misma longitud). Columnas no numericas o con <2 valores distintos se descartan; None/NaN descartan la fila completa (listwise)."
- name: k_min
desc: "Numero minimo de clusters a probar por silhouette (default 2). El minimo de filas validas requerido es max(3, k_min*2)."
- name: k_max
desc: "Numero maximo de clusters a probar (default 8). Se acota a min(k_max, n_filas_validas-1)."
- name: max_points
desc: "Tope de puntos devueltos en points/labels (default 2000). Si n_used lo supera, points y labels se submuestrean CONJUNTAMENTE con paso determinista para seguir alineados; el fit usa siempre todas las filas."
output: "dict con points (proyeccion 2D, posiblemente submuestreada a max_points), labels (cluster de cada point, alineado con points), centers_2d (centroides en espacio PCA, len==best_k), best_k, silhouette, explained_2d ([var PC1, var PC2]), cluster_sizes (sobre n_used total), cluster_profiles (lista de {cluster, size, pct, centroid_original, distinctive top-3 por |z|, centroid_z}), feature_names, n_used (filas del fit antes de muestreo) y note (\"\" si ok). Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve best_k=0, listas vacias y note 'datos insuficientes' sin lanzar excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [numpy, scikit-learn]
tested: true
tests: ["test_golden_three_blobs_aligned_projection_and_clusters", "test_edge_subsampling_keeps_points_labels_aligned", "test_edge_single_numeric_column_insufficient", "test_edge_too_few_rows_insufficient", "test_edge_non_numeric_column_dropped_without_error", "test_edge_constant_column_dropped"]
test_file_path: "python/functions/datascience/project_clusters_2d_test.py"
file_path: "python/functions/datascience/project_clusters_2d.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.project_clusters_2d import project_clusters_2d
# Tres grupos gaussianos bien separados sobre 4 features.
import numpy as np
rng = np.random.default_rng(0)
rows = []
for center in (np.full(4, 0.0), np.full(4, 12.0), np.array([0.0, 12.0, 0.0, 12.0])):
rows.extend(rng.normal(loc=center, scale=0.4, size=(50, 4)))
mat = np.array(rows)
columns = {f"f{j}": [float(v) for v in mat[:, j]] for j in range(4)}
res = project_clusters_2d(columns, k_min=2, k_max=8)
print(res["best_k"]) # 3
print(len(res["points"]), len(res["labels"])) # 150 150 (alineados)
print(len(res["centers_2d"])) # == best_k
print([round(v, 2) for v in res["explained_2d"]]) # varianza de PC1, PC2
# Pintar: scatter(points[:,0], points[:,1], c=labels) + marcar centers_2d.
```
## Cuando usarla
Cuando, durante un EDA, quieres un scatter 2D de un dataset tabular numerico
coloreado por segmento descubierto automaticamente, y necesitas que cada punto
de la proyeccion lleve su etiqueta de cluster correcta. Usala en vez de
combinar `pca_explained` + `kmeans_segments` a mano: esas estandarizan por
separado y descartan los labels, asi que sus salidas no se pueden cruzar fila a
fila. Esta funcion garantiza esa alineacion (mismo X estandarizado para PCA y
KMeans) y ademas proyecta los centroides KMeans al espacio PCA para dibujarlos.
## Gotchas
- Funcion pura y determinista (StandardScaler + PCA random_state=0 + KMeans
random_state=0, n_init=10), pero requiere `numpy` y `scikit-learn` instalados.
- `points`/`labels` pueden venir submuestreados si `n_used > max_points` (paso
determinista `[::ceil(n_used/max_points)]`); `n_used`, `centers_2d`,
`cluster_sizes` y `cluster_profiles` se calculan SIEMPRE sobre todas las filas.
Cuando hay submuestreo, `note` lo indica.
- `centroid_z` y `distinctive` estan en z-score (espacio escalado);
`centroid_original` esta en las unidades originales (via
`scaler.inverse_transform`). No mezcles ambos al interpretar.
- `centers_2d` esta en el espacio PCA (coordenadas del scatter), no en unidades
originales: pintalo sobre el mismo eje que `points`.
- Silhouette baja con best_k alto sugiere que no hay estructura de cluster real;
el scatter puede no mostrar grupos separados.
## Notas
Pieza de composicion que `pca_explained` + `kmeans_segments` no cubren: ambas
estandarizan internamente por separado (cada una su propio `StandardScaler`) y
`kmeans_segments` no expone los labels por fila, por lo que no se pueden cruzar
con la `projection` de `pca_explained`. Esta funcion usa `sklearn` directo
(StandardScaler una sola vez compartido por PCA y KMeans) para garantizar la
alineacion `points[i] <-> labels[i]` y proyectar los centroides KMeans al
espacio PCA. Coercion y listwise deletion siguen el estilo de `pca_explained`
(None/NaN -> fila descartada, columnas no parseables o constantes descartadas).
Degrada con gracia: con <2 columnas numericas o <max(3, k_min*2) filas validas
devuelve `note: "datos insuficientes"` sin lanzar excepcion (try/except
defensivo en todo el cuerpo).
@@ -0,0 +1,208 @@
"""Proyeccion PCA-2D + KMeans sobre el mismo subset, con puntos y labels alineados.
Estandariza una sola vez las columnas numericas (z-score), proyecta a 2D con PCA
y clusteriza con KMeans sobre EXACTAMENTE la misma matriz escalada, de modo que
la proyeccion 2D (`points`) y la etiqueta de cluster (`labels`) quedan alineadas
fila a fila. Es la pieza que `pca_explained` + `kmeans_segments` no cubren: esas
dos estandarizan por separado y descartan los labels, asi que sus salidas no se
pueden cruzar para pintar un scatter PCA coloreado por cluster. Determinista.
"""
import math
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
def project_clusters_2d(
columns: dict,
k_min: int = 2,
k_max: int = 8,
max_points: int = 2000,
) -> dict:
"""Proyecta a 2D (PCA) y clusteriza (KMeans) el mismo subset estandarizado.
PCA a 2D y KMeans se ajustan sobre la MISMA matriz estandarizada, por lo que
`points` (proyeccion 2D) y `labels` (cluster por fila) quedan alineados por
indice. El k se elige automaticamente por silhouette en el rango
[k_min, min(k_max, n_rows-1)], igual criterio que `kmeans_segments`.
Determinista: StandardScaler + PCA(random_state=0) + KMeans(random_state=0,
n_init=10).
Args:
columns: mapa {nombre_columna: [valores numericos]}. Listas alineadas por
fila (misma longitud). Columnas no numericas o con menos de 2 valores
distintos se descartan. None/NaN marcan filas a descartar listwise
(una fila se elimina si cualquier feature falta).
k_min: numero minimo de clusters a probar (default 2).
k_max: numero maximo de clusters a probar (default 8). Se acota a
min(k_max, n_rows_validas-1).
max_points: tope de puntos devueltos en `points`/`labels`. Si las filas
usadas superan este tope, se submuestrea points y labels CONJUNTAMENTE
con paso determinista para mantenerlos alineados. El fit (best_k,
silhouette, centroides, perfiles) usa SIEMPRE todas las filas.
Returns:
dict con points (proyeccion 2D, posiblemente submuestreada a max_points),
labels (cluster de cada point, alineado con points), centers_2d
(centroides en espacio PCA, len == best_k), best_k, silhouette,
explained_2d (varianza de PC1 y PC2), cluster_sizes (sobre n_used total),
cluster_profiles (ver abajo), feature_names, n_used (filas del fit antes
de muestreo) y note ("" si ok). Cada entrada de cluster_profiles:
{cluster, size, pct, centroid_original (medias en escala original),
centroid_z (z del centroide), distinctive (top 3 features por |z|)}.
Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve
best_k=0 y note "datos insuficientes" sin lanzar excepcion.
"""
feature_names: list[str] = []
def insufficient(names: list[str], n_used: int) -> dict:
return {
"best_k": 0,
"points": [],
"labels": [],
"centers_2d": [],
"cluster_profiles": [],
"feature_names": names,
"n_used": int(n_used),
"note": "datos insuficientes",
}
try:
if not isinstance(columns, dict) or not columns:
return insufficient([], 0)
# 1. Coerce a numerico, descartando columnas no parseables o constantes.
numeric_cols: dict[str, list] = {}
for name, values in columns.items():
if not isinstance(values, (list, tuple)):
continue
coerced: list[float] = []
usable = True
for v in values:
if v is None:
coerced.append(math.nan)
continue
try:
coerced.append(float(v))
except (TypeError, ValueError):
usable = False
break
if not usable:
continue
# Menos de 2 valores distintos no aporta varianza -> descartar.
distinct = {x for x in coerced if not math.isnan(x)}
if len(distinct) < 2:
continue
numeric_cols[name] = coerced
feature_names = list(numeric_cols.keys())
if len(feature_names) < 2:
return insufficient(feature_names, 0)
# 2. Matriz alineada por fila + listwise deletion (cualquier NaN -> fuera).
matrix = np.array(
[numeric_cols[n] for n in feature_names], dtype=float
).T
valid_mask = ~np.isnan(matrix).any(axis=1)
data = matrix[valid_mask]
n_used = int(data.shape[0])
min_rows = max(3, k_min * 2)
if n_used < min_rows:
return insufficient(feature_names, n_used)
# 3. Estandarizar UNA sola vez (guardamos el scaler para desestandarizar).
scaler = StandardScaler()
X_scaled = scaler.fit_transform(data)
# 4. PCA a 2D sobre la matriz escalada.
pca = PCA(n_components=2, random_state=0)
pca.fit(X_scaled)
proj = pca.transform(X_scaled)
# 5. KMeans con seleccion automatica de k por silhouette (mismo X_scaled).
upper_k = min(k_max, n_used - 1)
if upper_k < k_min:
return insufficient(feature_names, n_used)
best = None # (silhouette, k, model, labels)
for k in range(k_min, upper_k + 1):
model = KMeans(n_clusters=k, n_init=10, random_state=0)
labels_k = model.fit_predict(X_scaled)
if len(set(labels_k)) < 2:
sil = -1.0
else:
sil = float(silhouette_score(X_scaled, labels_k))
if best is None or sil > best[0]:
best = (sil, k, model, labels_k)
best_sil, best_k, best_model, labels = best
# 6. Centroides KMeans (espacio escalado) proyectados al espacio PCA.
centers_2d = pca.transform(best_model.cluster_centers_)
# 7. Perfiles por cluster sobre TODAS las filas usadas.
centroids_original = scaler.inverse_transform(best_model.cluster_centers_)
cluster_sizes: list[int] = []
cluster_profiles: list[dict] = []
for c in range(best_k):
size = int(np.sum(labels == c))
cluster_sizes.append(size)
z_vec = best_model.cluster_centers_[c]
orig_vec = centroids_original[c]
centroid_z = {
feature_names[j]: float(z_vec[j]) for j in range(len(feature_names))
}
centroid_original = {
feature_names[j]: float(orig_vec[j])
for j in range(len(feature_names))
}
order = np.argsort(np.abs(z_vec))[::-1]
distinctive = [feature_names[int(j)] for j in order[:3]]
cluster_profiles.append(
{
"cluster": int(c),
"size": size,
"pct": float(size / n_used) if n_used else 0.0,
"centroid_original": centroid_original,
"distinctive": distinctive,
"centroid_z": centroid_z,
}
)
# 8. Muestreo determinista CONJUNTO de points + labels (mantiene alineacion).
note = ""
if n_used > max_points and max_points > 0:
step = math.ceil(n_used / max_points)
proj_out = proj[::step]
labels_out = labels[::step]
note = f"submuestreado a {len(proj_out)} de {n_used} puntos para visualizacion"
else:
proj_out = proj
labels_out = labels
points = [[float(row[0]), float(row[1])] for row in proj_out]
labels_list = [int(v) for v in labels_out]
centers_list = [[float(row[0]), float(row[1])] for row in centers_2d]
explained_2d = [float(x) for x in pca.explained_variance_ratio_]
return {
"points": points,
"labels": labels_list,
"centers_2d": centers_list,
"best_k": int(best_k),
"silhouette": float(best_sil),
"explained_2d": explained_2d,
"cluster_sizes": cluster_sizes,
"cluster_profiles": cluster_profiles,
"feature_names": feature_names,
"n_used": n_used,
"note": note,
}
except Exception:
# Lectura defensiva: nunca propagar excepciones al caller del EDA.
return insufficient(feature_names, 0)
@@ -0,0 +1,127 @@
"""Tests para project_clusters_2d."""
import numpy as np
from project_clusters_2d import project_clusters_2d
def _three_blobs(seed: int = 0, per_blob: int = 50, n_features: int = 4):
"""Genera 3 gaussianas bien separadas en n_features dims, alineadas por fila.
Devuelve un dict {col: [valores]} con las columnas alineadas por fila.
"""
rng = np.random.default_rng(seed)
base_centers = [
np.full(n_features, 0.0),
np.full(n_features, 12.0),
np.array([0.0, 12.0, 0.0, 12.0][:n_features] + [0.0] * max(0, n_features - 4)),
]
rows: list[np.ndarray] = []
for center in base_centers:
pts = rng.normal(loc=center, scale=0.4, size=(per_blob, n_features))
rows.extend(pts)
mat = np.array(rows)
return {f"f{j}": [float(v) for v in mat[:, j]] for j in range(n_features)}
def test_golden_three_blobs_aligned_projection_and_clusters():
columns = _three_blobs(seed=0, per_blob=50, n_features=4)
result = project_clusters_2d(columns, k_min=2, k_max=8)
n_used = result["n_used"]
assert n_used == 150
assert result["note"] == ""
best_k = result["best_k"]
assert 2 <= best_k <= 4
# points y labels alineados por fila.
assert len(result["points"]) == len(result["labels"])
assert len(result["points"]) == n_used # sin submuestreo (150 < 2000)
# Cada punto es un par (x, y).
assert all(len(p) == 2 for p in result["points"])
# Labels dentro del rango [0, best_k).
assert all(0 <= lbl < best_k for lbl in result["labels"])
# Centroides 2D: uno por cluster.
assert len(result["centers_2d"]) == best_k
assert all(len(c) == 2 for c in result["centers_2d"])
# Varianza explicada de los 2 componentes.
assert len(result["explained_2d"]) == 2
# cluster_sizes cubre todas las filas usadas.
assert sum(result["cluster_sizes"]) == n_used
assert len(result["cluster_sizes"]) == best_k
# cluster_profiles: una entrada por cluster, con centroid_original poblado.
assert len(result["cluster_profiles"]) == best_k
for prof in result["cluster_profiles"]:
assert set(prof["centroid_original"].keys()) == set(result["feature_names"])
assert set(prof["centroid_z"].keys()) == set(result["feature_names"])
assert 1 <= len(prof["distinctive"]) <= 3
assert prof["size"] >= 0
assert 0.0 <= prof["pct"] <= 1.0
def test_edge_subsampling_keeps_points_labels_aligned():
# max_points pequeño fuerza submuestreo conjunto de points + labels.
columns = _three_blobs(seed=1, per_blob=50, n_features=3)
result = project_clusters_2d(columns, k_min=2, k_max=6, max_points=40)
n_used = result["n_used"]
assert n_used == 150 # el fit usa todas las filas
# points y labels submuestreados pero siempre con la misma longitud.
assert len(result["points"]) == len(result["labels"])
assert len(result["points"]) <= 40
# centers/sizes/profiles se calculan sobre TODOS los puntos.
assert sum(result["cluster_sizes"]) == n_used
assert len(result["centers_2d"]) == result["best_k"]
assert result["note"] != "" # senala el submuestreo
def test_edge_single_numeric_column_insufficient():
columns = {"x": [float(i) for i in range(50)]}
result = project_clusters_2d(columns, k_min=2, k_max=8)
assert result["best_k"] == 0
assert result["note"] == "datos insuficientes"
assert result["points"] == []
assert result["labels"] == []
assert result["centers_2d"] == []
assert result["cluster_profiles"] == []
def test_edge_too_few_rows_insufficient():
# Solo 2 filas validas, min_rows = max(3, k_min*2) = 4 -> insuficiente.
columns = {"x": [1.0, 5.0], "y": [2.0, 9.0]}
result = project_clusters_2d(columns, k_min=2, k_max=8)
assert result["best_k"] == 0
assert result["note"] == "datos insuficientes"
def test_edge_non_numeric_column_dropped_without_error():
# La columna de strings se descarta; quedan 3 numericas -> funciona.
columns = _three_blobs(seed=2, per_blob=50, n_features=3)
columns["label"] = ["a"] * len(columns["f0"])
result = project_clusters_2d(columns, k_min=2, k_max=6)
assert result["best_k"] >= 2
assert "label" not in result["feature_names"]
assert set(result["feature_names"]) == {"f0", "f1", "f2"}
assert len(result["points"]) == len(result["labels"])
def test_edge_constant_column_dropped():
# Una columna constante (0 varianza) se descarta por <2 valores distintos.
columns = _three_blobs(seed=3, per_blob=50, n_features=3)
columns["const"] = [7.0] * len(columns["f0"])
result = project_clusters_2d(columns, k_min=2, k_max=6)
assert "const" not in result["feature_names"]
assert result["best_k"] >= 2