docs: cerrar issue 0011
This commit is contained in:
@@ -0,0 +1,327 @@
|
||||
# 0011 — WebSocket & SSE Server
|
||||
|
||||
## Metadata
|
||||
|
||||
| Campo | Valor |
|
||||
|-------|-------|
|
||||
| **ID** | 0011 |
|
||||
| **Estado** | pendiente |
|
||||
| **Prioridad** | alta |
|
||||
| **Tipo** | feature |
|
||||
|
||||
## Dependencias
|
||||
|
||||
| ID | Titulo | Estado | Requerido |
|
||||
|----|--------|--------|-----------|
|
||||
| 0009 | HTTP Server Foundation | pendiente | si |
|
||||
|
||||
**Bloqueada por:** `#0009` — las funciones WebSocket y SSE son handlers HTTP que se montan sobre las primitivas de servidor (router, middleware chain, graceful shutdown).
|
||||
|
||||
**Desbloquea:** apps de dashboard en tiempo real, notificaciones push, pipelines con feedback live.
|
||||
|
||||
---
|
||||
|
||||
## Objetivo
|
||||
|
||||
Crear funciones reutilizables de WebSocket y Server-Sent Events en Go (dominio infra) que permitan anadir comunicacion bidireccional (WS) y unidireccional server-to-client (SSE) a cualquier app del registry, componiendo con las primitivas HTTP de `#0009`.
|
||||
|
||||
## Contexto
|
||||
|
||||
- Actualmente hay **CERO** funciones de WebSocket/SSE server en el registry. Las unicas conexiones WebSocket existentes son de **cliente**: `cdp_connect_go_browser` (conecta a Chrome DevTools), `stream_ticks_go_finance` (consume un stream de ticks), y las funciones de `jupyter_*_py_notebook` (hablan con kernels Jupyter).
|
||||
- `stream_ticks_go_finance` abre un WebSocket como cliente para recibir datos financieros — es especifico de un dominio, no una primitiva de server.
|
||||
- Patrones comunes que necesitan WS/SSE server: dashboards con datos en vivo, logs en streaming, notificaciones de estado de pipelines, chat entre agentes.
|
||||
- WebSocket es bidireccional (cliente y servidor envian mensajes). SSE es unidireccional server-to-client (mas simple, funciona sobre HTTP normal, reconexion automatica en el browser).
|
||||
- Go stdlib soporta SSE nativamente con `http.Flusher`. Para WebSocket se necesita una dependencia externa: `nhooyr.io/websocket` (moderna, context-aware) o `gorilla/websocket` (clasica, ampliamente usada).
|
||||
- Con estas funciones, una app nueva que necesite real-time solo hace: montar el `ws_handler` o `sse_handler` como una ruta mas del router de `#0009`.
|
||||
|
||||
## Arquitectura
|
||||
|
||||
```
|
||||
functions/infra/
|
||||
├── ws_upgrader.go — NEW: upgrade HTTP connection a WebSocket
|
||||
├── ws_upgrader.md — NEW
|
||||
├── ws_hub.go — NEW: hub de conexiones (register/unregister/broadcast)
|
||||
├── ws_hub.md — NEW
|
||||
├── ws_broadcast.go — NEW: enviar mensaje a todos los clientes conectados
|
||||
├── ws_broadcast.md — NEW
|
||||
├── ws_send.go — NEW: enviar mensaje a un cliente especifico
|
||||
├── ws_send.md — NEW
|
||||
├── ws_handler.go — NEW: HTTP handler que upgradea y gestiona una conexion WS
|
||||
├── ws_handler.md — NEW
|
||||
├── sse_handler.go — NEW: HTTP handler para stream SSE con flush
|
||||
├── sse_handler.md — NEW
|
||||
├── sse_send.go — NEW: enviar un evento SSE (event, data, id)
|
||||
├── sse_send.md — NEW
|
||||
├── sse_keepalive.go — NEW: enviar comentarios keepalive periodicos
|
||||
├── sse_keepalive.md — NEW
|
||||
|
||||
types/infra/
|
||||
├── ws_hub.md — NEW: metadata del tipo WSHub
|
||||
├── ws_client.md — NEW: metadata del tipo WSClient
|
||||
├── ws_message.md — NEW: metadata del tipo WSMessage
|
||||
├── sse_event.md — NEW: metadata del tipo SSEEvent
|
||||
```
|
||||
|
||||
### Patron pure core / impure shell
|
||||
|
||||
Todas las funciones de este issue son **impuras** — manejan conexiones de red, goroutines y estado mutable (el hub mantiene un mapa de clientes). No hay funciones puras en este issue porque la naturaleza del real-time es inherentemente I/O-bound.
|
||||
|
||||
El core puro vive en los tipos (structs sin metodos con side effects) y en la logica de serializado/parseado de mensajes que se delega a funciones existentes del registry (`json_marshal`, etc.).
|
||||
|
||||
## Diseno
|
||||
|
||||
### Tipos
|
||||
|
||||
```go
|
||||
// WSHub gestiona el ciclo de vida de conexiones WebSocket.
|
||||
// Mantiene un mapa de clientes activos y canales para registro,
|
||||
// desregistro y broadcast. Se ejecuta como goroutine via Run().
|
||||
type WSHub struct {
|
||||
Clients map[*WSClient]bool
|
||||
Broadcast chan []byte
|
||||
Register chan *WSClient
|
||||
Unregister chan *WSClient
|
||||
}
|
||||
|
||||
// WSClient representa una conexion WebSocket individual.
|
||||
// Cada cliente tiene su propio canal de envio buffereado
|
||||
// y una referencia al hub al que pertenece.
|
||||
type WSClient struct {
|
||||
Hub *WSHub
|
||||
Conn *websocket.Conn
|
||||
Send chan []byte
|
||||
ID string
|
||||
}
|
||||
|
||||
// WSMessage es un mensaje tipado que viaja por WebSocket.
|
||||
// El campo Type permite al receptor decidir como procesar el payload.
|
||||
type WSMessage struct {
|
||||
Type string `json:"type"`
|
||||
Payload []byte `json:"payload"`
|
||||
SenderID string `json:"sender_id"`
|
||||
Ts int64 `json:"ts"`
|
||||
}
|
||||
|
||||
// SSEEvent es un evento Server-Sent Events segun la spec W3C.
|
||||
// Campos opcionales: si Event esta vacio se envia solo data,
|
||||
// si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir).
|
||||
type SSEEvent struct {
|
||||
Event string `json:"event"`
|
||||
Data string `json:"data"`
|
||||
ID string `json:"id"`
|
||||
Retry int `json:"retry"`
|
||||
}
|
||||
```
|
||||
|
||||
### Funciones
|
||||
|
||||
| Funcion | Purity | Firma (simplificada) | Descripcion |
|
||||
|---------|--------|---------------------|-------------|
|
||||
| `ws_upgrader` | impure | `(w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error)` | Upgrade HTTP a WebSocket con validacion de origen |
|
||||
| `ws_hub` | impure | `() *WSHub` + `(hub *WSHub) Run()` | Crea hub y lo ejecuta como goroutine (loop select sobre canales) |
|
||||
| `ws_broadcast` | impure | `(hub *WSHub, msg []byte)` | Envia mensaje al canal Broadcast del hub |
|
||||
| `ws_send` | impure | `(client *WSClient, msg []byte) error` | Envia mensaje al canal Send de un cliente especifico |
|
||||
| `ws_handler` | impure | `(hub *WSHub, origins []string) http.HandlerFunc` | Retorna handler que upgradea la conexion, registra el cliente en el hub, y lanza read/write pumps |
|
||||
| `sse_handler` | impure | `(events <-chan SSEEvent) http.HandlerFunc` | Retorna handler que setea headers SSE, consume del canal y flushea cada evento |
|
||||
| `sse_send` | impure | `(w http.ResponseWriter, event SSEEvent) error` | Escribe un evento SSE formateado al writer y hace flush |
|
||||
| `sse_keepalive` | impure | `(w http.ResponseWriter, interval time.Duration, done <-chan struct{})` | Goroutine que envia `: keepalive\n\n` periodicamente hasta que done se cierre |
|
||||
|
||||
### Protocolo WebSocket
|
||||
|
||||
El hub sigue el patron clasico de Go concurrency:
|
||||
|
||||
```
|
||||
┌──────────┐
|
||||
HTTP request ──→ │ws_handler│ ──→ ws_upgrader ──→ *websocket.Conn
|
||||
└────┬─────┘
|
||||
│
|
||||
┌─────▼─────┐
|
||||
│ WSClient │
|
||||
│ .Send ch │
|
||||
└─────┬─────┘
|
||||
│ Register
|
||||
┌─────▼─────┐
|
||||
│ WSHub │ ←── ws_broadcast
|
||||
│ .Run() │
|
||||
│ loop { │
|
||||
│ select │
|
||||
│ } │
|
||||
└───────────┘
|
||||
│ Broadcast
|
||||
┌─────▼─────┐
|
||||
│ client.Send│ ──→ writePump ──→ conn.WriteMessage
|
||||
└───────────┘
|
||||
```
|
||||
|
||||
Cada cliente tiene dos goroutines internas:
|
||||
- **readPump**: lee mensajes del conn y los envia al hub Broadcast (o los procesa con un callback)
|
||||
- **writePump**: consume del canal Send y escribe al conn
|
||||
|
||||
### Protocolo SSE
|
||||
|
||||
```
|
||||
HTTP/1.1 200 OK
|
||||
Content-Type: text/event-stream
|
||||
Cache-Control: no-cache
|
||||
Connection: keep-alive
|
||||
|
||||
: keepalive
|
||||
|
||||
event: price_update
|
||||
id: 42
|
||||
data: {"symbol":"BTC","price":67000}
|
||||
|
||||
data: simple message without event type
|
||||
|
||||
: keepalive
|
||||
```
|
||||
|
||||
`sse_handler` detecta si el `ResponseWriter` implementa `http.Flusher` y hace flush despues de cada evento. Si el cliente se desconecta (context cancelado), el handler retorna limpiamente.
|
||||
|
||||
## Tareas
|
||||
|
||||
### Fase 1: Tipos
|
||||
|
||||
- [ ] **1.1** Crear tipo `WSHub` en `functions/infra/ws_hub.go` con `.md` en `types/infra/ws_hub.md`
|
||||
- [ ] **1.2** Crear tipo `WSClient` en `functions/infra/ws_client.go` con `.md` en `types/infra/ws_client.md`
|
||||
- [ ] **1.3** Crear tipo `WSMessage` en `functions/infra/ws_message.go` con `.md` en `types/infra/ws_message.md`
|
||||
- [ ] **1.4** Crear tipo `SSEEvent` en `functions/infra/sse_event.go` con `.md` en `types/infra/sse_event.md`
|
||||
|
||||
### Fase 2: SSE (mas simple, sin dependencia externa)
|
||||
|
||||
- [ ] **2.1** `sse_send` — formatea y escribe un SSEEvent al writer, hace flush
|
||||
- [ ] **2.2** `sse_keepalive` — goroutine que envia comentarios keepalive periodicamente
|
||||
- [ ] **2.3** `sse_handler` — HTTP handler completo: setea headers, consume canal de eventos, flush, detecta desconexion
|
||||
|
||||
### Fase 3: WebSocket
|
||||
|
||||
- [ ] **3.1** Elegir dependencia: `nhooyr.io/websocket` (preferida por soporte nativo de context) o `gorilla/websocket` (mas madura). Anadir a `go.mod`.
|
||||
- [ ] **3.2** `ws_upgrader` — upgrade HTTP a WebSocket con validacion de origenes permitidos
|
||||
- [ ] **3.3** `ws_hub` — constructor + metodo Run() con loop select sobre Register/Unregister/Broadcast
|
||||
- [ ] **3.4** `ws_send` — envia bytes al canal Send de un cliente
|
||||
- [ ] **3.5** `ws_broadcast` — envia bytes al canal Broadcast del hub
|
||||
- [ ] **3.6** `ws_handler` — handler HTTP que upgradea, crea WSClient, registra en hub, lanza readPump/writePump
|
||||
|
||||
### Fase 4: Tests
|
||||
|
||||
- [ ] **4.1** Tests de SSE con `httptest.NewRecorder` y pipe para simular flush
|
||||
- [ ] **4.2** Tests de WebSocket con `httptest.NewServer` y cliente WS de test
|
||||
- [ ] **4.3** Test de integracion: hub con multiples clientes, broadcast, desconexion
|
||||
- [ ] **4.4** `fn index` y verificar que todas las funciones y tipos aparecen en registry.db
|
||||
- [ ] **4.5** `go vet -tags fts5` limpio
|
||||
|
||||
---
|
||||
|
||||
## Ejemplo de uso
|
||||
|
||||
### Chat-like: broadcast de mensajes entre clientes
|
||||
|
||||
```go
|
||||
// Montar WebSocket en una app con las primitivas de #0009
|
||||
hub := infra.NewWSHub()
|
||||
go hub.Run()
|
||||
|
||||
routes := []infra.Route{
|
||||
{Method: "GET", Path: "/health", Handler: healthHandler},
|
||||
{Method: "GET", Path: "/api/data", Handler: dataHandler},
|
||||
{Method: "GET", Path: "/ws", Handler: infra.WsHandler(hub, []string{"*"})},
|
||||
}
|
||||
|
||||
mux := infra.HttpRouter(routes)
|
||||
middleware := infra.HttpMiddlewareChain(
|
||||
infra.HttpCorsMiddleware([]string{"*"}, []string{"GET", "POST"}),
|
||||
infra.HttpLoggerMiddleware(os.Stdout),
|
||||
)
|
||||
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer cancel()
|
||||
infra.HttpServe(":8080", middleware(mux), ctx)
|
||||
```
|
||||
|
||||
```javascript
|
||||
// Cliente browser
|
||||
const ws = new WebSocket("ws://localhost:8080/ws");
|
||||
ws.onmessage = (e) => {
|
||||
const msg = JSON.parse(e.data);
|
||||
console.log(`[${msg.sender_id}] ${msg.type}: ${new TextDecoder().decode(msg.payload)}`);
|
||||
};
|
||||
ws.send(JSON.stringify({type: "chat", payload: btoa("hola"), sender_id: "user1"}));
|
||||
```
|
||||
|
||||
### Dashboard live updates via SSE
|
||||
|
||||
```go
|
||||
// Canal de eventos que se alimenta desde cualquier goroutine
|
||||
events := make(chan infra.SSEEvent, 100)
|
||||
|
||||
// Goroutine que genera eventos (ej: watch a operations.db)
|
||||
go func() {
|
||||
ticker := time.NewTicker(2 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for i := 0; ; i++ {
|
||||
<-ticker.C
|
||||
events <- infra.SSEEvent{
|
||||
Event: "metrics_update",
|
||||
ID: fmt.Sprintf("%d", i),
|
||||
Data: fmt.Sprintf(`{"cpu": %.1f, "mem": %.1f}`, rand.Float64()*100, rand.Float64()*100),
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
routes := []infra.Route{
|
||||
{Method: "GET", Path: "/health", Handler: healthHandler},
|
||||
{Method: "GET", Path: "/events", Handler: infra.SseHandler(events)},
|
||||
{Method: "GET", Path: "/api/snapshot", Handler: snapshotHandler},
|
||||
}
|
||||
|
||||
mux := infra.HttpRouter(routes)
|
||||
infra.HttpServe(":8080", mux, ctx)
|
||||
```
|
||||
|
||||
```javascript
|
||||
// Cliente browser — reconexion automatica gratis con EventSource
|
||||
const es = new EventSource("http://localhost:8080/events");
|
||||
es.addEventListener("metrics_update", (e) => {
|
||||
const data = JSON.parse(e.data);
|
||||
updateDashboard(data.cpu, data.mem);
|
||||
});
|
||||
es.onerror = () => console.log("reconectando...");
|
||||
```
|
||||
|
||||
### Notificaciones de estado de pipelines
|
||||
|
||||
```go
|
||||
// En una app que ejecuta pipelines del registry:
|
||||
hub := infra.NewWSHub()
|
||||
go hub.Run()
|
||||
|
||||
// Cada vez que un step termina, broadcast a todos los clientes
|
||||
func onStepComplete(step string, status string, durationMs int) {
|
||||
msg, _ := json.Marshal(infra.WSMessage{
|
||||
Type: "step_complete",
|
||||
Payload: []byte(fmt.Sprintf(`{"step":%q,"status":%q,"ms":%d}`, step, status, durationMs)),
|
||||
SenderID: "pipeline_runner",
|
||||
Ts: time.Now().UnixMilli(),
|
||||
})
|
||||
infra.WsBroadcast(hub, msg)
|
||||
}
|
||||
```
|
||||
|
||||
## Decisiones de diseno
|
||||
|
||||
- **`nhooyr.io/websocket` como primera opcion:** mas moderna que gorilla/websocket, soporta `context.Context` nativamente (encaja con graceful shutdown de `#0009`), API mas simple. Si da problemas de compatibilidad, fallback a gorilla.
|
||||
- **Hub como goroutine con canales:** patron estandar de Go para gestionar estado compartido sin mutex. Un solo punto de escritura al mapa de clientes evita races.
|
||||
- **SSE sin dependencias externas:** solo usa `net/http` stdlib + `http.Flusher` interface. Mas simple que WebSocket y suficiente para dashboards y notificaciones unidireccionales.
|
||||
- **Separar send de broadcast:** `ws_send` (un cliente) y `ws_broadcast` (todos) son funciones distintas porque tienen patrones de uso y error handling diferentes.
|
||||
- **Canal buffereado en WSClient.Send:** evita que un cliente lento bloquee el broadcast a los demas. Si el canal se llena, el hub desconecta al cliente.
|
||||
- **SSEEvent.Retry opcional:** el campo retry en SSE le dice al browser cuanto esperar antes de reconectar. Dejarlo en 0 usa el default del browser (~3 segundos).
|
||||
- **Validacion de origenes en ws_upgrader:** proteccion basica contra cross-origin WebSocket hijacking. Para produccion se complementa con auth middleware de `#0009`.
|
||||
- **Todas impuras, sin excepciones:** a diferencia de `#0009` donde hay funciones puras (middleware chain, CORS config), aqui todo toca red o estado mutable. No forzar pureza artificial.
|
||||
|
||||
## Riesgos
|
||||
|
||||
- **Leak de goroutines:** Cada cliente WS genera 2 goroutines (readPump + writePump). Si no se limpian bien al desconectar, se acumulan. Mitigado con el patron hub.Unregister + defer cleanup en cada pump.
|
||||
- **Clientes lentos saturan el hub:** Un cliente que no consume su canal Send puede bloquear el broadcast. Mitigado con canal buffereado y desconexion forzada si el buffer se llena (write deadline).
|
||||
- **Dependencia externa para WebSocket:** `nhooyr.io/websocket` anade un import fuera de stdlib. Mitigado porque es una dependencia mantenida, sin subdependencias transitivas, y solo afecta a las funciones `ws_*` (no contamina el resto del paquete infra).
|
||||
- **Compatibilidad con proxies/load balancers:** WebSocket requiere que proxies soporten upgrade HTTP. SSE funciona sobre HTTP normal sin problema. Documentar en los `.md` que WS detras de nginx/caddy necesita config especifica (`proxy_pass` con upgrade headers).
|
||||
- **Scope creep hacia un framework de real-time:** Mitigado manteniendo funciones atomicas. El hub es un mapa de conexiones, no un pub/sub con rooms, channels, ni autenticacion. Esas abstracciones se componen encima si se necesitan.
|
||||
Reference in New Issue
Block a user