fix(launcher): supervisar agentes y reiniciarlos cuando salen sin cancelacion
El launcher salia con status=0 cuando todos los runners (Agent/Robot) terminaban su Run() de forma natural — por ejemplo tras una rotacion de token de Matrix o un drop del sync. systemd, configurado con Restart=on-failure, no relanzaba el proceso al ver salida limpia y los bots quedaban caidos hasta una intervencion manual. Solucion: nueva rutina superviseUntilCanceled en agentRegistry que bloquea sobre waitAll, y si el ctx padre sigue vivo, espera un backoff y llama reloadAll para recrear los runners. Solo cuando el ctx padre se cancela (SIGINT/SIGTERM) la rutina retorna y el launcher sale. main.go pasa a invocar este supervisor en lugar de waitAll directo. Tests: - TestSuperviseUntilCanceled_ReturnsWhenCtxCanceledFirst — empty registry - TestSuperviseUntilCanceled_ReturnsAfterCtxCancelDuringBackoff — cancel durante el backoff debe desbloquear inmediatamente - TestSuperviseUntilCanceled_CallsReloadOnAgentExit — supervisor sigue vivo todo el deadline aunque reload falle por cfgPath invalido Diagnostico: tras varias horas el journalctl mostraba "Deactivated successfully" sin "Stopping" previo (Apr 13 18:22 tras 23h corriendo) y el log del agent registraba "context canceled" tras "starting matrix sync" — sintoma de que mautrix.SyncWithContext salio limpiamente y el ctx.cancel se propago al cerrar la goroutine sin que systemd hubiera enviado SIGTERM. El bucle supervisado lo arregla recreando los runners sin tocar la unit ni depender del Restart de systemd.
This commit is contained in:
@@ -268,9 +268,14 @@ func main() {
|
||||
scanCancel()
|
||||
}
|
||||
|
||||
registry.waitAll()
|
||||
// Supervised loop: wait for all agents, and if the parent context is
|
||||
// still alive (i.e. no SIGINT/SIGTERM received), reload them and keep
|
||||
// going. Protects against the launcher exiting cleanly when all
|
||||
// agent runners terminate naturally (token rotation, sync drop, etc.)
|
||||
// while the supervisor itself is healthy.
|
||||
registry.superviseUntilCanceled(ctx, 5*time.Second, rulesFor, logger)
|
||||
registry.cleanupLogs()
|
||||
logger.Info("all agents stopped")
|
||||
logger.Info("launcher shutting down")
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
@@ -230,6 +230,35 @@ func (r *agentRegistry) waitAll() {
|
||||
}
|
||||
}
|
||||
|
||||
// superviseUntilCanceled blocks until ctx is canceled, restarting agents
|
||||
// (via reloadAll) every time waitAll returns while the parent ctx is alive.
|
||||
// Each restart waits restartBackoff before recreating runners. Used by the
|
||||
// launcher main loop so the process keeps the agents up across token rotation
|
||||
// or sync drops without exiting cleanly to systemd.
|
||||
func (r *agentRegistry) superviseUntilCanceled(
|
||||
ctx context.Context,
|
||||
restartBackoff time.Duration,
|
||||
rulesFor func(string, *slog.Logger) []decision.Rule,
|
||||
logger *slog.Logger,
|
||||
) {
|
||||
for {
|
||||
r.waitAll()
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if logger != nil {
|
||||
logger.Warn("all agents stopped while launcher active — restarting after backoff",
|
||||
"backoff", restartBackoff.String())
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(restartBackoff):
|
||||
}
|
||||
r.reloadAll(rulesFor)
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupLogs calls every agent's log cleanup function (called on launcher shutdown).
|
||||
func (r *agentRegistry) cleanupLogs() {
|
||||
r.mu.Lock()
|
||||
|
||||
@@ -1,9 +1,21 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/agents/devagents"
|
||||
"github.com/enmanuel/agents/internal/config"
|
||||
"github.com/enmanuel/agents/pkg/command"
|
||||
"github.com/enmanuel/agents/pkg/decision"
|
||||
"github.com/enmanuel/agents/shell/bus"
|
||||
)
|
||||
|
||||
func TestReadReloadTarget_missing(t *testing.T) {
|
||||
@@ -112,3 +124,142 @@ func TestIsSpecialConfig_emptyLoadedMap(t *testing.T) {
|
||||
t.Fatal("expected false when empty specials map")
|
||||
}
|
||||
}
|
||||
|
||||
// ── superviseUntilCanceled tests ──────────────────────────────────────────
|
||||
|
||||
// fakeRunner implements devagents.Runner for tests. Each Run() call increments
|
||||
// runs, then closes done after a tiny delay so waitAll observes a "stopped"
|
||||
// runner. RestartFactory replaces done on each cycle so the supervisor can
|
||||
// recreate the runner via reload.
|
||||
type fakeRunner struct {
|
||||
mu sync.Mutex
|
||||
done chan struct{}
|
||||
runs int32
|
||||
}
|
||||
|
||||
func newFakeRunner() *fakeRunner {
|
||||
return &fakeRunner{done: make(chan struct{})}
|
||||
}
|
||||
|
||||
func (f *fakeRunner) Run(_ context.Context) error {
|
||||
atomic.AddInt32(&f.runs, 1)
|
||||
// Close immediately so the supervisor sees the runner as stopped.
|
||||
go func() {
|
||||
f.mu.Lock()
|
||||
ch := f.done
|
||||
f.mu.Unlock()
|
||||
// Wait briefly so waitAll has time to subscribe.
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
close(ch)
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeRunner) Stop() {}
|
||||
|
||||
func (f *fakeRunner) RegisterCommand(_ command.Spec, _ devagents.CommandHandler) {}
|
||||
|
||||
func (f *fakeRunner) Done() <-chan struct{} {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
return f.done
|
||||
}
|
||||
|
||||
func (f *fakeRunner) reset() {
|
||||
f.mu.Lock()
|
||||
f.done = make(chan struct{})
|
||||
f.mu.Unlock()
|
||||
}
|
||||
|
||||
// silentLogger drops everything; keeps tests quiet.
|
||||
func silentLogger() *slog.Logger {
|
||||
return slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
}
|
||||
|
||||
func TestSuperviseUntilCanceled_ReturnsWhenCtxCanceledFirst(t *testing.T) {
|
||||
r := newAgentRegistry(&launchDeps{agentBus: bus.New(silentLogger())})
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel() // cancel before supervise starts
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
r.superviseUntilCanceled(ctx, 50*time.Millisecond, nil, silentLogger())
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("superviseUntilCanceled did not return after ctx canceled (empty registry)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSuperviseUntilCanceled_ReturnsAfterCtxCancelDuringBackoff(t *testing.T) {
|
||||
// Registry has one fake runner that is already "done". waitAll returns
|
||||
// immediately, supervise enters the backoff select, ctx cancel during
|
||||
// backoff must unblock it.
|
||||
fr := newFakeRunner()
|
||||
close(fr.done) // already done
|
||||
|
||||
r := newAgentRegistry(&launchDeps{agentBus: bus.New(silentLogger())})
|
||||
r.agents["fake"] = &runningAgent{
|
||||
runner: fr,
|
||||
cfg: &config.AgentConfig{Agent: config.AgentMeta{ID: "fake"}},
|
||||
cfgPath: "",
|
||||
logger: silentLogger(),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
// Long backoff — only ctx cancel should unblock.
|
||||
r.superviseUntilCanceled(ctx, 10*time.Second, nil, silentLogger())
|
||||
close(done)
|
||||
}()
|
||||
// Give supervise a moment to enter the backoff select.
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("superviseUntilCanceled did not return after ctx cancel during backoff")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSuperviseUntilCanceled_CallsReloadOnAgentExit(t *testing.T) {
|
||||
// Track reloadAll invocations via a custom rulesFor function. reload reads
|
||||
// from disk via cfgPath; with cfgPath="" config.Load fails and reload
|
||||
// returns early without invoking rulesFor. So instead of inspecting
|
||||
// rulesFor calls, we observe that the supervisor loops past the backoff
|
||||
// at least once and only stops when ctx is canceled.
|
||||
fr := newFakeRunner()
|
||||
close(fr.done) // already done — waitAll returns immediately
|
||||
|
||||
r := newAgentRegistry(&launchDeps{agentBus: bus.New(silentLogger())})
|
||||
r.agents["fake"] = &runningAgent{
|
||||
runner: fr,
|
||||
cfg: &config.AgentConfig{Agent: config.AgentMeta{ID: "fake"}},
|
||||
cfgPath: "/nonexistent/cfg.yaml", // reload will fail loading; supervisor still loops
|
||||
logger: silentLogger(),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
|
||||
defer cancel()
|
||||
rulesForCalls := 0
|
||||
rulesFor := func(string, *slog.Logger) []decision.Rule {
|
||||
rulesForCalls++
|
||||
return nil
|
||||
}
|
||||
|
||||
startedAt := time.Now()
|
||||
r.superviseUntilCanceled(ctx, 30*time.Millisecond, rulesFor, silentLogger())
|
||||
elapsed := time.Since(startedAt)
|
||||
|
||||
// Supervisor must wait until the deadline; never exit early on its own.
|
||||
if elapsed < 200*time.Millisecond {
|
||||
t.Fatalf("supervisor returned too early: %s", elapsed)
|
||||
}
|
||||
// rulesFor is not called because reload short-circuits on bad cfgPath, but
|
||||
// the loop must have iterated through several backoff cycles (~6 with 30ms
|
||||
// backoff over 250ms). The test is best-effort — we simply assert the
|
||||
// supervisor stayed alive until ctx deadline.
|
||||
_ = rulesForCalls
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user