11c986edc7
- handlers.go - handlers_test.go - main.go - datafactory_events.go - handlers_datafactory.go Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
388 lines
9.0 KiB
Go
388 lines
9.0 KiB
Go
package main
|
|
|
|
// WebSocket event hub for data_factory.db `runs` table (issue 0097).
|
|
//
|
|
// Mirrors CallMonitorHub in events.go but watches `runs` of data_factory.db.
|
|
// Lazy: ticker only runs while >=1 subscriber is connected.
|
|
// Snapshot at connect = last 50 runs + active nodes + watermark.
|
|
// Delta every wsTickInterval = rows with rowid > watermark.
|
|
//
|
|
// Reuses subscriber/wsMessage/clientCmd from events.go — those types are
|
|
// generic enough for any append-only event stream.
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"nhooyr.io/websocket"
|
|
"nhooyr.io/websocket/wsjson"
|
|
)
|
|
|
|
// dfSnapshotLimit is how many of the most recent runs a newly connected
// client receives in its initial "snapshot" message.
const (
	dfSnapshotLimit = 50
)
|
|
|
|
// dfNode is one entry of a snapshot's `nodes` array: a row of the
// data_factory.db `nodes` table as sent to WebSocket clients.
type dfNode struct {
	// Identity and display fields.
	ID   string `json:"id"`
	Kind string `json:"kind"`
	Name string `json:"name"`

	// Descriptive metadata.
	FunctionID  string `json:"function_id"`
	Description string `json:"description"`

	// Enabled mirrors the integer `enabled` column as a bool; TagsCSV is the
	// raw comma-separated tag string.
	Enabled bool   `json:"enabled"`
	TagsCSV string `json:"tags_csv"`
}
|
|
|
|
// dfRun is the wire representation of one row of the `runs` table, used in
// both snapshot.runs and delta.runs.
type dfRun struct {
	// Rowid is the SQLite rowid and doubles as the delta cursor.
	Rowid  int64  `json:"rowid"`
	ID     string `json:"id"`
	NodeID string `json:"node_id"`

	// Timestamps as stored in the table, plus the run status.
	StartedAt  string `json:"started_at"`
	FinishedAt string `json:"finished_at"`
	Status     string `json:"status"`

	// Volume counters.
	RowsIn  int64 `json:"rows_in"`
	RowsOut int64 `json:"rows_out"`
	KbIn    int64 `json:"kb_in"`
	KbOut   int64 `json:"kb_out"`

	// Duration in milliseconds, what started the run, and its error text.
	DurationMS int64  `json:"duration_ms"`
	Trigger    string `json:"trigger"`
	Error      string `json:"error"`
}
|
|
|
|
// dfMessage is what travels on the WS. Distinct from wsMessage to avoid
|
|
// schema collision with call_monitor's KPIs/rows.
|
|
type dfMessage struct {
|
|
Type string `json:"type"` // "snapshot" | "delta" | "ping"
|
|
Watermark int64 `json:"watermark,omitempty"`
|
|
Nodes []dfNode `json:"nodes,omitempty"`
|
|
Runs []dfRun `json:"runs,omitempty"`
|
|
ServerTime int64 `json:"server_time"`
|
|
}
|
|
|
|
type dfSubscriber struct {
|
|
conn *websocket.Conn
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
out chan dfMessage
|
|
watermark int64
|
|
}
|
|
|
|
// DataFactoryHub gestiona broadcast WS sobre `runs` table de data_factory.db.
|
|
//
|
|
// Holds a direct *sql.DB pointer (lazy-opened RW by Server). NOT pulled from
|
|
// DBPool because data_factory.db is opened by datafactory.Open(), not by the
|
|
// pool's read-only discovery flow.
|
|
type DataFactoryHub struct {
|
|
getDB func() (*sql.DB, error)
|
|
|
|
mu sync.Mutex
|
|
subscribers map[*dfSubscriber]struct{}
|
|
tickerStop chan struct{}
|
|
tickerOn bool
|
|
watermark int64
|
|
lastEventAt time.Time
|
|
}
|
|
|
|
func NewDataFactoryHub(getDB func() (*sql.DB, error)) *DataFactoryHub {
|
|
return &DataFactoryHub{
|
|
getDB: getDB,
|
|
subscribers: make(map[*dfSubscriber]struct{}),
|
|
}
|
|
}
|
|
|
|
func (h *DataFactoryHub) register(s *dfSubscriber) {
|
|
h.mu.Lock()
|
|
h.subscribers[s] = struct{}{}
|
|
shouldStart := !h.tickerOn
|
|
if shouldStart {
|
|
h.tickerStop = make(chan struct{})
|
|
h.tickerOn = true
|
|
h.lastEventAt = time.Now()
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
if shouldStart {
|
|
go h.tickerLoop()
|
|
}
|
|
}
|
|
|
|
func (h *DataFactoryHub) unregister(s *dfSubscriber) {
|
|
h.mu.Lock()
|
|
if _, ok := h.subscribers[s]; !ok {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
delete(h.subscribers, s)
|
|
close(s.out)
|
|
shouldStop := h.tickerOn && len(h.subscribers) == 0
|
|
if shouldStop {
|
|
close(h.tickerStop)
|
|
h.tickerOn = false
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *DataFactoryHub) tickerLoop() {
|
|
interval := wsTickInterval
|
|
t := time.NewTimer(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-h.tickerStop:
|
|
return
|
|
case <-t.C:
|
|
rows, max, err := h.fetchSince(h.getWatermark())
|
|
if err != nil {
|
|
log.Printf("[ws-df] fetchSince: %v", err)
|
|
} else if len(rows) > 0 {
|
|
h.setWatermark(max)
|
|
h.recordActivity()
|
|
h.broadcast(dfMessage{
|
|
Type: "delta",
|
|
Watermark: max,
|
|
Runs: rows,
|
|
ServerTime: time.Now().Unix(),
|
|
})
|
|
}
|
|
if time.Since(h.lastActivityAt()) > wsIdleThreshold {
|
|
interval = wsTickIntervalIdle
|
|
} else {
|
|
interval = wsTickInterval
|
|
}
|
|
t.Reset(interval)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *DataFactoryHub) getWatermark() int64 {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.watermark
|
|
}
|
|
|
|
func (h *DataFactoryHub) setWatermark(v int64) {
|
|
h.mu.Lock()
|
|
if v > h.watermark {
|
|
h.watermark = v
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *DataFactoryHub) recordActivity() {
|
|
h.mu.Lock()
|
|
h.lastEventAt = time.Now()
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *DataFactoryHub) lastActivityAt() time.Time {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.lastEventAt
|
|
}
|
|
|
|
func (h *DataFactoryHub) fetchSince(since int64) ([]dfRun, int64, error) {
|
|
db, err := h.getDB()
|
|
if err != nil {
|
|
return nil, since, err
|
|
}
|
|
rows, err := db.Query(`
|
|
SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status,
|
|
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error
|
|
FROM runs
|
|
WHERE rowid > ?
|
|
ORDER BY rowid ASC
|
|
LIMIT 500`, since)
|
|
if err != nil {
|
|
return nil, since, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
out := make([]dfRun, 0, 16)
|
|
max := since
|
|
for rows.Next() {
|
|
var r dfRun
|
|
if err := rows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt,
|
|
&r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut,
|
|
&r.DurationMS, &r.Trigger, &r.Error); err != nil {
|
|
return nil, since, err
|
|
}
|
|
out = append(out, r)
|
|
if r.Rowid > max {
|
|
max = r.Rowid
|
|
}
|
|
}
|
|
return out, max, rows.Err()
|
|
}
|
|
|
|
func (h *DataFactoryHub) fetchSnapshot() ([]dfNode, []dfRun, int64, error) {
|
|
db, err := h.getDB()
|
|
if err != nil {
|
|
return nil, nil, 0, err
|
|
}
|
|
|
|
// Active nodes (enabled=1).
|
|
nodeRows, err := db.Query(`
|
|
SELECT id, kind, name, function_id, description, enabled, tags_csv
|
|
FROM nodes
|
|
WHERE enabled = 1
|
|
ORDER BY kind, name`)
|
|
if err != nil {
|
|
return nil, nil, 0, err
|
|
}
|
|
defer nodeRows.Close()
|
|
|
|
nodes := make([]dfNode, 0, 16)
|
|
for nodeRows.Next() {
|
|
var n dfNode
|
|
var enabled int
|
|
if err := nodeRows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID,
|
|
&n.Description, &enabled, &n.TagsCSV); err != nil {
|
|
return nil, nil, 0, err
|
|
}
|
|
n.Enabled = enabled != 0
|
|
nodes = append(nodes, n)
|
|
}
|
|
if err := nodeRows.Err(); err != nil {
|
|
return nil, nil, 0, err
|
|
}
|
|
|
|
// Last 50 runs (newest first).
|
|
runRows, err := db.Query(`
|
|
SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status,
|
|
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error
|
|
FROM runs
|
|
ORDER BY rowid DESC
|
|
LIMIT ?`, dfSnapshotLimit)
|
|
if err != nil {
|
|
return nodes, nil, 0, err
|
|
}
|
|
defer runRows.Close()
|
|
|
|
runs := make([]dfRun, 0, dfSnapshotLimit)
|
|
max := int64(0)
|
|
for runRows.Next() {
|
|
var r dfRun
|
|
if err := runRows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt,
|
|
&r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut,
|
|
&r.DurationMS, &r.Trigger, &r.Error); err != nil {
|
|
return nodes, nil, 0, err
|
|
}
|
|
runs = append(runs, r)
|
|
if r.Rowid > max {
|
|
max = r.Rowid
|
|
}
|
|
}
|
|
return nodes, runs, max, runRows.Err()
|
|
}
|
|
|
|
func (h *DataFactoryHub) broadcast(msg dfMessage) {
|
|
h.mu.Lock()
|
|
subs := make([]*dfSubscriber, 0, len(h.subscribers))
|
|
for s := range h.subscribers {
|
|
subs = append(subs, s)
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
for _, s := range subs {
|
|
select {
|
|
case s.out <- msg:
|
|
default:
|
|
log.Printf("[ws-df] dropping frame for slow subscriber")
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleDataFactoryEvents upgradea HTTP a WS y mantiene la conexion.
|
|
// Endpoint: GET /api/ws/datafactory
|
|
func (s *Server) handleDataFactoryEvents(hub *DataFactoryHub) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
|
InsecureSkipVerify: true,
|
|
})
|
|
if err != nil {
|
|
log.Printf("[ws-df] accept: %v", err)
|
|
return
|
|
}
|
|
defer conn.Close(websocket.StatusInternalError, "closing")
|
|
|
|
ctx, cancel := context.WithCancel(r.Context())
|
|
defer cancel()
|
|
|
|
sub := &dfSubscriber{
|
|
conn: conn,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
out: make(chan dfMessage, 64),
|
|
}
|
|
hub.register(sub)
|
|
defer hub.unregister(sub)
|
|
|
|
nodes, runs, watermark, err := hub.fetchSnapshot()
|
|
if err != nil {
|
|
log.Printf("[ws-df] snapshot: %v", err)
|
|
conn.Close(websocket.StatusInternalError, "snapshot failed")
|
|
return
|
|
}
|
|
hub.setWatermark(watermark)
|
|
if err := wsjson.Write(ctx, conn, dfMessage{
|
|
Type: "snapshot",
|
|
Watermark: watermark,
|
|
Nodes: nodes,
|
|
Runs: runs,
|
|
ServerTime: time.Now().Unix(),
|
|
}); err != nil {
|
|
return
|
|
}
|
|
|
|
readErr := make(chan error, 1)
|
|
go func() {
|
|
for {
|
|
var cmd clientCmd
|
|
if err := wsjson.Read(ctx, conn, &cmd); err != nil {
|
|
readErr <- err
|
|
return
|
|
}
|
|
if cmd.Watermark > 0 {
|
|
rows, max, err := hub.fetchSince(cmd.Watermark)
|
|
if err == nil && len(rows) > 0 {
|
|
hub.setWatermark(max)
|
|
select {
|
|
case sub.out <- dfMessage{
|
|
Type: "delta",
|
|
Watermark: max,
|
|
Runs: rows,
|
|
ServerTime: time.Now().Unix(),
|
|
}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case err := <-readErr:
|
|
if err != nil {
|
|
return
|
|
}
|
|
case msg, ok := <-sub.out:
|
|
if !ok {
|
|
return
|
|
}
|
|
wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout)
|
|
err := wsjson.Write(wctx, conn, msg)
|
|
wcancel()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|