package main

// WebSocket hub for live updates of dag_runs + dag_step_results.
// Pattern: sqlite_api/events.go (CallMonitorHub) — issue 0095.
//
// Design:
//   - One global hub with N WebSocket subscribers.
//   - The ticker only runs while there is at least one subscriber, so there is
//     no polling overhead when nobody is watching.
//   - Each tick (500ms, or 2s once idle): query rowid > watermark, plus active
//     rows (status running/pending), plus recently finished rows (last 5s),
//     and broadcast them as upserts.
//   - Initial snapshot: list of DAGs + last 50 runs + their step_results.
//   - The client treats `runs` and `steps` as upserts keyed by id.

import (
	"context"
	"database/sql"
	"log"
	"net/http"
	"sync"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"

	"dag-engine/store"
)

const (
	// dagWSTickInterval is the polling period while the hub has seen recent activity.
	dagWSTickInterval = 500 * time.Millisecond
	// dagWSTickIntervalIdle is the polling period once no delta has been
	// produced for longer than dagWSIdleThreshold.
	dagWSTickIntervalIdle = 2 * time.Second
	// dagWSIdleThreshold is how long the hub must be quiet before it slows down.
	dagWSIdleThreshold = 30 * time.Second
	// dagWSSnapshotRuns is the number of most recent runs sent in the initial snapshot.
	dagWSSnapshotRuns = 50
	// dagWSBroadcastTimeout bounds a single queued-frame write to one client.
	dagWSBroadcastTimeout = 2 * time.Second
	// dagWSRecentFinishedS is the window, in seconds, during which a finished
	// row keeps being re-sent in deltas.
	dagWSRecentFinishedS = 5
)

// wsRun is the wire representation of one dag_runs row.
type wsRun struct {
	ID         string `json:"id"`
	DagName    string `json:"dag_name"`
	DagPath    string `json:"dag_path"`
	Status     string `json:"status"`
	Trigger    string `json:"trigger"`
	StartedAt  string `json:"started_at"`
	FinishedAt string `json:"finished_at,omitempty"`
	Error      string `json:"error,omitempty"`
}

// wsStep is the wire representation of one dag_step_results row.
type wsStep struct {
	ID         string `json:"id"`
	RunID      string `json:"run_id"`
	StepName   string `json:"step_name"`
	Status     string `json:"status"`
	ExitCode   int    `json:"exit_code"`
	Stdout     string `json:"stdout,omitempty"`
	Stderr     string `json:"stderr,omitempty"`
	StartedAt  string `json:"started_at,omitempty"`
	FinishedAt string `json:"finished_at,omitempty"`
	DurationMs int64  `json:"duration_ms"`
	Error      string `json:"error,omitempty"`
}

// wsWatermark holds the highest rowid seen so far in each table.
type wsWatermark struct {
	Runs  int64 `json:"runs"`
	Steps int64 `json:"steps"`
}

// wsDagMessage is a server -> client frame.
type wsDagMessage struct {
	Type       string      `json:"type"` // snapshot|delta|ping
	Watermark  wsWatermark `json:"watermark"`
	Dags       []DagInfo   `json:"dags,omitempty"`
	Runs       []wsRun     `json:"runs,omitempty"`
	Steps      []wsStep    `json:"steps,omitempty"`
	ServerTime int64       `json:"server_time"` // Unix seconds
}

// wsDagClientCmd is a client -> server frame. A non-zero watermark asks the
// server to resend everything newer than it.
type wsDagClientCmd struct {
	Watermark wsWatermark `json:"watermark,omitempty"`
}

// dagSubscriber is one connected WebSocket client.
type dagSubscriber struct {
	conn   *websocket.Conn
	ctx    context.Context
	cancel context.CancelFunc
	// out queues frames for the connection's writer loop. It is never closed:
	// senders may still hold a reference after the subscriber is removed, and
	// a send on a closed channel panics. Shutdown is signalled through cancel.
	out chan wsDagMessage
	// watermark is not read or written anywhere in this section of the file.
	watermark wsWatermark
}

