fix(fn-run): propagar stdout/stderr de bash functions library-style #1

Open
dataforge wants to merge 537 commits from auto/0077-fn-run-bash-mudo into master
7 changed files with 641 additions and 0 deletions
Showing only changes of commit d9b448a07b - Show all commits
+7
View File
@@ -0,0 +1,7 @@
{
"dag-engine": {
"enabled": false,
"issue": "0007",
"description": "Sistema propio de orquestacion de DAGs para reemplazar Dagu. Incluye parser YAML, executor con paralelismo, process manager, execution store y scheduler cron."
}
}
+137
View File
@@ -0,0 +1,137 @@
# 0007a — Funciones core del DAG engine
## Metadata
| Campo | Valor |
|-------|-------|
| **ID** | 0007a |
| **Estado** | pendiente |
| **Prioridad** | alta |
| **Tipo** | feature |
## Dependencias
| ID | Título | Estado | Requerido |
|----|--------|--------|-----------|
| — | Ninguna | — | — |
**Bloqueada por:** ninguna
**Desbloquea:** `#0007b, #0007c, #0007d, #0007e`
---
## Objetivo
Crear las funciones puras que parsean, validan y ordenan DAGs definidos en YAML. Estas funciones son el nucleo del sistema de orquestacion — todo lo demas depende de ellas.
## Contexto
- Dagu usa YAML con `steps`, `depends`, `env`, `schedule` — queremos compatibilidad con ese formato
- Las funciones deben ser puras: reciben datos, retornan datos, sin I/O
- Deben vivir en `functions/core/` (Go) para maxima composabilidad
- El formato YAML de Dagu existente en `~/dagu/dags/` debe poder parsearse sin cambios
## Arquitectura
```
functions/core/
├── dag_parse.go — NEW: YAML → DagDefinition
├── dag_parse.md — NEW: metadata
├── dag_validate.go — NEW: valida ciclos, refs rotas, campos requeridos
├── dag_validate.md — NEW: metadata
├── dag_topo_sort.go — NEW: ordena steps por dependencias (Kahn's algorithm)
├── dag_topo_sort.md — NEW: metadata
├── dag_resolve_env.go — NEW: sustituye variables ${VAR} en steps
├── dag_resolve_env.md — NEW: metadata
types/core/
├── dag_definition.md — NEW: tipo DagDefinition (product)
├── dag_step.md — NEW: tipo DagStep (product)
├── dag_schedule.md — NEW: tipo DagSchedule (product)
├── dag_result.md — NEW: tipo DagValidationResult (product)
```
### Patron pure core / impure shell
- `core/` — Todas las funciones de este issue son puras
- No hay shell/impure en este issue
- Los tipos usan nativos de Go en firmas, tipos del registry en `uses_types`
## Tareas
### Fase 1: Tipos
- [ ] **1.1** Definir `DagStep` — name, command, args, depends, env, timeout, retry, tags
- [ ] **1.2** Definir `DagSchedule` — cron expressions, timezone
- [ ] **1.3** Definir `DagDefinition` — name, description, steps, env, schedule, tags
- [ ] **1.4** Definir `DagValidationResult` — errors, warnings, step_order
### Fase 2: Parser
- [ ] **2.1** `dag_parse` — YAML bytes → DagDefinition. Soportar formato Dagu: steps con command/depends/env
- [ ] **2.2** Tests: parsear DAGs existentes de `~/dagu/dags/`, edge cases (YAML invalido, campos faltantes)
### Fase 3: Validacion
- [ ] **3.1** `dag_validate` — detectar ciclos (DFS), referencias rotas en depends, steps sin nombre, nombres duplicados
- [ ] **3.2** Tests: grafos ciclicos, DAGs validos, depends a steps inexistentes
### Fase 4: Topological sort
- [ ] **4.1** `dag_topo_sort` — Kahn's algorithm, retorna steps en orden de ejecucion con niveles de paralelismo
- [ ] **4.2** Tests: DAGs lineales, DAGs con ramas paralelas, diamond dependencies
### Fase 5: Resolucion de env
- [ ] **5.1** `dag_resolve_env` — sustituye `${VAR}` y `$VAR` en command/args de cada step usando env del DAG + env del step
- [ ] **5.2** Tests: variables anidadas, variables no definidas, escaping
### Fase 6: Cleanup
- [ ] `fn index` y verificar todos los IDs
- [ ] Verificar que todos los tipos son referenciados correctamente en uses_types
---
## Ejemplo de uso
```go
// Parsear un DAG
data, _ := os.ReadFile("dags/my_pipeline.yaml")
dag, err := dag_parse(data)
// Validar
result := dag_validate(dag)
if len(result.Errors) > 0 {
// ciclos, refs rotas...
}
// Ordenar
ordered := dag_topo_sort(dag.Steps)
// ordered = [[step_a], [step_b, step_c], [step_d]]
// nivel 0 nivel 1 (paralelo) nivel 2
// Resolver env
resolved := dag_resolve_env(dag, os.Environ())
```
## Decisiones de diseno
- **Kahn's algorithm sobre DFS topo sort**: Kahn's da niveles de paralelismo gratis — steps en el mismo nivel pueden ejecutarse en paralelo
- **Formato Dagu compatible**: no inventar formato nuevo, reutilizar el YAML que ya existe
- **Tipos nativos en firma**: `[]byte` entrada, structs con campos basicos, sin dependencias externas para parsear
## Criterios de aceptacion
- [ ] `dag_parse` parsea correctamente los DAGs existentes en `~/dagu/dags/`
- [ ] `dag_validate` detecta ciclos y referencias rotas
- [ ] `dag_topo_sort` retorna orden correcto con niveles de paralelismo
- [ ] Todas las funciones son puras (sin I/O, sin estado)
- [ ] Tests pasan con `go test -tags fts5 ./...`
- [ ] Indexado en registry.db
## Referencias
- Dagu YAML spec: `~/dagu/dags/example.yaml`
- Kahn's algorithm: topological sort con BFS que da niveles
+115
View File
@@ -0,0 +1,115 @@
# 0007b — Process manager: spawn, wait, kill, status
## Metadata
| Campo | Valor |
|-------|-------|
| **ID** | 0007b |
| **Estado** | pendiente |
| **Prioridad** | alta |
| **Tipo** | feature |
## Dependencias
| ID | Título | Estado | Requerido |
|----|--------|--------|-----------|
| 0007a | Funciones core del DAG engine | pendiente | Si |
**Bloqueada por:** `#0007a`
**Desbloquea:** `#0007e`
---
## Objetivo
Funciones impuras para gestionar procesos hijo: lanzar, esperar, matar, consultar estado. Son los bloques que el executor usara para correr cada step de un DAG.
## Contexto
- Cada step de un DAG se ejecuta como un proceso hijo (`os/exec`)
- Necesitamos captura de stdout/stderr, timeout, señales (SIGTERM/SIGKILL)
- Deben ser funciones atomicas — el executor las compone
- Dominio `infra` porque gestionan recursos del sistema
## Arquitectura
```
functions/infra/
├── process_spawn.go — NEW: lanza proceso, retorna PID + pipes
├── process_spawn.md
├── process_wait.go — NEW: espera proceso con timeout
├── process_wait.md
├── process_kill.go — NEW: envia señal a proceso (SIGTERM, SIGKILL)
├── process_kill.md
├── process_status.go — NEW: consulta estado de PID (running, exited, code)
├── process_status.md
types/infra/
├── process_handle.md — NEW: PID, stdin/stdout/stderr pipes, start_time
├── process_result.md — NEW: exit_code, stdout, stderr, duration_ms
```
### Patron pure core / impure shell
- `core/` — No aplica en este issue
- `infra/` — Todas impuras (spawn procesos, I/O con OS)
- `error_type`: `error_go_core` para todas
## Tareas
### Fase 1: Tipos
- [ ] **1.1** Definir `ProcessHandle` — pid, cmd, start_time, working_dir
- [ ] **1.2** Definir `ProcessResult` — exit_code, stdout, stderr, duration_ms, killed
### Fase 2: Funciones
- [ ] **2.1** `process_spawn` — ejecuta comando con args, env, working_dir. Retorna ProcessHandle. No bloquea.
- [ ] **2.2** `process_wait` — espera a que el proceso termine o timeout. Retorna ProcessResult.
- [ ] **2.3** `process_kill` — envia SIGTERM, espera grace period, luego SIGKILL si sigue vivo
- [ ] **2.4** `process_status` — consulta si el PID sigue corriendo, retorna estado
### Fase 3: Tests
- [ ] **3.1** Tests: spawn+wait de `echo hello`, timeout con `sleep 999`, kill de proceso largo
- [ ] **3.2** Tests: captura correcta de stdout/stderr, exit codes no-zero
### Fase 4: Cleanup
- [ ] `fn index` y verificar IDs
- [ ] Verificar error_type en todas las funciones impuras
---
## Ejemplo de uso
```go
handle, err := process_spawn(ProcessSpawnInput{
Command: "python3",
Args: []string{"script.py", "--flag"},
Env: []string{"API_KEY=xxx"},
WorkingDir: "/home/lucas/project",
})
result, err := process_wait(handle, 30*time.Second) // timeout 30s
// result.ExitCode == 0, result.Stdout == "output..."
// O matar si tarda demasiado
process_kill(handle, 5*time.Second) // SIGTERM, 5s grace, luego SIGKILL
```
## Decisiones de diseno
- **Spawn no bloquea**: retorna handle inmediatamente, wait es separado — permite al executor lanzar steps en paralelo
- **Kill con grace period**: SIGTERM primero, espera, SIGKILL si no murio — comportamiento estandar de process managers
- **Stdout/stderr como strings**: para steps cortos. Para steps con output grande, futuro: streaming a archivo
## Criterios de aceptacion
- [ ] Spawn y wait funcionan con comandos reales
- [ ] Timeout mata el proceso correctamente
- [ ] Kill con grace period funciona
- [ ] Exit codes se capturan correctamente
- [ ] Tests pasan
- [ ] Indexado en registry.db
+115
View File
@@ -0,0 +1,115 @@
# 0007c — Execution store: persistencia de estado
## Metadata
| Campo | Valor |
|-------|-------|
| **ID** | 0007c |
| **Estado** | pendiente |
| **Prioridad** | alta |
| **Tipo** | feature |
## Dependencias
| ID | Título | Estado | Requerido |
|----|--------|--------|-----------|
| 0007a | Funciones core del DAG engine | pendiente | Si |
**Bloqueada por:** `#0007a`
**Desbloquea:** `#0007e`
---
## Objetivo
Funciones para persistir el estado de ejecuciones de DAGs en SQLite: que DAG se ejecuto, cuando, que steps corrieron, resultado de cada step, logs. Permite historial, reintentos y debugging.
## Contexto
- Cada ejecucion de un DAG genera un `run` con multiples `step_results`
- Similar a `operations.db` pero especifico para el DAG engine
- La BD vive en el directorio de la app del scheduler (no en raiz)
- Debe soportar consultas tipo: "ultimas 10 ejecuciones de X", "steps fallidos"
## Arquitectura
```
functions/infra/
├── dag_store_init.go — NEW: crea schema SQLite para runs/steps
├── dag_store_init.md
├── dag_store_run.go — NEW: CRUD de runs (create, update status, list, get)
├── dag_store_run.md
├── dag_store_step.go — NEW: CRUD de step results dentro de un run
├── dag_store_step.md
types/infra/
├── dag_run.md — NEW: id, dag_name, status, started_at, finished_at, trigger
├── dag_step_result.md — NEW: run_id, step_name, status, exit_code, stdout, stderr, duration_ms
```
### Patron pure core / impure shell
- `infra/` — Todas impuras (SQLite I/O)
- Schema sencillo: `dag_runs` + `dag_step_results`
## Tareas
### Fase 1: Tipos y schema
- [ ] **1.1** Definir `DagRun` — id (ULID), dag_name, dag_path, status (pending/running/success/failed/cancelled), started_at, finished_at, trigger (manual/schedule/api)
- [ ] **1.2** Definir `DagStepResult` — id, run_id, step_name, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms
- [ ] **1.3** Schema SQLite: `dag_runs`, `dag_step_results` con indices
### Fase 2: Funciones
- [ ] **2.1** `dag_store_init` — crea/migra la BD SQLite
- [ ] **2.2** `dag_store_run` — create_run, update_run_status, get_run, list_runs (con filtros)
- [ ] **2.3** `dag_store_step` — insert_step_result, list_steps_for_run
### Fase 3: Tests
- [ ] **3.1** Ciclo completo: init → create run → insert steps → update status → query
- [ ] **3.2** Queries: ultimas N ejecuciones, ejecuciones fallidas, steps de un run
### Fase 4: Cleanup
- [ ] `fn index` y verificar IDs
---
## Ejemplo de uso
```go
db := dag_store_init("/path/to/scheduler.db")
run := dag_store_create_run(db, "my_pipeline", "manual")
// run.ID = "01HXZ..."
dag_store_insert_step(db, run.ID, DagStepResult{
StepName: "fetch_data",
Status: "success",
ExitCode: 0,
Stdout: "fetched 1000 rows",
DurationMs: 1234,
})
dag_store_update_status(db, run.ID, "success")
// Consultar
runs := dag_store_list_runs(db, "my_pipeline", 10) // ultimas 10
```
## Decisiones de diseno
- **SQLite por DAG engine, no por DAG**: una sola BD para todas las ejecuciones, no una por cada DAG
- **ULID para run IDs**: ordenables por tiempo, unicos sin coordinacion
- **stdout/stderr en BD**: para steps cortos. Para output grande, guardar path a archivo de log
## Criterios de aceptacion
- [ ] Schema se crea correctamente
- [ ] CRUD completo funciona
- [ ] Queries con filtros funcionan
- [ ] Tests pasan
- [ ] Indexado en registry.db
+109
View File
@@ -0,0 +1,109 @@
# 0007d — Scheduler: cron parser y ticker
## Metadata
| Campo | Valor |
|-------|-------|
| **ID** | 0007d |
| **Estado** | pendiente |
| **Prioridad** | media |
| **Tipo** | feature |
## Dependencias
| ID | Título | Estado | Requerido |
|----|--------|--------|-----------|
| 0007a | Funciones core del DAG engine | pendiente | Si |
| 0007c | Execution store | pendiente | Si |
**Bloqueada por:** `#0007a, #0007c`
**Desbloquea:** `#0007e`
---
## Objetivo
Funciones para parsear expresiones cron, calcular proximas ejecuciones, y un ticker que dispara DAGs segun su schedule. Es lo que reemplaza el scheduler de Dagu.
## Contexto
- Las expresiones cron de Dagu son estandar (5 campos: min hour dom mon dow)
- El ticker es un loop infinito que cada minuto evalua que DAGs deben lanzarse
- Funciones puras para parseo y calculo, impura solo el ticker
## Arquitectura
```
functions/core/
├── cron_parse.go — NEW: string → CronExpression
├── cron_parse.md
├── cron_next.go — NEW: CronExpression + time → proxima ejecucion
├── cron_next.md
├── cron_match.go — NEW: CronExpression + time → bool (coincide?)
├── cron_match.md
functions/infra/
├── dag_ticker.go — NEW: loop que evalua schedules y lanza DAGs
├── dag_ticker.md
types/core/
├── cron_expression.md — NEW: minute, hour, dom, month, dow (cada uno []int o wildcard)
```
### Patron pure core / impure shell
- `core/``cron_parse`, `cron_next`, `cron_match` son puras
- `infra/``dag_ticker` es impuro (time.Sleep, lanza ejecuciones)
## Tareas
### Fase 1: Tipos
- [ ] **1.1** Definir `CronExpression` — campos parseados con soporte para *, ranges (1-5), lists (1,3,5), intervals (*/5)
### Fase 2: Funciones puras
- [ ] **2.1** `cron_parse` — "0 9 * * *" → CronExpression. Soportar: *, N, N-M, N/M, listas
- [ ] **2.2** `cron_next` — dada una CronExpression y un time.Time, retorna el proximo time.Time que coincide
- [ ] **2.3** `cron_match` — dada una CronExpression y un time.Time, retorna true si coincide (para el ticker)
- [ ] **2.4** Tests exhaustivos: wildcards, ranges, listas, intervalos, edge cases (fin de mes, febrero)
### Fase 3: Ticker
- [ ] **3.1** `dag_ticker` — recibe lista de (DagDefinition, path), cada minuto evalua cron_match para cada uno, lanza los que coinciden
- [ ] **3.2** Soporte para cancelacion (context.Context) y graceful shutdown
### Fase 4: Cleanup
- [ ] `fn index` y verificar IDs
---
## Ejemplo de uso
```go
// Puro
expr, _ := cron_parse("*/5 9-17 * * 1-5") // cada 5 min, 9-17h, lun-vie
next := cron_next(expr, time.Now()) // proxima ejecucion
matches := cron_match(expr, time.Now()) // true si ahora coincide
// Impuro (el ticker)
ctx, cancel := context.WithCancel(context.Background())
dag_ticker(ctx, dags, executor) // loop infinito hasta cancel
```
## Decisiones de diseno
- **No usar libreria cron externa**: las expresiones son simples, implementar desde cero es ~100 lineas y evita dependencias
- **Separar parse/next/match**: parse es costoso, match es barato — parsear una vez, match cada minuto
- **Ticker como funcion, no como goroutine**: el caller decide como lanzarlo
## Criterios de aceptacion
- [ ] Parsea todas las expresiones cron de los DAGs existentes en `~/dagu/dags/`
- [ ] `cron_next` calcula correctamente la proxima ejecucion
- [ ] `cron_match` coincide correctamente para el minuto actual
- [ ] Ticker lanza DAGs en el momento correcto
- [ ] Tests pasan
- [ ] Indexado en registry.db
+143
View File
@@ -0,0 +1,143 @@
# 0007e — DAG executor app: CLI/TUI que reemplaza Dagu
## Metadata
| Campo | Valor |
|-------|-------|
| **ID** | 0007e |
| **Estado** | pendiente |
| **Prioridad** | alta |
| **Tipo** | feature |
## Dependencias
| ID | Título | Estado | Requerido |
|----|--------|--------|-----------|
| 0007a | Funciones core del DAG engine | pendiente | Si |
| 0007b | Process manager | pendiente | Si |
| 0007c | Execution store | pendiente | Si |
| 0007d | Scheduler | pendiente | Si |
**Bloqueada por:** `#0007a, #0007b, #0007c, #0007d`
**Desbloquea:** ninguna
---
## Objetivo
App que compone todas las funciones de 0007a-d en un ejecutable unico que reemplaza a Dagu: lee DAGs YAML, los ejecuta con dependencias, persiste estado, y opcionalmente corre como daemon con scheduler.
## Contexto
- Vive en `apps/dag_engine/` (es una app, no una funcion reutilizable)
- Lee DAGs del directorio `~/dagu/dags/` (o configurable)
- El executor es el nucleo: toma un DagDefinition, lanza steps en orden topologico, gestiona paralelismo
- Modos: `run` (ejecuta un DAG), `start` (daemon con scheduler), `status` (consulta ejecuciones), `list` (lista DAGs)
## Arquitectura
```
apps/dag_engine/
├── app.md — metadata del registry
├── main.go — CLI: subcomandos run/start/status/list
├── executor.go — compone dag_topo_sort + process_spawn/wait + store
├── server.go — (futuro) HTTP API para trigger remoto
├── go.mod
├── .gitignore
```
### Patron pure core / impure shell
- `core/` — ya creadas en 0007a y 0007d (funciones puras del registry)
- `infra/` — ya creadas en 0007b y 0007c (funciones impuras del registry)
- `app/``executor.go` compone todo, `main.go` orquesta
## Tareas
### Fase 1: Executor
- [ ] **1.1** `executor.go` — funcion `ExecuteDAG(dag DagDefinition, store DB) DagRun`
- Crea run en store
- Resuelve env
- Ordena steps (topo sort)
- Ejecuta nivel por nivel: steps del mismo nivel van en paralelo (goroutines)
- Cada step: spawn → wait → guarda result en store
- Si un step falla: cancela dependientes, marca run como failed
- Retorna DagRun con resultado final
### Fase 2: CLI
- [ ] **2.1** `fn-dag run <path.yaml>` — parsea, valida, ejecuta, muestra resultado
- [ ] **2.2** `fn-dag list [dir]` — lista DAGs con su schedule y ultimo estado
- [ ] **2.3** `fn-dag status [dag_name]` — ultimas ejecuciones, detalle de steps
- [ ] **2.4** `fn-dag start [dir]` — daemon: carga todos los DAGs, arranca ticker
### Fase 3: Integracion
- [ ] **3.1** `app.md` con uses_functions referenciando todas las funciones de 0007a-d
- [ ] **3.2** `operations.db` inicializado (fn ops init)
- [ ] **3.3** Publicar en Gitea (dataforge/dag_engine)
### Fase 4: Tests e2e
- [ ] **4.1** Ejecutar DAGs existentes de `~/dagu/dags/` y comparar resultado con Dagu
- [ ] **4.2** Test: DAG con steps paralelos, DAG con fallo en medio, DAG con timeout
### Fase 5: Cleanup
- [ ] `fn index`
- [ ] Actualizar CLAUDE.md con documentacion del dag engine
---
## Ejemplo de uso
```bash
# Ejecutar un DAG
fn-dag run ~/dagu/dags/example.yaml
# Step hello... done (0.1s)
# Step list_files... done (0.2s)
# Step date... done (0.1s)
# Run completed: 3/3 steps succeeded (0.4s)
# Listar DAGs
fn-dag list ~/dagu/dags/
# NAME SCHEDULE LAST RUN STATUS
# example 0 9 * * * 2026-04-07 success
# example_lineage_tracking 0 */6 * * * 2026-04-08 failed
# Ver estado
fn-dag status example
# RUN_ID STARTED STATUS STEPS
# 01HXZ... 2026-04-08 09:00:01 success 3/3
# 01HXY... 2026-04-07 09:00:00 success 3/3
# Daemon con scheduler
fn-dag start ~/dagu/dags/
# [09:00] Scheduler started. Watching 5 DAGs.
# [09:00] Triggered: example (schedule match)
# ...
```
## Decisiones de diseno
- **Un binario, no un servicio**: `fn-dag run` es fire-and-forget. `fn-dag start` es el unico modo daemon.
- **Paralelismo por niveles**: steps en el mismo nivel topologico corren en goroutines, no hay limite de concurrencia (por ahora)
- **Compatible con DAGs de Dagu**: lee el mismo formato YAML, no requiere migracion
- **Sin web UI por ahora**: la TUI y/o web UI es un issue futuro, el CLI cubre el 80% del uso
## Riesgos
- **Riesgo**: DAGs complejos de Dagu usan features que no implementamos (preconditions, params, mail on failure). **Mitigacion**: empezar con el subset que usamos, documentar que no se soporta.
- **Riesgo**: Race conditions en el executor paralelo. **Mitigacion**: cada goroutine tiene su propio ProcessHandle, el store usa transacciones SQLite.
## Criterios de aceptacion
- [ ] `fn-dag run` ejecuta correctamente los DAGs existentes
- [ ] Steps paralelos se ejecutan concurrentemente
- [ ] Fallos en un step cancelan dependientes
- [ ] Estado se persiste en SQLite
- [ ] `fn-dag start` corre como daemon con scheduler
- [ ] App registrada en registry.db e indexada
- [ ] Publicada en Gitea
+15
View File
@@ -0,0 +1,15 @@
# Issues
| ID | Título | Estado | Prioridad | Tipo | Bloquea |
|----|--------|--------|-----------|------|---------|
| 0001 | Jupyter create notebook | completado | — | feature | — |
| 0002 | Jupyter discover root dir | completado | — | bugfix | — |
| 0003 | Jupyter tools documentation | completado | — | docs | — |
| 0004 | Jupyter discover multiple instances | completado | — | feature | — |
| 0005 | Jupyter write batch | completado | — | feature | — |
| 0006 | Jupyter exec outputs keyerror | completado | — | bugfix | — |
| **0007a** | **DAG engine: core (parse, validate, topo sort)** | pendiente | alta | feature | 0007b-e |
| **0007b** | **DAG engine: process manager (spawn, wait, kill)** | pendiente | alta | feature | 0007e |
| **0007c** | **DAG engine: execution store (SQLite)** | pendiente | alta | feature | 0007e |
| **0007d** | **DAG engine: scheduler (cron parser, ticker)** | pendiente | media | feature | 0007e |
| **0007e** | **DAG engine: app CLI que reemplaza Dagu** | pendiente | alta | feature | — |