feat(eda): pipeline BQ-EDA sobre tablas BigQuery (grupo eda)
Añade el conector y el pipeline para hacer EDA automático sobre tablas/vistas de BigQuery, reutilizando profile_table del grupo eda sin duplicar profiling: - load_bq_table_to_duckdb (datascience): trae una tabla BQ a DuckDB con seudonimización SHA-1 de columnas PII y normalización de dtypes. Por defecto carga el total de filas (sample_frac=None); el muestreo es opt-in explícito. - profile_bq_table (pipeline): orquesta load -> profile_table -> render report (JSON + Markdown + PDF/PPTX). Full por defecto. Ambas tageadas eda+bigquery, v1.1.0. El default full responde a la preferencia del operador: los EDA se corren sobre el total salvo indicación contraria. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -79,8 +79,10 @@ from .render_paper_pdf import render_paper_pdf
|
||||
from .draw_join_graph_figure import draw_join_graph_figure
|
||||
from .generate_synthetic_eda_table import generate_synthetic_eda_table
|
||||
from .generate_synthetic_eda_folder import generate_synthetic_eda_folder
|
||||
from .load_bq_table_to_duckdb import load_bq_table_to_duckdb
|
||||
|
||||
__all__ = [
|
||||
"load_bq_table_to_duckdb",
|
||||
"generate_synthetic_eda_table",
|
||||
"generate_synthetic_eda_folder",
|
||||
"render_paper_pdf",
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
---
|
||||
name: load_bq_table_to_duckdb
|
||||
kind: function
|
||||
lang: py
|
||||
domain: datascience
|
||||
version: "1.1.0"
|
||||
purity: impure
|
||||
signature: "def load_bq_table_to_duckdb(table_fqn: str, duckdb_path: str, dest_table: str = '', sample_frac: float = None, max_rows: int = 0, project_id: str = '', pseudonymize_cols: list = None) -> dict"
|
||||
description: "Adaptador BigQuery -> DuckDB local para el grupo eda. Trae una tabla o vista de Google BigQuery a un archivo DuckDB local (por defecto COMPLETA, todas las filas; muestreo opt-in con sample_frac), de modo que las funciones del grupo de capacidad eda (que solo hablan DuckDB/PostgreSQL) puedan perfilarla. Fetch via BigQuery Storage Read API (Arrow) con fallback REST. Seudonimiza columnas PII con hash SHA-1 truncado antes de materializar (LOPDGDD/RGPD)."
|
||||
tags: [eda, bigquery, duckdb, datascience]
|
||||
params:
|
||||
- name: table_fqn
|
||||
desc: "FQN completo de la tabla/vista BigQuery: `project.dataset.table`."
|
||||
- name: duckdb_path
|
||||
desc: "Ruta del archivo DuckDB local donde materializar la tabla (se crea/sobrescribe la tabla dest)."
|
||||
- name: dest_table
|
||||
desc: "Nombre de la tabla DuckDB destino. Vacío = último segmento del FQN, saneado."
|
||||
- name: sample_frac
|
||||
desc: "None (DEFAULT) = FULL, trae todas las filas. Un float en (0,1) activa el muestreo opt-in con `WHERE rand() < frac` (~frac del total). Vistas no admiten TABLESAMPLE, por eso rand()."
|
||||
- name: max_rows
|
||||
desc: "Tope duro opcional de filas (LIMIT). 0 (DEFAULT) = sin tope. Se combina con sample_frac si ambos se pasan."
|
||||
- name: project_id
|
||||
desc: "Proyecto GCP de facturación. Vacío = primer segmento del FQN o el del ADC."
|
||||
- name: pseudonymize_cols
|
||||
desc: "Lista de columnas PII a seudonimizar con hash SHA-1 truncado antes de materializar (LOPDGDD/RGPD). Preserva nulos y cardinalidad."
|
||||
output: "dict dict-no-throw. En éxito {status:'ok', duckdb_path, table, n_rows_source, n_rows_fetched, sampled, sample_frac, columns, pseudonymized}. En error {status:'error', error}."
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: []
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/datascience/load_bq_table_to_duckdb.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
from datascience import load_bq_table_to_duckdb
|
||||
|
||||
# FULL por defecto: trae TODAS las filas de la vista (3,8M) a DuckDB.
|
||||
r = load_bq_table_to_duckdb(
|
||||
"autingo-159109.customer_marts.customer_profile",
|
||||
"/tmp/eda_bq.duckdb",
|
||||
pseudonymize_cols=["document_number", "full_name", "email", "phone"],
|
||||
)
|
||||
print(r["table"], r["n_rows_fetched"], "de", r["n_rows_source"], "sampled=", r["sampled"])
|
||||
|
||||
# Muestreo opt-in: ~5 % de las filas.
|
||||
r = load_bq_table_to_duckdb(
|
||||
"autingo-159109.customer_marts.customer_profile",
|
||||
"/tmp/eda_bq_sample.duckdb",
|
||||
sample_frac=0.05,
|
||||
pseudonymize_cols=["document_number", "full_name", "email", "phone"],
|
||||
)
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
- Antes de perfilar una tabla/vista de BigQuery con el grupo `eda` (que solo habla DuckDB/PostgreSQL): trae el origen COMPLETO a DuckDB local (o una muestra con `sample_frac`) con seudonimización PII.
|
||||
- Cuando necesites un puente único BigQuery -> DuckDB local -> grupo `eda` sin escribir el bridge inline cada vez.
|
||||
- Cuando quieras que un EDA sobre datos de negocio conserve valor analítico (cardinalidad, nulos, distribución) sin incrustar datos personales reales.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- **Impura**: hace I/O de red (BigQuery) + escritura a disco (DuckDB). Requiere ADC configurado (`gcloud auth application-default login`).
|
||||
- **403 USER_PROJECT_DENIED**: se evita aplicando `creds.with_quota_project(None)` cuando el ADC arrastra un quota project ajeno (memoria `bq_direct_quota_project`).
|
||||
- **TABLESAMPLE no funciona en vistas**: el muestreo (opt-in, `sample_frac`) usa `WHERE rand() < frac` (aplicable a tablas y vistas). `max_rows` es un `LIMIT` como tope duro opcional.
|
||||
- **FULL por defecto**: `sample_frac=None` trae TODAS las filas. Trae el resultado a RAM como DataFrame de pandas antes de materializar en DuckDB, así que una tabla de muchos millones × muchas columnas puede consumir varios GB. Para tablas enormes que no quepan, pasa `sample_frac` (muestra) o `max_rows` (tope). El fetch usa el BigQuery Storage Read API (Arrow) cuando `google-cloud-bigquery-storage` + `pyarrow` están disponibles — mucho más rápido que REST para millones de filas; si no, cae al conversor REST automáticamente.
|
||||
- **La seudonimización es un hash unidireccional** (SHA-1 truncado a 12 hex): no es reversible, correcto para EDA. Preserva nulos, cardinalidad y patrón de faltantes, pero NO permite recuperar el valor original.
|
||||
- **dict-no-throw**: nunca lanza excepción; ante cualquier fallo (FQN inválido, auth, query) devuelve `{status:'error', error:str}`.
|
||||
|
||||
## Notas
|
||||
|
||||
Adaptador del grupo de capacidad `eda`: el resto de funciones del grupo perfilan
|
||||
DuckDB/PostgreSQL, pero no hablan BigQuery de forma nativa. Esta función cubre ese
|
||||
hueco materializando una sola tabla DuckDB desde el DataFrame resultante de la
|
||||
query BigQuery. El nombre de tabla destino se sanea (`^[A-Za-z_][A-Za-z0-9_]*$`)
|
||||
antes de citarlo en el `CREATE OR REPLACE TABLE`.
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v1.1.0 (2026-07-01) — FULL pasa a ser el DEFAULT: se sustituye `max_rows=300000, sample=True` por `sample_frac=None` (None = todas las filas) + `max_rows=0` (tope duro opcional). El muestreo es opt-in explícito. Fetch acelerado via BigQuery Storage Read API (Arrow) con fallback REST. Preferencia estándar del usuario: los EDA se corren sobre el total salvo que se pida lo contrario.
|
||||
@@ -0,0 +1,157 @@
|
||||
"""load_bq_table_to_duckdb — adaptador BigQuery -> DuckDB local para el grupo `eda`.
|
||||
|
||||
Trae una tabla o vista de Google BigQuery a un archivo DuckDB local (por defecto
|
||||
COMPLETA — todas las filas — o una muestra si se pasa `sample_frac`), de modo que
|
||||
las funciones del grupo de capacidad `eda` (que perfilan DuckDB/PostgreSQL)
|
||||
puedan analizarla sin un adaptador BigQuery nativo. Materializa una sola tabla
|
||||
DuckDB desde un DataFrame de pandas.
|
||||
|
||||
Modo por defecto = FULL: `sample_frac=None` trae la vista/tabla entera (preferencia
|
||||
estándar del usuario: los EDA se corren sobre el total salvo que se pida lo
|
||||
contrario). El muestreo es opt-in explícito: `sample_frac=0.05` trae ~5 %; `max_rows`
|
||||
es un tope duro opcional (0 = sin tope). El fetch usa el BigQuery Storage Read API
|
||||
(Arrow) cuando está disponible, con fallback al conversor REST.
|
||||
|
||||
Seudonimización LOPDGDD/RGPD: las columnas listadas en `pseudonymize_cols` se
|
||||
transforman con un hash SHA-1 truncado ANTES de escribir a disco, preservando
|
||||
nulos, cardinalidad y patrón de faltantes pero sin volcar el valor real (DNI,
|
||||
nombre, email, teléfono, etc.). El EDA conserva su valor analítico sin incrustar
|
||||
datos personales reales.
|
||||
|
||||
Autenticación: ADC (gcloud auth). Aplica creds.with_quota_project(None) para
|
||||
evitar el 403 USER_PROJECT_DENIED cuando el ADC lleva quota project ajeno.
|
||||
|
||||
Estilo dict-no-throw del grupo `eda`: nunca lanza; devuelve {status:'error', ...}.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import re
|
||||
|
||||
_FQN_RE = re.compile(r"^[A-Za-z0-9_.\-]+$")
|
||||
|
||||
|
||||
def _pseudonymize_series(values):
|
||||
"""Hash SHA-1 truncado (12 hex) de cada valor no nulo; conserva None/NaN."""
|
||||
import pandas as pd
|
||||
out = []
|
||||
for v in values:
|
||||
if v is None or (isinstance(v, float) and pd.isna(v)) or (
|
||||
not isinstance(v, (list, dict)) and pd.isna(v) if _safe_isna(v) else False
|
||||
):
|
||||
out.append(None)
|
||||
else:
|
||||
h = hashlib.sha1(str(v).encode("utf-8")).hexdigest()[:12]
|
||||
out.append(h)
|
||||
return out
|
||||
|
||||
|
||||
def _safe_isna(v):
|
||||
import pandas as pd
|
||||
try:
|
||||
return bool(pd.isna(v))
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
|
||||
|
||||
def load_bq_table_to_duckdb(
|
||||
table_fqn: str,
|
||||
duckdb_path: str,
|
||||
dest_table: str = "",
|
||||
sample_frac: float = None,
|
||||
max_rows: int = 0,
|
||||
project_id: str = "",
|
||||
pseudonymize_cols: list = None,
|
||||
) -> dict:
|
||||
try:
|
||||
import duckdb
|
||||
import google.auth
|
||||
from google.cloud import bigquery
|
||||
|
||||
if not table_fqn or not _FQN_RE.match(table_fqn):
|
||||
return {"status": "error", "error": f"table_fqn inválido: {table_fqn!r}"}
|
||||
|
||||
# dest_table: derivar del último segmento del FQN si no se pasa.
|
||||
dest = dest_table or table_fqn.split(".")[-1]
|
||||
if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", dest):
|
||||
dest = re.sub(r"[^A-Za-z0-9_]", "_", dest) or "bq_table"
|
||||
|
||||
# Auth ADC con fix de quota project (403 USER_PROJECT_DENIED).
|
||||
creds, adc_project = google.auth.default(
|
||||
scopes=["https://www.googleapis.com/auth/bigquery"]
|
||||
)
|
||||
if hasattr(creds, "with_quota_project"):
|
||||
creds = creds.with_quota_project(None)
|
||||
proj = project_id or table_fqn.split(".")[0] or adc_project
|
||||
client = bigquery.Client(project=proj, credentials=creds)
|
||||
|
||||
# Conteo de filas de origen.
|
||||
cnt = client.query(
|
||||
f"SELECT COUNT(*) AS n FROM `{table_fqn}`"
|
||||
).result()
|
||||
n_source = 0
|
||||
for row in cnt:
|
||||
n_source = int(row["n"])
|
||||
|
||||
# Modo por defecto = FULL (sample_frac=None -> todas las filas). El
|
||||
# muestreo es opt-in: sample_frac in (0,1) muestrea esa fracción con
|
||||
# `WHERE rand() < frac` (aplicable a tablas y vistas; TABLESAMPLE no va
|
||||
# en vistas). max_rows>0 es un tope duro opcional (LIMIT); 0 = sin tope.
|
||||
sampled = False
|
||||
where = ""
|
||||
if sample_frac is not None and 0 < float(sample_frac) < 1:
|
||||
where = f" WHERE rand() < {float(sample_frac)}"
|
||||
sampled = True
|
||||
limit = f" LIMIT {int(max_rows)}" if max_rows and int(max_rows) > 0 else ""
|
||||
sql = f"SELECT * FROM `{table_fqn}`{where}{limit}"
|
||||
|
||||
# Fetch: BigQuery Storage Read API (Arrow, rápido para millones de filas)
|
||||
# con fallback al conversor REST si la lib no está o falla.
|
||||
try:
|
||||
df = client.query(sql).result().to_dataframe(create_bqstorage_client=True)
|
||||
except Exception: # noqa: BLE001
|
||||
df = client.query(sql).result().to_dataframe(create_bqstorage_client=False)
|
||||
n_fetched = len(df)
|
||||
|
||||
# Normalizar dtypes de db-dtypes: el conversor REST de BigQuery mapea las
|
||||
# columnas DATE/TIME a las extension dtypes `dbdate`/`dbtime` de db-dtypes,
|
||||
# que DuckDB NO reconoce al registrar el DataFrame ("Data type 'dbdate' not
|
||||
# recognized"). Se convierten a tipos estándar que DuckDB sí ingiere: DATE
|
||||
# -> datetime64[ns], TIME -> string. El resto de dtypes (datetime64 de
|
||||
# TIMESTAMP, Int64/boolean nullable, object) los acepta DuckDB tal cual.
|
||||
import pandas as pd
|
||||
for col in df.columns:
|
||||
dt = str(df[col].dtype)
|
||||
if dt == "dbdate":
|
||||
df[col] = pd.to_datetime(df[col], errors="coerce")
|
||||
elif dt == "dbtime":
|
||||
df[col] = df[col].astype("string").astype(object)
|
||||
|
||||
# Seudonimización de columnas PII antes de escribir a disco.
|
||||
pseudo_applied = []
|
||||
for col in (pseudonymize_cols or []):
|
||||
if col in df.columns:
|
||||
df[col] = _pseudonymize_series(df[col].tolist())
|
||||
pseudo_applied.append(col)
|
||||
|
||||
# Materializar a DuckDB (una tabla desde el DataFrame).
|
||||
con = duckdb.connect(duckdb_path)
|
||||
try:
|
||||
con.register("_src_df", df)
|
||||
con.execute(f'CREATE OR REPLACE TABLE "{dest}" AS SELECT * FROM _src_df')
|
||||
con.unregister("_src_df")
|
||||
finally:
|
||||
con.close()
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"duckdb_path": duckdb_path,
|
||||
"table": dest,
|
||||
"n_rows_source": n_source,
|
||||
"n_rows_fetched": n_fetched,
|
||||
"sampled": sampled,
|
||||
"sample_frac": float(sample_frac) if sampled else None,
|
||||
"columns": list(df.columns),
|
||||
"pseudonymized": pseudo_applied,
|
||||
}
|
||||
except Exception as e: # noqa: BLE001
|
||||
return {"status": "error", "error": str(e)}
|
||||
Reference in New Issue
Block a user