Files
uniweb/web/src/bus/client.ts
T
egutierrez c142b3a025 feat(uniweb): room history client (fetchHistory + BusClient.history)
NATS delivers live only, so reloading the page lost a room's history. Add the
client half of the new history endpoint:

- ControlPlane.fetchHistory(roomID, limit): signed GET /rooms/{id}/history?limit=N,
  decoding each base64-std frame to the raw bytes the live subscription delivers.
- BusClient.history(roomID, limit): opens each replayed frame (verify + decrypt)
  exactly like subscribe, dropping any that fail, oldest -> newest.
- Extract BusClient.openFrame as the shared envelope-opening core for subscribe
  and history (no duplication; subscribe behavior unchanged).
- ulidTime(id): decode the ms-epoch a ULID encodes in its first 10 Crockford
  chars (inverse of newULID), so a frame's timestamp comes from its id (the wire
  carries none). Covered by ulid.test.ts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 19:39:57 +02:00

492 lines
19 KiB
TypeScript

// The browser-native bus client, ported from Go pkg/client. It does what the Go
// gateway used to do server-side — only now it runs in the browser, so the user's
// private key never leaves the device (issue 0001).
//
// The module is split so the security-critical part is pure and unit-testable
// without a live server:
// - sealRoomMessage / openRoomMessage: the room ENVELOPE (build a frame, AEAD-seal
// the payload with the room key using the subject as AAD, sign it; and the
// inverse: verify the signature and open the payload). These are pure and pinned
// by tests.
// - NatsTransport: the data-plane transport interface. The concrete WebSocket
// implementation (nats.ws) is thin glue wired and E2E-tested in a later phase.
// - ControlPlane: the signed HTTP client for membershipd (rooms, keys, members).
// - BusClient: orchestrates transport + control plane + envelope.
import { Policy, Room } from "./room.js";
import {
Frame,
FrameType,
marshal,
unmarshal,
signingBytes,
} from "./frame.js";
import {
sealAEAD,
openAEAD,
randomNonce,
signEd25519,
verifyEd25519,
sealKeyBox,
openKeyBox,
endpointID,
bytesToBase64,
} from "./crypto.js";
import { signedHeaders, freshNonce } from "./busauth.js";
// Identity is the user's full cryptographic identity. The private halves stay in
// memory in the browser and are NEVER serialized to the network.
export interface Identity {
signPub: Uint8Array;
signPriv: Uint8Array; // 64-byte Ed25519 (seed||pub)
kexPub: Uint8Array;
kexPriv: Uint8Array;
}
// --- ULID (message ids), Crockford base32, time-ordered ----------------------
const CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ";
export function newULID(nowMs: number = Date.now()): string {
let ts = "";
let t = nowMs;
for (let i = 0; i < 10; i++) {
ts = CROCKFORD[t % 32] + ts;
t = Math.floor(t / 32);
}
const rnd = crypto.getRandomValues(new Uint8Array(16));
let r = "";
for (let i = 0; i < 16; i++) r += CROCKFORD[rnd[i] & 31];
return ts + r;
}
// ulidTime decodes the millisecond epoch timestamp a ULID encodes in its first 10
// Crockford base32 characters (the inverse of newULID's time prefix). A frame carries
// no explicit timestamp on the wire — its ULID id IS the timestamp — so the UI derives
// a message's time from it, which keeps live and replayed-history messages on the same
// clock (the sender's send time, not the receiver's arrival time). Returns 0 for an id
// whose prefix is not valid Crockford base32, so a malformed id never blows up the UI.
export function ulidTime(id: string): number {
let t = 0;
for (let i = 0; i < 10 && i < id.length; i++) {
const v = CROCKFORD.indexOf(id[i].toUpperCase());
if (v < 0) return 0;
t = t * 32 + v;
}
return t;
}
// --- room envelope (pure, the security-critical core) ------------------------
export interface SealOptions {
type: FrameType;
subject: string;
sender: string; // this peer's endpoint id
signPriv: Uint8Array;
policy: Policy;
epoch: number;
plaintext: Uint8Array;
roomKey?: Uint8Array; // required when policy.encrypt
threadID?: string;
replyTo?: string;
msgID?: string; // defaults to a fresh ULID
}
// sealRoomMessage builds a wire frame from plaintext exactly as Go's publishFrame:
// for encrypted rooms the payload is ChaCha20-Poly1305-sealed with the room key and
// the SUBJECT as additional authenticated data; for signed rooms an Ed25519
// signature over the canonical bytes is attached.
export function sealRoomMessage(o: SealOptions): Frame {
const f: Frame = {
type: o.type,
subject: o.subject,
sender: o.sender,
msgID: o.msgID ?? newULID(),
epoch: o.epoch,
threadID: o.threadID,
replyTo: o.replyTo,
};
if (o.policy.encrypt) {
if (!o.roomKey) throw new Error("sealRoomMessage: encrypted room requires roomKey");
const nonce = randomNonce();
f.nonce = nonce;
f.payload = sealAEAD(o.roomKey, nonce, o.plaintext, new TextEncoder().encode(o.subject));
} else {
f.payload = o.plaintext;
}
if (o.policy.signMsgs) {
f.sig = signEd25519(o.signPriv, signingBytes(f));
}
return f;
}
// openRoomMessage is the inverse: it verifies the signature (for signed rooms) and
// opens the AEAD payload (for encrypted rooms), returning the plaintext or null if
// verification/decryption fails (the caller drops the message).
export function openRoomMessage(
f: Frame,
policy: Policy,
signerPub: Uint8Array | undefined,
roomKey: Uint8Array | undefined,
): Uint8Array | null {
if (policy.signMsgs) {
if (!f.sig || !signerPub || !verifyEd25519(f.sig, signingBytes(f), signerPub)) return null;
}
if (policy.encrypt) {
if (!f.nonce || !f.payload || !roomKey) return null;
try {
return openAEAD(roomKey, f.nonce, f.payload, new TextEncoder().encode(f.subject));
} catch {
return null;
}
}
return f.payload ?? new Uint8Array(0);
}
// --- data-plane transport ----------------------------------------------------
export type MessageHandler = (subject: string, data: Uint8Array) => void;
// NatsTransport abstracts the NATS data plane so BusClient's logic is testable with
// a mock and the concrete WebSocket transport (nats.ws) stays swappable. The browser
// transport connects over ws(s):// using a NATS nkey authenticator built from the
// user's Ed25519 identity (see busauth.natsAuthenticator).
export interface NatsTransport {
publish(subject: string, data: Uint8Array): void | Promise<void>;
subscribe(subject: string, handler: MessageHandler): Promise<Subscription>;
// reconnect rebuilds the connection so the server's per-subject ACL re-evaluates
// this peer's room membership (a room created after connecting is otherwise not in
// the grant). Active subscriptions are dropped; re-subscribe after calling it.
reconnect(): Promise<void>;
close(): Promise<void>;
}
export interface Subscription {
unsubscribe(): void | Promise<void>;
}
// --- control plane (signed HTTP to membershipd) ------------------------------
interface RoomKeyResponse {
sealed_key: string; // base64 sealed box of the room key for this peer
epoch: number;
}
// HistoryResp is GET /rooms/{id}/history?limit=N: a room's replayed frames, oldest ->
// newest, each base64-standard encoded. Every entry is one marshaled wire frame — the
// exact bytes the live subscription delivers — so the caller opens them with the same
// envelope path as a live message. A room with no stored history yields an empty list.
interface HistoryResp {
messages: string[];
}
// PolicyWire is the control-plane JSON shape of a policy (snake_case sign_msgs).
interface PolicyWire {
encrypt: boolean;
persist: boolean;
sign_msgs: boolean;
}
// RoomResp is GET /rooms/{id}: the room metadata WITHOUT the id (the caller knows it)
// and with the policy nested under snake_case keys.
interface RoomResp {
subject: string;
epoch: number;
policy: PolicyWire;
}
interface MemberJSON {
endpoint: string;
sign_pub: string; // base64
}
// DirectoryMemberWire is one row of GET /directory: a cluster-wide member with its
// human handle and role. sign_pub here is 64-hex (the raw Ed25519 public key), and
// endpoint matches endpointID(signPub) byte for byte.
interface DirectoryMemberWire {
sign_pub: string; // 64-hex
endpoint: string; // base64url-nopad, == endpointID(signPub)
handle: string;
role: string;
}
interface DirectoryResp {
members: DirectoryMemberWire[];
}
// DirectoryEntry is the SDK shape of one directory member: the readable handle keyed
// by the stable endpoint id, so the UI can show a name instead of the raw id.
export interface DirectoryEntry {
signPub: string; // 64-hex
endpoint: string;
handle: string;
role: string;
}
// MemberRoomWire is one row of GET /members/{endpoint}/rooms.
interface MemberRoomWire {
room_id: string;
subject: string;
epoch: number;
policy: PolicyWire;
}
// ControlPlane is the signed HTTP client for the membershipd control plane. Every
// request carries the X-Unibus-* auth headers (busauth.signedHeaders). It pins no
// host so it can target any cluster node.
export class ControlPlane {
constructor(
private baseURL: string,
private id: Identity,
) {}
private async request<T>(method: string, path: string, body?: unknown): Promise<T> {
const bodyBytes = body === undefined ? new Uint8Array(0) : new TextEncoder().encode(JSON.stringify(body));
const headers = signedHeaders(
this.id.signPub,
this.id.signPriv,
method,
path,
String(Math.floor(Date.now() / 1000)),
freshNonce(),
bodyBytes,
);
const init: RequestInit = { method, headers: { ...headers } };
if (body !== undefined) {
(init.headers as Record<string, string>)["Content-Type"] = "application/json";
init.body = bodyBytes;
}
const resp = await fetch(this.baseURL + path, init);
if (!resp.ok) {
let msg = `${method} ${path} -> ${resp.status}`;
try {
const e = await resp.json();
if (e?.error) msg = `${e.error} (HTTP ${resp.status})`;
} catch {
/* keep the generic message */
}
throw new Error(`control plane: ${msg}`);
}
return (await resp.json()) as T;
}
// fetchRoom resolves room metadata, mapping the control-plane wire shape
// (snake_case policy, no id) to the SDK's Room type.
async fetchRoom(roomID: string): Promise<Room> {
const r = await this.request<RoomResp>("GET", `/rooms/${roomID}`);
return {
id: roomID,
subject: r.subject,
epoch: r.epoch,
policy: { encrypt: r.policy.encrypt, persist: r.policy.persist, signMsgs: r.policy.sign_msgs },
};
}
// createRoom creates a room owned by this peer. For an encrypted room it mints a
// fresh 32-byte room key, seals it to the owner's own X25519 key (sealed box), and
// ships it as sealed_key_self so the server can store the owner's copy without ever
// seeing the key. Returns the new room id and (for encrypted rooms) the key.
async createRoom(subject: string, policy: Policy): Promise<{ roomID: string; key?: Uint8Array }> {
const body: Record<string, unknown> = {
subject,
policy: { encrypt: policy.encrypt, persist: policy.persist, sign_msgs: policy.signMsgs },
owner: {
endpoint: endpointID(this.id.signPub),
sign_pub: bytesToBase64(this.id.signPub),
kex_pub: bytesToBase64(this.id.kexPub),
},
};
let key: Uint8Array | undefined;
if (policy.encrypt) {
key = crypto.getRandomValues(new Uint8Array(32));
body.sealed_key_self = bytesToBase64(sealKeyBox(this.id.kexPub, key));
}
const resp = await this.request<{ room_id: string }>("POST", "/rooms", body);
return { roomID: resp.room_id, key };
}
// fetchRoomKey fetches the sealed room key for this peer and opens it with the
// user's X25519 private key. The server only ever stores the key sealed for each
// member, so it cannot read it.
async fetchRoomKey(roomID: string, epoch: number): Promise<{ key: Uint8Array; epoch: number }> {
const q = epoch > 0 ? `&epoch=${epoch}` : "";
const resp = await this.request<RoomKeyResponse>(
"GET",
`/rooms/${roomID}/key?endpoint=${endpointID(this.id.signPub)}${q}`,
);
const sealed = base64ToBytesLocal(resp.sealed_key);
const key = openKeyBox(this.id.kexPub, this.id.kexPriv, sealed);
if (!key) throw new Error("control plane: failed to open room key");
return { key, epoch: resp.epoch };
}
// listMemberRooms returns the rooms a peer belongs to (GET /members/{endpoint}/rooms),
// mapping the wire shape (room_id, snake_case policy) to the SDK Room type.
async listMemberRooms(endpoint: string): Promise<Room[]> {
const wire = await this.request<MemberRoomWire[]>("GET", `/members/${endpoint}/rooms`);
return wire.map((r) => ({
id: r.room_id,
subject: r.subject,
epoch: r.epoch,
policy: { encrypt: r.policy.encrypt, persist: r.policy.persist, signMsgs: r.policy.sign_msgs },
}));
}
// fetchDirectory returns the cluster-wide member directory (GET /api/directory), so
// the UI can resolve a message sender's endpoint id to a readable handle. The
// request is signed like every other control-plane call. The caller is expected to
// tolerate this endpoint being absent on older clusters (404) and fall back to the
// short id; this method only maps the wire shape and lets transport errors surface.
async fetchDirectory(): Promise<DirectoryEntry[]> {
const resp = await this.request<DirectoryResp>("GET", "/directory");
return (resp.members ?? []).map((m) => ({
signPub: m.sign_pub,
endpoint: m.endpoint,
handle: m.handle,
role: m.role,
}));
}
// listMembers returns the room's members keyed by endpoint, so a receiver can find
// a sender's signing public key to verify message signatures.
async signerKeys(roomID: string): Promise<Map<string, Uint8Array>> {
const members = await this.request<MemberJSON[]>("GET", `/rooms/${roomID}/members`);
const m = new Map<string, Uint8Array>();
for (const member of members) m.set(member.endpoint, base64ToBytesLocal(member.sign_pub));
return m;
}
// fetchHistory replays a room's stored frames (GET /rooms/{id}/history?limit=N),
// returning up to N marshaled wire frames oldest -> newest. The server base64-standard
// encodes each frame; this decodes them back to the raw bytes the live subscription
// delivers, so BusClient.history can open each with the same envelope path as
// subscribe. The caller tolerates this endpoint being absent on older clusters
// (404/500): the error surfaces and BusClient.history's caller falls back to live-only.
async fetchHistory(roomID: string, limit = 200): Promise<Uint8Array[]> {
const resp = await this.request<HistoryResp>("GET", `/rooms/${roomID}/history?limit=${limit}`);
return (resp.messages ?? []).map((b64) => base64ToBytesLocal(b64));
}
}
// base64ToBytesLocal decodes standard base64 (kept local to avoid widening crypto's
// surface; identical behavior to crypto.base64ToBytes).
function base64ToBytesLocal(s: string): Uint8Array {
const bin = atob(s);
const out = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i);
return out;
}
// --- BusClient ---------------------------------------------------------------
// BusClient ties the data plane (transport) and control plane together, applying the
// room envelope on publish and subscribe. It holds the user's identity in memory and
// never sends the private key anywhere.
export class BusClient {
private endpoint: string;
private keyCache = new Map<string, Map<number, Uint8Array>>(); // roomID -> epoch -> K
private signCache = new Map<string, Map<string, Uint8Array>>(); // roomID -> endpoint -> signPub
constructor(
private id: Identity,
private transport: NatsTransport,
private control: ControlPlane,
) {
this.endpoint = endpointID(id.signPub);
}
private async roomKey(roomID: string, epoch: number): Promise<Uint8Array> {
const cached = this.keyCache.get(roomID)?.get(epoch);
if (cached) return cached;
const { key, epoch: ep } = await this.control.fetchRoomKey(roomID, epoch);
let byEpoch = this.keyCache.get(roomID);
if (!byEpoch) {
byEpoch = new Map();
this.keyCache.set(roomID, byEpoch);
}
byEpoch.set(ep, key);
return key;
}
// publish seals plaintext per the room policy and publishes it on the data plane.
async publish(roomID: string, plaintext: Uint8Array, opts: { threadID?: string; replyTo?: string; type?: FrameType } = {}): Promise<void> {
const room = await this.control.fetchRoom(roomID);
const roomKey = room.policy.encrypt ? await this.roomKey(roomID, room.epoch) : undefined;
const f = sealRoomMessage({
type: opts.type ?? FrameType.PUB,
subject: room.subject,
sender: this.endpoint,
signPriv: this.id.signPriv,
policy: room.policy,
epoch: room.epoch,
plaintext,
roomKey,
threadID: opts.threadID,
replyTo: opts.replyTo,
});
await this.transport.publish(room.subject, marshal(f));
}
// openFrame is the shared envelope-opening core behind subscribe (live) and history
// (replay): it unmarshals one wire frame, resolves the sender's signing key (from the
// sign cache, populated by loadSigners for signed rooms) and the room key for the
// frame's epoch, then verifies + decrypts via openRoomMessage. Returns null when the
// frame fails verification or decryption, so both callers drop it the same way.
private async openFrame(
roomID: string,
policy: Policy,
bytes: Uint8Array,
): Promise<{ frame: Frame; plaintext: Uint8Array } | null> {
const frame = unmarshal(bytes);
const signerPub = policy.signMsgs ? this.signCache.get(roomID)?.get(frame.sender) : undefined;
const roomKey = policy.encrypt ? await this.roomKey(roomID, frame.epoch) : undefined;
const plaintext = openRoomMessage(frame, policy, signerPub, roomKey);
return plaintext ? { frame, plaintext } : null;
}
// subscribe delivers decoded, verified, decrypted messages for a room. Messages
// that fail signature verification or decryption are dropped silently.
async subscribe(roomID: string, handler: (f: Frame, plaintext: Uint8Array) => void): Promise<Subscription> {
const room = await this.control.fetchRoom(roomID);
if (room.policy.signMsgs) await this.loadSigners(roomID);
return this.transport.subscribe(room.subject, async (_subject, data) => {
const opened = await this.openFrame(roomID, room.policy, data);
if (opened) handler(opened.frame, opened.plaintext);
});
}
// history replays a room's stored messages, decrypted and verified exactly like
// subscribe (NATS delivers live only, so without this a reload shows nothing until
// new traffic arrives). It resolves the room policy, loads the signer keys for a
// signed room, fetches the marshaled frames from the control plane, and opens each
// with the same openFrame path. Frames that fail verification/decryption are dropped.
// Returns the opened messages in the server's order (oldest -> newest).
async history(
roomID: string,
limit = 200,
): Promise<Array<{ frame: Frame; plaintext: Uint8Array }>> {
const room = await this.control.fetchRoom(roomID);
if (room.policy.signMsgs) await this.loadSigners(roomID);
const frames = await this.control.fetchHistory(roomID, limit);
const out: Array<{ frame: Frame; plaintext: Uint8Array }> = [];
for (const bytes of frames) {
const opened = await this.openFrame(roomID, room.policy, bytes);
if (opened) out.push(opened);
}
return out;
}
private async loadSigners(roomID: string): Promise<void> {
this.signCache.set(roomID, await this.control.signerKeys(roomID));
}
// refresh reconnects the data plane so the server's per-subject ACL re-evaluates
// this peer's room membership. Call it after creating or joining a room while
// connected: NATS freezes a connection's publishable/subscribable subjects at
// connect time, so the new room's subject only becomes usable on a fresh
// connection. Active subscriptions are dropped — re-subscribe afterwards.
async refresh(): Promise<void> {
await this.transport.reconnect();
}
}