Compare commits

..

1 Commits

Author SHA1 Message Date
egutierrez 03f3dca823 feat(eda): capítulo CORRELACION de AutomaticEDA (matriz + top pares ±)
Implementa chapters/correlacion.py siguiendo el contrato de capítulos:
build_correlacion(profile, ctx) -> Chapter|None, CHAPTER_VERSION="1.0.0".

Consume profile['correlations'] (salida de association_matrix del grupo eda,
sin recalcular estadística) y emite, como bloques del modelo:

- Matriz de asociación (Figure/heatmap perezoso, RdBu_r, con signo en num-num
  y magnitud en métricas mixtas; etiquetas ordenadas por conectividad y
  recortadas a las 16 más conectadas para legibilidad).
- TOP de pares POSITIVOS y TOP de pares NEGATIVOS en dos DataTable separadas
  (los negativos son por construcción num-num, único método con signo), con
  método, valor, p-valor corregido (FDR) y significancia.
- Resumen FDR (multiple_testing) + leyenda de métodos.
- Aviso de espuriedad por niveles no estacionarios (Granger-Newbold) cuando el
  profile lo marca.

Lectura defensiva en todo (None si no hay pares; nunca lanza). Anti-cortes:
sólo bloques del modelo, el paginador parte tablas repitiendo cabecera y escala
la figura entera.

