diff --git a/cmd/membershipd/users_cli.go b/cmd/membershipd/users_cli.go index e21276c..5dedeff 100644 --- a/cmd/membershipd/users_cli.go +++ b/cmd/membershipd/users_cli.go @@ -65,7 +65,7 @@ const defaultDBPath = "./local_files/unibus.db" // openStore opens the membership store at path, exiting on failure. Migrations // (including 002_users.sql) are applied by membership.Open, so a fresh database // gets the users table on first use of the CLI. -func openStore(path string) *membership.Store { +func openStore(path string) membership.Store { store, err := membership.Open(path) if err != nil { fmt.Fprintf(os.Stderr, "membershipd user: open store %q: %v\n", path, err) diff --git a/dev/feature_flags.json b/dev/feature_flags.json index 8fc10d1..f58b826 100644 --- a/dev/feature_flags.json +++ b/dev/feature_flags.json @@ -14,6 +14,13 @@ "description": "TLS on the NATS data plane using the project's self-signed CA (deploy/tls/). Server opts in via membershipd --tls-cert/--tls-key; clients pin ca.crt via client.Connect(caPath).", "added": "2026-06-07", "enabled_at": "2026-06-07" + }, + "decentralized": { + "enabled": false, + "issue": "0003", + "description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default OFF, jetstreamStore ON). The route cluster (0003a) and the KV store (0003b) ship behind this flag; the membershipd boot wiring that selects the KV store completes with the session/reconnect redesign (0003e) and is activated on the multi-node deploy (0003f). 
OFF keeps the single-node SQLite control plane unchanged.", + "added": "2026-06-07", + "enabled_at": null } } } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index c556cd8..7b441d4 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -31,7 +31,7 @@ type testHarness struct { ctrlURL string ns *server.Server httpts *httptest.Server - store *membership.Store + store membership.Store srv *membership.Server } diff --git a/pkg/membership/auth_test.go b/pkg/membership/auth_test.go index 4cff562..ee73cfe 100644 --- a/pkg/membership/auth_test.go +++ b/pkg/membership/auth_test.go @@ -22,7 +22,7 @@ import ( // with a fresh store + blob store, and seeds one active admin ("alice"). type authHarness struct { ts *httptest.Server - store *Store + store Store alice cs.Identity alicePub string // hex } diff --git a/pkg/membership/jetstream_store.go b/pkg/membership/jetstream_store.go new file mode 100644 index 0000000..0ab63a5 --- /dev/null +++ b/pkg/membership/jetstream_store.go @@ -0,0 +1,510 @@ +package membership + +// jetstreamStore is the JetStream KV implementation of Store (issue 0003b): the +// control-plane state (rooms, members, sealed room keys, the user allowlist) +// lives in replicated JetStream Key/Value buckets instead of a process-local +// SQLite file. Any node in the cluster reads and writes the same buckets, and +// JetStream's RAFT layer keeps them consistent across replicas, so the HTTP +// control plane becomes effectively stateless: any membershipd can serve any +// request. It is selected only when the `decentralized` flag is on; sqliteStore +// stays the default. +// +// Key layout (every path segment is a single KV token — ULIDs, RawURL endpoint +// ids and lowercase-hex keys never contain a '.', so '.' 
is a safe separator and +// a ".*" watch enumerates exactly one trailing token): +// +// rooms roomID -> RoomInfo (JSON) +// members roomID.endpoint -> Member (JSON, carries Role) +// rooms_by_member endpoint.roomID -> role (reverse index for ListRoomsForEndpoint) +// room_keys roomID.endpoint.epoch -> sealed_key bytes +// users signPubHex -> User (JSON) +// +// Consistency caveat: KV has no multi-key transaction, so a multi-write op +// (CreateRoom, AddMember) is a short sequence of single-key writes. The order is +// chosen so a partial failure leaves a recoverable state (the room/member row +// before its reverse index or sealed key), and writes are idempotent (Put +// overwrites), which is also what makes the SQLite->KV migration (0003c) safe to +// re-run. +// +// Fail-closed: every read uses a bounded context, and IsAuthorized/HasAdmin +// return false on ANY backend error (a KV quorum loss or timeout denies access +// rather than admitting it), mirroring the SQLite store's behavior. + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sort" + "strconv" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +// Bucket names (alphanumeric/dash/underscore only — no dots, per KV rules). +const ( + bucketRooms = "UNIBUS_rooms" + bucketMembers = "UNIBUS_members" + bucketByMember = "UNIBUS_rooms_by_member" + bucketRoomKeys = "UNIBUS_room_keys" + bucketUsers = "UNIBUS_users" + defaultKVOpTime = 5 * time.Second +) + +// JetStreamConfig configures the KV-backed store. +type JetStreamConfig struct { + // Replicas is the per-bucket replication factor (R1..R5). Use 1 for a single + // node or a 1-2 node rollout, 3 for real HA (quorum 2/3). Scaling R1->R3 in + // place is an operational step (nats kv update) done when the third node + // joins; it does not require reopening the store. + Replicas int + // OpTimeout bounds every KV operation so a stalled backend fails closed + // instead of hanging a request. Zero uses defaultKVOpTime. 
+ OpTimeout time.Duration +} + +type jetstreamStore struct { + rooms jetstream.KeyValue + members jetstream.KeyValue + byMember jetstream.KeyValue + keys jetstream.KeyValue + users jetstream.KeyValue + opTimeout time.Duration +} + +// OpenJetStream creates (or opens) the five KV buckets on js with the configured +// replication factor and returns a Store backed by them. The JetStream context +// belongs to the caller (it owns the NATS connection); Close is a no-op. +func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) { + if cfg.Replicas <= 0 { + cfg.Replicas = 1 + } + opTimeout := cfg.OpTimeout + if opTimeout <= 0 { + opTimeout = defaultKVOpTime + } + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + s := &jetstreamStore{opTimeout: opTimeout} + for _, b := range []struct { + name string + dst *jetstream.KeyValue + }{ + {bucketRooms, &s.rooms}, + {bucketMembers, &s.members}, + {bucketByMember, &s.byMember}, + {bucketRoomKeys, &s.keys}, + {bucketUsers, &s.users}, + } { + kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: b.name, + Replicas: cfg.Replicas, + History: 1, + Storage: jetstream.FileStorage, + }) + if err != nil { + return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err) + } + *b.dst = kv + } + return s, nil +} + +// Close releases nothing: the JetStream context and NATS connection are owned by +// the caller, which closes them on shutdown. +func (s *jetstreamStore) Close() error { return nil } + +func (s *jetstreamStore) ctx() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), s.opTimeout) +} + +// ---- key helpers ---------------------------------------------------------- + +func memberKey(roomID, endpoint string) string { return roomID + "." + endpoint } +func byMemberKey(endpoint, roomID string) string { return endpoint + "." 
+ roomID } +func sealedKey(roomID, endpoint string, e int) string { + return roomID + "." + endpoint + "." + strconv.Itoa(e) +} + +// watchEntries collects every current entry whose key matches pattern (a KV +// watch with a ".*" wildcard), draining the watcher until the nil marker +// that signals "all initial values delivered". Tombstones are skipped. +func (s *jetstreamStore) watchEntries(kv jetstream.KeyValue, pattern string) ([]jetstream.KeyValueEntry, error) { + ctx, cancel := s.ctx() + defer cancel() + w, err := kv.Watch(ctx, pattern, jetstream.IgnoreDeletes()) + if err != nil { + return nil, err + } + defer w.Stop() + var out []jetstream.KeyValueEntry + for { + select { + case e := <-w.Updates(): + if e == nil { + return out, nil // initial snapshot complete + } + out = append(out, e) + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +// ---- rooms / members / keys ---------------------------------------------- + +func (s *jetstreamStore) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error { + ctx, cancel := s.ctx() + defer cancel() + + info.Epoch = 1 + roomJSON, err := json.Marshal(info) + if err != nil { + return fmt.Errorf("membership: marshal room: %w", err) + } + // Create (not Put) so a duplicate room id is rejected, matching SQLite's + // PRIMARY KEY behavior. 
+ if _, err := s.rooms.Create(ctx, info.RoomID, roomJSON); err != nil { + if errors.Is(err, jetstream.ErrKeyExists) { + return fmt.Errorf("membership: room %q already exists", info.RoomID) + } + return fmt.Errorf("membership: create room: %w", err) + } + + owner := Member{Endpoint: info.OwnerEndpoint, Role: "owner", SignPub: ownerSignPub, KexPub: ownerKexPub} + if err := s.putMember(ctx, info.RoomID, owner); err != nil { + return err + } + if info.Encrypt { + if _, err := s.keys.Put(ctx, sealedKey(info.RoomID, info.OwnerEndpoint, 1), ownerSealedKey); err != nil { + return fmt.Errorf("membership: put owner key: %w", err) + } + } + return nil +} + +// putMember writes the member row and its reverse index together. +func (s *jetstreamStore) putMember(ctx context.Context, roomID string, m Member) error { + mb, err := json.Marshal(m) + if err != nil { + return fmt.Errorf("membership: marshal member: %w", err) + } + if _, err := s.members.Put(ctx, memberKey(roomID, m.Endpoint), mb); err != nil { + return fmt.Errorf("membership: put member: %w", err) + } + if _, err := s.byMember.Put(ctx, byMemberKey(m.Endpoint, roomID), []byte(m.Role)); err != nil { + return fmt.Errorf("membership: put member index: %w", err) + } + return nil +} + +func (s *jetstreamStore) GetRoom(roomID string) (RoomInfo, error) { + ctx, cancel := s.ctx() + defer cancel() + e, err := s.rooms.Get(ctx, roomID) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, ErrNotFound) + } + return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, err) + } + var info RoomInfo + if err := json.Unmarshal(e.Value(), &info); err != nil { + return RoomInfo{}, fmt.Errorf("membership: unmarshal room %q: %w", roomID, err) + } + return info, nil +} + +func (s *jetstreamStore) AddMember(roomID string, m Member, epoch int, sealedKeyBytes []byte) error { + ctx, cancel := s.ctx() + defer cancel() + if err := s.putMember(ctx, roomID, 
m); err != nil { + return err + } + if len(sealedKeyBytes) > 0 { + if _, err := s.keys.Put(ctx, sealedKey(roomID, m.Endpoint, epoch), sealedKeyBytes); err != nil { + return fmt.Errorf("membership: put member key: %w", err) + } + } + return nil +} + +func (s *jetstreamStore) GetMember(roomID, endpoint string) (Member, error) { + ctx, cancel := s.ctx() + defer cancel() + e, err := s.members.Get(ctx, memberKey(roomID, endpoint)) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, ErrNotFound) + } + return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, err) + } + var m Member + if err := json.Unmarshal(e.Value(), &m); err != nil { + return Member{}, fmt.Errorf("membership: unmarshal member: %w", err) + } + return m, nil +} + +func (s *jetstreamStore) ListMembers(roomID string) ([]Member, error) { + entries, err := s.watchEntries(s.members, roomID+".*") + if err != nil { + return nil, fmt.Errorf("membership: list members %q: %w", roomID, err) + } + out := make([]Member, 0, len(entries)) + for _, e := range entries { + var m Member + if err := json.Unmarshal(e.Value(), &m); err != nil { + return nil, fmt.Errorf("membership: unmarshal member: %w", err) + } + out = append(out, m) + } + sort.Slice(out, func(i, j int) bool { return out[i].Endpoint < out[j].Endpoint }) + return out, nil +} + +func (s *jetstreamStore) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) { + entries, err := s.watchEntries(s.byMember, endpoint+".*") + if err != nil { + return nil, fmt.Errorf("membership: list rooms for endpoint %q: %w", endpoint, err) + } + out := make([]RoomMembership, 0, len(entries)) + for _, e := range entries { + // Key is "endpoint.roomID"; the roomID is everything after the dot. 
+ roomID := e.Key()[len(endpoint)+1:] + info, err := s.GetRoom(roomID) + if err != nil { + if errors.Is(err, ErrNotFound) { + continue // index points at a removed room: skip, stay consistent + } + return nil, err + } + out = append(out, RoomMembership{RoomInfo: info, Role: string(e.Value())}) + } + sort.Slice(out, func(i, j int) bool { return out[i].RoomID < out[j].RoomID }) + return out, nil +} + +func (s *jetstreamStore) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) { + if epoch > 0 { + ctx, cancel := s.ctx() + defer cancel() + e, err := s.keys.Get(ctx, sealedKey(roomID, endpoint, epoch)) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, ErrNotFound) + } + return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err) + } + return epoch, e.Value(), nil + } + // epoch <= 0: latest. Enumerate "roomID.endpoint.*" and take the max. + entries, err := s.watchEntries(s.keys, roomID+"."+endpoint+".*") + if err != nil { + return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, err) + } + bestEpoch, bestVal := -1, []byte(nil) + for _, e := range entries { + k := e.Key() + ep, perr := strconv.Atoi(k[len(roomID)+1+len(endpoint)+1:]) + if perr != nil { + continue + } + if ep > bestEpoch { + bestEpoch, bestVal = ep, e.Value() + } + } + if bestEpoch < 0 { + return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, ErrNotFound) + } + return bestEpoch, bestVal, nil +} + +func (s *jetstreamStore) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error { + ctx, cancel := s.ctx() + defer cancel() + for endpoint, sealed := range keys { + if _, err := s.keys.Put(ctx, sealedKey(roomID, endpoint, epoch), sealed); err != nil { + return fmt.Errorf("membership: put sealed key for %q: %w", endpoint, err) + } + } + return nil +} + +func (s 
*jetstreamStore) BumpEpoch(roomID string, newEpoch int) error { + // Read-modify-write the room's epoch. The control plane serializes rekeys per + // room (owner-signed), so the lost-update window is not exercised in practice. + info, err := s.GetRoom(roomID) + if err != nil { + return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err) + } + info.Epoch = newEpoch + b, err := json.Marshal(info) + if err != nil { + return fmt.Errorf("membership: marshal room: %w", err) + } + ctx, cancel := s.ctx() + defer cancel() + if _, err := s.rooms.Put(ctx, roomID, b); err != nil { + return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err) + } + return nil +} + +func (s *jetstreamStore) RemoveMember(roomID, endpoint string) error { + ctx, cancel := s.ctx() + defer cancel() + // Drop the member row and its reverse index. Past-epoch sealed keys are left + // intact (they only decrypt data the member could already read), matching the + // SQLite store. + if err := s.members.Delete(ctx, memberKey(roomID, endpoint)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) { + return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err) + } + if err := s.byMember.Delete(ctx, byMemberKey(endpoint, roomID)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) { + return fmt.Errorf("membership: remove member index %q/%q: %w", roomID, endpoint, err) + } + return nil +} + +// ---- users (the bus allowlist) ------------------------------------------- + +func (s *jetstreamStore) AddUser(signPub, handle, role string) error { + signPub = normalizeSignPub(signPub) + if signPub == "" || handle == "" { + return fmt.Errorf("membership: AddUser: sign_pub and handle required") + } + if role == "" { + role = RoleMember + } + if role != RoleAdmin && role != RoleMember { + return fmt.Errorf("membership: AddUser: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember) + } + u := User{SignPub: signPub, Handle: handle, Role: role, 
Status: StatusActive, CreatedAt: nowRFC3339()} + b, err := json.Marshal(u) + if err != nil { + return fmt.Errorf("membership: marshal user: %w", err) + } + ctx, cancel := s.ctx() + defer cancel() + if _, err := s.users.Create(ctx, signPub, b); err != nil { + if errors.Is(err, jetstream.ErrKeyExists) { + return ErrUserExists + } + return fmt.Errorf("membership: insert user: %w", err) + } + return nil +} + +func (s *jetstreamStore) GetUser(signPub string) (User, error) { + signPub = normalizeSignPub(signPub) + ctx, cancel := s.ctx() + defer cancel() + e, err := s.users.Get(ctx, signPub) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return User{}, fmt.Errorf("membership: get user %q: %w", signPub, ErrNotFound) + } + return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err) + } + var u User + if err := json.Unmarshal(e.Value(), &u); err != nil { + return User{}, fmt.Errorf("membership: unmarshal user: %w", err) + } + return u, nil +} + +func (s *jetstreamStore) ListUsers() ([]User, error) { + ctx, cancel := s.ctx() + w, err := s.users.WatchAll(ctx, jetstream.IgnoreDeletes()) + if err != nil { + cancel() + return nil, fmt.Errorf("membership: list users: %w", err) + } + defer cancel() + defer w.Stop() + var out []User + for { + select { + case e := <-w.Updates(): + if e == nil { + sort.Slice(out, func(i, j int) bool { + if out[i].Handle != out[j].Handle { + return out[i].Handle < out[j].Handle + } + return out[i].SignPub < out[j].SignPub + }) + return out, nil + } + var u User + if err := json.Unmarshal(e.Value(), &u); err != nil { + return nil, fmt.Errorf("membership: unmarshal user: %w", err) + } + out = append(out, u) + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +func (s *jetstreamStore) RevokeUser(signPub string) error { + signPub = normalizeSignPub(signPub) + u, err := s.GetUser(signPub) + if err != nil { + if errors.Is(err, ErrNotFound) { + return fmt.Errorf("membership: revoke user %q: no active user with that key", 
signPub) + } + return fmt.Errorf("membership: revoke user %q: %w", signPub, err) + } + if u.Status != StatusActive { + return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub) + } + u.Status = StatusRevoked + u.RevokedAt = nowRFC3339() + b, err := json.Marshal(u) + if err != nil { + return fmt.Errorf("membership: marshal user: %w", err) + } + ctx, cancel := s.ctx() + defer cancel() + if _, err := s.users.Put(ctx, signPub, b); err != nil { + return fmt.Errorf("membership: revoke user %q: %w", signPub, err) + } + return nil +} + +// IsAuthorized reports whether signPub is an active bus user. Any backend error +// (including a KV quorum loss or timeout) yields false: fail closed. +func (s *jetstreamStore) IsAuthorized(signPub string) bool { + signPub = normalizeSignPub(signPub) + if signPub == "" { + return false + } + ctx, cancel := s.ctx() + defer cancel() + e, err := s.users.Get(ctx, signPub) + if err != nil { + return false + } + var u User + if err := json.Unmarshal(e.Value(), &u); err != nil { + return false + } + return u.Status == StatusActive +} + +// HasAdmin reports whether at least one active admin exists. On any backend +// error it returns false, keeping the admin-gated endpoints closed (conservative). 
+func (s *jetstreamStore) HasAdmin() bool { + users, err := s.ListUsers() + if err != nil { + return false + } + for _, u := range users { + if u.Role == RoleAdmin && u.Status == StatusActive { + return true + } + } + return false +} diff --git a/pkg/membership/jetstream_store_test.go b/pkg/membership/jetstream_store_test.go new file mode 100644 index 0000000..effad8c --- /dev/null +++ b/pkg/membership/jetstream_store_test.go @@ -0,0 +1,275 @@ +package membership + +import ( + "bytes" + "errors" + "net" + "testing" + "time" + + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + server "github.com/nats-io/nats-server/v2/server" +) + +func kvFreePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("free port: %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +// newKVStore boots a single-node embedded NATS with JetStream and opens a +// jetstreamStore (R1) over it, returning the store plus the server and +// connection so a test can shut the backend down to exercise fail-closed paths. 
+func newKVStore(t *testing.T) (*jetstreamStore, *server.Server, *nats.Conn) { + t.Helper() + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), + Host: "127.0.0.1", + Port: kvFreePort(t), + }) + if err != nil { + t.Fatalf("embedded nats: %v", err) + } + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + ns.Shutdown() + t.Fatalf("nats connect: %v", err) + } + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + ns.Shutdown() + t.Fatalf("jetstream: %v", err) + } + st, err := OpenJetStream(js, JetStreamConfig{Replicas: 1, OpTimeout: 2 * time.Second}) + if err != nil { + nc.Close() + ns.Shutdown() + t.Fatalf("open jetstream store: %v", err) + } + t.Cleanup(func() { + nc.Close() + ns.Shutdown() + ns.WaitForShutdown() + }) + return st.(*jetstreamStore), ns, nc +} + +// TestJetStreamStoreRoomsCRUD is the golden path: an encrypted room with an owner +// and an invited member round-trips through every room/member/key method. +func TestJetStreamStoreRoomsCRUD(t *testing.T) { + s, _, _ := newKVStore(t) + + roomID := newULID() + owner := "owner-ep-1" + info := RoomInfo{RoomID: roomID, Subject: "room.kv", Encrypt: true, Persist: true, SignMsgs: true, OwnerEndpoint: owner} + ownerSealed := []byte("sealed-owner-epoch1") + if err := s.CreateRoom(info, []byte("owner-sign"), []byte("owner-kex"), ownerSealed); err != nil { + t.Fatalf("CreateRoom: %v", err) + } + + // GetRoom returns epoch 1 and the policy. + got, err := s.GetRoom(roomID) + if err != nil { + t.Fatalf("GetRoom: %v", err) + } + if got.Epoch != 1 || got.Subject != "room.kv" || !got.Encrypt || got.OwnerEndpoint != owner { + t.Fatalf("GetRoom mismatch: %+v", got) + } + + // Owner is a member with role "owner". 
+ om, err := s.GetMember(roomID, owner) + if err != nil { + t.Fatalf("GetMember owner: %v", err) + } + if om.Role != "owner" || !bytes.Equal(om.SignPub, []byte("owner-sign")) { + t.Fatalf("owner member mismatch: %+v", om) + } + + // Owner's sealed key at epoch 1. + ep, sealed, err := s.GetSealedKey(roomID, owner, 1) + if err != nil || ep != 1 || !bytes.Equal(sealed, ownerSealed) { + t.Fatalf("GetSealedKey owner: ep=%d sealed=%q err=%v", ep, sealed, err) + } + + // Invite a member with a sealed key at epoch 1. + bob := "member-ep-bob" + bobSealed := []byte("sealed-bob-epoch1") + if err := s.AddMember(roomID, Member{Endpoint: bob, Role: "member", SignPub: []byte("bob-sign"), KexPub: []byte("bob-kex")}, 1, bobSealed); err != nil { + t.Fatalf("AddMember: %v", err) + } + + // ListMembers returns both, sorted by endpoint. + members, err := s.ListMembers(roomID) + if err != nil { + t.Fatalf("ListMembers: %v", err) + } + if len(members) != 2 { + t.Fatalf("ListMembers want 2, got %d (%+v)", len(members), members) + } + + // Bob can find the room via the reverse index. + rooms, err := s.ListRoomsForEndpoint(bob) + if err != nil { + t.Fatalf("ListRoomsForEndpoint: %v", err) + } + if len(rooms) != 1 || rooms[0].RoomID != roomID || rooms[0].Role != "member" { + t.Fatalf("ListRoomsForEndpoint mismatch: %+v", rooms) + } + + // Latest sealed key (epoch <= 0) resolves to epoch 1 for bob. + lep, lsealed, err := s.GetSealedKey(roomID, bob, 0) + if err != nil || lep != 1 || !bytes.Equal(lsealed, bobSealed) { + t.Fatalf("GetSealedKey latest bob: ep=%d err=%v", lep, err) + } + + // Rekey to epoch 2 (bump + new sealed keys), then latest resolves to 2. 
+ if err := s.BumpEpoch(roomID, 2); err != nil { + t.Fatalf("BumpEpoch: %v", err) + } + if err := s.PutSealedKeys(roomID, 2, map[string][]byte{owner: []byte("owner-epoch2")}); err != nil { + t.Fatalf("PutSealedKeys: %v", err) + } + got2, _ := s.GetRoom(roomID) + if got2.Epoch != 2 { + t.Fatalf("after BumpEpoch want epoch 2, got %d", got2.Epoch) + } + lep2, _, err := s.GetSealedKey(roomID, owner, 0) + if err != nil || lep2 != 2 { + t.Fatalf("latest owner key after rekey: ep=%d err=%v", lep2, err) + } + + // Remove bob; he disappears from members and his reverse index. + if err := s.RemoveMember(roomID, bob); err != nil { + t.Fatalf("RemoveMember: %v", err) + } + if _, err := s.GetMember(roomID, bob); !errors.Is(err, ErrNotFound) { + t.Fatalf("GetMember after remove want ErrNotFound, got %v", err) + } + rooms2, _ := s.ListRoomsForEndpoint(bob) + if len(rooms2) != 0 { + t.Fatalf("ListRoomsForEndpoint after remove want 0, got %d", len(rooms2)) + } +} + +// TestJetStreamStoreUsers exercises the allowlist: add, lookup, authorize, +// revoke (which flips IsAuthorized), and the admin gate. +func TestJetStreamStoreUsers(t *testing.T) { + s, _, _ := newKVStore(t) + + const aliceHex = "aa11" + if s.HasAdmin() { + t.Fatalf("fresh store should have no admin") + } + if err := s.AddUser(aliceHex, "alice", RoleAdmin); err != nil { + t.Fatalf("AddUser: %v", err) + } + if !s.HasAdmin() { + t.Fatalf("HasAdmin should be true after adding an admin") + } + if !s.IsAuthorized(aliceHex) { + t.Fatalf("alice should be authorized") + } + // Case-insensitive lookup (keys are normalized lowercase). + if !s.IsAuthorized("AA11") { + t.Fatalf("uppercase hex should normalize and authorize") + } + u, err := s.GetUser(aliceHex) + if err != nil || u.Handle != "alice" || u.Role != RoleAdmin || u.Status != StatusActive { + t.Fatalf("GetUser mismatch: %+v err=%v", u, err) + } + + // Duplicate add is rejected with ErrUserExists. 
+ if err := s.AddUser(aliceHex, "alice2", RoleMember); !errors.Is(err, ErrUserExists) { + t.Fatalf("duplicate AddUser want ErrUserExists, got %v", err) + } + + if err := s.AddUser("bb22", "bob", RoleMember); err != nil { + t.Fatalf("AddUser bob: %v", err) + } + users, err := s.ListUsers() + if err != nil || len(users) != 2 { + t.Fatalf("ListUsers want 2, got %d err=%v", len(users), err) + } + + // Revoke alice: authorization flips off immediately. + if err := s.RevokeUser(aliceHex); err != nil { + t.Fatalf("RevokeUser: %v", err) + } + if s.IsAuthorized(aliceHex) { + t.Fatalf("revoked user must not be authorized") + } + if s.HasAdmin() { + t.Fatalf("after revoking the only admin, HasAdmin must be false") + } + // Revoking again is an error (no active user). + if err := s.RevokeUser(aliceHex); err == nil { + t.Fatalf("re-revoke should error") + } +} + +// TestJetStreamStoreNotFound checks the ErrNotFound mapping for misses. +func TestJetStreamStoreNotFound(t *testing.T) { + s, _, _ := newKVStore(t) + if _, err := s.GetRoom("nope"); !errors.Is(err, ErrNotFound) { + t.Fatalf("GetRoom miss want ErrNotFound, got %v", err) + } + if _, err := s.GetMember("nope", "x"); !errors.Is(err, ErrNotFound) { + t.Fatalf("GetMember miss want ErrNotFound, got %v", err) + } + if _, _, err := s.GetSealedKey("nope", "x", 1); !errors.Is(err, ErrNotFound) { + t.Fatalf("GetSealedKey miss want ErrNotFound, got %v", err) + } + if _, _, err := s.GetSealedKey("nope", "x", 0); !errors.Is(err, ErrNotFound) { + t.Fatalf("GetSealedKey latest miss want ErrNotFound, got %v", err) + } + if _, err := s.GetUser("ffff"); !errors.Is(err, ErrNotFound) { + t.Fatalf("GetUser miss want ErrNotFound, got %v", err) + } +} + +// TestJetStreamStoreIsAuthorizedFailClosed is the error path mandated by the +// issue: when the KV backend is unavailable (here the NATS server is shut down), +// IsAuthorized must DENY, never admit. 
A previously-authorized identity flips to +// unauthorized once the backend cannot be reached. +func TestJetStreamStoreIsAuthorizedFailClosed(t *testing.T) { + s, ns, nc := newKVStore(t) + + const aliceHex = "abcd" + if err := s.AddUser(aliceHex, "alice", RoleAdmin); err != nil { + t.Fatalf("AddUser: %v", err) + } + if !s.IsAuthorized(aliceHex) { + t.Fatalf("alice should be authorized while the backend is up") + } + + // Take the KV backend away: close the client and stop the server. Every + // subsequent KV Get fails, and the store must fail closed. + nc.Close() + ns.Shutdown() + ns.WaitForShutdown() + + // Bound the assertion: IsAuthorized internally caps each op at OpTimeout, so + // this returns well before the test deadline. + done := make(chan bool, 1) + go func() { done <- s.IsAuthorized(aliceHex) }() + select { + case authorized := <-done: + if authorized { + t.Fatalf("KV backend down but IsAuthorized returned true: NOT fail-closed") + } + case <-time.After(10 * time.Second): + t.Fatalf("IsAuthorized hung when the backend was down (no bounded timeout)") + } + + // HasAdmin is likewise conservative: backend down -> false (gates stay closed). + if s.HasAdmin() { + t.Fatalf("KV backend down but HasAdmin returned true: NOT fail-closed") + } +} diff --git a/pkg/membership/server.go b/pkg/membership/server.go index c5a2724..75136cf 100644 --- a/pkg/membership/server.go +++ b/pkg/membership/server.go @@ -3,7 +3,6 @@ package membership import ( "bytes" "context" - "database/sql" "encoding/json" "errors" "fmt" @@ -56,7 +55,7 @@ const ( // rate limiting, and read endpoints (GET) are unauthenticated. Hardening // (mTLS, capabilities, rate limits) is a later phase. type Server struct { - store *Store + store Store blobs *blobstore.Store mux *http.ServeMux authMode AuthMode @@ -79,7 +78,7 @@ type Server struct { // tests that have not migrated to signed requests yet). 
It installs a per-IP // rate limiter with the package defaults; loopback dev behavior is unchanged // because the burst comfortably exceeds any single client's request rate. -func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server { +func NewServer(store Store, blobs *blobstore.Store, authMode AuthMode) *Server { s := &Server{ store: store, blobs: blobs, @@ -456,7 +455,7 @@ func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) { } ep, sealed, err := s.store.GetSealedKey(roomID, endpoint, epoch) if err != nil { - if errors.Is(err, sql.ErrNoRows) { + if errors.Is(err, ErrNotFound) { writeErr(w, http.StatusForbidden, "not invited to this encrypted room: no key has been sealed for your identity. Ask the room owner to invite you before joining.") return diff --git a/pkg/membership/store.go b/pkg/membership/store.go index 6e4a7e2..fb57257 100644 --- a/pkg/membership/store.go +++ b/pkg/membership/store.go @@ -13,6 +13,7 @@ package membership import ( "database/sql" "embed" + "errors" "fmt" "io/fs" "sort" @@ -26,6 +27,14 @@ import ( //go:embed migrations/*.sql var migrationsFS embed.FS +// ErrNotFound is the store-agnostic "no such record" sentinel. Both backends +// (SQLite and JetStream KV) return it, wrapped, when a lookup misses, so callers +// distinguish "not invited / no key yet" from a genuine backend failure without +// depending on a specific driver's error (the SQLite store maps sql.ErrNoRows to +// it; the KV store maps a missing key to it). This is what lets the control +// plane stay storage-agnostic under the branch-by-abstraction of issue 0003b. +var ErrNotFound = errors.New("membership: not found") + // Member is a participant of a room with their published public keys. type Member struct { Endpoint string `json:"endpoint"` @@ -45,14 +54,58 @@ type RoomInfo struct { OwnerEndpoint string } -// Store is the SQLite-backed membership/key store. 
-type Store struct { +// Store is the membership/key control-plane store: the authoritative source of +// room metadata, the member directory, per-epoch sealed room keys, and the bus +// user allowlist. It is an interface (branch-by-abstraction, issue 0003b) with +// two implementations: sqliteStore (the default, single-node, local SQLite) and +// jetstreamStore (rooms/members/keys/users on replicated JetStream KV, selected +// when the `decentralized` flag is on). Every lookup miss returns ErrNotFound +// (wrapped); every implementation MUST fail closed (IsAuthorized returns false +// on any backend error), so a KV quorum loss denies rather than admits. +type Store interface { + // Rooms / members / keys. + CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error + GetRoom(roomID string) (RoomInfo, error) + AddMember(roomID string, m Member, epoch int, sealedKey []byte) error + GetMember(roomID, endpoint string) (Member, error) + ListMembers(roomID string) ([]Member, error) + ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) + GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) + PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error + BumpEpoch(roomID string, newEpoch int) error + RemoveMember(roomID, endpoint string) error + + // Users (the bus allowlist). + AddUser(signPub, handle, role string) error + GetUser(signPub string) (User, error) + ListUsers() ([]User, error) + RevokeUser(signPub string) error + IsAuthorized(signPub string) bool + HasAdmin() bool + + // Lifecycle. + Close() error +} + +// sqliteStore is the SQLite-backed implementation of Store (the default, +// single-node backend). It stays the production default while the +// `decentralized` flag is off. +type sqliteStore struct { db *sql.DB } -// Open opens (creating if needed) the SQLite database at path and applies all -// embedded migrations idempotently. 
-func Open(path string) (*Store, error) { +// Open opens (creating if needed) the SQLite database at path, applies all +// embedded migrations idempotently, and returns it as a Store. It remains the +// default control-plane backend; the JetStream KV store is opened separately +// (OpenJetStream) when decentralization is enabled. +func Open(path string) (Store, error) { + return openSQLite(path) +} + +// openSQLite is the concrete constructor, returning *sqliteStore so internal +// callers (e.g. the SQLite->KV migration) can use SQLite-specific helpers that +// are not part of the storage-agnostic Store interface. +func openSQLite(path string) (*sqliteStore, error) { // _pragma busy_timeout avoids spurious "database is locked" under concurrent // HTTP handlers; foreign_keys kept off — we manage referential integrity in code. dsn := fmt.Sprintf("file:%s?_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)", path) @@ -64,7 +117,7 @@ func Open(path string) (*Store, error) { db.Close() return nil, fmt.Errorf("membership: ping db: %w", err) } - s := &Store{db: db} + s := &sqliteStore{db: db} if err := s.applyMigrations(); err != nil { db.Close() return nil, err @@ -73,11 +126,11 @@ func Open(path string) (*Store, error) { } // Close closes the underlying database. -func (s *Store) Close() error { return s.db.Close() } +func (s *sqliteStore) Close() error { return s.db.Close() } // applyMigrations runs every embedded migration in lexical order, tolerating // the "already applied" errors that SQLite's non-idempotent DDL produces. 
-func (s *Store) applyMigrations() error { +func (s *sqliteStore) applyMigrations() error { files, err := fs.Glob(migrationsFS, "migrations/*.sql") if err != nil { return fmt.Errorf("membership: glob migrations: %w", err) @@ -103,7 +156,7 @@ func nowRFC3339() string { return time.Now().UTC().Format(time.RFC3339Nano) } // CreateRoom inserts a room at epoch 1, registers the owner as a member with // role "owner", and stores the owner's sealed key for epoch 1. Idempotent // inserts are not used: a duplicate room_id returns an error. -func (s *Store) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error { +func (s *sqliteStore) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error { tx, err := s.db.Begin() if err != nil { return fmt.Errorf("membership: begin: %w", err) @@ -142,7 +195,7 @@ func (s *Store) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealed } // GetRoom returns room metadata (including current epoch). -func (s *Store) GetRoom(roomID string) (RoomInfo, error) { +func (s *sqliteStore) GetRoom(roomID string) (RoomInfo, error) { var info RoomInfo var enc, per, sgn int err := s.db.QueryRow( @@ -158,7 +211,7 @@ func (s *Store) GetRoom(roomID string) (RoomInfo, error) { // AddMember inserts a member at the given role and stores their sealed key for // the supplied epoch. -func (s *Store) AddMember(roomID string, m Member, epoch int, sealedKey []byte) error { +func (s *sqliteStore) AddMember(roomID string, m Member, epoch int, sealedKey []byte) error { tx, err := s.db.Begin() if err != nil { return fmt.Errorf("membership: begin: %w", err) @@ -185,7 +238,7 @@ func (s *Store) AddMember(roomID string, m Member, epoch int, sealedKey []byte) } // GetMember returns a single member of a room. 
-func (s *Store) GetMember(roomID, endpoint string) (Member, error) { +func (s *sqliteStore) GetMember(roomID, endpoint string) (Member, error) { var m Member err := s.db.QueryRow( `SELECT endpoint, role, sign_pub, kex_pub FROM members WHERE room_id = ? AND endpoint = ?`, @@ -198,7 +251,7 @@ func (s *Store) GetMember(roomID, endpoint string) (Member, error) { } // ListMembers returns all members of a room ordered by endpoint. -func (s *Store) ListMembers(roomID string) ([]Member, error) { +func (s *sqliteStore) ListMembers(roomID string) ([]Member, error) { rows, err := s.db.Query( `SELECT endpoint, role, sign_pub, kex_pub FROM members WHERE room_id = ? ORDER BY endpoint`, roomID, @@ -230,7 +283,7 @@ type RoomMembership struct { // ListRoomsForEndpoint returns every room the given endpoint is a member of, // with the room's current metadata and the endpoint's role, ordered by room id. // An endpoint that is in no rooms yields an empty slice (not an error). -func (s *Store) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) { +func (s *sqliteStore) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) { rows, err := s.db.Query( `SELECT r.room_id, r.subject, r.key_epoch, r.encrypt, r.persist, r.sign_msgs, r.owner_endpoint, m.role FROM members m JOIN rooms r ON r.room_id = m.room_id @@ -257,7 +310,7 @@ func (s *Store) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) // GetSealedKey returns the sealed room key for an endpoint at a given epoch. // If epoch <= 0, the latest epoch for that endpoint is returned. 
-func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) { +func (s *sqliteStore) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) { var ep int var sealed []byte var err error @@ -275,6 +328,12 @@ func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, e ).Scan(&ep, &sealed) } if err != nil { + // Map "no such row" to the store-agnostic sentinel so the control plane + // can tell "not invited / no key yet" (-> 403 with a helpful message) from + // a genuine backend failure, the same way the KV store will. + if errors.Is(err, sql.ErrNoRows) { + return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, ErrNotFound) + } return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err) } return ep, sealed, nil @@ -282,7 +341,7 @@ func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, e // PutSealedKeys stores a batch of sealed keys for the given epoch (endpoint -> // sealed bytes), upserting on conflict so a rekey can overwrite stale entries. -func (s *Store) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error { +func (s *sqliteStore) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error { tx, err := s.db.Begin() if err != nil { return fmt.Errorf("membership: begin: %w", err) @@ -301,7 +360,7 @@ func (s *Store) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) } // BumpEpoch sets the room's current key_epoch to newEpoch. -func (s *Store) BumpEpoch(roomID string, newEpoch int) error { +func (s *sqliteStore) BumpEpoch(roomID string, newEpoch int) error { if _, err := s.db.Exec(`UPDATE rooms SET key_epoch = ? 
WHERE room_id = ?`, newEpoch, roomID); err != nil { return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err) } @@ -310,7 +369,7 @@ func (s *Store) BumpEpoch(roomID string, newEpoch int) error { // RemoveMember deletes a member from a room. Their sealed keys for past epochs // are left intact (they encrypt only data that member could already read). -func (s *Store) RemoveMember(roomID, endpoint string) error { +func (s *sqliteStore) RemoveMember(roomID, endpoint string) error { if _, err := s.db.Exec(`DELETE FROM members WHERE room_id = ? AND endpoint = ?`, roomID, endpoint); err != nil { return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err) } diff --git a/pkg/membership/store_test.go b/pkg/membership/store_test.go index 2d324cc..83c2c0f 100644 --- a/pkg/membership/store_test.go +++ b/pkg/membership/store_test.go @@ -6,10 +6,10 @@ import ( "testing" ) -func openTestStore(t *testing.T) *Store { +func openTestStore(t *testing.T) *sqliteStore { t.Helper() path := filepath.Join(t.TempDir(), "test.db") - s, err := Open(path) + s, err := openSQLite(path) if err != nil { t.Fatalf("Open: %v", err) } diff --git a/pkg/membership/users.go b/pkg/membership/users.go index 7aeb1c5..3a65597 100644 --- a/pkg/membership/users.go +++ b/pkg/membership/users.go @@ -45,7 +45,7 @@ func normalizeSignPub(signPub string) string { // AddUser inserts a new bus user. role defaults to RoleMember when empty. It // returns ErrUserExists if the sign_pub is already registered (the caller may // choose to revoke+re-add or ignore). handle and signPub must be non-empty. 
-func (s *Store) AddUser(signPub, handle, role string) error { +func (s *sqliteStore) AddUser(signPub, handle, role string) error { signPub = normalizeSignPub(signPub) if signPub == "" || handle == "" { return fmt.Errorf("membership: AddUser: sign_pub and handle required") @@ -74,7 +74,7 @@ func (s *Store) AddUser(signPub, handle, role string) error { // GetUser returns the user with the given signing public key. It returns // sql.ErrNoRows (wrapped) when there is no such user. -func (s *Store) GetUser(signPub string) (User, error) { +func (s *sqliteStore) GetUser(signPub string) (User, error) { signPub = normalizeSignPub(signPub) var u User var revoked sql.NullString @@ -90,7 +90,7 @@ func (s *Store) GetUser(signPub string) (User, error) { } // ListUsers returns every user ordered by handle then sign_pub (stable output). -func (s *Store) ListUsers() ([]User, error) { +func (s *sqliteStore) ListUsers() ([]User, error) { rows, err := s.db.Query( `SELECT sign_pub, handle, role, status, created_at, revoked_at FROM users ORDER BY handle, sign_pub`, ) @@ -116,7 +116,7 @@ func (s *Store) ListUsers() ([]User, error) { // status flip (not a delete) so the identity stays auditable and IsAuthorized // immediately denies it on both planes. Revoking an unknown or already-revoked // user returns an error / is a no-op respectively. -func (s *Store) RevokeUser(signPub string) error { +func (s *sqliteStore) RevokeUser(signPub string) error { signPub = normalizeSignPub(signPub) res, err := s.db.Exec( `UPDATE users SET status = ?, revoked_at = ? WHERE sign_pub = ? AND status = ?`, @@ -140,7 +140,7 @@ func (s *Store) RevokeUser(signPub string) error { // plane (HTTP request middleware) and the data plane (NATS nkey authenticator), // so revoking a user denies access on both without restarting anything. An // unknown key, a revoked key, or any query error all yield false (fail closed). 
-func (s *Store) IsAuthorized(signPub string) bool { +func (s *sqliteStore) IsAuthorized(signPub string) bool { signPub = normalizeSignPub(signPub) if signPub == "" { return false @@ -155,7 +155,7 @@ func (s *Store) IsAuthorized(signPub string) bool { // HasAdmin reports whether at least one active admin exists. The control plane // uses it to gate user-management endpoints: until the host operator seeds the // first admin via the local CLI, those endpoints stay closed (chicken-egg). -func (s *Store) HasAdmin() bool { +func (s *sqliteStore) HasAdmin() bool { var one int err := s.db.QueryRow( `SELECT 1 FROM users WHERE role = ? AND status = ? LIMIT 1`, RoleAdmin, StatusActive,