Files
agents_and_robots/shell/process/manager.go
T
egutierrez 261f96f71b feat(api): per-agent unified control + clear_memory + delete_cache
- Manager: RegisterUnifiedAgent/UnregisterUnifiedAgent/StopUnifiedAgent/
  IsUnifiedAgentRunning/UptimeSeconds — cancela goroutines individuales sin
  matar el launcher
- Manager: UptimeSeconds en AgentStatus via startedAt map
- api/server: AgentController interface + WithController/WithDataDir builders
  + rutas POST /agents/{id}/clear_memory y /agents/{id}/delete_cache
- api/handlers: handleStartAgent/Stop/Restart delegan a controller en modo
  unified; Messages24h enriquecido via queryMessages24h (cache 30s)
- api/handlers: handleClearMemory — para la goroutine, borra messages+facts de
  memory.db, responde {status,messages_deleted,facts_deleted}
- api/handlers: handleDeleteCache — para la goroutine, elimina crypto/ y cache/,
  responde {status,paths_deleted}
- launcher/registry: launchGoroutine extrae goroutine con contexto per-agente;
  deps.procMgr hookea RegisterUnified; startAgent permite relanzar via reload
- launcher/main: agentController implementa api.AgentController sobre registry;
  mgr compartido entre API y registry; WithController+WithDataDir cableados

Co-Authored-By: fn-orquestador <noreply@fn-registry>
2026-05-22 22:56:46 +02:00

777 lines
20 KiB
Go

