package main // WebSocket event hub for data_factory.db `runs` table (issue 0097). // // Mirrors CallMonitorHub in events.go but watches `runs` of data_factory.db. // Lazy: ticker only runs while >=1 subscriber is connected. // Snapshot at connect = last 50 runs + active nodes + watermark. // Delta every wsTickInterval = rows with rowid > watermark. // // Reuses subscriber/wsMessage/clientCmd from events.go — those types are // generic enough for any append-only event stream. import ( "context" "database/sql" "log" "net/http" "sync" "time" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" ) const dfSnapshotLimit = 50 // dfNode is the JSON shape served by the snapshot's `nodes` field. type dfNode struct { ID string `json:"id"` Kind string `json:"kind"` Name string `json:"name"` FunctionID string `json:"function_id"` Description string `json:"description"` Enabled bool `json:"enabled"` TagsCSV string `json:"tags_csv"` } // dfRun is the JSON contract for snapshot.runs and delta.runs. type dfRun struct { Rowid int64 `json:"rowid"` ID string `json:"id"` NodeID string `json:"node_id"` StartedAt string `json:"started_at"` FinishedAt string `json:"finished_at"` Status string `json:"status"` RowsIn int64 `json:"rows_in"` RowsOut int64 `json:"rows_out"` KbIn int64 `json:"kb_in"` KbOut int64 `json:"kb_out"` DurationMS int64 `json:"duration_ms"` Trigger string `json:"trigger"` Error string `json:"error"` } // dfMessage is what travels on the WS. Distinct from wsMessage to avoid // schema collision with call_monitor's KPIs/rows. type dfMessage struct { Type string `json:"type"` // "snapshot" | "delta" | "ping" Watermark int64 `json:"watermark,omitempty"` Nodes []dfNode `json:"nodes,omitempty"` Runs []dfRun `json:"runs,omitempty"` ServerTime int64 `json:"server_time"` } type dfSubscriber struct { conn *websocket.Conn ctx context.Context cancel context.CancelFunc out chan dfMessage watermark int64 } // DataFactoryHub gestiona broadcast WS sobre `runs` table de data_factory.db. // // Holds a direct *sql.DB pointer (lazy-opened RW by Server). NOT pulled from // DBPool because data_factory.db is opened by datafactory.Open(), not by the // pool's read-only discovery flow. type DataFactoryHub struct { getDB func() (*sql.DB, error) mu sync.Mutex subscribers map[*dfSubscriber]struct{} tickerStop chan struct{} tickerOn bool watermark int64 lastEventAt time.Time } func NewDataFactoryHub(getDB func() (*sql.DB, error)) *DataFactoryHub { return &DataFactoryHub{ getDB: getDB, subscribers: make(map[*dfSubscriber]struct{}), } } func (h *DataFactoryHub) register(s *dfSubscriber) { h.mu.Lock() h.subscribers[s] = struct{}{} shouldStart := !h.tickerOn if shouldStart { h.tickerStop = make(chan struct{}) h.tickerOn = true h.lastEventAt = time.Now() } h.mu.Unlock() if shouldStart { go h.tickerLoop() } } func (h *DataFactoryHub) unregister(s *dfSubscriber) { h.mu.Lock() if _, ok := h.subscribers[s]; !ok { h.mu.Unlock() return } delete(h.subscribers, s) close(s.out) shouldStop := h.tickerOn && len(h.subscribers) == 0 if shouldStop { close(h.tickerStop) h.tickerOn = false } h.mu.Unlock() } func (h *DataFactoryHub) tickerLoop() { interval := wsTickInterval t := time.NewTimer(interval) defer t.Stop() for { select { case <-h.tickerStop: return case <-t.C: rows, max, err := h.fetchSince(h.getWatermark()) if err != nil { log.Printf("[ws-df] fetchSince: %v", err) } else if len(rows) > 0 { h.setWatermark(max) h.recordActivity() h.broadcast(dfMessage{ Type: "delta", Watermark: max, Runs: rows, ServerTime: time.Now().Unix(), }) } if time.Since(h.lastActivityAt()) > wsIdleThreshold { interval = wsTickIntervalIdle } else { interval = wsTickInterval } t.Reset(interval) } } } func (h *DataFactoryHub) getWatermark() int64 { h.mu.Lock() defer h.mu.Unlock() return h.watermark } func (h *DataFactoryHub) setWatermark(v int64) { h.mu.Lock() if v > h.watermark { h.watermark = v } h.mu.Unlock() } func (h *DataFactoryHub) recordActivity() { h.mu.Lock() h.lastEventAt = time.Now() h.mu.Unlock() } func (h *DataFactoryHub) lastActivityAt() time.Time { h.mu.Lock() defer h.mu.Unlock() return h.lastEventAt } func (h *DataFactoryHub) fetchSince(since int64) ([]dfRun, int64, error) { db, err := h.getDB() if err != nil { return nil, since, err } rows, err := db.Query(` SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status, rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error FROM runs WHERE rowid > ? ORDER BY rowid ASC LIMIT 500`, since) if err != nil { return nil, since, err } defer rows.Close() out := make([]dfRun, 0, 16) max := since for rows.Next() { var r dfRun if err := rows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt, &r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut, &r.DurationMS, &r.Trigger, &r.Error); err != nil { return nil, since, err } out = append(out, r) if r.Rowid > max { max = r.Rowid } } return out, max, rows.Err() } func (h *DataFactoryHub) fetchSnapshot() ([]dfNode, []dfRun, int64, error) { db, err := h.getDB() if err != nil { return nil, nil, 0, err } // Active nodes (enabled=1). nodeRows, err := db.Query(` SELECT id, kind, name, function_id, description, enabled, tags_csv FROM nodes WHERE enabled = 1 ORDER BY kind, name`) if err != nil { return nil, nil, 0, err } defer nodeRows.Close() nodes := make([]dfNode, 0, 16) for nodeRows.Next() { var n dfNode var enabled int if err := nodeRows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID, &n.Description, &enabled, &n.TagsCSV); err != nil { return nil, nil, 0, err } n.Enabled = enabled != 0 nodes = append(nodes, n) } if err := nodeRows.Err(); err != nil { return nil, nil, 0, err } // Last 50 runs (newest first). runRows, err := db.Query(` SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status, rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error FROM runs ORDER BY rowid DESC LIMIT ?`, dfSnapshotLimit) if err != nil { return nodes, nil, 0, err } defer runRows.Close() runs := make([]dfRun, 0, dfSnapshotLimit) max := int64(0) for runRows.Next() { var r dfRun if err := runRows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt, &r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut, &r.DurationMS, &r.Trigger, &r.Error); err != nil { return nodes, nil, 0, err } runs = append(runs, r) if r.Rowid > max { max = r.Rowid } } return nodes, runs, max, runRows.Err() } func (h *DataFactoryHub) broadcast(msg dfMessage) { h.mu.Lock() subs := make([]*dfSubscriber, 0, len(h.subscribers)) for s := range h.subscribers { subs = append(subs, s) } h.mu.Unlock() for _, s := range subs { select { case s.out <- msg: default: log.Printf("[ws-df] dropping frame for slow subscriber") } } } // handleDataFactoryEvents upgradea HTTP a WS y mantiene la conexion. // Endpoint: GET /api/ws/datafactory func (s *Server) handleDataFactoryEvents(hub *DataFactoryHub) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: true, }) if err != nil { log.Printf("[ws-df] accept: %v", err) return } defer conn.Close(websocket.StatusInternalError, "closing") ctx, cancel := context.WithCancel(r.Context()) defer cancel() sub := &dfSubscriber{ conn: conn, ctx: ctx, cancel: cancel, out: make(chan dfMessage, 64), } hub.register(sub) defer hub.unregister(sub) nodes, runs, watermark, err := hub.fetchSnapshot() if err != nil { log.Printf("[ws-df] snapshot: %v", err) conn.Close(websocket.StatusInternalError, "snapshot failed") return } hub.setWatermark(watermark) if err := wsjson.Write(ctx, conn, dfMessage{ Type: "snapshot", Watermark: watermark, Nodes: nodes, Runs: runs, ServerTime: time.Now().Unix(), }); err != nil { return } readErr := make(chan error, 1) go func() { for { var cmd clientCmd if err := wsjson.Read(ctx, conn, &cmd); err != nil { readErr <- err return } if cmd.Watermark > 0 { rows, max, err := hub.fetchSince(cmd.Watermark) if err == nil && len(rows) > 0 { hub.setWatermark(max) select { case sub.out <- dfMessage{ Type: "delta", Watermark: max, Runs: rows, ServerTime: time.Now().Unix(), }: default: } } } } }() for { select { case <-ctx.Done(): return case err := <-readErr: if err != nil { return } case msg, ok := <-sub.out: if !ok { return } wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout) err := wsjson.Write(wctx, conn, msg) wcancel() if err != nil { return } } } } }