Files
fn_registry/apps/dag_engine/README.md
T
egutierrez 4461875b18 chore: auto-commit (15 archivos)
- apps/dag_engine/.gitignore
- apps/dag_engine/README.md
- apps/dag_engine/app.md
- apps/dag_engine/config.go
- apps/dag_engine/dags_migrated/fn_backup.yaml
- apps/dag_engine/dags_migrated/revision_viernes_finanzas.yaml
- apps/dag_engine/executor.go
- apps/dag_engine/frontend/package.json
- apps/dag_engine/frontend/src/App.tsx
- apps/dag_engine/frontend/src/components/StatusBadge.tsx
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-02 23:00:46 +02:00

15 KiB

dag_engine — Guia de uso

Motor de DAGs propio del fn_registry. Scheduler oficial del ecosistema (issue 0007a-e + flow 0001). Backend Go + frontend web (Vite/React) + frontend C++ ImGui (cpp/apps/dag_engine_ui).

Doc canonica para anadir DAGs, formato YAML, comandos CLI, y diagnostico de fallos.


1. Donde viven los DAGs

Path Que
apps/dag_engine/dags/ DAGs activos servidos por dag_engine.service (systemd user unit).

Por defecto el systemd unit apunta a apps/dag_engine/dags/. Para usar otro dir, edita ~/.config/systemd/user/dag_engine.service:

ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \
    --port 8090 \
    --dags-dir /home/lucas/fn_registry/apps/dag_engine/dags \
    --db     /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \
    --scheduler

Y reload + restart:

systemctl --user daemon-reload
systemctl --user restart dag_engine.service

2. Anadir un DAG nuevo (workflow)

Paso a paso

  1. Crear YAML en apps/dag_engine/dags/<nombre>.yaml (ver formato en seccion 3).
  2. Validar sin ejecutar:
    ./apps/dag_engine/dag_engine validate apps/dag_engine/dags/<nombre>.yaml
    
    Salida esperada: Validation: PASS. Si falla, ver seccion 5 (diagnostico).
  3. Probar ejecucion manual una vez:
    ./apps/dag_engine/dag_engine run apps/dag_engine/dags/<nombre>.yaml
    
  4. Recargar scheduler (toma el YAML automaticamente al iterar el dir):
    systemctl --user restart dag_engine.service
    journalctl --user-unit dag_engine.service -n 30 --no-pager
    
    Busca la linea [scheduler] ticker started for <nombre> (<cron>) en los logs.
  5. Verificar en frontend:
    • C++ ImGui: panel DAGs muestra el nuevo DAG. Pulsa Refresh si no aparece.
    • Web: http://localhost:8090.

Disparo manual desde curl o frontend

curl -X POST http://127.0.0.1:8090/api/dags/<nombre>/run

Devuelve {"dag":"<nombre>","run_id":"...","status":"accepted"} y dispara el WS broadcast — los frontends ven la run en <1s.


3. Formato YAML

Formato YAML propio de dag_engine. Schema: name, description, schedule, env, tags, working_dir, steps[], handlers (alias handler_on).

Ejemplo completo

name: my_pipeline
description: "Pipeline diario que importa CSV y actualiza Metabase."
group: finanzas              # opcional, agrupa DAGs en listados
type: graph                  # opcional: graph (default) | chain
tags: [daily, csv, metabase] # opcional, filtros en la UI

# Variables de entorno (heredadas por todos los steps).
env:
  - DATA_DIR: /home/lucas/data
  - SLACK_HOOK: ${SLACK_HOOK_PROD}    # interpolacion de ENV del host

# Cron schedule. Puede ser string o lista.
schedule:
  - "0 9 * * *"              # 09:00 todos los dias
  - "0 21 * * 5"             # 21:00 viernes (segundo trigger)

# Working dir + shell por defecto para todos los steps.
working_dir: /home/lucas/fn_registry
shell: /bin/bash
timeout_sec: 1800            # 30 min para todo el DAG

