Files
unibus/pkg/client/client_test.go
T
agent 6b3ace1d39 feat(0003b): membership.Store interface + JetStream KV implementation
Branch-by-abstraction for the control-plane store (issue 0003b), so the
membership state can move off process-local SQLite onto replicated
JetStream KV without rewriting callers and without breaking master.

pkg/membership:
- Store is now an interface (rooms/members/keys + user allowlist +
  Close). The existing SQLite implementation is renamed sqliteStore and
  stays the default: Open(path) still returns it. openSQLite keeps the
  concrete type for internal callers (the 0003c migration).
- ErrNotFound is a storage-agnostic "no such record" sentinel; both
  backends return it (the SQLite store maps sql.ErrNoRows to it). The
  control plane now branches on ErrNotFound instead of sql.ErrNoRows, so
  server.go no longer imports database/sql.
- jetstreamStore (new) implements Store over five replicated KV buckets:
  rooms, members, rooms_by_member (reverse index for ListRoomsForEndpoint),
  room_keys, users. Replication factor is configurable (R1..R5) for the
  R1->R3 rollout. Every read is bounded by OpTimeout and IsAuthorized /
  HasAdmin FAIL CLOSED on any backend error (a KV quorum loss denies,
  never admits), per the audit's requirement for the decentralized store.

dev/feature_flags.json:
- Add the `decentralized` flag (OFF): sqliteStore default while off,
  jetstreamStore behind it. The membershipd boot wiring that selects the
  KV store is deliberately deferred to 0003e/0003f (the embedded-NATS
  authenticator<->store bootstrap is part of the session/deploy redesign);
  OFF keeps the single-node SQLite control plane unchanged.

Tests (DoD: golden + edges + error path):
- TestJetStreamStoreRoomsCRUD: encrypted room + owner + invited member
  round-trip through every room/member/key method, including latest-epoch
  resolution and rekey.
- TestJetStreamStoreUsers: add/get/authorize/list/revoke + admin gate,
  with case-insensitive key normalization and duplicate rejection.
- TestJetStreamStoreNotFound: ErrNotFound mapping for misses.
- TestJetStreamStoreIsAuthorizedFailClosed: NATS backend shut down ->
  IsAuthorized and HasAdmin both DENY within the bounded timeout.

The full existing suite stays green: the behavior of sqliteStore is unchanged.
2026-06-07 15:04:52 +02:00

636 lines
19 KiB
Go

