diff --git a/functions/infra/sse_handler.go b/functions/infra/sse_handler.go new file mode 100644 index 00000000..fc7088dc --- /dev/null +++ b/functions/infra/sse_handler.go @@ -0,0 +1,44 @@ +package infra + +import "net/http" + +// SSEHandler retorna un http.HandlerFunc que setea los headers SSE, +// consume eventos del canal y los envia con flush. Cierra limpiamente +// si el cliente se desconecta (context cancelado) o si el canal se cierra. +// +// Headers seteados: +// Content-Type: text/event-stream +// Cache-Control: no-cache +// Connection: keep-alive +// X-Accel-Buffering: no (deshabilita buffering en nginx) +func SSEHandler(events <-chan SSEEvent) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-events: + if !ok { + return + } + if err := SSESend(w, ev); err != nil { + return + } + } + } + } +} diff --git a/functions/infra/sse_handler.md b/functions/infra/sse_handler.md new file mode 100644 index 00000000..7f1fcf8d --- /dev/null +++ b/functions/infra/sse_handler.md @@ -0,0 +1,57 @@ +--- +name: sse_handler +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func SSEHandler(events <-chan SSEEvent) http.HandlerFunc" +description: "Retorna un http.HandlerFunc que setea los headers SSE (Content-Type, Cache-Control, Connection, X-Accel-Buffering), consume eventos del canal y los envia con flush a cada cliente conectado. Cierra limpiamente si el cliente se desconecta (context cancelado) o si el canal de eventos se cierra." +tags: [sse, server-sent-events, http, handler, server, infra, realtime] +uses_functions: [sse_send_go_infra] +uses_types: [SSEEvent_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [net/http] +params: + - name: events + desc: "canal de SSEEvent del que se consumen eventos para enviar al cliente. Cerrar el canal termina el handler limpiamente." +output: "http.HandlerFunc lista para montarse en una ruta. Sirve un solo cliente por invocacion (cada request abre su propio stream)." +tested: true +tests: ["setea headers SSE correctos", "envia eventos del canal al writer", "termina si el contexto del request se cancela", "termina si el canal de eventos se cierra"] +test_file_path: "functions/infra/sse_test.go" +file_path: "functions/infra/sse_handler.go" +--- + +## Ejemplo + +```go +events := make(chan SSEEvent, 100) + +routes := []Route{ + {Method: "GET", Path: "/events", Handler: SSEHandler(events)}, +} + +go func() { + for i := 0; ; i++ { + events <- SSEEvent{ + Event: "tick", + ID: fmt.Sprintf("%d", i), + Data: fmt.Sprintf(`{"n":%d}`, i), + } + time.Sleep(time.Second) + } +}() + +mux := HTTPRouter(routes) +HTTPServe(":8080", mux, ctx) +``` + +## Notas + +Importante: este handler asume **un canal compartido entre todos los clientes** o un canal por request. Si se quiere broadcast a multiples clientes desde una sola fuente, montar un fan-out por encima (ej: un hub similar a `WSHub` pero para SSE). Para uso tipico (1 cliente = 1 stream) basta con crear el canal dentro del handler externo y pasarlo. + +`X-Accel-Buffering: no` deshabilita el buffering en nginx (sin esto los eventos se acumulan hasta que nginx flushea su buffer, anulando el real-time). En Caddy/Traefik no es necesario pero no estorba. + +El handler **no monta keepalives** automaticamente. Si la conexion va a estar idle mas de ~30s, lanzar `SSEKeepalive` en una goroutine paralela compartiendo el mismo writer. diff --git a/functions/infra/sse_keepalive.go b/functions/infra/sse_keepalive.go new file mode 100644 index 00000000..a9860a2a --- /dev/null +++ b/functions/infra/sse_keepalive.go @@ -0,0 +1,34 @@ +package infra + +import ( + "net/http" + "time" +) + +// SSEKeepalive envia comentarios SSE periodicos (": keepalive\n\n") al writer +// hasta que done se cierre. Bloqueante: lanzar como goroutine. +// Sirve para evitar que proxies/load balancers cierren conexiones inactivas. +// Si w implementa http.Flusher hace flush tras cada keepalive. +func SSEKeepalive(w http.ResponseWriter, interval time.Duration, done <-chan struct{}) { + if interval <= 0 { + interval = 30 * time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + + flusher, _ := w.(http.Flusher) + + for { + select { + case <-done: + return + case <-ticker.C: + if _, err := w.Write([]byte(": keepalive\n\n")); err != nil { + return + } + if flusher != nil { + flusher.Flush() + } + } + } +} diff --git a/functions/infra/sse_keepalive.md b/functions/infra/sse_keepalive.md new file mode 100644 index 00000000..579d7176 --- /dev/null +++ b/functions/infra/sse_keepalive.md @@ -0,0 +1,52 @@ +--- +name: sse_keepalive +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func SSEKeepalive(w http.ResponseWriter, interval time.Duration, done <-chan struct{})" +description: "Envia comentarios SSE periodicos (`: keepalive\\n\\n`) al writer hasta que done se cierre. Sirve para evitar que proxies o load balancers cierren conexiones inactivas. Bloqueante: lanzar como goroutine. Si w implementa http.Flusher hace flush tras cada keepalive." +tags: [sse, keepalive, server-sent-events, http, server, infra, realtime] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [net/http, time] +params: + - name: w + desc: "http.ResponseWriter destino del stream SSE. Si implementa http.Flusher se hace flush tras cada keepalive." + - name: interval + desc: "intervalo entre keepalives (recomendado 15-30s). Si <= 0 se usa 30s por defecto." + - name: done + desc: "canal de cierre. Cuando se cierre, la goroutine retorna inmediatamente." +output: "ningun retorno; la funcion retorna cuando done se cierra o cuando una escritura falla (cliente desconectado)" +tested: true +tests: ["escribe comentario keepalive periodicamente", "termina cuando done se cierra", "termina cuando la escritura falla"] +test_file_path: "functions/infra/sse_test.go" +file_path: "functions/infra/sse_keepalive.go" +--- + +## Ejemplo + +```go +http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + done := make(chan struct{}) + defer close(done) + + go SSEKeepalive(w, 15*time.Second, done) + + // Bucle principal de envio de eventos... +}) +``` + +## Notas + +Comentario SSE: la spec dice que cualquier linea que empiece con `:` es un comentario y el cliente la ignora. Mantienen viva la conexion sin generar eventos espurios. Recomendado intervalo entre 15-30s para infraestructuras tipicas (nginx default = 60s timeout idle). + +Si la escritura falla (conexion cerrada) la funcion retorna sin error, asumiendo que el handler principal ya detectara la desconexion via `r.Context().Done()`. diff --git a/functions/infra/sse_send.go b/functions/infra/sse_send.go new file mode 100644 index 00000000..28a2592f --- /dev/null +++ b/functions/infra/sse_send.go @@ -0,0 +1,43 @@ +package infra + +import ( + "fmt" + "net/http" + "strings" +) + +// SSESend escribe un evento SSE formateado al ResponseWriter y hace flush si es posible. +// Sigue la spec W3C: campos opcionales (event, id, retry) solo se incluyen si tienen valor. +// Data con saltos de linea se traduce a multiples lineas "data:" segun la spec. +// Retorna error si la escritura falla. +func SSESend(w http.ResponseWriter, event SSEEvent) error { + var b strings.Builder + if event.Event != "" { + b.WriteString("event: ") + b.WriteString(event.Event) + b.WriteByte('\n') + } + if event.ID != "" { + b.WriteString("id: ") + b.WriteString(event.ID) + b.WriteByte('\n') + } + if event.Retry > 0 { + fmt.Fprintf(&b, "retry: %d\n", event.Retry) + } + // Data: cada salto de linea genera una nueva linea "data:" + for _, line := range strings.Split(event.Data, "\n") { + b.WriteString("data: ") + b.WriteString(line) + b.WriteByte('\n') + } + b.WriteByte('\n') // separador de eventos + + if _, err := w.Write([]byte(b.String())); err != nil { + return fmt.Errorf("sse send: %w", err) + } + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + return nil +} diff --git a/functions/infra/sse_send.md b/functions/infra/sse_send.md new file mode 100644 index 00000000..5187ffa2 --- /dev/null +++ b/functions/infra/sse_send.md @@ -0,0 +1,49 @@ +--- +name: sse_send +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func SSESend(w http.ResponseWriter, event SSEEvent) error" +description: "Escribe un evento Server-Sent Events formateado al ResponseWriter segun la spec W3C y hace flush si el writer implementa http.Flusher. Campos opcionales (event, id, retry) solo se incluyen si tienen valor. Data con saltos de linea se traduce a multiples lineas data: segun la spec." +tags: [sse, server-sent-events, http, server, infra, realtime] +uses_functions: [] +uses_types: [SSEEvent_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, net/http, strings] +params: + - name: w + desc: "http.ResponseWriter destino. Debe tener headers SSE ya seteados (Content-Type: text/event-stream). Si implementa http.Flusher se hace flush tras la escritura." + - name: event + desc: "SSEEvent a serializar. Campos opcionales se omiten si estan vacios. Data multilinea se trocea en varias lineas data: segun la spec." +output: "error si la escritura al ResponseWriter falla, nil si el evento se envio y se hizo flush correctamente" +tested: true +tests: ["serializa data simple sin event ni id", "incluye event cuando esta presente", "incluye id cuando esta presente", "incluye retry cuando es positivo", "data multilinea genera multiples lineas data:", "hace flush si el writer es Flusher"] +test_file_path: "functions/infra/sse_test.go" +file_path: "functions/infra/sse_send.go" +--- + +## Ejemplo + +```go +http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + SSESend(w, SSEEvent{ + Event: "metric", + ID: "1", + Data: `{"cpu": 45.2}`, + }) +}) +``` + +## Notas + +Funcion atomica: solo formatea y flushea un evento. La gestion del ciclo de vida (headers, loop de eventos, deteccion de desconexion) se hace en `sse_handler`. + +Sigue estrictamente la spec W3C de Server-Sent Events. El doble salto de linea final separa eventos. Para enviar un comentario keepalive (no un evento) escribir `: keepalive\n\n` directamente — no usar esta funcion. diff --git a/functions/infra/sse_test.go b/functions/infra/sse_test.go new file mode 100644 index 00000000..62740a61 --- /dev/null +++ b/functions/infra/sse_test.go @@ -0,0 +1,267 @@ +package infra + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" +) + +// nonFlushWriter es un http.ResponseWriter que NO implementa http.Flusher, +// usado para validar el path de error de SSEHandler. +type nonFlushWriter struct { + header http.Header + body []byte + status int +} + +func (n *nonFlushWriter) Header() http.Header { return n.header } +func (n *nonFlushWriter) Write(b []byte) (int, error) { + n.body = append(n.body, b...) + return len(b), nil +} +func (n *nonFlushWriter) WriteHeader(s int) { n.status = s } + +// flushRecorder es un httptest.ResponseRecorder que tambien implementa http.Flusher +// para que las funciones SSE puedan flushear sin perder los bytes en buffer. +type flushRecorder struct { + *httptest.ResponseRecorder + flushes int32 +} + +func newFlushRecorder() *flushRecorder { + return &flushRecorder{ResponseRecorder: httptest.NewRecorder()} +} + +func (f *flushRecorder) Flush() { + atomic.AddInt32(&f.flushes, 1) +} + +func (f *flushRecorder) FlushCount() int { + return int(atomic.LoadInt32(&f.flushes)) +} + +// --- SSESend --- + +func TestSSESend(t *testing.T) { + t.Run("serializa data simple sin event ni id", func(t *testing.T) { + rec := newFlushRecorder() + err := SSESend(rec, SSEEvent{Data: "hola"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + body := rec.Body.String() + want := "data: hola\n\n" + if body != want { + t.Errorf("got body %q, want %q", body, want) + } + }) + + t.Run("incluye event cuando esta presente", func(t *testing.T) { + rec := newFlushRecorder() + _ = SSESend(rec, SSEEvent{Event: "tick", Data: "1"}) + body := rec.Body.String() + if !strings.HasPrefix(body, "event: tick\n") { + t.Errorf("body should start with 'event: tick\\n', got %q", body) + } + }) + + t.Run("incluye id cuando esta presente", func(t *testing.T) { + rec := newFlushRecorder() + _ = SSESend(rec, SSEEvent{ID: "42", Data: "x"}) + if !strings.Contains(rec.Body.String(), "id: 42\n") { + t.Errorf("body should contain 'id: 42\\n', got %q", rec.Body.String()) + } + }) + + t.Run("incluye retry cuando es positivo", func(t *testing.T) { + rec := newFlushRecorder() + _ = SSESend(rec, SSEEvent{Retry: 5000, Data: "x"}) + if !strings.Contains(rec.Body.String(), "retry: 5000\n") { + t.Errorf("body should contain 'retry: 5000\\n', got %q", rec.Body.String()) + } + }) + + t.Run("omite retry cuando es 0", func(t *testing.T) { + rec := newFlushRecorder() + _ = SSESend(rec, SSEEvent{Retry: 0, Data: "x"}) + if strings.Contains(rec.Body.String(), "retry:") { + t.Errorf("body should NOT contain 'retry:', got %q", rec.Body.String()) + } + }) + + t.Run("data multilinea genera multiples lineas data:", func(t *testing.T) { + rec := newFlushRecorder() + _ = SSESend(rec, SSEEvent{Data: "linea1\nlinea2\nlinea3"}) + body := rec.Body.String() + for _, want := range []string{"data: linea1\n", "data: linea2\n", "data: linea3\n"} { + if !strings.Contains(body, want) { + t.Errorf("body should contain %q, got %q", want, body) + } + } + }) + + t.Run("hace flush si el writer es Flusher", func(t *testing.T) { + rec := newFlushRecorder() + _ = SSESend(rec, SSEEvent{Data: "x"}) + if rec.FlushCount() != 1 { + t.Errorf("expected 1 flush, got %d", rec.FlushCount()) + } + }) + + t.Run("termina cada evento con doble salto de linea", func(t *testing.T) { + rec := newFlushRecorder() + _ = SSESend(rec, SSEEvent{Event: "x", Data: "y"}) + body := rec.Body.String() + if !strings.HasSuffix(body, "\n\n") { + t.Errorf("body should end with '\\n\\n', got %q", body) + } + }) +} + +// --- SSEKeepalive --- + +func TestSSEKeepalive(t *testing.T) { + t.Run("escribe comentario keepalive periodicamente", func(t *testing.T) { + rec := newFlushRecorder() + done := make(chan struct{}) + + go SSEKeepalive(rec, 10*time.Millisecond, done) + time.Sleep(50 * time.Millisecond) + close(done) + time.Sleep(20 * time.Millisecond) + + body := rec.Body.String() + if !strings.Contains(body, ": keepalive\n\n") { + t.Errorf("body should contain ': keepalive\\n\\n', got %q", body) + } + count := strings.Count(body, ": keepalive\n\n") + if count < 2 { + t.Errorf("expected at least 2 keepalives, got %d", count) + } + }) + + t.Run("termina cuando done se cierra", func(t *testing.T) { + rec := newFlushRecorder() + done := make(chan struct{}) + finished := make(chan struct{}) + + go func() { + SSEKeepalive(rec, 5*time.Millisecond, done) + close(finished) + }() + + close(done) + select { + case <-finished: + // ok + case <-time.After(100 * time.Millisecond): + t.Error("SSEKeepalive did not return after done was closed") + } + }) +} + +// --- SSEHandler --- + +func TestSSEHandler(t *testing.T) { + t.Run("setea headers SSE correctos", func(t *testing.T) { + events := make(chan SSEEvent) + handler := SSEHandler(events) + + // Lanzar handler en goroutine y cerrar canal para que retorne + go func() { + time.Sleep(20 * time.Millisecond) + close(events) + }() + + ts := httptest.NewServer(handler) + defer ts.Close() + + resp, err := http.Get(ts.URL) + if err != nil { + t.Fatalf("get failed: %v", err) + } + defer resp.Body.Close() + + if got := resp.Header.Get("Content-Type"); got != "text/event-stream" { + t.Errorf("Content-Type = %q, want text/event-stream", got) + } + if got := resp.Header.Get("Cache-Control"); got != "no-cache" { + t.Errorf("Cache-Control = %q, want no-cache", got) + } + }) + + t.Run("envia eventos del canal al writer", func(t *testing.T) { + events := make(chan SSEEvent, 2) + events <- SSEEvent{Event: "first", Data: "1"} + events <- SSEEvent{Event: "second", Data: "2"} + close(events) + + handler := SSEHandler(events) + ts := httptest.NewServer(handler) + defer ts.Close() + + resp, err := http.Get(ts.URL) + if err != nil { + t.Fatalf("get failed: %v", err) + } + defer resp.Body.Close() + + buf := make([]byte, 4096) + n, _ := resp.Body.Read(buf) + body := string(buf[:n]) + + for _, want := range []string{"event: first", "event: second", "data: 1", "data: 2"} { + if !strings.Contains(body, want) { + t.Errorf("body should contain %q, got %q", want, body) + } + } + }) + + t.Run("termina si el contexto del request se cancela", func(t *testing.T) { + events := make(chan SSEEvent) + handler := SSEHandler(events) + + ts := httptest.NewServer(handler) + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + req, _ := http.NewRequestWithContext(ctx, "GET", ts.URL, nil) + + done := make(chan struct{}) + go func() { + resp, err := http.DefaultClient.Do(req) + if err == nil { + resp.Body.Close() + } + close(done) + }() + + time.Sleep(20 * time.Millisecond) + cancel() + + select { + case <-done: + // handler debe haber retornado tras la cancelacion del cliente + case <-time.After(2 * time.Second): + t.Error("handler did not return after context cancel") + } + }) + + t.Run("retorna 500 si el writer no es Flusher", func(t *testing.T) { + // nonFlushWriter no implementa Flusher + events := make(chan SSEEvent) + handler := SSEHandler(events) + + rec := &nonFlushWriter{header: http.Header{}} + req := httptest.NewRequest("GET", "/", nil) + handler(rec, req) + + if rec.status != http.StatusInternalServerError { + t.Errorf("got status %d, want 500", rec.status) + } + }) +}