2f3f30c86b
El launcher ahora escanea agents/_specials/*/config.yaml ademas de agents/*/config.yaml para descubrir agentes del sistema con identidad Matrix (ej: father-bot). Los SpecialConfig ya cargados (orchestrator) se detectan y saltan via isSpecialConfig() para evitar errores de validacion. Tambien actualiza dev-scripts/_common.sh para que config_path_for() y list_agents_raw() incluyan _specials/ en la busqueda. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
373 lines
11 KiB
Go
373 lines
11 KiB
Go
// Command launcher starts one or more agents from their config files.
|
|
//
|
|
// Usage:
|
|
//
|
|
// go run ./cmd/launcher # auto-discovers agents/*/config.yaml
|
|
// go run ./cmd/launcher -c agents/assistant/config.yaml
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"syscall"
|
|
"time"
|
|
|
|
"maunium.net/go/mautrix"
|
|
|
|
"github.com/spf13/cobra"
|
|
|
|
"github.com/enmanuel/agents/devagents"
|
|
"github.com/enmanuel/agents/internal/config"
|
|
"github.com/enmanuel/agents/pkg/decision"
|
|
"github.com/enmanuel/agents/pkg/orchestration"
|
|
pksecurity "github.com/enmanuel/agents/pkg/security"
|
|
"github.com/enmanuel/agents/shell/bus"
|
|
agentlog "github.com/enmanuel/agents/shell/logger"
|
|
orchshell "github.com/enmanuel/agents/shell/orchestration"
|
|
shellsecurity "github.com/enmanuel/agents/shell/security"
|
|
|
|
// Blank imports: each agent self-registers its rules via init().
|
|
_ "github.com/enmanuel/agents/agents/assistant-bot"
|
|
_ "github.com/enmanuel/agents/agents/asistente-2"
|
|
_ "github.com/enmanuel/agents/agents/meteorologo"
|
|
_ "github.com/enmanuel/agents/agents/test-personality"
|
|
_ "github.com/enmanuel/agents/agents/_specials/father-bot"
|
|
testbot "github.com/enmanuel/agents/agents/test-bot"
|
|
)
|
|
|
|
func main() {
|
|
var (
|
|
configPaths []string
|
|
logLevel string
|
|
logDir string
|
|
)
|
|
|
|
root := &cobra.Command{
|
|
Use: "launcher",
|
|
Short: "Start Matrix agents from config files",
|
|
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
|
|
if len(configPaths) == 0 {
|
|
matches, _ := filepath.Glob("agents/*/config.yaml")
|
|
configPaths = matches
|
|
// Also discover agent-type specials (e.g. father-bot).
|
|
// SpecialConfig middleware (orchestrator) is handled separately.
|
|
specials, _ := filepath.Glob("agents/_specials/*/config.yaml")
|
|
configPaths = append(configPaths, specials...)
|
|
}
|
|
return nil
|
|
},
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
lvl := parseLogLevel(logLevel)
|
|
|
|
// ── Launcher-level logger ──
|
|
logger, launcherCleanup, err := agentlog.NewAgentLogger(agentlog.LoggerConfig{
|
|
BaseDir: logDir,
|
|
AgentID: "launcher",
|
|
Level: lvl,
|
|
})
|
|
if err != nil {
|
|
// Fallback to stdout if file logger fails.
|
|
logger = newLogger(logLevel)
|
|
logger.Warn("could not create file logger, falling back to stdout", "err", err)
|
|
launcherCleanup = func() {}
|
|
}
|
|
defer launcherCleanup()
|
|
|
|
if len(configPaths) == 0 {
|
|
logger.Warn("no agent configs found — nothing to start")
|
|
return nil
|
|
}
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
// ── Load centralized security policy ──
|
|
secPolicy, secErr := shellsecurity.Load("security/")
|
|
if secErr != nil {
|
|
logger.Warn("security policy load failed, using empty policy (open access)", "err", secErr)
|
|
secPolicy = pksecurity.SecurityPolicy{}
|
|
} else {
|
|
logger.Info("security policy loaded",
|
|
"user_groups", len(secPolicy.UserGroups),
|
|
"agent_groups", len(secPolicy.AgentGroups),
|
|
"policies", len(secPolicy.Policies),
|
|
)
|
|
}
|
|
|
|
// ── Shared bus for inter-agent communication ──
|
|
agentBus := bus.New(logger)
|
|
|
|
// ── Start special agents (orchestrator, etc.) BEFORE normal bots ──
|
|
orch, err := startOrchestrator(agentBus, logger)
|
|
if err != nil {
|
|
// Non-fatal: orchestration is optional
|
|
logger.Warn("orchestrator not started", "err", err)
|
|
} else if orch != nil {
|
|
logger.Info("orchestrator initialized")
|
|
}
|
|
|
|
// ── Shared dependencies for agent registry ──
|
|
deps := &launchDeps{
|
|
agentBus: agentBus,
|
|
orch: orch,
|
|
logDir: logDir,
|
|
logLevel: lvl,
|
|
parentCtx: ctx,
|
|
secPolicy: secPolicy,
|
|
}
|
|
registry := newAgentRegistry(deps)
|
|
|
|
// ── SIGHUP: hot-reload individual agent or all agents ──
|
|
sighup := make(chan os.Signal, 1)
|
|
signal.Notify(sighup, syscall.SIGHUP)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case _, ok := <-sighup:
|
|
if !ok {
|
|
return
|
|
}
|
|
id := readReloadTarget("run/reload.txt")
|
|
// Remove the target file after reading so it doesn't
|
|
// affect the next SIGHUP.
|
|
_ = os.Remove("run/reload.txt")
|
|
if id == "" {
|
|
logger.Info("sighup: reloading all agents")
|
|
registry.reloadAll(rulesFor)
|
|
} else {
|
|
logger.Info("sighup: reloading agent", "id", id)
|
|
registry.reload(id, rulesFor)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// ── Start normal agents ──
|
|
// Build a set of special IDs already loaded (e.g. orchestrator)
|
|
// so the discovery loop skips them instead of failing on validation.
|
|
loadedSpecials := make(map[string]bool)
|
|
if orch != nil {
|
|
loadedSpecials[orch.cfg.Special.ID] = true
|
|
}
|
|
|
|
var scannerOnce scanOnce
|
|
for _, path := range configPaths {
|
|
path := path
|
|
|
|
// Skip configs that belong to already-loaded specials.
|
|
if isSpecialConfig(path, loadedSpecials) {
|
|
continue
|
|
}
|
|
|
|
cfg, err := config.Load(path)
|
|
if err != nil {
|
|
logger.Error("failed to load config", "path", path, "err", err)
|
|
continue
|
|
}
|
|
if !cfg.Agent.Enabled {
|
|
logger.Info("agent disabled, skipping", "id", cfg.Agent.ID)
|
|
continue
|
|
}
|
|
if cfg.Agent.Template {
|
|
logger.Info("agent is template, skipping", "id", cfg.Agent.ID)
|
|
continue
|
|
}
|
|
|
|
// Per-agent logger → writes to logs/<agent-id>/YYYY-MM-DD.jsonl
|
|
agentLogger, agentCleanup, aErr := agentlog.NewAgentLogger(agentlog.LoggerConfig{
|
|
BaseDir: logDir,
|
|
AgentID: cfg.Agent.ID,
|
|
Level: lvl,
|
|
})
|
|
if aErr != nil {
|
|
logger.Warn("agent file logger failed, using launcher logger", "agent", cfg.Agent.ID, "err", aErr)
|
|
agentLogger = logger.With("agent", cfg.Agent.ID)
|
|
agentCleanup = func() {}
|
|
}
|
|
|
|
// Branch: robot (command-only, lightweight) vs agent (full runtime).
|
|
var runner devagents.Runner
|
|
|
|
if cfg.Agent.Type == "robot" {
|
|
robot, rErr := devagents.NewRobot(cfg, agentLogger)
|
|
if rErr != nil {
|
|
logger.Error("failed to create robot", "id", cfg.Agent.ID, "err", rErr)
|
|
agentCleanup()
|
|
continue
|
|
}
|
|
// Register agent-specific commands for robots
|
|
if cfg.Agent.ID == "test-bot" {
|
|
for _, cmd := range testbot.Commands() {
|
|
robot.RegisterCommand(cmd.Spec, cmd.Handler)
|
|
}
|
|
}
|
|
|
|
runner = robot
|
|
agentLogger.Info("created robot", "id", cfg.Agent.ID)
|
|
} else {
|
|
rules := rulesFor(cfg.Agent.ID, logger)
|
|
|
|
// Resolve centralized ACL for this agent
|
|
agentACL := pksecurity.ResolveACL(cfg.Agent.ID, deps.secPolicy)
|
|
agentLogger.Debug("resolved acl for agent",
|
|
"agent", cfg.Agent.ID,
|
|
"acl_empty", agentACL.Empty(),
|
|
)
|
|
|
|
a, cErr := devagents.New(cfg, rules, agentACL, agentLogger, devagents.WithLogDir(logDir))
|
|
if cErr != nil {
|
|
logger.Error("failed to create agent", "id", cfg.Agent.ID, "err", cErr)
|
|
agentCleanup()
|
|
continue
|
|
}
|
|
|
|
// Connect agent to bus for orchestration
|
|
a.SetBus(agentBus)
|
|
|
|
// If orchestrator is active, wire interceptor and membership notify
|
|
if orch != nil {
|
|
a.SetInterceptor(orch.orchestrator.Intercept)
|
|
a.SetMembershipNotify(orch.orchestrator.NotifyMembership)
|
|
|
|
orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{
|
|
ID: cfg.Agent.ID,
|
|
MatrixUserID: cfg.Matrix.UserID,
|
|
Description: cfg.Agent.Description,
|
|
Capabilities: cfg.Agent.Tags,
|
|
})
|
|
|
|
// Grab the first available Matrix client for room scanning
|
|
scannerOnce.set(a.RawMatrixClient())
|
|
}
|
|
|
|
runner = a
|
|
}
|
|
|
|
registry.register(&runningAgent{
|
|
runner: runner,
|
|
cfg: cfg,
|
|
cfgPath: path,
|
|
logger: agentLogger,
|
|
logCleanup: agentCleanup,
|
|
})
|
|
}
|
|
|
|
// ── Startup room scan (after all participants are registered) ──
|
|
if orch != nil && scannerOnce.client != nil {
|
|
orch.orchestrator.SetScanner(scannerOnce.client)
|
|
scanCtx, scanCancel := context.WithTimeout(ctx, 30*time.Second)
|
|
orch.orchestrator.ScanExistingRooms(scanCtx)
|
|
scanCancel()
|
|
}
|
|
|
|
registry.waitAll()
|
|
registry.cleanupLogs()
|
|
logger.Info("all agents stopped")
|
|
return nil
|
|
},
|
|
}
|
|
|
|
root.Flags().StringSliceVarP(&configPaths, "config", "c", nil,
|
|
"Agent config file(s). If omitted, discovers all agents/*/config.yaml")
|
|
root.Flags().StringVar(&logLevel, "log-level", "info",
|
|
"Log level: debug | info | warn | error")
|
|
root.Flags().StringVar(&logDir, "log-dir", "logs",
|
|
`Log directory (logs/<agent>/YYYY-MM-DD.jsonl). Use "stdout" for console only`)
|
|
|
|
if err := root.Execute(); err != nil {
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// scanOnce captures the first Matrix client for room scanning.
|
|
type scanOnce struct {
|
|
client *mautrix.Client
|
|
}
|
|
|
|
func (s *scanOnce) set(c *mautrix.Client) {
|
|
if s.client == nil {
|
|
s.client = c
|
|
}
|
|
}
|
|
|
|
// orchHandle wraps a running orchestrator with its config for the launcher.
|
|
type orchHandle struct {
|
|
orchestrator *orchshell.Orchestrator
|
|
cfg *config.SpecialConfig
|
|
}
|
|
|
|
// startOrchestrator scans agents/_specials/orchestrator/config.yaml and
|
|
// initializes the orchestrator if found and enabled.
|
|
func startOrchestrator(agentBus *bus.Bus, logger *slog.Logger) (*orchHandle, error) {
|
|
cfgPath := filepath.Join("agents", "_specials", "orchestrator", "config.yaml")
|
|
if _, err := os.Stat(cfgPath); os.IsNotExist(err) {
|
|
return nil, err
|
|
}
|
|
|
|
cfg, err := config.LoadSpecial(cfgPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !cfg.Special.Enabled {
|
|
return nil, nil
|
|
}
|
|
|
|
orchLogger := logger.With("component", "orchestrator")
|
|
orch, err := orchshell.New(cfg, agentBus, orchLogger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &orchHandle{orchestrator: orch, cfg: cfg}, nil
|
|
}
|
|
|
|
// rulesFor retrieves the rule factory for the given agent ID from the
|
|
// global registry (populated by init() in each agent package).
|
|
// Returns nil if no rules are registered (command-only bot).
|
|
func rulesFor(agentID string, logger *slog.Logger) []decision.Rule {
|
|
factory := devagents.GetRules(agentID)
|
|
if factory == nil {
|
|
logger.Warn("no rules registered for agent, using empty ruleset (command-only)", "id", agentID)
|
|
return nil
|
|
}
|
|
return factory()
|
|
}
|
|
|
|
func parseLogLevel(level string) slog.Level {
|
|
switch level {
|
|
case "debug":
|
|
return slog.LevelDebug
|
|
case "warn":
|
|
return slog.LevelWarn
|
|
case "error":
|
|
return slog.LevelError
|
|
default:
|
|
return slog.LevelInfo
|
|
}
|
|
}
|
|
|
|
// newLogger creates a stdout-only JSON logger (fallback when file logger fails).
|
|
func newLogger(level string) *slog.Logger {
|
|
return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: parseLogLevel(level)}))
|
|
}
|
|
|
|
// isSpecialConfig checks whether a config path belongs to a special agent
|
|
// that was already loaded (e.g. orchestrator). It reads the YAML to detect
|
|
// a "special:" top-level key. This avoids config.Load() failing with
|
|
// validation errors for SpecialConfig files.
|
|
func isSpecialConfig(path string, loadedSpecials map[string]bool) bool {
|
|
if len(loadedSpecials) == 0 {
|
|
return false
|
|
}
|
|
cfg, err := config.LoadSpecial(path)
|
|
if err != nil {
|
|
return false // not a valid special config → let Load() handle it
|
|
}
|
|
return loadedSpecials[cfg.Special.ID]
|
|
}
|