Files
fn_registry/dev/issues/completed/0007e-dag-executor-app.md

159 lines
5.1 KiB
Markdown

---
id: "0007e"
title: "DAG executor app: CLI/TUI que reemplaza Dagu"
status: completado
type: feature
domain: []
scope: app-scoped
priority: alta
depends: []
blocks: []
related: []
created: 2026-05-17
updated: 2026-05-17
tags: []
---
# 0007e — DAG executor app: CLI/TUI que reemplaza Dagu
## Metadata
| Campo | Valor |
|-------|-------|
| **ID** | 0007e |
| **Estado** | pendiente |
| **Prioridad** | alta |
| **Tipo** | feature |
## Dependencias
| ID | Título | Estado | Requerido |
|----|--------|--------|-----------|
| 0007a | Funciones core del DAG engine | pendiente | Si |
| 0007b | Process manager | pendiente | Si |
| 0007c | Execution store | pendiente | Si |
| 0007d | Scheduler | pendiente | Si |
**Bloqueada por:** `#0007a, #0007b, #0007c, #0007d`
**Desbloquea:** ninguna
---
## Objetivo
App que compone todas las funciones de 0007a-d en un ejecutable unico que reemplaza a Dagu: lee DAGs YAML, los ejecuta con dependencias, persiste estado, y opcionalmente corre como daemon con scheduler.
## Contexto
- Vive en `apps/dag_engine/` (es una app, no una funcion reutilizable)
- Lee DAGs del directorio `~/dagu/dags/` (o configurable)
- El executor es el nucleo: toma un DagDefinition, lanza steps en orden topologico, gestiona paralelismo
- Modos: `run` (ejecuta un DAG), `start` (daemon con scheduler), `status` (consulta ejecuciones), `list` (lista DAGs)
## Arquitectura
```
apps/dag_engine/
├── app.md — metadata del registry
├── main.go — CLI: subcomandos run/start/status/list
├── executor.go — compone dag_topo_sort + process_spawn/wait + store
├── server.go — (futuro) HTTP API para trigger remoto
├── go.mod
├── .gitignore
```
### Patron pure core / impure shell
- `core/` — ya creadas en 0007a y 0007d (funciones puras del registry)
- `infra/` — ya creadas en 0007b y 0007c (funciones impuras del registry)
- `app/``executor.go` compone todo, `main.go` orquesta
## Tareas
### Fase 1: Executor
- [ ] **1.1** `executor.go` — funcion `ExecuteDAG(dag DagDefinition, store DB) DagRun`
- Crea run en store
- Resuelve env
- Ordena steps (topo sort)
- Ejecuta nivel por nivel: steps del mismo nivel van en paralelo (goroutines)
- Cada step: spawn → wait → guarda result en store
- Si un step falla: cancela dependientes, marca run como failed
- Retorna DagRun con resultado final
### Fase 2: CLI
- [ ] **2.1** `fn-dag run <path.yaml>` — parsea, valida, ejecuta, muestra resultado
- [ ] **2.2** `fn-dag list [dir]` — lista DAGs con su schedule y ultimo estado
- [ ] **2.3** `fn-dag status [dag_name]` — ultimas ejecuciones, detalle de steps
- [ ] **2.4** `fn-dag start [dir]` — daemon: carga todos los DAGs, arranca ticker
### Fase 3: Integracion
- [ ] **3.1** `app.md` con uses_functions referenciando todas las funciones de 0007a-d
- [ ] **3.2** `operations.db` inicializado (fn ops init)
- [ ] **3.3** Publicar en Gitea (dataforge/dag_engine)
### Fase 4: Tests e2e
- [ ] **4.1** Ejecutar DAGs existentes de `~/dagu/dags/` y comparar resultado con Dagu
- [ ] **4.2** Test: DAG con steps paralelos, DAG con fallo en medio, DAG con timeout
### Fase 5: Cleanup
- [ ] `fn index`
- [ ] Actualizar CLAUDE.md con documentacion del dag engine
---
## Ejemplo de uso
```bash
# Ejecutar un DAG
fn-dag run ~/dagu/dags/example.yaml
# Step hello... done (0.1s)
# Step list_files... done (0.2s)
# Step date... done (0.1s)
# Run completed: 3/3 steps succeeded (0.4s)
# Listar DAGs
fn-dag list ~/dagu/dags/
# NAME SCHEDULE LAST RUN STATUS
# example 0 9 * * * 2026-04-07 success
# example_lineage_tracking 0 */6 * * * 2026-04-08 failed
# Ver estado
fn-dag status example
# RUN_ID STARTED STATUS STEPS
# 01HXZ... 2026-04-08 09:00:01 success 3/3
# 01HXY... 2026-04-07 09:00:00 success 3/3
# Daemon con scheduler
fn-dag start ~/dagu/dags/
# [09:00] Scheduler started. Watching 5 DAGs.
# [09:00] Triggered: example (schedule match)
# ...
```
## Decisiones de diseno
- **Un binario, no un servicio**: `fn-dag run` es fire-and-forget. `fn-dag start` es el unico modo daemon.
- **Paralelismo por niveles**: steps en el mismo nivel topologico corren en goroutines, no hay limite de concurrencia (por ahora)
- **Compatible con DAGs de Dagu**: lee el mismo formato YAML, no requiere migracion
- **Sin web UI por ahora**: la TUI y/o web UI es un issue futuro, el CLI cubre el 80% del uso
## Riesgos
- **Riesgo**: DAGs complejos de Dagu usan features que no implementamos (preconditions, params, mail on failure). **Mitigacion**: empezar con el subset que usamos, documentar que no se soporta.
- **Riesgo**: Race conditions en el executor paralelo. **Mitigacion**: cada goroutine tiene su propio ProcessHandle, el store usa transacciones SQLite.
## Criterios de aceptacion
- [ ] `fn-dag run` ejecuta correctamente los DAGs existentes
- [ ] Steps paralelos se ejecutan concurrentemente
- [ ] Fallos en un step cancelan dependientes
- [ ] Estado se persiste en SQLite
- [ ] `fn-dag start` corre como daemon con scheduler
- [ ] App registrada en registry.db e indexada
- [ ] Publicada en Gitea