feat: soporte de threads de Matrix (m.thread)

Implementa el soporte completo de threads de Matrix:
- Listener extrae ThreadID de m.relates_to con rel_type=m.thread
- Client.SendThreadMarkdown envia mensajes como parte de un thread
  usando SetThread de mautrix con fallback m.in_reply_to
- Runner detecta ThreadID en ReplyAction y rutea a SendThreadMarkdown
- MatrixSender interfaz actualizada con SendThreadMarkdown
- runtime.go propaga ThreadID en todas las respuestas (comandos, LLM, RBAC)
- sendReply helper centraliza la logica de envio con/sin thread
- Auto-thread: si matrix.threads.auto_thread=true, crea thread nuevo
  para cada conversacion que no esta ya en un thread
- Memoria por thread: usa ThreadID como clave de window cuando el mensaje
  esta en un thread, permitiendo conversaciones paralelas independientes
- Config: matrix.threads.enabled y matrix.threads.auto_thread en ThreadsCfg

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-08 12:50:34 +00:00
parent 893ac74a16
commit 38d11a0b32
5 changed files with 86 additions and 25 deletions
+39 -17
View File
@@ -486,7 +486,7 @@ func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
Role: coretypes.RoleUser, Content: msgCtx.Content,
})
reply, err := a.runLLM(ctx, msgCtx)
reply, err := a.runLLM(ctx, msgCtx, roomID)
// Build the result to send back via bus
result := orchestration.TaskResult{
@@ -571,13 +571,13 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
// RBAC check for commands
if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) {
a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID)
_ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID,
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
"No tienes permisos para ejecutar este comando.")
return
}
a.logger.Info("command_executed", "command", cmdName)
reply := handler(ctx, msgCtx)
_ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, reply)
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply)
return
}
@@ -591,7 +591,7 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
} else {
// Unknown command — never falls through to rules or LLM
a.logger.Info("command_unknown", "command", msgCtx.Command)
_ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID,
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command))
return
}
@@ -601,7 +601,7 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
// RBAC check for LLM access ("ask" action)
if !a.acl.CanDo(msgCtx.SenderID, "ask") {
a.logger.Info("ask_denied", "sender", msgCtx.SenderID)
_ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID,
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
"No tienes permisos para interactuar con este agente.")
return
}
@@ -636,17 +636,30 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
// executeActions expands LLM actions and runs the effects runner.
func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) {
// Auto-thread: if configured and message is not already in a thread,
// start a new thread rooted at the user's message.
if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" {
msgCtx.ThreadID = msgCtx.EventID
}
// Sanitize user input before sending to LLM
sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
if rejected {
a.runner.Execute(ctx, roomID, []decision.Action{{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID},
Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
}})
return
}
msgCtx.Content = sanitized
// Resolve memory key: use thread root as context key when inside a thread,
// so parallel threads in the same room have independent conversation windows.
memKey := roomID
if msgCtx.ThreadID != "" {
memKey = msgCtx.ThreadID
}
expanded := make([]decision.Action, 0, len(actions))
for _, act := range actions {
if act.Kind == decision.ActionKindLLM {
@@ -654,35 +667,35 @@ func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decisi
a.logger.Warn("LLM action requested but no LLM configured")
expanded = append(expanded, decision.Action{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID},
Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
})
continue
}
// Memory: load window + append user message before LLM call
a.ensureWindowLoaded(ctx, roomID)
a.appendToWindow(roomID, coretypes.Message{
a.ensureWindowLoaded(ctx, memKey)
a.appendToWindow(memKey, coretypes.Message{
Role: coretypes.RoleUser, Content: msgCtx.Content,
})
a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content)
a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content)
reply, err := a.runLLM(ctx, msgCtx)
reply, err := a.runLLM(ctx, msgCtx, memKey)
if err != nil {
a.logger.Error("llm error", "err", err)
expanded = append(expanded, decision.Action{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID},
Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
})
} else {
expanded = append(expanded, decision.Action{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID},
Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
})
// Memory: append assistant reply after LLM call
a.appendToWindow(roomID, coretypes.Message{
a.appendToWindow(memKey, coretypes.Message{
Role: coretypes.RoleAssistant, Content: reply,
})
a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply)
}
} else {
expanded = append(expanded, act)
@@ -692,7 +705,7 @@ func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decisi
a.runner.Execute(ctx, roomID, expanded)
}
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) {
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) {
a.logger.Debug("calling LLM",
"model", a.cfg.LLM.Primary.Model,
"provider", a.cfg.LLM.Primary.Provider,
@@ -702,7 +715,7 @@ func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (str
systemPrompt := a.cfg.Agent.Description
// Build messages: conversation history from window (includes current user msg)
messages := a.getWindowMessages(msgCtx.RoomID)
messages := a.getWindowMessages(memKey)
if len(messages) == 0 {
// Fallback if memory is disabled: just the current message
messages = []coretypes.Message{
@@ -873,6 +886,15 @@ func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretype
}
}
// sendReply sends a markdown reply that respects thread context.
// If threadID is non-empty, the reply is sent as part of that thread.
func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error {
if threadID != "" {
return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown)
}
return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown)
}
// parseSeverity converts a config string to sanitize.Severity.
func parseSeverity(s string) sanitize.Severity {
switch s {
+7
View File
@@ -198,6 +198,13 @@ type MatrixCfg struct {
Encryption EncryptionCfg `yaml:"encryption"`
Rooms RoomsCfg `yaml:"rooms"`
Filters FiltersCfg `yaml:"filters"`
Threads ThreadsCfg `yaml:"threads"`
}
// ThreadsCfg controls Matrix thread support (m.thread).
type ThreadsCfg struct {
Enabled bool `yaml:"enabled"` // respond in threads when message is in a thread (default true)
AutoThread bool `yaml:"auto_thread"` // auto-create a thread for each new conversation (default false)
}
type EncryptionCfg struct {
+9 -8
View File
@@ -24,6 +24,7 @@ type MatrixSender interface {
SendText(ctx context.Context, roomID, text string) error
SendMarkdown(ctx context.Context, roomID, markdown string) error
SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markdown string) error
SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error
SendTyping(ctx context.Context, roomID string, typing bool) error
}
@@ -63,15 +64,15 @@ func (r *Runner) executeOne(ctx context.Context, roomID string, a decision.Actio
if a.Reply == nil {
return Result{Action: a, Err: fmt.Errorf("nil reply action")}
}
target := roomID
if a.Reply.ThreadID != "" {
target = a.Reply.ThreadID
}
var err error
if a.Reply.InReplyTo != "" {
err = r.matrix.SendReplyMarkdown(ctx, target, a.Reply.InReplyTo, a.Reply.Content)
} else {
err = r.matrix.SendMarkdown(ctx, target, a.Reply.Content)
switch {
case a.Reply.ThreadID != "":
// Thread reply: send as part of the thread with fallback in_reply_to
err = r.matrix.SendThreadMarkdown(ctx, roomID, a.Reply.ThreadID, a.Reply.InReplyTo, a.Reply.Content)
case a.Reply.InReplyTo != "":
err = r.matrix.SendReplyMarkdown(ctx, roomID, a.Reply.InReplyTo, a.Reply.Content)
default:
err = r.matrix.SendMarkdown(ctx, roomID, a.Reply.Content)
}
return Result{Action: a, Output: a.Reply.Content, Err: err}
+20
View File
@@ -320,6 +320,26 @@ func (c *Client) SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markd
return err
}
// SendThreadMarkdown sends a formatted message as part of a Matrix thread.
// threadRootID is the event that started the thread (always the same for all messages in a thread).
// inReplyTo is the specific event being replied to within the thread (used as fallback for non-thread clients).
// If inReplyTo is empty, it defaults to threadRootID.
func (c *Client) SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error {
if inReplyTo == "" {
inReplyTo = threadRootID
}
html := mdToHTML(markdown)
content := event.MessageEventContent{
MsgType: event.MsgText,
Body: markdown,
Format: event.FormatHTML,
FormattedBody: html,
RelatesTo: (&event.RelatesTo{}).SetThread(id.EventID(threadRootID), id.EventID(inReplyTo)),
}
_, err := c.raw.SendMessageEvent(ctx, id.RoomID(roomID), event.EventMessage, content)
return err
}
// SendReaction sends a reaction to an event.
func (c *Client) SendReaction(ctx context.Context, roomID, eventID, reaction string) error {
_, err := c.raw.SendReaction(ctx, id.RoomID(roomID), id.EventID(eventID), reaction)
+11
View File
@@ -153,6 +153,17 @@ func (l *Listener) Run(ctx context.Context) error {
)
msgCtx.EventID = evt.ID.String()
// Extract thread root from m.relates_to (Matrix thread support).
if l.cfg.Threads.Enabled {
if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok {
if relType, _ := relatesTo["rel_type"].(string); relType == "m.thread" {
if threadRoot, _ := relatesTo["event_id"].(string); threadRoot != "" {
msgCtx.ThreadID = threadRoot
}
}
}
}
l.logger.Debug("message parsed",
"sender", msgCtx.SenderID,
"room", msgCtx.RoomID,