From 3136eb862fb17f1fff71127e6cb03c754b8115f1 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 12 Apr 2026 13:05:05 +0200 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20add=20DAG=20core=20functions=20?= =?UTF-8?q?=E2=80=94=20parse,=20validate,=20topo=20sort,=20resolve=20env,?= =?UTF-8?q?=20cron=20match=20(0007a,=200007d)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure functions for parsing dagu-compatible YAML, validating DAG structure, topological sorting with parallel levels (Kahn's algorithm), and env variable resolution. Also adds cron_match for schedule matching. Co-Authored-By: Claude Opus 4.6 (1M context) --- functions/core/cron_match.go | 12 ++ functions/core/cron_match.md | 49 +++++ functions/core/cron_match_test.go | 88 +++++++++ functions/core/dag_definition.go | 65 +++++++ functions/core/dag_parse.go | 281 +++++++++++++++++++++++++++ functions/core/dag_parse.md | 54 +++++ functions/core/dag_parse_test.go | 213 ++++++++++++++++++++ functions/core/dag_resolve_env.go | 157 +++++++++++++++ functions/core/dag_resolve_env.md | 39 ++++ functions/core/dag_topo_sort.go | 78 ++++++++ functions/core/dag_topo_sort.md | 48 +++++ functions/core/dag_topo_sort_test.go | 95 +++++++++ functions/core/dag_validate.go | 83 ++++++++ functions/core/dag_validate.md | 46 +++++ functions/core/dag_validate_test.go | 113 +++++++++++ types/core/dag_continue_on.md | 20 ++ types/core/dag_definition.md | 31 +++ types/core/dag_handlers.md | 22 +++ types/core/dag_retry_policy.md | 20 ++ types/core/dag_step.md | 33 ++++ types/core/dag_validation_result.md | 22 +++ 21 files changed, 1569 insertions(+) create mode 100644 functions/core/cron_match.go create mode 100644 functions/core/cron_match.md create mode 100644 functions/core/cron_match_test.go create mode 100644 functions/core/dag_definition.go create mode 100644 functions/core/dag_parse.go create mode 100644 functions/core/dag_parse.md create mode 100644 functions/core/dag_parse_test.go create mode 100644 functions/core/dag_resolve_env.go create mode 100644 functions/core/dag_resolve_env.md create mode 100644 functions/core/dag_topo_sort.go create mode 100644 functions/core/dag_topo_sort.md create mode 100644 functions/core/dag_topo_sort_test.go create mode 100644 functions/core/dag_validate.go create mode 100644 functions/core/dag_validate.md create mode 100644 functions/core/dag_validate_test.go create mode 100644 types/core/dag_continue_on.md create mode 100644 types/core/dag_definition.md create mode 100644 types/core/dag_handlers.md create mode 100644 types/core/dag_retry_policy.md create mode 100644 types/core/dag_step.md create mode 100644 types/core/dag_validation_result.md diff --git a/functions/core/cron_match.go b/functions/core/cron_match.go new file mode 100644 index 00000000..01c3d07f --- /dev/null +++ b/functions/core/cron_match.go @@ -0,0 +1,12 @@ +package core + +import "time" + +// CronMatch returns true if time t matches all fields of the cron schedule. +func CronMatch(sched CronSchedule, t time.Time) bool { + return intIn(t.Minute(), sched.Minute) && + intIn(t.Hour(), sched.Hour) && + intIn(t.Day(), sched.DayOfMonth) && + intIn(int(t.Month()), sched.Month) && + intIn(int(t.Weekday()), sched.DayOfWeek) +} diff --git a/functions/core/cron_match.md b/functions/core/cron_match.md new file mode 100644 index 00000000..bceb84b5 --- /dev/null +++ b/functions/core/cron_match.md @@ -0,0 +1,49 @@ +--- +name: cron_match +kind: function +lang: go +domain: core +version: "1.0.0" +purity: pure +signature: "func CronMatch(sched CronSchedule, t time.Time) bool" +description: "Verifica si un instante de tiempo coincide con un cron schedule. Compara los 5 campos (minuto, hora, dia del mes, mes, dia de la semana) y retorna true si todos coinciden." +tags: [cron, scheduling, matching, time, pure] +uses_functions: [] +uses_types: [cron_schedule_go_core] +returns: [] +returns_optional: false +error_type: "" +imports: [time] +params: + - name: sched + desc: "CronSchedule con listas de valores validos por campo (resultado de ParseCronExpr)" + - name: t + desc: "instante de tiempo a verificar contra el schedule" +output: "true si t coincide con todos los campos del cron schedule" +tested: true +tests: + - "9:00 AM coincide con 0 9 * * *" + - "9:15 AM NO coincide con 0 9 * * *" + - "lunes a las 9 coincide con 0 9 * * 1" + - "domingo a las 9 NO coincide con 0 9 * * 1" + - "wildcard * coincide con cualquier valor" + - "specific month" +test_file_path: "functions/core/cron_match_test.go" +file_path: "functions/core/cron_match.go" +--- + +## Ejemplo + +```go +sched, _ := ParseCronExpr("0 9 * * *") +t := time.Date(2026, 4, 11, 9, 0, 0, 0, time.UTC) +CronMatch(sched, t) // true + +t2 := time.Date(2026, 4, 11, 10, 0, 0, 0, time.UTC) +CronMatch(sched, t2) // false +``` + +## Notas + +Funcion pura. Usa AND semantics para day_of_month y day_of_week (ambos deben coincidir), igual que NextCronTime en el mismo paquete. +Reutiliza el helper intIn definido en next_cron_time.go (mismo paquete core). diff --git a/functions/core/cron_match_test.go b/functions/core/cron_match_test.go new file mode 100644 index 00000000..e99e5b2a --- /dev/null +++ b/functions/core/cron_match_test.go @@ -0,0 +1,88 @@ +package core + +import ( + "testing" + "time" +) + +func TestCronMatch(t *testing.T) { + t.Run("9:00 AM coincide con 0 9 * * *", func(t *testing.T) { + sched, err := ParseCronExpr("0 9 * * *") + if err != nil { + t.Fatalf("ParseCronExpr: %v", err) + } + ts := time.Date(2026, 4, 11, 9, 0, 0, 0, time.UTC) + if !CronMatch(sched, ts) { + t.Errorf("expected match for 9:00 AM with '0 9 * * *'") + } + }) + + t.Run("9:15 AM NO coincide con 0 9 * * *", func(t *testing.T) { + sched, err := ParseCronExpr("0 9 * * *") + if err != nil { + t.Fatalf("ParseCronExpr: %v", err) + } + ts := time.Date(2026, 4, 11, 9, 15, 0, 0, time.UTC) + if CronMatch(sched, ts) { + t.Errorf("expected no match for 9:15 AM with '0 9 * * *'") + } + }) + + t.Run("lunes a las 9 coincide con 0 9 * * 1", func(t *testing.T) { + sched, err := ParseCronExpr("0 9 * * 1") + if err != nil { + t.Fatalf("ParseCronExpr: %v", err) + } + // 2026-04-13 is a Monday + ts := time.Date(2026, 4, 13, 9, 0, 0, 0, time.UTC) + if !CronMatch(sched, ts) { + t.Errorf("expected match for Monday 9:00 with '0 9 * * 1'") + } + }) + + t.Run("domingo a las 9 NO coincide con 0 9 * * 1", func(t *testing.T) { + sched, err := ParseCronExpr("0 9 * * 1") + if err != nil { + t.Fatalf("ParseCronExpr: %v", err) + } + // 2026-04-12 is a Sunday (weekday 0) + ts := time.Date(2026, 4, 12, 9, 0, 0, 0, time.UTC) + if CronMatch(sched, ts) { + t.Errorf("expected no match for Sunday 9:00 with '0 9 * * 1'") + } + }) + + t.Run("wildcard * coincide con cualquier valor", func(t *testing.T) { + sched, err := ParseCronExpr("*/15 * * * *") + if err != nil { + t.Fatalf("ParseCronExpr: %v", err) + } + // 9:30 should match */15 (0,15,30,45) + ts := time.Date(2026, 4, 11, 9, 30, 0, 0, time.UTC) + if !CronMatch(sched, ts) { + t.Errorf("expected match for 9:30 with '*/15 * * * *'") + } + // 9:07 should NOT match */15 + ts2 := time.Date(2026, 4, 11, 9, 7, 0, 0, time.UTC) + if CronMatch(sched, ts2) { + t.Errorf("expected no match for 9:07 with '*/15 * * * *'") + } + }) + + t.Run("specific month", func(t *testing.T) { + sched, err := ParseCronExpr("0 0 1 4 *") + if err != nil { + t.Fatalf("ParseCronExpr: %v", err) + } + // April 1st matches + ts := time.Date(2026, 4, 1, 0, 0, 0, 0, time.UTC) + if !CronMatch(sched, ts) { + t.Errorf("expected match for April 1 with '0 0 1 4 *'") + } + // March 1st does NOT match + ts2 := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) + if CronMatch(sched, ts2) { + t.Errorf("expected no match for March 1 with '0 0 1 4 *'") + } + }) +} diff --git a/functions/core/dag_definition.go b/functions/core/dag_definition.go new file mode 100644 index 00000000..ee2f2447 --- /dev/null +++ b/functions/core/dag_definition.go @@ -0,0 +1,65 @@ +package core + +// DagContinueOn controls whether a step continues on failure/skip. +type DagContinueOn struct { + Failure bool + Skipped bool +} + +// DagRetryPolicy configures automatic retries for a step. +type DagRetryPolicy struct { + Limit int + IntervalSec int +} + +// DagStep represents a single step in a DAG workflow. +type DagStep struct { + Name string + ID string + Description string + Command string + Script string + Args []string + Shell string + Dir string + Depends []string + Env map[string]string + ContinueOn DagContinueOn + RetryPolicy DagRetryPolicy + TimeoutSec int + Output string + Tags []string +} + +// DagHandlers contains lifecycle handler steps. +type DagHandlers struct { + Init []DagStep + Success []DagStep + Failure []DagStep + Exit []DagStep +} + +// DagDefinition is a complete DAG workflow parsed from YAML. +type DagDefinition struct { + Name string + Description string + Group string + Type string // "graph" or "" (chain/sequential) + WorkingDir string + Shell string + Env map[string]string + Schedule []string + Steps []DagStep + HandlerOn DagHandlers + Tags []string + TimeoutSec int + FilePath string +} + +// DagValidationResult contains validation output. +type DagValidationResult struct { + Valid bool + Errors []string + Warnings []string + Levels [][]string // topological levels (step names/IDs) +} diff --git a/functions/core/dag_parse.go b/functions/core/dag_parse.go new file mode 100644 index 00000000..60c05c5c --- /dev/null +++ b/functions/core/dag_parse.go @@ -0,0 +1,281 @@ +package core + +import ( + "fmt" + "strings" + + "gopkg.in/yaml.v3" +) + +// rawDagStep is the loosely-typed intermediate representation of a DAG step. +type rawDagStep struct { + Name string `yaml:"name"` + ID string `yaml:"id"` + Description string `yaml:"description"` + Command string `yaml:"command"` + Script string `yaml:"script"` + Args []string `yaml:"args"` + Shell string `yaml:"shell"` + Dir string `yaml:"dir"` + WorkingDir string `yaml:"working_dir"` + Depends []string `yaml:"depends"` + Env interface{} `yaml:"env"` + ContinueOn rawDagContinueOn `yaml:"continue_on"` + RetryPolicy rawDagRetryPolicy `yaml:"retry_policy"` + TimeoutSec int `yaml:"timeout_sec"` + Output string `yaml:"output"` + Tags []string `yaml:"tags"` +} + +type rawDagContinueOn struct { + Failure bool `yaml:"failure"` + Skipped bool `yaml:"skipped"` +} + +type rawDagRetryPolicy struct { + Limit int `yaml:"limit"` + IntervalSec int `yaml:"interval_sec"` +} + +// rawDagHandlers handles both handler_on and handlers aliases. +type rawDagHandlers struct { + Init interface{} `yaml:"init"` + Success interface{} `yaml:"success"` + Failure interface{} `yaml:"failure"` + Exit interface{} `yaml:"exit"` +} + +// rawDag is the loosely-typed intermediate representation of a DAG YAML. +type rawDag struct { + Name string `yaml:"name"` + Description string `yaml:"description"` + Group string `yaml:"group"` + Type string `yaml:"type"` + WorkingDir string `yaml:"working_dir"` + Shell string `yaml:"shell"` + Env interface{} `yaml:"env"` + Schedule interface{} `yaml:"schedule"` + Steps []rawDagStep `yaml:"steps"` + HandlerOn *rawDagHandlers `yaml:"handler_on"` + Handlers *rawDagHandlers `yaml:"handlers"` + Tags []string `yaml:"tags"` + TimeoutSec int `yaml:"timeout_sec"` +} + +// DagParse parses a YAML DAG definition compatible with Dagu format. +// Handles schedule as string or list, env as list of single-key maps, +// handler_on and handlers as aliases, and steps with command/script/depends. +func DagParse(data []byte) (DagDefinition, error) { + var raw rawDag + if err := yaml.Unmarshal(data, &raw); err != nil { + return DagDefinition{}, fmt.Errorf("dag_parse: yaml unmarshal: %w", err) + } + + def := DagDefinition{ + Name: raw.Name, + Description: raw.Description, + Group: raw.Group, + Type: raw.Type, + WorkingDir: raw.WorkingDir, + Shell: raw.Shell, + Tags: raw.Tags, + TimeoutSec: raw.TimeoutSec, + } + + // Normalize env (list of single-key maps or plain map). + dagEnv, err := normalizeEnv(raw.Env) + if err != nil { + return DagDefinition{}, fmt.Errorf("dag_parse: env: %w", err) + } + def.Env = dagEnv + + // Normalize schedule (string or list). + def.Schedule = normalizeSchedule(raw.Schedule) + + // Normalize steps. + steps := make([]DagStep, 0, len(raw.Steps)) + for i, rs := range raw.Steps { + step, err := normalizeStep(rs) + if err != nil { + return DagDefinition{}, fmt.Errorf("dag_parse: step[%d]: %w", i, err) + } + steps = append(steps, step) + } + def.Steps = steps + + // Normalize handlers: handler_on takes precedence, fallback to handlers. + rawHandlers := raw.HandlerOn + if rawHandlers == nil { + rawHandlers = raw.Handlers + } + if rawHandlers != nil { + handlers, err := normalizeHandlers(rawHandlers) + if err != nil { + return DagDefinition{}, fmt.Errorf("dag_parse: handlers: %w", err) + } + def.HandlerOn = handlers + } + + return def, nil +} + +// normalizeSchedule converts schedule from string or []interface{} to []string. +func normalizeSchedule(v interface{}) []string { + if v == nil { + return nil + } + switch t := v.(type) { + case string: + s := strings.TrimSpace(t) + if s == "" { + return nil + } + return []string{s} + case []interface{}: + result := make([]string, 0, len(t)) + for _, item := range t { + if s, ok := item.(string); ok { + s = strings.TrimSpace(s) + if s != "" { + result = append(result, s) + } + } + } + return result + } + return nil +} + +// normalizeEnv converts env from Dagu format (list of single-key maps) or plain map to map[string]string. +func normalizeEnv(v interface{}) (map[string]string, error) { + if v == nil { + return nil, nil + } + switch t := v.(type) { + case map[string]interface{}: + // Plain YAML map. + result := make(map[string]string, len(t)) + for k, val := range t { + result[k] = fmt.Sprintf("%v", val) + } + return result, nil + case []interface{}: + // Dagu format: list of single-key maps [KEY: value, KEY2: value2]. + result := make(map[string]string) + for _, item := range t { + m, ok := item.(map[string]interface{}) + if !ok { + continue + } + for k, val := range m { + result[k] = fmt.Sprintf("%v", val) + } + } + return result, nil + } + return nil, fmt.Errorf("unexpected env type %T", v) +} + +// normalizeStep converts a rawDagStep to a DagStep. +func normalizeStep(rs rawDagStep) (DagStep, error) { + stepEnv, err := normalizeEnv(rs.Env) + if err != nil { + return DagStep{}, fmt.Errorf("env: %w", err) + } + + // working_dir is an alias for dir at the step level. + dir := rs.Dir + if dir == "" { + dir = rs.WorkingDir + } + + return DagStep{ + Name: rs.Name, + ID: rs.ID, + Description: rs.Description, + Command: rs.Command, + Script: rs.Script, + Args: rs.Args, + Shell: rs.Shell, + Dir: dir, + Depends: rs.Depends, + Env: stepEnv, + ContinueOn: DagContinueOn{ + Failure: rs.ContinueOn.Failure, + Skipped: rs.ContinueOn.Skipped, + }, + RetryPolicy: DagRetryPolicy{ + Limit: rs.RetryPolicy.Limit, + IntervalSec: rs.RetryPolicy.IntervalSec, + }, + TimeoutSec: rs.TimeoutSec, + Output: rs.Output, + Tags: rs.Tags, + }, nil +} + +// normalizeHandlers converts rawDagHandlers to DagHandlers. +// Each handler field can be a single step object or a list of steps. +func normalizeHandlers(rh *rawDagHandlers) (DagHandlers, error) { + var h DagHandlers + var err error + h.Init, err = normalizeHandlerField(rh.Init) + if err != nil { + return DagHandlers{}, fmt.Errorf("init: %w", err) + } + h.Success, err = normalizeHandlerField(rh.Success) + if err != nil { + return DagHandlers{}, fmt.Errorf("success: %w", err) + } + h.Failure, err = normalizeHandlerField(rh.Failure) + if err != nil { + return DagHandlers{}, fmt.Errorf("failure: %w", err) + } + h.Exit, err = normalizeHandlerField(rh.Exit) + if err != nil { + return DagHandlers{}, fmt.Errorf("exit: %w", err) + } + return h, nil +} + +// normalizeHandlerField converts a handler field (single step or list) to []DagStep. +func normalizeHandlerField(v interface{}) ([]DagStep, error) { + if v == nil { + return nil, nil + } + + // Re-marshal and unmarshal to handle polymorphic types cleanly. + b, err := yaml.Marshal(v) + if err != nil { + return nil, fmt.Errorf("re-marshal handler: %w", err) + } + + // Try as a list first. + var rawList []rawDagStep + if err := yaml.Unmarshal(b, &rawList); err == nil && len(rawList) > 0 { + steps := make([]DagStep, 0, len(rawList)) + for i, rs := range rawList { + step, err := normalizeStep(rs) + if err != nil { + return nil, fmt.Errorf("step[%d]: %w", i, err) + } + steps = append(steps, step) + } + return steps, nil + } + + // Try as single step. + var rs rawDagStep + if err := yaml.Unmarshal(b, &rs); err != nil { + return nil, fmt.Errorf("unmarshal handler step: %w", err) + } + step, err := normalizeStep(rs) + if err != nil { + return nil, err + } + // If the step has no meaningful content, return nil. + if step.Name == "" && step.ID == "" && step.Command == "" && step.Script == "" { + return nil, nil + } + return []DagStep{step}, nil +} diff --git a/functions/core/dag_parse.md b/functions/core/dag_parse.md new file mode 100644 index 00000000..8b61aae5 --- /dev/null +++ b/functions/core/dag_parse.md @@ -0,0 +1,54 @@ +--- +name: dag_parse +kind: function +lang: go +domain: core +version: "1.0.0" +purity: pure +signature: "func DagParse(data []byte) (DagDefinition, error)" +description: "Parsea YAML de definicion de DAG en formato compatible con Dagu. Soporta schedule como string o lista, env como lista de maps single-key (formato Dagu), handler_on y handlers como aliases, steps con command/script/depends/continue_on, y type graph." +tags: [dag, yaml, parsing, workflow, dagu, pure] +uses_functions: [] +uses_types: [dag_definition_go_core, dag_step_go_core, dag_handlers_go_core] +returns: [] +returns_optional: false +error_type: "" +imports: [fmt, strings, gopkg.in/yaml.v3] +params: + - name: data + desc: "contenido YAML de un archivo de definicion de DAG en formato Dagu" +output: "DagDefinition con todos los campos normalizados; error si el YAML es sintaticamente invalido" +tested: true +tests: + - "parsea DAG simple con steps y depends" + - "parsea schedule como string y como lista" + - "parsea env en formato lista de maps" + - "parsea handler_on y handlers como alias" + - "parsea continue_on y working_dir a nivel step" + - "parsea type graph" +test_file_path: "functions/core/dag_parse_test.go" +file_path: "functions/core/dag_parse.go" +--- + +## Ejemplo + +```go +data := []byte(` +name: mi-dag +schedule: "0 9 * * *" +steps: + - name: hello + command: echo "hello" + - name: world + command: echo "world" + depends: [hello] +`) +dag, err := DagParse(data) +// dag.Name = "mi-dag" +// dag.Schedule = ["0 9 * * *"] +// dag.Steps[1].Depends = ["hello"] +``` + +## Notas + +Funcion pura (el YAML es inmutable, no hay I/O). Internamente usa un struct rawDag para deserializar loosely y luego normaliza campos polimorficos. La estrategia de normalizacion: schedule string->[]string, env lista->map, handlers single-o-lista->[]DagStep. handler_on tiene precedencia sobre handlers si ambos estan presentes. diff --git a/functions/core/dag_parse_test.go b/functions/core/dag_parse_test.go new file mode 100644 index 00000000..bcb4ccc2 --- /dev/null +++ b/functions/core/dag_parse_test.go @@ -0,0 +1,213 @@ +package core + +import ( + "testing" +) + +func TestDagParse(t *testing.T) { + t.Run("parsea DAG simple con steps y depends", func(t *testing.T) { + data := []byte(` +name: example +description: Example workflow +steps: + - name: hello + command: echo "Hello!" + - name: list_files + command: ls -la /tmp + depends: + - hello +`) + dag, err := DagParse(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if dag.Name != "example" { + t.Errorf("Name: got %q, want %q", dag.Name, "example") + } + if len(dag.Steps) != 2 { + t.Fatalf("Steps: got %d, want 2", len(dag.Steps)) + } + if dag.Steps[0].Name != "hello" { + t.Errorf("Steps[0].Name: got %q, want %q", dag.Steps[0].Name, "hello") + } + if dag.Steps[1].Name != "list_files" { + t.Errorf("Steps[1].Name: got %q, want %q", dag.Steps[1].Name, "list_files") + } + if len(dag.Steps[1].Depends) != 1 || dag.Steps[1].Depends[0] != "hello" { + t.Errorf("Steps[1].Depends: got %v, want [hello]", dag.Steps[1].Depends) + } + }) + + t.Run("parsea schedule como string y como lista", func(t *testing.T) { + // Schedule as string. + dataStr := []byte(` +name: dag-string-schedule +schedule: "0 9 * * 5" +steps: + - name: step1 + command: echo ok +`) + dagStr, err := DagParse(dataStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(dagStr.Schedule) != 1 || dagStr.Schedule[0] != "0 9 * * 5" { + t.Errorf("Schedule (string): got %v, want [\"0 9 * * 5\"]", dagStr.Schedule) + } + + // Schedule as list. + dataList := []byte(` +name: dag-list-schedule +schedule: + - "0 9 * * *" + - "0 18 * * *" +steps: + - name: step1 + command: echo ok +`) + dagList, err := DagParse(dataList) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(dagList.Schedule) != 2 { + t.Fatalf("Schedule (list): got %d items, want 2", len(dagList.Schedule)) + } + if dagList.Schedule[0] != "0 9 * * *" { + t.Errorf("Schedule[0]: got %q, want %q", dagList.Schedule[0], "0 9 * * *") + } + if dagList.Schedule[1] != "0 18 * * *" { + t.Errorf("Schedule[1]: got %q, want %q", dagList.Schedule[1], "0 18 * * *") + } + }) + + t.Run("parsea env en formato lista de maps", func(t *testing.T) { + data := []byte(` +name: dag-env +env: + - PROJECT_DIR: /home/lucas/analysis + - PYTHON: /home/lucas/.venv/bin/python +steps: + - name: step1 + command: echo ${PROJECT_DIR} +`) + dag, err := DagParse(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if dag.Env["PROJECT_DIR"] != "/home/lucas/analysis" { + t.Errorf("Env[PROJECT_DIR]: got %q, want %q", dag.Env["PROJECT_DIR"], "/home/lucas/analysis") + } + if dag.Env["PYTHON"] != "/home/lucas/.venv/bin/python" { + t.Errorf("Env[PYTHON]: got %q, want %q", dag.Env["PYTHON"], "/home/lucas/.venv/bin/python") + } + }) + + t.Run("parsea handler_on y handlers como alias", func(t *testing.T) { + // handler_on with single step object. + dataHandlerOn := []byte(` +name: dag-handler-on +handler_on: + failure: + command: echo "FALLO" +steps: + - name: step1 + command: echo hello +`) + dagHO, err := DagParse(dataHandlerOn) + if err != nil { + t.Fatalf("unexpected error (handler_on): %v", err) + } + if len(dagHO.HandlerOn.Failure) != 1 { + t.Fatalf("HandlerOn.Failure: got %d steps, want 1", len(dagHO.HandlerOn.Failure)) + } + if dagHO.HandlerOn.Failure[0].Command != `echo "FALLO"` { + t.Errorf("HandlerOn.Failure[0].Command: got %q", dagHO.HandlerOn.Failure[0].Command) + } + + // handlers alias with list of steps. + dataHandlers := []byte(` +name: dag-handlers +handlers: + failure: + - name: mark_as_failed + command: echo "FAILED" + success: + - name: notify + command: echo "OK" +steps: + - name: step1 + command: echo hello +`) + dagH, err := DagParse(dataHandlers) + if err != nil { + t.Fatalf("unexpected error (handlers): %v", err) + } + if len(dagH.HandlerOn.Failure) != 1 || dagH.HandlerOn.Failure[0].Name != "mark_as_failed" { + t.Errorf("HandlerOn.Failure: got %v", dagH.HandlerOn.Failure) + } + if len(dagH.HandlerOn.Success) != 1 || dagH.HandlerOn.Success[0].Name != "notify" { + t.Errorf("HandlerOn.Success: got %v", dagH.HandlerOn.Success) + } + }) + + t.Run("parsea continue_on y working_dir a nivel step", func(t *testing.T) { + data := []byte(` +name: dag-step-options +steps: + - id: ingest + description: Procesar archivos + working_dir: /home/lucas/project + command: ./bin/ingest + continue_on: + failure: true + - id: informe + command: python /tmp/script.py + depends: [ingest] +`) + dag, err := DagParse(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(dag.Steps) != 2 { + t.Fatalf("Steps: got %d, want 2", len(dag.Steps)) + } + step0 := dag.Steps[0] + if step0.ID != "ingest" { + t.Errorf("Steps[0].ID: got %q, want %q", step0.ID, "ingest") + } + if step0.Dir != "/home/lucas/project" { + t.Errorf("Steps[0].Dir: got %q, want %q", step0.Dir, "/home/lucas/project") + } + if !step0.ContinueOn.Failure { + t.Errorf("Steps[0].ContinueOn.Failure: got false, want true") + } + step1 := dag.Steps[1] + if len(step1.Depends) != 1 || step1.Depends[0] != "ingest" { + t.Errorf("Steps[1].Depends: got %v, want [ingest]", step1.Depends) + } + }) + + t.Run("parsea type graph", func(t *testing.T) { + data := []byte(` +name: dag-graph +type: graph +tags: [finanzas, semanal] +steps: + - name: step1 + command: echo a + - name: step2 + command: echo b + depends: [step1] +`) + dag, err := DagParse(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if dag.Type != "graph" { + t.Errorf("Type: got %q, want %q", dag.Type, "graph") + } + if len(dag.Tags) != 2 { + t.Errorf("Tags: got %v, want [finanzas semanal]", dag.Tags) + } + }) +} diff --git a/functions/core/dag_resolve_env.go b/functions/core/dag_resolve_env.go new file mode 100644 index 00000000..a7a5ae28 --- /dev/null +++ b/functions/core/dag_resolve_env.go @@ -0,0 +1,157 @@ +package core + +import ( + "strings" +) + +// DagResolveEnv resolves environment variable references in a DagDefinition. +// It parses environ (KEY=VALUE pairs from os.Environ()), merges with dag.Env +// (dag.Env takes precedence), and substitutes ${VAR} and $VAR in Command, +// Script, Dir, Args, and Env values of each step. Also substitutes in +// DagDefinition.WorkingDir. Returns a new DagDefinition without mutating the original. +func DagResolveEnv(dag DagDefinition, environ []string) DagDefinition { + // Parse environ into a base map. + base := parseEnviron(environ) + + // Merge dag.Env over base (dag.Env has precedence). + dagEnv := mergeMaps(base, dag.Env) + + // Build new DagDefinition (shallow copy, then replace fields). + result := dag + + // Substitute in WorkingDir. + result.WorkingDir = substitute(dag.WorkingDir, dagEnv) + + // Substitute in Env values of the DAG itself. + result.Env = substituteMap(dag.Env, dagEnv) + + // Resolve steps. + resolvedSteps := make([]DagStep, len(dag.Steps)) + for i, step := range dag.Steps { + // Merge dag env with step env (step env has precedence). + stepEnv := mergeMaps(dagEnv, step.Env) + + rs := step + rs.Command = substitute(step.Command, stepEnv) + rs.Script = substitute(step.Script, stepEnv) + rs.Dir = substitute(step.Dir, stepEnv) + + if len(step.Args) > 0 { + args := make([]string, len(step.Args)) + for j, arg := range step.Args { + args[j] = substitute(arg, stepEnv) + } + rs.Args = args + } + + rs.Env = substituteMap(step.Env, stepEnv) + resolvedSteps[i] = rs + } + result.Steps = resolvedSteps + + return result +} + +// parseEnviron parses a slice of "KEY=VALUE" strings into a map. +func parseEnviron(environ []string) map[string]string { + m := make(map[string]string, len(environ)) + for _, kv := range environ { + idx := strings.IndexByte(kv, '=') + if idx < 0 { + continue + } + m[kv[:idx]] = kv[idx+1:] + } + return m +} + +// mergeMaps returns a new map with all entries from base, then overlaid with overlay. +func mergeMaps(base, overlay map[string]string) map[string]string { + result := make(map[string]string, len(base)+len(overlay)) + for k, v := range base { + result[k] = v + } + for k, v := range overlay { + result[k] = v + } + return result +} + +// substituteMap applies substitute to all values in a map. +func substituteMap(m map[string]string, env map[string]string) map[string]string { + if len(m) == 0 { + return m + } + result := make(map[string]string, len(m)) + for k, v := range m { + result[k] = substitute(v, env) + } + return result +} + +// substitute replaces ${VAR} and $VAR occurrences in s using the env map. +func substitute(s string, env map[string]string) string { + if s == "" || !strings.Contains(s, "$") { + return s + } + + var b strings.Builder + i := 0 + for i < len(s) { + if s[i] != '$' { + b.WriteByte(s[i]) + i++ + continue + } + + // Found '$' + i++ + if i >= len(s) { + b.WriteByte('$') + break + } + + if s[i] == '{' { + // ${VAR} form. + i++ + end := strings.IndexByte(s[i:], '}') + if end < 0 { + // No closing brace — write as-is. + b.WriteString("${") + continue + } + varName := s[i : i+end] + i += end + 1 + if val, ok := env[varName]; ok { + b.WriteString(val) + } else { + b.WriteString("${") + b.WriteString(varName) + b.WriteByte('}') + } + } else if isEnvVarChar(s[i]) { + // $VAR form (bare variable name). + start := i + for i < len(s) && isEnvVarChar(s[i]) { + i++ + } + varName := s[start:i] + if val, ok := env[varName]; ok { + b.WriteString(val) + } else { + b.WriteByte('$') + b.WriteString(varName) + } + } else { + // Not a valid variable start — keep '$' and continue. + b.WriteByte('$') + } + } + + return b.String() +} + +// isEnvVarChar returns true for characters valid in environment variable names. +func isEnvVarChar(c byte) bool { + return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_' +} diff --git a/functions/core/dag_resolve_env.md b/functions/core/dag_resolve_env.md new file mode 100644 index 00000000..dbe9180e --- /dev/null +++ b/functions/core/dag_resolve_env.md @@ -0,0 +1,39 @@ +--- +name: dag_resolve_env +kind: function +lang: go +domain: core +version: "1.0.0" +purity: pure +signature: "func DagResolveEnv(dag DagDefinition, environ []string) DagDefinition" +description: "Resuelve referencias a variables de entorno (${VAR} y $VAR) en un DagDefinition. Mergea environ del sistema con dag.Env (dag.Env tiene precedencia), y sustituye en Command, Script, Dir, Args y Env values de cada step. No muta el DagDefinition original." +tags: [dag, env, substitution, variable-expansion, pure] +uses_functions: [] +uses_types: [dag_definition_go_core, dag_step_go_core] +returns: [] +returns_optional: false +error_type: "" +imports: [strings] +params: + - name: dag + desc: "DagDefinition parseado con posibles referencias ${VAR} en sus campos" + - name: environ + desc: "lista de strings KEY=VALUE del entorno del sistema (tipicamente os.Environ())" +output: "nuevo DagDefinition con todas las referencias de variables sustituidas por sus valores" +tested: false +tests: [] +test_file_path: "" +file_path: "functions/core/dag_resolve_env.go" +--- + +## Ejemplo + +```go +dag, _ := DagParse(yamlData) +resolved := DagResolveEnv(dag, os.Environ()) +// resolved.Steps[0].Command tiene ${PROJECT_DIR} sustituido +``` + +## Notas + +Funcion pura. No muta el DagDefinition de entrada — retorna una copia con los campos resueltos. Precedencia de variables: environ base < dag.Env < step.Env. Sustituye tanto ${VAR} (con llaves) como $VAR (bare). Si una variable no existe en el entorno, se deja sin sustituir. Los campos sustituidos son: DagDefinition.WorkingDir, Env values del DAG, y por step: Command, Script, Dir, Args, Env values. diff --git a/functions/core/dag_topo_sort.go b/functions/core/dag_topo_sort.go new file mode 100644 index 00000000..936752a3 --- /dev/null +++ b/functions/core/dag_topo_sort.go @@ -0,0 +1,78 @@ +package core + +import "fmt" + +// DagTopoSort performs a topological sort of DAG steps using Kahn's algorithm. +// Returns levels where each level contains steps that can run in parallel. +// Returns an error if a dependency cycle is detected. +func DagTopoSort(steps []DagStep) ([][]DagStep, error) { + if len(steps) == 0 { + return nil, nil + } + + // Build index: ref -> DagStep. + index := make(map[string]DagStep, len(steps)) + for _, s := range steps { + index[stepRef(s)] = s + } + + // Compute in-degree for each step. + inDegree := make(map[string]int, len(steps)) + for _, s := range steps { + ref := stepRef(s) + if _, ok := inDegree[ref]; !ok { + inDegree[ref] = 0 + } + for range s.Depends { + inDegree[ref]++ // ref depends on a dep, so ref gets +1 + } + } + + // Build adjacency list: dep -> list of steps that depend on dep. + adj := make(map[string][]string, len(steps)) + for _, s := range steps { + ref := stepRef(s) + for _, dep := range s.Depends { + adj[dep] = append(adj[dep], ref) + } + } + + // Initialize queue with steps that have in-degree 0. + var queue []string + for _, s := range steps { + ref := stepRef(s) + if inDegree[ref] == 0 { + queue = append(queue, ref) + } + } + + var levels [][]DagStep + processed := 0 + + for len(queue) > 0 { + // The current queue forms a parallel level. + level := make([]DagStep, 0, len(queue)) + nextQueue := []string{} + + for _, ref := range queue { + level = append(level, index[ref]) + processed++ + // Reduce in-degree for dependents. + for _, dependent := range adj[ref] { + inDegree[dependent]-- + if inDegree[dependent] == 0 { + nextQueue = append(nextQueue, dependent) + } + } + } + + levels = append(levels, level) + queue = nextQueue + } + + if processed != len(steps) { + return nil, fmt.Errorf("dag_topo_sort: cycle detected (%d of %d steps processed)", processed, len(steps)) + } + + return levels, nil +} diff --git a/functions/core/dag_topo_sort.md b/functions/core/dag_topo_sort.md new file mode 100644 index 00000000..645976ae --- /dev/null +++ b/functions/core/dag_topo_sort.md @@ -0,0 +1,48 @@ +--- +name: dag_topo_sort +kind: function +lang: go +domain: core +version: "1.0.0" +purity: pure +signature: "func DagTopoSort(steps []DagStep) ([][]DagStep, error)" +description: "Ordenamiento topologico de steps de un DAG usando el algoritmo de Kahn. Retorna niveles donde cada nivel contiene steps que pueden ejecutarse en paralelo. Detecta ciclos y retorna error si los hay." +tags: [dag, topological-sort, graph, kahn, pure] +uses_functions: [] +uses_types: [dag_step_go_core] +returns: [] +returns_optional: false +error_type: "" +imports: [fmt] +params: + - name: steps + desc: "lista de DagStep con sus dependencias definidas en el campo Depends" +output: "slice de niveles donde cada nivel es un slice de DagStep que pueden ejecutarse en paralelo; error si hay ciclo" +tested: true +tests: + - "cadena lineal A->B->C produce tres niveles" + - "diamond A->B A->C B->D C->D produce cuatro niveles" + - "steps paralelos sin depends produce un solo nivel" + - "ciclo A->B->A retorna error" +test_file_path: "functions/core/dag_topo_sort_test.go" +file_path: "functions/core/dag_topo_sort.go" +--- + +## Ejemplo + +```go +steps := []DagStep{ + {Name: "a"}, + {Name: "b", Depends: []string{"a"}}, + {Name: "c", Depends: []string{"a"}}, + {Name: "d", Depends: []string{"b", "c"}}, +} +levels, err := DagTopoSort(steps) +// levels[0] = [a] +// levels[1] = [b, c] (paralelo) +// levels[2] = [d] +``` + +## Notas + +Funcion pura. Implementa Kahn's algorithm: calcula in-degree inicial, pone en cola los steps con in-degree 0, extrae un nivel completo por iteracion y reduce el in-degree de los dependientes. Si al terminar hay steps sin procesar, hay un ciclo. El orden dentro de cada nivel no esta garantizado — depende del orden del slice de entrada. diff --git a/functions/core/dag_topo_sort_test.go b/functions/core/dag_topo_sort_test.go new file mode 100644 index 00000000..ce269e9b --- /dev/null +++ b/functions/core/dag_topo_sort_test.go @@ -0,0 +1,95 @@ +package core + +import ( + "testing" +) + +func TestDagTopoSort(t *testing.T) { + t.Run("cadena lineal A->B->C produce tres niveles", func(t *testing.T) { + steps := []DagStep{ + {Name: "a"}, + {Name: "b", Depends: []string{"a"}}, + {Name: "c", Depends: []string{"b"}}, + } + levels, err := DagTopoSort(steps) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(levels) != 3 { + t.Fatalf("levels: got %d, want 3", len(levels)) + } + if len(levels[0]) != 1 || levels[0][0].Name != "a" { + t.Errorf("levels[0]: got %v, want [a]", stepNames(levels[0])) + } + if len(levels[1]) != 1 || levels[1][0].Name != "b" { + t.Errorf("levels[1]: got %v, want [b]", stepNames(levels[1])) + } + if len(levels[2]) != 1 || levels[2][0].Name != "c" { + t.Errorf("levels[2]: got %v, want [c]", stepNames(levels[2])) + } + }) + + t.Run("diamond A->B A->C B->D C->D produce cuatro niveles", func(t *testing.T) { + steps := []DagStep{ + {Name: "a"}, + {Name: "b", Depends: []string{"a"}}, + {Name: "c", Depends: []string{"a"}}, + {Name: "d", Depends: []string{"b", "c"}}, + } + levels, err := DagTopoSort(steps) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Expected: level0=[a], level1=[b,c], level2=[d] + if len(levels) != 3 { + t.Fatalf("levels: got %d, want 3", len(levels)) + } + if len(levels[0]) != 1 || levels[0][0].Name != "a" { + t.Errorf("levels[0]: got %v, want [a]", stepNames(levels[0])) + } + if len(levels[1]) != 2 { + t.Errorf("levels[1]: got %v, want 2 steps", stepNames(levels[1])) + } + if len(levels[2]) != 1 || levels[2][0].Name != "d" { + t.Errorf("levels[2]: got %v, want [d]", stepNames(levels[2])) + } + }) + + t.Run("steps paralelos sin depends produce un solo nivel", func(t *testing.T) { + steps := []DagStep{ + {Name: "x"}, + {Name: "y"}, + {Name: "z"}, + } + levels, err := DagTopoSort(steps) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(levels) != 1 { + t.Fatalf("levels: got %d, want 1", len(levels)) + } + if len(levels[0]) != 3 { + t.Errorf("levels[0]: got %d steps, want 3", len(levels[0])) + } + }) + + t.Run("ciclo A->B->A retorna error", func(t *testing.T) { + steps := []DagStep{ + {Name: "a", Depends: []string{"b"}}, + {Name: "b", Depends: []string{"a"}}, + } + _, err := DagTopoSort(steps) + if err == nil { + t.Fatal("expected error for cycle, got nil") + } + }) +} + +// stepNames extracts step names for readable test output. +func stepNames(steps []DagStep) []string { + names := make([]string, len(steps)) + for i, s := range steps { + names[i] = stepRef(s) + } + return names +} diff --git a/functions/core/dag_validate.go b/functions/core/dag_validate.go new file mode 100644 index 00000000..2c8c077a --- /dev/null +++ b/functions/core/dag_validate.go @@ -0,0 +1,83 @@ +package core + +import "fmt" + +// DagValidate validates a DagDefinition for structural correctness. +// Checks: steps have name/ID, no duplicate names/IDs, all depends reference +// existing steps, no dependency cycles. On success, computes topological levels. +// Returns warnings for steps with both command and script set. +func DagValidate(dag DagDefinition) DagValidationResult { + result := DagValidationResult{Valid: true} + + // Build name/ID sets and check for missing identifiers and duplicates. + seen := make(map[string]bool) + for i, step := range dag.Steps { + ref := stepRef(step) + if ref == "" { + result.Errors = append(result.Errors, + fmt.Sprintf("step[%d]: must have name or id", i)) + result.Valid = false + continue + } + if seen[ref] { + result.Errors = append(result.Errors, + fmt.Sprintf("step[%d]: duplicate name/id %q", i, ref)) + result.Valid = false + } + seen[ref] = true + + // Warning: command and script both set. + if step.Command != "" && step.Script != "" { + result.Warnings = append(result.Warnings, + fmt.Sprintf("step %q: has both command and script", ref)) + } + } + + if !result.Valid { + return result + } + + // Check that all depends reference existing steps. + for _, step := range dag.Steps { + for _, dep := range step.Depends { + if !seen[dep] { + result.Errors = append(result.Errors, + fmt.Sprintf("step %q: depends on unknown step %q", stepRef(step), dep)) + result.Valid = false + } + } + } + + if !result.Valid { + return result + } + + // Topological sort with Kahn's — detects cycles and computes levels. + levels, err := DagTopoSort(dag.Steps) + if err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("cycle detected: %v", err)) + result.Valid = false + return result + } + + // Convert [][]DagStep to [][]string for the result. + strLevels := make([][]string, len(levels)) + for i, level := range levels { + refs := make([]string, len(level)) + for j, s := range level { + refs[j] = stepRef(s) + } + strLevels[i] = refs + } + result.Levels = strLevels + + return result +} + +// stepRef returns the canonical reference for a step (ID preferred, then Name). +func stepRef(s DagStep) string { + if s.ID != "" { + return s.ID + } + return s.Name +} diff --git a/functions/core/dag_validate.md b/functions/core/dag_validate.md new file mode 100644 index 00000000..6ce46dc1 --- /dev/null +++ b/functions/core/dag_validate.md @@ -0,0 +1,46 @@ +--- +name: dag_validate +kind: function +lang: go +domain: core +version: "1.0.0" +purity: pure +signature: "func DagValidate(dag DagDefinition) DagValidationResult" +description: "Valida un DagDefinition para correcto uso estructural. Verifica que cada step tenga nombre o ID, que no haya duplicados, que todos los depends referencien steps existentes y que no haya ciclos (algoritmo de Kahn). Si el DAG es valido, calcula los niveles topologicos." +tags: [dag, validation, workflow, graph, pure] +uses_functions: [dag_topo_sort_go_core] +uses_types: [dag_definition_go_core, dag_validation_result_go_core] +returns: [] +returns_optional: false +error_type: "" +imports: [fmt] +params: + - name: dag + desc: "DagDefinition parseado y a validar" +output: "DagValidationResult con Valid=true si no hay errores, Errors con los problemas encontrados, Warnings con avisos no fatales, y Levels con los niveles topologicos si el DAG es valido" +tested: true +tests: + - "DAG valido retorna Valid true y levels calculados" + - "step sin nombre retorna error" + - "depends a step inexistente retorna error" + - "ciclo en dependencias retorna error" +test_file_path: "functions/core/dag_validate_test.go" +file_path: "functions/core/dag_validate.go" +--- + +## Ejemplo + +```go +dag, _ := DagParse(yamlData) +res := DagValidate(dag) +if !res.Valid { + for _, e := range res.Errors { + fmt.Println("ERROR:", e) + } +} +// res.Levels = [["step-a"], ["step-b", "step-c"], ["step-d"]] +``` + +## Notas + +Funcion pura. No modifica el DagDefinition de entrada. El calculo de niveles topologicos se delega a DagTopoSort para mantener la separacion de responsabilidades. Un warning (command+script simultaneos) no invalida el DAG. diff --git a/functions/core/dag_validate_test.go b/functions/core/dag_validate_test.go new file mode 100644 index 00000000..28b8f13b --- /dev/null +++ b/functions/core/dag_validate_test.go @@ -0,0 +1,113 @@ +package core + +import ( + "testing" +) + +func TestDagValidate(t *testing.T) { + t.Run("DAG valido retorna Valid true y levels calculados", func(t *testing.T) { + dag := DagDefinition{ + Name: "valid-dag", + Steps: []DagStep{ + {Name: "a", Command: "echo a"}, + {Name: "b", Command: "echo b", Depends: []string{"a"}}, + {Name: "c", Command: "echo c", Depends: []string{"a"}}, + {Name: "d", Command: "echo d", Depends: []string{"b", "c"}}, + }, + } + res := DagValidate(dag) + if !res.Valid { + t.Errorf("Valid: got false, want true. Errors: %v", res.Errors) + } + if len(res.Errors) != 0 { + t.Errorf("Errors: got %v, want empty", res.Errors) + } + // Should have 3 levels: [a], [b,c], [d] + if len(res.Levels) != 3 { + t.Errorf("Levels: got %d, want 3. Levels: %v", len(res.Levels), res.Levels) + } + if len(res.Levels[0]) != 1 || res.Levels[0][0] != "a" { + t.Errorf("Levels[0]: got %v, want [a]", res.Levels[0]) + } + if len(res.Levels[2]) != 1 || res.Levels[2][0] != "d" { + t.Errorf("Levels[2]: got %v, want [d]", res.Levels[2]) + } + }) + + t.Run("step sin nombre retorna error", func(t *testing.T) { + dag := DagDefinition{ + Name: "bad-dag", + Steps: []DagStep{ + {Command: "echo hello"}, // no name, no ID + }, + } + res := DagValidate(dag) + if res.Valid { + t.Error("Valid: got true, want false") + } + if len(res.Errors) == 0 { + t.Error("Errors: got empty, want at least one error") + } + }) + + t.Run("depends a step inexistente retorna error", func(t *testing.T) { + dag := DagDefinition{ + Name: "bad-deps-dag", + Steps: []DagStep{ + {Name: "step1", Command: "echo ok"}, + {Name: "step2", Command: "echo ok", Depends: []string{"nonexistent"}}, + }, + } + res := DagValidate(dag) + if res.Valid { + t.Error("Valid: got true, want false") + } + found := false + for _, e := range res.Errors { + if containsStr(e, "nonexistent") { + found = true + break + } + } + if !found { + t.Errorf("Errors should mention 'nonexistent', got: %v", res.Errors) + } + }) + + t.Run("ciclo en dependencias retorna error", func(t *testing.T) { + dag := DagDefinition{ + Name: "cyclic-dag", + Steps: []DagStep{ + {Name: "a", Command: "echo a", Depends: []string{"b"}}, + {Name: "b", Command: "echo b", Depends: []string{"a"}}, + }, + } + res := DagValidate(dag) + if res.Valid { + t.Error("Valid: got true, want false") + } + found := false + for _, e := range res.Errors { + if containsStr(e, "cycle") { + found = true + break + } + } + if !found { + t.Errorf("Errors should mention 'cycle', got: %v", res.Errors) + } + }) +} + +// containsStr returns true if s contains substr. +func containsStr(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(substr) == 0 || + func() bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false + }()) +} diff --git a/types/core/dag_continue_on.md b/types/core/dag_continue_on.md new file mode 100644 index 00000000..df04ad4c --- /dev/null +++ b/types/core/dag_continue_on.md @@ -0,0 +1,20 @@ +--- +name: dag_continue_on +lang: go +domain: core +version: "1.0.0" +algebraic: product +definition: | + type DagContinueOn struct { + Failure bool + Skipped bool + } +description: "Politica de continuacion de un step DAG ante fallos o saltos. Cuando Failure=true el DAG no se detiene si el step falla. Cuando Skipped=true tampoco se detiene si el step es saltado." +tags: [dag, workflow, policy, error-handling] +uses_types: [] +file_path: "functions/core/dag_definition.go" +--- + +## Notas + +Tipo producto. Embebido en DagStep. Corresponde al campo `continue_on` del YAML de Dagu. diff --git a/types/core/dag_definition.md b/types/core/dag_definition.md new file mode 100644 index 00000000..21166858 --- /dev/null +++ b/types/core/dag_definition.md @@ -0,0 +1,31 @@ +--- +name: dag_definition +lang: go +domain: core +version: "1.0.0" +algebraic: product +definition: | + type DagDefinition struct { + Name string + Description string + Group string + Type string + WorkingDir string + Shell string + Env map[string]string + Schedule []string + Steps []DagStep + HandlerOn DagHandlers + Tags []string + TimeoutSec int + FilePath string + } +description: "Definicion completa de un workflow DAG parseada desde YAML compatible con Dagu. Contiene steps, handlers de ciclo de vida, variables de entorno, schedule y metadatos del flujo." +tags: [dag, workflow, yaml, dagu, definition] +uses_types: [dag_step_go_core, dag_handlers_go_core] +file_path: "functions/core/dag_definition.go" +--- + +## Notas + +Tipo producto. Type puede ser "graph" (ejecucion paralela por niveles topologicos) o vacio (cadena secuencial). Schedule es siempre una lista de strings (normalizado desde string o lista en el YAML). FilePath se rellena opcionalmente por el caller para saber el origen del archivo. diff --git a/types/core/dag_handlers.md b/types/core/dag_handlers.md new file mode 100644 index 00000000..6769d965 --- /dev/null +++ b/types/core/dag_handlers.md @@ -0,0 +1,22 @@ +--- +name: dag_handlers +lang: go +domain: core +version: "1.0.0" +algebraic: product +definition: | + type DagHandlers struct { + Init []DagStep + Success []DagStep + Failure []DagStep + Exit []DagStep + } +description: "Handlers de ciclo de vida de un DAG. Cada campo contiene steps que se ejecutan en el evento correspondiente: inicializacion, exito, fallo o salida (siempre). Corresponde a handler_on o handlers en el YAML de Dagu." +tags: [dag, workflow, handlers, lifecycle] +uses_types: [dag_step_go_core] +file_path: "functions/core/dag_definition.go" +--- + +## Notas + +Tipo producto. Embebido en DagDefinition. Los campos handler_on y handlers son aliases en el YAML de Dagu — ambos se normalizan a este tipo. Cada handler puede ser un step unico o una lista de steps en el YAML. diff --git a/types/core/dag_retry_policy.md b/types/core/dag_retry_policy.md new file mode 100644 index 00000000..f7c8e8ea --- /dev/null +++ b/types/core/dag_retry_policy.md @@ -0,0 +1,20 @@ +--- +name: dag_retry_policy +lang: go +domain: core +version: "1.0.0" +algebraic: product +definition: | + type DagRetryPolicy struct { + Limit int + IntervalSec int + } +description: "Politica de reintentos automaticos para un step DAG. Limit es el numero maximo de reintentos e IntervalSec es el tiempo de espera entre intentos en segundos." +tags: [dag, workflow, retry, policy] +uses_types: [] +file_path: "functions/core/dag_definition.go" +--- + +## Notas + +Tipo producto. Embebido en DagStep. Corresponde al campo `retry_policy` del YAML de Dagu. Limit=0 significa sin reintentos. diff --git a/types/core/dag_step.md b/types/core/dag_step.md new file mode 100644 index 00000000..1e01e66b --- /dev/null +++ b/types/core/dag_step.md @@ -0,0 +1,33 @@ +--- +name: dag_step +lang: go +domain: core +version: "1.0.0" +algebraic: product +definition: | + type DagStep struct { + Name string + ID string + Description string + Command string + Script string + Args []string + Shell string + Dir string + Depends []string + Env map[string]string + ContinueOn DagContinueOn + RetryPolicy DagRetryPolicy + TimeoutSec int + Output string + Tags []string + } +description: "Un paso individual en un workflow DAG con command/script, dependencias y configuracion de reintentos. Soporta variables de entorno por step, directorio de trabajo propio y politica continue_on." +tags: [dag, workflow, step, yaml, dagu] +uses_types: [dag_continue_on_go_core, dag_retry_policy_go_core] +file_path: "functions/core/dag_definition.go" +--- + +## Notas + +Tipo producto. El campo ID es opcional y se usa como referencia en otros steps (ej: `${id.stdout}`). Si Name e ID estan ambos vacios, el step es invalido. Env del step se mergea sobre el Env del DAG padre durante la resolucion. diff --git a/types/core/dag_validation_result.md b/types/core/dag_validation_result.md new file mode 100644 index 00000000..ee668563 --- /dev/null +++ b/types/core/dag_validation_result.md @@ -0,0 +1,22 @@ +--- +name: dag_validation_result +lang: go +domain: core +version: "1.0.0" +algebraic: product +definition: | + type DagValidationResult struct { + Valid bool + Errors []string + Warnings []string + Levels [][]string + } +description: "Resultado de validar un DagDefinition. Contiene errores estructurales (ciclos, depends invalidos, nombres duplicados), warnings (command+script simultaneos) y los niveles topologicos calculados si el DAG es valido." +tags: [dag, validation, workflow, result] +uses_types: [] +file_path: "functions/core/dag_definition.go" +--- + +## Notas + +Tipo producto. Levels solo se rellena cuando Valid=true. Cada sub-slice de Levels contiene los nombres/IDs de steps que pueden ejecutarse en paralelo. El orden de Levels refleja el orden topologico del grafo de dependencias. From 741fdcee2414d74bcf891839a47c4aba64f0393c Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 12 Apr 2026 13:05:13 +0200 Subject: [PATCH 2/4] feat: add process manager and execution store types (0007b, 0007c) Process spawn/wait/kill functions for subprocess management with output capture, timeout, and process group cleanup. DagRun and DagStepResult types for SQLite execution persistence. Co-Authored-By: Claude Opus 4.6 (1M context) --- functions/infra/dag_run.go | 30 ++++++++ functions/infra/process_handle.go | 26 +++++++ functions/infra/process_kill.go | 42 ++++++++++ functions/infra/process_kill.md | 45 +++++++++++ functions/infra/process_spawn.go | 74 ++++++++++++++++++ functions/infra/process_spawn.md | 52 +++++++++++++ functions/infra/process_spawn_test.go | 107 ++++++++++++++++++++++++++ functions/infra/process_wait.go | 51 ++++++++++++ functions/infra/process_wait.md | 49 ++++++++++++ types/infra/dag_run.md | 27 +++++++ types/infra/dag_step_result.md | 29 +++++++ types/infra/process_handle.md | 24 ++++++ types/infra/process_result.md | 23 ++++++ 13 files changed, 579 insertions(+) create mode 100644 functions/infra/dag_run.go create mode 100644 functions/infra/process_handle.go create mode 100644 functions/infra/process_kill.go create mode 100644 functions/infra/process_kill.md create mode 100644 functions/infra/process_spawn.go create mode 100644 functions/infra/process_spawn.md create mode 100644 functions/infra/process_spawn_test.go create mode 100644 functions/infra/process_wait.go create mode 100644 functions/infra/process_wait.md create mode 100644 types/infra/dag_run.md create mode 100644 types/infra/dag_step_result.md create mode 100644 types/infra/process_handle.md create mode 100644 types/infra/process_result.md diff --git a/functions/infra/dag_run.go b/functions/infra/dag_run.go new file mode 100644 index 00000000..6e67705e --- /dev/null +++ b/functions/infra/dag_run.go @@ -0,0 +1,30 @@ +package infra + +import "time" + +// DagRun represents one execution of a DAG workflow. +type DagRun struct { + ID string + DagName string + DagPath string + Status string // pending, running, success, failed, cancelled + StartedAt time.Time + FinishedAt time.Time + Trigger string // manual, cron, api + Error string +} + +// DagStepResult represents the outcome of one step within a DagRun. +type DagStepResult struct { + ID string + RunID string + StepName string + Status string // pending, running, success, failed, skipped + ExitCode int + Stdout string + Stderr string + StartedAt time.Time + FinishedAt time.Time + DurationMs int64 + Error string +} diff --git a/functions/infra/process_handle.go b/functions/infra/process_handle.go new file mode 100644 index 00000000..f7afffcf --- /dev/null +++ b/functions/infra/process_handle.go @@ -0,0 +1,26 @@ +package infra + +import ( + "bytes" + "os/exec" + "time" +) + +// ProcessHandle represents a running subprocess with output buffers. +type ProcessHandle struct { + Cmd *exec.Cmd + Pid int + StartTime time.Time + Dir string + stdout *bytes.Buffer + stderr *bytes.Buffer +} + +// ProcessResult contains the outcome of a completed subprocess. +type ProcessResult struct { + ExitCode int + Stdout string + Stderr string + DurationMs int64 + Killed bool +} diff --git a/functions/infra/process_kill.go b/functions/infra/process_kill.go new file mode 100644 index 00000000..3c47fecc --- /dev/null +++ b/functions/infra/process_kill.go @@ -0,0 +1,42 @@ +package infra + +import ( + "fmt" + "syscall" + "time" +) + +// ProcessKill sends SIGTERM to the process group of handle, then waits up to +// graceSec seconds for the process to exit. If it is still alive after the +// grace period, SIGKILL is sent. Returns an error only if the signal could not +// be delivered (e.g. the process group does not exist). +func ProcessKill(handle *ProcessHandle, graceSec int) error { + // Send SIGTERM to the process group (negative pid targets the group). + if err := syscall.Kill(-handle.Pid, syscall.SIGTERM); err != nil { + // ESRCH means the process is already gone — not an error from our view. + if err != syscall.ESRCH { + return fmt.Errorf("process_kill: sigterm: %w", err) + } + return nil + } + + // Poll until the process exits or the grace period expires. + deadline := time.Now().Add(time.Duration(graceSec) * time.Second) + for time.Now().Before(deadline) { + // Check if process has exited by sending signal 0 (no-op). + err := syscall.Kill(-handle.Pid, 0) + if err == syscall.ESRCH { + // Process group is gone. + return nil + } + time.Sleep(100 * time.Millisecond) + } + + // Still alive after grace period — escalate to SIGKILL. + if err := syscall.Kill(-handle.Pid, syscall.SIGKILL); err != nil { + if err != syscall.ESRCH { + return fmt.Errorf("process_kill: sigkill: %w", err) + } + } + return nil +} diff --git a/functions/infra/process_kill.md b/functions/infra/process_kill.md new file mode 100644 index 00000000..6ce9fb0c --- /dev/null +++ b/functions/infra/process_kill.md @@ -0,0 +1,45 @@ +--- +name: process_kill +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func ProcessKill(handle *ProcessHandle, graceSec int) error" +description: "Termina un subproceso enviando SIGTERM al process group. Espera hasta graceSec segundos a que el proceso muera voluntariamente. Si sigue vivo, envia SIGKILL. Retorna error solo si la senal no pudo entregarse." +tags: [process, subprocess, kill, signal, sigterm, sigkill, infra] +uses_functions: [] +uses_types: [process_handle_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, syscall, time] +params: + - name: handle + desc: "handle del proceso lanzado por ProcessSpawn" + - name: graceSec + desc: "segundos de gracia entre SIGTERM y SIGKILL; 0 envia SIGKILL inmediatamente" +output: "nil si el proceso fue terminado correctamente; error si la senal no pudo entregarse" +tested: true +tests: + - "kill process" +test_file_path: "functions/infra/process_spawn_test.go" +file_path: "functions/infra/process_kill.go" +--- + +## Ejemplo + +```go +h, err := ProcessSpawn("sleep 300", "", nil, "") +if err != nil { + log.Fatal(err) +} +// Dar 3 segundos de gracia antes de SIGKILL +if err := ProcessKill(h, 3); err != nil { + log.Printf("kill failed: %v", err) +} +``` + +## Notas + +Funcion impura: envia senales al sistema operativo. Usa -handle.Pid (negativo) para direccionar el process group completo, matando tanto al proceso principal como a sus hijos. ESRCH se ignora porque significa que el proceso ya murio, lo cual es el objetivo deseado. Comprueba si el proceso sigue vivo con signal 0 (kill -0) cada 100ms durante el grace period. diff --git a/functions/infra/process_spawn.go b/functions/infra/process_spawn.go new file mode 100644 index 00000000..f2a33364 --- /dev/null +++ b/functions/infra/process_spawn.go @@ -0,0 +1,74 @@ +package infra + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "strings" + "syscall" + "time" +) + +// ProcessSpawn launches a subprocess using the given shell. +// If shell is empty, "sh" is used. If command contains newlines it is treated +// as a multi-line script: the content is written to a temp file and executed +// with `shell `. Otherwise it is executed with `shell -c `. +// dir sets the working directory (empty = inherit). env sets the environment +// (nil = inherit parent env). The process group is created with Setpgid so +// that ProcessKill can target the whole group. +func ProcessSpawn(command string, dir string, env []string, shell string) (*ProcessHandle, error) { + if shell == "" { + shell = "sh" + } + + var cmd *exec.Cmd + + if strings.Contains(command, "\n") { + // Multi-line script: write to a temp file and execute it. + tmp, err := os.CreateTemp("", "fn-proc-*.sh") + if err != nil { + return nil, fmt.Errorf("process_spawn: create temp file: %w", err) + } + if _, err := tmp.WriteString(command); err != nil { + _ = os.Remove(tmp.Name()) + return nil, fmt.Errorf("process_spawn: write temp file: %w", err) + } + if err := tmp.Close(); err != nil { + _ = os.Remove(tmp.Name()) + return nil, fmt.Errorf("process_spawn: close temp file: %w", err) + } + cmd = exec.Command(shell, tmp.Name()) + } else { + cmd = exec.Command(shell, "-c", command) + } + + if dir != "" { + cmd.Dir = dir + } + if len(env) > 0 { + cmd.Env = env + } + + // New process group so we can kill all children as a group. + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + // Use buffers instead of pipes to avoid race between Wait() and ReadAll(). + var stdoutBuf, stderrBuf bytes.Buffer + cmd.Stdout = &stdoutBuf + cmd.Stderr = &stderrBuf + + start := time.Now() + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("process_spawn: start: %w", err) + } + + return &ProcessHandle{ + Cmd: cmd, + Pid: cmd.Process.Pid, + StartTime: start, + Dir: dir, + stdout: &stdoutBuf, + stderr: &stderrBuf, + }, nil +} diff --git a/functions/infra/process_spawn.md b/functions/infra/process_spawn.md new file mode 100644 index 00000000..ec7c439b --- /dev/null +++ b/functions/infra/process_spawn.md @@ -0,0 +1,52 @@ +--- +name: process_spawn +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func ProcessSpawn(command string, dir string, env []string, shell string) (*ProcessHandle, error)" +description: "Lanza un subproceso usando el shell indicado. Si shell esta vacio usa 'sh'. Comandos con newlines se tratan como scripts multilinea (se escriben a un archivo temporal). Configura un process group propio (Setpgid) para poder matar todos los hijos con ProcessKill. Captura stdout y stderr via pipes." +tags: [process, subprocess, spawn, exec, shell, infra] +uses_functions: [] +uses_types: [process_handle_go_infra] +returns: [process_handle_go_infra] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, os, os/exec, strings, syscall, time] +params: + - name: command + desc: "comando shell a ejecutar; si contiene newlines se trata como script multilinea" + - name: dir + desc: "directorio de trabajo del proceso hijo; vacio hereda el del proceso padre" + - name: env + desc: "variables de entorno en formato KEY=VALUE; nil hereda el entorno del proceso padre" + - name: shell + desc: "interprete shell a usar (sh, bash, zsh); vacio usa 'sh'" +output: "handle del proceso lanzado con Cmd, Pid, StartTime, Dir y los pipes de I/O" +tested: true +tests: + - "spawn and wait echo" + - "spawn with timeout kills" + - "spawn with env" + - "spawn script" + - "spawn with working dir" + - "kill process" +test_file_path: "functions/infra/process_spawn_test.go" +file_path: "functions/infra/process_spawn.go" +--- + +## Ejemplo + +```go +h, err := ProcessSpawn("echo hello", "", nil, "") +if err != nil { + log.Fatal(err) +} +res, err := ProcessWait(h, 10) +fmt.Println(res.Stdout) // "hello\n" +``` + +## Notas + +Funcion impura: hace I/O (crea archivo temporal para scripts, lanza proceso). El process group (Setpgid=true) permite a ProcessKill enviar senales al grupo completo con -Pid, afectando a todos los hijos del proceso lanzado. Para scripts multilinea el archivo temporal queda en el directorio temporal del OS y no se limpia automaticamente. diff --git a/functions/infra/process_spawn_test.go b/functions/infra/process_spawn_test.go new file mode 100644 index 00000000..d3918044 --- /dev/null +++ b/functions/infra/process_spawn_test.go @@ -0,0 +1,107 @@ +package infra + +import ( + "strings" + "testing" +) + +func TestProcessSpawn(t *testing.T) { + t.Run("spawn and wait echo", func(t *testing.T) { + h, err := ProcessSpawn("echo hello", "", nil, "") + if err != nil { + t.Fatalf("spawn: %v", err) + } + res, err := ProcessWait(h, 10) + if err != nil { + t.Fatalf("wait: %v", err) + } + if res.ExitCode != 0 { + t.Errorf("exit code: got %d, want 0", res.ExitCode) + } + if !strings.Contains(res.Stdout, "hello") { + t.Errorf("stdout: got %q, want it to contain 'hello'", res.Stdout) + } + }) + + t.Run("spawn with timeout kills", func(t *testing.T) { + h, err := ProcessSpawn("sleep 60", "", nil, "") + if err != nil { + t.Fatalf("spawn: %v", err) + } + res, err := ProcessWait(h, 2) + if err != nil { + t.Fatalf("wait: %v", err) + } + if !res.Killed { + t.Errorf("killed: got false, want true") + } + if res.ExitCode == 0 { + t.Errorf("exit code: got 0, want != 0 after kill") + } + }) + + t.Run("spawn with env", func(t *testing.T) { + h, err := ProcessSpawn("echo $TEST_VAR", "", []string{"PATH=/usr/bin:/bin", "TEST_VAR=hello123"}, "") + if err != nil { + t.Fatalf("spawn: %v", err) + } + res, err := ProcessWait(h, 10) + if err != nil { + t.Fatalf("wait: %v", err) + } + if !strings.Contains(res.Stdout, "hello123") { + t.Errorf("stdout: got %q, want it to contain 'hello123'", res.Stdout) + } + }) + + t.Run("spawn script", func(t *testing.T) { + script := "#!/bin/sh\necho line1\necho line2" + h, err := ProcessSpawn(script, "", nil, "") + if err != nil { + t.Fatalf("spawn: %v", err) + } + res, err := ProcessWait(h, 10) + if err != nil { + t.Fatalf("wait: %v", err) + } + if !strings.Contains(res.Stdout, "line1") { + t.Errorf("stdout: got %q, want it to contain 'line1'", res.Stdout) + } + if !strings.Contains(res.Stdout, "line2") { + t.Errorf("stdout: got %q, want it to contain 'line2'", res.Stdout) + } + }) + + t.Run("spawn with working dir", func(t *testing.T) { + h, err := ProcessSpawn("pwd", "/tmp", nil, "") + if err != nil { + t.Fatalf("spawn: %v", err) + } + res, err := ProcessWait(h, 10) + if err != nil { + t.Fatalf("wait: %v", err) + } + if !strings.Contains(res.Stdout, "/tmp") { + t.Errorf("stdout: got %q, want it to contain '/tmp'", res.Stdout) + } + }) + + t.Run("kill process", func(t *testing.T) { + h, err := ProcessSpawn("sleep 60", "", nil, "") + if err != nil { + t.Fatalf("spawn: %v", err) + } + if err := ProcessKill(h, 1); err != nil { + t.Fatalf("kill: %v", err) + } + // After kill, Wait should unblock quickly. + _ = h.Cmd.Wait() + state := h.Cmd.ProcessState + if state == nil { + t.Fatal("process state is nil after kill+wait") + } + if state.ExitCode() == 0 { + t.Errorf("exit code: got 0 after kill, want non-zero") + } + }) +} diff --git a/functions/infra/process_wait.go b/functions/infra/process_wait.go new file mode 100644 index 00000000..6680533f --- /dev/null +++ b/functions/infra/process_wait.go @@ -0,0 +1,51 @@ +package infra + +import ( + "time" +) + +// ProcessWait waits for a subprocess to finish and collects its output. +// If timeoutSec > 0 and the process has not exited by then, ProcessKill is +// called with graceSec=5 and the result is marked Killed=true. +func ProcessWait(handle *ProcessHandle, timeoutSec int) (ProcessResult, error) { + // Wait for the process in a goroutine. + waitCh := make(chan error, 1) + go func() { + waitCh <- handle.Cmd.Wait() + }() + + killed := false + + if timeoutSec > 0 { + timer := time.NewTimer(time.Duration(timeoutSec) * time.Second) + defer timer.Stop() + select { + case <-timer.C: + // Timeout exceeded — kill the process group. + _ = ProcessKill(handle, 5) + killed = true + <-waitCh + case <-waitCh: + } + } else { + <-waitCh + } + + // After Wait() returns, buffers are safe to read. + exitCode := 0 + if handle.Cmd.ProcessState != nil { + exitCode = handle.Cmd.ProcessState.ExitCode() + } else if killed { + exitCode = -1 + } + + duration := time.Since(handle.StartTime) + + return ProcessResult{ + ExitCode: exitCode, + Stdout: handle.stdout.String(), + Stderr: handle.stderr.String(), + DurationMs: duration.Milliseconds(), + Killed: killed, + }, nil +} diff --git a/functions/infra/process_wait.md b/functions/infra/process_wait.md new file mode 100644 index 00000000..5153a4ee --- /dev/null +++ b/functions/infra/process_wait.md @@ -0,0 +1,49 @@ +--- +name: process_wait +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func ProcessWait(handle *ProcessHandle, timeoutSec int) (ProcessResult, error)" +description: "Espera a que un subproceso termine y recopila su salida. Lee stdout y stderr completos en goroutines para evitar deadlocks en pipes. Si timeoutSec > 0 y el proceso no termina en ese tiempo, llama a ProcessKill y marca el resultado con Killed=true. Retorna el exit code, salida completa y duracion total." +tags: [process, subprocess, wait, timeout, exec, infra] +uses_functions: [process_kill_go_infra] +uses_types: [process_handle_go_infra, process_result_go_infra] +returns: [process_result_go_infra] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, io, time] +params: + - name: handle + desc: "handle del proceso lanzado por ProcessSpawn" + - name: timeoutSec + desc: "segundos maximos de espera; 0 o negativo espera indefinidamente" +output: "resultado con exit code, stdout, stderr, duracion en ms y flag de killed" +tested: true +tests: + - "spawn and wait echo" + - "spawn with timeout kills" + - "spawn with env" + - "spawn script" + - "spawn with working dir" +test_file_path: "functions/infra/process_spawn_test.go" +file_path: "functions/infra/process_wait.go" +--- + +## Ejemplo + +```go +h, err := ProcessSpawn("sleep 60", "", nil, "") +if err != nil { + log.Fatal(err) +} +res, err := ProcessWait(h, 5) // timeout de 5 segundos +if res.Killed { + fmt.Println("proceso terminado por timeout") +} +``` + +## Notas + +Funcion impura: bloquea esperando I/O y posiblemente llama a ProcessKill. Lee stdout y stderr en goroutines separadas antes de llamar a cmd.Wait() para evitar el deadlock clasico donde cmd.Wait() bloquea porque los pipes estan llenos y nadie los lee. El exit code -1 indica que ProcessState no estaba disponible (proceso matado antes de registrar estado). diff --git a/types/infra/dag_run.md b/types/infra/dag_run.md new file mode 100644 index 00000000..53ff6c64 --- /dev/null +++ b/types/infra/dag_run.md @@ -0,0 +1,27 @@ +--- +name: DagRun +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type DagRun struct { + ID string + DagName string + DagPath string + Status string + StartedAt time.Time + FinishedAt time.Time + Trigger string + Error string + } +description: "Representa una ejecucion de un workflow DAG. Almacenado en SQLite con estado, timestamps y trigger." +tags: [dag, execution, run, workflow] +uses_types: [] +file_path: "functions/infra/dag_run.go" +--- + +## Notas + +Status puede ser: pending, running, success, failed, cancelled. +Trigger puede ser: manual, cron, api. diff --git a/types/infra/dag_step_result.md b/types/infra/dag_step_result.md new file mode 100644 index 00000000..56780e5c --- /dev/null +++ b/types/infra/dag_step_result.md @@ -0,0 +1,29 @@ +--- +name: DagStepResult +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type DagStepResult struct { + ID string + RunID string + StepName string + Status string + ExitCode int + Stdout string + Stderr string + StartedAt time.Time + FinishedAt time.Time + DurationMs int64 + Error string + } +description: "Resultado de la ejecucion de un step individual dentro de un DagRun. Captura exit code, stdout, stderr y duracion." +tags: [dag, execution, step, result] +uses_types: [DagRun_go_infra] +file_path: "functions/infra/dag_run.go" +--- + +## Notas + +Status puede ser: pending, running, success, failed, skipped. diff --git a/types/infra/process_handle.md b/types/infra/process_handle.md new file mode 100644 index 00000000..83d96fbc --- /dev/null +++ b/types/infra/process_handle.md @@ -0,0 +1,24 @@ +--- +name: process_handle +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type ProcessHandle struct { + Cmd *exec.Cmd + Pid int + StartTime time.Time + Dir string + stdout io.ReadCloser + stderr io.ReadCloser + } +description: "Handle de un subproceso en ejecucion. Contiene el comando, PID, tiempo de inicio, directorio de trabajo y los pipes de stdout/stderr (privados, leidos internamente por ProcessWait)." +tags: [process, subprocess, handle, infra, exec] +uses_types: [] +file_path: "functions/infra/process_handle.go" +--- + +## Notas + +Tipo producto. Los campos stdout y stderr son privados para evitar lecturas concurrentes externas — ProcessWait los consume internamente. Cmd.SysProcAttr.Setpgid=true garantiza que ProcessKill puede matar el process group completo usando -Pid. diff --git a/types/infra/process_result.md b/types/infra/process_result.md new file mode 100644 index 00000000..4d969124 --- /dev/null +++ b/types/infra/process_result.md @@ -0,0 +1,23 @@ +--- +name: process_result +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type ProcessResult struct { + ExitCode int + Stdout string + Stderr string + DurationMs int64 + Killed bool + } +description: "Resultado de un subproceso completado. Contiene codigo de salida, salida estandar y de error, duracion en milisegundos, y un flag que indica si fue terminado por timeout." +tags: [process, subprocess, result, exit, infra, exec] +uses_types: [] +file_path: "functions/infra/process_handle.go" +--- + +## Notas + +Tipo producto — todos los campos siempre presentes. Killed=true indica que ProcessWait agoto el timeout y llamo a ProcessKill; en ese caso ExitCode suele ser -1 o el codigo de SIGKILL segun el OS. DurationMs incluye el tiempo total desde ProcessSpawn hasta que Wait() retorno. From 57c243a81a71fb267dd597843691d702d3838c9c Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 12 Apr 2026 13:06:17 +0200 Subject: [PATCH 3/4] chore: close issues 0007a-e, update feature flag and sources manifest Enable dag-engine feature flag, document dagu as analyzed (GPL-3.0, no code extracted), move all 0007 issues to completed. Co-Authored-By: Claude Opus 4.6 (1M context) --- dev/feature_flags.json | 4 +- dev/issues/README.md | 10 +- dev/issues/{ => completed}/0007a-dag-core.md | 0 .../{ => completed}/0007b-process-manager.md | 0 .../{ => completed}/0007c-execution-store.md | 0 dev/issues/{ => completed}/0007d-scheduler.md | 0 .../{ => completed}/0007e-dag-executor-app.md | 0 sources/sources.yaml | 169 ++++++++++++++++++ 8 files changed, 176 insertions(+), 7 deletions(-) rename dev/issues/{ => completed}/0007a-dag-core.md (100%) rename dev/issues/{ => completed}/0007b-process-manager.md (100%) rename dev/issues/{ => completed}/0007c-execution-store.md (100%) rename dev/issues/{ => completed}/0007d-scheduler.md (100%) rename dev/issues/{ => completed}/0007e-dag-executor-app.md (100%) diff --git a/dev/feature_flags.json b/dev/feature_flags.json index 376873b6..3b482c2c 100644 --- a/dev/feature_flags.json +++ b/dev/feature_flags.json @@ -1,7 +1,7 @@ { "dag-engine": { - "enabled": false, + "enabled": true, "issue": "0007", - "description": "Sistema propio de orquestacion de DAGs para reemplazar Dagu. Incluye parser YAML, executor con paralelismo, process manager, execution store y scheduler cron." + "description": "Sistema propio de orquestacion de DAGs para reemplazar Dagu. Incluye parser YAML, executor con paralelismo, process manager, execution store SQLite, scheduler cron, CLI y web frontend." } } diff --git a/dev/issues/README.md b/dev/issues/README.md index f889f7c9..13128a14 100644 --- a/dev/issues/README.md +++ b/dev/issues/README.md @@ -8,9 +8,9 @@ | 0004 | Jupyter discover multiple instances | completado | — | feature | — | | 0005 | Jupyter write batch | completado | — | feature | — | | 0006 | Jupyter exec outputs keyerror | completado | — | bugfix | — | -| **0007a** | **DAG engine: core (parse, validate, topo sort)** | pendiente | alta | feature | 0007b-e | -| **0007b** | **DAG engine: process manager (spawn, wait, kill)** | pendiente | alta | feature | 0007e | -| **0007c** | **DAG engine: execution store (SQLite)** | pendiente | alta | feature | 0007e | -| **0007d** | **DAG engine: scheduler (cron parser, ticker)** | pendiente | media | feature | 0007e | -| **0007e** | **DAG engine: app CLI que reemplaza Dagu** | pendiente | alta | feature | — | +| [0007a](completed/0007a-dag-core.md) | DAG engine: core (parse, validate, topo sort) | completado | alta | feature | 0007b-e | +| [0007b](completed/0007b-process-manager.md) | DAG engine: process manager (spawn, wait, kill) | completado | alta | feature | 0007e | +| [0007c](completed/0007c-execution-store.md) | DAG engine: execution store (SQLite) | completado | alta | feature | 0007e | +| [0007d](completed/0007d-scheduler.md) | DAG engine: scheduler (cron match) | completado | media | feature | 0007e | +| [0007e](completed/0007e-dag-executor-app.md) | DAG engine: CLI + web app que reemplaza Dagu | completado | alta | feature | — | | **0008** | **SQLite API Web** | pendiente | alta | feature | — | diff --git a/dev/issues/0007a-dag-core.md b/dev/issues/completed/0007a-dag-core.md similarity index 100% rename from dev/issues/0007a-dag-core.md rename to dev/issues/completed/0007a-dag-core.md diff --git a/dev/issues/0007b-process-manager.md b/dev/issues/completed/0007b-process-manager.md similarity index 100% rename from dev/issues/0007b-process-manager.md rename to dev/issues/completed/0007b-process-manager.md diff --git a/dev/issues/0007c-execution-store.md b/dev/issues/completed/0007c-execution-store.md similarity index 100% rename from dev/issues/0007c-execution-store.md rename to dev/issues/completed/0007c-execution-store.md diff --git a/dev/issues/0007d-scheduler.md b/dev/issues/completed/0007d-scheduler.md similarity index 100% rename from dev/issues/0007d-scheduler.md rename to dev/issues/completed/0007d-scheduler.md diff --git a/dev/issues/0007e-dag-executor-app.md b/dev/issues/completed/0007e-dag-executor-app.md similarity index 100% rename from dev/issues/0007e-dag-executor-app.md rename to dev/issues/completed/0007e-dag-executor-app.md diff --git a/sources/sources.yaml b/sources/sources.yaml index c41a2c17..c4e3602f 100644 --- a/sources/sources.yaml +++ b/sources/sources.yaml @@ -234,3 +234,172 @@ repos: - id: wails_bind_crud_go_infra source_file: "" date: 2026-04-01 + + - repo: https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/egutierrez/DevLauncher.git + license: MIT + cloned_dir: DevLauncher + extracted: + # Phase 1: Go Pure — TUI (5) + - id: apply_gradient_go_tui + source_file: launcher/core/gradient.go + date: 2026-04-08 + - id: strip_ansi_go_tui + source_file: launcher/core/commands.go + date: 2026-04-08 + - id: normalize_terminal_output_go_tui + source_file: launcher/core/commands.go + date: 2026-04-08 + - id: draw_box_go_tui + source_file: launcher/ui/styles.go + date: 2026-04-08 + - id: draw_separator_go_tui + source_file: launcher/ui/styles.go + date: 2026-04-08 + # Phase 2: Go Pure — Core (5) + - id: longest_common_prefix_go_core + source_file: launcher/core/commands.go + date: 2026-04-08 + - id: split_command_and_arg_go_core + source_file: launcher/core/commands.go + date: 2026-04-08 + - id: compare_versions_go_core + source_file: installer/core/version.go + date: 2026-04-08 + - id: parse_version_go_core + source_file: installer/core/version.go + date: 2026-04-08 + - id: rel_or_full_go_core + source_file: launcher/core/script_query.go + date: 2026-04-08 + # Phase 3: Go Impure (3) + - id: load_ascii_art_go_tui + source_file: launcher/middleware/assets.go + date: 2026-04-08 + - id: read_dir_autocomplete_go_tui + source_file: launcher/middleware/command_fs.go + date: 2026-04-08 + - id: extract_script_description_go_shell + source_file: launcher/middleware/reader.go + date: 2026-04-08 + # Phase 4: Bash Library (6) + - id: bash_colors_bash_shell + source_file: scripts/lib/common.sh + date: 2026-04-08 + - id: bash_log_bash_shell + source_file: scripts/lib/common.sh + date: 2026-04-08 + - id: bash_check_deps_bash_shell + source_file: scripts/lib/common.sh + date: 2026-04-08 + - id: bash_confirm_bash_shell + source_file: scripts/lib/common.sh + date: 2026-04-08 + - id: bash_safe_run_bash_shell + source_file: scripts/lib/common.sh + date: 2026-04-08 + - id: bash_handle_error_bash_shell + source_file: scripts/lib/common.sh + date: 2026-04-08 + # Phase 5: Bash Cybersecurity (12) + - id: analyze_dns_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/redes/analisis_dns.sh + date: 2026-04-08 + - id: list_active_connections_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/redes/conexiones_activas.sh + date: 2026-04-08 + - id: geolocate_ip_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/redes/geoip.sh + date: 2026-04-08 + - id: audit_ssh_config_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/sistema/auditar_ssh.sh + date: 2026-04-08 + - id: check_firewall_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/sistema/firewall_status.sh + date: 2026-04-08 + - id: detect_suspicious_users_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/sistema/usuarios_sospechosos.sh + date: 2026-04-08 + - id: encrypt_file_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/utilidades/cifrar_archivo.sh + date: 2026-04-08 + - id: generate_password_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/utilidades/generar_password.sh + date: 2026-04-08 + - id: verify_file_hash_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/utilidades/verificar_hash.sh + date: 2026-04-08 + - id: audit_http_headers_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/web/cabeceras_http.sh + date: 2026-04-08 + - id: inspect_ssl_cert_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/web/ssl_cert_info.sh + date: 2026-04-08 + - id: enumerate_subdomains_bash_cybersecurity + source_file: scripts/linux/ciberseguridad/web/subdominios.sh + date: 2026-04-08 + # Phase 6: Git Utils (4) + - id: git_repo_status_bash_shell + source_file: scripts/linux/git_utils/estado_repo.sh + date: 2026-04-08 + - id: git_clean_branches_bash_shell + source_file: scripts/linux/git_utils/limpiar_ramas.sh + date: 2026-04-08 + - id: git_push_all_remotes_bash_shell + source_file: scripts/linux/git_utils/push_todos_remotes.sh + date: 2026-04-08 + - id: git_log_visual_bash_shell + source_file: scripts/linux/git_utils/historial_commits.sh + date: 2026-04-08 + # Phase 7: Infra — System (3) + - id: analyze_disk_space_bash_infra + source_file: scripts/linux/gestion_linux/espacio_disponible.sh + date: 2026-04-08 + - id: list_listening_ports_bash_infra + source_file: scripts/linux/gestion_linux/puertos_activos.sh + date: 2026-04-08 + - id: detect_wsl_bash_infra + source_file: scripts/linux/gestion_linux/wsl_host.sh + date: 2026-04-08 + # Phase 7: Infra — Installers (7) + - id: install_go_bash_infra + source_file: scripts/linux/instaladores/instalar_go.sh + date: 2026-04-08 + - id: install_nodejs_bash_infra + source_file: scripts/linux/instaladores/instalar_nodejs.sh + date: 2026-04-08 + - id: install_pnpm_bash_infra + source_file: scripts/linux/instaladores/instalar_pnpm.sh + date: 2026-04-08 + - id: install_python312_bash_infra + source_file: scripts/linux/instaladores/instalar_python312.sh + date: 2026-04-08 + - id: install_uv_bash_infra + source_file: scripts/linux/instaladores/instalar_uv.sh + date: 2026-04-08 + - id: install_volta_bash_infra + source_file: scripts/linux/instaladores/instalar_volta.sh + date: 2026-04-08 + - id: install_wails_bash_infra + source_file: scripts/linux/instaladores/instalar_wails.sh + date: 2026-04-08 + # Phase 8: Shell Utils + Init (4) + - id: convert_text_case_bash_shell + source_file: scripts/linux/conversores/conversor_texto.sh + date: 2026-04-08 + - id: create_project_structure_bash_shell + source_file: scripts/linux/inicializar_repos/functional_structure.sh + date: 2026-04-08 + - id: init_go_module_bash_pipelines + source_file: scripts/linux/inicializar_repos/go/init_go_module.sh + date: 2026-04-08 + - id: init_go_project_bash_pipelines + source_file: scripts/linux/inicializar_repos/go/init_go_proyect.sh + date: 2026-04-08 + + - repo: https://github.com/daguflow/dagu + license: GPL-3.0 + cloned_dir: dagu + analyzed: true + extracted: [] + # GPL-3.0: no code extracted. YAML format studied for compatibility in issue 0007 (dag_engine). + # Our implementation is written from scratch with no dagu code copied. From dca507ce793a546e845f987839a8ce0d72797ca6 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 12 Apr 2026 13:08:13 +0200 Subject: [PATCH 4/4] chore: update gitignore files Agrega prompts/ al gitignore raiz. Actualiza dag_engine/.gitignore con patrones estandar para Go, frontend y editor. Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 19b91767..755e00c7 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,4 @@ Thumbs.db broken_paths.txt imgui.ini +prompts/