// DagRunHub broadcasts changes of dag_runs + dag_step_results to WS clients.
type DagRunHub struct {
	db       *store.DB
	executor *Executor

	mu          sync.Mutex // guards every field below
	subscribers map[*dagSubscriber]struct{}
	tickerStop  chan struct{} // closed to stop the currently running ticker
	tickerOn    bool
	watermark   wsWatermark
	lastEventAt time.Time // last time a non-empty delta was produced
}

// NewDagRunHub returns a hub with no subscribers and no running ticker.
func NewDagRunHub(db *store.DB, executor *Executor) *DagRunHub {
	return &DagRunHub{
		db:          db,
		executor:    executor,
		subscribers: make(map[*dagSubscriber]struct{}),
	}
}

// register adds s to the hub and starts the polling goroutine if it is not
// already running.
func (h *DagRunHub) register(s *dagSubscriber) {
	h.mu.Lock()
	h.subscribers[s] = struct{}{}
	var stop chan struct{}
	if !h.tickerOn {
		stop = make(chan struct{})
		h.tickerStop = stop
		h.tickerOn = true
		h.lastEventAt = time.Now()
	}
	h.mu.Unlock()

	if stop != nil {
		// Hand the goroutine its own stop channel. Re-reading h.tickerStop from
		// inside the loop would let an old loop pick up the channel of a newer
		// ticker and never terminate.
		go h.runTicker(stop)
	}
}

// unregister removes s from the hub, stops the ticker when s was the last
// subscriber, and cancels the subscriber's context. Calling it for a
// subscriber that is not registered is a no-op.
func (h *DagRunHub) unregister(s *dagSubscriber) {
	h.mu.Lock()
	if _, ok := h.subscribers[s]; !ok {
		h.mu.Unlock()
		return
	}
	delete(h.subscribers, s)
	if h.tickerOn && len(h.subscribers) == 0 {
		close(h.tickerStop)
		h.tickerOn = false
	}
	h.mu.Unlock()

	// s.out is deliberately left open: a concurrent broadcast or resync reply
	// may still be about to send on it. Cancelling the context is what wakes
	// up the connection's goroutines.
	if s.cancel != nil {
		s.cancel()
	}
}

// tickerLoop runs the polling loop against the hub's current stop channel.
// It is kept for callers that start the loop without a channel of their own;
// register uses runTicker directly.
func (h *DagRunHub) tickerLoop() {
	h.mu.Lock()
	stop := h.tickerStop
	h.mu.Unlock()
	h.runTicker(stop)
}

// runTicker polls for deltas until stop is closed. Non-empty deltas advance
// the hub watermark, refresh the activity timestamp and are broadcast to all
// subscribers. The polling interval drops to dagWSTickIntervalIdle once no
// delta has been produced for dagWSIdleThreshold.
func (h *DagRunHub) runTicker(stop <-chan struct{}) {
	interval := dagWSTickInterval
	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case <-stop:
			return
		case <-t.C:
			runs, steps, wm, err := h.fetchDelta(h.getWatermark())
			if err != nil {
				log.Printf("[dagws] fetchDelta: %v", err)
			} else if len(runs) > 0 || len(steps) > 0 {
				h.setWatermark(wm)
				h.recordActivity()
				h.broadcast(wsDagMessage{
					Type:       "delta",
					Watermark:  wm,
					Runs:       runs,
					Steps:      steps,
					ServerTime: time.Now().Unix(),
				})
			}
			if time.Since(h.lastActivityAt()) > dagWSIdleThreshold {
				interval = dagWSTickIntervalIdle
			} else {
				interval = dagWSTickInterval
			}
			// The timer has just fired and been drained, so Reset is safe here.
			t.Reset(interval)
		}
	}
}

// getWatermark returns the hub watermark.
func (h *DagRunHub) getWatermark() wsWatermark {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.watermark
}

