diff --git a/apps/dag_engine/README.md b/apps/dag_engine/README.md new file mode 100644 index 00000000..13715362 --- /dev/null +++ b/apps/dag_engine/README.md @@ -0,0 +1,335 @@ +# dag_engine — Guia de uso + +Motor de DAGs propio (reemplazo de Dagu). Backend Go + frontend web (Vite/React) + frontend C++ ImGui (`cpp/apps/dag_engine_ui`). + +Doc canonica para **anadir DAGs**, **formato YAML**, **comandos CLI**, y **diagnostico de fallos**. + +--- + +## 1. Donde viven los DAGs + +| Path | Que | +|---|---| +| `apps/dag_engine/dags_migrated/` | DAGs activos servidos por `dag_engine.service` (systemd user unit). | +| `~/dagu/dags/` | Path por defecto del binario si no se pasa `--dags-dir`. Vacio tras la migracion del 2026-05-15 (ver tag `dagu_pre_removal`). | +| `~/backups/dagu_pre_removal_.tar.gz` | Backup completo de la carpeta dagu antes de borrar. | + +Por defecto el systemd unit apunta a `apps/dag_engine/dags_migrated/`. Para usar otro dir, edita `~/.config/systemd/user/dag_engine.service`: + +```ini +ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \ + --port 8090 \ + --dags-dir /home/lucas/fn_registry/apps/dag_engine/dags_migrated \ + --db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \ + --scheduler +``` + +Y reload + restart: +```bash +systemctl --user daemon-reload +systemctl --user restart dag_engine.service +``` + +--- + +## 2. Anadir un DAG nuevo (workflow) + +### Paso a paso + +1. **Crear YAML** en `apps/dag_engine/dags_migrated/.yaml` (ver formato en seccion 3). +2. **Validar** sin ejecutar: + ```bash + ./apps/dag_engine/dag_engine validate apps/dag_engine/dags_migrated/.yaml + ``` + Salida esperada: `Validation: PASS`. Si falla, ver seccion 5 (diagnostico). +3. **Probar ejecucion manual** una vez: + ```bash + ./apps/dag_engine/dag_engine run apps/dag_engine/dags_migrated/.yaml + ``` +4. **Recargar scheduler** (toma el YAML automaticamente al iterar el dir): + ```bash + systemctl --user restart dag_engine.service + journalctl --user-unit dag_engine.service -n 30 --no-pager + ``` + Busca la linea `[scheduler] ticker started for ()` en los logs. +5. **Verificar en frontend**: + - C++ ImGui: panel `DAGs` muestra el nuevo DAG. Pulsa `Refresh` si no aparece. + - Web: `http://localhost:8090`. + +### Disparo manual desde curl o frontend + +```bash +curl -X POST http://127.0.0.1:8090/api/dags//run +``` + +Devuelve `{"dag":"","run_id":"...","status":"accepted"}` y dispara el WS broadcast — los frontends ven la run en `<1s`. + +--- + +## 3. Formato YAML + +dag_engine es **compatible con el formato Dagu**. Los YAMLs heredados de `~/dagu/dags/` validan sin modificaciones. + +### Ejemplo completo + +```yaml +name: my_pipeline +description: "Pipeline diario que importa CSV y actualiza Metabase." +group: finanzas # opcional, agrupa DAGs en listados +type: graph # opcional: graph (default) | chain +tags: [daily, csv, metabase] # opcional, filtros en la UI + +# Variables de entorno (heredadas por todos los steps). +env: + - DATA_DIR: /home/lucas/data + - SLACK_HOOK: ${SLACK_HOOK_PROD} # interpolacion de ENV del host + +# Cron schedule. Puede ser string o lista. +schedule: + - "0 9 * * *" # 09:00 todos los dias + - "0 21 * * 5" # 21:00 viernes (segundo trigger) + +# Working dir + shell por defecto para todos los steps. +working_dir: /home/lucas/fn_registry +shell: /bin/bash +timeout_sec: 1800 # 30 min para todo el DAG + +steps: + - name: ingest + description: "Descarga CSV." + command: ./bash/functions/pipelines/ingest_csv.sh + timeout_sec: 300 # 5 min para este step + env: + - SOURCE_URL: https://example.com/data.csv + + - name: transform + description: "Limpieza y agregacion." + script: | + #!/usr/bin/env python3 + import pandas as pd + df = pd.read_csv("$DATA_DIR/raw.csv") + df.to_parquet("$DATA_DIR/clean.parquet") + depends: [ingest] # debe terminar OK antes + retry_policy: + limit: 2 # reintentos en caso de fallo + interval_sec: 60 + + - name: load_metabase + command: ./bash/functions/metabase/refresh_dashboard.sh + depends: [transform] + continue_on: + failure: true # no aborta el DAG aunque falle + + - name: notify + command: ./bash/functions/io/slack_send.sh "pipeline OK" + depends: [load_metabase] + +# Hooks de ciclo de vida. +handler_on: + success: ./bash/functions/io/notify_success.sh + failure: ./bash/functions/io/notify_failure.sh + exit: ./bash/functions/io/cleanup.sh +``` + +### Campos del DAG (top-level) + +| Campo | Tipo | Default | Que | +|---|---|---|---| +| `name` | string | (obligatorio) | Identificador unico. Debe matchear el filename sin extension. | +| `description` | string | "" | Texto libre, aparece en la UI. | +| `group` | string | "" | Agrupa DAGs en listados. | +| `type` | string | `""` (graph) | `graph` o `chain`. graph = grafo dirigido por `depends`. chain = ejecucion secuencial implicita. | +| `working_dir` | string | cwd del server | Path absoluto desde donde lanzar los steps. | +| `shell` | string | `/bin/sh` | Shell para `command:`. | +| `env` | list/map | [] | Variables de entorno DAG-wide. | +| `schedule` | string/list | "" | Cron expressions (5 campos: min hour dom mon dow). Vacio = solo manual. | +| `steps` | list | (obligatorio) | Pasos del DAG (>=1). | +| `handler_on` | map | null | Hooks `init/success/failure/exit`. Alias: `handlers`. | +| `tags` | list[string] | [] | Filtros en la UI. | +| `timeout_sec` | int | 0 (sin timeout) | Timeout global del DAG en segundos. | + +### Campos de cada step + +| Campo | Tipo | Default | Que | +|---|---|---|---| +| `name` | string | (obligatorio) | Identificador del step dentro del DAG. | +| `id` | string | "" | Override del id auto-generado. | +| `description` | string | "" | Texto libre. | +| `command` | string | "" | Comando shell (mutuamente excluyente con `script`). | +| `script` | string | "" | Bloque heredoc. Util para Python/Lua inline. | +| `args` | list[string] | [] | Args extra para `command`. | +| `shell` | string | hereda | Override del shell. | +| `dir` / `working_dir` | string | hereda | Working dir para este step. | +| `depends` | list[string] | [] | Steps que deben terminar OK antes. Si vacio + `type:graph`, arranca en paralelo. | +| `env` | list/map | hereda | Env del step (sobrescribe el del DAG). | +| `continue_on.failure` | bool | false | Si true, el DAG sigue aunque este step falle. | +| `continue_on.skipped` | bool | false | Si true, dependientes corren aunque este step quede skipped. | +| `retry_policy.limit` | int | 0 | Reintentos. | +| `retry_policy.interval_sec` | int | 0 | Segundos entre reintentos. | +| `timeout_sec` | int | 0 (sin timeout) | Timeout del step. | +| `output` | string | "" | Nombre de variable donde guardar stdout (consumible por dependientes). | +| `tags` | list[string] | [] | Tags por step (UI). | + +### Cron schedule + +5 campos clasicos: `min hour dom mon dow`. Ejemplos: + +| Expresion | Significado | +|---|---| +| `0 9 * * *` | Todos los dias a las 09:00 | +| `*/15 * * * *` | Cada 15 minutos | +| `0 */6 * * *` | Cada 6 horas en punto | +| `0 9 * * 1-5` | 09:00 lunes-viernes | +| `0 21 * * 5` | 21:00 viernes | + +Multiples cron en `schedule:` -> el DAG dispara por cada uno. + +--- + +## 4. Comandos CLI + +```bash +./dag_engine run # ejecuta un DAG ad-hoc +./dag_engine list [dir] # lista DAGs con schedule + ultimo status +./dag_engine status [dag_name] # historial de ejecuciones +./dag_engine validate # parse + validate (no ejecuta) +./dag_engine server # arranca HTTP + WS hub + frontend embebido +``` + +Flags del `server`: + +| Flag | Default | Que | +|---|---|---| +| `--port` | 8090 | Puerto HTTP. | +| `--dags-dir` | `~/dagu/dags` | Dir scaneado para YAMLs. | +| `--db` | `dag_engine.db` | SQLite con `dag_runs` + `dag_step_results`. | +| `--scheduler` | false | Si presente, arranca cron tickers automaticamente. | + +--- + +## 5. Que hacer si algo falla + +### 5.1. El DAG no aparece en la UI + +**Sintoma:** anadiste un YAML pero `GET /api/dags` no lo lista. + +| Causa | Diagnostico | Fix | +|---|---|---| +| YAML invalido | `./dag_engine validate ` muestra el error. | Corregir segun el mensaje (campo desconocido, indentacion, type wrong). | +| Filename con extension fuera de `.yaml`/`.yml` | `ls apps/dag_engine/dags_migrated/` | Renombrar a `.yaml`. | +| El servidor apunta a otro dir | `systemctl --user cat dag_engine.service` -> ver `--dags-dir`. | Ajustar unit y `daemon-reload + restart`. | +| Cache UI antiguo | C++: pulsa `Refresh`. Web: `Ctrl+F5`. | — | + +### 5.2. Validation: FAIL + +`validate` muestra `parse error: ...` o `Validation: FAIL`. Causas tipicas: + +| Mensaje | Causa | Fix | +|---|---|---| +| `yaml unmarshal: ...` | Sintaxis YAML rota (indentacion, tab vs espacios). | Usar 2 espacios consistentes. Validar online con `yamllint`. | +| `dag_parse: step[N]: name is required` | Step sin `name:`. | Anadir `name:`. | +| `dag_parse: step[N]: command or script required` | Step sin `command` ni `script`. | Anadir uno de los dos. | +| `cycle detected: A -> B -> A` | `depends` forma ciclo. | Romper la dependencia o convertir uno de los nodos en step distinto. | +| `unknown depends: ` | `depends:` referencia un step inexistente. | Comprobar nombres exactos (case-sensitive). | +| `invalid cron: ` | Cron mal formado (4 o 6 campos en vez de 5). | Verificar `0 9 * * *` (5 campos). | + +### 5.3. El DAG corre pero un step falla + +**Sintoma:** `status: failed` en la UI. + +1. Abre `DAG Detail` y haz doble-click en el run rojo -> `Run Detail`. +2. Expande el step que fallo (CollapsingHeader). Muestra `stdout` + `stderr`. +3. Errores tipicos: + +| stderr | Causa | Fix | +|---|---|---| +| `command not found` | `command:` apunta a un binario fuera de `PATH`. | Path absoluto o setear `env: [PATH: ...]`. | +| `permission denied` | Script sin `chmod +x`. | `chmod +x