--- id: "0011" title: "WebSocket & SSE Server" status: completado type: feature domain: [] scope: multi-app priority: alta depends: [] blocks: [] related: [] created: 2026-05-17 updated: 2026-05-17 tags: [] --- # 0011 — WebSocket & SSE Server ## Metadata | Campo | Valor | |-------|-------| | **ID** | 0011 | | **Estado** | pendiente | | **Prioridad** | alta | | **Tipo** | feature | ## Dependencias | ID | Titulo | Estado | Requerido | |----|--------|--------|-----------| | 0009 | HTTP Server Foundation | pendiente | si | **Bloqueada por:** `#0009` — las funciones WebSocket y SSE son handlers HTTP que se montan sobre las primitivas de servidor (router, middleware chain, graceful shutdown). **Desbloquea:** apps de dashboard en tiempo real, notificaciones push, pipelines con feedback live. --- ## Objetivo Crear funciones reutilizables de WebSocket y Server-Sent Events en Go (dominio infra) que permitan anadir comunicacion bidireccional (WS) y unidireccional server-to-client (SSE) a cualquier app del registry, componiendo con las primitivas HTTP de `#0009`. ## Contexto - Actualmente hay **CERO** funciones de WebSocket/SSE server en el registry. Las unicas conexiones WebSocket existentes son de **cliente**: `cdp_connect_go_browser` (conecta a Chrome DevTools), `stream_ticks_go_finance` (consume un stream de ticks), y las funciones de `jupyter_*_py_notebook` (hablan con kernels Jupyter). - `stream_ticks_go_finance` abre un WebSocket como cliente para recibir datos financieros — es especifico de un dominio, no una primitiva de server. - Patrones comunes que necesitan WS/SSE server: dashboards con datos en vivo, logs en streaming, notificaciones de estado de pipelines, chat entre agentes. - WebSocket es bidireccional (cliente y servidor envian mensajes). SSE es unidireccional server-to-client (mas simple, funciona sobre HTTP normal, reconexion automatica en el browser). - Go stdlib soporta SSE nativamente con `http.Flusher`. Para WebSocket se necesita una dependencia externa: `nhooyr.io/websocket` (moderna, context-aware) o `gorilla/websocket` (clasica, ampliamente usada). - Con estas funciones, una app nueva que necesite real-time solo hace: montar el `ws_handler` o `sse_handler` como una ruta mas del router de `#0009`. ## Arquitectura ``` functions/infra/ ├── ws_upgrader.go — NEW: upgrade HTTP connection a WebSocket ├── ws_upgrader.md — NEW ├── ws_hub.go — NEW: hub de conexiones (register/unregister/broadcast) ├── ws_hub.md — NEW ├── ws_broadcast.go — NEW: enviar mensaje a todos los clientes conectados ├── ws_broadcast.md — NEW ├── ws_send.go — NEW: enviar mensaje a un cliente especifico ├── ws_send.md — NEW ├── ws_handler.go — NEW: HTTP handler que upgradea y gestiona una conexion WS ├── ws_handler.md — NEW ├── sse_handler.go — NEW: HTTP handler para stream SSE con flush ├── sse_handler.md — NEW ├── sse_send.go — NEW: enviar un evento SSE (event, data, id) ├── sse_send.md — NEW ├── sse_keepalive.go — NEW: enviar comentarios keepalive periodicos ├── sse_keepalive.md — NEW types/infra/ ├── ws_hub.md — NEW: metadata del tipo WSHub ├── ws_client.md — NEW: metadata del tipo WSClient ├── ws_message.md — NEW: metadata del tipo WSMessage ├── sse_event.md — NEW: metadata del tipo SSEEvent ``` ### Patron pure core / impure shell Todas las funciones de este issue son **impuras** — manejan conexiones de red, goroutines y estado mutable (el hub mantiene un mapa de clientes). No hay funciones puras en este issue porque la naturaleza del real-time es inherentemente I/O-bound. El core puro vive en los tipos (structs sin metodos con side effects) y en la logica de serializado/parseado de mensajes que se delega a funciones existentes del registry (`json_marshal`, etc.). ## Diseno ### Tipos ```go // WSHub gestiona el ciclo de vida de conexiones WebSocket. // Mantiene un mapa de clientes activos y canales para registro, // desregistro y broadcast. Se ejecuta como goroutine via Run(). type WSHub struct { Clients map[*WSClient]bool Broadcast chan []byte Register chan *WSClient Unregister chan *WSClient } // WSClient representa una conexion WebSocket individual. // Cada cliente tiene su propio canal de envio buffereado // y una referencia al hub al que pertenece. type WSClient struct { Hub *WSHub Conn *websocket.Conn Send chan []byte ID string } // WSMessage es un mensaje tipado que viaja por WebSocket. // El campo Type permite al receptor decidir como procesar el payload. type WSMessage struct { Type string `json:"type"` Payload []byte `json:"payload"` SenderID string `json:"sender_id"` Ts int64 `json:"ts"` } // SSEEvent es un evento Server-Sent Events segun la spec W3C. // Campos opcionales: si Event esta vacio se envia solo data, // si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir). type SSEEvent struct { Event string `json:"event"` Data string `json:"data"` ID string `json:"id"` Retry int `json:"retry"` } ``` ### Funciones | Funcion | Purity | Firma (simplificada) | Descripcion | |---------|--------|---------------------|-------------| | `ws_upgrader` | impure | `(w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error)` | Upgrade HTTP a WebSocket con validacion de origen | | `ws_hub` | impure | `() *WSHub` + `(hub *WSHub) Run()` | Crea hub y lo ejecuta como goroutine (loop select sobre canales) | | `ws_broadcast` | impure | `(hub *WSHub, msg []byte)` | Envia mensaje al canal Broadcast del hub | | `ws_send` | impure | `(client *WSClient, msg []byte) error` | Envia mensaje al canal Send de un cliente especifico | | `ws_handler` | impure | `(hub *WSHub, origins []string) http.HandlerFunc` | Retorna handler que upgradea la conexion, registra el cliente en el hub, y lanza read/write pumps | | `sse_handler` | impure | `(events <-chan SSEEvent) http.HandlerFunc` | Retorna handler que setea headers SSE, consume del canal y flushea cada evento | | `sse_send` | impure | `(w http.ResponseWriter, event SSEEvent) error` | Escribe un evento SSE formateado al writer y hace flush | | `sse_keepalive` | impure | `(w http.ResponseWriter, interval time.Duration, done <-chan struct{})` | Goroutine que envia `: keepalive\n\n` periodicamente hasta que done se cierre | ### Protocolo WebSocket El hub sigue el patron clasico de Go concurrency: ``` ┌──────────┐ HTTP request ──→ │ws_handler│ ──→ ws_upgrader ──→ *websocket.Conn └────┬─────┘ │ ┌─────▼─────┐ │ WSClient │ │ .Send ch │ └─────┬─────┘ │ Register ┌─────▼─────┐ │ WSHub │ ←── ws_broadcast │ .Run() │ │ loop { │ │ select │ │ } │ └───────────┘ │ Broadcast ┌─────▼─────┐ │ client.Send│ ──→ writePump ──→ conn.WriteMessage └───────────┘ ``` Cada cliente tiene dos goroutines internas: - **readPump**: lee mensajes del conn y los envia al hub Broadcast (o los procesa con un callback) - **writePump**: consume del canal Send y escribe al conn ### Protocolo SSE ``` HTTP/1.1 200 OK Content-Type: text/event-stream Cache-Control: no-cache Connection: keep-alive : keepalive event: price_update id: 42 data: {"symbol":"BTC","price":67000} data: simple message without event type : keepalive ``` `sse_handler` detecta si el `ResponseWriter` implementa `http.Flusher` y hace flush despues de cada evento. Si el cliente se desconecta (context cancelado), el handler retorna limpiamente. ## Tareas ### Fase 1: Tipos - [ ] **1.1** Crear tipo `WSHub` en `functions/infra/ws_hub.go` con `.md` en `types/infra/ws_hub.md` - [ ] **1.2** Crear tipo `WSClient` en `functions/infra/ws_client.go` con `.md` en `types/infra/ws_client.md` - [ ] **1.3** Crear tipo `WSMessage` en `functions/infra/ws_message.go` con `.md` en `types/infra/ws_message.md` - [ ] **1.4** Crear tipo `SSEEvent` en `functions/infra/sse_event.go` con `.md` en `types/infra/sse_event.md` ### Fase 2: SSE (mas simple, sin dependencia externa) - [ ] **2.1** `sse_send` — formatea y escribe un SSEEvent al writer, hace flush - [ ] **2.2** `sse_keepalive` — goroutine que envia comentarios keepalive periodicamente - [ ] **2.3** `sse_handler` — HTTP handler completo: setea headers, consume canal de eventos, flush, detecta desconexion ### Fase 3: WebSocket - [ ] **3.1** Elegir dependencia: `nhooyr.io/websocket` (preferida por soporte nativo de context) o `gorilla/websocket` (mas madura). Anadir a `go.mod`. - [ ] **3.2** `ws_upgrader` — upgrade HTTP a WebSocket con validacion de origenes permitidos - [ ] **3.3** `ws_hub` — constructor + metodo Run() con loop select sobre Register/Unregister/Broadcast - [ ] **3.4** `ws_send` — envia bytes al canal Send de un cliente - [ ] **3.5** `ws_broadcast` — envia bytes al canal Broadcast del hub - [ ] **3.6** `ws_handler` — handler HTTP que upgradea, crea WSClient, registra en hub, lanza readPump/writePump ### Fase 4: Tests - [ ] **4.1** Tests de SSE con `httptest.NewRecorder` y pipe para simular flush - [ ] **4.2** Tests de WebSocket con `httptest.NewServer` y cliente WS de test - [ ] **4.3** Test de integracion: hub con multiples clientes, broadcast, desconexion - [ ] **4.4** `fn index` y verificar que todas las funciones y tipos aparecen en registry.db - [ ] **4.5** `go vet -tags fts5` limpio --- ## Ejemplo de uso ### Chat-like: broadcast de mensajes entre clientes ```go // Montar WebSocket en una app con las primitivas de #0009 hub := infra.NewWSHub() go hub.Run() routes := []infra.Route{ {Method: "GET", Path: "/health", Handler: healthHandler}, {Method: "GET", Path: "/api/data", Handler: dataHandler}, {Method: "GET", Path: "/ws", Handler: infra.WsHandler(hub, []string{"*"})}, } mux := infra.HttpRouter(routes) middleware := infra.HttpMiddlewareChain( infra.HttpCorsMiddleware([]string{"*"}, []string{"GET", "POST"}), infra.HttpLoggerMiddleware(os.Stdout), ) ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() infra.HttpServe(":8080", middleware(mux), ctx) ``` ```javascript // Cliente browser const ws = new WebSocket("ws://localhost:8080/ws"); ws.onmessage = (e) => { const msg = JSON.parse(e.data); console.log(`[${msg.sender_id}] ${msg.type}: ${new TextDecoder().decode(msg.payload)}`); }; ws.send(JSON.stringify({type: "chat", payload: btoa("hola"), sender_id: "user1"})); ``` ### Dashboard live updates via SSE ```go // Canal de eventos que se alimenta desde cualquier goroutine events := make(chan infra.SSEEvent, 100) // Goroutine que genera eventos (ej: watch a operations.db) go func() { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for i := 0; ; i++ { <-ticker.C events <- infra.SSEEvent{ Event: "metrics_update", ID: fmt.Sprintf("%d", i), Data: fmt.Sprintf(`{"cpu": %.1f, "mem": %.1f}`, rand.Float64()*100, rand.Float64()*100), } } }() routes := []infra.Route{ {Method: "GET", Path: "/health", Handler: healthHandler}, {Method: "GET", Path: "/events", Handler: infra.SseHandler(events)}, {Method: "GET", Path: "/api/snapshot", Handler: snapshotHandler}, } mux := infra.HttpRouter(routes) infra.HttpServe(":8080", mux, ctx) ``` ```javascript // Cliente browser — reconexion automatica gratis con EventSource const es = new EventSource("http://localhost:8080/events"); es.addEventListener("metrics_update", (e) => { const data = JSON.parse(e.data); updateDashboard(data.cpu, data.mem); }); es.onerror = () => console.log("reconectando..."); ``` ### Notificaciones de estado de pipelines ```go // En una app que ejecuta pipelines del registry: hub := infra.NewWSHub() go hub.Run() // Cada vez que un step termina, broadcast a todos los clientes func onStepComplete(step string, status string, durationMs int) { msg, _ := json.Marshal(infra.WSMessage{ Type: "step_complete", Payload: []byte(fmt.Sprintf(`{"step":%q,"status":%q,"ms":%d}`, step, status, durationMs)), SenderID: "pipeline_runner", Ts: time.Now().UnixMilli(), }) infra.WsBroadcast(hub, msg) } ``` ## Decisiones de diseno - **`nhooyr.io/websocket` como primera opcion:** mas moderna que gorilla/websocket, soporta `context.Context` nativamente (encaja con graceful shutdown de `#0009`), API mas simple. Si da problemas de compatibilidad, fallback a gorilla. - **Hub como goroutine con canales:** patron estandar de Go para gestionar estado compartido sin mutex. Un solo punto de escritura al mapa de clientes evita races. - **SSE sin dependencias externas:** solo usa `net/http` stdlib + `http.Flusher` interface. Mas simple que WebSocket y suficiente para dashboards y notificaciones unidireccionales. - **Separar send de broadcast:** `ws_send` (un cliente) y `ws_broadcast` (todos) son funciones distintas porque tienen patrones de uso y error handling diferentes. - **Canal buffereado en WSClient.Send:** evita que un cliente lento bloquee el broadcast a los demas. Si el canal se llena, el hub desconecta al cliente. - **SSEEvent.Retry opcional:** el campo retry en SSE le dice al browser cuanto esperar antes de reconectar. Dejarlo en 0 usa el default del browser (~3 segundos). - **Validacion de origenes en ws_upgrader:** proteccion basica contra cross-origin WebSocket hijacking. Para produccion se complementa con auth middleware de `#0009`. - **Todas impuras, sin excepciones:** a diferencia de `#0009` donde hay funciones puras (middleware chain, CORS config), aqui todo toca red o estado mutable. No forzar pureza artificial. ## Riesgos - **Leak de goroutines:** Cada cliente WS genera 2 goroutines (readPump + writePump). Si no se limpian bien al desconectar, se acumulan. Mitigado con el patron hub.Unregister + defer cleanup en cada pump. - **Clientes lentos saturan el hub:** Un cliente que no consume su canal Send puede bloquear el broadcast. Mitigado con canal buffereado y desconexion forzada si el buffer se llena (write deadline). - **Dependencia externa para WebSocket:** `nhooyr.io/websocket` anade un import fuera de stdlib. Mitigado porque es una dependencia mantenida, sin subdependencias transitivas, y solo afecta a las funciones `ws_*` (no contamina el resto del paquete infra). - **Compatibilidad con proxies/load balancers:** WebSocket requiere que proxies soporten upgrade HTTP. SSE funciona sobre HTTP normal sin problema. Documentar en los `.md` que WS detras de nginx/caddy necesita config especifica (`proxy_pass` con upgrade headers). - **Scope creep hacia un framework de real-time:** Mitigado manteniendo funciones atomicas. El hub es un mapa de conexiones, no un pub/sub con rooms, channels, ni autenticacion. Esas abstracciones se componen encima si se necesitan.