Files
fn_registry/apps/dag_engine/README.md
T
egutierrez eb30074792 chore: auto-commit (8 archivos)
- .claude/rules/registry_calls.md
- apps/dag_engine/README.md
- apps/dag_engine/app.md
- docs/capabilities/INDEX.md
- docs/capabilities/systemd.md
- docs/execution_standard.md
- dev/proposals_e2e_checks_0121/
- docs/capabilities/backends.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 00:31:30 +02:00

361 lines
16 KiB
Markdown

# dag_engine — Guia de uso
Motor de DAGs propio del fn_registry. **Scheduler oficial** del ecosistema (issue 0007a-e + flow 0001). Backend Go + frontend web (Vite/React) + frontend C++ ImGui (`cpp/apps/dag_engine_ui`).
Doc canonica para **anadir DAGs**, **formato YAML**, **comandos CLI**, y **diagnostico de fallos**.
---
## 1. Donde viven los DAGs
| Path | Que |
|---|---|
| `apps/dag_engine/dags_migrated/` | DAGs activos servidos por `dag_engine.service` (systemd user unit). |
| `apps/dag_engine/dags_migrated/archive/` | DAGs deshabilitados (no se cargan por el scheduler). |
Por defecto el systemd unit apunta a `apps/dag_engine/dags_migrated/`. Para usar otro dir, edita `~/.config/systemd/user/dag_engine.service`:
```ini
ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \
--port 8090 \
--dags-dir /home/lucas/fn_registry/apps/dag_engine/dags_migrated \
--db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \
--scheduler
```
Y reload + restart:
```bash
systemctl --user daemon-reload
systemctl --user restart dag_engine.service
```
---
## 2. Anadir un DAG nuevo (workflow)
### Paso a paso
1. **Crear YAML** en `apps/dag_engine/dags_migrated/<nombre>.yaml` (ver formato en seccion 3).
2. **Validar** sin ejecutar:
```bash
./apps/dag_engine/dag_engine validate apps/dag_engine/dags_migrated/<nombre>.yaml
```
Salida esperada: `Validation: PASS`. Si falla, ver seccion 5 (diagnostico).
3. **Probar ejecucion manual** una vez:
```bash
./apps/dag_engine/dag_engine run apps/dag_engine/dags_migrated/<nombre>.yaml
```
4. **Recargar scheduler** (toma el YAML automaticamente al iterar el dir):
```bash
systemctl --user restart dag_engine.service
journalctl --user-unit dag_engine.service -n 30 --no-pager
```
Busca la linea `[scheduler] ticker started for <nombre> (<cron>)` en los logs.
5. **Verificar en frontend**:
- C++ ImGui: panel `DAGs` muestra el nuevo DAG. Pulsa `Refresh` si no aparece.
- Web: `http://localhost:8090`.
### Disparo manual desde curl o frontend
```bash
curl -X POST http://127.0.0.1:8090/api/dags/<nombre>/run
```
Devuelve `{"dag":"<nombre>","run_id":"...","status":"accepted"}` y dispara el WS broadcast — los frontends ven la run en `<1s`.
---
## 3. Formato YAML
Formato YAML propio de dag_engine. Schema: `name`, `description`, `schedule`, `env`, `tags`, `working_dir`, `steps[]`, `handlers` (alias `handler_on`).
### Ejemplo completo
```yaml
name: my_pipeline
description: "Pipeline diario que importa CSV y actualiza Metabase."
group: finanzas # opcional, agrupa DAGs en listados
type: graph # opcional: graph (default) | chain
tags: [daily, csv, metabase] # opcional, filtros en la UI
# Variables de entorno (heredadas por todos los steps).
env:
- DATA_DIR: /home/lucas/data
- SLACK_HOOK: ${SLACK_HOOK_PROD} # interpolacion de ENV del host
# Cron schedule. Puede ser string o lista.
schedule:
- "0 9 * * *" # 09:00 todos los dias
- "0 21 * * 5" # 21:00 viernes (segundo trigger)
# Working dir + shell por defecto para todos los steps.
working_dir: /home/lucas/fn_registry
shell: /bin/bash
timeout_sec: 1800 # 30 min para todo el DAG
steps:
- name: ingest
description: "Descarga CSV."
command: ./bash/functions/pipelines/ingest_csv.sh
timeout_sec: 300 # 5 min para este step
env:
- SOURCE_URL: https://example.com/data.csv
- name: transform
description: "Limpieza y agregacion."
script: |
#!/usr/bin/env python3
import pandas as pd
df = pd.read_csv("$DATA_DIR/raw.csv")
df.to_parquet("$DATA_DIR/clean.parquet")
depends: [ingest] # debe terminar OK antes
retry_policy:
limit: 2 # reintentos en caso de fallo
interval_sec: 60
- name: load_metabase
command: ./bash/functions/metabase/refresh_dashboard.sh
depends: [transform]
continue_on:
failure: true # no aborta el DAG aunque falle
- name: notify
command: ./bash/functions/io/slack_send.sh "pipeline OK"
depends: [load_metabase]
# Hooks de ciclo de vida.
handler_on:
success: ./bash/functions/io/notify_success.sh
failure: ./bash/functions/io/notify_failure.sh
exit: ./bash/functions/io/cleanup.sh
```
### Campos del DAG (top-level)
| Campo | Tipo | Default | Que |
|---|---|---|---|
| `name` | string | (obligatorio) | Identificador unico. Debe matchear el filename sin extension. |
| `description` | string | "" | Texto libre, aparece en la UI. |
| `group` | string | "" | Agrupa DAGs en listados. |
| `type` | string | `""` (graph) | `graph` o `chain`. graph = grafo dirigido por `depends`. chain = ejecucion secuencial implicita. |
| `working_dir` | string | cwd del server | Path absoluto desde donde lanzar los steps. |
| `shell` | string | `/bin/sh` | Shell para `command:`. |
| `env` | list/map | [] | Variables de entorno DAG-wide. |
| `schedule` | string/list | "" | Cron expressions (5 campos: min hour dom mon dow). Vacio = solo manual. |
| `steps` | list | (obligatorio) | Pasos del DAG (>=1). |
| `handler_on` | map | null | Hooks `init/success/failure/exit`. Alias: `handlers`. |
| `tags` | list[string] | [] | Filtros en la UI. |
| `timeout_sec` | int | 0 (sin timeout) | Timeout global del DAG en segundos. |
### Campos de cada step
| Campo | Tipo | Default | Que |
|---|---|---|---|
| `name` | string | (obligatorio) | Identificador del step dentro del DAG. |
| `id` | string | "" | Override del id auto-generado. |
| `description` | string | "" | Texto libre. |
| `command` | string | "" | Comando shell (mutuamente excluyente con `script`/`function`). |
| `script` | string | "" | Bloque heredoc. Util para Python/Lua inline. |
| `function` | string | "" | ID de funcion del registry (ej `audit_capability_groups_go_infra`). Si set, executor invoca `${FN_REGISTRY_ROOT}/fn run <id> <args...>` y captura `function_id` en `dag_step_results`. Mutuamente exclusivo con `command`/`script`; si convive, gana `function`. |
| `args` | list[string] | [] | Args extra para `command` o para la `function`. |
| `shell` | string | hereda | Override del shell. |
| `dir` / `working_dir` | string | hereda | Working dir para este step. |
| `depends` | list[string] | [] | Steps que deben terminar OK antes. Si vacio + `type:graph`, arranca en paralelo. |
| `env` | list/map | hereda | Env del step (sobrescribe el del DAG). |
| `continue_on.failure` | bool | false | Si true, el DAG sigue aunque este step falle. |
| `continue_on.skipped` | bool | false | Si true, dependientes corren aunque este step quede skipped. |
| `retry_policy.limit` | int | 0 | Reintentos. |
| `retry_policy.interval_sec` | int | 0 | Segundos entre reintentos. |
| `timeout_sec` | int | 0 (sin timeout) | Timeout del step. |
| `output` | string | "" | Nombre de variable donde guardar stdout (consumible por dependientes). |
| `tags` | list[string] | [] | Tags por step (UI). |
### Function steps (coherencia con el registry)
Un DAG idiomatico llama funciones del registry, no scripts ad-hoc. Cada step `function:` queda trazado en `call_monitor.calls` por el hook PostToolUse del agente y en `dag_step_results.function_id` del propio dag_engine — el bucle reactivo (issue 0085) tiene visibilidad end-to-end.
```yaml
steps:
- name: audit_capabilities
function: audit_capability_groups_go_infra
args: ["--json"]
description: "Audita drift entre tags de capability group y paginas madre"
```
Ventajas vs `command: ./fn run ...`:
- `function_id` se persiste como columna dedicada en `dag_step_results` (filtrable, agrupable).
- El frontend `dag_engine_ui` muestra badge + panel lateral con `uses_functions` (subfunciones que el step va a usar transitivamente).
- API: `GET /api/functions/{id}` devuelve `{id, name, description, signature, purity, domain, lang, uses_functions[], uses_types[]}` leyendo `registry.db` read-only. La UI consume este endpoint al expandir un step.
- Validator regex en `dag_validate`: `^[a-z0-9_]+_[a-z]+_[a-z]+$`. ID invalido = error.
- Variables de entorno: `FN_REGISTRY_ROOT` (default `/home/lucas/fn_registry`) localiza el binario `fn`. Override con `FN_BIN=/path/al/fn`.
- **`FN_REGISTRY_ROOT` obligatorio cuando el servicio corre via systemd** con `WorkingDirectory` fuera del root del registry. El binario `fn` resuelve `registry.db` por (1) env var, (2) walk-up buscando `go.mod`, (3) exe dir. Si (1) no esta y (2) encuentra el `go.mod` del propio servicio (ej. `apps/dag_engine/go.mod`), devuelve un dir donde `registry.db` no existe o esta stale, fallando con `error: function "<id>" not found`. Bug historico: `apps/dag_engine/registry.db` stale (May 15) tumbo 3 noches `fn_backup` + `daily-registry-audit`. Defensa en profundidad: el executor exporta `FN_REGISTRY_ROOT` y hace `cd $FN_REGISTRY_ROOT` antes del spawn de steps `function:` (executor.go), pero el `Environment=FN_REGISTRY_ROOT=...` del systemd unit sigue siendo la fuente de verdad.
- **`PATH` en el systemd unit**: si steps `function:` invocan funciones Go sin tests (`go vet`) o Python (`python3`), el `PATH` del entorno systemd debe incluir esos binarios — declarar `Environment=PATH=/usr/local/go/bin:/home/lucas/go/bin:/home/lucas/.local/bin:/usr/local/bin:/usr/bin:/bin`.
Ejemplo completo: `apps/dag_engine/dags_migrated/daily-registry-audit.yaml`.
### Cron schedule
5 campos clasicos: `min hour dom mon dow`. Ejemplos:
| Expresion | Significado |
|---|---|
| `0 9 * * *` | Todos los dias a las 09:00 |
| `*/15 * * * *` | Cada 15 minutos |
| `0 */6 * * *` | Cada 6 horas en punto |
| `0 9 * * 1-5` | 09:00 lunes-viernes |
| `0 21 * * 5` | 21:00 viernes |
Multiples cron en `schedule:` -> el DAG dispara por cada uno.
---
## 4. Comandos CLI
```bash
./dag_engine run <path.yaml> # ejecuta un DAG ad-hoc
./dag_engine list [dir] # lista DAGs con schedule + ultimo status
./dag_engine status [dag_name] # historial de ejecuciones
./dag_engine validate <path.yaml> # parse + validate (no ejecuta)
./dag_engine server # arranca HTTP + WS hub + frontend embebido
```
Flags del `server`:
| Flag | Default | Que |
|---|---|---|
| `--port` | 8090 | Puerto HTTP. |
| `--dags-dir` | `apps/dag_engine/dags_migrated` (via systemd unit) | Dir scaneado para YAMLs. |
| `--db` | `dag_engine.db` | SQLite con `dag_runs` + `dag_step_results`. |
| `--scheduler` | false | Si presente, arranca cron tickers automaticamente. |
---
## 5. Que hacer si algo falla
### 5.1. El DAG no aparece en la UI
**Sintoma:** anadiste un YAML pero `GET /api/dags` no lo lista.
| Causa | Diagnostico | Fix |
|---|---|---|
| YAML invalido | `./dag_engine validate <path>` muestra el error. | Corregir segun el mensaje (campo desconocido, indentacion, type wrong). |
| Filename con extension fuera de `.yaml`/`.yml` | `ls apps/dag_engine/dags_migrated/` | Renombrar a `.yaml`. |
| El servidor apunta a otro dir | `systemctl --user cat dag_engine.service` -> ver `--dags-dir`. | Ajustar unit y `daemon-reload + restart`. |
| Cache UI antiguo | C++: pulsa `Refresh`. Web: `Ctrl+F5`. | — |
### 5.2. Validation: FAIL
`validate` muestra `parse error: ...` o `Validation: FAIL`. Causas tipicas:
| Mensaje | Causa | Fix |
|---|---|---|
| `yaml unmarshal: ...` | Sintaxis YAML rota (indentacion, tab vs espacios). | Usar 2 espacios consistentes. Validar online con `yamllint`. |
| `dag_parse: step[N]: name is required` | Step sin `name:`. | Anadir `name:`. |
| `dag_parse: step[N]: command or script required` | Step sin `command` ni `script`. | Anadir uno de los dos. |
| `cycle detected: A -> B -> A` | `depends` forma ciclo. | Romper la dependencia o convertir uno de los nodos en step distinto. |
| `unknown depends: <step>` | `depends:` referencia un step inexistente. | Comprobar nombres exactos (case-sensitive). |
| `invalid cron: <expr>` | Cron mal formado (4 o 6 campos en vez de 5). | Verificar `0 9 * * *` (5 campos). |
### 5.3. El DAG corre pero un step falla
**Sintoma:** `status: failed` en la UI.
1. Abre `DAG Detail` y haz doble-click en el run rojo -> `Run Detail`.
2. Expande el step que fallo (CollapsingHeader). Muestra `stdout` + `stderr`.
3. Errores tipicos:
| stderr | Causa | Fix |
|---|---|---|
| `command not found` | `command:` apunta a un binario fuera de `PATH`. | Path absoluto o setear `env: [PATH: ...]`. |
| `permission denied` | Script sin `chmod +x`. | `chmod +x <script>` (o usar `bash <script>`). |
| `no such file or directory` | `working_dir:` mal o ruta relativa rota. | Path absoluto en `working_dir:`. |
| Timeout | Step duro mas que `timeout_sec`. | Subir el limite o partir el step. |
| Exit 137 / OOM kill | Out-of-memory. | Reducir batch o anadir swap. |
### 5.4. El scheduler no dispara
**Sintoma:** Hay `schedule:` valido pero el DAG no corre solo.
1. Verifica que el server arranco con `--scheduler`:
```bash
systemctl --user cat dag_engine.service | grep scheduler
```
2. Logs:
```bash
journalctl --user-unit dag_engine.service -n 50 --no-pager | grep -E "scheduler|ticker"
```
Debes ver `[scheduler] ticker started for <name> (<cron>), next: <ISO8601>`.
3. Si `next:` es muy lejano (ej. en una semana) y necesitas probar -> dispara manual:
```bash
curl -X POST http://127.0.0.1:8090/api/dags/<name>/run
```
4. Hora del sistema descalibrada:
```bash
timedatectl status
```
Si difiere de la hora real, `sudo timedatectl set-ntp true`.
### 5.5. El frontend C++ no conecta WS
**Sintoma:** Panel `Live (WS)` muestra `disconnected`.
| Causa | Fix |
|---|---|
| Servidor caido | `systemctl --user status dag_engine.service`, `restart` si `inactive`. |
| Puerto cambiado | El cliente apunta a `127.0.0.1:8090` por codigo (constante `g_ws_port`). Reedificar si cambiaste el puerto del server. |
| Firewall Windows -> WSL | WSL2 expone `localhost`, normalmente OK. Si falla: `wsl --shutdown` y reabrir. |
### 5.6. Cleanup de runs viejos
`dag_runs` y `dag_step_results` crecen sin limite. Para limpiar:
```bash
sqlite3 apps/dag_engine/dag_engine.db <<'SQL'
DELETE FROM dag_step_results WHERE run_id IN (
SELECT id FROM dag_runs WHERE started_at < datetime('now', '-30 days')
);
DELETE FROM dag_runs WHERE started_at < datetime('now', '-30 days');
VACUUM;
SQL
```
### 5.7. Restaurar desde backup
Si rompes `dags_migrated/`, recupera desde el snapshot de `backup_all_bash_pipelines` (BACKUP_ROOT por defecto `~/backups/fn_registry`):
```bash
cp ~/backups/fn_registry/registry/daily.0/dags_migrated/*.yaml \
apps/dag_engine/dags_migrated/ 2>/dev/null || \
git checkout HEAD -- apps/dag_engine/dags_migrated/
systemctl --user restart dag_engine.service
```
---
## 6. Endpoints HTTP
| Metodo | Path | Que |
|---|---|---|
| GET | `/api/dags` | Lista DAGs + last_run + last_runs[5]. |
| GET | `/api/dags/{name}` | Detalle + validation. |
| POST | `/api/dags/{name}/run` | Dispara ejecucion (trigger=`api`). Devuelve `run_id`. |
| GET | `/api/runs` | Historial. Query: `dag`, `limit`, `offset`. |
| GET | `/api/runs/{id}` | Detalle de un run + sus step_results. |
| GET | `/api/ws/dagruns` | WebSocket. Snapshot + deltas en vivo (issue 0095). |
| GET | `/api/scheduler/status` | Tickers activos. |
| POST | `/api/scheduler/start` | Arranca scheduler (si no estaba). |
| POST | `/api/scheduler/stop` | Para scheduler. |
---
## 7. Referencias
- Schema parser: `functions/core/dag_parse.go` (frontmatter en `dag_parse_go_core`).
- Validator: `functions/core/dag_validate.go` (`dag_validate_go_core`).
- Topo sort: `functions/core/dag_topo_sort.go` (`dag_topo_sort_go_core`).
- Cron: `functions/core/parse_cron_expr.go` + `next_cron_time.go`.
- Frontend C++: `cpp/apps/dag_engine_ui/` (issue 0095).
- WS hub: `apps/dag_engine/events.go`.
- dag_engine es el scheduler oficial del ecosistema. Single-binary Go + SQLite, sin dependencias externas.