feat: hot-reload de agentes individuales via SIGHUP
Implementa el mecanismo de hot-reload descrito en el issue 0013: - agents/runtime.go: añade Agent.Stop() y Agent.Done() para ciclo de vida individual. Run() crea un contexto hijo cancelable y cierra el canal done al retornar. - cmd/launcher/registry.go (nuevo): agentRegistry rastrea agentes vivos por ID. Métodos: register, stopAndWait, reload, reloadAll, waitAll, cleanupLogs. reload() sigue el flujo completo: stop→wait→unsubscribe →reload config→recreate→rewire bus/orch→start nueva goroutine. - cmd/launcher/main.go: usa agentRegistry en lugar de sync.WaitGroup. Añade handler de SIGHUP en goroutine separada que lee run/reload.txt para determinar el agente objetivo (* o vacío = todos). Tras leer, borra run/reload.txt para no afectar el siguiente SIGHUP. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -62,6 +62,10 @@ type Agent struct {
|
||||
logger *slog.Logger
|
||||
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
|
||||
|
||||
// Lifecycle — cancel stops this agent individually; done is closed when Run returns.
|
||||
cancel context.CancelFunc
|
||||
done chan struct{}
|
||||
|
||||
// Access control
|
||||
acl acl.ACL
|
||||
|
||||
@@ -268,6 +272,7 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*
|
||||
toolReg: toolReg,
|
||||
logger: logger,
|
||||
cryptoStore: cryptoStore,
|
||||
done: make(chan struct{}),
|
||||
commands: make(map[string]CommandHandler),
|
||||
cmdAliases: command.BuiltinNames(),
|
||||
startTime: time.Now(),
|
||||
@@ -363,8 +368,24 @@ func (a *Agent) RawMatrixClient() *mautrix.Client {
|
||||
return a.matrix.Raw()
|
||||
}
|
||||
|
||||
// Stop cancels this agent's individual context, causing Run to return.
|
||||
// Safe to call multiple times.
|
||||
func (a *Agent) Stop() {
|
||||
if a.cancel != nil {
|
||||
a.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
// Done returns a channel that is closed when Run has returned.
|
||||
func (a *Agent) Done() <-chan struct{} {
|
||||
return a.done
|
||||
}
|
||||
|
||||
// Run starts the agent sync loop. Blocks until ctx is cancelled.
|
||||
func (a *Agent) Run(ctx context.Context) error {
|
||||
ctx, a.cancel = context.WithCancel(ctx)
|
||||
defer close(a.done)
|
||||
|
||||
if a.cryptoStore != nil {
|
||||
defer a.cryptoStore.Close()
|
||||
}
|
||||
|
||||
+64
-28
@@ -12,7 +12,6 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@@ -72,13 +71,7 @@ func main() {
|
||||
logger.Warn("could not create file logger, falling back to stdout", "err", err)
|
||||
launcherCleanup = func() {}
|
||||
}
|
||||
var cleanups []func()
|
||||
cleanups = append(cleanups, launcherCleanup)
|
||||
defer func() {
|
||||
for _, fn := range cleanups {
|
||||
fn()
|
||||
}
|
||||
}()
|
||||
defer launcherCleanup()
|
||||
|
||||
if len(configPaths) == 0 {
|
||||
logger.Warn("no agent configs found — nothing to start")
|
||||
@@ -96,15 +89,49 @@ func main() {
|
||||
if err != nil {
|
||||
// Non-fatal: orchestration is optional
|
||||
logger.Warn("orchestrator not started", "err", err)
|
||||
} else {
|
||||
} else if orch != nil {
|
||||
logger.Info("orchestrator initialized")
|
||||
}
|
||||
|
||||
// ── Start normal agents ──
|
||||
var wg sync.WaitGroup
|
||||
var scannerOnce sync.Once
|
||||
var scanner *mautrix.Client
|
||||
// ── Shared dependencies for agent registry ──
|
||||
deps := &launchDeps{
|
||||
agentBus: agentBus,
|
||||
orch: orch,
|
||||
logDir: logDir,
|
||||
logLevel: lvl,
|
||||
parentCtx: ctx,
|
||||
}
|
||||
registry := newAgentRegistry(deps)
|
||||
|
||||
// ── SIGHUP: hot-reload individual agent or all agents ──
|
||||
sighup := make(chan os.Signal, 1)
|
||||
signal.Notify(sighup, syscall.SIGHUP)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case _, ok := <-sighup:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
id := readReloadTarget("run/reload.txt")
|
||||
// Remove the target file after reading so it doesn't
|
||||
// affect the next SIGHUP.
|
||||
_ = os.Remove("run/reload.txt")
|
||||
if id == "" {
|
||||
logger.Info("sighup: reloading all agents")
|
||||
registry.reloadAll(rulesFor)
|
||||
} else {
|
||||
logger.Info("sighup: reloading agent", "id", id)
|
||||
registry.reload(id, rulesFor)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// ── Start normal agents ──
|
||||
var scannerOnce scanOnce
|
||||
for _, path := range configPaths {
|
||||
path := path
|
||||
cfg, err := config.Load(path)
|
||||
@@ -130,11 +157,11 @@ func main() {
|
||||
agentLogger = logger.With("agent", cfg.Agent.ID)
|
||||
agentCleanup = func() {}
|
||||
}
|
||||
cleanups = append(cleanups, agentCleanup)
|
||||
|
||||
a, err := agents.New(cfg, rules, agentLogger)
|
||||
if err != nil {
|
||||
logger.Error("failed to create agent", "id", cfg.Agent.ID, "err", err)
|
||||
agentCleanup()
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -154,30 +181,28 @@ func main() {
|
||||
})
|
||||
|
||||
// Grab the first available Matrix client for room scanning
|
||||
scannerOnce.Do(func() {
|
||||
scanner = a.RawMatrixClient()
|
||||
})
|
||||
scannerOnce.set(a.RawMatrixClient())
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
agentLogger.Info("agent running")
|
||||
if err := a.Run(ctx); err != nil {
|
||||
agentLogger.Error("agent stopped with error", "err", err)
|
||||
}
|
||||
}()
|
||||
registry.register(&runningAgent{
|
||||
agent: a,
|
||||
cfg: cfg,
|
||||
cfgPath: path,
|
||||
logger: agentLogger,
|
||||
logCleanup: agentCleanup,
|
||||
})
|
||||
}
|
||||
|
||||
// ── Startup room scan (after all participants are registered) ──
|
||||
if orch != nil && scanner != nil {
|
||||
orch.orchestrator.SetScanner(scanner)
|
||||
if orch != nil && scannerOnce.client != nil {
|
||||
orch.orchestrator.SetScanner(scannerOnce.client)
|
||||
scanCtx, scanCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
orch.orchestrator.ScanExistingRooms(scanCtx)
|
||||
scanCancel()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
registry.waitAll()
|
||||
registry.cleanupLogs()
|
||||
logger.Info("all agents stopped")
|
||||
return nil
|
||||
},
|
||||
@@ -195,6 +220,17 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
// scanOnce captures the first Matrix client for room scanning.
|
||||
type scanOnce struct {
|
||||
client *mautrix.Client
|
||||
}
|
||||
|
||||
func (s *scanOnce) set(c *mautrix.Client) {
|
||||
if s.client == nil {
|
||||
s.client = c
|
||||
}
|
||||
}
|
||||
|
||||
// orchHandle wraps a running orchestrator with its config for the launcher.
|
||||
type orchHandle struct {
|
||||
orchestrator *orchshell.Orchestrator
|
||||
|
||||
@@ -0,0 +1,231 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/agents/agents"
|
||||
"github.com/enmanuel/agents/internal/config"
|
||||
"github.com/enmanuel/agents/pkg/decision"
|
||||
"github.com/enmanuel/agents/pkg/orchestration"
|
||||
"github.com/enmanuel/agents/shell/bus"
|
||||
agentlog "github.com/enmanuel/agents/shell/logger"
|
||||
)
|
||||
|
||||
// runningAgent holds a live agent and the metadata needed to recreate it.
|
||||
type runningAgent struct {
|
||||
agent *agents.Agent
|
||||
cfg *config.AgentConfig
|
||||
cfgPath string
|
||||
logger *slog.Logger
|
||||
logCleanup func()
|
||||
}
|
||||
|
||||
// launchDeps holds shared resources needed to start/reload agents.
|
||||
type launchDeps struct {
|
||||
agentBus *bus.Bus
|
||||
orch *orchHandle
|
||||
logDir string
|
||||
logLevel slog.Level
|
||||
parentCtx context.Context
|
||||
}
|
||||
|
||||
// agentRegistry tracks all running agents by ID, enabling individual hot-reload.
|
||||
type agentRegistry struct {
|
||||
mu sync.Mutex
|
||||
agents map[string]*runningAgent
|
||||
deps *launchDeps
|
||||
}
|
||||
|
||||
func newAgentRegistry(deps *launchDeps) *agentRegistry {
|
||||
return &agentRegistry{
|
||||
agents: make(map[string]*runningAgent),
|
||||
deps: deps,
|
||||
}
|
||||
}
|
||||
|
||||
// register adds a running agent to the registry and starts its goroutine.
|
||||
func (r *agentRegistry) register(ra *runningAgent) {
|
||||
r.mu.Lock()
|
||||
r.agents[ra.cfg.Agent.ID] = ra
|
||||
r.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
ra.logger.Info("agent running")
|
||||
if err := ra.agent.Run(r.deps.parentCtx); err != nil {
|
||||
ra.logger.Error("agent stopped with error", "err", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// stopAndWait stops a running agent and waits for it to finish.
|
||||
// Caller must NOT hold r.mu.
|
||||
func (r *agentRegistry) stopAndWait(id string) {
|
||||
r.mu.Lock()
|
||||
ra, ok := r.agents[id]
|
||||
r.mu.Unlock()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
ra.agent.Stop()
|
||||
select {
|
||||
case <-ra.agent.Done():
|
||||
case <-time.After(10 * time.Second):
|
||||
ra.logger.Warn("agent did not stop within 10s, forcing", "id", id)
|
||||
}
|
||||
|
||||
// Unsubscribe from bus so no stale channel remains.
|
||||
r.deps.agentBus.Unsubscribe(bus.AgentID(id))
|
||||
}
|
||||
|
||||
// reload stops an agent, re-reads its config, recreates it, and restarts it.
|
||||
func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) {
|
||||
r.mu.Lock()
|
||||
ra, ok := r.agents[id]
|
||||
r.mu.Unlock()
|
||||
if !ok {
|
||||
slog.Warn("reload: agent not found", "id", id)
|
||||
return
|
||||
}
|
||||
|
||||
cfgPath := ra.cfgPath
|
||||
oldCleanup := ra.logCleanup
|
||||
|
||||
ra.logger.Info("agent_reload_start", "id", id)
|
||||
|
||||
// 1. Stop current instance and wait.
|
||||
r.stopAndWait(id)
|
||||
|
||||
// 2. Cleanup old log writer.
|
||||
if oldCleanup != nil {
|
||||
oldCleanup()
|
||||
}
|
||||
|
||||
// 3. Re-read config.
|
||||
cfg, err := config.Load(cfgPath)
|
||||
if err != nil {
|
||||
slog.Error("reload: failed to load config", "path", cfgPath, "err", err)
|
||||
return
|
||||
}
|
||||
if !cfg.Agent.Enabled {
|
||||
slog.Info("reload: agent is disabled, not restarting", "id", id)
|
||||
r.mu.Lock()
|
||||
delete(r.agents, id)
|
||||
r.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// 4. New per-agent logger.
|
||||
newLogger, newCleanup, aErr := agentlog.NewAgentLogger(agentlog.LoggerConfig{
|
||||
BaseDir: r.deps.logDir,
|
||||
AgentID: cfg.Agent.ID,
|
||||
Level: r.deps.logLevel,
|
||||
})
|
||||
if aErr != nil {
|
||||
newLogger = slog.Default().With("agent", cfg.Agent.ID)
|
||||
newCleanup = func() {}
|
||||
}
|
||||
|
||||
// 5. Create new agent (validates config before discarding the old one).
|
||||
rules := rulesFor(cfg.Agent.ID, newLogger)
|
||||
newAgent, err := agents.New(cfg, rules, newLogger)
|
||||
if err != nil {
|
||||
newLogger.Error("reload: failed to create agent", "id", id, "err", err)
|
||||
newCleanup()
|
||||
return
|
||||
}
|
||||
|
||||
// 6. Wire bus and orchestration.
|
||||
newAgent.SetBus(r.deps.agentBus)
|
||||
if r.deps.orch != nil {
|
||||
newAgent.SetInterceptor(r.deps.orch.orchestrator.Intercept)
|
||||
newAgent.SetMembershipNotify(r.deps.orch.orchestrator.NotifyMembership)
|
||||
r.deps.orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{
|
||||
ID: cfg.Agent.ID,
|
||||
MatrixUserID: cfg.Matrix.UserID,
|
||||
Description: cfg.Agent.Description,
|
||||
Capabilities: cfg.Agent.Tags,
|
||||
})
|
||||
}
|
||||
|
||||
newRA := &runningAgent{
|
||||
agent: newAgent,
|
||||
cfg: cfg,
|
||||
cfgPath: cfgPath,
|
||||
logger: newLogger,
|
||||
logCleanup: newCleanup,
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
r.agents[id] = newRA
|
||||
r.mu.Unlock()
|
||||
|
||||
// 7. Start new goroutine.
|
||||
go func() {
|
||||
newLogger.Info("agent running")
|
||||
if err := newAgent.Run(r.deps.parentCtx); err != nil {
|
||||
newLogger.Error("agent stopped with error", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
newLogger.Info("agent_reloaded", "id", id)
|
||||
}
|
||||
|
||||
// reloadAll reloads every registered agent sequentially.
|
||||
func (r *agentRegistry) reloadAll(rulesFor func(string, *slog.Logger) []decision.Rule) {
|
||||
r.mu.Lock()
|
||||
ids := make([]string, 0, len(r.agents))
|
||||
for id := range r.agents {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
r.mu.Unlock()
|
||||
|
||||
for _, id := range ids {
|
||||
r.reload(id, rulesFor)
|
||||
}
|
||||
}
|
||||
|
||||
// waitAll blocks until all registered agents have stopped.
|
||||
func (r *agentRegistry) waitAll() {
|
||||
r.mu.Lock()
|
||||
dones := make([]<-chan struct{}, 0, len(r.agents))
|
||||
for _, ra := range r.agents {
|
||||
dones = append(dones, ra.agent.Done())
|
||||
}
|
||||
r.mu.Unlock()
|
||||
|
||||
for _, done := range dones {
|
||||
<-done
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupLogs calls every agent's log cleanup function (called on launcher shutdown).
|
||||
func (r *agentRegistry) cleanupLogs() {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
for _, ra := range r.agents {
|
||||
if ra.logCleanup != nil {
|
||||
ra.logCleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// readReloadTarget reads the given file and returns the trimmed content.
|
||||
// Returns "" if the file doesn't exist, is empty, or equals "*" (meaning reload all).
|
||||
func readReloadTarget(path string) string {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
id := strings.TrimSpace(string(data))
|
||||
if id == "*" {
|
||||
return ""
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user