d64b0c052d
Add the data layer for WhatsApp-style accounts on the wallet model: the admin mints a single-use invitation link, the new user redeems it by publishing only its public keys, and the admin can hard-delete a user. - Invite type and lifecycle (invites.go): 32-byte crypto/rand hex token, 7-day default TTL, fail-closed expiry parsing. Methods CreateInvite/GetInvite/ ListInvites/ConsumeInvite/CancelInvite on both backends. ConsumeInvite is atomic and single-use: SQLite uses a transaction guarded by `used = 0`, the KV store uses a compare-and-swap on the entry revision (mark-first). Both burn the token on claim, so an already-registered key surfaces ErrUserExists with the invite spent — identical semantics across backends. - DeleteUser (users.go + jetstream_store.go): hard-delete of the allowlist row, distinct from RevokeUser's status flip. Room memberships of the ex-user are intentionally left inert (they can no longer authenticate); no partial cleanup. - Migration 003_invites.sql (root + embedded copy, byte-identical): additive `invites` table with audit columns, per db_migrations rules. - Store interface gains DeleteUser, CreateInvite, GetInvite, ListInvites, ConsumeInvite, CancelInvite. New UNIBUS_invites KV bucket. - Consistency fix: SQLite GetUser now maps sql.ErrNoRows to ErrNotFound, matching the KV backend and the storage-agnostic contract documented in store.go. - ValidateKexPubHex added alongside ValidateSignPubHex for /register key checks. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
849 lines
28 KiB
Go
849 lines
28 KiB
Go
package membership
|
|
|
|
// jetstreamStore is the JetStream KV implementation of Store (issue 0003b): the
|
|
// control-plane state (rooms, members, sealed room keys, the user allowlist)
|
|
// lives in replicated JetStream Key/Value buckets instead of a process-local
|
|
// SQLite file. Any node in the cluster reads and writes the same buckets, and
|
|
// JetStream's RAFT layer keeps them consistent across replicas, so the HTTP
|
|
// control plane becomes effectively stateless: any membershipd can serve any
|
|
// request. It is selected only when the `decentralized` flag is on; sqliteStore
|
|
// stays the default.
|
|
//
|
|
// Key layout (every path segment is a single KV token — ULIDs, RawURL endpoint
|
|
// ids and lowercase-hex keys never contain a '.', so '.' is a safe separator and
|
|
// a "<prefix>.*" watch enumerates exactly one trailing token):
|
|
//
|
|
// rooms roomID -> RoomInfo (JSON)
|
|
// members roomID.endpoint -> Member (JSON, carries Role)
|
|
// rooms_by_member endpoint.roomID -> role (reverse index for ListRoomsForEndpoint)
|
|
// room_keys roomID.endpoint.epoch -> sealed_key bytes
|
|
// users signPubHex -> User (JSON)
// invites token -> Invite (JSON, single-use registration token)
|
|
//
|
|
// Consistency caveat: KV has no multi-key transaction, so a multi-write op
|
|
// (CreateRoom, AddMember) is a short sequence of single-key writes. The order is
|
|
// chosen so a partial failure leaves a recoverable state (the room/member row
|
|
// before its reverse index or sealed key), and writes are idempotent (Put
|
|
// overwrites), which is also what makes the SQLite->KV migration (0003c) safe to
|
|
// re-run.
|
|
//
|
|
// Fail-closed: every read uses a bounded context, and IsAuthorized/HasAdmin
|
|
// return false on ANY backend error (a KV quorum loss or timeout denies access
|
|
// rather than admitting it), mirroring the SQLite store's behavior.
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
)
|
|
|
|
// KV bucket names. They use only alphanumerics, dashes and underscores — never a
// dot — as required by the KV naming rules.
const (
	bucketRooms    = "UNIBUS_rooms"
	bucketMembers  = "UNIBUS_members"
	bucketByMember = "UNIBUS_rooms_by_member"
	bucketRoomKeys = "UNIBUS_room_keys"
	bucketUsers    = "UNIBUS_users"
	bucketInvites  = "UNIBUS_invites"
)