Test self-contained (5 casos): golden a nivel de bloques + golden render
PDF/PPTX, edge sin pares -> None, edge sólo positivos -> nota honesta, y
anti-corte con matriz ancha + etiquetas largas (dato íntegro a nivel de bloque,
ambos renderers sin reventar).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 14:59:50 +02:00
11 changed files with 527 additions and 1688 deletions
-4
View File
@@ -42,8 +42,6 @@ from .isolation_forest_outliers import isolation_forest_outliers
from .normality_tests import normality_tests
from .trend_slope import trend_slope
from .run_eda_models import run_eda_models
from .project_clusters_2d import project_clusters_2d
from .describe_clusters_llm import describe_clusters_llm
from .eda_llm_insights import eda_llm_insights
from .build_eda_notebook import build_eda_notebook
from .decode_qr_image import decode_qr_image
@@ -88,8 +86,6 @@ __all__ = [
"normality_tests",
"trend_slope",
"run_eda_models",
"project_clusters_2d",
"describe_clusters_llm",
"eda_llm_insights",
"build_eda_notebook",
"describe_numeric",
@@ -0,0 +1,352 @@
"""Correlation chapter — association matrix plus top positive/negative pairs.
Builds the CORRELACION chapter of an AutomaticEDA document from a TableProfile.
It renders exactly what the user asked for:
1. A correlation/association **matrix** (heatmap) reconstructed from the evaluated
pairs, signed for numeric-numeric pairs (Pearson/Spearman, ``[-1, 1]``) and as
magnitude for the mixed-type metrics (Cramér's V, correlation ratio, mutual
information, ``[0, 1]``). Labels are ordered by total connectivity so strong
associations cluster together instead of being scattered alphabetically.
2. The **TOP positive** pairs and the **TOP negative** pairs as two separate
tables. Only numeric-numeric metrics carry a sign, so negative pairs are by
construction Pearson/Spearman; positive pairs may use any method.
3. The methods legend and the multiple-testing (FDR) summary, so the reader sees
how many pairs survive the correction.
4. A spuriousness caveat when the profile flags level-based correlations on
non-stationary series (Granger-Newbold).
All data comes from ``profile['correlations']`` — the output of the ``eda`` group
function ``association_matrix`` (optionally enriched by ``profile_table``). The
chapter never recomputes any statistic; it only lays the existing values out as
format-independent blocks. The renderers paginate tables (repeating the header)
and scale the heatmap to fit entirely, so nothing is ever cut.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
import math
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "correlacion"
CHAPTER_TITLE = "Correlación"
# Methods whose value carries a sign (direction). Everything else is a magnitude
# in [0, 1] and therefore only ever contributes to the positive side.
_SIGNED_METHODS = ("pearson", "spearman")
# Cap the heatmap to the most-connected variables so it stays legible on a phone
# screen / a slide. The renderer would scale a bigger matrix to fit, but the
# cells become unreadable; we instead show the top-N and say so.
_MAX_MATRIX_LABELS = 16
# How many pairs to show in each of the top-positive / top-negative tables.
_TOP_N = 10
def _is_num(v) -> bool:
"""True for a real, finite int/float (not bool, not NaN/inf)."""
return (
isinstance(v, (int, float))
and not isinstance(v, bool)
and not (isinstance(v, float) and (math.isnan(v) or math.isinf(v)))
)
def _fmt_val(value, decimals: int = 2) -> str:
    """Format an association value as compact, explicitly signed text.

    Args:
        value: The association value. Anything that is not a finite int/float
            (as decided by ``_is_num``) yields an empty string.
        decimals: Number of decimal places to print.

    Returns:
        The value with a leading ``+`` or ``-``, except that a value which
        rounds to zero at the requested precision is returned without a sign
        (``0.00`` for the default two decimals).
    """
    if not _is_num(value):
        return ""
    text = f"{float(value):+.{decimals}f}"
    # A value that rounds to zero would read "+0.00" / "-0.00"; drop the sign.
    # Testing the rounded text itself (instead of a hard-coded "0.00") keeps
    # the clean-up correct for every ``decimals``, not just two.
    if float(text) == 0.0:
        return text[1:]
    return text
def _fmt_p(value) -> str:
    """Render an adjusted p-value with three decimals.

    Non-numeric input gives an empty string; anything below 0.001 is shown as
    the threshold ``<0.001`` rather than as a row of zeros.
    """
    if _is_num(value):
        p_adj = float(value)
        return "<0.001" if p_adj < 0.001 else f"{p_adj:.3f}"
    return ""
def _is_signed(pair: dict) -> bool:
    """Report whether the pair's method yields a directional (signed) value.

    The method name is lower-cased and searched for each ``_SIGNED_METHODS``
    entry as a substring, so a combined label such as ``pearson/spearman``
    counts as signed. A missing or empty method is never signed.
    """
    name = str(pair.get("method") or "").lower()
    for signed in _SIGNED_METHODS:
        if signed in name:
            return True
    return False
def _significant(pair: dict) -> bool:
"""True if the pair is significant after FDR (or has no test to correct)."""
if pair.get("significant") is True:
return True
# Pairs without an applicable test (p_value None) are not penalised: they are
# admitted on magnitude alone upstream, so treat missing as "not rejected".
return pair.get("p_value") is None and pair.get("significant") is None
def _label(pair: dict) -> str:
    """Return the human-readable label of a pair, e.g. 'alcohol ↔ density'.

    Both endpoint names (``a`` and ``b``) go through ``model._safe_str`` and
    are joined with a `` ↔ `` separator. Without the separator the two names
    ran together (``alcoholdensity``) and could not be told apart in a table
    cell.
    """
    left = model._safe_str(pair.get("a"))
    right = model._safe_str(pair.get("b"))
    return f"{left} ↔ {right}"
def _split_top(pairs: list, top_n: int = _TOP_N):
    """Pick the strongest positive and the strongest negative pairs.

    Entries that are not dicts, or whose ``value`` is not a finite number, are
    ignored; so are exact zeros. Every pair with a value above zero is a
    positive candidate. A pair with a value below zero is a negative candidate
    only when its method is signed (see ``_is_signed``).

    Returns:
        ``(positive, negative)``: positives sorted from largest value down,
        negatives from most negative up, each cut to ``top_n`` entries.
    """
    def _value_of(item):
        return float(item.get("value", 0.0))

    ups, downs = [], []
    for item in pairs:
        if not isinstance(item, dict):
            continue
        val = item.get("value")
        if not _is_num(val):
            continue
        if val > 0:
            ups.append(item)
        elif val < 0 and _is_signed(item):
            downs.append(item)
    ranked_up = sorted(ups, key=_value_of, reverse=True)
    ranked_down = sorted(downs, key=_value_of)
    return ranked_up[:top_n], ranked_down[:top_n]
def _top_table(pairs: list, title: str):
    """Turn a list of pairs into a ``model.DataTable`` (``None`` if empty).

    One row per pair: label, method, signed value, adjusted p-value and a
    significance cell that is empty for significant pairs and ``no``
    otherwise.
    """
    if not pairs:
        return None
    body = [
        [
            _label(item),
            model._safe_str(item.get("method")) or "",
            _fmt_val(item.get("value")),
            _fmt_p(item.get("p_value_adjusted")),
            "" if _significant(item) else "no",
        ]
        for item in pairs
    ]
    columns = ["Par", "Método", "Valor", "p (FDR)", "Sig."]
    return model.DataTable(header=columns, rows=body, title=title)
def _ordered_labels(pairs: list):
    """Choose the matrix axis labels, most connected variable first.

    Each variable's connectivity is the sum of ``abs(value)`` over the pairs
    it takes part in (non-dict entries, non-numeric values and ``None`` names
    are skipped).

    Returns:
        ``(labels, trimmed)``: at most ``_MAX_MATRIX_LABELS`` names in
        descending connectivity, and whether any name was left out by that
        cap. ``([], False)`` when no variable could be scored.
    """
    totals: dict = {}
    for item in pairs:
        if not isinstance(item, dict):
            continue
        val = item.get("value")
        if not _is_num(val):
            continue
        weight = abs(float(val))
        for endpoint in (item.get("a"), item.get("b")):
            if endpoint is not None:
                totals[endpoint] = totals.get(endpoint, 0.0) + weight
    if not totals:
        return [], False
    ranked = sorted(totals, key=lambda name: totals[name], reverse=True)
    return ranked[:_MAX_MATRIX_LABELS], len(ranked) > _MAX_MATRIX_LABELS
def _matrix_figure(pairs: list, labels: list):
    """Return a lazy Figure with the signed association heatmap, or None.

    Args:
        pairs: Evaluated pair records. Only dict entries whose ``a`` and ``b``
            are both in ``labels`` and whose ``value`` is a finite number are
            drawn; every other cell stays unevaluated (grey).
        labels: Variable names for both axes, in display order.

    Returns:
        ``None`` when fewer than two labels are given; otherwise a
        ``model.Figure`` wrapping a zero-argument ``make`` callable. numpy and
        matplotlib are imported inside ``make`` only, so building the block
        does not import them.
    """
    if len(labels) < 2:
        return None
    index = {name: i for i, name in enumerate(labels)}

    def make():
        import matplotlib
        import numpy as np
        from matplotlib.figure import Figure

        n = len(labels)
        # NaN marks "pair not evaluated"; the diagonal is a variable with itself.
        grid = np.full((n, n), np.nan, dtype=float)
        for i in range(n):
            grid[i, i] = 1.0
        for pair in pairs:
            if not isinstance(pair, dict):
                continue
            a = pair.get("a")
            b = pair.get("b")
            value = pair.get("value")
            if a not in index or b not in index or not _is_num(value):
                continue
            v = float(value)
            # Mixed-type magnitudes are non-negative; keep them as-is on [0, 1].
            ia, ib = index[a], index[b]
            grid[ia, ib] = v
            grid[ib, ia] = v

        masked = np.ma.masked_invalid(grid)
        fig = Figure(figsize=(6.2, 5.6))
        ax = fig.add_subplot(111)
        # RdBu_r runs blue -> white -> red, so with vmin=-1 / vmax=+1 negative
        # values are blue and positive values are red.
        cmap = matplotlib.colormaps["RdBu_r"].copy()
        cmap.set_bad(color="#eeeeee")
        im = ax.imshow(masked, cmap=cmap, vmin=-1.0, vmax=1.0, aspect="auto")
        short = [str(s)[:14] for s in labels]
        ax.set_xticks(range(n))
        ax.set_yticks(range(n))
        ax.set_xticklabels(short, rotation=90, fontsize=7)
        ax.set_yticklabels(short, fontsize=7)
        # Annotate cells only when the matrix is small enough to stay legible.
        if n <= 8:
            for i in range(n):
                for j in range(n):
                    cell = grid[i, j]
                    if _is_num(cell):
                        ax.text(j, i, f"{cell:.2f}",
                                ha="center", va="center", fontsize=6,
                                color="#222222")
        fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04,
                     label="asociación (signo en num-num)")
        fig.tight_layout()
        return fig

    # The caption must agree with the colormap above: red = positive.
    return model.Figure(make=make,
                        caption="Matriz de asociación. Rojo = positiva, azul = "
                                "negativa (sólo num-num lleva signo); gris = par "
                                "no evaluado.")
def _methods_block(corr: dict):
    """Build the methods legend as a ``model.KVTable``.

    Reads ``corr['methods_legend']``; returns ``None`` unless it is a
    non-empty dict. Keys and values are both passed through
    ``model._safe_str``.
    """
    legend = corr.get("methods_legend")
    if not (isinstance(legend, dict) and legend):
        return None
    entries = []
    for name, meaning in legend.items():
        entries.append((model._safe_str(name), model._safe_str(meaning)))
    return model.KVTable(rows=entries, title="Métodos de asociación")
def _fdr_text(corr: dict) -> str | None:
    """Summarise the multiple-testing correction in one line of text.

    Reads ``corr['multiple_testing']``; returns ``None`` unless it is a
    non-empty dict. The first sentence names the method (upper-cased, ``FDR``
    when empty) and the alpha when it is numeric. A second sentence with the
    test and rejection counts is added only when ``n_tests`` is numeric.
    """
    mt = corr.get("multiple_testing")
    if not (isinstance(mt, dict) and mt):
        return None
    method = model._safe_str(mt.get("method")).upper() or "FDR"
    alpha = mt.get("alpha")
    n_tests = mt.get("n_tests")
    n_rej = mt.get("n_rejected")

    alpha_part = f", α={float(alpha):g}" if _is_num(alpha) else ""
    summary = f"Corrección por comparaciones múltiples ({method}" + alpha_part + ")."
    if not _is_num(n_tests):
        return summary
    survivors = n_rej if _is_num(n_rej) else ""
    counts = (
        f"De {int(n_tests)} pares con test, {survivors} siguen siendo "
        f"significativos tras la corrección.")
    return summary + " " + counts
def build_correlacion(profile: dict, ctx: dict):
    """Build the Correlation Chapter, or None if there is nothing to show.

    Reads ``profile['correlations']`` and lays its values out as blocks, in
    this order: intro, association matrix (plus a note when the label cap
    trimmed it), top-positive table, top-negative table (or a note saying
    there are none), spuriousness caveat, FDR summary, methods legend.

    Args:
        profile: The table profile. Anything that is not a dict is treated as
            "no data".
        ctx: Accepted for the chapter contract; this chapter reads no key
            from it.

    Returns:
        A ``model.Chapter``, or ``None`` when ``profile`` is not a dict,
        ``correlations`` is not a dict, ``pairs`` is not a non-empty list, or
        the pairs yield neither a matrix nor any top table (so the chapter is
        omitted instead of showing only boilerplate).
    """
    if not isinstance(profile, dict):
        return None
    corr = profile.get("correlations")
    if not isinstance(corr, dict):
        return None
    pairs = corr.get("pairs")
    if not isinstance(pairs, list) or not pairs:
        return None

    # Derive the data-bearing blocks first: the intro below is unconditional,
    # so an "is the block list empty?" check afterwards could never fire.
    labels, trimmed = _ordered_labels(pairs)
    fig = _matrix_figure(pairs, labels)
    positive, negative = _split_top(pairs, _TOP_N)
    pos_table = _top_table(positive, f"Top {len(positive)} positivas")
    neg_table = _top_table(negative, f"Top {len(negative)} negativas")
    if fig is None and pos_table is None and neg_table is None:
        return None

    blocks: list = []
    # Intro: what this chapter shows and how to read the sign.
    blocks.append(model.Markdown(text=(
        "Asociación entre columnas. Cada par se evalúa con la métrica adecuada a "
        "sus tipos (Pearson/Spearman entre numéricas — con **signo**; Cramér's V "
        "entre categóricas; razón de correlación num-categórica; información mutua "
        "como medida común no lineal). Sólo las correlaciones **num-num** tienen "
        "dirección: por eso los pares **negativos** son siempre num-num.")))

    # 1) Association matrix (heatmap).
    if fig is not None:
        blocks.append(model.Heading(text="Matriz de asociación", level=2))
        blocks.append(fig)
        if trimmed:
            blocks.append(model.Note(text=(
                f"Se muestran las {len(labels)} variables más conectadas de la "
                "matriz para mantenerla legible; el resto de pares siguen en las "
                "tablas de abajo.")))

    # 2) Top positive / top negative pairs.
    if pos_table is not None:
        blocks.append(model.Heading(text="Pares más correlacionados (positivos)",
                                    level=2))
        blocks.append(pos_table)
    if neg_table is not None:
        blocks.append(model.Heading(text="Pares más correlacionados (negativos)",
                                    level=2))
        blocks.append(neg_table)
    elif pos_table is not None:
        # No signed-negative pairs at all: say so honestly rather than omit.
        blocks.append(model.Note(text=(
            "No se han hallado correlaciones negativas significativas entre "
            "columnas numéricas.")))

    # 3) Spuriousness caveat for level-based correlations (Granger-Newbold).
    # A caveat text supplied by the profile wins over the generic warning.
    caveat = corr.get("levels_caveat")
    if isinstance(caveat, str) and caveat.strip():
        blocks.append(model.Note(text=caveat.strip()))
    elif corr.get("levels_possible_spurious"):
        blocks.append(model.Note(text=(
            "Aviso: algunas correlaciones se calcularon sobre niveles de series "
            "no estacionarias y pueden ser espurias (Granger-Newbold). Compáralas "
            "sobre los retornos/diferencias antes de interpretarlas.")))

    # 4) FDR summary + methods legend.
    fdr_text = _fdr_text(corr)
    if fdr_text:
        blocks.append(model.Markdown(text=fdr_text))
    methods = _methods_block(corr)
    if methods is not None:
        blocks.append(model.Heading(text="Métodos y leyenda", level=2))
        blocks.append(methods)

    return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
                         version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,175 @@
"""Tests for the CORRELACION chapter — DoD: golden + edges + error/anti-cut.
Self-contained: builds a synthetic TableProfile carrying a ``correlations`` block
shaped exactly like ``association_matrix`` output (no DuckDB), so the suite is
fast and deterministic. Verifies that the chapter emits the association-matrix
figure plus separate top-positive / top-negative tables with the right pairs,
that it returns None when the profile has no pairs, that a None/empty profile
does not raise, and that a wide matrix with long labels renders to PDF *and* PPTX
without cutting anything.
"""
import os
import re
import tempfile
from pypdf import PdfReader
from datascience.automatic_eda.chapters.correlacion import (
CHAPTER_VERSION,
build_correlacion,
)
from datascience.automatic_eda.model import DataTable, Figure
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
def _pair(a, b, value, method, padj, sig, p=0.0001):
return {
"a": a, "b": b, "a_type": "numeric", "b_type": "numeric",
"method": method, "value": value, "extra": {"mi": abs(value) * 0.5},
"p_value": p, "p_value_adjusted": padj, "significant": sig,
}
def _profile() -> dict:
    """Return the synthetic wine-like profile used by the golden tests.

    It carries four signed numeric pairs (two positive, two negative) plus one
    unsigned Cramér's V pair, a methods legend and a multiple-testing summary.
    """
    signed_rows = [
        ("alcohol", "quality", 0.48, 0.0005),
        ("density", "alcohol", -0.78, 0.0001),
        ("ph", "fixed_acidity", -0.68, 0.0002),
        ("sulphates", "quality", 0.25, 0.03),
    ]
    pairs = [
        _pair(left, right, value, "pearson/spearman", padj, True)
        for left, right, value, padj in signed_rows
    ]
    # Unsigned mixed-type metrics: only ever positive, never in the neg table.
    pairs.append({
        "a": "region", "b": "type", "a_type": "categorical",
        "b_type": "categorical", "method": "cramers_v", "value": 0.55,
        "extra": {"mi": 0.3}, "p_value": 0.001, "p_value_adjusted": 0.004,
        "significant": True,
    })
    correlations = {
        "pairs": pairs,
        "strong": [entry for entry in pairs if abs(entry["value"]) >= 0.5],
        "methods_legend": {
            "pearson": "num-num lineal (Pearson r), [-1, 1]",
            "cramers_v": "cat-cat simétrica (Cramér's V), [0, 1]",
        },
        "multiple_testing": {"method": "bh", "alpha": 0.05,
                             "n_tests": 5, "n_rejected": 5},
    }
    return {
        "table": "wine",
        "source": "/data/wine.csv",
        "n_rows": 1599,
        "n_cols": 12,
        "correlations": correlations,
    }
def _pdf_text(path: str) -> str:
    """Extract the text of every page of a PDF, with whitespace collapsed.

    Pages whose extraction yields nothing contribute an empty string; any run
    of whitespace in the joined text becomes a single space.
    """
    chunks = []
    for page in PdfReader(path).pages:
        chunks.append(page.extract_text() or "")
    return re.sub(r"\s+", " ", "".join(chunks))
def test_golden_chapter_tiene_matriz_y_top_positivos_y_negativos():
    """Golden, block level: matrix figure plus positive and negative tables."""
    chapter = build_correlacion(_profile(), {})
    assert chapter is not None
    assert chapter.id == "correlacion"
    assert chapter.version == CHAPTER_VERSION

    # Association matrix heatmap, delivered as a lazy figure.
    assert "figure" in [blk.kind for blk in chapter.blocks]
    figures = [blk for blk in chapter.blocks if isinstance(blk, Figure)]
    assert figures and figures[0].make is not None

    # Top positive + top negative.
    data_tables = [blk for blk in chapter.blocks if isinstance(blk, DataTable)]
    assert len(data_tables) >= 2
    cell_text = " ".join(
        str(cell) for tbl in data_tables for row in tbl.rows for cell in row)
    # Strongest positive present and signed +, strongest negative present and -.
    for expected in ("alcohol", "quality", "+0.48", "density", "-0.78"):
        assert expected in cell_text
def test_golden_render_pdf_y_pptx_muestran_lo_exigido():
    """Golden, render level: PDF and PPTX both contain the chapter."""
    profile = _profile()
    with tempfile.TemporaryDirectory() as workdir:
        pdf_path = os.path.join(workdir, "corr.pdf")
        pptx_path = os.path.join(workdir, "corr.pptx")
        pdf_result = render_automatic_eda_pdf(
            profile, pdf_path, {"title": "EDA — wine"})
        pptx_result = render_automatic_eda_pptx(
            profile, pptx_path, {"title": "EDA — wine"})

        assert pdf_result["path"] == pdf_path and pdf_result["n_pages"] >= 1
        assert pptx_result["path"] == pptx_path and pptx_result["n_slides"] >= 1
        for result in (pdf_result, pptx_result):
            assert "correlacion" in [c["id"] for c in result["chapters"]]

        text = _pdf_text(pdf_path)
        # The requirement: matrix + top positive/negative pairs, all visible.
        assert "Correlaci" in text  # chapter title (accents may vary in extract).
        for name in ("density", "alcohol", "quality"):
            assert name in text
        assert "0.78" in text and "0.48" in text
        # Both signs surfaced as separate sections.
        lowered = text.lower()
        assert "positiv" in lowered and "negativ" in lowered
def test_edge_sin_pares_devuelve_none():
    """Edge: profiles without usable pairs yield None instead of raising."""
    # No correlations key, empty pairs, and wrong types all yield None, not error.
    degenerate_profiles = [
        {"table": "x"},
        {"correlations": {}},
        {"correlations": {"pairs": []}},
        {"correlations": {"pairs": "nope"}},
        {},
    ]
    for candidate in degenerate_profiles:
        assert build_correlacion(candidate, {}) is None
    assert build_correlacion(None, None) is None
def test_edge_solo_positivos_emite_nota_sin_tabla_negativa():
    """Edge: with only positive pairs there is one table and a note."""
    categorical_pair = {
        "a": "c", "b": "d", "a_type": "categorical",
        "b_type": "categorical", "method": "cramers_v", "value": 0.7,
        "extra": {"mi": 0.4}, "p_value": 0.001,
        "p_value_adjusted": 0.003, "significant": True,
    }
    numeric_pair = _pair("a", "b", 0.6, "pearson/spearman", 0.001, True)
    profile = {"correlations": {"pairs": [numeric_pair, categorical_pair]}}

    chapter = build_correlacion(profile, {})
    assert chapter is not None
    data_tables = [blk for blk in chapter.blocks if isinstance(blk, DataTable)]
    assert len(data_tables) == 1  # only the positive table.
    note_text = " ".join(blk.text for blk in chapter.blocks if blk.kind == "note")
    assert "negativas" in note_text  # honest "no negative correlations" note.
def test_anticorte_matriz_ancha_y_etiquetas_largas_no_se_cortan():
    """Anti-cut: a wide matrix with long labels builds and renders whole."""
    # 20 numeric vars with long names -> matrix trimmed to top-N + both renderers
    # must lay the chapter out without raising and keep a long label intact.
    long_a = "concentracion_de_dioxido_de_azufre_libre"
    long_b = "concentracion_de_dioxido_de_azufre_total"
    pairs = [_pair(long_a, long_b, -0.72, "pearson/spearman", 0.0001, True)]
    pairs.extend(
        _pair(f"variable_numerica_larga_{i:02d}",
              f"variable_numerica_larga_{(i + 1) % 20:02d}",
              0.55 - i * 0.02, "pearson/spearman", 0.01, True)
        for i in range(20)
    )
    testing = {"method": "bh", "alpha": 0.05,
               "n_tests": len(pairs), "n_rejected": len(pairs)}
    profile = {"correlations": {"pairs": pairs, "multiple_testing": testing}}

    chapter = build_correlacion(profile, {})
    assert chapter is not None
    # A "showing top-N most connected" note appears when the matrix is trimmed.
    note_text = " ".join(blk.text for blk in chapter.blocks if blk.kind == "note")
    assert "más conectadas" in note_text

    # Anti-cut guarantee at the block level: both long labels are present,
    # intact, in one table cell (the block itself never truncates them).
    data_tables = [blk for blk in chapter.blocks if isinstance(blk, DataTable)]
    cells = [str(cell) for tbl in data_tables for row in tbl.rows for cell in row]
    assert any(long_a in cell and long_b in cell for cell in cells)

    with tempfile.TemporaryDirectory() as workdir:
        pdf_path = os.path.join(workdir, "wide.pdf")
        pptx_path = os.path.join(workdir, "wide.pptx")
        pdf_result = render_automatic_eda_pdf(
            profile, pdf_path, {"write_manifest": False})
        pptx_result = render_automatic_eda_pptx(
            profile, pptx_path, {"write_manifest": False})
        # Both renderers lay the wide chapter out without raising and produce a
        # non-empty document.
        assert pdf_result["path"] == pdf_path
        assert os.path.exists(pdf_path) and pdf_result["n_pages"] >= 1
        assert pptx_result["path"] == pptx_path
        assert os.path.exists(pptx_path) and pptx_result["n_slides"] >= 1
        # A short, unbreakable fragment of the long label survives the wrap.
        assert "azufre" in _pdf_text(pdf_path)
@@ -1,498 +0,0 @@
"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown.
Builds the *Modelos* chapter of an AutomaticEDA document from the ``models``
block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers,
normality}``). It renders, as structured markdown/tables/figures that the core
paginator never cuts:
1. **Normalization note** — every multivariate model below standardizes the
columns with z-score first; the chapter explains why (different scales would
otherwise dominate distance/variance).
2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus
variance and top-loadings tables.
3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own
page/slide), the cluster-size table, and a per-cluster LLM micro-analysis
with a title for each segment.
4. **Isolation Forest outliers** — a short explanation of how anomalous rows are
isolated multivariately and how the threshold is chosen, plus the counts.
5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts.
The raw numeric data needed to colour the cluster scatter is **not** in the
TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` —
this chapter looks for the cluster projection / raw numeric columns in ``ctx``
(or in ``profile``) and degrades honestly when they are absent: it falls back to
the uncoloured ``pca.projection`` with a note, or omits the scatter entirely.
ctx keys this chapter consumes (all optional):
cluster_projection : dict — a pre-computed ``project_clusters_2d`` result
(``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used
directly when present (forward-compatible with the calculation phase).
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
and ``cluster_projection`` is not, the chapter calls
``project_clusters_2d`` live to build points + aligned labels.
cluster_titles : list — pre-computed ``[{cluster, title, description}]``
(a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster
micro-analysis without an LLM call (offline/tests).
run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call
``describe_clusters_llm`` live on the cluster profiles.
cluster_llm_model : str — model id for the live LLM call.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "modelos"
CHAPTER_TITLE = "Modelos"
# Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib
# scatter and to keep the legend/colours stable per cluster index.
_CLUSTER_COLORS = [
"#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f",
"#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac",
]
# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the overview chapter's defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _fmt_pct_ratio(value, decimals: int = 1) -> str:
"""Format a 0..1 ratio as a percentage."""
if value is None:
return ""
try:
return f"{float(value) * 100:.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _fmt_pct_already(value, decimals: int = 2) -> str:
"""Format a value that is *already* a 0..100 percentage."""
if value is None:
return ""
try:
return f"{float(value):.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _is_dict(v) -> bool:
    """Return True when ``v`` is a dict (or an instance of a dict subclass)."""
    return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Cluster projection: prefer a pre-computed result, else compute it live, else
# fall back to the uncoloured PCA projection.
# --------------------------------------------------------------------------- #
def _resolve_cluster_projection(profile: dict, ctx: dict):
    """Find the cluster projection to draw, and say where it came from.

    Lookup order:
      1. a pre-computed ``cluster_projection`` in ``ctx``, then ``profile``,
         then ``profile['models']`` (used only if it is a dict with points);
      2. a live ``project_clusters_2d`` call on ``raw_numeric`` taken from
         ``ctx`` or ``profile``;
      3. nothing.

    Returns:
        ``(projection, source)`` where ``source`` is ``"precomputed"``,
        ``"live"`` or ``"none"`` (with ``projection`` set to ``None``). Any
        exception raised by the live computation is swallowed and reported as
        ``"none"``.
    """
    precomputed = (ctx.get("cluster_projection")
                   or profile.get("cluster_projection"))
    models_block = profile.get("models")
    if not _is_dict(models_block):
        models_block = {}
    if not precomputed:
        precomputed = models_block.get("cluster_projection")
    if _is_dict(precomputed) and precomputed.get("points"):
        return precomputed, "precomputed"

    raw_columns = ctx.get("raw_numeric") or profile.get("raw_numeric")
    if not (_is_dict(raw_columns) and raw_columns):
        return None, "none"
    try:
        # Import the submodule's function explicitly (avoid the package
        # attribute shadowing the function with the same-named module).
        from datascience.project_clusters_2d import project_clusters_2d
        live = project_clusters_2d(raw_columns)
        if _is_dict(live) and live.get("points"):
            return live, "live"
    except Exception:  # noqa: BLE001 — never break the chapter.
        return None, "none"
    return None, "none"
def _cluster_titles(profile: dict, ctx: dict, projection: dict):
    """Return ``[{cluster, title, description}, ...]`` for the segments.

    Sources, in order:
      1. ``ctx['cluster_titles']`` when it is a non-empty list (dict entries
         only);
      2. a live ``describe_clusters_llm`` call, only when
         ``ctx['run_cluster_llm']`` is truthy and the projection carries
         cluster profiles (any failure there is ignored);
      3. titles derived from each profile's first two ``distinctive`` features.

    Returns ``None`` when none of the sources produces anything.
    """
    supplied = ctx.get("cluster_titles")
    if isinstance(supplied, list) and supplied:
        return [entry for entry in supplied if _is_dict(entry)]

    source = projection or {}
    profiles = source.get("cluster_profiles") or []
    feats = source.get("feature_names") or []
    if ctx.get("run_cluster_llm") and profiles:
        try:
            from datascience.describe_clusters_llm import describe_clusters_llm
            answer = describe_clusters_llm(
                profiles, feats,
                model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001"))
            described = (answer or {}).get("clusters")
            if isinstance(described, list) and described:
                return [entry for entry in described if _is_dict(entry)]
        except Exception:  # noqa: BLE001
            pass

    # Derived fallback: name each cluster by its distinctive features.
    if not profiles:
        return None
    derived = []
    for prof in profiles:
        if not _is_dict(prof):
            continue
        cid = prof.get("cluster", len(derived))
        distinctive = prof.get("distinctive") or []
        label = ", ".join(model._safe_str(d) for d in distinctive[:2])
        derived.append({"cluster": cid,
                        "title": f"Segmento {cid}{label}",
                        "description": ""})
    return derived or None
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _make_scree(pca: dict):
"""Return a zero-arg callable drawing the PCA scree plot, or None."""
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
if not evr:
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
comps = list(range(1, len(evr) + 1))
fig, ax = plt.subplots(figsize=(7.0, 4.2))
ax.bar(comps, evr, color="#4e79a7", alpha=0.85,
label="Varianza explicada")
if cum:
ax.plot(comps[:len(cum)], cum, color="#e15759", marker="o",
linewidth=1.8, label="Acumulada")
ax.set_xlabel("Componente principal")
ax.set_ylabel("Proporción de varianza")
ax.set_xticks(comps)
ax.set_ylim(0, 1.0)
ax.grid(axis="y", color="#dddddd", linewidth=0.6)
ax.legend(loc="best", fontsize=8, frameon=False)
ax.set_title("Varianza explicada por componente (PCA)", fontsize=10)
fig.tight_layout()
return fig
return _draw
def _make_cluster_scatter(projection: dict):
"""Return a zero-arg callable drawing the cluster scatter, or None."""
points = projection.get("points") or []
labels = projection.get("labels") or []
if not points or len(points) != len(labels):
return None
centers = projection.get("centers_2d") or []
explained = projection.get("explained_2d") or []
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(7.0, 5.2))
uniq = sorted(set(int(l) for l in labels))
for cl in uniq:
xs = [p[0] for p, l in zip(points, labels) if int(l) == cl]
ys = [p[1] for p, l in zip(points, labels) if int(l) == cl]
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0,
label=f"Cluster {cl} (n={len(xs)})")
for cl, c in enumerate(centers):
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X",
edgecolors="black", linewidths=1.2, zorder=5)
xlab, ylab = "PC1", "PC2"
if len(explained) >= 2:
xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)"
ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)"
ax.set_xlabel(xlab)
ax.set_ylabel(ylab)
ax.set_title("Segmentos KMeans proyectados sobre el plano PCA",
fontsize=10)
ax.grid(color="#eeeeee", linewidth=0.5)
ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Section builders. Each returns a list of blocks (possibly empty).
# --------------------------------------------------------------------------- #
def _normalization_intro() -> list:
    """Return the chapter opening: a level-1 heading plus the z-score note."""
    explanation = model.Markdown(text=(
        "Estos modelos son **no supervisados**: buscan estructura latente sin "
        "una variable objetivo. Antes de aplicarlos, todas las columnas "
        "numéricas se **estandarizan con z-score** (cada valor menos la media, "
        "dividido por la desviación típica). Sin esta normalización, una "
        "variable con escala grande (p.ej. ingresos en euros) dominaría las "
        "distancias y la varianza frente a otra de escala pequeña (p.ej. un "
        "ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la "
        "estandarización todas las variables pesan por igual."
    ))
    heading = model.Heading(text="Modelos no supervisados", level=1)
    return [heading, explanation]
def _pca_section(pca: dict) -> list:
    """Build the PCA blocks: intro, scree figure, variance and loadings tables.

    Args:
        pca: The ``pca`` entry of the models block. Keys read here are
            ``explained_variance_ratio``, ``cumulative``, ``n_rows_used``,
            ``n_features`` and ``top_loadings``.

    Returns:
        A list of model blocks, or an empty list when ``pca`` is not a dict or
        carries no explained-variance ratios.
    """
    if not (_is_dict(pca) and pca.get("explained_variance_ratio")):
        return []

    out = [model.Heading(text="PCA — varianza explicada", level=2)]
    rows_used = pca.get("n_rows_used")
    feature_count = pca.get("n_features")
    out.append(model.Markdown(text=(
        f"El PCA resume {_fmt_num(feature_count)} variables numéricas en componentes "
        f"ortogonales ordenados por la varianza que capturan "
        f"({_fmt_num(rows_used)} filas usadas tras eliminar nulos). El gráfico de "
        "sedimentación (scree) muestra cuánta varianza aporta cada componente y "
        "su acumulado: un codo marca cuántos componentes bastan."
    )))

    draw_scree = _make_scree(pca)
    if draw_scree is not None:
        out.append(model.Figure(
            make=draw_scree,
            caption="Varianza explicada y acumulada por componente."))

    # Variance table: one row per component; the cumulative cell falls back to
    # None when the cumulative list is shorter than the ratio list.
    ratios = pca.get("explained_variance_ratio") or []
    cumulative = pca.get("cumulative") or []
    variance_rows = [
        [
            f"PC{idx + 1}",
            _fmt_pct_ratio(ratio),
            _fmt_pct_ratio(cumulative[idx] if idx < len(cumulative) else None),
        ]
        for idx, ratio in enumerate(ratios)
    ]
    if variance_rows:
        out.append(model.DataTable(
            header=["Componente", "Varianza", "Acumulada"], rows=variance_rows,
            title="Varianza por componente"))

    # Loadings table: keep at most the first four entries seen per component.
    grouped: dict = {}
    for entry in pca.get("top_loadings") or []:
        if not _is_dict(entry):
            continue
        bucket = grouped.setdefault(entry.get("component"), [])
        if len(bucket) < 4:
            bucket.append(entry)

    loading_rows = []
    # Components with a None id sort last and get an empty label.
    for comp in sorted(grouped, key=lambda c: (c is None, c)):
        for entry in grouped[comp]:
            label = "" if comp is None else f"PC{int(comp) + 1}"
            loading_rows.append([
                label,
                model._safe_str(entry.get("feature")),
                _fmt_num(entry.get("loading")),
            ])
    if loading_rows:
        out.append(model.DataTable(
            header=["Componente", "Variable", "Carga"], rows=loading_rows,
            title="Cargas principales (top por componente)",
            note="Cargas con mayor valor absoluto: qué variables definen "
                 "cada eje."))
    return out
def _kmeans_section(kmeans: dict, projection: dict, titles) -> list:
    """Build the KMeans segmentation blocks.

    Emits a heading, an intro paragraph, the cluster scatter (or a note when it
    cannot be drawn), a cluster-size table and, when ``titles`` is given, one
    Markdown paragraph per cluster.

    Args:
        kmeans: The ``kmeans`` entry of the models block (``best_k``,
            ``silhouette``, ``cluster_sizes``), or None.
        projection: A cluster projection dict (``points``, ``best_k``,
            ``silhouette``, ``cluster_sizes``), or None. Its values take
            precedence over those in ``kmeans``.
        titles: Optional iterable of ``{"cluster", "title", "description"}``
            dicts; non-dict entries are ignored.

    Returns:
        A list of model blocks, or an empty list when neither a ``best_k`` nor
        projected points are available.
    """
    has_km = _is_dict(kmeans) and kmeans.get("best_k")
    has_proj = _is_dict(projection) and projection.get("points")
    if not has_km and not has_proj:
        return []

    # Normalise both inputs to dicts so a None (or any non-dict) never raises
    # on the lookups below.
    km = kmeans if _is_dict(kmeans) else {}
    proj = projection if _is_dict(projection) else {}

    blocks = [model.Heading(text="Segmentación (KMeans)", level=2)]
    best_k = proj.get("best_k") or km.get("best_k")
    sil = proj.get("silhouette")
    if sil is None:
        sil = km.get("silhouette")
    intro = (
        f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos "
        "automáticamente maximizando el coeficiente de *silhouette* "
        # The silhouette coefficient is bounded by -1 and 1.
        f"(**{_fmt_num(sil)}**, rango -1 a 1: cuanto más alto, segmentos más "
        "compactos y separados). Los segmentos se proyectan sobre el plano de "
        "los dos primeros componentes principales para visualizarlos."
    )
    blocks.append(model.Markdown(text=intro))

    if has_proj:
        scatter = _make_cluster_scatter(projection)
        if scatter is not None:
            blocks.append(model.Figure(
                make=scatter,
                caption="Cada punto es una fila coloreada por su segmento "
                        "KMeans; las «X» son los centroides."))
        else:
            blocks.append(model.Note(
                "Proyección de clusters no dibujable (puntos y etiquetas "
                "desalineados)."))
    else:
        # We have kmeans stats but no aligned points+labels to colour by.
        blocks.append(model.Note(
            "Scatter coloreado por segmento no disponible: el perfil no incluye "
            "la proyección con etiquetas alineadas (pásala en "
            "ctx['cluster_projection'] o las columnas crudas en "
            "ctx['raw_numeric'] para colorear el plano PCA)."))

    # Cluster sizes table. Only numeric sizes contribute to the total, and only
    # numeric sizes get a share, so a malformed entry cannot raise on division.
    sizes = proj.get("cluster_sizes") or km.get("cluster_sizes") or []
    if sizes:
        total = sum(s for s in sizes if isinstance(s, (int, float)))
        rows = []
        for i, s in enumerate(sizes):
            share = (s / total) if total and isinstance(s, (int, float)) else None
            rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(share)])
        blocks.append(model.DataTable(
            header=["Segmento", "Tamaño", "% del total"], rows=rows,
            title="Tamaño de cada segmento"))

    # Per-cluster LLM micro-analysis (each entry kept indivisible as one block).
    if titles:
        blocks.append(model.Heading(text="Interpretación de los segmentos",
                                    level=3))
        for t in titles:
            if not _is_dict(t):
                continue
            cid = t.get("cluster")
            title = model._safe_str(t.get("title"))
            desc = model._safe_str(t.get("description"))
            # Separate id and title explicitly; without a title the id alone is
            # the label (avoids "Cluster 0Cluster 0").
            if title:
                line = f"**Cluster {cid} — {title}.**"
            else:
                line = f"**Cluster {cid}.**"
            if desc:
                line += " " + desc
            blocks.append(model.Markdown(text=line))
    return blocks
def _outliers_section(outliers: dict) -> list:
    """Build the Isolation Forest blocks: heading, explanation and summary table.

    Args:
        outliers: The ``outliers`` entry of the models block. Keys read here are
            ``n_outliers``, ``n_rows_used``, ``outlier_pct``, ``threshold`` and
            ``note``.

    Returns:
        A list of three model blocks, or an empty list when ``outliers`` is not
        a dict, has no ``n_outliers``, or carries a note without any rows used.
    """
    if not _is_dict(outliers):
        return []
    if outliers.get("n_outliers") is None:
        return []
    # A note with no analysed rows means insufficient data: nothing to show.
    insufficient = outliers.get("note") and not outliers.get("n_rows_used")
    if insufficient:
        return []

    heading = model.Heading(text="Detección de anomalías (Isolation Forest)",
                            level=2)
    explanation = model.Markdown(text=(
        "**Isolation Forest** detecta filas anómalas de forma *multivariante*: "
        "construye árboles que parten el espacio con cortes aleatorios y mide "
        "cuántos cortes hacen falta para aislar cada fila. Las filas raras "
        "(combinaciones de valores poco frecuentes considerando **todas las "
        "columnas a la vez**, no una sola) se aíslan con muy pocos cortes y "
        "obtienen un score bajo. El **umbral** de decisión separa las filas "
        "normales de las anómalas según la contaminación esperada del modelo: "
        "una fila es outlier cuando su score queda por debajo de ese umbral."
    ))
    summary = model.KVTable(
        rows=[
            ("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))),
            ("Outliers detectados", _fmt_num(outliers.get("n_outliers"))),
            ("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))),
            ("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)),
        ],
        title="Anomalías multivariantes",
    )
    return [heading, explanation, summary]
def _normality_section(normality: dict) -> list:
    """Build the normality-test blocks: heading, explanation and results table.

    Args:
        normality: Mapping of column name to a result dict. Keys read per column
            are ``jarque_bera``, ``dagostino`` and ``shapiro`` (each a dict with
            a ``p`` value), plus ``is_normal`` and ``note``.

    Returns:
        A list of three model blocks, or an empty list when ``normality`` is not
        a non-empty dict or no column yields a row.
    """
    if not _is_dict(normality) or not normality:
        return []

    def _sub(result: dict, key: str) -> dict:
        # A test result is only usable when it is itself a dict.
        value = result.get(key)
        return value if _is_dict(value) else {}

    header = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)",
              "¿Normal?"]
    rows = []
    for col, res in normality.items():
        if not _is_dict(res):
            continue
        jb = _sub(res, "jarque_bera")
        da = _sub(res, "dagostino")
        sh = _sub(res, "shapiro")
        is_norm = res.get("is_normal")
        if res.get("note") and is_norm is None and not jb:
            # No test could run for this column: show the note as the verdict.
            rows.append([model._safe_str(col), "", "", "",
                         model._safe_str(res.get("note"))])
            continue
        # Three distinct verdicts: an unknown result stays blank, while a
        # normal column is labelled explicitly instead of sharing that blank.
        if is_norm is None:
            verdict = ""
        elif is_norm:
            verdict = "sí"
        else:
            verdict = "no"
        rows.append([
            model._safe_str(col),
            _fmt_num(jb.get("p"), 4) if jb else "",
            _fmt_num(da.get("p"), 4) if da else "",
            _fmt_num(sh.get("p"), 4) if sh else "",
            verdict,
        ])
    if not rows:
        return []
    return [
        model.Heading(text="Normalidad de las variables", level=2),
        model.Markdown(text=(
            "Tests de hipótesis de normalidad por columna (hipótesis nula: la "
            "muestra proviene de una distribución normal). Se marca **normal** "
            "cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas "
            "variables reales son estrictamente normales; esto orienta qué "
            "transformaciones o tests robustos aplicar después.")),
        model.DataTable(header=header, rows=rows,
                        title="Pruebas de normalidad"),
    ]
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_modelos(profile: dict, ctx: dict):
    """Build the MODELOS Chapter, or None if there are no models to show.

    Args:
        profile: Table profile; only its ``models`` entry is read directly here.
        ctx: Rendering context, forwarded to the projection and title helpers.

    Returns:
        A ``model.Chapter`` whose blocks are the normalisation intro followed by
        the PCA, KMeans, outlier and normality sections, or None when the
        profile has no usable ``models`` dict or no section produced blocks.
    """
    profile = profile or {}
    ctx = ctx or {}
    if not isinstance(profile, dict):
        return None
    models = profile.get("models")
    if not _is_dict(models):
        return None

    def _entry(name: str):
        # Sub-blocks are only used when they are dicts; anything else is None.
        value = models.get(name)
        return value if _is_dict(value) else None

    pca = _entry("pca")
    kmeans = _entry("kmeans")
    outliers = _entry("outliers")
    normality = _entry("normality")

    projection, _src = _resolve_cluster_projection(profile, ctx)
    # Titles are only resolved when there is a clustering to describe.
    if (kmeans and kmeans.get("best_k")) or (
            projection and projection.get("points")):
        titles = _cluster_titles(profile, ctx, projection)
    else:
        titles = None

    sections = []
    if pca:
        sections.extend(_pca_section(pca))
    sections.extend(_kmeans_section(kmeans, projection, titles))
    if outliers:
        sections.extend(_outliers_section(outliers))
    if normality:
        sections.extend(_normality_section(normality))
    if not sections:
        return None  # models block present but nothing renderable.

    return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
                         version=CHAPTER_VERSION,
                         blocks=_normalization_intro() + sections)
@@ -1,259 +0,0 @@
"""Tests for the MODELOS chapter — DoD: golden + edges + anti-cut.
Self-contained: builds a synthetic TableProfile with a ``models`` block (no
DuckDB, no sklearn, no LLM, no network). The cluster scatter is fed a synthetic
pre-computed ``cluster_projection`` via ``ctx`` and the per-cluster titles via
``ctx['cluster_titles']`` so the suite is fast and deterministic. The live paths
(``project_clusters_2d`` / ``describe_clusters_llm``) are exercised against the
real wine dataset in the work report, not here.
Verifies: the chapter renders to PDF *and* PPTX showing the user-required pieces
(markdown text, PCA scree, cluster scatter, per-cluster LLM micro-analysis,
outlier + normalization explanations); that an inapplicable profile yields None
without raising; and that a long normality table is split without losing any
column (anti-cut).
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.chapters.modelos import build_modelos
from datascience.automatic_eda.model import Figure, DataTable, Markdown
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
# --------------------------------------------------------------------------- #
# Synthetic fixtures.
# --------------------------------------------------------------------------- #
def _models_block(n_norm_cols: int = 4) -> dict:
feats = ["fixed_acidity", "alcohol", "ph", "sulphates"]
normality = {}
for i in range(n_norm_cols):
normality[f"col_{i}"] = {
"n": 500,
"jarque_bera": {"stat": 12.3, "p": 0.002 + i * 0.0001, "normal": False},
"dagostino": {"stat": 9.1, "p": 0.01, "normal": False},
"shapiro": {"stat": 0.98, "p": 0.04, "normal": False},
"is_normal": False,
}
return {
"n_numeric_cols": 4,
"pca": {
"n_components": 2, "n_rows_used": 1599, "n_features": 4,
"explained_variance_ratio": [0.41, 0.22],
"cumulative": [0.41, 0.63],
"top_loadings": [
{"component": 0, "feature": "alcohol", "loading": 0.62},
{"component": 0, "feature": "fixed_acidity", "loading": -0.48},
{"component": 1, "feature": "ph", "loading": 0.71},
{"component": 1, "feature": "sulphates", "loading": 0.33},
],
"projection": [[0.1, 0.2], [0.3, -0.1]],
},
"kmeans": {
"best_k": 3, "silhouette": 0.27,
"scores_by_k": [{"k": 2, "silhouette": 0.21}, {"k": 3, "silhouette": 0.27}],
"cluster_sizes": [700, 500, 399],
"centers": [[0.1, 0.2, 0.3, 0.4]],
"n_rows_used": 1599, "n_features": 4,
},
"outliers": {
"n_outliers": 80, "outlier_pct": 5.0, "threshold": -0.0123,
"n_rows_used": 1599,
},
"normality": normality,
"note": "",
"_feats": feats,
}
def _cluster_projection() -> dict:
# 30 points across 3 clusters, aligned points<->labels.
points, labels = [], []
centers = [(-2.0, -2.0), (2.0, 0.0), (0.0, 2.5)]
for cl, (cx, cy) in enumerate(centers):
for j in range(10):
points.append([cx + (j - 5) * 0.05, cy + (j - 5) * 0.05])
labels.append(cl)
return {
"points": points, "labels": labels,
"centers_2d": [list(c) for c in centers],
"best_k": 3, "silhouette": 0.27,
"explained_2d": [0.41, 0.22],
"cluster_sizes": [10, 10, 10],
"cluster_profiles": [
{"cluster": 0, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 9.5, "ph": 3.5},
"distinctive": ["alcohol", "ph"], "centroid_z": {"alcohol": -1.2}},
{"cluster": 1, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 12.0, "ph": 3.1},
"distinctive": ["alcohol"], "centroid_z": {"alcohol": 1.4}},
{"cluster": 2, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 10.5, "ph": 3.8},
"distinctive": ["ph"], "centroid_z": {"ph": 1.6}},
],
"feature_names": ["alcohol", "ph", "fixed_acidity", "sulphates"],
"n_used": 1599, "note": "",
}
def _ctx_full() -> dict:
    """Return a render context with a pre-computed projection and cluster titles."""
    described = [
        ("Vinos suaves de baja graduación",
         "Alcohol bajo y pH alto; perfil ligero."),
        ("Vinos potentes", "Alta graduación alcohólica."),
        ("Vinos de pH elevado", "Acidez baja relativa al resto."),
    ]
    projection = _cluster_projection()
    titles = [
        {"cluster": idx, "title": title, "description": description}
        for idx, (title, description) in enumerate(described)
    ]
    return {"cluster_projection": projection, "cluster_titles": titles}
