fix(fn-run): propagar stdout/stderr de bash functions library-style #1

Open
dataforge wants to merge 537 commits from auto/0077-fn-run-bash-mudo into master
7 changed files with 744 additions and 0 deletions
Showing only changes of commit dd324b7785 - Show all commits
@@ -0,0 +1,59 @@
---
name: metabase_add_ops_db
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "metabase_add_ops_db(app_name: str) -> None"
description: "Registra la operations.db de una app en Metabase como database SQLite. Verifica duplicados y muestra el mount necesario para el contenedor Docker."
tags: [metabase, operations, sqlite, docker, pipeline, infra, launcher]
uses_functions:
- metabase_auth_py_infra
- metabase_list_databases_py_infra
- metabase_add_database_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/metabase_add_ops_db.py"
---
## Ejemplo
```bash
# Registrar una app
python python/functions/pipelines/metabase_add_ops_db.py docker_tui
# Listar estado de todas las apps
python python/functions/pipelines/metabase_add_ops_db.py --list
```
## Flujo
1. `metabase_auth` - autentica contra Metabase
2. `metabase_list_databases` - verifica si la database ya existe
3. `metabase_add_database` - registra la operations.db como SQLite
## Requisitos
El contenedor Metabase debe tener montado el directorio de la app como volumen RW:
```
-v /home/lucas/fn_registry/apps/<app_name>:/data/ops-<app-name>
```
Ademas, el directorio debe tener permisos para el usuario metabase (UID 2000):
```bash
docker exec -u root <container> chown -R metabase:metabase /data/ops-<app-name>/
```
## Convencion de nombres
- Database en Metabase: `ops-<app-name>` (guiones en vez de underscores)
- Path en contenedor: `/data/ops-<app-name>/operations.db`
@@ -0,0 +1,116 @@
"""Pipeline: registra la operations.db de una app en Metabase.
Compone: metabase_auth + metabase_list_databases + metabase_add_database.
Requiere que el contenedor Metabase tenga montado el directorio de la app
como volumen RW para que SQLite pueda crear journal files.
Uso:
python metabase_add_ops_db.py <app_name>
python metabase_add_ops_db.py docker_tui
python metabase_add_ops_db.py --list
"""
import argparse
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from metabase.client import metabase_auth
from metabase import metabase_list_databases, metabase_add_database
METABASE_URL = os.environ.get("METABASE_URL", "http://localhost:3000")
EMAIL = os.environ.get("METABASE_ADMIN_EMAIL", "admin@fnregistry.local")
PASSWORD = os.environ.get("METABASE_ADMIN_PASSWORD", "FnRegistry2024!")
CONTAINER_DATA_PREFIX = "/data/ops-"
def find_apps(registry_root: str) -> list[str]:
"""Lista apps que tienen operations.db."""
apps_dir = os.path.join(registry_root, "apps")
if not os.path.isdir(apps_dir):
return []
return sorted(
d
for d in os.listdir(apps_dir)
if os.path.isfile(os.path.join(apps_dir, d, "operations.db"))
)
def db_name_for_app(app_name: str) -> str:
return f"ops-{app_name.replace('_', '-')}"
def container_path_for_app(app_name: str) -> str:
return f"{CONTAINER_DATA_PREFIX}{app_name.replace('_', '-')}/operations.db"
def main():
parser = argparse.ArgumentParser(description="Registra operations.db de una app en Metabase")
parser.add_argument("app_name", nargs="?", help="Nombre de la app (directorio en apps/)")
parser.add_argument("--list", action="store_true", help="Lista apps disponibles y su estado en Metabase")
args = parser.parse_args()
registry_root = os.environ.get(
"FN_REGISTRY_ROOT",
os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")),
)
if args.list:
client = metabase_auth(METABASE_URL, EMAIL, PASSWORD)
dbs = metabase_list_databases(client)
db_names = {d["name"] for d in dbs}
apps = find_apps(registry_root)
print(f"Apps con operations.db ({len(apps)}):\n")
for app in apps:
name = db_name_for_app(app)
status = "registrada" if name in db_names else "NO registrada"
print(f" {app:30s} -> {name:30s} [{status}]")
client.close()
return
if not args.app_name:
parser.error("Se requiere app_name o --list")
app_name = args.app_name
ops_path = os.path.join(registry_root, "apps", app_name, "operations.db")
if not os.path.isfile(ops_path):
print(f"ERROR: No existe {ops_path}")
print(f"Apps disponibles: {', '.join(find_apps(registry_root))}")
sys.exit(1)
name = db_name_for_app(app_name)
container_path = container_path_for_app(app_name)
print(f"Registrando {app_name} en Metabase...")
print(f" Nombre: {name}")
print(f" Path: {container_path}")
client = metabase_auth(METABASE_URL, EMAIL, PASSWORD)
# Verificar si ya existe
dbs = metabase_list_databases(client)
for db in dbs:
if db["name"] == name:
print(f" Ya existe con id={db['id']}, nada que hacer.")
client.close()
return
# Registrar
result = metabase_add_database(client, name, "sqlite", {"db": container_path})
db_id = result["id"]
print(f" Registrada con id={db_id}")
# Recordar: el contenedor necesita el volumen montado
print(f"\nIMPORTANTE: El contenedor Metabase debe tener montado:")
print(f" -v {os.path.join(registry_root, 'apps', app_name)}:{CONTAINER_DATA_PREFIX}{app_name.replace('_', '-')}")
print(f"\nSi el mount no existe, hay que recrear el contenedor.")
print(f"\nDatabase disponible en: {METABASE_URL}/question#new?database={db_id}")
client.close()
if __name__ == "__main__":
main()
@@ -0,0 +1,68 @@
---
name: metabase_create_ops_dashboard
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "metabase_create_ops_dashboard(app_name: str) -> None"
description: "Crea dashboard operativo en Metabase para una app: KPIs de entities/relations/executions/assertions, distribucion por status y tipo, relaciones frecuentes, resultados de ejecuciones y assertions."
tags: [metabase, operations, dashboard, pipeline, infra, launcher]
uses_functions:
- metabase_auth_py_infra
- metabase_list_databases_py_infra
- metabase_create_card_py_infra
- metabase_create_dashboard_py_infra
- metabase_update_dashboard_py_infra
- metabase_list_dashboards_py_infra
- metabase_delete_dashboard_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/metabase_create_ops_dashboard.py"
---
## Ejemplo
```bash
# Dashboard para una app
python python/functions/pipelines/metabase_create_ops_dashboard.py docker_tui
# Dashboards para todas las apps registradas
python python/functions/pipelines/metabase_create_ops_dashboard.py --all
```
## Cards del dashboard
| Fila | Cards | Tipo |
|------|-------|------|
| 0 | Entities, Relations, Executions, Assertions | scalar (KPIs) |
| 5 | Entities por Status, Entities por Tipo, Relations por Status | pie/bar |
| 13 | Executions Success/Failure, Assertions por Severity, Assertion Results | pie/bar |
| 22 | Top Relaciones (via), Executions Recientes | row/table |
| 30 | Lista de Entities | table |
| 38 | Assertion Results Detalle | table |
## Flujo
1. `metabase_auth` - autentica contra Metabase
2. `metabase_list_databases` - busca la database operacional de la app
3. `metabase_create_card` x 14 - crea las cards con queries SQL nativas
4. `metabase_create_dashboard` - crea el dashboard
5. `metabase_update_dashboard` - posiciona las cards en el grid
## Requisitos
La database operacional debe estar registrada previamente con `metabase_add_ops_db`.
Las tablas `executions`, `assertions` y `assertion_results` requieren la migracion 002. Si no existen, las cards correspondientes mostraran error (no rompen el dashboard).
## Convencion
- Dashboard name: `ops: <app-name>`
- Si el dashboard ya existe, se elimina y recrea.
@@ -0,0 +1,291 @@
"""Pipeline: crea dashboard operativo en Metabase para una app.
Compone: metabase_auth + metabase_list_databases + metabase_create_card
+ metabase_create_dashboard + metabase_update_dashboard.
Genera un dashboard estandar con KPIs, distribucion y tablas sobre la
operations.db de una app: entities, relations, executions, assertions.
Uso:
python metabase_create_ops_dashboard.py <app_name>
python metabase_create_ops_dashboard.py docker_tui
python metabase_create_ops_dashboard.py --all
"""
import argparse
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from metabase.client import metabase_auth
from metabase import (
metabase_list_databases,
metabase_create_card,
metabase_create_dashboard,
metabase_update_dashboard,
metabase_list_dashboards,
metabase_delete_dashboard,
)
METABASE_URL = os.environ.get("METABASE_URL", "http://localhost:3000")
EMAIL = os.environ.get("METABASE_ADMIN_EMAIL", "admin@fnregistry.local")
PASSWORD = os.environ.get("METABASE_ADMIN_PASSWORD", "FnRegistry2024!")
def dashboard_name(app_name: str) -> str:
return f"ops: {app_name.replace('_', '-')}"
def db_name_for_app(app_name: str) -> str:
return f"ops-{app_name.replace('_', '-')}"
def build_cards(app_name: str) -> list[dict]:
"""Define las cards del dashboard operativo."""
return [
# ---- Fila 0: KPIs (h=5) ----
{
"name": f"[{app_name}] Entities",
"display": "scalar",
"sql": "SELECT COUNT(*) FROM entities;",
"size_x": 6, "size_y": 5, "col": 0, "row": 0,
},
{
"name": f"[{app_name}] Relations",
"display": "scalar",
"sql": "SELECT COUNT(*) FROM relations;",
"size_x": 6, "size_y": 5, "col": 6, "row": 0,
},
{
"name": f"[{app_name}] Executions",
"display": "scalar",
"sql": "SELECT COUNT(*) FROM executions;",
"size_x": 6, "size_y": 5, "col": 12, "row": 0,
},
{
"name": f"[{app_name}] Assertions",
"display": "scalar",
"sql": "SELECT COUNT(*) FROM assertions;",
"size_x": 6, "size_y": 5, "col": 18, "row": 0,
},
# ---- Fila 5: Distribucion (h=8) ----
{
"name": f"[{app_name}] Entities por Status",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM entities GROUP BY status ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 8, "col": 0, "row": 5,
},
{
"name": f"[{app_name}] Entities por Tipo",
"display": "bar",
"sql": "SELECT type_ref, COUNT(*) AS cantidad FROM entities GROUP BY type_ref ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 8, "col": 8, "row": 5,
},
{
"name": f"[{app_name}] Relations por Status",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM relations GROUP BY status ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 8, "col": 16, "row": 5,
},
# ---- Fila 13: Executions y Assertions (h=9) ----
{
"name": f"[{app_name}] Executions: Success vs Failure",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM executions GROUP BY status ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 9, "col": 0, "row": 13,
},
{
"name": f"[{app_name}] Assertions por Severity",
"display": "bar",
"sql": "SELECT severity, COUNT(*) AS cantidad FROM assertions GROUP BY severity ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 9, "col": 8, "row": 13,
},
{
"name": f"[{app_name}] Assertion Results",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM assertion_results GROUP BY status ORDER BY cantidad DESC;",
"size_x": 8, "size_y": 9, "col": 16, "row": 13,
},
# ---- Fila 22: Relaciones frecuentes (h=8) ----
{
"name": f"[{app_name}] Top Relaciones (via)",
"display": "row",
"sql": """
SELECT
CASE WHEN via = '' THEN '(directo)' ELSE via END AS via_funcion,
COUNT(*) AS cantidad
FROM relations
GROUP BY via
ORDER BY cantidad DESC
LIMIT 15;
""",
"size_x": 12, "size_y": 8, "col": 0, "row": 22,
},
{
"name": f"[{app_name}] Executions Recientes",
"display": "table",
"sql": """
SELECT
id,
pipeline_id,
status,
duration_ms,
records_in,
records_out,
CASE WHEN error = '' THEN '-' ELSE error END AS error,
started_at
FROM executions
ORDER BY started_at DESC
LIMIT 20;
""",
"size_x": 12, "size_y": 8, "col": 12, "row": 22,
},
# ---- Fila 30: Tablas detalle (h=8) ----
{
"name": f"[{app_name}] Lista de Entities",
"display": "table",
"sql": """
SELECT
id,
name,
type_ref,
status,
domain,
source,
created_at,
updated_at
FROM entities
ORDER BY updated_at DESC;
""",
"size_x": 24, "size_y": 8, "col": 0, "row": 30,
},
{
"name": f"[{app_name}] Assertion Results Detalle",
"display": "table",
"sql": """
SELECT
ar.id,
a.name AS assertion,
a.kind,
a.severity,
ar.status,
ar.value,
ar.message,
ar.evaluated_at
FROM assertion_results ar
JOIN assertions a ON a.id = ar.assertion_id
ORDER BY ar.evaluated_at DESC
LIMIT 50;
""",
"size_x": 24, "size_y": 8, "col": 0, "row": 38,
},
]
def create_ops_dashboard(client, app_name: str, db_id: int) -> int:
"""Crea el dashboard operativo para una app. Retorna dashboard_id."""
dash_name = dashboard_name(app_name)
# Eliminar dashboard existente si hay
for d in metabase_list_dashboards(client):
if d.get("name") == dash_name:
print(f" Dashboard existente (id={d['id']}), recreando...")
metabase_delete_dashboard(client, d["id"])
cards_def = build_cards(app_name)
print(f" Creando {len(cards_def)} cards...")
created = []
for i, card_def in enumerate(cards_def):
card = metabase_create_card(
client,
name=card_def["name"],
dataset_query={
"database": db_id,
"type": "native",
"native": {"query": card_def["sql"]},
},
display=card_def["display"],
description=f"ops dashboard: {app_name}",
)
created.append((card, card_def))
print(f" [{i+1}/{len(cards_def)}] {card_def['name']} (id={card['id']})")
print(" Creando dashboard...")
dashboard = metabase_create_dashboard(
client,
name=dash_name,
description=f"Dashboard operativo de {app_name}: entities, relations, executions, assertions.",
)
dash_id = dashboard["id"]
dashcards = []
for idx, (card, card_def) in enumerate(created):
dashcards.append({
"id": -(idx + 1),
"card_id": card["id"],
"size_x": card_def["size_x"],
"size_y": card_def["size_y"],
"col": card_def["col"],
"row": card_def["row"],
})
metabase_update_dashboard(client, dash_id, dashcards=dashcards)
return dash_id
def main():
parser = argparse.ArgumentParser(description="Crea dashboard operativo en Metabase para una app")
parser.add_argument("app_name", nargs="?", help="Nombre de la app")
parser.add_argument("--all", action="store_true", help="Crea dashboards para todas las apps registradas en Metabase")
args = parser.parse_args()
if not args.app_name and not args.all:
parser.error("Se requiere app_name o --all")
print("Autenticando en Metabase...")
client = metabase_auth(METABASE_URL, EMAIL, PASSWORD)
dbs = metabase_list_databases(client)
if args.all:
ops_dbs = [
(db["name"].replace("ops-", "").replace("-", "_"), db["id"])
for db in dbs
if db["name"].startswith("ops-")
]
if not ops_dbs:
print("No hay databases operacionales registradas en Metabase.")
sys.exit(1)
print(f"Creando dashboards para {len(ops_dbs)} apps...\n")
for app_name, db_id in ops_dbs:
print(f"--- {app_name} (db_id={db_id}) ---")
dash_id = create_ops_dashboard(client, app_name, db_id)
print(f" Dashboard: {METABASE_URL}/dashboard/{dash_id}\n")
else:
app_name = args.app_name
expected_db_name = f"ops-{app_name.replace('_', '-')}"
db_id = None
for db in dbs:
if db["name"] == expected_db_name:
db_id = db["id"]
break
if not db_id:
print(f"ERROR: Database '{expected_db_name}' no encontrada en Metabase.")
print("Databases disponibles:")
for db in dbs:
print(f" - {db['name']} (id={db['id']})")
print(f"\nRegistra primero con: python metabase_add_ops_db.py {app_name}")
sys.exit(1)
print(f"App: {app_name} (db_id={db_id})")
dash_id = create_ops_dashboard(client, app_name, db_id)
print(f"\nDashboard listo: {METABASE_URL}/dashboard/{dash_id}")
client.close()
if __name__ == "__main__":
main()
@@ -0,0 +1,58 @@
---
name: metabase_fix_permissions
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "metabase_fix_permissions() -> None"
description: "Arregla permisos SQLITE_READONLY_DIRECTORY en el contenedor Metabase. Hace chmod 777/666 en directorios y archivos .db bajo /data/ para que el usuario metabase (UID 2000) pueda crear journal files."
tags: [metabase, sqlite, permissions, docker, pipeline, infra, launcher]
uses_functions:
- metabase_auth_py_infra
- metabase_list_databases_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/metabase_fix_permissions.py"
---
## Ejemplo
```bash
# Fix permisos (usa contenedor por defecto: fn_registry-metabase)
python python/functions/pipelines/metabase_fix_permissions.py
# Contenedor custom
python python/functions/pipelines/metabase_fix_permissions.py --container mi-metabase
```
## Problema
Metabase corre Java como UID 2000 (usuario `metabase`). SQLite necesita crear
journal/WAL files en el mismo directorio que la BD. Si el directorio no es
escribible por UID 2000, Metabase devuelve:
```
SQLITE_READONLY_DIRECTORY: Process does not have permission to create a
journal file in the same directory as the database
```
## Solucion
El pipeline ejecuta dentro del contenedor (como root):
- `chmod 777` en cada directorio que contiene un `.db`
- `chmod 666` en cada archivo `.db`, `-wal`, `-shm`, `-journal`
Esto NO cambia ownership (evita que `chown` se propague al host via bind mount).
## Cuando ejecutar
- Despues de recrear el contenedor Metabase
- Despues de añadir una nueva database con `metabase_add_ops_db`
- Cuando Metabase muestra error SQLITE_READONLY_DIRECTORY
@@ -0,0 +1,152 @@
"""Pipeline: arregla permisos de SQLite en el contenedor Metabase.
Compone: metabase_auth + metabase_list_databases (para verificar).
SQLite necesita escribir journal/WAL files en el mismo directorio que la BD.
El contenedor Metabase corre Java como UID 2000 (usuario metabase).
Si los directorios o archivos no son escribibles por ese UID, Metabase
devuelve SQLITE_READONLY_DIRECTORY.
Este pipeline:
1. Detecta el contenedor Metabase
2. Encuentra todos los directorios /data/ que contienen .db
3. Hace chmod 777 en directorios y chmod 666 en archivos .db
4. Verifica que Metabase puede leer cada database
Uso:
python metabase_fix_permissions.py
python metabase_fix_permissions.py --container fn_registry-metabase
"""
import argparse
import os
import subprocess
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from metabase.client import metabase_auth
from metabase import metabase_list_databases
METABASE_URL = os.environ.get("METABASE_URL", "http://localhost:3000")
EMAIL = os.environ.get("METABASE_ADMIN_EMAIL", "admin@fnregistry.local")
PASSWORD = os.environ.get("METABASE_ADMIN_PASSWORD", "FnRegistry2024!")
DEFAULT_CONTAINER = "fn_registry-metabase"
def docker_exec(container: str, cmd: list[str], user: str = "root") -> str:
"""Ejecuta comando dentro del contenedor."""
result = subprocess.run(
["docker", "exec", "-u", user, container] + cmd,
capture_output=True, text=True,
)
return result.stdout.strip()
def fix_container_permissions(container: str) -> list[str]:
"""Arregla permisos de todos los .db en /data/ del contenedor.
Returns:
Lista de paths arreglados.
"""
# Encontrar todos los .db bajo /data/
find_output = docker_exec(container, ["find", "/data", "-name", "*.db", "-type", "f"])
if not find_output:
print(" No se encontraron archivos .db en /data/")
return []
db_files = [f.strip() for f in find_output.splitlines() if f.strip()]
fixed = []
for db_path in db_files:
db_dir = os.path.dirname(db_path)
# Fix directorio: 777 para que UID 2000 pueda crear journal
docker_exec(container, ["chmod", "777", db_dir])
# Fix archivo: 666 para que UID 2000 pueda leer/escribir
docker_exec(container, ["chmod", "666", db_path])
# Fix WAL/SHM si existen
for suffix in ["-wal", "-shm", "-journal"]:
wal_path = db_path + suffix
docker_exec(container, ["chmod", "666", wal_path])
fixed.append(db_path)
print(f" Fixed: {db_path} (dir: {db_dir})")
return fixed
def verify_databases(client) -> bool:
"""Verifica que todas las databases SQLite responden."""
import httpx
dbs = metabase_list_databases(client)
all_ok = True
for db in dbs:
if db.get("engine") != "sqlite":
continue
name = db["name"]
db_id = db["id"]
try:
resp = client._http.request("POST", "/api/dataset", json={
"database": db_id,
"type": "native",
"native": {"query": "SELECT 1"},
}, extensions={"timeout": {"read": 60, "write": 10, "connect": 10, "pool": 10}})
if resp.status_code in (200, 202):
print(f" OK: {name} (id={db_id})")
else:
error = resp.text[:150]
print(f" FAIL: {name} (id={db_id}) -> {error}")
all_ok = False
except httpx.ReadTimeout:
print(f" TIMEOUT: {name} (id={db_id}) -> Metabase aun sincronizando, reintentar en unos segundos")
all_ok = False
return all_ok
def main():
parser = argparse.ArgumentParser(description="Arregla permisos SQLite en contenedor Metabase")
parser.add_argument("--container", default=DEFAULT_CONTAINER, help="Nombre del contenedor Metabase")
args = parser.parse_args()
container = args.container
# Verificar que el contenedor existe
result = subprocess.run(
["docker", "inspect", container, "--format", "{{.State.Running}}"],
capture_output=True, text=True,
)
if result.returncode != 0 or result.stdout.strip() != "true":
print(f"ERROR: Contenedor '{container}' no esta corriendo.")
sys.exit(1)
print(f"Contenedor: {container}")
print(f"\n1. Arreglando permisos en /data/...")
fixed = fix_container_permissions(container)
if not fixed:
print(" Nada que arreglar.")
return
print(f"\n2. Verificando databases en Metabase...")
client = metabase_auth(METABASE_URL, EMAIL, PASSWORD)
ok = verify_databases(client)
client.close()
if ok:
print(f"\nTodas las databases responden correctamente.")
else:
print(f"\nAlgunas databases siguen fallando. Revisar logs del contenedor:")
print(f" docker logs {container} --tail 50")
if __name__ == "__main__":
main()