steps:
  - name: ingest
    description: "Descarga CSV."
    command: ./bash/functions/pipelines/ingest_csv.sh
    timeout_sec: 300         # 5 min para este step
    env:
      - SOURCE_URL: https://example.com/data.csv

  - name: transform
    description: "Limpieza y agregacion."
    script: |
      #!/usr/bin/env python3
      import pandas as pd
      df = pd.read_csv("$DATA_DIR/raw.csv")
      df.to_parquet("$DATA_DIR/clean.parquet")
    depends: [ingest]        # debe terminar OK antes
    retry_policy:
      limit: 2               # reintentos en caso de fallo
      interval_sec: 60

  - name: load_metabase
    command: ./bash/functions/metabase/refresh_dashboard.sh
    depends: [transform]
    continue_on:
      failure: true          # no aborta el DAG aunque falle

  - name: notify
    command: ./bash/functions/io/slack_send.sh "pipeline OK"
    depends: [load_metabase]

# Hooks de ciclo de vida.
handler_on:
  success: ./bash/functions/io/notify_success.sh
  failure: ./bash/functions/io/notify_failure.sh
  exit:    ./bash/functions/io/cleanup.sh

Campos del DAG (top-level)

Campo Tipo Default Que
name string (obligatorio) Identificador unico. Debe matchear el filename sin extension.
description string "" Texto libre, aparece en la UI.
group string "" Agrupa DAGs en listados.
type string "" (graph) graph o chain. graph = grafo dirigido por depends. chain = ejecucion secuencial implicita.
working_dir string cwd del server Path absoluto desde donde lanzar los steps.
shell string /bin/sh Shell para command:.
env list/map [] Variables de entorno DAG-wide.
schedule string/list "" Cron expressions (5 campos: min hour dom mon dow). Vacio = solo manual.
steps list (obligatorio) Pasos del DAG (>=1).
handler_on map null Hooks init/success/failure/exit. Alias: handlers.
tags list[string] [] Filtros en la UI.
timeout_sec int 0 (sin timeout) Timeout global del DAG en segundos.

Campos de cada step

Campo Tipo Default Que
name string (obligatorio) Identificador del step dentro del DAG.
id string "" Override del id auto-generado.
description string "" Texto libre.
command string "" Comando shell (mutuamente excluyente con script/function).
script string "" Bloque heredoc. Util para Python/Lua inline.
function string "" ID de funcion del registry (ej audit_capability_groups_go_infra). Si set, executor invoca ${FN_REGISTRY_ROOT}/fn run <id> <args...> y captura function_id en dag_step_results. Mutuamente exclusivo con command/script; si convive, gana function.
args list[string] [] Args extra para command o para la function.
shell string hereda Override del shell.
dir / working_dir string hereda Working dir para este step.
depends list[string] [] Steps que deben terminar OK antes. Si vacio + type:graph, arranca en paralelo.
env list/map hereda Env del step (sobrescribe el del DAG).
continue_on.failure bool false Si true, el DAG sigue aunque este step falle.
continue_on.skipped bool false Si true, dependientes corren aunque este step quede skipped.
retry_policy.limit int 0 Reintentos.
retry_policy.interval_sec int 0 Segundos entre reintentos.
timeout_sec int 0 (sin timeout) Timeout del step.
output string "" Nombre de variable donde guardar stdout (consumible por dependientes).
tags list[string] [] Tags por step (UI).

Function steps (coherencia con el registry)

Un DAG idiomatico llama funciones del registry, no scripts ad-hoc. Cada step function: queda trazado en call_monitor.calls por el hook PostToolUse del agente y en dag_step_results.function_id del propio dag_engine — el bucle reactivo (issue 0085) tiene visibilidad end-to-end.

steps:
  - name: audit_capabilities
    function: audit_capability_groups_go_infra
    args: ["--json"]
    description: "Audita drift entre tags de capability group y paginas madre"

