Files
fn_registry/dev/issues/completed/0011-websocket-sse.md
T

15 KiB

id, title, status, type, domain, scope, priority, depends, blocks, related, created, updated, tags
id title status type domain scope priority depends blocks related created updated tags
0011 WebSocket & SSE Server completado feature
multi-app alta
2026-05-17 2026-05-17

0011 — WebSocket & SSE Server

Metadata

Campo Valor
ID 0011
Estado pendiente
Prioridad alta
Tipo feature

Dependencias

ID Titulo Estado Requerido
0009 HTTP Server Foundation pendiente si

Bloqueada por: #0009 — las funciones WebSocket y SSE son handlers HTTP que se montan sobre las primitivas de servidor (router, middleware chain, graceful shutdown).

Desbloquea: apps de dashboard en tiempo real, notificaciones push, pipelines con feedback live.


Objetivo

Crear funciones reutilizables de WebSocket y Server-Sent Events en Go (dominio infra) que permitan anadir comunicacion bidireccional (WS) y unidireccional server-to-client (SSE) a cualquier app del registry, componiendo con las primitivas HTTP de #0009.

Contexto

  • Actualmente hay CERO funciones de WebSocket/SSE server en el registry. Las unicas conexiones WebSocket existentes son de cliente: cdp_connect_go_browser (conecta a Chrome DevTools), stream_ticks_go_finance (consume un stream de ticks), y las funciones de jupyter_*_py_notebook (hablan con kernels Jupyter).
  • stream_ticks_go_finance abre un WebSocket como cliente para recibir datos financieros — es especifico de un dominio, no una primitiva de server.
  • Patrones comunes que necesitan WS/SSE server: dashboards con datos en vivo, logs en streaming, notificaciones de estado de pipelines, chat entre agentes.
  • WebSocket es bidireccional (cliente y servidor envian mensajes). SSE es unidireccional server-to-client (mas simple, funciona sobre HTTP normal, reconexion automatica en el browser).
  • Go stdlib soporta SSE nativamente con http.Flusher. Para WebSocket se necesita una dependencia externa: nhooyr.io/websocket (moderna, context-aware) o gorilla/websocket (clasica, ampliamente usada).
  • Con estas funciones, una app nueva que necesite real-time solo hace: montar el ws_handler o sse_handler como una ruta mas del router de #0009.

Arquitectura

functions/infra/
├── ws_upgrader.go          — NEW: upgrade HTTP connection a WebSocket
├── ws_upgrader.md          — NEW
├── ws_hub.go               — NEW: hub de conexiones (register/unregister/broadcast)
├── ws_hub.md               — NEW
├── ws_broadcast.go         — NEW: enviar mensaje a todos los clientes conectados
├── ws_broadcast.md         — NEW
├── ws_send.go              — NEW: enviar mensaje a un cliente especifico
├── ws_send.md              — NEW
├── ws_handler.go           — NEW: HTTP handler que upgradea y gestiona una conexion WS
├── ws_handler.md           — NEW
├── sse_handler.go          — NEW: HTTP handler para stream SSE con flush
├── sse_handler.md          — NEW
├── sse_send.go             — NEW: enviar un evento SSE (event, data, id)
├── sse_send.md             — NEW
├── sse_keepalive.go        — NEW: enviar comentarios keepalive periodicos
├── sse_keepalive.md        — NEW

types/infra/
├── ws_hub.md               — NEW: metadata del tipo WSHub
├── ws_client.md            — NEW: metadata del tipo WSClient
├── ws_message.md           — NEW: metadata del tipo WSMessage
├── sse_event.md            — NEW: metadata del tipo SSEEvent

Patron pure core / impure shell

Todas las funciones de este issue son impuras — manejan conexiones de red, goroutines y estado mutable (el hub mantiene un mapa de clientes). No hay funciones puras en este issue porque la naturaleza del real-time es inherentemente I/O-bound.

El core puro vive en los tipos (structs sin metodos con side effects) y en la logica de serializado/parseado de mensajes que se delega a funciones existentes del registry (json_marshal, etc.).

Diseno

Tipos

// WSHub gestiona el ciclo de vida de conexiones WebSocket.
// Mantiene un mapa de clientes activos y canales para registro,
// desregistro y broadcast. Se ejecuta como goroutine via Run().
type WSHub struct {
    Clients    map[*WSClient]bool
    Broadcast  chan []byte
    Register   chan *WSClient
    Unregister chan *WSClient
}

// WSClient representa una conexion WebSocket individual.
// Cada cliente tiene su propio canal de envio buffereado
// y una referencia al hub al que pertenece.
type WSClient struct {
    Hub    *WSHub
    Conn   *websocket.Conn
    Send   chan []byte
    ID     string
}

// WSMessage es un mensaje tipado que viaja por WebSocket.
// El campo Type permite al receptor decidir como procesar el payload.
type WSMessage struct {
    Type     string `json:"type"`
    Payload  []byte `json:"payload"`
    SenderID string `json:"sender_id"`
    Ts       int64  `json:"ts"`
}

