Compare commits

..

1 Commits

Author SHA1 Message Date
egutierrez 649de07d6b feat(eda): capítulo AutomaticEDA CAT DISTR + funciones cardinalidad/pie
Capítulo cat_distr del motor AutomaticEDA: distribuciones categóricas con
explicación de entropía de Shannon, métricas de cardinalidad por columna
(valores distintos, % distintos, total de filas, valores únicos, entropía y
su máximo log2(k) + normalizada), tabla top-k y un donut de las categorías
más comunes (top-k + «Otros»). Marca columnas id-like y dominadas.

Delegadas a fn-constructor (grupo eda):
- categorical_cardinality_block: deriva métricas de cardinalidad/entropía.
- categorical_top_pie_figure: figura donut top-k + «Otros», leyenda lateral.

Defensivo (dict-no-throw): None si no hay columnas categóricas; normaliza
mode_pct a escala 0-100 (summarize_categorical lo emite como fracción).
Tablas vía DataTable y figura perezosa: el paginador del núcleo garantiza
no-corte en PDF y PPTX. Tests: golden + edge (sin categóricas) + anti-corte
(label largo / muchas columnas) en ambos renderers.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:04:10 +02:00
11 changed files with 1494 additions and 412 deletions
@@ -1,221 +0,0 @@
"""LLM analysis chapter (ANÁLISIS LLM) — the interpretive layer, next to overview.
Third reference chapter for AutomaticEDA. Renders the ``llm`` block that the
``eda`` group function ``eda_llm_insights`` already produced and stored in the
``TableProfile`` — it does NOT call the LLM nor recompute anything. The block is
turned into clean, markdown-style document blocks so it reads as a real chapter
(table summary, row meaning, data dictionary, suggested analyses, cleaning
suggestions, PII findings) and, crucially, **nothing is ever cut** in PDF or
PPTX:
* Prose (summary, row meaning) → ``Markdown`` blocks the renderers wrap to whole
lines, so no word is lost no matter how long the text is.
* The data dictionary and PII findings → ``DataTable`` blocks the paginator
splits by rows (repeating the header) and whose long cells wrap inside their
column — wide, multi-row tables never overflow a page/slide.
* Cleaning suggestions and suggested analyses → ``Markdown`` bullet lists; each
item is a whole line the renderer wraps, never truncated mid-entry.
Position: this chapter is declared in ``chapters_registry.CHAPTER_ORDER`` right
after ``overview`` so the interpretation sits next to the table preview, as the
user asked ("va junto al overview").
Data source: the ``llm`` dict produced by ``eda_llm_insights`` (group ``eda``),
read from ``profile['llm']`` (or ``ctx['llm']`` as a fallback). Shape::
{
"summary": str, # what the table is, 2-3 sentences
"row_meaning": str, # what one row represents / granularity
"dictionary": [ {"column","description","business_meaning","unit"} ],
"pii": [ {"column","kind","severity"} ],
"cleaning": [str], # cleaning / transformation suggestions
"analyses": [str], # suggested questions / analyses / hypotheses
}
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
Reads everything defensively (``.get``) and NEVER raises; returns ``None`` when
the profile carries no LLM block (e.g. ``profile_table`` ran without
``run_llm``), so the chapter is simply omitted from the document.
"""
from __future__ import annotations
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "analisis_llm"
CHAPTER_TITLE = "Análisis LLM"
# Key under which eda_llm_insights stores its interpretive block in the profile.
LLM_KEY = "llm"
def _clean_text(value) -> str:
"""Coerce a value to a single trimmed line (collapse inner newlines).
Used for bullet items so each suggestion stays a single markdown bullet the
renderer wraps; never drops content, only normalizes whitespace.
"""
text = model._safe_str(value).strip()
if not text:
return ""
return " ".join(text.split())
def _para(value) -> str:
"""Coerce a value to trimmed prose, preserving paragraph breaks."""
text = model._safe_str(value).strip()
if not text:
return ""
# Keep blank-line paragraph breaks; collapse runs of spaces/tabs per line.
lines = [" ".join(ln.split()) for ln in text.splitlines()]
out: list = []
for ln in lines:
if ln or (out and out[-1] != ""):
out.append(ln)
return "\n".join(out).strip()
def _bullets(items) -> str:
"""Build a markdown bullet list from a sequence of strings.
Each item becomes one ``- ...`` line (a whole, wrappable unit). Empty items
and non-list inputs are handled gracefully; returns "" when there is nothing.
"""
if isinstance(items, str):
items = [items]
if not isinstance(items, (list, tuple)):
return ""
lines = []
for it in items:
text = _clean_text(it)
if text:
lines.append(f"- {text}")
return "\n".join(lines)
def _summary_blocks(llm: dict) -> list:
"""Heading + prose for the table summary, or [] if absent."""
text = _para(llm.get("summary"))
if not text:
return []
return [model.Heading(text="Resumen de la tabla", level=2),
model.Markdown(text=text)]
def _row_meaning_blocks(llm: dict) -> list:
"""Heading + prose for what one row represents, or [] if absent."""
text = _para(llm.get("row_meaning"))
if not text:
return []
return [model.Heading(text="Significado de una fila", level=2),
model.Markdown(text=text)]
def _dictionary_block(llm: dict):
"""DataTable for the data dictionary, or None if absent/empty.
Columns: Columna / Descripción / Significado de negocio / Unidad. The
paginator splits this by rows repeating the header and wraps long cells, so a
long dictionary (many columns) never gets cut.
"""
entries = llm.get("dictionary")
if not isinstance(entries, (list, tuple)) or not entries:
return None
header = ["Columna", "Descripción", "Significado de negocio", "Unidad"]
rows = []
for e in entries:
if not isinstance(e, dict):
# Be tolerant: a bare string still shows up as a description row.
rows.append(["", _clean_text(e), "", ""])
continue
rows.append([
_clean_text(e.get("column")) or "",
_clean_text(e.get("description")),
_clean_text(e.get("business_meaning")),
_clean_text(e.get("unit")),
])
if not rows:
return None
return model.DataTable(header=header, rows=rows, title="Diccionario de datos")
def _analyses_blocks(llm: dict) -> list:
"""Heading + bullet list of suggested analyses, or [] if absent."""
bullets = _bullets(llm.get("analyses"))
if not bullets:
return []
return [model.Heading(text="Análisis sugeridos", level=2),
model.Markdown(text=bullets)]
def _cleaning_blocks(llm: dict) -> list:
"""Heading + bullet list of cleaning suggestions, or [] if absent."""
bullets = _bullets(llm.get("cleaning"))
if not bullets:
return []
return [model.Heading(text="Limpieza sugerida", level=2),
model.Markdown(text=bullets)]
def _pii_block(llm: dict):
"""DataTable for PII/GDPR findings, or None if absent/empty."""
entries = llm.get("pii")
if not isinstance(entries, (list, tuple)) or not entries:
return None
header = ["Columna", "Tipo", "Severidad"]
rows = []
for e in entries:
if not isinstance(e, dict):
continue
rows.append([
_clean_text(e.get("column")) or "",
_clean_text(e.get("kind")),
_clean_text(e.get("severity")),
])
if not rows:
return None
return model.DataTable(
header=header, rows=rows, title="Datos personales (PII / RGPD)",
note="detección automática orientativa — revisar antes de tratar los datos")
def build_analisis_llm(profile: dict, ctx: dict):
"""Build the LLM analysis Chapter, or None if there is no LLM block.
Consumes ``profile['llm']`` (the block produced by ``eda_llm_insights``,
group ``eda``); falls back to ``ctx['llm']``. Returns ``None`` when no LLM
block is present or it carries no usable content, so the chapter is omitted
rather than rendering an empty section.
"""
profile = profile or {}
ctx = ctx or {}
llm = profile.get(LLM_KEY)
if not isinstance(llm, dict):
llm = ctx.get(LLM_KEY)
if not isinstance(llm, dict) or not llm:
return None
blocks: list = []
blocks += _summary_blocks(llm)
blocks += _row_meaning_blocks(llm)
dict_block = _dictionary_block(llm)
if dict_block is not None:
blocks.append(model.Heading(text="Diccionario de datos", level=2))
blocks.append(dict_block)
blocks += _analyses_blocks(llm)
blocks += _cleaning_blocks(llm)
pii_block = _pii_block(llm)
if pii_block is not None:
blocks.append(model.Heading(text="Datos personales (PII / RGPD)", level=2))
blocks.append(pii_block)
if not blocks:
return None # LLM block present but every field empty → omit chapter.
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -1,190 +0,0 @@
"""Tests for the ANÁLISIS LLM chapter — DoD: golden + edges + anti-cut.
Self-contained: builds a synthetic TableProfile carrying an ``llm`` block (the
shape ``eda_llm_insights`` produces) so the suite is fast and deterministic — no
DuckDB and no LLM call. Verifies:
* golden — ``build_analisis_llm`` yields the chapter and the full document
renders to PDF *and* PPTX with the summary, a suggested analysis, a cleaning
suggestion and a dictionary column all present;
* order — the chapter sits immediately after ``overview`` (user requirement);
* edges — a profile with no ``llm`` block (or None/empty/malformed) returns
``None`` and never raises;
* anti-cut — a long dictionary (40 rows) and a 150-char cleaning suggestion are
rendered to PDF and PPTX without losing a single row or word.
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.chapters.analisis_llm import (
build_analisis_llm, CHAPTER_VERSION)
from datascience.automatic_eda.chapters_registry import build_document
from datascience.automatic_eda.model import Chapter, DataTable
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
def _profile() -> dict:
return {
"table": "ventas",
"source": "/data/ventas.csv",
"profiled_at": "2026-06-30T10:00:00+00:00",
"n_rows": 1000,
"n_cols": 2,
"quality_score": 92.5,
"columns": [
{"name": "precio", "inferred_type": "numeric", "null_pct": 0.0,
"null_count": 0,
"numeric": {"mean": 42.5, "median": 40.0, "min": 1.0,
"max": 100.0, "std": 12.3}},
{"name": "categoria", "inferred_type": "categorical",
"null_pct": 0.0, "null_count": 0,
"categorical": {"top": [{"value": "neumaticos", "count": 500}]}},
],
"llm": {
"summary": "Tabla de ventas por producto. Token SUMMARYTOKEN.",
"row_meaning": "Cada fila es una venta. Token ROWTOKEN.",
"dictionary": [
{"column": "precio", "description": "Precio unitario DESCTOKEN",
"business_meaning": "Ingreso por unidad", "unit": "EUR"},
{"column": "categoria", "description": "Familia de producto",
"business_meaning": "Segmento comercial", "unit": ""},
],
"pii": [{"column": "categoria", "kind": "ninguno", "severity": "low"}],
"cleaning": ["Quitar nulos de precio CLEANTOKEN",
"Normalizar mayusculas en categoria"],
"analyses": ["Estudiar relacion precio-categoria ANALYSISTOKEN",
"Detectar outliers de precio"],
},
}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
def _pptx_text(path: str) -> str:
prs = Presentation(path)
parts = []
for sl in prs.slides:
for sh in sl.shapes:
if sh.has_text_frame:
parts.append(sh.text_frame.text)
if sh.has_table:
tb = sh.table
for r in range(len(tb.rows)):
for c in range(len(tb.columns)):
parts.append(tb.cell(r, c).text)
return re.sub(r"\s+", " ", " ".join(parts))
def test_golden_build_y_render_pdf_pptx():
prof = _profile()
ch = build_analisis_llm(prof, {})
assert ch is not None
assert ch.id == "analisis_llm"
assert ch.version == CHAPTER_VERSION
assert ch.blocks # non-empty.
with tempfile.TemporaryDirectory() as d:
out_pdf = os.path.join(d, "eda.pdf")
res = render_automatic_eda_pdf(prof, out_pdf, {"title": "EDA — ventas"})
assert res["path"] == out_pdf and os.path.exists(out_pdf)
ids = [c["id"] for c in res["chapters"]]
assert "analisis_llm" in ids
txt = _pdf_text(out_pdf)
# The user's required content: summary, suggested analyses, cleaning.
assert "SUMMARYTOKEN" in txt
assert "ANALYSISTOKEN" in txt
assert "CLEANTOKEN" in txt
assert "DESCTOKEN" in txt # data dictionary cell.
out_pptx = os.path.join(d, "eda.pptx")
res2 = render_automatic_eda_pptx(prof, out_pptx, {"title": "EDA — ventas"})
assert res2["path"] == out_pptx and os.path.exists(out_pptx)
ids2 = [c["id"] for c in res2["chapters"]]
assert "analisis_llm" in ids2
ptx = _pptx_text(out_pptx)
assert "SUMMARYTOKEN" in ptx
assert "ANALYSISTOKEN" in ptx
assert "CLEANTOKEN" in ptx
assert "DESCTOKEN" in ptx
def test_orden_capitulo_junto_a_overview():
chapters = build_document(_profile(), {})
ids = [c.id for c in chapters]
assert "overview" in ids and "analisis_llm" in ids
# User requirement: the LLM chapter sits right after overview.
assert ids.index("analisis_llm") == ids.index("overview") + 1
def test_edge_sin_llm_devuelve_none():
# No llm block at all.
prof = {k: v for k, v in _profile().items() if k != "llm"}
assert build_analisis_llm(prof, {}) is None
# None / empty / malformed never raise and yield None.
assert build_analisis_llm(None, None) is None
assert build_analisis_llm({}, {}) is None
assert build_analisis_llm({"llm": {}}, {}) is None
assert build_analisis_llm({"llm": "not-a-dict"}, {}) is None
# All-empty fields → omitted (no blocks).
empty = {"llm": {"summary": "", "dictionary": [], "cleaning": [],
"analyses": [], "pii": [], "row_meaning": ""}}
assert build_analisis_llm(empty, {}) is None
def test_edge_llm_via_ctx_fallback():
# The block may arrive in ctx instead of the profile.
prof = {k: v for k, v in _profile().items() if k != "llm"}
ctx = {"llm": {"summary": "Resumen via ctx CTXTOKEN."}}
ch = build_analisis_llm(prof, ctx)
assert ch is not None and ch.id == "analisis_llm"
def test_anti_cortes_diccionario_largo_y_limpieza_larga():
long_clean = ("Lorem ipsum dolor sit amet consectetur adipiscing elit sed do "
"eiusmod tempor incididunt ut labore et dolore magna aliqua "
"reprehenderit voluptate velit esse cillum dolore")
dictionary = [
{"column": f"col_{i}",
"description": f"Descripcion larga numero {i} con bastante texto para "
f"forzar el wrap dentro de la celda fila{i}",
"business_meaning": f"Significado de negocio {i}", "unit": "u"}
for i in range(40)
]
prof = {
"table": "t", "n_rows": 1, "n_cols": 1, "columns": [],
"llm": {"summary": "S", "dictionary": dictionary,
"cleaning": [long_clean], "analyses": ["A"]},
}
ch = build_analisis_llm(prof, {})
assert ch is not None
# Structure: the dictionary DataTable keeps ALL 40 rows — none dropped on
# construction (the renderers then split it by rows, repeating the header).
dts = [b for b in ch.blocks if isinstance(b, DataTable)]
assert any(len(dt.rows) == 40 for dt in dts)
with tempfile.TemporaryDirectory() as d:
out_pdf = os.path.join(d, "x.pdf")
render_automatic_eda_pdf([ch], out_pdf, {"write_manifest": False})
# 40 wide rows + a long cleaning line cannot fit one page → it spills,
# which is exactly the no-cut behaviour (paginate, never truncate).
assert len(PdfReader(out_pdf).pages) > 1
txt = _pdf_text(out_pdf)
# The long cleaning suggestion is wrapped word-by-word, not truncated.
for word in ("Lorem", "incididunt", "reprehenderit", "voluptate", "cillum"):
assert word in txt
out_pptx = os.path.join(d, "x.pptx")
res2 = render_automatic_eda_pptx([ch], out_pptx, {"write_manifest": False})
assert res2["n_slides"] > 1 # table + long text spill across slides.
ptx = _pptx_text(out_pptx)
for word in ("Lorem", "reprehenderit", "voluptate"):
assert word in ptx
@@ -0,0 +1,402 @@
"""Categorical distributions chapter (CAT DISTR).
Third reference chapter for AutomaticEDA. For every categorical column it shows,
fulfilling the user's request:
1. A short opening explanation of **Shannon entropy** (what it measures, its 0
and log2(k) bounds, the normalized 01 version) and the dataset row total used
as a comparison baseline.
2. Per column, a cardinality key/value table: distinct values, ``% distinct``
(distinct / total rows), total dataset rows, singleton values (frequency 1),
entropy with its theoretical maximum and the normalized ratio, mode, imbalance
and string-length stats.
3. A short note flagging problematic cardinality (id-like ≈100% distinct, or a
single dominating category).
4. A ``top-k`` table (value / count / %).
5. A **donut pie chart** of the most common categories (top-k + an "Otros"
bucket), drawn lazily so the renderers scale it to fit entirely.
Data comes from the ``eda`` group: each ``columns[i]['categorical']`` is the
output of ``summarize_categorical`` (``top[{value,count,pct}]``, ``mode``,
``n_distinct``, ``entropy``, ``imbalance``, ``len_min/mean/max``). The derived
cardinality metrics and the pie figure are delegated to two registry functions
(``categorical_cardinality_block`` and ``categorical_top_pie_figure``); both are
imported lazily and degrade to a minimal inline fallback so this chapter never
raises even if they are unavailable.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
import math
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "cat_distr"
CHAPTER_TITLE = "Distribuciones categóricas"
# Cap the number of categorical columns rendered to keep the document bounded;
# the rest are summarized in a closing note (no silent truncation).
MAX_COLS = 40
# Rows shown in each top-k table and explicit slices in the pie.
TOP_TABLE_ROWS = 15
PIE_TOP_K = 6
# Truncate very long category labels in tables (the renderer also wraps).
LABEL_MAX = 48
def _fmt_int(value) -> str:
if value is None:
return ""
try:
return f"{int(value):,}".replace(",", ".")
except (TypeError, ValueError):
return str(value)
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return str(value)
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return str(value)
def _fmt_pct_value(value, decimals: int = 1) -> str:
"""Format an already-in-percent value (0100). None -> placeholder."""
if value is None:
return ""
try:
return f"{float(value):.{decimals}f}%"
except (TypeError, ValueError):
return str(value)
def _pct_from_maybe_fraction(value, decimals: int = 1) -> str:
"""Format a percentage that may arrive as a 01 fraction or a 0100 number."""
if value is None:
return ""
try:
v = float(value)
except (TypeError, ValueError):
return str(value)
if v <= 1.0:
v *= 100.0
return f"{v:.{decimals}f}%"
def _truncate(text: str, limit: int = LABEL_MAX) -> str:
s = model._safe_str(text)
if len(s) <= limit:
return s
return s[: max(1, limit - 1)].rstrip() + ""
def _is_categorical(col: dict) -> bool:
"""A column is treated as categorical when it carries a non-empty top list
and is not a pure numeric column (numeric columns may still expose a top)."""
if not isinstance(col, dict):
return False
cat = col.get("categorical")
if not (isinstance(cat, dict) and cat.get("top")):
return False
if col.get("inferred_type") == "numeric":
return False
return True
def _cardinality(cat: dict, n_rows) -> dict:
"""Derive cardinality metrics for a column, via the registry function when
available, otherwise a minimal inline fallback. Never raises."""
try:
from datascience.categorical_cardinality_block import (
categorical_cardinality_block,
)
out = categorical_cardinality_block(cat=cat, n_rows=n_rows)
if isinstance(out, dict):
return out
except Exception: # noqa: BLE001 — fall back to the inline derivation.
pass
return _fallback_cardinality(cat, n_rows)
def _fallback_cardinality(cat: dict, n_rows) -> dict:
cat = cat or {}
top = cat.get("top") or []
n_distinct = cat.get("n_distinct")
entropy = cat.get("entropy")
try:
nr = int(n_rows) if n_rows is not None else None
except (TypeError, ValueError):
nr = None
pct_distinct = None
if isinstance(n_distinct, (int, float)) and nr:
pct_distinct = float(n_distinct) / nr * 100.0
entropy_max = None
if isinstance(n_distinct, (int, float)):
entropy_max = math.log2(n_distinct) if n_distinct > 1 else 0.0
entropy_norm = None
if isinstance(entropy, (int, float)) and entropy_max:
entropy_norm = max(0.0, min(1.0, float(entropy) / entropy_max))
mode_pct = cat.get("mode_pct")
if mode_pct is None and top and isinstance(top[0], dict):
mode_pct = top[0].get("pct")
# Normalize to a 0100 scale: summarize_categorical emits a 01 fraction.
if isinstance(mode_pct, (int, float)) and not isinstance(mode_pct, bool):
mode_pct = float(mode_pct) * 100.0 if mode_pct <= 1.0 else float(mode_pct)
else:
mode_pct = None
n_singletons = None
if top:
n_singletons = sum(
1 for t in top if isinstance(t, dict) and t.get("count") == 1)
return {
"n_distinct": n_distinct,
"n_rows": nr,
"pct_distinct": pct_distinct,
"entropy": entropy,
"entropy_max": entropy_max,
"entropy_norm": entropy_norm,
"mode": cat.get("mode"),
"mode_pct": mode_pct,
"imbalance": cat.get("imbalance"),
"n_singletons": n_singletons,
"n_singletons_partial": (
isinstance(n_distinct, (int, float)) and n_distinct > len(top)),
"len_min": cat.get("len_min"),
"len_mean": cat.get("len_mean"),
"len_max": cat.get("len_max"),
"id_like": pct_distinct is not None and pct_distinct >= 99.0,
"dominated": mode_pct is not None and mode_pct >= 90.0,
}
def _pie_make(top, n_distinct, title, n_rows):
"""Return a zero-arg callable that builds the donut figure lazily."""
def make():
try:
from datascience.categorical_top_pie_figure import (
categorical_top_pie_figure,
)
return categorical_top_pie_figure(
top=top, n_distinct=n_distinct or 0, title=title,
top_k=PIE_TOP_K, n_rows=n_rows)
except Exception: # noqa: BLE001 — minimal local fallback figure.
return _fallback_pie(top, title)
return make
def _fallback_pie(top, title):
"""Minimal donut figure used only if the registry function is unavailable."""
import matplotlib
matplotlib.use("Agg")
from matplotlib.figure import Figure
fig = Figure(figsize=(5.0, 3.2))
ax = fig.add_subplot(111)
items = [t for t in (top or [])
if isinstance(t, dict) and isinstance(t.get("count"), (int, float))]
items = sorted(items, key=lambda t: t.get("count") or 0, reverse=True)
head = items[:PIE_TOP_K]
rest = items[PIE_TOP_K:]
labels = [_truncate(t.get("value"), 20) for t in head]
sizes = [float(t.get("count") or 0) for t in head]
if rest:
labels.append(f"Otros ({len(rest)})")
sizes.append(sum(float(t.get("count") or 0) for t in rest))
if not sizes or sum(sizes) <= 0:
ax.text(0.5, 0.5, "sin datos categóricos", ha="center", va="center")
ax.axis("off")
return fig
ax.pie(sizes, labels=None, wedgeprops={"width": 0.42},
autopct=lambda p: f"{p:.0f}%" if p >= 4 else "")
ax.legend(labels, loc="center left", bbox_to_anchor=(1.0, 0.5),
fontsize=7, frameon=False)
ax.set_title(_truncate(title, 40))
fig.tight_layout()
return fig
def _normalize_card(card: dict) -> dict:
"""Make the cardinality dict robust regardless of the upstream scale.
``summarize_categorical`` emits ``mode_pct`` as a 01 fraction; bring it to a
0100 scale and recompute the ``dominated`` flag here so the chapter is
correct whether it consumed the registry function or the inline fallback.
"""
card = dict(card or {})
mp = card.get("mode_pct")
if isinstance(mp, (int, float)) and not isinstance(mp, bool):
mp = float(mp) * 100.0 if mp <= 1.0 else float(mp)
else:
mp = None
card["mode_pct"] = mp
card["dominated"] = mp is not None and mp >= 90.0
pd = card.get("pct_distinct")
card["id_like"] = isinstance(pd, (int, float)) and pd >= 99.0
return card
def _cardinality_block(card: dict):
"""KVTable with the cardinality / entropy metrics for one column."""
n_singletons = card.get("n_singletons")
if n_singletons is not None and card.get("n_singletons_partial"):
singletons = f"{_fmt_int(n_singletons)} (en top mostrado)"
elif n_singletons is not None:
singletons = _fmt_int(n_singletons)
else:
singletons = ""
entropy_ref = _fmt_num(card.get("entropy"))
emax = card.get("entropy_max")
if emax is not None:
entropy_ref = f"{entropy_ref} (máx {_fmt_num(emax)})"
mode = card.get("mode")
mode_pct = card.get("mode_pct")
mode_str = "" if mode is None else model._safe_str(mode)
if mode is not None and mode_pct is not None:
mode_str = f"{mode_str} ({_fmt_pct_value(mode_pct)})"
rows = [
("Valores distintos", _fmt_int(card.get("n_distinct"))),
("% distintos", _fmt_pct_value(card.get("pct_distinct"))),
("Total filas (dataset)", _fmt_int(card.get("n_rows"))),
("Valores únicos (frecuencia 1)", singletons),
("Entropía (bits)", entropy_ref),
("Entropía normalizada (01)", _fmt_num(card.get("entropy_norm"))),
("Moda", mode_str),
]
imbalance = card.get("imbalance")
if imbalance is not None:
rows.append(("Desbalance", _fmt_num(imbalance)))
lm = card.get("len_min")
lmean = card.get("len_mean")
lmax = card.get("len_max")
if any(v is not None for v in (lm, lmean, lmax)):
rows.append((
"Longitud (mín/media/máx)",
f"{_fmt_num(lm)} / {_fmt_num(lmean)} / {_fmt_num(lmax)}"))
return model.KVTable(rows=rows, title="Cardinalidad")
def _flag_note(card: dict):
"""Return a Note flagging problematic cardinality, or None."""
if card.get("id_like"):
return model.Note(
"Casi todos los valores son distintos (≈100% distintos): la columna "
"se comporta como un identificador y aporta poco para agrupar o "
"comparar categorías.")
if card.get("dominated"):
mp = card.get("mode_pct")
mp_str = _fmt_pct_value(mp) if mp is not None else "muy alta"
return model.Note(
f"Una sola categoría domina la columna (moda {mp_str}): la "
"distribución está muy desbalanceada.")
return None
def _topk_table(cat: dict):
"""DataTable value / count / % for the top categories."""
top = cat.get("top") or []
n_distinct = cat.get("n_distinct")
header = ["Valor", "Conteo", "%"]
rows = []
for t in top[:TOP_TABLE_ROWS]:
if not isinstance(t, dict):
continue
rows.append([
model._safe_str(t.get("value")),
_fmt_int(t.get("count")),
_pct_from_maybe_fraction(t.get("pct")),
])
if not rows:
return None
shown = len(rows)
if isinstance(n_distinct, (int, float)) and n_distinct > shown:
note = f"top {shown} de {_fmt_int(n_distinct)} categorías distintas"
else:
note = f"{shown} categorías"
return model.DataTable(header=header, rows=rows, title="Top categorías",
note=note)
def _intro_blocks(n_rows):
total = _fmt_int(n_rows)
text = (
"La **entropía de Shannon** mide cómo de repartidos están los valores de "
"una columna categórica, en bits. Vale 0 cuando una sola categoría "
"concentra todas las filas (máxima previsibilidad) y alcanza su máximo, "
"log2(k) para k categorías distintas, cuando todas aparecen por igual "
"(máxima diversidad). La **entropía normalizada** (entropía dividida por "
"su máximo) la lleva al rango 01 para comparar columnas con distinto "
"número de categorías. Para cada columna se muestran los valores "
"distintos, el porcentaje que representan sobre el total de filas, los "
"valores únicos (que aparecen una sola vez), la tabla de las categorías "
"más frecuentes y un gráfico de tarta (donut) de las más comunes."
)
if n_rows is not None:
text += f" El dataset tiene {total} filas en total como referencia."
return [
model.Heading(text="Entropía y cardinalidad", level=2),
model.Markdown(text=text),
]
def build_cat_distr(profile: dict, ctx: dict):
"""Build the categorical-distributions Chapter, or None if the dataset has
no categorical columns."""
profile = profile or {}
ctx = ctx or {}
cols = profile.get("columns") or []
cat_cols = [c for c in cols if _is_categorical(c)]
if not cat_cols:
return None
n_rows = profile.get("n_rows")
blocks = list(_intro_blocks(n_rows))
rendered = cat_cols[:MAX_COLS]
for col in rendered:
name = col.get("name") or "(columna)"
cat = col.get("categorical") or {}
card = _normalize_card(_cardinality(cat, n_rows))
blocks.append(model.Heading(text=str(name), level=2))
blocks.append(_cardinality_block(card))
note = _flag_note(card)
if note is not None:
blocks.append(note)
topk = _topk_table(cat)
if topk is not None:
blocks.append(topk)
blocks.append(model.Figure(
make=_pie_make(cat.get("top") or [], card.get("n_distinct"),
str(name), n_rows),
caption=(f"Categorías más comunes de «{_truncate(name, 32)}» "
"(donut: top-k + «Otros»)")))
if len(cat_cols) > len(rendered):
omitted = len(cat_cols) - len(rendered)
blocks.append(model.Note(
f"Se muestran las primeras {len(rendered)} columnas categóricas; "
f"quedan {omitted} sin mostrar para mantener acotado el informe."))
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,186 @@
"""Tests for the CAT DISTR chapter — DoD: golden + edges + anti-cut.
Self-contained: builds synthetic TableProfiles (no DuckDB) so the suite is fast
and deterministic. Verifies that ``build_cat_distr`` emits the blocks the user
asked for (entropy intro, distinct/total/%-distinct/unique metrics, top-k table
and a donut figure), that the chapter renders inside the full document to both
PDF and PPTX showing that content, that a profile with no categorical columns
yields ``None`` without raising, and that long labels / many columns are never
cut in either output.
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.model import (
DataTable, Figure, Heading, KVTable, Note,
)
from datascience.automatic_eda.chapters.cat_distr import (
CHAPTER_ID, CHAPTER_VERSION, build_cat_distr,
)
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
def _profile() -> dict:
return {
"table": "productos",
"source": "/data/productos.csv",
"profiled_at": "2026-06-30T10:00:00+00:00",
"n_rows": 1000,
"n_cols": 3,
"quality_score": 90.0,
"columns": [
{"name": "precio", "inferred_type": "numeric", "null_pct": 0.0,
"null_count": 0,
"numeric": {"mean": 42.5, "median": 40.0, "min": 1.0,
"max": 100.0, "std": 12.3}},
{"name": "categoria", "inferred_type": "categorical",
"null_pct": 0.0, "null_count": 0, "distinct_count": 8,
"categorical": {
"top": [
{"value": "neumaticos", "count": 500, "pct": 0.5},
{"value": "aceite", "count": 300, "pct": 0.3},
{"value": "filtros", "count": 120, "pct": 0.12},
{"value": "frenos", "count": 80, "pct": 0.08},
],
"mode": "neumaticos", "n_distinct": 8, "entropy": 1.6,
"imbalance": 6.25, "len_min": 6, "len_mean": 7.5,
"len_max": 10}},
{"name": "uuid", "inferred_type": "categorical",
"null_pct": 0.0, "null_count": 0, "distinct_count": 1000,
"categorical": {
"top": [{"value": f"id-{i}", "count": 1} for i in range(5)],
"mode": "id-0", "n_distinct": 1000, "entropy": 9.97,
"imbalance": 1.0}},
],
}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
def _pptx_text(path: str) -> str:
prs = Presentation(path)
parts = []
for sl in prs.slides:
for sh in sl.shapes:
if sh.has_text_frame:
parts.append(sh.text_frame.text)
if sh.has_table:
tb = sh.table
for r in range(len(tb.rows)):
for c in range(len(tb.columns)):
parts.append(tb.cell(r, c).text)
return re.sub(r"\s+", " ", " ".join(parts))
def _kinds(chapter):
return [b.kind for b in chapter.blocks]
def test_golden_build_cat_distr_emite_bloques_pedidos():
ch = build_cat_distr(_profile(), {})
assert ch is not None
assert ch.id == CHAPTER_ID
assert ch.version == CHAPTER_VERSION
kinds = _kinds(ch)
# Entropy intro present.
headings = [b.text for b in ch.blocks if isinstance(b, Heading)]
assert any("Entrop" in h for h in headings)
md = next(b for b in ch.blocks if b.kind == "markdown")
assert "entropía" in md.text.lower() and "log2" in md.text
# Cardinality metrics: distinct, total rows, %-distinct, unique values.
kv = next(b for b in ch.blocks if isinstance(b, KVTable))
labels = [r[0] for r in kv.rows]
assert "Valores distintos" in labels
assert "% distintos" in labels
assert "Total filas (dataset)" in labels
assert "Valores únicos (frecuencia 1)" in labels
assert any("Entropía" in lbl for lbl in labels)
# Top-k table + pie figure.
dt = next(b for b in ch.blocks if isinstance(b, DataTable))
assert dt.header == ["Valor", "Conteo", "%"]
assert any("neumaticos" in str(cell) for row in dt.rows for cell in row)
assert any(isinstance(b, Figure) for b in ch.blocks)
# id-like column flagged with a Note.
assert any(isinstance(b, Note) and "identificador" in b.text
for b in ch.blocks)
def test_golden_render_pdf_muestra_categoricas():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "eda.pdf")
res = render_automatic_eda_pdf(_profile(), out, {"title": "EDA"})
assert res["path"] == out and os.path.exists(out)
assert CHAPTER_ID in [c["id"] for c in res["chapters"]]
txt = _pdf_text(out)
assert "Entrop" in txt
assert "distintos" in txt
assert "categoria" in txt and "neumaticos" in txt
assert "donut" in txt # figure caption rendered as text.
assert "identificador" in txt # id-like note rendered.
def test_golden_render_pptx_muestra_categoricas():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "eda.pptx")
res = render_automatic_eda_pptx(_profile(), out, {"title": "EDA"})
assert res["path"] == out and os.path.exists(out)
assert CHAPTER_ID in [c["id"] for c in res["chapters"]]
txt = _pptx_text(out)
assert "Entrop" in txt
assert "categoria" in txt and "neumaticos" in txt
assert "distintos" in txt
def test_edge_sin_categoricas_devuelve_none():
only_numeric = {
"n_rows": 10, "columns": [
{"name": "x", "inferred_type": "numeric",
"numeric": {"mean": 1.0}}]}
assert build_cat_distr(only_numeric, {}) is None
# None / empty / no-columns never raise and yield None.
assert build_cat_distr(None, None) is None
assert build_cat_distr({}, {}) is None
assert build_cat_distr({"columns": []}, {}) is None
def test_anti_corte_label_largo_y_muchas_columnas():
long_label = ("Lorem ipsum dolor sit amet consectetur adipiscing elit sed "
"do eiusmod tempor incididunt ut labore reprehenderit voluptate")
cols = []
for i in range(30):
cols.append({
"name": f"cat_{i}", "inferred_type": "categorical",
"distinct_count": 3,
"categorical": {
"top": [{"value": long_label, "count": 60},
{"value": "b", "count": 30},
{"value": "c", "count": 10}],
"mode": long_label, "n_distinct": 3, "entropy": 1.2}})
profile = {"table": "t", "source": "t.csv", "n_rows": 100,
"n_cols": len(cols), "columns": cols}
ch = build_cat_distr(profile, {})
assert ch is not None
with tempfile.TemporaryDirectory() as d:
pdf = os.path.join(d, "anti.pdf")
res = render_automatic_eda_pdf(profile, pdf, {"write_manifest": False})
assert res["path"] == pdf
assert res["n_pages"] > 1 # many columns spilled across pages, OK.
txt = _pdf_text(pdf)
# Long label wrapped (not truncated): every word survives.
for word in ("Lorem", "incididunt", "reprehenderit", "voluptate"):
assert word in txt
# PPTX path must not raise either.
pptx = os.path.join(d, "anti.pptx")
res2 = render_automatic_eda_pptx(profile, pptx,
{"write_manifest": False})
assert res2["path"] == pptx and os.path.exists(pptx)
@@ -28,12 +28,12 @@ from . import model
CHAPTER_ORDER = [
"portada", # cover
"overview", # df.head + columns/types/nulls/examples + describe
"analisis_llm", # LLM interpretation — sits next to overview (user request)
"num_distr", # numeric distributions
"cat_distr", # categorical distributions
"calidad", # data quality
"correlacion", # correlations / associations
"modelos", # cheap models (PCA/KMeans/outliers)
"analisis_llm", # LLM interpretation
"timeseries", # time-series analysis
"geospatial", # geospatial
"agregacion", # aggregations / pivots
@@ -0,0 +1,115 @@
---
id: categorical_cardinality_block_py_datascience
name: categorical_cardinality_block
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def categorical_cardinality_block(cat: dict, n_rows: int) -> dict"
description: "Deriva métricas de cardinalidad listas para renderizar a partir de la salida de summarize_categorical para UNA columna categórica más el número total de filas. Calcula pct_distinct, entropy_max=log2(n_distinct), entropy_norm (recortada a [0,1]), n_singletons (sobre el top visible) y los flags id_like / dominated. NO recalcula la entropía ni reimplementa summarize_categorical: la consume. Estilo dict-no-throw del grupo eda — nunca lanza."
tags: [eda, categorical, cardinality, entropy, profiling, datascience, pure]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
example: |
from categorical_cardinality_block import categorical_cardinality_block
cat = {"top": [{"value": "a", "count": 5, "pct": 0.5}], "mode": "a",
"mode_pct": 0.5, "n_distinct": 4, "entropy": 1.685, "imbalance": 5.0,
"len_min": 1, "len_mean": 1.0, "len_max": 1}
block = categorical_cardinality_block(cat, n_rows=10)
tested: true
tests:
- "test_normal_case"
- "test_empty_cat_does_not_raise"
- "test_none_cat_does_not_raise"
- "test_n_rows_zero_no_zero_division"
- "test_id_like_when_distinct_near_rows"
- "test_dominated_when_mode_pct_high"
- "test_mode_pct_fallback_from_top_fraction"
- "test_n_singletons_partial_when_top_truncated"
- "test_single_distinct_value_entropy_norm_none"
test_file_path: "python/functions/datascience/categorical_cardinality_block_test.py"
file_path: "python/functions/datascience/categorical_cardinality_block.py"
params:
- name: cat
desc: "Dict producido por summarize_categorical para UNA columna categórica. Claves leídas (todas opcionales, lectura defensiva): top (list de {value,count,pct}), mode, mode_pct (puede faltar), n_distinct, entropy (Shannon en bits), imbalance, len_min, len_mean, len_max. None o no-dict se tratan como {}."
- name: n_rows
desc: "Número total de filas del dataset. Usado para pct_distinct. Si es 0 o None, pct_distinct sale None (sin ZeroDivisionError)."
output: "Dict con exactamente 16 claves, todas siempre presentes: n_distinct, n_rows, pct_distinct, entropy, entropy_max, entropy_norm, mode, mode_pct, imbalance, n_singletons, n_singletons_partial, len_min, len_mean, len_max, id_like, dominated. Valores None/False cuando no son derivables; la función nunca lanza. pct_distinct en escala 0-100. entropy_max=log2(n_distinct) (0.0 si n_distinct in {0,1}). entropy_norm=entropy/entropy_max recortada a [0,1]. n_singletons = nº de elementos de top con count==1 (None si top vacío). n_singletons_partial=True si n_distinct>len(top). id_like=pct_distinct>=99. dominated=mode_pct>=90."
---
## Ejemplo
```python
from categorical_cardinality_block import categorical_cardinality_block
# Salida típica de summarize_categorical para una columna, con n_rows del dataset.
cat = {
"top": [
{"value": "a", "count": 5, "pct": 0.5},
{"value": "b", "count": 3, "pct": 0.3},
{"value": "c", "count": 1, "pct": 0.1},
{"value": "d", "count": 1, "pct": 0.1},
],
"mode": "a",
"mode_pct": 0.5,
"n_distinct": 4,
"entropy": 1.685, # Shannon en bits (<= log2(4) = 2.0)
"imbalance": 5.0,
"len_min": 1, "len_mean": 1.0, "len_max": 1,
}
categorical_cardinality_block(cat, n_rows=10)
# {
# "n_distinct": 4, "n_rows": 10,
# "pct_distinct": 40.0, # 4 / 10 * 100
# "entropy": 1.685,
# "entropy_max": 2.0, # log2(4)
# "entropy_norm": 0.8425, # 1.685 / 2.0, recortado a [0,1]
# "mode": "a", "mode_pct": 0.5,
# "imbalance": 5.0,
# "n_singletons": 2, # c y d con count == 1
# "n_singletons_partial": False, # top cubre los 4 distintos
# "len_min": 1, "len_mean": 1.0, "len_max": 1,
# "id_like": False, # pct_distinct 40 < 99
# "dominated": False, # mode_pct 0.5 < 90
# }
```
## Cuando usarla
Úsala justo después de `summarize_categorical`, cuando vayas a renderizar el
bloque de cardinalidad de una columna categórica en un EDA: necesitas el ratio
de valores distintos (`pct_distinct`), la entropía normalizada al rango `[0,1]`
para comparar columnas con cardinalidades distintas, el conteo de singletons, y
las banderas heurísticas `id_like` (la columna parece un identificador) y
`dominated` (una sola categoría domina). Pásale el dict crudo de
`summarize_categorical` para esa columna y el `n_rows` total del dataset. No
reimplementa nada: solo deriva métricas de presentación a partir de lo ya
calculado.
## Gotchas
- **`mode_pct` se pasa tal cual viene en `cat`.** `summarize_categorical`
produce `mode_pct` como **fracción** (01), no como porcentaje. El flag
`dominated` compara `mode_pct >= 90.0`, así que con la salida cruda de
`summarize_categorical` (fracciones) `dominated` no se dispara: aliméntalo con
`mode_pct` en escala 0100 si quieres usar esa bandera. Solo el camino de
*fallback* (cuando `cat` no trae `mode_pct` y se deriva de `top[0]['pct']`)
normaliza una fracción `<= 1` multiplicándola por 100.
- **`n_singletons` solo cubre el `top` visible.** Si `summarize_categorical` se
llamó con `top_k` pequeño, hay valores fuera del top; en ese caso
`n_singletons_partial` es `True` para avisar de que el conteo es parcial.
- **`pct_distinct` es `None` si `n_rows` es 0 o `None`** (no lanza
`ZeroDivisionError`); por tanto `id_like` queda `False` en ese caso.
- **`entropy_norm` es `None` cuando `entropy_max <= 0`** (columna constante,
`n_distinct in {0,1}`): no hay división por cero y no se inventa un 0/1.
- **No recalcula la entropía.** Si `cat['entropy']` es incoherente con
`n_distinct`, `entropy_norm` se recorta a `[0,1]` pero el valor de entrada no
se corrige.
- **`bool` no cuenta como número.** Un `True`/`False` en una clave numérica de
`cat` se trata como ausente (`None`), por la guarda defensiva.
@@ -0,0 +1,132 @@
"""Pure EDA helper: cardinality metrics block from a `summarize_categorical` output.
Part of the `eda` capability group. Consumes the per-column dict produced by
``summarize_categorical`` (for a single categorical/text column) plus the total
row count of the dataset and derives render-ready cardinality metrics: distinct
ratio, normalized entropy, singleton count, and the ``id_like`` / ``dominated``
flags.
It does NOT recompute the entropy nor reimplement ``summarize_categorical`` — it
only reads that function's output. Dict-no-throw style of the `eda` group: it
never raises. Missing or malformed inputs yield ``None``/``False``/``0`` for the
affected keys, never an exception. Stdlib only (``math.log2``).
"""
from math import log2
def _num(value):
"""Return ``value`` unchanged if it is a real (non-bool) number, else ``None``.
``bool`` is rejected on purpose: in Python ``True`` is an ``int`` but it is
never a meaningful count/ratio here.
"""
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return value
return None
def categorical_cardinality_block(cat: dict, n_rows: int) -> dict:
"""Derive cardinality metrics for one categorical column.
Args:
cat: The per-column dict produced by ``summarize_categorical`` for a
single categorical/text column. Expected (all optional, read
defensively) keys: ``top`` (list of ``{value, count, pct}``),
``mode``, ``mode_pct``, ``n_distinct``, ``entropy`` (Shannon, bits),
``imbalance``, ``len_min``, ``len_mean``, ``len_max``. ``None`` or a
non-dict is treated as ``{}``.
n_rows: Total number of rows in the dataset (used for ``pct_distinct``).
Returns:
Dict with exactly these keys, every one always present:
``n_distinct``, ``n_rows``, ``pct_distinct``, ``entropy``,
``entropy_max``, ``entropy_norm``, ``mode``, ``mode_pct``,
``imbalance``, ``n_singletons``, ``n_singletons_partial``, ``len_min``,
``len_mean``, ``len_max``, ``id_like``, ``dominated``. Values are
``None``/``False`` when not derivable; the function never raises.
"""
cat = cat if isinstance(cat, dict) else {}
# --- passthroughs (numeric-validated, type preserved) ---
n_distinct = _num(cat.get("n_distinct"))
n_rows_out = _num(n_rows)
entropy = _num(cat.get("entropy"))
imbalance = _num(cat.get("imbalance"))
len_min = _num(cat.get("len_min"))
len_mean = _num(cat.get("len_mean"))
len_max = _num(cat.get("len_max"))
mode = cat.get("mode") # any value (or None); passthrough as-is
# --- pct_distinct ---
if n_distinct is None or n_rows_out is None or n_rows_out == 0:
pct_distinct = None
else:
pct_distinct = n_distinct / n_rows_out * 100.0
# --- entropy_max = log2(n_distinct) ---
if n_distinct is None:
entropy_max = None
elif n_distinct > 1:
entropy_max = log2(n_distinct)
else: # n_distinct in {0, 1}
entropy_max = 0.0
# --- entropy_norm = entropy / entropy_max, clipped to [0, 1] ---
if entropy_max is not None and entropy_max > 0 and entropy is not None:
entropy_norm = entropy / entropy_max
entropy_norm = max(0.0, min(1.0, entropy_norm))
else:
entropy_norm = None
# --- mode_pct: prefer cat['mode_pct']; else derive from top[0].pct ---
mode_pct = _num(cat.get("mode_pct"))
top = cat.get("top")
has_top = isinstance(top, (list, tuple)) and len(top) > 0
if mode_pct is None and has_top:
first = top[0]
if isinstance(first, dict):
first_pct = _num(first.get("pct"))
if first_pct is not None:
# Normalize to 0-100: a fraction (<= 1) becomes a percentage.
mode_pct = first_pct * 100.0 if first_pct <= 1 else first_pct
# --- singletons (count == 1) within the visible top ---
if has_top:
n_singletons = sum(
1
for item in top
if isinstance(item, dict) and _num(item.get("count")) == 1
)
else:
n_singletons = None
# The singleton count only covers the visible top; there may be more
# distinct values (and thus more singletons) outside it.
top_len = len(top) if isinstance(top, (list, tuple)) else 0
n_singletons_partial = bool(n_distinct is not None and n_distinct > top_len)
# --- derived flags ---
id_like = pct_distinct is not None and pct_distinct >= 99.0
dominated = mode_pct is not None and mode_pct >= 90.0
return {
"n_distinct": n_distinct,
"n_rows": n_rows_out,
"pct_distinct": pct_distinct,
"entropy": entropy,
"entropy_max": entropy_max,
"entropy_norm": entropy_norm,
"mode": mode,
"mode_pct": mode_pct,
"imbalance": imbalance,
"n_singletons": n_singletons,
"n_singletons_partial": n_singletons_partial,
"len_min": len_min,
"len_mean": len_mean,
"len_max": len_max,
"id_like": id_like,
"dominated": dominated,
}
@@ -0,0 +1,216 @@
"""Tests para categorical_cardinality_block."""
import sys
import os
from math import log2
sys.path.insert(0, os.path.dirname(__file__))
from categorical_cardinality_block import categorical_cardinality_block
# Output contract: every call returns exactly these 16 keys.
EXPECTED_KEYS = {
"n_distinct",
"n_rows",
"pct_distinct",
"entropy",
"entropy_max",
"entropy_norm",
"mode",
"mode_pct",
"imbalance",
"n_singletons",
"n_singletons_partial",
"len_min",
"len_mean",
"len_max",
"id_like",
"dominated",
}
def _sample_cat():
"""A realistic summarize_categorical output for one column."""
return {
"top": [
{"value": "a", "count": 5, "pct": 0.5},
{"value": "b", "count": 3, "pct": 0.3},
{"value": "c", "count": 1, "pct": 0.1},
{"value": "d", "count": 1, "pct": 0.1},
],
"mode": "a",
"mode_pct": 0.5,
"n_distinct": 4,
"entropy": 1.685, # <= log2(4) = 2.0
"imbalance": 5.0,
"len_min": 1,
"len_mean": 1.0,
"len_max": 1,
}
def test_normal_case():
"""Caso normal: pct_distinct, entropy_max=log2(n_distinct), entropy_norm in [0,1], n_singletons."""
cat = _sample_cat()
result = categorical_cardinality_block(cat, n_rows=10)
assert set(result.keys()) == EXPECTED_KEYS
# passthroughs
assert result["n_distinct"] == 4
assert result["n_rows"] == 10
assert result["entropy"] == 1.685
assert result["imbalance"] == 5.0
assert result["mode"] == "a"
assert result["mode_pct"] == 0.5 # passthrough, not normalized
assert result["len_min"] == 1
assert result["len_max"] == 1
# pct_distinct = 4 / 10 * 100
assert abs(result["pct_distinct"] - 40.0) < 1e-12
# entropy_max = log2(4) = 2.0
assert abs(result["entropy_max"] - log2(4)) < 1e-12
assert abs(result["entropy_max"] - 2.0) < 1e-12
# entropy_norm = 1.685 / 2.0 = 0.8425, within [0, 1]
assert abs(result["entropy_norm"] - 1.685 / 2.0) < 1e-12
assert 0.0 <= result["entropy_norm"] <= 1.0
# singletons: c and d have count == 1
assert result["n_singletons"] == 2
# top covers all distinct values (4 == 4)
assert result["n_singletons_partial"] is False
# neither id-like (40%) nor dominated (mode_pct 0.5)
assert result["id_like"] is False
assert result["dominated"] is False
def test_empty_cat_does_not_raise():
"""Caso cat={}: no lanza, claves derivadas None y flags False."""
result = categorical_cardinality_block({}, n_rows=100)
assert set(result.keys()) == EXPECTED_KEYS
for key in (
"n_distinct",
"pct_distinct",
"entropy",
"entropy_max",
"entropy_norm",
"mode",
"mode_pct",
"imbalance",
"n_singletons",
"len_min",
"len_mean",
"len_max",
):
assert result[key] is None
assert result["n_singletons_partial"] is False
assert result["id_like"] is False
assert result["dominated"] is False
# n_rows is a passthrough of the argument, still coherent.
assert result["n_rows"] == 100
def test_none_cat_does_not_raise():
"""Caso cat=None: tratado como {}, mismas garantias que el dict vacio."""
result = categorical_cardinality_block(None, n_rows=None)
assert set(result.keys()) == EXPECTED_KEYS
assert result["n_distinct"] is None
assert result["pct_distinct"] is None
assert result["entropy_max"] is None
assert result["entropy_norm"] is None
assert result["id_like"] is False
assert result["dominated"] is False
def test_n_rows_zero_no_zero_division():
"""Caso n_rows=0: pct_distinct None sin ZeroDivisionError."""
cat = _sample_cat()
result = categorical_cardinality_block(cat, n_rows=0)
assert result["pct_distinct"] is None
# n_distinct still passes through.
assert result["n_distinct"] == 4
assert result["id_like"] is False
def test_id_like_when_distinct_near_rows():
"""id_like True cuando n_distinct ~ n_rows (pct_distinct >= 99)."""
cat = {"n_distinct": 99, "entropy": 6.6, "top": [], "mode": None}
result = categorical_cardinality_block(cat, n_rows=100)
assert abs(result["pct_distinct"] - 99.0) < 1e-12
assert result["id_like"] is True
# exact identity column: 100 / 100 = 100%
cat_full = {"n_distinct": 100, "top": []}
result_full = categorical_cardinality_block(cat_full, n_rows=100)
assert result_full["id_like"] is True
def test_dominated_when_mode_pct_high():
"""dominated True cuando mode_pct alto (>= 90)."""
cat = {
"n_distinct": 3,
"entropy": 0.3,
"mode": "x",
"mode_pct": 95.0,
"top": [
{"value": "x", "count": 95, "pct": 0.95},
{"value": "y", "count": 3, "pct": 0.03},
{"value": "z", "count": 2, "pct": 0.02},
],
"imbalance": 47.5,
}
result = categorical_cardinality_block(cat, n_rows=100)
assert result["mode_pct"] == 95.0
assert result["dominated"] is True
def test_mode_pct_fallback_from_top_fraction():
"""Sin mode_pct: deriva del pct del primer top, fraccion <=1 escala a 0-100."""
cat = {
"n_distinct": 3,
"top": [
{"value": "x", "count": 95, "pct": 0.95},
{"value": "y", "count": 5, "pct": 0.05},
],
}
result = categorical_cardinality_block(cat, n_rows=100)
# 0.95 (fraction) -> 95.0 (percentage)
assert abs(result["mode_pct"] - 95.0) < 1e-12
assert result["dominated"] is True
def test_n_singletons_partial_when_top_truncated():
"""n_distinct > len(top): n_singletons cubre solo el top visible, partial True."""
cat = {
"n_distinct": 10,
"top": [
{"value": "a", "count": 4, "pct": 0.4},
{"value": "b", "count": 1, "pct": 0.1},
{"value": "c", "count": 1, "pct": 0.1},
],
"entropy": 2.5,
}
result = categorical_cardinality_block(cat, n_rows=12)
assert result["n_singletons"] == 2 # only b, c visible
assert result["n_singletons_partial"] is True
def test_single_distinct_value_entropy_norm_none():
"""n_distinct=1: entropy_max=0.0 -> entropy_norm None (no division by zero)."""
cat = {
"n_distinct": 1,
"entropy": 0.0,
"mode": "only",
"mode_pct": 1.0,
"top": [{"value": "only", "count": 7, "pct": 1.0}],
"imbalance": 1.0,
}
result = categorical_cardinality_block(cat, n_rows=7)
assert result["entropy_max"] == 0.0
assert result["entropy_norm"] is None
assert result["n_singletons"] == 0
@@ -0,0 +1,108 @@
---
id: categorical_top_pie_figure_py_datascience
name: categorical_top_pie_figure
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def categorical_top_pie_figure(top: list, n_distinct: int = 0, title: str = \"\", top_k: int = 6, n_rows=None) -> \"matplotlib.figure.Figure\""
description: "Construye una figura matplotlib tipo donut (pie con agujero central) de las top_k categorías más frecuentes de una columna categórica, agregando el resto en un sector gris \"Otros (N categorías)\". Consume el bloque `top` de summarize_categorical y devuelve un matplotlib.figure.Figure listo para rasterizar por el renderer del informe EDA. Backend Agg sin pyplot global; defensivo ante top vacío/None."
tags: [eda, categorical, pie, donut, matplotlib, figure, visualization, datascience, impure]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [matplotlib]
example: |
from categorical_top_pie_figure import categorical_top_pie_figure
top = [
{"value": "rojo", "count": 40, "pct": 0.4},
{"value": "azul", "count": 30, "pct": 0.3},
{"value": "verde", "count": 20, "pct": 0.2},
]
fig = categorical_top_pie_figure(top, n_distinct=12, title="color", top_k=6, n_rows=100)
tested: true
tests:
- "test_returns_figure"
- "test_ten_items_topk_six_yields_seven_wedges"
- "test_empty_top_does_not_raise_and_returns_figure"
- "test_long_value_truncated_in_legend"
- "test_none_value_and_none_count_are_handled"
- "test_n_rows_adds_exact_others_slice"
test_file_path: "python/functions/datascience/categorical_top_pie_figure_test.py"
file_path: "python/functions/datascience/categorical_top_pie_figure.py"
params:
- name: top
desc: "Lista de dicts {value, count, pct} ordenada de mayor a menor por count (salida del bloque `top` de summarize_categorical). Puede venir vacía o con dicts incompletos: items no-dict, sin count, con count None o count <= 0 se descartan. value None se admite (sin etiqueta)."
- name: n_distinct
desc: "Nº total de categorías distintas de la columna. Etiqueta el sector agregado como \"Otros (n_distinct - top_k)\" (mínimo 0). Si no supera el nº de sectores mostrados, se usa el overflow real de `top` como nº de categorías agregadas. Default 0."
- name: title
desc: "Título de la figura (nombre de la columna). Se trunca a ~48 chars con elipsis si es muy largo. Default \"\" (sin título)."
- name: top_k
desc: "Nº máximo de sectores explícitos. Default 6. El sector \"Otros\" no cuenta contra este límite. Con top_k <= 0 se muestra al menos la categoría mayor."
- name: n_rows
desc: "Opcional. Total de filas del dataset. Si se da y la suma de counts mostrados < n_rows, el sector \"Otros\" usa (n_rows - suma_mostrada) como count para que los ángulos sean exactos respecto al total real. Si se omite, \"Otros\" usa la suma de counts fuera del top_k mostrado (solo cuando top trae más de top_k items). Default None."
output: "Un matplotlib.figure.Figure (figsize 6.4x4.0, dpi 150) con un Axes donut (wedgeprops width 0.42) más una leyenda lateral con value truncado a 20 chars + count; el sector \"Otros\" en gris. Anotación central con el total n. Si no hay counts válidos, devuelve igualmente una Figure con un texto centrado \"sin datos categóricos\" (nunca lanza). El caller rasteriza/cierra la figura; la función no la muestra ni la guarda."
---
## Ejemplo
```python
from categorical_top_pie_figure import categorical_top_pie_figure
# `top` es la salida del bloque "top" de summarize_categorical (ya ordenado desc).
top = [
{"value": "rojo", "count": 40, "pct": 0.40},
{"value": "azul", "count": 30, "pct": 0.30},
{"value": "verde", "count": 20, "pct": 0.20},
{"value": "amarillo", "count": 5, "pct": 0.05},
]
fig = categorical_top_pie_figure(
top,
n_distinct=12, # 12 categorías distintas en total
title="color_producto",
top_k=6, # hasta 6 sectores explícitos
n_rows=100, # "Otros" = 100 - 95 = 5, sobre 8 categorías agregadas
)
# El renderer del informe lo rasteriza; aquí solo persistimos para inspección.
fig.savefig("/tmp/donut_color.png")
```
## Cuando usarla
Úsala dentro de un informe EDA cuando quieras visualizar la composición de una
columna categórica de un vistazo: cuántas filas caen en las categorías
dominantes frente a la cola larga. Pásale directamente el bloque `top` de
`summarize_categorical` (ya ordenado de mayor a menor) más `n_distinct` para que
el sector "Otros" indique cuántas categorías quedan agrupadas. Es la pareja
"composición" del gráfico de barras top-k: el donut comunica proporciones del
total, las barras comunican magnitudes comparables.
## Gotchas
- **Impura por matplotlib.** Toca la maquinaria de render. Usa el backend `Agg`
y la API orientada a objetos `Figure`/`add_subplot` — NUNCA `pyplot.*` aquí,
para no tocar el estado global ni filtrar figuras entre llamadas. `pyplot` NO
es thread-safe; esta función evita ese riesgo construyendo el `Figure`
directamente, así que es segura de llamar en bucle desde el renderer.
- **El caller cierra la figura.** La función devuelve el `Figure` pero no lo
muestra ni lo guarda. Quien la consume debe rasterizarla y luego liberarla
(`fig.clf()` / `matplotlib.pyplot.close(fig)` si se usó pyplot en el caller)
para no acumular memoria en lotes grandes de columnas.
- **Pie engaña con muchos sectores.** Por eso `top_k` por defecto es 6 y el
resto se agrega en "Otros": donuts con 15+ sectores son ilegibles. Para
cardinalidad muy alta el donut solo muestra la cabeza de la distribución; la
cola vive en el sector gris.
- **Ángulos exactos solo con `n_rows`.** Sin `n_rows`, el sector "Otros" se
calcula con el overflow presente en `top`; si `top` ya viene recortado a
`top_k` por el productor, no habrá "Otros" aunque existan más categorías. Pasa
`n_rows` (total de filas del dataset) para ángulos correctos respecto al total
real.
- **Defensiva, nunca lanza.** `top=[]`, `value=None`, `count=None` o counts no
numéricos se manejan sin error: en el peor caso devuelve una `Figure` con
"sin datos categóricos". No envuelvas la llamada en try/except por miedo a un
raise — no lo hay.
@@ -0,0 +1,230 @@
"""Impure EDA helper: donut figure of the most common categories (`eda` group).
Builds a matplotlib donut (pie with a central hole) of the ``top_k`` most
frequent categories of a categorical column, folding everything else into a
single "Otros (N categorías)" slice. Returns a ready-to-rasterize
``matplotlib.figure.Figure``; it never shows nor saves it.
Impure because it touches matplotlib's rendering machinery. It uses the headless
Agg backend and the object-oriented ``Figure`` API (no ``pyplot``) so it leaks no
global state and is safe to call repeatedly from a report renderer.
"""
import matplotlib
matplotlib.use("Agg")
from matplotlib.figure import Figure # noqa: E402
# Gray reserved for the aggregated "Otros" slice.
_OTHER_COLOR = "#9e9e9e"
# Muted gray for secondary text (title fallback, center annotation, no-data).
_MUTED_TEXT = "#5f6b7a"
# Pleasant, colour-blind-friendly qualitative palette for the explicit slices.
_PALETTE = [
"#4C72B0",
"#DD8452",
"#55A868",
"#C44E52",
"#8172B3",
"#937860",
"#DA8BC3",
"#8C8C8C",
"#CCB974",
"#64B5CD",
]
def _truncate(text, width: int = 20) -> str:
"""Truncate ``text`` to ``width`` chars, appending an ellipsis if cut."""
s = "" if text is None else str(text)
if len(s) <= width:
return s
if width <= 1:
return s[:width]
return s[: width - 1] + ""
def categorical_top_pie_figure(
top: list,
n_distinct: int = 0,
title: str = "",
top_k: int = 6,
n_rows=None,
) -> "matplotlib.figure.Figure":
"""Build a donut figure of the most common categories of a column.
Renders the ``top_k`` most frequent categories as explicit donut slices and
aggregates every remaining category into a single gray "Otros (N
categorías)" slice. Category names are not painted on the wedges; they are
listed in a lateral legend (truncated value + count) to avoid overlap on
narrow (mobile) figures.
The function is fully defensive: empty input, missing/``None`` values or
counts never raise. When there is nothing valid to draw it still returns a
``Figure`` carrying a centered "sin datos categóricos" message.
Args:
top: List of ``{value, count, pct}`` dicts, already sorted by ``count``
descending (the ``top`` block of ``summarize_categorical``). May be
empty or carry incomplete/``None`` entries; non-dict items, items
without a positive numeric ``count`` and ``None`` counts are skipped.
n_distinct: Total number of distinct categories in the column. Used to
label the aggregated slice as "Otros (n_distinct - top_k)" (floored
at 0). Ignored when it does not exceed the number of shown slices.
title: Figure title (the column name). Truncated when too long.
top_k: Maximum number of explicit slices. Default 6. The "Otros" slice
does not count against this limit.
n_rows: Optional total row count of the dataset. When given and the sum
of shown counts is below ``n_rows``, the "Otros" slice uses
``n_rows - sum_shown`` as its count so the wedge angles are exact
with respect to the real total. When omitted, "Otros" uses the sum
of the counts that fall outside the shown ``top_k`` (only when
``top`` carries more than ``top_k`` items).
Returns:
A ``matplotlib.figure.Figure`` with a single donut Axes plus a lateral
legend. The caller is responsible for rasterizing/closing it.
"""
fig = Figure(figsize=(6.4, 4.0), dpi=150)
ax = fig.add_subplot(111)
safe_title = _truncate(title, 48)
# --- Defensive parse: keep only well-formed {value, count} with count > 0.
cleaned = []
if isinstance(top, list):
for item in top:
if not isinstance(item, dict):
continue
count = item.get("count")
if count is None:
continue
try:
count = float(count)
except (TypeError, ValueError):
continue
if count <= 0:
continue
cleaned.append((item.get("value"), count))
if not cleaned:
ax.axis("off")
ax.text(
0.5,
0.5,
"sin datos categóricos",
ha="center",
va="center",
fontsize=12,
color=_MUTED_TEXT,
transform=ax.transAxes,
)
if safe_title:
ax.set_title(safe_title, fontsize=12, loc="center", pad=8)
fig.tight_layout()
return fig
# --- Split into shown slices and the aggregated remainder.
shown = cleaned[: max(int(top_k), 0)]
if not shown: # top_k <= 0 — show at least the largest category.
shown = cleaned[:1]
sum_shown = sum(c for _, c in shown)
overflow_count = sum(c for _, c in cleaned[len(shown):])
# How many categories are folded into "Otros".
try:
nd = int(n_distinct)
except (TypeError, ValueError):
nd = 0
others_categories = max(nd - len(shown), 0)
# If n_distinct is unknown/too small, fall back to the overflow we actually
# have in `top` beyond the shown slices.
overflow_items = len(cleaned) - len(shown)
if others_categories == 0 and overflow_items > 0:
others_categories = overflow_items
# Count attributed to the "Otros" slice for exact angles.
others_count = 0.0
if n_rows is not None:
try:
total_rows = float(n_rows)
except (TypeError, ValueError):
total_rows = None
if total_rows is not None and total_rows > sum_shown:
others_count = total_rows - sum_shown
if others_count <= 0:
others_count = overflow_count
labels = [v for v, _ in shown]
values = [c for _, c in shown]
colors = [_PALETTE[i % len(_PALETTE)] for i in range(len(shown))]
has_others = others_count > 0 and others_categories > 0
if has_others:
values.append(others_count)
labels.append("Otros")
colors.append(_OTHER_COLOR)
total = sum(values)
def _autopct(pct: float) -> str:
# Hide tiny labels to avoid crowding the wedges.
return f"{pct:.0f}%" if pct >= 5 else ""
wedges, _texts, autotexts = ax.pie(
values,
colors=colors,
startangle=90,
counterclock=False,
wedgeprops={"width": 0.42, "edgecolor": "white", "linewidth": 1.0},
autopct=_autopct,
pctdistance=0.79,
textprops={"fontsize": 8},
)
for at in autotexts:
at.set_color("white")
at.set_fontweight("bold")
ax.set_aspect("equal")
# --- Lateral legend: truncated value + count (+ "(N categorías)" for Otros).
legend_labels = []
for idx, (lab, val) in enumerate(zip(labels, values)):
if has_others and idx == len(labels) - 1:
legend_labels.append(
f"Otros ({others_categories} categorías) — {int(round(val))}"
)
else:
legend_labels.append(f"{_truncate(lab, 20)}{int(round(val))}")
ax.legend(
wedges,
legend_labels,
title="Categorías",
loc="center left",
bbox_to_anchor=(1.02, 0.5),
fontsize=8,
title_fontsize=9,
frameon=False,
)
if safe_title:
ax.set_title(safe_title, fontsize=13, loc="left", pad=10)
# Center annotation: total count covered by the donut.
ax.text(
0,
0,
f"n={int(round(total))}",
ha="center",
va="center",
fontsize=11,
color=_MUTED_TEXT,
fontweight="bold",
)
# Leave room on the right for the legend (avoid clipping it).
fig.subplots_adjust(left=0.02, right=0.62, top=0.88, bottom=0.06)
return fig
@@ -0,0 +1,104 @@
"""Tests para categorical_top_pie_figure (donut de categorías top, grupo eda).
Usa el backend Agg sin pyplot; no muestra ni guarda figuras. Cada test cierra
explícitamente la Figure construida (matplotlib.pyplot.close) para no acumular
estado entre tests.
"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt # noqa: E402
from matplotlib.figure import Figure # noqa: E402
from categorical_top_pie_figure import categorical_top_pie_figure
def _make_top(n):
"""n items {value, count, pct} ordenados desc por count."""
return [
{"value": f"cat_{i}", "count": n - i, "pct": (n - i) / sum(range(1, n + 1))}
for i in range(n)
]
def _wedges(ax):
"""Devuelve los wedges (sectores) de un Axes con un pie."""
from matplotlib.patches import Wedge
return [p for p in ax.patches if isinstance(p, Wedge)]
def test_returns_figure():
fig = categorical_top_pie_figure(_make_top(3), n_distinct=3, title="col")
assert isinstance(fig, Figure)
plt.close(fig)
def test_ten_items_topk_six_yields_seven_wedges():
top = _make_top(10)
fig = categorical_top_pie_figure(top, n_distinct=10, title="muchas", top_k=6)
ax = fig.axes[0]
wedges = _wedges(ax)
# 6 categorías explícitas + 1 sector "Otros".
assert len(wedges) == 7
plt.close(fig)
def test_empty_top_does_not_raise_and_returns_figure():
fig = categorical_top_pie_figure([], n_distinct=0, title="vacía")
assert isinstance(fig, Figure)
# Sin datos: no debe haber sectores de pie.
assert len(_wedges(fig.axes[0])) == 0
plt.close(fig)
def test_long_value_truncated_in_legend():
long_value = "una_categoria_con_un_nombre_larguisimo_que_excede_el_limite"
top = [
{"value": long_value, "count": 10, "pct": 0.5},
{"value": "corta", "count": 10, "pct": 0.5},
]
fig = categorical_top_pie_figure(top, n_distinct=2, title="col", top_k=6)
ax = fig.axes[0]
legend = ax.get_legend()
assert legend is not None
texts = [t.get_text() for t in legend.get_texts()]
# El valor largo aparece truncado con elipsis y NO en su forma completa.
assert any("" in t for t in texts)
assert long_value not in " ".join(texts)
plt.close(fig)
def test_none_value_and_none_count_are_handled():
top = [
{"value": None, "count": 5, "pct": 0.5},
{"value": "b", "count": None, "pct": 0.0}, # count None -> se descarta
{"value": "c", "count": 5, "pct": 0.5},
]
fig = categorical_top_pie_figure(top, n_distinct=2, title="con nones", top_k=6)
assert isinstance(fig, Figure)
# Solo 2 items válidos, sin overflow -> 2 wedges, sin "Otros".
assert len(_wedges(fig.axes[0])) == 2
plt.close(fig)
def test_n_rows_adds_exact_others_slice():
# 3 categorías mostradas suman 30, dataset real 100 -> "Otros" = 70.
top = _make_top(3) # counts 3,2,1 -> reescalamos abajo
top = [
{"value": "a", "count": 15, "pct": 0.15},
{"value": "b", "count": 10, "pct": 0.10},
{"value": "c", "count": 5, "pct": 0.05},
]
fig = categorical_top_pie_figure(
top, n_distinct=20, title="col", top_k=3, n_rows=100
)
ax = fig.axes[0]
# 3 explícitas + Otros.
assert len(_wedges(ax)) == 4
legend_texts = [t.get_text() for t in ax.get_legend().get_texts()]
# El sector Otros refleja n_distinct - top_k = 17 categorías y count 70.
assert any("Otros (17 categorías)" in t and "70" in t for t in legend_texts)
plt.close(fig)