Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 649de07d6b |
@@ -42,8 +42,6 @@ from .isolation_forest_outliers import isolation_forest_outliers
|
||||
from .normality_tests import normality_tests
|
||||
from .trend_slope import trend_slope
|
||||
from .run_eda_models import run_eda_models
|
||||
from .project_clusters_2d import project_clusters_2d
|
||||
from .describe_clusters_llm import describe_clusters_llm
|
||||
from .eda_llm_insights import eda_llm_insights
|
||||
from .build_eda_notebook import build_eda_notebook
|
||||
from .decode_qr_image import decode_qr_image
|
||||
@@ -88,8 +86,6 @@ __all__ = [
|
||||
"normality_tests",
|
||||
"trend_slope",
|
||||
"run_eda_models",
|
||||
"project_clusters_2d",
|
||||
"describe_clusters_llm",
|
||||
"eda_llm_insights",
|
||||
"build_eda_notebook",
|
||||
"describe_numeric",
|
||||
|
||||
@@ -0,0 +1,402 @@
|
||||
"""Categorical distributions chapter (CAT DISTR).
|
||||
|
||||
Third reference chapter for AutomaticEDA. For every categorical column it shows,
|
||||
fulfilling the user's request:
|
||||
|
||||
1. A short opening explanation of **Shannon entropy** (what it measures, its 0
|
||||
and log2(k) bounds, the normalized 0–1 version) and the dataset row total used
|
||||
as a comparison baseline.
|
||||
2. Per column, a cardinality key/value table: distinct values, ``% distinct``
|
||||
(distinct / total rows), total dataset rows, singleton values (frequency 1),
|
||||
entropy with its theoretical maximum and the normalized ratio, mode, imbalance
|
||||
and string-length stats.
|
||||
3. A short note flagging problematic cardinality (id-like ≈100% distinct, or a
|
||||
single dominating category).
|
||||
4. A ``top-k`` table (value / count / %).
|
||||
5. A **donut pie chart** of the most common categories (top-k + an "Otros"
|
||||
bucket), drawn lazily so the renderers scale it to fit entirely.
|
||||
|
||||
Data comes from the ``eda`` group: each ``columns[i]['categorical']`` is the
|
||||
output of ``summarize_categorical`` (``top[{value,count,pct}]``, ``mode``,
|
||||
``n_distinct``, ``entropy``, ``imbalance``, ``len_min/mean/max``). The derived
|
||||
cardinality metrics and the pie figure are delegated to two registry functions
|
||||
(``categorical_cardinality_block`` and ``categorical_top_pie_figure``); both are
|
||||
imported lazily and degrade to a minimal inline fallback so this chapter never
|
||||
raises even if they are unavailable.
|
||||
|
||||
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
from .. import model
|
||||
|
||||
# Chapter identity, as required by the chapter contract.
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "cat_distr"
CHAPTER_TITLE = "Distribuciones categóricas"

# Upper bound on the categorical columns rendered, so the document stays
# bounded; the columns left out are reported in a closing note rather than
# dropped silently.
MAX_COLS = 40

# Number of rows in each top-k table.
TOP_TABLE_ROWS = 15

# Number of explicit slices in the pie chart.
PIE_TOP_K = 6

# Maximum length of a category label in tables before it is truncated (the
# renderer wraps text as well).
LABEL_MAX = 48
|
||||
|
||||
|
||||
def _fmt_int(value) -> str:
|
||||
if value is None:
|
||||
return "—"
|
||||
try:
|
||||
return f"{int(value):,}".replace(",", ".")
|
||||
except (TypeError, ValueError):
|
||||
return str(value)
|
||||
|
||||
|
||||
def _fmt_num(value, decimals: int = 3) -> str:
|
||||
if value is None:
|
||||
return "—"
|
||||
if isinstance(value, bool):
|
||||
return str(value)
|
||||
if isinstance(value, int):
|
||||
return f"{value:,}".replace(",", ".")
|
||||
if isinstance(value, float):
|
||||
if value != value: # NaN
|
||||
return "NaN"
|
||||
if value in (float("inf"), float("-inf")):
|
||||
return str(value)
|
||||
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
|
||||
return text if text else "0"
|
||||
return str(value)
|
||||
|
||||
|
||||
def _fmt_pct_value(value, decimals: int = 1) -> str:
|
||||
"""Format an already-in-percent value (0–100). None -> placeholder."""
|
||||
if value is None:
|
||||
return "—"
|
||||
try:
|
||||
return f"{float(value):.{decimals}f}%"
|
||||
except (TypeError, ValueError):
|
||||
return str(value)
|
||||
|
||||
|
||||
def _pct_from_maybe_fraction(value, decimals: int = 1) -> str:
|
||||
"""Format a percentage that may arrive as a 0–1 fraction or a 0–100 number."""
|
||||
if value is None:
|
||||
return "—"
|
||||
try:
|
||||
v = float(value)
|
||||
except (TypeError, ValueError):
|
||||
return str(value)
|
||||
if v <= 1.0:
|
||||
v *= 100.0
|
||||
return f"{v:.{decimals}f}%"
|
||||
|
||||
|
||||
def _truncate(text: str, limit: int = LABEL_MAX) -> str:
    """Return *text* as a safe string, shortened with "…" when too long.

    The text goes through ``model._safe_str`` first. When the result is
    longer than *limit*, the first ``limit - 1`` characters (at least one)
    are kept, trailing whitespace is removed and an ellipsis is appended.
    """
    label = model._safe_str(text)
    if len(label) > limit:
        keep = max(1, limit - 1)
        return label[:keep].rstrip() + "…"
    return label
|
||||
|
||||
|
||||
def _is_categorical(col: dict) -> bool:
|
||||
"""A column is treated as categorical when it carries a non-empty top list
|
||||
and is not a pure numeric column (numeric columns may still expose a top)."""
|
||||
if not isinstance(col, dict):
|
||||
return False
|
||||
cat = col.get("categorical")
|
||||
if not (isinstance(cat, dict) and cat.get("top")):
|
||||
return False
|
||||
if col.get("inferred_type") == "numeric":
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _cardinality(cat: dict, n_rows) -> dict:
    """Return the cardinality metrics for one categorical summary.

    The registry function ``categorical_cardinality_block`` is imported
    lazily and used when it returns a dict. Any exception raised while
    importing or calling it is swallowed on purpose, and the inline
    ``_fallback_cardinality`` derivation is used instead.
    """
    derived = None
    try:
        from datascience.categorical_cardinality_block import (
            categorical_cardinality_block as registry_block,
        )

        derived = registry_block(cat=cat, n_rows=n_rows)
    except Exception:  # noqa: BLE001 — fall back to the inline derivation.
        derived = None
    if isinstance(derived, dict):
        return derived
    return _fallback_cardinality(cat, n_rows)
|
||||
|
||||
|
||||
def _fallback_cardinality(cat: dict, n_rows) -> dict:
|
||||
cat = cat or {}
|
||||
top = cat.get("top") or []
|
||||
n_distinct = cat.get("n_distinct")
|
||||
entropy = cat.get("entropy")
|
||||
try:
|
||||
nr = int(n_rows) if n_rows is not None else None
|
||||
except (TypeError, ValueError):
|
||||
nr = None
|
||||
pct_distinct = None
|
||||
if isinstance(n_distinct, (int, float)) and nr:
|
||||
pct_distinct = float(n_distinct) / nr * 100.0
|
||||
entropy_max = None
|
||||
if isinstance(n_distinct, (int, float)):
|
||||
entropy_max = math.log2(n_distinct) if n_distinct > 1 else 0.0
|
||||
entropy_norm = None
|
||||
if isinstance(entropy, (int, float)) and entropy_max:
|
||||
entropy_norm = max(0.0, min(1.0, float(entropy) / entropy_max))
|
||||
mode_pct = cat.get("mode_pct")
|
||||
if mode_pct is None and top and isinstance(top[0], dict):
|
||||
mode_pct = top[0].get("pct")
|
||||
# Normalize to a 0–100 scale: summarize_categorical emits a 0–1 fraction.
|
||||
if isinstance(mode_pct, (int, float)) and not isinstance(mode_pct, bool):
|
||||
mode_pct = float(mode_pct) * 100.0 if mode_pct <= 1.0 else float(mode_pct)
|
||||
else:
|
||||
mode_pct = None
|
||||
n_singletons = None
|
||||
if top:
|
||||
n_singletons = sum(
|
||||
1 for t in top if isinstance(t, dict) and t.get("count") == 1)
|
||||
return {
|
||||
"n_distinct": n_distinct,
|
||||
"n_rows": nr,
|
||||
"pct_distinct": pct_distinct,
|
||||
"entropy": entropy,
|
||||
"entropy_max": entropy_max,
|
||||
"entropy_norm": entropy_norm,
|
||||
"mode": cat.get("mode"),
|
||||
"mode_pct": mode_pct,
|
||||
"imbalance": cat.get("imbalance"),
|
||||
"n_singletons": n_singletons,
|
||||
"n_singletons_partial": (
|
||||
isinstance(n_distinct, (int, float)) and n_distinct > len(top)),
|
||||
"len_min": cat.get("len_min"),
|
||||
"len_mean": cat.get("len_mean"),
|
||||
"len_max": cat.get("len_max"),
|
||||
"id_like": pct_distinct is not None and pct_distinct >= 99.0,
|
||||
"dominated": mode_pct is not None and mode_pct >= 90.0,
|
||||
}
|
||||
|
||||
|
||||
def _pie_make(top, n_distinct, title, n_rows):
|
||||
"""Return a zero-arg callable that builds the donut figure lazily."""
|
||||
|
||||
def make():
|
||||
try:
|
||||
from datascience.categorical_top_pie_figure import (
|
||||
categorical_top_pie_figure,
|
||||
)
|
||||
|
||||
return categorical_top_pie_figure(
|
||||
top=top, n_distinct=n_distinct or 0, title=title,
|
||||
top_k=PIE_TOP_K, n_rows=n_rows)
|
||||
except Exception: # noqa: BLE001 — minimal local fallback figure.
|
||||
return _fallback_pie(top, title)
|
||||
|
||||
return make
|
||||
|
||||
|
||||
def _fallback_pie(top, title):
    """Draw a minimal donut figure when the registry function is unavailable.

    Args:
        top: list of ``{"value", "count", ...}`` dicts; entries that are not
            dicts, or whose count is not a finite positive number, are
            ignored.
        title: chart title (truncated to 40 characters).

    Returns:
        A matplotlib ``Figure``. The ``PIE_TOP_K`` largest categories get
        their own slice and the remainder is merged into an "Otros" slice.
        With nothing drawable, the figure carries a placeholder text instead.
    """
    import math

    import matplotlib

    matplotlib.use("Agg")
    from matplotlib.figure import Figure

    fig = Figure(figsize=(5.0, 3.2))
    ax = fig.add_subplot(111)

    # Keep only finite, strictly positive counts: Axes.pie rejects negative
    # wedge sizes, NaN poisons the total, and a zero count would add a
    # legend entry with no visible slice.
    items = []
    for entry in top or []:
        if not isinstance(entry, dict):
            continue
        count = entry.get("count")
        if (isinstance(count, (int, float)) and math.isfinite(count)
                and count > 0):
            items.append(entry)
    items.sort(key=lambda entry: entry["count"], reverse=True)

    head, rest = items[:PIE_TOP_K], items[PIE_TOP_K:]
    labels = [_truncate(entry.get("value"), 20) for entry in head]
    sizes = [float(entry["count"]) for entry in head]
    if rest:
        labels.append(f"Otros ({len(rest)})")
        sizes.append(sum(float(entry["count"]) for entry in rest))

    if not sizes:
        ax.text(0.5, 0.5, "sin datos categóricos", ha="center", va="center")
        ax.axis("off")
        return fig

    # Labels go in the legend, not on the wedges; percentages are printed
    # only on slices of at least 4%.
    ax.pie(sizes, labels=None, wedgeprops={"width": 0.42},
           autopct=lambda p: f"{p:.0f}%" if p >= 4 else "")
    ax.legend(labels, loc="center left", bbox_to_anchor=(1.0, 0.5),
              fontsize=7, frameon=False)
    ax.set_title(_truncate(title, 40))
    fig.tight_layout()
    return fig
|
||||
|
||||
|
||||
def _normalize_card(card: dict) -> dict:
|
||||
"""Make the cardinality dict robust regardless of the upstream scale.
|
||||
|
||||
``summarize_categorical`` emits ``mode_pct`` as a 0–1 fraction; bring it to a
|
||||
0–100 scale and recompute the ``dominated`` flag here so the chapter is
|
||||
correct whether it consumed the registry function or the inline fallback.
|
||||
"""
|
||||
card = dict(card or {})
|
||||
mp = card.get("mode_pct")
|
||||
if isinstance(mp, (int, float)) and not isinstance(mp, bool):
|
||||
mp = float(mp) * 100.0 if mp <= 1.0 else float(mp)
|
||||
else:
|
||||
mp = None
|
||||
card["mode_pct"] = mp
|
||||
card["dominated"] = mp is not None and mp >= 90.0
|
||||
pd = card.get("pct_distinct")
|
||||
card["id_like"] = isinstance(pd, (int, float)) and pd >= 99.0
|
||||
return card
|
||||
|
||||
|
||||
def _cardinality_block(card: dict):
    """Build the "Cardinalidad" key/value table for one column.

    Reads the metrics from *card* and formats them into label/value rows.
    The imbalance row and the string-length row are added only when the
    corresponding values are present.
    """
    # Singleton count: shown as a lower bound when only part of the
    # categories was inspected.
    singleton_count = card.get("n_singletons")
    if singleton_count is None:
        singletons = "—"
    elif card.get("n_singletons_partial"):
        singletons = f"≥{_fmt_int(singleton_count)} (en top mostrado)"
    else:
        singletons = _fmt_int(singleton_count)

    # Entropy, followed by its theoretical maximum when known.
    entropy_text = _fmt_num(card.get("entropy"))
    entropy_cap = card.get("entropy_max")
    if entropy_cap is not None:
        entropy_text = f"{entropy_text} (máx {_fmt_num(entropy_cap)})"

    # Mode, followed by its share when known.
    mode_value = card.get("mode")
    mode_share = card.get("mode_pct")
    if mode_value is None:
        mode_text = "—"
    else:
        mode_text = model._safe_str(mode_value)
        if mode_share is not None:
            mode_text = f"{mode_text} ({_fmt_pct_value(mode_share)})"

    rows = [
        ("Valores distintos", _fmt_int(card.get("n_distinct"))),
        ("% distintos", _fmt_pct_value(card.get("pct_distinct"))),
        ("Total filas (dataset)", _fmt_int(card.get("n_rows"))),
        ("Valores únicos (frecuencia 1)", singletons),
        ("Entropía (bits)", entropy_text),
        ("Entropía normalizada (0–1)", _fmt_num(card.get("entropy_norm"))),
        ("Moda", mode_text),
    ]

    imbalance = card.get("imbalance")
    if imbalance is not None:
        rows.append(("Desbalance", _fmt_num(imbalance)))

    lengths = [card.get(key) for key in ("len_min", "len_mean", "len_max")]
    if any(length is not None for length in lengths):
        rows.append((
            "Longitud (mín/media/máx)",
            " / ".join(_fmt_num(length) for length in lengths)))

    return model.KVTable(rows=rows, title="Cardinalidad")
|
||||
|
||||
|
||||
def _flag_note(card: dict):
|
||||
"""Return a Note flagging problematic cardinality, or None."""
|
||||
if card.get("id_like"):
|
||||
return model.Note(
|
||||
"Casi todos los valores son distintos (≈100% distintos): la columna "
|
||||
"se comporta como un identificador y aporta poco para agrupar o "
|
||||
"comparar categorías.")
|
||||
if card.get("dominated"):
|
||||
mp = card.get("mode_pct")
|
||||
mp_str = _fmt_pct_value(mp) if mp is not None else "muy alta"
|
||||
return model.Note(
|
||||
f"Una sola categoría domina la columna (moda {mp_str}): la "
|
||||
"distribución está muy desbalanceada.")
|
||||
return None
|
||||
|
||||
|
||||
def _topk_table(cat: dict):
|
||||
"""DataTable value / count / % for the top categories."""
|
||||
top = cat.get("top") or []
|
||||
n_distinct = cat.get("n_distinct")
|
||||
header = ["Valor", "Conteo", "%"]
|
||||
rows = []
|
||||
for t in top[:TOP_TABLE_ROWS]:
|
||||
if not isinstance(t, dict):
|
||||
continue
|
||||
rows.append([
|
||||
model._safe_str(t.get("value")),
|
||||
_fmt_int(t.get("count")),
|
||||
_pct_from_maybe_fraction(t.get("pct")),
|
||||
])
|
||||
if not rows:
|
||||
return None
|
||||
shown = len(rows)
|
||||
if isinstance(n_distinct, (int, float)) and n_distinct > shown:
|
||||
note = f"top {shown} de {_fmt_int(n_distinct)} categorías distintas"
|
||||
else:
|
||||
note = f"{shown} categorías"
|
||||
return model.DataTable(header=header, rows=rows, title="Top categorías",
|
||||
note=note)
|
||||
|
||||
|
||||
def _intro_blocks(n_rows):
    """Return the opening blocks: a heading plus the entropy explanation.

    When *n_rows* is not ``None``, a closing sentence stating the dataset's
    total row count is appended to the explanation.
    """
    sentences = [
        "La **entropía de Shannon** mide cómo de repartidos están los valores de "
        "una columna categórica, en bits. Vale 0 cuando una sola categoría "
        "concentra todas las filas (máxima previsibilidad) y alcanza su máximo, "
        "log2(k) para k categorías distintas, cuando todas aparecen por igual "
        "(máxima diversidad). La **entropía normalizada** (entropía dividida por "
        "su máximo) la lleva al rango 0–1 para comparar columnas con distinto "
        "número de categorías. Para cada columna se muestran los valores "
        "distintos, el porcentaje que representan sobre el total de filas, los "
        "valores únicos (que aparecen una sola vez), la tabla de las categorías "
        "más frecuentes y un gráfico de tarta (donut) de las más comunes."
    ]
    if n_rows is not None:
        sentences.append(
            f" El dataset tiene {_fmt_int(n_rows)} filas en total como referencia.")
    heading = model.Heading(text="Entropía y cardinalidad", level=2)
    body = model.Markdown(text="".join(sentences))
    return [heading, body]
|
||||
|
||||
|
||||
def build_cat_distr(profile: dict, ctx: dict):
    """Assemble the categorical-distributions chapter.

    Args:
        profile: table profile; ``columns`` and ``n_rows`` are read. ``None``
            is treated as an empty profile.
        ctx: rendering context. Accepted to satisfy the chapter contract;
            this chapter does not read it.

    Returns:
        A ``Chapter`` with the intro blocks followed, for each of the first
        ``MAX_COLS`` categorical columns, by a heading, the cardinality
        table, an optional flag note, an optional top-k table and the donut
        figure. A closing note reports how many columns were left out.
        ``None`` when the profile has no categorical columns.
    """
    profile = profile or {}
    categorical = [
        column for column in (profile.get("columns") or [])
        if _is_categorical(column)
    ]
    if not categorical:
        return None

    n_rows = profile.get("n_rows")
    shown = categorical[:MAX_COLS]
    blocks = list(_intro_blocks(n_rows))

    for column in shown:
        name = column.get("name") or "(columna)"
        summary = column.get("categorical") or {}
        card = _normalize_card(_cardinality(summary, n_rows))

        section = [
            model.Heading(text=str(name), level=2),
            _cardinality_block(card),
            _flag_note(card),
            _topk_table(summary),
        ]
        # The note and the top-k table are optional and come back as None.
        blocks.extend(block for block in section if block is not None)

        make_figure = _pie_make(
            summary.get("top") or [], card.get("n_distinct"), str(name),
            n_rows)
        caption = (f"Categorías más comunes de «{_truncate(name, 32)}» "
                   "(donut: top-k + «Otros»)")
        blocks.append(model.Figure(make=make_figure, caption=caption))

    omitted = len(categorical) - len(shown)
    if omitted > 0:
        blocks.append(model.Note(
            f"Se muestran las primeras {len(shown)} columnas categóricas; "
            f"quedan {omitted} sin mostrar para mantener acotado el informe."))

    return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
                         version=CHAPTER_VERSION, blocks=blocks)
|
||||
@@ -0,0 +1,186 @@
|
||||
"""Tests for the CAT DISTR chapter — DoD: golden + edges + anti-cut.
|
||||
|
||||
Self-contained: builds synthetic TableProfiles (no DuckDB) so the suite is fast
|
||||
and deterministic. Verifies that ``build_cat_distr`` emits the blocks the user
|
||||
asked for (entropy intro, distinct/total/%-distinct/unique metrics, top-k table
|
||||
and a donut figure), that the chapter renders inside the full document to both
|
||||
PDF and PPTX showing that content, that a profile with no categorical columns
|
||||
yields ``None`` without raising, and that long labels / many columns are never
|
||||
cut in either output.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
|
||||
from pypdf import PdfReader
|
||||
from pptx import Presentation
|
||||
|
||||
from datascience.automatic_eda.model import (
|
||||
DataTable, Figure, Heading, KVTable, Note,
|
||||
)
|
||||
from datascience.automatic_eda.chapters.cat_distr import (
|
||||
CHAPTER_ID, CHAPTER_VERSION, build_cat_distr,
|
||||
)
|
||||
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
|
||||
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
|
||||
|
||||
|
||||
def _profile() -> dict:
|
||||
return {
|
||||
"table": "productos",
|
||||
"source": "/data/productos.csv",
|
||||
"profiled_at": "2026-06-30T10:00:00+00:00",
|
||||
"n_rows": 1000,
|
||||
"n_cols": 3,
|
||||
"quality_score": 90.0,
|
||||
"columns": [
|
||||
{"name": "precio", "inferred_type": "numeric", "null_pct": 0.0,
|
||||
"null_count": 0,
|
||||
"numeric": {"mean": 42.5, "median": 40.0, "min": 1.0,
|
||||
"max": 100.0, "std": 12.3}},
|
||||
{"name": "categoria", "inferred_type": "categorical",
|
||||
"null_pct": 0.0, "null_count": 0, "distinct_count": 8,
|
||||
"categorical": {
|
||||
"top": [
|
||||
{"value": "neumaticos", "count": 500, "pct": 0.5},
|
||||
{"value": "aceite", "count": 300, "pct": 0.3},
|
||||
{"value": "filtros", "count": 120, "pct": 0.12},
|
||||
{"value": "frenos", "count": 80, "pct": 0.08},
|
||||
],
|
||||
"mode": "neumaticos", "n_distinct": 8, "entropy": 1.6,
|
||||
"imbalance": 6.25, "len_min": 6, "len_mean": 7.5,
|
||||
"len_max": 10}},
|
||||
{"name": "uuid", "inferred_type": "categorical",
|
||||
"null_pct": 0.0, "null_count": 0, "distinct_count": 1000,
|
||||
"categorical": {
|
||||
"top": [{"value": f"id-{i}", "count": 1} for i in range(5)],
|
||||
"mode": "id-0", "n_distinct": 1000, "entropy": 9.97,
|
||||
"imbalance": 1.0}},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def _pdf_text(path: str) -> str:
    """Return the text of every page of the PDF at *path*, joined together
    with each run of whitespace collapsed to a single space."""
    reader = PdfReader(path)
    chunks = [(page.extract_text() or "") for page in reader.pages]
    return re.sub(r"\s+", " ", "".join(chunks))
|
||||
|
||||
|
||||
def _pptx_text(path: str) -> str:
    """Return all text found in the presentation at *path*.

    Collects the text of every shape's text frame and of every table cell,
    slide by slide, joins the pieces with spaces and collapses each run of
    whitespace to a single space.
    """
    parts = []
    for slide in Presentation(path).slides:
        for shape in slide.shapes:
            if shape.has_text_frame:
                parts.append(shape.text_frame.text)
            if shape.has_table:
                # Iterate rows and their cells directly instead of indexing
                # by position; the visiting order is still row-major.
                for row in shape.table.rows:
                    parts.extend(cell.text for cell in row.cells)
    return re.sub(r"\s+", " ", " ".join(parts))
|
||||
|
||||
|
||||
def _kinds(chapter):
|
||||
return [b.kind for b in chapter.blocks]
|
||||
|
||||
|
||||
def test_golden_build_cat_distr_emite_bloques_pedidos():
    """The chapter built from the synthetic profile holds every requested
    block: entropy intro, cardinality metrics, top-k table, figure and the
    id-like note."""
    ch = build_cat_distr(_profile(), {})
    assert ch is not None
    assert ch.id == CHAPTER_ID
    assert ch.version == CHAPTER_VERSION
    kinds = _kinds(ch)
    # Entropy intro present. Checking the kinds first gives a plain
    # assertion failure instead of a StopIteration from next() below.
    headings = [b.text for b in ch.blocks if isinstance(b, Heading)]
    assert any("Entrop" in h for h in headings)
    assert "markdown" in kinds
    md = next(b for b in ch.blocks if b.kind == "markdown")
    assert "entropía" in md.text.lower() and "log2" in md.text
    # Cardinality metrics: distinct, total rows, %-distinct, unique values.
    kv = next(b for b in ch.blocks if isinstance(b, KVTable))
    labels = [r[0] for r in kv.rows]
    assert "Valores distintos" in labels
    assert "% distintos" in labels
    assert "Total filas (dataset)" in labels
    assert "Valores únicos (frecuencia 1)" in labels
    assert any("Entropía" in lbl for lbl in labels)
    # Top-k table + pie figure.
    dt = next(b for b in ch.blocks if isinstance(b, DataTable))
    assert dt.header == ["Valor", "Conteo", "%"]
    assert any("neumaticos" in str(cell) for row in dt.rows for cell in row)
    assert any(isinstance(b, Figure) for b in ch.blocks)
    # id-like column flagged with a Note.
    assert any(isinstance(b, Note) and "identificador" in b.text
               for b in ch.blocks)
|
||||
|
||||
|
||||
def test_golden_render_pdf_muestra_categoricas():
    """Rendering the synthetic profile to PDF includes this chapter and its
    key texts (the figure caption and the id-like note among them)."""
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "eda.pdf")
        result = render_automatic_eda_pdf(_profile(), target, {"title": "EDA"})
        assert result["path"] == target and os.path.exists(target)
        chapter_ids = [chapter["id"] for chapter in result["chapters"]]
        assert CHAPTER_ID in chapter_ids
        text = _pdf_text(target)
        # "donut" comes from the figure caption, "identificador" from the
        # id-like note.
        expected = ("Entrop", "distintos", "categoria", "neumaticos",
                    "donut", "identificador")
        for needle in expected:
            assert needle in text
|
||||
|
||||
|
||||
def test_golden_render_pptx_muestra_categoricas():
    """Rendering the synthetic profile to PPTX includes this chapter and its
    key texts."""
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "eda.pptx")
        result = render_automatic_eda_pptx(_profile(), target, {"title": "EDA"})
        assert result["path"] == target and os.path.exists(target)
        chapter_ids = [chapter["id"] for chapter in result["chapters"]]
        assert CHAPTER_ID in chapter_ids
        text = _pptx_text(target)
        for needle in ("Entrop", "categoria", "neumaticos", "distintos"):
            assert needle in text
|
||||
|
||||
|
||||
def test_edge_sin_categoricas_devuelve_none():
    """A profile without categorical columns — or a missing/empty profile —
    yields ``None`` without raising."""
    numeric_column = {"name": "x", "inferred_type": "numeric",
                      "numeric": {"mean": 1.0}}
    only_numeric = {"n_rows": 10, "columns": [numeric_column]}
    assert build_cat_distr(only_numeric, {}) is None
    # None / empty / no-columns inputs.
    degenerate = ((None, None), ({}, {}), ({"columns": []}, {}))
    for profile, ctx in degenerate:
        assert build_cat_distr(profile, ctx) is None
|
||||
|
||||
|
||||
def test_anti_corte_label_largo_y_muchas_columnas():
    """Thirty categorical columns with a very long label render to PDF and
    PPTX, and the long label's words all survive in the PDF text."""
    long_label = ("Lorem ipsum dolor sit amet consectetur adipiscing elit sed "
                  "do eiusmod tempor incididunt ut labore reprehenderit voluptate")
    cols = [
        {
            "name": f"cat_{i}", "inferred_type": "categorical",
            "distinct_count": 3,
            "categorical": {
                "top": [{"value": long_label, "count": 60},
                        {"value": "b", "count": 30},
                        {"value": "c", "count": 10}],
                "mode": long_label, "n_distinct": 3, "entropy": 1.2,
            },
        }
        for i in range(30)
    ]
    profile = {"table": "t", "source": "t.csv", "n_rows": 100,
               "n_cols": len(cols), "columns": cols}

    assert build_cat_distr(profile, {}) is not None

    options = {"write_manifest": False}
    with tempfile.TemporaryDirectory() as tmp:
        pdf_path = os.path.join(tmp, "anti.pdf")
        pdf_result = render_automatic_eda_pdf(profile, pdf_path, dict(options))
        assert pdf_result["path"] == pdf_path
        # Many columns are expected to spill across several pages.
        assert pdf_result["n_pages"] > 1
        text = _pdf_text(pdf_path)
        # Long label wrapped (not truncated): every word survives.
        for word in ("Lorem", "incididunt", "reprehenderit", "voluptate"):
            assert word in text

        # PPTX path must not raise either.
        pptx_path = os.path.join(tmp, "anti.pptx")
        pptx_result = render_automatic_eda_pptx(profile, pptx_path,
                                                dict(options))
        assert pptx_result["path"] == pptx_path and os.path.exists(pptx_path)
|
||||
@@ -1,498 +0,0 @@
|
||||
"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown.
|
||||
|
||||
Builds the *Modelos* chapter of an AutomaticEDA document from the ``models``
|
||||
block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers,
|
||||
normality}``). It renders, as structured markdown/tables/figures that the core
|
||||
paginator never cuts:
|
||||
|
||||
1. **Normalization note** — every multivariate model below standardizes the
|
||||
columns with z-score first; the chapter explains why (different scales would
|
||||
otherwise dominate distance/variance).
|
||||
2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus
|
||||
variance and top-loadings tables.
|
||||
3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own
|
||||
page/slide), the cluster-size table, and a per-cluster LLM micro-analysis
|
||||
with a title for each segment.
|
||||
4. **Isolation Forest outliers** — a short explanation of how anomalous rows are
|
||||
isolated multivariately and how the threshold is chosen, plus the counts.
|
||||
5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts.
|
||||
|
||||
The raw numeric data needed to colour the cluster scatter is **not** in the
|
||||
TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` —
|
||||
this chapter looks for the cluster projection / raw numeric columns in ``ctx``
|
||||
(or in ``profile``) and degrades honestly when they are absent: it falls back to
|
||||
the uncoloured ``pca.projection`` with a note, or omits the scatter entirely.
|
||||
|
||||
ctx keys this chapter consumes (all optional):
|
||||
cluster_projection : dict — a pre-computed ``project_clusters_2d`` result
|
||||
(``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used
|
||||
directly when present (forward-compatible with the calculation phase).
|
||||
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
|
||||
and ``cluster_projection`` is not, the chapter calls
|
||||
``project_clusters_2d`` live to build points + aligned labels.
|
||||
cluster_titles : list — pre-computed ``[{cluster, title, description}]``
|
||||
(a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster
|
||||
micro-analysis without an LLM call (offline/tests).
|
||||
run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call
|
||||
``describe_clusters_llm`` live on the cluster profiles.
|
||||
cluster_llm_model : str — model id for the live LLM call.
|
||||
|
||||
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from .. import model
|
||||
|
||||
CHAPTER_VERSION = "1.0.0"
|
||||
CHAPTER_ID = "modelos"
|
||||
CHAPTER_TITLE = "Modelos"
|
||||
|
||||
# Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib
|
||||
# scatter and to keep the legend/colours stable per cluster index.
|
||||
_CLUSTER_COLORS = [
|
||||
"#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f",
|
||||
"#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac",
|
||||
]
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Formatting helpers (mirror the overview chapter's defensive style).
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _fmt_num(value, decimals: int = 3) -> str:
|
||||
if value is None:
|
||||
return "—"
|
||||
if isinstance(value, bool):
|
||||
return "sí" if value else "no"
|
||||
if isinstance(value, int):
|
||||
return f"{value:,}".replace(",", ".")
|
||||
if isinstance(value, float):
|
||||
if value != value: # NaN
|
||||
return "NaN"
|
||||
if value in (float("inf"), float("-inf")):
|
||||
return str(value)
|
||||
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
|
||||
return text if text else "0"
|
||||
return model._safe_str(value)
|
||||
|
||||
|
||||
def _fmt_pct_ratio(value, decimals: int = 1) -> str:
|
||||
"""Format a 0..1 ratio as a percentage."""
|
||||
if value is None:
|
||||
return "—"
|
||||
try:
|
||||
return f"{float(value) * 100:.{decimals}f}%"
|
||||
except (TypeError, ValueError):
|
||||
return model._safe_str(value)
|
||||
|
||||
|
||||
def _fmt_pct_already(value, decimals: int = 2) -> str:
|
||||
"""Format a value that is *already* a 0..100 percentage."""
|
||||
if value is None:
|
||||
return "—"
|
||||
try:
|
||||
return f"{float(value):.{decimals}f}%"
|
||||
except (TypeError, ValueError):
|
||||
return model._safe_str(value)
|
||||
|
||||
|
||||
def _is_dict(v) -> bool:
|
||||
return isinstance(v, dict)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Cluster projection: prefer a pre-computed result, else compute it live, else
|
||||
# fall back to the uncoloured PCA projection.
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _resolve_cluster_projection(profile: dict, ctx: dict):
    """Locate or compute the 2-D cluster projection.

    Returns a ``(projection, source)`` pair, tried in this order:

    1. A pre-computed ``cluster_projection`` taken from *ctx*, then
       *profile*, then ``profile["models"]``; used when it is a dict with
       non-empty ``points`` -> ``(projection, "precomputed")``.
    2. ``project_clusters_2d`` called on the ``raw_numeric`` dict found in
       *ctx* or *profile*; used when the result is a dict with non-empty
       ``points`` -> ``(projection, "live")``.
    3. Otherwise, or if the live computation raises ->
       ``(None, "none")``.
    """
    candidate = ctx.get("cluster_projection") or profile.get("cluster_projection")
    if not candidate:
        models = profile.get("models")
        if _is_dict(models):
            candidate = models.get("cluster_projection")
    if _is_dict(candidate) and candidate.get("points"):
        return candidate, "precomputed"

    raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
    if not (_is_dict(raw) and raw):
        return None, "none"

    try:
        # Import the function from its submodule explicitly, so the
        # same-named package attribute cannot shadow it.
        from datascience.project_clusters_2d import project_clusters_2d

        live = project_clusters_2d(raw)
        usable = _is_dict(live) and live.get("points")
    except Exception:  # noqa: BLE001 — never break the chapter.
        return None, "none"
    return (live, "live") if usable else (None, "none")
|
||||
|
||||
|
||||
def _cluster_titles(profile: dict, ctx: dict, projection: dict):
    """Return ``[{cluster, title, description}]`` for the segments, or None.

    Sources, tried in order:

    1. ``ctx["cluster_titles"]`` when it is a non-empty list (only its dict
       entries are kept).
    2. A live ``describe_clusters_llm`` call, when ``ctx["run_cluster_llm"]``
       is truthy and the projection carries ``cluster_profiles``; any
       exception it raises is swallowed.
    3. Titles derived from each cluster profile's first two ``distinctive``
       entries.
    """
    supplied = ctx.get("cluster_titles")
    if isinstance(supplied, list) and supplied:
        return [entry for entry in supplied if _is_dict(entry)]

    source = projection or {}
    profiles = source.get("cluster_profiles") or []
    feature_names = source.get("feature_names") or []

    if profiles and ctx.get("run_cluster_llm"):
        try:
            from datascience.describe_clusters_llm import describe_clusters_llm

            answer = describe_clusters_llm(
                profiles, feature_names,
                model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001"))
            described = (answer or {}).get("clusters")
            if isinstance(described, list) and described:
                return [entry for entry in described if _is_dict(entry)]
        except Exception:  # noqa: BLE001
            pass

    # Derived fallback: name each cluster after its distinctive features.
    derived = []
    for item in profiles:
        if not _is_dict(item):
            continue
        # A profile without an explicit id gets its position among the
        # titles derived so far.
        cluster_id = item.get("cluster", len(derived))
        distinctive = item.get("distinctive") or []
        if distinctive:
            label = ", ".join(model._safe_str(d) for d in distinctive[:2])
        else:
            label = ""
        suffix = f" — {label}" if label else ""
        derived.append({
            "cluster": cluster_id,
            "title": f"Segmento {cluster_id}" + suffix,
            "description": "",
        })
    return derived or None
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _make_scree(pca: dict):
    """Return a zero-arg callable drawing the PCA scree plot, or ``None``.

    Args:
        pca: PCA result dict. ``explained_variance_ratio`` (list of ratios) is
            required; ``cumulative`` (running total) is optional.

    Returns:
        ``None`` when there is no explained variance to plot; otherwise a
        closure that imports matplotlib lazily and returns the figure.
    """
    evr = pca.get("explained_variance_ratio") or []
    cum = pca.get("cumulative") or []
    if not evr:
        return None

    def _draw():
        import matplotlib

        matplotlib.use("Agg")
        import matplotlib.pyplot as plt

        comps = list(range(1, len(evr) + 1))
        fig, ax = plt.subplots(figsize=(7.0, 4.2))
        ax.bar(comps, evr, color="#4e79a7", alpha=0.85,
               label="Varianza explicada")
        if cum:
            # Clip both series to the shared length: plot() raises when x and
            # y differ in size, which happened if `cumulative` was longer
            # than `explained_variance_ratio`.
            n_shared = min(len(comps), len(cum))
            ax.plot(comps[:n_shared], cum[:n_shared], color="#e15759",
                    marker="o", linewidth=1.8, label="Acumulada")
        ax.set_xlabel("Componente principal")
        ax.set_ylabel("Proporción de varianza")
        ax.set_xticks(comps)
        ax.set_ylim(0, 1.0)
        ax.grid(axis="y", color="#dddddd", linewidth=0.6)
        ax.legend(loc="best", fontsize=8, frameon=False)
        ax.set_title("Varianza explicada por componente (PCA)", fontsize=10)
        fig.tight_layout()
        return fig

    return _draw
|
||||
|
||||
|
||||
def _make_cluster_scatter(projection: dict):
    """Return a zero-arg callable drawing the cluster scatter, or ``None``.

    Args:
        projection: Cluster projection dict. ``points`` (2-D coordinates) and
            ``labels`` must be non-empty and of equal length; ``centers_2d``
            and ``explained_2d`` are optional.

    Returns:
        ``None`` when points are missing or misaligned with labels; otherwise
        a closure that imports matplotlib lazily and returns the figure.
    """
    points = projection.get("points") or []
    labels = projection.get("labels") or []
    if not points or len(points) != len(labels):
        return None
    centers = projection.get("centers_2d") or []
    explained = projection.get("explained_2d") or []

    def _draw():
        import matplotlib

        matplotlib.use("Agg")
        import matplotlib.pyplot as plt

        # Group coordinates by cluster in one pass instead of rescanning
        # every point twice per cluster.
        by_cluster: dict = {}
        for point, label in zip(points, labels):
            xs, ys = by_cluster.setdefault(int(label), ([], []))
            xs.append(point[0])
            ys.append(point[1])

        fig, ax = plt.subplots(figsize=(7.0, 5.2))
        for cl in sorted(by_cluster):
            xs, ys = by_cluster[cl]
            color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
            ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0,
                       label=f"Cluster {cl} (n={len(xs)})")
        # Centroids are coloured by their position in `centers_2d`.
        for cl, c in enumerate(centers):
            color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
            ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X",
                       edgecolors="black", linewidths=1.2, zorder=5)
        xlab, ylab = "PC1", "PC2"
        if len(explained) >= 2:
            xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)"
            ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)"
        ax.set_xlabel(xlab)
        ax.set_ylabel(ylab)
        ax.set_title("Segmentos KMeans proyectados sobre el plano PCA",
                     fontsize=10)
        ax.grid(color="#eeeeee", linewidth=0.5)
        ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9)
        fig.tight_layout()
        return fig

    return _draw
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Section builders. Each returns a list of blocks (possibly empty).
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _normalization_intro() -> list:
    """Return the chapter opener: a level-1 heading plus the z-score note."""
    fragments = (
        "Estos modelos son **no supervisados**: buscan estructura latente sin ",
        "una variable objetivo. Antes de aplicarlos, todas las columnas ",
        "numéricas se **estandarizan con z-score** (cada valor menos la media, ",
        "dividido por la desviación típica). Sin esta normalización, una ",
        "variable con escala grande (p.ej. ingresos en euros) dominaría las ",
        "distancias y la varianza frente a otra de escala pequeña (p.ej. un ",
        "ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la ",
        "estandarización todas las variables pesan por igual.",
    )
    heading = model.Heading(text="Modelos no supervisados", level=1)
    body = model.Markdown(text="".join(fragments))
    return [heading, body]
|
||||
|
||||
|
||||
def _pca_section(pca: dict) -> list:
    """Build the PCA blocks: heading, intro, scree figure and two tables.

    Args:
        pca: PCA result dict. Keys read: ``explained_variance_ratio``,
            ``cumulative``, ``n_rows_used``, ``n_features``, ``top_loadings``
            (list of ``{component, feature, loading}``).

    Returns:
        A list of blocks, or ``[]`` when ``pca`` is not a dict or carries no
        explained variance.
    """
    if not _is_dict(pca) or not pca.get("explained_variance_ratio"):
        return []
    blocks = [model.Heading(text="PCA — varianza explicada", level=2)]

    n_used = pca.get("n_rows_used")
    n_feat = pca.get("n_features")
    intro = (
        f"El PCA resume {_fmt_num(n_feat)} variables numéricas en componentes "
        f"ortogonales ordenados por la varianza que capturan "
        f"({_fmt_num(n_used)} filas usadas tras eliminar nulos). El gráfico de "
        "sedimentación (scree) muestra cuánta varianza aporta cada componente y "
        "su acumulado: un codo marca cuántos componentes bastan."
    )
    blocks.append(model.Markdown(text=intro))

    scree = _make_scree(pca)
    if scree is not None:
        blocks.append(model.Figure(
            make=scree, caption="Varianza explicada y acumulada por componente."))

    evr = pca.get("explained_variance_ratio") or []
    cum = pca.get("cumulative") or []
    rows = []
    for i, v in enumerate(evr):
        acc = cum[i] if i < len(cum) else None
        rows.append([f"PC{i + 1}", _fmt_pct_ratio(v), _fmt_pct_ratio(acc)])
    if rows:
        blocks.append(model.DataTable(
            header=["Componente", "Varianza", "Acumulada"], rows=rows,
            title="Varianza por componente"))

    def _component_order(comp):
        # Numeric ids first (ascending), then other labels by text, then
        # None. Plain tuple keys avoid the TypeError raised when int and str
        # ids are compared directly.
        if isinstance(comp, (int, float)):
            return (0, comp, "")
        if comp is None:
            return (2, 0, "")
        return (1, 0, str(comp))

    def _component_label(comp):
        # Component ids are zero-based; anything not convertible to int is
        # shown verbatim instead of crashing the chapter.
        if comp is None:
            return "—"
        try:
            return f"PC{int(comp) + 1}"
        except (TypeError, ValueError):
            return model._safe_str(comp)

    # Top loadings: keep the first few entries per component, in input order.
    loadings = pca.get("top_loadings") or []
    if loadings:
        max_per_component = 4
        per_comp: dict = {}
        for ld in loadings:
            if not _is_dict(ld):
                continue
            bucket = per_comp.setdefault(ld.get("component"), [])
            if len(bucket) < max_per_component:
                bucket.append(ld)
        rows = []
        for comp in sorted(per_comp, key=_component_order):
            for ld in per_comp[comp]:
                rows.append([_component_label(comp),
                             model._safe_str(ld.get("feature")),
                             _fmt_num(ld.get("loading"))])
        if rows:
            blocks.append(model.DataTable(
                header=["Componente", "Variable", "Carga"], rows=rows,
                title="Cargas principales (top por componente)",
                note="Cargas con mayor valor absoluto: qué variables definen "
                     "cada eje."))
    return blocks
|
||||
|
||||
|
||||
def _kmeans_section(kmeans: dict, projection: dict, titles) -> list:
    """Build the KMeans blocks: intro, scatter (or note), sizes and titles.

    Args:
        kmeans: KMeans result dict or ``None``. Keys read: ``best_k``,
            ``silhouette``, ``cluster_sizes``.
        projection: Cluster projection dict or ``None``. Its ``best_k``,
            ``silhouette`` and ``cluster_sizes`` take precedence over
            ``kmeans``; ``points`` enables the scatter figure.
        titles: Optional list of ``{cluster, title, description}`` dicts.

    Returns:
        A list of blocks, or ``[]`` when there is neither a ``best_k`` nor
        projected points.
    """
    # Normalise once so a truthy non-dict can never reach `.get`.
    km = kmeans if _is_dict(kmeans) else {}
    proj = projection if _is_dict(projection) else {}
    has_km = bool(km.get("best_k"))
    has_proj = bool(proj.get("points"))
    if not has_km and not has_proj:
        return []

    blocks = [model.Heading(text="Segmentación (KMeans)", level=2)]

    best_k = proj.get("best_k") or km.get("best_k")
    sil = proj.get("silhouette")
    if sil is None:
        sil = km.get("silhouette")
    intro = (
        f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos "
        "automáticamente maximizando el coeficiente de *silhouette* "
        f"(**{_fmt_num(sil)}**, rango −1 a 1: cuanto más alto, segmentos más "
        "compactos y separados). Los segmentos se proyectan sobre el plano de "
        "los dos primeros componentes principales para visualizarlos."
    )
    blocks.append(model.Markdown(text=intro))

    if has_proj:
        scatter = _make_cluster_scatter(proj)
        if scatter is not None:
            blocks.append(model.Figure(
                make=scatter,
                caption="Cada punto es una fila coloreada por su segmento "
                        "KMeans; las «X» son los centroides."))
        else:
            blocks.append(model.Note(
                "Proyección de clusters no dibujable (puntos y etiquetas "
                "desalineados)."))
    else:
        # We have kmeans stats but no aligned points+labels to colour by.
        blocks.append(model.Note(
            "Scatter coloreado por segmento no disponible: el perfil no incluye "
            "la proyección con etiquetas alineadas (pásala en "
            "ctx['cluster_projection'] o las columnas crudas en "
            "ctx['raw_numeric'] para colorear el plano PCA)."))

    # Cluster sizes table.
    sizes = proj.get("cluster_sizes") or km.get("cluster_sizes") or []
    total = sum(s for s in sizes if isinstance(s, (int, float))) or 0
    if sizes:
        rows = []
        for i, s in enumerate(sizes):
            # Apply the same numeric filter used for `total`: dividing a
            # None/str size would raise TypeError.
            if total and isinstance(s, (int, float)):
                pct = s / total
            else:
                pct = None
            rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(pct)])
        blocks.append(model.DataTable(
            header=["Segmento", "Tamaño", "% del total"], rows=rows,
            title="Tamaño de cada segmento"))

    # Per-cluster LLM micro-analysis (each entry kept indivisible as one block).
    if titles:
        blocks.append(model.Heading(text="Interpretación de los segmentos",
                                    level=3))
        for t in titles:
            if not _is_dict(t):
                continue
            cid = t.get("cluster")
            title = model._safe_str(t.get("title")) or f"Cluster {cid}"
            desc = model._safe_str(t.get("description"))
            line = f"**Cluster {cid} — {title}.**"
            if desc:
                line += " " + desc
            blocks.append(model.Markdown(text=line))
    return blocks
|
||||
|
||||
|
||||
def _outliers_section(outliers: dict) -> list:
    """Build the Isolation Forest blocks: heading, explanation and KV table.

    Returns ``[]`` when ``outliers`` is not a dict, has no ``n_outliers``, or
    carries a ``note`` together with no ``n_rows_used``.
    """
    if not _is_dict(outliers):
        return []
    if outliers.get("n_outliers") is None:
        return []
    insufficient = bool(outliers.get("note")) and not outliers.get("n_rows_used")
    if insufficient:
        # insufficient data — nothing meaningful to show.
        return []

    heading = model.Heading(text="Detección de anomalías (Isolation Forest)",
                            level=2)
    explain = (
        "**Isolation Forest** detecta filas anómalas de forma *multivariante*: "
        "construye árboles que parten el espacio con cortes aleatorios y mide "
        "cuántos cortes hacen falta para aislar cada fila. Las filas raras "
        "(combinaciones de valores poco frecuentes considerando **todas las "
        "columnas a la vez**, no una sola) se aíslan con muy pocos cortes y "
        "obtienen un score bajo. El **umbral** de decisión separa las filas "
        "normales de las anómalas según la contaminación esperada del modelo: "
        "una fila es outlier cuando su score queda por debajo de ese umbral."
    )
    summary_rows = [
        ("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))),
        ("Outliers detectados", _fmt_num(outliers.get("n_outliers"))),
        ("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))),
        ("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)),
    ]
    return [
        heading,
        model.Markdown(text=explain),
        model.KVTable(rows=summary_rows, title="Anomalías multivariantes"),
    ]
|
||||
|
||||
|
||||
def _normality_section(normality: dict) -> list:
    """Build the normality blocks: heading, explanation and per-column table.

    ``normality`` maps column name to a result dict with optional
    ``jarque_bera`` / ``dagostino`` / ``shapiro`` sub-dicts (each read for
    ``p``), ``is_normal`` and ``note``. Returns ``[]`` when no row is usable.
    """
    if not _is_dict(normality) or not normality:
        return []

    def _sub(result, key):
        # A missing or malformed test result behaves like an empty dict.
        candidate = result.get(key)
        return candidate if _is_dict(candidate) else {}

    def _p_cell(test):
        return _fmt_num(test.get("p"), 4) if test else "—"

    table_rows = []
    for column, result in normality.items():
        if not _is_dict(result):
            continue
        jarque = _sub(result, "jarque_bera")
        dagostino = _sub(result, "dagostino")
        shapiro = _sub(result, "shapiro")
        verdict = result.get("is_normal")
        name = model._safe_str(column)

        if result.get("note") and verdict is None and not jarque:
            # Nothing was computed: show the note in the verdict column.
            table_rows.append([name, "—", "—", "—",
                               model._safe_str(result.get("note"))])
            continue

        if verdict:
            verdict_cell = "sí"
        elif verdict is not None:
            verdict_cell = "no"
        else:
            verdict_cell = "—"
        table_rows.append([name, _p_cell(jarque), _p_cell(dagostino),
                           _p_cell(shapiro), verdict_cell])

    if not table_rows:
        return []

    columns = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)",
               "¿Normal?"]
    explanation = (
        "Tests de hipótesis de normalidad por columna (hipótesis nula: la "
        "muestra proviene de una distribución normal). Se marca **normal** "
        "cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas "
        "variables reales son estrictamente normales; esto orienta qué "
        "transformaciones o tests robustos aplicar después."
    )
    return [
        model.Heading(text="Normalidad de las variables", level=2),
        model.Markdown(text=explanation),
        model.DataTable(header=columns, rows=table_rows,
                        title="Pruebas de normalidad"),
    ]
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Entry point.
|
||||
# --------------------------------------------------------------------------- #
|
||||
def build_modelos(profile: dict, ctx: dict):
    """Assemble the MODELOS chapter; return ``None`` when nothing is renderable.

    Reads ``profile['models']`` (``pca``, ``kmeans``, ``outliers``,
    ``normality``), resolves the cluster projection and titles, and prepends
    the normalization intro to whichever sections produced blocks.
    """
    profile = profile or {}
    ctx = ctx or {}
    if not isinstance(profile, dict):
        return None
    models = profile.get("models")
    if not _is_dict(models):
        return None

    def _model(key):
        # A sub-result only counts when it is a dict.
        candidate = models.get(key)
        return candidate if _is_dict(candidate) else None

    pca = _model("pca")
    kmeans = _model("kmeans")
    outliers = _model("outliers")
    normality = _model("normality")

    projection, _src = _resolve_cluster_projection(profile, ctx)
    has_best_k = bool(kmeans and kmeans.get("best_k"))
    has_points = bool(projection and projection.get("points"))
    titles = None
    if has_best_k or has_points:
        titles = _cluster_titles(profile, ctx, projection)

    sections = []
    if pca:
        sections.extend(_pca_section(pca))
    sections.extend(_kmeans_section(kmeans, projection, titles))
    if outliers:
        sections.extend(_outliers_section(outliers))
    if normality:
        sections.extend(_normality_section(normality))

    if not sections:
        return None  # models block present but nothing renderable.

    return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
                         version=CHAPTER_VERSION,
                         blocks=_normalization_intro() + sections)
|
||||
@@ -1,259 +0,0 @@
|
||||
"""Tests for the MODELOS chapter — DoD: golden + edges + anti-cut.
|
||||
|
||||
Self-contained: builds a synthetic TableProfile with a ``models`` block (no
|
||||
DuckDB, no sklearn, no LLM, no network). The cluster scatter is fed a synthetic
|
||||
pre-computed ``cluster_projection`` via ``ctx`` and the per-cluster titles via
|
||||
``ctx['cluster_titles']`` so the suite is fast and deterministic. The live paths
|
||||
(``project_clusters_2d`` / ``describe_clusters_llm``) are exercised against the
|
||||
real wine dataset in the work report, not here.
|
||||
|
||||
Verifies: the chapter renders to PDF *and* PPTX showing the user-required pieces
|
||||
(markdown text, PCA scree, cluster scatter, per-cluster LLM micro-analysis,
|
||||
outlier + normalization explanations); that an inapplicable profile yields None
|
||||
without raising; and that a long normality table is split without losing any
|
||||
column (anti-cut).
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
|
||||
from pypdf import PdfReader
|
||||
from pptx import Presentation
|
||||
|
||||
from datascience.automatic_eda.chapters.modelos import build_modelos
|
||||
from datascience.automatic_eda.model import Figure, DataTable, Markdown
|
||||
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
|
||||
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Synthetic fixtures.
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _models_block(n_norm_cols: int = 4) -> dict:
    """Return a synthetic ``models`` block for the MODELOS chapter tests.

    Args:
        n_norm_cols: Number of ``col_<i>`` entries in the ``normality``
            mapping (a large value forces the normality table to split).

    Returns:
        Dict with ``pca``, ``kmeans``, ``outliers`` and ``normality``
        sub-blocks plus bookkeeping keys.
    """
    feats = ["fixed_acidity", "alcohol", "ph", "sulphates"]
    # Every column is non-normal; only the Jarque-Bera p-value varies by index.
    normality = {
        f"col_{i}": {
            "n": 500,
            "jarque_bera": {"stat": 12.3, "p": 0.002 + i * 0.0001, "normal": False},
            "dagostino": {"stat": 9.1, "p": 0.01, "normal": False},
            "shapiro": {"stat": 0.98, "p": 0.04, "normal": False},
            "is_normal": False,
        }
        for i in range(n_norm_cols)
    }
    return {
        "n_numeric_cols": 4,
        "pca": {
            "n_components": 2, "n_rows_used": 1599, "n_features": 4,
            "explained_variance_ratio": [0.41, 0.22],
            "cumulative": [0.41, 0.63],
            "top_loadings": [
                {"component": 0, "feature": "alcohol", "loading": 0.62},
                {"component": 0, "feature": "fixed_acidity", "loading": -0.48},
                {"component": 1, "feature": "ph", "loading": 0.71},
                {"component": 1, "feature": "sulphates", "loading": 0.33},
            ],
            "projection": [[0.1, 0.2], [0.3, -0.1]],
        },
        "kmeans": {
            "best_k": 3, "silhouette": 0.27,
            "scores_by_k": [{"k": 2, "silhouette": 0.21}, {"k": 3, "silhouette": 0.27}],
            "cluster_sizes": [700, 500, 399],
            "centers": [[0.1, 0.2, 0.3, 0.4]],
            "n_rows_used": 1599, "n_features": 4,
        },
        "outliers": {
            "n_outliers": 80, "outlier_pct": 5.0, "threshold": -0.0123,
            "n_rows_used": 1599,
        },
        "normality": normality,
        "note": "",
        "_feats": feats,
    }
|
||||
|
||||
|
||||
def _cluster_projection() -> dict:
    """Return a synthetic pre-computed cluster projection for the tests.

    Thirty 2-D points across three clusters (ten each, spread along a short
    diagonal through each centre), with ``labels`` aligned to ``points``.
    """
    per_cluster = 10
    centers = [(-2.0, -2.0), (2.0, 0.0), (0.0, 2.5)]
    # Points and labels use the same iteration order, so they stay aligned.
    points = [
        [cx + (j - 5) * 0.05, cy + (j - 5) * 0.05]
        for cx, cy in centers
        for j in range(per_cluster)
    ]
    labels = [cl for cl in range(len(centers)) for _ in range(per_cluster)]
    return {
        "points": points, "labels": labels,
        "centers_2d": [list(c) for c in centers],
        "best_k": 3, "silhouette": 0.27,
        "explained_2d": [0.41, 0.22],
        # Derived from the labels so the sizes cannot drift from the points.
        "cluster_sizes": [labels.count(cl) for cl in range(len(centers))],
        "cluster_profiles": [
            {"cluster": 0, "size": 10, "pct": 0.33,
             "centroid_original": {"alcohol": 9.5, "ph": 3.5},
             "distinctive": ["alcohol", "ph"], "centroid_z": {"alcohol": -1.2}},
            {"cluster": 1, "size": 10, "pct": 0.33,
             "centroid_original": {"alcohol": 12.0, "ph": 3.1},
             "distinctive": ["alcohol"], "centroid_z": {"alcohol": 1.4}},
            {"cluster": 2, "size": 10, "pct": 0.33,
             "centroid_original": {"alcohol": 10.5, "ph": 3.8},
             "distinctive": ["ph"], "centroid_z": {"ph": 1.6}},
        ],
        "feature_names": ["alcohol", "ph", "fixed_acidity", "sulphates"],
        "n_used": 1599, "note": "",
    }
|
||||
|
||||
|
||||
def _ctx_full() -> dict:
    """Return a render ctx with a projection and pre-computed cluster titles."""
    titles = [
        {"cluster": 0, "title": "Vinos suaves de baja graduación",
         "description": "Alcohol bajo y pH alto; perfil ligero."},
        {"cluster": 1, "title": "Vinos potentes",
         "description": "Alta graduación alcohólica."},
        {"cluster": 2, "title": "Vinos de pH elevado",
         "description": "Acidez baja relativa al resto."},
    ]
    ctx = {"cluster_projection": _cluster_projection()}
    ctx["cluster_titles"] = titles
    return ctx
|
||||
|
||||
|
||||
def _profile() -> dict:
    """Return a minimal synthetic table profile carrying a ``models`` block."""
    profile = {"table": "wine", "n_rows": 1599, "n_cols": 12}
    profile["models"] = _models_block()
    return profile
|
||||
|
||||
|
||||
def _pdf_text(path: str) -> str:
    """Return the text of every page of the PDF with whitespace collapsed."""
    pages = PdfReader(path).pages
    raw = "".join(page.extract_text() or "" for page in pages)
    return re.sub(r"\s+", " ", raw)
|
||||
|
||||
|
||||
def _pptx_text(path: str) -> str:
    """Return the text of every text frame in the deck, whitespace collapsed."""
    deck = Presentation(path)
    fragments = [
        shape.text_frame.text
        for slide in deck.slides
        for shape in slide.shapes
        if shape.has_text_frame
    ]
    return re.sub(r"\s+", " ", " ".join(fragments))
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Golden.
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_golden_build_modelos_bloques_requeridos():
    """Golden: the chapter carries the required figures, tables and text."""
    chapter = build_modelos(_profile(), _ctx_full())
    assert chapter is not None
    assert chapter.id == "modelos" and chapter.version

    # Both figures present: scree plot + cluster scatter.
    figures = [b for b in chapter.blocks if isinstance(b, Figure)]
    assert len(figures) >= 2

    # Tables present (variance, loadings, sizes, normality).
    tables = [b for b in chapter.blocks if isinstance(b, DataTable)]
    assert len(tables) >= 3

    # Markdown carries the required explanations (normalization, outlier
    # generation, kmeans) and the per-cluster micro-analysis titles.
    markdown_text = " ".join(
        b.text for b in chapter.blocks if isinstance(b, Markdown))
    for needle in ("z-score", "Isolation Forest", "silhouette",
                   "Vinos potentes", "Cluster 1"):
        assert needle in markdown_text
|
||||
|
||||
|
||||
def test_golden_render_pdf_muestra_lo_exigido():
    """Golden: the rendered PDF includes the chapter and its key phrases."""
    expected = ("Modelos no supervisados", "z-score", "PCA",
                "Segmentación", "Isolation Forest", "Normalidad",
                "Vinos potentes")
    with tempfile.TemporaryDirectory() as tmp_dir:
        pdf_path = os.path.join(tmp_dir, "modelos.pdf")
        result = render_automatic_eda_pdf(
            _profile(), pdf_path, {"title": "EDA — wine", "ctx": _ctx_full()})
        assert result["path"] == pdf_path and os.path.exists(pdf_path)
        chapter_ids = [chapter["id"] for chapter in result["chapters"]]
        assert "modelos" in chapter_ids
        text = _pdf_text(pdf_path)
        for needle in expected:
            assert needle in text, f"falta en PDF: {needle}"
|
||||
|
||||
|
||||
def test_golden_render_pptx_muestra_lo_exigido():
    """Golden: the rendered PPTX has slides and shows the key phrases."""
    expected = ("Modelos no supervisados", "z-score", "Isolation Forest",
                "Vinos potentes")
    with tempfile.TemporaryDirectory() as tmp_dir:
        deck_path = os.path.join(tmp_dir, "modelos.pptx")
        result = render_automatic_eda_pptx(
            _profile(), deck_path, {"title": "EDA — wine", "ctx": _ctx_full()})
        assert result["path"] == deck_path and os.path.exists(deck_path)
        assert result["n_slides"] >= 1
        text = _pptx_text(deck_path)
        for needle in expected:
            assert needle in text, f"falta en PPTX: {needle}"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Edges.
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_edge_profile_none_o_vacio_devuelve_none():
    """Edge: missing, empty or model-less profiles yield ``None``."""
    cases = (
        (None, {}),
        ({}, {}),
        ({"n_rows": 5}, None),  # no 'models' key
    )
    for profile, ctx in cases:
        assert build_modelos(profile, ctx) is None
|
||||
|
||||
|
||||
def test_edge_models_insuficiente_devuelve_none():
    """Edge: a models block where every model lacks data yields ``None``."""
    insufficient = "datos insuficientes"
    models = {
        "n_numeric_cols": 1,
        "pca": {"n_components": 0, "explained_variance_ratio": [],
                "note": insufficient},
        "kmeans": {"best_k": 0, "note": insufficient},
        "outliers": {"n_outliers": 0, "note": insufficient},
        "normality": None,
        "note": "insuficientes columnas numericas para modelos multivariantes",
    }
    profile = {"table": "tiny", "models": models}
    assert build_modelos(profile, {}) is None
|
||||
|
||||
|
||||
def test_edge_solo_normalidad_si_genera_capitulo():
    """Edge: with only normality results the chapter must still build."""
    # A single numeric column: only normality applies.
    column_result = {
        "n": 500,
        "jarque_bera": {"stat": 1.0, "p": 0.2, "normal": True},
        "dagostino": {"stat": 1.0, "p": 0.3, "normal": True},
        "shapiro": {"stat": 0.99, "p": 0.4, "normal": True},
        "is_normal": True,
    }
    profile = {"table": "one", "models": {
        "n_numeric_cols": 1, "pca": None, "kmeans": None, "outliers": None,
        "normality": {"x": column_result},
    }}
    chapter = build_modelos(profile, {})
    assert chapter is not None
    markdown_text = " ".join(
        b.text for b in chapter.blocks if isinstance(b, Markdown))
    assert "z-score" in markdown_text  # normalization intro still present
|
||||
|
||||
|
||||
def test_edge_kmeans_sin_proyeccion_degrada_sin_romper():
    """Edge: kmeans stats without a projection degrade to an explanatory note."""
    # kmeans stats present but no cluster_projection / raw_numeric to colour by.
    profile = _profile()
    chapter = build_modelos(profile, {})  # no ctx projection
    assert chapter is not None

    # No scatter figure for clusters, but a Note explaining the degradation.
    note_texts = [b.text for b in chapter.blocks if b.kind == "note"]
    mentions_fix = [
        "ctx['raw_numeric']" in text or "cluster_projection" in text
        for text in note_texts
    ]
    assert any(mentions_fix)

    # PDF still renders fine.
    with tempfile.TemporaryDirectory() as tmp_dir:
        pdf_path = os.path.join(tmp_dir, "deg.pdf")
        result = render_automatic_eda_pdf(
            profile, pdf_path, {"write_manifest": False})
        assert result["path"] == pdf_path and os.path.exists(pdf_path)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Anti-cut.
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_anticortes_tabla_normalidad_larga_no_corta():
    """Anti-cut: a 40-row normality table spans pages without losing columns."""
    # 40 numeric columns → the normality DataTable must split across pages,
    # repeating the header, without losing any column name.
    profile = {"table": "wide", "models": _models_block(n_norm_cols=40)}
    options = {"write_manifest": False, "ctx": _ctx_full()}
    with tempfile.TemporaryDirectory() as tmp_dir:
        pdf_path = os.path.join(tmp_dir, "wide.pdf")
        render_automatic_eda_pdf(profile, pdf_path, options)
        reader = PdfReader(pdf_path)
        assert len(reader.pages) > 1
        text = "".join(page.extract_text() or "" for page in reader.pages)
        # Every column name survives (wrapped/split, never truncated).
        for index in (0, 19, 39):
            assert f"col_{index}" in text
|
||||
@@ -0,0 +1,115 @@
|
||||
---
|
||||
id: categorical_cardinality_block_py_datascience
|
||||
name: categorical_cardinality_block
|
||||
kind: function
|
||||
lang: py
|
||||
domain: datascience
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def categorical_cardinality_block(cat: dict, n_rows: int) -> dict"
|
||||
description: "Deriva métricas de cardinalidad listas para renderizar a partir de la salida de summarize_categorical para UNA columna categórica más el número total de filas. Calcula pct_distinct, entropy_max=log2(n_distinct), entropy_norm (recortada a [0,1]), n_singletons (sobre el top visible) y los flags id_like / dominated. NO recalcula la entropía ni reimplementa summarize_categorical: la consume. Estilo dict-no-throw del grupo eda — nunca lanza."
|
||||
tags: [eda, categorical, cardinality, entropy, profiling, datascience, pure]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [math]
|
||||
example: |
|
||||
from categorical_cardinality_block import categorical_cardinality_block
|
||||
cat = {"top": [{"value": "a", "count": 5, "pct": 0.5}], "mode": "a",
|
||||
"mode_pct": 0.5, "n_distinct": 4, "entropy": 1.685, "imbalance": 5.0,
|
||||
"len_min": 1, "len_mean": 1.0, "len_max": 1}
|
||||
block = categorical_cardinality_block(cat, n_rows=10)
|
||||
tested: true
|
||||
tests:
|
||||
- "test_normal_case"
|
||||
- "test_empty_cat_does_not_raise"
|
||||
- "test_none_cat_does_not_raise"
|
||||
- "test_n_rows_zero_no_zero_division"
|
||||
- "test_id_like_when_distinct_near_rows"
|
||||
- "test_dominated_when_mode_pct_high"
|
||||
- "test_mode_pct_fallback_from_top_fraction"
|
||||
- "test_n_singletons_partial_when_top_truncated"
|
||||
- "test_single_distinct_value_entropy_norm_none"
|
||||
test_file_path: "python/functions/datascience/categorical_cardinality_block_test.py"
|
||||
file_path: "python/functions/datascience/categorical_cardinality_block.py"
|
||||
params:
|
||||
- name: cat
|
||||
desc: "Dict producido por summarize_categorical para UNA columna categórica. Claves leídas (todas opcionales, lectura defensiva): top (list de {value,count,pct}), mode, mode_pct (puede faltar), n_distinct, entropy (Shannon en bits), imbalance, len_min, len_mean, len_max. None o no-dict se tratan como {}."
|
||||
- name: n_rows
|
||||
desc: "Número total de filas del dataset. Usado para pct_distinct. Si es 0 o None, pct_distinct sale None (sin ZeroDivisionError)."
|
||||
output: "Dict con exactamente 16 claves, todas siempre presentes: n_distinct, n_rows, pct_distinct, entropy, entropy_max, entropy_norm, mode, mode_pct, imbalance, n_singletons, n_singletons_partial, len_min, len_mean, len_max, id_like, dominated. Valores None/False cuando no son derivables; la función nunca lanza. pct_distinct en escala 0-100. entropy_max=log2(n_distinct) (0.0 si n_distinct in {0,1}). entropy_norm=entropy/entropy_max recortada a [0,1]. n_singletons = nº de elementos de top con count==1 (None si top vacío). n_singletons_partial=True si n_distinct>len(top). id_like=pct_distinct>=99. dominated=mode_pct>=90."
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
from categorical_cardinality_block import categorical_cardinality_block
|
||||
|
||||
# Salida típica de summarize_categorical para una columna, con n_rows del dataset.
|
||||
cat = {
|
||||
"top": [
|
||||
{"value": "a", "count": 5, "pct": 0.5},
|
||||
{"value": "b", "count": 3, "pct": 0.3},
|
||||
{"value": "c", "count": 1, "pct": 0.1},
|
||||
{"value": "d", "count": 1, "pct": 0.1},
|
||||
],
|
||||
"mode": "a",
|
||||
"mode_pct": 0.5,
|
||||
"n_distinct": 4,
|
||||
"entropy": 1.685, # Shannon en bits (<= log2(4) = 2.0)
|
||||
"imbalance": 5.0,
|
||||
"len_min": 1, "len_mean": 1.0, "len_max": 1,
|
||||
}
|
||||
|
||||
categorical_cardinality_block(cat, n_rows=10)
|
||||
# {
|
||||
# "n_distinct": 4, "n_rows": 10,
|
||||
# "pct_distinct": 40.0, # 4 / 10 * 100
|
||||
# "entropy": 1.685,
|
||||
# "entropy_max": 2.0, # log2(4)
|
||||
# "entropy_norm": 0.8425, # 1.685 / 2.0, recortado a [0,1]
|
||||
# "mode": "a", "mode_pct": 0.5,
|
||||
# "imbalance": 5.0,
|
||||
# "n_singletons": 2, # c y d con count == 1
|
||||
# "n_singletons_partial": False, # top cubre los 4 distintos
|
||||
# "len_min": 1, "len_mean": 1.0, "len_max": 1,
|
||||
# "id_like": False, # pct_distinct 40 < 99
|
||||
# "dominated": False, # mode_pct 0.5 < 90
|
||||
# }
|
||||
```
|
||||
|
||||
## Cuándo usarla
|
||||
|
||||
Úsala justo después de `summarize_categorical`, cuando vayas a renderizar el
|
||||
bloque de cardinalidad de una columna categórica en un EDA: necesitas el ratio
|
||||
de valores distintos (`pct_distinct`), la entropía normalizada al rango `[0,1]`
|
||||
para comparar columnas con cardinalidades distintas, el conteo de singletons, y
|
||||
las banderas heurísticas `id_like` (la columna parece un identificador) y
|
||||
`dominated` (una sola categoría domina). Pásale el dict crudo de
|
||||
`summarize_categorical` para esa columna y el `n_rows` total del dataset. No
|
||||
reimplementa nada: solo deriva métricas de presentación a partir de lo ya
|
||||
calculado.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- **`mode_pct` se pasa tal cual viene en `cat`.** `summarize_categorical`
|
||||
produce `mode_pct` como **fracción** (0–1), no como porcentaje. El flag
|
||||
`dominated` compara `mode_pct >= 90.0`, así que con la salida cruda de
|
||||
`summarize_categorical` (fracciones) `dominated` no se dispara: aliméntalo con
|
||||
`mode_pct` en escala 0–100 si quieres usar esa bandera. Solo el camino de
|
||||
*fallback* (cuando `cat` no trae `mode_pct` y se deriva de `top[0]['pct']`)
|
||||
normaliza una fracción `<= 1` multiplicándola por 100.
|
||||
- **`n_singletons` solo cubre el `top` visible.** Si `summarize_categorical` se
|
||||
llamó con `top_k` pequeño, hay valores fuera del top; en ese caso
|
||||
`n_singletons_partial` es `True` para avisar de que el conteo es parcial.
|
||||
- **`pct_distinct` es `None` si `n_rows` es 0 o `None`** (no lanza
|
||||
`ZeroDivisionError`); por tanto `id_like` queda `False` en ese caso.
|
||||
- **`entropy_norm` es `None` cuando `entropy_max <= 0`** (columna constante,
|
||||
`n_distinct in {0,1}`): no hay división por cero y no se inventa un 0/1.
|
||||
- **No recalcula la entropía.** Si `cat['entropy']` es incoherente con
|
||||
`n_distinct`, `entropy_norm` se recorta a `[0,1]` pero el valor de entrada no
|
||||
se corrige.
|
||||
- **`bool` no cuenta como número.** Un `True`/`False` en una clave numérica de
|
||||
`cat` se trata como ausente (`None`), por la guarda defensiva.
|
||||
@@ -0,0 +1,132 @@
|
||||
"""Pure EDA helper: cardinality metrics block from a `summarize_categorical` output.
|
||||
|
||||
Part of the `eda` capability group. Consumes the per-column dict produced by
|
||||
``summarize_categorical`` (for a single categorical/text column) plus the total
|
||||
row count of the dataset and derives render-ready cardinality metrics: distinct
|
||||
ratio, normalized entropy, singleton count, and the ``id_like`` / ``dominated``
|
||||
flags.
|
||||
|
||||
It does NOT recompute the entropy nor reimplement ``summarize_categorical`` — it
|
||||
only reads that function's output. Dict-no-throw style of the `eda` group: it
|
||||
never raises. Missing or malformed inputs yield ``None``/``False``/``0`` for the
|
||||
affected keys, never an exception. Stdlib only (``math.log2``).
|
||||
"""
|
||||
|
||||
from math import log2
|
||||
|
||||
|
||||
def _num(value):
    """Return ``value`` if it is a finite, non-bool real number, else ``None``.

    ``bool`` is rejected on purpose: in Python ``True`` is an ``int`` but it is
    never a meaningful count/ratio here. Non-finite floats (``nan``/``inf``)
    are rejected as well so they cannot leak into the derived metrics: with a
    ``nan`` entropy, ``min(1.0, nan)`` evaluates to ``1.0`` and the clipping
    step would otherwise report a made-up normalized entropy of 1.
    """
    from math import isfinite  # local import: module-level imports untouched

    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    # Only floats can be non-finite; skipping ints also avoids the
    # OverflowError that isfinite() raises for ints too large for a float.
    if isinstance(value, float) and not isfinite(value):
        return None
    return value


