# dag_engine — Guia de uso Motor de DAGs propio (reemplazo de Dagu). Backend Go + frontend web (Vite/React) + frontend C++ ImGui (`cpp/apps/dag_engine_ui`). Doc canonica para **anadir DAGs**, **formato YAML**, **comandos CLI**, y **diagnostico de fallos**. --- ## 1. Donde viven los DAGs | Path | Que | |---|---| | `apps/dag_engine/dags_migrated/` | DAGs activos servidos por `dag_engine.service` (systemd user unit). | | `~/dagu/dags/` | Path por defecto del binario si no se pasa `--dags-dir`. Vacio tras la migracion del 2026-05-15 (ver tag `dagu_pre_removal`). | | `~/backups/dagu_pre_removal_.tar.gz` | Backup completo de la carpeta dagu antes de borrar. | Por defecto el systemd unit apunta a `apps/dag_engine/dags_migrated/`. Para usar otro dir, edita `~/.config/systemd/user/dag_engine.service`: ```ini ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \ --port 8090 \ --dags-dir /home/lucas/fn_registry/apps/dag_engine/dags_migrated \ --db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \ --scheduler ``` Y reload + restart: ```bash systemctl --user daemon-reload systemctl --user restart dag_engine.service ``` --- ## 2. Anadir un DAG nuevo (workflow) ### Paso a paso 1. **Crear YAML** en `apps/dag_engine/dags_migrated/.yaml` (ver formato en seccion 3). 2. **Validar** sin ejecutar: ```bash ./apps/dag_engine/dag_engine validate apps/dag_engine/dags_migrated/.yaml ``` Salida esperada: `Validation: PASS`. Si falla, ver seccion 5 (diagnostico). 3. **Probar ejecucion manual** una vez: ```bash ./apps/dag_engine/dag_engine run apps/dag_engine/dags_migrated/.yaml ``` 4. **Recargar scheduler** (toma el YAML automaticamente al iterar el dir): ```bash systemctl --user restart dag_engine.service journalctl --user-unit dag_engine.service -n 30 --no-pager ``` Busca la linea `[scheduler] ticker started for ()` en los logs. 5. **Verificar en frontend**: - C++ ImGui: panel `DAGs` muestra el nuevo DAG. Pulsa `Refresh` si no aparece. - Web: `http://localhost:8090`. ### Disparo manual desde curl o frontend ```bash curl -X POST http://127.0.0.1:8090/api/dags//run ``` Devuelve `{"dag":"","run_id":"...","status":"accepted"}` y dispara el WS broadcast — los frontends ven la run en `<1s`. --- ## 3. Formato YAML dag_engine es **compatible con el formato Dagu**. Los YAMLs heredados de `~/dagu/dags/` validan sin modificaciones. ### Ejemplo completo ```yaml name: my_pipeline description: "Pipeline diario que importa CSV y actualiza Metabase." group: finanzas # opcional, agrupa DAGs en listados type: graph # opcional: graph (default) | chain tags: [daily, csv, metabase] # opcional, filtros en la UI # Variables de entorno (heredadas por todos los steps). env: - DATA_DIR: /home/lucas/data - SLACK_HOOK: ${SLACK_HOOK_PROD} # interpolacion de ENV del host # Cron schedule. Puede ser string o lista. schedule: - "0 9 * * *" # 09:00 todos los dias - "0 21 * * 5" # 21:00 viernes (segundo trigger) # Working dir + shell por defecto para todos los steps. working_dir: /home/lucas/fn_registry shell: /bin/bash timeout_sec: 1800 # 30 min para todo el DAG steps: - name: ingest description: "Descarga CSV." command: ./bash/functions/pipelines/ingest_csv.sh timeout_sec: 300 # 5 min para este step env: - SOURCE_URL: https://example.com/data.csv - name: transform description: "Limpieza y agregacion." script: | #!/usr/bin/env python3 import pandas as pd df = pd.read_csv("$DATA_DIR/raw.csv") df.to_parquet("$DATA_DIR/clean.parquet") depends: [ingest] # debe terminar OK antes retry_policy: limit: 2 # reintentos en caso de fallo interval_sec: 60 - name: load_metabase command: ./bash/functions/metabase/refresh_dashboard.sh depends: [transform] continue_on: failure: true # no aborta el DAG aunque falle - name: notify command: ./bash/functions/io/slack_send.sh "pipeline OK" depends: [load_metabase] # Hooks de ciclo de vida. handler_on: success: ./bash/functions/io/notify_success.sh failure: ./bash/functions/io/notify_failure.sh exit: ./bash/functions/io/cleanup.sh ``` ### Campos del DAG (top-level) | Campo | Tipo | Default | Que | |---|---|---|---| | `name` | string | (obligatorio) | Identificador unico. Debe matchear el filename sin extension. | | `description` | string | "" | Texto libre, aparece en la UI. | | `group` | string | "" | Agrupa DAGs en listados. | | `type` | string | `""` (graph) | `graph` o `chain`. graph = grafo dirigido por `depends`. chain = ejecucion secuencial implicita. | | `working_dir` | string | cwd del server | Path absoluto desde donde lanzar los steps. | | `shell` | string | `/bin/sh` | Shell para `command:`. | | `env` | list/map | [] | Variables de entorno DAG-wide. | | `schedule` | string/list | "" | Cron expressions (5 campos: min hour dom mon dow). Vacio = solo manual. | | `steps` | list | (obligatorio) | Pasos del DAG (>=1). | | `handler_on` | map | null | Hooks `init/success/failure/exit`. Alias: `handlers`. | | `tags` | list[string] | [] | Filtros en la UI. | | `timeout_sec` | int | 0 (sin timeout) | Timeout global del DAG en segundos. | ### Campos de cada step | Campo | Tipo | Default | Que | |---|---|---|---| | `name` | string | (obligatorio) | Identificador del step dentro del DAG. | | `id` | string | "" | Override del id auto-generado. | | `description` | string | "" | Texto libre. | | `command` | string | "" | Comando shell (mutuamente excluyente con `script`/`function`). | | `script` | string | "" | Bloque heredoc. Util para Python/Lua inline. | | `function` | string | "" | ID de funcion del registry (ej `audit_capability_groups_go_infra`). Si set, executor invoca `${FN_REGISTRY_ROOT}/fn run ` y captura `function_id` en `dag_step_results`. Mutuamente exclusivo con `command`/`script`; si convive, gana `function`. | | `args` | list[string] | [] | Args extra para `command` o para la `function`. | | `shell` | string | hereda | Override del shell. | | `dir` / `working_dir` | string | hereda | Working dir para este step. | | `depends` | list[string] | [] | Steps que deben terminar OK antes. Si vacio + `type:graph`, arranca en paralelo. | | `env` | list/map | hereda | Env del step (sobrescribe el del DAG). | | `continue_on.failure` | bool | false | Si true, el DAG sigue aunque este step falle. | | `continue_on.skipped` | bool | false | Si true, dependientes corren aunque este step quede skipped. | | `retry_policy.limit` | int | 0 | Reintentos. | | `retry_policy.interval_sec` | int | 0 | Segundos entre reintentos. | | `timeout_sec` | int | 0 (sin timeout) | Timeout del step. | | `output` | string | "" | Nombre de variable donde guardar stdout (consumible por dependientes). | | `tags` | list[string] | [] | Tags por step (UI). | ### Function steps (coherencia con el registry) Un DAG idiomatico llama funciones del registry, no scripts ad-hoc. Cada step `function:` queda trazado en `call_monitor.calls` por el hook PostToolUse del agente y en `dag_step_results.function_id` del propio dag_engine — el bucle reactivo (issue 0085) tiene visibilidad end-to-end. ```yaml steps: - name: audit_capabilities function: audit_capability_groups_go_infra args: ["--json"] description: "Audita drift entre tags de capability group y paginas madre" ``` Ventajas vs `command: ./fn run ...`: - `function_id` se persiste como columna dedicada en `dag_step_results` (filtrable, agrupable). - El frontend `dag_engine_ui` muestra badge + panel lateral con `uses_functions` (subfunciones que el step va a usar transitivamente). - API: `GET /api/functions/{id}` devuelve `{id, name, description, signature, purity, domain, lang, uses_functions[], uses_types[]}` leyendo `registry.db` read-only. La UI consume este endpoint al expandir un step. - Validator regex en `dag_validate`: `^[a-z0-9_]+_[a-z]+_[a-z]+$`. ID invalido = error. - Variables de entorno: `FN_REGISTRY_ROOT` (default `/home/lucas/fn_registry`) localiza el binario `fn`. Override con `FN_BIN=/path/al/fn`. - **`FN_REGISTRY_ROOT` obligatorio cuando el servicio corre via systemd** con `WorkingDirectory` fuera del root del registry. El binario `fn` resuelve `registry.db` por (1) env var, (2) walk-up buscando `go.mod`, (3) exe dir. Si (1) no esta y (2) encuentra el `go.mod` del propio servicio (ej. `apps/dag_engine/go.mod`), devuelve un dir donde `registry.db` no existe o esta stale, fallando con `error: function "" not found`. Bug historico: `apps/dag_engine/registry.db` stale (May 15) tumbo 3 noches `fn_backup` + `daily-registry-audit`. Defensa en profundidad: el executor exporta `FN_REGISTRY_ROOT` y hace `cd $FN_REGISTRY_ROOT` antes del spawn de steps `function:` (executor.go), pero el `Environment=FN_REGISTRY_ROOT=...` del systemd unit sigue siendo la fuente de verdad. - **`PATH` en el systemd unit**: si steps `function:` invocan funciones Go sin tests (`go vet`) o Python (`python3`), el `PATH` del entorno systemd debe incluir esos binarios — declarar `Environment=PATH=/usr/local/go/bin:/home/lucas/go/bin:/home/lucas/.local/bin:/usr/local/bin:/usr/bin:/bin`. Ejemplo completo: `~/.dagu/dags/example-fn-call.yaml`. ### Cron schedule 5 campos clasicos: `min hour dom mon dow`. Ejemplos: | Expresion | Significado | |---|---| | `0 9 * * *` | Todos los dias a las 09:00 | | `*/15 * * * *` | Cada 15 minutos | | `0 */6 * * *` | Cada 6 horas en punto | | `0 9 * * 1-5` | 09:00 lunes-viernes | | `0 21 * * 5` | 21:00 viernes | Multiples cron en `schedule:` -> el DAG dispara por cada uno. --- ## 4. Comandos CLI ```bash ./dag_engine run # ejecuta un DAG ad-hoc ./dag_engine list [dir] # lista DAGs con schedule + ultimo status ./dag_engine status [dag_name] # historial de ejecuciones ./dag_engine validate # parse + validate (no ejecuta) ./dag_engine server # arranca HTTP + WS hub + frontend embebido ``` Flags del `server`: | Flag | Default | Que | |---|---|---| | `--port` | 8090 | Puerto HTTP. | | `--dags-dir` | `~/dagu/dags` | Dir scaneado para YAMLs. | | `--db` | `dag_engine.db` | SQLite con `dag_runs` + `dag_step_results`. | | `--scheduler` | false | Si presente, arranca cron tickers automaticamente. | --- ## 5. Que hacer si algo falla ### 5.1. El DAG no aparece en la UI **Sintoma:** anadiste un YAML pero `GET /api/dags` no lo lista. | Causa | Diagnostico | Fix | |---|---|---| | YAML invalido | `./dag_engine validate ` muestra el error. | Corregir segun el mensaje (campo desconocido, indentacion, type wrong). | | Filename con extension fuera de `.yaml`/`.yml` | `ls apps/dag_engine/dags_migrated/` | Renombrar a `.yaml`. | | El servidor apunta a otro dir | `systemctl --user cat dag_engine.service` -> ver `--dags-dir`. | Ajustar unit y `daemon-reload + restart`. | | Cache UI antiguo | C++: pulsa `Refresh`. Web: `Ctrl+F5`. | — | ### 5.2. Validation: FAIL `validate` muestra `parse error: ...` o `Validation: FAIL`. Causas tipicas: | Mensaje | Causa | Fix | |---|---|---| | `yaml unmarshal: ...` | Sintaxis YAML rota (indentacion, tab vs espacios). | Usar 2 espacios consistentes. Validar online con `yamllint`. | | `dag_parse: step[N]: name is required` | Step sin `name:`. | Anadir `name:`. | | `dag_parse: step[N]: command or script required` | Step sin `command` ni `script`. | Anadir uno de los dos. | | `cycle detected: A -> B -> A` | `depends` forma ciclo. | Romper la dependencia o convertir uno de los nodos en step distinto. | | `unknown depends: ` | `depends:` referencia un step inexistente. | Comprobar nombres exactos (case-sensitive). | | `invalid cron: ` | Cron mal formado (4 o 6 campos en vez de 5). | Verificar `0 9 * * *` (5 campos). | ### 5.3. El DAG corre pero un step falla **Sintoma:** `status: failed` en la UI. 1. Abre `DAG Detail` y haz doble-click en el run rojo -> `Run Detail`. 2. Expande el step que fallo (CollapsingHeader). Muestra `stdout` + `stderr`. 3. Errores tipicos: | stderr | Causa | Fix | |---|---|---| | `command not found` | `command:` apunta a un binario fuera de `PATH`. | Path absoluto o setear `env: [PATH: ...]`. | | `permission denied` | Script sin `chmod +x`. | `chmod +x