Files

15 KiB

id, title, status, type, domain, scope, priority, depends, blocks, related, created, updated, tags
id title status type domain scope priority depends blocks related created updated tags
0011 WebSocket & SSE Server completado feature
multi-app alta
2026-05-17 2026-05-17

0011 — WebSocket & SSE Server

Metadata

Campo Valor
ID 0011
Estado pendiente
Prioridad alta
Tipo feature

Dependencias

ID Titulo Estado Requerido
0009 HTTP Server Foundation pendiente si

Bloqueada por: #0009 — las funciones WebSocket y SSE son handlers HTTP que se montan sobre las primitivas de servidor (router, middleware chain, graceful shutdown).

Desbloquea: apps de dashboard en tiempo real, notificaciones push, pipelines con feedback live.


Objetivo

Crear funciones reutilizables de WebSocket y Server-Sent Events en Go (dominio infra) que permitan anadir comunicacion bidireccional (WS) y unidireccional server-to-client (SSE) a cualquier app del registry, componiendo con las primitivas HTTP de #0009.

Contexto

  • Actualmente hay CERO funciones de WebSocket/SSE server en el registry. Las unicas conexiones WebSocket existentes son de cliente: cdp_connect_go_browser (conecta a Chrome DevTools), stream_ticks_go_finance (consume un stream de ticks), y las funciones de jupyter_*_py_notebook (hablan con kernels Jupyter).
  • stream_ticks_go_finance abre un WebSocket como cliente para recibir datos financieros — es especifico de un dominio, no una primitiva de server.
  • Patrones comunes que necesitan WS/SSE server: dashboards con datos en vivo, logs en streaming, notificaciones de estado de pipelines, chat entre agentes.
  • WebSocket es bidireccional (cliente y servidor envian mensajes). SSE es unidireccional server-to-client (mas simple, funciona sobre HTTP normal, reconexion automatica en el browser).
  • Go stdlib soporta SSE nativamente con http.Flusher. Para WebSocket se necesita una dependencia externa: nhooyr.io/websocket (moderna, context-aware) o gorilla/websocket (clasica, ampliamente usada).
  • Con estas funciones, una app nueva que necesite real-time solo hace: montar el ws_handler o sse_handler como una ruta mas del router de #0009.

Arquitectura

functions/infra/
├── ws_upgrader.go          — NEW: upgrade HTTP connection a WebSocket
├── ws_upgrader.md          — NEW
├── ws_hub.go               — NEW: hub de conexiones (register/unregister/broadcast)
├── ws_hub.md               — NEW
├── ws_broadcast.go         — NEW: enviar mensaje a todos los clientes conectados
├── ws_broadcast.md         — NEW
├── ws_send.go              — NEW: enviar mensaje a un cliente especifico
├── ws_send.md              — NEW
├── ws_handler.go           — NEW: HTTP handler que upgradea y gestiona una conexion WS
├── ws_handler.md           — NEW
├── sse_handler.go          — NEW: HTTP handler para stream SSE con flush
├── sse_handler.md          — NEW
├── sse_send.go             — NEW: enviar un evento SSE (event, data, id)
├── sse_send.md             — NEW
├── sse_keepalive.go        — NEW: enviar comentarios keepalive periodicos
├── sse_keepalive.md        — NEW

types/infra/
├── ws_hub.md               — NEW: metadata del tipo WSHub
├── ws_client.md            — NEW: metadata del tipo WSClient
├── ws_message.md           — NEW: metadata del tipo WSMessage
├── sse_event.md            — NEW: metadata del tipo SSEEvent

Patron pure core / impure shell

Todas las funciones de este issue son impuras — manejan conexiones de red, goroutines y estado mutable (el hub mantiene un mapa de clientes). No hay funciones puras en este issue porque la naturaleza del real-time es inherentemente I/O-bound.

El core puro vive en los tipos (structs sin metodos con side effects) y en la logica de serializado/parseado de mensajes que se delega a funciones existentes del registry (json_marshal, etc.).

Diseno

Tipos

// WSHub gestiona el ciclo de vida de conexiones WebSocket.
// Mantiene un mapa de clientes activos y canales para registro,
// desregistro y broadcast. Se ejecuta como goroutine via Run().
type WSHub struct {
    Clients    map[*WSClient]bool
    Broadcast  chan []byte
    Register   chan *WSClient
    Unregister chan *WSClient
}

// WSClient representa una conexion WebSocket individual.
// Cada cliente tiene su propio canal de envio buffereado
// y una referencia al hub al que pertenece.
type WSClient struct {
    Hub    *WSHub
    Conn   *websocket.Conn
    Send   chan []byte
    ID     string
}

// WSMessage es un mensaje tipado que viaja por WebSocket.
// El campo Type permite al receptor decidir como procesar el payload.
type WSMessage struct {
    Type     string `json:"type"`
    Payload  []byte `json:"payload"`
    SenderID string `json:"sender_id"`
    Ts       int64  `json:"ts"`
}