def categorical_cardinality_block(cat: dict, n_rows: int) -> dict:
    """Derive cardinality metrics for one categorical column.

    Reads the per-column dict produced by ``summarize_categorical`` and the
    dataset row count; nothing is recomputed from raw data.

    Args:
        cat: The per-column dict produced by ``summarize_categorical`` for a
            single categorical/text column. Expected (all optional, read
            defensively) keys: ``top`` (list of ``{value, count, pct}``),
            ``mode``, ``mode_pct``, ``n_distinct``, ``entropy`` (Shannon, bits),
            ``imbalance``, ``len_min``, ``len_mean``, ``len_max``. ``None`` or a
            non-dict is treated as ``{}``.
        n_rows: Total number of rows in the dataset (used for ``pct_distinct``).

    Returns:
        Dict with exactly these keys, every one always present:
        ``n_distinct``, ``n_rows``, ``pct_distinct``, ``entropy``,
        ``entropy_max``, ``entropy_norm``, ``mode``, ``mode_pct``,
        ``imbalance``, ``n_singletons``, ``n_singletons_partial``, ``len_min``,
        ``len_mean``, ``len_max``, ``id_like``, ``dominated``.

        Numeric inputs that are missing, ``bool``, non-numeric or non-finite
        come back as ``None``; flags that cannot be derived are ``False``.

        * ``pct_distinct`` is on a 0-100 scale and is ``None`` when ``n_rows``
          is missing or not positive.
        * ``entropy_max`` is ``log2(n_distinct)`` (``0.0`` for
          ``n_distinct <= 1``); ``entropy_norm`` is ``entropy / entropy_max``
          clipped to ``[0, 1]``, or ``None`` when ``entropy_max`` is not
          positive.
        * ``mode_pct`` is returned exactly as found in ``cat``. Only when it is
          absent is it derived from ``top[0]['pct']``, and in that case a value
          ``<= 1`` is treated as a fraction and scaled to 0-100.
        * ``dominated`` is evaluated on a 0-100 scale whatever the scale of
          ``mode_pct``: a value ``<= 1`` is read as a fraction, mirroring the
          fallback rule above, so the flag works both with fraction-scale and
          with percentage-scale inputs.
        * ``n_singletons`` counts ``count == 1`` entries of the visible ``top``
          only (``None`` when ``top`` is missing or empty);
          ``n_singletons_partial`` is ``True`` when ``n_distinct`` exceeds the
          number of ``top`` entries.
    """
    cat = cat if isinstance(cat, dict) else {}

    # --- passthroughs (numeric-validated, type preserved) ---
    n_distinct = _num(cat.get("n_distinct"))
    n_rows_out = _num(n_rows)
    entropy = _num(cat.get("entropy"))
    imbalance = _num(cat.get("imbalance"))
    len_min = _num(cat.get("len_min"))
    len_mean = _num(cat.get("len_mean"))
    len_max = _num(cat.get("len_max"))
    mode = cat.get("mode")  # any value (or None); passthrough as-is

    # --- pct_distinct (0-100); undefined without a positive row count ---
    if n_distinct is None or n_rows_out is None or n_rows_out <= 0:
        pct_distinct = None
    else:
        pct_distinct = n_distinct / n_rows_out * 100.0

    # --- entropy_max = log2(n_distinct) ---
    if n_distinct is None:
        entropy_max = None
    elif n_distinct > 1:
        entropy_max = log2(n_distinct)
    else:  # n_distinct <= 1: a constant (or empty) column has no spread
        entropy_max = 0.0

    # --- entropy_norm = entropy / entropy_max, clipped to [0, 1] ---
    if entropy_max is not None and entropy_max > 0 and entropy is not None:
        entropy_norm = max(0.0, min(1.0, entropy / entropy_max))
    else:
        entropy_norm = None

    # --- mode_pct: prefer cat['mode_pct']; else derive from top[0].pct ---
    mode_pct = _num(cat.get("mode_pct"))
    top = cat.get("top")
    has_top = isinstance(top, (list, tuple)) and len(top) > 0
    if mode_pct is None and has_top:
        first = top[0]
        if isinstance(first, dict):
            first_pct = _num(first.get("pct"))
            if first_pct is not None:
                # Normalize to 0-100: a fraction (<= 1) becomes a percentage.
                mode_pct = first_pct * 100.0 if first_pct <= 1 else first_pct

    # --- singletons (count == 1) within the visible top ---
    if has_top:
        n_singletons = sum(
            1
            for item in top
            if isinstance(item, dict) and _num(item.get("count")) == 1
        )
    else:
        n_singletons = None

    # The singleton count only covers the visible top; there may be more
    # distinct values (and thus more singletons) outside it.
    top_len = len(top) if isinstance(top, (list, tuple)) else 0
    n_singletons_partial = bool(n_distinct is not None and n_distinct > top_len)

    # --- derived flags ---
    id_like = pct_distinct is not None and pct_distinct >= 99.0
    # Judge dominance on a 0-100 scale. The passthrough ``mode_pct`` may be a
    # fraction (0-1); comparing it raw against 90 would never fire, so apply
    # the same "<= 1 means fraction" rule used by the fallback above. The
    # returned ``mode_pct`` itself is left untouched.
    if mode_pct is None:
        dominated = False
    else:
        mode_share = mode_pct * 100.0 if mode_pct <= 1 else mode_pct
        dominated = mode_share >= 90.0

    return {
        "n_distinct": n_distinct,
        "n_rows": n_rows_out,
        "pct_distinct": pct_distinct,
        "entropy": entropy,
        "entropy_max": entropy_max,
        "entropy_norm": entropy_norm,
        "mode": mode,
        "mode_pct": mode_pct,
        "imbalance": imbalance,
        "n_singletons": n_singletons,
        "n_singletons_partial": n_singletons_partial,
        "len_min": len_min,
        "len_mean": len_mean,
        "len_max": len_max,
        "id_like": id_like,
        "dominated": dominated,
    }
