diff --git a/cmd/agentctl/main.go b/cmd/agentctl/main.go index 4e59412..29ebb32 100644 --- a/cmd/agentctl/main.go +++ b/cmd/agentctl/main.go @@ -13,19 +13,15 @@ package main import ( "fmt" "os" - "os/exec" - "path/filepath" - "strconv" "strings" - "syscall" "github.com/spf13/cobra" - "github.com/enmanuel/agents/internal/config" + "github.com/enmanuel/agents/shell/process" ) const ( - runDir = "run" // PID + log files + runDir = "run" agentsGlob = "agents/*/config.yaml" ) @@ -34,6 +30,8 @@ const ( func main() { var binPath string + mgr := process.NewManager(runDir, agentsGlob, "") + root := &cobra.Command{ Use: "agentctl", Short: "Manage Matrix agents", @@ -46,10 +44,10 @@ func main() { "Launcher binary path. Defaults to ./bin/launcher, falls back to 'go run ./cmd/launcher'") root.AddCommand( - listCmd(), - startCmd(&binPath), - stopCmd(), - removeCmd(), + listCmd(mgr), + startCmd(mgr, &binPath), + stopCmd(mgr), + removeCmd(mgr), avatarCmd(), displaynameCmd(), ) @@ -61,29 +59,29 @@ func main() { // ── list ────────────────────────────────────────────────────────────────── -func listCmd() *cobra.Command { +func listCmd(mgr *process.Manager) *cobra.Command { return &cobra.Command{ Use: "list", Short: "List all agents and their current status", Aliases: []string{"ls"}, RunE: func(cmd *cobra.Command, args []string) error { - agents, err := scanAgents() + statuses, err := mgr.StatusAll() if err != nil { return err } - if len(agents) == 0 { + if len(statuses) == 0 { fmt.Println("No agents found under agents/*/config.yaml") return nil } fmt.Printf("%-20s %-12s %-8s %s\n", "ID", "STATUS", "VERSION", "DESCRIPTION") fmt.Println(strings.Repeat("─", 72)) - for _, a := range agents { + for _, s := range statuses { fmt.Printf("%-20s %-12s %-8s %s\n", - a.ID, - statusLabel(a), - a.Version, - truncate(a.Desc, 36), + s.ID, + statusLabel(s), + s.Version, + truncate(s.Desc, 36), ) } return nil @@ -93,41 +91,39 @@ func listCmd() *cobra.Command { // ── start ───────────────────────────────────────────────────────────────── -func startCmd(binPath *string) *cobra.Command { +func startCmd(mgr *process.Manager, binPath *string) *cobra.Command { return &cobra.Command{ Use: "start [agent-id...]", Short: "Start one or all enabled agents", RunE: func(cmd *cobra.Command, args []string) error { - agents, err := scanAgents() + statuses, err := mgr.StatusAll() if err != nil { return err } - targets := filterTargets(agents, args) + targets := filterTargets(statuses, args) if len(targets) == 0 { return fmt.Errorf("no matching agents found") } - bin := resolvedBin(*binPath) started := 0 - - for _, a := range targets { - if !a.Enabled { - fmt.Printf("skip %-20s (disabled in config)\n", a.ID) + for _, s := range targets { + if !s.Enabled { + fmt.Printf("skip %-20s (disabled in config)\n", s.ID) continue } - if isRunning(a.ID) { - fmt.Printf("skip %-20s (already running, PID %d)\n", a.ID, readPID(a.ID)) + if s.Running { + fmt.Printf("skip %-20s (already running, PID %d)\n", s.ID, s.PID) continue } - if err := startAgent(a, bin); err != nil { - fmt.Fprintf(os.Stderr, "fail %-20s %v\n", a.ID, err) + if err := mgr.Start(s.AgentInfo); err != nil { + fmt.Fprintf(os.Stderr, "fail %-20s %v\n", s.ID, err) continue } fmt.Printf("start %-20s PID %d log → %s\n", - a.ID, readPID(a.ID), logPath(a.ID)) + s.ID, mgr.ReadPID(s.ID), mgr.LogPath(s.ID)) started++ } @@ -141,34 +137,33 @@ func startCmd(binPath *string) *cobra.Command { // ── stop ────────────────────────────────────────────────────────────────── -func stopCmd() *cobra.Command { +func stopCmd(mgr *process.Manager) *cobra.Command { return &cobra.Command{ Use: "stop [agent-id...]", Short: "Stop one or all running agents", RunE: func(cmd *cobra.Command, args []string) error { - agents, err := scanAgents() + statuses, err := mgr.StatusAll() if err != nil { return err } - targets := filterTargets(agents, args) + targets := filterTargets(statuses, args) if len(targets) == 0 { return fmt.Errorf("no matching agents found") } stopped := 0 - for _, a := range targets { - pid := readPID(a.ID) - if pid == 0 || !isRunning(a.ID) { - fmt.Printf("skip %-20s (not running)\n", a.ID) + for _, s := range targets { + if !s.Running { + fmt.Printf("skip %-20s (not running)\n", s.ID) continue } - if err := syscall.Kill(pid, syscall.SIGTERM); err != nil { - fmt.Fprintf(os.Stderr, "fail %-20s kill: %v\n", a.ID, err) + pid := s.PID + if err := mgr.Stop(s.ID); err != nil { + fmt.Fprintf(os.Stderr, "fail %-20s %v\n", s.ID, err) continue } - removePIDFile(a.ID) - fmt.Printf("stop %-20s sent SIGTERM to PID %d\n", a.ID, pid) + fmt.Printf("stop %-20s stopped PID %d\n", s.ID, pid) stopped++ } @@ -182,7 +177,7 @@ func stopCmd() *cobra.Command { // ── remove ──────────────────────────────────────────────────────────────── -func removeCmd() *cobra.Command { +func removeCmd(mgr *process.Manager) *cobra.Command { return &cobra.Command{ Use: "remove ", Short: "Disable an agent (sets enabled: false). Does not delete data.", @@ -190,15 +185,15 @@ func removeCmd() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { id := args[0] - agents, err := scanAgents() + statuses, err := mgr.StatusAll() if err != nil { return err } - var target *agentInfo - for i := range agents { - if agents[i].ID == id { - target = &agents[i] + var target *process.AgentStatus + for i := range statuses { + if statuses[i].ID == id { + target = &statuses[i] break } } @@ -206,15 +201,14 @@ func removeCmd() *cobra.Command { return fmt.Errorf("agent %q not found", id) } - // Stop if running - if isRunning(id) { - pid := readPID(id) - _ = syscall.Kill(pid, syscall.SIGTERM) - removePIDFile(id) - fmt.Printf("stop %-20s sent SIGTERM to PID %d\n", id, pid) + if target.Running { + if err := mgr.Stop(id); err != nil { + fmt.Fprintf(os.Stderr, "warn stop failed: %v\n", err) + } else { + fmt.Printf("stop %-20s stopped PID %d\n", id, target.PID) + } } - // Disable in config (preserves comments) if err := setEnabled(target.ConfigPath, false); err != nil { return fmt.Errorf("update config: %w", err) } @@ -226,124 +220,44 @@ func removeCmd() *cobra.Command { } } -// ── agent scanning ──────────────────────────────────────────────────────── +// ── helpers ─────────────────────────────────────────────────────────────── -type agentInfo struct { - ID string - Version string - Enabled bool - Desc string - ConfigPath string -} - -func scanAgents() ([]agentInfo, error) { - matches, err := filepath.Glob(agentsGlob) - if err != nil { - return nil, err - } - - var agents []agentInfo - for _, path := range matches { - // Use LoadMeta so list works even when env vars aren't set. - cfg, err := config.LoadMeta(path) - if err != nil { - fmt.Fprintf(os.Stderr, "warn skipping %s: %v\n", path, err) - continue - } - agents = append(agents, agentInfo{ - ID: cfg.Agent.ID, - Version: cfg.Agent.Version, - Enabled: cfg.Agent.Enabled, - Desc: cfg.Agent.Description, - ConfigPath: path, - }) - } - return agents, nil -} - -func filterTargets(agents []agentInfo, ids []string) []agentInfo { +func filterTargets(statuses []process.AgentStatus, ids []string) []process.AgentStatus { if len(ids) == 0 { - return agents // no filter → all + return statuses } set := make(map[string]bool, len(ids)) for _, id := range ids { set[id] = true } - var out []agentInfo - for _, a := range agents { - if set[a.ID] { - out = append(out, a) + var out []process.AgentStatus + for _, s := range statuses { + if set[s.ID] { + out = append(out, s) } } return out } -// ── process management ──────────────────────────────────────────────────── - -func startAgent(a agentInfo, bin string) error { - logFile, err := os.OpenFile(logPath(a.ID), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) - if err != nil { - return fmt.Errorf("open log: %w", err) +func statusLabel(s process.AgentStatus) string { + switch { + case !s.Enabled: + return "disabled" + case s.Running: + return "● running" + default: + return "○ stopped" } - - var cmd *exec.Cmd - if strings.HasPrefix(bin, "go run") { - // dev mode: go run ./cmd/launcher -c - cmd = exec.Command("go", "run", "./cmd/launcher", "-c", a.ConfigPath) - } else { - cmd = exec.Command(bin, "-c", a.ConfigPath) - } - - cmd.Stdout = logFile - cmd.Stderr = logFile - // Detach from the parent process group so it keeps running after agentctl exits - cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} - - if err := cmd.Start(); err != nil { - logFile.Close() - return fmt.Errorf("exec: %w", err) - } - - // Write PID file — the subprocess owns its lifecycle now - if err := os.WriteFile(pidPath(a.ID), []byte(strconv.Itoa(cmd.Process.Pid)), 0o644); err != nil { - return fmt.Errorf("write PID: %w", err) - } - - // Detach: don't wait for the process - go func() { _ = cmd.Wait() }() - - return nil } -func isRunning(id string) bool { - pid := readPID(id) - if pid == 0 { - return false +func truncate(s string, max int) string { + if len(s) <= max { + return s } - err := syscall.Kill(pid, 0) // signal 0 checks existence without killing - return err == nil + return s[:max-1] + "…" } -func readPID(id string) int { - raw, err := os.ReadFile(pidPath(id)) - if err != nil { - return 0 - } - pid, _ := strconv.Atoi(strings.TrimSpace(string(raw))) - return pid -} - -func removePIDFile(id string) { - _ = os.Remove(pidPath(id)) -} - -func pidPath(id string) string { return filepath.Join(runDir, id+".pid") } -func logPath(id string) string { return filepath.Join(runDir, id+".log") } - -// ── config editing ──────────────────────────────────────────────────────── - // setEnabled flips `enabled: true/false` in the agent section of the YAML. -// Uses text replacement to preserve all comments. func setEnabled(configPath string, enabled bool) error { raw, err := os.ReadFile(configPath) if err != nil { @@ -359,40 +273,8 @@ func setEnabled(configPath string, enabled bool) error { updated := strings.Replace(string(raw), current, replacement, 1) if updated == string(raw) { - return nil // already in the desired state + return nil } return os.WriteFile(configPath, []byte(updated), 0o644) } - -// ── display helpers ─────────────────────────────────────────────────────── - -func statusLabel(a agentInfo) string { - switch { - case !a.Enabled: - return "disabled" - case isRunning(a.ID): - return "● running" - default: - return "○ stopped" - } -} - -func truncate(s string, max int) string { - if len(s) <= max { - return s - } - return s[:max-1] + "…" -} - -// resolvedBin returns the launcher binary path to use. -// Priority: --bin flag > ./bin/launcher (if exists) > go run fallback. -func resolvedBin(flagVal string) string { - if flagVal != "" { - return flagVal - } - if _, err := os.Stat("bin/launcher"); err == nil { - return "bin/launcher" - } - return "go run ./cmd/launcher" -} diff --git a/shell/process/manager.go b/shell/process/manager.go new file mode 100644 index 0000000..c69fa0a --- /dev/null +++ b/shell/process/manager.go @@ -0,0 +1,304 @@ +// Package process manages agent processes: discovery, start, stop, kill, stats. +// This is the impure shell layer — all I/O happens here. +package process + +import ( + "bufio" + "fmt" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" + + "github.com/enmanuel/agents/internal/config" +) + +// AgentInfo holds metadata about an agent parsed from its config. +type AgentInfo struct { + ID string + Name string + Version string + Desc string + ConfigPath string + Enabled bool +} + +// AgentStatus combines agent metadata with runtime state. +type AgentStatus struct { + AgentInfo + Running bool + PID int +} + +// ProcessStats holds resource usage for a running process. +type ProcessStats struct { + PID int + UptimeSecs int64 + MemRSSKB int64 + CPUPct float64 + LogBytes int64 +} + +// Manager handles agent process lifecycle. +type Manager struct { + runDir string + agentsGlob string + binPath string +} + +// NewManager creates a Manager. binPath can be empty for auto-detection. +func NewManager(runDir, agentsGlob, binPath string) *Manager { + return &Manager{runDir: runDir, agentsGlob: agentsGlob, binPath: binPath} +} + +// Scan discovers all agents from config files. +func (m *Manager) Scan() ([]AgentInfo, error) { + matches, err := filepath.Glob(m.agentsGlob) + if err != nil { + return nil, err + } + + var agents []AgentInfo + for _, path := range matches { + cfg, err := config.LoadMeta(path) + if err != nil { + continue + } + agents = append(agents, AgentInfo{ + ID: cfg.Agent.ID, + Name: cfg.Agent.Name, + Version: cfg.Agent.Version, + Desc: cfg.Agent.Description, + ConfigPath: path, + Enabled: cfg.Agent.Enabled, + }) + } + return agents, nil +} + +// Status returns the runtime status for a single agent. +func (m *Manager) Status(info AgentInfo) AgentStatus { + pid := m.readPID(info.ID) + running := pid > 0 && m.isAlive(pid) + return AgentStatus{AgentInfo: info, Running: running, PID: pid} +} + +// StatusAll returns status for every discovered agent. +func (m *Manager) StatusAll() ([]AgentStatus, error) { + agents, err := m.Scan() + if err != nil { + return nil, err + } + statuses := make([]AgentStatus, len(agents)) + for i, a := range agents { + statuses[i] = m.Status(a) + } + return statuses, nil +} + +// Start launches an agent process in the background. +func (m *Manager) Start(info AgentInfo) error { + if err := os.MkdirAll(m.runDir, 0o755); err != nil { + return fmt.Errorf("create run dir: %w", err) + } + + logFile, err := os.OpenFile(m.logPath(info.ID), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return fmt.Errorf("open log: %w", err) + } + + bin := m.resolvedBin() + var cmd *exec.Cmd + if strings.HasPrefix(bin, "go run") { + cmd = exec.Command("go", "run", "./cmd/launcher", "-c", info.ConfigPath) + } else { + cmd = exec.Command(bin, "-c", info.ConfigPath) + } + + cmd.Stdout = logFile + cmd.Stderr = logFile + cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} + + if err := cmd.Start(); err != nil { + logFile.Close() + return fmt.Errorf("exec: %w", err) + } + + if err := os.WriteFile(m.pidPath(info.ID), []byte(strconv.Itoa(cmd.Process.Pid)), 0o644); err != nil { + return fmt.Errorf("write PID: %w", err) + } + + go func() { _ = cmd.Wait() }() + return nil +} + +// Stop sends SIGTERM, waits up to 5s, then SIGKILL if needed. +func (m *Manager) Stop(id string) error { + pid := m.readPID(id) + if pid == 0 || !m.isAlive(pid) { + return fmt.Errorf("agent %q is not running", id) + } + + if err := syscall.Kill(pid, syscall.SIGTERM); err != nil { + return fmt.Errorf("SIGTERM: %w", err) + } + + // Wait up to 5 seconds for graceful shutdown. + for i := 0; i < 10; i++ { + if !m.isAlive(pid) { + m.removePID(id) + return nil + } + time.Sleep(500 * time.Millisecond) + } + + // Force kill. + if m.isAlive(pid) { + _ = syscall.Kill(pid, syscall.SIGKILL) + } + m.removePID(id) + return nil +} + +// Kill sends SIGKILL immediately. +func (m *Manager) Kill(id string) error { + pid := m.readPID(id) + if pid == 0 || !m.isAlive(pid) { + return fmt.Errorf("agent %q is not running", id) + } + err := syscall.Kill(pid, syscall.SIGKILL) + m.removePID(id) + return err +} + +// Stats gathers resource usage for a running agent from /proc. +func (m *Manager) Stats(id string) (ProcessStats, error) { + pid := m.readPID(id) + if pid == 0 || !m.isAlive(pid) { + return ProcessStats{}, fmt.Errorf("agent %q is not running", id) + } + + s := ProcessStats{PID: pid} + + // Uptime from /proc//stat + if data, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid)); err == nil { + fields := strings.Fields(string(data)) + if len(fields) > 21 { + startTicks, _ := strconv.ParseInt(fields[21], 10, 64) + clkTck := int64(100) // sysconf(_SC_CLK_TCK) is 100 on Linux + if raw, err := os.ReadFile("/proc/stat"); err == nil { + for _, line := range strings.Split(string(raw), "\n") { + if strings.HasPrefix(line, "btime ") { + btime, _ := strconv.ParseInt(strings.Fields(line)[1], 10, 64) + procStart := btime + startTicks/clkTck + s.UptimeSecs = time.Now().Unix() - procStart + break + } + } + } + } + } + + // RSS from /proc//status + if data, err := os.ReadFile(fmt.Sprintf("/proc/%d/status", pid)); err == nil { + for _, line := range strings.Split(string(data), "\n") { + if strings.HasPrefix(line, "VmRSS:") { + fields := strings.Fields(line) + if len(fields) >= 2 { + s.MemRSSKB, _ = strconv.ParseInt(fields[1], 10, 64) + } + break + } + } + } + + // CPU% from ps (simpler than calculating from /proc/stat deltas) + if out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=").Output(); err == nil { + s.CPUPct, _ = strconv.ParseFloat(strings.TrimSpace(string(out)), 64) + } + + // Log file size + if info, err := os.Stat(m.logPath(id)); err == nil { + s.LogBytes = info.Size() + } + + return s, nil +} + +// LogTail returns the last N lines of an agent's log. +func (m *Manager) LogTail(id string, lines int) ([]string, error) { + f, err := os.Open(m.logPath(id)) + if err != nil { + return nil, fmt.Errorf("open log: %w", err) + } + defer f.Close() + + // Read all lines and keep last N. For large files a reverse scanner + // would be better, but agent logs are typically small. + var all []string + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + all = append(all, scanner.Text()) + } + if err := scanner.Err(); err != nil { + return nil, err + } + + if len(all) > lines { + all = all[len(all)-lines:] + } + return all, nil +} + +// IsRunning checks if an agent process is alive. +func (m *Manager) IsRunning(id string) bool { + pid := m.readPID(id) + return pid > 0 && m.isAlive(pid) +} + +// ReadPID returns the PID from the PID file, or 0. +func (m *Manager) ReadPID(id string) int { + return m.readPID(id) +} + +// PidPath returns the path to the PID file for an agent. +func (m *Manager) PidPath(id string) string { return m.pidPath(id) } + +// LogPath returns the path to the log file for an agent. +func (m *Manager) LogPath(id string) string { return m.logPath(id) } + +// ── internal helpers ───────────────────────────────────────────────────── + +func (m *Manager) pidPath(id string) string { return filepath.Join(m.runDir, id+".pid") } +func (m *Manager) logPath(id string) string { return filepath.Join(m.runDir, id+".log") } + +func (m *Manager) readPID(id string) int { + raw, err := os.ReadFile(m.pidPath(id)) + if err != nil { + return 0 + } + pid, _ := strconv.Atoi(strings.TrimSpace(string(raw))) + return pid +} + +func (m *Manager) isAlive(pid int) bool { + return syscall.Kill(pid, 0) == nil +} + +func (m *Manager) removePID(id string) { + _ = os.Remove(m.pidPath(id)) +} + +func (m *Manager) resolvedBin() string { + if m.binPath != "" { + return m.binPath + } + if _, err := os.Stat("bin/launcher"); err == nil { + return "bin/launcher" + } + return "go run ./cmd/launcher" +}