// Package agents defines the Agent runtime that ties core and shell together. package devagents import ( "context" "fmt" "io" "log/slog" "os" "path/filepath" "sync" "time" "maunium.net/go/mautrix" "maunium.net/go/mautrix/event" "github.com/enmanuel/agents/internal/config" "github.com/enmanuel/agents/pkg/acl" "github.com/enmanuel/agents/pkg/command" "github.com/enmanuel/agents/pkg/decision" coretypes "github.com/enmanuel/agents/pkg/llm" "github.com/enmanuel/agents/pkg/memory" "github.com/enmanuel/agents/pkg/personality" "github.com/enmanuel/agents/pkg/sanitize" "github.com/enmanuel/agents/shell/audit" "github.com/enmanuel/agents/shell/bus" shellcron "github.com/enmanuel/agents/shell/cron" "github.com/enmanuel/agents/shell/effects" shellknowledge "github.com/enmanuel/agents/shell/knowledge" "github.com/enmanuel/agents/shell/matrix" shellmcp "github.com/enmanuel/agents/shell/mcp" shellskills "github.com/enmanuel/agents/shell/skills" "github.com/enmanuel/agents/shell/ssh" "github.com/enmanuel/agents/tools" toolmemory "github.com/enmanuel/agents/tools/memorytools" ) const ( defaultMaxToolIterations = 5 defaultWindowSize = 20 ) // Option configures optional Agent behaviour. type Option func(*Agent) // WithLogDir sets the base directory for JSONL logs (used by !metrics command). func WithLogDir(dir string) Option { return func(a *Agent) { a.logDir = dir } } // CommandHandler executes a built-in command and returns the response text. type CommandHandler func(ctx context.Context, msgCtx decision.MessageContext) string // Agent is the assembled runtime: pure core + impure shell. 
type Agent struct {
	// Static configuration and the pure-core inputs handed to New.
	cfg         *config.AgentConfig
	personality personality.Personality
	rules       []decision.Rule
	llm         coretypes.CompleteFunc // nil when no LLM configured (simple_bot)

	// Shell-side collaborators wired together by New.
	matrix      *matrix.Client
	sender      effects.MatrixSender // used by sendReply; same object as matrix in production
	runner      *effects.Runner
	listener    *matrix.Listener
	toolReg     *tools.Registry
	logger      *slog.Logger
	cryptoStore io.Closer        // non-nil when E2EE is enabled; closed on shutdown
	mcpManager  *shellmcp.Manager // nil when MCP client is disabled

	// Lifecycle — cancel stops this agent individually; done is closed when Run returns.
	// cancel is nil until Run has been called.
	cancel context.CancelFunc
	done   chan struct{}

	// Access control
	acl acl.ACL

	// Commands — handlers keyed by canonical name; cmdAliases maps alias → canonical
	commands    map[string]CommandHandler
	cmdAliases  map[string]string // alias → canonical name
	customSpecs []command.Spec    // specs from RegisterCommand (for !help)
	startTime   time.Time         // set to time.Now() when New builds the Agent

	// Memory
	windows    map[string]memory.Window
	windowsMu  sync.RWMutex // presumably guards windows — confirm against the methods that use it
	memStore   memory.Store // nil when memory is disabled
	windowSize int
	roomCtx    *toolmemory.RoomContext // the same value New passes to the tool registry

	// Prompt-commands — loaded from prompts/*.md at startup
	promptCmds map[string]string // name → prompt content

	// Knowledge store — non-nil when knowledge is enabled
	knowledgeStore *shellknowledge.FileStore

	// Shared knowledge store — non-nil when shared_knowledge is enabled
	sharedKnowledgeStore *shellknowledge.FileStore

	// Skills loader — non-nil when skills are enabled
	skillLoader *shellskills.Loader

	// Sanitization options — nil when sanitization is disabled
	sanitizeOpts *sanitize.Options

	// Bus — set via SetBus() when running under the unified launcher
	agentBus *bus.Bus

	// Scheduler — nil when no schedules are configured
	scheduler *shellcron.Scheduler

	// Audit writer — nil when audit is disabled (New also leaves it nil when
	// the writer fails to initialize)
	auditWriter *audit.Writer

	// LogDir — base directory for JSONL logs (used by !metrics); set via WithLogDir
	logDir string
}

