package devagents import ( "context" "fmt" "maunium.net/go/mautrix/event" "github.com/enmanuel/agents/pkg/command" "github.com/enmanuel/agents/pkg/decision" coretypes "github.com/enmanuel/agents/pkg/llm" "github.com/enmanuel/agents/pkg/orchestration" "github.com/enmanuel/agents/pkg/sanitize" "github.com/enmanuel/agents/shell/audit" "github.com/enmanuel/agents/shell/bus" "github.com/enmanuel/agents/shell/effects" ) // handleEvent is called by the matrix Listener for each filtered incoming event. func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) { a.logger.Debug("handling event", "sender", msgCtx.SenderID, "is_dm", msgCtx.IsDirectMsg, "is_mention", msgCtx.IsMention, "command", msgCtx.Command, ) roomID := evt.RoomID.String() // Audit: message_received a.emitAudit(audit.Event{ AgentID: a.cfg.Agent.ID, EventType: audit.EventMessageReceived, SenderID: msgCtx.SenderID, RoomID: roomID, Detail: fmt.Sprintf("is_dm=%v is_mention=%v", msgCtx.IsDirectMsg, msgCtx.IsMention), }) // Update room context for memory tools a.roomCtx.Set(roomID) if a.cfg.Personality.Behavior.TypingIndicator { _ = a.sender.SendTyping(ctx, roomID, true) defer a.sender.SendTyping(ctx, roomID, false) } // ── Command flow ───────────────────────────────────────────────── // Commands (!xxx) always resolve before rules or LLM. Never reach the LLM. // Priority: built-in → unknown (agent-specific commands can be added via RegisterCommand). if msgCtx.Command != "" { a.logger.Info("command_received", "command", msgCtx.Command, "sender", msgCtx.SenderID, "room", roomID, "args", msgCtx.Args, ) // Resolve aliases cmdName := msgCtx.Command if canonical, ok := a.cmdAliases[cmdName]; ok { cmdName = canonical } if handler, ok := a.commands[cmdName]; ok { // RBAC check for commands if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) { a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID) _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, "No tienes permisos para ejecutar este comando.") return } a.logger.Info("command_executed", "command", cmdName) // Audit: command_exec a.emitAudit(audit.Event{ AgentID: a.cfg.Agent.ID, EventType: audit.EventCommandExec, SenderID: msgCtx.SenderID, RoomID: roomID, Detail: fmt.Sprintf("command=%s", cmdName), }) reply := handler(ctx, msgCtx) _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply) return } // Prompt-command: expand .md content and pass to LLM if content, ok := a.promptCmds[cmdName]; ok { a.logger.Info("prompt_command_expanded", "command", cmdName) msgCtx.Content = command.ExpandPrompt(content, msgCtx.Args) msgCtx.Command = "" msgCtx.Args = nil // Fall through to rules/LLM flow below } else { // Unknown command — never falls through to rules or LLM a.logger.Info("command_unknown", "command", msgCtx.Command) unknownMsg := fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command) if a.cfg.Matrix.Filters.CommandPrefix == "" { unknownMsg = fmt.Sprintf("Comando desconocido: `%s`. Usa `help` para ver comandos disponibles.", msgCtx.Command) } _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, unknownMsg) return } } // ── Non-command flow ───────────────────────────────────────────── // RBAC check for LLM access ("ask" action) if !a.acl.CanDo(msgCtx.SenderID, "ask") { a.logger.Info("ask_denied", "sender", msgCtx.SenderID) _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, "No tienes permisos para interactuar con este agente.") return } actions := decision.Evaluate(msgCtx, a.rules) a.logger.Debug("rules evaluated", "matched_actions", len(actions)) // If no rules matched and the message mentions the bot or is a DM, use LLM. if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) { if a.llm == nil { // Simple bot: no LLM, ignore non-command messages a.logger.Debug("no LLM configured, ignoring non-command message") return } a.logger.Debug("no rules matched, falling back to LLM") actions = []decision.Action{{ Kind: decision.ActionKindLLM, LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID}, }} } if len(actions) == 0 { a.logger.Debug("no actions, ignoring message", "is_dm", msgCtx.IsDirectMsg, "is_mention", msgCtx.IsMention, ) return } a.executeActions(ctx, roomID, msgCtx, actions) } // executeActions expands LLM actions and runs the effects runner. func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) { // Auto-thread: if configured and message is not already in a thread, // start a new thread rooted at the user's message. if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" { msgCtx.ThreadID = msgCtx.EventID } // Sanitize user input before sending to LLM sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID) if rejected { a.runner.Execute(ctx, roomID, []decision.Action{{ Kind: decision.ActionKindReply, Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }}) return } msgCtx.Content = sanitized // Resolve memory key: use thread root as context key when inside a thread, // so parallel threads in the same room have independent conversation windows. memKey := roomID if msgCtx.ThreadID != "" { memKey = msgCtx.ThreadID } expanded := make([]decision.Action, 0, len(actions)) for _, act := range actions { if act.Kind == decision.ActionKindLLM { if a.llm == nil { a.logger.Warn("LLM action requested but no LLM configured") expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) continue } // Memory: load window + append user message before LLM call a.ensureWindowLoaded(ctx, memKey) a.appendToWindow(memKey, coretypes.Message{ Role: coretypes.RoleUser, Content: msgCtx.Content, }) a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content) // Create ProgressReporter for claude-code streaming if enabled var progress *effects.ProgressReporter if a.isStreamingEnabled() { progress = effects.NewProgressReporter(a.sender, roomID, a.logger) } reply, err := a.runLLM(ctx, msgCtx, memKey, progress) if err != nil { a.logger.Error("llm error", "err", err) if progress != nil { progress.Finalize("\u274c Error al procesar la solicitud.") } expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) } else { // If progress reporter was used, finalize it with a done indicator if progress != nil && progress.EventID() != "" { progress.Finalize("\u2705 *Completado*") } expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) // Memory: append assistant reply after LLM call a.appendToWindow(memKey, coretypes.Message{ Role: coretypes.RoleAssistant, Content: reply, }) a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply) } } else { expanded = append(expanded, act) } } a.runner.Execute(ctx, roomID, expanded) } // listenBus processes messages from the inter-agent bus. func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) { for { select { case <-ctx.Done(): return case msg, ok := <-ch: if !ok { return } if msg.Kind == bus.KindTask { a.handleTaskEvent(ctx, msg) } } } } // handleTaskEvent processes a task delegated by the orchestrator. // The bot generates a response and sends it both to Matrix and back via bus. func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) { taskJSON, ok := msg.Payload["task_json"] if !ok { a.logger.Error("task message missing task_json payload") return } task, err := orchestration.UnmarshalTaskEvent(taskJSON) if err != nil { a.logger.Error("failed to unmarshal task event", "err", err) return } a.logger.Info("handling orchestrated task", "task_id", task.TaskID, "room", task.TargetRoomID, "sender", task.OriginalSender, "iteration", task.Iteration, ) roomID := task.TargetRoomID // Update room context for memory tools a.roomCtx.Set(roomID) if a.cfg.Personality.Behavior.TypingIndicator { _ = a.sender.SendTyping(ctx, roomID, true) defer a.sender.SendTyping(ctx, roomID, false) } // Build a synthetic MessageContext from the task msgCtx := decision.MessageContext{ SenderID: task.OriginalSender, RoomID: roomID, Content: task.OriginalQuestion, IsDirectMsg: false, IsMention: true, // treat orchestrated tasks like mentions } // If there are previous responses, prepend context if len(task.PreviousResponses) > 0 { var context string for _, pr := range task.PreviousResponses { context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text) } msgCtx.Content = context + "Original question: " + task.OriginalQuestion + "\n\nPlease provide an improved or complementary answer." } // Sanitize orchestrated input sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID) if rejected { a.logger.Warn("orchestrated task rejected by sanitizer", "task_id", task.TaskID, "sender", task.OriginalSender) _ = a.sender.SendMarkdown(ctx, roomID, "El mensaje fue rechazado por el filtro de seguridad.") return } msgCtx.Content = sanitized // Load memory and run LLM a.ensureWindowLoaded(ctx, roomID) a.appendToWindow(roomID, coretypes.Message{ Role: coretypes.RoleUser, Content: msgCtx.Content, }) reply, err := a.runLLM(ctx, msgCtx, roomID, nil) // Build the result to send back via bus result := orchestration.TaskResult{ TaskID: task.TaskID, BotID: a.cfg.Agent.ID, } if err != nil { a.logger.Error("LLM error during orchestrated task", "err", err) result.Error = err.Error() reply = "Sorry, I encountered an error." } else { result.Text = reply // Persist assistant reply a.appendToWindow(roomID, coretypes.Message{ Role: coretypes.RoleAssistant, Content: reply, }) a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply) } // Send reply to Matrix room if sendErr := a.sender.SendMarkdown(ctx, roomID, reply); sendErr != nil { a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr) } // Send result back to orchestrator via bus resultJSON, marshalErr := orchestration.MarshalTaskResult(result) if marshalErr != nil { a.logger.Error("failed to marshal task result", "err", marshalErr) return } replyMsg := bus.AgentMessage{ From: bus.AgentID(a.cfg.Agent.ID), To: msg.From, Kind: bus.KindTaskResult, Payload: map[string]string{"result_json": resultJSON}, } if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil { a.logger.Error("failed to send task result via bus", "err", busErr) } } // sendReply sends a markdown reply that respects thread context. // If threadID is non-empty, the reply is sent as part of that thread. func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error { if threadID != "" { return a.sender.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown) } return a.sender.SendReplyMarkdown(ctx, roomID, eventID, markdown) } // parseSeverity converts a config string to sanitize.Severity. func parseSeverity(s string) sanitize.Severity { switch s { case "high": return sanitize.SeverityHigh case "low": return sanitize.SeverityLow default: return sanitize.SeverityMedium } } // emitAudit writes an audit event if the audit writer is enabled. func (a *Agent) emitAudit(evt audit.Event) { if a.auditWriter != nil { a.auditWriter.Emit(evt) } } // isStreamingEnabled returns true when the agent uses claude-code provider // with streaming and show_tool_progress both enabled. func (a *Agent) isStreamingEnabled() bool { return a.cfg.LLM.Primary.Provider == "claude-code" && a.cfg.LLM.Primary.ClaudeCode.Streaming && a.cfg.LLM.Primary.ClaudeCode.ShowToolProgress } // sanitizeInput runs prompt injection detection on the message content. // Returns the (possibly modified) content and true if the message should be rejected. func (a *Agent) sanitizeInput(content, roomID, senderID string) (string, bool) { if a.sanitizeOpts == nil { return content, false } result := sanitize.Sanitize(content, *a.sanitizeOpts) for _, w := range result.Warnings { a.logger.Warn("prompt_injection_detected", "pattern", w.PatternName, "severity", w.Severity, "matched", w.Matched, "sender", senderID, "room", roomID, ) } return result.Output, result.Rejected }