package main import ( "context" "fmt" "log" "os" "path/filepath" "strings" "sync" "time" "fn-registry/functions/core" "fn-registry/functions/infra" "dag-engine/store" ) // Executor orchestrates DAG parsing, validation, and execution. type Executor struct { store *store.DB dagsDir string } // NewExecutor creates a new executor. func NewExecutor(s *store.DB, dagsDir string) *Executor { return &Executor{store: s, dagsDir: dagsDir} } // ExecuteDAG runs a DAG from a YAML file path and returns the run ID. // It runs asynchronously: steps execute in topological order with parallel levels. func (e *Executor) ExecuteDAG(ctx context.Context, dagPath string, trigger string) (string, error) { data, err := os.ReadFile(dagPath) if err != nil { return "", fmt.Errorf("read dag: %w", err) } dag, err := core.DagParse(data) if err != nil { return "", fmt.Errorf("parse dag: %w", err) } dag.FilePath = dagPath // Resolve env variables. dag = core.DagResolveEnv(dag, os.Environ()) // Validate. result := core.DagValidate(dag) if !result.Valid { return "", fmt.Errorf("validate dag: %s", strings.Join(result.Errors, "; ")) } // Create run record. runID := generateID() now := time.Now() run := &store.DagRun{ ID: runID, DagName: dag.Name, DagPath: dagPath, Status: "running", Trigger: trigger, StartedAt: now, } if err := e.store.CreateRun(run); err != nil { return "", fmt.Errorf("create run: %w", err) } // Topological sort. levels, err := core.DagTopoSort(dag.Steps) if err != nil { e.failRun(runID, err) return runID, err } // Setup DAGU_ENV temp file for inter-step communication. daguEnvFile, err := os.CreateTemp("", "dagu_env_*") if err != nil { e.failRun(runID, err) return runID, err } daguEnvPath := daguEnvFile.Name() daguEnvFile.Close() defer os.Remove(daguEnvPath) // Track step outputs for ${step_id.stdout} references. stepOutputs := make(map[string]string) // Execute levels. runFailed := false var runErr error for _, level := range levels { if runFailed { // Skip remaining levels, mark steps as skipped. for _, step := range level { e.recordStepSkipped(runID, step) } continue } var wg sync.WaitGroup var mu sync.Mutex levelFailed := false for _, step := range level { step := step wg.Add(1) go func() { defer wg.Done() mu.Lock() if levelFailed { mu.Unlock() e.recordStepSkipped(runID, step) return } mu.Unlock() err := e.executeStep(ctx, runID, dag, step, daguEnvPath, stepOutputs, &mu) if err != nil && !step.ContinueOn.Failure { mu.Lock() levelFailed = true runFailed = true runErr = fmt.Errorf("step %q failed: %w", stepName(step), err) mu.Unlock() } }() } wg.Wait() } // Run handlers. if runFailed { e.runHandlers(ctx, runID, dag, dag.HandlerOn.Failure, daguEnvPath, stepOutputs) } else { e.runHandlers(ctx, runID, dag, dag.HandlerOn.Success, daguEnvPath, stepOutputs) } e.runHandlers(ctx, runID, dag, dag.HandlerOn.Exit, daguEnvPath, stepOutputs) // Finalize run. fin := time.Now() status := "success" errMsg := "" if runFailed { status = "failed" if runErr != nil { errMsg = runErr.Error() } } e.store.UpdateRunStatus(runID, status, &fin, errMsg) return runID, runErr } // executeStep runs a single step, recording results in the store. func (e *Executor) executeStep(ctx context.Context, runID string, dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string, mu *sync.Mutex) error { stepID := generateID() now := time.Now() e.store.InsertStepResult(&store.DagStepResult{ ID: stepID, RunID: runID, StepName: stepName(step), Status: "running", StartedAt: &now, }) // Build environment. env := buildStepEnv(dag, step, daguEnvPath, outputs) // Determine command. command := step.Command if command == "" && step.Script != "" { command = step.Script } if command == "" { e.store.UpdateStepResult(stepID, "skipped", 0, "", "", nil, 0, "no command or script") return nil } // Resolve step-level ${VAR} references and ${step_id.stdout} patterns. mu.Lock() command = resolveStepRefs(command, outputs) mu.Unlock() // Determine working directory. dir := step.Dir if dir == "" { dir = dag.WorkingDir } shell := step.Shell if shell == "" { shell = dag.Shell } // Spawn process. handle, err := infra.ProcessSpawn(command, dir, env, shell) if err != nil { fin := time.Now() e.store.UpdateStepResult(stepID, "failed", -1, "", "", &fin, time.Since(now).Milliseconds(), err.Error()) return err } // Wait for process. result, err := infra.ProcessWait(handle, step.TimeoutSec) fin := time.Now() duration := time.Since(now).Milliseconds() if err != nil && result.ExitCode == 0 { result.ExitCode = -1 } status := "success" errMsg := "" if result.ExitCode != 0 || err != nil { status = "failed" if err != nil { errMsg = err.Error() } } e.store.UpdateStepResult(stepID, status, result.ExitCode, result.Stdout, result.Stderr, &fin, duration, errMsg) // Store output for ${step_id.stdout} references. if step.ID != "" || step.Output != "" { mu.Lock() key := step.ID if key == "" { key = step.Output } outputs[key] = strings.TrimSpace(result.Stdout) mu.Unlock() } // Read DAGU_ENV for inter-step env propagation. readDaguEnv(daguEnvPath, outputs) if status == "failed" { return fmt.Errorf("exit code %d", result.ExitCode) } return nil } func (e *Executor) runHandlers(ctx context.Context, runID string, dag core.DagDefinition, handlers []core.DagStep, daguEnvPath string, outputs map[string]string) { var mu sync.Mutex for _, step := range handlers { e.executeStep(ctx, runID, dag, step, daguEnvPath, outputs, &mu) } } func (e *Executor) failRun(runID string, err error) { fin := time.Now() e.store.UpdateRunStatus(runID, "failed", &fin, err.Error()) } func (e *Executor) recordStepSkipped(runID string, step core.DagStep) { now := time.Now() e.store.InsertStepResult(&store.DagStepResult{ ID: generateID(), RunID: runID, StepName: stepName(step), Status: "skipped", StartedAt: &now, }) } // --- helpers --- func stepName(s core.DagStep) string { if s.Name != "" { return s.Name } return s.ID } func buildStepEnv(dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string) []string { env := os.Environ() // Add DAG-level env. for k, v := range dag.Env { env = append(env, k+"="+v) } // Add step-level env. for k, v := range step.Env { env = append(env, k+"="+v) } // Add DAGU_ENV path. env = append(env, "DAGU_ENV="+daguEnvPath) return env } func resolveStepRefs(command string, outputs map[string]string) string { for k, v := range outputs { command = strings.ReplaceAll(command, "${"+k+".stdout}", v) command = strings.ReplaceAll(command, "$"+k+".stdout", v) } return command } func readDaguEnv(path string, outputs map[string]string) { data, err := os.ReadFile(path) if err != nil || len(data) == 0 { return } for _, line := range strings.Split(string(data), "\n") { line = strings.TrimSpace(line) if line == "" || strings.HasPrefix(line, "#") { continue } parts := strings.SplitN(line, "=", 2) if len(parts) == 2 { outputs[parts[0]] = parts[1] } } } // generateID creates a simple time-based unique ID. func generateID() string { return fmt.Sprintf("%d-%04x", time.Now().UnixNano(), time.Now().Nanosecond()%0xFFFF) } // --- DAG listing helpers --- // DagInfo summarizes a DAG file for listing. type DagInfo struct { Name string `json:"name"` Description string `json:"description,omitempty"` Schedule []string `json:"schedule,omitempty"` Tags []string `json:"tags,omitempty"` Type string `json:"type,omitempty"` FilePath string `json:"file_path"` Valid bool `json:"valid"` LastRun *store.DagRun `json:"last_run,omitempty"` } // ListDAGs scans a directory for YAML files and returns parsed DAG info. func (e *Executor) ListDAGs() ([]DagInfo, error) { entries, err := os.ReadDir(e.dagsDir) if err != nil { return nil, fmt.Errorf("read dags dir: %w", err) } var dags []DagInfo for _, entry := range entries { if entry.IsDir() { continue } ext := filepath.Ext(entry.Name()) if ext != ".yaml" && ext != ".yml" { continue } path := filepath.Join(e.dagsDir, entry.Name()) data, err := os.ReadFile(path) if err != nil { continue } dag, err := core.DagParse(data) if err != nil { dags = append(dags, DagInfo{ Name: strings.TrimSuffix(entry.Name(), ext), FilePath: path, Valid: false, }) continue } info := DagInfo{ Name: dag.Name, Description: dag.Description, Schedule: dag.Schedule, Tags: dag.Tags, Type: dag.Type, FilePath: path, Valid: true, } // Attach last run info. runs, _, _ := e.store.ListRuns(dag.Name, 1, 0) if len(runs) > 0 { info.LastRun = &runs[0] } dags = append(dags, info) } return dags, nil } // GetDAG returns detailed info for a specific DAG by name. func (e *Executor) GetDAG(name string) (*DagInfo, *core.DagDefinition, *core.DagValidationResult, error) { // Find the YAML file. entries, err := os.ReadDir(e.dagsDir) if err != nil { return nil, nil, nil, err } for _, entry := range entries { ext := filepath.Ext(entry.Name()) base := strings.TrimSuffix(entry.Name(), ext) if (ext != ".yaml" && ext != ".yml") || base != name { continue } path := filepath.Join(e.dagsDir, entry.Name()) data, err := os.ReadFile(path) if err != nil { return nil, nil, nil, err } dag, err := core.DagParse(data) if err != nil { return nil, nil, nil, fmt.Errorf("parse: %w", err) } dag.FilePath = path validationResult := core.DagValidate(dag) info := &DagInfo{ Name: dag.Name, Description: dag.Description, Schedule: dag.Schedule, Tags: dag.Tags, Type: dag.Type, FilePath: path, Valid: validationResult.Valid, } runs, _, _ := e.store.ListRuns(dag.Name, 1, 0) if len(runs) > 0 { info.LastRun = &runs[0] } return info, &dag, &validationResult, nil } return nil, nil, nil, fmt.Errorf("dag %q not found in %s", name, e.dagsDir) } // ValidateDAG parses and validates a DAG file, printing results. func ValidateDAG(path string) error { data, err := os.ReadFile(path) if err != nil { return err } dag, err := core.DagParse(data) if err != nil { return fmt.Errorf("parse error: %w", err) } result := core.DagValidate(dag) log.Printf("DAG: %s", dag.Name) log.Printf("Steps: %d", len(dag.Steps)) log.Printf("Schedule: %v", dag.Schedule) if result.Valid { log.Printf("Validation: PASS") log.Printf("Topological levels: %d", len(result.Levels)) for i, level := range result.Levels { log.Printf(" Level %d: %v", i, level) } } else { log.Printf("Validation: FAIL") for _, e := range result.Errors { log.Printf(" ERROR: %s", e) } } for _, w := range result.Warnings { log.Printf(" WARNING: %s", w) } if !result.Valid { return fmt.Errorf("validation failed") } return nil }