package membership // jetstreamStore is the JetStream KV implementation of Store (issue 0003b): the // control-plane state (rooms, members, sealed room keys, the user allowlist) // lives in replicated JetStream Key/Value buckets instead of a process-local // SQLite file. Any node in the cluster reads and writes the same buckets, and // JetStream's RAFT layer keeps them consistent across replicas, so the HTTP // control plane becomes effectively stateless: any membershipd can serve any // request. It is selected only when the `decentralized` flag is on; sqliteStore // stays the default. // // Key layout (every path segment is a single KV token — ULIDs, RawURL endpoint // ids and lowercase-hex keys never contain a '.', so '.' is a safe separator and // a ".*" watch enumerates exactly one trailing token): // // rooms roomID -> RoomInfo (JSON) // members roomID.endpoint -> Member (JSON, carries Role) // rooms_by_member endpoint.roomID -> role (reverse index for ListRoomsForEndpoint) // room_keys roomID.endpoint.epoch -> sealed_key bytes // users signPubHex -> User (JSON) // // Consistency caveat: KV has no multi-key transaction, so a multi-write op // (CreateRoom, AddMember) is a short sequence of single-key writes. The order is // chosen so a partial failure leaves a recoverable state (the room/member row // before its reverse index or sealed key), and writes are idempotent (Put // overwrites), which is also what makes the SQLite->KV migration (0003c) safe to // re-run. // // Fail-closed: every read uses a bounded context, and IsAuthorized/HasAdmin // return false on ANY backend error (a KV quorum loss or timeout denies access // rather than admitting it), mirroring the SQLite store's behavior. import ( "context" "encoding/json" "errors" "fmt" "sort" "strconv" "strings" "time" "github.com/nats-io/nats.go/jetstream" ) // Bucket names (alphanumeric/dash/underscore only — no dots, per KV rules). const ( bucketRooms = "UNIBUS_rooms" bucketMembers = "UNIBUS_members" bucketByMember = "UNIBUS_rooms_by_member" bucketRoomKeys = "UNIBUS_room_keys" bucketUsers = "UNIBUS_users" defaultKVOpTime = 5 * time.Second ) // JetStreamConfig configures the KV-backed store. type JetStreamConfig struct { // Replicas is the per-bucket replication factor (R1..R5). Use 1 for a single // node or a 1-2 node rollout, 3 for real HA (quorum 2/3). Scaling R1->R3 in // place is an operational step (nats kv update) done when the third node // joins; it does not require reopening the store. Replicas int // OpTimeout bounds every KV operation so a stalled backend fails closed // instead of hanging a request. Zero uses defaultKVOpTime. OpTimeout time.Duration } type jetstreamStore struct { rooms jetstream.KeyValue members jetstream.KeyValue byMember jetstream.KeyValue keys jetstream.KeyValue users jetstream.KeyValue opTimeout time.Duration } // OpenJetStream creates (or opens) the five KV buckets on js with the configured // replication factor and returns a Store backed by them. The JetStream context // belongs to the caller (it owns the NATS connection); Close is a no-op. func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) { if cfg.Replicas <= 0 { cfg.Replicas = 1 } opTimeout := cfg.OpTimeout if opTimeout <= 0 { opTimeout = defaultKVOpTime } // Bootstrap budget for creating/opening the buckets. On a single node JetStream // is ready the instant the server starts, so the first attempt succeeds. On a // COLD multi-node cluster the JetStream meta-group must first elect a leader and // each node must establish contact with it before its $JS.API responds. A KV // op is a NATS request/reply: if it is published before the node's JetStream is // ready the request is dropped (not queued), and a single long-context call then // just blocks until it times out (issue 0006g). So we RETRY each bucket op with // short per-attempt contexts until it succeeds or the overall bootstrap budget // is exhausted; once the cluster is ready the next retry lands and the buckets // are created, after which they persist and every node opens them quickly. bootstrapBudget := 120 * time.Second deadline := time.Now().Add(bootstrapBudget) s := &jetstreamStore{opTimeout: opTimeout} for _, b := range []struct { name string dst *jetstream.KeyValue }{ {bucketRooms, &s.rooms}, {bucketMembers, &s.members}, {bucketByMember, &s.byMember}, {bucketRoomKeys, &s.keys}, {bucketUsers, &s.users}, } { var kv jetstream.KeyValue var lastErr error for { opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{ Bucket: b.name, Replicas: cfg.Replicas, History: 1, Storage: jetstream.FileStorage, }) cancel() if lastErr == nil { break } if time.Now().After(deadline) { return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr) } // JetStream not ready yet (no meta leader / request dropped). Wait and // re-publish the op; in a cluster cold start this lands once the meta // group settles. time.Sleep(1 * time.Second) } *b.dst = kv } return s, nil } // Close releases nothing: the JetStream context and NATS connection are owned by // the caller, which closes them on shutdown. func (s *jetstreamStore) Close() error { return nil } func (s *jetstreamStore) ctx() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), s.opTimeout) } // ---- key helpers ---------------------------------------------------------- func memberKey(roomID, endpoint string) string { return roomID + "." + endpoint } func byMemberKey(endpoint, roomID string) string { return endpoint + "." + roomID } func sealedKey(roomID, endpoint string, e int) string { return roomID + "." + endpoint + "." + strconv.Itoa(e) } // watchEntries collects every current entry whose key matches pattern (a KV // watch with a ".*" wildcard), draining the watcher until the nil marker // that signals "all initial values delivered". Tombstones are skipped. func (s *jetstreamStore) watchEntries(kv jetstream.KeyValue, pattern string) ([]jetstream.KeyValueEntry, error) { ctx, cancel := s.ctx() defer cancel() w, err := kv.Watch(ctx, pattern, jetstream.IgnoreDeletes()) if err != nil { return nil, err } defer w.Stop() var out []jetstream.KeyValueEntry for { select { case e := <-w.Updates(): if e == nil { return out, nil // initial snapshot complete } out = append(out, e) case <-ctx.Done(): return nil, ctx.Err() } } } // ---- rooms / members / keys ---------------------------------------------- func (s *jetstreamStore) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error { ctx, cancel := s.ctx() defer cancel() info.Epoch = 1 roomJSON, err := json.Marshal(info) if err != nil { return fmt.Errorf("membership: marshal room: %w", err) } // Create (not Put) so a duplicate room id is rejected, matching SQLite's // PRIMARY KEY behavior. if _, err := s.rooms.Create(ctx, info.RoomID, roomJSON); err != nil { if errors.Is(err, jetstream.ErrKeyExists) { return fmt.Errorf("membership: room %q already exists", info.RoomID) } return fmt.Errorf("membership: create room: %w", err) } owner := Member{Endpoint: info.OwnerEndpoint, Role: "owner", SignPub: ownerSignPub, KexPub: ownerKexPub} if err := s.putMember(ctx, info.RoomID, owner); err != nil { return err } if info.Encrypt { if _, err := s.keys.Put(ctx, sealedKey(info.RoomID, info.OwnerEndpoint, 1), ownerSealedKey); err != nil { return fmt.Errorf("membership: put owner key: %w", err) } } return nil } // putMember writes the member row and its reverse index together. func (s *jetstreamStore) putMember(ctx context.Context, roomID string, m Member) error { mb, err := json.Marshal(m) if err != nil { return fmt.Errorf("membership: marshal member: %w", err) } if _, err := s.members.Put(ctx, memberKey(roomID, m.Endpoint), mb); err != nil { return fmt.Errorf("membership: put member: %w", err) } if _, err := s.byMember.Put(ctx, byMemberKey(m.Endpoint, roomID), []byte(m.Role)); err != nil { return fmt.Errorf("membership: put member index: %w", err) } return nil } func (s *jetstreamStore) GetRoom(roomID string) (RoomInfo, error) { ctx, cancel := s.ctx() defer cancel() e, err := s.rooms.Get(ctx, roomID) if err != nil { if errors.Is(err, jetstream.ErrKeyNotFound) { return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, ErrNotFound) } return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, err) } var info RoomInfo if err := json.Unmarshal(e.Value(), &info); err != nil { return RoomInfo{}, fmt.Errorf("membership: unmarshal room %q: %w", roomID, err) } return info, nil } func (s *jetstreamStore) AddMember(roomID string, m Member, epoch int, sealedKeyBytes []byte) error { ctx, cancel := s.ctx() defer cancel() if err := s.putMember(ctx, roomID, m); err != nil { return err } if len(sealedKeyBytes) > 0 { if _, err := s.keys.Put(ctx, sealedKey(roomID, m.Endpoint, epoch), sealedKeyBytes); err != nil { return fmt.Errorf("membership: put member key: %w", err) } } return nil } func (s *jetstreamStore) GetMember(roomID, endpoint string) (Member, error) { ctx, cancel := s.ctx() defer cancel() e, err := s.members.Get(ctx, memberKey(roomID, endpoint)) if err != nil { if errors.Is(err, jetstream.ErrKeyNotFound) { return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, ErrNotFound) } return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, err) } var m Member if err := json.Unmarshal(e.Value(), &m); err != nil { return Member{}, fmt.Errorf("membership: unmarshal member: %w", err) } return m, nil } func (s *jetstreamStore) ListMembers(roomID string) ([]Member, error) { entries, err := s.watchEntries(s.members, roomID+".*") if err != nil { return nil, fmt.Errorf("membership: list members %q: %w", roomID, err) } out := make([]Member, 0, len(entries)) for _, e := range entries { var m Member if err := json.Unmarshal(e.Value(), &m); err != nil { return nil, fmt.Errorf("membership: unmarshal member: %w", err) } out = append(out, m) } sort.Slice(out, func(i, j int) bool { return out[i].Endpoint < out[j].Endpoint }) return out, nil } func (s *jetstreamStore) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) { entries, err := s.watchEntries(s.byMember, endpoint+".*") if err != nil { return nil, fmt.Errorf("membership: list rooms for endpoint %q: %w", endpoint, err) } out := make([]RoomMembership, 0, len(entries)) for _, e := range entries { // Key is "."; the roomID is everything after the dot. roomID := e.Key()[len(endpoint)+1:] info, err := s.GetRoom(roomID) if err != nil { if errors.Is(err, ErrNotFound) { continue // index points at a removed room: skip, stay consistent } return nil, err } out = append(out, RoomMembership{RoomInfo: info, Role: string(e.Value())}) } sort.Slice(out, func(i, j int) bool { return out[i].RoomID < out[j].RoomID }) return out, nil } func (s *jetstreamStore) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) { if epoch > 0 { ctx, cancel := s.ctx() defer cancel() e, err := s.keys.Get(ctx, sealedKey(roomID, endpoint, epoch)) if err != nil { if errors.Is(err, jetstream.ErrKeyNotFound) { return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, ErrNotFound) } return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err) } return epoch, e.Value(), nil } // epoch <= 0: latest. Enumerate "..*" and take the max. entries, err := s.watchEntries(s.keys, roomID+"."+endpoint+".*") if err != nil { return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, err) } bestEpoch, bestVal := -1, []byte(nil) for _, e := range entries { k := e.Key() ep, perr := strconv.Atoi(k[len(roomID)+1+len(endpoint)+1:]) if perr != nil { continue } if ep > bestEpoch { bestEpoch, bestVal = ep, e.Value() } } if bestEpoch < 0 { return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, ErrNotFound) } return bestEpoch, bestVal, nil } func (s *jetstreamStore) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error { ctx, cancel := s.ctx() defer cancel() for endpoint, sealed := range keys { if _, err := s.keys.Put(ctx, sealedKey(roomID, endpoint, epoch), sealed); err != nil { return fmt.Errorf("membership: put sealed key for %q: %w", endpoint, err) } } return nil } func (s *jetstreamStore) BumpEpoch(roomID string, newEpoch int) error { // Read-modify-write the room's epoch. The control plane serializes rekeys per // room (owner-signed), so the lost-update window is not exercised in practice. info, err := s.GetRoom(roomID) if err != nil { return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err) } info.Epoch = newEpoch b, err := json.Marshal(info) if err != nil { return fmt.Errorf("membership: marshal room: %w", err) } ctx, cancel := s.ctx() defer cancel() if _, err := s.rooms.Put(ctx, roomID, b); err != nil { return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err) } return nil } func (s *jetstreamStore) RemoveMember(roomID, endpoint string) error { ctx, cancel := s.ctx() defer cancel() // Drop the member row and its reverse index. Past-epoch sealed keys are left // intact (they only decrypt data the member could already read), matching the // SQLite store. if err := s.members.Delete(ctx, memberKey(roomID, endpoint)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) { return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err) } if err := s.byMember.Delete(ctx, byMemberKey(endpoint, roomID)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) { return fmt.Errorf("membership: remove member index %q/%q: %w", roomID, endpoint, err) } return nil } // ---- users (the bus allowlist) ------------------------------------------- func (s *jetstreamStore) AddUser(signPub, handle, role string) error { signPub = normalizeSignPub(signPub) if signPub == "" || handle == "" { return fmt.Errorf("membership: AddUser: sign_pub and handle required") } if role == "" { role = RoleMember } if role != RoleAdmin && role != RoleMember { return fmt.Errorf("membership: AddUser: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember) } u := User{SignPub: signPub, Handle: handle, Role: role, Status: StatusActive, CreatedAt: nowRFC3339()} b, err := json.Marshal(u) if err != nil { return fmt.Errorf("membership: marshal user: %w", err) } ctx, cancel := s.ctx() defer cancel() if _, err := s.users.Create(ctx, signPub, b); err != nil { if errors.Is(err, jetstream.ErrKeyExists) { return ErrUserExists } return fmt.Errorf("membership: insert user: %w", err) } return nil } func (s *jetstreamStore) GetUser(signPub string) (User, error) { signPub = normalizeSignPub(signPub) ctx, cancel := s.ctx() defer cancel() e, err := s.users.Get(ctx, signPub) if err != nil { if errors.Is(err, jetstream.ErrKeyNotFound) { return User{}, fmt.Errorf("membership: get user %q: %w", signPub, ErrNotFound) } return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err) } var u User if err := json.Unmarshal(e.Value(), &u); err != nil { return User{}, fmt.Errorf("membership: unmarshal user: %w", err) } return u, nil } func (s *jetstreamStore) ListUsers() ([]User, error) { ctx, cancel := s.ctx() w, err := s.users.WatchAll(ctx, jetstream.IgnoreDeletes()) if err != nil { cancel() return nil, fmt.Errorf("membership: list users: %w", err) } defer cancel() defer w.Stop() var out []User for { select { case e := <-w.Updates(): if e == nil { sort.Slice(out, func(i, j int) bool { if out[i].Handle != out[j].Handle { return out[i].Handle < out[j].Handle } return out[i].SignPub < out[j].SignPub }) return out, nil } var u User if err := json.Unmarshal(e.Value(), &u); err != nil { return nil, fmt.Errorf("membership: unmarshal user: %w", err) } out = append(out, u) case <-ctx.Done(): return nil, ctx.Err() } } } func (s *jetstreamStore) RevokeUser(signPub string) error { signPub = normalizeSignPub(signPub) u, err := s.GetUser(signPub) if err != nil { if errors.Is(err, ErrNotFound) { return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub) } return fmt.Errorf("membership: revoke user %q: %w", signPub, err) } if u.Status != StatusActive { return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub) } u.Status = StatusRevoked u.RevokedAt = nowRFC3339() b, err := json.Marshal(u) if err != nil { return fmt.Errorf("membership: marshal user: %w", err) } ctx, cancel := s.ctx() defer cancel() if _, err := s.users.Put(ctx, signPub, b); err != nil { return fmt.Errorf("membership: revoke user %q: %w", signPub, err) } return nil } // IsAuthorized reports whether signPub is an active bus user. Any backend error // (including a KV quorum loss or timeout) yields false: fail closed. func (s *jetstreamStore) IsAuthorized(signPub string) bool { signPub = normalizeSignPub(signPub) if signPub == "" { return false } ctx, cancel := s.ctx() defer cancel() e, err := s.users.Get(ctx, signPub) if err != nil { return false } var u User if err := json.Unmarshal(e.Value(), &u); err != nil { return false } return u.Status == StatusActive } // HasAdmin reports whether at least one active admin exists. On any backend // error it returns false, keeping the admin-gated endpoints closed (conservative). func (s *jetstreamStore) HasAdmin() bool { users, err := s.ListUsers() if err != nil { return false } for _, u := range users { if u.Role == RoleAdmin && u.Status == StatusActive { return true } } return false } // ---- snapshot import / export (issue 0003c migration) --------------------- // importSnapshot writes a full Snapshot into the KV buckets, preserving each // room's epoch and each user's status (Put, not CreateRoom/AddUser, so the exact // state is reproduced rather than reset to defaults). Idempotent: every write is // an overwrite, so re-running the migration converges. func (s *jetstreamStore) importSnapshot(snap *Snapshot) error { ctx, cancel := s.ctx() defer cancel() for _, r := range snap.Rooms { b, err := json.Marshal(r) if err != nil { return fmt.Errorf("import: marshal room %q: %w", r.RoomID, err) } if _, err := s.rooms.Put(ctx, r.RoomID, b); err != nil { return fmt.Errorf("import: put room %q: %w", r.RoomID, err) } } for roomID, members := range snap.Members { for _, m := range members { if err := s.putMember(ctx, roomID, m); err != nil { return fmt.Errorf("import: %w", err) } } } for _, rec := range snap.Keys { if _, err := s.keys.Put(ctx, sealedKey(rec.RoomID, rec.Endpoint, rec.Epoch), rec.Sealed); err != nil { return fmt.Errorf("import: put key %q/%q@%d: %w", rec.RoomID, rec.Endpoint, rec.Epoch, err) } } for _, u := range snap.Users { b, err := json.Marshal(u) if err != nil { return fmt.Errorf("import: marshal user %q: %w", u.SignPub, err) } if _, err := s.users.Put(ctx, normalizeSignPub(u.SignPub), b); err != nil { return fmt.Errorf("import: put user %q: %w", u.SignPub, err) } } return nil } // ExportSnapshot reads the entire KV control-plane state back into a Snapshot, // so the migration's parity test can compare it against the SQLite source. func (s *jetstreamStore) ExportSnapshot() (*Snapshot, error) { snap := &Snapshot{Members: map[string][]Member{}} roomEntries, err := s.watchAll(s.rooms) if err != nil { return nil, fmt.Errorf("export kv: rooms: %w", err) } for _, e := range roomEntries { var r RoomInfo if err := json.Unmarshal(e.Value(), &r); err != nil { return nil, fmt.Errorf("export kv: unmarshal room: %w", err) } snap.Rooms = append(snap.Rooms, r) } memberEntries, err := s.watchAll(s.members) if err != nil { return nil, fmt.Errorf("export kv: members: %w", err) } for _, e := range memberEntries { // Key is "."; neither segment contains a dot. roomID := strings.SplitN(e.Key(), ".", 2)[0] var m Member if err := json.Unmarshal(e.Value(), &m); err != nil { return nil, fmt.Errorf("export kv: unmarshal member: %w", err) } snap.Members[roomID] = append(snap.Members[roomID], m) } keyEntries, err := s.watchAll(s.keys) if err != nil { return nil, fmt.Errorf("export kv: keys: %w", err) } for _, e := range keyEntries { // Key is "..". parts := strings.Split(e.Key(), ".") if len(parts) != 3 { continue } epoch, err := strconv.Atoi(parts[2]) if err != nil { continue } snap.Keys = append(snap.Keys, SealedKeyRecord{RoomID: parts[0], Endpoint: parts[1], Epoch: epoch, Sealed: e.Value()}) } users, err := s.ListUsers() if err != nil { return nil, fmt.Errorf("export kv: users: %w", err) } snap.Users = users return snap, nil } // watchAll collects every current entry of a bucket (no key filter), draining // the watcher to its initial-snapshot nil marker. func (s *jetstreamStore) watchAll(kv jetstream.KeyValue) ([]jetstream.KeyValueEntry, error) { ctx, cancel := s.ctx() defer cancel() w, err := kv.WatchAll(ctx, jetstream.IgnoreDeletes()) if err != nil { return nil, err } defer w.Stop() var out []jetstream.KeyValueEntry for { select { case e := <-w.Updates(): if e == nil { return out, nil } out = append(out, e) case <-ctx.Done(): return nil, ctx.Err() } } }