feat(api): per-agent unified control + clear_memory + delete_cache

- Manager: RegisterUnifiedAgent/UnregisterUnifiedAgent/StopUnifiedAgent/
  IsUnifiedAgentRunning/UptimeSeconds — cancela goroutines individuales sin
  matar el launcher
- Manager: UptimeSeconds en AgentStatus via startedAt map
- api/server: AgentController interface + WithController/WithDataDir builders
  + rutas POST /agents/{id}/clear_memory y /agents/{id}/delete_cache
- api/handlers: handleStartAgent/Stop/Restart delegan a controller en modo
  unified; Messages24h enriquecido via queryMessages24h (cache 30s)
- api/handlers: handleClearMemory — para la goroutine, borra messages+facts de
  memory.db, responde {status,messages_deleted,facts_deleted}
- api/handlers: handleDeleteCache — para la goroutine, elimina crypto/ y cache/,
  responde {status,paths_deleted}
- launcher/registry: launchGoroutine extrae goroutine con contexto per-agente;
  deps.procMgr hookea RegisterUnified; startAgent permite relanzar via reload
- launcher/main: agentController implementa api.AgentController sobre registry;
  mgr compartido entre API y registry; WithController+WithDataDir cableados

Co-Authored-By: fn-orquestador <noreply@fn-registry>
This commit is contained in:
2026-05-22 22:56:46 +02:00
parent 3db4443b65
commit 261f96f71b
5 changed files with 482 additions and 52 deletions
+270 -19
View File
@@ -1,28 +1,35 @@
package api
import (
"database/sql"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/enmanuel/agents/shell/process"
_ "modernc.org/sqlite" // pure-Go SQLite driver (same as launcher)
)
// --- Response types ---
// AgentResponse is the JSON representation of an agent.
type AgentResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Desc string `json:"desc"`
Enabled bool `json:"enabled"`
Running bool `json:"running"`
PID int `json:"pid,omitempty"`
Instances int `json:"instances"`
ConfigPath string `json:"config_path"`
ID string `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Desc string `json:"desc"`
Enabled bool `json:"enabled"`
Running bool `json:"running"`
PID int `json:"pid,omitempty"`
Instances int `json:"instances"`
ConfigPath string `json:"config_path"`
UptimeSeconds int64 `json:"uptime_seconds"`
Messages24h int `json:"messages_24h"`
}
// AgentDetailResponse extends AgentResponse with logs.
@@ -31,20 +38,71 @@ type AgentDetailResponse struct {
Logs []string `json:"logs"`
}
// msg24hCache caches messages_24h counts per agent to avoid hammering SQLite.
type msg24hEntry struct {
count int
fetchAt time.Time
}
var (
msg24hMu sync.Mutex
msg24hCache = make(map[string]msg24hEntry)
msg24hTTL = 30 * time.Second
)
func agentResponse(s process.AgentStatus) AgentResponse {
return AgentResponse{
ID: s.ID,
Name: s.Name,
Version: s.Version,
Desc: s.Desc,
Enabled: s.Enabled,
Running: s.Running,
PID: s.PID,
Instances: s.Instances,
ConfigPath: s.ConfigPath,
ID: s.ID,
Name: s.Name,
Version: s.Version,
Desc: s.Desc,
Enabled: s.Enabled,
Running: s.Running,
PID: s.PID,
Instances: s.Instances,
ConfigPath: s.ConfigPath,
UptimeSeconds: s.UptimeSeconds,
}
}
// queryMessages24h returns the count of messages in the past 24h for the given agent.
// Uses a 30s cache keyed by agentID. dataDir is the base data directory
// (e.g. "agents/<id>/data"). Returns 0 on error (non-fatal).
func queryMessages24h(agentID, dataDir string) int {
msg24hMu.Lock()
if e, ok := msg24hCache[agentID]; ok && time.Since(e.fetchAt) < msg24hTTL {
msg24hMu.Unlock()
return e.count
}
msg24hMu.Unlock()
dbPath := filepath.Join(dataDir, "memory.db")
if _, err := os.Stat(dbPath); err != nil {
return 0 // DB does not exist yet
}
db, err := sql.Open("sqlite", dbPath+"?mode=ro&_query_only=1")
if err != nil {
return 0
}
defer db.Close()
var count int
row := db.QueryRow(
"SELECT COUNT(*) FROM messages WHERE agent_id=? AND created_at > datetime('now','-24 hours')",
agentID,
)
if err := row.Scan(&count); err != nil {
return 0
}
msg24hMu.Lock()
msg24hCache[agentID] = msg24hEntry{count: count, fetchAt: time.Now()}
msg24hMu.Unlock()
return count
}
// --- Health ---
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
@@ -72,7 +130,13 @@ func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) {
}
resp := make([]AgentResponse, 0, len(statuses))
for _, st := range statuses {
resp = append(resp, agentResponse(st))
ar := agentResponse(st)
// Enrich with messages_24h when dataDir is configured
if s.dataDir != "" {
agentDataDir := filepath.Join(s.dataDir, st.ID, "data")
ar.Messages24h = queryMessages24h(st.ID, agentDataDir)
}
resp = append(resp, ar)
}
writeJSON(w, http.StatusOK, resp)
}
@@ -117,6 +181,19 @@ func (s *Server) handleGetAgent(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
// Unified mode: delegate to AgentController if available
if s.mgr.IsUnifiedRunning() && s.controller != nil {
if err := s.controller.StartUnifiedAgent(id); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("start (unified): %v", err))
return
}
s.logger.Info("agent started via api (unified)", "id", id)
writeJSON(w, http.StatusOK, map[string]string{"status": "started", "id": id, "mode": "unified"})
return
}
// Multi-process mode: use per-agent process launch
agents, err := s.mgr.Scan()
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
@@ -147,6 +224,19 @@ func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
// Unified mode: cancel goroutine context without killing launcher
if s.mgr.IsUnifiedRunning() && s.controller != nil {
if err := s.controller.StopUnifiedAgent(id); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("stop (unified): %v", err))
return
}
s.logger.Info("agent stopped via api (unified)", "id", id)
writeJSON(w, http.StatusOK, map[string]string{"status": "stopped", "id": id, "mode": "unified"})
return
}
// Multi-process mode
if err := s.mgr.Stop(id); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("stop: %v", err))
return
@@ -160,6 +250,24 @@ func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleRestartAgent(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
// Unified mode: stop goroutine then re-launch
if s.mgr.IsUnifiedRunning() && s.controller != nil {
// Stop (ignore not-running error)
_ = s.controller.StopUnifiedAgent(id)
// Brief pause to let goroutine exit cleanly
time.Sleep(500 * time.Millisecond)
if err := s.controller.StartUnifiedAgent(id); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("restart/start (unified): %v", err))
return
}
s.logger.Info("agent restarted via api (unified)", "id", id)
writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id, "mode": "unified"})
return
}
// Multi-process mode
// Stop first (ignore not-running error)
_ = s.mgr.Stop(id)
@@ -267,6 +375,149 @@ func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) {
}
}
// --- Clear memory ---
func (s *Server) handleClearMemory(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
// Determine whether restart after clear is requested.
restart := r.URL.Query().Get("restart") == "true"
// In unified mode, stop the agent goroutine before touching its DB.
wasRunning := false
if s.mgr.IsUnifiedRunning() && s.controller != nil {
wasRunning = s.mgr.IsUnifiedAgentRunning(id)
if wasRunning {
if err := s.controller.StopUnifiedAgent(id); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("clear_memory/stop: %v", err))
return
}
// Give goroutine a moment to release the DB.
time.Sleep(300 * time.Millisecond)
}
}
// Locate the agent's memory.db.
if s.dataDir == "" {
writeError(w, http.StatusInternalServerError, "data_dir not configured on server")
return
}
dbPath := filepath.Join(s.dataDir, id, "data", "memory.db")
if _, err := os.Stat(dbPath); err != nil {
// No memory.db — still a success (nothing to clear).
writeJSON(w, http.StatusOK, map[string]any{
"status": "cleared",
"messages_deleted": 0,
"facts_deleted": 0,
})
return
}
db, err := sql.Open("sqlite", dbPath)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("open memory.db: %v", err))
return
}
defer db.Close()
var msgDel, factsDel int64
res, err := db.ExecContext(r.Context(), "DELETE FROM messages WHERE agent_id=?", id)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("delete messages: %v", err))
return
}
msgDel, _ = res.RowsAffected()
res, err = db.ExecContext(r.Context(), "DELETE FROM facts WHERE agent_id=?", id)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("delete facts: %v", err))
return
}
factsDel, _ = res.RowsAffected()
// Invalidate the 24h cache entry for this agent.
msg24hMu.Lock()
delete(msg24hCache, id)
msg24hMu.Unlock()
s.logger.Info("agent memory cleared via api", "id", id,
"messages_deleted", msgDel, "facts_deleted", factsDel)
// Optionally restart.
if (restart || wasRunning) && s.mgr.IsUnifiedRunning() && s.controller != nil {
_ = s.controller.StartUnifiedAgent(id)
}
writeJSON(w, http.StatusOK, map[string]any{
"status": "cleared",
"messages_deleted": msgDel,
"facts_deleted": factsDel,
})
}
// --- Delete cache ---
func (s *Server) handleDeleteCache(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
restart := r.URL.Query().Get("restart") == "true"
// Stop in unified mode before removing crypto dir.
wasRunning := false
if s.mgr.IsUnifiedRunning() && s.controller != nil {
wasRunning = s.mgr.IsUnifiedAgentRunning(id)
if wasRunning {
if err := s.controller.StopUnifiedAgent(id); err != nil {
writeError(w, http.StatusConflict, fmt.Sprintf("delete_cache/stop: %v", err))
return
}
time.Sleep(300 * time.Millisecond)
}
}
if s.dataDir == "" {
writeError(w, http.StatusInternalServerError, "data_dir not configured on server")
return
}
agentDataDir := filepath.Join(s.dataDir, id, "data")
var deleted []string
// Remove crypto directory (session keys, verification cache).
cryptoDir := filepath.Join(agentDataDir, "crypto")
if _, err := os.Stat(cryptoDir); err == nil {
if err := os.RemoveAll(cryptoDir); err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("remove crypto: %v", err))
return
}
deleted = append(deleted, cryptoDir)
}
// Remove cache directory contents (but keep the dir itself).
cacheDir := filepath.Join(agentDataDir, "cache")
if entries, err := os.ReadDir(cacheDir); err == nil {
for _, e := range entries {
p := filepath.Join(cacheDir, e.Name())
if err := os.RemoveAll(p); err == nil {
deleted = append(deleted, p)
}
}
}
s.logger.Info("agent cache deleted via api", "id", id, "paths", len(deleted))
// Optionally restart.
if (restart || wasRunning) && s.mgr.IsUnifiedRunning() && s.controller != nil {
_ = s.controller.StartUnifiedAgent(id)
}
writeJSON(w, http.StatusOK, map[string]any{
"status": "cleared",
"paths_deleted": deleted,
})
}
// --- SSE: agent log tail ---
func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) {
+34 -5
View File
@@ -22,13 +22,28 @@ import (
"github.com/enmanuel/agents/shell/process"
)
// AgentController is an optional interface for per-agent unified-mode control.
// The launcher can implement this to allow the API to stop/start individual
// agent goroutines without restarting the whole process.
type AgentController interface {
// StopUnifiedAgent cancels the goroutine context for the agent with the given ID.
// Returns an error if the agent is not currently running in unified mode.
StopUnifiedAgent(id string) error
// StartUnifiedAgent re-launches the agent goroutine for the given ID.
// Returns an error if the agent is not registered.
StartUnifiedAgent(id string) error
}
// Server is the HTTP API server.
type Server struct {
mgr *process.Manager
apiKey string
port int
logger *slog.Logger
bus *Bus
mgr *process.Manager
apiKey string
port int
logger *slog.Logger
bus *Bus
controller AgentController // optional: per-agent unified control (nil = not available)
// dataDir is the base directory for agent runtime data used for memory/cache queries.
dataDir string
}
// New creates a new Server. apiKey is compared with subtle.ConstantTimeCompare.
@@ -46,6 +61,18 @@ func New(mgr *process.Manager, apiKey string, port int, logger *slog.Logger) *Se
}
}
// WithController attaches an AgentController for unified-mode per-agent control.
func (s *Server) WithController(c AgentController) *Server {
s.controller = c
return s
}
// WithDataDir sets the base directory for agent runtime data (memory.db, crypto/).
func (s *Server) WithDataDir(dir string) *Server {
s.dataDir = dir
return s
}
// Run starts the HTTP server and blocks until ctx is done.
// It also starts the status-diff poller that feeds /sse/status.
func (s *Server) Run(ctx context.Context) error {
@@ -61,6 +88,8 @@ func (s *Server) Run(ctx context.Context) error {
mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent)))
mux.Handle("POST /agents/{id}/restart", s.auth(http.HandlerFunc(s.handleRestartAgent)))
mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs)))
mux.Handle("POST /agents/{id}/clear_memory", s.auth(http.HandlerFunc(s.handleClearMemory)))
mux.Handle("POST /agents/{id}/delete_cache", s.auth(http.HandlerFunc(s.handleDeleteCache)))
// SSE endpoints
mux.Handle("GET /sse/status", s.auth(http.HandlerFunc(s.handleSSEStatus)))