d9414e4cba
Full DAG engine app with CLI subcommands (run, list, status, validate, server) and React/Mantine web frontend. Uses net/http + embedded Vite build. SQLite store for run history. Scheduler with cron_ticker for automated execution. Compatible with existing dagu YAML format. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
189 lines
4.0 KiB
Go
189 lines
4.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"fn-registry/functions/core"
|
|
"fn-registry/functions/infra"
|
|
)
|
|
|
|
// ScheduledDAG represents a DAG with a parsed cron schedule.
|
|
type ScheduledDAG struct {
|
|
Name string `json:"name"`
|
|
Path string `json:"path"`
|
|
Schedule string `json:"schedule"`
|
|
NextRun time.Time `json:"next_run"`
|
|
}
|
|
|
|
// Scheduler manages cron-triggered DAG execution.
|
|
type Scheduler struct {
|
|
mu sync.Mutex
|
|
running bool
|
|
cancel context.CancelFunc
|
|
dagsDir string
|
|
executor *Executor
|
|
dags []ScheduledDAG
|
|
}
|
|
|
|
// NewScheduler creates a new scheduler.
|
|
func NewScheduler(executor *Executor, dagsDir string) *Scheduler {
|
|
return &Scheduler{
|
|
executor: executor,
|
|
dagsDir: dagsDir,
|
|
}
|
|
}
|
|
|
|
// Start scans for DAGs with schedules and starts cron tickers for each.
|
|
func (s *Scheduler) Start() error {
|
|
s.mu.Lock()
|
|
if s.running {
|
|
s.mu.Unlock()
|
|
return fmt.Errorf("scheduler already running")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s.cancel = cancel
|
|
s.running = true
|
|
s.mu.Unlock()
|
|
|
|
scheduled, err := s.scanDAGs()
|
|
if err != nil {
|
|
s.mu.Lock()
|
|
s.running = false
|
|
s.mu.Unlock()
|
|
cancel()
|
|
return err
|
|
}
|
|
|
|
s.mu.Lock()
|
|
s.dags = scheduled
|
|
s.mu.Unlock()
|
|
|
|
log.Printf("[scheduler] started with %d DAGs", len(scheduled))
|
|
|
|
for _, dag := range scheduled {
|
|
dag := dag
|
|
go s.runTicker(ctx, dag)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop cancels all tickers and stops the scheduler.
|
|
func (s *Scheduler) Stop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if !s.running {
|
|
return
|
|
}
|
|
s.cancel()
|
|
s.running = false
|
|
s.dags = nil
|
|
log.Printf("[scheduler] stopped")
|
|
}
|
|
|
|
// IsRunning returns true if the scheduler is active.
|
|
func (s *Scheduler) IsRunning() bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.running
|
|
}
|
|
|
|
// Status returns the list of scheduled DAGs with their next run time.
|
|
type SchedulerStatus struct {
|
|
Running bool `json:"running"`
|
|
DAGs []ScheduledDAG `json:"dags"`
|
|
}
|
|
|
|
func (s *Scheduler) Status() SchedulerStatus {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return SchedulerStatus{
|
|
Running: s.running,
|
|
DAGs: s.dags,
|
|
}
|
|
}
|
|
|
|
// scanDAGs reads the dags directory and returns DAGs that have cron schedules.
|
|
func (s *Scheduler) scanDAGs() ([]ScheduledDAG, error) {
|
|
entries, err := os.ReadDir(s.dagsDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var scheduled []ScheduledDAG
|
|
for _, entry := range entries {
|
|
ext := filepath.Ext(entry.Name())
|
|
if ext != ".yaml" && ext != ".yml" {
|
|
continue
|
|
}
|
|
|
|
path := filepath.Join(s.dagsDir, entry.Name())
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
dag, err := core.DagParse(data)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
for _, expr := range dag.Schedule {
|
|
sched, err := core.ParseCronExpr(strings.TrimSpace(expr))
|
|
if err != nil {
|
|
log.Printf("[scheduler] invalid cron %q in %s: %v", expr, dag.Name, err)
|
|
continue
|
|
}
|
|
next := core.NextCronTime(sched, time.Now())
|
|
scheduled = append(scheduled, ScheduledDAG{
|
|
Name: dag.Name,
|
|
Path: path,
|
|
Schedule: expr,
|
|
NextRun: next,
|
|
})
|
|
}
|
|
}
|
|
|
|
return scheduled, nil
|
|
}
|
|
|
|
// runTicker starts a cron ticker for a single DAG schedule.
|
|
func (s *Scheduler) runTicker(ctx context.Context, dag ScheduledDAG) {
|
|
sched, err := core.ParseCronExpr(strings.TrimSpace(dag.Schedule))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Convert core.CronSchedule to infra.CronTickerSchedule.
|
|
tickerSched := infra.CronTickerSchedule{
|
|
Minute: sched.Minute,
|
|
Hour: sched.Hour,
|
|
DayOfMonth: sched.DayOfMonth,
|
|
Month: sched.Month,
|
|
DayOfWeek: sched.DayOfWeek,
|
|
}
|
|
|
|
ch := infra.CronTicker(tickerSched, ctx)
|
|
log.Printf("[scheduler] ticker started for %s (%s), next: %s", dag.Name, dag.Schedule, dag.NextRun.Format(time.RFC3339))
|
|
|
|
for t := range ch {
|
|
log.Printf("[scheduler] triggered %s at %s", dag.Name, t.Format(time.RFC3339))
|
|
go func() {
|
|
runID, err := s.executor.ExecuteDAG(ctx, dag.Path, "cron")
|
|
if err != nil {
|
|
log.Printf("[scheduler] %s failed: %v (run: %s)", dag.Name, err, runID)
|
|
} else {
|
|
log.Printf("[scheduler] %s completed (run: %s)", dag.Name, runID)
|
|
}
|
|
}()
|
|
}
|
|
}
|