merge: issue/0086-ws-events — WebSocket hub for ops:call_monitor

New endpoint: GET /api/events/call_monitor (upgrade to WS).
Per-subscriber writer queue, lazy ticker (250ms when active / 2s idle)
that only runs while >=1 subscriber is connected. Snapshot on connect
(KPIs + last 100 calls + watermark), deltas after (id > watermark).
Client may send {watermark: N} to resume.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Egutierrez
2026-05-14 00:35:53 +02:00
3 changed files with 425 additions and 5 deletions
+2 -1
View File
@@ -4,7 +4,8 @@ lang: go
domain: infra
description: "API REST HTTP read-only sobre registry.db y operations.db de cada app. Permite consultas SQL (solo SELECT/PRAGMA), busqueda FTS5, exploracion de tablas y schema. Bind por defecto a localhost:8484."
tags: [service, api, sqlite, http, registry, fts5]
uses_functions: []
uses_functions:
- http_json_response_go_infra
uses_types: []
framework: "net/http"
entry_point: "main.go"
+410
View File
@@ -0,0 +1,410 @@
package main
// WebSocket event hub for ops:call_monitor (issue 0086).
//
// Diseño:
// - Hub global con N subscribers WS.
// - Ticker arranca SOLO cuando hay >=1 subscriber. Para al desconectar el
// ultimo. Cero overhead si nadie esta mirando el dashboard.
// - Cada tick (250ms) hace SELECT * FROM calls WHERE id > watermark y
// broadcastea filas nuevas a todos los subscribers.
// - Al conectar, el subscriber recibe un snapshot inicial: KPIs +
// ultimas 100 filas + watermark. Despues solo deltas.
// - Reconnect: el cliente puede mandar {"watermark": N} en el primer
// mensaje para resumir desde un punto conocido sin perder eventos.
//
// La escritura a calls la hace el hook PostToolUse via call_monitor CLI,
// independiente. Si el dashboard no esta abierto, los datos siguen
// registrandose normalmente — nada cuelga del WS.
import (
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
"sync"
"time"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
const (
wsTickInterval = 250 * time.Millisecond
wsTickIntervalIdle = 2 * time.Second
wsIdleThreshold = 30 * time.Second // tras 30s sin eventos, sube intervalo
wsSnapshotLimit = 100
wsBroadcastTimeout = 2 * time.Second
)
// callRow es el contrato JSON que se envia al cliente. Misma forma para
// snapshot.recent y delta.calls.
type callRow struct {
ID int64 `json:"id"`
TS int64 `json:"ts"`
FunctionID string `json:"function_id"`
ToolUsed string `json:"tool_used"`
DurationMS int `json:"duration_ms"`
Success bool `json:"success"`
ErrorClass string `json:"error_class"`
SessionID string `json:"session_id"`
}
type kpiStats struct {
TotalCalls int `json:"total_calls"`
TotalErrors int `json:"total_errors"`
TotalViolations int `json:"total_violations"`
TotalCopies int `json:"total_copies"`
TotalVersions int `json:"total_versions"`
}
type wsMessage struct {
Type string `json:"type"` // "snapshot" | "delta" | "ping"
Watermark int64 `json:"watermark,omitempty"`
Stats *kpiStats `json:"stats,omitempty"`
Calls []callRow `json:"calls,omitempty"`
ServerTime int64 `json:"server_time"`
}
// clientCmd: mensaje que envia el cliente al conectar (o despues).
type clientCmd struct {
Watermark int64 `json:"watermark,omitempty"`
}
// subscriber: conexion WS individual con su propia cola de salida.
type subscriber struct {
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
out chan wsMessage
watermark int64
}
// CallMonitorHub gestiona broadcast a todos los subscribers WS.
type CallMonitorHub struct {
pool *DBPool
mu sync.Mutex
subscribers map[*subscriber]struct{}
tickerStop chan struct{}
tickerOn bool
watermark int64 // max calls.id procesado por el ticker
lastEventAt time.Time
}
func NewCallMonitorHub(pool *DBPool) *CallMonitorHub {
return &CallMonitorHub{
pool: pool,
subscribers: make(map[*subscriber]struct{}),
}
}
// register añade un subscriber y arranca el ticker si era el primero.
func (h *CallMonitorHub) register(s *subscriber) {
h.mu.Lock()
h.subscribers[s] = struct{}{}
shouldStart := !h.tickerOn
if shouldStart {
h.tickerStop = make(chan struct{})
h.tickerOn = true
h.lastEventAt = time.Now()
}
h.mu.Unlock()
if shouldStart {
go h.tickerLoop()
}
}
// unregister elimina un subscriber y para el ticker si era el ultimo.
func (h *CallMonitorHub) unregister(s *subscriber) {
h.mu.Lock()
if _, ok := h.subscribers[s]; !ok {
h.mu.Unlock()
return
}
delete(h.subscribers, s)
close(s.out)
shouldStop := h.tickerOn && len(h.subscribers) == 0
if shouldStop {
close(h.tickerStop)
h.tickerOn = false
}
h.mu.Unlock()
}
// tickerLoop hace polling a calls table y broadcastea deltas. Adapta el
// intervalo si lleva tiempo sin eventos: 250ms activo → 2s idle.
func (h *CallMonitorHub) tickerLoop() {
interval := wsTickInterval
t := time.NewTimer(interval)
defer t.Stop()
for {
select {
case <-h.tickerStop:
return
case <-t.C:
rows, max, err := h.fetchSince(h.getWatermark())
if err != nil {
log.Printf("[ws] fetchSince: %v", err)
} else if len(rows) > 0 {
h.setWatermark(max)
h.recordActivity()
h.broadcast(wsMessage{
Type: "delta",
Watermark: max,
Calls: rows,
ServerTime: time.Now().Unix(),
})
}
// Adaptive interval.
if time.Since(h.lastActivityAt()) > wsIdleThreshold {
interval = wsTickIntervalIdle
} else {
interval = wsTickInterval
}
t.Reset(interval)
}
}
}
func (h *CallMonitorHub) getWatermark() int64 {
h.mu.Lock()
defer h.mu.Unlock()
return h.watermark
}
func (h *CallMonitorHub) setWatermark(v int64) {
h.mu.Lock()
if v > h.watermark {
h.watermark = v
}
h.mu.Unlock()
}
func (h *CallMonitorHub) recordActivity() {
h.mu.Lock()
h.lastEventAt = time.Now()
h.mu.Unlock()
}
func (h *CallMonitorHub) lastActivityAt() time.Time {
h.mu.Lock()
defer h.mu.Unlock()
return h.lastEventAt
}
// fetchSince devuelve filas con id > since, ordenadas asc.
func (h *CallMonitorHub) fetchSince(since int64) ([]callRow, int64, error) {
db, err := h.pool.Get("ops:call_monitor")
if err != nil {
return nil, since, err
}
rows, err := db.Query(`
SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id
FROM calls
WHERE id > ?
ORDER BY id ASC
LIMIT 500`, since)
if err != nil {
return nil, since, err
}
defer rows.Close()
out := make([]callRow, 0, 16)
max := since
for rows.Next() {
var c callRow
var succ int
if err := rows.Scan(&c.ID, &c.TS, &c.FunctionID, &c.ToolUsed,
&c.DurationMS, &succ, &c.ErrorClass, &c.SessionID); err != nil {
return nil, since, err
}
c.Success = succ != 0
out = append(out, c)
if c.ID > max {
max = c.ID
}
}
return out, max, rows.Err()
}
// fetchSnapshot devuelve KPIs + ultimas N filas + watermark.
func (h *CallMonitorHub) fetchSnapshot() (*kpiStats, []callRow, int64, error) {
db, err := h.pool.Get("ops:call_monitor")
if err != nil {
return nil, nil, 0, err
}
stats := &kpiStats{}
queries := []struct {
sql string
dst *int
}{
{"SELECT COUNT(*) FROM calls", &stats.TotalCalls},
{"SELECT COUNT(*) FROM calls WHERE success = 0", &stats.TotalErrors},
{"SELECT COUNT(*) FROM violations", &stats.TotalViolations},
{"SELECT COUNT(*) FROM copied_code", &stats.TotalCopies},
{"SELECT COUNT(*) FROM function_versions", &stats.TotalVersions},
}
for _, q := range queries {
if err := db.QueryRow(q.sql).Scan(q.dst); err != nil {
// tabla puede no existir si call_monitor sin migrar; lo dejamos a 0.
if err != sql.ErrNoRows {
log.Printf("[ws] snapshot %q: %v", q.sql, err)
}
}
}
rows, err := db.Query(`
SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id
FROM calls
ORDER BY id DESC
LIMIT ?`, wsSnapshotLimit)
if err != nil {
return stats, nil, 0, err
}
defer rows.Close()
recent := make([]callRow, 0, wsSnapshotLimit)
max := int64(0)
for rows.Next() {
var c callRow
var succ int
if err := rows.Scan(&c.ID, &c.TS, &c.FunctionID, &c.ToolUsed,
&c.DurationMS, &succ, &c.ErrorClass, &c.SessionID); err != nil {
return stats, nil, 0, err
}
c.Success = succ != 0
recent = append(recent, c)
if c.ID > max {
max = c.ID
}
}
return stats, recent, max, rows.Err()
}
// broadcast envia el mensaje a todos los subscribers. Si la cola de uno esta
// llena (cliente lento), drop del mensaje para ese — no bloquea al resto.
func (h *CallMonitorHub) broadcast(msg wsMessage) {
h.mu.Lock()
subs := make([]*subscriber, 0, len(h.subscribers))
for s := range h.subscribers {
subs = append(subs, s)
}
h.mu.Unlock()
for _, s := range subs {
select {
case s.out <- msg:
default:
// cola llena, cliente lento: drop frame para no atascar el hub.
log.Printf("[ws] dropping frame for slow subscriber")
}
}
}
// handleEvents es el handler HTTP que upgradea a WS.
// Endpoint: GET /api/events/call_monitor
func (s *Server) handleEvents(hub *CallMonitorHub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: true, // CORS ya manejado por el middleware
})
if err != nil {
log.Printf("[ws] accept: %v", err)
return
}
defer conn.Close(websocket.StatusInternalError, "closing")
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
sub := &subscriber{
conn: conn,
ctx: ctx,
cancel: cancel,
out: make(chan wsMessage, 64),
}
hub.register(sub)
defer hub.unregister(sub)
// Snapshot inicial.
stats, recent, watermark, err := hub.fetchSnapshot()
if err != nil {
log.Printf("[ws] snapshot: %v", err)
conn.Close(websocket.StatusInternalError, "snapshot failed")
return
}
hub.setWatermark(watermark)
initial := wsMessage{
Type: "snapshot",
Watermark: watermark,
Stats: stats,
Calls: recent,
ServerTime: time.Now().Unix(),
}
if err := wsjson.Write(ctx, conn, initial); err != nil {
return
}
// Reader goroutine: procesa comandos del cliente (watermark override).
readErr := make(chan error, 1)
go func() {
for {
var cmd clientCmd
if err := wsjson.Read(ctx, conn, &cmd); err != nil {
readErr <- err
return
}
if cmd.Watermark > 0 {
// Cliente reanuda desde un watermark conocido — enviamos
// el delta acumulado desde ese punto.
rows, max, err := hub.fetchSince(cmd.Watermark)
if err == nil && len(rows) > 0 {
hub.setWatermark(max)
select {
case sub.out <- wsMessage{
Type: "delta",
Watermark: max,
Calls: rows,
ServerTime: time.Now().Unix(),
}:
default:
}
}
}
}
}()
// Writer loop: drena la cola del subscriber.
for {
select {
case <-ctx.Done():
return
case err := <-readErr:
if err != nil {
return
}
case msg, ok := <-sub.out:
if !ok {
return
}
wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout)
err := wsjson.Write(wctx, conn, msg)
wcancel()
if err != nil {
return
}
}
}
}
}
// jsonMarshal helper para tests / debug.
func mustJSON(v any) string {
b, _ := json.Marshal(v)
return string(b)
}
+13 -4
View File
@@ -7,6 +7,8 @@ import (
"net/http"
"strings"
"time"
"fn-registry/functions/infra"
)
const queryTimeout = 5 * time.Second
@@ -15,10 +17,15 @@ const queryTimeout = 5 * time.Second
type Server struct {
pool *DBPool
registryRoot string // raiz del fn_registry (para exec fn ...)
hub *CallMonitorHub
}
func NewServer(pool *DBPool, registryRoot string) *Server {
return &Server{pool: pool, registryRoot: registryRoot}
return &Server{
pool: pool,
registryRoot: registryRoot,
hub: NewCallMonitorHub(pool),
}
}
// Routes registers all API routes on the given mux.
@@ -39,6 +46,10 @@ func (s *Server) Routes(mux *http.ServeMux) {
mux.HandleFunc("POST /api/add/app", s.handleAddApp)
mux.HandleFunc("POST /api/add/analysis", s.handleAddAnalysis)
mux.HandleFunc("POST /api/add/vault", s.handleAddVault)
// Issue 0086: WebSocket live stream de ops:call_monitor (calls table).
// Hub global con ticker bajo demanda (solo corre con >=1 subscriber).
mux.HandleFunc("GET /api/events/call_monitor", s.handleEvents(s.hub))
}
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
@@ -315,9 +326,7 @@ func corsMiddleware(next http.Handler) http.Handler {
}
func writeJSON(w http.ResponseWriter, status int, data any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(data)
infra.HTTPJSONResponse(w, status, data)
}
func writeError(w http.ResponseWriter, status int, msg string) {