From 069f8758b13f665984580abe87cbcaa1131e5f2c Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 18:41:32 +0000 Subject: [PATCH] feat: hot-reload de agentes individuales via SIGHUP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementa el mecanismo de hot-reload descrito en el issue 0013: - agents/runtime.go: añade Agent.Stop() y Agent.Done() para ciclo de vida individual. Run() crea un contexto hijo cancelable y cierra el canal done al retornar. - cmd/launcher/registry.go (nuevo): agentRegistry rastrea agentes vivos por ID. Métodos: register, stopAndWait, reload, reloadAll, waitAll, cleanupLogs. reload() sigue el flujo completo: stop→wait→unsubscribe →reload config→recreate→rewire bus/orch→start nueva goroutine. - cmd/launcher/main.go: usa agentRegistry en lugar de sync.WaitGroup. Añade handler de SIGHUP en goroutine separada que lee run/reload.txt para determinar el agente objetivo (* o vacío = todos). Tras leer, borra run/reload.txt para no afectar el siguiente SIGHUP. Co-Authored-By: Claude Sonnet 4.6 --- agents/runtime.go | 21 ++++ cmd/launcher/main.go | 92 +++++++++++----- cmd/launcher/registry.go | 231 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 316 insertions(+), 28 deletions(-) create mode 100644 cmd/launcher/registry.go diff --git a/agents/runtime.go b/agents/runtime.go index 6b07f28..19ab667 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -62,6 +62,10 @@ type Agent struct { logger *slog.Logger cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown + // Lifecycle — cancel stops this agent individually; done is closed when Run returns. 
+ cancel context.CancelFunc + done chan struct{} + // Access control acl acl.ACL @@ -268,6 +272,7 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (* toolReg: toolReg, logger: logger, cryptoStore: cryptoStore, + done: make(chan struct{}), commands: make(map[string]CommandHandler), cmdAliases: command.BuiltinNames(), startTime: time.Now(), @@ -363,8 +368,24 @@ func (a *Agent) RawMatrixClient() *mautrix.Client { return a.matrix.Raw() } +// Stop cancels this agent's individual context, causing Run to return. +// Safe to call multiple times. +func (a *Agent) Stop() { + if a.cancel != nil { + a.cancel() + } +} + +// Done returns a channel that is closed when Run has returned. +func (a *Agent) Done() <-chan struct{} { + return a.done +} + // Run starts the agent sync loop. Blocks until ctx is cancelled. func (a *Agent) Run(ctx context.Context) error { + ctx, a.cancel = context.WithCancel(ctx) + defer close(a.done) + if a.cryptoStore != nil { defer a.cryptoStore.Close() } diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index 371e66b..2a85caf 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -12,7 +12,6 @@ import ( "os" "os/signal" "path/filepath" - "sync" "syscall" "time" @@ -72,13 +71,7 @@ func main() { logger.Warn("could not create file logger, falling back to stdout", "err", err) launcherCleanup = func() {} } - var cleanups []func() - cleanups = append(cleanups, launcherCleanup) - defer func() { - for _, fn := range cleanups { - fn() - } - }() + defer launcherCleanup() if len(configPaths) == 0 { logger.Warn("no agent configs found — nothing to start") @@ -96,15 +89,49 @@ func main() { if err != nil { // Non-fatal: orchestration is optional logger.Warn("orchestrator not started", "err", err) - } else { + } else if orch != nil { logger.Info("orchestrator initialized") } - // ── Start normal agents ── - var wg sync.WaitGroup - var scannerOnce sync.Once - var scanner *mautrix.Client + // ── Shared dependencies for 
agent registry ── + deps := &launchDeps{ + agentBus: agentBus, + orch: orch, + logDir: logDir, + logLevel: lvl, + parentCtx: ctx, + } + registry := newAgentRegistry(deps) + // ── SIGHUP: hot-reload individual agent or all agents ── + sighup := make(chan os.Signal, 1) + signal.Notify(sighup, syscall.SIGHUP) + go func() { + for { + select { + case <-ctx.Done(): + return + case _, ok := <-sighup: + if !ok { + return + } + id := readReloadTarget("run/reload.txt") + // Remove the target file after reading so it doesn't + // affect the next SIGHUP. + _ = os.Remove("run/reload.txt") + if id == "" { + logger.Info("sighup: reloading all agents") + registry.reloadAll(rulesFor) + } else { + logger.Info("sighup: reloading agent", "id", id) + registry.reload(id, rulesFor) + } + } + } + }() + + // ── Start normal agents ── + var scannerOnce scanOnce for _, path := range configPaths { path := path cfg, err := config.Load(path) @@ -130,11 +157,11 @@ func main() { agentLogger = logger.With("agent", cfg.Agent.ID) agentCleanup = func() {} } - cleanups = append(cleanups, agentCleanup) a, err := agents.New(cfg, rules, agentLogger) if err != nil { logger.Error("failed to create agent", "id", cfg.Agent.ID, "err", err) + agentCleanup() continue } @@ -154,30 +181,28 @@ func main() { }) // Grab the first available Matrix client for room scanning - scannerOnce.Do(func() { - scanner = a.RawMatrixClient() - }) + scannerOnce.set(a.RawMatrixClient()) } - wg.Add(1) - go func() { - defer wg.Done() - agentLogger.Info("agent running") - if err := a.Run(ctx); err != nil { - agentLogger.Error("agent stopped with error", "err", err) - } - }() + registry.register(&runningAgent{ + agent: a, + cfg: cfg, + cfgPath: path, + logger: agentLogger, + logCleanup: agentCleanup, + }) } // ── Startup room scan (after all participants are registered) ── - if orch != nil && scanner != nil { - orch.orchestrator.SetScanner(scanner) + if orch != nil && scannerOnce.client != nil { + 
orch.orchestrator.SetScanner(scannerOnce.client) scanCtx, scanCancel := context.WithTimeout(ctx, 30*time.Second) orch.orchestrator.ScanExistingRooms(scanCtx) scanCancel() } - wg.Wait() + registry.waitAll() + registry.cleanupLogs() logger.Info("all agents stopped") return nil }, @@ -195,6 +220,17 @@ func main() { } } +// scanOnce captures the first Matrix client for room scanning. +type scanOnce struct { + client *mautrix.Client +} + +func (s *scanOnce) set(c *mautrix.Client) { + if s.client == nil { + s.client = c + } +} + // orchHandle wraps a running orchestrator with its config for the launcher. type orchHandle struct { orchestrator *orchshell.Orchestrator diff --git a/cmd/launcher/registry.go b/cmd/launcher/registry.go new file mode 100644 index 0000000..35b1e83 --- /dev/null +++ b/cmd/launcher/registry.go @@ -0,0 +1,231 @@ +package main + +import ( + "context" + "log/slog" + "os" + "strings" + "sync" + "time" + + "github.com/enmanuel/agents/agents" + "github.com/enmanuel/agents/internal/config" + "github.com/enmanuel/agents/pkg/decision" + "github.com/enmanuel/agents/pkg/orchestration" + "github.com/enmanuel/agents/shell/bus" + agentlog "github.com/enmanuel/agents/shell/logger" +) + +// runningAgent holds a live agent and the metadata needed to recreate it. +type runningAgent struct { + agent *agents.Agent + cfg *config.AgentConfig + cfgPath string + logger *slog.Logger + logCleanup func() +} + +// launchDeps holds shared resources needed to start/reload agents. +type launchDeps struct { + agentBus *bus.Bus + orch *orchHandle + logDir string + logLevel slog.Level + parentCtx context.Context +} + +// agentRegistry tracks all running agents by ID, enabling individual hot-reload. 
+type agentRegistry struct {
+	mu     sync.Mutex
+	agents map[string]*runningAgent
+	deps   *launchDeps
+
+	// reloadMu is held for the whole duration of a reload. waitAll acquires
+	// it before deciding that every agent has stopped, so it can never
+	// mistake the gap between an old instance stopping and its replacement
+	// starting for "all agents are gone".
+	// Lock order: reloadMu is always taken before mu, never the reverse.
+	reloadMu sync.Mutex
+}
+
+// newAgentRegistry returns an empty registry that starts and reloads agents
+// using the shared resources in deps.
+func newAgentRegistry(deps *launchDeps) *agentRegistry {
+	return &agentRegistry{
+		agents: make(map[string]*runningAgent),
+		deps:   deps,
+	}
+}
+
+// register adds a running agent to the registry and starts its goroutine.
+// An existing entry with the same agent ID is overwritten.
+func (r *agentRegistry) register(ra *runningAgent) {
+	r.mu.Lock()
+	r.agents[ra.cfg.Agent.ID] = ra
+	r.mu.Unlock()
+
+	r.start(ra)
+}
+
+// start runs the agent in its own goroutine under the launcher's parent
+// context and logs a non-nil error returned by Run.
+func (r *agentRegistry) start(ra *runningAgent) {
+	go func() {
+		ra.logger.Info("agent running")
+		if err := ra.agent.Run(r.deps.parentCtx); err != nil {
+			ra.logger.Error("agent stopped with error", "err", err)
+		}
+	}()
+}
+
+// stopAndWait stops a running agent and waits up to 10 seconds for it to
+// finish, then unsubscribes its ID from the bus. It is a no-op for an unknown
+// ID. Caller must NOT hold r.mu.
+func (r *agentRegistry) stopAndWait(id string) {
+	r.mu.Lock()
+	ra, ok := r.agents[id]
+	r.mu.Unlock()
+	if !ok {
+		return
+	}
+
+	ra.agent.Stop()
+
+	timer := time.NewTimer(10 * time.Second)
+	defer timer.Stop()
+	select {
+	case <-ra.agent.Done():
+	case <-timer.C:
+		// Nothing can actually be forced here: the old Run goroutine may
+		// still be alive when the caller carries on.
+		ra.logger.Warn("agent did not stop within 10s, forcing", "id", id)
+	}
+
+	// Unsubscribe from bus so no stale channel remains.
+	r.deps.agentBus.Unsubscribe(bus.AgentID(id))
+}
+
+// reload re-reads an agent's config, stops the running instance, recreates
+// the agent and restarts it.
+//
+// The config is loaded BEFORE the running instance is touched, so an
+// unreadable or invalid config file leaves the current agent running. If the
+// reloaded config has the agent disabled, the agent is stopped and removed
+// from the registry. If creating the new agent fails, the stopped entry stays
+// registered so that a later reload of the same ID can retry.
+func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) {
+	// Serialize reloads and keep waitAll from concluding that everything has
+	// stopped while an agent is being swapped.
+	r.reloadMu.Lock()
+	defer r.reloadMu.Unlock()
+
+	r.mu.Lock()
+	ra, ok := r.agents[id]
+	r.mu.Unlock()
+	if !ok {
+		slog.Warn("reload: agent not found", "id", id)
+		return
+	}
+
+	cfgPath := ra.cfgPath
+
+	ra.logger.Info("agent_reload_start", "id", id)
+
+	// 1. Re-read config while the old instance is still serving.
+	// NOTE(review): assumes config.Load only reads the file — confirm.
+	cfg, err := config.Load(cfgPath)
+	if err != nil {
+		slog.Error("reload: failed to load config", "path", cfgPath, "err", err)
+		return
+	}
+
+	// 2. Stop current instance and wait.
+	r.stopAndWait(id)
+
+	// 3. Cleanup old log writer. The func is cleared on the entry so that
+	// cleanupLogs cannot run it a second time if the entry survives a
+	// failed reload.
+	r.mu.Lock()
+	oldCleanup := ra.logCleanup
+	ra.logCleanup = nil
+	r.mu.Unlock()
+	if oldCleanup != nil {
+		oldCleanup()
+	}
+
+	if !cfg.Agent.Enabled {
+		slog.Info("reload: agent is disabled, not restarting", "id", id)
+		r.mu.Lock()
+		delete(r.agents, id)
+		r.mu.Unlock()
+		return
+	}
+
+	// 4. New per-agent logger; fall back to the default logger on failure.
+	newLogger, newCleanup, aErr := agentlog.NewAgentLogger(agentlog.LoggerConfig{
+		BaseDir: r.deps.logDir,
+		AgentID: cfg.Agent.ID,
+		Level:   r.deps.logLevel,
+	})
+	if aErr != nil {
+		newLogger = slog.Default().With("agent", cfg.Agent.ID)
+		newCleanup = func() {}
+	}
+
+	// 5. Create new agent. The old instance is already stopped at this
+	// point; it is created only now so two instances of the same agent never
+	// exist side by side.
+	rules := rulesFor(cfg.Agent.ID, newLogger)
+	newAgent, err := agents.New(cfg, rules, newLogger)
+	if err != nil {
+		newLogger.Error("reload: failed to create agent", "id", id, "err", err)
+		newCleanup()
+		return
+	}
+
+	// 6. Wire bus and orchestration.
+	newAgent.SetBus(r.deps.agentBus)
+	if r.deps.orch != nil {
+		newAgent.SetInterceptor(r.deps.orch.orchestrator.Intercept)
+		newAgent.SetMembershipNotify(r.deps.orch.orchestrator.NotifyMembership)
+		r.deps.orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{
+			ID:           cfg.Agent.ID,
+			MatrixUserID: cfg.Matrix.UserID,
+			Description:  cfg.Agent.Description,
+			Capabilities: cfg.Agent.Tags,
+		})
+	}
+
+	newRA := &runningAgent{
+		agent:      newAgent,
+		cfg:        cfg,
+		cfgPath:    cfgPath,
+		logger:     newLogger,
+		logCleanup: newCleanup,
+	}
+
+	// The entry stays under the ID that was reloaded, even if the config now
+	// declares a different Agent.ID.
+	r.mu.Lock()
+	r.agents[id] = newRA
+	r.mu.Unlock()
+
+	// 7. Start new goroutine.
+	r.start(newRA)
+
+	newLogger.Info("agent_reloaded", "id", id)
+}
+
+// reloadAll reloads every registered agent sequentially.
+// The set of IDs is snapshotted up front; each reload then runs on its own.
+func (r *agentRegistry) reloadAll(rulesFor func(string, *slog.Logger) []decision.Rule) {
+	r.mu.Lock()
+	ids := make([]string, 0, len(r.agents))
+	for id := range r.agents {
+		ids = append(ids, id)
+	}
+	r.mu.Unlock()
+
+	for _, id := range ids {
+		r.reload(id, rulesFor)
+	}
+}
+
+// waitAll blocks until all registered agents have stopped.
+//
+// A reload replaces an agent with a new instance that has a new Done channel,
+// so one snapshot of the channels is not enough: once the snapshot has
+// drained, waitAll waits for any in-flight reload to finish and checks the
+// registry again. It returns only when no reload is running and every
+// currently registered agent is done. An empty registry returns immediately.
+func (r *agentRegistry) waitAll() {
+	for {
+		for _, done := range r.doneChans() {
+			<-done
+		}
+
+		// Blocks while a reload is in progress.
+		r.reloadMu.Lock()
+		stopped := r.allStopped()
+		r.reloadMu.Unlock()
+		if stopped {
+			return
+		}
+	}
+}
+
+// doneChans returns the Done channel of every currently registered agent.
+func (r *agentRegistry) doneChans() []<-chan struct{} {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	dones := make([]<-chan struct{}, 0, len(r.agents))
+	for _, ra := range r.agents {
+		dones = append(dones, ra.agent.Done())
+	}
+	return dones
+}
+
+// allStopped reports whether the Done channel of every currently registered
+// agent is closed. It never blocks.
+func (r *agentRegistry) allStopped() bool {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	for _, ra := range r.agents {
+		select {
+		case <-ra.agent.Done():
+		default:
+			return false
+		}
+	}
+	return true
+}
+
+// cleanupLogs calls every agent's log cleanup function (called on launcher shutdown).
+// Entries whose cleanup is nil are skipped.
+func (r *agentRegistry) cleanupLogs() {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	for _, ra := range r.agents {
+		if ra.logCleanup != nil {
+			ra.logCleanup()
+		}
+	}
+}
+
+// readReloadTarget reads the given file and returns its content with
+// surrounding whitespace trimmed.
+// Returns "" if the file cannot be read, is empty, or equals "*" (meaning reload all).
+func readReloadTarget(path string) string {
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return ""
+	}
+	id := strings.TrimSpace(string(data))
+	if id == "*" {
+		return ""
+	}
+	return id
+}
+