fad4006f60
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
159 lines
5.1 KiB
Markdown
159 lines
5.1 KiB
Markdown
---
|
|
id: "0007e"
|
|
title: "DAG executor app: CLI/TUI que reemplaza Dagu"
|
|
status: completado
|
|
type: feature
|
|
domain: []
|
|
scope: app-scoped
|
|
priority: alta
|
|
depends: []
|
|
blocks: []
|
|
related: []
|
|
created: 2026-05-17
|
|
updated: 2026-05-17
|
|
tags: []
|
|
---
|
|
# 0007e — DAG executor app: CLI/TUI que reemplaza Dagu
|
|
|
|
## Metadata
|
|
|
|
| Campo | Valor |
|
|
|-------|-------|
|
|
| **ID** | 0007e |
|
|
| **Estado** | pendiente |
|
|
| **Prioridad** | alta |
|
|
| **Tipo** | feature |
|
|
|
|
## Dependencias
|
|
|
|
| ID | Título | Estado | Requerido |
|
|
|----|--------|--------|-----------|
|
|
| 0007a | Funciones core del DAG engine | pendiente | Si |
|
|
| 0007b | Process manager | pendiente | Si |
|
|
| 0007c | Execution store | pendiente | Si |
|
|
| 0007d | Scheduler | pendiente | Si |
|
|
|
|
**Bloqueada por:** `#0007a, #0007b, #0007c, #0007d`
|
|
|
|
**Desbloquea:** ninguna
|
|
|
|
---
|
|
|
|
## Objetivo
|
|
|
|
App que compone todas las funciones de 0007a-d en un ejecutable unico que reemplaza a Dagu: lee DAGs YAML, los ejecuta con dependencias, persiste estado, y opcionalmente corre como daemon con scheduler.
|
|
|
|
## Contexto
|
|
|
|
- Vive en `apps/dag_engine/` (es una app, no una funcion reutilizable)
|
|
- Lee DAGs del directorio `~/dagu/dags/` (o configurable)
|
|
- El executor es el nucleo: toma un DagDefinition, lanza steps en orden topologico, gestiona paralelismo
|
|
- Modos: `run` (ejecuta un DAG), `start` (daemon con scheduler), `status` (consulta ejecuciones), `list` (lista DAGs)
|
|
|
|
## Arquitectura
|
|
|
|
```
|
|
apps/dag_engine/
|
|
├── app.md — metadata del registry
|
|
├── main.go — CLI: subcomandos run/start/status/list
|
|
├── executor.go — compone dag_topo_sort + process_spawn/wait + store
|
|
├── server.go — (futuro) HTTP API para trigger remoto
|
|
├── go.mod
|
|
├── .gitignore
|
|
```
|
|
|
|
### Patron pure core / impure shell
|
|
|
|
- `core/` — ya creadas en 0007a y 0007d (funciones puras del registry)
|
|
- `infra/` — ya creadas en 0007b y 0007c (funciones impuras del registry)
|
|
- `app/` — `executor.go` compone todo, `main.go` orquesta
|
|
|
|
## Tareas
|
|
|
|
### Fase 1: Executor
|
|
|
|
- [ ] **1.1** `executor.go` — funcion `ExecuteDAG(dag DagDefinition, store DB) DagRun`
|
|
- Crea run en store
|
|
- Resuelve env
|
|
- Ordena steps (topo sort)
|
|
- Ejecuta nivel por nivel: steps del mismo nivel van en paralelo (goroutines)
|
|
- Cada step: spawn → wait → guarda result en store
|
|
- Si un step falla: cancela dependientes, marca run como failed
|
|
- Retorna DagRun con resultado final
|
|
|
|
### Fase 2: CLI
|
|
|
|
- [ ] **2.1** `fn-dag run <path.yaml>` — parsea, valida, ejecuta, muestra resultado
|
|
- [ ] **2.2** `fn-dag list [dir]` — lista DAGs con su schedule y ultimo estado
|
|
- [ ] **2.3** `fn-dag status [dag_name]` — ultimas ejecuciones, detalle de steps
|
|
- [ ] **2.4** `fn-dag start [dir]` — daemon: carga todos los DAGs, arranca ticker
|
|
|
|
### Fase 3: Integracion
|
|
|
|
- [ ] **3.1** `app.md` con uses_functions referenciando todas las funciones de 0007a-d
|
|
- [ ] **3.2** `operations.db` inicializado (fn ops init)
|
|
- [ ] **3.3** Publicar en Gitea (dataforge/dag_engine)
|
|
|
|
### Fase 4: Tests e2e
|
|
|
|
- [ ] **4.1** Ejecutar DAGs existentes de `~/dagu/dags/` y comparar resultado con Dagu
|
|
- [ ] **4.2** Test: DAG con steps paralelos, DAG con fallo en medio, DAG con timeout
|
|
|
|
### Fase 5: Cleanup
|
|
|
|
- [ ] `fn index`
|
|
- [ ] Actualizar CLAUDE.md con documentacion del dag engine
|
|
|
|
---
|
|
|
|
## Ejemplo de uso
|
|
|
|
```bash
|
|
# Ejecutar un DAG
|
|
fn-dag run ~/dagu/dags/example.yaml
|
|
# Step hello... done (0.1s)
|
|
# Step list_files... done (0.2s)
|
|
# Step date... done (0.1s)
|
|
# Run completed: 3/3 steps succeeded (0.4s)
|
|
|
|
# Listar DAGs
|
|
fn-dag list ~/dagu/dags/
|
|
# NAME SCHEDULE LAST RUN STATUS
|
|
# example 0 9 * * * 2026-04-07 success
|
|
# example_lineage_tracking 0 */6 * * * 2026-04-08 failed
|
|
|
|
# Ver estado
|
|
fn-dag status example
|
|
# RUN_ID STARTED STATUS STEPS
|
|
# 01HXZ... 2026-04-08 09:00:01 success 3/3
|
|
# 01HXY... 2026-04-07 09:00:00 success 3/3
|
|
|
|
# Daemon con scheduler
|
|
fn-dag start ~/dagu/dags/
|
|
# [09:00] Scheduler started. Watching 5 DAGs.
|
|
# [09:00] Triggered: example (schedule match)
|
|
# ...
|
|
```
|
|
|
|
## Decisiones de diseno
|
|
|
|
- **Un binario, no un servicio**: `fn-dag run` es fire-and-forget. `fn-dag start` es el unico modo daemon.
|
|
- **Paralelismo por niveles**: steps en el mismo nivel topologico corren en goroutines, no hay limite de concurrencia (por ahora)
|
|
- **Compatible con DAGs de Dagu**: lee el mismo formato YAML, no requiere migracion
|
|
- **Sin web UI por ahora**: la TUI y/o web UI es un issue futuro, el CLI cubre el 80% del uso
|
|
|
|
## Riesgos
|
|
|
|
- **Riesgo**: DAGs complejos de Dagu usan features que no implementamos (preconditions, params, mail on failure). **Mitigacion**: empezar con el subset que usamos, documentar que no se soporta.
|
|
- **Riesgo**: Race conditions en el executor paralelo. **Mitigacion**: cada goroutine tiene su propio ProcessHandle, el store usa transacciones SQLite.
|
|
|
|
## Criterios de aceptacion
|
|
|
|
- [ ] `fn-dag run` ejecuta correctamente los DAGs existentes
|
|
- [ ] Steps paralelos se ejecutan concurrentemente
|
|
- [ ] Fallos en un step cancelan dependientes
|
|
- [ ] Estado se persiste en SQLite
|
|
- [ ] `fn-dag start` corre como daemon con scheduler
|
|
- [ ] App registrada en registry.db e indexada
|
|
- [ ] Publicada en Gitea
|