Files
unibus/pkg/membership/jetstream_store_test.go
T
agent 6b3ace1d39 feat(0003b): membership.Store interface + JetStream KV implementation
Branch-by-abstraction for the control-plane store (issue 0003b), so the
membership state can move off process-local SQLite onto replicated
JetStream KV without rewriting callers and without breaking master.

pkg/membership:
- Store is now an interface (rooms/members/keys + user allowlist +
  Close). The existing SQLite implementation is renamed sqliteStore and
  stays the default: Open(path) still returns it. openSQLite keeps the
  concrete type for internal callers (the 0003c migration).
- ErrNotFound is a storage-agnostic "no such record" sentinel; both
  backends return it (the SQLite store maps sql.ErrNoRows to it). The
  control plane now branches on ErrNotFound instead of sql.ErrNoRows, so
  server.go no longer imports database/sql.
- jetstreamStore (new) implements Store over five replicated KV buckets:
  rooms, members, rooms_by_member (reverse index for ListRoomsForEndpoint),
  room_keys, users. Replication factor is configurable (R1..R5) for the
  R1->R3 rollout. Every read is bounded by OpTimeout, and IsAuthorized /
  HasAdmin FAIL CLOSED on any backend error (a KV quorum loss denies,
  never admits), per the audit's requirement for the decentralized store.

dev/feature_flags.json:
- Add the `decentralized` flag (OFF): sqliteStore default while off,
  jetstreamStore behind it. The membershipd boot wiring that selects the
  KV store is deliberately deferred to 0003e/0003f (the embedded-NATS
  authenticator<->store bootstrap is part of the session/deploy redesign);
  OFF keeps the single-node SQLite control plane unchanged.

Tests (DoD: golden + edges + error path):
- TestJetStreamStoreRoomsCRUD: encrypted room + owner + invited member
  round-trip through every room/member/key method, including latest-epoch
  resolution and rekey.
- TestJetStreamStoreUsers: add/get/authorize/list/revoke + admin gate,
  with case-insensitive key normalization and duplicate rejection.
- TestJetStreamStoreNotFound: ErrNotFound mapping for misses.
- TestJetStreamStoreIsAuthorizedFailClosed: NATS backend shut down ->
  IsAuthorized and HasAdmin both DENY within the bounded timeout.

The full existing suite stays green: sqliteStore behavior is unchanged.
2026-06-07 15:04:52 +02:00

276 lines
8.8 KiB
Go

