Files
fn_registry/apps/dag_engine/scheduler.go
T
egutierrez d9414e4cba feat: add dag_engine app — CLI + web frontend for DAG execution (0007e)
Full DAG engine app with CLI subcommands (run, list, status, validate, server)
and React/Mantine web frontend. Uses net/http + embedded Vite build. SQLite
store for run history. Scheduler with cron_ticker for automated execution.
Compatible with existing dagu YAML format.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 13:05:36 +02:00

189 lines
4.0 KiB
Go

package main
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"fn-registry/functions/core"
"fn-registry/functions/infra"
)
// ScheduledDAG represents a DAG with a parsed cron schedule.
type ScheduledDAG struct {
Name string `json:"name"`
Path string `json:"path"`
Schedule string `json:"schedule"`
NextRun time.Time `json:"next_run"`
}
// Scheduler manages cron-triggered DAG execution.
type Scheduler struct {
mu sync.Mutex
running bool
cancel context.CancelFunc
dagsDir string
executor *Executor
dags []ScheduledDAG
}
// NewScheduler creates a new scheduler.
func NewScheduler(executor *Executor, dagsDir string) *Scheduler {
return &Scheduler{
executor: executor,
dagsDir: dagsDir,
}
}
// Start scans for DAGs with schedules and starts cron tickers for each.
func (s *Scheduler) Start() error {
s.mu.Lock()
if s.running {
s.mu.Unlock()
return fmt.Errorf("scheduler already running")
}
ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
s.running = true
s.mu.Unlock()
scheduled, err := s.scanDAGs()
if err != nil {
s.mu.Lock()
s.running = false
s.mu.Unlock()
cancel()
return err
}
s.mu.Lock()
s.dags = scheduled
s.mu.Unlock()
log.Printf("[scheduler] started with %d DAGs", len(scheduled))
for _, dag := range scheduled {
dag := dag
go s.runTicker(ctx, dag)
}
return nil
}
// Stop cancels all tickers and stops the scheduler.
func (s *Scheduler) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.running {
return
}
s.cancel()
s.running = false
s.dags = nil
log.Printf("[scheduler] stopped")
}
// IsRunning returns true if the scheduler is active.
func (s *Scheduler) IsRunning() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.running
}
// Status returns the list of scheduled DAGs with their next run time.
type SchedulerStatus struct {
Running bool `json:"running"`
DAGs []ScheduledDAG `json:"dags"`
}
func (s *Scheduler) Status() SchedulerStatus {
s.mu.Lock()
defer s.mu.Unlock()
return SchedulerStatus{
Running: s.running,
DAGs: s.dags,
}
}
// scanDAGs reads the dags directory and returns DAGs that have cron schedules.
func (s *Scheduler) scanDAGs() ([]ScheduledDAG, error) {
entries, err := os.ReadDir(s.dagsDir)
if err != nil {
return nil, err
}
var scheduled []ScheduledDAG
for _, entry := range entries {
ext := filepath.Ext(entry.Name())
if ext != ".yaml" && ext != ".yml" {
continue
}
path := filepath.Join(s.dagsDir, entry.Name())
data, err := os.ReadFile(path)
if err != nil {
continue
}
dag, err := core.DagParse(data)
if err != nil {
continue
}
for _, expr := range dag.Schedule {
sched, err := core.ParseCronExpr(strings.TrimSpace(expr))
if err != nil {
log.Printf("[scheduler] invalid cron %q in %s: %v", expr, dag.Name, err)
continue
}
next := core.NextCronTime(sched, time.Now())
scheduled = append(scheduled, ScheduledDAG{
Name: dag.Name,
Path: path,
Schedule: expr,
NextRun: next,
})
}
}
return scheduled, nil
}
// runTicker starts a cron ticker for a single DAG schedule.
func (s *Scheduler) runTicker(ctx context.Context, dag ScheduledDAG) {
sched, err := core.ParseCronExpr(strings.TrimSpace(dag.Schedule))
if err != nil {
return
}
// Convert core.CronSchedule to infra.CronTickerSchedule.
tickerSched := infra.CronTickerSchedule{
Minute: sched.Minute,
Hour: sched.Hour,
DayOfMonth: sched.DayOfMonth,
Month: sched.Month,
DayOfWeek: sched.DayOfWeek,
}
ch := infra.CronTicker(tickerSched, ctx)
log.Printf("[scheduler] ticker started for %s (%s), next: %s", dag.Name, dag.Schedule, dag.NextRun.Format(time.RFC3339))
for t := range ch {
log.Printf("[scheduler] triggered %s at %s", dag.Name, t.Format(time.RFC3339))
go func() {
runID, err := s.executor.ExecuteDAG(ctx, dag.Path, "cron")
if err != nil {
log.Printf("[scheduler] %s failed: %v (run: %s)", dag.Name, err, runID)
} else {
log.Printf("[scheduler] %s completed (run: %s)", dag.Name, runID)
}
}()
}
}