4bce095964
Mueve duckdb_open, clickhouse_open, postgres_open, matrix_* y keyring_token_store
del paquete monolitico functions/infra a subpaquetes propios
(functions/infra/{duckdb,clickhouse,postgres,matrix,keyring}). El paquete infra ya
no importa los drivers (go-duckdb, clickhouse-go, pgx, mautrix, go-keyring), por lo
que las apps que solo usan funciones ligeras (process, cron, http, sqlite) dejan de
arrastrarlos. Reduccion de binarios: dag_engine 72->10MB, registry_api 70->8.7MB,
services_api 70->9MB, call_monitor 68->6.6MB, sqlite_api 70->8.9MB.
Los IDs del registry se mantienen estables (domain: infra en frontmatter). Se
preservan los build tags goolm/libolm de matrix_crypto_init.
Tambien corrige TestSSEHandler: el test leia el body con un unico Read() que con
HTTP chunked solo capturaba el primer evento; ahora usa io.ReadAll hasta EOF.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
276 lines
7.4 KiB
Go
276 lines
7.4 KiB
Go
package infra
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// nonFlushWriter es un http.ResponseWriter que NO implementa http.Flusher,
|
|
// usado para validar el path de error de SSEHandler.
|
|
type nonFlushWriter struct {
|
|
header http.Header
|
|
body []byte
|
|
status int
|
|
}
|
|
|
|
func (n *nonFlushWriter) Header() http.Header { return n.header }
|
|
func (n *nonFlushWriter) Write(b []byte) (int, error) {
|
|
n.body = append(n.body, b...)
|
|
return len(b), nil
|
|
}
|
|
func (n *nonFlushWriter) WriteHeader(s int) { n.status = s }
|
|
|
|
// flushRecorder es un httptest.ResponseRecorder que tambien implementa http.Flusher
|
|
// para que las funciones SSE puedan flushear sin perder los bytes en buffer.
|
|
type flushRecorder struct {
|
|
*httptest.ResponseRecorder
|
|
flushes int32
|
|
}
|
|
|
|
func newFlushRecorder() *flushRecorder {
|
|
return &flushRecorder{ResponseRecorder: httptest.NewRecorder()}
|
|
}
|
|
|
|
func (f *flushRecorder) Flush() {
|
|
atomic.AddInt32(&f.flushes, 1)
|
|
}
|
|
|
|
func (f *flushRecorder) FlushCount() int {
|
|
return int(atomic.LoadInt32(&f.flushes))
|
|
}
|
|
|
|
// --- SSESend ---
|
|
|
|
func TestSSESend(t *testing.T) {
|
|
t.Run("serializa data simple sin event ni id", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
err := SSESend(rec, SSEEvent{Data: "hola"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
body := rec.Body.String()
|
|
want := "data: hola\n\n"
|
|
if body != want {
|
|
t.Errorf("got body %q, want %q", body, want)
|
|
}
|
|
})
|
|
|
|
t.Run("incluye event cuando esta presente", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
_ = SSESend(rec, SSEEvent{Event: "tick", Data: "1"})
|
|
body := rec.Body.String()
|
|
if !strings.HasPrefix(body, "event: tick\n") {
|
|
t.Errorf("body should start with 'event: tick\\n', got %q", body)
|
|
}
|
|
})
|
|
|
|
t.Run("incluye id cuando esta presente", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
_ = SSESend(rec, SSEEvent{ID: "42", Data: "x"})
|
|
if !strings.Contains(rec.Body.String(), "id: 42\n") {
|
|
t.Errorf("body should contain 'id: 42\\n', got %q", rec.Body.String())
|
|
}
|
|
})
|
|
|
|
t.Run("incluye retry cuando es positivo", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
_ = SSESend(rec, SSEEvent{Retry: 5000, Data: "x"})
|
|
if !strings.Contains(rec.Body.String(), "retry: 5000\n") {
|
|
t.Errorf("body should contain 'retry: 5000\\n', got %q", rec.Body.String())
|
|
}
|
|
})
|
|
|
|
t.Run("omite retry cuando es 0", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
_ = SSESend(rec, SSEEvent{Retry: 0, Data: "x"})
|
|
if strings.Contains(rec.Body.String(), "retry:") {
|
|
t.Errorf("body should NOT contain 'retry:', got %q", rec.Body.String())
|
|
}
|
|
})
|
|
|
|
t.Run("data multilinea genera multiples lineas data:", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
_ = SSESend(rec, SSEEvent{Data: "linea1\nlinea2\nlinea3"})
|
|
body := rec.Body.String()
|
|
for _, want := range []string{"data: linea1\n", "data: linea2\n", "data: linea3\n"} {
|
|
if !strings.Contains(body, want) {
|
|
t.Errorf("body should contain %q, got %q", want, body)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("hace flush si el writer es Flusher", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
_ = SSESend(rec, SSEEvent{Data: "x"})
|
|
if rec.FlushCount() != 1 {
|
|
t.Errorf("expected 1 flush, got %d", rec.FlushCount())
|
|
}
|
|
})
|
|
|
|
t.Run("termina cada evento con doble salto de linea", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
_ = SSESend(rec, SSEEvent{Event: "x", Data: "y"})
|
|
body := rec.Body.String()
|
|
if !strings.HasSuffix(body, "\n\n") {
|
|
t.Errorf("body should end with '\\n\\n', got %q", body)
|
|
}
|
|
})
|
|
}
|
|
|
|
// --- SSEKeepalive ---
|
|
|
|
func TestSSEKeepalive(t *testing.T) {
|
|
t.Run("escribe comentario keepalive periodicamente", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
done := make(chan struct{})
|
|
|
|
go SSEKeepalive(rec, 10*time.Millisecond, done)
|
|
time.Sleep(50 * time.Millisecond)
|
|
close(done)
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
body := rec.Body.String()
|
|
if !strings.Contains(body, ": keepalive\n\n") {
|
|
t.Errorf("body should contain ': keepalive\\n\\n', got %q", body)
|
|
}
|
|
count := strings.Count(body, ": keepalive\n\n")
|
|
if count < 2 {
|
|
t.Errorf("expected at least 2 keepalives, got %d", count)
|
|
}
|
|
})
|
|
|
|
t.Run("termina cuando done se cierra", func(t *testing.T) {
|
|
rec := newFlushRecorder()
|
|
done := make(chan struct{})
|
|
finished := make(chan struct{})
|
|
|
|
go func() {
|
|
SSEKeepalive(rec, 5*time.Millisecond, done)
|
|
close(finished)
|
|
}()
|
|
|
|
close(done)
|
|
select {
|
|
case <-finished:
|
|
// ok
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("SSEKeepalive did not return after done was closed")
|
|
}
|
|
})
|
|
}
|
|
|
|
// --- SSEHandler ---
|
|
|
|
func TestSSEHandler(t *testing.T) {
|
|
t.Run("setea headers SSE correctos", func(t *testing.T) {
|
|
events := make(chan SSEEvent)
|
|
handler := SSEHandler(events)
|
|
|
|
// Lanzar handler en goroutine y cerrar canal para que retorne
|
|
go func() {
|
|
time.Sleep(20 * time.Millisecond)
|
|
close(events)
|
|
}()
|
|
|
|
ts := httptest.NewServer(handler)
|
|
defer ts.Close()
|
|
|
|
resp, err := http.Get(ts.URL)
|
|
if err != nil {
|
|
t.Fatalf("get failed: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if got := resp.Header.Get("Content-Type"); got != "text/event-stream" {
|
|
t.Errorf("Content-Type = %q, want text/event-stream", got)
|
|
}
|
|
if got := resp.Header.Get("Cache-Control"); got != "no-cache" {
|
|
t.Errorf("Cache-Control = %q, want no-cache", got)
|
|
}
|
|
})
|
|
|
|
t.Run("envia eventos del canal al writer", func(t *testing.T) {
|
|
events := make(chan SSEEvent, 2)
|
|
events <- SSEEvent{Event: "first", Data: "1"}
|
|
events <- SSEEvent{Event: "second", Data: "2"}
|
|
close(events)
|
|
|
|
handler := SSEHandler(events)
|
|
ts := httptest.NewServer(handler)
|
|
defer ts.Close()
|
|
|
|
resp, err := http.Get(ts.URL)
|
|
if err != nil {
|
|
t.Fatalf("get failed: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Leer el body completo hasta EOF. El canal se cierra antes de la
|
|
// peticion, asi que el handler envia ambos eventos y termina, cerrando
|
|
// el stream. Un unico Read podria devolver solo el primer chunk
|
|
// (event: first), porque io.Reader.Read no garantiza llenar el buffer;
|
|
// io.ReadAll consume todos los chunks emitidos por el handler.
|
|
raw, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
t.Fatalf("read body failed: %v", err)
|
|
}
|
|
body := string(raw)
|
|
|
|
for _, want := range []string{"event: first", "event: second", "data: 1", "data: 2"} {
|
|
if !strings.Contains(body, want) {
|
|
t.Errorf("body should contain %q, got %q", want, body)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("termina si el contexto del request se cancela", func(t *testing.T) {
|
|
events := make(chan SSEEvent)
|
|
handler := SSEHandler(events)
|
|
|
|
ts := httptest.NewServer(handler)
|
|
defer ts.Close()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
req, _ := http.NewRequestWithContext(ctx, "GET", ts.URL, nil)
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err == nil {
|
|
resp.Body.Close()
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
time.Sleep(20 * time.Millisecond)
|
|
cancel()
|
|
|
|
select {
|
|
case <-done:
|
|
// handler debe haber retornado tras la cancelacion del cliente
|
|
case <-time.After(2 * time.Second):
|
|
t.Error("handler did not return after context cancel")
|
|
}
|
|
})
|
|
|
|
t.Run("retorna 500 si el writer no es Flusher", func(t *testing.T) {
|
|
// nonFlushWriter no implementa Flusher
|
|
events := make(chan SSEEvent)
|
|
handler := SSEHandler(events)
|
|
|
|
rec := &nonFlushWriter{header: http.Header{}}
|
|
req := httptest.NewRequest("GET", "/", nil)
|
|
handler(rec, req)
|
|
|
|
if rec.status != http.StatusInternalServerError {
|
|
t.Errorf("got status %d, want 500", rec.status)
|
|
}
|
|
})
|
|
}
|