// Package agents defines the Agent runtime that ties core and shell together. package agents import ( "context" "fmt" "io" "log/slog" "os" "path/filepath" "sync" "maunium.net/go/mautrix/event" "github.com/enmanuel/agents/internal/config" "github.com/enmanuel/agents/pkg/decision" coretypes "github.com/enmanuel/agents/pkg/llm" "github.com/enmanuel/agents/pkg/memory" "github.com/enmanuel/agents/pkg/personality" "github.com/enmanuel/agents/shell/effects" shelllm "github.com/enmanuel/agents/shell/llm" "github.com/enmanuel/agents/shell/matrix" shellmem "github.com/enmanuel/agents/shell/memory" "github.com/enmanuel/agents/shell/ssh" "github.com/enmanuel/agents/tools" ) const ( defaultMaxToolIterations = 5 defaultWindowSize = 20 ) // Agent is the assembled runtime: pure core + impure shell. type Agent struct { cfg *config.AgentConfig personality personality.Personality rules []decision.Rule llm coretypes.CompleteFunc matrix *matrix.Client runner *effects.Runner listener *matrix.Listener toolReg *tools.Registry logger *slog.Logger cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown // Memory windows map[string]memory.Window windowsMu sync.RWMutex memStore memory.Store // nil when memory is disabled windowSize int roomCtx *tools.RoomContext } // ClearWindow resets the conversation window for a room. Implements tools.WindowClearer. func (a *Agent) ClearWindow(roomID string) { a.windowsMu.Lock() defer a.windowsMu.Unlock() a.windows[roomID] = memory.NewWindow(a.windowSize) } // New assembles an Agent from its config, rules, and logger. func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*Agent, error) { // Matrix client matrixClient, err := matrix.New(cfg.Matrix) if err != nil { return nil, fmt.Errorf("matrix client: %w", err) } // E2EE — initialize before the sync loop starts var cryptoStore io.Closer if cfg.Matrix.Encryption.Enabled { storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db") pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv) logger.Info("initializing e2ee", "store", storePath) cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID) if err != nil { return nil, fmt.Errorf("e2ee init: %w", err) } // Auto-fetch cross-signing private keys from SSSS if recovery key is configured. if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" { if rk := os.Getenv(envName); rk != "" { if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil { logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err) } else { logger.Info("cross-signing private keys fetched from SSSS") } } } // Sign own device with the self-signing key so Element shows it as verified. if err := matrixClient.SignOwnDevice(context.Background()); err != nil { logger.Warn("failed to sign own device (non-fatal)", "err", err) } else { logger.Info("own device signed with cross-signing key") } logger.Info("e2ee ready") } // SSH executor sshExec := ssh.NewExecutor(cfg.SSH) // LLM client primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary) if err != nil { return nil, fmt.Errorf("primary LLM: %w", err) } var llmFunc coretypes.CompleteFunc = primaryLLM if cfg.LLM.Fallback.Provider != "" { fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback) if err != nil { logger.Warn("fallback LLM config error", "err", err) } else { llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM) } } // Effects runner runner := effects.NewRunner(matrixClient, sshExec, logger) // Memory subsystem var memStore memory.Store windowSize := defaultWindowSize roomCtx := &tools.RoomContext{} if cfg.Memory.Enabled { windowSize = cfg.Memory.WindowSize if windowSize <= 0 { windowSize = defaultWindowSize } dbPath := cfg.Memory.DBPath if dbPath == "" { dbPath = filepath.Join("agents", cfg.Agent.ID, "data", "memory.db") } store, err := shellmem.New(dbPath) if err != nil { return nil, fmt.Errorf("memory store: %w", err) } memStore = store logger.Info("memory enabled", "window_size", windowSize, "db", dbPath) } // Tool registry — register tools enabled in config toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memStore, roomCtx, logger) a := &Agent{ cfg: cfg, rules: rules, llm: llmFunc, matrix: matrixClient, runner: runner, toolReg: toolReg, logger: logger, cryptoStore: cryptoStore, windows: make(map[string]memory.Window), memStore: memStore, windowSize: windowSize, roomCtx: roomCtx, } // Register memory_clear_context with self as WindowClearer (after a is created) if cfg.Tools.Memory.Enabled && memStore != nil { toolReg.Register(tools.NewMemoryClearContext(a, roomCtx)) } // Matrix event listener a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger) return a, nil } // Run starts the agent sync loop. Blocks until ctx is cancelled. func (a *Agent) Run(ctx context.Context) error { if a.cryptoStore != nil { defer a.cryptoStore.Close() } if a.memStore != nil { defer a.memStore.Close() } a.logger.Info("agent starting", "id", a.cfg.Agent.ID, "name", a.cfg.Agent.Name, "tools", a.toolReg.Names(), ) return a.listener.Run(ctx) } // handleEvent is called by the matrix Listener for each filtered incoming event. func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) { a.logger.Debug("handling event", "sender", msgCtx.SenderID, "is_dm", msgCtx.IsDirectMsg, "is_mention", msgCtx.IsMention, "command", msgCtx.Command, ) roomID := evt.RoomID.String() // Update room context for memory tools a.roomCtx.Set(roomID) if a.cfg.Personality.Behavior.TypingIndicator { _ = a.matrix.SendTyping(ctx, roomID, true) defer a.matrix.SendTyping(ctx, roomID, false) } actions := decision.Evaluate(msgCtx, a.rules) a.logger.Debug("rules evaluated", "matched_actions", len(actions)) // If no rules matched and the message mentions the bot or is a DM, use LLM. if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) { a.logger.Debug("no rules matched, falling back to LLM") actions = []decision.Action{{ Kind: decision.ActionKindLLM, LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID}, }} } if len(actions) == 0 { a.logger.Debug("no actions, ignoring message", "is_dm", msgCtx.IsDirectMsg, "is_mention", msgCtx.IsMention, ) return } // Expand LLM actions inline — with tool-use loop when enabled expanded := make([]decision.Action, 0, len(actions)) for _, act := range actions { if act.Kind == decision.ActionKindLLM { // Memory: load window + append user message before LLM call a.ensureWindowLoaded(ctx, roomID) a.appendToWindow(roomID, coretypes.Message{ Role: coretypes.RoleUser, Content: msgCtx.Content, }) a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content) reply, err := a.runLLM(ctx, msgCtx) if err != nil { a.logger.Error("llm error", "err", err) expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error."}, }) } else { expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, Reply: &decision.ReplyAction{Content: reply}, }) // Memory: append assistant reply after LLM call a.appendToWindow(roomID, coretypes.Message{ Role: coretypes.RoleAssistant, Content: reply, }) a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply) } } else { expanded = append(expanded, act) } } a.runner.Execute(ctx, roomID, expanded) } func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) { a.logger.Debug("calling LLM", "model", a.cfg.LLM.Primary.Model, "provider", a.cfg.LLM.Primary.Provider, ) // Load system prompt from file if configured, else use description systemPrompt := a.cfg.Agent.Description // Build messages: conversation history from window (includes current user msg) messages := a.getWindowMessages(msgCtx.RoomID) if len(messages) == 0 { // Fallback if memory is disabled: just the current message messages = []coretypes.Message{ {Role: coretypes.RoleUser, Content: msgCtx.Content}, } } // Build tool specs for the LLM if tool_use is enabled var llmTools []coretypes.ToolSpec if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 { llmTools = a.toolReg.ToLLMSpecs() a.logger.Debug("tools available for LLM", "count", len(llmTools)) } maxIter := a.cfg.LLM.ToolUse.MaxIterations if maxIter <= 0 { maxIter = defaultMaxToolIterations } // Tool-use loop: call LLM → execute tools → feed results back → repeat for i := 0; i < maxIter; i++ { req := coretypes.CompletionRequest{ Model: a.cfg.LLM.Primary.Model, MaxTokens: a.cfg.LLM.Primary.MaxTokens, Temperature: a.cfg.LLM.Primary.Temperature, SystemPrompt: systemPrompt, Messages: messages, Tools: llmTools, } resp, err := a.llm(ctx, req) if err != nil { a.logger.Error("LLM call failed", "model", req.Model, "err", err) return "", err } a.logger.Debug("LLM responded", "content_len", len(resp.Content), "tool_calls", len(resp.ToolCalls), "finish_reason", resp.FinishReason, ) // No tool calls — return the text response if len(resp.ToolCalls) == 0 { return resp.Content, nil } // Append assistant message with tool calls to conversation messages = append(messages, coretypes.Message{ Role: coretypes.RoleAssistant, Content: resp.Content, ToolCalls: resp.ToolCalls, }) // Execute each tool and append results for _, tc := range resp.ToolCalls { a.logger.Info("executing tool", "tool", tc.Name, "call_id", tc.ID, ) // Notify the room that a tool is being called toolNotice := fmt.Sprintf("🔨 %s", tc.Name) if err := a.matrix.SendMarkdown(ctx, msgCtx.RoomID, toolNotice); err != nil { a.logger.Warn("failed to send tool call notice", "tool", tc.Name, "err", err) } result := a.toolReg.Execute(ctx, tc.Name, tc.Arguments) output := result.Output if result.Err != nil { output = fmt.Sprintf("error: %s", result.Err) a.logger.Warn("tool execution error", "tool", tc.Name, "err", result.Err, ) } else { a.logger.Debug("tool executed", "tool", tc.Name, "output_len", len(output), ) } messages = append(messages, coretypes.Message{ Role: coretypes.RoleTool, Content: output, ToolCallID: tc.ID, }) } } // Max iterations reached — return whatever we have a.logger.Warn("tool-use loop reached max iterations", "max", maxIter) return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil } // ── Memory helpers ─────────────────────────────────────────────────────── // ensureWindowLoaded loads the conversation window from SQLite on first access for a room. func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) { a.windowsMu.Lock() defer a.windowsMu.Unlock() if _, ok := a.windows[roomID]; ok { return } w := memory.NewWindow(a.windowSize) if a.memStore != nil { msgs, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize) if err != nil { a.logger.Warn("failed to load message history", "room", roomID, "err", err) } else { for _, m := range msgs { w = w.Append(coretypes.Message{Role: m.Role, Content: m.Content}) } if len(msgs) > 0 { a.logger.Debug("loaded message history", "room", roomID, "count", len(msgs)) } } } a.windows[roomID] = w } // appendToWindow adds a message to the in-memory conversation window. func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) { a.windowsMu.Lock() defer a.windowsMu.Unlock() w, ok := a.windows[roomID] if !ok { w = memory.NewWindow(a.windowSize) } a.windows[roomID] = w.Append(msg) } // getWindowMessages returns a copy of the conversation window for a room. func (a *Agent) getWindowMessages(roomID string) []coretypes.Message { a.windowsMu.RLock() defer a.windowsMu.RUnlock() w, ok := a.windows[roomID] if !ok { return nil } return w.ToLLMMessages() } // persistMessage saves a message to the SQLite store (no-op if store is nil). func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) { if a.memStore == nil { return } if err := a.memStore.SaveMessage(ctx, memory.HistoryMessage{ AgentID: a.cfg.Agent.ID, RoomID: roomID, Role: role, Content: content, }); err != nil { a.logger.Warn("failed to persist message", "room", roomID, "err", err) } } // buildToolRegistry creates a Registry with tools enabled in the agent's config. func buildToolRegistry( cfg *config.AgentConfig, sshExec *ssh.Executor, matrixClient *matrix.Client, memStore memory.Store, roomCtx *tools.RoomContext, logger *slog.Logger, ) *tools.Registry { reg := tools.NewRegistry() if cfg.Tools.HTTP.Enabled { reg.Register(tools.NewHTTPGet(cfg.Tools.HTTP)) reg.Register(tools.NewHTTPPost(cfg.Tools.HTTP)) logger.Debug("registered http tools") } if cfg.Tools.SSH.Enabled { reg.Register(tools.NewSSHCommand(cfg.Tools.SSH, sshExec)) logger.Debug("registered ssh tool") } if cfg.Tools.FileOps.Enabled { reg.Register(tools.NewReadFile(cfg.Tools.FileOps)) logger.Debug("registered file tool") } // current_time is always available reg.Register(tools.NewCurrentTime()) logger.Debug("registered current_time tool") // matrix_send is always available reg.Register(tools.NewMatrixSend(matrixClient)) logger.Debug("registered matrix tool") // Memory tools (memory_clear_context registered later since it needs the Agent) if cfg.Tools.Memory.Enabled && memStore != nil { reg.Register(tools.NewMemorySave(cfg.Agent.ID, memStore)) reg.Register(tools.NewMemoryRecall(cfg.Agent.ID, memStore)) reg.Register(tools.NewMemoryForget(cfg.Agent.ID, memStore)) reg.Register(tools.NewMemorySummary(cfg.Agent.ID, memStore)) logger.Debug("registered memory tools") } return reg }