Files
fn_registry/python/functions/pipelines/metabase_create_ops_dashboard.py
T
egutierrez dd324b7785 feat: pipelines Metabase — add ops db, create ops dashboard, fix permissions
Tres pipelines Python para gestionar operations.db en Metabase:
- metabase_add_ops_db: registra la operations.db de una app como database SQLite
- metabase_create_ops_dashboard: genera dashboard operativo con 14 cards (KPIs,
  distribución, executions, assertions) para cualquier app
- metabase_fix_permissions: arregla SQLITE_READONLY_DIRECTORY haciendo chmod
  777/666 sin chown (que se propaga al host via bind mount)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 00:54:24 +01:00

292 lines
10 KiB
Python

"""Pipeline: crea dashboard operativo en Metabase para una app.
Compone: metabase_auth + metabase_list_databases + metabase_create_card
+ metabase_create_dashboard + metabase_update_dashboard.
Genera un dashboard estandar con KPIs, distribucion y tablas sobre la
operations.db de una app: entities, relations, executions, assertions.
Uso:
python metabase_create_ops_dashboard.py <app_name>
python metabase_create_ops_dashboard.py docker_tui
python metabase_create_ops_dashboard.py --all
"""
import argparse
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from metabase.client import metabase_auth
from metabase import (
metabase_list_databases,
metabase_create_card,
metabase_create_dashboard,
metabase_update_dashboard,
metabase_list_dashboards,
metabase_delete_dashboard,
)
METABASE_URL = os.environ.get("METABASE_URL", "http://localhost:3000")
EMAIL = os.environ.get("METABASE_ADMIN_EMAIL", "admin@fnregistry.local")
PASSWORD = os.environ.get("METABASE_ADMIN_PASSWORD", "FnRegistry2024!")
def dashboard_name(app_name: str) -> str:
return f"ops: {app_name.replace('_', '-')}"
def db_name_for_app(app_name: str) -> str:
return f"ops-{app_name.replace('_', '-')}"
def build_cards(app_name: str) -> list[dict]:
"""Define las cards del dashboard operativo."""
return [
# ---- Fila 0: KPIs (h=5) ----
{
"name": f"[{app_name}] Entities",
"display": "scalar",
"sql": "SELECT COUNT(*) FROM entities;",
"size_x": 6, "size_y": 5, "col": 0, "row": 0,
},
{
"name": f"[{app_name}] Relations",
"display": "scalar",
"sql": "SELECT COUNT(*) FROM relations;",
"size_x": 6, "size_y": 5, "col": 6, "row": 0,
},
{
"name": f"[{app_name}] Executions",
"display": "scalar",
"sql": "SELECT COUNT(*) FROM executions;",
"size_x": 6, "size_y": 5, "col": 12, "row": 0,
},
{
"name": f"[{app_name}] Assertions",
"display": "scalar",
"sql": "SELECT COUNT(*) FROM assertions;",
"size_x": 6, "size_y": 5, "col": 18, "row": 0,
},
# ---- Fila 5: Distribucion (h=8) ----
{
"name": f"[{app_name}] Entities por Status",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM entities GROUP BY status ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 8, "col": 0, "row": 5,
},
{
"name": f"[{app_name}] Entities por Tipo",
"display": "bar",
"sql": "SELECT type_ref, COUNT(*) AS cantidad FROM entities GROUP BY type_ref ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 8, "col": 8, "row": 5,
},
{
"name": f"[{app_name}] Relations por Status",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM relations GROUP BY status ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 8, "col": 16, "row": 5,
},
# ---- Fila 13: Executions y Assertions (h=9) ----
{
"name": f"[{app_name}] Executions: Success vs Failure",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM executions GROUP BY status ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 9, "col": 0, "row": 13,
},
{
"name": f"[{app_name}] Assertions por Severity",
"display": "bar",
"sql": "SELECT severity, COUNT(*) AS cantidad FROM assertions GROUP BY severity ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 9, "col": 8, "row": 13,
},
{
"name": f"[{app_name}] Assertion Results",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM assertion_results GROUP BY status ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 9, "col": 16, "row": 13,
},
# ---- Fila 22: Relaciones frecuentes (h=8) ----
{
"name": f"[{app_name}] Top Relaciones (via)",
"display": "row",
"sql": """
SELECT
CASE WHEN via = '' THEN '(directo)' ELSE via END AS via_funcion,
COUNT(*) AS cantidad
FROM relations
GROUP BY via
ORDER BY cantidad DESC
LIMIT 15;
""",
"size_x": 12, "size_y": 8, "col": 0, "row": 22,
},
{
"name": f"[{app_name}] Executions Recientes",
"display": "table",
"sql": """
SELECT
id,
pipeline_id,
status,
duration_ms,
records_in,
records_out,
CASE WHEN error = '' THEN '-' ELSE error END AS error,
started_at
FROM executions
ORDER BY started_at DESC
LIMIT 20;
""",
"size_x": 12, "size_y": 8, "col": 12, "row": 22,
},
# ---- Fila 30: Tablas detalle (h=8) ----
{
"name": f"[{app_name}] Lista de Entities",
"display": "table",
"sql": """
SELECT
id,
name,
type_ref,
status,
domain,
source,
created_at,
updated_at
FROM entities
ORDER BY updated_at DESC;
""",
"size_x": 24, "size_y": 8, "col": 0, "row": 30,
},
{
"name": f"[{app_name}] Assertion Results Detalle",
"display": "table",
"sql": """
SELECT
ar.id,
a.name AS assertion,
a.kind,
a.severity,
ar.status,
ar.value,
ar.message,
ar.evaluated_at
FROM assertion_results ar
JOIN assertions a ON a.id = ar.assertion_id
ORDER BY ar.evaluated_at DESC
LIMIT 50;
""",
"size_x": 24, "size_y": 8, "col": 0, "row": 38,
},
]
def create_ops_dashboard(client, app_name: str, db_id: int) -> int:
"""Crea el dashboard operativo para una app. Retorna dashboard_id."""
dash_name = dashboard_name(app_name)
# Eliminar dashboard existente si hay
for d in metabase_list_dashboards(client):
if d.get("name") == dash_name:
print(f" Dashboard existente (id={d['id']}), recreando...")
metabase_delete_dashboard(client, d["id"])
cards_def = build_cards(app_name)
print(f" Creando {len(cards_def)} cards...")
created = []
for i, card_def in enumerate(cards_def):
card = metabase_create_card(
client,
name=card_def["name"],
dataset_query={
"database": db_id,
"type": "native",
"native": {"query": card_def["sql"]},
},
display=card_def["display"],
description=f"ops dashboard: {app_name}",
)
created.append((card, card_def))
print(f" [{i+1}/{len(cards_def)}] {card_def['name']} (id={card['id']})")
print(" Creando dashboard...")
dashboard = metabase_create_dashboard(
client,
name=dash_name,
description=f"Dashboard operativo de {app_name}: entities, relations, executions, assertions.",
)
dash_id = dashboard["id"]
dashcards = []
for idx, (card, card_def) in enumerate(created):
dashcards.append({
"id": -(idx + 1),
"card_id": card["id"],
"size_x": card_def["size_x"],
"size_y": card_def["size_y"],
"col": card_def["col"],
"row": card_def["row"],
})
metabase_update_dashboard(client, dash_id, dashcards=dashcards)
return dash_id
def main():
parser = argparse.ArgumentParser(description="Crea dashboard operativo en Metabase para una app")
parser.add_argument("app_name", nargs="?", help="Nombre de la app")
parser.add_argument("--all", action="store_true", help="Crea dashboards para todas las apps registradas en Metabase")
args = parser.parse_args()
if not args.app_name and not args.all:
parser.error("Se requiere app_name o --all")
print("Autenticando en Metabase...")
client = metabase_auth(METABASE_URL, EMAIL, PASSWORD)
dbs = metabase_list_databases(client)
if args.all:
ops_dbs = [
(db["name"].replace("ops-", "").replace("-", "_"), db["id"])
for db in dbs
if db["name"].startswith("ops-")
]
if not ops_dbs:
print("No hay databases operacionales registradas en Metabase.")
sys.exit(1)
print(f"Creando dashboards para {len(ops_dbs)} apps...\n")
for app_name, db_id in ops_dbs:
print(f"--- {app_name} (db_id={db_id}) ---")
dash_id = create_ops_dashboard(client, app_name, db_id)
print(f" Dashboard: {METABASE_URL}/dashboard/{dash_id}\n")
else:
app_name = args.app_name
expected_db_name = f"ops-{app_name.replace('_', '-')}"
db_id = None
for db in dbs:
if db["name"] == expected_db_name:
db_id = db["id"]
break
if not db_id:
print(f"ERROR: Database '{expected_db_name}' no encontrada en Metabase.")
print("Databases disponibles:")
for db in dbs:
print(f" - {db['name']} (id={db['id']})")
print(f"\nRegistra primero con: python metabase_add_ops_db.py {app_name}")
sys.exit(1)
print(f"App: {app_name} (db_id={db_id})")
dash_id = create_ops_dashboard(client, app_name, db_id)
print(f"\nDashboard listo: {METABASE_URL}/dashboard/{dash_id}")
client.close()
if __name__ == "__main__":
main()