// SSEEvent es un evento Server-Sent Events segun la spec W3C.
// Campos opcionales: si Event esta vacio se envia solo data,
// si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir).
type SSEEvent struct {
    Event string `json:"event"`
    Data  string `json:"data"`
    ID    string `json:"id"`
    Retry int    `json:"retry"`
}

Funciones

Funcion Purity Firma (simplificada) Descripcion
ws_upgrader impure (w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error) Upgrade HTTP a WebSocket con validacion de origen
ws_hub impure () *WSHub + (hub *WSHub) Run() Crea hub y lo ejecuta como goroutine (loop select sobre canales)
ws_broadcast impure (hub *WSHub, msg []byte) Envia mensaje al canal Broadcast del hub
ws_send impure (client *WSClient, msg []byte) error Envia mensaje al canal Send de un cliente especifico
ws_handler impure (hub *WSHub, origins []string) http.HandlerFunc Retorna handler que upgradea la conexion, registra el cliente en el hub, y lanza read/write pumps
sse_handler impure (events <-chan SSEEvent) http.HandlerFunc Retorna handler que setea headers SSE, consume del canal y flushea cada evento
sse_send impure (w http.ResponseWriter, event SSEEvent) error Escribe un evento SSE formateado al writer y hace flush
sse_keepalive impure (w http.ResponseWriter, interval time.Duration, done <-chan struct{}) Goroutine que envia : keepalive\n\n periodicamente hasta que done se cierre

Protocolo WebSocket

El hub sigue el patron clasico de Go concurrency:

                    ┌──────────┐
  HTTP request ──→  │ws_handler│ ──→ ws_upgrader ──→ *websocket.Conn
                    └────┬─────┘
                         │
                   ┌─────▼─────┐
                   │  WSClient │
                   │  .Send ch │
                   └─────┬─────┘
                         │ Register
                   ┌─────▼─────┐
                   │   WSHub   │ ←── ws_broadcast
                   │  .Run()   │
                   │  loop {   │
                   │   select  │
                   │  }        │
                   └───────────┘
                         │ Broadcast
                   ┌─────▼─────┐
                   │ client.Send│ ──→ writePump ──→ conn.WriteMessage
                   └───────────┘

Cada cliente tiene dos goroutines internas:

  • readPump: lee mensajes del conn y los envia al hub Broadcast (o los procesa con un callback)
  • writePump: consume del canal Send y escribe al conn

Protocolo SSE

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

: keepalive

event: price_update
id: 42
data: {"symbol":"BTC","price":67000}

data: simple message without event type

: keepalive

sse_handler detecta si el ResponseWriter implementa http.Flusher y hace flush despues de cada evento. Si el cliente se desconecta (context cancelado), el handler retorna limpiamente.

Tareas

Fase 1: Tipos

  • 1.1 Crear tipo WSHub en functions/infra/ws_hub.go con .md en types/infra/ws_hub.md
  • 1.2 Crear tipo WSClient en functions/infra/ws_client.go con .md en types/infra/ws_client.md
  • 1.3 Crear tipo WSMessage en functions/infra/ws_message.go con .md en types/infra/ws_message.md
  • 1.4 Crear tipo SSEEvent en functions/infra/sse_event.go con .md en types/infra/sse_event.md

Fase 2: SSE (mas simple, sin dependencia externa)

  • 2.1 sse_send — formatea y escribe un SSEEvent al writer, hace flush
  • 2.2 sse_keepalive — goroutine que envia comentarios keepalive periodicamente
  • 2.3 sse_handler — HTTP handler completo: setea headers, consume canal de eventos, flush, detecta desconexion

Fase 3: WebSocket

  • 3.1 Elegir dependencia: nhooyr.io/websocket (preferida por soporte nativo de context) o gorilla/websocket (mas madura). Anadir a go.mod.
  • 3.2 ws_upgrader — upgrade HTTP a WebSocket con validacion de origenes permitidos
  • 3.3 ws_hub — constructor + metodo Run() con loop select sobre Register/Unregister/Broadcast
  • 3.4 ws_send — envia bytes al canal Send de un cliente
  • 3.5 ws_broadcast — envia bytes al canal Broadcast del hub
  • 3.6 ws_handler — handler HTTP que upgradea, crea WSClient, registra en hub, lanza readPump/writePump

Fase 4: Tests

  • 4.1 Tests de SSE con httptest.NewRecorder y pipe para simular flush
  • 4.2 Tests de WebSocket con httptest.NewServer y cliente WS de test
  • 4.3 Test de integracion: hub con multiples clientes, broadcast, desconexion
  • 4.4 fn index y verificar que todas las funciones y tipos aparecen en registry.db
  • 4.5 go vet -tags fts5 limpio

Ejemplo de uso

Chat-like: broadcast de mensajes entre clientes

// Montar WebSocket en una app con las primitivas de #0009
hub := infra.NewWSHub()
go hub.Run()

routes := []infra.Route{
    {Method: "GET", Path: "/health",   Handler: healthHandler},
    {Method: "GET", Path: "/api/data", Handler: dataHandler},
    {Method: "GET", Path: "/ws",       Handler: infra.WsHandler(hub, []string{"*"})},
}

