--- id: "0007e" title: "DAG executor app: CLI/TUI que reemplaza Dagu" status: completado type: feature domain: [] scope: app-scoped priority: alta depends: [] blocks: [] related: [] created: 2026-05-17 updated: 2026-05-17 tags: [] --- # 0007e — DAG executor app: CLI/TUI que reemplaza Dagu ## Metadata | Campo | Valor | |-------|-------| | **ID** | 0007e | | **Estado** | pendiente | | **Prioridad** | alta | | **Tipo** | feature | ## Dependencias | ID | Título | Estado | Requerido | |----|--------|--------|-----------| | 0007a | Funciones core del DAG engine | pendiente | Si | | 0007b | Process manager | pendiente | Si | | 0007c | Execution store | pendiente | Si | | 0007d | Scheduler | pendiente | Si | **Bloqueada por:** `#0007a, #0007b, #0007c, #0007d` **Desbloquea:** ninguna --- ## Objetivo App que compone todas las funciones de 0007a-d en un ejecutable unico que reemplaza a Dagu: lee DAGs YAML, los ejecuta con dependencias, persiste estado, y opcionalmente corre como daemon con scheduler. ## Contexto - Vive en `apps/dag_engine/` (es una app, no una funcion reutilizable) - Lee DAGs del directorio `~/dagu/dags/` (o configurable) - El executor es el nucleo: toma un DagDefinition, lanza steps en orden topologico, gestiona paralelismo - Modos: `run` (ejecuta un DAG), `start` (daemon con scheduler), `status` (consulta ejecuciones), `list` (lista DAGs) ## Arquitectura ``` apps/dag_engine/ ├── app.md — metadata del registry ├── main.go — CLI: subcomandos run/start/status/list ├── executor.go — compone dag_topo_sort + process_spawn/wait + store ├── server.go — (futuro) HTTP API para trigger remoto ├── go.mod ├── .gitignore ``` ### Patron pure core / impure shell - `core/` — ya creadas en 0007a y 0007d (funciones puras del registry) - `infra/` — ya creadas en 0007b y 0007c (funciones impuras del registry) - `app/` — `executor.go` compone todo, `main.go` orquesta ## Tareas ### Fase 1: Executor - [ ] **1.1** `executor.go` — funcion `ExecuteDAG(dag DagDefinition, store DB) DagRun` - Crea run en store - Resuelve env - Ordena steps (topo sort) - Ejecuta nivel por nivel: steps del mismo nivel van en paralelo (goroutines) - Cada step: spawn → wait → guarda result en store - Si un step falla: cancela dependientes, marca run como failed - Retorna DagRun con resultado final ### Fase 2: CLI - [ ] **2.1** `fn-dag run ` — parsea, valida, ejecuta, muestra resultado - [ ] **2.2** `fn-dag list [dir]` — lista DAGs con su schedule y ultimo estado - [ ] **2.3** `fn-dag status [dag_name]` — ultimas ejecuciones, detalle de steps - [ ] **2.4** `fn-dag start [dir]` — daemon: carga todos los DAGs, arranca ticker ### Fase 3: Integracion - [ ] **3.1** `app.md` con uses_functions referenciando todas las funciones de 0007a-d - [ ] **3.2** `operations.db` inicializado (fn ops init) - [ ] **3.3** Publicar en Gitea (dataforge/dag_engine) ### Fase 4: Tests e2e - [ ] **4.1** Ejecutar DAGs existentes de `~/dagu/dags/` y comparar resultado con Dagu - [ ] **4.2** Test: DAG con steps paralelos, DAG con fallo en medio, DAG con timeout ### Fase 5: Cleanup - [ ] `fn index` - [ ] Actualizar CLAUDE.md con documentacion del dag engine --- ## Ejemplo de uso ```bash # Ejecutar un DAG fn-dag run ~/dagu/dags/example.yaml # Step hello... done (0.1s) # Step list_files... done (0.2s) # Step date... done (0.1s) # Run completed: 3/3 steps succeeded (0.4s) # Listar DAGs fn-dag list ~/dagu/dags/ # NAME SCHEDULE LAST RUN STATUS # example 0 9 * * * 2026-04-07 success # example_lineage_tracking 0 */6 * * * 2026-04-08 failed # Ver estado fn-dag status example # RUN_ID STARTED STATUS STEPS # 01HXZ... 2026-04-08 09:00:01 success 3/3 # 01HXY... 2026-04-07 09:00:00 success 3/3 # Daemon con scheduler fn-dag start ~/dagu/dags/ # [09:00] Scheduler started. Watching 5 DAGs. # [09:00] Triggered: example (schedule match) # ... ``` ## Decisiones de diseno - **Un binario, no un servicio**: `fn-dag run` es fire-and-forget. `fn-dag start` es el unico modo daemon. - **Paralelismo por niveles**: steps en el mismo nivel topologico corren en goroutines, no hay limite de concurrencia (por ahora) - **Compatible con DAGs de Dagu**: lee el mismo formato YAML, no requiere migracion - **Sin web UI por ahora**: la TUI y/o web UI es un issue futuro, el CLI cubre el 80% del uso ## Riesgos - **Riesgo**: DAGs complejos de Dagu usan features que no implementamos (preconditions, params, mail on failure). **Mitigacion**: empezar con el subset que usamos, documentar que no se soporta. - **Riesgo**: Race conditions en el executor paralelo. **Mitigacion**: cada goroutine tiene su propio ProcessHandle, el store usa transacciones SQLite. ## Criterios de aceptacion - [ ] `fn-dag run` ejecuta correctamente los DAGs existentes - [ ] Steps paralelos se ejecutan concurrentemente - [ ] Fallos en un step cancelan dependientes - [ ] Estado se persiste en SQLite - [ ] `fn-dag start` corre como daemon con scheduler - [ ] App registrada en registry.db e indexada - [ ] Publicada en Gitea