Files
fn_registry/.claude/commands/meta_bigq.md
T
egutierrez f3e62e8303 feat: add meta_bigq command for Metabase + BigQuery operations
Comando Claude con referencia completa de funciones Metabase y BigQuery,
flujos tipicos y ejemplos de uso combinado.
2026-04-07 18:45:11 +02:00

16 KiB

/meta_bigq — Operar Metabase y BigQuery desde el registry

Eres un agente de datos. Tienes acceso a funciones Python del fn_registry para controlar Metabase (dashboards, cards, queries, usuarios) y Google BigQuery (datasets, tablas, queries, jobs, routines). Usa estas funciones directamente — no inventes llamadas HTTP manuales.


Como ejecutar funciones

PYTHON="python/.venv/bin/python3"

# Ejecutar codigo inline
$PYTHON -c "
import sys; sys.path.insert(0, 'python/functions')
from metabase import metabase_auth, metabase_list_dashboards
client = metabase_auth('http://localhost:3000', 'admin@fnregistry.local', 'FnRegistry2024!')
print(metabase_list_dashboards(client))
"

# O con fn run para pipelines
./fn run init_metabase --project fn_registry
./fn run setup_metabase_volume
./fn run metabase_create_ops_dashboard docker_tui

Variables de entorno tipicas:

  • METABASE_URL (default: http://localhost:3000)
  • METABASE_ADMIN_EMAIL (default: admin@fnregistry.local)
  • METABASE_ADMIN_PASSWORD (default: FnRegistry2024!)
  • BigQuery usa ADC (gcloud auth application-default login) o GOOGLE_APPLICATION_CREDENTIALS

METABASE — Referencia rapida

Auth

from metabase import metabase_auth, MetabaseClient

# Login con email/password
client = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")

# O directo con API key
client = MetabaseClient("http://localhost:3000", "mb_api_key_xxxxx")

# Context manager
with metabase_auth(...) as client:
    pass  # se cierra solo

Cards (preguntas)

from metabase import (
    metabase_list_cards,      # (client, filter="", model_id=0) -> list[dict]
    metabase_get_card,        # (client, card_id) -> dict
    metabase_create_card,     # (client, name, dataset_query, display="table", collection_id=0, description="") -> dict
    metabase_update_card,     # (client, card_id, **fields) -> dict  # fields: name, description, display, dataset_query, archived...
    metabase_delete_card,     # (client, card_id) -> None  # IRREVERSIBLE, preferir archived=True
    metabase_execute_card,    # (client, card_id, parameters=None) -> dict  # ejecuta query de card guardada
    metabase_execute_query,   # (client, database_id, sql, max_results=0) -> dict  # query ad-hoc
)

# Crear card con SQL nativo
card = metabase_create_card(client, "Ventas por mes", {
    "database": 1, "type": "native",
    "native": {"query": "SELECT date_trunc('month', created_at) as mes, SUM(total) FROM orders GROUP BY 1"},
}, display="line")

# Actualizar query de una card
metabase_update_card(client, card["id"], dataset_query={
    "database": 1, "type": "native",
    "native": {"query": "SELECT ... nueva query ..."},
})

# Archivar (soft-delete)
metabase_update_card(client, 42, archived=True)

# Query ad-hoc sin guardar
result = metabase_execute_query(client, 1, "SELECT COUNT(*) FROM users")
# result["data"]["rows"] = [[42]]

Filtros de list_cards: all, mine, fav, archived, recent, popular, database, table

Dashboards

from metabase import (
    metabase_list_dashboards,    # (client, filter="") -> list[dict]
    metabase_get_dashboard,      # (client, dashboard_id) -> dict  # incluye dashcards
    metabase_create_dashboard,   # (client, name, description="", collection_id=0) -> dict
    metabase_update_dashboard,   # (client, dashboard_id, **fields) -> dict
    metabase_delete_dashboard,   # (client, dashboard_id) -> None  # IRREVERSIBLE
)

# Crear dashboard + agregar cards
dash = metabase_create_dashboard(client, "KPIs Operativos", description="Metricas diarias")

# Posicionar cards en el dashboard (dashcards es el estado COMPLETO)
metabase_update_dashboard(client, dash["id"], dashcards=[
    {"id": -1, "card_id": card1["id"], "row": 0, "col": 0, "size_x": 6, "size_y": 4},
    {"id": -2, "card_id": card2["id"], "row": 0, "col": 6, "size_x": 6, "size_y": 4},
    {"id": -3, "card_id": card3["id"], "row": 4, "col": 0, "size_x": 12, "size_y": 6},
])
# id negativo = card nueva, id positivo = card existente, omitida = eliminada

Filtros de list_dashboards: all, mine, archived

Databases

from metabase import (
    metabase_list_databases,  # (client, include_tables=False) -> list
    metabase_add_database,    # (client, name, engine, details) -> dict
    metabase_get_database,    # (client, database_id) -> dict
)

# Agregar SQLite
metabase_add_database(client, "Operations DB", "sqlite", {"db": "/data/operations.db"})

# Agregar PostgreSQL
metabase_add_database(client, "DW", "postgres", {
    "host": "localhost", "port": 5432, "dbname": "warehouse",
    "user": "reader", "password": "secret",
})

Usuarios

from metabase import (
    metabase_list_users,       # (client, status="", query="", limit=0, offset=0) -> dict
    metabase_get_user,         # (client, user_id) -> dict
    metabase_create_user,      # (client, first_name, last_name, email, password="", group_ids=None) -> dict
    metabase_update_user,      # (client, user_id, **fields) -> dict
    metabase_deactivate_user,  # (client, user_id) -> None  # soft-delete
)

Setup y pipelines

from metabase import metabase_setup

# Setup inicial de instancia nueva (obtiene setup-token automaticamente)
metabase_setup("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")

# Pipelines ejecutables con fn run (shell, no Python)
./fn run init_metabase --project fn_registry     # Docker: Postgres + Metabase
./fn run setup_metabase_volume                    # Copiar registry.db al contenedor
./fn run metabase_add_ops_db docker_tui           # Registrar operations.db como database
./fn run metabase_create_ops_dashboard docker_tui # Dashboard operativo completo
./fn run metabase_fix_permissions                 # Arreglar permisos SQLite en Docker

BIGQUERY — Referencia rapida

Auth

from bigquery import bq_auth, BQClient

# ADC (gcloud auth application-default login)
client = bq_auth()

# Proyecto explicito
client = bq_auth("my-project-id")

# Service account JSON
client = bq_auth(credentials_path="/path/to/sa.json")

# Context manager
with bq_auth("my-project") as client:
    pass

Datasets

from bigquery import (
    bq_create_dataset,  # (client, dataset_id, location="US", description="", labels=None, default_table_expiration_ms=0) -> dict
    bq_get_dataset,     # (client, dataset_id) -> dict
    bq_list_datasets,   # (client) -> list[dict]
    bq_update_dataset,  # (client, dataset_id, description=None, labels=None, default_table_expiration_ms=None) -> dict
    bq_delete_dataset,  # (client, dataset_id, delete_contents=False) -> None
)

bq_create_dataset(client, "analytics", location="EU", description="Data warehouse")
bq_delete_dataset(client, "temp", delete_contents=True)  # borra tablas incluidas

Tables

from bigquery import (
    bq_create_table,  # (client, dataset_id, table_id, schema, partitioning=None, clustering=None, description="", labels=None) -> dict
    bq_get_table,     # (client, dataset_id, table_id) -> dict  # schema, num_rows, num_bytes, partitioning...
    bq_list_tables,   # (client, dataset_id) -> list[dict]
    bq_update_table,  # (client, dataset_id, table_id, schema=None, description=None, labels=None) -> dict
    bq_delete_table,  # (client, dataset_id, table_id) -> None
    bq_preview_rows,  # (client, dataset_id, table_id, max_results=10) -> dict  # SIN COSTE de query
)

# Crear tabla con particionamiento
bq_create_table(client, "analytics", "events",
    schema=[
        {"name": "event_id", "type": "STRING", "mode": "REQUIRED"},
        {"name": "user_id", "type": "STRING"},
        {"name": "event_type", "type": "STRING"},
        {"name": "created_at", "type": "TIMESTAMP"},
        {"name": "payload", "type": "JSON"},
    ],
    partitioning={"type": "DAY", "field": "created_at"},
    clustering=["event_type", "user_id"],
)

# Preview sin coste (usa Storage Read API, no ejecuta query)
preview = bq_preview_rows(client, "analytics", "events", max_results=5)
# {"columns": [...], "rows": [[...], ...], "total_rows": 1234567}

# Schema: solo se pueden AGREGAR columnas, nunca eliminar
bq_update_table(client, "analytics", "events", schema=[
    *existing_schema,
    {"name": "new_col", "type": "STRING"},
])

Tipos de schema: STRING, INT64, FLOAT64, BOOL, TIMESTAMP, DATE, DATETIME, BYTES, NUMERIC, JSON, RECORD/STRUCT, GEOGRAPHY

Modos: NULLABLE (default), REQUIRED, REPEATED

Queries y datos

from bigquery import (
    bq_query,          # (client, sql, params=None, dry_run=False) -> dict
    bq_insert_rows,    # (client, dataset_id, table_id, rows) -> dict
    bq_load_from_gcs,  # (client, uri, dataset_id, table_id, source_format="CSV", write_disposition="WRITE_APPEND", autodetect=True, skip_leading_rows=0) -> dict
    bq_load_from_file, # (client, file_path, dataset_id, table_id, ...) -> dict  # mismos params que gcs
    bq_export_to_gcs,  # (client, dataset_id, table_id, destination_uri, destination_format="CSV", compression="NONE") -> dict
    bq_copy_table,     # (client, source_dataset, source_table, dest_dataset, dest_table, write_disposition="WRITE_EMPTY") -> dict
)

# Query simple
result = bq_query(client, "SELECT COUNT(*) as total FROM analytics.events")
# {"columns": ["total"], "rows": [[1234567]], "total_rows": 1, "bytes_processed": 0, "cache_hit": True}

# Query parametrizada (usa @nombre en SQL)
result = bq_query(client, "SELECT * FROM analytics.events WHERE event_type = @tipo LIMIT @n", params=[
    {"name": "tipo", "type": "STRING", "value": "purchase"},
    {"name": "n", "type": "INT64", "value": 100},
])

# Estimar coste ANTES de ejecutar (no procesa datos)
estimate = bq_query(client, "SELECT * FROM analytics.events", dry_run=True)
# {"total_bytes_processed": 5368709120, "total_bytes_billed": 5368709120}
gb = estimate["total_bytes_processed"] / (1024**3)
# El precio on-demand es por TiB: convertir GiB -> TiB antes de aplicar $6.25
print(f"Esta query procesara {gb:.2f} GB (~${gb / 1024 * 6.25:.2f} USD)")

# Streaming insert
bq_insert_rows(client, "analytics", "events", [
    {"event_id": "e1", "user_id": "u1", "event_type": "click", "created_at": "2026-04-07T10:00:00Z"},
    {"event_id": "e2", "user_id": "u2", "event_type": "purchase", "created_at": "2026-04-07T10:01:00Z"},
])
# {"inserted": 2, "errors": []}

# Cargar CSV desde GCS
bq_load_from_gcs(client, "gs://bucket/data/*.csv", "analytics", "events",
    source_format="CSV", write_disposition="WRITE_TRUNCATE", skip_leading_rows=1)

# Cargar archivo local
bq_load_from_file(client, "/tmp/data.parquet", "analytics", "events",
    source_format="PARQUET", write_disposition="WRITE_APPEND")

# Exportar a GCS
bq_export_to_gcs(client, "analytics", "events", "gs://bucket/export/events-*.csv",
    destination_format="CSV", compression="GZIP")

# Copiar tabla
bq_copy_table(client, "analytics", "events", "analytics_backup", "events_20260407")

write_disposition: WRITE_TRUNCATE (reemplazar), WRITE_APPEND (agregar), WRITE_EMPTY (solo si vacia)

source_format: CSV, NEWLINE_DELIMITED_JSON, AVRO, PARQUET, ORC

Jobs

from bigquery import (
    bq_list_jobs,   # (client, state_filter="", max_results=50, all_users=False) -> list[dict]
    bq_get_job,     # (client, job_id) -> dict  # state, bytes_processed, errors
    bq_cancel_job,  # (client, job_id) -> dict
)

# Ver jobs corriendo
running = bq_list_jobs(client, state_filter="running")
for j in running:
    print(j["job_id"], j["job_type"], j["bytes_processed"])

# Cancelar un job pesado
bq_cancel_job(client, "job_abc123")

state_filter: running, pending, done

Routines (UDFs / Procedures)

from bigquery import (
    bq_create_routine,  # (client, dataset_id, routine_id, body, routine_type="SCALAR_FUNCTION", language="SQL", arguments=None, return_type="", description="") -> dict
    bq_list_routines,   # (client, dataset_id) -> list[dict]
    bq_delete_routine,  # (client, dataset_id, routine_id) -> None
)

# UDF SQL
bq_create_routine(client, "analytics", "double_value",
    body="x * 2",
    arguments=[{"name": "x", "data_type": "INT64"}],
    return_type="INT64",
)

# Stored procedure
bq_create_routine(client, "analytics", "refresh_summary",
    body="BEGIN INSERT INTO summary SELECT ... FROM events; END;",
    routine_type="PROCEDURE",
)

# UDF JavaScript
bq_create_routine(client, "analytics", "parse_ua",
    body="return uaParser.parse(ua).browser.name;",
    language="JAVASCRIPT",
    arguments=[{"name": "ua", "data_type": "STRING"}],
    return_type="STRING",
)

Flujos tipicos

1. Explorar BigQuery y visualizar en Metabase

import sys; sys.path.insert(0, "python/functions")
from bigquery import bq_auth, bq_query
from metabase import metabase_auth, metabase_create_card, metabase_create_dashboard, metabase_update_dashboard

# 1. Explorar datos en BQ
bq = bq_auth("my-project")
result = bq_query(bq, "SELECT event_type, COUNT(*) as cnt FROM analytics.events GROUP BY 1 ORDER BY 2 DESC LIMIT 10")
print(result["columns"], result["rows"])

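# 2. Registrar BQ como database en Metabase (si no esta)
# Metabase soporta BigQuery como engine nativo ("bigquery-cloud-sdk"); se registra con
# metabase_add_database(client, "BigQuery", "bigquery-cloud-sdk", {...}) pasando en
# details los datos de conexion del proyecto (project-id y service account).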

# 3. Crear cards en Metabase apuntando a BQ
mb = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
card = metabase_create_card(mb, "Eventos por tipo", {
    "database": 2,  # ID de la database BQ en Metabase
    "type": "native",
    "native": {"query": "SELECT event_type, COUNT(*) as cnt FROM analytics.events GROUP BY 1 ORDER BY 2 DESC"},
}, display="bar")

# 4. Crear dashboard
dash = metabase_create_dashboard(mb, "Analytics Overview")
metabase_update_dashboard(mb, dash["id"], dashcards=[
    {"id": -1, "card_id": card["id"], "row": 0, "col": 0, "size_x": 12, "size_y": 6},
])

2. ETL: archivo local -> BigQuery -> Metabase dashboard

from bigquery import bq_auth, bq_load_from_file, bq_query, bq_preview_rows
from metabase import metabase_auth, metabase_execute_query

bq = bq_auth("my-project")

# Cargar datos
bq_load_from_file(bq, "/tmp/sales.csv", "warehouse", "sales",
    source_format="CSV", write_disposition="WRITE_TRUNCATE", skip_leading_rows=1)

# Verificar
preview = bq_preview_rows(bq, "warehouse", "sales", max_results=3)
print(preview["total_rows"], "filas cargadas")

# Consultar via Metabase (si BQ esta registrado como database)
mb = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
result = metabase_execute_query(mb, 2, "SELECT region, SUM(amount) FROM warehouse.sales GROUP BY 1")  # 2 = ID de la database BQ en Metabase

3. Montar infraestructura desde cero

# 1. Levantar Metabase + Postgres
./fn run init_metabase --project fn_registry

# 2. Copiar registry.db al contenedor
./fn run setup_metabase_volume

# 3. Setup inicial
python/.venv/bin/python3 -c "
import sys; sys.path.insert(0, 'python/functions')
from metabase import metabase_setup
metabase_setup('http://localhost:3000', 'admin@fnregistry.local', 'FnRegistry2024!')
"

# 4. Registrar operations.db de una app
./fn run metabase_add_ops_db docker_tui

# 5. Dashboard operativo automatico
./fn run metabase_create_ops_dashboard docker_tui

4. Auditar costes de BigQuery

from bigquery import bq_auth, bq_list_jobs, bq_query

bq = bq_auth("my-project")

# Jobs recientes completados
jobs = bq_list_jobs(bq, state_filter="done", max_results=20, all_users=True)
total_bytes = sum(j.get("bytes_processed") or 0 for j in jobs)
print(f"Ultimos 20 jobs: {total_bytes / (1024**3):.2f} GB procesados")

# Dry-run antes de queries caras
estimate = bq_query(bq, "SELECT * FROM analytics.events WHERE created_at > '2026-01-01'", dry_run=True)
gb = estimate["total_bytes_processed"] / (1024**3)
cost = gb / 1024 * 6.25  # $6.25/TiB on-demand; gb esta en GiB, se divide por 1024 para pasar a TiB
print(f"Coste estimado: ${cost:.2f} USD ({gb:.1f} GB)")

Buscar mas funciones

Si necesitas algo que no esta aqui, busca en el registry:

# FTS5 por nombre o descripcion
./fn search "lo que buscas"

# Ver detalles de una funcion
./fn show <id>

# Consulta directa a registry.db con el CLI sqlite3 (FTS5)
sqlite3 registry.db "SELECT id, description FROM functions WHERE id IN (SELECT id FROM functions_fts WHERE functions_fts MATCH 'description:export*') ORDER BY name;"

$ARGUMENTS