33746d9962
Bringing up the 3-node cluster from clean stores never converged: every node looped on `open KV bucket "UNIBUS_rooms" (replicas=1): context deadline exceeded`. Three independent defects in the clustered bootstrap path, none of which surface on a single node (where JetStream is ready instantly), caused it: 1. embeddednats: route connection pooling (nats-server 2.10 default pool of 3) churned with "duplicate route"/"client closed" reconnects on the small cluster, interrupting the meta-group RAFT heartbeats and forcing perpetual leader re-elections. Set Cluster.PoolSize = -1 (single route per peer). 2. embeddednats: the cluster nodes are Docker hosts, so NATS advertised the docker bridge IPs (172.x / 10.0.x) to peers, which then tried to dial those private, mutually-unreachable addresses. Set Cluster.NoAdvertise = true so only the explicit public-IP routes are used. Also added a UNIBUS_NATS_DEBUG env toggle (off by default) that enables the embedded server's logger and loopback monitoring port for debugging the route/meta layer. 3. membership.OpenJetStream: a KV op is a NATS request/reply; on a cold cluster the op was published once, before the node had contact with the meta leader, so the request was dropped and the single long-context call just blocked until timeout. Retry each bucket op with short per-attempt contexts until it succeeds or an overall bootstrap budget (120s) is exhausted, so it lands once the meta settles. With these the cluster forms cleanly, creates the KV buckets, scales R1->R3 in place, and survives loss of one node (quorum 2/3). Verified on magnus+homer+datardos. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
657 lines
22 KiB
Go
657 lines
22 KiB
Go
package membership
|
|
|
|
// jetstreamStore is the JetStream KV implementation of Store (issue 0003b): the
|
|
// control-plane state (rooms, members, sealed room keys, the user allowlist)
|
|
// lives in replicated JetStream Key/Value buckets instead of a process-local
|
|
// SQLite file. Any node in the cluster reads and writes the same buckets, and
|
|
// JetStream's RAFT layer keeps them consistent across replicas, so the HTTP
|
|
// control plane becomes effectively stateless: any membershipd can serve any
|
|
// request. It is selected only when the `decentralized` flag is on; sqliteStore
|
|
// stays the default.
|
|
//
|
|
// Key layout (every path segment is a single KV token — ULIDs, RawURL endpoint
|
|
// ids and lowercase-hex keys never contain a '.', so '.' is a safe separator and
|
|
// a "<prefix>.*" watch enumerates exactly one trailing token):
|
|
//
|
|
// rooms roomID -> RoomInfo (JSON)
|
|
// members roomID.endpoint -> Member (JSON, carries Role)
|
|
// rooms_by_member endpoint.roomID -> role (reverse index for ListRoomsForEndpoint)
|
|
// room_keys roomID.endpoint.epoch -> sealed_key bytes
|
|
// users signPubHex -> User (JSON)
|
|
//
|
|
// Consistency caveat: KV has no multi-key transaction, so a multi-write op
|
|
// (CreateRoom, AddMember) is a short sequence of single-key writes. The order is
|
|
// chosen so a partial failure leaves a recoverable state (the room/member row
|
|
// before its reverse index or sealed key), and writes are idempotent (Put
|
|
// overwrites), which is also what makes the SQLite->KV migration (0003c) safe to
|
|
// re-run.
|
|
//
|
|
// Fail-closed: every read uses a bounded context, and IsAuthorized/HasAdmin
|
|
// return false on ANY backend error (a KV quorum loss or timeout denies access
|
|
// rather than admitting it), mirroring the SQLite store's behavior.
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
)
|
|
|
|
// Names of the five control-plane KV buckets. KV bucket names are restricted to
// alphanumerics, '-' and '_' — dots are not allowed.
const (
	bucketRooms    = "UNIBUS_rooms"
	bucketMembers  = "UNIBUS_members"
	bucketByMember = "UNIBUS_rooms_by_member"
	bucketRoomKeys = "UNIBUS_room_keys"
	bucketUsers    = "UNIBUS_users"
)

