Files
unibus/pkg/client/persist_test.go

297 lines
9.1 KiB
Go

package client_test
import (
"sync"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
)
// modePersistPlain is the persistence-only room policy used by these tests:
// messages are kept as durable (JetStream-backed) history, but they travel in
// cleartext and unsigned. Keeping encryption and signing switched off lets the
// tests probe the persistence path on its own, separate from the crypto path.
var modePersistPlain = room.Policy{
	Persist:  true,
	Encrypt:  false,
	SignMsgs: false,
}
// containsAll reports whether every element of want occurs somewhere in got.
// An empty (or nil) want is trivially satisfied and yields true.
func containsAll(got, want []string) bool {
nextWanted:
	for _, needle := range want {
		for _, candidate := range got {
			if candidate == needle {
				continue nextWanted
			}
		}
		// needle was not found anywhere in got.
		return false
	}
	return true
}
// containsAny reports whether got and want share at least one element.
// It returns false whenever either slice is empty.
func containsAny(got, want []string) bool {
	present := make(map[string]struct{}, len(got))
	for _, g := range got {
		present[g] = struct{}{}
	}
	for _, w := range want {
		if _, hit := present[w]; hit {
			return true
		}
	}
	return false
}
// TestPersistScrollback verifies that a persisted room replays its history to a
// late subscriber: A publishes three messages before B subscribes, and B must
// still end up receiving all three.
func TestPersistScrollback(t *testing.T) {
	env := newHarness(t)
	waitHealth(t, env.ctrlURL)

	alice, err := client.New(env.natsURL, env.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer alice.Close()

	bob, err := client.New(env.natsURL, env.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect B: %v", err)
	}
	defer bob.Close()

	roomID, err := alice.CreateRoom("room.persist", modePersistPlain)
	if err != nil {
		t.Fatalf("A create persisted room: %v", err)
	}

	history := []string{"hist-1", "hist-2", "hist-3"}
	for i := range history {
		if err := alice.Publish(roomID, []byte(history[i])); err != nil {
			t.Fatalf("A publish %q: %v", history[i], err)
		}
	}

	// Only now does B subscribe, so anything it sees must come from stored history.
	collector := subscribeCollect(t, bob, roomID)
	defer collector.sub.Unsubscribe()

	hasAllHistory := func(received []string) bool { return containsAll(received, history) }
	if ok := waitFor(&collector.mu, &collector.msgs, hasAllHistory, 5*time.Second); !ok {
		t.Fatalf("B did not receive full scrollback; want %v got %v", history, snapshot(&collector.mu, &collector.msgs))
	}
}
// TestEphemeralNoScrollback verifies that an ephemeral (Persist=false) room
// offers no history. B subscribes after A has already published three
// messages and must never see them. A message published after B subscribes
// must still arrive, which proves the subscription itself works. This guards
// that the Persist=false path stays plain core NATS.
func TestEphemeralNoScrollback(t *testing.T) {
	env := newHarness(t)
	waitHealth(t, env.ctrlURL)

	alice, err := client.New(env.natsURL, env.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer alice.Close()

	bob, err := client.New(env.natsURL, env.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect B: %v", err)
	}
	defer bob.Close()

	roomID, err := alice.CreateRoom("proc.ephemeral.ticks", room.ModeNATS)
	if err != nil {
		t.Fatalf("A create ephemeral room: %v", err)
	}

	earlier := []string{"eph-1", "eph-2", "eph-3"}
	for _, payload := range earlier {
		if err := alice.Publish(roomID, []byte(payload)); err != nil {
			t.Fatalf("A publish %q: %v", payload, err)
		}
	}
	// Pause before B subscribes. Presumably this gives the early publishes time
	// to clear the server, so one still in flight cannot reach B live and look
	// like replay.
	time.Sleep(200 * time.Millisecond)

	collector := subscribeCollect(t, bob, roomID)
	defer collector.sub.Unsubscribe()
	// Presumably gives the new subscription time to become active server-side.
	time.Sleep(150 * time.Millisecond)

	const live = "eph-live"
	if err := alice.Publish(roomID, []byte(live)); err != nil {
		t.Fatalf("A publish live: %v", err)
	}
	sawLive := func(received []string) bool { return containsAny(received, []string{live}) }
	if !waitFor(&collector.mu, &collector.msgs, sawLive, 2*time.Second) {
		t.Fatalf("B did not receive live message on ephemeral room; got %v", snapshot(&collector.mu, &collector.msgs))
	}

	// None of the pre-subscribe messages may have been delivered.
	got := snapshot(&collector.mu, &collector.msgs)
	if containsAny(got, earlier) {
		t.Fatalf("ephemeral room replayed history (should be impossible); got %v", got)
	}
}
// TestOfflineReplay verifies that a persisted room catches a returning
// subscriber up on what it missed:
//   - B subscribes and receives one message, then unsubscribes.
//   - A publishes two more messages while B is away.
//   - B subscribes again on the same client, and therefore with the same
//     identity and durable. Both missed messages must then be delivered.
//
// The assertion checks that the missed messages arrive. It does not check
// that nothing else (e.g. the already-received first message) is redelivered.
func TestOfflineReplay(t *testing.T) {
	env := newHarness(t)
	waitHealth(t, env.ctrlURL)

	alice, err := client.New(env.natsURL, env.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer alice.Close()

	bobIdentity := mustIdentity(t)
	bob, err := client.New(env.natsURL, env.ctrlURL, bobIdentity)
	if err != nil {
		t.Fatalf("connect B: %v", err)
	}
	defer bob.Close()

	roomID, err := alice.CreateRoom("room.replay", modePersistPlain)
	if err != nil {
		t.Fatalf("A create persisted room: %v", err)
	}

	// Phase 1: B is online, which establishes its durable, and consumes one message.
	online := subscribeCollect(t, bob, roomID)
	const first = "before-offline"
	if err := alice.Publish(roomID, []byte(first)); err != nil {
		t.Fatalf("A publish first: %v", err)
	}
	sawFirst := func(received []string) bool { return containsAny(received, []string{first}) }
	if !waitFor(&online.mu, &online.msgs, sawFirst, 5*time.Second) {
		t.Fatalf("B did not receive first message before going offline; got %v", snapshot(&online.mu, &online.msgs))
	}

	// Phase 2: B goes offline. Unsubscribe stops local delivery, and the durable's
	// ack position is expected to survive server-side.
	if err := online.sub.Unsubscribe(); err != nil {
		t.Fatalf("B unsubscribe: %v", err)
	}
	time.Sleep(150 * time.Millisecond)

	missed := []string{"missed-1", "missed-2"}
	for _, payload := range missed {
		if err := alice.Publish(roomID, []byte(payload)); err != nil {
			t.Fatalf("A publish %q: %v", payload, err)
		}
	}

	// Phase 3: B resubscribes. The durable should resume after its last ack and
	// deliver what was published in the meantime.
	returned := subscribeCollect(t, bob, roomID)
	defer returned.sub.Unsubscribe()
	gotMissed := func(received []string) bool { return containsAll(received, missed) }
	if !waitFor(&returned.mu, &returned.msgs, gotMissed, 5*time.Second) {
		t.Fatalf("B did not replay missed messages on reconnect; want %v got %v", missed, snapshot(&returned.mu, &returned.msgs))
	}
}
// TestPersistEncryptedNoPanic covers scrollback in a persisted, encrypted room
// whose history spans a key rotation:
//   - A publishes under epoch 1.
//   - A kicks B, which rotates the room key to epoch 2.
//   - A publishes under epoch 2.
//
// C is invited only after the rotation, so it holds the epoch-2 key only.
// While reading the full history, C must not panic on the epoch-1 frame it
// cannot open (the decrypt-failure skip path, OpenAEAD in the original notes).
// C must surface the epoch-2 message, and it must never surface the epoch-1
// plaintext (megolm: new members do not read prior history).
func TestPersistEncryptedNoPanic(t *testing.T) {
	env := newHarness(t)
	waitHealth(t, env.ctrlURL)

	alice, err := client.New(env.natsURL, env.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer alice.Close()

	bob, err := client.New(env.natsURL, env.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect B: %v", err)
	}
	defer bob.Close()

	carol, err := client.New(env.natsURL, env.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect C: %v", err)
	}
	defer carol.Close()

	// ModeMatrix is encrypted + persisted + signed.
	roomID, err := alice.CreateRoom("room.secret", room.ModeMatrix)
	if err != nil {
		t.Fatalf("A create encrypted persisted room: %v", err)
	}
	if err := alice.Invite(roomID, bob.Endpoint()); err != nil {
		t.Fatalf("A invite B: %v", err)
	}
	if err := bob.Join(roomID); err != nil {
		t.Fatalf("B join: %v", err)
	}

	// Epoch 1: publish, then kick B so the key rotates to epoch 2.
	const epoch1Msg = "epoch1-secret"
	if err := alice.Publish(roomID, []byte(epoch1Msg)); err != nil {
		t.Fatalf("A publish epoch1: %v", err)
	}
	if err := alice.Kick(roomID, bob.Endpoint().ID); err != nil {
		t.Fatalf("A kick B: %v", err)
	}
	time.Sleep(150 * time.Millisecond)

	// Epoch 2.
	const epoch2Msg = "epoch2-secret"
	if err := alice.Publish(roomID, []byte(epoch2Msg)); err != nil {
		t.Fatalf("A publish epoch2: %v", err)
	}

	// C joins only after the rotation, so it can open the epoch-2 frame but not
	// the epoch-1 one.
	if err := alice.Invite(roomID, carol.Endpoint()); err != nil {
		t.Fatalf("A invite C: %v", err)
	}
	if err := carol.Join(roomID); err != nil {
		t.Fatalf("C join: %v", err)
	}

	var (
		mu   sync.Mutex
		seen []string
	)
	// record stores every plaintext the subscription hands to C. It runs
	// concurrently with the test goroutine, hence the mutex.
	record := func(_ frame.Frame, plaintext []byte) {
		mu.Lock()
		defer mu.Unlock()
		seen = append(seen, string(plaintext))
	}
	sub, err := carol.Subscribe(roomID, record)
	if err != nil {
		t.Fatalf("C subscribe: %v", err)
	}
	defer sub.Unsubscribe()

	// The epoch-2 message is readable with K2 and must show up from history.
	sawEpoch2 := func(received []string) bool { return containsAny(received, []string{epoch2Msg}) }
	if !waitFor(&mu, &seen, sawEpoch2, 5*time.Second) {
		t.Fatalf("C did not decrypt epoch-2 history message; got %v", snapshot(&mu, &seen))
	}

	// Let the rest of the scrollback drain, then confirm the epoch-1 secret never
	// leaked. Reaching this point also means nothing panicked.
	time.Sleep(500 * time.Millisecond)
	got := snapshot(&mu, &seen)
	if containsAny(got, []string{epoch1Msg}) {
		t.Fatalf("forward secrecy broken in history: C read epoch-1 secret without K1; got %v", got)
	}
}