5d2a14e50a
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
516 lines
12 KiB
Go
516 lines
12 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"fn-registry/functions/core"
|
|
"fn-registry/functions/infra"
|
|
|
|
"dag-engine/store"
|
|
)
|
|
|
|
// Executor orchestrates DAG parsing, validation, and execution.
//
// DAG definitions are discovered in dagsDir; run records and per-step results
// are written through store.
type Executor struct {
	store   *store.DB // persists run records and per-step results
	dagsDir string    // directory scanned for *.yaml / *.yml DAG files
}
|
|
|
|
// NewExecutor creates a new executor.
|
|
func NewExecutor(s *store.DB, dagsDir string) *Executor {
|
|
return &Executor{store: s, dagsDir: dagsDir}
|
|
}
|
|
|
|
// ExecuteDAG runs a DAG from a YAML file path and returns the run ID.
|
|
// It runs asynchronously: steps execute in topological order with parallel levels.
|
|
func (e *Executor) ExecuteDAG(ctx context.Context, dagPath string, trigger string) (string, error) {
|
|
data, err := os.ReadFile(dagPath)
|
|
if err != nil {
|
|
return "", fmt.Errorf("read dag: %w", err)
|
|
}
|
|
|
|
dag, err := core.DagParse(data)
|
|
if err != nil {
|
|
return "", fmt.Errorf("parse dag: %w", err)
|
|
}
|
|
dag.FilePath = dagPath
|
|
|
|
// Resolve env variables.
|
|
dag = core.DagResolveEnv(dag, os.Environ())
|
|
|
|
// Validate.
|
|
result := core.DagValidate(dag)
|
|
if !result.Valid {
|
|
return "", fmt.Errorf("validate dag: %s", strings.Join(result.Errors, "; "))
|
|
}
|
|
|
|
// Create run record.
|
|
runID := generateID()
|
|
now := time.Now()
|
|
run := &store.DagRun{
|
|
ID: runID,
|
|
DagName: dag.Name,
|
|
DagPath: dagPath,
|
|
Status: "running",
|
|
Trigger: trigger,
|
|
StartedAt: now,
|
|
}
|
|
if err := e.store.CreateRun(run); err != nil {
|
|
return "", fmt.Errorf("create run: %w", err)
|
|
}
|
|
|
|
// Topological sort.
|
|
levels, err := core.DagTopoSort(dag.Steps)
|
|
if err != nil {
|
|
e.failRun(runID, err)
|
|
return runID, err
|
|
}
|
|
|
|
// Setup DAGU_ENV temp file for inter-step communication.
|
|
daguEnvFile, err := os.CreateTemp("", "dagu_env_*")
|
|
if err != nil {
|
|
e.failRun(runID, err)
|
|
return runID, err
|
|
}
|
|
daguEnvPath := daguEnvFile.Name()
|
|
daguEnvFile.Close()
|
|
defer os.Remove(daguEnvPath)
|
|
|
|
// Track step outputs for ${step_id.stdout} references.
|
|
stepOutputs := make(map[string]string)
|
|
|
|
// Execute levels.
|
|
runFailed := false
|
|
var runErr error
|
|
|
|
for _, level := range levels {
|
|
if runFailed {
|
|
// Skip remaining levels, mark steps as skipped.
|
|
for _, step := range level {
|
|
e.recordStepSkipped(runID, step)
|
|
}
|
|
continue
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
var mu sync.Mutex
|
|
levelFailed := false
|
|
|
|
for _, step := range level {
|
|
step := step
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
mu.Lock()
|
|
if levelFailed {
|
|
mu.Unlock()
|
|
e.recordStepSkipped(runID, step)
|
|
return
|
|
}
|
|
mu.Unlock()
|
|
|
|
err := e.executeStep(ctx, runID, dag, step, daguEnvPath, stepOutputs, &mu)
|
|
if err != nil && !step.ContinueOn.Failure {
|
|
mu.Lock()
|
|
levelFailed = true
|
|
runFailed = true
|
|
runErr = fmt.Errorf("step %q failed: %w", stepName(step), err)
|
|
mu.Unlock()
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// Run handlers.
|
|
if runFailed {
|
|
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Failure, daguEnvPath, stepOutputs)
|
|
} else {
|
|
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Success, daguEnvPath, stepOutputs)
|
|
}
|
|
e.runHandlers(ctx, runID, dag, dag.HandlerOn.Exit, daguEnvPath, stepOutputs)
|
|
|
|
// Finalize run.
|
|
fin := time.Now()
|
|
status := "success"
|
|
errMsg := ""
|
|
if runFailed {
|
|
status = "failed"
|
|
if runErr != nil {
|
|
errMsg = runErr.Error()
|
|
}
|
|
}
|
|
e.store.UpdateRunStatus(runID, status, &fin, errMsg)
|
|
|
|
return runID, runErr
|
|
}
|
|
|
|
// executeStep runs a single step, recording results in the store.
|
|
func (e *Executor) executeStep(ctx context.Context, runID string, dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string, mu *sync.Mutex) error {
|
|
stepID := generateID()
|
|
now := time.Now()
|
|
|
|
// Resolve command source: function (registry) takes precedence over command/script.
|
|
var command string
|
|
var stepFunctionID string
|
|
var fnRegistryRoot string
|
|
if step.Function != "" {
|
|
stepFunctionID = step.Function
|
|
fnRegistryRoot = os.Getenv("FN_REGISTRY_ROOT")
|
|
if fnRegistryRoot == "" {
|
|
fnRegistryRoot = "/home/lucas/fn_registry"
|
|
}
|
|
fnBin := os.Getenv("FN_BIN")
|
|
if fnBin == "" {
|
|
fnBin = fnRegistryRoot + "/fn"
|
|
}
|
|
parts := []string{fnBin, "run", step.Function}
|
|
parts = append(parts, step.Args...)
|
|
command = strings.Join(parts, " ")
|
|
} else if step.Command != "" {
|
|
command = step.Command
|
|
} else if step.Script != "" {
|
|
command = step.Script
|
|
}
|
|
|
|
e.store.InsertStepResult(&store.DagStepResult{
|
|
ID: stepID,
|
|
RunID: runID,
|
|
StepName: stepName(step),
|
|
FunctionID: stepFunctionID,
|
|
Status: "running",
|
|
StartedAt: &now,
|
|
})
|
|
|
|
// Build environment.
|
|
env := buildStepEnv(dag, step, daguEnvPath, outputs)
|
|
|
|
// For function: steps, force FN_REGISTRY_ROOT into env so `fn run`
|
|
// resolves the canonical registry.db (not whatever lives at the spawn cwd).
|
|
// Prevents the apps/dag_engine/registry.db stale-shadow bug (2026-05-16).
|
|
if stepFunctionID != "" {
|
|
env = append(env, "FN_REGISTRY_ROOT="+fnRegistryRoot)
|
|
}
|
|
|
|
if command == "" {
|
|
e.store.UpdateStepResult(stepID, "skipped", 0, "", "", nil, 0, "no command or script")
|
|
return nil
|
|
}
|
|
|
|
// Resolve step-level ${VAR} references and ${step_id.stdout} patterns.
|
|
mu.Lock()
|
|
command = resolveStepRefs(command, outputs)
|
|
mu.Unlock()
|
|
|
|
// Determine working directory. function: steps default to FN_REGISTRY_ROOT
|
|
// so `fn` resolves registry.db correctly via go.mod walk-up.
|
|
dir := step.Dir
|
|
if dir == "" {
|
|
dir = dag.WorkingDir
|
|
}
|
|
if dir == "" && stepFunctionID != "" {
|
|
dir = fnRegistryRoot
|
|
}
|
|
|
|
shell := step.Shell
|
|
if shell == "" {
|
|
shell = dag.Shell
|
|
}
|
|
|
|
// Spawn process.
|
|
handle, err := infra.ProcessSpawn(command, dir, env, shell)
|
|
if err != nil {
|
|
fin := time.Now()
|
|
e.store.UpdateStepResult(stepID, "failed", -1, "", "", &fin, time.Since(now).Milliseconds(), err.Error())
|
|
return err
|
|
}
|
|
|
|
// Wait for process.
|
|
result, err := infra.ProcessWait(handle, step.TimeoutSec)
|
|
fin := time.Now()
|
|
duration := time.Since(now).Milliseconds()
|
|
|
|
if err != nil && result.ExitCode == 0 {
|
|
result.ExitCode = -1
|
|
}
|
|
|
|
status := "success"
|
|
errMsg := ""
|
|
if result.ExitCode != 0 || err != nil {
|
|
status = "failed"
|
|
if err != nil {
|
|
errMsg = err.Error()
|
|
}
|
|
}
|
|
|
|
e.store.UpdateStepResult(stepID, status, result.ExitCode, result.Stdout, result.Stderr, &fin, duration, errMsg)
|
|
|
|
// Store output for ${step_id.stdout} references.
|
|
if step.ID != "" || step.Output != "" {
|
|
mu.Lock()
|
|
key := step.ID
|
|
if key == "" {
|
|
key = step.Output
|
|
}
|
|
outputs[key] = strings.TrimSpace(result.Stdout)
|
|
mu.Unlock()
|
|
}
|
|
|
|
// Read DAGU_ENV for inter-step env propagation.
|
|
readDaguEnv(daguEnvPath, outputs)
|
|
|
|
if status == "failed" {
|
|
return fmt.Errorf("exit code %d", result.ExitCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *Executor) runHandlers(ctx context.Context, runID string, dag core.DagDefinition, handlers []core.DagStep, daguEnvPath string, outputs map[string]string) {
|
|
var mu sync.Mutex
|
|
for _, step := range handlers {
|
|
e.executeStep(ctx, runID, dag, step, daguEnvPath, outputs, &mu)
|
|
}
|
|
}
|
|
|
|
func (e *Executor) failRun(runID string, err error) {
|
|
fin := time.Now()
|
|
e.store.UpdateRunStatus(runID, "failed", &fin, err.Error())
|
|
}
|
|
|
|
func (e *Executor) recordStepSkipped(runID string, step core.DagStep) {
|
|
now := time.Now()
|
|
e.store.InsertStepResult(&store.DagStepResult{
|
|
ID: generateID(),
|
|
RunID: runID,
|
|
StepName: stepName(step),
|
|
Status: "skipped",
|
|
StartedAt: &now,
|
|
})
|
|
}
|
|
|
|
// --- helpers ---
|
|
|
|
func stepName(s core.DagStep) string {
|
|
if s.Name != "" {
|
|
return s.Name
|
|
}
|
|
return s.ID
|
|
}
|
|
|
|
func buildStepEnv(dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string) []string {
|
|
env := os.Environ()
|
|
|
|
// Add DAG-level env.
|
|
for k, v := range dag.Env {
|
|
env = append(env, k+"="+v)
|
|
}
|
|
|
|
// Add step-level env.
|
|
for k, v := range step.Env {
|
|
env = append(env, k+"="+v)
|
|
}
|
|
|
|
// Add DAGU_ENV path.
|
|
env = append(env, "DAGU_ENV="+daguEnvPath)
|
|
|
|
return env
|
|
}
|
|
|
|
// resolveStepRefs substitutes step-output references in command. For every
// key k in outputs, both "${k.stdout}" and "$k.stdout" are replaced with
// outputs[k].
//
// Substitution is a single left-to-right pass over command. Text inserted
// from outputs is never re-scanned, so an output value that itself contains a
// reference such as "${other.stdout}" is inserted verbatim. This keeps the
// result independent of map iteration order and prevents one step's output
// from injecting further substitutions.
func resolveStepRefs(command string, outputs map[string]string) string {
	if len(outputs) == 0 {
		return command
	}
	pairs := make([]string, 0, 4*len(outputs))
	for k, v := range outputs {
		pairs = append(pairs, "${"+k+".stdout}", v, "$"+k+".stdout", v)
	}
	return strings.NewReplacer(pairs...).Replace(command)
}
|
|
|
|
// readDaguEnv merges KEY=VALUE lines from the DAGU_ENV file at path into
// outputs.
//
// Each line is whitespace-trimmed. Blank lines, lines starting with '#', and
// lines without '=' are ignored. Only the first '=' splits a line, so values
// may contain '='. Later lines overwrite earlier ones. A missing or
// unreadable file leaves outputs untouched.
func readDaguEnv(path string, outputs map[string]string) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return
	}
	for _, entry := range strings.Split(string(raw), "\n") {
		entry = strings.TrimSpace(entry)
		if entry == "" || entry[0] == '#' {
			continue
		}
		eq := strings.Index(entry, "=")
		if eq < 0 {
			continue
		}
		outputs[entry[:eq]] = entry[eq+1:]
	}
}
|
|
|
|
// generateIDMu guards generateIDSeq.
var (
	generateIDMu  sync.Mutex
	generateIDSeq uint32
)

// generateID returns an ID of the form "<unix-nanoseconds>-<4 hex digits>".
//
// The suffix is a process-wide sequence number (mod 0x10000) rather than a
// value derived from the clock. IDs minted concurrently within the same clock
// tick therefore stay distinct — for example, step results created by
// parallel steps of one level. The prefix is the creation time in Unix
// nanoseconds.
func generateID() string {
	generateIDMu.Lock()
	generateIDSeq++
	seq := generateIDSeq
	generateIDMu.Unlock()
	return fmt.Sprintf("%d-%04x", time.Now().UnixNano(), seq&0xFFFF)
}
|
|
|
|
// --- DAG listing helpers ---
|
|
|
|
// DagInfo summarizes a DAG file for listing.
//
// Field sources:
//   - Name is the DAG's declared name, or — in ListDAGs, when the file fails
//     to parse — the file name without its .yaml/.yml extension.
//   - FilePath is the path of the YAML file inside the DAGs directory.
//   - LastRun points at the most recent run, if any.
//   - LastRuns holds several recent runs; ListDAGs fills it, GetDAG does not.
type DagInfo struct {
	Name        string         `json:"name"`
	Description string         `json:"description,omitempty"`
	Schedule    []string       `json:"schedule,omitempty"`
	Tags        []string       `json:"tags,omitempty"`
	Type        string         `json:"type,omitempty"`
	FilePath    string         `json:"file_path"`
	Valid       bool           `json:"valid"`
	LastRun     *store.DagRun  `json:"last_run,omitempty"`
	LastRuns    []store.DagRun `json:"last_runs,omitempty"` // up to 5 most recent runs (most recent first)
}
|
|
|
|
// ListDAGs scans a directory for YAML files and returns parsed DAG info.
|
|
func (e *Executor) ListDAGs() ([]DagInfo, error) {
|
|
entries, err := os.ReadDir(e.dagsDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read dags dir: %w", err)
|
|
}
|
|
|
|
var dags []DagInfo
|
|
for _, entry := range entries {
|
|
if entry.IsDir() {
|
|
continue
|
|
}
|
|
ext := filepath.Ext(entry.Name())
|
|
if ext != ".yaml" && ext != ".yml" {
|
|
continue
|
|
}
|
|
|
|
path := filepath.Join(e.dagsDir, entry.Name())
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
dag, err := core.DagParse(data)
|
|
if err != nil {
|
|
dags = append(dags, DagInfo{
|
|
Name: strings.TrimSuffix(entry.Name(), ext),
|
|
FilePath: path,
|
|
Valid: false,
|
|
})
|
|
continue
|
|
}
|
|
|
|
info := DagInfo{
|
|
Name: dag.Name,
|
|
Description: dag.Description,
|
|
Schedule: dag.Schedule,
|
|
Tags: dag.Tags,
|
|
Type: dag.Type,
|
|
FilePath: path,
|
|
Valid: true,
|
|
}
|
|
|
|
// Attach last 5 runs (most recent first).
|
|
runs, _, _ := e.store.ListRuns(dag.Name, 5, 0)
|
|
if len(runs) > 0 {
|
|
info.LastRun = &runs[0]
|
|
info.LastRuns = runs
|
|
}
|
|
|
|
dags = append(dags, info)
|
|
}
|
|
|
|
return dags, nil
|
|
}
|
|
|
|
// GetDAG returns detailed info for a specific DAG by name.
|
|
func (e *Executor) GetDAG(name string) (*DagInfo, *core.DagDefinition, *core.DagValidationResult, error) {
|
|
// Find the YAML file.
|
|
entries, err := os.ReadDir(e.dagsDir)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
for _, entry := range entries {
|
|
ext := filepath.Ext(entry.Name())
|
|
base := strings.TrimSuffix(entry.Name(), ext)
|
|
if (ext != ".yaml" && ext != ".yml") || base != name {
|
|
continue
|
|
}
|
|
|
|
path := filepath.Join(e.dagsDir, entry.Name())
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
dag, err := core.DagParse(data)
|
|
if err != nil {
|
|
return nil, nil, nil, fmt.Errorf("parse: %w", err)
|
|
}
|
|
dag.FilePath = path
|
|
|
|
validationResult := core.DagValidate(dag)
|
|
|
|
info := &DagInfo{
|
|
Name: dag.Name,
|
|
Description: dag.Description,
|
|
Schedule: dag.Schedule,
|
|
Tags: dag.Tags,
|
|
Type: dag.Type,
|
|
FilePath: path,
|
|
Valid: validationResult.Valid,
|
|
}
|
|
|
|
runs, _, _ := e.store.ListRuns(dag.Name, 1, 0)
|
|
if len(runs) > 0 {
|
|
info.LastRun = &runs[0]
|
|
}
|
|
|
|
return info, &dag, &validationResult, nil
|
|
}
|
|
|
|
return nil, nil, nil, fmt.Errorf("dag %q not found in %s", name, e.dagsDir)
|
|
}
|
|
|
|
// ValidateDAG parses and validates a DAG file, printing results.
|
|
func ValidateDAG(path string) error {
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dag, err := core.DagParse(data)
|
|
if err != nil {
|
|
return fmt.Errorf("parse error: %w", err)
|
|
}
|
|
|
|
result := core.DagValidate(dag)
|
|
|
|
log.Printf("DAG: %s", dag.Name)
|
|
log.Printf("Steps: %d", len(dag.Steps))
|
|
log.Printf("Schedule: %v", dag.Schedule)
|
|
|
|
if result.Valid {
|
|
log.Printf("Validation: PASS")
|
|
log.Printf("Topological levels: %d", len(result.Levels))
|
|
for i, level := range result.Levels {
|
|
log.Printf(" Level %d: %v", i, level)
|
|
}
|
|
} else {
|
|
log.Printf("Validation: FAIL")
|
|
for _, e := range result.Errors {
|
|
log.Printf(" ERROR: %s", e)
|
|
}
|
|
}
|
|
for _, w := range result.Warnings {
|
|
log.Printf(" WARNING: %s", w)
|
|
}
|
|
|
|
if !result.Valid {
|
|
return fmt.Errorf("validation failed")
|
|
}
|
|
return nil
|
|
}
|