// defaultKVOpTime is the per-operation timeout used when
// JetStreamConfig.OpTimeout is zero or negative.
const defaultKVOpTime = 5 * time.Second
|
|
|
|
// JetStreamConfig holds the tunables of the KV-backed store.
type JetStreamConfig struct {
	// Replicas is the replication factor applied to every bucket (R1..R5).
	// 1 suits a single node or a 1-2 node rollout; 3 gives real HA (quorum
	// 2/3). Growing R1->R3 in place is an operational step (nats kv update)
	// performed when the third node joins and needs no store reopen.
	// OpenJetStream treats a value <= 0 as 1.
	Replicas int

	// OpTimeout caps each KV operation so that a stalled backend fails closed
	// rather than hanging the request. A value <= 0 selects defaultKVOpTime.
	OpTimeout time.Duration
}
|
|
|
|
type jetstreamStore struct {
|
|
rooms jetstream.KeyValue
|
|
members jetstream.KeyValue
|
|
byMember jetstream.KeyValue
|
|
keys jetstream.KeyValue
|
|
users jetstream.KeyValue
|
|
invites jetstream.KeyValue
|
|
opTimeout time.Duration
|
|
}
|
|
|
|
// OpenJetStream creates (or opens) the five KV buckets on js with the configured
|
|
// replication factor and returns a Store backed by them. The JetStream context
|
|
// belongs to the caller (it owns the NATS connection); Close is a no-op.
|
|
func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
|
if cfg.Replicas <= 0 {
|
|
cfg.Replicas = 1
|
|
}
|
|
opTimeout := cfg.OpTimeout
|
|
if opTimeout <= 0 {
|
|
opTimeout = defaultKVOpTime
|
|
}
|
|
// Bootstrap budget for creating/opening the buckets. On a single node JetStream
|
|
// is ready the instant the server starts, so the first attempt succeeds. On a
|
|
// COLD multi-node cluster the JetStream meta-group must first elect a leader and
|
|
// each node must establish contact with it before its $JS.API responds. A KV
|
|
// op is a NATS request/reply: if it is published before the node's JetStream is
|
|
// ready the request is dropped (not queued), and a single long-context call then
|
|
// just blocks until it times out (issue 0006g). So we RETRY each bucket op with
|
|
// short per-attempt contexts until it succeeds or the overall bootstrap budget
|
|
// is exhausted; once the cluster is ready the next retry lands and the buckets
|
|
// are created, after which they persist and every node opens them quickly.
|
|
bootstrapBudget := 120 * time.Second
|
|
deadline := time.Now().Add(bootstrapBudget)
|
|
|
|
s := &jetstreamStore{opTimeout: opTimeout}
|
|
for _, b := range []struct {
|
|
name string
|
|
dst *jetstream.KeyValue
|
|
}{
|
|
{bucketRooms, &s.rooms},
|
|
{bucketMembers, &s.members},
|
|
{bucketByMember, &s.byMember},
|
|
{bucketRoomKeys, &s.keys},
|
|
{bucketUsers, &s.users},
|
|
{bucketInvites, &s.invites},
|
|
} {
|
|
var kv jetstream.KeyValue
|
|
var lastErr error
|
|
for {
|
|
opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{
|
|
Bucket: b.name,
|
|
Replicas: cfg.Replicas,
|
|
History: 1,
|
|
Storage: jetstream.FileStorage,
|
|
})
|
|
cancel()
|
|
if lastErr == nil {
|
|
break
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr)
|
|
}
|
|
// JetStream not ready yet (no meta leader / request dropped). Wait and
|
|
// re-publish the op; in a cluster cold start this lands once the meta
|
|
// group settles.
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
*b.dst = kv
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// Close releases nothing: the JetStream context and NATS connection are owned by
|
|
// the caller, which closes them on shutdown.
|
|
func (s *jetstreamStore) Close() error { return nil }
|
|
|
|
func (s *jetstreamStore) ctx() (context.Context, context.CancelFunc) {
|
|
return context.WithTimeout(context.Background(), s.opTimeout)
|
|
}
|
|
|
|
// ---- key helpers ----------------------------------------------------------
|
|
|
|
// memberKey builds the members-bucket key "<roomID>.<endpoint>".
func memberKey(roomID, endpoint string) string {
	const sep = "."
	return roomID + sep + endpoint
}
|
|
// byMemberKey builds the reverse-index key "<endpoint>.<roomID>".
func byMemberKey(endpoint, roomID string) string {
	const sep = "."
	return endpoint + sep + roomID
}
|
|
// sealedKey builds the room-keys bucket key "<roomID>.<endpoint>.<epoch>", with
// the epoch e rendered in base 10.
func sealedKey(roomID, endpoint string, e int) string {
	epoch := strconv.FormatInt(int64(e), 10)
	return roomID + "." + endpoint + "." + epoch
}
|
|
|
|
// watchEntries collects every current entry whose key matches pattern (a KV
|
|
// watch with a "<prefix>.*" wildcard), draining the watcher until the nil marker
|
|
// that signals "all initial values delivered". Tombstones are skipped.
|
|
func (s *jetstreamStore) watchEntries(kv jetstream.KeyValue, pattern string) ([]jetstream.KeyValueEntry, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
w, err := kv.Watch(ctx, pattern, jetstream.IgnoreDeletes())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer w.Stop()
|
|
var out []jetstream.KeyValueEntry
|
|
for {
|
|
select {
|
|
case e := <-w.Updates():
|
|
if e == nil {
|
|
return out, nil // initial snapshot complete
|
|
}
|
|
out = append(out, e)
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---- rooms / members / keys ----------------------------------------------
|
|
|
|
func (s *jetstreamStore) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
|
|
info.Epoch = 1
|
|
roomJSON, err := json.Marshal(info)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal room: %w", err)
|
|
}
|
|
// Create (not Put) so a duplicate room id is rejected, matching SQLite's
|
|
// PRIMARY KEY behavior.
|
|
if _, err := s.rooms.Create(ctx, info.RoomID, roomJSON); err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyExists) {
|
|
return fmt.Errorf("membership: room %q already exists", info.RoomID)
|
|
}
|
|
return fmt.Errorf("membership: create room: %w", err)
|
|
}
|
|
|
|
owner := Member{Endpoint: info.OwnerEndpoint, Role: "owner", SignPub: ownerSignPub, KexPub: ownerKexPub}
|
|
if err := s.putMember(ctx, info.RoomID, owner); err != nil {
|
|
return err
|
|
}
|
|
if info.Encrypt {
|
|
if _, err := s.keys.Put(ctx, sealedKey(info.RoomID, info.OwnerEndpoint, 1), ownerSealedKey); err != nil {
|
|
return fmt.Errorf("membership: put owner key: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// putMember writes the member row and its reverse index together.
|
|
func (s *jetstreamStore) putMember(ctx context.Context, roomID string, m Member) error {
|
|
mb, err := json.Marshal(m)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal member: %w", err)
|
|
}
|
|
if _, err := s.members.Put(ctx, memberKey(roomID, m.Endpoint), mb); err != nil {
|
|
return fmt.Errorf("membership: put member: %w", err)
|
|
}
|
|
if _, err := s.byMember.Put(ctx, byMemberKey(m.Endpoint, roomID), []byte(m.Role)); err != nil {
|
|
return fmt.Errorf("membership: put member index: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetRoom(roomID string) (RoomInfo, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.rooms.Get(ctx, roomID)
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, ErrNotFound)
|
|
}
|
|
return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, err)
|
|
}
|
|
var info RoomInfo
|
|
if err := json.Unmarshal(e.Value(), &info); err != nil {
|
|
return RoomInfo{}, fmt.Errorf("membership: unmarshal room %q: %w", roomID, err)
|
|
}
|
|
return info, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) AddMember(roomID string, m Member, epoch int, sealedKeyBytes []byte) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if err := s.putMember(ctx, roomID, m); err != nil {
|
|
return err
|
|
}
|
|
if len(sealedKeyBytes) > 0 {
|
|
if _, err := s.keys.Put(ctx, sealedKey(roomID, m.Endpoint, epoch), sealedKeyBytes); err != nil {
|
|
return fmt.Errorf("membership: put member key: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetMember(roomID, endpoint string) (Member, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.members.Get(ctx, memberKey(roomID, endpoint))
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, ErrNotFound)
|
|
}
|
|
return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, err)
|
|
}
|
|
var m Member
|
|
if err := json.Unmarshal(e.Value(), &m); err != nil {
|
|
return Member{}, fmt.Errorf("membership: unmarshal member: %w", err)
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) ListMembers(roomID string) ([]Member, error) {
|
|
entries, err := s.watchEntries(s.members, roomID+".*")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("membership: list members %q: %w", roomID, err)
|
|
}
|
|
out := make([]Member, 0, len(entries))
|
|
for _, e := range entries {
|
|
var m Member
|
|
if err := json.Unmarshal(e.Value(), &m); err != nil {
|
|
return nil, fmt.Errorf("membership: unmarshal member: %w", err)
|
|
}
|
|
out = append(out, m)
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i].Endpoint < out[j].Endpoint })
|
|
return out, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) {
|
|
entries, err := s.watchEntries(s.byMember, endpoint+".*")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("membership: list rooms for endpoint %q: %w", endpoint, err)
|
|
}
|
|
out := make([]RoomMembership, 0, len(entries))
|
|
for _, e := range entries {
|
|
// Key is "<endpoint>.<roomID>"; the roomID is everything after the dot.
|
|
roomID := e.Key()[len(endpoint)+1:]
|
|
info, err := s.GetRoom(roomID)
|
|
if err != nil {
|
|
if errors.Is(err, ErrNotFound) {
|
|
continue // index points at a removed room: skip, stay consistent
|
|
}
|
|
return nil, err
|
|
}
|
|
out = append(out, RoomMembership{RoomInfo: info, Role: string(e.Value())})
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i].RoomID < out[j].RoomID })
|
|
return out, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) {
|
|
if epoch > 0 {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.keys.Get(ctx, sealedKey(roomID, endpoint, epoch))
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, ErrNotFound)
|
|
}
|
|
return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err)
|
|
}
|
|
return epoch, e.Value(), nil
|
|
}
|
|
// epoch <= 0: latest. Enumerate "<roomID>.<endpoint>.*" and take the max.
|
|
entries, err := s.watchEntries(s.keys, roomID+"."+endpoint+".*")
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, err)
|
|
}
|
|
bestEpoch, bestVal := -1, []byte(nil)
|
|
for _, e := range entries {
|
|
k := e.Key()
|
|
ep, perr := strconv.Atoi(k[len(roomID)+1+len(endpoint)+1:])
|
|
if perr != nil {
|
|
continue
|
|
}
|
|
if ep > bestEpoch {
|
|
bestEpoch, bestVal = ep, e.Value()
|
|
}
|
|
}
|
|
if bestEpoch < 0 {
|
|
return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, ErrNotFound)
|
|
}
|
|
return bestEpoch, bestVal, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
for endpoint, sealed := range keys {
|
|
if _, err := s.keys.Put(ctx, sealedKey(roomID, endpoint, epoch), sealed); err != nil {
|
|
return fmt.Errorf("membership: put sealed key for %q: %w", endpoint, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) BumpEpoch(roomID string, newEpoch int) error {
|
|
// Read-modify-write the room's epoch. The control plane serializes rekeys per
|
|
// room (owner-signed), so the lost-update window is not exercised in practice.
|
|
info, err := s.GetRoom(roomID)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
|
|
}
|
|
info.Epoch = newEpoch
|
|
b, err := json.Marshal(info)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal room: %w", err)
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if _, err := s.rooms.Put(ctx, roomID, b); err != nil {
|
|
return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) RemoveMember(roomID, endpoint string) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
// Drop the member row and its reverse index. Past-epoch sealed keys are left
|
|
// intact (they only decrypt data the member could already read), matching the
|
|
// SQLite store.
|
|
if err := s.members.Delete(ctx, memberKey(roomID, endpoint)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err)
|
|
}
|
|
if err := s.byMember.Delete(ctx, byMemberKey(endpoint, roomID)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return fmt.Errorf("membership: remove member index %q/%q: %w", roomID, endpoint, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ---- users (the bus allowlist) -------------------------------------------
|
|
|
|
func (s *jetstreamStore) AddUser(signPub, handle, role string) error {
|
|
signPub = normalizeSignPub(signPub)
|
|
if signPub == "" || handle == "" {
|
|
return fmt.Errorf("membership: AddUser: sign_pub and handle required")
|
|
}
|
|
if role == "" {
|
|
role = RoleMember
|
|
}
|
|
if role != RoleAdmin && role != RoleMember {
|
|
return fmt.Errorf("membership: AddUser: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)
|
|
}
|
|
u := User{SignPub: signPub, Handle: handle, Role: role, Status: StatusActive, CreatedAt: nowRFC3339()}
|
|
b, err := json.Marshal(u)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal user: %w", err)
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if _, err := s.users.Create(ctx, signPub, b); err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyExists) {
|
|
return ErrUserExists
|
|
}
|
|
return fmt.Errorf("membership: insert user: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetUser(signPub string) (User, error) {
|
|
signPub = normalizeSignPub(signPub)
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.users.Get(ctx, signPub)
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, ErrNotFound)
|
|
}
|
|
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err)
|
|
}
|
|
var u User
|
|
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
|
return User{}, fmt.Errorf("membership: unmarshal user: %w", err)
|
|
}
|
|
return u, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) ListUsers() ([]User, error) {
|
|
ctx, cancel := s.ctx()
|
|
w, err := s.users.WatchAll(ctx, jetstream.IgnoreDeletes())
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("membership: list users: %w", err)
|
|
}
|
|
defer cancel()
|
|
defer w.Stop()
|
|
var out []User
|
|
for {
|
|
select {
|
|
case e := <-w.Updates():
|
|
if e == nil {
|
|
sort.Slice(out, func(i, j int) bool {
|
|
if out[i].Handle != out[j].Handle {
|
|
return out[i].Handle < out[j].Handle
|
|
}
|
|
return out[i].SignPub < out[j].SignPub
|
|
})
|
|
return out, nil
|
|
}
|
|
var u User
|
|
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
|
return nil, fmt.Errorf("membership: unmarshal user: %w", err)
|
|
}
|
|
out = append(out, u)
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *jetstreamStore) RevokeUser(signPub string) error {
|
|
signPub = normalizeSignPub(signPub)
|
|
u, err := s.GetUser(signPub)
|
|
if err != nil {
|
|
if errors.Is(err, ErrNotFound) {
|
|
return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
|
|
}
|
|
return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
|
|
}
|
|
if u.Status != StatusActive {
|
|
return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
|
|
}
|
|
u.Status = StatusRevoked
|
|
u.RevokedAt = nowRFC3339()
|
|
b, err := json.Marshal(u)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal user: %w", err)
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if _, err := s.users.Put(ctx, signPub, b); err != nil {
|
|
return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteUser hard-deletes a user from the KV allowlist (the purge counterpart of
|
|
// RevokeUser's status flip). It checks existence first so deleting an unknown key
|
|
// is ErrNotFound (KV Delete is otherwise idempotent and would not signal a miss).
|
|
// Only the allowlist key is removed; room memberships the ex-user holds become
|
|
// inert because they can no longer authenticate — see the SQLite DeleteUser for
|
|
// the full rationale on why room state is left untouched.
|
|
func (s *jetstreamStore) DeleteUser(signPub string) error {
|
|
signPub = normalizeSignPub(signPub)
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if _, err := s.users.Get(ctx, signPub); err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return fmt.Errorf("membership: delete user %q: %w", signPub, ErrNotFound)
|
|
}
|
|
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
|
|
}
|
|
if err := s.users.Delete(ctx, signPub); err != nil {
|
|
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsAuthorized reports whether signPub is an active bus user. Any backend error
|
|
// (including a KV quorum loss or timeout) yields false: fail closed.
|
|
func (s *jetstreamStore) IsAuthorized(signPub string) bool {
|
|
signPub = normalizeSignPub(signPub)
|
|
if signPub == "" {
|
|
return false
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.users.Get(ctx, signPub)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
var u User
|
|
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
|
return false
|
|
}
|
|
return u.Status == StatusActive
|
|
}
|
|
|
|
// HasAdmin reports whether at least one active admin exists. On any backend
|
|
// error it returns false, keeping the admin-gated endpoints closed (conservative).
|
|
func (s *jetstreamStore) HasAdmin() bool {
|
|
users, err := s.ListUsers()
|
|
if err != nil {
|
|
return false
|
|
}
|
|
for _, u := range users {
|
|
if u.Role == RoleAdmin && u.Status == StatusActive {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ---- invites (single-use registration tokens) ----------------------------
|
|
|
|
func (s *jetstreamStore) CreateInvite(handle, role string, ttlSecs int) (Invite, error) {
|
|
if handle == "" {
|
|
return Invite{}, fmt.Errorf("membership: CreateInvite: handle required")
|
|
}
|
|
role, err := validateInviteRole(role)
|
|
if err != nil {
|
|
return Invite{}, err
|
|
}
|
|
token, err := newInviteToken()
|
|
if err != nil {
|
|
return Invite{}, err
|
|
}
|
|
now := time.Now().UTC()
|
|
inv := Invite{
|
|
Token: token,
|
|
Handle: handle,
|
|
Role: role,
|
|
ExpiresAt: now.Add(inviteTTL(ttlSecs)).Format(time.RFC3339Nano),
|
|
Used: false,
|
|
CreatedAt: now.Format(time.RFC3339Nano),
|
|
}
|
|
b, err := json.Marshal(inv)
|
|
if err != nil {
|
|
return Invite{}, fmt.Errorf("membership: marshal invite: %w", err)
|
|
}
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
// Create (not Put) so a token collision is rejected rather than silently
|
|
// overwriting a live invite — a 32-byte random collision is astronomically
|
|
// unlikely, but Create makes the single-use guarantee unconditional.
|
|
if _, err := s.invites.Create(ctx, token, b); err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyExists) {
|
|
return Invite{}, fmt.Errorf("membership: create invite: token collision")
|
|
}
|
|
return Invite{}, fmt.Errorf("membership: create invite: %w", err)
|
|
}
|
|
return inv, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) GetInvite(token string) (Invite, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.invites.Get(ctx, token)
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, ErrNotFound)
|
|
}
|
|
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, err)
|
|
}
|
|
var inv Invite
|
|
if err := json.Unmarshal(e.Value(), &inv); err != nil {
|
|
return Invite{}, fmt.Errorf("membership: unmarshal invite: %w", err)
|
|
}
|
|
return inv, nil
|
|
}
|
|
|
|
func (s *jetstreamStore) ListInvites() ([]Invite, error) {
|
|
ctx, cancel := s.ctx()
|
|
w, err := s.invites.WatchAll(ctx, jetstream.IgnoreDeletes())
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("membership: list invites: %w", err)
|
|
}
|
|
defer cancel()
|
|
defer w.Stop()
|
|
var out []Invite
|
|
for {
|
|
select {
|
|
case e := <-w.Updates():
|
|
if e == nil {
|
|
sort.Slice(out, func(i, j int) bool {
|
|
if out[i].CreatedAt != out[j].CreatedAt {
|
|
return out[i].CreatedAt > out[j].CreatedAt // newest first
|
|
}
|
|
return out[i].Token < out[j].Token
|
|
})
|
|
return out, nil
|
|
}
|
|
var inv Invite
|
|
if err := json.Unmarshal(e.Value(), &inv); err != nil {
|
|
return nil, fmt.Errorf("membership: unmarshal invite: %w", err)
|
|
}
|
|
out = append(out, inv)
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
// ConsumeInvite spends a KV invite and registers the presented signing key. With
|
|
// no multi-key transaction, single-use is enforced by a compare-and-swap on the
|
|
// invite: the token is marked used via Update against the revision read by Get,
|
|
// so only ONE concurrent consumer can win the swap; the loser sees a revision
|
|
// mismatch and is rejected as used. The user is registered AFTER the successful
|
|
// swap. Burn-on-claim: if the signing key is already registered the swap has
|
|
// already spent the token and we surface ErrUserExists — the SQLite store commits
|
|
// the same way, so both backends behave identically.
|
|
func (s *jetstreamStore) ConsumeInvite(token, signPub, kexPub string) error {
|
|
signPub = normalizeSignPub(signPub)
|
|
kexPub = normalizeSignPub(kexPub)
|
|
if signPub == "" {
|
|
return fmt.Errorf("membership: ConsumeInvite: sign_pub required")
|
|
}
|
|
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
e, err := s.invites.Get(ctx, token)
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return fmt.Errorf("membership: consume invite %q: %w", token, ErrNotFound)
|
|
}
|
|
return fmt.Errorf("membership: consume invite %q: %w", token, err)
|
|
}
|
|
var inv Invite
|
|
if err := json.Unmarshal(e.Value(), &inv); err != nil {
|
|
return fmt.Errorf("membership: unmarshal invite: %w", err)
|
|
}
|
|
if inv.Used {
|
|
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
|
|
}
|
|
if inviteIsExpired(inv.ExpiresAt) {
|
|
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteExpired)
|
|
}
|
|
|
|
inv.Used = true
|
|
inv.UsedAt = nowRFC3339()
|
|
inv.UsedSignPub = signPub
|
|
inv.UsedKexPub = kexPub
|
|
b, err := json.Marshal(inv)
|
|
if err != nil {
|
|
return fmt.Errorf("membership: marshal invite: %w", err)
|
|
}
|
|
// CAS: Update only succeeds if the invite is still at the revision we read, so
|
|
// a racing consumer that already flipped it loses here. A failed swap is
|
|
// conservatively treated as "already used" (the common cause); the caller can
|
|
// re-read to learn the precise state.
|
|
if _, err := s.invites.Update(ctx, token, b, e.Revision()); err != nil {
|
|
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
|
|
}
|
|
|
|
// Token is now spent. Register the user with the invite-fixed handle and role.
|
|
if err := s.AddUser(signPub, inv.Handle, inv.Role); err != nil {
|
|
if errors.Is(err, ErrUserExists) {
|
|
return ErrUserExists
|
|
}
|
|
return fmt.Errorf("membership: consume invite %q: register user: %w", token, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *jetstreamStore) CancelInvite(token string) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
if _, err := s.invites.Get(ctx, token); err != nil {
|
|
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
|
return fmt.Errorf("membership: cancel invite %q: %w", token, ErrNotFound)
|
|
}
|
|
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
|
|
}
|
|
if err := s.invites.Delete(ctx, token); err != nil {
|
|
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ---- snapshot import / export (issue 0003c migration) ---------------------
|
|
|
|
// importSnapshot writes a full Snapshot into the KV buckets, preserving each
|
|
// room's epoch and each user's status (Put, not CreateRoom/AddUser, so the exact
|
|
// state is reproduced rather than reset to defaults). Idempotent: every write is
|
|
// an overwrite, so re-running the migration converges.
|
|
func (s *jetstreamStore) importSnapshot(snap *Snapshot) error {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
for _, r := range snap.Rooms {
|
|
b, err := json.Marshal(r)
|
|
if err != nil {
|
|
return fmt.Errorf("import: marshal room %q: %w", r.RoomID, err)
|
|
}
|
|
if _, err := s.rooms.Put(ctx, r.RoomID, b); err != nil {
|
|
return fmt.Errorf("import: put room %q: %w", r.RoomID, err)
|
|
}
|
|
}
|
|
for roomID, members := range snap.Members {
|
|
for _, m := range members {
|
|
if err := s.putMember(ctx, roomID, m); err != nil {
|
|
return fmt.Errorf("import: %w", err)
|
|
}
|
|
}
|
|
}
|
|
for _, rec := range snap.Keys {
|
|
if _, err := s.keys.Put(ctx, sealedKey(rec.RoomID, rec.Endpoint, rec.Epoch), rec.Sealed); err != nil {
|
|
return fmt.Errorf("import: put key %q/%q@%d: %w", rec.RoomID, rec.Endpoint, rec.Epoch, err)
|
|
}
|
|
}
|
|
for _, u := range snap.Users {
|
|
b, err := json.Marshal(u)
|
|
if err != nil {
|
|
return fmt.Errorf("import: marshal user %q: %w", u.SignPub, err)
|
|
}
|
|
if _, err := s.users.Put(ctx, normalizeSignPub(u.SignPub), b); err != nil {
|
|
return fmt.Errorf("import: put user %q: %w", u.SignPub, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ExportSnapshot reads the entire KV control-plane state back into a Snapshot,
|
|
// so the migration's parity test can compare it against the SQLite source.
|
|
func (s *jetstreamStore) ExportSnapshot() (*Snapshot, error) {
|
|
snap := &Snapshot{Members: map[string][]Member{}}
|
|
|
|
roomEntries, err := s.watchAll(s.rooms)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("export kv: rooms: %w", err)
|
|
}
|
|
for _, e := range roomEntries {
|
|
var r RoomInfo
|
|
if err := json.Unmarshal(e.Value(), &r); err != nil {
|
|
return nil, fmt.Errorf("export kv: unmarshal room: %w", err)
|
|
}
|
|
snap.Rooms = append(snap.Rooms, r)
|
|
}
|
|
|
|
memberEntries, err := s.watchAll(s.members)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("export kv: members: %w", err)
|
|
}
|
|
for _, e := range memberEntries {
|
|
// Key is "<roomID>.<endpoint>"; neither segment contains a dot.
|
|
roomID := strings.SplitN(e.Key(), ".", 2)[0]
|
|
var m Member
|
|
if err := json.Unmarshal(e.Value(), &m); err != nil {
|
|
return nil, fmt.Errorf("export kv: unmarshal member: %w", err)
|
|
}
|
|
snap.Members[roomID] = append(snap.Members[roomID], m)
|
|
}
|
|
|
|
keyEntries, err := s.watchAll(s.keys)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("export kv: keys: %w", err)
|
|
}
|
|
for _, e := range keyEntries {
|
|
// Key is "<roomID>.<endpoint>.<epoch>".
|
|
parts := strings.Split(e.Key(), ".")
|
|
if len(parts) != 3 {
|
|
continue
|
|
}
|
|
epoch, err := strconv.Atoi(parts[2])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
snap.Keys = append(snap.Keys, SealedKeyRecord{RoomID: parts[0], Endpoint: parts[1], Epoch: epoch, Sealed: e.Value()})
|
|
}
|
|
|
|
users, err := s.ListUsers()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("export kv: users: %w", err)
|
|
}
|
|
snap.Users = users
|
|
return snap, nil
|
|
}
|
|
|
|
// watchAll collects every current entry of a bucket (no key filter), draining
|
|
// the watcher to its initial-snapshot nil marker.
|
|
func (s *jetstreamStore) watchAll(kv jetstream.KeyValue) ([]jetstream.KeyValueEntry, error) {
|
|
ctx, cancel := s.ctx()
|
|
defer cancel()
|
|
w, err := kv.WatchAll(ctx, jetstream.IgnoreDeletes())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer w.Stop()
|
|
var out []jetstream.KeyValueEntry
|
|
for {
|
|
select {
|
|
case e := <-w.Updates():
|
|
if e == nil {
|
|
return out, nil
|
|
}
|
|
out = append(out, e)
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
}
|