// setWatermark raises the hub watermark to v, component by component.
// It never lowers either component.
func (h *DagRunHub) setWatermark(v wsWatermark) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if v.Runs > h.watermark.Runs {
		h.watermark.Runs = v.Runs
	}
	if v.Steps > h.watermark.Steps {
		h.watermark.Steps = v.Steps
	}
}

// recordActivity marks "now" as the last time a delta was produced.
func (h *DagRunHub) recordActivity() {
	h.mu.Lock()
	h.lastEventAt = time.Now()
	h.mu.Unlock()
}

// lastActivityAt returns the time recorded by the last recordActivity call
// (or by register when the ticker was started).
func (h *DagRunHub) lastActivityAt() time.Time {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.lastEventAt
}

// fetchDelta returns the runs/steps with (rowid > watermark) OR (status
// in-flight) OR (recently finished). The returned watermark is the max rowid
// seen, never lower than since. When the store has no connection it returns
// empty results and since unchanged. On error both slices are nil and the
// watermark is since.
func (h *DagRunHub) fetchDelta(since wsWatermark) ([]wsRun, []wsStep, wsWatermark, error) {
	conn := h.db.Conn()
	if conn == nil {
		return nil, nil, since, nil
	}

	// NOTE(review): cutoff is formatted in the process-local time zone and is
	// compared as a string against finished_at — confirm the writer stores
	// finished_at in the same RFC3339 form and zone.
	cutoff := time.Now().Add(-time.Duration(dagWSRecentFinishedS) * time.Second).Format(time.RFC3339)

	runs, maxRuns, err := scanRuns(conn, `
		SELECT rowid, id, dag_name, dag_path, status, trigger, started_at,
		       COALESCE(finished_at,''), error
		FROM dag_runs
		WHERE rowid > ?
		   OR status IN ('running','pending')
		   OR (finished_at IS NOT NULL AND finished_at >= ?)
		ORDER BY rowid ASC`, since.Runs, cutoff)
	if err != nil {
		return nil, nil, since, err
	}

	steps, maxSteps, err := scanSteps(conn, `
		SELECT rowid, id, run_id, step_name, status, exit_code, stdout, stderr,
		       COALESCE(started_at,''), COALESCE(finished_at,''), duration_ms, error
		FROM dag_step_results
		WHERE rowid > ?
		   OR status IN ('running','pending')
		   OR (finished_at IS NOT NULL AND finished_at >= ?)
		ORDER BY rowid ASC`, since.Steps, cutoff)
	if err != nil {
		// Do not hand back half a delta together with an error.
		return nil, nil, since, err
	}

	out := wsWatermark{Runs: maxRuns, Steps: maxSteps}
	if out.Runs < since.Runs {
		out.Runs = since.Runs
	}
	if out.Steps < since.Steps {
		out.Steps = since.Steps
	}
	return runs, steps, out, nil
}

