diff --git a/agents/lifecycle_test.go b/agents/lifecycle_test.go new file mode 100644 index 0000000..93c71b2 --- /dev/null +++ b/agents/lifecycle_test.go @@ -0,0 +1,64 @@ +package agents + +import ( + "context" + "testing" + "time" +) + +// TestAgentStopAndDone verifies that Stop() cancels Run and Done() closes. +// Uses a minimal Agent (no Matrix, no LLM) via direct struct init so the test +// doesn't require network or external dependencies. +func TestAgentStopAndDone(t *testing.T) { + a := &Agent{ + done: make(chan struct{}), + } + + // Simulate Run: create the cancel, then immediately block on ctx. + ctx, cancel := context.WithCancel(context.Background()) + a.cancel = cancel + + started := make(chan struct{}) + go func() { + close(started) + // Mimic what Run does: block on ctx, then close done. + <-ctx.Done() + close(a.done) + }() + + <-started + + // Stop must unblock the goroutine above. + a.Stop() + + select { + case <-a.Done(): + // ok + case <-time.After(2 * time.Second): + t.Fatal("Done() did not close within 2s after Stop()") + } +} + +// TestAgentStopIdempotent verifies that calling Stop() multiple times is safe. +func TestAgentStopIdempotent(t *testing.T) { + a := &Agent{ + done: make(chan struct{}), + } + _, cancel := context.WithCancel(context.Background()) + a.cancel = cancel + defer cancel() + + // Should not panic when called multiple times. + a.Stop() + a.Stop() + a.Stop() +} + +// TestAgentStopNilCancel verifies Stop() is safe when cancel is nil. +func TestAgentStopNilCancel(t *testing.T) { + a := &Agent{ + done: make(chan struct{}), + } + // cancel is nil — must not panic. + a.Stop() +} diff --git a/agents/runtime.go b/agents/runtime.go index 6b07f28..19ab667 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -62,6 +62,10 @@ type Agent struct { logger *slog.Logger cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown + // Lifecycle — cancel stops this agent individually; done is closed when Run returns. + cancel context.CancelFunc + done chan struct{} + // Access control acl acl.ACL @@ -268,6 +272,7 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (* toolReg: toolReg, logger: logger, cryptoStore: cryptoStore, + done: make(chan struct{}), commands: make(map[string]CommandHandler), cmdAliases: command.BuiltinNames(), startTime: time.Now(), @@ -363,8 +368,24 @@ func (a *Agent) RawMatrixClient() *mautrix.Client { return a.matrix.Raw() } +// Stop cancels this agent's individual context, causing Run to return. +// Safe to call multiple times. +func (a *Agent) Stop() { + if a.cancel != nil { + a.cancel() + } +} + +// Done returns a channel that is closed when Run has returned. +func (a *Agent) Done() <-chan struct{} { + return a.done +} + // Run starts the agent sync loop. Blocks until ctx is cancelled. func (a *Agent) Run(ctx context.Context) error { + ctx, a.cancel = context.WithCancel(ctx) + defer close(a.done) + if a.cryptoStore != nil { defer a.cryptoStore.Close() } diff --git a/cmd/agentctl/main.go b/cmd/agentctl/main.go index cd3da10..1385f71 100644 --- a/cmd/agentctl/main.go +++ b/cmd/agentctl/main.go @@ -14,6 +14,7 @@ import ( "fmt" "os" "strings" + "syscall" "github.com/spf13/cobra" @@ -47,6 +48,7 @@ func main() { listCmd(mgr), startCmd(mgr, &binPath), stopCmd(mgr), + reloadCmd(mgr), removeCmd(mgr), avatarCmd(), displaynameCmd(), @@ -171,6 +173,48 @@ func stopCmd(mgr *process.Manager) *cobra.Command { } } +// ── reload ──────────────────────────────────────────────────────────────── + +func reloadCmd(mgr *process.Manager) *cobra.Command { + return &cobra.Command{ + Use: "reload [agent-id]", + Short: "Hot-reload an agent (or all agents) without stopping the launcher", + Long: `Sends SIGHUP to the running launcher, which triggers a hot-reload. +If an agent-id is given, only that agent is reloaded. +If no agent-id is given, all agents are reloaded. + +The launcher must be running. Use 'agentctl start' first if needed.`, + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + pid := mgr.UnifiedPID() + if pid <= 0 { + return fmt.Errorf("launcher is not running") + } + + target := "" + if len(args) == 1 { + target = args[0] + } + + if target != "" { + if err := os.WriteFile("run/reload.txt", []byte(target), 0o644); err != nil { + return fmt.Errorf("write reload target: %w", err) + } + fmt.Printf("reload %-20s sending SIGHUP to PID %d\n", target, pid) + } else { + // Remove any stale reload.txt so SIGHUP reloads all agents. + _ = os.Remove("run/reload.txt") + fmt.Printf("reload %-20s sending SIGHUP to PID %d\n", "(all)", pid) + } + + if err := syscall.Kill(pid, syscall.SIGHUP); err != nil { + return fmt.Errorf("kill -HUP %d: %w", pid, err) + } + return nil + }, + } +} + // ── remove ──────────────────────────────────────────────────────────────── func removeCmd(mgr *process.Manager) *cobra.Command { diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index 371e66b..2a85caf 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -12,7 +12,6 @@ import ( "os" "os/signal" "path/filepath" - "sync" "syscall" "time" @@ -72,13 +71,7 @@ func main() { logger.Warn("could not create file logger, falling back to stdout", "err", err) launcherCleanup = func() {} } - var cleanups []func() - cleanups = append(cleanups, launcherCleanup) - defer func() { - for _, fn := range cleanups { - fn() - } - }() + defer launcherCleanup() if len(configPaths) == 0 { logger.Warn("no agent configs found — nothing to start") @@ -96,15 +89,49 @@ func main() { if err != nil { // Non-fatal: orchestration is optional logger.Warn("orchestrator not started", "err", err) - } else { + } else if orch != nil { logger.Info("orchestrator initialized") } - // ── Start normal agents ── - var wg sync.WaitGroup - var scannerOnce sync.Once - var scanner *mautrix.Client + // ── Shared dependencies for agent registry ── + deps := &launchDeps{ + agentBus: agentBus, + orch: orch, + logDir: logDir, + logLevel: lvl, + parentCtx: ctx, + } + registry := newAgentRegistry(deps) + // ── SIGHUP: hot-reload individual agent or all agents ── + sighup := make(chan os.Signal, 1) + signal.Notify(sighup, syscall.SIGHUP) + go func() { + for { + select { + case <-ctx.Done(): + return + case _, ok := <-sighup: + if !ok { + return + } + id := readReloadTarget("run/reload.txt") + // Remove the target file after reading so it doesn't + // affect the next SIGHUP. + _ = os.Remove("run/reload.txt") + if id == "" { + logger.Info("sighup: reloading all agents") + registry.reloadAll(rulesFor) + } else { + logger.Info("sighup: reloading agent", "id", id) + registry.reload(id, rulesFor) + } + } + } + }() + + // ── Start normal agents ── + var scannerOnce scanOnce for _, path := range configPaths { path := path cfg, err := config.Load(path) @@ -130,11 +157,11 @@ func main() { agentLogger = logger.With("agent", cfg.Agent.ID) agentCleanup = func() {} } - cleanups = append(cleanups, agentCleanup) a, err := agents.New(cfg, rules, agentLogger) if err != nil { logger.Error("failed to create agent", "id", cfg.Agent.ID, "err", err) + agentCleanup() continue } @@ -154,30 +181,28 @@ func main() { }) // Grab the first available Matrix client for room scanning - scannerOnce.Do(func() { - scanner = a.RawMatrixClient() - }) + scannerOnce.set(a.RawMatrixClient()) } - wg.Add(1) - go func() { - defer wg.Done() - agentLogger.Info("agent running") - if err := a.Run(ctx); err != nil { - agentLogger.Error("agent stopped with error", "err", err) - } - }() + registry.register(&runningAgent{ + agent: a, + cfg: cfg, + cfgPath: path, + logger: agentLogger, + logCleanup: agentCleanup, + }) } // ── Startup room scan (after all participants are registered) ── - if orch != nil && scanner != nil { - orch.orchestrator.SetScanner(scanner) + if orch != nil && scannerOnce.client != nil { + orch.orchestrator.SetScanner(scannerOnce.client) scanCtx, scanCancel := context.WithTimeout(ctx, 30*time.Second) orch.orchestrator.ScanExistingRooms(scanCtx) scanCancel() } - wg.Wait() + registry.waitAll() + registry.cleanupLogs() logger.Info("all agents stopped") return nil }, @@ -195,6 +220,17 @@ func main() { } } +// scanOnce captures the first Matrix client for room scanning. +type scanOnce struct { + client *mautrix.Client +} + +func (s *scanOnce) set(c *mautrix.Client) { + if s.client == nil { + s.client = c + } +} + // orchHandle wraps a running orchestrator with its config for the launcher. type orchHandle struct { orchestrator *orchshell.Orchestrator diff --git a/cmd/launcher/registry.go b/cmd/launcher/registry.go new file mode 100644 index 0000000..35b1e83 --- /dev/null +++ b/cmd/launcher/registry.go @@ -0,0 +1,231 @@ +package main + +import ( + "context" + "log/slog" + "os" + "strings" + "sync" + "time" + + "github.com/enmanuel/agents/agents" + "github.com/enmanuel/agents/internal/config" + "github.com/enmanuel/agents/pkg/decision" + "github.com/enmanuel/agents/pkg/orchestration" + "github.com/enmanuel/agents/shell/bus" + agentlog "github.com/enmanuel/agents/shell/logger" +) + +// runningAgent holds a live agent and the metadata needed to recreate it. +type runningAgent struct { + agent *agents.Agent + cfg *config.AgentConfig + cfgPath string + logger *slog.Logger + logCleanup func() +} + +// launchDeps holds shared resources needed to start/reload agents. +type launchDeps struct { + agentBus *bus.Bus + orch *orchHandle + logDir string + logLevel slog.Level + parentCtx context.Context +} + +// agentRegistry tracks all running agents by ID, enabling individual hot-reload. +type agentRegistry struct { + mu sync.Mutex + agents map[string]*runningAgent + deps *launchDeps +} + +func newAgentRegistry(deps *launchDeps) *agentRegistry { + return &agentRegistry{ + agents: make(map[string]*runningAgent), + deps: deps, + } +} + +// register adds a running agent to the registry and starts its goroutine. +func (r *agentRegistry) register(ra *runningAgent) { + r.mu.Lock() + r.agents[ra.cfg.Agent.ID] = ra + r.mu.Unlock() + + go func() { + ra.logger.Info("agent running") + if err := ra.agent.Run(r.deps.parentCtx); err != nil { + ra.logger.Error("agent stopped with error", "err", err) + } + }() +} + +// stopAndWait stops a running agent and waits for it to finish. +// Caller must NOT hold r.mu. +func (r *agentRegistry) stopAndWait(id string) { + r.mu.Lock() + ra, ok := r.agents[id] + r.mu.Unlock() + if !ok { + return + } + + ra.agent.Stop() + select { + case <-ra.agent.Done(): + case <-time.After(10 * time.Second): + ra.logger.Warn("agent did not stop within 10s, forcing", "id", id) + } + + // Unsubscribe from bus so no stale channel remains. + r.deps.agentBus.Unsubscribe(bus.AgentID(id)) +} + +// reload stops an agent, re-reads its config, recreates it, and restarts it. +func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) { + r.mu.Lock() + ra, ok := r.agents[id] + r.mu.Unlock() + if !ok { + slog.Warn("reload: agent not found", "id", id) + return + } + + cfgPath := ra.cfgPath + oldCleanup := ra.logCleanup + + ra.logger.Info("agent_reload_start", "id", id) + + // 1. Stop current instance and wait. + r.stopAndWait(id) + + // 2. Cleanup old log writer. + if oldCleanup != nil { + oldCleanup() + } + + // 3. Re-read config. + cfg, err := config.Load(cfgPath) + if err != nil { + slog.Error("reload: failed to load config", "path", cfgPath, "err", err) + return + } + if !cfg.Agent.Enabled { + slog.Info("reload: agent is disabled, not restarting", "id", id) + r.mu.Lock() + delete(r.agents, id) + r.mu.Unlock() + return + } + + // 4. New per-agent logger. + newLogger, newCleanup, aErr := agentlog.NewAgentLogger(agentlog.LoggerConfig{ + BaseDir: r.deps.logDir, + AgentID: cfg.Agent.ID, + Level: r.deps.logLevel, + }) + if aErr != nil { + newLogger = slog.Default().With("agent", cfg.Agent.ID) + newCleanup = func() {} + } + + // 5. Create new agent (validates config before discarding the old one). + rules := rulesFor(cfg.Agent.ID, newLogger) + newAgent, err := agents.New(cfg, rules, newLogger) + if err != nil { + newLogger.Error("reload: failed to create agent", "id", id, "err", err) + newCleanup() + return + } + + // 6. Wire bus and orchestration. + newAgent.SetBus(r.deps.agentBus) + if r.deps.orch != nil { + newAgent.SetInterceptor(r.deps.orch.orchestrator.Intercept) + newAgent.SetMembershipNotify(r.deps.orch.orchestrator.NotifyMembership) + r.deps.orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{ + ID: cfg.Agent.ID, + MatrixUserID: cfg.Matrix.UserID, + Description: cfg.Agent.Description, + Capabilities: cfg.Agent.Tags, + }) + } + + newRA := &runningAgent{ + agent: newAgent, + cfg: cfg, + cfgPath: cfgPath, + logger: newLogger, + logCleanup: newCleanup, + } + + r.mu.Lock() + r.agents[id] = newRA + r.mu.Unlock() + + // 7. Start new goroutine. + go func() { + newLogger.Info("agent running") + if err := newAgent.Run(r.deps.parentCtx); err != nil { + newLogger.Error("agent stopped with error", "err", err) + } + }() + + newLogger.Info("agent_reloaded", "id", id) +} + +// reloadAll reloads every registered agent sequentially. +func (r *agentRegistry) reloadAll(rulesFor func(string, *slog.Logger) []decision.Rule) { + r.mu.Lock() + ids := make([]string, 0, len(r.agents)) + for id := range r.agents { + ids = append(ids, id) + } + r.mu.Unlock() + + for _, id := range ids { + r.reload(id, rulesFor) + } +} + +// waitAll blocks until all registered agents have stopped. +func (r *agentRegistry) waitAll() { + r.mu.Lock() + dones := make([]<-chan struct{}, 0, len(r.agents)) + for _, ra := range r.agents { + dones = append(dones, ra.agent.Done()) + } + r.mu.Unlock() + + for _, done := range dones { + <-done + } +} + +// cleanupLogs calls every agent's log cleanup function (called on launcher shutdown). +func (r *agentRegistry) cleanupLogs() { + r.mu.Lock() + defer r.mu.Unlock() + for _, ra := range r.agents { + if ra.logCleanup != nil { + ra.logCleanup() + } + } +} + +// readReloadTarget reads the given file and returns the trimmed content. +// Returns "" if the file doesn't exist, is empty, or equals "*" (meaning reload all). +func readReloadTarget(path string) string { + data, err := os.ReadFile(path) + if err != nil { + return "" + } + id := strings.TrimSpace(string(data)) + if id == "*" { + return "" + } + return id +} + diff --git a/cmd/launcher/registry_test.go b/cmd/launcher/registry_test.go new file mode 100644 index 0000000..83a6131 --- /dev/null +++ b/cmd/launcher/registry_test.go @@ -0,0 +1,58 @@ +package main + +import ( + "os" + "path/filepath" + "testing" +) + +func TestReadReloadTarget_missing(t *testing.T) { + got := readReloadTarget(filepath.Join(t.TempDir(), "reload.txt")) + if got != "" { + t.Fatalf("expected empty string for missing file, got %q", got) + } +} + +func TestReadReloadTarget_empty(t *testing.T) { + f := filepath.Join(t.TempDir(), "reload.txt") + if err := os.WriteFile(f, []byte(""), 0o644); err != nil { + t.Fatal(err) + } + got := readReloadTarget(f) + if got != "" { + t.Fatalf("expected empty string for empty file, got %q", got) + } +} + +func TestReadReloadTarget_star(t *testing.T) { + f := filepath.Join(t.TempDir(), "reload.txt") + if err := os.WriteFile(f, []byte("*\n"), 0o644); err != nil { + t.Fatal(err) + } + got := readReloadTarget(f) + if got != "" { + t.Fatalf("expected empty string for '*', got %q", got) + } +} + +func TestReadReloadTarget_agentID(t *testing.T) { + f := filepath.Join(t.TempDir(), "reload.txt") + if err := os.WriteFile(f, []byte("assistant-bot\n"), 0o644); err != nil { + t.Fatal(err) + } + got := readReloadTarget(f) + if got != "assistant-bot" { + t.Fatalf("expected 'assistant-bot', got %q", got) + } +} + +func TestReadReloadTarget_whitespace(t *testing.T) { + f := filepath.Join(t.TempDir(), "reload.txt") + if err := os.WriteFile(f, []byte(" asistente-2 \n"), 0o644); err != nil { + t.Fatal(err) + } + got := readReloadTarget(f) + if got != "asistente-2" { + t.Fatalf("expected 'asistente-2', got %q", got) + } +} diff --git a/dev/issues/README.md b/dev/issues/README.md index 0b13b31..1684d6f 100644 --- a/dev/issues/README.md +++ b/dev/issues/README.md @@ -17,7 +17,7 @@ afectados y notas de implementacion. | 10 | Access control | [0010-access-control.md](completed/0010-access-control.md) | completado | | 11 | Markdown rendering | [0011-markdown-rendering.md](completed/0011-markdown-rendering.md) | completado | | 12 | Threads | [0012-threads.md](completed/0012-threads.md) | completado | -| 13 | Hot reload | [0013-hot-reload.md](0013-hot-reload.md) | pendiente | +| 13 | Hot reload | [0013-hot-reload.md](completed/0013-hot-reload.md) | completado | | 14 | Template agent standardize | [0014-template-agent-standardize.md](0014-template-agent-standardize.md) | pendiente | | 15 | Multi-platform Telegram | [0015-multi-platform-telegram.md](0015-multi-platform-telegram.md) | pendiente | | 16 | Skills system | [0016-skills-system.md](0016-skills-system.md) | pendiente | diff --git a/dev/issues/0013-hot-reload.md b/dev/issues/completed/0013-hot-reload.md similarity index 100% rename from dev/issues/0013-hot-reload.md rename to dev/issues/completed/0013-hot-reload.md diff --git a/shell/bus/bus_test.go b/shell/bus/bus_test.go new file mode 100644 index 0000000..5573f41 --- /dev/null +++ b/shell/bus/bus_test.go @@ -0,0 +1,91 @@ +package bus_test + +import ( + "log/slog" + "testing" + + "github.com/enmanuel/agents/shell/bus" +) + +func newBus() *bus.Bus { + return bus.New(slog.Default()) +} + +func TestSubscribeAndSend(t *testing.T) { + b := newBus() + ch := b.Subscribe("agent-a") + + msg := bus.AgentMessage{From: "orch", To: "agent-a", Kind: bus.KindTask, Payload: map[string]string{"k": "v"}} + if err := b.Send(msg); err != nil { + t.Fatalf("Send: %v", err) + } + + got := <-ch + if got.Kind != bus.KindTask || got.Payload["k"] != "v" { + t.Fatalf("unexpected message: %+v", got) + } +} + +func TestUnsubscribeClosesChannel(t *testing.T) { + b := newBus() + ch := b.Subscribe("agent-b") + + b.Unsubscribe("agent-b") + + // Channel must be closed — reading from a closed channel returns zero value + ok=false. + _, ok := <-ch + if ok { + t.Fatal("expected channel to be closed after Unsubscribe") + } +} + +func TestUnsubscribeRemovesFromBus(t *testing.T) { + b := newBus() + b.Subscribe("agent-c") + b.Unsubscribe("agent-c") + + // Sending after unsubscribe must return an error, not panic. + err := b.Send(bus.AgentMessage{To: "agent-c", Kind: "ping"}) + if err == nil { + t.Fatal("expected error when sending to unsubscribed agent") + } +} + +func TestUnsubscribeIdempotent(t *testing.T) { + b := newBus() + b.Subscribe("agent-d") + // Double unsubscribe must not panic. + b.Unsubscribe("agent-d") + b.Unsubscribe("agent-d") +} + +func TestUnsubscribeNonExistent(t *testing.T) { + b := newBus() + // Unsubscribing an ID that was never subscribed must not panic. + b.Unsubscribe("does-not-exist") +} + +func TestSendToUnknownAgent(t *testing.T) { + b := newBus() + err := b.Send(bus.AgentMessage{To: "ghost", Kind: "hello"}) + if err == nil { + t.Fatal("expected error when sending to unknown agent") + } +} + +func TestResubscribeAfterUnsubscribe(t *testing.T) { + b := newBus() + b.Subscribe("agent-e") + b.Unsubscribe("agent-e") + + // Re-subscribe must work and deliver messages. + ch2 := b.Subscribe("agent-e") + msg := bus.AgentMessage{To: "agent-e", Kind: "ping"} + if err := b.Send(msg); err != nil { + t.Fatalf("Send after re-subscribe: %v", err) + } + got := <-ch2 + if got.Kind != "ping" { + t.Fatalf("unexpected kind: %q", got.Kind) + } +} diff --git a/shell/tui/adapter.go b/shell/tui/adapter.go index fd7248b..72264e3 100644 --- a/shell/tui/adapter.go +++ b/shell/tui/adapter.go @@ -4,8 +4,10 @@ package tui import ( "fmt" + "os" "os/exec" "strings" + "syscall" "time" tea "github.com/charmbracelet/bubbletea" @@ -140,13 +142,28 @@ func (a *Adapter) disableAgent(id string) tea.Cmd { func (a *Adapter) restartAgent(id string) tea.Cmd { return func() tea.Msg { - _ = a.mgr.StopUnified() - time.Sleep(500 * time.Millisecond) - err := a.mgr.StartUnified() - if err == nil { + pid := a.mgr.UnifiedPID() + if pid <= 0 { + // Launcher not running — fall back to full restart. + _ = a.mgr.StopUnified() time.Sleep(500 * time.Millisecond) + err := a.mgr.StartUnified() + if err == nil { + time.Sleep(500 * time.Millisecond) + } + return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err} } - return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err} + + // Launcher is running — write target and send SIGHUP for hot-reload. + if id != "" { + _ = os.WriteFile("run/reload.txt", []byte(id), 0o644) + } + err := syscall.Kill(pid, syscall.SIGHUP) + if err != nil { + return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err} + } + time.Sleep(1 * time.Second) + return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: nil} } }