diff --git a/web/src/bus/client.ts b/web/src/bus/client.ts index e7df65a..8764e20 100644 --- a/web/src/bus/client.ts +++ b/web/src/bus/client.ts @@ -60,6 +60,22 @@ export function newULID(nowMs: number = Date.now()): string { return ts + r; } +// ulidTime decodes the millisecond epoch timestamp a ULID encodes in its first 10 +// Crockford base32 characters (the inverse of newULID's time prefix). A frame carries +// no explicit timestamp on the wire — its ULID id IS the timestamp — so the UI derives +// a message's time from it, which keeps live and replayed-history messages on the same +// clock (the sender's send time, not the receiver's arrival time). Returns 0 for an id +// whose prefix is not valid Crockford base32, so a malformed id never blows up the UI. +export function ulidTime(id: string): number { + let t = 0; + for (let i = 0; i < 10 && i < id.length; i++) { + const v = CROCKFORD.indexOf(id[i].toUpperCase()); + if (v < 0) return 0; + t = t * 32 + v; + } + return t; +} + // --- room envelope (pure, the security-critical core) ------------------------ export interface SealOptions { @@ -156,6 +172,14 @@ interface RoomKeyResponse { epoch: number; } +// HistoryResp is GET /rooms/{id}/history?limit=N: a room's replayed frames, oldest -> +// newest, each base64-standard encoded. Every entry is one marshaled wire frame — the +// exact bytes the live subscription delivers — so the caller opens them with the same +// envelope path as a live message. A room with no stored history yields an empty list. +interface HistoryResp { + messages: string[]; +} + // PolicyWire is the control-plane JSON shape of a policy (snake_case sign_msgs). interface PolicyWire { encrypt: boolean; @@ -331,6 +355,17 @@ export class ControlPlane { for (const member of members) m.set(member.endpoint, base64ToBytesLocal(member.sign_pub)); return m; } + + // fetchHistory replays a room's stored frames (GET /rooms/{id}/history?limit=N), + // returning up to N marshaled wire frames oldest -> newest. The server base64-standard + // encodes each frame; this decodes them back to the raw bytes the live subscription + // delivers, so BusClient.history can open each with the same envelope path as + // subscribe. The caller tolerates this endpoint being absent on older clusters + // (404/500): the error surfaces and BusClient.history's caller falls back to live-only. + async fetchHistory(roomID: string, limit = 200): Promise { + const resp = await this.request("GET", `/rooms/${roomID}/history?limit=${limit}`); + return (resp.messages ?? []).map((b64) => base64ToBytesLocal(b64)); + } } // base64ToBytesLocal decodes standard base64 (kept local to avoid widening crypto's @@ -392,20 +427,55 @@ export class BusClient { await this.transport.publish(room.subject, marshal(f)); } + // openFrame is the shared envelope-opening core behind subscribe (live) and history + // (replay): it unmarshals one wire frame, resolves the sender's signing key (from the + // sign cache, populated by loadSigners for signed rooms) and the room key for the + // frame's epoch, then verifies + decrypts via openRoomMessage. Returns null when the + // frame fails verification or decryption, so both callers drop it the same way. + private async openFrame( + roomID: string, + policy: Policy, + bytes: Uint8Array, + ): Promise<{ frame: Frame; plaintext: Uint8Array } | null> { + const frame = unmarshal(bytes); + const signerPub = policy.signMsgs ? this.signCache.get(roomID)?.get(frame.sender) : undefined; + const roomKey = policy.encrypt ? await this.roomKey(roomID, frame.epoch) : undefined; + const plaintext = openRoomMessage(frame, policy, signerPub, roomKey); + return plaintext ? { frame, plaintext } : null; + } + // subscribe delivers decoded, verified, decrypted messages for a room. Messages // that fail signature verification or decryption are dropped silently. async subscribe(roomID: string, handler: (f: Frame, plaintext: Uint8Array) => void): Promise { const room = await this.control.fetchRoom(roomID); if (room.policy.signMsgs) await this.loadSigners(roomID); return this.transport.subscribe(room.subject, async (_subject, data) => { - const f = unmarshal(data); - const signerPub = room.policy.signMsgs ? this.signCache.get(roomID)?.get(f.sender) : undefined; - const roomKey = room.policy.encrypt ? await this.roomKey(roomID, f.epoch) : undefined; - const plaintext = openRoomMessage(f, room.policy, signerPub, roomKey); - if (plaintext) handler(f, plaintext); + const opened = await this.openFrame(roomID, room.policy, data); + if (opened) handler(opened.frame, opened.plaintext); }); } + // history replays a room's stored messages, decrypted and verified exactly like + // subscribe (NATS delivers live only, so without this a reload shows nothing until + // new traffic arrives). It resolves the room policy, loads the signer keys for a + // signed room, fetches the marshaled frames from the control plane, and opens each + // with the same openFrame path. Frames that fail verification/decryption are dropped. + // Returns the opened messages in the server's order (oldest -> newest). + async history( + roomID: string, + limit = 200, + ): Promise> { + const room = await this.control.fetchRoom(roomID); + if (room.policy.signMsgs) await this.loadSigners(roomID); + const frames = await this.control.fetchHistory(roomID, limit); + const out: Array<{ frame: Frame; plaintext: Uint8Array }> = []; + for (const bytes of frames) { + const opened = await this.openFrame(roomID, room.policy, bytes); + if (opened) out.push(opened); + } + return out; + } + private async loadSigners(roomID: string): Promise { this.signCache.set(roomID, await this.control.signerKeys(roomID)); } diff --git a/web/src/bus/ulid.test.ts b/web/src/bus/ulid.test.ts new file mode 100644 index 0000000..d736725 --- /dev/null +++ b/web/src/bus/ulid.test.ts @@ -0,0 +1,34 @@ +// Tests for ulidTime, the decoder of the millisecond timestamp a ULID encodes in its +// first 10 Crockford base32 characters. A wire frame carries no explicit timestamp — +// its ULID id IS the timestamp — so the UI derives a message's time (and thus its sort +// order, live and replayed-history alike) from this function. These tests pin that it +// is the exact inverse of newULID's time prefix and that it is time-ordered. + +import { describe, it, expect } from "vitest"; +import { newULID, ulidTime } from "./client.js"; + +describe("ulidTime", () => { + it("round-trips the millisecond timestamp newULID encodes", () => { + for (const ms of [0, 1, 1_000, 1_700_000_000_000, 2_000_000_000_000, Date.now()]) { + expect(ulidTime(newULID(ms))).toBe(ms); + } + }); + + it("is monotonic: a later message decodes to a larger time", () => { + const earlier = newULID(1_700_000_000_000); + const later = newULID(1_700_000_001_000); + expect(ulidTime(earlier)).toBeLessThan(ulidTime(later)); + }); + + it("ignores the 16-char random suffix (only the 10-char time prefix matters)", () => { + const ms = 1_736_000_000_000; + // Two ULIDs minted at the same ms differ only in their random tail, yet decode equal. + expect(ulidTime(newULID(ms))).toBe(ms); + expect(ulidTime(newULID(ms))).toBe(ms); + }); + + it("returns 0 for an id whose prefix is not valid Crockford base32", () => { + expect(ulidTime("!!!!!!!!!!xxxxxxxxxxxxxxxx")).toBe(0); + expect(ulidTime("")).toBe(0); + }); +});