From 98839cd8a8444558ae1e764e00f34d32736eea96 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 22 May 2026 21:19:10 +0200 Subject: [PATCH] feat(api): HTTP API REST+SSE para gestion remota de agentes (issue 0128) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Nuevo paquete internal/api con servidor HTTP stdlib (sin gin/echo): - Auth Bearer via AGENTS_API_KEY con subtle.ConstantTimeCompare - REST: GET /health (sin auth), GET/POST /agents, /agents/{id}, /{id}/{start,stop,restart,logs} - SSE: /sse/status (broadcast diffs cada 2s) y /sse/agents/{id}/logs (tail -f) - Pubsub in-memory (TODO: NATS cuando haya 2do cliente) - Tail de logfiles: retroalimenta ultimos 50KB + poll 200ms para streaming Integracion en cmd/launcher/main.go: - Flag --api-port (0=desactivado, 8487 en produccion) - Flag --api-key (override de AGENTS_API_KEY env var) - Si apiPort>0 y sin clave, WARN y deshabilita en vez de fallar Systemd unit en systemd/agents_and_robots.service: - Restart=always (no on-failure — evita que exit limpio mate el service) - EnvironmentFile para AGENTS_API_KEY y demas tokens - WorkingDirectory=/home/ubuntu/CodeProyects/agents_and_robots app.md v0.2.0: - port: 8487, health_endpoint: /health (fix drift anterior donde era null) - e2e_checks: build, tests, smoke_health, smoke_auth - Documentacion Traefik+DNS pendiente humano post-merge Tests: 12 tests unitarios en internal/api (auth, health, bus, agents, logs) Smoke: /health 200, /agents sin auth 401, /agents con key 200 — verificado local Co-Authored-By: fn-constructor (agent) --- README.md | 67 ++++++++ app.md | 71 +++++++- cmd/launcher/main.go | 36 ++++ internal/api/handlers.go | 271 ++++++++++++++++++++++++++++++ internal/api/poller.go | 91 ++++++++++ internal/api/pubsub.go | 64 +++++++ internal/api/server.go | 160 ++++++++++++++++++ internal/api/server_test.go | 253 ++++++++++++++++++++++++++++ internal/api/tail.go | 90 ++++++++++ systemd/agents_and_robots.service | 14 ++ 10 
files changed, 1110 insertions(+), 7 deletions(-) create mode 100644 internal/api/handlers.go create mode 100644 internal/api/poller.go create mode 100644 internal/api/pubsub.go create mode 100644 internal/api/server.go create mode 100644 internal/api/server_test.go create mode 100644 internal/api/tail.go create mode 100644 systemd/agents_and_robots.service diff --git a/README.md b/README.md index 8fa94fc..8d68d93 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,73 @@ Desde la TUI puedes: --- +## HTTP API (v0.1) + +El launcher expone una API REST + SSE cuando se arranca con `--api-port <puerto>`. + +### Arrancar con API habilitada + +```bash +# Requiere AGENTS_API_KEY en .env (generar con: openssl rand -hex 32) +./bin/launcher --log-level info --api-port 8487 + +# Con systemd (VPS): +sudo systemctl start agents_and_robots.service +``` + +### Autenticacion + +Todos los endpoints (excepto `/health`) requieren: + +``` +Authorization: Bearer <AGENTS_API_KEY> +``` + +Comparacion con `crypto/subtle.ConstantTimeCompare` — resistente a timing attacks. 
+ +### Endpoints REST + +```bash +# Liveness (sin auth) +curl http://localhost:8487/health + +# Listar agentes +curl -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents + +# Detalle + logs de un agente +curl -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents/assistant-bot + +# Control +curl -X POST -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents/assistant-bot/stop +curl -X POST -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents/assistant-bot/start +curl -X POST -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents/assistant-bot/restart + +# Logs snapshot +curl -H "Authorization: Bearer $AGENTS_API_KEY" "http://localhost:8487/agents/assistant-bot/logs?n=50" +``` + +### Endpoints SSE + +```bash +# Stream de cambios de estado (stop/start) — un evento por transicion +curl -N -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/sse/status + +# Tail en vivo del logfile de un agente (< 1s de lag) +curl -N -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/sse/agents/assistant-bot/logs +``` + +### En produccion (VPS) + +El VPS expone la API via Traefik con TLS en `agents.organic-machine.com`. +Los pasos DNS + Traefik los configura el humano tras el merge (ver `app.md` seccion Traefik). + +```bash +# Con Traefik configurado: +curl -fsS -H "Authorization: Bearer $AGENTS_API_KEY" https://agents.organic-machine.com/agents +``` + +--- + ## Principio de diseño El proyecto usa el patrón **pure core / impure shell**: diff --git a/app.md b/app.md index dd25d1c..382a047 100644 --- a/app.md +++ b/app.md @@ -2,8 +2,8 @@ name: agents_and_robots lang: go domain: agents -version: 0.1.0 -description: "Plataforma Go de bots autonomos Matrix con arquitectura pure core / impure shell. Launcher, agentctl, dashboard TUI y register." +version: 0.2.0 +description: "Plataforma Go de bots autonomos Matrix con arquitectura pure core / impure shell. 
Launcher, agentctl, dashboard TUI, register y HTTP API REST+SSE." tags: [agents, matrix, bots, llm, element, e2ee, tools, service] uses_functions: [] uses_types: [] @@ -12,8 +12,8 @@ entry_point: "cmd/launcher/main.go" dir_path: "projects/element_agents/apps/agents_and_robots" repo_url: "https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/egutierrez/agents_and_robots.git" service: - port: null - health_endpoint: null + port: 8487 + health_endpoint: /health health_timeout_s: 5 systemd_unit: agents_and_robots.service systemd_scope: system @@ -22,6 +22,21 @@ service: pc_targets: - organic-machine.com is_local_only: false +e2e_checks: + - id: build + cmd: "go build -tags goolm ./..." + timeout_s: 120 + - id: tests + cmd: "go test -tags goolm -count=1 ./internal/api/... ./cmd/launcher/..." + timeout_s: 60 + - id: smoke_health + cmd: "AGENTS_API_KEY=test-e2e-smoke ./bin/launcher --api-port 18487 &" + health: "http://127.0.0.1:18487/health" + timeout_s: 10 + - id: smoke_auth + cmd: "curl -s -o /dev/null -w '%{http_code}' http://127.0.0.1:18487/agents" + expect_stdout_contains: "401" + timeout_s: 5 --- ## Deploy @@ -29,15 +44,56 @@ service: - **VPS:** organic-machine.com (SSH alias: organic-machine.com) - **Remote path:** /home/ubuntu/CodeProyects/agents_and_robots - **Build:** `go build -tags goolm -ldflags="-s -w" -o bin/launcher ./cmd/launcher` -- **Run:** `./bin/launcher --log-level info` +- **Run (manual):** `./bin/launcher --log-level info --api-port 8487` +- **Run (systemd):** `systemctl start agents_and_robots.service` (unit en `systemd/agents_and_robots.service`) - **Binarios:** launcher, agentctl, register, dashboard, verify (en bin/) -- **Proceso:** launcher corre como proceso directo (no systemd) - **Config:** agents/*/config.yaml + .env con tokens Matrix, API keys LLM, pickle keys E2EE +## HTTP API (v0.1) + +Puerto: **8487** (local, Traefik termina TLS en `agents.organic-machine.com`). 
+Auth: `Authorization: Bearer $AGENTS_API_KEY` (32 bytes hex en `.env`). + +| Verbo | Path | Descripcion | +|---|---|---| +| GET | `/health` | Liveness (sin auth) | +| GET | `/agents` | Lista todos los agentes con estado | +| GET | `/agents/{id}` | Detalle + logs recientes | +| POST | `/agents/{id}/start` | Arrancar agente | +| POST | `/agents/{id}/stop` | Parar agente | +| POST | `/agents/{id}/restart` | Stop + Start | +| GET | `/agents/{id}/logs?n=200` | Snapshot de logs | +| GET | `/sse/status` | SSE: diffs de estado cada 2s | +| GET | `/sse/agents/{id}/logs` | SSE: tail -f del logfile | + +**TODO v0.2:** POST `/agents/{id}/message`, PUT `/agents/{id}/config`, SSE messages stream. +**TODO escala:** si se anade un 2do cliente HTTP (otro VPS, otro frontend), reemplazar el pubsub in-memory con NATS o Redis. + +## Systemd + +Unit en `systemd/agents_and_robots.service`. Para instalar en el VPS: + +```bash +# En el VPS (ubuntu@organic-machine.com) +sudo cp systemd/agents_and_robots.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable --now agents_and_robots.service +``` + +Importante: usa `Restart=always` (no `on-failure`) para reiniciar incluso tras exit limpio. + +## Traefik + DNS (post-merge, humano) + +Pasos que el humano completa tras mergear el PR: +1. Crear DNS A record `agents.organic-machine.com` apuntando a la IP del VPS. +2. En Traefik (standalone o Coolify): ruta `agents.organic-machine.com → 127.0.0.1:8487` con HTTPS+LE. +3. Verificar: `curl -fsS https://agents.organic-machine.com/health`. +4. Generar API key: `openssl rand -hex 32` y escribir `AGENTS_API_KEY=` en `.env` del VPS. 
+ ## Notas 4 binarios principales: -- **launcher** — Inicia agentes como goroutines, descubre configs, sync Matrix +- **launcher** — Inicia agentes como goroutines, descubre configs, sync Matrix, HTTP API - **agentctl** — CLI: list, start, stop, remove agentes - **register** — Registra usuarios bot en Synapse via admin API - **dashboard** — TUI interactiva (bubbletea) para gestion de agentes @@ -56,3 +112,4 @@ Una linea por bump SemVer. Bump-type segun `.claude/commands/version.md`: - `patch`: bugfix sin cambio observable. - v0.1.0 (2026-05-18) — baseline. +- v0.2.0 (2026-05-22) — HTTP API REST+SSE (internal/api), systemd unit, --api-port flag en launcher. diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index f86e68b..747c092 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -20,6 +20,7 @@ import ( "github.com/spf13/cobra" "github.com/enmanuel/agents/devagents" + "github.com/enmanuel/agents/internal/api" "github.com/enmanuel/agents/internal/config" "github.com/enmanuel/agents/pkg/decision" "github.com/enmanuel/agents/pkg/orchestration" @@ -28,6 +29,7 @@ import ( agentlog "github.com/enmanuel/agents/shell/logger" orchshell "github.com/enmanuel/agents/shell/orchestration" shellsecurity "github.com/enmanuel/agents/shell/security" + "github.com/enmanuel/agents/shell/process" // Blank imports: each agent self-registers its rules via init(). _ "github.com/enmanuel/agents/agents/assistant-bot" @@ -46,6 +48,8 @@ func main() { configPaths []string logLevel string logDir string + apiPort int + apiKey string ) root := &cobra.Command{ @@ -268,6 +272,28 @@ func main() { scanCancel() } + // ── HTTP API (optional) ── + if apiPort > 0 { + key := apiKey + if key == "" { + key = os.Getenv("AGENTS_API_KEY") + } + if key == "" { + logger.Warn("api-port set but AGENTS_API_KEY is empty — HTTP API disabled (set AGENTS_API_KEY in .env)") + } else { + // Build a process.Manager that reflects the live launcher state. 
+ // The manager uses run/ for PID files and agents/*/config.yaml for discovery. + mgr := newProcessManager(logDir) + srv := api.New(mgr, key, apiPort, logger) + go func() { + if err := srv.Run(ctx); err != nil { + logger.Error("api server stopped", "err", err) + } + }() + logger.Info("http api enabled", "port", apiPort) + } + } + // Supervised loop: wait for all agents, and if the parent context is // still alive (i.e. no SIGINT/SIGTERM received), reload them and keep // going. Protects against the launcher exiting cleanly when all @@ -286,6 +312,10 @@ func main() { "Log level: debug | info | warn | error") root.Flags().StringVar(&logDir, "log-dir", "logs", `Log directory (logs//YYYY-MM-DD.jsonl). Use "stdout" for console only`) + root.Flags().IntVar(&apiPort, "api-port", 0, + "HTTP API port (0 = disabled). Requires AGENTS_API_KEY env var.") + root.Flags().StringVar(&apiKey, "api-key", "", + "HTTP API Bearer key (overrides AGENTS_API_KEY env var)") if err := root.Execute(); err != nil { os.Exit(1) @@ -364,6 +394,12 @@ func newLogger(level string) *slog.Logger { return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: parseLogLevel(level)})) } +// newProcessManager creates a process.Manager scoped to the current working +// directory, used by the HTTP API to reflect the live launcher state. +func newProcessManager(logDir string) *process.Manager { + return process.NewManager("run", "agents/*/config.yaml", "bin/launcher") +} + // isSpecialConfig checks whether a config path belongs to a middleware special // (e.g. orchestrator) by detecting a "special:" top-level key with a non-empty // id. 
This avoids config.Load() failing with "agent.id is required" when the
diff --git a/internal/api/handlers.go b/internal/api/handlers.go
new file mode 100644
index 0000000..3ae2d43
--- /dev/null
+++ b/internal/api/handlers.go
@@ -0,0 +1,282 @@
+package api
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/enmanuel/agents/shell/process"
+)
+
+// --- Response types ---
+
+// AgentResponse is the JSON representation of an agent.
+type AgentResponse struct {
+	ID         string `json:"id"`
+	Name       string `json:"name"`
+	Version    string `json:"version"`
+	Desc       string `json:"desc"`
+	Enabled    bool   `json:"enabled"`
+	Running    bool   `json:"running"`
+	PID        int    `json:"pid,omitempty"`
+	Instances  int    `json:"instances"`
+	ConfigPath string `json:"config_path"`
+}
+
+// AgentDetailResponse extends AgentResponse with logs.
+type AgentDetailResponse struct {
+	AgentResponse
+	Logs []string `json:"logs"`
+}
+
+// agentResponse converts a process.AgentStatus into its JSON representation.
+func agentResponse(s process.AgentStatus) AgentResponse {
+	return AgentResponse{
+		ID:         s.ID,
+		Name:       s.Name,
+		Version:    s.Version,
+		Desc:       s.Desc,
+		Enabled:    s.Enabled,
+		Running:    s.Running,
+		PID:        s.PID,
+		Instances:  s.Instances,
+		ConfigPath: s.ConfigPath,
+	}
+}
+
+// --- Shared helpers ---
+
+// defaultLogLines is how many log lines are returned when ?n= is absent or invalid.
+const defaultLogLines = 200
+
+// restartStopTimeout bounds how long a restart waits for the old process to exit.
+const restartStopTimeout = 3 * time.Second
+
+// logLineCount reads the "n" query parameter; missing, non-numeric or
+// non-positive values fall back to defaultLogLines.
+func logLineCount(r *http.Request) int {
+	if n, err := strconv.Atoi(r.URL.Query().Get("n")); err == nil && n > 0 {
+		return n
+	}
+	return defaultLogLines
+}
+
+// lookupAgent resolves id through agentInfoByID. When it returns nil the error
+// response (500 on scan failure, 404 on unknown id) has already been written.
+func (s *Server) lookupAgent(w http.ResponseWriter, id string) *process.AgentInfo {
+	info, err := agentInfoByID(s.mgr, id)
+	if err != nil {
+		writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
+		return nil
+	}
+	if info == nil {
+		writeError(w, http.StatusNotFound, "agent not found")
+	}
+	return info
+}
+
+// beginSSE switches w to event-stream mode and returns its flusher. It answers
+// 500 and returns false when w cannot stream.
+func beginSSE(w http.ResponseWriter) (http.Flusher, bool) {
+	flusher, ok := w.(http.Flusher)
+	if !ok {
+		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
+		return nil, false
+	}
+	h := w.Header()
+	h.Set("Content-Type", "text/event-stream")
+	h.Set("Cache-Control", "no-cache")
+	h.Set("Connection", "keep-alive")
+	h.Set("X-Accel-Buffering", "no")
+	w.WriteHeader(http.StatusOK)
+	flusher.Flush()
+	return flusher, true
+}
+
+// --- Health ---
+
+// handleHealth answers the unauthenticated liveness probe.
+func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
+	writeJSON(w, http.StatusOK, map[string]string{"status": "ok", "time": time.Now().UTC().Format(time.RFC3339)})
+}
+
+// --- List agents ---
+
+// handleListAgents returns every agent reported by StatusAll.
+func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) {
+	statuses, err := s.mgr.StatusAll()
+	if err != nil {
+		writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
+		return
+	}
+	resp := make([]AgentResponse, 0, len(statuses))
+	for _, st := range statuses {
+		resp = append(resp, agentResponse(st))
+	}
+	writeJSON(w, http.StatusOK, resp)
+}
+
+// --- Get single agent ---
+
+// handleGetAgent returns one agent's status plus its most recent log lines (?n=).
+func (s *Server) handleGetAgent(w http.ResponseWriter, r *http.Request) {
+	id := r.PathValue("id")
+	statuses, err := s.mgr.StatusAll()
+	if err != nil {
+		writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
+		return
+	}
+
+	var found *process.AgentStatus
+	for i := range statuses {
+		if statuses[i].ID == id {
+			found = &statuses[i]
+			break
+		}
+	}
+	if found == nil {
+		writeError(w, http.StatusNotFound, "agent not found")
+		return
+	}
+
+	// Logs are best-effort: an agent that has not logged yet is still a valid
+	// agent, so a LogTail error only leaves the "logs" field empty.
+	logs, _ := s.mgr.LogTail(id, logLineCount(r))
+	writeJSON(w, http.StatusOK, AgentDetailResponse{
+		AgentResponse: agentResponse(*found),
+		Logs:          logs,
+	})
+}
+
+// --- Start agent ---
+
+// handleStartAgent starts the agent named by {id}; Start failures map to 409.
+func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) {
+	id := r.PathValue("id")
+	info := s.lookupAgent(w, id)
+	if info == nil {
+		return
+	}
+	if err := s.mgr.Start(*info); err != nil {
+		writeError(w, http.StatusConflict, fmt.Sprintf("start: %v", err))
+		return
+	}
+	s.logger.Info("agent started via api", "id", id)
+	writeJSON(w, http.StatusOK, map[string]string{"status": "started", "id": id})
+}
+
+// --- Stop agent ---
+
+// handleStopAgent stops the agent named by {id}; Stop failures map to 409.
+func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) {
+	id := r.PathValue("id")
+	if err := s.mgr.Stop(id); err != nil {
+		writeError(w, http.StatusConflict, fmt.Sprintf("stop: %v", err))
+		return
+	}
+	s.logger.Info("agent stopped via api", "id", id)
+	writeJSON(w, http.StatusOK, map[string]string{"status": "stopped", "id": id})
+}
+
+// --- Restart agent ---
+
+// handleRestartAgent stops the agent, waits for it to exit and starts it again.
+func (s *Server) handleRestartAgent(w http.ResponseWriter, r *http.Request) {
+	id := r.PathValue("id")
+
+	// Resolve the agent first so an unknown id is a 404 before anything is stopped.
+	info := s.lookupAgent(w, id)
+	if info == nil {
+		return
+	}
+
+	// Stop first; the error is ignored on purpose because "not running" is an
+	// acceptable starting point for a restart.
+	_ = s.mgr.Stop(id)
+
+	// Poll until the old process is gone. Starting while it is still alive
+	// cannot succeed cleanly, so report 409 once the timeout expires.
+	deadline := time.Now().Add(restartStopTimeout)
+	for s.mgr.IsRunning(id) {
+		if !time.Now().Before(deadline) {
+			writeError(w, http.StatusConflict, fmt.Sprintf("restart: agent did not stop within %s", restartStopTimeout))
+			return
+		}
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	if err := s.mgr.Start(*info); err != nil {
+		writeError(w, http.StatusConflict, fmt.Sprintf("restart/start: %v", err))
+		return
+	}
+	s.logger.Info("agent restarted via api", "id", id)
+	writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id})
+}
+
+// --- Agent logs snapshot ---
+
+// handleAgentLogs returns the last ?n= log lines of an agent; LogTail errors map to 404.
+func (s *Server) handleAgentLogs(w http.ResponseWriter, r *http.Request) {
+	id := r.PathValue("id")
+	logs, err := s.mgr.LogTail(id, logLineCount(r))
+	if err != nil {
+		writeError(w, http.StatusNotFound, fmt.Sprintf("logs: %v", err))
+		return
+	}
+	writeJSON(w, http.StatusOK, map[string]any{"id": id, "lines": logs, "count": len(logs)})
+}
+
+// --- SSE: status broadcast ---
+
+// handleSSEStatus streams every event published on the "status" topic until the client disconnects.
+func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) {
+	flusher, ok := beginSSE(w)
+	if !ok {
+		return
+	}
+
+	sub := s.bus.Subscribe("status")
+	defer s.bus.Unsubscribe("status", sub)
+
+	ctx := r.Context()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case ev, ok := <-sub:
+			if !ok {
+				return
+			}
+			data, err := json.Marshal(ev)
+			if err != nil {
+				s.logger.Warn("sse status: dropping unencodable event", "err", err)
+				continue
+			}
+			if _, err := fmt.Fprintf(w, "event: status\ndata: %s\n\n", data); err != nil {
+				return // the client can no longer be written to
+			}
+			flusher.Flush()
+		}
+	}
+}
+
+// --- SSE: agent log tail ---
+
+// handleSSEAgentLogs streams the agent's logfile via tailLogFile until the client disconnects.
+func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) {
+	id := r.PathValue("id")
+
+	logPath := s.mgr.LogPath(id)
+	if logPath == "" {
+		http.Error(w, "agent not found", http.StatusNotFound)
+		return
+	}
+
+	flusher, ok := beginSSE(w)
+	if !ok {
+		return
+	}
+	tailLogFile(r.Context(), logPath, w, flusher)
+}
diff --git a/internal/api/poller.go b/internal/api/poller.go
new file mode 100644
index 0000000..e3082a6
--- /dev/null
+++ b/internal/api/poller.go
@@ -0,0 +1,91 @@
+package api
+
+import (
+	"context"
+	"time"
+
+	"github.com/enmanuel/agents/shell/process"
+)
+
+// StatusDiff is published to the "status" topic when an agent's running state changes.
+type StatusDiff struct {
+	AgentID   string `json:"agent_id"`
+	OldStatus bool   `json:"old_running"`
+	NewStatus bool   `json:"new_running"`
+	PID       int    `json:"pid,omitempty"`
+}
+
+// pollStatus polls StatusAll every 2s and publishes StatusDiff events on changes.
+func (s *Server) pollStatus(ctx context.Context) {
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	// Seed the previous state map.
+	prev := make(map[string]bool)
+	if statuses, err := s.mgr.StatusAll(); err == nil {
+		for _, st := range statuses {
+			prev[st.ID] = st.Running
+		}
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			s.checkAndPublishDiffs(prev)
+		}
+	}
+}
+
+func (s *Server) checkAndPublishDiffs(prev map[string]bool) {
+	statuses, err := s.mgr.StatusAll()
+	if err != nil {
+		return
+	}
+
+	for _, st := range statuses {
+		old, known := prev[st.ID]
+		if !known || old != st.Running {
+			s.bus.Publish("status", StatusDiff{
+				AgentID:   st.ID,
+				OldStatus: old,
+				NewStatus: st.Running,
+				PID:       st.PID,
+			})
+			prev[st.ID] = st.Running
+		}
+	}
+
+	// Handle agents that were removed (disappeared from scan)
+	current := make(map[string]bool, len(statuses))
+	for _, st := range statuses {
+		current[st.ID] = true
+	}
+	for id, wasRunning := range prev {
+		if !current[id] {
+			if wasRunning {
+				s.bus.Publish("status", StatusDiff{
+					AgentID:   id,
+					OldStatus: true,
+					NewStatus: false,
+				})
+			}
+			delete(prev, id)
+		}
+	}
+}
+
+// agentInfoByID finds AgentInfo by ID in a Scan result; (nil, nil) means no agent has that ID.
+func agentInfoByID(mgr *process.Manager, id string) (*process.AgentInfo, error) {
+	agents, err := mgr.Scan()
+	if err != nil {
+		return nil, err
+	}
+	for i, a := range agents {
+		if a.ID == id {
+			return &agents[i], nil
+		}
+	}
+	return nil, nil
+}
diff --git a/internal/api/pubsub.go b/internal/api/pubsub.go
new file mode 100644
index 0000000..73bb52b
--- /dev/null
+++ b/internal/api/pubsub.go
@@ -0,0 +1,64 @@
+// Package api — in-memory pub/sub bus for SSE broadcast.
+//
+// TODO(v0.2): if a second consumer (e.g. from another VPS) is added,
+// replace this in-memory bus with NATS or Redis pub/sub. For now
+// (1 local client) the overhead of an external broker is unwarranted.
+package api
+
+import (
+	"sync"
+)
+
+// Event is a generic event payload (JSON-serialisable).
+type Event = any
+
+// Bus is a simple in-memory pub/sub hub.
+// Topics are arbitrary strings (e.g. "status", "logs/agent-id").
+type Bus struct {
+	mu   sync.RWMutex
+	subs map[string][]chan Event
+}
+
+// NewBus creates an initialised Bus.
+func NewBus() *Bus {
+	return &Bus{subs: make(map[string][]chan Event)}
+}
+
+// Subscribe returns a channel that receives events published to topic.
+// The channel is buffered (32) to avoid blocking the publisher.
+func (b *Bus) Subscribe(topic string) <-chan Event {
+	ch := make(chan Event, 32)
+	b.mu.Lock()
+	b.subs[topic] = append(b.subs[topic], ch)
+	b.mu.Unlock()
+	return ch
+}
+
+// Unsubscribe removes ch from topic and closes it.
+func (b *Bus) Unsubscribe(topic string, ch <-chan Event) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	list := b.subs[topic]
+	for i, c := range list {
+		if c == ch {
+			close(c)
+			b.subs[topic] = append(list[:i], list[i+1:]...)
+			return
+		}
+	}
+}
+
+// Publish sends ev to all subscribers of topic.
+// Non-blocking: if a subscriber channel is full, the event is dropped for that subscriber.
+func (b *Bus) Publish(topic string, ev Event) {
+	// Keep the read lock while sending so Unsubscribe cannot close or reshuffle channels mid-send.
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	for _, ch := range b.subs[topic] {
+		select {
+		case ch <- ev:
+		default:
+			// drop for this slow subscriber
+		}
+	}
+}
diff --git a/internal/api/server.go b/internal/api/server.go
new file mode 100644
index 0000000..a170bb2
--- /dev/null
+++ b/internal/api/server.go
@@ -0,0 +1,190 @@
+// Package api provides the HTTP API server for agents_and_robots.
+// It exposes REST endpoints for agent management and SSE streams for
+// real-time status and log updates.
+//
+// Auth: every endpoint (except /health) requires:
+//
+//	Authorization: Bearer <AGENTS_API_KEY>
+//
+// with crypto/subtle constant-time comparison.
+package api
+
+import (
+	"context"
+	"crypto/subtle"
+	"encoding/json"
+	"errors"
+	"log/slog"
+	"net"
+	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/enmanuel/agents/shell/process"
+)
+
+// Server is the HTTP API server.
+type Server struct {
+	mgr    *process.Manager
+	apiKey string
+	port   int
+	logger *slog.Logger
+	bus    *Bus
+}
+
+// New creates a new Server. apiKey is compared with subtle.ConstantTimeCompare.
+// If apiKey is empty, Run refuses to start and auth rejects every request.
+func New(mgr *process.Manager, apiKey string, port int, logger *slog.Logger) *Server {
+	if logger == nil {
+		logger = slog.Default()
+	}
+	return &Server{
+		mgr:    mgr,
+		apiKey: apiKey,
+		port:   port,
+		logger: logger.With("component", "api"),
+		bus:    NewBus(),
+	}
+}
+
+// Run starts the HTTP server and blocks until ctx is done.
+// It also starts the status-diff poller that feeds /sse/status.
+// It returns an error without listening when the API key is empty.
+func (s *Server) Run(ctx context.Context) error {
+	// An empty key would make auth compare "" against "" and admit everyone.
+	if s.apiKey == "" {
+		return errors.New("api: empty API key, refusing to start")
+	}
+
+	mux := http.NewServeMux()
+
+	// Public endpoints
+	mux.HandleFunc("GET /health", s.handleHealth)
+
+	// Auth-gated REST endpoints
+	mux.Handle("GET /agents", s.auth(http.HandlerFunc(s.handleListAgents)))
+	mux.Handle("GET /agents/{id}", s.auth(http.HandlerFunc(s.handleGetAgent)))
+	mux.Handle("POST /agents/{id}/start", s.auth(http.HandlerFunc(s.handleStartAgent)))
+	mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent)))
+	mux.Handle("POST /agents/{id}/restart", s.auth(http.HandlerFunc(s.handleRestartAgent)))
+	mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs)))
+
+	// SSE endpoints
+	mux.Handle("GET /sse/status", s.auth(http.HandlerFunc(s.handleSSEStatus)))
+	mux.Handle("GET /sse/agents/{id}/logs", s.auth(http.HandlerFunc(s.handleSSEAgentLogs)))
+
+	addr := ":" + strconv.Itoa(s.port)
+	ln, err := net.Listen("tcp", addr)
+	if err != nil {
+		return err
+	}
+
+	// BaseContext ties request contexts to ctx so long-lived SSE handlers
+	// return on shutdown instead of stalling Shutdown until its timeout.
+	srv := &http.Server{
+		Handler:     s.logMiddleware(mux),
+		ReadTimeout: 10 * time.Second,
+		BaseContext: func(net.Listener) context.Context { return ctx },
+	}
+
+	s.logger.Info("api server listening", "addr", addr)
+
+	// Start the status poller
+	go s.pollStatus(ctx)
+
+	errCh := make(chan error, 1)
+	go func() { errCh <- srv.Serve(ln) }()
+
+	select {
+	case <-ctx.Done():
+		shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		return srv.Shutdown(shutCtx)
+	case err := <-errCh:
+		return err
+	}
+}
+
+// --- Auth middleware ---
+
+// auth admits a request only when its Bearer token equals the configured key;
+// anything else gets a 401 JSON error.
+func (s *Server) auth(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		got := []byte(extractBearerToken(r))
+
+		// Defence in depth: an empty configured key must never match the empty
+		// token of a missing or malformed Authorization header.
+		// ConstantTimeCompare hides the key's content but returns at once on a
+		// length mismatch, so only the key length is observable.
+		if s.apiKey == "" || subtle.ConstantTimeCompare(got, []byte(s.apiKey)) != 1 {
+			writeJSON(w, http.StatusUnauthorized, map[string]string{"error": "unauthorized"})
+			return
+		}
+		next.ServeHTTP(w, r)
+	})
+}
+
+// extractBearerToken returns the token of an "Authorization: Bearer <token>"
+// header, or "" when the header is absent, uses another scheme or has no token.
+func extractBearerToken(r *http.Request) string {
+	h := r.Header.Get("Authorization")
+	if len(h) > 7 && h[:7] == "Bearer " {
+		return h[7:]
+	}
+	return ""
+}
+
+// --- Log middleware ---
+
+// logMiddleware logs method, path, status and duration of every request.
+func (s *Server) logMiddleware(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		start := time.Now()
+		rw := &statusWriter{ResponseWriter: w, code: http.StatusOK}
+		next.ServeHTTP(rw, r)
+		s.logger.Info("http",
+			"method", r.Method,
+			"path", r.URL.Path,
+			"status", rw.code,
+			"duration_ms", time.Since(start).Milliseconds(),
+		)
+	})
+}
+
+// statusWriter records the status code written through it so logMiddleware
+// can log it.
+type statusWriter struct {
+	http.ResponseWriter
+	code int
+}
+
+// WriteHeader remembers code before delegating to the wrapped writer.
+func (sw *statusWriter) WriteHeader(code int) {
+	sw.code = code
+	sw.ResponseWriter.WriteHeader(code)
+}
+
+// Flush forwards to the wrapped writer. Embedding the ResponseWriter interface
+// does not promote Flush, and the SSE handlers type-assert http.Flusher on the
+// writer they receive, so without this every stream behind logMiddleware
+// answers "streaming unsupported".
+func (sw *statusWriter) Flush() {
+	if f, ok := sw.ResponseWriter.(http.Flusher); ok {
+		f.Flush()
+	}
+}
+
+// --- Helpers ---
+
+// writeJSON sends v as a JSON body with the given status code.
+func writeJSON(w http.ResponseWriter, status int, v any) {
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(status)
+	_ = json.NewEncoder(w).Encode(v)
+}
+
+// writeError sends {"error": msg} with the given status code.
+func writeError(w http.ResponseWriter, status int, msg string) {
+	writeJSON(w, status, map[string]string{"error": msg})
+}
diff --git a/internal/api/server_test.go b/internal/api/server_test.go
new file mode 100644
index 0000000..a76674e
--- /dev/null
+++ b/internal/api/server_test.go
@@ -0,0 +1,253 @@
+package api
+
+import (
+	"encoding/json"
+	"io"
+
"net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/enmanuel/agents/shell/process" +) + +// newTestServer creates a Server with a real (temp-dir) Manager and a test API key. +func newTestServer(t *testing.T) *Server { + t.Helper() + dir := t.TempDir() + mgr := process.NewManager(dir+"/run", dir+"/agents/*/config.yaml", "") + return New(mgr, "test-key-abcd1234", 0, nil) +} + +// --- Auth tests --- + +func TestAuthMissingHeader(t *testing.T) { + s := newTestServer(t) + req := httptest.NewRequest(http.MethodGet, "/agents", nil) + w := httptest.NewRecorder() + + s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("handler called despite missing auth") + })).ServeHTTP(w, req) + + if w.Code != http.StatusUnauthorized { + t.Fatalf("expected 401, got %d", w.Code) + } +} + +func TestAuthWrongKey(t *testing.T) { + s := newTestServer(t) + req := httptest.NewRequest(http.MethodGet, "/agents", nil) + req.Header.Set("Authorization", "Bearer wrong-key") + w := httptest.NewRecorder() + + s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("handler called despite wrong key") + })).ServeHTTP(w, req) + + if w.Code != http.StatusUnauthorized { + t.Fatalf("expected 401, got %d", w.Code) + } +} + +func TestAuthCorrectKey(t *testing.T) { + s := newTestServer(t) + req := httptest.NewRequest(http.MethodGet, "/agents", nil) + req.Header.Set("Authorization", "Bearer test-key-abcd1234") + w := httptest.NewRecorder() + + called := false + s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusOK) + })).ServeHTTP(w, req) + + if !called { + t.Fatal("handler not called with valid key") + } + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } +} + +// --- Health endpoint --- + +func TestHealthEndpoint(t *testing.T) { + s := newTestServer(t) + mux := http.NewServeMux() + mux.HandleFunc("GET /health", s.handleHealth) + + req := 
httptest.NewRequest(http.MethodGet, "/health", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + var resp map[string]string + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp["status"] != "ok" { + t.Errorf("expected status=ok, got %q", resp["status"]) + } +} + +// --- List agents --- + +func TestListAgentsEmpty(t *testing.T) { + s := newTestServer(t) + mux := http.NewServeMux() + mux.Handle("GET /agents", s.auth(http.HandlerFunc(s.handleListAgents))) + + req := httptest.NewRequest(http.MethodGet, "/agents", nil) + req.Header.Set("Authorization", "Bearer test-key-abcd1234") + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + body, _ := io.ReadAll(w.Body) + // With empty agents dir, should return empty JSON array. + trimmed := strings.TrimSpace(string(body)) + if trimmed != "null" && trimmed != "[]" { + // Accept both null and [] for empty slice serialisation. + t.Logf("body: %s (acceptable)", trimmed) + } +} + +// --- Bus tests --- + +func TestBusSubscribePublish(t *testing.T) { + b := NewBus() + ch := b.Subscribe("test") + defer b.Unsubscribe("test", ch) + + b.Publish("test", "hello") + + select { + case ev := <-ch: + if ev != "hello" { + t.Fatalf("expected 'hello', got %v", ev) + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } +} + +func TestBusUnsubscribe(t *testing.T) { + b := NewBus() + ch := b.Subscribe("test") + b.Unsubscribe("test", ch) + + // After unsubscribe, channel should be closed. 
+	select {
+	case _, ok := <-ch:
+		if ok {
+			t.Fatal("channel should be closed after unsubscribe")
+		}
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("channel not closed after unsubscribe")
+	}
+}
+
+func TestBusMultipleSubscribers(t *testing.T) {
+	b := NewBus()
+	ch1 := b.Subscribe("x")
+	ch2 := b.Subscribe("x")
+	defer b.Unsubscribe("x", ch1)
+	defer b.Unsubscribe("x", ch2)
+
+	b.Publish("x", 42)
+
+	for _, ch := range []<-chan Event{ch1, ch2} {
+		select {
+		case ev := <-ch:
+			if ev != 42 {
+				t.Fatalf("expected 42, got %v", ev)
+			}
+		case <-time.After(time.Second):
+			t.Fatal("timeout")
+		}
+	}
+}
+
+// --- Get agent not found ---
+
+func TestGetAgentNotFound(t *testing.T) {
+	s := newTestServer(t)
+	mux := http.NewServeMux()
+	mux.Handle("GET /agents/{id}", s.auth(http.HandlerFunc(s.handleGetAgent)))
+
+	req := httptest.NewRequest(http.MethodGet, "/agents/nonexistent", nil)
+	req.Header.Set("Authorization", "Bearer test-key-abcd1234")
+	w := httptest.NewRecorder()
+	mux.ServeHTTP(w, req)
+
+	if w.Code != http.StatusNotFound {
+		t.Fatalf("expected 404, got %d", w.Code)
+	}
+}
+
+// --- Stop agent not running ---
+
+func TestStopAgentNotRunning(t *testing.T) {
+	s := newTestServer(t)
+	mux := http.NewServeMux()
+	mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent)))
+
+	req := httptest.NewRequest(http.MethodPost, "/agents/ghost/stop", nil)
+	req.Header.Set("Authorization", "Bearer test-key-abcd1234")
+	w := httptest.NewRecorder()
+	mux.ServeHTTP(w, req)
+
+	// stop returns Conflict when the agent is not running
+	if w.Code != http.StatusConflict {
+		t.Fatalf("expected 409, got %d", w.Code)
+	}
+}
+
+// --- Logs endpoint ---
+
+func TestLogsEndpointNotFound(t *testing.T) {
+	s := newTestServer(t)
+	mux := http.NewServeMux()
+	mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs)))
+
+	req := httptest.NewRequest(http.MethodGet, "/agents/nonexistent/logs?n=10", nil)
+	req.Header.Set("Authorization", "Bearer test-key-abcd1234")
+	w := httptest.NewRecorder()
+	mux.ServeHTTP(w, req)
+
+	if w.Code != http.StatusNotFound {
+		t.Fatalf("expected 404, got %d", w.Code)
+	}
+}
+
+// --- extractBearerToken ---
+
+func TestExtractBearerToken(t *testing.T) {
+	cases := []struct {
+		header string
+		want   string
+	}{
+		{"Bearer abc123", "abc123"},
+		{"bearer abc123", ""}, // case sensitive
+		{"Basic abc123", ""},
+		{"", ""},
+		{"Bearer ", ""},
+	}
+	for _, tc := range cases {
+		req := httptest.NewRequest(http.MethodGet, "/", nil)
+		if tc.header != "" {
+			req.Header.Set("Authorization", tc.header)
+		}
+		got := extractBearerToken(req)
+		if got != tc.want {
+			t.Errorf("header=%q: got=%q want=%q", tc.header, got, tc.want)
+		}
+	}
+}
diff --git a/internal/api/tail.go b/internal/api/tail.go
new file mode 100644
index 0000000..8b986e7
--- /dev/null
+++ b/internal/api/tail.go
@@ -0,0 +1,115 @@
+package api
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+)
+
+// tailLogFile streams the log file at path to w as SSE "log" events, one event
+// per complete line.
+//
+// Existing content is sent first (for files larger than ~50 KB only roughly the
+// last 50 KB, starting at a line boundary), then the file is polled every 200ms
+// for appended data. If the file does not exist yet, tailLogFile waits for it
+// to appear. A trailing fragment that is not yet terminated by '\n' is held
+// back until the rest of the line is written, so a line caught mid-write is
+// never split across two events.
+//
+// Blocks until ctx is done or a write to w fails (client gone).
+func tailLogFile(ctx context.Context, path string, w http.ResponseWriter, flusher http.Flusher) {
+	f, err := os.Open(path)
+	if err != nil {
+		// File may not exist yet (agent hasn't written any logs).
+		// Wait for it to appear.
+		if f = waitForFile(ctx, path); f == nil {
+			return // ctx cancelled before file appeared
+		}
+	}
+	defer f.Close()
+
+	// For large files start ~50 KB before the end to avoid dumping the whole
+	// file: this gives "recent context" without overwhelming the SSE stream.
+	// Stat/Seek are best-effort; if either fails we stream from the beginning.
+	const tailBytes = 50 * 1024
+	seeked := false
+	if info, err := f.Stat(); err == nil && info.Size() > tailBytes {
+		if _, err := f.Seek(-tailBytes, io.SeekEnd); err == nil {
+			seeked = true
+		}
+	}
+
+	reader := bufio.NewReader(f)
+	if seeked {
+		// The seek most likely landed mid-line: drop the incomplete first line.
+		_, _ = reader.ReadString('\n')
+	}
+
+	// pending holds the unterminated tail of the last read, if any.
+	var pending string
+
+	// emit sends every complete line currently readable as one SSE event and
+	// flushes once per batch. It reports false when writing to w failed.
+	emit := func() bool {
+		wrote := false
+		for {
+			chunk, err := reader.ReadString('\n')
+			pending += chunk
+			if err != nil {
+				// io.EOF means no more data right now; keep the fragment (if
+				// any) in pending and retry on the next tick.
+				break
+			}
+			line := strings.TrimRight(pending, "\r\n")
+			pending = ""
+			if _, werr := fmt.Fprintf(w, "event: log\ndata: %s\n\n", line); werr != nil {
+				return false
+			}
+			wrote = true
+		}
+		if wrote {
+			flusher.Flush()
+		}
+		return true
+	}
+
+	// Replay existing content immediately, then poll for new bytes every 200ms.
+	if !emit() {
+		return
+	}
+	ticker := time.NewTicker(200 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if !emit() {
+				return
+			}
+		}
+	}
+}
+
+// waitForFile polls every 500ms until path can be opened or ctx is done.
+// It returns the opened file, or nil if ctx was cancelled first.
+func waitForFile(ctx context.Context, path string) *os.File {
+	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-ticker.C:
+			f, err := os.Open(path)
+			if err == nil {
+				return f
+			}
+		}
+	}
+}
diff --git a/systemd/agents_and_robots.service b/systemd/agents_and_robots.service
new file mode 100644
index 0000000..ebba264
--- /dev/null
+++ b/systemd/agents_and_robots.service
@@ -0,0 +1,14 @@
+[Unit]
+Description=agents_and_robots — Matrix bot platform launcher
+After=network.target
+
+[Service]
+Type=simple
+WorkingDirectory=/home/ubuntu/CodeProyects/agents_and_robots
+EnvironmentFile=/home/ubuntu/CodeProyects/agents_and_robots/.env
+ExecStart=/home/ubuntu/CodeProyects/agents_and_robots/bin/launcher --log-level info --api-port 8487
+Restart=always
+RestartSec=5
+
+[Install]
+WantedBy=multi-user.target