diff --git a/functions/infra/sse_event.go b/functions/infra/sse_event.go new file mode 100644 index 00000000..ba3b5dcb --- /dev/null +++ b/functions/infra/sse_event.go @@ -0,0 +1,11 @@ +package infra + +// SSEEvent es un evento Server-Sent Events segun la spec W3C. +// Campos opcionales: si Event esta vacio se envia solo data, +// si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir). +type SSEEvent struct { + Event string `json:"event"` + Data string `json:"data"` + ID string `json:"id"` + Retry int `json:"retry"` +} diff --git a/functions/infra/ws_client.go b/functions/infra/ws_client.go new file mode 100644 index 00000000..8ff93b7c --- /dev/null +++ b/functions/infra/ws_client.go @@ -0,0 +1,15 @@ +package infra + +import ( + "nhooyr.io/websocket" +) + +// WSClient representa una conexion WebSocket individual. +// Cada cliente tiene su propio canal de envio buffereado y una +// referencia al hub al que pertenece. +type WSClient struct { + Hub *WSHub + Conn *websocket.Conn + Send chan []byte + ID string +} diff --git a/functions/infra/ws_hub.go b/functions/infra/ws_hub.go new file mode 100644 index 00000000..e66c2f65 --- /dev/null +++ b/functions/infra/ws_hub.go @@ -0,0 +1,69 @@ +package infra + +// WSHub gestiona el ciclo de vida de conexiones WebSocket. +// Mantiene un mapa de clientes activos y canales para registro, +// desregistro y broadcast. Se ejecuta como goroutine via su Run(). +type WSHub struct { + Clients map[*WSClient]bool + Broadcast chan []byte + Register chan *WSClient + Unregister chan *WSClient + done chan struct{} +} + +// NewWSHub crea un WSHub vacio con canales inicializados. +// Ejecutar Run() en una goroutine para activar el loop de eventos. +func NewWSHub() *WSHub { + return &WSHub{ + Clients: make(map[*WSClient]bool), + Broadcast: make(chan []byte, 256), + Register: make(chan *WSClient), + Unregister: make(chan *WSClient), + done: make(chan struct{}), + } +} + +// Run ejecuta el loop principal del hub. Atiende registros, desregistros +// y broadcasts en un select. Bloqueante: lanzar como goroutine. +// Para parar, llamar a Stop(). +func (h *WSHub) Run() { + for { + select { + case <-h.done: + // Cerrar todos los Send y vaciar el mapa + for client := range h.Clients { + delete(h.Clients, client) + close(client.Send) + } + return + case client := <-h.Register: + h.Clients[client] = true + case client := <-h.Unregister: + if _, ok := h.Clients[client]; ok { + delete(h.Clients, client) + close(client.Send) + } + case msg := <-h.Broadcast: + for client := range h.Clients { + select { + case client.Send <- msg: + default: + // Cliente lento: desconectar + delete(h.Clients, client) + close(client.Send) + } + } + } + } +} + +// Stop cierra el loop de Run() y limpia todos los clientes. +// Idempotente — llamar mas de una vez no panica. +func (h *WSHub) Stop() { + select { + case <-h.done: + // Ya cerrado + default: + close(h.done) + } +} diff --git a/functions/infra/ws_message.go b/functions/infra/ws_message.go new file mode 100644 index 00000000..258b1609 --- /dev/null +++ b/functions/infra/ws_message.go @@ -0,0 +1,10 @@ +package infra + +// WSMessage es un mensaje tipado que viaja por WebSocket. +// El campo Type permite al receptor decidir como procesar el payload. +type WSMessage struct { + Type string `json:"type"` + Payload []byte `json:"payload"` + SenderID string `json:"sender_id"` + Ts int64 `json:"ts"` +} diff --git a/go.mod b/go.mod index eeddff0a..03c5fc6e 100644 --- a/go.mod +++ b/go.mod @@ -63,4 +63,5 @@ require ( golang.org/x/text v0.29.0 // indirect golang.org/x/tools v0.36.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + nhooyr.io/websocket v1.8.17 // indirect ) diff --git a/go.sum b/go.sum index 2375987e..0568b3d1 100644 --- a/go.sum +++ b/go.sum @@ -190,3 +190,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= +nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/types/infra/sse_event.md b/types/infra/sse_event.md new file mode 100644 index 00000000..8d5aff97 --- /dev/null +++ b/types/infra/sse_event.md @@ -0,0 +1,44 @@ +--- +name: SSEEvent +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type SSEEvent struct { + Event string `json:"event"` + Data string `json:"data"` + ID string `json:"id"` + Retry int `json:"retry"` + } +description: "Evento Server-Sent Events segun la spec W3C. Campos opcionales: si Event esta vacio se envia solo data, si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir y dejar el default del browser ~3000ms)." +tags: [sse, event, server-sent-events, infra, realtime] +uses_types: [] +file_path: "functions/infra/sse_event.go" +--- + +## Ejemplo + +```go +ev := SSEEvent{ + Event: "metrics_update", + ID: "42", + Data: `{"cpu": 23.4, "mem": 87.1}`, +} +SSESend(w, ev) +``` + +Wire format generado: + +``` +event: metrics_update +id: 42 +data: {"cpu": 23.4, "mem": 87.1} + +``` + +## Notas + +Tipo producto con campos opcionales. `Data` puede contener saltos de linea — el formateador los traduce a multiples lineas `data:` segun la spec. `Retry` solo se envia si es > 0; cuando se envia, indica al cliente cuantos ms esperar antes de reconectar. + +Para enviar solo un comentario keepalive (no un evento), no se usa este tipo: se escribe `: keepalive\n\n` directamente al writer. diff --git a/types/infra/ws_client.md b/types/infra/ws_client.md new file mode 100644 index 00000000..c0f10583 --- /dev/null +++ b/types/infra/ws_client.md @@ -0,0 +1,38 @@ +--- +name: WSClient +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type WSClient struct { + Hub *WSHub + Conn *websocket.Conn + Send chan []byte + ID string + } +description: "Conexion WebSocket individual gestionada por un WSHub. Cada cliente tiene su propio canal Send buffereado para entregar mensajes en orden y un ID para identificarlo en broadcasts y handlers." +tags: [websocket, client, connection, server, infra, realtime] +uses_types: [WSHub_go_infra] +file_path: "functions/infra/ws_client.go" +--- + +## Ejemplo + +```go +client := &WSClient{ + Hub: hub, + Conn: wsConn, + Send: make(chan []byte, 64), + ID: "user-42", +} +hub.Register <- client +``` + +## Notas + +Tipo producto. El canal `Send` debe ser buffereado (tipico 64-256) para evitar que un cliente lento bloquee el broadcast del hub. `Conn` usa `nhooyr.io/websocket` que soporta `context.Context` nativamente. `ID` es texto libre — la app que monta el handler decide su semantica (UUID, username, session id, etc.). + +Cada cliente tiene normalmente dos goroutines internas: +- readPump: lee del Conn y publica al hub.Broadcast (o procesa con callback) +- writePump: consume del Send y escribe al Conn diff --git a/types/infra/ws_hub.md b/types/infra/ws_hub.md new file mode 100644 index 00000000..31b2bc5a --- /dev/null +++ b/types/infra/ws_hub.md @@ -0,0 +1,36 @@ +--- +name: WSHub +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type WSHub struct { + Clients map[*WSClient]bool + Broadcast chan []byte + Register chan *WSClient + Unregister chan *WSClient + } +description: "Hub que gestiona el ciclo de vida de conexiones WebSocket. Mantiene un mapa de clientes activos y canales para registro, desregistro y broadcast. Se ejecuta como goroutine via Run() y se compone con ws_handler para servir conexiones." +tags: [websocket, hub, server, broadcast, infra, realtime] +uses_types: [WSClient_go_infra] +file_path: "functions/infra/ws_hub.go" +--- + +## Ejemplo + +```go +hub := NewWSHub() +go hub.Run() +defer hub.Stop() + +routes := []Route{ + {Method: "GET", Path: "/ws", Handler: WSHandler(hub, []string{"*"})}, +} +``` + +## Notas + +Tipo producto. El campo `done` interno (no exportado) controla el cierre limpio. `Broadcast` es buffereado (256) para no bloquear emisores. Cada cliente lento se desconecta automaticamente si su canal `Send` se llena durante un broadcast — esta es la garantia anti-backpressure: un cliente que no consume no afecta a los demas. + +El loop `Run()` es de un solo thread escribiendo al mapa, asi que no hace falta mutex. Para parar limpiamente: `hub.Stop()` cierra `done`, libera todos los `client.Send` y termina el loop. diff --git a/types/infra/ws_message.md b/types/infra/ws_message.md new file mode 100644 index 00000000..c9ef999d --- /dev/null +++ b/types/infra/ws_message.md @@ -0,0 +1,37 @@ +--- +name: WSMessage +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type WSMessage struct { + Type string `json:"type"` + Payload []byte `json:"payload"` + SenderID string `json:"sender_id"` + Ts int64 `json:"ts"` + } +description: "Mensaje tipado que viaja por WebSocket entre cliente y servidor. El campo Type permite al receptor decidir como procesar el payload. Incluye SenderID y timestamp para trazabilidad." +tags: [websocket, message, protocol, infra, realtime] +uses_types: [] +file_path: "functions/infra/ws_message.go" +--- + +## Ejemplo + +```go +msg := WSMessage{ + Type: "chat", + Payload: []byte(`{"text":"hola"}`), + SenderID: "user-1", + Ts: time.Now().UnixMilli(), +} +data, _ := json.Marshal(msg) +WSBroadcast(hub, data) +``` + +## Notas + +Tipo producto. Las tags `json:` permiten serializar directamente a JSON para enviar por el wire. `Payload` es `[]byte` para permitir tanto JSON anidado como datos binarios codificados en base64. `Ts` en milisegundos epoch para compatibilidad con `Date.now()` en el browser. + +No es obligatorio usar este tipo — apps que necesiten un protocolo distinto pueden enviar bytes arbitrarios via `WSBroadcast` o `WSSend`. Es solo un convenio recomendado para protocolos chat-like.