From 893df42d29f1db6017818b8d1f16d57f0a408330 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 14 Jun 2026 19:40:08 +0200 Subject: [PATCH] feat(uniweb): seed each room from history on open, deduped vs live MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a room is opened, load its stored history and keep it live so reloading no longer loses the conversation. - bus.subscribeRoom (used by ChatPanel) now runs subscribeRoomWithHistory: it subscribes live immediately but buffers live messages until the history batch (oldest -> newest) is delivered, guaranteeing history-first order regardless of timing; both halves are deduplicated by frame id via a per-room Set. If the history endpoint is absent (404/500), it falls back to live-only as before. - toMessage maps an opened frame to the UI Message using ulidTime(msgID) for ts (not arrival time), so history and live share one clock and sort correctly; ChatPanel keeps its list ordered by ts. - Sidebar previews: loadRooms seeds each room's last message/time from history(id, 1) in the background, without blocking the render and without overwriting a newer live message; empty rooms keep the "—" placeholder. Co-Authored-By: Claude Opus 4.8 (1M context) --- web/src/ChatPanel.tsx | 12 +++-- web/src/busService.ts | 122 +++++++++++++++++++++++++++++++++++------- 2 files changed, 110 insertions(+), 24 deletions(-) diff --git a/web/src/ChatPanel.tsx b/web/src/ChatPanel.tsx index a75a447..d30fefc 100644 --- a/web/src/ChatPanel.tsx +++ b/web/src/ChatPanel.tsx @@ -69,16 +69,20 @@ export function ChatPanel({ room }: { room: Room | undefined }) { const [sendError, setSendError] = useState(null); const viewport = useRef(null); - // Abre el stream SSE de la room activa. El gateway entrega historia (rooms - // persistidas) y luego mensajes en vivo, ya descifrados. Dedup por id porque - // un re-render no debe duplicar y el eco del propio envío llega por aquí. + // Carga el histórico de la room activa y luego sigue en vivo: bus.subscribeRoom + // entrega primero la historia (oldest->newest) y después los mensajes en vivo, ya + // descifrados y deduplicados por id. Aquí se mantiene la lista ordenada por ts y se + // deduplica de nuevo por id, porque un re-render no debe duplicar y el eco del propio + // envío también llega por esta vía. useEffect(() => { setMessages([]); setSendError(null); if (!room) return; const close = bus.subscribeRoom(room.id, (m) => { setMessages((prev) => - prev.some((p) => p.id === m.id) ? prev : [...prev, m], + prev.some((p) => p.id === m.id) + ? prev + : [...prev, m].sort((a, b) => a.ts - b.ts), ); }); return close; diff --git a/web/src/busService.ts b/web/src/busService.ts index 94640d4..20f37d4 100644 --- a/web/src/busService.ts +++ b/web/src/busService.ts @@ -17,6 +17,7 @@ import { WsNatsTransport, hexToBytes, endpointID, + ulidTime, type Identity, type Frame, ModeMatrix, @@ -119,12 +120,14 @@ function require_(): Session { // ---- room store (sidebar metadata) ----------------------------------------- // // The sidebar needs each room's last message and time, plus an unread count for -// rooms the user is NOT currently viewing. There is no message history on the wire -// (NATS delivers live only), so the only way to know a room's latest message is to -// stay subscribed to every room while the app is open. This store owns that: it holds -// the room list, subscribes to each room for metadata, and notifies React watchers on -// every change. ChatPanel keeps its own subscription for the open conversation; this -// store's per-room subscription is independent and only updates sidebar metadata. +// rooms the user is NOT currently viewing. NATS delivers live only, so a live metadata +// subscription per room keeps the sidebar current while the app is open; on first load +// (or after a reload) the control plane's history endpoint seeds each room's last +// message so a room with no live traffic yet still shows its real latest line instead +// of "—". This store owns that: it holds the room list, subscribes to each room for +// metadata, seeds the preview from history, and notifies React watchers on every +// change. ChatPanel keeps its own subscription for the open conversation; this store's +// per-room subscription is independent and only updates sidebar metadata. let roomList: Room[] = []; let activeRoomID = ""; @@ -166,6 +169,30 @@ function trackRoomMeta(roomID: string): void { metaSubs.set(roomID, unsub); } +// seedRoomPreviews fills each room's sidebar preview (last message + time) from the +// control plane's history, best-effort and in the background: the room list renders +// immediately, then each preview updates as its single most-recent stored message +// arrives. It never overwrites a live message that is already newer, and a room with +// genuinely no history keeps the "—" placeholder (lastTs 0). Errors (missing endpoint, +// transient) are swallowed per room so one failure never blocks the others. +function seedRoomPreviews(s: Session): void { + for (const r of roomList) { + s.client + .history(r.id, 1) + .then((items) => { + if (!items.length) return; + const last = items[items.length - 1]; + const m = toMessage(s, last.frame, last.plaintext); + const room = roomList.find((x) => x.id === r.id); + if (!room || m.ts < room.lastTs) return; // a newer live message already won + room.lastTs = m.ts; + room.lastMessage = previewText(m); + notifyRooms(); + }) + .catch(() => {}); + } +} + function untrackAllRooms(): void { for (const unsub of metaSubs.values()) { try { @@ -193,9 +220,24 @@ function resetRoomStore(): void { notifyRooms(); } -// subscribeRoomInternal is the shared core behind bus.subscribeRoom and the store's -// per-room metadata subscription: it decodes each frame into a UI Message and hands it -// to onMessage. Returns a function that cancels the subscription. +// toMessage maps an opened bus frame to the UI's Message. The timestamp comes from the +// frame's ULID id (ulidTime), NOT the arrival time: a frame carries no explicit ts on +// the wire, and deriving it from the id puts live and replayed-history messages on the +// same clock so they sort into one correct order. +function toMessage(s: Session, f: Frame, plaintext: Uint8Array): Message { + return { + id: f.msgID, + sender: f.sender, + body: new TextDecoder().decode(plaintext), + ts: ulidTime(f.msgID), + mine: f.sender === s.endpoint, + }; +} + +// subscribeRoomInternal is the live-only core behind the store's per-room metadata +// subscription (and the live half of subscribeRoomWithHistory): it decodes each frame +// into a UI Message and hands it to onMessage. Returns a function that cancels the +// subscription. function subscribeRoomInternal( roomID: string, onMessage: (m: Message) => void, @@ -205,13 +247,7 @@ function subscribeRoomInternal( let closed = false; s.client .subscribe(roomID, (f: Frame, plaintext: Uint8Array) => { - onMessage({ - id: f.msgID, - sender: f.sender, - body: new TextDecoder().decode(plaintext), - ts: Date.now(), - mine: f.sender === s.endpoint, - }); + onMessage(toMessage(s, f, plaintext)); }) .then((sub) => { if (closed) void sub.unsubscribe(); @@ -224,6 +260,49 @@ function subscribeRoomInternal( }; } +// subscribeRoomWithHistory is what ChatPanel opens for the conversation it is viewing: +// it seeds the room with its stored history (so a reload no longer loses the messages) +// and then keeps it live. History and live are deduplicated by frame id through a +// per-room `seen` set — a message can arrive both ways when it lands between the fetch +// and the subscription. To guarantee history shows first (oldest -> newest) regardless +// of timing, live messages are buffered until the history batch has been delivered, +// then flushed. If history fails or the endpoint is absent (404/500 on an older +// cluster), it is treated as empty and the room runs live-only, exactly as before. +function subscribeRoomWithHistory( + roomID: string, + onMessage: (m: Message) => void, +): () => void { + const s = require_(); + const seen = new Set(); + let historyDone = false; + let pending: Message[] = []; + const deliver = (m: Message): void => { + if (seen.has(m.id)) return; + seen.add(m.id); + onMessage(m); + }; + // Live is subscribed immediately so nothing published during the history fetch is + // missed; messages are buffered until the history batch lands, then delivered. + const liveUnsub = subscribeRoomInternal(roomID, (m) => { + if (historyDone) deliver(m); + else pending.push(m); + }); + s.client + .history(roomID) + .then((items) => { + for (const { frame, plaintext } of items) deliver(toMessage(s, frame, plaintext)); + }) + .catch(() => { + // No history endpoint yet, or a transient failure: fall back to live-only. + }) + .finally(() => { + historyDone = true; + for (const m of pending) deliver(m); + pending = []; + }); + return liveUnsub; +} + // connectSession opens the live bus connection (control plane + nats.ws data plane) // for a wallet identity, WITHOUT touching persistence. The private key is used here // in the browser and never leaves it. @@ -327,6 +406,7 @@ export const bus = { })); for (const r of roomList) trackRoomMeta(r.id); notifyRooms(); + seedRoomPreviews(s); // fill each preview from history without blocking the render }, // setActiveRoom marks the room the user is viewing: its unread count is cleared and @@ -372,11 +452,13 @@ export const bus = { touchSession(); // user activity: restart the idle auto-lock window }, - // subscribeRoom delivers decrypted, verified messages for a room (replaces the old - // SSE streamRoom). Returns an unsubscribe function. ChatPanel uses this for the open - // conversation; the sidebar metadata uses the same core (subscribeRoomInternal). + // subscribeRoom delivers a room's stored history followed by its live messages, both + // decrypted, verified and deduplicated by id (replaces the old SSE streamRoom). + // Returns an unsubscribe function. ChatPanel uses this for the open conversation, so + // reloading the page no longer loses the conversation; the sidebar metadata uses the + // live-only core (subscribeRoomInternal) and seeds its preview from history separately. subscribeRoom(roomID: string, onMessage: (m: Message) => void): () => void { - return subscribeRoomInternal(roomID, onMessage); + return subscribeRoomWithHistory(roomID, onMessage); }, };