a1769a3976
- app.md - handlers.go - events.go Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
411 lines
10 KiB
Go
411 lines
10 KiB
Go
package main
|
|
|
|
// WebSocket event hub for ops:call_monitor (issue 0086).
|
|
//
|
|
// Diseño:
|
|
// - Hub global con N subscribers WS.
|
|
// - Ticker arranca SOLO cuando hay >=1 subscriber. Para al desconectar el
|
|
// ultimo. Cero overhead si nadie esta mirando el dashboard.
|
|
// - Cada tick (250ms) hace SELECT * FROM calls WHERE id > watermark y
|
|
// broadcastea filas nuevas a todos los subscribers.
|
|
// - Al conectar, el subscriber recibe un snapshot inicial: KPIs +
|
|
// ultimas 100 filas + watermark. Despues solo deltas.
|
|
// - Reconnect: el cliente puede mandar {"watermark": N} en el primer
|
|
// mensaje para resumir desde un punto conocido sin perder eventos.
|
|
//
|
|
// La escritura a calls la hace el hook PostToolUse via call_monitor CLI,
|
|
// independiente. Si el dashboard no esta abierto, los datos siguen
|
|
// registrandose normalmente — nada cuelga del WS.
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"nhooyr.io/websocket"
|
|
"nhooyr.io/websocket/wsjson"
|
|
)
|
|
|
|
// Tuning knobs for the call_monitor WebSocket hub.
const (
	// wsTickInterval is the polling period used while events are flowing.
	wsTickInterval = 250 * time.Millisecond

	// wsTickIntervalIdle is the slower polling period used once the hub has
	// been quiet for longer than wsIdleThreshold.
	wsTickIntervalIdle = 2 * time.Second

	// wsIdleThreshold is how long the hub must go without events before the
	// polling interval is raised.
	wsIdleThreshold = 30 * time.Second

	// wsSnapshotLimit is the number of most recent rows sent in the initial
	// snapshot.
	wsSnapshotLimit = 100

	// wsBroadcastTimeout bounds a single WebSocket write of a queued message.
	wsBroadcastTimeout = 2 * time.Second
)
|
|
|
|
// callRow is the JSON contract sent to the client for one row of the calls
// table. The same shape is used for the rows of a snapshot message and the
// rows of a delta message.
type callRow struct {
	ID         int64  `json:"id"`          // calls.id; also drives the watermark
	TS         int64  `json:"ts"`          // calls.ts
	FunctionID string `json:"function_id"` // calls.function_id
	ToolUsed   string `json:"tool_used"`   // calls.tool_used
	DurationMS int    `json:"duration_ms"` // calls.duration_ms
	Success    bool   `json:"success"`     // true when the stored integer is non-zero
	ErrorClass string `json:"error_class"` // calls.error_class
	SessionID  string `json:"session_id"`  // calls.session_id
}
|
|
|
|
// kpiStats carries the aggregate counters sent with the initial snapshot.
// Each field is a plain row count filled in by fetchSnapshot.
type kpiStats struct {
	TotalCalls      int `json:"total_calls"`      // rows in calls
	TotalErrors     int `json:"total_errors"`     // rows in calls with success = 0
	TotalViolations int `json:"total_violations"` // rows in violations
	TotalCopies     int `json:"total_copies"`     // rows in copied_code
	TotalVersions   int `json:"total_versions"`   // rows in function_versions
}
|
|
|
|
type wsMessage struct {
|
|
Type string `json:"type"` // "snapshot" | "delta" | "ping"
|
|
Watermark int64 `json:"watermark,omitempty"`
|
|
Stats *kpiStats `json:"stats,omitempty"`
|
|
Calls []callRow `json:"calls,omitempty"`
|
|
ServerTime int64 `json:"server_time"`
|
|
}
|
|
|
|
// clientCmd is a message sent by the client, on connect or at any later
// point. A positive Watermark asks the server for the rows with a larger id.
type clientCmd struct {
	Watermark int64 `json:"watermark,omitempty"`
}
|
|
|
|
// subscriber: conexion WS individual con su propia cola de salida.
|
|
type subscriber struct {
|
|
conn *websocket.Conn
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
out chan wsMessage
|
|
watermark int64
|
|
}
|
|
|
|
// CallMonitorHub gestiona broadcast a todos los subscribers WS.
|
|
type CallMonitorHub struct {
|
|
pool *DBPool
|
|
|
|
mu sync.Mutex
|
|
subscribers map[*subscriber]struct{}
|
|
tickerStop chan struct{}
|
|
tickerOn bool
|
|
watermark int64 // max calls.id procesado por el ticker
|
|
lastEventAt time.Time
|
|
}
|
|
|
|
func NewCallMonitorHub(pool *DBPool) *CallMonitorHub {
|
|
return &CallMonitorHub{
|
|
pool: pool,
|
|
subscribers: make(map[*subscriber]struct{}),
|
|
}
|
|
}
|
|
|
|
// register añade un subscriber y arranca el ticker si era el primero.
|
|
func (h *CallMonitorHub) register(s *subscriber) {
|
|
h.mu.Lock()
|
|
h.subscribers[s] = struct{}{}
|
|
shouldStart := !h.tickerOn
|
|
if shouldStart {
|
|
h.tickerStop = make(chan struct{})
|
|
h.tickerOn = true
|
|
h.lastEventAt = time.Now()
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
if shouldStart {
|
|
go h.tickerLoop()
|
|
}
|
|
}
|
|
|
|
// unregister elimina un subscriber y para el ticker si era el ultimo.
|
|
func (h *CallMonitorHub) unregister(s *subscriber) {
|
|
h.mu.Lock()
|
|
if _, ok := h.subscribers[s]; !ok {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
delete(h.subscribers, s)
|
|
close(s.out)
|
|
shouldStop := h.tickerOn && len(h.subscribers) == 0
|
|
if shouldStop {
|
|
close(h.tickerStop)
|
|
h.tickerOn = false
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
// tickerLoop hace polling a calls table y broadcastea deltas. Adapta el
|
|
// intervalo si lleva tiempo sin eventos: 250ms activo → 2s idle.
|
|
func (h *CallMonitorHub) tickerLoop() {
|
|
interval := wsTickInterval
|
|
t := time.NewTimer(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-h.tickerStop:
|
|
return
|
|
case <-t.C:
|
|
rows, max, err := h.fetchSince(h.getWatermark())
|
|
if err != nil {
|
|
log.Printf("[ws] fetchSince: %v", err)
|
|
} else if len(rows) > 0 {
|
|
h.setWatermark(max)
|
|
h.recordActivity()
|
|
h.broadcast(wsMessage{
|
|
Type: "delta",
|
|
Watermark: max,
|
|
Calls: rows,
|
|
ServerTime: time.Now().Unix(),
|
|
})
|
|
}
|
|
|
|
// Adaptive interval.
|
|
if time.Since(h.lastActivityAt()) > wsIdleThreshold {
|
|
interval = wsTickIntervalIdle
|
|
} else {
|
|
interval = wsTickInterval
|
|
}
|
|
t.Reset(interval)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *CallMonitorHub) getWatermark() int64 {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.watermark
|
|
}
|
|
|
|
func (h *CallMonitorHub) setWatermark(v int64) {
|
|
h.mu.Lock()
|
|
if v > h.watermark {
|
|
h.watermark = v
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *CallMonitorHub) recordActivity() {
|
|
h.mu.Lock()
|
|
h.lastEventAt = time.Now()
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *CallMonitorHub) lastActivityAt() time.Time {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.lastEventAt
|
|
}
|
|
|
|
// fetchSince devuelve filas con id > since, ordenadas asc.
|
|
func (h *CallMonitorHub) fetchSince(since int64) ([]callRow, int64, error) {
|
|
db, err := h.pool.Get("ops:call_monitor")
|
|
if err != nil {
|
|
return nil, since, err
|
|
}
|
|
rows, err := db.Query(`
|
|
SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id
|
|
FROM calls
|
|
WHERE id > ?
|
|
ORDER BY id ASC
|
|
LIMIT 500`, since)
|
|
if err != nil {
|
|
return nil, since, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
out := make([]callRow, 0, 16)
|
|
max := since
|
|
for rows.Next() {
|
|
var c callRow
|
|
var succ int
|
|
if err := rows.Scan(&c.ID, &c.TS, &c.FunctionID, &c.ToolUsed,
|
|
&c.DurationMS, &succ, &c.ErrorClass, &c.SessionID); err != nil {
|
|
return nil, since, err
|
|
}
|
|
c.Success = succ != 0
|
|
out = append(out, c)
|
|
if c.ID > max {
|
|
max = c.ID
|
|
}
|
|
}
|
|
return out, max, rows.Err()
|
|
}
|
|
|
|
// fetchSnapshot devuelve KPIs + ultimas N filas + watermark.
|
|
func (h *CallMonitorHub) fetchSnapshot() (*kpiStats, []callRow, int64, error) {
|
|
db, err := h.pool.Get("ops:call_monitor")
|
|
if err != nil {
|
|
return nil, nil, 0, err
|
|
}
|
|
|
|
stats := &kpiStats{}
|
|
queries := []struct {
|
|
sql string
|
|
dst *int
|
|
}{
|
|
{"SELECT COUNT(*) FROM calls", &stats.TotalCalls},
|
|
{"SELECT COUNT(*) FROM calls WHERE success = 0", &stats.TotalErrors},
|
|
{"SELECT COUNT(*) FROM violations", &stats.TotalViolations},
|
|
{"SELECT COUNT(*) FROM copied_code", &stats.TotalCopies},
|
|
{"SELECT COUNT(*) FROM function_versions", &stats.TotalVersions},
|
|
}
|
|
for _, q := range queries {
|
|
if err := db.QueryRow(q.sql).Scan(q.dst); err != nil {
|
|
// tabla puede no existir si call_monitor sin migrar; lo dejamos a 0.
|
|
if err != sql.ErrNoRows {
|
|
log.Printf("[ws] snapshot %q: %v", q.sql, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
rows, err := db.Query(`
|
|
SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id
|
|
FROM calls
|
|
ORDER BY id DESC
|
|
LIMIT ?`, wsSnapshotLimit)
|
|
if err != nil {
|
|
return stats, nil, 0, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
recent := make([]callRow, 0, wsSnapshotLimit)
|
|
max := int64(0)
|
|
for rows.Next() {
|
|
var c callRow
|
|
var succ int
|
|
if err := rows.Scan(&c.ID, &c.TS, &c.FunctionID, &c.ToolUsed,
|
|
&c.DurationMS, &succ, &c.ErrorClass, &c.SessionID); err != nil {
|
|
return stats, nil, 0, err
|
|
}
|
|
c.Success = succ != 0
|
|
recent = append(recent, c)
|
|
if c.ID > max {
|
|
max = c.ID
|
|
}
|
|
}
|
|
return stats, recent, max, rows.Err()
|
|
}
|
|
|
|
// broadcast envia el mensaje a todos los subscribers. Si la cola de uno esta
|
|
// llena (cliente lento), drop del mensaje para ese — no bloquea al resto.
|
|
func (h *CallMonitorHub) broadcast(msg wsMessage) {
|
|
h.mu.Lock()
|
|
subs := make([]*subscriber, 0, len(h.subscribers))
|
|
for s := range h.subscribers {
|
|
subs = append(subs, s)
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
for _, s := range subs {
|
|
select {
|
|
case s.out <- msg:
|
|
default:
|
|
// cola llena, cliente lento: drop frame para no atascar el hub.
|
|
log.Printf("[ws] dropping frame for slow subscriber")
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleEvents es el handler HTTP que upgradea a WS.
|
|
// Endpoint: GET /api/events/call_monitor
|
|
func (s *Server) handleEvents(hub *CallMonitorHub) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
|
InsecureSkipVerify: true, // CORS ya manejado por el middleware
|
|
})
|
|
if err != nil {
|
|
log.Printf("[ws] accept: %v", err)
|
|
return
|
|
}
|
|
defer conn.Close(websocket.StatusInternalError, "closing")
|
|
|
|
ctx, cancel := context.WithCancel(r.Context())
|
|
defer cancel()
|
|
|
|
sub := &subscriber{
|
|
conn: conn,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
out: make(chan wsMessage, 64),
|
|
}
|
|
hub.register(sub)
|
|
defer hub.unregister(sub)
|
|
|
|
// Snapshot inicial.
|
|
stats, recent, watermark, err := hub.fetchSnapshot()
|
|
if err != nil {
|
|
log.Printf("[ws] snapshot: %v", err)
|
|
conn.Close(websocket.StatusInternalError, "snapshot failed")
|
|
return
|
|
}
|
|
hub.setWatermark(watermark)
|
|
initial := wsMessage{
|
|
Type: "snapshot",
|
|
Watermark: watermark,
|
|
Stats: stats,
|
|
Calls: recent,
|
|
ServerTime: time.Now().Unix(),
|
|
}
|
|
if err := wsjson.Write(ctx, conn, initial); err != nil {
|
|
return
|
|
}
|
|
|
|
// Reader goroutine: procesa comandos del cliente (watermark override).
|
|
readErr := make(chan error, 1)
|
|
go func() {
|
|
for {
|
|
var cmd clientCmd
|
|
if err := wsjson.Read(ctx, conn, &cmd); err != nil {
|
|
readErr <- err
|
|
return
|
|
}
|
|
if cmd.Watermark > 0 {
|
|
// Cliente reanuda desde un watermark conocido — enviamos
|
|
// el delta acumulado desde ese punto.
|
|
rows, max, err := hub.fetchSince(cmd.Watermark)
|
|
if err == nil && len(rows) > 0 {
|
|
hub.setWatermark(max)
|
|
select {
|
|
case sub.out <- wsMessage{
|
|
Type: "delta",
|
|
Watermark: max,
|
|
Calls: rows,
|
|
ServerTime: time.Now().Unix(),
|
|
}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Writer loop: drena la cola del subscriber.
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case err := <-readErr:
|
|
if err != nil {
|
|
return
|
|
}
|
|
case msg, ok := <-sub.out:
|
|
if !ok {
|
|
return
|
|
}
|
|
wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout)
|
|
err := wsjson.Write(wctx, conn, msg)
|
|
wcancel()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// mustJSON renders v as a JSON string; it is a helper for tests and
// debugging. Despite the "must" prefix it never panics: when v cannot be
// marshalled the result is the empty string.
func mustJSON(v any) string {
	encoded, err := json.Marshal(v)
	if err != nil {
		// Best-effort helper: the error is deliberately not reported.
		return ""
	}
	return string(encoded)
}
|