Merge issue/0006a-replicated-nonce: wire replicated nonce store (audit 0008 N3)
This commit is contained in:
@@ -0,0 +1,221 @@
|
||||
package main
|
||||
|
||||
// Regression for audit report 0008, vector N3: the binary must wire the
|
||||
// replicated nonce store on a clustered node so a signed request accepted on one
|
||||
// node cannot be replayed to another. The auditor's ephemeral attack showed the
|
||||
// OLD binary never called UseReplicatedNonces (each node kept a per-process
|
||||
// cache), so a captured request replayed to a second node with 200+200. These
|
||||
// tests drive the SAME helper the binary uses (wireReplicatedNonces) so they
|
||||
// prove the WIRING, not just the underlying API.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
func freePort(t *testing.T) int {
|
||||
t.Helper()
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("free port: %v", err)
|
||||
}
|
||||
defer l.Close()
|
||||
return l.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
|
||||
// signed008 builds a transport-signed control-plane request with a caller-chosen
|
||||
// ts+nonce, so a test can reuse the exact same signed bytes against two nodes to
|
||||
// exercise replay.
|
||||
func signed008(t *testing.T, baseURL, method, path string, body []byte, id cs.Identity, ts int64, nonce string) *http.Request {
|
||||
t.Helper()
|
||||
canonical := membership.CanonicalRequest(method, path, strconv.FormatInt(ts, 10), nonce, body)
|
||||
sig := cs.SignEd25519(id.SignPriv, canonical)
|
||||
var rdr io.Reader
|
||||
if body != nil {
|
||||
rdr = bytes.NewReader(body)
|
||||
}
|
||||
req, err := http.NewRequest(method, baseURL+path, rdr)
|
||||
if err != nil {
|
||||
t.Fatalf("new request: %v", err)
|
||||
}
|
||||
req.Header.Set("X-Unibus-Pub", hex.EncodeToString(id.SignPub))
|
||||
req.Header.Set("X-Unibus-Ts", strconv.FormatInt(ts, 10))
|
||||
req.Header.Set("X-Unibus-Nonce", nonce)
|
||||
req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig))
|
||||
return req
|
||||
}
|
||||
|
||||
func randNonce(t *testing.T) string {
|
||||
t.Helper()
|
||||
raw := make([]byte, 16)
|
||||
if _, err := rand.Read(raw); err != nil {
|
||||
t.Fatalf("nonce: %v", err)
|
||||
}
|
||||
return base64.StdEncoding.EncodeToString(raw)
|
||||
}
|
||||
|
||||
// TestAttack0008_N3 is the blocker regression: two clustered membershipd nodes
|
||||
// wired through wireReplicatedNonces share a JetStream KV nonce bucket, so a
|
||||
// request accepted on node A is rejected (401) when replayed to node B. Before
|
||||
// the fix the binary never wired this and the replay returned 200.
|
||||
func TestAttack0008_N3(t *testing.T) {
|
||||
// One NATS+JetStream backing the shared nonce bucket (no client auth needed:
|
||||
// the test drives the membership.Server's nonce store directly via HTTP).
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
nc, err := nats.Connect(ns.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("connect: %v", err)
|
||||
}
|
||||
t.Cleanup(nc.Close)
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatalf("jetstream: %v", err)
|
||||
}
|
||||
|
||||
// Shared control-plane state (stand-in for the replicated store) + two nodes.
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
alice, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity: %v", err)
|
||||
}
|
||||
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleAdmin); err != nil {
|
||||
t.Fatalf("add alice: %v", err)
|
||||
}
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
|
||||
// Each node is wired EXACTLY as the binary wires a clustered node.
|
||||
mkNode := func() *httptest.Server {
|
||||
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
|
||||
if err := wireReplicatedNonces(srv, js, true /*clustered*/, 1); err != nil {
|
||||
t.Fatalf("wireReplicatedNonces: %v", err)
|
||||
}
|
||||
return httptest.NewServer(srv)
|
||||
}
|
||||
nodeA := mkNode()
|
||||
t.Cleanup(nodeA.Close)
|
||||
nodeB := mkNode()
|
||||
t.Cleanup(nodeB.Close)
|
||||
|
||||
ts := time.Now().Unix()
|
||||
nonce := randNonce(t)
|
||||
path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms"
|
||||
|
||||
// Golden: alice's signed request is accepted on node A.
|
||||
respA, err := http.DefaultClient.Do(signed008(t, nodeA.URL, "GET", path, nil, alice, ts, nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("do A: %v", err)
|
||||
}
|
||||
respA.Body.Close()
|
||||
if respA.StatusCode != http.StatusOK {
|
||||
t.Fatalf("node A first use: status %d, want 200", respA.StatusCode)
|
||||
}
|
||||
|
||||
// Error path (the attack): replay the SAME signed bytes to node B → 401.
|
||||
respB, err := http.DefaultClient.Do(signed008(t, nodeB.URL, "GET", path, nil, alice, ts, nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("do B: %v", err)
|
||||
}
|
||||
respB.Body.Close()
|
||||
if respB.StatusCode != http.StatusUnauthorized {
|
||||
t.Fatalf("cross-node replay to node B: status %d, want 401 (replayed nonce must be rejected)", respB.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAttack0008_N3_StandaloneKeepsLocalCache is the edge: a NON-clustered node
|
||||
// must NOT require JetStream — wireReplicatedNonces is a no-op and the node keeps
|
||||
// its in-memory cache, which still rejects a same-node replay (the single-node
|
||||
// guarantee is unchanged). This proves the fix does not add a JetStream
|
||||
// dependency to standalone deployments.
|
||||
func TestAttack0008_N3_StandaloneKeepsLocalCache(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
alice, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity: %v", err)
|
||||
}
|
||||
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleAdmin); err != nil {
|
||||
t.Fatalf("add alice: %v", err)
|
||||
}
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
|
||||
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
|
||||
// Standalone: clustered=false, js=nil. Must succeed (no JetStream needed).
|
||||
if err := wireReplicatedNonces(srv, nil, false /*clustered*/, 1); err != nil {
|
||||
t.Fatalf("standalone wireReplicatedNonces must be a no-op, got: %v", err)
|
||||
}
|
||||
node := httptest.NewServer(srv)
|
||||
t.Cleanup(node.Close)
|
||||
|
||||
ts := time.Now().Unix()
|
||||
nonce := randNonce(t)
|
||||
path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms"
|
||||
|
||||
resp1, err := http.DefaultClient.Do(signed008(t, node.URL, "GET", path, nil, alice, ts, nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("do 1: %v", err)
|
||||
}
|
||||
resp1.Body.Close()
|
||||
if resp1.StatusCode != http.StatusOK {
|
||||
t.Fatalf("first use: status %d, want 200", resp1.StatusCode)
|
||||
}
|
||||
// Same-node replay is still rejected by the in-memory cache.
|
||||
resp2, err := http.DefaultClient.Do(signed008(t, node.URL, "GET", path, nil, alice, ts, nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("do 2: %v", err)
|
||||
}
|
||||
resp2.Body.Close()
|
||||
if resp2.StatusCode != http.StatusUnauthorized {
|
||||
t.Fatalf("same-node replay: status %d, want 401", resp2.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAttack0008_N3_ClusteredRequiresJetStream proves the hard rule: a clustered
|
||||
// node with NO JetStream available refuses (error), so the binary fails fast
|
||||
// instead of silently running with a per-process cache.
|
||||
func TestAttack0008_N3_ClusteredRequiresJetStream(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
|
||||
if err := wireReplicatedNonces(srv, nil, true /*clustered*/, 1); err == nil {
|
||||
t.Fatalf("clustered node with no JetStream must fail, got nil")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
)
|
||||
|
||||
// connectInternalJS opens a privileged JetStream client from membershipd to its
|
||||
// OWN embedded NATS server. This is the resolution of the "bootstrap cycle"
|
||||
// (issue 0006a/c): the service needs JetStream to create the replicated nonce
|
||||
// bucket and the control-plane KV, but under enforce the data plane only accepts
|
||||
// allowlisted clients confined to their rooms. The connection therefore
|
||||
// authenticates with the process's ephemeral internal identity — the identity the
|
||||
// authenticator was built to recognize (NewNkeyAuthenticatorACLInternal) and
|
||||
// grant full permissions — without ever appearing in the user allowlist.
|
||||
//
|
||||
// It uses the in-process transport (nats.InProcessServer), a Go pipe inside the
|
||||
// process, so it bypasses TLS entirely: no CA wiring is needed for this
|
||||
// self-connection even when the public data plane is TLS-only. useNkey mirrors
|
||||
// whether the embedded server enforces auth: under enforce the internal identity
|
||||
// presents its nkey; without enforce the server accepts an unauthenticated
|
||||
// in-process client and the nkey is omitted.
|
||||
//
|
||||
// The caller owns the returned connection and must Close it on shutdown (after
|
||||
// the JetStream context is no longer used).
|
||||
func connectInternalJS(ns *server.Server, internalID cs.Identity, useNkey bool) (*nats.Conn, jetstream.JetStream, error) {
|
||||
opts := []nats.Option{
|
||||
nats.Name("membershipd-internal"),
|
||||
nats.InProcessServer(ns),
|
||||
}
|
||||
if useNkey {
|
||||
pub, sign, err := busauth.ClientNkey(internalID.SignPriv)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("internal nkey: %w", err)
|
||||
}
|
||||
opts = append(opts, nats.Nkey(pub, sign))
|
||||
}
|
||||
// The URL is ignored for an in-process connection; the InProcessServer option
|
||||
// supplies the transport.
|
||||
nc, err := nats.Connect("", opts...)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("connect internal nats: %w", err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, nil, fmt.Errorf("internal jetstream: %w", err)
|
||||
}
|
||||
return nc, js, nil
|
||||
}
|
||||
|
||||
// connectExternalJS opens a JetStream client to an EXTERNAL NATS the operator
|
||||
// runs (membershipd started with --nats-url). Unlike the embedded path there is
|
||||
// no in-process transport and no internal identity: the external server enforces
|
||||
// its own auth, so membershipd connects as a plain client (optionally TLS-pinned
|
||||
// to the bus CA). It is best-effort and intended for an operator-managed cluster;
|
||||
// the standard unibus deploy uses the embedded server (connectInternalJS).
|
||||
func connectExternalJS(natsURL, caPath string) (*nats.Conn, jetstream.JetStream, error) {
|
||||
opts := []nats.Option{nats.Name("membershipd-internal")}
|
||||
if caPath != "" {
|
||||
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("load CA %q: %w", caPath, err)
|
||||
}
|
||||
opts = append(opts, nats.Secure(tlsCfg))
|
||||
}
|
||||
nc, err := nats.Connect(natsURL, opts...)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("connect external nats %q: %w", natsURL, err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, nil, fmt.Errorf("external jetstream: %w", err)
|
||||
}
|
||||
return nc, js, nil
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
package main
|
||||
|
||||
// Bootstrap test for issue 0006a/c: under enforce, membershipd must still reach
|
||||
// JetStream on its OWN embedded server to create the nonce/KV buckets. It does so
|
||||
// with an ephemeral internal identity the authenticator grants full permissions
|
||||
// (NewNkeyAuthenticatorACLInternal). These tests prove that privileged
|
||||
// self-connection works AND that no other identity can claim it.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
func icFreePort(t *testing.T) int {
|
||||
t.Helper()
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("free port: %v", err)
|
||||
}
|
||||
defer l.Close()
|
||||
return l.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
|
||||
// TestInternalConnPrivilegedUnderEnforce: with an enforce authenticator that
|
||||
// authorizes NO bus user, the internal identity still connects in-process and has
|
||||
// full permissions — it creates a KV bucket and round-trips a value. This is the
|
||||
// resolution of the bootstrap cycle the audit flagged as the reason the KV store
|
||||
// was never wired.
|
||||
func TestInternalConnPrivilegedUnderEnforce(t *testing.T) {
|
||||
internalID, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("internal identity: %v", err)
|
||||
}
|
||||
internalPubHex := hex.EncodeToString(internalID.SignPub)
|
||||
|
||||
// Authenticator: no bus user is authorized; only the internal identity passes.
|
||||
auth := busauth.NewNkeyAuthenticatorACLInternal(
|
||||
func(string) bool { return false },
|
||||
busauth.PermissionsFromSubjects(func(string) ([]string, error) { return []string{"_INBOX.>"}, nil }),
|
||||
internalPubHex,
|
||||
)
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: icFreePort(t), Auth: auth,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
|
||||
nc, js, err := connectInternalJS(ns, internalID, true /*useNkey*/)
|
||||
if err != nil {
|
||||
t.Fatalf("connectInternalJS: %v", err)
|
||||
}
|
||||
t.Cleanup(nc.Close)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KV_UNIBUS_test", Replicas: 1})
|
||||
if err != nil {
|
||||
t.Fatalf("internal conn could not create KV bucket (full perms expected): %v", err)
|
||||
}
|
||||
if _, err := kv.Put(ctx, "k", []byte("v")); err != nil {
|
||||
t.Fatalf("kv put: %v", err)
|
||||
}
|
||||
e, err := kv.Get(ctx, "k")
|
||||
if err != nil || string(e.Value()) != "v" {
|
||||
t.Fatalf("kv get: val=%q err=%v", e, err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInternalConnOutsiderRejected: an identity that is neither the internal one
|
||||
// nor an allowlisted bus user cannot connect — proving the internal bypass is
|
||||
// scoped to the exact internal key, not a blanket hole.
|
||||
func TestInternalConnOutsiderRejected(t *testing.T) {
|
||||
internalID, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("internal identity: %v", err)
|
||||
}
|
||||
auth := busauth.NewNkeyAuthenticatorACLInternal(
|
||||
func(string) bool { return false },
|
||||
busauth.PermissionsFromSubjects(func(string) ([]string, error) { return []string{"_INBOX.>"}, nil }),
|
||||
hex.EncodeToString(internalID.SignPub),
|
||||
)
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: icFreePort(t), Auth: auth,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
|
||||
outsider, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("outsider identity: %v", err)
|
||||
}
|
||||
pub, sign, err := busauth.ClientNkey(outsider.SignPriv)
|
||||
if err != nil {
|
||||
t.Fatalf("outsider nkey: %v", err)
|
||||
}
|
||||
conn, err := nats.Connect(ns.ClientURL(),
|
||||
nats.Nkey(pub, sign),
|
||||
nats.MaxReconnects(0),
|
||||
nats.Timeout(2*time.Second),
|
||||
)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
t.Fatalf("outsider (unauthorized, non-internal) must be rejected, but connected")
|
||||
}
|
||||
}
|
||||
+66
-1
@@ -7,6 +7,7 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/hex"
|
||||
"flag"
|
||||
"log"
|
||||
"net/http"
|
||||
@@ -15,6 +16,10 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
@@ -62,6 +67,12 @@ func main() {
|
||||
routeTLSCert = flag.String("route-tls-cert", "", "this node's route certificate (CA-signed); enables mutual route TLS with --route-tls-key/--route-tls-ca")
|
||||
routeTLSKey = flag.String("route-tls-key", "", "this node's route private key")
|
||||
routeTLSCA = flag.String("route-tls-ca", "", "bus CA that signs every node's route certificate (deploy/tls/ca.crt)")
|
||||
// Replicated control plane (issue 0006a/c): the JetStream replication factor
|
||||
// for the shared nonce bucket (and, with --store kv, the control-plane KV).
|
||||
// 1 for a 1-2 node rollout, 3 for real HA quorum (raise in place with
|
||||
// `nats stream update --replicas 3` when the third node joins).
|
||||
kvReplicas = flag.Int("kv-replicas", 1, "JetStream replication factor for the shared nonce/KV buckets (1..3)")
|
||||
caFile = flag.String("ca", "", "bus CA cert; only used to pin TLS on the internal JetStream connection to an EXTERNAL --nats-url (the embedded server uses an in-process connection that needs no CA)")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
@@ -85,6 +96,31 @@ func main() {
|
||||
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
||||
log.SetPrefix("[membershipd] ")
|
||||
|
||||
// A clustered node shares its control plane with peers, so it needs a JetStream
|
||||
// client to manage the replicated nonce bucket (issue 0006a). needJS will also
|
||||
// be true under --store kv (issue 0006c), where the control-plane state lives in
|
||||
// JetStream KV. A standalone single-node deployment needs none of this and keeps
|
||||
// the in-process, in-memory behavior unchanged.
|
||||
clustered := *clusterName != ""
|
||||
needJS := clustered
|
||||
enforce := authMode == membership.AuthEnforce
|
||||
|
||||
// Internal service identity (issue 0006a): when the embedded data plane enforces
|
||||
// auth, membershipd must still connect to its OWN server to manage JetStream.
|
||||
// It does so with this ephemeral identity, which the authenticator is built to
|
||||
// recognize and grant full permissions (it never enters the user allowlist). It
|
||||
// is only generated when actually needed (JetStream required AND enforce on AND
|
||||
// the server is embedded), so a standalone or non-enforce node is unchanged.
|
||||
var internalID cs.Identity
|
||||
var internalPubHex string
|
||||
if needJS && enforce && *natsURL == "" {
|
||||
internalID, err = cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
log.Fatalf("generate internal identity: %v", err)
|
||||
}
|
||||
internalPubHex = hex.EncodeToString(internalID.SignPub)
|
||||
}
|
||||
|
||||
// Control plane store first: the NATS authenticator consults IsAuthorized, so
|
||||
// the store must exist before the embedded server starts.
|
||||
store, err := membership.Open(*dbPath)
|
||||
@@ -145,9 +181,10 @@ func main() {
|
||||
// Subscribe(">") and harvest every room's subject and JetStream activity.
|
||||
// NATS freezes permissions at connect time, so a peer that joins a room
|
||||
// after connecting must client.RefreshSession to gain that room's subject.
|
||||
cfg.Auth = busauth.NewNkeyAuthenticatorACL(
|
||||
cfg.Auth = busauth.NewNkeyAuthenticatorACLInternal(
|
||||
store.IsAuthorized,
|
||||
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
|
||||
internalPubHex,
|
||||
)
|
||||
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
|
||||
}
|
||||
@@ -181,6 +218,34 @@ func main() {
|
||||
srv.RequireEncryptedRooms = true
|
||||
log.Printf("cleartext rooms: DISABLED (public bind requires end-to-end encryption)")
|
||||
}
|
||||
|
||||
// Replicated anti-replay (issue 0006a, audit 0008 N3): a clustered node MUST
|
||||
// share its nonce store across the cluster via JetStream KV, or a request
|
||||
// accepted on one node can be replayed to another. Open a privileged JetStream
|
||||
// client (in-process for the embedded server, a plain client for an external
|
||||
// NATS) and wire the shared nonce bucket. This is a HARD requirement: if the
|
||||
// bucket cannot be created the node refuses to start rather than run with a
|
||||
// per-process cache that leaves the replay hole open.
|
||||
if needJS {
|
||||
var (
|
||||
internalNC *nats.Conn
|
||||
js jetstream.JetStream
|
||||
)
|
||||
if *natsURL == "" {
|
||||
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
|
||||
} else {
|
||||
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatalf("internal JetStream connection (required by --cluster-name): %v", err)
|
||||
}
|
||||
defer internalNC.Close()
|
||||
if err := wireReplicatedNonces(srv, js, clustered, *kvReplicas); err != nil {
|
||||
log.Fatalf("%v", err)
|
||||
}
|
||||
log.Printf("anti-replay: replicated nonce bucket %q (replicas=%d) — cluster-safe", "KV_UNIBUS_nonces", *kvReplicas)
|
||||
}
|
||||
|
||||
log.Printf("control-plane auth: %s", authMode)
|
||||
addr := *bind + ":" + *httpPort
|
||||
httpSrv := &http.Server{
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// wireReplicatedNonces applies the cluster anti-replay policy to srv. It is the
|
||||
// single piece of wiring the binary uses to decide whether a node must share its
|
||||
// nonce store, extracted so a regression test exercises the EXACT decision the
|
||||
// running binary makes (issue 0006a, audit 0008 N3).
|
||||
//
|
||||
// Policy:
|
||||
// - A clustered node (clustered == true) MUST use the shared JetStream KV nonce
|
||||
// bucket. Every node sees the same bucket, so a request accepted on one node
|
||||
// cannot be replayed to another whose per-process cache never saw the nonce.
|
||||
// A missing JetStream context, or a failure to create the bucket, is a FATAL
|
||||
// configuration error returned to the caller — a clustered node running with a
|
||||
// per-process nonce cache is precisely the replay hole the audit flagged, so
|
||||
// it must refuse to start rather than serve insecurely.
|
||||
// - A standalone node (clustered == false) keeps the in-memory cache that
|
||||
// NewServer installed: there is no second node to replay to, so the shared
|
||||
// bucket would only add a JetStream dependency for no security gain.
|
||||
//
|
||||
// replicas is the nonce bucket's replication factor (R1..R3). Returns nil when no
|
||||
// action is required (standalone).
|
||||
func wireReplicatedNonces(srv *membership.Server, js jetstream.JetStream, clustered bool, replicas int) error {
|
||||
if !clustered {
|
||||
return nil // standalone: the in-memory nonce cache is sufficient and safe
|
||||
}
|
||||
if js == nil {
|
||||
return fmt.Errorf("clustered node requires JetStream for the shared nonce bucket, but none is available")
|
||||
}
|
||||
if err := srv.UseReplicatedNonces(js, replicas); err != nil {
|
||||
return fmt.Errorf("replicated nonces: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -82,6 +82,15 @@ type PermissionsFunc func(signPubHex string) (*server.Permissions, error)
|
||||
type nkeyAuthenticatorACL struct {
|
||||
isAuthorized func(signPubHex string) bool
|
||||
perms PermissionsFunc
|
||||
// internalPubHex is the lowercase-hex Ed25519 public key of membershipd's own
|
||||
// ephemeral internal service identity. A connection that proves that key is
|
||||
// granted full permissions WITHOUT consulting the allowlist, so the service can
|
||||
// bootstrap and manage JetStream (the replicated nonce bucket and, when
|
||||
// decentralized, the control-plane KV buckets) against its own embedded server
|
||||
// even while the data plane confines every client to its rooms. Empty disables
|
||||
// the internal-identity path entirely (behavior identical to a plain ACL
|
||||
// authenticator).
|
||||
internalPubHex string
|
||||
}
|
||||
|
||||
// NewNkeyAuthenticatorACL builds an authenticator that authorizes by the bus
|
||||
@@ -94,6 +103,29 @@ func NewNkeyAuthenticatorACL(isAuthorized func(signPubHex string) bool, perms Pe
|
||||
return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms}
|
||||
}
|
||||
|
||||
// NewNkeyAuthenticatorACLInternal is NewNkeyAuthenticatorACL that also recognizes
|
||||
// membershipd's internal service identity (internalPubHex, the lowercase hex of
|
||||
// its ephemeral Ed25519 public key): a connection proving that key is granted
|
||||
// full permissions without an allowlist lookup, so the service can create and
|
||||
// manage JetStream against its own embedded server under enforce (issue 0006a/c —
|
||||
// the replicated nonce bucket and the control-plane KV). Every other identity
|
||||
// goes through the allowlist + per-subject ACL unchanged. An empty internalPubHex
|
||||
// is identical to NewNkeyAuthenticatorACL, so this is a superset and safe to use
|
||||
// everywhere the plain constructor was used.
|
||||
func NewNkeyAuthenticatorACLInternal(isAuthorized func(signPubHex string) bool, perms PermissionsFunc, internalPubHex string) server.Authentication {
|
||||
return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms, internalPubHex: internalPubHex}
|
||||
}
|
||||
|
||||
// fullPermissions grants publish and subscribe on every subject (">"). It is the
|
||||
// permission set for membershipd's own internal service connection, which must
|
||||
// manage the JetStream control plane (nonce bucket + KV buckets) over NATS. It is
|
||||
// NEVER granted to a bus user — only to the process's own ephemeral internal
|
||||
// identity, recognized by exact public-key match in Check.
|
||||
func fullPermissions() *server.Permissions {
|
||||
sp := &server.SubjectPermission{Allow: []string{">"}}
|
||||
return &server.Permissions{Publish: sp, Subscribe: sp}
|
||||
}
|
||||
|
||||
// Check verifies the nkey, authorizes against the allowlist, then derives and
|
||||
// registers the connection's subject permissions. A permissions-derivation
|
||||
// error denies the connection (fail closed) rather than granting open access.
|
||||
@@ -102,6 +134,14 @@ func (a *nkeyAuthenticatorACL) Check(c server.ClientAuthentication) bool {
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
// membershipd's own internal service identity bypasses the allowlist and is
|
||||
// granted full permissions so the service can bootstrap JetStream under
|
||||
// enforce. The key is matched exactly against the cryptographically verified
|
||||
// connecting key, so no other identity can claim these permissions.
|
||||
if a.internalPubHex != "" && signPubHex == a.internalPubHex {
|
||||
c.RegisterUser(&server.User{Permissions: fullPermissions()})
|
||||
return true
|
||||
}
|
||||
if !a.isAuthorized(signPubHex) {
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user