mux := infra.HttpRouter(routes)
middleware := infra.HttpMiddlewareChain(
    infra.HttpCorsMiddleware([]string{"*"}, []string{"GET", "POST"}),
    infra.HttpLoggerMiddleware(os.Stdout),
)

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
infra.HttpServe(":8080", middleware(mux), ctx)
// Cliente browser
const ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = (e) => {
    const msg = JSON.parse(e.data);
    console.log(`[${msg.sender_id}] ${msg.type}: ${new TextDecoder().decode(msg.payload)}`);
};
ws.send(JSON.stringify({type: "chat", payload: btoa("hola"), sender_id: "user1"}));

Dashboard live updates via SSE

// Canal de eventos que se alimenta desde cualquier goroutine
events := make(chan infra.SSEEvent, 100)

// Goroutine que genera eventos (ej: watch a operations.db)
go func() {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    for i := 0; ; i++ {
        <-ticker.C
        events <- infra.SSEEvent{
            Event: "metrics_update",
            ID:    fmt.Sprintf("%d", i),
            Data:  fmt.Sprintf(`{"cpu": %.1f, "mem": %.1f}`, rand.Float64()*100, rand.Float64()*100),
        }
    }
}()

routes := []infra.Route{
    {Method: "GET", Path: "/health",       Handler: healthHandler},
    {Method: "GET", Path: "/events",       Handler: infra.SseHandler(events)},
    {Method: "GET", Path: "/api/snapshot",  Handler: snapshotHandler},
}

mux := infra.HttpRouter(routes)
infra.HttpServe(":8080", mux, ctx)
// Cliente browser — reconexion automatica gratis con EventSource
const es = new EventSource("http://localhost:8080/events");
es.addEventListener("metrics_update", (e) => {
    const data = JSON.parse(e.data);
    updateDashboard(data.cpu, data.mem);
});
es.onerror = () => console.log("reconectando...");

Notificaciones de estado de pipelines

// En una app que ejecuta pipelines del registry:
hub := infra.NewWSHub()
go hub.Run()

// Cada vez que un step termina, broadcast a todos los clientes
func onStepComplete(step string, status string, durationMs int) {
    msg, _ := json.Marshal(infra.WSMessage{
        Type:     "step_complete",
        Payload:  []byte(fmt.Sprintf(`{"step":%q,"status":%q,"ms":%d}`, step, status, durationMs)),
        SenderID: "pipeline_runner",
        Ts:       time.Now().UnixMilli(),
    })
    infra.WsBroadcast(hub, msg)
}

Decisiones de diseno

  • nhooyr.io/websocket como primera opcion: mas moderna que gorilla/websocket, soporta context.Context nativamente (encaja con graceful shutdown de #0009), API mas simple. Si da problemas de compatibilidad, fallback a gorilla.
  • Hub como goroutine con canales: patron estandar de Go para gestionar estado compartido sin mutex. Un solo punto de escritura al mapa de clientes evita races.
  • SSE sin dependencias externas: solo usa net/http stdlib + http.Flusher interface. Mas simple que WebSocket y suficiente para dashboards y notificaciones unidireccionales.
  • Separar send de broadcast: ws_send (un cliente) y ws_broadcast (todos) son funciones distintas porque tienen patrones de uso y error handling diferentes.
  • Canal buffereado en WSClient.Send: evita que un cliente lento bloquee el broadcast a los demas. Si el canal se llena, el hub desconecta al cliente.
  • SSEEvent.Retry opcional: el campo retry en SSE le dice al browser cuanto esperar antes de reconectar. Dejarlo en 0 usa el default del browser (~3 segundos).
  • Validacion de origenes en ws_upgrader: proteccion basica contra cross-origin WebSocket hijacking. Para produccion se complementa con auth middleware de #0009.
  • Todas impuras, sin excepciones: a diferencia de #0009 donde hay funciones puras (middleware chain, CORS config), aqui todo toca red o estado mutable. No forzar pureza artificial.

Riesgos

  • Leak de goroutines: Cada cliente WS genera 2 goroutines (readPump + writePump). Si no se limpian bien al desconectar, se acumulan. Mitigado con el patron hub.Unregister + defer cleanup en cada pump.
  • Clientes lentos saturan el hub: Un cliente que no consume su canal Send puede bloquear el broadcast. Mitigado con canal buffereado y desconexion forzada si el buffer se llena (write deadline).
  • Dependencia externa para WebSocket: nhooyr.io/websocket anade un import fuera de stdlib. Mitigado porque es una dependencia mantenida, sin subdependencias transitivas, y solo afecta a las funciones ws_* (no contamina el resto del paquete infra).
  • Compatibilidad con proxies/load balancers: WebSocket requiere que proxies soporten upgrade HTTP. SSE funciona sobre HTTP normal sin problema. Documentar en los .md que WS detras de nginx/caddy necesita config especifica (proxy_pass con upgrade headers).
  • Scope creep hacia un framework de real-time: Mitigado manteniendo funciones atomicas. El hub es un mapa de conexiones, no un pub/sub con rooms, channels, ni autenticacion. Esas abstracciones se componen encima si se necesitan.