def _profile() -> dict:
    """Return a minimal synthetic table profile carrying the default models block."""
    profile = {"table": "wine", "n_rows": 1599, "n_cols": 12}
    profile["models"] = _models_block()
    return profile
def _pdf_text(path: str) -> str:
    """Return the text of every page of the PDF at ``path`` with whitespace collapsed."""
    pieces = []
    for page in PdfReader(path).pages:
        # Treat a falsy extraction result as an empty page.
        pieces.append(page.extract_text() or "")
    return re.sub(r"\s+", " ", "".join(pieces))
def _pptx_text(path: str) -> str:
    """Return the text of every text-bearing shape in the deck, whitespace collapsed."""
    texts = [
        shape.text_frame.text
        for slide in Presentation(path).slides
        for shape in slide.shapes
        if shape.has_text_frame
    ]
    return re.sub(r"\s+", " ", " ".join(texts))
# --------------------------------------------------------------------------- #
# Golden.
# --------------------------------------------------------------------------- #
def test_golden_build_modelos_bloques_requeridos():
    """Golden: the chapter contains the required figures, tables and texts."""
    chapter = build_modelos(_profile(), _ctx_full())
    assert chapter is not None
    assert chapter.id == "modelos" and chapter.version

    # Both figures present: scree plot + cluster scatter.
    figures = [b for b in chapter.blocks if isinstance(b, Figure)]
    assert len(figures) >= 2

    # Tables present (variance, loadings, sizes, normality).
    tables = [b for b in chapter.blocks if isinstance(b, DataTable)]
    assert len(tables) >= 3

    # Markdown carries the required explanations and the per-cluster titles.
    markdown_text = " ".join(
        b.text for b in chapter.blocks if isinstance(b, Markdown))
    assert "z-score" in markdown_text           # normalization explained
    assert "Isolation Forest" in markdown_text  # outlier generation explained
    assert "silhouette" in markdown_text        # kmeans
    assert "Vinos potentes" in markdown_text
    assert "Cluster 1" in markdown_text