// Package process manages agent processes: discovery, start, stop, kill, stats.
// This is the impure shell layer — all I/O happens here.
package process
import (
	"bufio"
	"context"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/enmanuel/agents/internal/config"
)
// AgentInfo holds metadata about an agent parsed from its config.
// Scan builds one value per config file that loads successfully.
type AgentInfo struct {
// ID is the agent identifier from the config (cfg.Agent.ID); it is the key
// used by lookups such as ToggleEnabled and configPathFor.
ID string
// Name is the agent name from the config (cfg.Agent.Name).
Name string
// Version is the version string from the config (cfg.Agent.Version).
Version string
// Desc is the description from the config (cfg.Agent.Description).
Desc string
// ConfigPath is the path of the config file this entry was loaded from.
ConfigPath string
// Enabled mirrors the enabled flag from the config (cfg.Agent.Enabled).
Enabled bool
}
// AgentStatus combines agent metadata with runtime state.
// It is produced by Status/StatusAll (one process per agent) and by
// StatusAllUnified (agents hosted inside the unified launcher).
type AgentStatus struct {
// AgentInfo is the static metadata the status refers to.
AgentInfo
// Running reports whether the agent is considered alive.
Running bool
// PID is the first matching process ID, or the launcher PID in unified
// mode; 0 when not running.
PID int
// Instances is the number of matching processes (1 in unified mode when
// running, 0 otherwise).
Instances int
UptimeSeconds int64 // seconds since agent goroutine started (unified mode) or 0
}
// ProcessStats holds resource usage for a running process.
// Values are gathered by statsForPID; a field stays at its zero value when
// the corresponding source could not be read.
type ProcessStats struct {
// PID is the process the figures belong to.
PID int
// UptimeSecs is the process age in seconds, derived from /proc/<pid>/stat
// and the boot time in /proc/stat.
UptimeSecs int64
// MemRSSKB is the VmRSS value read from /proc/<pid>/status.
MemRSSKB int64
// CPUPct is the pcpu column reported by ps for the process.
CPUPct float64
// LogBytes is the current size of the log file associated with the ID.
LogBytes int64
}
// processProber abstracts process detection for testing.
// Manager performs all process discovery and liveness checks through this
// interface; osProber is the implementation installed by NewManager.
type processProber interface {
// pgrepPIDs runs pgrep -f with the given pattern and returns matching PIDs.
pgrepPIDs(pattern string) []int
// processComm returns the comm name for a PID (e.g. "launcher", "go").
processComm(pid int) string
// isAlive checks if a PID is running.
isAlive(pid int) bool
}
// osProber is the real implementation using OS calls: the pgrep command,
// the /proc filesystem and signal 0 via syscall.Kill. It carries no state.
type osProber struct{}
// pgrepPIDs runs `pgrep -f pattern` and returns the positive PIDs it prints,
// one per output line. Any failure to run pgrep — including a non-zero exit
// status — yields nil. Lines that are not positive integers are ignored.
func (osProber) pgrepPIDs(pattern string) []int {
	raw, runErr := exec.Command("pgrep", "-f", pattern).Output()
	if runErr != nil {
		return nil
	}

	var found []int
	rows := strings.Split(strings.TrimSpace(string(raw)), "\n")
	for _, row := range rows {
		n, convErr := strconv.Atoi(strings.TrimSpace(row))
		if convErr != nil || n <= 0 {
			continue
		}
		found = append(found, n)
	}
	return found
}
// processComm returns the contents of /proc/<pid>/comm with surrounding
// whitespace removed, or "" when the file cannot be read.
func (osProber) processComm(pid int) string {
	commFile := fmt.Sprintf("/proc/%d/comm", pid)
	raw, err := os.ReadFile(commFile)
	if err == nil {
		return strings.TrimSpace(string(raw))
	}
	return ""
}
// isAlive reports whether a process with the given PID exists, by sending it
// signal 0 (which performs the existence and permission checks only).
//
// A non-positive pid is never considered alive: kill(2) gives 0 and negative
// values process-group meaning, so probing them would not answer the question
// for a single process. EPERM is treated as alive because it means the
// process exists but belongs to a user we may not signal.
func (osProber) isAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	err := syscall.Kill(pid, 0)
	return err == nil || err == syscall.EPERM
}
// unifiedID is the reserved ID under which the unified launcher's PID file
// and log file are stored in the run directory (via pidPath/logPath).
const unifiedID = "launcher" // PID/log file ID for the unified launcher
// Manager handles agent process lifecycle: discovery from config files,
// start/stop/kill of launcher processes, stats, logs, and bookkeeping for
// agents running as goroutines inside the unified launcher.
type Manager struct {
// runDir is the directory holding <id>.pid and <id>.log files.
runDir string
// agentsGlob is the glob pattern used to find agent config files.
agentsGlob string
// binPath is the launcher binary; empty means auto-detect (see resolvedBin).
binPath string
envFile string // path to .env file for child processes
// prober performs process discovery and liveness checks.
prober processProber
// The fields below track per-agent goroutine cancel functions and start
// times when the unified launcher is running (all agents as goroutines).
// unifiedMu guards both maps, which are keyed by agent ID.
unifiedMu sync.RWMutex
unifiedCancels map[string]context.CancelFunc
startedAt map[string]time.Time
}
// NewManager builds a Manager rooted at runDir that discovers agents through
// agentsGlob. An empty binPath defers launcher selection to resolvedBin.
// The manager reads child environment additions from ".env", probes the
// real OS, and starts with empty unified-agent registries.
func NewManager(runDir, agentsGlob, binPath string) *Manager {
	mgr := new(Manager)
	mgr.runDir = runDir
	mgr.agentsGlob = agentsGlob
	mgr.binPath = binPath
	mgr.envFile = ".env"
	mgr.prober = osProber{}
	mgr.unifiedCancels = map[string]context.CancelFunc{}
	mgr.startedAt = map[string]time.Time{}
	return mgr
}
// Scan expands the manager's glob and returns one AgentInfo per config file
// whose metadata loads successfully. Files that fail to load are skipped
// silently; only a glob error is returned to the caller.
func (m *Manager) Scan() ([]AgentInfo, error) {
	paths, globErr := filepath.Glob(m.agentsGlob)
	if globErr != nil {
		return nil, globErr
	}

	var found []AgentInfo
	for _, cfgPath := range paths {
		meta, loadErr := config.LoadMeta(cfgPath)
		if loadErr != nil {
			// An unreadable or invalid config must not hide the other agents.
			continue
		}
		info := AgentInfo{ConfigPath: cfgPath}
		info.ID = meta.Agent.ID
		info.Name = meta.Agent.Name
		info.Version = meta.Agent.Version
		info.Desc = meta.Agent.Description
		info.Enabled = meta.Agent.Enabled
		found = append(found, info)
	}
	return found, nil
}
// Status reports the runtime state of one agent based on the launcher
// processes currently found for its ID. PID is the first process found;
// Instances is how many were found. UptimeSeconds is left at zero.
func (m *Manager) Status(info AgentInfo) AgentStatus {
	live := m.findProcessPIDs(info.ID)

	st := AgentStatus{AgentInfo: info, Instances: len(live)}
	if st.Instances > 0 {
		st.Running = true
		st.PID = live[0]
	}
	return st
}
// StatusAll scans for agents and returns the Status of each, in scan order.
// A scan failure is returned unchanged.
func (m *Manager) StatusAll() ([]AgentStatus, error) {
	infos, err := m.Scan()
	if err != nil {
		return nil, err
	}

	out := make([]AgentStatus, 0, len(infos))
	for _, info := range infos {
		out = append(out, m.Status(info))
	}
	return out, nil
}
// Start launches an agent process in the background.
//
// The child runs the launcher with "-c <info.ConfigPath>" in its own session
// (Setsid), with stdout and stderr appended to the agent's log file and the
// environment from BuildEnv. Its PID is written to the agent's PID file.
//
// It returns an error if the agent is already running, if the run directory
// or log file cannot be prepared, if the process cannot be started, or if the
// PID file cannot be written (in which case the process keeps running).
func (m *Manager) Start(info AgentInfo) error {
	if pids := m.findProcessPIDs(info.ID); len(pids) > 0 {
		return fmt.Errorf("agent %q is already running (PID %d)", info.ID, pids[0])
	}
	if err := os.MkdirAll(m.runDir, 0o755); err != nil {
		return fmt.Errorf("create run dir: %w", err)
	}
	logFile, err := os.OpenFile(m.logPath(info.ID), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		return fmt.Errorf("open log: %w", err)
	}
	// The child gets its own copy of the descriptor when it is started, so
	// the parent's handle is closed on every return path to avoid leaking
	// one descriptor per launch.
	defer logFile.Close()

	var cmd *exec.Cmd
	// resolvedBin returns a "go run ..." sentinel when no binary is available.
	if bin := m.resolvedBin(); strings.HasPrefix(bin, "go run") {
		cmd = exec.Command("go", "run", "-tags", "goolm", "./cmd/launcher", "-c", info.ConfigPath)
	} else {
		cmd = exec.Command(bin, "-c", info.ConfigPath)
	}
	cmd.Env = m.BuildEnv()
	cmd.Stdout = logFile
	cmd.Stderr = logFile
	cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("exec: %w", err)
	}
	// Reap the child whenever it exits. This is started before the PID file
	// is written so that a write failure cannot leave the child unreaped.
	go func() { _ = cmd.Wait() }()

	if err := os.WriteFile(m.pidPath(info.ID), []byte(strconv.Itoa(cmd.Process.Pid)), 0o644); err != nil {
		return fmt.Errorf("write PID: %w", err)
	}
	return nil
}
// Stop terminates every instance of the agent: SIGTERM first, then up to ten
// liveness polls 500ms apart, then SIGKILL for whatever is still alive.
//
// The target set is the processes discovered for the ID plus the PID recorded
// in the PID file when that process is alive and not already listed. The PID
// file is removed before returning. An error is returned only when there is
// nothing to stop; signal delivery errors are ignored.
func (m *Manager) Stop(id string) error {
	targets := m.findProcessPIDs(id)

	// Fold in the recorded PID when discovery missed it.
	if recorded := m.readPID(id); recorded > 0 && m.isAlive(recorded) {
		known := false
		for i := range targets {
			if targets[i] == recorded {
				known = true
				break
			}
		}
		if !known {
			targets = append(targets, recorded)
		}
	}
	if len(targets) == 0 {
		return fmt.Errorf("agent %q is not running", id)
	}

	for _, pid := range targets {
		_ = syscall.Kill(pid, syscall.SIGTERM)
	}

	anyAlive := func() bool {
		for _, pid := range targets {
			if m.isAlive(pid) {
				return true
			}
		}
		return false
	}

	// Grace period: 10 polls x 500ms = 5 seconds.
	const (
		polls     = 10
		pollEvery = 500 * time.Millisecond
	)
	for attempt := 0; attempt < polls; attempt++ {
		if !anyAlive() {
			m.removePID(id)
			return nil
		}
		time.Sleep(pollEvery)
	}

	// Grace period elapsed: force-kill the survivors.
	for _, pid := range targets {
		if !m.isAlive(pid) {
			continue
		}
		_ = syscall.Kill(pid, syscall.SIGKILL)
	}
	m.removePID(id)
	return nil
}
// Kill sends SIGKILL to every instance of the agent without a grace period.
//
// Targets are chosen as in Stop: discovered processes plus the live PID from
// the PID file if discovery missed it. The PID file is always removed. The
// returned error is the last signal delivery failure, or a "not running"
// error when there was nothing to kill.
func (m *Manager) Kill(id string) error {
	targets := m.findProcessPIDs(id)

	recorded := m.readPID(id)
	if recorded > 0 && m.isAlive(recorded) {
		missing := true
		for _, pid := range targets {
			if pid == recorded {
				missing = false
				break
			}
		}
		if missing {
			targets = append(targets, recorded)
		}
	}
	if len(targets) == 0 {
		return fmt.Errorf("agent %q is not running", id)
	}

	var failure error
	for _, pid := range targets {
		if killErr := syscall.Kill(pid, syscall.SIGKILL); killErr != nil {
			failure = killErr
		}
	}
	m.removePID(id)
	return failure
}
// Stats returns resource usage for the agent's running process, resolved via
// the PID file with process discovery as fallback. It fails when no running
// process can be resolved for the ID.
func (m *Manager) Stats(id string) (ProcessStats, error) {
	if pid := m.resolveRunningPID(id); pid != 0 {
		return m.statsForPID(pid, id), nil
	}
	return ProcessStats{}, fmt.Errorf("agent %q is not running", id)
}
// statsForPID gathers resource usage for a specific PID.
//
// pid selects the process; id selects the log file whose size is reported.
// Each figure is best-effort: when its source cannot be read or parsed the
// corresponding field keeps its zero value.
//
//   - UptimeSecs: now - (boot time + starttime/clock ticks), from
//     /proc/<pid>/stat and /proc/stat.
//   - MemRSSKB:   the VmRSS line of /proc/<pid>/status.
//   - CPUPct:     `ps -p <pid> -o pcpu=`.
//   - LogBytes:   size of the log file for id.
func (m *Manager) statsForPID(pid int, id string) ProcessStats {
	s := ProcessStats{PID: pid}

	// Uptime from /proc/<pid>/stat.
	if data, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid)); err == nil {
		if startTicks, ok := procStartTicks(string(data)); ok {
			if btime, ok := procBootTime(); ok {
				// Clock ticks per second; assumed to be 100 because the real
				// value (sysconf(_SC_CLK_TCK)) is not reachable without cgo.
				const clkTck = 100
				s.UptimeSecs = time.Now().Unix() - (btime + startTicks/clkTck)
			}
		}
	}

	// RSS from /proc/<pid>/status.
	if data, err := os.ReadFile(fmt.Sprintf("/proc/%d/status", pid)); err == nil {
		for _, line := range strings.Split(string(data), "\n") {
			if strings.HasPrefix(line, "VmRSS:") {
				fields := strings.Fields(line)
				if len(fields) >= 2 {
					s.MemRSSKB, _ = strconv.ParseInt(fields[1], 10, 64)
				}
				break
			}
		}
	}

	// CPU% from ps (simpler than calculating from /proc/stat deltas).
	if out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=").Output(); err == nil {
		s.CPUPct, _ = strconv.ParseFloat(strings.TrimSpace(string(out)), 64)
	}

	// Log file size.
	if fi, err := os.Stat(m.logPath(id)); err == nil {
		s.LogBytes = fi.Size()
	}
	return s
}