// defaultKVOpTime is the per-operation timeout used when
// JetStreamConfig.OpTimeout is not set to a positive value.
const defaultKVOpTime = 5 * time.Second
|
|
|
|
// JetStreamConfig configures the KV-backed store built by OpenJetStream. The
// zero value is usable: OpenJetStream substitutes 1 for a non-positive Replicas
// and defaultKVOpTime for a non-positive OpTimeout.
type JetStreamConfig struct {
	// Replicas is the per-bucket replication factor (R1..R5). Use 1 for a single
	// node or a 1-2 node rollout, 3 for real HA (quorum 2/3). Scaling R1->R3 in
	// place is an operational step (nats kv update) done when the third node
	// joins; it does not require reopening the store.
	//
	// NOTE(review): OpenJetStream passes this value to CreateOrUpdateKeyValue on
	// every start, which updates an existing bucket's config. A node restarted
	// with a lower Replicas than a bucket currently has may therefore request a
	// scale-down. Keep this consistent across nodes, and raise it after an
	// in-place R1->R3 scale. Confirm against the deployment config.
	Replicas int
	// OpTimeout bounds each KV operation issued by the store's methods, so a
	// stalled backend fails closed instead of hanging a request. Zero (or a
	// negative value) uses defaultKVOpTime.
	OpTimeout time.Duration
}
|
|
|
|
// jetstreamStore implements Store on five JetStream KV buckets, following the
// key layout described in this file's header comment. It holds only bucket
// handles and the per-operation timeout. The NATS connection and JetStream
// context are owned by whoever called OpenJetStream.
type jetstreamStore struct {
	rooms    jetstream.KeyValue // bucketRooms: roomID -> RoomInfo JSON
	members  jetstream.KeyValue // bucketMembers: roomID.endpoint -> Member JSON
	byMember jetstream.KeyValue // bucketByMember: endpoint.roomID -> role
	keys     jetstream.KeyValue // bucketRoomKeys: roomID.endpoint.epoch -> sealed key
	users    jetstream.KeyValue // bucketUsers: signPubHex -> User JSON

	// opTimeout is the timeout that ctx() applies to each store operation.
	opTimeout time.Duration
}
|
|
|
|
// OpenJetStream creates (or opens) the five KV buckets on js with the configured
|
|
// replication factor and returns a Store backed by them. The JetStream context
|
|
// belongs to the caller (it owns the NATS connection); Close is a no-op.
|
|
func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
|
if cfg.Replicas <= 0 {
|
|
cfg.Replicas = 1
|
|
}
|
|
opTimeout := cfg.OpTimeout
|
|
if opTimeout <= 0 {
|
|
opTimeout = defaultKVOpTime
|
|
}
|
|
// Bootstrap budget for creating/opening the buckets. On a single node JetStream
|
|
// is ready the instant the server starts, so the first attempt succeeds. On a
|
|
// COLD multi-node cluster the JetStream meta-group must first elect a leader and
|
|
// each node must establish contact with it before its $JS.API responds. A KV
|
|
// op is a NATS request/reply: if it is published before the node's JetStream is
|
|
// ready the request is dropped (not queued), and a single long-context call then
|
|
// just blocks until it times out (issue 0006g). So we RETRY each bucket op with
|
|
// short per-attempt contexts until it succeeds or the overall bootstrap budget
|
|
// is exhausted; once the cluster is ready the next retry lands and the buckets
|
|
// are created, after which they persist and every node opens them quickly.
|
|
bootstrapBudget := 120 * time.Second
|
|
deadline := time.Now().Add(bootstrapBudget)
|
|
|
|
s := &jetstreamStore{opTimeout: opTimeout}
|
|
for _, b := range []struct {
|
|
name string
|
|
dst *jetstream.KeyValue
|
|
}{
|
|
{bucketRooms, &s.rooms},
|
|
{bucketMembers, &s.members},
|
|
{bucketByMember, &s.byMember},
|
|
{bucketRoomKeys, &s.keys},
|
|
{bucketUsers, &s.users},
|
|
} {
|
|
var kv jetstream.KeyValue
|
|
var lastErr error
|
|
for {
|
|
opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{
|
|
Bucket: b.name,
|
|
Replicas: cfg.Replicas,
|
|
History: 1,
|
|
Storage: jetstream.FileStorage,
|
|
})
|
|
cancel()
|
|
if lastErr == nil {
|
|
break
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr)
|
|
}
|
|
// JetStream not ready yet (no meta leader / request dropped). Wait and
|
|
// re-publish the op; in a cluster cold start this lands once the meta
|
|
// group settles.
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
*b.dst = kv
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// Close releases nothing: the JetStream context and NATS connection are owned by
|
|
// the caller, which closes them on shutdown.
|
|
func (s *jetstreamStore) Close() error { return nil }
|
|
|
|
func (s *jetstreamStore) ctx() (context.Context, context.CancelFunc) {
|
|
return context.WithTimeout(context.Background(), s.opTimeout)
|
|
}
|
|
|
|
// ---- key helpers ----------------------------------------------------------
|
|
|
|
// memberKey builds the members-bucket key "<roomID>.<endpoint>".
func memberKey(roomID, endpoint string) string {
	return strings.Join([]string{roomID, endpoint}, ".")
}
|
|
// byMemberKey builds the reverse-index key "<endpoint>.<roomID>".
func byMemberKey(endpoint, roomID string) string {
	return strings.Join([]string{endpoint, roomID}, ".")
}
|
|
// sealedKey builds the room_keys-bucket key "<roomID>.<endpoint>.<epoch>", with
// the epoch rendered in decimal.
func sealedKey(roomID, endpoint string, e int) string {
	epochToken := strconv.Itoa(e)
	return strings.Join([]string{roomID, endpoint, epochToken}, ".")
}
|
|
|
|
// watchEntries collects every current entry whose key matches pattern (a KV
|
|
// watch with a "<prefix>.*" wildcard), draining the watcher until the nil marker
|
|
// that signals "all initial values delivered". Tombstones are skipped.
|
|
func (s *jetstreamStore) watchEntries(kv jetstream.KeyValue, pattern string) ([]jetstream.KeyValueEntry, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
w, err := kv.Watch(ctx, pattern, jetstream.IgnoreDeletes())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer w.Stop()
|
|
var out []jetstream.KeyValueEntry
|
|
for {
|
|
select {
|
|
case e := <-w.Updates():
|
|
if e == nil {
|
|
return out, nil // initial snapshot complete
|
|
}
|
|
out = append(out, e)
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---- rooms / members / keys ----------------------------------------------
|
|
|
|
func (s *jetstreamStore) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
|
|
info.Epoch = 1
|
|
roomJSON, err := json.Marshal(info)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal room: %w", err)
|
|
}
|
|
// Create (not Put) so a duplicate room id is rejected, matching SQLite's
|
|
// PRIMARY KEY behavior.
|
|
if _, err := s.rooms.Create(ctx, info.RoomID, roomJSON); err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyExists) {
|
|
return fmt.Errorf("membership: room %q already exists", info.RoomID)
|
|
}
|
|
return fmt.Errorf("membership: create room: %w", err)
|
|
}
|
|
|
|
owner := Member{Endpoint: info.OwnerEndpoint, Role: "owner", SignPub: ownerSignPub, KexPub: ownerKexPub}
|
|
if err := s.putMember(ctx, info.RoomID, owner); err != nil {
|
|
return err
|
|
}
|
|
if info.Encrypt {
|
|
if _, err := s.keys.Put(ctx, sealedKey(info.RoomID, info.OwnerEndpoint, 1), ownerSealedKey); err != nil {
|
|
return fmt.Errorf("membership: put owner key: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// putMember writes the member row and its reverse index together.
|
|
func (s *jetstreamStore) putMember(ctx context.Context, roomID string, m Member) error {
|
|
mb, err := json.Marshal(m)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal member: %w", err)
|
|
}
|
|
if _, err := s.members.Put(ctx, memberKey(roomID, m.Endpoint), mb); err != nil {
|
|
return fmt.Errorf("membership: put member: %w", err)
|
|
}
|
|
if _, err := s.byMember.Put(ctx, byMemberKey(m.Endpoint, roomID), []byte(m.Role)); err != nil {
|
|
return fmt.Errorf("membership: put member index: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetRoom(roomID string) (RoomInfo, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.rooms.Get(ctx, roomID)
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, ErrNotFound)
|
|
}
|
|
return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, err)
|
|
}
|
|
var info RoomInfo
|
|
if err := json.Unmarshal(e.Value(), &info); err != nil {
|
|
return RoomInfo{}, fmt.Errorf("membership: unmarshal room %q: %w", roomID, err)
|
|
}
|
|
return info, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) AddMember(roomID string, m Member, epoch int, sealedKeyBytes []byte) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if err := s.putMember(ctx, roomID, m); err != nil {
|
|
return err
|
|
}
|
|
if len(sealedKeyBytes) > 0 {
|
|
if _, err := s.keys.Put(ctx, sealedKey(roomID, m.Endpoint, epoch), sealedKeyBytes); err != nil {
|
|
return fmt.Errorf("membership: put member key: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetMember(roomID, endpoint string) (Member, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.members.Get(ctx, memberKey(roomID, endpoint))
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, ErrNotFound)
|
|
}
|
|
return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, err)
|
|
}
|
|
var m Member
|
|
if err := json.Unmarshal(e.Value(), &m); err != nil {
|
|
return Member{}, fmt.Errorf("membership: unmarshal member: %w", err)
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) ListMembers(roomID string) ([]Member, error) {
|
|
entries, err := s.watchEntries(s.members, roomID+".*")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("membership: list members %q: %w", roomID, err)
|
|
}
|
|
out := make([]Member, 0, len(entries))
|
|
for _, e := range entries {
|
|
var m Member
|
|
if err := json.Unmarshal(e.Value(), &m); err != nil {
|
|
return nil, fmt.Errorf("membership: unmarshal member: %w", err)
|
|
}
|
|
out = append(out, m)
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i].Endpoint < out[j].Endpoint })
|
|
return out, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) {
|
|
entries, err := s.watchEntries(s.byMember, endpoint+".*")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("membership: list rooms for endpoint %q: %w", endpoint, err)
|
|
}
|
|
out := make([]RoomMembership, 0, len(entries))
|
|
for _, e := range entries {
|
|
// Key is "<endpoint>.<roomID>"; the roomID is everything after the dot.
|
|
roomID := e.Key()[len(endpoint)+1:]
|
|
info, err := s.GetRoom(roomID)
|
|
if err != nil {
|
|
if errors.Is(err, ErrNotFound) {
|
|
continue // index points at a removed room: skip, stay consistent
|
|
}
|
|
return nil, err
|
|
}
|
|
out = append(out, RoomMembership{RoomInfo: info, Role: string(e.Value())})
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i].RoomID < out[j].RoomID })
|
|
return out, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) {
|
|
if epoch > 0 {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.keys.Get(ctx, sealedKey(roomID, endpoint, epoch))
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, ErrNotFound)
|
|
}
|
|
return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err)
|
|
}
|
|
return epoch, e.Value(), nil
|
|
}
|
|
// epoch <= 0: latest. Enumerate "<roomID>.<endpoint>.*" and take the max.
|
|
entries, err := s.watchEntries(s.keys, roomID+"."+endpoint+".*")
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, err)
|
|
}
|
|
bestEpoch, bestVal := -1, []byte(nil)
|
|
for _, e := range entries {
|
|
k := e.Key()
|
|
ep, perr := strconv.Atoi(k[len(roomID)+1+len(endpoint)+1:])
|
|
if perr != nil {
|
|
continue
|
|
}
|
|
if ep > bestEpoch {
|
|
bestEpoch, bestVal = ep, e.Value()
|
|
}
|
|
}
|
|
if bestEpoch < 0 {
|
|
return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, ErrNotFound)
|
|
}
|
|
return bestEpoch, bestVal, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
for endpoint, sealed := range keys {
|
|
if _, err := s.keys.Put(ctx, sealedKey(roomID, endpoint, epoch), sealed); err != nil {
|
|
return fmt.Errorf("membership: put sealed key for %q: %w", endpoint, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) BumpEpoch(roomID string, newEpoch int) error {
|
|
// Read-modify-write the room's epoch. The control plane serializes rekeys per
|
|
// room (owner-signed), so the lost-update window is not exercised in practice.
|
|
info, err := s.GetRoom(roomID)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
|
|
}
|
|
info.Epoch = newEpoch
|
|
b, err := json.Marshal(info)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal room: %w", err)
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if _, err := s.rooms.Put(ctx, roomID, b); err != nil {
|
|
return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) RemoveMember(roomID, endpoint string) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
// Drop the member row and its reverse index. Past-epoch sealed keys are left
|
|
// intact (they only decrypt data the member could already read), matching the
|
|
// SQLite store.
|
|
if err := s.members.Delete(ctx, memberKey(roomID, endpoint)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err)
|
|
}
|
|
if err := s.byMember.Delete(ctx, byMemberKey(endpoint, roomID)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return fmt.Errorf("membership: remove member index %q/%q: %w", roomID, endpoint, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ---- users (the bus allowlist) -------------------------------------------
|
|
|
|
func (s *jetstreamStore) AddUser(signPub, handle, role string) error {
|
|
signPub = normalizeSignPub(signPub)
|
|
if signPub == "" || handle == "" {
|
|
return fmt.Errorf("membership: AddUser: sign_pub and handle required")
|
|
}
|
|
if role == "" {
|
|
role = RoleMember
|
|
}
|
|
if role != RoleAdmin && role != RoleMember {
|
|
return fmt.Errorf("membership: AddUser: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)
|
|
}
|
|
u := User{SignPub: signPub, Handle: handle, Role: role, Status: StatusActive, CreatedAt: nowRFC3339()}
|
|
b, err := json.Marshal(u)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal user: %w", err)
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if _, err := s.users.Create(ctx, signPub, b); err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyExists) {
|
|
return ErrUserExists
|
|
}
|
|
return fmt.Errorf("membership: insert user: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetUser(signPub string) (User, error) {
|
|
signPub = normalizeSignPub(signPub)
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.users.Get(ctx, signPub)
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, ErrNotFound)
|
|
}
|
|
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err)
|
|
}
|
|
var u User
|
|
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
|
return User{}, fmt.Errorf("membership: unmarshal user: %w", err)
|
|
}
|
|
return u, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) ListUsers() ([]User, error) {
|
|
ctx, cancel := s.ctx()
|
|
w, err := s.users.WatchAll(ctx, jetstream.IgnoreDeletes())
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("membership: list users: %w", err)
|
|
}
|
|
defer cancel()
|
|
defer w.Stop()
|
|
var out []User
|
|
for {
|
|
select {
|
|
case e := <-w.Updates():
|
|
if e == nil {
|
|
sort.Slice(out, func(i, j int) bool {
|
|
if out[i].Handle != out[j].Handle {
|
|
return out[i].Handle < out[j].Handle
|
|
}
|
|
return out[i].SignPub < out[j].SignPub
|
|
})
|
|
return out, nil
|
|
}
|
|
var u User
|
|
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
|
return nil, fmt.Errorf("membership: unmarshal user: %w", err)
|
|
}
|
|
out = append(out, u)
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *jetstreamStore) RevokeUser(signPub string) error {
|
|
signPub = normalizeSignPub(signPub)
|
|
u, err := s.GetUser(signPub)
|
|
if err != nil {
|
|
if errors.Is(err, ErrNotFound) {
|
|
return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
|
|
}
|
|
return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
|
|
}
|
|
if u.Status != StatusActive {
|
|
return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
|
|
}
|
|
u.Status = StatusRevoked
|
|
u.RevokedAt = nowRFC3339()
|
|
b, err := json.Marshal(u)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal user: %w", err)
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if _, err := s.users.Put(ctx, signPub, b); err != nil {
|
|
return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsAuthorized reports whether signPub is an active bus user. Any backend error
|
|
// (including a KV quorum loss or timeout) yields false: fail closed.
|
|
func (s *jetstreamStore) IsAuthorized(signPub string) bool {
|
|
signPub = normalizeSignPub(signPub)
|
|
if signPub == "" {
|
|
return false
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.users.Get(ctx, signPub)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
var u User
|
|
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
|
return false
|
|
}
|
|
return u.Status == StatusActive
|
|
}
|
|
|
|
// HasAdmin reports whether at least one active admin exists. On any backend
|
|
// error it returns false, keeping the admin-gated endpoints closed (conservative).
|
|
func (s *jetstreamStore) HasAdmin() bool {
|
|
users, err := s.ListUsers()
|
|
if err != nil {
|
|
return false
|
|
}
|
|
for _, u := range users {
|
|
if u.Role == RoleAdmin && u.Status == StatusActive {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ---- snapshot import / export (issue 0003c migration) ---------------------
|
|
|
|
// importSnapshot writes a full Snapshot into the KV buckets, preserving each
|
|
// room's epoch and each user's status (Put, not CreateRoom/AddUser, so the exact
|
|
// state is reproduced rather than reset to defaults). Idempotent: every write is
|
|
// an overwrite, so re-running the migration converges.
|
|
func (s *jetstreamStore) importSnapshot(snap *Snapshot) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
for _, r := range snap.Rooms {
|
|
b, err := json.Marshal(r)
|
|
if err != nil {
|
|
return fmt.Errorf("import: marshal room %q: %w", r.RoomID, err)
|
|
}
|
|
if _, err := s.rooms.Put(ctx, r.RoomID, b); err != nil {
|
|
return fmt.Errorf("import: put room %q: %w", r.RoomID, err)
|
|
}
|
|
}
|
|
for roomID, members := range snap.Members {
|
|
for _, m := range members {
|
|
if err := s.putMember(ctx, roomID, m); err != nil {
|
|
return fmt.Errorf("import: %w", err)
|
|
}
|
|
}
|
|
}
|
|
for _, rec := range snap.Keys {
|
|
if _, err := s.keys.Put(ctx, sealedKey(rec.RoomID, rec.Endpoint, rec.Epoch), rec.Sealed); err != nil {
|
|
return fmt.Errorf("import: put key %q/%q@%d: %w", rec.RoomID, rec.Endpoint, rec.Epoch, err)
|
|
}
|
|
}
|
|
for _, u := range snap.Users {
|
|
b, err := json.Marshal(u)
|
|
if err != nil {
|
|
return fmt.Errorf("import: marshal user %q: %w", u.SignPub, err)
|
|
}
|
|
if _, err := s.users.Put(ctx, normalizeSignPub(u.SignPub), b); err != nil {
|
|
return fmt.Errorf("import: put user %q: %w", u.SignPub, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ExportSnapshot reads the entire KV control-plane state back into a Snapshot,
|
|
// so the migration's parity test can compare it against the SQLite source.
|
|
func (s *jetstreamStore) ExportSnapshot() (*Snapshot, error) {
|
|
snap := &Snapshot{Members: map[string][]Member{}}
|
|
|
|
roomEntries, err := s.watchAll(s.rooms)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("export kv: rooms: %w", err)
|
|
}
|
|
for _, e := range roomEntries {
|
|
var r RoomInfo
|
|
if err := json.Unmarshal(e.Value(), &r); err != nil {
|
|
return nil, fmt.Errorf("export kv: unmarshal room: %w", err)
|
|
}
|
|
snap.Rooms = append(snap.Rooms, r)
|
|
}
|
|
|
|
memberEntries, err := s.watchAll(s.members)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("export kv: members: %w", err)
|
|
}
|
|
for _, e := range memberEntries {
|
|
// Key is "<roomID>.<endpoint>"; neither segment contains a dot.
|
|
roomID := strings.SplitN(e.Key(), ".", 2)[0]
|
|
var m Member
|
|
if err := json.Unmarshal(e.Value(), &m); err != nil {
|
|
return nil, fmt.Errorf("export kv: unmarshal member: %w", err)
|
|
}
|
|
snap.Members[roomID] = append(snap.Members[roomID], m)
|
|
}
|
|
|
|
keyEntries, err := s.watchAll(s.keys)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("export kv: keys: %w", err)
|
|
}
|
|
for _, e := range keyEntries {
|
|
// Key is "<roomID>.<endpoint>.<epoch>".
|
|
parts := strings.Split(e.Key(), ".")
|
|
if len(parts) != 3 {
|
|
continue
|
|
}
|
|
epoch, err := strconv.Atoi(parts[2])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
snap.Keys = append(snap.Keys, SealedKeyRecord{RoomID: parts[0], Endpoint: parts[1], Epoch: epoch, Sealed: e.Value()})
|
|
}
|
|
|
|
users, err := s.ListUsers()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("export kv: users: %w", err)
|
|
}
|
|
snap.Users = users
|
|
return snap, nil
|
|
}
|
|
|
|
// watchAll collects every current entry of a bucket (no key filter), draining
|
|
// the watcher to its initial-snapshot nil marker.
|
|
func (s *jetstreamStore) watchAll(kv jetstream.KeyValue) ([]jetstream.KeyValueEntry, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
w, err := kv.WatchAll(ctx, jetstream.IgnoreDeletes())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer w.Stop()
|
|
var out []jetstream.KeyValueEntry
|
|
for {
|
|
select {
|
|
case e := <-w.Updates():
|
|
if e == nil {
|
|
return out, nil
|
|
}
|
|
out = append(out, e)
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
}
|