From f3e62e830391bbe4c8b3765e6d83fe8c11bae68b Mon Sep 17 00:00:00 2001 From: egutierrez Date: Tue, 7 Apr 2026 18:45:11 +0200 Subject: [PATCH] feat: add meta_bigq command for Metabase + BigQuery operations Comando Claude con referencia completa de funciones Metabase y BigQuery, flujos tipicos y ejemplos de uso combinado. --- .claude/commands/meta_bigq.md | 457 ++++++++++++++++++++++++++++++++++ 1 file changed, 457 insertions(+) create mode 100644 .claude/commands/meta_bigq.md diff --git a/.claude/commands/meta_bigq.md b/.claude/commands/meta_bigq.md new file mode 100644 index 00000000..ed92cec9 --- /dev/null +++ b/.claude/commands/meta_bigq.md @@ -0,0 +1,457 @@ +# /meta_bigq — Operar Metabase y BigQuery desde el registry + +Eres un agente de datos. Tienes acceso a funciones Python del fn_registry para controlar **Metabase** (dashboards, cards, queries, usuarios) y **Google BigQuery** (datasets, tablas, queries, jobs, routines). Usa estas funciones directamente — no inventes llamadas HTTP manuales. + +--- + +## Como ejecutar funciones + +```bash +PYTHON="python/.venv/bin/python3" + +# Ejecutar codigo inline +$PYTHON -c " +import sys; sys.path.insert(0, 'python/functions') +from metabase import metabase_auth, metabase_list_dashboards +client = metabase_auth('http://localhost:3000', 'admin@fnregistry.local', 'FnRegistry2024!') +print(metabase_list_dashboards(client)) +" + +# O con fn run para pipelines +./fn run init_metabase --project fn_registry +./fn run setup_metabase_volume +./fn run metabase_create_ops_dashboard docker_tui +``` + +Variables de entorno tipicas: +- `METABASE_URL` (default: `http://localhost:3000`) +- `METABASE_ADMIN_EMAIL` (default: `admin@fnregistry.local`) +- `METABASE_ADMIN_PASSWORD` (default: `FnRegistry2024!`) +- BigQuery usa ADC (`gcloud auth application-default login`) o `GOOGLE_APPLICATION_CREDENTIALS` + +--- + +## METABASE — Referencia rapida + +### Auth + +```python +from metabase import metabase_auth, MetabaseClient + +# Login con email/password +client = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!") + +# O directo con API key +client = MetabaseClient("http://localhost:3000", "mb_api_key_xxxxx") + +# Context manager +with metabase_auth(...) as client: + pass # se cierra solo +``` + +### Cards (preguntas) + +```python +from metabase import ( + metabase_list_cards, # (client, filter="", model_id=0) -> list[dict] + metabase_get_card, # (client, card_id) -> dict + metabase_create_card, # (client, name, dataset_query, display="table", collection_id=0, description="") -> dict + metabase_update_card, # (client, card_id, **fields) -> dict # fields: name, description, display, dataset_query, archived... + metabase_delete_card, # (client, card_id) -> None # IRREVERSIBLE, preferir archived=True + metabase_execute_card, # (client, card_id, parameters=None) -> dict # ejecuta query de card guardada + metabase_execute_query, # (client, database_id, sql, max_results=0) -> dict # query ad-hoc +) + +# Crear card con SQL nativo +card = metabase_create_card(client, "Ventas por mes", { + "database": 1, "type": "native", + "native": {"query": "SELECT date_trunc('month', created_at) as mes, SUM(total) FROM orders GROUP BY 1"}, +}, display="line") + +# Actualizar query de una card +metabase_update_card(client, card["id"], dataset_query={ + "database": 1, "type": "native", + "native": {"query": "SELECT ... nueva query ..."}, +}) + +# Archivar (soft-delete) +metabase_update_card(client, 42, archived=True) + +# Query ad-hoc sin guardar +result = metabase_execute_query(client, 1, "SELECT COUNT(*) FROM users") +# result["data"]["rows"] = [[42]] +``` + +**Filtros de list_cards:** `all`, `mine`, `fav`, `archived`, `recent`, `popular`, `database`, `table` + +### Dashboards + +```python +from metabase import ( + metabase_list_dashboards, # (client, filter="") -> list[dict] + metabase_get_dashboard, # (client, dashboard_id) -> dict # incluye dashcards + metabase_create_dashboard, # (client, name, description="", collection_id=0) -> dict + metabase_update_dashboard, # (client, dashboard_id, **fields) -> dict + metabase_delete_dashboard, # (client, dashboard_id) -> None # IRREVERSIBLE +) + +# Crear dashboard + agregar cards +dash = metabase_create_dashboard(client, "KPIs Operativos", description="Metricas diarias") + +# Posicionar cards en el dashboard (dashcards es el estado COMPLETO) +metabase_update_dashboard(client, dash["id"], dashcards=[ + {"id": -1, "card_id": card1["id"], "row": 0, "col": 0, "size_x": 6, "size_y": 4}, + {"id": -2, "card_id": card2["id"], "row": 0, "col": 6, "size_x": 6, "size_y": 4}, + {"id": -3, "card_id": card3["id"], "row": 4, "col": 0, "size_x": 12, "size_y": 6}, +]) +# id negativo = card nueva, id positivo = card existente, omitida = eliminada +``` + +**Filtros de list_dashboards:** `all`, `mine`, `archived` + +### Databases + +```python +from metabase import ( + metabase_list_databases, # (client, include_tables=False) -> list + metabase_add_database, # (client, name, engine, details) -> dict + metabase_get_database, # (client, database_id) -> dict +) + +# Agregar SQLite +metabase_add_database(client, "Operations DB", "sqlite", {"db": "/data/operations.db"}) + +# Agregar PostgreSQL +metabase_add_database(client, "DW", "postgres", { + "host": "localhost", "port": 5432, "dbname": "warehouse", + "user": "reader", "password": "secret", +}) +``` + +### Usuarios + +```python +from metabase import ( + metabase_list_users, # (client, status="", query="", limit=0, offset=0) -> dict + metabase_get_user, # (client, user_id) -> dict + metabase_create_user, # (client, first_name, last_name, email, password="", group_ids=None) -> dict + metabase_update_user, # (client, user_id, **fields) -> dict + metabase_deactivate_user, # (client, user_id) -> None # soft-delete +) +``` + +### Setup y pipelines + +```python +from metabase import metabase_setup + +# Setup inicial de instancia nueva (obtiene setup-token automaticamente) +metabase_setup("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!") +``` + +```bash +# Pipelines ejecutables con fn run +./fn run init_metabase --project fn_registry # Docker: Postgres + Metabase +./fn run setup_metabase_volume # Copiar registry.db al contenedor +./fn run metabase_add_ops_db docker_tui # Registrar operations.db como database +./fn run metabase_create_ops_dashboard docker_tui # Dashboard operativo completo +./fn run metabase_fix_permissions # Arreglar permisos SQLite en Docker +``` + +--- + +## BIGQUERY — Referencia rapida + +### Auth + +```python +from bigquery import bq_auth, BQClient + +# ADC (gcloud auth application-default login) +client = bq_auth() + +# Proyecto explicito +client = bq_auth("my-project-id") + +# Service account JSON +client = bq_auth(credentials_path="/path/to/sa.json") + +# Context manager +with bq_auth("my-project") as client: + pass +``` + +### Datasets + +```python +from bigquery import ( + bq_create_dataset, # (client, dataset_id, location="US", description="", labels=None, default_table_expiration_ms=0) -> dict + bq_get_dataset, # (client, dataset_id) -> dict + bq_list_datasets, # (client) -> list[dict] + bq_update_dataset, # (client, dataset_id, description=None, labels=None, default_table_expiration_ms=None) -> dict + bq_delete_dataset, # (client, dataset_id, delete_contents=False) -> None +) + +bq_create_dataset(client, "analytics", location="EU", description="Data warehouse") +bq_delete_dataset(client, "temp", delete_contents=True) # borra tablas incluidas +``` + +### Tables + +```python +from bigquery import ( + bq_create_table, # (client, dataset_id, table_id, schema, partitioning=None, clustering=None, description="", labels=None) -> dict + bq_get_table, # (client, dataset_id, table_id) -> dict # schema, num_rows, num_bytes, partitioning... + bq_list_tables, # (client, dataset_id) -> list[dict] + bq_update_table, # (client, dataset_id, table_id, schema=None, description=None, labels=None) -> dict + bq_delete_table, # (client, dataset_id, table_id) -> None + bq_preview_rows, # (client, dataset_id, table_id, max_results=10) -> dict # SIN COSTE de query +) + +# Crear tabla con particionamiento +bq_create_table(client, "analytics", "events", + schema=[ + {"name": "event_id", "type": "STRING", "mode": "REQUIRED"}, + {"name": "user_id", "type": "STRING"}, + {"name": "event_type", "type": "STRING"}, + {"name": "created_at", "type": "TIMESTAMP"}, + {"name": "payload", "type": "JSON"}, + ], + partitioning={"type": "DAY", "field": "created_at"}, + clustering=["event_type", "user_id"], +) + +# Preview sin coste (usa Storage Read API, no ejecuta query) +preview = bq_preview_rows(client, "analytics", "events", max_results=5) +# {"columns": [...], "rows": [[...], ...], "total_rows": 1234567} + +# Schema: solo se pueden AGREGAR columnas, nunca eliminar +bq_update_table(client, "analytics", "events", schema=[ + *existing_schema, + {"name": "new_col", "type": "STRING"}, +]) +``` + +**Tipos de schema:** `STRING`, `INT64`, `FLOAT64`, `BOOL`, `TIMESTAMP`, `DATE`, `DATETIME`, `BYTES`, `NUMERIC`, `JSON`, `RECORD`/`STRUCT`, `GEOGRAPHY` +**Modos:** `NULLABLE` (default), `REQUIRED`, `REPEATED` + +### Queries y datos + +```python +from bigquery import ( + bq_query, # (client, sql, params=None, dry_run=False) -> dict + bq_insert_rows, # (client, dataset_id, table_id, rows) -> dict + bq_load_from_gcs, # (client, uri, dataset_id, table_id, source_format="CSV", write_disposition="WRITE_APPEND", autodetect=True, skip_leading_rows=0) -> dict + bq_load_from_file, # (client, file_path, dataset_id, table_id, ...) -> dict # mismos params que gcs + bq_export_to_gcs, # (client, dataset_id, table_id, destination_uri, destination_format="CSV", compression="NONE") -> dict + bq_copy_table, # (client, source_dataset, source_table, dest_dataset, dest_table, write_disposition="WRITE_EMPTY") -> dict +) + +# Query simple +result = bq_query(client, "SELECT COUNT(*) as total FROM analytics.events") +# {"columns": ["total"], "rows": [[1234567]], "total_rows": 1, "bytes_processed": 0, "cache_hit": True} + +# Query parametrizada (usa @nombre en SQL) +result = bq_query(client, "SELECT * FROM analytics.events WHERE event_type = @tipo LIMIT @n", params=[ + {"name": "tipo", "type": "STRING", "value": "purchase"}, + {"name": "n", "type": "INT64", "value": 100}, +]) + +# Estimar coste ANTES de ejecutar (no procesa datos) +estimate = bq_query(client, "SELECT * FROM analytics.events", dry_run=True) +# {"total_bytes_processed": 5368709120, "total_bytes_billed": 5368709120} +gb = estimate["total_bytes_processed"] / (1024**3) +print(f"Esta query procesara {gb:.2f} GB (~${gb * 6.25:.2f} USD)") + +# Streaming insert +bq_insert_rows(client, "analytics", "events", [ + {"event_id": "e1", "user_id": "u1", "event_type": "click", "created_at": "2026-04-07T10:00:00Z"}, + {"event_id": "e2", "user_id": "u2", "event_type": "purchase", "created_at": "2026-04-07T10:01:00Z"}, +]) +# {"inserted": 2, "errors": []} + +# Cargar CSV desde GCS +bq_load_from_gcs(client, "gs://bucket/data/*.csv", "analytics", "events", + source_format="CSV", write_disposition="WRITE_TRUNCATE", skip_leading_rows=1) + +# Cargar archivo local +bq_load_from_file(client, "/tmp/data.parquet", "analytics", "events", + source_format="PARQUET", write_disposition="WRITE_APPEND") + +# Exportar a GCS +bq_export_to_gcs(client, "analytics", "events", "gs://bucket/export/events-*.csv", + destination_format="CSV", compression="GZIP") + +# Copiar tabla +bq_copy_table(client, "analytics", "events", "analytics_backup", "events_20260407") +``` + +**write_disposition:** `WRITE_TRUNCATE` (reemplazar), `WRITE_APPEND` (agregar), `WRITE_EMPTY` (solo si vacia) +**source_format:** `CSV`, `NEWLINE_DELIMITED_JSON`, `AVRO`, `PARQUET`, `ORC` + +### Jobs + +```python +from bigquery import ( + bq_list_jobs, # (client, state_filter="", max_results=50, all_users=False) -> list[dict] + bq_get_job, # (client, job_id) -> dict # state, bytes_processed, errors + bq_cancel_job, # (client, job_id) -> dict +) + +# Ver jobs corriendo +running = bq_list_jobs(client, state_filter="running") +for j in running: + print(j["job_id"], j["job_type"], j["bytes_processed"]) + +# Cancelar un job pesado +bq_cancel_job(client, "job_abc123") +``` + +**state_filter:** `running`, `pending`, `done` + +### Routines (UDFs / Procedures) + +```python +from bigquery import ( + bq_create_routine, # (client, dataset_id, routine_id, body, routine_type="SCALAR_FUNCTION", language="SQL", arguments=None, return_type="", description="") -> dict + bq_list_routines, # (client, dataset_id) -> list[dict] + bq_delete_routine, # (client, dataset_id, routine_id) -> None +) + +# UDF SQL +bq_create_routine(client, "analytics", "double_value", + body="x * 2", + arguments=[{"name": "x", "data_type": "INT64"}], + return_type="INT64", +) + +# Stored procedure +bq_create_routine(client, "analytics", "refresh_summary", + body="BEGIN INSERT INTO summary SELECT ... FROM events; END;", + routine_type="PROCEDURE", +) + +# UDF JavaScript +bq_create_routine(client, "analytics", "parse_ua", + body="return uaParser.parse(ua).browser.name;", + language="JAVASCRIPT", + arguments=[{"name": "ua", "data_type": "STRING"}], + return_type="STRING", +) +``` + +--- + +## Flujos tipicos + +### 1. Explorar BigQuery y visualizar en Metabase + +```python +import sys; sys.path.insert(0, "python/functions") +from bigquery import bq_auth, bq_query +from metabase import metabase_auth, metabase_create_card, metabase_create_dashboard, metabase_update_dashboard + +# 1. Explorar datos en BQ +bq = bq_auth("my-project") +result = bq_query(bq, "SELECT event_type, COUNT(*) as cnt FROM analytics.events GROUP BY 1 ORDER BY 2 DESC LIMIT 10") +print(result["columns"], result["rows"]) + +# 2. Registrar BQ como database en Metabase (si no esta) +# Metabase soporta BigQuery como engine nativo + +# 3. Crear cards en Metabase apuntando a BQ +mb = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!") +card = metabase_create_card(mb, "Eventos por tipo", { + "database": 2, # ID de la database BQ en Metabase + "type": "native", + "native": {"query": "SELECT event_type, COUNT(*) as cnt FROM analytics.events GROUP BY 1 ORDER BY 2 DESC"}, +}, display="bar") + +# 4. Crear dashboard +dash = metabase_create_dashboard(mb, "Analytics Overview") +metabase_update_dashboard(mb, dash["id"], dashcards=[ + {"id": -1, "card_id": card["id"], "row": 0, "col": 0, "size_x": 12, "size_y": 6}, +]) +``` + +### 2. ETL: archivo local -> BigQuery -> Metabase dashboard + +```python +from bigquery import bq_auth, bq_load_from_file, bq_query, bq_preview_rows +from metabase import metabase_auth, metabase_execute_query + +bq = bq_auth("my-project") + +# Cargar datos +bq_load_from_file(bq, "/tmp/sales.csv", "warehouse", "sales", + source_format="CSV", write_disposition="WRITE_TRUNCATE", skip_leading_rows=1) + +# Verificar +preview = bq_preview_rows(bq, "warehouse", "sales", max_results=3) +print(preview["total_rows"], "filas cargadas") + +# Consultar via Metabase (si BQ esta registrado como database) +mb = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!") +result = metabase_execute_query(mb, 2, "SELECT region, SUM(amount) FROM sales GROUP BY 1") +``` + +### 3. Montar infraestructura desde cero + +```bash +# 1. Levantar Metabase + Postgres +./fn run init_metabase --project fn_registry + +# 2. Copiar registry.db al contenedor +./fn run setup_metabase_volume + +# 3. Setup inicial +python/.venv/bin/python3 -c " +import sys; sys.path.insert(0, 'python/functions') +from metabase import metabase_setup +metabase_setup('http://localhost:3000', 'admin@fnregistry.local', 'FnRegistry2024!') +" + +# 4. Registrar operations.db de una app +./fn run metabase_add_ops_db docker_tui + +# 5. Dashboard operativo automatico +./fn run metabase_create_ops_dashboard docker_tui +``` + +### 4. Auditar costes de BigQuery + +```python +from bigquery import bq_auth, bq_list_jobs, bq_query + +bq = bq_auth("my-project") + +# Jobs recientes completados +jobs = bq_list_jobs(bq, state_filter="done", max_results=20, all_users=True) +total_bytes = sum(j.get("bytes_processed") or 0 for j in jobs) +print(f"Ultimos 20 jobs: {total_bytes / (1024**3):.2f} GB procesados") + +# Dry-run antes de queries caras +estimate = bq_query(bq, "SELECT * FROM analytics.events WHERE created_at > '2026-01-01'", dry_run=True) +gb = estimate["total_bytes_processed"] / (1024**3) +cost = gb * 6.25 # $6.25/TB on-demand +print(f"Coste estimado: ${cost:.2f} USD ({gb:.1f} GB)") +``` + +--- + +## Buscar mas funciones + +Si necesitas algo que no esta aqui, busca en el registry: + +```bash +# FTS5 por nombre o descripcion +./fn search "lo que buscas" + +# Ver detalles de una funcion +./fn show + +# Inline desde Python +sqlite3 registry.db "SELECT id, description FROM functions WHERE id IN (SELECT id FROM functions_fts WHERE functions_fts MATCH 'description:export*') ORDER BY name;" +``` + +$ARGUMENTS