diff --git a/cmd/membershipd/attack0008_test.go b/cmd/membershipd/attack0008_test.go new file mode 100644 index 0000000..4be65ec --- /dev/null +++ b/cmd/membershipd/attack0008_test.go @@ -0,0 +1,221 @@ +package main + +// Regression for audit report 0008, vector N3: the binary must wire the +// replicated nonce store on a clustered node so a signed request accepted on one +// node cannot be replayed to another. The auditor's ephemeral attack showed the +// OLD binary never called UseReplicatedNonces (each node kept a per-process +// cache), so a captured request replayed to a second node with 200+200. These +// tests drive the SAME helper the binary uses (wireReplicatedNonces) so they +// prove the WIRING, not just the underlying API. + +import ( + "bytes" + "crypto/rand" + "encoding/base64" + "encoding/hex" + "io" + "net" + "net/http" + "net/http/httptest" + "path/filepath" + "strconv" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func freePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("free port: %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +// signed008 builds a transport-signed control-plane request with a caller-chosen +// ts+nonce, so a test can reuse the exact same signed bytes against two nodes to +// exercise replay. +func signed008(t *testing.T, baseURL, method, path string, body []byte, id cs.Identity, ts int64, nonce string) *http.Request { + t.Helper() + canonical := membership.CanonicalRequest(method, path, strconv.FormatInt(ts, 10), nonce, body) + sig := cs.SignEd25519(id.SignPriv, canonical) + var rdr io.Reader + if body != nil { + rdr = bytes.NewReader(body) + } + req, err := http.NewRequest(method, baseURL+path, rdr) + if err != nil { + t.Fatalf("new request: %v", err) + } + req.Header.Set("X-Unibus-Pub", hex.EncodeToString(id.SignPub)) + req.Header.Set("X-Unibus-Ts", strconv.FormatInt(ts, 10)) + req.Header.Set("X-Unibus-Nonce", nonce) + req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig)) + return req +} + +func randNonce(t *testing.T) string { + t.Helper() + raw := make([]byte, 16) + if _, err := rand.Read(raw); err != nil { + t.Fatalf("nonce: %v", err) + } + return base64.StdEncoding.EncodeToString(raw) +} + +// TestAttack0008_N3 is the blocker regression: two clustered membershipd nodes +// wired through wireReplicatedNonces share a JetStream KV nonce bucket, so a +// request accepted on node A is rejected (401) when replayed to node B. Before +// the fix the binary never wired this and the replay returned 200. +func TestAttack0008_N3(t *testing.T) { + // One NATS+JetStream backing the shared nonce bucket (no client auth needed: + // the test drives the membership.Server's nonce store directly via HTTP). + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + t.Fatalf("connect: %v", err) + } + t.Cleanup(nc.Close) + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("jetstream: %v", err) + } + + // Shared control-plane state (stand-in for the replicated store) + two nodes. + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + alice, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity: %v", err) + } + if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleAdmin); err != nil { + t.Fatalf("add alice: %v", err) + } + blobs, _ := blobstore.New(filepath.Join(dir, "blobs")) + + // Each node is wired EXACTLY as the binary wires a clustered node. + mkNode := func() *httptest.Server { + srv := membership.NewServer(store, blobs, membership.AuthEnforce) + if err := wireReplicatedNonces(srv, js, true /*clustered*/, 1); err != nil { + t.Fatalf("wireReplicatedNonces: %v", err) + } + return httptest.NewServer(srv) + } + nodeA := mkNode() + t.Cleanup(nodeA.Close) + nodeB := mkNode() + t.Cleanup(nodeB.Close) + + ts := time.Now().Unix() + nonce := randNonce(t) + path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms" + + // Golden: alice's signed request is accepted on node A. + respA, err := http.DefaultClient.Do(signed008(t, nodeA.URL, "GET", path, nil, alice, ts, nonce)) + if err != nil { + t.Fatalf("do A: %v", err) + } + respA.Body.Close() + if respA.StatusCode != http.StatusOK { + t.Fatalf("node A first use: status %d, want 200", respA.StatusCode) + } + + // Error path (the attack): replay the SAME signed bytes to node B → 401. + respB, err := http.DefaultClient.Do(signed008(t, nodeB.URL, "GET", path, nil, alice, ts, nonce)) + if err != nil { + t.Fatalf("do B: %v", err) + } + respB.Body.Close() + if respB.StatusCode != http.StatusUnauthorized { + t.Fatalf("cross-node replay to node B: status %d, want 401 (replayed nonce must be rejected)", respB.StatusCode) + } +} + +// TestAttack0008_N3_StandaloneKeepsLocalCache is the edge: a NON-clustered node +// must NOT require JetStream — wireReplicatedNonces is a no-op and the node keeps +// its in-memory cache, which still rejects a same-node replay (the single-node +// guarantee is unchanged). This proves the fix does not add a JetStream +// dependency to standalone deployments. +func TestAttack0008_N3_StandaloneKeepsLocalCache(t *testing.T) { + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + alice, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity: %v", err) + } + if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleAdmin); err != nil { + t.Fatalf("add alice: %v", err) + } + blobs, _ := blobstore.New(filepath.Join(dir, "blobs")) + + srv := membership.NewServer(store, blobs, membership.AuthEnforce) + // Standalone: clustered=false, js=nil. Must succeed (no JetStream needed). + if err := wireReplicatedNonces(srv, nil, false /*clustered*/, 1); err != nil { + t.Fatalf("standalone wireReplicatedNonces must be a no-op, got: %v", err) + } + node := httptest.NewServer(srv) + t.Cleanup(node.Close) + + ts := time.Now().Unix() + nonce := randNonce(t) + path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms" + + resp1, err := http.DefaultClient.Do(signed008(t, node.URL, "GET", path, nil, alice, ts, nonce)) + if err != nil { + t.Fatalf("do 1: %v", err) + } + resp1.Body.Close() + if resp1.StatusCode != http.StatusOK { + t.Fatalf("first use: status %d, want 200", resp1.StatusCode) + } + // Same-node replay is still rejected by the in-memory cache. + resp2, err := http.DefaultClient.Do(signed008(t, node.URL, "GET", path, nil, alice, ts, nonce)) + if err != nil { + t.Fatalf("do 2: %v", err) + } + resp2.Body.Close() + if resp2.StatusCode != http.StatusUnauthorized { + t.Fatalf("same-node replay: status %d, want 401", resp2.StatusCode) + } +} + +// TestAttack0008_N3_ClusteredRequiresJetStream proves the hard rule: a clustered +// node with NO JetStream available refuses (error), so the binary fails fast +// instead of silently running with a per-process cache. +func TestAttack0008_N3_ClusteredRequiresJetStream(t *testing.T) { + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + blobs, _ := blobstore.New(filepath.Join(dir, "blobs")) + srv := membership.NewServer(store, blobs, membership.AuthEnforce) + if err := wireReplicatedNonces(srv, nil, true /*clustered*/, 1); err == nil { + t.Fatalf("clustered node with no JetStream must fail, got nil") + } +} diff --git a/cmd/membershipd/internal_conn.go b/cmd/membershipd/internal_conn.go new file mode 100644 index 0000000..8157b38 --- /dev/null +++ b/cmd/membershipd/internal_conn.go @@ -0,0 +1,84 @@ +package main + +import ( + "fmt" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + server "github.com/nats-io/nats-server/v2/server" +) + +// connectInternalJS opens a privileged JetStream client from membershipd to its +// OWN embedded NATS server. This is the resolution of the "bootstrap cycle" +// (issue 0006a/c): the service needs JetStream to create the replicated nonce +// bucket and the control-plane KV, but under enforce the data plane only accepts +// allowlisted clients confined to their rooms. The connection therefore +// authenticates with the process's ephemeral internal identity — the identity the +// authenticator was built to recognize (NewNkeyAuthenticatorACLInternal) and +// grant full permissions — without ever appearing in the user allowlist. +// +// It uses the in-process transport (nats.InProcessServer), a Go pipe inside the +// process, so it bypasses TLS entirely: no CA wiring is needed for this +// self-connection even when the public data plane is TLS-only. useNkey mirrors +// whether the embedded server enforces auth: under enforce the internal identity +// presents its nkey; without enforce the server accepts an unauthenticated +// in-process client and the nkey is omitted. +// +// The caller owns the returned connection and must Close it on shutdown (after +// the JetStream context is no longer used). +func connectInternalJS(ns *server.Server, internalID cs.Identity, useNkey bool) (*nats.Conn, jetstream.JetStream, error) { + opts := []nats.Option{ + nats.Name("membershipd-internal"), + nats.InProcessServer(ns), + } + if useNkey { + pub, sign, err := busauth.ClientNkey(internalID.SignPriv) + if err != nil { + return nil, nil, fmt.Errorf("internal nkey: %w", err) + } + opts = append(opts, nats.Nkey(pub, sign)) + } + // The URL is ignored for an in-process connection; the InProcessServer option + // supplies the transport. + nc, err := nats.Connect("", opts...) + if err != nil { + return nil, nil, fmt.Errorf("connect internal nats: %w", err) + } + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + return nil, nil, fmt.Errorf("internal jetstream: %w", err) + } + return nc, js, nil +} + +// connectExternalJS opens a JetStream client to an EXTERNAL NATS the operator +// runs (membershipd started with --nats-url). Unlike the embedded path there is +// no in-process transport and no internal identity: the external server enforces +// its own auth, so membershipd connects as a plain client (optionally TLS-pinned +// to the bus CA). It is best-effort and intended for an operator-managed cluster; +// the standard unibus deploy uses the embedded server (connectInternalJS). +func connectExternalJS(natsURL, caPath string) (*nats.Conn, jetstream.JetStream, error) { + opts := []nats.Option{nats.Name("membershipd-internal")} + if caPath != "" { + tlsCfg, err := busauth.LoadCATLSConfig(caPath) + if err != nil { + return nil, nil, fmt.Errorf("load CA %q: %w", caPath, err) + } + opts = append(opts, nats.Secure(tlsCfg)) + } + nc, err := nats.Connect(natsURL, opts...) + if err != nil { + return nil, nil, fmt.Errorf("connect external nats %q: %w", natsURL, err) + } + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + return nil, nil, fmt.Errorf("external jetstream: %w", err) + } + return nc, js, nil +} diff --git a/cmd/membershipd/internal_conn_test.go b/cmd/membershipd/internal_conn_test.go new file mode 100644 index 0000000..36b4854 --- /dev/null +++ b/cmd/membershipd/internal_conn_test.go @@ -0,0 +1,119 @@ +package main + +// Bootstrap test for issue 0006a/c: under enforce, membershipd must still reach +// JetStream on its OWN embedded server to create the nonce/KV buckets. It does so +// with an ephemeral internal identity the authenticator grants full permissions +// (NewNkeyAuthenticatorACLInternal). These tests prove that privileged +// self-connection works AND that no other identity can claim it. + +import ( + "context" + "encoding/hex" + "net" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func icFreePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("free port: %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +// TestInternalConnPrivilegedUnderEnforce: with an enforce authenticator that +// authorizes NO bus user, the internal identity still connects in-process and has +// full permissions — it creates a KV bucket and round-trips a value. This is the +// resolution of the bootstrap cycle the audit flagged as the reason the KV store +// was never wired. +func TestInternalConnPrivilegedUnderEnforce(t *testing.T) { + internalID, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("internal identity: %v", err) + } + internalPubHex := hex.EncodeToString(internalID.SignPub) + + // Authenticator: no bus user is authorized; only the internal identity passes. + auth := busauth.NewNkeyAuthenticatorACLInternal( + func(string) bool { return false }, + busauth.PermissionsFromSubjects(func(string) ([]string, error) { return []string{"_INBOX.>"}, nil }), + internalPubHex, + ) + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: icFreePort(t), Auth: auth, + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + + nc, js, err := connectInternalJS(ns, internalID, true /*useNkey*/) + if err != nil { + t.Fatalf("connectInternalJS: %v", err) + } + t.Cleanup(nc.Close) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KV_UNIBUS_test", Replicas: 1}) + if err != nil { + t.Fatalf("internal conn could not create KV bucket (full perms expected): %v", err) + } + if _, err := kv.Put(ctx, "k", []byte("v")); err != nil { + t.Fatalf("kv put: %v", err) + } + e, err := kv.Get(ctx, "k") + if err != nil || string(e.Value()) != "v" { + t.Fatalf("kv get: val=%q err=%v", e, err) + } +} + +// TestInternalConnOutsiderRejected: an identity that is neither the internal one +// nor an allowlisted bus user cannot connect — proving the internal bypass is +// scoped to the exact internal key, not a blanket hole. +func TestInternalConnOutsiderRejected(t *testing.T) { + internalID, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("internal identity: %v", err) + } + auth := busauth.NewNkeyAuthenticatorACLInternal( + func(string) bool { return false }, + busauth.PermissionsFromSubjects(func(string) ([]string, error) { return []string{"_INBOX.>"}, nil }), + hex.EncodeToString(internalID.SignPub), + ) + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: icFreePort(t), Auth: auth, + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + + outsider, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("outsider identity: %v", err) + } + pub, sign, err := busauth.ClientNkey(outsider.SignPriv) + if err != nil { + t.Fatalf("outsider nkey: %v", err) + } + conn, err := nats.Connect(ns.ClientURL(), + nats.Nkey(pub, sign), + nats.MaxReconnects(0), + nats.Timeout(2*time.Second), + ) + if err == nil { + conn.Close() + t.Fatalf("outsider (unauthorized, non-internal) must be rejected, but connected") + } +} diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go index 01727bf..0e54553 100644 --- a/cmd/membershipd/main.go +++ b/cmd/membershipd/main.go @@ -7,6 +7,7 @@ package main import ( "context" "crypto/tls" + "encoding/hex" "flag" "log" "net/http" @@ -15,6 +16,10 @@ import ( "syscall" "time" + cs "fn-registry/functions/cybersecurity" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" server "github.com/nats-io/nats-server/v2/server" "github.com/enmanuel/unibus/pkg/blobstore" @@ -62,6 +67,12 @@ func main() { routeTLSCert = flag.String("route-tls-cert", "", "this node's route certificate (CA-signed); enables mutual route TLS with --route-tls-key/--route-tls-ca") routeTLSKey = flag.String("route-tls-key", "", "this node's route private key") routeTLSCA = flag.String("route-tls-ca", "", "bus CA that signs every node's route certificate (deploy/tls/ca.crt)") + // Replicated control plane (issue 0006a/c): the JetStream replication factor + // for the shared nonce bucket (and, with --store kv, the control-plane KV). + // 1 for a 1-2 node rollout, 3 for real HA quorum (raise in place with + // `nats stream update --replicas 3` when the third node joins). + kvReplicas = flag.Int("kv-replicas", 1, "JetStream replication factor for the shared nonce/KV buckets (1..3)") + caFile = flag.String("ca", "", "bus CA cert; only used to pin TLS on the internal JetStream connection to an EXTERNAL --nats-url (the embedded server uses an in-process connection that needs no CA)") ) flag.Parse() @@ -85,6 +96,31 @@ func main() { log.SetFlags(log.LstdFlags | log.Lmsgprefix) log.SetPrefix("[membershipd] ") + // A clustered node shares its control plane with peers, so it needs a JetStream + // client to manage the replicated nonce bucket (issue 0006a). needJS will also + // be true under --store kv (issue 0006c), where the control-plane state lives in + // JetStream KV. A standalone single-node deployment needs none of this and keeps + // the in-process, in-memory behavior unchanged. + clustered := *clusterName != "" + needJS := clustered + enforce := authMode == membership.AuthEnforce + + // Internal service identity (issue 0006a): when the embedded data plane enforces + // auth, membershipd must still connect to its OWN server to manage JetStream. + // It does so with this ephemeral identity, which the authenticator is built to + // recognize and grant full permissions (it never enters the user allowlist). It + // is only generated when actually needed (JetStream required AND enforce on AND + // the server is embedded), so a standalone or non-enforce node is unchanged. + var internalID cs.Identity + var internalPubHex string + if needJS && enforce && *natsURL == "" { + internalID, err = cs.GenerateIdentity() + if err != nil { + log.Fatalf("generate internal identity: %v", err) + } + internalPubHex = hex.EncodeToString(internalID.SignPub) + } + // Control plane store first: the NATS authenticator consults IsAuthorized, so // the store must exist before the embedded server starts. store, err := membership.Open(*dbPath) @@ -145,9 +181,10 @@ func main() { // Subscribe(">") and harvest every room's subject and JetStream activity. // NATS freezes permissions at connect time, so a peer that joins a room // after connecting must client.RefreshSession to gain that room's subject. - cfg.Auth = busauth.NewNkeyAuthenticatorACL( + cfg.Auth = busauth.NewNkeyAuthenticatorACLInternal( store.IsAuthorized, busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)), + internalPubHex, ) log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)") } @@ -181,6 +218,34 @@ func main() { srv.RequireEncryptedRooms = true log.Printf("cleartext rooms: DISABLED (public bind requires end-to-end encryption)") } + + // Replicated anti-replay (issue 0006a, audit 0008 N3): a clustered node MUST + // share its nonce store across the cluster via JetStream KV, or a request + // accepted on one node can be replayed to another. Open a privileged JetStream + // client (in-process for the embedded server, a plain client for an external + // NATS) and wire the shared nonce bucket. This is a HARD requirement: if the + // bucket cannot be created the node refuses to start rather than run with a + // per-process cache that leaves the replay hole open. + if needJS { + var ( + internalNC *nats.Conn + js jetstream.JetStream + ) + if *natsURL == "" { + internalNC, js, err = connectInternalJS(ns, internalID, enforce) + } else { + internalNC, js, err = connectExternalJS(natsClientURL, *caFile) + } + if err != nil { + log.Fatalf("internal JetStream connection (required by --cluster-name): %v", err) + } + defer internalNC.Close() + if err := wireReplicatedNonces(srv, js, clustered, *kvReplicas); err != nil { + log.Fatalf("%v", err) + } + log.Printf("anti-replay: replicated nonce bucket %q (replicas=%d) — cluster-safe", "KV_UNIBUS_nonces", *kvReplicas) + } + log.Printf("control-plane auth: %s", authMode) addr := *bind + ":" + *httpPort httpSrv := &http.Server{ diff --git a/cmd/membershipd/wiring.go b/cmd/membershipd/wiring.go new file mode 100644 index 0000000..99f00d1 --- /dev/null +++ b/cmd/membershipd/wiring.go @@ -0,0 +1,40 @@ +package main + +import ( + "fmt" + + "github.com/enmanuel/unibus/pkg/membership" + "github.com/nats-io/nats.go/jetstream" +) + +// wireReplicatedNonces applies the cluster anti-replay policy to srv. It is the +// single piece of wiring the binary uses to decide whether a node must share its +// nonce store, extracted so a regression test exercises the EXACT decision the +// running binary makes (issue 0006a, audit 0008 N3). +// +// Policy: +// - A clustered node (clustered == true) MUST use the shared JetStream KV nonce +// bucket. Every node sees the same bucket, so a request accepted on one node +// cannot be replayed to another whose per-process cache never saw the nonce. +// A missing JetStream context, or a failure to create the bucket, is a FATAL +// configuration error returned to the caller — a clustered node running with a +// per-process nonce cache is precisely the replay hole the audit flagged, so +// it must refuse to start rather than serve insecurely. +// - A standalone node (clustered == false) keeps the in-memory cache that +// NewServer installed: there is no second node to replay to, so the shared +// bucket would only add a JetStream dependency for no security gain. +// +// replicas is the nonce bucket's replication factor (R1..R3). Returns nil when no +// action is required (standalone). +func wireReplicatedNonces(srv *membership.Server, js jetstream.JetStream, clustered bool, replicas int) error { + if !clustered { + return nil // standalone: the in-memory nonce cache is sufficient and safe + } + if js == nil { + return fmt.Errorf("clustered node requires JetStream for the shared nonce bucket, but none is available") + } + if err := srv.UseReplicatedNonces(js, replicas); err != nil { + return fmt.Errorf("replicated nonces: %w", err) + } + return nil +} diff --git a/pkg/busauth/authenticator.go b/pkg/busauth/authenticator.go index ddd1726..b816a6d 100644 --- a/pkg/busauth/authenticator.go +++ b/pkg/busauth/authenticator.go @@ -82,6 +82,15 @@ type PermissionsFunc func(signPubHex string) (*server.Permissions, error) type nkeyAuthenticatorACL struct { isAuthorized func(signPubHex string) bool perms PermissionsFunc + // internalPubHex is the lowercase-hex Ed25519 public key of membershipd's own + // ephemeral internal service identity. A connection that proves that key is + // granted full permissions WITHOUT consulting the allowlist, so the service can + // bootstrap and manage JetStream (the replicated nonce bucket and, when + // decentralized, the control-plane KV buckets) against its own embedded server + // even while the data plane confines every client to its rooms. Empty disables + // the internal-identity path entirely (behavior identical to a plain ACL + // authenticator). + internalPubHex string } // NewNkeyAuthenticatorACL builds an authenticator that authorizes by the bus @@ -94,6 +103,29 @@ func NewNkeyAuthenticatorACL(isAuthorized func(signPubHex string) bool, perms Pe return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms} } +// NewNkeyAuthenticatorACLInternal is NewNkeyAuthenticatorACL that also recognizes +// membershipd's internal service identity (internalPubHex, the lowercase hex of +// its ephemeral Ed25519 public key): a connection proving that key is granted +// full permissions without an allowlist lookup, so the service can create and +// manage JetStream against its own embedded server under enforce (issue 0006a/c — +// the replicated nonce bucket and the control-plane KV). Every other identity +// goes through the allowlist + per-subject ACL unchanged. An empty internalPubHex +// is identical to NewNkeyAuthenticatorACL, so this is a superset and safe to use +// everywhere the plain constructor was used. +func NewNkeyAuthenticatorACLInternal(isAuthorized func(signPubHex string) bool, perms PermissionsFunc, internalPubHex string) server.Authentication { + return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms, internalPubHex: internalPubHex} +} + +// fullPermissions grants publish and subscribe on every subject (">"). It is the +// permission set for membershipd's own internal service connection, which must +// manage the JetStream control plane (nonce bucket + KV buckets) over NATS. It is +// NEVER granted to a bus user — only to the process's own ephemeral internal +// identity, recognized by exact public-key match in Check. +func fullPermissions() *server.Permissions { + sp := &server.SubjectPermission{Allow: []string{">"}} + return &server.Permissions{Publish: sp, Subscribe: sp} +} + // Check verifies the nkey, authorizes against the allowlist, then derives and // registers the connection's subject permissions. A permissions-derivation // error denies the connection (fail closed) rather than granting open access. @@ -102,6 +134,14 @@ func (a *nkeyAuthenticatorACL) Check(c server.ClientAuthentication) bool { if !ok { return false } + // membershipd's own internal service identity bypasses the allowlist and is + // granted full permissions so the service can bootstrap JetStream under + // enforce. The key is matched exactly against the cryptographically verified + // connecting key, so no other identity can claim these permissions. + if a.internalPubHex != "" && signPubHex == a.internalPubHex { + c.RegisterUser(&server.User{Permissions: fullPermissions()}) + return true + } if !a.isAuthorized(signPubHex) { return false }