Files
unibus/pkg/membership/jetstream_store.go
T
agent 9013ea5e33 feat(0003c): membershipd migrate-to-kv (idempotent SQLite -> JetStream KV)
The one-time data move decentralization needs (issue 0003c): copy the
entire control-plane state from the local SQLite database into the
replicated JetStream KV buckets, with a backup taken first.

pkg/membership:
- Snapshot / SealedKeyRecord: a backend-agnostic dump of the whole
  control plane (rooms with their real epoch, members, every sealed-key
  row across epochs, users with status).
- (*sqliteStore).ExportSnapshot and (*jetstreamStore).ExportSnapshot read
  a full Snapshot from each backend; (*jetstreamStore).importSnapshot
  writes one with raw Puts (preserving epoch/status, not resetting to
  defaults) so the migration is faithful and idempotent (every write is
  an overwrite, so re-running converges).
- MigrateSQLiteToKV orchestrates export -> import; BackupSQLite makes a
  consistent copy via SQLite's VACUUM INTO before any migration.

cmd/membershipd:
- `membershipd migrate-to-kv --db <path> --nats-url <url> [--replicas N]
  [--ca <cert>] [--no-backup]` backs up the SQLite file, connects to the
  cluster's NATS, and migrates. Dispatched on the host like `user`.

Tests (DoD: golden + edge + parity):
- TestMigrateSQLiteToKVParity: seed a representative SQLite (two rooms,
  one rekeyed to epoch 2, members, a revoked user); after migration the
  KV ExportSnapshot equals the SQLite ExportSnapshot.
- TestMigrateSQLiteToKVIdempotent: running the migration twice yields the
  same KV state.
- TestBackupSQLiteCreatesConsistentCopy: the backup reopens with
  identical data.
Plus a binary smoke (seed user -> run server -> migrate-to-kv -> re-run):
backup written, 1 user migrated, second run identical.
2026-06-07 15:09:56 +02:00

634 lines
20 KiB
Go

package membership
// jetstreamStore is the JetStream KV implementation of Store (issue 0003b): the
// control-plane state (rooms, members, sealed room keys, the user allowlist)
// lives in replicated JetStream Key/Value buckets instead of a process-local
// SQLite file. Any node in the cluster reads and writes the same buckets, and
// JetStream's RAFT layer keeps them consistent across replicas, so the HTTP
// control plane becomes effectively stateless: any membershipd can serve any
// request. It is selected only when the `decentralized` flag is on; sqliteStore
// stays the default.
//
// Key layout (every path segment is a single KV token — ULIDs, RawURL endpoint
// ids and lowercase-hex keys never contain a '.', so '.' is a safe separator and
// a "<prefix>.*" watch enumerates exactly one trailing token):
//
//   rooms            roomID                 -> RoomInfo (JSON)
//   members          roomID.endpoint        -> Member (JSON, carries Role)
//   rooms_by_member  endpoint.roomID        -> role (reverse index for ListRoomsForEndpoint)
//   room_keys        roomID.endpoint.epoch  -> sealed_key bytes
//   users            signPubHex             -> User (JSON)
//
// Consistency caveat: KV has no multi-key transaction, so a multi-write op
// (CreateRoom, AddMember) is a short sequence of single-key writes. The order is
// chosen so a partial failure leaves a recoverable state (the room/member row
// before its reverse index or sealed key), and writes are idempotent (Put
// overwrites), which is also what makes the SQLite->KV migration (0003c) safe to
// re-run.
//
// Fail-closed: every read uses a bounded context, and IsAuthorized/HasAdmin
// return false on ANY backend error (a KV quorum loss or timeout denies access
// rather than admitting it), mirroring the SQLite store's behavior.
import (
"context"
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/nats-io/nats.go/jetstream"
)
// Names of the five KV buckets that hold the control-plane state. Only
// alphanumerics, dashes and underscores are used (no dots, per KV rules).
const (
	bucketRooms    = "UNIBUS_rooms"
	bucketMembers  = "UNIBUS_members"
	bucketByMember = "UNIBUS_rooms_by_member"
	bucketRoomKeys = "UNIBUS_room_keys"
	bucketUsers    = "UNIBUS_users"
)

