diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index 3e5a3e6..f86e68b 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -268,9 +268,14 @@ func main() { scanCancel() } - registry.waitAll() + // Supervised loop: wait for all agents, and if the parent context is + // still alive (i.e. no SIGINT/SIGTERM received), reload them and keep + // going. Protects against the launcher exiting cleanly when all + // agent runners terminate naturally (token rotation, sync drop, etc.) + // while the supervisor itself is healthy. + registry.superviseUntilCanceled(ctx, 5*time.Second, rulesFor, logger) registry.cleanupLogs() - logger.Info("all agents stopped") + logger.Info("launcher shutting down") return nil }, } diff --git a/cmd/launcher/registry.go b/cmd/launcher/registry.go index dfbb3a5..966bf3d 100644 --- a/cmd/launcher/registry.go +++ b/cmd/launcher/registry.go @@ -230,6 +230,35 @@ func (r *agentRegistry) waitAll() { } } +// superviseUntilCanceled blocks until ctx is canceled, restarting agents +// (via reloadAll) every time waitAll returns while the parent ctx is alive. +// Each restart waits restartBackoff before recreating runners. Used by the +// launcher main loop so the process keeps the agents up across token rotation +// or sync drops without exiting cleanly to systemd. +func (r *agentRegistry) superviseUntilCanceled( + ctx context.Context, + restartBackoff time.Duration, + rulesFor func(string, *slog.Logger) []decision.Rule, + logger *slog.Logger, +) { + for { + r.waitAll() + if ctx.Err() != nil { + return + } + if logger != nil { + logger.Warn("all agents stopped while launcher active — restarting after backoff", + "backoff", restartBackoff.String()) + } + select { + case <-ctx.Done(): + return + case <-time.After(restartBackoff): + } + r.reloadAll(rulesFor) + } +} + // cleanupLogs calls every agent's log cleanup function (called on launcher shutdown). func (r *agentRegistry) cleanupLogs() { r.mu.Lock() diff --git a/cmd/launcher/registry_test.go b/cmd/launcher/registry_test.go index e3b4206..3c98406 100644 --- a/cmd/launcher/registry_test.go +++ b/cmd/launcher/registry_test.go @@ -1,9 +1,21 @@ package main import ( + "context" + "io" + "log/slog" "os" "path/filepath" + "sync" + "sync/atomic" "testing" + "time" + + "github.com/enmanuel/agents/devagents" + "github.com/enmanuel/agents/internal/config" + "github.com/enmanuel/agents/pkg/command" + "github.com/enmanuel/agents/pkg/decision" + "github.com/enmanuel/agents/shell/bus" ) func TestReadReloadTarget_missing(t *testing.T) { @@ -112,3 +124,142 @@ func TestIsSpecialConfig_emptyLoadedMap(t *testing.T) { t.Fatal("expected false when empty specials map") } } + +// ── superviseUntilCanceled tests ────────────────────────────────────────── + +// fakeRunner implements devagents.Runner for tests. Each Run() call increments +// runs, then closes done after a tiny delay so waitAll observes a "stopped" +// runner. RestartFactory replaces done on each cycle so the supervisor can +// recreate the runner via reload. +type fakeRunner struct { + mu sync.Mutex + done chan struct{} + runs int32 +} + +func newFakeRunner() *fakeRunner { + return &fakeRunner{done: make(chan struct{})} +} + +func (f *fakeRunner) Run(_ context.Context) error { + atomic.AddInt32(&f.runs, 1) + // Close immediately so the supervisor sees the runner as stopped. + go func() { + f.mu.Lock() + ch := f.done + f.mu.Unlock() + // Wait briefly so waitAll has time to subscribe. + time.Sleep(5 * time.Millisecond) + close(ch) + }() + return nil +} + +func (f *fakeRunner) Stop() {} + +func (f *fakeRunner) RegisterCommand(_ command.Spec, _ devagents.CommandHandler) {} + +func (f *fakeRunner) Done() <-chan struct{} { + f.mu.Lock() + defer f.mu.Unlock() + return f.done +} + +func (f *fakeRunner) reset() { + f.mu.Lock() + f.done = make(chan struct{}) + f.mu.Unlock() +} + +// silentLogger drops everything; keeps tests quiet. +func silentLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func TestSuperviseUntilCanceled_ReturnsWhenCtxCanceledFirst(t *testing.T) { + r := newAgentRegistry(&launchDeps{agentBus: bus.New(silentLogger())}) + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel before supervise starts + done := make(chan struct{}) + go func() { + r.superviseUntilCanceled(ctx, 50*time.Millisecond, nil, silentLogger()) + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("superviseUntilCanceled did not return after ctx canceled (empty registry)") + } +} + +func TestSuperviseUntilCanceled_ReturnsAfterCtxCancelDuringBackoff(t *testing.T) { + // Registry has one fake runner that is already "done". waitAll returns + // immediately, supervise enters the backoff select, ctx cancel during + // backoff must unblock it. + fr := newFakeRunner() + close(fr.done) // already done + + r := newAgentRegistry(&launchDeps{agentBus: bus.New(silentLogger())}) + r.agents["fake"] = &runningAgent{ + runner: fr, + cfg: &config.AgentConfig{Agent: config.AgentMeta{ID: "fake"}}, + cfgPath: "", + logger: silentLogger(), + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + // Long backoff — only ctx cancel should unblock. + r.superviseUntilCanceled(ctx, 10*time.Second, nil, silentLogger()) + close(done) + }() + // Give supervise a moment to enter the backoff select. + time.Sleep(20 * time.Millisecond) + cancel() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("superviseUntilCanceled did not return after ctx cancel during backoff") + } +} + +func TestSuperviseUntilCanceled_CallsReloadOnAgentExit(t *testing.T) { + // Track reloadAll invocations via a custom rulesFor function. reload reads + // from disk via cfgPath; with cfgPath="" config.Load fails and reload + // returns early without invoking rulesFor. So instead of inspecting + // rulesFor calls, we observe that the supervisor loops past the backoff + // at least once and only stops when ctx is canceled. + fr := newFakeRunner() + close(fr.done) // already done — waitAll returns immediately + + r := newAgentRegistry(&launchDeps{agentBus: bus.New(silentLogger())}) + r.agents["fake"] = &runningAgent{ + runner: fr, + cfg: &config.AgentConfig{Agent: config.AgentMeta{ID: "fake"}}, + cfgPath: "/nonexistent/cfg.yaml", // reload will fail loading; supervisor still loops + logger: silentLogger(), + } + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + rulesForCalls := 0 + rulesFor := func(string, *slog.Logger) []decision.Rule { + rulesForCalls++ + return nil + } + + startedAt := time.Now() + r.superviseUntilCanceled(ctx, 30*time.Millisecond, rulesFor, silentLogger()) + elapsed := time.Since(startedAt) + + // Supervisor must wait until the deadline; never exit early on its own. + if elapsed < 200*time.Millisecond { + t.Fatalf("supervisor returned too early: %s", elapsed) + } + // rulesFor is not called because reload short-circuits on bad cfgPath, but + // the loop must have iterated through several backoff cycles (~6 with 30ms + // backoff over 250ms). The test is best-effort — we simply assert the + // supervisor stayed alive until ctx deadline. + _ = rulesForCalls +}