Merge issue/0006c-kv-store: wire decentralized control-plane KV store (--store kv)

This commit is contained in:
2026-06-07 17:14:20 +02:00
5 changed files with 343 additions and 35 deletions
+154
View File
@@ -0,0 +1,154 @@
package main
// Wiring tests for issue 0006c: --store kv selects the replicated JetStream KV
// control plane, the authenticator serves from it through the storeHolder, and a
// new node sees state created by another (the divergence that per-node SQLite
// caused — audit 0008 N5 — is gone). Branch-by-abstraction is verified elsewhere
// (the SQLite default path is the unchanged baseline covered by the existing
// suite).
import (
"encoding/hex"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// TestKVStoreBootstrapUnderEnforce drives the exact decentralized boot the binary
// performs: build the authenticator over an empty holder, start NATS, open the
// privileged internal connection, open the KV store, publish it into the holder,
// then a real bus user (seeded into the KV store) authenticates over nkey. This
// proves the bootstrap cycle is broken correctly — the KV-backed control plane
// authorizes live clients under enforce.
func TestKVStoreBootstrapUnderEnforce(t *testing.T) {
internalID, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("internal identity: %v", err)
}
holder := &storeHolder{}
auth := busauth.NewNkeyAuthenticatorACLInternal(
holder.IsAuthorized,
busauth.PermissionsFromSubjects(holder.subjectACL),
hex.EncodeToString(internalID.SignPub),
)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
// Privileged internal connection opens the KV store while the holder still
// denies every normal client.
intNC, js, err := connectInternalJS(ns, internalID, true)
if err != nil {
t.Fatalf("connectInternalJS: %v", err)
}
t.Cleanup(intNC.Close)
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("open kv store: %v", err)
}
holder.set(kvStore)
// Seed a bus user into the KV control plane.
alice, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("alice: %v", err)
}
if err := kvStore.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleMember); err != nil {
t.Fatalf("seed alice: %v", err)
}
// alice authenticates over nkey — authorized via the KV store through the holder.
pub, sign, err := busauth.ClientNkey(alice.SignPriv)
if err != nil {
t.Fatalf("alice nkey: %v", err)
}
aliceNC, err := nats.Connect(ns.ClientURL(), nats.Nkey(pub, sign), nats.MaxReconnects(0), nats.Timeout(2*time.Second))
if err != nil {
t.Fatalf("alice (KV-authorized) must connect under enforce: %v", err)
}
aliceNC.Close()
// An outsider not in the KV store is denied (fail closed).
outsider, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("outsider: %v", err)
}
opub, osign, err := busauth.ClientNkey(outsider.SignPriv)
if err != nil {
t.Fatalf("outsider nkey: %v", err)
}
if oc, err := nats.Connect(ns.ClientURL(), nats.Nkey(opub, osign), nats.MaxReconnects(0), nats.Timeout(2*time.Second)); err == nil {
oc.Close()
t.Fatalf("an outsider absent from the KV store must be rejected")
}
}
// TestKVStoreDecentralizedConsistency: a room/user created via one node's KV store
// is immediately visible to another node's KV store over the same JetStream — the
// shared, replicated control plane that ends the per-node SQLite divergence.
func TestKVStoreDecentralizedConsistency(t *testing.T) {
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t),
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
open := func() membership.Store {
nc, err := nats.Connect(ns.ClientURL())
if err != nil {
t.Fatalf("connect: %v", err)
}
t.Cleanup(nc.Close)
js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("jetstream: %v", err)
}
st, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("open kv: %v", err)
}
return st
}
nodeA := open()
nodeB := open()
owner, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("owner: %v", err)
}
ownerPub := hex.EncodeToString(owner.SignPub)
if err := nodeA.AddUser(ownerPub, "owner", membership.RoleAdmin); err != nil {
t.Fatalf("nodeA add user: %v", err)
}
if err := nodeA.CreateRoom(
membership.RoomInfo{RoomID: "ROOMX", Subject: "room.shared.x", OwnerEndpoint: "owner-ep"},
owner.SignPub, owner.KexPub, nil,
); err != nil {
t.Fatalf("nodeA create room: %v", err)
}
// nodeB (a different connection, same buckets) sees both immediately.
if !nodeB.IsAuthorized(ownerPub) {
t.Fatalf("nodeB must see the user created on nodeA (decentralized state divergence)")
}
got, err := nodeB.GetRoom("ROOMX")
if err != nil {
t.Fatalf("nodeB must see the room created on nodeA: %v", err)
}
if got.Subject != "room.shared.x" {
t.Fatalf("nodeB read wrong room subject: %q", got.Subject)
}
}
+77 -34
View File
@@ -73,6 +73,11 @@ func main() {
// `nats stream update --replicas 3` when the third node joins).
kvReplicas = flag.Int("kv-replicas", 1, "JetStream replication factor for the shared nonce/KV buckets (1..3)")
caFile = flag.String("ca", "", "bus CA cert; only used to pin TLS on the internal JetStream connection to an EXTERNAL --nats-url (the embedded server uses an in-process connection that needs no CA)")
// Control-plane store backend (issue 0006c, feature flag decentralized):
// "sqlite" (default) keeps the local single-node SQLite control plane;
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
// in the cluster serves the same state.
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
)
flag.Parse()
@@ -80,6 +85,9 @@ func main() {
if err != nil {
log.Fatalf("%v", err)
}
if *storeBackend != "sqlite" && *storeBackend != "kv" {
log.Fatalf("--store must be \"sqlite\" or \"kv\", got %q", *storeBackend)
}
// Fail-open guard (audit H2): a non-loopback bind, or any TLS flag, demands
// --bus-auth enforce. This makes an insecure public startup impossible rather
@@ -97,12 +105,13 @@ func main() {
log.SetPrefix("[membershipd] ")
// A clustered node shares its control plane with peers, so it needs a JetStream
// client to manage the replicated nonce bucket (issue 0006a). needJS will also
// be true under --store kv (issue 0006c), where the control-plane state lives in
// JetStream KV. A standalone single-node deployment needs none of this and keeps
// the in-process, in-memory behavior unchanged.
// client to manage the replicated nonce bucket (issue 0006a). --store kv (issue
// 0006c) also needs JetStream, for the control-plane KV itself. A standalone
// single-node SQLite deployment needs none of this and keeps the in-process,
// in-memory behavior unchanged.
clustered := *clusterName != ""
needJS := clustered
decentralized := *storeBackend == "kv"
needJS := clustered || decentralized
enforce := authMode == membership.AuthEnforce
// Internal service identity (issue 0006a): when the embedded data plane enforces
@@ -121,14 +130,29 @@ func main() {
internalPubHex = hex.EncodeToString(internalID.SignPub)
}
// Control plane store first: the NATS authenticator consults IsAuthorized, so
// the store must exist before the embedded server starts.
store, err := membership.Open(*dbPath)
if err != nil {
log.Fatalf("open membership store: %v", err)
// The authenticator consults the store through a holder so it can be built
// before the store exists: with --store kv the JetStream KV store opens only
// after NATS is up (the bootstrap cycle). In the default SQLite path the store
// is opened and set into the holder right here, before the server starts, so
// behavior is identical to the pre-0006c baseline. `store` is the final store
// used by the HTTP server (set below for the KV path).
holder := &storeHolder{}
var store membership.Store
if !decentralized {
store, err = membership.Open(*dbPath)
if err != nil {
log.Fatalf("open membership store: %v", err)
}
holder.set(store)
log.Printf("membership store: sqlite %s", *dbPath)
}
defer store.Close()
log.Printf("membership store: %s", *dbPath)
// Close whichever store ends up final (SQLite closes its file; the JetStream KV
// store's Close is a no-op — its NATS connection is closed separately).
defer func() {
if store != nil {
store.Close()
}
}()
blobs, err := blobstore.New(*storeDir)
if err != nil {
@@ -182,8 +206,8 @@ func main() {
// NATS freezes permissions at connect time, so a peer that joins a room
// after connecting must client.RefreshSession to gain that room's subject.
cfg.Auth = busauth.NewNkeyAuthenticatorACLInternal(
store.IsAuthorized,
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
holder.IsAuthorized,
busauth.PermissionsFromSubjects(holder.subjectACL),
internalPubHex,
)
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
@@ -209,6 +233,38 @@ func main() {
log.Printf("using external NATS: %s", natsClientURL)
}
// JetStream client + decentralized store (issue 0006a/c). needJS is set for a
// clustered node (shared nonce bucket) and for --store kv (the KV control
// plane). Open the privileged JetStream client first (in-process for the
// embedded server, a plain client for external NATS), then — for --store kv —
// open the replicated KV store and publish it into the holder so the
// authenticator and HTTP server serve from it. The privileged connection is the
// only client that can connect in this window (the holder still denies everyone
// else; the internal identity bypasses the store).
var js jetstream.JetStream
if needJS {
var internalNC *nats.Conn
if *natsURL == "" {
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
} else {
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
}
if err != nil {
log.Fatalf("internal JetStream connection (required by --cluster-name/--store kv): %v", err)
}
defer internalNC.Close()
if decentralized {
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: *kvReplicas})
if err != nil {
log.Fatalf("open decentralized control-plane KV store: %v", err)
}
store = kvStore
holder.set(store)
log.Printf("membership store: jetstream KV (replicas=%d)", *kvReplicas)
}
}
srv := membership.NewServer(store, blobs, authMode)
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
// has no per-subject ACL, so cleartext content would be readable by any
@@ -220,30 +276,17 @@ func main() {
}
// Replicated anti-replay (issue 0006a, audit 0008 N3): a clustered node MUST
// share its nonce store across the cluster via JetStream KV, or a request
// accepted on one node can be replayed to another. Open a privileged JetStream
// client (in-process for the embedded server, a plain client for an external
// NATS) and wire the shared nonce bucket. This is a HARD requirement: if the
// bucket cannot be created the node refuses to start rather than run with a
// per-process cache that leaves the replay hole open.
// share its nonce store across the cluster, or a request accepted on one node
// can be replayed to another. HARD requirement: if the bucket cannot be created
// the node refuses to start rather than run with a per-process cache that leaves
// the replay hole open.
if needJS {
var (
internalNC *nats.Conn
js jetstream.JetStream
)
if *natsURL == "" {
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
} else {
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
}
if err != nil {
log.Fatalf("internal JetStream connection (required by --cluster-name): %v", err)
}
defer internalNC.Close()
if err := wireReplicatedNonces(srv, js, clustered, *kvReplicas); err != nil {
log.Fatalf("%v", err)
}
log.Printf("anti-replay: replicated nonce bucket %q (replicas=%d) — cluster-safe", "KV_UNIBUS_nonces", *kvReplicas)
if clustered {
log.Printf("anti-replay: replicated nonce bucket \"KV_UNIBUS_nonces\" (replicas=%d) — cluster-safe", *kvReplicas)
}
}
log.Printf("control-plane auth: %s", authMode)
+60
View File
@@ -0,0 +1,60 @@
package main
import (
"fmt"
"sync"
"github.com/enmanuel/unibus/pkg/membership"
)
// storeHolder is a concurrency-safe slot for the control-plane store, used to
// break the decentralized bootstrap cycle (issue 0006c): the NATS authenticator
// must be built BEFORE the embedded server starts, but the JetStream KV store can
// only be opened AFTER NATS is up (it needs a JetStream client). The authenticator
// therefore consults the holder instead of a concrete store.
//
// Fail-closed by construction: until the store is set, IsAuthorized denies and
// SubjectACL errors, so any client connecting in the startup window is rejected.
// The only connection expected in that window is membershipd's own internal
// service identity, which the authenticator recognizes by key and lets through
// without consulting the store at all. In the SQLite (default) path the store is
// set before StartServer, so the window does not exist and behavior is identical
// to the pre-0006c baseline.
type storeHolder struct {
mu sync.RWMutex
s membership.Store
}
func (h *storeHolder) set(s membership.Store) {
h.mu.Lock()
h.s = s
h.mu.Unlock()
}
func (h *storeHolder) get() membership.Store {
h.mu.RLock()
defer h.mu.RUnlock()
return h.s
}
// IsAuthorized reports whether signPubHex is an active bus user, denying while the
// store is not yet set (fail closed). It is the predicate the nkey authenticator
// uses for every connecting client.
func (h *storeHolder) IsAuthorized(signPubHex string) bool {
s := h.get()
if s == nil {
return false
}
return s.IsAuthorized(signPubHex)
}
// subjectACL derives the per-subject permissions for signPubHex via the live
// store, erroring (so the caller fails closed and denies the connection) while the
// store is not yet set.
func (h *storeHolder) subjectACL(signPubHex string) ([]string, error) {
s := h.get()
if s == nil {
return nil, fmt.Errorf("control-plane store not ready")
}
return membership.SubjectACLFor(s)(signPubHex)
}
+51
View File
@@ -0,0 +1,51 @@
package main
import (
"encoding/hex"
"path/filepath"
"testing"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/membership"
)
// TestStoreHolderFailClosed: an empty holder denies everything (the bootstrap
// window before the store is set), and starts serving once a store is published.
func TestStoreHolderFailClosed(t *testing.T) {
h := &storeHolder{}
// Empty: deny + error (fail closed).
if h.IsAuthorized("anything") {
t.Fatalf("empty holder must deny IsAuthorized")
}
if _, err := h.subjectACL("anything"); err == nil {
t.Fatalf("empty holder must error from subjectACL (fail closed)")
}
// After set: serves from the real store.
store, err := membership.Open(filepath.Join(t.TempDir(), "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
id, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
pub := hex.EncodeToString(id.SignPub)
if err := store.AddUser(pub, "alice", membership.RoleMember); err != nil {
t.Fatalf("add user: %v", err)
}
h.set(store)
if !h.IsAuthorized(pub) {
t.Fatalf("after set, an active user must be authorized")
}
if _, err := h.subjectACL(pub); err != nil {
t.Fatalf("after set, subjectACL must succeed: %v", err)
}
if h.IsAuthorized("deadbeef") {
t.Fatalf("a non-user must not be authorized")
}
}
+1 -1
View File
@@ -18,7 +18,7 @@
"decentralized": {
"enabled": false,
"issue": "0003",
"description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default OFF, jetstreamStore ON). The route cluster (0003a) and the KV store (0003b) ship behind this flag; the membershipd boot wiring that selects the KV store completes with the session/reconnect redesign (0003e) and is activated on the multi-node deploy (0003f). OFF keeps the single-node SQLite control plane unchanged.",
"description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default, jetstreamStore opt-in). The route cluster (0003a) and the KV store (0003b) shipped behind this flag; the membershipd boot wiring that selects the store is COMPLETE since issue 0006c and is realized at runtime with the server flag --store kv|sqlite (default sqlite). The internal-identity bootstrap (0006a) lets membershipd open the KV store on its own embedded NATS under enforce. Per-deploy opt-in: a node joins the decentralized control plane by starting with --store kv (and --cluster-name for HA). OFF (--store sqlite) keeps the single-node SQLite control plane unchanged.",
"added": "2026-06-07",
"enabled_at": null
}