diff --git a/dev/issues/completed/0012-threads.md b/dev/issues/completed/0012-threads.md index 0bc10f1..36a0810 100644 --- a/dev/issues/completed/0012-threads.md +++ b/dev/issues/completed/0012-threads.md @@ -6,6 +6,8 @@ Permitir que los agentes mantengan conversaciones en threads de Matrix (`m.threa de forma que cada interaccion con un usuario pueda vivir en un hilo separado en lugar de la timeline principal del room. +Las respuestas del agente deben volver en el hilo y no en la rama principal + ## Contexto Matrix soporta threads via `m.relates_to` con `rel_type: "m.thread"`. diff --git a/e2e/fixtures/matrix-room.ts b/e2e/fixtures/matrix-room.ts index 616f223..e1a6ee4 100644 --- a/e2e/fixtures/matrix-room.ts +++ b/e2e/fixtures/matrix-room.ts @@ -301,17 +301,26 @@ export async function startThreadOnLastMessage(page: Page) { await dismissToasts(page); // Obtener el event ID del ultimo mensaje y el room ID via el SDK de Element - const threadInfo = await page.evaluate(() => { + const threadInfo = await page.evaluate(async () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any const client = (window as any).mxMatrixClientPeg?.get?.(); if (!client) throw new Error("Matrix client no disponible en window"); - // Obtener el room actual visible - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const dis = (window as any).dis; - const roomId = client.getRooms() - .filter((r: { getMyMembership: () => string }) => r.getMyMembership() === "join") - .map((r: { roomId: string }) => r.roomId)[0]; + // Obtener el room actual desde la URL (mas fiable que getRooms()[0]) + const hash = window.location.hash; // e.g. "#/room/!xxx:server" or "#/room/#alias:server" + const match = hash.match(/#\/room\/([^?/]+)/); + if (!match) throw new Error(`No se pudo obtener room ID de la URL: ${hash}`); + const roomIdOrAlias = decodeURIComponent(match[1]); + + let roomId: string; + if (roomIdOrAlias.startsWith("!")) { + roomId = roomIdOrAlias; + } else { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const resolved = await (client as any).getRoomIdForAlias(roomIdOrAlias); + if (!resolved?.room_id) throw new Error(`No se pudo resolver alias: ${roomIdOrAlias}`); + roomId = resolved.room_id; + } if (!roomId) throw new Error("No hay room activo"); @@ -526,11 +535,33 @@ export async function waitForThreadReplyViaSdk( const client = (window as any).mxMatrixClientPeg?.get?.(); if (!client) return null; + // Scoped to current room only (via URL) to avoid false positives + const hash = window.location.hash; + const match = hash.match(/#\/room\/([^?/]+)/); + const roomIdOrAlias = match ? decodeURIComponent(match[1]) : null; + const rooms = client.getRooms().filter( - (r: { getMyMembership: () => string }) => r.getMyMembership() === "join" + (r: { getMyMembership: () => string; roomId: string }) => { + if (r.getMyMembership() !== "join") return false; + if (roomIdOrAlias) { + return r.roomId === roomIdOrAlias || + r.roomId === roomIdOrAlias; // alias resolution handled below + } + return true; + } ); for (const room of rooms) { + // Skip rooms that don't match the current URL room + if (roomIdOrAlias && !roomIdOrAlias.startsWith("!")) { + // For aliases, check if the room has this alias + const aliases = room.getAltAliases?.() || []; + const canonicalAlias = room.getCanonicalAlias?.(); + if (canonicalAlias !== roomIdOrAlias && !aliases.includes(roomIdOrAlias)) { + continue; + } + } + const timeline = room.getLiveTimeline().getEvents(); // Buscar eventos que sean respuestas de thread (m.relates_to.rel_type === "m.thread") const threadReplies = timeline.filter((e: { diff --git a/shell/matrix/listener.go b/shell/matrix/listener.go index f6b4fd2..3f9657f 100644 --- a/shell/matrix/listener.go +++ b/shell/matrix/listener.go @@ -29,14 +29,15 @@ type MembershipNotifyFunc func(ctx context.Context, roomID, userID, membership s // Listener attaches to a mautrix syncer and dispatches events to an EventHandler. type Listener struct { - client *Client - cfg config.MatrixCfg - handler EventHandler - logger *slog.Logger - dmCache map[id.RoomID]bool - mu sync.RWMutex - interceptor InterceptFunc // if set and returns true, event is forwarded to orchestrator - membershipNotify MembershipNotifyFunc // if set, called on all StateMember events + client *Client + cfg config.MatrixCfg + handler EventHandler + logger *slog.Logger + dmCache map[id.RoomID]bool + mu sync.RWMutex + interceptor InterceptFunc // if set and returns true, event is forwarded to orchestrator + membershipNotify MembershipNotifyFunc // if set, called on all StateMember events + encryptedRelatesTo sync.Map // id.EventID → map[string]any: cache m.relates_to from encrypted events } // NewListener creates a Listener for the given client. @@ -66,6 +67,22 @@ func (l *Listener) SetMembershipNotify(fn MembershipNotifyFunc) { func (l *Listener) Run(ctx context.Context) error { syncer := l.client.raw.Syncer.(*mautrix.DefaultSyncer) + // Cache m.relates_to from encrypted events BEFORE they are decrypted. + // mautrix-go only copies m.relates_to into the decrypted content when + // EncryptedEventContent.RelatesTo != nil (i.e. the outer event has it). + // Older Element / matrix-js-sdk versions may not include it in the outer + // event, so we store it here and use it as a fallback for thread detection. + // Global listeners fire before type-specific ones, so this runs before + // CryptoHelper decrypts and dispatches the event.EventMessage. + syncer.OnEvent(func(ctx context.Context, evt *event.Event) { + if evt.Type != event.EventEncrypted { + return + } + if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok { + l.encryptedRelatesTo.Store(evt.ID, relatesTo) + } + }) + // Auto-join rooms when invited. Without this, the bot stays in "invited" // state and never receives m.room.message events. syncer.OnEventType(event.StateMember, func(ctx context.Context, evt *event.Event) { @@ -154,7 +171,10 @@ func (l *Listener) Run(ctx context.Context) error { msgCtx.EventID = evt.ID.String() // Extract thread root from m.relates_to (Matrix thread support). - // Two methods: raw map (fast) + typed content fallback (robust for E2EE). + // Three methods in order of preference: + // 1. Raw map from decrypted content (fast path) + // 2. Typed parsed content (robust after E2EE decryption) + // 3. Encrypted event cache (for older clients that omit outer m.relates_to) if l.cfg.Threads.Enabled { if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok { if relType, _ := relatesTo["rel_type"].(string); relType == "m.thread" { @@ -163,7 +183,7 @@ func (l *Listener) Run(ctx context.Context) error { } } } - // Fallback: use typed content (more robust after E2EE decryption) + // Fallback 2: use typed content (more robust after E2EE decryption) if msgCtx.ThreadID == "" { if msgContent, ok := evt.Content.Parsed.(*event.MessageEventContent); ok && msgContent.RelatesTo != nil { if threadParent := msgContent.RelatesTo.GetThreadParent(); threadParent != "" { @@ -172,6 +192,21 @@ func (l *Listener) Run(ctx context.Context) error { } } } + // Fallback 3: encrypted event cache — for E2EE messages where the outer + // m.relates_to was NOT propagated into the decrypted content by mautrix-go + // (happens with older Element / matrix-js-sdk versions). + if msgCtx.ThreadID == "" { + if cached, found := l.encryptedRelatesTo.LoadAndDelete(evt.ID); found { + if rt, ok := cached.(map[string]any); ok { + if relType, _ := rt["rel_type"].(string); relType == "m.thread" { + if threadRoot, _ := rt["event_id"].(string); threadRoot != "" { + msgCtx.ThreadID = threadRoot + l.logger.Debug("thread detected via encrypted event cache", "thread_id", msgCtx.ThreadID) + } + } + } + } + } } l.logger.Debug("message parsed",