45bd258be1
Implementa la Fase 2 del issue 0036: mensajes de progreso en tiempo real que muestran al usuario qué herramientas está usando el agente claude-code. - SendMarkdownGetID en shell/matrix/client.go: envía mensaje y retorna el event ID para editarlo después - EditMessage en shell/matrix/client.go: edita un mensaje existente usando m.replace (m.relates_to con rel_type=m.replace) - ProgressReporter en shell/effects/progress.go (NEW): recibe streaming events y actualiza un mensaje único en Matrix mostrando el progreso (e.g. "🔧 Bash: ls -la" → "🔧 Read: file.go" → "✅ Completado") - Rate limiter integrado: max 1 edit/segundo para no saturar el homeserver - Conectado en devagents/handler.go: cuando provider=claude-code y streaming+show_tool_progress habilitados, crea ProgressReporter y pasa StreamFunc al CompletionRequest - MatrixSender interface actualizada con los nuevos métodos - 10 tests nuevos para ProgressReporter, todos los existentes pasan Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
415 lines
13 KiB
Go
415 lines
13 KiB
Go
package devagents
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"maunium.net/go/mautrix/event"
|
|
|
|
"github.com/enmanuel/agents/pkg/command"
|
|
"github.com/enmanuel/agents/pkg/decision"
|
|
coretypes "github.com/enmanuel/agents/pkg/llm"
|
|
"github.com/enmanuel/agents/pkg/orchestration"
|
|
"github.com/enmanuel/agents/pkg/sanitize"
|
|
"github.com/enmanuel/agents/shell/audit"
|
|
"github.com/enmanuel/agents/shell/bus"
|
|
"github.com/enmanuel/agents/shell/effects"
|
|
)
|
|
|
|
// handleEvent is called by the matrix Listener for each filtered incoming event.
|
|
func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) {
|
|
a.logger.Debug("handling event",
|
|
"sender", msgCtx.SenderID,
|
|
"is_dm", msgCtx.IsDirectMsg,
|
|
"is_mention", msgCtx.IsMention,
|
|
"command", msgCtx.Command,
|
|
)
|
|
|
|
roomID := evt.RoomID.String()
|
|
|
|
// Audit: message_received
|
|
a.emitAudit(audit.Event{
|
|
AgentID: a.cfg.Agent.ID,
|
|
EventType: audit.EventMessageReceived,
|
|
SenderID: msgCtx.SenderID,
|
|
RoomID: roomID,
|
|
Detail: fmt.Sprintf("is_dm=%v is_mention=%v", msgCtx.IsDirectMsg, msgCtx.IsMention),
|
|
})
|
|
|
|
// Update room context for memory tools
|
|
a.roomCtx.Set(roomID)
|
|
|
|
if a.cfg.Personality.Behavior.TypingIndicator {
|
|
_ = a.sender.SendTyping(ctx, roomID, true)
|
|
defer a.sender.SendTyping(ctx, roomID, false)
|
|
}
|
|
|
|
// ── Command flow ─────────────────────────────────────────────────
|
|
// Commands (!xxx) always resolve before rules or LLM. Never reach the LLM.
|
|
// Priority: built-in → unknown (agent-specific commands can be added via RegisterCommand).
|
|
if msgCtx.Command != "" {
|
|
a.logger.Info("command_received",
|
|
"command", msgCtx.Command,
|
|
"sender", msgCtx.SenderID,
|
|
"room", roomID,
|
|
"args", msgCtx.Args,
|
|
)
|
|
|
|
// Resolve aliases
|
|
cmdName := msgCtx.Command
|
|
if canonical, ok := a.cmdAliases[cmdName]; ok {
|
|
cmdName = canonical
|
|
}
|
|
|
|
if handler, ok := a.commands[cmdName]; ok {
|
|
// RBAC check for commands
|
|
if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) {
|
|
a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID)
|
|
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
|
|
"No tienes permisos para ejecutar este comando.")
|
|
return
|
|
}
|
|
a.logger.Info("command_executed", "command", cmdName)
|
|
|
|
// Audit: command_exec
|
|
a.emitAudit(audit.Event{
|
|
AgentID: a.cfg.Agent.ID,
|
|
EventType: audit.EventCommandExec,
|
|
SenderID: msgCtx.SenderID,
|
|
RoomID: roomID,
|
|
Detail: fmt.Sprintf("command=%s", cmdName),
|
|
})
|
|
|
|
reply := handler(ctx, msgCtx)
|
|
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply)
|
|
return
|
|
}
|
|
|
|
// Prompt-command: expand .md content and pass to LLM
|
|
if content, ok := a.promptCmds[cmdName]; ok {
|
|
a.logger.Info("prompt_command_expanded", "command", cmdName)
|
|
msgCtx.Content = command.ExpandPrompt(content, msgCtx.Args)
|
|
msgCtx.Command = ""
|
|
msgCtx.Args = nil
|
|
// Fall through to rules/LLM flow below
|
|
} else {
|
|
// Unknown command — never falls through to rules or LLM
|
|
a.logger.Info("command_unknown", "command", msgCtx.Command)
|
|
unknownMsg := fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command)
|
|
if a.cfg.Matrix.Filters.CommandPrefix == "" {
|
|
unknownMsg = fmt.Sprintf("Comando desconocido: `%s`. Usa `help` para ver comandos disponibles.", msgCtx.Command)
|
|
}
|
|
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, unknownMsg)
|
|
return
|
|
}
|
|
}
|
|
|
|
// ── Non-command flow ─────────────────────────────────────────────
|
|
// RBAC check for LLM access ("ask" action)
|
|
if !a.acl.CanDo(msgCtx.SenderID, "ask") {
|
|
a.logger.Info("ask_denied", "sender", msgCtx.SenderID)
|
|
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
|
|
"No tienes permisos para interactuar con este agente.")
|
|
return
|
|
}
|
|
|
|
actions := decision.Evaluate(msgCtx, a.rules)
|
|
a.logger.Debug("rules evaluated", "matched_actions", len(actions))
|
|
|
|
// If no rules matched and the message mentions the bot or is a DM, use LLM.
|
|
if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) {
|
|
if a.llm == nil {
|
|
// Simple bot: no LLM, ignore non-command messages
|
|
a.logger.Debug("no LLM configured, ignoring non-command message")
|
|
return
|
|
}
|
|
a.logger.Debug("no rules matched, falling back to LLM")
|
|
actions = []decision.Action{{
|
|
Kind: decision.ActionKindLLM,
|
|
LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID},
|
|
}}
|
|
}
|
|
|
|
if len(actions) == 0 {
|
|
a.logger.Debug("no actions, ignoring message",
|
|
"is_dm", msgCtx.IsDirectMsg,
|
|
"is_mention", msgCtx.IsMention,
|
|
)
|
|
return
|
|
}
|
|
|
|
a.executeActions(ctx, roomID, msgCtx, actions)
|
|
}
|
|
|
|
// executeActions expands LLM actions and runs the effects runner.
|
|
func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) {
|
|
// Auto-thread: if configured and message is not already in a thread,
|
|
// start a new thread rooted at the user's message.
|
|
if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" {
|
|
msgCtx.ThreadID = msgCtx.EventID
|
|
}
|
|
|
|
// Sanitize user input before sending to LLM
|
|
sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
|
|
if rejected {
|
|
a.runner.Execute(ctx, roomID, []decision.Action{{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
|
|
}})
|
|
return
|
|
}
|
|
msgCtx.Content = sanitized
|
|
|
|
// Resolve memory key: use thread root as context key when inside a thread,
|
|
// so parallel threads in the same room have independent conversation windows.
|
|
memKey := roomID
|
|
if msgCtx.ThreadID != "" {
|
|
memKey = msgCtx.ThreadID
|
|
}
|
|
|
|
expanded := make([]decision.Action, 0, len(actions))
|
|
for _, act := range actions {
|
|
if act.Kind == decision.ActionKindLLM {
|
|
if a.llm == nil {
|
|
a.logger.Warn("LLM action requested but no LLM configured")
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
|
|
})
|
|
continue
|
|
}
|
|
// Memory: load window + append user message before LLM call
|
|
a.ensureWindowLoaded(ctx, memKey)
|
|
a.appendToWindow(memKey, coretypes.Message{
|
|
Role: coretypes.RoleUser, Content: msgCtx.Content,
|
|
})
|
|
a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content)
|
|
|
|
// Create ProgressReporter for claude-code streaming if enabled
|
|
var progress *effects.ProgressReporter
|
|
if a.isStreamingEnabled() {
|
|
progress = effects.NewProgressReporter(a.sender, roomID, a.logger)
|
|
}
|
|
|
|
reply, err := a.runLLM(ctx, msgCtx, memKey, progress)
|
|
if err != nil {
|
|
a.logger.Error("llm error", "err", err)
|
|
if progress != nil {
|
|
progress.Finalize("\u274c Error al procesar la solicitud.")
|
|
}
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
|
|
})
|
|
} else {
|
|
// If progress reporter was used, finalize it with a done indicator
|
|
if progress != nil && progress.EventID() != "" {
|
|
progress.Finalize("\u2705 *Completado*")
|
|
}
|
|
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
|
|
})
|
|
|
|
// Memory: append assistant reply after LLM call
|
|
a.appendToWindow(memKey, coretypes.Message{
|
|
Role: coretypes.RoleAssistant, Content: reply,
|
|
})
|
|
a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply)
|
|
}
|
|
} else {
|
|
expanded = append(expanded, act)
|
|
}
|
|
}
|
|
|
|
a.runner.Execute(ctx, roomID, expanded)
|
|
}
|
|
|
|
// listenBus processes messages from the inter-agent bus.
|
|
func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
if msg.Kind == bus.KindTask {
|
|
a.handleTaskEvent(ctx, msg)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleTaskEvent processes a task delegated by the orchestrator.
|
|
// The bot generates a response and sends it both to Matrix and back via bus.
|
|
func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
|
|
taskJSON, ok := msg.Payload["task_json"]
|
|
if !ok {
|
|
a.logger.Error("task message missing task_json payload")
|
|
return
|
|
}
|
|
|
|
task, err := orchestration.UnmarshalTaskEvent(taskJSON)
|
|
if err != nil {
|
|
a.logger.Error("failed to unmarshal task event", "err", err)
|
|
return
|
|
}
|
|
|
|
a.logger.Info("handling orchestrated task",
|
|
"task_id", task.TaskID,
|
|
"room", task.TargetRoomID,
|
|
"sender", task.OriginalSender,
|
|
"iteration", task.Iteration,
|
|
)
|
|
|
|
roomID := task.TargetRoomID
|
|
|
|
// Update room context for memory tools
|
|
a.roomCtx.Set(roomID)
|
|
|
|
if a.cfg.Personality.Behavior.TypingIndicator {
|
|
_ = a.sender.SendTyping(ctx, roomID, true)
|
|
defer a.sender.SendTyping(ctx, roomID, false)
|
|
}
|
|
|
|
// Build a synthetic MessageContext from the task
|
|
msgCtx := decision.MessageContext{
|
|
SenderID: task.OriginalSender,
|
|
RoomID: roomID,
|
|
Content: task.OriginalQuestion,
|
|
IsDirectMsg: false,
|
|
IsMention: true, // treat orchestrated tasks like mentions
|
|
}
|
|
|
|
// If there are previous responses, prepend context
|
|
if len(task.PreviousResponses) > 0 {
|
|
var context string
|
|
for _, pr := range task.PreviousResponses {
|
|
context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text)
|
|
}
|
|
msgCtx.Content = context + "Original question: " + task.OriginalQuestion +
|
|
"\n\nPlease provide an improved or complementary answer."
|
|
}
|
|
|
|
// Sanitize orchestrated input
|
|
sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
|
|
if rejected {
|
|
a.logger.Warn("orchestrated task rejected by sanitizer",
|
|
"task_id", task.TaskID, "sender", task.OriginalSender)
|
|
_ = a.sender.SendMarkdown(ctx, roomID, "El mensaje fue rechazado por el filtro de seguridad.")
|
|
return
|
|
}
|
|
msgCtx.Content = sanitized
|
|
|
|
// Load memory and run LLM
|
|
a.ensureWindowLoaded(ctx, roomID)
|
|
a.appendToWindow(roomID, coretypes.Message{
|
|
Role: coretypes.RoleUser, Content: msgCtx.Content,
|
|
})
|
|
|
|
reply, err := a.runLLM(ctx, msgCtx, roomID, nil)
|
|
|
|
// Build the result to send back via bus
|
|
result := orchestration.TaskResult{
|
|
TaskID: task.TaskID,
|
|
BotID: a.cfg.Agent.ID,
|
|
}
|
|
|
|
if err != nil {
|
|
a.logger.Error("LLM error during orchestrated task", "err", err)
|
|
result.Error = err.Error()
|
|
reply = "Sorry, I encountered an error."
|
|
} else {
|
|
result.Text = reply
|
|
// Persist assistant reply
|
|
a.appendToWindow(roomID, coretypes.Message{
|
|
Role: coretypes.RoleAssistant, Content: reply,
|
|
})
|
|
a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
|
|
}
|
|
|
|
// Send reply to Matrix room
|
|
if sendErr := a.sender.SendMarkdown(ctx, roomID, reply); sendErr != nil {
|
|
a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr)
|
|
}
|
|
|
|
// Send result back to orchestrator via bus
|
|
resultJSON, marshalErr := orchestration.MarshalTaskResult(result)
|
|
if marshalErr != nil {
|
|
a.logger.Error("failed to marshal task result", "err", marshalErr)
|
|
return
|
|
}
|
|
|
|
replyMsg := bus.AgentMessage{
|
|
From: bus.AgentID(a.cfg.Agent.ID),
|
|
To: msg.From,
|
|
Kind: bus.KindTaskResult,
|
|
Payload: map[string]string{"result_json": resultJSON},
|
|
}
|
|
|
|
if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil {
|
|
a.logger.Error("failed to send task result via bus", "err", busErr)
|
|
}
|
|
}
|
|
|
|
// sendReply sends a markdown reply that respects thread context.
|
|
// If threadID is non-empty, the reply is sent as part of that thread.
|
|
func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error {
|
|
if threadID != "" {
|
|
return a.sender.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown)
|
|
}
|
|
return a.sender.SendReplyMarkdown(ctx, roomID, eventID, markdown)
|
|
}
|
|
|
|
// parseSeverity converts a config string to sanitize.Severity.
|
|
func parseSeverity(s string) sanitize.Severity {
|
|
switch s {
|
|
case "high":
|
|
return sanitize.SeverityHigh
|
|
case "low":
|
|
return sanitize.SeverityLow
|
|
default:
|
|
return sanitize.SeverityMedium
|
|
}
|
|
}
|
|
|
|
// emitAudit writes an audit event if the audit writer is enabled.
|
|
func (a *Agent) emitAudit(evt audit.Event) {
|
|
if a.auditWriter != nil {
|
|
a.auditWriter.Emit(evt)
|
|
}
|
|
}
|
|
|
|
// isStreamingEnabled returns true when the agent uses claude-code provider
|
|
// with streaming and show_tool_progress both enabled.
|
|
func (a *Agent) isStreamingEnabled() bool {
|
|
return a.cfg.LLM.Primary.Provider == "claude-code" &&
|
|
a.cfg.LLM.Primary.ClaudeCode.Streaming &&
|
|
a.cfg.LLM.Primary.ClaudeCode.ShowToolProgress
|
|
}
|
|
|
|
// sanitizeInput runs prompt injection detection on the message content.
|
|
// Returns the (possibly modified) content and true if the message should be rejected.
|
|
func (a *Agent) sanitizeInput(content, roomID, senderID string) (string, bool) {
|
|
if a.sanitizeOpts == nil {
|
|
return content, false
|
|
}
|
|
|
|
result := sanitize.Sanitize(content, *a.sanitizeOpts)
|
|
|
|
for _, w := range result.Warnings {
|
|
a.logger.Warn("prompt_injection_detected",
|
|
"pattern", w.PatternName,
|
|
"severity", w.Severity,
|
|
"matched", w.Matched,
|
|
"sender", senderID,
|
|
"room", roomID,
|
|
)
|
|
}
|
|
|
|
return result.Output, result.Rejected
|
|
}
|