Files
agents_and_robots/internal/api/handlers.go
T
egutierrez 3db4443b65 fix(sse): initial ping + periodic heartbeat unblocks "connecting" state
SSE clients (agents_dashboard) consider the stream connected only after
receiving the first byte of body. The previous implementation flushed
headers and then blocked waiting for status diffs (sse_status) or log
lines (sse_agents_logs) — which could be silent for minutes. UI sat
on "connecting" indefinitely.

Fix:
- After WriteHeader + Flush, emit a ": ping\n\n" comment (a valid no-op
  per the SSE spec) and flush. This unblocks the client's fgets
  immediately, so the state flips to "connected" in < 1s.
- Add a 15s ticker emitting ": ping\n\n" so idle streams stay alive
  through Traefik / CDN proxies and clients can detect dead servers.
- Same treatment for /sse/status and /sse/agents/{id}/logs (tail.go).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:42:29 +02:00

300 lines
7.5 KiB
Go

package api
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"github.com/enmanuel/agents/shell/process"
)
// --- Response types ---
// AgentResponse is the JSON representation of an agent.
// Its fields are filled one-to-one from a process.AgentStatus by
// agentResponse; the struct tags define the keys used on the wire.
type AgentResponse struct {
// ID carries the same identifier that handlers compare against the {id}
// path value.
ID string `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Desc string `json:"desc"`
Enabled bool `json:"enabled"`
Running bool `json:"running"`
// PID is left out of the encoded object when it is zero (omitempty).
PID int `json:"pid,omitempty"`
Instances int `json:"instances"`
ConfigPath string `json:"config_path"`
}
// AgentDetailResponse extends AgentResponse with logs.
// AgentResponse is embedded without a JSON tag, so encoding/json emits its
// fields at the top level of the object, next to "logs".
type AgentDetailResponse struct {
AgentResponse
// Logs holds the log lines that handleGetAgent obtains from LogTail.
Logs []string `json:"logs"`
}
// agentResponse converts a process.AgentStatus into the AgentResponse sent to
// API clients, copying each exposed field unchanged.
func agentResponse(s process.AgentStatus) AgentResponse {
	var out AgentResponse

	out.ID = s.ID
	out.Name = s.Name
	out.Version = s.Version
	out.Desc = s.Desc
	out.Enabled = s.Enabled
	out.Running = s.Running
	out.PID = s.PID
	out.Instances = s.Instances
	out.ConfigPath = s.ConfigPath

	return out
}
// --- Health ---
// handleHealth answers 200 with a fixed "ok" status and the current UTC time
// formatted as RFC 3339.
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
	now := time.Now().UTC().Format(time.RFC3339)
	body := map[string]string{
		"status": "ok",
		"time":   now,
	}
	writeJSON(w, http.StatusOK, body)
}
// statusAllAuto returns agent statuses using the source that matches the
// current runtime mode.
//
// In unified mode every agent runs as a goroutine under one launcher process,
// so no per-agent PID files exist and StatusAll would report Running=false;
// StatusAllUnified reflects the real state. Outside unified mode the
// multi-process StatusAll is used.
func (s *Server) statusAllAuto() ([]process.AgentStatus, error) {
	if !s.mgr.IsUnifiedRunning() {
		return s.mgr.StatusAll()
	}
	return s.mgr.StatusAllUnified()
}
// --- List agents ---
// handleListAgents writes the status of every agent as a JSON array of
// AgentResponse. A failure to collect statuses is reported as 500.
func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) {
	statuses, err := s.statusAllAuto()
	if err != nil {
		msg := fmt.Sprintf("scan: %v", err)
		writeError(w, http.StatusInternalServerError, msg)
		return
	}

	// Sized up front so an empty result still encodes as [] rather than null.
	out := make([]AgentResponse, len(statuses))
	for i := range statuses {
		out[i] = agentResponse(statuses[i])
	}
	writeJSON(w, http.StatusOK, out)
}
// --- Get single agent ---
// handleGetAgent writes the status of the agent named by the {id} path value
// together with the tail of its log.
//
// Query parameters:
//   - n: number of log lines to return; defaults to 200 when absent, not a
//     number, or not positive.
//
// Responses:
//   - 200 with an AgentDetailResponse on success.
//   - 404 when no agent has the requested id.
//   - 500 when the agent statuses cannot be collected.
func (s *Server) handleGetAgent(w http.ResponseWriter, r *http.Request) {
	id := r.PathValue("id")

	statuses, err := s.statusAllAuto()
	if err != nil {
		writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
		return
	}

	// Index into the slice instead of ranging by value so no status struct is
	// copied just to compare its ID.
	var found *process.AgentStatus
	for i := range statuses {
		if statuses[i].ID == id {
			found = &statuses[i]
			break
		}
	}
	if found == nil {
		writeError(w, http.StatusNotFound, "agent not found")
		return
	}

	n := 200
	if qn := r.URL.Query().Get("n"); qn != "" {
		if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 {
			n = parsed
		}
	}

	// Logs are best-effort here: failing to read them must not fail the
	// detail view. On error (or a nil result) send an empty list so that
	// "logs" is always encoded as a JSON array and never as null.
	logs, err := s.mgr.LogTail(id, n)
	if err != nil || logs == nil {
		logs = []string{}
	}

	writeJSON(w, http.StatusOK, AgentDetailResponse{
		AgentResponse: agentResponse(*found),
		Logs:          logs,
	})
}
// --- Start agent ---
// handleStartAgent starts the agent named by the {id} path value.
//
// Responses:
//   - 200 with {"status": "started", "id": <id>} on success.
//   - 404 when the scan finds no agent with that id.
//   - 409 when the manager refuses to start it.
//   - 500 when the scan itself fails.
func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) {
	id := r.PathValue("id")

	known, err := s.mgr.Scan()
	if err != nil {
		msg := fmt.Sprintf("scan: %v", err)
		writeError(w, http.StatusInternalServerError, msg)
		return
	}

	// Locate the requested agent by position; -1 means "not present".
	idx := -1
	for i := range known {
		if known[i].ID == id {
			idx = i
			break
		}
	}
	if idx < 0 {
		writeError(w, http.StatusNotFound, "agent not found")
		return
	}

	startErr := s.mgr.Start(known[idx])
	if startErr != nil {
		writeError(w, http.StatusConflict, fmt.Sprintf("start: %v", startErr))
		return
	}

	s.logger.Info("agent started via api", "id", id)
	resp := map[string]string{"status": "started", "id": id}
	writeJSON(w, http.StatusOK, resp)
}
// --- Stop agent ---
// handleStopAgent stops the agent named by the {id} path value. Any error
// from the manager is reported as 409; success is answered with 200 and
// {"status": "stopped", "id": <id>}.
func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) {
	id := r.PathValue("id")

	err := s.mgr.Stop(id)
	if err != nil {
		msg := fmt.Sprintf("stop: %v", err)
		writeError(w, http.StatusConflict, msg)
		return
	}

	s.logger.Info("agent stopped via api", "id", id)
	resp := map[string]string{"status": "stopped", "id": id}
	writeJSON(w, http.StatusOK, resp)
}
// --- Restart agent ---
// handleRestartAgent stops and then starts the agent named by the {id} path
// value.
//
// The agent is looked up before anything is stopped, so a request that is
// going to fail with 404 (unknown id) or 500 (scan failure) never leaves an
// agent stopped as a side effect.
//
// Responses:
//   - 200 with {"status": "restarted", "id": <id>} on success.
//   - 404 when the scan finds no agent with that id.
//   - 409 when the manager refuses to start it again.
//   - 500 when the scan itself fails.
func (s *Server) handleRestartAgent(w http.ResponseWriter, r *http.Request) {
	const (
		stopTimeout  = 3 * time.Second        // how long to wait for the process to die
		pollInterval = 200 * time.Millisecond // delay between IsRunning checks
	)

	id := r.PathValue("id")

	// Find agent info for Start before touching the running process.
	agents, err := s.mgr.Scan()
	if err != nil {
		writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
		return
	}
	var info *process.AgentInfo
	for i := range agents {
		if agents[i].ID == id {
			info = &agents[i]
			break
		}
	}
	if info == nil {
		writeError(w, http.StatusNotFound, "agent not found")
		return
	}

	// Stop first (ignore not-running error): restarting an agent that is
	// not currently running is allowed and simply starts it.
	_ = s.mgr.Stop(id)

	// Wait up to stopTimeout for the process to die. The wait is deliberately
	// not tied to the request context: once the agent has been stopped the
	// restart should still complete even if the client goes away.
	deadline := time.Now().Add(stopTimeout)
	for time.Now().Before(deadline) && s.mgr.IsRunning(id) {
		time.Sleep(pollInterval)
	}

	if err := s.mgr.Start(*info); err != nil {
		writeError(w, http.StatusConflict, fmt.Sprintf("restart/start: %v", err))
		return
	}

	s.logger.Info("agent restarted via api", "id", id)
	writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id})
}
// --- Agent logs snapshot ---
// handleAgentLogs writes a snapshot of the last lines of an agent's log.
//
// The agent is named by the {id} path value. The optional "n" query
// parameter selects how many lines to return and falls back to 200 when it is
// absent, not a number, or not positive. A LogTail error is reported as 404;
// otherwise the response is 200 with the id, the lines, and their count.
func (s *Server) handleAgentLogs(w http.ResponseWriter, r *http.Request) {
	id := r.PathValue("id")

	limit := 200
	if raw := r.URL.Query().Get("n"); raw != "" {
		v, convErr := strconv.Atoi(raw)
		if convErr == nil && v > 0 {
			limit = v
		}
	}

	lines, err := s.mgr.LogTail(id, limit)
	if err != nil {
		msg := fmt.Sprintf("logs: %v", err)
		writeError(w, http.StatusNotFound, msg)
		return
	}

	payload := map[string]any{
		"id":    id,
		"lines": lines,
		"count": len(lines),
	}
	writeJSON(w, http.StatusOK, payload)
}
// --- SSE: status broadcast ---
// handleSSEStatus streams events from the "status" bus topic to the client as
// Server-Sent Events.
//
// Each bus event is JSON-encoded and sent as an "event: status" message. An
// SSE comment (": ping") is sent once right after the headers and then every
// 15 seconds. The handler returns when the request context is cancelled, the
// subscription channel is closed, or a write to the client fails. If the
// ResponseWriter cannot flush, it answers 500 instead of streaming.
func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	w.WriteHeader(http.StatusOK)

	// Initial ping: SSE clients consider the stream "connected" only after
	// receiving the first byte of body. Without this, agents_dashboard sits
	// on "connecting" until the first status diff (which can be minutes away).
	// A failed write means the client is already gone, so do not subscribe.
	if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
		return
	}
	flusher.Flush()

	sub := s.bus.Subscribe("status")
	defer s.bus.Unsubscribe("status", sub)

	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return

		case <-ticker.C:
			// Periodic heartbeat: keeps proxies (Traefik, CDN) from closing
			// the idle connection and lets the client detect dead servers.
			if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
				return
			}
			flusher.Flush()

		case ev, ok := <-sub:
			if !ok {
				return
			}
			data, err := json.Marshal(ev)
			if err != nil {
				// An event that cannot be encoded is dropped rather than
				// sent as an empty "data:" line; the stream stays open for
				// later events. Info is the logger method this file already
				// uses, with string key/value pairs.
				s.logger.Info("sse status: event dropped, marshal failed", "error", err.Error())
				continue
			}
			// Stop on write failure, same as the heartbeat branch.
			if _, err := fmt.Fprintf(w, "event: status\ndata: %s\n\n", data); err != nil {
				return
			}
			flusher.Flush()
		}
	}
}
// --- SSE: agent log tail ---
// handleSSEAgentLogs streams the log file of the agent named by the {id} path
// value as Server-Sent Events.
//
// It answers 404 when the manager has no log path for the id and 500 when the
// ResponseWriter cannot flush. Otherwise it sends the SSE headers and one
// ": ping" comment, then hands the stream to tailLogFile for the rest of the
// request.
func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) {
	id := r.PathValue("id")

	logPath := s.mgr.LogPath(id)
	if logPath == "" {
		http.Error(w, "agent not found", http.StatusNotFound)
		return
	}

	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("X-Accel-Buffering", "no")
	w.WriteHeader(http.StatusOK)

	// First body bytes: lets the client leave its "connecting" state right
	// away even if the log file stays silent for a while.
	fmt.Fprint(w, ": ping\n\n")
	flusher.Flush()

	tailLogFile(r.Context(), logPath, w, flusher)
}