// procStartTicks extracts the starttime value (field 22) from the contents of
// a /proc/<pid>/stat file. It reports false when the field is absent or not
// an integer.
func procStartTicks(stat string) (int64, bool) {
	// Field 2 (comm) is wrapped in parentheses and may itself contain spaces
	// or ')', so splitting the whole line on whitespace can shift every later
	// field. Everything after the LAST ')' is safely space-separated.
	end := strings.LastIndexByte(stat, ')')
	if end < 0 {
		return 0, false
	}
	rest := strings.Fields(stat[end+1:])
	// rest[0] is field 3 (state), hence field 22 is rest[19].
	const startTimeIdx = 22 - 3
	if len(rest) <= startTimeIdx {
		return 0, false
	}
	ticks, err := strconv.ParseInt(rest[startTimeIdx], 10, 64)
	if err != nil {
		return 0, false
	}
	return ticks, true
}

// procBootTime returns the system boot time in seconds since the Unix epoch,
// read from the "btime" line of /proc/stat. It reports false when the file
// cannot be read or the line is missing or malformed.
func procBootTime() (int64, bool) {
	raw, err := os.ReadFile("/proc/stat")
	if err != nil {
		return 0, false
	}
	for _, line := range strings.Split(string(raw), "\n") {
		if !strings.HasPrefix(line, "btime ") {
			continue
		}
		fields := strings.Fields(line)
		if len(fields) < 2 {
			return 0, false
		}
		btime, err := strconv.ParseInt(fields[1], 10, 64)
		if err != nil {
			return 0, false
		}
		return btime, true
	}
	return 0, false
}
// LogTail returns the last N lines of an agent's log.
//
// id selects the log file; lines is the maximum number of trailing lines to
// return. A non-positive lines value yields no lines (instead of panicking on
// a negative slice bound). The result is nil when the log is empty.
//
// It returns an error when the log cannot be opened or when scanning fails,
// for example on a line longer than the 1 MiB scanner limit.
func (m *Manager) LogTail(id string, lines int) ([]string, error) {
	f, err := os.Open(m.logPath(id))
	if err != nil {
		return nil, fmt.Errorf("open log: %w", err)
	}
	defer f.Close()

	if lines < 0 {
		lines = 0
	}

	// Keep a sliding window rather than the whole file: once the buffer holds
	// more than twice the requested count, the newest `lines` entries are
	// moved to the front and the rest dropped. Memory stays proportional to
	// `lines` and the copying cost is amortised over the lines read.
	var tail []string
	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		tail = append(tail, scanner.Text())
		// Written as a subtraction so a very large `lines` cannot overflow.
		if len(tail)-lines > lines {
			kept := copy(tail, tail[len(tail)-lines:])
			tail = tail[:kept]
		}
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	if len(tail) > lines {
		tail = tail[len(tail)-lines:]
	}
	return tail, nil
}
// IsRunning reports whether a live process can be resolved for the agent,
// using the PID file first and process discovery as fallback.
func (m *Manager) IsRunning(id string) bool {
	pid := m.resolveRunningPID(id)
	return pid > 0
}
// InstanceCount returns the number of launcher processes currently
// discovered for the agent.
func (m *Manager) InstanceCount(id string) int {
	live := m.findProcessPIDs(id)
	return len(live)
}
// ReadPID exposes the PID recorded in the agent's PID file. It returns 0
// when the file cannot be read; no liveness check is performed.
func (m *Manager) ReadPID(id string) int {
	recorded := m.readPID(id)
	return recorded
}
// PidPath returns the location of the agent's PID file inside the run
// directory.
func (m *Manager) PidPath(id string) string {
	return m.pidPath(id)
}
// LogPath returns the location of the agent's log file inside the run
// directory.
func (m *Manager) LogPath(id string) string {
	return m.logPath(id)
}
// Build runs `bash build.sh` with the environment from BuildEnv and returns
// the script's combined stdout/stderr together with its error, if any.
func (m *Manager) Build() (string, error) {
	build := exec.Command("bash", "build.sh")
	build.Env = m.BuildEnv()

	combined, err := build.CombinedOutput()
	return string(combined), err
}
// ── Unified launcher ─────────────────────────────────────────────────────
// The unified launcher runs ALL enabled agents + orchestrator in a single
// process. PID → run/launcher.pid, log → run/launcher.log.
// StartUnified launches the unified launcher (no -c flag → discovers all agents).
//
// The child runs with "--log-level info" in its own session (Setsid), with
// stdout and stderr appended to the launcher log and the environment from
// BuildEnv. Its PID is written to the launcher PID file.
//
// It returns an error if the launcher is already running, if the run
// directory or log file cannot be prepared, if the process cannot be started,
// or if the PID file cannot be written (in which case the process keeps
// running).
func (m *Manager) StartUnified() error {
	if m.IsUnifiedRunning() {
		return fmt.Errorf("unified launcher is already running (PID %d)", m.readPID(unifiedID))
	}
	if err := os.MkdirAll(m.runDir, 0o755); err != nil {
		return fmt.Errorf("create run dir: %w", err)
	}
	logFile, err := os.OpenFile(m.logPath(unifiedID), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		return fmt.Errorf("open log: %w", err)
	}
	// The child gets its own copy of the descriptor when it is started, so
	// the parent's handle is closed on every return path to avoid leaking
	// one descriptor per launch.
	defer logFile.Close()

	var cmd *exec.Cmd
	// resolvedBin returns a "go run ..." sentinel when no binary is available.
	if bin := m.resolvedBin(); strings.HasPrefix(bin, "go run") {
		cmd = exec.Command("go", "run", "-tags", "goolm", "./cmd/launcher", "--log-level", "info")
	} else {
		cmd = exec.Command(bin, "--log-level", "info")
	}
	cmd.Env = m.BuildEnv()
	cmd.Stdout = logFile
	cmd.Stderr = logFile
	cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("exec: %w", err)
	}
	// Reap the child whenever it exits. This is started before the PID file
	// is written so that a write failure cannot leave the child unreaped.
	go func() { _ = cmd.Wait() }()

	if err := os.WriteFile(m.pidPath(unifiedID), []byte(strconv.Itoa(cmd.Process.Pid)), 0o644); err != nil {
		return fmt.Errorf("write PID: %w", err)
	}
	return nil
}
// StopUnified gracefully stops the unified launcher by delegating to Stop
// with the launcher's reserved ID.
func (m *Manager) StopUnified() error {
	err := m.Stop(unifiedID)
	return err
}
// KillUnified force-kills the unified launcher by delegating to Kill with
// the launcher's reserved ID.
func (m *Manager) KillUnified() error {
	err := m.Kill(unifiedID)
	return err
}
// IsUnifiedRunning reports whether the unified launcher is alive. The PID
// file is trusted when it names a live process; otherwise the process table
// is searched for a unified launcher.
func (m *Manager) IsUnifiedRunning() bool {
	if recorded := m.readPID(unifiedID); recorded > 0 && m.isAlive(recorded) {
		return true
	}
	// PID file missing or stale: fall back to process discovery.
	return len(m.findUnifiedPIDs()) > 0
}
// UnifiedPID returns the PID of the running unified launcher, or 0 when none
// is found. When the PID file is missing or stale but a launcher process is
// discovered, the PID file is rewritten with the first discovered PID
// (write errors are ignored).
func (m *Manager) UnifiedPID() int {
	if recorded := m.readPID(unifiedID); recorded > 0 && m.isAlive(recorded) {
		return recorded
	}

	discovered := m.findUnifiedPIDs()
	if len(discovered) == 0 {
		return 0
	}
	live := discovered[0]
	// Best-effort repair of the PID file.
	_ = os.WriteFile(m.pidPath(unifiedID), []byte(strconv.Itoa(live)), 0o644)
	return live
}
// UnifiedStats returns resource usage for the unified launcher process, or
// an error when no launcher process is running.
func (m *Manager) UnifiedStats() (ProcessStats, error) {
	if pid := m.UnifiedPID(); pid != 0 {
		return m.statsForPID(pid, unifiedID), nil
	}
	return ProcessStats{}, fmt.Errorf("unified launcher is not running")
}
// UnifiedLogTail returns the last N lines of the unified launcher's log by
// delegating to LogTail with the launcher's reserved ID.
func (m *Manager) UnifiedLogTail(lines int) ([]string, error) {
	tail, err := m.LogTail(unifiedID, lines)
	return tail, err
}
// ── Per-agent unified control ─────────────────────────────────────────────
// RegisterUnifiedAgent records that the agent's goroutine is running inside
// the unified launcher: it stores the cancel function for the goroutine's
// context and stamps the current time as its start time. Registering an ID
// again overwrites the previous entry. Called by the launcher runtime.
func (m *Manager) RegisterUnifiedAgent(id string, cancel context.CancelFunc) {
	m.unifiedMu.Lock()
	defer m.unifiedMu.Unlock()

	m.unifiedCancels[id], m.startedAt[id] = cancel, time.Now()
}
// UnregisterUnifiedAgent forgets the agent's cancel function and start time.
// It does not invoke the cancel function and is a no-op for unknown IDs.
// Called when the goroutine exits.
func (m *Manager) UnregisterUnifiedAgent(id string) {
	m.unifiedMu.Lock()
	defer m.unifiedMu.Unlock()

	delete(m.startedAt, id)
	delete(m.unifiedCancels, id)
}
// StopUnifiedAgent cancels the goroutine context for a specific agent without
// stopping the launcher process. It returns an error if the agent is not
// registered.
//
// The registration is looked up and removed under a single write lock, so a
// stop cannot race with a concurrent re-registration of the same ID and end
// up deleting the new entry. The cancel function is invoked after the lock
// is released so that it never runs while the manager's mutex is held.
func (m *Manager) StopUnifiedAgent(id string) error {
	m.unifiedMu.Lock()
	cancel, ok := m.unifiedCancels[id]
	if ok {
		delete(m.unifiedCancels, id)
		delete(m.startedAt, id)
	}
	m.unifiedMu.Unlock()

	if !ok {
		return fmt.Errorf("agent %q is not registered in unified mode (not running)", id)
	}
	cancel()
	return nil
}
// IsUnifiedAgentRunning reports whether a cancel function is currently
// registered for the agent, i.e. whether its goroutine is considered running.
func (m *Manager) IsUnifiedAgentRunning(id string) bool {
	m.unifiedMu.RLock()
	defer m.unifiedMu.RUnlock()

	_, registered := m.unifiedCancels[id]
	return registered
}
// UptimeSeconds returns the whole seconds elapsed since the agent was
// registered via RegisterUnifiedAgent, or 0 when no start time is recorded
// for the ID.
func (m *Manager) UptimeSeconds(id string) int64 {
	m.unifiedMu.RLock()
	defer m.unifiedMu.RUnlock()

	started, registered := m.startedAt[id]
	if !registered {
		return 0
	}
	return int64(time.Since(started).Seconds())
}
// StatusAllUnified returns the status of every discovered agent when agents
// run inside the unified launcher.
//
// If at least one agent is registered through RegisterUnifiedAgent, an agent
// is running exactly when it is itself registered. Otherwise an agent is
// reported running when the launcher is alive and the agent is enabled.
// Running agents report the launcher PID and one instance.
//
// NOTE(review): when every tracked agent has been stopped individually the
// registry is empty, so the fallback rule applies and enabled agents are
// reported as running while the launcher is alive — confirm whether that is
// intended.
func (m *Manager) StatusAllUnified() ([]AgentStatus, error) {
	infos, err := m.Scan()
	if err != nil {
		return nil, err
	}

	launcherUp := m.IsUnifiedRunning()
	launcherPID := m.UnifiedPID()

	m.unifiedMu.RLock()
	tracked := len(m.unifiedCancels) != 0
	m.unifiedMu.RUnlock()

	out := make([]AgentStatus, 0, len(infos))
	for _, info := range infos {
		st := AgentStatus{AgentInfo: info}
		if tracked {
			// Per-agent tracking is active: trust the individual registration.
			st.Running = m.IsUnifiedAgentRunning(info.ID)
		} else {
			// No tracking data: infer from launcher liveness and config.
			st.Running = launcherUp && info.Enabled
		}
		if st.Running {
			st.PID = launcherPID
			st.Instances = 1
		}
		st.UptimeSeconds = m.UptimeSeconds(info.ID)
		out = append(out, st)
	}
	return out, nil
}
// ToggleEnabled rewrites the enabled field in the config file of the agent
// with the given ID. It fails when scanning fails, when no agent has that ID,
// or when the config file cannot be rewritten.
func (m *Manager) ToggleEnabled(id string, enabled bool) error {
	infos, err := m.Scan()
	if err != nil {
		return err
	}
	for i := range infos {
		if infos[i].ID != id {
			continue
		}
		return m.setEnabledInConfig(infos[i].ConfigPath, enabled)
	}
	return fmt.Errorf("agent %q not found", id)
}
// setEnabledInConfig rewrites the enabled field in a config.yaml.
//
// The first line whose trimmed content starts with "enabled:" is replaced by
// "enabled: true" or "enabled: false", keeping the line's indentation and a
// trailing carriage return (CRLF files). Anything else on that line, such as
// an inline comment, is dropped. All other lines are written back unchanged.
//
// It returns an error when the file cannot be read or written, and when no
// "enabled:" line exists — in that case nothing is written, so callers are
// not told the toggle succeeded when it had no effect.
//
// NOTE(review): the match is textual and takes the first "enabled:" key at
// any nesting level; this presumably is the agent-level flag — confirm
// against the config schema.
func (m *Manager) setEnabledInConfig(path string, enabled bool) error {
	data, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	val := strconv.FormatBool(enabled)

	lines := strings.Split(string(data), "\n")
	found := false
	for i, line := range lines {
		if !strings.HasPrefix(strings.TrimSpace(line), "enabled:") {
			continue
		}
		// Preserve indentation and the original line ending.
		indent := line[:len(line)-len(strings.TrimLeft(line, " \t"))]
		updated := indent + "enabled: " + val
		if strings.HasSuffix(line, "\r") {
			updated += "\r"
		}
		lines[i] = updated
		found = true
		break
	}
	if !found {
		return fmt.Errorf("no enabled field found in %s", path)
	}
	return os.WriteFile(path, []byte(strings.Join(lines, "\n")), 0o644)
}
// findUnifiedPIDs returns the PIDs of processes whose command line matches
// "launcher.*--log-level", excluding those whose comm name is "go" (the
// `go run` wrapper). The pattern keys on the --log-level flag that
// StartUnified passes and per-agent Start does not; it does not itself
// check for the absence of -c.
func (m *Manager) findUnifiedPIDs() []int {
	var launchers []int
	for _, pid := range m.prober.pgrepPIDs("launcher.*--log-level") {
		if m.prober.processComm(pid) != "go" {
			launchers = append(launchers, pid)
		}
	}
	return launchers
}
// ── internal helpers ─────────────────────────────────────────────────────
// pidPath returns <runDir>/<id>.pid.
func (m *Manager) pidPath(id string) string {
	name := id + ".pid"
	return filepath.Join(m.runDir, name)
}
// logPath returns <runDir>/<id>.log.
func (m *Manager) logPath(id string) string {
	name := id + ".log"
	return filepath.Join(m.runDir, name)
}
// readPID returns the integer stored in the agent's PID file. It returns 0
// when the file cannot be read; the conversion error is ignored, so content
// that is not a number also reads as 0. No liveness check is performed.
func (m *Manager) readPID(id string) int {
	content, err := os.ReadFile(m.pidPath(id))
	if err != nil {
		return 0
	}
	text := strings.TrimSpace(string(content))
	n, _ := strconv.Atoi(text)
	return n
}
// findProcessPIDs searches for running launcher processes for a given agent ID
// using pgrep. Filters out "go run" wrapper PIDs to avoid double-counting.
//
// The agent's config path is looked up first; an unknown ID yields nil. The
// path is regex-escaped before being placed in the pgrep pattern, because
// pgrep -f treats the pattern as a regular expression and an unescaped path
// (for instance its "." characters) could match other processes' command
// lines.
func (m *Manager) findProcessPIDs(id string) []int {
	configPath := m.configPathFor(id)
	if configPath == "" {
		return nil
	}
	pattern := fmt.Sprintf("launcher.*-c.*%s", regexp.QuoteMeta(configPath))
	raw := m.prober.pgrepPIDs(pattern)

	// Filter out the "go" wrapper process that appears when using "go run".
	var pids []int
	for _, p := range raw {
		if m.prober.processComm(p) == "go" {
			continue
		}
		pids = append(pids, p)
	}
	return pids
}
// configPathFor returns the path of the first config file matched by the
// manager's glob whose agent ID equals id. It returns "" when the glob
// fails or no loadable config carries that ID.
func (m *Manager) configPathFor(id string) string {
	candidates, err := filepath.Glob(m.agentsGlob)
	if err != nil {
		return ""
	}
	for _, candidate := range candidates {
		meta, loadErr := config.LoadMeta(candidate)
		if loadErr == nil && meta.Agent.ID == id {
			return candidate
		}
	}
	return ""
}
// resolveRunningPID returns the PID of the agent's running process, or 0.
//
// The PID file wins when it names a live process. Otherwise processes are
// discovered by ID; if any are found the PID file is rewritten with the
// first one (write errors ignored) and that PID is returned. If none are
// found and the PID file held a positive but dead PID, the file is removed.
func (m *Manager) resolveRunningPID(id string) int {
	recorded := m.readPID(id)
	if recorded > 0 && m.isAlive(recorded) {
		return recorded
	}

	// PID file is stale or missing: look at the process table instead.
	if live := m.findProcessPIDs(id); len(live) > 0 {
		first := live[0]
		_ = os.WriteFile(m.pidPath(id), []byte(strconv.Itoa(first)), 0o644)
		return first
	}

	// Nothing is running; drop the stale record.
	if recorded > 0 {
		m.removePID(id)
	}
	return 0
}
// isAlive asks the configured prober whether the PID is running.
func (m *Manager) isAlive(pid int) bool {
	alive := m.prober.isAlive(pid)
	return alive
}
// removePID deletes the agent's PID file, ignoring any error (such as the
// file not existing).
func (m *Manager) removePID(id string) {
	target := m.pidPath(id)
	_ = os.Remove(target)
}
// BuildEnv returns the environment for child processes: current env + .env file vars.
//
// The result starts as a copy of the current process environment. When an
// env file is configured and readable, each of its lines that is neither
// blank nor a "#" comment and has a non-empty key before "=" is appended
// verbatim (after trimming surrounding whitespace). Because these entries
// come last, they take precedence over inherited variables of the same name
// when the slice is used as exec.Cmd.Env.
//
// NOTE(review): values are passed through as written; quotes and an
// "export " prefix are not interpreted — confirm the .env files in use do
// not rely on them.
func (m *Manager) BuildEnv() []string {
	env := os.Environ()
	if m.envFile == "" {
		return env
	}
	data, err := os.ReadFile(m.envFile)
	if err != nil {
		// A missing or unreadable env file is not an error: inherit only.
		return env
	}
	for _, line := range strings.Split(string(data), "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		// Require KEY=VALUE with a non-empty key.
		if strings.Index(line, "=") > 0 {
			env = append(env, line)
		}
	}
	return env
}
// resolvedBin chooses how the launcher is executed: the configured binPath
// when set, otherwise "bin/launcher" if that file exists, otherwise the
// "go run ./cmd/launcher" sentinel that Start and StartUnified recognise by
// its "go run" prefix.
func (m *Manager) resolvedBin() string {
	if m.binPath == "" {
		const local = "bin/launcher"
		if _, statErr := os.Stat(local); statErr != nil {
			return "go run ./cmd/launcher"
		}
		return local
	}
	return m.binPath
}