From ee8e74be1bcaff3d2b1c86f543e67923bccba2bb Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Wed, 8 Apr 2026 23:17:12 +0000 Subject: [PATCH] refactor: separar runtime.go en archivos por responsabilidad MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Divide agents/runtime.go (1188 lineas) en 5 archivos especializados: - runtime.go (350 lineas): struct Agent, New(), Run(), Stop(), lifecycle - handler.go: handleEvent(), executeActions(), command routing, bus, sanitizacion - llm.go: runLLM(), tool-use loop, system prompt, initLLM(), prompt-commands - memory.go: ensureWindowLoaded(), appendToWindow(), persistMessage(), ClearWindow() - registry_build.go: buildToolRegistry(), initToolDeps(), initRateLimiter() Zero cambios en API publica. Todos los metodos siguen siendo del struct Agent, solo viven en archivos separados por responsabilidad. Funciones helper extraidas de New() para reducir su tamaño: - initCrypto(): inicializacion E2EE - initLLM(): cliente LLM con fallback - initMemoryStore(): store SQLite + window size - initToolDeps(): knowledge, MCP, skills - initRateLimiter(): rate limiting de tools Reduccion: 1188 → 350 lineas en runtime.go (70% menos). --- agents/handler.go | 361 +++++++++++++++ agents/llm.go | 197 ++++++++ agents/memory.go | 119 +++++ agents/registry_build.go | 276 +++++++++++ agents/runtime.go | 964 +++------------------------------------ 5 files changed, 1016 insertions(+), 901 deletions(-) create mode 100644 agents/handler.go create mode 100644 agents/llm.go create mode 100644 agents/memory.go create mode 100644 agents/registry_build.go diff --git a/agents/handler.go b/agents/handler.go new file mode 100644 index 0000000..9eb3d66 --- /dev/null +++ b/agents/handler.go @@ -0,0 +1,361 @@ +package agents + +import ( + "context" + "fmt" + + "maunium.net/go/mautrix/event" + + "github.com/enmanuel/agents/pkg/command" + "github.com/enmanuel/agents/pkg/decision" + coretypes "github.com/enmanuel/agents/pkg/llm" + "github.com/enmanuel/agents/pkg/orchestration" + "github.com/enmanuel/agents/pkg/sanitize" + "github.com/enmanuel/agents/shell/bus" +) + +// handleEvent is called by the matrix Listener for each filtered incoming event. +func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) { + a.logger.Debug("handling event", + "sender", msgCtx.SenderID, + "is_dm", msgCtx.IsDirectMsg, + "is_mention", msgCtx.IsMention, + "command", msgCtx.Command, + ) + + roomID := evt.RoomID.String() + + // Update room context for memory tools + a.roomCtx.Set(roomID) + + if a.cfg.Personality.Behavior.TypingIndicator { + _ = a.matrix.SendTyping(ctx, roomID, true) + defer a.matrix.SendTyping(ctx, roomID, false) + } + + // ── Command flow ───────────────────────────────────────────────── + // Commands (!xxx) always resolve before rules or LLM. Never reach the LLM. + // Priority: built-in → unknown (agent-specific commands can be added via RegisterCommand). + if msgCtx.Command != "" { + a.logger.Info("command_received", + "command", msgCtx.Command, + "sender", msgCtx.SenderID, + "room", roomID, + "args", msgCtx.Args, + ) + + // Resolve aliases + cmdName := msgCtx.Command + if canonical, ok := a.cmdAliases[cmdName]; ok { + cmdName = canonical + } + + if handler, ok := a.commands[cmdName]; ok { + // RBAC check for commands + if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) { + a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID) + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, + "No tienes permisos para ejecutar este comando.") + return + } + a.logger.Info("command_executed", "command", cmdName) + reply := handler(ctx, msgCtx) + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply) + return + } + + // Prompt-command: expand .md content and pass to LLM + if content, ok := a.promptCmds[cmdName]; ok { + a.logger.Info("prompt_command_expanded", "command", cmdName) + msgCtx.Content = command.ExpandPrompt(content, msgCtx.Args) + msgCtx.Command = "" + msgCtx.Args = nil + // Fall through to rules/LLM flow below + } else { + // Unknown command — never falls through to rules or LLM + a.logger.Info("command_unknown", "command", msgCtx.Command) + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, + fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command)) + return + } + } + + // ── Non-command flow ───────────────────────────────────────────── + // RBAC check for LLM access ("ask" action) + if !a.acl.CanDo(msgCtx.SenderID, "ask") { + a.logger.Info("ask_denied", "sender", msgCtx.SenderID) + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, + "No tienes permisos para interactuar con este agente.") + return + } + + actions := decision.Evaluate(msgCtx, a.rules) + a.logger.Debug("rules evaluated", "matched_actions", len(actions)) + + // If no rules matched and the message mentions the bot or is a DM, use LLM. + if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) { + if a.llm == nil { + // Simple bot: no LLM, ignore non-command messages + a.logger.Debug("no LLM configured, ignoring non-command message") + return + } + a.logger.Debug("no rules matched, falling back to LLM") + actions = []decision.Action{{ + Kind: decision.ActionKindLLM, + LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID}, + }} + } + + if len(actions) == 0 { + a.logger.Debug("no actions, ignoring message", + "is_dm", msgCtx.IsDirectMsg, + "is_mention", msgCtx.IsMention, + ) + return + } + + a.executeActions(ctx, roomID, msgCtx, actions) +} + +// executeActions expands LLM actions and runs the effects runner. +func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) { + // Auto-thread: if configured and message is not already in a thread, + // start a new thread rooted at the user's message. + if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" { + msgCtx.ThreadID = msgCtx.EventID + } + + // Sanitize user input before sending to LLM + sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID) + if rejected { + a.runner.Execute(ctx, roomID, []decision.Action{{ + Kind: decision.ActionKindReply, + Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, + }}) + return + } + msgCtx.Content = sanitized + + // Resolve memory key: use thread root as context key when inside a thread, + // so parallel threads in the same room have independent conversation windows. + memKey := roomID + if msgCtx.ThreadID != "" { + memKey = msgCtx.ThreadID + } + + expanded := make([]decision.Action, 0, len(actions)) + for _, act := range actions { + if act.Kind == decision.ActionKindLLM { + if a.llm == nil { + a.logger.Warn("LLM action requested but no LLM configured") + expanded = append(expanded, decision.Action{ + Kind: decision.ActionKindReply, + Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, + }) + continue + } + // Memory: load window + append user message before LLM call + a.ensureWindowLoaded(ctx, memKey) + a.appendToWindow(memKey, coretypes.Message{ + Role: coretypes.RoleUser, Content: msgCtx.Content, + }) + a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content) + + reply, err := a.runLLM(ctx, msgCtx, memKey) + if err != nil { + a.logger.Error("llm error", "err", err) + expanded = append(expanded, decision.Action{ + Kind: decision.ActionKindReply, + Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, + }) + } else { + expanded = append(expanded, decision.Action{ + Kind: decision.ActionKindReply, + Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, + }) + + // Memory: append assistant reply after LLM call + a.appendToWindow(memKey, coretypes.Message{ + Role: coretypes.RoleAssistant, Content: reply, + }) + a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply) + } + } else { + expanded = append(expanded, act) + } + } + + a.runner.Execute(ctx, roomID, expanded) +} + +// listenBus processes messages from the inter-agent bus. +func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) { + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-ch: + if !ok { + return + } + if msg.Kind == bus.KindTask { + a.handleTaskEvent(ctx, msg) + } + } + } +} + +// handleTaskEvent processes a task delegated by the orchestrator. +// The bot generates a response and sends it both to Matrix and back via bus. +func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) { + taskJSON, ok := msg.Payload["task_json"] + if !ok { + a.logger.Error("task message missing task_json payload") + return + } + + task, err := orchestration.UnmarshalTaskEvent(taskJSON) + if err != nil { + a.logger.Error("failed to unmarshal task event", "err", err) + return + } + + a.logger.Info("handling orchestrated task", + "task_id", task.TaskID, + "room", task.TargetRoomID, + "sender", task.OriginalSender, + "iteration", task.Iteration, + ) + + roomID := task.TargetRoomID + + // Update room context for memory tools + a.roomCtx.Set(roomID) + + if a.cfg.Personality.Behavior.TypingIndicator { + _ = a.matrix.SendTyping(ctx, roomID, true) + defer a.matrix.SendTyping(ctx, roomID, false) + } + + // Build a synthetic MessageContext from the task + msgCtx := decision.MessageContext{ + SenderID: task.OriginalSender, + RoomID: roomID, + Content: task.OriginalQuestion, + IsDirectMsg: false, + IsMention: true, // treat orchestrated tasks like mentions + } + + // If there are previous responses, prepend context + if len(task.PreviousResponses) > 0 { + var context string + for _, pr := range task.PreviousResponses { + context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text) + } + msgCtx.Content = context + "Original question: " + task.OriginalQuestion + + "\n\nPlease provide an improved or complementary answer." + } + + // Sanitize orchestrated input + sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID) + if rejected { + a.logger.Warn("orchestrated task rejected by sanitizer", + "task_id", task.TaskID, "sender", task.OriginalSender) + _ = a.matrix.SendMarkdown(ctx, roomID, "El mensaje fue rechazado por el filtro de seguridad.") + return + } + msgCtx.Content = sanitized + + // Load memory and run LLM + a.ensureWindowLoaded(ctx, roomID) + a.appendToWindow(roomID, coretypes.Message{ + Role: coretypes.RoleUser, Content: msgCtx.Content, + }) + + reply, err := a.runLLM(ctx, msgCtx, roomID) + + // Build the result to send back via bus + result := orchestration.TaskResult{ + TaskID: task.TaskID, + BotID: a.cfg.Agent.ID, + } + + if err != nil { + a.logger.Error("LLM error during orchestrated task", "err", err) + result.Error = err.Error() + reply = "Sorry, I encountered an error." + } else { + result.Text = reply + // Persist assistant reply + a.appendToWindow(roomID, coretypes.Message{ + Role: coretypes.RoleAssistant, Content: reply, + }) + a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply) + } + + // Send reply to Matrix room + if sendErr := a.matrix.SendMarkdown(ctx, roomID, reply); sendErr != nil { + a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr) + } + + // Send result back to orchestrator via bus + resultJSON, marshalErr := orchestration.MarshalTaskResult(result) + if marshalErr != nil { + a.logger.Error("failed to marshal task result", "err", marshalErr) + return + } + + replyMsg := bus.AgentMessage{ + From: bus.AgentID(a.cfg.Agent.ID), + To: msg.From, + Kind: bus.KindTaskResult, + Payload: map[string]string{"result_json": resultJSON}, + } + + if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil { + a.logger.Error("failed to send task result via bus", "err", busErr) + } +} + +// sendReply sends a markdown reply that respects thread context. +// If threadID is non-empty, the reply is sent as part of that thread. +func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error { + if threadID != "" { + return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown) + } + return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown) +} + +// parseSeverity converts a config string to sanitize.Severity. +func parseSeverity(s string) sanitize.Severity { + switch s { + case "high": + return sanitize.SeverityHigh + case "low": + return sanitize.SeverityLow + default: + return sanitize.SeverityMedium + } +} + +// sanitizeInput runs prompt injection detection on the message content. +// Returns the (possibly modified) content and true if the message should be rejected. +func (a *Agent) sanitizeInput(content, roomID, senderID string) (string, bool) { + if a.sanitizeOpts == nil { + return content, false + } + + result := sanitize.Sanitize(content, *a.sanitizeOpts) + + for _, w := range result.Warnings { + a.logger.Warn("prompt_injection_detected", + "pattern", w.PatternName, + "severity", w.Severity, + "matched", w.Matched, + "sender", senderID, + "room", roomID, + ) + } + + return result.Output, result.Rejected +} diff --git a/agents/llm.go b/agents/llm.go new file mode 100644 index 0000000..e223020 --- /dev/null +++ b/agents/llm.go @@ -0,0 +1,197 @@ +package agents + +import ( + "context" + "fmt" + "log/slog" + "os" + "path/filepath" + + "github.com/enmanuel/agents/internal/config" + "github.com/enmanuel/agents/pkg/command" + "github.com/enmanuel/agents/pkg/decision" + coretypes "github.com/enmanuel/agents/pkg/llm" + "github.com/enmanuel/agents/pkg/personality" + shelllm "github.com/enmanuel/agents/shell/llm" +) + +// runLLM executes the LLM completion loop, including iterative tool-use. +func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) { + a.logger.Debug("calling LLM", + "model", a.cfg.LLM.Primary.Model, + "provider", a.cfg.LLM.Primary.Provider, + ) + + // Load system prompt from file if configured, else use description + systemPrompt := a.cfg.Agent.Description + if spFile := a.cfg.LLM.Reasoning.SystemPromptFile; spFile != "" { + // Resolve path relative to agent directory + spPath := filepath.Join("agents", a.cfg.Agent.ID, spFile) + if data, err := os.ReadFile(spPath); err == nil { + systemPrompt = string(data) + } else { + a.logger.Warn("failed to load system_prompt_file, using description", "path", spPath, "err", err) + } + } + + // Concatenate personality prompt block + personalityBlock := personality.BuildPersonalityPrompt(a.personality) + if personalityBlock != "" { + systemPrompt = systemPrompt + "\n\n" + personalityBlock + } + + // Build messages: conversation history from window (includes current user msg) + messages := a.getWindowMessages(memKey) + if len(messages) == 0 { + // Fallback if memory is disabled: just the current message + messages = []coretypes.Message{ + {Role: coretypes.RoleUser, Content: msgCtx.Content}, + } + } + + // Build tool specs for the LLM if tool_use is enabled + var llmTools []coretypes.ToolSpec + if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 { + llmTools = a.toolReg.ToLLMSpecs() + a.logger.Debug("tools available for LLM", "count", len(llmTools)) + } + + maxIter := a.cfg.LLM.ToolUse.MaxIterations + if maxIter <= 0 { + maxIter = defaultMaxToolIterations + } + + // Tool-use loop: call LLM → execute tools → feed results back → repeat + for i := 0; i < maxIter; i++ { + req := coretypes.CompletionRequest{ + Model: a.cfg.LLM.Primary.Model, + MaxTokens: a.cfg.LLM.Primary.MaxTokens, + Temperature: a.cfg.LLM.Primary.Temperature, + SystemPrompt: systemPrompt, + Messages: messages, + Tools: llmTools, + } + + resp, err := a.llm(ctx, req) + if err != nil { + a.logger.Error("LLM call failed", "model", req.Model, "err", err) + return "", err + } + + a.logger.Debug("LLM responded", + "content_len", len(resp.Content), + "tool_calls", len(resp.ToolCalls), + "finish_reason", resp.FinishReason, + ) + + // No tool calls — return the text response + if len(resp.ToolCalls) == 0 { + return resp.Content, nil + } + + // Append assistant message with tool calls to conversation + messages = append(messages, coretypes.Message{ + Role: coretypes.RoleAssistant, + Content: resp.Content, + ToolCalls: resp.ToolCalls, + }) + + // Execute each tool and append results + for _, tc := range resp.ToolCalls { + a.logger.Info("executing tool", + "tool", tc.Name, + "call_id", tc.ID, + ) + + // RBAC check for tool execution + if !a.acl.CanDo(msgCtx.SenderID, "tool:"+tc.Name) { + a.logger.Info("tool_denied", "tool", tc.Name, "sender", msgCtx.SenderID) + messages = append(messages, coretypes.Message{ + Role: coretypes.RoleTool, + Content: "error: permission denied for tool " + tc.Name, + ToolCallID: tc.ID, + }) + continue + } + + // Notify the room that a tool is being called (respect thread context) + toolNotice := fmt.Sprintf("\U0001f528 %s", tc.Name) + if err := a.sendReply(ctx, msgCtx.RoomID, msgCtx.EventID, msgCtx.ThreadID, toolNotice); err != nil { + a.logger.Warn("failed to send tool call notice", "tool", tc.Name, "err", err) + } + + result := a.toolReg.ExecuteForRoom(ctx, tc.Name, tc.Arguments, msgCtx.RoomID) + + output := result.Output + if result.Err != nil { + output = fmt.Sprintf("error: %s", result.Err) + a.logger.Warn("tool execution error", + "tool", tc.Name, + "err", result.Err, + ) + } else { + a.logger.Debug("tool executed", + "tool", tc.Name, + "output_len", len(output), + ) + } + + messages = append(messages, coretypes.Message{ + Role: coretypes.RoleTool, + Content: output, + ToolCallID: tc.ID, + }) + } + } + + // Max iterations reached — return whatever we have + a.logger.Warn("tool-use loop reached max iterations", "max", maxIter) + return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil +} + +// initLLM creates the LLM client function with optional fallback. +// Returns nil when no provider is configured (command-only bot). +func initLLM(cfg *config.AgentConfig, logger *slog.Logger) (coretypes.CompleteFunc, error) { + if cfg.LLM.Primary.Provider == "" { + logger.Info("no LLM configured, running as command-only bot") + return nil, nil + } + + llmLog := logger.With("component", "llm") + primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary, llmLog) + if err != nil { + return nil, fmt.Errorf("primary LLM: %w", err) + } + + llmFunc := primaryLLM + if cfg.LLM.Fallback.Provider != "" { + fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback, llmLog) + if err != nil { + logger.Warn("fallback LLM config error", "err", err) + } else { + llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM, cfg.LLM.Fallback, llmLog) + } + } + + return llmFunc, nil +} + +// loadPromptCommands scans the project-root prompts/ directory and loads all .md files. +func (a *Agent) loadPromptCommands() { + prompts, err := command.LoadPromptCommands("prompts") + if err != nil { + a.logger.Warn("failed to load prompt-commands", "err", err) + return + } + a.promptCmds = make(map[string]string, len(prompts)) + for _, p := range prompts { + a.promptCmds[p.Name] = p.Content + } + if len(a.promptCmds) > 0 { + names := make([]string, 0, len(a.promptCmds)) + for n := range a.promptCmds { + names = append(names, n) + } + a.logger.Info("prompt-commands loaded", "count", len(a.promptCmds), "names", names) + } +} diff --git a/agents/memory.go b/agents/memory.go new file mode 100644 index 0000000..b5cd058 --- /dev/null +++ b/agents/memory.go @@ -0,0 +1,119 @@ +package agents + +import ( + "context" + "fmt" + "log/slog" + "path/filepath" + + coretypes "github.com/enmanuel/agents/pkg/llm" + "github.com/enmanuel/agents/pkg/memory" + shellmem "github.com/enmanuel/agents/shell/memory" +) + +// ClearWindow resets the conversation window for a room and deletes persisted +// messages from SQLite so the agent starts fresh. Implements toolmemory.WindowClearer. +func (a *Agent) ClearWindow(roomID string) { + a.windowsMu.Lock() + a.windows[roomID] = memory.NewWindow(a.windowSize) + a.windowsMu.Unlock() + + if a.memStore != nil { + if err := a.memStore.DeleteMessages( + context.Background(), a.cfg.Agent.ID, &roomID, + ); err != nil { + a.logger.Warn("failed to delete persisted messages on clear", "room", roomID, "err", err) + } + } +} + +// ensureWindowLoaded loads the conversation window from SQLite on first access for a room. +func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) { + a.windowsMu.Lock() + defer a.windowsMu.Unlock() + if _, ok := a.windows[roomID]; ok { + return + } + w := memory.NewWindow(a.windowSize) + if a.memStore != nil { + msgs, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize) + if err != nil { + a.logger.Warn("failed to load message history", "room", roomID, "err", err) + } else { + for _, m := range msgs { + w = w.Append(coretypes.Message{Role: m.Role, Content: m.Content}) + } + if len(msgs) > 0 { + a.logger.Debug("loaded message history", "room", roomID, "count", len(msgs)) + } + } + } + a.windows[roomID] = w +} + +// appendToWindow adds a message to the in-memory conversation window. +func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) { + a.windowsMu.Lock() + defer a.windowsMu.Unlock() + w, ok := a.windows[roomID] + if !ok { + w = memory.NewWindow(a.windowSize) + } + a.windows[roomID] = w.Append(msg) +} + +// getWindowMessages returns a copy of the conversation window for a room. +func (a *Agent) getWindowMessages(roomID string) []coretypes.Message { + a.windowsMu.RLock() + defer a.windowsMu.RUnlock() + w, ok := a.windows[roomID] + if !ok { + return nil + } + return w.ToLLMMessages() +} + +// persistMessage saves a message to the SQLite store (no-op if store is nil). +func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) { + if a.memStore == nil { + return + } + if err := a.memStore.SaveMessage(ctx, memory.HistoryMessage{ + AgentID: a.cfg.Agent.ID, + RoomID: roomID, + Role: role, + Content: content, + }); err != nil { + a.logger.Warn("failed to persist message", "room", roomID, "err", err) + } +} + +// memoryInit holds the results of memory subsystem initialization. +type memoryInit struct { + store memory.Store + windowSize int +} + +// initMemoryStore creates the memory store and resolves window size from config. +// Returns a zero-value memoryInit if memory is disabled. +func initMemoryStore(enabled bool, windowSizeCfg int, dbPathCfg string, dataBase string, logger *slog.Logger) (memoryInit, error) { + if !enabled { + return memoryInit{windowSize: defaultWindowSize}, nil + } + + windowSize := windowSizeCfg + if windowSize <= 0 { + windowSize = defaultWindowSize + } + + dbPath := dbPathCfg + if dbPath == "" { + dbPath = filepath.Join(dataBase, "memory.db") + } + store, err := shellmem.New(dbPath, logger) + if err != nil { + return memoryInit{}, fmt.Errorf("memory store: %w", err) + } + logger.Info("memory enabled", "window_size", windowSize, "db", dbPath) + return memoryInit{store: store, windowSize: windowSize}, nil +} diff --git a/agents/registry_build.go b/agents/registry_build.go new file mode 100644 index 0000000..2fcb14c --- /dev/null +++ b/agents/registry_build.go @@ -0,0 +1,276 @@ +package agents + +import ( + "context" + "log/slog" + "os" + "path/filepath" + "time" + + "github.com/enmanuel/agents/internal/config" + "github.com/enmanuel/agents/pkg/memory" + shellknowledge "github.com/enmanuel/agents/shell/knowledge" + shellmcp "github.com/enmanuel/agents/shell/mcp" + shellskills "github.com/enmanuel/agents/shell/skills" + "github.com/enmanuel/agents/shell/ssh" + "github.com/enmanuel/agents/tools" + toolclock "github.com/enmanuel/agents/tools/clock" + toolfile "github.com/enmanuel/agents/tools/file" + toolhttp "github.com/enmanuel/agents/tools/http" + toolimdb "github.com/enmanuel/agents/tools/imdb" + toolknowledge "github.com/enmanuel/agents/tools/knowledgetools" + toolmatrix "github.com/enmanuel/agents/tools/matrix" + toolmcp "github.com/enmanuel/agents/tools/mcptools" + toolmemory "github.com/enmanuel/agents/tools/memorytools" + toolskills "github.com/enmanuel/agents/tools/skilltools" + toolssh "github.com/enmanuel/agents/tools/ssh" + toolweather "github.com/enmanuel/agents/tools/weather" + + "github.com/enmanuel/agents/shell/matrix" +) + +// toolDeps holds external subsystem instances needed by the tool registry. +type toolDeps struct { + kStore *shellknowledge.FileStore + sharedKStore *shellknowledge.FileStore + mcpManager *shellmcp.Manager + skillLoader *shellskills.Loader + skillExecutor *shellskills.Executor +} + +// initToolDeps initializes knowledge stores, MCP manager, and skills loader +// based on the agent config. All results are optional (nil when disabled). +func initToolDeps(cfg *config.AgentConfig, dataBase string, logger *slog.Logger) toolDeps { + var deps toolDeps + + // Knowledge store + if cfg.Tools.Knowledge.Enabled { + knowledgeDir := cfg.Tools.Knowledge.Dir + if knowledgeDir == "" { + knowledgeDir = filepath.Join("agents", cfg.Agent.ID, "knowledge") + } + knowledgeDBPath := filepath.Join(dataBase, "knowledge.db") + kStore, kErr := shellknowledge.New(knowledgeDir, knowledgeDBPath, logger) + if kErr != nil { + logger.Error("knowledge_store_init_failed", "err", kErr) + } else { + if syncErr := kStore.Sync(context.Background()); syncErr != nil { + logger.Error("knowledge_sync_failed", "err", syncErr) + } + deps.kStore = kStore + } + } + + // Shared knowledge store + if cfg.Tools.SharedKnowledge.Enabled { + sharedDir := cfg.Tools.SharedKnowledge.Dir + if sharedDir == "" { + sharedDir = "knowledges" + } + sharedDBPath := cfg.Tools.SharedKnowledge.DBPath + if sharedDBPath == "" { + sharedDBPath = "knowledges/data/knowledge.db" + } + sharedKStore, skErr := shellknowledge.New(sharedDir, sharedDBPath, logger) + if skErr != nil { + logger.Error("shared_knowledge_store_init_failed", "err", skErr) + } else { + if syncErr := sharedKStore.Sync(context.Background()); syncErr != nil { + logger.Error("shared_knowledge_sync_failed", "err", syncErr) + } + logger.Info("shared knowledge enabled", "dir", sharedDir, "db", sharedDBPath) + deps.sharedKStore = sharedKStore + } + } + + // MCP client manager — connects to external MCP servers + if cfg.Tools.MCP.Enabled && len(cfg.Tools.MCP.Servers) > 0 { + mcpManager, mcpErr := shellmcp.NewManager(context.Background(), cfg.Tools.MCP.Servers, logger) + if mcpErr != nil { + logger.Error("mcp_manager_init_failed", "err", mcpErr) + } else { + logger.Info("mcp manager initialized", "servers", len(cfg.Tools.MCP.Servers)) + deps.mcpManager = mcpManager + } + } + + // Skills loader + if cfg.Skills.Enabled { + skillsPath := cfg.Skills.SkillsPath + if skillsPath == "" { + skillsPath = "skills/" + } + deps.skillLoader = shellskills.NewLoader(skillsPath) + + // Skills executor for scripts + allowedInterpreters := cfg.Tools.Skills.AllowedInterpreters + timeout := cfg.Skills.Timeout + if timeout == 0 { + timeout = 60 * time.Second + } + deps.skillExecutor = shellskills.NewExecutor(allowedInterpreters, timeout) + logger.Info("skills enabled", "path", skillsPath, "categories", cfg.Skills.Categories) + } + + return deps +} + +// initRateLimiter configures the rate limiter on the tool registry if enabled. +func initRateLimiter(cfg *config.AgentConfig, toolReg *tools.Registry, logger *slog.Logger) { + if !cfg.Security.ToolRateLimit.Enabled { + return + } + maxCalls := cfg.Security.ToolRateLimit.MaxCallsPerMin + if maxCalls <= 0 { + maxCalls = 10 + } + rl := tools.NewRateLimiter(maxCalls, time.Minute) + toolReg.SetRateLimiter(rl) + + cleanupInterval := cfg.Security.ToolRateLimit.CleanupIntervalS + if cleanupInterval <= 0 { + cleanupInterval = 60 + } + go func() { + ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Second) + defer ticker.Stop() + for range ticker.C { + rl.Cleanup() + } + }() + logger.Info("tool rate limiting enabled", "max_calls_per_min", maxCalls) +} + +// buildToolRegistry creates a Registry with tools enabled in the agent's config. +func buildToolRegistry( + cfg *config.AgentConfig, + sshExec *ssh.Executor, + matrixClient *matrix.Client, + memStore memory.Store, + kStore *shellknowledge.FileStore, + sharedKStore *shellknowledge.FileStore, + mcpManager *shellmcp.Manager, + skillLoader *shellskills.Loader, + skillExecutor *shellskills.Executor, + roomCtx *toolmemory.RoomContext, + logger *slog.Logger, +) *tools.Registry { + reg := tools.NewRegistry(logger) + + if cfg.Tools.HTTP.Enabled { + reg.Register(toolhttp.NewHTTPGet(cfg.Tools.HTTP)) + reg.Register(toolhttp.NewHTTPPost(cfg.Tools.HTTP)) + logger.Debug("registered http tools") + } + + if cfg.Tools.SSH.Enabled { + reg.Register(toolssh.NewSSHCommand(cfg.Tools.SSH, sshExec)) + logger.Debug("registered ssh tool") + } + + if cfg.Tools.FileOps.Enabled { + reg.Register(toolfile.NewReadFile(cfg.Tools.FileOps)) + reg.Register(toolfile.NewListDirectory(cfg.Tools.FileOps)) + if !cfg.Tools.FileOps.ReadOnly { + reg.Register(toolfile.NewWriteFile(cfg.Tools.FileOps)) + reg.Register(toolfile.NewAppendFile(cfg.Tools.FileOps)) + reg.Register(toolfile.NewDeleteFile(cfg.Tools.FileOps)) + } + logger.Debug("registered file tools") + } + + // current_time is always available + reg.Register(toolclock.NewCurrentTime()) + logger.Debug("registered current_time tool") + + // weather tool is always available + reg.Register(toolweather.NewWeather()) + logger.Debug("registered weather tool") + + // imdb tool (enabled via config) + if cfg.Tools.IMDb.Enabled { + reg.Register(toolimdb.NewIMDbSearch(cfg.Tools.IMDb)) + logger.Debug("registered imdb tool") + } + + // matrix_send is always available + reg.Register(toolmatrix.NewMatrixSend(matrixClient, cfg.Tools.Matrix)) + logger.Debug("registered matrix tool") + + // Memory tools (memory_clear_context registered later since it needs the Agent) + if cfg.Tools.Memory.Enabled && memStore != nil { + reg.Register(toolmemory.NewMemorySave(cfg.Agent.ID, memStore)) + reg.Register(toolmemory.NewMemoryRecall(cfg.Agent.ID, memStore)) + reg.Register(toolmemory.NewMemoryForget(cfg.Agent.ID, memStore)) + reg.Register(toolmemory.NewMemorySummary(cfg.Agent.ID, memStore)) + logger.Debug("registered memory tools") + } + + // Knowledge tools + if cfg.Tools.Knowledge.Enabled && kStore != nil { + reg.Register(toolknowledge.NewKnowledgeSearch(kStore)) + reg.Register(toolknowledge.NewKnowledgeRead(kStore)) + reg.Register(toolknowledge.NewKnowledgeWrite(kStore)) + reg.Register(toolknowledge.NewKnowledgeList(kStore)) + logger.Debug("registered knowledge tools") + } + + // Shared knowledge tools + if cfg.Tools.SharedKnowledge.Enabled && sharedKStore != nil { + sharedTools := toolknowledge.NewSharedKnowledgeTools(sharedKStore) + for _, tool := range sharedTools { + reg.Register(tool) + } + logger.Debug("registered shared knowledge tools", "count", len(sharedTools)) + } + + // MCP tools — register tools from all connected MCP servers + if mcpManager != nil { + for serverName, mcpClient := range mcpManager.AllClients() { + // Find the config for this server to get prefix, filter, timeout + var serverCfg *config.MCPServerCfg + for i := range cfg.Tools.MCP.Servers { + if cfg.Tools.MCP.Servers[i].Name == serverName { + serverCfg = &cfg.Tools.MCP.Servers[i] + break + } + } + if serverCfg == nil { + logger.Warn("no config found for MCP server", "name", serverName) + continue + } + + // Convert and register MCP tools + mcpTools := toolmcp.FromMCPServer(mcpClient, serverCfg.Prefix, serverCfg.Tools, serverCfg.Timeout, logger) + for _, tool := range mcpTools { + reg.Register(tool) + } + logger.Debug("registered MCP tools", "server", serverName, "count", len(mcpTools)) + } + } + + // Skills tools — register skill search, load, read, and run tools + if skillLoader != nil { + reg.Register(toolskills.NewSkillSearch(skillLoader, cfg.Skills.Categories)) + reg.Register(toolskills.NewSkillLoad(skillLoader)) + reg.Register(toolskills.NewSkillReadResource(skillLoader)) + if skillExecutor != nil { + reg.Register(toolskills.NewSkillRunScript(skillLoader, skillExecutor)) + } + logger.Debug("registered skills tools") + } + + return reg +} + +// resolveDataBase returns the base directory for agent runtime data. +// Priority: config storage.base_path > $AGENTS_DATA_DIR/ > agents//data +func resolveDataBase(cfg *config.AgentConfig) string { + if cfg.Storage.BasePath != "" { + return cfg.Storage.BasePath + } + if envDir := os.Getenv("AGENTS_DATA_DIR"); envDir != "" { + return filepath.Join(envDir, cfg.Agent.ID) + } + return filepath.Join("agents", cfg.Agent.ID, "data") +} diff --git a/agents/runtime.go b/agents/runtime.go index b541331..43786b7 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -20,31 +20,18 @@ import ( "github.com/enmanuel/agents/pkg/decision" coretypes "github.com/enmanuel/agents/pkg/llm" "github.com/enmanuel/agents/pkg/memory" - "github.com/enmanuel/agents/pkg/orchestration" "github.com/enmanuel/agents/pkg/personality" "github.com/enmanuel/agents/pkg/sanitize" "github.com/enmanuel/agents/shell/bus" shellcron "github.com/enmanuel/agents/shell/cron" "github.com/enmanuel/agents/shell/effects" shellknowledge "github.com/enmanuel/agents/shell/knowledge" - shelllm "github.com/enmanuel/agents/shell/llm" "github.com/enmanuel/agents/shell/matrix" shellmcp "github.com/enmanuel/agents/shell/mcp" - shellmem "github.com/enmanuel/agents/shell/memory" shellskills "github.com/enmanuel/agents/shell/skills" "github.com/enmanuel/agents/shell/ssh" "github.com/enmanuel/agents/tools" - toolclock "github.com/enmanuel/agents/tools/clock" - toolfile "github.com/enmanuel/agents/tools/file" - toolhttp "github.com/enmanuel/agents/tools/http" - toolimdb "github.com/enmanuel/agents/tools/imdb" - toolknowledge "github.com/enmanuel/agents/tools/knowledgetools" - toolmatrix "github.com/enmanuel/agents/tools/matrix" - toolmcp "github.com/enmanuel/agents/tools/mcptools" toolmemory "github.com/enmanuel/agents/tools/memorytools" - toolskills "github.com/enmanuel/agents/tools/skilltools" - toolssh "github.com/enmanuel/agents/tools/ssh" - toolweather "github.com/enmanuel/agents/tools/weather" ) const ( @@ -77,10 +64,10 @@ type Agent struct { acl acl.ACL // Commands — handlers keyed by canonical name; cmdAliases maps alias → canonical - commands map[string]CommandHandler - cmdAliases map[string]string // alias → canonical name - customSpecs []command.Spec // specs from RegisterCommand (for !help) - startTime time.Time + commands map[string]CommandHandler + cmdAliases map[string]string // alias → canonical name + customSpecs []command.Spec // specs from RegisterCommand (for !help) + startTime time.Time // Memory windows map[string]memory.Window @@ -111,22 +98,6 @@ type Agent struct { scheduler *shellcron.Scheduler } -// ClearWindow resets the conversation window for a room and deletes persisted -// messages from SQLite so the agent starts fresh. Implements toolmemory.WindowClearer. -func (a *Agent) ClearWindow(roomID string) { - a.windowsMu.Lock() - a.windows[roomID] = memory.NewWindow(a.windowSize) - a.windowsMu.Unlock() - - if a.memStore != nil { - if err := a.memStore.DeleteMessages( - context.Background(), a.cfg.Agent.ID, &roomID, - ); err != nil { - a.logger.Warn("failed to delete persisted messages on clear", "room", roomID, "err", err) - } - } -} - // New assembles an Agent from its config, rules, pre-resolved ACL, and logger. // The ACL is resolved externally (e.g. from security/ YAML files) and injected here. // Pass acl.ACL{} (empty) for open access (no restrictions). @@ -138,196 +109,46 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge } // E2EE — initialize before the sync loop starts - var cryptoStore io.Closer - if cfg.Matrix.Encryption.Enabled { - storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db") - pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv) - logger.Info("initializing e2ee", "store", storePath) - cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID) - if err != nil { - return nil, fmt.Errorf("e2ee init: %w", err) - } - - // Auto-fetch cross-signing private keys from SSSS if recovery key is configured. - if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" { - if rk := os.Getenv(envName); rk != "" { - if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil { - logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err) - } else { - logger.Info("cross-signing private keys fetched from SSSS") - } - } - } - - // Sign own device with the self-signing key so Element shows it as verified. - if err := matrixClient.SignOwnDevice(context.Background()); err != nil { - logger.Warn("failed to sign own device (non-fatal)", "err", err) - } else { - logger.Info("own device signed with cross-signing key") - } - - logger.Info("e2ee ready") + cryptoStore, err := initCrypto(cfg, matrixClient, logger) + if err != nil { + return nil, err } // SSH executor sshExec := ssh.NewExecutor(cfg.SSH, logger) // LLM client — optional; if no provider is configured, the agent runs as simple_bot - var llmFunc coretypes.CompleteFunc - if cfg.LLM.Primary.Provider != "" { - llmLog := logger.With("component", "llm") - primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary, llmLog) - if err != nil { - return nil, fmt.Errorf("primary LLM: %w", err) - } - - llmFunc = primaryLLM - if cfg.LLM.Fallback.Provider != "" { - fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback, llmLog) - if err != nil { - logger.Warn("fallback LLM config error", "err", err) - } else { - llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM, cfg.LLM.Fallback, llmLog) - } - } - } else { - logger.Info("no LLM configured, running as command-only bot") + llmFunc, err := initLLM(cfg, logger) + if err != nil { + return nil, err } // Effects runner runner := effects.NewRunner(matrixClient, sshExec, logger) - // Resolve base data path for this agent. - // Priority: config storage.base_path > $AGENTS_DATA_DIR/ > agents//data + // Resolve base data path for this agent dataBase := resolveDataBase(cfg) logger.Debug("data base path", "path", dataBase) // Memory subsystem - var memStore memory.Store - windowSize := defaultWindowSize - roomCtx := &toolmemory.RoomContext{} - - if cfg.Memory.Enabled { - windowSize = cfg.Memory.WindowSize - if windowSize <= 0 { - windowSize = defaultWindowSize - } - - dbPath := cfg.Memory.DBPath - if dbPath == "" { - dbPath = filepath.Join(dataBase, "memory.db") - } - store, err := shellmem.New(dbPath, logger) - if err != nil { - return nil, fmt.Errorf("memory store: %w", err) - } - memStore = store - logger.Info("memory enabled", "window_size", windowSize, "db", dbPath) + memInit, err := initMemoryStore(cfg.Memory.Enabled, cfg.Memory.WindowSize, cfg.Memory.DBPath, dataBase, logger) + if err != nil { + return nil, err } - // Knowledge store - var kStore *shellknowledge.FileStore - if cfg.Tools.Knowledge.Enabled { - knowledgeDir := cfg.Tools.Knowledge.Dir - if knowledgeDir == "" { - knowledgeDir = filepath.Join("agents", cfg.Agent.ID, "knowledge") - } - knowledgeDBPath := filepath.Join(dataBase, "knowledge.db") - var kErr error - kStore, kErr = shellknowledge.New(knowledgeDir, knowledgeDBPath, logger) - if kErr != nil { - logger.Error("knowledge_store_init_failed", "err", kErr) - } else { - if syncErr := kStore.Sync(context.Background()); syncErr != nil { - logger.Error("knowledge_sync_failed", "err", syncErr) - } - } - } - - // Shared knowledge store - var sharedKStore *shellknowledge.FileStore - if cfg.Tools.SharedKnowledge.Enabled { - sharedDir := cfg.Tools.SharedKnowledge.Dir - if sharedDir == "" { - sharedDir = "knowledges" - } - sharedDBPath := cfg.Tools.SharedKnowledge.DBPath - if sharedDBPath == "" { - sharedDBPath = "knowledges/data/knowledge.db" - } - var skErr error - sharedKStore, skErr = shellknowledge.New(sharedDir, sharedDBPath, logger) - if skErr != nil { - logger.Error("shared_knowledge_store_init_failed", "err", skErr) - } else { - if syncErr := sharedKStore.Sync(context.Background()); syncErr != nil { - logger.Error("shared_knowledge_sync_failed", "err", syncErr) - } - logger.Info("shared knowledge enabled", "dir", sharedDir, "db", sharedDBPath) - } - } + // Tool dependencies (knowledge, MCP, skills) + deps := initToolDeps(cfg, dataBase, logger) if !agentACL.Empty() { logger.Info("acl enabled (centralized security policy)") } - // MCP client manager — connects to external MCP servers - var mcpManager *shellmcp.Manager - if cfg.Tools.MCP.Enabled && len(cfg.Tools.MCP.Servers) > 0 { - var mcpErr error - mcpManager, mcpErr = shellmcp.NewManager(context.Background(), cfg.Tools.MCP.Servers, logger) - if mcpErr != nil { - logger.Error("mcp_manager_init_failed", "err", mcpErr) - } else { - logger.Info("mcp manager initialized", "servers", len(cfg.Tools.MCP.Servers)) - } - } - - // Skills loader - var skillLoader *shellskills.Loader - var skillExecutor *shellskills.Executor - if cfg.Skills.Enabled { - skillsPath := cfg.Skills.SkillsPath - if skillsPath == "" { - skillsPath = "skills/" - } - skillLoader = shellskills.NewLoader(skillsPath) - - // Skills executor for scripts - allowedInterpreters := cfg.Tools.Skills.AllowedInterpreters - timeout := cfg.Skills.Timeout - if timeout == 0 { - timeout = 60 * time.Second - } - skillExecutor = shellskills.NewExecutor(allowedInterpreters, timeout) - logger.Info("skills enabled", "path", skillsPath, "categories", cfg.Skills.Categories) - } - // Tool registry — register tools enabled in config - toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memStore, kStore, sharedKStore, mcpManager, skillLoader, skillExecutor, roomCtx, logger) + roomCtx := &toolmemory.RoomContext{} + toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memInit.store, deps.kStore, deps.sharedKStore, deps.mcpManager, deps.skillLoader, deps.skillExecutor, roomCtx, logger) // Rate limiting for tools - if cfg.Security.ToolRateLimit.Enabled { - maxCalls := cfg.Security.ToolRateLimit.MaxCallsPerMin - if maxCalls <= 0 { - maxCalls = 10 - } - rl := tools.NewRateLimiter(maxCalls, time.Minute) - toolReg.SetRateLimiter(rl) - - cleanupInterval := cfg.Security.ToolRateLimit.CleanupIntervalS - if cleanupInterval <= 0 { - cleanupInterval = 60 - } - go func() { - ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Second) - defer ticker.Stop() - for range ticker.C { - rl.Cleanup() - } - }() - logger.Info("tool rate limiting enabled", "max_calls_per_min", maxCalls) - } + initRateLimiter(cfg, toolReg, logger) a := &Agent{ cfg: cfg, @@ -340,17 +161,17 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge toolReg: toolReg, logger: logger, cryptoStore: cryptoStore, - mcpManager: mcpManager, + mcpManager: deps.mcpManager, done: make(chan struct{}), commands: make(map[string]CommandHandler), cmdAliases: command.BuiltinNames(), startTime: time.Now(), windows: make(map[string]memory.Window), - memStore: memStore, - knowledgeStore: kStore, - sharedKnowledgeStore: sharedKStore, - skillLoader: skillLoader, - windowSize: windowSize, + memStore: memInit.store, + knowledgeStore: deps.kStore, + sharedKnowledgeStore: deps.sharedKStore, + skillLoader: deps.skillLoader, + windowSize: memInit.windowSize, roomCtx: roomCtx, } @@ -375,7 +196,7 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge a.loadPromptCommands() // Register memory_clear_context with self as WindowClearer (after a is created) - if cfg.Tools.Memory.Enabled && memStore != nil { + if cfg.Tools.Memory.Enabled && memInit.store != nil { toolReg.Register(toolmemory.NewMemoryClearContext(a, roomCtx)) } @@ -391,6 +212,43 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge return a, nil } +// initCrypto initializes E2EE if enabled and returns the crypto store closer. +func initCrypto(cfg *config.AgentConfig, matrixClient *matrix.Client, logger *slog.Logger) (io.Closer, error) { + if !cfg.Matrix.Encryption.Enabled { + return nil, nil + } + + storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db") + pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv) + logger.Info("initializing e2ee", "store", storePath) + + cryptoStore, err := matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID) + if err != nil { + return nil, fmt.Errorf("e2ee init: %w", err) + } + + // Auto-fetch cross-signing private keys from SSSS if recovery key is configured. + if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" { + if rk := os.Getenv(envName); rk != "" { + if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil { + logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err) + } else { + logger.Info("cross-signing private keys fetched from SSSS") + } + } + } + + // Sign own device with the self-signing key so Element shows it as verified. + if err := matrixClient.SignOwnDevice(context.Background()); err != nil { + logger.Warn("failed to sign own device (non-fatal)", "err", err) + } else { + logger.Info("own device signed with cross-signing key") + } + + logger.Info("e2ee ready") + return cryptoStore, nil +} + // RegisterCommand adds a custom command handler for this agent. // The spec provides metadata (aliases, description, usage) for !help. // Must be called before Run(). @@ -404,26 +262,6 @@ func (a *Agent) RegisterCommand(spec command.Spec, handler CommandHandler) { a.logger.Info("command_registered", "command", spec.Name, "aliases", spec.Aliases) } -// loadPromptCommands scans the project-root prompts/ directory and loads all .md files. -func (a *Agent) loadPromptCommands() { - prompts, err := command.LoadPromptCommands("prompts") - if err != nil { - a.logger.Warn("failed to load prompt-commands", "err", err) - return - } - a.promptCmds = make(map[string]string, len(prompts)) - for _, p := range prompts { - a.promptCmds[p.Name] = p.Content - } - if len(a.promptCmds) > 0 { - names := make([]string, 0, len(a.promptCmds)) - for n := range a.promptCmds { - names = append(names, n) - } - a.logger.Info("prompt-commands loaded", "count", len(a.promptCmds), "names", names) - } -} - // SetBus attaches the agent to the inter-agent bus for orchestration. // Must be called before Run(). func (a *Agent) SetBus(b *bus.Bus) { @@ -510,679 +348,3 @@ func (a *Agent) Run(ctx context.Context) error { return a.listener.Run(ctx) } - -// listenBus processes messages from the inter-agent bus. -func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) { - for { - select { - case <-ctx.Done(): - return - case msg, ok := <-ch: - if !ok { - return - } - if msg.Kind == bus.KindTask { - a.handleTaskEvent(ctx, msg) - } - } - } -} - -// handleTaskEvent processes a task delegated by the orchestrator. -// The bot generates a response and sends it both to Matrix and back via bus. -func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) { - taskJSON, ok := msg.Payload["task_json"] - if !ok { - a.logger.Error("task message missing task_json payload") - return - } - - task, err := orchestration.UnmarshalTaskEvent(taskJSON) - if err != nil { - a.logger.Error("failed to unmarshal task event", "err", err) - return - } - - a.logger.Info("handling orchestrated task", - "task_id", task.TaskID, - "room", task.TargetRoomID, - "sender", task.OriginalSender, - "iteration", task.Iteration, - ) - - roomID := task.TargetRoomID - - // Update room context for memory tools - a.roomCtx.Set(roomID) - - if a.cfg.Personality.Behavior.TypingIndicator { - _ = a.matrix.SendTyping(ctx, roomID, true) - defer a.matrix.SendTyping(ctx, roomID, false) - } - - // Build a synthetic MessageContext from the task - msgCtx := decision.MessageContext{ - SenderID: task.OriginalSender, - RoomID: roomID, - Content: task.OriginalQuestion, - IsDirectMsg: false, - IsMention: true, // treat orchestrated tasks like mentions - } - - // If there are previous responses, prepend context - if len(task.PreviousResponses) > 0 { - var context string - for _, pr := range task.PreviousResponses { - context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text) - } - msgCtx.Content = context + "Original question: " + task.OriginalQuestion + - "\n\nPlease provide an improved or complementary answer." - } - - // Sanitize orchestrated input - sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID) - if rejected { - a.logger.Warn("orchestrated task rejected by sanitizer", - "task_id", task.TaskID, "sender", task.OriginalSender) - _ = a.matrix.SendMarkdown(ctx, roomID, "El mensaje fue rechazado por el filtro de seguridad.") - return - } - msgCtx.Content = sanitized - - // Load memory and run LLM - a.ensureWindowLoaded(ctx, roomID) - a.appendToWindow(roomID, coretypes.Message{ - Role: coretypes.RoleUser, Content: msgCtx.Content, - }) - - reply, err := a.runLLM(ctx, msgCtx, roomID) - - // Build the result to send back via bus - result := orchestration.TaskResult{ - TaskID: task.TaskID, - BotID: a.cfg.Agent.ID, - } - - if err != nil { - a.logger.Error("LLM error during orchestrated task", "err", err) - result.Error = err.Error() - reply = "Sorry, I encountered an error." - } else { - result.Text = reply - // Persist assistant reply - a.appendToWindow(roomID, coretypes.Message{ - Role: coretypes.RoleAssistant, Content: reply, - }) - a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply) - } - - // Send reply to Matrix room - if sendErr := a.matrix.SendMarkdown(ctx, roomID, reply); sendErr != nil { - a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr) - } - - // Send result back to orchestrator via bus - resultJSON, marshalErr := orchestration.MarshalTaskResult(result) - if marshalErr != nil { - a.logger.Error("failed to marshal task result", "err", marshalErr) - return - } - - replyMsg := bus.AgentMessage{ - From: bus.AgentID(a.cfg.Agent.ID), - To: msg.From, - Kind: bus.KindTaskResult, - Payload: map[string]string{"result_json": resultJSON}, - } - - if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil { - a.logger.Error("failed to send task result via bus", "err", busErr) - } -} - -// handleEvent is called by the matrix Listener for each filtered incoming event. -func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) { - a.logger.Debug("handling event", - "sender", msgCtx.SenderID, - "is_dm", msgCtx.IsDirectMsg, - "is_mention", msgCtx.IsMention, - "command", msgCtx.Command, - ) - - roomID := evt.RoomID.String() - - // Update room context for memory tools - a.roomCtx.Set(roomID) - - if a.cfg.Personality.Behavior.TypingIndicator { - _ = a.matrix.SendTyping(ctx, roomID, true) - defer a.matrix.SendTyping(ctx, roomID, false) - } - - // ── Command flow ───────────────────────────────────────────────── - // Commands (!xxx) always resolve before rules or LLM. Never reach the LLM. - // Priority: built-in → unknown (agent-specific commands can be added via RegisterCommand). - if msgCtx.Command != "" { - a.logger.Info("command_received", - "command", msgCtx.Command, - "sender", msgCtx.SenderID, - "room", roomID, - "args", msgCtx.Args, - ) - - // Resolve aliases - cmdName := msgCtx.Command - if canonical, ok := a.cmdAliases[cmdName]; ok { - cmdName = canonical - } - - if handler, ok := a.commands[cmdName]; ok { - // RBAC check for commands - if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) { - a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID) - _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, - "No tienes permisos para ejecutar este comando.") - return - } - a.logger.Info("command_executed", "command", cmdName) - reply := handler(ctx, msgCtx) - _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply) - return - } - - // Prompt-command: expand .md content and pass to LLM - if content, ok := a.promptCmds[cmdName]; ok { - a.logger.Info("prompt_command_expanded", "command", cmdName) - msgCtx.Content = command.ExpandPrompt(content, msgCtx.Args) - msgCtx.Command = "" - msgCtx.Args = nil - // Fall through to rules/LLM flow below - } else { - // Unknown command — never falls through to rules or LLM - a.logger.Info("command_unknown", "command", msgCtx.Command) - _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, - fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command)) - return - } - } - - // ── Non-command flow ───────────────────────────────────────────── - // RBAC check for LLM access ("ask" action) - if !a.acl.CanDo(msgCtx.SenderID, "ask") { - a.logger.Info("ask_denied", "sender", msgCtx.SenderID) - _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, - "No tienes permisos para interactuar con este agente.") - return - } - - actions := decision.Evaluate(msgCtx, a.rules) - a.logger.Debug("rules evaluated", "matched_actions", len(actions)) - - // If no rules matched and the message mentions the bot or is a DM, use LLM. - if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) { - if a.llm == nil { - // Simple bot: no LLM, ignore non-command messages - a.logger.Debug("no LLM configured, ignoring non-command message") - return - } - a.logger.Debug("no rules matched, falling back to LLM") - actions = []decision.Action{{ - Kind: decision.ActionKindLLM, - LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID}, - }} - } - - if len(actions) == 0 { - a.logger.Debug("no actions, ignoring message", - "is_dm", msgCtx.IsDirectMsg, - "is_mention", msgCtx.IsMention, - ) - return - } - - a.executeActions(ctx, roomID, msgCtx, actions) -} - -// executeActions expands LLM actions and runs the effects runner. -func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) { - // Auto-thread: if configured and message is not already in a thread, - // start a new thread rooted at the user's message. - if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" { - msgCtx.ThreadID = msgCtx.EventID - } - - // Sanitize user input before sending to LLM - sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID) - if rejected { - a.runner.Execute(ctx, roomID, []decision.Action{{ - Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, - }}) - return - } - msgCtx.Content = sanitized - - // Resolve memory key: use thread root as context key when inside a thread, - // so parallel threads in the same room have independent conversation windows. - memKey := roomID - if msgCtx.ThreadID != "" { - memKey = msgCtx.ThreadID - } - - expanded := make([]decision.Action, 0, len(actions)) - for _, act := range actions { - if act.Kind == decision.ActionKindLLM { - if a.llm == nil { - a.logger.Warn("LLM action requested but no LLM configured") - expanded = append(expanded, decision.Action{ - Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, - }) - continue - } - // Memory: load window + append user message before LLM call - a.ensureWindowLoaded(ctx, memKey) - a.appendToWindow(memKey, coretypes.Message{ - Role: coretypes.RoleUser, Content: msgCtx.Content, - }) - a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content) - - reply, err := a.runLLM(ctx, msgCtx, memKey) - if err != nil { - a.logger.Error("llm error", "err", err) - expanded = append(expanded, decision.Action{ - Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, - }) - } else { - expanded = append(expanded, decision.Action{ - Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, - }) - - // Memory: append assistant reply after LLM call - a.appendToWindow(memKey, coretypes.Message{ - Role: coretypes.RoleAssistant, Content: reply, - }) - a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply) - } - } else { - expanded = append(expanded, act) - } - } - - a.runner.Execute(ctx, roomID, expanded) -} - -func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) { - a.logger.Debug("calling LLM", - "model", a.cfg.LLM.Primary.Model, - "provider", a.cfg.LLM.Primary.Provider, - ) - - // Load system prompt from file if configured, else use description - systemPrompt := a.cfg.Agent.Description - if spFile := a.cfg.LLM.Reasoning.SystemPromptFile; spFile != "" { - // Resolve path relative to agent directory - spPath := filepath.Join("agents", a.cfg.Agent.ID, spFile) - if data, err := os.ReadFile(spPath); err == nil { - systemPrompt = string(data) - } else { - a.logger.Warn("failed to load system_prompt_file, using description", "path", spPath, "err", err) - } - } - - // Concatenate personality prompt block - personalityBlock := personality.BuildPersonalityPrompt(a.personality) - if personalityBlock != "" { - systemPrompt = systemPrompt + "\n\n" + personalityBlock - } - - // Build messages: conversation history from window (includes current user msg) - messages := a.getWindowMessages(memKey) - if len(messages) == 0 { - // Fallback if memory is disabled: just the current message - messages = []coretypes.Message{ - {Role: coretypes.RoleUser, Content: msgCtx.Content}, - } - } - - // Build tool specs for the LLM if tool_use is enabled - var llmTools []coretypes.ToolSpec - if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 { - llmTools = a.toolReg.ToLLMSpecs() - a.logger.Debug("tools available for LLM", "count", len(llmTools)) - } - - maxIter := a.cfg.LLM.ToolUse.MaxIterations - if maxIter <= 0 { - maxIter = defaultMaxToolIterations - } - - // Tool-use loop: call LLM → execute tools → feed results back → repeat - for i := 0; i < maxIter; i++ { - req := coretypes.CompletionRequest{ - Model: a.cfg.LLM.Primary.Model, - MaxTokens: a.cfg.LLM.Primary.MaxTokens, - Temperature: a.cfg.LLM.Primary.Temperature, - SystemPrompt: systemPrompt, - Messages: messages, - Tools: llmTools, - } - - resp, err := a.llm(ctx, req) - if err != nil { - a.logger.Error("LLM call failed", "model", req.Model, "err", err) - return "", err - } - - a.logger.Debug("LLM responded", - "content_len", len(resp.Content), - "tool_calls", len(resp.ToolCalls), - "finish_reason", resp.FinishReason, - ) - - // No tool calls — return the text response - if len(resp.ToolCalls) == 0 { - return resp.Content, nil - } - - // Append assistant message with tool calls to conversation - messages = append(messages, coretypes.Message{ - Role: coretypes.RoleAssistant, - Content: resp.Content, - ToolCalls: resp.ToolCalls, - }) - - // Execute each tool and append results - for _, tc := range resp.ToolCalls { - a.logger.Info("executing tool", - "tool", tc.Name, - "call_id", tc.ID, - ) - - // RBAC check for tool execution - if !a.acl.CanDo(msgCtx.SenderID, "tool:"+tc.Name) { - a.logger.Info("tool_denied", "tool", tc.Name, "sender", msgCtx.SenderID) - messages = append(messages, coretypes.Message{ - Role: coretypes.RoleTool, - Content: "error: permission denied for tool " + tc.Name, - ToolCallID: tc.ID, - }) - continue - } - - // Notify the room that a tool is being called (respect thread context) - toolNotice := fmt.Sprintf("🔨 %s", tc.Name) - if err := a.sendReply(ctx, msgCtx.RoomID, msgCtx.EventID, msgCtx.ThreadID, toolNotice); err != nil { - a.logger.Warn("failed to send tool call notice", "tool", tc.Name, "err", err) - } - - result := a.toolReg.ExecuteForRoom(ctx, tc.Name, tc.Arguments, msgCtx.RoomID) - - output := result.Output - if result.Err != nil { - output = fmt.Sprintf("error: %s", result.Err) - a.logger.Warn("tool execution error", - "tool", tc.Name, - "err", result.Err, - ) - } else { - a.logger.Debug("tool executed", - "tool", tc.Name, - "output_len", len(output), - ) - } - - messages = append(messages, coretypes.Message{ - Role: coretypes.RoleTool, - Content: output, - ToolCallID: tc.ID, - }) - } - } - - // Max iterations reached — return whatever we have - a.logger.Warn("tool-use loop reached max iterations", "max", maxIter) - return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil -} - -// ── Memory helpers ─────────────────────────────────────────────────────── - -// ensureWindowLoaded loads the conversation window from SQLite on first access for a room. -func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) { - a.windowsMu.Lock() - defer a.windowsMu.Unlock() - if _, ok := a.windows[roomID]; ok { - return - } - w := memory.NewWindow(a.windowSize) - if a.memStore != nil { - msgs, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize) - if err != nil { - a.logger.Warn("failed to load message history", "room", roomID, "err", err) - } else { - for _, m := range msgs { - w = w.Append(coretypes.Message{Role: m.Role, Content: m.Content}) - } - if len(msgs) > 0 { - a.logger.Debug("loaded message history", "room", roomID, "count", len(msgs)) - } - } - } - a.windows[roomID] = w -} - -// appendToWindow adds a message to the in-memory conversation window. -func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) { - a.windowsMu.Lock() - defer a.windowsMu.Unlock() - w, ok := a.windows[roomID] - if !ok { - w = memory.NewWindow(a.windowSize) - } - a.windows[roomID] = w.Append(msg) -} - -// getWindowMessages returns a copy of the conversation window for a room. -func (a *Agent) getWindowMessages(roomID string) []coretypes.Message { - a.windowsMu.RLock() - defer a.windowsMu.RUnlock() - w, ok := a.windows[roomID] - if !ok { - return nil - } - return w.ToLLMMessages() -} - -// persistMessage saves a message to the SQLite store (no-op if store is nil). -func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) { - if a.memStore == nil { - return - } - if err := a.memStore.SaveMessage(ctx, memory.HistoryMessage{ - AgentID: a.cfg.Agent.ID, - RoomID: roomID, - Role: role, - Content: content, - }); err != nil { - a.logger.Warn("failed to persist message", "room", roomID, "err", err) - } -} - -// sendReply sends a markdown reply that respects thread context. -// If threadID is non-empty, the reply is sent as part of that thread. -func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error { - if threadID != "" { - return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown) - } - return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown) -} - -// parseSeverity converts a config string to sanitize.Severity. -func parseSeverity(s string) sanitize.Severity { - switch s { - case "high": - return sanitize.SeverityHigh - case "low": - return sanitize.SeverityLow - default: - return sanitize.SeverityMedium - } -} - -// sanitizeInput runs prompt injection detection on the message content. -// Returns the (possibly modified) content and true if the message should be rejected. -func (a *Agent) sanitizeInput(content, roomID, senderID string) (string, bool) { - if a.sanitizeOpts == nil { - return content, false - } - - result := sanitize.Sanitize(content, *a.sanitizeOpts) - - for _, w := range result.Warnings { - a.logger.Warn("prompt_injection_detected", - "pattern", w.PatternName, - "severity", w.Severity, - "matched", w.Matched, - "sender", senderID, - "room", roomID, - ) - } - - return result.Output, result.Rejected -} - -// resolveDataBase returns the base directory for agent runtime data. -// Priority: config storage.base_path > $AGENTS_DATA_DIR/ > agents//data -func resolveDataBase(cfg *config.AgentConfig) string { - if cfg.Storage.BasePath != "" { - return cfg.Storage.BasePath - } - if envDir := os.Getenv("AGENTS_DATA_DIR"); envDir != "" { - return filepath.Join(envDir, cfg.Agent.ID) - } - return filepath.Join("agents", cfg.Agent.ID, "data") -} - -// buildToolRegistry creates a Registry with tools enabled in the agent's config. -func buildToolRegistry( - cfg *config.AgentConfig, - sshExec *ssh.Executor, - matrixClient *matrix.Client, - memStore memory.Store, - kStore *shellknowledge.FileStore, - sharedKStore *shellknowledge.FileStore, - mcpManager *shellmcp.Manager, - skillLoader *shellskills.Loader, - skillExecutor *shellskills.Executor, - roomCtx *toolmemory.RoomContext, - logger *slog.Logger, -) *tools.Registry { - reg := tools.NewRegistry(logger) - - if cfg.Tools.HTTP.Enabled { - reg.Register(toolhttp.NewHTTPGet(cfg.Tools.HTTP)) - reg.Register(toolhttp.NewHTTPPost(cfg.Tools.HTTP)) - logger.Debug("registered http tools") - } - - if cfg.Tools.SSH.Enabled { - reg.Register(toolssh.NewSSHCommand(cfg.Tools.SSH, sshExec)) - logger.Debug("registered ssh tool") - } - - if cfg.Tools.FileOps.Enabled { - reg.Register(toolfile.NewReadFile(cfg.Tools.FileOps)) - reg.Register(toolfile.NewListDirectory(cfg.Tools.FileOps)) - if !cfg.Tools.FileOps.ReadOnly { - reg.Register(toolfile.NewWriteFile(cfg.Tools.FileOps)) - reg.Register(toolfile.NewAppendFile(cfg.Tools.FileOps)) - reg.Register(toolfile.NewDeleteFile(cfg.Tools.FileOps)) - } - logger.Debug("registered file tools") - } - - // current_time is always available - reg.Register(toolclock.NewCurrentTime()) - logger.Debug("registered current_time tool") - - // weather tool is always available - reg.Register(toolweather.NewWeather()) - logger.Debug("registered weather tool") - - // imdb tool (enabled via config) - if cfg.Tools.IMDb.Enabled { - reg.Register(toolimdb.NewIMDbSearch(cfg.Tools.IMDb)) - logger.Debug("registered imdb tool") - } - - // matrix_send is always available - reg.Register(toolmatrix.NewMatrixSend(matrixClient, cfg.Tools.Matrix)) - logger.Debug("registered matrix tool") - - // Memory tools (memory_clear_context registered later since it needs the Agent) - if cfg.Tools.Memory.Enabled && memStore != nil { - reg.Register(toolmemory.NewMemorySave(cfg.Agent.ID, memStore)) - reg.Register(toolmemory.NewMemoryRecall(cfg.Agent.ID, memStore)) - reg.Register(toolmemory.NewMemoryForget(cfg.Agent.ID, memStore)) - reg.Register(toolmemory.NewMemorySummary(cfg.Agent.ID, memStore)) - logger.Debug("registered memory tools") - } - - // Knowledge tools - if cfg.Tools.Knowledge.Enabled && kStore != nil { - reg.Register(toolknowledge.NewKnowledgeSearch(kStore)) - reg.Register(toolknowledge.NewKnowledgeRead(kStore)) - reg.Register(toolknowledge.NewKnowledgeWrite(kStore)) - reg.Register(toolknowledge.NewKnowledgeList(kStore)) - logger.Debug("registered knowledge tools") - } - - // Shared knowledge tools - if cfg.Tools.SharedKnowledge.Enabled && sharedKStore != nil { - sharedTools := toolknowledge.NewSharedKnowledgeTools(sharedKStore) - for _, tool := range sharedTools { - reg.Register(tool) - } - logger.Debug("registered shared knowledge tools", "count", len(sharedTools)) - } - - // MCP tools — register tools from all connected MCP servers - if mcpManager != nil { - for serverName, mcpClient := range mcpManager.AllClients() { - // Find the config for this server to get prefix, filter, timeout - var serverCfg *config.MCPServerCfg - for i := range cfg.Tools.MCP.Servers { - if cfg.Tools.MCP.Servers[i].Name == serverName { - serverCfg = &cfg.Tools.MCP.Servers[i] - break - } - } - if serverCfg == nil { - logger.Warn("no config found for MCP server", "name", serverName) - continue - } - - // Convert and register MCP tools - mcpTools := toolmcp.FromMCPServer(mcpClient, serverCfg.Prefix, serverCfg.Tools, serverCfg.Timeout, logger) - for _, tool := range mcpTools { - reg.Register(tool) - } - logger.Debug("registered MCP tools", "server", serverName, "count", len(mcpTools)) - } - } - - // Skills tools — register skill search, load, read, and run tools - if skillLoader != nil { - reg.Register(toolskills.NewSkillSearch(skillLoader, cfg.Skills.Categories)) - reg.Register(toolskills.NewSkillLoad(skillLoader)) - reg.Register(toolskills.NewSkillReadResource(skillLoader)) - if skillExecutor != nil { - reg.Register(toolskills.NewSkillRunScript(skillLoader, skillExecutor)) - } - logger.Debug("registered skills tools") - } - - return reg -}