Compare commits

...

7 Commits

Author SHA1 Message Date
egutierrez 2ebc9efeb2 chore: auto-commit (8 archivos)
- scratchpad/gen_docs.py
- scratchpad/gen_intel.py
- scratchpad/gen_verify.py
- scratchpad/intel_build.json
- scratchpad/intel_lineage.json
- scratchpad/lineage_graph.json
- scratchpad/trace_intel.py
- scratchpad/trace_lineage.py

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-07-01 19:00:06 +02:00
egutierrez fbdf80bd71 chore: auto-commit (10 archivos)
- scratchpad/ap.parquet
- scratchpad/bq.py
- scratchpad/cards.json
- scratchpad/citas_recon.csv
- scratchpad/dash.txt
- scratchpad/diego.parquet
- scratchpad/diego_literals.sql
- scratchpad/exf/
- scratchpad/va.parquet
- scratchpad/vm.parquet

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-07-01 17:58:03 +02:00
egutierrez 8408863cfa feat(eda): pipeline BQ-EDA sobre tablas BigQuery (grupo eda)
Añade el conector y el pipeline para hacer EDA automático sobre tablas/vistas
de BigQuery, reutilizando profile_table del grupo eda sin duplicar profiling:

- load_bq_table_to_duckdb (datascience): trae una tabla BQ a DuckDB con
  seudonimización SHA-1 de columnas PII y normalización de dtypes. Por defecto
  carga el total de filas (sample_frac=None); el muestreo es opt-in explícito.
- profile_bq_table (pipeline): orquesta load -> profile_table -> render report
  (JSON + Markdown + PDF/PPTX). Full por defecto.

