Files
device_agent/capability_docker.go
T
2026-05-30 17:28:38 +02:00

319 lines
8.4 KiB
Go

package main
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"
)
// dockerSocketPath is the filesystem path of the Docker daemon's unix socket.
// An override through the DOCKER_HOST environment variable could be added later
// if needed; no such override is applied here.
var dockerSocketPath = "/var/run/docker.sock"
// dockerHTTPClient returns an *http.Client whose Transport dials a unix socket.
//
// sock is the socket path to dial; an empty string selects dockerSocketPath
// (a different path is useful for tests that serve HTTP on their own socket).
// The network and address the HTTP layer asks for are ignored: every
// connection goes to sock.
//
// The client has a 30s overall request timeout and a 5s dial timeout.
//
// A new Transport is built on every call, so keep-alives are disabled. With
// keep-alives on, each finished request would park an idle connection inside a
// Transport that is never used again, leaking the socket and its goroutines
// (a zero IdleConnTimeout means idle connections are never reaped).
func dockerHTTPClient(sock string) *http.Client {
	if sock == "" {
		sock = dockerSocketPath
	}
	dialer := &net.Dialer{Timeout: 5 * time.Second}
	return &http.Client{
		Timeout: 30 * time.Second,
		Transport: &http.Transport{
			DisableKeepAlives: true,
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return dialer.DialContext(ctx, "unix", sock)
			},
		},
	}
}
// dockerHost is the placeholder scheme and host used to build request URLs
// when speaking HTTP over the unix socket.
const dockerHost = "http://localhost"
// dockerGet hace GET path con query y devuelve body bytes.
func dockerGet(sock, path string, qs url.Values) ([]byte, int, error) {
cli := dockerHTTPClient(sock)
u := dockerHost + path
if len(qs) > 0 {
u += "?" + qs.Encode()
}
resp, err := cli.Get(u)
if err != nil {
return nil, -1, fmt.Errorf("docker GET %s: %w", path, err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
return body, resp.StatusCode, nil
}
// dockerPost hace POST path con body JSON (puede ser nil).
func dockerPost(sock, path string, body any) ([]byte, int, error) {
cli := dockerHTTPClient(sock)
var rdr io.Reader
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return nil, -1, fmt.Errorf("marshal: %w", err)
}
rdr = strings.NewReader(string(b))
}
req, err := http.NewRequest("POST", dockerHost+path, rdr)
if err != nil {
return nil, -1, err
}
req.Header.Set("Content-Type", "application/json")
resp, err := cli.Do(req)
if err != nil {
return nil, -1, fmt.Errorf("docker POST %s: %w", path, err)
}
defer resp.Body.Close()
rb, _ := io.ReadAll(resp.Body)
return rb, resp.StatusCode, nil
}
// runDockerList wraps GET /containers/json and returns a slimmed-down listing.
//
// Recognised args:
//   - "all":     bool; when true, "all=true" is sent to the API.
//   - "filters": either a string passed through as-is (expected to be the
//     JSON-encoded filters the Docker API takes) or any other value, which is
//     JSON-encoded here.
//
// On success it returns a map with "containers" (one entry per container with
// "id" cut to 12 bytes, "image", "state", "status" and, when present, "name"
// without its leading "/") and "count", plus code 0. On an API status >= 400
// the status is returned as the code together with an error; any other failure
// returns code -1. cap is not used.
func runDockerList(cap *Capability, args map[string]any) (any, int, error) {
	_ = cap
	qs := url.Values{}
	if all, ok := args["all"].(bool); ok && all {
		qs.Set("all", "true")
	}
	if raw, ok := args["filters"]; ok && raw != nil {
		if s, isStr := raw.(string); isStr {
			qs.Set("filters", s)
		} else {
			// Fail loudly: sending an empty filter would silently list
			// every container instead of the requested subset.
			enc, err := json.Marshal(raw)
			if err != nil {
				return nil, -1, fmt.Errorf("encode filters: %w", err)
			}
			qs.Set("filters", string(enc))
		}
	}
	body, status, err := dockerGet(dockerSocketPath, "/containers/json", qs)
	if err != nil {
		return nil, -1, err
	}
	if status >= 400 {
		return nil, status, fmt.Errorf("docker api status=%d: %s", status, string(body))
	}
	var containers []map[string]any
	if err := json.Unmarshal(body, &containers); err != nil {
		return nil, -1, fmt.Errorf("decode containers: %w", err)
	}
	// Keep only a few fields so the response stays small. The slice is made
	// non-nil on purpose so an empty listing encodes as [] rather than null.
	slim := make([]map[string]any, 0, len(containers))
	for _, c := range containers {
		entry := map[string]any{
			"id":     truncString(getStr(c, "Id"), 12),
			"image":  getStr(c, "Image"),
			"state":  getStr(c, "State"),
			"status": getStr(c, "Status"),
		}
		if names, ok := c["Names"].([]any); ok && len(names) > 0 {
			entry["name"] = strings.TrimPrefix(fmt.Sprintf("%v", names[0]), "/")
		}
		slim = append(slim, entry)
	}
	return map[string]any{
		"containers": slim,
		"count":      len(slim),
	}, 0, nil
}
// runDockerLogs wraps GET /containers/{id}/logs?stdout=1&stderr=1&tail=N.
//
// Recognised args:
//   - "container": required container name or id.
//   - "tail":      number of lines to request (default 100); a value <= 0
//     omits the parameter.
//   - "since":     optional, forwarded verbatim as the "since" query value.
//
// On success it returns a map with "container", "stdout", "stderr", "lines"
// (stdout split on "\n") and a constant "exit_code" of 0, plus code 0. stdout
// and stderr are each capped to their LAST 256 KiB. On an API status >= 400 the
// status is returned as the code together with an error; any other failure
// returns code -1. cap is not used.
func runDockerLogs(cap *Capability, args map[string]any) (any, int, error) {
	_ = cap
	container := mapStringField(args, "container")
	if container == "" {
		return nil, -1, fmt.Errorf("container required")
	}
	tail := mapIntField(args, "tail", 100)
	since := mapStringField(args, "since")

	qs := url.Values{}
	qs.Set("stdout", "1")
	qs.Set("stderr", "1")
	if tail > 0 {
		qs.Set("tail", fmt.Sprintf("%d", tail))
	}
	if since != "" {
		qs.Set("since", since)
	}

	// container is caller-supplied: escape it so "/", ".." or "?" cannot
	// redirect the request to a different API endpoint.
	logsPath := "/containers/" + url.PathEscape(container) + "/logs"
	body, status, err := dockerGet(dockerSocketPath, logsPath, qs)
	if err != nil {
		return nil, -1, err
	}
	if status >= 400 {
		return nil, status, fmt.Errorf("docker logs status=%d: %s", status, string(body))
	}

	// The body is multiplexed: each frame has an 8-byte header
	// (stream_type|0|0|0|size_be32) followed by the payload.
	stdout, stderr := demuxDockerStream(body)

	// Defensive cap: keep only the most recent 256 KiB of each stream.
	const maxOut = 256 * 1024
	if len(stdout) > maxOut {
		stdout = stdout[len(stdout)-maxOut:]
	}
	if len(stderr) > maxOut {
		stderr = stderr[len(stderr)-maxOut:]
	}
	return map[string]any{
		"container": container,
		"stdout":    stdout,
		"stderr":    stderr,
		"lines":     strings.Split(stdout, "\n"),
		"exit_code": 0,
	}, 0, nil
}
// runDockerExec runs a command inside a container, restricted to a whitelist
// of binaries.
//
// Flow: POST /containers/{id}/exec creates the exec and yields its id;
// POST /exec/{id}/start (Detach=false, Tty=false) returns the multiplexed
// output stream; GET /exec/{id}/json is then read for the exit code.
//
// Recognised args:
//   - "container": required container name or id.
//   - "argv":      required non-empty array of strings; argv[0] is the binary.
//
// argv[0] must equal an entry of cap.BinariesAllowed or end in "/"+entry. An
// empty whitelist rejects every call.
//
// On success it returns a map with "container", "argv", "stdout", "stderr" and
// "exit_code", and the exit code again as the second result. The exit code is
// -1 when the inspect call fails or carries no numeric ExitCode. stdout and
// stderr are each capped to their FIRST 256 KiB. On an API status >= 400 the
// status is returned as the code together with an error; validation and
// transport failures return code -1.
func runDockerExec(cap *Capability, args map[string]any) (any, int, error) {
	container := mapStringField(args, "container")
	if container == "" {
		return nil, -1, fmt.Errorf("container required")
	}
	argv, err := dockerExecArgv(args["argv"])
	if err != nil {
		return nil, -1, err
	}
	if len(argv) == 0 {
		return nil, -1, fmt.Errorf("argv required (non-empty array)")
	}
	if len(cap.BinariesAllowed) == 0 {
		return nil, -1, fmt.Errorf("no binaries whitelisted for docker.container.exec")
	}
	bin := argv[0]
	allowed := false
	for _, b := range cap.BinariesAllowed {
		// NOTE(review): the suffix rule accepts ANY path ending in "/"+b
		// (e.g. "/tmp/x/ls" for "ls"); confirm that is intended.
		if b == bin || strings.HasSuffix(bin, "/"+b) {
			allowed = true
			break
		}
	}
	if !allowed {
		return nil, -1, fmt.Errorf("binary %q not in whitelist %v", bin, cap.BinariesAllowed)
	}

	// 1. Create the exec. container is caller-supplied: escape it so "/",
	// ".." or "?" cannot redirect this POST to a different API endpoint and
	// sidestep the whitelist above.
	createBody := map[string]any{
		"Cmd":          argv,
		"AttachStdout": true,
		"AttachStderr": true,
		"Tty":          false,
	}
	createPath := "/containers/" + url.PathEscape(container) + "/exec"
	body, status, err := dockerPost(dockerSocketPath, createPath, createBody)
	if err != nil {
		return nil, -1, err
	}
	if status >= 400 {
		return nil, status, fmt.Errorf("docker exec create status=%d: %s", status, string(body))
	}
	var created map[string]any
	if err := json.Unmarshal(body, &created); err != nil {
		return nil, -1, fmt.Errorf("decode exec create: %w", err)
	}
	execID, _ := created["Id"].(string)
	if execID == "" {
		return nil, -1, fmt.Errorf("exec create returned empty id")
	}

	// 2. Start the exec; with Detach=false the response body is the output stream.
	startBody := map[string]any{"Detach": false, "Tty": false}
	streamBody, status, err := dockerPost(dockerSocketPath, "/exec/"+execID+"/start", startBody)
	if err != nil {
		return nil, -1, err
	}
	if status >= 400 {
		return nil, status, fmt.Errorf("docker exec start status=%d: %s", status, string(streamBody))
	}
	stdout, stderr := demuxDockerStream(streamBody)

	// 3. Inspect for the exit code. Best effort: any failure leaves it at -1.
	exitCode := -1
	if inspectBody, inspectStatus, inspectErr := dockerGet(dockerSocketPath, "/exec/"+execID+"/json", nil); inspectErr == nil && inspectStatus < 400 {
		var inspect map[string]any
		if json.Unmarshal(inspectBody, &inspect) == nil {
			if v, ok := inspect["ExitCode"].(float64); ok {
				exitCode = int(v)
			}
		}
	}

	// Cap each stream to its first 256 KiB.
	const maxOut = 256 * 1024
	if len(stdout) > maxOut {
		stdout = stdout[:maxOut]
	}
	if len(stderr) > maxOut {
		stderr = stderr[:maxOut]
	}
	return map[string]any{
		"container": container,
		"argv":      argv,
		"stdout":    stdout,
		"stderr":    stderr,
		"exit_code": exitCode,
	}, exitCode, nil
}