def test_golden_render_pdf_muestra_lo_exigido():
    """Golden: the PDF render lists the chapter and shows every required text."""
    required = ("Modelos no supervisados", "z-score", "PCA",
                "Segmentación", "Isolation Forest", "Normalidad",
                "Vinos potentes")
    with tempfile.TemporaryDirectory() as workdir:
        target = os.path.join(workdir, "modelos.pdf")
        result = render_automatic_eda_pdf(
            _profile(), target, {"title": "EDA — wine", "ctx": _ctx_full()})
        assert result["path"] == target and os.path.exists(target)
        chapter_ids = [chapter["id"] for chapter in result["chapters"]]
        assert "modelos" in chapter_ids
        # Read the file while the temporary directory still exists.
        text = _pdf_text(target)
        for needle in required:
            assert needle in text, f"falta en PDF: {needle}"
def test_golden_render_pptx_muestra_lo_exigido():
    """Golden: the PPTX render has slides and shows the required texts."""
    required = ("Modelos no supervisados", "z-score", "Isolation Forest",
                "Vinos potentes")
    with tempfile.TemporaryDirectory() as workdir:
        target = os.path.join(workdir, "modelos.pptx")
        result = render_automatic_eda_pptx(
            _profile(), target, {"title": "EDA — wine", "ctx": _ctx_full()})
        assert result["path"] == target and os.path.exists(target)
        assert result["n_slides"] >= 1
        # Read the file while the temporary directory still exists.
        text = _pptx_text(target)
        for needle in required:
            assert needle in text, f"falta en PPTX: {needle}"
