diff --git a/agents/runtime.go b/agents/runtime.go index 4739e7f..01b0dbe 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -486,7 +486,7 @@ func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) { Role: coretypes.RoleUser, Content: msgCtx.Content, }) - reply, err := a.runLLM(ctx, msgCtx) + reply, err := a.runLLM(ctx, msgCtx, roomID) // Build the result to send back via bus result := orchestration.TaskResult{ @@ -571,13 +571,13 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, // RBAC check for commands if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) { a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID) - _ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, "No tienes permisos para ejecutar este comando.") return } a.logger.Info("command_executed", "command", cmdName) reply := handler(ctx, msgCtx) - _ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, reply) + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply) return } @@ -591,7 +591,7 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, } else { // Unknown command — never falls through to rules or LLM a.logger.Info("command_unknown", "command", msgCtx.Command) - _ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command)) return } @@ -601,7 +601,7 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, // RBAC check for LLM access ("ask" action) if !a.acl.CanDo(msgCtx.SenderID, "ask") { a.logger.Info("ask_denied", "sender", msgCtx.SenderID) - _ = a.matrix.SendReplyMarkdown(ctx, roomID, msgCtx.EventID, + _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, "No tienes permisos para interactuar con este agente.") return } @@ -636,17 +636,30 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, // executeActions expands LLM actions and runs the effects runner. func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) { + // Auto-thread: if configured and message is not already in a thread, + // start a new thread rooted at the user's message. + if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" { + msgCtx.ThreadID = msgCtx.EventID + } + // Sanitize user input before sending to LLM sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID) if rejected { a.runner.Execute(ctx, roomID, []decision.Action{{ Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID}, + Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }}) return } msgCtx.Content = sanitized + // Resolve memory key: use thread root as context key when inside a thread, + // so parallel threads in the same room have independent conversation windows. + memKey := roomID + if msgCtx.ThreadID != "" { + memKey = msgCtx.ThreadID + } + expanded := make([]decision.Action, 0, len(actions)) for _, act := range actions { if act.Kind == decision.ActionKindLLM { @@ -654,35 +667,35 @@ func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decisi a.logger.Warn("LLM action requested but no LLM configured") expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID}, + Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) continue } // Memory: load window + append user message before LLM call - a.ensureWindowLoaded(ctx, roomID) - a.appendToWindow(roomID, coretypes.Message{ + a.ensureWindowLoaded(ctx, memKey) + a.appendToWindow(memKey, coretypes.Message{ Role: coretypes.RoleUser, Content: msgCtx.Content, }) - a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content) + a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content) - reply, err := a.runLLM(ctx, msgCtx) + reply, err := a.runLLM(ctx, msgCtx, memKey) if err != nil { a.logger.Error("llm error", "err", err) expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID}, + Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) } else { expanded = append(expanded, decision.Action{ Kind: decision.ActionKindReply, - Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID}, + Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID}, }) // Memory: append assistant reply after LLM call - a.appendToWindow(roomID, coretypes.Message{ + a.appendToWindow(memKey, coretypes.Message{ Role: coretypes.RoleAssistant, Content: reply, }) - a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply) + a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply) } } else { expanded = append(expanded, act) @@ -692,7 +705,7 @@ func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decisi a.runner.Execute(ctx, roomID, expanded) } -func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) { +func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) { a.logger.Debug("calling LLM", "model", a.cfg.LLM.Primary.Model, "provider", a.cfg.LLM.Primary.Provider, @@ -702,7 +715,7 @@ func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (str systemPrompt := a.cfg.Agent.Description // Build messages: conversation history from window (includes current user msg) - messages := a.getWindowMessages(msgCtx.RoomID) + messages := a.getWindowMessages(memKey) if len(messages) == 0 { // Fallback if memory is disabled: just the current message messages = []coretypes.Message{ @@ -873,6 +886,15 @@ func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretype } } +// sendReply sends a markdown reply that respects thread context. +// If threadID is non-empty, the reply is sent as part of that thread. +func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error { + if threadID != "" { + return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown) + } + return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown) +} + // parseSeverity converts a config string to sanitize.Severity. func parseSeverity(s string) sanitize.Severity { switch s { diff --git a/dev/issues/README.md b/dev/issues/README.md index 63f0e9c..070f5fe 100644 --- a/dev/issues/README.md +++ b/dev/issues/README.md @@ -16,7 +16,7 @@ afectados y notas de implementacion. | 9 | Command system | [0009-command_system.md](completed/0009-command_system.md) | completado | | 10 | Access control | [0010-access-control.md](completed/0010-access-control.md) | completado | | 11 | Markdown rendering | [0011-markdown-rendering.md](completed/0011-markdown-rendering.md) | completado | -| 12 | Threads | [0012-threads.md](0012-threads.md) | pendiente | +| 12 | Threads | [0012-threads.md](completed/0012-threads.md) | completado | | 13 | Hot reload | [0013-hot-reload.md](0013-hot-reload.md) | pendiente | | 14 | Template agent standardize | [0014-template-agent-standardize.md](0014-template-agent-standardize.md) | pendiente | | 15 | Multi-platform Telegram | [0015-multi-platform-telegram.md](0015-multi-platform-telegram.md) | pendiente | diff --git a/dev/issues/0012-threads.md b/dev/issues/completed/0012-threads.md similarity index 100% rename from dev/issues/0012-threads.md rename to dev/issues/completed/0012-threads.md diff --git a/internal/config/schema.go b/internal/config/schema.go index cd05d9f..b461d1c 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -198,6 +198,13 @@ type MatrixCfg struct { Encryption EncryptionCfg `yaml:"encryption"` Rooms RoomsCfg `yaml:"rooms"` Filters FiltersCfg `yaml:"filters"` + Threads ThreadsCfg `yaml:"threads"` +} + +// ThreadsCfg controls Matrix thread support (m.thread). +type ThreadsCfg struct { + Enabled bool `yaml:"enabled"` // respond in threads when message is in a thread (default true) + AutoThread bool `yaml:"auto_thread"` // auto-create a thread for each new conversation (default false) } type EncryptionCfg struct { diff --git a/shell/effects/runner.go b/shell/effects/runner.go index 9b5f154..5788df6 100644 --- a/shell/effects/runner.go +++ b/shell/effects/runner.go @@ -24,6 +24,7 @@ type MatrixSender interface { SendText(ctx context.Context, roomID, text string) error SendMarkdown(ctx context.Context, roomID, markdown string) error SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markdown string) error + SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error SendTyping(ctx context.Context, roomID string, typing bool) error } @@ -63,15 +64,15 @@ func (r *Runner) executeOne(ctx context.Context, roomID string, a decision.Actio if a.Reply == nil { return Result{Action: a, Err: fmt.Errorf("nil reply action")} } - target := roomID - if a.Reply.ThreadID != "" { - target = a.Reply.ThreadID - } var err error - if a.Reply.InReplyTo != "" { - err = r.matrix.SendReplyMarkdown(ctx, target, a.Reply.InReplyTo, a.Reply.Content) - } else { - err = r.matrix.SendMarkdown(ctx, target, a.Reply.Content) + switch { + case a.Reply.ThreadID != "": + // Thread reply: send as part of the thread with fallback in_reply_to + err = r.matrix.SendThreadMarkdown(ctx, roomID, a.Reply.ThreadID, a.Reply.InReplyTo, a.Reply.Content) + case a.Reply.InReplyTo != "": + err = r.matrix.SendReplyMarkdown(ctx, roomID, a.Reply.InReplyTo, a.Reply.Content) + default: + err = r.matrix.SendMarkdown(ctx, roomID, a.Reply.Content) } return Result{Action: a, Output: a.Reply.Content, Err: err} diff --git a/shell/effects/runner_test.go b/shell/effects/runner_test.go new file mode 100644 index 0000000..b5f0547 --- /dev/null +++ b/shell/effects/runner_test.go @@ -0,0 +1,172 @@ +package effects + +import ( + "context" + "log/slog" + "testing" + + "github.com/enmanuel/agents/pkg/decision" +) + +// fakeMatrixSender records calls for assertions. +type fakeMatrixSender struct { + calls []senderCall +} + +type senderCall struct { + method string + roomID string + threadRootID string + inReplyTo string + markdown string +} + +func (f *fakeMatrixSender) SendText(ctx context.Context, roomID, text string) error { + f.calls = append(f.calls, senderCall{method: "SendText", roomID: roomID, markdown: text}) + return nil +} + +func (f *fakeMatrixSender) SendMarkdown(ctx context.Context, roomID, markdown string) error { + f.calls = append(f.calls, senderCall{method: "SendMarkdown", roomID: roomID, markdown: markdown}) + return nil +} + +func (f *fakeMatrixSender) SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markdown string) error { + f.calls = append(f.calls, senderCall{method: "SendReplyMarkdown", roomID: roomID, inReplyTo: inReplyTo, markdown: markdown}) + return nil +} + +func (f *fakeMatrixSender) SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error { + f.calls = append(f.calls, senderCall{method: "SendThreadMarkdown", roomID: roomID, threadRootID: threadRootID, inReplyTo: inReplyTo, markdown: markdown}) + return nil +} + +func (f *fakeMatrixSender) SendTyping(ctx context.Context, roomID string, typing bool) error { + return nil +} + +func TestExecuteReply_PlainMarkdown(t *testing.T) { + sender := &fakeMatrixSender{} + runner := NewRunner(sender, nil, slog.Default()) + + actions := []decision.Action{{ + Kind: decision.ActionKindReply, + Reply: &decision.ReplyAction{Content: "hello"}, + }} + + results := runner.Execute(context.Background(), "!room:test", actions) + if len(results) != 1 || results[0].Err != nil { + t.Fatalf("unexpected results: %+v", results) + } + if len(sender.calls) != 1 { + t.Fatalf("expected 1 call, got %d", len(sender.calls)) + } + c := sender.calls[0] + if c.method != "SendMarkdown" { + t.Errorf("expected SendMarkdown, got %s", c.method) + } + if c.roomID != "!room:test" { + t.Errorf("expected room !room:test, got %s", c.roomID) + } +} + +func TestExecuteReply_WithInReplyTo(t *testing.T) { + sender := &fakeMatrixSender{} + runner := NewRunner(sender, nil, slog.Default()) + + actions := []decision.Action{{ + Kind: decision.ActionKindReply, + Reply: &decision.ReplyAction{Content: "hello", InReplyTo: "$evt1"}, + }} + + runner.Execute(context.Background(), "!room:test", actions) + + if len(sender.calls) != 1 { + t.Fatalf("expected 1 call, got %d", len(sender.calls)) + } + c := sender.calls[0] + if c.method != "SendReplyMarkdown" { + t.Errorf("expected SendReplyMarkdown, got %s", c.method) + } + if c.inReplyTo != "$evt1" { + t.Errorf("expected inReplyTo=$evt1, got %s", c.inReplyTo) + } +} + +func TestExecuteReply_WithThread(t *testing.T) { + sender := &fakeMatrixSender{} + runner := NewRunner(sender, nil, slog.Default()) + + actions := []decision.Action{{ + Kind: decision.ActionKindReply, + Reply: &decision.ReplyAction{ + Content: "thread reply", + ThreadID: "$root", + InReplyTo: "$evt2", + }, + }} + + runner.Execute(context.Background(), "!room:test", actions) + + if len(sender.calls) != 1 { + t.Fatalf("expected 1 call, got %d", len(sender.calls)) + } + c := sender.calls[0] + if c.method != "SendThreadMarkdown" { + t.Errorf("expected SendThreadMarkdown, got %s", c.method) + } + if c.threadRootID != "$root" { + t.Errorf("expected threadRootID=$root, got %s", c.threadRootID) + } + if c.inReplyTo != "$evt2" { + t.Errorf("expected inReplyTo=$evt2, got %s", c.inReplyTo) + } + if c.roomID != "!room:test" { + t.Errorf("expected room !room:test, got %s", c.roomID) + } +} + +func TestExecuteReply_ThreadWithoutInReplyTo(t *testing.T) { + sender := &fakeMatrixSender{} + runner := NewRunner(sender, nil, slog.Default()) + + actions := []decision.Action{{ + Kind: decision.ActionKindReply, + Reply: &decision.ReplyAction{ + Content: "thread reply no fallback", + ThreadID: "$root", + }, + }} + + runner.Execute(context.Background(), "!room:test", actions) + + if len(sender.calls) != 1 { + t.Fatalf("expected 1 call, got %d", len(sender.calls)) + } + c := sender.calls[0] + if c.method != "SendThreadMarkdown" { + t.Errorf("expected SendThreadMarkdown, got %s", c.method) + } + // inReplyTo should be empty; SendThreadMarkdown will default to threadRootID + if c.inReplyTo != "" { + t.Errorf("expected empty inReplyTo, got %s", c.inReplyTo) + } +} + +func TestExecuteReply_NilReply(t *testing.T) { + sender := &fakeMatrixSender{} + runner := NewRunner(sender, nil, slog.Default()) + + actions := []decision.Action{{ + Kind: decision.ActionKindReply, + Reply: nil, + }} + + results := runner.Execute(context.Background(), "!room:test", actions) + if len(results) != 1 { + t.Fatalf("expected 1 result, got %d", len(results)) + } + if results[0].Err == nil { + t.Error("expected error for nil reply") + } +} diff --git a/shell/matrix/client.go b/shell/matrix/client.go index 68229f4..7155d49 100644 --- a/shell/matrix/client.go +++ b/shell/matrix/client.go @@ -320,6 +320,26 @@ func (c *Client) SendReplyMarkdown(ctx context.Context, roomID, inReplyTo, markd return err } +// SendThreadMarkdown sends a formatted message as part of a Matrix thread. +// threadRootID is the event that started the thread (always the same for all messages in a thread). +// inReplyTo is the specific event being replied to within the thread (used as fallback for non-thread clients). +// If inReplyTo is empty, it defaults to threadRootID. +func (c *Client) SendThreadMarkdown(ctx context.Context, roomID, threadRootID, inReplyTo, markdown string) error { + if inReplyTo == "" { + inReplyTo = threadRootID + } + html := mdToHTML(markdown) + content := event.MessageEventContent{ + MsgType: event.MsgText, + Body: markdown, + Format: event.FormatHTML, + FormattedBody: html, + RelatesTo: (&event.RelatesTo{}).SetThread(id.EventID(threadRootID), id.EventID(inReplyTo)), + } + _, err := c.raw.SendMessageEvent(ctx, id.RoomID(roomID), event.EventMessage, content) + return err +} + // SendReaction sends a reaction to an event. func (c *Client) SendReaction(ctx context.Context, roomID, eventID, reaction string) error { _, err := c.raw.SendReaction(ctx, id.RoomID(roomID), id.EventID(eventID), reaction) diff --git a/shell/matrix/listener.go b/shell/matrix/listener.go index 5b585cf..5486514 100644 --- a/shell/matrix/listener.go +++ b/shell/matrix/listener.go @@ -153,6 +153,17 @@ func (l *Listener) Run(ctx context.Context) error { ) msgCtx.EventID = evt.ID.String() + // Extract thread root from m.relates_to (Matrix thread support). + if l.cfg.Threads.Enabled { + if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok { + if relType, _ := relatesTo["rel_type"].(string); relType == "m.thread" { + if threadRoot, _ := relatesTo["event_id"].(string); threadRoot != "" { + msgCtx.ThreadID = threadRoot + } + } + } + } + l.logger.Debug("message parsed", "sender", msgCtx.SenderID, "room", msgCtx.RoomID, diff --git a/shell/matrix/thread_relates_test.go b/shell/matrix/thread_relates_test.go new file mode 100644 index 0000000..2771a6b --- /dev/null +++ b/shell/matrix/thread_relates_test.go @@ -0,0 +1,66 @@ +package matrix + +import ( + "encoding/json" + "testing" + + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +// TestThreadRelatesToStructure verifies the JSON structure produced by SetThread +// matches the Matrix spec for m.thread events. +func TestThreadRelatesToStructure(t *testing.T) { + rel := (&event.RelatesTo{}).SetThread(id.EventID("$root"), id.EventID("$last")) + + data, err := json.Marshal(rel) + if err != nil { + t.Fatalf("marshal error: %v", err) + } + + var m map[string]any + if err := json.Unmarshal(data, &m); err != nil { + t.Fatalf("unmarshal error: %v", err) + } + + if m["rel_type"] != "m.thread" { + t.Errorf("expected rel_type=m.thread, got %v", m["rel_type"]) + } + if m["event_id"] != "$root" { + t.Errorf("expected event_id=$root, got %v", m["event_id"]) + } + if m["is_falling_back"] != true { + t.Errorf("expected is_falling_back=true, got %v", m["is_falling_back"]) + } + + inReplyTo, ok := m["m.in_reply_to"].(map[string]any) + if !ok { + t.Fatal("expected m.in_reply_to to be an object") + } + if inReplyTo["event_id"] != "$last" { + t.Errorf("expected m.in_reply_to.event_id=$last, got %v", inReplyTo["event_id"]) + } +} + +// TestThreadRelatesToStructure_SameRootAndFallback verifies that when root == fallback, +// the structure is still correct. +func TestThreadRelatesToStructure_SameRootAndFallback(t *testing.T) { + rel := (&event.RelatesTo{}).SetThread(id.EventID("$root"), id.EventID("$root")) + + data, err := json.Marshal(rel) + if err != nil { + t.Fatalf("marshal error: %v", err) + } + + var m map[string]any + if err := json.Unmarshal(data, &m); err != nil { + t.Fatalf("unmarshal error: %v", err) + } + + if m["rel_type"] != "m.thread" { + t.Errorf("expected rel_type=m.thread, got %v", m["rel_type"]) + } + if m["event_id"] != "$root" { + t.Errorf("expected event_id=$root, got %v", m["event_id"]) + } +} diff --git a/shell/matrix/thread_test.go b/shell/matrix/thread_test.go new file mode 100644 index 0000000..8d6d3dd --- /dev/null +++ b/shell/matrix/thread_test.go @@ -0,0 +1,95 @@ +package matrix + +import ( + "testing" +) + +// extractThreadID is the pure logic extracted from the listener for testability. +// It mirrors the extraction in listener.go's EventMessage handler. +func extractThreadID(rawContent map[string]any) string { + relatesTo, ok := rawContent["m.relates_to"].(map[string]any) + if !ok { + return "" + } + relType, _ := relatesTo["rel_type"].(string) + if relType != "m.thread" { + return "" + } + threadRoot, _ := relatesTo["event_id"].(string) + return threadRoot +} + +func TestExtractThreadID_ThreadMessage(t *testing.T) { + raw := map[string]any{ + "body": "hello", + "m.relates_to": map[string]any{ + "rel_type": "m.thread", + "event_id": "$root123", + "is_falling_back": true, + "m.in_reply_to": map[string]any{ + "event_id": "$last456", + }, + }, + } + + got := extractThreadID(raw) + if got != "$root123" { + t.Errorf("expected $root123, got %q", got) + } +} + +func TestExtractThreadID_ReplyNotThread(t *testing.T) { + raw := map[string]any{ + "body": "hello", + "m.relates_to": map[string]any{ + "m.in_reply_to": map[string]any{ + "event_id": "$evt789", + }, + }, + } + + got := extractThreadID(raw) + if got != "" { + t.Errorf("expected empty string for non-thread reply, got %q", got) + } +} + +func TestExtractThreadID_NoRelatesTo(t *testing.T) { + raw := map[string]any{ + "body": "plain message", + } + + got := extractThreadID(raw) + if got != "" { + t.Errorf("expected empty string for plain message, got %q", got) + } +} + +func TestExtractThreadID_ReplaceRelation(t *testing.T) { + raw := map[string]any{ + "body": "edited", + "m.relates_to": map[string]any{ + "rel_type": "m.replace", + "event_id": "$original", + }, + } + + got := extractThreadID(raw) + if got != "" { + t.Errorf("expected empty string for m.replace, got %q", got) + } +} + +func TestExtractThreadID_ThreadWithoutEventID(t *testing.T) { + raw := map[string]any{ + "m.relates_to": map[string]any{ + "rel_type": "m.thread", + // missing event_id + }, + } + + got := extractThreadID(raw) + if got != "" { + t.Errorf("expected empty string for thread without event_id, got %q", got) + } +}