--- id: "0007a" title: "Funciones core del DAG engine" status: completado type: feature domain: [] scope: multi-app priority: alta depends: [] blocks: [] related: [] created: 2026-05-17 updated: 2026-05-17 tags: [] --- # 0007a — Funciones core del DAG engine ## Metadata | Campo | Valor | |-------|-------| | **ID** | 0007a | | **Estado** | pendiente | | **Prioridad** | alta | | **Tipo** | feature | ## Dependencias | ID | Título | Estado | Requerido | |----|--------|--------|-----------| | — | Ninguna | — | — | **Bloqueada por:** ninguna **Desbloquea:** `#0007b, #0007c, #0007d, #0007e` --- ## Objetivo Crear las funciones puras que parsean, validan y ordenan DAGs definidos en YAML. Estas funciones son el nucleo del sistema de orquestacion — todo lo demas depende de ellas. ## Contexto - Dagu usa YAML con `steps`, `depends`, `env`, `schedule` — queremos compatibilidad con ese formato - Las funciones deben ser puras: reciben datos, retornan datos, sin I/O - Deben vivir en `functions/core/` (Go) para maxima composabilidad - El formato YAML de Dagu existente en `~/dagu/dags/` debe poder parsearse sin cambios ## Arquitectura ``` functions/core/ ├── dag_parse.go — NEW: YAML → DagDefinition ├── dag_parse.md — NEW: metadata ├── dag_validate.go — NEW: valida ciclos, refs rotas, campos requeridos ├── dag_validate.md — NEW: metadata ├── dag_topo_sort.go — NEW: ordena steps por dependencias (Kahn's algorithm) ├── dag_topo_sort.md — NEW: metadata ├── dag_resolve_env.go — NEW: sustituye variables ${VAR} en steps ├── dag_resolve_env.md — NEW: metadata types/core/ ├── dag_definition.md — NEW: tipo DagDefinition (product) ├── dag_step.md — NEW: tipo DagStep (product) ├── dag_schedule.md — NEW: tipo DagSchedule (product) ├── dag_result.md — NEW: tipo DagValidationResult (product) ``` ### Patron pure core / impure shell - `core/` — Todas las funciones de este issue son puras - No hay shell/impure en este issue - Los tipos usan nativos de Go en firmas, tipos del registry en `uses_types` ## Tareas ### Fase 1: Tipos - [ ] **1.1** Definir `DagStep` — name, command, args, depends, env, timeout, retry, tags - [ ] **1.2** Definir `DagSchedule` — cron expressions, timezone - [ ] **1.3** Definir `DagDefinition` — name, description, steps, env, schedule, tags - [ ] **1.4** Definir `DagValidationResult` — errors, warnings, step_order ### Fase 2: Parser - [ ] **2.1** `dag_parse` — YAML bytes → DagDefinition. Soportar formato Dagu: steps con command/depends/env - [ ] **2.2** Tests: parsear DAGs existentes de `~/dagu/dags/`, edge cases (YAML invalido, campos faltantes) ### Fase 3: Validacion - [ ] **3.1** `dag_validate` — detectar ciclos (DFS), referencias rotas en depends, steps sin nombre, nombres duplicados - [ ] **3.2** Tests: grafos ciclicos, DAGs validos, depends a steps inexistentes ### Fase 4: Topological sort - [ ] **4.1** `dag_topo_sort` — Kahn's algorithm, retorna steps en orden de ejecucion con niveles de paralelismo - [ ] **4.2** Tests: DAGs lineales, DAGs con ramas paralelas, diamond dependencies ### Fase 5: Resolucion de env - [ ] **5.1** `dag_resolve_env` — sustituye `${VAR}` y `$VAR` en command/args de cada step usando env del DAG + env del step - [ ] **5.2** Tests: variables anidadas, variables no definidas, escaping ### Fase 6: Cleanup - [ ] `fn index` y verificar todos los IDs - [ ] Verificar que todos los tipos son referenciados correctamente en uses_types --- ## Ejemplo de uso ```go // Parsear un DAG data, _ := os.ReadFile("dags/my_pipeline.yaml") dag, err := dag_parse(data) // Validar result := dag_validate(dag) if len(result.Errors) > 0 { // ciclos, refs rotas... } // Ordenar ordered := dag_topo_sort(dag.Steps) // ordered = [[step_a], [step_b, step_c], [step_d]] // nivel 0 nivel 1 (paralelo) nivel 2 // Resolver env resolved := dag_resolve_env(dag, os.Environ()) ``` ## Decisiones de diseno - **Kahn's algorithm sobre DFS topo sort**: Kahn's da niveles de paralelismo gratis — steps en el mismo nivel pueden ejecutarse en paralelo - **Formato Dagu compatible**: no inventar formato nuevo, reutilizar el YAML que ya existe - **Tipos nativos en firma**: `[]byte` entrada, structs con campos basicos, sin dependencias externas para parsear ## Criterios de aceptacion - [ ] `dag_parse` parsea correctamente los DAGs existentes en `~/dagu/dags/` - [ ] `dag_validate` detecta ciclos y referencias rotas - [ ] `dag_topo_sort` retorna orden correcto con niveles de paralelismo - [ ] Todas las funciones son puras (sin I/O, sin estado) - [ ] Tests pasan con `go test -tags fts5 ./...` - [ ] Indexado en registry.db ## Referencias - Dagu YAML spec: `~/dagu/dags/example.yaml` - Kahn's algorithm: topological sort con BFS que da niveles