# --------------------------------------------------------------------------- #
# Edges.
# --------------------------------------------------------------------------- #
def test_edge_profile_none_o_vacio_devuelve_none():
    """Edge: a missing, empty or models-less profile yields None."""
    cases = [
        (None, {}),
        ({}, {}),
        ({"n_rows": 5}, None),  # no 'models' key
    ]
    for profile, ctx in cases:
        assert build_modelos(profile, ctx) is None
def test_edge_models_insuficiente_devuelve_none():
    """Edge: a models block where every model reports insufficient data yields None."""
    insufficient = "datos insuficientes"
    models = {
        "n_numeric_cols": 1,
        "pca": {"n_components": 0, "explained_variance_ratio": [],
                "note": insufficient},
        "kmeans": {"best_k": 0, "note": insufficient},
        "outliers": {"n_outliers": 0, "note": insufficient},
        "normality": None,
        "note": "insuficientes columnas numericas para modelos multivariantes",
    }
    assert build_modelos({"table": "tiny", "models": models}, {}) is None
def test_edge_solo_normalidad_si_genera_capitulo():
    """Edge: with only normality results available the chapter is still built."""
    # A single numeric column: only normality applies.
    normal_column = {
        "n": 500,
        "jarque_bera": {"stat": 1.0, "p": 0.2, "normal": True},
        "dagostino": {"stat": 1.0, "p": 0.3, "normal": True},
        "shapiro": {"stat": 0.99, "p": 0.4, "normal": True},
        "is_normal": True,
    }
    profile = {"table": "one", "models": {
        "n_numeric_cols": 1, "pca": None, "kmeans": None, "outliers": None,
        "normality": {"x": normal_column},
    }}
    chapter = build_modelos(profile, {})
    assert chapter is not None
    markdown_text = " ".join(
        b.text for b in chapter.blocks if isinstance(b, Markdown))
    assert "z-score" in markdown_text  # normalization intro still present
def test_edge_kmeans_sin_proyeccion_degrada_sin_romper():
    """Edge: kmeans stats without a projection degrade to a note and still render."""
    # kmeans stats present but no cluster_projection / raw_numeric to colour by.
    profile = _profile()
    chapter = build_modelos(profile, {})  # no ctx projection
    assert chapter is not None

    # No scatter figure for clusters, but a Note explaining the degradation.
    note_texts = [b.text for b in chapter.blocks if b.kind == "note"]
    assert any(
        "ctx['raw_numeric']" in text or "cluster_projection" in text
        for text in note_texts
    )

    # PDF still renders fine.
    with tempfile.TemporaryDirectory() as workdir:
        target = os.path.join(workdir, "deg.pdf")
        result = render_automatic_eda_pdf(
            profile, target, {"write_manifest": False})
        assert result["path"] == target and os.path.exists(target)
# --------------------------------------------------------------------------- #
# Anti-cut.
# --------------------------------------------------------------------------- #
def test_anticortes_tabla_normalidad_larga_no_corta():
    """Anti-cut: a 40-row normality table spans pages without losing column names."""
    # 40 numeric columns → the normality DataTable must split across pages,
    # repeating the header, without losing any column name.
    profile = {"table": "wide", "models": _models_block(n_norm_cols=40)}
    options = {"write_manifest": False, "ctx": _ctx_full()}
    with tempfile.TemporaryDirectory() as workdir:
        target = os.path.join(workdir, "wide.pdf")
        render_automatic_eda_pdf(profile, target, options)
        reader = PdfReader(target)
        assert len(reader.pages) > 1
        text = "".join((page.extract_text() or "") for page in reader.pages)
        # Spot-check the first, a middle and the last column name.
        for idx in (0, 19, 39):
            assert f"col_{idx}" in text
