feat: add DAG core functions — parse, validate, topo sort, resolve env, cron match (0007a, 0007d)
Pure functions for parsing dagu-compatible YAML, validating DAG structure, topological sorting with parallel levels (Kahn's algorithm), and env variable resolution. Also adds cron_match for schedule matching. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,281 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// rawDagStep is the loosely-typed intermediate representation of a DAG step.
|
||||
type rawDagStep struct {
|
||||
Name string `yaml:"name"`
|
||||
ID string `yaml:"id"`
|
||||
Description string `yaml:"description"`
|
||||
Command string `yaml:"command"`
|
||||
Script string `yaml:"script"`
|
||||
Args []string `yaml:"args"`
|
||||
Shell string `yaml:"shell"`
|
||||
Dir string `yaml:"dir"`
|
||||
WorkingDir string `yaml:"working_dir"`
|
||||
Depends []string `yaml:"depends"`
|
||||
Env interface{} `yaml:"env"`
|
||||
ContinueOn rawDagContinueOn `yaml:"continue_on"`
|
||||
RetryPolicy rawDagRetryPolicy `yaml:"retry_policy"`
|
||||
TimeoutSec int `yaml:"timeout_sec"`
|
||||
Output string `yaml:"output"`
|
||||
Tags []string `yaml:"tags"`
|
||||
}
|
||||
|
||||
type rawDagContinueOn struct {
|
||||
Failure bool `yaml:"failure"`
|
||||
Skipped bool `yaml:"skipped"`
|
||||
}
|
||||
|
||||
type rawDagRetryPolicy struct {
|
||||
Limit int `yaml:"limit"`
|
||||
IntervalSec int `yaml:"interval_sec"`
|
||||
}
|
||||
|
||||
// rawDagHandlers handles both handler_on and handlers aliases.
|
||||
type rawDagHandlers struct {
|
||||
Init interface{} `yaml:"init"`
|
||||
Success interface{} `yaml:"success"`
|
||||
Failure interface{} `yaml:"failure"`
|
||||
Exit interface{} `yaml:"exit"`
|
||||
}
|
||||
|
||||
// rawDag is the loosely-typed intermediate representation of a DAG YAML.
|
||||
type rawDag struct {
|
||||
Name string `yaml:"name"`
|
||||
Description string `yaml:"description"`
|
||||
Group string `yaml:"group"`
|
||||
Type string `yaml:"type"`
|
||||
WorkingDir string `yaml:"working_dir"`
|
||||
Shell string `yaml:"shell"`
|
||||
Env interface{} `yaml:"env"`
|
||||
Schedule interface{} `yaml:"schedule"`
|
||||
Steps []rawDagStep `yaml:"steps"`
|
||||
HandlerOn *rawDagHandlers `yaml:"handler_on"`
|
||||
Handlers *rawDagHandlers `yaml:"handlers"`
|
||||
Tags []string `yaml:"tags"`
|
||||
TimeoutSec int `yaml:"timeout_sec"`
|
||||
}
|
||||
|
||||
// DagParse parses a YAML DAG definition compatible with Dagu format.
|
||||
// Handles schedule as string or list, env as list of single-key maps,
|
||||
// handler_on and handlers as aliases, and steps with command/script/depends.
|
||||
func DagParse(data []byte) (DagDefinition, error) {
|
||||
var raw rawDag
|
||||
if err := yaml.Unmarshal(data, &raw); err != nil {
|
||||
return DagDefinition{}, fmt.Errorf("dag_parse: yaml unmarshal: %w", err)
|
||||
}
|
||||
|
||||
def := DagDefinition{
|
||||
Name: raw.Name,
|
||||
Description: raw.Description,
|
||||
Group: raw.Group,
|
||||
Type: raw.Type,
|
||||
WorkingDir: raw.WorkingDir,
|
||||
Shell: raw.Shell,
|
||||
Tags: raw.Tags,
|
||||
TimeoutSec: raw.TimeoutSec,
|
||||
}
|
||||
|
||||
// Normalize env (list of single-key maps or plain map).
|
||||
dagEnv, err := normalizeEnv(raw.Env)
|
||||
if err != nil {
|
||||
return DagDefinition{}, fmt.Errorf("dag_parse: env: %w", err)
|
||||
}
|
||||
def.Env = dagEnv
|
||||
|
||||
// Normalize schedule (string or list).
|
||||
def.Schedule = normalizeSchedule(raw.Schedule)
|
||||
|
||||
// Normalize steps.
|
||||
steps := make([]DagStep, 0, len(raw.Steps))
|
||||
for i, rs := range raw.Steps {
|
||||
step, err := normalizeStep(rs)
|
||||
if err != nil {
|
||||
return DagDefinition{}, fmt.Errorf("dag_parse: step[%d]: %w", i, err)
|
||||
}
|
||||
steps = append(steps, step)
|
||||
}
|
||||
def.Steps = steps
|
||||
|
||||
// Normalize handlers: handler_on takes precedence, fallback to handlers.
|
||||
rawHandlers := raw.HandlerOn
|
||||
if rawHandlers == nil {
|
||||
rawHandlers = raw.Handlers
|
||||
}
|
||||
if rawHandlers != nil {
|
||||
handlers, err := normalizeHandlers(rawHandlers)
|
||||
if err != nil {
|
||||
return DagDefinition{}, fmt.Errorf("dag_parse: handlers: %w", err)
|
||||
}
|
||||
def.HandlerOn = handlers
|
||||
}
|
||||
|
||||
return def, nil
|
||||
}
|
||||
|
||||
// normalizeSchedule converts schedule from string or []interface{} to []string.
|
||||
func normalizeSchedule(v interface{}) []string {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
switch t := v.(type) {
|
||||
case string:
|
||||
s := strings.TrimSpace(t)
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
return []string{s}
|
||||
case []interface{}:
|
||||
result := make([]string, 0, len(t))
|
||||
for _, item := range t {
|
||||
if s, ok := item.(string); ok {
|
||||
s = strings.TrimSpace(s)
|
||||
if s != "" {
|
||||
result = append(result, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// normalizeEnv converts env from Dagu format (list of single-key maps) or plain map to map[string]string.
|
||||
func normalizeEnv(v interface{}) (map[string]string, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
switch t := v.(type) {
|
||||
case map[string]interface{}:
|
||||
// Plain YAML map.
|
||||
result := make(map[string]string, len(t))
|
||||
for k, val := range t {
|
||||
result[k] = fmt.Sprintf("%v", val)
|
||||
}
|
||||
return result, nil
|
||||
case []interface{}:
|
||||
// Dagu format: list of single-key maps [KEY: value, KEY2: value2].
|
||||
result := make(map[string]string)
|
||||
for _, item := range t {
|
||||
m, ok := item.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for k, val := range m {
|
||||
result[k] = fmt.Sprintf("%v", val)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
return nil, fmt.Errorf("unexpected env type %T", v)
|
||||
}
|
||||
|
||||
// normalizeStep converts a rawDagStep to a DagStep.
|
||||
func normalizeStep(rs rawDagStep) (DagStep, error) {
|
||||
stepEnv, err := normalizeEnv(rs.Env)
|
||||
if err != nil {
|
||||
return DagStep{}, fmt.Errorf("env: %w", err)
|
||||
}
|
||||
|
||||
// working_dir is an alias for dir at the step level.
|
||||
dir := rs.Dir
|
||||
if dir == "" {
|
||||
dir = rs.WorkingDir
|
||||
}
|
||||
|
||||
return DagStep{
|
||||
Name: rs.Name,
|
||||
ID: rs.ID,
|
||||
Description: rs.Description,
|
||||
Command: rs.Command,
|
||||
Script: rs.Script,
|
||||
Args: rs.Args,
|
||||
Shell: rs.Shell,
|
||||
Dir: dir,
|
||||
Depends: rs.Depends,
|
||||
Env: stepEnv,
|
||||
ContinueOn: DagContinueOn{
|
||||
Failure: rs.ContinueOn.Failure,
|
||||
Skipped: rs.ContinueOn.Skipped,
|
||||
},
|
||||
RetryPolicy: DagRetryPolicy{
|
||||
Limit: rs.RetryPolicy.Limit,
|
||||
IntervalSec: rs.RetryPolicy.IntervalSec,
|
||||
},
|
||||
TimeoutSec: rs.TimeoutSec,
|
||||
Output: rs.Output,
|
||||
Tags: rs.Tags,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// normalizeHandlers converts rawDagHandlers to DagHandlers.
|
||||
// Each handler field can be a single step object or a list of steps.
|
||||
func normalizeHandlers(rh *rawDagHandlers) (DagHandlers, error) {
|
||||
var h DagHandlers
|
||||
var err error
|
||||
h.Init, err = normalizeHandlerField(rh.Init)
|
||||
if err != nil {
|
||||
return DagHandlers{}, fmt.Errorf("init: %w", err)
|
||||
}
|
||||
h.Success, err = normalizeHandlerField(rh.Success)
|
||||
if err != nil {
|
||||
return DagHandlers{}, fmt.Errorf("success: %w", err)
|
||||
}
|
||||
h.Failure, err = normalizeHandlerField(rh.Failure)
|
||||
if err != nil {
|
||||
return DagHandlers{}, fmt.Errorf("failure: %w", err)
|
||||
}
|
||||
h.Exit, err = normalizeHandlerField(rh.Exit)
|
||||
if err != nil {
|
||||
return DagHandlers{}, fmt.Errorf("exit: %w", err)
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// normalizeHandlerField converts a handler field (single step or list) to []DagStep.
|
||||
func normalizeHandlerField(v interface{}) ([]DagStep, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Re-marshal and unmarshal to handle polymorphic types cleanly.
|
||||
b, err := yaml.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("re-marshal handler: %w", err)
|
||||
}
|
||||
|
||||
// Try as a list first.
|
||||
var rawList []rawDagStep
|
||||
if err := yaml.Unmarshal(b, &rawList); err == nil && len(rawList) > 0 {
|
||||
steps := make([]DagStep, 0, len(rawList))
|
||||
for i, rs := range rawList {
|
||||
step, err := normalizeStep(rs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("step[%d]: %w", i, err)
|
||||
}
|
||||
steps = append(steps, step)
|
||||
}
|
||||
return steps, nil
|
||||
}
|
||||
|
||||
// Try as single step.
|
||||
var rs rawDagStep
|
||||
if err := yaml.Unmarshal(b, &rs); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal handler step: %w", err)
|
||||
}
|
||||
step, err := normalizeStep(rs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// If the step has no meaningful content, return nil.
|
||||
if step.Name == "" && step.ID == "" && step.Command == "" && step.Script == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return []DagStep{step}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user