372 lines
9.0 KiB
Go
372 lines
9.0 KiB
Go
package infra
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"nhooyr.io/websocket"
|
|
)
|
|
|
|
// --- WSHub ---
|
|
|
|
func TestWSHub(t *testing.T) {
|
|
t.Run("registra y desregistra clientes", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
defer hub.Stop()
|
|
|
|
client := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
|
|
hub.Register <- client
|
|
|
|
// Esperar a que el hub procese
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
hub.Unregister <- client
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
// El canal Send del cliente debe estar cerrado tras unregister
|
|
select {
|
|
case _, ok := <-client.Send:
|
|
if ok {
|
|
t.Error("client.Send should be closed after unregister")
|
|
}
|
|
default:
|
|
t.Error("client.Send should be closed (zero-value receive)")
|
|
}
|
|
})
|
|
|
|
t.Run("Stop cierra todos los clientes", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
|
|
c1 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
|
|
c2 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c2"}
|
|
hub.Register <- c1
|
|
hub.Register <- c2
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
hub.Stop()
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
for _, c := range []*WSClient{c1, c2} {
|
|
select {
|
|
case _, ok := <-c.Send:
|
|
if ok {
|
|
t.Errorf("client %s Send should be closed after Stop", c.ID)
|
|
}
|
|
default:
|
|
t.Errorf("client %s Send should be closed (zero-value receive)", c.ID)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("Stop es idempotente", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
hub.Stop()
|
|
hub.Stop() // no debe panicar
|
|
})
|
|
}
|
|
|
|
// --- WSBroadcast ---
|
|
|
|
func TestWSBroadcast(t *testing.T) {
|
|
t.Run("envia mensaje al canal Broadcast del hub", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
err := WSBroadcast(hub, []byte("hola"))
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
select {
|
|
case msg := <-hub.Broadcast:
|
|
if string(msg) != "hola" {
|
|
t.Errorf("got %q, want hola", string(msg))
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Error("message not in Broadcast channel")
|
|
}
|
|
})
|
|
|
|
t.Run("retorna error si hub es nil", func(t *testing.T) {
|
|
err := WSBroadcast(nil, []byte("x"))
|
|
if err == nil {
|
|
t.Error("expected error for nil hub, got nil")
|
|
}
|
|
})
|
|
|
|
t.Run("el hub entrega el mensaje a todos los clientes registrados", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
defer hub.Stop()
|
|
|
|
c1 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
|
|
c2 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c2"}
|
|
hub.Register <- c1
|
|
hub.Register <- c2
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
WSBroadcast(hub, []byte("ping"))
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
for _, c := range []*WSClient{c1, c2} {
|
|
select {
|
|
case msg := <-c.Send:
|
|
if string(msg) != "ping" {
|
|
t.Errorf("client %s got %q, want ping", c.ID, string(msg))
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Errorf("client %s did not receive broadcast", c.ID)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("cliente lento es desconectado del hub", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
defer hub.Stop()
|
|
|
|
// Cliente con buffer de 1 — el segundo broadcast lo tira
|
|
slow := &WSClient{Hub: hub, Send: make(chan []byte, 1), ID: "slow"}
|
|
hub.Register <- slow
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// Llenar buffer y forzar drop
|
|
WSBroadcast(hub, []byte("1"))
|
|
WSBroadcast(hub, []byte("2"))
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
// Drenar Send: deberia tener el primer mensaje y luego estar cerrado
|
|
got := []string{}
|
|
for msg := range slow.Send {
|
|
got = append(got, string(msg))
|
|
}
|
|
if len(got) > 1 {
|
|
t.Errorf("expected at most 1 message in slow client, got %d: %v", len(got), got)
|
|
}
|
|
})
|
|
}
|
|
|
|
// --- WSSend ---
|
|
|
|
func TestWSSend(t *testing.T) {
|
|
t.Run("envia mensaje al canal Send del cliente", func(t *testing.T) {
|
|
client := &WSClient{Send: make(chan []byte, 4), ID: "c1"}
|
|
err := WSSend(client, []byte("hola"))
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
got := <-client.Send
|
|
if string(got) != "hola" {
|
|
t.Errorf("got %q, want hola", string(got))
|
|
}
|
|
})
|
|
|
|
t.Run("retorna error si client es nil", func(t *testing.T) {
|
|
err := WSSend(nil, []byte("x"))
|
|
if err == nil {
|
|
t.Error("expected error for nil client, got nil")
|
|
}
|
|
})
|
|
|
|
t.Run("retorna error si el canal esta lleno", func(t *testing.T) {
|
|
client := &WSClient{Send: make(chan []byte, 1), ID: "c1"}
|
|
_ = WSSend(client, []byte("1"))
|
|
err := WSSend(client, []byte("2"))
|
|
if err == nil {
|
|
t.Error("expected error when send channel full, got nil")
|
|
}
|
|
})
|
|
}
|
|
|
|
// --- WSUpgrader & WSHandler integration ---
|
|
|
|
func TestWSUpgrader(t *testing.T) {
|
|
t.Run("upgradea conexion valida con `*` en origenes", func(t *testing.T) {
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
conn, err := WSUpgrader(w, r, []string{"*"})
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Close(websocket.StatusNormalClosure, "")
|
|
// Echo
|
|
_, data, _ := conn.Read(r.Context())
|
|
conn.Write(r.Context(), websocket.MessageText, data)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
|
if err != nil {
|
|
t.Fatalf("dial failed: %v", err)
|
|
}
|
|
defer c.Close(websocket.StatusNormalClosure, "")
|
|
|
|
if err := c.Write(ctx, websocket.MessageText, []byte("ping")); err != nil {
|
|
t.Fatalf("write failed: %v", err)
|
|
}
|
|
_, data, err := c.Read(ctx)
|
|
if err != nil {
|
|
t.Fatalf("read failed: %v", err)
|
|
}
|
|
if string(data) != "ping" {
|
|
t.Errorf("got %q, want ping", string(data))
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestWSHandler(t *testing.T) {
|
|
t.Run("upgradea conexion y registra cliente en hub", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
defer hub.Stop()
|
|
|
|
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
|
|
defer ts.Close()
|
|
|
|
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
|
|
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
|
if err != nil {
|
|
t.Fatalf("dial failed: %v", err)
|
|
}
|
|
defer c.Close(websocket.StatusNormalClosure, "")
|
|
|
|
// Esperar a que el hub procese el Register
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
if hub.ClientCount() != 1 {
|
|
t.Errorf("expected 1 client in hub, got %d", hub.ClientCount())
|
|
}
|
|
})
|
|
|
|
t.Run("broadcast del hub llega al cliente conectado", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
defer hub.Stop()
|
|
|
|
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
|
|
defer ts.Close()
|
|
|
|
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
|
|
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
|
if err != nil {
|
|
t.Fatalf("dial failed: %v", err)
|
|
}
|
|
defer c.Close(websocket.StatusNormalClosure, "")
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
WSBroadcast(hub, []byte("hello-all"))
|
|
|
|
_, data, err := c.Read(ctx)
|
|
if err != nil {
|
|
t.Fatalf("read failed: %v", err)
|
|
}
|
|
if string(data) != "hello-all" {
|
|
t.Errorf("got %q, want hello-all", string(data))
|
|
}
|
|
})
|
|
|
|
t.Run("multiples clientes reciben el broadcast", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
defer hub.Stop()
|
|
|
|
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
|
|
defer ts.Close()
|
|
|
|
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
|
|
const N = 3
|
|
conns := make([]*websocket.Conn, N)
|
|
for i := 0; i < N; i++ {
|
|
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
|
if err != nil {
|
|
t.Fatalf("dial %d failed: %v", i, err)
|
|
}
|
|
conns[i] = c
|
|
defer c.Close(websocket.StatusNormalClosure, "")
|
|
}
|
|
|
|
time.Sleep(80 * time.Millisecond)
|
|
|
|
WSBroadcast(hub, []byte("multicast"))
|
|
|
|
var wg sync.WaitGroup
|
|
errs := make(chan error, N)
|
|
for i := 0; i < N; i++ {
|
|
wg.Add(1)
|
|
go func(idx int, conn *websocket.Conn) {
|
|
defer wg.Done()
|
|
_, data, err := conn.Read(ctx)
|
|
if err != nil {
|
|
errs <- err
|
|
return
|
|
}
|
|
if string(data) != "multicast" {
|
|
errs <- &readMismatchError{idx: idx, got: string(data)}
|
|
}
|
|
}(i, conns[i])
|
|
}
|
|
wg.Wait()
|
|
close(errs)
|
|
for e := range errs {
|
|
t.Errorf("client receive error: %v", e)
|
|
}
|
|
})
|
|
|
|
t.Run("desregistra cliente al desconectar", func(t *testing.T) {
|
|
hub := NewWSHub()
|
|
go hub.Run()
|
|
defer hub.Stop()
|
|
|
|
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
|
|
defer ts.Close()
|
|
|
|
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
|
|
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
|
if err != nil {
|
|
t.Fatalf("dial failed: %v", err)
|
|
}
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
if hub.ClientCount() != 1 {
|
|
t.Fatalf("expected 1 client after dial, got %d", hub.ClientCount())
|
|
}
|
|
|
|
c.Close(websocket.StatusNormalClosure, "")
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
if hub.ClientCount() != 0 {
|
|
t.Errorf("expected 0 clients after close, got %d", hub.ClientCount())
|
|
}
|
|
})
|
|
}
|
|
|
|
type readMismatchError struct {
|
|
idx int
|
|
got string
|
|
}
|
|
|
|
func (e *readMismatchError) Error() string {
|
|
return "client " + string(rune('0'+e.idx)) + " got " + e.got
|
|
}
|