|
||||
@@ -0,0 +1,216 @@
|
||||
"""Tests para categorical_cardinality_block."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
from math import log2
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
|
||||
from categorical_cardinality_block import categorical_cardinality_block
|
||||
|
||||
|
||||
# Output contract: every call returns exactly these 16 keys.
|
||||
# Output contract of categorical_cardinality_block: the tests below compare
# ``set(result.keys())`` against this set, so a missing or extra key fails them.
EXPECTED_KEYS = {
    "n_distinct",
    "n_rows",
    "pct_distinct",
    "entropy",
    "entropy_max",
    "entropy_norm",
    "mode",
    "mode_pct",
    "imbalance",
    "n_singletons",
    "n_singletons_partial",
    "len_min",
    "len_mean",
    "len_max",
    "id_like",
    "dominated",
}
|
||||
|
||||
|
||||
def _sample_cat():
    """Build a realistic ``summarize_categorical`` output for one column.

    Four distinct values over ten rows; ``c`` and ``d`` are singletons.
    """
    rows = (("a", 5, 0.5), ("b", 3, 0.3), ("c", 1, 0.1), ("d", 1, 0.1))
    summary = {
        "top": [
            {"value": value, "count": count, "pct": pct}
            for value, count, pct in rows
        ],
    }
    summary["mode"] = "a"
    summary["mode_pct"] = 0.5
    summary["n_distinct"] = 4
    summary["entropy"] = 1.685  # <= log2(4) = 2.0
    summary["imbalance"] = 5.0
    summary["len_min"] = 1
    summary["len_mean"] = 1.0
    summary["len_max"] = 1
    return summary
|
||||
|
||||
|
||||
def test_normal_case():
    """Happy path: passthroughs, pct_distinct, entropy bounds and singletons."""
    out = categorical_cardinality_block(_sample_cat(), n_rows=10)

    assert set(out.keys()) == EXPECTED_KEYS

    # Values that must come back exactly as supplied (mode_pct is NOT rescaled).
    passthrough = {
        "n_distinct": 4,
        "n_rows": 10,
        "entropy": 1.685,
        "imbalance": 5.0,
        "mode": "a",
        "mode_pct": 0.5,
        "len_min": 1,
        "len_max": 1,
    }
    for key, expected in passthrough.items():
        assert out[key] == expected

    # pct_distinct = 4 / 10 * 100
    assert abs(out["pct_distinct"] - 40.0) < 1e-12

    # entropy_max = log2(4) = 2.0
    for reference in (log2(4), 2.0):
        assert abs(out["entropy_max"] - reference) < 1e-12

    # entropy_norm = 1.685 / 2.0 = 0.8425, which lies within [0, 1]
    assert abs(out["entropy_norm"] - 1.685 / 2.0) < 1e-12
    assert 0.0 <= out["entropy_norm"] <= 1.0

    # Singletons: "c" and "d" both have count == 1.
    assert out["n_singletons"] == 2
    # The top lists every distinct value (4 == 4), so the count is complete.
    assert out["n_singletons_partial"] is False

    # Neither id-like (40% distinct) nor dominated (mode_pct 0.5).
    assert out["id_like"] is False
    assert out["dominated"] is False
|
||||
|
||||
|
||||
def test_empty_cat_does_not_raise():
    """Empty ``cat``: no exception, derived keys are ``None``, flags ``False``."""
    out = categorical_cardinality_block({}, n_rows=100)

    assert set(out.keys()) == EXPECTED_KEYS

    must_be_none = (
        "n_distinct",
        "pct_distinct",
        "entropy",
        "entropy_max",
        "entropy_norm",
        "mode",
        "mode_pct",
        "imbalance",
        "n_singletons",
        "len_min",
        "len_mean",
        "len_max",
    )
    for name in must_be_none:
        assert out[name] is None

    for flag in ("n_singletons_partial", "id_like", "dominated"):
        assert out[flag] is False

    # n_rows echoes the argument even though nothing else is derivable.
    assert out["n_rows"] == 100
|
||||
|
||||
|
||||
def test_none_cat_does_not_raise():
    """``cat=None`` is treated like ``{}``: same guarantees as the empty dict."""
    out = categorical_cardinality_block(None, n_rows=None)

    assert set(out.keys()) == EXPECTED_KEYS
    for name in ("n_distinct", "pct_distinct", "entropy_max", "entropy_norm"):
        assert out[name] is None
    for flag in ("id_like", "dominated"):
        assert out[flag] is False
|
||||
|
||||
|
||||
def test_n_rows_zero_no_zero_division():
    """``n_rows=0``: ``pct_distinct`` is ``None`` and no ZeroDivisionError."""
    out = categorical_cardinality_block(_sample_cat(), n_rows=0)

    assert out["pct_distinct"] is None
    # The distinct count is still reported even without a usable row total.
    assert out["n_distinct"] == 4
    assert out["id_like"] is False
|
||||
|
||||
|
||||
def test_id_like_when_distinct_near_rows():
    """``id_like`` is ``True`` once ``pct_distinct`` reaches 99."""
    near_unique = categorical_cardinality_block(
        {"n_distinct": 99, "entropy": 6.6, "top": [], "mode": None},
        n_rows=100,
    )
    assert abs(near_unique["pct_distinct"] - 99.0) < 1e-12
    assert near_unique["id_like"] is True

    # Exact identity column: 100 / 100 = 100%.
    fully_unique = categorical_cardinality_block(
        {"n_distinct": 100, "top": []}, n_rows=100
    )
    assert fully_unique["id_like"] is True
|
||||
|
||||
|
||||
def test_dominated_when_mode_pct_high():
    """``dominated`` is ``True`` for a high ``mode_pct`` (>= 90)."""
    top_rows = (("x", 95, 0.95), ("y", 3, 0.03), ("z", 2, 0.02))
    summary = {
        "n_distinct": 3,
        "entropy": 0.3,
        "mode": "x",
        "mode_pct": 95.0,
        "top": [
            {"value": value, "count": count, "pct": pct}
            for value, count, pct in top_rows
        ],
        "imbalance": 47.5,
    }

    out = categorical_cardinality_block(summary, n_rows=100)

    assert out["mode_pct"] == 95.0
    assert out["dominated"] is True
|
||||
|
||||
|
||||
def test_mode_pct_fallback_from_top_fraction():
    """Without ``mode_pct`` it is derived from ``top[0]['pct']``, scaled to 0-100."""
    leading = {"value": "x", "count": 95, "pct": 0.95}
    trailing = {"value": "y", "count": 5, "pct": 0.05}
    summary = {"n_distinct": 3, "top": [leading, trailing]}

    out = categorical_cardinality_block(summary, n_rows=100)

    # 0.95 (fraction) -> 95.0 (percentage)
    assert abs(out["mode_pct"] - 95.0) < 1e-12
    assert out["dominated"] is True
|
||||
|
||||
|
||||
def test_n_singletons_partial_when_top_truncated():
    """``n_distinct > len(top)``: singletons cover the visible top only."""
    visible = (("a", 4, 0.4), ("b", 1, 0.1), ("c", 1, 0.1))
    summary = {
        "n_distinct": 10,
        "top": [
            {"value": value, "count": count, "pct": pct}
            for value, count, pct in visible
        ],
        "entropy": 2.5,
    }

    out = categorical_cardinality_block(summary, n_rows=12)

    assert out["n_singletons"] == 2  # only b, c visible
    assert out["n_singletons_partial"] is True
|
||||
|
||||
|
||||
def test_single_distinct_value_entropy_norm_none():
    """``n_distinct=1``: ``entropy_max`` is 0.0, so ``entropy_norm`` is ``None``."""
    constant_column = {
        "n_distinct": 1,
        "entropy": 0.0,
        "mode": "only",
        "mode_pct": 1.0,
        "top": [{"value": "only", "count": 7, "pct": 1.0}],
        "imbalance": 1.0,
    }

    out = categorical_cardinality_block(constant_column, n_rows=7)

    assert out["entropy_max"] == 0.0
    assert out["entropy_norm"] is None  # no division by zero, no invented value
    assert out["n_singletons"] == 0
|
||||
@@ -0,0 +1,108 @@
|
||||
---
|
||||
id: categorical_top_pie_figure_py_datascience
|
||||
name: categorical_top_pie_figure
|
||||
kind: function
|
||||
lang: py
|
||||
domain: datascience
|
||||
version: "1.0.0"
|
||||
purity: impure
|
||||
signature: "def categorical_top_pie_figure(top: list, n_distinct: int = 0, title: str = \"\", top_k: int = 6, n_rows=None) -> \"matplotlib.figure.Figure\""
|
||||
description: "Construye una figura matplotlib tipo donut (pie con agujero central) de las top_k categorías más frecuentes de una columna categórica, agregando el resto en un sector gris \"Otros (N categorías)\". Consume el bloque `top` de summarize_categorical y devuelve un matplotlib.figure.Figure listo para rasterizar por el renderer del informe EDA. Backend Agg sin pyplot global; defensivo ante top vacío/None."
|
||||
tags: [eda, categorical, pie, donut, matplotlib, figure, visualization, datascience, impure]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: [matplotlib]
|
||||
example: |
|
||||
from categorical_top_pie_figure import categorical_top_pie_figure
|
||||
top = [
|
||||
{"value": "rojo", "count": 40, "pct": 0.4},
|
||||
{"value": "azul", "count": 30, "pct": 0.3},
|
||||
{"value": "verde", "count": 20, "pct": 0.2},
|
||||
]
|
||||
fig = categorical_top_pie_figure(top, n_distinct=12, title="color", top_k=6, n_rows=100)
|
||||
tested: true
|
||||
tests:
|
||||
- "test_returns_figure"
|
||||
- "test_ten_items_topk_six_yields_seven_wedges"
|
||||
- "test_empty_top_does_not_raise_and_returns_figure"
|
||||
- "test_long_value_truncated_in_legend"
|
||||
- "test_none_value_and_none_count_are_handled"
|
||||
- "test_n_rows_adds_exact_others_slice"
|
||||
test_file_path: "python/functions/datascience/categorical_top_pie_figure_test.py"
|
||||
file_path: "python/functions/datascience/categorical_top_pie_figure.py"
|
||||
params:
|
||||
- name: top
|
||||
desc: "Lista de dicts {value, count, pct} ordenada de mayor a menor por count (salida del bloque `top` de summarize_categorical). Puede venir vacía o con dicts incompletos: items no-dict, sin count, con count None o count <= 0 se descartan. value None se admite (sin etiqueta)."
|
||||
- name: n_distinct
|
||||
    desc: "Nº total de categorías distintas de la columna. Etiqueta el sector agregado como \"Otros (N categorías)\", con N = n_distinct - nº de sectores mostrados (mínimo 0). Si n_distinct no supera el nº de sectores mostrados, se usa el overflow real de `top` como nº de categorías agregadas. Default 0."
|
||||
- name: title
|
||||
desc: "Título de la figura (nombre de la columna). Se trunca a ~48 chars con elipsis si es muy largo. Default \"\" (sin título)."
|
||||
- name: top_k
|
||||
desc: "Nº máximo de sectores explícitos. Default 6. El sector \"Otros\" no cuenta contra este límite. Con top_k <= 0 se muestra al menos la categoría mayor."
|
||||
- name: n_rows
|
||||
desc: "Opcional. Total de filas del dataset. Si se da y la suma de counts mostrados < n_rows, el sector \"Otros\" usa (n_rows - suma_mostrada) como count para que los ángulos sean exactos respecto al total real. Si se omite, \"Otros\" usa la suma de counts fuera del top_k mostrado (solo cuando top trae más de top_k items). Default None."
|
||||
output: "Un matplotlib.figure.Figure (figsize 6.4x4.0, dpi 150) con un Axes donut (wedgeprops width 0.42) más una leyenda lateral con value truncado a 20 chars + count; el sector \"Otros\" en gris. Anotación central con el total n. Si no hay counts válidos, devuelve igualmente una Figure con un texto centrado \"sin datos categóricos\" (nunca lanza). El caller rasteriza/cierra la figura; la función no la muestra ni la guarda."
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
from categorical_top_pie_figure import categorical_top_pie_figure
|
||||
|
||||
# `top` es la salida del bloque "top" de summarize_categorical (ya ordenado desc).
|
||||
top = [
|
||||
{"value": "rojo", "count": 40, "pct": 0.40},
|
||||
{"value": "azul", "count": 30, "pct": 0.30},
|
||||
{"value": "verde", "count": 20, "pct": 0.20},
|
||||
{"value": "amarillo", "count": 5, "pct": 0.05},
|
||||
]
|
||||
|
||||
fig = categorical_top_pie_figure(
|
||||
top,
|
||||
n_distinct=12, # 12 categorías distintas en total
|
||||
title="color_producto",
|
||||
top_k=6, # hasta 6 sectores explícitos
|
||||
n_rows=100, # "Otros" = 100 - 95 = 5, sobre 8 categorías agregadas
|
||||
)
|
||||
|
||||
# El renderer del informe lo rasteriza; aquí solo persistimos para inspección.
|
||||
fig.savefig("/tmp/donut_color.png")
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
Úsala dentro de un informe EDA cuando quieras visualizar la composición de una
|
||||
columna categórica de un vistazo: cuántas filas caen en las categorías
|
||||
dominantes frente a la cola larga. Pásale directamente el bloque `top` de
|
||||
`summarize_categorical` (ya ordenado de mayor a menor) más `n_distinct` para que
|
||||
el sector "Otros" indique cuántas categorías quedan agrupadas. Es la pareja
|
||||
"composición" del gráfico de barras top-k: el donut comunica proporciones del
|
||||
total, las barras comunican magnitudes comparables.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- **Impura por matplotlib.** Toca la maquinaria de render. Usa el backend `Agg`
|
||||
y la API orientada a objetos `Figure`/`add_subplot` — NUNCA `pyplot.*` aquí,
|
||||
para no tocar el estado global ni filtrar figuras entre llamadas. `pyplot` NO
|
||||
es thread-safe; esta función evita ese riesgo construyendo el `Figure`
|
||||
directamente, así que es segura de llamar en bucle desde el renderer.
|
||||
- **El caller cierra la figura.** La función devuelve el `Figure` pero no lo
|
||||
muestra ni lo guarda. Quien la consume debe rasterizarla y luego liberarla
|
||||
(`fig.clf()` / `matplotlib.pyplot.close(fig)` si se usó pyplot en el caller)
|
||||
para no acumular memoria en lotes grandes de columnas.
|
||||
- **Pie engaña con muchos sectores.** Por eso `top_k` por defecto es 6 y el
|
||||
resto se agrega en "Otros": donuts con 15+ sectores son ilegibles. Para
|
||||
cardinalidad muy alta el donut solo muestra la cabeza de la distribución; la
|
||||
cola vive en el sector gris.
|
||||
- **Ángulos exactos solo con `n_rows`.** Sin `n_rows`, el sector "Otros" se
|
||||
calcula con el overflow presente en `top`; si `top` ya viene recortado a
|
||||
`top_k` por el productor, no habrá "Otros" aunque existan más categorías. Pasa
|
||||
`n_rows` (total de filas del dataset) para ángulos correctos respecto al total
|
||||
real.
|
||||
- **Defensiva, nunca lanza.** `top=[]`, `value=None`, `count=None` o counts no
|
||||
numéricos se manejan sin error: en el peor caso devuelve una `Figure` con
|
||||
"sin datos categóricos". No envuelvas la llamada en try/except por miedo a un
|
||||
raise — no lo hay.
|
||||
@@ -0,0 +1,230 @@
|
||||
"""Impure EDA helper: donut figure of the most common categories (`eda` group).
|
||||
|
||||
Builds a matplotlib donut (pie with a central hole) of the ``top_k`` most
|
||||
frequent categories of a categorical column, folding everything else into a
|
||||
single "Otros (N categorías)" slice. Returns a ready-to-rasterize
|
||||
``matplotlib.figure.Figure``; it never shows nor saves it.
|
||||
|
||||
Impure because it touches matplotlib's rendering machinery. It uses the headless
|
||||
Agg backend and the object-oriented ``Figure`` API (no ``pyplot``) so it leaks no
|
||||
global state and is safe to call repeatedly from a report renderer.
|
||||
"""
|
||||
|
||||
import matplotlib
|
||||
|
||||
matplotlib.use("Agg")
|
||||
|
||||
from matplotlib.figure import Figure # noqa: E402
|
||||
|
||||
|
||||
# Gray reserved for the aggregated "Otros" slice.
_OTHER_COLOR = "#9e9e9e"
# Muted gray for secondary text (center annotation, no-data message).
_MUTED_TEXT = "#5f6b7a"
# Qualitative palette for the explicit slices, cycled when there are more
# slices than colours. It deliberately contains no gray: gray is reserved for
# the "Otros" slice, and a gray explicit slice would be indistinguishable from
# it in the donut and in the legend.
_PALETTE = [
    "#4C72B0",
    "#DD8452",
    "#55A868",
    "#C44E52",
    "#8172B3",
    "#937860",
    "#DA8BC3",
    "#CCB974",
    "#64B5CD",
]
|
||||
|
||||
|
||||
def _truncate(text, width: int = 20) -> str:
    """Truncate ``text`` to at most ``width`` chars, marking cuts with "…".

    ``None`` becomes the empty string and any other value is converted with
    ``str``. Text that already fits is returned unchanged. When it has to be
    cut, the last kept position is replaced by an ellipsis, except for
    ``width == 1`` where only the first character is kept. A ``width`` of zero
    or less yields the empty string (a negative slice bound would otherwise
    keep almost the whole text).
    """
    s = "" if text is None else str(text)
    if len(s) <= width:
        return s
    if width <= 0:
        return ""
    if width == 1:
        return s[:1]
    return s[: width - 1] + "…"
|
||||
|
||||
|
||||
def categorical_top_pie_figure(
    top: list,
    n_distinct: int = 0,
    title: str = "",
    top_k: int = 6,
    n_rows=None,
) -> "matplotlib.figure.Figure":
    """Build a donut figure of the most common categories of a column.

    Renders the ``top_k`` most frequent categories as explicit donut slices and
    aggregates every remaining category into a single gray "Otros (N
    categorías)" slice. Category names are not painted on the wedges; they are
    listed in a lateral legend (truncated value + count).

    The function is defensive about its inputs: empty input, malformed items,
    missing/``None``/non-numeric/non-finite counts and unusable ``top_k``,
    ``n_distinct`` or ``n_rows`` values are skipped or replaced by a fallback
    instead of raising. When there is nothing valid to draw it still returns a
    ``Figure`` carrying a centered "sin datos categóricos" message.

    Args:
        top: List (or tuple) of ``{value, count, pct}`` dicts, expected to be
            sorted by ``count`` descending (the ``top`` block of
            ``summarize_categorical``); the order is used as given. Non-dict
            items and items whose ``count`` is missing, not convertible to a
            finite float, or not positive are skipped. ``value`` may be
            ``None``.
        n_distinct: Total number of distinct categories in the column. The
            aggregated slice is labelled with ``n_distinct`` minus the number
            of slices actually shown (floored at 0). When that is 0 but ``top``
            has valid items beyond the shown ones, the number of those items is
            used instead. Values that cannot be converted to ``int`` count as 0.
        title: Figure title (the column name). Truncated to 48 characters.
        top_k: Maximum number of explicit slices. Default 6. The "Otros" slice
            does not count against this limit. With ``top_k <= 0`` the largest
            category is still shown; a value that cannot be converted to
            ``int`` falls back to 6.
        n_rows: Optional total row count of the dataset. When given and the sum
            of shown counts is below ``n_rows``, the "Otros" slice uses
            ``n_rows - sum_shown`` as its count so the wedge angles are exact
            with respect to the real total. Otherwise "Otros" uses the sum of
            the valid counts that fall outside the shown slices.

    Returns:
        A ``matplotlib.figure.Figure`` with a single donut Axes plus a lateral
        legend. The function neither shows nor saves it; the caller is
        responsible for rasterizing and releasing it.
    """
    from math import isfinite  # stdlib; local so no file-level import changes

    fig = Figure(figsize=(6.4, 4.0), dpi=150)
    ax = fig.add_subplot(111)

    safe_title = _truncate(title, 48)

    # --- Defensive parse: keep only {value, count} with a finite count > 0.
    cleaned = []
    if isinstance(top, (list, tuple)):
        for item in top:
            if not isinstance(item, dict):
                continue
            try:
                count = float(item.get("count"))
            except (TypeError, ValueError, OverflowError):
                # None, non-numeric text, or an int too large for a float.
                continue
            # NaN/inf would poison the sums and break int(round(...)) below.
            if not isfinite(count) or count <= 0:
                continue
            cleaned.append((item.get("value"), count))

    if not cleaned:
        ax.axis("off")
        ax.text(
            0.5,
            0.5,
            "sin datos categóricos",
            ha="center",
            va="center",
            fontsize=12,
            color=_MUTED_TEXT,
            transform=ax.transAxes,
        )
        if safe_title:
            ax.set_title(safe_title, fontsize=12, loc="center", pad=8)
        fig.tight_layout()
        return fig

    # --- Split into shown slices and the aggregated remainder.
    try:
        limit = max(int(top_k), 0)
    except (TypeError, ValueError, OverflowError):
        limit = 6  # unusable top_k: fall back to the documented default
    # top_k <= 0 would show nothing; show at least the largest category.
    shown = cleaned[:limit] or cleaned[:1]

    sum_shown = sum(c for _, c in shown)
    overflow_count = sum(c for _, c in cleaned[len(shown):])

    # How many categories are folded into "Otros".
    try:
        nd = int(n_distinct)
    except (TypeError, ValueError, OverflowError):
        nd = 0
    others_categories = max(nd - len(shown), 0)
    # If n_distinct is unknown/too small, fall back to the overflow we actually
    # have in `top` beyond the shown slices.
    overflow_items = len(cleaned) - len(shown)
    if others_categories == 0 and overflow_items > 0:
        others_categories = overflow_items

    # Count attributed to the "Otros" slice for exact angles.
    others_count = 0.0
    if n_rows is not None:
        try:
            total_rows = float(n_rows)
        except (TypeError, ValueError, OverflowError):
            total_rows = None
        if (
            total_rows is not None
            and isfinite(total_rows)
            and total_rows > sum_shown
        ):
            others_count = total_rows - sum_shown
    if others_count <= 0:
        others_count = overflow_count

    labels = [v for v, _ in shown]
    values = [c for _, c in shown]
    colors = [_PALETTE[i % len(_PALETTE)] for i in range(len(shown))]

    # "Otros" is drawn only when it has both a size and a category count.
    has_others = others_count > 0 and others_categories > 0
    if has_others:
        values.append(others_count)
        labels.append("Otros")
        colors.append(_OTHER_COLOR)

    total = sum(values)

    def _autopct(pct: float) -> str:
        # Hide tiny labels to avoid crowding the wedges.
        return f"{pct:.0f}%" if pct >= 5 else ""

    wedges, _texts, autotexts = ax.pie(
        values,
        colors=colors,
        startangle=90,
        counterclock=False,
        wedgeprops={"width": 0.42, "edgecolor": "white", "linewidth": 1.0},
        autopct=_autopct,
        pctdistance=0.79,
        textprops={"fontsize": 8},
    )
    for at in autotexts:
        at.set_color("white")
        at.set_fontweight("bold")
    ax.set_aspect("equal")

    # --- Lateral legend: truncated value + count (+ "(N categorías)" for Otros).
    legend_labels = []
    for idx, (lab, val) in enumerate(zip(labels, values)):
        if has_others and idx == len(labels) - 1:
            legend_labels.append(
                f"Otros ({others_categories} categorías) — {int(round(val))}"
            )
        else:
            legend_labels.append(f"{_truncate(lab, 20)} — {int(round(val))}")

    ax.legend(
        wedges,
        legend_labels,
        title="Categorías",
        loc="center left",
        bbox_to_anchor=(1.02, 0.5),
        fontsize=8,
        title_fontsize=9,
        frameon=False,
    )

    if safe_title:
        ax.set_title(safe_title, fontsize=13, loc="left", pad=10)

    # Center annotation: total count covered by the donut.
    ax.text(
        0,
        0,
        f"n={int(round(total))}",
        ha="center",
        va="center",
        fontsize=11,
        color=_MUTED_TEXT,
        fontweight="bold",
    )

    # Leave room on the right for the legend (avoid clipping it).
    fig.subplots_adjust(left=0.02, right=0.62, top=0.88, bottom=0.06)
    return fig
|
||||
@@ -0,0 +1,104 @@
|
||||
"""Tests para categorical_top_pie_figure (donut de categorías top, grupo eda).
|
||||
|
||||
Usa el backend Agg sin pyplot; no muestra ni guarda figuras. Cada test cierra
|
||||
explícitamente la Figure construida (matplotlib.pyplot.close) para no acumular
|
||||
estado entre tests.
|
||||
"""
|
||||
|
||||
import matplotlib
|
||||
|
||||
matplotlib.use("Agg")
|
||||
|
||||
import matplotlib.pyplot as plt # noqa: E402
|
||||
from matplotlib.figure import Figure # noqa: E402
|
||||
|
||||
from categorical_top_pie_figure import categorical_top_pie_figure
|
||||
|
||||
|
||||
def _make_top(n):
    """Build ``n`` ``{value, count, pct}`` items sorted by ``count`` descending.

    Counts run from ``n`` down to 1 and ``pct`` is each count divided by the
    sum of all counts. ``n <= 0`` yields an empty list.
    """
    # Loop-invariant denominator: compute it once instead of once per item.
    total = sum(range(1, n + 1))
    return [
        {"value": f"cat_{i}", "count": n - i, "pct": (n - i) / total}
        for i in range(n)
    ]
|
||||
|
||||
|
||||
def _wedges(ax):
    """Return the ``Wedge`` patches (pie slices) attached to ``ax``."""
    from matplotlib.patches import Wedge

    slices = []
    for patch in ax.patches:
        if isinstance(patch, Wedge):
            slices.append(patch)
    return slices
|
||||
|
||||
|
||||
def test_returns_figure():
    """A regular call returns a matplotlib ``Figure``."""
    figure = categorical_top_pie_figure(_make_top(3), n_distinct=3, title="col")
    assert isinstance(figure, Figure)
    plt.close(figure)
|
||||
|
||||
|
||||
def test_ten_items_topk_six_yields_seven_wedges():
    """Ten categories with ``top_k=6`` draw six slices plus "Otros"."""
    figure = categorical_top_pie_figure(
        _make_top(10), n_distinct=10, title="muchas", top_k=6
    )
    slices = _wedges(figure.axes[0])
    # 6 explicit categories + 1 aggregated "Otros" slice.
    assert len(slices) == 7
    plt.close(figure)
|
||||
|
||||
|
||||
def test_empty_top_does_not_raise_and_returns_figure():
    """An empty ``top`` still yields a ``Figure``, with no pie slices on it."""
    figure = categorical_top_pie_figure([], n_distinct=0, title="vacía")
    assert isinstance(figure, Figure)
    # No data: the Axes must not contain any pie wedge.
    slice_count = len(_wedges(figure.axes[0]))
    assert slice_count == 0
    plt.close(figure)
|
||||
|
||||
|
||||
def test_long_value_truncated_in_legend():
    """An over-long category value appears in the legend cut with an ellipsis."""
    long_value = "una_categoria_con_un_nombre_larguisimo_que_excede_el_limite"
    items = [
        {"value": name, "count": 10, "pct": 0.5}
        for name in (long_value, "corta")
    ]
    figure = categorical_top_pie_figure(items, n_distinct=2, title="col", top_k=6)

    legend = figure.axes[0].get_legend()
    assert legend is not None

    entries = [text.get_text() for text in legend.get_texts()]
    # The long value shows up truncated with an ellipsis, never in full.
    assert any("…" in entry for entry in entries)
    assert long_value not in " ".join(entries)
    plt.close(figure)
|
||||
|
||||
|
||||
def test_none_value_and_none_count_are_handled():
    """``value=None`` is kept; an item with ``count=None`` is dropped."""
    items = [
        {"value": None, "count": 5, "pct": 0.5},
        {"value": "b", "count": None, "pct": 0.0},  # count None -> discarded
        {"value": "c", "count": 5, "pct": 0.5},
    ]
    figure = categorical_top_pie_figure(
        items, n_distinct=2, title="con nones", top_k=6
    )
    assert isinstance(figure, Figure)
    # Two valid items and no overflow -> two wedges, no "Otros".
    slices = _wedges(figure.axes[0])
    assert len(slices) == 2
    plt.close(figure)
|
||||
|
||||
|
||||
def test_n_rows_adds_exact_others_slice():
    """With ``n_rows`` the "Otros" slice covers the rows not shown explicitly."""
    # The three shown categories add up to 30 of the 100 dataset rows, so
    # "Otros" must account for the remaining 70.
    top = [
        {"value": "a", "count": 15, "pct": 0.15},
        {"value": "b", "count": 10, "pct": 0.10},
        {"value": "c", "count": 5, "pct": 0.05},
    ]
    fig = categorical_top_pie_figure(
        top, n_distinct=20, title="col", top_k=3, n_rows=100
    )
    ax = fig.axes[0]
    # 3 explicit slices + "Otros".
    assert len(_wedges(ax)) == 4
    legend_texts = [t.get_text() for t in ax.get_legend().get_texts()]
    # "Otros" reports 20 distinct - 3 shown = 17 categories and a count of 70.
    assert any("Otros (17 categorías)" in t and "70" in t for t in legend_texts)
    plt.close(fig)
|
||||
@@ -1,97 +0,0 @@
|
||||
---
|
||||
name: describe_clusters_llm
|
||||
kind: function
|
||||
lang: py
|
||||
domain: datascience
|
||||
version: "1.0.0"
|
||||
purity: impure
|
||||
signature: "def describe_clusters_llm(cluster_profiles: list, feature_names: list, model: str = \"claude-haiku-4-5-20251001\") -> dict"
|
||||
description: "Micro-analisis LLM de clusters de KMeans (grupo eda). Toma los perfiles AGREGADOS de cada cluster (los que produce project_clusters_2d: tamano, centroide en escala original, features distintivas y centroide en z-score) y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una descripcion de 1-2 frases en espanol. Clave de coste/privacidad: NO envia filas crudas, solo el resumen agregado de cada grupo (tamano, % del total y la media de las features distintivas con su signo respecto a la media global). Reusa ask_llm del grupo claude-direct (API directa con token OAuth de Claude). Impura, dict-no-throw: nunca lanza, degrada a titulos genericos 'Cluster N' si el LLM no responde o el parseo falla."
|
||||
tags: [eda, clustering, llm, claude-direct, datascience, kmeans]
|
||||
params:
|
||||
- name: cluster_profiles
|
||||
desc: "Lista de perfiles de cluster con la forma que produce project_clusters_2d: cada uno {cluster:int, size:int, pct:float, centroid_original:{feature: media en escala original}, distinctive:[features distintivas], centroid_z:{feature: z-score}}. Solo se le envia al LLM un resumen agregado; nunca filas crudas. Lista vacia o no-lista -> clusters=[] sin llamar al LLM."
|
||||
- name: feature_names
|
||||
desc: "Nombres de las features del dataset. Se incluyen como contexto en el prompt para que el LLM pueda nombrar los clusters; no es obligatorio que coincida con las features distintivas de cada perfil."
|
||||
- name: model
|
||||
desc: "id del modelo Anthropic a usar. Default 'claude-haiku-4-5-20251001' (haiku, coste bajo, ~2-3s). Para titulos/descripciones mas finas, pasar p.ej. 'claude-opus-4-8'."
|
||||
output: "dict dict-no-throw: {clusters:[{cluster:int, title:str, description:str}], model:str, note:str}. note=='' si todo fue bien. Si el LLM no respondio (note='LLM no disponible') o el parseo fallo (note='parse fallido'), clusters trae titulos genericos 'Cluster N' con description vacia. Si cluster_profiles esta vacio o no es lista: {clusters:[], model, note:'sin clusters'}. NUNCA lanza."
|
||||
uses_functions: [ask_llm_py_core]
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: []
|
||||
tested: true
|
||||
tests: ["test_parse_clusters_json_valid_array", "test_parse_clusters_json_wrapped_in_junk_text", "test_parse_clusters_json_non_json_returns_none", "test_parse_clusters_json_fills_missing_cluster_by_index", "test_describe_clusters_llm_ok_with_monkeypatched_llm", "test_describe_clusters_llm_degrades_on_empty_response", "test_describe_clusters_llm_degrades_on_unparseable_response", "test_describe_clusters_llm_empty_list_skips_llm", "test_describe_clusters_llm_non_list_input_skips_llm"]
|
||||
test_file_path: "python/functions/datascience/describe_clusters_llm_test.py"
|
||||
file_path: "python/functions/datascience/describe_clusters_llm.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
import sys, os
|
||||
sys.path.insert(0, os.path.join("python", "functions"))
|
||||
|
||||
from datascience.describe_clusters_llm import describe_clusters_llm
|
||||
|
||||
# Perfiles agregados producidos por project_clusters_2d (no hay filas crudas).
|
||||
cluster_profiles = [
|
||||
{
|
||||
"cluster": 0, "size": 60, "pct": 60.0,
|
||||
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
|
||||
"distinctive": ["acidez", "alcohol"],
|
||||
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
|
||||
},
|
||||
{
|
||||
"cluster": 1, "size": 40, "pct": 40.0,
|
||||
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
|
||||
"distinctive": ["alcohol"],
|
||||
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
|
||||
},
|
||||
]
|
||||
feature_names = ["acidez", "alcohol", "azucar"]
|
||||
|
||||
out = describe_clusters_llm(cluster_profiles, feature_names) # haiku por defecto
|
||||
# out = describe_clusters_llm(cluster_profiles, feature_names, model="claude-opus-4-8")
|
||||
|
||||
if not out["note"]:
|
||||
for c in out["clusters"]:
|
||||
print(f"Cluster {c['cluster']}: {c['title']}")
|
||||
print(" ", c["description"])
|
||||
else:
|
||||
# Degradacion: titulos genericos "Cluster N".
|
||||
print("LLM no usado:", out["note"])
|
||||
for c in out["clusters"]:
|
||||
print(c["cluster"], c["title"])
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
Cuando ya has clusterizado un dataset (KMeans + `project_clusters_2d`) y quieres
|
||||
poner NOMBRE y descripcion legible a cada grupo en vez de dejar "Cluster 0/1/2".
|
||||
Es el paso interpretativo que sigue al perfilado de clusters: `project_clusters_2d`
|
||||
calcula tamano, centroides y features distintivas, y `describe_clusters_llm` los
|
||||
traduce a un titulo corto + 1-2 frases por cluster. Usala al cerrar un EDA con
|
||||
segmentacion para el resumen final o el report. Una sola llamada al LLM describe
|
||||
todos los clusters a la vez (barato).
|
||||
|
||||
## Gotchas
|
||||
|
||||
- **Impura: hace 1 llamada de red al LLM.** No es determinista ni gratis. Latencia
|
||||
tipica ~2-3s con haiku.
|
||||
- **Requiere token OAuth de Claude** en `~/.claude/.credentials.json` (via `ask_llm`
|
||||
/ grupo `claude-direct`). Sin token / sin red, NO lanza: degrada a titulos
|
||||
genericos `Cluster N` con `note="LLM no disponible"`.
|
||||
- **NO envia filas crudas al LLM**, solo el resumen AGREGADO de cada cluster
|
||||
(tamano, % del total y la media de las features distintivas con su signo respecto
|
||||
a la media global). Privacidad y coste minimos por diseno — pero requiere que los
|
||||
perfiles vengan ya calculados por `project_clusters_2d`.
|
||||
- **Modelo `haiku` por defecto** para coste bajo; sube a `claude-opus-4-8` si
|
||||
necesitas titulos/descripciones mas finas (mas caro y lento).
|
||||
- **dict-no-throw**: si el modelo no devuelve un JSON array parseable, retorna
|
||||
titulos genericos con `note="parse fallido"`. Comprueba siempre `out["note"]`
|
||||
antes de fiarte de los titulos.
|
||||
- El LLM puede sobre-interpretar: el system prompt le pide ser sobrio y no inventar
|
||||
causas, pero revisa los titulos antes de publicarlos en un report.
|
||||
@@ -1,240 +0,0 @@
|
||||
"""describe_clusters_llm — micro-analisis LLM de clusters de KMeans (grupo `eda`).
|
||||
|
||||
Toma los PERFILES AGREGADOS de cada cluster (los que produce `project_clusters_2d`:
|
||||
tamano, centroide en escala original, features distintivas y centroide en z-score)
|
||||
y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una
|
||||
descripcion de 1-2 frases, en espanol.
|
||||
|
||||
Clave de coste y privacidad: NO se envian filas crudas al LLM. Solo viaja el
|
||||
perfil AGREGADO de cada grupo (tamano, % del total y la media de las features
|
||||
distintivas con su signo respecto a la media global). El coste es minimo y ningun
|
||||
dato fila-a-fila sale del proceso.
|
||||
|
||||
Reusa `ask_llm` del registry (grupo claude-direct, API directa con el token OAuth
|
||||
de Claude en ~/.claude/.credentials.json, arranque 0). Impura: una llamada de red.
|
||||
Estilo dict-no-throw: NUNCA lanza; ante cualquier fallo (red, LLM caido, parseo)
|
||||
degrada a titulos genericos "Cluster N" + una nota explicando el motivo.
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
from core.ask_llm import ask_llm
|
||||
|
||||
# System prompt sent with the single LLM call. It frames the model as a data
# analyst that receives aggregated cluster profiles (never raw rows), asks for a
# short title plus a 1-2 sentence description per cluster in Spanish, and
# requires the answer to be one bare JSON array of
# {"cluster", "title", "description"} objects with no surrounding text.
_SYSTEM = (
|
||||
"Eres un analista de datos. Recibes los PERFILES AGREGADOS de los clusters de "
|
||||
"un KMeans (por cada grupo: su tamano y la media de sus features distintivas, "
|
||||
"con el signo respecto a la media global; nunca filas crudas) y los describes "
|
||||
"de forma sobria y util. Para cada cluster generas un titulo corto y "
|
||||
"descriptivo (por ejemplo 'Vinos de alta acidez y baja graduacion') y una "
|
||||
"descripcion de 1-2 frases. NO inventes causas ni sobre-interpretes: limitate a "
|
||||
"lo que dicen los numeros. Responde en espanol. Responde SIEMPRE y SOLO con un "
|
||||
"unico JSON array valido, sin texto alrededor y sin fences de markdown, con "
|
||||
'EXACTAMENTE la forma [{"cluster": <int>, "title": "<titulo corto>", '
|
||||
'"description": "<1-2 frases>"}], un objeto por cluster.'
|
||||
)
|
||||
|
||||
|
||||
def _fmt_num(value) -> str:
|
||||
"""Formatea un numero de forma compacta para el prompt (None -> '?')."""
|
||||
if value is None:
|
||||
return "?"
|
||||
if isinstance(value, bool):
|
||||
return str(value)
|
||||
if isinstance(value, float):
|
||||
if value == int(value):
|
||||
return str(int(value))
|
||||
return f"{value:.4g}"
|
||||
return str(value)
|
||||
|
||||
|
||||
def _cluster_id(profile: dict, index: int) -> int:
    """Return the cluster id stored in a profile, or ``index`` as a fallback.

    Args:
        profile: a cluster profile; only its ``"cluster"`` key is read. Anything
            that is not a dict (``None``, a string, a list...) is treated as a
            profile without id.
        index: position of the profile in its list, used when no usable id exists.

    Returns:
        The ``"cluster"`` value when it is an int, or its ``int()`` conversion
        when that succeeds; ``index`` when the value is missing, is a bool, or
        cannot be converted (including NaN and infinite floats).
    """
    if not isinstance(profile, dict):
        return index
    raw = profile.get("cluster")
    # bool is a subclass of int; True/False are never meaningful cluster ids.
    if isinstance(raw, bool):
        return index
    if isinstance(raw, int):
        return raw
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); ValueError: int(float("nan")) or a
        # non-numeric string; TypeError: None or other unsupported types.
        return index
