"""Aggregation chapter (AGREGACION) — group analysis / OLAP of the EDA. This chapter is the group-by / pivot ("OLAP") section of an AutomaticEDA report and is meant to be present **whenever the dataset has at least one low-cardinality categorical column to group by**. For the most interesting categoricals (chosen by their cardinality/relevance, optionally with an LLM) it renders, as blocks the core paginator never cuts: 1. **Per-group statistics** (split-apply-combine) — for each interesting categorical key, the count of rows per group and, for each numeric measure, its mean/median/std/min/max. One compact summary table (mean of every measure per group) plus a per-measure detail table. 2. **Bar charts** — a vertical bar chart of a measure's mean per group, bars from zero (Tufte Lie-Factor = 1). 3. **Pivot tables** — categorical A x categorical B -> aggregate of a measure, limited to the top rows/cols so it fits a mobile page/slide, with a grouped bar chart of the same pivot. The raw data needed to aggregate is **not** in the TableProfile, so — exactly like ``modelos`` reads its cluster projection from ``ctx`` — this chapter gets the aggregation results in one of two ways and degrades honestly when neither is available: ctx keys this chapter consumes (all optional): aggregations : dict — pre-computed results, used directly (offline / tests / forward-compatible with a calculation phase). Shape:: {"groupby": [{"group_by": str, "measures": [str], "why": str, "result": }], "pivots": [{"index": str, "columns": str, "value": str, "agg": str, "why": str, "result": }]} db_path, table : str — when ``aggregations`` is absent, the chapter selects the interesting keys (``select_groupby_keys``), optionally asks an LLM which to show (``suggest_aggregations_llm`` when ``run_agg_llm`` is True) and computes the group-by/pivot results live via the push-down registry functions ``groupby_stats_duckdb`` / ``pivot_table_duckdb``. 
run_agg_llm : bool — when True (and ``db_path``/``table`` present), let the LLM pick the interesting aggregations; otherwise the deterministic quantitative selection is used. agg_llm_model : str — model id for the optional LLM selection. agg_max_keys, agg_max_card, agg_max_measures, agg_top_n : int — limits. agg_insights : list — optional pre-computed micro-analysis entries (``[{"title": str, "text": str}]``) rendered as an interpretation section. Contract: build_(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z". Reads everything defensively (``.get``) and never raises: anything missing degrades to a note instead of aborting the chapter; the chapter returns ``None`` only when the dataset has no categorical column to group by. """ from __future__ import annotations from .. import model # Pure/impure registry functions (group ``eda``) this chapter composes. Imported # defensively so the chapter still builds (degrading the affected part to a note) # if a function is somehow unavailable / not indexed yet. try: from datascience.select_groupby_keys import select_groupby_keys except Exception: # noqa: BLE001 — keep the chapter importable no matter what. select_groupby_keys = None # type: ignore[assignment] try: from datascience.groupby_stats_duckdb import groupby_stats_duckdb except Exception: # noqa: BLE001 groupby_stats_duckdb = None # type: ignore[assignment] try: from datascience.pivot_table_duckdb import pivot_table_duckdb except Exception: # noqa: BLE001 pivot_table_duckdb = None # type: ignore[assignment] try: from datascience.suggest_aggregations_llm import suggest_aggregations_llm except Exception: # noqa: BLE001 suggest_aggregations_llm = None # type: ignore[assignment] CHAPTER_VERSION = "1.0.0" CHAPTER_ID = "agregacion" CHAPTER_TITLE = "Agregación por grupos" # Tableau-10 palette — stable colours for the pivot's grouped-bar series. 
_SERIES_COLORS = [ "#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f", "#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac", ] # Defaults for the live selection/aggregation (overridable via ctx). _DEF_MAX_KEYS = 3 _DEF_MAX_CARD = 20 _DEF_MAX_MEASURES = 4 _DEF_TOP_N = 12 # --------------------------------------------------------------------------- # # Formatting helpers (mirror the other chapters' defensive style). # --------------------------------------------------------------------------- # def _fmt_num(value, decimals: int = 3) -> str: if value is None: return "—" if isinstance(value, bool): return "sí" if value else "no" if isinstance(value, int): return f"{value:,}".replace(",", ".") if isinstance(value, float): if value != value: # NaN return "NaN" if value in (float("inf"), float("-inf")): return str(value) text = f"{value:.{decimals}f}".rstrip("0").rstrip(".") return text if text else "0" return model._safe_str(value) def _is_dict(v) -> bool: return isinstance(v, dict) def _measure_mean(group: dict, measure: str): """Pull the mean of one measure out of a groupby-result group entry.""" stats = group.get("stats") if _is_dict(group.get("stats")) else {} ms = stats.get(measure) if _is_dict(stats.get(measure)) else {} return ms.get("mean") # --------------------------------------------------------------------------- # # Plan + data resolution. Either a pre-computed ctx['aggregations'] is used # verbatim, or the plan is selected and the results are computed live. 
# --------------------------------------------------------------------------- # def _resolve_candidates(profile: dict, ctx: dict) -> dict: """Return {group_keys, measures, pivots, note} of interesting columns.""" pre = ctx.get("agg_candidates") if _is_dict(pre) and pre.get("group_keys") is not None: return pre if select_groupby_keys is not None: try: out = select_groupby_keys( profile, max_keys=int(ctx.get("agg_max_keys", _DEF_MAX_KEYS)), max_card=int(ctx.get("agg_max_card", _DEF_MAX_CARD)), max_measures=int(ctx.get("agg_max_measures", _DEF_MAX_MEASURES)), ) if _is_dict(out): return out except Exception: # noqa: BLE001 — fall through to the inline fallback. pass return _inline_candidates(profile, ctx) def _inline_candidates(profile: dict, ctx: dict) -> dict: """Minimal defensive selection when select_groupby_keys is unavailable.""" max_card = int(ctx.get("agg_max_card", _DEF_MAX_CARD)) max_keys = int(ctx.get("agg_max_keys", _DEF_MAX_KEYS)) max_measures = int(ctx.get("agg_max_measures", _DEF_MAX_MEASURES)) keys = profile.get("key_candidates") or [] group_keys, measures = [], [] for col in profile.get("columns") or []: if not _is_dict(col): continue name = col.get("name") it = col.get("inferred_type") flags = col.get("flags") or [] dc = col.get("distinct_count") if it in ("categorical", "boolean") and name not in keys: if ("possible_id" not in flags and "high_cardinality" not in flags and "constant" not in flags and isinstance(dc, int) and 2 <= dc <= max_card): group_keys.append({"col": name, "cardinality": dc, "score": 0.0}) elif it == "numeric": num = col.get("numeric") or {} if num.get("std") not in (None, 0) and not ( "possible_id" in flags and (col.get("unique_pct") or 0) >= 0.99): measures.append(name) group_keys = group_keys[:max_keys] measures = measures[:max_measures] pivots = [] if len(group_keys) >= 2: pivots.append({"index": group_keys[0]["col"], "columns": group_keys[1]["col"], "value": measures[0] if measures else None}) return {"group_keys": 
group_keys, "measures": measures, "pivots": pivots, "note": "selección cuantitativa básica"} def _resolve_plan(profile: dict, ctx: dict, candidates: dict) -> dict: """Return {aggregations:[{group_by,measures,why}], pivots:[...], source}.""" group_keys = candidates.get("group_keys") or [] measures = candidates.get("measures") or [] if ctx.get("run_agg_llm") and suggest_aggregations_llm is not None: try: plan = suggest_aggregations_llm( profile, candidates, max_aggs=int(ctx.get("agg_max_keys", _DEF_MAX_KEYS)), model=ctx.get("agg_llm_model", "claude-haiku-4-5-20251001")) if _is_dict(plan) and plan.get("aggregations"): return {"aggregations": plan.get("aggregations") or [], "pivots": plan.get("pivots") or [], "source": plan.get("source", "llm")} except Exception: # noqa: BLE001 — fall back to the quantitative plan. pass aggregations = [{ "group_by": gk.get("col"), "measures": measures, "why": f"categórica de {_fmt_num(gk.get('cardinality'))} niveles", } for gk in group_keys if _is_dict(gk) and gk.get("col")] pivots = [] for pv in candidates.get("pivots") or []: if _is_dict(pv) and pv.get("index") and pv.get("columns"): pivots.append({"index": pv.get("index"), "columns": pv.get("columns"), "value": pv.get("value") or (measures[0] if measures else None), "agg": "mean", "why": "cruce de dos categóricas"}) return {"aggregations": aggregations, "pivots": pivots, "source": "quantitative"} def _live_groupby(ctx: dict, group_by: str, measures: list, top_n: int): """Compute one group-by result live via the push-down registry function.""" db_path = ctx.get("db_path") table = ctx.get("table") if not db_path or not table or groupby_stats_duckdb is None: return None try: out = groupby_stats_duckdb(db_path, table, group_by, list(measures or []), top_n=top_n) if _is_dict(out) and out.get("status") == "ok": return out except Exception: # noqa: BLE001 return None return None def _live_pivot(ctx: dict, index: str, columns: str, value, agg: str): """Compute one pivot live via the 
push-down registry function.""" db_path = ctx.get("db_path") table = ctx.get("table") if not db_path or not table or pivot_table_duckdb is None or not value: return None try: out = pivot_table_duckdb(db_path, table, index, columns, value, agg=agg or "mean") if _is_dict(out) and out.get("status") == "ok": return out except Exception: # noqa: BLE001 return None return None # --------------------------------------------------------------------------- # # Figure builders (lazy: matplotlib only imported when the renderer draws them). # --------------------------------------------------------------------------- # def _make_group_bars(group_by: str, measure: str, groups: list): """Vertical bars: mean of ``measure`` per group, bars from zero.""" labels, values = [], [] for g in groups: if not _is_dict(g): continue mean = _measure_mean(g, measure) if mean is None: continue labels.append(model._safe_str(g.get("key"))) values.append(float(mean)) if not labels: return None def _draw(): import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt fig, ax = plt.subplots(figsize=(6.6, 3.6)) xs = list(range(len(labels))) ax.bar(xs, values, color="#4e79a7", alpha=0.9, edgecolor="#2f4d6e", linewidth=0.4) ax.set_xticks(xs) short = [(s[:18] + "…") if len(s) > 19 else s for s in labels] rot = 30 if max((len(s) for s in short), default=0) > 6 else 0 ax.set_xticklabels(short, rotation=rot, ha="right" if rot else "center", fontsize=7) ax.set_ylabel(f"media de {measure}", fontsize=8) ax.set_xlabel(group_by, fontsize=8) ax.set_title(f"Media de «{measure}» por «{group_by}»", fontsize=10) ax.grid(axis="y", color="#dddddd", linewidth=0.6) for spine in ("top", "right"): ax.spines[spine].set_visible(False) # Value labels above each bar. 
vmax = max(values) if values else 0 for x, v in zip(xs, values): ax.text(x, v + (abs(vmax) * 0.01 if vmax else 0.01), _fmt_num(v, 2), ha="center", va="bottom", fontsize=6.5) fig.tight_layout() return fig return _draw def _make_pivot_bars(pivot: dict): """Grouped bars of a pivot: x = row_labels, one series per col_label.""" row_labels = pivot.get("row_labels") or [] col_labels = pivot.get("col_labels") or [] matrix = pivot.get("matrix") or [] if not row_labels or not col_labels or not matrix: return None def _draw(): import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt n_rows = len(row_labels) n_cols = len(col_labels) fig, ax = plt.subplots(figsize=(6.8, 3.8)) total_w = 0.8 bar_w = total_w / max(n_cols, 1) base = list(range(n_rows)) for j, clabel in enumerate(col_labels): offs = [b - total_w / 2 + bar_w * (j + 0.5) for b in base] vals = [] for i in range(n_rows): cell = matrix[i][j] if (i < len(matrix) and j < len(matrix[i])) else None vals.append(float(cell) if isinstance(cell, (int, float)) else 0.0) color = _SERIES_COLORS[j % len(_SERIES_COLORS)] ax.bar(offs, vals, width=bar_w, color=color, alpha=0.9, label=model._safe_str(clabel)) ax.set_xticks(base) short = [(s[:16] + "…") if len(s) > 17 else s for s in (model._safe_str(r) for r in row_labels)] rot = 30 if max((len(s) for s in short), default=0) > 6 else 0 ax.set_xticklabels(short, rotation=rot, ha="right" if rot else "center", fontsize=7) ax.set_xlabel(model._safe_str(pivot.get("index")), fontsize=8) ax.set_ylabel(f"{pivot.get('agg','mean')} de {pivot.get('value')}", fontsize=8) ax.set_title(f"{pivot.get('index')} × {pivot.get('columns')}", fontsize=10) ax.grid(axis="y", color="#dddddd", linewidth=0.6) ax.legend(title=model._safe_str(pivot.get("columns")), fontsize=6.5, title_fontsize=7, frameon=True, framealpha=0.9, loc="best") for spine in ("top", "right"): ax.spines[spine].set_visible(False) fig.tight_layout() return fig return _draw def _group_bars_maker(group_by: str, measure: str, 
groups: list): """Bind per-aggregation args so the lazy closure is loop-safe.""" def _make(): return _make_group_bars(group_by, measure, groups)() return _make def _pivot_bars_maker(pivot: dict): def _make(): return _make_pivot_bars(pivot)() return _make # --------------------------------------------------------------------------- # # Section builders. Each returns a list of blocks (possibly empty). # --------------------------------------------------------------------------- # def _groupby_section(group_by: str, measures: list, result: dict, why: str) -> list: """Build the blocks for one group-by aggregation, or [] if unusable.""" if not _is_dict(result) or not result.get("groups"): return [] groups = [g for g in result.get("groups") or [] if _is_dict(g)] if not groups: return [] eff_measures = result.get("measures") or measures or [] blocks = [model.Heading(text=f"Agrupado por «{group_by}»", level=2)] intro = f"**{why}.** " if why else "" intro += (f"{_fmt_num(result.get('n_groups') or len(groups))} grupos" f"{' (top por tamaño)' if result.get('truncated') else ''}.") blocks.append(model.Markdown(text=intro)) # Summary table: one row per group, count + mean of every measure. header = ["Grupo", "n"] + [f"{m} (media)" for m in eff_measures] rows = [] for g in groups: row = [model._safe_str(g.get("key")), _fmt_num(g.get("n"))] for m in eff_measures: row.append(_fmt_num(_measure_mean(g, m), 2)) rows.append(row) blocks.append(model.DataTable( header=header, rows=rows, title=f"Resumen por «{group_by}»", note="Conteo de filas y media de cada medida por grupo.")) if not eff_measures: return blocks # Primary measure: a bar chart + a detail table (mean/median/std/min/max). 
primary = eff_measures[0] bars = _make_group_bars(group_by, primary, groups) if bars is not None: blocks.append(model.Figure( make=_group_bars_maker(group_by, primary, groups), caption=f"Media de «{primary}» por «{group_by}» (barras desde cero).")) det_header = ["Grupo", "n", "media", "mediana", "σ", "mín", "máx"] det_rows = [] for g in groups: stats = g.get("stats") if _is_dict(g.get("stats")) else {} ms = stats.get(primary) if _is_dict(stats.get(primary)) else {} det_rows.append([ model._safe_str(g.get("key")), _fmt_num(g.get("n")), _fmt_num(ms.get("mean"), 2), _fmt_num(ms.get("median"), 2), _fmt_num(ms.get("std"), 2), _fmt_num(ms.get("min"), 2), _fmt_num(ms.get("max"), 2), ]) blocks.append(model.DataTable( header=det_header, rows=det_rows, title=f"Detalle de «{primary}» por «{group_by}»")) return blocks def _pivot_section(pivot_spec: dict, result: dict) -> list: """Build the blocks for one pivot table, or [] if unusable.""" if not _is_dict(result) or not result.get("row_labels"): return [] row_labels = result.get("row_labels") or [] col_labels = result.get("col_labels") or [] matrix = result.get("matrix") or [] if not row_labels or not col_labels or not matrix: return [] index = result.get("index") or pivot_spec.get("index") columns = result.get("columns") or pivot_spec.get("columns") value = result.get("value") or pivot_spec.get("value") agg = result.get("agg") or pivot_spec.get("agg") or "mean" why = pivot_spec.get("why") or "" blocks = [model.Heading(text=f"Pivot: «{index}» × «{columns}»", level=2)] intro = f"**{why}.** " if why else "" intro += (f"{agg} de «{value}» cruzando «{index}» (filas) y «{columns}» " f"(columnas).") if result.get("truncated_rows") or result.get("truncated_cols"): intro += " Limitado a las filas/columnas más frecuentes." 
blocks.append(model.Markdown(text=intro)) header = [model._safe_str(index)] + [model._safe_str(c) for c in col_labels] rows = [] for i, rlabel in enumerate(row_labels): row = [model._safe_str(rlabel)] cells = matrix[i] if i < len(matrix) else [] for j in range(len(col_labels)): cell = cells[j] if j < len(cells) else None row.append(_fmt_num(cell, 2)) rows.append(row) blocks.append(model.DataTable( header=header, rows=rows, title=f"{agg} de «{value}»", note=f"Cada celda es {agg} de «{value}» para esa combinación.")) fig_pivot = {"row_labels": row_labels, "col_labels": col_labels, "matrix": matrix, "index": index, "columns": columns, "value": value, "agg": agg} if _make_pivot_bars(fig_pivot) is not None: blocks.append(model.Figure( make=_pivot_bars_maker(fig_pivot), caption=f"{agg} de «{value}» por «{index}» y «{columns}» " f"(barras agrupadas).")) return blocks def _insights_section(ctx: dict) -> list: """Optional pre-computed micro-analysis of the aggregations (SHOULD-11.4).""" entries = ctx.get("agg_insights") if not isinstance(entries, list) or not entries: return [] blocks = [model.Heading(text="Interpretación de los grupos", level=2)] for e in entries: if not _is_dict(e): continue title = model._safe_str(e.get("title")) text = model._safe_str(e.get("text")) line = (f"**{title}.** " if title else "") + text if line.strip(): blocks.append(model.Markdown(text=line)) return blocks if len(blocks) > 1 else [] # --------------------------------------------------------------------------- # # Pre-computed path: ctx['aggregations'] already carries the results. 
# --------------------------------------------------------------------------- # def _sections_from_precomputed(agg: dict) -> list: sections = [] for entry in agg.get("groupby") or []: if not _is_dict(entry): continue sections += _groupby_section( entry.get("group_by"), entry.get("measures") or [], entry.get("result") or {}, entry.get("why") or "") for entry in agg.get("pivots") or []: if not _is_dict(entry): continue sections += _pivot_section(entry, entry.get("result") or {}) return sections # --------------------------------------------------------------------------- # # Live path: select keys, pick a plan, compute results via push-down functions. # --------------------------------------------------------------------------- # def _sections_live(profile: dict, ctx: dict, candidates: dict) -> list: top_n = int(ctx.get("agg_top_n", _DEF_TOP_N)) plan = _resolve_plan(profile, ctx, candidates) sections = [] for agg in plan.get("aggregations") or []: if not _is_dict(agg) or not agg.get("group_by"): continue result = _live_groupby(ctx, agg.get("group_by"), agg.get("measures") or [], top_n) if result is not None: sections += _groupby_section(agg.get("group_by"), agg.get("measures") or [], result, agg.get("why") or "") for pv in plan.get("pivots") or []: if not _is_dict(pv) or not pv.get("index") or not pv.get("columns"): continue result = _live_pivot(ctx, pv.get("index"), pv.get("columns"), pv.get("value"), pv.get("agg") or "mean") if result is not None: sections += _pivot_section(pv, result) return sections # --------------------------------------------------------------------------- # # Entry point. 
# --------------------------------------------------------------------------- #
def _intro_blocks() -> list:
    """Return the chapter's opening blocks: the level-1 heading and the intro."""
    text = (
        "Este capítulo analiza la tabla **por grupos** (split-apply-combine): "
        "elige las columnas categóricas más informativas — por su cardinalidad "
        "y relevancia, no todas contra todas, para no inflar comparaciones "
        "espurias — y resume las variables numéricas dentro de cada grupo "
        "(conteo, media, mediana, desviación). Las **tablas dinámicas** (pivot) "
        "cruzan dos categóricas sobre una medida, y los **gráficos de barras** "
        "(siempre desde cero) comparan los grupos de un vistazo."
    )
    return [model.Heading(text=CHAPTER_TITLE, level=1), model.Markdown(text=text)]


