feat(eda): EDA de carpeta/base multi-tabla -> AutomaticEDA por capitulos (PDF+PPTX+MD)

Pipeline render_automatic_eda_folder: apunta el AutomaticEDA a una CARPETA de
archivos tabulares (CSV/Parquet/JSON) o a una DuckDB existente y emite el informe
de la BASE por capitulos en PDF (A5 movil) + PPTX (16:9) + Markdown. Documento-base
con portada-base, resumen de todas las tablas y relaciones inter-tabla (FK
candidatas por containment + diagrama Mermaid del join graph). Flag per_table_eda
anexa el mini-EDA de cada tabla. Aditivo: render_automatic_eda (tabla unica) intacto.

Funcion nueva load_folder_to_duckdb (infra, grupo eda+duckdb): carga una carpeta a
una DuckDB (temp si no se da path), CREATE TABLE por archivo con read_csv_auto/
read_parquet/read_json_auto. dict-no-throw.

Compone profile_database + los 3 renderers del motor AutomaticEDA + build_document
(per-tabla), sin reimplementar su logica. Tests: golden 3 CSV relacionados (FK
orders.customer_id->customers.id detectada) + edges (carpeta vacia, 1 tabla,
DuckDB existente, path inexistente). fn index sin error.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-30 20:34:10 +02:00
parent a1e2e3567c
commit 6a1520f458
7 changed files with 958 additions and 0 deletions
+2
View File
@@ -34,6 +34,7 @@ from .upsert_xlsx_sheet import upsert_xlsx_sheet
from .duckdb_query_readonly import duckdb_query_readonly
from .duckdb_execute import duckdb_execute
from .duckdb_upsert import duckdb_upsert
from .load_folder_to_duckdb import load_folder_to_duckdb
from .imap_connect import imap_connect
from .imap_list_mailboxes import imap_list_mailboxes
from .imap_search import imap_search
@@ -50,6 +51,7 @@ __all__ = [
"upsert_xlsx_sheet",
"duckdb_query_readonly",
"duckdb_execute",
"load_folder_to_duckdb",
"duckdb_upsert",
"pg_insert_rows",
"pg_apply_sql",
@@ -0,0 +1,100 @@
---
name: load_folder_to_duckdb
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def load_folder_to_duckdb(folder: str, db_path: str = None, pattern: str = '*.csv,*.parquet,*.json') -> dict"
description: "Escanea el primer nivel de una CARPETA buscando archivos tabulares (CSV/TSV/TXT, Parquet, JSON/NDJSON) y los carga como tablas en una base DuckDB usando los lectores nativos read_csv_auto/read_parquet/read_json_auto. Es la pieza de entrada del EDA a nivel de carpeta (grupo eda). Por cada archivo crea una tabla cuyo nombre se deriva del basename saneado a [0-9a-zA-Z_] en minusculas (prefijo t_ si empieza por digito, sufijos _2/_3 ante colisiones, tabla_<i> si queda vacio). El path se escapa (comilla simple '->'') antes de interpolarlo porque los lectores DuckDB no aceptan el path como parametro posicional. Glob NO recursivo: un glob.glob(os.path.join(folder, g)) por cada patron del CSV, dedup y ordenado. db_path=None genera una DuckDB temporal (mkstemp, se borra el placeholder vacio porque DuckDB rechaza un archivo de 0 bytes) y devuelve su ruta. Un fallo al cargar un archivo concreto no aborta el resto: se registra en errors y se continua. Devuelve siempre un dict sin lanzar (estilo del grupo duckdb): {status:'ok', db_path, tables, errors} en exito (carpeta sin archivos tabulares incluida, tables=[]) y {status:'error', error} cuando la carpeta no existe o falla algo global. Depende del paquete duckdb (1.5.2)."
tags: [eda, duckdb, ingest, etl, folder]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_py_core"
imports: [glob, os, re, tempfile, duckdb]
params:
- name: folder
desc: "ruta a un directorio. Se escanea solo su primer nivel (NO recursivo). Si no existe o no es un directorio devuelve {status:'error'} sin lanzar."
- name: db_path
desc: "ruta del archivo DuckDB destino, abierto en modo read-write (lo crea si no existe). None (default) genera una DuckDB temporal unica con tempfile.mkstemp y devuelve su ruta en el campo db_path del retorno. DuckDB es single-writer: si otro proceso lo tiene abierto en escritura, connect falla con error de lock devuelto en el dict."
- name: pattern
desc: "CSV de globs separados por coma (default '*.csv,*.parquet,*.json'). Cada glob se aplica con glob.glob(os.path.join(folder, g)) sobre el primer nivel de folder; los resultados de todos los globs se deduplican y ordenan. Los globs con ** NO descienden recursivamente (glob.glob sin recursive=True)."
output: "dict. En exito: {status:'ok', db_path:str (ruta DuckDB usada), tables:[{name:str, source_file:str, n_rows:int}], errors:[{name?:str, source_file:str, error:str}]}. La carpeta sin archivos tabulares es un exito con tables=[] y errors=[]. En error (sin lanzar): {status:'error', error:str}."
tested: true
tests:
- "test_carga_dos_csv_como_tablas"
- "test_db_path_none_crea_temporal"
- "test_carpeta_vacia_es_ok_sin_tablas"
- "test_carpeta_inexistente_devuelve_status_error"
test_file_path: "python/functions/infra/load_folder_to_duckdb_test.py"
file_path: "python/functions/infra/load_folder_to_duckdb.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
from infra.load_folder_to_duckdb import load_folder_to_duckdb
# Preparar una carpeta de demo con dos CSV.
import os
os.makedirs("/tmp/eda_folder_demo", exist_ok=True)
with open("/tmp/eda_folder_demo/ventas.csv", "w") as f:
f.write("id,total\n1,10.5\n2,20.0\n3,5.25\n")
with open("/tmp/eda_folder_demo/clientes.csv", "w") as f:
f.write("id,nombre\n1,ana\n2,luis\n")
# Cargar todos los tabulares de la carpeta a una DuckDB temporal.
res = load_folder_to_duckdb("/tmp/eda_folder_demo")
print(res["status"]) # ok
print(res["db_path"]) # /tmp/tmpXXXXXXXX.duckdb (temporal)
for t in res["tables"]:
print(t["name"], t["n_rows"]) # ventas 3 / clientes 2
# Persistir en una DuckDB concreta y limitar a CSV.
res2 = load_folder_to_duckdb(
"/tmp/eda_folder_demo",
db_path="/tmp/eda_folder_demo/folder.duckdb",
pattern="*.csv",
)
print(res2["tables"]) # [{'name': 'clientes', ...}, {'name': 'ventas', ...}]
```
## Cuando usarla
Cuando tienes una carpeta de datos sueltos (un dump, un export, varios CSV/Parquet
descargados) y quieres analizarlos juntos con SQL sin montar la ingesta a mano,
archivo por archivo. Es el primer eslabon del EDA a nivel de carpeta (grupo `eda`):
deja una DuckDB con una tabla por archivo, lista para perfilar con
`duckdb_table_schema_py_infra`, consultar con `duckdb_query_readonly_py_infra`, o
correlacionar aguas abajo. Usala antes de cualquier paso de perfilado cuando la
unidad de trabajo es "todos los archivos de este directorio".
## Gotchas
- **Glob NO recursivo**: solo se escanea el primer nivel de `folder`. Archivos en
subdirectorios se ignoran (ni siquiera con `**` en el patron, porque
`glob.glob` se llama sin `recursive=True`). Si necesitas recursion, aplana la
carpeta antes o amplia la funcion.
- **Saneo de nombres de tabla**: el basename se reduce a `[0-9a-zA-Z_]` en
minusculas. `Ventas 2024.csv` -> tabla `ventas_2024`. Dos archivos distintos
pueden sanear al mismo nombre (`a-b.csv` y `a_b.csv`); el segundo se desambigua
con sufijo `_2`, `_3`, ... El mapeo real archivo->tabla esta en `tables[].name`
/ `tables[].source_file`, no lo asumas.
- **`read_json_auto` requiere JSON tabular** (array de objetos u objetos NDJSON
homogeneos). Un JSON anidado o irregular puede fallar la carga de ESA tabla; el
error se registra en `errors` y el resto de archivos siguen cargandose.
- **Extension desconocida = se salta**, no falla: queda anotada en `errors` con
`unsupported extension`. Mapeo de lectores: `.csv/.tsv/.txt`->`read_csv_auto`,
`.parquet/.pq`->`read_parquet`, `.json/.ndjson`->`read_json_auto`.
- **Escritura real en disco (impura)**. DuckDB es single-writer: si otro proceso
tiene `db_path` abierto en escritura, `connect` falla con error de lock devuelto
en el dict. Un `db_path` con un directorio padre inexistente tambien falla.
- **`db_path=None` crea un archivo temporal que NO se borra solo**: la ruta se
devuelve en `db_path` para que el llamador la consuma y la limpie cuando termine.
- **Tipos inferidos por los lectores `_auto`**: los tipos de columna los infiere
DuckDB. Revisa el schema con `duckdb_table_schema_py_infra` si el tipado importa
aguas abajo.
@@ -0,0 +1,175 @@
"""Carga una carpeta de archivos tabulares (CSV/Parquet/JSON) como tablas DuckDB.
Funcion impura: escanea el primer nivel de un directorio buscando archivos que
casen con uno o varios globs, y por cada archivo crea una tabla en una base
DuckDB usando los lectores nativos (`read_csv_auto`, `read_parquet`,
`read_json_auto`). Es la pieza de entrada del EDA a nivel de carpeta (grupo
`eda`): deja una DuckDB con una tabla por archivo, lista para perfilar y
correlacionar aguas abajo.
Devuelve siempre un dict sin lanzar excepciones, siguiendo el estilo del grupo
duckdb del registry: {status:'ok', db_path, tables, errors} en exito (incluida
la carpeta sin archivos tabulares, que es un exito con tables=[]) y
{status:'error', error:str} cuando la carpeta no existe o falla algo global.
El nombre de cada tabla se deriva del basename del archivo, saneado a
`[0-9a-zA-Z_]` en minusculas, prefijado con `t_` si empieza por digito, y
desambiguado con sufijos `_2`, `_3`, ... ante colisiones. El path del archivo se
escapa (comilla simple, `'`->`''`) antes de interpolarlo en el SQL del lector,
ya que los lectores DuckDB no admiten el path como parametro posicional. Un fallo
al cargar un archivo concreto NO aborta el resto: se registra en `errors` y se
continua con los siguientes.
"""
import glob
import os
import re
import tempfile
def _sanitize_table_name(basename_no_ext: str, index: int) -> str:
"""Deriva un identificador de tabla valido desde el basename de un archivo.
Reemplaza todo lo que no sea ``[0-9a-zA-Z_]`` por ``_`` y baja a minusculas.
Si tras el saneo queda vacio, usa ``tabla_<index>``. Si empieza por digito,
prefija ``t_`` para que sea un identificador SQL valido.
"""
name = re.sub(r"[^0-9a-zA-Z_]", "_", basename_no_ext).lower()
if not name:
name = f"tabla_{index}"
if name[0].isdigit():
name = "t_" + name
return name
def _reader_for_extension(ext: str, quoted_path: str):
"""Devuelve la expresion de lector DuckDB para una extension, o None.
El ``quoted_path`` ya viene escapado y entre comillas simples. Extensiones
desconocidas devuelven None para que el llamador salte el archivo.
"""
ext = ext.lower()
if ext in (".csv", ".tsv", ".txt"):
return f"read_csv_auto('{quoted_path}')"
if ext in (".parquet", ".pq"):
return f"read_parquet('{quoted_path}')"
if ext in (".json", ".ndjson"):
return f"read_json_auto('{quoted_path}')"
return None
def load_folder_to_duckdb(
folder: str,
db_path: str = None,
pattern: str = "*.csv,*.parquet,*.json",
) -> dict:
"""Carga los archivos tabulares de una carpeta como tablas en una DuckDB.
Args:
folder: ruta a un directorio. Si no existe o no es un directorio,
devuelve {status:'error', ...} sin lanzar.
db_path: ruta de la DuckDB destino (read-write, se crea si no existe). Si
es None, se genera una base temporal con NamedTemporaryFile y su ruta
se devuelve en el retorno (`db_path`).
pattern: CSV de globs separados por coma (default
"*.csv,*.parquet,*.json"). Cada glob se aplica con
glob.glob(os.path.join(folder, g)) en el primer nivel (NO recursivo);
los resultados se deduplican y ordenan.
Returns:
dict. En exito: {status:'ok', db_path:str, tables:[{name, source_file,
n_rows}], errors:[{name?, source_file, error}]}. La carpeta sin archivos
tabulares es un exito con tables=[] y errors=[]. En error (sin lanzar):
{status:'error', error:str}.
"""
if not isinstance(folder, str) or not os.path.isdir(folder):
return {
"status": "error",
"error": f"folder does not exist or is not a directory: {folder!r}",
}
conn = None
try:
# Resolver la ruta de la DuckDB destino. Si no se da, reservar un nombre
# temporal unico y borrar el archivo vacio que crea mkstemp: DuckDB 1.5.2
# rechaza abrir un archivo de 0 bytes ("not a valid DuckDB database
# file"), por lo que debe crear el archivo el mismo desde cero.
if db_path is None:
fd, tmp_name = tempfile.mkstemp(suffix=".duckdb")
os.close(fd)
os.remove(tmp_name)
db_path = tmp_name
# Resolver los archivos: un glob por cada patron, dedup + orden estable.
globs = [g.strip() for g in pattern.split(",") if g.strip()]
found = set()
for g in globs:
for path in glob.glob(os.path.join(folder, g)):
if os.path.isfile(path):
found.add(path)
files = sorted(found)
conn = __import__("duckdb").connect(db_path)
tables = []
errors = []
used_names = set()
for i, path in enumerate(files):
base = os.path.basename(path)
stem, ext = os.path.splitext(base)
quoted_path = path.replace("'", "''")
reader = _reader_for_extension(ext, quoted_path)
if reader is None:
errors.append(
{
"source_file": path,
"error": f"unsupported extension: {ext!r}",
}
)
continue
name = _sanitize_table_name(stem, i)
# Desambiguar colisiones con sufijos _2, _3, ...
if name in used_names:
suffix = 2
while f"{name}_{suffix}" in used_names:
suffix += 1
name = f"{name}_{suffix}"
quoted_ident = '"' + name.replace('"', '""') + '"'
try:
conn.execute(
f"CREATE TABLE {quoted_ident} AS SELECT * FROM {reader}"
)
n_rows = conn.execute(
f"SELECT count(*) FROM {quoted_ident}"
).fetchone()[0]
used_names.add(name)
tables.append(
{
"name": name,
"source_file": path,
"n_rows": int(n_rows),
}
)
except Exception as e: # noqa: BLE001
errors.append(
{
"name": name,
"source_file": path,
"error": str(e),
}
)
return {
"status": "ok",
"db_path": db_path,
"tables": tables,
"errors": errors,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
finally:
if conn is not None:
conn.close()
@@ -0,0 +1,73 @@
"""Tests para load_folder_to_duckdb."""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
import duckdb # noqa: E402
from load_folder_to_duckdb import load_folder_to_duckdb # noqa: E402
def _write_csv(path: str, header: str, rows: list[str]) -> None:
with open(path, "w", encoding="utf-8") as f:
f.write(header + "\n")
for r in rows:
f.write(r + "\n")
def test_carga_dos_csv_como_tablas(tmp_path):
_write_csv(
str(tmp_path / "ventas.csv"),
"id,total",
["1,10.5", "2,20.0", "3,5.25"],
)
_write_csv(
str(tmp_path / "clientes.csv"),
"id,nombre",
["1,ana", "2,luis"],
)
db = tmp_path / "out.duckdb"
res = load_folder_to_duckdb(str(tmp_path), str(db))
assert res["status"] == "ok", res
assert res["errors"] == []
assert len(res["tables"]) == 2
assert res["db_path"] == str(db)
assert os.path.exists(str(db))
by_name = {t["name"]: t for t in res["tables"]}
assert by_name["ventas"]["n_rows"] == 3
assert by_name["clientes"]["n_rows"] == 2
# Verificar que las tablas existen realmente en la base.
con = duckdb.connect(str(db), read_only=True)
assert con.execute("SELECT count(*) FROM ventas").fetchone()[0] == 3
assert con.execute("SELECT count(*) FROM clientes").fetchone()[0] == 2
con.close()
def test_db_path_none_crea_temporal(tmp_path):
_write_csv(str(tmp_path / "datos.csv"), "x", ["1", "2"])
res = load_folder_to_duckdb(str(tmp_path))
assert res["status"] == "ok", res
assert res["db_path"]
assert os.path.exists(res["db_path"])
assert len(res["tables"]) == 1
assert res["tables"][0]["n_rows"] == 2
os.remove(res["db_path"])
def test_carpeta_vacia_es_ok_sin_tablas(tmp_path):
db = tmp_path / "out.duckdb"
res = load_folder_to_duckdb(str(tmp_path), str(db))
assert res["status"] == "ok", res
assert res["tables"] == []
assert res["errors"] == []
def test_carpeta_inexistente_devuelve_status_error(tmp_path):
res = load_folder_to_duckdb(str(tmp_path / "no_existe"))
assert res["status"] == "error"
assert "folder" in res["error"]