// SSEEvent es un evento Server-Sent Events segun la spec W3C.
// Campos opcionales: si Event esta vacio se envia solo data,
// si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir).
type SSEEvent struct {
    Event string `json:"event"`
    Data  string `json:"data"`
    ID    string `json:"id"`
    Retry int    `json:"retry"`
}

Funciones

Funcion Purity Firma (simplificada) Descripcion
ws_upgrader impure (w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error) Upgrade HTTP a WebSocket con validacion de origen
ws_hub impure () *WSHub + (hub *WSHub) Run() Crea hub y lo ejecuta como goroutine (loop select sobre canales)
ws_broadcast impure (hub *WSHub, msg []byte) Envia mensaje al canal Broadcast del hub
ws_send impure (client *WSClient, msg []byte) error Envia mensaje al canal Send de un cliente especifico
ws_handler impure (hub *WSHub, origins []string) http.HandlerFunc Retorna handler que upgradea la conexion, registra el cliente en el hub, y lanza read/write pumps
sse_handler impure (events <-chan SSEEvent) http.HandlerFunc Retorna handler que setea headers SSE, consume del canal y flushea cada evento
sse_send impure (w http.ResponseWriter, event SSEEvent) error Escribe un evento SSE formateado al writer y hace flush
sse_keepalive impure (w http.ResponseWriter, interval time.Duration, done <-chan struct{}) Goroutine que envia : keepalive\n\n periodicamente hasta que done se cierre

Protocolo WebSocket

El hub sigue el patron clasico de Go concurrency:

                    ┌──────────┐
  HTTP request ──→  │ws_handler│ ──→ ws_upgrader ──→ *websocket.Conn
                    └────┬─────┘
                         │
                   ┌─────▼─────┐
                   │  WSClient │
                   │  .Send ch │
                   └─────┬─────┘
                         │ Register
                   ┌─────▼─────┐
                   │   WSHub   │ ←── ws_broadcast
                   │  .Run()   │
                   │  loop {   │
                   │   select  │
                   │  }        │
                   └───────────┘
                         │ Broadcast
                   ┌─────▼─────┐
                   │ client.Send│ ──→ writePump ──→ conn.WriteMessage
                   └───────────┘

Cada cliente tiene dos goroutines internas:

  • readPump: lee mensajes del conn y los envia al hub Broadcast (o los procesa con un callback)
  • writePump: consume del canal Send y escribe al conn

Protocolo SSE

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

: keepalive

event: price_update
id: 42
data: {"symbol":"BTC","price":67000}

data: simple message without event type

: keepalive

sse_handler detecta si el ResponseWriter implementa http.Flusher y hace flush despues de cada evento. Si el cliente se desconecta (context cancelado), el handler retorna limpiamente.

Tareas

Fase 1: Tipos

  • 1.1 Crear tipo WSHub en functions/infra/ws_hub.go con .md en types/infra/ws_hub.md
  • 1.2 Crear tipo WSClient en functions/infra/ws_client.go con .md en types/infra/ws_client.md
  • 1.3 Crear tipo WSMessage en functions/infra/ws_message.go con .md en types/infra/ws_message.md
  • 1.4 Crear tipo SSEEvent en functions/infra/sse_event.go con .md en types/infra/sse_event.md

Fase 2: SSE (mas simple, sin dependencia externa)

  • 2.1 sse_send — formatea y escribe un SSEEvent al writer, hace flush
  • 2.2 sse_keepalive — goroutine que envia comentarios keepalive periodicamente
  • 2.3 sse_handler — HTTP handler completo: setea headers, consume canal de eventos, flush, detecta desconexion

Fase 3: WebSocket

  • 3.1 Elegir dependencia: nhooyr.io/websocket (preferida por soporte nativo de context) o gorilla/websocket (mas madura). Anadir a go.mod.
  • 3.2 ws_upgrader — upgrade HTTP a WebSocket con validacion de origenes permitidos
  • 3.3 ws_hub — constructor + metodo Run() con loop select sobre Register/Unregister/Broadcast
  • 3.4 ws_send — envia bytes al canal Send de un cliente
  • 3.5 ws_broadcast — envia bytes al canal Broadcast del hub
  • 3.6 ws_handler — handler HTTP que upgradea, crea WSClient, registra en hub, lanza readPump/writePump

Fase 4: Tests

  • 4.1 Tests de SSE con httptest.NewRecorder y pipe para simular flush
  • 4.2 Tests de WebSocket con httptest.NewServer y cliente WS de test
  • 4.3 Test de integracion: hub con multiples clientes, broadcast, desconexion
  • 4.4 fn index y verificar que todas las funciones y tipos aparecen en registry.db
  • 4.5 go vet -tags fts5 limpio

Ejemplo de uso

Chat-like: broadcast de mensajes entre clientes

// Montar WebSocket en una app con las primitivas de #0009
hub := infra.NewWSHub()
go hub.Run()

routes := []infra.Route{
    {Method: "GET", Path: "/health",   Handler: healthHandler},
    {Method: "GET", Path: "/api/data", Handler: dataHandler},
    {Method: "GET", Path: "/ws",       Handler: infra.WsHandler(hub, []string{"*"})},
}

