merge: issue/0013-hot-reload — hot-reload de agentes individuales via SIGHUP
Implementa recarga en caliente de un agente individual sin detener el launcher: - Agent.Stop() + Agent.Done() para ciclo de vida individual del agente - agentRegistry en el launcher para rastrear agentes vivos y recargarlos - Handler de SIGHUP: lee run/reload.txt para determinar agente objetivo - TUI: restartAgent() usa SIGHUP en lugar de kill+restart del launcher - agentctl reload [agent-id]: nuevo subcomando de hot-reload - Tests: bus.Unsubscribe, readReloadTarget, Agent.Stop/Done Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,64 @@
|
|||||||
|
package agents
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestAgentStopAndDone verifies that Stop() cancels Run and Done() closes.
|
||||||
|
// Uses a minimal Agent (no Matrix, no LLM) via direct struct init so the test
|
||||||
|
// doesn't require network or external dependencies.
|
||||||
|
func TestAgentStopAndDone(t *testing.T) {
|
||||||
|
a := &Agent{
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simulate Run: create the cancel, then immediately block on ctx.
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
a.cancel = cancel
|
||||||
|
|
||||||
|
started := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
close(started)
|
||||||
|
// Mimic what Run does: block on ctx, then close done.
|
||||||
|
<-ctx.Done()
|
||||||
|
close(a.done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-started
|
||||||
|
|
||||||
|
// Stop must unblock the goroutine above.
|
||||||
|
a.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-a.Done():
|
||||||
|
// ok
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("Done() did not close within 2s after Stop()")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAgentStopIdempotent verifies that calling Stop() multiple times is safe.
|
||||||
|
func TestAgentStopIdempotent(t *testing.T) {
|
||||||
|
a := &Agent{
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
_, cancel := context.WithCancel(context.Background())
|
||||||
|
a.cancel = cancel
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Should not panic when called multiple times.
|
||||||
|
a.Stop()
|
||||||
|
a.Stop()
|
||||||
|
a.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAgentStopNilCancel verifies Stop() is safe when cancel is nil.
|
||||||
|
func TestAgentStopNilCancel(t *testing.T) {
|
||||||
|
a := &Agent{
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
// cancel is nil — must not panic.
|
||||||
|
a.Stop()
|
||||||
|
}
|
||||||
@@ -62,6 +62,10 @@ type Agent struct {
|
|||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
|
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
|
||||||
|
|
||||||
|
// Lifecycle — cancel stops this agent individually; done is closed when Run returns.
|
||||||
|
cancel context.CancelFunc
|
||||||
|
done chan struct{}
|
||||||
|
|
||||||
// Access control
|
// Access control
|
||||||
acl acl.ACL
|
acl acl.ACL
|
||||||
|
|
||||||
@@ -268,6 +272,7 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*
|
|||||||
toolReg: toolReg,
|
toolReg: toolReg,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
cryptoStore: cryptoStore,
|
cryptoStore: cryptoStore,
|
||||||
|
done: make(chan struct{}),
|
||||||
commands: make(map[string]CommandHandler),
|
commands: make(map[string]CommandHandler),
|
||||||
cmdAliases: command.BuiltinNames(),
|
cmdAliases: command.BuiltinNames(),
|
||||||
startTime: time.Now(),
|
startTime: time.Now(),
|
||||||
@@ -363,8 +368,24 @@ func (a *Agent) RawMatrixClient() *mautrix.Client {
|
|||||||
return a.matrix.Raw()
|
return a.matrix.Raw()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop cancels this agent's individual context, causing Run to return.
|
||||||
|
// Safe to call multiple times.
|
||||||
|
func (a *Agent) Stop() {
|
||||||
|
if a.cancel != nil {
|
||||||
|
a.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done returns a channel that is closed when Run has returned.
|
||||||
|
func (a *Agent) Done() <-chan struct{} {
|
||||||
|
return a.done
|
||||||
|
}
|
||||||
|
|
||||||
// Run starts the agent sync loop. Blocks until ctx is cancelled.
|
// Run starts the agent sync loop. Blocks until ctx is cancelled.
|
||||||
func (a *Agent) Run(ctx context.Context) error {
|
func (a *Agent) Run(ctx context.Context) error {
|
||||||
|
ctx, a.cancel = context.WithCancel(ctx)
|
||||||
|
defer close(a.done)
|
||||||
|
|
||||||
if a.cryptoStore != nil {
|
if a.cryptoStore != nil {
|
||||||
defer a.cryptoStore.Close()
|
defer a.cryptoStore.Close()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
@@ -47,6 +48,7 @@ func main() {
|
|||||||
listCmd(mgr),
|
listCmd(mgr),
|
||||||
startCmd(mgr, &binPath),
|
startCmd(mgr, &binPath),
|
||||||
stopCmd(mgr),
|
stopCmd(mgr),
|
||||||
|
reloadCmd(mgr),
|
||||||
removeCmd(mgr),
|
removeCmd(mgr),
|
||||||
avatarCmd(),
|
avatarCmd(),
|
||||||
displaynameCmd(),
|
displaynameCmd(),
|
||||||
@@ -171,6 +173,48 @@ func stopCmd(mgr *process.Manager) *cobra.Command {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── reload ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
func reloadCmd(mgr *process.Manager) *cobra.Command {
|
||||||
|
return &cobra.Command{
|
||||||
|
Use: "reload [agent-id]",
|
||||||
|
Short: "Hot-reload an agent (or all agents) without stopping the launcher",
|
||||||
|
Long: `Sends SIGHUP to the running launcher, which triggers a hot-reload.
|
||||||
|
If an agent-id is given, only that agent is reloaded.
|
||||||
|
If no agent-id is given, all agents are reloaded.
|
||||||
|
|
||||||
|
The launcher must be running. Use 'agentctl start' first if needed.`,
|
||||||
|
Args: cobra.MaximumNArgs(1),
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
pid := mgr.UnifiedPID()
|
||||||
|
if pid <= 0 {
|
||||||
|
return fmt.Errorf("launcher is not running")
|
||||||
|
}
|
||||||
|
|
||||||
|
target := ""
|
||||||
|
if len(args) == 1 {
|
||||||
|
target = args[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
if target != "" {
|
||||||
|
if err := os.WriteFile("run/reload.txt", []byte(target), 0o644); err != nil {
|
||||||
|
return fmt.Errorf("write reload target: %w", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("reload %-20s sending SIGHUP to PID %d\n", target, pid)
|
||||||
|
} else {
|
||||||
|
// Remove any stale reload.txt so SIGHUP reloads all agents.
|
||||||
|
_ = os.Remove("run/reload.txt")
|
||||||
|
fmt.Printf("reload %-20s sending SIGHUP to PID %d\n", "(all)", pid)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := syscall.Kill(pid, syscall.SIGHUP); err != nil {
|
||||||
|
return fmt.Errorf("kill -HUP %d: %w", pid, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── remove ────────────────────────────────────────────────────────────────
|
// ── remove ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
func removeCmd(mgr *process.Manager) *cobra.Command {
|
func removeCmd(mgr *process.Manager) *cobra.Command {
|
||||||
|
|||||||
+64
-28
@@ -12,7 +12,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -72,13 +71,7 @@ func main() {
|
|||||||
logger.Warn("could not create file logger, falling back to stdout", "err", err)
|
logger.Warn("could not create file logger, falling back to stdout", "err", err)
|
||||||
launcherCleanup = func() {}
|
launcherCleanup = func() {}
|
||||||
}
|
}
|
||||||
var cleanups []func()
|
defer launcherCleanup()
|
||||||
cleanups = append(cleanups, launcherCleanup)
|
|
||||||
defer func() {
|
|
||||||
for _, fn := range cleanups {
|
|
||||||
fn()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if len(configPaths) == 0 {
|
if len(configPaths) == 0 {
|
||||||
logger.Warn("no agent configs found — nothing to start")
|
logger.Warn("no agent configs found — nothing to start")
|
||||||
@@ -96,15 +89,49 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// Non-fatal: orchestration is optional
|
// Non-fatal: orchestration is optional
|
||||||
logger.Warn("orchestrator not started", "err", err)
|
logger.Warn("orchestrator not started", "err", err)
|
||||||
} else {
|
} else if orch != nil {
|
||||||
logger.Info("orchestrator initialized")
|
logger.Info("orchestrator initialized")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Start normal agents ──
|
// ── Shared dependencies for agent registry ──
|
||||||
var wg sync.WaitGroup
|
deps := &launchDeps{
|
||||||
var scannerOnce sync.Once
|
agentBus: agentBus,
|
||||||
var scanner *mautrix.Client
|
orch: orch,
|
||||||
|
logDir: logDir,
|
||||||
|
logLevel: lvl,
|
||||||
|
parentCtx: ctx,
|
||||||
|
}
|
||||||
|
registry := newAgentRegistry(deps)
|
||||||
|
|
||||||
|
// ── SIGHUP: hot-reload individual agent or all agents ──
|
||||||
|
sighup := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sighup, syscall.SIGHUP)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case _, ok := <-sighup:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
id := readReloadTarget("run/reload.txt")
|
||||||
|
// Remove the target file after reading so it doesn't
|
||||||
|
// affect the next SIGHUP.
|
||||||
|
_ = os.Remove("run/reload.txt")
|
||||||
|
if id == "" {
|
||||||
|
logger.Info("sighup: reloading all agents")
|
||||||
|
registry.reloadAll(rulesFor)
|
||||||
|
} else {
|
||||||
|
logger.Info("sighup: reloading agent", "id", id)
|
||||||
|
registry.reload(id, rulesFor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// ── Start normal agents ──
|
||||||
|
var scannerOnce scanOnce
|
||||||
for _, path := range configPaths {
|
for _, path := range configPaths {
|
||||||
path := path
|
path := path
|
||||||
cfg, err := config.Load(path)
|
cfg, err := config.Load(path)
|
||||||
@@ -130,11 +157,11 @@ func main() {
|
|||||||
agentLogger = logger.With("agent", cfg.Agent.ID)
|
agentLogger = logger.With("agent", cfg.Agent.ID)
|
||||||
agentCleanup = func() {}
|
agentCleanup = func() {}
|
||||||
}
|
}
|
||||||
cleanups = append(cleanups, agentCleanup)
|
|
||||||
|
|
||||||
a, err := agents.New(cfg, rules, agentLogger)
|
a, err := agents.New(cfg, rules, agentLogger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("failed to create agent", "id", cfg.Agent.ID, "err", err)
|
logger.Error("failed to create agent", "id", cfg.Agent.ID, "err", err)
|
||||||
|
agentCleanup()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,30 +181,28 @@ func main() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Grab the first available Matrix client for room scanning
|
// Grab the first available Matrix client for room scanning
|
||||||
scannerOnce.Do(func() {
|
scannerOnce.set(a.RawMatrixClient())
|
||||||
scanner = a.RawMatrixClient()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
registry.register(&runningAgent{
|
||||||
go func() {
|
agent: a,
|
||||||
defer wg.Done()
|
cfg: cfg,
|
||||||
agentLogger.Info("agent running")
|
cfgPath: path,
|
||||||
if err := a.Run(ctx); err != nil {
|
logger: agentLogger,
|
||||||
agentLogger.Error("agent stopped with error", "err", err)
|
logCleanup: agentCleanup,
|
||||||
}
|
})
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Startup room scan (after all participants are registered) ──
|
// ── Startup room scan (after all participants are registered) ──
|
||||||
if orch != nil && scanner != nil {
|
if orch != nil && scannerOnce.client != nil {
|
||||||
orch.orchestrator.SetScanner(scanner)
|
orch.orchestrator.SetScanner(scannerOnce.client)
|
||||||
scanCtx, scanCancel := context.WithTimeout(ctx, 30*time.Second)
|
scanCtx, scanCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||||
orch.orchestrator.ScanExistingRooms(scanCtx)
|
orch.orchestrator.ScanExistingRooms(scanCtx)
|
||||||
scanCancel()
|
scanCancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
registry.waitAll()
|
||||||
|
registry.cleanupLogs()
|
||||||
logger.Info("all agents stopped")
|
logger.Info("all agents stopped")
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
@@ -195,6 +220,17 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// scanOnce captures the first Matrix client for room scanning.
|
||||||
|
type scanOnce struct {
|
||||||
|
client *mautrix.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *scanOnce) set(c *mautrix.Client) {
|
||||||
|
if s.client == nil {
|
||||||
|
s.client = c
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// orchHandle wraps a running orchestrator with its config for the launcher.
|
// orchHandle wraps a running orchestrator with its config for the launcher.
|
||||||
type orchHandle struct {
|
type orchHandle struct {
|
||||||
orchestrator *orchshell.Orchestrator
|
orchestrator *orchshell.Orchestrator
|
||||||
|
|||||||
@@ -0,0 +1,231 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/enmanuel/agents/agents"
|
||||||
|
"github.com/enmanuel/agents/internal/config"
|
||||||
|
"github.com/enmanuel/agents/pkg/decision"
|
||||||
|
"github.com/enmanuel/agents/pkg/orchestration"
|
||||||
|
"github.com/enmanuel/agents/shell/bus"
|
||||||
|
agentlog "github.com/enmanuel/agents/shell/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
// runningAgent holds a live agent and the metadata needed to recreate it.
|
||||||
|
type runningAgent struct {
|
||||||
|
agent *agents.Agent
|
||||||
|
cfg *config.AgentConfig
|
||||||
|
cfgPath string
|
||||||
|
logger *slog.Logger
|
||||||
|
logCleanup func()
|
||||||
|
}
|
||||||
|
|
||||||
|
// launchDeps holds shared resources needed to start/reload agents.
|
||||||
|
type launchDeps struct {
|
||||||
|
agentBus *bus.Bus
|
||||||
|
orch *orchHandle
|
||||||
|
logDir string
|
||||||
|
logLevel slog.Level
|
||||||
|
parentCtx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
// agentRegistry tracks all running agents by ID, enabling individual hot-reload.
|
||||||
|
type agentRegistry struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
agents map[string]*runningAgent
|
||||||
|
deps *launchDeps
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAgentRegistry(deps *launchDeps) *agentRegistry {
|
||||||
|
return &agentRegistry{
|
||||||
|
agents: make(map[string]*runningAgent),
|
||||||
|
deps: deps,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// register adds a running agent to the registry and starts its goroutine.
|
||||||
|
func (r *agentRegistry) register(ra *runningAgent) {
|
||||||
|
r.mu.Lock()
|
||||||
|
r.agents[ra.cfg.Agent.ID] = ra
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ra.logger.Info("agent running")
|
||||||
|
if err := ra.agent.Run(r.deps.parentCtx); err != nil {
|
||||||
|
ra.logger.Error("agent stopped with error", "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// stopAndWait stops a running agent and waits for it to finish.
|
||||||
|
// Caller must NOT hold r.mu.
|
||||||
|
func (r *agentRegistry) stopAndWait(id string) {
|
||||||
|
r.mu.Lock()
|
||||||
|
ra, ok := r.agents[id]
|
||||||
|
r.mu.Unlock()
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ra.agent.Stop()
|
||||||
|
select {
|
||||||
|
case <-ra.agent.Done():
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
ra.logger.Warn("agent did not stop within 10s, forcing", "id", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe from bus so no stale channel remains.
|
||||||
|
r.deps.agentBus.Unsubscribe(bus.AgentID(id))
|
||||||
|
}
|
||||||
|
|
||||||
|
// reload stops an agent, re-reads its config, recreates it, and restarts it.
|
||||||
|
func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) {
|
||||||
|
r.mu.Lock()
|
||||||
|
ra, ok := r.agents[id]
|
||||||
|
r.mu.Unlock()
|
||||||
|
if !ok {
|
||||||
|
slog.Warn("reload: agent not found", "id", id)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cfgPath := ra.cfgPath
|
||||||
|
oldCleanup := ra.logCleanup
|
||||||
|
|
||||||
|
ra.logger.Info("agent_reload_start", "id", id)
|
||||||
|
|
||||||
|
// 1. Stop current instance and wait.
|
||||||
|
r.stopAndWait(id)
|
||||||
|
|
||||||
|
// 2. Cleanup old log writer.
|
||||||
|
if oldCleanup != nil {
|
||||||
|
oldCleanup()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Re-read config.
|
||||||
|
cfg, err := config.Load(cfgPath)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("reload: failed to load config", "path", cfgPath, "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !cfg.Agent.Enabled {
|
||||||
|
slog.Info("reload: agent is disabled, not restarting", "id", id)
|
||||||
|
r.mu.Lock()
|
||||||
|
delete(r.agents, id)
|
||||||
|
r.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. New per-agent logger.
|
||||||
|
newLogger, newCleanup, aErr := agentlog.NewAgentLogger(agentlog.LoggerConfig{
|
||||||
|
BaseDir: r.deps.logDir,
|
||||||
|
AgentID: cfg.Agent.ID,
|
||||||
|
Level: r.deps.logLevel,
|
||||||
|
})
|
||||||
|
if aErr != nil {
|
||||||
|
newLogger = slog.Default().With("agent", cfg.Agent.ID)
|
||||||
|
newCleanup = func() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Create new agent (validates config before discarding the old one).
|
||||||
|
rules := rulesFor(cfg.Agent.ID, newLogger)
|
||||||
|
newAgent, err := agents.New(cfg, rules, newLogger)
|
||||||
|
if err != nil {
|
||||||
|
newLogger.Error("reload: failed to create agent", "id", id, "err", err)
|
||||||
|
newCleanup()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6. Wire bus and orchestration.
|
||||||
|
newAgent.SetBus(r.deps.agentBus)
|
||||||
|
if r.deps.orch != nil {
|
||||||
|
newAgent.SetInterceptor(r.deps.orch.orchestrator.Intercept)
|
||||||
|
newAgent.SetMembershipNotify(r.deps.orch.orchestrator.NotifyMembership)
|
||||||
|
r.deps.orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{
|
||||||
|
ID: cfg.Agent.ID,
|
||||||
|
MatrixUserID: cfg.Matrix.UserID,
|
||||||
|
Description: cfg.Agent.Description,
|
||||||
|
Capabilities: cfg.Agent.Tags,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
newRA := &runningAgent{
|
||||||
|
agent: newAgent,
|
||||||
|
cfg: cfg,
|
||||||
|
cfgPath: cfgPath,
|
||||||
|
logger: newLogger,
|
||||||
|
logCleanup: newCleanup,
|
||||||
|
}
|
||||||
|
|
||||||
|
r.mu.Lock()
|
||||||
|
r.agents[id] = newRA
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
// 7. Start new goroutine.
|
||||||
|
go func() {
|
||||||
|
newLogger.Info("agent running")
|
||||||
|
if err := newAgent.Run(r.deps.parentCtx); err != nil {
|
||||||
|
newLogger.Error("agent stopped with error", "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
newLogger.Info("agent_reloaded", "id", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reloadAll reloads every registered agent sequentially.
|
||||||
|
func (r *agentRegistry) reloadAll(rulesFor func(string, *slog.Logger) []decision.Rule) {
|
||||||
|
r.mu.Lock()
|
||||||
|
ids := make([]string, 0, len(r.agents))
|
||||||
|
for id := range r.agents {
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
for _, id := range ids {
|
||||||
|
r.reload(id, rulesFor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitAll blocks until all registered agents have stopped.
|
||||||
|
func (r *agentRegistry) waitAll() {
|
||||||
|
r.mu.Lock()
|
||||||
|
dones := make([]<-chan struct{}, 0, len(r.agents))
|
||||||
|
for _, ra := range r.agents {
|
||||||
|
dones = append(dones, ra.agent.Done())
|
||||||
|
}
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
for _, done := range dones {
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupLogs calls every agent's log cleanup function (called on launcher shutdown).
|
||||||
|
func (r *agentRegistry) cleanupLogs() {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
for _, ra := range r.agents {
|
||||||
|
if ra.logCleanup != nil {
|
||||||
|
ra.logCleanup()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// readReloadTarget reads the given file and returns the trimmed content.
|
||||||
|
// Returns "" if the file doesn't exist, is empty, or equals "*" (meaning reload all).
|
||||||
|
func readReloadTarget(path string) string {
|
||||||
|
data, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
id := strings.TrimSpace(string(data))
|
||||||
|
if id == "*" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReadReloadTarget_missing(t *testing.T) {
|
||||||
|
got := readReloadTarget(filepath.Join(t.TempDir(), "reload.txt"))
|
||||||
|
if got != "" {
|
||||||
|
t.Fatalf("expected empty string for missing file, got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadReloadTarget_empty(t *testing.T) {
|
||||||
|
f := filepath.Join(t.TempDir(), "reload.txt")
|
||||||
|
if err := os.WriteFile(f, []byte(""), 0o644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
got := readReloadTarget(f)
|
||||||
|
if got != "" {
|
||||||
|
t.Fatalf("expected empty string for empty file, got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadReloadTarget_star(t *testing.T) {
|
||||||
|
f := filepath.Join(t.TempDir(), "reload.txt")
|
||||||
|
if err := os.WriteFile(f, []byte("*\n"), 0o644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
got := readReloadTarget(f)
|
||||||
|
if got != "" {
|
||||||
|
t.Fatalf("expected empty string for '*', got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadReloadTarget_agentID(t *testing.T) {
|
||||||
|
f := filepath.Join(t.TempDir(), "reload.txt")
|
||||||
|
if err := os.WriteFile(f, []byte("assistant-bot\n"), 0o644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
got := readReloadTarget(f)
|
||||||
|
if got != "assistant-bot" {
|
||||||
|
t.Fatalf("expected 'assistant-bot', got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadReloadTarget_whitespace(t *testing.T) {
|
||||||
|
f := filepath.Join(t.TempDir(), "reload.txt")
|
||||||
|
if err := os.WriteFile(f, []byte(" asistente-2 \n"), 0o644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
got := readReloadTarget(f)
|
||||||
|
if got != "asistente-2" {
|
||||||
|
t.Fatalf("expected 'asistente-2', got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -17,7 +17,7 @@ afectados y notas de implementacion.
|
|||||||
| 10 | Access control | [0010-access-control.md](completed/0010-access-control.md) | completado |
|
| 10 | Access control | [0010-access-control.md](completed/0010-access-control.md) | completado |
|
||||||
| 11 | Markdown rendering | [0011-markdown-rendering.md](completed/0011-markdown-rendering.md) | completado |
|
| 11 | Markdown rendering | [0011-markdown-rendering.md](completed/0011-markdown-rendering.md) | completado |
|
||||||
| 12 | Threads | [0012-threads.md](completed/0012-threads.md) | completado |
|
| 12 | Threads | [0012-threads.md](completed/0012-threads.md) | completado |
|
||||||
| 13 | Hot reload | [0013-hot-reload.md](0013-hot-reload.md) | pendiente |
|
| 13 | Hot reload | [0013-hot-reload.md](completed/0013-hot-reload.md) | completado |
|
||||||
| 14 | Template agent standardize | [0014-template-agent-standardize.md](0014-template-agent-standardize.md) | pendiente |
|
| 14 | Template agent standardize | [0014-template-agent-standardize.md](0014-template-agent-standardize.md) | pendiente |
|
||||||
| 15 | Multi-platform Telegram | [0015-multi-platform-telegram.md](0015-multi-platform-telegram.md) | pendiente |
|
| 15 | Multi-platform Telegram | [0015-multi-platform-telegram.md](0015-multi-platform-telegram.md) | pendiente |
|
||||||
| 16 | Skills system | [0016-skills-system.md](0016-skills-system.md) | pendiente |
|
| 16 | Skills system | [0016-skills-system.md](0016-skills-system.md) | pendiente |
|
||||||
|
|||||||
@@ -0,0 +1,91 @@
|
|||||||
|
package bus_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/enmanuel/agents/shell/bus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newBus() *bus.Bus {
|
||||||
|
return bus.New(slog.Default())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscribeAndSend(t *testing.T) {
|
||||||
|
b := newBus()
|
||||||
|
ch := b.Subscribe("agent-a")
|
||||||
|
|
||||||
|
msg := bus.AgentMessage{From: "orch", To: "agent-a", Kind: bus.KindTask, Payload: map[string]string{"k": "v"}}
|
||||||
|
if err := b.Send(msg); err != nil {
|
||||||
|
t.Fatalf("Send: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got := <-ch
|
||||||
|
if got.Kind != bus.KindTask || got.Payload["k"] != "v" {
|
||||||
|
t.Fatalf("unexpected message: %+v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnsubscribeClosesChannel(t *testing.T) {
|
||||||
|
b := newBus()
|
||||||
|
ch := b.Subscribe("agent-b")
|
||||||
|
|
||||||
|
b.Unsubscribe("agent-b")
|
||||||
|
|
||||||
|
// Channel must be closed — reading from a closed channel returns zero value + ok=false.
|
||||||
|
_, ok := <-ch
|
||||||
|
if ok {
|
||||||
|
t.Fatal("expected channel to be closed after Unsubscribe")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnsubscribeRemovesFromBus(t *testing.T) {
|
||||||
|
b := newBus()
|
||||||
|
b.Subscribe("agent-c")
|
||||||
|
b.Unsubscribe("agent-c")
|
||||||
|
|
||||||
|
// Sending after unsubscribe must return an error, not panic.
|
||||||
|
err := b.Send(bus.AgentMessage{To: "agent-c", Kind: "ping"})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error when sending to unsubscribed agent")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnsubscribeIdempotent(t *testing.T) {
|
||||||
|
b := newBus()
|
||||||
|
b.Subscribe("agent-d")
|
||||||
|
// Double unsubscribe must not panic.
|
||||||
|
b.Unsubscribe("agent-d")
|
||||||
|
b.Unsubscribe("agent-d")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnsubscribeNonExistent(t *testing.T) {
|
||||||
|
b := newBus()
|
||||||
|
// Unsubscribing an ID that was never subscribed must not panic.
|
||||||
|
b.Unsubscribe("does-not-exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendToUnknownAgent(t *testing.T) {
|
||||||
|
b := newBus()
|
||||||
|
err := b.Send(bus.AgentMessage{To: "ghost", Kind: "hello"})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error when sending to unknown agent")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResubscribeAfterUnsubscribe(t *testing.T) {
|
||||||
|
b := newBus()
|
||||||
|
b.Subscribe("agent-e")
|
||||||
|
b.Unsubscribe("agent-e")
|
||||||
|
|
||||||
|
// Re-subscribe must work and deliver messages.
|
||||||
|
ch2 := b.Subscribe("agent-e")
|
||||||
|
msg := bus.AgentMessage{To: "agent-e", Kind: "ping"}
|
||||||
|
if err := b.Send(msg); err != nil {
|
||||||
|
t.Fatalf("Send after re-subscribe: %v", err)
|
||||||
|
}
|
||||||
|
got := <-ch2
|
||||||
|
if got.Kind != "ping" {
|
||||||
|
t.Fatalf("unexpected kind: %q", got.Kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
+22
-5
@@ -4,8 +4,10 @@ package tui
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
"strings"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
tea "github.com/charmbracelet/bubbletea"
|
tea "github.com/charmbracelet/bubbletea"
|
||||||
@@ -140,13 +142,28 @@ func (a *Adapter) disableAgent(id string) tea.Cmd {
|
|||||||
|
|
||||||
func (a *Adapter) restartAgent(id string) tea.Cmd {
|
func (a *Adapter) restartAgent(id string) tea.Cmd {
|
||||||
return func() tea.Msg {
|
return func() tea.Msg {
|
||||||
_ = a.mgr.StopUnified()
|
pid := a.mgr.UnifiedPID()
|
||||||
time.Sleep(500 * time.Millisecond)
|
if pid <= 0 {
|
||||||
err := a.mgr.StartUnified()
|
// Launcher not running — fall back to full restart.
|
||||||
if err == nil {
|
_ = a.mgr.StopUnified()
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
err := a.mgr.StartUnified()
|
||||||
|
if err == nil {
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
}
|
||||||
|
return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err}
|
||||||
}
|
}
|
||||||
return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err}
|
|
||||||
|
// Launcher is running — write target and send SIGHUP for hot-reload.
|
||||||
|
if id != "" {
|
||||||
|
_ = os.WriteFile("run/reload.txt", []byte(id), 0o644)
|
||||||
|
}
|
||||||
|
err := syscall.Kill(pid, syscall.SIGHUP)
|
||||||
|
if err != nil {
|
||||||
|
return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: err}
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
return puretui.MsgActionDone{AgentID: id, Action: "Restart", Err: nil}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user