Files
fn_registry/apps/dag_engine/executor.go
T
egutierrez d9414e4cba feat: add dag_engine app — CLI + web frontend for DAG execution (0007e)
Full DAG engine app with CLI subcommands (run, list, status, validate, server)
and React/Mantine web frontend. Uses net/http + embedded Vite build. SQLite
store for run history. Scheduler with cron_ticker for automated execution.
Compatible with existing dagu YAML format.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 13:05:36 +02:00

483 lines
11 KiB
Go

package main
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"fn-registry/functions/core"
"fn-registry/functions/infra"
"dag-engine/store"
)
// Executor orchestrates DAG parsing, validation, and execution.
type Executor struct {
store *store.DB
dagsDir string
}
// NewExecutor creates a new executor.
func NewExecutor(s *store.DB, dagsDir string) *Executor {
return &Executor{store: s, dagsDir: dagsDir}
}
// ExecuteDAG runs a DAG from a YAML file path and returns the run ID.
// It runs asynchronously: steps execute in topological order with parallel levels.
func (e *Executor) ExecuteDAG(ctx context.Context, dagPath string, trigger string) (string, error) {
data, err := os.ReadFile(dagPath)
if err != nil {
return "", fmt.Errorf("read dag: %w", err)
}
dag, err := core.DagParse(data)
if err != nil {
return "", fmt.Errorf("parse dag: %w", err)
}
dag.FilePath = dagPath
// Resolve env variables.
dag = core.DagResolveEnv(dag, os.Environ())
// Validate.
result := core.DagValidate(dag)
if !result.Valid {
return "", fmt.Errorf("validate dag: %s", strings.Join(result.Errors, "; "))
}
// Create run record.
runID := generateID()
now := time.Now()
run := &store.DagRun{
ID: runID,
DagName: dag.Name,
DagPath: dagPath,
Status: "running",
Trigger: trigger,
StartedAt: now,
}
if err := e.store.CreateRun(run); err != nil {
return "", fmt.Errorf("create run: %w", err)
}
// Topological sort.
levels, err := core.DagTopoSort(dag.Steps)
if err != nil {
e.failRun(runID, err)
return runID, err
}
// Setup DAGU_ENV temp file for inter-step communication.
daguEnvFile, err := os.CreateTemp("", "dagu_env_*")
if err != nil {
e.failRun(runID, err)
return runID, err
}
daguEnvPath := daguEnvFile.Name()
daguEnvFile.Close()
defer os.Remove(daguEnvPath)
// Track step outputs for ${step_id.stdout} references.
stepOutputs := make(map[string]string)
// Execute levels.
runFailed := false
var runErr error
for _, level := range levels {
if runFailed {
// Skip remaining levels, mark steps as skipped.
for _, step := range level {
e.recordStepSkipped(runID, step)
}
continue
}
var wg sync.WaitGroup
var mu sync.Mutex
levelFailed := false
for _, step := range level {
step := step
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
if levelFailed {
mu.Unlock()
e.recordStepSkipped(runID, step)
return
}
mu.Unlock()
err := e.executeStep(ctx, runID, dag, step, daguEnvPath, stepOutputs, &mu)
if err != nil && !step.ContinueOn.Failure {
mu.Lock()
levelFailed = true
runFailed = true
runErr = fmt.Errorf("step %q failed: %w", stepName(step), err)
mu.Unlock()
}
}()
}
wg.Wait()
}
// Run handlers.
if runFailed {
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Failure, daguEnvPath, stepOutputs)
} else {
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Success, daguEnvPath, stepOutputs)
}
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Exit, daguEnvPath, stepOutputs)
// Finalize run.
fin := time.Now()
status := "success"
errMsg := ""
if runFailed {
status = "failed"
if runErr != nil {
errMsg = runErr.Error()
}
}
e.store.UpdateRunStatus(runID, status, &fin, errMsg)
return runID, runErr
}
// executeStep runs a single step, recording results in the store.
func (e *Executor) executeStep(ctx context.Context, runID string, dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string, mu *sync.Mutex) error {
stepID := generateID()
now := time.Now()
e.store.InsertStepResult(&store.DagStepResult{
ID: stepID,
RunID: runID,
StepName: stepName(step),
Status: "running",
StartedAt: &now,
})
// Build environment.
env := buildStepEnv(dag, step, daguEnvPath, outputs)
// Determine command.
command := step.Command
if command == "" && step.Script != "" {
command = step.Script
}
if command == "" {
e.store.UpdateStepResult(stepID, "skipped", 0, "", "", nil, 0, "no command or script")
return nil
}
// Resolve step-level ${VAR} references and ${step_id.stdout} patterns.
mu.Lock()
command = resolveStepRefs(command, outputs)
mu.Unlock()
// Determine working directory.
dir := step.Dir
if dir == "" {
dir = dag.WorkingDir
}
shell := step.Shell
if shell == "" {
shell = dag.Shell
}
// Spawn process.
handle, err := infra.ProcessSpawn(command, dir, env, shell)
if err != nil {
fin := time.Now()
e.store.UpdateStepResult(stepID, "failed", -1, "", "", &fin, time.Since(now).Milliseconds(), err.Error())
return err
}
// Wait for process.
result, err := infra.ProcessWait(handle, step.TimeoutSec)
fin := time.Now()
duration := time.Since(now).Milliseconds()
if err != nil && result.ExitCode == 0 {
result.ExitCode = -1
}
status := "success"
errMsg := ""
if result.ExitCode != 0 || err != nil {
status = "failed"
if err != nil {
errMsg = err.Error()
}
}
e.store.UpdateStepResult(stepID, status, result.ExitCode, result.Stdout, result.Stderr, &fin, duration, errMsg)
// Store output for ${step_id.stdout} references.
if step.ID != "" || step.Output != "" {
mu.Lock()
key := step.ID
if key == "" {
key = step.Output
}
outputs[key] = strings.TrimSpace(result.Stdout)
mu.Unlock()
}
// Read DAGU_ENV for inter-step env propagation.
readDaguEnv(daguEnvPath, outputs)
if status == "failed" {
return fmt.Errorf("exit code %d", result.ExitCode)
}
return nil
}
func (e *Executor) runHandlers(ctx context.Context, runID string, dag core.DagDefinition, handlers []core.DagStep, daguEnvPath string, outputs map[string]string) {
var mu sync.Mutex
for _, step := range handlers {
e.executeStep(ctx, runID, dag, step, daguEnvPath, outputs, &mu)
}
}
func (e *Executor) failRun(runID string, err error) {
fin := time.Now()
e.store.UpdateRunStatus(runID, "failed", &fin, err.Error())
}
func (e *Executor) recordStepSkipped(runID string, step core.DagStep) {
now := time.Now()
e.store.InsertStepResult(&store.DagStepResult{
ID: generateID(),
RunID: runID,
StepName: stepName(step),
Status: "skipped",
StartedAt: &now,
})
}
// --- helpers ---
func stepName(s core.DagStep) string {
if s.Name != "" {
return s.Name
}
return s.ID
}
func buildStepEnv(dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string) []string {
env := os.Environ()
// Add DAG-level env.
for k, v := range dag.Env {
env = append(env, k+"="+v)
}
// Add step-level env.
for k, v := range step.Env {
env = append(env, k+"="+v)
}
// Add DAGU_ENV path.
env = append(env, "DAGU_ENV="+daguEnvPath)
return env
}
func resolveStepRefs(command string, outputs map[string]string) string {
for k, v := range outputs {
command = strings.ReplaceAll(command, "${"+k+".stdout}", v)
command = strings.ReplaceAll(command, "$"+k+".stdout", v)
}
return command
}
func readDaguEnv(path string, outputs map[string]string) {
data, err := os.ReadFile(path)
if err != nil || len(data) == 0 {
return
}
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") {
continue
}
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
outputs[parts[0]] = parts[1]
}
}
}
// generateID creates a simple time-based unique ID.
func generateID() string {
return fmt.Sprintf("%d-%04x", time.Now().UnixNano(), time.Now().Nanosecond()%0xFFFF)
}
// --- DAG listing helpers ---
// DagInfo summarizes a DAG file for listing.
type DagInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Schedule []string `json:"schedule,omitempty"`
Tags []string `json:"tags,omitempty"`
Type string `json:"type,omitempty"`
FilePath string `json:"file_path"`
Valid bool `json:"valid"`
LastRun *store.DagRun `json:"last_run,omitempty"`
}
// ListDAGs scans a directory for YAML files and returns parsed DAG info.
func (e *Executor) ListDAGs() ([]DagInfo, error) {
entries, err := os.ReadDir(e.dagsDir)
if err != nil {
return nil, fmt.Errorf("read dags dir: %w", err)
}
var dags []DagInfo
for _, entry := range entries {
if entry.IsDir() {
continue
}
ext := filepath.Ext(entry.Name())
if ext != ".yaml" && ext != ".yml" {
continue
}
path := filepath.Join(e.dagsDir, entry.Name())
data, err := os.ReadFile(path)
if err != nil {
continue
}
dag, err := core.DagParse(data)
if err != nil {
dags = append(dags, DagInfo{
Name: strings.TrimSuffix(entry.Name(), ext),
FilePath: path,
Valid: false,
})
continue
}
info := DagInfo{
Name: dag.Name,
Description: dag.Description,
Schedule: dag.Schedule,
Tags: dag.Tags,
Type: dag.Type,
FilePath: path,
Valid: true,
}
// Attach last run info.
runs, _, _ := e.store.ListRuns(dag.Name, 1, 0)
if len(runs) > 0 {
info.LastRun = &runs[0]
}
dags = append(dags, info)
}
return dags, nil
}
// GetDAG returns detailed info for a specific DAG by name.
func (e *Executor) GetDAG(name string) (*DagInfo, *core.DagDefinition, *core.DagValidationResult, error) {
// Find the YAML file.
entries, err := os.ReadDir(e.dagsDir)
if err != nil {
return nil, nil, nil, err
}
for _, entry := range entries {
ext := filepath.Ext(entry.Name())
base := strings.TrimSuffix(entry.Name(), ext)
if (ext != ".yaml" && ext != ".yml") || base != name {
continue
}
path := filepath.Join(e.dagsDir, entry.Name())
data, err := os.ReadFile(path)
if err != nil {
return nil, nil, nil, err
}
dag, err := core.DagParse(data)
if err != nil {
return nil, nil, nil, fmt.Errorf("parse: %w", err)
}
dag.FilePath = path
validationResult := core.DagValidate(dag)
info := &DagInfo{
Name: dag.Name,
Description: dag.Description,
Schedule: dag.Schedule,
Tags: dag.Tags,
Type: dag.Type,
FilePath: path,
Valid: validationResult.Valid,
}
runs, _, _ := e.store.ListRuns(dag.Name, 1, 0)
if len(runs) > 0 {
info.LastRun = &runs[0]
}
return info, &dag, &validationResult, nil
}
return nil, nil, nil, fmt.Errorf("dag %q not found in %s", name, e.dagsDir)
}
// ValidateDAG parses and validates a DAG file, printing results.
func ValidateDAG(path string) error {
data, err := os.ReadFile(path)
if err != nil {
return err
}
dag, err := core.DagParse(data)
if err != nil {
return fmt.Errorf("parse error: %w", err)
}
result := core.DagValidate(dag)
log.Printf("DAG: %s", dag.Name)
log.Printf("Steps: %d", len(dag.Steps))
log.Printf("Schedule: %v", dag.Schedule)
if result.Valid {
log.Printf("Validation: PASS")
log.Printf("Topological levels: %d", len(result.Levels))
for i, level := range result.Levels {
log.Printf(" Level %d: %v", i, level)
}
} else {
log.Printf("Validation: FAIL")
for _, e := range result.Errors {
log.Printf(" ERROR: %s", e)
}
}
for _, w := range result.Warnings {
log.Printf(" WARNING: %s", w)
}
if !result.Valid {
return fmt.Errorf("validation failed")
}
return nil
}