Files
fn_registry/python/functions/infra/load_folder_to_duckdb.py
T
egutierrez 6a1520f458 feat(eda): EDA de carpeta/base multi-tabla -> AutomaticEDA por capitulos (PDF+PPTX+MD)
Pipeline render_automatic_eda_folder: apunta el AutomaticEDA a una CARPETA de
archivos tabulares (CSV/Parquet/JSON) o a una DuckDB existente y emite el informe
de la BASE por capitulos en PDF (A5 movil) + PPTX (16:9) + Markdown. Documento-base
con portada-base, resumen de todas las tablas y relaciones inter-tabla (FK
candidatas por containment + diagrama Mermaid del join graph). Flag per_table_eda
anexa el mini-EDA de cada tabla. Aditivo: render_automatic_eda (tabla unica) intacto.

Funcion nueva load_folder_to_duckdb (infra, grupo eda+duckdb): carga una carpeta a
una DuckDB (temp si no se da path), CREATE TABLE por archivo con read_csv_auto/
read_parquet/read_json_auto. dict-no-throw.

Compone profile_database + los 3 renderers del motor AutomaticEDA + build_document
(per-tabla), sin reimplementar su logica. Tests: golden 3 CSV relacionados (FK
orders.customer_id->customers.id detectada) + edges (carpeta vacia, 1 tabla,
DuckDB existente, path inexistente). fn index sin error.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 20:34:10 +02:00

176 lines
6.6 KiB
Python

"""Carga una carpeta de archivos tabulares (CSV/Parquet/JSON) como tablas DuckDB.
Funcion impura: escanea el primer nivel de un directorio buscando archivos que
casen con uno o varios globs, y por cada archivo crea una tabla en una base
DuckDB usando los lectores nativos (`read_csv_auto`, `read_parquet`,
`read_json_auto`). Es la pieza de entrada del EDA a nivel de carpeta (grupo
`eda`): deja una DuckDB con una tabla por archivo, lista para perfilar y
correlacionar aguas abajo.
Devuelve siempre un dict sin lanzar excepciones, siguiendo el estilo del grupo
duckdb del registry: {status:'ok', db_path, tables, errors} en exito (incluida
la carpeta sin archivos tabulares, que es un exito con tables=[]) y
{status:'error', error:str} cuando la carpeta no existe o falla algo global.
El nombre de cada tabla se deriva del basename del archivo, saneado a
`[0-9a-zA-Z_]` en minusculas, prefijado con `t_` si empieza por digito, y
desambiguado con sufijos `_2`, `_3`, ... ante colisiones. El path del archivo se
escapa (comilla simple, `'`->`''`) antes de interpolarlo en el SQL del lector,
ya que los lectores DuckDB no admiten el path como parametro posicional. Un fallo
al cargar un archivo concreto NO aborta el resto: se registra en `errors` y se
continua con los siguientes.
"""
import glob
import os
import re
import tempfile
def _sanitize_table_name(basename_no_ext: str, index: int) -> str:
"""Deriva un identificador de tabla valido desde el basename de un archivo.
Reemplaza todo lo que no sea ``[0-9a-zA-Z_]`` por ``_`` y baja a minusculas.
Si tras el saneo queda vacio, usa ``tabla_<index>``. Si empieza por digito,
prefija ``t_`` para que sea un identificador SQL valido.
"""
name = re.sub(r"[^0-9a-zA-Z_]", "_", basename_no_ext).lower()
if not name:
name = f"tabla_{index}"
if name[0].isdigit():
name = "t_" + name
return name
def _reader_for_extension(ext: str, quoted_path: str):
"""Devuelve la expresion de lector DuckDB para una extension, o None.
El ``quoted_path`` ya viene escapado y entre comillas simples. Extensiones
desconocidas devuelven None para que el llamador salte el archivo.
"""
ext = ext.lower()
if ext in (".csv", ".tsv", ".txt"):
return f"read_csv_auto('{quoted_path}')"
if ext in (".parquet", ".pq"):
return f"read_parquet('{quoted_path}')"
if ext in (".json", ".ndjson"):
return f"read_json_auto('{quoted_path}')"
return None
def load_folder_to_duckdb(
folder: str,
db_path: str = None,
pattern: str = "*.csv,*.parquet,*.json",
) -> dict:
"""Carga los archivos tabulares de una carpeta como tablas en una DuckDB.
Args:
folder: ruta a un directorio. Si no existe o no es un directorio,
devuelve {status:'error', ...} sin lanzar.
db_path: ruta de la DuckDB destino (read-write, se crea si no existe). Si
es None, se genera una base temporal con NamedTemporaryFile y su ruta
se devuelve en el retorno (`db_path`).
pattern: CSV de globs separados por coma (default
"*.csv,*.parquet,*.json"). Cada glob se aplica con
glob.glob(os.path.join(folder, g)) en el primer nivel (NO recursivo);
los resultados se deduplican y ordenan.
Returns:
dict. En exito: {status:'ok', db_path:str, tables:[{name, source_file,
n_rows}], errors:[{name?, source_file, error}]}. La carpeta sin archivos
tabulares es un exito con tables=[] y errors=[]. En error (sin lanzar):
{status:'error', error:str}.
"""
if not isinstance(folder, str) or not os.path.isdir(folder):
return {
"status": "error",
"error": f"folder does not exist or is not a directory: {folder!r}",
}
conn = None
try:
# Resolver la ruta de la DuckDB destino. Si no se da, reservar un nombre
# temporal unico y borrar el archivo vacio que crea mkstemp: DuckDB 1.5.2
# rechaza abrir un archivo de 0 bytes ("not a valid DuckDB database
# file"), por lo que debe crear el archivo el mismo desde cero.
if db_path is None:
fd, tmp_name = tempfile.mkstemp(suffix=".duckdb")
os.close(fd)
os.remove(tmp_name)
db_path = tmp_name
# Resolver los archivos: un glob por cada patron, dedup + orden estable.
globs = [g.strip() for g in pattern.split(",") if g.strip()]
found = set()
for g in globs:
for path in glob.glob(os.path.join(folder, g)):
if os.path.isfile(path):
found.add(path)
files = sorted(found)
conn = __import__("duckdb").connect(db_path)
tables = []
errors = []
used_names = set()
for i, path in enumerate(files):
base = os.path.basename(path)
stem, ext = os.path.splitext(base)
quoted_path = path.replace("'", "''")
reader = _reader_for_extension(ext, quoted_path)
if reader is None:
errors.append(
{
"source_file": path,
"error": f"unsupported extension: {ext!r}",
}
)
continue
name = _sanitize_table_name(stem, i)
# Desambiguar colisiones con sufijos _2, _3, ...
if name in used_names:
suffix = 2
while f"{name}_{suffix}" in used_names:
suffix += 1
name = f"{name}_{suffix}"
quoted_ident = '"' + name.replace('"', '""') + '"'
try:
conn.execute(
f"CREATE TABLE {quoted_ident} AS SELECT * FROM {reader}"
)
n_rows = conn.execute(
f"SELECT count(*) FROM {quoted_ident}"
).fetchone()[0]
used_names.add(name)
tables.append(
{
"name": name,
"source_file": path,
"n_rows": int(n_rows),
}
)
except Exception as e: # noqa: BLE001
errors.append(
{
"name": name,
"source_file": path,
"error": str(e),
}
)
return {
"status": "ok",
"db_path": db_path,
"tables": tables,
"errors": errors,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
finally:
if conn is not None:
conn.close()