Merge issue/0003e-client-failover: client failover + replicated nonce store + subject ACL (H4)

This commit is contained in:
2026-06-07 15:27:45 +02:00
10 changed files with 1012 additions and 97 deletions
+66 -9
View File
@@ -27,31 +27,88 @@ func NewNkeyAuthenticator(isAuthorized func(signPubHex string) bool) server.Auth
// Check verifies the client's nkey signature against the nonce the server
// presented, then maps the nkey to its allowlist key and checks authorization.
// Any malformed input or failed verification yields false (fail closed). The
// signature decoding mirrors nats-server's own (raw-url base64, then std base64
// fallback) so genuine clients using nats.Nkey are accepted unchanged.
// Any malformed input or failed verification yields false (fail closed).
func (a *nkeyAuthenticator) Check(c server.ClientAuthentication) bool {
signPubHex, ok := verifyNkey(c)
if !ok {
return false
}
return a.isAuthorized(signPubHex)
}
// verifyNkey performs the shared nkey verification: it checks the client's
// signature against the server-presented nonce and returns the lowercase-hex
// Ed25519 public key behind the nkey. ok is false on any malformed input or
// failed verification (fail closed). The signature decoding mirrors
// nats-server's own (raw-url base64, then std base64 fallback) so genuine
// clients using nats.Nkey are accepted unchanged.
func verifyNkey(c server.ClientAuthentication) (signPubHex string, ok bool) {
opts := c.GetOpts()
if opts.Nkey == "" {
return false
return "", false
}
sig, err := base64.RawURLEncoding.DecodeString(opts.Sig)
if err != nil {
sig, err = base64.StdEncoding.DecodeString(opts.Sig)
if err != nil {
return false
return "", false
}
}
pub, err := nkeys.FromPublicKey(opts.Nkey)
if err != nil {
return false
return "", false
}
if err := pub.Verify(c.GetNonce(), sig); err != nil {
return false
return "", false
}
signPubHex, err := SignPubHexFromNkey(opts.Nkey)
signPubHex, err = SignPubHexFromNkey(opts.Nkey)
if err != nil {
return "", false
}
return signPubHex, true
}
// PermissionsFunc maps a connecting identity (lowercase-hex Ed25519 signing key)
// to the NATS permissions it should be granted for this connection. Returning an
// error denies the connection (fail closed). It is how the data plane enforces
// per-subject access from room membership (issue 0003e, audit H4 residual).
type PermissionsFunc func(signPubHex string) (*server.Permissions, error)
// nkeyAuthenticatorACL is the nkey authenticator that ALSO scopes the connection
// to per-subject permissions derived from room membership. NATS evaluates
// permissions once, at connect time, so a peer that joins a room after
// connecting must reconnect (client.RefreshSession) to gain that room's subject
// — the dynamic-membership reconnection model the audit deferred to this issue.
type nkeyAuthenticatorACL struct {
isAuthorized func(signPubHex string) bool
perms PermissionsFunc
}
// NewNkeyAuthenticatorACL builds an authenticator that authorizes by the bus
// allowlist AND registers per-subject permissions from perms. A registered but
// permission-less peer can no longer subscribe to or publish on arbitrary
// subjects: it is confined to the subjects of the rooms it belongs to (plus the
// client infrastructure subjects perms includes). This is the per-subject ACL
// the 0004 hardening left as a residual.
func NewNkeyAuthenticatorACL(isAuthorized func(signPubHex string) bool, perms PermissionsFunc) server.Authentication {
return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms}
}
// Check verifies the nkey, authorizes against the allowlist, then derives and
// registers the connection's subject permissions. A permissions-derivation
// error denies the connection (fail closed) rather than granting open access.
func (a *nkeyAuthenticatorACL) Check(c server.ClientAuthentication) bool {
signPubHex, ok := verifyNkey(c)
if !ok {
return false
}
return a.isAuthorized(signPubHex)
if !a.isAuthorized(signPubHex) {
return false
}
perms, err := a.perms(signPubHex)
if err != nil {
return false // fail closed: never grant open access on a derivation error
}
c.RegisterUser(&server.User{Permissions: perms})
return true
}
+185 -76
View File
@@ -51,9 +51,14 @@ type Client struct {
endpoint string
nc *nats.Conn
js jetstream.JetStream // durable plane for rooms with Policy.Persist
ctrlURL string
ctrlURLs []string // control-plane HTTP endpoints, tried in order (failover)
http *http.Client
// natsServers + natsOpts are retained so RefreshSession can rebuild the
// data-plane connection (re-triggering the server's subject-ACL evaluation).
natsServers []string
natsOpts []nats.Option
mu sync.RWMutex
keyCache map[string]map[int][]byte // roomID -> epoch -> K
signCache map[string][]byte // sender endpoint -> sign pub (for verification)
@@ -77,6 +82,33 @@ type Options struct {
// secured independently (a test may TLS one and not the other); production
// sets both to the same CA via Connect. Nil keeps the control plane plaintext.
CtrlTLS *tls.Config
// NatsServers are ADDITIONAL NATS seed URLs for cluster failover (issue
// 0003e), beyond the primary natsURL passed to the constructor. With more
// than one server nats.go reconnects to a surviving node automatically when
// the one a client is attached to dies, so a node loss is transparent.
NatsServers []string
// CtrlURLs are ADDITIONAL control-plane HTTP endpoints (one per node) beyond
// the primary ctrlURL. Each request is tried against them in order until one
// answers, so the control plane survives a node loss too. With the
// decentralized KV store every node serves the same state, so any of them
// can answer any request.
CtrlURLs []string
}
// dedupNonEmpty returns the input with empty strings dropped and duplicates
// removed, preserving order. Used to build the NATS seed list and control-plane
// list from a primary URL plus optional extras without a redundant entry.
func dedupNonEmpty(in []string) []string {
seen := map[string]bool{}
var out []string
for _, s := range in {
if s == "" || seen[s] {
continue
}
seen[s] = true
out = append(out, s)
}
return out
}
// New connects to NATS and records the control-plane URL with default Options
@@ -116,7 +148,20 @@ func Connect(natsURL, ctrlURL string, id cs.Identity, caPath string) (*Client, e
// so every peer (worker, chat, mobile, gateway) gets identical behavior by
// passing the same Options.
func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Client, error) {
natsOpts := []nats.Option{nats.Name("unibus-client")}
// Seed list = primary + extras. With more than one seed, nats.go fails over
// to a surviving node on disconnect; MaxReconnects(-1) keeps it retrying
// indefinitely so a node coming back is rejoined rather than given up on.
natsServers := dedupNonEmpty(append([]string{natsURL}, opts.NatsServers...))
natsOpts := []nats.Option{
nats.Name("unibus-client"),
nats.MaxReconnects(-1),
nats.ReconnectWait(250 * time.Millisecond),
}
if len(natsServers) > 1 {
// Try every seed on the initial connect too, so startup tolerates one
// seed being down.
natsOpts = append(natsOpts, nats.RetryOnFailedConnect(true))
}
if opts.UseNkey {
nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv)
if err != nil {
@@ -127,9 +172,9 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli
if opts.TLS != nil {
natsOpts = append(natsOpts, nats.Secure(opts.TLS))
}
nc, err := nats.Connect(natsURL, natsOpts...)
nc, err := nats.Connect(strings.Join(natsServers, ","), natsOpts...)
if err != nil {
return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err)
return nil, fmt.Errorf("client: connect nats %v: %w", natsServers, err)
}
// JetStream context for the durable plane. Obtaining it does not require any
// stream to exist yet and has no effect on cleartext/ephemeral rooms — those
@@ -147,17 +192,50 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli
httpClient.Transport = &http.Transport{TLSClientConfig: opts.CtrlTLS.Clone()}
}
return &Client{
id: id,
endpoint: frame.EndpointID(id.SignPub),
nc: nc,
js: js,
ctrlURL: ctrlURL,
http: httpClient,
keyCache: map[string]map[int][]byte{},
signCache: map[string][]byte{},
id: id,
endpoint: frame.EndpointID(id.SignPub),
nc: nc,
js: js,
ctrlURLs: dedupNonEmpty(append([]string{ctrlURL}, opts.CtrlURLs...)),
http: httpClient,
natsServers: natsServers,
natsOpts: natsOpts,
keyCache: map[string]map[int][]byte{},
signCache: map[string][]byte{},
}, nil
}
// RefreshSession rebuilds the data-plane NATS connection so the server's
// subject-ACL authenticator re-evaluates this peer's room membership (issue
// 0003e, audit H4 residual). Call it after a membership change — a room you
// created, were invited to, or joined — when the bus enforces per-subject
// permissions, so the new room's subject becomes publishable and subscribable
// (NATS freezes permissions at connect time, so the prior connection cannot see
// the new room).
//
// It opens a fresh connection with the same seeds/options and swaps it in.
// IMPORTANT: active subscriptions from the previous connection are dropped —
// re-subscribe (client.Subscribe) to your rooms after calling this. The key and
// signer caches are preserved. On a non-ACL bus this is a no-op-safe reconnect.
func (c *Client) RefreshSession() error {
nc, err := nats.Connect(strings.Join(c.natsServers, ","), c.natsOpts...)
if err != nil {
return fmt.Errorf("client: refresh session: reconnect nats: %w", err)
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return fmt.Errorf("client: refresh session: init jetstream: %w", err)
}
old := c.nc
c.mu.Lock()
c.nc = nc
c.js = js
c.mu.Unlock()
old.Close()
return nil
}
// Endpoint returns this client's public identity.
func (c *Client) Endpoint() Endpoint {
return Endpoint{ID: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub}
@@ -169,6 +247,15 @@ func (c *Client) Close() error {
return nil
}
// ConnectedServer returns the URL of the NATS node this client is currently
// attached to (empty when disconnected). It is observability for cluster
// failover: after a node dies, this reports the surviving node nats.go
// reconnected to. IsConnected reports whether the data-plane link is up.
func (c *Client) ConnectedServer() string { return c.nc.ConnectedUrl() }
// IsConnected reports whether the NATS data-plane connection is currently up.
func (c *Client) IsConnected() bool { return c.nc.IsConnected() }
// ---- key cache ------------------------------------------------------------
func (c *Client) cacheKey(roomID string, epoch int, k []byte) {
@@ -203,36 +290,45 @@ func (c *Client) doJSON(method, path string, body, out any) error {
}
bodyBytes = b
}
req, err := c.newSignedRequest(method, path, bodyBytes)
if err != nil {
return err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
resp, err := c.http.Do(req)
if err != nil {
return fmt.Errorf("client: do %s %s: %w", method, path, err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 300 {
// Surface the server's structured {"error": "..."} message when present,
// instead of leaking the raw HTTP envelope (method, path, status, JSON body).
var er struct {
Error string `json:"error"`
// Try each control-plane endpoint in order. A transport error (a dead node)
// falls over to the next; an HTTP response (any status) is authoritative and
// returned, since every node serves the same state. Each attempt is freshly
// signed (new nonce), so a failed-over retry is never seen as a replay.
var lastErr error
for _, base := range c.ctrlURLs {
req, err := c.newSignedRequestTo(base, method, path, bodyBytes)
if err != nil {
return err
}
if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
}
if out != nil {
if err := json.Unmarshal(respBody, out); err != nil {
return fmt.Errorf("client: decode response: %w", err)
resp, err := c.http.Do(req)
if err != nil {
lastErr = err
continue // dead node: try the next control plane
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 300 {
// Surface the server's structured {"error": "..."} message when present,
// instead of leaking the raw HTTP envelope (method, path, status, body).
var er struct {
Error string `json:"error"`
}
if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
}
return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
}
if out != nil {
if err := json.Unmarshal(respBody, out); err != nil {
return fmt.Errorf("client: decode response: %w", err)
}
}
return nil
}
return nil
return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr)
}
// signRequest signs the canonical bytes of req (req must already have its Sig
@@ -246,22 +342,25 @@ func (c *Client) signRequest(req any) []byte {
return cs.SignEd25519(c.id.SignPriv, b)
}
// newSignedRequest builds an *http.Request to the control plane and attaches the
// transport authentication headers (X-Unibus-Pub/Ts/Nonce/Sig) signing the
// canonical request bytes with this peer's Ed25519 key. path is the request URI
// (path plus any query); body is the raw request body (nil for GET). The server
// (membership.authenticate) verifies these headers under the bus-auth flag.
// newSignedRequestTo builds an *http.Request to the control-plane endpoint
// `base` and attaches the transport authentication headers
// (X-Unibus-Pub/Ts/Nonce/Sig) signing the canonical request bytes with this
// peer's Ed25519 key. path is the request URI (path plus any query); body is the
// raw request body (nil for GET). The server (membership.authenticate) verifies
// these headers under the bus-auth flag. The signature covers method+path+ts+
// nonce+sha256(body), NOT the host, so the same request can be addressed to any
// node — and each failover attempt mints a fresh nonce so it is never a replay.
//
// Signing happens on every request — including GETs — so that under enforce the
// server can authenticate the caller and reject unregistered or revoked
// identities uniformly. The canonical construction is the single source of truth
// in membership.CanonicalRequest, shared by both sides.
func (c *Client) newSignedRequest(method, path string, body []byte) (*http.Request, error) {
func (c *Client) newSignedRequestTo(base, method, path string, body []byte) (*http.Request, error) {
var rdr io.Reader
if body != nil {
rdr = bytes.NewReader(body)
}
req, err := http.NewRequest(method, c.ctrlURL+path, rdr)
req, err := http.NewRequest(method, base+path, rdr)
if err != nil {
return nil, fmt.Errorf("client: new request: %w", err)
}
@@ -887,40 +986,50 @@ func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) {
}
func (c *Client) putBlob(ciphertext []byte) (string, error) {
req, err := c.newSignedRequest("POST", "/blobs", ciphertext)
if err != nil {
return "", err
var lastErr error
for _, base := range c.ctrlURLs {
req, err := c.newSignedRequestTo(base, "POST", "/blobs", ciphertext)
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := c.http.Do(req)
if err != nil {
lastErr = err
continue // dead node: try the next control plane
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 300 {
return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body))
}
var r blobResp
if err := json.Unmarshal(body, &r); err != nil {
return "", fmt.Errorf("client: decode blob resp: %w", err)
}
return r.Hash, nil
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := c.http.Do(req)
if err != nil {
return "", fmt.Errorf("client: put blob: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 300 {
return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body))
}
var r blobResp
if err := json.Unmarshal(body, &r); err != nil {
return "", fmt.Errorf("client: decode blob resp: %w", err)
}
return r.Hash, nil
return "", fmt.Errorf("client: put blob: all control planes failed: %w", lastErr)
}
func (c *Client) getBlob(hash string) ([]byte, error) {
req, err := c.newSignedRequest("GET", "/blobs/"+hash, nil)
if err != nil {
return nil, err
var lastErr error
for _, base := range c.ctrlURLs {
req, err := c.newSignedRequestTo(base, "GET", "/blobs/"+hash, nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
lastErr = err
continue // dead node: try the next control plane
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body))
}
return io.ReadAll(resp.Body)
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("client: get blob: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body))
}
return io.ReadAll(resp.Body)
return nil, fmt.Errorf("client: get blob: all control planes failed: %w", lastErr)
}
+185
View File
@@ -0,0 +1,185 @@
package client_test
import (
"fmt"
"net/http/httptest"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
server "github.com/nats-io/nats-server/v2/server"
)
// startClusterNode boots a clustered embedded NATS node (auth off, no route TLS:
// this test exercises client failover, not route security — that is covered in
// pkg/embeddednats).
func startClusterNode(t *testing.T, name string, clientPort, routePort int, peerRoutePorts []int) *server.Server {
t.Helper()
routes := make([]string, 0, len(peerRoutePorts))
for _, p := range peerRoutePorts {
routes = append(routes, fmt.Sprintf("nats://127.0.0.1:%d", p))
}
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(),
Host: "127.0.0.1",
Port: clientPort,
ServerName: name,
Cluster: &embeddednats.ClusterConfig{Name: "unibus-failover", Host: "127.0.0.1", Port: routePort, Routes: routes},
})
if err != nil {
t.Fatalf("start node %s: %v", name, err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
return ns
}
func waitClusterRoutes(t *testing.T, ns *server.Server) {
t.Helper()
deadline := time.Now().Add(8 * time.Second)
for time.Now().Before(deadline) {
if ns.NumRoutes() >= 1 {
return
}
time.Sleep(50 * time.Millisecond)
}
t.Fatalf("node %q never formed a route", ns.Name())
}
// portOf extracts the :port of a nats URL for matching ConnectedServer() (which
// may report a different host spelling than ClientURL()).
func portOf(natsURL string) string {
i := strings.LastIndex(natsURL, ":")
if i < 0 {
return ""
}
return natsURL[i+1:]
}
// TestClientFailoverAcrossNodes is the issue's edge case: a client connected to
// node A keeps its session when A is killed — nats.go reconnects it to node B
// and it keeps receiving messages published on the surviving node.
func TestClientFailoverAcrossNodes(t *testing.T) {
rp0, rp1 := freePort(t), freePort(t)
p0, p1 := freePort(t), freePort(t)
n0 := startClusterNode(t, "n0", p0, rp0, []int{rp1})
n1 := startClusterNode(t, "n1", p1, rp1, []int{rp0})
waitClusterRoutes(t, n0)
waitClusterRoutes(t, n1)
nodes := map[string]*server.Server{strconv.Itoa(p0): n0, strconv.Itoa(p1): n1}
// Control plane: one in-process membershipd (metadata only; the data plane is
// the NATS cluster). Auth off keeps the test focused on data-plane failover.
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
if err != nil {
t.Fatalf("blobs: %v", err)
}
ctrl := httptest.NewServer(membership.NewServer(store, blobs, membership.AuthOff))
t.Cleanup(ctrl.Close)
url0 := n0.ClientURL()
url1 := n1.ClientURL()
// A seeds BOTH nodes (failover list); B connects directly to n1.
a, err := client.NewWithOptions(url0, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url1}})
if err != nil {
t.Fatalf("connect A: %v", err)
}
defer a.Close()
b, err := client.NewWithOptions(url1, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url0}})
if err != nil {
t.Fatalf("connect B: %v", err)
}
defer b.Close()
roomID, err := a.CreateRoom("room.failover", room.ModeNATS)
if err != nil {
t.Fatalf("A create room: %v", err)
}
var mu sync.Mutex
var got []string
sub, err := a.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
mu.Lock()
got = append(got, string(plaintext))
mu.Unlock()
})
if err != nil {
t.Fatalf("A subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(200 * time.Millisecond)
// Pre-kill sanity: B publishes, A receives across the cluster.
if err := b.Publish(roomID, []byte("before-kill")); err != nil {
t.Fatalf("B publish 1: %v", err)
}
if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "before-kill") }, 3*time.Second) {
t.Fatalf("A did not receive the pre-kill message; got %v", snapshot(&mu, &got))
}
// Identify and KILL the node A is attached to, forcing a reconnect.
attached := a.ConnectedServer()
killPort := portOf(attached)
victim, ok := nodes[killPort]
if !ok {
t.Fatalf("A is attached to an unknown node %q (port %q)", attached, killPort)
}
survivorURL := url1
if killPort == strconv.Itoa(p1) {
survivorURL = url0
}
victim.Shutdown()
victim.WaitForShutdown()
// A must reconnect to the surviving node.
deadline := time.Now().Add(8 * time.Second)
for time.Now().Before(deadline) {
if a.IsConnected() && portOf(a.ConnectedServer()) == portOf(survivorURL) {
break
}
time.Sleep(100 * time.Millisecond)
}
if !a.IsConnected() || portOf(a.ConnectedServer()) != portOf(survivorURL) {
t.Fatalf("A did not fail over to the surviving node (now on %q, want port %s)", a.ConnectedServer(), portOf(survivorURL))
}
// Make B publish from the surviving node and confirm A still receives —
// the session (its subscription) survived the failover.
if survivorURL == url0 {
// B's primary was n1 (killed); ensure B is on the survivor too.
deadline := time.Now().Add(8 * time.Second)
for time.Now().Before(deadline) && portOf(b.ConnectedServer()) != portOf(survivorURL) {
time.Sleep(100 * time.Millisecond)
}
}
if err := b.Publish(roomID, []byte("after-kill")); err != nil {
t.Fatalf("B publish 2: %v", err)
}
if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "after-kill") }, 6*time.Second) {
t.Fatalf("A did not receive a message after failover; got %v", snapshot(&mu, &got))
}
}
func contains(rs []string, want string) bool {
for _, r := range rs {
if r == want {
return true
}
}
return false
}
+52
View File
@@ -0,0 +1,52 @@
package membership
// Per-subject data-plane access control derived from room membership (issue
// 0003e, audit H4 residual). The control plane already authorizes metadata by
// membership; this is the matching restriction on the NATS data plane so a
// registered peer can only publish/subscribe on the subjects of the rooms it
// actually belongs to — not on every subject on the bus.
import (
"encoding/hex"
"fmt"
"github.com/enmanuel/unibus/pkg/frame"
)
// clientInfraSubjects are the subjects every peer needs regardless of room
// membership: the request/reply inbox space and the JetStream API (the durable
// plane of persisted rooms). They are granted to all authorized peers so
// request/reply and persisted-room history keep working under the subject ACL.
var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.>"}
// SubjectACLFor returns a function that maps a signing public key (lowercase
// hex) to the data-plane subjects that identity may publish and subscribe to:
// the subject of every room it belongs to, plus the client infrastructure
// subjects. It reads the live membership store, so the permissions reflect the
// identity's rooms at the moment it connects. A decode error or a store failure
// is returned as an error so the caller can fail closed (deny the connection)
// rather than grant open access.
//
// Because NATS freezes permissions at connect time, a peer invited to a new room
// after connecting must reconnect (client.RefreshSession) to pick up the new
// room's subject. The bus is the authoritative directory of subjects, so an
// unlisted subject is simply absent from the allow set.
func SubjectACLFor(store Store) func(signPubHex string) ([]string, error) {
return func(signPubHex string) ([]string, error) {
pub, err := hex.DecodeString(signPubHex)
if err != nil || len(pub) != 32 {
return nil, fmt.Errorf("acl: malformed sign pub %q", signPubHex)
}
endpoint := frame.EndpointID(pub)
rooms, err := store.ListRoomsForEndpoint(endpoint)
if err != nil {
return nil, fmt.Errorf("acl: list rooms for %s: %w", endpoint, err)
}
subjects := make([]string, 0, len(rooms)+len(clientInfraSubjects))
subjects = append(subjects, clientInfraSubjects...)
for _, r := range rooms {
subjects = append(subjects, r.Subject)
}
return subjects, nil
}
}
+290
View File
@@ -0,0 +1,290 @@
package membership_test
import (
"encoding/hex"
"net"
"net/http/httptest"
"path/filepath"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
server "github.com/nats-io/nats-server/v2/server"
)
func aclFreePort(t *testing.T) int {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("free port: %v", err)
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
func mustID(t *testing.T) cs.Identity {
t.Helper()
id, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
return id
}
// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc
// the ACL authenticator expects (same Allow set for publish and subscribe).
func aclPermsFunc(store membership.Store) busauth.PermissionsFunc {
derive := membership.SubjectACLFor(store)
return func(signPubHex string) (*server.Permissions, error) {
subs, err := derive(signPubHex)
if err != nil {
return nil, err
}
sp := &server.SubjectPermission{Allow: subs}
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
}
}
// startACLNats boots an embedded NATS whose authenticator confines each peer to
// the subjects of the rooms it belongs to (audit H4 residual).
func startACLNats(t *testing.T, store membership.Store) *server.Server {
t.Helper()
auth := busauth.NewNkeyAuthenticatorACL(store.IsAuthorized, aclPermsFunc(store))
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: aclFreePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("acl nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
return ns
}
func nkeyConn(t *testing.T, natsURL string, id cs.Identity, errCh chan error) *nats.Conn {
t.Helper()
pub, sign, err := busauth.ClientNkey(id.SignPriv)
if err != nil {
t.Fatalf("nkey: %v", err)
}
nc, err := nats.Connect(natsURL,
nats.Nkey(pub, sign),
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) {
select {
case errCh <- e:
default:
}
}),
)
if err != nil {
t.Fatalf("connect nkey: %v", err)
}
t.Cleanup(nc.Close)
return nc
}
func mustAddUser(t *testing.T, store membership.Store, id cs.Identity, handle string) {
t.Helper()
if err := store.AddUser(hex.EncodeToString(id.SignPub), handle, membership.RoleMember); err != nil {
t.Fatalf("add user %s: %v", handle, err)
}
}
func mustCreateRoom(t *testing.T, store membership.Store, roomID, subject, ownerEP string, owner cs.Identity) {
t.Helper()
info := membership.RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: ownerEP}
if err := store.CreateRoom(info, owner.SignPub, owner.KexPub, nil); err != nil {
t.Fatalf("create room %s: %v", roomID, err)
}
}
func newCtrl(t *testing.T, store membership.Store, blobs blobstore.Store) string {
t.Helper()
ts := httptest.NewServer(membership.NewServer(store, blobs, membership.AuthOff))
t.Cleanup(ts.Close)
return ts.URL
}
func waitErr(ch chan error, d time.Duration) error {
select {
case e := <-ch:
return e
case <-time.After(d):
return nil
}
}
func drain(ch chan error) {
for {
select {
case <-ch:
default:
return
}
}
}
// TestSubjectACLIsolation closes the audit H4 residual: a registered peer is
// confined to the subjects of the rooms it belongs to. alice (member of room.A)
// may sub/pub room.A but is DENIED sub/pub on room.B, and never reads what bob
// (member of room.B) publishes there.
func TestSubjectACLIsolation(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
alice, bob := mustID(t), mustID(t)
aliceEP, bobEP := frame.EndpointID(alice.SignPub), frame.EndpointID(bob.SignPub)
mustAddUser(t, store, alice, "alice")
mustAddUser(t, store, bob, "bob")
const subjA, subjB = "room.acl.a", "room.acl.b"
mustCreateRoom(t, store, "ROOMA", subjA, aliceEP, alice)
mustCreateRoom(t, store, "ROOMB", subjB, bobEP, bob)
srv := startACLNats(t, store)
url := srv.ClientURL()
aliceErr := make(chan error, 4)
bobErr := make(chan error, 4)
aliceNC := nkeyConn(t, url, alice, aliceErr)
bobNC := nkeyConn(t, url, bob, bobErr)
// alice may subscribe to her own room (no error).
aliceGot := make(chan string, 4)
if _, err := aliceNC.Subscribe(subjA, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil {
t.Fatalf("alice sub A: %v", err)
}
_ = aliceNC.Flush()
if e := waitErr(aliceErr, 300*time.Millisecond); e != nil {
t.Fatalf("alice sub to her OWN room raised an error: %v", e)
}
// alice subscribing to bob's room is a permissions violation.
if _, err := aliceNC.Subscribe(subjB, func(m *nats.Msg) { aliceGot <- "LEAK:" + string(m.Data) }); err != nil {
t.Fatalf("alice sub B (queue): %v", err)
}
_ = aliceNC.Flush()
if e := waitErr(aliceErr, 1*time.Second); e == nil {
t.Fatalf("alice subscribing to bob's room should raise a permissions violation")
}
// bob publishes in his room; alice (denied) must not receive it.
bobGot := make(chan string, 4)
if _, err := bobNC.Subscribe(subjB, func(m *nats.Msg) { bobGot <- string(m.Data) }); err != nil {
t.Fatalf("bob sub B: %v", err)
}
_ = bobNC.Flush()
if err := bobNC.Publish(subjB, []byte("internal-bob")); err != nil {
t.Fatalf("bob pub B: %v", err)
}
_ = bobNC.Flush()
select {
case got := <-bobGot:
if got != "internal-bob" {
t.Fatalf("bob got %q", got)
}
case <-time.After(2 * time.Second):
t.Fatalf("bob did not receive his own message")
}
select {
case leak := <-aliceGot:
t.Fatalf("alice received bob's room traffic despite the ACL: %q", leak)
case <-time.After(500 * time.Millisecond):
// good: alice never got it
}
// alice publishing into bob's room is denied; bob must not receive it.
drain(aliceErr)
if err := aliceNC.Publish(subjB, []byte("intruder")); err != nil {
t.Fatalf("alice pub B (queue): %v", err)
}
_ = aliceNC.Flush()
if e := waitErr(aliceErr, 1*time.Second); e == nil {
t.Fatalf("alice publishing into bob's room should raise a permissions violation")
}
select {
case got := <-bobGot:
t.Fatalf("bob received alice's cross-room publish despite the ACL: %q", got)
case <-time.After(500 * time.Millisecond):
// good
}
}
// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path:
// alice is not in room B, so her connection has no permission for its subject;
// after she is added to room B and calls RefreshSession, the reconnect
// re-derives her permissions and she gains the room's subject.
func TestRefreshSessionGainsNewRoom(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
alice, bob := mustID(t), mustID(t)
aliceEP, bobEP := frame.EndpointID(alice.SignPub), frame.EndpointID(bob.SignPub)
mustAddUser(t, store, alice, "alice")
mustAddUser(t, store, bob, "bob")
const subjB = "room.refresh.b"
mustCreateRoom(t, store, "ROOMB", subjB, bobEP, bob)
srv := startACLNats(t, store)
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
ctrl := newCtrl(t, store, blobs)
aliceC, err := client.NewWithOptions(srv.ClientURL(), ctrl, alice, client.Options{UseNkey: true})
if err != nil {
t.Fatalf("connect alice: %v", err)
}
defer aliceC.Close()
// Add alice to room B (as if invited), then RefreshSession so the
// authenticator re-derives her permissions on reconnect.
if _, err := store.GetMember("ROOMB", aliceEP); err == nil {
t.Fatalf("alice should not be a member yet")
}
if err := store.AddMember("ROOMB", membership.Member{Endpoint: aliceEP, Role: "member", SignPub: alice.SignPub, KexPub: alice.KexPub}, 1, nil); err != nil {
t.Fatalf("add alice to room B: %v", err)
}
if err := aliceC.RefreshSession(); err != nil {
t.Fatalf("refresh session: %v", err)
}
bobErr := make(chan error, 2)
bobNC := nkeyConn(t, srv.ClientURL(), bob, bobErr)
got := make(chan string, 2)
sub, err := aliceC.Subscribe("ROOMB", func(_ frame.Frame, plaintext []byte) { got <- string(plaintext) })
if err != nil {
t.Fatalf("alice subscribe room B after refresh: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(200 * time.Millisecond)
// bob publishes a minimal cleartext frame on subjB.
f := frame.Frame{Type: frame.PUB, Subject: subjB, Sender: bobEP, MsgID: "m1", Payload: []byte("hello-after-join")}
b, _ := f.Marshal()
if err := bobNC.Publish(subjB, b); err != nil {
t.Fatalf("bob publish: %v", err)
}
_ = bobNC.Flush()
select {
case msg := <-got:
if msg != "hello-after-join" {
t.Fatalf("alice got %q", msg)
}
case <-time.After(3 * time.Second):
t.Fatalf("alice did not receive room B traffic after RefreshSession (permissions not refreshed)")
}
}
+19 -8
View File
@@ -95,16 +95,27 @@ func CanonicalRequest(method, path, ts, nonce string, body []byte) []byte {
return []byte(method + "\n" + path + "\n" + ts + "\n" + nonce + "\n" + hex.EncodeToString(sum[:]))
}
// nonceCache remembers recently-seen nonces to reject replays. It is an
// in-memory store guarded by a mutex — sufficient for a single membershipd
// process (the spec's chosen tradeoff over a server-issued nonce round-trip). A
// distributed deployment would need a shared store (tracked for issue 0003).
// nonceStore is the anti-replay backend: rememberOrReject records a nonce and
// reports whether it was unseen (true -> accept) or already seen (false ->
// reject the replay). It is an interface (issue 0003e) so the single-node
// in-memory cache can be swapped for a replicated KV store: a per-process cache
// is BROKEN under multi-node failover (a request captured and replayed to a
// DIFFERENT node whose cache never saw the nonce would be accepted), so a
// cluster MUST share the nonce state. Every implementation fails CLOSED — a
// backend it cannot reach rejects rather than admits.
type nonceStore interface {
rememberOrReject(nonce string, now time.Time) bool
}
// memNonceCache remembers recently-seen nonces to reject replays. It is an
// in-memory store guarded by a mutex — sufficient for a SINGLE membershipd
// process. A clustered deployment uses kvNonceStore instead (issue 0003e).
//
// Pruning is O(expired), not O(n): because the TTL is constant, insertion order
// equals expiry order, so the oldest entries (front of `order`) are exactly the
// ones that expire first (audit H7 — the previous full-map scan under the mutex
// was a CPU-amplification vector). A size cap bounds memory.
type nonceCache struct {
type memNonceCache struct {
mu sync.Mutex
seen map[string]time.Time // nonce -> expiry
order []string // nonces in insertion order == expiry order
@@ -112,13 +123,13 @@ type nonceCache struct {
cap int
}
func newNonceCache(ttl time.Duration, capacity int) *nonceCache {
return &nonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity}
func newMemNonceCache(ttl time.Duration, capacity int) *memNonceCache {
return &memNonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity}
}
// rememberOrReject records nonce and returns true if it was unseen, or false if
// it is a replay (still live in the cache).
func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool {
func (n *memNonceCache) rememberOrReject(nonce string, now time.Time) bool {
n.mu.Lock()
defer n.mu.Unlock()
+2 -2
View File
@@ -11,7 +11,7 @@ import (
// (error), and after the TTL the same nonce is accepted again because its entry
// was pruned (edge).
func TestNonceCacheRememberPrune(t *testing.T) {
nc := newNonceCache(50*time.Millisecond, 1000)
nc := newMemNonceCache(50*time.Millisecond, 1000)
base := time.Now()
if !nc.rememberOrReject("a", base) {
@@ -31,7 +31,7 @@ func TestNonceCacheRememberPrune(t *testing.T) {
// from the map.
func TestNonceCacheCapBounded(t *testing.T) {
const capacity = 100
nc := newNonceCache(time.Hour, capacity)
nc := newMemNonceCache(time.Hour, capacity)
base := time.Now()
for i := 0; i < 500; i++ {
nc.rememberOrReject("n"+strconv.Itoa(i), base)
+77
View File
@@ -0,0 +1,77 @@
package membership
// kvNonceStore is the replicated anti-replay backend (issue 0003e): seen nonces
// live in a JetStream KV bucket shared by every node, with a per-key TTL so they
// expire on their own. This closes the multi-node replay hole the auditor
// flagged: the per-process memNonceCache let an attacker replay a captured
// request to a DIFFERENT node, whose local cache never saw the nonce. With the
// shared bucket the first node to see a nonce wins the atomic Create, and every
// other node rejects the replay.
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"time"
"github.com/nats-io/nats.go/jetstream"
)
const bucketNonces = "UNIBUS_nonces"
type kvNonceStore struct {
kv jetstream.KeyValue
opTimeout time.Duration
}
// newKVNonceStore creates (or opens) the replicated nonce bucket. ttl is the
// per-key expiry — it must be >= the request acceptance window (2*clockSkew) so
// a replay can never outlive its memory, exactly like the in-memory cache's TTL.
func newKVNonceStore(js jetstream.JetStream, ttl time.Duration, replicas int, opTimeout time.Duration) (*kvNonceStore, error) {
if replicas <= 0 {
replicas = 1
}
if opTimeout <= 0 {
opTimeout = defaultKVOpTime
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: bucketNonces,
TTL: ttl,
Replicas: replicas,
History: 1,
Storage: jetstream.FileStorage,
})
if err != nil {
return nil, fmt.Errorf("membership: open nonce KV bucket (replicas=%d): %w", replicas, err)
}
return &kvNonceStore{kv: kv, opTimeout: opTimeout}, nil
}
// nonceKVKey maps a raw nonce (std-base64, which contains '+' '/' '=' that KV
// keys forbid) to a KV-safe token: the hex of its sha256. Deterministic, so the
// same nonce always maps to the same key, and collision-free in practice.
func nonceKVKey(nonce string) string {
sum := sha256.Sum256([]byte(nonce))
return hex.EncodeToString(sum[:])
}
// rememberOrReject atomically claims the nonce: Create succeeds only if the key
// is absent, so the first sight returns true (accept) and any later sight (a
// replay, on this or any other node sharing the bucket) returns false. A backend
// error fails CLOSED — reject — so a KV outage never silently disables
// anti-replay. The TTL on the bucket expires the key, reopening the window.
func (s *kvNonceStore) rememberOrReject(nonce string, _ time.Time) bool {
ctx, cancel := context.WithTimeout(context.Background(), s.opTimeout)
defer cancel()
if _, err := s.kv.Create(ctx, nonceKVKey(nonce), nil); err != nil {
if errors.Is(err, jetstream.ErrKeyExists) {
return false // replay: already claimed
}
return false // backend unreachable: fail closed
}
return true // first sight: accept
}
+117
View File
@@ -0,0 +1,117 @@
package membership
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// TestReplicatedNonceRejectsCrossNodeReplay is the issue's mandated error path:
// with the shared KV nonce store, a request accepted on node A is rejected as a
// replay when the SAME signed bytes are sent to node B. This closes the
// multi-node replay hole that the per-process cache left open.
func TestReplicatedNonceRejectsCrossNodeReplay(t *testing.T) {
// One NATS+JetStream backing the shared nonce bucket.
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: kvFreePort(t),
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
nc, err := nats.Connect(ns.ClientURL())
if err != nil {
t.Fatalf("connect: %v", err)
}
t.Cleanup(nc.Close)
js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("jetstream: %v", err)
}
// One shared SQLite store (simulating the replicated control-plane state) and
// two membershipd servers (two nodes) that BOTH use the shared KV nonce store.
dir := t.TempDir()
store, err := Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
alice, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
alicePub := hex.EncodeToString(alice.SignPub)
if err := store.AddUser(alicePub, "alice", RoleAdmin); err != nil {
t.Fatalf("add alice: %v", err)
}
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
mkNode := func() *httptest.Server {
srv := NewServer(store, blobs, AuthEnforce)
if err := srv.UseReplicatedNonces(js, 1); err != nil {
t.Fatalf("UseReplicatedNonces: %v", err)
}
return httptest.NewServer(srv)
}
nodeA := mkNode()
t.Cleanup(nodeA.Close)
nodeB := mkNode()
t.Cleanup(nodeB.Close)
// Build ONE signed request (fixed ts+nonce) and send the identical bytes to
// both nodes. Authenticated path: alice listing her own rooms (200, empty).
ts := time.Now().Unix()
nonceRaw := make([]byte, 16)
if _, err := rand.Read(nonceRaw); err != nil {
t.Fatalf("nonce: %v", err)
}
nonce := base64.StdEncoding.EncodeToString(nonceRaw)
path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms"
reqA := signedReq(t, nodeA.URL, "GET", path, nil, alice, ts, nonce)
respA, err := http.DefaultClient.Do(reqA)
if err != nil {
t.Fatalf("do A: %v", err)
}
respA.Body.Close()
if respA.StatusCode != http.StatusOK {
t.Fatalf("node A first use: status %d, want 200 (auth should pass, nonce fresh)", respA.StatusCode)
}
// Replay the SAME ts+nonce to node B: the shared bucket already holds the
// nonce, so node B must reject it.
reqB := signedReq(t, nodeB.URL, "GET", path, nil, alice, ts, nonce)
respB, err := http.DefaultClient.Do(reqB)
if err != nil {
t.Fatalf("do B: %v", err)
}
respB.Body.Close()
if respB.StatusCode != http.StatusUnauthorized {
t.Fatalf("cross-node replay to node B: status %d, want 401 (replayed nonce)", respB.StatusCode)
}
// And replaying to node A again is likewise rejected (same bucket).
reqA2 := signedReq(t, nodeA.URL, "GET", path, nil, alice, ts, nonce)
respA2, err := http.DefaultClient.Do(reqA2)
if err != nil {
t.Fatalf("do A2: %v", err)
}
respA2.Body.Close()
if respA2.StatusCode != http.StatusUnauthorized {
t.Fatalf("replay to node A: status %d, want 401", respA2.StatusCode)
}
}
+19 -2
View File
@@ -19,6 +19,7 @@ import (
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/nats-io/nats.go/jetstream"
)
// Body-size ceilings for the control plane. They bound how much an unauthenticated
@@ -59,7 +60,7 @@ type Server struct {
blobs blobstore.Store
mux *http.ServeMux
authMode AuthMode
nonces *nonceCache
nonces nonceStore
limiter *ipRateLimiter
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
@@ -84,13 +85,29 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
blobs: blobs,
mux: http.NewServeMux(),
authMode: authMode,
nonces: newNonceCache(nonceTTL, maxNonceCacheEntries),
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
}
s.routes()
return s
}
// UseReplicatedNonces switches the server's anti-replay store from the
// per-process in-memory cache to a JetStream KV bucket shared across the cluster
// (issue 0003e). It MUST be called on every node of a multi-node deployment:
// otherwise a request captured on one node can be replayed to another whose
// local cache never saw the nonce. replicas is the bucket's replication factor
// (R1..R3). The TTL matches the in-memory cache (nonceTTL = 2*clockSkew), so a
// replay can never outlive its memory.
func (s *Server) UseReplicatedNonces(js jetstream.JetStream, replicas int) error {
ns, err := newKVNonceStore(js, nonceTTL, replicas, 0)
if err != nil {
return err
}
s.nonces = ns
return nil
}
// ServeHTTP satisfies http.Handler. It runs the control-plane auth middleware
// (signature verification + anti-replay + allowlist) ahead of the router
// according to authMode, then dispatches to the matched handler.