package api import ( "database/sql" "encoding/json" "fmt" "net/http" "os" "path/filepath" "strconv" "sync" "time" "github.com/enmanuel/agents/shell/process" _ "modernc.org/sqlite" // pure-Go SQLite driver (same as launcher) ) // --- Response types --- // AgentResponse is the JSON representation of an agent. type AgentResponse struct { ID string `json:"id"` Name string `json:"name"` Version string `json:"version"` Desc string `json:"desc"` Enabled bool `json:"enabled"` Running bool `json:"running"` PID int `json:"pid,omitempty"` Instances int `json:"instances"` ConfigPath string `json:"config_path"` UptimeSeconds int64 `json:"uptime_seconds"` Messages24h int `json:"messages_24h"` } // AgentDetailResponse extends AgentResponse with logs. type AgentDetailResponse struct { AgentResponse Logs []string `json:"logs"` } // msg24hCache caches messages_24h counts per agent to avoid hammering SQLite. type msg24hEntry struct { count int fetchAt time.Time } var ( msg24hMu sync.Mutex msg24hCache = make(map[string]msg24hEntry) msg24hTTL = 30 * time.Second ) func agentResponse(s process.AgentStatus) AgentResponse { return AgentResponse{ ID: s.ID, Name: s.Name, Version: s.Version, Desc: s.Desc, Enabled: s.Enabled, Running: s.Running, PID: s.PID, Instances: s.Instances, ConfigPath: s.ConfigPath, UptimeSeconds: s.UptimeSeconds, } } // queryMessages24h returns the count of messages in the past 24h for the given agent. // Uses a 30s cache keyed by agentID. dataDir is the base data directory // (e.g. "agents//data"). Returns 0 on error (non-fatal). func queryMessages24h(agentID, dataDir string) int { msg24hMu.Lock() if e, ok := msg24hCache[agentID]; ok && time.Since(e.fetchAt) < msg24hTTL { msg24hMu.Unlock() return e.count } msg24hMu.Unlock() dbPath := filepath.Join(dataDir, "memory.db") if _, err := os.Stat(dbPath); err != nil { return 0 // DB does not exist yet } db, err := sql.Open("sqlite", dbPath+"?mode=ro&_query_only=1") if err != nil { return 0 } defer db.Close() var count int row := db.QueryRow( "SELECT COUNT(*) FROM messages WHERE agent_id=? AND created_at > datetime('now','-24 hours')", agentID, ) if err := row.Scan(&count); err != nil { return 0 } msg24hMu.Lock() msg24hCache[agentID] = msg24hEntry{count: count, fetchAt: time.Now()} msg24hMu.Unlock() return count } // --- Recent status events --- // handleStatusRecent returns the last N status-diff events from the bus ring // buffer (default 100, cap 100). Lets a new client populate its Status Feed // panel with history before subscribing to /sse/status for live updates. func (s *Server) handleStatusRecent(w http.ResponseWriter, r *http.Request) { n := 100 if qn := r.URL.Query().Get("n"); qn != "" { if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 { n = parsed } } events := s.bus.Recent("status", n) writeJSON(w, http.StatusOK, events) } // --- Health --- func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ok", "time": time.Now().UTC().Format(time.RFC3339)}) } // statusAllAuto chooses unified vs multi-process status based on runtime mode. // In unified mode all agents run as goroutines under one launcher process — per-agent // PID files do not exist, so StatusAll reports Running=false. StatusAllUnified // reflects the real state. func (s *Server) statusAllAuto() ([]process.AgentStatus, error) { if s.mgr.IsUnifiedRunning() { return s.mgr.StatusAllUnified() } return s.mgr.StatusAll() } // --- List agents --- func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) { statuses, err := s.statusAllAuto() if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err)) return } resp := make([]AgentResponse, 0, len(statuses)) for _, st := range statuses { ar := agentResponse(st) // Enrich with messages_24h when dataDir is configured if s.dataDir != "" { agentDataDir := filepath.Join(s.dataDir, st.ID, "data") ar.Messages24h = queryMessages24h(st.ID, agentDataDir) } resp = append(resp, ar) } writeJSON(w, http.StatusOK, resp) } // --- Get single agent --- func (s *Server) handleGetAgent(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") statuses, err := s.statusAllAuto() if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err)) return } var found *process.AgentStatus for i, st := range statuses { if st.ID == id { found = &statuses[i] break } } if found == nil { writeError(w, http.StatusNotFound, "agent not found") return } n := 200 if qn := r.URL.Query().Get("n"); qn != "" { if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 { n = parsed } } logs, _ := s.mgr.LogTail(id, n) writeJSON(w, http.StatusOK, AgentDetailResponse{ AgentResponse: agentResponse(*found), Logs: logs, }) } // --- Start agent --- func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") // Unified mode: delegate to AgentController if available if s.mgr.IsUnifiedRunning() && s.controller != nil { if err := s.controller.StartUnifiedAgent(id); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("start (unified): %v", err)) return } s.logger.Info("agent started via api (unified)", "id", id) writeJSON(w, http.StatusOK, map[string]string{"status": "started", "id": id, "mode": "unified"}) return } // Multi-process mode: use per-agent process launch agents, err := s.mgr.Scan() if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err)) return } var info *process.AgentInfo for i, a := range agents { if a.ID == id { info = &agents[i] break } } if info == nil { writeError(w, http.StatusNotFound, "agent not found") return } if err := s.mgr.Start(*info); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("start: %v", err)) return } s.logger.Info("agent started via api", "id", id) writeJSON(w, http.StatusOK, map[string]string{"status": "started", "id": id}) } // --- Stop agent --- func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") // Unified mode: cancel goroutine context without killing launcher if s.mgr.IsUnifiedRunning() && s.controller != nil { if err := s.controller.StopUnifiedAgent(id); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("stop (unified): %v", err)) return } s.logger.Info("agent stopped via api (unified)", "id", id) writeJSON(w, http.StatusOK, map[string]string{"status": "stopped", "id": id, "mode": "unified"}) return } // Multi-process mode if err := s.mgr.Stop(id); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("stop: %v", err)) return } s.logger.Info("agent stopped via api", "id", id) writeJSON(w, http.StatusOK, map[string]string{"status": "stopped", "id": id}) } // --- Restart agent --- func (s *Server) handleRestartAgent(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") // Unified mode: stop goroutine then re-launch if s.mgr.IsUnifiedRunning() && s.controller != nil { // Stop (ignore not-running error) _ = s.controller.StopUnifiedAgent(id) // Brief pause to let goroutine exit cleanly time.Sleep(500 * time.Millisecond) if err := s.controller.StartUnifiedAgent(id); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("restart/start (unified): %v", err)) return } s.logger.Info("agent restarted via api (unified)", "id", id) writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id, "mode": "unified"}) return } // Multi-process mode // Stop first (ignore not-running error) _ = s.mgr.Stop(id) // Wait up to 3s for process to die deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { if !s.mgr.IsRunning(id) { break } time.Sleep(200 * time.Millisecond) } // Find agent info for Start agents, err := s.mgr.Scan() if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err)) return } var info *process.AgentInfo for i, a := range agents { if a.ID == id { info = &agents[i] break } } if info == nil { writeError(w, http.StatusNotFound, "agent not found") return } if err := s.mgr.Start(*info); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("restart/start: %v", err)) return } s.logger.Info("agent restarted via api", "id", id) writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id}) } // --- Agent logs snapshot --- func (s *Server) handleAgentLogs(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") n := 200 if qn := r.URL.Query().Get("n"); qn != "" { if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 { n = parsed } } logs, err := s.mgr.LogTail(id, n) if err != nil { writeError(w, http.StatusNotFound, fmt.Sprintf("logs: %v", err)) return } writeJSON(w, http.StatusOK, map[string]any{"id": id, "lines": logs, "count": len(logs)}) } // --- SSE: status broadcast --- func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) // Initial ping: SSE clients consider the stream "connected" only after // receiving the first byte of body. Without this, agents_dashboard sits // on "connecting" until the first status diff (which can be minutes away). fmt.Fprint(w, ": ping\n\n") flusher.Flush() sub := s.bus.Subscribe("status") defer s.bus.Unsubscribe("status", sub) ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() ctx := r.Context() for { select { case <-ctx.Done(): return case <-ticker.C: // Periodic heartbeat: keeps proxies (Traefik, CDN) from closing // the idle connection and lets the client detect dead servers. if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil { return } flusher.Flush() case ev, ok := <-sub: if !ok { return } data, _ := json.Marshal(ev) fmt.Fprintf(w, "event: status\ndata: %s\n\n", data) flusher.Flush() } } } // --- Clear memory --- func (s *Server) handleClearMemory(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") // Determine whether restart after clear is requested. restart := r.URL.Query().Get("restart") == "true" // In unified mode, stop the agent goroutine before touching its DB. wasRunning := false if s.mgr.IsUnifiedRunning() && s.controller != nil { wasRunning = s.mgr.IsUnifiedAgentRunning(id) if wasRunning { if err := s.controller.StopUnifiedAgent(id); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("clear_memory/stop: %v", err)) return } // Give goroutine a moment to release the DB. time.Sleep(300 * time.Millisecond) } } // Locate the agent's memory.db. if s.dataDir == "" { writeError(w, http.StatusInternalServerError, "data_dir not configured on server") return } dbPath := filepath.Join(s.dataDir, id, "data", "memory.db") if _, err := os.Stat(dbPath); err != nil { // No memory.db — still a success (nothing to clear). writeJSON(w, http.StatusOK, map[string]any{ "status": "cleared", "messages_deleted": 0, "facts_deleted": 0, }) return } db, err := sql.Open("sqlite", dbPath) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("open memory.db: %v", err)) return } defer db.Close() var msgDel, factsDel int64 res, err := db.ExecContext(r.Context(), "DELETE FROM messages WHERE agent_id=?", id) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("delete messages: %v", err)) return } msgDel, _ = res.RowsAffected() res, err = db.ExecContext(r.Context(), "DELETE FROM facts WHERE agent_id=?", id) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("delete facts: %v", err)) return } factsDel, _ = res.RowsAffected() // Invalidate the 24h cache entry for this agent. msg24hMu.Lock() delete(msg24hCache, id) msg24hMu.Unlock() s.logger.Info("agent memory cleared via api", "id", id, "messages_deleted", msgDel, "facts_deleted", factsDel) // Optionally restart. if (restart || wasRunning) && s.mgr.IsUnifiedRunning() && s.controller != nil { _ = s.controller.StartUnifiedAgent(id) } writeJSON(w, http.StatusOK, map[string]any{ "status": "cleared", "messages_deleted": msgDel, "facts_deleted": factsDel, }) } // --- Delete cache --- func (s *Server) handleDeleteCache(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") restart := r.URL.Query().Get("restart") == "true" // Stop in unified mode before removing crypto dir. wasRunning := false if s.mgr.IsUnifiedRunning() && s.controller != nil { wasRunning = s.mgr.IsUnifiedAgentRunning(id) if wasRunning { if err := s.controller.StopUnifiedAgent(id); err != nil { writeError(w, http.StatusConflict, fmt.Sprintf("delete_cache/stop: %v", err)) return } time.Sleep(300 * time.Millisecond) } } if s.dataDir == "" { writeError(w, http.StatusInternalServerError, "data_dir not configured on server") return } agentDataDir := filepath.Join(s.dataDir, id, "data") var deleted []string // Remove crypto directory (session keys, verification cache). cryptoDir := filepath.Join(agentDataDir, "crypto") if _, err := os.Stat(cryptoDir); err == nil { if err := os.RemoveAll(cryptoDir); err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("remove crypto: %v", err)) return } deleted = append(deleted, cryptoDir) } // Remove cache directory contents (but keep the dir itself). cacheDir := filepath.Join(agentDataDir, "cache") if entries, err := os.ReadDir(cacheDir); err == nil { for _, e := range entries { p := filepath.Join(cacheDir, e.Name()) if err := os.RemoveAll(p); err == nil { deleted = append(deleted, p) } } } s.logger.Info("agent cache deleted via api", "id", id, "paths", len(deleted)) // Optionally restart. if (restart || wasRunning) && s.mgr.IsUnifiedRunning() && s.controller != nil { _ = s.controller.StartUnifiedAgent(id) } writeJSON(w, http.StatusOK, map[string]any{ "status": "cleared", "paths_deleted": deleted, }) } // --- SSE: agent log tail --- func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") logPath := s.mgr.LogPath(id) if logPath == "" { http.Error(w, "agent not found", http.StatusNotFound) return } flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) // Initial ping unblocks client fgets so the UI flips from "connecting" // to "connected" immediately (logfile may be silent for a while). fmt.Fprint(w, ": ping\n\n") flusher.Flush() ctx := r.Context() tailLogFile(ctx, logPath, w, flusher) }