From 3c1061fbd870f37b08551ac53b04c1a224c51517 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Wed, 3 Jun 2026 12:44:26 +0200 Subject: [PATCH] feat(core): dag_parse parsea continue_on.exit_code + retry_policy (v1.1.0) DagContinueOn gana el campo ExitCodes []int (codigos de salida tolerados) y el parser mapea continue_on.exit_code desde el YAML. retry_policy (limit, interval_sec) ya existia en el modelo y ahora queda documentado como contrato estable para los executors. Funcion pura: solo normaliza el YAML al modelo DagDefinition; la interpretacion (reintentar, tolerar codigos) vive en el executor que lo consuma (dag_engine). Test: 'parsea continue_on.exit_code y retry_policy'. Tag de grupo: scheduler. --- functions/core/dag_definition.go | 5 +++++ functions/core/dag_parse.go | 10 ++++++---- functions/core/dag_parse.md | 32 +++++++++++++++++++++++++++++--- functions/core/dag_parse_test.go | 31 +++++++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 7 deletions(-) diff --git a/functions/core/dag_definition.go b/functions/core/dag_definition.go index 4a3a0178..b7bb9ec7 100644 --- a/functions/core/dag_definition.go +++ b/functions/core/dag_definition.go @@ -4,6 +4,11 @@ package core type DagContinueOn struct { Failure bool Skipped bool + // ExitCodes lists exit codes that are tolerated: if the step exits with one + // of these non-zero codes, it is treated as a non-failure (the DAG keeps + // going and the run is not marked failed). The real exit code is still + // recorded. Empty means only exit 0 is a success (unless Failure is set). + ExitCodes []int } // DagRetryPolicy configures automatic retries for a step. diff --git a/functions/core/dag_parse.go b/functions/core/dag_parse.go index 4a67b142..01280aa9 100644 --- a/functions/core/dag_parse.go +++ b/functions/core/dag_parse.go @@ -29,8 +29,9 @@ type rawDagStep struct { } type rawDagContinueOn struct { - Failure bool `yaml:"failure"` - Skipped bool `yaml:"skipped"` + Failure bool `yaml:"failure"` + Skipped bool `yaml:"skipped"` + ExitCode []int `yaml:"exit_code"` } type rawDagRetryPolicy struct { @@ -203,8 +204,9 @@ func normalizeStep(rs rawDagStep) (DagStep, error) { Depends: rs.Depends, Env: stepEnv, ContinueOn: DagContinueOn{ - Failure: rs.ContinueOn.Failure, - Skipped: rs.ContinueOn.Skipped, + Failure: rs.ContinueOn.Failure, + Skipped: rs.ContinueOn.Skipped, + ExitCodes: rs.ContinueOn.ExitCode, }, RetryPolicy: DagRetryPolicy{ Limit: rs.RetryPolicy.Limit, diff --git a/functions/core/dag_parse.md b/functions/core/dag_parse.md index 8b61aae5..e17b258d 100644 --- a/functions/core/dag_parse.md +++ b/functions/core/dag_parse.md @@ -3,11 +3,11 @@ name: dag_parse kind: function lang: go domain: core -version: "1.0.0" +version: "1.1.0" purity: pure signature: "func DagParse(data []byte) (DagDefinition, error)" -description: "Parsea YAML de definicion de DAG en formato compatible con Dagu. Soporta schedule como string o lista, env como lista de maps single-key (formato Dagu), handler_on y handlers como aliases, steps con command/script/depends/continue_on, y type graph." -tags: [dag, yaml, parsing, workflow, dagu, pure] +description: "Parsea YAML de definicion de DAG en formato compatible con Dagu. Soporta schedule como string o lista, env como lista de maps single-key (formato Dagu), handler_on y handlers como aliases, steps con command/script/depends/continue_on (failure, skipped y exit_code) y retry_policy (limit, interval_sec), y type graph." +tags: [dag, yaml, parsing, workflow, dagu, pure, scheduler] uses_functions: [] uses_types: [dag_definition_go_core, dag_step_go_core, dag_handlers_go_core] returns: [] @@ -25,6 +25,7 @@ tests: - "parsea env en formato lista de maps" - "parsea handler_on y handlers como alias" - "parsea continue_on y working_dir a nivel step" + - "parsea continue_on.exit_code y retry_policy" - "parsea type graph" test_file_path: "functions/core/dag_parse_test.go" file_path: "functions/core/dag_parse.go" @@ -49,6 +50,31 @@ dag, err := DagParse(data) // dag.Steps[1].Depends = ["hello"] ``` +Step con tolerancia de fallos y reintentos: + +```go +data := []byte(` +name: backup +steps: + - name: snapshot + command: ./backup.sh + continue_on: + exit_code: [4] # exit 4 = "ok con avisos", no rompe el DAG + retry_policy: + limit: 3 # hasta 3 reintentos + interval_sec: 10 # 10s entre intentos +`) +dag, _ := DagParse(data) +// dag.Steps[0].ContinueOn.ExitCodes = [4] +// dag.Steps[0].RetryPolicy.Limit = 3 +``` + ## Notas Funcion pura (el YAML es inmutable, no hay I/O). Internamente usa un struct rawDag para deserializar loosely y luego normaliza campos polimorficos. La estrategia de normalizacion: schedule string->[]string, env lista->map, handlers single-o-lista->[]DagStep. handler_on tiene precedencia sobre handlers si ambos estan presentes. + +`continue_on` y `retry_policy` son declarativos: `DagParse` solo los normaliza al modelo `DagDefinition`. Quien los interpreta (reintentar, tolerar exit codes) es el executor que consuma el DAG — en este ecosistema, `apps/dag_engine`. + +## Capability growth log + +- v1.1.0 (2026-06-03) — minor: parsea `continue_on.exit_code` (lista de codigos de salida tolerados) y expone `retry_policy` ya existente en el modelo. Habilita reintentos y tolerancia de fallos reales en el executor de `dag_engine`. diff --git a/functions/core/dag_parse_test.go b/functions/core/dag_parse_test.go index bc57c58a..9edc6c03 100644 --- a/functions/core/dag_parse_test.go +++ b/functions/core/dag_parse_test.go @@ -187,6 +187,37 @@ steps: } }) + t.Run("parsea continue_on.exit_code y retry_policy", func(t *testing.T) { + data := []byte(` +name: dag-retry +steps: + - name: backup + command: ./backup.sh + continue_on: + exit_code: [4, 5] + retry_policy: + limit: 3 + interval_sec: 10 +`) + dag, err := DagParse(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(dag.Steps) != 1 { + t.Fatalf("Steps: got %d, want 1", len(dag.Steps)) + } + step := dag.Steps[0] + if len(step.ContinueOn.ExitCodes) != 2 || step.ContinueOn.ExitCodes[0] != 4 || step.ContinueOn.ExitCodes[1] != 5 { + t.Errorf("ContinueOn.ExitCodes: got %v, want [4 5]", step.ContinueOn.ExitCodes) + } + if step.RetryPolicy.Limit != 3 { + t.Errorf("RetryPolicy.Limit: got %d, want 3", step.RetryPolicy.Limit) + } + if step.RetryPolicy.IntervalSec != 10 { + t.Errorf("RetryPolicy.IntervalSec: got %d, want 10", step.RetryPolicy.IntervalSec) + } + }) + t.Run("parsea step con function id y args", func(t *testing.T) { data := []byte(` name: dag-function