diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index 747c092..c52eb31 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -116,14 +116,18 @@ func main() { logger.Info("orchestrator initialized") } + // ── Process manager (shared: API reflection + per-agent goroutine hooks) ── + mgr := newProcessManager(logDir) + // ── Shared dependencies for agent registry ── deps := &launchDeps{ - agentBus: agentBus, - orch: orch, - logDir: logDir, - logLevel: lvl, - parentCtx: ctx, - secPolicy: secPolicy, + agentBus: agentBus, + orch: orch, + logDir: logDir, + logLevel: lvl, + parentCtx: ctx, + secPolicy: secPolicy, + procMgr: mgr, } registry := newAgentRegistry(deps) @@ -281,10 +285,11 @@ func main() { if key == "" { logger.Warn("api-port set but AGENTS_API_KEY is empty — HTTP API disabled (set AGENTS_API_KEY in .env)") } else { - // Build a process.Manager that reflects the live launcher state. - // The manager uses run/ for PID files and agents/*/config.yaml for discovery. - mgr := newProcessManager(logDir) - srv := api.New(mgr, key, apiPort, logger) + // mgr already created above; share it between API and registry. + ctrl := &agentController{reg: registry, mgr: mgr} + srv := api.New(mgr, key, apiPort, logger). + WithController(ctrl). + WithDataDir("agents") go func() { if err := srv.Run(ctx); err != nil { logger.Error("api server stopped", "err", err) @@ -400,6 +405,24 @@ func newProcessManager(logDir string) *process.Manager { return process.NewManager("run", "agents/*/config.yaml", "bin/launcher") } +// agentController adapts agentRegistry + process.Manager to the api.AgentController +// interface, allowing the HTTP API to start/stop individual agent goroutines without +// restarting the whole launcher process. +type agentController struct { + reg *agentRegistry + mgr *process.Manager +} + +// StopUnifiedAgent cancels the per-agent goroutine context without stopping the launcher. +func (c *agentController) StopUnifiedAgent(id string) error { + return c.mgr.StopUnifiedAgent(id) +} + +// StartUnifiedAgent re-launches the agent goroutine for the given ID. +func (c *agentController) StartUnifiedAgent(id string) error { + return c.reg.startAgent(id, rulesFor) +} + // isSpecialConfig checks whether a config path belongs to a middleware special // (e.g. orchestrator) by detecting a "special:" top-level key with a non-empty // id. This avoids config.Load() failing with "agent.id is required" when the diff --git a/cmd/launcher/registry.go b/cmd/launcher/registry.go index 966bf3d..19e6d86 100644 --- a/cmd/launcher/registry.go +++ b/cmd/launcher/registry.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "log/slog" "os" "strings" @@ -34,6 +35,15 @@ type launchDeps struct { logLevel slog.Level parentCtx context.Context secPolicy pksecurity.SecurityPolicy // centralized security policy loaded from security/ + procMgr procManagerHook // optional: per-agent goroutine registration for API +} + +// procManagerHook allows the registry to register/unregister per-agent goroutine +// contexts with the process.Manager so the API can reflect and control individual +// agent goroutines in unified mode. +type procManagerHook interface { + RegisterUnifiedAgent(id string, cancel context.CancelFunc) + UnregisterUnifiedAgent(id string) } // agentRegistry tracks all running agents by ID, enabling individual hot-reload. @@ -61,10 +71,33 @@ func (r *agentRegistry) register(ra *runningAgent) { runtimeType = "agent" } + r.launchGoroutine(ra, runtimeType) +} + +// launchGoroutine starts a runner goroutine, registering its cancel context with +// the process manager hook when available for per-agent stop/start control. +func (r *agentRegistry) launchGoroutine(ra *runningAgent, runtimeType string) { + agentID := ra.cfg.Agent.ID go func() { + // Create a per-agent context derived from parent so we can cancel just + // this goroutine without stopping the launcher or other agents. + agentCtx, cancel := context.WithCancel(r.deps.parentCtx) + defer cancel() + + // Register with process manager for API control (unified mode). + if r.deps.procMgr != nil { + r.deps.procMgr.RegisterUnifiedAgent(agentID, cancel) + defer r.deps.procMgr.UnregisterUnifiedAgent(agentID) + } + ra.logger.Info("runner started", "type", runtimeType) - if err := ra.runner.Run(r.deps.parentCtx); err != nil { - ra.logger.Error("runner stopped with error", "err", err, "type", runtimeType) + if err := ra.runner.Run(agentCtx); err != nil { + if agentCtx.Err() == nil { + // Not cancelled externally — log as real error + ra.logger.Error("runner stopped with error", "err", err, "type", runtimeType) + } else { + ra.logger.Info("runner stopped (context cancelled)", "type", runtimeType) + } } }() } @@ -90,6 +123,21 @@ func (r *agentRegistry) stopAndWait(id string) { r.deps.agentBus.Unsubscribe(bus.AgentID(id)) } +// startAgent re-launches a stopped (but registered) agent by calling reload. +// Used by the API StartUnifiedAgent flow. +// Returns error if agent is not found in the registry. +func (r *agentRegistry) startAgent(id string, rulesFor func(string, *slog.Logger) []decision.Rule) error { + r.mu.Lock() + _, exists := r.agents[id] + r.mu.Unlock() + if !exists { + return fmt.Errorf("agent %q not found in registry", id) + } + // reload re-reads config and restarts the runner + r.reload(id, rulesFor) + return nil +} + // reload stops an agent, re-reads its config, recreates it, and restarts it. func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) { r.mu.Lock() @@ -192,12 +240,7 @@ func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) [] if runtimeType == "" { runtimeType = "agent" } - go func() { - newLogger.Info("runner started", "type", runtimeType) - if err := newRunner.Run(r.deps.parentCtx); err != nil { - newLogger.Error("runner stopped with error", "err", err, "type", runtimeType) - } - }() + r.launchGoroutine(newRA, runtimeType) newLogger.Info("runner_reloaded", "id", id, "type", runtimeType) } diff --git a/internal/api/handlers.go b/internal/api/handlers.go index 1f176cf..a945266 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -1,28 +1,35 @@ package api import ( + "database/sql" "encoding/json" "fmt" "net/http" + "os" + "path/filepath" "strconv" + "sync" "time" "github.com/enmanuel/agents/shell/process" + _ "modernc.org/sqlite" // pure-Go SQLite driver (same as launcher) ) // --- Response types --- // AgentResponse is the JSON representation of an agent. type AgentResponse struct { - ID string `json:"id"` - Name string `json:"name"` - Version string `json:"version"` - Desc string `json:"desc"` - Enabled bool `json:"enabled"` - Running bool `json:"running"` - PID int `json:"pid,omitempty"` - Instances int `json:"instances"` - ConfigPath string `json:"config_path"` + ID string `json:"id"` + Name string `json:"name"` + Version string `json:"version"` + Desc string `json:"desc"` + Enabled bool `json:"enabled"` + Running bool `json:"running"` + PID int `json:"pid,omitempty"` + Instances int `json:"instances"` + ConfigPath string `json:"config_path"` + UptimeSeconds int64 `json:"uptime_seconds"` + Messages24h int `json:"messages_24h"` } // AgentDetailResponse extends AgentResponse with logs. @@ -31,20 +38,71 @@ type AgentDetailResponse struct { Logs []string `json:"logs"` } +// msg24hCache caches messages_24h counts per agent to avoid hammering SQLite. +type msg24hEntry struct { + count int + fetchAt time.Time +} + +var ( + msg24hMu sync.Mutex + msg24hCache = make(map[string]msg24hEntry) + msg24hTTL = 30 * time.Second +) + func agentResponse(s process.AgentStatus) AgentResponse { return AgentResponse{ - ID: s.ID, - Name: s.Name, - Version: s.Version, - Desc: s.Desc, - Enabled: s.Enabled, - Running: s.Running, - PID: s.PID, - Instances: s.Instances, - ConfigPath: s.ConfigPath, + ID: s.ID, + Name: s.Name, + Version: s.Version, + Desc: s.Desc, + Enabled: s.Enabled, + Running: s.Running, + PID: s.PID, + Instances: s.Instances, + ConfigPath: s.ConfigPath, + UptimeSeconds: s.UptimeSeconds, } } +// queryMessages24h returns the count of messages in the past 24h for the given agent. +// Uses a 30s cache keyed by agentID. dataDir is the base data directory +// (e.g. "agents//data"). Returns 0 on error (non-fatal). +func queryMessages24h(agentID, dataDir string) int { + msg24hMu.Lock() + if e, ok := msg24hCache[agentID]; ok && time.Since(e.fetchAt) < msg24hTTL { + msg24hMu.Unlock() + return e.count + } + msg24hMu.Unlock() + + dbPath := filepath.Join(dataDir, "memory.db") + if _, err := os.Stat(dbPath); err != nil { + return 0 // DB does not exist yet + } + + db, err := sql.Open("sqlite", dbPath+"?mode=ro&_query_only=1") + if err != nil { + return 0 + } + defer db.Close() + + var count int + row := db.QueryRow( + "SELECT COUNT(*) FROM messages WHERE agent_id=? AND created_at > datetime('now','-24 hours')", + agentID, + ) + if err := row.Scan(&count); err != nil { + return 0 + } + + msg24hMu.Lock() + msg24hCache[agentID] = msg24hEntry{count: count, fetchAt: time.Now()} + msg24hMu.Unlock() + + return count +} + // --- Health --- func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { @@ -72,7 +130,13 @@ func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) { } resp := make([]AgentResponse, 0, len(statuses)) for _, st := range statuses { - resp = append(resp, agentResponse(st)) + ar := agentResponse(st) + // Enrich with messages_24h when dataDir is configured + if s.dataDir != "" { + agentDataDir := filepath.Join(s.dataDir, st.ID, "data") + ar.Messages24h = queryMessages24h(st.ID, agentDataDir) + } + resp = append(resp, ar) } writeJSON(w, http.StatusOK, resp) } @@ -117,6 +181,19 @@ func (s *Server) handleGetAgent(w http.ResponseWriter, r *http.Request) { func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") + + // Unified mode: delegate to AgentController if available + if s.mgr.IsUnifiedRunning() && s.controller != nil { + if err := s.controller.StartUnifiedAgent(id); err != nil { + writeError(w, http.StatusConflict, fmt.Sprintf("start (unified): %v", err)) + return + } + s.logger.Info("agent started via api (unified)", "id", id) + writeJSON(w, http.StatusOK, map[string]string{"status": "started", "id": id, "mode": "unified"}) + return + } + + // Multi-process mode: use per-agent process launch agents, err := s.mgr.Scan() if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err)) @@ -147,6 +224,19 @@ func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) { func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") + + // Unified mode: cancel goroutine context without killing launcher + if s.mgr.IsUnifiedRunning() && s.controller != nil { + if err := s.controller.StopUnifiedAgent(id); err != nil { + writeError(w, http.StatusConflict, fmt.Sprintf("stop (unified): %v", err)) + return + } + s.logger.Info("agent stopped via api (unified)", "id", id) + writeJSON(w, http.StatusOK, map[string]string{"status": "stopped", "id": id, "mode": "unified"}) + return + } + + // Multi-process mode if err := s.mgr.Stop(id); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("stop: %v", err)) return @@ -160,6 +250,24 @@ func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) { func (s *Server) handleRestartAgent(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") + // Unified mode: stop goroutine then re-launch + if s.mgr.IsUnifiedRunning() && s.controller != nil { + // Stop (ignore not-running error) + _ = s.controller.StopUnifiedAgent(id) + + // Brief pause to let goroutine exit cleanly + time.Sleep(500 * time.Millisecond) + + if err := s.controller.StartUnifiedAgent(id); err != nil { + writeError(w, http.StatusConflict, fmt.Sprintf("restart/start (unified): %v", err)) + return + } + s.logger.Info("agent restarted via api (unified)", "id", id) + writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id, "mode": "unified"}) + return + } + + // Multi-process mode // Stop first (ignore not-running error) _ = s.mgr.Stop(id) @@ -267,6 +375,149 @@ func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) { } } +// --- Clear memory --- + +func (s *Server) handleClearMemory(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + + // Determine whether restart after clear is requested. + restart := r.URL.Query().Get("restart") == "true" + + // In unified mode, stop the agent goroutine before touching its DB. + wasRunning := false + if s.mgr.IsUnifiedRunning() && s.controller != nil { + wasRunning = s.mgr.IsUnifiedAgentRunning(id) + if wasRunning { + if err := s.controller.StopUnifiedAgent(id); err != nil { + writeError(w, http.StatusConflict, fmt.Sprintf("clear_memory/stop: %v", err)) + return + } + // Give goroutine a moment to release the DB. + time.Sleep(300 * time.Millisecond) + } + } + + // Locate the agent's memory.db. + if s.dataDir == "" { + writeError(w, http.StatusInternalServerError, "data_dir not configured on server") + return + } + dbPath := filepath.Join(s.dataDir, id, "data", "memory.db") + if _, err := os.Stat(dbPath); err != nil { + // No memory.db — still a success (nothing to clear). + writeJSON(w, http.StatusOK, map[string]any{ + "status": "cleared", + "messages_deleted": 0, + "facts_deleted": 0, + }) + return + } + + db, err := sql.Open("sqlite", dbPath) + if err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("open memory.db: %v", err)) + return + } + defer db.Close() + + var msgDel, factsDel int64 + + res, err := db.ExecContext(r.Context(), "DELETE FROM messages WHERE agent_id=?", id) + if err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("delete messages: %v", err)) + return + } + msgDel, _ = res.RowsAffected() + + res, err = db.ExecContext(r.Context(), "DELETE FROM facts WHERE agent_id=?", id) + if err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("delete facts: %v", err)) + return + } + factsDel, _ = res.RowsAffected() + + // Invalidate the 24h cache entry for this agent. + msg24hMu.Lock() + delete(msg24hCache, id) + msg24hMu.Unlock() + + s.logger.Info("agent memory cleared via api", "id", id, + "messages_deleted", msgDel, "facts_deleted", factsDel) + + // Optionally restart. + if (restart || wasRunning) && s.mgr.IsUnifiedRunning() && s.controller != nil { + _ = s.controller.StartUnifiedAgent(id) + } + + writeJSON(w, http.StatusOK, map[string]any{ + "status": "cleared", + "messages_deleted": msgDel, + "facts_deleted": factsDel, + }) +} + +// --- Delete cache --- + +func (s *Server) handleDeleteCache(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + + restart := r.URL.Query().Get("restart") == "true" + + // Stop in unified mode before removing crypto dir. + wasRunning := false + if s.mgr.IsUnifiedRunning() && s.controller != nil { + wasRunning = s.mgr.IsUnifiedAgentRunning(id) + if wasRunning { + if err := s.controller.StopUnifiedAgent(id); err != nil { + writeError(w, http.StatusConflict, fmt.Sprintf("delete_cache/stop: %v", err)) + return + } + time.Sleep(300 * time.Millisecond) + } + } + + if s.dataDir == "" { + writeError(w, http.StatusInternalServerError, "data_dir not configured on server") + return + } + + agentDataDir := filepath.Join(s.dataDir, id, "data") + var deleted []string + + // Remove crypto directory (session keys, verification cache). + cryptoDir := filepath.Join(agentDataDir, "crypto") + if _, err := os.Stat(cryptoDir); err == nil { + if err := os.RemoveAll(cryptoDir); err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("remove crypto: %v", err)) + return + } + deleted = append(deleted, cryptoDir) + } + + // Remove cache directory contents (but keep the dir itself). + cacheDir := filepath.Join(agentDataDir, "cache") + if entries, err := os.ReadDir(cacheDir); err == nil { + for _, e := range entries { + p := filepath.Join(cacheDir, e.Name()) + if err := os.RemoveAll(p); err == nil { + deleted = append(deleted, p) + } + } + } + + s.logger.Info("agent cache deleted via api", "id", id, "paths", len(deleted)) + + // Optionally restart. + if (restart || wasRunning) && s.mgr.IsUnifiedRunning() && s.controller != nil { + _ = s.controller.StartUnifiedAgent(id) + } + + writeJSON(w, http.StatusOK, map[string]any{ + "status": "cleared", + "paths_deleted": deleted, + }) +} + // --- SSE: agent log tail --- func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) { diff --git a/internal/api/server.go b/internal/api/server.go index 12f48b2..543220e 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -22,13 +22,28 @@ import ( "github.com/enmanuel/agents/shell/process" ) +// AgentController is an optional interface for per-agent unified-mode control. +// The launcher can implement this to allow the API to stop/start individual +// agent goroutines without restarting the whole process. +type AgentController interface { + // StopUnifiedAgent cancels the goroutine context for the agent with the given ID. + // Returns an error if the agent is not currently running in unified mode. + StopUnifiedAgent(id string) error + // StartUnifiedAgent re-launches the agent goroutine for the given ID. + // Returns an error if the agent is not registered. + StartUnifiedAgent(id string) error +} + // Server is the HTTP API server. type Server struct { - mgr *process.Manager - apiKey string - port int - logger *slog.Logger - bus *Bus + mgr *process.Manager + apiKey string + port int + logger *slog.Logger + bus *Bus + controller AgentController // optional: per-agent unified control (nil = not available) + // dataDir is the base directory for agent runtime data used for memory/cache queries. + dataDir string } // New creates a new Server. apiKey is compared with subtle.ConstantTimeCompare. @@ -46,6 +61,18 @@ func New(mgr *process.Manager, apiKey string, port int, logger *slog.Logger) *Se } } +// WithController attaches an AgentController for unified-mode per-agent control. +func (s *Server) WithController(c AgentController) *Server { + s.controller = c + return s +} + +// WithDataDir sets the base directory for agent runtime data (memory.db, crypto/). +func (s *Server) WithDataDir(dir string) *Server { + s.dataDir = dir + return s +} + // Run starts the HTTP server and blocks until ctx is done. // It also starts the status-diff poller that feeds /sse/status. func (s *Server) Run(ctx context.Context) error { @@ -61,6 +88,8 @@ func (s *Server) Run(ctx context.Context) error { mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent))) mux.Handle("POST /agents/{id}/restart", s.auth(http.HandlerFunc(s.handleRestartAgent))) mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs))) + mux.Handle("POST /agents/{id}/clear_memory", s.auth(http.HandlerFunc(s.handleClearMemory))) + mux.Handle("POST /agents/{id}/delete_cache", s.auth(http.HandlerFunc(s.handleDeleteCache))) // SSE endpoints mux.Handle("GET /sse/status", s.auth(http.HandlerFunc(s.handleSSEStatus))) diff --git a/shell/process/manager.go b/shell/process/manager.go index 49f53d0..81abe36 100644 --- a/shell/process/manager.go +++ b/shell/process/manager.go @@ -4,12 +4,14 @@ package process import ( "bufio" + "context" "fmt" "os" "os/exec" "path/filepath" "strconv" "strings" + "sync" "syscall" "time" @@ -29,9 +31,10 @@ type AgentInfo struct { // AgentStatus combines agent metadata with runtime state. type AgentStatus struct { AgentInfo - Running bool - PID int - Instances int + Running bool + PID int + Instances int + UptimeSeconds int64 // seconds since agent goroutine started (unified mode) or 0 } // ProcessStats holds resource usage for a running process. @@ -91,11 +94,25 @@ type Manager struct { binPath string envFile string // path to .env file for child processes prober processProber + + // unifiedMode tracks per-agent goroutine cancel functions and start times + // when the unified launcher is running (all agents as goroutines). + unifiedMu sync.RWMutex + unifiedCancels map[string]context.CancelFunc + startedAt map[string]time.Time } // NewManager creates a Manager. binPath can be empty for auto-detection. func NewManager(runDir, agentsGlob, binPath string) *Manager { - return &Manager{runDir: runDir, agentsGlob: agentsGlob, binPath: binPath, envFile: ".env", prober: osProber{}} + return &Manager{ + runDir: runDir, + agentsGlob: agentsGlob, + binPath: binPath, + envFile: ".env", + prober: osProber{}, + unifiedCancels: make(map[string]context.CancelFunc), + startedAt: make(map[string]time.Time), + } } // Scan discovers all agents from config files. @@ -484,8 +501,63 @@ func (m *Manager) UnifiedLogTail(lines int) ([]string, error) { return m.LogTail(unifiedID, lines) } +// ── Per-agent unified control ───────────────────────────────────────────── + +// RegisterUnifiedAgent registers a cancel function and start time for an agent +// goroutine running inside the unified launcher. Called by the launcher runtime. +func (m *Manager) RegisterUnifiedAgent(id string, cancel context.CancelFunc) { + m.unifiedMu.Lock() + defer m.unifiedMu.Unlock() + m.unifiedCancels[id] = cancel + m.startedAt[id] = time.Now() +} + +// UnregisterUnifiedAgent removes the cancel function for an agent goroutine. +// Called when the goroutine exits. +func (m *Manager) UnregisterUnifiedAgent(id string) { + m.unifiedMu.Lock() + defer m.unifiedMu.Unlock() + delete(m.unifiedCancels, id) + delete(m.startedAt, id) +} + +// StopUnifiedAgent cancels the goroutine context for a specific agent without +// stopping the launcher process. Returns error if agent is not registered. +func (m *Manager) StopUnifiedAgent(id string) error { + m.unifiedMu.RLock() + cancel, ok := m.unifiedCancels[id] + m.unifiedMu.RUnlock() + if !ok { + return fmt.Errorf("agent %q is not registered in unified mode (not running)", id) + } + cancel() + m.UnregisterUnifiedAgent(id) + return nil +} + +// IsUnifiedAgentRunning returns true if the agent goroutine is registered. +func (m *Manager) IsUnifiedAgentRunning(id string) bool { + m.unifiedMu.RLock() + defer m.unifiedMu.RUnlock() + _, ok := m.unifiedCancels[id] + return ok +} + +// UptimeSeconds returns how long an agent has been running since registration. +// Returns 0 if the agent is not registered or not running. +func (m *Manager) UptimeSeconds(id string) int64 { + m.unifiedMu.RLock() + defer m.unifiedMu.RUnlock() + if t, ok := m.startedAt[id]; ok { + return int64(time.Since(t).Seconds()) + } + return 0 +} + // StatusAllUnified returns status for all agents, deriving "running" from -// whether the unified launcher is running + the agent is enabled. +// whether the unified launcher is running + per-agent registration. +// When per-agent cancel registration is available (via RegisterUnifiedAgent), +// running reflects the individual goroutine state rather than launcher-wide enabled. func (m *Manager) StatusAllUnified() ([]AgentStatus, error) { agents, err := m.Scan() if err != nil { @@ -494,9 +566,20 @@ func (m *Manager) StatusAllUnified() ([]AgentStatus, error) { launcherRunning := m.IsUnifiedRunning() launcherPID := m.UnifiedPID() + m.unifiedMu.RLock() + hasPerAgentTracking := len(m.unifiedCancels) > 0 + m.unifiedMu.RUnlock() + statuses := make([]AgentStatus, len(agents)) for i, a := range agents { - running := launcherRunning && a.Enabled + var running bool + if hasPerAgentTracking { + // Per-agent goroutine tracking: check individual registration + running = m.IsUnifiedAgentRunning(a.ID) + } else { + // Fallback: launcher running + agent enabled + running = launcherRunning && a.Enabled + } pid := 0 instances := 0 if running { @@ -504,10 +587,11 @@ func (m *Manager) StatusAllUnified() ([]AgentStatus, error) { instances = 1 } statuses[i] = AgentStatus{ - AgentInfo: a, - Running: running, - PID: pid, - Instances: instances, + AgentInfo: a, + Running: running, + PID: pid, + Instances: instances, + UptimeSeconds: m.UptimeSeconds(a.ID), } } return statuses, nil