merge: issue/0012-threads — thread detection via encrypted event cache
Corrige el bug por el que los agentes respondia en la timeline principal en lugar del thread. Causa raiz: mautrix-go solo copia m.relates_to al payload descifrado si el evento cifrado exterior incluye EncryptedEventContent.RelatesTo, lo cual versiones antiguas de Element no hacen. Los dos mecanismos de deteccion existentes fallaban en este caso. Solucion: cache de eventos cifrados en Listener usando un sync.Map. Un listener global (OnEvent) captura m.relates_to del evento m.room.encrypted antes de que CryptoHelper lo descifre. El handler EventMessage lo consulta como tercer fallback con LoadAndDelete. Ademas: correccion del fixture Playwright startThreadOnLastMessage (usaba getRooms()[0] en lugar del room de la URL actual) y waitForThreadReplyViaSdk (acotado al room de la URL para evitar falsos positivos).
This commit is contained in:
@@ -6,6 +6,8 @@ Permitir que los agentes mantengan conversaciones en threads de Matrix (`m.threa
|
|||||||
de forma que cada interaccion con un usuario pueda vivir en un hilo separado
|
de forma que cada interaccion con un usuario pueda vivir en un hilo separado
|
||||||
en lugar de la timeline principal del room.
|
en lugar de la timeline principal del room.
|
||||||
|
|
||||||
|
Las respuestas del agente deben volver en el hilo y no en la rama principal
|
||||||
|
|
||||||
## Contexto
|
## Contexto
|
||||||
|
|
||||||
Matrix soporta threads via `m.relates_to` con `rel_type: "m.thread"`.
|
Matrix soporta threads via `m.relates_to` con `rel_type: "m.thread"`.
|
||||||
|
|||||||
@@ -301,17 +301,26 @@ export async function startThreadOnLastMessage(page: Page) {
|
|||||||
await dismissToasts(page);
|
await dismissToasts(page);
|
||||||
|
|
||||||
// Obtener el event ID del ultimo mensaje y el room ID via el SDK de Element
|
// Obtener el event ID del ultimo mensaje y el room ID via el SDK de Element
|
||||||
const threadInfo = await page.evaluate(() => {
|
const threadInfo = await page.evaluate(async () => {
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
const client = (window as any).mxMatrixClientPeg?.get?.();
|
const client = (window as any).mxMatrixClientPeg?.get?.();
|
||||||
if (!client) throw new Error("Matrix client no disponible en window");
|
if (!client) throw new Error("Matrix client no disponible en window");
|
||||||
|
|
||||||
// Obtener el room actual visible
|
// Obtener el room actual desde la URL (mas fiable que getRooms()[0])
|
||||||
|
const hash = window.location.hash; // e.g. "#/room/!xxx:server" or "#/room/#alias:server"
|
||||||
|
const match = hash.match(/#\/room\/([^?/]+)/);
|
||||||
|
if (!match) throw new Error(`No se pudo obtener room ID de la URL: ${hash}`);
|
||||||
|
const roomIdOrAlias = decodeURIComponent(match[1]);
|
||||||
|
|
||||||
|
let roomId: string;
|
||||||
|
if (roomIdOrAlias.startsWith("!")) {
|
||||||
|
roomId = roomIdOrAlias;
|
||||||
|
} else {
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
const dis = (window as any).dis;
|
const resolved = await (client as any).getRoomIdForAlias(roomIdOrAlias);
|
||||||
const roomId = client.getRooms()
|
if (!resolved?.room_id) throw new Error(`No se pudo resolver alias: ${roomIdOrAlias}`);
|
||||||
.filter((r: { getMyMembership: () => string }) => r.getMyMembership() === "join")
|
roomId = resolved.room_id;
|
||||||
.map((r: { roomId: string }) => r.roomId)[0];
|
}
|
||||||
|
|
||||||
if (!roomId) throw new Error("No hay room activo");
|
if (!roomId) throw new Error("No hay room activo");
|
||||||
|
|
||||||
@@ -526,11 +535,33 @@ export async function waitForThreadReplyViaSdk(
|
|||||||
const client = (window as any).mxMatrixClientPeg?.get?.();
|
const client = (window as any).mxMatrixClientPeg?.get?.();
|
||||||
if (!client) return null;
|
if (!client) return null;
|
||||||
|
|
||||||
|
// Scoped to current room only (via URL) to avoid false positives
|
||||||
|
const hash = window.location.hash;
|
||||||
|
const match = hash.match(/#\/room\/([^?/]+)/);
|
||||||
|
const roomIdOrAlias = match ? decodeURIComponent(match[1]) : null;
|
||||||
|
|
||||||
const rooms = client.getRooms().filter(
|
const rooms = client.getRooms().filter(
|
||||||
(r: { getMyMembership: () => string }) => r.getMyMembership() === "join"
|
(r: { getMyMembership: () => string; roomId: string }) => {
|
||||||
|
if (r.getMyMembership() !== "join") return false;
|
||||||
|
if (roomIdOrAlias) {
|
||||||
|
return r.roomId === roomIdOrAlias ||
|
||||||
|
r.roomId === roomIdOrAlias; // alias resolution handled below
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const room of rooms) {
|
for (const room of rooms) {
|
||||||
|
// Skip rooms that don't match the current URL room
|
||||||
|
if (roomIdOrAlias && !roomIdOrAlias.startsWith("!")) {
|
||||||
|
// For aliases, check if the room has this alias
|
||||||
|
const aliases = room.getAltAliases?.() || [];
|
||||||
|
const canonicalAlias = room.getCanonicalAlias?.();
|
||||||
|
if (canonicalAlias !== roomIdOrAlias && !aliases.includes(roomIdOrAlias)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const timeline = room.getLiveTimeline().getEvents();
|
const timeline = room.getLiveTimeline().getEvents();
|
||||||
// Buscar eventos que sean respuestas de thread (m.relates_to.rel_type === "m.thread")
|
// Buscar eventos que sean respuestas de thread (m.relates_to.rel_type === "m.thread")
|
||||||
const threadReplies = timeline.filter((e: {
|
const threadReplies = timeline.filter((e: {
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ type Listener struct {
|
|||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
interceptor InterceptFunc // if set and returns true, event is forwarded to orchestrator
|
interceptor InterceptFunc // if set and returns true, event is forwarded to orchestrator
|
||||||
membershipNotify MembershipNotifyFunc // if set, called on all StateMember events
|
membershipNotify MembershipNotifyFunc // if set, called on all StateMember events
|
||||||
|
encryptedRelatesTo sync.Map // id.EventID → map[string]any: cache m.relates_to from encrypted events
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewListener creates a Listener for the given client.
|
// NewListener creates a Listener for the given client.
|
||||||
@@ -66,6 +67,22 @@ func (l *Listener) SetMembershipNotify(fn MembershipNotifyFunc) {
|
|||||||
func (l *Listener) Run(ctx context.Context) error {
|
func (l *Listener) Run(ctx context.Context) error {
|
||||||
syncer := l.client.raw.Syncer.(*mautrix.DefaultSyncer)
|
syncer := l.client.raw.Syncer.(*mautrix.DefaultSyncer)
|
||||||
|
|
||||||
|
// Cache m.relates_to from encrypted events BEFORE they are decrypted.
|
||||||
|
// mautrix-go only copies m.relates_to into the decrypted content when
|
||||||
|
// EncryptedEventContent.RelatesTo != nil (i.e. the outer event has it).
|
||||||
|
// Older Element / matrix-js-sdk versions may not include it in the outer
|
||||||
|
// event, so we store it here and use it as a fallback for thread detection.
|
||||||
|
// Global listeners fire before type-specific ones, so this runs before
|
||||||
|
// CryptoHelper decrypts and dispatches the event.EventMessage.
|
||||||
|
syncer.OnEvent(func(ctx context.Context, evt *event.Event) {
|
||||||
|
if evt.Type != event.EventEncrypted {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok {
|
||||||
|
l.encryptedRelatesTo.Store(evt.ID, relatesTo)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
// Auto-join rooms when invited. Without this, the bot stays in "invited"
|
// Auto-join rooms when invited. Without this, the bot stays in "invited"
|
||||||
// state and never receives m.room.message events.
|
// state and never receives m.room.message events.
|
||||||
syncer.OnEventType(event.StateMember, func(ctx context.Context, evt *event.Event) {
|
syncer.OnEventType(event.StateMember, func(ctx context.Context, evt *event.Event) {
|
||||||
@@ -154,7 +171,10 @@ func (l *Listener) Run(ctx context.Context) error {
|
|||||||
msgCtx.EventID = evt.ID.String()
|
msgCtx.EventID = evt.ID.String()
|
||||||
|
|
||||||
// Extract thread root from m.relates_to (Matrix thread support).
|
// Extract thread root from m.relates_to (Matrix thread support).
|
||||||
// Two methods: raw map (fast) + typed content fallback (robust for E2EE).
|
// Three methods in order of preference:
|
||||||
|
// 1. Raw map from decrypted content (fast path)
|
||||||
|
// 2. Typed parsed content (robust after E2EE decryption)
|
||||||
|
// 3. Encrypted event cache (for older clients that omit outer m.relates_to)
|
||||||
if l.cfg.Threads.Enabled {
|
if l.cfg.Threads.Enabled {
|
||||||
if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok {
|
if relatesTo, ok := evt.Content.Raw["m.relates_to"].(map[string]any); ok {
|
||||||
if relType, _ := relatesTo["rel_type"].(string); relType == "m.thread" {
|
if relType, _ := relatesTo["rel_type"].(string); relType == "m.thread" {
|
||||||
@@ -163,7 +183,7 @@ func (l *Listener) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Fallback: use typed content (more robust after E2EE decryption)
|
// Fallback 2: use typed content (more robust after E2EE decryption)
|
||||||
if msgCtx.ThreadID == "" {
|
if msgCtx.ThreadID == "" {
|
||||||
if msgContent, ok := evt.Content.Parsed.(*event.MessageEventContent); ok && msgContent.RelatesTo != nil {
|
if msgContent, ok := evt.Content.Parsed.(*event.MessageEventContent); ok && msgContent.RelatesTo != nil {
|
||||||
if threadParent := msgContent.RelatesTo.GetThreadParent(); threadParent != "" {
|
if threadParent := msgContent.RelatesTo.GetThreadParent(); threadParent != "" {
|
||||||
@@ -172,6 +192,21 @@ func (l *Listener) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Fallback 3: encrypted event cache — for E2EE messages where the outer
|
||||||
|
// m.relates_to was NOT propagated into the decrypted content by mautrix-go
|
||||||
|
// (happens with older Element / matrix-js-sdk versions).
|
||||||
|
if msgCtx.ThreadID == "" {
|
||||||
|
if cached, found := l.encryptedRelatesTo.LoadAndDelete(evt.ID); found {
|
||||||
|
if rt, ok := cached.(map[string]any); ok {
|
||||||
|
if relType, _ := rt["rel_type"].(string); relType == "m.thread" {
|
||||||
|
if threadRoot, _ := rt["event_id"].(string); threadRoot != "" {
|
||||||
|
msgCtx.ThreadID = threadRoot
|
||||||
|
l.logger.Debug("thread detected via encrypted event cache", "thread_id", msgCtx.ThreadID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
l.logger.Debug("message parsed",
|
l.logger.Debug("message parsed",
|
||||||
|
|||||||
Reference in New Issue
Block a user