feat: add DAG core functions — parse, validate, topo sort, resolve env, cron match (0007a, 0007d)

Pure functions for parsing dagu-compatible YAML, validating DAG structure,
topological sorting with parallel levels (Kahn's algorithm), and env variable
resolution. Also adds cron_match for schedule matching.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-12 13:05:05 +02:00
parent cb96e85b69
commit c3dfc9315f
21 changed files with 1569 additions and 0 deletions
+12
View File
@@ -0,0 +1,12 @@
package core
import "time"
// CronMatch returns true if time t matches all fields of the cron schedule.
func CronMatch(sched CronSchedule, t time.Time) bool {
return intIn(t.Minute(), sched.Minute) &&
intIn(t.Hour(), sched.Hour) &&
intIn(t.Day(), sched.DayOfMonth) &&
intIn(int(t.Month()), sched.Month) &&
intIn(int(t.Weekday()), sched.DayOfWeek)
}
+49
View File
@@ -0,0 +1,49 @@
---
name: cron_match
kind: function
lang: go
domain: core
version: "1.0.0"
purity: pure
signature: "func CronMatch(sched CronSchedule, t time.Time) bool"
description: "Verifica si un instante de tiempo coincide con un cron schedule. Compara los 5 campos (minuto, hora, dia del mes, mes, dia de la semana) y retorna true si todos coinciden."
tags: [cron, scheduling, matching, time, pure]
uses_functions: []
uses_types: [cron_schedule_go_core]
returns: []
returns_optional: false
error_type: ""
imports: [time]
params:
- name: sched
desc: "CronSchedule con listas de valores validos por campo (resultado de ParseCronExpr)"
- name: t
desc: "instante de tiempo a verificar contra el schedule"
output: "true si t coincide con todos los campos del cron schedule"
tested: true
tests:
- "9:00 AM coincide con 0 9 * * *"
- "9:15 AM NO coincide con 0 9 * * *"
- "lunes a las 9 coincide con 0 9 * * 1"
- "domingo a las 9 NO coincide con 0 9 * * 1"
- "wildcard * coincide con cualquier valor"
- "specific month"
test_file_path: "functions/core/cron_match_test.go"
file_path: "functions/core/cron_match.go"
---
## Ejemplo
```go
sched, _ := ParseCronExpr("0 9 * * *")
t := time.Date(2026, 4, 11, 9, 0, 0, 0, time.UTC)
CronMatch(sched, t) // true
t2 := time.Date(2026, 4, 11, 10, 0, 0, 0, time.UTC)
CronMatch(sched, t2) // false
```
## Notas
Funcion pura. Usa AND semantics para day_of_month y day_of_week (ambos deben coincidir), igual que NextCronTime en el mismo paquete.
Reutiliza el helper intIn definido en next_cron_time.go (mismo paquete core).
+88
View File
@@ -0,0 +1,88 @@
package core
import (
"testing"
"time"
)
func TestCronMatch(t *testing.T) {
t.Run("9:00 AM coincide con 0 9 * * *", func(t *testing.T) {
sched, err := ParseCronExpr("0 9 * * *")
if err != nil {
t.Fatalf("ParseCronExpr: %v", err)
}
ts := time.Date(2026, 4, 11, 9, 0, 0, 0, time.UTC)
if !CronMatch(sched, ts) {
t.Errorf("expected match for 9:00 AM with '0 9 * * *'")
}
})
t.Run("9:15 AM NO coincide con 0 9 * * *", func(t *testing.T) {
sched, err := ParseCronExpr("0 9 * * *")
if err != nil {
t.Fatalf("ParseCronExpr: %v", err)
}
ts := time.Date(2026, 4, 11, 9, 15, 0, 0, time.UTC)
if CronMatch(sched, ts) {
t.Errorf("expected no match for 9:15 AM with '0 9 * * *'")
}
})
t.Run("lunes a las 9 coincide con 0 9 * * 1", func(t *testing.T) {
sched, err := ParseCronExpr("0 9 * * 1")
if err != nil {
t.Fatalf("ParseCronExpr: %v", err)
}
// 2026-04-13 is a Monday
ts := time.Date(2026, 4, 13, 9, 0, 0, 0, time.UTC)
if !CronMatch(sched, ts) {
t.Errorf("expected match for Monday 9:00 with '0 9 * * 1'")
}
})
t.Run("domingo a las 9 NO coincide con 0 9 * * 1", func(t *testing.T) {
sched, err := ParseCronExpr("0 9 * * 1")
if err != nil {
t.Fatalf("ParseCronExpr: %v", err)
}
// 2026-04-12 is a Sunday (weekday 0)
ts := time.Date(2026, 4, 12, 9, 0, 0, 0, time.UTC)
if CronMatch(sched, ts) {
t.Errorf("expected no match for Sunday 9:00 with '0 9 * * 1'")
}
})
t.Run("wildcard * coincide con cualquier valor", func(t *testing.T) {
sched, err := ParseCronExpr("*/15 * * * *")
if err != nil {
t.Fatalf("ParseCronExpr: %v", err)
}
// 9:30 should match */15 (0,15,30,45)
ts := time.Date(2026, 4, 11, 9, 30, 0, 0, time.UTC)
if !CronMatch(sched, ts) {
t.Errorf("expected match for 9:30 with '*/15 * * * *'")
}
// 9:07 should NOT match */15
ts2 := time.Date(2026, 4, 11, 9, 7, 0, 0, time.UTC)
if CronMatch(sched, ts2) {
t.Errorf("expected no match for 9:07 with '*/15 * * * *'")
}
})
t.Run("specific month", func(t *testing.T) {
sched, err := ParseCronExpr("0 0 1 4 *")
if err != nil {
t.Fatalf("ParseCronExpr: %v", err)
}
// April 1st matches
ts := time.Date(2026, 4, 1, 0, 0, 0, 0, time.UTC)
if !CronMatch(sched, ts) {
t.Errorf("expected match for April 1 with '0 0 1 4 *'")
}
// March 1st does NOT match
ts2 := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC)
if CronMatch(sched, ts2) {
t.Errorf("expected no match for March 1 with '0 0 1 4 *'")
}
})
}
+65
View File
@@ -0,0 +1,65 @@
package core
// DagContinueOn controls whether a step continues on failure/skip.
type DagContinueOn struct {
Failure bool
Skipped bool
}
// DagRetryPolicy configures automatic retries for a step.
type DagRetryPolicy struct {
Limit int
IntervalSec int
}
// DagStep represents a single step in a DAG workflow.
type DagStep struct {
Name string
ID string
Description string
Command string
Script string
Args []string
Shell string
Dir string
Depends []string
Env map[string]string
ContinueOn DagContinueOn
RetryPolicy DagRetryPolicy
TimeoutSec int
Output string
Tags []string
}
// DagHandlers contains lifecycle handler steps.
type DagHandlers struct {
Init []DagStep
Success []DagStep
Failure []DagStep
Exit []DagStep
}
// DagDefinition is a complete DAG workflow parsed from YAML.
type DagDefinition struct {
Name string
Description string
Group string
Type string // "graph" or "" (chain/sequential)
WorkingDir string
Shell string
Env map[string]string
Schedule []string
Steps []DagStep
HandlerOn DagHandlers
Tags []string
TimeoutSec int
FilePath string
}
// DagValidationResult contains validation output.
type DagValidationResult struct {
Valid bool
Errors []string
Warnings []string
Levels [][]string // topological levels (step names/IDs)
}
+281
View File
@@ -0,0 +1,281 @@
package core
import (
"fmt"
"strings"
"gopkg.in/yaml.v3"
)
// rawDagStep is the loosely-typed intermediate representation of a DAG step.
type rawDagStep struct {
Name string `yaml:"name"`
ID string `yaml:"id"`
Description string `yaml:"description"`
Command string `yaml:"command"`
Script string `yaml:"script"`
Args []string `yaml:"args"`
Shell string `yaml:"shell"`
Dir string `yaml:"dir"`
WorkingDir string `yaml:"working_dir"`
Depends []string `yaml:"depends"`
Env interface{} `yaml:"env"`
ContinueOn rawDagContinueOn `yaml:"continue_on"`
RetryPolicy rawDagRetryPolicy `yaml:"retry_policy"`
TimeoutSec int `yaml:"timeout_sec"`
Output string `yaml:"output"`
Tags []string `yaml:"tags"`
}
type rawDagContinueOn struct {
Failure bool `yaml:"failure"`
Skipped bool `yaml:"skipped"`
}
type rawDagRetryPolicy struct {
Limit int `yaml:"limit"`
IntervalSec int `yaml:"interval_sec"`
}
// rawDagHandlers handles both handler_on and handlers aliases.
type rawDagHandlers struct {
Init interface{} `yaml:"init"`
Success interface{} `yaml:"success"`
Failure interface{} `yaml:"failure"`
Exit interface{} `yaml:"exit"`
}
// rawDag is the loosely-typed intermediate representation of a DAG YAML.
type rawDag struct {
Name string `yaml:"name"`
Description string `yaml:"description"`
Group string `yaml:"group"`
Type string `yaml:"type"`
WorkingDir string `yaml:"working_dir"`
Shell string `yaml:"shell"`
Env interface{} `yaml:"env"`
Schedule interface{} `yaml:"schedule"`
Steps []rawDagStep `yaml:"steps"`
HandlerOn *rawDagHandlers `yaml:"handler_on"`
Handlers *rawDagHandlers `yaml:"handlers"`
Tags []string `yaml:"tags"`
TimeoutSec int `yaml:"timeout_sec"`
}
// DagParse parses a YAML DAG definition compatible with Dagu format.
// Handles schedule as string or list, env as list of single-key maps,
// handler_on and handlers as aliases, and steps with command/script/depends.
func DagParse(data []byte) (DagDefinition, error) {
var raw rawDag
if err := yaml.Unmarshal(data, &raw); err != nil {
return DagDefinition{}, fmt.Errorf("dag_parse: yaml unmarshal: %w", err)
}
def := DagDefinition{
Name: raw.Name,
Description: raw.Description,
Group: raw.Group,
Type: raw.Type,
WorkingDir: raw.WorkingDir,
Shell: raw.Shell,
Tags: raw.Tags,
TimeoutSec: raw.TimeoutSec,
}
// Normalize env (list of single-key maps or plain map).
dagEnv, err := normalizeEnv(raw.Env)
if err != nil {
return DagDefinition{}, fmt.Errorf("dag_parse: env: %w", err)
}
def.Env = dagEnv
// Normalize schedule (string or list).
def.Schedule = normalizeSchedule(raw.Schedule)
// Normalize steps.
steps := make([]DagStep, 0, len(raw.Steps))
for i, rs := range raw.Steps {
step, err := normalizeStep(rs)
if err != nil {
return DagDefinition{}, fmt.Errorf("dag_parse: step[%d]: %w", i, err)
}
steps = append(steps, step)
}
def.Steps = steps
// Normalize handlers: handler_on takes precedence, fallback to handlers.
rawHandlers := raw.HandlerOn
if rawHandlers == nil {
rawHandlers = raw.Handlers
}
if rawHandlers != nil {
handlers, err := normalizeHandlers(rawHandlers)
if err != nil {
return DagDefinition{}, fmt.Errorf("dag_parse: handlers: %w", err)
}
def.HandlerOn = handlers
}
return def, nil
}
// normalizeSchedule converts schedule from string or []interface{} to []string.
func normalizeSchedule(v interface{}) []string {
if v == nil {
return nil
}
switch t := v.(type) {
case string:
s := strings.TrimSpace(t)
if s == "" {
return nil
}
return []string{s}
case []interface{}:
result := make([]string, 0, len(t))
for _, item := range t {
if s, ok := item.(string); ok {
s = strings.TrimSpace(s)
if s != "" {
result = append(result, s)
}
}
}
return result
}
return nil
}
// normalizeEnv converts env from Dagu format (list of single-key maps) or plain map to map[string]string.
func normalizeEnv(v interface{}) (map[string]string, error) {
if v == nil {
return nil, nil
}
switch t := v.(type) {
case map[string]interface{}:
// Plain YAML map.
result := make(map[string]string, len(t))
for k, val := range t {
result[k] = fmt.Sprintf("%v", val)
}
return result, nil
case []interface{}:
// Dagu format: list of single-key maps [KEY: value, KEY2: value2].
result := make(map[string]string)
for _, item := range t {
m, ok := item.(map[string]interface{})
if !ok {
continue
}
for k, val := range m {
result[k] = fmt.Sprintf("%v", val)
}
}
return result, nil
}
return nil, fmt.Errorf("unexpected env type %T", v)
}
// normalizeStep converts a rawDagStep to a DagStep.
func normalizeStep(rs rawDagStep) (DagStep, error) {
stepEnv, err := normalizeEnv(rs.Env)
if err != nil {
return DagStep{}, fmt.Errorf("env: %w", err)
}
// working_dir is an alias for dir at the step level.
dir := rs.Dir
if dir == "" {
dir = rs.WorkingDir
}
return DagStep{
Name: rs.Name,
ID: rs.ID,
Description: rs.Description,
Command: rs.Command,
Script: rs.Script,
Args: rs.Args,
Shell: rs.Shell,
Dir: dir,
Depends: rs.Depends,
Env: stepEnv,
ContinueOn: DagContinueOn{
Failure: rs.ContinueOn.Failure,
Skipped: rs.ContinueOn.Skipped,
},
RetryPolicy: DagRetryPolicy{
Limit: rs.RetryPolicy.Limit,
IntervalSec: rs.RetryPolicy.IntervalSec,
},
TimeoutSec: rs.TimeoutSec,
Output: rs.Output,
Tags: rs.Tags,
}, nil
}
// normalizeHandlers converts rawDagHandlers to DagHandlers.
// Each handler field can be a single step object or a list of steps.
func normalizeHandlers(rh *rawDagHandlers) (DagHandlers, error) {
var h DagHandlers
var err error
h.Init, err = normalizeHandlerField(rh.Init)
if err != nil {
return DagHandlers{}, fmt.Errorf("init: %w", err)
}
h.Success, err = normalizeHandlerField(rh.Success)
if err != nil {
return DagHandlers{}, fmt.Errorf("success: %w", err)
}
h.Failure, err = normalizeHandlerField(rh.Failure)
if err != nil {
return DagHandlers{}, fmt.Errorf("failure: %w", err)
}
h.Exit, err = normalizeHandlerField(rh.Exit)
if err != nil {
return DagHandlers{}, fmt.Errorf("exit: %w", err)
}
return h, nil
}
// normalizeHandlerField converts a handler field (single step or list) to []DagStep.
func normalizeHandlerField(v interface{}) ([]DagStep, error) {
if v == nil {
return nil, nil
}
// Re-marshal and unmarshal to handle polymorphic types cleanly.
b, err := yaml.Marshal(v)
if err != nil {
return nil, fmt.Errorf("re-marshal handler: %w", err)
}
// Try as a list first.
var rawList []rawDagStep
if err := yaml.Unmarshal(b, &rawList); err == nil && len(rawList) > 0 {
steps := make([]DagStep, 0, len(rawList))
for i, rs := range rawList {
step, err := normalizeStep(rs)
if err != nil {
return nil, fmt.Errorf("step[%d]: %w", i, err)
}
steps = append(steps, step)
}
return steps, nil
}
// Try as single step.
var rs rawDagStep
if err := yaml.Unmarshal(b, &rs); err != nil {
return nil, fmt.Errorf("unmarshal handler step: %w", err)
}
step, err := normalizeStep(rs)
if err != nil {
return nil, err
}
// If the step has no meaningful content, return nil.
if step.Name == "" && step.ID == "" && step.Command == "" && step.Script == "" {
return nil, nil
}
return []DagStep{step}, nil
}
+54
View File
@@ -0,0 +1,54 @@
---
name: dag_parse
kind: function
lang: go
domain: core
version: "1.0.0"
purity: pure
signature: "func DagParse(data []byte) (DagDefinition, error)"
description: "Parsea YAML de definicion de DAG en formato compatible con Dagu. Soporta schedule como string o lista, env como lista de maps single-key (formato Dagu), handler_on y handlers como aliases, steps con command/script/depends/continue_on, y type graph."
tags: [dag, yaml, parsing, workflow, dagu, pure]
uses_functions: []
uses_types: [dag_definition_go_core, dag_step_go_core, dag_handlers_go_core]
returns: []
returns_optional: false
error_type: ""
imports: [fmt, strings, gopkg.in/yaml.v3]
params:
- name: data
desc: "contenido YAML de un archivo de definicion de DAG en formato Dagu"
output: "DagDefinition con todos los campos normalizados; error si el YAML es sintaticamente invalido"
tested: true
tests:
- "parsea DAG simple con steps y depends"
- "parsea schedule como string y como lista"
- "parsea env en formato lista de maps"
- "parsea handler_on y handlers como alias"
- "parsea continue_on y working_dir a nivel step"
- "parsea type graph"
test_file_path: "functions/core/dag_parse_test.go"
file_path: "functions/core/dag_parse.go"
---
## Ejemplo
```go
data := []byte(`
name: mi-dag
schedule: "0 9 * * *"
steps:
- name: hello
command: echo "hello"
- name: world
command: echo "world"
depends: [hello]
`)
dag, err := DagParse(data)
// dag.Name = "mi-dag"
// dag.Schedule = ["0 9 * * *"]
// dag.Steps[1].Depends = ["hello"]
```
## Notas
Funcion pura (el YAML es inmutable, no hay I/O). Internamente usa un struct rawDag para deserializar loosely y luego normaliza campos polimorficos. La estrategia de normalizacion: schedule string->[]string, env lista->map, handlers single-o-lista->[]DagStep. handler_on tiene precedencia sobre handlers si ambos estan presentes.
+213
View File
@@ -0,0 +1,213 @@
package core
import (
"testing"
)
func TestDagParse(t *testing.T) {
t.Run("parsea DAG simple con steps y depends", func(t *testing.T) {
data := []byte(`
name: example
description: Example workflow
steps:
- name: hello
command: echo "Hello!"
- name: list_files
command: ls -la /tmp
depends:
- hello
`)
dag, err := DagParse(data)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if dag.Name != "example" {
t.Errorf("Name: got %q, want %q", dag.Name, "example")
}
if len(dag.Steps) != 2 {
t.Fatalf("Steps: got %d, want 2", len(dag.Steps))
}
if dag.Steps[0].Name != "hello" {
t.Errorf("Steps[0].Name: got %q, want %q", dag.Steps[0].Name, "hello")
}
if dag.Steps[1].Name != "list_files" {
t.Errorf("Steps[1].Name: got %q, want %q", dag.Steps[1].Name, "list_files")
}
if len(dag.Steps[1].Depends) != 1 || dag.Steps[1].Depends[0] != "hello" {
t.Errorf("Steps[1].Depends: got %v, want [hello]", dag.Steps[1].Depends)
}
})
t.Run("parsea schedule como string y como lista", func(t *testing.T) {
// Schedule as string.
dataStr := []byte(`
name: dag-string-schedule
schedule: "0 9 * * 5"
steps:
- name: step1
command: echo ok
`)
dagStr, err := DagParse(dataStr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(dagStr.Schedule) != 1 || dagStr.Schedule[0] != "0 9 * * 5" {
t.Errorf("Schedule (string): got %v, want [\"0 9 * * 5\"]", dagStr.Schedule)
}
// Schedule as list.
dataList := []byte(`
name: dag-list-schedule
schedule:
- "0 9 * * *"
- "0 18 * * *"
steps:
- name: step1
command: echo ok
`)
dagList, err := DagParse(dataList)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(dagList.Schedule) != 2 {
t.Fatalf("Schedule (list): got %d items, want 2", len(dagList.Schedule))
}
if dagList.Schedule[0] != "0 9 * * *" {
t.Errorf("Schedule[0]: got %q, want %q", dagList.Schedule[0], "0 9 * * *")
}
if dagList.Schedule[1] != "0 18 * * *" {
t.Errorf("Schedule[1]: got %q, want %q", dagList.Schedule[1], "0 18 * * *")
}
})
t.Run("parsea env en formato lista de maps", func(t *testing.T) {
data := []byte(`
name: dag-env
env:
- PROJECT_DIR: /home/lucas/analysis
- PYTHON: /home/lucas/.venv/bin/python
steps:
- name: step1
command: echo ${PROJECT_DIR}
`)
dag, err := DagParse(data)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if dag.Env["PROJECT_DIR"] != "/home/lucas/analysis" {
t.Errorf("Env[PROJECT_DIR]: got %q, want %q", dag.Env["PROJECT_DIR"], "/home/lucas/analysis")
}
if dag.Env["PYTHON"] != "/home/lucas/.venv/bin/python" {
t.Errorf("Env[PYTHON]: got %q, want %q", dag.Env["PYTHON"], "/home/lucas/.venv/bin/python")
}
})
t.Run("parsea handler_on y handlers como alias", func(t *testing.T) {
// handler_on with single step object.
dataHandlerOn := []byte(`
name: dag-handler-on
handler_on:
failure:
command: echo "FALLO"
steps:
- name: step1
command: echo hello
`)
dagHO, err := DagParse(dataHandlerOn)
if err != nil {
t.Fatalf("unexpected error (handler_on): %v", err)
}
if len(dagHO.HandlerOn.Failure) != 1 {
t.Fatalf("HandlerOn.Failure: got %d steps, want 1", len(dagHO.HandlerOn.Failure))
}
if dagHO.HandlerOn.Failure[0].Command != `echo "FALLO"` {
t.Errorf("HandlerOn.Failure[0].Command: got %q", dagHO.HandlerOn.Failure[0].Command)
}
// handlers alias with list of steps.
dataHandlers := []byte(`
name: dag-handlers
handlers:
failure:
- name: mark_as_failed
command: echo "FAILED"
success:
- name: notify
command: echo "OK"
steps:
- name: step1
command: echo hello
`)
dagH, err := DagParse(dataHandlers)
if err != nil {
t.Fatalf("unexpected error (handlers): %v", err)
}
if len(dagH.HandlerOn.Failure) != 1 || dagH.HandlerOn.Failure[0].Name != "mark_as_failed" {
t.Errorf("HandlerOn.Failure: got %v", dagH.HandlerOn.Failure)
}
if len(dagH.HandlerOn.Success) != 1 || dagH.HandlerOn.Success[0].Name != "notify" {
t.Errorf("HandlerOn.Success: got %v", dagH.HandlerOn.Success)
}
})
t.Run("parsea continue_on y working_dir a nivel step", func(t *testing.T) {
data := []byte(`
name: dag-step-options
steps:
- id: ingest
description: Procesar archivos
working_dir: /home/lucas/project
command: ./bin/ingest
continue_on:
failure: true
- id: informe
command: python /tmp/script.py
depends: [ingest]
`)
dag, err := DagParse(data)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(dag.Steps) != 2 {
t.Fatalf("Steps: got %d, want 2", len(dag.Steps))
}
step0 := dag.Steps[0]
if step0.ID != "ingest" {
t.Errorf("Steps[0].ID: got %q, want %q", step0.ID, "ingest")
}
if step0.Dir != "/home/lucas/project" {
t.Errorf("Steps[0].Dir: got %q, want %q", step0.Dir, "/home/lucas/project")
}
if !step0.ContinueOn.Failure {
t.Errorf("Steps[0].ContinueOn.Failure: got false, want true")
}
step1 := dag.Steps[1]
if len(step1.Depends) != 1 || step1.Depends[0] != "ingest" {
t.Errorf("Steps[1].Depends: got %v, want [ingest]", step1.Depends)
}
})
t.Run("parsea type graph", func(t *testing.T) {
data := []byte(`
name: dag-graph
type: graph
tags: [finanzas, semanal]
steps:
- name: step1
command: echo a
- name: step2
command: echo b
depends: [step1]
`)
dag, err := DagParse(data)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if dag.Type != "graph" {
t.Errorf("Type: got %q, want %q", dag.Type, "graph")
}
if len(dag.Tags) != 2 {
t.Errorf("Tags: got %v, want [finanzas semanal]", dag.Tags)
}
})
}
+157
View File
@@ -0,0 +1,157 @@
package core
import (
"strings"
)
// DagResolveEnv resolves environment variable references in a DagDefinition.
// It parses environ (KEY=VALUE pairs from os.Environ()), merges with dag.Env
// (dag.Env takes precedence), and substitutes ${VAR} and $VAR in Command,
// Script, Dir, Args, and Env values of each step. Also substitutes in
// DagDefinition.WorkingDir. Returns a new DagDefinition without mutating the original.
func DagResolveEnv(dag DagDefinition, environ []string) DagDefinition {
// Parse environ into a base map.
base := parseEnviron(environ)
// Merge dag.Env over base (dag.Env has precedence).
dagEnv := mergeMaps(base, dag.Env)
// Build new DagDefinition (shallow copy, then replace fields).
result := dag
// Substitute in WorkingDir.
result.WorkingDir = substitute(dag.WorkingDir, dagEnv)
// Substitute in Env values of the DAG itself.
result.Env = substituteMap(dag.Env, dagEnv)
// Resolve steps.
resolvedSteps := make([]DagStep, len(dag.Steps))
for i, step := range dag.Steps {
// Merge dag env with step env (step env has precedence).
stepEnv := mergeMaps(dagEnv, step.Env)
rs := step
rs.Command = substitute(step.Command, stepEnv)
rs.Script = substitute(step.Script, stepEnv)
rs.Dir = substitute(step.Dir, stepEnv)
if len(step.Args) > 0 {
args := make([]string, len(step.Args))
for j, arg := range step.Args {
args[j] = substitute(arg, stepEnv)
}
rs.Args = args
}
rs.Env = substituteMap(step.Env, stepEnv)
resolvedSteps[i] = rs
}
result.Steps = resolvedSteps
return result
}
// parseEnviron parses a slice of "KEY=VALUE" strings into a map.
func parseEnviron(environ []string) map[string]string {
m := make(map[string]string, len(environ))
for _, kv := range environ {
idx := strings.IndexByte(kv, '=')
if idx < 0 {
continue
}
m[kv[:idx]] = kv[idx+1:]
}
return m
}
// mergeMaps returns a new map with all entries from base, then overlaid with overlay.
func mergeMaps(base, overlay map[string]string) map[string]string {
result := make(map[string]string, len(base)+len(overlay))
for k, v := range base {
result[k] = v
}
for k, v := range overlay {
result[k] = v
}
return result
}
// substituteMap applies substitute to all values in a map.
func substituteMap(m map[string]string, env map[string]string) map[string]string {
if len(m) == 0 {
return m
}
result := make(map[string]string, len(m))
for k, v := range m {
result[k] = substitute(v, env)
}
return result
}
// substitute replaces ${VAR} and $VAR occurrences in s using the env map.
func substitute(s string, env map[string]string) string {
if s == "" || !strings.Contains(s, "$") {
return s
}
var b strings.Builder
i := 0
for i < len(s) {
if s[i] != '$' {
b.WriteByte(s[i])
i++
continue
}
// Found '$'
i++
if i >= len(s) {
b.WriteByte('$')
break
}
if s[i] == '{' {
// ${VAR} form.
i++
end := strings.IndexByte(s[i:], '}')
if end < 0 {
// No closing brace — write as-is.
b.WriteString("${")
continue
}
varName := s[i : i+end]
i += end + 1
if val, ok := env[varName]; ok {
b.WriteString(val)
} else {
b.WriteString("${")
b.WriteString(varName)
b.WriteByte('}')
}
} else if isEnvVarChar(s[i]) {
// $VAR form (bare variable name).
start := i
for i < len(s) && isEnvVarChar(s[i]) {
i++
}
varName := s[start:i]
if val, ok := env[varName]; ok {
b.WriteString(val)
} else {
b.WriteByte('$')
b.WriteString(varName)
}
} else {
// Not a valid variable start — keep '$' and continue.
b.WriteByte('$')
}
}
return b.String()
}
// isEnvVarChar returns true for characters valid in environment variable names.
func isEnvVarChar(c byte) bool {
return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_'
}
+39
View File
@@ -0,0 +1,39 @@
---
name: dag_resolve_env
kind: function
lang: go
domain: core
version: "1.0.0"
purity: pure
signature: "func DagResolveEnv(dag DagDefinition, environ []string) DagDefinition"
description: "Resuelve referencias a variables de entorno (${VAR} y $VAR) en un DagDefinition. Mergea environ del sistema con dag.Env (dag.Env tiene precedencia), y sustituye en Command, Script, Dir, Args y Env values de cada step. No muta el DagDefinition original."
tags: [dag, env, substitution, variable-expansion, pure]
uses_functions: []
uses_types: [dag_definition_go_core, dag_step_go_core]
returns: []
returns_optional: false
error_type: ""
imports: [strings]
params:
- name: dag
desc: "DagDefinition parseado con posibles referencias ${VAR} en sus campos"
- name: environ
desc: "lista de strings KEY=VALUE del entorno del sistema (tipicamente os.Environ())"
output: "nuevo DagDefinition con todas las referencias de variables sustituidas por sus valores"
tested: false
tests: []
test_file_path: ""
file_path: "functions/core/dag_resolve_env.go"
---
## Ejemplo
```go
dag, _ := DagParse(yamlData)
resolved := DagResolveEnv(dag, os.Environ())
// resolved.Steps[0].Command tiene ${PROJECT_DIR} sustituido
```
## Notas
Funcion pura. No muta el DagDefinition de entrada — retorna una copia con los campos resueltos. Precedencia de variables: environ base < dag.Env < step.Env. Sustituye tanto ${VAR} (con llaves) como $VAR (bare). Si una variable no existe en el entorno, se deja sin sustituir. Los campos sustituidos son: DagDefinition.WorkingDir, Env values del DAG, y por step: Command, Script, Dir, Args, Env values.
+78
View File
@@ -0,0 +1,78 @@
package core
import "fmt"
// DagTopoSort performs a topological sort of DAG steps using Kahn's algorithm.
// Returns levels where each level contains steps that can run in parallel.
// Returns an error if a dependency cycle is detected.
func DagTopoSort(steps []DagStep) ([][]DagStep, error) {
if len(steps) == 0 {
return nil, nil
}
// Build index: ref -> DagStep.
index := make(map[string]DagStep, len(steps))
for _, s := range steps {
index[stepRef(s)] = s
}
// Compute in-degree for each step.
inDegree := make(map[string]int, len(steps))
for _, s := range steps {
ref := stepRef(s)
if _, ok := inDegree[ref]; !ok {
inDegree[ref] = 0
}
for range s.Depends {
inDegree[ref]++ // ref depends on a dep, so ref gets +1
}
}
// Build adjacency list: dep -> list of steps that depend on dep.
adj := make(map[string][]string, len(steps))
for _, s := range steps {
ref := stepRef(s)
for _, dep := range s.Depends {
adj[dep] = append(adj[dep], ref)
}
}
// Initialize queue with steps that have in-degree 0.
var queue []string
for _, s := range steps {
ref := stepRef(s)
if inDegree[ref] == 0 {
queue = append(queue, ref)
}
}
var levels [][]DagStep
processed := 0
for len(queue) > 0 {
// The current queue forms a parallel level.
level := make([]DagStep, 0, len(queue))
nextQueue := []string{}
for _, ref := range queue {
level = append(level, index[ref])
processed++
// Reduce in-degree for dependents.
for _, dependent := range adj[ref] {
inDegree[dependent]--
if inDegree[dependent] == 0 {
nextQueue = append(nextQueue, dependent)
}
}
}
levels = append(levels, level)
queue = nextQueue
}
if processed != len(steps) {
return nil, fmt.Errorf("dag_topo_sort: cycle detected (%d of %d steps processed)", processed, len(steps))
}
return levels, nil
}
+48
View File
@@ -0,0 +1,48 @@
---
name: dag_topo_sort
kind: function
lang: go
domain: core
version: "1.0.0"
purity: pure
signature: "func DagTopoSort(steps []DagStep) ([][]DagStep, error)"
description: "Ordenamiento topologico de steps de un DAG usando el algoritmo de Kahn. Retorna niveles donde cada nivel contiene steps que pueden ejecutarse en paralelo. Detecta ciclos y retorna error si los hay."
tags: [dag, topological-sort, graph, kahn, pure]
uses_functions: []
uses_types: [dag_step_go_core]
returns: []
returns_optional: false
error_type: ""
imports: [fmt]
params:
- name: steps
desc: "lista de DagStep con sus dependencias definidas en el campo Depends"
output: "slice de niveles donde cada nivel es un slice de DagStep que pueden ejecutarse en paralelo; error si hay ciclo"
tested: true
tests:
- "cadena lineal A->B->C produce tres niveles"
- "diamond A->B A->C B->D C->D produce cuatro niveles"
- "steps paralelos sin depends produce un solo nivel"
- "ciclo A->B->A retorna error"
test_file_path: "functions/core/dag_topo_sort_test.go"
file_path: "functions/core/dag_topo_sort.go"
---
## Ejemplo
```go
steps := []DagStep{
{Name: "a"},
{Name: "b", Depends: []string{"a"}},
{Name: "c", Depends: []string{"a"}},
{Name: "d", Depends: []string{"b", "c"}},
}
levels, err := DagTopoSort(steps)
// levels[0] = [a]
// levels[1] = [b, c] (paralelo)
// levels[2] = [d]
```
## Notas
Funcion pura. Implementa Kahn's algorithm: calcula in-degree inicial, pone en cola los steps con in-degree 0, extrae un nivel completo por iteracion y reduce el in-degree de los dependientes. Si al terminar hay steps sin procesar, hay un ciclo. El orden dentro de cada nivel no esta garantizado — depende del orden del slice de entrada.
+95
View File
@@ -0,0 +1,95 @@
package core
import (
"testing"
)
func TestDagTopoSort(t *testing.T) {
t.Run("cadena lineal A->B->C produce tres niveles", func(t *testing.T) {
steps := []DagStep{
{Name: "a"},
{Name: "b", Depends: []string{"a"}},
{Name: "c", Depends: []string{"b"}},
}
levels, err := DagTopoSort(steps)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(levels) != 3 {
t.Fatalf("levels: got %d, want 3", len(levels))
}
if len(levels[0]) != 1 || levels[0][0].Name != "a" {
t.Errorf("levels[0]: got %v, want [a]", stepNames(levels[0]))
}
if len(levels[1]) != 1 || levels[1][0].Name != "b" {
t.Errorf("levels[1]: got %v, want [b]", stepNames(levels[1]))
}
if len(levels[2]) != 1 || levels[2][0].Name != "c" {
t.Errorf("levels[2]: got %v, want [c]", stepNames(levels[2]))
}
})
t.Run("diamond A->B A->C B->D C->D produce cuatro niveles", func(t *testing.T) {
steps := []DagStep{
{Name: "a"},
{Name: "b", Depends: []string{"a"}},
{Name: "c", Depends: []string{"a"}},
{Name: "d", Depends: []string{"b", "c"}},
}
levels, err := DagTopoSort(steps)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Expected: level0=[a], level1=[b,c], level2=[d]
if len(levels) != 3 {
t.Fatalf("levels: got %d, want 3", len(levels))
}
if len(levels[0]) != 1 || levels[0][0].Name != "a" {
t.Errorf("levels[0]: got %v, want [a]", stepNames(levels[0]))
}
if len(levels[1]) != 2 {
t.Errorf("levels[1]: got %v, want 2 steps", stepNames(levels[1]))
}
if len(levels[2]) != 1 || levels[2][0].Name != "d" {
t.Errorf("levels[2]: got %v, want [d]", stepNames(levels[2]))
}
})
t.Run("steps paralelos sin depends produce un solo nivel", func(t *testing.T) {
steps := []DagStep{
{Name: "x"},
{Name: "y"},
{Name: "z"},
}
levels, err := DagTopoSort(steps)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(levels) != 1 {
t.Fatalf("levels: got %d, want 1", len(levels))
}
if len(levels[0]) != 3 {
t.Errorf("levels[0]: got %d steps, want 3", len(levels[0]))
}
})
t.Run("ciclo A->B->A retorna error", func(t *testing.T) {
steps := []DagStep{
{Name: "a", Depends: []string{"b"}},
{Name: "b", Depends: []string{"a"}},
}
_, err := DagTopoSort(steps)
if err == nil {
t.Fatal("expected error for cycle, got nil")
}
})
}
// stepNames extracts step names for readable test output.
func stepNames(steps []DagStep) []string {
names := make([]string, len(steps))
for i, s := range steps {
names[i] = stepRef(s)
}
return names
}
+83
View File
@@ -0,0 +1,83 @@
package core
import "fmt"
// DagValidate validates a DagDefinition for structural correctness.
// Checks: steps have name/ID, no duplicate names/IDs, all depends reference
// existing steps, no dependency cycles. On success, computes topological levels.
// Returns warnings for steps with both command and script set.
func DagValidate(dag DagDefinition) DagValidationResult {
result := DagValidationResult{Valid: true}
// Build name/ID sets and check for missing identifiers and duplicates.
seen := make(map[string]bool)
for i, step := range dag.Steps {
ref := stepRef(step)
if ref == "" {
result.Errors = append(result.Errors,
fmt.Sprintf("step[%d]: must have name or id", i))
result.Valid = false
continue
}
if seen[ref] {
result.Errors = append(result.Errors,
fmt.Sprintf("step[%d]: duplicate name/id %q", i, ref))
result.Valid = false
}
seen[ref] = true
// Warning: command and script both set.
if step.Command != "" && step.Script != "" {
result.Warnings = append(result.Warnings,
fmt.Sprintf("step %q: has both command and script", ref))
}
}
if !result.Valid {
return result
}
// Check that all depends reference existing steps.
for _, step := range dag.Steps {
for _, dep := range step.Depends {
if !seen[dep] {
result.Errors = append(result.Errors,
fmt.Sprintf("step %q: depends on unknown step %q", stepRef(step), dep))
result.Valid = false
}
}
}
if !result.Valid {
return result
}
// Topological sort with Kahn's — detects cycles and computes levels.
levels, err := DagTopoSort(dag.Steps)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("cycle detected: %v", err))
result.Valid = false
return result
}
// Convert [][]DagStep to [][]string for the result.
strLevels := make([][]string, len(levels))
for i, level := range levels {
refs := make([]string, len(level))
for j, s := range level {
refs[j] = stepRef(s)
}
strLevels[i] = refs
}
result.Levels = strLevels
return result
}
// stepRef returns the canonical reference for a step (ID preferred, then Name).
func stepRef(s DagStep) string {
if s.ID != "" {
return s.ID
}
return s.Name
}
+46
View File
@@ -0,0 +1,46 @@
---
name: dag_validate
kind: function
lang: go
domain: core
version: "1.0.0"
purity: pure
signature: "func DagValidate(dag DagDefinition) DagValidationResult"
description: "Valida un DagDefinition para correcto uso estructural. Verifica que cada step tenga nombre o ID, que no haya duplicados, que todos los depends referencien steps existentes y que no haya ciclos (algoritmo de Kahn). Si el DAG es valido, calcula los niveles topologicos."
tags: [dag, validation, workflow, graph, pure]
uses_functions: [dag_topo_sort_go_core]
uses_types: [dag_definition_go_core, dag_validation_result_go_core]
returns: []
returns_optional: false
error_type: ""
imports: [fmt]
params:
- name: dag
desc: "DagDefinition parseado y a validar"
output: "DagValidationResult con Valid=true si no hay errores, Errors con los problemas encontrados, Warnings con avisos no fatales, y Levels con los niveles topologicos si el DAG es valido"
tested: true
tests:
- "DAG valido retorna Valid true y levels calculados"
- "step sin nombre retorna error"
- "depends a step inexistente retorna error"
- "ciclo en dependencias retorna error"
test_file_path: "functions/core/dag_validate_test.go"
file_path: "functions/core/dag_validate.go"
---
## Ejemplo
```go
dag, _ := DagParse(yamlData)
res := DagValidate(dag)
if !res.Valid {
for _, e := range res.Errors {
fmt.Println("ERROR:", e)
}
}
// res.Levels = [["step-a"], ["step-b", "step-c"], ["step-d"]]
```
## Notas
Funcion pura. No modifica el DagDefinition de entrada. El calculo de niveles topologicos se delega a DagTopoSort para mantener la separacion de responsabilidades. Un warning (command+script simultaneos) no invalida el DAG.
+113
View File
@@ -0,0 +1,113 @@
package core
import (
"testing"
)
func TestDagValidate(t *testing.T) {
t.Run("DAG valido retorna Valid true y levels calculados", func(t *testing.T) {
dag := DagDefinition{
Name: "valid-dag",
Steps: []DagStep{
{Name: "a", Command: "echo a"},
{Name: "b", Command: "echo b", Depends: []string{"a"}},
{Name: "c", Command: "echo c", Depends: []string{"a"}},
{Name: "d", Command: "echo d", Depends: []string{"b", "c"}},
},
}
res := DagValidate(dag)
if !res.Valid {
t.Errorf("Valid: got false, want true. Errors: %v", res.Errors)
}
if len(res.Errors) != 0 {
t.Errorf("Errors: got %v, want empty", res.Errors)
}
// Should have 3 levels: [a], [b,c], [d]
if len(res.Levels) != 3 {
t.Errorf("Levels: got %d, want 3. Levels: %v", len(res.Levels), res.Levels)
}
if len(res.Levels[0]) != 1 || res.Levels[0][0] != "a" {
t.Errorf("Levels[0]: got %v, want [a]", res.Levels[0])
}
if len(res.Levels[2]) != 1 || res.Levels[2][0] != "d" {
t.Errorf("Levels[2]: got %v, want [d]", res.Levels[2])
}
})
t.Run("step sin nombre retorna error", func(t *testing.T) {
dag := DagDefinition{
Name: "bad-dag",
Steps: []DagStep{
{Command: "echo hello"}, // no name, no ID
},
}
res := DagValidate(dag)
if res.Valid {
t.Error("Valid: got true, want false")
}
if len(res.Errors) == 0 {
t.Error("Errors: got empty, want at least one error")
}
})
t.Run("depends a step inexistente retorna error", func(t *testing.T) {
dag := DagDefinition{
Name: "bad-deps-dag",
Steps: []DagStep{
{Name: "step1", Command: "echo ok"},
{Name: "step2", Command: "echo ok", Depends: []string{"nonexistent"}},
},
}
res := DagValidate(dag)
if res.Valid {
t.Error("Valid: got true, want false")
}
found := false
for _, e := range res.Errors {
if containsStr(e, "nonexistent") {
found = true
break
}
}
if !found {
t.Errorf("Errors should mention 'nonexistent', got: %v", res.Errors)
}
})
t.Run("ciclo en dependencias retorna error", func(t *testing.T) {
dag := DagDefinition{
Name: "cyclic-dag",
Steps: []DagStep{
{Name: "a", Command: "echo a", Depends: []string{"b"}},
{Name: "b", Command: "echo b", Depends: []string{"a"}},
},
}
res := DagValidate(dag)
if res.Valid {
t.Error("Valid: got true, want false")
}
found := false
for _, e := range res.Errors {
if containsStr(e, "cycle") {
found = true
break
}
}
if !found {
t.Errorf("Errors should mention 'cycle', got: %v", res.Errors)
}
})
}
// containsStr returns true if s contains substr.
func containsStr(s, substr string) bool {
return len(s) >= len(substr) && (s == substr || len(substr) == 0 ||
func() bool {
for i := 0; i <= len(s)-len(substr); i++ {
if s[i:i+len(substr)] == substr {
return true
}
}
return false
}())
}
+20
View File
@@ -0,0 +1,20 @@
---
name: dag_continue_on
lang: go
domain: core
version: "1.0.0"
algebraic: product
definition: |
type DagContinueOn struct {
Failure bool
Skipped bool
}
description: "Politica de continuacion de un step DAG ante fallos o saltos. Cuando Failure=true el DAG no se detiene si el step falla. Cuando Skipped=true tampoco se detiene si el step es saltado."
tags: [dag, workflow, policy, error-handling]
uses_types: []
file_path: "functions/core/dag_definition.go"
---
## Notas
Tipo producto. Embebido en DagStep. Corresponde al campo `continue_on` del YAML de Dagu.
+31
View File
@@ -0,0 +1,31 @@
---
name: dag_definition
lang: go
domain: core
version: "1.0.0"
algebraic: product
definition: |
type DagDefinition struct {
Name string
Description string
Group string
Type string
WorkingDir string
Shell string
Env map[string]string
Schedule []string
Steps []DagStep
HandlerOn DagHandlers
Tags []string
TimeoutSec int
FilePath string
}
description: "Definicion completa de un workflow DAG parseada desde YAML compatible con Dagu. Contiene steps, handlers de ciclo de vida, variables de entorno, schedule y metadatos del flujo."
tags: [dag, workflow, yaml, dagu, definition]
uses_types: [dag_step_go_core, dag_handlers_go_core]
file_path: "functions/core/dag_definition.go"
---
## Notas
Tipo producto. Type puede ser "graph" (ejecucion paralela por niveles topologicos) o vacio (cadena secuencial). Schedule es siempre una lista de strings (normalizado desde string o lista en el YAML). FilePath se rellena opcionalmente por el caller para saber el origen del archivo.
+22
View File
@@ -0,0 +1,22 @@
---
name: dag_handlers
lang: go
domain: core
version: "1.0.0"
algebraic: product
definition: |
type DagHandlers struct {
Init []DagStep
Success []DagStep
Failure []DagStep
Exit []DagStep
}
description: "Handlers de ciclo de vida de un DAG. Cada campo contiene steps que se ejecutan en el evento correspondiente: inicializacion, exito, fallo o salida (siempre). Corresponde a handler_on o handlers en el YAML de Dagu."
tags: [dag, workflow, handlers, lifecycle]
uses_types: [dag_step_go_core]
file_path: "functions/core/dag_definition.go"
---
## Notas
Tipo producto. Embebido en DagDefinition. Los campos handler_on y handlers son aliases en el YAML de Dagu — ambos se normalizan a este tipo. Cada handler puede ser un step unico o una lista de steps en el YAML.
+20
View File
@@ -0,0 +1,20 @@
---
name: dag_retry_policy
lang: go
domain: core
version: "1.0.0"
algebraic: product
definition: |
type DagRetryPolicy struct {
Limit int
IntervalSec int
}
description: "Politica de reintentos automaticos para un step DAG. Limit es el numero maximo de reintentos e IntervalSec es el tiempo de espera entre intentos en segundos."
tags: [dag, workflow, retry, policy]
uses_types: []
file_path: "functions/core/dag_definition.go"
---
## Notas
Tipo producto. Embebido en DagStep. Corresponde al campo `retry_policy` del YAML de Dagu. Limit=0 significa sin reintentos.
+33
View File
@@ -0,0 +1,33 @@
---
name: dag_step
lang: go
domain: core
version: "1.0.0"
algebraic: product
definition: |
type DagStep struct {
Name string
ID string
Description string
Command string
Script string
Args []string
Shell string
Dir string
Depends []string
Env map[string]string
ContinueOn DagContinueOn
RetryPolicy DagRetryPolicy
TimeoutSec int
Output string
Tags []string
}
description: "Un paso individual en un workflow DAG con command/script, dependencias y configuracion de reintentos. Soporta variables de entorno por step, directorio de trabajo propio y politica continue_on."
tags: [dag, workflow, step, yaml, dagu]
uses_types: [dag_continue_on_go_core, dag_retry_policy_go_core]
file_path: "functions/core/dag_definition.go"
---
## Notas
Tipo producto. El campo ID es opcional y se usa como referencia en otros steps (ej: `${id.stdout}`). Si Name e ID estan ambos vacios, el step es invalido. Env del step se mergea sobre el Env del DAG padre durante la resolucion.
+22
View File
@@ -0,0 +1,22 @@
---
name: dag_validation_result
lang: go
domain: core
version: "1.0.0"
algebraic: product
definition: |
type DagValidationResult struct {
Valid bool
Errors []string
Warnings []string
Levels [][]string
}
description: "Resultado de validar un DagDefinition. Contiene errores estructurales (ciclos, depends invalidos, nombres duplicados), warnings (command+script simultaneos) y los niveles topologicos calculados si el DAG es valido."
tags: [dag, validation, workflow, result]
uses_types: []
file_path: "functions/core/dag_definition.go"
---
## Notas
Tipo producto. Levels solo se rellena cuando Valid=true. Cada sub-slice de Levels contiene los nombres/IDs de steps que pueden ejecutarse en paralelo. El orden de Levels refleja el orden topologico del grafo de dependencias.