feat(api): HTTP API REST+SSE para gestion remota de agentes (issue 0128)

Nuevo paquete internal/api con servidor HTTP stdlib (sin gin/echo):
- Auth Bearer via AGENTS_API_KEY con subtle.ConstantTimeCompare
- REST: GET /health (sin auth), GET/POST /agents, /agents/{id}, /{id}/{start,stop,restart,logs}
- SSE: /sse/status (broadcast diffs cada 2s) y /sse/agents/{id}/logs (tail -f)
- Pubsub in-memory (TODO: NATS cuando haya 2do cliente)
- Tail de logfiles: retroalimenta ultimos 50KB + poll 200ms para streaming

Integracion en cmd/launcher/main.go:
- Flag --api-port (0=desactivado, 8487 en produccion)
- Flag --api-key (override de AGENTS_API_KEY env var)
- Si apiPort>0 y sin clave, WARN y deshabilita en vez de fallar

Systemd unit en systemd/agents_and_robots.service:
- Restart=always (no on-failure — evita que exit limpio mate el service)
- EnvironmentFile para AGENTS_API_KEY y demas tokens
- WorkingDirectory=/home/ubuntu/CodeProyects/agents_and_robots

app.md v0.2.0:
- port: 8487, health_endpoint: /health (fix drift anterior donde era null)
- e2e_checks: build, tests, smoke_health, smoke_auth
- Documentacion Traefik+DNS pendiente humano post-merge

Tests: 12 tests unitarios en internal/api (auth, health, bus, agents, logs)
Smoke: /health 200, /agents sin auth 401, /agents con key 200 — verificado local

