merge: issue/0012-threads — soporte de threads de Matrix

This commit is contained in:
2026-03-08 12:51:20 +00:00
10 changed files with 420 additions and 26 deletions
+39 -17
View File
@@ -486,7 +486,7 @@ func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
Role: coretypes.RoleUser, Content: msgCtx.Content,
})
reply, err := a.runLLM(ctx, msgCtx)
reply, err := a.runLLM(ctx, msgCtx, roomID)
// Build the result to send back via bus
result := orchestration.TaskResult{
@@ -571,13 +571,13 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
// RBAC check for commands
if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) {
a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID)
_ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID,
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
"No tienes permisos para ejecutar este comando.")
return
}
a.logger.Info("command_executed", "command", cmdName)
reply := handler(ctx, msgCtx)
_ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, reply)
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply)
return
}
@@ -591,7 +591,7 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
} else {
// Unknown command — never falls through to rules or LLM
a.logger.Info("command_unknown", "command", msgCtx.Command)
_ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID,
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command))
return
}
@@ -601,7 +601,7 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
// RBAC check for LLM access ("ask" action)
if !a.acl.CanDo(msgCtx.SenderID, "ask") {
a.logger.Info("ask_denied", "sender", msgCtx.SenderID)
_ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID,
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
"No tienes permisos para interactuar con este agente.")
return
}
@@ -636,17 +636,30 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
// executeActions expands LLM actions and runs the effects runner.
func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) {
// Auto-thread: if configured and message is not already in a thread,
// start a new thread rooted at the user's message.
if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" {
msgCtx.ThreadID = msgCtx.EventID
}
// Sanitize user input before sending to LLM
sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
if rejected {
a.runner.Execute(ctx, roomID, []decision.Action{{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID},
Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
}})
return
}
msgCtx.Content = sanitized
// Resolve memory key: use thread root as context key when inside a thread,
// so parallel threads in the same room have independent conversation windows.
memKey := roomID
if msgCtx.ThreadID != "" {
memKey = msgCtx.ThreadID
}
expanded := make([]decision.Action, 0, len(actions))
for _, act := range actions {
if act.Kind == decision.ActionKindLLM {
@@ -654,35 +667,35 @@ func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decisi
a.logger.Warn("LLM action requested but no LLM configured")
expanded = append(expanded, decision.Action{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID},
Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
})
continue
}
// Memory: load window + append user message before LLM call
a.ensureWindowLoaded(ctx, roomID)
a.appendToWindow(roomID, coretypes.Message{
a.ensureWindowLoaded(ctx, memKey)
a.appendToWindow(memKey, coretypes.Message{
Role: coretypes.RoleUser, Content: msgCtx.Content,
})
a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content)
a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content)
reply, err := a.runLLM(ctx, msgCtx)
reply, err := a.runLLM(ctx, msgCtx, memKey)
if err != nil {
a.logger.Error("llm error", "err", err)
expanded = append(expanded, decision.Action{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID},
Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
})
} else {
expanded = append(expanded, decision.Action{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID},
Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
})
// Memory: append assistant reply after LLM call
a.appendToWindow(roomID, coretypes.Message{
a.appendToWindow(memKey, coretypes.Message{
Role: coretypes.RoleAssistant, Content: reply,
})
a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply)
}
} else {
expanded = append(expanded, act)
@@ -692,7 +705,7 @@ func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decisi
a.runner.Execute(ctx, roomID, expanded)
}
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) {
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) {
a.logger.Debug("calling LLM",
"model", a.cfg.LLM.Primary.Model,
"provider", a.cfg.LLM.Primary.Provider,
@@ -702,7 +715,7 @@ func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (str
systemPrompt := a.cfg.Agent.Description
// Build messages: conversation history from window (includes current user msg)
messages := a.getWindowMessages(msgCtx.RoomID)
messages := a.getWindowMessages(memKey)
if len(messages) == 0 {
// Fallback if memory is disabled: just the current message
messages = []coretypes.Message{
@@ -873,6 +886,15 @@ func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretype
}
}
// sendReply sends a markdown reply that respects thread context.
// If threadID is non-empty, the reply is sent as part of that thread.
func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error {
if threadID != "" {
return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown)
}
return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown)
}
// parseSeverity converts a config string to sanitize.Severity.
func parseSeverity(s string) sanitize.Severity {
switch s {
+1 -1
View File
@@ -16,7 +16,7 @@ afectados y notas de implementacion.
| 9 | Command system | [0009-command_system.md](completed/0009-command_system.md) | completado |
| 10 | Access control | [0010-access-control.md](completed/0010-access-control.md) | completado |
| 11 | Markdown rendering | [0011-markdown-rendering.md](completed/0011-markdown-rendering.md) | completado |
| 12 | Threads | [0012-threads.md](0012-threads.md) | pendiente |
| 12 | Threads | [0012-threads.md](completed/0012-threads.md) | completado |
| 13 | Hot reload | [0013-hot-reload.md](0013-hot-reload.md) | pendiente |
| 14 | Template agent standardize | [0014-template-agent-standardize.md](0014-template-agent-standardize.md) | pendiente |
| 15 | Multi-platform Telegram | [0015-multi-platform-telegram.md](0015-multi-platform-telegram.md) | pendiente |
+7
View File
@@ -198,6 +198,13 @@ type MatrixCfg struct {
Encryption EncryptionCfg `yaml:"encryption"`
Rooms RoomsCfg `yaml:"rooms"`
Filters FiltersCfg `yaml:"filters"`
Threads ThreadsCfg `yaml:"threads"`
}
// ThreadsCfg controls Matrix thread support (m.thread).
type ThreadsCfg struct {
Enabled bool `yaml:"enabled"` // respond in threads when message is in a thread (default true)
AutoThread bool `yaml:"auto_thread"` // auto-create a thread for each new conversation (default false)
}
type EncryptionCfg struct {
+9 -8
View File
@@ -24,6 +24,7 @@ type MatrixSender interface {
SendText(ctx context.Context, roomID, text string) error
SendMarkdown(ctx context.Context, roomID, markdown string) error
SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markdown string) error
SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error
SendTyping(ctx context.Context, roomID string, typing bool) error
}
@@ -63,15 +64,15 @@ func (r *Runner) executeOne(ctx context.Context, roomID string, a decision.Actio
if a.Reply == nil {
return Result{Action: a, Err: fmt.Errorf("nil reply action")}
}
target := roomID
if a.Reply.ThreadID != "" {
target = a.Reply.ThreadID
}
var err error
if a.Reply.InReplyTo != "" {
err = r.matrix.SendReplyMarkdown(ctx, target, a.Reply.InReplyTo, a.Reply.Content)
} else {
err = r.matrix.SendMarkdown(ctx, target, a.Reply.Content)
switch {
case a.Reply.ThreadID != "":
// Thread reply: send as part of the thread with fallback in_reply_to
err = r.matrix.SendThreadMarkdown(ctx, roomID, a.Reply.ThreadID, a.Reply.InReplyTo, a.Reply.Content)
case a.Reply.InReplyTo != "":
err = r.matrix.SendReplyMarkdown(ctx, roomID, a.Reply.InReplyTo, a.Reply.Content)
default:
err = r.matrix.SendMarkdown(ctx, roomID, a.Reply.Content)
}
return Result{Action: a, Output: a.Reply.Content, Err: err}
+172
View File
@@ -0,0 +1,172 @@
package effects
import (
"context"
"log/slog"
"testing"
"github.com/enmanuel/agents/pkg/decision"
)
// fakeMatrixSender records calls for assertions.
type fakeMatrixSender struct {
calls []senderCall
}
type senderCall struct {
method string
roomID string
threadRootID string
inReplyTo string
markdown string
}
func (f *fakeMatrixSender) SendText(ctx context.Context, roomID, text string) error {
f.calls = append(f.calls, senderCall{method: "SendText", roomID: roomID, markdown: text})
return nil
}
func (f *fakeMatrixSender) SendMarkdown(ctx context.Context, roomID, markdown string) error {
f.calls = append(f.calls, senderCall{method: "SendMarkdown", roomID: roomID, markdown: markdown})
return nil
}
func (f *fakeMatrixSender) SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markdown string) error {
f.calls = append(f.calls, senderCall{method: "SendReplyMarkdown", roomID: roomID, inReplyTo: inReplyTo, markdown: markdown})
return nil
}
func (f *fakeMatrixSender) SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error {
f.calls = append(f.calls, senderCall{method: "SendThreadMarkdown", roomID: roomID, threadRootID: threadRootID, inReplyTo: inReplyTo, markdown: markdown})
return nil
}
func (f *fakeMatrixSender) SendTyping(ctx context.Context, roomID string, typing bool) error {
return nil
}
func TestExecuteReply_PlainMarkdown(t *testing.T) {
sender := &fakeMatrixSender{}
runner := NewRunner(sender, nil, slog.Default())
actions := []decision.Action{{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: "hello"},
}}
results := runner.Execute(context.Background(), "!room:test", actions)
if len(results) != 1 || results[0].Err != nil {
t.Fatalf("unexpected results: %+v", results)
}
if len(sender.calls) != 1 {
t.Fatalf("expected 1 call, got %d", len(sender.calls))
}
c := sender.calls[0]
if c.method != "SendMarkdown" {
t.Errorf("expected SendMarkdown, got %s", c.method)
}
if c.roomID != "!room:test" {
t.Errorf("expected room !room:test, got %s", c.roomID)
}
}
func TestExecuteReply_WithInReplyTo(t *testing.T) {
sender := &fakeMatrixSender{}
runner := NewRunner(sender, nil, slog.Default())
actions := []decision.Action{{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{Content: "hello", InReplyTo: "$evt1"},
}}
runner.Execute(context.Background(), "!room:test", actions)
if len(sender.calls) != 1 {
t.Fatalf("expected 1 call, got %d", len(sender.calls))
}
c := sender.calls[0]
if c.method != "SendReplyMarkdown" {
t.Errorf("expected SendReplyMarkdown, got %s", c.method)
}
if c.inReplyTo != "$evt1" {
t.Errorf("expected inReplyTo=$evt1, got %s", c.inReplyTo)
}
}
func TestExecuteReply_WithThread(t *testing.T) {
sender := &fakeMatrixSender{}
runner := NewRunner(sender, nil, slog.Default())
actions := []decision.Action{{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{
Content: "thread reply",
ThreadID: "$root",
InReplyTo: "$evt2",
},
}}
runner.Execute(context.Background(), "!room:test", actions)
if len(sender.calls) != 1 {
t.Fatalf("expected 1 call, got %d", len(sender.calls))
}
c := sender.calls[0]
if c.method != "SendThreadMarkdown" {
t.Errorf("expected SendThreadMarkdown, got %s", c.method)
}
if c.threadRootID != "$root" {
t.Errorf("expected threadRootID=$root, got %s", c.threadRootID)
}
if c.inReplyTo != "$evt2" {
t.Errorf("expected inReplyTo=$evt2, got %s", c.inReplyTo)
}
if c.roomID != "!room:test" {
t.Errorf("expected room !room:test, got %s", c.roomID)
}
}
func TestExecuteReply_ThreadWithoutInReplyTo(t *testing.T) {
sender := &fakeMatrixSender{}
runner := NewRunner(sender, nil, slog.Default())
actions := []decision.Action{{
Kind: decision.ActionKindReply,
Reply: &decision.ReplyAction{
Content: "thread reply no fallback",
ThreadID: "$root",
},
}}
runner.Execute(context.Background(), "!room:test", actions)
if len(sender.calls) != 1 {
t.Fatalf("expected 1 call, got %d", len(sender.calls))
}
c := sender.calls[0]
if c.method != "SendThreadMarkdown" {
t.Errorf("expected SendThreadMarkdown, got %s", c.method)
}
// inReplyTo should be empty; SendThreadMarkdown will default to threadRootID
if c.inReplyTo != "" {
t.Errorf("expected empty inReplyTo, got %s", c.inReplyTo)
}
}
func TestExecuteReply_NilReply(t *testing.T) {
sender := &fakeMatrixSender{}
runner := NewRunner(sender, nil, slog.Default())
actions := []decision.Action{{
Kind: decision.ActionKindReply,
Reply: nil,
}}
results := runner.Execute(context.Background(), "!room:test", actions)
if len(results) != 1 {
t.Fatalf("expected 1 result, got %d", len(results))
}
if results[0].Err == nil {
t.Error("expected error for nil reply")
}
}
+20
View File
@@ -320,6 +320,26 @@ func (c *Client) SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markd
return err
}
// SendThreadMarkdown sends a formatted message as part of a Matrix thread.
// threadRootID is the event that started the thread (always the same for all messages in a thread).
// inReplyTo is the specific event being replied to within the thread (used as fallback for non-thread clients).
// If inReplyTo is empty, it defaults to threadRootID.
func (c *Client) SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error {
if inReplyTo == "" {
inReplyTo = threadRootID
}
html := mdToHTML(markdown)
content := event.MessageEventContent{
MsgType: event.MsgText,
Body: markdown,
Format: event.FormatHTML,
FormattedBody: html,
RelatesTo: (&event.RelatesTo{}).SetThread(id.EventID(threadRootID), id.EventID(inReplyTo)),
}
_, err := c.raw.SendMessageEvent(ctx, id.RoomID(roomID), event.EventMessage, content)
return err
}
// SendReaction sends a reaction to an event.
func (c *Client) SendReaction(ctx context.Context, roomID, eventID, reaction string) error {
_, err := c.raw.SendReaction(ctx, id.RoomID(roomID), id.EventID(eventID), reaction)
+11
View File
@@ -153,6 +153,17 @@ func (l *Listener) Run(ctx context.Context) error {
)
msgCtx.EventID = evt.ID.String()
// Extract thread root from m.relates_to (Matrix thread support).
if l.cfg.Threads.Enabled {
if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok {
if relType, _ := relatesTo["rel_type"].(string); relType == "m.thread" {
if threadRoot, _ := relatesTo["event_id"].(string); threadRoot != "" {
msgCtx.ThreadID = threadRoot
}
}
}
}
l.logger.Debug("message parsed",
"sender", msgCtx.SenderID,
"room", msgCtx.RoomID,
+66
View File
@@ -0,0 +1,66 @@
package matrix
import (
"encoding/json"
"testing"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)
// TestThreadRelatesToStructure verifies the JSON structure produced by SetThread
// matches the Matrix spec for m.thread events.
func TestThreadRelatesToStructure(t *testing.T) {
rel := (&event.RelatesTo{}).SetThread(id.EventID("$root"), id.EventID("$last"))
data, err := json.Marshal(rel)
if err != nil {
t.Fatalf("marshal error: %v", err)
}
var m map[string]any
if err := json.Unmarshal(data, &m); err != nil {
t.Fatalf("unmarshal error: %v", err)
}
if m["rel_type"] != "m.thread" {
t.Errorf("expected rel_type=m.thread, got %v", m["rel_type"])
}
if m["event_id"] != "$root" {
t.Errorf("expected event_id=$root, got %v", m["event_id"])
}
if m["is_falling_back"] != true {
t.Errorf("expected is_falling_back=true, got %v", m["is_falling_back"])
}
inReplyTo, ok := m["m.in_reply_to"].(map[string]any)
if !ok {
t.Fatal("expected m.in_reply_to to be an object")
}
if inReplyTo["event_id"] != "$last" {
t.Errorf("expected m.in_reply_to.event_id=$last, got %v", inReplyTo["event_id"])
}
}
// TestThreadRelatesToStructure_SameRootAndFallback verifies that when root == fallback,
// the structure is still correct.
func TestThreadRelatesToStructure_SameRootAndFallback(t *testing.T) {
rel := (&event.RelatesTo{}).SetThread(id.EventID("$root"), id.EventID("$root"))
data, err := json.Marshal(rel)
if err != nil {
t.Fatalf("marshal error: %v", err)
}
var m map[string]any
if err := json.Unmarshal(data, &m); err != nil {
t.Fatalf("unmarshal error: %v", err)
}
if m["rel_type"] != "m.thread" {
t.Errorf("expected rel_type=m.thread, got %v", m["rel_type"])
}
if m["event_id"] != "$root" {
t.Errorf("expected event_id=$root, got %v", m["event_id"])
}
}
+95
View File
@@ -0,0 +1,95 @@
package matrix
import (
"testing"
)
// extractThreadID is the pure logic extracted from the listener for testability.
// It mirrors the extraction in listener.go's EventMessage handler.
func extractThreadID(rawContent map[string]any) string {
relatesTo, ok := rawContent["m.relates_to"].(map[string]any)
if !ok {
return ""
}
relType, _ := relatesTo["rel_type"].(string)
if relType != "m.thread" {
return ""
}
threadRoot, _ := relatesTo["event_id"].(string)
return threadRoot
}
func TestExtractThreadID_ThreadMessage(t *testing.T) {
raw := map[string]any{
"body": "hello",
"m.relates_to": map[string]any{
"rel_type": "m.thread",
"event_id": "$root123",
"is_falling_back": true,
"m.in_reply_to": map[string]any{
"event_id": "$last456",
},
},
}
got := extractThreadID(raw)
if got != "$root123" {
t.Errorf("expected $root123, got %q", got)
}
}
func TestExtractThreadID_ReplyNotThread(t *testing.T) {
raw := map[string]any{
"body": "hello",
"m.relates_to": map[string]any{
"m.in_reply_to": map[string]any{
"event_id": "$evt789",
},
},
}
got := extractThreadID(raw)
if got != "" {
t.Errorf("expected empty string for non-thread reply, got %q", got)
}
}
func TestExtractThreadID_NoRelatesTo(t *testing.T) {
raw := map[string]any{
"body": "plain message",
}
got := extractThreadID(raw)
if got != "" {
t.Errorf("expected empty string for plain message, got %q", got)
}
}
func TestExtractThreadID_ReplaceRelation(t *testing.T) {
raw := map[string]any{
"body": "edited",
"m.relates_to": map[string]any{
"rel_type": "m.replace",
"event_id": "$original",
},
}
got := extractThreadID(raw)
if got != "" {
t.Errorf("expected empty string for m.replace, got %q", got)
}
}
func TestExtractThreadID_ThreadWithoutEventID(t *testing.T) {
raw := map[string]any{
"m.relates_to": map[string]any{
"rel_type": "m.thread",
// missing event_id
},
}
got := extractThreadID(raw)
if got != "" {
t.Errorf("expected empty string for thread without event_id, got %q", got)
}
}