feat: WebSocket upgrader, hub, send, broadcast, handler con tests (issue 0011 fase 3-4)
This commit is contained in:
@@ -0,0 +1,371 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"nhooyr.io/websocket"
|
||||
)
|
||||
|
||||
// --- WSHub ---
|
||||
|
||||
func TestWSHub(t *testing.T) {
|
||||
t.Run("registra y desregistra clientes", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
defer hub.Stop()
|
||||
|
||||
client := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
|
||||
hub.Register <- client
|
||||
|
||||
// Esperar a que el hub procese
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
hub.Unregister <- client
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// El canal Send del cliente debe estar cerrado tras unregister
|
||||
select {
|
||||
case _, ok := <-client.Send:
|
||||
if ok {
|
||||
t.Error("client.Send should be closed after unregister")
|
||||
}
|
||||
default:
|
||||
t.Error("client.Send should be closed (zero-value receive)")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Stop cierra todos los clientes", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
|
||||
c1 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
|
||||
c2 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c2"}
|
||||
hub.Register <- c1
|
||||
hub.Register <- c2
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
hub.Stop()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
for _, c := range []*WSClient{c1, c2} {
|
||||
select {
|
||||
case _, ok := <-c.Send:
|
||||
if ok {
|
||||
t.Errorf("client %s Send should be closed after Stop", c.ID)
|
||||
}
|
||||
default:
|
||||
t.Errorf("client %s Send should be closed (zero-value receive)", c.ID)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Stop es idempotente", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
hub.Stop()
|
||||
hub.Stop() // no debe panicar
|
||||
})
|
||||
}
|
||||
|
||||
// --- WSBroadcast ---
|
||||
|
||||
func TestWSBroadcast(t *testing.T) {
|
||||
t.Run("envia mensaje al canal Broadcast del hub", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
err := WSBroadcast(hub, []byte("hola"))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
select {
|
||||
case msg := <-hub.Broadcast:
|
||||
if string(msg) != "hola" {
|
||||
t.Errorf("got %q, want hola", string(msg))
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Error("message not in Broadcast channel")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("retorna error si hub es nil", func(t *testing.T) {
|
||||
err := WSBroadcast(nil, []byte("x"))
|
||||
if err == nil {
|
||||
t.Error("expected error for nil hub, got nil")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("el hub entrega el mensaje a todos los clientes registrados", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
defer hub.Stop()
|
||||
|
||||
c1 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
|
||||
c2 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c2"}
|
||||
hub.Register <- c1
|
||||
hub.Register <- c2
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
WSBroadcast(hub, []byte("ping"))
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
for _, c := range []*WSClient{c1, c2} {
|
||||
select {
|
||||
case msg := <-c.Send:
|
||||
if string(msg) != "ping" {
|
||||
t.Errorf("client %s got %q, want ping", c.ID, string(msg))
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Errorf("client %s did not receive broadcast", c.ID)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("cliente lento es desconectado del hub", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
defer hub.Stop()
|
||||
|
||||
// Cliente con buffer de 1 — el segundo broadcast lo tira
|
||||
slow := &WSClient{Hub: hub, Send: make(chan []byte, 1), ID: "slow"}
|
||||
hub.Register <- slow
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Llenar buffer y forzar drop
|
||||
WSBroadcast(hub, []byte("1"))
|
||||
WSBroadcast(hub, []byte("2"))
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// Drenar Send: deberia tener el primer mensaje y luego estar cerrado
|
||||
got := []string{}
|
||||
for msg := range slow.Send {
|
||||
got = append(got, string(msg))
|
||||
}
|
||||
if len(got) > 1 {
|
||||
t.Errorf("expected at most 1 message in slow client, got %d: %v", len(got), got)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// --- WSSend ---
|
||||
|
||||
func TestWSSend(t *testing.T) {
|
||||
t.Run("envia mensaje al canal Send del cliente", func(t *testing.T) {
|
||||
client := &WSClient{Send: make(chan []byte, 4), ID: "c1"}
|
||||
err := WSSend(client, []byte("hola"))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
got := <-client.Send
|
||||
if string(got) != "hola" {
|
||||
t.Errorf("got %q, want hola", string(got))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("retorna error si client es nil", func(t *testing.T) {
|
||||
err := WSSend(nil, []byte("x"))
|
||||
if err == nil {
|
||||
t.Error("expected error for nil client, got nil")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("retorna error si el canal esta lleno", func(t *testing.T) {
|
||||
client := &WSClient{Send: make(chan []byte, 1), ID: "c1"}
|
||||
_ = WSSend(client, []byte("1"))
|
||||
err := WSSend(client, []byte("2"))
|
||||
if err == nil {
|
||||
t.Error("expected error when send channel full, got nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// --- WSUpgrader & WSHandler integration ---
|
||||
|
||||
func TestWSUpgrader(t *testing.T) {
|
||||
t.Run("upgradea conexion valida con `*` en origenes", func(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := WSUpgrader(w, r, []string{"*"})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close(websocket.StatusNormalClosure, "")
|
||||
// Echo
|
||||
_, data, _ := conn.Read(r.Context())
|
||||
conn.Write(r.Context(), websocket.MessageText, data)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial failed: %v", err)
|
||||
}
|
||||
defer c.Close(websocket.StatusNormalClosure, "")
|
||||
|
||||
if err := c.Write(ctx, websocket.MessageText, []byte("ping")); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
_, data, err := c.Read(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("read failed: %v", err)
|
||||
}
|
||||
if string(data) != "ping" {
|
||||
t.Errorf("got %q, want ping", string(data))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestWSHandler(t *testing.T) {
|
||||
t.Run("upgradea conexion y registra cliente en hub", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
defer hub.Stop()
|
||||
|
||||
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
|
||||
defer ts.Close()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial failed: %v", err)
|
||||
}
|
||||
defer c.Close(websocket.StatusNormalClosure, "")
|
||||
|
||||
// Esperar a que el hub procese el Register
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client in hub, got %d", hub.ClientCount())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("broadcast del hub llega al cliente conectado", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
defer hub.Stop()
|
||||
|
||||
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
|
||||
defer ts.Close()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial failed: %v", err)
|
||||
}
|
||||
defer c.Close(websocket.StatusNormalClosure, "")
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
WSBroadcast(hub, []byte("hello-all"))
|
||||
|
||||
_, data, err := c.Read(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("read failed: %v", err)
|
||||
}
|
||||
if string(data) != "hello-all" {
|
||||
t.Errorf("got %q, want hello-all", string(data))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("multiples clientes reciben el broadcast", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
defer hub.Stop()
|
||||
|
||||
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
|
||||
defer ts.Close()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
const N = 3
|
||||
conns := make([]*websocket.Conn, N)
|
||||
for i := 0; i < N; i++ {
|
||||
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial %d failed: %v", i, err)
|
||||
}
|
||||
conns[i] = c
|
||||
defer c.Close(websocket.StatusNormalClosure, "")
|
||||
}
|
||||
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
WSBroadcast(hub, []byte("multicast"))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
errs := make(chan error, N)
|
||||
for i := 0; i < N; i++ {
|
||||
wg.Add(1)
|
||||
go func(idx int, conn *websocket.Conn) {
|
||||
defer wg.Done()
|
||||
_, data, err := conn.Read(ctx)
|
||||
if err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
if string(data) != "multicast" {
|
||||
errs <- &readMismatchError{idx: idx, got: string(data)}
|
||||
}
|
||||
}(i, conns[i])
|
||||
}
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
for e := range errs {
|
||||
t.Errorf("client receive error: %v", e)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("desregistra cliente al desconectar", func(t *testing.T) {
|
||||
hub := NewWSHub()
|
||||
go hub.Run()
|
||||
defer hub.Stop()
|
||||
|
||||
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
|
||||
defer ts.Close()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial failed: %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Fatalf("expected 1 client after dial, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
c.Close(websocket.StatusNormalClosure, "")
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients after close, got %d", hub.ClientCount())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type readMismatchError struct {
|
||||
idx int
|
||||
got string
|
||||
}
|
||||
|
||||
func (e *readMismatchError) Error() string {
|
||||
return "client " + string(rune('0'+e.idx)) + " got " + e.got
|
||||
}
|
||||
Reference in New Issue
Block a user