docs(dag_engine): README autoritativo (anadir DAGs + formato YAML + troubleshooting)
apps/dag_engine/README.md cubre: - Donde viven los DAGs y como apuntar el systemd unit. - Workflow paso a paso para anadir uno nuevo (crear/validar/probar/recargar/verificar). - Formato YAML completo: top-level fields + step fields + cron schedule + ejemplo de extremo a extremo (env, depends, retry_policy, continue_on, handlers). - Comandos CLI (run/list/status/validate/server) + flags. - 7 secciones de "que hacer si algo falla": DAG invisible, validation fail, step fallido, scheduler no dispara, WS disconnected, cleanup runs viejos, restaurar backup. - Endpoints HTTP completos. - Referencias a funciones del registry y commit de migracion. app.md de dag_engine + dag_engine_ui apuntan a README.md. gitlink dag_engine_ui actualizado a commit con app.md mejorado. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,335 @@
|
||||
# dag_engine — Guia de uso
|
||||
|
||||
Motor de DAGs propio (reemplazo de Dagu). Backend Go + frontend web (Vite/React) + frontend C++ ImGui (`cpp/apps/dag_engine_ui`).
|
||||
|
||||
Doc canonica para **anadir DAGs**, **formato YAML**, **comandos CLI**, y **diagnostico de fallos**.
|
||||
|
||||
---
|
||||
|
||||
## 1. Donde viven los DAGs
|
||||
|
||||
| Path | Que |
|
||||
|---|---|
|
||||
| `apps/dag_engine/dags_migrated/` | DAGs activos servidos por `dag_engine.service` (systemd user unit). |
|
||||
| `~/dagu/dags/` | Path por defecto del binario si no se pasa `--dags-dir`. Vacio tras la migracion del 2026-05-15 (ver tag `dagu_pre_removal`). |
|
||||
| `~/backups/dagu_pre_removal_<fecha>.tar.gz` | Backup completo de la carpeta dagu antes de borrar. |
|
||||
|
||||
Por defecto el systemd unit apunta a `apps/dag_engine/dags_migrated/`. Para usar otro dir, edita `~/.config/systemd/user/dag_engine.service`:
|
||||
|
||||
```ini
|
||||
ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \
|
||||
--port 8090 \
|
||||
--dags-dir /home/lucas/fn_registry/apps/dag_engine/dags_migrated \
|
||||
--db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \
|
||||
--scheduler
|
||||
```
|
||||
|
||||
Y reload + restart:
|
||||
```bash
|
||||
systemctl --user daemon-reload
|
||||
systemctl --user restart dag_engine.service
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 2. Anadir un DAG nuevo (workflow)
|
||||
|
||||
### Paso a paso
|
||||
|
||||
1. **Crear YAML** en `apps/dag_engine/dags_migrated/<nombre>.yaml` (ver formato en seccion 3).
|
||||
2. **Validar** sin ejecutar:
|
||||
```bash
|
||||
./apps/dag_engine/dag_engine validate apps/dag_engine/dags_migrated/<nombre>.yaml
|
||||
```
|
||||
Salida esperada: `Validation: PASS`. Si falla, ver seccion 5 (diagnostico).
|
||||
3. **Probar ejecucion manual** una vez:
|
||||
```bash
|
||||
./apps/dag_engine/dag_engine run apps/dag_engine/dags_migrated/<nombre>.yaml
|
||||
```
|
||||
4. **Recargar scheduler** (toma el YAML automaticamente al iterar el dir):
|
||||
```bash
|
||||
systemctl --user restart dag_engine.service
|
||||
journalctl --user-unit dag_engine.service -n 30 --no-pager
|
||||
```
|
||||
Busca la linea `[scheduler] ticker started for <nombre> (<cron>)` en los logs.
|
||||
5. **Verificar en frontend**:
|
||||
- C++ ImGui: panel `DAGs` muestra el nuevo DAG. Pulsa `Refresh` si no aparece.
|
||||
- Web: `http://localhost:8090`.
|
||||
|
||||
### Disparo manual desde curl o frontend
|
||||
|
||||
```bash
|
||||
curl -X POST http://127.0.0.1:8090/api/dags/<nombre>/run
|
||||
```
|
||||
|
||||
Devuelve `{"dag":"<nombre>","run_id":"...","status":"accepted"}` y dispara el WS broadcast — los frontends ven la run en `<1s`.
|
||||
|
||||
---
|
||||
|
||||
## 3. Formato YAML
|
||||
|
||||
dag_engine es **compatible con el formato Dagu**. Los YAMLs heredados de `~/dagu/dags/` validan sin modificaciones.
|
||||
|
||||
### Ejemplo completo
|
||||
|
||||
```yaml
|
||||
name: my_pipeline
|
||||
description: "Pipeline diario que importa CSV y actualiza Metabase."
|
||||
group: finanzas # opcional, agrupa DAGs en listados
|
||||
type: graph # opcional: graph (default) | chain
|
||||
tags: [daily, csv, metabase] # opcional, filtros en la UI
|
||||
|
||||
# Variables de entorno (heredadas por todos los steps).
|
||||
env:
|
||||
- DATA_DIR: /home/lucas/data
|
||||
- SLACK_HOOK: ${SLACK_HOOK_PROD} # interpolacion de ENV del host
|
||||
|
||||
# Cron schedule. Puede ser string o lista.
|
||||
schedule:
|
||||
- "0 9 * * *" # 09:00 todos los dias
|
||||
- "0 21 * * 5" # 21:00 viernes (segundo trigger)
|
||||
|
||||
# Working dir + shell por defecto para todos los steps.
|
||||
working_dir: /home/lucas/fn_registry
|
||||
shell: /bin/bash
|
||||
timeout_sec: 1800 # 30 min para todo el DAG
|
||||
|
||||
steps:
|
||||
- name: ingest
|
||||
description: "Descarga CSV."
|
||||
command: ./bash/functions/pipelines/ingest_csv.sh
|
||||
timeout_sec: 300 # 5 min para este step
|
||||
env:
|
||||
- SOURCE_URL: https://example.com/data.csv
|
||||
|
||||
- name: transform
|
||||
description: "Limpieza y agregacion."
|
||||
script: |
|
||||
#!/usr/bin/env python3
|
||||
import pandas as pd
|
||||
df = pd.read_csv("$DATA_DIR/raw.csv")
|
||||
df.to_parquet("$DATA_DIR/clean.parquet")
|
||||
depends: [ingest] # debe terminar OK antes
|
||||
retry_policy:
|
||||
limit: 2 # reintentos en caso de fallo
|
||||
interval_sec: 60
|
||||
|
||||
- name: load_metabase
|
||||
command: ./bash/functions/metabase/refresh_dashboard.sh
|
||||
depends: [transform]
|
||||
continue_on:
|
||||
failure: true # no aborta el DAG aunque falle
|
||||
|
||||
- name: notify
|
||||
command: ./bash/functions/io/slack_send.sh "pipeline OK"
|
||||
depends: [load_metabase]
|
||||
|
||||
# Hooks de ciclo de vida.
|
||||
handler_on:
|
||||
success: ./bash/functions/io/notify_success.sh
|
||||
failure: ./bash/functions/io/notify_failure.sh
|
||||
exit: ./bash/functions/io/cleanup.sh
|
||||
```
|
||||
|
||||
### Campos del DAG (top-level)
|
||||
|
||||
| Campo | Tipo | Default | Que |
|
||||
|---|---|---|---|
|
||||
| `name` | string | (obligatorio) | Identificador unico. Debe matchear el filename sin extension. |
|
||||
| `description` | string | "" | Texto libre, aparece en la UI. |
|
||||
| `group` | string | "" | Agrupa DAGs en listados. |
|
||||
| `type` | string | `""` (graph) | `graph` o `chain`. graph = grafo dirigido por `depends`. chain = ejecucion secuencial implicita. |
|
||||
| `working_dir` | string | cwd del server | Path absoluto desde donde lanzar los steps. |
|
||||
| `shell` | string | `/bin/sh` | Shell para `command:`. |
|
||||
| `env` | list/map | [] | Variables de entorno DAG-wide. |
|
||||
| `schedule` | string/list | "" | Cron expressions (5 campos: min hour dom mon dow). Vacio = solo manual. |
|
||||
| `steps` | list | (obligatorio) | Pasos del DAG (>=1). |
|
||||
| `handler_on` | map | null | Hooks `init/success/failure/exit`. Alias: `handlers`. |
|
||||
| `tags` | list[string] | [] | Filtros en la UI. |
|
||||
| `timeout_sec` | int | 0 (sin timeout) | Timeout global del DAG en segundos. |
|
||||
|
||||
### Campos de cada step
|
||||
|
||||
| Campo | Tipo | Default | Que |
|
||||
|---|---|---|---|
|
||||
| `name` | string | (obligatorio) | Identificador del step dentro del DAG. |
|
||||
| `id` | string | "" | Override del id auto-generado. |
|
||||
| `description` | string | "" | Texto libre. |
|
||||
| `command` | string | "" | Comando shell (mutuamente excluyente con `script`). |
|
||||
| `script` | string | "" | Bloque heredoc. Util para Python/Lua inline. |
|
||||
| `args` | list[string] | [] | Args extra para `command`. |
|
||||
| `shell` | string | hereda | Override del shell. |
|
||||
| `dir` / `working_dir` | string | hereda | Working dir para este step. |
|
||||
| `depends` | list[string] | [] | Steps que deben terminar OK antes. Si vacio + `type:graph`, arranca en paralelo. |
|
||||
| `env` | list/map | hereda | Env del step (sobrescribe el del DAG). |
|
||||
| `continue_on.failure` | bool | false | Si true, el DAG sigue aunque este step falle. |
|
||||
| `continue_on.skipped` | bool | false | Si true, dependientes corren aunque este step quede skipped. |
|
||||
| `retry_policy.limit` | int | 0 | Reintentos. |
|
||||
| `retry_policy.interval_sec` | int | 0 | Segundos entre reintentos. |
|
||||
| `timeout_sec` | int | 0 (sin timeout) | Timeout del step. |
|
||||
| `output` | string | "" | Nombre de variable donde guardar stdout (consumible por dependientes). |
|
||||
| `tags` | list[string] | [] | Tags por step (UI). |
|
||||
|
||||
### Cron schedule
|
||||
|
||||
5 campos clasicos: `min hour dom mon dow`. Ejemplos:
|
||||
|
||||
| Expresion | Significado |
|
||||
|---|---|
|
||||
| `0 9 * * *` | Todos los dias a las 09:00 |
|
||||
| `*/15 * * * *` | Cada 15 minutos |
|
||||
| `0 */6 * * *` | Cada 6 horas en punto |
|
||||
| `0 9 * * 1-5` | 09:00 lunes-viernes |
|
||||
| `0 21 * * 5` | 21:00 viernes |
|
||||
|
||||
Multiples cron en `schedule:` -> el DAG dispara por cada uno.
|
||||
|
||||
---
|
||||
|
||||
## 4. Comandos CLI
|
||||
|
||||
```bash
|
||||
./dag_engine run <path.yaml> # ejecuta un DAG ad-hoc
|
||||
./dag_engine list [dir] # lista DAGs con schedule + ultimo status
|
||||
./dag_engine status [dag_name] # historial de ejecuciones
|
||||
./dag_engine validate <path.yaml> # parse + validate (no ejecuta)
|
||||
./dag_engine server # arranca HTTP + WS hub + frontend embebido
|
||||
```
|
||||
|
||||
Flags del `server`:
|
||||
|
||||
| Flag | Default | Que |
|
||||
|---|---|---|
|
||||
| `--port` | 8090 | Puerto HTTP. |
|
||||
| `--dags-dir` | `~/dagu/dags` | Dir scaneado para YAMLs. |
|
||||
| `--db` | `dag_engine.db` | SQLite con `dag_runs` + `dag_step_results`. |
|
||||
| `--scheduler` | false | Si presente, arranca cron tickers automaticamente. |
|
||||
|
||||
---
|
||||
|
||||
## 5. Que hacer si algo falla
|
||||
|
||||
### 5.1. El DAG no aparece en la UI
|
||||
|
||||
**Sintoma:** anadiste un YAML pero `GET /api/dags` no lo lista.
|
||||
|
||||
| Causa | Diagnostico | Fix |
|
||||
|---|---|---|
|
||||
| YAML invalido | `./dag_engine validate <path>` muestra el error. | Corregir segun el mensaje (campo desconocido, indentacion, type wrong). |
|
||||
| Filename con extension fuera de `.yaml`/`.yml` | `ls apps/dag_engine/dags_migrated/` | Renombrar a `.yaml`. |
|
||||
| El servidor apunta a otro dir | `systemctl --user cat dag_engine.service` -> ver `--dags-dir`. | Ajustar unit y `daemon-reload + restart`. |
|
||||
| Cache UI antiguo | C++: pulsa `Refresh`. Web: `Ctrl+F5`. | — |
|
||||
|
||||
### 5.2. Validation: FAIL
|
||||
|
||||
`validate` muestra `parse error: ...` o `Validation: FAIL`. Causas tipicas:
|
||||
|
||||
| Mensaje | Causa | Fix |
|
||||
|---|---|---|
|
||||
| `yaml unmarshal: ...` | Sintaxis YAML rota (indentacion, tab vs espacios). | Usar 2 espacios consistentes. Validar online con `yamllint`. |
|
||||
| `dag_parse: step[N]: name is required` | Step sin `name:`. | Anadir `name:`. |
|
||||
| `dag_parse: step[N]: command or script required` | Step sin `command` ni `script`. | Anadir uno de los dos. |
|
||||
| `cycle detected: A -> B -> A` | `depends` forma ciclo. | Romper la dependencia o convertir uno de los nodos en step distinto. |
|
||||
| `unknown depends: <step>` | `depends:` referencia un step inexistente. | Comprobar nombres exactos (case-sensitive). |
|
||||
| `invalid cron: <expr>` | Cron mal formado (4 o 6 campos en vez de 5). | Verificar `0 9 * * *` (5 campos). |
|
||||
|
||||
### 5.3. El DAG corre pero un step falla
|
||||
|
||||
**Sintoma:** `status: failed` en la UI.
|
||||
|
||||
1. Abre `DAG Detail` y haz doble-click en el run rojo -> `Run Detail`.
|
||||
2. Expande el step que fallo (CollapsingHeader). Muestra `stdout` + `stderr`.
|
||||
3. Errores tipicos:
|
||||
|
||||
| stderr | Causa | Fix |
|
||||
|---|---|---|
|
||||
| `command not found` | `command:` apunta a un binario fuera de `PATH`. | Path absoluto o setear `env: [PATH: ...]`. |
|
||||
| `permission denied` | Script sin `chmod +x`. | `chmod +x <script>` (o usar `bash <script>`). |
|
||||
| `no such file or directory` | `working_dir:` mal o ruta relativa rota. | Path absoluto en `working_dir:`. |
|
||||
| Timeout | Step duro mas que `timeout_sec`. | Subir el limite o partir el step. |
|
||||
| Exit 137 / OOM kill | Out-of-memory. | Reducir batch o anadir swap. |
|
||||
|
||||
### 5.4. El scheduler no dispara
|
||||
|
||||
**Sintoma:** Hay `schedule:` valido pero el DAG no corre solo.
|
||||
|
||||
1. Verifica que el server arranco con `--scheduler`:
|
||||
```bash
|
||||
systemctl --user cat dag_engine.service | grep scheduler
|
||||
```
|
||||
2. Logs:
|
||||
```bash
|
||||
journalctl --user-unit dag_engine.service -n 50 --no-pager | grep -E "scheduler|ticker"
|
||||
```
|
||||
Debes ver `[scheduler] ticker started for <name> (<cron>), next: <ISO8601>`.
|
||||
3. Si `next:` es muy lejano (ej. en una semana) y necesitas probar -> dispara manual:
|
||||
```bash
|
||||
curl -X POST http://127.0.0.1:8090/api/dags/<name>/run
|
||||
```
|
||||
4. Hora del sistema descalibrada:
|
||||
```bash
|
||||
timedatectl status
|
||||
```
|
||||
Si difiere de la hora real, `sudo timedatectl set-ntp true`.
|
||||
|
||||
### 5.5. El frontend C++ no conecta WS
|
||||
|
||||
**Sintoma:** Panel `Live (WS)` muestra `disconnected`.
|
||||
|
||||
| Causa | Fix |
|
||||
|---|---|
|
||||
| Servidor caido | `systemctl --user status dag_engine.service`, `restart` si `inactive`. |
|
||||
| Puerto cambiado | El cliente apunta a `127.0.0.1:8090` por codigo (constante `g_ws_port`). Reedificar si cambiaste el puerto del server. |
|
||||
| Firewall Windows -> WSL | WSL2 expone `localhost`, normalmente OK. Si falla: `wsl --shutdown` y reabrir. |
|
||||
|
||||
### 5.6. Cleanup de runs viejos
|
||||
|
||||
`dag_runs` y `dag_step_results` crecen sin limite. Para limpiar:
|
||||
|
||||
```bash
|
||||
sqlite3 apps/dag_engine/dag_engine.db <<'SQL'
|
||||
DELETE FROM dag_step_results WHERE run_id IN (
|
||||
SELECT id FROM dag_runs WHERE started_at < datetime('now', '-30 days')
|
||||
);
|
||||
DELETE FROM dag_runs WHERE started_at < datetime('now', '-30 days');
|
||||
VACUUM;
|
||||
SQL
|
||||
```
|
||||
|
||||
### 5.7. Restaurar desde backup
|
||||
|
||||
Si rompes `dags_migrated/`:
|
||||
|
||||
```bash
|
||||
tar -xzf ~/backups/dagu_pre_removal_20260515.tar.gz -C /tmp/
|
||||
cp /tmp/dagu/dags/*.yaml apps/dag_engine/dags_migrated/
|
||||
systemctl --user restart dag_engine.service
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 6. Endpoints HTTP
|
||||
|
||||
| Metodo | Path | Que |
|
||||
|---|---|---|
|
||||
| GET | `/api/dags` | Lista DAGs + last_run + last_runs[5]. |
|
||||
| GET | `/api/dags/{name}` | Detalle + validation. |
|
||||
| POST | `/api/dags/{name}/run` | Dispara ejecucion (trigger=`api`). Devuelve `run_id`. |
|
||||
| GET | `/api/runs` | Historial. Query: `dag`, `limit`, `offset`. |
|
||||
| GET | `/api/runs/{id}` | Detalle de un run + sus step_results. |
|
||||
| GET | `/api/ws/dagruns` | WebSocket. Snapshot + deltas en vivo (issue 0095). |
|
||||
| GET | `/api/scheduler/status` | Tickers activos. |
|
||||
| POST | `/api/scheduler/start` | Arranca scheduler (si no estaba). |
|
||||
| POST | `/api/scheduler/stop` | Para scheduler. |
|
||||
|
||||
---
|
||||
|
||||
## 7. Referencias
|
||||
|
||||
- Schema parser: `functions/core/dag_parse.go` (frontmatter en `dag_parse_go_core`).
|
||||
- Validator: `functions/core/dag_validate.go` (`dag_validate_go_core`).
|
||||
- Topo sort: `functions/core/dag_topo_sort.go` (`dag_topo_sort_go_core`).
|
||||
- Cron: `functions/core/parse_cron_expr.go` + `next_cron_time.go`.
|
||||
- Frontend C++: `cpp/apps/dag_engine_ui/` (issue 0095).
|
||||
- WS hub: `apps/dag_engine/events.go`.
|
||||
- Migracion desde Dagu: 2026-05-15. Backup en `~/backups/dagu_pre_removal_20260515.tar.gz`.
|
||||
@@ -83,3 +83,8 @@ cd .. && CGO_ENABLED=1 go build -tags fts5 -o dag-engine .
|
||||
|
||||
Compatible con el formato YAML de Dagu. Lee DAGs existentes de `~/dagu/dags/` sin modificaciones.
|
||||
Puerto por defecto 8090 (mismo que Dagu).
|
||||
|
||||
## Documentacion de usuario
|
||||
|
||||
Guia completa (formato YAML, anadir DAGs, troubleshooting, endpoints HTTP):
|
||||
**[apps/dag_engine/README.md](README.md)**.
|
||||
|
||||
+1
-1
Submodule cpp/apps/dag_engine_ui updated: 4cb36a92e8...f150f96c8f
Reference in New Issue
Block a user