// fetchSnapshot returns the DAG list + the last N runs + their step_results + watermark.
func (h *DagRunHub) fetchSnapshot() ([]DagInfo, []wsRun, []wsStep, wsWatermark, error) { dags, err := h.executor.ListDAGs() if err != nil { log.Printf("[dagws] list dags: %v", err) dags = nil } conn := h.db.Conn() if conn == nil { return dags, nil, nil, wsWatermark{}, nil } runs, maxRuns, err := scanRuns(conn, ` SELECT rowid, id, dag_name, dag_path, status, trigger, started_at, COALESCE(finished_at,''), error FROM dag_runs ORDER BY started_at DESC LIMIT ?`, dagWSSnapshotRuns) if err != nil { return dags, nil, nil, wsWatermark{}, err } steps, maxSteps, err := scanSteps(conn, ` SELECT rowid, id, run_id, step_name, status, exit_code, stdout, stderr, COALESCE(started_at,''), COALESCE(finished_at,''), duration_ms, error FROM dag_step_results WHERE run_id IN (SELECT id FROM dag_runs ORDER BY started_at DESC LIMIT ?) ORDER BY rowid ASC`, dagWSSnapshotRuns) if err != nil { return dags, runs, nil, wsWatermark{Runs: maxRuns}, err } return dags, runs, steps, wsWatermark{Runs: maxRuns, Steps: maxSteps}, nil } func scanRuns(conn *sql.DB, q string, args ...any) ([]wsRun, int64, error) { rows, err := conn.Query(q, args...) if err != nil { return nil, 0, err } defer rows.Close() var out []wsRun var max int64 for rows.Next() { var r wsRun var rowid int64 if err := rows.Scan(&rowid, &r.ID, &r.DagName, &r.DagPath, &r.Status, &r.Trigger, &r.StartedAt, &r.FinishedAt, &r.Error); err != nil { return nil, 0, err } if rowid > max { max = rowid } out = append(out, r) } return out, max, rows.Err() } func scanSteps(conn *sql.DB, q string, args ...any) ([]wsStep, int64, error) { rows, err := conn.Query(q, args...) 
if err != nil { return nil, 0, err } defer rows.Close() var out []wsStep var max int64 for rows.Next() { var s wsStep var rowid int64 if err := rows.Scan(&rowid, &s.ID, &s.RunID, &s.StepName, &s.Status, &s.ExitCode, &s.Stdout, &s.Stderr, &s.StartedAt, &s.FinishedAt, &s.DurationMs, &s.Error); err != nil { return nil, 0, err } if rowid > max { max = rowid } out = append(out, s) } return out, max, rows.Err() } func (h *DagRunHub) broadcast(msg wsDagMessage) { h.mu.Lock() subs := make([]*dagSubscriber, 0, len(h.subscribers)) for s := range h.subscribers { subs = append(subs, s) } h.mu.Unlock() for _, s := range subs { select { case s.out <- msg: default: log.Printf("[dagws] dropping frame for slow subscriber") } } } // handleDagRunsWS upgrade WS y gestiona lifecycle. // Endpoint: GET /api/ws/dagruns func handleDagRunsWS(hub *DagRunHub) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: true, }) if err != nil { log.Printf("[dagws] accept: %v", err) return } defer conn.Close(websocket.StatusInternalError, "closing") ctx, cancel := context.WithCancel(r.Context()) defer cancel() sub := &dagSubscriber{ conn: conn, ctx: ctx, cancel: cancel, out: make(chan wsDagMessage, 64), } hub.register(sub) defer hub.unregister(sub) dags, runs, steps, wm, err := hub.fetchSnapshot() if err != nil { log.Printf("[dagws] snapshot: %v", err) conn.Close(websocket.StatusInternalError, "snapshot failed") return } hub.setWatermark(wm) initial := wsDagMessage{ Type: "snapshot", Watermark: wm, Dags: dags, Runs: runs, Steps: steps, ServerTime: time.Now().Unix(), } if err := wsjson.Write(ctx, conn, initial); err != nil { return } readErr := make(chan error, 1) go func() { for { var cmd wsDagClientCmd if err := wsjson.Read(ctx, conn, &cmd); err != nil { readErr <- err return } if cmd.Watermark.Runs > 0 || cmd.Watermark.Steps > 0 { runs, steps, wm, err := hub.fetchDelta(cmd.Watermark) if err == nil && 
(len(runs) > 0 || len(steps) > 0) { hub.setWatermark(wm) select { case sub.out <- wsDagMessage{ Type: "delta", Watermark: wm, Runs: runs, Steps: steps, ServerTime: time.Now().Unix(), }: default: } } } } }() for { select { case <-ctx.Done(): return case err := <-readErr: if err != nil { return } case msg, ok := <-sub.out: if !ok { return } wctx, wcancel := context.WithTimeout(ctx, dagWSBroadcastTimeout) err := wsjson.Write(wctx, conn, msg) wcancel() if err != nil { return } } } } }