Files
agents_and_robots/cmd/launcher/registry.go
T
egutierrez 10f0614fc0 fix(launcher): supervisar agentes y reiniciarlos cuando salen sin cancelacion
El launcher salia con status=0 cuando todos los runners (Agent/Robot)
terminaban su Run() de forma natural — por ejemplo tras una rotacion de
token de Matrix o un drop del sync. systemd, configurado con
Restart=on-failure, no relanzaba el proceso al ver salida limpia y los
bots quedaban caidos hasta una intervencion manual.

Solucion: nueva rutina superviseUntilCanceled en agentRegistry que
bloquea sobre waitAll, y si el ctx padre sigue vivo, espera un backoff
y llama reloadAll para recrear los runners. Solo cuando el ctx padre
se cancela (SIGINT/SIGTERM) la rutina retorna y el launcher sale.

main.go pasa a invocar este supervisor en lugar de waitAll directo.

Tests:
- TestSuperviseUntilCanceled_ReturnsWhenCtxCanceledFirst — empty registry
- TestSuperviseUntilCanceled_ReturnsAfterCtxCancelDuringBackoff — cancel
  durante el backoff debe desbloquear inmediatamente
- TestSuperviseUntilCanceled_CallsReloadOnAgentExit — supervisor sigue
  vivo todo el deadline aunque reload falle por cfgPath invalido

Diagnostico: tras varias horas el journalctl mostraba "Deactivated
successfully" sin "Stopping" previo (Apr 13 18:22 tras 23h corriendo)
y el log del agent registraba "context canceled" tras "starting matrix
sync" — sintoma de que mautrix.SyncWithContext salio limpiamente y el
ctx.cancel se propago al cerrar la goroutine sin que systemd hubiera
enviado SIGTERM. El bucle supervisado lo arregla recreando los runners
sin tocar la unit ni depender del Restart de systemd.
2026-05-09 14:55:41 +02:00

287 lines
7.2 KiB
Go

