Files
fn_registry/scratchpad/trace_lineage.py
T
egutierrez 2ebc9efeb2 chore: auto-commit (8 archivos)
- scratchpad/gen_docs.py
- scratchpad/gen_intel.py
- scratchpad/gen_verify.py
- scratchpad/intel_build.json
- scratchpad/intel_lineage.json
- scratchpad/lineage_graph.json
- scratchpad/trace_intel.py
- scratchpad/trace_lineage.py

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-07-01 19:00:06 +02:00

159 lines
5.5 KiB
Python

"""Traza el linaje recursivo de las vistas de customer_marts hasta las tablas fuente.
Para cada objeto: obtiene su tipo (VIEW/BASE TABLE/EXTERNAL/MATERIALIZED VIEW) y su DDL
via INFORMATION_SCHEMA.TABLES, extrae las referencias a otras tablas del DDL y recurre
sobre las que son vistas. Vuelca el grafo completo a un JSON en scratchpad.
"""
import json
import re
import sys
import warnings
warnings.filterwarnings("ignore")
import google.auth
from google.cloud import bigquery
# GCP project whose datasets are queried; also the project handed to the client.
PROJECT = "autingo-159109"
# Application Default Credentials, requested with the BigQuery scope only.
# The second element of the returned pair is discarded.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/bigquery"])
# Rebind the credentials with the quota project set to None.
# NOTE(review): presumably this avoids attributing quota/billing to the ADC
# default quota project -- confirm the intent.
creds = creds.with_quota_project(None)
# Module-level BigQuery client built from the credentials above.
client = bigquery.Client(project=PROJECT, credentials=creds)
# Per-dataset metadata cache: {dataset: {table_name: {"type": ..., "ddl": ...}}}
dataset_cache: dict[str, dict] = {}


def load_dataset(dataset: str) -> dict:
    """Return ``{table_name: {"type": ..., "ddl": ...}}`` for every object in *dataset*.

    The first call for a dataset runs one INFORMATION_SCHEMA.TABLES query; the
    outcome is stored in ``dataset_cache`` and served from there afterwards.
    If the query (or iterating its rows) raises, a warning is printed to
    stderr and whatever was collected up to that point (possibly nothing) is
    cached and returned, so a failing dataset is not queried again.
    """
    try:
        return dataset_cache[dataset]
    except KeyError:
        pass

    tables: dict[str, dict] = {}
    try:
        sql = f"""
SELECT table_name, table_type, ddl
FROM `{PROJECT}`.`{dataset}`.INFORMATION_SCHEMA.TABLES
"""
        rows = client.query(sql).result()
        # Filled row by row (not via a comprehension) so that rows read before
        # a mid-iteration failure are still kept.
        for row in rows:
            tables[row.table_name] = {"type": row.table_type, "ddl": row.ddl or ""}
    except Exception as e:  # noqa: BLE001
        print(f" [warn] no se pudo leer dataset {dataset}: {e}", file=sys.stderr)

    dataset_cache[dataset] = tables
    return tables
# The extraction below relies on the DDL returned by INFORMATION_SCHEMA writing
# references to other tables inside backticks and fully qualified, as in
# `project.dataset.table`. CTE/JOIN aliases (dp, fcp, f...) are not expected to
# carry backticks, so looking only at backtick-quoted text filters out that noise.
# Captures the raw text between one pair of backticks.
BACKTICK_RE = re.compile(r"`([^`]+)`")
# Variant with each part in its own backticks: `proj`.`dataset`.`table`.
# Groups 1 and 2 are always present; group 3 is None when only two parts match.
MULTIPART_RE = re.compile(
    r"`([A-Za-z0-9_-]+)`\.`([A-Za-z0-9_-]+)`(?:\.`([A-Za-z0-9_-]+)`)?"
)
def _norm(proj: str, ds: str, tbl: str) -> tuple[str, str] | None:
if ds.upper() == "INFORMATION_SCHEMA" or tbl.upper() == "INFORMATION_SCHEMA":
return None
return (ds, tbl)
def extract_refs(ddl: str) -> set[tuple[str, str]]:
    """Return the set of ``(dataset, table)`` pairs referenced in the body of *ddl*.

    Only the text after the first ``AS`` keyword is scanned, so the name of the
    object being defined is not reported as a reference. An ``AS`` that sits
    inside a backtick-quoted identifier (for instance a hyphenated project id
    such as ``my-as-project``) is not the keyword and is skipped; cutting there
    would start the body mid-identifier and break backtick pairing for the
    rest of the statement. If no keyword is found, the whole DDL is scanned.

    Two reference spellings are recognised:

    * one backtick pair holding the dotted name: three parts are taken as
      project.dataset.table, two parts as dataset.table;
    * each part in its own backticks, accepted only with all three parts,
      because a two-part form is a column reference (alias.column).

    Every candidate goes through ``_norm``, which drops INFORMATION_SCHEMA
    references and discards the project component.
    """
    body = ddl
    for as_match in re.finditer(r"\bAS\b", ddl, flags=re.IGNORECASE):
        # An odd number of backticks before the match means it lies inside a
        # quoted identifier, so it cannot be the AS keyword.
        if ddl.count("`", 0, as_match.start()) % 2 == 0:
            body = ddl[as_match.end():]
            break

    refs: set[tuple[str, str]] = set()

    # Style `project.dataset.table` (everything inside one backtick pair).
    for tok in BACKTICK_RE.findall(body):
        parts = [p for p in tok.split(".") if p]
        if len(parts) == 3:
            ref = _norm(parts[0], parts[1], parts[2])
        elif len(parts) == 2:
            ref = _norm(PROJECT, parts[0], parts[1])
        else:
            ref = None
        if ref:
            refs.add(ref)

    # Style `proj`.`dataset`.`table` (one backtick pair per part).
    # `alias`.`column` (two separately quoted parts) is a column reference,
    # not a table, so it is discarded by requiring the third part.
    for mt in MULTIPART_RE.finditer(body):
        g1, g2, g3 = mt.group(1), mt.group(2), mt.group(3)
        if g3:
            ref = _norm(g1, g2, g3)
            if ref:
                refs.add(ref)

    return refs