Co-Authored-By: fn-constructor (agent)
This commit is contained in:
2026-05-22 21:19:10 +02:00
parent 1f90953ccc
commit 98839cd8a8
10 changed files with 1110 additions and 7 deletions
+271
View File
@@ -0,0 +1,271 @@
package api
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"github.com/enmanuel/agents/shell/process"
)
// --- Response types ---
// AgentResponse is the JSON representation of an agent.
type AgentResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Desc string `json:"desc"`
Enabled bool `json:"enabled"`
Running bool `json:"running"`
PID int `json:"pid,omitempty"`
Instances int `json:"instances"`
ConfigPath string `json:"config_path"`
}
// AgentDetailResponse extends AgentResponse with logs.
type AgentDetailResponse struct {
AgentResponse
Logs []string `json:"logs"`
}
func agentResponse(s process.AgentStatus) AgentResponse {
return AgentResponse{
ID: s.ID,
Name: s.Name,
Version: s.Version,
Desc: s.Desc,
Enabled: s.Enabled,
Running: s.Running,
PID: s.PID,
Instances: s.Instances,
ConfigPath: s.ConfigPath,
}
}
// --- Health ---
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok", "time": time.Now().UTC().Format(time.RFC3339)})
}
// --- List agents ---
func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) {
statuses, err := s.mgr.StatusAll()
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
return
}
resp := make([]AgentResponse, 0, len(statuses))
for _, st := range statuses {
resp = append(resp, agentResponse(st))
}
writeJSON(w, http.StatusOK, resp)
}
// --- Get single agent ---
func (s *Server) handleGetAgent(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
statuses, err := s.mgr.StatusAll()
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
return
}
var found *process.AgentStatus
for i, st := range statuses {
if st.ID == id {
found = &statuses[i]
break
}
}
if found == nil {
writeError(w, http.StatusNotFound, "agent not found")
return
}
n := 200
if qn := r.URL.Query().Get("n"); qn != "" {
if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 {
n = parsed
}
}
logs, _ := s.mgr.LogTail(id, n)
writeJSON(w, http.StatusOK, AgentDetailResponse{
AgentResponse: agentResponse(*found),
Logs: logs,
})
}
// --- Start agent ---
func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
agents, err := s.mgr.Scan()
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
return
}
var info *process.AgentInfo
for i, a := range agents {
if a.ID == id {
info = &agents[i]
break
}
}
if info == nil {
writeError(w, http.StatusNotFound, "agent not found")
return
}
if err := s.mgr.Start(*info); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("start: %v", err))
return
}
s.logger.Info("agent started via api", "id", id)
writeJSON(w, http.StatusOK, map[string]string{"status": "started", "id": id})
}
// --- Stop agent ---
func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
if err := s.mgr.Stop(id); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("stop: %v", err))
return
}
s.logger.Info("agent stopped via api", "id", id)
writeJSON(w, http.StatusOK, map[string]string{"status": "stopped", "id": id})
}
// --- Restart agent ---
func (s *Server) handleRestartAgent(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
// Stop first (ignore not-running error)
_ = s.mgr.Stop(id)
// Wait up to 3s for process to die
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
if !s.mgr.IsRunning(id) {
break
}
time.Sleep(200 * time.Millisecond)
}
// Find agent info for Start
agents, err := s.mgr.Scan()
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
return
}
var info *process.AgentInfo
for i, a := range agents {
if a.ID == id {
info = &agents[i]
break
}
}
if info == nil {
writeError(w, http.StatusNotFound, "agent not found")
return
}
if err := s.mgr.Start(*info); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("restart/start: %v", err))
return
}
s.logger.Info("agent restarted via api", "id", id)
writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id})
}
// --- Agent logs snapshot ---
func (s *Server) handleAgentLogs(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
n := 200
if qn := r.URL.Query().Get("n"); qn != "" {
if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 {
n = parsed
}
}
logs, err := s.mgr.LogTail(id, n)
if err != nil {
writeError(w, http.StatusNotFound, fmt.Sprintf("logs: %v", err))
return
}
writeJSON(w, http.StatusOK, map[string]any{"id": id, "lines": logs, "count": len(logs)})
}
// --- SSE: status broadcast ---
func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
flusher.Flush()
sub := s.bus.Subscribe("status")
defer s.bus.Unsubscribe("status", sub)
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case ev, ok := <-sub:
if !ok {
return
}
data, _ := json.Marshal(ev)
fmt.Fprintf(w, "event: status\ndata: %s\n\n", data)
flusher.Flush()
}
}
}
// --- SSE: agent log tail ---
func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
logPath := s.mgr.LogPath(id)
if logPath == "" {
http.Error(w, "agent not found", http.StatusNotFound)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
flusher.Flush()
ctx := r.Context()
tailLogFile(ctx, logPath, w, flusher)
}
+91
View File
@@ -0,0 +1,91 @@
package api
import (
"context"
"time"
"github.com/enmanuel/agents/shell/process"
)
// StatusDiff is published to the "status" topic when an agent's running state changes.
type StatusDiff struct {
AgentID string `json:"agent_id"`
OldStatus bool `json:"old_running"`
NewStatus bool `json:"new_running"`
PID int `json:"pid,omitempty"`
}
// pollStatus polls StatusAll every 2s and publishes StatusDiff events on changes.
func (s *Server) pollStatus(ctx context.Context) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
// Seed the previous state map.
prev := make(map[string]bool)
if statuses, err := s.mgr.StatusAll(); err == nil {
for _, st := range statuses {
prev[st.ID] = st.Running
}
}
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.checkAndPublishDiffs(prev)
}
}
}
func (s *Server) checkAndPublishDiffs(prev map[string]bool) {
statuses, err := s.mgr.StatusAll()
if err != nil {
return
}
for _, st := range statuses {
old, known := prev[st.ID]
if !known || old != st.Running {
s.bus.Publish("status", StatusDiff{
AgentID: st.ID,
OldStatus: old,
NewStatus: st.Running,
PID: st.PID,
})
prev[st.ID] = st.Running
}
}
// Handle agents that were removed (disappeared from scan)
current := make(map[string]bool, len(statuses))
for _, st := range statuses {
current[st.ID] = true
}
for id, wasRunning := range prev {
if !current[id] {
if wasRunning {
s.bus.Publish("status", StatusDiff{
AgentID: id,
OldStatus: true,
NewStatus: false,
})
}
delete(prev, id)
}
}
}
// agentInfoByID finds AgentInfo by ID in a StatusAll scan.
func agentInfoByID(mgr *process.Manager, id string) (*process.AgentInfo, error) {
agents, err := mgr.Scan()
if err != nil {
return nil, err
}
for i, a := range agents {
if a.ID == id {
return &agents[i], nil
}
}
return nil, nil
}
+64
View File
@@ -0,0 +1,64 @@
// Package api — in-memory pub/sub bus for SSE broadcast.
//
// TODO(v0.2): if a second consumer (e.g. from another VPS) is added,
// replace this in-memory bus with NATS or Redis pub/sub. For now
// (1 local client) the overhead of an external broker is unwarranted.
package api
import (
"sync"
)
// Event is a generic event payload (JSON-serialisable).
type Event = any
// Bus is a simple in-memory pub/sub hub.
// Topics are arbitrary strings (e.g. "status", "logs/agent-id").
type Bus struct {
mu sync.RWMutex
subs map[string][]chan Event
}
// NewBus creates an initialised Bus.
func NewBus() *Bus {
return &Bus{subs: make(map[string][]chan Event)}
}
// Subscribe returns a channel that receives events published to topic.
// The channel is buffered (32) to avoid blocking the publisher.
func (b *Bus) Subscribe(topic string) <-chan Event {
ch := make(chan Event, 32)
b.mu.Lock()
b.subs[topic] = append(b.subs[topic], ch)
b.mu.Unlock()
return ch
}
// Unsubscribe removes ch from topic and closes it.
func (b *Bus) Unsubscribe(topic string, ch <-chan Event) {
b.mu.Lock()
defer b.mu.Unlock()
list := b.subs[topic]
for i, c := range list {
if c == ch {
close(c)
b.subs[topic] = append(list[:i], list[i+1:]...)
return
}
}
}
// Publish sends ev to all subscribers of topic.
// Non-blocking: if a subscriber channel is full, the event is dropped for that subscriber.
func (b *Bus) Publish(topic string, ev Event) {
b.mu.RLock()
list := b.subs[topic]
b.mu.RUnlock()
for _, ch := range list {
select {
case ch <- ev:
default:
// drop for this slow subscriber
}
}
}
+160
View File
@@ -0,0 +1,160 @@
// Package api provides the HTTP API server for agents_and_robots.
// It exposes REST endpoints for agent management and SSE streams for
// real-time status and log updates.
//
// Auth: every endpoint (except /health) requires:
//
// Authorization: Bearer <AGENTS_API_KEY>
//
// with crypto/subtle constant-time comparison.
package api
import (
"context"
"crypto/subtle"
"encoding/json"
"log/slog"
"net"
"net/http"
"strconv"
"time"
"github.com/enmanuel/agents/shell/process"
)
// Server is the HTTP API server.
type Server struct {
mgr *process.Manager
apiKey string
port int
logger *slog.Logger
bus *Bus
}
// New creates a new Server. apiKey is compared with subtle.ConstantTimeCompare.
// If apiKey is empty the server refuses to start.
func New(mgr *process.Manager, apiKey string, port int, logger *slog.Logger) *Server {
if logger == nil {
logger = slog.Default()
}
return &Server{
mgr: mgr,
apiKey: apiKey,
port: port,
logger: logger.With("component", "api"),
bus: NewBus(),
}
}
// Run starts the HTTP server and blocks until ctx is done.
// It also starts the status-diff poller that feeds /sse/status.
func (s *Server) Run(ctx context.Context) error {
mux := http.NewServeMux()
// Public endpoints
mux.HandleFunc("GET /health", s.handleHealth)
// Auth-gated REST endpoints
mux.Handle("GET /agents", s.auth(http.HandlerFunc(s.handleListAgents)))
mux.Handle("GET /agents/{id}", s.auth(http.HandlerFunc(s.handleGetAgent)))
mux.Handle("POST /agents/{id}/start", s.auth(http.HandlerFunc(s.handleStartAgent)))
mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent)))
mux.Handle("POST /agents/{id}/restart", s.auth(http.HandlerFunc(s.handleRestartAgent)))
mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs)))
// SSE endpoints
mux.Handle("GET /sse/status", s.auth(http.HandlerFunc(s.handleSSEStatus)))
mux.Handle("GET /sse/agents/{id}/logs", s.auth(http.HandlerFunc(s.handleSSEAgentLogs)))
addr := ":" + strconv.Itoa(s.port)
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
srv := &http.Server{
Handler: s.logMiddleware(mux),
ReadTimeout: 10 * time.Second,
}
s.logger.Info("api server listening", "addr", addr)
// Start the status poller
go s.pollStatus(ctx)
errCh := make(chan error, 1)
go func() { errCh <- srv.Serve(ln) }()
select {
case <-ctx.Done():
shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return srv.Shutdown(shutCtx)
case err := <-errCh:
return err
}
}
// --- Auth middleware ---
func (s *Server) auth(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := extractBearerToken(r)
expected := []byte(s.apiKey)
got := []byte(key)
// Ensure equal-length comparison to avoid timing side-channel.
// subtle.ConstantTimeCompare returns 0 if lengths differ too.
if subtle.ConstantTimeCompare(got, expected) != 1 {
writeJSON(w, http.StatusUnauthorized, map[string]string{"error": "unauthorized"})
return
}
next.ServeHTTP(w, r)
})
}
func extractBearerToken(r *http.Request) string {
h := r.Header.Get("Authorization")
if len(h) > 7 && h[:7] == "Bearer " {
return h[7:]
}
return ""
}
// --- Log middleware ---
func (s *Server) logMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
rw := &statusWriter{ResponseWriter: w, code: http.StatusOK}
next.ServeHTTP(rw, r)
s.logger.Info("http",
"method", r.Method,
"path", r.URL.Path,
"status", rw.code,
"duration_ms", time.Since(start).Milliseconds(),
)
})
}
type statusWriter struct {
http.ResponseWriter
code int
}
func (sw *statusWriter) WriteHeader(code int) {
sw.code = code
sw.ResponseWriter.WriteHeader(code)
}
// --- Helpers ---
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(v)
}
func writeError(w http.ResponseWriter, status int, msg string) {
writeJSON(w, status, map[string]string{"error": msg})
}
+253
View File
@@ -0,0 +1,253 @@
package api
import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/enmanuel/agents/shell/process"
)
// newTestServer creates a Server with a real (temp-dir) Manager and a test API key.
func newTestServer(t *testing.T) *Server {
t.Helper()
dir := t.TempDir()
mgr := process.NewManager(dir+"/run", dir+"/agents/*/config.yaml", "")
return New(mgr, "test-key-abcd1234", 0, nil)
}
// --- Auth tests ---
func TestAuthMissingHeader(t *testing.T) {
s := newTestServer(t)
req := httptest.NewRequest(http.MethodGet, "/agents", nil)
w := httptest.NewRecorder()
s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Error("handler called despite missing auth")
})).ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Fatalf("expected 401, got %d", w.Code)
}
}
func TestAuthWrongKey(t *testing.T) {
s := newTestServer(t)
req := httptest.NewRequest(http.MethodGet, "/agents", nil)
req.Header.Set("Authorization", "Bearer wrong-key")
w := httptest.NewRecorder()
s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Error("handler called despite wrong key")
})).ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Fatalf("expected 401, got %d", w.Code)
}
}
func TestAuthCorrectKey(t *testing.T) {
s := newTestServer(t)
req := httptest.NewRequest(http.MethodGet, "/agents", nil)
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
w := httptest.NewRecorder()
called := false
s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
w.WriteHeader(http.StatusOK)
})).ServeHTTP(w, req)
if !called {
t.Fatal("handler not called with valid key")
}
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
}
// --- Health endpoint ---
func TestHealthEndpoint(t *testing.T) {
s := newTestServer(t)
mux := http.NewServeMux()
mux.HandleFunc("GET /health", s.handleHealth)
req := httptest.NewRequest(http.MethodGet, "/health", nil)
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var resp map[string]string
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("decode: %v", err)
}
if resp["status"] != "ok" {
t.Errorf("expected status=ok, got %q", resp["status"])
}
}
// --- List agents ---
func TestListAgentsEmpty(t *testing.T) {
s := newTestServer(t)
mux := http.NewServeMux()
mux.Handle("GET /agents", s.auth(http.HandlerFunc(s.handleListAgents)))
req := httptest.NewRequest(http.MethodGet, "/agents", nil)
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
body, _ := io.ReadAll(w.Body)
// With empty agents dir, should return empty JSON array.
trimmed := strings.TrimSpace(string(body))
if trimmed != "null" && trimmed != "[]" {
// Accept both null and [] for empty slice serialisation.
t.Logf("body: %s (acceptable)", trimmed)
}
}
// --- Bus tests ---
func TestBusSubscribePublish(t *testing.T) {
b := NewBus()
ch := b.Subscribe("test")
defer b.Unsubscribe("test", ch)
b.Publish("test", "hello")
select {
case ev := <-ch:
if ev != "hello" {
t.Fatalf("expected 'hello', got %v", ev)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
}
func TestBusUnsubscribe(t *testing.T) {
b := NewBus()
ch := b.Subscribe("test")
b.Unsubscribe("test", ch)
// After unsubscribe, channel should be closed.
select {
case _, ok := <-ch:
if ok {
t.Fatal("channel should be closed after unsubscribe")
}
case <-time.After(100 * time.Millisecond):
t.Fatal("channel not closed after unsubscribe")
}
}
func TestBusMultipleSubscribers(t *testing.T) {
b := NewBus()
ch1 := b.Subscribe("x")
ch2 := b.Subscribe("x")
defer b.Unsubscribe("x", ch1)
defer b.Unsubscribe("x", ch2)
b.Publish("x", 42)
for _, ch := range []<-chan Event{ch1, ch2} {
select {
case ev := <-ch:
if ev != 42 {
t.Fatalf("expected 42, got %v", ev)
}
case <-time.After(time.Second):
t.Fatal("timeout")
}
}
}
// --- Get agent not found ---
func TestGetAgentNotFound(t *testing.T) {
s := newTestServer(t)
mux := http.NewServeMux()
mux.Handle("GET /agents/{id}", s.auth(http.HandlerFunc(s.handleGetAgent)))
req := httptest.NewRequest(http.MethodGet, "/agents/nonexistent", nil)
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d", w.Code)
}
}
// --- Stop agent not running ---
func TestStopAgentNotRunning(t *testing.T) {
s := newTestServer(t)
mux := http.NewServeMux()
mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent)))
req := httptest.NewRequest(http.MethodPost, "/agents/ghost/stop", nil)
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)
// stop returns Conflict when the agent is not running
if w.Code != http.StatusConflict {
t.Fatalf("expected 409, got %d", w.Code)
}
}
// --- Logs endpoint ---
func TestLogsEndpointNotFound(t *testing.T) {
s := newTestServer(t)
mux := http.NewServeMux()
mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs)))
req := httptest.NewRequest(http.MethodGet, "/agents/nonexistent/logs?n=10", nil)
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d", w.Code)
}
}
// --- extractBearerToken ---
func TestExtractBearerToken(t *testing.T) {
cases := []struct {
header string
want string
}{
{"Bearer abc123", "abc123"},
{"bearer abc123", ""}, // case sensitive
{"Basic abc123", ""},
{"", ""},
{"Bearer ", ""},
}
for _, tc := range cases {
req := httptest.NewRequest(http.MethodGet, "/", nil)
if tc.header != "" {
req.Header.Set("Authorization", tc.header)
}
got := extractBearerToken(req)
if got != tc.want {
t.Errorf("header=%q: got=%q want=%q", tc.header, got, tc.want)
}
}
}
+90
View File
@@ -0,0 +1,90 @@
package api
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"os"
"time"
)
// tailLogFile streams new lines appended to path to w (SSE text/plain lines).
// Sends existing content first (last 200 lines), then polls for new content.
// Blocks until ctx is done.
func tailLogFile(ctx context.Context, path string, w http.ResponseWriter, flusher http.Flusher) {
f, err := os.Open(path)
if err != nil {
// File may not exist yet (agent hasn't written any logs).
// Wait for it to appear.
f = waitForFile(ctx, path)
if f == nil {
return // ctx cancelled before file appeared
}
}
defer f.Close()
// Seek to end minus ~50 KB to avoid dumping the whole file.
// This gives "recent context" without overwhelming the SSE stream.
const tailBytes = 50 * 1024
info, _ := f.Stat()
if info != nil && info.Size() > tailBytes {
_, _ = f.Seek(-tailBytes, io.SeekEnd)
// Skip incomplete first line
r := bufio.NewReader(f)
_, _ = r.ReadString('\n')
// Emit buffered remainder
for {
line, err := r.ReadString('\n')
if line != "" {
fmt.Fprintf(w, "event: log\ndata: %s\n\n", line)
flusher.Flush()
}
if err != nil {
break
}
}
}
// Tail the file: poll for new bytes every 200ms
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
reader := bufio.NewReader(f)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for {
line, err := reader.ReadString('\n')
if line != "" {
fmt.Fprintf(w, "event: log\ndata: %s\n\n", line)
flusher.Flush()
}
if err != nil {
// io.EOF means no more data right now — wait next tick
break
}
}
}
}
}
// waitForFile polls until path exists or ctx is done.
func waitForFile(ctx context.Context, path string) *os.File {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
f, err := os.Open(path)
if err == nil {
return f
}
}
}
}