diff --git a/agents/handler.go b/agents/handler.go
new file mode 100644
index 0000000..9eb3d66
--- /dev/null
+++ b/agents/handler.go
@@ -0,0 +1,361 @@
+package agents
+
+import (
+ "context"
+ "fmt"
+
+ "maunium.net/go/mautrix/event"
+
+ "github.com/enmanuel/agents/pkg/command"
+ "github.com/enmanuel/agents/pkg/decision"
+ coretypes "github.com/enmanuel/agents/pkg/llm"
+ "github.com/enmanuel/agents/pkg/orchestration"
+ "github.com/enmanuel/agents/pkg/sanitize"
+ "github.com/enmanuel/agents/shell/bus"
+)
+
+// handleEvent is called by the matrix Listener for each filtered incoming event.
+func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) {
+ a.logger.Debug("handling event",
+ "sender", msgCtx.SenderID,
+ "is_dm", msgCtx.IsDirectMsg,
+ "is_mention", msgCtx.IsMention,
+ "command", msgCtx.Command,
+ )
+
+ roomID := evt.RoomID.String()
+
+ // Update room context for memory tools
+ a.roomCtx.Set(roomID)
+
+ if a.cfg.Personality.Behavior.TypingIndicator {
+ _ = a.matrix.SendTyping(ctx, roomID, true)
+ defer a.matrix.SendTyping(ctx, roomID, false)
+ }
+
+ // ── Command flow ─────────────────────────────────────────────────
+ // Commands (!xxx) always resolve before rules or LLM. Never reach the LLM.
+ // Priority: built-in → unknown (agent-specific commands can be added via RegisterCommand).
+ if msgCtx.Command != "" {
+ a.logger.Info("command_received",
+ "command", msgCtx.Command,
+ "sender", msgCtx.SenderID,
+ "room", roomID,
+ "args", msgCtx.Args,
+ )
+
+ // Resolve aliases
+ cmdName := msgCtx.Command
+ if canonical, ok := a.cmdAliases[cmdName]; ok {
+ cmdName = canonical
+ }
+
+ if handler, ok := a.commands[cmdName]; ok {
+ // RBAC check for commands
+ if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) {
+ a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID)
+ _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
+ "No tienes permisos para ejecutar este comando.")
+ return
+ }
+ a.logger.Info("command_executed", "command", cmdName)
+ reply := handler(ctx, msgCtx)
+ _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply)
+ return
+ }
+
+ // Prompt-command: expand .md content and pass to LLM
+ if content, ok := a.promptCmds[cmdName]; ok {
+ a.logger.Info("prompt_command_expanded", "command", cmdName)
+ msgCtx.Content = command.ExpandPrompt(content, msgCtx.Args)
+ msgCtx.Command = ""
+ msgCtx.Args = nil
+ // Fall through to rules/LLM flow below
+ } else {
+ // Unknown command — never falls through to rules or LLM
+ a.logger.Info("command_unknown", "command", msgCtx.Command)
+ _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
+ fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command))
+ return
+ }
+ }
+
+ // ── Non-command flow ─────────────────────────────────────────────
+ // RBAC check for LLM access ("ask" action)
+ if !a.acl.CanDo(msgCtx.SenderID, "ask") {
+ a.logger.Info("ask_denied", "sender", msgCtx.SenderID)
+ _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
+ "No tienes permisos para interactuar con este agente.")
+ return
+ }
+
+ actions := decision.Evaluate(msgCtx, a.rules)
+ a.logger.Debug("rules evaluated", "matched_actions", len(actions))
+
+ // If no rules matched and the message mentions the bot or is a DM, use LLM.
+ if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) {
+ if a.llm == nil {
+ // Simple bot: no LLM, ignore non-command messages
+ a.logger.Debug("no LLM configured, ignoring non-command message")
+ return
+ }
+ a.logger.Debug("no rules matched, falling back to LLM")
+ actions = []decision.Action{{
+ Kind: decision.ActionKindLLM,
+ LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID},
+ }}
+ }
+
+ if len(actions) == 0 {
+ a.logger.Debug("no actions, ignoring message",
+ "is_dm", msgCtx.IsDirectMsg,
+ "is_mention", msgCtx.IsMention,
+ )
+ return
+ }
+
+ a.executeActions(ctx, roomID, msgCtx, actions)
+}
+
+// executeActions expands LLM actions and runs the effects runner.
+func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) {
+ // Auto-thread: if configured and message is not already in a thread,
+ // start a new thread rooted at the user's message.
+ if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" {
+ msgCtx.ThreadID = msgCtx.EventID
+ }
+
+ // Sanitize user input before sending to LLM
+ sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
+ if rejected {
+ a.runner.Execute(ctx, roomID, []decision.Action{{
+ Kind: decision.ActionKindReply,
+ Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
+ }})
+ return
+ }
+ msgCtx.Content = sanitized
+
+ // Resolve memory key: use thread root as context key when inside a thread,
+ // so parallel threads in the same room have independent conversation windows.
+ memKey := roomID
+ if msgCtx.ThreadID != "" {
+ memKey = msgCtx.ThreadID
+ }
+
+ expanded := make([]decision.Action, 0, len(actions))
+ for _, act := range actions {
+ if act.Kind == decision.ActionKindLLM {
+ if a.llm == nil {
+ a.logger.Warn("LLM action requested but no LLM configured")
+ expanded = append(expanded, decision.Action{
+ Kind: decision.ActionKindReply,
+ Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
+ })
+ continue
+ }
+ // Memory: load window + append user message before LLM call
+ a.ensureWindowLoaded(ctx, memKey)
+ a.appendToWindow(memKey, coretypes.Message{
+ Role: coretypes.RoleUser, Content: msgCtx.Content,
+ })
+ a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content)
+
+ reply, err := a.runLLM(ctx, msgCtx, memKey)
+ if err != nil {
+ a.logger.Error("llm error", "err", err)
+ expanded = append(expanded, decision.Action{
+ Kind: decision.ActionKindReply,
+ Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
+ })
+ } else {
+ expanded = append(expanded, decision.Action{
+ Kind: decision.ActionKindReply,
+ Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
+ })
+
+ // Memory: append assistant reply after LLM call
+ a.appendToWindow(memKey, coretypes.Message{
+ Role: coretypes.RoleAssistant, Content: reply,
+ })
+ a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply)
+ }
+ } else {
+ expanded = append(expanded, act)
+ }
+ }
+
+ a.runner.Execute(ctx, roomID, expanded)
+}
+
+// listenBus processes messages from the inter-agent bus.
+func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case msg, ok := <-ch:
+ if !ok {
+ return
+ }
+ if msg.Kind == bus.KindTask {
+ a.handleTaskEvent(ctx, msg)
+ }
+ }
+ }
+}
+
+// handleTaskEvent processes a task delegated by the orchestrator.
+// The bot generates a response and sends it both to Matrix and back via bus.
+func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
+ taskJSON, ok := msg.Payload["task_json"]
+ if !ok {
+ a.logger.Error("task message missing task_json payload")
+ return
+ }
+
+ task, err := orchestration.UnmarshalTaskEvent(taskJSON)
+ if err != nil {
+ a.logger.Error("failed to unmarshal task event", "err", err)
+ return
+ }
+
+ a.logger.Info("handling orchestrated task",
+ "task_id", task.TaskID,
+ "room", task.TargetRoomID,
+ "sender", task.OriginalSender,
+ "iteration", task.Iteration,
+ )
+
+ roomID := task.TargetRoomID
+
+ // Update room context for memory tools
+ a.roomCtx.Set(roomID)
+
+ if a.cfg.Personality.Behavior.TypingIndicator {
+ _ = a.matrix.SendTyping(ctx, roomID, true)
+ defer a.matrix.SendTyping(ctx, roomID, false)
+ }
+
+ // Build a synthetic MessageContext from the task
+ msgCtx := decision.MessageContext{
+ SenderID: task.OriginalSender,
+ RoomID: roomID,
+ Content: task.OriginalQuestion,
+ IsDirectMsg: false,
+ IsMention: true, // treat orchestrated tasks like mentions
+ }
+
+ // If there are previous responses, prepend context
+ if len(task.PreviousResponses) > 0 {
+ var context string
+ for _, pr := range task.PreviousResponses {
+ context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text)
+ }
+ msgCtx.Content = context + "Original question: " + task.OriginalQuestion +
+ "\n\nPlease provide an improved or complementary answer."
+ }
+
+ // Sanitize orchestrated input
+ sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
+ if rejected {
+ a.logger.Warn("orchestrated task rejected by sanitizer",
+ "task_id", task.TaskID, "sender", task.OriginalSender)
+ _ = a.matrix.SendMarkdown(ctx, roomID, "El mensaje fue rechazado por el filtro de seguridad.")
+ return
+ }
+ msgCtx.Content = sanitized
+
+ // Load memory and run LLM
+ a.ensureWindowLoaded(ctx, roomID)
+ a.appendToWindow(roomID, coretypes.Message{
+ Role: coretypes.RoleUser, Content: msgCtx.Content,
+ })
+
+ reply, err := a.runLLM(ctx, msgCtx, roomID)
+
+ // Build the result to send back via bus
+ result := orchestration.TaskResult{
+ TaskID: task.TaskID,
+ BotID: a.cfg.Agent.ID,
+ }
+
+ if err != nil {
+ a.logger.Error("LLM error during orchestrated task", "err", err)
+ result.Error = err.Error()
+ reply = "Sorry, I encountered an error."
+ } else {
+ result.Text = reply
+ // Persist assistant reply
+ a.appendToWindow(roomID, coretypes.Message{
+ Role: coretypes.RoleAssistant, Content: reply,
+ })
+ a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
+ }
+
+ // Send reply to Matrix room
+ if sendErr := a.matrix.SendMarkdown(ctx, roomID, reply); sendErr != nil {
+ a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr)
+ }
+
+ // Send result back to orchestrator via bus
+ resultJSON, marshalErr := orchestration.MarshalTaskResult(result)
+ if marshalErr != nil {
+ a.logger.Error("failed to marshal task result", "err", marshalErr)
+ return
+ }
+
+ replyMsg := bus.AgentMessage{
+ From: bus.AgentID(a.cfg.Agent.ID),
+ To: msg.From,
+ Kind: bus.KindTaskResult,
+ Payload: map[string]string{"result_json": resultJSON},
+ }
+
+ if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil {
+ a.logger.Error("failed to send task result via bus", "err", busErr)
+ }
+}
+
+// sendReply sends a markdown reply that respects thread context.
+// If threadID is non-empty, the reply is sent as part of that thread.
+func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error {
+ if threadID != "" {
+ return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown)
+ }
+ return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown)
+}
+
+// parseSeverity converts a config string to sanitize.Severity.
+func parseSeverity(s string) sanitize.Severity {
+ switch s {
+ case "high":
+ return sanitize.SeverityHigh
+ case "low":
+ return sanitize.SeverityLow
+ default:
+ return sanitize.SeverityMedium
+ }
+}
+
+// sanitizeInput runs prompt injection detection on the message content.
+// Returns the (possibly modified) content and true if the message should be rejected.
+func (a *Agent) sanitizeInput(content, roomID, senderID string) (string, bool) {
+ if a.sanitizeOpts == nil {
+ return content, false
+ }
+
+ result := sanitize.Sanitize(content, *a.sanitizeOpts)
+
+ for _, w := range result.Warnings {
+ a.logger.Warn("prompt_injection_detected",
+ "pattern", w.PatternName,
+ "severity", w.Severity,
+ "matched", w.Matched,
+ "sender", senderID,
+ "room", roomID,
+ )
+ }
+
+ return result.Output, result.Rejected
+}
diff --git a/agents/llm.go b/agents/llm.go
new file mode 100644
index 0000000..e223020
--- /dev/null
+++ b/agents/llm.go
@@ -0,0 +1,197 @@
+package agents
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "path/filepath"
+
+ "github.com/enmanuel/agents/internal/config"
+ "github.com/enmanuel/agents/pkg/command"
+ "github.com/enmanuel/agents/pkg/decision"
+ coretypes "github.com/enmanuel/agents/pkg/llm"
+ "github.com/enmanuel/agents/pkg/personality"
+ shelllm "github.com/enmanuel/agents/shell/llm"
+)
+
+// runLLM executes the LLM completion loop, including iterative tool-use.
+func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) {
+ a.logger.Debug("calling LLM",
+ "model", a.cfg.LLM.Primary.Model,
+ "provider", a.cfg.LLM.Primary.Provider,
+ )
+
+ // Load system prompt from file if configured, else use description
+ systemPrompt := a.cfg.Agent.Description
+ if spFile := a.cfg.LLM.Reasoning.SystemPromptFile; spFile != "" {
+ // Resolve path relative to agent directory
+ spPath := filepath.Join("agents", a.cfg.Agent.ID, spFile)
+ if data, err := os.ReadFile(spPath); err == nil {
+ systemPrompt = string(data)
+ } else {
+ a.logger.Warn("failed to load system_prompt_file, using description", "path", spPath, "err", err)
+ }
+ }
+
+ // Concatenate personality prompt block
+ personalityBlock := personality.BuildPersonalityPrompt(a.personality)
+ if personalityBlock != "" {
+ systemPrompt = systemPrompt + "\n\n" + personalityBlock
+ }
+
+ // Build messages: conversation history from window (includes current user msg)
+ messages := a.getWindowMessages(memKey)
+ if len(messages) == 0 {
+ // Fallback if memory is disabled: just the current message
+ messages = []coretypes.Message{
+ {Role: coretypes.RoleUser, Content: msgCtx.Content},
+ }
+ }
+
+ // Build tool specs for the LLM if tool_use is enabled
+ var llmTools []coretypes.ToolSpec
+ if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 {
+ llmTools = a.toolReg.ToLLMSpecs()
+ a.logger.Debug("tools available for LLM", "count", len(llmTools))
+ }
+
+ maxIter := a.cfg.LLM.ToolUse.MaxIterations
+ if maxIter <= 0 {
+ maxIter = defaultMaxToolIterations
+ }
+
+ // Tool-use loop: call LLM → execute tools → feed results back → repeat
+ for i := 0; i < maxIter; i++ {
+ req := coretypes.CompletionRequest{
+ Model: a.cfg.LLM.Primary.Model,
+ MaxTokens: a.cfg.LLM.Primary.MaxTokens,
+ Temperature: a.cfg.LLM.Primary.Temperature,
+ SystemPrompt: systemPrompt,
+ Messages: messages,
+ Tools: llmTools,
+ }
+
+ resp, err := a.llm(ctx, req)
+ if err != nil {
+ a.logger.Error("LLM call failed", "model", req.Model, "err", err)
+ return "", err
+ }
+
+ a.logger.Debug("LLM responded",
+ "content_len", len(resp.Content),
+ "tool_calls", len(resp.ToolCalls),
+ "finish_reason", resp.FinishReason,
+ )
+
+ // No tool calls — return the text response
+ if len(resp.ToolCalls) == 0 {
+ return resp.Content, nil
+ }
+
+ // Append assistant message with tool calls to conversation
+ messages = append(messages, coretypes.Message{
+ Role: coretypes.RoleAssistant,
+ Content: resp.Content,
+ ToolCalls: resp.ToolCalls,
+ })
+
+ // Execute each tool and append results
+ for _, tc := range resp.ToolCalls {
+ a.logger.Info("executing tool",
+ "tool", tc.Name,
+ "call_id", tc.ID,
+ )
+
+ // RBAC check for tool execution
+ if !a.acl.CanDo(msgCtx.SenderID, "tool:"+tc.Name) {
+ a.logger.Info("tool_denied", "tool", tc.Name, "sender", msgCtx.SenderID)
+ messages = append(messages, coretypes.Message{
+ Role: coretypes.RoleTool,
+ Content: "error: permission denied for tool " + tc.Name,
+ ToolCallID: tc.ID,
+ })
+ continue
+ }
+
+ // Notify the room that a tool is being called (respect thread context)
+ toolNotice := fmt.Sprintf("\U0001f528 %s", tc.Name)
+ if err := a.sendReply(ctx, msgCtx.RoomID, msgCtx.EventID, msgCtx.ThreadID, toolNotice); err != nil {
+ a.logger.Warn("failed to send tool call notice", "tool", tc.Name, "err", err)
+ }
+
+ result := a.toolReg.ExecuteForRoom(ctx, tc.Name, tc.Arguments, msgCtx.RoomID)
+
+ output := result.Output
+ if result.Err != nil {
+ output = fmt.Sprintf("error: %s", result.Err)
+ a.logger.Warn("tool execution error",
+ "tool", tc.Name,
+ "err", result.Err,
+ )
+ } else {
+ a.logger.Debug("tool executed",
+ "tool", tc.Name,
+ "output_len", len(output),
+ )
+ }
+
+ messages = append(messages, coretypes.Message{
+ Role: coretypes.RoleTool,
+ Content: output,
+ ToolCallID: tc.ID,
+ })
+ }
+ }
+
+ // Max iterations reached — return whatever we have
+ a.logger.Warn("tool-use loop reached max iterations", "max", maxIter)
+ return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil
+}
+
+// initLLM creates the LLM client function with optional fallback.
+// Returns nil when no provider is configured (command-only bot).
+func initLLM(cfg *config.AgentConfig, logger *slog.Logger) (coretypes.CompleteFunc, error) {
+ if cfg.LLM.Primary.Provider == "" {
+ logger.Info("no LLM configured, running as command-only bot")
+ return nil, nil
+ }
+
+ llmLog := logger.With("component", "llm")
+ primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary, llmLog)
+ if err != nil {
+ return nil, fmt.Errorf("primary LLM: %w", err)
+ }
+
+ llmFunc := primaryLLM
+ if cfg.LLM.Fallback.Provider != "" {
+ fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback, llmLog)
+ if err != nil {
+ logger.Warn("fallback LLM config error", "err", err)
+ } else {
+ llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM, cfg.LLM.Fallback, llmLog)
+ }
+ }
+
+ return llmFunc, nil
+}
+
+// loadPromptCommands scans the project-root prompts/ directory and loads all .md files.
+func (a *Agent) loadPromptCommands() {
+ prompts, err := command.LoadPromptCommands("prompts")
+ if err != nil {
+ a.logger.Warn("failed to load prompt-commands", "err", err)
+ return
+ }
+ a.promptCmds = make(map[string]string, len(prompts))
+ for _, p := range prompts {
+ a.promptCmds[p.Name] = p.Content
+ }
+ if len(a.promptCmds) > 0 {
+ names := make([]string, 0, len(a.promptCmds))
+ for n := range a.promptCmds {
+ names = append(names, n)
+ }
+ a.logger.Info("prompt-commands loaded", "count", len(a.promptCmds), "names", names)
+ }
+}
diff --git a/agents/memory.go b/agents/memory.go
new file mode 100644
index 0000000..b5cd058
--- /dev/null
+++ b/agents/memory.go
@@ -0,0 +1,119 @@
+package agents
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "path/filepath"
+
+ coretypes "github.com/enmanuel/agents/pkg/llm"
+ "github.com/enmanuel/agents/pkg/memory"
+ shellmem "github.com/enmanuel/agents/shell/memory"
+)
+
+// ClearWindow resets the conversation window for a room and deletes persisted
+// messages from SQLite so the agent starts fresh. Implements toolmemory.WindowClearer.
+func (a *Agent) ClearWindow(roomID string) {
+ a.windowsMu.Lock()
+ a.windows[roomID] = memory.NewWindow(a.windowSize)
+ a.windowsMu.Unlock()
+
+ if a.memStore != nil {
+ if err := a.memStore.DeleteMessages(
+ context.Background(), a.cfg.Agent.ID, &roomID,
+ ); err != nil {
+ a.logger.Warn("failed to delete persisted messages on clear", "room", roomID, "err", err)
+ }
+ }
+}
+
+// ensureWindowLoaded loads the conversation window from SQLite on first access for a room.
+func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) {
+ a.windowsMu.Lock()
+ defer a.windowsMu.Unlock()
+ if _, ok := a.windows[roomID]; ok {
+ return
+ }
+ w := memory.NewWindow(a.windowSize)
+ if a.memStore != nil {
+ msgs, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize)
+ if err != nil {
+ a.logger.Warn("failed to load message history", "room", roomID, "err", err)
+ } else {
+ for _, m := range msgs {
+ w = w.Append(coretypes.Message{Role: m.Role, Content: m.Content})
+ }
+ if len(msgs) > 0 {
+ a.logger.Debug("loaded message history", "room", roomID, "count", len(msgs))
+ }
+ }
+ }
+ a.windows[roomID] = w
+}
+
+// appendToWindow adds a message to the in-memory conversation window.
+func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) {
+ a.windowsMu.Lock()
+ defer a.windowsMu.Unlock()
+ w, ok := a.windows[roomID]
+ if !ok {
+ w = memory.NewWindow(a.windowSize)
+ }
+ a.windows[roomID] = w.Append(msg)
+}
+
+// getWindowMessages returns a copy of the conversation window for a room.
+func (a *Agent) getWindowMessages(roomID string) []coretypes.Message {
+ a.windowsMu.RLock()
+ defer a.windowsMu.RUnlock()
+ w, ok := a.windows[roomID]
+ if !ok {
+ return nil
+ }
+ return w.ToLLMMessages()
+}
+
+// persistMessage saves a message to the SQLite store (no-op if store is nil).
+func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) {
+ if a.memStore == nil {
+ return
+ }
+ if err := a.memStore.SaveMessage(ctx, memory.HistoryMessage{
+ AgentID: a.cfg.Agent.ID,
+ RoomID: roomID,
+ Role: role,
+ Content: content,
+ }); err != nil {
+ a.logger.Warn("failed to persist message", "room", roomID, "err", err)
+ }
+}
+
+// memoryInit holds the results of memory subsystem initialization.
+type memoryInit struct {
+ store memory.Store
+ windowSize int
+}
+
+// initMemoryStore creates the memory store and resolves window size from config.
+// Returns a zero-value memoryInit if memory is disabled.
+func initMemoryStore(enabled bool, windowSizeCfg int, dbPathCfg string, dataBase string, logger *slog.Logger) (memoryInit, error) {
+ if !enabled {
+ return memoryInit{windowSize: defaultWindowSize}, nil
+ }
+
+ windowSize := windowSizeCfg
+ if windowSize <= 0 {
+ windowSize = defaultWindowSize
+ }
+
+ dbPath := dbPathCfg
+ if dbPath == "" {
+ dbPath = filepath.Join(dataBase, "memory.db")
+ }
+ store, err := shellmem.New(dbPath, logger)
+ if err != nil {
+ return memoryInit{}, fmt.Errorf("memory store: %w", err)
+ }
+ logger.Info("memory enabled", "window_size", windowSize, "db", dbPath)
+ return memoryInit{store: store, windowSize: windowSize}, nil
+}
diff --git a/agents/registry_build.go b/agents/registry_build.go
new file mode 100644
index 0000000..2fcb14c
--- /dev/null
+++ b/agents/registry_build.go
@@ -0,0 +1,276 @@
+package agents
+
+import (
+ "context"
+ "log/slog"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/enmanuel/agents/internal/config"
+ "github.com/enmanuel/agents/pkg/memory"
+ shellknowledge "github.com/enmanuel/agents/shell/knowledge"
+ shellmcp "github.com/enmanuel/agents/shell/mcp"
+ shellskills "github.com/enmanuel/agents/shell/skills"
+ "github.com/enmanuel/agents/shell/ssh"
+ "github.com/enmanuel/agents/tools"
+ toolclock "github.com/enmanuel/agents/tools/clock"
+ toolfile "github.com/enmanuel/agents/tools/file"
+ toolhttp "github.com/enmanuel/agents/tools/http"
+ toolimdb "github.com/enmanuel/agents/tools/imdb"
+ toolknowledge "github.com/enmanuel/agents/tools/knowledgetools"
+ toolmatrix "github.com/enmanuel/agents/tools/matrix"
+ toolmcp "github.com/enmanuel/agents/tools/mcptools"
+ toolmemory "github.com/enmanuel/agents/tools/memorytools"
+ toolskills "github.com/enmanuel/agents/tools/skilltools"
+ toolssh "github.com/enmanuel/agents/tools/ssh"
+ toolweather "github.com/enmanuel/agents/tools/weather"
+
+ "github.com/enmanuel/agents/shell/matrix"
+)
+
+// toolDeps holds external subsystem instances needed by the tool registry.
+type toolDeps struct {
+ kStore *shellknowledge.FileStore
+ sharedKStore *shellknowledge.FileStore
+ mcpManager *shellmcp.Manager
+ skillLoader *shellskills.Loader
+ skillExecutor *shellskills.Executor
+}
+
+// initToolDeps initializes knowledge stores, MCP manager, and skills loader
+// based on the agent config. All results are optional (nil when disabled).
+func initToolDeps(cfg *config.AgentConfig, dataBase string, logger *slog.Logger) toolDeps {
+ var deps toolDeps
+
+ // Knowledge store
+ if cfg.Tools.Knowledge.Enabled {
+ knowledgeDir := cfg.Tools.Knowledge.Dir
+ if knowledgeDir == "" {
+ knowledgeDir = filepath.Join("agents", cfg.Agent.ID, "knowledge")
+ }
+ knowledgeDBPath := filepath.Join(dataBase, "knowledge.db")
+ kStore, kErr := shellknowledge.New(knowledgeDir, knowledgeDBPath, logger)
+ if kErr != nil {
+ logger.Error("knowledge_store_init_failed", "err", kErr)
+ } else {
+ if syncErr := kStore.Sync(context.Background()); syncErr != nil {
+ logger.Error("knowledge_sync_failed", "err", syncErr)
+ }
+ deps.kStore = kStore
+ }
+ }
+
+ // Shared knowledge store
+ if cfg.Tools.SharedKnowledge.Enabled {
+ sharedDir := cfg.Tools.SharedKnowledge.Dir
+ if sharedDir == "" {
+ sharedDir = "knowledges"
+ }
+ sharedDBPath := cfg.Tools.SharedKnowledge.DBPath
+ if sharedDBPath == "" {
+ sharedDBPath = "knowledges/data/knowledge.db"
+ }
+ sharedKStore, skErr := shellknowledge.New(sharedDir, sharedDBPath, logger)
+ if skErr != nil {
+ logger.Error("shared_knowledge_store_init_failed", "err", skErr)
+ } else {
+ if syncErr := sharedKStore.Sync(context.Background()); syncErr != nil {
+ logger.Error("shared_knowledge_sync_failed", "err", syncErr)
+ }
+ logger.Info("shared knowledge enabled", "dir", sharedDir, "db", sharedDBPath)
+ deps.sharedKStore = sharedKStore
+ }
+ }
+
+ // MCP client manager — connects to external MCP servers
+ if cfg.Tools.MCP.Enabled && len(cfg.Tools.MCP.Servers) > 0 {
+ mcpManager, mcpErr := shellmcp.NewManager(context.Background(), cfg.Tools.MCP.Servers, logger)
+ if mcpErr != nil {
+ logger.Error("mcp_manager_init_failed", "err", mcpErr)
+ } else {
+ logger.Info("mcp manager initialized", "servers", len(cfg.Tools.MCP.Servers))
+ deps.mcpManager = mcpManager
+ }
+ }
+
+ // Skills loader
+ if cfg.Skills.Enabled {
+ skillsPath := cfg.Skills.SkillsPath
+ if skillsPath == "" {
+ skillsPath = "skills/"
+ }
+ deps.skillLoader = shellskills.NewLoader(skillsPath)
+
+ // Skills executor for scripts
+ allowedInterpreters := cfg.Tools.Skills.AllowedInterpreters
+ timeout := cfg.Skills.Timeout
+ if timeout == 0 {
+ timeout = 60 * time.Second
+ }
+ deps.skillExecutor = shellskills.NewExecutor(allowedInterpreters, timeout)
+ logger.Info("skills enabled", "path", skillsPath, "categories", cfg.Skills.Categories)
+ }
+
+ return deps
+}
+
+// initRateLimiter configures the rate limiter on the tool registry if enabled.
+func initRateLimiter(cfg *config.AgentConfig, toolReg *tools.Registry, logger *slog.Logger) {
+ if !cfg.Security.ToolRateLimit.Enabled {
+ return
+ }
+ maxCalls := cfg.Security.ToolRateLimit.MaxCallsPerMin
+ if maxCalls <= 0 {
+ maxCalls = 10
+ }
+ rl := tools.NewRateLimiter(maxCalls, time.Minute)
+ toolReg.SetRateLimiter(rl)
+
+ cleanupInterval := cfg.Security.ToolRateLimit.CleanupIntervalS
+ if cleanupInterval <= 0 {
+ cleanupInterval = 60
+ }
+ go func() {
+ ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Second)
+ defer ticker.Stop()
+ for range ticker.C {
+ rl.Cleanup()
+ }
+ }()
+ logger.Info("tool rate limiting enabled", "max_calls_per_min", maxCalls)
+}
+
+// buildToolRegistry creates a Registry with tools enabled in the agent's config.
+func buildToolRegistry(
+ cfg *config.AgentConfig,
+ sshExec *ssh.Executor,
+ matrixClient *matrix.Client,
+ memStore memory.Store,
+ kStore *shellknowledge.FileStore,
+ sharedKStore *shellknowledge.FileStore,
+ mcpManager *shellmcp.Manager,
+ skillLoader *shellskills.Loader,
+ skillExecutor *shellskills.Executor,
+ roomCtx *toolmemory.RoomContext,
+ logger *slog.Logger,
+) *tools.Registry {
+ reg := tools.NewRegistry(logger)
+
+ if cfg.Tools.HTTP.Enabled {
+ reg.Register(toolhttp.NewHTTPGet(cfg.Tools.HTTP))
+ reg.Register(toolhttp.NewHTTPPost(cfg.Tools.HTTP))
+ logger.Debug("registered http tools")
+ }
+
+ if cfg.Tools.SSH.Enabled {
+ reg.Register(toolssh.NewSSHCommand(cfg.Tools.SSH, sshExec))
+ logger.Debug("registered ssh tool")
+ }
+
+ if cfg.Tools.FileOps.Enabled {
+ reg.Register(toolfile.NewReadFile(cfg.Tools.FileOps))
+ reg.Register(toolfile.NewListDirectory(cfg.Tools.FileOps))
+ if !cfg.Tools.FileOps.ReadOnly {
+ reg.Register(toolfile.NewWriteFile(cfg.Tools.FileOps))
+ reg.Register(toolfile.NewAppendFile(cfg.Tools.FileOps))
+ reg.Register(toolfile.NewDeleteFile(cfg.Tools.FileOps))
+ }
+ logger.Debug("registered file tools")
+ }
+
+ // current_time is always available
+ reg.Register(toolclock.NewCurrentTime())
+ logger.Debug("registered current_time tool")
+
+ // weather tool is always available
+ reg.Register(toolweather.NewWeather())
+ logger.Debug("registered weather tool")
+
+ // imdb tool (enabled via config)
+ if cfg.Tools.IMDb.Enabled {
+ reg.Register(toolimdb.NewIMDbSearch(cfg.Tools.IMDb))
+ logger.Debug("registered imdb tool")
+ }
+
+ // matrix_send is always available
+ reg.Register(toolmatrix.NewMatrixSend(matrixClient, cfg.Tools.Matrix))
+ logger.Debug("registered matrix tool")
+
+ // Memory tools (memory_clear_context registered later since it needs the Agent)
+ if cfg.Tools.Memory.Enabled && memStore != nil {
+ reg.Register(toolmemory.NewMemorySave(cfg.Agent.ID, memStore))
+ reg.Register(toolmemory.NewMemoryRecall(cfg.Agent.ID, memStore))
+ reg.Register(toolmemory.NewMemoryForget(cfg.Agent.ID, memStore))
+ reg.Register(toolmemory.NewMemorySummary(cfg.Agent.ID, memStore))
+ logger.Debug("registered memory tools")
+ }
+
+ // Knowledge tools
+ if cfg.Tools.Knowledge.Enabled && kStore != nil {
+ reg.Register(toolknowledge.NewKnowledgeSearch(kStore))
+ reg.Register(toolknowledge.NewKnowledgeRead(kStore))
+ reg.Register(toolknowledge.NewKnowledgeWrite(kStore))
+ reg.Register(toolknowledge.NewKnowledgeList(kStore))
+ logger.Debug("registered knowledge tools")
+ }
+
+ // Shared knowledge tools
+ if cfg.Tools.SharedKnowledge.Enabled && sharedKStore != nil {
+ sharedTools := toolknowledge.NewSharedKnowledgeTools(sharedKStore)
+ for _, tool := range sharedTools {
+ reg.Register(tool)
+ }
+ logger.Debug("registered shared knowledge tools", "count", len(sharedTools))
+ }
+
+ // MCP tools — register tools from all connected MCP servers
+ if mcpManager != nil {
+ for serverName, mcpClient := range mcpManager.AllClients() {
+ // Find the config for this server to get prefix, filter, timeout
+ var serverCfg *config.MCPServerCfg
+ for i := range cfg.Tools.MCP.Servers {
+ if cfg.Tools.MCP.Servers[i].Name == serverName {
+ serverCfg = &cfg.Tools.MCP.Servers[i]
+ break
+ }
+ }
+ if serverCfg == nil {
+ logger.Warn("no config found for MCP server", "name", serverName)
+ continue
+ }
+
+ // Convert and register MCP tools
+ mcpTools := toolmcp.FromMCPServer(mcpClient, serverCfg.Prefix, serverCfg.Tools, serverCfg.Timeout, logger)
+ for _, tool := range mcpTools {
+ reg.Register(tool)
+ }
+ logger.Debug("registered MCP tools", "server", serverName, "count", len(mcpTools))
+ }
+ }
+
+ // Skills tools — register skill search, load, read, and run tools
+ if skillLoader != nil {
+ reg.Register(toolskills.NewSkillSearch(skillLoader, cfg.Skills.Categories))
+ reg.Register(toolskills.NewSkillLoad(skillLoader))
+ reg.Register(toolskills.NewSkillReadResource(skillLoader))
+ if skillExecutor != nil {
+ reg.Register(toolskills.NewSkillRunScript(skillLoader, skillExecutor))
+ }
+ logger.Debug("registered skills tools")
+ }
+
+ return reg
+}
+
+// resolveDataBase returns the base directory for agent runtime data.
+// Priority: config storage.base_path > $AGENTS_DATA_DIR/ > agents//data
+func resolveDataBase(cfg *config.AgentConfig) string {
+ if cfg.Storage.BasePath != "" {
+ return cfg.Storage.BasePath
+ }
+ if envDir := os.Getenv("AGENTS_DATA_DIR"); envDir != "" {
+ return filepath.Join(envDir, cfg.Agent.ID)
+ }
+ return filepath.Join("agents", cfg.Agent.ID, "data")
+}
diff --git a/agents/runtime.go b/agents/runtime.go
index b541331..43786b7 100644
--- a/agents/runtime.go
+++ b/agents/runtime.go
@@ -20,31 +20,18 @@ import (
"github.com/enmanuel/agents/pkg/decision"
coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/pkg/memory"
- "github.com/enmanuel/agents/pkg/orchestration"
"github.com/enmanuel/agents/pkg/personality"
"github.com/enmanuel/agents/pkg/sanitize"
"github.com/enmanuel/agents/shell/bus"
shellcron "github.com/enmanuel/agents/shell/cron"
"github.com/enmanuel/agents/shell/effects"
shellknowledge "github.com/enmanuel/agents/shell/knowledge"
- shelllm "github.com/enmanuel/agents/shell/llm"
"github.com/enmanuel/agents/shell/matrix"
shellmcp "github.com/enmanuel/agents/shell/mcp"
- shellmem "github.com/enmanuel/agents/shell/memory"
shellskills "github.com/enmanuel/agents/shell/skills"
"github.com/enmanuel/agents/shell/ssh"
"github.com/enmanuel/agents/tools"
- toolclock "github.com/enmanuel/agents/tools/clock"
- toolfile "github.com/enmanuel/agents/tools/file"
- toolhttp "github.com/enmanuel/agents/tools/http"
- toolimdb "github.com/enmanuel/agents/tools/imdb"
- toolknowledge "github.com/enmanuel/agents/tools/knowledgetools"
- toolmatrix "github.com/enmanuel/agents/tools/matrix"
- toolmcp "github.com/enmanuel/agents/tools/mcptools"
toolmemory "github.com/enmanuel/agents/tools/memorytools"
- toolskills "github.com/enmanuel/agents/tools/skilltools"
- toolssh "github.com/enmanuel/agents/tools/ssh"
- toolweather "github.com/enmanuel/agents/tools/weather"
)
const (
@@ -77,10 +64,10 @@ type Agent struct {
acl acl.ACL
// Commands — handlers keyed by canonical name; cmdAliases maps alias → canonical
- commands map[string]CommandHandler
- cmdAliases map[string]string // alias → canonical name
- customSpecs []command.Spec // specs from RegisterCommand (for !help)
- startTime time.Time
+ commands map[string]CommandHandler
+ cmdAliases map[string]string // alias → canonical name
+ customSpecs []command.Spec // specs from RegisterCommand (for !help)
+ startTime time.Time
// Memory
windows map[string]memory.Window
@@ -111,22 +98,6 @@ type Agent struct {
scheduler *shellcron.Scheduler
}
-// ClearWindow resets the conversation window for a room and deletes persisted
-// messages from SQLite so the agent starts fresh. Implements toolmemory.WindowClearer.
-func (a *Agent) ClearWindow(roomID string) {
- a.windowsMu.Lock()
- a.windows[roomID] = memory.NewWindow(a.windowSize)
- a.windowsMu.Unlock()
-
- if a.memStore != nil {
- if err := a.memStore.DeleteMessages(
- context.Background(), a.cfg.Agent.ID, &roomID,
- ); err != nil {
- a.logger.Warn("failed to delete persisted messages on clear", "room", roomID, "err", err)
- }
- }
-}
-
// New assembles an Agent from its config, rules, pre-resolved ACL, and logger.
// The ACL is resolved externally (e.g. from security/ YAML files) and injected here.
// Pass acl.ACL{} (empty) for open access (no restrictions).
@@ -138,196 +109,46 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge
}
// E2EE — initialize before the sync loop starts
- var cryptoStore io.Closer
- if cfg.Matrix.Encryption.Enabled {
- storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db")
- pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv)
- logger.Info("initializing e2ee", "store", storePath)
- cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
- if err != nil {
- return nil, fmt.Errorf("e2ee init: %w", err)
- }
-
- // Auto-fetch cross-signing private keys from SSSS if recovery key is configured.
- if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" {
- if rk := os.Getenv(envName); rk != "" {
- if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil {
- logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err)
- } else {
- logger.Info("cross-signing private keys fetched from SSSS")
- }
- }
- }
-
- // Sign own device with the self-signing key so Element shows it as verified.
- if err := matrixClient.SignOwnDevice(context.Background()); err != nil {
- logger.Warn("failed to sign own device (non-fatal)", "err", err)
- } else {
- logger.Info("own device signed with cross-signing key")
- }
-
- logger.Info("e2ee ready")
+ cryptoStore, err := initCrypto(cfg, matrixClient, logger)
+ if err != nil {
+ return nil, err
}
// SSH executor
sshExec := ssh.NewExecutor(cfg.SSH, logger)
// LLM client — optional; if no provider is configured, the agent runs as simple_bot
- var llmFunc coretypes.CompleteFunc
- if cfg.LLM.Primary.Provider != "" {
- llmLog := logger.With("component", "llm")
- primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary, llmLog)
- if err != nil {
- return nil, fmt.Errorf("primary LLM: %w", err)
- }
-
- llmFunc = primaryLLM
- if cfg.LLM.Fallback.Provider != "" {
- fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback, llmLog)
- if err != nil {
- logger.Warn("fallback LLM config error", "err", err)
- } else {
- llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM, cfg.LLM.Fallback, llmLog)
- }
- }
- } else {
- logger.Info("no LLM configured, running as command-only bot")
+ llmFunc, err := initLLM(cfg, logger)
+ if err != nil {
+ return nil, err
}
// Effects runner
runner := effects.NewRunner(matrixClient, sshExec, logger)
- // Resolve base data path for this agent.
- // Priority: config storage.base_path > $AGENTS_DATA_DIR/ > agents//data
+ // Resolve base data path for this agent
dataBase := resolveDataBase(cfg)
logger.Debug("data base path", "path", dataBase)
// Memory subsystem
- var memStore memory.Store
- windowSize := defaultWindowSize
- roomCtx := &toolmemory.RoomContext{}
-
- if cfg.Memory.Enabled {
- windowSize = cfg.Memory.WindowSize
- if windowSize <= 0 {
- windowSize = defaultWindowSize
- }
-
- dbPath := cfg.Memory.DBPath
- if dbPath == "" {
- dbPath = filepath.Join(dataBase, "memory.db")
- }
- store, err := shellmem.New(dbPath, logger)
- if err != nil {
- return nil, fmt.Errorf("memory store: %w", err)
- }
- memStore = store
- logger.Info("memory enabled", "window_size", windowSize, "db", dbPath)
+ memInit, err := initMemoryStore(cfg.Memory.Enabled, cfg.Memory.WindowSize, cfg.Memory.DBPath, dataBase, logger)
+ if err != nil {
+ return nil, err
}
- // Knowledge store
- var kStore *shellknowledge.FileStore
- if cfg.Tools.Knowledge.Enabled {
- knowledgeDir := cfg.Tools.Knowledge.Dir
- if knowledgeDir == "" {
- knowledgeDir = filepath.Join("agents", cfg.Agent.ID, "knowledge")
- }
- knowledgeDBPath := filepath.Join(dataBase, "knowledge.db")
- var kErr error
- kStore, kErr = shellknowledge.New(knowledgeDir, knowledgeDBPath, logger)
- if kErr != nil {
- logger.Error("knowledge_store_init_failed", "err", kErr)
- } else {
- if syncErr := kStore.Sync(context.Background()); syncErr != nil {
- logger.Error("knowledge_sync_failed", "err", syncErr)
- }
- }
- }
-
- // Shared knowledge store
- var sharedKStore *shellknowledge.FileStore
- if cfg.Tools.SharedKnowledge.Enabled {
- sharedDir := cfg.Tools.SharedKnowledge.Dir
- if sharedDir == "" {
- sharedDir = "knowledges"
- }
- sharedDBPath := cfg.Tools.SharedKnowledge.DBPath
- if sharedDBPath == "" {
- sharedDBPath = "knowledges/data/knowledge.db"
- }
- var skErr error
- sharedKStore, skErr = shellknowledge.New(sharedDir, sharedDBPath, logger)
- if skErr != nil {
- logger.Error("shared_knowledge_store_init_failed", "err", skErr)
- } else {
- if syncErr := sharedKStore.Sync(context.Background()); syncErr != nil {
- logger.Error("shared_knowledge_sync_failed", "err", syncErr)
- }
- logger.Info("shared knowledge enabled", "dir", sharedDir, "db", sharedDBPath)
- }
- }
+ // Tool dependencies (knowledge, MCP, skills)
+ deps := initToolDeps(cfg, dataBase, logger)
if !agentACL.Empty() {
logger.Info("acl enabled (centralized security policy)")
}
- // MCP client manager — connects to external MCP servers
- var mcpManager *shellmcp.Manager
- if cfg.Tools.MCP.Enabled && len(cfg.Tools.MCP.Servers) > 0 {
- var mcpErr error
- mcpManager, mcpErr = shellmcp.NewManager(context.Background(), cfg.Tools.MCP.Servers, logger)
- if mcpErr != nil {
- logger.Error("mcp_manager_init_failed", "err", mcpErr)
- } else {
- logger.Info("mcp manager initialized", "servers", len(cfg.Tools.MCP.Servers))
- }
- }
-
- // Skills loader
- var skillLoader *shellskills.Loader
- var skillExecutor *shellskills.Executor
- if cfg.Skills.Enabled {
- skillsPath := cfg.Skills.SkillsPath
- if skillsPath == "" {
- skillsPath = "skills/"
- }
- skillLoader = shellskills.NewLoader(skillsPath)
-
- // Skills executor for scripts
- allowedInterpreters := cfg.Tools.Skills.AllowedInterpreters
- timeout := cfg.Skills.Timeout
- if timeout == 0 {
- timeout = 60 * time.Second
- }
- skillExecutor = shellskills.NewExecutor(allowedInterpreters, timeout)
- logger.Info("skills enabled", "path", skillsPath, "categories", cfg.Skills.Categories)
- }
-
// Tool registry — register tools enabled in config
- toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memStore, kStore, sharedKStore, mcpManager, skillLoader, skillExecutor, roomCtx, logger)
+ roomCtx := &toolmemory.RoomContext{}
+ toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memInit.store, deps.kStore, deps.sharedKStore, deps.mcpManager, deps.skillLoader, deps.skillExecutor, roomCtx, logger)
// Rate limiting for tools
- if cfg.Security.ToolRateLimit.Enabled {
- maxCalls := cfg.Security.ToolRateLimit.MaxCallsPerMin
- if maxCalls <= 0 {
- maxCalls = 10
- }
- rl := tools.NewRateLimiter(maxCalls, time.Minute)
- toolReg.SetRateLimiter(rl)
-
- cleanupInterval := cfg.Security.ToolRateLimit.CleanupIntervalS
- if cleanupInterval <= 0 {
- cleanupInterval = 60
- }
- go func() {
- ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Second)
- defer ticker.Stop()
- for range ticker.C {
- rl.Cleanup()
- }
- }()
- logger.Info("tool rate limiting enabled", "max_calls_per_min", maxCalls)
- }
+ initRateLimiter(cfg, toolReg, logger)
a := &Agent{
cfg: cfg,
@@ -340,17 +161,17 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge
toolReg: toolReg,
logger: logger,
cryptoStore: cryptoStore,
- mcpManager: mcpManager,
+ mcpManager: deps.mcpManager,
done: make(chan struct{}),
commands: make(map[string]CommandHandler),
cmdAliases: command.BuiltinNames(),
startTime: time.Now(),
windows: make(map[string]memory.Window),
- memStore: memStore,
- knowledgeStore: kStore,
- sharedKnowledgeStore: sharedKStore,
- skillLoader: skillLoader,
- windowSize: windowSize,
+ memStore: memInit.store,
+ knowledgeStore: deps.kStore,
+ sharedKnowledgeStore: deps.sharedKStore,
+ skillLoader: deps.skillLoader,
+ windowSize: memInit.windowSize,
roomCtx: roomCtx,
}
@@ -375,7 +196,7 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge
a.loadPromptCommands()
// Register memory_clear_context with self as WindowClearer (after a is created)
- if cfg.Tools.Memory.Enabled && memStore != nil {
+ if cfg.Tools.Memory.Enabled && memInit.store != nil {
toolReg.Register(toolmemory.NewMemoryClearContext(a, roomCtx))
}
@@ -391,6 +212,43 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge
return a, nil
}
+// initCrypto initializes E2EE if enabled and returns the crypto store closer.
+func initCrypto(cfg *config.AgentConfig, matrixClient *matrix.Client, logger *slog.Logger) (io.Closer, error) {
+ if !cfg.Matrix.Encryption.Enabled {
+ return nil, nil
+ }
+
+ storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db")
+ pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv)
+ logger.Info("initializing e2ee", "store", storePath)
+
+ cryptoStore, err := matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
+ if err != nil {
+ return nil, fmt.Errorf("e2ee init: %w", err)
+ }
+
+ // Auto-fetch cross-signing private keys from SSSS if recovery key is configured.
+ if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" {
+ if rk := os.Getenv(envName); rk != "" {
+ if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil {
+ logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err)
+ } else {
+ logger.Info("cross-signing private keys fetched from SSSS")
+ }
+ }
+ }
+
+ // Sign own device with the self-signing key so Element shows it as verified.
+ if err := matrixClient.SignOwnDevice(context.Background()); err != nil {
+ logger.Warn("failed to sign own device (non-fatal)", "err", err)
+ } else {
+ logger.Info("own device signed with cross-signing key")
+ }
+
+ logger.Info("e2ee ready")
+ return cryptoStore, nil
+}
+
// RegisterCommand adds a custom command handler for this agent.
// The spec provides metadata (aliases, description, usage) for !help.
// Must be called before Run().
@@ -404,26 +262,6 @@ func (a *Agent) RegisterCommand(spec command.Spec, handler CommandHandler) {
a.logger.Info("command_registered", "command", spec.Name, "aliases", spec.Aliases)
}
-// loadPromptCommands scans the project-root prompts/ directory and loads all .md files.
-func (a *Agent) loadPromptCommands() {
- prompts, err := command.LoadPromptCommands("prompts")
- if err != nil {
- a.logger.Warn("failed to load prompt-commands", "err", err)
- return
- }
- a.promptCmds = make(map[string]string, len(prompts))
- for _, p := range prompts {
- a.promptCmds[p.Name] = p.Content
- }
- if len(a.promptCmds) > 0 {
- names := make([]string, 0, len(a.promptCmds))
- for n := range a.promptCmds {
- names = append(names, n)
- }
- a.logger.Info("prompt-commands loaded", "count", len(a.promptCmds), "names", names)
- }
-}
-
// SetBus attaches the agent to the inter-agent bus for orchestration.
// Must be called before Run().
func (a *Agent) SetBus(b *bus.Bus) {
@@ -510,679 +348,3 @@ func (a *Agent) Run(ctx context.Context) error {
return a.listener.Run(ctx)
}
-
-// listenBus processes messages from the inter-agent bus.
-func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) {
- for {
- select {
- case <-ctx.Done():
- return
- case msg, ok := <-ch:
- if !ok {
- return
- }
- if msg.Kind == bus.KindTask {
- a.handleTaskEvent(ctx, msg)
- }
- }
- }
-}
-
-// handleTaskEvent processes a task delegated by the orchestrator.
-// The bot generates a response and sends it both to Matrix and back via bus.
-func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
- taskJSON, ok := msg.Payload["task_json"]
- if !ok {
- a.logger.Error("task message missing task_json payload")
- return
- }
-
- task, err := orchestration.UnmarshalTaskEvent(taskJSON)
- if err != nil {
- a.logger.Error("failed to unmarshal task event", "err", err)
- return
- }
-
- a.logger.Info("handling orchestrated task",
- "task_id", task.TaskID,
- "room", task.TargetRoomID,
- "sender", task.OriginalSender,
- "iteration", task.Iteration,
- )
-
- roomID := task.TargetRoomID
-
- // Update room context for memory tools
- a.roomCtx.Set(roomID)
-
- if a.cfg.Personality.Behavior.TypingIndicator {
- _ = a.matrix.SendTyping(ctx, roomID, true)
- defer a.matrix.SendTyping(ctx, roomID, false)
- }
-
- // Build a synthetic MessageContext from the task
- msgCtx := decision.MessageContext{
- SenderID: task.OriginalSender,
- RoomID: roomID,
- Content: task.OriginalQuestion,
- IsDirectMsg: false,
- IsMention: true, // treat orchestrated tasks like mentions
- }
-
- // If there are previous responses, prepend context
- if len(task.PreviousResponses) > 0 {
- var context string
- for _, pr := range task.PreviousResponses {
- context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text)
- }
- msgCtx.Content = context + "Original question: " + task.OriginalQuestion +
- "\n\nPlease provide an improved or complementary answer."
- }
-
- // Sanitize orchestrated input
- sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
- if rejected {
- a.logger.Warn("orchestrated task rejected by sanitizer",
- "task_id", task.TaskID, "sender", task.OriginalSender)
- _ = a.matrix.SendMarkdown(ctx, roomID, "El mensaje fue rechazado por el filtro de seguridad.")
- return
- }
- msgCtx.Content = sanitized
-
- // Load memory and run LLM
- a.ensureWindowLoaded(ctx, roomID)
- a.appendToWindow(roomID, coretypes.Message{
- Role: coretypes.RoleUser, Content: msgCtx.Content,
- })
-
- reply, err := a.runLLM(ctx, msgCtx, roomID)
-
- // Build the result to send back via bus
- result := orchestration.TaskResult{
- TaskID: task.TaskID,
- BotID: a.cfg.Agent.ID,
- }
-
- if err != nil {
- a.logger.Error("LLM error during orchestrated task", "err", err)
- result.Error = err.Error()
- reply = "Sorry, I encountered an error."
- } else {
- result.Text = reply
- // Persist assistant reply
- a.appendToWindow(roomID, coretypes.Message{
- Role: coretypes.RoleAssistant, Content: reply,
- })
- a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
- }
-
- // Send reply to Matrix room
- if sendErr := a.matrix.SendMarkdown(ctx, roomID, reply); sendErr != nil {
- a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr)
- }
-
- // Send result back to orchestrator via bus
- resultJSON, marshalErr := orchestration.MarshalTaskResult(result)
- if marshalErr != nil {
- a.logger.Error("failed to marshal task result", "err", marshalErr)
- return
- }
-
- replyMsg := bus.AgentMessage{
- From: bus.AgentID(a.cfg.Agent.ID),
- To: msg.From,
- Kind: bus.KindTaskResult,
- Payload: map[string]string{"result_json": resultJSON},
- }
-
- if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil {
- a.logger.Error("failed to send task result via bus", "err", busErr)
- }
-}
-
-// handleEvent is called by the matrix Listener for each filtered incoming event.
-func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) {
- a.logger.Debug("handling event",
- "sender", msgCtx.SenderID,
- "is_dm", msgCtx.IsDirectMsg,
- "is_mention", msgCtx.IsMention,
- "command", msgCtx.Command,
- )
-
- roomID := evt.RoomID.String()
-
- // Update room context for memory tools
- a.roomCtx.Set(roomID)
-
- if a.cfg.Personality.Behavior.TypingIndicator {
- _ = a.matrix.SendTyping(ctx, roomID, true)
- defer a.matrix.SendTyping(ctx, roomID, false)
- }
-
- // ── Command flow ─────────────────────────────────────────────────
- // Commands (!xxx) always resolve before rules or LLM. Never reach the LLM.
- // Priority: built-in → unknown (agent-specific commands can be added via RegisterCommand).
- if msgCtx.Command != "" {
- a.logger.Info("command_received",
- "command", msgCtx.Command,
- "sender", msgCtx.SenderID,
- "room", roomID,
- "args", msgCtx.Args,
- )
-
- // Resolve aliases
- cmdName := msgCtx.Command
- if canonical, ok := a.cmdAliases[cmdName]; ok {
- cmdName = canonical
- }
-
- if handler, ok := a.commands[cmdName]; ok {
- // RBAC check for commands
- if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) {
- a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID)
- _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
- "No tienes permisos para ejecutar este comando.")
- return
- }
- a.logger.Info("command_executed", "command", cmdName)
- reply := handler(ctx, msgCtx)
- _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply)
- return
- }
-
- // Prompt-command: expand .md content and pass to LLM
- if content, ok := a.promptCmds[cmdName]; ok {
- a.logger.Info("prompt_command_expanded", "command", cmdName)
- msgCtx.Content = command.ExpandPrompt(content, msgCtx.Args)
- msgCtx.Command = ""
- msgCtx.Args = nil
- // Fall through to rules/LLM flow below
- } else {
- // Unknown command — never falls through to rules or LLM
- a.logger.Info("command_unknown", "command", msgCtx.Command)
- _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
- fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command))
- return
- }
- }
-
- // ── Non-command flow ─────────────────────────────────────────────
- // RBAC check for LLM access ("ask" action)
- if !a.acl.CanDo(msgCtx.SenderID, "ask") {
- a.logger.Info("ask_denied", "sender", msgCtx.SenderID)
- _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
- "No tienes permisos para interactuar con este agente.")
- return
- }
-
- actions := decision.Evaluate(msgCtx, a.rules)
- a.logger.Debug("rules evaluated", "matched_actions", len(actions))
-
- // If no rules matched and the message mentions the bot or is a DM, use LLM.
- if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) {
- if a.llm == nil {
- // Simple bot: no LLM, ignore non-command messages
- a.logger.Debug("no LLM configured, ignoring non-command message")
- return
- }
- a.logger.Debug("no rules matched, falling back to LLM")
- actions = []decision.Action{{
- Kind: decision.ActionKindLLM,
- LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID},
- }}
- }
-
- if len(actions) == 0 {
- a.logger.Debug("no actions, ignoring message",
- "is_dm", msgCtx.IsDirectMsg,
- "is_mention", msgCtx.IsMention,
- )
- return
- }
-
- a.executeActions(ctx, roomID, msgCtx, actions)
-}
-
-// executeActions expands LLM actions and runs the effects runner.
-func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) {
- // Auto-thread: if configured and message is not already in a thread,
- // start a new thread rooted at the user's message.
- if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" {
- msgCtx.ThreadID = msgCtx.EventID
- }
-
- // Sanitize user input before sending to LLM
- sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
- if rejected {
- a.runner.Execute(ctx, roomID, []decision.Action{{
- Kind: decision.ActionKindReply,
- Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
- }})
- return
- }
- msgCtx.Content = sanitized
-
- // Resolve memory key: use thread root as context key when inside a thread,
- // so parallel threads in the same room have independent conversation windows.
- memKey := roomID
- if msgCtx.ThreadID != "" {
- memKey = msgCtx.ThreadID
- }
-
- expanded := make([]decision.Action, 0, len(actions))
- for _, act := range actions {
- if act.Kind == decision.ActionKindLLM {
- if a.llm == nil {
- a.logger.Warn("LLM action requested but no LLM configured")
- expanded = append(expanded, decision.Action{
- Kind: decision.ActionKindReply,
- Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
- })
- continue
- }
- // Memory: load window + append user message before LLM call
- a.ensureWindowLoaded(ctx, memKey)
- a.appendToWindow(memKey, coretypes.Message{
- Role: coretypes.RoleUser, Content: msgCtx.Content,
- })
- a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content)
-
- reply, err := a.runLLM(ctx, msgCtx, memKey)
- if err != nil {
- a.logger.Error("llm error", "err", err)
- expanded = append(expanded, decision.Action{
- Kind: decision.ActionKindReply,
- Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
- })
- } else {
- expanded = append(expanded, decision.Action{
- Kind: decision.ActionKindReply,
- Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
- })
-
- // Memory: append assistant reply after LLM call
- a.appendToWindow(memKey, coretypes.Message{
- Role: coretypes.RoleAssistant, Content: reply,
- })
- a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply)
- }
- } else {
- expanded = append(expanded, act)
- }
- }
-
- a.runner.Execute(ctx, roomID, expanded)
-}
-
-func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) {
- a.logger.Debug("calling LLM",
- "model", a.cfg.LLM.Primary.Model,
- "provider", a.cfg.LLM.Primary.Provider,
- )
-
- // Load system prompt from file if configured, else use description
- systemPrompt := a.cfg.Agent.Description
- if spFile := a.cfg.LLM.Reasoning.SystemPromptFile; spFile != "" {
- // Resolve path relative to agent directory
- spPath := filepath.Join("agents", a.cfg.Agent.ID, spFile)
- if data, err := os.ReadFile(spPath); err == nil {
- systemPrompt = string(data)
- } else {
- a.logger.Warn("failed to load system_prompt_file, using description", "path", spPath, "err", err)
- }
- }
-
- // Concatenate personality prompt block
- personalityBlock := personality.BuildPersonalityPrompt(a.personality)
- if personalityBlock != "" {
- systemPrompt = systemPrompt + "\n\n" + personalityBlock
- }
-
- // Build messages: conversation history from window (includes current user msg)
- messages := a.getWindowMessages(memKey)
- if len(messages) == 0 {
- // Fallback if memory is disabled: just the current message
- messages = []coretypes.Message{
- {Role: coretypes.RoleUser, Content: msgCtx.Content},
- }
- }
-
- // Build tool specs for the LLM if tool_use is enabled
- var llmTools []coretypes.ToolSpec
- if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 {
- llmTools = a.toolReg.ToLLMSpecs()
- a.logger.Debug("tools available for LLM", "count", len(llmTools))
- }
-
- maxIter := a.cfg.LLM.ToolUse.MaxIterations
- if maxIter <= 0 {
- maxIter = defaultMaxToolIterations
- }
-
- // Tool-use loop: call LLM → execute tools → feed results back → repeat
- for i := 0; i < maxIter; i++ {
- req := coretypes.CompletionRequest{
- Model: a.cfg.LLM.Primary.Model,
- MaxTokens: a.cfg.LLM.Primary.MaxTokens,
- Temperature: a.cfg.LLM.Primary.Temperature,
- SystemPrompt: systemPrompt,
- Messages: messages,
- Tools: llmTools,
- }
-
- resp, err := a.llm(ctx, req)
- if err != nil {
- a.logger.Error("LLM call failed", "model", req.Model, "err", err)
- return "", err
- }
-
- a.logger.Debug("LLM responded",
- "content_len", len(resp.Content),
- "tool_calls", len(resp.ToolCalls),
- "finish_reason", resp.FinishReason,
- )
-
- // No tool calls — return the text response
- if len(resp.ToolCalls) == 0 {
- return resp.Content, nil
- }
-
- // Append assistant message with tool calls to conversation
- messages = append(messages, coretypes.Message{
- Role: coretypes.RoleAssistant,
- Content: resp.Content,
- ToolCalls: resp.ToolCalls,
- })
-
- // Execute each tool and append results
- for _, tc := range resp.ToolCalls {
- a.logger.Info("executing tool",
- "tool", tc.Name,
- "call_id", tc.ID,
- )
-
- // RBAC check for tool execution
- if !a.acl.CanDo(msgCtx.SenderID, "tool:"+tc.Name) {
- a.logger.Info("tool_denied", "tool", tc.Name, "sender", msgCtx.SenderID)
- messages = append(messages, coretypes.Message{
- Role: coretypes.RoleTool,
- Content: "error: permission denied for tool " + tc.Name,
- ToolCallID: tc.ID,
- })
- continue
- }
-
- // Notify the room that a tool is being called (respect thread context)
- toolNotice := fmt.Sprintf("🔨 %s", tc.Name)
- if err := a.sendReply(ctx, msgCtx.RoomID, msgCtx.EventID, msgCtx.ThreadID, toolNotice); err != nil {
- a.logger.Warn("failed to send tool call notice", "tool", tc.Name, "err", err)
- }
-
- result := a.toolReg.ExecuteForRoom(ctx, tc.Name, tc.Arguments, msgCtx.RoomID)
-
- output := result.Output
- if result.Err != nil {
- output = fmt.Sprintf("error: %s", result.Err)
- a.logger.Warn("tool execution error",
- "tool", tc.Name,
- "err", result.Err,
- )
- } else {
- a.logger.Debug("tool executed",
- "tool", tc.Name,
- "output_len", len(output),
- )
- }
-
- messages = append(messages, coretypes.Message{
- Role: coretypes.RoleTool,
- Content: output,
- ToolCallID: tc.ID,
- })
- }
- }
-
- // Max iterations reached — return whatever we have
- a.logger.Warn("tool-use loop reached max iterations", "max", maxIter)
- return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil
-}
-
-// ── Memory helpers ───────────────────────────────────────────────────────
-
-// ensureWindowLoaded loads the conversation window from SQLite on first access for a room.
-func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) {
- a.windowsMu.Lock()
- defer a.windowsMu.Unlock()
- if _, ok := a.windows[roomID]; ok {
- return
- }
- w := memory.NewWindow(a.windowSize)
- if a.memStore != nil {
- msgs, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize)
- if err != nil {
- a.logger.Warn("failed to load message history", "room", roomID, "err", err)
- } else {
- for _, m := range msgs {
- w = w.Append(coretypes.Message{Role: m.Role, Content: m.Content})
- }
- if len(msgs) > 0 {
- a.logger.Debug("loaded message history", "room", roomID, "count", len(msgs))
- }
- }
- }
- a.windows[roomID] = w
-}
-
-// appendToWindow adds a message to the in-memory conversation window.
-func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) {
- a.windowsMu.Lock()
- defer a.windowsMu.Unlock()
- w, ok := a.windows[roomID]
- if !ok {
- w = memory.NewWindow(a.windowSize)
- }
- a.windows[roomID] = w.Append(msg)
-}
-
-// getWindowMessages returns a copy of the conversation window for a room.
-func (a *Agent) getWindowMessages(roomID string) []coretypes.Message {
- a.windowsMu.RLock()
- defer a.windowsMu.RUnlock()
- w, ok := a.windows[roomID]
- if !ok {
- return nil
- }
- return w.ToLLMMessages()
-}
-
-// persistMessage saves a message to the SQLite store (no-op if store is nil).
-func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) {
- if a.memStore == nil {
- return
- }
- if err := a.memStore.SaveMessage(ctx, memory.HistoryMessage{
- AgentID: a.cfg.Agent.ID,
- RoomID: roomID,
- Role: role,
- Content: content,
- }); err != nil {
- a.logger.Warn("failed to persist message", "room", roomID, "err", err)
- }
-}
-
-// sendReply sends a markdown reply that respects thread context.
-// If threadID is non-empty, the reply is sent as part of that thread.
-func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error {
- if threadID != "" {
- return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown)
- }
- return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown)
-}
-
-// parseSeverity converts a config string to sanitize.Severity.
-func parseSeverity(s string) sanitize.Severity {
- switch s {
- case "high":
- return sanitize.SeverityHigh
- case "low":
- return sanitize.SeverityLow
- default:
- return sanitize.SeverityMedium
- }
-}
-
-// sanitizeInput runs prompt injection detection on the message content.
-// Returns the (possibly modified) content and true if the message should be rejected.
-func (a *Agent) sanitizeInput(content, roomID, senderID string) (string, bool) {
- if a.sanitizeOpts == nil {
- return content, false
- }
-
- result := sanitize.Sanitize(content, *a.sanitizeOpts)
-
- for _, w := range result.Warnings {
- a.logger.Warn("prompt_injection_detected",
- "pattern", w.PatternName,
- "severity", w.Severity,
- "matched", w.Matched,
- "sender", senderID,
- "room", roomID,
- )
- }
-
- return result.Output, result.Rejected
-}
-
-// resolveDataBase returns the base directory for agent runtime data.
-// Priority: config storage.base_path > $AGENTS_DATA_DIR/ > agents//data
-func resolveDataBase(cfg *config.AgentConfig) string {
- if cfg.Storage.BasePath != "" {
- return cfg.Storage.BasePath
- }
- if envDir := os.Getenv("AGENTS_DATA_DIR"); envDir != "" {
- return filepath.Join(envDir, cfg.Agent.ID)
- }
- return filepath.Join("agents", cfg.Agent.ID, "data")
-}
-
-// buildToolRegistry creates a Registry with tools enabled in the agent's config.
-func buildToolRegistry(
- cfg *config.AgentConfig,
- sshExec *ssh.Executor,
- matrixClient *matrix.Client,
- memStore memory.Store,
- kStore *shellknowledge.FileStore,
- sharedKStore *shellknowledge.FileStore,
- mcpManager *shellmcp.Manager,
- skillLoader *shellskills.Loader,
- skillExecutor *shellskills.Executor,
- roomCtx *toolmemory.RoomContext,
- logger *slog.Logger,
-) *tools.Registry {
- reg := tools.NewRegistry(logger)
-
- if cfg.Tools.HTTP.Enabled {
- reg.Register(toolhttp.NewHTTPGet(cfg.Tools.HTTP))
- reg.Register(toolhttp.NewHTTPPost(cfg.Tools.HTTP))
- logger.Debug("registered http tools")
- }
-
- if cfg.Tools.SSH.Enabled {
- reg.Register(toolssh.NewSSHCommand(cfg.Tools.SSH, sshExec))
- logger.Debug("registered ssh tool")
- }
-
- if cfg.Tools.FileOps.Enabled {
- reg.Register(toolfile.NewReadFile(cfg.Tools.FileOps))
- reg.Register(toolfile.NewListDirectory(cfg.Tools.FileOps))
- if !cfg.Tools.FileOps.ReadOnly {
- reg.Register(toolfile.NewWriteFile(cfg.Tools.FileOps))
- reg.Register(toolfile.NewAppendFile(cfg.Tools.FileOps))
- reg.Register(toolfile.NewDeleteFile(cfg.Tools.FileOps))
- }
- logger.Debug("registered file tools")
- }
-
- // current_time is always available
- reg.Register(toolclock.NewCurrentTime())
- logger.Debug("registered current_time tool")
-
- // weather tool is always available
- reg.Register(toolweather.NewWeather())
- logger.Debug("registered weather tool")
-
- // imdb tool (enabled via config)
- if cfg.Tools.IMDb.Enabled {
- reg.Register(toolimdb.NewIMDbSearch(cfg.Tools.IMDb))
- logger.Debug("registered imdb tool")
- }
-
- // matrix_send is always available
- reg.Register(toolmatrix.NewMatrixSend(matrixClient, cfg.Tools.Matrix))
- logger.Debug("registered matrix tool")
-
- // Memory tools (memory_clear_context registered later since it needs the Agent)
- if cfg.Tools.Memory.Enabled && memStore != nil {
- reg.Register(toolmemory.NewMemorySave(cfg.Agent.ID, memStore))
- reg.Register(toolmemory.NewMemoryRecall(cfg.Agent.ID, memStore))
- reg.Register(toolmemory.NewMemoryForget(cfg.Agent.ID, memStore))
- reg.Register(toolmemory.NewMemorySummary(cfg.Agent.ID, memStore))
- logger.Debug("registered memory tools")
- }
-
- // Knowledge tools
- if cfg.Tools.Knowledge.Enabled && kStore != nil {
- reg.Register(toolknowledge.NewKnowledgeSearch(kStore))
- reg.Register(toolknowledge.NewKnowledgeRead(kStore))
- reg.Register(toolknowledge.NewKnowledgeWrite(kStore))
- reg.Register(toolknowledge.NewKnowledgeList(kStore))
- logger.Debug("registered knowledge tools")
- }
-
- // Shared knowledge tools
- if cfg.Tools.SharedKnowledge.Enabled && sharedKStore != nil {
- sharedTools := toolknowledge.NewSharedKnowledgeTools(sharedKStore)
- for _, tool := range sharedTools {
- reg.Register(tool)
- }
- logger.Debug("registered shared knowledge tools", "count", len(sharedTools))
- }
-
- // MCP tools — register tools from all connected MCP servers
- if mcpManager != nil {
- for serverName, mcpClient := range mcpManager.AllClients() {
- // Find the config for this server to get prefix, filter, timeout
- var serverCfg *config.MCPServerCfg
- for i := range cfg.Tools.MCP.Servers {
- if cfg.Tools.MCP.Servers[i].Name == serverName {
- serverCfg = &cfg.Tools.MCP.Servers[i]
- break
- }
- }
- if serverCfg == nil {
- logger.Warn("no config found for MCP server", "name", serverName)
- continue
- }
-
- // Convert and register MCP tools
- mcpTools := toolmcp.FromMCPServer(mcpClient, serverCfg.Prefix, serverCfg.Tools, serverCfg.Timeout, logger)
- for _, tool := range mcpTools {
- reg.Register(tool)
- }
- logger.Debug("registered MCP tools", "server", serverName, "count", len(mcpTools))
- }
- }
-
- // Skills tools — register skill search, load, read, and run tools
- if skillLoader != nil {
- reg.Register(toolskills.NewSkillSearch(skillLoader, cfg.Skills.Categories))
- reg.Register(toolskills.NewSkillLoad(skillLoader))
- reg.Register(toolskills.NewSkillReadResource(skillLoader))
- if skillExecutor != nil {
- reg.Register(toolskills.NewSkillRunScript(skillLoader, skillExecutor))
- }
- logger.Debug("registered skills tools")
- }
-
- return reg
-}