Ventajas vs command: ./fn run ...:

  • function_id se persiste como columna dedicada en dag_step_results (filtrable, agrupable).
  • El frontend dag_engine_ui muestra badge + panel lateral con uses_functions (subfunciones que el step va a usar transitivamente).
  • API: GET /api/functions/{id} devuelve {id, name, description, signature, purity, domain, lang, uses_functions[], uses_types[]} leyendo registry.db read-only. La UI consume este endpoint al expandir un step.
  • Validator regex en dag_validate: ^[a-z0-9_]+_[a-z]+_[a-z]+$. ID invalido = error.
  • Variables de entorno: FN_REGISTRY_ROOT (default /home/lucas/fn_registry) localiza el binario fn. Override con FN_BIN=/path/al/fn.
  • FN_REGISTRY_ROOT obligatorio cuando el servicio corre via systemd con WorkingDirectory fuera del root del registry. El binario fn resuelve registry.db por (1) env var, (2) walk-up buscando go.mod, (3) exe dir. Si (1) no esta y (2) encuentra el go.mod del propio servicio (ej. apps/dag_engine/go.mod), devuelve un dir donde registry.db no existe o esta stale, fallando con error: function "<id>" not found. Bug historico: apps/dag_engine/registry.db stale (May 15) tumbo 3 noches fn_backup + daily-registry-audit. Defensa en profundidad: el executor exporta FN_REGISTRY_ROOT y hace cd $FN_REGISTRY_ROOT antes del spawn de steps function: (executor.go), pero el Environment=FN_REGISTRY_ROOT=... del systemd unit sigue siendo la fuente de verdad.
  • PATH en el systemd unit: si steps function: invocan funciones Go sin tests (go vet) o Python (python3), el PATH del entorno systemd debe incluir esos binarios — declarar Environment=PATH=/usr/local/go/bin:/home/lucas/go/bin:/home/lucas/.local/bin:/usr/local/bin:/usr/bin:/bin.

Ejemplo completo: apps/dag_engine/dags/daily-registry-audit.yaml.

Cron schedule

5 campos clasicos: min hour dom mon dow. Ejemplos:

Expresion Significado
0 9 * * * Todos los dias a las 09:00
*/15 * * * * Cada 15 minutos
0 */6 * * * Cada 6 horas en punto
0 9 * * 1-5 09:00 lunes-viernes
0 21 * * 5 21:00 viernes

Multiples cron en schedule: -> el DAG dispara por cada uno.


4. Comandos CLI

./dag_engine run <path.yaml>       # ejecuta un DAG ad-hoc
./dag_engine list [dir]            # lista DAGs con schedule + ultimo status
./dag_engine status [dag_name]     # historial de ejecuciones
./dag_engine validate <path.yaml>  # parse + validate (no ejecuta)
./dag_engine server                # arranca HTTP + WS hub + frontend embebido

Flags del server:

Flag Default Que
--port 8090 Puerto HTTP.
--dags-dir apps/dag_engine/dags (via systemd unit) Dir scaneado para YAMLs.
--db dag_engine.db SQLite con dag_runs + dag_step_results.
--scheduler false Si presente, arranca cron tickers automaticamente.

5. Que hacer si algo falla

5.1. El DAG no aparece en la UI

Sintoma: anadiste un YAML pero GET /api/dags no lo lista.

Causa Diagnostico Fix
YAML invalido ./dag_engine validate <path> muestra el error. Corregir segun el mensaje (campo desconocido, indentacion, type wrong).
Filename con extension fuera de .yaml/.yml ls apps/dag_engine/dags/ Renombrar a .yaml.
El servidor apunta a otro dir systemctl --user cat dag_engine.service -> ver --dags-dir. Ajustar unit y daemon-reload + restart.
Cache UI antiguo C++: pulsa Refresh. Web: Ctrl+F5.

5.2. Validation: FAIL

validate muestra parse error: ... o Validation: FAIL. Causas tipicas:

Mensaje Causa Fix
yaml unmarshal: ... Sintaxis YAML rota (indentacion, tab vs espacios). Usar 2 espacios consistentes. Validar online con yamllint.
dag_parse: step[N]: name is required Step sin name:. Anadir name:.
dag_parse: step[N]: command or script required Step sin command ni script. Anadir uno de los dos.
cycle detected: A -> B -> A depends forma ciclo. Romper la dependencia o convertir uno de los nodos en step distinto.
unknown depends: <step> depends: referencia un step inexistente. Comprobar nombres exactos (case-sensitive).
invalid cron: <expr> Cron mal formado (4 o 6 campos en vez de 5). Verificar 0 9 * * * (5 campos).

