fix(fn-run): propagar stdout/stderr de bash functions library-style #1

Open
dataforge wants to merge 537 commits from auto/0077-fn-run-bash-mudo into master
Showing only changes of commit f3e62e8303 - Show all commits
+457
View File
@@ -0,0 +1,457 @@
# /meta_bigq — Operar Metabase y BigQuery desde el registry
Eres un agente de datos. Tienes acceso a funciones Python del fn_registry para controlar **Metabase** (dashboards, cards, queries, usuarios) y **Google BigQuery** (datasets, tablas, queries, jobs, routines). Usa estas funciones directamente — no inventes llamadas HTTP manuales.
---
## Como ejecutar funciones
```bash
PYTHON="python/.venv/bin/python3"
# Ejecutar codigo inline
$PYTHON -c "
import sys; sys.path.insert(0, 'python/functions')
from metabase import metabase_auth, metabase_list_dashboards
client = metabase_auth('http://localhost:3000', 'admin@fnregistry.local', 'FnRegistry2024!')
print(metabase_list_dashboards(client))
"
# O con fn run para pipelines
./fn run init_metabase --project fn_registry
./fn run setup_metabase_volume
./fn run metabase_create_ops_dashboard docker_tui
```
Variables de entorno tipicas:
- `METABASE_URL` (default: `http://localhost:3000`)
- `METABASE_ADMIN_EMAIL` (default: `admin@fnregistry.local`)
- `METABASE_ADMIN_PASSWORD` (default: `FnRegistry2024!`)
- BigQuery usa ADC (`gcloud auth application-default login`) o `GOOGLE_APPLICATION_CREDENTIALS`
---
## METABASE — Referencia rapida
### Auth
```python
from metabase import metabase_auth, MetabaseClient
# Login con email/password
client = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
# O directo con API key
client = MetabaseClient("http://localhost:3000", "mb_api_key_xxxxx")
# Context manager
with metabase_auth(...) as client:
pass # se cierra solo
```
### Cards (preguntas)
```python
from metabase import (
metabase_list_cards, # (client, filter="", model_id=0) -> list[dict]
metabase_get_card, # (client, card_id) -> dict
metabase_create_card, # (client, name, dataset_query, display="table", collection_id=0, description="") -> dict
metabase_update_card, # (client, card_id, **fields) -> dict # fields: name, description, display, dataset_query, archived...
metabase_delete_card, # (client, card_id) -> None # IRREVERSIBLE, preferir archived=True
metabase_execute_card, # (client, card_id, parameters=None) -> dict # ejecuta query de card guardada
metabase_execute_query, # (client, database_id, sql, max_results=0) -> dict # query ad-hoc
)
# Crear card con SQL nativo
card = metabase_create_card(client, "Ventas por mes", {
"database": 1, "type": "native",
"native": {"query": "SELECT date_trunc('month', created_at) as mes, SUM(total) FROM orders GROUP BY 1"},
}, display="line")
# Actualizar query de una card
metabase_update_card(client, card["id"], dataset_query={
"database": 1, "type": "native",
"native": {"query": "SELECT ... nueva query ..."},
})
# Archivar (soft-delete)
metabase_update_card(client, 42, archived=True)
# Query ad-hoc sin guardar
result = metabase_execute_query(client, 1, "SELECT COUNT(*) FROM users")
# result["data"]["rows"] = [[42]]
```
**Filtros de list_cards:** `all`, `mine`, `fav`, `archived`, `recent`, `popular`, `database`, `table`
### Dashboards
```python
from metabase import (
metabase_list_dashboards, # (client, filter="") -> list[dict]
metabase_get_dashboard, # (client, dashboard_id) -> dict # incluye dashcards
metabase_create_dashboard, # (client, name, description="", collection_id=0) -> dict
metabase_update_dashboard, # (client, dashboard_id, **fields) -> dict
metabase_delete_dashboard, # (client, dashboard_id) -> None # IRREVERSIBLE
)
# Crear dashboard + agregar cards
dash = metabase_create_dashboard(client, "KPIs Operativos", description="Metricas diarias")
# Posicionar cards en el dashboard (dashcards es el estado COMPLETO)
metabase_update_dashboard(client, dash["id"], dashcards=[
{"id": -1, "card_id": card1["id"], "row": 0, "col": 0, "size_x": 6, "size_y": 4},
{"id": -2, "card_id": card2["id"], "row": 0, "col": 6, "size_x": 6, "size_y": 4},
{"id": -3, "card_id": card3["id"], "row": 4, "col": 0, "size_x": 12, "size_y": 6},
])
# id negativo = card nueva, id positivo = card existente, omitida = eliminada
```
**Filtros de list_dashboards:** `all`, `mine`, `archived`
### Databases
```python
from metabase import (
metabase_list_databases, # (client, include_tables=False) -> list
metabase_add_database, # (client, name, engine, details) -> dict
metabase_get_database, # (client, database_id) -> dict
)
# Agregar SQLite
metabase_add_database(client, "Operations DB", "sqlite", {"db": "/data/operations.db"})
# Agregar PostgreSQL
metabase_add_database(client, "DW", "postgres", {
"host": "localhost", "port": 5432, "dbname": "warehouse",
"user": "reader", "password": "secret",
})
```
### Usuarios
```python
from metabase import (
metabase_list_users, # (client, status="", query="", limit=0, offset=0) -> dict
metabase_get_user, # (client, user_id) -> dict
metabase_create_user, # (client, first_name, last_name, email, password="", group_ids=None) -> dict
metabase_update_user, # (client, user_id, **fields) -> dict
metabase_deactivate_user, # (client, user_id) -> None # soft-delete
)
```
### Setup y pipelines
```python
from metabase import metabase_setup
# Setup inicial de instancia nueva (obtiene setup-token automaticamente)
metabase_setup("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
```
```bash
# Pipelines ejecutables con fn run
./fn run init_metabase --project fn_registry # Docker: Postgres + Metabase
./fn run setup_metabase_volume # Copiar registry.db al contenedor
./fn run metabase_add_ops_db docker_tui # Registrar operations.db como database
./fn run metabase_create_ops_dashboard docker_tui # Dashboard operativo completo
./fn run metabase_fix_permissions # Arreglar permisos SQLite en Docker
```
---
## BIGQUERY — Referencia rapida
### Auth
```python
from bigquery import bq_auth, BQClient
# ADC (gcloud auth application-default login)
client = bq_auth()
# Proyecto explicito
client = bq_auth("my-project-id")
# Service account JSON
client = bq_auth(credentials_path="/path/to/sa.json")
# Context manager
with bq_auth("my-project") as client:
pass
```
### Datasets
```python
from bigquery import (
bq_create_dataset, # (client, dataset_id, location="US", description="", labels=None, default_table_expiration_ms=0) -> dict
bq_get_dataset, # (client, dataset_id) -> dict
bq_list_datasets, # (client) -> list[dict]
bq_update_dataset, # (client, dataset_id, description=None, labels=None, default_table_expiration_ms=None) -> dict
bq_delete_dataset, # (client, dataset_id, delete_contents=False) -> None
)
bq_create_dataset(client, "analytics", location="EU", description="Data warehouse")
bq_delete_dataset(client, "temp", delete_contents=True) # borra tablas incluidas
```
### Tables
```python
from bigquery import (
bq_create_table, # (client, dataset_id, table_id, schema, partitioning=None, clustering=None, description="", labels=None) -> dict
bq_get_table, # (client, dataset_id, table_id) -> dict # schema, num_rows, num_bytes, partitioning...
bq_list_tables, # (client, dataset_id) -> list[dict]
bq_update_table, # (client, dataset_id, table_id, schema=None, description=None, labels=None) -> dict
bq_delete_table, # (client, dataset_id, table_id) -> None
bq_preview_rows, # (client, dataset_id, table_id, max_results=10) -> dict # SIN COSTE de query
)
# Crear tabla con particionamiento
bq_create_table(client, "analytics", "events",
schema=[
{"name": "event_id", "type": "STRING", "mode": "REQUIRED"},
{"name": "user_id", "type": "STRING"},
{"name": "event_type", "type": "STRING"},
{"name": "created_at", "type": "TIMESTAMP"},
{"name": "payload", "type": "JSON"},
],
partitioning={"type": "DAY", "field": "created_at"},
clustering=["event_type", "user_id"],
)
# Preview sin coste (usa Storage Read API, no ejecuta query)
preview = bq_preview_rows(client, "analytics", "events", max_results=5)
# {"columns": [...], "rows": [[...], ...], "total_rows": 1234567}
# Schema: solo se pueden AGREGAR columnas, nunca eliminar
bq_update_table(client, "analytics", "events", schema=[
*existing_schema,
{"name": "new_col", "type": "STRING"},
])
```
**Tipos de schema:** `STRING`, `INT64`, `FLOAT64`, `BOOL`, `TIMESTAMP`, `DATE`, `DATETIME`, `BYTES`, `NUMERIC`, `JSON`, `RECORD`/`STRUCT`, `GEOGRAPHY`
**Modos:** `NULLABLE` (default), `REQUIRED`, `REPEATED`
### Queries y datos
```python
from bigquery import (
bq_query, # (client, sql, params=None, dry_run=False) -> dict
bq_insert_rows, # (client, dataset_id, table_id, rows) -> dict
bq_load_from_gcs, # (client, uri, dataset_id, table_id, source_format="CSV", write_disposition="WRITE_APPEND", autodetect=True, skip_leading_rows=0) -> dict
bq_load_from_file, # (client, file_path, dataset_id, table_id, ...) -> dict # mismos params que gcs
bq_export_to_gcs, # (client, dataset_id, table_id, destination_uri, destination_format="CSV", compression="NONE") -> dict
bq_copy_table, # (client, source_dataset, source_table, dest_dataset, dest_table, write_disposition="WRITE_EMPTY") -> dict
)
# Query simple
result = bq_query(client, "SELECT COUNT(*) as total FROM analytics.events")
# {"columns": ["total"], "rows": [[1234567]], "total_rows": 1, "bytes_processed": 0, "cache_hit": True}
# Query parametrizada (usa @nombre en SQL)
result = bq_query(client, "SELECT * FROM analytics.events WHERE event_type = @tipo LIMIT @n", params=[
{"name": "tipo", "type": "STRING", "value": "purchase"},
{"name": "n", "type": "INT64", "value": 100},
])
# Estimar coste ANTES de ejecutar (no procesa datos)
estimate = bq_query(client, "SELECT * FROM analytics.events", dry_run=True)
# {"total_bytes_processed": 5368709120, "total_bytes_billed": 5368709120}
gb = estimate["total_bytes_processed"] / (1024**3)
print(f"Esta query procesara {gb:.2f} GB (~${gb * 6.25:.2f} USD)")
# Streaming insert
bq_insert_rows(client, "analytics", "events", [
{"event_id": "e1", "user_id": "u1", "event_type": "click", "created_at": "2026-04-07T10:00:00Z"},
{"event_id": "e2", "user_id": "u2", "event_type": "purchase", "created_at": "2026-04-07T10:01:00Z"},
])
# {"inserted": 2, "errors": []}
# Cargar CSV desde GCS
bq_load_from_gcs(client, "gs://bucket/data/*.csv", "analytics", "events",
source_format="CSV", write_disposition="WRITE_TRUNCATE", skip_leading_rows=1)
# Cargar archivo local
bq_load_from_file(client, "/tmp/data.parquet", "analytics", "events",
source_format="PARQUET", write_disposition="WRITE_APPEND")
# Exportar a GCS
bq_export_to_gcs(client, "analytics", "events", "gs://bucket/export/events-*.csv",
destination_format="CSV", compression="GZIP")
# Copiar tabla
bq_copy_table(client, "analytics", "events", "analytics_backup", "events_20260407")
```
**write_disposition:** `WRITE_TRUNCATE` (reemplazar), `WRITE_APPEND` (agregar), `WRITE_EMPTY` (solo si vacia)
**source_format:** `CSV`, `NEWLINE_DELIMITED_JSON`, `AVRO`, `PARQUET`, `ORC`
### Jobs
```python
from bigquery import (
bq_list_jobs, # (client, state_filter="", max_results=50, all_users=False) -> list[dict]
bq_get_job, # (client, job_id) -> dict # state, bytes_processed, errors
bq_cancel_job, # (client, job_id) -> dict
)
# Ver jobs corriendo
running = bq_list_jobs(client, state_filter="running")
for j in running:
print(j["job_id"], j["job_type"], j["bytes_processed"])
# Cancelar un job pesado
bq_cancel_job(client, "job_abc123")
```
**state_filter:** `running`, `pending`, `done`
### Routines (UDFs / Procedures)
```python
from bigquery import (
bq_create_routine, # (client, dataset_id, routine_id, body, routine_type="SCALAR_FUNCTION", language="SQL", arguments=None, return_type="", description="") -> dict
bq_list_routines, # (client, dataset_id) -> list[dict]
bq_delete_routine, # (client, dataset_id, routine_id) -> None
)
# UDF SQL
bq_create_routine(client, "analytics", "double_value",
body="x * 2",
arguments=[{"name": "x", "data_type": "INT64"}],
return_type="INT64",
)
# Stored procedure
bq_create_routine(client, "analytics", "refresh_summary",
body="BEGIN INSERT INTO summary SELECT ... FROM events; END;",
routine_type="PROCEDURE",
)
# UDF JavaScript
bq_create_routine(client, "analytics", "parse_ua",
body="return uaParser.parse(ua).browser.name;",
language="JAVASCRIPT",
arguments=[{"name": "ua", "data_type": "STRING"}],
return_type="STRING",
)
```
---
## Flujos tipicos
### 1. Explorar BigQuery y visualizar en Metabase
```python
import sys; sys.path.insert(0, "python/functions")
from bigquery import bq_auth, bq_query
from metabase import metabase_auth, metabase_create_card, metabase_create_dashboard, metabase_update_dashboard
# 1. Explorar datos en BQ
bq = bq_auth("my-project")
result = bq_query(bq, "SELECT event_type, COUNT(*) as cnt FROM analytics.events GROUP BY 1 ORDER BY 2 DESC LIMIT 10")
print(result["columns"], result["rows"])
# 2. Registrar BQ como database en Metabase (si no esta)
# Metabase soporta BigQuery como engine nativo
# 3. Crear cards en Metabase apuntando a BQ
mb = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
card = metabase_create_card(mb, "Eventos por tipo", {
"database": 2, # ID de la database BQ en Metabase
"type": "native",
"native": {"query": "SELECT event_type, COUNT(*) as cnt FROM analytics.events GROUP BY 1 ORDER BY 2 DESC"},
}, display="bar")
# 4. Crear dashboard
dash = metabase_create_dashboard(mb, "Analytics Overview")
metabase_update_dashboard(mb, dash["id"], dashcards=[
{"id": -1, "card_id": card["id"], "row": 0, "col": 0, "size_x": 12, "size_y": 6},
])
```
### 2. ETL: archivo local -> BigQuery -> Metabase dashboard
```python
from bigquery import bq_auth, bq_load_from_file, bq_query, bq_preview_rows
from metabase import metabase_auth, metabase_execute_query
bq = bq_auth("my-project")
# Cargar datos
bq_load_from_file(bq, "/tmp/sales.csv", "warehouse", "sales",
source_format="CSV", write_disposition="WRITE_TRUNCATE", skip_leading_rows=1)
# Verificar
preview = bq_preview_rows(bq, "warehouse", "sales", max_results=3)
print(preview["total_rows"], "filas cargadas")
# Consultar via Metabase (si BQ esta registrado como database)
mb = metabase_auth("http://localhost:3000", "admin@fnregistry.local", "FnRegistry2024!")
result = metabase_execute_query(mb, 2, "SELECT region, SUM(amount) FROM sales GROUP BY 1")
```
### 3. Montar infraestructura desde cero
```bash
# 1. Levantar Metabase + Postgres
./fn run init_metabase --project fn_registry
# 2. Copiar registry.db al contenedor
./fn run setup_metabase_volume
# 3. Setup inicial
python/.venv/bin/python3 -c "
import sys; sys.path.insert(0, 'python/functions')
from metabase import metabase_setup
metabase_setup('http://localhost:3000', 'admin@fnregistry.local', 'FnRegistry2024!')
"
# 4. Registrar operations.db de una app
./fn run metabase_add_ops_db docker_tui
# 5. Dashboard operativo automatico
./fn run metabase_create_ops_dashboard docker_tui
```
### 4. Auditar costes de BigQuery
```python
from bigquery import bq_auth, bq_list_jobs, bq_query
bq = bq_auth("my-project")
# Jobs recientes completados
jobs = bq_list_jobs(bq, state_filter="done", max_results=20, all_users=True)
total_bytes = sum(j.get("bytes_processed") or 0 for j in jobs)
print(f"Ultimos 20 jobs: {total_bytes / (1024**3):.2f} GB procesados")
# Dry-run antes de queries caras
estimate = bq_query(bq, "SELECT * FROM analytics.events WHERE created_at > '2026-01-01'", dry_run=True)
gb = estimate["total_bytes_processed"] / (1024**3)
cost = gb * 6.25 # $6.25/TB on-demand
print(f"Coste estimado: ${cost:.2f} USD ({gb:.1f} GB)")
```
---
## Buscar mas funciones
Si necesitas algo que no esta aqui, busca en el registry:
```bash
# FTS5 por nombre o descripcion
./fn search "lo que buscas"
# Ver detalles de una funcion
./fn show <id>
# Inline desde Python
sqlite3 registry.db "SELECT id, description FROM functions WHERE id IN (SELECT id FROM functions_fts WHERE functions_fts MATCH 'description:export*') ORDER BY name;"
```
$ARGUMENTS