feat(0003b): membership.Store interface + JetStream KV implementation
Branch-by-abstraction for the control-plane store (issue 0003b), so the membership state can move off process-local SQLite onto replicated JetStream KV without rewriting callers and without breaking master. pkg/membership: - Store is now an interface (rooms/members/keys + user allowlist + Close). The existing SQLite implementation is renamed sqliteStore and stays the default: Open(path) still returns it. openSQLite keeps the concrete type for internal callers (the 0003c migration). - ErrNotFound is a storage-agnostic "no such record" sentinel; both backends return it (the SQLite store maps sql.ErrNoRows to it). The control plane now branches on ErrNotFound instead of sql.ErrNoRows, so server.go no longer imports database/sql. - jetstreamStore (new) implements Store over five replicated KV buckets: rooms, members, rooms_by_member (reverse index for ListRoomsForEndpoint), room_keys, users. Replication factor is configurable (R1..R5) for the R1->R3 rollout. Every read is bounded by OpTimeout and IsAuthorized / HasAdmin FAIL CLOSED on any backend error (a KV quorum loss denies, never admits), per the audit's requirement for the decentralized store. dev/feature_flags.json: - Add the `decentralized` flag (OFF): sqliteStore default while off, jetstreamStore behind it. The membershipd boot wiring that selects the KV store is deliberately deferred to 0003e/0003f (the embedded-NATS authenticator<->store bootstrap is part of the session/deploy redesign); OFF keeps the single-node SQLite control plane unchanged. Tests (DoD: golden + edges + error path): - TestJetStreamStoreRoomsCRUD: encrypted room + owner + invited member round-trip through every room/member/key method, including latest-epoch resolution and rekey. - TestJetStreamStoreUsers: add/get/authorize/list/revoke + admin gate, with case-insensitive key normalization and duplicate rejection. - TestJetStreamStoreNotFound: ErrNotFound mapping for misses. - TestJetStreamStoreIsAuthorizedFailClosed: NATS backend shut down -> IsAuthorized and HasAdmin both DENY within the bounded timeout. The full existing suite stays green: sqliteStore is unchanged behavior.
This commit is contained in:
@@ -65,7 +65,7 @@ const defaultDBPath = "./local_files/unibus.db"
|
||||
// openStore opens the membership store at path, exiting on failure. Migrations
|
||||
// (including 002_users.sql) are applied by membership.Open, so a fresh database
|
||||
// gets the users table on first use of the CLI.
|
||||
func openStore(path string) *membership.Store {
|
||||
func openStore(path string) membership.Store {
|
||||
store, err := membership.Open(path)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "membershipd user: open store %q: %v\n", path, err)
|
||||
|
||||
@@ -14,6 +14,13 @@
|
||||
"description": "TLS on the NATS data plane using the project's self-signed CA (deploy/tls/). Server opts in via membershipd --tls-cert/--tls-key; clients pin ca.crt via client.Connect(caPath).",
|
||||
"added": "2026-06-07",
|
||||
"enabled_at": "2026-06-07"
|
||||
},
|
||||
"decentralized": {
|
||||
"enabled": false,
|
||||
"issue": "0003",
|
||||
"description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default OFF, jetstreamStore ON). The route cluster (0003a) and the KV store (0003b) ship behind this flag; the membershipd boot wiring that selects the KV store completes with the session/reconnect redesign (0003e) and is activated on the multi-node deploy (0003f). OFF keeps the single-node SQLite control plane unchanged.",
|
||||
"added": "2026-06-07",
|
||||
"enabled_at": null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ type testHarness struct {
|
||||
ctrlURL string
|
||||
ns *server.Server
|
||||
httpts *httptest.Server
|
||||
store *membership.Store
|
||||
store membership.Store
|
||||
srv *membership.Server
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ import (
|
||||
// with a fresh store + blob store, and seeds one active admin ("alice").
|
||||
type authHarness struct {
|
||||
ts *httptest.Server
|
||||
store *Store
|
||||
store Store
|
||||
alice cs.Identity
|
||||
alicePub string // hex
|
||||
}
|
||||
|
||||
@@ -0,0 +1,510 @@
|
||||
package membership
|
||||
|
||||
// jetstreamStore is the JetStream KV implementation of Store (issue 0003b): the
|
||||
// control-plane state (rooms, members, sealed room keys, the user allowlist)
|
||||
// lives in replicated JetStream Key/Value buckets instead of a process-local
|
||||
// SQLite file. Any node in the cluster reads and writes the same buckets, and
|
||||
// JetStream's RAFT layer keeps them consistent across replicas, so the HTTP
|
||||
// control plane becomes effectively stateless: any membershipd can serve any
|
||||
// request. It is selected only when the `decentralized` flag is on; sqliteStore
|
||||
// stays the default.
|
||||
//
|
||||
// Key layout (every path segment is a single KV token — ULIDs, RawURL endpoint
|
||||
// ids and lowercase-hex keys never contain a '.', so '.' is a safe separator and
|
||||
// a "<prefix>.*" watch enumerates exactly one trailing token):
|
||||
//
|
||||
// rooms roomID -> RoomInfo (JSON)
|
||||
// members roomID.endpoint -> Member (JSON, carries Role)
|
||||
// rooms_by_member endpoint.roomID -> role (reverse index for ListRoomsForEndpoint)
|
||||
// room_keys roomID.endpoint.epoch -> sealed_key bytes
|
||||
// users signPubHex -> User (JSON)
|
||||
//
|
||||
// Consistency caveat: KV has no multi-key transaction, so a multi-write op
|
||||
// (CreateRoom, AddMember) is a short sequence of single-key writes. The order is
|
||||
// chosen so a partial failure leaves a recoverable state (the room/member row
|
||||
// before its reverse index or sealed key), and writes are idempotent (Put
|
||||
// overwrites), which is also what makes the SQLite->KV migration (0003c) safe to
|
||||
// re-run.
|
||||
//
|
||||
// Fail-closed: every read uses a bounded context, and IsAuthorized/HasAdmin
|
||||
// return false on ANY backend error (a KV quorum loss or timeout denies access
|
||||
// rather than admitting it), mirroring the SQLite store's behavior.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// Bucket names (alphanumeric/dash/underscore only — no dots, per KV rules).
|
||||
const (
|
||||
bucketRooms = "UNIBUS_rooms"
|
||||
bucketMembers = "UNIBUS_members"
|
||||
bucketByMember = "UNIBUS_rooms_by_member"
|
||||
bucketRoomKeys = "UNIBUS_room_keys"
|
||||
bucketUsers = "UNIBUS_users"
|
||||
defaultKVOpTime = 5 * time.Second
|
||||
)
|
||||
|
||||
// JetStreamConfig configures the KV-backed store.
|
||||
type JetStreamConfig struct {
|
||||
// Replicas is the per-bucket replication factor (R1..R5). Use 1 for a single
|
||||
// node or a 1-2 node rollout, 3 for real HA (quorum 2/3). Scaling R1->R3 in
|
||||
// place is an operational step (nats kv update) done when the third node
|
||||
// joins; it does not require reopening the store.
|
||||
Replicas int
|
||||
// OpTimeout bounds every KV operation so a stalled backend fails closed
|
||||
// instead of hanging a request. Zero uses defaultKVOpTime.
|
||||
OpTimeout time.Duration
|
||||
}
|
||||
|
||||
type jetstreamStore struct {
|
||||
rooms jetstream.KeyValue
|
||||
members jetstream.KeyValue
|
||||
byMember jetstream.KeyValue
|
||||
keys jetstream.KeyValue
|
||||
users jetstream.KeyValue
|
||||
opTimeout time.Duration
|
||||
}
|
||||
|
||||
// OpenJetStream creates (or opens) the five KV buckets on js with the configured
|
||||
// replication factor and returns a Store backed by them. The JetStream context
|
||||
// belongs to the caller (it owns the NATS connection); Close is a no-op.
|
||||
func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
||||
if cfg.Replicas <= 0 {
|
||||
cfg.Replicas = 1
|
||||
}
|
||||
opTimeout := cfg.OpTimeout
|
||||
if opTimeout <= 0 {
|
||||
opTimeout = defaultKVOpTime
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
s := &jetstreamStore{opTimeout: opTimeout}
|
||||
for _, b := range []struct {
|
||||
name string
|
||||
dst *jetstream.KeyValue
|
||||
}{
|
||||
{bucketRooms, &s.rooms},
|
||||
{bucketMembers, &s.members},
|
||||
{bucketByMember, &s.byMember},
|
||||
{bucketRoomKeys, &s.keys},
|
||||
{bucketUsers, &s.users},
|
||||
} {
|
||||
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
|
||||
Bucket: b.name,
|
||||
Replicas: cfg.Replicas,
|
||||
History: 1,
|
||||
Storage: jetstream.FileStorage,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err)
|
||||
}
|
||||
*b.dst = kv
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Close releases nothing: the JetStream context and NATS connection are owned by
|
||||
// the caller, which closes them on shutdown.
|
||||
func (s *jetstreamStore) Close() error { return nil }
|
||||
|
||||
func (s *jetstreamStore) ctx() (context.Context, context.CancelFunc) {
|
||||
return context.WithTimeout(context.Background(), s.opTimeout)
|
||||
}
|
||||
|
||||
// ---- key helpers ----------------------------------------------------------
|
||||
|
||||
func memberKey(roomID, endpoint string) string { return roomID + "." + endpoint }
|
||||
func byMemberKey(endpoint, roomID string) string { return endpoint + "." + roomID }
|
||||
func sealedKey(roomID, endpoint string, e int) string {
|
||||
return roomID + "." + endpoint + "." + strconv.Itoa(e)
|
||||
}
|
||||
|
||||
// watchEntries collects every current entry whose key matches pattern (a KV
|
||||
// watch with a "<prefix>.*" wildcard), draining the watcher until the nil marker
|
||||
// that signals "all initial values delivered". Tombstones are skipped.
|
||||
func (s *jetstreamStore) watchEntries(kv jetstream.KeyValue, pattern string) ([]jetstream.KeyValueEntry, error) {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
w, err := kv.Watch(ctx, pattern, jetstream.IgnoreDeletes())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer w.Stop()
|
||||
var out []jetstream.KeyValueEntry
|
||||
for {
|
||||
select {
|
||||
case e := <-w.Updates():
|
||||
if e == nil {
|
||||
return out, nil // initial snapshot complete
|
||||
}
|
||||
out = append(out, e)
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---- rooms / members / keys ----------------------------------------------
|
||||
|
||||
func (s *jetstreamStore) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
|
||||
info.Epoch = 1
|
||||
roomJSON, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: marshal room: %w", err)
|
||||
}
|
||||
// Create (not Put) so a duplicate room id is rejected, matching SQLite's
|
||||
// PRIMARY KEY behavior.
|
||||
if _, err := s.rooms.Create(ctx, info.RoomID, roomJSON); err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyExists) {
|
||||
return fmt.Errorf("membership: room %q already exists", info.RoomID)
|
||||
}
|
||||
return fmt.Errorf("membership: create room: %w", err)
|
||||
}
|
||||
|
||||
owner := Member{Endpoint: info.OwnerEndpoint, Role: "owner", SignPub: ownerSignPub, KexPub: ownerKexPub}
|
||||
if err := s.putMember(ctx, info.RoomID, owner); err != nil {
|
||||
return err
|
||||
}
|
||||
if info.Encrypt {
|
||||
if _, err := s.keys.Put(ctx, sealedKey(info.RoomID, info.OwnerEndpoint, 1), ownerSealedKey); err != nil {
|
||||
return fmt.Errorf("membership: put owner key: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// putMember writes the member row and its reverse index together.
|
||||
func (s *jetstreamStore) putMember(ctx context.Context, roomID string, m Member) error {
|
||||
mb, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: marshal member: %w", err)
|
||||
}
|
||||
if _, err := s.members.Put(ctx, memberKey(roomID, m.Endpoint), mb); err != nil {
|
||||
return fmt.Errorf("membership: put member: %w", err)
|
||||
}
|
||||
if _, err := s.byMember.Put(ctx, byMemberKey(m.Endpoint, roomID), []byte(m.Role)); err != nil {
|
||||
return fmt.Errorf("membership: put member index: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) GetRoom(roomID string) (RoomInfo, error) {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
e, err := s.rooms.Get(ctx, roomID)
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, ErrNotFound)
|
||||
}
|
||||
return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, err)
|
||||
}
|
||||
var info RoomInfo
|
||||
if err := json.Unmarshal(e.Value(), &info); err != nil {
|
||||
return RoomInfo{}, fmt.Errorf("membership: unmarshal room %q: %w", roomID, err)
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) AddMember(roomID string, m Member, epoch int, sealedKeyBytes []byte) error {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
if err := s.putMember(ctx, roomID, m); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(sealedKeyBytes) > 0 {
|
||||
if _, err := s.keys.Put(ctx, sealedKey(roomID, m.Endpoint, epoch), sealedKeyBytes); err != nil {
|
||||
return fmt.Errorf("membership: put member key: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) GetMember(roomID, endpoint string) (Member, error) {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
e, err := s.members.Get(ctx, memberKey(roomID, endpoint))
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, ErrNotFound)
|
||||
}
|
||||
return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, err)
|
||||
}
|
||||
var m Member
|
||||
if err := json.Unmarshal(e.Value(), &m); err != nil {
|
||||
return Member{}, fmt.Errorf("membership: unmarshal member: %w", err)
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) ListMembers(roomID string) ([]Member, error) {
|
||||
entries, err := s.watchEntries(s.members, roomID+".*")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: list members %q: %w", roomID, err)
|
||||
}
|
||||
out := make([]Member, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
var m Member
|
||||
if err := json.Unmarshal(e.Value(), &m); err != nil {
|
||||
return nil, fmt.Errorf("membership: unmarshal member: %w", err)
|
||||
}
|
||||
out = append(out, m)
|
||||
}
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].Endpoint < out[j].Endpoint })
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) {
|
||||
entries, err := s.watchEntries(s.byMember, endpoint+".*")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: list rooms for endpoint %q: %w", endpoint, err)
|
||||
}
|
||||
out := make([]RoomMembership, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
// Key is "<endpoint>.<roomID>"; the roomID is everything after the dot.
|
||||
roomID := e.Key()[len(endpoint)+1:]
|
||||
info, err := s.GetRoom(roomID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
continue // index points at a removed room: skip, stay consistent
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, RoomMembership{RoomInfo: info, Role: string(e.Value())})
|
||||
}
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].RoomID < out[j].RoomID })
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) {
|
||||
if epoch > 0 {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
e, err := s.keys.Get(ctx, sealedKey(roomID, endpoint, epoch))
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, ErrNotFound)
|
||||
}
|
||||
return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err)
|
||||
}
|
||||
return epoch, e.Value(), nil
|
||||
}
|
||||
// epoch <= 0: latest. Enumerate "<roomID>.<endpoint>.*" and take the max.
|
||||
entries, err := s.watchEntries(s.keys, roomID+"."+endpoint+".*")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, err)
|
||||
}
|
||||
bestEpoch, bestVal := -1, []byte(nil)
|
||||
for _, e := range entries {
|
||||
k := e.Key()
|
||||
ep, perr := strconv.Atoi(k[len(roomID)+1+len(endpoint)+1:])
|
||||
if perr != nil {
|
||||
continue
|
||||
}
|
||||
if ep > bestEpoch {
|
||||
bestEpoch, bestVal = ep, e.Value()
|
||||
}
|
||||
}
|
||||
if bestEpoch < 0 {
|
||||
return 0, nil, fmt.Errorf("membership: get latest sealed key %q/%q: %w", roomID, endpoint, ErrNotFound)
|
||||
}
|
||||
return bestEpoch, bestVal, nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
for endpoint, sealed := range keys {
|
||||
if _, err := s.keys.Put(ctx, sealedKey(roomID, endpoint, epoch), sealed); err != nil {
|
||||
return fmt.Errorf("membership: put sealed key for %q: %w", endpoint, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) BumpEpoch(roomID string, newEpoch int) error {
|
||||
// Read-modify-write the room's epoch. The control plane serializes rekeys per
|
||||
// room (owner-signed), so the lost-update window is not exercised in practice.
|
||||
info, err := s.GetRoom(roomID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
|
||||
}
|
||||
info.Epoch = newEpoch
|
||||
b, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: marshal room: %w", err)
|
||||
}
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
if _, err := s.rooms.Put(ctx, roomID, b); err != nil {
|
||||
return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) RemoveMember(roomID, endpoint string) error {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
// Drop the member row and its reverse index. Past-epoch sealed keys are left
|
||||
// intact (they only decrypt data the member could already read), matching the
|
||||
// SQLite store.
|
||||
if err := s.members.Delete(ctx, memberKey(roomID, endpoint)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err)
|
||||
}
|
||||
if err := s.byMember.Delete(ctx, byMemberKey(endpoint, roomID)); err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return fmt.Errorf("membership: remove member index %q/%q: %w", roomID, endpoint, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ---- users (the bus allowlist) -------------------------------------------
|
||||
|
||||
func (s *jetstreamStore) AddUser(signPub, handle, role string) error {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
if signPub == "" || handle == "" {
|
||||
return fmt.Errorf("membership: AddUser: sign_pub and handle required")
|
||||
}
|
||||
if role == "" {
|
||||
role = RoleMember
|
||||
}
|
||||
if role != RoleAdmin && role != RoleMember {
|
||||
return fmt.Errorf("membership: AddUser: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)
|
||||
}
|
||||
u := User{SignPub: signPub, Handle: handle, Role: role, Status: StatusActive, CreatedAt: nowRFC3339()}
|
||||
b, err := json.Marshal(u)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: marshal user: %w", err)
|
||||
}
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
if _, err := s.users.Create(ctx, signPub, b); err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyExists) {
|
||||
return ErrUserExists
|
||||
}
|
||||
return fmt.Errorf("membership: insert user: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) GetUser(signPub string) (User, error) {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
e, err := s.users.Get(ctx, signPub)
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, ErrNotFound)
|
||||
}
|
||||
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err)
|
||||
}
|
||||
var u User
|
||||
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
||||
return User{}, fmt.Errorf("membership: unmarshal user: %w", err)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) ListUsers() ([]User, error) {
|
||||
ctx, cancel := s.ctx()
|
||||
w, err := s.users.WatchAll(ctx, jetstream.IgnoreDeletes())
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("membership: list users: %w", err)
|
||||
}
|
||||
defer cancel()
|
||||
defer w.Stop()
|
||||
var out []User
|
||||
for {
|
||||
select {
|
||||
case e := <-w.Updates():
|
||||
if e == nil {
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
if out[i].Handle != out[j].Handle {
|
||||
return out[i].Handle < out[j].Handle
|
||||
}
|
||||
return out[i].SignPub < out[j].SignPub
|
||||
})
|
||||
return out, nil
|
||||
}
|
||||
var u User
|
||||
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
||||
return nil, fmt.Errorf("membership: unmarshal user: %w", err)
|
||||
}
|
||||
out = append(out, u)
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) RevokeUser(signPub string) error {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
u, err := s.GetUser(signPub)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
|
||||
}
|
||||
return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
|
||||
}
|
||||
if u.Status != StatusActive {
|
||||
return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
|
||||
}
|
||||
u.Status = StatusRevoked
|
||||
u.RevokedAt = nowRFC3339()
|
||||
b, err := json.Marshal(u)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: marshal user: %w", err)
|
||||
}
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
if _, err := s.users.Put(ctx, signPub, b); err != nil {
|
||||
return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsAuthorized reports whether signPub is an active bus user. Any backend error
|
||||
// (including a KV quorum loss or timeout) yields false: fail closed.
|
||||
func (s *jetstreamStore) IsAuthorized(signPub string) bool {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
if signPub == "" {
|
||||
return false
|
||||
}
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
e, err := s.users.Get(ctx, signPub)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
var u User
|
||||
if err := json.Unmarshal(e.Value(), &u); err != nil {
|
||||
return false
|
||||
}
|
||||
return u.Status == StatusActive
|
||||
}
|
||||
|
||||
// HasAdmin reports whether at least one active admin exists. On any backend
|
||||
// error it returns false, keeping the admin-gated endpoints closed (conservative).
|
||||
func (s *jetstreamStore) HasAdmin() bool {
|
||||
users, err := s.ListUsers()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
for _, u := range users {
|
||||
if u.Role == RoleAdmin && u.Status == StatusActive {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -0,0 +1,275 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
)
|
||||
|
||||
func kvFreePort(t *testing.T) int {
|
||||
t.Helper()
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("free port: %v", err)
|
||||
}
|
||||
defer l.Close()
|
||||
return l.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
|
||||
// newKVStore boots a single-node embedded NATS with JetStream and opens a
|
||||
// jetstreamStore (R1) over it, returning the store plus the server and
|
||||
// connection so a test can shut the backend down to exercise fail-closed paths.
|
||||
func newKVStore(t *testing.T) (*jetstreamStore, *server.Server, *nats.Conn) {
|
||||
t.Helper()
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(),
|
||||
Host: "127.0.0.1",
|
||||
Port: kvFreePort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("embedded nats: %v", err)
|
||||
}
|
||||
nc, err := nats.Connect(ns.ClientURL())
|
||||
if err != nil {
|
||||
ns.Shutdown()
|
||||
t.Fatalf("nats connect: %v", err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("jetstream: %v", err)
|
||||
}
|
||||
st, err := OpenJetStream(js, JetStreamConfig{Replicas: 1, OpTimeout: 2 * time.Second})
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("open jetstream store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
ns.WaitForShutdown()
|
||||
})
|
||||
return st.(*jetstreamStore), ns, nc
|
||||
}
|
||||
|
||||
// TestJetStreamStoreRoomsCRUD is the golden path: an encrypted room with an owner
|
||||
// and an invited member round-trips through every room/member/key method.
|
||||
func TestJetStreamStoreRoomsCRUD(t *testing.T) {
|
||||
s, _, _ := newKVStore(t)
|
||||
|
||||
roomID := newULID()
|
||||
owner := "owner-ep-1"
|
||||
info := RoomInfo{RoomID: roomID, Subject: "room.kv", Encrypt: true, Persist: true, SignMsgs: true, OwnerEndpoint: owner}
|
||||
ownerSealed := []byte("sealed-owner-epoch1")
|
||||
if err := s.CreateRoom(info, []byte("owner-sign"), []byte("owner-kex"), ownerSealed); err != nil {
|
||||
t.Fatalf("CreateRoom: %v", err)
|
||||
}
|
||||
|
||||
// GetRoom returns epoch 1 and the policy.
|
||||
got, err := s.GetRoom(roomID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetRoom: %v", err)
|
||||
}
|
||||
if got.Epoch != 1 || got.Subject != "room.kv" || !got.Encrypt || got.OwnerEndpoint != owner {
|
||||
t.Fatalf("GetRoom mismatch: %+v", got)
|
||||
}
|
||||
|
||||
// Owner is a member with role "owner".
|
||||
om, err := s.GetMember(roomID, owner)
|
||||
if err != nil {
|
||||
t.Fatalf("GetMember owner: %v", err)
|
||||
}
|
||||
if om.Role != "owner" || !bytes.Equal(om.SignPub, []byte("owner-sign")) {
|
||||
t.Fatalf("owner member mismatch: %+v", om)
|
||||
}
|
||||
|
||||
// Owner's sealed key at epoch 1.
|
||||
ep, sealed, err := s.GetSealedKey(roomID, owner, 1)
|
||||
if err != nil || ep != 1 || !bytes.Equal(sealed, ownerSealed) {
|
||||
t.Fatalf("GetSealedKey owner: ep=%d sealed=%q err=%v", ep, sealed, err)
|
||||
}
|
||||
|
||||
// Invite a member with a sealed key at epoch 1.
|
||||
bob := "member-ep-bob"
|
||||
bobSealed := []byte("sealed-bob-epoch1")
|
||||
if err := s.AddMember(roomID, Member{Endpoint: bob, Role: "member", SignPub: []byte("bob-sign"), KexPub: []byte("bob-kex")}, 1, bobSealed); err != nil {
|
||||
t.Fatalf("AddMember: %v", err)
|
||||
}
|
||||
|
||||
// ListMembers returns both, sorted by endpoint.
|
||||
members, err := s.ListMembers(roomID)
|
||||
if err != nil {
|
||||
t.Fatalf("ListMembers: %v", err)
|
||||
}
|
||||
if len(members) != 2 {
|
||||
t.Fatalf("ListMembers want 2, got %d (%+v)", len(members), members)
|
||||
}
|
||||
|
||||
// Bob can find the room via the reverse index.
|
||||
rooms, err := s.ListRoomsForEndpoint(bob)
|
||||
if err != nil {
|
||||
t.Fatalf("ListRoomsForEndpoint: %v", err)
|
||||
}
|
||||
if len(rooms) != 1 || rooms[0].RoomID != roomID || rooms[0].Role != "member" {
|
||||
t.Fatalf("ListRoomsForEndpoint mismatch: %+v", rooms)
|
||||
}
|
||||
|
||||
// Latest sealed key (epoch <= 0) resolves to epoch 1 for bob.
|
||||
lep, lsealed, err := s.GetSealedKey(roomID, bob, 0)
|
||||
if err != nil || lep != 1 || !bytes.Equal(lsealed, bobSealed) {
|
||||
t.Fatalf("GetSealedKey latest bob: ep=%d err=%v", lep, err)
|
||||
}
|
||||
|
||||
// Rekey to epoch 2 (bump + new sealed keys), then latest resolves to 2.
|
||||
if err := s.BumpEpoch(roomID, 2); err != nil {
|
||||
t.Fatalf("BumpEpoch: %v", err)
|
||||
}
|
||||
if err := s.PutSealedKeys(roomID, 2, map[string][]byte{owner: []byte("owner-epoch2")}); err != nil {
|
||||
t.Fatalf("PutSealedKeys: %v", err)
|
||||
}
|
||||
got2, _ := s.GetRoom(roomID)
|
||||
if got2.Epoch != 2 {
|
||||
t.Fatalf("after BumpEpoch want epoch 2, got %d", got2.Epoch)
|
||||
}
|
||||
lep2, _, err := s.GetSealedKey(roomID, owner, 0)
|
||||
if err != nil || lep2 != 2 {
|
||||
t.Fatalf("latest owner key after rekey: ep=%d err=%v", lep2, err)
|
||||
}
|
||||
|
||||
// Remove bob; he disappears from members and his reverse index.
|
||||
if err := s.RemoveMember(roomID, bob); err != nil {
|
||||
t.Fatalf("RemoveMember: %v", err)
|
||||
}
|
||||
if _, err := s.GetMember(roomID, bob); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("GetMember after remove want ErrNotFound, got %v", err)
|
||||
}
|
||||
rooms2, _ := s.ListRoomsForEndpoint(bob)
|
||||
if len(rooms2) != 0 {
|
||||
t.Fatalf("ListRoomsForEndpoint after remove want 0, got %d", len(rooms2))
|
||||
}
|
||||
}
|
||||
|
||||
// TestJetStreamStoreUsers exercises the allowlist: add, lookup, authorize,
|
||||
// revoke (which flips IsAuthorized), and the admin gate.
|
||||
func TestJetStreamStoreUsers(t *testing.T) {
|
||||
s, _, _ := newKVStore(t)
|
||||
|
||||
const aliceHex = "aa11"
|
||||
if s.HasAdmin() {
|
||||
t.Fatalf("fresh store should have no admin")
|
||||
}
|
||||
if err := s.AddUser(aliceHex, "alice", RoleAdmin); err != nil {
|
||||
t.Fatalf("AddUser: %v", err)
|
||||
}
|
||||
if !s.HasAdmin() {
|
||||
t.Fatalf("HasAdmin should be true after adding an admin")
|
||||
}
|
||||
if !s.IsAuthorized(aliceHex) {
|
||||
t.Fatalf("alice should be authorized")
|
||||
}
|
||||
// Case-insensitive lookup (keys are normalized lowercase).
|
||||
if !s.IsAuthorized("AA11") {
|
||||
t.Fatalf("uppercase hex should normalize and authorize")
|
||||
}
|
||||
u, err := s.GetUser(aliceHex)
|
||||
if err != nil || u.Handle != "alice" || u.Role != RoleAdmin || u.Status != StatusActive {
|
||||
t.Fatalf("GetUser mismatch: %+v err=%v", u, err)
|
||||
}
|
||||
|
||||
// Duplicate add is rejected with ErrUserExists.
|
||||
if err := s.AddUser(aliceHex, "alice2", RoleMember); !errors.Is(err, ErrUserExists) {
|
||||
t.Fatalf("duplicate AddUser want ErrUserExists, got %v", err)
|
||||
}
|
||||
|
||||
if err := s.AddUser("bb22", "bob", RoleMember); err != nil {
|
||||
t.Fatalf("AddUser bob: %v", err)
|
||||
}
|
||||
users, err := s.ListUsers()
|
||||
if err != nil || len(users) != 2 {
|
||||
t.Fatalf("ListUsers want 2, got %d err=%v", len(users), err)
|
||||
}
|
||||
|
||||
// Revoke alice: authorization flips off immediately.
|
||||
if err := s.RevokeUser(aliceHex); err != nil {
|
||||
t.Fatalf("RevokeUser: %v", err)
|
||||
}
|
||||
if s.IsAuthorized(aliceHex) {
|
||||
t.Fatalf("revoked user must not be authorized")
|
||||
}
|
||||
if s.HasAdmin() {
|
||||
t.Fatalf("after revoking the only admin, HasAdmin must be false")
|
||||
}
|
||||
// Revoking again is an error (no active user).
|
||||
if err := s.RevokeUser(aliceHex); err == nil {
|
||||
t.Fatalf("re-revoke should error")
|
||||
}
|
||||
}
|
||||
|
||||
// TestJetStreamStoreNotFound checks the ErrNotFound mapping for misses.
|
||||
func TestJetStreamStoreNotFound(t *testing.T) {
|
||||
s, _, _ := newKVStore(t)
|
||||
if _, err := s.GetRoom("nope"); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("GetRoom miss want ErrNotFound, got %v", err)
|
||||
}
|
||||
if _, err := s.GetMember("nope", "x"); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("GetMember miss want ErrNotFound, got %v", err)
|
||||
}
|
||||
if _, _, err := s.GetSealedKey("nope", "x", 1); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("GetSealedKey miss want ErrNotFound, got %v", err)
|
||||
}
|
||||
if _, _, err := s.GetSealedKey("nope", "x", 0); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("GetSealedKey latest miss want ErrNotFound, got %v", err)
|
||||
}
|
||||
if _, err := s.GetUser("ffff"); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("GetUser miss want ErrNotFound, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestJetStreamStoreIsAuthorizedFailClosed is the error path mandated by the
|
||||
// issue: when the KV backend is unavailable (here the NATS server is shut down),
|
||||
// IsAuthorized must DENY, never admit. A previously-authorized identity flips to
|
||||
// unauthorized once the backend cannot be reached.
|
||||
func TestJetStreamStoreIsAuthorizedFailClosed(t *testing.T) {
|
||||
s, ns, nc := newKVStore(t)
|
||||
|
||||
const aliceHex = "abcd"
|
||||
if err := s.AddUser(aliceHex, "alice", RoleAdmin); err != nil {
|
||||
t.Fatalf("AddUser: %v", err)
|
||||
}
|
||||
if !s.IsAuthorized(aliceHex) {
|
||||
t.Fatalf("alice should be authorized while the backend is up")
|
||||
}
|
||||
|
||||
// Take the KV backend away: close the client and stop the server. Every
|
||||
// subsequent KV Get fails, and the store must fail closed.
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
ns.WaitForShutdown()
|
||||
|
||||
// Bound the assertion: IsAuthorized internally caps each op at OpTimeout, so
|
||||
// this returns well before the test deadline.
|
||||
done := make(chan bool, 1)
|
||||
go func() { done <- s.IsAuthorized(aliceHex) }()
|
||||
select {
|
||||
case authorized := <-done:
|
||||
if authorized {
|
||||
t.Fatalf("KV backend down but IsAuthorized returned true: NOT fail-closed")
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatalf("IsAuthorized hung when the backend was down (no bounded timeout)")
|
||||
}
|
||||
|
||||
// HasAdmin is likewise conservative: backend down -> false (gates stay closed).
|
||||
if s.HasAdmin() {
|
||||
t.Fatalf("KV backend down but HasAdmin returned true: NOT fail-closed")
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package membership
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -56,7 +55,7 @@ const (
|
||||
// rate limiting, and read endpoints (GET) are unauthenticated. Hardening
|
||||
// (mTLS, capabilities, rate limits) is a later phase.
|
||||
type Server struct {
|
||||
store *Store
|
||||
store Store
|
||||
blobs *blobstore.Store
|
||||
mux *http.ServeMux
|
||||
authMode AuthMode
|
||||
@@ -79,7 +78,7 @@ type Server struct {
|
||||
// tests that have not migrated to signed requests yet). It installs a per-IP
|
||||
// rate limiter with the package defaults; loopback dev behavior is unchanged
|
||||
// because the burst comfortably exceeds any single client's request rate.
|
||||
func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server {
|
||||
func NewServer(store Store, blobs *blobstore.Store, authMode AuthMode) *Server {
|
||||
s := &Server{
|
||||
store: store,
|
||||
blobs: blobs,
|
||||
@@ -456,7 +455,7 @@ func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
ep, sealed, err := s.store.GetSealedKey(roomID, endpoint, epoch)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
writeErr(w, http.StatusForbidden,
|
||||
"not invited to this encrypted room: no key has been sealed for your identity. Ask the room owner to invite you before joining.")
|
||||
return
|
||||
|
||||
+77
-18
@@ -13,6 +13,7 @@ package membership
|
||||
import (
|
||||
"database/sql"
|
||||
"embed"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"sort"
|
||||
@@ -26,6 +27,14 @@ import (
|
||||
//go:embed migrations/*.sql
|
||||
var migrationsFS embed.FS
|
||||
|
||||
// ErrNotFound is the store-agnostic "no such record" sentinel. Both backends
|
||||
// (SQLite and JetStream KV) return it, wrapped, when a lookup misses, so callers
|
||||
// distinguish "not invited / no key yet" from a genuine backend failure without
|
||||
// depending on a specific driver's error (the SQLite store maps sql.ErrNoRows to
|
||||
// it; the KV store maps a missing key to it). This is what lets the control
|
||||
// plane stay storage-agnostic under the branch-by-abstraction of issue 0003b.
|
||||
var ErrNotFound = errors.New("membership: not found")
|
||||
|
||||
// Member is a participant of a room with their published public keys.
|
||||
type Member struct {
|
||||
Endpoint string `json:"endpoint"`
|
||||
@@ -45,14 +54,58 @@ type RoomInfo struct {
|
||||
OwnerEndpoint string
|
||||
}
|
||||
|
||||
// Store is the SQLite-backed membership/key store.
|
||||
type Store struct {
|
||||
// Store is the membership/key control-plane store: the authoritative source of
|
||||
// room metadata, the member directory, per-epoch sealed room keys, and the bus
|
||||
// user allowlist. It is an interface (branch-by-abstraction, issue 0003b) with
|
||||
// two implementations: sqliteStore (the default, single-node, local SQLite) and
|
||||
// jetstreamStore (rooms/members/keys/users on replicated JetStream KV, selected
|
||||
// when the `decentralized` flag is on). Every lookup miss returns ErrNotFound
|
||||
// (wrapped); every implementation MUST fail closed (IsAuthorized returns false
|
||||
// on any backend error), so a KV quorum loss denies rather than admits.
|
||||
type Store interface {
|
||||
// Rooms / members / keys.
|
||||
CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error
|
||||
GetRoom(roomID string) (RoomInfo, error)
|
||||
AddMember(roomID string, m Member, epoch int, sealedKey []byte) error
|
||||
GetMember(roomID, endpoint string) (Member, error)
|
||||
ListMembers(roomID string) ([]Member, error)
|
||||
ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error)
|
||||
GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error)
|
||||
PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error
|
||||
BumpEpoch(roomID string, newEpoch int) error
|
||||
RemoveMember(roomID, endpoint string) error
|
||||
|
||||
// Users (the bus allowlist).
|
||||
AddUser(signPub, handle, role string) error
|
||||
GetUser(signPub string) (User, error)
|
||||
ListUsers() ([]User, error)
|
||||
RevokeUser(signPub string) error
|
||||
IsAuthorized(signPub string) bool
|
||||
HasAdmin() bool
|
||||
|
||||
// Lifecycle.
|
||||
Close() error
|
||||
}
|
||||
|
||||
// sqliteStore is the SQLite-backed implementation of Store (the default,
|
||||
// single-node backend). It stays the production default while the
|
||||
// `decentralized` flag is off.
|
||||
type sqliteStore struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// Open opens (creating if needed) the SQLite database at path and applies all
|
||||
// embedded migrations idempotently.
|
||||
func Open(path string) (*Store, error) {
|
||||
// Open opens (creating if needed) the SQLite database at path, applies all
|
||||
// embedded migrations idempotently, and returns it as a Store. It remains the
|
||||
// default control-plane backend; the JetStream KV store is opened separately
|
||||
// (OpenJetStream) when decentralization is enabled.
|
||||
func Open(path string) (Store, error) {
|
||||
return openSQLite(path)
|
||||
}
|
||||
|
||||
// openSQLite is the concrete constructor, returning *sqliteStore so internal
|
||||
// callers (e.g. the SQLite->KV migration) can use SQLite-specific helpers that
|
||||
// are not part of the storage-agnostic Store interface.
|
||||
func openSQLite(path string) (*sqliteStore, error) {
|
||||
// _pragma busy_timeout avoids spurious "database is locked" under concurrent
|
||||
// HTTP handlers; foreign_keys kept off — we manage referential integrity in code.
|
||||
dsn := fmt.Sprintf("file:%s?_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)", path)
|
||||
@@ -64,7 +117,7 @@ func Open(path string) (*Store, error) {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("membership: ping db: %w", err)
|
||||
}
|
||||
s := &Store{db: db}
|
||||
s := &sqliteStore{db: db}
|
||||
if err := s.applyMigrations(); err != nil {
|
||||
db.Close()
|
||||
return nil, err
|
||||
@@ -73,11 +126,11 @@ func Open(path string) (*Store, error) {
|
||||
}
|
||||
|
||||
// Close closes the underlying database.
|
||||
func (s *Store) Close() error { return s.db.Close() }
|
||||
func (s *sqliteStore) Close() error { return s.db.Close() }
|
||||
|
||||
// applyMigrations runs every embedded migration in lexical order, tolerating
|
||||
// the "already applied" errors that SQLite's non-idempotent DDL produces.
|
||||
func (s *Store) applyMigrations() error {
|
||||
func (s *sqliteStore) applyMigrations() error {
|
||||
files, err := fs.Glob(migrationsFS, "migrations/*.sql")
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: glob migrations: %w", err)
|
||||
@@ -103,7 +156,7 @@ func nowRFC3339() string { return time.Now().UTC().Format(time.RFC3339Nano) }
|
||||
// CreateRoom inserts a room at epoch 1, registers the owner as a member with
|
||||
// role "owner", and stores the owner's sealed key for epoch 1. Idempotent
|
||||
// inserts are not used: a duplicate room_id returns an error.
|
||||
func (s *Store) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error {
|
||||
func (s *sqliteStore) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: begin: %w", err)
|
||||
@@ -142,7 +195,7 @@ func (s *Store) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealed
|
||||
}
|
||||
|
||||
// GetRoom returns room metadata (including current epoch).
|
||||
func (s *Store) GetRoom(roomID string) (RoomInfo, error) {
|
||||
func (s *sqliteStore) GetRoom(roomID string) (RoomInfo, error) {
|
||||
var info RoomInfo
|
||||
var enc, per, sgn int
|
||||
err := s.db.QueryRow(
|
||||
@@ -158,7 +211,7 @@ func (s *Store) GetRoom(roomID string) (RoomInfo, error) {
|
||||
|
||||
// AddMember inserts a member at the given role and stores their sealed key for
|
||||
// the supplied epoch.
|
||||
func (s *Store) AddMember(roomID string, m Member, epoch int, sealedKey []byte) error {
|
||||
func (s *sqliteStore) AddMember(roomID string, m Member, epoch int, sealedKey []byte) error {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: begin: %w", err)
|
||||
@@ -185,7 +238,7 @@ func (s *Store) AddMember(roomID string, m Member, epoch int, sealedKey []byte)
|
||||
}
|
||||
|
||||
// GetMember returns a single member of a room.
|
||||
func (s *Store) GetMember(roomID, endpoint string) (Member, error) {
|
||||
func (s *sqliteStore) GetMember(roomID, endpoint string) (Member, error) {
|
||||
var m Member
|
||||
err := s.db.QueryRow(
|
||||
`SELECT endpoint, role, sign_pub, kex_pub FROM members WHERE room_id = ? AND endpoint = ?`,
|
||||
@@ -198,7 +251,7 @@ func (s *Store) GetMember(roomID, endpoint string) (Member, error) {
|
||||
}
|
||||
|
||||
// ListMembers returns all members of a room ordered by endpoint.
|
||||
func (s *Store) ListMembers(roomID string) ([]Member, error) {
|
||||
func (s *sqliteStore) ListMembers(roomID string) ([]Member, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT endpoint, role, sign_pub, kex_pub FROM members WHERE room_id = ? ORDER BY endpoint`,
|
||||
roomID,
|
||||
@@ -230,7 +283,7 @@ type RoomMembership struct {
|
||||
// ListRoomsForEndpoint returns every room the given endpoint is a member of,
|
||||
// with the room's current metadata and the endpoint's role, ordered by room id.
|
||||
// An endpoint that is in no rooms yields an empty slice (not an error).
|
||||
func (s *Store) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) {
|
||||
func (s *sqliteStore) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT r.room_id, r.subject, r.key_epoch, r.encrypt, r.persist, r.sign_msgs, r.owner_endpoint, m.role
|
||||
FROM members m JOIN rooms r ON r.room_id = m.room_id
|
||||
@@ -257,7 +310,7 @@ func (s *Store) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error)
|
||||
|
||||
// GetSealedKey returns the sealed room key for an endpoint at a given epoch.
|
||||
// If epoch <= 0, the latest epoch for that endpoint is returned.
|
||||
func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) {
|
||||
func (s *sqliteStore) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) {
|
||||
var ep int
|
||||
var sealed []byte
|
||||
var err error
|
||||
@@ -275,6 +328,12 @@ func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, e
|
||||
).Scan(&ep, &sealed)
|
||||
}
|
||||
if err != nil {
|
||||
// Map "no such row" to the store-agnostic sentinel so the control plane
|
||||
// can tell "not invited / no key yet" (-> 403 with a helpful message) from
|
||||
// a genuine backend failure, the same way the KV store will.
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, ErrNotFound)
|
||||
}
|
||||
return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err)
|
||||
}
|
||||
return ep, sealed, nil
|
||||
@@ -282,7 +341,7 @@ func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, e
|
||||
|
||||
// PutSealedKeys stores a batch of sealed keys for the given epoch (endpoint ->
|
||||
// sealed bytes), upserting on conflict so a rekey can overwrite stale entries.
|
||||
func (s *Store) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error {
|
||||
func (s *sqliteStore) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: begin: %w", err)
|
||||
@@ -301,7 +360,7 @@ func (s *Store) PutSealedKeys(roomID string, epoch int, keys map[string][]byte)
|
||||
}
|
||||
|
||||
// BumpEpoch sets the room's current key_epoch to newEpoch.
|
||||
func (s *Store) BumpEpoch(roomID string, newEpoch int) error {
|
||||
func (s *sqliteStore) BumpEpoch(roomID string, newEpoch int) error {
|
||||
if _, err := s.db.Exec(`UPDATE rooms SET key_epoch = ? WHERE room_id = ?`, newEpoch, roomID); err != nil {
|
||||
return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
|
||||
}
|
||||
@@ -310,7 +369,7 @@ func (s *Store) BumpEpoch(roomID string, newEpoch int) error {
|
||||
|
||||
// RemoveMember deletes a member from a room. Their sealed keys for past epochs
|
||||
// are left intact (they encrypt only data that member could already read).
|
||||
func (s *Store) RemoveMember(roomID, endpoint string) error {
|
||||
func (s *sqliteStore) RemoveMember(roomID, endpoint string) error {
|
||||
if _, err := s.db.Exec(`DELETE FROM members WHERE room_id = ? AND endpoint = ?`, roomID, endpoint); err != nil {
|
||||
return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err)
|
||||
}
|
||||
|
||||
@@ -6,10 +6,10 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func openTestStore(t *testing.T) *Store {
|
||||
func openTestStore(t *testing.T) *sqliteStore {
|
||||
t.Helper()
|
||||
path := filepath.Join(t.TempDir(), "test.db")
|
||||
s, err := Open(path)
|
||||
s, err := openSQLite(path)
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ func normalizeSignPub(signPub string) string {
|
||||
// AddUser inserts a new bus user. role defaults to RoleMember when empty. It
|
||||
// returns ErrUserExists if the sign_pub is already registered (the caller may
|
||||
// choose to revoke+re-add or ignore). handle and signPub must be non-empty.
|
||||
func (s *Store) AddUser(signPub, handle, role string) error {
|
||||
func (s *sqliteStore) AddUser(signPub, handle, role string) error {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
if signPub == "" || handle == "" {
|
||||
return fmt.Errorf("membership: AddUser: sign_pub and handle required")
|
||||
@@ -74,7 +74,7 @@ func (s *Store) AddUser(signPub, handle, role string) error {
|
||||
|
||||
// GetUser returns the user with the given signing public key. It returns
|
||||
// sql.ErrNoRows (wrapped) when there is no such user.
|
||||
func (s *Store) GetUser(signPub string) (User, error) {
|
||||
func (s *sqliteStore) GetUser(signPub string) (User, error) {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
var u User
|
||||
var revoked sql.NullString
|
||||
@@ -90,7 +90,7 @@ func (s *Store) GetUser(signPub string) (User, error) {
|
||||
}
|
||||
|
||||
// ListUsers returns every user ordered by handle then sign_pub (stable output).
|
||||
func (s *Store) ListUsers() ([]User, error) {
|
||||
func (s *sqliteStore) ListUsers() ([]User, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT sign_pub, handle, role, status, created_at, revoked_at FROM users ORDER BY handle, sign_pub`,
|
||||
)
|
||||
@@ -116,7 +116,7 @@ func (s *Store) ListUsers() ([]User, error) {
|
||||
// status flip (not a delete) so the identity stays auditable and IsAuthorized
|
||||
// immediately denies it on both planes. Revoking an unknown or already-revoked
|
||||
// user returns an error / is a no-op respectively.
|
||||
func (s *Store) RevokeUser(signPub string) error {
|
||||
func (s *sqliteStore) RevokeUser(signPub string) error {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
res, err := s.db.Exec(
|
||||
`UPDATE users SET status = ?, revoked_at = ? WHERE sign_pub = ? AND status = ?`,
|
||||
@@ -140,7 +140,7 @@ func (s *Store) RevokeUser(signPub string) error {
|
||||
// plane (HTTP request middleware) and the data plane (NATS nkey authenticator),
|
||||
// so revoking a user denies access on both without restarting anything. An
|
||||
// unknown key, a revoked key, or any query error all yield false (fail closed).
|
||||
func (s *Store) IsAuthorized(signPub string) bool {
|
||||
func (s *sqliteStore) IsAuthorized(signPub string) bool {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
if signPub == "" {
|
||||
return false
|
||||
@@ -155,7 +155,7 @@ func (s *Store) IsAuthorized(signPub string) bool {
|
||||
// HasAdmin reports whether at least one active admin exists. The control plane
|
||||
// uses it to gate user-management endpoints: until the host operator seeds the
|
||||
// first admin via the local CLI, those endpoints stay closed (chicken-egg).
|
||||
func (s *Store) HasAdmin() bool {
|
||||
func (s *sqliteStore) HasAdmin() bool {
|
||||
var one int
|
||||
err := s.db.QueryRow(
|
||||
`SELECT 1 FROM users WHERE role = ? AND status = ? LIMIT 1`, RoleAdmin, StatusActive,
|
||||
|
||||
Reference in New Issue
Block a user