Compare commits

..

1 Commits

Author SHA1 Message Date
Egutierrez 8917105184 chore(infra): audit_uses_functions detecta mejor simbolos Go con abreviaturas (issue 0057)
Reduce falsos positivos en la deteccion de simbolos Go del auditor
uses_functions, por dos vias complementarias.

Fase 1 — commonAbbrevs ampliado: anade abreviaturas verificadas contra los
nombres reales de las funciones Go del registry (OHLCV, DuckDB, ClickHouse,
NordVPN, SHA256, MD5, ANSI, CIDR, AEAD, PTY, VPS, WG, VT, FFT, EMA, RSI, SMA,
VWAP, AX, E2E, URLs). El analisis empirico mostro que reduce los mismatches
PascalCase-vs-real de 76 a 40 sin romper ninguna funcion. Se documenta por que
NO se mapean "cdp" (el registry usa Cdp: CdpGetHTML, CdpNavigate) ni "pdf"
(inconsistente: CdpPrintPDF vs PdfSimpleReport) — anadirlos generaria mas
falsos positivos de los que arregla.

Fase 2 — fallback a lectura del .go: cuando ni la signature ni PascalCase(name)
localizan el simbolo, se lee el .go de la funcion del registry y se extrae el
primer func exportado top-level (cache por ejecucion para no reabrir archivos).
El fallback esta GATEADO a signature vacia: cuando la signature ya aporta un
`func <Name>` es la fuente de verdad y no se sobreescribe. Esto evita la
mis-atribucion en archivos .go compartidos por varias funciones (patron "TU
adicional", p.ej. cdp_new_tab vive en cdp_list_tabs.go): sin el gate, el primer
func del archivo (CdpListTabs) se atribuiria a cada hermano y suprimiria
hallazgos reales de "unused".

Verificacion (DoD):
- go build -tags fts5 + go vet limpios.
- Tests nuevos: TestSnakeToPascal_HandlesAbbreviations (golden + non-mappings
  cdp/pdf), TestAuditUsesFunctions_GoFileFallback (golden + error sin archivo),
  TestAuditUsesFunctions_SharedGoFileNotMisattributed (regresion del archivo
  compartido), TestGoRealExportedName (top-level/generic/missing/empty).
- A/B contra el registry real (fn doctor uses-functions): baseline 69 unused vs
  nuevo 69, cero regresion; cdp_get_html_go_browser sigue sin marcarse unused en
  script_navegador (Fase 3.1).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 13:14:27 +02:00
32 changed files with 424 additions and 4649 deletions
+14 -35
View File
@@ -3,11 +3,11 @@ name: launch_fleetclaude
kind: function
lang: bash
domain: infra
version: "1.6.0"
version: "1.5.0"
purity: impure
signature: "launch_fleetclaude [--cwd <dir>] [--bin <path>] [--session <name>] [--reuse] [--cols <n>]"
description: "Entrypoint de FleetView: abre una ventana de terminal con una sesion tmux (socket aislado por perfil) de dos panes (TUI fleetview a la izquierda, claude --dangerously-skip-permissions a la derecha) para centralizar la flota de Claudes. La terminal se AUTO-DETECTA sin config por PC: kitty si esta instalado y hay display ($DISPLAY/$WAYLAND_DISPLAY), si no Windows Terminal (wt.exe) en WSL adjuntando via wsl.exe. El pane de la TUI corre dentro del bucle supervisor supervise_fleetview_tui, que la relanza si muere (crash/panic/kill), asi el panel de control NUNCA se pierde. Soporta PERFILES multiples: sin --session/--reuse cada invocacion abre un perfil nuevo (fleet, fleet2, fleet3, ...) con su propia flota; inyecta FLEET_SOCKET/FLEET_SESSION a la TUI para que cada panel vea solo sus Claudes. Instala atajos alt+flechas/alt+enter/alt+n que controlan la TUI desde cualquier pane, y fija el ancho del sidebar con hooks."
tags: [claude-fleet, infra, kitty, tmux, claude, fleetview, launcher, wsl, windows-terminal]
description: "Entrypoint de FleetView: abre una ventana kitty con una sesion tmux (socket aislado por perfil) de dos panes (TUI fleetview a la izquierda, claude --dangerously-skip-permissions a la derecha) para centralizar la flota de Claudes. El pane de la TUI corre dentro del bucle supervisor supervise_fleetview_tui, que la relanza si muere (crash/panic/kill), asi el panel de control NUNCA se pierde. Soporta PERFILES multiples: sin --session/--reuse cada invocacion abre un perfil nuevo (fleet, fleet2, fleet3, ...) con su propia flota; inyecta FLEET_SOCKET/FLEET_SESSION a la TUI para que cada panel vea solo sus Claudes. Instala atajos alt+flechas/alt+enter/alt+n que controlan la TUI desde cualquier pane, y fija el ancho del sidebar con hooks."
tags: [claude-fleet, infra, kitty, tmux, claude, fleetview, launcher]
params:
- name: --cwd
desc: "Directorio de trabajo de ambos panes tmux. Opcional. Default: raiz del repo fn_registry, derivada dinamicamente via git rev-parse desde la ubicacion del script (sin hardcodear paths de usuario)."
@@ -19,7 +19,7 @@ params:
desc: "Reattach al perfil principal 'fleet' en vez de abrir uno nuevo. Opcional. Recupera el comportamiento idempotente clasico (volver a invocar NO duplica la flota, reusa la existente)."
- name: --cols
desc: "Ancho en columnas del pane izquierdo (la TUI). Opcional. Default: 40."
output: "Crea/reutiliza una sesion tmux detached con dos panes y lanza una ventana de terminal 'FleetView' adjunta a ella (kitty o Windows Terminal segun auto-deteccion), desacoplada del shell padre. Imprime el estado por stdout. Sin valor de retorno; exit 0 en exito."
output: "Crea/reutiliza una sesion tmux detached con dos panes y lanza una ventana kitty 'FleetView' adjunta a ella, desacoplada del shell padre (setsid). Imprime el estado por stdout. Sin valor de retorno; exit 0 en exito."
uses_functions:
- supervise_fleetview_tui_bash_infra
uses_types: []
@@ -49,7 +49,7 @@ launch_fleetclaude --reuse
launch_fleetclaude --session trabajo --cols 50
```
Tras invocarlo aparece una ventana de terminal titulada `FleetView (<perfil>)` con dos
Tras invocarlo aparece una ventana kitty titulada `FleetView (<perfil>)` con dos
panes lado a lado: a la izquierda la TUI `fleetview`, a la derecha una sesion de
`claude --dangerously-skip-permissions`. Cada perfil es un socket+sesion tmux
aislados con su propia flota: puedes tener varias FleetView abiertas a la vez.
@@ -78,24 +78,12 @@ al retomar el trabajo en el repo `fn_registry`.
`respawn-pane` de alt+R y los Claude nuevos hereden el socket). `main.go` los
lee con fallback a `fleet`. Por eso cada panel ve SOLO los Claude de su perfil
(cruza la lista del sistema con los panes de su socket).
- **Auto-deteccion de terminal (sin config por PC)**: en la ruta ventana-nueva el
launcher elige terminal solo. (1) `kitty` instalado **y** display usable
(`$DISPLAY`/`$WAYLAND_DISPLAY`) → kitty (escritorio Linux nativo o WSLg con
kitty). (2) Si no, WSL con `wt.exe` en el PATH → Windows Terminal ejecutando
`wsl.exe [-d $WSL_DISTRO_NAME] -- bash -lic 'tmux -L <perfil> attach ...'`.
(3) Ninguna → error con las salidas posibles. Asi el MISMO `fleetclaude`
funciona en un PC con kitty y en otro WSL sin kitty, cada uno elige su
terminal. Causa raiz del sintoma "se lanza la flota pero no se ve": kitty no
instalado en WSL hacia que la sesion tmux se creara sin ventana que la mostrara.
- **Dentro de tmux abre ventana nueva**: si invocas `fleetclaude` desde dentro de
una sesion tmux (`$TMUX` definido), NO hace `attach` anidado (rompe / avisa de
nesting); cae a la ruta ventana-nueva (auto-deteccion de terminal). Fuera de
tmux y con TTY, reutiliza la terminal actual con `exec tmux attach`.
- **kitty detached (setsid)**: la ventana kitty se lanza con `setsid ... &` para
sobrevivir al cierre de la terminal que la invoco. La ventana de Windows
Terminal (wt.exe) ya es un proceso Windows independiente del arbol Linux, asi
que sobrevive sola (se lanza con `&`+`disown` desde un subshell con cwd `/mnt/c`
para evitar el warning de wt.exe por cwd UNC `\\wsl.localhost\...`).
nesting); cae a la ruta kitty y abre una ventana nueva. Fuera de tmux y con
TTY, reutiliza la terminal actual con `exec tmux attach`.
- **kitty detached (setsid)**: la ventana se lanza con `setsid ... &` para
sobrevivir al cierre de la terminal que la invoco. No bloquea al shell padre.
- **TUI bajo supervisor (auto-respawn)**: el pane izquierdo NO corre un
`exec fleetview` de una sola vida, sino `supervise_fleetview_tui` (bucle que
relanza la TUI si muere por crash/panic/kill). Asi el panel de control nunca se
@@ -128,23 +116,14 @@ al retomar el trabajo en el repo `fn_registry`.
- **Ancho del sidebar via hooks**: `client-resized` y `window-layout-changed`
re-fijan el pane 0 (TUI) a `--cols` columnas, porque el `attach` de kitty y el
conmutar de Claude redistribuyen el espacio.
- **tmux siempre; terminal (kitty/wt.exe) solo sin TTY**: `tmux` es obligatorio
(aborta != 0 si falta). Una terminal nueva (kitty o Windows Terminal) solo se
necesita en la ruta sin-TTY (dentro de tmux, atajo de escritorio, cron, script),
donde abre una ventana nueva. Invocado desde una terminal interactiva fuera de
tmux (el caso normal del alias `fleetclaude`), reutiliza la terminal actual con
`exec tmux attach` y no necesita ni kitty ni wt.exe.
- **tmux siempre, kitty solo sin TTY**: `tmux` es obligatorio (aborta != 0 si
falta). `kitty` solo se necesita en la ruta sin-TTY (atajo de escritorio, cron,
script), donde abre una ventana nueva. Invocado desde una terminal interactiva
(el caso normal del alias `fleetclaude`), reutiliza la terminal actual con
`exec tmux attach` y NO necesita kitty — util en WSL u hosts sin kitty.
## Capability growth log
- v1.6.0 (2026-06-29) — **auto-deteccion de terminal (kitty ↔ Windows Terminal)**.
La ruta ventana-nueva ya no asume kitty: elige terminal segun el host. kitty si
esta instalado y hay display (`$DISPLAY`/`$WAYLAND_DISPLAY`); si no, en WSL abre
Windows Terminal (`wt.exe`) ejecutando `wsl.exe [-d $WSL_DISTRO_NAME] -- bash
-lic 'tmux ... attach'`. Mismo `fleetclaude` en un PC con kitty y en otro WSL
sin kitty. Arregla el sintoma "se lanza la flota pero no se ve": en WSL sin
kitty la sesion tmux se creaba pero ninguna ventana la mostraba. wt.exe se
lanza desde un subshell con cwd `/mnt/c` para evitar el warning por cwd UNC.
- v1.5.0 (2026-06-24) — **auto-respawn de la TUI**. El pane izquierdo ya no corre
`exec fleetview` (una sola vida), sino el bucle supervisor
`supervise_fleetview_tui`, que relanza la TUI si muere (crash/panic/kill de su
+13 -43
View File
@@ -294,61 +294,31 @@ USAGE
$T set-hook -g window-layout-changed "resize-pane -t $left_pane -x $cols"
# -----------------------------------------------------------------------
# Adjuntar la sesion en una terminal, DESACOPLADA del shell padre para que
# no muera al cerrar la terminal invocadora.
# Lanzar kitty adjuntando la sesion, DESACOPLADA del shell padre con
# setsid, para que no muera al cerrar la terminal invocadora.
# (Mismo patron que reboot_all_claudes para relanzar terminales.)
# -----------------------------------------------------------------------
# Adjuntar la sesion:
# - Terminal interactiva y FUERA de tmux: convertir ESA terminal en el
# panel FleetView (exec reemplaza el proceso; al hacer detach vuelve la
# shell). Asi `fleetclaude` no abre otra ventana: usa la actual.
# - DENTRO de tmux (o sin TTY: atajo de escritorio, cron, script): abrir
# una ventana de terminal NUEVA desacoplada. No hacemos `attach`
# una ventana kitty nueva desacoplada (setsid). No hacemos `attach`
# anidado dentro de otra sesion tmux (rompe / da el warning de nesting).
if [ -t 0 ] && [ -t 1 ] && [ -z "${TMUX:-}" ]; then
exec tmux -L "$session" attach -t "$session"
fi
# -----------------------------------------------------------------------
# Ruta ventana-nueva: AUTO-DETECTAR la terminal disponible (sin config por
# PC). El mismo `fleetclaude` funciona en un escritorio Linux con kitty y en
# un WSL sin kitty pero con Windows Terminal.
# 1. kitty instalado + display usable ($DISPLAY/$WAYLAND_DISPLAY) -> kitty
# (escritorio Linux nativo, o WSLg con kitty instalado).
# 2. WSL con wt.exe alcanzable -> Windows Terminal ejecutando wsl.exe que
# adjunta la sesion tmux (PCs WSL sin kitty: la ventana kitty nunca
# aparece sin una terminal Linux real, por eso "se lanza pero no se ve").
# 3. Ninguna -> error claro con las dos salidas posibles.
# -----------------------------------------------------------------------
if command -v kitty >/dev/null 2>&1 && [[ -n "${DISPLAY:-}${WAYLAND_DISPLAY:-}" ]]; then
setsid kitty --title "FleetView ($session)" -e tmux -L "$session" attach -t "$session" </dev/null >/dev/null 2>&1 &
disown 2>/dev/null || true
echo "launch_fleetclaude: ventana kitty 'FleetView ($session)' adjunta al perfil '$session'."
return 0
# Ruta ventana-nueva: necesitamos kitty para abrirla.
if ! command -v kitty >/dev/null 2>&1; then
echo "launch_fleetclaude: kitty no esta instalado (necesario para abrir ventana nueva)." >&2
echo "launch_fleetclaude: lanzalo desde una terminal interactiva fuera de tmux, o instala kitty." >&2
return 1
fi
setsid kitty --title "FleetView ($session)" -e tmux -L "$session" attach -t "$session" </dev/null >/dev/null 2>&1 &
disown 2>/dev/null || true
if command -v wt.exe >/dev/null 2>&1; then
# bash -lic <attach> dentro de wsl.exe: login+interactive para que tmux y
# el PATH del perfil esten disponibles en la ventana de Windows Terminal.
local attach_cmd
attach_cmd="tmux -L $(printf '%q' "$session") attach -t $(printf '%q' "$session")"
local distro="${WSL_DISTRO_NAME:-}"
local wsl_args=(wsl.exe)
[[ -n "$distro" ]] && wsl_args+=(-d "$distro")
wsl_args+=(-- bash -lic "$attach_cmd")
# cd a una ruta Windows (/mnt/c) evita el warning de wt.exe por cwd UNC
# (\\wsl.localhost\...). El cwd real de los panes lo fija la sesion tmux.
( cd /mnt/c 2>/dev/null || cd /
wt.exe new-tab --title "FleetView ($session)" "${wsl_args[@]}" </dev/null >/dev/null 2>&1 &
disown 2>/dev/null || true )
echo "launch_fleetclaude: Windows Terminal 'FleetView ($session)' adjunta al perfil '$session' (WSL distro '${distro:-default}')."
return 0
fi
echo "launch_fleetclaude: no hay terminal para abrir una ventana nueva." >&2
echo "launch_fleetclaude: - escritorio Linux: instala kitty y exporta DISPLAY/WAYLAND_DISPLAY." >&2
echo "launch_fleetclaude: - WSL: usa Windows Terminal (wt.exe debe estar en el PATH)." >&2
echo "launch_fleetclaude: - o lanza fleetclaude desde una terminal interactiva fuera de tmux." >&2
return 1
echo "launch_fleetclaude: ventana kitty 'FleetView ($session)' adjunta al perfil '$session'."
return 0
}
# Permitir ejecutar el archivo directamente (no solo como funcion sourced).
-299
View File
@@ -1,299 +0,0 @@
# AutomaticEDA — contrato de capítulos
Documento autoritativo para **escribir capítulos** del informe AutomaticEDA. Léelo
entero antes de añadir un capítulo: define el modelo de bloques, la firma del builder,
el versionado, dónde colocar el módulo, cómo se registra en el orden del documento, qué
claves del `profile` consume cada capítulo y un ejemplo completo de capítulo de
referencia (OVERVIEW).
AutomaticEDA es la capa intermedia entre **contenido** (lo que un capítulo quiere
decir) y **formato de salida** (PDF móvil + PPTX para compartir). Un mismo documento por
capítulos se renderiza a los dos formatos con garantía de **no-corte**: el texto se
envuelve a líneas completas, las tablas largas se parten por filas repitiendo la
cabecera, y figuras/imágenes se escalan para caber enteras.
- Código del motor: `python/functions/datascience/automatic_eda/` (paquete de soporte).
- Funciones públicas del registry (grupo `eda`): `render_automatic_eda_pdf`,
`render_automatic_eda_pptx`.
- Sustituye evolutivamente a `render_eda_pdf` **de forma aditiva** (ese sigue activo en
`profile_table(emit_pdf=True)`).
---
## 1. Modelo de documento
```
Document = list[Chapter]
Chapter = { id: str, title: str, version: str, blocks: list[Block] }
Block = Heading | Markdown | KVTable | DataTable | Figure | Image | Caption | Note
```
Importa el modelo desde `datascience.automatic_eda.model` (o
`from datascience.automatic_eda import ...`). Todos los bloques son dataclasses; los
renderers también aceptan **dicts** con la clave `kind` (lectura defensiva: lo no
reconocido se degrada a `Note`, nunca lanza).
### Bloques
| Bloque | Construcción | Qué hace en el render |
|---|---|---|
| `Heading(text, level=1)` | título de sección, `level` 1 (grande) … 3 (chico) | una o varias líneas en negrita; nivel 1 lleva subrayado de acento |
| `Markdown(text)` | texto markdown ligero | ver subset abajo; **nunca corta a media línea** |
| `KVTable(rows, title=None)` | `rows = [(clave, valor), ...]` | tabla de 2 columnas etiqueta/valor; el valor se envuelve |
| `DataTable(header, rows, title=None, note=None)` | `header=[...]`, `rows=[[...],...]` | tabla con cabecera; **se parte por filas repitiendo cabecera**; las celdas largas se envuelven dentro de su columna |
| `Figure(fig=None, make=None, caption=None, height_in=None)` | una `matplotlib.figure.Figure` ya construida (`fig`) o un callable `make()->Figure` (perezoso) | se rasteriza y escala para caber entera (nunca recortada) |
| `Image(path, caption=None, height_in=None)` | ruta a PNG/JPG | se escala para caber entera |
| `Caption(text)` / `Note(text)` | texto auxiliar pequeño | pie/nota en gris; `Note` es además el fallback de lo desconocido |
### Subset de markdown soportado (`Markdown`)
`#`/`##`/`###` → headings; `-`/`*` → viñetas; líneas `| a | b |` consecutivas → una
`DataTable`; línea en blanco → separación de párrafo; `**bold**`/`__bold__`/`` `code` ``
→ se quitan los marcadores y se conserva el texto. Todo lo demás se renderiza tal cual.
Garantía: ningún carácter se pierde; lo que no cabe se envuelve o pasa de página/slide.
---
## 2. Firma del builder de capítulo (OBLIGATORIA)
Cada capítulo es un módulo `python/functions/datascience/automatic_eda/chapters/<id>.py`
que expone **dos** símbolos:
```python
CHAPTER_VERSION = "1.0.0" # semver de generación del capítulo (ver §4)
def build_<id>(profile: dict, ctx: dict) -> "Chapter | None":
"""Construye el capítulo desde el TableProfile y el contexto de presentación.
Devuelve None si el capítulo NO aplica a este dataset (p.ej. timeseries sin
columna fecha). Lee SIEMPRE defensivamente con .get y NUNCA lanza.
"""
```
- El nombre de la función es exactamente `build_<id>` donde `<id>` es el del módulo y
el de `CHAPTER_ORDER` (§3). Ej.: `chapters/num_distr.py` → `build_num_distr`.
- Devuelve un `model.Chapter(id, title, version=CHAPTER_VERSION, blocks=[...])` o `None`.
- Un capítulo que devuelve `None` o cuyos `blocks` quedan vacíos se omite del documento.
---
## 3. Registro y orden del documento
El orden canónico está **pre-declarado** en
`python/functions/datascience/automatic_eda/chapters_registry.py`:
```python
CHAPTER_ORDER = [
"portada", "overview", "num_distr", "cat_distr", "calidad", "correlacion",
"modelos", "analisis_llm", "timeseries", "geospatial", "agregacion",
]
```
`build_document(profile, ctx)` recorre este orden, importa perezosamente
`chapters/<id>.py` y llama `build_<id>`. **Para añadir un capítulo NO se edita
`chapters_registry.py`**: basta crear el módulo `chapters/<id>.py` (con su `<id>` ya en
`CHAPTER_ORDER`) y aparecerá automáticamente en su posición. Esto permite que muchos
agentes trabajen **en paralelo** sin contención: cada uno toca solo su archivo.
Si tu capítulo usa un `<id>` que aún no está en `CHAPTER_ORDER`, añádelo en la posición
correcta (única edición compartida; coordínala con el orquestador).
`build_document` nunca lanza: un capítulo cuyo módulo no existe se salta, y uno que falla
o devuelve `None` se omite.
---
## 4. Versionado por capítulo + manifiesto
- `CHAPTER_VERSION` (semver) identifica la **generación** del capítulo. Bumpéalo cuando
cambies qué/cómo emite el capítulo (no en cada corrida). Se estampa en el pie de cada
página/slide: `<Título> · v<version>`.
- `ENGINE_VERSION` (en `model.py`) versiona el motor global.
- Al renderizar se escribe `automatic_eda_manifest.json` junto a la salida:
```json
{
"engine": "AutomaticEDA",
"engine_version": "1.0.0",
"generated_at": "2026-06-30 12:20:56 UTC",
"chapters": {
"portada": { "version": "1.0.0", "n_pages": 1, "n_slides": 1 },
"overview": { "version": "1.0.0", "n_pages": 2, "n_slides": 2 }
}
}
```
Llamar a uno o ambos renderers crea/actualiza el manifiesto (read-modify-write
defensivo). Esto habilita el **seguimiento y la mejora continua por capítulo**.
---
## 5. `ctx` — contexto de presentación
`ctx` lleva metadatos que **no están** en el `TableProfile` (lo aporta el caller via
`meta['ctx']`). Claves convencionales (todas opcionales):
| Clave | Uso |
|---|---|
| `dataset_name` | nombre del dataset (portada). Default: `profile['table']` |
| `source_origin` | de dónde viene el dataset (portada). Default: `profile['source']` |
| `storage` | tecnología de almacenamiento (portada). Default: inferido de `source` |
| `generated_at` | fecha de generación (portada/manifiesto). Default: `profiled_at`/ahora |
| `description` | frase de descripción del dataset (portada) |
| `granularity` | "Cada fila es…" (portada). Default: derivado de `key_candidates` |
| `quality_criteria` | criterios del score de calidad (portada) |
| `head_rows` | `list[dict]` con `df.head` (overview). Ver §7 |
Un capítulo puede definir y consumir sus propias claves `ctx` — documenta cuáles en su
docstring.
---
## 6. Claves del `profile` que consume cada capítulo
El `TableProfile` lo produce `profile_table(...)["profile"]` (grupo `eda`). Claves de
nivel superior: `table, source, profiled_at, n_rows, n_cols, size_bytes, duplicate_rows,
duplicate_pct, null_cell_pct, constant_cols, all_null_cols, quality_score,
type_breakdown, key_candidates, columns[], correlations, llm, models, series, caveats`.
Cada `columns[i]`: `name, inferred_type, semantic_type, physical_type, distinct_count,
unique_pct, null_count, null_pct, empty_count, empty_pct, flags, quality_score,
numeric{min,max,mean,median,std,variance,cv,iqr,skew,kurtosis,p1..p99,mode,n_outliers,
outlier_pct,zero_pct,negative_pct,distribution_type,histogram[{lo,hi,count}]},
categorical{top[{value,count,pct}],mode,n_distinct,entropy,imbalance,len_min/mean/max},
reexpression, series{...}`.
| Capítulo | Claves del profile que consume |
|---|---|
| `portada` | `table, source, profiled_at, n_rows, n_cols, quality_score, key_candidates` + `ctx` |
| `overview` | `columns[].{name,inferred_type,semantic_type,physical_type,null_pct,null_count,categorical.top,numeric.{min,median,max,mean,std}}`, `head_rows` (ver §7) |
| `num_distr` (pendiente) | `columns[] numeric.{histogram,mean,median,std,outlier_pct,...}` |
| `cat_distr` (pendiente) | `columns[] categorical.{top,entropy,imbalance}` |
| `calidad` (pendiente) | `quality_score`, `columns[].{quality_score,flags,issues}`, `duplicate_*`, `null_cell_pct`, `constant_cols`, `all_null_cols` |
| `correlacion` (pendiente) | `correlations.pairs[{a,b,value,method}]`, `correlations.levels_caveat` |
| `modelos` (pendiente) | `models.{pca,kmeans,outliers,normality}` |
| `analisis_llm` (pendiente) | `llm` |
| `timeseries` (pendiente) | `series{col:{stationarity,acf_pacf,stl,levels_*}}` |
| `geospatial` (pendiente) | columnas con `semantic_type` geográfico (lat/lon) |
| `agregacion` (pendiente) | `columns[]` + agregados que la fase de cálculo añada |
---
## 7. Claves nuevas del profile que la fase de cálculo debe añadir
El `TableProfile` actual **no** trae estas claves; el capítulo OVERVIEW las consume y, si
faltan, degrada honestamente (placeholder + derivación de valores reales). Para un
overview completo, la fase de cálculo (otro agente) debe añadir:
- `profile['head_rows']`: `list[dict]` con las primeras N filas (`df.head`), una por
dict `{columna: valor}`. Mientras tanto OVERVIEW muestra un placeholder.
- `columns[i]['examples']`: `list` de hasta N valores **no nulos** crudos de la columna.
Mientras tanto OVERVIEW deriva ejemplos de `categorical.top[].value` (categóricas) y de
`numeric.{min,median,max}` (numéricas) — son valores reales, no inventados.
Sugerencia de implementación (no obligatoria en esta fase): una función del registry que
muestree `head_rows`/`examples` desde DuckDB y las inyecte en el profile antes de
renderizar (delegar a `fn-constructor`, tag `eda`).
---
## 8. Ejemplo COMPLETO de capítulo de referencia (OVERVIEW)
Copia este patrón. Archivo real:
`python/functions/datascience/automatic_eda/chapters/overview.py`.
```python
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "overview"
CHAPTER_TITLE = "Overview"
def _fmt_num(v, d=3):
# ... formateo defensivo (None -> "—", floats compactos) ...
...
def _examples_for(col: dict) -> str:
# 1) col['examples'] si existe; 2) categorical.top[].value;
# 3) numeric.{min,median,max}. Nunca celda vacía ni inventada.
...
def build_overview(profile: dict, ctx: dict):
profile = profile or {}
ctx = ctx or {}
cols = profile.get("columns") or []
if not cols and not (ctx.get("head_rows") or profile.get("head_rows")):
return None # no aplica.
blocks = [
model.Heading(text="Primeras filas (df.head)", level=2),
_head_block(profile, ctx), # DataTable(df.head) o Note si falta head_rows.
]
cols_block = _columns_block(profile) # DataTable: nombre/tipo/nulos/ejemplos.
if cols_block is not None:
blocks.append(model.Heading(text="Diccionario de columnas", level=2))
blocks.append(cols_block)
desc_block = _describe_block(profile) # DataTable: mean/median/min/max/std.
if desc_block is not None:
blocks.append(model.Heading(text="Resumen estadístico numérico", level=2))
blocks.append(desc_block)
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
```
Puntos clave que todo capítulo debe respetar:
1. **Lectura defensiva**: `profile.get(...)`, `or []`, comprobar `isinstance` — nunca
asumir que una clave existe ni lanzar.
2. **`None` si no aplica**: devuelve `None` (o `blocks` vacíos) cuando el dataset no tiene
lo que el capítulo necesita.
3. **No inventar**: si falta un dato (p.ej. `df.head`), muestra un placeholder honesto o
deriva de valores reales del perfil; deja el hueco documentado.
4. **Tablas vía `DataTable`**: deja que el renderer las parta y repita cabecera; no
pre-pagines tú.
5. **Figuras vía `Figure(make=...)`**: pásalas perezosas; las dibuja y escala el renderer.
---
## 9. Cómo se prueba un capítulo
```python
from datascience.automatic_eda import build_document, render_pdf, render_pptx
chapters = build_document(profile, ctx={"dataset_name": "..."})
render_pdf(chapters, "reports/x.pdf", {"title": "EDA"})
render_pptx(chapters, "reports/x.pptx", {"title": "EDA"})
```
O directo desde las funciones públicas con el profile entero (construyen los capítulos):
```python
from datascience import render_automatic_eda_pdf, render_automatic_eda_pptx
render_automatic_eda_pdf(profile, "reports/x.pdf", {"ctx": {...}})
render_automatic_eda_pptx(profile, "reports/x.pptx", {"ctx": {...}})
```
Añade un test self-contained por capítulo (perfil sintético, sin DuckDB) que verifique
sus bloques presentes y el no-corte (texto largo intacto en la salida). Patrón:
`render_automatic_eda_pdf_test.py`.
---
## 10. Integración futura con `profile_table` (siguiente fase)
`profile_table(emit_pdf=True)` usa hoy `render_eda_pdf` (intacto). En la siguiente fase
se añadirá `emit_automatic=True` (o se migrará `emit_pdf`) para que cada EDA emita
**siempre** PDF + PPTX del motor AutomaticEDA desde el mismo profile:
```python
# Bosquejo de la integración aditiva (NO activar si rompe los tests actuales):
if emit_automatic:
ctx = {"dataset_name": table, "source_origin": db_path, ...}
render_automatic_eda_pdf(prof, os.path.join(report_dir, f"aeda_{table}_{ts}.pdf"),
{"title": f"EDA — {table}", "ctx": ctx})
render_automatic_eda_pptx(prof, os.path.join(report_dir, f"aeda_{table}_{ts}.pptx"),
{"title": f"EDA — {table}", "ctx": ctx})
```
Hasta entonces los renderers se invocan directamente sobre el `profile` que
`profile_table` ya devuelve.
+1 -1
View File
@@ -68,7 +68,7 @@ Indice de grupos de capacidades del registry. Cada grupo agrupa >=3 funciones qu
| [consent](consent.md) | 3 | CMP / IAB TCF / data brokers: detectar el CMP de un sitio (Didomi/OneTrust/Sourcepoint/Quantcast), leer `__tcfapi` para contar vendors y propositos, aceptar el banner (selectores + fallback LLM con haiku que localiza Aceptar/Ver socios), y descargar la GVL de IAB para nominar cada broker y que datos recopila. Nacio de `projects/databrokers/` |
| [onlyoffice](onlyoffice.md) | 3 | Operar ONLYOFFICE Desktop Editors (binario onlyoffice-desktopeditors) en Linux/X11 desde terminal via instancia aislada (slot HOME=/tmp/oo_<instance>): abrir un archivo en ventana propia, cerrar+reabrir para mostrar datos editados en disco (no hay reload nativo, Issue #2313), y matar el proceso del slot. Solo gestiona la ventana, NO edita ni crea archivos. Requiere X11 + wmctrl + xdotool. No confundir con el Document Server (web/Docker) |
| [email](email.md) | 21 | Gestionar cuentas de correo por IMAP+SMTP directo (Python stdlib, sin browser ni MCP Gmail): conectar/listar/buscar/leer (imap_*), mutar estado (mark_seen/move/delete/save_draft) por UID, y construir+enviar (email_build_html/smtp_send). Auth user+app-password (NO OAuth; Outlook fuera). Credenciales desde pass, resueltas por la capa app. Complementa al browser (interactivo) — no lo reemplaza |
| [eda](eda.md) | 29 | Exploratory Data Analysis por tabla y base con motor DuckDB + PostgreSQL push-down: perfil base SQL (SUMMARIZE + distinct exacto), estadística numérica/categórica, tipo semántico regex, calidad, correlación/asociación (Pearson/Spearman/Cramér's V/Theil's U/η/MI), relaciones inter-tabla (FK containment + join graph mermaid), modelos baratos (PCA/KMeans/IsolationForest/normalidad/tendencia), capa LLM (dictionary/PII/limpieza/análisis) y generación de notebook. Orquestadores `profile_table` (backend duckdb/postgres, flags run_models/run_llm) y `profile_database` |
| [eda](eda.md) | 27 | Exploratory Data Analysis por tabla y base con motor DuckDB + PostgreSQL push-down: perfil base SQL (SUMMARIZE + distinct exacto), estadística numérica/categórica, tipo semántico regex, calidad, correlación/asociación (Pearson/Spearman/Cramér's V/Theil's U/η/MI), relaciones inter-tabla (FK containment + join graph mermaid), modelos baratos (PCA/KMeans/IsolationForest/normalidad/tendencia), capa LLM (dictionary/PII/limpieza/análisis) y generación de notebook. Orquestadores `profile_table` (backend duckdb/postgres, flags run_models/run_llm) y `profile_database` |
| [seo](seo.md) | 3 | SEO orientado a datos sobre Google Search Console: autenticar con service account (`gsc_auth`), extraer Search Analytics paginado (`pull_gsc_search_analytics`) y el pipeline de ingesta a DuckDB + espejo Postgres para Metabase (`ingest_gsc_search_analytics`). Cadena de ingesta del proyecto `seo_analytics`; alimenta dashboards de striking distance, CTR opportunities y content decay |
| [local-hub](local-hub.md) | 4 | Exponer los procesos locales como subdominios `*.localhost` (via Caddy, sin DNS) y reunirlos en una pantalla principal Glance con estado en vivo, refrescada a diario por dag_engine. Descubre servicios (manifiesto + registry), renderiza Caddyfile + config Glance (puras), y el pipeline `refresh_local_hub` regenera+recarga. Fuente de verdad: `apps/local_hub/local_services.yaml` |
| [comfyui-judge](comfyui-judge.md) | 4 | Panel multi-juez de calidad de imagen: estético LAION-V2 (`comfyui_score_aesthetic`, 0-10) + fidelidad CLIP prompt↔imagen (`comfyui_score_clip_alignment`, 0-1) + crítica LLM-vision (`comfyui_critique_image_llm`, good/bad). Agregados por voto mayoría en `comfyui_judge_image`. Gate objetivo para tests/DoD y el bucle de mejora de skills ComfyUI; degrada con gracia si un juez cae. Jueces estético/fidelidad por subproceso al venv ComfyUI (torch+open_clip), crítico via claude-direct |
-4
View File
@@ -71,10 +71,6 @@ Orquestadores one-shot:
| `eda_llm_insights_py_datascience` | impure | 1 call LLM sobre el perfil agregado (no filas crudas): data dictionary, resumen, granularidad de fila, PII/RGPD, limpieza, análisis sugeridos. |
| `build_eda_notebook_py_datascience` | impure | Genera un `.ipynb` (nbformat v4) que perfila la tabla, listo para lanzar en Jupyter colaborativo. |
| `render_eda_pdf_py_datascience` | impure | Renderiza el `TableProfile` a un PDF multipágina **vertical (A5), legible en móvil** (estilo Tufte: histogramas como small multiples, top-k, heatmap de asociación). 4ª salida del workflow, junto a JSON/Markdown/notebook. |
| `render_automatic_eda_pdf_py_datascience` | impure | Motor **AutomaticEDA**: documento por CAPÍTULOS (modelo de bloques independiente del formato) → PDF A5 móvil que **nunca corta** texto/tablas/imágenes (tablas largas se parten repitiendo cabecera) + manifiesto versionado por capítulo. Acepta el `TableProfile` o capítulos del modelo. Aditivo, no reemplaza `render_eda_pdf`. |
| `render_automatic_eda_pptx_py_datascience` | impure | Motor **AutomaticEDA** → PPTX 16:9 para **compartir** desde el mismo documento por capítulos; mismo principio anti-corte (continúa en slide `(cont.)`). Motor `python-pptx`. |
> **AutomaticEDA** (núcleo nuevo, fase de capítulos): separa contenido (capítulos/bloques) de formato (PDF móvil + PPTX). Para escribir un capítulo nuevo (NUM DISTR, CAT DISTR, CALIDAD, CORRELACIÓN, MODELOS, ANÁLISIS LLM, TIMESERIES, GEOSPATIAL, AGREGACIÓN) lee el contrato: **`docs/automatic_eda_contract.md`**. Código del motor en `python/functions/datascience/automatic_eda/`; capítulos de referencia: `portada`, `overview`.
### Orquestadores (pipelines)
| ID | Qué hace |
+129 -16
View File
@@ -30,6 +30,7 @@ type auditFnMeta struct {
domain string
lang string
signature string
filePath string // registry-relative path to the .go source (Go funcs only)
}
// skipDirs are directory names ignored when walking source for audits.
@@ -80,15 +81,16 @@ func AuditUsesFunctions(registryRoot string) ([]UsesFunctionsAudit, error) {
return nil, fmt.Errorf("audit_uses_functions: ping db: %w", err)
}
// Load all Go/Python/TS functions from registry: id → name, domain, lang, signature.
rows, err := db.Query(`SELECT id, name, domain, lang, COALESCE(signature, '') FROM functions WHERE lang IN ('go','py','ts')`)
// Load all Go/Python/TS functions from registry: id → name, domain, lang,
// signature, file_path. file_path feeds the Go .go fallback (see auditGoApp).
rows, err := db.Query(`SELECT id, name, domain, lang, COALESCE(signature, ''), COALESCE(file_path, '') FROM functions WHERE lang IN ('go','py','ts')`)
if err != nil {
return nil, fmt.Errorf("audit_uses_functions: query functions: %w", err)
}
allFunctions := make(map[string]auditFnMeta) // id → meta
for rows.Next() {
var m auditFnMeta
if err := rows.Scan(&m.id, &m.name, &m.domain, &m.lang, &m.signature); err != nil {
if err := rows.Scan(&m.id, &m.name, &m.domain, &m.lang, &m.signature, &m.filePath); err != nil {
continue
}
allFunctions[m.id] = m
@@ -144,7 +146,7 @@ func AuditUsesFunctions(registryRoot string) ([]UsesFunctionsAudit, error) {
switch app.lang {
case "go":
importedIDs = append(importedIDs, auditGoApp(absDir, allFunctions)...)
importedIDs = append(importedIDs, auditGoApp(absDir, allFunctions, registryRoot)...)
scannedLangs["go"] = true
case "py":
importedIDs = append(importedIDs, auditPyApp(absDir, allFunctions)...)
@@ -197,11 +199,18 @@ func AuditUsesFunctions(registryRoot string) ([]UsesFunctionsAudit, error) {
// Strategy:
// 1. Find all "fn-registry/functions/<domain>" import paths (production code only).
// 2. For each domain, collect registry functions in that domain.
// 3. Grep source files for the exported symbol. The token tried first is the
// real Go func identifier parsed from the registry signature; fallback is
// PascalCase(name). Many functions deviate (e.g. sqlite_column_exists has
// `func ColumnExists`), so signature is the source of truth.
func auditGoApp(appDir string, all map[string]auditFnMeta) []string {
// 3. Grep source files for the exported symbol. Tokens tried, in order:
// a) the real Go func identifier parsed from the registry signature;
// b) PascalCase(name) (with commonAbbrevs);
// c) the real exported func read straight from the function's .go file.
//
// Many functions deviate from snake_case→PascalCase (e.g. sqlite_column_exists
// has `func ColumnExists`, wails_bind_crud has `func GenerateWailsCRUD`). The
// signature is usually the source of truth, but some signatures omit the `func`
// keyword or list a different primary symbol; step (c) reads the .go file as a
// last-resort fallback so those cases stop being false positives ("unused").
// The .go read is cached per execution to avoid reopening the same file.
func auditGoApp(appDir string, all map[string]auditFnMeta, registryRoot string) []string {
// Step 1: collect imported domains.
importedDomains := collectGoImportedDomains(appDir)
if len(importedDomains) == 0 {
@@ -216,6 +225,10 @@ func auditGoApp(appDir string, all map[string]auditFnMeta) []string {
return nil
}
// Cache for the .go fallback: registry file_path → real exported func name.
// Populated lazily, only when the cheaper tokens fail to match.
goFileSymbolCache := make(map[string]string)
for _, m := range all {
if m.lang != "go" {
continue
@@ -223,17 +236,76 @@ func auditGoApp(appDir string, all map[string]auditFnMeta) []string {
if !importedDomains[m.domain] {
continue
}
tokens := goCandidateTokens(m)
for _, tok := range tokens {
matched := false
for _, tok := range goCandidateTokens(m) {
if containsToken(blob, tok) {
used = append(used, m.id)
matched = true
break
}
}
if !matched && goSignatureSymbol(m) == "" {
// Fallback (c): read the registry .go file and look for the real
// exported func name. Gated on an EMPTY signature symbol on purpose:
// when the signature already yields a concrete `func <Name>` it is the
// authoritative symbol, so reading the .go (which can only guess the
// file's first exported func) must not override it. Several registry
// functions share one .go file via the "TU adicional" pattern (e.g.
// cdp_new_tab lives in cdp_list_tabs.go); without this gate the first
// func would be mis-attributed to every sibling and suppress real
// "unused" findings. The file read therefore only happens for the rare
// functions whose stored signature omits the `func` keyword.
if sym := goRealExportedName(registryRoot, m.filePath, goFileSymbolCache); sym != "" {
if containsToken(blob, sym) {
matched = true
}
}
}
if matched {
used = append(used, m.id)
}
}
return used
}
// goRealExportedFnRe matches a top-level exported func declaration in a .go
// source file: `func Name(` or the generic form `func Name[T any](`. It captures
// the func identifier. Method declarations (`func (r *T) Name(`) are skipped on
// purpose — a registry function's primary symbol is a top-level func, and method
// names would risk spurious matches. Used by the .go fallback to recover the real
// symbol name when the registry signature/name heuristics fail.
var goRealExportedFnRe = regexp.MustCompile(`^func\s+([A-Z][A-Za-z0-9_]*)\s*[\(\[]`)
// goRealExportedName reads the registry .go file at filePath (relative to
// registryRoot) and returns the first exported func identifier found. Results
// are memoised in cache (filePath → symbol, "" when the file is unreadable or
// has no exported func) so a file is opened at most once per audit run.
func goRealExportedName(registryRoot, filePath string, cache map[string]string) string {
if filePath == "" {
return ""
}
if sym, ok := cache[filePath]; ok {
return sym
}
cache[filePath] = "" // pre-seed so an unreadable file is not retried
abs := filePath
if !filepath.IsAbs(abs) {
abs = filepath.Join(registryRoot, filePath)
}
f, err := os.Open(abs)
if err != nil {
return ""
}
defer f.Close()
sc := bufio.NewScanner(f)
for sc.Scan() {
if m := goRealExportedFnRe.FindStringSubmatch(sc.Text()); m != nil {
cache[filePath] = m[1]
return m[1]
}
}
return ""
}
// goCandidateTokens returns the identifiers we try when looking for usages
// of a Go function in source. Real exported name from signature first,
// PascalCase(name) as fallback.
@@ -241,10 +313,8 @@ var goSignatureFnRe = regexp.MustCompile(`^\s*func\s+(?:\([^)]*\)\s+)?([A-Z][A-Z
func goCandidateTokens(m auditFnMeta) []string {
out := []string{}
if m.signature != "" {
if match := goSignatureFnRe.FindStringSubmatch(m.signature); match != nil {
out = append(out, match[1])
}
if sym := goSignatureSymbol(m); sym != "" {
out = append(out, sym)
}
pascal := snakeToPascal(m.name)
if pascal != "" && (len(out) == 0 || out[0] != pascal) {
@@ -253,6 +323,21 @@ func goCandidateTokens(m auditFnMeta) []string {
return out
}
// goSignatureSymbol returns the exported Go identifier parsed from the registry
// signature (`func Name(...)` or `func (r *T) Name(...)`), or "" when the
// signature is empty or does not start with a `func` declaration. A non-empty
// result is the authoritative symbol for the function and gates off the .go
// fallback in auditGoApp.
func goSignatureSymbol(m auditFnMeta) string {
if m.signature == "" {
return ""
}
if match := goSignatureFnRe.FindStringSubmatch(m.signature); match != nil {
return match[1]
}
return ""
}
// collectGoImportedDomains returns the set of registry domains imported by .go files.
var goImportRe = regexp.MustCompile(`"fn-registry/functions/([a-z]+)"`)
@@ -452,6 +537,34 @@ var commonAbbrevs = map[string]string{
"io": "IO",
"ok": "OK",
"ui": "UI",
// Issue 0057 — abbreviations verified consistent across the registry's own
// Go func names (each entry maps a real `func <Name>` deviation). These only
// improve the PascalCase fallback; the signature and the .go fallback remain
// the primary sources of truth. Deliberately NOT added because the registry
// itself is inconsistent for them (mapping would create more mismatches than
// it fixes): "cdp" (uses Cdp: CdpGetHTML, CdpNavigate — not CDP) and
// "pdf" (CdpPrintPDF vs PdfSimpleReport).
"ohlcv": "OHLCV",
"duckdb": "DuckDB",
"clickhouse": "ClickHouse",
"nordvpn": "NordVPN",
"sha256": "SHA256",
"md5": "MD5",
"ansi": "ANSI",
"cidr": "CIDR",
"aead": "AEAD",
"pty": "PTY",
"vps": "VPS",
"wg": "WG",
"vt": "VT",
"fft": "FFT",
"ema": "EMA",
"rsi": "RSI",
"sma": "SMA",
"vwap": "VWAP",
"ax": "AX",
"e2e": "E2E",
"urls": "URLs",
}
// hasTSSources reports whether appDir contains any production .ts/.tsx files
@@ -148,6 +148,273 @@ func main() { fmt.Println("hello") }
})
}
// TestSnakeToPascal_HandlesAbbreviations verifies the commonAbbrevs expansion
// (issue 0057, Fase 1). Each "want" is the exported Go symbol the registry
// actually uses for that snake_case name. It also pins the deliberate
// non-mappings (cdp, pdf): the registry's own convention is mixed-case there,
// so the abbreviation must NOT fire.
func TestSnakeToPascal_HandlesAbbreviations(t *testing.T) {
cases := []struct{ in, want string }{
// New abbreviations added by issue 0057 (verified against real func names).
{"fetch_ohlcv", "FetchOHLCV"},
{"normalize_ohlcv", "NormalizeOHLCV"},
{"duckdb_open", "DuckDBOpen"},
{"load_ohlcv_from_duckdb", "LoadOHLCVFromDuckDB"},
{"clickhouse_open", "ClickHouseOpen"},
{"nordvpn_container_run", "NordVPNContainerRun"},
{"parse_nordvpn_status", "ParseNordVPNStatus"},
{"hash_sha256", "HashSHA256"},
{"hash_md5", "HashMD5"},
{"strip_ansi", "StripANSI"},
{"parse_ip_cidr", "ParseIPCIDR"},
{"open_aead", "OpenAEAD"},
{"seal_aead", "SealAEAD"},
{"pty_capture_stream", "PTYCaptureStream"},
{"setup_vps_app", "SetupVPSApp"},
{"vps_setup_app", "VPSSetupApp"},
{"wg_keygen", "WGKeygen"},
{"wg_peer_add", "WGPeerAdd"},
{"vt_render", "VTRender"},
{"fft", "FFT"},
{"ema", "EMA"},
{"rsi", "RSI"},
{"sma", "SMA"},
{"vwap", "VWAP"},
{"cdp_get_ax_outline", "CdpGetAXOutline"},
{"audit_e2e_coverage", "AuditE2ECoverage"},
{"e2e_run_checks", "E2ERunChecks"},
{"extract_urls", "ExtractURLs"},
// Pre-existing abbreviations (regression guard — must keep working).
{"http_json_response", "HTTPJSONResponse"},
{"sqlite_open", "SQLiteOpen"},
{"random_hex_id", "RandomHexID"},
// Deliberate non-mappings: registry uses mixed-case (Cdp, Pdf) here, so
// the snake_case→Pascal conversion must leave them mixed-case. These are
// the cases the .go fallback (Fase 2) and the signature path cover.
{"cdp_get_html", "CdpGetHTML"},
{"cdp_navigate", "CdpNavigate"},
{"pdf_simple_report", "PdfSimpleReport"},
}
for _, c := range cases {
if got := snakeToPascal(c.in); got != c.want {
t.Errorf("snakeToPascal(%q) = %q, want %q", c.in, got, c.want)
}
}
}
// goFallbackEnv builds a minimal registry.db + app on disk for the .go fallback
// test. The registry function gen_wails_crud_go_infra mimics wails_bind_crud:
// its signature omits the `func` keyword (so the signature regex misses) and its
// PascalCase("gen_wails_crud")="GenWailsCRUD" differs from the real exported
// symbol "GenerateWailsCRUD". The app calls the real symbol. When writeFnFile is
// true, the registry .go file exists and the fallback can recover the symbol.
func goFallbackEnv(t *testing.T, fnFilePath string, writeFnFile bool) UsesFunctionsAudit {
t.Helper()
root := t.TempDir()
dbPath := filepath.Join(root, "registry.db")
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
t.Fatal(err)
}
_, err = db.Exec(`
CREATE TABLE functions (id TEXT PRIMARY KEY, name TEXT, domain TEXT, lang TEXT, signature TEXT, file_path TEXT);
CREATE TABLE apps (id TEXT PRIMARY KEY, lang TEXT, dir_path TEXT, uses_functions TEXT DEFAULT '[]');
`)
if err != nil {
t.Fatal(err)
}
_, err = db.Exec(
`INSERT INTO functions (id,name,domain,lang,signature,file_path) VALUES (?,?,?,?,?,?)`,
"gen_wails_crud_go_infra", "gen_wails_crud", "infra", "go",
"GenerateWailsCRUD(spec WailsCRUDSpec) string", fnFilePath,
)
if err != nil {
t.Fatal(err)
}
_, err = db.Exec(
`INSERT INTO apps (id,lang,dir_path,uses_functions) VALUES (?,?,?,?)`,
"myapp_go_infra", "go", "apps/myapp", `["gen_wails_crud_go_infra"]`,
)
if err != nil {
t.Fatal(err)
}
db.Close()
if writeFnFile {
fnAbsDir := filepath.Join(root, filepath.Dir(fnFilePath))
if err := os.MkdirAll(fnAbsDir, 0755); err != nil {
t.Fatal(err)
}
src := "package infra\n\ntype WailsCRUDSpec struct{}\n\nfunc GenerateWailsCRUD(spec WailsCRUDSpec) string { return \"\" }\n"
if err := os.WriteFile(filepath.Join(root, fnFilePath), []byte(src), 0644); err != nil {
t.Fatal(err)
}
}
appDir := filepath.Join(root, "apps", "myapp")
if err := os.MkdirAll(appDir, 0755); err != nil {
t.Fatal(err)
}
appSrc := "package main\n\nimport (\n\t\"fmt\"\n\t\"fn-registry/functions/infra\"\n)\n\nfunc main() {\n\tfmt.Println(infra.GenerateWailsCRUD(infra.WailsCRUDSpec{}))\n}\n"
if err := os.WriteFile(filepath.Join(appDir, "main.go"), []byte(appSrc), 0644); err != nil {
t.Fatal(err)
}
results, err := AuditUsesFunctions(root)
if err != nil {
t.Fatalf("AuditUsesFunctions: %v", err)
}
if len(results) != 1 {
t.Fatalf("expected 1 result, got %d", len(results))
}
return results[0]
}
// TestAuditUsesFunctions_GoFileFallback verifies the .go fallback (issue 0057,
// Fase 2): when neither the registry signature nor PascalCase(name) yields the
// real exported symbol, the auditor reads the function's .go file to recover it,
// so a genuinely-used function is not a false "unused". The error sub-case (file
// absent) shows the fallback degrades gracefully and the function is then
// correctly reported unused — proving the fallback is load-bearing.
func TestAuditUsesFunctions_GoFileFallback(t *testing.T) {
t.Run("golden: .go fallback recovers real symbol -> not unused", func(t *testing.T) {
got := goFallbackEnv(t, "functions/infra/gen_wails_crud.go", true)
if len(got.Unused) != 0 {
t.Errorf("Unused = %v, want [] (fallback should find GenerateWailsCRUD)", got.Unused)
}
if len(got.Missing) != 0 {
t.Errorf("Missing = %v, want []", got.Missing)
}
})
t.Run("error: missing .go file -> flagged unused, no crash", func(t *testing.T) {
got := goFallbackEnv(t, "functions/infra/gen_wails_crud.go", false)
if len(got.Unused) != 1 || got.Unused[0] != "gen_wails_crud_go_infra" {
t.Errorf("Unused = %v, want [gen_wails_crud_go_infra] (no fallback file to read)", got.Unused)
}
})
}
// TestAuditUsesFunctions_SharedGoFileNotMisattributed pins the regression caught
// during issue 0057 verification: several registry functions can share one .go
// file (the "TU adicional" pattern, e.g. cdp_new_tab living in cdp_list_tabs.go).
// Because they have valid signatures, the .go fallback must stay GATED OFF for
// them — otherwise the file's first exported func (here ListTabs) would be
// mis-attributed to a sibling (NewTab) and suppress a genuine "unused" finding.
// The app below uses only ListTabs; NewTab must remain flagged unused.
func TestAuditUsesFunctions_SharedGoFileNotMisattributed(t *testing.T) {
root := t.TempDir()
dbPath := filepath.Join(root, "registry.db")
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
t.Fatal(err)
}
_, err = db.Exec(`
CREATE TABLE functions (id TEXT PRIMARY KEY, name TEXT, domain TEXT, lang TEXT, signature TEXT, file_path TEXT);
CREATE TABLE apps (id TEXT PRIMARY KEY, lang TEXT, dir_path TEXT, uses_functions TEXT DEFAULT '[]');
INSERT INTO functions (id,name,domain,lang,signature,file_path) VALUES
('list_tabs_go_browser','list_tabs','browser','go','func ListTabs() error','functions/browser/tabs.go'),
('new_tab_go_browser','new_tab','browser','go','func NewTab() error','functions/browser/tabs.go');
INSERT INTO apps (id,lang,dir_path,uses_functions) VALUES
('tabsapp_go_browser','go','apps/tabsapp','["list_tabs_go_browser","new_tab_go_browser"]');
`)
if err != nil {
t.Fatal(err)
}
db.Close()
// Shared registry .go file: ListTabs is the FIRST exported func.
fnDir := filepath.Join(root, "functions", "browser")
if err := os.MkdirAll(fnDir, 0755); err != nil {
t.Fatal(err)
}
tabsSrc := "package browser\n\nfunc ListTabs() error { return nil }\n\nfunc NewTab() error { return nil }\n"
if err := os.WriteFile(filepath.Join(fnDir, "tabs.go"), []byte(tabsSrc), 0644); err != nil {
t.Fatal(err)
}
// App calls only ListTabs, but declares both.
appDir := filepath.Join(root, "apps", "tabsapp")
if err := os.MkdirAll(appDir, 0755); err != nil {
t.Fatal(err)
}
appSrc := "package main\n\nimport (\n\t\"fmt\"\n\t\"fn-registry/functions/browser\"\n)\n\nfunc main() {\n\tfmt.Println(browser.ListTabs())\n}\n"
if err := os.WriteFile(filepath.Join(appDir, "main.go"), []byte(appSrc), 0644); err != nil {
t.Fatal(err)
}
results, err := AuditUsesFunctions(root)
if err != nil {
t.Fatalf("AuditUsesFunctions: %v", err)
}
if len(results) != 1 {
t.Fatalf("expected 1 result, got %d", len(results))
}
got := results[0]
if len(got.Unused) != 1 || got.Unused[0] != "new_tab_go_browser" {
t.Errorf("Unused = %v, want [new_tab_go_browser] (sibling must NOT rescue via shared file)", got.Unused)
}
}
// TestGoRealExportedName verifies the .go symbol extractor: top-level exported
// funcs (plain and generic) are recovered, method receivers are skipped, the
// result is cached, and unreadable/empty paths return "" without error.
func TestGoRealExportedName(t *testing.T) {
root := t.TempDir()
if err := os.MkdirAll(filepath.Join(root, "functions", "infra"), 0755); err != nil {
t.Fatal(err)
}
// File whose first exported func is preceded by an unexported func + a method.
src := "package infra\n\n" +
"import \"fmt\"\n\n" +
"func helper() {}\n\n" +
"type T struct{}\n\n" +
"func (t *T) Save() {}\n\n" +
"func GenerateWailsCRUD(spec int) string { fmt.Println(spec); return \"\" }\n\n" +
"func WailsStreamData[X any](xs []X) {}\n"
rel := "functions/infra/sample.go"
if err := os.WriteFile(filepath.Join(root, rel), []byte(src), 0644); err != nil {
t.Fatal(err)
}
cache := map[string]string{}
t.Run("golden: first top-level exported func (skips helper + method)", func(t *testing.T) {
if got := goRealExportedName(root, rel, cache); got != "GenerateWailsCRUD" {
t.Errorf("got %q, want GenerateWailsCRUD", got)
}
if cache[rel] != "GenerateWailsCRUD" {
t.Errorf("cache[%q] = %q, want GenerateWailsCRUD", rel, cache[rel])
}
})
t.Run("edge: generic func form func Name[T any](", func(t *testing.T) {
genRel := "functions/infra/gen.go"
genSrc := "package infra\n\nfunc WailsStreamData[X any](xs []X) {}\n"
if err := os.WriteFile(filepath.Join(root, genRel), []byte(genSrc), 0644); err != nil {
t.Fatal(err)
}
if got := goRealExportedName(root, genRel, cache); got != "WailsStreamData" {
t.Errorf("got %q, want WailsStreamData", got)
}
})
t.Run("error: missing file -> empty string, cached", func(t *testing.T) {
missRel := "functions/infra/does_not_exist.go"
if got := goRealExportedName(root, missRel, cache); got != "" {
t.Errorf("got %q, want empty for missing file", got)
}
if v, ok := cache[missRel]; !ok || v != "" {
t.Errorf("missing file should be cached as empty, got ok=%v v=%q", ok, v)
}
})
t.Run("error: empty file_path -> empty string", func(t *testing.T) {
if got := goRealExportedName(root, "", cache); got != "" {
t.Errorf("got %q, want empty for empty path", got)
}
})
}
// TestAuditUsesFunctions_MissingDir verifies that apps whose dir_path does not
// exist on disk get an entry with nil Missing/Unused slices (cannot inspect).
func TestAuditUsesFunctions_MissingDir(t *testing.T) {
-8
View File
@@ -42,8 +42,6 @@ from .isolation_forest_outliers import isolation_forest_outliers
from .normality_tests import normality_tests
from .trend_slope import trend_slope
from .run_eda_models import run_eda_models
from .project_clusters_2d import project_clusters_2d
from .describe_clusters_llm import describe_clusters_llm
from .eda_llm_insights import eda_llm_insights
from .build_eda_notebook import build_eda_notebook
from .decode_qr_image import decode_qr_image
@@ -55,12 +53,8 @@ from .fdr_correction import fdr_correction
from .suggest_reexpression import suggest_reexpression
from .exploratory_caveats import exploratory_caveats
from .render_eda_pdf import render_eda_pdf, render_eda_pdf_relational
from .render_automatic_eda_pdf import render_automatic_eda_pdf
from .render_automatic_eda_pptx import render_automatic_eda_pptx
__all__ = [
"render_automatic_eda_pdf",
"render_automatic_eda_pptx",
"decode_qr_image",
"adf_kpss_stationarity",
"acf_pacf",
@@ -88,8 +82,6 @@ __all__ = [
"normality_tests",
"trend_slope",
"run_eda_models",
"project_clusters_2d",
"describe_clusters_llm",
"eda_llm_insights",
"build_eda_notebook",
"describe_numeric",
@@ -1,57 +0,0 @@
"""AutomaticEDA — chapter-based, versioned EDA document with PDF + PPTX output.
Public surface (support package for the registry functions
``render_automatic_eda_pdf`` and ``render_automatic_eda_pptx``):
- Document model: ``Heading``, ``Markdown``, ``KVTable``, ``DataTable``,
``Figure``, ``Image``, ``Caption``, ``Note``, ``Chapter``; normalizers
``as_blocks`` / ``as_chapters``; ``ENGINE_VERSION`` / ``ENGINE_NAME``.
- ``build_document(profile, ctx)`` — assemble the ordered chapters of a profile.
- ``render_pdf(chapters, out_path, meta)`` / ``render_pptx(...)`` — the two
renderers (used by the public registry functions).
- ``merge_manifest(...)`` — write/update the per-chapter version manifest.
"""
from __future__ import annotations
from .model import ( # noqa: F401
ENGINE_NAME,
ENGINE_VERSION,
Caption,
Chapter,
DataTable,
Figure,
Heading,
Image,
KVTable,
Markdown,
Note,
as_blocks,
as_chapters,
merge_manifest,
)
from .chapters_registry import CHAPTER_ORDER, build_chapter, build_document # noqa: F401
from .render_pdf_impl import render_pdf # noqa: F401
from .render_pptx_impl import render_pptx # noqa: F401
__all__ = [
"ENGINE_NAME",
"ENGINE_VERSION",
"Heading",
"Markdown",
"KVTable",
"DataTable",
"Figure",
"Image",
"Caption",
"Note",
"Chapter",
"as_blocks",
"as_chapters",
"merge_manifest",
"CHAPTER_ORDER",
"build_chapter",
"build_document",
"render_pdf",
"render_pptx",
]
@@ -1,7 +0,0 @@
"""AutomaticEDA chapters.
Each chapter is a module ``<id>.py`` exposing ``build_<id>(profile, ctx) ->
Chapter | None`` and a ``CHAPTER_VERSION`` constant. The canonical document
order lives in :mod:`automatic_eda.chapters_registry`. Implemented today:
``portada`` and ``overview`` (the reference chapters other agents copy).
"""
@@ -1,498 +0,0 @@
"""Models chapter (MODELOS) — cheap unsupervised models, rendered as markdown.
Builds the *Modelos* chapter of an AutomaticEDA document from the ``models``
block of a TableProfile (``run_eda_models`` output: ``{pca, kmeans, outliers,
normality}``). It renders, as structured markdown/tables/figures that the core
paginator never cuts:
1. **Normalization note** — every multivariate model below standardizes the
columns with z-score first; the chapter explains why (different scales would
otherwise dominate distance/variance).
2. **PCA** — a scree plot (explained + cumulative variance, single Y axis) plus
variance and top-loadings tables.
3. **KMeans segments** — a PCA scatter **coloured by cluster** (its own
page/slide), the cluster-size table, and a per-cluster LLM micro-analysis
with a title for each segment.
4. **Isolation Forest outliers** — a short explanation of how anomalous rows are
isolated multivariately and how the threshold is chosen, plus the counts.
5. **Normality** — per-column Jarque-Bera / D'Agostino / Shapiro verdicts.
The raw numeric data needed to colour the cluster scatter is **not** in the
TableProfile, so — exactly like ``overview`` reads ``head_rows`` from ``ctx`` —
this chapter looks for the cluster projection / raw numeric columns in ``ctx``
(or in ``profile``) and degrades honestly when they are absent: it falls back to
the uncoloured ``pca.projection`` with a note, or omits the scatter entirely.
ctx keys this chapter consumes (all optional):
cluster_projection : dict — a pre-computed ``project_clusters_2d`` result
(``points``/``labels``/``centers_2d``/``cluster_profiles``/...). Used
directly when present (forward-compatible with the calculation phase).
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
and ``cluster_projection`` is not, the chapter calls
``project_clusters_2d`` live to build points + aligned labels.
cluster_titles : list — pre-computed ``[{cluster, title, description}]``
(a ``describe_clusters_llm`` ``clusters`` list). Used for the per-cluster
micro-analysis without an LLM call (offline/tests).
run_cluster_llm : bool — when True and ``cluster_titles`` is absent, call
``describe_clusters_llm`` live on the cluster profiles.
cluster_llm_model : str — model id for the live LLM call.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "modelos"
CHAPTER_TITLE = "Modelos"
# Tableau-10 palette (matplotlib's default cycle) — used both for the matplotlib
# scatter and to keep the legend/colours stable per cluster index.
_CLUSTER_COLORS = [
"#4e79a7", "#f28e2b", "#e15759", "#76b7b2", "#59a14f",
"#edc948", "#b07aa1", "#ff9da7", "#9c755f", "#bab0ac",
]
# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the overview chapter's defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _fmt_pct_ratio(value, decimals: int = 1) -> str:
"""Format a 0..1 ratio as a percentage."""
if value is None:
return ""
try:
return f"{float(value) * 100:.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _fmt_pct_already(value, decimals: int = 2) -> str:
"""Format a value that is *already* a 0..100 percentage."""
if value is None:
return ""
try:
return f"{float(value):.{decimals}f}%"
except (TypeError, ValueError):
return model._safe_str(value)
def _is_dict(v) -> bool:
return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Cluster projection: prefer a pre-computed result, else compute it live, else
# fall back to the uncoloured PCA projection.
# --------------------------------------------------------------------------- #
def _resolve_cluster_projection(profile: dict, ctx: dict):
"""Return (projection_dict_or_None, source_label).
Order: ctx/profile['cluster_projection'] (pre-computed) → live
project_clusters_2d on ctx/profile['raw_numeric'] → None.
"""
pre = ctx.get("cluster_projection") or profile.get("cluster_projection")
models = profile.get("models") if _is_dict(profile.get("models")) else {}
if not pre and _is_dict(models):
pre = models.get("cluster_projection")
if _is_dict(pre) and pre.get("points"):
return pre, "precomputed"
raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
if _is_dict(raw) and raw:
try:
# Import the submodule's function explicitly (avoid the package
# attribute shadowing the function with the same-named module).
from datascience.project_clusters_2d import project_clusters_2d
proj = project_clusters_2d(raw)
if _is_dict(proj) and proj.get("points"):
return proj, "live"
except Exception: # noqa: BLE001 — never break the chapter.
return None, "none"
return None, "none"
def _cluster_titles(profile: dict, ctx: dict, projection: dict):
"""Return a list of {cluster, title, description} for the segments.
Order: ctx['cluster_titles'] (pre-computed) → live describe_clusters_llm when
ctx['run_cluster_llm'] and we have cluster_profiles → derived titles from the
distinctive features → None.
"""
pre = ctx.get("cluster_titles")
if isinstance(pre, list) and pre:
return [c for c in pre if _is_dict(c)]
profiles = (projection or {}).get("cluster_profiles") or []
feats = (projection or {}).get("feature_names") or []
if ctx.get("run_cluster_llm") and profiles:
try:
from datascience.describe_clusters_llm import describe_clusters_llm
out = describe_clusters_llm(
profiles, feats,
model=ctx.get("cluster_llm_model", "claude-haiku-4-5-20251001"))
clusters = (out or {}).get("clusters")
if isinstance(clusters, list) and clusters:
return [c for c in clusters if _is_dict(c)]
except Exception: # noqa: BLE001
pass
# Derived fallback: name each cluster by its distinctive features.
if profiles:
derived = []
for p in profiles:
if not _is_dict(p):
continue
cid = p.get("cluster", len(derived))
dist = p.get("distinctive") or []
label = ", ".join(model._safe_str(d) for d in dist[:2]) if dist else ""
title = f"Segmento {cid}" + (f"{label}" if label else "")
derived.append({"cluster": cid, "title": title, "description": ""})
if derived:
return derived
return None
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _make_scree(pca: dict):
"""Return a zero-arg callable drawing the PCA scree plot, or None."""
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
if not evr:
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
comps = list(range(1, len(evr) + 1))
fig, ax = plt.subplots(figsize=(7.0, 4.2))
ax.bar(comps, evr, color="#4e79a7", alpha=0.85,
label="Varianza explicada")
if cum:
ax.plot(comps[:len(cum)], cum, color="#e15759", marker="o",
linewidth=1.8, label="Acumulada")
ax.set_xlabel("Componente principal")
ax.set_ylabel("Proporción de varianza")
ax.set_xticks(comps)
ax.set_ylim(0, 1.0)
ax.grid(axis="y", color="#dddddd", linewidth=0.6)
ax.legend(loc="best", fontsize=8, frameon=False)
ax.set_title("Varianza explicada por componente (PCA)", fontsize=10)
fig.tight_layout()
return fig
return _draw
def _make_cluster_scatter(projection: dict):
"""Return a zero-arg callable drawing the cluster scatter, or None."""
points = projection.get("points") or []
labels = projection.get("labels") or []
if not points or len(points) != len(labels):
return None
centers = projection.get("centers_2d") or []
explained = projection.get("explained_2d") or []
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(7.0, 5.2))
uniq = sorted(set(int(l) for l in labels))
for cl in uniq:
xs = [p[0] for p, l in zip(points, labels) if int(l) == cl]
ys = [p[1] for p, l in zip(points, labels) if int(l) == cl]
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter(xs, ys, s=14, c=color, alpha=0.7, linewidths=0,
label=f"Cluster {cl} (n={len(xs)})")
for cl, c in enumerate(centers):
color = _CLUSTER_COLORS[cl % len(_CLUSTER_COLORS)]
ax.scatter([c[0]], [c[1]], s=180, c=color, marker="X",
edgecolors="black", linewidths=1.2, zorder=5)
xlab, ylab = "PC1", "PC2"
if len(explained) >= 2:
xlab = f"PC1 ({_fmt_pct_ratio(explained[0])} var.)"
ylab = f"PC2 ({_fmt_pct_ratio(explained[1])} var.)"
ax.set_xlabel(xlab)
ax.set_ylabel(ylab)
ax.set_title("Segmentos KMeans proyectados sobre el plano PCA",
fontsize=10)
ax.grid(color="#eeeeee", linewidth=0.5)
ax.legend(loc="best", fontsize=8, frameon=True, framealpha=0.9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Section builders. Each returns a list of blocks (possibly empty).
# --------------------------------------------------------------------------- #
def _normalization_intro() -> list:
text = (
"Estos modelos son **no supervisados**: buscan estructura latente sin "
"una variable objetivo. Antes de aplicarlos, todas las columnas "
"numéricas se **estandarizan con z-score** (cada valor menos la media, "
"dividido por la desviación típica). Sin esta normalización, una "
"variable con escala grande (p.ej. ingresos en euros) dominaría las "
"distancias y la varianza frente a otra de escala pequeña (p.ej. un "
"ratio entre 0 y 1), sesgando tanto el PCA como el KMeans. Tras la "
"estandarización todas las variables pesan por igual."
)
return [model.Heading(text="Modelos no supervisados", level=1),
model.Markdown(text=text)]
def _pca_section(pca: dict) -> list:
if not _is_dict(pca) or not pca.get("explained_variance_ratio"):
return []
blocks = [model.Heading(text="PCA — varianza explicada", level=2)]
n_used = pca.get("n_rows_used")
n_feat = pca.get("n_features")
intro = (
f"El PCA resume {_fmt_num(n_feat)} variables numéricas en componentes "
f"ortogonales ordenados por la varianza que capturan "
f"({_fmt_num(n_used)} filas usadas tras eliminar nulos). El gráfico de "
"sedimentación (scree) muestra cuánta varianza aporta cada componente y "
"su acumulado: un codo marca cuántos componentes bastan."
)
blocks.append(model.Markdown(text=intro))
scree = _make_scree(pca)
if scree is not None:
blocks.append(model.Figure(
make=scree, caption="Varianza explicada y acumulada por componente."))
evr = pca.get("explained_variance_ratio") or []
cum = pca.get("cumulative") or []
rows = []
for i, v in enumerate(evr):
acc = cum[i] if i < len(cum) else None
rows.append([f"PC{i + 1}", _fmt_pct_ratio(v), _fmt_pct_ratio(acc)])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Varianza", "Acumulada"], rows=rows,
title="Varianza por componente"))
# Top loadings: keep the strongest features per component (capped).
loadings = pca.get("top_loadings") or []
if loadings:
per_comp: dict = {}
for ld in loadings:
if not _is_dict(ld):
continue
comp = ld.get("component")
per_comp.setdefault(comp, [])
if len(per_comp[comp]) < 4:
per_comp[comp].append(ld)
rows = []
for comp in sorted(per_comp.keys(), key=lambda x: (x is None, x)):
for ld in per_comp[comp]:
rows.append([f"PC{int(comp) + 1}" if comp is not None else "",
model._safe_str(ld.get("feature")),
_fmt_num(ld.get("loading"))])
if rows:
blocks.append(model.DataTable(
header=["Componente", "Variable", "Carga"], rows=rows,
title="Cargas principales (top por componente)",
note="Cargas con mayor valor absoluto: qué variables definen "
"cada eje."))
return blocks
def _kmeans_section(kmeans: dict, projection: dict, titles) -> list:
has_km = _is_dict(kmeans) and kmeans.get("best_k")
has_proj = _is_dict(projection) and projection.get("points")
if not has_km and not has_proj:
return []
blocks = [model.Heading(text="Segmentación (KMeans)", level=2)]
best_k = (projection or {}).get("best_k") or (kmeans or {}).get("best_k")
sil = (projection or {}).get("silhouette")
if sil is None:
sil = (kmeans or {}).get("silhouette")
intro = (
f"KMeans agrupa las filas en **{_fmt_num(best_k)} segmentos** elegidos "
"automáticamente maximizando el coeficiente de *silhouette* "
f"(**{_fmt_num(sil)}**, rango 1 a 1: cuanto más alto, segmentos más "
"compactos y separados). Los segmentos se proyectan sobre el plano de "
"los dos primeros componentes principales para visualizarlos."
)
blocks.append(model.Markdown(text=intro))
if has_proj:
scatter = _make_cluster_scatter(projection)
if scatter is not None:
blocks.append(model.Figure(
make=scatter,
caption="Cada punto es una fila coloreada por su segmento "
"KMeans; las «X» son los centroides."))
else:
blocks.append(model.Note(
"Proyección de clusters no dibujable (puntos y etiquetas "
"desalineados)."))
else:
# We have kmeans stats but no aligned points+labels to colour by.
blocks.append(model.Note(
"Scatter coloreado por segmento no disponible: el perfil no incluye "
"la proyección con etiquetas alineadas (pásala en "
"ctx['cluster_projection'] o las columnas crudas en "
"ctx['raw_numeric'] para colorear el plano PCA)."))
# Cluster sizes table.
sizes = (projection or {}).get("cluster_sizes") or (kmeans or {}).get("cluster_sizes") or []
total = sum(s for s in sizes if isinstance(s, (int, float))) or 0
if sizes:
rows = []
for i, s in enumerate(sizes):
pct = (s / total) if total else None
rows.append([f"Cluster {i}", _fmt_num(s), _fmt_pct_ratio(pct)])
blocks.append(model.DataTable(
header=["Segmento", "Tamaño", "% del total"], rows=rows,
title="Tamaño de cada segmento"))
# Per-cluster LLM micro-analysis (each entry kept indivisible as one block).
if titles:
blocks.append(model.Heading(text="Interpretación de los segmentos",
level=3))
for t in titles:
if not _is_dict(t):
continue
cid = t.get("cluster")
title = model._safe_str(t.get("title")) or f"Cluster {cid}"
desc = model._safe_str(t.get("description"))
line = f"**Cluster {cid}{title}.**"
if desc:
line += " " + desc
blocks.append(model.Markdown(text=line))
return blocks
def _outliers_section(outliers: dict) -> list:
if not _is_dict(outliers) or outliers.get("n_outliers") is None:
return []
if outliers.get("note") and not outliers.get("n_rows_used"):
# insufficient data — nothing meaningful to show.
return []
blocks = [model.Heading(text="Detección de anomalías (Isolation Forest)",
level=2)]
explain = (
"**Isolation Forest** detecta filas anómalas de forma *multivariante*: "
"construye árboles que parten el espacio con cortes aleatorios y mide "
"cuántos cortes hacen falta para aislar cada fila. Las filas raras "
"(combinaciones de valores poco frecuentes considerando **todas las "
"columnas a la vez**, no una sola) se aíslan con muy pocos cortes y "
"obtienen un score bajo. El **umbral** de decisión separa las filas "
"normales de las anómalas según la contaminación esperada del modelo: "
"una fila es outlier cuando su score queda por debajo de ese umbral."
)
blocks.append(model.Markdown(text=explain))
blocks.append(model.KVTable(rows=[
("Filas analizadas", _fmt_num(outliers.get("n_rows_used"))),
("Outliers detectados", _fmt_num(outliers.get("n_outliers"))),
("% outliers", _fmt_pct_already(outliers.get("outlier_pct"))),
("Umbral de decisión", _fmt_num(outliers.get("threshold"), 4)),
], title="Anomalías multivariantes"))
return blocks
def _normality_section(normality: dict) -> list:
if not _is_dict(normality) or not normality:
return []
header = ["Columna", "Jarque-Bera (p)", "D'Agostino (p)", "Shapiro (p)",
"¿Normal?"]
rows = []
for col, res in normality.items():
if not _is_dict(res):
continue
jb = res.get("jarque_bera") if _is_dict(res.get("jarque_bera")) else {}
da = res.get("dagostino") if _is_dict(res.get("dagostino")) else {}
sh = res.get("shapiro") if _is_dict(res.get("shapiro")) else {}
is_norm = res.get("is_normal")
if res.get("note") and is_norm is None and not jb:
rows.append([model._safe_str(col), "", "", "",
model._safe_str(res.get("note"))])
continue
rows.append([
model._safe_str(col),
_fmt_num(jb.get("p"), 4) if jb else "",
_fmt_num(da.get("p"), 4) if da else "",
_fmt_num(sh.get("p"), 4) if sh else "",
"" if is_norm else ("no" if is_norm is not None else ""),
])
if not rows:
return []
return [
model.Heading(text="Normalidad de las variables", level=2),
model.Markdown(text=(
"Tests de hipótesis de normalidad por columna (hipótesis nula: la "
"muestra proviene de una distribución normal). Se marca **normal** "
"cuando el p-valor supera 0,05 (no se rechaza la nula). Pocas "
"variables reales son estrictamente normales; esto orienta qué "
"transformaciones o tests robustos aplicar después.")),
model.DataTable(header=header, rows=rows,
title="Pruebas de normalidad"),
]
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_modelos(profile: dict, ctx: dict):
"""Build the MODELOS Chapter, or None if there are no models to show."""
profile = profile or {}
ctx = ctx or {}
if not isinstance(profile, dict):
return None
models = profile.get("models")
if not _is_dict(models):
return None
pca = models.get("pca") if _is_dict(models.get("pca")) else None
kmeans = models.get("kmeans") if _is_dict(models.get("kmeans")) else None
outliers = models.get("outliers") if _is_dict(models.get("outliers")) else None
normality = models.get("normality") if _is_dict(models.get("normality")) else None
projection, _src = _resolve_cluster_projection(profile, ctx)
titles = _cluster_titles(profile, ctx, projection) if (
(kmeans and kmeans.get("best_k")) or (projection and projection.get("points"))
) else None
sections = []
sections += _pca_section(pca) if pca else []
sections += _kmeans_section(kmeans, projection, titles)
sections += _outliers_section(outliers) if outliers else []
sections += _normality_section(normality) if normality else []
if not sections:
return None # models block present but nothing renderable.
blocks = _normalization_intro() + sections
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -1,259 +0,0 @@
"""Tests for the MODELOS chapter — DoD: golden + edges + anti-cut.
Self-contained: builds a synthetic TableProfile with a ``models`` block (no
DuckDB, no sklearn, no LLM, no network). The cluster scatter is fed a synthetic
pre-computed ``cluster_projection`` via ``ctx`` and the per-cluster titles via
``ctx['cluster_titles']`` so the suite is fast and deterministic. The live paths
(``project_clusters_2d`` / ``describe_clusters_llm``) are exercised against the
real wine dataset in the work report, not here.
Verifies: the chapter renders to PDF *and* PPTX showing the user-required pieces
(markdown text, PCA scree, cluster scatter, per-cluster LLM micro-analysis,
outlier + normalization explanations); that an inapplicable profile yields None
without raising; and that a long normality table is split without losing any
column (anti-cut).
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.chapters.modelos import build_modelos
from datascience.automatic_eda.model import Figure, DataTable, Markdown
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
# --------------------------------------------------------------------------- #
# Synthetic fixtures.
# --------------------------------------------------------------------------- #
def _models_block(n_norm_cols: int = 4) -> dict:
feats = ["fixed_acidity", "alcohol", "ph", "sulphates"]
normality = {}
for i in range(n_norm_cols):
normality[f"col_{i}"] = {
"n": 500,
"jarque_bera": {"stat": 12.3, "p": 0.002 + i * 0.0001, "normal": False},
"dagostino": {"stat": 9.1, "p": 0.01, "normal": False},
"shapiro": {"stat": 0.98, "p": 0.04, "normal": False},
"is_normal": False,
}
return {
"n_numeric_cols": 4,
"pca": {
"n_components": 2, "n_rows_used": 1599, "n_features": 4,
"explained_variance_ratio": [0.41, 0.22],
"cumulative": [0.41, 0.63],
"top_loadings": [
{"component": 0, "feature": "alcohol", "loading": 0.62},
{"component": 0, "feature": "fixed_acidity", "loading": -0.48},
{"component": 1, "feature": "ph", "loading": 0.71},
{"component": 1, "feature": "sulphates", "loading": 0.33},
],
"projection": [[0.1, 0.2], [0.3, -0.1]],
},
"kmeans": {
"best_k": 3, "silhouette": 0.27,
"scores_by_k": [{"k": 2, "silhouette": 0.21}, {"k": 3, "silhouette": 0.27}],
"cluster_sizes": [700, 500, 399],
"centers": [[0.1, 0.2, 0.3, 0.4]],
"n_rows_used": 1599, "n_features": 4,
},
"outliers": {
"n_outliers": 80, "outlier_pct": 5.0, "threshold": -0.0123,
"n_rows_used": 1599,
},
"normality": normality,
"note": "",
"_feats": feats,
}
def _cluster_projection() -> dict:
# 30 points across 3 clusters, aligned points<->labels.
points, labels = [], []
centers = [(-2.0, -2.0), (2.0, 0.0), (0.0, 2.5)]
for cl, (cx, cy) in enumerate(centers):
for j in range(10):
points.append([cx + (j - 5) * 0.05, cy + (j - 5) * 0.05])
labels.append(cl)
return {
"points": points, "labels": labels,
"centers_2d": [list(c) for c in centers],
"best_k": 3, "silhouette": 0.27,
"explained_2d": [0.41, 0.22],
"cluster_sizes": [10, 10, 10],
"cluster_profiles": [
{"cluster": 0, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 9.5, "ph": 3.5},
"distinctive": ["alcohol", "ph"], "centroid_z": {"alcohol": -1.2}},
{"cluster": 1, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 12.0, "ph": 3.1},
"distinctive": ["alcohol"], "centroid_z": {"alcohol": 1.4}},
{"cluster": 2, "size": 10, "pct": 0.33,
"centroid_original": {"alcohol": 10.5, "ph": 3.8},
"distinctive": ["ph"], "centroid_z": {"ph": 1.6}},
],
"feature_names": ["alcohol", "ph", "fixed_acidity", "sulphates"],
"n_used": 1599, "note": "",
}
def _ctx_full() -> dict:
return {
"cluster_projection": _cluster_projection(),
"cluster_titles": [
{"cluster": 0, "title": "Vinos suaves de baja graduación",
"description": "Alcohol bajo y pH alto; perfil ligero."},
{"cluster": 1, "title": "Vinos potentes",
"description": "Alta graduación alcohólica."},
{"cluster": 2, "title": "Vinos de pH elevado",
"description": "Acidez baja relativa al resto."},
],
}
def _profile() -> dict:
return {"table": "wine", "n_rows": 1599, "n_cols": 12,
"models": _models_block()}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
def _pptx_text(path: str) -> str:
prs = Presentation(path)
out = []
for slide in prs.slides:
for shape in slide.shapes:
if shape.has_text_frame:
out.append(shape.text_frame.text)
return re.sub(r"\s+", " ", " ".join(out))
# --------------------------------------------------------------------------- #
# Golden.
# --------------------------------------------------------------------------- #
def test_golden_build_modelos_bloques_requeridos():
ch = build_modelos(_profile(), _ctx_full())
assert ch is not None
assert ch.id == "modelos" and ch.version
# Both figures present: scree plot + cluster scatter.
n_figures = sum(1 for b in ch.blocks if isinstance(b, Figure))
assert n_figures >= 2
# Tables present (variance, loadings, sizes, normality).
assert sum(1 for b in ch.blocks if isinstance(b, DataTable)) >= 3
# Markdown carries the required explanations.
md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown))
assert "z-score" in md # normalization explained
assert "Isolation Forest" in md # outlier generation explained
assert "silhouette" in md # kmeans
# Per-cluster micro-analysis titles present.
assert "Vinos potentes" in md
assert "Cluster 1" in md
def test_golden_render_pdf_muestra_lo_exigido():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "modelos.pdf")
res = render_automatic_eda_pdf(
_profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()})
assert res["path"] == out and os.path.exists(out)
ids = [c["id"] for c in res["chapters"]]
assert "modelos" in ids
txt = _pdf_text(out)
for needle in ("Modelos no supervisados", "z-score", "PCA",
"Segmentación", "Isolation Forest", "Normalidad",
"Vinos potentes"):
assert needle in txt, f"falta en PDF: {needle}"
def test_golden_render_pptx_muestra_lo_exigido():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "modelos.pptx")
res = render_automatic_eda_pptx(
_profile(), out, {"title": "EDA — wine", "ctx": _ctx_full()})
assert res["path"] == out and os.path.exists(out)
assert res["n_slides"] >= 1
txt = _pptx_text(out)
for needle in ("Modelos no supervisados", "z-score", "Isolation Forest",
"Vinos potentes"):
assert needle in txt, f"falta en PPTX: {needle}"
# --------------------------------------------------------------------------- #
# Edges.
# --------------------------------------------------------------------------- #
def test_edge_profile_none_o_vacio_devuelve_none():
assert build_modelos(None, {}) is None
assert build_modelos({}, {}) is None
assert build_modelos({"n_rows": 5}, None) is None # no 'models' key
def test_edge_models_insuficiente_devuelve_none():
prof = {"table": "tiny", "models": {
"n_numeric_cols": 1,
"pca": {"n_components": 0, "explained_variance_ratio": [],
"note": "datos insuficientes"},
"kmeans": {"best_k": 0, "note": "datos insuficientes"},
"outliers": {"n_outliers": 0, "note": "datos insuficientes"},
"normality": None,
"note": "insuficientes columnas numericas para modelos multivariantes",
}}
assert build_modelos(prof, {}) is None
def test_edge_solo_normalidad_si_genera_capitulo():
# A single numeric column: only normality applies. Chapter must still build.
prof = {"table": "one", "models": {
"n_numeric_cols": 1, "pca": None, "kmeans": None, "outliers": None,
"normality": {"x": {"n": 500, "jarque_bera": {"stat": 1.0, "p": 0.2,
"normal": True}, "dagostino": {"stat": 1.0, "p": 0.3,
"normal": True}, "shapiro": {"stat": 0.99, "p": 0.4,
"normal": True}, "is_normal": True}},
}}
ch = build_modelos(prof, {})
assert ch is not None
md = " ".join(b.text for b in ch.blocks if isinstance(b, Markdown))
assert "z-score" in md # normalization intro still present
def test_edge_kmeans_sin_proyeccion_degrada_sin_romper():
# kmeans stats present but no cluster_projection / raw_numeric to colour by.
prof = _profile()
ch = build_modelos(prof, {}) # no ctx projection
assert ch is not None
# No scatter figure for clusters, but a Note explaining the degradation.
notes = [b.text for b in ch.blocks if b.kind == "note"]
assert any("ctx['raw_numeric']" in n or "cluster_projection" in n
for n in notes)
# PDF still renders fine.
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "deg.pdf")
res = render_automatic_eda_pdf(prof, out, {"write_manifest": False})
assert res["path"] == out and os.path.exists(out)
# --------------------------------------------------------------------------- #
# Anti-cut.
# --------------------------------------------------------------------------- #
def test_anticortes_tabla_normalidad_larga_no_corta():
# 40 numeric columns → the normality DataTable must split across pages,
# repeating the header, without losing any column name.
prof = {"table": "wide", "models": _models_block(n_norm_cols=40)}
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "wide.pdf")
render_automatic_eda_pdf(prof, out, {"write_manifest": False,
"ctx": _ctx_full()})
reader = PdfReader(out)
n_pages = len(reader.pages)
assert n_pages > 1
txt = "".join((pg.extract_text() or "") for pg in reader.pages)
# Every column name survives (wrapped/split, never truncated).
for i in (0, 19, 39):
assert f"col_{i}" in txt
@@ -1,176 +0,0 @@
"""Overview chapter — df.head, column dictionary and describe (reference).
Second reference chapter for AutomaticEDA. Renders (across as many pages/slides
as needed, the renderers paginate):
1. ``df.head`` the first rows of the table. The current ``TableProfile`` does
NOT carry the raw head, so this is read from ``ctx['head_rows']`` /
``profile['head_rows']`` (a list of row dicts). When absent the chapter shows
an honest placeholder documenting the missing key instead of inventing data.
2. Column dictionary name / type / nulls / non-null examples. Examples come
from ``columns[i]['examples']`` when present; otherwise they are derived from
real non-null profile values (categorical top values, numeric min/median/max)
so the cell is never empty nor fabricated.
3. ``df.describe`` mean / median / min / max / std for every numeric column.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
"""
from __future__ import annotations
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "overview"
CHAPTER_TITLE = "Overview"
# Profile/ctx keys the calculation phase must add for a full head + examples.
HEAD_KEY = "head_rows" # list[dict] — df.head(n)
EXAMPLES_KEY = "examples" # per column: list of non-null sample values
def _fmt_num(value, decimals: int = 3) -> str:
if value is None:
return ""
if isinstance(value, bool):
return str(value)
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return str(value)
def _fmt_pct(value, decimals: int = 1) -> str:
if value is None:
return ""
try:
return f"{float(value) * 100:.{decimals}f}%"
except (TypeError, ValueError):
return str(value)
def _examples_for(col: dict) -> str:
"""Build a short string of real non-null example values for a column."""
explicit = col.get(EXAMPLES_KEY)
if isinstance(explicit, (list, tuple)) and explicit:
return ", ".join(model._safe_str(v) for v in explicit[:4])
cat = col.get("categorical") or {}
top = cat.get("top") or []
if top:
vals = [model._safe_str((t or {}).get("value")) for t in top[:4]
if isinstance(t, dict)]
vals = [v for v in vals if v]
if vals:
return ", ".join(vals)
num = col.get("numeric") or {}
if num:
bits = []
for key in ("min", "median", "max"):
v = num.get(key)
if v is not None:
bits.append(_fmt_num(v))
if bits:
return ", ".join(bits)
return ""
def _head_block(profile: dict, ctx: dict):
"""Return a DataTable for df.head, or a Note documenting the missing key."""
head = ctx.get(HEAD_KEY) or profile.get(HEAD_KEY)
if isinstance(head, list) and head and isinstance(head[0], dict):
# Column order from the profile, then any extra keys present in rows.
cols = [c.get("name") for c in (profile.get("columns") or [])
if c.get("name")]
if not cols:
cols = list(head[0].keys())
rows = [[model._safe_str(r.get(c)) for c in cols] for r in head[:10]]
return model.DataTable(header=cols, rows=rows,
note=f"primeras {len(rows)} filas")
return model.Note(
"df.head no disponible: el TableProfile no incluye 'head_rows'. La fase "
"de cálculo debe añadir profile['head_rows'] (lista de dicts fila) o "
"pasarlo en ctx['head_rows'] para mostrar las primeras filas.")
def _columns_block(profile: dict):
cols = profile.get("columns") or []
header = ["Columna", "Tipo", "Nulos", "Ejemplos (no nulos)"]
rows = []
for c in cols:
if not isinstance(c, dict):
continue
name = c.get("name") or "(col)"
ctype = c.get("inferred_type") or c.get("physical_type") or ""
sem = c.get("semantic_type")
if sem:
ctype = f"{ctype} ({sem})"
null_pct = c.get("null_pct")
null_count = c.get("null_count")
if null_pct is not None:
nulls = _fmt_pct(null_pct)
if null_count is not None:
nulls += f" ({null_count})"
elif null_count is not None:
nulls = str(null_count)
else:
nulls = ""
rows.append([name, ctype, nulls, _examples_for(c)])
if not rows:
return None
return model.DataTable(header=header, rows=rows, title="Columnas")
def _describe_block(profile: dict):
cols = profile.get("columns") or []
header = ["Columna", "mean", "median", "min", "max", "std"]
rows = []
for c in cols:
if not isinstance(c, dict) or c.get("inferred_type") != "numeric":
continue
num = c.get("numeric") or {}
if not num:
continue
rows.append([
c.get("name") or "(col)",
_fmt_num(num.get("mean")),
_fmt_num(num.get("median")),
_fmt_num(num.get("min")),
_fmt_num(num.get("max")),
_fmt_num(num.get("std")),
])
if not rows:
return None
return model.DataTable(header=header, rows=rows, title="Estadística (describe)")
def build_overview(profile: dict, ctx: dict):
"""Build the Overview Chapter, or None if the profile has no columns."""
profile = profile or {}
ctx = ctx or {}
cols = profile.get("columns") or []
if not cols and not (ctx.get(HEAD_KEY) or profile.get(HEAD_KEY)):
return None
blocks = [
model.Heading(text="Primeras filas (df.head)", level=2),
_head_block(profile, ctx),
]
cols_block = _columns_block(profile)
if cols_block is not None:
blocks.append(model.Heading(
text="Diccionario de columnas", level=2))
blocks.append(cols_block)
desc_block = _describe_block(profile)
if desc_block is not None:
blocks.append(model.Heading(
text="Resumen estadístico numérico", level=2))
blocks.append(desc_block)
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -1,156 +0,0 @@
"""Cover chapter (PORTADA) — the reference chapter for AutomaticEDA.
Builds the document cover from a TableProfile plus an optional ``ctx`` of
presentation metadata. Reads everything defensively (``.get``) and degrades
honestly: a field that is neither in the profile nor in ``ctx`` is shown as a
placeholder rather than invented, leaving a hook for the LLM layer to fill it.
Contract for chapter authors (see ``docs/capabilities/automatic_eda.md``):
build_<id>(profile: dict, ctx: dict) -> Chapter | None
CHAPTER_VERSION = "x.y.z"
"""
from __future__ import annotations
import os
from datetime import datetime, timezone
from .. import model
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "portada"
CHAPTER_TITLE = "Portada"
# Default human description of what the table quality score measures. Chapters
# can override it via ctx["quality_criteria"].
_DEFAULT_QUALITY_CRITERIA = (
"media de los scores por columna (0100): completitud (sin nulos/vacíos), "
"validez (tipo y rango coherentes) y consistencia (sin duplicados/constantes)."
)
def _storage_from_source(source: str) -> str:
"""Infer the storage technology the dataset currently lives in.
Heuristic on the profile ``source`` string (a path, DSN or backend name).
Returns a human label; falls back to the raw source when unknown.
"""
s = (source or "").strip().lower()
if not s:
return ""
if s.endswith(".csv") or s.endswith(".tsv"):
return "CSV"
if s.endswith(".parquet") or s.endswith(".pq"):
return "Parquet"
if s.endswith(".json") or s.endswith(".ndjson"):
return "JSON"
if s.endswith(".xlsx") or s.endswith(".xls"):
return "Excel"
if s.endswith((".duckdb", ".ddb")) or s == "duckdb" or s.endswith(".db"):
return "DuckDB"
if s.startswith(("postgres://", "postgresql://")) or "postgres" in s:
return "PostgreSQL"
if s.startswith("bigquery") or "bigquery" in s or s.count(".") == 2 and " " not in s:
return "BigQuery"
if "sqlite" in s:
return "SQLite"
# Unknown: show the raw source so nothing is hidden.
return source
def _fmt_int(v) -> str:
if v is None:
return ""
try:
return f"{int(v):,}".replace(",", ".")
except (TypeError, ValueError):
return str(v)
def _fmt_date_eu(value) -> str:
"""Format a date/ISO string as European DD/MM/AAAA HH:mm (UI convention).
Accepts a datetime, an ISO-8601 string (with or without microseconds/tz) or
any other string. Non-parseable strings are returned verbatim so nothing is
lost; None yields a placeholder.
"""
if value is None:
return ""
if isinstance(value, datetime):
return value.strftime("%d/%m/%Y %H:%M")
s = str(value).strip()
if not s:
return ""
try:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
return dt.strftime("%d/%m/%Y %H:%M")
except (TypeError, ValueError):
# Try a couple of common forms before giving up.
for fmt in ("%Y-%m-%d %H:%M:%S UTC", "%Y-%m-%d %H:%M UTC",
"%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
try:
return datetime.strptime(s, fmt).strftime("%d/%m/%Y %H:%M")
except ValueError:
continue
return s
def build_portada(profile: dict, ctx: dict):
"""Build the cover Chapter, or None if there is truly nothing to show."""
profile = profile or {}
ctx = ctx or {}
dataset_name = (ctx.get("dataset_name") or profile.get("table")
or "(dataset sin nombre)")
source = profile.get("source") or ""
# Where the dataset comes from (origin), distinct from where it is stored.
source_origin = ctx.get("source_origin") or source or ""
storage = ctx.get("storage") or _storage_from_source(source)
when = _fmt_date_eu(
ctx.get("generated_at") or profile.get("profiled_at")
or datetime.now(timezone.utc))
n_rows = profile.get("n_rows")
n_cols = profile.get("n_cols")
shape = f"{_fmt_int(n_rows)} filas × {_fmt_int(n_cols)} columnas"
score = profile.get("quality_score")
quality_criteria = ctx.get("quality_criteria") or _DEFAULT_QUALITY_CRITERIA
quality_value = "" if score is None else f"{score} / 100"
# Granularity: ctx wins; else derive from key candidates; else be honest.
granularity = ctx.get("granularity")
if not granularity:
keys = profile.get("key_candidates") or []
if keys:
granularity = ("Cada fila parece identificada por "
+ ", ".join(str(k) for k in keys[:3]) + ".")
else:
granularity = ("Cada fila es… (granularidad no determinada — "
"pendiente de la capa de cálculo/LLM).")
description = ctx.get("description")
if not description:
description = ("Descripción no provista — pendiente de la capa LLM "
"(`run_llm`) o de `ctx['description']`.")
blocks = [
model.Heading(text=str(dataset_name), level=1),
model.Markdown(text="**Automatic-EDA** · informe exploratorio automático"),
model.KVTable(rows=[
("Fuente", source_origin),
("Almacenamiento", storage),
("Generado", when),
("Tamaño", shape),
("Calidad", quality_value),
("Criterios de calidad", quality_criteria),
]),
model.Heading(text="Descripción", level=2),
model.Markdown(text=str(description)),
model.Heading(text="Granularidad", level=2),
model.Markdown(text=str(granularity)),
]
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -1,89 +0,0 @@
"""Chapter registry — the canonical order of an AutomaticEDA document.
``CHAPTER_ORDER`` declares every chapter the engine will *ever* place, in the
order they appear in the document. Each id maps by convention to a module
``automatic_eda/chapters/<id>.py`` exposing ``build_<id>(profile, ctx) ->
Chapter | None`` and a ``CHAPTER_VERSION`` constant.
This pre-declared order is what lets many agents add chapters in parallel
without contention: an agent only creates its own ``chapters/<id>.py`` module
it never edits this file. ``build_document`` imports each chapter lazily; a
chapter whose module does not exist yet (not implemented) is simply skipped, so
the document is always renderable with whatever chapters are present today.
``build_document`` never raises: a chapter that errors out is dropped with a
note, and a chapter that returns ``None`` (does not apply to this dataset, e.g.
time series on a dataset with no date column) is omitted.
"""
from __future__ import annotations
import importlib
from . import model
# Canonical document order. Implemented today: portada, overview. The rest are
# placeholders other agents will fill by creating chapters/<id>.py — they will
# appear in this exact position automatically once their module exists.
CHAPTER_ORDER = [
"portada", # cover
"overview", # df.head + columns/types/nulls/examples + describe
"num_distr", # numeric distributions
"cat_distr", # categorical distributions
"calidad", # data quality
"correlacion", # correlations / associations
"modelos", # cheap models (PCA/KMeans/outliers)
"analisis_llm", # LLM interpretation
"timeseries", # time-series analysis
"geospatial", # geospatial
"agregacion", # aggregations / pivots
]
def build_chapter(chapter_id: str, profile: dict, ctx: dict):
"""Build a single chapter by id, or None if absent/not-applicable/error.
Looks up ``automatic_eda.chapters.<chapter_id>`` and calls its
``build_<chapter_id>(profile, ctx)``. Returns a normalized Chapter, or None
when the module is missing, the builder returns None, or anything raises.
"""
mod_name = f"{__package__}.chapters.{chapter_id}"
try:
mod = importlib.import_module(mod_name)
except Exception: # noqa: BLE001 — chapter not implemented yet → skip.
return None
builder = getattr(mod, f"build_{chapter_id}", None)
if builder is None:
return None
try:
result = builder(profile or {}, ctx or {})
except Exception: # noqa: BLE001 — a broken chapter never aborts the doc.
return None
return model.as_chapter(result)
def build_document(profile: dict, ctx: dict = None) -> list:
"""Build the full ordered list of chapters for a TableProfile.
Args:
profile: the ``eda`` group TableProfile dict (may be None/empty).
ctx: optional context dict carrying presentation metadata not present in
the profile (dataset_name, source_origin, storage, generated_at,
description, granularity, quality_criteria, head_rows, ...).
Returns:
list[Chapter] in canonical order, containing only the chapters that are
implemented and applicable. Never raises.
"""
if profile is None:
profile = {}
if not isinstance(profile, dict):
profile = {}
if ctx is None:
ctx = {}
chapters = []
for cid in CHAPTER_ORDER:
ch = build_chapter(cid, profile, ctx)
if ch is not None and ch.blocks:
chapters.append(ch)
return chapters
@@ -1,310 +0,0 @@
"""AutomaticEDA document model — format-independent blocks and chapters.
This is the intermediate layer between *content* (what an EDA chapter wants to
say) and *output format* (PDF for mobile reading, PPTX for sharing). A document
is an ordered list of :class:`Chapter`. A chapter is ``{id, title, version,
blocks}``. A block is one of a small, closed set of presentation primitives
(heading, markdown, key/value table, data table, figure, image, caption, note).
Neither renderer knows anything about the EDA profile: they only know how to lay
out blocks so that **nothing is ever cut** long text wraps to whole lines,
long tables split by rows repeating the header, figures and images are scaled to
fit entirely. Each chapter declares its own ``version`` so every page/slide can
be stamped ``<Chapter> · v<version>`` and tracked in a manifest for continuous,
per-chapter improvement.
Reading is defensive throughout (the ``eda`` group "dict-no-throw" style): the
normalizers accept dataclass blocks *or* plain dicts, coerce anything unknown
into a readable :class:`Note` instead of raising, and the renderers degrade a
malformed block to text rather than crashing the whole document.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
# Global engine version. Bump when the document model or a renderer changes in a
# way that affects output. Individual chapters carry their own CHAPTER_VERSION.
ENGINE_VERSION = "1.0.0"
ENGINE_NAME = "AutomaticEDA"
# --------------------------------------------------------------------------- #
# Block primitives. Each carries a stable ``kind`` string so renderers can
# dispatch by kind (works for dataclass instances and for plain dicts alike).
# --------------------------------------------------------------------------- #
@dataclass
class Heading:
"""A section heading. ``level`` 1 (largest) .. 3 (smallest)."""
text: str = ""
level: int = 1
kind: str = field(default="heading", init=False)
@dataclass
class Markdown:
"""A block of light markdown text.
Supported subset (everything else is rendered verbatim, never dropped):
``#``/``##``/``###`` headings, ``-``/``*`` bullet lists, ``| a | b |``
tables (consecutive pipe lines become a data table), blank lines as
paragraph breaks, and ``**bold**`` inline markers (markers are stripped, the
text is kept). Text is wrapped to whole lines so it is never cut mid-line.
"""
text: str = ""
kind: str = field(default="markdown", init=False)
@dataclass
class KVTable:
"""A two-column key/value table. ``rows`` is a list of ``(label, value)``."""
rows: list = field(default_factory=list)
title: Optional[str] = None
kind: str = field(default="kv_table", init=False)
@dataclass
class DataTable:
"""A tabular block with a header row.
If it does not fit in the remaining page/slide space it is split by rows,
**repeating the header** on each continuation. Long cell text wraps inside
its column (the row grows taller) so no cell content is ever lost.
"""
header: list = field(default_factory=list)
rows: list = field(default_factory=list) # list[list[Any]]
title: Optional[str] = None
note: Optional[str] = None
kind: str = field(default="data_table", init=False)
@dataclass
class Figure:
"""A matplotlib figure, scaled to fit entirely (never cropped).
Provide either an already-built ``fig`` (a ``matplotlib.figure.Figure``) or
a zero-arg ``make`` callable that returns one (lazy: only built when the
renderer needs it). ``height_in`` is an optional hint for the target height
on the page; renderers clamp it to the available space preserving aspect.
"""
fig: Any = None
make: Optional[Callable[[], Any]] = None
caption: Optional[str] = None
height_in: Optional[float] = None
kind: str = field(default="figure", init=False)
@dataclass
class Image:
"""A raster image (PNG/JPG) by path, scaled to fit entirely."""
path: str = ""
caption: Optional[str] = None
height_in: Optional[float] = None
kind: str = field(default="image", init=False)
@dataclass
class Caption:
"""Small auxiliary text rendered under a figure/table."""
text: str = ""
kind: str = field(default="caption", init=False)
@dataclass
class Note:
"""Small auxiliary note (italic). Also the fallback for unknown content."""
text: str = ""
kind: str = field(default="note", init=False)
@dataclass
class Chapter:
"""An ordered set of blocks with an id, a title and a generation version."""
id: str = ""
title: str = ""
version: str = "1.0.0"
blocks: list = field(default_factory=list)
# --------------------------------------------------------------------------- #
# Defensive normalizers — accept dataclasses OR plain dicts, never raise.
# --------------------------------------------------------------------------- #
_BLOCK_BY_KIND = {
"heading": Heading,
"markdown": Markdown,
"kv_table": KVTable,
"data_table": DataTable,
"figure": Figure,
"image": Image,
"caption": Caption,
"note": Note,
}
def as_block(obj: Any):
"""Coerce a value into a block dataclass. Unknown values become a Note."""
if isinstance(obj, (Heading, Markdown, KVTable, DataTable, Figure, Image,
Caption, Note)):
return obj
if isinstance(obj, dict):
kind = obj.get("kind")
cls = _BLOCK_BY_KIND.get(kind)
if cls is None:
return Note(text=_safe_str(obj))
# Build only with fields the dataclass accepts (ignore extras).
try:
if cls is Heading:
return Heading(text=_safe_str(obj.get("text")),
level=int(obj.get("level", 1) or 1))
if cls is Markdown:
return Markdown(text=_safe_str(obj.get("text")))
if cls is KVTable:
return KVTable(rows=list(obj.get("rows") or []),
title=obj.get("title"))
if cls is DataTable:
return DataTable(header=list(obj.get("header") or []),
rows=list(obj.get("rows") or []),
title=obj.get("title"), note=obj.get("note"))
if cls is Figure:
return Figure(fig=obj.get("fig"), make=obj.get("make"),
caption=obj.get("caption"),
height_in=obj.get("height_in"))
if cls is Image:
return Image(path=_safe_str(obj.get("path")),
caption=obj.get("caption"),
height_in=obj.get("height_in"))
if cls is Caption:
return Caption(text=_safe_str(obj.get("text")))
if cls is Note:
return Note(text=_safe_str(obj.get("text")))
except Exception: # noqa: BLE001 — never raise on a malformed block.
return Note(text=_safe_str(obj))
return Note(text=_safe_str(obj))
def as_blocks(seq: Any) -> list:
"""Normalize an arbitrary sequence into a list of block dataclasses."""
if seq is None:
return []
if not isinstance(seq, (list, tuple)):
return [as_block(seq)]
return [as_block(b) for b in seq]
def as_chapter(obj: Any) -> Optional[Chapter]:
"""Coerce a value into a Chapter (or None). Accepts a dict or a Chapter."""
if obj is None:
return None
if isinstance(obj, Chapter):
obj.blocks = as_blocks(obj.blocks)
return obj
if isinstance(obj, dict):
return Chapter(
id=_safe_str(obj.get("id")),
title=_safe_str(obj.get("title")) or _safe_str(obj.get("id")),
version=_safe_str(obj.get("version")) or "1.0.0",
blocks=as_blocks(obj.get("blocks")),
)
return None
def as_chapters(seq: Any) -> list:
"""Normalize a sequence of chapters, dropping anything that can't coerce."""
if seq is None:
return []
if isinstance(seq, Chapter):
return [as_chapter(seq)]
if not isinstance(seq, (list, tuple)):
return []
out = []
for c in seq:
ch = as_chapter(c)
if ch is not None:
out.append(ch)
return out
def _safe_str(v: Any) -> str:
"""str() that never raises and maps None to ''."""
if v is None:
return ""
try:
return str(v)
except Exception: # noqa: BLE001
return ""
# --------------------------------------------------------------------------- #
# Manifest — per-chapter versions and page/slide counts for tracking.
# --------------------------------------------------------------------------- #
def merge_manifest(manifest_path: str, renderer: str, chapters_meta: list,
generated_at: str,
engine_version: str = ENGINE_VERSION) -> dict:
"""Read-modify-write the AutomaticEDA manifest, merging one renderer's run.
The manifest lives next to the outputs as ``automatic_eda_manifest.json``
and records, per chapter, its version plus the page count (PDF) and slide
count (PPTX). Calling either renderer creates or updates it. Never raises:
on any error returns the in-memory manifest without writing.
Args:
manifest_path: path to the JSON manifest to create or update.
renderer: "pdf" or "pptx" selects which count key is written.
chapters_meta: list of ``{"id", "version", "n_pages"|"n_slides"}``.
generated_at: ISO-ish timestamp string for this run.
engine_version: AutomaticEDA engine version.
Returns:
The merged manifest dict (also written to disk on success).
"""
data: dict = {}
try:
if manifest_path and os.path.exists(manifest_path):
with open(manifest_path, "r", encoding="utf-8") as fh:
loaded = json.load(fh)
if isinstance(loaded, dict):
data = loaded
except Exception: # noqa: BLE001 — a corrupt manifest is overwritten.
data = {}
data["engine"] = ENGINE_NAME
data["engine_version"] = engine_version
data["generated_at"] = generated_at
chapters = data.get("chapters")
if not isinstance(chapters, dict):
chapters = {}
count_key = "n_slides" if renderer == "pptx" else "n_pages"
for cm in chapters_meta or []:
if not isinstance(cm, dict):
continue
cid = cm.get("id")
if not cid:
continue
entry = chapters.get(cid)
if not isinstance(entry, dict):
entry = {}
entry["version"] = cm.get("version") or entry.get("version") or "1.0.0"
entry[count_key] = cm.get(count_key, cm.get("n_pages", cm.get("n_slides")))
chapters[cid] = entry
data["chapters"] = chapters
try:
parent = os.path.dirname(os.path.abspath(manifest_path))
os.makedirs(parent, exist_ok=True)
with open(manifest_path, "w", encoding="utf-8") as fh:
json.dump(data, fh, ensure_ascii=False, indent=2, default=str)
except Exception: # noqa: BLE001 — never raise from the manifest writer.
pass
return data
@@ -1,532 +0,0 @@
"""AutomaticEDA PDF renderer — A5 portrait, mobile-first, never cuts content.
A flow paginator: it measures each block (using the deterministic character grid
from :mod:`text_layout`) and places it top-to-bottom on the current page. When a
unit does not fit in the remaining space it moves whole to the next page
text by whole lines (never mid-line, never mid-word), data tables by rows
**repeating the header**, figures/images scaled to fit entirely (never cropped).
Each chapter starts on a fresh page and every page is stamped in the footer with
``<Chapter> · v<version>`` plus the engine version and a running page number, so
output is versioned per chapter for continuous improvement.
dict-no-throw: a failure inside one block is caught and noted; the PDF is always
produced and at least one page is guaranteed. Engine: matplotlib ``PdfPages``.
"""
from __future__ import annotations
import io
import os
import matplotlib
matplotlib.use("Agg")
import matplotlib.image as mpimg # noqa: E402
import matplotlib.pyplot as plt # noqa: E402
from matplotlib.backends.backend_pdf import PdfPages # noqa: E402
from matplotlib.patches import Rectangle # noqa: E402
from . import model # noqa: E402
from . import text_layout as tl # noqa: E402
# A5 portrait, inches.
_W, _H = 5.83, 8.27
_ML, _MR, _MT, _MB = 0.5, 0.42, 0.55, 0.5
_FOOTER_H = 0.34
_USABLE_W = _W - _ML - _MR
_CONTENT_TOP = _MT
_CONTENT_BOTTOM = _H - _MB - _FOOTER_H
# Palette / type (inherits the Tufte-ish mobile look of render_eda_pdf).
_INK = "#1b1b1b"
_ACCENT = "#2a6f97"
_MUTED = "#8a8a8a"
_RULE = "#cccccc"
_HEAD_BG = "#eef3f6"
_RC = {
"font.size": 10,
"font.family": "sans-serif",
"figure.facecolor": "white",
"savefig.facecolor": "white",
"pdf.fonttype": 42, # embed TrueType — text stays selectable on mobile.
}
# Font sizes (pt) and derived line heights (in).
_FS_H1, _FS_H2, _FS_H3 = 17, 13, 11
_FS_BODY, _FS_CELL, _FS_NOTE = 10.5, 9.0, 9.0
_GAP = 0.12 # vertical gap after a block, inches.
_CELL_PAD = 0.06 # horizontal padding inside a table cell, inches.
_ROW_VPAD = 0.05 # vertical padding inside a table row, inches.
class _PdfState:
"""Mutable layout cursor for the running PDF document."""
def __init__(self, pdf, title: str):
self.pdf = pdf
self.title = title
self.fig = None
self.y = _CONTENT_TOP # inches from the top of the page.
self.page = 0 # global page counter.
self.chapter = None # current Chapter (for the footer).
self.chapter_pages = 0 # pages produced for the current chapter.
# --------------------------------------------------------------------------- #
# Coordinate helpers (inches-from-top → matplotlib figure fraction).
# --------------------------------------------------------------------------- #
def _yf(y_in: float) -> float:
return 1.0 - (y_in / _H)
def _xf(x_in: float) -> float:
return x_in / _W
def _new_page(st: _PdfState) -> None:
"""Close the current page (if any) and open a fresh one with a footer."""
_flush_page(st)
st.fig = plt.figure(figsize=(_W, _H))
st.y = _CONTENT_TOP
st.page += 1
st.chapter_pages += 1
_draw_footer(st)
def _flush_page(st: _PdfState) -> None:
if st.fig is not None:
st.pdf.savefig(st.fig)
plt.close(st.fig)
st.fig = None
def _draw_footer(st: _PdfState) -> None:
ch = st.chapter
left = ""
if ch is not None:
left = f"{ch.title} · v{ch.version}"
right = f"{model.ENGINE_NAME} v{model.ENGINE_VERSION} · p.{st.page}"
yb = (_MB * 0.45) / _H
st.fig.text(_xf(_ML), yb, left, fontsize=7.5, color=_MUTED,
ha="left", va="center")
st.fig.text(_xf(_W - _MR), yb, right, fontsize=7.5, color=_MUTED,
ha="right", va="center")
# A thin rule above the footer.
st.fig.add_artist(Rectangle(
(_xf(_ML), (_MB + _FOOTER_H * 0.5) / _H),
_xf(_W - _MR) - _xf(_ML), 0.0008,
transform=st.fig.transFigure, color=_RULE, lw=0.6))
def _remaining(st: _PdfState) -> float:
return _CONTENT_BOTTOM - st.y
def _ensure_space(st: _PdfState, height: float) -> None:
"""Open a new page if ``height`` does not fit in the remaining space."""
if _remaining(st) < height:
_new_page(st)
# --------------------------------------------------------------------------- #
# Block placers. Each advances st.y and paginates as needed.
# --------------------------------------------------------------------------- #
def _place_heading(st: _PdfState, block) -> None:
level = max(1, min(3, int(getattr(block, "level", 1) or 1)))
fs = {1: _FS_H1, 2: _FS_H2, 3: _FS_H3}[level]
text = tl.strip_inline_md(getattr(block, "text", ""))
max_chars = tl.chars_per_line(_USABLE_W, fs)
lines = tl.wrap(text, max_chars)
lh = tl.line_height_in(fs, leading=1.2)
block_h = lh * len(lines) + 0.06
# Keep at least the heading + a couple of body lines together when possible.
_ensure_space(st, min(block_h + tl.line_height_in(_FS_BODY) * 2,
_CONTENT_BOTTOM - _CONTENT_TOP))
for ln in lines:
_ensure_space(st, lh)
st.fig.text(_xf(_ML), _yf(st.y), ln, fontsize=fs, fontweight="bold",
color=_INK, ha="left", va="top")
st.y += lh
if level == 1:
# Accent underline under a top-level heading.
st.fig.add_artist(Rectangle(
(_xf(_ML), _yf(st.y + 0.02)), _xf(_ML + 1.4) - _xf(_ML), 0.0016,
transform=st.fig.transFigure, color=_ACCENT, lw=0))
st.y += 0.10
st.y += _GAP
def _place_text_lines(st: _PdfState, lines: list, fs: float, color: str,
style: str = "normal", indent: float = 0.0) -> None:
lh = tl.line_height_in(fs)
for ln in lines:
_ensure_space(st, lh)
st.fig.text(_xf(_ML + indent), _yf(st.y), ln, fontsize=fs, color=color,
ha="left", va="top", style=style)
st.y += lh
def _place_markdown(st: _PdfState, block) -> None:
raw = getattr(block, "text", "") or ""
md_lines = str(raw).split("\n")
i = 0
n = len(md_lines)
while i < n:
line = md_lines[i]
stripped = line.strip()
# Consecutive pipe-table lines → a DataTable.
if stripped.startswith("|") and stripped.endswith("|"):
j = i
tbl_lines = []
while j < n and md_lines[j].strip().startswith("|") \
and md_lines[j].strip().endswith("|"):
tbl_lines.append(md_lines[j])
j += 1
parsed = tl.parse_md_table(tbl_lines)
if parsed:
header, rows = parsed
_place_data_table(st, model.DataTable(header=header, rows=rows))
i = j
continue
if stripped == "":
st.y += tl.line_height_in(_FS_BODY) * 0.5
i += 1
continue
if stripped.startswith("### "):
_place_heading(st, model.Heading(stripped[4:], level=3))
i += 1
continue
if stripped.startswith("## "):
_place_heading(st, model.Heading(stripped[3:], level=2))
i += 1
continue
if stripped.startswith("# "):
_place_heading(st, model.Heading(stripped[2:], level=1))
i += 1
continue
if stripped.startswith("- ") or stripped.startswith("* "):
content = tl.strip_inline_md(stripped[2:])
bullet_chars = tl.chars_per_line(_USABLE_W - 0.22, _FS_BODY)
wrapped = tl.wrap(content, bullet_chars)
first = True
for w in wrapped:
prefix = "" if first else " "
_place_text_lines(st, [prefix + w], _FS_BODY, _INK,
indent=0.0)
first = False
i += 1
continue
# Plain paragraph (gather following plain lines into one paragraph).
para = [tl.strip_inline_md(stripped)]
j = i + 1
while j < n:
nxt = md_lines[j].strip()
if nxt == "" or nxt.startswith(("|", "#", "- ", "* ")):
break
para.append(tl.strip_inline_md(nxt))
j += 1
text = " ".join(para)
max_chars = tl.chars_per_line(_USABLE_W, _FS_BODY)
_place_text_lines(st, tl.wrap(text, max_chars), _FS_BODY, _INK)
i = j
st.y += _GAP
def _place_kv_table(st: _PdfState, block) -> None:
title = getattr(block, "title", None)
if title:
_place_heading(st, model.Heading(title, level=2))
rows = getattr(block, "rows", []) or []
key_w = 1.9 # inches reserved for the label column.
val_chars = tl.chars_per_line(_USABLE_W - key_w - 0.1, _FS_BODY)
lh = tl.line_height_in(_FS_BODY)
for row in rows:
try:
label, value = row[0], row[1]
except Exception: # noqa: BLE001
label, value = str(row), ""
v_lines = tl.wrap(model._safe_str(value), val_chars)
row_h = lh * len(v_lines) + _ROW_VPAD
_ensure_space(st, row_h)
y0 = st.y
st.fig.text(_xf(_ML), _yf(y0), tl.strip_inline_md(model._safe_str(label)),
fontsize=_FS_BODY, color=_MUTED, ha="left", va="top")
for k, vl in enumerate(v_lines):
st.fig.text(_xf(_ML + key_w), _yf(y0 + k * lh), vl,
fontsize=_FS_BODY, color=_INK, ha="left", va="top")
st.y = y0 + row_h
st.y += _GAP
def _col_widths(header: list, rows: list, fs: float) -> list:
"""Distribute usable width across columns proportional to content length."""
ncol = len(header) if header else (len(rows[0]) if rows else 1)
ncol = max(1, ncol)
natural = [3] * ncol
for c in range(ncol):
if header and c < len(header):
natural[c] = max(natural[c], len(model._safe_str(header[c])))
for r in rows:
if c < len(r):
natural[c] = max(natural[c], len(model._safe_str(r[c])))
# Clamp so one very long column does not starve the others.
clamped = [min(max(w, 4), 40) for w in natural]
total = float(sum(clamped)) or 1.0
widths = [_USABLE_W * w / total for w in clamped]
# Enforce a minimum readable column width.
min_w = 0.45
widths = [max(w, min_w) for w in widths]
# Renormalize if the minimums pushed us over the usable width.
s = sum(widths)
if s > _USABLE_W:
widths = [w * _USABLE_W / s for w in widths]
return widths
def _wrap_row(cells: list, widths: list, fs: float) -> list:
"""Wrap each cell to its column width → list of line-lists per cell."""
out = []
for c, w in enumerate(widths):
text = model._safe_str(cells[c]) if c < len(cells) else ""
max_chars = tl.chars_per_line(w - _CELL_PAD * 2, fs)
out.append(tl.wrap(text, max_chars))
return out
def _draw_table_row(st: _PdfState, cells_lines: list, widths: list, fs: float,
y0: float, header: bool) -> float:
lh = tl.line_height_in(fs)
nlines = max((len(c) for c in cells_lines), default=1)
row_h = lh * nlines + _ROW_VPAD * 2
if header:
st.fig.add_artist(Rectangle(
(_xf(_ML), _yf(y0 + row_h)), _xf(_ML + _USABLE_W) - _xf(_ML),
_yf(y0) - _yf(y0 + row_h), transform=st.fig.transFigure,
color=_HEAD_BG, lw=0, zorder=0))
x = _ML
for c, lines in enumerate(cells_lines):
for k, ln in enumerate(lines):
st.fig.text(_xf(x + _CELL_PAD), _yf(y0 + _ROW_VPAD + k * lh), ln,
fontsize=fs, color=_INK,
fontweight="bold" if header else "normal",
ha="left", va="top", zorder=2)
x += widths[c]
# Bottom rule of the row.
st.fig.add_artist(Rectangle(
(_xf(_ML), _yf(y0 + row_h)), _xf(_ML + _USABLE_W) - _xf(_ML), 0.0006,
transform=st.fig.transFigure, color=_RULE, lw=0, zorder=1))
return row_h
def _place_data_table(st: _PdfState, block) -> None:
title = getattr(block, "title", None)
if title:
_place_heading(st, model.Heading(title, level=2))
header = list(getattr(block, "header", []) or [])
rows = list(getattr(block, "rows", []) or [])
fs = _FS_CELL
widths = _col_widths(header, rows, fs)
header_lines = _wrap_row(header, widths, fs) if header else None
lh = tl.line_height_in(fs)
def header_h() -> float:
if not header_lines:
return 0.0
return lh * max((len(c) for c in header_lines), default=1) + _ROW_VPAD * 2
def draw_header() -> None:
if header_lines:
st.y += _draw_table_row(st, header_lines, widths, fs, st.y,
header=True)
# Ensure header + first row fit, else start on a new page.
first_row_h = 0.0
if rows:
first_lines = _wrap_row(rows[0], widths, fs)
first_row_h = lh * max((len(c) for c in first_lines), default=1) \
+ _ROW_VPAD * 2
_ensure_space(st, header_h() + max(first_row_h, lh))
draw_header()
for r in rows:
cells_lines = _wrap_row(r, widths, fs)
row_h = lh * max((len(c) for c in cells_lines), default=1) \
+ _ROW_VPAD * 2
if _remaining(st) < row_h:
_new_page(st)
draw_header() # repeat header on the continuation page.
st.y += _draw_table_row(st, cells_lines, widths, fs, st.y, header=False)
note = getattr(block, "note", None)
if note:
_place_text_lines(st, tl.wrap(model._safe_str(note),
tl.chars_per_line(_USABLE_W, _FS_NOTE)),
_FS_NOTE, _MUTED, style="italic")
st.y += _GAP
def _resolve_figure(block):
fig = getattr(block, "fig", None)
if fig is not None:
return fig, False
make = getattr(block, "make", None)
if callable(make):
try:
return make(), True
except Exception: # noqa: BLE001
return None, False
return None, False
def _png_from_figure(fig) -> bytes:
buf = io.BytesIO()
fig.savefig(buf, format="png", dpi=150, bbox_inches="tight")
buf.seek(0)
return buf.read()
def _place_image_array(st: _PdfState, arr, caption) -> None:
h_px, w_px = arr.shape[0], arr.shape[1]
aspect = (h_px / w_px) if w_px else 1.0
max_h = _CONTENT_BOTTOM - _CONTENT_TOP
target_w = _USABLE_W
target_h = target_w * aspect
if target_h > max_h:
target_h = max_h
target_w = target_h / aspect if aspect else _USABLE_W
cap_h = tl.line_height_in(_FS_NOTE) + 0.04 if caption else 0.0
# Move whole image to next page if it does not fit in remaining space.
if _remaining(st) < target_h + cap_h:
if (max_h) >= target_h + cap_h:
_new_page(st)
else:
# Taller than a full page even at min — already clamped to max_h.
_new_page(st)
left_frac = _xf(_ML + (_USABLE_W - target_w) / 2.0)
bottom_frac = _yf(st.y + target_h)
ax = st.fig.add_axes([left_frac, bottom_frac, target_w / _W, target_h / _H])
ax.imshow(arr)
ax.axis("off")
st.y += target_h + 0.04
if caption:
_place_text_lines(st, tl.wrap(model._safe_str(caption),
tl.chars_per_line(_USABLE_W, _FS_NOTE)),
_FS_NOTE, _MUTED, style="italic")
st.y += _GAP
def _place_figure(st: _PdfState, block) -> None:
fig, owned = _resolve_figure(block)
if fig is None:
_place_text_lines(st, ["(figura no disponible)"], _FS_NOTE, _MUTED,
style="italic")
st.y += _GAP
return
try:
png = _png_from_figure(fig)
finally:
if owned:
try:
plt.close(fig)
except Exception: # noqa: BLE001
pass
arr = mpimg.imread(io.BytesIO(png))
_place_image_array(st, arr, getattr(block, "caption", None))
def _place_image(st: _PdfState, block) -> None:
path = getattr(block, "path", "")
if not path or not os.path.exists(path):
_place_text_lines(st, [f"(imagen no encontrada: {path})"], _FS_NOTE,
_MUTED, style="italic")
st.y += _GAP
return
arr = mpimg.imread(path)
_place_image_array(st, arr, getattr(block, "caption", None))
def _place_caption(st: _PdfState, block) -> None:
_place_text_lines(st, tl.wrap(getattr(block, "text", ""),
tl.chars_per_line(_USABLE_W, _FS_NOTE)),
_FS_NOTE, _MUTED, style="italic")
st.y += _GAP
def _place_note(st: _PdfState, block) -> None:
_place_text_lines(st, tl.wrap(getattr(block, "text", ""),
tl.chars_per_line(_USABLE_W, _FS_NOTE)),
_FS_NOTE, _MUTED, style="italic")
st.y += _GAP
_PLACERS = {
"heading": _place_heading,
"markdown": _place_markdown,
"kv_table": _place_kv_table,
"data_table": _place_data_table,
"figure": _place_figure,
"image": _place_image,
"caption": _place_caption,
"note": _place_note,
}
def render_pdf(chapters: list, out_path: str, meta: dict = None) -> dict:
"""Render a list of Chapters into an A5-portrait, mobile-readable PDF.
Never raises. Returns ``{path, n_pages, chapters, note}`` where ``chapters``
is a list of ``{id, version, n_pages}`` for the manifest. On a fatal write
error ``path`` is None and ``note`` explains why.
"""
meta = meta or {}
chapters = model.as_chapters(chapters)
notes = []
try:
parent = os.path.dirname(os.path.abspath(out_path))
os.makedirs(parent, exist_ok=True)
except OSError as e:
return {"path": None, "n_pages": 0, "chapters": [],
"note": f"no se pudo crear el directorio destino: {e}"}
title = meta.get("title") or model.ENGINE_NAME
chapters_meta = []
try:
with plt.rc_context(_RC):
with PdfPages(out_path) as pdf:
st = _PdfState(pdf, title)
for ch in chapters:
st.chapter = ch
st.chapter_pages = 0
_new_page(st) # each chapter starts on a fresh page.
for block in ch.blocks:
placer = _PLACERS.get(getattr(block, "kind", ""),
_place_note)
try:
placer(st, block)
except Exception as e: # noqa: BLE001
notes.append(
f"bloque '{getattr(block, 'kind', '?')}' del "
f"capítulo '{ch.id}' omitido: {e}")
chapters_meta.append({"id": ch.id, "version": ch.version,
"n_pages": st.chapter_pages})
_flush_page(st)
if st.page == 0:
# No chapters at all → guarantee one valid page.
st.chapter = model.Chapter(id="vacio", title=title,
version=model.ENGINE_VERSION)
_new_page(st)
_place_note(st, model.Note(
"(documento vacío — sin capítulos aplicables)"))
_flush_page(st)
n_pages = st.page
except Exception as e: # noqa: BLE001
return {"path": None, "n_pages": 0, "chapters": [],
"note": f"fallo al escribir el PDF: {e}"}
note = f"{n_pages} páginas"
if notes:
note += " · " + "; ".join(notes)
return {"path": out_path, "n_pages": n_pages, "chapters": chapters_meta,
"note": note}
@@ -1,518 +0,0 @@
"""AutomaticEDA PPTX renderer — 16:9 slides, never cuts content.
Same flow principle as the PDF renderer but onto PowerPoint slides: measure each
block and place it top-to-bottom; when it does not fit in the remaining slide
space, continue on a new slide titled ``<Chapter> (cont.)``. Data tables split by
rows **repeating the header**; figures/images are scaled to fit entirely. Every
slide carries a footer ``<Chapter> · v<version>`` plus the engine version.
dict-no-throw: a failure inside one block is caught and noted; the deck is always
produced with at least one slide. Engine: ``python-pptx`` (added dependency).
"""
from __future__ import annotations
import io
import os
from . import model
from . import text_layout as tl
try:
from pptx import Presentation
from pptx.util import Inches, Pt, Emu
from pptx.dml.color import RGBColor
from pptx.enum.text import PP_ALIGN
_PPTX_OK = True
_PPTX_ERR = ""
except Exception as _e: # noqa: BLE001 — surfaced as a dict-no-throw note.
_PPTX_OK = False
_PPTX_ERR = str(_e)
# 16:9 widescreen, inches.
_W, _H = 13.333, 7.5
_ML, _MR = 0.7, 0.7
_TITLE_TOP, _TITLE_H = 0.28, 0.7
_CONTENT_TOP = 1.12
_FOOTER_H = 0.4
_CONTENT_BOTTOM = _H - _FOOTER_H - 0.15
_USABLE_W = _W - _ML - _MR
_INK = (0x1B, 0x1B, 0x1B)
_ACCENT = (0x2A, 0x6F, 0x97)
_MUTED = (0x8A, 0x8A, 0x8A)
_HEAD_BG = (0xEE, 0xF3, 0xF6)
_WHITE = (0xFF, 0xFF, 0xFF)
_FS_TITLE = 26
_FS_H1, _FS_H2, _FS_H3 = 20, 16, 13
_FS_BODY, _FS_CELL, _FS_NOTE = 14, 11, 11
_GAP = 0.12
class _PptxState:
def __init__(self, prs, title: str):
self.prs = prs
self.title = title
self.slide = None
self.y = _CONTENT_TOP
self.chapter = None
self.slide_no = 0
self.chapter_slides = 0
def _rgb(c):
return RGBColor(*c)
def _new_slide(st: _PptxState, cont: bool = False) -> None:
blank = st.prs.slide_layouts[6]
st.slide = st.prs.slides.add_slide(blank)
st.y = _CONTENT_TOP
st.slide_no += 1
st.chapter_slides += 1
_draw_title(st, cont)
_draw_footer(st)
def _draw_title(st: _PptxState, cont: bool) -> None:
ch = st.chapter
title = ch.title if ch is not None else st.title
if cont:
title = f"{title} (cont.)"
box = st.slide.shapes.add_textbox(
Inches(_ML), Inches(_TITLE_TOP), Inches(_USABLE_W), Inches(_TITLE_H))
tf = box.text_frame
tf.word_wrap = True
p = tf.paragraphs[0]
run = p.add_run()
run.text = title
run.font.size = Pt(_FS_TITLE)
run.font.bold = True
run.font.color.rgb = _rgb(_INK)
def _draw_footer(st: _PptxState) -> None:
ch = st.chapter
left = f"{ch.title} · v{ch.version}" if ch is not None else ""
right = f"{model.ENGINE_NAME} v{model.ENGINE_VERSION} · {st.slide_no}"
box = st.slide.shapes.add_textbox(
Inches(_ML), Inches(_H - _FOOTER_H), Inches(_USABLE_W),
Inches(_FOOTER_H * 0.7))
tf = box.text_frame
tf.word_wrap = False
p = tf.paragraphs[0]
r = p.add_run()
r.text = left
r.font.size = Pt(9)
r.font.color.rgb = _rgb(_MUTED)
# Right-aligned engine stamp on a second textbox.
box2 = st.slide.shapes.add_textbox(
Inches(_ML), Inches(_H - _FOOTER_H), Inches(_USABLE_W),
Inches(_FOOTER_H * 0.7))
tf2 = box2.text_frame
p2 = tf2.paragraphs[0]
p2.alignment = PP_ALIGN.RIGHT
r2 = p2.add_run()
r2.text = right
r2.font.size = Pt(9)
r2.font.color.rgb = _rgb(_MUTED)
def _remaining(st: _PptxState) -> float:
return _CONTENT_BOTTOM - st.y
def _ensure(st: _PptxState, height: float) -> None:
if _remaining(st) < height:
_new_slide(st, cont=True)
def _add_text(st: _PptxState, lines: list, fs: float, color, bold=False,
italic=False, indent=0.0, bullet=False) -> None:
lh = tl.line_height_in(fs)
height = lh * len(lines) + 0.05
_ensure(st, height)
box = st.slide.shapes.add_textbox(
Inches(_ML + indent), Inches(st.y), Inches(_USABLE_W - indent),
Inches(height))
tf = box.text_frame
tf.word_wrap = True
first = True
for ln in lines:
p = tf.paragraphs[0] if first else tf.add_paragraph()
first = False
run = p.add_run()
run.text = ("" + ln) if bullet else ln
run.font.size = Pt(fs)
run.font.bold = bold
run.font.italic = italic
run.font.color.rgb = _rgb(color)
st.y += height
def _place_heading(st: _PptxState, block) -> None:
level = max(1, min(3, int(getattr(block, "level", 1) or 1)))
fs = {1: _FS_H1, 2: _FS_H2, 3: _FS_H3}[level]
text = tl.strip_inline_md(getattr(block, "text", ""))
lines = tl.wrap(text, tl.chars_per_line(_USABLE_W, fs))
_add_text(st, lines, fs, _INK, bold=True)
st.y += 0.04
def _place_markdown(st: _PptxState, block) -> None:
raw = str(getattr(block, "text", "") or "")
md_lines = raw.split("\n")
i, n = 0, len(md_lines)
while i < n:
stripped = md_lines[i].strip()
if stripped.startswith("|") and stripped.endswith("|"):
j = i
tbl = []
while j < n and md_lines[j].strip().startswith("|") \
and md_lines[j].strip().endswith("|"):
tbl.append(md_lines[j])
j += 1
parsed = tl.parse_md_table(tbl)
if parsed:
header, rows = parsed
_place_data_table(st, model.DataTable(header=header, rows=rows))
i = j
continue
if stripped == "":
st.y += tl.line_height_in(_FS_BODY) * 0.4
i += 1
continue
if stripped.startswith("### "):
_place_heading(st, model.Heading(stripped[4:], level=3))
i += 1
continue
if stripped.startswith("## "):
_place_heading(st, model.Heading(stripped[3:], level=2))
i += 1
continue
if stripped.startswith("# "):
_place_heading(st, model.Heading(stripped[2:], level=1))
i += 1
continue
if stripped.startswith("- ") or stripped.startswith("* "):
content = tl.strip_inline_md(stripped[2:])
lines = tl.wrap(content, tl.chars_per_line(_USABLE_W - 0.3, _FS_BODY))
_add_text(st, lines, _FS_BODY, _INK, bullet=True)
i += 1
continue
para = [tl.strip_inline_md(stripped)]
j = i + 1
while j < n:
nxt = md_lines[j].strip()
if nxt == "" or nxt.startswith(("|", "#", "- ", "* ")):
break
para.append(tl.strip_inline_md(nxt))
j += 1
text = " ".join(para)
_add_text(st, tl.wrap(text, tl.chars_per_line(_USABLE_W, _FS_BODY)),
_FS_BODY, _INK)
i = j
st.y += _GAP
def _place_kv_table(st: _PptxState, block) -> None:
title = getattr(block, "title", None)
if title:
_place_heading(st, model.Heading(title, level=2))
rows = getattr(block, "rows", []) or []
data_rows = []
for row in rows:
try:
label, value = row[0], row[1]
except Exception: # noqa: BLE001
label, value = str(row), ""
data_rows.append([model._safe_str(label), model._safe_str(value)])
_place_data_table(st, model.DataTable(header=["Campo", "Valor"],
rows=data_rows), shaded_header=True,
key_value=True)
def _col_widths(header, rows):
ncol = len(header) if header else (len(rows[0]) if rows else 1)
ncol = max(1, ncol)
natural = [3] * ncol
for c in range(ncol):
if header and c < len(header):
natural[c] = max(natural[c], len(model._safe_str(header[c])))
for r in rows:
if c < len(r):
natural[c] = max(natural[c], len(model._safe_str(r[c])))
clamped = [min(max(w, 4), 44) for w in natural]
total = float(sum(clamped)) or 1.0
return [_USABLE_W * w / total for w in clamped]
def _row_height_in(cells, widths, fs) -> float:
lh = tl.line_height_in(fs)
maxlines = 1
for c, w in enumerate(widths):
text = model._safe_str(cells[c]) if c < len(cells) else ""
lines = tl.wrap(text, tl.chars_per_line(w - 0.12, fs))
maxlines = max(maxlines, len(lines))
return lh * maxlines + 0.10
def _emit_table(st: _PptxState, header, chunk, widths, fs) -> None:
nrows = len(chunk) + (1 if header else 0)
ncol = len(widths)
# Pre-measure total height to size the shape (pptx still auto-grows rows).
heights = []
if header:
heights.append(_row_height_in(header, widths, fs))
for r in chunk:
heights.append(_row_height_in(r, widths, fs))
total_h = sum(heights)
gtable = st.slide.shapes.add_table(
nrows, ncol, Inches(_ML), Inches(st.y), Inches(_USABLE_W),
Inches(total_h)).table
gtable.first_row = bool(header)
gtable.horz_banding = False
for c in range(ncol):
gtable.columns[c].width = Emu(int(Inches(widths[c])))
ridx = 0
if header:
for c in range(ncol):
cell = gtable.cell(0, c)
cell.text = model._safe_str(header[c]) if c < len(header) else ""
_style_cell(cell, fs, _INK, bold=True, fill=_HEAD_BG)
ridx = 1
for r in chunk:
for c in range(ncol):
cell = gtable.cell(ridx, c)
cell.text = model._safe_str(r[c]) if c < len(r) else ""
_style_cell(cell, fs, _INK, bold=False, fill=_WHITE)
ridx += 1
st.y += total_h + _GAP
def _style_cell(cell, fs, color, bold, fill) -> None:
cell.fill.solid()
cell.fill.fore_color.rgb = _rgb(fill)
cell.margin_left = Inches(0.05)
cell.margin_right = Inches(0.05)
cell.margin_top = Inches(0.02)
cell.margin_bottom = Inches(0.02)
for p in cell.text_frame.paragraphs:
for run in p.runs:
run.font.size = Pt(fs)
run.font.bold = bold
run.font.color.rgb = _rgb(color)
def _place_data_table(st: _PptxState, block, shaded_header=True,
key_value=False) -> None:
title = getattr(block, "title", None)
if title:
_place_heading(st, model.Heading(title, level=2))
header = list(getattr(block, "header", []) or [])
rows = list(getattr(block, "rows", []) or [])
fs = _FS_CELL
widths = _col_widths(header, rows)
header_h = _row_height_in(header, widths, fs) if header else 0.0
idx = 0
n = len(rows)
if n == 0:
# Header-only table still rendered (one slide).
_ensure(st, header_h + 0.2)
_emit_table(st, header, [], widths, fs)
return
while idx < n:
# Greedily fill the current slide with as many rows as fit.
if _remaining(st) < header_h + _row_height_in(rows[idx], widths, fs):
_new_slide(st, cont=True)
avail = _remaining(st) - header_h
chunk = []
used = 0.0
while idx < n:
rh = _row_height_in(rows[idx], widths, fs)
if used + rh > avail and chunk:
break
chunk.append(rows[idx])
used += rh
idx += 1
_emit_table(st, header, chunk, widths, fs)
note = getattr(block, "note", None)
if note:
_add_text(st, tl.wrap(model._safe_str(note),
tl.chars_per_line(_USABLE_W, _FS_NOTE)), _FS_NOTE, _MUTED,
italic=True)
def _img_size_px(data: bytes):
try:
from PIL import Image
with Image.open(io.BytesIO(data)) as im:
return im.size # (w, h)
except Exception: # noqa: BLE001
return (1200, 800)
def _resolve_png(block):
fig = getattr(block, "fig", None)
make = getattr(block, "make", None)
f = fig
owned = False
if f is None and callable(make):
try:
f = make()
owned = True
except Exception: # noqa: BLE001
f = None
if f is None:
return None
try:
import matplotlib.pyplot as plt
buf = io.BytesIO()
f.savefig(buf, format="png", dpi=150, bbox_inches="tight")
buf.seek(0)
return buf.read()
except Exception: # noqa: BLE001
return None
finally:
if owned:
try:
import matplotlib.pyplot as plt
plt.close(f)
except Exception: # noqa: BLE001
pass
def _place_picture_bytes(st: _PptxState, data: bytes, caption) -> None:
w_px, h_px = _img_size_px(data)
aspect = (h_px / w_px) if w_px else 0.66
max_h = _CONTENT_BOTTOM - _CONTENT_TOP
target_w = _USABLE_W
target_h = target_w * aspect
if target_h > max_h:
target_h = max_h
target_w = target_h / aspect if aspect else _USABLE_W
cap_h = tl.line_height_in(_FS_NOTE) + 0.05 if caption else 0.0
if _remaining(st) < target_h + cap_h:
_new_slide(st, cont=True)
left = _ML + (_USABLE_W - target_w) / 2.0
st.slide.shapes.add_picture(io.BytesIO(data), Inches(left), Inches(st.y),
width=Inches(target_w), height=Inches(target_h))
st.y += target_h + 0.05
if caption:
_add_text(st, tl.wrap(model._safe_str(caption),
tl.chars_per_line(_USABLE_W, _FS_NOTE)), _FS_NOTE, _MUTED,
italic=True)
st.y += _GAP
def _place_figure(st: _PptxState, block) -> None:
png = _resolve_png(block)
if png is None:
_add_text(st, ["(figura no disponible)"], _FS_NOTE, _MUTED, italic=True)
st.y += _GAP
return
_place_picture_bytes(st, png, getattr(block, "caption", None))
def _place_image(st: _PptxState, block) -> None:
path = getattr(block, "path", "")
if not path or not os.path.exists(path):
_add_text(st, [f"(imagen no encontrada: {path})"], _FS_NOTE, _MUTED,
italic=True)
st.y += _GAP
return
try:
with open(path, "rb") as fh:
data = fh.read()
except Exception as e: # noqa: BLE001
_add_text(st, [f"(no se pudo leer la imagen: {e})"], _FS_NOTE, _MUTED,
italic=True)
st.y += _GAP
return
_place_picture_bytes(st, data, getattr(block, "caption", None))
def _place_caption(st: _PptxState, block) -> None:
_add_text(st, tl.wrap(getattr(block, "text", ""),
tl.chars_per_line(_USABLE_W, _FS_NOTE)), _FS_NOTE, _MUTED,
italic=True)
st.y += _GAP
def _place_note(st: _PptxState, block) -> None:
_place_caption(st, block)
_PLACERS = {
"heading": _place_heading,
"markdown": _place_markdown,
"kv_table": _place_kv_table,
"data_table": _place_data_table,
"figure": _place_figure,
"image": _place_image,
"caption": _place_caption,
"note": _place_note,
}
def render_pptx(chapters: list, out_path: str, meta: dict = None) -> dict:
"""Render a list of Chapters into a 16:9 PPTX deck. Never raises.
Returns ``{path, n_slides, chapters, note}`` where ``chapters`` is a list of
``{id, version, n_slides}`` for the manifest. On a fatal error ``path`` is
None and ``note`` explains why (e.g. python-pptx not installed).
"""
meta = meta or {}
if not _PPTX_OK:
return {"path": None, "n_slides": 0, "chapters": [],
"note": f"python-pptx no disponible: {_PPTX_ERR}"}
chapters = model.as_chapters(chapters)
notes = []
try:
parent = os.path.dirname(os.path.abspath(out_path))
os.makedirs(parent, exist_ok=True)
except OSError as e:
return {"path": None, "n_slides": 0, "chapters": [],
"note": f"no se pudo crear el directorio destino: {e}"}
title = meta.get("title") or model.ENGINE_NAME
chapters_meta = []
try:
prs = Presentation()
prs.slide_width = Inches(_W)
prs.slide_height = Inches(_H)
st = _PptxState(prs, title)
for ch in chapters:
st.chapter = ch
st.chapter_slides = 0
_new_slide(st, cont=False)
for block in ch.blocks:
placer = _PLACERS.get(getattr(block, "kind", ""), _place_note)
try:
placer(st, block)
except Exception as e: # noqa: BLE001
notes.append(
f"bloque '{getattr(block, 'kind', '?')}' del capítulo "
f"'{ch.id}' omitido: {e}")
chapters_meta.append({"id": ch.id, "version": ch.version,
"n_slides": st.chapter_slides})
if st.slide_no == 0:
st.chapter = model.Chapter(id="vacio", title=title,
version=model.ENGINE_VERSION)
_new_slide(st, cont=False)
_place_note(st, model.Note(
"(documento vacío — sin capítulos aplicables)"))
prs.save(out_path)
n_slides = st.slide_no
except Exception as e: # noqa: BLE001
return {"path": None, "n_slides": 0, "chapters": [],
"note": f"fallo al escribir el PPTX: {e}"}
note = f"{n_slides} slides"
if notes:
note += " · " + "; ".join(notes)
return {"path": out_path, "n_slides": n_slides, "chapters": chapters_meta,
"note": note}
@@ -1,107 +0,0 @@
"""Shared text-measurement helpers for the AutomaticEDA renderers.
Both renderers flow content top-to-bottom and must know, *before* placing a
block, how much vertical space it will take that is what guarantees nothing is
cut: a unit either fits in the remaining space or moves to the next page/slide
whole. Measuring proportional text exactly in matplotlib/pptx is impractical, so
we use a deterministic character-grid estimate (chars-per-line from an average
glyph width) which slightly over-estimates and is therefore safe: it never
claims something fits when it would overflow.
Wrapping is word-aware (``textwrap``) and additionally hard-splits any single
token longer than the line so a 200-character value still wraps instead of
overflowing that is wrapping, not loss: every character is still rendered.
"""
from __future__ import annotations
import textwrap
def avg_char_width_in(fontsize_pt: float) -> float:
"""Approximate average glyph width in inches for a sans-serif font.
~0.5 of the point size is a conservative mean advance width for proportional
sans fonts; dividing by 72 converts points to inches.
"""
return 0.5 * fontsize_pt / 72.0
def line_height_in(fontsize_pt: float, leading: float = 1.32) -> float:
"""Line height in inches for a given font size and leading."""
return leading * fontsize_pt / 72.0
def chars_per_line(width_in: float, fontsize_pt: float) -> int:
"""How many average glyphs fit in ``width_in`` at ``fontsize_pt``."""
cw = avg_char_width_in(fontsize_pt)
if cw <= 0:
return 80
n = int(width_in / cw)
return max(1, n)
def wrap(text: str, max_chars: int) -> list:
"""Word-wrap ``text`` to lines of at most ``max_chars``, never losing chars.
Long tokens (no spaces) are hard-split so they cannot overflow. Existing
newlines are honored as hard breaks. Empty input yields a single empty line
so callers can still reserve a row.
"""
if max_chars < 1:
max_chars = 1
s = "" if text is None else str(text)
out: list = []
for raw_line in s.split("\n"):
if raw_line == "":
out.append("")
continue
# textwrap with break_long_words so no token overflows the column.
wrapped = textwrap.wrap(
raw_line, width=max_chars, break_long_words=True,
break_on_hyphens=False, replace_whitespace=True,
drop_whitespace=True,
)
if not wrapped:
out.append("")
else:
out.extend(wrapped)
return out or [""]
def strip_inline_md(text: str) -> str:
"""Strip a tiny subset of inline markdown markers, keeping the text.
Removes ``**bold**`` / ``__bold__`` / ``*em*`` / `` `code` `` markers so the
content is preserved without trying to style spans (which the line-grid
layout cannot do). Nothing is dropped except the markers themselves.
"""
if not text:
return ""
s = str(text)
for marker in ("**", "__", "`"):
s = s.replace(marker, "")
return s
def parse_md_table(lines: list):
"""Parse consecutive ``| a | b |`` lines into ``(header, rows)`` or None.
Accepts an optional separator row (``|---|---|``) right after the header,
which is ignored. Returns None if the lines are not a pipe table.
"""
cells_rows = []
for ln in lines:
s = ln.strip()
if not (s.startswith("|") and s.endswith("|")):
return None
parts = [c.strip() for c in s.strip("|").split("|")]
cells_rows.append(parts)
if not cells_rows:
return None
header = cells_rows[0]
body = cells_rows[1:]
# Drop a markdown separator row (all cells are dashes/colons).
if body and all(set(c) <= set("-: ") and "-" in c for c in body[0]):
body = body[1:]
return header, body
@@ -1,97 +0,0 @@
---
name: describe_clusters_llm
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def describe_clusters_llm(cluster_profiles: list, feature_names: list, model: str = \"claude-haiku-4-5-20251001\") -> dict"
description: "Micro-analisis LLM de clusters de KMeans (grupo eda). Toma los perfiles AGREGADOS de cada cluster (los que produce project_clusters_2d: tamano, centroide en escala original, features distintivas y centroide en z-score) y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una descripcion de 1-2 frases en espanol. Clave de coste/privacidad: NO envia filas crudas, solo el resumen agregado de cada grupo (tamano, % del total y la media de las features distintivas con su signo respecto a la media global). Reusa ask_llm del grupo claude-direct (API directa con token OAuth de Claude). Impura, dict-no-throw: nunca lanza, degrada a titulos genericos 'Cluster N' si el LLM no responde o el parseo falla."
tags: [eda, clustering, llm, claude-direct, datascience, kmeans]
params:
- name: cluster_profiles
desc: "Lista de perfiles de cluster con la forma que produce project_clusters_2d: cada uno {cluster:int, size:int, pct:float, centroid_original:{feature: media en escala original}, distinctive:[features distintivas], centroid_z:{feature: z-score}}. Solo se le envia al LLM un resumen agregado; nunca filas crudas. Lista vacia o no-lista -> clusters=[] sin llamar al LLM."
- name: feature_names
desc: "Nombres de las features del dataset. Se incluyen como contexto en el prompt para que el LLM pueda nombrar los clusters; no es obligatorio que coincida con las features distintivas de cada perfil."
- name: model
desc: "id del modelo Anthropic a usar. Default 'claude-haiku-4-5-20251001' (haiku, coste bajo, ~2-3s). Para titulos/descripciones mas finas, pasar p.ej. 'claude-opus-4-8'."
output: "dict dict-no-throw: {clusters:[{cluster:int, title:str, description:str}], model:str, note:str}. note=='' si todo fue bien. Si el LLM no respondio (note='LLM no disponible') o el parseo fallo (note='parse fallido'), clusters trae titulos genericos 'Cluster N' con description vacia. Si cluster_profiles esta vacio o no es lista: {clusters:[], model, note:'sin clusters'}. NUNCA lanza."
uses_functions: [ask_llm_py_core]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_parse_clusters_json_valid_array", "test_parse_clusters_json_wrapped_in_junk_text", "test_parse_clusters_json_non_json_returns_none", "test_parse_clusters_json_fills_missing_cluster_by_index", "test_describe_clusters_llm_ok_with_monkeypatched_llm", "test_describe_clusters_llm_degrades_on_empty_response", "test_describe_clusters_llm_degrades_on_unparseable_response", "test_describe_clusters_llm_empty_list_skips_llm", "test_describe_clusters_llm_non_list_input_skips_llm"]
test_file_path: "python/functions/datascience/describe_clusters_llm_test.py"
file_path: "python/functions/datascience/describe_clusters_llm.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.describe_clusters_llm import describe_clusters_llm
# Perfiles agregados producidos por project_clusters_2d (no hay filas crudas).
cluster_profiles = [
{
"cluster": 0, "size": 60, "pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1, "size": 40, "pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
feature_names = ["acidez", "alcohol", "azucar"]
out = describe_clusters_llm(cluster_profiles, feature_names) # haiku por defecto
# out = describe_clusters_llm(cluster_profiles, feature_names, model="claude-opus-4-8")
if not out["note"]:
for c in out["clusters"]:
print(f"Cluster {c['cluster']}: {c['title']}")
print(" ", c["description"])
else:
# Degradacion: titulos genericos "Cluster N".
print("LLM no usado:", out["note"])
for c in out["clusters"]:
print(c["cluster"], c["title"])
```
## Cuando usarla
Cuando ya has clusterizado un dataset (KMeans + `project_clusters_2d`) y quieres
poner NOMBRE y descripcion legible a cada grupo en vez de dejar "Cluster 0/1/2".
Es el paso interpretativo que sigue al perfilado de clusters: `project_clusters_2d`
calcula tamano, centroides y features distintivas, y `describe_clusters_llm` los
traduce a un titulo corto + 1-2 frases por cluster. Usala al cerrar un EDA con
segmentacion para el resumen final o el report. Una sola llamada al LLM describe
todos los clusters a la vez (barato).
## Gotchas
- **Impura: hace 1 llamada de red al LLM.** No es determinista ni gratis. Latencia
tipica ~2-3s con haiku.
- **Requiere token OAuth de Claude** en `~/.claude/.credentials.json` (via `ask_llm`
/ grupo `claude-direct`). Sin token / sin red, NO lanza: degrada a titulos
genericos `Cluster N` con `note="LLM no disponible"`.
- **NO envia filas crudas al LLM**, solo el resumen AGREGADO de cada cluster
(tamano, % del total y la media de las features distintivas con su signo respecto
a la media global). Privacidad y coste minimos por diseno — pero requiere que los
perfiles vengan ya calculados por `project_clusters_2d`.
- **Modelo `haiku` por defecto** para coste bajo; sube a `claude-opus-4-8` si
necesitas titulos/descripciones mas finas (mas caro y lento).
- **dict-no-throw**: si el modelo no devuelve un JSON array parseable, retorna
titulos genericos con `note="parse fallido"`. Comprueba siempre `out["note"]`
antes de fiarte de los titulos.
- El LLM puede sobre-interpretar: el system prompt le pide ser sobrio y no inventar
causas, pero revisa los titulos antes de publicarlos en un report.
@@ -1,240 +0,0 @@
"""describe_clusters_llm — micro-analisis LLM de clusters de KMeans (grupo `eda`).
Toma los PERFILES AGREGADOS de cada cluster (los que produce `project_clusters_2d`:
tamano, centroide en escala original, features distintivas y centroide en z-score)
y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una
descripcion de 1-2 frases, en espanol.
Clave de coste y privacidad: NO se envian filas crudas al LLM. Solo viaja el
perfil AGREGADO de cada grupo (tamano, % del total y la media de las features
distintivas con su signo respecto a la media global). El coste es minimo y ningun
dato fila-a-fila sale del proceso.
Reusa `ask_llm` del registry (grupo claude-direct, API directa con el token OAuth
de Claude en ~/.claude/.credentials.json, arranque 0). Impura: una llamada de red.
Estilo dict-no-throw: NUNCA lanza; ante cualquier fallo (red, LLM caido, parseo)
degrada a titulos genericos "Cluster N" + una nota explicando el motivo.
"""
import json
from core.ask_llm import ask_llm
_SYSTEM = (
"Eres un analista de datos. Recibes los PERFILES AGREGADOS de los clusters de "
"un KMeans (por cada grupo: su tamano y la media de sus features distintivas, "
"con el signo respecto a la media global; nunca filas crudas) y los describes "
"de forma sobria y util. Para cada cluster generas un titulo corto y "
"descriptivo (por ejemplo 'Vinos de alta acidez y baja graduacion') y una "
"descripcion de 1-2 frases. NO inventes causas ni sobre-interpretes: limitate a "
"lo que dicen los numeros. Responde en espanol. Responde SIEMPRE y SOLO con un "
"unico JSON array valido, sin texto alrededor y sin fences de markdown, con "
'EXACTAMENTE la forma [{"cluster": <int>, "title": "<titulo corto>", '
'"description": "<1-2 frases>"}], un objeto por cluster.'
)
def _fmt_num(value) -> str:
"""Formatea un numero de forma compacta para el prompt (None -> '?')."""
if value is None:
return "?"
if isinstance(value, bool):
return str(value)
if isinstance(value, float):
if value == int(value):
return str(int(value))
return f"{value:.4g}"
return str(value)
def _cluster_id(profile: dict, index: int) -> int:
"""Devuelve el id del cluster del perfil, o el indice si no es un int valido."""
raw = (profile or {}).get("cluster")
if isinstance(raw, bool):
return index
if isinstance(raw, int):
return raw
try:
return int(raw)
except (TypeError, ValueError):
return index
def _build_prompt(cluster_profiles: list, feature_names: list) -> str:
"""Construye un resumen textual compacto de los perfiles para el LLM.
Funcion interna PURA: no toca red ni disco, es testeable sin credenciales.
Por cada cluster incluye su numero, tamano (size + pct%) y, para cada feature
distintiva, el valor del centroide en escala original mas si esta por encima o
por debajo de la media (signo del z-score en centroid_z). Pasa AGREGADOS, nunca
dato crudo de filas.
Args:
cluster_profiles: lista de perfiles de cluster (forma de project_clusters_2d).
feature_names: nombres de las features del dataset (solo contexto).
Returns:
El texto del prompt.
"""
cluster_profiles = cluster_profiles or []
feature_names = feature_names if isinstance(feature_names, list) else []
lines = [
"Perfiles AGREGADOS de clusters de KMeans. No hay filas crudas, solo medias por grupo.",
f"Numero de clusters: {len(cluster_profiles)}",
]
if feature_names:
lines.append("Features del dataset: " + ", ".join(str(f) for f in feature_names))
lines.append("")
for i, prof in enumerate(cluster_profiles):
prof = prof or {}
cid = _cluster_id(prof, i)
size = prof.get("size")
pct = prof.get("pct")
pct_str = f"{pct:.1f}%" if isinstance(pct, (int, float)) and not isinstance(pct, bool) else "?"
lines.append(f"Cluster {cid}: tamano={_fmt_num(size)} ({pct_str} del total)")
distinctive = prof.get("distinctive") or []
centroid_o = prof.get("centroid_original") or {}
centroid_z = prof.get("centroid_z") or {}
if distinctive:
lines.append(" Features distintivas (media del grupo):")
for feat in distinctive:
val = centroid_o.get(feat)
z = centroid_z.get(feat)
direction = ""
if isinstance(z, (int, float)) and not isinstance(z, bool):
if z > 0:
direction = "por encima de la media"
elif z < 0:
direction = "por debajo de la media"
else:
direction = "en la media"
if direction:
lines.append(f" - {feat}: {_fmt_num(val)} ({direction})")
else:
lines.append(f" - {feat}: {_fmt_num(val)}")
else:
lines.append(" (sin features distintivas marcadas)")
lines.append("")
lines.append(
"Devuelve SOLO el JSON array descrito en las instrucciones del sistema, "
"sin texto antes ni despues."
)
return "\n".join(lines)
def _parse_clusters_json(text: str, n: int):
"""Extrae y normaliza el array JSON de la respuesta del LLM.
Funcion interna testeable sin red. Localiza el primer '[' y el ultimo ']' del
texto (tolerando texto basura alrededor o fences de markdown), hace json.loads
y normaliza cada entrada a {cluster:int, title:str, description:str}, rellenando
el cluster por indice si falta. NUNCA lanza: ante cualquier fallo devuelve None
(senal de degradacion para el caller).
Args:
text: respuesta cruda del LLM.
n: numero de perfiles esperados (referencia; la longitud real la marca el array).
Returns:
Lista normalizada de dicts, o None si no se pudo parsear un array valido.
"""
if not text or not isinstance(text, str):
return None
start = text.find("[")
end = text.rfind("]")
if start == -1 or end == -1 or end <= start:
return None
try:
data = json.loads(text[start : end + 1])
except (ValueError, TypeError):
return None
if not isinstance(data, list):
return None
out = []
for i, item in enumerate(data):
if not isinstance(item, dict):
out.append({"cluster": i, "title": f"Cluster {i}", "description": ""})
continue
raw_cluster = item.get("cluster")
if isinstance(raw_cluster, bool):
cluster = i
elif isinstance(raw_cluster, int):
cluster = raw_cluster
else:
try:
cluster = int(raw_cluster)
except (TypeError, ValueError):
cluster = i
title = item.get("title")
title = str(title) if title is not None else f"Cluster {cluster}"
desc = item.get("description")
desc = str(desc) if desc is not None else ""
out.append({"cluster": cluster, "title": title, "description": desc})
return out
def _generic_clusters(cluster_profiles: list) -> list:
"""Titulos genericos por cluster para la degradacion (sin LLM)."""
out = []
for i, prof in enumerate(cluster_profiles):
cid = _cluster_id(prof or {}, i)
out.append({"cluster": cid, "title": f"Cluster {cid}", "description": ""})
return out
def describe_clusters_llm(
cluster_profiles: list,
feature_names: list,
model: str = "claude-haiku-4-5-20251001",
) -> dict:
"""Describe los clusters de un KMeans con UNA sola llamada al LLM.
Args:
cluster_profiles: lista de perfiles de cluster (la forma que produce
project_clusters_2d): cada uno {"cluster": int, "size": int,
"pct": float, "centroid_original": {feature: media},
"distinctive": [features], "centroid_z": {feature: z}}. Solo se le
envia al LLM el resumen agregado, nunca filas crudas.
feature_names: nombres de las features del dataset (contexto para el LLM).
model: id del modelo Anthropic. Default claude-haiku-4-5-20251001
(haiku, coste bajo).
Returns:
dict dict-no-throw: {"clusters": [{cluster:int, title:str, description:str}],
"model": str, "note": str}. note == "" si todo fue bien; si el LLM no
respondio o el parseo fallo, clusters trae titulos genericos "Cluster N" y
note explica el motivo ("LLM no disponible" / "parse fallido"). Si
cluster_profiles esta vacio o no es lista, devuelve clusters=[] sin llamar
al LLM (note "sin clusters"). NUNCA lanza.
"""
if not isinstance(cluster_profiles, list) or not cluster_profiles:
return {"clusters": [], "model": model, "note": "sin clusters"}
n = len(cluster_profiles)
prompt = _build_prompt(cluster_profiles, feature_names)
try:
text = ask_llm(prompt, model=model, system=_SYSTEM, echo=False)
except Exception: # noqa: BLE001 — degradacion: cualquier fallo de red/LLM.
text = ""
parsed = _parse_clusters_json(text, n)
if parsed:
return {"clusters": parsed, "model": model, "note": ""}
note = "LLM no disponible" if not text else "parse fallido"
return {"clusters": _generic_clusters(cluster_profiles), "model": model, "note": note}
@@ -1,160 +0,0 @@
"""Tests para describe_clusters_llm.
NO acceden a red ni a credenciales: _parse_clusters_json es testeable aislada y la
unica via que llamaria al LLM (describe_clusters_llm) se prueba monkeypatcheando
ask_llm con respuestas simuladas. Cubre golden (LLM ok), edge (cluster faltante,
array envuelto en basura, lista vacia / input no-lista) y error (LLM caido, texto
no parseable) todos sin tocar la red.
"""
import importlib
import json
from datascience.describe_clusters_llm import (
_parse_clusters_json,
describe_clusters_llm,
)
# Perfiles de ejemplo con la forma que produce project_clusters_2d.
_PROFILES = [
{
"cluster": 0,
"size": 60,
"pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1,
"size": 40,
"pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
_FEATURES = ["acidez", "alcohol", "azucar"]
def _patch_ask_llm(monkeypatch, returner):
"""Monkeypatchea ask_llm en el modulo bajo prueba con un callable simulado."""
mod = importlib.import_module("datascience.describe_clusters_llm")
monkeypatch.setattr(
mod, "ask_llm", lambda prompt, model="x", system="", echo=True: returner
)
# --- _parse_clusters_json (parser puro, sin red) ---
def test_parse_clusters_json_valid_array():
text = json.dumps(
[
{"cluster": 0, "title": "A", "description": "desc a"},
{"cluster": 1, "title": "B", "description": "desc b"},
]
)
parsed = _parse_clusters_json(text, 2)
assert parsed == [
{"cluster": 0, "title": "A", "description": "desc a"},
{"cluster": 1, "title": "B", "description": "desc b"},
]
def test_parse_clusters_json_wrapped_in_junk_text():
payload = [{"cluster": 0, "title": "Solo uno", "description": "d"}]
text = "Claro, aqui tienes el resultado:\n" + json.dumps(payload) + "\nEspero que sirva."
parsed = _parse_clusters_json(text, 1)
assert parsed[0]["title"] == "Solo uno"
assert parsed[0]["cluster"] == 0
def test_parse_clusters_json_non_json_returns_none():
# Texto sin array JSON -> degradacion (None) sin lanzar.
assert _parse_clusters_json("no hay json aqui", 2) is None
assert _parse_clusters_json("", 2) is None
assert _parse_clusters_json("{solo un objeto}", 2) is None
def test_parse_clusters_json_fills_missing_cluster_by_index():
text = json.dumps(
[
{"title": "A", "description": "d"},
{"title": "B", "description": "e"},
]
)
parsed = _parse_clusters_json(text, 2)
assert parsed[0]["cluster"] == 0
assert parsed[1]["cluster"] == 1
assert parsed[0]["title"] == "A"
# --- describe_clusters_llm (con ask_llm monkeypatcheado, sin red) ---
def test_describe_clusters_llm_ok_with_monkeypatched_llm(monkeypatch):
fake = json.dumps(
[
{
"cluster": 0,
"title": "Vinos de alta acidez",
"description": "Acidez por encima de la media y graduacion baja.",
},
{
"cluster": 1,
"title": "Vinos de alta graduacion",
"description": "Alcohol claramente por encima de la media.",
},
]
)
_patch_ask_llm(monkeypatch, fake)
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["note"] == ""
assert out["model"] == "claude-haiku-4-5-20251001"
assert len(out["clusters"]) == 2
assert out["clusters"][0]["title"] == "Vinos de alta acidez"
assert set(out["clusters"][0].keys()) == {"cluster", "title", "description"}
def test_describe_clusters_llm_degrades_on_empty_response(monkeypatch):
# ask_llm devuelve "" (error/red caida) -> titulos genericos + note.
_patch_ask_llm(monkeypatch, "")
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["clusters"][0]["title"] == "Cluster 0"
assert out["clusters"][1]["title"] == "Cluster 1"
assert out["clusters"][0]["description"] == ""
assert out["note"] == "LLM no disponible"
assert out["model"] == "claude-haiku-4-5-20251001"
def test_describe_clusters_llm_degrades_on_unparseable_response(monkeypatch):
_patch_ask_llm(monkeypatch, "lo siento, no puedo ayudarte con eso")
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["clusters"][0]["title"] == "Cluster 0"
assert out["clusters"][1]["title"] == "Cluster 1"
assert out["note"] == "parse fallido"
def test_describe_clusters_llm_empty_list_skips_llm(monkeypatch):
# Con lista vacia NO debe llamarse al LLM en absoluto.
def boom(*args, **kwargs):
raise AssertionError("ask_llm no debe llamarse con lista vacia")
mod = importlib.import_module("datascience.describe_clusters_llm")
monkeypatch.setattr(mod, "ask_llm", boom)
out = describe_clusters_llm([], _FEATURES)
assert out["clusters"] == []
assert out["note"] == "sin clusters"
def test_describe_clusters_llm_non_list_input_skips_llm():
# Input no-lista (None) -> clusters vacio sin tocar la red.
out = describe_clusters_llm(None, _FEATURES)
assert out["clusters"] == []
assert out["note"] == "sin clusters"
assert out["model"] == "claude-haiku-4-5-20251001"
@@ -1,95 +0,0 @@
---
name: project_clusters_2d
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def project_clusters_2d(columns: dict, k_min: int = 2, k_max: int = 8, max_points: int = 2000) -> dict"
description: "PCA a 2D + KMeans sobre el MISMO subset numerico estandarizado, devolviendo proyeccion 2D y labels de cluster ALINEADOS por fila para pintar un scatter PCA coloreado por cluster. Estandariza una sola vez, elige k por silhouette y proyecta centroides al espacio PCA. Determinista."
tags: [eda, models, clustering, pca, kmeans, scatter, dimensionality-reduction, datascience, sklearn]
params:
- name: columns
desc: "Mapa {nombre_columna: [valores numericos]}. Listas alineadas por fila (misma longitud). Columnas no numericas o con <2 valores distintos se descartan; None/NaN descartan la fila completa (listwise)."
- name: k_min
desc: "Numero minimo de clusters a probar por silhouette (default 2). El minimo de filas validas requerido es max(3, k_min*2)."
- name: k_max
desc: "Numero maximo de clusters a probar (default 8). Se acota a min(k_max, n_filas_validas-1)."
- name: max_points
desc: "Tope de puntos devueltos en points/labels (default 2000). Si n_used lo supera, points y labels se submuestrean CONJUNTAMENTE con paso determinista para seguir alineados; el fit usa siempre todas las filas."
output: "dict con points (proyeccion 2D, posiblemente submuestreada a max_points), labels (cluster de cada point, alineado con points), centers_2d (centroides en espacio PCA, len==best_k), best_k, silhouette, explained_2d ([var PC1, var PC2]), cluster_sizes (sobre n_used total), cluster_profiles (lista de {cluster, size, pct, centroid_original, distinctive top-3 por |z|, centroid_z}), feature_names, n_used (filas del fit antes de muestreo) y note (\"\" si ok). Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve best_k=0, listas vacias y note 'datos insuficientes' sin lanzar excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [numpy, scikit-learn]
tested: true
tests: ["test_golden_three_blobs_aligned_projection_and_clusters", "test_edge_subsampling_keeps_points_labels_aligned", "test_edge_single_numeric_column_insufficient", "test_edge_too_few_rows_insufficient", "test_edge_non_numeric_column_dropped_without_error", "test_edge_constant_column_dropped"]
test_file_path: "python/functions/datascience/project_clusters_2d_test.py"
file_path: "python/functions/datascience/project_clusters_2d.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.project_clusters_2d import project_clusters_2d
# Tres grupos gaussianos bien separados sobre 4 features.
import numpy as np
rng = np.random.default_rng(0)
rows = []
for center in (np.full(4, 0.0), np.full(4, 12.0), np.array([0.0, 12.0, 0.0, 12.0])):
rows.extend(rng.normal(loc=center, scale=0.4, size=(50, 4)))
mat = np.array(rows)
columns = {f"f{j}": [float(v) for v in mat[:, j]] for j in range(4)}
res = project_clusters_2d(columns, k_min=2, k_max=8)
print(res["best_k"]) # 3
print(len(res["points"]), len(res["labels"])) # 150 150 (alineados)
print(len(res["centers_2d"])) # == best_k
print([round(v, 2) for v in res["explained_2d"]]) # varianza de PC1, PC2
# Pintar: scatter(points[:,0], points[:,1], c=labels) + marcar centers_2d.
```
## Cuando usarla
Cuando, durante un EDA, quieres un scatter 2D de un dataset tabular numerico
coloreado por segmento descubierto automaticamente, y necesitas que cada punto
de la proyeccion lleve su etiqueta de cluster correcta. Usala en vez de
combinar `pca_explained` + `kmeans_segments` a mano: esas estandarizan por
separado y descartan los labels, asi que sus salidas no se pueden cruzar fila a
fila. Esta funcion garantiza esa alineacion (mismo X estandarizado para PCA y
KMeans) y ademas proyecta los centroides KMeans al espacio PCA para dibujarlos.
## Gotchas
- Funcion pura y determinista (StandardScaler + PCA random_state=0 + KMeans
random_state=0, n_init=10), pero requiere `numpy` y `scikit-learn` instalados.
- `points`/`labels` pueden venir submuestreados si `n_used > max_points` (paso
determinista `[::ceil(n_used/max_points)]`); `n_used`, `centers_2d`,
`cluster_sizes` y `cluster_profiles` se calculan SIEMPRE sobre todas las filas.
Cuando hay submuestreo, `note` lo indica.
- `centroid_z` y `distinctive` estan en z-score (espacio escalado);
`centroid_original` esta en las unidades originales (via
`scaler.inverse_transform`). No mezcles ambos al interpretar.
- `centers_2d` esta en el espacio PCA (coordenadas del scatter), no en unidades
originales: pintalo sobre el mismo eje que `points`.
- Silhouette baja con best_k alto sugiere que no hay estructura de cluster real;
el scatter puede no mostrar grupos separados.
## Notas
Pieza de composicion que `pca_explained` + `kmeans_segments` no cubren: ambas
estandarizan internamente por separado (cada una su propio `StandardScaler`) y
`kmeans_segments` no expone los labels por fila, por lo que no se pueden cruzar
con la `projection` de `pca_explained`. Esta funcion usa `sklearn` directo
(StandardScaler una sola vez compartido por PCA y KMeans) para garantizar la
alineacion `points[i] <-> labels[i]` y proyectar los centroides KMeans al
espacio PCA. Coercion y listwise deletion siguen el estilo de `pca_explained`
(None/NaN -> fila descartada, columnas no parseables o constantes descartadas).
Degrada con gracia: con <2 columnas numericas o <max(3, k_min*2) filas validas
devuelve `note: "datos insuficientes"` sin lanzar excepcion (try/except
defensivo en todo el cuerpo).
@@ -1,208 +0,0 @@
"""Proyeccion PCA-2D + KMeans sobre el mismo subset, con puntos y labels alineados.
Estandariza una sola vez las columnas numericas (z-score), proyecta a 2D con PCA
y clusteriza con KMeans sobre EXACTAMENTE la misma matriz escalada, de modo que
la proyeccion 2D (`points`) y la etiqueta de cluster (`labels`) quedan alineadas
fila a fila. Es la pieza que `pca_explained` + `kmeans_segments` no cubren: esas
dos estandarizan por separado y descartan los labels, asi que sus salidas no se
pueden cruzar para pintar un scatter PCA coloreado por cluster. Determinista.
"""
import math
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
def project_clusters_2d(
columns: dict,
k_min: int = 2,
k_max: int = 8,
max_points: int = 2000,
) -> dict:
"""Proyecta a 2D (PCA) y clusteriza (KMeans) el mismo subset estandarizado.
PCA a 2D y KMeans se ajustan sobre la MISMA matriz estandarizada, por lo que
`points` (proyeccion 2D) y `labels` (cluster por fila) quedan alineados por
indice. El k se elige automaticamente por silhouette en el rango
[k_min, min(k_max, n_rows-1)], igual criterio que `kmeans_segments`.
Determinista: StandardScaler + PCA(random_state=0) + KMeans(random_state=0,
n_init=10).
Args:
columns: mapa {nombre_columna: [valores numericos]}. Listas alineadas por
fila (misma longitud). Columnas no numericas o con menos de 2 valores
distintos se descartan. None/NaN marcan filas a descartar listwise
(una fila se elimina si cualquier feature falta).
k_min: numero minimo de clusters a probar (default 2).
k_max: numero maximo de clusters a probar (default 8). Se acota a
min(k_max, n_rows_validas-1).
max_points: tope de puntos devueltos en `points`/`labels`. Si las filas
usadas superan este tope, se submuestrea points y labels CONJUNTAMENTE
con paso determinista para mantenerlos alineados. El fit (best_k,
silhouette, centroides, perfiles) usa SIEMPRE todas las filas.
Returns:
dict con points (proyeccion 2D, posiblemente submuestreada a max_points),
labels (cluster de cada point, alineado con points), centers_2d
(centroides en espacio PCA, len == best_k), best_k, silhouette,
explained_2d (varianza de PC1 y PC2), cluster_sizes (sobre n_used total),
cluster_profiles (ver abajo), feature_names, n_used (filas del fit antes
de muestreo) y note ("" si ok). Cada entrada de cluster_profiles:
{cluster, size, pct, centroid_original (medias en escala original),
centroid_z (z del centroide), distinctive (top 3 features por |z|)}.
Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve
best_k=0 y note "datos insuficientes" sin lanzar excepcion.
"""
feature_names: list[str] = []
def insufficient(names: list[str], n_used: int) -> dict:
return {
"best_k": 0,
"points": [],
"labels": [],
"centers_2d": [],
"cluster_profiles": [],
"feature_names": names,
"n_used": int(n_used),
"note": "datos insuficientes",
}
try:
if not isinstance(columns, dict) or not columns:
return insufficient([], 0)
# 1. Coerce a numerico, descartando columnas no parseables o constantes.
numeric_cols: dict[str, list] = {}
for name, values in columns.items():
if not isinstance(values, (list, tuple)):
continue
coerced: list[float] = []
usable = True
for v in values:
if v is None:
coerced.append(math.nan)
continue
try:
coerced.append(float(v))
except (TypeError, ValueError):
usable = False
break
if not usable:
continue
# Menos de 2 valores distintos no aporta varianza -> descartar.
distinct = {x for x in coerced if not math.isnan(x)}
if len(distinct) < 2:
continue
numeric_cols[name] = coerced
feature_names = list(numeric_cols.keys())
if len(feature_names) < 2:
return insufficient(feature_names, 0)
# 2. Matriz alineada por fila + listwise deletion (cualquier NaN -> fuera).
matrix = np.array(
[numeric_cols[n] for n in feature_names], dtype=float
).T
valid_mask = ~np.isnan(matrix).any(axis=1)
data = matrix[valid_mask]
n_used = int(data.shape[0])
min_rows = max(3, k_min * 2)
if n_used < min_rows:
return insufficient(feature_names, n_used)
# 3. Estandarizar UNA sola vez (guardamos el scaler para desestandarizar).
scaler = StandardScaler()
X_scaled = scaler.fit_transform(data)
# 4. PCA a 2D sobre la matriz escalada.
pca = PCA(n_components=2, random_state=0)
pca.fit(X_scaled)
proj = pca.transform(X_scaled)
# 5. KMeans con seleccion automatica de k por silhouette (mismo X_scaled).
upper_k = min(k_max, n_used - 1)
if upper_k < k_min:
return insufficient(feature_names, n_used)
best = None # (silhouette, k, model, labels)
for k in range(k_min, upper_k + 1):
model = KMeans(n_clusters=k, n_init=10, random_state=0)
labels_k = model.fit_predict(X_scaled)
if len(set(labels_k)) < 2:
sil = -1.0
else:
sil = float(silhouette_score(X_scaled, labels_k))
if best is None or sil > best[0]:
best = (sil, k, model, labels_k)
best_sil, best_k, best_model, labels = best
# 6. Centroides KMeans (espacio escalado) proyectados al espacio PCA.
centers_2d = pca.transform(best_model.cluster_centers_)
# 7. Perfiles por cluster sobre TODAS las filas usadas.
centroids_original = scaler.inverse_transform(best_model.cluster_centers_)
cluster_sizes: list[int] = []
cluster_profiles: list[dict] = []
for c in range(best_k):
size = int(np.sum(labels == c))
cluster_sizes.append(size)
z_vec = best_model.cluster_centers_[c]
orig_vec = centroids_original[c]
centroid_z = {
feature_names[j]: float(z_vec[j]) for j in range(len(feature_names))
}
centroid_original = {
feature_names[j]: float(orig_vec[j])
for j in range(len(feature_names))
}
order = np.argsort(np.abs(z_vec))[::-1]
distinctive = [feature_names[int(j)] for j in order[:3]]
cluster_profiles.append(
{
"cluster": int(c),
"size": size,
"pct": float(size / n_used) if n_used else 0.0,
"centroid_original": centroid_original,
"distinctive": distinctive,
"centroid_z": centroid_z,
}
)
# 8. Muestreo determinista CONJUNTO de points + labels (mantiene alineacion).
note = ""
if n_used > max_points and max_points > 0:
step = math.ceil(n_used / max_points)
proj_out = proj[::step]
labels_out = labels[::step]
note = f"submuestreado a {len(proj_out)} de {n_used} puntos para visualizacion"
else:
proj_out = proj
labels_out = labels
points = [[float(row[0]), float(row[1])] for row in proj_out]
labels_list = [int(v) for v in labels_out]
centers_list = [[float(row[0]), float(row[1])] for row in centers_2d]
explained_2d = [float(x) for x in pca.explained_variance_ratio_]
return {
"points": points,
"labels": labels_list,
"centers_2d": centers_list,
"best_k": int(best_k),
"silhouette": float(best_sil),
"explained_2d": explained_2d,
"cluster_sizes": cluster_sizes,
"cluster_profiles": cluster_profiles,
"feature_names": feature_names,
"n_used": n_used,
"note": note,
}
except Exception:
# Lectura defensiva: nunca propagar excepciones al caller del EDA.
return insufficient(feature_names, 0)
@@ -1,127 +0,0 @@
"""Tests para project_clusters_2d."""
import numpy as np
from project_clusters_2d import project_clusters_2d
def _three_blobs(seed: int = 0, per_blob: int = 50, n_features: int = 4):
"""Genera 3 gaussianas bien separadas en n_features dims, alineadas por fila.
Devuelve un dict {col: [valores]} con las columnas alineadas por fila.
"""
rng = np.random.default_rng(seed)
base_centers = [
np.full(n_features, 0.0),
np.full(n_features, 12.0),
np.array([0.0, 12.0, 0.0, 12.0][:n_features] + [0.0] * max(0, n_features - 4)),
]
rows: list[np.ndarray] = []
for center in base_centers:
pts = rng.normal(loc=center, scale=0.4, size=(per_blob, n_features))
rows.extend(pts)
mat = np.array(rows)
return {f"f{j}": [float(v) for v in mat[:, j]] for j in range(n_features)}
def test_golden_three_blobs_aligned_projection_and_clusters():
columns = _three_blobs(seed=0, per_blob=50, n_features=4)
result = project_clusters_2d(columns, k_min=2, k_max=8)
n_used = result["n_used"]
assert n_used == 150
assert result["note"] == ""
best_k = result["best_k"]
assert 2 <= best_k <= 4
# points y labels alineados por fila.
assert len(result["points"]) == len(result["labels"])
assert len(result["points"]) == n_used # sin submuestreo (150 < 2000)
# Cada punto es un par (x, y).
assert all(len(p) == 2 for p in result["points"])
# Labels dentro del rango [0, best_k).
assert all(0 <= lbl < best_k for lbl in result["labels"])
# Centroides 2D: uno por cluster.
assert len(result["centers_2d"]) == best_k
assert all(len(c) == 2 for c in result["centers_2d"])
# Varianza explicada de los 2 componentes.
assert len(result["explained_2d"]) == 2
# cluster_sizes cubre todas las filas usadas.
assert sum(result["cluster_sizes"]) == n_used
assert len(result["cluster_sizes"]) == best_k
# cluster_profiles: una entrada por cluster, con centroid_original poblado.
assert len(result["cluster_profiles"]) == best_k
for prof in result["cluster_profiles"]:
assert set(prof["centroid_original"].keys()) == set(result["feature_names"])
assert set(prof["centroid_z"].keys()) == set(result["feature_names"])
assert 1 <= len(prof["distinctive"]) <= 3
assert prof["size"] >= 0
assert 0.0 <= prof["pct"] <= 1.0
def test_edge_subsampling_keeps_points_labels_aligned():
# max_points pequeño fuerza submuestreo conjunto de points + labels.
columns = _three_blobs(seed=1, per_blob=50, n_features=3)
result = project_clusters_2d(columns, k_min=2, k_max=6, max_points=40)
n_used = result["n_used"]
assert n_used == 150 # el fit usa todas las filas
# points y labels submuestreados pero siempre con la misma longitud.
assert len(result["points"]) == len(result["labels"])
assert len(result["points"]) <= 40
# centers/sizes/profiles se calculan sobre TODOS los puntos.
assert sum(result["cluster_sizes"]) == n_used
assert len(result["centers_2d"]) == result["best_k"]
assert result["note"] != "" # senala el submuestreo
def test_edge_single_numeric_column_insufficient():
columns = {"x": [float(i) for i in range(50)]}
result = project_clusters_2d(columns, k_min=2, k_max=8)
assert result["best_k"] == 0
assert result["note"] == "datos insuficientes"
assert result["points"] == []
assert result["labels"] == []
assert result["centers_2d"] == []
assert result["cluster_profiles"] == []
def test_edge_too_few_rows_insufficient():
# Solo 2 filas validas, min_rows = max(3, k_min*2) = 4 -> insuficiente.
columns = {"x": [1.0, 5.0], "y": [2.0, 9.0]}
result = project_clusters_2d(columns, k_min=2, k_max=8)
assert result["best_k"] == 0
assert result["note"] == "datos insuficientes"
def test_edge_non_numeric_column_dropped_without_error():
# La columna de strings se descarta; quedan 3 numericas -> funciona.
columns = _three_blobs(seed=2, per_blob=50, n_features=3)
columns["label"] = ["a"] * len(columns["f0"])
result = project_clusters_2d(columns, k_min=2, k_max=6)
assert result["best_k"] >= 2
assert "label" not in result["feature_names"]
assert set(result["feature_names"]) == {"f0", "f1", "f2"}
assert len(result["points"]) == len(result["labels"])
def test_edge_constant_column_dropped():
# Una columna constante (0 varianza) se descarta por <2 valores distintos.
columns = _three_blobs(seed=3, per_blob=50, n_features=3)
columns["const"] = [7.0] * len(columns["f0"])
result = project_clusters_2d(columns, k_min=2, k_max=6)
assert "const" not in result["feature_names"]
assert result["best_k"] >= 2
@@ -1,107 +0,0 @@
---
name: render_automatic_eda_pdf
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def render_automatic_eda_pdf(chapters_or_profile, out_path: str, meta: dict = None) -> dict"
description: "Renderiza un documento AutomaticEDA por CAPÍTULOS (modelo de bloques independiente del formato) en un PDF A5 retrato pensado para LEER EN EL MÓVIL. Acepta una lista de capítulos del modelo o directamente un TableProfile del grupo eda (en cuyo caso construye los capítulos canónicos con build_document). El paginador MIDE cada bloque y NUNCA corta nada: el texto se envuelve a líneas completas, las tablas largas se parten por filas REPITIENDO la cabecera, figuras e imágenes se escalan para caber enteras. Cada capítulo empieza en página nueva con pie 'Capítulo · vX.Y.Z' y se escribe un manifiesto automatic_eda_manifest.json junto a la salida para seguimiento por capítulo. dict-no-throw: nunca lanza, devuelve {path, n_pages, chapters, manifest_path, note}. Motor matplotlib PdfPages. Aditivo: NO reemplaza render_eda_pdf."
tags: [eda, pdf, render, report, mobile, automatic-eda, chapters, versioned, no-cut, pagination, matplotlib, datascience, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [os, matplotlib, "datascience.automatic_eda"]
params:
- name: chapters_or_profile
desc: "una lista de capítulos del modelo AutomaticEDA (dataclasses Chapter o dicts {id,title,version,blocks}) O un TableProfile dict del grupo eda. Si es un TableProfile, los capítulos canónicos se construyen con build_document(profile, meta['ctx']). Un capítulo es {id,title,version,blocks}; un bloque es uno de: heading, markdown, kv_table, data_table, figure, image, caption, note. Lectura defensiva: cualquier cosa no reconocida se degrada a Note, nunca lanza."
- name: out_path
desc: "ruta del archivo PDF de salida. Los directorios padre se crean si faltan. Si está en un directorio no escribible (p.ej. /proc/...) devuelve {path:None, note:<causa>} sin lanzar."
- name: meta
desc: "dict opcional. Claves: title (título de portada/pie), ctx (contexto de presentación pasado a los builders de capítulo cuando se da un profile: dataset_name, source_origin, storage, generated_at, description, granularity, quality_criteria, head_rows...), manifest_path (override; por defecto automatic_eda_manifest.json junto a out_path), write_manifest (False para no escribirlo), generated_at."
output: "dict (nunca lanza): {path: str|None, n_pages: int, chapters: list[{id,version,n_pages}], manifest_path: str|None, note: str}. En éxito path es la ruta escrita, n_pages el total de páginas, chapters el desglose por capítulo para el manifiesto. En error fatal path es None y note explica la causa."
tested: true
tests: ["test_golden_profile_genera_pdf_portada_y_overview", "test_edge_tabla_larga_parte_repitiendo_cabecera", "test_edge_celda_larga_no_se_corta", "test_no_corta_texto_markdown", "test_edge_profile_none_y_vacio_un_pagina", "test_error_path_directorio_no_escribible_no_revienta"]
test_file_path: "python/functions/datascience/render_automatic_eda_pdf_test.py"
file_path: "python/functions/datascience/render_automatic_eda_pdf.py"
---
## Ejemplo
```python
from datascience import render_automatic_eda_pdf
# Caso 1: directamente desde un TableProfile del grupo eda.
# profile = profile_table(db, "ventas", backend="duckdb")["profile"]
profile = {
"table": "ventas", "source": "/data/ventas.csv",
"n_rows": 1000, "n_cols": 2, "quality_score": 92.5,
"columns": [
{"name": "precio", "inferred_type": "numeric", "null_pct": 0.01,
"null_count": 10,
"numeric": {"mean": 42.5, "median": 40.0, "min": 1.0, "max": 100.0,
"std": 12.3}},
{"name": "categoria", "inferred_type": "categorical", "null_pct": 0.0,
"categorical": {"top": [{"value": "neumaticos", "count": 500},
{"value": "aceite", "count": 300}]}},
],
}
res = render_automatic_eda_pdf(
profile, "reports/ventas_aeda.pdf",
{"title": "EDA — ventas",
"ctx": {"dataset_name": "Ventas", "source_origin": "ERP export",
"description": "Líneas de venta del ERP.",
"granularity": "Cada fila es una línea de venta."}})
print(res["n_pages"], res["chapters"], res["manifest_path"])
# -> 3 [{'id':'portada','version':'1.0.0','n_pages':1},
# {'id':'overview','version':'1.0.0','n_pages':2}] reports/automatic_eda_manifest.json
# Caso 2: desde capítulos construidos a mano (modelo de bloques).
from datascience.automatic_eda.model import Chapter, Heading, DataTable
ch = Chapter(id="resumen", title="Resumen", version="1.0.0", blocks=[
Heading("Tabla", 1),
DataTable(header=["col", "valor"], rows=[["a", "1"], ["b", "2"]]),
])
render_automatic_eda_pdf([ch], "reports/manual.pdf")
```
## Cuando usarla
Cuando quieras el **PDF móvil del nuevo motor AutomaticEDA por capítulos** (portada
+ overview + los capítulos que existan): después de `profile_table(...)`, pásale el
`profile` y obtienes un PDF A5 retrato versionado por capítulo, con manifiesto. Úsala
como capa de presentación PDF del grupo `eda` cuando necesites **garantía de no-corte**
(texto, tablas e imágenes nunca recortados) y **versionado por capítulo** para mejora
continua. Es el reemplazo evolutivo de `render_eda_pdf`: comparte estética Tufte/móvil
pero separa contenido (capítulos/bloques) de formato (renderer), de modo que el mismo
documento se emite también como PPTX (`render_automatic_eda_pptx`). Para añadir un
capítulo nuevo, ver `docs/capabilities/automatic_eda.md`.
## Gotchas
- **Impura**: escribe el PDF en `out_path` (crea los directorios padre) y, salvo
`meta['write_manifest']=False`, un `automatic_eda_manifest.json` junto a la salida.
Backend headless `Agg` de matplotlib (corre en agentes/CI sin display).
- **Nunca lanza** (dict-no-throw): un bloque o capítulo que falle se omite y se anota
en `note`; el PDF se genera igual. Un profile `None`/`{}` produce un PDF de 1 página
válido. `out_path` no escribible → `{path: None, note: <causa>}`.
- **No corta nada**: el paginador mide cada bloque con una rejilla de caracteres
(sobre-estima ligeramente, nunca afirma que algo cabe cuando se desbordaría). El
texto se envuelve a líneas completas (sin cortar a media palabra), las tablas largas
se parten por filas **repitiendo la cabecera**, las celdas con texto largo se
envuelven dentro de su columna (la fila crece), y figuras/imágenes se escalan para
caber enteras (nunca se recortan).
- **Tablas muy anchas**: con muchas columnas (>10) cada columna se estrecha y su texto
se envuelve en varias líneas (sigue sin perderse). El reparto por columnas-en-grupos
para tablas muy anchas es una mejora pendiente (ver capability page).
- **head_rows / examples**: el capítulo Overview muestra `df.head` desde
`ctx['head_rows']`/`profile['head_rows']` y ejemplos no-nulos desde
`columns[i]['examples']`; si el profile no los trae (hoy no los trae), degrada con un
placeholder honesto y deriva los ejemplos de los valores reales del perfil (top
categóricos, min/median/max numéricos). Documentado en el contrato.
- **Registro en el package**: el `## Ejemplo` usa `from datascience import
render_automatic_eda_pdf` (añadido al `__init__.py`); el test importa el módulo
directo para no depender de ese registro.
- **Fechas en UI europeas**: la portada formatea la fecha como `DD/MM/AAAA HH:mm`.
@@ -1,83 +0,0 @@
"""render_automatic_eda_pdf — chapter-based EDA report as an A5-portrait PDF.
Public ``eda``-group entry point of the AutomaticEDA engine. Takes either a list
of chapters (the format-independent document model) or an ``eda`` TableProfile
dict (in which case the canonical chapters are built with ``build_document``),
and renders a mobile-first PDF whose paginator MEASURES every block and never
cuts text, tables or images: text wraps to whole lines, long tables split by
rows repeating the header, figures/images scale to fit entirely. Each chapter
starts on a fresh page stamped ``<Chapter> · v<version>`` in the footer, and a
per-chapter manifest (``automatic_eda_manifest.json``) is written next to the
output for version tracking.
dict-no-throw: never raises. Returns ``{path, n_pages, chapters, manifest_path,
note}``; on a fatal write error ``path`` is None and ``note`` explains why.
Additive: this does NOT replace ``render_eda_pdf`` (still used by
``profile_table(emit_pdf=True)``). It is the new engine that will, in the next
phase, let every EDA emit both a PDF and a PPTX from the same chapter model.
"""
from __future__ import annotations
import os
from datascience.automatic_eda import build_document, merge_manifest, render_pdf
from datascience.automatic_eda.model import as_chapter, as_chapters
def _coerce_chapters(chapters_or_profile, meta: dict) -> list:
"""Accept chapters OR an eda profile and return a list of Chapter."""
arg = chapters_or_profile
if isinstance(arg, (list, tuple)):
return as_chapters(list(arg))
if isinstance(arg, dict):
# A single chapter dict has 'blocks'; a profile has columns/table/rows.
if "blocks" in arg and "columns" not in arg:
ch = as_chapter(arg)
return [ch] if ch is not None else []
# Treat as an eda TableProfile.
return build_document(arg, (meta or {}).get("ctx"))
return []
def render_automatic_eda_pdf(chapters_or_profile, out_path: str,
meta: dict = None) -> dict:
"""Render an AutomaticEDA document into a mobile-readable PDF.
Args:
chapters_or_profile: either a list of chapters (``Chapter`` dataclasses
or dicts following the document model) or an ``eda`` TableProfile
dict in the latter case the canonical chapters are built via
``build_document(profile, meta['ctx'])``.
out_path: filesystem path for the PDF (parent dirs are created).
meta: optional dict. Recognised keys: ``title`` (cover/footer title),
``ctx`` (presentation context passed to chapter builders when a
profile is given), ``manifest_path`` (override; defaults to
``automatic_eda_manifest.json`` beside ``out_path``),
``write_manifest`` (set False to skip), ``generated_at``.
Returns:
dict (never raises): ``{path, n_pages, chapters, manifest_path, note}``.
"""
meta = dict(meta or {})
chapters = _coerce_chapters(chapters_or_profile, meta)
result = render_pdf(chapters, out_path, meta)
manifest_path = None
if meta.get("write_manifest", True) and result.get("path"):
manifest_path = meta.get("manifest_path")
if not manifest_path:
manifest_path = os.path.join(
os.path.dirname(os.path.abspath(out_path)),
"automatic_eda_manifest.json")
generated_at = meta.get("generated_at") or _now_iso()
merge_manifest(manifest_path, "pdf", result.get("chapters") or [],
generated_at)
result["manifest_path"] = manifest_path
return result
def _now_iso() -> str:
from datetime import datetime, timezone
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
@@ -1,140 +0,0 @@
"""Tests for render_automatic_eda_pdf — DoD: golden + edges + error path.
Self-contained: builds a synthetic TableProfile (no DuckDB) so the suite is fast
and deterministic. Verifies the cover/overview reference chapters render, that
long tables split by rows repeating the header without losing any cell text,
that an empty/None profile still yields a valid 1-page PDF, and that an
unwritable destination returns ``{path: None}`` without raising.
"""
import os
import re
import tempfile
from pypdf import PdfReader
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.automatic_eda.model import Chapter, DataTable, Heading, Markdown
def _profile() -> dict:
return {
"table": "ventas",
"source": "/data/ventas.csv",
"profiled_at": "2026-06-30T10:00:00+00:00",
"n_rows": 1000,
"n_cols": 3,
"quality_score": 92.5,
"key_candidates": ["id"],
"type_breakdown": {"numeric": 2, "categorical": 1},
"columns": [
{"name": "id", "inferred_type": "numeric", "null_pct": 0.0,
"null_count": 0,
"numeric": {"mean": 500.0, "median": 500.0, "min": 1.0,
"max": 1000.0, "std": 288.7}},
{"name": "precio", "inferred_type": "numeric", "null_pct": 0.01,
"null_count": 10,
"numeric": {"mean": 42.5, "median": 40.0, "min": 1.0,
"max": 100.0, "std": 12.3}},
{"name": "categoria", "inferred_type": "categorical",
"null_pct": 0.0, "null_count": 0,
"categorical": {"top": [{"value": "neumaticos", "count": 500},
{"value": "aceite", "count": 300}]}},
],
}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
def test_golden_profile_genera_pdf_portada_y_overview():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "eda.pdf")
res = render_automatic_eda_pdf(_profile(), out, {"title": "EDA — ventas"})
assert res["path"] == out
assert os.path.exists(out)
assert res["n_pages"] >= 2 # portada + overview (1+ each).
ids = [c["id"] for c in res["chapters"]]
assert "portada" in ids and "overview" in ids
# Manifest written next to the output with both chapters versioned.
assert res["manifest_path"] and os.path.exists(res["manifest_path"])
txt = _pdf_text(out)
# Cover fields.
assert "Automatic-EDA" in txt
assert "CSV" in txt # storage inferred from .csv source.
assert "Calidad" in txt and "92.5" in txt
assert "Fuente" in txt
# Overview content: column dictionary + describe.
assert "precio" in txt and "categoria" in txt
assert "median" in txt
def test_edge_tabla_larga_parte_repitiendo_cabecera():
# 60 rows over 6 wide columns: the table must split across pages and repeat
# the header on every continuation page (headers wide enough not to wrap).
header = ["ALPHA", "BETA", "GAMMA", "DELTA", "EPSILON", "ZETA"]
rows = [[f"r{r}c{c}" for c in range(6)] for r in range(60)]
ch = Chapter(id="edge", title="Edge", version="1.0.0",
blocks=[Heading("Tabla", 1),
DataTable(header=header, rows=rows)])
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "edge.pdf")
res = render_automatic_eda_pdf([ch], out, {"write_manifest": False})
assert res["path"] == out
reader = PdfReader(out)
n_pages = len(reader.pages)
assert n_pages > 1 # table spilled to several pages.
pages_with_header = sum(
1 for pg in reader.pages if "ALPHA" in (pg.extract_text() or ""))
assert pages_with_header == n_pages # header repeated on every page.
def test_edge_celda_larga_no_se_corta():
# A single cell with ~150 chars must wrap inside its column (the row grows),
# never truncated: all of its words survive in the rendered PDF.
long_cell = ("Lorem ipsum dolor sit amet consectetur adipiscing elit sed do "
"eiusmod tempor incididunt ut labore et dolore magna aliqua "
"reprehenderit voluptate")
header = ["clave", "descripcion"]
rows = [["k1", long_cell], ["k2", "corto"]]
ch = Chapter(id="edge2", title="Edge2", version="1.0.0",
blocks=[DataTable(header=header, rows=rows)])
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "edge2.pdf")
render_automatic_eda_pdf([ch], out, {"write_manifest": False})
txt = _pdf_text(out)
# Every word of the long cell present (wrapped, not truncated).
for word in ("Lorem", "incididunt", "reprehenderit", "voluptate"):
assert word in txt
def test_no_corta_texto_markdown():
para = " ".join(f"palabra{i}" for i in range(120))
ch = Chapter(id="md", title="MD", version="1.0.0",
blocks=[Markdown(text=para)])
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "md.pdf")
render_automatic_eda_pdf([ch], out, {"write_manifest": False})
txt = _pdf_text(out)
for i in (0, 60, 119): # first, middle, last words all present.
assert f"palabra{i}" in txt
def test_edge_profile_none_y_vacio_un_pagina():
with tempfile.TemporaryDirectory() as d:
for arg, name in ((None, "none"), ({}, "empty")):
out = os.path.join(d, f"{name}.pdf")
res = render_automatic_eda_pdf(arg, out, {"write_manifest": False})
assert res["path"] == out
assert os.path.exists(out)
assert res["n_pages"] == 1
def test_error_path_directorio_no_escribible_no_revienta():
res = render_automatic_eda_pdf(_profile(), "/proc/nope/x.pdf",
{"write_manifest": False})
assert res["path"] is None
assert res["n_pages"] == 0
assert res["note"]
@@ -1,86 +0,0 @@
---
name: render_automatic_eda_pptx
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def render_automatic_eda_pptx(chapters_or_profile, out_path: str, meta: dict = None) -> dict"
description: "Renderiza un documento AutomaticEDA por CAPÍTULOS (modelo de bloques independiente del formato) en una presentación PPTX 16:9 pensada para COMPARTIR. Acepta una lista de capítulos del modelo o directamente un TableProfile del grupo eda (construye los capítulos canónicos con build_document). Mismo principio anti-corte que el renderer PDF: cada bloque se mide y, si no cabe en la slide, continúa en una slide '<Capítulo> (cont.)'; las tablas largas se parten por filas REPITIENDO la cabecera; las figuras matplotlib se exportan a PNG e insertan escaladas para caber enteras. Cada slide lleva pie 'Capítulo · vX.Y.Z' y se escribe automatic_eda_manifest.json junto a la salida. dict-no-throw: nunca lanza, devuelve {path, n_slides, chapters, manifest_path, note}. Motor python-pptx (dependencia declarada en python/pyproject.toml)."
tags: [eda, pptx, render, report, share, automatic-eda, chapters, versioned, no-cut, slides, python-pptx, datascience, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [os, "python-pptx", "datascience.automatic_eda"]
params:
- name: chapters_or_profile
desc: "una lista de capítulos del modelo AutomaticEDA (dataclasses Chapter o dicts {id,title,version,blocks}) O un TableProfile dict del grupo eda. Si es un TableProfile, los capítulos canónicos se construyen con build_document(profile, meta['ctx']). Bloques soportados: heading, markdown, kv_table, data_table, figure, image, caption, note. Lectura defensiva: lo no reconocido se degrada a Note, nunca lanza."
- name: out_path
desc: "ruta del archivo PPTX de salida. Los directorios padre se crean si faltan. Directorio no escribible → {path:None, note:<causa>} sin lanzar."
- name: meta
desc: "dict opcional. Claves: title (título), ctx (contexto de presentación para los builders de capítulo cuando se da un profile), manifest_path (override; por defecto automatic_eda_manifest.json junto a out_path), write_manifest (False para no escribirlo), generated_at."
output: "dict (nunca lanza): {path: str|None, n_slides: int, chapters: list[{id,version,n_slides}], manifest_path: str|None, note: str}. En error fatal (incluida python-pptx no instalada) path es None y note explica la causa."
tested: true
tests: ["test_golden_profile_genera_pptx_portada_y_overview", "test_edge_tabla_larga_parte_repitiendo_cabecera_sin_cortar", "test_edge_profile_none_y_vacio_un_slide", "test_error_path_directorio_no_escribible_no_revienta"]
test_file_path: "python/functions/datascience/render_automatic_eda_pptx_test.py"
file_path: "python/functions/datascience/render_automatic_eda_pptx.py"
---
## Ejemplo
```python
from datascience import render_automatic_eda_pptx
# Desde un TableProfile del grupo eda (mismo modelo que el renderer PDF).
profile = {
"table": "ventas", "source": "/data/ventas.csv",
"n_rows": 1000, "n_cols": 2, "quality_score": 92.5,
"columns": [
{"name": "precio", "inferred_type": "numeric", "null_pct": 0.01,
"numeric": {"mean": 42.5, "median": 40.0, "min": 1.0, "max": 100.0,
"std": 12.3}},
{"name": "categoria", "inferred_type": "categorical", "null_pct": 0.0,
"categorical": {"top": [{"value": "neumaticos", "count": 500}]}},
],
}
res = render_automatic_eda_pptx(
profile, "reports/ventas_aeda.pptx",
{"title": "EDA — ventas",
"ctx": {"dataset_name": "Ventas", "source_origin": "ERP export"}})
print(res["n_slides"], res["chapters"], res["manifest_path"])
# -> 3 [{'id':'portada','version':'1.0.0','n_slides':1},
# {'id':'overview','version':'1.0.0','n_slides':2}] reports/automatic_eda_manifest.json
```
## Cuando usarla
Cuando quieras **compartir el EDA como una presentación** (no para móvil sino para
enseñar a alguien): mismo documento por capítulos que el PDF, emitido como PPTX 16:9.
Úsala junto a `render_automatic_eda_pdf` para que cada EDA tenga sus dos salidas (PDF
móvil + PPTX para compartir) desde el mismo modelo de capítulos. Garantiza no-corte:
ningún texto, tabla ni imagen se recorta — lo que no cabe en una slide continúa en otra
`(cont.)` con la cabecera repetida en las tablas. Para añadir capítulos nuevos al
documento, ver `docs/capabilities/automatic_eda.md`.
## Gotchas
- **Impura**: escribe el PPTX en `out_path` y, salvo `meta['write_manifest']=False`, el
manifiesto `automatic_eda_manifest.json` junto a la salida.
- **Dependencia python-pptx**: declarada en `python/pyproject.toml`
(`python-pptx>=1.0.2`). Si no está instalada, devuelve `{path: None, note:
'python-pptx no disponible: ...'}` sin lanzar. Instalar:
`uv pip install --python python/.venv/bin/python3 python-pptx`.
- **Nunca lanza** (dict-no-throw): un bloque que falle se omite y se anota en `note`; el
deck se genera igual. Un profile `None`/`{}` produce un deck de 1 slide válido.
- **No corta nada**: cada bloque se mide; si no cabe en la slide actual, abre una slide
`(cont.)`. Las tablas largas se parten por filas **repitiendo la cabecera** (las filas
restantes pasan a la siguiente slide). Las figuras matplotlib se exportan a PNG en
memoria y se insertan escaladas para caber enteras (nunca recortadas).
- **Figuras**: un bloque `figure` puede traer una `matplotlib.figure.Figure` ya
construida o un callable `make` (se construye perezosamente). Se cierra tras
rasterizar. Las imágenes (`image`) por ruta se escalan manteniendo el aspecto.
- **Tablas anchas**: con muchas columnas el ancho por columna se reduce y el texto se
envuelve dentro de la celda (sigue sin perderse). El reparto por grupos de columnas
para tablas muy anchas es mejora pendiente.
@@ -1,76 +0,0 @@
"""render_automatic_eda_pptx — chapter-based EDA report as a 16:9 PPTX deck.
Public ``eda``-group entry point that renders an AutomaticEDA document (a list
of chapters, or an ``eda`` TableProfile from which the canonical chapters are
built) into a PowerPoint deck for sharing. Same anti-cut principle as the PDF
renderer: every block is measured and, when it does not fit, continues on a new
slide titled ``<Chapter> (cont.)``; data tables split by rows repeating the
header; matplotlib figures are exported to PNG and inserted scaled to fit
entirely. Each slide is stamped ``<Chapter> · v<version>`` and a per-chapter
manifest (``automatic_eda_manifest.json``) is written next to the output.
dict-no-throw: never raises. Returns ``{path, n_slides, chapters,
manifest_path, note}``; on a fatal error ``path`` is None and ``note`` explains
why (e.g. python-pptx not installed).
Engine: ``python-pptx`` (added dependency; declared in python/pyproject.toml).
"""
from __future__ import annotations
import os
from datascience.automatic_eda import build_document, merge_manifest, render_pptx
from datascience.automatic_eda.model import as_chapter, as_chapters
def _coerce_chapters(chapters_or_profile, meta: dict) -> list:
"""Accept chapters OR an eda profile and return a list of Chapter."""
arg = chapters_or_profile
if isinstance(arg, (list, tuple)):
return as_chapters(list(arg))
if isinstance(arg, dict):
if "blocks" in arg and "columns" not in arg:
ch = as_chapter(arg)
return [ch] if ch is not None else []
return build_document(arg, (meta or {}).get("ctx"))
return []
def render_automatic_eda_pptx(chapters_or_profile, out_path: str,
meta: dict = None) -> dict:
"""Render an AutomaticEDA document into a shareable PPTX deck.
Args:
chapters_or_profile: a list of chapters (``Chapter`` dataclasses or
dicts) or an ``eda`` TableProfile dict (chapters built via
``build_document(profile, meta['ctx'])``).
out_path: filesystem path for the PPTX (parent dirs are created).
meta: optional dict. Recognised keys: ``title``, ``ctx``,
``manifest_path`` (defaults to ``automatic_eda_manifest.json`` beside
``out_path``), ``write_manifest`` (False to skip), ``generated_at``.
Returns:
dict (never raises): ``{path, n_slides, chapters, manifest_path, note}``.
"""
meta = dict(meta or {})
chapters = _coerce_chapters(chapters_or_profile, meta)
result = render_pptx(chapters, out_path, meta)
manifest_path = None
if meta.get("write_manifest", True) and result.get("path"):
manifest_path = meta.get("manifest_path")
if not manifest_path:
manifest_path = os.path.join(
os.path.dirname(os.path.abspath(out_path)),
"automatic_eda_manifest.json")
generated_at = meta.get("generated_at") or _now_iso()
merge_manifest(manifest_path, "pptx", result.get("chapters") or [],
generated_at)
result["manifest_path"] = manifest_path
return result
def _now_iso() -> str:
from datetime import datetime, timezone
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
@@ -1,114 +0,0 @@
"""Tests for render_automatic_eda_pptx — DoD: golden + edges + error path.
Self-contained synthetic TableProfile (no DuckDB). Verifies the cover/overview
chapters render to slides, that long tables split across slides repeating the
header without losing cell text, that an empty/None profile yields a valid
1-slide deck, and that an unwritable destination returns ``{path: None}``.
"""
import os
import tempfile
from pptx import Presentation
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
from datascience.automatic_eda.model import Chapter, DataTable, Heading
def _profile() -> dict:
return {
"table": "ventas",
"source": "/data/ventas.csv",
"profiled_at": "2026-06-30T10:00:00+00:00",
"n_rows": 1000,
"n_cols": 2,
"quality_score": 92.5,
"columns": [
{"name": "precio", "inferred_type": "numeric", "null_pct": 0.01,
"null_count": 10,
"numeric": {"mean": 42.5, "median": 40.0, "min": 1.0,
"max": 100.0, "std": 12.3}},
{"name": "categoria", "inferred_type": "categorical",
"null_pct": 0.0, "null_count": 0,
"categorical": {"top": [{"value": "neumaticos", "count": 500},
{"value": "aceite", "count": 300}]}},
],
}
def _slide_texts(path: str) -> list:
prs = Presentation(path)
out = []
for sl in prs.slides:
parts = []
for sh in sl.shapes:
if sh.has_text_frame:
parts.append(sh.text_frame.text)
if sh.has_table:
tb = sh.table
for r in range(len(tb.rows)):
for c in range(len(tb.columns)):
parts.append(tb.cell(r, c).text)
out.append(" ".join(parts))
return out
def test_golden_profile_genera_pptx_portada_y_overview():
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "eda.pptx")
res = render_automatic_eda_pptx(_profile(), out, {"title": "EDA — ventas"})
assert res["path"] == out
assert os.path.exists(out)
assert res["n_slides"] >= 2
ids = [c["id"] for c in res["chapters"]]
assert "portada" in ids and "overview" in ids
assert res["manifest_path"] and os.path.exists(res["manifest_path"])
joined = " ".join(_slide_texts(out))
assert "Automatic-EDA" in joined
assert "CSV" in joined
assert "92.5" in joined
assert "precio" in joined and "categoria" in joined
assert "median" in joined
def test_edge_tabla_larga_parte_repitiendo_cabecera_sin_cortar():
long_cell = ("Lorem ipsum dolor sit amet consectetur adipiscing elit sed do "
"eiusmod tempor incididunt reprehenderit voluptate")
header = ["ALPHA", "BETA", "GAMMA", "DELTA"]
rows = [[f"r{r}c{c}" for c in range(4)] for r in range(50)]
rows[0][1] = long_cell
ch = Chapter(id="edge", title="Edge", version="1.0.0",
blocks=[Heading("Tabla", 1),
DataTable(header=header, rows=rows)])
with tempfile.TemporaryDirectory() as d:
out = os.path.join(d, "edge.pptx")
res = render_automatic_eda_pptx([ch], out, {"write_manifest": False})
assert res["path"] == out
texts = _slide_texts(out)
assert res["n_slides"] > 1 # table spilled to several slides.
# Header repeated: every slide that carries table rows shows "ALPHA".
slides_with_header = sum(1 for t in texts if "ALPHA" in t)
assert slides_with_header >= 2
joined = " ".join(texts)
assert "Lorem ipsum dolor" in joined and "reprehenderit voluptate" in joined
# No row lost: every data cell r0..r49 col0 present.
for r in (0, 25, 49):
assert f"r{r}c0" in joined
def test_edge_profile_none_y_vacio_un_slide():
with tempfile.TemporaryDirectory() as d:
for arg, name in ((None, "none"), ({}, "empty")):
out = os.path.join(d, f"{name}.pptx")
res = render_automatic_eda_pptx(arg, out, {"write_manifest": False})
assert res["path"] == out
assert os.path.exists(out)
assert res["n_slides"] == 1
def test_error_path_directorio_no_escribible_no_revienta():
res = render_automatic_eda_pptx(_profile(), "/proc/nope/x.pptx",
{"write_manifest": False})
assert res["path"] is None
assert res["n_slides"] == 0
assert res["note"]
-1
View File
@@ -28,7 +28,6 @@ dependencies = [
"pypdf>=6.10.0",
"pyproj>=3.7.2",
"python-docx>=1.2.0",
"python-pptx>=1.0.2",
"pyyaml>=6.0.3",
"qrcode[pil]>=8.2",
"rapidfuzz>=3.14.5",