Files
fn_registry/functions/infra/docker_container_logs_test.go
egutierrez 621e8895c9 feat(infra): auto-commit con 86 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 19:38:15 +02:00

321 lines
8.7 KiB
Go

package infra
import (
"context"
"encoding/binary"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)
// buildDockerFrame encodes payload as a single frame of Docker's stream
// multiplexing protocol: an 8-byte header followed by the payload bytes.
// Header byte 0 carries streamType (1 = stdout, 2 = stderr), bytes 1-3 are
// left zero, and bytes 4-7 hold the payload length as a big-endian uint32.
// The payload is copied verbatim, including any trailing newline.
func buildDockerFrame(streamType uint8, payload string) []byte {
	const headerLen = 8
	size := len(payload)
	out := make([]byte, headerLen+size)
	out[0] = streamType
	binary.BigEndian.PutUint32(out[4:headerLen], uint32(size))
	copy(out[headerLen:], payload)
	return out
}
// buildMultiFrame joins the given frames, in order, into one contiguous byte
// slice. Called with no frames it returns nil.
func buildMultiFrame(frames ...[]byte) []byte {
	var joined []byte
	for i := 0; i < len(frames); i++ {
		joined = append(joined, frames[i]...)
	}
	return joined
}
// TestDockerContainerLogs_Snapshot exercises DockerContainerLogs against
// httptest servers that DockerLogsOpts.DockerHost points at. The subtests
// cover demultiplexing of stdout/stderr frames, the error returned for a 404
// response, parsing of the timestamp prefix, and forwarding of the tail and
// since options as query parameters.
func TestDockerContainerLogs_Snapshot(t *testing.T) {
	t.Run("snapshot stdout y stderr demuxeados", func(t *testing.T) {
		// Three frames alternating stdout / stderr / stdout.
		body := buildMultiFrame(
			buildDockerFrame(1, "linea stdout 1\n"),
			buildDockerFrame(2, "linea stderr 1\n"),
			buildDockerFrame(1, "linea stdout 2\n"),
		)
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if !strings.HasPrefix(r.URL.Path, "/containers/") {
				t.Errorf("path inesperado: %s", r.URL.Path)
			}
			w.WriteHeader(http.StatusOK)
			w.Write(body)
		}))
		defer srv.Close()

		opts := DockerLogsOpts{
			ContainerID: "test-container",
			Tail:        10,
			Stdout:      true,
			Stderr:      true,
			DockerHost:  "tcp://" + srv.Listener.Addr().String(),
		}
		lines, err := DockerContainerLogs(opts)
		if err != nil {
			t.Fatalf("DockerContainerLogs error: %v", err)
		}
		if len(lines) != 3 {
			t.Fatalf("esperadas 3 lineas, got %d", len(lines))
		}
		if lines[0].Stream != "stdout" {
			t.Errorf("linea[0].Stream = %q, want stdout", lines[0].Stream)
		}
		if lines[0].Line != "linea stdout 1" {
			t.Errorf("linea[0].Line = %q, want 'linea stdout 1'", lines[0].Line)
		}
		if lines[1].Stream != "stderr" {
			t.Errorf("linea[1].Stream = %q, want stderr", lines[1].Stream)
		}
		if lines[1].Line != "linea stderr 1" {
			t.Errorf("linea[1].Line = %q, want 'linea stderr 1'", lines[1].Line)
		}
		if lines[2].Stream != "stdout" {
			t.Errorf("linea[2].Stream = %q, want stdout", lines[2].Stream)
		}
		// Verify the third payload too, so a mix-up between the two stdout
		// frames cannot go unnoticed.
		if lines[2].Line != "linea stdout 2" {
			t.Errorf("linea[2].Line = %q, want 'linea stdout 2'", lines[2].Line)
		}
	})

	t.Run("container no encontrado retorna error", func(t *testing.T) {
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusNotFound)
			w.Write([]byte(`{"message":"No such container: missing"}`))
		}))
		defer srv.Close()

		opts := DockerLogsOpts{
			ContainerID: "missing",
			DockerHost:  "tcp://" + srv.Listener.Addr().String(),
		}
		_, err := DockerContainerLogs(opts)
		if err == nil {
			t.Fatal("esperaba error para container no encontrado")
		}
		// "missing" appears both in the container ID and in the response
		// body, so either source satisfies this check.
		if !strings.Contains(err.Error(), "missing") {
			t.Errorf("error no menciona el container: %v", err)
		}
	})

	t.Run("timestamps parseados del prefijo Docker", func(t *testing.T) {
		// Docker prefixes each line: "2026-05-23T12:00:00.000000000Z text\n"
		payload := "2026-05-23T12:00:00.000000000Z hello timestamps\n"
		body := buildDockerFrame(1, payload)
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if r.URL.Query().Get("timestamps") != "1" {
				t.Errorf("timestamps param no enviado, query: %s", r.URL.RawQuery)
			}
			w.WriteHeader(http.StatusOK)
			w.Write(body)
		}))
		defer srv.Close()

		opts := DockerLogsOpts{
			ContainerID: "ts-container",
			Stdout:      true,
			Timestamps:  true,
			DockerHost:  "tcp://" + srv.Listener.Addr().String(),
		}
		lines, err := DockerContainerLogs(opts)
		if err != nil {
			t.Fatalf("error: %v", err)
		}
		if len(lines) != 1 {
			t.Fatalf("esperada 1 linea, got %d", len(lines))
		}
		if lines[0].Timestamp != "2026-05-23T12:00:00.000000000Z" {
			t.Errorf("Timestamp = %q, want RFC3339", lines[0].Timestamp)
		}
		if lines[0].Line != "hello timestamps" {
			t.Errorf("Line = %q, want 'hello timestamps'", lines[0].Line)
		}
	})

	t.Run("tail y since se envian como query params", func(t *testing.T) {
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			q := r.URL.Query()
			if q.Get("tail") != "50" {
				t.Errorf("tail = %q, want '50'", q.Get("tail"))
			}
			if q.Get("since") != "10m" {
				t.Errorf("since = %q, want '10m'", q.Get("since"))
			}
			// No body is written: the response carries zero frames.
			w.WriteHeader(http.StatusOK)
		}))
		defer srv.Close()

		opts := DockerLogsOpts{
			ContainerID: "c1",
			Tail:        50,
			Since:       "10m",
			Stdout:      true,
			DockerHost:  "tcp://" + srv.Listener.Addr().String(),
		}
		lines, err := DockerContainerLogs(opts)
		if err != nil {
			t.Fatalf("error: %v", err)
		}
		if len(lines) != 0 {
			t.Errorf("esperadas 0 lineas de body vacio, got %d", len(lines))
		}
	})
}
// TestDockerContainerLogsStream exercises DockerContainerLogsStream against
// httptest servers. The subtests cover delivery of every frame to the
// callback, termination when the context expires while the server keeps the
// response open, and propagation of an error returned by the callback.
func TestDockerContainerLogsStream(t *testing.T) {
	t.Run("streaming recibe lineas via callback", func(t *testing.T) {
		frames := buildMultiFrame(
			buildDockerFrame(1, "stream line 1\n"),
			buildDockerFrame(2, "stream line 2\n"),
			buildDockerFrame(1, "stream line 3\n"),
		)
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if r.URL.Query().Get("follow") != "1" {
				t.Errorf("follow param no enviado")
			}
			w.WriteHeader(http.StatusOK)
			w.Write(frames)
		}))
		defer srv.Close()

		opts := DockerLogsOpts{
			ContainerID: "stream-container",
			Stdout:      true,
			Stderr:      true,
			DockerHost:  "tcp://" + srv.Listener.Addr().String(),
		}
		var received []DockerLogLine
		ctx := context.Background()
		err := DockerContainerLogsStream(ctx, opts, func(line DockerLogLine) error {
			received = append(received, line)
			return nil
		})
		if err != nil {
			t.Fatalf("DockerContainerLogsStream error: %v", err)
		}
		if len(received) != 3 {
			t.Fatalf("esperadas 3 lineas, got %d", len(received))
		}
		if received[0].Line != "stream line 1" {
			t.Errorf("received[0].Line = %q", received[0].Line)
		}
		if received[1].Stream != "stderr" {
			t.Errorf("received[1].Stream = %q, want stderr", received[1].Stream)
		}
		// Also pin the last line so a truncated or reordered stream is caught.
		if received[2].Line != "stream line 3" {
			t.Errorf("received[2].Line = %q", received[2].Line)
		}
	})

	t.Run("ctx cancel detiene el stream", func(t *testing.T) {
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Report a missing Flusher as a test failure instead of letting
			// an unchecked type assertion panic inside the handler goroutine.
			flusher, ok := w.(http.Flusher)
			if !ok {
				t.Errorf("ResponseWriter no implementa http.Flusher")
				return
			}
			w.WriteHeader(http.StatusOK)
			w.Write(buildDockerFrame(1, "antes del cancel\n"))
			// Push the first frame to the client before blocking.
			flusher.Flush()
			// Block until the client closes the connection.
			<-r.Context().Done()
		}))
		defer srv.Close()

		opts := DockerLogsOpts{
			ContainerID: "cancel-container",
			Stdout:      true,
			DockerHost:  "tcp://" + srv.Listener.Addr().String(),
		}
		ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
		defer cancel()

		var count int
		err := DockerContainerLogsStream(ctx, opts, func(line DockerLogLine) error {
			count++
			return nil
		})
		if err == nil {
			t.Error("esperaba error de cancelacion de contexto")
		}
		if count == 0 {
			t.Error("esperaba recibir al menos 1 linea antes del cancel")
		}
	})

	t.Run("callback error cancela el stream", func(t *testing.T) {
		frames := buildMultiFrame(
			buildDockerFrame(1, "linea 1\n"),
			buildDockerFrame(1, "linea 2\n"),
			buildDockerFrame(1, "linea 3\n"),
		)
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK)
			w.Write(frames)
		}))
		defer srv.Close()

		opts := DockerLogsOpts{
			ContainerID: "cb-error-container",
			Stdout:      true,
			DockerHost:  "tcp://" + srv.Listener.Addr().String(),
		}
		stopErr := errors.New("stop processing")
		var count int
		// The callback starts failing on its second invocation.
		err := DockerContainerLogsStream(context.Background(), opts, func(line DockerLogLine) error {
			count++
			if count >= 2 {
				return stopErr
			}
			return nil
		})
		if !errors.Is(err, stopErr) {
			t.Errorf("esperaba stopErr, got: %v", err)
		}
		if count < 2 {
			t.Errorf("esperaba al menos 2 invocaciones del callback, got %d", count)
		}
		if count > 3 {
			t.Errorf("callback invocado demasiadas veces tras error: %d", count)
		}
	})
}
// TestDockerDemuxFrame checks that dockerDemuxFrame decodes a single frame
// produced by buildDockerFrame, returning both the stream type from the
// header and the payload bytes, for stdout (1) and stderr (2).
func TestDockerDemuxFrame(t *testing.T) {
	t.Run("frame stdout decodificado correctamente", func(t *testing.T) {
		payload := "hello world"
		frame := buildDockerFrame(1, payload)
		r := strings.NewReader(string(frame))
		streamType, data, err := dockerDemuxFrame(r)
		if err != nil {
			t.Fatalf("error: %v", err)
		}
		if streamType != 1 {
			t.Errorf("streamType = %d, want 1", streamType)
		}
		if string(data) != payload {
			t.Errorf("payload = %q, want %q", string(data), payload)
		}
	})

	t.Run("frame stderr decodificado correctamente", func(t *testing.T) {
		payload := "error line"
		frame := buildDockerFrame(2, payload)
		r := strings.NewReader(string(frame))
		streamType, data, err := dockerDemuxFrame(r)
		if err != nil {
			t.Fatalf("error: %v", err)
		}
		if streamType != 2 {
			t.Errorf("streamType = %d, want 2 (stderr)", streamType)
		}
		// Check the payload as well, so this subtest does not pass on the
		// header byte alone.
		if string(data) != payload {
			t.Errorf("payload = %q, want %q", string(data), payload)
		}
	})
}