|
||||
|
||||
|
||||
def _build_prompt(cluster_profiles: list, feature_names: list) -> str:
    """Build the compact textual summary of the cluster profiles sent to the LLM.

    Internal helper that touches neither network nor disk. For every cluster it
    writes the cluster number, its size (``size`` plus ``pct`` %) and, for each
    distinctive feature, the centroid value in the original scale together with
    whether it lies above or below the mean (sign of the z-score in
    ``centroid_z``). Only these aggregates are written; no row-level data is read.

    Malformed entries degrade instead of raising: a profile that is not a dict is
    treated as empty, centroid maps that are not dicts are ignored, and a NaN
    z-score produces no direction hint (it is neither above, below nor at the mean).

    Args:
        cluster_profiles: list of cluster profiles, each a dict with the keys
            ``cluster``, ``size``, ``pct``, ``centroid_original``, ``distinctive``
            and ``centroid_z``.
        feature_names: names of the dataset features (context only); ignored
            unless it is a list.

    Returns:
        The prompt text, lines joined with ``"\\n"``.
    """
    cluster_profiles = cluster_profiles or []
    feature_names = feature_names if isinstance(feature_names, list) else []

    lines = [
        "Perfiles AGREGADOS de clusters de KMeans. No hay filas crudas, solo medias por grupo.",
        f"Numero de clusters: {len(cluster_profiles)}",
    ]
    if feature_names:
        lines.append("Features del dataset: " + ", ".join(str(f) for f in feature_names))
    lines.append("")

    for i, prof in enumerate(cluster_profiles):
        if not isinstance(prof, dict):
            prof = {}
        cid = _cluster_id(prof, i)
        size = prof.get("size")
        pct = prof.get("pct")
        pct_is_number = isinstance(pct, (int, float)) and not isinstance(pct, bool)
        pct_str = f"{pct:.1f}%" if pct_is_number else "?"
        lines.append(f"Cluster {cid}: tamano={_fmt_num(size)} ({pct_str} del total)")

        distinctive = prof.get("distinctive") or []
        centroid_o = prof.get("centroid_original")
        if not isinstance(centroid_o, dict):
            centroid_o = {}
        centroid_z = prof.get("centroid_z")
        if not isinstance(centroid_z, dict):
            centroid_z = {}

        if distinctive:
            lines.append(" Features distintivas (media del grupo):")
            for feat in distinctive:
                val = centroid_o.get(feat)
                z = centroid_z.get(feat)
                direction = ""
                if isinstance(z, (int, float)) and not isinstance(z, bool):
                    if z > 0:
                        direction = "por encima de la media"
                    elif z < 0:
                        direction = "por debajo de la media"
                    elif z == 0:
                        # Explicit equality: NaN fails all three comparisons and
                        # therefore gets no direction instead of "en la media".
                        direction = "en la media"
                if direction:
                    lines.append(f" - {feat}: {_fmt_num(val)} ({direction})")
                else:
                    lines.append(f" - {feat}: {_fmt_num(val)}")
        else:
            lines.append(" (sin features distintivas marcadas)")
        lines.append("")

    lines.append(
        "Devuelve SOLO el JSON array descrito en las instrucciones del sistema, "
        "sin texto antes ni despues."
    )
    return "\n".join(lines)
