chore: auto-commit (3 archivos)
- app.md - handlers.go - events.go Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -4,7 +4,8 @@ lang: go
|
||||
domain: infra
|
||||
description: "API REST HTTP read-only sobre registry.db y operations.db de cada app. Permite consultas SQL (solo SELECT/PRAGMA), busqueda FTS5, exploracion de tablas y schema. Bind por defecto a localhost:8484."
|
||||
tags: [service, api, sqlite, http, registry, fts5]
|
||||
uses_functions: []
|
||||
uses_functions:
|
||||
- http_json_response_go_infra
|
||||
uses_types: []
|
||||
framework: "net/http"
|
||||
entry_point: "main.go"
|
||||
|
||||
@@ -0,0 +1,410 @@
|
||||
package main
|
||||
|
||||
// WebSocket event hub for ops:call_monitor (issue 0086).
|
||||
//
|
||||
// Diseño:
|
||||
// - Hub global con N subscribers WS.
|
||||
// - Ticker arranca SOLO cuando hay >=1 subscriber. Para al desconectar el
|
||||
// ultimo. Cero overhead si nadie esta mirando el dashboard.
|
||||
// - Cada tick (250ms) hace SELECT * FROM calls WHERE id > watermark y
|
||||
// broadcastea filas nuevas a todos los subscribers.
|
||||
// - Al conectar, el subscriber recibe un snapshot inicial: KPIs +
|
||||
// ultimas 100 filas + watermark. Despues solo deltas.
|
||||
// - Reconnect: el cliente puede mandar {"watermark": N} en el primer
|
||||
// mensaje para resumir desde un punto conocido sin perder eventos.
|
||||
//
|
||||
// La escritura a calls la hace el hook PostToolUse via call_monitor CLI,
|
||||
// independiente. Si el dashboard no esta abierto, los datos siguen
|
||||
// registrandose normalmente — nada cuelga del WS.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"nhooyr.io/websocket"
|
||||
"nhooyr.io/websocket/wsjson"
|
||||
)
|
||||
|
||||
const (
|
||||
wsTickInterval = 250 * time.Millisecond
|
||||
wsTickIntervalIdle = 2 * time.Second
|
||||
wsIdleThreshold = 30 * time.Second // tras 30s sin eventos, sube intervalo
|
||||
wsSnapshotLimit = 100
|
||||
wsBroadcastTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
// callRow es el contrato JSON que se envia al cliente. Misma forma para
|
||||
// snapshot.recent y delta.calls.
|
||||
type callRow struct {
|
||||
ID int64 `json:"id"`
|
||||
TS int64 `json:"ts"`
|
||||
FunctionID string `json:"function_id"`
|
||||
ToolUsed string `json:"tool_used"`
|
||||
DurationMS int `json:"duration_ms"`
|
||||
Success bool `json:"success"`
|
||||
ErrorClass string `json:"error_class"`
|
||||
SessionID string `json:"session_id"`
|
||||
}
|
||||
|
||||
type kpiStats struct {
|
||||
TotalCalls int `json:"total_calls"`
|
||||
TotalErrors int `json:"total_errors"`
|
||||
TotalViolations int `json:"total_violations"`
|
||||
TotalCopies int `json:"total_copies"`
|
||||
TotalVersions int `json:"total_versions"`
|
||||
}
|
||||
|
||||
type wsMessage struct {
|
||||
Type string `json:"type"` // "snapshot" | "delta" | "ping"
|
||||
Watermark int64 `json:"watermark,omitempty"`
|
||||
Stats *kpiStats `json:"stats,omitempty"`
|
||||
Calls []callRow `json:"calls,omitempty"`
|
||||
ServerTime int64 `json:"server_time"`
|
||||
}
|
||||
|
||||
// clientCmd: mensaje que envia el cliente al conectar (o despues).
|
||||
type clientCmd struct {
|
||||
Watermark int64 `json:"watermark,omitempty"`
|
||||
}
|
||||
|
||||
// subscriber: conexion WS individual con su propia cola de salida.
|
||||
type subscriber struct {
|
||||
conn *websocket.Conn
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
out chan wsMessage
|
||||
watermark int64
|
||||
}
|
||||
|
||||
// CallMonitorHub gestiona broadcast a todos los subscribers WS.
|
||||
type CallMonitorHub struct {
|
||||
pool *DBPool
|
||||
|
||||
mu sync.Mutex
|
||||
subscribers map[*subscriber]struct{}
|
||||
tickerStop chan struct{}
|
||||
tickerOn bool
|
||||
watermark int64 // max calls.id procesado por el ticker
|
||||
lastEventAt time.Time
|
||||
}
|
||||
|
||||
func NewCallMonitorHub(pool *DBPool) *CallMonitorHub {
|
||||
return &CallMonitorHub{
|
||||
pool: pool,
|
||||
subscribers: make(map[*subscriber]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// register añade un subscriber y arranca el ticker si era el primero.
|
||||
func (h *CallMonitorHub) register(s *subscriber) {
|
||||
h.mu.Lock()
|
||||
h.subscribers[s] = struct{}{}
|
||||
shouldStart := !h.tickerOn
|
||||
if shouldStart {
|
||||
h.tickerStop = make(chan struct{})
|
||||
h.tickerOn = true
|
||||
h.lastEventAt = time.Now()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
if shouldStart {
|
||||
go h.tickerLoop()
|
||||
}
|
||||
}
|
||||
|
||||
// unregister elimina un subscriber y para el ticker si era el ultimo.
|
||||
func (h *CallMonitorHub) unregister(s *subscriber) {
|
||||
h.mu.Lock()
|
||||
if _, ok := h.subscribers[s]; !ok {
|
||||
h.mu.Unlock()
|
||||
return
|
||||
}
|
||||
delete(h.subscribers, s)
|
||||
close(s.out)
|
||||
shouldStop := h.tickerOn && len(h.subscribers) == 0
|
||||
if shouldStop {
|
||||
close(h.tickerStop)
|
||||
h.tickerOn = false
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
// tickerLoop hace polling a calls table y broadcastea deltas. Adapta el
|
||||
// intervalo si lleva tiempo sin eventos: 250ms activo → 2s idle.
|
||||
func (h *CallMonitorHub) tickerLoop() {
|
||||
interval := wsTickInterval
|
||||
t := time.NewTimer(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-h.tickerStop:
|
||||
return
|
||||
case <-t.C:
|
||||
rows, max, err := h.fetchSince(h.getWatermark())
|
||||
if err != nil {
|
||||
log.Printf("[ws] fetchSince: %v", err)
|
||||
} else if len(rows) > 0 {
|
||||
h.setWatermark(max)
|
||||
h.recordActivity()
|
||||
h.broadcast(wsMessage{
|
||||
Type: "delta",
|
||||
Watermark: max,
|
||||
Calls: rows,
|
||||
ServerTime: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
// Adaptive interval.
|
||||
if time.Since(h.lastActivityAt()) > wsIdleThreshold {
|
||||
interval = wsTickIntervalIdle
|
||||
} else {
|
||||
interval = wsTickInterval
|
||||
}
|
||||
t.Reset(interval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *CallMonitorHub) getWatermark() int64 {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.watermark
|
||||
}
|
||||
|
||||
func (h *CallMonitorHub) setWatermark(v int64) {
|
||||
h.mu.Lock()
|
||||
if v > h.watermark {
|
||||
h.watermark = v
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *CallMonitorHub) recordActivity() {
|
||||
h.mu.Lock()
|
||||
h.lastEventAt = time.Now()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *CallMonitorHub) lastActivityAt() time.Time {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.lastEventAt
|
||||
}
|
||||
|
||||
// fetchSince devuelve filas con id > since, ordenadas asc.
|
||||
func (h *CallMonitorHub) fetchSince(since int64) ([]callRow, int64, error) {
|
||||
db, err := h.pool.Get("ops:call_monitor")
|
||||
if err != nil {
|
||||
return nil, since, err
|
||||
}
|
||||
rows, err := db.Query(`
|
||||
SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id
|
||||
FROM calls
|
||||
WHERE id > ?
|
||||
ORDER BY id ASC
|
||||
LIMIT 500`, since)
|
||||
if err != nil {
|
||||
return nil, since, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]callRow, 0, 16)
|
||||
max := since
|
||||
for rows.Next() {
|
||||
var c callRow
|
||||
var succ int
|
||||
if err := rows.Scan(&c.ID, &c.TS, &c.FunctionID, &c.ToolUsed,
|
||||
&c.DurationMS, &succ, &c.ErrorClass, &c.SessionID); err != nil {
|
||||
return nil, since, err
|
||||
}
|
||||
c.Success = succ != 0
|
||||
out = append(out, c)
|
||||
if c.ID > max {
|
||||
max = c.ID
|
||||
}
|
||||
}
|
||||
return out, max, rows.Err()
|
||||
}
|
||||
|
||||
// fetchSnapshot devuelve KPIs + ultimas N filas + watermark.
|
||||
func (h *CallMonitorHub) fetchSnapshot() (*kpiStats, []callRow, int64, error) {
|
||||
db, err := h.pool.Get("ops:call_monitor")
|
||||
if err != nil {
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
|
||||
stats := &kpiStats{}
|
||||
queries := []struct {
|
||||
sql string
|
||||
dst *int
|
||||
}{
|
||||
{"SELECT COUNT(*) FROM calls", &stats.TotalCalls},
|
||||
{"SELECT COUNT(*) FROM calls WHERE success = 0", &stats.TotalErrors},
|
||||
{"SELECT COUNT(*) FROM violations", &stats.TotalViolations},
|
||||
{"SELECT COUNT(*) FROM copied_code", &stats.TotalCopies},
|
||||
{"SELECT COUNT(*) FROM function_versions", &stats.TotalVersions},
|
||||
}
|
||||
for _, q := range queries {
|
||||
if err := db.QueryRow(q.sql).Scan(q.dst); err != nil {
|
||||
// tabla puede no existir si call_monitor sin migrar; lo dejamos a 0.
|
||||
if err != sql.ErrNoRows {
|
||||
log.Printf("[ws] snapshot %q: %v", q.sql, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rows, err := db.Query(`
|
||||
SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id
|
||||
FROM calls
|
||||
ORDER BY id DESC
|
||||
LIMIT ?`, wsSnapshotLimit)
|
||||
if err != nil {
|
||||
return stats, nil, 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
recent := make([]callRow, 0, wsSnapshotLimit)
|
||||
max := int64(0)
|
||||
for rows.Next() {
|
||||
var c callRow
|
||||
var succ int
|
||||
if err := rows.Scan(&c.ID, &c.TS, &c.FunctionID, &c.ToolUsed,
|
||||
&c.DurationMS, &succ, &c.ErrorClass, &c.SessionID); err != nil {
|
||||
return stats, nil, 0, err
|
||||
}
|
||||
c.Success = succ != 0
|
||||
recent = append(recent, c)
|
||||
if c.ID > max {
|
||||
max = c.ID
|
||||
}
|
||||
}
|
||||
return stats, recent, max, rows.Err()
|
||||
}
|
||||
|
||||
// broadcast envia el mensaje a todos los subscribers. Si la cola de uno esta
|
||||
// llena (cliente lento), drop del mensaje para ese — no bloquea al resto.
|
||||
func (h *CallMonitorHub) broadcast(msg wsMessage) {
|
||||
h.mu.Lock()
|
||||
subs := make([]*subscriber, 0, len(h.subscribers))
|
||||
for s := range h.subscribers {
|
||||
subs = append(subs, s)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
for _, s := range subs {
|
||||
select {
|
||||
case s.out <- msg:
|
||||
default:
|
||||
// cola llena, cliente lento: drop frame para no atascar el hub.
|
||||
log.Printf("[ws] dropping frame for slow subscriber")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvents es el handler HTTP que upgradea a WS.
|
||||
// Endpoint: GET /api/events/call_monitor
|
||||
func (s *Server) handleEvents(hub *CallMonitorHub) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||
InsecureSkipVerify: true, // CORS ya manejado por el middleware
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[ws] accept: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close(websocket.StatusInternalError, "closing")
|
||||
|
||||
ctx, cancel := context.WithCancel(r.Context())
|
||||
defer cancel()
|
||||
|
||||
sub := &subscriber{
|
||||
conn: conn,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
out: make(chan wsMessage, 64),
|
||||
}
|
||||
hub.register(sub)
|
||||
defer hub.unregister(sub)
|
||||
|
||||
// Snapshot inicial.
|
||||
stats, recent, watermark, err := hub.fetchSnapshot()
|
||||
if err != nil {
|
||||
log.Printf("[ws] snapshot: %v", err)
|
||||
conn.Close(websocket.StatusInternalError, "snapshot failed")
|
||||
return
|
||||
}
|
||||
hub.setWatermark(watermark)
|
||||
initial := wsMessage{
|
||||
Type: "snapshot",
|
||||
Watermark: watermark,
|
||||
Stats: stats,
|
||||
Calls: recent,
|
||||
ServerTime: time.Now().Unix(),
|
||||
}
|
||||
if err := wsjson.Write(ctx, conn, initial); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Reader goroutine: procesa comandos del cliente (watermark override).
|
||||
readErr := make(chan error, 1)
|
||||
go func() {
|
||||
for {
|
||||
var cmd clientCmd
|
||||
if err := wsjson.Read(ctx, conn, &cmd); err != nil {
|
||||
readErr <- err
|
||||
return
|
||||
}
|
||||
if cmd.Watermark > 0 {
|
||||
// Cliente reanuda desde un watermark conocido — enviamos
|
||||
// el delta acumulado desde ese punto.
|
||||
rows, max, err := hub.fetchSince(cmd.Watermark)
|
||||
if err == nil && len(rows) > 0 {
|
||||
hub.setWatermark(max)
|
||||
select {
|
||||
case sub.out <- wsMessage{
|
||||
Type: "delta",
|
||||
Watermark: max,
|
||||
Calls: rows,
|
||||
ServerTime: time.Now().Unix(),
|
||||
}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Writer loop: drena la cola del subscriber.
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case err := <-readErr:
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
case msg, ok := <-sub.out:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout)
|
||||
err := wsjson.Write(wctx, conn, msg)
|
||||
wcancel()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// jsonMarshal helper para tests / debug.
|
||||
func mustJSON(v any) string {
|
||||
b, _ := json.Marshal(v)
|
||||
return string(b)
|
||||
}
|
||||
+13
-4
@@ -7,6 +7,8 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"fn-registry/functions/infra"
|
||||
)
|
||||
|
||||
const queryTimeout = 5 * time.Second
|
||||
@@ -15,10 +17,15 @@ const queryTimeout = 5 * time.Second
|
||||
type Server struct {
|
||||
pool *DBPool
|
||||
registryRoot string // raiz del fn_registry (para exec fn ...)
|
||||
hub *CallMonitorHub
|
||||
}
|
||||
|
||||
func NewServer(pool *DBPool, registryRoot string) *Server {
|
||||
return &Server{pool: pool, registryRoot: registryRoot}
|
||||
return &Server{
|
||||
pool: pool,
|
||||
registryRoot: registryRoot,
|
||||
hub: NewCallMonitorHub(pool),
|
||||
}
|
||||
}
|
||||
|
||||
// Routes registers all API routes on the given mux.
|
||||
@@ -39,6 +46,10 @@ func (s *Server) Routes(mux *http.ServeMux) {
|
||||
mux.HandleFunc("POST /api/add/app", s.handleAddApp)
|
||||
mux.HandleFunc("POST /api/add/analysis", s.handleAddAnalysis)
|
||||
mux.HandleFunc("POST /api/add/vault", s.handleAddVault)
|
||||
|
||||
// Issue 0086: WebSocket live stream de ops:call_monitor (calls table).
|
||||
// Hub global con ticker bajo demanda (solo corre con >=1 subscriber).
|
||||
mux.HandleFunc("GET /api/events/call_monitor", s.handleEvents(s.hub))
|
||||
}
|
||||
|
||||
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -315,9 +326,7 @@ func corsMiddleware(next http.Handler) http.Handler {
|
||||
}
|
||||
|
||||
func writeJSON(w http.ResponseWriter, status int, data any) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
json.NewEncoder(w).Encode(data)
|
||||
infra.HTTPJSONResponse(w, status, data)
|
||||
}
|
||||
|
||||
func writeError(w http.ResponseWriter, status int, msg string) {
|
||||
|
||||
Reference in New Issue
Block a user