diff --git a/apps/dag_engine/.gitignore b/apps/dag_engine/.gitignore new file mode 100644 index 00000000..0472ec5b --- /dev/null +++ b/apps/dag_engine/.gitignore @@ -0,0 +1,5 @@ +dag-engine +dag_engine +dag_engine.db +frontend/node_modules/ +frontend/dist/ diff --git a/apps/dag_engine/api.go b/apps/dag_engine/api.go new file mode 100644 index 00000000..e112f30e --- /dev/null +++ b/apps/dag_engine/api.go @@ -0,0 +1,47 @@ +package main + +import ( + "io/fs" + "net/http" +) + +// RegisterAPI sets up all HTTP routes on the given mux. +func RegisterAPI(mux *http.ServeMux, executor *Executor, scheduler *Scheduler, frontendFS fs.FS) { + // API routes. + mux.HandleFunc("GET /api/dags", handleListDags(executor)) + mux.HandleFunc("GET /api/dags/{name}", handleGetDag(executor)) + mux.HandleFunc("POST /api/dags/{name}/run", handleRunDag(executor)) + + mux.HandleFunc("GET /api/runs", handleListRuns(executor)) + mux.HandleFunc("GET /api/runs/{id}", handleGetRun(executor)) + + mux.HandleFunc("POST /api/scheduler/start", handleSchedulerStart(scheduler)) + mux.HandleFunc("POST /api/scheduler/stop", handleSchedulerStop(scheduler)) + mux.HandleFunc("GET /api/scheduler/status", handleSchedulerStatus(scheduler)) + + // Frontend SPA fallback. + if frontendFS != nil { + mux.Handle("/", spaHandler(frontendFS)) + } +} + +// spaHandler serves static files from the embedded FS, falling back to index.html +// for unknown paths (SPA client-side routing). +func spaHandler(fsys fs.FS) http.Handler { + fileServer := http.FileServer(http.FS(fsys)) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Try to serve the file directly. + path := r.URL.Path + if path == "/" { + path = "index.html" + } else { + path = path[1:] // strip leading / + } + + if _, err := fs.Stat(fsys, path); err != nil { + // File not found — serve index.html for SPA routing. + r.URL.Path = "/" + } + fileServer.ServeHTTP(w, r) + }) +} diff --git a/apps/dag_engine/app.md b/apps/dag_engine/app.md new file mode 100644 index 00000000..c8e50c4e --- /dev/null +++ b/apps/dag_engine/app.md @@ -0,0 +1,86 @@ +--- +name: dag_engine +lang: go +domain: infra +description: "Motor de ejecucion de DAGs con CLI y interfaz web. Reemplaza Dagu con implementacion propia compatible con el formato YAML existente. Almacena historial de ejecuciones en SQLite." +tags: [service, dag, workflow, scheduler, web, cron] +uses_functions: + - dag_parse_go_core + - dag_validate_go_core + - dag_topo_sort_go_core + - dag_resolve_env_go_core + - parse_cron_expr_go_core + - next_cron_time_go_core + - cron_ticker_go_infra + - cron_match_go_core + - process_spawn_go_infra + - process_wait_go_infra + - process_kill_go_infra +uses_types: + - dag_definition_go_core + - dag_step_go_core + - dag_validation_result_go_core + - cron_schedule_go_core + - process_handle_go_infra + - process_result_go_infra + - DagRun_go_infra + - DagStepResult_go_infra +framework: "net/http + vite + react" +entry_point: "main.go" +dir_path: "apps/dag_engine" +--- + +## Arquitectura + +CLI + servidor web en un unico binario: + +``` +dag-engine run # ejecuta un DAG desde terminal +dag-engine list [dir] # lista DAGs con schedule y estado +dag-engine status [dag_name] # historial de ejecuciones +dag-engine validate # valida sin ejecutar +dag-engine server # arranca HTTP + frontend web +``` + +### Backend (Go) + +- `net/http` con `ServeMux` (Go 1.22+ pattern routing) +- SQLite via `go-sqlite3` para historial de runs +- Executor: parse -> validate -> topo_sort -> spawn/wait por nivel -> store +- Scheduler: cron_ticker por cada DAG con schedule + +### Frontend (Vite + React + Mantine) + +- DagList: tabla de DAGs con schedule, tags, ultimo status +- DagDetail: metadata + "Run Now" + historial +- RunDetail: timeline de steps con stdout/stderr expandible + +### Storage + +SQLite `dag_engine.db`: +- `dag_runs`: id, dag_name, status, trigger, started_at, finished_at, error +- `dag_step_results`: id, run_id, step_name, status, exit_code, stdout, stderr, duration_ms + +### Build + +```bash +cd frontend && pnpm install && pnpm build +cd .. && CGO_ENABLED=1 go build -tags fts5 -o dag-engine . +``` + +### Uso + +```bash +# CLI +./dag-engine run ~/dagu/dags/example.yaml +./dag-engine list ~/dagu/dags/ + +# Servidor web +./dag-engine server --port 8090 --dags-dir ~/dagu/dags/ --scheduler +# Browser: http://localhost:8090 +``` + +## Notas + +Compatible con el formato YAML de Dagu. Lee DAGs existentes de `~/dagu/dags/` sin modificaciones. +Puerto por defecto 8090 (mismo que Dagu). diff --git a/apps/dag_engine/config.go b/apps/dag_engine/config.go new file mode 100644 index 00000000..f4936921 --- /dev/null +++ b/apps/dag_engine/config.go @@ -0,0 +1,34 @@ +package main + +import ( + "flag" + "os" + "path/filepath" +) + +// Config holds the runtime configuration for the DAG engine. +type Config struct { + Port int + DagsDir string + DBPath string + AutoScheduler bool +} + +// DefaultConfig returns sensible defaults. +func DefaultConfig() Config { + home, _ := os.UserHomeDir() + return Config{ + Port: 8090, + DagsDir: filepath.Join(home, "dagu", "dags"), + DBPath: "dag_engine.db", + } +} + +// ParseFlags populates config from CLI flags for the "server" subcommand. +func (c *Config) ParseFlags(fs *flag.FlagSet, args []string) error { + fs.IntVar(&c.Port, "port", c.Port, "HTTP server port") + fs.StringVar(&c.DagsDir, "dags-dir", c.DagsDir, "directory containing DAG YAML files") + fs.StringVar(&c.DBPath, "db", c.DBPath, "path to SQLite database") + fs.BoolVar(&c.AutoScheduler, "scheduler", c.AutoScheduler, "auto-start cron scheduler") + return fs.Parse(args) +} diff --git a/apps/dag_engine/executor.go b/apps/dag_engine/executor.go new file mode 100644 index 00000000..44fd9ee9 --- /dev/null +++ b/apps/dag_engine/executor.go @@ -0,0 +1,482 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "fn-registry/functions/core" + "fn-registry/functions/infra" + + "dag-engine/store" +) + +// Executor orchestrates DAG parsing, validation, and execution. +type Executor struct { + store *store.DB + dagsDir string +} + +// NewExecutor creates a new executor. +func NewExecutor(s *store.DB, dagsDir string) *Executor { + return &Executor{store: s, dagsDir: dagsDir} +} + +// ExecuteDAG runs a DAG from a YAML file path and returns the run ID. +// It runs asynchronously: steps execute in topological order with parallel levels. +func (e *Executor) ExecuteDAG(ctx context.Context, dagPath string, trigger string) (string, error) { + data, err := os.ReadFile(dagPath) + if err != nil { + return "", fmt.Errorf("read dag: %w", err) + } + + dag, err := core.DagParse(data) + if err != nil { + return "", fmt.Errorf("parse dag: %w", err) + } + dag.FilePath = dagPath + + // Resolve env variables. + dag = core.DagResolveEnv(dag, os.Environ()) + + // Validate. + result := core.DagValidate(dag) + if !result.Valid { + return "", fmt.Errorf("validate dag: %s", strings.Join(result.Errors, "; ")) + } + + // Create run record. + runID := generateID() + now := time.Now() + run := &store.DagRun{ + ID: runID, + DagName: dag.Name, + DagPath: dagPath, + Status: "running", + Trigger: trigger, + StartedAt: now, + } + if err := e.store.CreateRun(run); err != nil { + return "", fmt.Errorf("create run: %w", err) + } + + // Topological sort. + levels, err := core.DagTopoSort(dag.Steps) + if err != nil { + e.failRun(runID, err) + return runID, err + } + + // Setup DAGU_ENV temp file for inter-step communication. + daguEnvFile, err := os.CreateTemp("", "dagu_env_*") + if err != nil { + e.failRun(runID, err) + return runID, err + } + daguEnvPath := daguEnvFile.Name() + daguEnvFile.Close() + defer os.Remove(daguEnvPath) + + // Track step outputs for ${step_id.stdout} references. + stepOutputs := make(map[string]string) + + // Execute levels. + runFailed := false + var runErr error + + for _, level := range levels { + if runFailed { + // Skip remaining levels, mark steps as skipped. + for _, step := range level { + e.recordStepSkipped(runID, step) + } + continue + } + + var wg sync.WaitGroup + var mu sync.Mutex + levelFailed := false + + for _, step := range level { + step := step + wg.Add(1) + go func() { + defer wg.Done() + + mu.Lock() + if levelFailed { + mu.Unlock() + e.recordStepSkipped(runID, step) + return + } + mu.Unlock() + + err := e.executeStep(ctx, runID, dag, step, daguEnvPath, stepOutputs, &mu) + if err != nil && !step.ContinueOn.Failure { + mu.Lock() + levelFailed = true + runFailed = true + runErr = fmt.Errorf("step %q failed: %w", stepName(step), err) + mu.Unlock() + } + }() + } + wg.Wait() + } + + // Run handlers. + if runFailed { + e.runHandlers(ctx, runID, dag, dag.HandlerOn.Failure, daguEnvPath, stepOutputs) + } else { + e.runHandlers(ctx, runID, dag, dag.HandlerOn.Success, daguEnvPath, stepOutputs) + } + e.runHandlers(ctx, runID, dag, dag.HandlerOn.Exit, daguEnvPath, stepOutputs) + + // Finalize run. + fin := time.Now() + status := "success" + errMsg := "" + if runFailed { + status = "failed" + if runErr != nil { + errMsg = runErr.Error() + } + } + e.store.UpdateRunStatus(runID, status, &fin, errMsg) + + return runID, runErr +} + +// executeStep runs a single step, recording results in the store. +func (e *Executor) executeStep(ctx context.Context, runID string, dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string, mu *sync.Mutex) error { + stepID := generateID() + now := time.Now() + e.store.InsertStepResult(&store.DagStepResult{ + ID: stepID, + RunID: runID, + StepName: stepName(step), + Status: "running", + StartedAt: &now, + }) + + // Build environment. + env := buildStepEnv(dag, step, daguEnvPath, outputs) + + // Determine command. + command := step.Command + if command == "" && step.Script != "" { + command = step.Script + } + if command == "" { + e.store.UpdateStepResult(stepID, "skipped", 0, "", "", nil, 0, "no command or script") + return nil + } + + // Resolve step-level ${VAR} references and ${step_id.stdout} patterns. + mu.Lock() + command = resolveStepRefs(command, outputs) + mu.Unlock() + + // Determine working directory. + dir := step.Dir + if dir == "" { + dir = dag.WorkingDir + } + + shell := step.Shell + if shell == "" { + shell = dag.Shell + } + + // Spawn process. + handle, err := infra.ProcessSpawn(command, dir, env, shell) + if err != nil { + fin := time.Now() + e.store.UpdateStepResult(stepID, "failed", -1, "", "", &fin, time.Since(now).Milliseconds(), err.Error()) + return err + } + + // Wait for process. + result, err := infra.ProcessWait(handle, step.TimeoutSec) + fin := time.Now() + duration := time.Since(now).Milliseconds() + + if err != nil && result.ExitCode == 0 { + result.ExitCode = -1 + } + + status := "success" + errMsg := "" + if result.ExitCode != 0 || err != nil { + status = "failed" + if err != nil { + errMsg = err.Error() + } + } + + e.store.UpdateStepResult(stepID, status, result.ExitCode, result.Stdout, result.Stderr, &fin, duration, errMsg) + + // Store output for ${step_id.stdout} references. + if step.ID != "" || step.Output != "" { + mu.Lock() + key := step.ID + if key == "" { + key = step.Output + } + outputs[key] = strings.TrimSpace(result.Stdout) + mu.Unlock() + } + + // Read DAGU_ENV for inter-step env propagation. + readDaguEnv(daguEnvPath, outputs) + + if status == "failed" { + return fmt.Errorf("exit code %d", result.ExitCode) + } + return nil +} + +func (e *Executor) runHandlers(ctx context.Context, runID string, dag core.DagDefinition, handlers []core.DagStep, daguEnvPath string, outputs map[string]string) { + var mu sync.Mutex + for _, step := range handlers { + e.executeStep(ctx, runID, dag, step, daguEnvPath, outputs, &mu) + } +} + +func (e *Executor) failRun(runID string, err error) { + fin := time.Now() + e.store.UpdateRunStatus(runID, "failed", &fin, err.Error()) +} + +func (e *Executor) recordStepSkipped(runID string, step core.DagStep) { + now := time.Now() + e.store.InsertStepResult(&store.DagStepResult{ + ID: generateID(), + RunID: runID, + StepName: stepName(step), + Status: "skipped", + StartedAt: &now, + }) +} + +// --- helpers --- + +func stepName(s core.DagStep) string { + if s.Name != "" { + return s.Name + } + return s.ID +} + +func buildStepEnv(dag core.DagDefinition, step core.DagStep, daguEnvPath string, outputs map[string]string) []string { + env := os.Environ() + + // Add DAG-level env. + for k, v := range dag.Env { + env = append(env, k+"="+v) + } + + // Add step-level env. + for k, v := range step.Env { + env = append(env, k+"="+v) + } + + // Add DAGU_ENV path. + env = append(env, "DAGU_ENV="+daguEnvPath) + + return env +} + +func resolveStepRefs(command string, outputs map[string]string) string { + for k, v := range outputs { + command = strings.ReplaceAll(command, "${"+k+".stdout}", v) + command = strings.ReplaceAll(command, "$"+k+".stdout", v) + } + return command +} + +func readDaguEnv(path string, outputs map[string]string) { + data, err := os.ReadFile(path) + if err != nil || len(data) == 0 { + return + } + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + parts := strings.SplitN(line, "=", 2) + if len(parts) == 2 { + outputs[parts[0]] = parts[1] + } + } +} + +// generateID creates a simple time-based unique ID. +func generateID() string { + return fmt.Sprintf("%d-%04x", time.Now().UnixNano(), time.Now().Nanosecond()%0xFFFF) +} + +// --- DAG listing helpers --- + +// DagInfo summarizes a DAG file for listing. +type DagInfo struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Schedule []string `json:"schedule,omitempty"` + Tags []string `json:"tags,omitempty"` + Type string `json:"type,omitempty"` + FilePath string `json:"file_path"` + Valid bool `json:"valid"` + LastRun *store.DagRun `json:"last_run,omitempty"` +} + +// ListDAGs scans a directory for YAML files and returns parsed DAG info. +func (e *Executor) ListDAGs() ([]DagInfo, error) { + entries, err := os.ReadDir(e.dagsDir) + if err != nil { + return nil, fmt.Errorf("read dags dir: %w", err) + } + + var dags []DagInfo + for _, entry := range entries { + if entry.IsDir() { + continue + } + ext := filepath.Ext(entry.Name()) + if ext != ".yaml" && ext != ".yml" { + continue + } + + path := filepath.Join(e.dagsDir, entry.Name()) + data, err := os.ReadFile(path) + if err != nil { + continue + } + + dag, err := core.DagParse(data) + if err != nil { + dags = append(dags, DagInfo{ + Name: strings.TrimSuffix(entry.Name(), ext), + FilePath: path, + Valid: false, + }) + continue + } + + info := DagInfo{ + Name: dag.Name, + Description: dag.Description, + Schedule: dag.Schedule, + Tags: dag.Tags, + Type: dag.Type, + FilePath: path, + Valid: true, + } + + // Attach last run info. + runs, _, _ := e.store.ListRuns(dag.Name, 1, 0) + if len(runs) > 0 { + info.LastRun = &runs[0] + } + + dags = append(dags, info) + } + + return dags, nil +} + +// GetDAG returns detailed info for a specific DAG by name. +func (e *Executor) GetDAG(name string) (*DagInfo, *core.DagDefinition, *core.DagValidationResult, error) { + // Find the YAML file. + entries, err := os.ReadDir(e.dagsDir) + if err != nil { + return nil, nil, nil, err + } + + for _, entry := range entries { + ext := filepath.Ext(entry.Name()) + base := strings.TrimSuffix(entry.Name(), ext) + if (ext != ".yaml" && ext != ".yml") || base != name { + continue + } + + path := filepath.Join(e.dagsDir, entry.Name()) + data, err := os.ReadFile(path) + if err != nil { + return nil, nil, nil, err + } + + dag, err := core.DagParse(data) + if err != nil { + return nil, nil, nil, fmt.Errorf("parse: %w", err) + } + dag.FilePath = path + + validationResult := core.DagValidate(dag) + + info := &DagInfo{ + Name: dag.Name, + Description: dag.Description, + Schedule: dag.Schedule, + Tags: dag.Tags, + Type: dag.Type, + FilePath: path, + Valid: validationResult.Valid, + } + + runs, _, _ := e.store.ListRuns(dag.Name, 1, 0) + if len(runs) > 0 { + info.LastRun = &runs[0] + } + + return info, &dag, &validationResult, nil + } + + return nil, nil, nil, fmt.Errorf("dag %q not found in %s", name, e.dagsDir) +} + +// ValidateDAG parses and validates a DAG file, printing results. +func ValidateDAG(path string) error { + data, err := os.ReadFile(path) + if err != nil { + return err + } + + dag, err := core.DagParse(data) + if err != nil { + return fmt.Errorf("parse error: %w", err) + } + + result := core.DagValidate(dag) + + log.Printf("DAG: %s", dag.Name) + log.Printf("Steps: %d", len(dag.Steps)) + log.Printf("Schedule: %v", dag.Schedule) + + if result.Valid { + log.Printf("Validation: PASS") + log.Printf("Topological levels: %d", len(result.Levels)) + for i, level := range result.Levels { + log.Printf(" Level %d: %v", i, level) + } + } else { + log.Printf("Validation: FAIL") + for _, e := range result.Errors { + log.Printf(" ERROR: %s", e) + } + } + for _, w := range result.Warnings { + log.Printf(" WARNING: %s", w) + } + + if !result.Valid { + return fmt.Errorf("validation failed") + } + return nil +} diff --git a/apps/dag_engine/frontend/index.html b/apps/dag_engine/frontend/index.html new file mode 100644 index 00000000..e551b951 --- /dev/null +++ b/apps/dag_engine/frontend/index.html @@ -0,0 +1,12 @@ + + + + + + DAG Engine + + +
+ + + diff --git a/apps/dag_engine/frontend/package.json b/apps/dag_engine/frontend/package.json new file mode 100644 index 00000000..560e2c31 --- /dev/null +++ b/apps/dag_engine/frontend/package.json @@ -0,0 +1,28 @@ +{ + "name": "dag-engine-frontend", + "private": true, + "version": "1.0.0", + "type": "module", + "scripts": { + "dev": "vite", + "build": "tsc -b && vite build", + "preview": "vite preview" + }, + "dependencies": { + "@mantine/core": "^9.0.2", + "@mantine/hooks": "^9.0.2", + "@tabler/icons-react": "^3.31.0", + "react": "^19.1.0", + "react-dom": "^19.1.0", + "react-router-dom": "^7.6.1" + }, + "devDependencies": { + "@types/react": "^19.1.6", + "@types/react-dom": "^19.1.6", + "@vitejs/plugin-react": "^4.5.2", + "postcss": "^8.5.4", + "postcss-preset-mantine": "^1.17.0", + "typescript": "~5.8.3", + "vite": "^6.3.5" + } +} diff --git a/apps/dag_engine/frontend/postcss.config.cjs b/apps/dag_engine/frontend/postcss.config.cjs new file mode 100644 index 00000000..bbce5388 --- /dev/null +++ b/apps/dag_engine/frontend/postcss.config.cjs @@ -0,0 +1,5 @@ +module.exports = { + plugins: { + "postcss-preset-mantine": {}, + }, +}; diff --git a/apps/dag_engine/frontend/src/App.tsx b/apps/dag_engine/frontend/src/App.tsx new file mode 100644 index 00000000..024797b7 --- /dev/null +++ b/apps/dag_engine/frontend/src/App.tsx @@ -0,0 +1,32 @@ +import { Routes, Route } from "react-router-dom"; +import { AppShell, Container, Title, Group, Text } from "@mantine/core"; +import { IconTopologyRing } from "@tabler/icons-react"; +import { DagList } from "./pages/DagList"; +import { DagDetail } from "./pages/DagDetail"; +import { RunDetail } from "./pages/RunDetail"; + +export function App() { + return ( + + + + + DAG Engine + + fn_registry workflow executor + + + + + + + + } /> + } /> + } /> + + + + + ); +} diff --git a/apps/dag_engine/frontend/src/api.ts b/apps/dag_engine/frontend/src/api.ts new file mode 100644 index 00000000..8035a5a8 --- /dev/null +++ b/apps/dag_engine/frontend/src/api.ts @@ -0,0 +1,63 @@ +import type { + DagSummary, + DagDetail, + DagRun, + RunDetail, + SchedulerStatus, +} from "./types"; + +const BASE = "/api"; + +async function fetchJSON(path: string, init?: RequestInit): Promise { + const res = await fetch(`${BASE}${path}`, init); + if (!res.ok) { + const err = await res.json().catch(() => ({ error: res.statusText })); + throw new Error(err.error || res.statusText); + } + return res.json(); +} + +export function listDags(): Promise { + return fetchJSON("/dags"); +} + +export function getDag(name: string): Promise { + return fetchJSON(`/dags/${encodeURIComponent(name)}`); +} + +export function triggerDag( + name: string +): Promise<{ status: string; dag: string; message: string }> { + return fetchJSON(`/dags/${encodeURIComponent(name)}/run`, { + method: "POST", + }); +} + +export function listRuns(params?: { + dag?: string; + limit?: number; + offset?: number; +}): Promise<{ runs: DagRun[]; total: number }> { + const search = new URLSearchParams(); + if (params?.dag) search.set("dag", params.dag); + if (params?.limit) search.set("limit", String(params.limit)); + if (params?.offset) search.set("offset", String(params.offset)); + const qs = search.toString(); + return fetchJSON(`/runs${qs ? "?" + qs : ""}`); +} + +export function getRun(id: string): Promise { + return fetchJSON(`/runs/${encodeURIComponent(id)}`); +} + +export function startScheduler(): Promise { + return fetchJSON("/scheduler/start", { method: "POST" }); +} + +export function stopScheduler(): Promise { + return fetchJSON("/scheduler/stop", { method: "POST" }); +} + +export function getSchedulerStatus(): Promise { + return fetchJSON("/scheduler/status"); +} diff --git a/apps/dag_engine/frontend/src/components/StatusBadge.tsx b/apps/dag_engine/frontend/src/components/StatusBadge.tsx new file mode 100644 index 00000000..f8114df5 --- /dev/null +++ b/apps/dag_engine/frontend/src/components/StatusBadge.tsx @@ -0,0 +1,18 @@ +import { Badge } from "@mantine/core"; + +const colorMap: Record = { + success: "green", + failed: "red", + running: "blue", + pending: "gray", + cancelled: "yellow", + skipped: "dimmed", +}; + +export function StatusBadge({ status }: { status: string }) { + return ( + + {status} + + ); +} diff --git a/apps/dag_engine/frontend/src/components/StepTimeline.tsx b/apps/dag_engine/frontend/src/components/StepTimeline.tsx new file mode 100644 index 00000000..10558df1 --- /dev/null +++ b/apps/dag_engine/frontend/src/components/StepTimeline.tsx @@ -0,0 +1,85 @@ +import { Timeline, Text, Code, Collapse, Box, Group } from "@mantine/core"; +import { + IconCircleCheck, + IconCircleX, + IconLoader, + IconCircleMinus, + IconClock, +} from "@tabler/icons-react"; +import { useDisclosure } from "@mantine/hooks"; +import type { DagStepResult } from "../types"; + +const iconMap: Record = { + success: , + failed: , + running: , + skipped: , + pending: , +}; + +function StepItem({ step }: { step: DagStepResult }) { + const [opened, { toggle }] = useDisclosure(step.Status === "failed"); + const hasOutput = step.Stdout || step.Stderr; + + return ( + + + {step.StepName} + + + {step.DurationMs}ms + + {step.ExitCode !== 0 && step.ExitCode !== -1 && ( + + exit {step.ExitCode} + + )} + + } + > + {hasOutput && ( + + + {step.Stdout && ( + + {step.Stdout} + + )} + {step.Stderr && ( + + {step.Stderr} + + )} + + + )} + + ); +} + +export function StepTimeline({ steps }: { steps: DagStepResult[] }) { + const activeIndex = steps.findIndex((s) => s.Status === "running"); + + return ( + = 0 ? activeIndex : steps.length - 1} + bulletSize={24} + > + {steps.map((step) => ( + + ))} + + ); +} diff --git a/apps/dag_engine/frontend/src/main.tsx b/apps/dag_engine/frontend/src/main.tsx new file mode 100644 index 00000000..530eb513 --- /dev/null +++ b/apps/dag_engine/frontend/src/main.tsx @@ -0,0 +1,18 @@ +import "@mantine/core/styles.css"; +import { MantineProvider, createTheme } from "@mantine/core"; +import { createRoot } from "react-dom/client"; +import { BrowserRouter } from "react-router-dom"; +import { App } from "./App"; + +const theme = createTheme({ + primaryColor: "blue", + fontFamily: "system-ui, -apple-system, sans-serif", +}); + +createRoot(document.getElementById("root")!).render( + + + + + +); diff --git a/apps/dag_engine/frontend/src/pages/DagDetail.tsx b/apps/dag_engine/frontend/src/pages/DagDetail.tsx new file mode 100644 index 00000000..d4e9d9f6 --- /dev/null +++ b/apps/dag_engine/frontend/src/pages/DagDetail.tsx @@ -0,0 +1,204 @@ +import { useEffect, useState } from "react"; +import { useParams, useNavigate } from "react-router-dom"; +import { + Title, + Text, + Group, + Button, + Badge, + Stack, + Paper, + Table, + Alert, + Loader, + Code, +} from "@mantine/core"; +import { IconPlayerPlay, IconArrowLeft } from "@tabler/icons-react"; +import { getDag, triggerDag } from "../api"; +import { StatusBadge } from "../components/StatusBadge"; +import type { DagDetail as DagDetailType } from "../types"; + +export function DagDetail() { + const { name } = useParams<{ name: string }>(); + const navigate = useNavigate(); + const [data, setData] = useState(null); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [triggering, setTriggering] = useState(false); + + const load = async () => { + if (!name) return; + setLoading(true); + try { + setData(await getDag(name)); + setError(null); + } catch (e) { + setError((e as Error).message); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + load(); + }, [name]); + + const handleRun = async () => { + if (!name) return; + setTriggering(true); + try { + await triggerDag(name); + setTimeout(load, 1000); + } catch (e) { + setError((e as Error).message); + } finally { + setTriggering(false); + } + }; + + if (loading) return ; + if (error) return {error}; + if (!data) return Not found; + + const { dag, validation, runs } = data; + + return ( + + + + + + +
+ {dag.Name} + {dag.Description && ( + + {dag.Description} + + )} +
+ +
+ + + {dag.Schedule?.map((s: string) => ( + + {s} + + ))} + {dag.Type || "chain"} + {dag.Tags?.map((t: string) => ( + + {t} + + ))} + + + {!validation.Valid && ( + + {validation.Errors.map((e: string, i: number) => ( + + {e} + + ))} + + )} + + + + Steps ({dag.Steps?.length || 0}) + + {validation.Levels?.map((level: string[], i: number) => ( + + + Level {i}: + + {level.map((name: string) => { + const step = dag.Steps?.find( + (s) => s.Name === name || s.ID === name + ); + return ( + + {name} + {step?.Depends?.length + ? ` (after ${step.Depends.join(",")})` + : ""} + + ); + })} + + ))} + + {dag.Env && Object.keys(dag.Env).length > 0 && ( + <> + + Environment + + + {Object.entries(dag.Env) + .map(([k, v]) => `${k}=${v}`) + .join("\n")} + + + )} + + + + + Run History + + {runs?.length ? ( + + + + Status + Trigger + Started + Duration + + + + {runs.map((r) => ( + navigate(`/runs/${r.ID}`)} + > + + + + {r.Trigger} + + {new Date(r.StartedAt).toLocaleString()} + + + {r.FinishedAt + ? `${Math.round((new Date(r.FinishedAt).getTime() - new Date(r.StartedAt).getTime()) / 1000)}s` + : "running..."} + + + ))} + +
+ ) : ( + + No runs yet + + )} +
+
+ ); +} diff --git a/apps/dag_engine/frontend/src/pages/DagList.tsx b/apps/dag_engine/frontend/src/pages/DagList.tsx new file mode 100644 index 00000000..7993bd08 --- /dev/null +++ b/apps/dag_engine/frontend/src/pages/DagList.tsx @@ -0,0 +1,164 @@ +import { useEffect, useState } from "react"; +import { useNavigate } from "react-router-dom"; +import { + Table, + Title, + Group, + Button, + Badge, + Text, + Loader, + Stack, + Alert, +} from "@mantine/core"; +import { + IconPlayerPlay, + IconPlayerStop, + IconRefresh, +} from "@tabler/icons-react"; +import { listDags, getSchedulerStatus, startScheduler, stopScheduler } from "../api"; +import { StatusBadge } from "../components/StatusBadge"; +import type { DagSummary, SchedulerStatus } from "../types"; + +export function DagList() { + const [dags, setDags] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [scheduler, setScheduler] = useState(null); + const navigate = useNavigate(); + + const load = async () => { + setLoading(true); + setError(null); + try { + const [d, s] = await Promise.all([listDags(), getSchedulerStatus()]); + setDags(d || []); + setScheduler(s); + } catch (e) { + setError((e as Error).message); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + load(); + const interval = setInterval(load, 10000); + return () => clearInterval(interval); + }, []); + + const toggleScheduler = async () => { + if (scheduler?.running) { + await stopScheduler(); + } else { + await startScheduler(); + } + const s = await getSchedulerStatus(); + setScheduler(s); + }; + + return ( + + + DAGs + + + + + + + {error && {error}} + + {loading && !dags.length ? ( + + ) : ( + + + + Name + Schedule + Type + Tags + Last Status + Last Run + + + + {dags.map((d) => ( + navigate(`/dags/${d.name}`)} + > + + {d.name} + {d.description && ( + + {d.description} + + )} + + + + {d.schedule?.join(", ") || "-"} + + + + + {d.type || "chain"} + + + + + {d.tags?.map((t) => ( + + {t} + + ))} + + + + {d.last_run ? ( + + ) : ( + + - + + )} + + + + {d.last_run + ? new Date(d.last_run.StartedAt).toLocaleString() + : "-"} + + + + ))} + +
+ )} +
+ ); +} diff --git a/apps/dag_engine/frontend/src/pages/RunDetail.tsx b/apps/dag_engine/frontend/src/pages/RunDetail.tsx new file mode 100644 index 00000000..99e2d80d --- /dev/null +++ b/apps/dag_engine/frontend/src/pages/RunDetail.tsx @@ -0,0 +1,105 @@ +import { useEffect, useState } from "react"; +import { useParams, useNavigate } from "react-router-dom"; +import { + Title, + Text, + Group, + Button, + Stack, + Paper, + Alert, + Loader, +} from "@mantine/core"; +import { IconArrowLeft } from "@tabler/icons-react"; +import { getRun } from "../api"; +import { StatusBadge } from "../components/StatusBadge"; +import { StepTimeline } from "../components/StepTimeline"; +import type { RunDetail as RunDetailType } from "../types"; + +export function RunDetail() { + const { id } = useParams<{ id: string }>(); + const navigate = useNavigate(); + const [data, setData] = useState(null); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + + const load = async () => { + if (!id) return; + try { + setData(await getRun(id)); + setError(null); + } catch (e) { + setError((e as Error).message); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + load(); + // Auto-refresh while running. + const interval = setInterval(() => { + if (data?.run.Status === "running") { + load(); + } + }, 2000); + return () => clearInterval(interval); + }, [id, data?.run.Status]); + + if (loading) return ; + if (error) return {error}; + if (!data) return Not found; + + const { run, steps } = data; + const duration = run.FinishedAt + ? `${Math.round((new Date(run.FinishedAt).getTime() - new Date(run.StartedAt).getTime()) / 1000)}s` + : "running..."; + + return ( + + + + + + +
+ Run {run.ID.substring(0, 16)}... + + {run.DagName} · {run.Trigger} ·{" "} + {new Date(run.StartedAt).toLocaleString()} + +
+ + + {duration} + +
+ + {run.Error && ( + + {run.Error} + + )} + + + + Steps ({steps?.length || 0}) + + {steps?.length ? ( + + ) : ( + + No steps recorded + + )} + +
+ ); +} diff --git a/apps/dag_engine/frontend/src/types.ts b/apps/dag_engine/frontend/src/types.ts new file mode 100644 index 00000000..4c510709 --- /dev/null +++ b/apps/dag_engine/frontend/src/types.ts @@ -0,0 +1,66 @@ +export interface DagSummary { + name: string; + description?: string; + schedule?: string[]; + tags?: string[]; + type?: string; + file_path: string; + valid: boolean; + last_run?: DagRun; +} + +export interface DagRun { + ID: string; + DagName: string; + DagPath: string; + Status: string; + Trigger: string; + StartedAt: string; + FinishedAt?: string; + Error: string; +} + +export interface DagStepResult { + ID: string; + RunID: string; + StepName: string; + Status: string; + ExitCode: number; + Stdout: string; + Stderr: string; + StartedAt?: string; + FinishedAt?: string; + DurationMs: number; + Error: string; +} + +export interface DagDetail { + info: DagSummary; + dag: { + Name: string; + Description: string; + Type: string; + Schedule: string[]; + Steps: { Name: string; ID: string; Command: string; Script: string; Depends: string[] }[]; + Env: Record; + Tags: string[]; + HandlerOn: { Failure: unknown[]; Success: unknown[] }; + }; + validation: { + Valid: boolean; + Errors: string[]; + Warnings: string[]; + Levels: string[][]; + }; + runs: DagRun[]; +} + +export interface RunDetail { + run: DagRun; + steps: DagStepResult[]; +} + +export interface SchedulerStatus { + running: boolean; + dags: { name: string; path: string; schedule: string; next_run: string }[]; +} diff --git a/apps/dag_engine/frontend/tsconfig.json b/apps/dag_engine/frontend/tsconfig.json new file mode 100644 index 00000000..39a405b9 --- /dev/null +++ b/apps/dag_engine/frontend/tsconfig.json @@ -0,0 +1,21 @@ +{ + "compilerOptions": { + "target": "ES2020", + "useDefineForClassFields": true, + "lib": ["ES2020", "DOM", "DOM.Iterable"], + "module": "ESNext", + "skipLibCheck": true, + "moduleResolution": "bundler", + "allowImportingTsExtensions": true, + "isolatedModules": true, + "moduleDetection": "force", + "noEmit": true, + "jsx": "react-jsx", + "strict": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "noFallthroughCasesInSwitch": true, + "noUncheckedSideEffectImports": true + }, + "include": ["src"] +} diff --git a/apps/dag_engine/frontend/vite.config.ts b/apps/dag_engine/frontend/vite.config.ts new file mode 100644 index 00000000..bd5bbb7c --- /dev/null +++ b/apps/dag_engine/frontend/vite.config.ts @@ -0,0 +1,15 @@ +import { defineConfig } from "vite"; +import react from "@vitejs/plugin-react"; + +export default defineConfig({ + plugins: [react()], + server: { + port: 5175, + proxy: { + "/api": "http://localhost:8090", + }, + }, + build: { + outDir: "dist", + }, +}); diff --git a/apps/dag_engine/go.mod b/apps/dag_engine/go.mod new file mode 100644 index 00000000..5eeabd69 --- /dev/null +++ b/apps/dag_engine/go.mod @@ -0,0 +1,48 @@ +module dag-engine + +go 1.25.0 + +require ( + fn-registry v0.0.0-00010101000000-000000000000 + github.com/mattn/go-sqlite3 v1.14.37 +) + +require ( + github.com/ClickHouse/ch-go v0.71.0 // indirect + github.com/ClickHouse/clickhouse-go/v2 v2.44.0 // indirect + github.com/andybalholm/brotli v1.2.0 // indirect + github.com/apache/arrow-go/v18 v18.1.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.7.1 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/google/flatbuffers v25.1.24+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.9.1 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/klauspost/compress v1.18.3 // indirect + github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/marcboeker/go-duckdb v1.8.5 // indirect + github.com/paulmach/orb v0.12.0 // indirect + github.com/pierrec/lz4/v4 v4.1.25 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/segmentio/asm v1.2.1 // indirect + github.com/shopspring/decimal v1.4.0 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + go.opentelemetry.io/otel v1.41.0 // indirect + go.opentelemetry.io/otel/trace v1.41.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect + golang.org/x/mod v0.27.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/text v0.29.0 // indirect + golang.org/x/tools v0.36.0 // indirect + golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace fn-registry => /home/lucas/fn_registry diff --git a/apps/dag_engine/go.sum b/apps/dag_engine/go.sum new file mode 100644 index 00000000..d8b66118 --- /dev/null +++ b/apps/dag_engine/go.sum @@ -0,0 +1,168 @@ +github.com/ClickHouse/ch-go v0.71.0 h1:bUdZ/EZj/LcVHsMqaRUP2holqygrPWQKeMjc6nZoyRM= +github.com/ClickHouse/ch-go v0.71.0/go.mod h1:NwbNc+7jaqfY58dmdDUbG4Jl22vThgx1cYjBw0vtgXw= +github.com/ClickHouse/clickhouse-go/v2 v2.44.0 h1:9pxs5pRwIvhni5BDRPn/n5A8DeUod5TnBaeulFBX8EQ= +github.com/ClickHouse/clickhouse-go/v2 v2.44.0/go.mod h1:giJfUVlMkcfUEPVfRpt51zZaGEx9i17gCos8gBl392c= +github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= +github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/apache/arrow-go/v18 v18.1.0 h1:agLwJUiVuwXZdwPYVrlITfx7bndULJ/dggbnLFgDp/Y= +github.com/apache/arrow-go/v18 v18.1.0/go.mod h1:tigU/sIgKNXaesf5d7Y95jBBKS5KsxTqYBKXFsvKzo0= +github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= +github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= +github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v25.1.24+incompatible h1:4wPqL3K7GzBd1CwyhSd3usxLKOaJN/AC6puCca6Jm7o= +github.com/google/flatbuffers v25.1.24+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= +github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= +github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= +github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/marcboeker/go-duckdb v1.8.5 h1:tkYp+TANippy0DaIOP5OEfBEwbUINqiFqgwMQ44jME0= +github.com/marcboeker/go-duckdb v1.8.5/go.mod h1:6mK7+WQE4P4u5AFLvVBmhFxY5fvhymFptghgJX6B+/8= +github.com/mattn/go-sqlite3 v1.14.37 h1:3DOZp4cXis1cUIpCfXLtmlGolNLp2VEqhiB/PARNBIg= +github.com/mattn/go-sqlite3 v1.14.37/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/paulmach/orb v0.12.0 h1:z+zOwjmG3MyEEqzv92UN49Lg1JFYx0L9GpGKNVDKk1s= +github.com/paulmach/orb v0.12.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= +github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= +go.opentelemetry.io/otel v1.41.0 h1:YlEwVsGAlCvczDILpUXpIpPSL/VPugt7zHThEMLce1c= +go.opentelemetry.io/otel v1.41.0/go.mod h1:Yt4UwgEKeT05QbLwbyHXEwhnjxNO6D8L5PQP51/46dE= +go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0= +go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ= +golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= +golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/apps/dag_engine/handlers_dags.go b/apps/dag_engine/handlers_dags.go new file mode 100644 index 00000000..8998d6de --- /dev/null +++ b/apps/dag_engine/handlers_dags.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "encoding/json" + "net/http" +) + +func handleListDags(executor *Executor) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + dags, err := executor.ListDAGs() + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, dags) + } +} + +func handleGetDag(executor *Executor) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + name := r.PathValue("name") + info, dag, validation, err := executor.GetDAG(name) + if err != nil { + writeError(w, http.StatusNotFound, err.Error()) + return + } + + // Get recent runs. + runs, _, _ := executor.store.ListRuns(dag.Name, 10, 0) + + resp := map[string]interface{}{ + "info": info, + "dag": dag, + "validation": validation, + "runs": runs, + } + writeJSON(w, http.StatusOK, resp) + } +} + +func handleRunDag(executor *Executor) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + name := r.PathValue("name") + info, _, _, err := executor.GetDAG(name) + if err != nil { + writeError(w, http.StatusNotFound, err.Error()) + return + } + + // Execute asynchronously. + go func() { + ctx := context.Background() + executor.ExecuteDAG(ctx, info.FilePath, "api") + }() + + // Return run acknowledgment. + writeJSON(w, http.StatusAccepted, map[string]string{ + "status": "accepted", + "dag": name, + "message": "DAG execution started", + }) + } +} + +// --- JSON helpers --- + +func writeJSON(w http.ResponseWriter, status int, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(data) +} + +func writeError(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"error": msg}) +} diff --git a/apps/dag_engine/handlers_runs.go b/apps/dag_engine/handlers_runs.go new file mode 100644 index 00000000..cdd0811c --- /dev/null +++ b/apps/dag_engine/handlers_runs.go @@ -0,0 +1,59 @@ +package main + +import ( + "net/http" + "strconv" +) + +func handleListRuns(executor *Executor) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + dagName := r.URL.Query().Get("dag") + limit, _ := strconv.Atoi(r.URL.Query().Get("limit")) + offset, _ := strconv.Atoi(r.URL.Query().Get("offset")) + if limit <= 0 || limit > 100 { + limit = 20 + } + if offset < 0 { + offset = 0 + } + + runs, total, err := executor.store.ListRuns(dagName, limit, offset) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + writeJSON(w, http.StatusOK, map[string]interface{}{ + "runs": runs, + "total": total, + "limit": limit, + "offset": offset, + }) + } +} + +func handleGetRun(executor *Executor) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + run, err := executor.store.GetRun(id) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + if run == nil { + writeError(w, http.StatusNotFound, "run not found") + return + } + + steps, err := executor.store.ListStepResults(id) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + writeJSON(w, http.StatusOK, map[string]interface{}{ + "run": run, + "steps": steps, + }) + } +} diff --git a/apps/dag_engine/handlers_scheduler.go b/apps/dag_engine/handlers_scheduler.go new file mode 100644 index 00000000..c7e33abf --- /dev/null +++ b/apps/dag_engine/handlers_scheduler.go @@ -0,0 +1,27 @@ +package main + +import "net/http" + +func handleSchedulerStart(scheduler *Scheduler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if err := scheduler.Start(); err != nil { + writeError(w, http.StatusConflict, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "started"}) + } +} + +func handleSchedulerStop(scheduler *Scheduler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + scheduler.Stop() + writeJSON(w, http.StatusOK, map[string]string{"status": "stopped"}) + } +} + +func handleSchedulerStatus(scheduler *Scheduler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + status := scheduler.Status() + writeJSON(w, http.StatusOK, status) + } +} diff --git a/apps/dag_engine/main.go b/apps/dag_engine/main.go new file mode 100644 index 00000000..a3498001 --- /dev/null +++ b/apps/dag_engine/main.go @@ -0,0 +1,336 @@ +package main + +import ( + "context" + "embed" + "flag" + "fmt" + iofs "io/fs" + "log" + "net/http" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + "text/tabwriter" + "time" + + "fn-registry/functions/core" + + "dag-engine/store" +) + +//go:embed all:frontend/dist +var frontendDist embed.FS + +func main() { + if len(os.Args) < 2 { + printUsage() + os.Exit(1) + } + + cmd := os.Args[1] + args := os.Args[2:] + + switch cmd { + case "run": + cmdRun(args) + case "list": + cmdList(args) + case "status": + cmdStatus(args) + case "validate": + cmdValidate(args) + case "server": + cmdServer(args) + case "help", "-h", "--help": + printUsage() + default: + fmt.Fprintf(os.Stderr, "unknown command: %s\n", cmd) + printUsage() + os.Exit(1) + } +} + +func printUsage() { + fmt.Println(`dag-engine — DAG workflow executor + +Usage: + dag-engine [options] + +Commands: + run Execute a DAG and show results + list [dir] List DAGs with schedule and last status + status [dag_name] Show execution history + validate Parse and validate without executing + server Start HTTP server with web frontend + +Server options: + --port HTTP port (default: 8090) + --dags-dir DAGs directory (default: ~/dagu/dags) + --db SQLite database path (default: dag_engine.db) + --scheduler Auto-start cron scheduler`) +} + +// --- CLI Commands --- + +func cmdRun(args []string) { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "usage: dag-engine run ") + os.Exit(1) + } + + dagPath := args[0] + cfg := DefaultConfig() + + // Parse optional flags after the path. + fs := flag.NewFlagSet("run", flag.ExitOnError) + fs.StringVar(&cfg.DBPath, "db", cfg.DBPath, "SQLite database path") + fs.Parse(args[1:]) + + db, err := store.Open(cfg.DBPath) + if err != nil { + log.Fatalf("open db: %v", err) + } + defer db.Close() + + executor := NewExecutor(db, filepath.Dir(dagPath)) + + fmt.Printf("Executing %s...\n", dagPath) + ctx := context.Background() + runID, err := executor.ExecuteDAG(ctx, dagPath, "manual") + + // Print results. + if runID != "" { + run, _ := db.GetRun(runID) + steps, _ := db.ListStepResults(runID) + + if run != nil { + fmt.Println() + for _, s := range steps { + icon := " " + switch s.Status { + case "success": + icon = "OK" + case "failed": + icon = "!!" + case "skipped": + icon = "--" + case "running": + icon = ".." + } + fmt.Printf("[%s] %s (%dms)\n", icon, s.StepName, s.DurationMs) + if s.Status == "failed" && s.Stderr != "" { + for _, line := range strings.Split(strings.TrimSpace(s.Stderr), "\n") { + fmt.Printf(" %s\n", line) + } + } + } + fmt.Println() + + dur := "" + if run.FinishedAt != nil { + dur = fmt.Sprintf(" (%s)", run.FinishedAt.Sub(run.StartedAt).Round(time.Millisecond)) + } + fmt.Printf("Run %s: %s%s\n", runID, strings.ToUpper(run.Status), dur) + } + } + + if err != nil { + os.Exit(1) + } +} + +func cmdList(args []string) { + cfg := DefaultConfig() + if len(args) > 0 && !strings.HasPrefix(args[0], "-") { + cfg.DagsDir = args[0] + args = args[1:] + } + + fs := flag.NewFlagSet("list", flag.ExitOnError) + fs.StringVar(&cfg.DBPath, "db", cfg.DBPath, "SQLite database path") + fs.Parse(args) + + db, err := store.Open(cfg.DBPath) + if err != nil { + log.Fatalf("open db: %v", err) + } + defer db.Close() + + executor := NewExecutor(db, cfg.DagsDir) + dags, err := executor.ListDAGs() + if err != nil { + log.Fatalf("list dags: %v", err) + } + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "NAME\tSCHEDULE\tTYPE\tTAGS\tLAST STATUS\tLAST RUN") + for _, d := range dags { + sched := strings.Join(d.Schedule, ", ") + tags := strings.Join(d.Tags, ", ") + lastStatus := "-" + lastRun := "-" + if d.LastRun != nil { + lastStatus = d.LastRun.Status + lastRun = d.LastRun.StartedAt.Format("2006-01-02 15:04") + } + typ := d.Type + if typ == "" { + typ = "chain" + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", d.Name, sched, typ, tags, lastStatus, lastRun) + } + w.Flush() +} + +func cmdStatus(args []string) { + cfg := DefaultConfig() + + fs := flag.NewFlagSet("status", flag.ExitOnError) + fs.StringVar(&cfg.DBPath, "db", cfg.DBPath, "SQLite database path") + limit := fs.Int("limit", 10, "number of runs to show") + fs.Parse(args) + + dagName := "" + if fs.NArg() > 0 { + dagName = fs.Arg(0) + } + + db, err := store.Open(cfg.DBPath) + if err != nil { + log.Fatalf("open db: %v", err) + } + defer db.Close() + + runs, total, err := db.ListRuns(dagName, *limit, 0) + if err != nil { + log.Fatalf("list runs: %v", err) + } + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + fmt.Fprintf(w, "Showing %d of %d runs", len(runs), total) + if dagName != "" { + fmt.Fprintf(w, " for %s", dagName) + } + fmt.Fprintln(w) + fmt.Fprintln(w, "RUN_ID\tDAG\tSTATUS\tTRIGGER\tSTARTED\tDURATION") + for _, r := range runs { + dur := "-" + if r.FinishedAt != nil { + dur = r.FinishedAt.Sub(r.StartedAt).Round(time.Millisecond).String() + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", + r.ID, r.DagName, r.Status, r.Trigger, + r.StartedAt.Format("2006-01-02 15:04:05"), dur) + } + w.Flush() +} + +func cmdValidate(args []string) { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "usage: dag-engine validate ") + os.Exit(1) + } + + data, err := os.ReadFile(args[0]) + if err != nil { + log.Fatalf("read: %v", err) + } + + dag, err := core.DagParse(data) + if err != nil { + log.Fatalf("parse error: %v", err) + } + + result := core.DagValidate(dag) + + fmt.Printf("DAG: %s\n", dag.Name) + fmt.Printf("Steps: %d\n", len(dag.Steps)) + fmt.Printf("Schedule: %v\n", dag.Schedule) + fmt.Printf("Type: %s\n", dag.Type) + + if result.Valid { + fmt.Println("Validation: PASS") + for i, level := range result.Levels { + fmt.Printf(" Level %d: %v\n", i, level) + } + } else { + fmt.Println("Validation: FAIL") + for _, e := range result.Errors { + fmt.Printf(" ERROR: %s\n", e) + } + } + for _, w := range result.Warnings { + fmt.Printf(" WARNING: %s\n", w) + } + + if !result.Valid { + os.Exit(1) + } +} + +// --- Server Command --- + +func cmdServer(args []string) { + cfg := DefaultConfig() + fs := flag.NewFlagSet("server", flag.ExitOnError) + cfg.ParseFlags(fs, args) + + db, err := store.Open(cfg.DBPath) + if err != nil { + log.Fatalf("open db: %v", err) + } + defer db.Close() + + executor := NewExecutor(db, cfg.DagsDir) + scheduler := NewScheduler(executor, cfg.DagsDir) + + // Prepare frontend FS. + var feFS iofs.FS + distFS, err := iofs.Sub(frontendDist, "frontend/dist") + if err == nil { + // Check if dist has content (built frontend exists). + entries, _ := iofs.ReadDir(distFS, ".") + if len(entries) > 0 { + feFS = distFS + log.Printf("serving frontend from embedded dist/") + } + } + if feFS == nil { + log.Printf("no frontend build found, API-only mode") + } + + mux := http.NewServeMux() + RegisterAPI(mux, executor, scheduler, feFS) + + handler := corsMiddleware(loggingMiddleware(mux)) + + if cfg.AutoScheduler { + if err := scheduler.Start(); err != nil { + log.Printf("scheduler start: %v", err) + } + } + + addr := fmt.Sprintf(":%d", cfg.Port) + log.Printf("dag-engine server starting on http://0.0.0.0%s", addr) + log.Printf("dags dir: %s", cfg.DagsDir) + log.Printf("database: %s", cfg.DBPath) + + srv := &http.Server{Addr: addr, Handler: handler} + + // Graceful shutdown. + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + log.Println("shutting down...") + scheduler.Stop() + srv.Shutdown(context.Background()) + }() + + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalf("server: %v", err) + } +} diff --git a/apps/dag_engine/middleware.go b/apps/dag_engine/middleware.go new file mode 100644 index 00000000..5e42d955 --- /dev/null +++ b/apps/dag_engine/middleware.go @@ -0,0 +1,30 @@ +package main + +import ( + "log" + "net/http" + "time" +) + +// corsMiddleware adds permissive CORS headers for development. +func corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + next.ServeHTTP(w, r) + }) +} + +// loggingMiddleware logs each HTTP request with method, path and duration. +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + next.ServeHTTP(w, r) + log.Printf("%s %s %s", r.Method, r.URL.Path, time.Since(start).Round(time.Millisecond)) + }) +} diff --git a/apps/dag_engine/scheduler.go b/apps/dag_engine/scheduler.go new file mode 100644 index 00000000..a2da6d8e --- /dev/null +++ b/apps/dag_engine/scheduler.go @@ -0,0 +1,188 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "fn-registry/functions/core" + "fn-registry/functions/infra" +) + +// ScheduledDAG represents a DAG with a parsed cron schedule. +type ScheduledDAG struct { + Name string `json:"name"` + Path string `json:"path"` + Schedule string `json:"schedule"` + NextRun time.Time `json:"next_run"` +} + +// Scheduler manages cron-triggered DAG execution. +type Scheduler struct { + mu sync.Mutex + running bool + cancel context.CancelFunc + dagsDir string + executor *Executor + dags []ScheduledDAG +} + +// NewScheduler creates a new scheduler. +func NewScheduler(executor *Executor, dagsDir string) *Scheduler { + return &Scheduler{ + executor: executor, + dagsDir: dagsDir, + } +} + +// Start scans for DAGs with schedules and starts cron tickers for each. +func (s *Scheduler) Start() error { + s.mu.Lock() + if s.running { + s.mu.Unlock() + return fmt.Errorf("scheduler already running") + } + + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + s.running = true + s.mu.Unlock() + + scheduled, err := s.scanDAGs() + if err != nil { + s.mu.Lock() + s.running = false + s.mu.Unlock() + cancel() + return err + } + + s.mu.Lock() + s.dags = scheduled + s.mu.Unlock() + + log.Printf("[scheduler] started with %d DAGs", len(scheduled)) + + for _, dag := range scheduled { + dag := dag + go s.runTicker(ctx, dag) + } + + return nil +} + +// Stop cancels all tickers and stops the scheduler. +func (s *Scheduler) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + if !s.running { + return + } + s.cancel() + s.running = false + s.dags = nil + log.Printf("[scheduler] stopped") +} + +// IsRunning returns true if the scheduler is active. +func (s *Scheduler) IsRunning() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.running +} + +// Status returns the list of scheduled DAGs with their next run time. +type SchedulerStatus struct { + Running bool `json:"running"` + DAGs []ScheduledDAG `json:"dags"` +} + +func (s *Scheduler) Status() SchedulerStatus { + s.mu.Lock() + defer s.mu.Unlock() + return SchedulerStatus{ + Running: s.running, + DAGs: s.dags, + } +} + +// scanDAGs reads the dags directory and returns DAGs that have cron schedules. +func (s *Scheduler) scanDAGs() ([]ScheduledDAG, error) { + entries, err := os.ReadDir(s.dagsDir) + if err != nil { + return nil, err + } + + var scheduled []ScheduledDAG + for _, entry := range entries { + ext := filepath.Ext(entry.Name()) + if ext != ".yaml" && ext != ".yml" { + continue + } + + path := filepath.Join(s.dagsDir, entry.Name()) + data, err := os.ReadFile(path) + if err != nil { + continue + } + + dag, err := core.DagParse(data) + if err != nil { + continue + } + + for _, expr := range dag.Schedule { + sched, err := core.ParseCronExpr(strings.TrimSpace(expr)) + if err != nil { + log.Printf("[scheduler] invalid cron %q in %s: %v", expr, dag.Name, err) + continue + } + next := core.NextCronTime(sched, time.Now()) + scheduled = append(scheduled, ScheduledDAG{ + Name: dag.Name, + Path: path, + Schedule: expr, + NextRun: next, + }) + } + } + + return scheduled, nil +} + +// runTicker starts a cron ticker for a single DAG schedule. +func (s *Scheduler) runTicker(ctx context.Context, dag ScheduledDAG) { + sched, err := core.ParseCronExpr(strings.TrimSpace(dag.Schedule)) + if err != nil { + return + } + + // Convert core.CronSchedule to infra.CronTickerSchedule. + tickerSched := infra.CronTickerSchedule{ + Minute: sched.Minute, + Hour: sched.Hour, + DayOfMonth: sched.DayOfMonth, + Month: sched.Month, + DayOfWeek: sched.DayOfWeek, + } + + ch := infra.CronTicker(tickerSched, ctx) + log.Printf("[scheduler] ticker started for %s (%s), next: %s", dag.Name, dag.Schedule, dag.NextRun.Format(time.RFC3339)) + + for t := range ch { + log.Printf("[scheduler] triggered %s at %s", dag.Name, t.Format(time.RFC3339)) + go func() { + runID, err := s.executor.ExecuteDAG(ctx, dag.Path, "cron") + if err != nil { + log.Printf("[scheduler] %s failed: %v (run: %s)", dag.Name, err, runID) + } else { + log.Printf("[scheduler] %s completed (run: %s)", dag.Name, runID) + } + }() + } +} diff --git a/apps/dag_engine/store/migrations/001_init.sql b/apps/dag_engine/store/migrations/001_init.sql new file mode 100644 index 00000000..218ae39c --- /dev/null +++ b/apps/dag_engine/store/migrations/001_init.sql @@ -0,0 +1,29 @@ +CREATE TABLE IF NOT EXISTS dag_runs ( + id TEXT PRIMARY KEY, + dag_name TEXT NOT NULL, + dag_path TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'pending' CHECK(status IN ('pending','running','success','failed','cancelled')), + trigger TEXT NOT NULL DEFAULT 'manual' CHECK(trigger IN ('manual','cron','api')), + started_at TEXT NOT NULL, + finished_at TEXT, + error TEXT NOT NULL DEFAULT '' +); + +CREATE TABLE IF NOT EXISTS dag_step_results ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL REFERENCES dag_runs(id) ON DELETE CASCADE, + step_name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending' CHECK(status IN ('pending','running','success','failed','skipped')), + exit_code INTEGER NOT NULL DEFAULT -1, + stdout TEXT NOT NULL DEFAULT '', + stderr TEXT NOT NULL DEFAULT '', + started_at TEXT, + finished_at TEXT, + duration_ms INTEGER NOT NULL DEFAULT 0, + error TEXT NOT NULL DEFAULT '' +); + +CREATE INDEX IF NOT EXISTS idx_runs_dag_name ON dag_runs(dag_name); +CREATE INDEX IF NOT EXISTS idx_runs_status ON dag_runs(status); +CREATE INDEX IF NOT EXISTS idx_runs_started ON dag_runs(started_at DESC); +CREATE INDEX IF NOT EXISTS idx_step_results_run ON dag_step_results(run_id); diff --git a/apps/dag_engine/store/store.go b/apps/dag_engine/store/store.go new file mode 100644 index 00000000..9f049062 --- /dev/null +++ b/apps/dag_engine/store/store.go @@ -0,0 +1,231 @@ +package store + +import ( + "database/sql" + _ "embed" + "fmt" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +//go:embed migrations/001_init.sql +var migrationSQL string + +// DB wraps a SQLite connection for DAG run persistence. +type DB struct { + conn *sql.DB + path string +} + +// Open opens or creates a DAG engine database at the given path. +func Open(path string) (*DB, error) { + conn, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_busy_timeout=5000&_foreign_keys=on") + if err != nil { + return nil, fmt.Errorf("store: open %s: %w", path, err) + } + if _, err := conn.Exec(migrationSQL); err != nil { + conn.Close() + return nil, fmt.Errorf("store: migrate: %w", err) + } + return &DB{conn: conn, path: path}, nil +} + +// Close closes the database connection. +func (db *DB) Close() error { + return db.conn.Close() +} + +// --- DagRun CRUD --- + +// DagRun mirrors infra.DagRun for the store layer. +type DagRun struct { + ID string + DagName string + DagPath string + Status string + Trigger string + StartedAt time.Time + FinishedAt *time.Time + Error string +} + +// CreateRun inserts a new run record. +func (db *DB) CreateRun(run *DagRun) error { + _, err := db.conn.Exec( + `INSERT INTO dag_runs (id, dag_name, dag_path, status, trigger, started_at, error) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + run.ID, run.DagName, run.DagPath, run.Status, run.Trigger, + run.StartedAt.Format(time.RFC3339), run.Error, + ) + return err +} + +// UpdateRunStatus updates a run's status and optionally its finished_at and error. +func (db *DB) UpdateRunStatus(id, status string, finishedAt *time.Time, errMsg string) error { + var fin *string + if finishedAt != nil { + s := finishedAt.Format(time.RFC3339) + fin = &s + } + _, err := db.conn.Exec( + `UPDATE dag_runs SET status=?, finished_at=?, error=? WHERE id=?`, + status, fin, errMsg, id, + ) + return err +} + +// GetRun retrieves a single run by ID. +func (db *DB) GetRun(id string) (*DagRun, error) { + row := db.conn.QueryRow( + `SELECT id, dag_name, dag_path, status, trigger, started_at, finished_at, error + FROM dag_runs WHERE id=?`, id, + ) + return scanRun(row) +} + +// ListRuns returns runs, newest first, with optional dag name filter. +func (db *DB) ListRuns(dagName string, limit, offset int) ([]DagRun, int, error) { + var total int + var args []interface{} + where := "" + if dagName != "" { + where = " WHERE dag_name=?" + args = append(args, dagName) + } + err := db.conn.QueryRow("SELECT COUNT(*) FROM dag_runs"+where, args...).Scan(&total) + if err != nil { + return nil, 0, err + } + + query := "SELECT id, dag_name, dag_path, status, trigger, started_at, finished_at, error FROM dag_runs" + + where + " ORDER BY started_at DESC LIMIT ? OFFSET ?" + args = append(args, limit, offset) + + rows, err := db.conn.Query(query, args...) + if err != nil { + return nil, 0, err + } + defer rows.Close() + + var runs []DagRun + for rows.Next() { + r, err := scanRunRows(rows) + if err != nil { + return nil, 0, err + } + runs = append(runs, *r) + } + return runs, total, rows.Err() +} + +// --- DagStepResult CRUD --- + +// DagStepResult mirrors infra.DagStepResult for the store layer. +type DagStepResult struct { + ID string + RunID string + StepName string + Status string + ExitCode int + Stdout string + Stderr string + StartedAt *time.Time + FinishedAt *time.Time + DurationMs int64 + Error string +} + +// InsertStepResult inserts a new step result. +func (db *DB) InsertStepResult(r *DagStepResult) error { + var startedAt, finishedAt *string + if r.StartedAt != nil { + s := r.StartedAt.Format(time.RFC3339) + startedAt = &s + } + if r.FinishedAt != nil { + s := r.FinishedAt.Format(time.RFC3339) + finishedAt = &s + } + _, err := db.conn.Exec( + `INSERT INTO dag_step_results (id, run_id, step_name, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms, error) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + r.ID, r.RunID, r.StepName, r.Status, r.ExitCode, r.Stdout, r.Stderr, + startedAt, finishedAt, r.DurationMs, r.Error, + ) + return err +} + +// UpdateStepResult updates a step result by ID. +func (db *DB) UpdateStepResult(id, status string, exitCode int, stdout, stderr string, finishedAt *time.Time, durationMs int64, errMsg string) error { + var fin *string + if finishedAt != nil { + s := finishedAt.Format(time.RFC3339) + fin = &s + } + _, err := db.conn.Exec( + `UPDATE dag_step_results SET status=?, exit_code=?, stdout=?, stderr=?, finished_at=?, duration_ms=?, error=? WHERE id=?`, + status, exitCode, stdout, stderr, fin, durationMs, errMsg, id, + ) + return err +} + +// ListStepResults returns all step results for a given run. +func (db *DB) ListStepResults(runID string) ([]DagStepResult, error) { + rows, err := db.conn.Query( + `SELECT id, run_id, step_name, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms, error + FROM dag_step_results WHERE run_id=? ORDER BY started_at ASC`, runID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + var results []DagStepResult + for rows.Next() { + var r DagStepResult + var startedAt, finishedAt sql.NullString + if err := rows.Scan(&r.ID, &r.RunID, &r.StepName, &r.Status, &r.ExitCode, + &r.Stdout, &r.Stderr, &startedAt, &finishedAt, &r.DurationMs, &r.Error); err != nil { + return nil, err + } + if startedAt.Valid { + t, _ := time.Parse(time.RFC3339, startedAt.String) + r.StartedAt = &t + } + if finishedAt.Valid { + t, _ := time.Parse(time.RFC3339, finishedAt.String) + r.FinishedAt = &t + } + results = append(results, r) + } + return results, rows.Err() +} + +// --- scan helpers --- + +type scanner interface { + Scan(dest ...interface{}) error +} + +func scanRun(s scanner) (*DagRun, error) { + var r DagRun + var startedAt string + var finishedAt sql.NullString + if err := s.Scan(&r.ID, &r.DagName, &r.DagPath, &r.Status, &r.Trigger, &startedAt, &finishedAt, &r.Error); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + r.StartedAt, _ = time.Parse(time.RFC3339, startedAt) + if finishedAt.Valid { + t, _ := time.Parse(time.RFC3339, finishedAt.String) + r.FinishedAt = &t + } + return &r, nil +} + +func scanRunRows(rows *sql.Rows) (*DagRun, error) { + return scanRun(rows) +}