15 KiB
0011 — WebSocket & SSE Server
Metadata
| Campo | Valor |
|---|---|
| ID | 0011 |
| Estado | pendiente |
| Prioridad | alta |
| Tipo | feature |
Dependencias
| ID | Titulo | Estado | Requerido |
|---|---|---|---|
| 0009 | HTTP Server Foundation | pendiente | si |
Bloqueada por: #0009 — las funciones WebSocket y SSE son handlers HTTP que se montan sobre las primitivas de servidor (router, middleware chain, graceful shutdown).
Desbloquea: apps de dashboard en tiempo real, notificaciones push, pipelines con feedback live.
Objetivo
Crear funciones reutilizables de WebSocket y Server-Sent Events en Go (dominio infra) que permitan anadir comunicacion bidireccional (WS) y unidireccional server-to-client (SSE) a cualquier app del registry, componiendo con las primitivas HTTP de #0009.
Contexto
- Actualmente hay CERO funciones de WebSocket/SSE server en el registry. Las unicas conexiones WebSocket existentes son de cliente:
cdp_connect_go_browser(conecta a Chrome DevTools),stream_ticks_go_finance(consume un stream de ticks), y las funciones dejupyter_*_py_notebook(hablan con kernels Jupyter). stream_ticks_go_financeabre un WebSocket como cliente para recibir datos financieros — es especifico de un dominio, no una primitiva de server.- Patrones comunes que necesitan WS/SSE server: dashboards con datos en vivo, logs en streaming, notificaciones de estado de pipelines, chat entre agentes.
- WebSocket es bidireccional (cliente y servidor envian mensajes). SSE es unidireccional server-to-client (mas simple, funciona sobre HTTP normal, reconexion automatica en el browser).
- Go stdlib soporta SSE nativamente con
http.Flusher. Para WebSocket se necesita una dependencia externa:nhooyr.io/websocket(moderna, context-aware) ogorilla/websocket(clasica, ampliamente usada). - Con estas funciones, una app nueva que necesite real-time solo hace: montar el
ws_handlerosse_handlercomo una ruta mas del router de#0009.
Arquitectura
functions/infra/
├── ws_upgrader.go — NEW: upgrade HTTP connection a WebSocket
├── ws_upgrader.md — NEW
├── ws_hub.go — NEW: hub de conexiones (register/unregister/broadcast)
├── ws_hub.md — NEW
├── ws_broadcast.go — NEW: enviar mensaje a todos los clientes conectados
├── ws_broadcast.md — NEW
├── ws_send.go — NEW: enviar mensaje a un cliente especifico
├── ws_send.md — NEW
├── ws_handler.go — NEW: HTTP handler que upgradea y gestiona una conexion WS
├── ws_handler.md — NEW
├── sse_handler.go — NEW: HTTP handler para stream SSE con flush
├── sse_handler.md — NEW
├── sse_send.go — NEW: enviar un evento SSE (event, data, id)
├── sse_send.md — NEW
├── sse_keepalive.go — NEW: enviar comentarios keepalive periodicos
├── sse_keepalive.md — NEW
types/infra/
├── ws_hub.md — NEW: metadata del tipo WSHub
├── ws_client.md — NEW: metadata del tipo WSClient
├── ws_message.md — NEW: metadata del tipo WSMessage
├── sse_event.md — NEW: metadata del tipo SSEEvent
Patron pure core / impure shell
Todas las funciones de este issue son impuras — manejan conexiones de red, goroutines y estado mutable (el hub mantiene un mapa de clientes). No hay funciones puras en este issue porque la naturaleza del real-time es inherentemente I/O-bound.
El core puro vive en los tipos (structs sin metodos con side effects) y en la logica de serializado/parseado de mensajes que se delega a funciones existentes del registry (json_marshal, etc.).
Diseno
Tipos
// WSHub gestiona el ciclo de vida de conexiones WebSocket.
// Mantiene un mapa de clientes activos y canales para registro,
// desregistro y broadcast. Se ejecuta como goroutine via Run().
type WSHub struct {
Clients map[*WSClient]bool
Broadcast chan []byte
Register chan *WSClient
Unregister chan *WSClient
}
// WSClient representa una conexion WebSocket individual.
// Cada cliente tiene su propio canal de envio buffereado
// y una referencia al hub al que pertenece.
type WSClient struct {
Hub *WSHub
Conn *websocket.Conn
Send chan []byte
ID string
}
// WSMessage es un mensaje tipado que viaja por WebSocket.
// El campo Type permite al receptor decidir como procesar el payload.
type WSMessage struct {
Type string `json:"type"`
Payload []byte `json:"payload"`
SenderID string `json:"sender_id"`
Ts int64 `json:"ts"`
}
// SSEEvent es un evento Server-Sent Events segun la spec W3C.
// Campos opcionales: si Event esta vacio se envia solo data,
// si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir).
type SSEEvent struct {
Event string `json:"event"`
Data string `json:"data"`
ID string `json:"id"`
Retry int `json:"retry"`
}
Funciones
| Funcion | Purity | Firma (simplificada) | Descripcion |
|---|---|---|---|
ws_upgrader |
impure | (w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error) |
Upgrade HTTP a WebSocket con validacion de origen |
ws_hub |
impure | () *WSHub + (hub *WSHub) Run() |
Crea hub y lo ejecuta como goroutine (loop select sobre canales) |
ws_broadcast |
impure | (hub *WSHub, msg []byte) |
Envia mensaje al canal Broadcast del hub |
ws_send |
impure | (client *WSClient, msg []byte) error |
Envia mensaje al canal Send de un cliente especifico |
ws_handler |
impure | (hub *WSHub, origins []string) http.HandlerFunc |
Retorna handler que upgradea la conexion, registra el cliente en el hub, y lanza read/write pumps |
sse_handler |
impure | (events <-chan SSEEvent) http.HandlerFunc |
Retorna handler que setea headers SSE, consume del canal y flushea cada evento |
sse_send |
impure | (w http.ResponseWriter, event SSEEvent) error |
Escribe un evento SSE formateado al writer y hace flush |
sse_keepalive |
impure | (w http.ResponseWriter, interval time.Duration, done <-chan struct{}) |
Goroutine que envia : keepalive\n\n periodicamente hasta que done se cierre |
Protocolo WebSocket
El hub sigue el patron clasico de Go concurrency:
┌──────────┐
HTTP request ──→ │ws_handler│ ──→ ws_upgrader ──→ *websocket.Conn
└────┬─────┘
│
┌─────▼─────┐
│ WSClient │
│ .Send ch │
└─────┬─────┘
│ Register
┌─────▼─────┐
│ WSHub │ ←── ws_broadcast
│ .Run() │
│ loop { │
│ select │
│ } │
└───────────┘
│ Broadcast
┌─────▼─────┐
│ client.Send│ ──→ writePump ──→ conn.WriteMessage
└───────────┘
Cada cliente tiene dos goroutines internas:
- readPump: lee mensajes del conn y los envia al hub Broadcast (o los procesa con un callback)
- writePump: consume del canal Send y escribe al conn
Protocolo SSE
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
: keepalive
event: price_update
id: 42
data: {"symbol":"BTC","price":67000}
data: simple message without event type
: keepalive
sse_handler detecta si el ResponseWriter implementa http.Flusher y hace flush despues de cada evento. Si el cliente se desconecta (context cancelado), el handler retorna limpiamente.
Tareas
Fase 1: Tipos
- 1.1 Crear tipo
WSHubenfunctions/infra/ws_hub.gocon.mdentypes/infra/ws_hub.md - 1.2 Crear tipo
WSClientenfunctions/infra/ws_client.gocon.mdentypes/infra/ws_client.md - 1.3 Crear tipo
WSMessageenfunctions/infra/ws_message.gocon.mdentypes/infra/ws_message.md - 1.4 Crear tipo
SSEEventenfunctions/infra/sse_event.gocon.mdentypes/infra/sse_event.md
Fase 2: SSE (mas simple, sin dependencia externa)
- 2.1
sse_send— formatea y escribe un SSEEvent al writer, hace flush - 2.2
sse_keepalive— goroutine que envia comentarios keepalive periodicamente - 2.3
sse_handler— HTTP handler completo: setea headers, consume canal de eventos, flush, detecta desconexion
Fase 3: WebSocket
- 3.1 Elegir dependencia:
nhooyr.io/websocket(preferida por soporte nativo de context) ogorilla/websocket(mas madura). Anadir ago.mod. - 3.2
ws_upgrader— upgrade HTTP a WebSocket con validacion de origenes permitidos - 3.3
ws_hub— constructor + metodo Run() con loop select sobre Register/Unregister/Broadcast - 3.4
ws_send— envia bytes al canal Send de un cliente - 3.5
ws_broadcast— envia bytes al canal Broadcast del hub - 3.6
ws_handler— handler HTTP que upgradea, crea WSClient, registra en hub, lanza readPump/writePump
Fase 4: Tests
- 4.1 Tests de SSE con
httptest.NewRecordery pipe para simular flush - 4.2 Tests de WebSocket con
httptest.NewServery cliente WS de test - 4.3 Test de integracion: hub con multiples clientes, broadcast, desconexion
- 4.4
fn indexy verificar que todas las funciones y tipos aparecen en registry.db - 4.5
go vet -tags fts5limpio
Ejemplo de uso
Chat-like: broadcast de mensajes entre clientes
// Montar WebSocket en una app con las primitivas de #0009
hub := infra.NewWSHub()
go hub.Run()
routes := []infra.Route{
{Method: "GET", Path: "/health", Handler: healthHandler},
{Method: "GET", Path: "/api/data", Handler: dataHandler},
{Method: "GET", Path: "/ws", Handler: infra.WsHandler(hub, []string{"*"})},
}
mux := infra.HttpRouter(routes)
middleware := infra.HttpMiddlewareChain(
infra.HttpCorsMiddleware([]string{"*"}, []string{"GET", "POST"}),
infra.HttpLoggerMiddleware(os.Stdout),
)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
infra.HttpServe(":8080", middleware(mux), ctx)
// Cliente browser
const ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = (e) => {
const msg = JSON.parse(e.data);
console.log(`[${msg.sender_id}] ${msg.type}: ${new TextDecoder().decode(msg.payload)}`);
};
ws.send(JSON.stringify({type: "chat", payload: btoa("hola"), sender_id: "user1"}));
Dashboard live updates via SSE
// Canal de eventos que se alimenta desde cualquier goroutine
events := make(chan infra.SSEEvent, 100)
// Goroutine que genera eventos (ej: watch a operations.db)
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for i := 0; ; i++ {
<-ticker.C
events <- infra.SSEEvent{
Event: "metrics_update",
ID: fmt.Sprintf("%d", i),
Data: fmt.Sprintf(`{"cpu": %.1f, "mem": %.1f}`, rand.Float64()*100, rand.Float64()*100),
}
}
}()
routes := []infra.Route{
{Method: "GET", Path: "/health", Handler: healthHandler},
{Method: "GET", Path: "/events", Handler: infra.SseHandler(events)},
{Method: "GET", Path: "/api/snapshot", Handler: snapshotHandler},
}
mux := infra.HttpRouter(routes)
infra.HttpServe(":8080", mux, ctx)
// Cliente browser — reconexion automatica gratis con EventSource
const es = new EventSource("http://localhost:8080/events");
es.addEventListener("metrics_update", (e) => {
const data = JSON.parse(e.data);
updateDashboard(data.cpu, data.mem);
});
es.onerror = () => console.log("reconectando...");
Notificaciones de estado de pipelines
// En una app que ejecuta pipelines del registry:
hub := infra.NewWSHub()
go hub.Run()
// Cada vez que un step termina, broadcast a todos los clientes
func onStepComplete(step string, status string, durationMs int) {
msg, _ := json.Marshal(infra.WSMessage{
Type: "step_complete",
Payload: []byte(fmt.Sprintf(`{"step":%q,"status":%q,"ms":%d}`, step, status, durationMs)),
SenderID: "pipeline_runner",
Ts: time.Now().UnixMilli(),
})
infra.WsBroadcast(hub, msg)
}
Decisiones de diseno
nhooyr.io/websocketcomo primera opcion: mas moderna que gorilla/websocket, soportacontext.Contextnativamente (encaja con graceful shutdown de#0009), API mas simple. Si da problemas de compatibilidad, fallback a gorilla.- Hub como goroutine con canales: patron estandar de Go para gestionar estado compartido sin mutex. Un solo punto de escritura al mapa de clientes evita races.
- SSE sin dependencias externas: solo usa
net/httpstdlib +http.Flusherinterface. Mas simple que WebSocket y suficiente para dashboards y notificaciones unidireccionales. - Separar send de broadcast:
ws_send(un cliente) yws_broadcast(todos) son funciones distintas porque tienen patrones de uso y error handling diferentes. - Canal buffereado en WSClient.Send: evita que un cliente lento bloquee el broadcast a los demas. Si el canal se llena, el hub desconecta al cliente.
- SSEEvent.Retry opcional: el campo retry en SSE le dice al browser cuanto esperar antes de reconectar. Dejarlo en 0 usa el default del browser (~3 segundos).
- Validacion de origenes en ws_upgrader: proteccion basica contra cross-origin WebSocket hijacking. Para produccion se complementa con auth middleware de
#0009. - Todas impuras, sin excepciones: a diferencia de
#0009donde hay funciones puras (middleware chain, CORS config), aqui todo toca red o estado mutable. No forzar pureza artificial.
Riesgos
- Leak de goroutines: Cada cliente WS genera 2 goroutines (readPump + writePump). Si no se limpian bien al desconectar, se acumulan. Mitigado con el patron hub.Unregister + defer cleanup en cada pump.
- Clientes lentos saturan el hub: Un cliente que no consume su canal Send puede bloquear el broadcast. Mitigado con canal buffereado y desconexion forzada si el buffer se llena (write deadline).
- Dependencia externa para WebSocket:
nhooyr.io/websocketanade un import fuera de stdlib. Mitigado porque es una dependencia mantenida, sin subdependencias transitivas, y solo afecta a las funcionesws_*(no contamina el resto del paquete infra). - Compatibilidad con proxies/load balancers: WebSocket requiere que proxies soporten upgrade HTTP. SSE funciona sobre HTTP normal sin problema. Documentar en los
.mdque WS detras de nginx/caddy necesita config especifica (proxy_passcon upgrade headers). - Scope creep hacia un framework de real-time: Mitigado manteniendo funciones atomicas. El hub es un mapa de conexiones, no un pub/sub con rooms, channels, ni autenticacion. Esas abstracciones se componen encima si se necesitan.