Ambas tageadas eda+bigquery, v1.1.0. El default full responde a la preferencia
del operador: los EDA se corren sobre el total salvo indicación contraria.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-01 12:45:39 +02:00
egutierrez 7273823087 Merge remote-tracking branch 'origin/master'
# Conflicts:
#	.claude/settings.local.json
2026-07-01 11:42:49 +02:00
egutierrez 76592e4dc0 chore: auto-commit (2 archivos)
- .claude/settings.local.json
- scratchpad/mbq.py

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-07-01 11:41:56 +02:00
egutierrez 26569c7015 chore: auto-commit (1 archivos)
- logs/ardour_mcp_server.log

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-07-01 02:16:25 +02:00
egutierrez 44622339fa merge(eda): cap4/cap5 distribuciones — parrafos al glosario, desc LLM+unidad por columna, donut->barras, PPT side_by_side 2026-07-01 02:11:53 +02:00
37 changed files with 3253 additions and 0 deletions
File diff suppressed because one or more lines are too long
+2
View File
@@ -79,8 +79,10 @@ from .render_paper_pdf import render_paper_pdf
from .draw_join_graph_figure import draw_join_graph_figure
from .generate_synthetic_eda_table import generate_synthetic_eda_table
from .generate_synthetic_eda_folder import generate_synthetic_eda_folder
from .load_bq_table_to_duckdb import load_bq_table_to_duckdb
__all__ = [
"load_bq_table_to_duckdb",
"generate_synthetic_eda_table",
"generate_synthetic_eda_folder",
"render_paper_pdf",
@@ -0,0 +1,86 @@
---
name: load_bq_table_to_duckdb
kind: function
lang: py
domain: datascience
version: "1.1.0"
purity: impure
signature: "def load_bq_table_to_duckdb(table_fqn: str, duckdb_path: str, dest_table: str = '', sample_frac: float = None, max_rows: int = 0, project_id: str = '', pseudonymize_cols: list = None) -> dict"
description: "Adaptador BigQuery -> DuckDB local para el grupo eda. Trae una tabla o vista de Google BigQuery a un archivo DuckDB local (por defecto COMPLETA, todas las filas; muestreo opt-in con sample_frac), de modo que las funciones del grupo de capacidad eda (que solo hablan DuckDB/PostgreSQL) puedan perfilarla. Fetch via BigQuery Storage Read API (Arrow) con fallback REST. Seudonimiza columnas PII con hash SHA-1 truncado antes de materializar (LOPDGDD/RGPD)."
tags: [eda, bigquery, duckdb, datascience]
params:
- name: table_fqn
desc: "FQN completo de la tabla/vista BigQuery: `project.dataset.table`."
- name: duckdb_path
desc: "Ruta del archivo DuckDB local donde materializar la tabla (se crea/sobrescribe la tabla dest)."
- name: dest_table
desc: "Nombre de la tabla DuckDB destino. Vacío = último segmento del FQN, saneado."
- name: sample_frac
desc: "None (DEFAULT) = FULL, trae todas las filas. Un float en (0,1) activa el muestreo opt-in con `WHERE rand() < frac` (~frac del total). Vistas no admiten TABLESAMPLE, por eso rand()."
- name: max_rows
desc: "Tope duro opcional de filas (LIMIT). 0 (DEFAULT) = sin tope. Se combina con sample_frac si ambos se pasan."
- name: project_id
desc: "Proyecto GCP de facturación. Vacío = primer segmento del FQN o el del ADC."
- name: pseudonymize_cols
desc: "Lista de columnas PII a seudonimizar con hash SHA-1 truncado antes de materializar (LOPDGDD/RGPD). Preserva nulos y cardinalidad."
output: "dict dict-no-throw. En éxito {status:'ok', duckdb_path, table, n_rows_source, n_rows_fetched, sampled, sample_frac, columns, pseudonymized}. En error {status:'error', error}."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/load_bq_table_to_duckdb.py"
---
## Ejemplo
```python
from datascience import load_bq_table_to_duckdb
# FULL por defecto: trae TODAS las filas de la vista (3,8M) a DuckDB.
r = load_bq_table_to_duckdb(
"autingo-159109.customer_marts.customer_profile",
"/tmp/eda_bq.duckdb",
pseudonymize_cols=["document_number", "full_name", "email", "phone"],
)
print(r["table"], r["n_rows_fetched"], "de", r["n_rows_source"], "sampled=", r["sampled"])
# Muestreo opt-in: ~5 % de las filas.
r = load_bq_table_to_duckdb(
"autingo-159109.customer_marts.customer_profile",
"/tmp/eda_bq_sample.duckdb",
sample_frac=0.05,
pseudonymize_cols=["document_number", "full_name", "email", "phone"],
)
```
## Cuando usarla
- Antes de perfilar una tabla/vista de BigQuery con el grupo `eda` (que solo habla DuckDB/PostgreSQL): trae el origen COMPLETO a DuckDB local (o una muestra con `sample_frac`) con seudonimización PII.
- Cuando necesites un puente único BigQuery -> DuckDB local -> grupo `eda` sin escribir el bridge inline cada vez.
- Cuando quieras que un EDA sobre datos de negocio conserve valor analítico (cardinalidad, nulos, distribución) sin incrustar datos personales reales.
## Gotchas
- **Impura**: hace I/O de red (BigQuery) + escritura a disco (DuckDB). Requiere ADC configurado (`gcloud auth application-default login`).
- **403 USER_PROJECT_DENIED**: se evita aplicando `creds.with_quota_project(None)` cuando el ADC arrastra un quota project ajeno (memoria `bq_direct_quota_project`).
- **TABLESAMPLE no funciona en vistas**: el muestreo (opt-in, `sample_frac`) usa `WHERE rand() < frac` (aplicable a tablas y vistas). `max_rows` es un `LIMIT` como tope duro opcional.
- **FULL por defecto**: `sample_frac=None` trae TODAS las filas. Trae el resultado a RAM como DataFrame de pandas antes de materializar en DuckDB, así que una tabla de muchos millones × muchas columnas puede consumir varios GB. Para tablas enormes que no quepan, pasa `sample_frac` (muestra) o `max_rows` (tope). El fetch usa el BigQuery Storage Read API (Arrow) cuando `google-cloud-bigquery-storage` + `pyarrow` están disponibles — mucho más rápido que REST para millones de filas; si no, cae al conversor REST automáticamente.
- **La seudonimización es un hash unidireccional** (SHA-1 truncado a 12 hex): no es reversible, correcto para EDA. Preserva nulos, cardinalidad y patrón de faltantes, pero NO permite recuperar el valor original.
- **dict-no-throw**: nunca lanza excepción; ante cualquier fallo (FQN inválido, auth, query) devuelve `{status:'error', error:str}`.
## Notas
Adaptador del grupo de capacidad `eda`: el resto de funciones del grupo perfilan
DuckDB/PostgreSQL, pero no hablan BigQuery de forma nativa. Esta función cubre ese
hueco materializando una sola tabla DuckDB desde el DataFrame resultante de la
query BigQuery. El nombre de tabla destino se sanea (`^[A-Za-z_][A-Za-z0-9_]*$`)
antes de citarlo en el `CREATE OR REPLACE TABLE`.
## Capability growth log
- v1.1.0 (2026-07-01) — FULL pasa a ser el DEFAULT: se sustituye `max_rows=300000, sample=True` por `sample_frac=None` (None = todas las filas) + `max_rows=0` (tope duro opcional). El muestreo es opt-in explícito. Fetch acelerado via BigQuery Storage Read API (Arrow) con fallback REST. Preferencia estándar del usuario: los EDA se corren sobre el total salvo que se pida lo contrario.
@@ -0,0 +1,157 @@
"""load_bq_table_to_duckdb — adaptador BigQuery -> DuckDB local para el grupo `eda`.
Trae una tabla o vista de Google BigQuery a un archivo DuckDB local (por defecto
COMPLETA — todas las filas — o una muestra si se pasa `sample_frac`), de modo que
las funciones del grupo de capacidad `eda` (que perfilan DuckDB/PostgreSQL)
puedan analizarla sin un adaptador BigQuery nativo. Materializa una sola tabla
DuckDB desde un DataFrame de pandas.
Modo por defecto = FULL: `sample_frac=None` trae la vista/tabla entera (preferencia
estándar del usuario: los EDA se corren sobre el total salvo que se pida lo
contrario). El muestreo es opt-in explícito: `sample_frac=0.05` trae ~5 %; `max_rows`
es un tope duro opcional (0 = sin tope). El fetch usa el BigQuery Storage Read API
(Arrow) cuando está disponible, con fallback al conversor REST.
Seudonimización LOPDGDD/RGPD: las columnas listadas en `pseudonymize_cols` se
transforman con un hash SHA-1 truncado ANTES de escribir a disco, preservando
nulos, cardinalidad y patrón de faltantes pero sin volcar el valor real (DNI,
nombre, email, teléfono, etc.). El EDA conserva su valor analítico sin incrustar
datos personales reales.
Autenticación: ADC (gcloud auth). Aplica creds.with_quota_project(None) para
evitar el 403 USER_PROJECT_DENIED cuando el ADC lleva quota project ajeno.
Estilo dict-no-throw del grupo `eda`: nunca lanza; devuelve {status:'error', ...}.
"""
import hashlib
import re
_FQN_RE = re.compile(r"^[A-Za-z0-9_.\-]+$")
def _pseudonymize_series(values):
"""Hash SHA-1 truncado (12 hex) de cada valor no nulo; conserva None/NaN."""
import pandas as pd
out = []
for v in values:
if v is None or (isinstance(v, float) and pd.isna(v)) or (
not isinstance(v, (list, dict)) and pd.isna(v) if _safe_isna(v) else False
):
out.append(None)
else:
h = hashlib.sha1(str(v).encode("utf-8")).hexdigest()[:12]
out.append(h)
return out
def _safe_isna(v):
import pandas as pd
try:
return bool(pd.isna(v))
except (TypeError, ValueError):
return False
def load_bq_table_to_duckdb(
table_fqn: str,
duckdb_path: str,
dest_table: str = "",
sample_frac: float = None,
max_rows: int = 0,
project_id: str = "",
pseudonymize_cols: list = None,
) -> dict:
try:
import duckdb
import google.auth
from google.cloud import bigquery
if not table_fqn or not _FQN_RE.match(table_fqn):
return {"status": "error", "error": f"table_fqn inválido: {table_fqn!r}"}
# dest_table: derivar del último segmento del FQN si no se pasa.
dest = dest_table or table_fqn.split(".")[-1]
if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", dest):
dest = re.sub(r"[^A-Za-z0-9_]", "_", dest) or "bq_table"
# Auth ADC con fix de quota project (403 USER_PROJECT_DENIED).
creds, adc_project = google.auth.default(
scopes=["https://www.googleapis.com/auth/bigquery"]
)
if hasattr(creds, "with_quota_project"):
creds = creds.with_quota_project(None)
proj = project_id or table_fqn.split(".")[0] or adc_project
client = bigquery.Client(project=proj, credentials=creds)
# Conteo de filas de origen.
cnt = client.query(
f"SELECT COUNT(*) AS n FROM `{table_fqn}`"
).result()
n_source = 0
for row in cnt:
n_source = int(row["n"])
# Modo por defecto = FULL (sample_frac=None -> todas las filas). El
# muestreo es opt-in: sample_frac in (0,1) muestrea esa fracción con
# `WHERE rand() < frac` (aplicable a tablas y vistas; TABLESAMPLE no va
# en vistas). max_rows>0 es un tope duro opcional (LIMIT); 0 = sin tope.
sampled = False
where = ""
if sample_frac is not None and 0 < float(sample_frac) < 1:
where = f" WHERE rand() < {float(sample_frac)}"
sampled = True
limit = f" LIMIT {int(max_rows)}" if max_rows and int(max_rows) > 0 else ""
sql = f"SELECT * FROM `{table_fqn}`{where}{limit}"
# Fetch: BigQuery Storage Read API (Arrow, rápido para millones de filas)
# con fallback al conversor REST si la lib no está o falla.
try:
df = client.query(sql).result().to_dataframe(create_bqstorage_client=True)
except Exception: # noqa: BLE001
df = client.query(sql).result().to_dataframe(create_bqstorage_client=False)
n_fetched = len(df)
# Normalizar dtypes de db-dtypes: el conversor REST de BigQuery mapea las
# columnas DATE/TIME a las extension dtypes `dbdate`/`dbtime` de db-dtypes,
# que DuckDB NO reconoce al registrar el DataFrame ("Data type 'dbdate' not
# recognized"). Se convierten a tipos estándar que DuckDB sí ingiere: DATE
# -> datetime64[ns], TIME -> string. El resto de dtypes (datetime64 de
# TIMESTAMP, Int64/boolean nullable, object) los acepta DuckDB tal cual.
import pandas as pd
for col in df.columns:
dt = str(df[col].dtype)
if dt == "dbdate":
df[col] = pd.to_datetime(df[col], errors="coerce")
elif dt == "dbtime":
df[col] = df[col].astype("string").astype(object)
# Seudonimización de columnas PII antes de escribir a disco.
pseudo_applied = []
for col in (pseudonymize_cols or []):
if col in df.columns:
df[col] = _pseudonymize_series(df[col].tolist())
pseudo_applied.append(col)
# Materializar a DuckDB (una tabla desde el DataFrame).
con = duckdb.connect(duckdb_path)
try:
con.register("_src_df", df)
con.execute(f'CREATE OR REPLACE TABLE "{dest}" AS SELECT * FROM _src_df')
con.unregister("_src_df")
finally:
con.close()
return {
"status": "ok",
"duckdb_path": duckdb_path,
"table": dest,
"n_rows_source": n_source,
"n_rows_fetched": n_fetched,
"sampled": sampled,
"sample_frac": float(sample_frac) if sampled else None,
"columns": list(df.columns),
"pseudonymized": pseudo_applied,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
@@ -0,0 +1,106 @@
---
name: profile_bq_table
kind: pipeline
lang: py
domain: pipelines
purity: impure
version: "1.1.0"
signature: "def profile_bq_table(table_fqn: str, sample_frac: float = None, max_rows: int = 0, pseudonymize_cols: list = None, run_models: bool = True, run_series: bool = False, run_llm: bool = False, project_id: str = \"\", report_dir: str = \"reports\", duckdb_path: str = \"\", keep_duckdb: bool = False) -> dict"
description: "EDA one-shot de una tabla o vista de BigQuery: materializa el origen COMPLETO por defecto (todas las filas; muestreo opt-in con sample_frac; seudonimizacion PII opcional, LOPDGDD/RGPD) a un DuckDB local con load_bq_table_to_duckdb y lo perfila end-to-end con profile_table del grupo de capacidad eda, emitiendo el informe AutomaticEDA (PDF A5 movil + PPTX 16:9), Markdown y JSON sidecar. Es el adaptador BigQuery que faltaba en el grupo eda, resuelto por composicion (BigQuery -> DuckDB local -> profile_table) sin duplicar la logica de perfilado ni de render. Es el hazme un EDA de esta tabla BigQuery en una sola llamada, sobre el total de filas por defecto."
tags: [eda, bigquery, launcher]
uses_functions:
- load_bq_table_to_duckdb_py_datascience
- profile_table_py_pipelines
uses_types: []
returns: []
returns_optional: false
error_type: error_go_core
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/profile_bq_table.py"
params:
- name: table_fqn
desc: "FQN de la tabla/vista BigQuery: `project.dataset.table`."
- name: sample_frac
desc: "None (DEFAULT) = FULL, perfila TODAS las filas del origen. Un float en (0,1) activa el muestreo opt-in (`WHERE rand() < frac`, ~frac del total)."
- name: max_rows
desc: "Tope duro opcional de filas (LIMIT). 0 (DEFAULT) = sin tope. Se combina con sample_frac si ambos se pasan."
- name: pseudonymize_cols
desc: "Columnas PII a seudonimizar (hash) antes de materializar (LOPDGDD/RGPD). Preserva nulos y cardinalidad."
- name: run_models
desc: "PCA/KMeans/IsolationForest/normalidad sobre numericas. Default True (informe AutomaticEDA completo)."
- name: run_series
desc: "Analisis de serie temporal por columna numerica. Default False."
- name: run_llm
desc: "1 llamada LLM sobre el perfil agregado (nunca filas crudas). Default False."
- name: project_id
desc: "Proyecto GCP de facturacion. Vacio = primer segmento del FQN."
- name: report_dir
desc: "Directorio de salida de los reports. Default 'reports' (artefacto local gitignored)."
- name: duckdb_path
desc: "Ruta DuckDB a usar. Vacio = temporal autogestionado."
- name: keep_duckdb
desc: "Si True conserva el DuckDB materializado (para el notebook Jupyter). Default False."
output: "dict dict-no-throw. En exito {status:'ok', table_fqn, load:{n_rows_source,n_rows_fetched,sampled,sample_frac,pseudonymized,table}, duckdb_path, report_md_path, report_json_path, aeda_pdf_path, aeda_pptx_path, aeda_manifest_path, profile}. En error {status:'error', error, stage}."
---
## Ejemplo
```python
from pipelines.profile_bq_table import profile_bq_table
# FULL por defecto: EDA sobre TODAS las filas de la vista (3,8M).
r = profile_bq_table(
"autingo-159109.customer_marts.customer_profile",
pseudonymize_cols=["document_number", "full_name", "email", "phone", "postal_code", "salesforce_customer_id"],
run_models=True,
)
print(r["load"]["n_rows_fetched"], "filas perfiladas, sampled=", r["load"]["sampled"])
print(r["aeda_pdf_path"]); print(r["aeda_pptx_path"]); print(r["report_md_path"])
# Muestreo opt-in: EDA sobre ~5 % de las filas (tabla enorme / iteracion rapida).
r = profile_bq_table(
"autingo-159109.customer_marts.customer_profile",
sample_frac=0.05,
pseudonymize_cols=["document_number", "full_name", "email", "phone", "postal_code", "salesforce_customer_id"],
)
```
## Cuando usarla
Cuando pidan un EDA de una tabla o vista de BigQuery ("hazme un EDA de esta
tabla BigQuery"). Es el adaptador BigQuery del grupo de capacidad `eda` por
composicion: trae el origen COMPLETO (todas las filas, por defecto) a un DuckDB
local y delega todo el perfilado y render en `profile_table`, sin adaptador
BigQuery nativo ni logica de EDA duplicada. Usala como primer paso al recibir un
dataset BigQuery desconocido, antes de modelar o limpiar, o para auditar la
calidad de una vista ya productiva. Para iteracion rapida o tablas que no quepan
en RAM, pasa `sample_frac` (muestreo opt-in).
## Gotchas
- Impura: requiere ADC de BigQuery configurado (Application Default Credentials)
para que `load_bq_table_to_duckdb` autentique contra el proyecto.
- FULL por defecto: `sample_frac=None` perfila TODAS las filas del origen. Una
vista de millones de filas se trae entera a RAM (varios GB posibles) antes de
materializar en DuckDB; el fetch usa el BigQuery Storage Read API (Arrow) cuando
esta disponible, mucho mas rapido que REST. Para acotar coste/memoria, pasa
`sample_frac` in (0,1) (muestreo opt-in) o `max_rows` (tope duro). Si por limite
de recursos no cabe el total, dilo explicito con el maximo que si se cargo.
- Seudonimiza PII con `pseudonymize_cols` para cumplir LOPDGDD/RGPD ANTES de
escribir a disco: nombres, DNI/NIE, email, telefono, direccion, IDs de cliente,
etc. Se hashean preservando nulos y cardinalidad. Sin seudonimizar, la muestra
materializada (DuckDB + reports) contiene datos personales reales [POL-MMNSEG-001-1.0].
- El DuckDB temporal se borra al terminar salvo `keep_duckdb=True` (pasalo para
seguir explorando la muestra desde un notebook Jupyter). Si pasas `duckdb_path`
explicito, la ruta se respeta y solo se conserva con `keep_duckdb=True`.
- Escribe reports a `report_dir` (default 'reports', artefacto local gitignored):
Markdown + JSON sidecar + PDF A5 movil + PPTX 16:9 del informe AutomaticEDA.
- `run_llm=True` gasta tokens (haiku) pero solo envia el perfil agregado, nunca
filas crudas ni datos personales.
## Capability growth log
- v1.1.0 (2026-07-01) — FULL pasa a ser el DEFAULT del pipeline: se sustituye `max_rows=300000, sample=True` por `sample_frac=None` (None = perfila todas las filas) + `max_rows=0` (tope duro opcional). El muestreo es opt-in explicito (`sample_frac`). Alinea con la preferencia estandar del usuario: los EDA se corren sobre el total salvo que se pida lo contrario. Hereda el fetch acelerado (Arrow/bqstorage) de `load_bq_table_to_duckdb` v1.1.0.
@@ -0,0 +1,138 @@
"""profile_bq_table — EDA one-shot de una tabla/vista BigQuery con el grupo `eda`.
Pipeline impuro: materializa una tabla o vista de BigQuery (por defecto COMPLETA —
todas las filas — o una muestra si se pasa `sample_frac`, con seudonimizacion PII
opcional, LOPDGDD/RGPD) a un DuckDB local con `load_bq_table_to_duckdb`, y la
perfila end-to-end con `profile_table` del grupo de capacidad `eda`, emitiendo el
informe AutomaticEDA (PDF A5 movil + PPTX 16:9), Markdown y JSON sidecar. Es el
adaptador BigQuery que faltaba en el grupo `eda`, resuelto por composicion
(BigQuery -> DuckDB local -> profile_table) sin duplicar la logica de perfilado ni
de render.
Modo por defecto = FULL: `sample_frac=None` perfila TODAS las filas del origen
(preferencia estandar del usuario: los EDA se corren sobre el total salvo que se
pida lo contrario). El muestreo es opt-in explicito: `sample_frac=0.05` perfila
~5 % de las filas; `max_rows` es un tope duro opcional (0 = sin tope).
Funciones del registry compuestas (NO se reimplementa su logica):
- load_bq_table_to_duckdb : trae la tabla/vista BigQuery a un DuckDB local
(completa por defecto, o muestra si sample_frac).
- profile_table : orquestador one-shot del grupo `eda` que perfila la
DuckDB materializada y emite el informe AutomaticEDA.
Estilo dict-no-throw del grupo `eda`: nunca lanza; devuelve {status:'error', ...}.
"""
import os
import tempfile
from datascience import load_bq_table_to_duckdb
from pipelines.profile_table import profile_table
def profile_bq_table(
table_fqn: str,
sample_frac: float = None,
max_rows: int = 0,
pseudonymize_cols: list = None,
run_models: bool = True,
run_series: bool = False,
run_llm: bool = False,
project_id: str = "",
report_dir: str = "reports",
duckdb_path: str = "",
keep_duckdb: bool = False,
) -> dict:
"""EDA one-shot de una tabla/vista BigQuery.
Por defecto perfila TODAS las filas del origen (`sample_frac=None`, modo FULL).
Materializa el origen (con seudonimizacion PII opcional) a un DuckDB local y lo
perfila con `profile_table` del grupo `eda`, emitiendo el informe AutomaticEDA
(PDF A5 movil + PPTX 16:9) + Markdown + JSON sidecar.
Args:
table_fqn: FQN de la tabla/vista BigQuery ("project.dataset.table").
sample_frac: None (default) = FULL, perfila todas las filas. Un float en
(0,1) activa el muestreo opt-in (`WHERE rand() < frac`, ~frac del total).
max_rows: Tope duro opcional de filas (LIMIT). 0 (default) = sin tope.
pseudonymize_cols: Columnas PII a seudonimizar (hash) antes de materializar.
run_models: Modelos baratos (PCA/KMeans/IsolationForest/normalidad).
run_series: Analisis de serie temporal por columna numerica.
run_llm: 1 llamada LLM sobre el perfil agregado (nunca filas crudas).
project_id: Proyecto GCP de facturacion. Vacio = primer segmento del FQN.
report_dir: Directorio de salida de los reports.
duckdb_path: Ruta DuckDB a usar. Vacio = temporal autogestionado.
keep_duckdb: Si True conserva el DuckDB materializado.
Returns:
dict dict-no-throw con el resultado del pipeline (ver output del .md).
"""
tmp_created = False
try:
# DuckDB temporal si no se pasa ruta.
if not duckdb_path:
fd, duckdb_path = tempfile.mkstemp(prefix="eda_bq_", suffix=".duckdb")
os.close(fd)
os.remove(duckdb_path) # que lo cree DuckDB limpio
tmp_created = True
load = load_bq_table_to_duckdb(
table_fqn,
duckdb_path,
sample_frac=sample_frac,
max_rows=max_rows,
project_id=project_id,
pseudonymize_cols=pseudonymize_cols,
)
if load.get("status") != "ok":
return {
"status": "error",
"error": load.get("error", "load fallo"),
"stage": "load",
}
prof = profile_table(
duckdb_path,
load["table"],
backend="duckdb",
run_models=run_models,
run_series=run_series,
run_llm=run_llm,
emit_automatic=True, # PDF A5 movil + PPTX 16:9
emit_pdf=False,
write_report=True, # Markdown + JSON sidecar
report_dir=report_dir,
)
if prof.get("status") != "ok":
return {
"status": "error",
"error": prof.get("error", "profile fallo"),
"stage": "profile",
"load": load,
}
return {
"status": "ok",
"table_fqn": table_fqn,
"load": {
k: load[k]
for k in ("n_rows_source", "n_rows_fetched", "sampled", "sample_frac", "pseudonymized", "table")
if k in load
},
"duckdb_path": duckdb_path if keep_duckdb else None,
"report_md_path": prof.get("report_md_path"),
"report_json_path": prof.get("report_json_path"),
"aeda_pdf_path": prof.get("aeda_pdf_path"),
"aeda_pptx_path": prof.get("aeda_pptx_path"),
"aeda_manifest_path": prof.get("aeda_manifest_path"),
"profile": prof.get("profile"),
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
finally:
# Limpia el DuckDB temporal salvo que se pida conservarlo.
if tmp_created and not keep_duckdb and duckdb_path and os.path.exists(duckdb_path):
try:
os.remove(duckdb_path)
except OSError:
pass
Binary file not shown.
+7
View File
@@ -0,0 +1,7 @@
import google.auth
from google.cloud import bigquery
_creds, _ = google.auth.default(scopes=['https://www.googleapis.com/auth/bigquery'])
_creds = _creds.with_quota_project(None)
client = bigquery.Client(project='autingo-159109', location='europe-west1', credentials=_creds)
def q(sql):
return client.query(sql).result().to_dataframe()
+1
View File
@@ -0,0 +1 @@
{"c1": 12363, "c2": 12364, "c3": 12365}
+61
View File
@@ -0,0 +1,61 @@
ensena,year,mes,diego,bq_neto,match
Aurgi,2023,feb,80.52,,
Aurgi,2023,mar,89.94,,
Aurgi,2023,abr,76.87,,
Aurgi,2023,may,87.95,,
Aurgi,2023,jun,97.84,,
Aurgi,2023,jul,138.24,,
Aurgi,2023,ago,89.7,,
Aurgi,2023,sep,61.53,,
Aurgi,2023,oct,56.48,,
Aurgi,2023,nov,73.2,,
Aurgi,2023,dic,78.81,,
Aurgi,2024,ene,75.34,75.35,100.0
Aurgi,2024,feb,60.21,60.21,100.0
Aurgi,2024,mar,70.62,71.26,99.1
Aurgi,2024,abr,70.46,70.46,100.0
Aurgi,2024,may,84.76,84.76,100.0
Aurgi,2024,jun,108.7,108.7,100.0
Aurgi,2024,jul,141.2,141.2,100.0
Aurgi,2024,ago,100.18,100.18,100.0
Aurgi,2024,sep,67.91,67.91,100.0
Aurgi,2024,oct,81.31,81.31,100.0
Aurgi,2024,nov,71.57,71.57,100.0
Aurgi,2024,dic,74.33,74.33,100.0
Aurgi,2025,ene,86.28,86.28,100.0
Aurgi,2025,feb,53.05,53.05,100.0
Aurgi,2025,mar,86.75,86.75,100.0
Aurgi,2025,abr,83.89,83.89,100.0
Aurgi,2025,may,84.24,84.24,100.0
Aurgi,2025,jun,134.46,134.46,100.0
Aurgi,2025,jul,101.17,174.32,58.0
MT,2023,feb,30.19,,
MT,2023,mar,41.89,,
MT,2023,abr,36.16,,
MT,2023,may,42.01,,
MT,2023,jun,44.24,,
MT,2023,jul,63.61,,
MT,2023,ago,40.7,,
MT,2023,sep,28.6,,
MT,2023,oct,28.79,,
MT,2023,nov,30.3,,
MT,2023,dic,35.21,,
MT,2024,ene,38.13,38.13,100.0
MT,2024,feb,32.44,32.44,100.0
MT,2024,mar,35.17,35.18,100.0
MT,2024,abr,35.38,35.38,100.0
MT,2024,may,37.58,37.58,100.0
MT,2024,jun,44.54,44.54,100.0
MT,2024,jul,58.92,58.92,100.0
MT,2024,ago,40.97,40.98,100.0
MT,2024,sep,35.03,35.03,100.0
MT,2024,oct,38.86,38.86,100.0
MT,2024,nov,36.48,36.48,100.0
MT,2024,dic,40.52,40.52,100.0
MT,2025,ene,39.16,39.16,100.0
MT,2025,feb,28.16,28.16,100.0
MT,2025,mar,42.26,42.26,100.0
MT,2025,abr,44.04,44.04,100.0
MT,2025,may,52.71,52.71,100.0
MT,2025,jun,63.54,63.54,100.0
MT,2025,jul,49.47,84.94,58.2
1 ensena year mes diego bq_neto match
2 Aurgi 2023 feb 80.52
3 Aurgi 2023 mar 89.94
4 Aurgi 2023 abr 76.87
5 Aurgi 2023 may 87.95
6 Aurgi 2023 jun 97.84
7 Aurgi 2023 jul 138.24
8 Aurgi 2023 ago 89.7
9 Aurgi 2023 sep 61.53
10 Aurgi 2023 oct 56.48
11 Aurgi 2023 nov 73.2
12 Aurgi 2023 dic 78.81
13 Aurgi 2024 ene 75.34 75.35 100.0
14 Aurgi 2024 feb 60.21 60.21 100.0
15 Aurgi 2024 mar 70.62 71.26 99.1
16 Aurgi 2024 abr 70.46 70.46 100.0
17 Aurgi 2024 may 84.76 84.76 100.0
18 Aurgi 2024 jun 108.7 108.7 100.0
19 Aurgi 2024 jul 141.2 141.2 100.0
20 Aurgi 2024 ago 100.18 100.18 100.0
21 Aurgi 2024 sep 67.91 67.91 100.0
22 Aurgi 2024 oct 81.31 81.31 100.0
23 Aurgi 2024 nov 71.57 71.57 100.0
24 Aurgi 2024 dic 74.33 74.33 100.0
25 Aurgi 2025 ene 86.28 86.28 100.0
26 Aurgi 2025 feb 53.05 53.05 100.0
27 Aurgi 2025 mar 86.75 86.75 100.0
28 Aurgi 2025 abr 83.89 83.89 100.0
29 Aurgi 2025 may 84.24 84.24 100.0
30 Aurgi 2025 jun 134.46 134.46 100.0
31 Aurgi 2025 jul 101.17 174.32 58.0
32 MT 2023 feb 30.19
33 MT 2023 mar 41.89
34 MT 2023 abr 36.16
35 MT 2023 may 42.01
36 MT 2023 jun 44.24
37 MT 2023 jul 63.61
38 MT 2023 ago 40.7
39 MT 2023 sep 28.6
40 MT 2023 oct 28.79
41 MT 2023 nov 30.3
42 MT 2023 dic 35.21
43 MT 2024 ene 38.13 38.13 100.0
44 MT 2024 feb 32.44 32.44 100.0
45 MT 2024 mar 35.17 35.18 100.0
46 MT 2024 abr 35.38 35.38 100.0
47 MT 2024 may 37.58 37.58 100.0
48 MT 2024 jun 44.54 44.54 100.0
49 MT 2024 jul 58.92 58.92 100.0
50 MT 2024 ago 40.97 40.98 100.0
51 MT 2024 sep 35.03 35.03 100.0
52 MT 2024 oct 38.86 38.86 100.0
53 MT 2024 nov 36.48 36.48 100.0
54 MT 2024 dic 40.52 40.52 100.0
55 MT 2025 ene 39.16 39.16 100.0
56 MT 2025 feb 28.16 28.16 100.0
57 MT 2025 mar 42.26 42.26 100.0
58 MT 2025 abr 44.04 44.04 100.0
59 MT 2025 may 52.71 52.71 100.0
60 MT 2025 jun 63.54 63.54 100.0
61 MT 2025 jul 49.47 84.94 58.2
+1
View File
@@ -0,0 +1 @@
https://reports.autingo.es/dashboard/1142
Binary file not shown.
+60
View File
@@ -0,0 +1,60 @@
STRUCT(DATE(2023,2,1) AS mes, 80.515 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,3,1) AS mes, 89.936 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,4,1) AS mes, 76.866 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,5,1) AS mes, 87.952 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,6,1) AS mes, 97.84 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,7,1) AS mes, 138.24 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,8,1) AS mes, 89.7 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,9,1) AS mes, 61.53 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,10,1) AS mes, 56.48 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,11,1) AS mes, 73.2 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,12,1) AS mes, 78.81 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,1,1) AS mes, 75.345 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,2,1) AS mes, 60.211 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,3,1) AS mes, 70.62 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,4,1) AS mes, 70.456 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,5,1) AS mes, 84.759 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,6,1) AS mes, 108.702 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,7,1) AS mes, 141.204 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,8,1) AS mes, 100.181 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,9,1) AS mes, 67.91 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,10,1) AS mes, 81.307 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,11,1) AS mes, 71.569 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2024,12,1) AS mes, 74.329 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2025,1,1) AS mes, 86.277 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2025,2,1) AS mes, 53.054 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2025,3,1) AS mes, 86.749 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2025,4,1) AS mes, 83.888 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2025,5,1) AS mes, 84.24 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2025,6,1) AS mes, 134.464 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2025,7,1) AS mes, 101.168 AS diego_neto_k, 1 AS company_id),
STRUCT(DATE(2023,2,1) AS mes, 30.189 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,3,1) AS mes, 41.89 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,4,1) AS mes, 36.16 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,5,1) AS mes, 42.011 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,6,1) AS mes, 44.24 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,7,1) AS mes, 63.61 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,8,1) AS mes, 40.7 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,9,1) AS mes, 28.6 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,10,1) AS mes, 28.79 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,11,1) AS mes, 30.3 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2023,12,1) AS mes, 35.207 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,1,1) AS mes, 38.132 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,2,1) AS mes, 32.438 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,3,1) AS mes, 35.174 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,4,1) AS mes, 35.382 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,5,1) AS mes, 37.584 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,6,1) AS mes, 44.54 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,7,1) AS mes, 58.921 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,8,1) AS mes, 40.974 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,9,1) AS mes, 35.029 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,10,1) AS mes, 38.861 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,11,1) AS mes, 36.48 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2024,12,1) AS mes, 40.522 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2025,1,1) AS mes, 39.161 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2025,2,1) AS mes, 28.16 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2025,3,1) AS mes, 42.263 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2025,4,1) AS mes, 44.04 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2025,5,1) AS mes, 52.71 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2025,6,1) AS mes, 63.544 AS diego_neto_k, 2 AS company_id),
STRUCT(DATE(2025,7,1) AS mes, 49.469 AS diego_neto_k, 2 AS company_id)
+8
View File
@@ -0,0 +1,8 @@
import sys, json
from google.cloud import bigquery
import google.auth
creds=google.auth.default(scopes=['https://www.googleapis.com/auth/bigquery'])[0].with_quota_project(None)
c=bigquery.Client(project='autingo-159109', location='europe-west1', credentials=creds)
sql=sys.stdin.read()
for r in c.query(sql).result():
print(json.dumps(dict(r), default=str))
+152
View File
@@ -0,0 +1,152 @@
import json, os, urllib.request, sys
MB = os.environ["MB"]; KEY = os.environ["KEY"]
def api(method, path, body=None, timeout=180):
data = json.dumps(body).encode() if body is not None else None
req = urllib.request.Request(MB + path, data=data, method=method,
headers={"X-API-KEY": KEY, "Content-Type": "application/json"})
try:
return json.load(urllib.request.urlopen(req, timeout=timeout))
except urllib.error.HTTPError as e:
print(f"HTTP {e.code} on {method} {path}:", e.read().decode()[:1200]); raise
# Bridge documento -> service_request (canal + charged), tal cual 1094 card 11751.
BASE = r"""
WITH vf AS (
SELECT document_id, LOGICAL_OR(is_pw) is_pw FROM (
SELECT CAST(document_id AS STRING) document_id, ANY_VALUE(is_precaweb) is_pw
FROM `autingo-159109.anjana_bi_datamart.VENTAS_aurgi` GROUP BY 1
UNION ALL
SELECT CAST(document_id AS STRING), ANY_VALUE(is_precaweb)
FROM `autingo-159109.anjana_bi_datamart.VENTAS_Motortown` GROUP BY 1
) GROUP BY 1
),
lineas AS (
SELECT
CAST(s.numeroDocumento AS STRING) AS numdoc,
CAST(s.idCentro AS STRING) AS idcentro,
DATE(s.Fecha) AS fecha,
s.Base_imponible_linea AS bil
FROM {{#4494}} s
WHERE DATE(s.Fecha) >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
[[AND DATE(s.Fecha) >= {{fecha_desde}}]]
[[AND DATE(s.Fecha) <= {{fecha_hasta}}]]
),
web AS (
SELECT l.numdoc, l.fecha, l.bil, oc.name AS centro, oc.Companies__name AS ambito
FROM lineas l
LEFT JOIN vf ON l.numdoc = vf.document_id
LEFT JOIN `autingo-159109.rag_datasets.Objeto_Centros` oc
ON l.idcentro = CAST(oc.nav_id AS STRING)
WHERE (COALESCE(vf.is_pw, FALSE) OR oc.name IN ('Aurgi Web','MT Web'))
AND (oc.Companies__name IS NULL OR oc.Companies__name NOT IN ('Aurgi Glass','MotorTown Glass'))
[[AND oc.name IN ({{centro}})]]
[[AND oc.Companies__name IN ({{ensena}})]]
),
sr_link AS (
SELECT CAST(inv.nav_id AS STRING) numdoc, CAST(j.service_request_id AS STRING) sr_id
FROM `autingo-159109.psql_dcpublic.tpv_orders_invoice` inv
JOIN `autingo-159109.psql_dcpublic.tpv_precawebs_servicerequestjob` j ON j.order_id = inv.order_id
WHERE inv.nav_id IS NOT NULL
UNION DISTINCT
SELECT CAST(invoice_number AS STRING), CAST(service_request_id AS STRING)
FROM `autingo-159109.psql_dcpublic.logistic_orders`
WHERE invoice_number IS NOT NULL AND invoice_number != ''
),
sr_link1 AS (SELECT numdoc, MIN(sr_id) sr_id FROM sr_link GROUP BY 1),
sr AS (
SELECT CAST(id AS STRING) sr_id, channel_id, charged
FROM `autingo-159109.psql_dcpublic.service_requests`
),
doc AS (
SELECT
w.numdoc,
ANY_VALUE(w.fecha) AS fecha,
SUM(w.bil) AS venta,
ANY_VALUE(sl.sr_id) AS sr_id,
ANY_VALUE(sr.channel_id) AS channel_id,
ANY_VALUE(sr.charged) AS charged
FROM web w
LEFT JOIN sr_link1 sl USING (numdoc)
LEFT JOIN sr ON sr.sr_id = sl.sr_id
GROUP BY w.numdoc
),
fin AS (
SELECT
numdoc, fecha, venta,
CASE WHEN sr_id IS NULL THEN 'Sin solicitud'
WHEN channel_id = 1 THEN 'aurgi.com'
WHEN channel_id = 2 THEN 'motortown.es'
WHEN channel_id = 3 THEN 'Autingo'
WHEN channel_id IN (11,13,14,15,6,8) THEN 'Marketplaces'
WHEN channel_id = 10 THEN 'Talleres Digitales'
ELSE 'Otros' END AS canal,
CASE WHEN sr_id IS NULL THEN 'Sin solicitud'
WHEN charged THEN 'Pago web'
ELSE 'Pago tienda' END AS forma_pago
FROM doc
)
"""
CARDS = {
"total": {
"name": "Venta web total (facturacion NAV / modelo 4494)",
"sql": BASE + "SELECT ROUND(SUM(venta),0) AS venta_web_eur, COUNT(DISTINCT numdoc) AS documentos FROM fin",
"display": "scalar",
},
"canal": {
"name": "Venta web por canal",
"sql": BASE + "SELECT canal, ROUND(SUM(venta),0) AS venta_eur, COUNT(DISTINCT numdoc) AS documentos FROM fin GROUP BY canal ORDER BY venta_eur DESC",
"display": "bar",
},
"pago": {
"name": "Venta web por forma de pago",
"sql": BASE + "SELECT forma_pago, ROUND(SUM(venta),0) AS venta_eur, COUNT(DISTINCT numdoc) AS documentos FROM fin GROUP BY forma_pago ORDER BY venta_eur DESC",
"display": "row",
},
"matriz": {
"name": "Venta web: matriz canal x forma de pago",
"sql": BASE + "SELECT canal, forma_pago, ROUND(SUM(venta),0) AS venta_eur, COUNT(DISTINCT numdoc) AS documentos FROM fin GROUP BY canal, forma_pago ORDER BY venta_eur DESC",
"display": "table",
},
"evolutivo": {
"name": "Venta web mensual por canal",
"sql": BASE + "SELECT DATE_TRUNC(fecha, MONTH) AS mes, canal, ROUND(SUM(venta),0) AS venta_eur FROM fin GROUP BY mes, canal ORDER BY mes, venta_eur DESC",
"display": "bar",
},
}
TAGS = {
"#4494": {"type":"card","name":"#4494","id":"card__4494","display-name":"#4494","card-id":4494},
"fecha_desde": {"type":"date","name":"fecha_desde","id":"tag-fecha-desde","display-name":"Fecha desde"},
"fecha_hasta": {"type":"date","name":"fecha_hasta","id":"tag-fecha-hasta","display-name":"Fecha hasta"},
"centro": {"type":"text","name":"centro","id":"tag-centro","display-name":"Centro"},
"ensena": {"type":"text","name":"ensena","id":"tag-ensena","display-name":"Ensena"},
}
def dq(sql):
return {"type":"native","database":6,"native":{"query":sql,"template-tags":TAGS}}
def test_query(sql, params=None):
body = dq(sql)
body["parameters"] = params or []
r = api("POST", "/api/dataset", body)
if r.get("error"):
print("QUERY ERROR:", r.get("error")); return None
cols = [c["name"] for c in r["data"]["cols"]]
rows = r["data"]["rows"]
return cols, rows
if __name__ == "__main__":
which = sys.argv[1] if len(sys.argv) > 1 else "all"
# param YTD 2026 para verificar reconciliacion
p_ytd = [{"type":"date/single","value":"2026-01-01","target":["variable",["template-tag","fecha_desde"]]}]
for k, c in CARDS.items():
if which != "all" and which != k: continue
print(f"\n===== TEST {k}: {c['name']} =====")
res = test_query(c["sql"], p_ytd)
if res:
cols, rows = res
print("cols:", cols)
for row in rows[:15]: print(" ", row)
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+1
View File
@@ -0,0 +1 @@
{"total": 12367, "canal": 12368, "pago": 12369, "matriz": 12370, "evolutivo": 12371}
+42
View File
@@ -0,0 +1,42 @@
import json, sys
sys.path.insert(0, "scratchpad/exf")
from build import api, BASE, CARDS, TAGS, dq
COLLECTION = 583 # "Claude" (junto a 1094)
CUR = {"number_style":"currency","currency":"EUR","currency_style":"symbol","decimals":0}
def viz(kind):
if kind == "total":
return {"column_settings":{'["name","venta_web_eur"]':CUR},
"scalar.field":"venta_web_eur"}
if kind == "canal":
return {"graph.dimensions":["canal"],"graph.metrics":["venta_eur"],
"graph.x_axis.title_text":"Canal","graph.y_axis.title_text":"Venta web (EUR)",
"column_settings":{'["name","venta_eur"]':CUR},"graph.show_values":True}
if kind == "pago":
return {"graph.dimensions":["forma_pago"],"graph.metrics":["venta_eur"],
"column_settings":{'["name","venta_eur"]':CUR},"graph.show_values":True}
if kind == "matriz":
return {"column_settings":{'["name","venta_eur"]':CUR},
"table.columns":[
{"name":"canal","enabled":True},{"name":"forma_pago","enabled":True},
{"name":"venta_eur","enabled":True},{"name":"documentos","enabled":True}]}
if kind == "evolutivo":
return {"graph.dimensions":["mes","canal"],"graph.metrics":["venta_eur"],
"stackable.stack_type":"stacked","column_settings":{'["name","venta_eur"]':CUR},
"graph.x_axis.title_text":"Mes","graph.y_axis.title_text":"Venta web (EUR)"}
return {}
created = {}
for k, c in CARDS.items():
body = {"name": c["name"], "display": c["display"],
"dataset_query": dq(c["sql"]),
"visualization_settings": viz(k),
"collection_id": COLLECTION}
r = api("POST", "/api/card", body)
created[k] = r["id"]
print(f"card {k}: id {r['id']} {c['name']}")
json.dump(created, open("scratchpad/exf/cards.json","w"))
print("CARDS:", created)
+1
View File
@@ -0,0 +1 @@
{"dashboard_id": 1143}
+54
View File
@@ -0,0 +1,54 @@
import json, sys
sys.path.insert(0, "scratchpad/exf")
from build import api
C = json.load(open("scratchpad/exf/cards.json"))
COLLECTION = 583
# 1) crear dashboard vacio
dash = api("POST", "/api/dashboard", {
"name": "Venta Web por Canal y Forma de Pago (facturacion NAV / modelo 4494)",
"collection_id": COLLECTION,
"description": "Solo venta web (origen precaweb) tomada del modelo 4494 (SUM Base_imponible_linea, facturacion NAV neta), desglosada por canal (channel_id) y forma de pago (pago web vs pago tienda), segun las convenciones del dashboard 1094. Glass excluido. Default: YTD 2026.",
})
DID = dash["id"]
print("dashboard id:", DID)
# 2) parametros del dashboard
PARAMS = [
{"id":"p_desde","name":"Fecha desde","slug":"fecha_desde","type":"date/single","default":"2026-01-01"},
{"id":"p_hasta","name":"Fecha hasta","slug":"fecha_hasta","type":"date/single"},
{"id":"p_centro","name":"Centro","slug":"centro","type":"string/=","sectionId":"string"},
{"id":"p_ensena","name":"Ensena","slug":"ensena","type":"string/=","sectionId":"string"},
]
def mappings(cid):
return [
{"parameter_id":"p_desde","card_id":cid,"target":["variable",["template-tag","fecha_desde"]]},
{"parameter_id":"p_hasta","card_id":cid,"target":["variable",["template-tag","fecha_hasta"]]},
{"parameter_id":"p_centro","card_id":cid,"target":["variable",["template-tag","centro"]]},
{"parameter_id":"p_ensena","card_id":cid,"target":["variable",["template-tag","ensena"]]},
]
# 3) layout (grid 24 col)
LAYOUT = {
"total": (0, 0, 6, 4),
"pago": (0, 6, 18, 4),
"canal": (4, 0, 12, 7),
"matriz": (4, 12, 12, 7),
"evolutivo": (11, 0, 24, 7),
}
dashcards = []
neg = -1
for k,(row,col,sx,sy) in LAYOUT.items():
cid = C[k]
dashcards.append({
"id": neg, "card_id": cid, "row": row, "col": col, "size_x": sx, "size_y": sy,
"series": [], "parameter_mappings": mappings(cid), "visualization_settings": {}
})
neg -= 1
r = api("PUT", f"/api/dashboard/{DID}", {"dashcards": dashcards, "parameters": PARAMS})
print("dashcards saved:", len(r.get("dashcards",[])))
print("URL: https://reports.autingo.es/dashboard/%d" % DID)
json.dump({"dashboard_id":DID}, open("scratchpad/exf/dash.json","w"))
+313
View File
@@ -0,0 +1,313 @@
"""Genera la carpeta de documentacion de linaje en el Escritorio de Windows.
A partir del grafo trazado (scratchpad/lineage_graph.json) escribe:
00_INDICE.txt resumen + mapa de capas + tabla de todos los objetos
01_marts/<vista>.txt una por vista de customer_marts: que es + arbol de linaje + SQL
02_intermedio_clientes_intel/*.txt tablas base del pipeline de inteligencia de clientes
03_producto/*.txt cadena de catalogo de producto (vistas con SQL + bases)
04_fuentes/*.txt tablas fuente (replica Postgres, Navision, imagenes, tasas)
Todos los .txt se escriben con CRLF para abrirse limpios en Bloc de notas de Windows.
"""
import json
import os
import textwrap
DEST = "/mnt/c/Users/egutierrez/Desktop/linaje_customer_marts"
DATA = json.load(open("scratchpad/lineage_graph.json"))
G = DATA["graph"]
PROJECT = DATA["project"]
# ---------------------------------------------------------------------------
# Descripciones ("que es") por objeto. La SQL/DDL incluida en cada archivo es la
# fuente de verdad; estas lineas son un resumen para orientar al lector.
# ---------------------------------------------------------------------------
DESC = {
# ---- customer_marts (marts finales, grano = persona_id / cliente) ----
"customer_marts.customer_profile":
"Ficha maestra 360 del cliente: identidad + features agregadas + score CLV + segmento. Vista de perfil que consolida todo lo demas.",
"customer_marts.customer_monetary":
"Metricas monetarias del cliente (gasto total, ticket medio, recencia/frecuencia/valor). Componente M del RFM.",
"customer_marts.customer_channel":
"Canal del cliente: canal preferido transaccional, mix aurgi/motortown/web/servicio, canal de entrada (canal8) y fuentes de origen.",
"customer_marts.customer_contactability":
"Contactabilidad del cliente: disponibilidad de email/telefono y consentimientos, a partir de la dimension persona + features + segmento.",
"customer_marts.customer_category_spend":
"Gasto del cliente desglosado por categoria de producto, a partir de la tabla de hechos de transaccion.",
"customer_marts.customer_brand_affinity":
"Afinidad de marca del cliente: que marcas compra y con que peso, cruzando transacciones con el catalogo de producto (Objeto_productos).",
"customer_marts.customer_product":
"Productos comprados por el cliente (detalle de que ha adquirido) desde la tabla de hechos de transaccion.",
"customer_marts.customer_store_spend":
"Gasto del cliente por centro/tienda desde la tabla de hechos de transaccion.",
"customer_marts.customer_temporal":
"Patrones temporales de compra del cliente (estacionalidad, recencia, frecuencia) desde transacciones + features.",
"customer_marts.customer_vehicles":
"Vehiculos asociados al cliente: dimension vehiculo + features de vehiculo + mapping N:N persona-vehiculo.",
"customer_marts.customer_payment_method":
"Metodo de pago del cliente reconstruido desde los pedidos TPV (orders/invoice/payment/payment_types).",
"customer_marts.customer_promo_usage":
"Uso de promociones/descuentos por el cliente (pedidos con descuento) desde transacciones + pedidos TPV + segmento.",
"customer_marts.customer_promo_tolerance":
"Tolerancia del cliente a promociones: respuesta a campanas + sensibilidad a descuentos en pedidos.",
"customer_marts.customer_predictive":
"Senales predictivas del cliente: score CLV, proxima mejor accion (recomendaciones) y segmento.",
# ---- clientes_intel (capa intermedia; tablas base del pipeline de inteligencia de clientes) ----
"clientes_intel.dim_persona":
"Dimension PERSONA: identidad de cliente consolidada (una fila por persona_id). Nucleo de la doble identidad persona+vehiculo.",
"clientes_intel.dim_vehiculo":
"Dimension VEHICULO: una fila por vehiculo (matricula/bastidor) con sus atributos.",
"clientes_intel.fact_transaccion":
"Tabla de HECHOS de transaccion: linea/venta por cliente. Base de casi todos los marts monetarios y de producto.",
"clientes_intel.fact_campana_respuesta":
"Tabla de HECHOS de respuesta a campanas de marketing (envio/apertura/conversion) por cliente.",
"clientes_intel.feat_cliente_persona":
"Features agregadas a nivel PERSONA (RFM, mix de canal, indicadores derivados). Alimenta perfil, monetary, channel, temporal, contactability.",
"clientes_intel.feat_cliente_vehiculo":
"Features agregadas a nivel VEHICULO. Alimenta customer_vehicles.",
"clientes_intel.seg_cliente_360":
"Segmentacion 360 del cliente (segmentos de negocio / clusters). Alimenta perfil, channel, contactability, predictive, promo_usage.",
"clientes_intel.score_clv":
"Score de valor de vida del cliente (CLV). Alimenta perfil y predictive.",
"clientes_intel.reco_acciones":
"Recomendaciones / proxima mejor accion (NBA) por cliente. Alimenta customer_predictive.",
"clientes_intel.map_persona_canal8":
"Mapeo persona -> canal8 (canal de entrada). Puente para customer_channel.",
"clientes_intel.map_persona_fuente":
"Mapeo persona -> fuente(s) de origen (de que sistema/canal proviene el cliente). Puente para customer_channel.",
"clientes_intel.map_persona_vehiculo":
"Mapeo N:N persona <-> vehiculo. Puente para customer_vehicles.",
# ---- cadena de catalogo de producto ----
"anjana_bi_datamart.Objeto_productos":
"Vista maestra de PRODUCTO: catalogo Navision + categorias CGQ + imagenes + tasa/margen por material. Se usa para afinidad de marca.",
"anjana_bi_datamart.Cruce_16_07_cgq":
"Tabla de cruce de categorias CGQ (categoria/subcategoria/tipo) usada por Objeto_productos.",
"claude_bi.productos_tasa_mat":
"Tabla de tasa/margen por material de producto. La consume Objeto_productos.",
"external_datasets.product_object_images":
"Imagenes de producto (imagen principal/secundaria). Dataset externo. La consume Objeto_productos.",
"stg_anjana_bi.producto":
"Staging de producto: cruza item de Navision con equivalencias de matriculas (SAF). Capa de preparacion sobre las tablas de SQL Server.",
# ---- fuentes base ----
"psql_dcpublic.products":
"Catalogo de productos. Replica en BigQuery de la BBDD Postgres ANJANA (DCPublic).",
"psql_dcpublic.product_categories":
"Categorias de producto. Replica Postgres ANJANA (DCPublic).",
"psql_dcpublic.product_groups":
"Grupos de producto. Replica Postgres ANJANA (DCPublic).",
"psql_dcpublic.tpv_orders_order":
"Pedidos TPV (cabecera de pedido). Replica Postgres ANJANA (DCPublic).",
"psql_dcpublic.tpv_orders_orderitem":
"Lineas de pedido TPV. Replica Postgres ANJANA (DCPublic).",
"psql_dcpublic.tpv_orders_invoice":
"Facturas TPV. Replica Postgres ANJANA (DCPublic).",
"psql_dcpublic.tpv_orders_payment":
"Pagos de pedidos TPV. Replica Postgres ANJANA (DCPublic).",
"psql_dcpublic.tpv_payment_types":
"Tipos de pago TPV (catalogo). Replica Postgres ANJANA (DCPublic).",
"mssql2022_dbo.item":
"Catalogo de articulos de Navision (SQL Server 2022, esquema dbo).",
"mssql2022_dbo.equivalencias_matriculas_saf":
"Equivalencias de matriculas (SAF) en Navision (SQL Server 2022, esquema dbo).",
}
TYPE_ES = {
"VIEW": "VISTA (tiene SQL propio)",
"MATERIALIZED VIEW": "VISTA MATERIALIZADA (tiene SQL propio)",
"BASE TABLE": "TABLA BASE (datos materializados; sin SQL de definicion, solo esquema)",
"EXTERNAL": "TABLA EXTERNA",
"UNKNOWN": "DESCONOCIDO",
}
# Carpeta destino por objeto.
def folder_of(key: str) -> str:
ds = key.split(".", 1)[0]
if ds == "customer_marts":
return "01_marts"
if ds == "clientes_intel":
return "02_intermedio_clientes_intel"
if ds in ("anjana_bi_datamart", "claude_bi", "external_datasets", "stg_anjana_bi"):
return "03_producto"
return "04_fuentes"
def fname_of(key: str) -> str:
return key.replace(".", "__") + ".txt"
def relpath_of(key: str) -> str:
return f"{folder_of(key)}/{fname_of(key)}"
def desc_of(key: str) -> str:
return DESC.get(key, "(sin descripcion)")
# ---------------------------------------------------------------------------
# Arbol de linaje recursivo (para los marts).
# ---------------------------------------------------------------------------
def render_tree(key: str, prefix: str | None = None, is_last: bool = True, seen=None) -> list[str]:
if seen is None:
seen = set()
tag = {"VIEW": "[vista]", "MATERIALIZED VIEW": "[vista mat]",
"BASE TABLE": "[TABLA BASE/FUENTE]", "EXTERNAL": "[externa]",
"UNKNOWN": "[?]"}.get(G.get(key, {"type": "UNKNOWN"})["type"], "")
if prefix is None: # raiz
lines = [f"{key} {tag}"]
child_prefix = ""
else:
connector = "└── " if is_last else "├── "
lines = [f"{prefix}{connector}{key} {tag}"]
child_prefix = prefix + (" " if is_last else "")
if key in seen:
lines[-1] += " (ya expandido arriba)"
return lines
seen.add(key)
refs = G.get(key, {"refs": []}).get("refs", [])
for i, r in enumerate(refs):
lines += render_tree(r, child_prefix, i == len(refs) - 1, seen)
return lines
# ---------------------------------------------------------------------------
# Escritura.
# ---------------------------------------------------------------------------
def w(path: str, text: str):
full = os.path.join(DEST, path)
os.makedirs(os.path.dirname(full), exist_ok=True)
with open(full, "w", newline="\r\n", encoding="utf-8") as f:
f.write(text)
SEP = "=" * 78 + "\n"
def object_file(key: str, include_tree: bool) -> str:
node = G[key]
out = []
out.append(SEP)
out.append(f"OBJETO : {PROJECT}.{key}\n")
out.append(f"TIPO : {TYPE_ES.get(node['type'], node['type'])}\n")
out.append(f"DATASET: {key.split('.',1)[0]}\n")
out.append(SEP)
out.append("\nQUE ES\n------\n")
out.append(textwrap.fill(desc_of(key), width=78) + "\n")
if node.get("refs"):
out.append("\nDEPENDE DIRECTAMENTE DE\n-----------------------\n")
for r in node["refs"]:
out.append(f" - {PROJECT}.{r} -> ver {relpath_of(r)}\n")
if include_tree:
out.append("\nLINAJE COMPLETO (hasta la fuente)\n---------------------------------\n")
out.append("\n".join(render_tree(key)) + "\n")
out.append("\nSQL / DDL\n---------\n")
if node["type"] in ("VIEW", "MATERIALIZED VIEW"):
out.append("(Definicion de la vista. Este es el SQL que puedes copiar.)\n\n")
else:
out.append("(Tabla base: no tiene SQL de transformacion. Se incluye el CREATE TABLE\n"
" con el esquema de columnas para referencia.)\n\n")
out.append(node["ddl"].strip() + "\n")
return "".join(out)
# Marts: incluir arbol de linaje.
marts = sorted(k for k in G if k.startswith("customer_marts."))
for k in marts:
w(f"01_marts/{fname_of(k)}", object_file(k, include_tree=True))
# Resto de objetos: sin arbol (o arbol solo si es vista con dependencias).
for k in sorted(G):
if k.startswith("customer_marts."):
continue
include_tree = G[k]["type"] in ("VIEW", "MATERIALIZED VIEW") and bool(G[k].get("refs"))
w(relpath_of(k), object_file(k, include_tree=include_tree))
# ---------------------------------------------------------------------------
# INDICE.
# ---------------------------------------------------------------------------
idx = []
idx.append(SEP)
idx.append("INDICE - LINAJE DEL DATASET customer_marts\n")
idx.append(f"Proyecto BigQuery: {PROJECT}\n")
idx.append(SEP)
idx.append("""
QUE ES ESTA CARPETA
-------------------
Documenta, para cada tabla/vista del dataset `customer_marts`, de donde salen sus
datos: la cadena completa desde el mart final hasta las tablas fuente, con el SQL
de cada vista listo para copiar y compartir.
Cada objeto tiene su propio .txt con:
- QUE ES (resumen de una linea; la SQL es la fuente de verdad)
- DE QUE DEPENDE (dependencias directas, con la ruta a su archivo)
- LINAJE COMPLETO (arbol hasta la fuente) -- solo en los marts y vistas
- SQL / DDL (el codigo: definicion de la vista, o el esquema si es tabla base)
MAPA DE CAPAS
-------------
customer_marts (VISTAS finales, grano = cliente/persona_id)
|
v
clientes_intel (TABLAS BASE: capa intermedia construida por el pipeline de
| inteligencia de clientes -- dim_*, feat_*, seg_*, score_*,
| reco_*, fact_*, map_*)
v
Fuentes:
- psql_dcpublic.* Replica en BigQuery de la BBDD Postgres ANJANA (TPV + catalogo)
- anjana_bi_datamart / claude_bi / external_datasets / stg_anjana_bi
Cadena de catalogo de PRODUCTO (Objeto_productos y sus fuentes)
- mssql2022_dbo.* Navision (SQL Server 2022, esquema dbo)
NOTA: las tablas de `clientes_intel` son TABLAS BASE: no son vistas, sino tablas que
un pipeline reconstruye cada dia con sentencias CREATE TABLE AS SELECT (CTAS). Su
esquema esta en 02_intermedio_clientes_intel/. El SQL REAL que las construye (y que
baja hasta TPV / customers / users / Navision / Salesforce) esta en la carpeta
05_construccion_clientes_intel/ -- ver tambien 00b_FUENTES_DE_CLIENTE.txt.
""")
idx.append(SEP)
idx.append("CARPETAS\n")
idx.append(SEP)
idx.append("""
01_marts/ Las 14 vistas de customer_marts (con arbol de linaje)
02_intermedio_clientes_intel/ Las 12 tablas base intermedias (esquema)
03_producto/ Cadena de catalogo de producto (vistas + bases)
04_fuentes/ Tablas fuente (replica Postgres, Navision, imagenes, tasas)
05_construccion_clientes_intel/ El SQL (CTAS) que construye cada tabla de clientes_intel
00b_FUENTES_DE_CLIENTE.txt Que consulta lee cada fuente de cliente (TPV/customers/
users/Navision/Salesforce)
""")
def index_block(title, keys):
lines = [SEP, title + "\n", SEP, "\n"]
for k in keys:
t = {"VIEW": "vista", "MATERIALIZED VIEW": "vista_mat", "BASE TABLE": "tabla",
"EXTERNAL": "externa", "UNKNOWN": "?"}.get(G[k]["type"], "")
lines.append(f"[{t:9s}] {k}\n")
lines.append(f" {desc_of(k)}\n")
lines.append(f" archivo: {relpath_of(k)}\n\n")
return "".join(lines)
idx.append(index_block("1) MARTS FINALES (customer_marts)", marts))
idx.append(index_block("2) CAPA INTERMEDIA (clientes_intel)",
sorted(k for k in G if k.startswith("clientes_intel."))))
idx.append(index_block("3) CADENA DE PRODUCTO",
sorted(k for k in G if folder_of(k) == "03_producto")))
idx.append(index_block("4) FUENTES BASE",
sorted(k for k in G if folder_of(k) == "04_fuentes")))
w("00_INDICE.txt", "".join(idx))
# Conteo final
n_files = sum(len(files) for _, _, files in os.walk(DEST))
print(f"Escrito en: {DEST}")
print(f"Archivos .txt generados: {n_files}")
print("Estructura:")
for root, dirs, files in sorted(os.walk(DEST)):
rel = os.path.relpath(root, DEST)
if rel == ".":
for f in sorted(files):
print(f" {f}")
else:
print(f" {rel}/ ({len(files)} archivos)")
+164
View File
@@ -0,0 +1,164 @@
"""Genera 05_construccion_clientes_intel/ (SQL CTAS de cada tabla de clientes_intel)
y 00b_FUENTES_DE_CLIENTE.txt (mapa fuente-de-cliente -> consulta que la lee).
Fuente de datos: scratchpad/intel_build.json (SQL de construccion capturado de
INFORMATION_SCHEMA.JOBS) y scratchpad/intel_lineage.json (tablas implicadas).
"""
import json
import os
import textwrap
DEST = "/mnt/c/Users/egutierrez/Desktop/linaje_customer_marts"
PROJECT = "autingo-159109"
builds = json.load(open("scratchpad/intel_build.json"))
lin = json.load(open("scratchpad/intel_lineage.json"))
# Tablas para las que escribimos el SQL de construccion: las del linaje de customer_marts
# + las que leen fuentes de cliente/Salesforce.
EXTRA = ["seg_vega_persona", "fact_campana_respuesta__sfnew"]
want = sorted(set(lin["intel_involved"]) | set(EXTRA))
want = [t for t in want if t in builds] # solo las que tienen SQL capturado
DESC = {
"_persona_records":
"IDENTIDAD DEL CLIENTE (nucleo). UNION de 7 fuentes -> normaliza DNI/NIE/CIF, email y "
"telefono -> resuelve persona_id (FARM_FINGERPRINT de persona_key) con nivel de confianza. "
"AQUI es donde se juntan TPV customers, customers web, OTR, Navision, citaprevia, users y "
"Salesforce contacts_latest.",
"dim_persona":
"Dimension PERSONA final: una fila por persona_id, elegida desde _persona_records "
"(prioriza el mejor registro por fuente/confianza) + banderas de contacto.",
"dim_vehiculo":
"Dimension VEHICULO: una fila por vehiculo (matricula/bastidor) desde TPV vehicles, OTR, "
"citaprevia matriculas y calibrado de ano de matricula.",
"map_persona_fuente":
"Mapeo persona -> fuente(s) de origen (tpv/web/otr/navision/citaprevia/users/salesforce). "
"Registra de que sistemas proviene cada persona.",
"map_persona_vehiculo":
"Mapeo N:N persona <-> vehiculo (quien conduce/posee que coche) desde OTR, TPV vehicleowner "
"y citaprevia matriculas.",
"map_persona_canal8":
"Mapeo persona -> canal8 (canal de entrada del cliente).",
"fact_transaccion":
"Tabla de HECHOS de transaccion (linea/venta por persona). Base de los marts monetarios.",
"fact_visita":
"Tabla de HECHOS de visita (visitas del cliente al taller/tienda).",
"fact_campana_respuesta":
"HECHOS de respuesta a campanas: cruza envios/aperturas/clics/sms de Salesforce con personas.",
"fact_campana_respuesta__sfnew":
"Variante de fact_campana_respuesta con el esquema nuevo de Salesforce (email_sent/opened/clicked/sms).",
"feat_cliente_persona":
"Features agregadas por PERSONA (RFM, mix de canal, ticket medio, margen, recencia...).",
"feat_cliente_vehiculo":
"Features agregadas por VEHICULO.",
"seg_cliente_360":
"Segmentacion 360 del cliente (segmentos/clusters de negocio).",
"seg_vega_persona":
"Segmentacion VEGA por persona (contactabilidad/valor); lee fuentes de cliente para calcular "
"disponibilidad de contacto.",
"seg_cluster_persona":
"Clustering de personas (asignacion de cluster) que alimenta la segmentacion.",
"reco_acciones":
"Recomendaciones / proxima mejor accion (NBA) por cliente.",
"data_points_contacto":
"Puntos de dato de contacto (email/telefono) consolidados y calidad por persona.",
"_margen_rate_producto":
"Tasa de margen por producto (auxiliar para features monetarias).",
"_plate_year_calib":
"Calibrado del ano a partir de la matricula (auxiliar para dim_vehiculo).",
"dim_cp_provincia":
"Diccionario codigo postal -> provincia/CCAA.",
"tipologia_cliente":
"Tipologia de cliente (clasificacion de negocio).",
}
# Descripcion corta de cada fuente de cliente.
SRC_DESC = {
"psql_dcpublic.tpv_customers": "Clientes del TPV (mostrador). Replica Postgres ANJANA (DCPublic).",
"psql_dcpublic.customers": "Clientes web (e-commerce). Replica Postgres ANJANA (DCPublic).",
"psql_dcpublic.otr_customers": "Clientes de OTR (ordenes de reparacion/taller). Replica Postgres ANJANA.",
"psql_dcpublic.users": "Usuarios (cuentas). Replica Postgres ANJANA (DCPublic).",
"mssql2022_dbo.anjana_customer": "Cliente de NAVISION (SQL Server 2022, esquema dbo). Campos no_/e_mail/movil/name/post_code.",
"salesforce_ew1.contacts_latest": "Contactos de SALESFORCE (ultima version). Dataset en europe-west1.",
"salesforce_ew1.email_sent": "Envios de email de Salesforce (Marketing Cloud).",
"salesforce_ew1.email_opened": "Aperturas de email de Salesforce.",
"salesforce_ew1.email_clicked": "Clics de email de Salesforce.",
"salesforce_ew1.sms": "SMS de Salesforce.",
"citaprevia_aurphcp.clientes": "Clientes de CITA PREVIA (aurphcp).",
"citaprevia_aurphcp.clientes_matriculas": "Matriculas por cliente en cita previa.",
"psql_dcpublic.tpv_vehicles_vehicle": "Vehiculos del TPV. Replica Postgres ANJANA.",
"psql_dcpublic.tpv_vehicles_vehicleowner": "Propietarios de vehiculo del TPV (N:N). Replica Postgres ANJANA.",
}
CUST_SOURCES = list(SRC_DESC.keys())
SEP = "=" * 78 + "\n"
def w(path, text):
full = os.path.join(DEST, path)
os.makedirs(os.path.dirname(full), exist_ok=True)
with open(full, "w", newline="\r\n", encoding="utf-8") as f:
f.write(text)
def build_file(tbl):
b = builds[tbl]
out = [SEP, f"OBJETO : {PROJECT}.clientes_intel.{tbl}\n",
f"TIPO : TABLA BASE construida por {b['stmt']} (se reconstruye periodicamente)\n",
f"ULTIMA EJECUCION CAPTURADA: {b['last_run']}\n", SEP,
"\nQUE ES\n------\n",
textwrap.fill(DESC.get(tbl, "(sin descripcion)"), width=78) + "\n"]
if b["refs"]:
out.append("\nLEE DE (tablas fuente / intermedias)\n------------------------------------\n")
for r in b["refs"]:
note = " << FUENTE DE CLIENTE" if r in SRC_DESC else ""
out.append(f" - {PROJECT}.{r}{note}\n")
out.append("\nSQL DE CONSTRUCCION (copiable)\n------------------------------\n\n")
out.append(b["query"].strip() + "\n")
return "".join(out)
for t in want:
w(f"05_construccion_clientes_intel/{t}.txt", build_file(t))
# 00b_FUENTES_DE_CLIENTE.txt
f = [SEP, "FUENTES DE CLIENTE -> QUE CONSULTA DE clientes_intel LAS USA\n", SEP,
"\nResponde a: de donde salen los clientes (TPV, web, OTR, Navision, Salesforce, cita\n"
"previa) y en que consulta se juntan. El punto de union de identidades es\n"
"_persona_records (ver 05_construccion_clientes_intel/_persona_records.txt).\n\n"]
f.append(SEP + "RESUMEN: LO QUE PEDISTE\n" + SEP + "\n")
mapping = [
("TPV customers", "psql_dcpublic.tpv_customers"),
("customers (web)", "psql_dcpublic.customers"),
("customers (OTR / taller)", "psql_dcpublic.otr_customers"),
("users", "psql_dcpublic.users"),
("customer de NAVISION", "mssql2022_dbo.anjana_customer"),
("SALESFORCE (contactos)", "salesforce_ew1.contacts_latest"),
]
for label, src in mapping:
f.append(f" {label:26s} -> {PROJECT}.{src}\n")
f.append("\n SI: tenemos Salesforce. El dataset es `salesforce_ew1` (europe-west1):\n"
" contactos en contacts_latest; marketing en email_sent/opened/clicked y sms.\n\n")
for src in CUST_SOURCES:
consumers = sorted(t for t, b in builds.items() if src in b["refs"])
f.append(SEP)
f.append(f"{PROJECT}.{src}\n")
f.append(SEP)
f.append(f" {SRC_DESC[src]}\n")
f.append(" La leen estas tablas de clientes_intel (con su SQL en 05_construccion_...):\n")
if consumers:
for t in consumers:
star = " [SQL disponible]" if t in want else ""
f.append(f" - {t} ({builds[t]['stmt']}){star}\n")
else:
f.append(" (ninguna la referencia directamente)\n")
f.append("\n")
w("00b_FUENTES_DE_CLIENTE.txt", "".join(f))
print("Generado:")
print(f" 05_construccion_clientes_intel/ -> {len(want)} archivos con SQL de construccion")
print(f" 00b_FUENTES_DE_CLIENTE.txt")
print("\nTablas con SQL de construccion escrito:")
for t in want:
print(f" - {t}")
+126
View File
@@ -0,0 +1,126 @@
"""Genera 00c_VERIFICACION.txt (chequeo de completitud del linaje) y
06_otros_outputs_clientes_intel/ (SQL de las tablas de clientes_intel que NO acaban
en customer_marts, para no dejar ninguna atras).
"""
import json
import os
import textwrap
DEST = "/mnt/c/Users/egutierrez/Desktop/linaje_customer_marts"
PROJECT = "autingo-159109"
builds = json.load(open("scratchpad/intel_build.json"))
lin = json.load(open("scratchpad/intel_lineage.json"))
involved = set(lin["intel_involved"])
# Catalogo completo de clientes_intel (40 objetos) reconstruido: involved + leftovers conocidos.
LEFTOVER = [
"_presupuesto_persona", "_veh_cluster_feat", "_veh_tec_feat", "audit_persona_divergencias",
"calidad_email_snapshot", "f0_audit_keys", "fact_impacto_campana", "map_mutualista_particular",
"reco_promo_personalizada", "reco_promo_segmento", "rpt_campana", "rpt_campana_lift",
"rpt_campana_usuario", "rpt_impacto_persona", "seg_audiencia", "seg_vega_persona",
"sf_contact_map", "tipologia_cliente_resumen", "veh_cluster",
]
# Clasificacion por proposito (a donde va cada leftover).
CATEGORY = {
"rpt_campana": "Informe de campanas (BI / dashboards de marketing)",
"rpt_campana_lift": "Informe de campanas: lift (BI / dashboards)",
"rpt_campana_usuario": "Informe de campanas por usuario (BI / dashboards)",
"rpt_impacto_persona": "Informe de impacto por persona (BI / dashboards)",
"fact_impacto_campana": "Hechos de impacto de campana (base de los informes)",
"reco_promo_personalizada": "Recomendacion de promo personalizada (activacion)",
"reco_promo_segmento": "Recomendacion de promo por segmento (activacion)",
"seg_audiencia": "Audiencias para activacion (probable push a Salesforce/Marketing)",
"sf_contact_map": "Mapa de contactos Salesforce (sincronizacion de IDs)",
"audit_persona_divergencias": "Auditoria de calidad: divergencias en resolucion de persona",
"calidad_email_snapshot": "Auditoria de calidad: snapshot de emails",
"f0_audit_keys": "Auditoria de claves (control interno del pipeline)",
"_presupuesto_persona": "Auxiliar: presupuestos por persona (interim)",
"_veh_cluster_feat": "Auxiliar: features para clustering de vehiculo (interim)",
"_veh_tec_feat": "Auxiliar: features tecnicas de vehiculo (interim)",
"veh_cluster": "Clustering de vehiculo (resultado; no lo usan los marts hoy)",
"tipologia_cliente_resumen": "Resumen de tipologia de cliente (BI)",
"map_mutualista_particular": "Vista auxiliar: mapa mutualista/particular",
"seg_vega_persona": "Segmentacion VEGA por persona (contactabilidad; lee fuentes de cliente)",
}
SEP = "=" * 78 + "\n"
def w(path, text):
full = os.path.join(DEST, path)
os.makedirs(os.path.dirname(full), exist_ok=True)
with open(full, "w", newline="\r\n", encoding="utf-8") as f:
f.write(text)
# --- 06: SQL de los leftovers que tengan build capturado ---
written = []
for t in LEFTOVER:
b = builds.get(t)
if not b:
continue
out = [SEP, f"OBJETO : {PROJECT}.clientes_intel.{t}\n",
f"TIPO : {b['stmt']} (NO alimenta customer_marts)\n",
f"ULTIMA EJECUCION CAPTURADA: {b['last_run']}\n", SEP,
"\nQUE ES / A DONDE VA\n-------------------\n",
textwrap.fill(CATEGORY.get(t, "(sin clasificar)"), width=78) + "\n"]
if b["refs"]:
out.append("\nLEE DE\n------\n")
for r in b["refs"]:
out.append(f" - {PROJECT}.{r}\n")
out.append("\nSQL DE CONSTRUCCION (copiable)\n------------------------------\n\n")
out.append(b["query"].strip() + "\n")
w(f"06_otros_outputs_clientes_intel/{t}.txt", "".join(out))
written.append(t)
# --- 00c: verificacion de completitud ---
v = [SEP, "VERIFICACION DE COMPLETITUD DEL LINAJE\n", SEP, "\n"]
v.append("PREGUNTA: todo esto acaba en customer_marts? Comprobado.\n\n")
v.append("""RESPUESTA CORTA
---------------
La cadena customer_marts -> fuentes esta COMPLETA (todas las referencias resueltas,
0 tablas sin identificar). PERO customer_marts NO es el unico destino: es UNO de los
consumidores de la capa clientes_intel.
- clientes_intel tiene 40 objetos.
- 21 de ellos alimentan (directa o indirectamente) las 14 vistas de customer_marts.
- 19 NO van a customer_marts: son OTRAS salidas del mismo pipeline (informes de
campana, recomendaciones de promo, audiencias, auditorias, auxiliares).
El unico dataset MODELADO que lee clientes_intel es customer_marts. El resto de lo que
lee clientes_intel y customer_marts son consultas de BI / ad-hoc (tablas temporales
_hexhash / anon...), es decir Metabase u otros lo consumen directamente. En ese sentido
customer_marts SI es terminal en el modelo (aguas abajo solo hay BI).
""")
v.append(SEP + "1) LAS 21 TABLAS DE clientes_intel QUE SI ALIMENTAN customer_marts\n" + SEP + "\n")
for t in sorted(involved):
b = builds.get(t, {})
v.append(f" - {t} ({b.get('stmt','(sin job)')})\n")
v.append("\n" + SEP + "2) LAS 19 TABLAS DE clientes_intel QUE NO VAN A customer_marts\n" + SEP + "\n")
v.append(" (SQL de cada una en 06_otros_outputs_clientes_intel/)\n\n")
for t in LEFTOVER:
sql_note = "" if t in written else " [sin SQL de job capturado]"
v.append(f" - {t:28s} {CATEGORY.get(t,'')}{sql_note}\n")
v.append("\n" + SEP + "3) FUENTES BASE ALCANZADAS (fin del linaje)\n" + SEP + "\n")
v.append(" Fuera de clientes_intel, el pipeline lee de:\n\n")
for s in sorted(lin["external_sources"]):
v.append(f" - {PROJECT}.{s}\n")
v.append("\n" + SEP + "4) NOTAS DE COBERTURA\n" + SEP + "\n")
v.append(""" - score_clv y seg_cluster_vehiculo: usadas por customer_marts pero sin CTAS reciente
en el historial de jobs (son modelos ML / cargas antiguas). Su esquema esta en
02_intermedio_clientes_intel/; no hay un SQL de un solo job que las reconstruya.
- El SQL de construccion se tomo del ULTIMO job exitoso de cada tabla
(INFORMATION_SCHEMA.JOBS, region europe-west1, ventana 120 dias). Si una tabla se
reconstruye con otra logica fuera de esa ventana, no se captura aqui.
- customer_marts: 14 vistas = el dataset entero (no falta ninguna).
""")
w("00c_VERIFICACION.txt", "".join(v))
print(f"06_otros_outputs_clientes_intel/ -> {len(written)} archivos")
print("00c_VERIFICACION.txt -> escrito")
print("\nLeftovers sin SQL capturado:", [t for t in LEFTOVER if t not in written] or "ninguno")
File diff suppressed because one or more lines are too long
+53
View File
@@ -0,0 +1,53 @@
{
"intel_involved": [
"_margen_rate_producto",
"_persona_records",
"_plate_year_calib",
"data_points_contacto",
"dim_cp_provincia",
"dim_persona",
"dim_vehiculo",
"fact_campana_respuesta",
"fact_transaccion",
"fact_visita",
"feat_cliente_persona",
"feat_cliente_vehiculo",
"map_persona_canal8",
"map_persona_fuente",
"map_persona_vehiculo",
"reco_acciones",
"score_clv",
"seg_cliente_360",
"seg_cluster_persona",
"seg_cluster_vehiculo",
"tipologia_cliente"
],
"external_sources": [
"anjana_bi_amg.margenes_mat",
"citaprevia_aurphcp.clientes",
"citaprevia_aurphcp.clientes_matriculas",
"claude_bi.churn_scores_current",
"claude_bi.conversion_cqg_base_mat",
"claude_bi.todos_datos_lineas_mat",
"mssql2022_dbo.anjana_customer",
"ontologia.aurgiCitas_mat",
"psql_dcpublic.call_transactions",
"psql_dcpublic.car_makes",
"psql_dcpublic.car_model_families",
"psql_dcpublic.car_models",
"psql_dcpublic.car_versions",
"psql_dcpublic.customers",
"psql_dcpublic.otr_customers",
"psql_dcpublic.otr_vehicles",
"psql_dcpublic.tecrmi_license_plates",
"psql_dcpublic.tpv_customers",
"psql_dcpublic.tpv_vehicles_vehicle",
"psql_dcpublic.tpv_vehicles_vehicleowner",
"psql_dcpublic.users",
"salesforce_ew1.contacts_latest",
"salesforce_ew1.email_clicked",
"salesforce_ew1.email_opened",
"salesforce_ew1.email_sent",
"salesforce_ew1.sms"
]
}
File diff suppressed because one or more lines are too long
+51
View File
@@ -0,0 +1,51 @@
"""Helper: run SQL against Metabase BigQuery db=6 via REST API.
Usage:
python3 mbq.py "SELECT 1"
python3 mbq.py < query.sql
Reads API key from `pass metabase/aurgi-api-key`.
Prints columns header + rows as TSV.
"""
import os
import sys
import json
import subprocess
sys.path.insert(0, "python/functions")
from metabase import MetabaseClient, metabase_execute_query
MB_URL = "https://reports.autingo.es"
DB_ID = 6
def get_key():
return subprocess.check_output(["pass", "show", "metabase/aurgi-api-key"]).decode().splitlines()[0].strip()
def run(sql, max_results=2000):
import httpx
c = MetabaseClient(MB_URL, get_key())
try:
res = metabase_execute_query(c, DB_ID, sql, max_results=max_results)
except httpx.HTTPStatusError as e:
print("HTTP", e.response.status_code, e.response.text[:3000])
return
data = res.get("data", {})
cols = [col.get("display_name") or col.get("name") for col in data.get("cols", [])]
rows = data.get("rows", [])
# error?
if res.get("error") or (res.get("status") and res.get("status") != "completed"):
print("ERROR:", json.dumps(res.get("error") or res, ensure_ascii=False)[:2000])
return
print("\t".join(str(x) for x in cols))
for r in rows:
print("\t".join("" if v is None else str(v) for v in r))
print(f"-- {len(rows)} rows", file=sys.stderr)
if __name__ == "__main__":
if len(sys.argv) > 1:
sql = sys.argv[1]
else:
sql = sys.stdin.read()
run(sql)
+106
View File
@@ -0,0 +1,106 @@
"""Traza la construccion de clientes_intel: para cada tabla, recupera el SQL del ultimo
job que la escribio (INFORMATION_SCHEMA.JOBS) + sus referenced_tables, y recorre hacia
atras hasta las tablas fuente (TPV, customers, users, Navision, Salesforce).
Vuelca todo a scratchpad/intel_build.json.
"""
import json
import warnings
warnings.filterwarnings("ignore")
import google.auth
from google.cloud import bigquery
PROJECT = "autingo-159109"
REGION = "region-europe-west1"
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/bigquery"])
creds = creds.with_quota_project(None)
c = bigquery.Client(project=PROJECT, credentials=creds)
# Ultimo job por tabla destino en clientes_intel: query + referenced_tables + stmt.
sql = f"""
WITH j AS (
SELECT
dest.table_id AS tbl,
query,
statement_type AS stmt,
creation_time,
ARRAY(
SELECT AS STRUCT rt.project_id, rt.dataset_id, rt.table_id
FROM UNNEST(referenced_tables) rt
) AS refs,
ROW_NUMBER() OVER (PARTITION BY dest.table_id ORDER BY creation_time DESC) AS rn
FROM `{PROJECT}`.`{REGION}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT,
UNNEST([destination_table]) dest
WHERE dest.dataset_id = 'clientes_intel'
AND state = 'DONE' AND error_result IS NULL
AND statement_type IS NOT NULL
AND creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 120 DAY)
)
SELECT tbl, query, stmt, creation_time, refs FROM j WHERE rn = 1
ORDER BY tbl
"""
builds = {}
for r in c.query(sql).result():
refs = []
for rt in r.refs:
refs.append(f"{rt['dataset_id']}.{rt['table_id']}")
builds[r.tbl] = {
"query": r.query or "",
"stmt": r.stmt,
"last_run": str(r.creation_time),
"refs": sorted(set(x for x in refs if not x.endswith(f".{r.tbl}"))),
}
json.dump(builds, open("scratchpad/intel_build.json", "w"), indent=2, ensure_ascii=False)
print(f"tablas clientes_intel con SQL de construccion capturado: {len(builds)}\n")
# Recursion desde las 12 tablas usadas por customer_marts.
SEED = [
"dim_persona", "dim_vehiculo", "fact_transaccion", "fact_campana_respuesta",
"feat_cliente_persona", "feat_cliente_vehiculo", "seg_cliente_360", "score_clv",
"reco_acciones", "map_persona_canal8", "map_persona_fuente", "map_persona_vehiculo",
]
intel_involved = set()
external_sources = set()
stack = list(SEED)
while stack:
t = stack.pop()
if t in intel_involved:
continue
intel_involved.add(t)
b = builds.get(t)
if not b:
continue
for ref in b["refs"]:
ds, tbl = ref.split(".", 1)
if ds == "clientes_intel":
if tbl not in intel_involved:
stack.append(tbl)
else:
external_sources.add(ref)
print("== tablas clientes_intel implicadas en el linaje de customer_marts ==")
for t in sorted(intel_involved):
b = builds.get(t, {})
print(f" {t:26s} {b.get('stmt','(sin job)')}")
print("\n== FUENTES EXTERNAS (fuera de clientes_intel) usadas por el pipeline ==")
for s in sorted(external_sources):
print(f" {s}")
# Marcar las fuentes de CLIENTE que pide el usuario.
KEYS = ["customer", "customers", "cliente", "user", "usuario", "tpv", "salesforce",
"sf_", "contact", "mkt_cloud", "persona"]
print("\n== fuentes que parecen de CLIENTE/usuario ==")
for s in sorted(external_sources):
low = s.lower()
if any(k in low for k in KEYS):
print(f" {s}")
json.dump({
"intel_involved": sorted(intel_involved),
"external_sources": sorted(external_sources),
}, open("scratchpad/intel_lineage.json", "w"), indent=2, ensure_ascii=False)
+158
View File
@@ -0,0 +1,158 @@
"""Traza el linaje recursivo de las vistas de customer_marts hasta las tablas fuente.
Para cada objeto: obtiene su tipo (VIEW/BASE TABLE/EXTERNAL/MATERIALIZED VIEW) y su DDL
via INFORMATION_SCHEMA.TABLES, extrae las referencias a otras tablas del DDL y recurre
sobre las que son vistas. Vuelca el grafo completo a un JSON en scratchpad.
"""
import json
import re
import sys
import warnings
warnings.filterwarnings("ignore")
import google.auth
from google.cloud import bigquery
PROJECT = "autingo-159109"
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/bigquery"])
creds = creds.with_quota_project(None)
client = bigquery.Client(project=PROJECT, credentials=creds)
# Cache de metadata por dataset: {dataset: {table_name: {"type":..., "ddl":...}}}
dataset_cache: dict[str, dict] = {}
def load_dataset(dataset: str) -> dict:
"""Carga todas las tablas/vistas de un dataset (una query por dataset)."""
if dataset in dataset_cache:
return dataset_cache[dataset]
result: dict[str, dict] = {}
try:
sql = f"""
SELECT table_name, table_type, ddl
FROM `{PROJECT}`.`{dataset}`.INFORMATION_SCHEMA.TABLES
"""
for r in client.query(sql).result():
result[r.table_name] = {"type": r.table_type, "ddl": r.ddl or ""}
except Exception as e: # noqa: BLE001
print(f" [warn] no se pudo leer dataset {dataset}: {e}", file=sys.stderr)
dataset_cache[dataset] = result
return result
# En el DDL que emite INFORMATION_SCHEMA, las referencias a otras tablas SIEMPRE van
# entre backticks y totalmente cualificadas: `proyecto.dataset.tabla`. Los alias de
# CTE/JOIN (dp, fcp, f...) nunca llevan backticks, asi que restringiendo a lo que hay
# entre backticks eliminamos todo el ruido.
BACKTICK_RE = re.compile(r"`([^`]+)`")
# Variante con cada parte en su propio backtick: `proj`.`dataset`.`tabla`
MULTIPART_RE = re.compile(
r"`([A-Za-z0-9_-]+)`\.`([A-Za-z0-9_-]+)`(?:\.`([A-Za-z0-9_-]+)`)?"
)
def _norm(proj: str, ds: str, tbl: str) -> tuple[str, str] | None:
if ds.upper() == "INFORMATION_SCHEMA" or tbl.upper() == "INFORMATION_SCHEMA":
return None
return (ds, tbl)
def extract_refs(ddl: str) -> set[tuple[str, str]]:
"""Devuelve el conjunto de (dataset, table) referenciados en el cuerpo del DDL.
Se queda con el SELECT (tras el primer 'AS') para no capturar el nombre del propio objeto.
"""
body = ddl
m = re.search(r"\bAS\b", ddl, flags=re.IGNORECASE)
if m:
body = ddl[m.end():]
refs: set[tuple[str, str]] = set()
# Estilo `proyecto.dataset.tabla` (todo en un backtick).
for tok in BACKTICK_RE.findall(body):
parts = [p for p in tok.split(".") if p]
if len(parts) == 3:
r = _norm(parts[0], parts[1], parts[2])
elif len(parts) == 2:
r = _norm(PROJECT, parts[0], parts[1])
else:
r = None
if r:
refs.add(r)
# Estilo `proj`.`dataset`.`tabla` (parte por backtick, 3 partes cualificadas).
# OJO: `alias`.`columna` (2 partes con cada parte en su propio backtick) es una
# referencia a columna, NO a tabla — se descarta exigiendo las 3 partes.
for mt in MULTIPART_RE.finditer(body):
g1, g2, g3 = mt.group(1), mt.group(2), mt.group(3)
if g3:
r = _norm(g1, g2, g3)
if r:
refs.add(r)
return refs
graph: dict[str, dict] = {} # key "dataset.table" -> {type, ddl, refs:[...]}
visited: set[str] = set()
def visit(dataset: str, table: str, depth: int = 0):
key = f"{dataset}.{table}"
if key in visited:
return
visited.add(key)
meta = load_dataset(dataset).get(table)
if meta is None:
graph[key] = {"type": "UNKNOWN", "ddl": "", "refs": [], "depth": depth}
return
ddl = meta["ddl"]
ttype = meta["type"]
refs: list[str] = []
if ttype in ("VIEW", "MATERIALIZED VIEW"):
for ds, tbl in sorted(extract_refs(ddl)):
# Evitar auto-referencia
if ds == dataset and tbl == table:
continue
refs.append(f"{ds}.{tbl}")
graph[key] = {"type": ttype, "ddl": ddl, "refs": refs, "depth": depth}
for ref in refs:
rds, rtbl = ref.split(".", 1)
visit(rds, rtbl, depth + 1)
# Semillas: las 14 vistas de customer_marts.
SEEDS = [
"customer_brand_affinity", "customer_category_spend", "customer_channel",
"customer_contactability", "customer_monetary", "customer_payment_method",
"customer_predictive", "customer_product", "customer_profile",
"customer_promo_tolerance", "customer_promo_usage", "customer_store_spend",
"customer_temporal", "customer_vehicles",
]
for s in SEEDS:
visit("customer_marts", s, 0)
out = {
"project": PROJECT,
"seeds": [f"customer_marts.{s}" for s in SEEDS],
"graph": graph,
}
with open("scratchpad/lineage_graph.json", "w") as f:
json.dump(out, f, indent=2, ensure_ascii=False)
# Resumen
n_view = sum(1 for v in graph.values() if v["type"] in ("VIEW", "MATERIALIZED VIEW"))
n_base = sum(1 for v in graph.values() if v["type"] == "BASE TABLE")
n_ext = sum(1 for v in graph.values() if v["type"] == "EXTERNAL")
n_unk = sum(1 for v in graph.values() if v["type"] == "UNKNOWN")
print(f"objetos totales: {len(graph)} vistas: {n_view} base: {n_base} external: {n_ext} desconocidos: {n_unk}")
print("\n== objetos por dataset ==")
by_ds: dict[str, int] = {}
for k in graph:
ds = k.split(".", 1)[0]
by_ds[ds] = by_ds.get(ds, 0) + 1
for ds, n in sorted(by_ds.items(), key=lambda x: -x[1]):
print(f" {n:3d} {ds}")
Binary file not shown.
Binary file not shown.