feat(core): dag_parse parsea continue_on.exit_code + retry_policy (v1.1.0)

DagContinueOn gana el campo ExitCodes []int (codigos de salida tolerados) y el
parser mapea continue_on.exit_code desde el YAML. retry_policy (limit,
interval_sec) ya existia en el modelo y ahora queda documentado como contrato
estable para los executors.

Funcion pura: solo normaliza el YAML al modelo DagDefinition; la interpretacion
(reintentar, tolerar codigos) vive en el executor que lo consuma (dag_engine).

Test: 'parsea continue_on.exit_code y retry_policy'. Tag de grupo: scheduler.
This commit is contained in:
Egutierrez
2026-06-03 12:44:26 +02:00
parent ba5d262c6c
commit 3c1061fbd8
4 changed files with 71 additions and 7 deletions
+5
View File
@@ -4,6 +4,11 @@ package core
type DagContinueOn struct { type DagContinueOn struct {
Failure bool Failure bool
Skipped bool Skipped bool
// ExitCodes lists exit codes that are tolerated: if the step exits with one
// of these non-zero codes, it is treated as a non-failure (the DAG keeps
// going and the run is not marked failed). The real exit code is still
// recorded. Empty means only exit 0 is a success (unless Failure is set).
ExitCodes []int
} }
// DagRetryPolicy configures automatic retries for a step. // DagRetryPolicy configures automatic retries for a step.
+6 -4
View File
@@ -29,8 +29,9 @@ type rawDagStep struct {
} }
type rawDagContinueOn struct { type rawDagContinueOn struct {
Failure bool `yaml:"failure"` Failure bool `yaml:"failure"`
Skipped bool `yaml:"skipped"` Skipped bool `yaml:"skipped"`
ExitCode []int `yaml:"exit_code"`
} }
type rawDagRetryPolicy struct { type rawDagRetryPolicy struct {
@@ -203,8 +204,9 @@ func normalizeStep(rs rawDagStep) (DagStep, error) {
Depends: rs.Depends, Depends: rs.Depends,
Env: stepEnv, Env: stepEnv,
ContinueOn: DagContinueOn{ ContinueOn: DagContinueOn{
Failure: rs.ContinueOn.Failure, Failure: rs.ContinueOn.Failure,
Skipped: rs.ContinueOn.Skipped, Skipped: rs.ContinueOn.Skipped,
ExitCodes: rs.ContinueOn.ExitCode,
}, },
RetryPolicy: DagRetryPolicy{ RetryPolicy: DagRetryPolicy{
Limit: rs.RetryPolicy.Limit, Limit: rs.RetryPolicy.Limit,
+29 -3
View File
@@ -3,11 +3,11 @@ name: dag_parse
kind: function kind: function
lang: go lang: go
domain: core domain: core
version: "1.0.0" version: "1.1.0"
purity: pure purity: pure
signature: "func DagParse(data []byte) (DagDefinition, error)" signature: "func DagParse(data []byte) (DagDefinition, error)"
description: "Parsea YAML de definicion de DAG en formato compatible con Dagu. Soporta schedule como string o lista, env como lista de maps single-key (formato Dagu), handler_on y handlers como aliases, steps con command/script/depends/continue_on, y type graph." description: "Parsea YAML de definicion de DAG en formato compatible con Dagu. Soporta schedule como string o lista, env como lista de maps single-key (formato Dagu), handler_on y handlers como aliases, steps con command/script/depends/continue_on (failure, skipped y exit_code) y retry_policy (limit, interval_sec), y type graph."
tags: [dag, yaml, parsing, workflow, dagu, pure] tags: [dag, yaml, parsing, workflow, dagu, pure, scheduler]
uses_functions: [] uses_functions: []
uses_types: [dag_definition_go_core, dag_step_go_core, dag_handlers_go_core] uses_types: [dag_definition_go_core, dag_step_go_core, dag_handlers_go_core]
returns: [] returns: []
@@ -25,6 +25,7 @@ tests:
- "parsea env en formato lista de maps" - "parsea env en formato lista de maps"
- "parsea handler_on y handlers como alias" - "parsea handler_on y handlers como alias"
- "parsea continue_on y working_dir a nivel step" - "parsea continue_on y working_dir a nivel step"
- "parsea continue_on.exit_code y retry_policy"
- "parsea type graph" - "parsea type graph"
test_file_path: "functions/core/dag_parse_test.go" test_file_path: "functions/core/dag_parse_test.go"
file_path: "functions/core/dag_parse.go" file_path: "functions/core/dag_parse.go"
@@ -49,6 +50,31 @@ dag, err := DagParse(data)
// dag.Steps[1].Depends = ["hello"] // dag.Steps[1].Depends = ["hello"]
``` ```
Step con tolerancia de fallos y reintentos:
```go
data := []byte(`
name: backup
steps:
- name: snapshot
command: ./backup.sh
continue_on:
exit_code: [4] # exit 4 = "ok con avisos", no rompe el DAG
retry_policy:
limit: 3 # hasta 3 reintentos
interval_sec: 10 # 10s entre intentos
`)
dag, _ := DagParse(data)
// dag.Steps[0].ContinueOn.ExitCodes = [4]
// dag.Steps[0].RetryPolicy.Limit = 3
```
## Notas ## Notas
Funcion pura (el YAML es inmutable, no hay I/O). Internamente usa un struct rawDag para deserializar loosely y luego normaliza campos polimorficos. La estrategia de normalizacion: schedule string->[]string, env lista->map, handlers single-o-lista->[]DagStep. handler_on tiene precedencia sobre handlers si ambos estan presentes. Funcion pura (el YAML es inmutable, no hay I/O). Internamente usa un struct rawDag para deserializar loosely y luego normaliza campos polimorficos. La estrategia de normalizacion: schedule string->[]string, env lista->map, handlers single-o-lista->[]DagStep. handler_on tiene precedencia sobre handlers si ambos estan presentes.
`continue_on` y `retry_policy` son declarativos: `DagParse` solo los normaliza al modelo `DagDefinition`. Quien los interpreta (reintentar, tolerar exit codes) es el executor que consuma el DAG — en este ecosistema, `apps/dag_engine`.
## Capability growth log
- v1.1.0 (2026-06-03) — minor: parsea `continue_on.exit_code` (lista de codigos de salida tolerados) y expone `retry_policy` ya existente en el modelo. Habilita reintentos y tolerancia de fallos reales en el executor de `dag_engine`.
+31
View File
@@ -187,6 +187,37 @@ steps:
} }
}) })
t.Run("parsea continue_on.exit_code y retry_policy", func(t *testing.T) {
data := []byte(`
name: dag-retry
steps:
- name: backup
command: ./backup.sh
continue_on:
exit_code: [4, 5]
retry_policy:
limit: 3
interval_sec: 10
`)
dag, err := DagParse(data)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(dag.Steps) != 1 {
t.Fatalf("Steps: got %d, want 1", len(dag.Steps))
}
step := dag.Steps[0]
if len(step.ContinueOn.ExitCodes) != 2 || step.ContinueOn.ExitCodes[0] != 4 || step.ContinueOn.ExitCodes[1] != 5 {
t.Errorf("ContinueOn.ExitCodes: got %v, want [4 5]", step.ContinueOn.ExitCodes)
}
if step.RetryPolicy.Limit != 3 {
t.Errorf("RetryPolicy.Limit: got %d, want 3", step.RetryPolicy.Limit)
}
if step.RetryPolicy.IntervalSec != 10 {
t.Errorf("RetryPolicy.IntervalSec: got %d, want 10", step.RetryPolicy.IntervalSec)
}
})
t.Run("parsea step con function id y args", func(t *testing.T) { t.Run("parsea step con function id y args", func(t *testing.T) {
data := []byte(` data := []byte(`
name: dag-function name: dag-function