// defaultKVOpTime is the operation timeout used when the configuration does
// not supply a positive one.
const defaultKVOpTime = 5 * time.Second
// JetStreamConfig configures the KV-backed store returned by OpenJetStream.
type JetStreamConfig struct {
// Replicas is the per-bucket replication factor (R1..R5). Use 1 for a single
// node or a 1-2 node rollout, 3 for real HA (quorum 2/3). Scaling R1->R3 in
// place is an operational step (nats kv update) done when the third node
// joins; it does not require reopening the store. OpenJetStream treats a
// value <= 0 as 1.
//
// NOTE(review): OpenJetStream hands this value to CreateOrUpdateKeyValue on
// every open, so a stale value here presumably re-applies the old factor to a
// bucket that was scaled out of band — confirm against the nats.go docs.
Replicas int
// OpTimeout bounds every KV operation so a stalled backend fails closed
// instead of hanging a request. OpenJetStream replaces a value <= 0 with
// defaultKVOpTime.
OpTimeout time.Duration
}
// jetstreamStore is the Store implementation returned by OpenJetStream. It
// holds one KV handle per control-plane bucket plus the timeout that ctx()
// applies to the contexts it hands out.
type jetstreamStore struct {
	// Bucket handles, filled in by OpenJetStream in this order: rooms, members,
	// the endpoint->room reverse index, sealed room keys, and the user allowlist.
	rooms, members, byMember, keys, users jetstream.KeyValue

	// opTimeout is the deadline given to every context built by ctx().
	opTimeout time.Duration
}
// OpenJetStream opens the five control-plane KV buckets on js, creating or
// updating each one with file storage, a history depth of 1 and the configured
// replication factor, and returns a Store backed by them.
//
// cfg.Replicas <= 0 is treated as 1 and cfg.OpTimeout <= 0 as defaultKVOpTime.
// All five bucket calls share a single 15-second deadline; the first failure
// aborts the open and is returned wrapped with the bucket name. The JetStream
// context (and the NATS connection behind it) stays owned by the caller, which
// is why Close is a no-op.
func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
	replicas := cfg.Replicas
	if replicas <= 0 {
		replicas = 1
	}
	timeout := cfg.OpTimeout
	if timeout <= 0 {
		timeout = defaultKVOpTime
	}

	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	store := &jetstreamStore{opTimeout: timeout}

	// names[i] is opened into *slots[i]; the two arrays are kept in lockstep.
	names := [...]string{bucketRooms, bucketMembers, bucketByMember, bucketRoomKeys, bucketUsers}
	slots := [...]*jetstream.KeyValue{&store.rooms, &store.members, &store.byMember, &store.keys, &store.users}

	for i, name := range names {
		bucket, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
			Bucket:   name,
			Replicas: replicas,
			History:  1,
			Storage:  jetstream.FileStorage,
		})
		if err != nil {
			return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", name, replicas, err)
		}
		*slots[i] = bucket
	}
	return store, nil
}
// Close is a no-op and always returns nil: the store holds no resources of its
// own, since the JetStream context and NATS connection belong to the caller,
// which closes them on shutdown.
func (s *jetstreamStore) Close() error {
	return nil
}
// ctx returns a fresh background-rooted context that expires s.opTimeout from
// now, together with its cancel function. Callers must call the cancel
// function (normally via defer) to release the timer.
func (s *jetstreamStore) ctx() (context.Context, context.CancelFunc) {
	root := context.Background()
	return context.WithTimeout(root, s.opTimeout)
}
// ---- key helpers ----------------------------------------------------------
// memberKey builds the members-bucket key "<roomID>.<endpoint>".
func memberKey(roomID, endpoint string) string {
	const sep = "."
	return roomID + sep + endpoint
}
// byMemberKey builds the reverse-index key "<endpoint>.<roomID>" — the same two
// segments as memberKey, in the opposite order.
func byMemberKey(endpoint, roomID string) string {
	const sep = "."
	return endpoint + sep + roomID
}
// sealedKey builds the room-keys bucket key "<roomID>.<endpoint>.<epoch>", with
// the epoch e rendered in base 10.
func sealedKey(roomID, endpoint string, e int) string {
	epoch := strconv.Itoa(e)
	return roomID + "." + endpoint + "." + epoch
}
// watchEntries returns every current entry of kv whose key matches pattern (a
// KV watch subject such as "<prefix>.*"). It reads the watcher until the nil
// entry that signals "all initial values delivered"; deleted keys are filtered
// out by jetstream.IgnoreDeletes.
//
// The whole drain is bounded by a single s.ctx() deadline. If the updates
// channel is closed before the nil marker arrives, the listing is incomplete,
// so an error is returned instead of a silently truncated result: a receive
// from a closed channel also yields nil and must not be mistaken for the
// marker.
func (s *jetstreamStore) watchEntries(kv jetstream.KeyValue, pattern string) ([]jetstream.KeyValueEntry, error) {
	ctx, cancel := s.ctx()
	defer cancel()
	w, err := kv.Watch(ctx, pattern, jetstream.IgnoreDeletes())
	if err != nil {
		return nil, err
	}
	defer w.Stop()

	var out []jetstream.KeyValueEntry
	for {
		select {
		case e, ok := <-w.Updates():
			if !ok {
				// Channel closed early: prefer the context error when the
				// deadline is what tore the watcher down.
				if cerr := ctx.Err(); cerr != nil {
					return nil, cerr
				}
				return nil, errors.New("membership: kv watcher closed before the initial snapshot completed")
			}
			if e == nil {
				return out, nil // initial snapshot complete
			}
			out = append(out, e)
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
// ---- rooms / members / keys ----------------------------------------------
// CreateRoom stores a new room at epoch 1 (any epoch set on info is
// overwritten), then records the owner as a member with role "owner" and, for
// encrypted rooms, the owner's sealed key for epoch 1.
//
// The room row is written with Create rather than Put, so an existing room id
// is rejected with an "already exists" error. The three writes are separate
// single-key operations sharing one s.ctx() deadline; a failure part-way
// leaves the earlier writes in place.
func (s *jetstreamStore) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error {
	ctx, cancel := s.ctx()
	defer cancel()

	info.Epoch = 1
	payload, err := json.Marshal(info)
	if err != nil {
		return fmt.Errorf("membership: marshal room: %w", err)
	}

	// Create fails on an existing key, mirroring SQLite's PRIMARY KEY check.
	_, err = s.rooms.Create(ctx, info.RoomID, payload)
	switch {
	case errors.Is(err, jetstream.ErrKeyExists):
		return fmt.Errorf("membership: room %q already exists", info.RoomID)
	case err != nil:
		return fmt.Errorf("membership: create room: %w", err)
	}

	owner := Member{
		Endpoint: info.OwnerEndpoint,
		Role:     "owner",
		SignPub:  ownerSignPub,
		KexPub:   ownerKexPub,
	}
	if err := s.putMember(ctx, info.RoomID, owner); err != nil {
		return err
	}

	if !info.Encrypt {
		return nil
	}
	ownerKey := sealedKey(info.RoomID, info.OwnerEndpoint, 1)
	if _, err := s.keys.Put(ctx, ownerKey, ownerSealedKey); err != nil {
		return fmt.Errorf("membership: put owner key: %w", err)
	}
	return nil
}
// putMember stores m as a JSON row in the members bucket and then writes the
// matching reverse-index entry (endpoint.roomID -> role). Both are plain Puts
// under the caller's ctx; the member row goes first, so if the second write
// fails the row exists without its index entry.
func (s *jetstreamStore) putMember(ctx context.Context, roomID string, m Member) error {
	encoded, err := json.Marshal(m)
	if err != nil {
		return fmt.Errorf("membership: marshal member: %w", err)
	}

	_, err = s.members.Put(ctx, memberKey(roomID, m.Endpoint), encoded)
	if err != nil {
		return fmt.Errorf("membership: put member: %w", err)
	}

	_, err = s.byMember.Put(ctx, byMemberKey(m.Endpoint, roomID), []byte(m.Role))
	if err != nil {
		return fmt.Errorf("membership: put member index: %w", err)
	}
	return nil
}
// GetRoom loads and decodes the room stored under roomID. A missing key is
// reported as an error wrapping ErrNotFound; any other backend or decode
// failure is wrapped with the room id.
func (s *jetstreamStore) GetRoom(roomID string) (RoomInfo, error) {
	ctx, cancel := s.ctx()
	defer cancel()

	entry, err := s.rooms.Get(ctx, roomID)
	switch {
	case errors.Is(err, jetstream.ErrKeyNotFound):
		return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, ErrNotFound)
	case err != nil:
		return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, err)
	}

	var room RoomInfo
	if err := json.Unmarshal(entry.Value(), &room); err != nil {
		return RoomInfo{}, fmt.Errorf("membership: unmarshal room %q: %w", roomID, err)
	}
	return room, nil
}
// AddMember writes m (row plus reverse index) into roomID and, when
// sealedKeyBytes is non-empty, stores it as the member's sealed key for epoch.
// All writes are overwriting Puts under one s.ctx() deadline.
func (s *jetstreamStore) AddMember(roomID string, m Member, epoch int, sealedKeyBytes []byte) error {
	ctx, cancel := s.ctx()
	defer cancel()

	if err := s.putMember(ctx, roomID, m); err != nil {
		return err
	}
	if len(sealedKeyBytes) == 0 {
		return nil // nothing to seal for this member
	}

	key := sealedKey(roomID, m.Endpoint, epoch)
	if _, err := s.keys.Put(ctx, key, sealedKeyBytes); err != nil {
		return fmt.Errorf("membership: put member key: %w", err)
	}
	return nil
}
// GetMember loads and decodes the member row for endpoint in roomID. A missing
// key is reported as an error wrapping ErrNotFound; other backend or decode
// failures are wrapped and returned.
func (s *jetstreamStore) GetMember(roomID, endpoint string) (Member, error) {
	ctx, cancel := s.ctx()
	defer cancel()

	entry, err := s.members.Get(ctx, memberKey(roomID, endpoint))
	switch {
	case errors.Is(err, jetstream.ErrKeyNotFound):
		return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, ErrNotFound)
	case err != nil:
		return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, err)
	}

	var member Member
	if err := json.Unmarshal(entry.Value(), &member); err != nil {
		return Member{}, fmt.Errorf("membership: unmarshal member: %w", err)
	}
	return member, nil
}
// ListMembers returns the members of roomID sorted by Endpoint. It enumerates
// the members bucket with a "<roomID>.*" watch and decodes each row; a single
// undecodable row fails the whole call.
func (s *jetstreamStore) ListMembers(roomID string) ([]Member, error) {
	entries, err := s.watchEntries(s.members, roomID+".*")
	if err != nil {
		return nil, fmt.Errorf("membership: list members %q: %w", roomID, err)
	}

	// Decode straight into the result slots; each slot starts as a zero Member.
	members := make([]Member, len(entries))
	for i, entry := range entries {
		if err := json.Unmarshal(entry.Value(), &members[i]); err != nil {
			return nil, fmt.Errorf("membership: unmarshal member: %w", err)
		}
	}

	sort.Slice(members, func(a, b int) bool {
		return members[a].Endpoint < members[b].Endpoint
	})
	return members, nil
}
// ListRoomsForEndpoint returns the rooms endpoint belongs to, sorted by RoomID,
// each paired with the role stored in the reverse index. It enumerates
// "<endpoint>.*" in the reverse-index bucket and loads each room with GetRoom.
// Index entries whose room no longer exists are skipped; any other GetRoom
// error aborts the listing.
func (s *jetstreamStore) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) {
	entries, err := s.watchEntries(s.byMember, endpoint+".*")
	if err != nil {
		return nil, fmt.Errorf("membership: list rooms for endpoint %q: %w", endpoint, err)
	}

	// Keys look like "<endpoint>.<roomID>": skip the endpoint and the dot.
	skip := len(endpoint) + 1

	rooms := make([]RoomMembership, 0, len(entries))
	for _, entry := range entries {
		info, err := s.GetRoom(entry.Key()[skip:])
		switch {
		case errors.Is(err, ErrNotFound):
			continue // index points at a removed room: skip, stay consistent
		case err != nil:
			return nil, err
		}
		rooms = append(rooms, RoomMembership{RoomInfo: info, Role: string(entry.Value())})
	}

	sort.Slice(rooms, func(a, b int) bool {
		return rooms[a].RoomID < rooms[b].RoomID
	})
	return rooms, nil
}
// GetSealedKey returns the sealed room key stored for endpoint in roomID,
// together with the epoch it belongs to.
//
// With epoch > 0 that exact epoch is fetched. With epoch <= 0 every
// "<roomID>.<endpoint>.*" key is enumerated and the one with the highest
// numeric epoch suffix wins; keys whose suffix is not an integer are ignored.
// When nothing matches, the error wraps ErrNotFound.
func (s *jetstreamStore) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) {
	if epoch <= 0 {
		// Latest: scan all epochs for this member and keep the largest.
		prefix := roomID + "." + endpoint + "."
		entries, err := s.watchEntries(s.keys, prefix+"*")
		if err != nil {
			return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, err)
		}

		latest := -1
		var sealed []byte
		for _, entry := range entries {
			n, convErr := strconv.Atoi(entry.Key()[len(prefix):])
			if convErr != nil || n <= latest {
				continue
			}
			latest, sealed = n, entry.Value()
		}
		if latest < 0 {
			return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, ErrNotFound)
		}
		return latest, sealed, nil
	}

	ctx, cancel := s.ctx()
	defer cancel()

	entry, err := s.keys.Get(ctx, sealedKey(roomID, endpoint, epoch))
	switch {
	case errors.Is(err, jetstream.ErrKeyNotFound):
		return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, ErrNotFound)
	case err != nil:
		return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err)
	}
	return epoch, entry.Value(), nil
}
// PutSealedKeys stores one sealed key per endpoint in keys for the given room
// and epoch. Each entry is an overwriting Put; all of them share one s.ctx()
// deadline and the first failure stops the loop, leaving earlier writes in
// place.
func (s *jetstreamStore) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error {
	ctx, cancel := s.ctx()
	defer cancel()

	for endpoint, sealed := range keys {
		key := sealedKey(roomID, endpoint, epoch)
		_, err := s.keys.Put(ctx, key, sealed)
		if err != nil {
			return fmt.Errorf("membership: put sealed key for %q: %w", endpoint, err)
		}
	}
	return nil
}
// BumpEpoch sets the stored epoch of roomID to newEpoch.
//
// This is an unconditional read-modify-write (GetRoom, then Put with no
// revision check), so two concurrent bumps could overwrite each other. The
// control plane serializes rekeys per room (owner-signed), so that window is
// not exercised in practice.
func (s *jetstreamStore) BumpEpoch(roomID string, newEpoch int) error {
	// Read and write failures are wrapped with the same prefix.
	wrap := func(err error) error {
		return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
	}

	room, err := s.GetRoom(roomID)
	if err != nil {
		return wrap(err)
	}

	room.Epoch = newEpoch
	payload, err := json.Marshal(room)
	if err != nil {
		return fmt.Errorf("membership: marshal room: %w", err)
	}

	ctx, cancel := s.ctx()
	defer cancel()
	if _, err = s.rooms.Put(ctx, roomID, payload); err != nil {
		return wrap(err)
	}
	return nil
}
// RemoveMember deletes the member row for endpoint in roomID and then its
// reverse-index entry. A key that is already absent is not an error. Sealed
// keys from past epochs are deliberately left intact (they only decrypt data
// the member could already read), matching the SQLite store.
func (s *jetstreamStore) RemoveMember(roomID, endpoint string) error {
	ctx, cancel := s.ctx()
	defer cancel()

	// A delete "succeeds" if it worked or if there was nothing to delete.
	tolerable := func(err error) bool {
		return err == nil || errors.Is(err, jetstream.ErrKeyNotFound)
	}

	if err := s.members.Delete(ctx, memberKey(roomID, endpoint)); !tolerable(err) {
		return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err)
	}
	if err := s.byMember.Delete(ctx, byMemberKey(endpoint, roomID)); !tolerable(err) {
		return fmt.Errorf("membership: remove member index %q/%q: %w", roomID, endpoint, err)
	}
	return nil
}
// ---- users (the bus allowlist) -------------------------------------------
// AddUser adds a new active user to the allowlist, keyed by the normalized
// signPub. An empty role defaults to RoleMember; any role other than RoleAdmin
// or RoleMember is rejected, as is an empty handle or a signPub that
// normalizes to "". The row is written with Create, so an existing key yields
// ErrUserExists instead of being overwritten.
func (s *jetstreamStore) AddUser(signPub, handle, role string) error {
	signPub = normalizeSignPub(signPub)
	if signPub == "" || handle == "" {
		return fmt.Errorf("membership: AddUser: sign_pub and handle required")
	}

	if role == "" {
		role = RoleMember
	}
	if known := role == RoleAdmin || role == RoleMember; !known {
		return fmt.Errorf("membership: AddUser: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)
	}

	user := User{
		SignPub:   signPub,
		Handle:    handle,
		Role:      role,
		Status:    StatusActive,
		CreatedAt: nowRFC3339(),
	}
	payload, err := json.Marshal(user)
	if err != nil {
		return fmt.Errorf("membership: marshal user: %w", err)
	}

	ctx, cancel := s.ctx()
	defer cancel()

	_, err = s.users.Create(ctx, signPub, payload)
	switch {
	case errors.Is(err, jetstream.ErrKeyExists):
		return ErrUserExists
	case err != nil:
		return fmt.Errorf("membership: insert user: %w", err)
	}
	return nil
}
// GetUser loads and decodes the user stored under the normalized signPub. A
// missing key is reported as an error wrapping ErrNotFound; other backend or
// decode failures are wrapped and returned.
func (s *jetstreamStore) GetUser(signPub string) (User, error) {
	key := normalizeSignPub(signPub)

	ctx, cancel := s.ctx()
	defer cancel()

	entry, err := s.users.Get(ctx, key)
	switch {
	case errors.Is(err, jetstream.ErrKeyNotFound):
		return User{}, fmt.Errorf("membership: get user %q: %w", key, ErrNotFound)
	case err != nil:
		return User{}, fmt.Errorf("membership: get user %q: %w", key, err)
	}

	var user User
	if err := json.Unmarshal(entry.Value(), &user); err != nil {
		return User{}, fmt.Errorf("membership: unmarshal user: %w", err)
	}
	return user, nil
}
// ListUsers returns every user in the allowlist bucket, sorted by Handle and
// then by SignPub. It watches the whole bucket (deleted keys ignored) and
// decodes entries until the nil "initial values delivered" marker; the drain
// is bounded by one s.ctx() deadline.
//
// If the updates channel is closed before the marker arrives, the listing is
// incomplete and an error is returned. A receive from a closed channel also
// yields nil, and treating it as the marker would hand callers such as
// HasAdmin and ExportSnapshot a silently truncated list.
func (s *jetstreamStore) ListUsers() ([]User, error) {
	ctx, cancel := s.ctx()
	defer cancel()
	w, err := s.users.WatchAll(ctx, jetstream.IgnoreDeletes())
	if err != nil {
		return nil, fmt.Errorf("membership: list users: %w", err)
	}
	defer w.Stop()

	var out []User
drain:
	for {
		select {
		case e, ok := <-w.Updates():
			if !ok {
				// Channel closed early: prefer the context error when the
				// deadline is what tore the watcher down.
				if cerr := ctx.Err(); cerr != nil {
					return nil, cerr
				}
				return nil, errors.New("membership: list users: watcher closed before the initial snapshot completed")
			}
			if e == nil {
				break drain // initial snapshot complete
			}
			var u User
			if err := json.Unmarshal(e.Value(), &u); err != nil {
				return nil, fmt.Errorf("membership: unmarshal user: %w", err)
			}
			out = append(out, u)
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	sort.Slice(out, func(i, j int) bool {
		if out[i].Handle != out[j].Handle {
			return out[i].Handle < out[j].Handle
		}
		return out[i].SignPub < out[j].SignPub
	})
	return out, nil
}
// RevokeUser marks the user stored under the normalized signPub as revoked and
// stamps RevokedAt with the current time. A user that does not exist or is not
// currently active yields the same "no active user with that key" error. The
// update is a read (GetUser) followed by an unconditional Put.
func (s *jetstreamStore) RevokeUser(signPub string) error {
	signPub = normalizeSignPub(signPub)

	user, err := s.GetUser(signPub)
	switch {
	case errors.Is(err, ErrNotFound):
		return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
	case err != nil:
		return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
	case user.Status != StatusActive:
		return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
	}

	user.Status = StatusRevoked
	user.RevokedAt = nowRFC3339()
	payload, err := json.Marshal(user)
	if err != nil {
		return fmt.Errorf("membership: marshal user: %w", err)
	}

	ctx, cancel := s.ctx()
	defer cancel()
	if _, err = s.users.Put(ctx, signPub, payload); err != nil {
		return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
	}
	return nil
}
// IsAuthorized reports whether signPub belongs to a user whose status is
// StatusActive. It fails closed: a key that normalizes to "", a lookup error of
// any kind (missing key, timeout, quorum loss) or an undecodable row all yield
// false.
func (s *jetstreamStore) IsAuthorized(signPub string) bool {
	key := normalizeSignPub(signPub)
	if key == "" {
		return false
	}

	ctx, cancel := s.ctx()
	defer cancel()

	entry, err := s.users.Get(ctx, key)
	if err != nil {
		return false
	}

	var user User
	decoded := json.Unmarshal(entry.Value(), &user) == nil
	return decoded && user.Status == StatusActive
}
// HasAdmin reports whether the allowlist contains at least one user that is
// both RoleAdmin and StatusActive. If the user list cannot be read it returns
// false, which keeps admin-gated endpoints closed (conservative).
func (s *jetstreamStore) HasAdmin() bool {
	users, err := s.ListUsers()
	if err != nil {
		return false
	}
	for i := range users {
		if u := &users[i]; u.Role == RoleAdmin && u.Status == StatusActive {
			return true
		}
	}
	return false
}
// ---- snapshot import / export (issue 0003c migration) ---------------------
// importSnapshot writes a full Snapshot into the KV buckets, preserving each
// room's epoch and each user's status (raw Put, not CreateRoom/AddUser, so the
// exact state is reproduced rather than reset to defaults). Idempotent: every
// write is an overwrite, so re-running the migration converges.
//
// Each room, member, sealed key and user is written under its own s.ctx()
// deadline. A single deadline for the whole import would cap the total
// migration time at one operation timeout, so a large snapshot would time out
// at the same point on every re-run and never complete.
//
// A nil snap is rejected with an error. The first failed write aborts the
// import and is returned wrapped; rows written before it stay in place.
func (s *jetstreamStore) importSnapshot(snap *Snapshot) error {
	if snap == nil {
		return errors.New("import: nil snapshot")
	}

	// put overwrites one key under a fresh per-operation deadline.
	put := func(kv jetstream.KeyValue, key string, val []byte) error {
		ctx, cancel := s.ctx()
		defer cancel()
		_, err := kv.Put(ctx, key, val)
		return err
	}
	// putMember writes one member row plus its reverse index under a fresh
	// per-operation deadline.
	putMember := func(roomID string, m Member) error {
		ctx, cancel := s.ctx()
		defer cancel()
		return s.putMember(ctx, roomID, m)
	}

	for _, r := range snap.Rooms {
		b, err := json.Marshal(r)
		if err != nil {
			return fmt.Errorf("import: marshal room %q: %w", r.RoomID, err)
		}
		if err := put(s.rooms, r.RoomID, b); err != nil {
			return fmt.Errorf("import: put room %q: %w", r.RoomID, err)
		}
	}

	for roomID, members := range snap.Members {
		for _, m := range members {
			if err := putMember(roomID, m); err != nil {
				return fmt.Errorf("import: %w", err)
			}
		}
	}

	for _, rec := range snap.Keys {
		if err := put(s.keys, sealedKey(rec.RoomID, rec.Endpoint, rec.Epoch), rec.Sealed); err != nil {
			return fmt.Errorf("import: put key %q/%q@%d: %w", rec.RoomID, rec.Endpoint, rec.Epoch, err)
		}
	}

	for _, u := range snap.Users {
		b, err := json.Marshal(u)
		if err != nil {
			return fmt.Errorf("import: marshal user %q: %w", u.SignPub, err)
		}
		// Users are keyed by the normalized public key, like AddUser/GetUser.
		if err := put(s.users, normalizeSignPub(u.SignPub), b); err != nil {
			return fmt.Errorf("import: put user %q: %w", u.SignPub, err)
		}
	}
	return nil
}
// ExportSnapshot reads the whole KV control-plane state back into a Snapshot,
// so the migration's parity test can compare it against the SQLite source.
//
// Rooms, members and sealed keys are read with one full-bucket watch each;
// users come from ListUsers. Members are grouped by the room id taken from the
// key's first segment. Sealed-key entries whose key is not exactly
// "<roomID>.<endpoint>.<epoch>" with an integer epoch are skipped. Any read or
// decode failure aborts the export.
func (s *jetstreamStore) ExportSnapshot() (*Snapshot, error) {
	snap := &Snapshot{Members: map[string][]Member{}}

	rooms, err := s.watchAll(s.rooms)
	if err != nil {
		return nil, fmt.Errorf("export kv: rooms: %w", err)
	}
	for _, entry := range rooms {
		var room RoomInfo
		if err := json.Unmarshal(entry.Value(), &room); err != nil {
			return nil, fmt.Errorf("export kv: unmarshal room: %w", err)
		}
		snap.Rooms = append(snap.Rooms, room)
	}

	members, err := s.watchAll(s.members)
	if err != nil {
		return nil, fmt.Errorf("export kv: members: %w", err)
	}
	for _, entry := range members {
		// Key is "<roomID>.<endpoint>"; the room id is everything before the
		// first dot.
		roomID, _, _ := strings.Cut(entry.Key(), ".")
		var member Member
		if err := json.Unmarshal(entry.Value(), &member); err != nil {
			return nil, fmt.Errorf("export kv: unmarshal member: %w", err)
		}
		snap.Members[roomID] = append(snap.Members[roomID], member)
	}

	keys, err := s.watchAll(s.keys)
	if err != nil {
		return nil, fmt.Errorf("export kv: keys: %w", err)
	}
	for _, entry := range keys {
		// Key is "<roomID>.<endpoint>.<epoch>".
		segs := strings.Split(entry.Key(), ".")
		if len(segs) != 3 {
			continue
		}
		epoch, convErr := strconv.Atoi(segs[2])
		if convErr != nil {
			continue
		}
		snap.Keys = append(snap.Keys, SealedKeyRecord{
			RoomID:   segs[0],
			Endpoint: segs[1],
			Epoch:    epoch,
			Sealed:   entry.Value(),
		})
	}

	snap.Users, err = s.ListUsers()
	if err != nil {
		return nil, fmt.Errorf("export kv: users: %w", err)
	}
	return snap, nil
}
// watchAll returns every current entry of kv (no key filter). It reads the
// watcher until the nil entry that signals "all initial values delivered";
// deleted keys are filtered out by jetstream.IgnoreDeletes.
//
// The whole drain is bounded by a single s.ctx() deadline. If the updates
// channel is closed before the nil marker arrives, the listing is incomplete,
// so an error is returned instead of a silently truncated result: a receive
// from a closed channel also yields nil and must not be mistaken for the
// marker.
func (s *jetstreamStore) watchAll(kv jetstream.KeyValue) ([]jetstream.KeyValueEntry, error) {
	ctx, cancel := s.ctx()
	defer cancel()
	w, err := kv.WatchAll(ctx, jetstream.IgnoreDeletes())
	if err != nil {
		return nil, err
	}
	defer w.Stop()

	var out []jetstream.KeyValueEntry
	for {
		select {
		case e, ok := <-w.Updates():
			if !ok {
				// Channel closed early: prefer the context error when the
				// deadline is what tore the watcher down.
				if cerr := ctx.Err(); cerr != nil {
					return nil, cerr
				}
				return nil, errors.New("membership: kv watcher closed before the initial snapshot completed")
			}
			if e == nil {
				return out, nil // initial snapshot complete
			}
			out = append(out, e)
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}