From 38d11a0b32996ec9c9b6eb768f04f2cebc1d1514 Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 12:50:34 +0000 Subject: [PATCH] feat: soporte de threads de Matrix (m.thread) Implementa el soporte completo de threads de Matrix: - Listener extrae ThreadID de m.relates_to con rel_type=m.thread - Client.SendThreadMarkdown envia mensajes como parte de un thread usando SetThread de mautrix con fallback m.in_reply_to - Runner detecta ThreadID en ReplyAction y rutea a SendThreadMarkdown - MatrixSender interfaz actualizada con SendThreadMarkdown - runtime.go propaga ThreadID en todas las respuestas (comandos, LLM, RBAC) - sendReply helper centraliza la logica de envio con/sin thread - Auto-thread: si matrix.threads.auto_thread=true, crea thread nuevo para cada conversacion que no esta ya en un thread - Memoria por thread: usa ThreadID como clave de window cuando el mensaje esta en un thread, permitiendo conversaciones paralelas independientes - Config: matrix.threads.enabled y matrix.threads.auto_thread en ThreadsCfg Co-Authored-By: Claude Opus 4.6 --- agents/runtime.go | 56 +++++++++++++++++++++++++++------------ internal/config/schema.go | 7 +++++ shell/effects/runner.go | 17 ++++++------ shell/matrix/client.go | 20 ++++++++++++++ shell/matrix/listener.go | 11 ++++++++ 5 files changed, 86 insertions(+), 25 deletions(-) diff --git a/agents/runtime.go b/agents/runtime.go index 4739e7f..01b0dbe 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -486,7 +486,7 @@ func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) { Role: coretypes.RoleUser, Content: msgCtx.Content, }) - reply, err := a.runLLM(ctx, msgCtx) + reply, err := a.runLLM(ctx, msgCtx, roomID) // Build the result to send back via bus result := orchestration.TaskResult{ @@ -571,13 +571,13 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, // RBAC check for commands if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) { a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID) - _ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, "No tienes permisos para ejecutar este comando.") return } a.logger.Info("command_executed", "command", cmdName) reply := handler(ctx, msgCtx) - _ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, reply) + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply) return } @@ -591,7 +591,7 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, } else { // Unknown command — never falls through to rules or LLM a.logger.Info("command_unknown", "command", msgCtx.Command) - _ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command)) return } @@ -601,7 +601,7 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, // RBAC check for LLM access ("ask" action) if !a.acl.CanDo(msgCtx.SenderID, "ask") { a.logger.Info("ask_denied", "sender", msgCtx.SenderID) - _ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, "No tienes permisos para interactuar con este agente.") return } @@ -636,17 +636,30 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, // executeActions expands LLM actions and runs the effects runner. func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) { + // Auto-thread: if configured and message is not already in a thread, + // start a new thread rooted at the user's message. + if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" { + msgCtx.ThreadID = msgCtx.EventID + } + // Sanitize user input before sending to LLM sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID) if rejected { a.runner.Execute(ctx, roomID, []decision.Action{{ Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID}, + Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }}) return } msgCtx.Content = sanitized + // Resolve memory key: use thread root as context key when inside a thread, + // so parallel threads in the same room have independent conversation windows. + memKey := roomID + if msgCtx.ThreadID != "" { + memKey = msgCtx.ThreadID + } + expanded := make([]decision.Action, 0, len(actions)) for _, act := range actions { if act.Kind == decision.ActionKindLLM { @@ -654,35 +667,35 @@ func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decisi a.logger.Warn("LLM action requested but no LLM configured") expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID}, + Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) continue } // Memory: load window + append user message before LLM call - a.ensureWindowLoaded(ctx, roomID) - a.appendToWindow(roomID, coretypes.Message{ + a.ensureWindowLoaded(ctx, memKey) + a.appendToWindow(memKey, coretypes.Message{ Role: coretypes.RoleUser, Content: msgCtx.Content, }) - a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content) + a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content) - reply, err := a.runLLM(ctx, msgCtx) + reply, err := a.runLLM(ctx, msgCtx, memKey) if err != nil { a.logger.Error("llm error", "err", err) expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID}, + Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) } else { expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID}, + Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) // Memory: append assistant reply after LLM call - a.appendToWindow(roomID, coretypes.Message{ + a.appendToWindow(memKey, coretypes.Message{ Role: coretypes.RoleAssistant, Content: reply, }) - a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply) + a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply) } } else { expanded = append(expanded, act) @@ -692,7 +705,7 @@ func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decisi a.runner.Execute(ctx, roomID, expanded) } -func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) { +func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) { a.logger.Debug("calling LLM", "model", a.cfg.LLM.Primary.Model, "provider", a.cfg.LLM.Primary.Provider, @@ -702,7 +715,7 @@ func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (str systemPrompt := a.cfg.Agent.Description // Build messages: conversation history from window (includes current user msg) - messages := a.getWindowMessages(msgCtx.RoomID) + messages := a.getWindowMessages(memKey) if len(messages) == 0 { // Fallback if memory is disabled: just the current message messages = []coretypes.Message{ @@ -873,6 +886,15 @@ func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretype } } +// sendReply sends a markdown reply that respects thread context. +// If threadID is non-empty, the reply is sent as part of that thread. +func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error { + if threadID != "" { + return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown) + } + return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown) +} + // parseSeverity converts a config string to sanitize.Severity. func parseSeverity(s string) sanitize.Severity { switch s { diff --git a/internal/config/schema.go b/internal/config/schema.go index cd05d9f..b461d1c 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -198,6 +198,13 @@ type MatrixCfg struct { Encryption EncryptionCfg `yaml:"encryption"` Rooms RoomsCfg `yaml:"rooms"` Filters FiltersCfg `yaml:"filters"` + Threads ThreadsCfg `yaml:"threads"` +} + +// ThreadsCfg controls Matrix thread support (m.thread). +type ThreadsCfg struct { + Enabled bool `yaml:"enabled"` // respond in threads when message is in a thread (default true) + AutoThread bool `yaml:"auto_thread"` // auto-create a thread for each new conversation (default false) } type EncryptionCfg struct { diff --git a/shell/effects/runner.go b/shell/effects/runner.go index 9b5f154..5788df6 100644 --- a/shell/effects/runner.go +++ b/shell/effects/runner.go @@ -24,6 +24,7 @@ type MatrixSender interface { SendText(ctx context.Context, roomID, text string) error SendMarkdown(ctx context.Context, roomID, markdown string) error SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markdown string) error + SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error SendTyping(ctx context.Context, roomID string, typing bool) error } @@ -63,15 +64,15 @@ func (r *Runner) executeOne(ctx context.Context, roomID string, a decision.Actio if a.Reply == nil { return Result{Action: a, Err: fmt.Errorf("nil reply action")} } - target := roomID - if a.Reply.ThreadID != "" { - target = a.Reply.ThreadID - } var err error - if a.Reply.InReplyTo != "" { - err = r.matrix.SendReplyMarkdown(ctx, target, a.Reply.InReplyTo, a.Reply.Content) - } else { - err = r.matrix.SendMarkdown(ctx, target, a.Reply.Content) + switch { + case a.Reply.ThreadID != "": + // Thread reply: send as part of the thread with fallback in_reply_to + err = r.matrix.SendThreadMarkdown(ctx, roomID, a.Reply.ThreadID, a.Reply.InReplyTo, a.Reply.Content) + case a.Reply.InReplyTo != "": + err = r.matrix.SendReplyMarkdown(ctx, roomID, a.Reply.InReplyTo, a.Reply.Content) + default: + err = r.matrix.SendMarkdown(ctx, roomID, a.Reply.Content) } return Result{Action: a, Output: a.Reply.Content, Err: err} diff --git a/shell/matrix/client.go b/shell/matrix/client.go index 68229f4..7155d49 100644 --- a/shell/matrix/client.go +++ b/shell/matrix/client.go @@ -320,6 +320,26 @@ func (c *Client) SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markd return err } +// SendThreadMarkdown sends a formatted message as part of a Matrix thread. +// threadRootID is the event that started the thread (always the same for all messages in a thread). +// inReplyTo is the specific event being replied to within the thread (used as fallback for non-thread clients). +// If inReplyTo is empty, it defaults to threadRootID. +func (c *Client) SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error { + if inReplyTo == "" { + inReplyTo = threadRootID + } + html := mdToHTML(markdown) + content := event.MessageEventContent{ + MsgType: event.MsgText, + Body: markdown, + Format: event.FormatHTML, + FormattedBody: html, + RelatesTo: (&event.RelatesTo{}).SetThread(id.EventID(threadRootID), id.EventID(inReplyTo)), + } + _, err := c.raw.SendMessageEvent(ctx, id.RoomID(roomID), event.EventMessage, content) + return err +} + // SendReaction sends a reaction to an event. func (c *Client) SendReaction(ctx context.Context, roomID, eventID, reaction string) error { _, err := c.raw.SendReaction(ctx, id.RoomID(roomID), id.EventID(eventID), reaction) diff --git a/shell/matrix/listener.go b/shell/matrix/listener.go index 5b585cf..5486514 100644 --- a/shell/matrix/listener.go +++ b/shell/matrix/listener.go @@ -153,6 +153,17 @@ func (l *Listener) Run(ctx context.Context) error { ) msgCtx.EventID = evt.ID.String() + // Extract thread root from m.relates_to (Matrix thread support). + if l.cfg.Threads.Enabled { + if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok { + if relType, _ := relatesTo["rel_type"].(string); relType == "m.thread" { + if threadRoot, _ := relatesTo["event_id"].(string); threadRoot != "" { + msgCtx.ThreadID = threadRoot + } + } + } + } + l.logger.Debug("message parsed", "sender", msgCtx.SenderID, "room", msgCtx.RoomID,