diff --git a/app.md b/app.md index ff86143..8fbcc5f 100644 --- a/app.md +++ b/app.md @@ -2,7 +2,7 @@ name: uniweb lang: ts domain: infra -version: 0.5.0 +version: 0.6.0 description: "Cliente web browser-nativo del bus unibus: SPA de chat (React+Mantine) con wallet por usuario (BIP39) que habla DIRECTO al bus (nats.ws + control-plane HTTPS firmado), sin gateway. La clave privada nunca sale del navegador." tags: [messaging, web, frontend, e2e] uses_functions: [] @@ -102,6 +102,19 @@ programáticos) ve a `unibus`; `uniweb` es el cliente web encima. ## Capability growth log +- v0.6.0 (2026-06-14) — carga el histórico de cada room (`GET /api/rooms/{id}/history`) al + abrirla, con dedup vs live; recargar ya no pierde los mensajes. `ControlPlane.fetchHistory` + pega al control-plane (firmado, mismas cabeceras `X-Unibus-*`) y decodifica cada frame de + base64-std; `BusClient.history` lo descifra/verifica con el MISMO camino de envelope que + `subscribe` (refactor: helper privado `openFrame` compartido por ambos). En `busService`, + `bus.subscribeRoom` (que usa `ChatPanel`) ahora siembra la room con su historia y sigue en + vivo: dedup por `frame.id` con un `Set` por room y los mensajes live se bufferean hasta que + la historia (oldest->newest) se entrega, garantizando el orden; si el endpoint falta + (404/500) cae a live-only como antes. El ts de cada mensaje se deriva del ULID `msgID` + (`ulidTime`, inverso de `newULID`) para que historia y live compartan reloj y ordenen bien; + `ChatPanel` ordena por ts. El sidebar siembra su preview con `history(id, 1)` (sin traer + todo), manteniendo el fallback "—" para rooms vacías. `tsc` + 23 unit (incluye `ulid.test.ts`) + + `pnpm build` verdes. - v0.5.0 (2026-06-14) — nombres legibles en mensajes + sidebar con último mensaje/hora reales + `pnpm dev` documentado. (1) Los mensajes muestran el **handle** del remitente en vez del endpoint id: `ControlPlane.fetchDirectory()` pega al control-plane diff --git a/web/src/ChatPanel.tsx b/web/src/ChatPanel.tsx index a75a447..d30fefc 100644 --- a/web/src/ChatPanel.tsx +++ b/web/src/ChatPanel.tsx @@ -69,16 +69,20 @@ export function ChatPanel({ room }: { room: Room | undefined }) { const [sendError, setSendError] = useState(null); const viewport = useRef(null); - // Abre el stream SSE de la room activa. El gateway entrega historia (rooms - // persistidas) y luego mensajes en vivo, ya descifrados. Dedup por id porque - // un re-render no debe duplicar y el eco del propio envío llega por aquí. + // Carga el histórico de la room activa y luego sigue en vivo: bus.subscribeRoom + // entrega primero la historia (oldest->newest) y después los mensajes en vivo, ya + // descifrados y deduplicados por id. Aquí se mantiene la lista ordenada por ts y se + // deduplica de nuevo por id, porque un re-render no debe duplicar y el eco del propio + // envío también llega por esta vía. useEffect(() => { setMessages([]); setSendError(null); if (!room) return; const close = bus.subscribeRoom(room.id, (m) => { setMessages((prev) => - prev.some((p) => p.id === m.id) ? prev : [...prev, m], + prev.some((p) => p.id === m.id) + ? prev + : [...prev, m].sort((a, b) => a.ts - b.ts), ); }); return close; diff --git a/web/src/bus/client.ts b/web/src/bus/client.ts index e7df65a..8764e20 100644 --- a/web/src/bus/client.ts +++ b/web/src/bus/client.ts @@ -60,6 +60,22 @@ export function newULID(nowMs: number = Date.now()): string { return ts + r; } +// ulidTime decodes the millisecond epoch timestamp a ULID encodes in its first 10 +// Crockford base32 characters (the inverse of newULID's time prefix). A frame carries +// no explicit timestamp on the wire — its ULID id IS the timestamp — so the UI derives +// a message's time from it, which keeps live and replayed-history messages on the same +// clock (the sender's send time, not the receiver's arrival time). Returns 0 for an id +// whose prefix is not valid Crockford base32, so a malformed id never blows up the UI. +export function ulidTime(id: string): number { + let t = 0; + for (let i = 0; i < 10 && i < id.length; i++) { + const v = CROCKFORD.indexOf(id[i].toUpperCase()); + if (v < 0) return 0; + t = t * 32 + v; + } + return t; +} + // --- room envelope (pure, the security-critical core) ------------------------ export interface SealOptions { @@ -156,6 +172,14 @@ interface RoomKeyResponse { epoch: number; } +// HistoryResp is GET /rooms/{id}/history?limit=N: a room's replayed frames, oldest -> +// newest, each base64-standard encoded. Every entry is one marshaled wire frame — the +// exact bytes the live subscription delivers — so the caller opens them with the same +// envelope path as a live message. A room with no stored history yields an empty list. +interface HistoryResp { + messages: string[]; +} + // PolicyWire is the control-plane JSON shape of a policy (snake_case sign_msgs). interface PolicyWire { encrypt: boolean; @@ -331,6 +355,17 @@ export class ControlPlane { for (const member of members) m.set(member.endpoint, base64ToBytesLocal(member.sign_pub)); return m; } + + // fetchHistory replays a room's stored frames (GET /rooms/{id}/history?limit=N), + // returning up to N marshaled wire frames oldest -> newest. The server base64-standard + // encodes each frame; this decodes them back to the raw bytes the live subscription + // delivers, so BusClient.history can open each with the same envelope path as + // subscribe. The caller tolerates this endpoint being absent on older clusters + // (404/500): the error surfaces and BusClient.history's caller falls back to live-only. + async fetchHistory(roomID: string, limit = 200): Promise { + const resp = await this.request("GET", `/rooms/${roomID}/history?limit=${limit}`); + return (resp.messages ?? []).map((b64) => base64ToBytesLocal(b64)); + } } // base64ToBytesLocal decodes standard base64 (kept local to avoid widening crypto's @@ -392,20 +427,55 @@ export class BusClient { await this.transport.publish(room.subject, marshal(f)); } + // openFrame is the shared envelope-opening core behind subscribe (live) and history + // (replay): it unmarshals one wire frame, resolves the sender's signing key (from the + // sign cache, populated by loadSigners for signed rooms) and the room key for the + // frame's epoch, then verifies + decrypts via openRoomMessage. Returns null when the + // frame fails verification or decryption, so both callers drop it the same way. + private async openFrame( + roomID: string, + policy: Policy, + bytes: Uint8Array, + ): Promise<{ frame: Frame; plaintext: Uint8Array } | null> { + const frame = unmarshal(bytes); + const signerPub = policy.signMsgs ? this.signCache.get(roomID)?.get(frame.sender) : undefined; + const roomKey = policy.encrypt ? await this.roomKey(roomID, frame.epoch) : undefined; + const plaintext = openRoomMessage(frame, policy, signerPub, roomKey); + return plaintext ? { frame, plaintext } : null; + } + // subscribe delivers decoded, verified, decrypted messages for a room. Messages // that fail signature verification or decryption are dropped silently. async subscribe(roomID: string, handler: (f: Frame, plaintext: Uint8Array) => void): Promise { const room = await this.control.fetchRoom(roomID); if (room.policy.signMsgs) await this.loadSigners(roomID); return this.transport.subscribe(room.subject, async (_subject, data) => { - const f = unmarshal(data); - const signerPub = room.policy.signMsgs ? this.signCache.get(roomID)?.get(f.sender) : undefined; - const roomKey = room.policy.encrypt ? await this.roomKey(roomID, f.epoch) : undefined; - const plaintext = openRoomMessage(f, room.policy, signerPub, roomKey); - if (plaintext) handler(f, plaintext); + const opened = await this.openFrame(roomID, room.policy, data); + if (opened) handler(opened.frame, opened.plaintext); }); } + // history replays a room's stored messages, decrypted and verified exactly like + // subscribe (NATS delivers live only, so without this a reload shows nothing until + // new traffic arrives). It resolves the room policy, loads the signer keys for a + // signed room, fetches the marshaled frames from the control plane, and opens each + // with the same openFrame path. Frames that fail verification/decryption are dropped. + // Returns the opened messages in the server's order (oldest -> newest). + async history( + roomID: string, + limit = 200, + ): Promise> { + const room = await this.control.fetchRoom(roomID); + if (room.policy.signMsgs) await this.loadSigners(roomID); + const frames = await this.control.fetchHistory(roomID, limit); + const out: Array<{ frame: Frame; plaintext: Uint8Array }> = []; + for (const bytes of frames) { + const opened = await this.openFrame(roomID, room.policy, bytes); + if (opened) out.push(opened); + } + return out; + } + private async loadSigners(roomID: string): Promise { this.signCache.set(roomID, await this.control.signerKeys(roomID)); } diff --git a/web/src/bus/ulid.test.ts b/web/src/bus/ulid.test.ts new file mode 100644 index 0000000..d736725 --- /dev/null +++ b/web/src/bus/ulid.test.ts @@ -0,0 +1,34 @@ +// Tests for ulidTime, the decoder of the millisecond timestamp a ULID encodes in its +// first 10 Crockford base32 characters. A wire frame carries no explicit timestamp — +// its ULID id IS the timestamp — so the UI derives a message's time (and thus its sort +// order, live and replayed-history alike) from this function. These tests pin that it +// is the exact inverse of newULID's time prefix and that it is time-ordered. + +import { describe, it, expect } from "vitest"; +import { newULID, ulidTime } from "./client.js"; + +describe("ulidTime", () => { + it("round-trips the millisecond timestamp newULID encodes", () => { + for (const ms of [0, 1, 1_000, 1_700_000_000_000, 2_000_000_000_000, Date.now()]) { + expect(ulidTime(newULID(ms))).toBe(ms); + } + }); + + it("is monotonic: a later message decodes to a larger time", () => { + const earlier = newULID(1_700_000_000_000); + const later = newULID(1_700_000_001_000); + expect(ulidTime(earlier)).toBeLessThan(ulidTime(later)); + }); + + it("ignores the 16-char random suffix (only the 10-char time prefix matters)", () => { + const ms = 1_736_000_000_000; + // Two ULIDs minted at the same ms differ only in their random tail, yet decode equal. + expect(ulidTime(newULID(ms))).toBe(ms); + expect(ulidTime(newULID(ms))).toBe(ms); + }); + + it("returns 0 for an id whose prefix is not valid Crockford base32", () => { + expect(ulidTime("!!!!!!!!!!xxxxxxxxxxxxxxxx")).toBe(0); + expect(ulidTime("")).toBe(0); + }); +}); diff --git a/web/src/busService.ts b/web/src/busService.ts index 94640d4..20f37d4 100644 --- a/web/src/busService.ts +++ b/web/src/busService.ts @@ -17,6 +17,7 @@ import { WsNatsTransport, hexToBytes, endpointID, + ulidTime, type Identity, type Frame, ModeMatrix, @@ -119,12 +120,14 @@ function require_(): Session { // ---- room store (sidebar metadata) ----------------------------------------- // // The sidebar needs each room's last message and time, plus an unread count for -// rooms the user is NOT currently viewing. There is no message history on the wire -// (NATS delivers live only), so the only way to know a room's latest message is to -// stay subscribed to every room while the app is open. This store owns that: it holds -// the room list, subscribes to each room for metadata, and notifies React watchers on -// every change. ChatPanel keeps its own subscription for the open conversation; this -// store's per-room subscription is independent and only updates sidebar metadata. +// rooms the user is NOT currently viewing. NATS delivers live only, so a live metadata +// subscription per room keeps the sidebar current while the app is open; on first load +// (or after a reload) the control plane's history endpoint seeds each room's last +// message so a room with no live traffic yet still shows its real latest line instead +// of "—". This store owns that: it holds the room list, subscribes to each room for +// metadata, seeds the preview from history, and notifies React watchers on every +// change. ChatPanel keeps its own subscription for the open conversation; this store's +// per-room subscription is independent and only updates sidebar metadata. let roomList: Room[] = []; let activeRoomID = ""; @@ -166,6 +169,30 @@ function trackRoomMeta(roomID: string): void { metaSubs.set(roomID, unsub); } +// seedRoomPreviews fills each room's sidebar preview (last message + time) from the +// control plane's history, best-effort and in the background: the room list renders +// immediately, then each preview updates as its single most-recent stored message +// arrives. It never overwrites a live message that is already newer, and a room with +// genuinely no history keeps the "—" placeholder (lastTs 0). Errors (missing endpoint, +// transient) are swallowed per room so one failure never blocks the others. +function seedRoomPreviews(s: Session): void { + for (const r of roomList) { + s.client + .history(r.id, 1) + .then((items) => { + if (!items.length) return; + const last = items[items.length - 1]; + const m = toMessage(s, last.frame, last.plaintext); + const room = roomList.find((x) => x.id === r.id); + if (!room || m.ts < room.lastTs) return; // a newer live message already won + room.lastTs = m.ts; + room.lastMessage = previewText(m); + notifyRooms(); + }) + .catch(() => {}); + } +} + function untrackAllRooms(): void { for (const unsub of metaSubs.values()) { try { @@ -193,9 +220,24 @@ function resetRoomStore(): void { notifyRooms(); } -// subscribeRoomInternal is the shared core behind bus.subscribeRoom and the store's -// per-room metadata subscription: it decodes each frame into a UI Message and hands it -// to onMessage. Returns a function that cancels the subscription. +// toMessage maps an opened bus frame to the UI's Message. The timestamp comes from the +// frame's ULID id (ulidTime), NOT the arrival time: a frame carries no explicit ts on +// the wire, and deriving it from the id puts live and replayed-history messages on the +// same clock so they sort into one correct order. +function toMessage(s: Session, f: Frame, plaintext: Uint8Array): Message { + return { + id: f.msgID, + sender: f.sender, + body: new TextDecoder().decode(plaintext), + ts: ulidTime(f.msgID), + mine: f.sender === s.endpoint, + }; +} + +// subscribeRoomInternal is the live-only core behind the store's per-room metadata +// subscription (and the live half of subscribeRoomWithHistory): it decodes each frame +// into a UI Message and hands it to onMessage. Returns a function that cancels the +// subscription. function subscribeRoomInternal( roomID: string, onMessage: (m: Message) => void, @@ -205,13 +247,7 @@ function subscribeRoomInternal( let closed = false; s.client .subscribe(roomID, (f: Frame, plaintext: Uint8Array) => { - onMessage({ - id: f.msgID, - sender: f.sender, - body: new TextDecoder().decode(plaintext), - ts: Date.now(), - mine: f.sender === s.endpoint, - }); + onMessage(toMessage(s, f, plaintext)); }) .then((sub) => { if (closed) void sub.unsubscribe(); @@ -224,6 +260,49 @@ function subscribeRoomInternal( }; } +// subscribeRoomWithHistory is what ChatPanel opens for the conversation it is viewing: +// it seeds the room with its stored history (so a reload no longer loses the messages) +// and then keeps it live. History and live are deduplicated by frame id through a +// per-room `seen` set — a message can arrive both ways when it lands between the fetch +// and the subscription. To guarantee history shows first (oldest -> newest) regardless +// of timing, live messages are buffered until the history batch has been delivered, +// then flushed. If history fails or the endpoint is absent (404/500 on an older +// cluster), it is treated as empty and the room runs live-only, exactly as before. +function subscribeRoomWithHistory( + roomID: string, + onMessage: (m: Message) => void, +): () => void { + const s = require_(); + const seen = new Set(); + let historyDone = false; + let pending: Message[] = []; + const deliver = (m: Message): void => { + if (seen.has(m.id)) return; + seen.add(m.id); + onMessage(m); + }; + // Live is subscribed immediately so nothing published during the history fetch is + // missed; messages are buffered until the history batch lands, then delivered. + const liveUnsub = subscribeRoomInternal(roomID, (m) => { + if (historyDone) deliver(m); + else pending.push(m); + }); + s.client + .history(roomID) + .then((items) => { + for (const { frame, plaintext } of items) deliver(toMessage(s, frame, plaintext)); + }) + .catch(() => { + // No history endpoint yet, or a transient failure: fall back to live-only. + }) + .finally(() => { + historyDone = true; + for (const m of pending) deliver(m); + pending = []; + }); + return liveUnsub; +} + // connectSession opens the live bus connection (control plane + nats.ws data plane) // for a wallet identity, WITHOUT touching persistence. The private key is used here // in the browser and never leaves it. @@ -327,6 +406,7 @@ export const bus = { })); for (const r of roomList) trackRoomMeta(r.id); notifyRooms(); + seedRoomPreviews(s); // fill each preview from history without blocking the render }, // setActiveRoom marks the room the user is viewing: its unread count is cleared and @@ -372,11 +452,13 @@ export const bus = { touchSession(); // user activity: restart the idle auto-lock window }, - // subscribeRoom delivers decrypted, verified messages for a room (replaces the old - // SSE streamRoom). Returns an unsubscribe function. ChatPanel uses this for the open - // conversation; the sidebar metadata uses the same core (subscribeRoomInternal). + // subscribeRoom delivers a room's stored history followed by its live messages, both + // decrypted, verified and deduplicated by id (replaces the old SSE streamRoom). + // Returns an unsubscribe function. ChatPanel uses this for the open conversation, so + // reloading the page no longer loses the conversation; the sidebar metadata uses the + // live-only core (subscribeRoomInternal) and seeds its preview from history separately. subscribeRoom(roomID: string, onMessage: (m: Message) => void): () => void { - return subscribeRoomInternal(roomID, onMessage); + return subscribeRoomWithHistory(roomID, onMessage); }, };