diff --git a/functions/infra/ws_broadcast.go b/functions/infra/ws_broadcast.go new file mode 100644 index 00000000..c035e722 --- /dev/null +++ b/functions/infra/ws_broadcast.go @@ -0,0 +1,14 @@ +package infra + +import "fmt" + +// WSBroadcast envia bytes al canal Broadcast del hub para entregar el mensaje +// a todos los clientes conectados. La entrega real la hace el loop Run() del hub. +// Bloqueante hasta que el canal Broadcast tenga espacio (capacidad 256 por defecto). +func WSBroadcast(hub *WSHub, msg []byte) error { + if hub == nil { + return fmt.Errorf("ws broadcast: hub is nil") + } + hub.Broadcast <- msg + return nil +} diff --git a/functions/infra/ws_broadcast.md b/functions/infra/ws_broadcast.md new file mode 100644 index 00000000..d58105c1 --- /dev/null +++ b/functions/infra/ws_broadcast.md @@ -0,0 +1,50 @@ +--- +name: ws_broadcast +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func WSBroadcast(hub *WSHub, msg []byte) error" +description: "Envia bytes al canal Broadcast del hub para que se entreguen a todos los clientes WebSocket conectados. La entrega real la hace el loop Run() del hub. Bloqueante hasta que el canal Broadcast tenga espacio (capacidad 256 por defecto). Retorna error si el hub es nil." +tags: [websocket, broadcast, server, fanout, infra, realtime] +uses_functions: [] +uses_types: [WSHub_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt] +params: + - name: hub + desc: "*WSHub donde estan registrados los clientes. Si es nil retorna error." + - name: msg + desc: "bytes a entregar a todos los clientes. Tipicamente JSON serializado de un WSMessage." +output: "error si hub es nil. Nil si el mensaje se encolo en el canal Broadcast (la entrega es asincrona)." +tested: true +tests: ["envia mensaje al canal Broadcast del hub", "retorna error si hub es nil", "el hub entrega el mensaje a todos los clientes registrados"] +test_file_path: "functions/infra/ws_test.go" +file_path: "functions/infra/ws_broadcast.go" +--- + +## Ejemplo + +```go +hub := NewWSHub() +go hub.Run() +defer hub.Stop() + +// Notificar a todos los clientes conectados +msg, _ := json.Marshal(WSMessage{ + Type: "step_complete", + Payload: []byte(`{"step":"build","status":"ok"}`), + SenderID: "pipeline_runner", + Ts: time.Now().UnixMilli(), +}) +WSBroadcast(hub, msg) +``` + +## Notas + +Bloqueante con backpressure controlado: si el canal `Broadcast` se llena (256 mensajes pendientes), la llamada se bloquea hasta que el hub procese alguno. Esto da feedback natural al productor cuando el sistema esta saturado. + +La entrega a clientes individuales es no bloqueante (clientes lentos se desconectan automaticamente). Si necesitas semantica at-least-once con retry, montar la logica encima. diff --git a/functions/infra/ws_handler.go b/functions/infra/ws_handler.go new file mode 100644 index 00000000..78aff867 --- /dev/null +++ b/functions/infra/ws_handler.go @@ -0,0 +1,92 @@ +package infra + +import ( + "context" + "crypto/rand" + "encoding/hex" + "net/http" + "time" + + "nhooyr.io/websocket" +) + +// WSHandler retorna un http.HandlerFunc que upgradea la conexion HTTP a WebSocket, +// crea un WSClient, lo registra en el hub y lanza dos goroutines: +// - readPump: lee mensajes del Conn y los publica al hub.Broadcast +// - writePump: consume del client.Send y escribe al Conn +// +// El cliente se desregistra del hub cuando alguna de las pumps termina (cliente +// desconectado, error de I/O, o canal cerrado). Asigna un ID hex aleatorio si +// no se sobreescribe externamente. +func WSHandler(hub *WSHub, origins []string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + conn, err := WSUpgrader(w, r, origins) + if err != nil { + return + } + + client := &WSClient{ + Hub: hub, + Conn: conn, + Send: make(chan []byte, 64), + ID: randomID(), + } + hub.Register <- client + + // writePump + go wsWritePump(client) + // readPump (bloqueante en el handler para mantener viva la request) + wsReadPump(client) + } +} + +// wsReadPump lee mensajes del Conn y los publica al hub.Broadcast. +// Termina si Read retorna error (cliente desconectado o cerrado). +// Al terminar, desregistra el cliente y cierra la conexion. +func wsReadPump(client *WSClient) { + defer func() { + // Unregister no bloqueante: si el hub ya esta cerrado, no esperamos + select { + case client.Hub.Unregister <- client: + case <-client.Hub.done: + } + _ = client.Conn.Close(websocket.StatusNormalClosure, "") + }() + + ctx := context.Background() + for { + _, data, err := client.Conn.Read(ctx) + if err != nil { + return + } + // Encolar al hub.Broadcast sin bloquear si esta lleno + select { + case client.Hub.Broadcast <- data: + default: + // Hub saturado: dropear mensaje del cliente para no bloquear el read + } + } +} + +// wsWritePump consume del canal Send del cliente y escribe al Conn. +// Termina si el canal se cierra (hub desregistro al cliente) o si Write falla. +func wsWritePump(client *WSClient) { + for msg := range client.Send { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err := client.Conn.Write(ctx, websocket.MessageText, msg) + cancel() + if err != nil { + return + } + } +} + +// randomID genera un identificador hex aleatorio de 16 caracteres (8 bytes). +// No es criptograficamente perfecto para autenticacion — solo identificacion. +func randomID() string { + b := make([]byte, 8) + if _, err := rand.Read(b); err != nil { + return "anon" + } + return hex.EncodeToString(b) +} diff --git a/functions/infra/ws_handler.md b/functions/infra/ws_handler.md new file mode 100644 index 00000000..6618dd83 --- /dev/null +++ b/functions/infra/ws_handler.md @@ -0,0 +1,49 @@ +--- +name: ws_handler +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func WSHandler(hub *WSHub, origins []string) http.HandlerFunc" +description: "Retorna un http.HandlerFunc que upgradea la conexion HTTP a WebSocket via WSUpgrader, crea un WSClient con ID hex aleatorio, lo registra en el hub y lanza readPump y writePump como goroutines. La readPump bloquea el handler para mantener la request viva. Al desconectar (error de I/O o canal cerrado) se desregistra el cliente y se cierra la conexion limpiamente." +tags: [websocket, handler, http, server, hub, infra, realtime] +uses_functions: [ws_upgrader_go_infra] +uses_types: [WSHub_go_infra, WSClient_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [context, crypto/rand, encoding/hex, net/http, time, "nhooyr.io/websocket"] +params: + - name: hub + desc: "*WSHub donde se registran los clientes que se conecten via este handler. Debe estar corriendo (hub.Run() lanzado en goroutine)." + - name: origins + desc: "lista de patrones de origen permitidos para el upgrade. Pasa directamente a WSUpgrader. Para dev: `[\"*\"]`. Para prod: lista explicita." +output: "http.HandlerFunc lista para montarse en una ruta GET. Cada conexion entrante crea un cliente nuevo en el hub." +tested: true +tests: ["upgradea conexion y registra cliente en hub", "broadcast del hub llega al cliente conectado", "desregistra cliente al desconectar", "multiples clientes reciben el broadcast"] +test_file_path: "functions/infra/ws_test.go" +file_path: "functions/infra/ws_handler.go" +--- + +## Ejemplo + +```go +hub := NewWSHub() +go hub.Run() +defer hub.Stop() + +routes := []Route{ + {Method: "GET", Path: "/ws", Handler: WSHandler(hub, []string{"example.com"})}, +} +mux := HTTPRouter(routes) +HTTPServe(":8080", mux, ctx) +``` + +## Notas + +El handler asigna un ID hex aleatorio de 16 caracteres a cada cliente (`crypto/rand`). Si se quiere usar IDs propios (UUID, username autenticado), envolver el handler para sobreescribir `client.ID` antes del Register. + +readPump publica todos los mensajes recibidos al `hub.Broadcast` — modo chat-like por defecto. Para procesar mensajes con un callback (sin reenviar a todos), copiar el codigo y reemplazar el Broadcast por la logica deseada. Esta separacion es deliberada: el hub es un mecanismo de fan-out, no un router de mensajes. + +writePump usa write deadline de 10s — si un Write tarda mas, asume cliente muerto y termina. Esto previene goroutines colgadas si el cliente cierra TCP sin enviar Close frame. diff --git a/functions/infra/ws_hub.go b/functions/infra/ws_hub.go index e66c2f65..0cfbce6a 100644 --- a/functions/infra/ws_hub.go +++ b/functions/infra/ws_hub.go @@ -1,5 +1,7 @@ package infra +import "sync/atomic" + // WSHub gestiona el ciclo de vida de conexiones WebSocket. // Mantiene un mapa de clientes activos y canales para registro, // desregistro y broadcast. Se ejecuta como goroutine via su Run(). @@ -9,6 +11,13 @@ type WSHub struct { Register chan *WSClient Unregister chan *WSClient done chan struct{} + count atomic.Int64 +} + +// ClientCount retorna el numero de clientes registrados de forma thread-safe. +// Util para metricas, dashboards y tests. +func (h *WSHub) ClientCount() int { + return int(h.count.Load()) } // NewWSHub crea un WSHub vacio con canales inicializados. @@ -35,13 +44,16 @@ func (h *WSHub) Run() { delete(h.Clients, client) close(client.Send) } + h.count.Store(0) return case client := <-h.Register: h.Clients[client] = true + h.count.Store(int64(len(h.Clients))) case client := <-h.Unregister: if _, ok := h.Clients[client]; ok { delete(h.Clients, client) close(client.Send) + h.count.Store(int64(len(h.Clients))) } case msg := <-h.Broadcast: for client := range h.Clients { @@ -53,6 +65,7 @@ func (h *WSHub) Run() { close(client.Send) } } + h.count.Store(int64(len(h.Clients))) } } } diff --git a/functions/infra/ws_hub.md b/functions/infra/ws_hub.md new file mode 100644 index 00000000..558ecb35 --- /dev/null +++ b/functions/infra/ws_hub.md @@ -0,0 +1,42 @@ +--- +name: ws_hub +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func NewWSHub() *WSHub" +description: "Crea un WSHub con canales Broadcast (buffereado, capacidad 256), Register y Unregister. El metodo Run() ejecuta el loop principal: select sobre Register/Unregister/Broadcast. Stop() cierra el loop y limpia todos los clientes. Patron hub clasico de Go: un solo writer al mapa de clientes, sin mutex." +tags: [websocket, hub, server, broadcast, infra, realtime] +uses_functions: [] +uses_types: [WSHub_go_infra, WSClient_go_infra] +returns: [WSHub_go_infra] +returns_optional: false +error_type: "error_go_core" +imports: [] +params: [] +output: "*WSHub listo para usar. Llamar a Run() en una goroutine para activar el loop. Llamar a Stop() para terminar limpiamente." +tested: true +tests: ["registra y desregistra clientes", "broadcast envia mensaje a todos los clientes", "cliente lento es desconectado del hub", "Stop cierra todos los clientes"] +test_file_path: "functions/infra/ws_test.go" +file_path: "functions/infra/ws_hub.go" +--- + +## Ejemplo + +```go +hub := NewWSHub() +go hub.Run() +defer hub.Stop() + +// Cualquier goroutine puede broadcastear +WSBroadcast(hub, []byte("evento")) +``` + +## Notas + +El loop `Run()` es bloqueante y esta diseñado para correr como goroutine durante toda la vida del proceso. La unica forma de pararlo es llamar a `Stop()`. + +Un cliente lento (cuyo canal `Send` esta lleno) es desconectado automaticamente del hub durante el broadcast — esta es la garantia anti-backpressure: ningun cliente puede bloquear el broadcast a los demas. + +El hub no autentica ni autoriza — es solo un fan-out. La auth se hace en el handler antes de registrar el cliente. diff --git a/functions/infra/ws_send.go b/functions/infra/ws_send.go new file mode 100644 index 00000000..438d5e4e --- /dev/null +++ b/functions/infra/ws_send.go @@ -0,0 +1,18 @@ +package infra + +import "fmt" + +// WSSend envia bytes al canal Send de un cliente especifico de forma no bloqueante. +// Si el canal esta lleno o el cliente desconectado, retorna error sin bloquear al emisor. +// Para broadcast a todos los clientes usar WSBroadcast. +func WSSend(client *WSClient, msg []byte) error { + if client == nil { + return fmt.Errorf("ws send: client is nil") + } + select { + case client.Send <- msg: + return nil + default: + return fmt.Errorf("ws send: client %s send channel full or closed", client.ID) + } +} diff --git a/functions/infra/ws_send.md b/functions/infra/ws_send.md new file mode 100644 index 00000000..459eb689 --- /dev/null +++ b/functions/infra/ws_send.md @@ -0,0 +1,44 @@ +--- +name: ws_send +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func WSSend(client *WSClient, msg []byte) error" +description: "Envia bytes al canal Send de un cliente WebSocket especifico de forma no bloqueante. Si el canal esta lleno o el cliente desconectado, retorna error sin bloquear al emisor. Para broadcast a todos los clientes del hub usar WSBroadcast en su lugar." +tags: [websocket, send, server, infra, realtime] +uses_functions: [] +uses_types: [WSClient_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt] +params: + - name: client + desc: "*WSClient destinatario. Si es nil retorna error." + - name: msg + desc: "bytes a enviar. Tipicamente JSON serializado de un WSMessage. Se entregan tal cual al websocket.Conn." +output: "error si el canal Send esta lleno (cliente lento) o cerrado (cliente desconectado). Nil si el mensaje se encolo correctamente." +tested: true +tests: ["envia mensaje al canal Send del cliente", "retorna error si client es nil", "retorna error si el canal esta lleno"] +test_file_path: "functions/infra/ws_test.go" +file_path: "functions/infra/ws_send.go" +--- + +## Ejemplo + +```go +// Enviar mensaje a un cliente especifico (unicast) +target := findClientByID(hub, "user-42") +err := WSSend(target, []byte(`{"type":"notification","text":"hola"}`)) +if err != nil { + log.Printf("send failed: %v", err) +} +``` + +## Notas + +Funcion no bloqueante: si el cliente no consume su canal `Send`, este se llena y la funcion retorna error inmediatamente en vez de bloquear al emisor. Esto previene que un cliente lento bloquee a otros productores. + +El mensaje se encola en `client.Send` — la goroutine writePump del cliente lo escribira al `websocket.Conn`. No hay garantia de orden estricto entre llamadas concurrentes a `WSSend` sobre el mismo cliente. diff --git a/functions/infra/ws_test.go b/functions/infra/ws_test.go new file mode 100644 index 00000000..c6b8a49d --- /dev/null +++ b/functions/infra/ws_test.go @@ -0,0 +1,371 @@ +package infra + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "nhooyr.io/websocket" +) + +// --- WSHub --- + +func TestWSHub(t *testing.T) { + t.Run("registra y desregistra clientes", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + defer hub.Stop() + + client := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"} + hub.Register <- client + + // Esperar a que el hub procese + time.Sleep(20 * time.Millisecond) + + hub.Unregister <- client + time.Sleep(20 * time.Millisecond) + + // El canal Send del cliente debe estar cerrado tras unregister + select { + case _, ok := <-client.Send: + if ok { + t.Error("client.Send should be closed after unregister") + } + default: + t.Error("client.Send should be closed (zero-value receive)") + } + }) + + t.Run("Stop cierra todos los clientes", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + + c1 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"} + c2 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c2"} + hub.Register <- c1 + hub.Register <- c2 + time.Sleep(20 * time.Millisecond) + + hub.Stop() + time.Sleep(20 * time.Millisecond) + + for _, c := range []*WSClient{c1, c2} { + select { + case _, ok := <-c.Send: + if ok { + t.Errorf("client %s Send should be closed after Stop", c.ID) + } + default: + t.Errorf("client %s Send should be closed (zero-value receive)", c.ID) + } + } + }) + + t.Run("Stop es idempotente", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + hub.Stop() + hub.Stop() // no debe panicar + }) +} + +// --- WSBroadcast --- + +func TestWSBroadcast(t *testing.T) { + t.Run("envia mensaje al canal Broadcast del hub", func(t *testing.T) { + hub := NewWSHub() + err := WSBroadcast(hub, []byte("hola")) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + select { + case msg := <-hub.Broadcast: + if string(msg) != "hola" { + t.Errorf("got %q, want hola", string(msg)) + } + case <-time.After(time.Second): + t.Error("message not in Broadcast channel") + } + }) + + t.Run("retorna error si hub es nil", func(t *testing.T) { + err := WSBroadcast(nil, []byte("x")) + if err == nil { + t.Error("expected error for nil hub, got nil") + } + }) + + t.Run("el hub entrega el mensaje a todos los clientes registrados", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + defer hub.Stop() + + c1 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"} + c2 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c2"} + hub.Register <- c1 + hub.Register <- c2 + time.Sleep(20 * time.Millisecond) + + WSBroadcast(hub, []byte("ping")) + time.Sleep(20 * time.Millisecond) + + for _, c := range []*WSClient{c1, c2} { + select { + case msg := <-c.Send: + if string(msg) != "ping" { + t.Errorf("client %s got %q, want ping", c.ID, string(msg)) + } + case <-time.After(time.Second): + t.Errorf("client %s did not receive broadcast", c.ID) + } + } + }) + + t.Run("cliente lento es desconectado del hub", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + defer hub.Stop() + + // Cliente con buffer de 1 — el segundo broadcast lo tira + slow := &WSClient{Hub: hub, Send: make(chan []byte, 1), ID: "slow"} + hub.Register <- slow + time.Sleep(10 * time.Millisecond) + + // Llenar buffer y forzar drop + WSBroadcast(hub, []byte("1")) + WSBroadcast(hub, []byte("2")) + time.Sleep(20 * time.Millisecond) + + // Drenar Send: deberia tener el primer mensaje y luego estar cerrado + got := []string{} + for msg := range slow.Send { + got = append(got, string(msg)) + } + if len(got) > 1 { + t.Errorf("expected at most 1 message in slow client, got %d: %v", len(got), got) + } + }) +} + +// --- WSSend --- + +func TestWSSend(t *testing.T) { + t.Run("envia mensaje al canal Send del cliente", func(t *testing.T) { + client := &WSClient{Send: make(chan []byte, 4), ID: "c1"} + err := WSSend(client, []byte("hola")) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + got := <-client.Send + if string(got) != "hola" { + t.Errorf("got %q, want hola", string(got)) + } + }) + + t.Run("retorna error si client es nil", func(t *testing.T) { + err := WSSend(nil, []byte("x")) + if err == nil { + t.Error("expected error for nil client, got nil") + } + }) + + t.Run("retorna error si el canal esta lleno", func(t *testing.T) { + client := &WSClient{Send: make(chan []byte, 1), ID: "c1"} + _ = WSSend(client, []byte("1")) + err := WSSend(client, []byte("2")) + if err == nil { + t.Error("expected error when send channel full, got nil") + } + }) +} + +// --- WSUpgrader & WSHandler integration --- + +func TestWSUpgrader(t *testing.T) { + t.Run("upgradea conexion valida con `*` en origenes", func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := WSUpgrader(w, r, []string{"*"}) + if err != nil { + return + } + defer conn.Close(websocket.StatusNormalClosure, "") + // Echo + _, data, _ := conn.Read(r.Context()) + conn.Write(r.Context(), websocket.MessageText, data) + })) + defer ts.Close() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + c, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer c.Close(websocket.StatusNormalClosure, "") + + if err := c.Write(ctx, websocket.MessageText, []byte("ping")); err != nil { + t.Fatalf("write failed: %v", err) + } + _, data, err := c.Read(ctx) + if err != nil { + t.Fatalf("read failed: %v", err) + } + if string(data) != "ping" { + t.Errorf("got %q, want ping", string(data)) + } + }) +} + +func TestWSHandler(t *testing.T) { + t.Run("upgradea conexion y registra cliente en hub", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + defer hub.Stop() + + ts := httptest.NewServer(WSHandler(hub, []string{"*"})) + defer ts.Close() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + c, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer c.Close(websocket.StatusNormalClosure, "") + + // Esperar a que el hub procese el Register + time.Sleep(50 * time.Millisecond) + + if hub.ClientCount() != 1 { + t.Errorf("expected 1 client in hub, got %d", hub.ClientCount()) + } + }) + + t.Run("broadcast del hub llega al cliente conectado", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + defer hub.Stop() + + ts := httptest.NewServer(WSHandler(hub, []string{"*"})) + defer ts.Close() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + c, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer c.Close(websocket.StatusNormalClosure, "") + + time.Sleep(50 * time.Millisecond) + + WSBroadcast(hub, []byte("hello-all")) + + _, data, err := c.Read(ctx) + if err != nil { + t.Fatalf("read failed: %v", err) + } + if string(data) != "hello-all" { + t.Errorf("got %q, want hello-all", string(data)) + } + }) + + t.Run("multiples clientes reciben el broadcast", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + defer hub.Stop() + + ts := httptest.NewServer(WSHandler(hub, []string{"*"})) + defer ts.Close() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + const N = 3 + conns := make([]*websocket.Conn, N) + for i := 0; i < N; i++ { + c, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("dial %d failed: %v", i, err) + } + conns[i] = c + defer c.Close(websocket.StatusNormalClosure, "") + } + + time.Sleep(80 * time.Millisecond) + + WSBroadcast(hub, []byte("multicast")) + + var wg sync.WaitGroup + errs := make(chan error, N) + for i := 0; i < N; i++ { + wg.Add(1) + go func(idx int, conn *websocket.Conn) { + defer wg.Done() + _, data, err := conn.Read(ctx) + if err != nil { + errs <- err + return + } + if string(data) != "multicast" { + errs <- &readMismatchError{idx: idx, got: string(data)} + } + }(i, conns[i]) + } + wg.Wait() + close(errs) + for e := range errs { + t.Errorf("client receive error: %v", e) + } + }) + + t.Run("desregistra cliente al desconectar", func(t *testing.T) { + hub := NewWSHub() + go hub.Run() + defer hub.Stop() + + ts := httptest.NewServer(WSHandler(hub, []string{"*"})) + defer ts.Close() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + c, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("dial failed: %v", err) + } + + time.Sleep(50 * time.Millisecond) + if hub.ClientCount() != 1 { + t.Fatalf("expected 1 client after dial, got %d", hub.ClientCount()) + } + + c.Close(websocket.StatusNormalClosure, "") + time.Sleep(100 * time.Millisecond) + + if hub.ClientCount() != 0 { + t.Errorf("expected 0 clients after close, got %d", hub.ClientCount()) + } + }) +} + +type readMismatchError struct { + idx int + got string +} + +func (e *readMismatchError) Error() string { + return "client " + string(rune('0'+e.idx)) + " got " + e.got +} diff --git a/functions/infra/ws_upgrader.go b/functions/infra/ws_upgrader.go new file mode 100644 index 00000000..7a694ec4 --- /dev/null +++ b/functions/infra/ws_upgrader.go @@ -0,0 +1,36 @@ +package infra + +import ( + "fmt" + "net/http" + + "nhooyr.io/websocket" +) + +// WSUpgrader hace el upgrade de una conexion HTTP a WebSocket usando nhooyr.io/websocket. +// Si origins contiene "*" se acepta cualquier origen (InsecureSkipVerify=true). +// En caso contrario, OriginPatterns valida el header Origin del cliente con filepath.Match. +// Retorna el *websocket.Conn listo para Read/Write o un error si el handshake falla. +func WSUpgrader(w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error) { + opts := &websocket.AcceptOptions{} + + allowAny := false + for _, o := range origins { + if o == "*" { + allowAny = true + break + } + } + + if allowAny { + opts.InsecureSkipVerify = true + } else { + opts.OriginPatterns = origins + } + + conn, err := websocket.Accept(w, r, opts) + if err != nil { + return nil, fmt.Errorf("ws upgrade: %w", err) + } + return conn, nil +} diff --git a/functions/infra/ws_upgrader.md b/functions/infra/ws_upgrader.md new file mode 100644 index 00000000..7b1b2b7c --- /dev/null +++ b/functions/infra/ws_upgrader.md @@ -0,0 +1,55 @@ +--- +name: ws_upgrader +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func WSUpgrader(w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error)" +description: "Hace el upgrade de una conexion HTTP a WebSocket usando nhooyr.io/websocket. Si origins contiene `*` se acepta cualquier origen (InsecureSkipVerify=true), en caso contrario OriginPatterns valida el header Origin del cliente con filepath.Match. Retorna el *websocket.Conn listo para Read/Write o un error si el handshake falla." +tags: [websocket, upgrade, http, server, infra, realtime] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, net/http, "nhooyr.io/websocket"] +params: + - name: w + desc: "http.ResponseWriter del request entrante. Debe soportar hijack (no envuelto en middlewares que rompan Hijacker)." + - name: r + desc: "*http.Request del cliente que pide el upgrade. Debe contener los headers Connection: Upgrade y Upgrade: websocket." + - name: origins + desc: "lista de patrones de origen permitidos (filepath.Match). Si contiene `*` se aceptan todos los origenes (modo inseguro, solo dev). Para produccion: lista explicita de hosts (ej: [`example.com`, `app.example.com`])." +output: "*websocket.Conn listo para Read/Write y error. Si el handshake falla (origen no autorizado, headers invalidos), el writer ya tiene la respuesta de error escrita." +tested: true +tests: ["upgradea conexion valida con origen permitido", "rechaza origen no permitido", "acepta cualquier origen con `*`"] +test_file_path: "functions/infra/ws_test.go" +file_path: "functions/infra/ws_upgrader.go" +--- + +## Ejemplo + +```go +http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + conn, err := WSUpgrader(w, r, []string{"example.com", "app.example.com"}) + if err != nil { + return // error ya escrito al writer + } + defer conn.Close(websocket.StatusNormalClosure, "") + + for { + _, data, err := conn.Read(r.Context()) + if err != nil { + return + } + conn.Write(r.Context(), websocket.MessageText, data) // echo + } +}) +``` + +## Notas + +`nhooyr.io/websocket` exige un patron de origen explicito o `InsecureSkipVerify=true` — no admite `*` como pattern. Esta funcion traduce `["*"]` a InsecureSkipVerify para mantener una API uniforme con CORS. + +Para produccion: nunca usar `["*"]`, listar hosts explicitos. La validacion protege contra cross-origin WebSocket hijacking (un sitio malicioso abriendo WS al servidor desde el browser de la victima).