From 7ecbee1175c6ea47a3788575ad41f6eeb15d140b Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 15 May 2026 16:36:34 +0200 Subject: [PATCH] feat(dag_engine): WS hub /api/ws/dagruns + migracion DAGs desde dagu - events.go: DagRunHub broadcastea snapshot+deltas live (500ms tick, 5s recent finished window) sobre dag_runs + dag_step_results. - api.go: handler GET /api/ws/dagruns upgrade WS, opt-in en RegisterAPI. - store.go: expone Conn() para read-only desde el hub. - main.go: construye DagRunHub al arrancar server. - dags_migrated/: 5 YAMLs migrados desde ~/dagu/dags tras desinstalar dagu (issue 0095 step 1). Smoke: snapshot inicial OK, trigger /api/dags/test_claude_access/run -> delta WS observa 3 step_results + run success en <1s. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/dag_engine/api.go | 7 +- apps/dag_engine/dags_migrated/example.yaml | 23 + .../example_lineage_tracking.yaml | 178 +++++++ apps/dag_engine/dags_migrated/fn_backup.yaml | 21 + .../revision_viernes_finanzas.yaml | 51 ++ .../dags_migrated/test_claude_access.yaml | 23 + apps/dag_engine/events.go | 438 ++++++++++++++++++ apps/dag_engine/go.mod | 15 +- apps/dag_engine/go.sum | 28 +- apps/dag_engine/main.go | 3 +- apps/dag_engine/store/store.go | 6 + 11 files changed, 775 insertions(+), 18 deletions(-) create mode 100644 apps/dag_engine/dags_migrated/example.yaml create mode 100644 apps/dag_engine/dags_migrated/example_lineage_tracking.yaml create mode 100644 apps/dag_engine/dags_migrated/fn_backup.yaml create mode 100644 apps/dag_engine/dags_migrated/revision_viernes_finanzas.yaml create mode 100644 apps/dag_engine/dags_migrated/test_claude_access.yaml create mode 100644 apps/dag_engine/events.go diff --git a/apps/dag_engine/api.go b/apps/dag_engine/api.go index e112f30e..5b8abab2 100644 --- a/apps/dag_engine/api.go +++ b/apps/dag_engine/api.go @@ -6,7 +6,7 @@ import ( ) // RegisterAPI sets up all HTTP routes on the given mux. -func RegisterAPI(mux *http.ServeMux, executor *Executor, scheduler *Scheduler, frontendFS fs.FS) { +func RegisterAPI(mux *http.ServeMux, executor *Executor, scheduler *Scheduler, hub *DagRunHub, frontendFS fs.FS) { // API routes. mux.HandleFunc("GET /api/dags", handleListDags(executor)) mux.HandleFunc("GET /api/dags/{name}", handleGetDag(executor)) @@ -19,6 +19,11 @@ func RegisterAPI(mux *http.ServeMux, executor *Executor, scheduler *Scheduler, f mux.HandleFunc("POST /api/scheduler/stop", handleSchedulerStop(scheduler)) mux.HandleFunc("GET /api/scheduler/status", handleSchedulerStatus(scheduler)) + // Live updates (WS hub). + if hub != nil { + mux.HandleFunc("GET /api/ws/dagruns", handleDagRunsWS(hub)) + } + // Frontend SPA fallback. if frontendFS != nil { mux.Handle("/", spaHandler(frontendFS)) diff --git a/apps/dag_engine/dags_migrated/example.yaml b/apps/dag_engine/dags_migrated/example.yaml new file mode 100644 index 00000000..5d1ea9f9 --- /dev/null +++ b/apps/dag_engine/dags_migrated/example.yaml @@ -0,0 +1,23 @@ +# Example Dagu DAG +# This is a simple example workflow + +name: example +description: Example workflow to demonstrate Dagu capabilities + +schedule: + # Run every day at 9:00 AM + - "0 9 * * *" + +steps: + - name: hello + command: echo "Hello from Dagu!" + + - name: list_files + command: ls -la /home/lucas/dagu/scripts + depends: + - hello + + - name: date + command: date + depends: + - hello diff --git a/apps/dag_engine/dags_migrated/example_lineage_tracking.yaml b/apps/dag_engine/dags_migrated/example_lineage_tracking.yaml new file mode 100644 index 00000000..6e1934aa --- /dev/null +++ b/apps/dag_engine/dags_migrated/example_lineage_tracking.yaml @@ -0,0 +1,178 @@ +name: example_lineage_tracking +description: | + Ejemplo completo de pipeline con lineage tracking usando marquez-cli. + + Este DAG demuestra: + - Generación de Run ID único + - Eventos START, RUNNING, COMPLETE + - Tracking de inputs/outputs en cada paso + - Manejo de errores con evento FAIL + +tags: + - example + - lineage + - marquez + +schedule: + - "0 */6 * * *" # Cada 6 horas + +env: + - MARQUEZ_URL: http://localhost:5000 + - MARQUEZ_NAMESPACE: automatic-process + - JOB_NAME: example_lineage_tracking + - RUN_ID: "" + +steps: + # PASO 0: Generar Run ID único para todo el pipeline + - name: init_run_id + description: Generate unique Run ID for this execution + command: | + RUN_ID=$(uuidgen) + echo "RUN_ID=$RUN_ID" >> $DAGU_ENV + echo "Generated Run ID: $RUN_ID" + + # PASO 1: START event + - name: start_run + description: Send START event to Marquez + command: | + marquez-cli run start \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -namespace $MARQUEZ_NAMESPACE \ + -inputs "api://jsonplaceholder.typicode.com/users" + + echo "✓ Run started with ID: $RUN_ID" + depends: + - init_run_id + + # PASO 2: Extract - Fetch data from API + - name: extract_data + description: Fetch data from external API + command: | + echo "Fetching data from API..." + curl -s https://jsonplaceholder.typicode.com/users > /tmp/lineage_users.json + + marquez-cli run running \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -namespace $MARQUEZ_NAMESPACE \ + -inputs "api://jsonplaceholder.typicode.com/users" \ + -outputs "file:///tmp/lineage_users.json" + + echo "✓ Data extracted: $(cat /tmp/lineage_users.json | jq '. | length') records" + depends: + - start_run + + # PASO 3: Transform - Clean and transform data + - name: transform_data + description: Transform and clean the data + command: | + echo "Transforming data..." + jq '[.[] | {email: .email, name: .name, company: .company.name}]' \ + /tmp/lineage_users.json > /tmp/lineage_users_clean.json + + marquez-cli run running \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -namespace $MARQUEZ_NAMESPACE \ + -inputs "file:///tmp/lineage_users.json" \ + -outputs "file:///tmp/lineage_users_clean.json" + + echo "✓ Data transformed: $(cat /tmp/lineage_users_clean.json | jq '. | length') records" + depends: + - extract_data + + # PASO 4: Load - Save to PostgreSQL + - name: load_data + description: Load data to PostgreSQL + command: | + echo "Loading data to PostgreSQL..." + + # Crear tabla si no existe + psql -h localhost -p 5434 -U postgres -d postgres -c " + CREATE TABLE IF NOT EXISTS lineage_example ( + email TEXT, + name TEXT, + company TEXT, + loaded_at TIMESTAMP DEFAULT NOW() + ); + " + + # Truncar tabla + psql -h localhost -p 5434 -U postgres -d postgres -c "TRUNCATE TABLE lineage_example;" + + # Cargar datos + jq -r '.[] | [.email, .name, .company] | @csv' /tmp/lineage_users_clean.json | \ + psql -h localhost -p 5434 -U postgres -d postgres -c " + COPY lineage_example (email, name, company) FROM STDIN WITH CSV; + " + + RECORD_COUNT=$(psql -h localhost -p 5434 -U postgres -d postgres -t -c "SELECT COUNT(*) FROM lineage_example;") + + marquez-cli run running \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -namespace $MARQUEZ_NAMESPACE \ + -inputs "file:///tmp/lineage_users_clean.json" \ + -outputs "postgres://localhost:5434/postgres/public/lineage_example" + + echo "✓ Data loaded: $(echo $RECORD_COUNT | xargs) records" + depends: + - transform_data + + # PASO 5: COMPLETE event + - name: complete_run + description: Mark run as completed in Marquez + command: | + marquez-cli run complete \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -namespace $MARQUEZ_NAMESPACE \ + -inputs "api://jsonplaceholder.typicode.com/users" \ + -outputs "postgres://localhost:5434/postgres/public/lineage_example" + + echo "✓ Run completed successfully: $RUN_ID" + echo "" + echo "Verify lineage at: http://localhost:3001" + echo "Or run: marquez-cli lineage -name 'postgres://localhost:5434/postgres/public/lineage_example'" + depends: + - load_data + + # PASO 6: Cleanup temporary files + - name: cleanup + description: Remove temporary files + command: | + rm -f /tmp/lineage_users.json /tmp/lineage_users_clean.json + echo "✓ Temporary files cleaned" + depends: + - complete_run + +# Handler para errores +handlers: + failure: + - name: mark_as_failed + command: | + echo "❌ Pipeline failed, marking run as FAILED in Marquez" + + if [ -n "$RUN_ID" ]; then + marquez-cli run fail \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -namespace $MARQUEZ_NAMESPACE + + echo "✓ Run marked as FAILED: $RUN_ID" + else + echo "⚠ No RUN_ID found, skipping FAIL event" + fi + + success: + - name: notify_success + command: | + echo "🎉 Pipeline completed successfully!" + echo "Run ID: $RUN_ID" + echo "View lineage: http://localhost:3001" + +# Configuración de logs +logCleanup: + enabled: true + retentionDays: 7 diff --git a/apps/dag_engine/dags_migrated/fn_backup.yaml b/apps/dag_engine/dags_migrated/fn_backup.yaml new file mode 100644 index 00000000..69ebac8d --- /dev/null +++ b/apps/dag_engine/dags_migrated/fn_backup.yaml @@ -0,0 +1,21 @@ +name: fn_backup +description: Backup diario de fn_registry (registry.db + operations.db + vaults) + +schedule: + - "0 3 * * *" + +env: + - FN_REGISTRY_ROOT: /home/lucas/fn_registry + - BACKUP_ROOT: /home/lucas/backups/fn_registry + +steps: + - name: ensure_dirs + command: mkdir -p ${BACKUP_ROOT} + + - name: run_backup_all + command: bash /home/lucas/fn_registry/bash/functions/pipelines/backup_all.sh ${BACKUP_ROOT} + continue_on: + exit_code: [4] + + - name: report_status + command: bash -c 'ls -lh ${BACKUP_ROOT}/registry/daily.0 ${BACKUP_ROOT}/operations/*/daily.0 2>/dev/null | tail -20' diff --git a/apps/dag_engine/dags_migrated/revision_viernes_finanzas.yaml b/apps/dag_engine/dags_migrated/revision_viernes_finanzas.yaml new file mode 100644 index 00000000..6106e8d4 --- /dev/null +++ b/apps/dag_engine/dags_migrated/revision_viernes_finanzas.yaml @@ -0,0 +1,51 @@ +name: revision-viernes-finanzas +description: Revisión semanal de finanzas personales - ingesta, informe y push a Gitea +tags: [finanzas, semanal] +type: graph + +schedule: "0 9 * * 5" + +env: + - PROJECT_DIR: /home/lucas/analysis/finanzas_personales + - PYTHON: /home/lucas/analysis/finanzas_personales/.venv/bin/python + +handler_on: + failure: + command: echo "[$(date)] FALLÓ revision-viernes-finanzas" >> /home/lucas/dagu/logs/failures.log + +steps: + - id: ingest + description: Procesar archivos nuevos del inbox (BBVA xlsx + Revolut csv) + working_dir: ${PROJECT_DIR} + command: ./bin/ingest -skip-notebooks + continue_on: + failure: true + + - id: informe + description: Generar informe semanal de cumplimiento del presupuesto + command: ${PYTHON} /home/lucas/dagu/scripts/informe_finanzas.py + depends: [ingest] + + - id: git_push + description: Commit y push del informe a Gitea + working_dir: ${PROJECT_DIR} + script: | + #!/bin/bash + set -euo pipefail + + if git diff --quiet data/04_output/informe_semanal.md 2>/dev/null && \ + ! git ls-files --others --exclude-standard | grep -q informe_semanal.md; then + echo "Sin cambios en el informe, skip push" + exit 0 + fi + + git add data/04_output/informe_semanal.md + git add data/03_processed/ 2>/dev/null || true + + git commit -m "Informe semanal $(date +%Y-%m-%d) + + Co-Authored-By: Dagu Automation " + + git push origin master:main + echo "Push completado" + depends: [informe] diff --git a/apps/dag_engine/dags_migrated/test_claude_access.yaml b/apps/dag_engine/dags_migrated/test_claude_access.yaml new file mode 100644 index 00000000..875d6e96 --- /dev/null +++ b/apps/dag_engine/dags_migrated/test_claude_access.yaml @@ -0,0 +1,23 @@ +name: test_claude_access +description: Test workflow created by Claude to verify access + +tags: + - test + - claude + +steps: + - name: verify_access + command: echo "✓ Claude tiene acceso completo para gestionar tus pipelines de Dagu!" + + - name: show_info + command: | + echo "Usuario: $(whoami)" + echo "Fecha: $(date)" + echo "Directorio: $(pwd)" + depends: + - verify_access + + - name: cleanup + command: echo "Pipeline de prueba completado exitosamente" + depends: + - show_info diff --git a/apps/dag_engine/events.go b/apps/dag_engine/events.go new file mode 100644 index 00000000..d19f1a17 --- /dev/null +++ b/apps/dag_engine/events.go @@ -0,0 +1,438 @@ +package main + +// WebSocket hub para live updates de dag_runs + dag_step_results. +// Patron: sqlite_api/events.go (CallMonitorHub) — issue 0095. +// +// Diseño: +// - Hub global con N subscribers WS. +// - Ticker arranca solo con >=1 subscriber. Cero overhead si nadie mira. +// - Cada tick (500ms): query rowid>watermark + activos (status running/pending) +// + recientes finished (ultimos 5s) -> broadcast upsert. +// - Snapshot inicial: lista de DAGs + ultimos 50 runs + step_results. +// - El cliente trata `runs` y `steps` como upserts por id. + +import ( + "context" + "database/sql" + "log" + "net/http" + "sync" + "time" + + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" + + "dag-engine/store" +) + +const ( + dagWSTickInterval = 500 * time.Millisecond + dagWSTickIntervalIdle = 2 * time.Second + dagWSIdleThreshold = 30 * time.Second + dagWSSnapshotRuns = 50 + dagWSBroadcastTimeout = 2 * time.Second + dagWSRecentFinishedS = 5 +) + +type wsRun struct { + ID string `json:"id"` + DagName string `json:"dag_name"` + DagPath string `json:"dag_path"` + Status string `json:"status"` + Trigger string `json:"trigger"` + StartedAt string `json:"started_at"` + FinishedAt string `json:"finished_at,omitempty"` + Error string `json:"error,omitempty"` +} + +type wsStep struct { + ID string `json:"id"` + RunID string `json:"run_id"` + StepName string `json:"step_name"` + Status string `json:"status"` + ExitCode int `json:"exit_code"` + Stdout string `json:"stdout,omitempty"` + Stderr string `json:"stderr,omitempty"` + StartedAt string `json:"started_at,omitempty"` + FinishedAt string `json:"finished_at,omitempty"` + DurationMs int64 `json:"duration_ms"` + Error string `json:"error,omitempty"` +} + +type wsWatermark struct { + Runs int64 `json:"runs"` + Steps int64 `json:"steps"` +} + +type wsDagMessage struct { + Type string `json:"type"` // snapshot|delta|ping + Watermark wsWatermark `json:"watermark"` + Dags []DagInfo `json:"dags,omitempty"` + Runs []wsRun `json:"runs,omitempty"` + Steps []wsStep `json:"steps,omitempty"` + ServerTime int64 `json:"server_time"` +} + +type wsDagClientCmd struct { + Watermark wsWatermark `json:"watermark,omitempty"` +} + +type dagSubscriber struct { + conn *websocket.Conn + ctx context.Context + cancel context.CancelFunc + out chan wsDagMessage + watermark wsWatermark +} + +// DagRunHub broadcastea cambios de dag_runs + dag_step_results a clientes WS. +type DagRunHub struct { + db *store.DB + executor *Executor + + mu sync.Mutex + subscribers map[*dagSubscriber]struct{} + tickerStop chan struct{} + tickerOn bool + watermark wsWatermark + lastEventAt time.Time +} + +func NewDagRunHub(db *store.DB, executor *Executor) *DagRunHub { + return &DagRunHub{ + db: db, + executor: executor, + subscribers: make(map[*dagSubscriber]struct{}), + } +} + +func (h *DagRunHub) register(s *dagSubscriber) { + h.mu.Lock() + h.subscribers[s] = struct{}{} + shouldStart := !h.tickerOn + if shouldStart { + h.tickerStop = make(chan struct{}) + h.tickerOn = true + h.lastEventAt = time.Now() + } + h.mu.Unlock() + + if shouldStart { + go h.tickerLoop() + } +} + +func (h *DagRunHub) unregister(s *dagSubscriber) { + h.mu.Lock() + if _, ok := h.subscribers[s]; !ok { + h.mu.Unlock() + return + } + delete(h.subscribers, s) + close(s.out) + shouldStop := h.tickerOn && len(h.subscribers) == 0 + if shouldStop { + close(h.tickerStop) + h.tickerOn = false + } + h.mu.Unlock() +} + +func (h *DagRunHub) tickerLoop() { + interval := dagWSTickInterval + t := time.NewTimer(interval) + defer t.Stop() + for { + select { + case <-h.tickerStop: + return + case <-t.C: + runs, steps, wm, err := h.fetchDelta(h.getWatermark()) + if err != nil { + log.Printf("[dagws] fetchDelta: %v", err) + } else if len(runs) > 0 || len(steps) > 0 { + h.setWatermark(wm) + h.recordActivity() + h.broadcast(wsDagMessage{ + Type: "delta", + Watermark: wm, + Runs: runs, + Steps: steps, + ServerTime: time.Now().Unix(), + }) + } + + if time.Since(h.lastActivityAt()) > dagWSIdleThreshold { + interval = dagWSTickIntervalIdle + } else { + interval = dagWSTickInterval + } + t.Reset(interval) + } + } +} + +func (h *DagRunHub) getWatermark() wsWatermark { + h.mu.Lock() + defer h.mu.Unlock() + return h.watermark +} + +func (h *DagRunHub) setWatermark(v wsWatermark) { + h.mu.Lock() + if v.Runs > h.watermark.Runs { + h.watermark.Runs = v.Runs + } + if v.Steps > h.watermark.Steps { + h.watermark.Steps = v.Steps + } + h.mu.Unlock() +} + +func (h *DagRunHub) recordActivity() { + h.mu.Lock() + h.lastEventAt = time.Now() + h.mu.Unlock() +} + +func (h *DagRunHub) lastActivityAt() time.Time { + h.mu.Lock() + defer h.mu.Unlock() + return h.lastEventAt +} + +// fetchDelta devuelve runs/steps con (rowid > watermark) OR (status in-flight) +// OR (recently finished). Watermark devuelto = max rowid visto. +func (h *DagRunHub) fetchDelta(since wsWatermark) ([]wsRun, []wsStep, wsWatermark, error) { + conn := h.db.Conn() + if conn == nil { + return nil, nil, since, nil + } + cutoff := time.Now().Add(-time.Duration(dagWSRecentFinishedS) * time.Second).Format(time.RFC3339) + + runs, maxRuns, err := scanRuns(conn, ` + SELECT rowid, id, dag_name, dag_path, status, trigger, started_at, + COALESCE(finished_at,''), error + FROM dag_runs + WHERE rowid > ? + OR status IN ('running','pending') + OR (finished_at IS NOT NULL AND finished_at >= ?) + ORDER BY rowid ASC`, since.Runs, cutoff) + if err != nil { + return nil, nil, since, err + } + + steps, maxSteps, err := scanSteps(conn, ` + SELECT rowid, id, run_id, step_name, status, exit_code, stdout, stderr, + COALESCE(started_at,''), COALESCE(finished_at,''), duration_ms, error + FROM dag_step_results + WHERE rowid > ? + OR status IN ('running','pending') + OR (finished_at IS NOT NULL AND finished_at >= ?) + ORDER BY rowid ASC`, since.Steps, cutoff) + if err != nil { + return runs, nil, since, err + } + + out := wsWatermark{Runs: maxRuns, Steps: maxSteps} + if out.Runs < since.Runs { + out.Runs = since.Runs + } + if out.Steps < since.Steps { + out.Steps = since.Steps + } + return runs, steps, out, nil +} + +// fetchSnapshot devuelve DAGs + ultimos N runs + sus step_results + watermark. +func (h *DagRunHub) fetchSnapshot() ([]DagInfo, []wsRun, []wsStep, wsWatermark, error) { + dags, err := h.executor.ListDAGs() + if err != nil { + log.Printf("[dagws] list dags: %v", err) + dags = nil + } + conn := h.db.Conn() + if conn == nil { + return dags, nil, nil, wsWatermark{}, nil + } + + runs, maxRuns, err := scanRuns(conn, ` + SELECT rowid, id, dag_name, dag_path, status, trigger, started_at, + COALESCE(finished_at,''), error + FROM dag_runs + ORDER BY started_at DESC + LIMIT ?`, dagWSSnapshotRuns) + if err != nil { + return dags, nil, nil, wsWatermark{}, err + } + + steps, maxSteps, err := scanSteps(conn, ` + SELECT rowid, id, run_id, step_name, status, exit_code, stdout, stderr, + COALESCE(started_at,''), COALESCE(finished_at,''), duration_ms, error + FROM dag_step_results + WHERE run_id IN (SELECT id FROM dag_runs ORDER BY started_at DESC LIMIT ?) + ORDER BY rowid ASC`, dagWSSnapshotRuns) + if err != nil { + return dags, runs, nil, wsWatermark{Runs: maxRuns}, err + } + + return dags, runs, steps, wsWatermark{Runs: maxRuns, Steps: maxSteps}, nil +} + +func scanRuns(conn *sql.DB, q string, args ...any) ([]wsRun, int64, error) { + rows, err := conn.Query(q, args...) + if err != nil { + return nil, 0, err + } + defer rows.Close() + var out []wsRun + var max int64 + for rows.Next() { + var r wsRun + var rowid int64 + if err := rows.Scan(&rowid, &r.ID, &r.DagName, &r.DagPath, &r.Status, + &r.Trigger, &r.StartedAt, &r.FinishedAt, &r.Error); err != nil { + return nil, 0, err + } + if rowid > max { + max = rowid + } + out = append(out, r) + } + return out, max, rows.Err() +} + +func scanSteps(conn *sql.DB, q string, args ...any) ([]wsStep, int64, error) { + rows, err := conn.Query(q, args...) + if err != nil { + return nil, 0, err + } + defer rows.Close() + var out []wsStep + var max int64 + for rows.Next() { + var s wsStep + var rowid int64 + if err := rows.Scan(&rowid, &s.ID, &s.RunID, &s.StepName, &s.Status, + &s.ExitCode, &s.Stdout, &s.Stderr, &s.StartedAt, &s.FinishedAt, + &s.DurationMs, &s.Error); err != nil { + return nil, 0, err + } + if rowid > max { + max = rowid + } + out = append(out, s) + } + return out, max, rows.Err() +} + +func (h *DagRunHub) broadcast(msg wsDagMessage) { + h.mu.Lock() + subs := make([]*dagSubscriber, 0, len(h.subscribers)) + for s := range h.subscribers { + subs = append(subs, s) + } + h.mu.Unlock() + + for _, s := range subs { + select { + case s.out <- msg: + default: + log.Printf("[dagws] dropping frame for slow subscriber") + } + } +} + +// handleDagRunsWS upgrade WS y gestiona lifecycle. +// Endpoint: GET /api/ws/dagruns +func handleDagRunsWS(hub *DagRunHub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + InsecureSkipVerify: true, + }) + if err != nil { + log.Printf("[dagws] accept: %v", err) + return + } + defer conn.Close(websocket.StatusInternalError, "closing") + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + sub := &dagSubscriber{ + conn: conn, + ctx: ctx, + cancel: cancel, + out: make(chan wsDagMessage, 64), + } + hub.register(sub) + defer hub.unregister(sub) + + dags, runs, steps, wm, err := hub.fetchSnapshot() + if err != nil { + log.Printf("[dagws] snapshot: %v", err) + conn.Close(websocket.StatusInternalError, "snapshot failed") + return + } + hub.setWatermark(wm) + initial := wsDagMessage{ + Type: "snapshot", + Watermark: wm, + Dags: dags, + Runs: runs, + Steps: steps, + ServerTime: time.Now().Unix(), + } + if err := wsjson.Write(ctx, conn, initial); err != nil { + return + } + + readErr := make(chan error, 1) + go func() { + for { + var cmd wsDagClientCmd + if err := wsjson.Read(ctx, conn, &cmd); err != nil { + readErr <- err + return + } + if cmd.Watermark.Runs > 0 || cmd.Watermark.Steps > 0 { + runs, steps, wm, err := hub.fetchDelta(cmd.Watermark) + if err == nil && (len(runs) > 0 || len(steps) > 0) { + hub.setWatermark(wm) + select { + case sub.out <- wsDagMessage{ + Type: "delta", + Watermark: wm, + Runs: runs, + Steps: steps, + ServerTime: time.Now().Unix(), + }: + default: + } + } + } + } + }() + + for { + select { + case <-ctx.Done(): + return + case err := <-readErr: + if err != nil { + return + } + case msg, ok := <-sub.out: + if !ok { + return + } + wctx, wcancel := context.WithTimeout(ctx, dagWSBroadcastTimeout) + err := wsjson.Write(wctx, conn, msg) + wcancel() + if err != nil { + return + } + } + } + } +} diff --git a/apps/dag_engine/go.mod b/apps/dag_engine/go.mod index 5eeabd69..a5d64fe8 100644 --- a/apps/dag_engine/go.mod +++ b/apps/dag_engine/go.mod @@ -5,6 +5,7 @@ go 1.25.0 require ( fn-registry v0.0.0-00010101000000-000000000000 github.com/mattn/go-sqlite3 v1.14.37 + nhooyr.io/websocket v1.8.17 ) require ( @@ -28,19 +29,21 @@ require ( github.com/marcboeker/go-duckdb v1.8.5 // indirect github.com/paulmach/orb v0.12.0 // indirect github.com/pierrec/lz4/v4 v4.1.25 // indirect - github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/segmentio/asm v1.2.1 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/otel v1.41.0 // indirect go.opentelemetry.io/otel/trace v1.41.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/crypto v0.50.0 // indirect golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect - golang.org/x/mod v0.27.0 // indirect - golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.41.0 // indirect - golang.org/x/text v0.29.0 // indirect - golang.org/x/tools v0.36.0 // indirect + golang.org/x/mod v0.34.0 // indirect + golang.org/x/net v0.53.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/sys v0.43.0 // indirect + golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect + golang.org/x/text v0.36.0 // indirect + golang.org/x/tools v0.43.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/apps/dag_engine/go.sum b/apps/dag_engine/go.sum index d8b66118..6cc78806 100644 --- a/apps/dag_engine/go.sum +++ b/apps/dag_engine/go.sum @@ -111,44 +111,50 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ= -golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= +golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= +golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= -golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c h1:6a8FdnNk6bTXBjR4AGKFgUKuo+7GnR3FX5L7CbveeZc= +golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c/go.mod h1:TpUTTEp9frx7rTdLpC9gFG9kdI7zVLFTFFlqaH2Cncw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= -golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= -golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= +golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= +golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -166,3 +172,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= +nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/apps/dag_engine/main.go b/apps/dag_engine/main.go index a3498001..a544c198 100644 --- a/apps/dag_engine/main.go +++ b/apps/dag_engine/main.go @@ -286,6 +286,7 @@ func cmdServer(args []string) { executor := NewExecutor(db, cfg.DagsDir) scheduler := NewScheduler(executor, cfg.DagsDir) + dagRunHub := NewDagRunHub(db, executor) // Prepare frontend FS. var feFS iofs.FS @@ -303,7 +304,7 @@ func cmdServer(args []string) { } mux := http.NewServeMux() - RegisterAPI(mux, executor, scheduler, feFS) + RegisterAPI(mux, executor, scheduler, dagRunHub, feFS) handler := corsMiddleware(loggingMiddleware(mux)) diff --git a/apps/dag_engine/store/store.go b/apps/dag_engine/store/store.go index 9f049062..f2b605bb 100644 --- a/apps/dag_engine/store/store.go +++ b/apps/dag_engine/store/store.go @@ -36,6 +36,12 @@ func (db *DB) Close() error { return db.conn.Close() } +// Conn exposes the underlying *sql.DB for read-only queries from other +// packages (e.g. WS hub in events.go). Do not Close() the returned conn. +func (db *DB) Conn() *sql.DB { + return db.conn +} + // --- DagRun CRUD --- // DagRun mirrors infra.DagRun for the store layer.