diff --git a/python/functions/datascience/__init__.py b/python/functions/datascience/__init__.py index 5a47aaf4..fbd5ffc2 100644 --- a/python/functions/datascience/__init__.py +++ b/python/functions/datascience/__init__.py @@ -79,8 +79,10 @@ from .render_paper_pdf import render_paper_pdf from .draw_join_graph_figure import draw_join_graph_figure from .generate_synthetic_eda_table import generate_synthetic_eda_table from .generate_synthetic_eda_folder import generate_synthetic_eda_folder +from .load_bq_table_to_duckdb import load_bq_table_to_duckdb __all__ = [ + "load_bq_table_to_duckdb", "generate_synthetic_eda_table", "generate_synthetic_eda_folder", "render_paper_pdf", diff --git a/python/functions/datascience/load_bq_table_to_duckdb.md b/python/functions/datascience/load_bq_table_to_duckdb.md new file mode 100644 index 00000000..7bfcb19e --- /dev/null +++ b/python/functions/datascience/load_bq_table_to_duckdb.md @@ -0,0 +1,86 @@ +--- +name: load_bq_table_to_duckdb +kind: function +lang: py +domain: datascience +version: "1.1.0" +purity: impure +signature: "def load_bq_table_to_duckdb(table_fqn: str, duckdb_path: str, dest_table: str = '', sample_frac: float = None, max_rows: int = 0, project_id: str = '', pseudonymize_cols: list = None) -> dict" +description: "Adaptador BigQuery -> DuckDB local para el grupo eda. Trae una tabla o vista de Google BigQuery a un archivo DuckDB local (por defecto COMPLETA, todas las filas; muestreo opt-in con sample_frac), de modo que las funciones del grupo de capacidad eda (que solo hablan DuckDB/PostgreSQL) puedan perfilarla. Fetch via BigQuery Storage Read API (Arrow) con fallback REST. Seudonimiza columnas PII con hash SHA-1 truncado antes de materializar (LOPDGDD/RGPD)." +tags: [eda, bigquery, duckdb, datascience] +params: + - name: table_fqn + desc: "FQN completo de la tabla/vista BigQuery: `project.dataset.table`." + - name: duckdb_path + desc: "Ruta del archivo DuckDB local donde materializar la tabla (se crea/sobrescribe la tabla dest)." + - name: dest_table + desc: "Nombre de la tabla DuckDB destino. Vacío = último segmento del FQN, saneado." + - name: sample_frac + desc: "None (DEFAULT) = FULL, trae todas las filas. Un float en (0,1) activa el muestreo opt-in con `WHERE rand() < frac` (~frac del total). Vistas no admiten TABLESAMPLE, por eso rand()." + - name: max_rows + desc: "Tope duro opcional de filas (LIMIT). 0 (DEFAULT) = sin tope. Se combina con sample_frac si ambos se pasan." + - name: project_id + desc: "Proyecto GCP de facturación. Vacío = primer segmento del FQN o el del ADC." + - name: pseudonymize_cols + desc: "Lista de columnas PII a seudonimizar con hash SHA-1 truncado antes de materializar (LOPDGDD/RGPD). Preserva nulos y cardinalidad." +output: "dict dict-no-throw. En éxito {status:'ok', duckdb_path, table, n_rows_source, n_rows_fetched, sampled, sample_frac, columns, pseudonymized}. En error {status:'error', error}." +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [] +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/datascience/load_bq_table_to_duckdb.py" +--- + +## Ejemplo + +```python +from datascience import load_bq_table_to_duckdb + +# FULL por defecto: trae TODAS las filas de la vista (3,8M) a DuckDB. +r = load_bq_table_to_duckdb( + "autingo-159109.customer_marts.customer_profile", + "/tmp/eda_bq.duckdb", + pseudonymize_cols=["document_number", "full_name", "email", "phone"], +) +print(r["table"], r["n_rows_fetched"], "de", r["n_rows_source"], "sampled=", r["sampled"]) + +# Muestreo opt-in: ~5 % de las filas. +r = load_bq_table_to_duckdb( + "autingo-159109.customer_marts.customer_profile", + "/tmp/eda_bq_sample.duckdb", + sample_frac=0.05, + pseudonymize_cols=["document_number", "full_name", "email", "phone"], +) +``` + +## Cuando usarla + +- Antes de perfilar una tabla/vista de BigQuery con el grupo `eda` (que solo habla DuckDB/PostgreSQL): trae el origen COMPLETO a DuckDB local (o una muestra con `sample_frac`) con seudonimización PII. +- Cuando necesites un puente único BigQuery -> DuckDB local -> grupo `eda` sin escribir el bridge inline cada vez. +- Cuando quieras que un EDA sobre datos de negocio conserve valor analítico (cardinalidad, nulos, distribución) sin incrustar datos personales reales. + +## Gotchas + +- **Impura**: hace I/O de red (BigQuery) + escritura a disco (DuckDB). Requiere ADC configurado (`gcloud auth application-default login`). +- **403 USER_PROJECT_DENIED**: se evita aplicando `creds.with_quota_project(None)` cuando el ADC arrastra un quota project ajeno (memoria `bq_direct_quota_project`). +- **TABLESAMPLE no funciona en vistas**: el muestreo (opt-in, `sample_frac`) usa `WHERE rand() < frac` (aplicable a tablas y vistas). `max_rows` es un `LIMIT` como tope duro opcional. +- **FULL por defecto**: `sample_frac=None` trae TODAS las filas. Trae el resultado a RAM como DataFrame de pandas antes de materializar en DuckDB, así que una tabla de muchos millones × muchas columnas puede consumir varios GB. Para tablas enormes que no quepan, pasa `sample_frac` (muestra) o `max_rows` (tope). El fetch usa el BigQuery Storage Read API (Arrow) cuando `google-cloud-bigquery-storage` + `pyarrow` están disponibles — mucho más rápido que REST para millones de filas; si no, cae al conversor REST automáticamente. +- **La seudonimización es un hash unidireccional** (SHA-1 truncado a 12 hex): no es reversible, correcto para EDA. Preserva nulos, cardinalidad y patrón de faltantes, pero NO permite recuperar el valor original. +- **dict-no-throw**: nunca lanza excepción; ante cualquier fallo (FQN inválido, auth, query) devuelve `{status:'error', error:str}`. + +## Notas + +Adaptador del grupo de capacidad `eda`: el resto de funciones del grupo perfilan +DuckDB/PostgreSQL, pero no hablan BigQuery de forma nativa. Esta función cubre ese +hueco materializando una sola tabla DuckDB desde el DataFrame resultante de la +query BigQuery. El nombre de tabla destino se sanea (`^[A-Za-z_][A-Za-z0-9_]*$`) +antes de citarlo en el `CREATE OR REPLACE TABLE`. + +## Capability growth log + +- v1.1.0 (2026-07-01) — FULL pasa a ser el DEFAULT: se sustituye `max_rows=300000, sample=True` por `sample_frac=None` (None = todas las filas) + `max_rows=0` (tope duro opcional). El muestreo es opt-in explícito. Fetch acelerado via BigQuery Storage Read API (Arrow) con fallback REST. Preferencia estándar del usuario: los EDA se corren sobre el total salvo que se pida lo contrario. diff --git a/python/functions/datascience/load_bq_table_to_duckdb.py b/python/functions/datascience/load_bq_table_to_duckdb.py new file mode 100644 index 00000000..2a468cf1 --- /dev/null +++ b/python/functions/datascience/load_bq_table_to_duckdb.py @@ -0,0 +1,157 @@ +"""load_bq_table_to_duckdb — adaptador BigQuery -> DuckDB local para el grupo `eda`. + +Trae una tabla o vista de Google BigQuery a un archivo DuckDB local (por defecto +COMPLETA — todas las filas — o una muestra si se pasa `sample_frac`), de modo que +las funciones del grupo de capacidad `eda` (que perfilan DuckDB/PostgreSQL) +puedan analizarla sin un adaptador BigQuery nativo. Materializa una sola tabla +DuckDB desde un DataFrame de pandas. + +Modo por defecto = FULL: `sample_frac=None` trae la vista/tabla entera (preferencia +estándar del usuario: los EDA se corren sobre el total salvo que se pida lo +contrario). El muestreo es opt-in explícito: `sample_frac=0.05` trae ~5 %; `max_rows` +es un tope duro opcional (0 = sin tope). El fetch usa el BigQuery Storage Read API +(Arrow) cuando está disponible, con fallback al conversor REST. + +Seudonimización LOPDGDD/RGPD: las columnas listadas en `pseudonymize_cols` se +transforman con un hash SHA-1 truncado ANTES de escribir a disco, preservando +nulos, cardinalidad y patrón de faltantes pero sin volcar el valor real (DNI, +nombre, email, teléfono, etc.). El EDA conserva su valor analítico sin incrustar +datos personales reales. + +Autenticación: ADC (gcloud auth). Aplica creds.with_quota_project(None) para +evitar el 403 USER_PROJECT_DENIED cuando el ADC lleva quota project ajeno. + +Estilo dict-no-throw del grupo `eda`: nunca lanza; devuelve {status:'error', ...}. +""" + +import hashlib +import re + +_FQN_RE = re.compile(r"^[A-Za-z0-9_.\-]+$") + + +def _pseudonymize_series(values): + """Hash SHA-1 truncado (12 hex) de cada valor no nulo; conserva None/NaN.""" + import pandas as pd + out = [] + for v in values: + if v is None or (isinstance(v, float) and pd.isna(v)) or ( + not isinstance(v, (list, dict)) and pd.isna(v) if _safe_isna(v) else False + ): + out.append(None) + else: + h = hashlib.sha1(str(v).encode("utf-8")).hexdigest()[:12] + out.append(h) + return out + + +def _safe_isna(v): + import pandas as pd + try: + return bool(pd.isna(v)) + except (TypeError, ValueError): + return False + + +def load_bq_table_to_duckdb( + table_fqn: str, + duckdb_path: str, + dest_table: str = "", + sample_frac: float = None, + max_rows: int = 0, + project_id: str = "", + pseudonymize_cols: list = None, +) -> dict: + try: + import duckdb + import google.auth + from google.cloud import bigquery + + if not table_fqn or not _FQN_RE.match(table_fqn): + return {"status": "error", "error": f"table_fqn inválido: {table_fqn!r}"} + + # dest_table: derivar del último segmento del FQN si no se pasa. + dest = dest_table or table_fqn.split(".")[-1] + if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", dest): + dest = re.sub(r"[^A-Za-z0-9_]", "_", dest) or "bq_table" + + # Auth ADC con fix de quota project (403 USER_PROJECT_DENIED). + creds, adc_project = google.auth.default( + scopes=["https://www.googleapis.com/auth/bigquery"] + ) + if hasattr(creds, "with_quota_project"): + creds = creds.with_quota_project(None) + proj = project_id or table_fqn.split(".")[0] or adc_project + client = bigquery.Client(project=proj, credentials=creds) + + # Conteo de filas de origen. + cnt = client.query( + f"SELECT COUNT(*) AS n FROM `{table_fqn}`" + ).result() + n_source = 0 + for row in cnt: + n_source = int(row["n"]) + + # Modo por defecto = FULL (sample_frac=None -> todas las filas). El + # muestreo es opt-in: sample_frac in (0,1) muestrea esa fracción con + # `WHERE rand() < frac` (aplicable a tablas y vistas; TABLESAMPLE no va + # en vistas). max_rows>0 es un tope duro opcional (LIMIT); 0 = sin tope. + sampled = False + where = "" + if sample_frac is not None and 0 < float(sample_frac) < 1: + where = f" WHERE rand() < {float(sample_frac)}" + sampled = True + limit = f" LIMIT {int(max_rows)}" if max_rows and int(max_rows) > 0 else "" + sql = f"SELECT * FROM `{table_fqn}`{where}{limit}" + + # Fetch: BigQuery Storage Read API (Arrow, rápido para millones de filas) + # con fallback al conversor REST si la lib no está o falla. + try: + df = client.query(sql).result().to_dataframe(create_bqstorage_client=True) + except Exception: # noqa: BLE001 + df = client.query(sql).result().to_dataframe(create_bqstorage_client=False) + n_fetched = len(df) + + # Normalizar dtypes de db-dtypes: el conversor REST de BigQuery mapea las + # columnas DATE/TIME a las extension dtypes `dbdate`/`dbtime` de db-dtypes, + # que DuckDB NO reconoce al registrar el DataFrame ("Data type 'dbdate' not + # recognized"). Se convierten a tipos estándar que DuckDB sí ingiere: DATE + # -> datetime64[ns], TIME -> string. El resto de dtypes (datetime64 de + # TIMESTAMP, Int64/boolean nullable, object) los acepta DuckDB tal cual. + import pandas as pd + for col in df.columns: + dt = str(df[col].dtype) + if dt == "dbdate": + df[col] = pd.to_datetime(df[col], errors="coerce") + elif dt == "dbtime": + df[col] = df[col].astype("string").astype(object) + + # Seudonimización de columnas PII antes de escribir a disco. + pseudo_applied = [] + for col in (pseudonymize_cols or []): + if col in df.columns: + df[col] = _pseudonymize_series(df[col].tolist()) + pseudo_applied.append(col) + + # Materializar a DuckDB (una tabla desde el DataFrame). + con = duckdb.connect(duckdb_path) + try: + con.register("_src_df", df) + con.execute(f'CREATE OR REPLACE TABLE "{dest}" AS SELECT * FROM _src_df') + con.unregister("_src_df") + finally: + con.close() + + return { + "status": "ok", + "duckdb_path": duckdb_path, + "table": dest, + "n_rows_source": n_source, + "n_rows_fetched": n_fetched, + "sampled": sampled, + "sample_frac": float(sample_frac) if sampled else None, + "columns": list(df.columns), + "pseudonymized": pseudo_applied, + } + except Exception as e: # noqa: BLE001 + return {"status": "error", "error": str(e)} diff --git a/python/functions/pipelines/profile_bq_table.md b/python/functions/pipelines/profile_bq_table.md new file mode 100644 index 00000000..7621b07a --- /dev/null +++ b/python/functions/pipelines/profile_bq_table.md @@ -0,0 +1,106 @@ +--- +name: profile_bq_table +kind: pipeline +lang: py +domain: pipelines +purity: impure +version: "1.1.0" +signature: "def profile_bq_table(table_fqn: str, sample_frac: float = None, max_rows: int = 0, pseudonymize_cols: list = None, run_models: bool = True, run_series: bool = False, run_llm: bool = False, project_id: str = \"\", report_dir: str = \"reports\", duckdb_path: str = \"\", keep_duckdb: bool = False) -> dict" +description: "EDA one-shot de una tabla o vista de BigQuery: materializa el origen COMPLETO por defecto (todas las filas; muestreo opt-in con sample_frac; seudonimizacion PII opcional, LOPDGDD/RGPD) a un DuckDB local con load_bq_table_to_duckdb y lo perfila end-to-end con profile_table del grupo de capacidad eda, emitiendo el informe AutomaticEDA (PDF A5 movil + PPTX 16:9), Markdown y JSON sidecar. Es el adaptador BigQuery que faltaba en el grupo eda, resuelto por composicion (BigQuery -> DuckDB local -> profile_table) sin duplicar la logica de perfilado ni de render. Es el hazme un EDA de esta tabla BigQuery en una sola llamada, sobre el total de filas por defecto." +tags: [eda, bigquery, launcher] +uses_functions: + - load_bq_table_to_duckdb_py_datascience + - profile_table_py_pipelines +uses_types: [] +returns: [] +returns_optional: false +error_type: error_go_core +imports: [] +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/pipelines/profile_bq_table.py" +params: + - name: table_fqn + desc: "FQN de la tabla/vista BigQuery: `project.dataset.table`." + - name: sample_frac + desc: "None (DEFAULT) = FULL, perfila TODAS las filas del origen. Un float en (0,1) activa el muestreo opt-in (`WHERE rand() < frac`, ~frac del total)." + - name: max_rows + desc: "Tope duro opcional de filas (LIMIT). 0 (DEFAULT) = sin tope. Se combina con sample_frac si ambos se pasan." + - name: pseudonymize_cols + desc: "Columnas PII a seudonimizar (hash) antes de materializar (LOPDGDD/RGPD). Preserva nulos y cardinalidad." + - name: run_models + desc: "PCA/KMeans/IsolationForest/normalidad sobre numericas. Default True (informe AutomaticEDA completo)." + - name: run_series + desc: "Analisis de serie temporal por columna numerica. Default False." + - name: run_llm + desc: "1 llamada LLM sobre el perfil agregado (nunca filas crudas). Default False." + - name: project_id + desc: "Proyecto GCP de facturacion. Vacio = primer segmento del FQN." + - name: report_dir + desc: "Directorio de salida de los reports. Default 'reports' (artefacto local gitignored)." + - name: duckdb_path + desc: "Ruta DuckDB a usar. Vacio = temporal autogestionado." + - name: keep_duckdb + desc: "Si True conserva el DuckDB materializado (para el notebook Jupyter). Default False." +output: "dict dict-no-throw. En exito {status:'ok', table_fqn, load:{n_rows_source,n_rows_fetched,sampled,sample_frac,pseudonymized,table}, duckdb_path, report_md_path, report_json_path, aeda_pdf_path, aeda_pptx_path, aeda_manifest_path, profile}. En error {status:'error', error, stage}." +--- + +## Ejemplo + +```python +from pipelines.profile_bq_table import profile_bq_table + +# FULL por defecto: EDA sobre TODAS las filas de la vista (3,8M). +r = profile_bq_table( + "autingo-159109.customer_marts.customer_profile", + pseudonymize_cols=["document_number", "full_name", "email", "phone", "postal_code", "salesforce_customer_id"], + run_models=True, +) +print(r["load"]["n_rows_fetched"], "filas perfiladas, sampled=", r["load"]["sampled"]) +print(r["aeda_pdf_path"]); print(r["aeda_pptx_path"]); print(r["report_md_path"]) + +# Muestreo opt-in: EDA sobre ~5 % de las filas (tabla enorme / iteracion rapida). +r = profile_bq_table( + "autingo-159109.customer_marts.customer_profile", + sample_frac=0.05, + pseudonymize_cols=["document_number", "full_name", "email", "phone", "postal_code", "salesforce_customer_id"], +) +``` + +## Cuando usarla + +Cuando pidan un EDA de una tabla o vista de BigQuery ("hazme un EDA de esta +tabla BigQuery"). Es el adaptador BigQuery del grupo de capacidad `eda` por +composicion: trae el origen COMPLETO (todas las filas, por defecto) a un DuckDB +local y delega todo el perfilado y render en `profile_table`, sin adaptador +BigQuery nativo ni logica de EDA duplicada. Usala como primer paso al recibir un +dataset BigQuery desconocido, antes de modelar o limpiar, o para auditar la +calidad de una vista ya productiva. Para iteracion rapida o tablas que no quepan +en RAM, pasa `sample_frac` (muestreo opt-in). + +## Gotchas + +- Impura: requiere ADC de BigQuery configurado (Application Default Credentials) + para que `load_bq_table_to_duckdb` autentique contra el proyecto. +- FULL por defecto: `sample_frac=None` perfila TODAS las filas del origen. Una + vista de millones de filas se trae entera a RAM (varios GB posibles) antes de + materializar en DuckDB; el fetch usa el BigQuery Storage Read API (Arrow) cuando + esta disponible, mucho mas rapido que REST. Para acotar coste/memoria, pasa + `sample_frac` in (0,1) (muestreo opt-in) o `max_rows` (tope duro). Si por limite + de recursos no cabe el total, dilo explicito con el maximo que si se cargo. +- Seudonimiza PII con `pseudonymize_cols` para cumplir LOPDGDD/RGPD ANTES de + escribir a disco: nombres, DNI/NIE, email, telefono, direccion, IDs de cliente, + etc. Se hashean preservando nulos y cardinalidad. Sin seudonimizar, la muestra + materializada (DuckDB + reports) contiene datos personales reales [POL-MMNSEG-001-1.0]. +- El DuckDB temporal se borra al terminar salvo `keep_duckdb=True` (pasalo para + seguir explorando la muestra desde un notebook Jupyter). Si pasas `duckdb_path` + explicito, la ruta se respeta y solo se conserva con `keep_duckdb=True`. +- Escribe reports a `report_dir` (default 'reports', artefacto local gitignored): + Markdown + JSON sidecar + PDF A5 movil + PPTX 16:9 del informe AutomaticEDA. +- `run_llm=True` gasta tokens (haiku) pero solo envia el perfil agregado, nunca + filas crudas ni datos personales. + +## Capability growth log + +- v1.1.0 (2026-07-01) — FULL pasa a ser el DEFAULT del pipeline: se sustituye `max_rows=300000, sample=True` por `sample_frac=None` (None = perfila todas las filas) + `max_rows=0` (tope duro opcional). El muestreo es opt-in explicito (`sample_frac`). Alinea con la preferencia estandar del usuario: los EDA se corren sobre el total salvo que se pida lo contrario. Hereda el fetch acelerado (Arrow/bqstorage) de `load_bq_table_to_duckdb` v1.1.0. diff --git a/python/functions/pipelines/profile_bq_table.py b/python/functions/pipelines/profile_bq_table.py new file mode 100644 index 00000000..37ad52a1 --- /dev/null +++ b/python/functions/pipelines/profile_bq_table.py @@ -0,0 +1,138 @@ +"""profile_bq_table — EDA one-shot de una tabla/vista BigQuery con el grupo `eda`. + +Pipeline impuro: materializa una tabla o vista de BigQuery (por defecto COMPLETA — +todas las filas — o una muestra si se pasa `sample_frac`, con seudonimizacion PII +opcional, LOPDGDD/RGPD) a un DuckDB local con `load_bq_table_to_duckdb`, y la +perfila end-to-end con `profile_table` del grupo de capacidad `eda`, emitiendo el +informe AutomaticEDA (PDF A5 movil + PPTX 16:9), Markdown y JSON sidecar. Es el +adaptador BigQuery que faltaba en el grupo `eda`, resuelto por composicion +(BigQuery -> DuckDB local -> profile_table) sin duplicar la logica de perfilado ni +de render. + +Modo por defecto = FULL: `sample_frac=None` perfila TODAS las filas del origen +(preferencia estandar del usuario: los EDA se corren sobre el total salvo que se +pida lo contrario). El muestreo es opt-in explicito: `sample_frac=0.05` perfila +~5 % de las filas; `max_rows` es un tope duro opcional (0 = sin tope). + +Funciones del registry compuestas (NO se reimplementa su logica): + - load_bq_table_to_duckdb : trae la tabla/vista BigQuery a un DuckDB local + (completa por defecto, o muestra si sample_frac). + - profile_table : orquestador one-shot del grupo `eda` que perfila la + DuckDB materializada y emite el informe AutomaticEDA. + +Estilo dict-no-throw del grupo `eda`: nunca lanza; devuelve {status:'error', ...}. +""" + +import os +import tempfile + +from datascience import load_bq_table_to_duckdb +from pipelines.profile_table import profile_table + + +def profile_bq_table( + table_fqn: str, + sample_frac: float = None, + max_rows: int = 0, + pseudonymize_cols: list = None, + run_models: bool = True, + run_series: bool = False, + run_llm: bool = False, + project_id: str = "", + report_dir: str = "reports", + duckdb_path: str = "", + keep_duckdb: bool = False, +) -> dict: + """EDA one-shot de una tabla/vista BigQuery. + + Por defecto perfila TODAS las filas del origen (`sample_frac=None`, modo FULL). + Materializa el origen (con seudonimizacion PII opcional) a un DuckDB local y lo + perfila con `profile_table` del grupo `eda`, emitiendo el informe AutomaticEDA + (PDF A5 movil + PPTX 16:9) + Markdown + JSON sidecar. + + Args: + table_fqn: FQN de la tabla/vista BigQuery ("project.dataset.table"). + sample_frac: None (default) = FULL, perfila todas las filas. Un float en + (0,1) activa el muestreo opt-in (`WHERE rand() < frac`, ~frac del total). + max_rows: Tope duro opcional de filas (LIMIT). 0 (default) = sin tope. + pseudonymize_cols: Columnas PII a seudonimizar (hash) antes de materializar. + run_models: Modelos baratos (PCA/KMeans/IsolationForest/normalidad). + run_series: Analisis de serie temporal por columna numerica. + run_llm: 1 llamada LLM sobre el perfil agregado (nunca filas crudas). + project_id: Proyecto GCP de facturacion. Vacio = primer segmento del FQN. + report_dir: Directorio de salida de los reports. + duckdb_path: Ruta DuckDB a usar. Vacio = temporal autogestionado. + keep_duckdb: Si True conserva el DuckDB materializado. + + Returns: + dict dict-no-throw con el resultado del pipeline (ver output del .md). + """ + tmp_created = False + try: + # DuckDB temporal si no se pasa ruta. + if not duckdb_path: + fd, duckdb_path = tempfile.mkstemp(prefix="eda_bq_", suffix=".duckdb") + os.close(fd) + os.remove(duckdb_path) # que lo cree DuckDB limpio + tmp_created = True + + load = load_bq_table_to_duckdb( + table_fqn, + duckdb_path, + sample_frac=sample_frac, + max_rows=max_rows, + project_id=project_id, + pseudonymize_cols=pseudonymize_cols, + ) + if load.get("status") != "ok": + return { + "status": "error", + "error": load.get("error", "load fallo"), + "stage": "load", + } + + prof = profile_table( + duckdb_path, + load["table"], + backend="duckdb", + run_models=run_models, + run_series=run_series, + run_llm=run_llm, + emit_automatic=True, # PDF A5 movil + PPTX 16:9 + emit_pdf=False, + write_report=True, # Markdown + JSON sidecar + report_dir=report_dir, + ) + if prof.get("status") != "ok": + return { + "status": "error", + "error": prof.get("error", "profile fallo"), + "stage": "profile", + "load": load, + } + + return { + "status": "ok", + "table_fqn": table_fqn, + "load": { + k: load[k] + for k in ("n_rows_source", "n_rows_fetched", "sampled", "sample_frac", "pseudonymized", "table") + if k in load + }, + "duckdb_path": duckdb_path if keep_duckdb else None, + "report_md_path": prof.get("report_md_path"), + "report_json_path": prof.get("report_json_path"), + "aeda_pdf_path": prof.get("aeda_pdf_path"), + "aeda_pptx_path": prof.get("aeda_pptx_path"), + "aeda_manifest_path": prof.get("aeda_manifest_path"), + "profile": prof.get("profile"), + } + except Exception as e: # noqa: BLE001 + return {"status": "error", "error": str(e)} + finally: + # Limpia el DuckDB temporal salvo que se pida conservarlo. + if tmp_created and not keep_duckdb and duckdb_path and os.path.exists(duckdb_path): + try: + os.remove(duckdb_path) + except OSError: + pass