fix(fn-run): propagar stdout/stderr de bash functions library-style #1

Open
dataforge wants to merge 537 commits from auto/0077-fn-run-bash-mudo into master
33 changed files with 3116 additions and 0 deletions
Showing only changes of commit 58fab5ad34 - Show all commits
+9
View File
@@ -0,0 +1,9 @@
projects/*/.env
projects/*/state/session.json
projects/*/state/push.log
projects/*/state/backups/
operations.db
operations.db-shm
operations.db-wal
__pycache__/
*.pyc
+390
View File
@@ -0,0 +1,390 @@
---
name: auto_metabase
lang: py
domain: analytics
description: "Sincronizacion bidireccional entre archivos YAML locales y una instancia Metabase. Cada dashboard, card, database, collection y document es un archivo editable; pull/push mantiene Metabase y disco en sintonia. Inspirado en rapid_dashboards."
tags: [metabase, sync, declarative, yaml, dashboards, gitops]
uses_functions:
- metabase_auth_py_infra
- metabase_list_databases_py_infra
- metabase_get_database_py_infra
- metabase_add_database_py_infra
- metabase_list_dashboards_py_infra
- metabase_get_dashboard_py_infra
- metabase_create_dashboard_py_infra
- metabase_update_dashboard_py_infra
- metabase_delete_dashboard_py_infra
- metabase_list_cards_py_infra
- metabase_get_card_py_infra
- metabase_create_card_py_infra
- metabase_update_card_py_infra
- metabase_delete_card_py_infra
- metabase_execute_query_py_infra
uses_types: []
framework: httpx
entry_point: "main.py"
dir_path: "apps/auto_metabase"
---
## Idea
Cada artefacto de Metabase (dashboard, card, database, collection, document) vive
como un archivo YAML editable. Los archivos son la fuente de verdad.
**Multi-proyecto:** cada entorno Metabase (local, prod, staging, ...) es un
proyecto aislado bajo `projects/{name}/` con su propio `config.yaml`, `.env`,
`state/` y carpetas de YAMLs. El proyecto activo se elige con `-p NAME`
o por defecto desde `config.yaml` top-level (`default_project`).
## Comandos
| Comando | Categoria | Que hace |
|---------|-----------|----------|
| `projects` | proyectos | Lista proyectos disponibles y marca el default |
| `init-project NAME --base-url URL` | proyectos | Crea proyecto nuevo (config + carpetas + index vacio) |
| `login` | sesion | Autentica contra Metabase y guarda token en `state/session.json` |
| `status` | sesion | Muestra estado del proyecto (sesion, items en index, archivos en disco) |
| `describe <db>` | exploracion | Lista tablas, columnas y tipos de un database (`--samples` para 3 filas demo) |
| `sql <db> "QUERY"` | exploracion | Ejecuta SQL ad-hoc, NO crea card. `--limit 100` por defecto |
| `remote <kind>` | exploracion | Lista items en Metabase sin descargar nada |
| `pull <kind> <ref>` | sync | Trae UN item de Metabase a disco (per-item, nunca bulk) |
| `validate <kind> <slug>` | sync | Valida YAML local sin tocar Metabase. `--check-sql` ejecuta la query |
| `push <kind> <slug>` | sync | Aplica UN item a Metabase. Dry-run por defecto. `--apply` para enviar |
| `push-all` | sync | Pushea TODOS los YAMLs del proyecto. Solo CREATE/UPDATE, nunca DELETE |
| `restore <kind> <slug>` | sync | Restaura YAML local desde backup (no aplica a Metabase) |
| `diff <kind> <slug>` | sync | Alias temporal de `validate --show-payload` |
## Layout
```
apps/auto_metabase/
config.yaml # default_project + projects_dir
main.py # CLI entrypoint
payload.py # builders: YAML -> payload Metabase (resuelve refs, auto-inyecta)
sync_pull.py # pull per-item desde Metabase
sync_push.py # push per-item + push_all (R1-R20)
sync_validate.py # validacion estructural + SQL check
sync_restore.py # restore desde backups locales
explore.py # describe + sql ad-hoc
scripts/seed_test_data.py # seed inicial de db+cards+dashboard de prueba
projects/
{name}/ # un directorio por entorno Metabase
config.yaml # base_url + auth + sync rules
.env # credenciales (gitignored)
state/
session.json # token de sesion (gitignored)
index.json # slug local <-> metabase_id por tipo
push.log # JSONL append-only de cada push
backups/{ts}/{kind}/... # backup automatico antes de UPDATE
databases/{slug}.yaml # connections
collections/{slug}.yaml # collections (carpetas)
cards/{slug}.yaml # cards reusables
dashboards/{slug}.yaml # dashboards con dashcards
documents/{slug}.yaml # [pendiente] Metabase >= v0.51
```
## Configuracion
**Top-level** `config.yaml` (en la raiz del app) selecciona el proyecto por defecto:
```yaml
default_project: test_local
projects_dir: projects
```
**Por proyecto** `projects/{name}/config.yaml` define la URL y reglas de sync:
```yaml
name: test_local
description: "Metabase de prueba en Docker local"
base_url: http://localhost:3000
auth:
email_env: METABASE_EMAIL
password_env: METABASE_PASSWORD
sync:
ignore_collections: [] # IDs de colecciones a no sincronizar
ignore_databases: [1] # 1 = "Sample Database" interno de Metabase
prefer_archive: true # archivar en vez de borrar al hacer push
```
Credenciales en `projects/{name}/.env` (gitignored):
```
METABASE_EMAIL=admin@example.com
METABASE_PASSWORD=changeme
```
## Estado
`state/index.json` mantiene el mapeo slug local <-> id de Metabase para no
duplicar al hacer push. Estructura:
```json
{
"databases": { "registry": 2, "ops_demo": 3 },
"collections": { "auto_metabase": 5 },
"cards": { "totals_by_domain": 12 },
"dashboards": { "fn_overview": 4 }
}
```
## Pendiente
- [ ] Soporte de Metabase Documents (a investigar — endpoint nuevo en versiones recientes).
- [ ] Soporte de Pulses / Alerts / Subscriptions.
- [ ] Soporte de Permissions (groups + permission_graph).
- [ ] Diff con render colorizado en TUI.
- [ ] Recovery automatico cuando un CREATE de dashboard falla a medias
(POST ok + PUT fail deja un dashboard huerfano sin entrar al index).
---
## Crear cards y dashboards desde cero (workflow probado)
### 1. Referenciar una database que no se sincroniza
Si quieres apuntar cards a la **Sample Database** de Metabase (db `1`, `ignore_databases: [1]` por defecto), basta con anadir el slug al `state/index.json` — no hace falta YAML en `databases/`:
```json
"databases": {
"metabase_internal_pg": 2,
"sample_database": 1
}
```
Las cards luego usan `_refs.database: sample_database` y `dataset_query.database: sample_database`, y el builder lo resuelve a `1` via index.
### 2. Formato del `dataset_query` (MBQL v2 con SQL nativo)
Metabase moderno usa MBQL stages, no la forma vieja `{type: native, native: {query: ...}}`. La forma correcta para cards SQL:
```yaml
dataset_query:
lib/type: mbql/query
database: <slug_o_id>
stages:
- lib/type: mbql.stage/native
native: |-
SELECT ...
```
`query_type: native` se mantiene en el payload top-level (Metabase lo usa internamente).
### 3. SQL del Sample Database = H2
Tablas en MAYUSCULAS (`PEOPLE`, `ORDERS`, `PRODUCTS`...) y funciones H2:
- `FORMATDATETIME(col, 'yyyy-MM')` para truncar fechas (no `DATE_TRUNC`).
- `ROUND(SUM(x), 2)` igual que en Postgres.
- Joins y agregaciones SQL-92 estandar.
### 4. Crear cards nuevas
YAML minimo de una card nueva:
```yaml
_meta:
kind: card
id: null # null = nueva, push hara CREATE
slug: clientes_total # debe coincidir con el filename
_refs:
database: sample_database
collection: null
payload:
name: clientes_total
description: ...
type: question
query_type: native
display: scalar # o table, bar, line, pie, area, ...
archived: false
enable_embedding: false
collection_preview: true
visualization_settings: {}
parameters: []
parameter_mappings: []
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: SELECT COUNT(*) AS total FROM PEOPLE
```
Validar antes de pushear:
```bash
python main.py validate card clientes_total --check-sql
```
`--check-sql` ejecuta la query contra Metabase (solo para cards `native`). Pillar errores de sintaxis aqui es mas barato que en push.
Push:
```bash
python main.py push card clientes_total --apply
```
Sin `--apply` es dry-run. Tras un CREATE exitoso:
- Se asigna id real, se actualiza `state/index.json`.
- Se hace `pull` automatico → el YAML se reescribe con `_meta.id`, `synced_at`, `remote_updated_at`.
- Hay que pushear las cards UNA POR UNA (no hay batch).
### 5. Crear dashboards nuevos
Las cards referenciadas deben existir antes (ya en el index). Estructura minima del YAML:
```yaml
_meta:
kind: dashboard
id: null
slug: compras_y_clientes
_refs:
collection: null
payload:
name: Compras y Clientes
description: ...
archived: false
enable_embedding: false
width: fixed # o "full"
auto_apply_filters: true
parameters: []
tabs: []
dashcards:
- card: clientes_total # slug, se resuelve a card_id
row: 0
col: 0
size_x: 8 # grid de 24 columnas
size_y: 3
- card: compras_total
row: 0
col: 8
size_x: 8
size_y: 3
```
Push:
```bash
python main.py push dashboard compras_y_clientes --apply
```
**Auto-inyeccion (`build_dashboard_payload`)** — para cada dashcard sin `id`,
inyecta automaticamente:
- `id`: negativo unico (-1, -2, ...) en orden de aparicion. Metabase lo necesita
para distinguir dashcards nuevas (`id < 0`) de las existentes (`id > 0`).
Si la dashcard ya trae `id` (positivo o negativo), se respeta tal cual.
- `visualization_settings: {}` si falta.
- `parameter_mappings: []` si falta.
Esto resuelve el **escollo historico**: antes, omitir esos campos provocaba que el
POST creara el dashboard pero el PUT siguiente devolviera 500 — quedando un
dashboard huerfano en Metabase que no entraba al index local. Ahora los YAMLs
de dashboards son tan simples como los de cards: solo `card`, `row`, `col`,
`size_x`, `size_y` por dashcard.
### 6. `push-all` — proyecto entero en un comando
```bash
# Dry-run de todo el proyecto (cards primero, dashboards despues)
python main.py push-all
# Aplicar
python main.py push-all --apply
# Solo cards (saltar dashboards)
python main.py push-all --apply --kinds card
# Forzar saltando R17 (freshness) y R18 (count) en cada item
python main.py push-all --apply --force-overwrite
# Permitir warnings estructurales
python main.py push-all --apply --allow-warnings
```
Garantias:
- **Solo CREATE o UPDATE — nunca DELETE.** Si un YAML local desaparece, el
item correspondiente sigue intacto en Metabase. La unica via para borrar
algo en Metabase es manualmente via UI o API.
- **Cards primero, dashboards despues.** Asi cualquier card recien creada ya
esta en el index cuando se construye el payload del dashboard que la
referencia.
- **Resiliente a fallos por item.** Si una card falla (validation error, SQL
invalido, conflicto de freshness), `push-all` captura el `SystemExit`,
lo registra en el resumen final y continua con el siguiente item.
- **Reusa toda la logica de `push_one`**: backup obligatorio antes de UPDATE
(R6), freshness check (R17), count check para dashboards (R18), log JSONL
en `state/push.log` (R13).
Resumen final mostrado en stdout:
```
=== resumen push all ===
OK: 11 ['card:clientes_total', 'card:compras_total', ...]
FAILED: 0 []
```
Exit code: 1 si hubo fallos, 0 si todo OK.
### 7. Grid de Metabase
24 columnas, alturas en filas de ~30px. Layout que funciona bien para 6 cards:
| Fila | Cards | Filas (size_y) |
|------|-------|----------------|
| 0 | 3 KPIs scalar (8 cols c/u) | 3 |
| 3 | 2 graficos (12 cols c/u) | 6 |
| 9 | 1 tabla ancha (24 cols) | 7 |
### 8. Exploracion rapida: `describe` + `sql`
Dos comandos para no escribir cards a ciegas. **Read-only**, no tocan nada en disco ni en Metabase.
```bash
# Schema del database (tablas + columnas + tipos)
python main.py describe sample_database
python main.py describe sample_database --tables-only # solo nombres + descripcion
python main.py describe sample_database --filter products # una sola tabla
python main.py describe sample_database --samples # +3 filas de ejemplo por tabla
# SQL ad-hoc — ideal para iterar antes de guardar como card
python main.py sql sample_database "SELECT CATEGORY, COUNT(*) FROM PRODUCTS GROUP BY CATEGORY"
python main.py sql sample_database "SELECT * FROM ORDERS" --limit 5
```
Tres barreras anti-explosion en `sql`:
1. `--limit N` (default **100**) → se envia como `max-results` constraint a Metabase. Metabase corta server-side, no transferimos filas de mas.
2. **Hard ceiling 10 000 filas** — incluso `--limit 999999` se cappea.
3. **5 MB de payload** — si las filas son anchas y exceden, se recorta antes de imprimir.
Queries que empiezan por `INSERT/UPDATE/DELETE/DROP/TRUNCATE/ALTER/CREATE` se bloquean. `--allow-write` las deja pasar (Metabase normalmente las bloquea igualmente en `/api/dataset`).
Errores SQL devuelven el mensaje de Metabase limpio:
```
$ python main.py sql sample_database "SELECT NOPE FROM PEOPLE"
ERROR (400): Column "NOPE" not found; SQL statement: SELECT NOPE FROM PEOPLE [42122-214]
```
**Workflow recomendado para card nueva:**
```bash
python main.py describe sample_database --filter orders # 1. ver columnas
python main.py sql sample_database "SELECT ... FROM ORDERS" # 2. iterar SQL en stdout
# 3. cuando la query funciona, copiarla a cards/<slug>.yaml
python main.py push card <slug> --apply # 4. guardarla como card
```
### 9. Resumen del flujo
```
1. Editar YAMLs de cards en projects/{name}/cards/
2. Editar YAML del dashboard con dashcards (solo card slug + row/col/size — auto-inject hace el resto)
3. python main.py push-all --apply
4. Abrir http://localhost:3000/dashboard/<id>
```
Cualquier cambio futuro: editar el YAML, `push-all --apply` (o `push <kind> <slug> --apply` si quieres aplicar solo uno). Cualquier cambio hecho desde la UI de Metabase: `pull <kind> <slug>`. La logica de freshness (R17) avisa si hubo cambios remotos no traidos antes de pushear local.
+5
View File
@@ -0,0 +1,5 @@
# Configuracion top-level de auto_metabase.
# Cada proyecto vive en projects/{name}/ con su propio config.yaml.
default_project: test_local
projects_dir: projects
+212
View File
@@ -0,0 +1,212 @@
"""Comandos de exploracion: describe + sql.
- describe <db_slug> Lista tablas, columnas, tipos y conteo de filas.
- sql <db_slug> "SELECT ..." Ejecuta SQL ad-hoc con limites de seguridad.
Ambos resuelven el slug de database via state/index.json del proyecto activo.
No tocan disco ni crean cards — son herramientas de inspeccion pura.
"""
from __future__ import annotations
import sys
from typing import Any
import httpx
from metabase.cards import metabase_execute_query
# ---------------------------------------------------------------- Limites
# Hard ceiling: ni con --limit muy alto se exceden estas filas/celdas.
HARD_MAX_ROWS = 10_000
DEFAULT_MAX_ROWS = 100
MAX_CELL_CHARS = 60 # truncar celdas largas en stdout
MAX_TOTAL_BYTES = 5_000_000 # 5 MB de payload de respuesta — corta antes
# ---------------------------------------------------------------- Pretty-print
def _truncate(s: str, n: int = MAX_CELL_CHARS) -> str:
if len(s) <= n:
return s
return s[: n - 1] + ""
def _format_cell(v: Any) -> str:
if v is None:
return ""
if isinstance(v, float):
# evitar 1.5000000001
return f"{v:.4g}" if abs(v) < 1e6 else f"{v:.2f}"
return _truncate(str(v))
def _print_table(headers: list[str], rows: list[list[Any]], total_rows: int | None = None) -> None:
"""Imprime una tabla simple en stdout. Calcula anchos por columna."""
if not rows:
print(" (sin filas)")
if total_rows:
print(f" total en BD: {total_rows}")
return
formatted = [[_format_cell(c) for c in row] for row in rows]
widths = [len(h) for h in headers]
for row in formatted:
for i, cell in enumerate(row):
widths[i] = max(widths[i], len(cell))
sep = " ".join("-" * w for w in widths)
print(" " + " ".join(h.ljust(widths[i]) for i, h in enumerate(headers)))
print(" " + sep)
for row in formatted:
print(" " + " ".join(row[i].ljust(widths[i]) for i in range(len(row))))
print()
n = len(rows)
if total_rows is not None and total_rows > n:
print(f" ({n} filas mostradas, {total_rows} en BD)")
else:
print(f" ({n} filas)")
# ---------------------------------------------------------------- describe
def _resolve_db_id(project, db_slug: str) -> int:
idx = project.load_index()
dbs = idx.get("databases", {})
if db_slug not in dbs:
# tambien aceptar id numerico
try:
return int(db_slug)
except ValueError:
raise SystemExit(
f"database slug '{db_slug}' no esta en index. "
f"Conocidos: {sorted(dbs.keys())}"
)
return dbs[db_slug]
def cmd_describe(args, project, client) -> None:
"""Describe un database: tablas, columnas, tipos."""
db_id = _resolve_db_id(project, args.db)
meta = client.request("GET", f"/api/database/{db_id}/metadata")
print(f"\ndatabase: {meta.get('name')} (id={db_id}, engine={meta.get('engine')})")
if meta.get("description"):
print(f" {meta['description']}")
tables = meta.get("tables", []) or []
if args.filter:
f = args.filter.lower()
tables = [t for t in tables if f in (t.get("name") or "").lower()]
print(f"\ntablas: {len(tables)}")
for t in tables:
name = t.get("name")
schema = t.get("schema") or ""
rows = t.get("rows")
rows_str = f"~{rows} filas" if rows is not None else ""
prefix = f"{schema}." if schema and schema not in ("public", "PUBLIC") else ""
print(f"\n {prefix}{name} ({rows_str})")
if t.get("description"):
print(f" {t['description']}")
if args.tables_only:
continue
fields = t.get("fields", []) or []
max_name_len = max((len(f.get("name") or "") for f in fields), default=0)
for f in fields:
fname = (f.get("name") or "").ljust(max_name_len)
ftype = f.get("base_type", "").replace("type/", "")
extras = []
if f.get("semantic_type"):
extras.append(f.get("semantic_type").replace("type/", ""))
if f.get("fk_target_field_id"):
extras.append("FK")
extra_str = f" [{', '.join(extras)}]" if extras else ""
print(f" {fname} {ftype}{extra_str}")
if args.samples and not args.tables_only:
try:
sql = f'SELECT * FROM "{name}" LIMIT 3'
# Adapta al engine: H2/postgres usan dobles comillas; mysql backticks
if meta.get("engine") == "mysql":
sql = f"SELECT * FROM `{name}` LIMIT 3"
result = metabase_execute_query(client, db_id, sql, max_results=3)
cols = [c["display_name"] for c in result["data"]["cols"]]
rows_data = result["data"]["rows"][:3]
print(f" sample (3 rows):")
for row in rows_data:
pairs = [f"{cols[i]}={_format_cell(v)}" for i, v in enumerate(row)]
print(f" - {', '.join(pairs[:6])}{'...' if len(pairs) > 6 else ''}")
except Exception as e:
print(f" (sample fallo: {type(e).__name__})")
# ---------------------------------------------------------------- sql
def cmd_sql(args, project, client) -> None:
"""Ejecuta SQL ad-hoc contra un database. Limite de filas obligatorio."""
db_id = _resolve_db_id(project, args.db)
sql = args.query.strip().rstrip(";")
if not sql:
raise SystemExit("query vacia")
# Aviso si la query es claramente destructiva — solo lectura via /api/dataset
upper = sql.upper().lstrip()
destructive = ("INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER", "CREATE")
if any(upper.startswith(kw) for kw in destructive):
if not args.allow_write:
raise SystemExit(
"query empieza con keyword destructiva. "
"/api/dataset suele bloquearlas, pero si quieres seguir: --allow-write"
)
limit = min(max(1, args.limit), HARD_MAX_ROWS)
if args.limit > HARD_MAX_ROWS:
print(f" (--limit {args.limit} capado al hard ceiling {HARD_MAX_ROWS})")
print(f"\nsql: {sql[:200]}{'...' if len(sql) > 200 else ''}")
print(f"db: {args.db} (id={db_id}) limit: {limit}")
try:
result = metabase_execute_query(client, db_id, sql, max_results=limit)
except httpx.HTTPStatusError as e:
# Metabase mete el error en el JSON body incluso con 4xx
try:
body = e.response.json()
err = body.get("error") or body.get("message") or e.response.text[:500]
except Exception:
err = e.response.text[:500]
print(f"\nERROR ({e.response.status_code}): {err}", file=sys.stderr)
sys.exit(1)
status = result.get("status")
if status != "completed":
err = result.get("error") or result.get("message") or "(sin mensaje)"
print(f"\nERROR de Metabase: {err}", file=sys.stderr)
sys.exit(1)
cols_meta = result["data"]["cols"]
rows = result["data"]["rows"]
headers = [c.get("display_name") or c.get("name") for c in cols_meta]
rt = result.get("running_time", 0)
rc = result.get("row_count", len(rows))
print(f"running_time: {rt}ms row_count: {rc}\n")
# Cap de bytes de payload por seguridad (visualizacion en terminal)
payload_size = sum(sum(len(str(c)) for c in row) for row in rows)
if payload_size > MAX_TOTAL_BYTES:
keep = max(1, len(rows) * MAX_TOTAL_BYTES // max(1, payload_size))
print(f" ! payload {payload_size} bytes > {MAX_TOTAL_BYTES} — recortando a {keep} filas")
rows = rows[:keep]
_print_table(headers, rows, total_rows=rc)
+450
View File
@@ -0,0 +1,450 @@
"""auto_metabase — sincronizacion bidireccional Metabase ↔ archivos YAML.
Multi-proyecto: cada entorno Metabase (local, prod, staging...) es un proyecto
aislado bajo projects/{name}/ con su propio config.yaml, .env, state/ y YAMLs.
Uso:
python main.py projects # lista proyectos
python main.py init-project NAME --base-url URL
python main.py [-p PROJECT] login
python main.py [-p PROJECT] status
python main.py [-p PROJECT] pull [--types databases,collections,cards,dashboards]
python main.py [-p PROJECT] push [--dry-run]
python main.py [-p PROJECT] diff
Si no se pasa --project, se usa default_project del config.yaml top-level.
"""
import argparse
import json
import os
import sys
from pathlib import Path
# Hacer accesibles las funciones del registry
APP_DIR = Path(__file__).resolve().parent
REGISTRY_ROOT = APP_DIR.parent.parent
sys.path.insert(0, str(REGISTRY_ROOT / "python" / "functions"))
import yaml # noqa: E402
from metabase.client import MetabaseClient, metabase_auth # noqa: E402
# ---------------------------------------------------------------- Top-level config
TOP_CONFIG_PATH = APP_DIR / "config.yaml"
def load_top_config() -> dict:
if not TOP_CONFIG_PATH.exists():
return {"default_project": "test_local", "projects_dir": "projects"}
with TOP_CONFIG_PATH.open() as f:
return yaml.safe_load(f) or {}
def projects_root() -> Path:
return APP_DIR / load_top_config().get("projects_dir", "projects")
def list_projects() -> list[str]:
root = projects_root()
if not root.exists():
return []
return sorted(p.name for p in root.iterdir() if p.is_dir() and (p / "config.yaml").exists())
# ---------------------------------------------------------------- Project context
class Project:
"""Contexto inmutable de un proyecto."""
def __init__(self, name: str):
self.name = name
self.dir = projects_root() / name
if not self.dir.exists():
raise SystemExit(
f"Proyecto '{name}' no existe. Ejecuta: "
f"python main.py init-project {name} --base-url URL"
)
self.config_path = self.dir / "config.yaml"
self.env_path = self.dir / ".env"
self.state_dir = self.dir / "state"
self.session_path = self.state_dir / "session.json"
self.index_path = self.state_dir / "index.json"
@property
def config(self) -> dict:
with self.config_path.open() as f:
return yaml.safe_load(f) or {}
@property
def base_url(self) -> str:
return self.config["base_url"]
def load_env(self) -> dict:
env = {}
if not self.env_path.exists():
return env
for line in self.env_path.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
env[k.strip()] = v.strip().strip('"').strip("'")
return env
def load_session(self) -> dict | None:
if not self.session_path.exists():
return None
try:
return json.loads(self.session_path.read_text())
except json.JSONDecodeError:
return None
def save_session(self, base_url: str, token: str) -> None:
self.state_dir.mkdir(exist_ok=True)
self.session_path.write_text(
json.dumps({"base_url": base_url, "token": token}, indent=2)
)
def load_index(self) -> dict:
if not self.index_path.exists():
return {"databases": {}, "collections": {}, "cards": {}, "dashboards": {}, "documents": {}}
return json.loads(self.index_path.read_text())
def save_index(self, idx: dict) -> None:
self.state_dir.mkdir(exist_ok=True)
self.index_path.write_text(json.dumps(idx, indent=2, sort_keys=True))
def resolve_project(arg_name: str | None) -> Project:
name = arg_name or load_top_config().get("default_project")
if not name:
raise SystemExit("No hay default_project en config.yaml. Pasa --project NAME.")
return Project(name)
# ---------------------------------------------------------------- Client
def get_client(project: Project, force_login: bool = False) -> MetabaseClient:
base_url = project.base_url
sess = None if force_login else project.load_session()
if sess and sess.get("base_url") == base_url:
client = MetabaseClient(base_url, sess["token"])
try:
client.request("GET", "/api/user/current")
return client
except Exception:
pass # token caducado
env = {**os.environ, **project.load_env()}
auth_cfg = project.config.get("auth", {})
email = env.get(auth_cfg.get("email_env", "METABASE_EMAIL"))
password = env.get(auth_cfg.get("password_env", "METABASE_PASSWORD"))
if not email or not password:
raise SystemExit(
f"Faltan credenciales para proyecto '{project.name}'. "
f"Define {auth_cfg.get('email_env')} y {auth_cfg.get('password_env')} "
f"en {project.env_path.relative_to(APP_DIR)}"
)
client = metabase_auth(base_url, email, password)
project.save_session(base_url, client.token)
return client
# ---------------------------------------------------------------- Commands
def cmd_projects(_args):
top = load_top_config()
default = top.get("default_project")
projs = list_projects()
if not projs:
print("(sin proyectos. Crea uno con: init-project NAME --base-url URL)")
return
print(f"default: {default}\n")
for name in projs:
marker = "*" if name == default else " "
try:
cfg = (projects_root() / name / "config.yaml").read_text()
base = next(
(l.split(":", 1)[1].strip() for l in cfg.splitlines() if l.startswith("base_url")),
"?",
)
except Exception:
base = "?"
print(f" {marker} {name:20s} {base}")
def cmd_init_project(args):
name = args.name
pdir = projects_root() / name
if pdir.exists():
raise SystemExit(f"Proyecto '{name}' ya existe en {pdir}")
for sub in ("databases", "collections", "cards", "dashboards", "documents", "state"):
(pdir / sub).mkdir(parents=True, exist_ok=True)
cfg = {
"name": name,
"description": args.description or f"Proyecto Metabase: {name}",
"base_url": args.base_url,
"auth": {
"email_env": "METABASE_EMAIL",
"password_env": "METABASE_PASSWORD",
},
"sync": {
"ignore_databases": [1],
"ignore_collections": [],
"prefer_archive": True,
},
}
with (pdir / "config.yaml").open("w") as f:
yaml.safe_dump(cfg, f, sort_keys=False, default_flow_style=False)
(pdir / ".env.example").write_text(
"METABASE_EMAIL=admin@example.com\nMETABASE_PASSWORD=changeme\n"
)
(pdir / "state" / "index.json").write_text(
json.dumps({"databases": {}, "collections": {}, "cards": {}, "dashboards": {}, "documents": {}}, indent=2)
)
print(f"Proyecto '{name}' creado en {pdir.relative_to(APP_DIR)}")
print(f"Siguiente paso: cp {pdir.relative_to(APP_DIR)}/.env.example {pdir.relative_to(APP_DIR)}/.env y edita credenciales")
def cmd_login(args):
project = resolve_project(args.project)
client = get_client(project, force_login=True)
me = client.request("GET", "/api/user/current")
print(f"[{project.name}] login OK — {me['email']} (id={me['id']}, super={me.get('is_superuser')})")
def cmd_status(args):
project = resolve_project(args.project)
print(f"project: {project.name}")
print(f" base_url: {project.base_url}")
print(f" session: {'present' if project.load_session() else 'missing'}")
idx = project.load_index()
for kind in ("databases", "collections", "cards", "dashboards", "documents"):
n = len(idx.get(kind, {}))
print(f" indexed {kind:12s} {n}")
for sub in ("databases", "collections", "cards", "dashboards", "documents"):
p = project.dir / sub
n = len(list(p.glob("*.yaml"))) if p.exists() else 0
print(f" on disk {sub:12s} {n} archivos")
def cmd_pull(args):
from sync_pull import pull_one
project = resolve_project(args.project)
client = get_client(project)
pull_one(client, project, args.kind, args.ref)
def cmd_remote(args):
from sync_pull import remote_list
project = resolve_project(args.project)
client = get_client(project)
items = remote_list(client, args.kind, filter_name=args.filter)
if not items:
print("(sin resultados)")
return
print(f"{'ID':>5} {'NAME':40s} {'COL':>4} ARCH UPDATED_AT")
for i in items[:200]:
n = (i["name"] or "")[:40]
print(f"{i['id']:>5} {n:40s} {str(i.get('collection_id') or ''):>4} {'X' if i.get('archived') else ' '} {i.get('updated_at') or ''}")
if len(items) > 200:
print(f"... ({len(items) - 200} mas)")
def cmd_push(args):
from sync_push import push_one
project = resolve_project(args.project)
client = get_client(project)
push_one(
project, client, args.kind, args.slug,
apply=args.apply,
force_overwrite=args.force_overwrite,
allow_warnings=args.allow_warnings,
)
def cmd_push_all(args):
from sync_push import push_all
project = resolve_project(args.project)
client = get_client(project)
summary = push_all(
project, client,
apply=args.apply,
force_overwrite=args.force_overwrite,
allow_warnings=args.allow_warnings,
kinds=tuple(args.kinds),
)
# Exit 1 si hubo fallos, 0 si todo OK
sys.exit(1 if summary["failed"] else 0)
def cmd_restore(args):
from sync_restore import list_backups, restore_one
project = resolve_project(args.project)
if args.list:
backups = list_backups(project, args.kind, args.slug)
if not backups:
print(f"(sin backups para {args.kind} {args.slug})")
return
print(f"Backups disponibles para {args.kind} {args.slug} (mas reciente primero):")
for b in backups:
print(f" {b.relative_to(project.dir.parent.parent)}")
return
restore_one(project, args.kind, args.slug, from_ts=args.from_ts)
def cmd_validate(args):
from sync_validate import print_result, validate_one
project = resolve_project(args.project)
client = get_client(project) if args.check_sql else None
result = validate_one(project, args.kind, args.slug, check_sql=args.check_sql, client=client)
print_result(args.kind, args.slug, result)
if args.show_payload and result.payload is not None:
import json
print("\n--- payload ---")
print(json.dumps(result.payload, indent=2, default=str))
sys.exit(result.exit_code())
def cmd_diff(args):
print(f"diff: usa `validate {args.kind} {args.slug} --show-payload` por ahora")
def cmd_describe(args):
from explore import cmd_describe as _impl
project = resolve_project(args.project)
client = get_client(project)
_impl(args, project, client)
def cmd_sql(args):
from explore import cmd_sql as _impl
project = resolve_project(args.project)
client = get_client(project)
_impl(args, project, client)
# ---------------------------------------------------------------- Entrypoint
def main():
p = argparse.ArgumentParser(description="auto_metabase — Metabase as code, multi-proyecto")
p.add_argument("-p", "--project", help="Nombre del proyecto (default: del config top-level)")
sub = p.add_subparsers(dest="cmd", required=True)
sub.add_parser("projects", help="Lista proyectos").set_defaults(func=cmd_projects)
ip = sub.add_parser("init-project", help="Crea un proyecto nuevo")
ip.add_argument("name")
ip.add_argument("--base-url", required=True, help="URL del Metabase (ej: http://localhost:3000)")
ip.add_argument("--description")
ip.set_defaults(func=cmd_init_project)
sub.add_parser("login", help="Autentica y guarda token").set_defaults(func=cmd_login)
sub.add_parser("status", help="Estado del proyecto").set_defaults(func=cmd_status)
pp = sub.add_parser("pull", help="Trae UN item de Metabase a disco (per-item, nunca bulk)")
pp.add_argument("kind", choices=["card", "dashboard", "database", "collection"])
pp.add_argument("ref", help="Slug del index, o id Metabase (numerico)")
pp.set_defaults(func=cmd_pull)
rl = sub.add_parser("remote", help="Lista items en Metabase sin descargar nada")
rl.add_argument("kind", choices=["card", "dashboard", "database", "collection"])
rl.add_argument("--filter", help="Substring case-insensitive sobre name")
rl.set_defaults(func=cmd_remote)
va = sub.add_parser("validate", help="Valida un YAML local (read-only, no toca Metabase)")
va.add_argument("kind", choices=["card", "dashboard", "database", "collection"])
va.add_argument("slug")
va.add_argument("--check-sql", action="store_true",
help="Ejecuta la SQL contra Metabase para validar (solo cards native)")
va.add_argument("--show-payload", action="store_true",
help="Imprime el payload final que se enviaria")
va.set_defaults(func=cmd_validate)
pu = sub.add_parser("push", help="Aplica UN item a Metabase. Dry-run por defecto.")
pu.add_argument("kind", choices=["card", "dashboard"])
pu.add_argument("slug")
pu.add_argument("--apply", action="store_true",
help="Realmente envia a Metabase (sin esto solo dry-run)")
pu.add_argument("--force-overwrite", action="store_true",
help="Salta R17 (freshness) y R18 (count) — perdida de trabajo posible")
pu.add_argument("--allow-warnings", action="store_true",
help="Aplica aunque la validacion estructural genere warnings")
pu.set_defaults(func=cmd_push)
pa = sub.add_parser(
"push-all",
help="Pushea TODOS los YAMLs del proyecto (cards primero, dashboards despues). "
"Solo CREATE/UPDATE — nunca DELETE. Dry-run por defecto.",
)
pa.add_argument("--apply", action="store_true",
help="Realmente envia (sin esto, dry-run de cada item)")
pa.add_argument("--force-overwrite", action="store_true",
help="Salta R17 (freshness) y R18 (count) en cada item")
pa.add_argument("--allow-warnings", action="store_true",
help="Aplica aunque la validacion estructural genere warnings")
pa.add_argument("--kinds", nargs="+", default=["card", "dashboard"],
choices=["card", "dashboard"],
help="Que tipos pushear y en que orden (default: card dashboard)")
pa.set_defaults(func=cmd_push_all)
re_ = sub.add_parser("restore", help="Restaura YAML local desde backup (no aplica a Metabase)")
re_.add_argument("kind", choices=["card", "dashboard"])
re_.add_argument("slug")
re_.add_argument("--from", dest="from_ts", help="Timestamp del backup (default: mas reciente)")
re_.add_argument("--list", action="store_true", help="Lista backups disponibles")
re_.set_defaults(func=cmd_restore)
di = sub.add_parser("diff", help="Alias temporal de validate --show-payload")
di.add_argument("kind", choices=["card", "dashboard", "database", "collection"])
di.add_argument("slug")
di.set_defaults(func=cmd_diff)
de = sub.add_parser(
"describe",
help="Describe un database: tablas, columnas y tipos. Util para escribir cards sin adivinar.",
)
de.add_argument("db", help="slug de database (del index) o id numerico")
de.add_argument("--filter", help="Substring case-insensitive sobre nombre de tabla")
de.add_argument("--samples", action="store_true",
help="Muestra 3 filas de ejemplo por tabla (1 query SELECT * LIMIT 3 por tabla)")
de.add_argument("--tables-only", action="store_true",
help="Solo nombre de tabla y row count, sin columnas")
de.set_defaults(func=cmd_describe)
sq = sub.add_parser(
"sql",
help="Ejecuta SQL ad-hoc contra un database (read-only). NO crea card, "
"limite obligatorio para no explotar.",
)
sq.add_argument("db", help="slug de database (del index) o id numerico")
sq.add_argument("query", help="SQL a ejecutar (entre comillas)")
sq.add_argument("--limit", type=int, default=100,
help=f"Maximo filas a traer (default: 100, hard ceiling: 10000)")
sq.add_argument("--allow-write", action="store_true",
help="Permite queries que empiecen por INSERT/UPDATE/DELETE/etc (Metabase suele bloquearlas)")
sq.set_defaults(func=cmd_sql)
args = p.parse_args()
args.func(args)
if __name__ == "__main__":
main()
+220
View File
@@ -0,0 +1,220 @@
"""Construccion del payload final que se envia a Metabase.
Toma el YAML de un item local (con _meta + _refs + payload) y devuelve el
payload listo para POST/PUT, con slugs reemplazados por IDs reales del index.
Funciones puras — sin red, sin escritura, deterministicas.
"""
from __future__ import annotations
import copy
from pathlib import Path
from typing import Any
import yaml
# ---------------------------------------------------------------- Carga de YAMLs
def load_item_yaml(path: Path) -> dict:
"""Lee un YAML de item (card/dashboard/database/collection)."""
with path.open() as f:
doc = yaml.safe_load(f) or {}
if not isinstance(doc, dict):
raise ValueError(f"{path}: contenido no es un dict YAML")
for key in ("_meta", "_refs", "payload"):
if key not in doc:
raise ValueError(f"{path}: falta bloque '{key}'")
return doc
def assert_meta(doc: dict, expected_kind: str, expected_slug: str, path: Path) -> None:
"""Aborta si _meta no coincide con kind/slug esperados (regla R9)."""
meta = doc.get("_meta", {})
if meta.get("kind") != expected_kind:
raise ValueError(
f"{path}: _meta.kind='{meta.get('kind')}' pero esperaba '{expected_kind}'"
)
if meta.get("slug") != expected_slug:
raise ValueError(
f"{path}: _meta.slug='{meta.get('slug')}' pero esperaba '{expected_slug}'"
)
def assert_id_matches_index(doc: dict, kind: str, slug: str, index: dict, path: Path) -> None:
"""Aborta si _meta.id difiere del id del index (regla R11)."""
meta_id = doc.get("_meta", {}).get("id")
idx_id = index.get(kind + "s", {}).get(slug) # cards/dashboards/...
if meta_id is None and idx_id is None:
return # item nuevo, no hay id que comparar
if meta_id != idx_id:
raise ValueError(
f"{path}: _meta.id={meta_id} no coincide con index ({idx_id}). "
f"Posible YAML duplicado con slug renombrado pero id sin actualizar."
)
# ---------------------------------------------------------------- Resolucion de refs
def _resolve_slug(slug: str | None, kind_plural: str, index: dict) -> int | None:
"""Slug -> id Metabase. None -> None. Slug desconocido -> ValueError."""
if slug is None:
return None
mapping = index.get(kind_plural, {})
if slug not in mapping:
raise ValueError(
f"slug '{slug}' (tipo {kind_plural}) no existe en index. "
f"Conocidos: {sorted(mapping.keys())}"
)
return mapping[slug]
# ---------------------------------------------------------------- Builders por kind
def build_card_payload(doc: dict, index: dict) -> dict:
"""Resuelve refs y devuelve el payload listo para POST/PUT a /api/card.
Sin lecturas de Metabase. Sin merges con estado remoto. Solo lo que
tiene el YAML (regla R8).
"""
refs = doc.get("_refs", {}) or {}
payload = copy.deepcopy(doc.get("payload", {}) or {})
# database (obligatorio en cards)
db_slug = refs.get("database")
if db_slug is None:
raise ValueError("card payload: falta _refs.database")
db_id = _resolve_slug(db_slug, "databases", index)
payload["database_id"] = db_id
# dataset_query.database (mismo id, tambien debe ir resuelto)
dq = payload.get("dataset_query")
if isinstance(dq, dict) and "database" in dq:
dq["database"] = db_id
# collection (opcional; puede ser None = root)
if "collection" in refs:
coll_slug = refs["collection"]
payload["collection_id"] = _resolve_slug(coll_slug, "collections", index)
return payload
def build_dashboard_payload(doc: dict, index: dict) -> dict:
"""Resuelve refs y devuelve el payload listo para POST/PUT a /api/dashboard.
Para dashcards: cada `card` slug -> `card_id` int. Series tambien.
Mantiene la lista de dashcards COMPLETA tal como esta en el YAML
(Metabase la trata como estado deseado).
Auto-inyeccion para dashcards nuevas (sin `id`):
- `id`: asigna negativo unico (-1, -2, ...) en orden de aparicion.
- `visualization_settings`: {} si falta.
- `parameter_mappings`: [] si falta.
Si la dashcard ya trae `id` (positivo o negativo), no se toca.
"""
refs = doc.get("_refs", {}) or {}
payload = copy.deepcopy(doc.get("payload", {}) or {})
# collection (opcional)
if "collection" in refs:
coll_slug = refs["collection"]
payload["collection_id"] = _resolve_slug(coll_slug, "collections", index)
# dashcards: card slug -> card_id int + auto-inyeccion de campos nuevos
dashcards = payload.get("dashcards", []) or []
new_dashcards = []
next_neg_id = -1 # contador de ids negativos auto-asignados
used_neg_ids = {dc["id"] for dc in dashcards if isinstance(dc.get("id"), int) and dc["id"] < 0}
for i, dc in enumerate(dashcards):
dc = dict(dc)
card_slug = dc.pop("card", None)
if card_slug is not None:
try:
dc["card_id"] = _resolve_slug(card_slug, "cards", index)
except ValueError as e:
raise ValueError(f"dashcard #{i}: {e}") from None
# series: lista de slugs -> lista de {id}
series_slugs = dc.get("series") or []
if series_slugs:
dc["series"] = [
{"id": _resolve_slug(s, "cards", index)} for s in series_slugs
]
# auto-inyeccion: id negativo si no hay id (CREATE de dashcard)
if "id" not in dc:
while next_neg_id in used_neg_ids:
next_neg_id -= 1
dc["id"] = next_neg_id
used_neg_ids.add(next_neg_id)
next_neg_id -= 1
# auto-inyeccion: viz_settings y parameter_mappings vacios si faltan
dc.setdefault("visualization_settings", {})
dc.setdefault("parameter_mappings", [])
new_dashcards.append(dc)
payload["dashcards"] = new_dashcards
return payload
def build_database_payload(doc: dict, index: dict, env: dict) -> dict:
"""Construye payload de database. Resuelve passwords desde env vars.
Si details.password es ${VAR}, lo sustituye por env[VAR]. Si no esta,
deja el placeholder (push fallara con error claro).
"""
payload = copy.deepcopy(doc.get("payload", {}) or {})
details = payload.get("details", {}) or {}
pwd = details.get("password")
if isinstance(pwd, str) and pwd.startswith("${") and pwd.endswith("}"):
var_name = pwd[2:-1]
if var_name in env:
details["password"] = env[var_name]
# si no esta, queda el placeholder y push fallara
payload["details"] = details
return payload
def build_collection_payload(doc: dict, index: dict) -> dict:
refs = doc.get("_refs", {}) or {}
payload = copy.deepcopy(doc.get("payload", {}) or {})
if "parent" in refs:
parent_slug = refs["parent"]
payload["parent_id"] = _resolve_slug(parent_slug, "collections", index)
return payload
# ---------------------------------------------------------------- Dispatch
_BUILDERS = {
"card": build_card_payload,
"dashboard": build_dashboard_payload,
"database": build_database_payload,
"collection": build_collection_payload,
}
def build_payload(kind: str, doc: dict, index: dict, env: dict | None = None) -> dict:
"""Punto de entrada: dispatch por kind."""
if kind not in _BUILDERS:
raise ValueError(f"kind '{kind}' desconocido. Validos: {sorted(_BUILDERS)}")
if kind == "database":
return _BUILDERS[kind](doc, index, env or {})
return _BUILDERS[kind](doc, index)
def known_card_ids(index: dict) -> set[int]:
"""Set de ids de cards conocidas (para validacion de dashcards)."""
return set(index.get("cards", {}).values())
# ---------------------------------------------------------------- Lookup paths
def item_path(project_dir: Path, kind: str, slug: str) -> Path:
"""Path al YAML de un item: cards/foo.yaml, dashboards/bar.yaml, etc."""
return project_dir / (kind + "s") / f"{slug}.yaml"
@@ -0,0 +1,2 @@
METABASE_EMAIL=admin@auto-metabase.local
METABASE_PASSWORD=AutoMetabase123!
@@ -0,0 +1,28 @@
_meta:
kind: card
id: 48
slug: clientes_nuevos_por_mes
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.096661Z'
_refs:
database: sample_database
collection: null
payload:
description: Altas mensuales en PEOPLE (CREATED_AT)
archived: false
enable_embedding: false
query_type: native
name: Clientes nuevos por mes
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: "SELECT FORMATDATETIME(CREATED_AT, 'yyyy-MM') AS mes,\n COUNT(*) AS nuevos\n FROM PEOPLE\n GROUP BY mes\n\
\ ORDER BY mes"
parameter_mappings: []
display: line
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,31 @@
_meta:
kind: card
id: 49
slug: clientes_por_edad
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.154517Z'
_refs:
database: sample_database
collection: null
payload:
description: Distribucion calculada desde BIRTH_DATE a fecha actual
archived: false
enable_embedding: false
query_type: native
name: Clientes por rango de edad
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: "SELECT\n CASE\n WHEN DATEDIFF('year', BIRTH_DATE, CURRENT_DATE) < 25 THEN '1) <25'\n WHEN DATEDIFF('year',\
\ BIRTH_DATE, CURRENT_DATE) < 35 THEN '2) 25-34'\n WHEN DATEDIFF('year', BIRTH_DATE, CURRENT_DATE) < 45 THEN '3)\
\ 35-44'\n WHEN DATEDIFF('year', BIRTH_DATE, CURRENT_DATE) < 55 THEN '4) 45-54'\n WHEN DATEDIFF('year', BIRTH_DATE,\
\ CURRENT_DATE) < 65 THEN '5) 55-64'\n ELSE '6) 65+'\n END AS rango_edad,\n COUNT(*) AS clientes\nFROM PEOPLE\n\
WHERE BIRTH_DATE IS NOT NULL\nGROUP BY rango_edad\nORDER BY rango_edad"
parameter_mappings: []
display: bar
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,27 @@
_meta:
kind: card
id: 46
slug: clientes_por_estado
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.215737Z'
_refs:
database: sample_database
collection: null
payload:
description: Top 10 estados (US) por numero de clientes
archived: false
enable_embedding: false
query_type: native
name: clientes_por_estado
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: "SELECT STATE, COUNT(*) AS clientes\n FROM PEOPLE\n GROUP BY STATE\n ORDER BY clientes DESC\n LIMIT 10"
parameter_mappings: []
display: bar
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,28 @@
_meta:
kind: card
id: 50
slug: clientes_por_source
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.276556Z'
_refs:
database: sample_database
collection: null
payload:
description: Distribucion por SOURCE (Google, Twitter, Facebook, Organic, Affiliate)
archived: false
enable_embedding: false
query_type: native
name: Clientes por canal
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: "SELECT COALESCE(SOURCE, 'unknown') AS canal,\n COUNT(*) AS clientes\n FROM PEOPLE\n GROUP BY canal\n\
\ ORDER BY clientes DESC"
parameter_mappings: []
display: pie
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,27 @@
_meta:
kind: card
id: 51
slug: clientes_recientes
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.325878Z'
_refs:
database: sample_database
collection: null
payload:
description: Tabla de las 20 altas mas recientes en PEOPLE
archived: false
enable_embedding: false
query_type: native
name: Ultimos 20 clientes registrados
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: "SELECT NAME, EMAIL, CITY, STATE, SOURCE, CREATED_AT\n FROM PEOPLE\n ORDER BY CREATED_AT DESC\n LIMIT 20"
parameter_mappings: []
display: table
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,28 @@
_meta:
kind: card
id: 52
slug: clientes_top_ciudades
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.37711Z'
_refs:
database: sample_database
collection: null
payload:
description: Ciudades con mas clientes
archived: false
enable_embedding: false
query_type: native
name: Top 15 ciudades
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: "SELECT CITY, STATE, COUNT(*) AS clientes\n FROM PEOPLE\n GROUP BY CITY, STATE\n ORDER BY clientes DESC\n LIMIT\
\ 15"
parameter_mappings: []
display: bar
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,27 @@
_meta:
kind: card
id: 42
slug: clientes_total
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.441467Z'
_refs:
database: sample_database
collection: null
payload:
description: Numero total de clientes en PEOPLE
archived: false
enable_embedding: false
query_type: native
name: clientes_total
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: SELECT COUNT(*) AS total FROM PEOPLE
parameter_mappings: []
display: scalar
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,28 @@
_meta:
kind: card
id: 45
slug: compras_por_mes
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.516411Z'
_refs:
database: sample_database
collection: null
payload:
description: Numero de compras agrupadas por mes
archived: false
enable_embedding: false
query_type: native
name: compras_por_mes
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: "SELECT FORMATDATETIME(CREATED_AT, 'yyyy-MM') AS mes,\n COUNT(*) AS compras\n FROM ORDERS\n GROUP BY\
\ mes\n ORDER BY mes"
parameter_mappings: []
display: line
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,27 @@
_meta:
kind: card
id: 43
slug: compras_total
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.586558Z'
_refs:
database: sample_database
collection: null
payload:
description: Numero total de compras en ORDERS
archived: false
enable_embedding: false
query_type: native
name: compras_total
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: SELECT COUNT(*) AS total FROM ORDERS
parameter_mappings: []
display: scalar
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,27 @@
_meta:
kind: card
id: 44
slug: ingresos_totales
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.663393Z'
_refs:
database: sample_database
collection: null
payload:
description: Suma de TOTAL de todas las compras
archived: false
enable_embedding: false
query_type: native
name: ingresos_totales
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: SELECT ROUND(SUM(TOTAL), 2) AS ingresos FROM ORDERS
parameter_mappings: []
display: scalar
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,27 @@
_meta:
kind: card
id: 40
slug: test_count_users
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.727051Z'
_refs:
database: metabase_internal_pg
collection: null
payload:
description: otro cambio externo
archived: false
enable_embedding: false
query_type: native
name: test_count_users
type: question
dataset_query:
lib/type: mbql/query
database: metabase_internal_pg
stages:
- lib/type: mbql.stage/native
native: SELECT COUNT(*) AS users FROM core_user
parameter_mappings: []
display: scalar
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,26 @@
_meta:
kind: card
id: 41
slug: test_users_by_locale
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.796377Z'
_refs:
database: metabase_internal_pg
collection: null
payload:
archived: false
enable_embedding: false
query_type: native
name: test_users_by_locale
type: question
dataset_query:
lib/type: mbql/query
database: metabase_internal_pg
stages:
- lib/type: mbql.stage/native
native: SELECT COALESCE(locale, 'unknown') AS locale, COUNT(*) AS n FROM core_user GROUP BY locale ORDER BY n DESC
parameter_mappings: []
display: bar
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,29 @@
_meta:
kind: card
id: 47
slug: top_clientes
synced_at: '2026-04-13T10:49:59Z'
remote_updated_at: '2026-04-13T10:49:59.882916Z'
_refs:
database: sample_database
collection: null
payload:
description: Top 10 clientes por gasto total
archived: false
enable_embedding: false
query_type: native
name: top_clientes
type: question
dataset_query:
lib/type: mbql/query
database: sample_database
stages:
- lib/type: mbql.stage/native
native: "SELECT P.NAME AS cliente,\n P.EMAIL AS email,\n COUNT(O.ID) AS num_compras,\n ROUND(SUM(O.TOTAL),\
\ 2) AS total_gastado\n FROM PEOPLE P\n JOIN ORDERS O ON O.USER_ID = P.ID\n GROUP BY P.NAME, P.EMAIL\n ORDER BY\
\ total_gastado DESC\n LIMIT 10"
parameter_mappings: []
display: table
collection_preview: true
visualization_settings: {}
parameters: []
@@ -0,0 +1,15 @@
name: test_local
description: "Metabase de prueba en Docker local (container auto_metabase_test-metabase)"
base_url: http://localhost:3000
auth:
email_env: METABASE_EMAIL
password_env: METABASE_PASSWORD
sync:
# IDs de databases a ignorar (1 = Sample Database interno)
ignore_databases: [1]
# IDs de colecciones a ignorar
ignore_collections: []
# Si true, archiva en Metabase en vez de eliminar al hacer push
prefer_archive: true
@@ -0,0 +1,31 @@
_meta:
kind: dashboard
id: 2
slug: auto_metabase_test_dashboard
synced_at: '2026-04-13T10:50:00Z'
remote_updated_at: '2026-04-13T09:43:33.289419Z'
dashcards_count: 2
tabs_count: 0
parameters_count: 0
_refs:
collection: null
payload:
description: Dashboard de prueba para auto_metabase
archived: false
dashcards:
- size_x: 6
col: 0
size_y: 4
row: 0
card: test_count_users
- size_x: 6
col: 6
size_y: 4
row: 0
card: test_users_by_locale
tabs: []
enable_embedding: false
name: auto_metabase test dashboard
width: fixed
parameters: []
auto_apply_filters: true
@@ -0,0 +1,51 @@
_meta:
kind: dashboard
id: 4
slug: compras_y_clientes
synced_at: '2026-04-13T10:50:00Z'
remote_updated_at: '2026-04-13T10:40:30.171442Z'
dashcards_count: 6
tabs_count: 0
parameters_count: 0
_refs:
collection: null
payload:
description: Vista general de compras (ORDERS) y clientes (PEOPLE) del Sample Database
archived: false
dashcards:
- size_x: 12
col: 12
size_y: 6
row: 3
card: clientes_por_estado
- size_x: 12
col: 0
size_y: 6
row: 3
card: compras_por_mes
- size_x: 8
col: 0
size_y: 3
row: 0
card: clientes_total
- size_x: 8
col: 8
size_y: 3
row: 0
card: compras_total
- size_x: 8
col: 16
size_y: 3
row: 0
card: ingresos_totales
- size_x: 24
col: 0
size_y: 7
row: 9
card: top_clientes
tabs: []
enable_embedding: false
name: Compras y Clientes
width: fixed
parameters: []
auto_apply_filters: true
@@ -0,0 +1,36 @@
_meta:
kind: dashboard
id: 5
slug: kpis_minimal
synced_at: '2026-04-13T10:50:00Z'
remote_updated_at: '2026-04-13T10:47:00.301085Z'
dashcards_count: 3
tabs_count: 0
parameters_count: 0
_refs:
collection: null
payload:
description: Dashboard test del auto-inject — YAML sin id/viz/param_mappings en dashcards
archived: false
dashcards:
- size_x: 8
col: 0
size_y: 3
row: 0
card: clientes_total
- size_x: 8
col: 8
size_y: 3
row: 0
card: compras_total
- size_x: 8
col: 16
size_y: 3
row: 0
card: ingresos_totales
tabs: []
enable_embedding: false
name: KPIs Minimal
width: fixed
parameters: []
auto_apply_filters: true
@@ -0,0 +1,56 @@
_meta:
kind: dashboard
id: 6
slug: panel_clientes
synced_at: '2026-04-13T10:50:00Z'
remote_updated_at: '2026-04-13T10:50:00.213836Z'
dashcards_count: 7
tabs_count: 0
parameters_count: 0
_refs:
collection: null
payload:
description: Vista 360 de los clientes (PEOPLE) del Sample Database — total, altas, canal, geografia, edad y registros recientes.
archived: false
dashcards:
- size_x: 24
col: 0
size_y: 3
row: 0
card: clientes_total
- size_x: 24
col: 0
size_y: 6
row: 3
card: clientes_nuevos_por_mes
- size_x: 12
col: 0
size_y: 6
row: 9
card: clientes_por_source
- size_x: 12
col: 12
size_y: 6
row: 9
card: clientes_por_edad
- size_x: 12
col: 0
size_y: 6
row: 15
card: clientes_por_estado
- size_x: 12
col: 12
size_y: 6
row: 15
card: clientes_top_ciudades
- size_x: 24
col: 0
size_y: 8
row: 21
card: clientes_recientes
tabs: []
enable_embedding: false
name: Panel de Clientes
width: fixed
parameters: []
auto_apply_filters: true
@@ -0,0 +1,17 @@
_meta:
kind: database
id: 2
slug: metabase_internal_pg
_refs: {}
payload:
timezone: GMT
auto_run_queries: true
name: metabase_internal_pg
details:
host: auto_metabase_test-postgres
port: 5432
dbname: metabase
user: metabase
password: ${METABASE_DB_PASSWORD_METABASE_INTERNAL_PG}
ssl: false
engine: postgres
@@ -0,0 +1,29 @@
{
"cards": {
"clientes_nuevos_por_mes": 48,
"clientes_por_edad": 49,
"clientes_por_estado": 46,
"clientes_por_source": 50,
"clientes_recientes": 51,
"clientes_top_ciudades": 52,
"clientes_total": 42,
"compras_por_mes": 45,
"compras_total": 43,
"ingresos_totales": 44,
"test_count_users": 40,
"test_users_by_locale": 41,
"top_clientes": 47
},
"collections": {},
"dashboards": {
"auto_metabase_test_dashboard": 2,
"compras_y_clientes": 4,
"kpis_minimal": 5,
"panel_clientes": 6
},
"databases": {
"metabase_internal_pg": 2,
"sample_database": 1
},
"documents": {}
}
@@ -0,0 +1,122 @@
"""Crea database + cards + dashboard de prueba en Metabase para validar pull/push.
Usa la propia Postgres interna de Metabase (auto_metabase_test-postgres) como
database de prueba, ya que es accesible desde el container metabase via la
red docker compartida.
"""
import sys
from pathlib import Path
APP_DIR = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(APP_DIR.parent.parent / "python" / "functions"))
sys.path.insert(0, str(APP_DIR))
from main import get_client # noqa: E402
from metabase.databases import metabase_add_database, metabase_list_databases # noqa: E402
from metabase.cards import metabase_create_card, metabase_list_cards # noqa: E402
from metabase.dashboards import ( # noqa: E402
metabase_create_dashboard,
metabase_list_dashboards,
metabase_update_dashboard,
)
def find_or_create_database(client) -> int:
dbs = metabase_list_databases(client)
# list_databases puede retornar un dict con 'data' o una lista directa
items = dbs["data"] if isinstance(dbs, dict) and "data" in dbs else dbs
for db in items:
if db.get("name") == "metabase_internal_pg":
print(f" database existente id={db['id']}")
return db["id"]
db = metabase_add_database(
client,
name="metabase_internal_pg",
engine="postgres",
details={
"host": "auto_metabase_test-postgres",
"port": 5432,
"dbname": "metabase",
"user": "metabase",
"password": "metabase",
"ssl": False,
},
)
print(f" database creada id={db['id']}")
return db["id"]
def find_or_create_card(client, name: str, db_id: int, sql: str, display: str = "table") -> int:
cards = metabase_list_cards(client)
for c in cards:
if c.get("name") == name:
print(f" card '{name}' existente id={c['id']}")
return c["id"]
card = metabase_create_card(
client,
name=name,
dataset_query={
"type": "native",
"native": {"query": sql},
"database": db_id,
},
display=display,
)
print(f" card '{name}' creada id={card['id']}")
return card["id"]
def find_or_create_dashboard(client, name: str) -> int:
dashes = metabase_list_dashboards(client)
for d in dashes:
if d.get("name") == name:
print(f" dashboard '{name}' existente id={d['id']}")
return d["id"]
d = metabase_create_dashboard(client, name=name, description="Dashboard de prueba para auto_metabase")
print(f" dashboard '{name}' creado id={d['id']}")
return d["id"]
def main():
client = get_client()
print("Seeding test data en Metabase...")
print("\n[1] Database")
db_id = find_or_create_database(client)
print("\n[2] Cards")
c1 = find_or_create_card(
client, "test_count_users", db_id,
"SELECT COUNT(*) AS users FROM core_user", "scalar",
)
c2 = find_or_create_card(
client, "test_users_by_locale", db_id,
"SELECT COALESCE(locale, 'unknown') AS locale, COUNT(*) AS n FROM core_user GROUP BY locale ORDER BY n DESC",
"bar",
)
print("\n[3] Dashboard con cards")
dash_id = find_or_create_dashboard(client, "auto_metabase test dashboard")
# Re-fetch dashboard para ver estado actual
from metabase.dashboards import metabase_get_dashboard
dash = metabase_get_dashboard(client, dash_id)
existing_card_ids = {dc.get("card_id") for dc in dash.get("dashcards", [])}
if c1 in existing_card_ids and c2 in existing_card_ids:
print(f" dashboard ya tiene las {len(dash.get('dashcards', []))} dashcards esperadas")
else:
# Construir dashcards: id negativo => nueva
new_dashcards = [
{"id": -1, "card_id": c1, "row": 0, "col": 0, "size_x": 6, "size_y": 4},
{"id": -2, "card_id": c2, "row": 0, "col": 6, "size_x": 6, "size_y": 4},
]
metabase_update_dashboard(client, dash_id, dashcards=new_dashcards)
print(f" dashcards añadidas: {len(new_dashcards)}")
print(f"\nListo. Abre http://localhost:3000/dashboard/{dash_id}")
if __name__ == "__main__":
main()
+372
View File
@@ -0,0 +1,372 @@
"""Pull per-item: trae UN item de Metabase a disco. Nunca bulk.
R14: pull de dashboard SIEMPRE completo (todas las dashcards, tabs, parameters).
R15: para cada card_id referenciado en dashcards no presente en index, registra
slug→id en index sin escribir el YAML (option C: tracked sin file).
R16: cada YAML lleva en _meta los campos:
- synced_at: timestamp del momento del pull (ISO UTC)
- remote_updated_at: updated_at que Metabase reportaba en ese momento
- dashcards_count, tabs_count, parameters_count: snapshots para R18/R20
Funciones publicas:
pull_one(client, project, kind, ref) -> dict # ref: int id o str slug
"""
from __future__ import annotations
import datetime as dt
import re
from pathlib import Path
from typing import Any
import yaml
from metabase.cards import metabase_get_card, metabase_list_cards
from metabase.dashboards import metabase_get_dashboard, metabase_list_dashboards
from metabase.databases import metabase_get_database, metabase_list_databases
# Campos volatiles a descartar del payload (mismos que ya teniamos)
_VOLATILE_KEYS = frozenset({
"created_at", "updated_at", "last_used_at", "last_viewed_at",
"last_query_start", "last_used_param_values", "view_count",
"dashboard_count", "parameter_usage_count", "average_query_time",
"creator_id", "creator", "made_public_by_id", "last-edit-info",
"public_uuid", "entity_id", "card_schema", "metabase_version",
"result_metadata", "legacy_query", "source_card_id",
"can_write", "can_restore", "can_delete", "can_run_adhoc_query",
"can_manage_db", "can_set_cache_policy", "can-manage", "can_upload",
"archived_directly", "moderation_reviews", "embedding_type",
"dependency_analysis_version", "initially_published_at",
"param_fields", "is_remote_synced", "show_in_getting_started",
"collection_position", "position", "cache_invalidated_at",
"is_sample", "is_audit", "is_attached_dwh", "is_on_demand",
"is_full_sync", "initial_sync_status", "dbms_version",
"router_database_id", "router_user_attribute",
"uploads_enabled", "uploads_schema_name", "uploads_table_prefix",
"refingerprint", "schedules", "metadata_sync_schedule",
"cache_field_values_schedule", "write_data_details", "provider_name",
"workspace_permissions_status", "features", "id",
"dashboard", "dashboard_id", "table_id",
})
def _utc_now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _slugify(name: str) -> str:
s = re.sub(r"[^a-z0-9]+", "_", (name or "").lower()).strip("_")
return s or "untitled"
def _strip_volatile(value: Any) -> Any:
if isinstance(value, dict):
out = {}
for k, v in value.items():
if k in _VOLATILE_KEYS:
continue
cleaned = _strip_volatile(v)
if cleaned is None:
continue
out[k] = cleaned
return out
if isinstance(value, list):
return [_strip_volatile(x) for x in value]
return value
def _yaml_dump(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
yaml.safe_dump(data, f, sort_keys=False, allow_unicode=True, default_flow_style=False, width=120)
def _id_to_slug(id_: int | None, mapping: dict[str, int]) -> str | None:
if id_ is None:
return None
for slug, mid in mapping.items():
if mid == id_:
return slug
return None
def _resolve_ref(ref: str | int, kind_plural: str, index: dict) -> int:
"""Devuelve el id Metabase a partir de un id int o slug str."""
if isinstance(ref, int):
return ref
if isinstance(ref, str) and ref.isdigit():
return int(ref)
mapping = index.get(kind_plural, {})
if ref not in mapping:
raise SystemExit(
f"Ref '{ref}' no encontrado en index.{kind_plural}. "
f"Conocidos: {sorted(mapping.keys()) or '(vacio)'}. "
f"Si es un id Metabase nuevo, pasa el numero directamente."
)
return mapping[ref]
def _slug_for(name: str, existing_mapping: dict[str, int], item_id: int) -> str:
"""Reusa el slug del index si ya esta mapeado al mismo id, sino genera uno nuevo."""
for slug, mid in existing_mapping.items():
if mid == item_id:
return slug
base = _slugify(name)
if base not in existing_mapping:
return base
i = 2
while f"{base}_{i}" in existing_mapping:
i += 1
return f"{base}_{i}"
# ---------------------------------------------------------------- Per-kind
def pull_database(client, project, ref: str | int) -> dict:
index = project.load_index()
db_id = _resolve_ref(ref, "databases", index)
full = metabase_get_database(client, db_id)
slug = _slug_for(full.get("name", "db"), index.get("databases", {}), db_id)
payload = _strip_volatile(full)
if "details" in payload and "password" in payload["details"]:
payload["details"]["password"] = f"${{METABASE_DB_PASSWORD_{slug.upper()}}}"
body = {
"_meta": {
"kind": "database",
"id": db_id,
"slug": slug,
"synced_at": _utc_now_iso(),
"remote_updated_at": full.get("updated_at"),
},
"_refs": {},
"payload": payload,
}
path = project.dir / "databases" / f"{slug}.yaml"
_yaml_dump(path, body)
index.setdefault("databases", {})[slug] = db_id
project.save_index(index)
print(f"[{project.name}] pull database {slug} (id={db_id}) -> {path.relative_to(project.dir.parent.parent)}")
return body
def pull_collection(client, project, ref: str | int) -> dict:
index = project.load_index()
coll_id = _resolve_ref(ref, "collections", index)
full = client.request("GET", f"/api/collection/{coll_id}")
slug = _slug_for(full.get("name", "col"), index.get("collections", {}), coll_id)
parent_id = full.get("parent_id")
parent_slug = _id_to_slug(parent_id, index.get("collections", {}))
payload = _strip_volatile(full)
payload.pop("parent_id", None)
body = {
"_meta": {
"kind": "collection",
"id": coll_id,
"slug": slug,
"synced_at": _utc_now_iso(),
"remote_updated_at": full.get("updated_at"),
},
"_refs": {"parent": parent_slug},
"payload": payload,
}
path = project.dir / "collections" / f"{slug}.yaml"
_yaml_dump(path, body)
index.setdefault("collections", {})[slug] = coll_id
project.save_index(index)
print(f"[{project.name}] pull collection {slug} (id={coll_id}) -> {path.relative_to(project.dir.parent.parent)}")
return body
def pull_card(client, project, ref: str | int) -> dict:
index = project.load_index()
card_id = _resolve_ref(ref, "cards", index)
full = metabase_get_card(client, card_id)
slug = _slug_for(full.get("name", "card"), index.get("cards", {}), card_id)
refs = {
"database": _id_to_slug(full.get("database_id"), index.get("databases", {})),
"collection": _id_to_slug(full.get("collection_id"), index.get("collections", {})),
}
if refs["database"] is None and full.get("database_id") is not None:
# Card apunta a una database que no esta en nuestro index todavia
print(
f" ! warning: database_id={full['database_id']} no esta en index. "
f"El push de esta card fallara hasta que pullees esa database."
)
payload = _strip_volatile(full)
payload.pop("database_id", None)
payload.pop("collection_id", None)
payload.pop("collection", None)
if isinstance(payload.get("dataset_query"), dict) and "database" in payload["dataset_query"]:
payload["dataset_query"]["database"] = refs["database"]
body = {
"_meta": {
"kind": "card",
"id": card_id,
"slug": slug,
"synced_at": _utc_now_iso(),
"remote_updated_at": full.get("updated_at"),
},
"_refs": refs,
"payload": payload,
}
path = project.dir / "cards" / f"{slug}.yaml"
_yaml_dump(path, body)
index.setdefault("cards", {})[slug] = card_id
project.save_index(index)
print(f"[{project.name}] pull card {slug} (id={card_id}) -> {path.relative_to(project.dir.parent.parent)}")
return body
def pull_dashboard(client, project, ref: str | int) -> dict:
"""R14: pull SIEMPRE completo. R15: registra card refs en index sin escribir files."""
index = project.load_index()
dash_id = _resolve_ref(ref, "dashboards", index)
full = metabase_get_dashboard(client, dash_id)
slug = _slug_for(full.get("name", "dashboard"), index.get("dashboards", {}), dash_id)
coll_slug = _id_to_slug(full.get("collection_id"), index.get("collections", {}))
refs = {"collection": coll_slug}
payload = _strip_volatile(full)
payload.pop("collection_id", None)
payload.pop("collection", None)
# Procesar dashcards: registrar cada card_id en index si no esta (R15)
cards_idx = index.setdefault("cards", {})
clean_dashcards = []
tracked_count = 0
for dc in payload.get("dashcards", []) or []:
dc = dict(dc)
cid = dc.pop("card_id", None)
dc.pop("card", None)
dc.pop("dashboard_id", None)
card_slug: str | None = None
if cid is not None:
card_slug = _id_to_slug(cid, cards_idx)
if card_slug is None:
# Card no esta en index: la registramos sin descargarla
# Solo necesitamos el name para slugify
try:
card_meta = metabase_get_card(client, cid)
card_slug = _slug_for(card_meta.get("name", f"card_{cid}"), cards_idx, cid)
cards_idx[card_slug] = cid
tracked_count += 1
except Exception as e:
print(f" ! warning: card_id={cid} en dashcards no se pudo trackear: {e}")
card_slug = f"_unknown_card_{cid}"
dc["card"] = card_slug
# series: lista de cards extra
series = dc.get("series") or []
if series:
new_series = []
for s in series:
sid = s.get("id") if isinstance(s, dict) else s
s_slug = _id_to_slug(sid, cards_idx)
if s_slug is None and sid is not None:
try:
sm = metabase_get_card(client, sid)
s_slug = _slug_for(sm.get("name", f"card_{sid}"), cards_idx, sid)
cards_idx[s_slug] = sid
tracked_count += 1
except Exception:
s_slug = f"_unknown_card_{sid}"
new_series.append(s_slug)
dc["series"] = new_series
clean_dashcards.append({k: v for k, v in dc.items() if v not in (None, [], {})})
payload["dashcards"] = clean_dashcards
body = {
"_meta": {
"kind": "dashboard",
"id": dash_id,
"slug": slug,
"synced_at": _utc_now_iso(),
"remote_updated_at": full.get("updated_at"),
"dashcards_count": len(clean_dashcards),
"tabs_count": len(payload.get("tabs", []) or []),
"parameters_count": len(payload.get("parameters", []) or []),
},
"_refs": refs,
"payload": payload,
}
path = project.dir / "dashboards" / f"{slug}.yaml"
_yaml_dump(path, body)
index.setdefault("dashboards", {})[slug] = dash_id
project.save_index(index)
msg = f"[{project.name}] pull dashboard {slug} (id={dash_id}) -> {path.relative_to(project.dir.parent.parent)}"
if tracked_count:
msg += f" [+{tracked_count} cards trackeadas en index sin file]"
print(msg)
return body
# ---------------------------------------------------------------- Dispatch
_PULLERS = {
"card": pull_card,
"dashboard": pull_dashboard,
"database": pull_database,
"collection": pull_collection,
}
def pull_one(client, project, kind: str, ref: str | int) -> dict:
if kind not in _PULLERS:
raise SystemExit(f"kind '{kind}' invalido. Validos: {sorted(_PULLERS)}")
return _PULLERS[kind](client, project, ref)
# ---------------------------------------------------------------- Remote list (descubrir sin descargar)
def remote_list(client, kind: str, *, filter_name: str | None = None) -> list[dict]:
"""Lista items en Metabase sin tocar disco. Resumen ligero."""
if kind == "card":
items = metabase_list_cards(client)
elif kind == "dashboard":
items = metabase_list_dashboards(client)
elif kind == "database":
raw = metabase_list_databases(client)
items = raw["data"] if isinstance(raw, dict) and "data" in raw else raw
elif kind == "collection":
items = client.request("GET", "/api/collection") or []
else:
raise SystemExit(f"kind '{kind}' invalido")
if filter_name:
f = filter_name.lower()
items = [i for i in items if f in (i.get("name") or "").lower()]
out = []
for i in items:
out.append({
"id": i.get("id"),
"name": i.get("name"),
"collection_id": i.get("collection_id"),
"archived": i.get("archived", False),
"updated_at": i.get("updated_at"),
})
return out
+406
View File
@@ -0,0 +1,406 @@
"""Push per-item: aplica UN cambio a Metabase. Implementa las 20 reglas duras.
Resumen de las reglas que este modulo garantiza:
- R1: target unico (validado por argparse en main.py).
- R2: 1 sola request HTTP por invocacion en la fase de apply
(excepcion: dashboards nuevos con dashcards = POST + PUT, documentado
en metabase_create_dashboard_raw).
- R3: push de dashboard NO toca cards. Solo dashcards refs + layout + meta.
- R4: push de card NO toca dashboards.
- R5: dry-run por defecto. --apply requerido para enviar.
- R6: backup obligatorio antes de UPDATE (no en CREATE).
- R7: payload se construye solo desde el YAML del item.
- R8: payload PUT/POST contiene solo lo del YAML, sin merge con remoto.
- R9: _meta.kind/slug deben coincidir con args (validado en validate_one).
- R10: _refs deben resolver a ids del index (validado en validate_one).
- R11: _meta.id debe coincidir con index (validado en validate_one).
- R12: cap de tamano de payload — pide confirmacion si supera 100KB.
- R13: log de cada push en state/push.log (jsonl).
- R14, R15, R16: garantizadas en sync_pull.py.
- R17: freshness check (compara remote.updated_at vs _meta.remote_updated_at).
- R18: count check para dashboards (dashcards/tabs/parameters no menores en local).
- R19: --force-overwrite para saltar R17 + R18 explicitamente.
- R20: cubierto por R18 (cuenta tabs y parameters tambien).
"""
from __future__ import annotations
import datetime as dt
import json
import shutil
import sys
from pathlib import Path
from typing import Any
import yaml
from metabase.cards import (
metabase_create_card_raw,
metabase_get_card,
metabase_update_card,
)
from metabase.dashboards import (
metabase_create_dashboard_raw,
metabase_get_dashboard,
metabase_update_dashboard,
)
from payload import item_path, load_item_yaml
from sync_pull import pull_one
from sync_validate import print_result, validate_one
# Limite del payload para R12
_PAYLOAD_SIZE_WARN_BYTES = 100_000
# ---------------------------------------------------------------- Helpers
def _utc_now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _ts_for_path() -> str:
return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%d_%H%M%S")
def _yaml_dump(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
yaml.safe_dump(data, f, sort_keys=False, allow_unicode=True, default_flow_style=False, width=120)
def _log_push(project, entry: dict) -> None:
"""R13: append-only jsonl log de cada push (dry-run o apply)."""
log_path = project.state_dir / "push.log"
project.state_dir.mkdir(exist_ok=True)
with log_path.open("a") as f:
f.write(json.dumps(entry) + "\n")
_active_log_entry: dict | None = None
_active_project = None
def _abort(msg: str) -> None:
"""Aborta con exit 2. Si hay un log_entry activo, lo persiste como 'aborted'."""
print(f"\nABORT — {msg}", file=sys.stderr)
if _active_log_entry is not None and _active_project is not None:
_active_log_entry["status"] = "aborted"
_active_log_entry["abort_reason"] = msg.split("\n", 1)[0]
_log_push(_active_project, _active_log_entry)
sys.exit(2)
# ---------------------------------------------------------------- R6: backup
def _backup_or_abort(project, kind: str, slug: str, current_remote: dict) -> Path:
"""R6: serializa el estado remoto actual a state/backups/{ts}/{kind}/{slug}.yaml.
Si la escritura falla, aborta antes de tocar Metabase."""
ts = _ts_for_path()
backup_path = project.state_dir / "backups" / ts / (kind + "s") / f"{slug}.yaml"
try:
backup_path.parent.mkdir(parents=True, exist_ok=True)
with backup_path.open("w") as f:
yaml.safe_dump(
{"_backup_of": {"kind": kind, "slug": slug, "ts": ts},
"remote_state": current_remote},
f, sort_keys=False, allow_unicode=True, default_flow_style=False, width=120,
)
except Exception as e:
_abort(f"R6 backup fallo, no se aplicara nada. Error: {e}")
print(f" backup: {backup_path.relative_to(project.dir.parent.parent)}")
return backup_path
# ---------------------------------------------------------------- R17 + R18 + R20
def _freshness_check_or_abort(
kind: str, slug: str, local_doc: dict, remote: dict, force: bool,
) -> None:
"""R17: si remote.updated_at != _meta.remote_updated_at, abortar (salvo --force)."""
local_remote_ts = local_doc.get("_meta", {}).get("remote_updated_at")
current_remote_ts = remote.get("updated_at")
if local_remote_ts is None:
# Item nuevo o pull antiguo sin metadata — proceder con cuidado
return
if current_remote_ts != local_remote_ts:
if force:
print(
f" ! force-overwrite ACTIVO: ignorando R17 — "
f"local snapshot={local_remote_ts}, metabase={current_remote_ts}"
)
return
_abort(
f"R17 freshness check fallido para {kind} {slug}.\n"
f" Tu snapshot: remote_updated_at = {local_remote_ts}\n"
f" Metabase ahora: updated_at = {current_remote_ts}\n"
f" → alguien (o tu mismo) cambio este {kind} en Metabase entre tu pull y este push.\n"
f" → para no sobrescribir esos cambios: python main.py pull {kind} {slug}\n"
f" → para sobrescribir igualmente: --force-overwrite (NO recomendado)"
)
def _count_check_or_abort(
slug: str, local_payload: dict, remote: dict, force: bool,
) -> None:
"""R18+R20: para dashboards, si remoto tiene mas dashcards/tabs/parameters
que el YAML local, abortar (salvo --force)."""
keys = ("dashcards", "tabs", "parameters")
losses = []
for k in keys:
local_count = len(local_payload.get(k, []) or [])
remote_count = len(remote.get(k, []) or [])
if remote_count > local_count:
losses.append(f"{k}: local={local_count}, metabase={remote_count} (perderias {remote_count - local_count})")
if losses:
if force:
print(f" ! force-overwrite ACTIVO: ignorando R18 — perdidas: {losses}")
return
_abort(
f"R18 count check fallido para dashboard {slug}:\n "
+ "\n ".join(losses)
+ f"\n → si genuinamente quieres eliminar elementos: --force-overwrite (NO recomendado)\n"
f" → si no, haz pull primero: python main.py pull dashboard {slug}"
)
# ---------------------------------------------------------------- Push paths
def _push_card_create(client, payload: dict) -> dict:
"""R2: 1 request, POST /api/card. Devuelve la card creada con su id."""
return metabase_create_card_raw(client, payload)
def _push_card_update(client, card_id: int, payload: dict) -> dict:
"""R2: 1 request, PUT /api/card/:id. R4: solo toca esta card."""
return metabase_update_card(client, card_id, **payload)
def _push_dashboard_create(client, payload: dict) -> dict:
"""POST + PUT (si hay dashcards) — documentado en metabase_create_dashboard_raw."""
return metabase_create_dashboard_raw(client, payload)
def _push_dashboard_update(client, dash_id: int, payload: dict) -> dict:
"""R2: 1 request, PUT /api/dashboard/:id. R3: solo toca este dashboard."""
return metabase_update_dashboard(client, dash_id, **payload)
# ---------------------------------------------------------------- Orchestrator
def push_one(
project, client, kind: str, slug: str,
*, apply: bool = False, force_overwrite: bool = False, allow_warnings: bool = False,
) -> dict:
"""Punto de entrada. Devuelve un dict con el resultado."""
log_entry: dict = {
"ts": _utc_now_iso(), "kind": kind, "slug": slug,
"apply": apply, "force_overwrite": force_overwrite,
}
# Hacer el entry visible para _abort() para que pueda loguear si aborta
global _active_log_entry, _active_project
_active_log_entry = log_entry
_active_project = project
# Fase 1: validate (R7+R9+R10+R11+estructura+SQL opcional)
val_client = client if (apply and kind == "card") else None
val_result = validate_one(
project, kind, slug,
check_sql=(apply and kind == "card"),
client=val_client,
)
print_result(kind, slug, val_result)
if val_result.errors:
log_entry["status"] = "validation_errors"
log_entry["issues"] = val_result.errors
_log_push(project, log_entry)
sys.exit(2)
if val_result.warnings and not allow_warnings:
if apply:
print(
f"\n ! hay {len(val_result.warnings)} warnings. "
f"Para aplicar igualmente: --allow-warnings"
)
log_entry["status"] = "warnings_blocking_apply"
log_entry["warnings"] = val_result.warnings
_log_push(project, log_entry)
sys.exit(1)
payload = val_result.payload
assert payload is not None
# R12: tamano del payload
payload_size = len(json.dumps(payload, default=str))
log_entry["payload_bytes"] = payload_size
if payload_size > _PAYLOAD_SIZE_WARN_BYTES:
print(f"\n ! payload size = {payload_size} bytes (>{_PAYLOAD_SIZE_WARN_BYTES})")
if apply:
resp = input(" ¿Continuar con apply? (escribir 'si'): ")
if resp.strip().lower() != "si":
_abort("usuario cancelo por tamano")
# Cargar el doc para tener _meta
doc = load_item_yaml(item_path(project.dir, kind, slug))
meta = doc.get("_meta", {})
item_id = meta.get("id")
is_create = item_id is None
# ---- Dry-run path
if not apply:
method, url = _resolve_method_url(kind, item_id)
print(f"\n--- DRY-RUN ({method} {url}) ---")
print(json.dumps(payload, indent=2, default=str))
print(f"\n payload: {payload_size} bytes")
print(f" para aplicar: añade --apply")
log_entry["status"] = "dry_run"
log_entry["method"] = method
log_entry["url"] = url
_log_push(project, log_entry)
return {"dry_run": True, "payload": payload}
# ---- Apply path
print(f"\n--- APPLY ---")
if is_create:
# R6 no aplica: nada que respaldar
print(" modo: CREATE (no hay backup, item nuevo)")
if kind == "card":
response = _push_card_create(client, payload)
elif kind == "dashboard":
response = _push_dashboard_create(client, payload)
else:
_abort(f"create por push de '{kind}' no soportado todavia (solo card/dashboard)")
new_id = response["id"]
print(f" creado con id={new_id}")
# Actualizar index + _meta del YAML local
idx = project.load_index()
idx.setdefault(kind + "s", {})[slug] = new_id
project.save_index(idx)
# Re-pull para refrescar _meta con synced_at + remote_updated_at + counts
print(" re-pull para refrescar _meta...")
pull_one(client, project, kind, new_id)
log_entry["status"] = "created"
log_entry["new_id"] = new_id
else:
# UPDATE path: R6 backup obligatorio + R17/R18 checks
print(f" modo: UPDATE (id={item_id})")
# Fetch estado remoto actual
if kind == "card":
remote = metabase_get_card(client, item_id)
elif kind == "dashboard":
remote = metabase_get_dashboard(client, item_id)
else:
_abort(f"update por push de '{kind}' no soportado todavia")
# R6: backup ANTES de hacer nada destructivo
backup_path = _backup_or_abort(project, kind, slug, remote)
log_entry["backup"] = str(backup_path.relative_to(project.dir.parent.parent))
# R17: freshness
_freshness_check_or_abort(kind, slug, doc, remote, force_overwrite)
# R18: count check (solo dashboards)
if kind == "dashboard":
_count_check_or_abort(slug, payload, remote, force_overwrite)
# Apply
if kind == "card":
response = _push_card_update(client, item_id, payload)
elif kind == "dashboard":
response = _push_dashboard_update(client, item_id, payload)
print(f" aplicado.")
# Re-pull para refrescar _meta
print(" re-pull para refrescar _meta...")
pull_one(client, project, kind, item_id)
log_entry["status"] = "updated"
log_entry["id"] = item_id
_log_push(project, log_entry)
print("OK")
return {"applied": True, "response": response}
def _resolve_method_url(kind: str, item_id: int | None) -> tuple[str, str]:
"""Devuelve (method, url) que usariamos en apply, para logs."""
if item_id is None:
return "POST", f"/api/{kind}"
return "PUT", f"/api/{kind}/{item_id}"
# ---------------------------------------------------------------- push_all
def _list_slugs(project, kind: str) -> list[str]:
"""Lista slugs (filenames sin .yaml) en projects/{name}/{kind}s/."""
sub = project.dir / (kind + "s")
if not sub.exists():
return []
return sorted(p.stem for p in sub.glob("*.yaml"))
def push_all(
project, client,
*, apply: bool = False, force_overwrite: bool = False, allow_warnings: bool = False,
kinds: tuple[str, ...] = ("card", "dashboard"),
) -> dict:
"""Pushea todos los YAMLs de cards y dashboards de un proyecto.
Solo CREATE o UPDATE (reusa push_one) — nunca DELETE.
Cards primero, dashboards despues, para que los slugs esten en el index
cuando se resuelven las dashcards.
Por defecto dry-run. Pasa apply=True para realmente enviar.
Si un item falla (SystemExit desde push_one), se captura y se continua
con el siguiente. Devuelve un resumen con el resultado por item.
"""
print(f"\n=== push all ({'APPLY' if apply else 'DRY-RUN'}) project={project.name} ===")
summary = {"ok": [], "failed": [], "skipped": []}
for kind in kinds:
slugs = _list_slugs(project, kind)
if not slugs:
print(f"\n[{kind}] (sin YAMLs en {kind}s/)")
continue
print(f"\n[{kind}] {len(slugs)} item(s): {', '.join(slugs)}")
for slug in slugs:
print(f"\n--- {kind} {slug} ---")
try:
push_one(
project, client, kind, slug,
apply=apply,
force_overwrite=force_overwrite,
allow_warnings=allow_warnings,
)
summary["ok"].append(f"{kind}:{slug}")
except SystemExit as e:
code = e.code if isinstance(e.code, int) else 1
print(f" ! {kind} {slug} fallo (exit_code={code}) — continuo")
summary["failed"].append(f"{kind}:{slug} (exit={code})")
except Exception as e:
print(f" ! {kind} {slug} excepcion: {type(e).__name__}: {e}")
summary["failed"].append(f"{kind}:{slug} ({type(e).__name__})")
print(f"\n=== resumen push all ===")
print(f" OK: {len(summary['ok'])} {summary['ok']}")
print(f" FAILED: {len(summary['failed'])} {summary['failed']}")
if not apply:
print(f" (dry-run — para aplicar de verdad: --apply)")
return summary
+157
View File
@@ -0,0 +1,157 @@
"""Restore desde backup.
Estructura de backups:
state/backups/{YYYY-MM-DD_HHMMSS}/{cards|dashboards|...}/{slug}.yaml
Cada backup es un YAML con:
_backup_of: {kind, slug, ts}
remote_state: <payload tal cual estaba en Metabase justo antes del push>
Restore NO aplica automaticamente a Metabase. Solo escribe el remote_state
de vuelta al YAML activo del item, en formato local (con _meta + _refs +
payload). Despues el usuario debe hacer `push <kind> <slug> --apply` para
aplicar.
Esto deja al usuario inspeccionar el resultado antes de aplicar.
"""
from __future__ import annotations
import datetime as dt
from pathlib import Path
import yaml
from payload import item_path
def list_backups(project, kind: str, slug: str) -> list[Path]:
"""Lista todos los backups disponibles para un item, ordenados (mas recientes primero)."""
backups_root = project.state_dir / "backups"
if not backups_root.exists():
return []
candidates = []
for ts_dir in sorted(backups_root.iterdir(), reverse=True):
if not ts_dir.is_dir():
continue
bp = ts_dir / (kind + "s") / f"{slug}.yaml"
if bp.exists():
candidates.append(bp)
return candidates
def _utc_now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _id_to_slug(id_: int | None, mapping: dict[str, int]) -> str | None:
if id_ is None:
return None
for slug, mid in mapping.items():
if mid == id_:
return slug
return None
def _strip_remote_to_local(kind: str, remote: dict, project) -> dict:
"""Convierte un payload remoto en formato YAML local (_meta + _refs + payload).
Reusa la misma logica de transformacion que sync_pull.pull_one. Lo mas
simple es importar las funciones de strip y reconstruir el doc.
"""
from sync_pull import _id_to_slug as _id_to_slug_fn, _strip_volatile
index = project.load_index()
payload = _strip_volatile(remote)
payload.pop("collection", None)
body: dict = {"_meta": {}, "_refs": {}, "payload": payload}
body["_meta"] = {
"kind": kind,
"id": remote.get("id"),
"slug": None, # se rellena despues
"synced_at": _utc_now_iso(),
"remote_updated_at": remote.get("updated_at"),
"restored_from_backup": True,
}
if kind == "card":
db_slug = _id_to_slug_fn(remote.get("database_id"), index.get("databases", {}))
coll_slug = _id_to_slug_fn(remote.get("collection_id"), index.get("collections", {}))
body["_refs"] = {"database": db_slug, "collection": coll_slug}
payload.pop("database_id", None)
payload.pop("collection_id", None)
if isinstance(payload.get("dataset_query"), dict) and "database" in payload["dataset_query"]:
payload["dataset_query"]["database"] = db_slug
elif kind == "dashboard":
coll_slug = _id_to_slug_fn(remote.get("collection_id"), index.get("collections", {}))
body["_refs"] = {"collection": coll_slug}
payload.pop("collection_id", None)
# dashcards: card_id -> card slug
cards_idx = index.get("cards", {})
clean = []
for dc in payload.get("dashcards", []) or []:
dc = dict(dc)
cid = dc.pop("card_id", None)
dc.pop("card", None)
dc.pop("dashboard_id", None)
dc["card"] = _id_to_slug_fn(cid, cards_idx)
series = dc.get("series") or []
if series:
dc["series"] = [_id_to_slug_fn(s.get("id") if isinstance(s, dict) else s, cards_idx) for s in series]
clean.append({k: v for k, v in dc.items() if v not in (None, [], {})})
payload["dashcards"] = clean
body["_meta"]["dashcards_count"] = len(clean)
body["_meta"]["tabs_count"] = len(payload.get("tabs", []) or [])
body["_meta"]["parameters_count"] = len(payload.get("parameters", []) or [])
return body
def restore_one(project, kind: str, slug: str, *, from_ts: str | None = None) -> Path:
"""Restaura el YAML local desde un backup.
NO aplica a Metabase. Solo escribe el archivo de disco para que el
usuario inspeccione y haga push --apply manualmente.
"""
backups = list_backups(project, kind, slug)
if not backups:
raise SystemExit(
f"No hay backups para {kind} {slug} en {project.state_dir / 'backups'}"
)
if from_ts is None:
chosen = backups[0] # mas reciente
else:
matches = [b for b in backups if from_ts in str(b)]
if not matches:
raise SystemExit(
f"No hay backup con timestamp '{from_ts}'. Disponibles:\n "
+ "\n ".join(str(b.relative_to(project.dir.parent.parent)) for b in backups)
)
chosen = matches[0]
print(f"[{project.name}] restore {kind} {slug}")
print(f" desde: {chosen.relative_to(project.dir.parent.parent)}")
with chosen.open() as f:
backup_doc = yaml.safe_load(f) or {}
remote_state = backup_doc.get("remote_state")
if not remote_state:
raise SystemExit(f"Backup corrupto: falta 'remote_state' en {chosen}")
body = _strip_remote_to_local(kind, remote_state, project)
body["_meta"]["slug"] = slug
body["_meta"]["restored_from"] = str(chosen.relative_to(project.dir.parent.parent))
target = item_path(project.dir, kind, slug)
target.parent.mkdir(parents=True, exist_ok=True)
with target.open("w") as f:
yaml.safe_dump(body, f, sort_keys=False, allow_unicode=True, default_flow_style=False, width=120)
print(f" escrito en: {target.relative_to(project.dir.parent.parent)}")
print(f"\n El backup quedo restaurado al disco. Para aplicarlo a Metabase:")
print(f" python main.py push {kind} {slug} --apply")
print(f" Antes, te recomiendo: cat {target.relative_to(project.dir.parent.parent)}")
return target
+176
View File
@@ -0,0 +1,176 @@
"""Validate: lee un YAML local, construye el payload, valida estructura y SQL.
Read-only — nunca escribe nada en Metabase ni en disco. Es la red de
seguridad antes de `push`.
Tres niveles de validacion (todos se ejecutan, recolectando issues):
1. Carga del YAML y consistencia de _meta vs args + _meta.id vs index.
(R9 + R11 — abortan si fallan, son corruption checks).
2. Resolucion de _refs: todos los slugs deben existir en index.
(R10 — aborta).
3. Estructura del payload: usa metabase_validate_card_payload /
_dashboard_payload del registry. Reporta issues como warnings,
no aborta.
4. SQL dry-run (solo cards native, opcional con --check-sql).
Usa metabase_validate_sql. Aborta de la lista de issues si SQL falla.
"""
from __future__ import annotations
from pathlib import Path
from metabase.validation import (
metabase_validate_card_payload,
metabase_validate_dashboard_payload,
metabase_validate_sql,
)
from payload import (
assert_id_matches_index,
assert_meta,
build_payload,
item_path,
known_card_ids,
load_item_yaml,
)
# Codigo de salida: 0 = OK, 1 = warnings, 2 = errores fatales (R9/R10/R11)
class ValidationResult:
def __init__(self):
self.errors: list[str] = [] # fatales (corruption, refs rotas)
self.warnings: list[str] = [] # estructurales (validators puros)
self.sql_status: str | None = None # "ok" / "failed" / "skipped"
self.payload: dict | None = None
@property
def ok(self) -> bool:
return not self.errors and not self.warnings and self.sql_status != "failed"
def exit_code(self) -> int:
if self.errors or self.sql_status == "failed":
return 2
if self.warnings:
return 1
return 0
def _extract_native_sql(payload: dict) -> str | None:
"""Extrae SQL de un payload de card si es native query. Soporta legacy y MBQL5."""
dq = payload.get("dataset_query")
if not isinstance(dq, dict):
return None
# Legacy: dq.native.query
native = dq.get("native")
if isinstance(native, dict) and isinstance(native.get("query"), str):
return native["query"]
# MBQL5: dq.stages[0].native (string directo)
stages = dq.get("stages")
if isinstance(stages, list) and stages:
first = stages[0]
if isinstance(first, dict):
n = first.get("native")
if isinstance(n, str):
return n
return None
def validate_one(
project, kind: str, slug: str,
*, check_sql: bool = False, client=None,
) -> ValidationResult:
"""Punto de entrada. `project` es main.Project."""
result = ValidationResult()
# ---- Capa 1: carga + meta consistency
path = item_path(project.dir, kind, slug)
if not path.exists():
result.errors.append(f"YAML no existe: {path.relative_to(project.dir.parent.parent)}")
return result
try:
doc = load_item_yaml(path)
except ValueError as e:
result.errors.append(str(e))
return result
index = project.load_index()
try:
assert_meta(doc, kind, slug, path)
except ValueError as e:
result.errors.append(f"R9 violado: {e}")
try:
assert_id_matches_index(doc, kind, slug, index, path)
except ValueError as e:
result.errors.append(f"R11 violado: {e}")
if result.errors:
return result # corruption — no seguir
# ---- Capa 2: build payload (resuelve refs)
try:
payload = build_payload(kind, doc, index, env=project.load_env())
except ValueError as e:
result.errors.append(f"R10 violado: {e}")
return result
result.payload = payload
# ---- Capa 3: validacion estructural (puras del registry)
if kind == "card":
result.warnings.extend(metabase_validate_card_payload(payload))
elif kind == "dashboard":
result.warnings.extend(
metabase_validate_dashboard_payload(payload, known_card_ids(index))
)
# databases/collections: no tienen validators todavia (pocos campos, bajo riesgo)
# ---- Capa 4: SQL dry-run (opcional, solo cards native)
if check_sql and kind == "card":
sql = _extract_native_sql(payload)
if sql is None:
result.sql_status = "skipped"
else:
if client is None:
result.warnings.append("--check-sql pedido pero client no inicializado")
result.sql_status = "skipped"
else:
db_id = payload.get("database_id")
if db_id is None:
result.warnings.append("no se puede check-sql: payload sin database_id")
result.sql_status = "skipped"
else:
sql_result = metabase_validate_sql(client, db_id, sql)
if sql_result["ok"]:
result.sql_status = "ok"
else:
result.sql_status = "failed"
result.errors.append(f"SQL invalido: {sql_result['error']}")
return result
def print_result(kind: str, slug: str, result: ValidationResult) -> None:
"""Imprime el resultado de la validacion en formato humano."""
print(f"validate {kind} {slug}")
if result.errors:
print(f" ERRORS ({len(result.errors)}):")
for e in result.errors:
print(f"{e}")
if result.warnings:
print(f" WARNINGS ({len(result.warnings)}):")
for w in result.warnings:
print(f" ! {w}")
if result.sql_status:
marker = {"ok": "", "failed": "", "skipped": "-"}[result.sql_status]
print(f" SQL: {marker} {result.sql_status}")
if not result.errors and not result.warnings:
print(" ✓ payload valido")
print(f" exit_code: {result.exit_code()}")
BIN
View File
Binary file not shown.