def _assemble_chapter(body: list, ctx: dict):
    """Wrap ``body`` between the intro blocks and the optional insights."""
    blocks = _intro_blocks() + body + _insights_section(ctx)
    return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
                         version=CHAPTER_VERSION, blocks=blocks)


def build_agregacion(profile: dict, ctx: dict):
    """Build the AGREGACION Chapter, or None if the dataset can't be grouped.

    Args:
        profile: the ``eda`` group TableProfile dict.
        ctx: presentation context (see module docstring for the keys consumed).
            ``None`` or a non-dict value is treated as an empty context.

    Returns:
        A ``model.Chapter`` with per-group stats, pivots and bar charts; a
        chapter carrying only an explanatory note when group keys exist but no
        aggregation could be computed; or ``None`` when ``profile`` is not a
        dict, when pre-computed aggregations yield no usable section, or when
        the dataset has no categorical column to group by.
    """
    profile = profile or {}
    if not isinstance(profile, dict):
        return None
    # A non-dict ctx would make every ``ctx.get`` below raise; degrade to empty.
    if not isinstance(ctx, dict):
        ctx = {}

    # Pre-computed results take precedence (offline / tests / forward-compat).
    pre = ctx.get("aggregations")
    if _is_dict(pre) and (pre.get("groupby") or pre.get("pivots")):
        sections = _sections_from_precomputed(pre)
        if not sections:
            return None
        return _assemble_chapter(sections, ctx)

    # Live path: needs at least one categorical key to group by.
    candidates = _resolve_candidates(profile, ctx)
    if not _is_dict(candidates) or not candidates.get("group_keys"):
        return None  # chapter does not apply: nothing to group by.

    sections = _sections_live(profile, ctx, candidates)
    if sections:
        return _assemble_chapter(sections, ctx)

    # Applies (there are categorical keys) but no aggregation data is
    # reachable: emit an honest note instead of fabricating numbers.
    keys = ", ".join(model._safe_str((k or {}).get("col"))
                     for k in candidates.get("group_keys") or []
                     if _is_dict(k))
    note = model.Note(
        "No se pudo calcular la agregación: el capítulo necesita los datos "
        "crudos. Pasa ctx['db_path'] + ctx['table'] (para el cálculo "
        "push-down en DuckDB) o ctx['aggregations'] ya precalculado. "
        f"Columnas categóricas candidatas: {keys or '—'}.")
    return _assemble_chapter([note], ctx)