|
||||
|
||||
|
||||
def _parse_clusters_json(text: str, n: int):
    """Extract and normalise the JSON array contained in the LLM response.

    Locates the first ``[`` and the last ``]`` in ``text`` (so surrounding junk
    text or markdown fences are tolerated), decodes that slice with
    ``json.loads`` and normalises every entry to
    ``{"cluster": int, "title": str, "description": str}``. A missing, boolean or
    unconvertible ``cluster`` (including ``NaN`` / ``Infinity``, which
    ``json.loads`` accepts) is replaced by the entry's index. Entries that are
    not objects become generic ``"Cluster <index>"`` entries.

    Never raises: any failure yields ``None``, the degradation signal for the caller.

    Args:
        text: raw LLM response.
        n: number of profiles expected. Kept for reference only; the length of
            the result is whatever the decoded array contains.

    Returns:
        The normalised list of dicts, or ``None`` when no valid array could be parsed.
    """
    if not text or not isinstance(text, str):
        return None

    start = text.find("[")
    end = text.rfind("]")
    if start == -1 or end == -1 or end <= start:
        return None

    try:
        data = json.loads(text[start : end + 1])
    except (ValueError, TypeError, RecursionError):
        # RecursionError: pathologically nested input such as "[[[[...]]]]".
        return None

    if not isinstance(data, list):
        return None

    out = []
    for i, item in enumerate(data):
        if not isinstance(item, dict):
            out.append({"cluster": i, "title": f"Cluster {i}", "description": ""})
            continue

        raw_cluster = item.get("cluster")
        if isinstance(raw_cluster, bool):
            cluster = i
        elif isinstance(raw_cluster, int):
            cluster = raw_cluster
        else:
            try:
                cluster = int(raw_cluster)
            except (TypeError, ValueError, OverflowError):
                # OverflowError: json.loads turns `Infinity` / `1e400` into inf.
                cluster = i

        title = item.get("title")
        title = str(title) if title is not None else f"Cluster {cluster}"

        desc = item.get("description")
        desc = str(desc) if desc is not None else ""

        out.append({"cluster": cluster, "title": title, "description": desc})

    return out
