feat(infra): auto-commit con 86 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,23 +1,302 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DockerContainerLogs obtiene los logs de un contenedor. tail limita las últimas N líneas (0 = todas).
|
||||
func DockerContainerLogs(nameOrID string, tail int) (string, error) {
|
||||
args := []string{"logs"}
|
||||
if tail > 0 {
|
||||
args = append(args, "--tail", strconv.Itoa(tail))
|
||||
// dockerHTTPClient devuelve un http.Client configurado para hablar con el daemon Docker.
|
||||
// host puede ser "" (unix socket por defecto), "unix:///ruta/al/socket" o "tcp://host:port".
|
||||
func dockerHTTPClient(host string) (*http.Client, string, error) {
|
||||
if host == "" {
|
||||
host = "unix:///var/run/docker.sock"
|
||||
}
|
||||
args = append(args, nameOrID)
|
||||
|
||||
out, err := exec.Command("docker", args...).CombinedOutput()
|
||||
u, err := url.Parse(host)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("docker logs %s: %w", nameOrID, err)
|
||||
return nil, "", fmt.Errorf("docker host URL invalida %q: %w", host, err)
|
||||
}
|
||||
|
||||
return string(out), nil
|
||||
var transport http.RoundTripper
|
||||
var baseURL string
|
||||
|
||||
switch u.Scheme {
|
||||
case "unix":
|
||||
socketPath := u.Path
|
||||
if socketPath == "" {
|
||||
socketPath = u.Host // algunos parsers meten el path en Host para unix://
|
||||
}
|
||||
transport = &http.Transport{
|
||||
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
|
||||
return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
|
||||
},
|
||||
}
|
||||
baseURL = "http://localhost"
|
||||
case "tcp", "http":
|
||||
transport = http.DefaultTransport
|
||||
baseURL = "http://" + u.Host
|
||||
case "https":
|
||||
transport = http.DefaultTransport
|
||||
baseURL = "https://" + u.Host
|
||||
default:
|
||||
return nil, "", fmt.Errorf("docker host scheme no soportado: %q", u.Scheme)
|
||||
}
|
||||
|
||||
return &http.Client{Transport: transport, Timeout: 0}, baseURL, nil
|
||||
}
|
||||
|
||||
// dockerLogsURL construye la URL para GET /containers/<id>/logs con los parametros dados.
|
||||
func dockerLogsURL(baseURL, containerID string, opts DockerLogsOpts, follow bool) string {
|
||||
tail := opts.Tail
|
||||
if tail == 0 {
|
||||
tail = 100
|
||||
}
|
||||
|
||||
tailStr := fmt.Sprintf("%d", tail)
|
||||
if tail < 0 {
|
||||
tailStr = "all"
|
||||
}
|
||||
|
||||
stdout := opts.Stdout
|
||||
stderr := opts.Stderr
|
||||
if !stdout && !stderr {
|
||||
stdout = true
|
||||
stderr = true
|
||||
}
|
||||
|
||||
params := url.Values{}
|
||||
params.Set("stdout", boolParam(stdout))
|
||||
params.Set("stderr", boolParam(stderr))
|
||||
params.Set("tail", tailStr)
|
||||
params.Set("timestamps", boolParam(opts.Timestamps))
|
||||
if follow {
|
||||
params.Set("follow", "1")
|
||||
} else {
|
||||
params.Set("follow", "0")
|
||||
}
|
||||
if opts.Since != "" {
|
||||
params.Set("since", opts.Since)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s/containers/%s/logs?%s", baseURL, url.PathEscape(containerID), params.Encode())
|
||||
}
|
||||
|
||||
func boolParam(b bool) string {
|
||||
if b {
|
||||
return "1"
|
||||
}
|
||||
return "0"
|
||||
}
|
||||
|
||||
// dockerDemuxFrame lee un frame del protocolo de multiplexion de Docker.
|
||||
// El frame header tiene 8 bytes: [stream_type(1), 0,0,0, size(4 big-endian)].
|
||||
// Retorna (streamType, payload, error). streamType: 1=stdout, 2=stderr.
|
||||
// Retorna io.EOF cuando no hay mas frames.
|
||||
func dockerDemuxFrame(r io.Reader) (uint8, []byte, error) {
|
||||
var header [8]byte
|
||||
if _, err := io.ReadFull(r, header[:]); err != nil {
|
||||
return 0, nil, err // io.EOF si el stream termino limpiamente
|
||||
}
|
||||
|
||||
streamType := header[0]
|
||||
size := binary.BigEndian.Uint32(header[4:8])
|
||||
|
||||
if size == 0 {
|
||||
return streamType, nil, nil
|
||||
}
|
||||
|
||||
payload := make([]byte, size)
|
||||
if _, err := io.ReadFull(r, payload); err != nil {
|
||||
return 0, nil, fmt.Errorf("leyendo payload del frame docker: %w", err)
|
||||
}
|
||||
|
||||
return streamType, payload, nil
|
||||
}
|
||||
|
||||
// dockerStreamToString convierte el streamType del frame header al string "stdout"/"stderr".
|
||||
func dockerStreamToString(t uint8) string {
|
||||
if t == 2 {
|
||||
return "stderr"
|
||||
}
|
||||
return "stdout"
|
||||
}
|
||||
|
||||
// dockerParsePayload convierte el payload de un frame en DockerLogLines.
|
||||
// Cada linea del payload se convierte en una DockerLogLine separada.
|
||||
// Si timestamps esta habilitado, Docker prefija cada linea con "2006-01-02T15:04:05.000000000Z ".
|
||||
func dockerParsePayload(streamType uint8, payload []byte, timestamps bool) []DockerLogLine {
|
||||
raw := strings.TrimRight(string(payload), "\n")
|
||||
rawLines := strings.Split(raw, "\n")
|
||||
stream := dockerStreamToString(streamType)
|
||||
|
||||
lines := make([]DockerLogLine, 0, len(rawLines))
|
||||
for _, l := range rawLines {
|
||||
if l == "" {
|
||||
continue
|
||||
}
|
||||
line := DockerLogLine{Stream: stream}
|
||||
if timestamps {
|
||||
// Docker antepone timestamp seguido de espacio: "2026-05-23T12:00:00.000Z texto"
|
||||
idx := strings.Index(l, " ")
|
||||
if idx > 0 {
|
||||
line.Timestamp = l[:idx]
|
||||
line.Line = l[idx+1:]
|
||||
} else {
|
||||
line.Line = l
|
||||
}
|
||||
} else {
|
||||
line.Line = l
|
||||
}
|
||||
lines = append(lines, line)
|
||||
}
|
||||
return lines
|
||||
}
|
||||
|
||||
// DockerContainerLogs obtiene los logs de un contenedor en modo snapshot (sin follow).
|
||||
// Retorna hasta opts.Tail lineas (default 100, -1 = todas). Demuxea stdout/stderr.
|
||||
func DockerContainerLogs(opts DockerLogsOpts) ([]DockerLogLine, error) {
|
||||
client, baseURL, err := dockerHTTPClient(opts.DockerHost)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reqURL := dockerLogsURL(baseURL, opts.ContainerID, opts, false)
|
||||
|
||||
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, reqURL, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("construyendo request logs: %w", err)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("docker logs %s: %w", opts.ContainerID, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, fmt.Errorf("container %q no encontrado", opts.ContainerID)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
return nil, fmt.Errorf("docker logs HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
|
||||
}
|
||||
|
||||
var result []DockerLogLine
|
||||
for {
|
||||
streamType, payload, err := dockerDemuxFrame(resp.Body)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("leyendo frame docker logs: %w", err)
|
||||
}
|
||||
if len(payload) == 0 {
|
||||
continue
|
||||
}
|
||||
lines := dockerParsePayload(streamType, payload, opts.Timestamps)
|
||||
result = append(result, lines...)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// DockerContainerLogsStream hace follow de los logs de un contenedor en modo streaming.
|
||||
// Por cada linea recibida llama cb. Si cb retorna error, el stream se cancela.
|
||||
// ctx permite cancelacion externa. No hace reconexion automatica — el caller decide si reintentar.
|
||||
func DockerContainerLogsStream(ctx context.Context, opts DockerLogsOpts, cb func(DockerLogLine) error) error {
|
||||
client, baseURL, err := dockerHTTPClient(opts.DockerHost)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Para streaming necesitamos un cliente sin timeout de lectura global.
|
||||
client.Timeout = 0
|
||||
|
||||
reqURL := dockerLogsURL(baseURL, opts.ContainerID, opts, true)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("construyendo request logs stream: %w", err)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
return fmt.Errorf("docker logs stream %s: %w", opts.ContainerID, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return fmt.Errorf("container %q no encontrado", opts.ContainerID)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
return fmt.Errorf("docker logs stream HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
|
||||
}
|
||||
|
||||
// Leer frames hasta ctx cancel, EOF o error de cb.
|
||||
for {
|
||||
// Verificar cancelacion antes de cada lectura.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
streamType, payload, err := dockerDemuxFrame(resp.Body)
|
||||
if err == io.EOF {
|
||||
return nil // contenedor termino o daemon cerro el stream
|
||||
}
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
return fmt.Errorf("leyendo frame docker logs stream: %w", err)
|
||||
}
|
||||
if len(payload) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
lines := dockerParsePayload(streamType, payload, opts.Timestamps)
|
||||
for _, line := range lines {
|
||||
if err := cb(line); err != nil {
|
||||
return err // caller cancela via error
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dockerSince convierte duracion tipo "10m" a unix timestamp string para la API de Docker.
|
||||
// La API acepta directamente duraciones asi que esta funcion es solo documentacion del contrato.
|
||||
// Docker Engine acepta: unix timestamp int, RFC3339 timestamp, o Go duration string ("10m", "1h30m").
|
||||
func dockerSince(_ string) string {
|
||||
// Documentacion: Docker acepta directamente la string. No necesitamos conversion.
|
||||
return ""
|
||||
}
|
||||
|
||||
// dockerClientWithTimeout crea un http.Client con timeout de conexion pero sin timeout de lectura.
|
||||
// Util para detectar rapido si el daemon no responde.
|
||||
func dockerClientWithTimeout(host string, connectTimeout time.Duration) (*http.Client, string, error) {
|
||||
client, baseURL, err := dockerHTTPClient(host)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
if transport, ok := client.Transport.(*http.Transport); ok {
|
||||
origDial := transport.DialContext
|
||||
transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
dialCtx, cancel := context.WithTimeout(ctx, connectTimeout)
|
||||
defer cancel()
|
||||
return origDial(dialCtx, network, addr)
|
||||
}
|
||||
}
|
||||
return client, baseURL, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user