Files
agents_and_robots/internal/api/server.go
T
egutierrez 4822208306 fix(api): statusWriter implements http.Flusher for SSE handlers
The logMiddleware wrapper (statusWriter) didn't forward Flush, so
`w.(http.Flusher)` in SSE handlers failed and returned the plain text
"streaming unsupported" with 500. SSE clients (agents_dashboard C++ app)
saw a closed connection with no events.

Add Flush() that delegates to the embedded ResponseWriter when it
implements Flusher. Required for /sse/status and /sse/agents/{id}/logs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:32:06 +02:00

171 lines
4.5 KiB
Go

// Package api provides the HTTP API server for agents_and_robots.
// It exposes REST endpoints for agent management and SSE streams for
// real-time status and log updates.
//
// Auth: every endpoint (except /health) requires:
//
// Authorization: Bearer <AGENTS_API_KEY>
//
// with crypto/subtle constant-time comparison.
package api
import (
"context"
"crypto/subtle"
"encoding/json"
"log/slog"
"net"
"net/http"
"strconv"
"time"
"github.com/enmanuel/agents/shell/process"
)
// Server is the HTTP API server.
type Server struct {
mgr *process.Manager
apiKey string
port int
logger *slog.Logger
bus *Bus
}
// New creates a new Server. apiKey is compared with subtle.ConstantTimeCompare.
// If apiKey is empty the server refuses to start.
func New(mgr *process.Manager, apiKey string, port int, logger *slog.Logger) *Server {
if logger == nil {
logger = slog.Default()
}
return &Server{
mgr: mgr,
apiKey: apiKey,
port: port,
logger: logger.With("component", "api"),
bus: NewBus(),
}
}
// Run starts the HTTP server and blocks until ctx is done.
// It also starts the status-diff poller that feeds /sse/status.
func (s *Server) Run(ctx context.Context) error {
mux := http.NewServeMux()
// Public endpoints
mux.HandleFunc("GET /health", s.handleHealth)
// Auth-gated REST endpoints
mux.Handle("GET /agents", s.auth(http.HandlerFunc(s.handleListAgents)))
mux.Handle("GET /agents/{id}", s.auth(http.HandlerFunc(s.handleGetAgent)))
mux.Handle("POST /agents/{id}/start", s.auth(http.HandlerFunc(s.handleStartAgent)))
mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent)))
mux.Handle("POST /agents/{id}/restart", s.auth(http.HandlerFunc(s.handleRestartAgent)))
mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs)))
// SSE endpoints
mux.Handle("GET /sse/status", s.auth(http.HandlerFunc(s.handleSSEStatus)))
mux.Handle("GET /sse/agents/{id}/logs", s.auth(http.HandlerFunc(s.handleSSEAgentLogs)))
addr := ":" + strconv.Itoa(s.port)
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
srv := &http.Server{
Handler: s.logMiddleware(mux),
ReadTimeout: 10 * time.Second,
}
s.logger.Info("api server listening", "addr", addr)
// Start the status poller
go s.pollStatus(ctx)
errCh := make(chan error, 1)
go func() { errCh <- srv.Serve(ln) }()
select {
case <-ctx.Done():
shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return srv.Shutdown(shutCtx)
case err := <-errCh:
return err
}
}
// --- Auth middleware ---
func (s *Server) auth(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := extractBearerToken(r)
expected := []byte(s.apiKey)
got := []byte(key)
// Ensure equal-length comparison to avoid timing side-channel.
// subtle.ConstantTimeCompare returns 0 if lengths differ too.
if subtle.ConstantTimeCompare(got, expected) != 1 {
writeJSON(w, http.StatusUnauthorized, map[string]string{"error": "unauthorized"})
return
}
next.ServeHTTP(w, r)
})
}
func extractBearerToken(r *http.Request) string {
h := r.Header.Get("Authorization")
if len(h) > 7 && h[:7] == "Bearer " {
return h[7:]
}
return ""
}
// --- Log middleware ---
func (s *Server) logMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
rw := &statusWriter{ResponseWriter: w, code: http.StatusOK}
next.ServeHTTP(rw, r)
s.logger.Info("http",
"method", r.Method,
"path", r.URL.Path,
"status", rw.code,
"duration_ms", time.Since(start).Milliseconds(),
)
})
}
type statusWriter struct {
http.ResponseWriter
code int
}
func (sw *statusWriter) WriteHeader(code int) {
sw.code = code
sw.ResponseWriter.WriteHeader(code)
}
// Flush forwards to the underlying ResponseWriter when it implements Flusher.
// Without this method, the type assertion `w.(http.Flusher)` in the SSE handlers
// fails (the wrapper hides the inner Flusher), and the handler aborts with
// "streaming unsupported".
func (sw *statusWriter) Flush() {
if f, ok := sw.ResponseWriter.(http.Flusher); ok {
f.Flush()
}
}
// --- Helpers ---
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(v)
}
func writeError(w http.ResponseWriter, status int, msg string) {
writeJSON(w, status, map[string]string{"error": msg})
}