package client_test
import (
"crypto/tls"
"encoding/hex"
"net"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"sync"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
server "github.com/nats-io/nats-server/v2/server"
)
// testHarness holds the pieces of one in-process test bus: an embedded NATS
// server, a membershipd HTTP test server, and the URLs clients use to reach
// them. It is built by bootHarness, which also registers the teardown with
// t.Cleanup, so the struct itself carries no cleanup func.
type testHarness struct {
// natsURL is the client URL of the embedded NATS server.
natsURL string
// ctrlURL is the base URL of the membershipd httptest server.
ctrlURL string
// ns is the embedded NATS server instance.
ns *server.Server
// httpts is the httptest server that serves srv.
httpts *httptest.Server
// store is the membership store shared by the HTTP server and, when NATS
// auth is enabled, by the nkey authenticator.
store membership.Store
// srv is the membershipd HTTP handler wrapped by httpts.
srv *membership.Server
}
// freePort binds a TCP listener to port 0 on 127.0.0.1, reads back the port
// number that was assigned, and closes the listener before returning. The test
// is failed immediately if the listener cannot be opened.
func freePort(t *testing.T) int {
	t.Helper()
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("free port: %v", err)
	}
	defer ln.Close()
	addr := ln.Addr().(*net.TCPAddr)
	return addr.Port
}
// newHarness boots the default harness: control-plane auth is
// membership.AuthOff and the embedded NATS gets no nkey authenticator.
func newHarness(t *testing.T) *testHarness {
	return newHarnessFull(t, membership.AuthOff, false)
}
// newHarnessMode boots a harness whose control plane runs in the given auth
// mode while the NATS data plane stays open (no nkey authenticator). That lets
// HTTP-auth tests connect with a plain client.New, which presents no nkey.
func newHarnessMode(t *testing.T, mode membership.AuthMode) *testHarness {
	const natsAuth = false
	return newHarnessFull(t, mode, natsAuth)
}
// newHarnessFull boots a harness with independent switches for the two planes:
// ctrlMode selects the membershipd HTTP auth mode and natsAuth decides whether
// the embedded NATS is started with the nkey authenticator. Keeping them
// separate lets an HTTP-enforce test leave NATS open and an nkey test leave
// HTTP off. NATS is started without TLS.
func newHarnessFull(t *testing.T, ctrlMode membership.AuthMode, natsAuth bool) *testHarness {
	var noTLS *tls.Config
	return bootHarness(t, ctrlMode, natsAuth, noTLS)
}
// bootHarness is the common constructor behind the newHarness* helpers. Inside
// a fresh t.TempDir it opens the membership store, starts the embedded NATS
// server on a free loopback port (with natsTLS, and with the nkey
// authenticator when natsAuth is true), creates the blob store, and serves the
// membership server over httptest in ctrlMode.
//
// The store is opened before NATS so that the authenticator can be handed
// store.IsAuthorized. Any setup failure releases what was already started and
// fails the test. On success, teardown is registered with t.Cleanup.
func bootHarness(t *testing.T, ctrlMode membership.AuthMode, natsAuth bool, natsTLS *tls.Config) *testHarness {
	t.Helper()
	root := t.TempDir()

	store, err := membership.Open(filepath.Join(root, "unibus.db"))
	if err != nil {
		t.Fatalf("membership store: %v", err)
	}

	natsCfg := embeddednats.ServerConfig{
		StoreDir: filepath.Join(root, "js"),
		Host:     "127.0.0.1",
		Port:     freePort(t),
		TLS:      natsTLS,
	}
	if natsAuth {
		natsCfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
	}
	ns, err := embeddednats.StartServer(natsCfg)
	if err != nil {
		store.Close()
		t.Fatalf("embedded nats: %v", err)
	}

	blobs, err := blobstore.New(filepath.Join(root, "blobs"))
	if err != nil {
		ns.Shutdown()
		store.Close()
		t.Fatalf("blob store: %v", err)
	}

	srv := membership.NewServer(store, blobs, ctrlMode)
	httpts := httptest.NewServer(srv)
	harness := &testHarness{
		natsURL: embeddednats.ClientURL(ns),
		ctrlURL: httpts.URL,
		ns:      ns,
		httpts:  httpts,
		store:   store,
		srv:     srv,
	}

	// Teardown order: HTTP front end, then the store, then NATS.
	t.Cleanup(func() {
		httpts.Close()
		store.Close()
		ns.Shutdown()
		ns.WaitForShutdown()
	})
	return harness
}
// registerClient puts c's signing public key (hex-encoded) on the harness
// store's user allowlist under the given handle and role, failing the test if
// the store rejects it. Tests call it so a peer's signed control-plane
// requests are accepted under enforce.
func registerClient(t *testing.T, h *testHarness, c *client.Client, handle, role string) {
	t.Helper()
	pubHex := hex.EncodeToString(c.Endpoint().SignPub)
	err := h.store.AddUser(pubHex, handle, role)
	if err != nil {
		t.Fatalf("register %s: %v", handle, err)
	}
}
// waitHealth polls ctrlURL + "/healthz" every 50ms until it answers 200 OK,
// and fails the test if that has not happened within 3 seconds.
//
// Each probe uses a client whose Timeout is capped to the time left in the
// budget. http.Get goes through http.DefaultClient, which has no timeout, so a
// server that accepts the connection but never responds would otherwise block
// the test well past the 3-second deadline.
func waitHealth(t *testing.T, ctrlURL string) {
	t.Helper()
	const (
		budget       = 3 * time.Second
		pollInterval = 50 * time.Millisecond
	)
	deadline := time.Now().Add(budget)
	probe := &http.Client{}
	for {
		remaining := time.Until(deadline)
		if remaining <= 0 {
			break
		}
		// A non-positive Timeout means "no timeout", hence the guard above.
		probe.Timeout = remaining
		resp, err := probe.Get(ctrlURL + "/healthz")
		if err == nil {
			healthy := resp.StatusCode == http.StatusOK
			resp.Body.Close()
			if healthy {
				return
			}
		} else if resp != nil {
			resp.Body.Close()
		}
		time.Sleep(pollInterval)
	}
	t.Fatalf("membershipd never became healthy")
}
// mustIdentity returns a freshly generated cs.Identity, failing the test if
// generation returns an error.
func mustIdentity(t *testing.T) cs.Identity {
	t.Helper()
	identity, genErr := cs.GenerateIdentity()
	if genErr != nil {
		t.Fatalf("generate identity: %v", genErr)
	}
	return identity
}
// TestE2EEncryptedForwardSecrecy is the headline end-to-end scenario. A creates
// a ModeMatrix room and invites B; B joins, subscribes, and must decrypt A's
// first message. A then kicks B and publishes again: B must never observe that
// second plaintext. Finally A subscribes to its own room and must still be
// able to read a message it publishes after the kick.
func TestE2EEncryptedForwardSecrecy(t *testing.T) {
	const (
		preKick  = "hola E2E"
		postKick = "secreto post-kick"
		ownerMsg = "owner-still-works"
	)
	// has builds a predicate reporting whether want is among the seen messages.
	has := func(want string) func([]string) bool {
		return func(seen []string) bool {
			for i := range seen {
				if seen[i] == want {
					return true
				}
			}
			return false
		}
	}

	h := newHarness(t)
	waitHealth(t, h.ctrlURL)

	a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer a.Close()
	b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect B: %v", err)
	}
	defer b.Close()

	roomID, err := a.CreateRoom("room.test", room.ModeMatrix)
	if err != nil {
		t.Fatalf("A create room: %v", err)
	}
	if err := a.Invite(roomID, b.Endpoint()); err != nil {
		t.Fatalf("A invite B: %v", err)
	}
	if err := b.Join(roomID); err != nil {
		t.Fatalf("B join: %v", err)
	}

	// Everything B manages to decrypt lands in inbox, guarded by inboxMu.
	var (
		inboxMu sync.Mutex
		inbox   []string
	)
	bSub, err := b.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
		inboxMu.Lock()
		inbox = append(inbox, string(plaintext))
		inboxMu.Unlock()
	})
	if err != nil {
		t.Fatalf("B subscribe: %v", err)
	}
	defer bSub.Unsubscribe()
	time.Sleep(150 * time.Millisecond)

	// Before the kick: B must receive and decrypt the message.
	if err := a.Publish(roomID, []byte(preKick)); err != nil {
		t.Fatalf("A publish msg1: %v", err)
	}
	if !waitFor(&inboxMu, &inbox, has(preKick), 2*time.Second) {
		t.Fatalf("B did not decrypt pre-kick message %q; got %v", preKick, snapshot(&inboxMu, &inbox))
	}

	// A kicks B (rotates K to a new epoch, re-sealed only for the owner).
	if err := a.Kick(roomID, b.Endpoint().ID); err != nil {
		t.Fatalf("A kick B: %v", err)
	}
	time.Sleep(150 * time.Millisecond)
	if err := a.Publish(roomID, []byte(postKick)); err != nil {
		t.Fatalf("A publish msg2: %v", err)
	}

	// Leave B a full second to (fail to) decrypt, then assert it saw nothing.
	time.Sleep(1 * time.Second)
	if has(postKick)(snapshot(&inboxMu, &inbox)) {
		t.Fatalf("forward secrecy broken: B decrypted post-kick message %q", postKick)
	}

	// Sanity: the owner can still decrypt at the new epoch, checked through a
	// fresh subscriber on A's own client.
	ownerInbox := subscribeCollect(t, a, roomID)
	defer ownerInbox.sub.Unsubscribe()
	time.Sleep(100 * time.Millisecond)
	if err := a.Publish(roomID, []byte(ownerMsg)); err != nil {
		t.Fatalf("A publish msg3: %v", err)
	}
	if !waitFor(&ownerInbox.mu, &ownerInbox.msgs, has(ownerMsg), 2*time.Second) {
		t.Fatalf("owner could not decrypt own message at new epoch; got %v", snapshot(&ownerInbox.mu, &ownerInbox.msgs))
	}
}
// TestCleartextWorkerToChat covers the ModeNATS path: two peers each create a
// room on the same subject, one publishes, and the other must receive the
// payload exactly as it was sent.
func TestCleartextWorkerToChat(t *testing.T) {
	const (
		subject = "proc.test.ticks"
		tick    = "tick 1"
	)

	h := newHarness(t)
	waitHealth(t, h.ctrlURL)

	publisher, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect pub: %v", err)
	}
	defer publisher.Close()
	subscriber, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect sub: %v", err)
	}
	defer subscriber.Close()

	// Each peer owns a room mapped to the shared subject; NATS fans out by subject.
	pubRoom, err := publisher.CreateRoom(subject, room.ModeNATS)
	if err != nil {
		t.Fatalf("pub create room: %v", err)
	}
	subRoom, err := subscriber.CreateRoom(subject, room.ModeNATS)
	if err != nil {
		t.Fatalf("sub create room: %v", err)
	}

	inbox := subscribeCollect(t, subscriber, subRoom)
	defer inbox.sub.Unsubscribe()
	time.Sleep(150 * time.Millisecond)

	if err := publisher.Publish(pubRoom, []byte(tick)); err != nil {
		t.Fatalf("publish: %v", err)
	}

	sawTick := func(seen []string) bool {
		for i := range seen {
			if seen[i] == tick {
				return true
			}
		}
		return false
	}
	if !waitFor(&inbox.mu, &inbox.msgs, sawTick, 2*time.Second) {
		t.Fatalf("subscriber did not receive cleartext message; got %v", snapshot(&inbox.mu, &inbox.msgs))
	}
}
// TestMediaBlobRoundTrip validates encrypted media via the object store: A
// publishes a media payload into a ModeMatrix room, and invited member B must
// get back the identical bytes by calling FetchMedia on the frame it received.
//
// The subscription callback never blocks: both result channels have a buffer
// of one and are written with a non-blocking send, so an extra delivery cannot
// wedge the delivery goroutine. The first FetchMedia error is kept and
// included in the timeout failure, so a fetch/decrypt problem is
// distinguishable from a frame that never arrived.
func TestMediaBlobRoundTrip(t *testing.T) {
	h := newHarness(t)
	waitHealth(t, h.ctrlURL)
	a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer a.Close()
	b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect B: %v", err)
	}
	defer b.Close()
	roomID, err := a.CreateRoom("room.media", room.ModeMatrix)
	if err != nil {
		t.Fatalf("create room: %v", err)
	}
	if err := a.Invite(roomID, b.Endpoint()); err != nil {
		t.Fatalf("invite: %v", err)
	}
	if err := b.Join(roomID); err != nil {
		t.Fatalf("join: %v", err)
	}

	gotBlob := make(chan []byte, 1)
	fetchErr := make(chan error, 1)
	sub, err := b.Subscribe(roomID, func(f frame.Frame, _ []byte) {
		if f.Blob == nil {
			return // not a media frame
		}
		data, err := b.FetchMedia(roomID, f)
		if err != nil {
			// Keep only the first error; never block the callback.
			select {
			case fetchErr <- err:
			default:
			}
			return
		}
		// Keep only the first blob; never block the callback.
		select {
		case gotBlob <- data:
		default:
		}
	})
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	defer sub.Unsubscribe()
	time.Sleep(150 * time.Millisecond)

	payload := []byte("a fake image payload that should be encrypted in the store")
	if err := a.PublishMedia(roomID, payload); err != nil {
		t.Fatalf("publish media: %v", err)
	}

	select {
	case got := <-gotBlob:
		if string(got) != string(payload) {
			t.Fatalf("media mismatch: got %q want %q", got, payload)
		}
	case <-time.After(2 * time.Second):
		// Surface the cause when the callback did run but the fetch failed.
		select {
		case err := <-fetchErr:
			t.Fatalf("B never received/decrypted the media blob: first fetch error: %v", err)
		default:
			t.Fatalf("B never received/decrypted the media blob")
		}
	}
}
// TestThreadedReplyAndReaction drives the threading API end to end in a
// ModeMatrix room with a single peer subscribed to its own room. A publishes a
// root message, replies to it inside a thread, then reacts to it with an
// emoji. The subscriber must see the reply carrying ReplyTo and ThreadID equal
// to the root's MsgID, and a frame.REACT frame whose ReplyTo is the root and
// whose plaintext delivered to the callback is the emoji itself.
func TestThreadedReplyAndReaction(t *testing.T) {
	h := newHarness(t)
	waitHealth(t, h.ctrlURL)

	a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer a.Close()
	roomID, err := a.CreateRoom("room.thread", room.ModeMatrix)
	if err != nil {
		t.Fatalf("create room: %v", err)
	}

	// observed pairs a delivered frame with the plaintext handed to the callback.
	type observed struct {
		f  frame.Frame
		pt string
	}
	var (
		logMu sync.Mutex
		seen  []observed
	)
	sub, err := a.Subscribe(roomID, func(f frame.Frame, pt []byte) {
		logMu.Lock()
		seen = append(seen, observed{f: f, pt: string(pt)})
		logMu.Unlock()
	})
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	defer sub.Unsubscribe()
	time.Sleep(150 * time.Millisecond)

	// lookup scans what has been observed so far for the first match.
	lookup := func(match func(observed) bool) (observed, bool) {
		logMu.Lock()
		defer logMu.Unlock()
		for i := range seen {
			if match(seen[i]) {
				return seen[i], true
			}
		}
		return observed{}, false
	}
	// await polls lookup every 25ms for up to two seconds.
	await := func(match func(observed) bool) (observed, bool) {
		for deadline := time.Now().Add(2 * time.Second); time.Now().Before(deadline); {
			if hit, found := lookup(match); found {
				return hit, true
			}
			time.Sleep(25 * time.Millisecond)
		}
		return observed{}, false
	}

	// 1. Root message.
	if err := a.Publish(roomID, []byte("root")); err != nil {
		t.Fatalf("publish root: %v", err)
	}
	root, ok := await(func(o observed) bool { return o.pt == "root" })
	if !ok {
		t.Fatalf("never observed the root message")
	}
	rootID := root.f.MsgID
	if rootID == "" {
		t.Fatalf("root frame has empty MsgID")
	}

	// 2. Threaded reply to the root.
	if err := a.PublishReply(roomID, []byte("child"), rootID, rootID); err != nil {
		t.Fatalf("publish reply: %v", err)
	}
	reply, ok := await(func(o observed) bool { return o.pt == "child" })
	if !ok {
		t.Fatalf("never observed the threaded reply")
	}
	if !(reply.f.ReplyTo == rootID && reply.f.ThreadID == rootID) {
		t.Fatalf("reply threading lost: ReplyTo=%q ThreadID=%q want %q", reply.f.ReplyTo, reply.f.ThreadID, rootID)
	}

	// 3. Reaction to the root (emoji rides the encrypted payload).
	if err := a.React(roomID, rootID, "👍"); err != nil {
		t.Fatalf("react: %v", err)
	}
	reaction, ok := await(func(o observed) bool { return o.f.Type == frame.REACT })
	if !ok {
		t.Fatalf("never observed the reaction frame")
	}
	if reaction.f.ReplyTo != rootID {
		t.Fatalf("reaction target lost: ReplyTo=%q want %q", reaction.f.ReplyTo, rootID)
	}
	if reaction.pt != "👍" {
		t.Fatalf("reaction payload mismatch: got %q want 👍 (decryption in E2E room)", reaction.pt)
	}
}
// TestListMyRoomsDiscovery verifies room discovery through ListMyRooms. A peer
// that belongs to no room gets an empty list. Once A invites B, B's list
// contains exactly that room with its subject, an encrypting policy, and the
// "member" role, without B having been told the room id. A's own list shows
// one room with the "owner" role.
func TestListMyRoomsDiscovery(t *testing.T) {
	h := newHarness(t)
	waitHealth(t, h.ctrlURL)

	a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer a.Close()
	b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect B: %v", err)
	}
	defer b.Close()

	// B is in no rooms yet.
	initial, err := b.ListMyRooms()
	if !(err == nil && len(initial) == 0) {
		t.Fatalf("B should start in no rooms, got %v err=%v", initial, err)
	}

	roomID, err := a.CreateRoom("room.discovery", room.ModeMatrix)
	if err != nil {
		t.Fatalf("A create room: %v", err)
	}
	if err := a.Invite(roomID, b.Endpoint()); err != nil {
		t.Fatalf("A invite B: %v", err)
	}

	// B discovers the room it was invited to, with its policy, without prior
	// knowledge of the id.
	bRooms, err := b.ListMyRooms()
	if err != nil {
		t.Fatalf("B ListMyRooms: %v", err)
	}
	if !(len(bRooms) == 1 && bRooms[0].RoomID == roomID) {
		t.Fatalf("B should discover exactly room %s, got %+v", roomID, bRooms)
	}
	subjectOK := bRooms[0].Subject == "room.discovery"
	encrypted := bRooms[0].Policy.Encrypt
	isMember := bRooms[0].Role == "member"
	if !(subjectOK && encrypted && isMember) {
		t.Fatalf("discovered room metadata wrong: %+v", bRooms[0])
	}

	// A sees the same room as its owner.
	aRooms, err := a.ListMyRooms()
	if err != nil {
		t.Fatalf("A ListMyRooms: %v", err)
	}
	if !(len(aRooms) == 1 && aRooms[0].Role == "owner") {
		t.Fatalf("A should own exactly one room, got %+v", aRooms)
	}
}
// TestControlPlaneAuthEnforceE2E runs the production client against a control
// plane in membership.AuthEnforce. A registered peer can create a room
// (golden); an unregistered peer's first control-plane call fails with an
// error mentioning 401 or "unauthorized" (error path); and after the
// registered peer is revoked in the store, its next request fails although
// the server was never restarted.
func TestControlPlaneAuthEnforceE2E(t *testing.T) {
	h := newHarnessMode(t, membership.AuthEnforce)
	waitHealth(t, h.ctrlURL)

	a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect A: %v", err)
	}
	defer a.Close()
	registerClient(t, h, a, "alice", membership.RoleAdmin)

	// Golden: registered peer's signed request is accepted.
	_, err = a.CreateRoom("room.enforced", room.ModeNATS)
	if err != nil {
		t.Fatalf("registered peer should create a room under enforce: %v", err)
	}

	// Error path: an unregistered peer is rejected on its first control-plane call.
	b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
	if err != nil {
		t.Fatalf("connect B: %v", err)
	}
	defer b.Close()
	_, err = b.CreateRoom("room.denied", room.ModeNATS)
	if err == nil {
		t.Fatalf("unregistered peer must be rejected under enforce")
	}
	denial := err.Error()
	mentions401 := strings.Contains(denial, "401")
	mentionsUnauthorized := strings.Contains(strings.ToLower(denial), "unauthorized")
	if !(mentions401 || mentionsUnauthorized) {
		t.Fatalf("expected a 401/unauthorized error, got %v", err)
	}

	// Revocation takes effect without restart: revoke A, its next request fails.
	aKey := hex.EncodeToString(a.Endpoint().SignPub)
	if err := h.store.RevokeUser(aKey); err != nil {
		t.Fatalf("revoke A: %v", err)
	}
	_, err = a.CreateRoom("room.after-revoke", room.ModeNATS)
	if err == nil {
		t.Fatalf("revoked peer must be rejected without a server restart")
	}
}
// TestNatsNkeyAuth exercises the data-plane authenticator with NATS nkey auth
// on and HTTP auth off. A registered identity connecting with UseNkey can
// create a room (golden). An unregistered identity, and a client that presents
// no nkey at all, are both refused at connect time (error paths). After A is
// revoked in the store while the server keeps running, a new connection
// attempt with A's identity is refused (edge).
func TestNatsNkeyAuth(t *testing.T) {
	// NATS auth on; HTTP off to isolate the data plane.
	h := newHarnessFull(t, membership.AuthOff, true)
	waitHealth(t, h.ctrlURL)

	withNkey := client.Options{UseNkey: true}
	withoutNkey := client.Options{UseNkey: false}

	idA := mustIdentity(t)
	aKey := hex.EncodeToString(idA.SignPub)
	if err := h.store.AddUser(aKey, "alice", membership.RoleMember); err != nil {
		t.Fatalf("register A: %v", err)
	}

	// Golden: registered peer connects with its nkey and uses the bus.
	a, err := client.NewWithOptions(h.natsURL, h.ctrlURL, idA, withNkey)
	if err != nil {
		t.Fatalf("registered peer should connect with nkey: %v", err)
	}
	defer a.Close()
	_, err = a.CreateRoom("room.nkey", room.ModeNATS)
	if err != nil {
		t.Fatalf("registered peer should operate: %v", err)
	}

	// Error path: an unregistered identity is refused at connect time.
	idB := mustIdentity(t)
	_, err = client.NewWithOptions(h.natsURL, h.ctrlURL, idB, withNkey)
	if err == nil {
		t.Fatalf("unregistered peer must be refused by the NATS authenticator")
	}

	// Error path: presenting no nkey to an auth-required server is refused.
	_, err = client.NewWithOptions(h.natsURL, h.ctrlURL, idB, withoutNkey)
	if err == nil {
		t.Fatalf("a client without an nkey must be refused when the server requires auth")
	}

	// Edge: revoke A while the server runs, then try a brand-new connection
	// with A's identity. The server is not restarted in between.
	if err := h.store.RevokeUser(aKey); err != nil {
		t.Fatalf("revoke A: %v", err)
	}
	_, err = client.NewWithOptions(h.natsURL, h.ctrlURL, idA, withNkey)
	if err == nil {
		t.Fatalf("revoked peer must be refused on a new connection without a restart")
	}
}
// ---- test helpers ---------------------------------------------------------
// collector accumulates the plaintexts delivered to one subscription so a test
// can poll them with waitFor and snapshot. It is populated by subscribeCollect.
type collector struct {
// mu guards msgs; the subscription callback appends while holding it.
mu sync.Mutex
// msgs holds each delivered plaintext, converted to a string, in arrival order.
msgs []string
// sub is the subscription handle; callers Unsubscribe it when done.
sub interface{ Unsubscribe() error }
}
// subscribeCollect subscribes c to roomID and returns a collector that records
// every plaintext handed to the subscription callback. The test is failed if
// the subscription cannot be created. The caller is responsible for calling
// Unsubscribe on the returned collector's sub.
func subscribeCollect(t *testing.T, c *client.Client, roomID string) *collector {
	t.Helper()
	out := new(collector)
	record := func(_ frame.Frame, plaintext []byte) {
		out.mu.Lock()
		out.msgs = append(out.msgs, string(plaintext))
		out.mu.Unlock()
	}
	handle, err := c.Subscribe(roomID, record)
	if err != nil {
		t.Fatalf("subscribe: %v", err)
	}
	out.sub = handle
	return out
}
// waitFor polls *slice every 25ms until pred reports true or timeout elapses,
// and returns whether pred was satisfied. Each evaluation hands pred a private
// copy of the slice taken under mu, so pred runs without the lock held and
// cannot affect the caller's data.
//
// pred is always evaluated at least once, even when timeout is zero or
// negative, and once more after the final sleep. A condition that already
// holds, or that becomes true during the last poll interval, is therefore
// reported as true rather than missed.
func waitFor(mu *sync.Mutex, slice *[]string, pred func([]string) bool, timeout time.Duration) bool {
	const pollInterval = 25 * time.Millisecond
	deadline := time.Now().Add(timeout)
	for {
		mu.Lock()
		cp := append([]string(nil), (*slice)...)
		mu.Unlock()
		if pred(cp) {
			return true
		}
		// Check the deadline after evaluating, so the last look at the data
		// happens at or past the deadline rather than one interval before it.
		if !time.Now().Before(deadline) {
			return false
		}
		time.Sleep(pollInterval)
	}
}
// snapshot returns a copy of *slice taken while holding mu, so the caller can
// read or print the result without racing against concurrent appends.
func snapshot(mu *sync.Mutex, slice *[]string) []string {
	mu.Lock()
	cp := append([]string(nil), (*slice)...)
	mu.Unlock()
	return cp
}