chore: auto-commit (26 archivos)
- python/functions/bigquery/bq_auth.md - python/functions/bigquery/bq_load_from_file.md - python/functions/bigquery/bq_load_from_gcs.md - python/functions/bigquery/client.py - python/functions/bigquery/queries.py - python/functions/datascience/__init__.py - python/functions/datascience/decode_qr_image.py - python/functions/datascience/load_bq_table_to_duckdb.md - python/functions/datascience/load_bq_table_to_duckdb.py - python/functions/pipelines/profile_bq_table.md - ... Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,133 @@
|
||||
---
|
||||
name: profile_bq_dataset
|
||||
kind: pipeline
|
||||
lang: py
|
||||
domain: pipelines
|
||||
purity: impure
|
||||
version: "1.1.0"
|
||||
signature: "def profile_bq_dataset(project_id: str, dataset: str, tables: list = None, include_views: bool = False, sample_frac: float = None, max_rows: int = 0, pseudonymize_cols: dict = None, report_dir: str = \"reports\", duckdb_path: str = \"\", keep_duckdb: bool = True, min_inclusion: float = 0.9, emit_pdf: bool = True, run_llm: bool = False) -> dict"
|
||||
description: "EDA one-shot de un dataset BigQuery ENTERO con descubrimiento cross-tabla: materializa CADA tabla del dataset (COMPLETA por defecto; muestreo opt-in con sample_frac; seudonimizacion PII por tabla, LOPDGDD/RGPD) en UN MISMO DuckDB compartido con load_bq_table_to_duckdb, lo perfila entero con profile_database (perfiles por tabla + relaciones FK inter-tabla por containment + join graph Mermaid + report markdown/JSON + PDF movil), y construye el diccionario de columnas del dataset con build_column_dictionary. Es el analogo BigQuery de profile_database a nivel de dataset, resuelto por composicion estricta (list_bq_dataset_tables -> load_bq_table_to_duckdb x N -> profile_database -> build_column_dictionary) sin duplicar descubrimiento, perfilado ni inferencia de relaciones. Es el hazme un EDA de este dataset BigQuery entero y descubre como se relacionan sus tablas, en una sola llamada."
|
||||
tags: [eda, bigquery, relations, launcher]
|
||||
uses_functions:
|
||||
- list_bq_dataset_tables_py_datascience
|
||||
- load_bq_table_to_duckdb_py_datascience
|
||||
- profile_database_py_pipelines
|
||||
- build_column_dictionary_py_datascience
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: error_go_core
|
||||
imports: []
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/pipelines/profile_bq_dataset.py"
|
||||
params:
|
||||
- name: project_id
|
||||
desc: "Proyecto GCP (facturacion + primer segmento del FQN). Ej: 'autingo-159109'."
|
||||
- name: dataset
|
||||
desc: "Dataset BigQuery a perfilar entero. Ej: 'customer_marts'."
|
||||
- name: tables
|
||||
desc: "Lista de NOMBRES de tabla del dataset. None (DEFAULT) = todas las del dataset (filtradas por include_views)."
|
||||
- name: include_views
|
||||
desc: "Si True incluye las VIEW ademas de las BASE TABLE cuando tables=None. Default False (solo BASE TABLE, coherente con profile_database que salta las VIEWs)."
|
||||
- name: sample_frac
|
||||
desc: "None (DEFAULT) = FULL, perfila TODAS las filas de cada tabla. Un float en (0,1) activa el muestreo opt-in por tabla (`WHERE rand() < frac`)."
|
||||
- name: max_rows
|
||||
desc: "Tope duro de filas por tabla (LIMIT). 0 (DEFAULT) = sin tope. Se aplica a cada tabla materializada."
|
||||
- name: pseudonymize_cols
|
||||
desc: "Dict {\"tabla\": [\"col1\", \"col2\"]} de columnas PII a seudonimizar (hash) por tabla ANTES de materializar (LOPDGDD/RGPD [POL-MMNSEG-001-1.0]). Preserva nulos y cardinalidad."
|
||||
- name: report_dir
|
||||
desc: "Directorio de salida de los reports + del DuckDB compartido por defecto. Default 'reports' (artefacto local gitignored). Se crea si no existe."
|
||||
- name: duckdb_path
|
||||
desc: "Ruta del DuckDB compartido donde se materializan todas las tablas. Vacio (DEFAULT) = report_dir/eda_bq_<dataset>.duckdb (se limpia si ya existia)."
|
||||
- name: keep_duckdb
|
||||
desc: "Si True (DEFAULT) conserva el DuckDB materializado: es el artefacto explorable post-EDA (notebook Jupyter, joins ad-hoc). Con False se borra al terminar."
|
||||
- name: min_inclusion
|
||||
desc: "Umbral de inclusion (0-1) para emitir una FK candidata cross-tabla (se pasa a profile_database -> infer_fk_containment_duckdb). Default 0.9."
|
||||
- name: emit_pdf
|
||||
desc: "Si True (DEFAULT) emite el PDF movil DB-level (resumen de tablas + relaciones FK + join graph). Se pasa a profile_database."
|
||||
- name: run_llm
|
||||
desc: "Si True (default False) activa la capa LLM interpretativa por tabla (se pasa a profile_database -> profile_table): una llamada LLM por tabla sobre el perfil agregado, nunca filas crudas."
|
||||
output: "dict dict-no-throw. En exito {status:'ok', project_id, dataset, n_tables_loaded, n_tables_profiled, tables_skipped:[...], errors:[...], duckdb_path, db_profile:<DatabaseProfile con tables[resumen], table_profiles[completos], fk_candidates, join_graph{nodes,edges,mermaid,hubs}>, column_dictionary:{entries,pii_columns} (sin markdown), report_md_path, report_json_path, report_pdf_path, dict_md_path, dict_json_path}. En error {status:'error', error, stage}."
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
from pipelines.profile_bq_dataset import profile_bq_dataset
|
||||
|
||||
# FULL por defecto: EDA del dataset ENTERO (todas las tablas, todas las filas),
|
||||
# con FK cross-tabla + join graph + diccionario de columnas. El DuckDB compartido
|
||||
# queda en reports/eda_bq_customer_marts.duckdb para seguir explorando.
|
||||
r = profile_bq_dataset("autingo-159109", "customer_marts")
|
||||
print(r["n_tables_loaded"], "tablas materializadas,", r["n_tables_profiled"], "perfiladas")
|
||||
print("FKs:", [f"{fk['from_table']}.{fk['from_col']}->{fk['to_table']}.{fk['to_col']}"
|
||||
for fk in r["db_profile"]["fk_candidates"]])
|
||||
print(r["report_md_path"]); print(r["report_pdf_path"]); print(r["dict_md_path"])
|
||||
print("DuckDB explorable:", r["duckdb_path"])
|
||||
|
||||
# Dataset con tablas enormes: muestreo opt-in + PII seudonimizada por tabla.
|
||||
r = profile_bq_dataset(
|
||||
"autingo-159109",
|
||||
"customer_marts",
|
||||
sample_frac=0.05,
|
||||
pseudonymize_cols={
|
||||
"customer_profile": ["document_number", "full_name", "email", "phone"],
|
||||
},
|
||||
)
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
Cuando pidan un EDA de un DATASET BigQuery entero y no solo de una tabla: quieres
|
||||
el perfil de todas sus tablas MAS su esquema relacional (que tabla referencia a
|
||||
cual, con que cardinalidad) descubierto cross-tabla en una sola llamada. Es el
|
||||
escalon a nivel de dataset sobre `profile_bq_table` (una tabla) y el adaptador
|
||||
BigQuery de `profile_database` (una base DuckDB). Usala al recibir un dataset
|
||||
BigQuery desconocido, para documentar un data mart, para descubrir el star schema
|
||||
(las tablas hub del join graph) o antes de escribir joins sin tener el modelo
|
||||
declarado. Para datasets con tablas enormes, pasa `sample_frac` o `max_rows` y
|
||||
dejalo declarado en el report.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- Impura: requiere ADC de BigQuery configurado (Application Default Credentials).
|
||||
Si el ADC del usuario lleva un quota project ajeno, `load_bq_table_to_duckdb` /
|
||||
`list_bq_dataset_tables` aplican `creds.with_quota_project(None)` para evitar el
|
||||
403 USER_PROJECT_DENIED — remitido a los gotchas de esas funciones.
|
||||
- Coste de traer un DATASET entero: FULL por defecto materializa TODAS las filas
|
||||
de CADA tabla a RAM (via BigQuery Storage Read API/Arrow) antes de volcar al
|
||||
DuckDB compartido. Un dataset con varias tablas de millones de filas puede
|
||||
costar en tiempo, bytes escaneados de BigQuery y GBs de RAM/disco. Acota con
|
||||
`sample_frac` in (0,1) (muestreo opt-in por tabla) o `max_rows` (tope duro por
|
||||
tabla). Si por limite de recursos no cabe el total, dilo explicito en el report
|
||||
con el maximo que si se cargo.
|
||||
- El DuckDB compartido puede ocupar GBs: todas las tablas del dataset viven en un
|
||||
MISMO archivo (necesario para que la inferencia de FK opere cross-tabla). Con
|
||||
`keep_duckdb=True` (default) queda en disco como artefacto explorable; pasa
|
||||
`keep_duckdb=False` para borrarlo al terminar. Con `duckdb_path` explicito la
|
||||
ruta se respeta; el path por defecto se limpia al inicio para no mezclar tablas
|
||||
de corridas anteriores.
|
||||
- FK por containment es una HEURISTICA (falsos positivos/negativos posibles) y
|
||||
`profile_database` SALTA los pares de FK hacia tablas con mas de 200k filas (el
|
||||
lado caro del INTERSECT): esas relaciones quedan sin evaluar. Es un mapa de
|
||||
partida del esquema, no un DDL autoritativo.
|
||||
- Vistas excluidas por defecto (`include_views=False`, coherente con
|
||||
profile_database que salta VIEWs — perfilarlas infla n_tables y multiplica FK
|
||||
falsas). Pasa `include_views=True` solo si necesitas perfilarlas como si fueran
|
||||
tablas materializadas.
|
||||
- Seudonimiza PII con `pseudonymize_cols` (dict por tabla) para cumplir LOPDGDD/
|
||||
RGPD [POL-MMNSEG-001-1.0] ANTES de escribir a disco: nombres, DNI/NIE, email,
|
||||
telefono, direccion, IDs de cliente, IBAN, etc. Sin seudonimizar, el DuckDB
|
||||
compartido + los reports contienen datos personales reales.
|
||||
- Tolera fallos por tabla: si una carga falla, se anota en `errors[]` +
|
||||
`tables_skipped[]` y el pipeline sigue con las demas; `n_tables_profiled` cuenta
|
||||
solo las perfiladas con exito. Revisa `errors` para saber que quedo fuera.
|
||||
- Escribe a `report_dir` (default 'reports', artefacto local gitignored): el report
|
||||
DB-level de `profile_database` (markdown + JSON + PDF si emit_pdf) MAS el
|
||||
diccionario de columnas (`..._dict.md` + `..._dict.json`).
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v1.1.0 (2026-07-02) — añade `run_llm` (passthrough a profile_database -> profile_table: una llamada LLM por tabla sobre el perfil agregado). Default False, sin breaking changes.
|
||||
@@ -0,0 +1,263 @@
|
||||
"""profile_bq_dataset — EDA one-shot de un dataset BigQuery ENTERO (cross-tabla).
|
||||
|
||||
Pipeline impuro: perfila un dataset de Google BigQuery COMPLETO con descubrimiento
|
||||
cross-tabla (relaciones FK inter-tabla + join graph + diccionario de columnas). Es
|
||||
el analogo a nivel de dataset de `profile_bq_table` (que perfila UNA tabla) y el
|
||||
adaptador BigQuery de `profile_database` (que perfila una base DuckDB entera),
|
||||
resuelto por composicion estricta de funciones del registry — sin reimplementar
|
||||
ni el descubrimiento, ni el perfilado, ni la inferencia de relaciones.
|
||||
|
||||
Clave del diseno: materializa CADA tabla del dataset en UN MISMO archivo DuckDB
|
||||
compartido, para que la inferencia de FK por containment (que necesita todas las
|
||||
tablas en la misma base) opere cross-tabla. El DuckDB compartido queda como el
|
||||
artefacto explorable post-EDA (keep_duckdb=True por defecto).
|
||||
|
||||
Funciones del registry compuestas (NO se reimplementa su logica):
|
||||
- list_bq_dataset_tables : catalogo de tablas/vistas del dataset (descubrimiento).
|
||||
- load_bq_table_to_duckdb: materializa cada tabla BigQuery al DuckDB compartido
|
||||
(completa por defecto, muestra si sample_frac; PII
|
||||
seudonimizada por tabla antes de escribir a disco).
|
||||
- profile_database : perfila el DuckDB compartido entero (perfiles por
|
||||
tabla + FK por containment + join graph Mermaid +
|
||||
report markdown/JSON + PDF movil DB-level).
|
||||
- build_column_dictionary: diccionario de columnas buscable (tipo semantico, PII)
|
||||
a partir del DatabaseProfile ensamblado.
|
||||
|
||||
Modo por defecto = FULL: `sample_frac=None` trae cada tabla entera (preferencia
|
||||
estandar del usuario: los EDA se corren sobre el total salvo que se pida lo
|
||||
contrario). El muestreo es opt-in explicito por dataset: `sample_frac=0.05` trae
|
||||
~5 % de cada tabla; `max_rows` es un tope duro por tabla (0 = sin tope).
|
||||
|
||||
Seudonimizacion LOPDGDD/RGPD [POL-MMNSEG-001-1.0]: `pseudonymize_cols` es un dict
|
||||
`{"tabla": ["col1", "col2"]}` que se aplica por tabla ANTES de materializar.
|
||||
|
||||
Estilo dict-no-throw del grupo `eda`: nunca lanza; captura cualquier error y
|
||||
devuelve {status:'error', error, stage}. Los fallos por tabla individual se
|
||||
toleran: se anotan en errors[]/tables_skipped[] y se sigue con las demas.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from datascience import (
|
||||
build_column_dictionary,
|
||||
list_bq_dataset_tables,
|
||||
load_bq_table_to_duckdb,
|
||||
)
|
||||
from pipelines.profile_database import profile_database
|
||||
|
||||
|
||||
def profile_bq_dataset(
|
||||
project_id: str,
|
||||
dataset: str,
|
||||
tables: list = None,
|
||||
include_views: bool = False,
|
||||
sample_frac: float = None,
|
||||
max_rows: int = 0,
|
||||
pseudonymize_cols: dict = None,
|
||||
report_dir: str = "reports",
|
||||
duckdb_path: str = "",
|
||||
keep_duckdb: bool = True,
|
||||
min_inclusion: float = 0.9,
|
||||
emit_pdf: bool = True,
|
||||
run_llm: bool = False,
|
||||
) -> dict:
|
||||
"""EDA one-shot de un dataset BigQuery entero, con descubrimiento cross-tabla.
|
||||
|
||||
Materializa cada tabla del dataset a un DuckDB compartido, lo perfila entero
|
||||
con `profile_database` (perfiles por tabla + FK inter-tabla + join graph +
|
||||
reports + PDF) y construye el diccionario de columnas del dataset. Por defecto
|
||||
perfila TODAS las filas de cada tabla (`sample_frac=None`, modo FULL) y solo
|
||||
las BASE TABLE (las vistas se excluyen salvo `include_views=True`).
|
||||
|
||||
Args:
|
||||
project_id: proyecto GCP (facturacion + primer segmento del FQN).
|
||||
dataset: dataset BigQuery a perfilar.
|
||||
tables: lista de NOMBRES de tabla del dataset. None (default) = todas las
|
||||
del dataset (filtradas por include_views).
|
||||
include_views: si True incluye las VIEW ademas de las BASE TABLE cuando
|
||||
tables=None. Default False (solo BASE TABLE, coherente con
|
||||
profile_database que salta las VIEWs).
|
||||
sample_frac: None (default) = FULL, perfila todas las filas de cada tabla.
|
||||
Un float en (0,1) activa el muestreo opt-in por tabla.
|
||||
max_rows: tope duro de filas por tabla (LIMIT). 0 (default) = sin tope.
|
||||
pseudonymize_cols: dict {"tabla": ["col1", "col2"]} de columnas PII a
|
||||
seudonimizar (hash) por tabla ANTES de materializar (LOPDGDD/RGPD).
|
||||
report_dir: directorio de salida de los reports + del DuckDB por defecto.
|
||||
duckdb_path: ruta del DuckDB compartido. Vacio = report_dir/eda_bq_<dataset>.duckdb.
|
||||
keep_duckdb: si True (default) conserva el DuckDB materializado (es el
|
||||
artefacto explorable post-EDA). Con False se borra al terminar.
|
||||
min_inclusion: umbral de inclusion (0-1) para emitir una FK candidata
|
||||
(se pasa a profile_database -> infer_fk_containment_duckdb).
|
||||
emit_pdf: si True (default) emite el PDF movil DB-level (se pasa a
|
||||
profile_database).
|
||||
run_llm: si True (default False) activa la capa LLM interpretativa por
|
||||
tabla (se pasa a profile_database -> profile_table; una llamada LLM
|
||||
por tabla sobre el perfil agregado, nunca filas crudas).
|
||||
|
||||
Returns:
|
||||
dict dict-no-throw con el resultado del pipeline (ver output del .md).
|
||||
"""
|
||||
try:
|
||||
# 1) Resolver la lista de tablas a materializar como (nombre, fqn).
|
||||
if tables is None:
|
||||
lst = list_bq_dataset_tables(
|
||||
project_id, dataset, include_views=include_views
|
||||
)
|
||||
if lst.get("status") != "ok":
|
||||
return {
|
||||
"status": "error",
|
||||
"error": lst.get("error", "list_bq_dataset_tables fallo"),
|
||||
"stage": "list_tables",
|
||||
}
|
||||
table_specs = [
|
||||
(t["table"], t.get("fqn") or f"{project_id}.{dataset}.{t['table']}")
|
||||
for t in lst.get("tables", [])
|
||||
]
|
||||
elif isinstance(tables, list):
|
||||
table_specs = [
|
||||
(name, f"{project_id}.{dataset}.{name}") for name in tables
|
||||
]
|
||||
else:
|
||||
return {
|
||||
"status": "error",
|
||||
"error": "tables debe ser una lista de nombres o None",
|
||||
"stage": "list_tables",
|
||||
}
|
||||
|
||||
if not table_specs:
|
||||
return {
|
||||
"status": "error",
|
||||
"error": (
|
||||
"el dataset no tiene tablas que perfilar "
|
||||
"(revisa include_views / el nombre del dataset)"
|
||||
),
|
||||
"stage": "list_tables",
|
||||
}
|
||||
|
||||
# 2) Resolver el DuckDB compartido. Vacio -> report_dir/eda_bq_<dataset>.duckdb.
|
||||
os.makedirs(report_dir, exist_ok=True)
|
||||
created_default = False
|
||||
if not duckdb_path:
|
||||
duckdb_path = os.path.join(report_dir, f"eda_bq_{dataset}.duckdb")
|
||||
created_default = True
|
||||
# Si es el path por defecto y ya existe de una corrida previa, empezar
|
||||
# limpio para que no se mezclen tablas de datasets distintos en la base.
|
||||
if created_default and os.path.exists(duckdb_path):
|
||||
try:
|
||||
os.remove(duckdb_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# 3) Materializar CADA tabla en el MISMO DuckDB (tolerando fallos).
|
||||
pseudo_map = pseudonymize_cols or {}
|
||||
loaded_tables = [] # nombres de tabla dentro del DuckDB
|
||||
tables_skipped = []
|
||||
errors = []
|
||||
for name, fqn in table_specs:
|
||||
load = load_bq_table_to_duckdb(
|
||||
fqn,
|
||||
duckdb_path,
|
||||
sample_frac=sample_frac,
|
||||
max_rows=max_rows,
|
||||
project_id=project_id,
|
||||
pseudonymize_cols=pseudo_map.get(name),
|
||||
)
|
||||
if load.get("status") == "ok":
|
||||
loaded_tables.append(load["table"])
|
||||
else:
|
||||
errors.append(
|
||||
{
|
||||
"table": name,
|
||||
"stage": "load",
|
||||
"error": load.get("error", "load fallo"),
|
||||
}
|
||||
)
|
||||
tables_skipped.append(name)
|
||||
|
||||
if not loaded_tables:
|
||||
return {
|
||||
"status": "error",
|
||||
"error": "ninguna tabla del dataset se pudo materializar",
|
||||
"stage": "load",
|
||||
"errors": errors,
|
||||
}
|
||||
|
||||
# 4) Perfilar el DuckDB compartido entero: perfiles por tabla + FK
|
||||
# cross-tabla + join graph + report markdown/JSON + PDF (si emit_pdf).
|
||||
prof = profile_database(
|
||||
duckdb_path,
|
||||
tables=loaded_tables,
|
||||
report_dir=report_dir,
|
||||
write_report=True,
|
||||
min_inclusion=min_inclusion,
|
||||
emit_pdf=emit_pdf,
|
||||
run_llm=run_llm,
|
||||
)
|
||||
if prof.get("status") != "ok":
|
||||
return {
|
||||
"status": "error",
|
||||
"error": prof.get("error", "profile_database fallo"),
|
||||
"stage": "profile",
|
||||
}
|
||||
db_profile = prof.get("db_profile", {})
|
||||
|
||||
# 5) Diccionario de columnas del dataset sobre el DatabaseProfile.
|
||||
dict_md_path = None
|
||||
dict_json_path = None
|
||||
column_dictionary = None
|
||||
dict_res = build_column_dictionary(db_profile)
|
||||
if dict_res.get("status") == "ok":
|
||||
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
|
||||
dict_md_path = os.path.join(
|
||||
report_dir, f"eda_bq_dataset_{dataset}_{ts}_dict.md"
|
||||
)
|
||||
dict_json_path = os.path.join(
|
||||
report_dir, f"eda_bq_dataset_{dataset}_{ts}_dict.json"
|
||||
)
|
||||
with open(dict_md_path, "w", encoding="utf-8") as fh:
|
||||
fh.write(dict_res.get("markdown", ""))
|
||||
# JSON sidecar del diccionario sin el markdown (compacto).
|
||||
dict_payload = {k: v for k, v in dict_res.items() if k != "markdown"}
|
||||
with open(dict_json_path, "w", encoding="utf-8") as fh:
|
||||
fh.write(
|
||||
json.dumps(dict_payload, ensure_ascii=False, indent=1, default=str)
|
||||
)
|
||||
column_dictionary = dict_payload
|
||||
else:
|
||||
errors.append(
|
||||
{
|
||||
"stage": "column_dictionary",
|
||||
"error": dict_res.get("error", "build_column_dictionary fallo"),
|
||||
}
|
||||
)
|
||||
|
||||
# 6) Limpieza del DuckDB compartido salvo que se pida conservarlo.
|
||||
final_duckdb = duckdb_path
|
||||
if not keep_duckdb and os.path.exists(duckdb_path):
|
||||
try:
|
||||
os.remove(duckdb_path)
|
||||
except OSError:
|
||||
pass
|
||||
final_duckdb = None
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"project_id": project_id,
|
||||
"dataset": dataset,
|
||||
"n_tables_loaded": len(loaded_tables),
|
||||
"n_tables_profiled": db_profile.get("n_tables", 0),
|
||||
"tables_skipped": tables_skipped,
|
||||
"errors": errors,
|
||||
"duckdb_path": final_duckdb,
|
||||
"db_profile": db_profile,
|
||||
"column_dictionary": column_dictionary,
|
||||
"report_md_path": prof.get("report_md_path"),
|
||||
"report_json_path": prof.get("report_json_path"),
|
||||
"report_pdf_path": prof.get("report_pdf_path"),
|
||||
"dict_md_path": dict_md_path,
|
||||
"dict_json_path": dict_json_path,
|
||||
}
|
||||
except Exception as e: # noqa: BLE001
|
||||
return {"status": "error", "error": str(e), "stage": "unexpected"}
|
||||
@@ -4,8 +4,8 @@ kind: pipeline
|
||||
lang: py
|
||||
domain: pipelines
|
||||
purity: impure
|
||||
version: "1.1.0"
|
||||
signature: "def profile_bq_table(table_fqn: str, sample_frac: float = None, max_rows: int = 0, pseudonymize_cols: list = None, run_models: bool = True, run_series: bool = False, run_llm: bool = False, project_id: str = \"\", report_dir: str = \"reports\", duckdb_path: str = \"\", keep_duckdb: bool = False) -> dict"
|
||||
version: "1.2.0"
|
||||
signature: "def profile_bq_table(table_fqn: str, sample_frac: float = None, max_rows: int = 0, pseudonymize_cols: list = None, run_models: bool = True, run_series: bool = False, run_llm: bool = False, project_id: str = \"\", report_dir: str = \"reports\", duckdb_path: str = \"\", keep_duckdb: bool = False, where_sql: str = \"\", select_sql: str = \"\") -> dict"
|
||||
description: "EDA one-shot de una tabla o vista de BigQuery: materializa el origen COMPLETO por defecto (todas las filas; muestreo opt-in con sample_frac; seudonimizacion PII opcional, LOPDGDD/RGPD) a un DuckDB local con load_bq_table_to_duckdb y lo perfila end-to-end con profile_table del grupo de capacidad eda, emitiendo el informe AutomaticEDA (PDF A5 movil + PPTX 16:9), Markdown y JSON sidecar. Es el adaptador BigQuery que faltaba en el grupo eda, resuelto por composicion (BigQuery -> DuckDB local -> profile_table) sin duplicar la logica de perfilado ni de render. Es el hazme un EDA de esta tabla BigQuery en una sola llamada, sobre el total de filas por defecto."
|
||||
tags: [eda, bigquery, launcher]
|
||||
uses_functions:
|
||||
@@ -43,7 +43,11 @@ params:
|
||||
desc: "Ruta DuckDB a usar. Vacio = temporal autogestionado."
|
||||
- name: keep_duckdb
|
||||
desc: "Si True conserva el DuckDB materializado (para el notebook Jupyter). Default False."
|
||||
output: "dict dict-no-throw. En exito {status:'ok', table_fqn, load:{n_rows_source,n_rows_fetched,sampled,sample_frac,pseudonymized,table}, duckdb_path, report_md_path, report_json_path, aeda_pdf_path, aeda_pptx_path, aeda_manifest_path, profile}. En error {status:'error', error, stage}."
|
||||
- name: where_sql
|
||||
desc: "Clausula WHERE SQL (sin la palabra WHERE) aplicada al origen y a su COUNT. Pass-through a load_bq_table_to_duckdb; se combina con sample_frac via AND. Ej: `fecha <= CURRENT_DATE() AND venta_n IS NOT NULL`. Se interpola tal cual: no usar con input no confiable."
|
||||
- name: select_sql
|
||||
desc: "Expresiones del SELECT (sin la palabra SELECT); vacio (DEFAULT) = `*`. Pass-through a load_bq_table_to_duckdb. Util para castear tipos problematicos (p. ej. BIGNUMERIC->FLOAT64) antes de perfilar. Se interpola tal cual: no usar con input no confiable."
|
||||
output: "dict dict-no-throw. En exito {status:'ok', table_fqn, load:{n_rows_source,n_rows_fetched,sampled,sample_frac,pseudonymized,table,streamed, where_sql?, select_sql?}, duckdb_path, report_md_path, report_json_path, aeda_pdf_path, aeda_pptx_path, aeda_manifest_path, profile}. En error {status:'error', error, stage}. where_sql/select_sql aparecen en load solo si vienen informados."
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
@@ -66,6 +70,16 @@ r = profile_bq_table(
|
||||
sample_frac=0.05,
|
||||
pseudonymize_cols=["document_number", "full_name", "email", "phone", "postal_code", "salesforce_customer_id"],
|
||||
)
|
||||
|
||||
# Filtrar el origen + castear una columna BIGNUMERIC antes de perfilar. where_sql y
|
||||
# select_sql se pasan al loader (que ingiere por batches Arrow, RAM acotada).
|
||||
r = profile_bq_table(
|
||||
"autingo-159109.data.ventas_39M",
|
||||
where_sql="fecha <= CURRENT_DATE() AND venta_n IS NOT NULL",
|
||||
select_sql="fecha, idCentro, CAST(importe_bignumeric AS FLOAT64) AS importe",
|
||||
run_models=True,
|
||||
)
|
||||
print(r["load"]["n_rows_fetched"], "filas, streamed=", r["load"].get("streamed"))
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
@@ -83,12 +97,19 @@ en RAM, pasa `sample_frac` (muestreo opt-in).
|
||||
|
||||
- Impura: requiere ADC de BigQuery configurado (Application Default Credentials)
|
||||
para que `load_bq_table_to_duckdb` autentique contra el proyecto.
|
||||
- FULL por defecto: `sample_frac=None` perfila TODAS las filas del origen. Una
|
||||
vista de millones de filas se trae entera a RAM (varios GB posibles) antes de
|
||||
materializar en DuckDB; el fetch usa el BigQuery Storage Read API (Arrow) cuando
|
||||
esta disponible, mucho mas rapido que REST. Para acotar coste/memoria, pasa
|
||||
`sample_frac` in (0,1) (muestreo opt-in) o `max_rows` (tope duro). Si por limite
|
||||
de recursos no cabe el total, dilo explicito con el maximo que si se cargo.
|
||||
- FULL por defecto: `sample_frac=None` perfila TODAS las filas del origen. El
|
||||
loader `load_bq_table_to_duckdb` (v1.2.0) ingiere por batches Arrow ->
|
||||
DuckDB cuando `pyarrow` esta disponible, con la RAM acotada al tamano de un
|
||||
batch (una tabla de decenas de millones de filas cabe sin cargarse entera); si
|
||||
no, cae al camino DataFrame completo (todo en RAM, varios GB posibles). Para
|
||||
acotar coste/memoria pasa `sample_frac` in (0,1), `max_rows` (tope duro) o
|
||||
`where_sql` (filtra el origen). Si por limite de recursos no cabe el total,
|
||||
dilo explicito con el maximo que si se cargo.
|
||||
- `where_sql` / `select_sql` (pass-through al loader) se interpolan TAL CUAL en la
|
||||
query BigQuery: NO los construyas a partir de input no confiable (inyeccion SQL).
|
||||
`select_sql` es la via para castear columnas BIGNUMERIC (Arrow decimal256, que
|
||||
DuckDB no ingiere) a FLOAT64 antes de perfilar; si no las casteas, el loader
|
||||
devuelve `{status:'error', stage:'stream_schema'|'stream_insert'}`.
|
||||
- Seudonimiza PII con `pseudonymize_cols` para cumplir LOPDGDD/RGPD ANTES de
|
||||
escribir a disco: nombres, DNI/NIE, email, telefono, direccion, IDs de cliente,
|
||||
etc. Se hashean preservando nulos y cardinalidad. Sin seudonimizar, la muestra
|
||||
@@ -103,4 +124,5 @@ en RAM, pasa `sample_frac` (muestreo opt-in).
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v1.2.0 (2026-07-02) — Añade `where_sql` y `select_sql` como pass-through al loader `load_bq_table_to_duckdb`: filtran/proyectan el origen antes de perfilar (`where_sql` tambien acota el COUNT del origen; `select_sql` permite castear BIGNUMERIC->FLOAT64). Ambos se reflejan en el bloque `load` del retorno (solo si vienen informados), junto con la nueva clave `streamed`. Hereda del loader v1.2.0 la ingesta streaming Arrow -> DuckDB por batches (RAM acotada) para tablas que no caben en RAM, con fallback DataFrame completo.
|
||||
- v1.1.0 (2026-07-01) — FULL pasa a ser el DEFAULT del pipeline: se sustituye `max_rows=300000, sample=True` por `sample_frac=None` (None = perfila todas las filas) + `max_rows=0` (tope duro opcional). El muestreo es opt-in explicito (`sample_frac`). Alinea con la preferencia estandar del usuario: los EDA se corren sobre el total salvo que se pida lo contrario. Hereda el fetch acelerado (Arrow/bqstorage) de `load_bq_table_to_duckdb` v1.1.0.
|
||||
|
||||
@@ -42,6 +42,8 @@ def profile_bq_table(
|
||||
report_dir: str = "reports",
|
||||
duckdb_path: str = "",
|
||||
keep_duckdb: bool = False,
|
||||
where_sql: str = "",
|
||||
select_sql: str = "",
|
||||
) -> dict:
|
||||
"""EDA one-shot de una tabla/vista BigQuery.
|
||||
|
||||
@@ -63,6 +65,13 @@ def profile_bq_table(
|
||||
report_dir: Directorio de salida de los reports.
|
||||
duckdb_path: Ruta DuckDB a usar. Vacio = temporal autogestionado.
|
||||
keep_duckdb: Si True conserva el DuckDB materializado.
|
||||
where_sql: Clausula WHERE SQL (sin la palabra WHERE) aplicada al origen y a
|
||||
su COUNT. Pass-through a `load_bq_table_to_duckdb`. Ej:
|
||||
"fecha <= CURRENT_DATE() AND venta_n IS NOT NULL". Se interpola tal cual:
|
||||
no usar con input no confiable.
|
||||
select_sql: Expresiones del SELECT (sin la palabra SELECT); vacio = `*`.
|
||||
Pass-through a `load_bq_table_to_duckdb`. Util para castear tipos
|
||||
problematicos (p. ej. BIGNUMERIC->FLOAT64) antes de perfilar.
|
||||
|
||||
Returns:
|
||||
dict dict-no-throw con el resultado del pipeline (ver output del .md).
|
||||
@@ -83,6 +92,8 @@ def profile_bq_table(
|
||||
max_rows=max_rows,
|
||||
project_id=project_id,
|
||||
pseudonymize_cols=pseudonymize_cols,
|
||||
where_sql=where_sql,
|
||||
select_sql=select_sql,
|
||||
)
|
||||
if load.get("status") != "ok":
|
||||
return {
|
||||
@@ -111,14 +122,24 @@ def profile_bq_table(
|
||||
"load": load,
|
||||
}
|
||||
|
||||
load_block = {
|
||||
k: load[k]
|
||||
for k in (
|
||||
"n_rows_source", "n_rows_fetched", "sampled", "sample_frac",
|
||||
"pseudonymized", "table", "streamed",
|
||||
)
|
||||
if k in load
|
||||
}
|
||||
# Trazabilidad de los filtros de origen (solo si vienen informados).
|
||||
if where_sql:
|
||||
load_block["where_sql"] = where_sql
|
||||
if select_sql:
|
||||
load_block["select_sql"] = select_sql
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"table_fqn": table_fqn,
|
||||
"load": {
|
||||
k: load[k]
|
||||
for k in ("n_rows_source", "n_rows_fetched", "sampled", "sample_frac", "pseudonymized", "table")
|
||||
if k in load
|
||||
},
|
||||
"load": load_block,
|
||||
"duckdb_path": duckdb_path if keep_duckdb else None,
|
||||
"report_md_path": prof.get("report_md_path"),
|
||||
"report_json_path": prof.get("report_json_path"),
|
||||
|
||||
@@ -4,8 +4,8 @@ kind: pipeline
|
||||
lang: py
|
||||
domain: pipelines
|
||||
purity: impure
|
||||
version: "1.0.0"
|
||||
signature: "def profile_database(db_path: str, tables: list = None, sample: int = 5000, report_dir: str = \"reports\", write_report: bool = True, min_inclusion: float = 0.9) -> dict"
|
||||
version: "1.1.0"
|
||||
signature: "def profile_database(db_path: str, tables: list = None, sample: int = 5000, report_dir: str = \"reports\", write_report: bool = True, min_inclusion: float = 0.9, emit_pdf: bool = False, run_llm: bool = False) -> dict"
|
||||
description: "Orquestador one-shot del grupo eda a nivel de BASE: perfila TODA una base DuckDB (todas las tablas o las indicadas) componiendo profile_table por tabla, infiere las relaciones FK inter-tabla por containment y construye el join graph con diagrama Mermaid. Ensambla un DatabaseProfile (resumen por tabla + TableProfiles completos + fk_candidates + join_graph) y opcionalmente emite un report markdown DB-level + JSON sidecar. Es la composicion canonica para hazme un EDA de esta base de datos y entender su esquema relacional."
|
||||
tags: [eda, relations, duckdb, profiling, data-quality, pipeline, dataops]
|
||||
uses_functions:
|
||||
@@ -38,6 +38,10 @@ params:
|
||||
desc: "Si True (default) escribe report markdown DB-level + JSON sidecar timestamped en report_dir; si False no toca disco y los paths del retorno son None."
|
||||
- name: min_inclusion
|
||||
desc: "Umbral minimo de inclusion (0-1) para emitir una FK candidata (se pasa a infer_fk_containment_duckdb). Default 0.9."
|
||||
- name: emit_pdf
|
||||
desc: "Si True (default False) renderiza el PDF movil DB-level con render_eda_pdf_relational junto a los reports (report_pdf_path en el retorno)."
|
||||
- name: run_llm
|
||||
desc: "Si True (default False) activa la capa LLM interpretativa de profile_table para CADA tabla: una llamada LLM por tabla sobre el perfil agregado, nunca filas crudas."
|
||||
output: "dict {status:'ok', db_profile:<DatabaseProfile con db_path, profiled_at, n_tables, tables[resumen], table_profiles[completos], fk_candidates, join_graph{nodes,edges,mermaid,hubs}, errors>, report_md_path:str|None, report_json_path:str|None} o {status:'error', error:str} (dict-no-throw)."
|
||||
---
|
||||
|
||||
@@ -101,3 +105,7 @@ se infieren las FK y se dibuja el diagrama de relaciones.
|
||||
perfiladas con exito. Revisa `errors` para saber que quedo fuera.
|
||||
- `db_path` debe existir: DuckDB read-only NO crea la base. El muestreo de cada
|
||||
tabla usa el sandbox read-only por defecto (sin acceso a FS/red).
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v1.1.0 (2026-07-02) — añade `run_llm` (passthrough a profile_table: capa LLM interpretativa por tabla) y documenta `emit_pdf` en el frontmatter (existía en el código desde el renderer relational). Sin breaking changes: ambos default False.
|
||||
|
||||
@@ -121,6 +121,7 @@ def profile_database(
|
||||
write_report: bool = True,
|
||||
min_inclusion: float = 0.9,
|
||||
emit_pdf: bool = False,
|
||||
run_llm: bool = False,
|
||||
) -> dict:
|
||||
"""Perfila una base DuckDB entera + sus relaciones inter-tabla.
|
||||
|
||||
@@ -141,6 +142,9 @@ def profile_database(
|
||||
render_eda_pdf_relational (resumen de tablas + relaciones FK + join
|
||||
graph) junto a los reports y devuelve su ruta en report_pdf_path. Con
|
||||
False no se toca el PDF (retrocompatible) y report_pdf_path es None.
|
||||
run_llm: si True (default False) activa la capa LLM interpretativa de
|
||||
profile_table para CADA tabla (una llamada LLM por tabla sobre el
|
||||
perfil agregado, nunca filas crudas).
|
||||
|
||||
Returns:
|
||||
dict dict-no-throw. En exito:
|
||||
@@ -177,7 +181,9 @@ def profile_database(
|
||||
|
||||
# 2) Perfilar cada tabla (tolerando fallos individuales).
|
||||
for table in tables:
|
||||
r = profile_table(db_path, table, sample=sample, write_report=False)
|
||||
r = profile_table(
|
||||
db_path, table, sample=sample, write_report=False, run_llm=run_llm
|
||||
)
|
||||
if r.get("status") == "ok":
|
||||
prof = r["profile"]
|
||||
table_profiles.append(prof)
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
---
|
||||
name: run_sales_forecast
|
||||
kind: pipeline
|
||||
lang: py
|
||||
domain: pipelines
|
||||
purity: impure
|
||||
version: "1.1.0"
|
||||
signature: "def run_sales_forecast(as_of: str = '', horizon: int = 7, model: str = 'baseline_v1', author: str = 'egutierrez', dry_run: bool = False) -> dict"
|
||||
description: "Forecast diario de ventas Aurgi (dia x centro x subcategoria CGQ) escrito en BigQuery autingo-159109.sales_forecast.predictions, en una sola llamada. Compone funciones del registry: bq_auth(drop_quota_project=True) para el cliente sin quota project ajeno, bq_query para leer la historia agregada del mart bi_ventas_mart.base_margenes_aa (18 semanas, venta_n saneado) y ejecutar el DELETE de idempotencia, forecast_seasonal_median (modelo PURO mediana estacional + tendencia acotada) para generar todas las predicciones, y bq_load_from_file para cargar el JSONL a la tabla de predicciones. Historia utilizable hasta as_of-1 (el dia as_of esta parcial cuando corre el cron a las 21:00); predice as_of+1..as_of+horizon; run_date=as_of. Solo predice series activas (venta>0 en las ultimas 8 semanas). Idempotente por (run_date, model, author). --dry-run no escribe."
|
||||
tags: [forecast, bigquery, sales, aurgi, pipeline, launcher]
|
||||
uses_functions:
|
||||
- forecast_seasonal_median_py_datascience
|
||||
- bq_auth_py_infra
|
||||
- bq_query_py_infra
|
||||
- bq_load_from_file_py_infra
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: error_go_core
|
||||
imports: [google-cloud-bigquery]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/pipelines/run_sales_forecast.py"
|
||||
params:
|
||||
- name: as_of
|
||||
desc: "fecha de corte 'YYYY-MM-DD' (dia de la corrida). Vacio (DEFAULT) = hoy. La historia utilizable llega hasta as_of-1 dia (el dia as_of esta parcial en el cron 21:00); se predice as_of+1..as_of+horizon; run_date=as_of"
|
||||
- name: horizon
|
||||
desc: "numero de dias futuros a predecir a partir de as_of+1. Default 7"
|
||||
- name: model
|
||||
desc: "etiqueta del modelo escrita en la columna model de cada fila. Default 'baseline_v1'. Forma parte de la clave de idempotencia"
|
||||
- name: author
|
||||
desc: "autor de la corrida (columna author). Default 'egutierrez'. Forma parte de la clave de idempotencia"
|
||||
- name: dry_run
|
||||
desc: "si True no escribe en BigQuery (ni DELETE ni load): devuelve el resumen + una muestra de 5 filas. Default False"
|
||||
output: "dict dict-no-throw. En exito {status:'ok', run_date, series:N (series activas), rows:N (filas predichas), model, author, rows_loaded, job_id}; con dry_run=True incluye sample:[5 filas] y omite rows_loaded/job_id. En error {status:'error', error, stage}. Por stdout imprime el JSON del resumen; exit 0 si ok, 1 si error"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```bash
|
||||
# Corrida real (cron 21:00): predice los 7 dias siguientes a hoy y carga a BigQuery.
|
||||
./fn run run_sales_forecast
|
||||
|
||||
# Fecha de corte y horizonte explicitos, sin escribir (revisar la muestra):
|
||||
./fn run run_sales_forecast --as-of 2026-07-01 --horizon 7 --dry-run
|
||||
|
||||
# Modelo alternativo (clave de idempotencia distinta: no pisa baseline_v1):
|
||||
./fn run run_sales_forecast --model baseline_v2 --author egutierrez
|
||||
```
|
||||
|
||||
```python
|
||||
# Uso programatico (venv del proyecto, PYTHONPATH=python/functions):
|
||||
from pipelines.run_sales_forecast import run_sales_forecast
|
||||
|
||||
r = run_sales_forecast(as_of="2026-07-01", horizon=7, dry_run=True)
|
||||
print(r["series"], "series activas,", r["rows"], "filas")
|
||||
for row in r["sample"]:
|
||||
print(row["forecast_date"], row["center_id"], row["subcat_cgq"], row["y_pred"])
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
Cuando quieras (re)generar el forecast diario de ventas Aurgi por centro y
|
||||
subcategoria CGQ y dejarlo en `autingo-159109.sales_forecast.predictions` en una
|
||||
sola llamada. Es el pipeline que dispara el cron nocturno (21:00): lee la historia
|
||||
del mart, aplica el baseline estacional, y carga las predicciones de forma
|
||||
idempotente. Usa `--dry-run` para inspeccionar la muestra antes de escribir, o
|
||||
para probar tras un cambio en el mart o en el modelo. Cambia `--model` para probar
|
||||
una variante sin pisar las predicciones del modelo actual (la clave de
|
||||
idempotencia es run_date + model + author).
|
||||
|
||||
## Gotchas
|
||||
|
||||
- Impura: requiere ADC de BigQuery configurado (`gcloud auth application-default
|
||||
login`) con acceso a `autingo-159109`. Usa `bq_auth(drop_quota_project=True)`
|
||||
para descartar el quota project del ADC del usuario `egutierrez` y evitar el
|
||||
`403 USER_PROJECT_DENIED` (gotcha conocido del repo).
|
||||
- Escribe en produccion: en modo real hace `DELETE` de las predicciones previas de
|
||||
`(run_date, model, author)` y luego carga (WRITE_APPEND). Es idempotente para esa
|
||||
combinacion: re-ejecutar la misma corrida no duplica. Cambiar `model` o `author`
|
||||
crea un conjunto de predicciones paralelo. Usa `--dry-run` si solo quieres mirar.
|
||||
- La tabla `sales_forecast.predictions` debe existir con schema fijo y con las
|
||||
columnas exactas que emite el pipeline: `run_ts` (TIMESTAMP), `run_date` (DATE),
|
||||
`forecast_date` (DATE), `lag_days` (INT64), `center_id` (STRING), `center_name`
|
||||
(STRING), `ambito` (STRING), `subcat_cgq` (STRING), `model` (STRING), `author`
|
||||
(STRING), `y_pred` (FLOAT64). El load usa `autodetect=False`: los nombres del
|
||||
JSONL deben coincidir con los de la tabla o el load falla.
|
||||
- `center_id` se emite como STRING (str(idCentro)); `subcat_cgq` toma el valor de
|
||||
la columna `subcat_cqq` del mart (el nombre difiere entre origen y destino a
|
||||
proposito). center_name/ambito son los ultimos conocidos por serie (fecha maxima).
|
||||
- Historia hasta as_of-1: el dia `as_of` NO entra en la historia (esta parcial en
|
||||
el cron de las 21:00). Si necesitas incluir el dia en curso, pasa `--as-of` con
|
||||
el dia siguiente.
|
||||
- Solo predice series con venta > 0 en las ultimas 8 semanas: las series muertas se
|
||||
omiten (no aparecen en la tabla). `series` en la salida cuenta las activas.
|
||||
- Guarda 18 semanas de historia del mart: cubre la ventana estacional (8 semanas
|
||||
del mismo dia) mas la tendencia (4+4 semanas) con margen. venta_n se filtra
|
||||
`ABS < 1e9` para descartar las filas veneno del mart.
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v1.1.0 (2026-07-02) — añade paso 9: refresh de `sales_forecast.actuals_daily` (tabla física de venta real, ventana móvil de 10 días) tras cargar las predicciones; `forecast_eval` y el dashboard de competición comparan contra ella.
|
||||
@@ -0,0 +1,298 @@
|
||||
"""run_sales_forecast — forecast diario de ventas Aurgi a BigQuery (one-shot).
|
||||
|
||||
Pipeline IMPURO que produce el forecast diario de ventas (dia x centro x
|
||||
subcategoria CGQ) y lo escribe en `autingo-159109.sales_forecast.predictions`.
|
||||
Compone funciones del registry sin reimplementar su logica:
|
||||
|
||||
- bq_auth(..., drop_quota_project=True): cliente BigQuery sin quota project ajeno
|
||||
(evita el 403 USER_PROJECT_DENIED del ADC del usuario).
|
||||
- bq_query: lee la historia agregada del mart `bi_ventas_mart.base_margenes_aa`
|
||||
y ejecuta el DELETE de idempotencia (parametros tipados).
|
||||
- forecast_seasonal_median: modelo PURO (mediana estacional + tendencia acotada)
|
||||
que genera todas las predicciones de golpe.
|
||||
- bq_load_from_file: carga las filas (JSONL) a la tabla de predicciones.
|
||||
|
||||
Cron previsto: 21:00. Por eso la historia utilizable llega hasta as_of - 1 dia
|
||||
(el dia as_of aun esta parcial) y se predice as_of + 1 .. as_of + horizon.
|
||||
|
||||
Estilo dict-no-throw: nunca lanza; captura errores y devuelve
|
||||
{status:'error', error, stage}. Idempotente por (run_date, model, author):
|
||||
borra las predicciones previas de esa combinacion antes de cargar.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from bigquery import bq_auth, bq_query, bq_load_from_file
|
||||
from datascience import forecast_seasonal_median
|
||||
|
||||
PROJECT_ID = "autingo-159109"
|
||||
SOURCE_TABLE = "autingo-159109.bi_ventas_mart.base_margenes_aa"
|
||||
DEST_DATASET = "sales_forecast"
|
||||
DEST_TABLE = "predictions"
|
||||
|
||||
HISTORY_SQL = f"""
|
||||
SELECT fecha, idCentro, subcat_cqq,
|
||||
ANY_VALUE(NombreCentro) AS center_name, ANY_VALUE(Ambito) AS ambito,
|
||||
SUM(CAST(venta_n AS FLOAT64)) AS venta
|
||||
FROM `{SOURCE_TABLE}`
|
||||
WHERE fecha BETWEEN DATE_SUB(@as_of, INTERVAL 18 WEEK) AND DATE_SUB(@as_of, INTERVAL 1 DAY)
|
||||
AND venta_n IS NOT NULL AND ABS(CAST(venta_n AS FLOAT64)) < 1e9
|
||||
AND subcat_cqq IS NOT NULL AND idCentro IS NOT NULL
|
||||
GROUP BY fecha, idCentro, subcat_cqq
|
||||
"""
|
||||
|
||||
DELETE_SQL = (
|
||||
f"DELETE FROM `{PROJECT_ID}.{DEST_DATASET}.{DEST_TABLE}` "
|
||||
"WHERE run_date = @d AND model = @m AND author = @a"
|
||||
)
|
||||
|
||||
# Refresh de la tabla fisica de reales (sales_forecast.actuals_daily), consumida
|
||||
# por la vista forecast_eval y por los dashboards de competicion. Ventana movil
|
||||
# para recoger correcciones retroactivas del mart.
|
||||
ACTUALS_DELETE_SQL = (
|
||||
f"DELETE FROM `{PROJECT_ID}.{DEST_DATASET}.actuals_daily` "
|
||||
"WHERE fecha BETWEEN DATE_SUB(@as_of, INTERVAL @w DAY) AND DATE_SUB(@as_of, INTERVAL 1 DAY)"
|
||||
)
|
||||
|
||||
ACTUALS_INSERT_SQL = f"""
|
||||
INSERT INTO `{PROJECT_ID}.{DEST_DATASET}.actuals_daily`
|
||||
(fecha, center_id, center_name, ambito, subcat_cgq, y_real, unidades, loaded_ts)
|
||||
SELECT forecast_date, IFNULL(center_id, 'SIN_CENTRO'), center_name, ambito,
|
||||
IFNULL(subcat_cgq, 'Sin subcategoria'),
|
||||
y_real, unidades, CURRENT_TIMESTAMP()
|
||||
FROM `{PROJECT_ID}.{DEST_DATASET}.actuals`
|
||||
WHERE forecast_date BETWEEN DATE_SUB(@as_of, INTERVAL @w DAY) AND DATE_SUB(@as_of, INTERVAL 1 DAY)
|
||||
"""
|
||||
|
||||
|
||||
def _refresh_actuals(client, as_of: date, window_days: int = 10) -> None:
|
||||
"""Rehace los ultimos `window_days` dias de actuals_daily desde la vista actuals."""
|
||||
params = [
|
||||
{"name": "as_of", "type": "DATE", "value": as_of},
|
||||
{"name": "w", "type": "INT64", "value": window_days},
|
||||
]
|
||||
bq_query(client, ACTUALS_DELETE_SQL, params=params)
|
||||
bq_query(client, ACTUALS_INSERT_SQL, params=params)
|
||||
|
||||
|
||||
def _as_date(value) -> date:
|
||||
if isinstance(value, date) and not isinstance(value, datetime):
|
||||
return value
|
||||
if isinstance(value, datetime):
|
||||
return value.date()
|
||||
return datetime.strptime(str(value)[:10], "%Y-%m-%d").date()
|
||||
|
||||
|
||||
def run_sales_forecast(
|
||||
as_of: str = "",
|
||||
horizon: int = 7,
|
||||
model: str = "baseline_v1",
|
||||
author: str = "egutierrez",
|
||||
dry_run: bool = False,
|
||||
) -> dict:
|
||||
"""Genera el forecast diario de ventas y lo escribe en BigQuery.
|
||||
|
||||
Args:
|
||||
as_of: fecha de corte 'YYYY-MM-DD' (dia de la corrida). Vacio = hoy. La
|
||||
historia utilizable llega hasta as_of - 1 dia; se predice
|
||||
as_of + 1 .. as_of + horizon. run_date = as_of.
|
||||
horizon: numero de dias futuros a predecir. Default 7.
|
||||
model: etiqueta del modelo escrita en cada fila (columna model). Default
|
||||
'baseline_v1'.
|
||||
author: autor de la corrida (columna author). Default 'egutierrez'.
|
||||
dry_run: si True no escribe en BigQuery; devuelve el resumen + una muestra
|
||||
de filas.
|
||||
|
||||
Returns:
|
||||
dict dict-no-throw. En exito {status:'ok', run_date, series, rows, model,
|
||||
author, (sample si dry_run)}. En error {status:'error', error, stage}.
|
||||
"""
|
||||
try:
|
||||
run_d = _as_date(as_of) if as_of else date.today()
|
||||
# Ultimo dia de historia utilizable (inclusive): as_of - 1 dia.
|
||||
hist_as_of = run_d - timedelta(days=1)
|
||||
horizon_dates = [
|
||||
(run_d + timedelta(days=k)).isoformat() for k in range(1, horizon + 1)
|
||||
]
|
||||
|
||||
# 1) Cliente BigQuery sin quota project (evita 403 USER_PROJECT_DENIED).
|
||||
client = bq_auth(PROJECT_ID, drop_quota_project=True)
|
||||
|
||||
# 2) Historia agregada del mart (hasta run_d - 1 via el WHERE de la query).
|
||||
q = bq_query(
|
||||
client,
|
||||
HISTORY_SQL,
|
||||
params=[{"name": "as_of", "type": "DATE", "value": run_d}],
|
||||
)
|
||||
cols = {name: i for i, name in enumerate(q["columns"])}
|
||||
|
||||
# Historia por serie + ultimos center_name/ambito conocidos + venta 8 semanas.
|
||||
history = []
|
||||
last_meta = {} # series_id -> (max_date, center_name, ambito, center_id, subcat)
|
||||
recent_sum = {} # series_id -> venta acumulada en las ultimas 8 semanas
|
||||
active_cutoff = hist_as_of - timedelta(weeks=8)
|
||||
for row in q["rows"]:
|
||||
fecha = _as_date(row[cols["fecha"]])
|
||||
center_id = str(row[cols["idCentro"]])
|
||||
subcat = row[cols["subcat_cqq"]]
|
||||
center_name = row[cols["center_name"]]
|
||||
ambito = row[cols["ambito"]]
|
||||
venta = float(row[cols["venta"]] or 0.0)
|
||||
series_id = f"{center_id}|{subcat}"
|
||||
|
||||
history.append(
|
||||
{"series_id": series_id, "date": fecha.isoformat(), "value": venta}
|
||||
)
|
||||
prev = last_meta.get(series_id)
|
||||
if prev is None or fecha > prev[0]:
|
||||
last_meta[series_id] = (fecha, center_name, ambito, center_id, subcat)
|
||||
if fecha > active_cutoff:
|
||||
recent_sum[series_id] = recent_sum.get(series_id, 0.0) + venta
|
||||
|
||||
# 3) Series activas: venta > 0 en las ultimas 8 semanas.
|
||||
active = {sid for sid, s in recent_sum.items() if s > 0.0}
|
||||
history = [h for h in history if h["series_id"] in active]
|
||||
|
||||
if not history:
|
||||
result = {
|
||||
"status": "ok",
|
||||
"run_date": run_d.isoformat(),
|
||||
"series": 0,
|
||||
"rows": 0,
|
||||
"model": model,
|
||||
"author": author,
|
||||
}
|
||||
if dry_run:
|
||||
result["sample"] = []
|
||||
return result
|
||||
|
||||
# 4) Modelo puro: todas las predicciones de golpe.
|
||||
preds = forecast_seasonal_median(
|
||||
history, horizon_dates, as_of=hist_as_of.isoformat()
|
||||
)
|
||||
|
||||
# 5) Filas para la tabla de predicciones.
|
||||
run_ts = datetime.now(timezone.utc).isoformat()
|
||||
rows_out = []
|
||||
for p in preds:
|
||||
sid = p["series_id"]
|
||||
meta = last_meta.get(sid)
|
||||
_, center_name, ambito, center_id, subcat = meta
|
||||
forecast_date = _as_date(p["date"])
|
||||
rows_out.append(
|
||||
{
|
||||
"run_ts": run_ts,
|
||||
"run_date": run_d.isoformat(),
|
||||
"forecast_date": forecast_date.isoformat(),
|
||||
"lag_days": (forecast_date - run_d).days,
|
||||
"center_id": center_id,
|
||||
"center_name": center_name,
|
||||
"ambito": ambito,
|
||||
"subcat_cgq": subcat,
|
||||
"model": model,
|
||||
"author": author,
|
||||
"y_pred": round(float(p["y_pred"]), 4),
|
||||
}
|
||||
)
|
||||
|
||||
summary = {
|
||||
"status": "ok",
|
||||
"run_date": run_d.isoformat(),
|
||||
"series": len(active),
|
||||
"rows": len(rows_out),
|
||||
"model": model,
|
||||
"author": author,
|
||||
}
|
||||
|
||||
# 6) dry-run: no escribe; devuelve resumen + muestra.
|
||||
if dry_run:
|
||||
summary["sample"] = rows_out[:5]
|
||||
return summary
|
||||
|
||||
# 7) Idempotencia: borra las predicciones previas de (run_date, model, author).
|
||||
bq_query(
|
||||
client,
|
||||
DELETE_SQL,
|
||||
params=[
|
||||
{"name": "d", "type": "DATE", "value": run_d},
|
||||
{"name": "m", "type": "STRING", "value": model},
|
||||
{"name": "a", "type": "STRING", "value": author},
|
||||
],
|
||||
)
|
||||
|
||||
# 8) Carga JSONL a la tabla (WRITE_APPEND, schema fijo de la tabla).
|
||||
tmp_path = None
|
||||
try:
|
||||
fd, tmp_path = tempfile.mkstemp(prefix="sales_forecast_", suffix=".jsonl")
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
||||
for r in rows_out:
|
||||
fh.write(json.dumps(r, ensure_ascii=False) + "\n")
|
||||
load = bq_load_from_file(
|
||||
client,
|
||||
tmp_path,
|
||||
DEST_DATASET,
|
||||
DEST_TABLE,
|
||||
source_format="NEWLINE_DELIMITED_JSON",
|
||||
write_disposition="WRITE_APPEND",
|
||||
autodetect=False,
|
||||
)
|
||||
finally:
|
||||
if tmp_path and os.path.exists(tmp_path):
|
||||
os.remove(tmp_path)
|
||||
|
||||
if load.get("status") != "DONE":
|
||||
return {
|
||||
"status": "error",
|
||||
"error": f"load job no termino DONE: {load}",
|
||||
"stage": "load",
|
||||
}
|
||||
|
||||
summary["rows_loaded"] = load.get("rows_loaded")
|
||||
summary["job_id"] = load.get("job_id")
|
||||
|
||||
# 9) Refresca la tabla fisica de reales (ventana movil de 10 dias) para
|
||||
# que forecast_eval y el dashboard de competicion comparen contra el
|
||||
# ultimo estado del mart.
|
||||
try:
|
||||
_refresh_actuals(client, run_d)
|
||||
summary["actuals_refreshed"] = True
|
||||
except Exception as e: # noqa: BLE001
|
||||
# No invalida las predicciones ya cargadas: se reporta y se sigue.
|
||||
summary["actuals_refreshed"] = False
|
||||
summary["actuals_error"] = str(e)
|
||||
|
||||
return summary
|
||||
except Exception as e: # noqa: BLE001
|
||||
return {"status": "error", "error": str(e), "stage": "unexpected"}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Forecast diario de ventas Aurgi -> BigQuery sales_forecast.predictions."
|
||||
)
|
||||
parser.add_argument("--as-of", default="", help="Fecha de corte YYYY-MM-DD (vacio = hoy).")
|
||||
parser.add_argument("--horizon", type=int, default=7, help="Dias a predecir. Default 7.")
|
||||
parser.add_argument("--model", default="baseline_v1", help="Etiqueta del modelo.")
|
||||
parser.add_argument("--author", default="egutierrez", help="Autor de la corrida.")
|
||||
parser.add_argument(
|
||||
"--dry-run", action="store_true", help="No escribe en BigQuery; imprime muestra."
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
out = run_sales_forecast(
|
||||
as_of=args.as_of,
|
||||
horizon=args.horizon,
|
||||
model=args.model,
|
||||
author=args.author,
|
||||
dry_run=args.dry_run,
|
||||
)
|
||||
print(json.dumps(out, ensure_ascii=False, default=str))
|
||||
sys.exit(0 if out.get("status") == "ok" else 1)
|
||||
Reference in New Issue
Block a user