feat(dag_engine): WS hub /api/ws/dagruns + migracion DAGs desde dagu
- events.go: DagRunHub broadcastea snapshot+deltas live (500ms tick, 5s recent finished window) sobre dag_runs + dag_step_results. - api.go: handler GET /api/ws/dagruns upgrade WS, opt-in en RegisterAPI. - store.go: expone Conn() para read-only desde el hub. - main.go: construye DagRunHub al arrancar server. - dags_migrated/: 5 YAMLs migrados desde ~/dagu/dags tras desinstalar dagu (issue 0095 step 1). Smoke: snapshot inicial OK, trigger /api/dags/test_claude_access/run -> delta WS observa 3 step_results + run success en <1s. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,438 @@
|
||||
package main
|
||||
|
||||
// WebSocket hub para live updates de dag_runs + dag_step_results.
|
||||
// Patron: sqlite_api/events.go (CallMonitorHub) — issue 0095.
|
||||
//
|
||||
// Diseño:
|
||||
// - Hub global con N subscribers WS.
|
||||
// - Ticker arranca solo con >=1 subscriber. Cero overhead si nadie mira.
|
||||
// - Cada tick (500ms): query rowid>watermark + activos (status running/pending)
|
||||
// + recientes finished (ultimos 5s) -> broadcast upsert.
|
||||
// - Snapshot inicial: lista de DAGs + ultimos 50 runs + step_results.
|
||||
// - El cliente trata `runs` y `steps` como upserts por id.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"nhooyr.io/websocket"
|
||||
"nhooyr.io/websocket/wsjson"
|
||||
|
||||
"dag-engine/store"
|
||||
)
|
||||
|
||||
const (
|
||||
dagWSTickInterval = 500 * time.Millisecond
|
||||
dagWSTickIntervalIdle = 2 * time.Second
|
||||
dagWSIdleThreshold = 30 * time.Second
|
||||
dagWSSnapshotRuns = 50
|
||||
dagWSBroadcastTimeout = 2 * time.Second
|
||||
dagWSRecentFinishedS = 5
|
||||
)
|
||||
|
||||
type wsRun struct {
|
||||
ID string `json:"id"`
|
||||
DagName string `json:"dag_name"`
|
||||
DagPath string `json:"dag_path"`
|
||||
Status string `json:"status"`
|
||||
Trigger string `json:"trigger"`
|
||||
StartedAt string `json:"started_at"`
|
||||
FinishedAt string `json:"finished_at,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type wsStep struct {
|
||||
ID string `json:"id"`
|
||||
RunID string `json:"run_id"`
|
||||
StepName string `json:"step_name"`
|
||||
Status string `json:"status"`
|
||||
ExitCode int `json:"exit_code"`
|
||||
Stdout string `json:"stdout,omitempty"`
|
||||
Stderr string `json:"stderr,omitempty"`
|
||||
StartedAt string `json:"started_at,omitempty"`
|
||||
FinishedAt string `json:"finished_at,omitempty"`
|
||||
DurationMs int64 `json:"duration_ms"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type wsWatermark struct {
|
||||
Runs int64 `json:"runs"`
|
||||
Steps int64 `json:"steps"`
|
||||
}
|
||||
|
||||
type wsDagMessage struct {
|
||||
Type string `json:"type"` // snapshot|delta|ping
|
||||
Watermark wsWatermark `json:"watermark"`
|
||||
Dags []DagInfo `json:"dags,omitempty"`
|
||||
Runs []wsRun `json:"runs,omitempty"`
|
||||
Steps []wsStep `json:"steps,omitempty"`
|
||||
ServerTime int64 `json:"server_time"`
|
||||
}
|
||||
|
||||
type wsDagClientCmd struct {
|
||||
Watermark wsWatermark `json:"watermark,omitempty"`
|
||||
}
|
||||
|
||||
type dagSubscriber struct {
|
||||
conn *websocket.Conn
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
out chan wsDagMessage
|
||||
watermark wsWatermark
|
||||
}
|
||||
|
||||
// DagRunHub broadcastea cambios de dag_runs + dag_step_results a clientes WS.
|
||||
type DagRunHub struct {
|
||||
db *store.DB
|
||||
executor *Executor
|
||||
|
||||
mu sync.Mutex
|
||||
subscribers map[*dagSubscriber]struct{}
|
||||
tickerStop chan struct{}
|
||||
tickerOn bool
|
||||
watermark wsWatermark
|
||||
lastEventAt time.Time
|
||||
}
|
||||
|
||||
func NewDagRunHub(db *store.DB, executor *Executor) *DagRunHub {
|
||||
return &DagRunHub{
|
||||
db: db,
|
||||
executor: executor,
|
||||
subscribers: make(map[*dagSubscriber]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DagRunHub) register(s *dagSubscriber) {
|
||||
h.mu.Lock()
|
||||
h.subscribers[s] = struct{}{}
|
||||
shouldStart := !h.tickerOn
|
||||
if shouldStart {
|
||||
h.tickerStop = make(chan struct{})
|
||||
h.tickerOn = true
|
||||
h.lastEventAt = time.Now()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
if shouldStart {
|
||||
go h.tickerLoop()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DagRunHub) unregister(s *dagSubscriber) {
|
||||
h.mu.Lock()
|
||||
if _, ok := h.subscribers[s]; !ok {
|
||||
h.mu.Unlock()
|
||||
return
|
||||
}
|
||||
delete(h.subscribers, s)
|
||||
close(s.out)
|
||||
shouldStop := h.tickerOn && len(h.subscribers) == 0
|
||||
if shouldStop {
|
||||
close(h.tickerStop)
|
||||
h.tickerOn = false
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *DagRunHub) tickerLoop() {
|
||||
interval := dagWSTickInterval
|
||||
t := time.NewTimer(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-h.tickerStop:
|
||||
return
|
||||
case <-t.C:
|
||||
runs, steps, wm, err := h.fetchDelta(h.getWatermark())
|
||||
if err != nil {
|
||||
log.Printf("[dagws] fetchDelta: %v", err)
|
||||
} else if len(runs) > 0 || len(steps) > 0 {
|
||||
h.setWatermark(wm)
|
||||
h.recordActivity()
|
||||
h.broadcast(wsDagMessage{
|
||||
Type: "delta",
|
||||
Watermark: wm,
|
||||
Runs: runs,
|
||||
Steps: steps,
|
||||
ServerTime: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
if time.Since(h.lastActivityAt()) > dagWSIdleThreshold {
|
||||
interval = dagWSTickIntervalIdle
|
||||
} else {
|
||||
interval = dagWSTickInterval
|
||||
}
|
||||
t.Reset(interval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DagRunHub) getWatermark() wsWatermark {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.watermark
|
||||
}
|
||||
|
||||
func (h *DagRunHub) setWatermark(v wsWatermark) {
|
||||
h.mu.Lock()
|
||||
if v.Runs > h.watermark.Runs {
|
||||
h.watermark.Runs = v.Runs
|
||||
}
|
||||
if v.Steps > h.watermark.Steps {
|
||||
h.watermark.Steps = v.Steps
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *DagRunHub) recordActivity() {
|
||||
h.mu.Lock()
|
||||
h.lastEventAt = time.Now()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *DagRunHub) lastActivityAt() time.Time {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.lastEventAt
|
||||
}
|
||||
|
||||
// fetchDelta devuelve runs/steps con (rowid > watermark) OR (status in-flight)
|
||||
// OR (recently finished). Watermark devuelto = max rowid visto.
|
||||
func (h *DagRunHub) fetchDelta(since wsWatermark) ([]wsRun, []wsStep, wsWatermark, error) {
|
||||
conn := h.db.Conn()
|
||||
if conn == nil {
|
||||
return nil, nil, since, nil
|
||||
}
|
||||
cutoff := time.Now().Add(-time.Duration(dagWSRecentFinishedS) * time.Second).Format(time.RFC3339)
|
||||
|
||||
runs, maxRuns, err := scanRuns(conn, `
|
||||
SELECT rowid, id, dag_name, dag_path, status, trigger, started_at,
|
||||
COALESCE(finished_at,''), error
|
||||
FROM dag_runs
|
||||
WHERE rowid > ?
|
||||
OR status IN ('running','pending')
|
||||
OR (finished_at IS NOT NULL AND finished_at >= ?)
|
||||
ORDER BY rowid ASC`, since.Runs, cutoff)
|
||||
if err != nil {
|
||||
return nil, nil, since, err
|
||||
}
|
||||
|
||||
steps, maxSteps, err := scanSteps(conn, `
|
||||
SELECT rowid, id, run_id, step_name, status, exit_code, stdout, stderr,
|
||||
COALESCE(started_at,''), COALESCE(finished_at,''), duration_ms, error
|
||||
FROM dag_step_results
|
||||
WHERE rowid > ?
|
||||
OR status IN ('running','pending')
|
||||
OR (finished_at IS NOT NULL AND finished_at >= ?)
|
||||
ORDER BY rowid ASC`, since.Steps, cutoff)
|
||||
if err != nil {
|
||||
return runs, nil, since, err
|
||||
}
|
||||
|
||||
out := wsWatermark{Runs: maxRuns, Steps: maxSteps}
|
||||
if out.Runs < since.Runs {
|
||||
out.Runs = since.Runs
|
||||
}
|
||||
if out.Steps < since.Steps {
|
||||
out.Steps = since.Steps
|
||||
}
|
||||
return runs, steps, out, nil
|
||||
}
|
||||
|
||||
// fetchSnapshot devuelve DAGs + ultimos N runs + sus step_results + watermark.
|
||||
func (h *DagRunHub) fetchSnapshot() ([]DagInfo, []wsRun, []wsStep, wsWatermark, error) {
|
||||
dags, err := h.executor.ListDAGs()
|
||||
if err != nil {
|
||||
log.Printf("[dagws] list dags: %v", err)
|
||||
dags = nil
|
||||
}
|
||||
conn := h.db.Conn()
|
||||
if conn == nil {
|
||||
return dags, nil, nil, wsWatermark{}, nil
|
||||
}
|
||||
|
||||
runs, maxRuns, err := scanRuns(conn, `
|
||||
SELECT rowid, id, dag_name, dag_path, status, trigger, started_at,
|
||||
COALESCE(finished_at,''), error
|
||||
FROM dag_runs
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?`, dagWSSnapshotRuns)
|
||||
if err != nil {
|
||||
return dags, nil, nil, wsWatermark{}, err
|
||||
}
|
||||
|
||||
steps, maxSteps, err := scanSteps(conn, `
|
||||
SELECT rowid, id, run_id, step_name, status, exit_code, stdout, stderr,
|
||||
COALESCE(started_at,''), COALESCE(finished_at,''), duration_ms, error
|
||||
FROM dag_step_results
|
||||
WHERE run_id IN (SELECT id FROM dag_runs ORDER BY started_at DESC LIMIT ?)
|
||||
ORDER BY rowid ASC`, dagWSSnapshotRuns)
|
||||
if err != nil {
|
||||
return dags, runs, nil, wsWatermark{Runs: maxRuns}, err
|
||||
}
|
||||
|
||||
return dags, runs, steps, wsWatermark{Runs: maxRuns, Steps: maxSteps}, nil
|
||||
}
|
||||
|
||||
func scanRuns(conn *sql.DB, q string, args ...any) ([]wsRun, int64, error) {
|
||||
rows, err := conn.Query(q, args...)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []wsRun
|
||||
var max int64
|
||||
for rows.Next() {
|
||||
var r wsRun
|
||||
var rowid int64
|
||||
if err := rows.Scan(&rowid, &r.ID, &r.DagName, &r.DagPath, &r.Status,
|
||||
&r.Trigger, &r.StartedAt, &r.FinishedAt, &r.Error); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if rowid > max {
|
||||
max = rowid
|
||||
}
|
||||
out = append(out, r)
|
||||
}
|
||||
return out, max, rows.Err()
|
||||
}
|
||||
|
||||
func scanSteps(conn *sql.DB, q string, args ...any) ([]wsStep, int64, error) {
|
||||
rows, err := conn.Query(q, args...)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []wsStep
|
||||
var max int64
|
||||
for rows.Next() {
|
||||
var s wsStep
|
||||
var rowid int64
|
||||
if err := rows.Scan(&rowid, &s.ID, &s.RunID, &s.StepName, &s.Status,
|
||||
&s.ExitCode, &s.Stdout, &s.Stderr, &s.StartedAt, &s.FinishedAt,
|
||||
&s.DurationMs, &s.Error); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if rowid > max {
|
||||
max = rowid
|
||||
}
|
||||
out = append(out, s)
|
||||
}
|
||||
return out, max, rows.Err()
|
||||
}
|
||||
|
||||
func (h *DagRunHub) broadcast(msg wsDagMessage) {
|
||||
h.mu.Lock()
|
||||
subs := make([]*dagSubscriber, 0, len(h.subscribers))
|
||||
for s := range h.subscribers {
|
||||
subs = append(subs, s)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
for _, s := range subs {
|
||||
select {
|
||||
case s.out <- msg:
|
||||
default:
|
||||
log.Printf("[dagws] dropping frame for slow subscriber")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleDagRunsWS upgrade WS y gestiona lifecycle.
|
||||
// Endpoint: GET /api/ws/dagruns
|
||||
func handleDagRunsWS(hub *DagRunHub) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||
InsecureSkipVerify: true,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[dagws] accept: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close(websocket.StatusInternalError, "closing")
|
||||
|
||||
ctx, cancel := context.WithCancel(r.Context())
|
||||
defer cancel()
|
||||
|
||||
sub := &dagSubscriber{
|
||||
conn: conn,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
out: make(chan wsDagMessage, 64),
|
||||
}
|
||||
hub.register(sub)
|
||||
defer hub.unregister(sub)
|
||||
|
||||
dags, runs, steps, wm, err := hub.fetchSnapshot()
|
||||
if err != nil {
|
||||
log.Printf("[dagws] snapshot: %v", err)
|
||||
conn.Close(websocket.StatusInternalError, "snapshot failed")
|
||||
return
|
||||
}
|
||||
hub.setWatermark(wm)
|
||||
initial := wsDagMessage{
|
||||
Type: "snapshot",
|
||||
Watermark: wm,
|
||||
Dags: dags,
|
||||
Runs: runs,
|
||||
Steps: steps,
|
||||
ServerTime: time.Now().Unix(),
|
||||
}
|
||||
if err := wsjson.Write(ctx, conn, initial); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
readErr := make(chan error, 1)
|
||||
go func() {
|
||||
for {
|
||||
var cmd wsDagClientCmd
|
||||
if err := wsjson.Read(ctx, conn, &cmd); err != nil {
|
||||
readErr <- err
|
||||
return
|
||||
}
|
||||
if cmd.Watermark.Runs > 0 || cmd.Watermark.Steps > 0 {
|
||||
runs, steps, wm, err := hub.fetchDelta(cmd.Watermark)
|
||||
if err == nil && (len(runs) > 0 || len(steps) > 0) {
|
||||
hub.setWatermark(wm)
|
||||
select {
|
||||
case sub.out <- wsDagMessage{
|
||||
Type: "delta",
|
||||
Watermark: wm,
|
||||
Runs: runs,
|
||||
Steps: steps,
|
||||
ServerTime: time.Now().Unix(),
|
||||
}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case err := <-readErr:
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
case msg, ok := <-sub.out:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
wctx, wcancel := context.WithTimeout(ctx, dagWSBroadcastTimeout)
|
||||
err := wsjson.Write(wctx, conn, msg)
|
||||
wcancel()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user