Files
agents_and_robots/cmd/launcher/registry.go
T
egutierrez bd0c8c0dd3 refactor: mover runtime Go de agents/ a devagents/
agents/ ahora solo contiene carpetas de agentes (config, reglas, prompts).
El runtime (Agent, Robot, Runner, registry, handler, commands, llm, memory)
vive en devagents/ como package devagents.

Cambios:
- git mv agents/*.go → devagents/*.go
- package agents → package devagents en todos los archivos movidos
- Actualizar imports en agents/*/agent.go, cmd/launcher/, dev-scripts/
- Actualizar docs: CLAUDE.md, rules/, docs/e2ee.md, issues pendientes

Build y tests pasan sin errores.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 21:19:25 +00:00

258 lines
6.4 KiB
Go

package main
import (
"context"
"log/slog"
"os"
"strings"
"sync"
"time"
"github.com/enmanuel/agents/devagents"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/decision"
"github.com/enmanuel/agents/pkg/orchestration"
pksecurity "github.com/enmanuel/agents/pkg/security"
"github.com/enmanuel/agents/shell/bus"
agentlog "github.com/enmanuel/agents/shell/logger"
)
// runningAgent holds a live runner (Agent or Robot) and the metadata needed to recreate it.
type runningAgent struct {
runner devagents.Runner
cfg *config.AgentConfig
cfgPath string
logger *slog.Logger
logCleanup func()
}
// launchDeps holds shared resources needed to start/reload agents.
type launchDeps struct {
agentBus *bus.Bus
orch *orchHandle
logDir string
logLevel slog.Level
parentCtx context.Context
secPolicy pksecurity.SecurityPolicy // centralized security policy loaded from security/
}
// agentRegistry tracks all running agents by ID, enabling individual hot-reload.
type agentRegistry struct {
mu sync.Mutex
agents map[string]*runningAgent
deps *launchDeps
}
func newAgentRegistry(deps *launchDeps) *agentRegistry {
return &agentRegistry{
agents: make(map[string]*runningAgent),
deps: deps,
}
}
// register adds a running agent/robot to the registry and starts its goroutine.
func (r *agentRegistry) register(ra *runningAgent) {
r.mu.Lock()
r.agents[ra.cfg.Agent.ID] = ra
r.mu.Unlock()
runtimeType := ra.cfg.Agent.Type
if runtimeType == "" {
runtimeType = "agent"
}
go func() {
ra.logger.Info("runner started", "type", runtimeType)
if err := ra.runner.Run(r.deps.parentCtx); err != nil {
ra.logger.Error("runner stopped with error", "err", err, "type", runtimeType)
}
}()
}
// stopAndWait stops a running agent/robot and waits for it to finish.
// Caller must NOT hold r.mu.
func (r *agentRegistry) stopAndWait(id string) {
r.mu.Lock()
ra, ok := r.agents[id]
r.mu.Unlock()
if !ok {
return
}
ra.runner.Stop()
select {
case <-ra.runner.Done():
case <-time.After(10 * time.Second):
ra.logger.Warn("runner did not stop within 10s, forcing", "id", id)
}
// Unsubscribe from bus so no stale channel remains.
r.deps.agentBus.Unsubscribe(bus.AgentID(id))
}
// reload stops an agent, re-reads its config, recreates it, and restarts it.
func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) {
r.mu.Lock()
ra, ok := r.agents[id]
r.mu.Unlock()
if !ok {
slog.Warn("reload: agent not found", "id", id)
return
}
cfgPath := ra.cfgPath
oldCleanup := ra.logCleanup
ra.logger.Info("agent_reload_start", "id", id)
// 1. Stop current instance and wait.
r.stopAndWait(id)
// 2. Cleanup old log writer.
if oldCleanup != nil {
oldCleanup()
}
// 3. Re-read config.
cfg, err := config.Load(cfgPath)
if err != nil {
slog.Error("reload: failed to load config", "path", cfgPath, "err", err)
return
}
if !cfg.Agent.Enabled {
slog.Info("reload: agent is disabled, not restarting", "id", id)
r.mu.Lock()
delete(r.agents, id)
r.mu.Unlock()
return
}
// 4. New per-agent logger.
newLogger, newCleanup, aErr := agentlog.NewAgentLogger(agentlog.LoggerConfig{
BaseDir: r.deps.logDir,
AgentID: cfg.Agent.ID,
Level: r.deps.logLevel,
})
if aErr != nil {
newLogger = slog.Default().With("agent", cfg.Agent.ID)
newCleanup = func() {}
}
// 5. Create new runner (validates config before discarding the old one).
var newRunner devagents.Runner
if cfg.Agent.Type == "robot" {
robot, rErr := devagents.NewRobot(cfg, newLogger)
if rErr != nil {
newLogger.Error("reload: failed to create robot", "id", id, "err", rErr)
newCleanup()
return
}
newRunner = robot
} else {
rules := rulesFor(cfg.Agent.ID, newLogger)
agentACL := pksecurity.ResolveACL(cfg.Agent.ID, r.deps.secPolicy)
newLogger.Debug("resolved acl for agent (reload)", "agent", cfg.Agent.ID, "acl_empty", agentACL.Empty())
newAgent, aErr := devagents.New(cfg, rules, agentACL, newLogger)
if aErr != nil {
newLogger.Error("reload: failed to create agent", "id", id, "err", aErr)
newCleanup()
return
}
// Wire bus and orchestration (only for agents, not robots).
newAgent.SetBus(r.deps.agentBus)
if r.deps.orch != nil {
newAgent.SetInterceptor(r.deps.orch.orchestrator.Intercept)
newAgent.SetMembershipNotify(r.deps.orch.orchestrator.NotifyMembership)
r.deps.orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{
ID: cfg.Agent.ID,
MatrixUserID: cfg.Matrix.UserID,
Description: cfg.Agent.Description,
Capabilities: cfg.Agent.Tags,
})
}
newRunner = newAgent
}
newRA := &runningAgent{
runner: newRunner,
cfg: cfg,
cfgPath: cfgPath,
logger: newLogger,
logCleanup: newCleanup,
}
r.mu.Lock()
r.agents[id] = newRA
r.mu.Unlock()
// 7. Start new goroutine.
runtimeType := cfg.Agent.Type
if runtimeType == "" {
runtimeType = "agent"
}
go func() {
newLogger.Info("runner started", "type", runtimeType)
if err := newRunner.Run(r.deps.parentCtx); err != nil {
newLogger.Error("runner stopped with error", "err", err, "type", runtimeType)
}
}()
newLogger.Info("runner_reloaded", "id", id, "type", runtimeType)
}
// reloadAll reloads every registered agent sequentially.
func (r *agentRegistry) reloadAll(rulesFor func(string, *slog.Logger) []decision.Rule) {
r.mu.Lock()
ids := make([]string, 0, len(r.agents))
for id := range r.agents {
ids = append(ids, id)
}
r.mu.Unlock()
for _, id := range ids {
r.reload(id, rulesFor)
}
}
// waitAll blocks until all registered runners have stopped.
func (r *agentRegistry) waitAll() {
r.mu.Lock()
dones := make([]<-chan struct{}, 0, len(r.agents))
for _, ra := range r.agents {
dones = append(dones, ra.runner.Done())
}
r.mu.Unlock()
for _, done := range dones {
<-done
}
}
// cleanupLogs calls every agent's log cleanup function (called on launcher shutdown).
func (r *agentRegistry) cleanupLogs() {
r.mu.Lock()
defer r.mu.Unlock()
for _, ra := range r.agents {
if ra.logCleanup != nil {
ra.logCleanup()
}
}
}
// readReloadTarget reads the given file and returns the trimmed content.
// Returns "" if the file doesn't exist, is empty, or equals "*" (meaning reload all).
func readReloadTarget(path string) string {
data, err := os.ReadFile(path)
if err != nil {
return ""
}
id := strings.TrimSpace(string(data))
if id == "*" {
return ""
}
return id
}