feat: tipos WSHub, WSClient, WSMessage, SSEEvent (issue 0011 fase 1)
This commit is contained in:
@@ -0,0 +1,11 @@
|
|||||||
|
package infra
|
||||||
|
|
||||||
|
// SSEEvent es un evento Server-Sent Events segun la spec W3C.
|
||||||
|
// Campos opcionales: si Event esta vacio se envia solo data,
|
||||||
|
// si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir).
|
||||||
|
type SSEEvent struct {
|
||||||
|
Event string `json:"event"`
|
||||||
|
Data string `json:"data"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
Retry int `json:"retry"`
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package infra
|
||||||
|
|
||||||
|
import (
|
||||||
|
"nhooyr.io/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WSClient representa una conexion WebSocket individual.
|
||||||
|
// Cada cliente tiene su propio canal de envio buffereado y una
|
||||||
|
// referencia al hub al que pertenece.
|
||||||
|
type WSClient struct {
|
||||||
|
Hub *WSHub
|
||||||
|
Conn *websocket.Conn
|
||||||
|
Send chan []byte
|
||||||
|
ID string
|
||||||
|
}
|
||||||
@@ -0,0 +1,69 @@
|
|||||||
|
package infra
|
||||||
|
|
||||||
|
// WSHub gestiona el ciclo de vida de conexiones WebSocket.
|
||||||
|
// Mantiene un mapa de clientes activos y canales para registro,
|
||||||
|
// desregistro y broadcast. Se ejecuta como goroutine via su Run().
|
||||||
|
type WSHub struct {
|
||||||
|
Clients map[*WSClient]bool
|
||||||
|
Broadcast chan []byte
|
||||||
|
Register chan *WSClient
|
||||||
|
Unregister chan *WSClient
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWSHub crea un WSHub vacio con canales inicializados.
|
||||||
|
// Ejecutar Run() en una goroutine para activar el loop de eventos.
|
||||||
|
func NewWSHub() *WSHub {
|
||||||
|
return &WSHub{
|
||||||
|
Clients: make(map[*WSClient]bool),
|
||||||
|
Broadcast: make(chan []byte, 256),
|
||||||
|
Register: make(chan *WSClient),
|
||||||
|
Unregister: make(chan *WSClient),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run ejecuta el loop principal del hub. Atiende registros, desregistros
|
||||||
|
// y broadcasts en un select. Bloqueante: lanzar como goroutine.
|
||||||
|
// Para parar, llamar a Stop().
|
||||||
|
func (h *WSHub) Run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-h.done:
|
||||||
|
// Cerrar todos los Send y vaciar el mapa
|
||||||
|
for client := range h.Clients {
|
||||||
|
delete(h.Clients, client)
|
||||||
|
close(client.Send)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case client := <-h.Register:
|
||||||
|
h.Clients[client] = true
|
||||||
|
case client := <-h.Unregister:
|
||||||
|
if _, ok := h.Clients[client]; ok {
|
||||||
|
delete(h.Clients, client)
|
||||||
|
close(client.Send)
|
||||||
|
}
|
||||||
|
case msg := <-h.Broadcast:
|
||||||
|
for client := range h.Clients {
|
||||||
|
select {
|
||||||
|
case client.Send <- msg:
|
||||||
|
default:
|
||||||
|
// Cliente lento: desconectar
|
||||||
|
delete(h.Clients, client)
|
||||||
|
close(client.Send)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop cierra el loop de Run() y limpia todos los clientes.
|
||||||
|
// Idempotente — llamar mas de una vez no panica.
|
||||||
|
func (h *WSHub) Stop() {
|
||||||
|
select {
|
||||||
|
case <-h.done:
|
||||||
|
// Ya cerrado
|
||||||
|
default:
|
||||||
|
close(h.done)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
package infra
|
||||||
|
|
||||||
|
// WSMessage es un mensaje tipado que viaja por WebSocket.
|
||||||
|
// El campo Type permite al receptor decidir como procesar el payload.
|
||||||
|
type WSMessage struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
SenderID string `json:"sender_id"`
|
||||||
|
Ts int64 `json:"ts"`
|
||||||
|
}
|
||||||
@@ -63,4 +63,5 @@ require (
|
|||||||
golang.org/x/text v0.29.0 // indirect
|
golang.org/x/text v0.29.0 // indirect
|
||||||
golang.org/x/tools v0.36.0 // indirect
|
golang.org/x/tools v0.36.0 // indirect
|
||||||
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
|
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
|
||||||
|
nhooyr.io/websocket v1.8.17 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -190,3 +190,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
|
|||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y=
|
||||||
|
nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
|
||||||
|
|||||||
@@ -0,0 +1,44 @@
|
|||||||
|
---
|
||||||
|
name: SSEEvent
|
||||||
|
lang: go
|
||||||
|
domain: infra
|
||||||
|
version: "1.0.0"
|
||||||
|
algebraic: product
|
||||||
|
definition: |
|
||||||
|
type SSEEvent struct {
|
||||||
|
Event string `json:"event"`
|
||||||
|
Data string `json:"data"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
Retry int `json:"retry"`
|
||||||
|
}
|
||||||
|
description: "Evento Server-Sent Events segun la spec W3C. Campos opcionales: si Event esta vacio se envia solo data, si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir y dejar el default del browser ~3000ms)."
|
||||||
|
tags: [sse, event, server-sent-events, infra, realtime]
|
||||||
|
uses_types: []
|
||||||
|
file_path: "functions/infra/sse_event.go"
|
||||||
|
---
|
||||||
|
|
||||||
|
## Ejemplo
|
||||||
|
|
||||||
|
```go
|
||||||
|
ev := SSEEvent{
|
||||||
|
Event: "metrics_update",
|
||||||
|
ID: "42",
|
||||||
|
Data: `{"cpu": 23.4, "mem": 87.1}`,
|
||||||
|
}
|
||||||
|
SSESend(w, ev)
|
||||||
|
```
|
||||||
|
|
||||||
|
Wire format generado:
|
||||||
|
|
||||||
|
```
|
||||||
|
event: metrics_update
|
||||||
|
id: 42
|
||||||
|
data: {"cpu": 23.4, "mem": 87.1}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
## Notas
|
||||||
|
|
||||||
|
Tipo producto con campos opcionales. `Data` puede contener saltos de linea — el formateador los traduce a multiples lineas `data:` segun la spec. `Retry` solo se envia si es > 0; cuando se envia, indica al cliente cuantos ms esperar antes de reconectar.
|
||||||
|
|
||||||
|
Para enviar solo un comentario keepalive (no un evento), no se usa este tipo: se escribe `: keepalive\n\n` directamente al writer.
|
||||||
@@ -0,0 +1,38 @@
|
|||||||
|
---
|
||||||
|
name: WSClient
|
||||||
|
lang: go
|
||||||
|
domain: infra
|
||||||
|
version: "1.0.0"
|
||||||
|
algebraic: product
|
||||||
|
definition: |
|
||||||
|
type WSClient struct {
|
||||||
|
Hub *WSHub
|
||||||
|
Conn *websocket.Conn
|
||||||
|
Send chan []byte
|
||||||
|
ID string
|
||||||
|
}
|
||||||
|
description: "Conexion WebSocket individual gestionada por un WSHub. Cada cliente tiene su propio canal Send buffereado para entregar mensajes en orden y un ID para identificarlo en broadcasts y handlers."
|
||||||
|
tags: [websocket, client, connection, server, infra, realtime]
|
||||||
|
uses_types: [WSHub_go_infra]
|
||||||
|
file_path: "functions/infra/ws_client.go"
|
||||||
|
---
|
||||||
|
|
||||||
|
## Ejemplo
|
||||||
|
|
||||||
|
```go
|
||||||
|
client := &WSClient{
|
||||||
|
Hub: hub,
|
||||||
|
Conn: wsConn,
|
||||||
|
Send: make(chan []byte, 64),
|
||||||
|
ID: "user-42",
|
||||||
|
}
|
||||||
|
hub.Register <- client
|
||||||
|
```
|
||||||
|
|
||||||
|
## Notas
|
||||||
|
|
||||||
|
Tipo producto. El canal `Send` debe ser buffereado (tipico 64-256) para evitar que un cliente lento bloquee el broadcast del hub. `Conn` usa `nhooyr.io/websocket` que soporta `context.Context` nativamente. `ID` es texto libre — la app que monta el handler decide su semantica (UUID, username, session id, etc.).
|
||||||
|
|
||||||
|
Cada cliente tiene normalmente dos goroutines internas:
|
||||||
|
- readPump: lee del Conn y publica al hub.Broadcast (o procesa con callback)
|
||||||
|
- writePump: consume del Send y escribe al Conn
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
---
|
||||||
|
name: WSHub
|
||||||
|
lang: go
|
||||||
|
domain: infra
|
||||||
|
version: "1.0.0"
|
||||||
|
algebraic: product
|
||||||
|
definition: |
|
||||||
|
type WSHub struct {
|
||||||
|
Clients map[*WSClient]bool
|
||||||
|
Broadcast chan []byte
|
||||||
|
Register chan *WSClient
|
||||||
|
Unregister chan *WSClient
|
||||||
|
}
|
||||||
|
description: "Hub que gestiona el ciclo de vida de conexiones WebSocket. Mantiene un mapa de clientes activos y canales para registro, desregistro y broadcast. Se ejecuta como goroutine via Run() y se compone con ws_handler para servir conexiones."
|
||||||
|
tags: [websocket, hub, server, broadcast, infra, realtime]
|
||||||
|
uses_types: [WSClient_go_infra]
|
||||||
|
file_path: "functions/infra/ws_hub.go"
|
||||||
|
---
|
||||||
|
|
||||||
|
## Ejemplo
|
||||||
|
|
||||||
|
```go
|
||||||
|
hub := NewWSHub()
|
||||||
|
go hub.Run()
|
||||||
|
defer hub.Stop()
|
||||||
|
|
||||||
|
routes := []Route{
|
||||||
|
{Method: "GET", Path: "/ws", Handler: WSHandler(hub, []string{"*"})},
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Notas
|
||||||
|
|
||||||
|
Tipo producto. El campo `done` interno (no exportado) controla el cierre limpio. `Broadcast` es buffereado (256) para no bloquear emisores. Cada cliente lento se desconecta automaticamente si su canal `Send` se llena durante un broadcast — esta es la garantia anti-backpressure: un cliente que no consume no afecta a los demas.
|
||||||
|
|
||||||
|
El loop `Run()` es de un solo thread escribiendo al mapa, asi que no hace falta mutex. Para parar limpiamente: `hub.Stop()` cierra `done`, libera todos los `client.Send` y termina el loop.
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
---
|
||||||
|
name: WSMessage
|
||||||
|
lang: go
|
||||||
|
domain: infra
|
||||||
|
version: "1.0.0"
|
||||||
|
algebraic: product
|
||||||
|
definition: |
|
||||||
|
type WSMessage struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
SenderID string `json:"sender_id"`
|
||||||
|
Ts int64 `json:"ts"`
|
||||||
|
}
|
||||||
|
description: "Mensaje tipado que viaja por WebSocket entre cliente y servidor. El campo Type permite al receptor decidir como procesar el payload. Incluye SenderID y timestamp para trazabilidad."
|
||||||
|
tags: [websocket, message, protocol, infra, realtime]
|
||||||
|
uses_types: []
|
||||||
|
file_path: "functions/infra/ws_message.go"
|
||||||
|
---
|
||||||
|
|
||||||
|
## Ejemplo
|
||||||
|
|
||||||
|
```go
|
||||||
|
msg := WSMessage{
|
||||||
|
Type: "chat",
|
||||||
|
Payload: []byte(`{"text":"hola"}`),
|
||||||
|
SenderID: "user-1",
|
||||||
|
Ts: time.Now().UnixMilli(),
|
||||||
|
}
|
||||||
|
data, _ := json.Marshal(msg)
|
||||||
|
WSBroadcast(hub, data)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Notas
|
||||||
|
|
||||||
|
Tipo producto. Las tags `json:` permiten serializar directamente a JSON para enviar por el wire. `Payload` es `[]byte` para permitir tanto JSON anidado como datos binarios codificados en base64. `Ts` en milisegundos epoch para compatibilidad con `Date.now()` en el browser.
|
||||||
|
|
||||||
|
No es obligatorio usar este tipo — apps que necesiten un protocolo distinto pueden enviar bytes arbitrarios via `WSBroadcast` o `WSSend`. Es solo un convenio recomendado para protocolos chat-like.
|
||||||
Reference in New Issue
Block a user