From a1769a39765420ff3d9c03abd2999934fccaafd7 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Thu, 14 May 2026 00:28:22 +0200 Subject: [PATCH] chore: auto-commit (3 archivos) - app.md - handlers.go - events.go Co-Authored-By: Claude Opus 4.7 (1M context) --- app.md | 3 +- events.go | 410 ++++++++++++++++++++++++++++++++++++++++++++++++++++ handlers.go | 17 ++- 3 files changed, 425 insertions(+), 5 deletions(-) create mode 100644 events.go diff --git a/app.md b/app.md index 30fa013..1f2c2ef 100644 --- a/app.md +++ b/app.md @@ -4,7 +4,8 @@ lang: go domain: infra description: "API REST HTTP read-only sobre registry.db y operations.db de cada app. Permite consultas SQL (solo SELECT/PRAGMA), busqueda FTS5, exploracion de tablas y schema. Bind por defecto a localhost:8484." tags: [service, api, sqlite, http, registry, fts5] -uses_functions: [] +uses_functions: + - http_json_response_go_infra uses_types: [] framework: "net/http" entry_point: "main.go" diff --git a/events.go b/events.go new file mode 100644 index 0000000..01ddca5 --- /dev/null +++ b/events.go @@ -0,0 +1,410 @@ +package main + +// WebSocket event hub for ops:call_monitor (issue 0086). +// +// Diseño: +// - Hub global con N subscribers WS. +// - Ticker arranca SOLO cuando hay >=1 subscriber. Para al desconectar el +// ultimo. Cero overhead si nadie esta mirando el dashboard. +// - Cada tick (250ms) hace SELECT * FROM calls WHERE id > watermark y +// broadcastea filas nuevas a todos los subscribers. +// - Al conectar, el subscriber recibe un snapshot inicial: KPIs + +// ultimas 100 filas + watermark. Despues solo deltas. +// - Reconnect: el cliente puede mandar {"watermark": N} en el primer +// mensaje para resumir desde un punto conocido sin perder eventos. +// +// La escritura a calls la hace el hook PostToolUse via call_monitor CLI, +// independiente. Si el dashboard no esta abierto, los datos siguen +// registrandose normalmente — nada cuelga del WS. + +import ( + "context" + "database/sql" + "encoding/json" + "log" + "net/http" + "sync" + "time" + + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" +) + +const ( + wsTickInterval = 250 * time.Millisecond + wsTickIntervalIdle = 2 * time.Second + wsIdleThreshold = 30 * time.Second // tras 30s sin eventos, sube intervalo + wsSnapshotLimit = 100 + wsBroadcastTimeout = 2 * time.Second +) + +// callRow es el contrato JSON que se envia al cliente. Misma forma para +// snapshot.recent y delta.calls. +type callRow struct { + ID int64 `json:"id"` + TS int64 `json:"ts"` + FunctionID string `json:"function_id"` + ToolUsed string `json:"tool_used"` + DurationMS int `json:"duration_ms"` + Success bool `json:"success"` + ErrorClass string `json:"error_class"` + SessionID string `json:"session_id"` +} + +type kpiStats struct { + TotalCalls int `json:"total_calls"` + TotalErrors int `json:"total_errors"` + TotalViolations int `json:"total_violations"` + TotalCopies int `json:"total_copies"` + TotalVersions int `json:"total_versions"` +} + +type wsMessage struct { + Type string `json:"type"` // "snapshot" | "delta" | "ping" + Watermark int64 `json:"watermark,omitempty"` + Stats *kpiStats `json:"stats,omitempty"` + Calls []callRow `json:"calls,omitempty"` + ServerTime int64 `json:"server_time"` +} + +// clientCmd: mensaje que envia el cliente al conectar (o despues). +type clientCmd struct { + Watermark int64 `json:"watermark,omitempty"` +} + +// subscriber: conexion WS individual con su propia cola de salida. +type subscriber struct { + conn *websocket.Conn + ctx context.Context + cancel context.CancelFunc + out chan wsMessage + watermark int64 +} + +// CallMonitorHub gestiona broadcast a todos los subscribers WS. +type CallMonitorHub struct { + pool *DBPool + + mu sync.Mutex + subscribers map[*subscriber]struct{} + tickerStop chan struct{} + tickerOn bool + watermark int64 // max calls.id procesado por el ticker + lastEventAt time.Time +} + +func NewCallMonitorHub(pool *DBPool) *CallMonitorHub { + return &CallMonitorHub{ + pool: pool, + subscribers: make(map[*subscriber]struct{}), + } +} + +// register añade un subscriber y arranca el ticker si era el primero. +func (h *CallMonitorHub) register(s *subscriber) { + h.mu.Lock() + h.subscribers[s] = struct{}{} + shouldStart := !h.tickerOn + if shouldStart { + h.tickerStop = make(chan struct{}) + h.tickerOn = true + h.lastEventAt = time.Now() + } + h.mu.Unlock() + + if shouldStart { + go h.tickerLoop() + } +} + +// unregister elimina un subscriber y para el ticker si era el ultimo. +func (h *CallMonitorHub) unregister(s *subscriber) { + h.mu.Lock() + if _, ok := h.subscribers[s]; !ok { + h.mu.Unlock() + return + } + delete(h.subscribers, s) + close(s.out) + shouldStop := h.tickerOn && len(h.subscribers) == 0 + if shouldStop { + close(h.tickerStop) + h.tickerOn = false + } + h.mu.Unlock() +} + +// tickerLoop hace polling a calls table y broadcastea deltas. Adapta el +// intervalo si lleva tiempo sin eventos: 250ms activo → 2s idle. +func (h *CallMonitorHub) tickerLoop() { + interval := wsTickInterval + t := time.NewTimer(interval) + defer t.Stop() + for { + select { + case <-h.tickerStop: + return + case <-t.C: + rows, max, err := h.fetchSince(h.getWatermark()) + if err != nil { + log.Printf("[ws] fetchSince: %v", err) + } else if len(rows) > 0 { + h.setWatermark(max) + h.recordActivity() + h.broadcast(wsMessage{ + Type: "delta", + Watermark: max, + Calls: rows, + ServerTime: time.Now().Unix(), + }) + } + + // Adaptive interval. + if time.Since(h.lastActivityAt()) > wsIdleThreshold { + interval = wsTickIntervalIdle + } else { + interval = wsTickInterval + } + t.Reset(interval) + } + } +} + +func (h *CallMonitorHub) getWatermark() int64 { + h.mu.Lock() + defer h.mu.Unlock() + return h.watermark +} + +func (h *CallMonitorHub) setWatermark(v int64) { + h.mu.Lock() + if v > h.watermark { + h.watermark = v + } + h.mu.Unlock() +} + +func (h *CallMonitorHub) recordActivity() { + h.mu.Lock() + h.lastEventAt = time.Now() + h.mu.Unlock() +} + +func (h *CallMonitorHub) lastActivityAt() time.Time { + h.mu.Lock() + defer h.mu.Unlock() + return h.lastEventAt +} + +// fetchSince devuelve filas con id > since, ordenadas asc. +func (h *CallMonitorHub) fetchSince(since int64) ([]callRow, int64, error) { + db, err := h.pool.Get("ops:call_monitor") + if err != nil { + return nil, since, err + } + rows, err := db.Query(` + SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id + FROM calls + WHERE id > ? + ORDER BY id ASC + LIMIT 500`, since) + if err != nil { + return nil, since, err + } + defer rows.Close() + + out := make([]callRow, 0, 16) + max := since + for rows.Next() { + var c callRow + var succ int + if err := rows.Scan(&c.ID, &c.TS, &c.FunctionID, &c.ToolUsed, + &c.DurationMS, &succ, &c.ErrorClass, &c.SessionID); err != nil { + return nil, since, err + } + c.Success = succ != 0 + out = append(out, c) + if c.ID > max { + max = c.ID + } + } + return out, max, rows.Err() +} + +// fetchSnapshot devuelve KPIs + ultimas N filas + watermark. +func (h *CallMonitorHub) fetchSnapshot() (*kpiStats, []callRow, int64, error) { + db, err := h.pool.Get("ops:call_monitor") + if err != nil { + return nil, nil, 0, err + } + + stats := &kpiStats{} + queries := []struct { + sql string + dst *int + }{ + {"SELECT COUNT(*) FROM calls", &stats.TotalCalls}, + {"SELECT COUNT(*) FROM calls WHERE success = 0", &stats.TotalErrors}, + {"SELECT COUNT(*) FROM violations", &stats.TotalViolations}, + {"SELECT COUNT(*) FROM copied_code", &stats.TotalCopies}, + {"SELECT COUNT(*) FROM function_versions", &stats.TotalVersions}, + } + for _, q := range queries { + if err := db.QueryRow(q.sql).Scan(q.dst); err != nil { + // tabla puede no existir si call_monitor sin migrar; lo dejamos a 0. + if err != sql.ErrNoRows { + log.Printf("[ws] snapshot %q: %v", q.sql, err) + } + } + } + + rows, err := db.Query(` + SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id + FROM calls + ORDER BY id DESC + LIMIT ?`, wsSnapshotLimit) + if err != nil { + return stats, nil, 0, err + } + defer rows.Close() + + recent := make([]callRow, 0, wsSnapshotLimit) + max := int64(0) + for rows.Next() { + var c callRow + var succ int + if err := rows.Scan(&c.ID, &c.TS, &c.FunctionID, &c.ToolUsed, + &c.DurationMS, &succ, &c.ErrorClass, &c.SessionID); err != nil { + return stats, nil, 0, err + } + c.Success = succ != 0 + recent = append(recent, c) + if c.ID > max { + max = c.ID + } + } + return stats, recent, max, rows.Err() +} + +// broadcast envia el mensaje a todos los subscribers. Si la cola de uno esta +// llena (cliente lento), drop del mensaje para ese — no bloquea al resto. +func (h *CallMonitorHub) broadcast(msg wsMessage) { + h.mu.Lock() + subs := make([]*subscriber, 0, len(h.subscribers)) + for s := range h.subscribers { + subs = append(subs, s) + } + h.mu.Unlock() + + for _, s := range subs { + select { + case s.out <- msg: + default: + // cola llena, cliente lento: drop frame para no atascar el hub. + log.Printf("[ws] dropping frame for slow subscriber") + } + } +} + +// handleEvents es el handler HTTP que upgradea a WS. +// Endpoint: GET /api/events/call_monitor +func (s *Server) handleEvents(hub *CallMonitorHub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + InsecureSkipVerify: true, // CORS ya manejado por el middleware + }) + if err != nil { + log.Printf("[ws] accept: %v", err) + return + } + defer conn.Close(websocket.StatusInternalError, "closing") + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + sub := &subscriber{ + conn: conn, + ctx: ctx, + cancel: cancel, + out: make(chan wsMessage, 64), + } + hub.register(sub) + defer hub.unregister(sub) + + // Snapshot inicial. + stats, recent, watermark, err := hub.fetchSnapshot() + if err != nil { + log.Printf("[ws] snapshot: %v", err) + conn.Close(websocket.StatusInternalError, "snapshot failed") + return + } + hub.setWatermark(watermark) + initial := wsMessage{ + Type: "snapshot", + Watermark: watermark, + Stats: stats, + Calls: recent, + ServerTime: time.Now().Unix(), + } + if err := wsjson.Write(ctx, conn, initial); err != nil { + return + } + + // Reader goroutine: procesa comandos del cliente (watermark override). + readErr := make(chan error, 1) + go func() { + for { + var cmd clientCmd + if err := wsjson.Read(ctx, conn, &cmd); err != nil { + readErr <- err + return + } + if cmd.Watermark > 0 { + // Cliente reanuda desde un watermark conocido — enviamos + // el delta acumulado desde ese punto. + rows, max, err := hub.fetchSince(cmd.Watermark) + if err == nil && len(rows) > 0 { + hub.setWatermark(max) + select { + case sub.out <- wsMessage{ + Type: "delta", + Watermark: max, + Calls: rows, + ServerTime: time.Now().Unix(), + }: + default: + } + } + } + } + }() + + // Writer loop: drena la cola del subscriber. + for { + select { + case <-ctx.Done(): + return + case err := <-readErr: + if err != nil { + return + } + case msg, ok := <-sub.out: + if !ok { + return + } + wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout) + err := wsjson.Write(wctx, conn, msg) + wcancel() + if err != nil { + return + } + } + } + } +} + +// jsonMarshal helper para tests / debug. +func mustJSON(v any) string { + b, _ := json.Marshal(v) + return string(b) +} diff --git a/handlers.go b/handlers.go index 2a304f9..1572e7d 100644 --- a/handlers.go +++ b/handlers.go @@ -7,6 +7,8 @@ import ( "net/http" "strings" "time" + + "fn-registry/functions/infra" ) const queryTimeout = 5 * time.Second @@ -15,10 +17,15 @@ const queryTimeout = 5 * time.Second type Server struct { pool *DBPool registryRoot string // raiz del fn_registry (para exec fn ...) + hub *CallMonitorHub } func NewServer(pool *DBPool, registryRoot string) *Server { - return &Server{pool: pool, registryRoot: registryRoot} + return &Server{ + pool: pool, + registryRoot: registryRoot, + hub: NewCallMonitorHub(pool), + } } // Routes registers all API routes on the given mux. @@ -39,6 +46,10 @@ func (s *Server) Routes(mux *http.ServeMux) { mux.HandleFunc("POST /api/add/app", s.handleAddApp) mux.HandleFunc("POST /api/add/analysis", s.handleAddAnalysis) mux.HandleFunc("POST /api/add/vault", s.handleAddVault) + + // Issue 0086: WebSocket live stream de ops:call_monitor (calls table). + // Hub global con ticker bajo demanda (solo corre con >=1 subscriber). + mux.HandleFunc("GET /api/events/call_monitor", s.handleEvents(s.hub)) } func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { @@ -315,9 +326,7 @@ func corsMiddleware(next http.Handler) http.Handler { } func writeJSON(w http.ResponseWriter, status int, data any) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(status) - json.NewEncoder(w).Encode(data) + infra.HTTPJSONResponse(w, status, data) } func writeError(w http.ResponseWriter, status int, msg string) {