- .claude/rules/registry_calls.md - apps/dag_engine/README.md - apps/dag_engine/app.md - docs/capabilities/INDEX.md - docs/capabilities/systemd.md - docs/execution_standard.md - dev/proposals_e2e_checks_0121/ - docs/capabilities/backends.md Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
16 KiB
dag_engine — Guia de uso
Motor de DAGs propio del fn_registry. Scheduler oficial del ecosistema (issue 0007a-e + flow 0001). Backend Go + frontend web (Vite/React) + frontend C++ ImGui (cpp/apps/dag_engine_ui).
Doc canonica para anadir DAGs, formato YAML, comandos CLI, y diagnostico de fallos.
1. Donde viven los DAGs
| Path | Que |
|---|---|
apps/dag_engine/dags_migrated/ |
DAGs activos servidos por dag_engine.service (systemd user unit). |
apps/dag_engine/dags_migrated/archive/ |
DAGs deshabilitados (no se cargan por el scheduler). |
Por defecto el systemd unit apunta a apps/dag_engine/dags_migrated/. Para usar otro dir, edita ~/.config/systemd/user/dag_engine.service:
ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \
--port 8090 \
--dags-dir /home/lucas/fn_registry/apps/dag_engine/dags_migrated \
--db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \
--scheduler
Y reload + restart:
systemctl --user daemon-reload
systemctl --user restart dag_engine.service
2. Anadir un DAG nuevo (workflow)
Paso a paso
- Crear YAML en
apps/dag_engine/dags_migrated/<nombre>.yaml(ver formato en seccion 3). - Validar sin ejecutar:
Salida esperada:
./apps/dag_engine/dag_engine validate apps/dag_engine/dags_migrated/<nombre>.yamlValidation: PASS. Si falla, ver seccion 5 (diagnostico). - Probar ejecucion manual una vez:
./apps/dag_engine/dag_engine run apps/dag_engine/dags_migrated/<nombre>.yaml - Recargar scheduler (toma el YAML automaticamente al iterar el dir):
Busca la linea
systemctl --user restart dag_engine.service journalctl --user-unit dag_engine.service -n 30 --no-pager[scheduler] ticker started for <nombre> (<cron>)en los logs. - Verificar en frontend:
- C++ ImGui: panel
DAGsmuestra el nuevo DAG. PulsaRefreshsi no aparece. - Web:
http://localhost:8090.
- C++ ImGui: panel
Disparo manual desde curl o frontend
curl -X POST http://127.0.0.1:8090/api/dags/<nombre>/run
Devuelve {"dag":"<nombre>","run_id":"...","status":"accepted"} y dispara el WS broadcast — los frontends ven la run en <1s.
3. Formato YAML
Formato YAML propio de dag_engine. Schema: name, description, schedule, env, tags, working_dir, steps[], handlers (alias handler_on).
Ejemplo completo
name: my_pipeline
description: "Pipeline diario que importa CSV y actualiza Metabase."
group: finanzas # opcional, agrupa DAGs en listados
type: graph # opcional: graph (default) | chain
tags: [daily, csv, metabase] # opcional, filtros en la UI
# Variables de entorno (heredadas por todos los steps).
env:
- DATA_DIR: /home/lucas/data
- SLACK_HOOK: ${SLACK_HOOK_PROD} # interpolacion de ENV del host
# Cron schedule. Puede ser string o lista.
schedule:
- "0 9 * * *" # 09:00 todos los dias
- "0 21 * * 5" # 21:00 viernes (segundo trigger)
# Working dir + shell por defecto para todos los steps.
working_dir: /home/lucas/fn_registry
shell: /bin/bash
timeout_sec: 1800 # 30 min para todo el DAG
steps:
- name: ingest
description: "Descarga CSV."
command: ./bash/functions/pipelines/ingest_csv.sh
timeout_sec: 300 # 5 min para este step
env:
- SOURCE_URL: https://example.com/data.csv
- name: transform
description: "Limpieza y agregacion."
script: |
#!/usr/bin/env python3
import pandas as pd
df = pd.read_csv("$DATA_DIR/raw.csv")
df.to_parquet("$DATA_DIR/clean.parquet")
depends: [ingest] # debe terminar OK antes
retry_policy:
limit: 2 # reintentos en caso de fallo
interval_sec: 60
- name: load_metabase
command: ./bash/functions/metabase/refresh_dashboard.sh
depends: [transform]
continue_on:
failure: true # no aborta el DAG aunque falle
- name: notify
command: ./bash/functions/io/slack_send.sh "pipeline OK"
depends: [load_metabase]
# Hooks de ciclo de vida.
handler_on:
success: ./bash/functions/io/notify_success.sh
failure: ./bash/functions/io/notify_failure.sh
exit: ./bash/functions/io/cleanup.sh
Campos del DAG (top-level)
| Campo | Tipo | Default | Que |
|---|---|---|---|
name |
string | (obligatorio) | Identificador unico. Debe matchear el filename sin extension. |
description |
string | "" | Texto libre, aparece en la UI. |
group |
string | "" | Agrupa DAGs en listados. |
type |
string | "" (graph) |
graph o chain. graph = grafo dirigido por depends. chain = ejecucion secuencial implicita. |
working_dir |
string | cwd del server | Path absoluto desde donde lanzar los steps. |
shell |
string | /bin/sh |
Shell para command:. |
env |
list/map | [] | Variables de entorno DAG-wide. |
schedule |
string/list | "" | Cron expressions (5 campos: min hour dom mon dow). Vacio = solo manual. |
steps |
list | (obligatorio) | Pasos del DAG (>=1). |
handler_on |
map | null | Hooks init/success/failure/exit. Alias: handlers. |
tags |
list[string] | [] | Filtros en la UI. |
timeout_sec |
int | 0 (sin timeout) | Timeout global del DAG en segundos. |
Campos de cada step
| Campo | Tipo | Default | Que |
|---|---|---|---|
name |
string | (obligatorio) | Identificador del step dentro del DAG. |
id |
string | "" | Override del id auto-generado. |
description |
string | "" | Texto libre. |
command |
string | "" | Comando shell (mutuamente excluyente con script/function). |
script |
string | "" | Bloque heredoc. Util para Python/Lua inline. |
function |
string | "" | ID de funcion del registry (ej audit_capability_groups_go_infra). Si set, executor invoca ${FN_REGISTRY_ROOT}/fn run <id> <args...> y captura function_id en dag_step_results. Mutuamente exclusivo con command/script; si convive, gana function. |
args |
list[string] | [] | Args extra para command o para la function. |
shell |
string | hereda | Override del shell. |
dir / working_dir |
string | hereda | Working dir para este step. |
depends |
list[string] | [] | Steps que deben terminar OK antes. Si vacio + type:graph, arranca en paralelo. |
env |
list/map | hereda | Env del step (sobrescribe el del DAG). |
continue_on.failure |
bool | false | Si true, el DAG sigue aunque este step falle. |
continue_on.skipped |
bool | false | Si true, dependientes corren aunque este step quede skipped. |
retry_policy.limit |
int | 0 | Reintentos. |
retry_policy.interval_sec |
int | 0 | Segundos entre reintentos. |
timeout_sec |
int | 0 (sin timeout) | Timeout del step. |
output |
string | "" | Nombre de variable donde guardar stdout (consumible por dependientes). |
tags |
list[string] | [] | Tags por step (UI). |
Function steps (coherencia con el registry)
Un DAG idiomatico llama funciones del registry, no scripts ad-hoc. Cada step function: queda trazado en call_monitor.calls por el hook PostToolUse del agente y en dag_step_results.function_id del propio dag_engine — el bucle reactivo (issue 0085) tiene visibilidad end-to-end.
steps:
- name: audit_capabilities
function: audit_capability_groups_go_infra
args: ["--json"]
description: "Audita drift entre tags de capability group y paginas madre"
Ventajas vs command: ./fn run ...:
function_idse persiste como columna dedicada endag_step_results(filtrable, agrupable).- El frontend
dag_engine_uimuestra badge + panel lateral conuses_functions(subfunciones que el step va a usar transitivamente). - API:
GET /api/functions/{id}devuelve{id, name, description, signature, purity, domain, lang, uses_functions[], uses_types[]}leyendoregistry.dbread-only. La UI consume este endpoint al expandir un step. - Validator regex en
dag_validate:^[a-z0-9_]+_[a-z]+_[a-z]+$. ID invalido = error. - Variables de entorno:
FN_REGISTRY_ROOT(default/home/lucas/fn_registry) localiza el binariofn. Override conFN_BIN=/path/al/fn. FN_REGISTRY_ROOTobligatorio cuando el servicio corre via systemd conWorkingDirectoryfuera del root del registry. El binariofnresuelveregistry.dbpor (1) env var, (2) walk-up buscandogo.mod, (3) exe dir. Si (1) no esta y (2) encuentra elgo.moddel propio servicio (ej.apps/dag_engine/go.mod), devuelve un dir donderegistry.dbno existe o esta stale, fallando conerror: function "<id>" not found. Bug historico:apps/dag_engine/registry.dbstale (May 15) tumbo 3 nochesfn_backup+daily-registry-audit. Defensa en profundidad: el executor exportaFN_REGISTRY_ROOTy hacecd $FN_REGISTRY_ROOTantes del spawn de stepsfunction:(executor.go), pero elEnvironment=FN_REGISTRY_ROOT=...del systemd unit sigue siendo la fuente de verdad.PATHen el systemd unit: si stepsfunction:invocan funciones Go sin tests (go vet) o Python (python3), elPATHdel entorno systemd debe incluir esos binarios — declararEnvironment=PATH=/usr/local/go/bin:/home/lucas/go/bin:/home/lucas/.local/bin:/usr/local/bin:/usr/bin:/bin.
Ejemplo completo: apps/dag_engine/dags_migrated/daily-registry-audit.yaml.
Cron schedule
5 campos clasicos: min hour dom mon dow. Ejemplos:
| Expresion | Significado |
|---|---|
0 9 * * * |
Todos los dias a las 09:00 |
*/15 * * * * |
Cada 15 minutos |
0 */6 * * * |
Cada 6 horas en punto |
0 9 * * 1-5 |
09:00 lunes-viernes |
0 21 * * 5 |
21:00 viernes |
Multiples cron en schedule: -> el DAG dispara por cada uno.
4. Comandos CLI
./dag_engine run <path.yaml> # ejecuta un DAG ad-hoc
./dag_engine list [dir] # lista DAGs con schedule + ultimo status
./dag_engine status [dag_name] # historial de ejecuciones
./dag_engine validate <path.yaml> # parse + validate (no ejecuta)
./dag_engine server # arranca HTTP + WS hub + frontend embebido
Flags del server:
| Flag | Default | Que |
|---|---|---|
--port |
8090 | Puerto HTTP. |
--dags-dir |
apps/dag_engine/dags_migrated (via systemd unit) |
Dir scaneado para YAMLs. |
--db |
dag_engine.db |
SQLite con dag_runs + dag_step_results. |
--scheduler |
false | Si presente, arranca cron tickers automaticamente. |
5. Que hacer si algo falla
5.1. El DAG no aparece en la UI
Sintoma: anadiste un YAML pero GET /api/dags no lo lista.
| Causa | Diagnostico | Fix |
|---|---|---|
| YAML invalido | ./dag_engine validate <path> muestra el error. |
Corregir segun el mensaje (campo desconocido, indentacion, type wrong). |
Filename con extension fuera de .yaml/.yml |
ls apps/dag_engine/dags_migrated/ |
Renombrar a .yaml. |
| El servidor apunta a otro dir | systemctl --user cat dag_engine.service -> ver --dags-dir. |
Ajustar unit y daemon-reload + restart. |
| Cache UI antiguo | C++: pulsa Refresh. Web: Ctrl+F5. |
— |
5.2. Validation: FAIL
validate muestra parse error: ... o Validation: FAIL. Causas tipicas:
| Mensaje | Causa | Fix |
|---|---|---|
yaml unmarshal: ... |
Sintaxis YAML rota (indentacion, tab vs espacios). | Usar 2 espacios consistentes. Validar online con yamllint. |
dag_parse: step[N]: name is required |
Step sin name:. |
Anadir name:. |
dag_parse: step[N]: command or script required |
Step sin command ni script. |
Anadir uno de los dos. |
cycle detected: A -> B -> A |
depends forma ciclo. |
Romper la dependencia o convertir uno de los nodos en step distinto. |
unknown depends: <step> |
depends: referencia un step inexistente. |
Comprobar nombres exactos (case-sensitive). |
invalid cron: <expr> |
Cron mal formado (4 o 6 campos en vez de 5). | Verificar 0 9 * * * (5 campos). |
5.3. El DAG corre pero un step falla
Sintoma: status: failed en la UI.
- Abre
DAG Detaily haz doble-click en el run rojo ->Run Detail. - Expande el step que fallo (CollapsingHeader). Muestra
stdout+stderr. - Errores tipicos:
| stderr | Causa | Fix |
|---|---|---|
command not found |
command: apunta a un binario fuera de PATH. |
Path absoluto o setear env: [PATH: ...]. |
permission denied |
Script sin chmod +x. |
chmod +x <script> (o usar bash <script>). |
no such file or directory |
working_dir: mal o ruta relativa rota. |
Path absoluto en working_dir:. |
| Timeout | Step duro mas que timeout_sec. |
Subir el limite o partir el step. |
| Exit 137 / OOM kill | Out-of-memory. | Reducir batch o anadir swap. |
5.4. El scheduler no dispara
Sintoma: Hay schedule: valido pero el DAG no corre solo.
- Verifica que el server arranco con
--scheduler:systemctl --user cat dag_engine.service | grep scheduler - Logs:
Debes ver
journalctl --user-unit dag_engine.service -n 50 --no-pager | grep -E "scheduler|ticker"[scheduler] ticker started for <name> (<cron>), next: <ISO8601>. - Si
next:es muy lejano (ej. en una semana) y necesitas probar -> dispara manual:curl -X POST http://127.0.0.1:8090/api/dags/<name>/run - Hora del sistema descalibrada:
Si difiere de la hora real,
timedatectl statussudo timedatectl set-ntp true.
5.5. El frontend C++ no conecta WS
Sintoma: Panel Live (WS) muestra disconnected.
| Causa | Fix |
|---|---|
| Servidor caido | systemctl --user status dag_engine.service, restart si inactive. |
| Puerto cambiado | El cliente apunta a 127.0.0.1:8090 por codigo (constante g_ws_port). Reedificar si cambiaste el puerto del server. |
| Firewall Windows -> WSL | WSL2 expone localhost, normalmente OK. Si falla: wsl --shutdown y reabrir. |
5.6. Cleanup de runs viejos
dag_runs y dag_step_results crecen sin limite. Para limpiar:
sqlite3 apps/dag_engine/dag_engine.db <<'SQL'
DELETE FROM dag_step_results WHERE run_id IN (
SELECT id FROM dag_runs WHERE started_at < datetime('now', '-30 days')
);
DELETE FROM dag_runs WHERE started_at < datetime('now', '-30 days');
VACUUM;
SQL
5.7. Restaurar desde backup
Si rompes dags_migrated/, recupera desde el snapshot de backup_all_bash_pipelines (BACKUP_ROOT por defecto ~/backups/fn_registry):
cp ~/backups/fn_registry/registry/daily.0/dags_migrated/*.yaml \
apps/dag_engine/dags_migrated/ 2>/dev/null || \
git checkout HEAD -- apps/dag_engine/dags_migrated/
systemctl --user restart dag_engine.service
6. Endpoints HTTP
| Metodo | Path | Que |
|---|---|---|
| GET | /api/dags |
Lista DAGs + last_run + last_runs[5]. |
| GET | /api/dags/{name} |
Detalle + validation. |
| POST | /api/dags/{name}/run |
Dispara ejecucion (trigger=api). Devuelve run_id. |
| GET | /api/runs |
Historial. Query: dag, limit, offset. |
| GET | /api/runs/{id} |
Detalle de un run + sus step_results. |
| GET | /api/ws/dagruns |
WebSocket. Snapshot + deltas en vivo (issue 0095). |
| GET | /api/scheduler/status |
Tickers activos. |
| POST | /api/scheduler/start |
Arranca scheduler (si no estaba). |
| POST | /api/scheduler/stop |
Para scheduler. |
7. Referencias
- Schema parser:
functions/core/dag_parse.go(frontmatter endag_parse_go_core). - Validator:
functions/core/dag_validate.go(dag_validate_go_core). - Topo sort:
functions/core/dag_topo_sort.go(dag_topo_sort_go_core). - Cron:
functions/core/parse_cron_expr.go+next_cron_time.go. - Frontend C++:
cpp/apps/dag_engine_ui/(issue 0095). - WS hub:
apps/dag_engine/events.go. - dag_engine es el scheduler oficial del ecosistema. Single-binary Go + SQLite, sin dependencias externas.