@@ -1,97 +0,0 @@
---
name: describe_clusters_llm
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def describe_clusters_llm(cluster_profiles: list, feature_names: list, model: str = \"claude-haiku-4-5-20251001\") -> dict"
description: "Micro-analisis LLM de clusters de KMeans (grupo eda). Toma los perfiles AGREGADOS de cada cluster (los que produce project_clusters_2d: tamano, centroide en escala original, features distintivas y centroide en z-score) y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una descripcion de 1-2 frases en espanol. Clave de coste/privacidad: NO envia filas crudas, solo el resumen agregado de cada grupo (tamano, % del total y la media de las features distintivas con su signo respecto a la media global). Reusa ask_llm del grupo claude-direct (API directa con token OAuth de Claude). Impura, dict-no-throw: nunca lanza, degrada a titulos genericos 'Cluster N' si el LLM no responde o el parseo falla."
tags: [eda, clustering, llm, claude-direct, datascience, kmeans]
params:
- name: cluster_profiles
desc: "Lista de perfiles de cluster con la forma que produce project_clusters_2d: cada uno {cluster:int, size:int, pct:float, centroid_original:{feature: media en escala original}, distinctive:[features distintivas], centroid_z:{feature: z-score}}. Solo se le envia al LLM un resumen agregado; nunca filas crudas. Lista vacia o no-lista -> clusters=[] sin llamar al LLM."
- name: feature_names
desc: "Nombres de las features del dataset. Se incluyen como contexto en el prompt para que el LLM pueda nombrar los clusters; no es obligatorio que coincida con las features distintivas de cada perfil."
- name: model
desc: "id del modelo Anthropic a usar. Default 'claude-haiku-4-5-20251001' (haiku, coste bajo, ~2-3s). Para titulos/descripciones mas finas, pasar p.ej. 'claude-opus-4-8'."
output: "dict dict-no-throw: {clusters:[{cluster:int, title:str, description:str}], model:str, note:str}. note=='' si todo fue bien. Si el LLM no respondio (note='LLM no disponible') o el parseo fallo (note='parse fallido'), clusters trae titulos genericos 'Cluster N' con description vacia. Si cluster_profiles esta vacio o no es lista: {clusters:[], model, note:'sin clusters'}. NUNCA lanza."
uses_functions: [ask_llm_py_core]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_parse_clusters_json_valid_array", "test_parse_clusters_json_wrapped_in_junk_text", "test_parse_clusters_json_non_json_returns_none", "test_parse_clusters_json_fills_missing_cluster_by_index", "test_describe_clusters_llm_ok_with_monkeypatched_llm", "test_describe_clusters_llm_degrades_on_empty_response", "test_describe_clusters_llm_degrades_on_unparseable_response", "test_describe_clusters_llm_empty_list_skips_llm", "test_describe_clusters_llm_non_list_input_skips_llm"]
test_file_path: "python/functions/datascience/describe_clusters_llm_test.py"
file_path: "python/functions/datascience/describe_clusters_llm.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.describe_clusters_llm import describe_clusters_llm
# Perfiles agregados producidos por project_clusters_2d (no hay filas crudas).
cluster_profiles = [
{
"cluster": 0, "size": 60, "pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1, "size": 40, "pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
feature_names = ["acidez", "alcohol", "azucar"]
out = describe_clusters_llm(cluster_profiles, feature_names) # haiku por defecto
# out = describe_clusters_llm(cluster_profiles, feature_names, model="claude-opus-4-8")
if not out["note"]:
for c in out["clusters"]:
print(f"Cluster {c['cluster']}: {c['title']}")
print(" ", c["description"])
else:
# Degradacion: titulos genericos "Cluster N".
print("LLM no usado:", out["note"])
for c in out["clusters"]:
print(c["cluster"], c["title"])
```
## Cuando usarla
Cuando ya has clusterizado un dataset (KMeans + `project_clusters_2d`) y quieres
poner NOMBRE y descripcion legible a cada grupo en vez de dejar "Cluster 0/1/2".
Es el paso interpretativo que sigue al perfilado de clusters: `project_clusters_2d`
calcula tamano, centroides y features distintivas, y `describe_clusters_llm` los
traduce a un titulo corto + 1-2 frases por cluster. Usala al cerrar un EDA con
segmentacion para el resumen final o el report. Una sola llamada al LLM describe
todos los clusters a la vez (barato).
## Gotchas
- **Impura: hace 1 llamada de red al LLM.** No es determinista ni gratis. Latencia
tipica ~2-3s con haiku.
- **Requiere token OAuth de Claude** en `~/.claude/.credentials.json` (via `ask_llm`
/ grupo `claude-direct`). Sin token / sin red, NO lanza: degrada a titulos
genericos `Cluster N` con `note="LLM no disponible"`.
- **NO envia filas crudas al LLM**, solo el resumen AGREGADO de cada cluster
(tamano, % del total y la media de las features distintivas con su signo respecto
a la media global). Privacidad y coste minimos por diseno — pero requiere que los
perfiles vengan ya calculados por `project_clusters_2d`.
- **Modelo `haiku` por defecto** para coste bajo; sube a `claude-opus-4-8` si
necesitas titulos/descripciones mas finas (mas caro y lento).
- **dict-no-throw**: si el modelo no devuelve un JSON array parseable, retorna
titulos genericos con `note="parse fallido"`. Comprueba siempre `out["note"]`
antes de fiarte de los titulos.
- El LLM puede sobre-interpretar: el system prompt le pide ser sobrio y no inventar
causas, pero revisa los titulos antes de publicarlos en un report.
@@ -1,240 +0,0 @@
"""describe_clusters_llm — micro-analisis LLM de clusters de KMeans (grupo `eda`).
Toma los PERFILES AGREGADOS de cada cluster (los que produce `project_clusters_2d`:
tamano, centroide en escala original, features distintivas y centroide en z-score)
y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una
descripcion de 1-2 frases, en espanol.
Clave de coste y privacidad: NO se envian filas crudas al LLM. Solo viaja el
perfil AGREGADO de cada grupo (tamano, % del total y la media de las features
distintivas con su signo respecto a la media global). El coste es minimo y ningun
dato fila-a-fila sale del proceso.
Reusa `ask_llm` del registry (grupo claude-direct, API directa con el token OAuth
de Claude en ~/.claude/.credentials.json, arranque 0). Impura: una llamada de red.
Estilo dict-no-throw: NUNCA lanza; ante cualquier fallo (red, LLM caido, parseo)
degrada a titulos genericos "Cluster N" + una nota explicando el motivo.
"""
import json
from core.ask_llm import ask_llm
# System prompt for the single LLM call. It sets the analyst persona, asks for
# sober descriptions without invented causes, and pins the reply to a bare JSON
# array of {"cluster", "title", "description"} objects — the shape that
# _parse_clusters_json extracts and normalises.
_SYSTEM = (
    "Eres un analista de datos. Recibes los PERFILES AGREGADOS de los clusters de "
    "un KMeans (por cada grupo: su tamano y la media de sus features distintivas, "
    "con el signo respecto a la media global; nunca filas crudas) y los describes "
    "de forma sobria y util. Para cada cluster generas un titulo corto y "
    "descriptivo (por ejemplo 'Vinos de alta acidez y baja graduacion') y una "
    "descripcion de 1-2 frases. NO inventes causas ni sobre-interpretes: limitate a "
    "lo que dicen los numeros. Responde en espanol. Responde SIEMPRE y SOLO con un "
    "unico JSON array valido, sin texto alrededor y sin fences de markdown, con "
    'EXACTAMENTE la forma [{"cluster": <int>, "title": "<titulo corto>", '
    '"description": "<1-2 frases>"}], un objeto por cluster.'
)
def _fmt_num(value) -> str:
"""Formatea un numero de forma compacta para el prompt (None -> '?')."""
if value is None:
return "?"
if isinstance(value, bool):
return str(value)
if isinstance(value, float):
if value == int(value):
return str(int(value))
return f"{value:.4g}"
return str(value)
def _cluster_id(profile: dict, index: int) -> int:
"""Devuelve el id del cluster del perfil, o el indice si no es un int valido."""
raw = (profile or {}).get("cluster")
if isinstance(raw, bool):
return index
if isinstance(raw, int):
return raw
try:
return int(raw)
except (TypeError, ValueError):
return index
def _build_prompt(cluster_profiles: list, feature_names: list) -> str:
"""Construye un resumen textual compacto de los perfiles para el LLM.
Funcion interna PURA: no toca red ni disco, es testeable sin credenciales.
Por cada cluster incluye su numero, tamano (size + pct%) y, para cada feature
distintiva, el valor del centroide en escala original mas si esta por encima o
por debajo de la media (signo del z-score en centroid_z). Pasa AGREGADOS, nunca
dato crudo de filas.
Args:
cluster_profiles: lista de perfiles de cluster (forma de project_clusters_2d).
feature_names: nombres de las features del dataset (solo contexto).
Returns:
El texto del prompt.
"""
cluster_profiles = cluster_profiles or []
feature_names = feature_names if isinstance(feature_names, list) else []
lines = [
"Perfiles AGREGADOS de clusters de KMeans. No hay filas crudas, solo medias por grupo.",
f"Numero de clusters: {len(cluster_profiles)}",
]
if feature_names:
lines.append("Features del dataset: " + ", ".join(str(f) for f in feature_names))
lines.append("")
for i, prof in enumerate(cluster_profiles):
prof = prof or {}
cid = _cluster_id(prof, i)
size = prof.get("size")
pct = prof.get("pct")
pct_str = f"{pct:.1f}%" if isinstance(pct, (int, float)) and not isinstance(pct, bool) else "?"
lines.append(f"Cluster {cid}: tamano={_fmt_num(size)} ({pct_str} del total)")
distinctive = prof.get("distinctive") or []
centroid_o = prof.get("centroid_original") or {}
centroid_z = prof.get("centroid_z") or {}
if distinctive:
lines.append(" Features distintivas (media del grupo):")
for feat in distinctive:
val = centroid_o.get(feat)
z = centroid_z.get(feat)
direction = ""
if isinstance(z, (int, float)) and not isinstance(z, bool):
if z > 0:
direction = "por encima de la media"
elif z < 0:
direction = "por debajo de la media"
else:
direction = "en la media"
if direction:
lines.append(f" - {feat}: {_fmt_num(val)} ({direction})")
else:
lines.append(f" - {feat}: {_fmt_num(val)}")
else:
lines.append(" (sin features distintivas marcadas)")
lines.append("")
lines.append(
"Devuelve SOLO el JSON array descrito en las instrucciones del sistema, "
"sin texto antes ni despues."
)
return "\n".join(lines)
def _parse_clusters_json(text: str, n: int):
"""Extrae y normaliza el array JSON de la respuesta del LLM.
Funcion interna testeable sin red. Localiza el primer '[' y el ultimo ']' del
texto (tolerando texto basura alrededor o fences de markdown), hace json.loads
y normaliza cada entrada a {cluster:int, title:str, description:str}, rellenando
el cluster por indice si falta. NUNCA lanza: ante cualquier fallo devuelve None
(senal de degradacion para el caller).
Args:
text: respuesta cruda del LLM.
n: numero de perfiles esperados (referencia; la longitud real la marca el array).
Returns:
Lista normalizada de dicts, o None si no se pudo parsear un array valido.
"""
if not text or not isinstance(text, str):
return None
start = text.find("[")
end = text.rfind("]")
if start == -1 or end == -1 or end <= start:
return None
try:
data = json.loads(text[start : end + 1])
except (ValueError, TypeError):
return None
if not isinstance(data, list):
return None
out = []
for i, item in enumerate(data):
if not isinstance(item, dict):
out.append({"cluster": i, "title": f"Cluster {i}", "description": ""})
continue
raw_cluster = item.get("cluster")
if isinstance(raw_cluster, bool):
cluster = i
elif isinstance(raw_cluster, int):
cluster = raw_cluster
else:
try:
cluster = int(raw_cluster)
except (TypeError, ValueError):
cluster = i
title = item.get("title")
title = str(title) if title is not None else f"Cluster {cluster}"
desc = item.get("description")
desc = str(desc) if desc is not None else ""
out.append({"cluster": cluster, "title": title, "description": desc})
return out
def _generic_clusters(cluster_profiles: list) -> list:
"""Titulos genericos por cluster para la degradacion (sin LLM)."""
out = []
for i, prof in enumerate(cluster_profiles):
cid = _cluster_id(prof or {}, i)
out.append({"cluster": cid, "title": f"Cluster {cid}", "description": ""})
return out
def describe_clusters_llm(
    cluster_profiles: list,
    feature_names: list,
    model: str = "claude-haiku-4-5-20251001",
) -> dict:
    """Describe the clusters of a KMeans run with a single LLM call.

    Args:
        cluster_profiles: List of cluster profiles, each of the form
            ``{"cluster": int, "size": int, "pct": float,
            "centroid_original": {feature: mean}, "distinctive": [features],
            "centroid_z": {feature: z}}``. Only the prompt built from these
            aggregates is passed to the LLM.
        feature_names: Feature names of the dataset (context for the LLM).
        model: Model id forwarded to ``ask_llm``.

    Returns:
        ``{"clusters": [{cluster, title, description}], "model": str,
        "note": str}``. ``note`` is ``""`` on success. When no reply text is
        obtained the clusters are generic ``"Cluster N"`` entries and ``note``
        is ``"LLM no disponible"``; when the reply cannot be parsed ``note`` is
        ``"parse fallido"``. An empty or non-list ``cluster_profiles`` returns
        ``clusters=[]`` with ``note="sin clusters"`` without calling the LLM.
    """
    if not isinstance(cluster_profiles, list) or not cluster_profiles:
        return {"clusters": [], "model": model, "note": "sin clusters"}
    n = len(cluster_profiles)
    try:
        # Prompt building sits inside the guard too: a malformed profile must
        # degrade to generic titles just like a failed LLM call, not raise.
        prompt = _build_prompt(cluster_profiles, feature_names)
        text = ask_llm(prompt, model=model, system=_SYSTEM, echo=False)
    except Exception:  # noqa: BLE001 — degradation: any prompt/network/LLM failure.
        text = ""
    parsed = _parse_clusters_json(text, n)
    if parsed:
        return {"clusters": parsed, "model": model, "note": ""}
    note = "LLM no disponible" if not text else "parse fallido"
    return {"clusters": _generic_clusters(cluster_profiles), "model": model, "note": note}
@@ -1,160 +0,0 @@
"""Tests para describe_clusters_llm.
NO acceden a red ni a credenciales: _parse_clusters_json es testeable aislada y la
unica via que llamaria al LLM (describe_clusters_llm) se prueba monkeypatcheando
ask_llm con respuestas simuladas. Cubre golden (LLM ok), edge (cluster faltante,
array envuelto en basura, lista vacia / input no-lista) y error (LLM caido, texto
no parseable) — todos sin tocar la red.
"""
import importlib
import json
from datascience.describe_clusters_llm import (
_parse_clusters_json,
describe_clusters_llm,
)
# Sample profiles mirroring the cluster_profiles entries of project_clusters_2d.
# NOTE(review): project_clusters_2d computes pct as size / n_used (a fraction in
# [0, 1]), while the values below are on a 0-100 scale - confirm which scale
# _build_prompt expects.
_PROFILES = [
    {
        "cluster": 0,
        "size": 60,
        "pct": 60.0,
        "centroid_original": {"acidez": 8.5, "alcohol": 9.2},
        "distinctive": ["acidez", "alcohol"],
        "centroid_z": {"acidez": 1.4, "alcohol": -0.9},
    },
    {
        "cluster": 1,
        "size": 40,
        "pct": 40.0,
        "centroid_original": {"acidez": 5.1, "alcohol": 13.0},
        "distinctive": ["alcohol"],
        "centroid_z": {"acidez": -0.7, "alcohol": 1.6},
    },
]
# Feature names passed as context alongside the profiles.
_FEATURES = ["acidez", "alcohol", "azucar"]
def _patch_ask_llm(monkeypatch, returner):
    """Replace ask_llm in the module under test with a stub.

    The stub accepts the same arguments the module passes to ask_llm and
    always returns ``returner`` (the canned reply text).
    """

    def _stub(prompt, model="x", system="", echo=True):
        return returner

    target = importlib.import_module("datascience.describe_clusters_llm")
    monkeypatch.setattr(target, "ask_llm", _stub)
# --- _parse_clusters_json (parser puro, sin red) ---
def test_parse_clusters_json_valid_array():
    """A well-formed JSON array is parsed back into the same entries."""
    expected = [
        {"cluster": 0, "title": "A", "description": "desc a"},
        {"cluster": 1, "title": "B", "description": "desc b"},
    ]
    assert _parse_clusters_json(json.dumps(expected), 2) == expected
def test_parse_clusters_json_wrapped_in_junk_text():
    """The JSON array is still extracted when surrounded by chatty text."""
    array_text = json.dumps(
        [{"cluster": 0, "title": "Solo uno", "description": "d"}]
    )
    noisy = f"Claro, aqui tienes el resultado:\n{array_text}\nEspero que sirva."
    result = _parse_clusters_json(noisy, 1)
    assert result[0]["title"] == "Solo uno"
    assert result[0]["cluster"] == 0
def test_parse_clusters_json_non_json_returns_none():
    """Text without a JSON array degrades to None instead of raising."""
    for unparseable in ("no hay json aqui", "", "{solo un objeto}"):
        assert _parse_clusters_json(unparseable, 2) is None
def test_parse_clusters_json_fills_missing_cluster_by_index():
    """Entries lacking a "cluster" key get their list position as cluster id."""
    entries = [
        {"title": "A", "description": "d"},
        {"title": "B", "description": "e"},
    ]
    result = _parse_clusters_json(json.dumps(entries), 2)
    for position in (0, 1):
        assert result[position]["cluster"] == position
    assert result[0]["title"] == "A"
# --- describe_clusters_llm (con ask_llm monkeypatcheado, sin red) ---
def test_describe_clusters_llm_ok_with_monkeypatched_llm(monkeypatch):
    """A valid stubbed reply yields the parsed clusters and an empty note."""
    reply_entries = [
        {
            "cluster": 0,
            "title": "Vinos de alta acidez",
            "description": "Acidez por encima de la media y graduacion baja.",
        },
        {
            "cluster": 1,
            "title": "Vinos de alta graduacion",
            "description": "Alcohol claramente por encima de la media.",
        },
    ]
    _patch_ask_llm(monkeypatch, json.dumps(reply_entries))

    result = describe_clusters_llm(_PROFILES, _FEATURES)
    clusters = result["clusters"]

    assert result["note"] == ""
    assert result["model"] == "claude-haiku-4-5-20251001"
    assert len(clusters) == 2
    assert clusters[0]["title"] == "Vinos de alta acidez"
    assert set(clusters[0].keys()) == {"cluster", "title", "description"}
def test_describe_clusters_llm_degrades_on_empty_response(monkeypatch):
    """An empty reply from ask_llm yields generic titles plus an explanatory note."""
    _patch_ask_llm(monkeypatch, "")

    result = describe_clusters_llm(_PROFILES, _FEATURES)
    first, second = result["clusters"][0], result["clusters"][1]

    assert first["title"] == "Cluster 0"
    assert second["title"] == "Cluster 1"
    assert first["description"] == ""
    assert result["note"] == "LLM no disponible"
    assert result["model"] == "claude-haiku-4-5-20251001"
def test_describe_clusters_llm_degrades_on_unparseable_response(monkeypatch):
    """A reply without a JSON array yields generic titles and a parse note."""
    _patch_ask_llm(monkeypatch, "lo siento, no puedo ayudarte con eso")

    result = describe_clusters_llm(_PROFILES, _FEATURES)
    titles = [result["clusters"][0]["title"], result["clusters"][1]["title"]]

    assert titles[0] == "Cluster 0"
    assert titles[1] == "Cluster 1"
    assert result["note"] == "parse fallido"
def test_describe_clusters_llm_empty_list_skips_llm(monkeypatch):
    """An empty profile list returns immediately and never calls ask_llm."""

    def fail_if_called(*args, **kwargs):
        raise AssertionError("ask_llm no debe llamarse con lista vacia")

    target = importlib.import_module("datascience.describe_clusters_llm")
    monkeypatch.setattr(target, "ask_llm", fail_if_called)

    result = describe_clusters_llm([], _FEATURES)

    assert result["clusters"] == []
    assert result["note"] == "sin clusters"
def test_describe_clusters_llm_non_list_input_skips_llm(monkeypatch):
    """A non-list input (None) returns no clusters and never calls ask_llm.

    ask_llm is replaced with a stub that fails the test, so a regression in
    the early-return guard is reported here instead of reaching the real LLM.
    """

    def fail_if_called(*args, **kwargs):
        raise AssertionError("ask_llm no debe llamarse con input no-lista")

    target = importlib.import_module("datascience.describe_clusters_llm")
    monkeypatch.setattr(target, "ask_llm", fail_if_called)

    out = describe_clusters_llm(None, _FEATURES)

    assert out["clusters"] == []
    assert out["note"] == "sin clusters"
    assert out["model"] == "claude-haiku-4-5-20251001"
@@ -1,95 +0,0 @@
---
name: project_clusters_2d
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def project_clusters_2d(columns: dict, k_min: int = 2, k_max: int = 8, max_points: int = 2000) -> dict"
description: "PCA a 2D + KMeans sobre el MISMO subset numerico estandarizado, devolviendo proyeccion 2D y labels de cluster ALINEADOS por fila para pintar un scatter PCA coloreado por cluster. Estandariza una sola vez, elige k por silhouette y proyecta centroides al espacio PCA. Determinista."
tags: [eda, models, clustering, pca, kmeans, scatter, dimensionality-reduction, datascience, sklearn]
params:
- name: columns
desc: "Mapa {nombre_columna: [valores numericos]}. Listas alineadas por fila (misma longitud). Columnas no numericas o con <2 valores distintos se descartan; None/NaN descartan la fila completa (listwise)."
- name: k_min
desc: "Numero minimo de clusters a probar por silhouette (default 2). El minimo de filas validas requerido es max(3, k_min*2)."
- name: k_max
desc: "Numero maximo de clusters a probar (default 8). Se acota a min(k_max, n_filas_validas-1)."
- name: max_points
desc: "Tope de puntos devueltos en points/labels (default 2000). Si n_used lo supera, points y labels se submuestrean CONJUNTAMENTE con paso determinista para seguir alineados; el fit usa siempre todas las filas."
output: "dict con points (proyeccion 2D, posiblemente submuestreada a max_points), labels (cluster de cada point, alineado con points), centers_2d (centroides en espacio PCA, len==best_k), best_k, silhouette, explained_2d ([var PC1, var PC2]), cluster_sizes (sobre n_used total), cluster_profiles (lista de {cluster, size, pct, centroid_original, distinctive top-3 por |z|, centroid_z}), feature_names, n_used (filas del fit antes de muestreo) y note (\"\" si ok). Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve best_k=0, listas vacias y note 'datos insuficientes' sin lanzar excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [numpy, scikit-learn]
tested: true
tests: ["test_golden_three_blobs_aligned_projection_and_clusters", "test_edge_subsampling_keeps_points_labels_aligned", "test_edge_single_numeric_column_insufficient", "test_edge_too_few_rows_insufficient", "test_edge_non_numeric_column_dropped_without_error", "test_edge_constant_column_dropped"]
test_file_path: "python/functions/datascience/project_clusters_2d_test.py"
file_path: "python/functions/datascience/project_clusters_2d.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.project_clusters_2d import project_clusters_2d
# Tres grupos gaussianos bien separados sobre 4 features.
import numpy as np
rng = np.random.default_rng(0)
rows = []
for center in (np.full(4, 0.0), np.full(4, 12.0), np.array([0.0, 12.0, 0.0, 12.0])):
rows.extend(rng.normal(loc=center, scale=0.4, size=(50, 4)))
mat = np.array(rows)
columns = {f"f{j}": [float(v) for v in mat[:, j]] for j in range(4)}
res = project_clusters_2d(columns, k_min=2, k_max=8)
print(res["best_k"]) # 3
print(len(res["points"]), len(res["labels"])) # 150 150 (alineados)
print(len(res["centers_2d"])) # == best_k
print([round(v, 2) for v in res["explained_2d"]]) # varianza de PC1, PC2
# Pintar: scatter(points[:,0], points[:,1], c=labels) + marcar centers_2d.
```
## Cuando usarla
Cuando, durante un EDA, quieres un scatter 2D de un dataset tabular numerico
coloreado por segmento descubierto automaticamente, y necesitas que cada punto
de la proyeccion lleve su etiqueta de cluster correcta. Usala en vez de
combinar `pca_explained` + `kmeans_segments` a mano: esas estandarizan por
separado y descartan los labels, asi que sus salidas no se pueden cruzar fila a
fila. Esta funcion garantiza esa alineacion (mismo X estandarizado para PCA y
KMeans) y ademas proyecta los centroides KMeans al espacio PCA para dibujarlos.
## Gotchas
- Funcion pura y determinista (StandardScaler + PCA random_state=0 + KMeans
random_state=0, n_init=10), pero requiere `numpy` y `scikit-learn` instalados.
- `points`/`labels` pueden venir submuestreados si `n_used > max_points` (paso
determinista `[::ceil(n_used/max_points)]`); `n_used`, `centers_2d`,
`cluster_sizes` y `cluster_profiles` se calculan SIEMPRE sobre todas las filas.
Cuando hay submuestreo, `note` lo indica.
- `centroid_z` esta en z-score (espacio escalado) y `distinctive` lista los
  nombres de las 3 features con mayor |z| del centroide; `centroid_original`
  esta en las unidades originales (via `scaler.inverse_transform`). No mezcles
  ambas escalas al interpretar.
- `centers_2d` esta en el espacio PCA (coordenadas del scatter), no en unidades
originales: pintalo sobre el mismo eje que `points`.
- Silhouette baja con best_k alto sugiere que no hay estructura de cluster real;
el scatter puede no mostrar grupos separados.
## Notas
Pieza de composicion que `pca_explained` + `kmeans_segments` no cubren: ambas
estandarizan internamente por separado (cada una su propio `StandardScaler`) y
`kmeans_segments` no expone los labels por fila, por lo que no se pueden cruzar
con la `projection` de `pca_explained`. Esta funcion usa `sklearn` directo
(StandardScaler una sola vez compartido por PCA y KMeans) para garantizar la
alineacion `points[i] <-> labels[i]` y proyectar los centroides KMeans al
espacio PCA. Coercion y listwise deletion siguen el estilo de `pca_explained`
(None/NaN -> fila descartada, columnas no parseables o constantes descartadas).
Degrada con gracia: con <2 columnas numericas o <max(3, k_min*2) filas validas
devuelve `note: "datos insuficientes"` sin lanzar excepcion (try/except
defensivo en todo el cuerpo).
@@ -1,208 +0,0 @@
"""Proyeccion PCA-2D + KMeans sobre el mismo subset, con puntos y labels alineados.
Estandariza una sola vez las columnas numericas (z-score), proyecta a 2D con PCA
y clusteriza con KMeans sobre EXACTAMENTE la misma matriz escalada, de modo que
la proyeccion 2D (`points`) y la etiqueta de cluster (`labels`) quedan alineadas
fila a fila. Es la pieza que `pca_explained` + `kmeans_segments` no cubren: esas
dos estandarizan por separado y descartan los labels, asi que sus salidas no se
pueden cruzar para pintar un scatter PCA coloreado por cluster. Determinista.
"""
import math
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
def project_clusters_2d(
    columns: dict,
    k_min: int = 2,
    k_max: int = 8,
    max_points: int = 2000,
) -> dict:
    """Project a numeric table to 2D (PCA) and cluster it (KMeans) on one matrix.

    PCA and KMeans are both fitted on the SAME standardized matrix, so
    ``points[i]`` (2D projection) and ``labels[i]`` (cluster id) refer to the
    same row. The number of clusters is the k in
    ``[k_min, min(k_max, n_used - 1)]`` with the highest silhouette score.
    Fixed seeds are used throughout (PCA ``random_state=0``; KMeans
    ``random_state=0``, ``n_init=10``).

    Args:
        columns: mapping ``{column_name: [values]}`` with row-aligned lists or
            tuples. A column is dropped when it is not a list/tuple, when any
            value cannot be converted with ``float()``, or when it has fewer
            than 2 distinct finite values. ``None``, NaN and +/-inf count as
            missing; a row is removed if any kept column is missing there
            (listwise deletion).
        k_min: smallest number of clusters to try. At least
            ``max(3, k_min * 2)`` valid rows are required.
        k_max: largest number of clusters to try, capped at ``n_used - 1``.
        max_points: cap on the number of entries returned in ``points`` and
            ``labels``. When ``n_used`` exceeds it (and it is positive), both
            are thinned together with the stride ``ceil(n_used / max_points)``
            so they stay aligned. The fit always uses every valid row.

    Returns:
        A dict with the keys:

        * ``points``: ``[x, y]`` PCA coordinates (possibly subsampled).
        * ``labels``: cluster id of each point, aligned with ``points``.
        * ``centers_2d``: KMeans centroids projected into the PCA plane.
        * ``best_k``: selected number of clusters.
        * ``silhouette``: silhouette score of the selected k.
        * ``explained_2d``: explained variance ratio of PC1 and PC2.
        * ``cluster_sizes``: rows per cluster, counted over all valid rows.
        * ``cluster_profiles``: one dict per cluster with ``cluster``,
          ``size``, ``pct`` (``size / n_used``, a fraction in [0, 1]),
          ``centroid_original`` (centroid in original units), ``distinctive``
          (up to 3 feature names with the largest ``|z|``) and ``centroid_z``
          (centroid in standardized units).
        * ``feature_names``: names of the columns actually used.
        * ``n_used``: rows used for the fit, before any subsampling.
        * ``note``: ``""`` on success, or a message describing subsampling.

        With fewer than 2 usable columns, too few valid rows, or any internal
        error, the SAME keys are returned with ``best_k=0``,
        ``silhouette=0.0``, empty lists and ``note="datos insuficientes"``.
        No exception is propagated to the caller.
    """
    feature_names: list[str] = []

    def insufficient(names: list[str], n_used: int) -> dict:
        # Mirrors the key set of the success result so callers can index any
        # key without checking best_k first.
        return {
            "points": [],
            "labels": [],
            "centers_2d": [],
            "best_k": 0,
            "silhouette": 0.0,
            "explained_2d": [],
            "cluster_sizes": [],
            "cluster_profiles": [],
            "feature_names": names,
            "n_used": int(n_used),
            "note": "datos insuficientes",
        }

    try:
        if not isinstance(columns, dict) or not columns:
            return insufficient([], 0)

        # 1. Coerce to float, dropping unparseable or constant columns.
        numeric_cols: dict[str, list] = {}
        for name, values in columns.items():
            if not isinstance(values, (list, tuple)):
                continue
            coerced: list[float] = []
            usable = True
            for v in values:
                if v is None:
                    coerced.append(math.nan)
                    continue
                try:
                    number = float(v)
                except (TypeError, ValueError, OverflowError):
                    usable = False
                    break
                # +/-inf cannot be standardized; treat it as missing so only
                # the affected rows are lost instead of the whole analysis.
                coerced.append(number if math.isfinite(number) else math.nan)
            if not usable:
                continue
            # Fewer than 2 distinct values carries no variance -> drop.
            distinct = {x for x in coerced if not math.isnan(x)}
            if len(distinct) < 2:
                continue
            numeric_cols[name] = coerced

        feature_names = list(numeric_cols.keys())
        if len(feature_names) < 2:
            return insufficient(feature_names, 0)

        # 2. Row-aligned matrix + listwise deletion (any NaN drops the row).
        matrix = np.array(
            [numeric_cols[n] for n in feature_names], dtype=float
        ).T
        valid_mask = ~np.isnan(matrix).any(axis=1)
        data = matrix[valid_mask]
        n_used = int(data.shape[0])

        min_rows = max(3, k_min * 2)
        if n_used < min_rows:
            return insufficient(feature_names, n_used)

        # 3. Standardize ONCE; the scaler is kept to map centroids back.
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(data)

        # 4. PCA to 2D on the standardized matrix.
        pca = PCA(n_components=2, random_state=0)
        pca.fit(X_scaled)
        proj = pca.transform(X_scaled)

        # 5. KMeans on the same matrix, choosing k by silhouette.
        upper_k = min(k_max, n_used - 1)
        if upper_k < k_min:
            return insufficient(feature_names, n_used)

        best = None  # (silhouette, k, model, labels)
        for k in range(k_min, upper_k + 1):
            model = KMeans(n_clusters=k, n_init=10, random_state=0)
            labels_k = model.fit_predict(X_scaled)
            if len(set(labels_k)) < 2:
                # Silhouette is undefined for a single label; rank it last.
                sil = -1.0
            else:
                sil = float(silhouette_score(X_scaled, labels_k))
            # Strict ">" keeps the smallest k on ties.
            if best is None or sil > best[0]:
                best = (sil, k, model, labels_k)

        best_sil, best_k, best_model, labels = best

        # 6. KMeans centroids (standardized space) projected into PCA space.
        centers_2d = pca.transform(best_model.cluster_centers_)

        # 7. Per-cluster profiles over ALL rows used in the fit.
        centroids_original = scaler.inverse_transform(best_model.cluster_centers_)
        cluster_sizes: list[int] = []
        cluster_profiles: list[dict] = []
        for c in range(best_k):
            size = int(np.sum(labels == c))
            cluster_sizes.append(size)

            z_vec = best_model.cluster_centers_[c]
            orig_vec = centroids_original[c]
            centroid_z = {
                name: float(z) for name, z in zip(feature_names, z_vec)
            }
            centroid_original = {
                name: float(value)
                for name, value in zip(feature_names, orig_vec)
            }
            # Features ordered by decreasing |z| of the centroid.
            order = np.argsort(np.abs(z_vec))[::-1]
            distinctive = [feature_names[int(j)] for j in order[:3]]

            cluster_profiles.append(
                {
                    "cluster": int(c),
                    "size": size,
                    "pct": float(size / n_used) if n_used else 0.0,
                    "centroid_original": centroid_original,
                    "distinctive": distinctive,
                    "centroid_z": centroid_z,
                }
            )

        # 8. Joint strided subsampling of points + labels (keeps alignment).
        note = ""
        if n_used > max_points and max_points > 0:
            step = math.ceil(n_used / max_points)
            proj_out = proj[::step]
            labels_out = labels[::step]
            note = f"submuestreado a {len(proj_out)} de {n_used} puntos para visualizacion"
        else:
            proj_out = proj
            labels_out = labels

        points = [[float(row[0]), float(row[1])] for row in proj_out]
        labels_list = [int(v) for v in labels_out]
        centers_list = [[float(row[0]), float(row[1])] for row in centers_2d]
        explained_2d = [float(x) for x in pca.explained_variance_ratio_]

        return {
            "points": points,
            "labels": labels_list,
            "centers_2d": centers_list,
            "best_k": int(best_k),
            "silhouette": float(best_sil),
            "explained_2d": explained_2d,
            "cluster_sizes": cluster_sizes,
            "cluster_profiles": cluster_profiles,
            "feature_names": feature_names,
            "n_used": n_used,
            "note": note,
        }
    except Exception:
        # Defensive boundary: never propagate exceptions to the EDA caller.
        return insufficient(feature_names, 0)
@@ -1,127 +0,0 @@
"""Tests para project_clusters_2d."""
import numpy as np
from project_clusters_2d import project_clusters_2d
def _three_blobs(seed: int = 0, per_blob: int = 50, n_features: int = 4):
"""Genera 3 gaussianas bien separadas en n_features dims, alineadas por fila.
Devuelve un dict {col: [valores]} con las columnas alineadas por fila.
"""
rng = np.random.default_rng(seed)
base_centers = [
np.full(n_features, 0.0),
np.full(n_features, 12.0),
np.array([0.0, 12.0, 0.0, 12.0][:n_features] + [0.0] * max(0, n_features - 4)),
]
rows: list[np.ndarray] = []
for center in base_centers:
pts = rng.normal(loc=center, scale=0.4, size=(per_blob, n_features))
rows.extend(pts)
mat = np.array(rows)
return {f"f{j}": [float(v) for v in mat[:, j]] for j in range(n_features)}
def test_golden_three_blobs_aligned_projection_and_clusters():
    """Three separated blobs give a complete, row-aligned clustering result."""
    res = project_clusters_2d(
        _three_blobs(seed=0, per_blob=50, n_features=4), k_min=2, k_max=8
    )
    total_rows = res["n_used"]
    assert total_rows == 150
    assert res["note"] == ""

    k = res["best_k"]
    assert 2 <= k <= 4

    points, labels = res["points"], res["labels"]
    # points and labels are aligned row by row; no subsampling (150 < 2000).
    assert len(points) == len(labels)
    assert len(points) == total_rows
    # Every point is an (x, y) pair and every label lies in [0, best_k).
    assert all(len(point) == 2 for point in points)
    assert all(0 <= label < k for label in labels)

    # One 2D centroid per cluster.
    centers = res["centers_2d"]
    assert len(centers) == k
    assert all(len(center) == 2 for center in centers)

    # Explained variance of the two components.
    assert len(res["explained_2d"]) == 2

    # Cluster sizes cover every row used.
    assert sum(res["cluster_sizes"]) == total_rows
    assert len(res["cluster_sizes"]) == k

    # One profile per cluster, keyed by every feature name.
    profiles = res["cluster_profiles"]
    assert len(profiles) == k
    for profile in profiles:
        assert set(profile["centroid_original"].keys()) == set(res["feature_names"])
        assert set(profile["centroid_z"].keys()) == set(res["feature_names"])
        assert 1 <= len(profile["distinctive"]) <= 3
        assert profile["size"] >= 0
        assert 0.0 <= profile["pct"] <= 1.0
def test_edge_subsampling_keeps_points_labels_aligned():
    """A small max_points thins points and labels together, not the fit."""
    res = project_clusters_2d(
        _three_blobs(seed=1, per_blob=50, n_features=3),
        k_min=2,
        k_max=6,
        max_points=40,
    )
    total_rows = res["n_used"]
    # The fit still uses every row.
    assert total_rows == 150

    # Subsampled outputs keep the same length and respect the cap.
    shown = len(res["points"])
    assert shown == len(res["labels"])
    assert shown <= 40

    # Centres and sizes are computed over ALL rows.
    assert sum(res["cluster_sizes"]) == total_rows
    assert len(res["centers_2d"]) == res["best_k"]
    # The note reports the subsampling.
    assert res["note"] != ""
def test_edge_single_numeric_column_insufficient():
    """A single numeric column yields the degraded, empty result."""
    res = project_clusters_2d(
        {"x": [float(i) for i in range(50)]}, k_min=2, k_max=8
    )
    assert res["best_k"] == 0
    assert res["note"] == "datos insuficientes"
    for key in ("points", "labels", "centers_2d", "cluster_profiles"):
        assert res[key] == []
def test_edge_too_few_rows_insufficient():
    """Two valid rows are below min_rows = max(3, k_min * 2) = 4."""
    two_rows = {"x": [1.0, 5.0], "y": [2.0, 9.0]}
    res = project_clusters_2d(two_rows, k_min=2, k_max=8)
    assert (res["best_k"], res["note"]) == (0, "datos insuficientes")
def test_edge_non_numeric_column_dropped_without_error():
    """A string column is discarded; the three numeric ones still cluster."""
    table = _three_blobs(seed=2, per_blob=50, n_features=3)
    table["label"] = ["a"] * len(table["f0"])

    res = project_clusters_2d(table, k_min=2, k_max=6)
    used = res["feature_names"]

    assert res["best_k"] >= 2
    assert "label" not in used
    assert set(used) == {"f0", "f1", "f2"}
    assert len(res["points"]) == len(res["labels"])
def test_edge_constant_column_dropped():
    """A zero-variance column is discarded for having < 2 distinct values."""
    table = _three_blobs(seed=3, per_blob=50, n_features=3)
    table["const"] = [7.0] * len(table["f0"])

    res = project_clusters_2d(table, k_min=2, k_max=6)

    assert "const" not in res["feature_names"]
    assert res["best_k"] >= 2