feat: add dag_engine app — CLI + web frontend for DAG execution (0007e)
Full DAG engine app with CLI subcommands (run, list, status, validate, server) and React/Mantine web frontend. Uses net/http + embedded Vite build. SQLite store for run history. Scheduler with cron_ticker for automated execution. Compatible with existing dagu YAML format. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,482 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"fn-registry/functions/core"
|
||||
"fn-registry/functions/infra"
|
||||
|
||||
"dag-engine/store"
|
||||
)
|
||||
|
||||
// Executor orchestrates DAG parsing, validation, and execution.
|
||||
type Executor struct {
|
||||
store *store.DB
|
||||
dagsDir string
|
||||
}
|
||||
|
||||
// NewExecutor creates a new executor.
|
||||
func NewExecutor(s *store.DB, dagsDir string) *Executor {
|
||||
return &Executor{store: s, dagsDir: dagsDir}
|
||||
}
|
||||
|
||||
// ExecuteDAG runs a DAG from a YAML file path and returns the run ID.
|
||||
// It runs asynchronously: steps execute in topological order with parallel levels.
|
||||
func (e *Executor) ExecuteDAG(ctx context.Context, dagPath string, trigger string) (string, error) {
|
||||
data, err := os.ReadFile(dagPath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("read dag: %w", err)
|
||||
}
|
||||
|
||||
dag, err := core.DagParse(data)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("parse dag: %w", err)
|
||||
}
|
||||
dag.FilePath = dagPath
|
||||
|
||||
// Resolve env variables.
|
||||
dag = core.DagResolveEnv(dag, os.Environ())
|
||||
|
||||
// Validate.
|
||||
result := core.DagValidate(dag)
|
||||
if !result.Valid {
|
||||
return "", fmt.Errorf("validate dag: %s", strings.Join(result.Errors, "; "))
|
||||
}
|
||||
|
||||
// Create run record.
|
||||
runID := generateID()
|
||||
now := time.Now()
|
||||
run := &store.DagRun{
|
||||
ID: runID,
|
||||
DagName: dag.Name,
|
||||
DagPath: dagPath,
|
||||
Status: "running",
|
||||
Trigger: trigger,
|
||||
StartedAt: now,
|
||||
}
|
||||
if err := e.store.CreateRun(run); err != nil {
|
||||
return "", fmt.Errorf("create run: %w", err)
|
||||
}
|
||||
|
||||
// Topological sort.
|
||||
levels, err := core.DagTopoSort(dag.Steps)
|
||||
if err != nil {
|
||||
e.failRun(runID, err)
|
||||
return runID, err
|
||||
}
|
||||
|
||||
// Setup DAGU_ENV temp file for inter-step communication.
|
||||
daguEnvFile, err := os.CreateTemp("", "dagu_env_*")
|
||||
if err != nil {
|
||||
e.failRun(runID, err)
|
||||
return runID, err
|
||||
}
|
||||
daguEnvPath := daguEnvFile.Name()
|
||||
daguEnvFile.Close()
|
||||
defer os.Remove(daguEnvPath)
|
||||
|
||||
// Track step outputs for ${step_id.stdout} references.
|
||||
stepOutputs := make(map[string]string)
|
||||
|
||||
// Execute levels.
|
||||
runFailed := false
|
||||
var runErr error
|
||||
|
||||
for _, level := range levels {
|
||||
if runFailed {
|
||||
// Skip remaining levels, mark steps as skipped.
|
||||
for _, step := range level {
|
||||
e.recordStepSkipped(runID, step)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
levelFailed := false
|
||||
|
||||
for _, step := range level {
|
||||
step := step
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
mu.Lock()
|
||||
if levelFailed {
|
||||
mu.Unlock()
|
||||
e.recordStepSkipped(runID, step)
|
||||
return
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
err := e.executeStep(ctx, runID, dag, step, daguEnvPath, stepOutputs, &mu)
|
||||
if err != nil && !step.ContinueOn.Failure {
|
||||
mu.Lock()
|
||||
levelFailed = true
|
||||
runFailed = true
|
||||
runErr = fmt.Errorf("step %q failed: %w", stepName(step), err)
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Run handlers.
|
||||
if runFailed {
|
||||
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Failure, daguEnvPath, stepOutputs)
|
||||
} else {
|
||||
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Success, daguEnvPath, stepOutputs)
|
||||
}
|
||||
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Exit, daguEnvPath, stepOutputs)
|
||||
|
||||
// Finalize run.
|
||||
fin := time.Now()
|
||||
status := "success"
|
||||
errMsg := ""
|
||||
if runFailed {
|
||||
status = "failed"
|
||||
if runErr != nil {
|
||||
errMsg = runErr.Error()
|
||||
}
|
||||
}
|
||||
e.store.UpdateRunStatus(runID, status, &fin, errMsg)
|
||||
|
||||
return runID, runErr
|
||||
}
|
||||
|
||||
// executeStep runs a single step, recording results in the store.
|
||||
func (e *Executor) executeStep(ctx context.Context, runID string, dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string, mu *sync.Mutex) error {
|
||||
stepID := generateID()
|
||||
now := time.Now()
|
||||
e.store.InsertStepResult(&store.DagStepResult{
|
||||
ID: stepID,
|
||||
RunID: runID,
|
||||
StepName: stepName(step),
|
||||
Status: "running",
|
||||
StartedAt: &now,
|
||||
})
|
||||
|
||||
// Build environment.
|
||||
env := buildStepEnv(dag, step, daguEnvPath, outputs)
|
||||
|
||||
// Determine command.
|
||||
command := step.Command
|
||||
if command == "" && step.Script != "" {
|
||||
command = step.Script
|
||||
}
|
||||
if command == "" {
|
||||
e.store.UpdateStepResult(stepID, "skipped", 0, "", "", nil, 0, "no command or script")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Resolve step-level ${VAR} references and ${step_id.stdout} patterns.
|
||||
mu.Lock()
|
||||
command = resolveStepRefs(command, outputs)
|
||||
mu.Unlock()
|
||||
|
||||
// Determine working directory.
|
||||
dir := step.Dir
|
||||
if dir == "" {
|
||||
dir = dag.WorkingDir
|
||||
}
|
||||
|
||||
shell := step.Shell
|
||||
if shell == "" {
|
||||
shell = dag.Shell
|
||||
}
|
||||
|
||||
// Spawn process.
|
||||
handle, err := infra.ProcessSpawn(command, dir, env, shell)
|
||||
if err != nil {
|
||||
fin := time.Now()
|
||||
e.store.UpdateStepResult(stepID, "failed", -1, "", "", &fin, time.Since(now).Milliseconds(), err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait for process.
|
||||
result, err := infra.ProcessWait(handle, step.TimeoutSec)
|
||||
fin := time.Now()
|
||||
duration := time.Since(now).Milliseconds()
|
||||
|
||||
if err != nil && result.ExitCode == 0 {
|
||||
result.ExitCode = -1
|
||||
}
|
||||
|
||||
status := "success"
|
||||
errMsg := ""
|
||||
if result.ExitCode != 0 || err != nil {
|
||||
status = "failed"
|
||||
if err != nil {
|
||||
errMsg = err.Error()
|
||||
}
|
||||
}
|
||||
|
||||
e.store.UpdateStepResult(stepID, status, result.ExitCode, result.Stdout, result.Stderr, &fin, duration, errMsg)
|
||||
|
||||
// Store output for ${step_id.stdout} references.
|
||||
if step.ID != "" || step.Output != "" {
|
||||
mu.Lock()
|
||||
key := step.ID
|
||||
if key == "" {
|
||||
key = step.Output
|
||||
}
|
||||
outputs[key] = strings.TrimSpace(result.Stdout)
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
// Read DAGU_ENV for inter-step env propagation.
|
||||
readDaguEnv(daguEnvPath, outputs)
|
||||
|
||||
if status == "failed" {
|
||||
return fmt.Errorf("exit code %d", result.ExitCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Executor) runHandlers(ctx context.Context, runID string, dag core.DagDefinition, handlers []core.DagStep, daguEnvPath string, outputs map[string]string) {
|
||||
var mu sync.Mutex
|
||||
for _, step := range handlers {
|
||||
e.executeStep(ctx, runID, dag, step, daguEnvPath, outputs, &mu)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Executor) failRun(runID string, err error) {
|
||||
fin := time.Now()
|
||||
e.store.UpdateRunStatus(runID, "failed", &fin, err.Error())
|
||||
}
|
||||
|
||||
func (e *Executor) recordStepSkipped(runID string, step core.DagStep) {
|
||||
now := time.Now()
|
||||
e.store.InsertStepResult(&store.DagStepResult{
|
||||
ID: generateID(),
|
||||
RunID: runID,
|
||||
StepName: stepName(step),
|
||||
Status: "skipped",
|
||||
StartedAt: &now,
|
||||
})
|
||||
}
|
||||
|
||||
// --- helpers ---
|
||||
|
||||
func stepName(s core.DagStep) string {
|
||||
if s.Name != "" {
|
||||
return s.Name
|
||||
}
|
||||
return s.ID
|
||||
}
|
||||
|
||||
func buildStepEnv(dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string) []string {
|
||||
env := os.Environ()
|
||||
|
||||
// Add DAG-level env.
|
||||
for k, v := range dag.Env {
|
||||
env = append(env, k+"="+v)
|
||||
}
|
||||
|
||||
// Add step-level env.
|
||||
for k, v := range step.Env {
|
||||
env = append(env, k+"="+v)
|
||||
}
|
||||
|
||||
// Add DAGU_ENV path.
|
||||
env = append(env, "DAGU_ENV="+daguEnvPath)
|
||||
|
||||
return env
|
||||
}
|
||||
|
||||
func resolveStepRefs(command string, outputs map[string]string) string {
|
||||
for k, v := range outputs {
|
||||
command = strings.ReplaceAll(command, "${"+k+".stdout}", v)
|
||||
command = strings.ReplaceAll(command, "$"+k+".stdout", v)
|
||||
}
|
||||
return command
|
||||
}
|
||||
|
||||
func readDaguEnv(path string, outputs map[string]string) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil || len(data) == 0 {
|
||||
return
|
||||
}
|
||||
for _, line := range strings.Split(string(data), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" || strings.HasPrefix(line, "#") {
|
||||
continue
|
||||
}
|
||||
parts := strings.SplitN(line, "=", 2)
|
||||
if len(parts) == 2 {
|
||||
outputs[parts[0]] = parts[1]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// generateID creates a simple time-based unique ID.
|
||||
func generateID() string {
|
||||
return fmt.Sprintf("%d-%04x", time.Now().UnixNano(), time.Now().Nanosecond()%0xFFFF)
|
||||
}
|
||||
|
||||
// --- DAG listing helpers ---
|
||||
|
||||
// DagInfo summarizes a DAG file for listing.
|
||||
type DagInfo struct {
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description,omitempty"`
|
||||
Schedule []string `json:"schedule,omitempty"`
|
||||
Tags []string `json:"tags,omitempty"`
|
||||
Type string `json:"type,omitempty"`
|
||||
FilePath string `json:"file_path"`
|
||||
Valid bool `json:"valid"`
|
||||
LastRun *store.DagRun `json:"last_run,omitempty"`
|
||||
}
|
||||
|
||||
// ListDAGs scans a directory for YAML files and returns parsed DAG info.
|
||||
func (e *Executor) ListDAGs() ([]DagInfo, error) {
|
||||
entries, err := os.ReadDir(e.dagsDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read dags dir: %w", err)
|
||||
}
|
||||
|
||||
var dags []DagInfo
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
ext := filepath.Ext(entry.Name())
|
||||
if ext != ".yaml" && ext != ".yml" {
|
||||
continue
|
||||
}
|
||||
|
||||
path := filepath.Join(e.dagsDir, entry.Name())
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
dag, err := core.DagParse(data)
|
||||
if err != nil {
|
||||
dags = append(dags, DagInfo{
|
||||
Name: strings.TrimSuffix(entry.Name(), ext),
|
||||
FilePath: path,
|
||||
Valid: false,
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
info := DagInfo{
|
||||
Name: dag.Name,
|
||||
Description: dag.Description,
|
||||
Schedule: dag.Schedule,
|
||||
Tags: dag.Tags,
|
||||
Type: dag.Type,
|
||||
FilePath: path,
|
||||
Valid: true,
|
||||
}
|
||||
|
||||
// Attach last run info.
|
||||
runs, _, _ := e.store.ListRuns(dag.Name, 1, 0)
|
||||
if len(runs) > 0 {
|
||||
info.LastRun = &runs[0]
|
||||
}
|
||||
|
||||
dags = append(dags, info)
|
||||
}
|
||||
|
||||
return dags, nil
|
||||
}
|
||||
|
||||
// GetDAG returns detailed info for a specific DAG by name.
|
||||
func (e *Executor) GetDAG(name string) (*DagInfo, *core.DagDefinition, *core.DagValidationResult, error) {
|
||||
// Find the YAML file.
|
||||
entries, err := os.ReadDir(e.dagsDir)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
ext := filepath.Ext(entry.Name())
|
||||
base := strings.TrimSuffix(entry.Name(), ext)
|
||||
if (ext != ".yaml" && ext != ".yml") || base != name {
|
||||
continue
|
||||
}
|
||||
|
||||
path := filepath.Join(e.dagsDir, entry.Name())
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
dag, err := core.DagParse(data)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("parse: %w", err)
|
||||
}
|
||||
dag.FilePath = path
|
||||
|
||||
validationResult := core.DagValidate(dag)
|
||||
|
||||
info := &DagInfo{
|
||||
Name: dag.Name,
|
||||
Description: dag.Description,
|
||||
Schedule: dag.Schedule,
|
||||
Tags: dag.Tags,
|
||||
Type: dag.Type,
|
||||
FilePath: path,
|
||||
Valid: validationResult.Valid,
|
||||
}
|
||||
|
||||
runs, _, _ := e.store.ListRuns(dag.Name, 1, 0)
|
||||
if len(runs) > 0 {
|
||||
info.LastRun = &runs[0]
|
||||
}
|
||||
|
||||
return info, &dag, &validationResult, nil
|
||||
}
|
||||
|
||||
return nil, nil, nil, fmt.Errorf("dag %q not found in %s", name, e.dagsDir)
|
||||
}
|
||||
|
||||
// ValidateDAG parses and validates a DAG file, printing results.
|
||||
func ValidateDAG(path string) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dag, err := core.DagParse(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse error: %w", err)
|
||||
}
|
||||
|
||||
result := core.DagValidate(dag)
|
||||
|
||||
log.Printf("DAG: %s", dag.Name)
|
||||
log.Printf("Steps: %d", len(dag.Steps))
|
||||
log.Printf("Schedule: %v", dag.Schedule)
|
||||
|
||||
if result.Valid {
|
||||
log.Printf("Validation: PASS")
|
||||
log.Printf("Topological levels: %d", len(result.Levels))
|
||||
for i, level := range result.Levels {
|
||||
log.Printf(" Level %d: %v", i, level)
|
||||
}
|
||||
} else {
|
||||
log.Printf("Validation: FAIL")
|
||||
for _, e := range result.Errors {
|
||||
log.Printf(" ERROR: %s", e)
|
||||
}
|
||||
}
|
||||
for _, w := range result.Warnings {
|
||||
log.Printf(" WARNING: %s", w)
|
||||
}
|
||||
|
||||
if !result.Valid {
|
||||
return fmt.Errorf("validation failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user