5.3. El DAG corre pero un step falla

Sintoma: status: failed en la UI.

  1. Abre DAG Detail y haz doble-click en el run rojo -> Run Detail.
  2. Expande el step que fallo (CollapsingHeader). Muestra stdout + stderr.
  3. Errores tipicos:
stderr Causa Fix
command not found command: apunta a un binario fuera de PATH. Path absoluto o setear env: [PATH: ...].
permission denied Script sin chmod +x. chmod +x <script> (o usar bash <script>).
no such file or directory working_dir: mal o ruta relativa rota. Path absoluto en working_dir:.
Timeout Step duro mas que timeout_sec. Subir el limite o partir el step.
Exit 137 / OOM kill Out-of-memory. Reducir batch o anadir swap.

5.4. El scheduler no dispara

Sintoma: Hay schedule: valido pero el DAG no corre solo.

  1. Verifica que el server arranco con --scheduler:
    systemctl --user cat dag_engine.service | grep scheduler
    
  2. Logs:
    journalctl --user-unit dag_engine.service -n 50 --no-pager | grep -E "scheduler|ticker"
    
    Debes ver [scheduler] ticker started for <name> (<cron>), next: <ISO8601>.
  3. Si next: es muy lejano (ej. en una semana) y necesitas probar -> dispara manual:
    curl -X POST http://127.0.0.1:8090/api/dags/<name>/run
    
  4. Hora del sistema descalibrada:
    timedatectl status
    
    Si difiere de la hora real, sudo timedatectl set-ntp true.

5.5. El frontend C++ no conecta WS

Sintoma: Panel Live (WS) muestra disconnected.

Causa Fix
Servidor caido systemctl --user status dag_engine.service, restart si inactive.
Puerto cambiado El cliente apunta a 127.0.0.1:8090 por codigo (constante g_ws_port). Reedificar si cambiaste el puerto del server.
Firewall Windows -> WSL WSL2 expone localhost, normalmente OK. Si falla: wsl --shutdown y reabrir.

5.6. Cleanup de runs viejos

dag_runs y dag_step_results crecen sin limite. Para limpiar:

sqlite3 apps/dag_engine/dag_engine.db <<'SQL'
DELETE FROM dag_step_results WHERE run_id IN (
    SELECT id FROM dag_runs WHERE started_at < datetime('now', '-30 days')
);
DELETE FROM dag_runs WHERE started_at < datetime('now', '-30 days');
VACUUM;
SQL

5.7. Restaurar desde backup

Si rompes dags/, recupera desde el snapshot de backup_all_bash_pipelines (BACKUP_ROOT por defecto ~/backups/fn_registry):

cp ~/backups/fn_registry/registry/daily.0/dags/*.yaml \
   apps/dag_engine/dags/ 2>/dev/null || \
   git checkout HEAD -- apps/dag_engine/dags/
systemctl --user restart dag_engine.service

6. Endpoints HTTP

Metodo Path Que
GET /api/dags Lista DAGs + last_run + last_runs[5].
GET /api/dags/{name} Detalle + validation.
POST /api/dags/{name}/run Dispara ejecucion (trigger=api). Devuelve run_id.
GET /api/runs Historial. Query: dag, limit, offset.
GET /api/runs/{id} Detalle de un run + sus step_results.
GET /api/ws/dagruns WebSocket. Snapshot + deltas en vivo (issue 0095).
GET /api/scheduler/status Tickers activos.
POST /api/scheduler/start Arranca scheduler (si no estaba).
POST /api/scheduler/stop Para scheduler.

7. Referencias

  • Schema parser: functions/core/dag_parse.go (frontmatter en dag_parse_go_core).
  • Validator: functions/core/dag_validate.go (dag_validate_go_core).
  • Topo sort: functions/core/dag_topo_sort.go (dag_topo_sort_go_core).
  • Cron: functions/core/parse_cron_expr.go + next_cron_time.go.
  • Frontend C++: cpp/apps/dag_engine_ui/ (issue 0095).
  • WS hub: apps/dag_engine/events.go.
  • dag_engine es el scheduler oficial del ecosistema. Single-binary Go + SQLite, sin dependencias externas.