From 069f8758b13f665984580abe87cbcaa1131e5f2c Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 18:41:32 +0000 Subject: [PATCH 1/6] feat: hot-reload de agentes individuales via SIGHUP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementa el mecanismo de hot-reload descrito en el issue 0013: - agents/runtime.go: añade Agent.Stop() y Agent.Done() para ciclo de vida individual. Run() crea un contexto hijo cancelable y cierra el canal done al retornar. - cmd/launcher/registry.go (nuevo): agentRegistry rastrea agentes vivos por ID. Métodos: register, stopAndWait, reload, reloadAll, waitAll, cleanupLogs. reload() sigue el flujo completo: stop→wait→unsubscribe →reload config→recreate→rewire bus/orch→start nueva goroutine. - cmd/launcher/main.go: usa agentRegistry en lugar de sync.WaitGroup. Añade handler de SIGHUP en goroutine separada que lee run/reload.txt para determinar el agente objetivo (* o vacío = todos). Tras leer, borra run/reload.txt para no afectar el siguiente SIGHUP. Co-Authored-By: Claude Sonnet 4.6 --- agents/runtime.go | 21 ++++ cmd/launcher/main.go | 92 +++++++++++----- cmd/launcher/registry.go | 231 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 316 insertions(+), 28 deletions(-) create mode 100644 cmd/launcher/registry.go diff --git a/agents/runtime.go b/agents/runtime.go index 6b07f28..19ab667 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -62,6 +62,10 @@ type Agent struct { logger *slog.Logger cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown + // Lifecycle — cancel stops this agent individually; done is closed when Run returns. + cancel context.CancelFunc + done chan struct{} + // Access control acl acl.ACL @@ -268,6 +272,7 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (* toolReg: toolReg, logger: logger, cryptoStore: cryptoStore, + done: make(chan struct{}), commands: make(map[string]CommandHandler), cmdAliases: command.BuiltinNames(), startTime: time.Now(), @@ -363,8 +368,24 @@ func (a *Agent) RawMatrixClient() *mautrix.Client { return a.matrix.Raw() } +// Stop cancels this agent's individual context, causing Run to return. +// Safe to call multiple times. +func (a *Agent) Stop() { + if a.cancel != nil { + a.cancel() + } +} + +// Done returns a channel that is closed when Run has returned. +func (a *Agent) Done() <-chan struct{} { + return a.done +} + // Run starts the agent sync loop. Blocks until ctx is cancelled. func (a *Agent) Run(ctx context.Context) error { + ctx, a.cancel = context.WithCancel(ctx) + defer close(a.done) + if a.cryptoStore != nil { defer a.cryptoStore.Close() } diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index 371e66b..2a85caf 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -12,7 +12,6 @@ import ( "os" "os/signal" "path/filepath" - "sync" "syscall" "time" @@ -72,13 +71,7 @@ func main() { logger.Warn("could not create file logger, falling back to stdout", "err", err) launcherCleanup = func() {} } - var cleanups []func() - cleanups = append(cleanups, launcherCleanup) - defer func() { - for _, fn := range cleanups { - fn() - } - }() + defer launcherCleanup() if len(configPaths) == 0 { logger.Warn("no agent configs found — nothing to start") @@ -96,15 +89,49 @@ func main() { if err != nil { // Non-fatal: orchestration is optional logger.Warn("orchestrator not started", "err", err) - } else { + } else if orch != nil { logger.Info("orchestrator initialized") } - // ── Start normal agents ── - var wg sync.WaitGroup - var scannerOnce sync.Once - var scanner *mautrix.Client + // ── Shared dependencies for agent registry ── + deps := &launchDeps{ + agentBus: agentBus, + orch: orch, + logDir: logDir, + logLevel: lvl, + parentCtx: ctx, + } + registry := newAgentRegistry(deps) + // ── SIGHUP: hot-reload individual agent or all agents ── + sighup := make(chan os.Signal, 1) + signal.Notify(sighup, syscall.SIGHUP) + go func() { + for { + select { + case <-ctx.Done(): + return + case _, ok := <-sighup: + if !ok { + return + } + id := readReloadTarget("run/reload.txt") + // Remove the target file after reading so it doesn't + // affect the next SIGHUP. + _ = os.Remove("run/reload.txt") + if id == "" { + logger.Info("sighup: reloading all agents") + registry.reloadAll(rulesFor) + } else { + logger.Info("sighup: reloading agent", "id", id) + registry.reload(id, rulesFor) + } + } + } + }() + + // ── Start normal agents ── + var scannerOnce scanOnce for _, path := range configPaths { path := path cfg, err := config.Load(path) @@ -130,11 +157,11 @@ func main() { agentLogger = logger.With("agent", cfg.Agent.ID) agentCleanup = func() {} } - cleanups = append(cleanups, agentCleanup) a, err := agents.New(cfg, rules, agentLogger) if err != nil { logger.Error("failed to create agent", "id", cfg.Agent.ID, "err", err) + agentCleanup() continue } @@ -154,30 +181,28 @@ func main() { }) // Grab the first available Matrix client for room scanning - scannerOnce.Do(func() { - scanner = a.RawMatrixClient() - }) + scannerOnce.set(a.RawMatrixClient()) } - wg.Add(1) - go func() { - defer wg.Done() - agentLogger.Info("agent running") - if err := a.Run(ctx); err != nil { - agentLogger.Error("agent stopped with error", "err", err) - } - }() + registry.register(&runningAgent{ + agent: a, + cfg: cfg, + cfgPath: path, + logger: agentLogger, + logCleanup: agentCleanup, + }) } // ── Startup room scan (after all participants are registered) ── - if orch != nil && scanner != nil { - orch.orchestrator.SetScanner(scanner) + if orch != nil && scannerOnce.client != nil { + orch.orchestrator.SetScanner(scannerOnce.client) scanCtx, scanCancel := context.WithTimeout(ctx, 30*time.Second) orch.orchestrator.ScanExistingRooms(scanCtx) scanCancel() } - wg.Wait() + registry.waitAll() + registry.cleanupLogs() logger.Info("all agents stopped") return nil }, @@ -195,6 +220,17 @@ func main() { } } +// scanOnce captures the first Matrix client for room scanning. +type scanOnce struct { + client *mautrix.Client +} + +func (s *scanOnce) set(c *mautrix.Client) { + if s.client == nil { + s.client = c + } +} + // orchHandle wraps a running orchestrator with its config for the launcher. type orchHandle struct { orchestrator *orchshell.Orchestrator diff --git a/cmd/launcher/registry.go b/cmd/launcher/registry.go new file mode 100644 index 0000000..35b1e83 --- /dev/null +++ b/cmd/launcher/registry.go @@ -0,0 +1,231 @@ +package main + +import ( + "context" + "log/slog" + "os" + "strings" + "sync" + "time" + + "github.com/enmanuel/agents/agents" + "github.com/enmanuel/agents/internal/config" + "github.com/enmanuel/agents/pkg/decision" + "github.com/enmanuel/agents/pkg/orchestration" + "github.com/enmanuel/agents/shell/bus" + agentlog "github.com/enmanuel/agents/shell/logger" +) + +// runningAgent holds a live agent and the metadata needed to recreate it. +type runningAgent struct { + agent *agents.Agent + cfg *config.AgentConfig + cfgPath string + logger *slog.Logger + logCleanup func() +} + +// launchDeps holds shared resources needed to start/reload agents. +type launchDeps struct { + agentBus *bus.Bus + orch *orchHandle + logDir string + logLevel slog.Level + parentCtx context.Context +} + +// agentRegistry tracks all running agents by ID, enabling individual hot-reload. +type agentRegistry struct { + mu sync.Mutex + agents map[string]*runningAgent + deps *launchDeps +} + +func newAgentRegistry(deps *launchDeps) *agentRegistry { + return &agentRegistry{ + agents: make(map[string]*runningAgent), + deps: deps, + } +} + +// register adds a running agent to the registry and starts its goroutine. +func (r *agentRegistry) register(ra *runningAgent) { + r.mu.Lock() + r.agents[ra.cfg.Agent.ID] = ra + r.mu.Unlock() + + go func() { + ra.logger.Info("agent running") + if err := ra.agent.Run(r.deps.parentCtx); err != nil { + ra.logger.Error("agent stopped with error", "err", err) + } + }() +} + +// stopAndWait stops a running agent and waits for it to finish. +// Caller must NOT hold r.mu. +func (r *agentRegistry) stopAndWait(id string) { + r.mu.Lock() + ra, ok := r.agents[id] + r.mu.Unlock() + if !ok { + return + } + + ra.agent.Stop() + select { + case <-ra.agent.Done(): + case <-time.After(10 * time.Second): + ra.logger.Warn("agent did not stop within 10s, forcing", "id", id) + } + + // Unsubscribe from bus so no stale channel remains. + r.deps.agentBus.Unsubscribe(bus.AgentID(id)) +} + +// reload stops an agent, re-reads its config, recreates it, and restarts it. +func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) { + r.mu.Lock() + ra, ok := r.agents[id] + r.mu.Unlock() + if !ok { + slog.Warn("reload: agent not found", "id", id) + return + } + + cfgPath := ra.cfgPath + oldCleanup := ra.logCleanup + + ra.logger.Info("agent_reload_start", "id", id) + + // 1. Stop current instance and wait. + r.stopAndWait(id) + + // 2. Cleanup old log writer. + if oldCleanup != nil { + oldCleanup() + } + + // 3. Re-read config. + cfg, err := config.Load(cfgPath) + if err != nil { + slog.Error("reload: failed to load config", "path", cfgPath, "err", err) + return + } + if !cfg.Agent.Enabled { + slog.Info("reload: agent is disabled, not restarting", "id", id) + r.mu.Lock() + delete(r.agents, id) + r.mu.Unlock() + return + } + + // 4. New per-agent logger. + newLogger, newCleanup, aErr := agentlog.NewAgentLogger(agentlog.LoggerConfig{ + BaseDir: r.deps.logDir, + AgentID: cfg.Agent.ID, + Level: r.deps.logLevel, + }) + if aErr != nil { + newLogger = slog.Default().With("agent", cfg.Agent.ID) + newCleanup = func() {} + } + + // 5. Create new agent (validates config before discarding the old one). + rules := rulesFor(cfg.Agent.ID, newLogger) + newAgent, err := agents.New(cfg, rules, newLogger) + if err != nil { + newLogger.Error("reload: failed to create agent", "id", id, "err", err) + newCleanup() + return + } + + // 6. Wire bus and orchestration. + newAgent.SetBus(r.deps.agentBus) + if r.deps.orch != nil { + newAgent.SetInterceptor(r.deps.orch.orchestrator.Intercept) + newAgent.SetMembershipNotify(r.deps.orch.orchestrator.NotifyMembership) + r.deps.orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{ + ID: cfg.Agent.ID, + MatrixUserID: cfg.Matrix.UserID, + Description: cfg.Agent.Description, + Capabilities: cfg.Agent.Tags, + }) + } + + newRA := &runningAgent{ + agent: newAgent, + cfg: cfg, + cfgPath: cfgPath, + logger: newLogger, + logCleanup: newCleanup, + } + + r.mu.Lock() + r.agents[id] = newRA + r.mu.Unlock() + + // 7. Start new goroutine. + go func() { + newLogger.Info("agent running") + if err := newAgent.Run(r.deps.parentCtx); err != nil { + newLogger.Error("agent stopped with error", "err", err) + } + }() + + newLogger.Info("agent_reloaded", "id", id) +} + +// reloadAll reloads every registered agent sequentially. +func (r *agentRegistry) reloadAll(rulesFor func(string, *slog.Logger) []decision.Rule) { + r.mu.Lock() + ids := make([]string, 0, len(r.agents)) + for id := range r.agents { + ids = append(ids, id) + } + r.mu.Unlock() + + for _, id := range ids { + r.reload(id, rulesFor) + } +} + +// waitAll blocks until all registered agents have stopped. +func (r *agentRegistry) waitAll() { + r.mu.Lock() + dones := make([]<-chan struct{}, 0, len(r.agents)) + for _, ra := range r.agents { + dones = append(dones, ra.agent.Done()) + } + r.mu.Unlock() + + for _, done := range dones { + <-done + } +} + +// cleanupLogs calls every agent's log cleanup function (called on launcher shutdown). +func (r *agentRegistry) cleanupLogs() { + r.mu.Lock() + defer r.mu.Unlock() + for _, ra := range r.agents { + if ra.logCleanup != nil { + ra.logCleanup() + } + } +} + +// readReloadTarget reads the given file and returns the trimmed content. +// Returns "" if the file doesn't exist, is empty, or equals "*" (meaning reload all). +func readReloadTarget(path string) string { + data, err := os.ReadFile(path) + if err != nil { + return "" + } + id := strings.TrimSpace(string(data)) + if id == "*" { + return "" + } + return id +} + From 0b7451336917fded4fdbda64ee3ec8b03d430ec4 Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 18:42:05 +0000 Subject: [PATCH 2/6] feat: TUI usa SIGHUP para hot-reload de agente individual MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit restartAgent() ahora escribe run/reload.txt con el agentID y envía SIGHUP al launcher en lugar de matar y reiniciar el proceso completo. Si el launcher no está corriendo, conserva el comportamiento anterior (stop + start completo). Co-Authored-By: Claude Sonnet 4.6 --- shell/tui/adapter.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/shell/tui/adapter.go b/shell/tui/adapter.go index fd7248b..72264e3 100644 --- a/shell/tui/adapter.go +++ b/shell/tui/adapter.go @@ -4,8 +4,10 @@ package tui import ( "fmt" + "os" "os/exec" "strings" + "syscall" "time" tea "github.com/charmbracelet/bubbletea" @@ -140,13 +142,28 @@ func (a *Adapter) disableAgent(id string) tea.Cmd { func (a *Adapter) restartAgent(id string) tea.Cmd { return func() tea.Msg { - _ = a.mgr.StopUnified() - time.Sleep(500 * time.Millisecond) - err := a.mgr.StartUnified() - if err == nil { + pid := a.mgr.UnifiedPID() + if pid <= 0 { + // Launcher not running — fall back to full restart. + _ = a.mgr.StopUnified() time.Sleep(500 * time.Millisecond) + err := a.mgr.StartUnified() + if err == nil { + time.Sleep(500 * time.Millisecond) + } + return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err} } - return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err} + + // Launcher is running — write target and send SIGHUP for hot-reload. + if id != "" { + _ = os.WriteFile("run/reload.txt", []byte(id), 0o644) + } + err := syscall.Kill(pid, syscall.SIGHUP) + if err != nil { + return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err} + } + time.Sleep(1 * time.Second) + return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: nil} } } From 8ec9f39b6d6d41c14c01cd31f75e14df063fc1fe Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 18:42:38 +0000 Subject: [PATCH 3/6] =?UTF-8?q?feat:=20agentctl=20reload=20=E2=80=94=20sub?= =?UTF-8?q?comando=20de=20hot-reload=20via=20SIGHUP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Nuevo subcomando: agentctl reload [agent-id] - Sin argumento: elimina run/reload.txt y envía SIGHUP → todos los agentes se recargan. - Con agent-id: escribe run/reload.txt con el ID y envía SIGHUP → solo ese agente se recarga. Si el launcher no está corriendo, muestra error claro. Co-Authored-By: Claude Sonnet 4.6 --- cmd/agentctl/main.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/cmd/agentctl/main.go b/cmd/agentctl/main.go index cd3da10..1385f71 100644 --- a/cmd/agentctl/main.go +++ b/cmd/agentctl/main.go @@ -14,6 +14,7 @@ import ( "fmt" "os" "strings" + "syscall" "github.com/spf13/cobra" @@ -47,6 +48,7 @@ func main() { listCmd(mgr), startCmd(mgr, &binPath), stopCmd(mgr), + reloadCmd(mgr), removeCmd(mgr), avatarCmd(), displaynameCmd(), @@ -171,6 +173,48 @@ func stopCmd(mgr *process.Manager) *cobra.Command { } } +// ── reload ──────────────────────────────────────────────────────────────── + +func reloadCmd(mgr *process.Manager) *cobra.Command { + return &cobra.Command{ + Use: "reload [agent-id]", + Short: "Hot-reload an agent (or all agents) without stopping the launcher", + Long: `Sends SIGHUP to the running launcher, which triggers a hot-reload. +If an agent-id is given, only that agent is reloaded. +If no agent-id is given, all agents are reloaded. + +The launcher must be running. Use 'agentctl start' first if needed.`, + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + pid := mgr.UnifiedPID() + if pid <= 0 { + return fmt.Errorf("launcher is not running") + } + + target := "" + if len(args) == 1 { + target = args[0] + } + + if target != "" { + if err := os.WriteFile("run/reload.txt", []byte(target), 0o644); err != nil { + return fmt.Errorf("write reload target: %w", err) + } + fmt.Printf("reload %-20s sending SIGHUP to PID %d\n", target, pid) + } else { + // Remove any stale reload.txt so SIGHUP reloads all agents. + _ = os.Remove("run/reload.txt") + fmt.Printf("reload %-20s sending SIGHUP to PID %d\n", "(all)", pid) + } + + if err := syscall.Kill(pid, syscall.SIGHUP); err != nil { + return fmt.Errorf("kill -HUP %d: %w", pid, err) + } + return nil + }, + } +} + // ── remove ──────────────────────────────────────────────────────────────── func removeCmd(mgr *process.Manager) *cobra.Command { From f95370de803e13fdb6307f19c1f30dba608bb142 Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 18:43:46 +0000 Subject: [PATCH 4/6] test: tests para hot-reload (bus, registry, ciclo de vida del agente) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - shell/bus/bus_test.go: tests de Subscribe/Send/Unsubscribe incluyendo idempotencia, canal cerrado tras unsubscribe y resubscribe posterior. - cmd/launcher/registry_test.go: tests para readReloadTarget (archivo ausente, vacío, '*', agentID, whitespace). - agents/lifecycle_test.go: tests para Agent.Stop()/Done() verificando que Stop() desbloquea Run y que es seguro llamarlo múltiples veces o con cancel nil. Co-Authored-By: Claude Sonnet 4.6 --- agents/lifecycle_test.go | 64 ++++++++++++++++++++++++ cmd/launcher/registry_test.go | 58 ++++++++++++++++++++++ shell/bus/bus_test.go | 91 +++++++++++++++++++++++++++++++++++ 3 files changed, 213 insertions(+) create mode 100644 agents/lifecycle_test.go create mode 100644 cmd/launcher/registry_test.go create mode 100644 shell/bus/bus_test.go diff --git a/agents/lifecycle_test.go b/agents/lifecycle_test.go new file mode 100644 index 0000000..93c71b2 --- /dev/null +++ b/agents/lifecycle_test.go @@ -0,0 +1,64 @@ +package agents + +import ( + "context" + "testing" + "time" +) + +// TestAgentStopAndDone verifies that Stop() cancels Run and Done() closes. +// Uses a minimal Agent (no Matrix, no LLM) via direct struct init so the test +// doesn't require network or external dependencies. +func TestAgentStopAndDone(t *testing.T) { + a := &Agent{ + done: make(chan struct{}), + } + + // Simulate Run: create the cancel, then immediately block on ctx. + ctx, cancel := context.WithCancel(context.Background()) + a.cancel = cancel + + started := make(chan struct{}) + go func() { + close(started) + // Mimic what Run does: block on ctx, then close done. + <-ctx.Done() + close(a.done) + }() + + <-started + + // Stop must unblock the goroutine above. + a.Stop() + + select { + case <-a.Done(): + // ok + case <-time.After(2 * time.Second): + t.Fatal("Done() did not close within 2s after Stop()") + } +} + +// TestAgentStopIdempotent verifies that calling Stop() multiple times is safe. +func TestAgentStopIdempotent(t *testing.T) { + a := &Agent{ + done: make(chan struct{}), + } + _, cancel := context.WithCancel(context.Background()) + a.cancel = cancel + defer cancel() + + // Should not panic when called multiple times. + a.Stop() + a.Stop() + a.Stop() +} + +// TestAgentStopNilCancel verifies Stop() is safe when cancel is nil. +func TestAgentStopNilCancel(t *testing.T) { + a := &Agent{ + done: make(chan struct{}), + } + // cancel is nil — must not panic. + a.Stop() +} diff --git a/cmd/launcher/registry_test.go b/cmd/launcher/registry_test.go new file mode 100644 index 0000000..83a6131 --- /dev/null +++ b/cmd/launcher/registry_test.go @@ -0,0 +1,58 @@ +package main + +import ( + "os" + "path/filepath" + "testing" +) + +func TestReadReloadTarget_missing(t *testing.T) { + got := readReloadTarget(filepath.Join(t.TempDir(), "reload.txt")) + if got != "" { + t.Fatalf("expected empty string for missing file, got %q", got) + } +} + +func TestReadReloadTarget_empty(t *testing.T) { + f := filepath.Join(t.TempDir(), "reload.txt") + if err := os.WriteFile(f, []byte(""), 0o644); err != nil { + t.Fatal(err) + } + got := readReloadTarget(f) + if got != "" { + t.Fatalf("expected empty string for empty file, got %q", got) + } +} + +func TestReadReloadTarget_star(t *testing.T) { + f := filepath.Join(t.TempDir(), "reload.txt") + if err := os.WriteFile(f, []byte("*\n"), 0o644); err != nil { + t.Fatal(err) + } + got := readReloadTarget(f) + if got != "" { + t.Fatalf("expected empty string for '*', got %q", got) + } +} + +func TestReadReloadTarget_agentID(t *testing.T) { + f := filepath.Join(t.TempDir(), "reload.txt") + if err := os.WriteFile(f, []byte("assistant-bot\n"), 0o644); err != nil { + t.Fatal(err) + } + got := readReloadTarget(f) + if got != "assistant-bot" { + t.Fatalf("expected 'assistant-bot', got %q", got) + } +} + +func TestReadReloadTarget_whitespace(t *testing.T) { + f := filepath.Join(t.TempDir(), "reload.txt") + if err := os.WriteFile(f, []byte(" asistente-2 \n"), 0o644); err != nil { + t.Fatal(err) + } + got := readReloadTarget(f) + if got != "asistente-2" { + t.Fatalf("expected 'asistente-2', got %q", got) + } +} diff --git a/shell/bus/bus_test.go b/shell/bus/bus_test.go new file mode 100644 index 0000000..5573f41 --- /dev/null +++ b/shell/bus/bus_test.go @@ -0,0 +1,91 @@ +package bus_test + +import ( + "log/slog" + "testing" + + "github.com/enmanuel/agents/shell/bus" +) + +func newBus() *bus.Bus { + return bus.New(slog.Default()) +} + +func TestSubscribeAndSend(t *testing.T) { + b := newBus() + ch := b.Subscribe("agent-a") + + msg := bus.AgentMessage{From: "orch", To: "agent-a", Kind: bus.KindTask, Payload: map[string]string{"k": "v"}} + if err := b.Send(msg); err != nil { + t.Fatalf("Send: %v", err) + } + + got := <-ch + if got.Kind != bus.KindTask || got.Payload["k"] != "v" { + t.Fatalf("unexpected message: %+v", got) + } +} + +func TestUnsubscribeClosesChannel(t *testing.T) { + b := newBus() + ch := b.Subscribe("agent-b") + + b.Unsubscribe("agent-b") + + // Channel must be closed — reading from a closed channel returns zero value + ok=false. + _, ok := <-ch + if ok { + t.Fatal("expected channel to be closed after Unsubscribe") + } +} + +func TestUnsubscribeRemovesFromBus(t *testing.T) { + b := newBus() + b.Subscribe("agent-c") + b.Unsubscribe("agent-c") + + // Sending after unsubscribe must return an error, not panic. + err := b.Send(bus.AgentMessage{To: "agent-c", Kind: "ping"}) + if err == nil { + t.Fatal("expected error when sending to unsubscribed agent") + } +} + +func TestUnsubscribeIdempotent(t *testing.T) { + b := newBus() + b.Subscribe("agent-d") + // Double unsubscribe must not panic. + b.Unsubscribe("agent-d") + b.Unsubscribe("agent-d") +} + +func TestUnsubscribeNonExistent(t *testing.T) { + b := newBus() + // Unsubscribing an ID that was never subscribed must not panic. + b.Unsubscribe("does-not-exist") +} + +func TestSendToUnknownAgent(t *testing.T) { + b := newBus() + err := b.Send(bus.AgentMessage{To: "ghost", Kind: "hello"}) + if err == nil { + t.Fatal("expected error when sending to unknown agent") + } +} + +func TestResubscribeAfterUnsubscribe(t *testing.T) { + b := newBus() + b.Subscribe("agent-e") + b.Unsubscribe("agent-e") + + // Re-subscribe must work and deliver messages. + ch2 := b.Subscribe("agent-e") + msg := bus.AgentMessage{To: "agent-e", Kind: "ping"} + if err := b.Send(msg); err != nil { + t.Fatalf("Send after re-subscribe: %v", err) + } + got := <-ch2 + if got.Kind != "ping" { + t.Fatalf("unexpected kind: %q", got.Kind) + } +} From 4419fad7540e632c78300f6d2efd296f2b79d64b Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 18:44:04 +0000 Subject: [PATCH 5/6] =?UTF-8?q?chore:=20cerrar=20issue=200013=20=E2=80=94?= =?UTF-8?q?=20hot-reload=20implementado?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mueve 0013-hot-reload.md a completed/ y actualiza el índice de issues. Co-Authored-By: Claude Sonnet 4.6 --- dev/issues/README.md | 2 +- dev/issues/completed/0013-hot-reload.md | 265 ++++++++++++++++++++++++ 2 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 dev/issues/completed/0013-hot-reload.md diff --git a/dev/issues/README.md b/dev/issues/README.md index 0b13b31..1684d6f 100644 --- a/dev/issues/README.md +++ b/dev/issues/README.md @@ -17,7 +17,7 @@ afectados y notas de implementacion. | 10 | Access control | [0010-access-control.md](completed/0010-access-control.md) | completado | | 11 | Markdown rendering | [0011-markdown-rendering.md](completed/0011-markdown-rendering.md) | completado | | 12 | Threads | [0012-threads.md](completed/0012-threads.md) | completado | -| 13 | Hot reload | [0013-hot-reload.md](0013-hot-reload.md) | pendiente | +| 13 | Hot reload | [0013-hot-reload.md](completed/0013-hot-reload.md) | completado | | 14 | Template agent standardize | [0014-template-agent-standardize.md](0014-template-agent-standardize.md) | pendiente | | 15 | Multi-platform Telegram | [0015-multi-platform-telegram.md](0015-multi-platform-telegram.md) | pendiente | | 16 | Skills system | [0016-skills-system.md](0016-skills-system.md) | pendiente | diff --git a/dev/issues/completed/0013-hot-reload.md b/dev/issues/completed/0013-hot-reload.md new file mode 100644 index 0000000..04b0636 --- /dev/null +++ b/dev/issues/completed/0013-hot-reload.md @@ -0,0 +1,265 @@ +# Task 013 — Hot-Reload de Agentes Individuales + +## Objetivo + +Permitir reiniciar (recrear) un agente individual dentro del launcher sin detener +los demas agentes. El bus y el orquestador permanecen intactos porque todo sigue +en el mismo proceso. + +## Contexto + +Actualmente el launcher ejecuta todos los agentes como goroutines dentro de un +unico proceso. No hay forma de reiniciar un solo agente — hay que matar y +re-arrancar el launcher entero, lo que desconecta a todos los bots de Matrix +y rompe conversaciones en curso. + +### Por que no un proceso por agente + +El sistema de orquestacion multi-bot depende de: + +- **Bus in-process** (`shell/bus/bus.go`): Go channels, solo funciona dentro del mismo proceso. +- **Orquestador** (`shell/orchestration/`): usa el bus para `dispatchAndWait()` (request-response). +- **Deduplicacion** (`seen map`): estado compartido en memoria para evitar que multiples bots + en el mismo room procesen el mismo mensaje. +- **Interceptor**: callback sincrono que el listener de cada bot llama al orquestador. + +Separar en procesos romperia todo lo anterior. El hot-reload mantiene el proceso unico +pero recrea el agente internamente. + +## Mecanismo propuesto + +### Signal: SIGHUP + archivo de control + +1. El launcher escucha `SIGHUP` ademas de SIGINT/SIGTERM. +2. Al recibir SIGHUP, lee un archivo `run/reload.txt` que contiene el ID del agente a recargar. +3. Si el archivo no existe o esta vacio, recarga TODOS los agentes. +4. Alternativa: un comando via bus (`bus.KindReload`) enviado desde el TUI/agentctl. + +### Flujo de hot-reload + +``` +SIGHUP recibido (o comando reload via bus/TUI) + | + v +Launcher lee run/reload.txt -> agentID (o "*" para todos) + | + v +Para cada agente a recargar: + 1. Cancelar su context (ctx.cancel) -> Agent.Run() termina gracefully + 2. Esperar a que la goroutine termine (via WaitGroup o done channel) + 3. Desuscribir del bus (bus.Unsubscribe(agentID)) + 4. Re-leer config.yaml del agente + 5. Re-crear Agent con agents.New(cfg, rules, logger) + 6. Re-suscribir al bus (agent.SetBus) + 7. Re-conectar interceptor y membership notify si orquestador activo + 8. Re-registrar participante en orquestador + 9. Lanzar nueva goroutine con agent.Run(newCtx) + | + v +Log: "agent reloaded successfully" +``` + +## Plan de implementacion + +### 1. Hacer Agent cancelable individualmente + +**Archivo**: `agents/runtime.go` + +- Actualmente `Agent.Run(ctx)` recibe el context del launcher (compartido). +- Cambiar para que cada agente tenga su propio `context.WithCancel(parentCtx)`. +- Exponer un metodo `Agent.Stop()` que cancela el context hijo. +- Exponer un canal o metodo `Agent.Done() <-chan struct{}` para saber cuando termino. + +```go +type Agent struct { + // ... campos existentes ... + cancel context.CancelFunc + done chan struct{} +} + +func (a *Agent) Run(ctx context.Context) error { + ctx, a.cancel = context.WithCancel(ctx) + defer close(a.done) + // ... resto del Run existente ... +} + +func (a *Agent) Stop() { + if a.cancel != nil { + a.cancel() + } +} + +func (a *Agent) Done() <-chan struct{} { + return a.done +} +``` + +### 2. Anadir Unsubscribe al bus + +**Archivo**: `shell/bus/bus.go` + +- Nuevo metodo `Unsubscribe(id AgentID)` que elimina el canal del mapa y lo cierra. +- `listenBus()` en runtime.go debe manejar canal cerrado sin panic. + +```go +func (b *Bus) Unsubscribe(id AgentID) { + b.mu.Lock() + defer b.mu.Unlock() + if ch, ok := b.channels[id]; ok { + close(ch) + delete(b.channels, id) + } +} +``` + +### 3. Tracker de agentes en el launcher + +**Archivo**: `cmd/launcher/main.go` + +- Reemplazar el `sync.WaitGroup` actual por un registry de agentes vivos: + +```go +type runningAgent struct { + agent *agents.Agent + cancel context.CancelFunc + done chan struct{} + cfg *config.Config +} + +type agentRegistry struct { + mu sync.Mutex + agents map[string]*runningAgent +} +``` + +- Metodos: `register(id, agent)`, `stop(id)`, `reload(id, parentCtx)`, `stopAll()`. +- `reload(id)` ejecuta el flujo descrito arriba: stop -> wait -> recreate -> start. + +### 4. Handler de SIGHUP + +**Archivo**: `cmd/launcher/main.go` + +- Escuchar SIGHUP en un canal separado (no en el mismo NotifyContext de SIGINT/SIGTERM). +- Al recibir SIGHUP: + - Leer `run/reload.txt` (si existe) + - Llamar `registry.reload(id, ctx)` o `registry.reloadAll(ctx)` si es "*" + +```go +sighup := make(chan os.Signal, 1) +signal.Notify(sighup, syscall.SIGHUP) + +go func() { + for range sighup { + id := readReloadTarget("run/reload.txt") + if id == "" || id == "*" { + registry.reloadAll(ctx) + } else { + registry.reload(id, ctx) + } + } +}() +``` + +### 5. Integracion con el orquestador + +**Archivo**: `cmd/launcher/main.go` (dentro de `reload()`) + +Al recrear un agente que participa en orquestacion: + +1. El orquestador no necesita "desregistrar" al participante — basta con re-registrar + con la misma info (sobreescribe). +2. Re-llamar `SetInterceptor` y `SetMembershipNotify` en el nuevo Agent. +3. El bus.Subscribe del nuevo agente devuelve un canal nuevo — el orquestador usa + `bus.Send(agentID)` que resuelve el nuevo canal automaticamente. + +**Caso critico**: si el agente esta en medio de un `dispatchAndWait()` cuando se cancela: +- El context se cancela -> SendAndWait retorna error +- El orquestador recibe timeout/error para esa iteracion +- La respuesta parcial se pierde pero no hay corrupcion +- El orquestador puede reintentar o pasar al siguiente bot + +### 6. Integracion con el TUI + +**Archivos**: `pkg/tui/update.go`, `shell/tui/adapter.go`, `shell/process/manager.go` + +El boton "Restart" del TUI (task actual) debe cambiar de "kill+start launcher" a: + +1. Escribir el agentID en `run/reload.txt` +2. Enviar SIGHUP al proceso del launcher (`kill -HUP `) +3. Esperar un momento y refrescar estado + +```go +func (a *Adapter) restartAgent(id string) tea.Cmd { + return func() tea.Msg { + // Escribir target en reload file + os.WriteFile("run/reload.txt", []byte(id), 0644) + // Enviar SIGHUP al launcher + pid := a.mgr.UnifiedPID() + if pid > 0 { + syscall.Kill(pid, syscall.SIGHUP) + } + time.Sleep(1 * time.Second) + return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: nil} + } +} +``` + +### 7. Integracion con agentctl CLI + +**Archivo**: `cmd/agentctl/main.go` + +- Nuevo subcomando: `agentctl reload ` +- Escribe `run/reload.txt` + envia SIGHUP +- Mismo mecanismo que el TUI + +### 8. Graceful shutdown del agente + +**Archivo**: `agents/runtime.go` + +Al cancelar el context individual de un agente: + +1. El sync loop de Matrix debe detenerse limpiamente (mautrix tiene `StopSync()`) +2. Las llamadas LLM en curso deben cancelarse via context +3. La tool execution en curso debe respetar context cancellation +4. Memory/knowledge stores deben flush antes de cerrar +5. El canal del bus se cierra — `listenBus` sale del loop + +Verificar que `runtime.go:Run()` ya maneja todo esto con el context actual. +Si no, anadir cleanup explicicto. + +### 9. Tests + +- **Unit test**: `bus.Unsubscribe` no causa panic, mensajes posteriores al unsubscribe + no se pierden (retornan error). +- **Unit test**: `agentRegistry.reload()` — stop + recreate funciona. +- **Integration test**: enviar SIGHUP y verificar que solo el agente target se reinicia. +- **Orchestrator test**: agente en medio de task, se cancela, orquestador maneja el error. + +## Orden de implementacion sugerido + +1. `Agent.Stop()` + `Agent.Done()` (runtime.go) +2. `Bus.Unsubscribe()` (bus.go) +3. `agentRegistry` en launcher (main.go) +4. Handler SIGHUP (main.go) +5. Graceful shutdown verification (runtime.go) +6. Actualizar TUI adapter (adapter.go) +7. Actualizar agentctl (agentctl/main.go) +8. Tests + +## Riesgos y mitigaciones + +| Riesgo | Mitigacion | +|--------|------------| +| Race condition al cerrar canal del bus | Mutex en Unsubscribe, recover en Send | +| Crypto store de mautrix queda locked | Cerrar store explicitamente en cleanup | +| Orquestador en medio de dispatch | Context cancellation + timeout ya existente | +| Config invalido al recargar | Validar config antes de destruir agente viejo | +| Matrix sync no para limpio | Llamar StopSync() explicitamente antes de cancel | + +## Notas + +- SIGHUP es la convencion Unix para "recargar configuracion" (nginx, haproxy, etc.) +- El archivo `run/reload.txt` es efimero — se puede borrar despues de leer +- Si el launcher no esta corriendo, el TUI debe caer al comportamiento actual (start launcher) +- El orquestador NO se recarga — solo los agentes. Para recargar el orquestador + hay que reiniciar el launcher entero. From 25f8aeafaacb8328d49537be84adc74033600a98 Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 18:44:26 +0000 Subject: [PATCH 6/6] chore: eliminar issue 0013 de pendientes (ya movido a completed/) Co-Authored-By: Claude Sonnet 4.6 --- dev/issues/0013-hot-reload.md | 265 ---------------------------------- 1 file changed, 265 deletions(-) delete mode 100644 dev/issues/0013-hot-reload.md diff --git a/dev/issues/0013-hot-reload.md b/dev/issues/0013-hot-reload.md deleted file mode 100644 index 04b0636..0000000 --- a/dev/issues/0013-hot-reload.md +++ /dev/null @@ -1,265 +0,0 @@ -# Task 013 — Hot-Reload de Agentes Individuales - -## Objetivo - -Permitir reiniciar (recrear) un agente individual dentro del launcher sin detener -los demas agentes. El bus y el orquestador permanecen intactos porque todo sigue -en el mismo proceso. - -## Contexto - -Actualmente el launcher ejecuta todos los agentes como goroutines dentro de un -unico proceso. No hay forma de reiniciar un solo agente — hay que matar y -re-arrancar el launcher entero, lo que desconecta a todos los bots de Matrix -y rompe conversaciones en curso. - -### Por que no un proceso por agente - -El sistema de orquestacion multi-bot depende de: - -- **Bus in-process** (`shell/bus/bus.go`): Go channels, solo funciona dentro del mismo proceso. -- **Orquestador** (`shell/orchestration/`): usa el bus para `dispatchAndWait()` (request-response). -- **Deduplicacion** (`seen map`): estado compartido en memoria para evitar que multiples bots - en el mismo room procesen el mismo mensaje. -- **Interceptor**: callback sincrono que el listener de cada bot llama al orquestador. - -Separar en procesos romperia todo lo anterior. El hot-reload mantiene el proceso unico -pero recrea el agente internamente. - -## Mecanismo propuesto - -### Signal: SIGHUP + archivo de control - -1. El launcher escucha `SIGHUP` ademas de SIGINT/SIGTERM. -2. Al recibir SIGHUP, lee un archivo `run/reload.txt` que contiene el ID del agente a recargar. -3. Si el archivo no existe o esta vacio, recarga TODOS los agentes. -4. Alternativa: un comando via bus (`bus.KindReload`) enviado desde el TUI/agentctl. - -### Flujo de hot-reload - -``` -SIGHUP recibido (o comando reload via bus/TUI) - | - v -Launcher lee run/reload.txt -> agentID (o "*" para todos) - | - v -Para cada agente a recargar: - 1. Cancelar su context (ctx.cancel) -> Agent.Run() termina gracefully - 2. Esperar a que la goroutine termine (via WaitGroup o done channel) - 3. Desuscribir del bus (bus.Unsubscribe(agentID)) - 4. Re-leer config.yaml del agente - 5. Re-crear Agent con agents.New(cfg, rules, logger) - 6. Re-suscribir al bus (agent.SetBus) - 7. Re-conectar interceptor y membership notify si orquestador activo - 8. Re-registrar participante en orquestador - 9. Lanzar nueva goroutine con agent.Run(newCtx) - | - v -Log: "agent reloaded successfully" -``` - -## Plan de implementacion - -### 1. Hacer Agent cancelable individualmente - -**Archivo**: `agents/runtime.go` - -- Actualmente `Agent.Run(ctx)` recibe el context del launcher (compartido). -- Cambiar para que cada agente tenga su propio `context.WithCancel(parentCtx)`. -- Exponer un metodo `Agent.Stop()` que cancela el context hijo. -- Exponer un canal o metodo `Agent.Done() <-chan struct{}` para saber cuando termino. - -```go -type Agent struct { - // ... campos existentes ... - cancel context.CancelFunc - done chan struct{} -} - -func (a *Agent) Run(ctx context.Context) error { - ctx, a.cancel = context.WithCancel(ctx) - defer close(a.done) - // ... resto del Run existente ... -} - -func (a *Agent) Stop() { - if a.cancel != nil { - a.cancel() - } -} - -func (a *Agent) Done() <-chan struct{} { - return a.done -} -``` - -### 2. Anadir Unsubscribe al bus - -**Archivo**: `shell/bus/bus.go` - -- Nuevo metodo `Unsubscribe(id AgentID)` que elimina el canal del mapa y lo cierra. -- `listenBus()` en runtime.go debe manejar canal cerrado sin panic. - -```go -func (b *Bus) Unsubscribe(id AgentID) { - b.mu.Lock() - defer b.mu.Unlock() - if ch, ok := b.channels[id]; ok { - close(ch) - delete(b.channels, id) - } -} -``` - -### 3. Tracker de agentes en el launcher - -**Archivo**: `cmd/launcher/main.go` - -- Reemplazar el `sync.WaitGroup` actual por un registry de agentes vivos: - -```go -type runningAgent struct { - agent *agents.Agent - cancel context.CancelFunc - done chan struct{} - cfg *config.Config -} - -type agentRegistry struct { - mu sync.Mutex - agents map[string]*runningAgent -} -``` - -- Metodos: `register(id, agent)`, `stop(id)`, `reload(id, parentCtx)`, `stopAll()`. -- `reload(id)` ejecuta el flujo descrito arriba: stop -> wait -> recreate -> start. - -### 4. Handler de SIGHUP - -**Archivo**: `cmd/launcher/main.go` - -- Escuchar SIGHUP en un canal separado (no en el mismo NotifyContext de SIGINT/SIGTERM). -- Al recibir SIGHUP: - - Leer `run/reload.txt` (si existe) - - Llamar `registry.reload(id, ctx)` o `registry.reloadAll(ctx)` si es "*" - -```go -sighup := make(chan os.Signal, 1) -signal.Notify(sighup, syscall.SIGHUP) - -go func() { - for range sighup { - id := readReloadTarget("run/reload.txt") - if id == "" || id == "*" { - registry.reloadAll(ctx) - } else { - registry.reload(id, ctx) - } - } -}() -``` - -### 5. Integracion con el orquestador - -**Archivo**: `cmd/launcher/main.go` (dentro de `reload()`) - -Al recrear un agente que participa en orquestacion: - -1. El orquestador no necesita "desregistrar" al participante — basta con re-registrar - con la misma info (sobreescribe). -2. Re-llamar `SetInterceptor` y `SetMembershipNotify` en el nuevo Agent. -3. El bus.Subscribe del nuevo agente devuelve un canal nuevo — el orquestador usa - `bus.Send(agentID)` que resuelve el nuevo canal automaticamente. - -**Caso critico**: si el agente esta en medio de un `dispatchAndWait()` cuando se cancela: -- El context se cancela -> SendAndWait retorna error -- El orquestador recibe timeout/error para esa iteracion -- La respuesta parcial se pierde pero no hay corrupcion -- El orquestador puede reintentar o pasar al siguiente bot - -### 6. Integracion con el TUI - -**Archivos**: `pkg/tui/update.go`, `shell/tui/adapter.go`, `shell/process/manager.go` - -El boton "Restart" del TUI (task actual) debe cambiar de "kill+start launcher" a: - -1. Escribir el agentID en `run/reload.txt` -2. Enviar SIGHUP al proceso del launcher (`kill -HUP `) -3. Esperar un momento y refrescar estado - -```go -func (a *Adapter) restartAgent(id string) tea.Cmd { - return func() tea.Msg { - // Escribir target en reload file - os.WriteFile("run/reload.txt", []byte(id), 0644) - // Enviar SIGHUP al launcher - pid := a.mgr.UnifiedPID() - if pid > 0 { - syscall.Kill(pid, syscall.SIGHUP) - } - time.Sleep(1 * time.Second) - return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: nil} - } -} -``` - -### 7. Integracion con agentctl CLI - -**Archivo**: `cmd/agentctl/main.go` - -- Nuevo subcomando: `agentctl reload ` -- Escribe `run/reload.txt` + envia SIGHUP -- Mismo mecanismo que el TUI - -### 8. Graceful shutdown del agente - -**Archivo**: `agents/runtime.go` - -Al cancelar el context individual de un agente: - -1. El sync loop de Matrix debe detenerse limpiamente (mautrix tiene `StopSync()`) -2. Las llamadas LLM en curso deben cancelarse via context -3. La tool execution en curso debe respetar context cancellation -4. Memory/knowledge stores deben flush antes de cerrar -5. El canal del bus se cierra — `listenBus` sale del loop - -Verificar que `runtime.go:Run()` ya maneja todo esto con el context actual. -Si no, anadir cleanup explicicto. - -### 9. Tests - -- **Unit test**: `bus.Unsubscribe` no causa panic, mensajes posteriores al unsubscribe - no se pierden (retornan error). -- **Unit test**: `agentRegistry.reload()` — stop + recreate funciona. -- **Integration test**: enviar SIGHUP y verificar que solo el agente target se reinicia. -- **Orchestrator test**: agente en medio de task, se cancela, orquestador maneja el error. - -## Orden de implementacion sugerido - -1. `Agent.Stop()` + `Agent.Done()` (runtime.go) -2. `Bus.Unsubscribe()` (bus.go) -3. `agentRegistry` en launcher (main.go) -4. Handler SIGHUP (main.go) -5. Graceful shutdown verification (runtime.go) -6. Actualizar TUI adapter (adapter.go) -7. Actualizar agentctl (agentctl/main.go) -8. Tests - -## Riesgos y mitigaciones - -| Riesgo | Mitigacion | -|--------|------------| -| Race condition al cerrar canal del bus | Mutex en Unsubscribe, recover en Send | -| Crypto store de mautrix queda locked | Cerrar store explicitamente en cleanup | -| Orquestador en medio de dispatch | Context cancellation + timeout ya existente | -| Config invalido al recargar | Validar config antes de destruir agente viejo | -| Matrix sync no para limpio | Llamar StopSync() explicitamente antes de cancel | - -## Notas - -- SIGHUP es la convencion Unix para "recargar configuracion" (nginx, haproxy, etc.) -- El archivo `run/reload.txt` es efimero — se puede borrar despues de leer -- Si el launcher no esta corriendo, el TUI debe caer al comportamiento actual (start launcher) -- El orquestador NO se recarga — solo los agentes. Para recargar el orquestador - hay que reiniciar el launcher entero.