// New assembles an Agent from its config, rules, pre-resolved ACL, and logger.
// The ACL is resolved externally (e.g. from security/ YAML files) and injected here. // Pass acl.ACL{} (empty) for open access (no restrictions). // logDir is the base directory for JSONL logs (used by !metrics command); empty disables metrics. func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logger *slog.Logger, opts ...Option) (*Agent, error) { // Matrix client matrixClient, err := matrix.New(cfg.Matrix) if err != nil { return nil, fmt.Errorf("matrix client: %w", err) } // E2EE — initialize before the sync loop starts cryptoStore, err := initCrypto(cfg, matrixClient, logger) if err != nil { return nil, err } // SSH executor sshExec := ssh.NewExecutor(cfg.SSH, logger) // LLM client — optional; if no provider is configured, the agent runs as simple_bot llmFunc, err := initLLM(cfg, logger) if err != nil { return nil, err } // Effects runner runner := effects.NewRunner(matrixClient, sshExec, logger) // Resolve base data path for this agent dataBase := resolveDataBase(cfg) logger.Debug("data base path", "path", dataBase) // Memory subsystem memInit, err := initMemoryStore(cfg.Memory.Enabled, cfg.Memory.WindowSize, cfg.Memory.DBPath, dataBase, logger) if err != nil { return nil, err } // Tool dependencies (knowledge, MCP, skills) deps := initToolDeps(cfg, dataBase, logger) if !agentACL.Empty() { logger.Info("acl enabled (centralized security policy)") } // Tool registry — register tools enabled in config roomCtx := &toolmemory.RoomContext{} toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memInit.store, deps.kStore, deps.sharedKStore, deps.mcpManager, deps.skillLoader, deps.skillExecutor, roomCtx, logger) // Rate limiting for tools initRateLimiter(cfg, toolReg, logger) a := &Agent{ cfg: cfg, acl: agentACL, personality: personality.FromConfig(cfg.Personality), rules: rules, llm: llmFunc, matrix: matrixClient, sender: matrixClient, runner: runner, toolReg: toolReg, logger: logger, cryptoStore: cryptoStore, mcpManager: deps.mcpManager, 
done: make(chan struct{}), commands: make(map[string]CommandHandler), cmdAliases: command.BuiltinNames(), startTime: time.Now(), windows: make(map[string]memory.Window), memStore: memInit.store, knowledgeStore: deps.kStore, sharedKnowledgeStore: deps.sharedKStore, skillLoader: deps.skillLoader, windowSize: memInit.windowSize, roomCtx: roomCtx, } // Apply optional configuration for _, opt := range opts { opt(a) } // Initialize audit writer if enabled if cfg.Security.Audit.Enabled { var matrixSender audit.MatrixSender if cfg.Security.Audit.LogToRoom != "" { mc := matrixClient // capture for closure matrixSender = func(roomID, msg string) { if err := mc.SendMarkdown(context.Background(), roomID, msg); err != nil { logger.Warn("audit_matrix_send_error", "room", roomID, "err", err) } } } aw, auditErr := audit.New(cfg.Security.Audit, matrixSender, logger) if auditErr != nil { logger.Error("audit_writer_init_failed", "err", auditErr) } else { a.auditWriter = aw logger.Info("audit trail enabled", "log_file", cfg.Security.Audit.LogFile, "log_to_room", cfg.Security.Audit.LogToRoom, "include", cfg.Security.Audit.Include, ) // Wire tool_exec audit into the tool registry agentID := cfg.Agent.ID toolReg.SetAuditFunc(func(toolName string, durationMS int64, toolErr error) { detail := fmt.Sprintf("tool=%s duration_ms=%d", toolName, durationMS) if toolErr != nil { detail += " error=" + toolErr.Error() } a.emitAudit(audit.Event{ AgentID: agentID, EventType: audit.EventToolExec, Detail: detail, }) }) } } // Configure sanitization if enabled if cfg.Security.Sanitize.Enabled { minSev := parseSeverity(cfg.Security.Sanitize.MinSeverity) a.sanitizeOpts = &sanitize.Options{ Mode: sanitize.ParseMode(cfg.Security.Sanitize.Mode), MinSeverity: minSev, DisabledPatterns: cfg.Security.Sanitize.DisabledPatterns, } logger.Info("input sanitization enabled", "mode", a.sanitizeOpts.Mode, "min_severity", minSev, ) } // Register built-in command handlers a.registerBuiltinCommands() // Load 
prompt-commands from prompts/ directory a.loadPromptCommands() // Register memory_clear_context with self as WindowClearer (after a is created) if cfg.Tools.Memory.Enabled && memInit.store != nil { toolReg.Register(toolmemory.NewMemoryClearContext(a, roomCtx)) } // Cron scheduler — only when schedules are configured if len(cfg.Schedules) > 0 { a.scheduler = shellcron.New(cfg.Schedules, matrixClient, llmFunc, cfg.LLM.Primary.Model, logger) logger.Info("cron scheduler configured", "schedules", len(cfg.Schedules)) } // Matrix event listener a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger) return a, nil } // initCrypto initializes E2EE if enabled and returns the crypto store closer. func initCrypto(cfg *config.AgentConfig, matrixClient *matrix.Client, logger *slog.Logger) (io.Closer, error) { if !cfg.Matrix.Encryption.Enabled { return nil, nil } storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db") pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv) logger.Info("initializing e2ee", "store", storePath) cryptoStore, err := matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID) if err != nil { return nil, fmt.Errorf("e2ee init: %w", err) } // Auto-fetch cross-signing private keys from SSSS if recovery key is configured. if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" { if rk := os.Getenv(envName); rk != "" { if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil { logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err) } else { logger.Info("cross-signing private keys fetched from SSSS") } } } // Sign own device with the self-signing key so Element shows it as verified. 
if err := matrixClient.SignOwnDevice(context.Background()); err != nil { logger.Warn("failed to sign own device (non-fatal)", "err", err) } else { logger.Info("own device signed with cross-signing key") } logger.Info("e2ee ready") return cryptoStore, nil } // RegisterCommand adds a custom command handler for this agent. // The spec provides metadata (aliases, description, usage) for !help. // Must be called before Run(). func (a *Agent) RegisterCommand(spec command.Spec, handler CommandHandler) { a.commands[spec.Name] = handler a.cmdAliases[spec.Name] = spec.Name for _, alias := range spec.Aliases { a.cmdAliases[alias] = spec.Name } a.customSpecs = append(a.customSpecs, spec) a.logger.Info("command_registered", "command", spec.Name, "aliases", spec.Aliases) } // SetBus attaches the agent to the inter-agent bus for orchestration. // Must be called before Run(). func (a *Agent) SetBus(b *bus.Bus) { a.agentBus = b } // SetInterceptor configures the listener to skip events in orchestrated rooms. func (a *Agent) SetInterceptor(fn matrix.InterceptFunc) { a.listener.SetInterceptor(fn) } // SetMembershipNotify registers a callback for room membership changes. func (a *Agent) SetMembershipNotify(fn matrix.MembershipNotifyFunc) { a.listener.SetMembershipNotify(fn) } // RawMatrixClient returns the underlying *mautrix.Client for room scanning. func (a *Agent) RawMatrixClient() *mautrix.Client { return a.matrix.Raw() } // Stop cancels this agent's individual context, causing Run to return. // Safe to call multiple times. func (a *Agent) Stop() { if a.cancel != nil { a.cancel() } } // Done returns a channel that is closed when Run has returned. func (a *Agent) Done() <-chan struct{} { return a.done } // Run starts the agent sync loop. Blocks until ctx is cancelled. 
func (a *Agent) Run(ctx context.Context) error { ctx, a.cancel = context.WithCancel(ctx) defer close(a.done) if a.cryptoStore != nil { defer a.cryptoStore.Close() } if a.memStore != nil { defer a.memStore.Close() } if a.knowledgeStore != nil { defer a.knowledgeStore.Close() } if a.sharedKnowledgeStore != nil { defer a.sharedKnowledgeStore.Close() } if a.mcpManager != nil { defer a.mcpManager.Close() } if a.auditWriter != nil { defer a.auditWriter.Close() } a.logger.Info("agent starting", "id", a.cfg.Agent.ID, "name", a.cfg.Agent.Name, "tools", a.toolReg.Names(), ) // Set presence to online if err := a.matrix.SetPresence(ctx, event.PresenceOnline); err != nil { a.logger.Warn("failed to set presence online", "err", err) } defer func() { // Use background context since ctx is already cancelled at shutdown offlineCtx := context.Background() if err := a.matrix.SetPresence(offlineCtx, event.PresenceOffline); err != nil { a.logger.Warn("failed to set presence offline", "err", err) } }() // Start bus listener if connected to the orchestration bus if a.agentBus != nil { ch := a.agentBus.Subscribe(bus.AgentID(a.cfg.Agent.ID)) go a.listenBus(ctx, ch) a.logger.Info("bus listener started") } // Start cron scheduler in background goroutine (blocks until ctx cancelled) if a.scheduler != nil { go a.scheduler.Start(ctx) } return a.listener.Run(ctx) }