// dockerExecArgv converts the raw "argv" argument into a string slice.
//
// It accepts []any (the shape produced by JSON decoding) and []string. A
// non-string element is an error rather than being skipped, because dropping
// an argument would silently run a different command. Any other type,
// including nil, yields an empty result so the caller reports argv as missing.
func dockerExecArgv(raw any) ([]string, error) {
	switch v := raw.(type) {
	case []string:
		return append([]string(nil), v...), nil
	case []any:
		argv := make([]string, 0, len(v))
		for i, el := range v {
			s, ok := el.(string)
			if !ok {
				return nil, fmt.Errorf("argv[%d] must be a string, got %T", i, el)
			}
			argv = append(argv, s)
		}
		return argv, nil
	default:
		return nil, nil
	}
}
// demuxDockerStream splits a Docker multiplexed stream into stdout and stderr.
//
// Frame layout: [stream_type][0][0][0][size_be32][payload]. Payloads of
// stream_type 2 go to the second result (stderr); every other accepted type
// (0 stdin, 1 stdout, 3 system error) goes to the first result (stdout).
//
// Input that is not framed is returned untouched as stdout: as soon as the
// next 8 bytes do not look like a frame header (type > 3 or non-zero padding),
// everything from that point on, header bytes included, is treated as plain
// stdout. The same applies to a tail shorter than one header.
//
// A frame whose declared size exceeds the remaining bytes is taken as a
// truncated frame: the bytes that are present are delivered to the stream the
// header names.
func demuxDockerStream(b []byte) (string, string) {
	const headerLen = 8
	var so, se strings.Builder
	for len(b) >= headerLen {
		typ := b[0]
		if typ > 3 || b[1] != 0 || b[2] != 0 || b[3] != 0 {
			// Not a frame header: the rest is raw output.
			break
		}
		// Clamp in uint64 so a huge declared size cannot overflow int on
		// 32-bit platforms.
		n := len(b) - headerLen
		if size := binary.BigEndian.Uint32(b[4:headerLen]); uint64(size) < uint64(n) {
			n = int(size)
		}
		payload := b[headerLen : headerLen+n]
		if typ == 2 {
			se.Write(payload)
		} else {
			so.Write(payload)
		}
		b = b[headerLen+n:]
	}
	if len(b) > 0 {
		so.Write(b)
	}
	return so.String(), se.String()
}
// getStr returns the value stored under k in m when that value is a string.
// A missing key, a nil map, or a value of any other type yields "".
func getStr(m map[string]any, k string) string {
	// A failed assertion (absent key included) leaves str at its zero value.
	str, _ := m[k].(string)
	return str
}
// truncString returns s cut to at most n bytes.
//
// The limit counts bytes, not runes, so a multi-byte character at the cut
// point can be split. A negative n is treated as 0 and yields "" instead of
// panicking with a slice-bounds error.
func truncString(s string, n int) string {
	if n < 0 {
		n = 0
	}
	if len(s) <= n {
		return s
	}
	return s[:n]
}