feat: implement server-wide management actions and enhance TUI dashboard

This commit is contained in:
2026-03-04 20:51:02 +00:00
parent 150f9d2990
commit ddec55871b
13 changed files with 621 additions and 52 deletions
+170 -22
View File
@@ -47,11 +47,12 @@ type Manager struct {
runDir string
agentsGlob string
binPath string
envFile string // path to .env file for child processes
}
// NewManager creates a Manager. binPath can be empty for auto-detection.
func NewManager(runDir, agentsGlob, binPath string) *Manager {
return &Manager{runDir: runDir, agentsGlob: agentsGlob, binPath: binPath}
return &Manager{runDir: runDir, agentsGlob: agentsGlob, binPath: binPath, envFile: ".env"}
}
// Scan discovers all agents from config files.
@@ -81,8 +82,8 @@ func (m *Manager) Scan() ([]AgentInfo, error) {
// Status returns the runtime status for a single agent.
func (m *Manager) Status(info AgentInfo) AgentStatus {
pid := m.readPID(info.ID)
running := pid > 0 && m.isAlive(pid)
pid := m.resolveRunningPID(info.ID)
running := pid > 0
return AgentStatus{AgentInfo: info, Running: running, PID: pid}
}
@@ -101,6 +102,12 @@ func (m *Manager) StatusAll() ([]AgentStatus, error) {
// Start launches an agent process in the background.
func (m *Manager) Start(info AgentInfo) error {
// Check for orphan instances
if existing := m.findProcessPIDs(info.ID); len(existing) > 0 {
return fmt.Errorf("agent %q already has %d running instance(s) (PIDs: %v) — stop them first",
info.ID, len(existing), existing)
}
if err := os.MkdirAll(m.runDir, 0o755); err != nil {
return fmt.Errorf("create run dir: %w", err)
}
@@ -113,11 +120,12 @@ func (m *Manager) Start(info AgentInfo) error {
bin := m.resolvedBin()
var cmd *exec.Cmd
if strings.HasPrefix(bin, "go run") {
cmd = exec.Command("go", "run", "./cmd/launcher", "-c", info.ConfigPath)
cmd = exec.Command("go", "run", "-tags", "goolm", "./cmd/launcher", "-c", info.ConfigPath)
} else {
cmd = exec.Command(bin, "-c", info.ConfigPath)
}
cmd.Env = m.buildEnv()
cmd.Stdout = logFile
cmd.Stderr = logFile
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
@@ -135,49 +143,94 @@ func (m *Manager) Start(info AgentInfo) error {
return nil
}
// Stop sends SIGTERM, waits up to 5s, then SIGKILL if needed.
// Stop sends SIGTERM to all instances, waits up to 5s, then SIGKILL if needed.
func (m *Manager) Stop(id string) error {
pid := m.readPID(id)
if pid == 0 || !m.isAlive(pid) {
pids := m.findProcessPIDs(id)
// Also include PID file PID if alive and not already in the list
filePID := m.readPID(id)
if filePID > 0 && m.isAlive(filePID) {
found := false
for _, p := range pids {
if p == filePID {
found = true
break
}
}
if !found {
pids = append(pids, filePID)
}
}
if len(pids) == 0 {
return fmt.Errorf("agent %q is not running", id)
}
if err := syscall.Kill(pid, syscall.SIGTERM); err != nil {
return fmt.Errorf("SIGTERM: %w", err)
// SIGTERM all instances
for _, pid := range pids {
_ = syscall.Kill(pid, syscall.SIGTERM)
}
// Wait up to 5 seconds for graceful shutdown.
for i := 0; i < 10; i++ {
if !m.isAlive(pid) {
allDead := true
for _, pid := range pids {
if m.isAlive(pid) {
allDead = false
break
}
}
if allDead {
m.removePID(id)
return nil
}
time.Sleep(500 * time.Millisecond)
}
// Force kill.
if m.isAlive(pid) {
_ = syscall.Kill(pid, syscall.SIGKILL)
// Force kill survivors.
for _, pid := range pids {
if m.isAlive(pid) {
_ = syscall.Kill(pid, syscall.SIGKILL)
}
}
m.removePID(id)
return nil
}
// Kill sends SIGKILL immediately.
// Kill sends SIGKILL to all instances immediately.
func (m *Manager) Kill(id string) error {
pid := m.readPID(id)
if pid == 0 || !m.isAlive(pid) {
pids := m.findProcessPIDs(id)
filePID := m.readPID(id)
if filePID > 0 && m.isAlive(filePID) {
found := false
for _, p := range pids {
if p == filePID {
found = true
break
}
}
if !found {
pids = append(pids, filePID)
}
}
if len(pids) == 0 {
return fmt.Errorf("agent %q is not running", id)
}
err := syscall.Kill(pid, syscall.SIGKILL)
var lastErr error
for _, pid := range pids {
if err := syscall.Kill(pid, syscall.SIGKILL); err != nil {
lastErr = err
}
}
m.removePID(id)
return err
return lastErr
}
// Stats gathers resource usage for a running agent from /proc.
func (m *Manager) Stats(id string) (ProcessStats, error) {
pid := m.readPID(id)
if pid == 0 || !m.isAlive(pid) {
pid := m.resolveRunningPID(id)
if pid == 0 {
return ProcessStats{}, fmt.Errorf("agent %q is not running", id)
}
@@ -256,8 +309,12 @@ func (m *Manager) LogTail(id string, lines int) ([]string, error) {
// IsRunning checks if an agent process is alive.
func (m *Manager) IsRunning(id string) bool {
pid := m.readPID(id)
return pid > 0 && m.isAlive(pid)
return m.resolveRunningPID(id) > 0
}
// InstanceCount returns how many launcher processes are running for an agent.
func (m *Manager) InstanceCount(id string) int {
return len(m.findProcessPIDs(id))
}
// ReadPID returns the PID from the PID file, or 0.
@@ -285,6 +342,70 @@ func (m *Manager) readPID(id string) int {
return pid
}
// findProcessPIDs searches for running launcher processes for a given agent ID
// using pgrep. Returns all matching PIDs.
func (m *Manager) findProcessPIDs(id string) []int {
// First try to find the config path for this agent
configPath := m.configPathFor(id)
if configPath == "" {
return nil
}
pattern := fmt.Sprintf("launcher.*-c.*%s", configPath)
out, err := exec.Command("pgrep", "-f", pattern).Output()
if err != nil {
return nil
}
var pids []int
for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
if p, err := strconv.Atoi(strings.TrimSpace(line)); err == nil && p > 0 {
pids = append(pids, p)
}
}
return pids
}
// configPathFor returns the config file path for the given agent ID.
func (m *Manager) configPathFor(id string) string {
matches, err := filepath.Glob(m.agentsGlob)
if err != nil {
return ""
}
for _, path := range matches {
cfg, err := config.LoadMeta(path)
if err != nil {
continue
}
if cfg.Agent.ID == id {
return path
}
}
return ""
}
// resolveRunningPID returns the PID of the running agent, checking the PID file
// first and falling back to process discovery. It also repairs stale PID files.
func (m *Manager) resolveRunningPID(id string) int {
// Check PID file first
pid := m.readPID(id)
if pid > 0 && m.isAlive(pid) {
return pid
}
// PID file is stale or missing — search for actual processes
pids := m.findProcessPIDs(id)
if len(pids) > 0 {
// Repair the PID file with the first found process
_ = os.WriteFile(m.pidPath(id), []byte(strconv.Itoa(pids[0])), 0o644)
return pids[0]
}
// Clean up stale PID file
if pid > 0 {
m.removePID(id)
}
return 0
}
func (m *Manager) isAlive(pid int) bool {
return syscall.Kill(pid, 0) == nil
}
@@ -293,6 +414,33 @@ func (m *Manager) removePID(id string) {
_ = os.Remove(m.pidPath(id))
}
// buildEnv returns the environment for child processes: current env + .env file vars.
func (m *Manager) buildEnv() []string {
env := os.Environ()
if m.envFile == "" {
return env
}
data, err := os.ReadFile(m.envFile)
if err != nil {
return env
}
// Parse KEY=VALUE lines, skip comments and blanks.
seen := make(map[string]bool)
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") {
continue
}
if idx := strings.Index(line, "="); idx > 0 {
key := line[:idx]
seen[key] = true
env = append(env, line)
}
}
_ = seen // .env values appended last, so they override earlier entries
return env
}
func (m *Manager) resolvedBin() string {
if m.binPath != "" {
return m.binPath