package main
import (
"context"
"log/slog"
"os"
"strings"
"sync"
"time"
"github.com/enmanuel/agents/devagents"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/decision"
"github.com/enmanuel/agents/pkg/orchestration"
pksecurity "github.com/enmanuel/agents/pkg/security"
"github.com/enmanuel/agents/shell/bus"
agentlog "github.com/enmanuel/agents/shell/logger"
)
// runningAgent holds a live runner (Agent or Robot) and the metadata needed to recreate it.
type runningAgent struct {
runner devagents.Runner
cfg *config.AgentConfig
cfgPath string
logger *slog.Logger
logCleanup func()
}
// launchDeps holds shared resources needed to start/reload agents.
type launchDeps struct {
agentBus *bus.Bus
orch *orchHandle
logDir string
logLevel slog.Level
parentCtx context.Context
secPolicy pksecurity.SecurityPolicy // centralized security policy loaded from security/
}
// agentRegistry tracks all running agents by ID, enabling individual hot-reload.
type agentRegistry struct {
mu sync.Mutex
agents map[string]*runningAgent
deps *launchDeps
}
func newAgentRegistry(deps *launchDeps) *agentRegistry {
return &agentRegistry{
agents: make(map[string]*runningAgent),
deps: deps,
}
}
// register adds a running agent/robot to the registry and starts its goroutine.
func (r *agentRegistry) register(ra *runningAgent) {
r.mu.Lock()
r.agents[ra.cfg.Agent.ID] = ra
r.mu.Unlock()
runtimeType := ra.cfg.Agent.Type
if runtimeType == "" {
runtimeType = "agent"
}
go func() {
ra.logger.Info("runner started", "type", runtimeType)
if err := ra.runner.Run(r.deps.parentCtx); err != nil {
ra.logger.Error("runner stopped with error", "err", err, "type", runtimeType)
}
}()
}
// stopAndWait stops a running agent/robot and waits for it to finish.
// Caller must NOT hold r.mu.
func (r *agentRegistry) stopAndWait(id string) {
r.mu.Lock()
ra, ok := r.agents[id]
r.mu.Unlock()
if !ok {
return
}
ra.runner.Stop()
select {
case <-ra.runner.Done():
case <-time.After(10 * time.Second):
ra.logger.Warn("runner did not stop within 10s, forcing", "id", id)
}
// Unsubscribe from bus so no stale channel remains.
r.deps.agentBus.Unsubscribe(bus.AgentID(id))
}
// reload stops an agent, re-reads its config, recreates it, and restarts it.
func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) {
r.mu.Lock()
ra, ok := r.agents[id]
r.mu.Unlock()
if !ok {
slog.Warn("reload: agent not found", "id", id)
return
}
cfgPath := ra.cfgPath
oldCleanup := ra.logCleanup
ra.logger.Info("agent_reload_start", "id", id)
// 1. Stop current instance and wait.
r.stopAndWait(id)
// 2. Cleanup old log writer.
if oldCleanup != nil {
oldCleanup()
}
// 3. Re-read config.
cfg, err := config.Load(cfgPath)
if err != nil {
slog.Error("reload: failed to load config", "path", cfgPath, "err", err)
return
}
if !cfg.Agent.Enabled {
slog.Info("reload: agent is disabled, not restarting", "id", id)
r.mu.Lock()
delete(r.agents, id)
r.mu.Unlock()
return
}
// 4. New per-agent logger.
newLogger, newCleanup, aErr := agentlog.NewAgentLogger(agentlog.LoggerConfig{
BaseDir: r.deps.logDir,
AgentID: cfg.Agent.ID,
Level: r.deps.logLevel,
})
if aErr != nil {
newLogger = slog.Default().With("agent", cfg.Agent.ID)
newCleanup = func() {}
}
// 5. Create new runner (validates config before discarding the old one).
var newRunner devagents.Runner
if cfg.Agent.Type == "robot" {
robot, rErr := devagents.NewRobot(cfg, newLogger)
if rErr != nil {
newLogger.Error("reload: failed to create robot", "id", id, "err", rErr)
newCleanup()
return
}
newRunner = robot
} else {
rules := rulesFor(cfg.Agent.ID, newLogger)
agentACL := pksecurity.ResolveACL(cfg.Agent.ID, r.deps.secPolicy)
newLogger.Debug("resolved acl for agent (reload)", "agent", cfg.Agent.ID, "acl_empty", agentACL.Empty())
newAgent, aErr := devagents.New(cfg, rules, agentACL, newLogger)
if aErr != nil {
newLogger.Error("reload: failed to create agent", "id", id, "err", aErr)
newCleanup()
return
}
// Wire bus and orchestration (only for agents, not robots).
newAgent.SetBus(r.deps.agentBus)
if r.deps.orch != nil {
newAgent.SetInterceptor(r.deps.orch.orchestrator.Intercept)
newAgent.SetMembershipNotify(r.deps.orch.orchestrator.NotifyMembership)
r.deps.orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{
ID: cfg.Agent.ID,
MatrixUserID: cfg.Matrix.UserID,
Description: cfg.Agent.Description,
Capabilities: cfg.Agent.Tags,
})
}
newRunner = newAgent
}
newRA := &runningAgent{
runner: newRunner,
cfg: cfg,
cfgPath: cfgPath,
logger: newLogger,
logCleanup: newCleanup,
}
r.mu.Lock()
r.agents[id] = newRA
r.mu.Unlock()
// 7. Start new goroutine.
runtimeType := cfg.Agent.Type
if runtimeType == "" {
runtimeType = "agent"
}
go func() {
newLogger.Info("runner started", "type", runtimeType)
if err := newRunner.Run(r.deps.parentCtx); err != nil {
newLogger.Error("runner stopped with error", "err", err, "type", runtimeType)
}
}()
newLogger.Info("runner_reloaded", "id", id, "type", runtimeType)
}
// reloadAll reloads every registered agent sequentially.
func (r *agentRegistry) reloadAll(rulesFor func(string, *slog.Logger) []decision.Rule) {
r.mu.Lock()
ids := make([]string, 0, len(r.agents))
for id := range r.agents {
ids = append(ids, id)
}
r.mu.Unlock()
for _, id := range ids {
r.reload(id, rulesFor)
}
}
// waitAll blocks until all registered runners have stopped.
func (r *agentRegistry) waitAll() {
r.mu.Lock()
dones := make([]<-chan struct{}, 0, len(r.agents))
for _, ra := range r.agents {
dones = append(dones, ra.runner.Done())
}
r.mu.Unlock()
for _, done := range dones {
<-done
}
}
// superviseUntilCanceled blocks until ctx is canceled, restarting agents
// (via reloadAll) every time waitAll returns while the parent ctx is alive.
// Each restart waits restartBackoff before recreating runners. Used by the
// launcher main loop so the process keeps the agents up across token rotation
// or sync drops without exiting cleanly to systemd.
func (r *agentRegistry) superviseUntilCanceled(
ctx context.Context,
restartBackoff time.Duration,
rulesFor func(string, *slog.Logger) []decision.Rule,
logger *slog.Logger,
) {
for {
r.waitAll()
if ctx.Err() != nil {
return
}
if logger != nil {
logger.Warn("all agents stopped while launcher active — restarting after backoff",
"backoff", restartBackoff.String())
}
select {
case <-ctx.Done():
return
case <-time.After(restartBackoff):
}
r.reloadAll(rulesFor)
}
}
// cleanupLogs calls every agent's log cleanup function (called on launcher shutdown).
func (r *agentRegistry) cleanupLogs() {
r.mu.Lock()
defer r.mu.Unlock()
for _, ra := range r.agents {
if ra.logCleanup != nil {
ra.logCleanup()
}
}
}
// readReloadTarget reads the given file and returns the trimmed content.
// Returns "" if the file doesn't exist, is empty, or equals "*" (meaning reload all).
func readReloadTarget(path string) string {
data, err := os.ReadFile(path)
if err != nil {
return ""
}
id := strings.TrimSpace(string(data))
if id == "*" {
return ""
}
return id
}