|
||||
|
||||
|
||||
def _generic_clusters(cluster_profiles: list) -> list:
    """Build the degraded (no-LLM) result: one generic "Cluster N" entry per profile."""
    cluster_ids = (
        _cluster_id(profile or {}, position)
        for position, profile in enumerate(cluster_profiles)
    )
    return [
        {"cluster": cid, "title": f"Cluster {cid}", "description": ""}
        for cid in cluster_ids
    ]
|
||||
|
||||
|
||||
def describe_clusters_llm(
    cluster_profiles: list,
    feature_names: list,
    model: str = "claude-haiku-4-5-20251001",
) -> dict:
    """Describe the clusters of a KMeans run with a single LLM call.

    Args:
        cluster_profiles: list of cluster profiles, each shaped like
            ``{"cluster": int, "size": int, "pct": float,
            "centroid_original": {feature: mean}, "distinctive": [features],
            "centroid_z": {feature: z}}``. Only the aggregated summary built by
            ``_build_prompt`` is sent to the LLM.
        feature_names: names of the dataset features (context for the LLM).
        model: model id forwarded to ``ask_llm`` and echoed in the result.

    Returns:
        ``{"clusters": [{cluster, title, description}], "model": str, "note": str}``.
        ``note`` is ``""`` on success. When no response text was obtained the
        clusters are generic ``"Cluster N"`` entries and ``note`` is
        ``"LLM no disponible"``; when text was obtained but no usable array could
        be parsed, ``note`` is ``"parse fallido"``. When ``cluster_profiles`` is
        empty or not a list the result is ``clusters=[]`` with note
        ``"sin clusters"`` and the LLM is not called. Never raises.
    """
    if not isinstance(cluster_profiles, list) or not cluster_profiles:
        return {"clusters": [], "model": model, "note": "sin clusters"}

    n = len(cluster_profiles)
    text = ""
    parsed = None
    try:
        # Prompt construction and parsing live inside the guard as well: a
        # malformed profile or an unexpected response type must degrade, not raise.
        prompt = _build_prompt(cluster_profiles, feature_names)
        text = ask_llm(prompt, model=model, system=_SYSTEM, echo=False)
        parsed = _parse_clusters_json(text, n)
    except Exception:  # noqa: BLE001 - degradation: any network/LLM/helper failure.
        parsed = None

    if parsed:
        return {"clusters": parsed, "model": model, "note": ""}

    # An empty `text` means nothing came back from the LLM (or it was never
    # reached); a non-empty one means the response could not be used.
    note = "LLM no disponible" if not text else "parse fallido"
    try:
        clusters = _generic_clusters(cluster_profiles)
    except Exception:  # noqa: BLE001 - last resort: number the clusters by position.
        clusters = [
            {"cluster": i, "title": f"Cluster {i}", "description": ""}
            for i in range(n)
        ]
    return {"clusters": clusters, "model": model, "note": note}
