feat(uniweb): room history client (fetchHistory + BusClient.history)
NATS delivers live only, so reloading the page lost a room's history. Add the
client half of the new history endpoint:
- ControlPlane.fetchHistory(roomID, limit): signed GET /rooms/{id}/history?limit=N,
decoding each base64-std frame to the raw bytes the live subscription delivers.
- BusClient.history(roomID, limit): opens each replayed frame (verify + decrypt)
exactly like subscribe, dropping any that fail, oldest -> newest.
- Extract BusClient.openFrame as the shared envelope-opening core for subscribe
and history (no duplication; subscribe behavior unchanged).
- ulidTime(id): decode the ms-epoch a ULID encodes in its first 10 Crockford
chars (inverse of newULID), so a frame's timestamp comes from its id (the wire
carries none). Covered by ulid.test.ts.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+75
-5
@@ -60,6 +60,22 @@ export function newULID(nowMs: number = Date.now()): string {
|
||||
return ts + r;
|
||||
}
|
||||
|
||||
// ulidTime decodes the millisecond epoch timestamp a ULID encodes in its first 10
|
||||
// Crockford base32 characters (the inverse of newULID's time prefix). A frame carries
|
||||
// no explicit timestamp on the wire — its ULID id IS the timestamp — so the UI derives
|
||||
// a message's time from it, which keeps live and replayed-history messages on the same
|
||||
// clock (the sender's send time, not the receiver's arrival time). Returns 0 for an id
|
||||
// whose prefix is not valid Crockford base32, so a malformed id never blows up the UI.
|
||||
export function ulidTime(id: string): number {
|
||||
let t = 0;
|
||||
for (let i = 0; i < 10 && i < id.length; i++) {
|
||||
const v = CROCKFORD.indexOf(id[i].toUpperCase());
|
||||
if (v < 0) return 0;
|
||||
t = t * 32 + v;
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
// --- room envelope (pure, the security-critical core) ------------------------
|
||||
|
||||
export interface SealOptions {
|
||||
@@ -156,6 +172,14 @@ interface RoomKeyResponse {
|
||||
epoch: number;
|
||||
}
|
||||
|
||||
// HistoryResp is GET /rooms/{id}/history?limit=N: a room's replayed frames, oldest ->
|
||||
// newest, each base64-standard encoded. Every entry is one marshaled wire frame — the
|
||||
// exact bytes the live subscription delivers — so the caller opens them with the same
|
||||
// envelope path as a live message. A room with no stored history yields an empty list.
|
||||
interface HistoryResp {
|
||||
messages: string[];
|
||||
}
|
||||
|
||||
// PolicyWire is the control-plane JSON shape of a policy (snake_case sign_msgs).
|
||||
interface PolicyWire {
|
||||
encrypt: boolean;
|
||||
@@ -331,6 +355,17 @@ export class ControlPlane {
|
||||
for (const member of members) m.set(member.endpoint, base64ToBytesLocal(member.sign_pub));
|
||||
return m;
|
||||
}
|
||||
|
||||
// fetchHistory replays a room's stored frames (GET /rooms/{id}/history?limit=N),
|
||||
// returning up to N marshaled wire frames oldest -> newest. The server base64-standard
|
||||
// encodes each frame; this decodes them back to the raw bytes the live subscription
|
||||
// delivers, so BusClient.history can open each with the same envelope path as
|
||||
// subscribe. The caller tolerates this endpoint being absent on older clusters
|
||||
// (404/500): the error surfaces and BusClient.history's caller falls back to live-only.
|
||||
async fetchHistory(roomID: string, limit = 200): Promise<Uint8Array[]> {
|
||||
const resp = await this.request<HistoryResp>("GET", `/rooms/${roomID}/history?limit=${limit}`);
|
||||
return (resp.messages ?? []).map((b64) => base64ToBytesLocal(b64));
|
||||
}
|
||||
}
|
||||
|
||||
// base64ToBytesLocal decodes standard base64 (kept local to avoid widening crypto's
|
||||
@@ -392,20 +427,55 @@ export class BusClient {
|
||||
await this.transport.publish(room.subject, marshal(f));
|
||||
}
|
||||
|
||||
// openFrame is the shared envelope-opening core behind subscribe (live) and history
|
||||
// (replay): it unmarshals one wire frame, resolves the sender's signing key (from the
|
||||
// sign cache, populated by loadSigners for signed rooms) and the room key for the
|
||||
// frame's epoch, then verifies + decrypts via openRoomMessage. Returns null when the
|
||||
// frame fails verification or decryption, so both callers drop it the same way.
|
||||
private async openFrame(
|
||||
roomID: string,
|
||||
policy: Policy,
|
||||
bytes: Uint8Array,
|
||||
): Promise<{ frame: Frame; plaintext: Uint8Array } | null> {
|
||||
const frame = unmarshal(bytes);
|
||||
const signerPub = policy.signMsgs ? this.signCache.get(roomID)?.get(frame.sender) : undefined;
|
||||
const roomKey = policy.encrypt ? await this.roomKey(roomID, frame.epoch) : undefined;
|
||||
const plaintext = openRoomMessage(frame, policy, signerPub, roomKey);
|
||||
return plaintext ? { frame, plaintext } : null;
|
||||
}
|
||||
|
||||
// subscribe delivers decoded, verified, decrypted messages for a room. Messages
|
||||
// that fail signature verification or decryption are dropped silently.
|
||||
async subscribe(roomID: string, handler: (f: Frame, plaintext: Uint8Array) => void): Promise<Subscription> {
|
||||
const room = await this.control.fetchRoom(roomID);
|
||||
if (room.policy.signMsgs) await this.loadSigners(roomID);
|
||||
return this.transport.subscribe(room.subject, async (_subject, data) => {
|
||||
const f = unmarshal(data);
|
||||
const signerPub = room.policy.signMsgs ? this.signCache.get(roomID)?.get(f.sender) : undefined;
|
||||
const roomKey = room.policy.encrypt ? await this.roomKey(roomID, f.epoch) : undefined;
|
||||
const plaintext = openRoomMessage(f, room.policy, signerPub, roomKey);
|
||||
if (plaintext) handler(f, plaintext);
|
||||
const opened = await this.openFrame(roomID, room.policy, data);
|
||||
if (opened) handler(opened.frame, opened.plaintext);
|
||||
});
|
||||
}
|
||||
|
||||
// history replays a room's stored messages, decrypted and verified exactly like
|
||||
// subscribe (NATS delivers live only, so without this a reload shows nothing until
|
||||
// new traffic arrives). It resolves the room policy, loads the signer keys for a
|
||||
// signed room, fetches the marshaled frames from the control plane, and opens each
|
||||
// with the same openFrame path. Frames that fail verification/decryption are dropped.
|
||||
// Returns the opened messages in the server's order (oldest -> newest).
|
||||
async history(
|
||||
roomID: string,
|
||||
limit = 200,
|
||||
): Promise<Array<{ frame: Frame; plaintext: Uint8Array }>> {
|
||||
const room = await this.control.fetchRoom(roomID);
|
||||
if (room.policy.signMsgs) await this.loadSigners(roomID);
|
||||
const frames = await this.control.fetchHistory(roomID, limit);
|
||||
const out: Array<{ frame: Frame; plaintext: Uint8Array }> = [];
|
||||
for (const bytes of frames) {
|
||||
const opened = await this.openFrame(roomID, room.policy, bytes);
|
||||
if (opened) out.push(opened);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
private async loadSigners(roomID: string): Promise<void> {
|
||||
this.signCache.set(roomID, await this.control.signerKeys(roomID));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user