diff --git a/app.md b/app.md index baf7745..90ba159 100644 --- a/app.md +++ b/app.md @@ -17,7 +17,7 @@ uses_types: [] framework: "" entry_point: "cmd/membershipd" dir_path: "projects/message_bus/apps/unibus" -repo_url: "" +repo_url: "https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/unibus" service: port: 8470 health_endpoint: /healthz diff --git a/go.mod b/go.mod index 18577d6..f879c6a 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect golang.org/x/crypto v0.51.0 // indirect golang.org/x/sys v0.44.0 // indirect + golang.org/x/text v0.37.0 // indirect golang.org/x/time v0.7.0 // indirect modernc.org/libc v1.70.0 // indirect modernc.org/mathutil v1.7.1 // indirect diff --git a/go.sum b/go.sum index 9b819b1..cba8fff 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= +golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8= diff --git a/pkg/client/client.go b/pkg/client/client.go index aa93308..66ed0c5 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -14,6 +14,7 @@ package client import ( "bytes" + "context" "crypto/rand" "encoding/json" "fmt" @@ -27,6 +28,7 @@ import ( "github.com/enmanuel/unibus/pkg/frame" "github.com/enmanuel/unibus/pkg/room" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) // 
Endpoint is the public identity of a peer. @@ -41,6 +43,7 @@ type Client struct { id cs.Identity endpoint string nc *nats.Conn + js jetstream.JetStream // durable plane for rooms with Policy.Persist ctrlURL string http *http.Client @@ -56,10 +59,19 @@ func New(natsURL, ctrlURL string, id cs.Identity) (*Client, error) { if err != nil { return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err) } + // JetStream context for the durable plane. Obtaining it does not require any + // stream to exist yet and has no effect on cleartext/ephemeral rooms — those + // keep using core nc.Publish / nc.Subscribe untouched. + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + return nil, fmt.Errorf("client: init jetstream: %w", err) + } return &Client{ id: id, endpoint: frame.EndpointID(id.SignPub), nc: nc, + js: js, ctrlURL: ctrlURL, http: &http.Client{Timeout: 10 * time.Second}, keyCache: map[string]map[int][]byte{}, @@ -257,6 +269,15 @@ func (c *Client) CreateRoom(subject string, p room.Policy) (string, error) { if p.Encrypt { c.cacheKey(resp.RoomID, 1, k) } + // For persisted rooms, provision the durable JetStream stream up front so the + // first publish (even before any subscriber exists) is captured for history. + if p.Persist { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.ensureStream(ctx, resp.RoomID, subject); err != nil { + return "", err + } + } return resp.RoomID, nil } @@ -406,51 +427,117 @@ func (c *Client) Publish(roomID string, plaintext []byte) error { if err != nil { return fmt.Errorf("client: marshal frame: %w", err) } + // Persisted rooms go through JetStream (durable, acked); ephemeral rooms keep + // the exact core-NATS publish they had before. + if info.Policy.Persist { + return c.publishPersistent(roomID, info.Subject, b) + } return c.nc.Publish(info.Subject, b) } +// Sub is a transport-agnostic handle to an active room subscription. 
It wraps +// either a core NATS subscription (ephemeral rooms) or a JetStream durable +// consumer (persisted rooms) behind a single Unsubscribe() method, so callers +// (and tests) treat both planes uniformly. For a persisted room, Unsubscribe +// stops local delivery but leaves the durable consumer's ack position on the +// server, so a later Subscribe with the same peer resumes (offline replay). +type Sub struct { + nsub *nats.Subscription + cc jetstream.ConsumeContext +} + +// Unsubscribe stops delivery for this subscription. The error return keeps the +// signature compatible with *nats.Subscription's Unsubscribe. +func (s *Sub) Unsubscribe() error { + if s == nil { + return nil + } + if s.nsub != nil { + return s.nsub.Unsubscribe() + } + if s.cc != nil { + s.cc.Stop() + } + return nil +} + // Subscribe subscribes to a room and invokes handler for each message with the // decoded frame and (for encrypted rooms) the decrypted plaintext. Signature // verification and epoch-driven key refresh happen transparently. Messages that // fail verification or decryption are dropped (handler not called). -func (c *Client) Subscribe(roomID string, handler func(f frame.Frame, plaintext []byte)) (*nats.Subscription, error) { +// +// For ephemeral rooms (Policy.Persist == false) this is a plain core-NATS +// subscription, identical to before. For persisted rooms it binds a per-peer +// durable JetStream consumer with DeliverAll, so a late joiner receives the +// full history (scrollback) and a reconnecting peer resumes from its last ack +// (offline replay). The frame-decode / verify / decrypt logic is shared between +// both planes via processFrame. 
+func (c *Client) Subscribe(roomID string, handler func(f frame.Frame, plaintext []byte)) (*Sub, error) { info, err := c.fetchRoom(roomID) if err != nil { return nil, err } - return c.nc.Subscribe(info.Subject, func(msg *nats.Msg) { - f, err := frame.Unmarshal(msg.Data) + deliver := func(data []byte) { + c.processFrame(roomID, info, data, handler) + } + if info.Policy.Persist { + cc, err := c.subscribePersistent(roomID, info.Subject, deliver) if err != nil { - return + return nil, err } - if info.Policy.SignMsgs && f.Sig != nil { - pub, err := c.signerPub(roomID, f.Sender) - if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) { - return // unauthenticated frame: drop - } - } - plaintext := f.Payload - // Decrypt only inline payloads. Media frames carry their bytes in the - // blob store (referenced by f.Blob) with the nonce in BlobRef.Nonce; - // the handler decrypts those on demand via FetchMedia. A frame with an - // inline ciphertext always has a non-empty Nonce. - if info.Policy.Encrypt && len(f.Nonce) > 0 && len(f.Payload) > 0 { - k, ok := c.getCachedKey(roomID, f.Epoch) - if !ok { - // Sender used a newer (or unknown) epoch: refresh K from the control plane. - k, _, err = c.fetchKey(roomID, f.Epoch) - if err != nil { - return // cannot obtain key for this epoch (e.g. we were kicked): drop - } - } - pt, err := cs.OpenAEAD(k, f.Nonce, f.Payload, []byte(info.Subject)) - if err != nil { - return // cannot decrypt (wrong epoch/kicked): drop - } - plaintext = pt - } - handler(f, plaintext) + return &Sub{cc: cc}, nil + } + nsub, err := c.nc.Subscribe(info.Subject, func(msg *nats.Msg) { + deliver(msg.Data) }) + if err != nil { + return nil, err + } + return &Sub{nsub: nsub}, nil +} + +// processFrame decodes one wire message, verifies its signature and (for +// encrypted rooms) decrypts its inline payload, then invokes handler. Messages +// that fail verification or decryption are dropped (handler not called). 
This +// is the single code path shared by the ephemeral and persisted subscribe +// planes so their decode/verify/decrypt semantics never drift apart. +func (c *Client) processFrame(roomID string, info roomView, data []byte, handler func(f frame.Frame, plaintext []byte)) { + f, err := frame.Unmarshal(data) + if err != nil { + return + } + if info.Policy.SignMsgs && f.Sig != nil { + pub, err := c.signerPub(roomID, f.Sender) + if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) { + return // unauthenticated frame: drop + } + } + plaintext := f.Payload + // Decrypt only inline payloads. Media frames carry their bytes in the + // blob store (referenced by f.Blob) with the nonce in BlobRef.Nonce; + // the handler decrypts those on demand via FetchMedia. A frame with an + // inline ciphertext always has a non-empty Nonce. + if info.Policy.Encrypt && len(f.Nonce) > 0 && len(f.Payload) > 0 { + k, ok := c.getCachedKey(roomID, f.Epoch) + if !ok { + // Sender used a newer (or unknown) epoch: refresh K from the control plane. + k, _, err = c.fetchKey(roomID, f.Epoch) + if err != nil { + // Cannot obtain the key for this epoch. For persisted history this is + // expected and NOT fatal: a member invited at a later epoch reading + // older history (or a kicked peer) simply cannot read those frames. + // Skip this message and keep processing the rest (megolm semantics: + // new members do not read prior history). 
+ return + } + } + pt, err := cs.OpenAEAD(k, f.Nonce, f.Payload, []byte(info.Subject)) + if err != nil { + return // cannot decrypt (wrong epoch/kicked): skip this frame, continue + } + plaintext = pt + } + handler(f, plaintext) } // ---- request/reply (cleartext v1) ---------------------------------------- @@ -570,6 +657,12 @@ func (c *Client) PublishMedia(roomID string, data []byte) error { if err != nil { return fmt.Errorf("client: marshal media frame: %w", err) } + // Persisted rooms route the media frame (the BlobRef envelope, not the blob + // bytes) through JetStream so it appears in history/replay like any other + // frame; ephemeral rooms keep the original core-NATS publish. + if info.Policy.Persist { + return c.publishPersistent(roomID, info.Subject, b) + } return c.nc.Publish(info.Subject, b) } diff --git a/pkg/client/persist.go b/pkg/client/persist.go new file mode 100644 index 0000000..5b537c4 --- /dev/null +++ b/pkg/client/persist.go @@ -0,0 +1,134 @@ +// This file adds optional per-room durable persistence on top of the core +// (ephemeral) NATS data plane. Persistence is gated entirely by the per-room +// RoomPolicy.Persist flag: +// +// - Persist == false (room.ModeNATS): nothing here runs. Publish/Subscribe +// use plain core NATS exactly as before — fast, fan-out, no history. +// - Persist == true (room.ModeMatrix): the room's subject is captured by a +// durable JetStream stream. Late joiners receive the full backlog +// (scrollback) and a peer that was offline resumes from its last ack +// (replay). The Frame, the encryption and the signature are identical to +// the ephemeral path; only the transport changes. +// +// We use the modern github.com/nats-io/nats.go/jetstream API throughout (not +// the legacy nc.JetStream() context), and we are consistent about it across +// the whole client. 
// streamName returns the JetStream stream name used for a room: the sanitized
// room id prefixed with "UNIBUS_" to namespace this bus's streams.
func streamName(roomID string) string {
	return "UNIBUS_" + sanitizeJSName(roomID)
}

// sanitizeJSName maps every rune outside [A-Za-z0-9_] to '_' and leaves the
// rest untouched, yielding a token usable as a JetStream stream or consumer
// name. One input rune always produces exactly one output rune. The mapping is
// lossy: distinct inputs such as "a.b" and "a-b" produce the same name.
func sanitizeJSName(s string) string {
	keepOrUnderscore := func(r rune) rune {
		isLower := 'a' <= r && r <= 'z'
		isUpper := 'A' <= r && r <= 'Z'
		isDigit := '0' <= r && r <= '9'
		if isLower || isUpper || isDigit || r == '_' {
			return r
		}
		return '_'
	}
	return strings.Map(keepOrUnderscore, s)
}

// durableName returns the per-peer durable consumer name for a room: the
// sanitized room id and the sanitized endpoint id joined by '_'. The result is
// deterministic, so the same (roomID, endpointID) pair always names the same
// durable consumer.
func durableName(roomID, endpointID string) string {
	room := sanitizeJSName(roomID)
	peer := sanitizeJSName(endpointID)
	return room + "_" + peer
}
+func (c *Client) ensureStream(ctx context.Context, roomID, subject string) error { + if c.js == nil { + return errors.New("client: JetStream not initialized") + } + cfg := jetstream.StreamConfig{ + Name: streamName(roomID), + Subjects: []string{subject}, + Retention: jetstream.LimitsPolicy, + Storage: jetstream.FileStorage, + } + if _, err := c.js.CreateOrUpdateStream(ctx, cfg); err != nil { + return fmt.Errorf("client: ensure stream for room %s: %w", roomID, err) + } + return nil +} + +// publishPersistent publishes a frame to a persisted room via JetStream and +// waits for the server ack, guaranteeing the message is durably stored before +// returning. The stream is ensured first so the very first publish to a freshly +// created room (or after a restart) cannot be lost for lack of a stream. +func (c *Client) publishPersistent(roomID, subject string, frameBytes []byte) error { + if c.js == nil { + return errors.New("client: JetStream not initialized") + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.ensureStream(ctx, roomID, subject); err != nil { + return err + } + if _, err := c.js.Publish(ctx, subject, frameBytes); err != nil { + return fmt.Errorf("client: jetstream publish to room %s: %w", roomID, err) + } + return nil +} + +// subscribePersistent binds a per-peer durable consumer to the room's stream +// with DeliverAll + explicit ack. DeliverAll delivers the full backlog +// (scrollback) on first bind and then live messages; the durable name makes a +// reconnecting peer resume from its last ack (offline replay). Each delivered +// message is acked only after the supplied onMsg callback has processed it. 
+func (c *Client) subscribePersistent(roomID, subject string, onMsg func(data []byte)) (jetstream.ConsumeContext, error) { + if c.js == nil { + return nil, errors.New("client: JetStream not initialized") + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := c.ensureStream(ctx, roomID, subject); err != nil { + return nil, err + } + stream, err := c.js.Stream(ctx, streamName(roomID)) + if err != nil { + return nil, fmt.Errorf("client: lookup stream for room %s: %w", roomID, err) + } + cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: durableName(roomID, c.endpoint), + DeliverPolicy: jetstream.DeliverAllPolicy, + AckPolicy: jetstream.AckExplicitPolicy, + }) + if err != nil { + return nil, fmt.Errorf("client: ensure durable consumer for room %s: %w", roomID, err) + } + cc, err := cons.Consume(func(msg jetstream.Msg) { + onMsg(msg.Data()) + _ = msg.Ack() + }) + if err != nil { + return nil, fmt.Errorf("client: consume room %s: %w", roomID, err) + } + return cc, nil +} diff --git a/pkg/client/persist_test.go b/pkg/client/persist_test.go new file mode 100644 index 0000000..6534bf9 --- /dev/null +++ b/pkg/client/persist_test.go @@ -0,0 +1,296 @@ +package client_test + +import ( + "sync" + "testing" + "time" + + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/room" +) + +// modePersistPlain is a persisted-but-unencrypted policy: durable JetStream +// history, cleartext payloads, no signatures. It exercises the persistence path +// in isolation from the crypto path. +var modePersistPlain = room.Policy{Encrypt: false, Persist: true, SignMsgs: false} + +// containsAll reports whether every want string appears in got. 
+func containsAll(got, want []string) bool { + for _, w := range want { + found := false + for _, g := range got { + if g == w { + found = true + break + } + } + if !found { + return false + } + } + return true +} + +// containsAny reports whether any want string appears in got. +func containsAny(got, want []string) bool { + for _, w := range want { + for _, g := range got { + if g == w { + return true + } + } + } + return false +} + +// TestPersistScrollback: a persisted room delivers prior messages (history) to a +// peer that subscribes AFTER those messages were published. A publishes 3 before +// B subscribes; B must receive all 3. +func TestPersistScrollback(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect B: %v", err) + } + defer b.Close() + + roomID, err := a.CreateRoom("room.persist", modePersistPlain) + if err != nil { + t.Fatalf("A create persisted room: %v", err) + } + + want := []string{"hist-1", "hist-2", "hist-3"} + for _, m := range want { + if err := a.Publish(roomID, []byte(m)); err != nil { + t.Fatalf("A publish %q: %v", m, err) + } + } + + // B subscribes only AFTER the 3 messages were published. + col := subscribeCollect(t, b, roomID) + defer col.sub.Unsubscribe() + + if !waitFor(&col.mu, &col.msgs, func(rs []string) bool { + return containsAll(rs, want) + }, 5*time.Second) { + t.Fatalf("B did not receive full scrollback; want %v got %v", want, snapshot(&col.mu, &col.msgs)) + } +} + +// TestEphemeralNoScrollback: an ephemeral (Persist=false) room delivers NO prior +// messages to a late subscriber. A publishes 3; B subscribes after and must see +// none of them. This guards that the Persist=false path stays plain core NATS. 
+func TestEphemeralNoScrollback(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect B: %v", err) + } + defer b.Close() + + roomID, err := a.CreateRoom("proc.ephemeral.ticks", room.ModeNATS) + if err != nil { + t.Fatalf("A create ephemeral room: %v", err) + } + + prior := []string{"eph-1", "eph-2", "eph-3"} + for _, m := range prior { + if err := a.Publish(roomID, []byte(m)); err != nil { + t.Fatalf("A publish %q: %v", m, err) + } + } + // Let any (incorrect) delivery settle. + time.Sleep(200 * time.Millisecond) + + col := subscribeCollect(t, b, roomID) + defer col.sub.Unsubscribe() + time.Sleep(150 * time.Millisecond) + + // A live message AFTER subscribe proves the subscription works at all. + const live = "eph-live" + if err := a.Publish(roomID, []byte(live)); err != nil { + t.Fatalf("A publish live: %v", err) + } + if !waitFor(&col.mu, &col.msgs, func(rs []string) bool { + return containsAny(rs, []string{live}) + }, 2*time.Second) { + t.Fatalf("B did not receive live message on ephemeral room; got %v", snapshot(&col.mu, &col.msgs)) + } + + // None of the prior messages must have been replayed. + if got := snapshot(&col.mu, &col.msgs); containsAny(got, prior) { + t.Fatalf("ephemeral room replayed history (should be impossible); got %v", got) + } +} + +// TestOfflineReplay: a peer that goes offline (Unsubscribe) and reconnects with +// the same durable resumes from its last ack — it receives messages published +// while it was offline. B subscribes, unsubscribes, A publishes 2, B re-subscribes +// with the SAME identity (same durable) and must receive both missed messages. 
+func TestOfflineReplay(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + + bID := mustIdentity(t) + b, err := client.New(h.natsURL, h.ctrlURL, bID) + if err != nil { + t.Fatalf("connect B: %v", err) + } + defer b.Close() + + roomID, err := a.CreateRoom("room.replay", modePersistPlain) + if err != nil { + t.Fatalf("A create persisted room: %v", err) + } + + // B comes online once (establishes its durable), receives the first message, + // then goes offline. + col1 := subscribeCollect(t, b, roomID) + const first = "before-offline" + if err := a.Publish(roomID, []byte(first)); err != nil { + t.Fatalf("A publish first: %v", err) + } + if !waitFor(&col1.mu, &col1.msgs, func(rs []string) bool { + return containsAny(rs, []string{first}) + }, 5*time.Second) { + t.Fatalf("B did not receive first message before going offline; got %v", snapshot(&col1.mu, &col1.msgs)) + } + // B goes offline: stop local delivery but keep the durable's ack position. + if err := col1.sub.Unsubscribe(); err != nil { + t.Fatalf("B unsubscribe: %v", err) + } + time.Sleep(150 * time.Millisecond) + + // While B is offline, A publishes 2 more. + missed := []string{"missed-1", "missed-2"} + for _, m := range missed { + if err := a.Publish(roomID, []byte(m)); err != nil { + t.Fatalf("A publish %q: %v", m, err) + } + } + + // B reconnects with the SAME identity → same durable name → resumes from its + // last ack and replays exactly the two messages it missed. + col2 := subscribeCollect(t, b, roomID) + defer col2.sub.Unsubscribe() + if !waitFor(&col2.mu, &col2.msgs, func(rs []string) bool { + return containsAll(rs, missed) + }, 5*time.Second) { + t.Fatalf("B did not replay missed messages on reconnect; want %v got %v", missed, snapshot(&col2.mu, &col2.msgs)) + } +} + +// TestPersistEncryptedNoPanic: a persisted + encrypted room with an epoch change. 
+// A publishes at epoch 1, kicks B (rotating to epoch 2), publishes at epoch 2. +// A freshly invited peer C (which only holds epoch-2 K) reads the full history: +// it must NOT panic on the epoch-1 frame it cannot decrypt, and must only surface +// the epoch-2 message it can decrypt (megolm: new members do not read prior +// history). This exercises both the OpenAEAD-fails-skip path and empty/foreign +// frames during scrollback without crashing. +func TestPersistEncryptedNoPanic(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect B: %v", err) + } + defer b.Close() + c, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect C: %v", err) + } + defer c.Close() + + // ModeMatrix is encrypted + persisted + signed. + roomID, err := a.CreateRoom("room.secret", room.ModeMatrix) + if err != nil { + t.Fatalf("A create encrypted persisted room: %v", err) + } + if err := a.Invite(roomID, b.Endpoint()); err != nil { + t.Fatalf("A invite B: %v", err) + } + if err := b.Join(roomID); err != nil { + t.Fatalf("B join: %v", err) + } + + const epoch1Msg = "epoch1-secret" + if err := a.Publish(roomID, []byte(epoch1Msg)); err != nil { + t.Fatalf("A publish epoch1: %v", err) + } + + // Kick B → key rotates to epoch 2. + if err := a.Kick(roomID, b.Endpoint().ID); err != nil { + t.Fatalf("A kick B: %v", err) + } + time.Sleep(150 * time.Millisecond) + + const epoch2Msg = "epoch2-secret" + if err := a.Publish(roomID, []byte(epoch2Msg)); err != nil { + t.Fatalf("A publish epoch2: %v", err) + } + + // C is invited only at the current (epoch 2) key, so it CANNOT decrypt the + // epoch-1 history frame but CAN decrypt the epoch-2 one. 
+ if err := a.Invite(roomID, c.Endpoint()); err != nil { + t.Fatalf("A invite C: %v", err) + } + if err := c.Join(roomID); err != nil { + t.Fatalf("C join: %v", err) + } + + var mu sync.Mutex + var seen []string + // The handler must never be invoked with undecryptable garbage, and the + // subscribe path must never panic while scrolling through history that mixes + // epoch-1 (foreign) and epoch-2 (readable) frames. + sub, err := c.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { + mu.Lock() + seen = append(seen, string(plaintext)) + mu.Unlock() + }) + if err != nil { + t.Fatalf("C subscribe: %v", err) + } + defer sub.Unsubscribe() + + // C should surface the epoch-2 message from history (it holds K2). + if !waitFor(&mu, &seen, func(rs []string) bool { + return containsAny(rs, []string{epoch2Msg}) + }, 5*time.Second) { + t.Fatalf("C did not decrypt epoch-2 history message; got %v", snapshot(&mu, &seen)) + } + + // Give scrollback time to fully drain, then assert C never saw the epoch-1 + // secret (it lacks K1) and that no crash occurred (we got here). + time.Sleep(500 * time.Millisecond) + if got := snapshot(&mu, &seen); containsAny(got, []string{epoch1Msg}) { + t.Fatalf("forward secrecy broken in history: C read epoch-1 secret without K1; got %v", got) + } +} diff --git a/playground/index.html b/playground/index.html index 9660a93..945c236 100644 --- a/playground/index.html +++ b/playground/index.html @@ -166,6 +166,13 @@ +