package membership
import (
"bytes"
"errors"
"net"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
server "github.com/nats-io/nats-server/v2/server"
)
// kvFreePort asks the OS for an ephemeral TCP port on the loopback interface,
// releases the listener, and returns the port number. Because the listener is
// closed before the caller binds, the port is free only at the moment of the
// probe; the test fails immediately if no listener can be opened.
func kvFreePort(t *testing.T) int {
	t.Helper()
	ln, listenErr := net.Listen("tcp", "127.0.0.1:0")
	if listenErr != nil {
		t.Fatalf("free port: %v", listenErr)
	}
	addr := ln.Addr().(*net.TCPAddr)
	ln.Close()
	return addr.Port
}
// newKVStore boots a single-node embedded NATS with JetStream and opens a
// jetstreamStore (R1, 2s OpTimeout) over it. It returns the store plus the
// server and client connection so a test can shut the backend down to exercise
// fail-closed paths.
//
// Teardown is registered with t.Cleanup as soon as each resource exists, so
// every exit path releases exactly what was created. This includes a t.Fatalf
// part-way through setup. Cleanups run LIFO: the store is closed first, then
// the connection, then the server (waiting for it to stop). The t.TempDir
// store directory is removed last, after the server has stopped. A test that
// already closed the connection or shut the server down is fine, because the
// cleanups call Close/Shutdown again.
func newKVStore(t *testing.T) (*jetstreamStore, *server.Server, *nats.Conn) {
	t.Helper()
	ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
		StoreDir: t.TempDir(),
		Host:     "127.0.0.1",
		Port:     kvFreePort(t),
	})
	if err != nil {
		t.Fatalf("embedded nats: %v", err)
	}
	t.Cleanup(func() {
		ns.Shutdown()
		ns.WaitForShutdown()
	})

	nc, err := nats.Connect(ns.ClientURL())
	if err != nil {
		t.Fatalf("nats connect: %v", err)
	}
	t.Cleanup(nc.Close)

	js, err := jetstream.New(nc)
	if err != nil {
		t.Fatalf("jetstream: %v", err)
	}

	st, err := OpenJetStream(js, JetStreamConfig{Replicas: 1, OpTimeout: 2 * time.Second})
	if err != nil {
		t.Fatalf("open jetstream store: %v", err)
	}
	// Release whatever the store holds. Any returned error is deliberately
	// ignored: by this point the test outcome is already decided, and the
	// fail-closed test tears the backend down before cleanup runs.
	t.Cleanup(func() { st.Close() })

	kv, ok := st.(*jetstreamStore)
	if !ok {
		t.Fatalf("OpenJetStream returned %T, want *jetstreamStore", st)
	}
	return kv, ns, nc
}
// TestJetStreamStoreRoomsCRUD is the golden path: an encrypted room with an owner
// and an invited member round-trips through every room/member/key method. It
// covers latest-epoch resolution (epoch <= 0), a rekey to epoch 2, and member
// removal. Removal must also drop the member's reverse-index entry. Every store
// call's error is checked, so a failing call cannot masquerade as an empty or
// zero result.
func TestJetStreamStoreRoomsCRUD(t *testing.T) {
	s, _, _ := newKVStore(t)
	roomID := newULID()
	owner := "owner-ep-1"
	info := RoomInfo{RoomID: roomID, Subject: "room.kv", Encrypt: true, Persist: true, SignMsgs: true, OwnerEndpoint: owner}
	ownerSealed := []byte("sealed-owner-epoch1")
	if err := s.CreateRoom(info, []byte("owner-sign"), []byte("owner-kex"), ownerSealed); err != nil {
		t.Fatalf("CreateRoom: %v", err)
	}

	// GetRoom returns epoch 1 and the policy.
	got, err := s.GetRoom(roomID)
	if err != nil {
		t.Fatalf("GetRoom: %v", err)
	}
	if got.Epoch != 1 || got.Subject != "room.kv" || !got.Encrypt || got.OwnerEndpoint != owner {
		t.Fatalf("GetRoom mismatch: %+v", got)
	}

	// Owner is a member with role "owner".
	om, err := s.GetMember(roomID, owner)
	if err != nil {
		t.Fatalf("GetMember owner: %v", err)
	}
	if om.Role != "owner" || !bytes.Equal(om.SignPub, []byte("owner-sign")) {
		t.Fatalf("owner member mismatch: %+v", om)
	}

	// Owner's sealed key at epoch 1.
	ep, sealed, err := s.GetSealedKey(roomID, owner, 1)
	if err != nil || ep != 1 || !bytes.Equal(sealed, ownerSealed) {
		t.Fatalf("GetSealedKey owner: ep=%d sealed=%q err=%v", ep, sealed, err)
	}

	// Invite a member with a sealed key at epoch 1.
	bob := "member-ep-bob"
	bobSealed := []byte("sealed-bob-epoch1")
	if err := s.AddMember(roomID, Member{Endpoint: bob, Role: "member", SignPub: []byte("bob-sign"), KexPub: []byte("bob-kex")}, 1, bobSealed); err != nil {
		t.Fatalf("AddMember: %v", err)
	}

	// ListMembers returns exactly the owner and bob. The intended order is
	// sorted by endpoint, but only membership is asserted here.
	members, err := s.ListMembers(roomID)
	if err != nil {
		t.Fatalf("ListMembers: %v", err)
	}
	if len(members) != 2 {
		t.Fatalf("ListMembers want 2, got %d (%+v)", len(members), members)
	}
	found := make(map[string]bool, len(members))
	for _, m := range members {
		found[m.Endpoint] = true
	}
	if !found[owner] || !found[bob] {
		t.Fatalf("ListMembers want owner %q and bob %q, got %+v", owner, bob, members)
	}

	// Bob can find the room via the reverse index.
	rooms, err := s.ListRoomsForEndpoint(bob)
	if err != nil {
		t.Fatalf("ListRoomsForEndpoint: %v", err)
	}
	if len(rooms) != 1 || rooms[0].RoomID != roomID || rooms[0].Role != "member" {
		t.Fatalf("ListRoomsForEndpoint mismatch: %+v", rooms)
	}

	// Latest sealed key (epoch <= 0) resolves to epoch 1 for bob.
	lep, lsealed, err := s.GetSealedKey(roomID, bob, 0)
	if err != nil || lep != 1 || !bytes.Equal(lsealed, bobSealed) {
		t.Fatalf("GetSealedKey latest bob: ep=%d sealed=%q err=%v", lep, lsealed, err)
	}

	// Rekey to epoch 2 (bump + new sealed keys), then latest resolves to 2.
	if err := s.BumpEpoch(roomID, 2); err != nil {
		t.Fatalf("BumpEpoch: %v", err)
	}
	ownerSealed2 := []byte("owner-epoch2")
	if err := s.PutSealedKeys(roomID, 2, map[string][]byte{owner: ownerSealed2}); err != nil {
		t.Fatalf("PutSealedKeys: %v", err)
	}
	got2, err := s.GetRoom(roomID)
	if err != nil {
		t.Fatalf("GetRoom after BumpEpoch: %v", err)
	}
	if got2.Epoch != 2 {
		t.Fatalf("after BumpEpoch want epoch 2, got %d", got2.Epoch)
	}
	lep2, lsealed2, err := s.GetSealedKey(roomID, owner, 0)
	if err != nil || lep2 != 2 || !bytes.Equal(lsealed2, ownerSealed2) {
		t.Fatalf("latest owner key after rekey: ep=%d sealed=%q err=%v", lep2, lsealed2, err)
	}

	// Remove bob; he disappears from members and his reverse index.
	if err := s.RemoveMember(roomID, bob); err != nil {
		t.Fatalf("RemoveMember: %v", err)
	}
	if _, err := s.GetMember(roomID, bob); !errors.Is(err, ErrNotFound) {
		t.Fatalf("GetMember after remove want ErrNotFound, got %v", err)
	}
	// Check the error explicitly: a failed lookup would return no rooms and
	// otherwise pass the emptiness assertion below.
	rooms2, err := s.ListRoomsForEndpoint(bob)
	if err != nil {
		t.Fatalf("ListRoomsForEndpoint after remove: %v", err)
	}
	if len(rooms2) != 0 {
		t.Fatalf("ListRoomsForEndpoint after remove want 0, got %d", len(rooms2))
	}
}
// TestJetStreamStoreUsers covers the user allowlist end to end:
//   - adding an admin turns HasAdmin on;
//   - lookups match the hex key case-insensitively;
//   - a duplicate add is rejected with ErrUserExists;
//   - ListUsers sees every user;
//   - revoking the only admin withdraws both authorization and the admin gate;
//   - a second revoke of the same key fails.
func TestJetStreamStoreUsers(t *testing.T) {
	store, _, _ := newKVStore(t)
	const aliceHex = "aa11"

	if store.HasAdmin() {
		t.Fatalf("fresh store should have no admin")
	}
	if addErr := store.AddUser(aliceHex, "alice", RoleAdmin); addErr != nil {
		t.Fatalf("AddUser: %v", addErr)
	}
	// Cases are evaluated top to bottom and stop at the first failure.
	switch {
	case !store.HasAdmin():
		t.Fatalf("HasAdmin should be true after adding an admin")
	case !store.IsAuthorized(aliceHex):
		t.Fatalf("alice should be authorized")
	case !store.IsAuthorized("AA11"):
		// Keys are normalized to lowercase, so the uppercase spelling must match.
		t.Fatalf("uppercase hex should normalize and authorize")
	}

	user, getErr := store.GetUser(aliceHex)
	if getErr != nil || user.Handle != "alice" || user.Role != RoleAdmin || user.Status != StatusActive {
		t.Fatalf("GetUser mismatch: %+v err=%v", user, getErr)
	}

	// A second AddUser for an existing key must report ErrUserExists.
	if dupErr := store.AddUser(aliceHex, "alice2", RoleMember); !errors.Is(dupErr, ErrUserExists) {
		t.Fatalf("duplicate AddUser want ErrUserExists, got %v", dupErr)
	}
	if addErr := store.AddUser("bb22", "bob", RoleMember); addErr != nil {
		t.Fatalf("AddUser bob: %v", addErr)
	}
	all, listErr := store.ListUsers()
	if listErr != nil || len(all) != 2 {
		t.Fatalf("ListUsers want 2, got %d err=%v", len(all), listErr)
	}

	// Revoking alice must take effect on the very next check.
	if revokeErr := store.RevokeUser(aliceHex); revokeErr != nil {
		t.Fatalf("RevokeUser: %v", revokeErr)
	}
	switch {
	case store.IsAuthorized(aliceHex):
		t.Fatalf("revoked user must not be authorized")
	case store.HasAdmin():
		t.Fatalf("after revoking the only admin, HasAdmin must be false")
	}

	// Alice is no longer active, so a second revoke must return an error.
	if store.RevokeUser(aliceHex) == nil {
		t.Fatalf("re-revoke should error")
	}
}
// TestJetStreamStoreNotFound verifies that looking up absent records yields the
// storage-agnostic ErrNotFound sentinel (matched with errors.Is). It covers
// rooms, members, sealed keys at an explicit epoch and at "latest" (epoch 0),
// and users. Each entry pairs a lookup with the message reported if the
// sentinel is missing; the first mismatch fails the test.
func TestJetStreamStoreNotFound(t *testing.T) {
	s, _, _ := newKVStore(t)
	lookups := []struct {
		format string
		call   func() error
	}{
		{"GetRoom miss want ErrNotFound, got %v", func() error {
			_, err := s.GetRoom("nope")
			return err
		}},
		{"GetMember miss want ErrNotFound, got %v", func() error {
			_, err := s.GetMember("nope", "x")
			return err
		}},
		{"GetSealedKey miss want ErrNotFound, got %v", func() error {
			_, _, err := s.GetSealedKey("nope", "x", 1)
			return err
		}},
		{"GetSealedKey latest miss want ErrNotFound, got %v", func() error {
			_, _, err := s.GetSealedKey("nope", "x", 0)
			return err
		}},
		{"GetUser miss want ErrNotFound, got %v", func() error {
			_, err := s.GetUser("ffff")
			return err
		}},
	}
	for _, l := range lookups {
		if err := l.call(); !errors.Is(err, ErrNotFound) {
			t.Fatalf(l.format, err)
		}
	}
}
// TestJetStreamStoreIsAuthorizedFailClosed is the error path mandated by the
// issue. When the KV backend is unavailable (here the NATS client is closed and
// the server shut down), IsAuthorized and HasAdmin must both DENY, never admit.
// A previously authorized admin flips to unauthorized, and the admin gate
// closes, once the backend cannot be reached.
//
// Both post-outage checks run under the same wall-clock bound. A store that
// blocks on a dead backend therefore fails with a clear message instead of
// hanging until the go test timeout.
func TestJetStreamStoreIsAuthorizedFailClosed(t *testing.T) {
	s, ns, nc := newKVStore(t)
	const aliceHex = "abcd"
	if err := s.AddUser(aliceHex, "alice", RoleAdmin); err != nil {
		t.Fatalf("AddUser: %v", err)
	}
	if !s.IsAuthorized(aliceHex) {
		t.Fatalf("alice should be authorized while the backend is up")
	}
	// Establish that the admin gate is open before the outage, so the false
	// result asserted below is caused by the outage rather than by setup.
	if !s.HasAdmin() {
		t.Fatalf("HasAdmin should be true while the backend is up")
	}

	// Take the KV backend away: close the client and stop the server. Every
	// subsequent KV Get fails, and the store must fail closed.
	nc.Close()
	ns.Shutdown()
	ns.WaitForShutdown()

	// boundedCheck runs check in a goroutine and fails the test if no result
	// arrives within deadline. The store caps each op at OpTimeout (2s in
	// newKVStore), so a well-behaved store answers well inside this bound. The
	// channel is buffered so a late result can still be sent and the goroutine
	// can exit after the test has moved on.
	const deadline = 10 * time.Second
	boundedCheck := func(name string, check func() bool) bool {
		t.Helper()
		done := make(chan bool, 1)
		go func() { done <- check() }()
		select {
		case result := <-done:
			return result
		case <-time.After(deadline):
			t.Fatalf("%s hung when the backend was down (no bounded timeout)", name)
			return false
		}
	}

	if boundedCheck("IsAuthorized", func() bool { return s.IsAuthorized(aliceHex) }) {
		t.Fatalf("KV backend down but IsAuthorized returned true: NOT fail-closed")
	}
	// HasAdmin is likewise conservative: backend down -> false (gates stay closed).
	if boundedCheck("HasAdmin", s.HasAdmin) {
		t.Fatalf("KV backend down but HasAdmin returned true: NOT fail-closed")
	}
}