feat(uniweb): seed each room from history on open, deduped vs live

When a room is opened, load its stored history and keep it live so reloading no
longer loses the conversation.

- bus.subscribeRoom (used by ChatPanel) now runs subscribeRoomWithHistory: it
  subscribes live immediately but buffers live messages until the history batch
  (oldest -> newest) is delivered, guaranteeing history-first order regardless of
  timing; both halves are deduplicated by frame id via a per-room Set. If the
  history endpoint is absent (404/500), it falls back to live-only as before.
- toMessage maps an opened frame to the UI Message using ulidTime(msgID) for ts
  (not arrival time), so history and live share one clock and sort correctly;
  ChatPanel keeps its list ordered by ts.
- Sidebar previews: loadRooms seeds each room's last message/time from
  history(id, 1) in the background, without blocking the render and without
  overwriting a newer live message; empty rooms keep the "—" placeholder.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-14 19:40:08 +02:00
parent c142b3a025
commit 893df42d29
2 changed files with 110 additions and 24 deletions
+8 -4
View File
@@ -69,16 +69,20 @@ export function ChatPanel({ room }: { room: Room | undefined }) {
const [sendError, setSendError] = useState<string | null>(null);
const viewport = useRef<HTMLDivElement>(null);
// Abre el stream SSE de la room activa. El gateway entrega historia (rooms
// persistidas) y luego mensajes en vivo, ya descifrados. Dedup por id porque
// un re-render no debe duplicar y el eco del propio envío llega por aquí.
// Carga el histórico de la room activa y luego sigue en vivo: bus.subscribeRoom
// entrega primero la historia (oldest->newest) y después los mensajes en vivo, ya
// descifrados y deduplicados por id. Aquí se mantiene la lista ordenada por ts y se
// deduplica de nuevo por id, porque un re-render no debe duplicar y el eco del propio
// envío también llega por esta vía.
useEffect(() => {
setMessages([]);
setSendError(null);
if (!room) return;
const close = bus.subscribeRoom(room.id, (m) => {
setMessages((prev) =>
prev.some((p) => p.id === m.id) ? prev : [...prev, m],
prev.some((p) => p.id === m.id)
? prev
: [...prev, m].sort((a, b) => a.ts - b.ts),
);
});
return close;
+102 -20
View File
@@ -17,6 +17,7 @@ import {
WsNatsTransport,
hexToBytes,
endpointID,
ulidTime,
type Identity,
type Frame,
ModeMatrix,
@@ -119,12 +120,14 @@ function require_(): Session {
// ---- room store (sidebar metadata) -----------------------------------------
//
// The sidebar needs each room's last message and time, plus an unread count for
// rooms the user is NOT currently viewing. There is no message history on the wire
// (NATS delivers live only), so the only way to know a room's latest message is to
// stay subscribed to every room while the app is open. This store owns that: it holds
// the room list, subscribes to each room for metadata, and notifies React watchers on
// every change. ChatPanel keeps its own subscription for the open conversation; this
// store's per-room subscription is independent and only updates sidebar metadata.
// rooms the user is NOT currently viewing. NATS delivers live only, so a live metadata
// subscription per room keeps the sidebar current while the app is open; on first load
// (or after a reload) the control plane's history endpoint seeds each room's last
// message so a room with no live traffic yet still shows its real latest line instead
// of "—". This store owns that: it holds the room list, subscribes to each room for
// metadata, seeds the preview from history, and notifies React watchers on every
// change. ChatPanel keeps its own subscription for the open conversation; this store's
// per-room subscription is independent and only updates sidebar metadata.
let roomList: Room[] = [];
let activeRoomID = "";
@@ -166,6 +169,30 @@ function trackRoomMeta(roomID: string): void {
metaSubs.set(roomID, unsub);
}
// seedRoomPreviews fills each room's sidebar preview (last message + time) from the
// control plane's history, best-effort and in the background: the room list renders
// immediately, then each preview updates as its single most-recent stored message
// arrives. It never overwrites a live message that is already newer, and a room with
// genuinely no history keeps the "—" placeholder (lastTs 0). Errors (missing endpoint,
// transient) are swallowed per room so one failure never blocks the others.
function seedRoomPreviews(s: Session): void {
for (const r of roomList) {
s.client
.history(r.id, 1)
.then((items) => {
if (!items.length) return;
const last = items[items.length - 1];
const m = toMessage(s, last.frame, last.plaintext);
const room = roomList.find((x) => x.id === r.id);
if (!room || m.ts < room.lastTs) return; // a newer live message already won
room.lastTs = m.ts;
room.lastMessage = previewText(m);
notifyRooms();
})
.catch(() => {});
}
}
function untrackAllRooms(): void {
for (const unsub of metaSubs.values()) {
try {
@@ -193,9 +220,24 @@ function resetRoomStore(): void {
notifyRooms();
}
// subscribeRoomInternal is the shared core behind bus.subscribeRoom and the store's
// per-room metadata subscription: it decodes each frame into a UI Message and hands it
// to onMessage. Returns a function that cancels the subscription.
// toMessage maps an opened bus frame to the UI's Message. The timestamp comes from the
// frame's ULID id (ulidTime), NOT the arrival time: a frame carries no explicit ts on
// the wire, and deriving it from the id puts live and replayed-history messages on the
// same clock so they sort into one correct order.
function toMessage(s: Session, f: Frame, plaintext: Uint8Array): Message {
return {
id: f.msgID,
sender: f.sender,
body: new TextDecoder().decode(plaintext),
ts: ulidTime(f.msgID),
mine: f.sender === s.endpoint,
};
}
// subscribeRoomInternal is the live-only core behind the store's per-room metadata
// subscription (and the live half of subscribeRoomWithHistory): it decodes each frame
// into a UI Message and hands it to onMessage. Returns a function that cancels the
// subscription.
function subscribeRoomInternal(
roomID: string,
onMessage: (m: Message) => void,
@@ -205,13 +247,7 @@ function subscribeRoomInternal(
let closed = false;
s.client
.subscribe(roomID, (f: Frame, plaintext: Uint8Array) => {
onMessage({
id: f.msgID,
sender: f.sender,
body: new TextDecoder().decode(plaintext),
ts: Date.now(),
mine: f.sender === s.endpoint,
});
onMessage(toMessage(s, f, plaintext));
})
.then((sub) => {
if (closed) void sub.unsubscribe();
@@ -224,6 +260,49 @@ function subscribeRoomInternal(
};
}
// subscribeRoomWithHistory is what ChatPanel opens for the conversation it is viewing:
// it seeds the room with its stored history (so a reload no longer loses the messages)
// and then keeps it live. History and live are deduplicated by frame id through a
// per-room `seen` set — a message can arrive both ways when it lands between the fetch
// and the subscription. To guarantee history shows first (oldest -> newest) regardless
// of timing, live messages are buffered until the history batch has been delivered,
// then flushed. If history fails or the endpoint is absent (404/500 on an older
// cluster), it is treated as empty and the room runs live-only, exactly as before.
function subscribeRoomWithHistory(
roomID: string,
onMessage: (m: Message) => void,
): () => void {
const s = require_();
const seen = new Set<string>();
let historyDone = false;
let pending: Message[] = [];
const deliver = (m: Message): void => {
if (seen.has(m.id)) return;
seen.add(m.id);
onMessage(m);
};
// Live is subscribed immediately so nothing published during the history fetch is
// missed; messages are buffered until the history batch lands, then delivered.
const liveUnsub = subscribeRoomInternal(roomID, (m) => {
if (historyDone) deliver(m);
else pending.push(m);
});
s.client
.history(roomID)
.then((items) => {
for (const { frame, plaintext } of items) deliver(toMessage(s, frame, plaintext));
})
.catch(() => {
// No history endpoint yet, or a transient failure: fall back to live-only.
})
.finally(() => {
historyDone = true;
for (const m of pending) deliver(m);
pending = [];
});
return liveUnsub;
}
// connectSession opens the live bus connection (control plane + nats.ws data plane)
// for a wallet identity, WITHOUT touching persistence. The private key is used here
// in the browser and never leaves it.
@@ -327,6 +406,7 @@ export const bus = {
}));
for (const r of roomList) trackRoomMeta(r.id);
notifyRooms();
seedRoomPreviews(s); // fill each preview from history without blocking the render
},
// setActiveRoom marks the room the user is viewing: its unread count is cleared and
@@ -372,11 +452,13 @@ export const bus = {
touchSession(); // user activity: restart the idle auto-lock window
},
// subscribeRoom delivers decrypted, verified messages for a room (replaces the old
// SSE streamRoom). Returns an unsubscribe function. ChatPanel uses this for the open
// conversation; the sidebar metadata uses the same core (subscribeRoomInternal).
// subscribeRoom delivers a room's stored history followed by its live messages, both
// decrypted, verified and deduplicated by id (replaces the old SSE streamRoom).
// Returns an unsubscribe function. ChatPanel uses this for the open conversation, so
// reloading the page no longer loses the conversation; the sidebar metadata uses the
// live-only core (subscribeRoomInternal) and seeds its preview from history separately.
subscribeRoom(roomID: string, onMessage: (m: Message) => void): () => void {
return subscribeRoomInternal(roomID, onMessage);
return subscribeRoomWithHistory(roomID, onMessage);
},
};