|
||||
@@ -1,160 +0,0 @@
|
||||
"""Tests para describe_clusters_llm.
|
||||
|
||||
NO acceden a red ni a credenciales: _parse_clusters_json es testeable aislada y la
|
||||
unica via que llamaria al LLM (describe_clusters_llm) se prueba monkeypatcheando
|
||||
ask_llm con respuestas simuladas. Cubre golden (LLM ok), edge (cluster faltante,
|
||||
array envuelto en basura, lista vacia / input no-lista) y error (LLM caido, texto
|
||||
no parseable) — todos sin tocar la red.
|
||||
"""
|
||||
|
||||
import importlib
|
||||
import json
|
||||
|
||||
from datascience.describe_clusters_llm import (
|
||||
_parse_clusters_json,
|
||||
describe_clusters_llm,
|
||||
)
|
||||
|
||||
# Sample profiles with the shape produced by project_clusters_2d; _FEATURES below
# lists the dataset feature names passed alongside them.
|
||||
_PROFILES = [
|
||||
{
|
||||
"cluster": 0,
|
||||
"size": 60,
|
||||
"pct": 60.0,
|
||||
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
|
||||
"distinctive": ["acidez", "alcohol"],
|
||||
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
|
||||
},
|
||||
{
|
||||
"cluster": 1,
|
||||
"size": 40,
|
||||
"pct": 40.0,
|
||||
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
|
||||
"distinctive": ["alcohol"],
|
||||
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
|
||||
},
|
||||
]
|
||||
_FEATURES = ["acidez", "alcohol", "azucar"]
|
||||
|
||||
|
||||
def _patch_ask_llm(monkeypatch, returner):
    """Swap ``ask_llm`` in the module under test for a stub that returns ``returner``."""
    module_under_test = importlib.import_module("datascience.describe_clusters_llm")

    def _fake_ask_llm(prompt, model="x", system="", echo=True):
        return returner

    monkeypatch.setattr(module_under_test, "ask_llm", _fake_ask_llm)
|
||||
|
||||
|
||||
# --- _parse_clusters_json (parser puro, sin red) ---
|
||||
|
||||
|
||||
def test_parse_clusters_json_valid_array():
    """A clean JSON array of well-formed entries comes back unchanged."""
    expected = [
        {"cluster": 0, "title": "A", "description": "desc a"},
        {"cluster": 1, "title": "B", "description": "desc b"},
    ]
    serialized = json.dumps(expected)
    assert _parse_clusters_json(serialized, 2) == expected
|
||||
|
||||
|
||||
def test_parse_clusters_json_wrapped_in_junk_text():
    """The array is still found when the response has chatter before and after it."""
    body = json.dumps([{"cluster": 0, "title": "Solo uno", "description": "d"}])
    response = "\n".join(
        ["Claro, aqui tienes el resultado:", body, "Espero que sirva."]
    )
    first = _parse_clusters_json(response, 1)[0]
    assert first["title"] == "Solo uno"
    assert first["cluster"] == 0
|
||||
|
||||
|
||||
def test_parse_clusters_json_non_json_returns_none():
    """Text without a JSON array degrades to ``None`` without raising."""
    for unusable in ("no hay json aqui", "", "{solo un objeto}"):
        assert _parse_clusters_json(unusable, 2) is None
|
||||
|
||||
|
||||
def test_parse_clusters_json_fills_missing_cluster_by_index():
    """Entries lacking a ``cluster`` key receive their position in the array."""
    entries = [
        {"title": "A", "description": "d"},
        {"title": "B", "description": "e"},
    ]
    result = _parse_clusters_json(json.dumps(entries), 2)
    first, second = result[0], result[1]
    assert first["cluster"] == 0
    assert second["cluster"] == 1
    assert first["title"] == "A"
|
||||
|
||||
|
||||
# --- describe_clusters_llm (con ask_llm monkeypatcheado, sin red) ---
|
||||
|
||||
|
||||
def test_describe_clusters_llm_ok_with_monkeypatched_llm(monkeypatch):
    """Golden path: a well-formed stubbed LLM answer is returned with an empty note."""
    llm_answer = [
        {
            "cluster": 0,
            "title": "Vinos de alta acidez",
            "description": "Acidez por encima de la media y graduacion baja.",
        },
        {
            "cluster": 1,
            "title": "Vinos de alta graduacion",
            "description": "Alcohol claramente por encima de la media.",
        },
    ]
    _patch_ask_llm(monkeypatch, json.dumps(llm_answer))

    result = describe_clusters_llm(_PROFILES, _FEATURES)
    clusters = result["clusters"]
    assert result["note"] == ""
    assert result["model"] == "claude-haiku-4-5-20251001"
    assert len(clusters) == 2
    assert clusters[0]["title"] == "Vinos de alta acidez"
    assert set(clusters[0].keys()) == {"cluster", "title", "description"}
|
||||
|
||||
|
||||
def test_describe_clusters_llm_degrades_on_empty_response(monkeypatch):
    """An empty string from ``ask_llm`` yields generic titles plus an explanatory note."""
    # ask_llm returning "" stands for an error / network-down situation.
    _patch_ask_llm(monkeypatch, "")

    result = describe_clusters_llm(_PROFILES, _FEATURES)
    first, second = result["clusters"][0], result["clusters"][1]
    assert first["title"] == "Cluster 0"
    assert second["title"] == "Cluster 1"
    assert first["description"] == ""
    assert result["note"] == "LLM no disponible"
    assert result["model"] == "claude-haiku-4-5-20251001"
|
||||
|
||||
|
||||
def test_describe_clusters_llm_degrades_on_unparseable_response(monkeypatch):
    """A non-JSON answer yields generic titles and the ``"parse fallido"`` note."""
    _patch_ask_llm(monkeypatch, "lo siento, no puedo ayudarte con eso")

    result = describe_clusters_llm(_PROFILES, _FEATURES)
    titles = [entry["title"] for entry in result["clusters"][:2]]
    assert titles[0] == "Cluster 0"
    assert titles[1] == "Cluster 1"
    assert result["note"] == "parse fallido"
|
||||
|
||||
|
||||
def test_describe_clusters_llm_empty_list_skips_llm(monkeypatch):
    """With an empty profile list the LLM must not be called at all."""

    def _must_not_be_called(*args, **kwargs):
        raise AssertionError("ask_llm no debe llamarse con lista vacia")

    module_under_test = importlib.import_module("datascience.describe_clusters_llm")
    monkeypatch.setattr(module_under_test, "ask_llm", _must_not_be_called)

    result = describe_clusters_llm([], _FEATURES)
    assert result["clusters"] == []
    assert result["note"] == "sin clusters"
|
||||
|
||||
|
||||
def test_describe_clusters_llm_non_list_input_skips_llm(monkeypatch):
    """Non-list input (``None``) yields an empty result and never reaches the LLM."""

    # Enforce the "no LLM call" claim: without this stub a regression would
    # silently fall through to the real ask_llm (and the network).
    def boom(*args, **kwargs):
        raise AssertionError("ask_llm no debe llamarse con input no-lista")

    mod = importlib.import_module("datascience.describe_clusters_llm")
    monkeypatch.setattr(mod, "ask_llm", boom)

    out = describe_clusters_llm(None, _FEATURES)
    assert out["clusters"] == []
    assert out["note"] == "sin clusters"
    assert out["model"] == "claude-haiku-4-5-20251001"
