merge: issue/0011-websocket-sse — WebSocket + SSE (8 fns, 4 tipos)

# Conflicts:
#	registry.db
This commit is contained in:
2026-04-18 17:33:32 +02:00
28 changed files with 1593 additions and 0 deletions
+11
View File
@@ -0,0 +1,11 @@
package infra
// SSEEvent es un evento Server-Sent Events segun la spec W3C.
// Campos opcionales: si Event esta vacio se envia solo data,
// si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir).
type SSEEvent struct {
Event string `json:"event"`
Data string `json:"data"`
ID string `json:"id"`
Retry int `json:"retry"`
}
+44
View File
@@ -0,0 +1,44 @@
package infra
import "net/http"
// SSEHandler retorna un http.HandlerFunc que setea los headers SSE,
// consume eventos del canal y los envia con flush. Cierra limpiamente
// si el cliente se desconecta (context cancelado) o si el canal se cierra.
//
// Headers seteados:
// Content-Type: text/event-stream
// Cache-Control: no-cache
// Connection: keep-alive
// X-Accel-Buffering: no (deshabilita buffering en nginx)
func SSEHandler(events <-chan SSEEvent) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
flusher.Flush()
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case ev, ok := <-events:
if !ok {
return
}
if err := SSESend(w, ev); err != nil {
return
}
}
}
}
}
+57
View File
@@ -0,0 +1,57 @@
---
name: sse_handler
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func SSEHandler(events <-chan SSEEvent) http.HandlerFunc"
description: "Retorna un http.HandlerFunc que setea los headers SSE (Content-Type, Cache-Control, Connection, X-Accel-Buffering), consume eventos del canal y los envia con flush a cada cliente conectado. Cierra limpiamente si el cliente se desconecta (context cancelado) o si el canal de eventos se cierra."
tags: [sse, server-sent-events, http, handler, server, infra, realtime]
uses_functions: [sse_send_go_infra]
uses_types: [SSEEvent_go_infra]
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [net/http]
params:
- name: events
desc: "canal de SSEEvent del que se consumen eventos para enviar al cliente. Cerrar el canal termina el handler limpiamente."
output: "http.HandlerFunc lista para montarse en una ruta. Sirve un solo cliente por invocacion (cada request abre su propio stream)."
tested: true
tests: ["setea headers SSE correctos", "envia eventos del canal al writer", "termina si el contexto del request se cancela", "termina si el canal de eventos se cierra"]
test_file_path: "functions/infra/sse_test.go"
file_path: "functions/infra/sse_handler.go"
---
## Ejemplo
```go
events := make(chan SSEEvent, 100)
routes := []Route{
{Method: "GET", Path: "/events", Handler: SSEHandler(events)},
}
go func() {
for i := 0; ; i++ {
events <- SSEEvent{
Event: "tick",
ID: fmt.Sprintf("%d", i),
Data: fmt.Sprintf(`{"n":%d}`, i),
}
time.Sleep(time.Second)
}
}()
mux := HTTPRouter(routes)
HTTPServe(":8080", mux, ctx)
```
## Notas
Importante: este handler asume **un canal compartido entre todos los clientes** o un canal por request. Si se quiere broadcast a multiples clientes desde una sola fuente, montar un fan-out por encima (ej: un hub similar a `WSHub` pero para SSE). Para uso tipico (1 cliente = 1 stream) basta con crear el canal dentro del handler externo y pasarlo.
`X-Accel-Buffering: no` deshabilita el buffering en nginx (sin esto los eventos se acumulan hasta que nginx flushea su buffer, anulando el real-time). En Caddy/Traefik no es necesario pero no estorba.
El handler **no monta keepalives** automaticamente. Si la conexion va a estar idle mas de ~30s, lanzar `SSEKeepalive` en una goroutine paralela compartiendo el mismo writer.
+34
View File
@@ -0,0 +1,34 @@
package infra
import (
"net/http"
"time"
)
// SSEKeepalive envia comentarios SSE periodicos (": keepalive\n\n") al writer
// hasta que done se cierre. Bloqueante: lanzar como goroutine.
// Sirve para evitar que proxies/load balancers cierren conexiones inactivas.
// Si w implementa http.Flusher hace flush tras cada keepalive.
func SSEKeepalive(w http.ResponseWriter, interval time.Duration, done <-chan struct{}) {
if interval <= 0 {
interval = 30 * time.Second
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
flusher, _ := w.(http.Flusher)
for {
select {
case <-done:
return
case <-ticker.C:
if _, err := w.Write([]byte(": keepalive\n\n")); err != nil {
return
}
if flusher != nil {
flusher.Flush()
}
}
}
}
+52
View File
@@ -0,0 +1,52 @@
---
name: sse_keepalive
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func SSEKeepalive(w http.ResponseWriter, interval time.Duration, done <-chan struct{})"
description: "Envia comentarios SSE periodicos (`: keepalive\\n\\n`) al writer hasta que done se cierre. Sirve para evitar que proxies o load balancers cierren conexiones inactivas. Bloqueante: lanzar como goroutine. Si w implementa http.Flusher hace flush tras cada keepalive."
tags: [sse, keepalive, server-sent-events, http, server, infra, realtime]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [net/http, time]
params:
- name: w
desc: "http.ResponseWriter destino del stream SSE. Si implementa http.Flusher se hace flush tras cada keepalive."
- name: interval
desc: "intervalo entre keepalives (recomendado 15-30s). Si <= 0 se usa 30s por defecto."
- name: done
desc: "canal de cierre. Cuando se cierre, la goroutine retorna inmediatamente."
output: "ningun retorno; la funcion retorna cuando done se cierra o cuando una escritura falla (cliente desconectado)"
tested: true
tests: ["escribe comentario keepalive periodicamente", "termina cuando done se cierra", "termina cuando la escritura falla"]
test_file_path: "functions/infra/sse_test.go"
file_path: "functions/infra/sse_keepalive.go"
---
## Ejemplo
```go
http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
done := make(chan struct{})
defer close(done)
go SSEKeepalive(w, 15*time.Second, done)
// Bucle principal de envio de eventos...
})
```
## Notas
Comentario SSE: la spec dice que cualquier linea que empiece con `:` es un comentario y el cliente la ignora. Mantienen viva la conexion sin generar eventos espurios. Recomendado intervalo entre 15-30s para infraestructuras tipicas (nginx default = 60s timeout idle).
Si la escritura falla (conexion cerrada) la funcion retorna sin error, asumiendo que el handler principal ya detectara la desconexion via `r.Context().Done()`.
+43
View File
@@ -0,0 +1,43 @@
package infra
import (
"fmt"
"net/http"
"strings"
)
// SSESend escribe un evento SSE formateado al ResponseWriter y hace flush si es posible.
// Sigue la spec W3C: campos opcionales (event, id, retry) solo se incluyen si tienen valor.
// Data con saltos de linea se traduce a multiples lineas "data:" segun la spec.
// Retorna error si la escritura falla.
func SSESend(w http.ResponseWriter, event SSEEvent) error {
var b strings.Builder
if event.Event != "" {
b.WriteString("event: ")
b.WriteString(event.Event)
b.WriteByte('\n')
}
if event.ID != "" {
b.WriteString("id: ")
b.WriteString(event.ID)
b.WriteByte('\n')
}
if event.Retry > 0 {
fmt.Fprintf(&b, "retry: %d\n", event.Retry)
}
// Data: cada salto de linea genera una nueva linea "data:"
for _, line := range strings.Split(event.Data, "\n") {
b.WriteString("data: ")
b.WriteString(line)
b.WriteByte('\n')
}
b.WriteByte('\n') // separador de eventos
if _, err := w.Write([]byte(b.String())); err != nil {
return fmt.Errorf("sse send: %w", err)
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
return nil
}
+49
View File
@@ -0,0 +1,49 @@
---
name: sse_send
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func SSESend(w http.ResponseWriter, event SSEEvent) error"
description: "Escribe un evento Server-Sent Events formateado al ResponseWriter segun la spec W3C y hace flush si el writer implementa http.Flusher. Campos opcionales (event, id, retry) solo se incluyen si tienen valor. Data con saltos de linea se traduce a multiples lineas data: segun la spec."
tags: [sse, server-sent-events, http, server, infra, realtime]
uses_functions: []
uses_types: [SSEEvent_go_infra]
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [fmt, net/http, strings]
params:
- name: w
desc: "http.ResponseWriter destino. Debe tener headers SSE ya seteados (Content-Type: text/event-stream). Si implementa http.Flusher se hace flush tras la escritura."
- name: event
desc: "SSEEvent a serializar. Campos opcionales se omiten si estan vacios. Data multilinea se trocea en varias lineas data: segun la spec."
output: "error si la escritura al ResponseWriter falla, nil si el evento se envio y se hizo flush correctamente"
tested: true
tests: ["serializa data simple sin event ni id", "incluye event cuando esta presente", "incluye id cuando esta presente", "incluye retry cuando es positivo", "data multilinea genera multiples lineas data:", "hace flush si el writer es Flusher"]
test_file_path: "functions/infra/sse_test.go"
file_path: "functions/infra/sse_send.go"
---
## Ejemplo
```go
http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
SSESend(w, SSEEvent{
Event: "metric",
ID: "1",
Data: `{"cpu": 45.2}`,
})
})
```
## Notas
Funcion atomica: solo formatea y flushea un evento. La gestion del ciclo de vida (headers, loop de eventos, deteccion de desconexion) se hace en `sse_handler`.
Sigue estrictamente la spec W3C de Server-Sent Events. El doble salto de linea final separa eventos. Para enviar un comentario keepalive (no un evento) escribir `: keepalive\n\n` directamente — no usar esta funcion.
+267
View File
@@ -0,0 +1,267 @@
package infra
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
)
// nonFlushWriter es un http.ResponseWriter que NO implementa http.Flusher,
// usado para validar el path de error de SSEHandler.
type nonFlushWriter struct {
header http.Header
body []byte
status int
}
func (n *nonFlushWriter) Header() http.Header { return n.header }
func (n *nonFlushWriter) Write(b []byte) (int, error) {
n.body = append(n.body, b...)
return len(b), nil
}
func (n *nonFlushWriter) WriteHeader(s int) { n.status = s }
// flushRecorder es un httptest.ResponseRecorder que tambien implementa http.Flusher
// para que las funciones SSE puedan flushear sin perder los bytes en buffer.
type flushRecorder struct {
*httptest.ResponseRecorder
flushes int32
}
func newFlushRecorder() *flushRecorder {
return &flushRecorder{ResponseRecorder: httptest.NewRecorder()}
}
func (f *flushRecorder) Flush() {
atomic.AddInt32(&f.flushes, 1)
}
func (f *flushRecorder) FlushCount() int {
return int(atomic.LoadInt32(&f.flushes))
}
// --- SSESend ---
func TestSSESend(t *testing.T) {
t.Run("serializa data simple sin event ni id", func(t *testing.T) {
rec := newFlushRecorder()
err := SSESend(rec, SSEEvent{Data: "hola"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
body := rec.Body.String()
want := "data: hola\n\n"
if body != want {
t.Errorf("got body %q, want %q", body, want)
}
})
t.Run("incluye event cuando esta presente", func(t *testing.T) {
rec := newFlushRecorder()
_ = SSESend(rec, SSEEvent{Event: "tick", Data: "1"})
body := rec.Body.String()
if !strings.HasPrefix(body, "event: tick\n") {
t.Errorf("body should start with 'event: tick\\n', got %q", body)
}
})
t.Run("incluye id cuando esta presente", func(t *testing.T) {
rec := newFlushRecorder()
_ = SSESend(rec, SSEEvent{ID: "42", Data: "x"})
if !strings.Contains(rec.Body.String(), "id: 42\n") {
t.Errorf("body should contain 'id: 42\\n', got %q", rec.Body.String())
}
})
t.Run("incluye retry cuando es positivo", func(t *testing.T) {
rec := newFlushRecorder()
_ = SSESend(rec, SSEEvent{Retry: 5000, Data: "x"})
if !strings.Contains(rec.Body.String(), "retry: 5000\n") {
t.Errorf("body should contain 'retry: 5000\\n', got %q", rec.Body.String())
}
})
t.Run("omite retry cuando es 0", func(t *testing.T) {
rec := newFlushRecorder()
_ = SSESend(rec, SSEEvent{Retry: 0, Data: "x"})
if strings.Contains(rec.Body.String(), "retry:") {
t.Errorf("body should NOT contain 'retry:', got %q", rec.Body.String())
}
})
t.Run("data multilinea genera multiples lineas data:", func(t *testing.T) {
rec := newFlushRecorder()
_ = SSESend(rec, SSEEvent{Data: "linea1\nlinea2\nlinea3"})
body := rec.Body.String()
for _, want := range []string{"data: linea1\n", "data: linea2\n", "data: linea3\n"} {
if !strings.Contains(body, want) {
t.Errorf("body should contain %q, got %q", want, body)
}
}
})
t.Run("hace flush si el writer es Flusher", func(t *testing.T) {
rec := newFlushRecorder()
_ = SSESend(rec, SSEEvent{Data: "x"})
if rec.FlushCount() != 1 {
t.Errorf("expected 1 flush, got %d", rec.FlushCount())
}
})
t.Run("termina cada evento con doble salto de linea", func(t *testing.T) {
rec := newFlushRecorder()
_ = SSESend(rec, SSEEvent{Event: "x", Data: "y"})
body := rec.Body.String()
if !strings.HasSuffix(body, "\n\n") {
t.Errorf("body should end with '\\n\\n', got %q", body)
}
})
}
// --- SSEKeepalive ---
func TestSSEKeepalive(t *testing.T) {
t.Run("escribe comentario keepalive periodicamente", func(t *testing.T) {
rec := newFlushRecorder()
done := make(chan struct{})
go SSEKeepalive(rec, 10*time.Millisecond, done)
time.Sleep(50 * time.Millisecond)
close(done)
time.Sleep(20 * time.Millisecond)
body := rec.Body.String()
if !strings.Contains(body, ": keepalive\n\n") {
t.Errorf("body should contain ': keepalive\\n\\n', got %q", body)
}
count := strings.Count(body, ": keepalive\n\n")
if count < 2 {
t.Errorf("expected at least 2 keepalives, got %d", count)
}
})
t.Run("termina cuando done se cierra", func(t *testing.T) {
rec := newFlushRecorder()
done := make(chan struct{})
finished := make(chan struct{})
go func() {
SSEKeepalive(rec, 5*time.Millisecond, done)
close(finished)
}()
close(done)
select {
case <-finished:
// ok
case <-time.After(100 * time.Millisecond):
t.Error("SSEKeepalive did not return after done was closed")
}
})
}
// --- SSEHandler ---
func TestSSEHandler(t *testing.T) {
t.Run("setea headers SSE correctos", func(t *testing.T) {
events := make(chan SSEEvent)
handler := SSEHandler(events)
// Lanzar handler en goroutine y cerrar canal para que retorne
go func() {
time.Sleep(20 * time.Millisecond)
close(events)
}()
ts := httptest.NewServer(handler)
defer ts.Close()
resp, err := http.Get(ts.URL)
if err != nil {
t.Fatalf("get failed: %v", err)
}
defer resp.Body.Close()
if got := resp.Header.Get("Content-Type"); got != "text/event-stream" {
t.Errorf("Content-Type = %q, want text/event-stream", got)
}
if got := resp.Header.Get("Cache-Control"); got != "no-cache" {
t.Errorf("Cache-Control = %q, want no-cache", got)
}
})
t.Run("envia eventos del canal al writer", func(t *testing.T) {
events := make(chan SSEEvent, 2)
events <- SSEEvent{Event: "first", Data: "1"}
events <- SSEEvent{Event: "second", Data: "2"}
close(events)
handler := SSEHandler(events)
ts := httptest.NewServer(handler)
defer ts.Close()
resp, err := http.Get(ts.URL)
if err != nil {
t.Fatalf("get failed: %v", err)
}
defer resp.Body.Close()
buf := make([]byte, 4096)
n, _ := resp.Body.Read(buf)
body := string(buf[:n])
for _, want := range []string{"event: first", "event: second", "data: 1", "data: 2"} {
if !strings.Contains(body, want) {
t.Errorf("body should contain %q, got %q", want, body)
}
}
})
t.Run("termina si el contexto del request se cancela", func(t *testing.T) {
events := make(chan SSEEvent)
handler := SSEHandler(events)
ts := httptest.NewServer(handler)
defer ts.Close()
ctx, cancel := context.WithCancel(context.Background())
req, _ := http.NewRequestWithContext(ctx, "GET", ts.URL, nil)
done := make(chan struct{})
go func() {
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
}
close(done)
}()
time.Sleep(20 * time.Millisecond)
cancel()
select {
case <-done:
// handler debe haber retornado tras la cancelacion del cliente
case <-time.After(2 * time.Second):
t.Error("handler did not return after context cancel")
}
})
t.Run("retorna 500 si el writer no es Flusher", func(t *testing.T) {
// nonFlushWriter no implementa Flusher
events := make(chan SSEEvent)
handler := SSEHandler(events)
rec := &nonFlushWriter{header: http.Header{}}
req := httptest.NewRequest("GET", "/", nil)
handler(rec, req)
if rec.status != http.StatusInternalServerError {
t.Errorf("got status %d, want 500", rec.status)
}
})
}
+14
View File
@@ -0,0 +1,14 @@
package infra
import "fmt"
// WSBroadcast envia bytes al canal Broadcast del hub para entregar el mensaje
// a todos los clientes conectados. La entrega real la hace el loop Run() del hub.
// Bloqueante hasta que el canal Broadcast tenga espacio (capacidad 256 por defecto).
func WSBroadcast(hub *WSHub, msg []byte) error {
if hub == nil {
return fmt.Errorf("ws broadcast: hub is nil")
}
hub.Broadcast <- msg
return nil
}
+50
View File
@@ -0,0 +1,50 @@
---
name: ws_broadcast
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func WSBroadcast(hub *WSHub, msg []byte) error"
description: "Envia bytes al canal Broadcast del hub para que se entreguen a todos los clientes WebSocket conectados. La entrega real la hace el loop Run() del hub. Bloqueante hasta que el canal Broadcast tenga espacio (capacidad 256 por defecto). Retorna error si el hub es nil."
tags: [websocket, broadcast, server, fanout, infra, realtime]
uses_functions: []
uses_types: [WSHub_go_infra]
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [fmt]
params:
- name: hub
desc: "*WSHub donde estan registrados los clientes. Si es nil retorna error."
- name: msg
desc: "bytes a entregar a todos los clientes. Tipicamente JSON serializado de un WSMessage."
output: "error si hub es nil. Nil si el mensaje se encolo en el canal Broadcast (la entrega es asincrona)."
tested: true
tests: ["envia mensaje al canal Broadcast del hub", "retorna error si hub es nil", "el hub entrega el mensaje a todos los clientes registrados"]
test_file_path: "functions/infra/ws_test.go"
file_path: "functions/infra/ws_broadcast.go"
---
## Ejemplo
```go
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
// Notificar a todos los clientes conectados
msg, _ := json.Marshal(WSMessage{
Type: "step_complete",
Payload: []byte(`{"step":"build","status":"ok"}`),
SenderID: "pipeline_runner",
Ts: time.Now().UnixMilli(),
})
WSBroadcast(hub, msg)
```
## Notas
Bloqueante con backpressure controlado: si el canal `Broadcast` se llena (256 mensajes pendientes), la llamada se bloquea hasta que el hub procese alguno. Esto da feedback natural al productor cuando el sistema esta saturado.
La entrega a clientes individuales es no bloqueante (clientes lentos se desconectan automaticamente). Si necesitas semantica at-least-once con retry, montar la logica encima.
+15
View File
@@ -0,0 +1,15 @@
package infra
import (
"nhooyr.io/websocket"
)
// WSClient representa una conexion WebSocket individual.
// Cada cliente tiene su propio canal de envio buffereado y una
// referencia al hub al que pertenece.
type WSClient struct {
Hub *WSHub
Conn *websocket.Conn
Send chan []byte
ID string
}
+92
View File
@@ -0,0 +1,92 @@
package infra
import (
"context"
"crypto/rand"
"encoding/hex"
"net/http"
"time"
"nhooyr.io/websocket"
)
// WSHandler retorna un http.HandlerFunc que upgradea la conexion HTTP a WebSocket,
// crea un WSClient, lo registra en el hub y lanza dos goroutines:
// - readPump: lee mensajes del Conn y los publica al hub.Broadcast
// - writePump: consume del client.Send y escribe al Conn
//
// El cliente se desregistra del hub cuando alguna de las pumps termina (cliente
// desconectado, error de I/O, o canal cerrado). Asigna un ID hex aleatorio si
// no se sobreescribe externamente.
func WSHandler(hub *WSHub, origins []string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
conn, err := WSUpgrader(w, r, origins)
if err != nil {
return
}
client := &WSClient{
Hub: hub,
Conn: conn,
Send: make(chan []byte, 64),
ID: randomID(),
}
hub.Register <- client
// writePump
go wsWritePump(client)
// readPump (bloqueante en el handler para mantener viva la request)
wsReadPump(client)
}
}
// wsReadPump lee mensajes del Conn y los publica al hub.Broadcast.
// Termina si Read retorna error (cliente desconectado o cerrado).
// Al terminar, desregistra el cliente y cierra la conexion.
func wsReadPump(client *WSClient) {
defer func() {
// Unregister no bloqueante: si el hub ya esta cerrado, no esperamos
select {
case client.Hub.Unregister <- client:
case <-client.Hub.done:
}
_ = client.Conn.Close(websocket.StatusNormalClosure, "")
}()
ctx := context.Background()
for {
_, data, err := client.Conn.Read(ctx)
if err != nil {
return
}
// Encolar al hub.Broadcast sin bloquear si esta lleno
select {
case client.Hub.Broadcast <- data:
default:
// Hub saturado: dropear mensaje del cliente para no bloquear el read
}
}
}
// wsWritePump consume del canal Send del cliente y escribe al Conn.
// Termina si el canal se cierra (hub desregistro al cliente) o si Write falla.
func wsWritePump(client *WSClient) {
for msg := range client.Send {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := client.Conn.Write(ctx, websocket.MessageText, msg)
cancel()
if err != nil {
return
}
}
}
// randomID genera un identificador hex aleatorio de 16 caracteres (8 bytes).
// No es criptograficamente perfecto para autenticacion — solo identificacion.
func randomID() string {
b := make([]byte, 8)
if _, err := rand.Read(b); err != nil {
return "anon"
}
return hex.EncodeToString(b)
}
+49
View File
@@ -0,0 +1,49 @@
---
name: ws_handler
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func WSHandler(hub *WSHub, origins []string) http.HandlerFunc"
description: "Retorna un http.HandlerFunc que upgradea la conexion HTTP a WebSocket via WSUpgrader, crea un WSClient con ID hex aleatorio, lo registra en el hub y lanza readPump y writePump como goroutines. La readPump bloquea el handler para mantener la request viva. Al desconectar (error de I/O o canal cerrado) se desregistra el cliente y se cierra la conexion limpiamente."
tags: [websocket, handler, http, server, hub, infra, realtime]
uses_functions: [ws_upgrader_go_infra]
uses_types: [WSHub_go_infra, WSClient_go_infra]
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [context, crypto/rand, encoding/hex, net/http, time, "nhooyr.io/websocket"]
params:
- name: hub
desc: "*WSHub donde se registran los clientes que se conecten via este handler. Debe estar corriendo (hub.Run() lanzado en goroutine)."
- name: origins
desc: "lista de patrones de origen permitidos para el upgrade. Pasa directamente a WSUpgrader. Para dev: `[\"*\"]`. Para prod: lista explicita."
output: "http.HandlerFunc lista para montarse en una ruta GET. Cada conexion entrante crea un cliente nuevo en el hub."
tested: true
tests: ["upgradea conexion y registra cliente en hub", "broadcast del hub llega al cliente conectado", "desregistra cliente al desconectar", "multiples clientes reciben el broadcast"]
test_file_path: "functions/infra/ws_test.go"
file_path: "functions/infra/ws_handler.go"
---
## Ejemplo
```go
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
routes := []Route{
{Method: "GET", Path: "/ws", Handler: WSHandler(hub, []string{"example.com"})},
}
mux := HTTPRouter(routes)
HTTPServe(":8080", mux, ctx)
```
## Notas
El handler asigna un ID hex aleatorio de 16 caracteres a cada cliente (`crypto/rand`). Si se quiere usar IDs propios (UUID, username autenticado), envolver el handler para sobreescribir `client.ID` antes del Register.
readPump publica todos los mensajes recibidos al `hub.Broadcast` — modo chat-like por defecto. Para procesar mensajes con un callback (sin reenviar a todos), copiar el codigo y reemplazar el Broadcast por la logica deseada. Esta separacion es deliberada: el hub es un mecanismo de fan-out, no un router de mensajes.
writePump usa write deadline de 10s — si un Write tarda mas, asume cliente muerto y termina. Esto previene goroutines colgadas si el cliente cierra TCP sin enviar Close frame.
+82
View File
@@ -0,0 +1,82 @@
package infra
import "sync/atomic"
// WSHub gestiona el ciclo de vida de conexiones WebSocket.
// Mantiene un mapa de clientes activos y canales para registro,
// desregistro y broadcast. Se ejecuta como goroutine via su Run().
type WSHub struct {
Clients map[*WSClient]bool
Broadcast chan []byte
Register chan *WSClient
Unregister chan *WSClient
done chan struct{}
count atomic.Int64
}
// ClientCount retorna el numero de clientes registrados de forma thread-safe.
// Util para metricas, dashboards y tests.
func (h *WSHub) ClientCount() int {
return int(h.count.Load())
}
// NewWSHub crea un WSHub vacio con canales inicializados.
// Ejecutar Run() en una goroutine para activar el loop de eventos.
func NewWSHub() *WSHub {
return &WSHub{
Clients: make(map[*WSClient]bool),
Broadcast: make(chan []byte, 256),
Register: make(chan *WSClient),
Unregister: make(chan *WSClient),
done: make(chan struct{}),
}
}
// Run ejecuta el loop principal del hub. Atiende registros, desregistros
// y broadcasts en un select. Bloqueante: lanzar como goroutine.
// Para parar, llamar a Stop().
func (h *WSHub) Run() {
for {
select {
case <-h.done:
// Cerrar todos los Send y vaciar el mapa
for client := range h.Clients {
delete(h.Clients, client)
close(client.Send)
}
h.count.Store(0)
return
case client := <-h.Register:
h.Clients[client] = true
h.count.Store(int64(len(h.Clients)))
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok {
delete(h.Clients, client)
close(client.Send)
h.count.Store(int64(len(h.Clients)))
}
case msg := <-h.Broadcast:
for client := range h.Clients {
select {
case client.Send <- msg:
default:
// Cliente lento: desconectar
delete(h.Clients, client)
close(client.Send)
}
}
h.count.Store(int64(len(h.Clients)))
}
}
}
// Stop cierra el loop de Run() y limpia todos los clientes.
// Idempotente — llamar mas de una vez no panica.
func (h *WSHub) Stop() {
select {
case <-h.done:
// Ya cerrado
default:
close(h.done)
}
}
+42
View File
@@ -0,0 +1,42 @@
---
name: ws_hub
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func NewWSHub() *WSHub"
description: "Crea un WSHub con canales Broadcast (buffereado, capacidad 256), Register y Unregister. El metodo Run() ejecuta el loop principal: select sobre Register/Unregister/Broadcast. Stop() cierra el loop y limpia todos los clientes. Patron hub clasico de Go: un solo writer al mapa de clientes, sin mutex."
tags: [websocket, hub, server, broadcast, infra, realtime]
uses_functions: []
uses_types: [WSHub_go_infra, WSClient_go_infra]
returns: [WSHub_go_infra]
returns_optional: false
error_type: "error_go_core"
imports: []
params: []
output: "*WSHub listo para usar. Llamar a Run() en una goroutine para activar el loop. Llamar a Stop() para terminar limpiamente."
tested: true
tests: ["registra y desregistra clientes", "broadcast envia mensaje a todos los clientes", "cliente lento es desconectado del hub", "Stop cierra todos los clientes"]
test_file_path: "functions/infra/ws_test.go"
file_path: "functions/infra/ws_hub.go"
---
## Ejemplo
```go
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
// Cualquier goroutine puede broadcastear
WSBroadcast(hub, []byte("evento"))
```
## Notas
El loop `Run()` es bloqueante y esta diseñado para correr como goroutine durante toda la vida del proceso. La unica forma de pararlo es llamar a `Stop()`.
Un cliente lento (cuyo canal `Send` esta lleno) es desconectado automaticamente del hub durante el broadcast — esta es la garantia anti-backpressure: ningun cliente puede bloquear el broadcast a los demas.
El hub no autentica ni autoriza — es solo un fan-out. La auth se hace en el handler antes de registrar el cliente.
+10
View File
@@ -0,0 +1,10 @@
package infra
// WSMessage es un mensaje tipado que viaja por WebSocket.
// El campo Type permite al receptor decidir como procesar el payload.
type WSMessage struct {
Type string `json:"type"`
Payload []byte `json:"payload"`
SenderID string `json:"sender_id"`
Ts int64 `json:"ts"`
}
+18
View File
@@ -0,0 +1,18 @@
package infra
import "fmt"
// WSSend envia bytes al canal Send de un cliente especifico de forma no bloqueante.
// Si el canal esta lleno o el cliente desconectado, retorna error sin bloquear al emisor.
// Para broadcast a todos los clientes usar WSBroadcast.
func WSSend(client *WSClient, msg []byte) error {
if client == nil {
return fmt.Errorf("ws send: client is nil")
}
select {
case client.Send <- msg:
return nil
default:
return fmt.Errorf("ws send: client %s send channel full or closed", client.ID)
}
}
+44
View File
@@ -0,0 +1,44 @@
---
name: ws_send
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func WSSend(client *WSClient, msg []byte) error"
description: "Envia bytes al canal Send de un cliente WebSocket especifico de forma no bloqueante. Si el canal esta lleno o el cliente desconectado, retorna error sin bloquear al emisor. Para broadcast a todos los clientes del hub usar WSBroadcast en su lugar."
tags: [websocket, send, server, infra, realtime]
uses_functions: []
uses_types: [WSClient_go_infra]
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [fmt]
params:
- name: client
desc: "*WSClient destinatario. Si es nil retorna error."
- name: msg
desc: "bytes a enviar. Tipicamente JSON serializado de un WSMessage. Se entregan tal cual al websocket.Conn."
output: "error si el canal Send esta lleno (cliente lento) o cerrado (cliente desconectado). Nil si el mensaje se encolo correctamente."
tested: true
tests: ["envia mensaje al canal Send del cliente", "retorna error si client es nil", "retorna error si el canal esta lleno"]
test_file_path: "functions/infra/ws_test.go"
file_path: "functions/infra/ws_send.go"
---
## Ejemplo
```go
// Enviar mensaje a un cliente especifico (unicast)
target := findClientByID(hub, "user-42")
err := WSSend(target, []byte(`{"type":"notification","text":"hola"}`))
if err != nil {
log.Printf("send failed: %v", err)
}
```
## Notas
Funcion no bloqueante: si el cliente no consume su canal `Send`, este se llena y la funcion retorna error inmediatamente en vez de bloquear al emisor. Esto previene que un cliente lento bloquee a otros productores.
El mensaje se encola en `client.Send` — la goroutine writePump del cliente lo escribira al `websocket.Conn`. No hay garantia de orden estricto entre llamadas concurrentes a `WSSend` sobre el mismo cliente.
+371
View File
@@ -0,0 +1,371 @@
package infra
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"nhooyr.io/websocket"
)
// --- WSHub ---
func TestWSHub(t *testing.T) {
t.Run("registra y desregistra clientes", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
client := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
hub.Register <- client
// Esperar a que el hub procese
time.Sleep(20 * time.Millisecond)
hub.Unregister <- client
time.Sleep(20 * time.Millisecond)
// El canal Send del cliente debe estar cerrado tras unregister
select {
case _, ok := <-client.Send:
if ok {
t.Error("client.Send should be closed after unregister")
}
default:
t.Error("client.Send should be closed (zero-value receive)")
}
})
t.Run("Stop cierra todos los clientes", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
c1 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
c2 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c2"}
hub.Register <- c1
hub.Register <- c2
time.Sleep(20 * time.Millisecond)
hub.Stop()
time.Sleep(20 * time.Millisecond)
for _, c := range []*WSClient{c1, c2} {
select {
case _, ok := <-c.Send:
if ok {
t.Errorf("client %s Send should be closed after Stop", c.ID)
}
default:
t.Errorf("client %s Send should be closed (zero-value receive)", c.ID)
}
}
})
t.Run("Stop es idempotente", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
hub.Stop()
hub.Stop() // no debe panicar
})
}
// --- WSBroadcast ---
func TestWSBroadcast(t *testing.T) {
t.Run("envia mensaje al canal Broadcast del hub", func(t *testing.T) {
hub := NewWSHub()
err := WSBroadcast(hub, []byte("hola"))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
select {
case msg := <-hub.Broadcast:
if string(msg) != "hola" {
t.Errorf("got %q, want hola", string(msg))
}
case <-time.After(time.Second):
t.Error("message not in Broadcast channel")
}
})
t.Run("retorna error si hub es nil", func(t *testing.T) {
err := WSBroadcast(nil, []byte("x"))
if err == nil {
t.Error("expected error for nil hub, got nil")
}
})
t.Run("el hub entrega el mensaje a todos los clientes registrados", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
c1 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c1"}
c2 := &WSClient{Hub: hub, Send: make(chan []byte, 4), ID: "c2"}
hub.Register <- c1
hub.Register <- c2
time.Sleep(20 * time.Millisecond)
WSBroadcast(hub, []byte("ping"))
time.Sleep(20 * time.Millisecond)
for _, c := range []*WSClient{c1, c2} {
select {
case msg := <-c.Send:
if string(msg) != "ping" {
t.Errorf("client %s got %q, want ping", c.ID, string(msg))
}
case <-time.After(time.Second):
t.Errorf("client %s did not receive broadcast", c.ID)
}
}
})
t.Run("cliente lento es desconectado del hub", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
// Cliente con buffer de 1 — el segundo broadcast lo tira
slow := &WSClient{Hub: hub, Send: make(chan []byte, 1), ID: "slow"}
hub.Register <- slow
time.Sleep(10 * time.Millisecond)
// Llenar buffer y forzar drop
WSBroadcast(hub, []byte("1"))
WSBroadcast(hub, []byte("2"))
time.Sleep(20 * time.Millisecond)
// Drenar Send: deberia tener el primer mensaje y luego estar cerrado
got := []string{}
for msg := range slow.Send {
got = append(got, string(msg))
}
if len(got) > 1 {
t.Errorf("expected at most 1 message in slow client, got %d: %v", len(got), got)
}
})
}
// --- WSSend ---
func TestWSSend(t *testing.T) {
t.Run("envia mensaje al canal Send del cliente", func(t *testing.T) {
client := &WSClient{Send: make(chan []byte, 4), ID: "c1"}
err := WSSend(client, []byte("hola"))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got := <-client.Send
if string(got) != "hola" {
t.Errorf("got %q, want hola", string(got))
}
})
t.Run("retorna error si client es nil", func(t *testing.T) {
err := WSSend(nil, []byte("x"))
if err == nil {
t.Error("expected error for nil client, got nil")
}
})
t.Run("retorna error si el canal esta lleno", func(t *testing.T) {
client := &WSClient{Send: make(chan []byte, 1), ID: "c1"}
_ = WSSend(client, []byte("1"))
err := WSSend(client, []byte("2"))
if err == nil {
t.Error("expected error when send channel full, got nil")
}
})
}
// --- WSUpgrader & WSHandler integration ---
func TestWSUpgrader(t *testing.T) {
t.Run("upgradea conexion valida con `*` en origenes", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := WSUpgrader(w, r, []string{"*"})
if err != nil {
return
}
defer conn.Close(websocket.StatusNormalClosure, "")
// Echo
_, data, _ := conn.Read(r.Context())
conn.Write(r.Context(), websocket.MessageText, data)
}))
defer ts.Close()
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
c, _, err := websocket.Dial(ctx, wsURL, nil)
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer c.Close(websocket.StatusNormalClosure, "")
if err := c.Write(ctx, websocket.MessageText, []byte("ping")); err != nil {
t.Fatalf("write failed: %v", err)
}
_, data, err := c.Read(ctx)
if err != nil {
t.Fatalf("read failed: %v", err)
}
if string(data) != "ping" {
t.Errorf("got %q, want ping", string(data))
}
})
}
func TestWSHandler(t *testing.T) {
t.Run("upgradea conexion y registra cliente en hub", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
defer ts.Close()
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
c, _, err := websocket.Dial(ctx, wsURL, nil)
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer c.Close(websocket.StatusNormalClosure, "")
// Esperar a que el hub procese el Register
time.Sleep(50 * time.Millisecond)
if hub.ClientCount() != 1 {
t.Errorf("expected 1 client in hub, got %d", hub.ClientCount())
}
})
t.Run("broadcast del hub llega al cliente conectado", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
defer ts.Close()
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
c, _, err := websocket.Dial(ctx, wsURL, nil)
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer c.Close(websocket.StatusNormalClosure, "")
time.Sleep(50 * time.Millisecond)
WSBroadcast(hub, []byte("hello-all"))
_, data, err := c.Read(ctx)
if err != nil {
t.Fatalf("read failed: %v", err)
}
if string(data) != "hello-all" {
t.Errorf("got %q, want hello-all", string(data))
}
})
t.Run("multiples clientes reciben el broadcast", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
defer ts.Close()
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
const N = 3
conns := make([]*websocket.Conn, N)
for i := 0; i < N; i++ {
c, _, err := websocket.Dial(ctx, wsURL, nil)
if err != nil {
t.Fatalf("dial %d failed: %v", i, err)
}
conns[i] = c
defer c.Close(websocket.StatusNormalClosure, "")
}
time.Sleep(80 * time.Millisecond)
WSBroadcast(hub, []byte("multicast"))
var wg sync.WaitGroup
errs := make(chan error, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func(idx int, conn *websocket.Conn) {
defer wg.Done()
_, data, err := conn.Read(ctx)
if err != nil {
errs <- err
return
}
if string(data) != "multicast" {
errs <- &readMismatchError{idx: idx, got: string(data)}
}
}(i, conns[i])
}
wg.Wait()
close(errs)
for e := range errs {
t.Errorf("client receive error: %v", e)
}
})
t.Run("desregistra cliente al desconectar", func(t *testing.T) {
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
ts := httptest.NewServer(WSHandler(hub, []string{"*"}))
defer ts.Close()
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http")
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
c, _, err := websocket.Dial(ctx, wsURL, nil)
if err != nil {
t.Fatalf("dial failed: %v", err)
}
time.Sleep(50 * time.Millisecond)
if hub.ClientCount() != 1 {
t.Fatalf("expected 1 client after dial, got %d", hub.ClientCount())
}
c.Close(websocket.StatusNormalClosure, "")
time.Sleep(100 * time.Millisecond)
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients after close, got %d", hub.ClientCount())
}
})
}
type readMismatchError struct {
idx int
got string
}
func (e *readMismatchError) Error() string {
return "client " + string(rune('0'+e.idx)) + " got " + e.got
}
+36
View File
@@ -0,0 +1,36 @@
package infra
import (
"fmt"
"net/http"
"nhooyr.io/websocket"
)
// WSUpgrader hace el upgrade de una conexion HTTP a WebSocket usando nhooyr.io/websocket.
// Si origins contiene "*" se acepta cualquier origen (InsecureSkipVerify=true).
// En caso contrario, OriginPatterns valida el header Origin del cliente con filepath.Match.
// Retorna el *websocket.Conn listo para Read/Write o un error si el handshake falla.
func WSUpgrader(w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error) {
opts := &websocket.AcceptOptions{}
allowAny := false
for _, o := range origins {
if o == "*" {
allowAny = true
break
}
}
if allowAny {
opts.InsecureSkipVerify = true
} else {
opts.OriginPatterns = origins
}
conn, err := websocket.Accept(w, r, opts)
if err != nil {
return nil, fmt.Errorf("ws upgrade: %w", err)
}
return conn, nil
}
+55
View File
@@ -0,0 +1,55 @@
---
name: ws_upgrader
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func WSUpgrader(w http.ResponseWriter, r *http.Request, origins []string) (*websocket.Conn, error)"
description: "Hace el upgrade de una conexion HTTP a WebSocket usando nhooyr.io/websocket. Si origins contiene `*` se acepta cualquier origen (InsecureSkipVerify=true), en caso contrario OriginPatterns valida el header Origin del cliente con filepath.Match. Retorna el *websocket.Conn listo para Read/Write o un error si el handshake falla."
tags: [websocket, upgrade, http, server, infra, realtime]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [fmt, net/http, "nhooyr.io/websocket"]
params:
- name: w
desc: "http.ResponseWriter del request entrante. Debe soportar hijack (no envuelto en middlewares que rompan Hijacker)."
- name: r
desc: "*http.Request del cliente que pide el upgrade. Debe contener los headers Connection: Upgrade y Upgrade: websocket."
- name: origins
desc: "lista de patrones de origen permitidos (filepath.Match). Si contiene `*` se aceptan todos los origenes (modo inseguro, solo dev). Para produccion: lista explicita de hosts (ej: [`example.com`, `app.example.com`])."
output: "*websocket.Conn listo para Read/Write y error. Si el handshake falla (origen no autorizado, headers invalidos), el writer ya tiene la respuesta de error escrita."
tested: true
tests: ["upgradea conexion valida con origen permitido", "rechaza origen no permitido", "acepta cualquier origen con `*`"]
test_file_path: "functions/infra/ws_test.go"
file_path: "functions/infra/ws_upgrader.go"
---
## Ejemplo
```go
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := WSUpgrader(w, r, []string{"example.com", "app.example.com"})
if err != nil {
return // error ya escrito al writer
}
defer conn.Close(websocket.StatusNormalClosure, "")
for {
_, data, err := conn.Read(r.Context())
if err != nil {
return
}
conn.Write(r.Context(), websocket.MessageText, data) // echo
}
})
```
## Notas
`nhooyr.io/websocket` exige un patron de origen explicito o `InsecureSkipVerify=true` — no admite `*` como pattern. Esta funcion traduce `["*"]` a InsecureSkipVerify para mantener una API uniforme con CORS.
Para produccion: nunca usar `["*"]`, listar hosts explicitos. La validacion protege contra cross-origin WebSocket hijacking (un sitio malicioso abriendo WS al servidor desde el browser de la victima).
+1
View File
@@ -64,4 +64,5 @@ require (
golang.org/x/text v0.29.0 // indirect
golang.org/x/tools v0.36.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
nhooyr.io/websocket v1.8.17 // indirect
)
+2
View File
@@ -212,3 +212,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y=
nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
+44
View File
@@ -0,0 +1,44 @@
---
name: SSEEvent
lang: go
domain: infra
version: "1.0.0"
algebraic: product
definition: |
type SSEEvent struct {
Event string `json:"event"`
Data string `json:"data"`
ID string `json:"id"`
Retry int `json:"retry"`
}
description: "Evento Server-Sent Events segun la spec W3C. Campos opcionales: si Event esta vacio se envia solo data, si ID esta vacio no se incluye campo id, Retry en ms (0 = omitir y dejar el default del browser ~3000ms)."
tags: [sse, event, server-sent-events, infra, realtime]
uses_types: []
file_path: "functions/infra/sse_event.go"
---
## Ejemplo
```go
ev := SSEEvent{
Event: "metrics_update",
ID: "42",
Data: `{"cpu": 23.4, "mem": 87.1}`,
}
SSESend(w, ev)
```
Wire format generado:
```
event: metrics_update
id: 42
data: {"cpu": 23.4, "mem": 87.1}
```
## Notas
Tipo producto con campos opcionales. `Data` puede contener saltos de linea — el formateador los traduce a multiples lineas `data:` segun la spec. `Retry` solo se envia si es > 0; cuando se envia, indica al cliente cuantos ms esperar antes de reconectar.
Para enviar solo un comentario keepalive (no un evento), no se usa este tipo: se escribe `: keepalive\n\n` directamente al writer.
+38
View File
@@ -0,0 +1,38 @@
---
name: WSClient
lang: go
domain: infra
version: "1.0.0"
algebraic: product
definition: |
type WSClient struct {
Hub *WSHub
Conn *websocket.Conn
Send chan []byte
ID string
}
description: "Conexion WebSocket individual gestionada por un WSHub. Cada cliente tiene su propio canal Send buffereado para entregar mensajes en orden y un ID para identificarlo en broadcasts y handlers."
tags: [websocket, client, connection, server, infra, realtime]
uses_types: [WSHub_go_infra]
file_path: "functions/infra/ws_client.go"
---
## Ejemplo
```go
client := &WSClient{
Hub: hub,
Conn: wsConn,
Send: make(chan []byte, 64),
ID: "user-42",
}
hub.Register <- client
```
## Notas
Tipo producto. El canal `Send` debe ser buffereado (tipico 64-256) para evitar que un cliente lento bloquee el broadcast del hub. `Conn` usa `nhooyr.io/websocket` que soporta `context.Context` nativamente. `ID` es texto libre — la app que monta el handler decide su semantica (UUID, username, session id, etc.).
Cada cliente tiene normalmente dos goroutines internas:
- readPump: lee del Conn y publica al hub.Broadcast (o procesa con callback)
- writePump: consume del Send y escribe al Conn
+36
View File
@@ -0,0 +1,36 @@
---
name: WSHub
lang: go
domain: infra
version: "1.0.0"
algebraic: product
definition: |
type WSHub struct {
Clients map[*WSClient]bool
Broadcast chan []byte
Register chan *WSClient
Unregister chan *WSClient
}
description: "Hub que gestiona el ciclo de vida de conexiones WebSocket. Mantiene un mapa de clientes activos y canales para registro, desregistro y broadcast. Se ejecuta como goroutine via Run() y se compone con ws_handler para servir conexiones."
tags: [websocket, hub, server, broadcast, infra, realtime]
uses_types: [WSClient_go_infra]
file_path: "functions/infra/ws_hub.go"
---
## Ejemplo
```go
hub := NewWSHub()
go hub.Run()
defer hub.Stop()
routes := []Route{
{Method: "GET", Path: "/ws", Handler: WSHandler(hub, []string{"*"})},
}
```
## Notas
Tipo producto. El campo `done` interno (no exportado) controla el cierre limpio. `Broadcast` es buffereado (256) para no bloquear emisores. Cada cliente lento se desconecta automaticamente si su canal `Send` se llena durante un broadcast — esta es la garantia anti-backpressure: un cliente que no consume no afecta a los demas.
El loop `Run()` es de un solo thread escribiendo al mapa, asi que no hace falta mutex. Para parar limpiamente: `hub.Stop()` cierra `done`, libera todos los `client.Send` y termina el loop.
+37
View File
@@ -0,0 +1,37 @@
---
name: WSMessage
lang: go
domain: infra
version: "1.0.0"
algebraic: product
definition: |
type WSMessage struct {
Type string `json:"type"`
Payload []byte `json:"payload"`
SenderID string `json:"sender_id"`
Ts int64 `json:"ts"`
}
description: "Mensaje tipado que viaja por WebSocket entre cliente y servidor. El campo Type permite al receptor decidir como procesar el payload. Incluye SenderID y timestamp para trazabilidad."
tags: [websocket, message, protocol, infra, realtime]
uses_types: []
file_path: "functions/infra/ws_message.go"
---
## Ejemplo
```go
msg := WSMessage{
Type: "chat",
Payload: []byte(`{"text":"hola"}`),
SenderID: "user-1",
Ts: time.Now().UnixMilli(),
}
data, _ := json.Marshal(msg)
WSBroadcast(hub, data)
```
## Notas
Tipo producto. Las tags `json:` permiten serializar directamente a JSON para enviar por el wire. `Payload` es `[]byte` para permitir tanto JSON anidado como datos binarios codificados en base64. `Ts` en milisegundos epoch para compatibilidad con `Date.now()` en el browser.
No es obligatorio usar este tipo — apps que necesiten un protocolo distinto pueden enviar bytes arbitrarios via `WSBroadcast` o `WSSend`. Es solo un convenio recomendado para protocolos chat-like.