mux := infra.HttpRouter(routes)
middleware := infra.HttpMiddlewareChain(
    infra.HttpCorsMiddleware([]string{"*"}, []string{"GET", "POST"}),
    infra.HttpLoggerMiddleware(os.Stdout),
)

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
infra.HttpServe(":8080", middleware(mux), ctx)
// Cliente browser
const ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = (e) => {
    const msg = JSON.parse(e.data);
    console.log(`[${msg.sender_id}] ${msg.type}: ${new TextDecoder().decode(msg.payload)}`);
};
ws.send(JSON.stringify({type: "chat", payload: btoa("hola"), sender_id: "user1"}));

Dashboard live updates via SSE

// Canal de eventos que se alimenta desde cualquier goroutine
events := make(chan infra.SSEEvent, 100)

// Goroutine que genera eventos (ej: watch a operations.db)
go func() {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    for i := 0; ; i++ {
        <-ticker.C
        events <- infra.SSEEvent{
            Event: "metrics_update",
            ID:    fmt.Sprintf("%d", i),
            Data:  fmt.Sprintf(`{"cpu": %.1f, "mem": %.1f}`, rand.Float64()*100, rand.Float64()*100),
        }
    }
}()

routes := []infra.Route{
    {Method: "GET", Path: "/health",       Handler: healthHandler},
    {Method: "GET", Path: "/events",       Handler: infra.SseHandler(events)},
    {Method: "GET", Path: "/api/snapshot",  Handler: snapshotHandler},
}

mux := infra.HttpRouter(routes)
infra.HttpServe(":8080", mux, ctx)
// Cliente browser — reconexion automatica gratis con EventSource
const es = new EventSource("http://localhost:8080/events");
es.addEventListener("metrics_update", (e) => {
    const data = JSON.parse(e.data);
    updateDashboard(data.cpu, data.mem);
});
es.onerror = () => console.log("reconectando...");

Notificaciones de estado de pipelines

// En una app que ejecuta pipelines del registry:
hub := infra.NewWSHub()
go hub.Run()

// Cada vez que un step termina, broadcast a todos los clientes
func onStepComplete(step string, status string, durationMs int) {
    msg, _ := json.Marshal(infra.WSMessage{
        Type:     "step_complete",
        Payload:  []byte(fmt.Sprintf(`{"step":%q,"status":%q,"ms":%d}`, step, status, durationMs)),
        SenderID: "pipeline_runner",
        Ts:       time.Now().UnixMilli(),
    })
    infra.WsBroadcast(hub, msg)
}

Decisiones de diseno

  • nhooyr.io/websocket como primera opcion: mas moderna que gorilla/websocket, soporta context.Context nativamente (encaja con graceful shutdown de #0009), API mas simple. Si da problemas de compatibilidad, fallback a gorilla.
  • Hub como goroutine con canales: patron estandar de Go para gestionar estado compartido sin mutex. Un solo punto de escritura al mapa de clientes evita races.
  • SSE sin dependencias externas: solo usa net/http stdlib + http.Flusher interface. Mas simple que WebSocket y suficiente para dashboards y notificaciones unidireccionales.
  • Separar send de broadcast: ws_send (un cliente) y ws_broadcast (todos) son funciones distintas porque tienen patrones de uso y error handling diferentes.
  • Canal buffereado en WSClient.Send: evita que un cliente lento bloquee el broadcast a los demas. Si el canal se llena, el hub desconecta al cliente.
  • SSEEvent.Retry opcional: el campo retry en SSE le dice al browser cuanto esperar antes de reconectar. Dejarlo en 0 usa el default del browser (~3 segundos).
  • Validacion de origenes en ws_upgrader: proteccion basica contra cross-origin WebSocket hijacking. Para produccion se complementa con auth middleware de #0009.
  • Todas impuras, sin excepciones: a diferencia de #0009 donde hay funciones puras (middleware chain, CORS config), aqui todo toca red o estado mutable. No forzar pureza artificial.

Riesgos

  • Leak de goroutines: Cada cliente WS genera 2 goroutines (readPump + writePump). Si no se limpian bien al desconectar, se acumulan. Mitigado con el patron hub.Unregister + defer cleanup en cada pump.
  • Clientes lentos saturan el hub: Un cliente que no consume su canal Send puede bloquear el broadcast. Mitigado con canal buffereado y desconexion forzada si el buffer se llena (write deadline).
  • Dependencia externa para WebSocket: nhooyr.io/websocket anade un import fuera de stdlib. Mitigado porque es una dependencia mantenida, sin subdependencias transitivas, y solo afecta a las funciones ws_* (no contamina el resto del paquete infra).
  • Compatibilidad con proxies/load balancers: WebSocket requiere que proxies soporten upgrade HTTP. SSE funciona sobre HTTP normal sin problema. Documentar en los .md que WS detras de nginx/caddy necesita config especifica (proxy_pass con upgrade headers).
  • Scope creep hacia un framework de real-time: Mitigado manteniendo funciones atomicas. El hub es un mapa de conexiones, no un pub/sub con rooms, channels, ni autenticacion. Esas abstracciones se componen encima si se necesitan.