"""Pipeline: crea dashboard operativo en Metabase para una app. Compone: metabase_auth + metabase_list_databases + metabase_create_card + metabase_create_dashboard + metabase_update_dashboard. Genera un dashboard estandar con KPIs, distribucion y tablas sobre la operations.db de una app: entities, relations, executions, assertions. Uso: python metabase_create_ops_dashboard.py python metabase_create_ops_dashboard.py docker_tui python metabase_create_ops_dashboard.py --all """ import argparse import os import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from metabase.client import metabase_auth from metabase import ( metabase_list_databases, metabase_create_card, metabase_create_dashboard, metabase_update_dashboard, metabase_list_dashboards, metabase_delete_dashboard, ) METABASE_URL = os.environ.get("METABASE_URL", "http://localhost:3000") EMAIL = os.environ.get("METABASE_ADMIN_EMAIL", "admin@fnregistry.local") PASSWORD = os.environ.get("METABASE_ADMIN_PASSWORD", "FnRegistry2024!") def dashboard_name(app_name: str) -> str: return f"ops: {app_name.replace('_', '-')}" def db_name_for_app(app_name: str) -> str: return f"ops-{app_name.replace('_', '-')}" def build_cards(app_name: str) -> list[dict]: """Define las cards del dashboard operativo.""" return [ # ---- Fila 0: KPIs (h=5) ---- { "name": f"[{app_name}] Entities", "display": "scalar", "sql": "SELECT COUNT(*) FROM entities;", "size_x": 6, "size_y": 5, "col": 0, "row": 0, }, { "name": f"[{app_name}] Relations", "display": "scalar", "sql": "SELECT COUNT(*) FROM relations;", "size_x": 6, "size_y": 5, "col": 6, "row": 0, }, { "name": f"[{app_name}] Executions", "display": "scalar", "sql": "SELECT COUNT(*) FROM executions;", "size_x": 6, "size_y": 5, "col": 12, "row": 0, }, { "name": f"[{app_name}] Assertions", "display": "scalar", "sql": "SELECT COUNT(*) FROM assertions;", "size_x": 6, "size_y": 5, "col": 18, "row": 0, }, # ---- Fila 5: Distribucion (h=8) ---- { "name": f"[{app_name}] Entities por Status", "display": "pie", "sql": "SELECT status, COUNT(*) AS cantidad FROM entities GROUP BY status ORDER BY cantidad DESC;", "size_x": 8, "size_y": 8, "col": 0, "row": 5, }, { "name": f"[{app_name}] Entities por Tipo", "display": "bar", "sql": "SELECT type_ref, COUNT(*) AS cantidad FROM entities GROUP BY type_ref ORDER BY cantidad DESC;", "size_x": 8, "size_y": 8, "col": 8, "row": 5, }, { "name": f"[{app_name}] Relations por Status", "display": "pie", "sql": "SELECT status, COUNT(*) AS cantidad FROM relations GROUP BY status ORDER BY cantidad DESC;", "size_x": 8, "size_y": 8, "col": 16, "row": 5, }, # ---- Fila 13: Executions y Assertions (h=9) ---- { "name": f"[{app_name}] Executions: Success vs Failure", "display": "pie", "sql": "SELECT status, COUNT(*) AS cantidad FROM executions GROUP BY status ORDER BY cantidad DESC;", "size_x": 8, "size_y": 9, "col": 0, "row": 13, }, { "name": f"[{app_name}] Assertions por Severity", "display": "bar", "sql": "SELECT severity, COUNT(*) AS cantidad FROM assertions GROUP BY severity ORDER BY cantidad DESC;", "size_x": 8, "size_y": 9, "col": 8, "row": 13, }, { "name": f"[{app_name}] Assertion Results", "display": "pie", "sql": "SELECT status, COUNT(*) AS cantidad FROM assertion_results GROUP BY status ORDER BY cantidad DESC;", "size_x": 8, "size_y": 9, "col": 16, "row": 13, }, # ---- Fila 22: Relaciones frecuentes (h=8) ---- { "name": f"[{app_name}] Top Relaciones (via)", "display": "row", "sql": """ SELECT CASE WHEN via = '' THEN '(directo)' ELSE via END AS via_funcion, COUNT(*) AS cantidad FROM relations GROUP BY via ORDER BY cantidad DESC LIMIT 15; """, "size_x": 12, "size_y": 8, "col": 0, "row": 22, }, { "name": f"[{app_name}] Executions Recientes", "display": "table", "sql": """ SELECT id, pipeline_id, status, duration_ms, records_in, records_out, CASE WHEN error = '' THEN '-' ELSE error END AS error, started_at FROM executions ORDER BY started_at DESC LIMIT 20; """, "size_x": 12, "size_y": 8, "col": 12, "row": 22, }, # ---- Fila 30: Tablas detalle (h=8) ---- { "name": f"[{app_name}] Lista de Entities", "display": "table", "sql": """ SELECT id, name, type_ref, status, domain, source, created_at, updated_at FROM entities ORDER BY updated_at DESC; """, "size_x": 24, "size_y": 8, "col": 0, "row": 30, }, { "name": f"[{app_name}] Assertion Results Detalle", "display": "table", "sql": """ SELECT ar.id, a.name AS assertion, a.kind, a.severity, ar.status, ar.value, ar.message, ar.evaluated_at FROM assertion_results ar JOIN assertions a ON a.id = ar.assertion_id ORDER BY ar.evaluated_at DESC LIMIT 50; """, "size_x": 24, "size_y": 8, "col": 0, "row": 38, }, ] def create_ops_dashboard(client, app_name: str, db_id: int) -> int: """Crea el dashboard operativo para una app. Retorna dashboard_id.""" dash_name = dashboard_name(app_name) # Eliminar dashboard existente si hay for d in metabase_list_dashboards(client): if d.get("name") == dash_name: print(f" Dashboard existente (id={d['id']}), recreando...") metabase_delete_dashboard(client, d["id"]) cards_def = build_cards(app_name) print(f" Creando {len(cards_def)} cards...") created = [] for i, card_def in enumerate(cards_def): card = metabase_create_card( client, name=card_def["name"], dataset_query={ "database": db_id, "type": "native", "native": {"query": card_def["sql"]}, }, display=card_def["display"], description=f"ops dashboard: {app_name}", ) created.append((card, card_def)) print(f" [{i+1}/{len(cards_def)}] {card_def['name']} (id={card['id']})") print(" Creando dashboard...") dashboard = metabase_create_dashboard( client, name=dash_name, description=f"Dashboard operativo de {app_name}: entities, relations, executions, assertions.", ) dash_id = dashboard["id"] dashcards = [] for idx, (card, card_def) in enumerate(created): dashcards.append({ "id": -(idx + 1), "card_id": card["id"], "size_x": card_def["size_x"], "size_y": card_def["size_y"], "col": card_def["col"], "row": card_def["row"], }) metabase_update_dashboard(client, dash_id, dashcards=dashcards) return dash_id def main(): parser = argparse.ArgumentParser(description="Crea dashboard operativo en Metabase para una app") parser.add_argument("app_name", nargs="?", help="Nombre de la app") parser.add_argument("--all", action="store_true", help="Crea dashboards para todas las apps registradas en Metabase") args = parser.parse_args() if not args.app_name and not args.all: parser.error("Se requiere app_name o --all") print("Autenticando en Metabase...") client = metabase_auth(METABASE_URL, EMAIL, PASSWORD) dbs = metabase_list_databases(client) if args.all: ops_dbs = [ (db["name"].replace("ops-", "").replace("-", "_"), db["id"]) for db in dbs if db["name"].startswith("ops-") ] if not ops_dbs: print("No hay databases operacionales registradas en Metabase.") sys.exit(1) print(f"Creando dashboards para {len(ops_dbs)} apps...\n") for app_name, db_id in ops_dbs: print(f"--- {app_name} (db_id={db_id}) ---") dash_id = create_ops_dashboard(client, app_name, db_id) print(f" Dashboard: {METABASE_URL}/dashboard/{dash_id}\n") else: app_name = args.app_name expected_db_name = f"ops-{app_name.replace('_', '-')}" db_id = None for db in dbs: if db["name"] == expected_db_name: db_id = db["id"] break if not db_id: print(f"ERROR: Database '{expected_db_name}' no encontrada en Metabase.") print("Databases disponibles:") for db in dbs: print(f" - {db['name']} (id={db['id']})") print(f"\nRegistra primero con: python metabase_add_ops_db.py {app_name}") sys.exit(1) print(f"App: {app_name} (db_id={db_id})") dash_id = create_ops_dashboard(client, app_name, db_id) print(f"\nDashboard listo: {METABASE_URL}/dashboard/{dash_id}") client.close() if __name__ == "__main__": main()