|
||||
@@ -1,95 +0,0 @@
|
||||
---
|
||||
name: project_clusters_2d
|
||||
kind: function
|
||||
lang: py
|
||||
domain: datascience
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def project_clusters_2d(columns: dict, k_min: int = 2, k_max: int = 8, max_points: int = 2000) -> dict"
|
||||
description: "PCA a 2D + KMeans sobre el MISMO subset numerico estandarizado, devolviendo proyeccion 2D y labels de cluster ALINEADOS por fila para pintar un scatter PCA coloreado por cluster. Estandariza una sola vez, elige k por silhouette y proyecta centroides al espacio PCA. Determinista."
|
||||
tags: [eda, models, clustering, pca, kmeans, scatter, dimensionality-reduction, datascience, sklearn]
|
||||
params:
|
||||
- name: columns
|
||||
desc: "Mapa {nombre_columna: [valores numericos]}. Listas alineadas por fila (misma longitud). Columnas no numericas o con <2 valores distintos se descartan; None/NaN descartan la fila completa (listwise)."
|
||||
- name: k_min
|
||||
desc: "Numero minimo de clusters a probar por silhouette (default 2). El minimo de filas validas requerido es max(3, k_min*2)."
|
||||
- name: k_max
|
||||
desc: "Numero maximo de clusters a probar (default 8). Se acota a min(k_max, n_filas_validas-1)."
|
||||
- name: max_points
|
||||
desc: "Tope de puntos devueltos en points/labels (default 2000). Si n_used lo supera, points y labels se submuestrean CONJUNTAMENTE con paso determinista para seguir alineados; el fit usa siempre todas las filas."
|
||||
output: "dict con points (proyeccion 2D, posiblemente submuestreada a max_points), labels (cluster de cada point, alineado con points), centers_2d (centroides en espacio PCA, len==best_k), best_k, silhouette, explained_2d ([var PC1, var PC2]), cluster_sizes (sobre n_used total), cluster_profiles (lista de {cluster, size, pct, centroid_original, distinctive top-3 por |z|, centroid_z}), feature_names, n_used (filas del fit antes de muestreo) y note (\"\" si ok). Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve best_k=0, listas vacias y note 'datos insuficientes' sin lanzar excepcion."
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [numpy, scikit-learn]
|
||||
tested: true
|
||||
tests: ["test_golden_three_blobs_aligned_projection_and_clusters", "test_edge_subsampling_keeps_points_labels_aligned", "test_edge_single_numeric_column_insufficient", "test_edge_too_few_rows_insufficient", "test_edge_non_numeric_column_dropped_without_error", "test_edge_constant_column_dropped"]
|
||||
test_file_path: "python/functions/datascience/project_clusters_2d_test.py"
|
||||
file_path: "python/functions/datascience/project_clusters_2d.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
import sys, os
|
||||
sys.path.insert(0, os.path.join("python", "functions"))
|
||||
from datascience.project_clusters_2d import project_clusters_2d
|
||||
|
||||
# Tres grupos gaussianos bien separados sobre 4 features.
|
||||
import numpy as np
|
||||
rng = np.random.default_rng(0)
|
||||
rows = []
|
||||
for center in (np.full(4, 0.0), np.full(4, 12.0), np.array([0.0, 12.0, 0.0, 12.0])):
|
||||
rows.extend(rng.normal(loc=center, scale=0.4, size=(50, 4)))
|
||||
mat = np.array(rows)
|
||||
columns = {f"f{j}": [float(v) for v in mat[:, j]] for j in range(4)}
|
||||
|
||||
res = project_clusters_2d(columns, k_min=2, k_max=8)
|
||||
print(res["best_k"]) # 3
|
||||
print(len(res["points"]), len(res["labels"])) # 150 150 (alineados)
|
||||
print(len(res["centers_2d"])) # == best_k
|
||||
print([round(v, 2) for v in res["explained_2d"]]) # varianza de PC1, PC2
|
||||
# Pintar: scatter(points[:,0], points[:,1], c=labels) + marcar centers_2d.
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
Cuando, durante un EDA, quieres un scatter 2D de un dataset tabular numerico
|
||||
coloreado por segmento descubierto automaticamente, y necesitas que cada punto
|
||||
de la proyeccion lleve su etiqueta de cluster correcta. Usala en vez de
|
||||
combinar `pca_explained` + `kmeans_segments` a mano: esas estandarizan por
|
||||
separado y descartan los labels, asi que sus salidas no se pueden cruzar fila a
|
||||
fila. Esta funcion garantiza esa alineacion (mismo X estandarizado para PCA y
|
||||
KMeans) y ademas proyecta los centroides KMeans al espacio PCA para dibujarlos.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- Funcion pura y determinista (StandardScaler + PCA random_state=0 + KMeans
|
||||
random_state=0, n_init=10), pero requiere `numpy` y `scikit-learn` instalados.
|
||||
- `points`/`labels` pueden venir submuestreados si `n_used > max_points` (paso
|
||||
determinista `[::ceil(n_used/max_points)]`); `n_used`, `centers_2d`,
|
||||
`cluster_sizes` y `cluster_profiles` se calculan SIEMPRE sobre todas las filas.
|
||||
Cuando hay submuestreo, `note` lo indica.
|
||||
- `centroid_z` y `distinctive` estan en z-score (espacio escalado);
|
||||
`centroid_original` esta en las unidades originales (via
|
||||
`scaler.inverse_transform`). No mezcles ambos al interpretar.
|
||||
- `centers_2d` esta en el espacio PCA (coordenadas del scatter), no en unidades
|
||||
originales: pintalo sobre el mismo eje que `points`.
|
||||
- Silhouette baja con best_k alto sugiere que no hay estructura de cluster real;
|
||||
el scatter puede no mostrar grupos separados.
|
||||
|
||||
## Notas
|
||||
|
||||
Pieza de composicion que `pca_explained` + `kmeans_segments` no cubren: ambas
|
||||
estandarizan internamente por separado (cada una su propio `StandardScaler`) y
|
||||
`kmeans_segments` no expone los labels por fila, por lo que no se pueden cruzar
|
||||
con la `projection` de `pca_explained`. Esta funcion usa `sklearn` directo
|
||||
(StandardScaler una sola vez compartido por PCA y KMeans) para garantizar la
|
||||
alineacion `points[i] <-> labels[i]` y proyectar los centroides KMeans al
|
||||
espacio PCA. Coercion y listwise deletion siguen el estilo de `pca_explained`
|
||||
(None/NaN -> fila descartada, columnas no parseables o constantes descartadas).
|
||||
Degrada con gracia: con <2 columnas numericas o <max(3, k_min*2) filas validas
|
||||
devuelve `note: "datos insuficientes"` sin lanzar excepcion (try/except
|
||||
defensivo en todo el cuerpo).
|
||||
@@ -1,208 +0,0 @@
|
||||
"""Proyeccion PCA-2D + KMeans sobre el mismo subset, con puntos y labels alineados.
|
||||
|
||||
Estandariza una sola vez las columnas numericas (z-score), proyecta a 2D con PCA
|
||||
y clusteriza con KMeans sobre EXACTAMENTE la misma matriz escalada, de modo que
|
||||
la proyeccion 2D (`points`) y la etiqueta de cluster (`labels`) quedan alineadas
|
||||
fila a fila. Es la pieza que `pca_explained` + `kmeans_segments` no cubren: esas
|
||||
dos estandarizan por separado y descartan los labels, asi que sus salidas no se
|
||||
pueden cruzar para pintar un scatter PCA coloreado por cluster. Determinista.
|
||||
"""
|
||||
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
from sklearn.cluster import KMeans
|
||||
from sklearn.decomposition import PCA
|
||||
from sklearn.metrics import silhouette_score
|
||||
from sklearn.preprocessing import StandardScaler
|
||||
|
||||
|
||||
def project_clusters_2d(
|
||||
columns: dict,
|
||||
k_min: int = 2,
|
||||
k_max: int = 8,
|
||||
max_points: int = 2000,
|
||||
) -> dict:
|
||||
"""Proyecta a 2D (PCA) y clusteriza (KMeans) el mismo subset estandarizado.
|
||||
|
||||
PCA a 2D y KMeans se ajustan sobre la MISMA matriz estandarizada, por lo que
|
||||
`points` (proyeccion 2D) y `labels` (cluster por fila) quedan alineados por
|
||||
indice. El k se elige automaticamente por silhouette en el rango
|
||||
[k_min, min(k_max, n_rows-1)], igual criterio que `kmeans_segments`.
|
||||
Determinista: StandardScaler + PCA(random_state=0) + KMeans(random_state=0,
|
||||
n_init=10).
|
||||
|
||||
Args:
|
||||
columns: mapa {nombre_columna: [valores numericos]}. Listas alineadas por
|
||||
fila (misma longitud). Columnas no numericas o con menos de 2 valores
|
||||
distintos se descartan. None/NaN marcan filas a descartar listwise
|
||||
(una fila se elimina si cualquier feature falta).
|
||||
k_min: numero minimo de clusters a probar (default 2).
|
||||
k_max: numero maximo de clusters a probar (default 8). Se acota a
|
||||
min(k_max, n_rows_validas-1).
|
||||
max_points: tope de puntos devueltos en `points`/`labels`. Si las filas
|
||||
usadas superan este tope, se submuestrea points y labels CONJUNTAMENTE
|
||||
con paso determinista para mantenerlos alineados. El fit (best_k,
|
||||
silhouette, centroides, perfiles) usa SIEMPRE todas las filas.
|
||||
|
||||
Returns:
|
||||
dict con points (proyeccion 2D, posiblemente submuestreada a max_points),
|
||||
labels (cluster de cada point, alineado con points), centers_2d
|
||||
(centroides en espacio PCA, len == best_k), best_k, silhouette,
|
||||
explained_2d (varianza de PC1 y PC2), cluster_sizes (sobre n_used total),
|
||||
cluster_profiles (ver abajo), feature_names, n_used (filas del fit antes
|
||||
de muestreo) y note ("" si ok). Cada entrada de cluster_profiles:
|
||||
{cluster, size, pct, centroid_original (medias en escala original),
|
||||
centroid_z (z del centroide), distinctive (top 3 features por |z|)}.
|
||||
Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve
|
||||
best_k=0 y note "datos insuficientes" sin lanzar excepcion.
|
||||
"""
|
||||
feature_names: list[str] = []
|
||||
|
||||
def insufficient(names: list[str], n_used: int) -> dict:
    """Build the fallback payload returned when clustering cannot run.

    Args:
        names: feature names to report; stored as-is (not copied).
        n_used: row count to report; coerced with ``int``.

    Returns:
        dict with ``best_k`` set to 0, a fresh empty list for each of
        ``points``, ``labels``, ``centers_2d`` and ``cluster_profiles``,
        plus ``feature_names``, ``n_used`` and the fixed ``note`` text.
    """
    payload: dict = {"best_k": 0}
    # One new list per key so mutating a field never leaks into another.
    for empty_key in ("points", "labels", "centers_2d", "cluster_profiles"):
        payload[empty_key] = []
    payload["feature_names"] = names
    payload["n_used"] = int(n_used)
    payload["note"] = "datos insuficientes"
    return payload
|
||||
|
||||
try:
|
||||
if not isinstance(columns, dict) or not columns:
|
||||
return insufficient([], 0)
|
||||
|
||||
# 1. Coerce a numerico, descartando columnas no parseables o constantes.
|
||||
numeric_cols: dict[str, list] = {}
|
||||
for name, values in columns.items():
|
||||
if not isinstance(values, (list, tuple)):
|
||||
continue
|
||||
coerced: list[float] = []
|
||||
usable = True
|
||||
for v in values:
|
||||
if v is None:
|
||||
coerced.append(math.nan)
|
||||
continue
|
||||
try:
|
||||
coerced.append(float(v))
|
||||
except (TypeError, ValueError):
|
||||
usable = False
|
||||
break
|
||||
if not usable:
|
||||
continue
|
||||
# Menos de 2 valores distintos no aporta varianza -> descartar.
|
||||
distinct = {x for x in coerced if not math.isnan(x)}
|
||||
if len(distinct) < 2:
|
||||
continue
|
||||
numeric_cols[name] = coerced
|
||||
|
||||
feature_names = list(numeric_cols.keys())
|
||||
if len(feature_names) < 2:
|
||||
return insufficient(feature_names, 0)
|
||||
|
||||
# 2. Matriz alineada por fila + listwise deletion (cualquier NaN -> fuera).
|
||||
matrix = np.array(
|
||||
[numeric_cols[n] for n in feature_names], dtype=float
|
||||
).T
|
||||
valid_mask = ~np.isnan(matrix).any(axis=1)
|
||||
data = matrix[valid_mask]
|
||||
|
||||
n_used = int(data.shape[0])
|
||||
min_rows = max(3, k_min * 2)
|
||||
if n_used < min_rows:
|
||||
return insufficient(feature_names, n_used)
|
||||
|
||||
# 3. Estandarizar UNA sola vez (guardamos el scaler para desestandarizar).
|
||||
scaler = StandardScaler()
|
||||
X_scaled = scaler.fit_transform(data)
|
||||
|
||||
# 4. PCA a 2D sobre la matriz escalada.
|
||||
pca = PCA(n_components=2, random_state=0)
|
||||
pca.fit(X_scaled)
|
||||
proj = pca.transform(X_scaled)
|
||||
|
||||
# 5. KMeans con seleccion automatica de k por silhouette (mismo X_scaled).
|
||||
upper_k = min(k_max, n_used - 1)
|
||||
if upper_k < k_min:
|
||||
return insufficient(feature_names, n_used)
|
||||
|
||||
best = None # (silhouette, k, model, labels)
|
||||
for k in range(k_min, upper_k + 1):
|
||||
model = KMeans(n_clusters=k, n_init=10, random_state=0)
|
||||
labels_k = model.fit_predict(X_scaled)
|
||||
if len(set(labels_k)) < 2:
|
||||
sil = -1.0
|
||||
else:
|
||||
sil = float(silhouette_score(X_scaled, labels_k))
|
||||
if best is None or sil > best[0]:
|
||||
best = (sil, k, model, labels_k)
|
||||
|
||||
best_sil, best_k, best_model, labels = best
|
||||
|
||||
# 6. Centroides KMeans (espacio escalado) proyectados al espacio PCA.
|
||||
centers_2d = pca.transform(best_model.cluster_centers_)
|
||||
|
||||
# 7. Perfiles por cluster sobre TODAS las filas usadas.
|
||||
centroids_original = scaler.inverse_transform(best_model.cluster_centers_)
|
||||
cluster_sizes: list[int] = []
|
||||
cluster_profiles: list[dict] = []
|
||||
for c in range(best_k):
|
||||
size = int(np.sum(labels == c))
|
||||
cluster_sizes.append(size)
|
||||
z_vec = best_model.cluster_centers_[c]
|
||||
orig_vec = centroids_original[c]
|
||||
centroid_z = {
|
||||
feature_names[j]: float(z_vec[j]) for j in range(len(feature_names))
|
||||
}
|
||||
centroid_original = {
|
||||
feature_names[j]: float(orig_vec[j])
|
||||
for j in range(len(feature_names))
|
||||
}
|
||||
order = np.argsort(np.abs(z_vec))[::-1]
|
||||
distinctive = [feature_names[int(j)] for j in order[:3]]
|
||||
cluster_profiles.append(
|
||||
{
|
||||
"cluster": int(c),
|
||||
"size": size,
|
||||
"pct": float(size / n_used) if n_used else 0.0,
|
||||
"centroid_original": centroid_original,
|
||||
"distinctive": distinctive,
|
||||
"centroid_z": centroid_z,
|
||||
}
|
||||
)
|
||||
|
||||
# 8. Muestreo determinista CONJUNTO de points + labels (mantiene alineacion).
|
||||
note = ""
|
||||
if n_used > max_points and max_points > 0:
|
||||
step = math.ceil(n_used / max_points)
|
||||
proj_out = proj[::step]
|
||||
labels_out = labels[::step]
|
||||
note = f"submuestreado a {len(proj_out)} de {n_used} puntos para visualizacion"
|
||||
else:
|
||||
proj_out = proj
|
||||
labels_out = labels
|
||||
|
||||
points = [[float(row[0]), float(row[1])] for row in proj_out]
|
||||
labels_list = [int(v) for v in labels_out]
|
||||
centers_list = [[float(row[0]), float(row[1])] for row in centers_2d]
|
||||
explained_2d = [float(x) for x in pca.explained_variance_ratio_]
|
||||
|
||||
return {
|
||||
"points": points,
|
||||
"labels": labels_list,
|
||||
"centers_2d": centers_list,
|
||||
"best_k": int(best_k),
|
||||
"silhouette": float(best_sil),
|
||||
"explained_2d": explained_2d,
|
||||
"cluster_sizes": cluster_sizes,
|
||||
"cluster_profiles": cluster_profiles,
|
||||
"feature_names": feature_names,
|
||||
"n_used": n_used,
|
||||
"note": note,
|
||||
}
|
||||
except Exception:
|
||||
# Lectura defensiva: nunca propagar excepciones al caller del EDA.
|
||||
return insufficient(feature_names, 0)
|
||||
@@ -1,127 +0,0 @@
|
||||
"""Tests para project_clusters_2d."""
|
||||
|
||||
import numpy as np
|
||||
|
||||
from project_clusters_2d import project_clusters_2d
|
||||
|
||||
|
||||
def _three_blobs(seed: int = 0, per_blob: int = 50, n_features: int = 4):
    """Build three well-separated Gaussian blobs as row-aligned columns.

    The blobs are centred at all-zeros, all-twelves, and an alternating
    0/12 pattern (truncated to ``n_features`` and zero-padded beyond four
    features). Each blob is sampled with a standard deviation of 0.4 from
    a generator seeded with ``seed``.

    Returns a dict ``{"f<j>": [values]}`` with one list per feature; index
    ``i`` refers to the same row in every column.
    """
    generator = np.random.default_rng(seed)
    alternating = [0.0, 12.0, 0.0, 12.0][:n_features]
    padding = [0.0] * max(0, n_features - 4)
    blob_centers = [
        np.full(n_features, 0.0),
        np.full(n_features, 12.0),
        np.array(alternating + padding),
    ]
    # One draw per centre, in centre order, so rows come out blob by blob.
    samples: list[np.ndarray] = [
        sample
        for center in blob_centers
        for sample in generator.normal(
            loc=center, scale=0.4, size=(per_blob, n_features)
        )
    ]
    stacked = np.array(samples)
    columns = {}
    for j in range(n_features):
        columns[f"f{j}"] = [float(value) for value in stacked[:, j]]
    return columns
|
||||
|
||||
|
||||
def test_golden_three_blobs_aligned_projection_and_clusters():
    """Golden path: three separated blobs give a coherent, aligned result."""
    outcome = project_clusters_2d(
        _three_blobs(seed=0, per_blob=50, n_features=4), k_min=2, k_max=8
    )

    total_rows = outcome["n_used"]
    assert total_rows == 150
    assert outcome["note"] == ""

    k_found = outcome["best_k"]
    assert 2 <= k_found <= 4

    # points and labels must stay aligned row by row; no subsampling is
    # expected for 150 rows (presumably below the default max_points).
    assert len(outcome["points"]) == len(outcome["labels"])
    assert len(outcome["points"]) == total_rows

    # Every point is an (x, y) pair.
    assert not any(len(point) != 2 for point in outcome["points"])

    # Labels fall inside [0, best_k).
    assert not any(
        label < 0 or label >= k_found for label in outcome["labels"]
    )

    # One 2D centroid per cluster.
    assert len(outcome["centers_2d"]) == k_found
    assert not any(len(center) != 2 for center in outcome["centers_2d"])

    # Explained variance is reported for both components.
    assert len(outcome["explained_2d"]) == 2

    # cluster_sizes accounts for every row used in the fit.
    assert sum(outcome["cluster_sizes"]) == total_rows
    assert len(outcome["cluster_sizes"]) == k_found

    # One profile per cluster, each keyed by the full feature set.
    assert len(outcome["cluster_profiles"]) == k_found
    for profile in outcome["cluster_profiles"]:
        expected_features = set(outcome["feature_names"])
        assert set(profile["centroid_original"]) == expected_features
        assert set(profile["centroid_z"]) == expected_features
        assert 1 <= len(profile["distinctive"]) <= 3
        assert profile["size"] >= 0
        assert 0.0 <= profile["pct"] <= 1.0
|
||||
|
||||
|
||||
def test_edge_subsampling_keeps_points_labels_aligned():
    """A small max_points forces joint subsampling of points and labels."""
    outcome = project_clusters_2d(
        _three_blobs(seed=1, per_blob=50, n_features=3),
        k_min=2,
        k_max=6,
        max_points=40,
    )

    # The fit itself still uses every row.
    total_rows = outcome["n_used"]
    assert total_rows == 150

    # Subsampled outputs keep identical lengths and respect the cap.
    assert len(outcome["points"]) == len(outcome["labels"])
    assert len(outcome["points"]) <= 40

    # Centres, sizes and profiles are computed over ALL rows.
    assert sum(outcome["cluster_sizes"]) == total_rows
    assert len(outcome["centers_2d"]) == outcome["best_k"]
    # A non-empty note signals that subsampling happened.
    assert outcome["note"] != ""
|
||||
|
||||
|
||||
def test_edge_single_numeric_column_insufficient():
    """A single numeric column yields the 'insufficient data' payload."""
    single_column = {"x": list(map(float, range(50)))}
    outcome = project_clusters_2d(single_column, k_min=2, k_max=8)

    assert outcome["best_k"] == 0
    assert outcome["note"] == "datos insuficientes"
    # Every list-valued output must come back empty.
    for empty_key in ("points", "labels", "centers_2d", "cluster_profiles"):
        assert outcome[empty_key] == []
|
||||
|
||||
|
||||
def test_edge_too_few_rows_insufficient():
    """Two valid rows are below min_rows = max(3, k_min * 2) = 4."""
    tiny = {"x": [1.0, 5.0], "y": [2.0, 9.0]}
    outcome = project_clusters_2d(tiny, k_min=2, k_max=8)

    assert outcome["best_k"] == 0
    assert outcome["note"] == "datos insuficientes"
|
||||
|
||||
|
||||
def test_edge_non_numeric_column_dropped_without_error():
    """A string column is discarded; the three numeric ones still cluster."""
    data = _three_blobs(seed=2, per_blob=50, n_features=3)
    row_count = len(data["f0"])
    data["label"] = ["a"] * row_count
    outcome = project_clusters_2d(data, k_min=2, k_max=6)

    assert outcome["best_k"] >= 2
    kept = outcome["feature_names"]
    assert "label" not in kept
    assert set(kept) == {"f0", "f1", "f2"}
    assert len(outcome["points"]) == len(outcome["labels"])
|
||||
|
||||
|
||||
def test_edge_constant_column_dropped():
    """A zero-variance column (fewer than 2 distinct values) is discarded."""
    data = _three_blobs(seed=3, per_blob=50, n_features=3)
    row_count = len(data["f0"])
    data["const"] = [7.0] * row_count
    outcome = project_clusters_2d(data, k_min=2, k_max=6)

    assert "const" not in outcome["feature_names"]
    assert outcome["best_k"] >= 2
|
||||
Reference in New Issue
Block a user