From d9b448a07bec3f7bedb252af32f7b6cd62027bd2 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Wed, 8 Apr 2026 01:18:19 +0200 Subject: [PATCH] feat: add DAG engine issues (0007a-e) and feature flag Desglose del sistema de orquestacion propio para reemplazar Dagu: - 0007a: core puro (parse, validate, topo sort) - 0007b: process manager (spawn, wait, kill) - 0007c: execution store (SQLite) - 0007d: scheduler (cron parser, ticker) - 0007e: app CLI que compone todo Co-Authored-By: Claude Opus 4.6 (1M context) --- dev/feature_flags.json | 7 ++ dev/issues/0007a-dag-core.md | 137 +++++++++++++++++++++++++ dev/issues/0007b-process-manager.md | 115 +++++++++++++++++++++ dev/issues/0007c-execution-store.md | 115 +++++++++++++++++++++ dev/issues/0007d-scheduler.md | 109 ++++++++++++++++++++ dev/issues/0007e-dag-executor-app.md | 143 +++++++++++++++++++++++++++ dev/issues/README.md | 15 +++ 7 files changed, 641 insertions(+) create mode 100644 dev/feature_flags.json create mode 100644 dev/issues/0007a-dag-core.md create mode 100644 dev/issues/0007b-process-manager.md create mode 100644 dev/issues/0007c-execution-store.md create mode 100644 dev/issues/0007d-scheduler.md create mode 100644 dev/issues/0007e-dag-executor-app.md create mode 100644 dev/issues/README.md diff --git a/dev/feature_flags.json b/dev/feature_flags.json new file mode 100644 index 00000000..376873b6 --- /dev/null +++ b/dev/feature_flags.json @@ -0,0 +1,7 @@ +{ + "dag-engine": { + "enabled": false, + "issue": "0007", + "description": "Sistema propio de orquestacion de DAGs para reemplazar Dagu. Incluye parser YAML, executor con paralelismo, process manager, execution store y scheduler cron." + } +} diff --git a/dev/issues/0007a-dag-core.md b/dev/issues/0007a-dag-core.md new file mode 100644 index 00000000..cc2981f1 --- /dev/null +++ b/dev/issues/0007a-dag-core.md @@ -0,0 +1,137 @@ +# 0007a — Funciones core del DAG engine + +## Metadata + +| Campo | Valor | +|-------|-------| +| **ID** | 0007a | +| **Estado** | pendiente | +| **Prioridad** | alta | +| **Tipo** | feature | + +## Dependencias + +| ID | Título | Estado | Requerido | +|----|--------|--------|-----------| +| — | Ninguna | — | — | + +**Bloqueada por:** ninguna + +**Desbloquea:** `#0007b, #0007c, #0007d, #0007e` + +--- + +## Objetivo + +Crear las funciones puras que parsean, validan y ordenan DAGs definidos en YAML. Estas funciones son el nucleo del sistema de orquestacion — todo lo demas depende de ellas. + +## Contexto + +- Dagu usa YAML con `steps`, `depends`, `env`, `schedule` — queremos compatibilidad con ese formato +- Las funciones deben ser puras: reciben datos, retornan datos, sin I/O +- Deben vivir en `functions/core/` (Go) para maxima composabilidad +- El formato YAML de Dagu existente en `~/dagu/dags/` debe poder parsearse sin cambios + +## Arquitectura + +``` +functions/core/ +├── dag_parse.go — NEW: YAML → DagDefinition +├── dag_parse.md — NEW: metadata +├── dag_validate.go — NEW: valida ciclos, refs rotas, campos requeridos +├── dag_validate.md — NEW: metadata +├── dag_topo_sort.go — NEW: ordena steps por dependencias (Kahn's algorithm) +├── dag_topo_sort.md — NEW: metadata +├── dag_resolve_env.go — NEW: sustituye variables ${VAR} en steps +├── dag_resolve_env.md — NEW: metadata + +types/core/ +├── dag_definition.md — NEW: tipo DagDefinition (product) +├── dag_step.md — NEW: tipo DagStep (product) +├── dag_schedule.md — NEW: tipo DagSchedule (product) +├── dag_result.md — NEW: tipo DagValidationResult (product) +``` + +### Patron pure core / impure shell + +- `core/` — Todas las funciones de este issue son puras +- No hay shell/impure en este issue +- Los tipos usan nativos de Go en firmas, tipos del registry en `uses_types` + +## Tareas + +### Fase 1: Tipos + +- [ ] **1.1** Definir `DagStep` — name, command, args, depends, env, timeout, retry, tags +- [ ] **1.2** Definir `DagSchedule` — cron expressions, timezone +- [ ] **1.3** Definir `DagDefinition` — name, description, steps, env, schedule, tags +- [ ] **1.4** Definir `DagValidationResult` — errors, warnings, step_order + +### Fase 2: Parser + +- [ ] **2.1** `dag_parse` — YAML bytes → DagDefinition. Soportar formato Dagu: steps con command/depends/env +- [ ] **2.2** Tests: parsear DAGs existentes de `~/dagu/dags/`, edge cases (YAML invalido, campos faltantes) + +### Fase 3: Validacion + +- [ ] **3.1** `dag_validate` — detectar ciclos (DFS), referencias rotas en depends, steps sin nombre, nombres duplicados +- [ ] **3.2** Tests: grafos ciclicos, DAGs validos, depends a steps inexistentes + +### Fase 4: Topological sort + +- [ ] **4.1** `dag_topo_sort` — Kahn's algorithm, retorna steps en orden de ejecucion con niveles de paralelismo +- [ ] **4.2** Tests: DAGs lineales, DAGs con ramas paralelas, diamond dependencies + +### Fase 5: Resolucion de env + +- [ ] **5.1** `dag_resolve_env` — sustituye `${VAR}` y `$VAR` en command/args de cada step usando env del DAG + env del step +- [ ] **5.2** Tests: variables anidadas, variables no definidas, escaping + +### Fase 6: Cleanup + +- [ ] `fn index` y verificar todos los IDs +- [ ] Verificar que todos los tipos son referenciados correctamente en uses_types + +--- + +## Ejemplo de uso + +```go +// Parsear un DAG +data, _ := os.ReadFile("dags/my_pipeline.yaml") +dag, err := dag_parse(data) + +// Validar +result := dag_validate(dag) +if len(result.Errors) > 0 { + // ciclos, refs rotas... +} + +// Ordenar +ordered := dag_topo_sort(dag.Steps) +// ordered = [[step_a], [step_b, step_c], [step_d]] +// nivel 0 nivel 1 (paralelo) nivel 2 + +// Resolver env +resolved := dag_resolve_env(dag, os.Environ()) +``` + +## Decisiones de diseno + +- **Kahn's algorithm sobre DFS topo sort**: Kahn's da niveles de paralelismo gratis — steps en el mismo nivel pueden ejecutarse en paralelo +- **Formato Dagu compatible**: no inventar formato nuevo, reutilizar el YAML que ya existe +- **Tipos nativos en firma**: `[]byte` entrada, structs con campos basicos, sin dependencias externas para parsear + +## Criterios de aceptacion + +- [ ] `dag_parse` parsea correctamente los DAGs existentes en `~/dagu/dags/` +- [ ] `dag_validate` detecta ciclos y referencias rotas +- [ ] `dag_topo_sort` retorna orden correcto con niveles de paralelismo +- [ ] Todas las funciones son puras (sin I/O, sin estado) +- [ ] Tests pasan con `go test -tags fts5 ./...` +- [ ] Indexado en registry.db + +## Referencias + +- Dagu YAML spec: `~/dagu/dags/example.yaml` +- Kahn's algorithm: topological sort con BFS que da niveles diff --git a/dev/issues/0007b-process-manager.md b/dev/issues/0007b-process-manager.md new file mode 100644 index 00000000..18a25028 --- /dev/null +++ b/dev/issues/0007b-process-manager.md @@ -0,0 +1,115 @@ +# 0007b — Process manager: spawn, wait, kill, status + +## Metadata + +| Campo | Valor | +|-------|-------| +| **ID** | 0007b | +| **Estado** | pendiente | +| **Prioridad** | alta | +| **Tipo** | feature | + +## Dependencias + +| ID | Título | Estado | Requerido | +|----|--------|--------|-----------| +| 0007a | Funciones core del DAG engine | pendiente | Si | + +**Bloqueada por:** `#0007a` + +**Desbloquea:** `#0007e` + +--- + +## Objetivo + +Funciones impuras para gestionar procesos hijo: lanzar, esperar, matar, consultar estado. Son los bloques que el executor usara para correr cada step de un DAG. + +## Contexto + +- Cada step de un DAG se ejecuta como un proceso hijo (`os/exec`) +- Necesitamos captura de stdout/stderr, timeout, señales (SIGTERM/SIGKILL) +- Deben ser funciones atomicas — el executor las compone +- Dominio `infra` porque gestionan recursos del sistema + +## Arquitectura + +``` +functions/infra/ +├── process_spawn.go — NEW: lanza proceso, retorna PID + pipes +├── process_spawn.md +├── process_wait.go — NEW: espera proceso con timeout +├── process_wait.md +├── process_kill.go — NEW: envia señal a proceso (SIGTERM, SIGKILL) +├── process_kill.md +├── process_status.go — NEW: consulta estado de PID (running, exited, code) +├── process_status.md + +types/infra/ +├── process_handle.md — NEW: PID, stdin/stdout/stderr pipes, start_time +├── process_result.md — NEW: exit_code, stdout, stderr, duration_ms +``` + +### Patron pure core / impure shell + +- `core/` — No aplica en este issue +- `infra/` — Todas impuras (spawn procesos, I/O con OS) +- `error_type`: `error_go_core` para todas + +## Tareas + +### Fase 1: Tipos + +- [ ] **1.1** Definir `ProcessHandle` — pid, cmd, start_time, working_dir +- [ ] **1.2** Definir `ProcessResult` — exit_code, stdout, stderr, duration_ms, killed + +### Fase 2: Funciones + +- [ ] **2.1** `process_spawn` — ejecuta comando con args, env, working_dir. Retorna ProcessHandle. No bloquea. +- [ ] **2.2** `process_wait` — espera a que el proceso termine o timeout. Retorna ProcessResult. +- [ ] **2.3** `process_kill` — envia SIGTERM, espera grace period, luego SIGKILL si sigue vivo +- [ ] **2.4** `process_status` — consulta si el PID sigue corriendo, retorna estado + +### Fase 3: Tests + +- [ ] **3.1** Tests: spawn+wait de `echo hello`, timeout con `sleep 999`, kill de proceso largo +- [ ] **3.2** Tests: captura correcta de stdout/stderr, exit codes no-zero + +### Fase 4: Cleanup + +- [ ] `fn index` y verificar IDs +- [ ] Verificar error_type en todas las funciones impuras + +--- + +## Ejemplo de uso + +```go +handle, err := process_spawn(ProcessSpawnInput{ + Command: "python3", + Args: []string{"script.py", "--flag"}, + Env: []string{"API_KEY=xxx"}, + WorkingDir: "/home/lucas/project", +}) + +result, err := process_wait(handle, 30*time.Second) // timeout 30s +// result.ExitCode == 0, result.Stdout == "output..." + +// O matar si tarda demasiado +process_kill(handle, 5*time.Second) // SIGTERM, 5s grace, luego SIGKILL +``` + +## Decisiones de diseno + +- **Spawn no bloquea**: retorna handle inmediatamente, wait es separado — permite al executor lanzar steps en paralelo +- **Kill con grace period**: SIGTERM primero, espera, SIGKILL si no murio — comportamiento estandar de process managers +- **Stdout/stderr como strings**: para steps cortos. Para steps con output grande, futuro: streaming a archivo + +## Criterios de aceptacion + +- [ ] Spawn y wait funcionan con comandos reales +- [ ] Timeout mata el proceso correctamente +- [ ] Kill con grace period funciona +- [ ] Exit codes se capturan correctamente +- [ ] Tests pasan +- [ ] Indexado en registry.db diff --git a/dev/issues/0007c-execution-store.md b/dev/issues/0007c-execution-store.md new file mode 100644 index 00000000..66a7fb0e --- /dev/null +++ b/dev/issues/0007c-execution-store.md @@ -0,0 +1,115 @@ +# 0007c — Execution store: persistencia de estado + +## Metadata + +| Campo | Valor | +|-------|-------| +| **ID** | 0007c | +| **Estado** | pendiente | +| **Prioridad** | alta | +| **Tipo** | feature | + +## Dependencias + +| ID | Título | Estado | Requerido | +|----|--------|--------|-----------| +| 0007a | Funciones core del DAG engine | pendiente | Si | + +**Bloqueada por:** `#0007a` + +**Desbloquea:** `#0007e` + +--- + +## Objetivo + +Funciones para persistir el estado de ejecuciones de DAGs en SQLite: que DAG se ejecuto, cuando, que steps corrieron, resultado de cada step, logs. Permite historial, reintentos y debugging. + +## Contexto + +- Cada ejecucion de un DAG genera un `run` con multiples `step_results` +- Similar a `operations.db` pero especifico para el DAG engine +- La BD vive en el directorio de la app del scheduler (no en raiz) +- Debe soportar consultas tipo: "ultimas 10 ejecuciones de X", "steps fallidos" + +## Arquitectura + +``` +functions/infra/ +├── dag_store_init.go — NEW: crea schema SQLite para runs/steps +├── dag_store_init.md +├── dag_store_run.go — NEW: CRUD de runs (create, update status, list, get) +├── dag_store_run.md +├── dag_store_step.go — NEW: CRUD de step results dentro de un run +├── dag_store_step.md + +types/infra/ +├── dag_run.md — NEW: id, dag_name, status, started_at, finished_at, trigger +├── dag_step_result.md — NEW: run_id, step_name, status, exit_code, stdout, stderr, duration_ms +``` + +### Patron pure core / impure shell + +- `infra/` — Todas impuras (SQLite I/O) +- Schema sencillo: `dag_runs` + `dag_step_results` + +## Tareas + +### Fase 1: Tipos y schema + +- [ ] **1.1** Definir `DagRun` — id (ULID), dag_name, dag_path, status (pending/running/success/failed/cancelled), started_at, finished_at, trigger (manual/schedule/api) +- [ ] **1.2** Definir `DagStepResult` — id, run_id, step_name, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms +- [ ] **1.3** Schema SQLite: `dag_runs`, `dag_step_results` con indices + +### Fase 2: Funciones + +- [ ] **2.1** `dag_store_init` — crea/migra la BD SQLite +- [ ] **2.2** `dag_store_run` — create_run, update_run_status, get_run, list_runs (con filtros) +- [ ] **2.3** `dag_store_step` — insert_step_result, list_steps_for_run + +### Fase 3: Tests + +- [ ] **3.1** Ciclo completo: init → create run → insert steps → update status → query +- [ ] **3.2** Queries: ultimas N ejecuciones, ejecuciones fallidas, steps de un run + +### Fase 4: Cleanup + +- [ ] `fn index` y verificar IDs + +--- + +## Ejemplo de uso + +```go +db := dag_store_init("/path/to/scheduler.db") + +run := dag_store_create_run(db, "my_pipeline", "manual") +// run.ID = "01HXZ..." + +dag_store_insert_step(db, run.ID, DagStepResult{ + StepName: "fetch_data", + Status: "success", + ExitCode: 0, + Stdout: "fetched 1000 rows", + DurationMs: 1234, +}) + +dag_store_update_status(db, run.ID, "success") + +// Consultar +runs := dag_store_list_runs(db, "my_pipeline", 10) // ultimas 10 +``` + +## Decisiones de diseno + +- **SQLite por DAG engine, no por DAG**: una sola BD para todas las ejecuciones, no una por cada DAG +- **ULID para run IDs**: ordenables por tiempo, unicos sin coordinacion +- **stdout/stderr en BD**: para steps cortos. Para output grande, guardar path a archivo de log + +## Criterios de aceptacion + +- [ ] Schema se crea correctamente +- [ ] CRUD completo funciona +- [ ] Queries con filtros funcionan +- [ ] Tests pasan +- [ ] Indexado en registry.db diff --git a/dev/issues/0007d-scheduler.md b/dev/issues/0007d-scheduler.md new file mode 100644 index 00000000..4025cf87 --- /dev/null +++ b/dev/issues/0007d-scheduler.md @@ -0,0 +1,109 @@ +# 0007d — Scheduler: cron parser y ticker + +## Metadata + +| Campo | Valor | +|-------|-------| +| **ID** | 0007d | +| **Estado** | pendiente | +| **Prioridad** | media | +| **Tipo** | feature | + +## Dependencias + +| ID | Título | Estado | Requerido | +|----|--------|--------|-----------| +| 0007a | Funciones core del DAG engine | pendiente | Si | +| 0007c | Execution store | pendiente | Si | + +**Bloqueada por:** `#0007a, #0007c` + +**Desbloquea:** `#0007e` + +--- + +## Objetivo + +Funciones para parsear expresiones cron, calcular proximas ejecuciones, y un ticker que dispara DAGs segun su schedule. Es lo que reemplaza el scheduler de Dagu. + +## Contexto + +- Las expresiones cron de Dagu son estandar (5 campos: min hour dom mon dow) +- El ticker es un loop infinito que cada minuto evalua que DAGs deben lanzarse +- Funciones puras para parseo y calculo, impura solo el ticker + +## Arquitectura + +``` +functions/core/ +├── cron_parse.go — NEW: string → CronExpression +├── cron_parse.md +├── cron_next.go — NEW: CronExpression + time → proxima ejecucion +├── cron_next.md +├── cron_match.go — NEW: CronExpression + time → bool (coincide?) +├── cron_match.md + +functions/infra/ +├── dag_ticker.go — NEW: loop que evalua schedules y lanza DAGs +├── dag_ticker.md + +types/core/ +├── cron_expression.md — NEW: minute, hour, dom, month, dow (cada uno []int o wildcard) +``` + +### Patron pure core / impure shell + +- `core/` — `cron_parse`, `cron_next`, `cron_match` son puras +- `infra/` — `dag_ticker` es impuro (time.Sleep, lanza ejecuciones) + +## Tareas + +### Fase 1: Tipos + +- [ ] **1.1** Definir `CronExpression` — campos parseados con soporte para *, ranges (1-5), lists (1,3,5), intervals (*/5) + +### Fase 2: Funciones puras + +- [ ] **2.1** `cron_parse` — "0 9 * * *" → CronExpression. Soportar: *, N, N-M, N/M, listas +- [ ] **2.2** `cron_next` — dada una CronExpression y un time.Time, retorna el proximo time.Time que coincide +- [ ] **2.3** `cron_match` — dada una CronExpression y un time.Time, retorna true si coincide (para el ticker) +- [ ] **2.4** Tests exhaustivos: wildcards, ranges, listas, intervalos, edge cases (fin de mes, febrero) + +### Fase 3: Ticker + +- [ ] **3.1** `dag_ticker` — recibe lista de (DagDefinition, path), cada minuto evalua cron_match para cada uno, lanza los que coinciden +- [ ] **3.2** Soporte para cancelacion (context.Context) y graceful shutdown + +### Fase 4: Cleanup + +- [ ] `fn index` y verificar IDs + +--- + +## Ejemplo de uso + +```go +// Puro +expr, _ := cron_parse("*/5 9-17 * * 1-5") // cada 5 min, 9-17h, lun-vie +next := cron_next(expr, time.Now()) // proxima ejecucion +matches := cron_match(expr, time.Now()) // true si ahora coincide + +// Impuro (el ticker) +ctx, cancel := context.WithCancel(context.Background()) +dag_ticker(ctx, dags, executor) // loop infinito hasta cancel +``` + +## Decisiones de diseno + +- **No usar libreria cron externa**: las expresiones son simples, implementar desde cero es ~100 lineas y evita dependencias +- **Separar parse/next/match**: parse es costoso, match es barato — parsear una vez, match cada minuto +- **Ticker como funcion, no como goroutine**: el caller decide como lanzarlo + +## Criterios de aceptacion + +- [ ] Parsea todas las expresiones cron de los DAGs existentes en `~/dagu/dags/` +- [ ] `cron_next` calcula correctamente la proxima ejecucion +- [ ] `cron_match` coincide correctamente para el minuto actual +- [ ] Ticker lanza DAGs en el momento correcto +- [ ] Tests pasan +- [ ] Indexado en registry.db diff --git a/dev/issues/0007e-dag-executor-app.md b/dev/issues/0007e-dag-executor-app.md new file mode 100644 index 00000000..e01db05a --- /dev/null +++ b/dev/issues/0007e-dag-executor-app.md @@ -0,0 +1,143 @@ +# 0007e — DAG executor app: CLI/TUI que reemplaza Dagu + +## Metadata + +| Campo | Valor | +|-------|-------| +| **ID** | 0007e | +| **Estado** | pendiente | +| **Prioridad** | alta | +| **Tipo** | feature | + +## Dependencias + +| ID | Título | Estado | Requerido | +|----|--------|--------|-----------| +| 0007a | Funciones core del DAG engine | pendiente | Si | +| 0007b | Process manager | pendiente | Si | +| 0007c | Execution store | pendiente | Si | +| 0007d | Scheduler | pendiente | Si | + +**Bloqueada por:** `#0007a, #0007b, #0007c, #0007d` + +**Desbloquea:** ninguna + +--- + +## Objetivo + +App que compone todas las funciones de 0007a-d en un ejecutable unico que reemplaza a Dagu: lee DAGs YAML, los ejecuta con dependencias, persiste estado, y opcionalmente corre como daemon con scheduler. + +## Contexto + +- Vive en `apps/dag_engine/` (es una app, no una funcion reutilizable) +- Lee DAGs del directorio `~/dagu/dags/` (o configurable) +- El executor es el nucleo: toma un DagDefinition, lanza steps en orden topologico, gestiona paralelismo +- Modos: `run` (ejecuta un DAG), `start` (daemon con scheduler), `status` (consulta ejecuciones), `list` (lista DAGs) + +## Arquitectura + +``` +apps/dag_engine/ +├── app.md — metadata del registry +├── main.go — CLI: subcomandos run/start/status/list +├── executor.go — compone dag_topo_sort + process_spawn/wait + store +├── server.go — (futuro) HTTP API para trigger remoto +├── go.mod +├── .gitignore +``` + +### Patron pure core / impure shell + +- `core/` — ya creadas en 0007a y 0007d (funciones puras del registry) +- `infra/` — ya creadas en 0007b y 0007c (funciones impuras del registry) +- `app/` — `executor.go` compone todo, `main.go` orquesta + +## Tareas + +### Fase 1: Executor + +- [ ] **1.1** `executor.go` — funcion `ExecuteDAG(dag DagDefinition, store DB) DagRun` + - Crea run en store + - Resuelve env + - Ordena steps (topo sort) + - Ejecuta nivel por nivel: steps del mismo nivel van en paralelo (goroutines) + - Cada step: spawn → wait → guarda result en store + - Si un step falla: cancela dependientes, marca run como failed + - Retorna DagRun con resultado final + +### Fase 2: CLI + +- [ ] **2.1** `fn-dag run ` — parsea, valida, ejecuta, muestra resultado +- [ ] **2.2** `fn-dag list [dir]` — lista DAGs con su schedule y ultimo estado +- [ ] **2.3** `fn-dag status [dag_name]` — ultimas ejecuciones, detalle de steps +- [ ] **2.4** `fn-dag start [dir]` — daemon: carga todos los DAGs, arranca ticker + +### Fase 3: Integracion + +- [ ] **3.1** `app.md` con uses_functions referenciando todas las funciones de 0007a-d +- [ ] **3.2** `operations.db` inicializado (fn ops init) +- [ ] **3.3** Publicar en Gitea (dataforge/dag_engine) + +### Fase 4: Tests e2e + +- [ ] **4.1** Ejecutar DAGs existentes de `~/dagu/dags/` y comparar resultado con Dagu +- [ ] **4.2** Test: DAG con steps paralelos, DAG con fallo en medio, DAG con timeout + +### Fase 5: Cleanup + +- [ ] `fn index` +- [ ] Actualizar CLAUDE.md con documentacion del dag engine + +--- + +## Ejemplo de uso + +```bash +# Ejecutar un DAG +fn-dag run ~/dagu/dags/example.yaml +# Step hello... done (0.1s) +# Step list_files... done (0.2s) +# Step date... done (0.1s) +# Run completed: 3/3 steps succeeded (0.4s) + +# Listar DAGs +fn-dag list ~/dagu/dags/ +# NAME SCHEDULE LAST RUN STATUS +# example 0 9 * * * 2026-04-07 success +# example_lineage_tracking 0 */6 * * * 2026-04-08 failed + +# Ver estado +fn-dag status example +# RUN_ID STARTED STATUS STEPS +# 01HXZ... 2026-04-08 09:00:01 success 3/3 +# 01HXY... 2026-04-07 09:00:00 success 3/3 + +# Daemon con scheduler +fn-dag start ~/dagu/dags/ +# [09:00] Scheduler started. Watching 5 DAGs. +# [09:00] Triggered: example (schedule match) +# ... +``` + +## Decisiones de diseno + +- **Un binario, no un servicio**: `fn-dag run` es fire-and-forget. `fn-dag start` es el unico modo daemon. +- **Paralelismo por niveles**: steps en el mismo nivel topologico corren en goroutines, no hay limite de concurrencia (por ahora) +- **Compatible con DAGs de Dagu**: lee el mismo formato YAML, no requiere migracion +- **Sin web UI por ahora**: la TUI y/o web UI es un issue futuro, el CLI cubre el 80% del uso + +## Riesgos + +- **Riesgo**: DAGs complejos de Dagu usan features que no implementamos (preconditions, params, mail on failure). **Mitigacion**: empezar con el subset que usamos, documentar que no se soporta. +- **Riesgo**: Race conditions en el executor paralelo. **Mitigacion**: cada goroutine tiene su propio ProcessHandle, el store usa transacciones SQLite. + +## Criterios de aceptacion + +- [ ] `fn-dag run` ejecuta correctamente los DAGs existentes +- [ ] Steps paralelos se ejecutan concurrentemente +- [ ] Fallos en un step cancelan dependientes +- [ ] Estado se persiste en SQLite +- [ ] `fn-dag start` corre como daemon con scheduler +- [ ] App registrada en registry.db e indexada +- [ ] Publicada en Gitea diff --git a/dev/issues/README.md b/dev/issues/README.md new file mode 100644 index 00000000..faecec9d --- /dev/null +++ b/dev/issues/README.md @@ -0,0 +1,15 @@ +# Issues + +| ID | Título | Estado | Prioridad | Tipo | Bloquea | +|----|--------|--------|-----------|------|---------| +| 0001 | Jupyter create notebook | completado | — | feature | — | +| 0002 | Jupyter discover root dir | completado | — | bugfix | — | +| 0003 | Jupyter tools documentation | completado | — | docs | — | +| 0004 | Jupyter discover multiple instances | completado | — | feature | — | +| 0005 | Jupyter write batch | completado | — | feature | — | +| 0006 | Jupyter exec outputs keyerror | completado | — | bugfix | — | +| **0007a** | **DAG engine: core (parse, validate, topo sort)** | pendiente | alta | feature | 0007b-e | +| **0007b** | **DAG engine: process manager (spawn, wait, kill)** | pendiente | alta | feature | 0007e | +| **0007c** | **DAG engine: execution store (SQLite)** | pendiente | alta | feature | 0007e | +| **0007d** | **DAG engine: scheduler (cron parser, ticker)** | pendiente | media | feature | 0007e | +| **0007e** | **DAG engine: app CLI que reemplaza Dagu** | pendiente | alta | feature | — |