# Lineage graph: "dataset.table" -> {"type", "ddl", "refs", "depth"}.
graph: dict[str, dict] = {}
# Keys already handled, so each object is processed at most once.
visited: set[str] = set()


def visit(dataset: str, table: str, depth: int = 0):
    """Add ``dataset.table`` to ``graph`` and recurse into the objects it references.

    Objects already in ``visited`` are skipped. An object missing from its
    dataset's metadata is stored with type ``"UNKNOWN"`` and no references.
    References are extracted from the DDL only for VIEW and MATERIALIZED VIEW
    objects; any other type is stored with an empty reference list. The stored
    ``depth`` is the depth at which the object was first reached.
    """
    key = f"{dataset}.{table}"
    if key in visited:
        return
    visited.add(key)

    meta = load_dataset(dataset).get(table)
    if meta is None:
        graph[key] = {"type": "UNKNOWN", "ddl": "", "refs": [], "depth": depth}
        return

    ddl, kind = meta["ddl"], meta["type"]

    children: list[str] = []
    if kind in ("VIEW", "MATERIALIZED VIEW"):
        # Sorted for a deterministic traversal order; the object itself is
        # left out so it never lists itself as a reference.
        children = [
            f"{ds}.{tbl}"
            for ds, tbl in sorted(extract_refs(ddl))
            if (ds, tbl) != (dataset, table)
        ]

    graph[key] = {"type": kind, "ddl": ddl, "refs": children, "depth": depth}

    for child in children:
        child_ds, _, child_tbl = child.partition(".")
        visit(child_ds, child_tbl, depth + 1)
# Seeds: the 14 customer_marts views the lineage trace starts from.
SEEDS = [
    "customer_brand_affinity", "customer_category_spend", "customer_channel",
    "customer_contactability", "customer_monetary", "customer_payment_method",
    "customer_predictive", "customer_product", "customer_profile",
    "customer_promo_tolerance", "customer_promo_usage", "customer_store_spend",
    "customer_temporal", "customer_vehicles",
]

# Walk the lineage from every seed; visit() fills the module-level graph.
for s in SEEDS:
    visit("customer_marts", s, 0)

out = {
    "project": PROJECT,
    "seeds": [f"customer_marts.{s}" for s in SEEDS],
    "graph": graph,
}

# The file is opened as UTF-8 explicitly: ensure_ascii=False writes non-ASCII
# characters verbatim, so depending on the platform default encoding could
# raise UnicodeEncodeError or produce a file that is not valid UTF-8 JSON.
with open("scratchpad/lineage_graph.json", "w", encoding="utf-8") as f:
    json.dump(out, f, indent=2, ensure_ascii=False)

# Summary: object counts by type.
n_view = sum(1 for v in graph.values() if v["type"] in ("VIEW", "MATERIALIZED VIEW"))
n_base = sum(1 for v in graph.values() if v["type"] == "BASE TABLE")
n_ext = sum(1 for v in graph.values() if v["type"] == "EXTERNAL")
n_unk = sum(1 for v in graph.values() if v["type"] == "UNKNOWN")
print(f"objetos totales: {len(graph)} vistas: {n_view} base: {n_base} external: {n_ext} desconocidos: {n_unk}")

# Object counts per dataset, largest first.
print("\n== objetos por dataset ==")
by_ds: dict[str, int] = {}
for k in graph:
    ds = k.split(".", 1)[0]
    by_ds[ds] = by_ds.get(ds, 0) + 1
for ds, n in sorted(by_ds.items(), key=lambda x: -x[1]):
    print(f" {n:3d} {ds}")