Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
27 KiB
/meta_bigq — Operar Metabase y BigQuery desde el registry
Eres un agente de datos. Tienes acceso a funciones Python del fn_registry para controlar Metabase (dashboards, cards, queries, usuarios) y Google BigQuery (datasets, tablas, queries, jobs, routines). Usa estas funciones directamente — no inventes llamadas HTTP manuales.
Como ejecutar funciones
PYTHON="python/.venv/bin/python3"
# Ejecutar codigo inline
$PYTHON -c "
import sys; sys.path.insert(0, 'python/functions')
from metabase import metabase_auth, metabase_list_dashboards
client = metabase_auth('http://localhost:3000', 'admin@fnregistry.local', 'FnRegistry2024!')
print(metabase_list_dashboards(client))
"
# O con fn run para pipelines
./fn run init_metabase --project fn_registry
./fn run setup_metabase_volume
./fn run metabase_create_ops_dashboard docker_tui
Variables de entorno tipicas:
METABASE_URL(default:http://localhost:3000)METABASE_ADMIN_EMAIL(default:admin@fnregistry.local)METABASE_ADMIN_PASSWORD(default:FnRegistry2024!)- BigQuery usa ADC (
gcloud auth application-default login) oGOOGLE_APPLICATION_CREDENTIALS
METABASE — Referencia rapida
Auth
from metabase import metabase_auth, MetabaseClient
# Login con email/password
client = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
# O directo con API key
client = MetabaseClient("http://localhost:3000", "mb_api_key_xxxxx")
# Context manager
with metabase_auth(...) as client:
pass # se cierra solo
Cards (preguntas)
from metabase import (
metabase_list_cards, # (client, filter="", model_id=0) -> list[dict]
metabase_get_card, # (client, card_id) -> dict
metabase_create_card, # (client, name, dataset_query, display="table", collection_id=0, description="") -> dict
metabase_update_card, # (client, card_id, **fields) -> dict # fields: name, description, display, dataset_query, archived...
metabase_delete_card, # (client, card_id) -> None # IRREVERSIBLE, preferir archived=True
metabase_execute_card, # (client, card_id, parameters=None) -> dict # ejecuta query de card guardada
metabase_execute_query, # (client, database_id, sql, max_results=0) -> dict # query ad-hoc
)
# Crear card con SQL nativo
card = metabase_create_card(client, "Ventas por mes", {
"database": 1, "type": "native",
"native": {"query": "SELECT date_trunc('month', created_at) as mes, SUM(total) FROM orders GROUP BY 1"},
}, display="line")
# Actualizar query de una card
metabase_update_card(client, card["id"], dataset_query={
"database": 1, "type": "native",
"native": {"query": "SELECT ... nueva query ..."},
})
# Archivar (soft-delete)
metabase_update_card(client, 42, archived=True)
# Query ad-hoc sin guardar
result = metabase_execute_query(client, 1, "SELECT COUNT(*) FROM users")
# result["data"]["rows"] = [[42]]
Filtros de list_cards: all, mine, fav, archived, recent, popular, database, table
Dashboards
from metabase import (
metabase_list_dashboards, # (client, filter="") -> list[dict]
metabase_get_dashboard, # (client, dashboard_id) -> dict # incluye dashcards
metabase_create_dashboard, # (client, name, description="", collection_id=0) -> dict
metabase_update_dashboard, # (client, dashboard_id, **fields) -> dict
metabase_delete_dashboard, # (client, dashboard_id) -> None # IRREVERSIBLE
)
# Crear dashboard + agregar cards
dash = metabase_create_dashboard(client, "KPIs Operativos", description="Metricas diarias")
# Posicionar cards en el dashboard (dashcards es el estado COMPLETO)
metabase_update_dashboard(client, dash["id"], dashcards=[
{"id": -1, "card_id": card1["id"], "row": 0, "col": 0, "size_x": 6, "size_y": 4},
{"id": -2, "card_id": card2["id"], "row": 0, "col": 6, "size_x": 6, "size_y": 4},
{"id": -3, "card_id": card3["id"], "row": 4, "col": 0, "size_x": 12, "size_y": 6},
])
# id negativo = card nueva, id positivo = card existente, omitida = eliminada
Filtros de list_dashboards: all, mine, archived
Dashboards — helpers compositivos (añadir KPIs a dashboard existente)
Helpers para el flujo tipico "anadir N cards (KPI) al final de un tab existente reusando los mismos filtros que otro card vecino". Evitan los gotchas: replicar parameter_mappings, calcular row libre, escapado raro de column_settings, generacion de lib/uuid en MBQL.
from metabase import (
metabase_mbql_from_source_card,
metabase_copy_dashcard_mappings,
metabase_dashboard_next_row,
metabase_dashboard_append_row,
metabase_viz_column_format,
metabase_smartscalar_anothercolumn_viz,
)
metabase_mbql_from_source_card
Construye dataset_query MBQL sobre una saved-card (source-card), con aggregations + joins + filters + breakouts + segunda stage de expressions. Genera lib/uuid automatico en cada nodo.
dq = metabase_mbql_from_source_card(
database_id=6,
source_card_id=5305,
aggregations=[
{"op": "sum", "field": "PrecioVenta", "base_type": "type/Decimal"},
{"op": "sum", "field": "PrecioCompra", "base_type": "type/Decimal"},
{"op": "sum", "field": "PrecioTasas", "base_type": "type/Float"},
],
joins=[
{"alias": "Centros - idCentro", "source_card_id": 4076,
"fields": "none", "local_field": "idCentro", "local_base_type": "type/Text",
"foreign_field_id": 17316, "foreign_base_type": "type/Text"},
],
filters=[["not-empty", {}, ["field", {"base-type": "type/Text"},
"Centros - idCentro__Companies__name"]]],
expressions=[
{"name": "MasadeMargen", "expr":
{"op": "-", "args": [{"field": "sum"},
{"op": "+", "args": [{"field": "sum_2"}, {"field": "sum_3", "base_type": "type/Float"}]}]}},
{"name": "Margen", "expr":
{"op": "coalesce", "args": [
{"op": "/", "args": [
{"op": "-", "args": [{"field": "sum"},
{"op": "+", "args": [{"field": "sum_2"}, {"field": "sum_3", "base_type": "type/Float"}]}]},
{"field": "sum"}]},
0]}},
],
)
Ops soportadas en expressions: +, -, *, /, coalesce, case. Referencia a otra expresion en la misma stage: {"ref": "Margen"}. Aliases de aggregations son posicionales: sum, sum_2, sum_3... (orden = declaracion).
metabase_copy_dashcard_mappings
Copia los parameter_mappings de un dashcard "donante" a un card nuevo. Devuelve lista lista para pegar en dashcards_add.
mappings = metabase_copy_dashcard_mappings(
client,
dashboard_id=734,
source_card_id=9918, # card donante con 18 filtros mapeados
dest_card_id=9947, # card destino nueva
)
# Devuelve [{"parameter_id","card_id","target"}, ...] con card_id=9947
metabase_dashboard_next_row
Calcula el primer row libre al final de un tab.
row = metabase_dashboard_next_row(client, dashboard_id=734, tab_id=191)
# row=12 si el ultimo card termina en row+size_y=12
# tab_id=0 → dashboards sin tabs
metabase_dashboard_append_row
Combo: append N cards en una fila horizontal al final del tab, copiando mappings de un donante. Una sola llamada hace next_row + grid math + copy_mappings + update_dashboard_safe.
metabase_dashboard_append_row(
client,
dashboard_id=734,
tab_id=191,
card_ids=[9947, 9948, 9949],
height=4,
donor_card_id=9918, # mismos 18 filtros del dashboard
grid_width=24, # default Metabase v0.59
)
# Coloca 3 cards de size_x=8 en row=next, cols 0/8/16, con mappings copiados
metabase_viz_column_format
Construye una entrada de column_settings con la clave JSON-escaped ('["name","Margen"]') sin tener que recordar el formato exacto.
metabase_viz_column_format("Margen", number_style="percent", decimals=2)
# {'["name","Margen"]': {"number_style": "percent", "decimals": 2}}
metabase_viz_column_format("MasadeMargen", number_style="currency",
currency="EUR", decimals=0, currency_in_header=False)
# {'["name","MasadeMargen"]': {...}}
Mergea varios resultados en column_settings de las visualization_settings.
metabase_smartscalar_anothercolumn_viz
Construye visualization_settings completo para display=smartscalar con comparativa tipo anotherColumn (compara dos columnas de la misma fila — no requiere breakout temporal).
viz = metabase_smartscalar_anothercolumn_viz(
main_column="Margen",
compare_column="Margen_N1",
label="vs N-1",
number_style="percent",
decimals=2,
)
# Setear en /api/card via PUT visualization_settings=viz
⚠ Gotcha smartscalar Metabase v0.59: el visualization solo acepta type: "anotherColumn" cuando la query NO produce filas multiples. Si Metabase muestra el error "Agrupa solo por un campo de tiempo para ver como ha cambiado con el tiempo", hace falta un breakout temporal en la MBQL (ej. breakouts=[{"field":"fecha","base_type":"type/Date","temporal_unit":"month"}]) y usar el comparison previousValue en lugar de anotherColumn. Alternativa: metabase_smartscalar_kpi_sql + metabase_smartscalar_kpi_payload (patron 2-row nativo) si la card es SQL nativo.
Patron canonico — anadir 3 KPI cards a tab existente
import os, sys
sys.path.insert(0, "python/functions")
from metabase import (
MetabaseClient, metabase_create_card, metabase_mbql_from_source_card,
metabase_dashboard_append_row, metabase_viz_column_format,
metabase_smartscalar_anothercolumn_viz,
)
c = MetabaseClient("https://reports.autingo.es", os.environ["MB_API_KEY"])
# 1) MBQL reusando una saved-card como source
def query():
return metabase_mbql_from_source_card(
database_id=6, source_card_id=5305,
aggregations=[
{"op":"sum","field":"PrecioVenta","base_type":"type/Decimal"},
{"op":"sum","field":"PrecioCompra","base_type":"type/Decimal"},
{"op":"sum","field":"PrecioTasas","base_type":"type/Float"},
],
# joins/filters/expressions ...
)
# 2) Crear cards
card1 = metabase_create_card(c, "Masa de Margen", query(),
display="scalar", collection_id=500)
viz1 = {"scalar.field": "MasadeMargen",
"column_settings": metabase_viz_column_format(
"MasadeMargen", number_style="currency", currency="EUR", decimals=0)}
c._http.request("PUT", f"/api/card/{card1['id']}", json={"visualization_settings": viz1})
card2 = metabase_create_card(c, "Margen", query(), display="smartscalar", collection_id=500)
viz2 = metabase_smartscalar_anothercolumn_viz(
main_column="Margen", compare_column="Margen_N1", number_style="percent", decimals=2)
c._http.request("PUT", f"/api/card/{card2['id']}", json={"visualization_settings": viz2})
# 3) Append fila al tab con mappings copiados del donante
metabase_dashboard_append_row(
c, dashboard_id=734, tab_id=191,
card_ids=[card1["id"], card2["id"]],
height=4, donor_card_id=9918,
)
Documents (ProseMirror)
Los "documents" son páginas narrativas editables con texto rico y cards embebidas. No hay helpers en fn_registry todavía — usa el endpoint REST directamente a través de client._http.
Endpoints:
| Método | Ruta | Qué hace |
|---|---|---|
| GET | /api/document |
Lista documents ({items: [...]}) |
| GET | /api/document/{id} |
Lee un document (incluye document con árbol ProseMirror) |
| POST | /api/document |
Crea. Payload: {name, collection_id, document} |
| PUT | /api/document/{id} |
Actualiza. Mismo payload que POST |
| PUT | /api/document/{id} con {archived: true} |
Soft-delete |
# Crear documento
resp = client._http.request("POST", "/api/document", json={
"name": "Mi análisis",
"collection_id": 583, # obligatorio — raíz no se acepta desde API
"document": {"type": "doc", "content": [
{"type": "heading", "attrs": {"level": 1}, "content": [{"type": "text", "text": "Título"}]},
{"type": "paragraph", "content": [{"type": "text", "text": "Cuerpo."}]},
]},
})
doc_id = resp.json()["id"]
print(f"https://reports.autingo.es/document/{doc_id}")
Tipos de nodo SOPORTADOS en Metabase v0.59.x
Solo estos tipos renderizan. Cualquier tipo fuera de esta lista hace que el documento se vea vacío al abrirlo.
ALLOWED_DOC_NODES = {
"doc", "heading", "paragraph", "text",
"horizontalRule", "blockquote",
"bulletList", "listItem",
"codeBlock", # attrs.language ej: "sql"
"resizeNode", # envuelve SIEMPRE a cardEmbed
"cardEmbed", # solo dentro de resizeNode
}
Marcas inline válidas en nodos text: bold, italic, code, strike (se aplican con "marks": [{"type": "bold"}, ...]).
Tipos PROHIBIDOS (rompen el render)
table,tableRow,tableHeader,tableCell→ en v0.59.x no están registrados en el schema del editor y el doc entero se vuelve invisible.callout→ idem (documentado en memoriafeedback_metabase_prosemirror.md).image,video,iframe,mention, cualquier embed de terceros → no registrados.
Si necesitas una tabla, emúlala con una bulletList de **clave:** valor:
def kv_list(pairs):
return {"type": "bulletList", "content": [
{"type": "listItem", "content": [
{"type": "paragraph", "content": [
{"type": "text", "text": k, "marks": [{"type": "bold"}]},
{"type": "text", "text": f": {v}"},
]},
]}
for k, v in pairs
]}
cardEmbed SIEMPRE dentro de resizeNode
Un cardEmbed suelto no renderiza. Patrón obligatorio:
def card_embed(card_id, height=420):
import uuid
return {
"type": "resizeNode",
"attrs": {"height": height, "minHeight": 280},
"content": [{
"type": "cardEmbed",
"attrs": {"id": card_id, "name": None, "_id": str(uuid.uuid4())},
}],
}
Validación OBLIGATORIA antes de POST/PUT
Nunca envíes un document a Metabase sin validar primero. Un solo nodo prohibido lo deja invisible sin devolver error HTTP:
ALLOWED = {"doc","heading","paragraph","text","horizontalRule","blockquote",
"bulletList","listItem","codeBlock","resizeNode","cardEmbed"}
def validate_doc(node, path=""):
errs = []
if isinstance(node, dict):
typ = node.get("type", "?")
if typ not in ALLOWED:
errs.append(f"{path}: tipo no permitido '{typ}'")
if typ == "resizeNode":
inner = node.get("content", [])
if not (len(inner) == 1 and inner[0].get("type") == "cardEmbed"):
errs.append(f"{path}: resizeNode debe contener exactamente un cardEmbed")
return errs # no re-descender al cardEmbed interno
for i, c in enumerate(node.get("content", []) or []):
errs += validate_doc(c, f"{path}/{typ}[{i}]")
return errs
errs = validate_doc(my_doc)
assert not errs, f"Doc inválido:\n" + "\n".join(f" - {e}" for e in errs)
Aprender estructura de un doc que ya funciona
Si dudas sobre un nodo, clónalo de un doc existente que renderice:
d = client._http.request("GET", "/api/document/2").json()
# d["document"] contiene el árbol completo en ProseMirror
Databases
from metabase import (
metabase_list_databases, # (client, include_tables=False) -> list
metabase_add_database, # (client, name, engine, details) -> dict
metabase_get_database, # (client, database_id) -> dict
)
# Agregar SQLite
metabase_add_database(client, "Operations DB", "sqlite", {"db": "/data/operations.db"})
# Agregar PostgreSQL
metabase_add_database(client, "DW", "postgres", {
"host": "localhost", "port": 5432, "dbname": "warehouse",
"user": "reader", "password": "secret",
})
Usuarios
from metabase import (
metabase_list_users, # (client, status="", query="", limit=0, offset=0) -> dict
metabase_get_user, # (client, user_id) -> dict
metabase_create_user, # (client, first_name, last_name, email, password="", group_ids=None) -> dict
metabase_update_user, # (client, user_id, **fields) -> dict
metabase_deactivate_user, # (client, user_id) -> None # soft-delete
)
Setup y pipelines
from metabase import metabase_setup
# Setup inicial de instancia nueva (obtiene setup-token automaticamente)
metabase_setup("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
# Pipelines ejecutables con fn run
./fn run init_metabase --project fn_registry # Docker: Postgres + Metabase
./fn run setup_metabase_volume # Copiar registry.db al contenedor
./fn run metabase_add_ops_db docker_tui # Registrar operations.db como database
./fn run metabase_create_ops_dashboard docker_tui # Dashboard operativo completo
./fn run metabase_fix_permissions # Arreglar permisos SQLite en Docker
BIGQUERY — Referencia rapida
Auth
from bigquery import bq_auth, BQClient
# ADC (gcloud auth application-default login)
client = bq_auth()
# Proyecto explicito
client = bq_auth("my-project-id")
# Service account JSON
client = bq_auth(credentials_path="/path/to/sa.json")
# Context manager
with bq_auth("my-project") as client:
pass
Datasets
from bigquery import (
bq_create_dataset, # (client, dataset_id, location="US", description="", labels=None, default_table_expiration_ms=0) -> dict
bq_get_dataset, # (client, dataset_id) -> dict
bq_list_datasets, # (client) -> list[dict]
bq_update_dataset, # (client, dataset_id, description=None, labels=None, default_table_expiration_ms=None) -> dict
bq_delete_dataset, # (client, dataset_id, delete_contents=False) -> None
)
bq_create_dataset(client, "analytics", location="EU", description="Data warehouse")
bq_delete_dataset(client, "temp", delete_contents=True) # borra tablas incluidas
Tables
from bigquery import (
bq_create_table, # (client, dataset_id, table_id, schema, partitioning=None, clustering=None, description="", labels=None) -> dict
bq_get_table, # (client, dataset_id, table_id) -> dict # schema, num_rows, num_bytes, partitioning...
bq_list_tables, # (client, dataset_id) -> list[dict]
bq_update_table, # (client, dataset_id, table_id, schema=None, description=None, labels=None) -> dict
bq_delete_table, # (client, dataset_id, table_id) -> None
bq_preview_rows, # (client, dataset_id, table_id, max_results=10) -> dict # SIN COSTE de query
)
# Crear tabla con particionamiento
bq_create_table(client, "analytics", "events",
schema=[
{"name": "event_id", "type": "STRING", "mode": "REQUIRED"},
{"name": "user_id", "type": "STRING"},
{"name": "event_type", "type": "STRING"},
{"name": "created_at", "type": "TIMESTAMP"},
{"name": "payload", "type": "JSON"},
],
partitioning={"type": "DAY", "field": "created_at"},
clustering=["event_type", "user_id"],
)
# Preview sin coste (usa Storage Read API, no ejecuta query)
preview = bq_preview_rows(client, "analytics", "events", max_results=5)
# {"columns": [...], "rows": [[...], ...], "total_rows": 1234567}
# Schema: solo se pueden AGREGAR columnas, nunca eliminar
bq_update_table(client, "analytics", "events", schema=[
*existing_schema,
{"name": "new_col", "type": "STRING"},
])
Tipos de schema: STRING, INT64, FLOAT64, BOOL, TIMESTAMP, DATE, DATETIME, BYTES, NUMERIC, JSON, RECORD/STRUCT, GEOGRAPHY
Modos: NULLABLE (default), REQUIRED, REPEATED
Queries y datos
from bigquery import (
bq_query, # (client, sql, params=None, dry_run=False) -> dict
bq_insert_rows, # (client, dataset_id, table_id, rows) -> dict
bq_load_from_gcs, # (client, uri, dataset_id, table_id, source_format="CSV", write_disposition="WRITE_APPEND", autodetect=True, skip_leading_rows=0) -> dict
bq_load_from_file, # (client, file_path, dataset_id, table_id, ...) -> dict # mismos params que gcs
bq_export_to_gcs, # (client, dataset_id, table_id, destination_uri, destination_format="CSV", compression="NONE") -> dict
bq_copy_table, # (client, source_dataset, source_table, dest_dataset, dest_table, write_disposition="WRITE_EMPTY") -> dict
)
# Query simple
result = bq_query(client, "SELECT COUNT(*) as total FROM analytics.events")
# {"columns": ["total"], "rows": [[1234567]], "total_rows": 1, "bytes_processed": 0, "cache_hit": True}
# Query parametrizada (usa @nombre en SQL)
result = bq_query(client, "SELECT * FROM analytics.events WHERE event_type = @tipo LIMIT @n", params=[
{"name": "tipo", "type": "STRING", "value": "purchase"},
{"name": "n", "type": "INT64", "value": 100},
])
# Estimar coste ANTES de ejecutar (no procesa datos)
estimate = bq_query(client, "SELECT * FROM analytics.events", dry_run=True)
# {"total_bytes_processed": 5368709120, "total_bytes_billed": 5368709120}
gb = estimate["total_bytes_processed"] / (1024**3)
print(f"Esta query procesara {gb:.2f} GB (~${gb * 6.25:.2f} USD)")
# Streaming insert
bq_insert_rows(client, "analytics", "events", [
{"event_id": "e1", "user_id": "u1", "event_type": "click", "created_at": "2026-04-07T10:00:00Z"},
{"event_id": "e2", "user_id": "u2", "event_type": "purchase", "created_at": "2026-04-07T10:01:00Z"},
])
# {"inserted": 2, "errors": []}
# Cargar CSV desde GCS
bq_load_from_gcs(client, "gs://bucket/data/*.csv", "analytics", "events",
source_format="CSV", write_disposition="WRITE_TRUNCATE", skip_leading_rows=1)
# Cargar archivo local
bq_load_from_file(client, "/tmp/data.parquet", "analytics", "events",
source_format="PARQUET", write_disposition="WRITE_APPEND")
# Exportar a GCS
bq_export_to_gcs(client, "analytics", "events", "gs://bucket/export/events-*.csv",
destination_format="CSV", compression="GZIP")
# Copiar tabla
bq_copy_table(client, "analytics", "events", "analytics_backup", "events_20260407")
write_disposition: WRITE_TRUNCATE (reemplazar), WRITE_APPEND (agregar), WRITE_EMPTY (solo si vacia)
source_format: CSV, NEWLINE_DELIMITED_JSON, AVRO, PARQUET, ORC
Jobs
from bigquery import (
bq_list_jobs, # (client, state_filter="", max_results=50, all_users=False) -> list[dict]
bq_get_job, # (client, job_id) -> dict # state, bytes_processed, errors
bq_cancel_job, # (client, job_id) -> dict
)
# Ver jobs corriendo
running = bq_list_jobs(client, state_filter="running")
for j in running:
print(j["job_id"], j["job_type"], j["bytes_processed"])
# Cancelar un job pesado
bq_cancel_job(client, "job_abc123")
state_filter: running, pending, done
Routines (UDFs / Procedures)
from bigquery import (
bq_create_routine, # (client, dataset_id, routine_id, body, routine_type="SCALAR_FUNCTION", language="SQL", arguments=None, return_type="", description="") -> dict
bq_list_routines, # (client, dataset_id) -> list[dict]
bq_delete_routine, # (client, dataset_id, routine_id) -> None
)
# UDF SQL
bq_create_routine(client, "analytics", "double_value",
body="x * 2",
arguments=[{"name": "x", "data_type": "INT64"}],
return_type="INT64",
)
# Stored procedure
bq_create_routine(client, "analytics", "refresh_summary",
body="BEGIN INSERT INTO summary SELECT ... FROM events; END;",
routine_type="PROCEDURE",
)
# UDF JavaScript
bq_create_routine(client, "analytics", "parse_ua",
body="return uaParser.parse(ua).browser.name;",
language="JAVASCRIPT",
arguments=[{"name": "ua", "data_type": "STRING"}],
return_type="STRING",
)
Flujos tipicos
1. Explorar BigQuery y visualizar en Metabase
import sys; sys.path.insert(0, "python/functions")
from bigquery import bq_auth, bq_query
from metabase import metabase_auth, metabase_create_card, metabase_create_dashboard, metabase_update_dashboard
# 1. Explorar datos en BQ
bq = bq_auth("my-project")
result = bq_query(bq, "SELECT event_type, COUNT(*) as cnt FROM analytics.events GROUP BY 1 ORDER BY 2 DESC LIMIT 10")
print(result["columns"], result["rows"])
# 2. Registrar BQ como database en Metabase (si no esta)
# Metabase soporta BigQuery como engine nativo
# 3. Crear cards en Metabase apuntando a BQ
mb = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
card = metabase_create_card(mb, "Eventos por tipo", {
"database": 2, # ID de la database BQ en Metabase
"type": "native",
"native": {"query": "SELECT event_type, COUNT(*) as cnt FROM analytics.events GROUP BY 1 ORDER BY 2 DESC"},
}, display="bar")
# 4. Crear dashboard
dash = metabase_create_dashboard(mb, "Analytics Overview")
metabase_update_dashboard(mb, dash["id"], dashcards=[
{"id": -1, "card_id": card["id"], "row": 0, "col": 0, "size_x": 12, "size_y": 6},
])
2. ETL: archivo local -> BigQuery -> Metabase dashboard
from bigquery import bq_auth, bq_load_from_file, bq_query, bq_preview_rows
from metabase import metabase_auth, metabase_execute_query
bq = bq_auth("my-project")
# Cargar datos
bq_load_from_file(bq, "/tmp/sales.csv", "warehouse", "sales",
source_format="CSV", write_disposition="WRITE_TRUNCATE", skip_leading_rows=1)
# Verificar
preview = bq_preview_rows(bq, "warehouse", "sales", max_results=3)
print(preview["total_rows"], "filas cargadas")
# Consultar via Metabase (si BQ esta registrado como database)
mb = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
result = metabase_execute_query(mb, 2, "SELECT region, SUM(amount) FROM sales GROUP BY 1")
3. Montar infraestructura desde cero
# 1. Levantar Metabase + Postgres
./fn run init_metabase --project fn_registry
# 2. Copiar registry.db al contenedor
./fn run setup_metabase_volume
# 3. Setup inicial
python/.venv/bin/python3 -c "
import sys; sys.path.insert(0, 'python/functions')
from metabase import metabase_setup
metabase_setup('http://localhost:3000', 'admin@fnregistry.local', 'FnRegistry2024!')
"
# 4. Registrar operations.db de una app
./fn run metabase_add_ops_db docker_tui
# 5. Dashboard operativo automatico
./fn run metabase_create_ops_dashboard docker_tui
4. Auditar costes de BigQuery
from bigquery import bq_auth, bq_list_jobs, bq_query
bq = bq_auth("my-project")
# Jobs recientes completados
jobs = bq_list_jobs(bq, state_filter="done", max_results=20, all_users=True)
total_bytes = sum(j.get("bytes_processed") or 0 for j in jobs)
print(f"Ultimos 20 jobs: {total_bytes / (1024**3):.2f} GB procesados")
# Dry-run antes de queries caras
estimate = bq_query(bq, "SELECT * FROM analytics.events WHERE created_at > '2026-01-01'", dry_run=True)
gb = estimate["total_bytes_processed"] / (1024**3)
cost = gb * 6.25 # $6.25/TB on-demand
print(f"Coste estimado: ${cost:.2f} USD ({gb:.1f} GB)")
Buscar mas funciones
Si necesitas algo que no esta aqui, busca en el registry:
# FTS5 por nombre o descripcion
./fn search "lo que buscas"
# Ver detalles de una funcion
./fn show <id>
# Inline desde Python
sqlite3 registry.db "SELECT id, description FROM functions WHERE id IN (SELECT id FROM functions_fts WHERE functions_fts MATCH 'description:export*') ORDER BY name;"
$ARGUMENTS