diff --git a/pkg/busauth/authenticator.go b/pkg/busauth/authenticator.go index 3de74a0b..ddd17267 100644 --- a/pkg/busauth/authenticator.go +++ b/pkg/busauth/authenticator.go @@ -27,31 +27,88 @@ func NewNkeyAuthenticator(isAuthorized func(signPubHex string) bool) server.Auth // Check verifies the client's nkey signature against the nonce the server // presented, then maps the nkey to its allowlist key and checks authorization. -// Any malformed input or failed verification yields false (fail closed). The -// signature decoding mirrors nats-server's own (raw-url base64, then std base64 -// fallback) so genuine clients using nats.Nkey are accepted unchanged. +// Any malformed input or failed verification yields false (fail closed). func (a *nkeyAuthenticator) Check(c server.ClientAuthentication) bool { + signPubHex, ok := verifyNkey(c) + if !ok { + return false + } + return a.isAuthorized(signPubHex) +} + +// verifyNkey performs the shared nkey verification: it checks the client's +// signature against the server-presented nonce and returns the lowercase-hex +// Ed25519 public key behind the nkey. ok is false on any malformed input or +// failed verification (fail closed). The signature decoding mirrors +// nats-server's own (raw-url base64, then std base64 fallback) so genuine +// clients using nats.Nkey are accepted unchanged. +func verifyNkey(c server.ClientAuthentication) (signPubHex string, ok bool) { opts := c.GetOpts() if opts.Nkey == "" { - return false + return "", false } sig, err := base64.RawURLEncoding.DecodeString(opts.Sig) if err != nil { sig, err = base64.StdEncoding.DecodeString(opts.Sig) if err != nil { - return false + return "", false } } pub, err := nkeys.FromPublicKey(opts.Nkey) if err != nil { - return false + return "", false } if err := pub.Verify(c.GetNonce(), sig); err != nil { - return false + return "", false } - signPubHex, err := SignPubHexFromNkey(opts.Nkey) + signPubHex, err = SignPubHexFromNkey(opts.Nkey) if err != nil { + return "", false + } + return signPubHex, true +} + +// PermissionsFunc maps a connecting identity (lowercase-hex Ed25519 signing key) +// to the NATS permissions it should be granted for this connection. Returning an +// error denies the connection (fail closed). It is how the data plane enforces +// per-subject access from room membership (issue 0003e, audit H4 residual). +type PermissionsFunc func(signPubHex string) (*server.Permissions, error) + +// nkeyAuthenticatorACL is the nkey authenticator that ALSO scopes the connection +// to per-subject permissions derived from room membership. NATS evaluates +// permissions once, at connect time, so a peer that joins a room after +// connecting must reconnect (client.RefreshSession) to gain that room's subject +// — the dynamic-membership reconnection model the audit deferred to this issue. +type nkeyAuthenticatorACL struct { + isAuthorized func(signPubHex string) bool + perms PermissionsFunc +} + +// NewNkeyAuthenticatorACL builds an authenticator that authorizes by the bus +// allowlist AND registers per-subject permissions from perms. A registered but +// permission-less peer can no longer subscribe to or publish on arbitrary +// subjects: it is confined to the subjects of the rooms it belongs to (plus the +// client infrastructure subjects perms includes). This is the per-subject ACL +// the 0004 hardening left as a residual. +func NewNkeyAuthenticatorACL(isAuthorized func(signPubHex string) bool, perms PermissionsFunc) server.Authentication { + return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms} +} + +// Check verifies the nkey, authorizes against the allowlist, then derives and +// registers the connection's subject permissions. A permissions-derivation +// error denies the connection (fail closed) rather than granting open access. +func (a *nkeyAuthenticatorACL) Check(c server.ClientAuthentication) bool { + signPubHex, ok := verifyNkey(c) + if !ok { return false } - return a.isAuthorized(signPubHex) + if !a.isAuthorized(signPubHex) { + return false + } + perms, err := a.perms(signPubHex) + if err != nil { + return false // fail closed: never grant open access on a derivation error + } + c.RegisterUser(&server.User{Permissions: perms}) + return true } diff --git a/pkg/client/client.go b/pkg/client/client.go index 69de476d..2768f404 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -51,9 +51,14 @@ type Client struct { endpoint string nc *nats.Conn js jetstream.JetStream // durable plane for rooms with Policy.Persist - ctrlURL string + ctrlURLs []string // control-plane HTTP endpoints, tried in order (failover) http *http.Client + // natsServers + natsOpts are retained so RefreshSession can rebuild the + // data-plane connection (re-triggering the server's subject-ACL evaluation). + natsServers []string + natsOpts []nats.Option + mu sync.RWMutex keyCache map[string]map[int][]byte // roomID -> epoch -> K signCache map[string][]byte // sender endpoint -> sign pub (for verification) @@ -77,6 +82,33 @@ type Options struct { // secured independently (a test may TLS one and not the other); production // sets both to the same CA via Connect. Nil keeps the control plane plaintext. CtrlTLS *tls.Config + // NatsServers are ADDITIONAL NATS seed URLs for cluster failover (issue + // 0003e), beyond the primary natsURL passed to the constructor. With more + // than one server nats.go reconnects to a surviving node automatically when + // the one a client is attached to dies, so a node loss is transparent. + NatsServers []string + // CtrlURLs are ADDITIONAL control-plane HTTP endpoints (one per node) beyond + // the primary ctrlURL. Each request is tried against them in order until one + // answers, so the control plane survives a node loss too. With the + // decentralized KV store every node serves the same state, so any of them + // can answer any request. + CtrlURLs []string +} + +// dedupNonEmpty returns the input with empty strings dropped and duplicates +// removed, preserving order. Used to build the NATS seed list and control-plane +// list from a primary URL plus optional extras without a redundant entry. +func dedupNonEmpty(in []string) []string { + seen := map[string]bool{} + var out []string + for _, s := range in { + if s == "" || seen[s] { + continue + } + seen[s] = true + out = append(out, s) + } + return out } // New connects to NATS and records the control-plane URL with default Options @@ -116,7 +148,20 @@ func Connect(natsURL, ctrlURL string, id cs.Identity, caPath string) (*Client, e // so every peer (worker, chat, mobile, gateway) gets identical behavior by // passing the same Options. func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Client, error) { - natsOpts := []nats.Option{nats.Name("unibus-client")} + // Seed list = primary + extras. With more than one seed, nats.go fails over + // to a surviving node on disconnect; MaxReconnects(-1) keeps it retrying + // indefinitely so a node coming back is rejoined rather than given up on. + natsServers := dedupNonEmpty(append([]string{natsURL}, opts.NatsServers...)) + natsOpts := []nats.Option{ + nats.Name("unibus-client"), + nats.MaxReconnects(-1), + nats.ReconnectWait(250 * time.Millisecond), + } + if len(natsServers) > 1 { + // Try every seed on the initial connect too, so startup tolerates one + // seed being down. + natsOpts = append(natsOpts, nats.RetryOnFailedConnect(true)) + } if opts.UseNkey { nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv) if err != nil { @@ -127,9 +172,9 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli if opts.TLS != nil { natsOpts = append(natsOpts, nats.Secure(opts.TLS)) } - nc, err := nats.Connect(natsURL, natsOpts...) + nc, err := nats.Connect(strings.Join(natsServers, ","), natsOpts...) if err != nil { - return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err) + return nil, fmt.Errorf("client: connect nats %v: %w", natsServers, err) } // JetStream context for the durable plane. Obtaining it does not require any // stream to exist yet and has no effect on cleartext/ephemeral rooms — those @@ -147,17 +192,50 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli httpClient.Transport = &http.Transport{TLSClientConfig: opts.CtrlTLS.Clone()} } return &Client{ - id: id, - endpoint: frame.EndpointID(id.SignPub), - nc: nc, - js: js, - ctrlURL: ctrlURL, - http: httpClient, - keyCache: map[string]map[int][]byte{}, - signCache: map[string][]byte{}, + id: id, + endpoint: frame.EndpointID(id.SignPub), + nc: nc, + js: js, + ctrlURLs: dedupNonEmpty(append([]string{ctrlURL}, opts.CtrlURLs...)), + http: httpClient, + natsServers: natsServers, + natsOpts: natsOpts, + keyCache: map[string]map[int][]byte{}, + signCache: map[string][]byte{}, }, nil } +// RefreshSession rebuilds the data-plane NATS connection so the server's +// subject-ACL authenticator re-evaluates this peer's room membership (issue +// 0003e, audit H4 residual). Call it after a membership change — a room you +// created, were invited to, or joined — when the bus enforces per-subject +// permissions, so the new room's subject becomes publishable and subscribable +// (NATS freezes permissions at connect time, so the prior connection cannot see +// the new room). +// +// It opens a fresh connection with the same seeds/options and swaps it in. +// IMPORTANT: active subscriptions from the previous connection are dropped — +// re-subscribe (client.Subscribe) to your rooms after calling this. The key and +// signer caches are preserved. On a non-ACL bus this is a no-op-safe reconnect. +func (c *Client) RefreshSession() error { + nc, err := nats.Connect(strings.Join(c.natsServers, ","), c.natsOpts...) + if err != nil { + return fmt.Errorf("client: refresh session: reconnect nats: %w", err) + } + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + return fmt.Errorf("client: refresh session: init jetstream: %w", err) + } + old := c.nc + c.mu.Lock() + c.nc = nc + c.js = js + c.mu.Unlock() + old.Close() + return nil +} + // Endpoint returns this client's public identity. func (c *Client) Endpoint() Endpoint { return Endpoint{ID: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub} @@ -169,6 +247,15 @@ func (c *Client) Close() error { return nil } +// ConnectedServer returns the URL of the NATS node this client is currently +// attached to (empty when disconnected). It is observability for cluster +// failover: after a node dies, this reports the surviving node nats.go +// reconnected to. IsConnected reports whether the data-plane link is up. +func (c *Client) ConnectedServer() string { return c.nc.ConnectedUrl() } + +// IsConnected reports whether the NATS data-plane connection is currently up. +func (c *Client) IsConnected() bool { return c.nc.IsConnected() } + // ---- key cache ------------------------------------------------------------ func (c *Client) cacheKey(roomID string, epoch int, k []byte) { @@ -203,36 +290,45 @@ func (c *Client) doJSON(method, path string, body, out any) error { } bodyBytes = b } - req, err := c.newSignedRequest(method, path, bodyBytes) - if err != nil { - return err - } - if body != nil { - req.Header.Set("Content-Type", "application/json") - } - resp, err := c.http.Do(req) - if err != nil { - return fmt.Errorf("client: do %s %s: %w", method, path, err) - } - defer resp.Body.Close() - respBody, _ := io.ReadAll(resp.Body) - if resp.StatusCode >= 300 { - // Surface the server's structured {"error": "..."} message when present, - // instead of leaking the raw HTTP envelope (method, path, status, JSON body). - var er struct { - Error string `json:"error"` + // Try each control-plane endpoint in order. A transport error (a dead node) + // falls over to the next; an HTTP response (any status) is authoritative and + // returned, since every node serves the same state. Each attempt is freshly + // signed (new nonce), so a failed-over retry is never seen as a replay. + var lastErr error + for _, base := range c.ctrlURLs { + req, err := c.newSignedRequestTo(base, method, path, bodyBytes) + if err != nil { + return err } - if json.Unmarshal(respBody, &er) == nil && er.Error != "" { - return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode) + if body != nil { + req.Header.Set("Content-Type", "application/json") } - return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody)) - } - if out != nil { - if err := json.Unmarshal(respBody, out); err != nil { - return fmt.Errorf("client: decode response: %w", err) + resp, err := c.http.Do(req) + if err != nil { + lastErr = err + continue // dead node: try the next control plane } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= 300 { + // Surface the server's structured {"error": "..."} message when present, + // instead of leaking the raw HTTP envelope (method, path, status, body). + var er struct { + Error string `json:"error"` + } + if json.Unmarshal(respBody, &er) == nil && er.Error != "" { + return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode) + } + return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody)) + } + if out != nil { + if err := json.Unmarshal(respBody, out); err != nil { + return fmt.Errorf("client: decode response: %w", err) + } + } + return nil } - return nil + return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr) } // signRequest signs the canonical bytes of req (req must already have its Sig @@ -246,22 +342,25 @@ func (c *Client) signRequest(req any) []byte { return cs.SignEd25519(c.id.SignPriv, b) } -// newSignedRequest builds an *http.Request to the control plane and attaches the -// transport authentication headers (X-Unibus-Pub/Ts/Nonce/Sig) signing the -// canonical request bytes with this peer's Ed25519 key. path is the request URI -// (path plus any query); body is the raw request body (nil for GET). The server -// (membership.authenticate) verifies these headers under the bus-auth flag. +// newSignedRequestTo builds an *http.Request to the control-plane endpoint +// `base` and attaches the transport authentication headers +// (X-Unibus-Pub/Ts/Nonce/Sig) signing the canonical request bytes with this +// peer's Ed25519 key. path is the request URI (path plus any query); body is the +// raw request body (nil for GET). The server (membership.authenticate) verifies +// these headers under the bus-auth flag. The signature covers method+path+ts+ +// nonce+sha256(body), NOT the host, so the same request can be addressed to any +// node — and each failover attempt mints a fresh nonce so it is never a replay. // // Signing happens on every request — including GETs — so that under enforce the // server can authenticate the caller and reject unregistered or revoked // identities uniformly. The canonical construction is the single source of truth // in membership.CanonicalRequest, shared by both sides. -func (c *Client) newSignedRequest(method, path string, body []byte) (*http.Request, error) { +func (c *Client) newSignedRequestTo(base, method, path string, body []byte) (*http.Request, error) { var rdr io.Reader if body != nil { rdr = bytes.NewReader(body) } - req, err := http.NewRequest(method, c.ctrlURL+path, rdr) + req, err := http.NewRequest(method, base+path, rdr) if err != nil { return nil, fmt.Errorf("client: new request: %w", err) } @@ -887,40 +986,50 @@ func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) { } func (c *Client) putBlob(ciphertext []byte) (string, error) { - req, err := c.newSignedRequest("POST", "/blobs", ciphertext) - if err != nil { - return "", err + var lastErr error + for _, base := range c.ctrlURLs { + req, err := c.newSignedRequestTo(base, "POST", "/blobs", ciphertext) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/octet-stream") + resp, err := c.http.Do(req) + if err != nil { + lastErr = err + continue // dead node: try the next control plane + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= 300 { + return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body)) + } + var r blobResp + if err := json.Unmarshal(body, &r); err != nil { + return "", fmt.Errorf("client: decode blob resp: %w", err) + } + return r.Hash, nil } - req.Header.Set("Content-Type", "application/octet-stream") - resp, err := c.http.Do(req) - if err != nil { - return "", fmt.Errorf("client: put blob: %w", err) - } - defer resp.Body.Close() - body, _ := io.ReadAll(resp.Body) - if resp.StatusCode >= 300 { - return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body)) - } - var r blobResp - if err := json.Unmarshal(body, &r); err != nil { - return "", fmt.Errorf("client: decode blob resp: %w", err) - } - return r.Hash, nil + return "", fmt.Errorf("client: put blob: all control planes failed: %w", lastErr) } func (c *Client) getBlob(hash string) ([]byte, error) { - req, err := c.newSignedRequest("GET", "/blobs/"+hash, nil) - if err != nil { - return nil, err + var lastErr error + for _, base := range c.ctrlURLs { + req, err := c.newSignedRequestTo(base, "GET", "/blobs/"+hash, nil) + if err != nil { + return nil, err + } + resp, err := c.http.Do(req) + if err != nil { + lastErr = err + continue // dead node: try the next control plane + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body)) + } + return io.ReadAll(resp.Body) } - resp, err := c.http.Do(req) - if err != nil { - return nil, fmt.Errorf("client: get blob: %w", err) - } - defer resp.Body.Close() - if resp.StatusCode >= 300 { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body)) - } - return io.ReadAll(resp.Body) + return nil, fmt.Errorf("client: get blob: all control planes failed: %w", lastErr) } diff --git a/pkg/client/failover_test.go b/pkg/client/failover_test.go new file mode 100644 index 00000000..da6f5cf2 --- /dev/null +++ b/pkg/client/failover_test.go @@ -0,0 +1,185 @@ +package client_test + +import ( + "fmt" + "net/http/httptest" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/enmanuel/unibus/pkg/room" + server "github.com/nats-io/nats-server/v2/server" +) + +// startClusterNode boots a clustered embedded NATS node (auth off, no route TLS: +// this test exercises client failover, not route security — that is covered in +// pkg/embeddednats). +func startClusterNode(t *testing.T, name string, clientPort, routePort int, peerRoutePorts []int) *server.Server { + t.Helper() + routes := make([]string, 0, len(peerRoutePorts)) + for _, p := range peerRoutePorts { + routes = append(routes, fmt.Sprintf("nats://127.0.0.1:%d", p)) + } + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), + Host: "127.0.0.1", + Port: clientPort, + ServerName: name, + Cluster: &embeddednats.ClusterConfig{Name: "unibus-failover", Host: "127.0.0.1", Port: routePort, Routes: routes}, + }) + if err != nil { + t.Fatalf("start node %s: %v", name, err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + return ns +} + +func waitClusterRoutes(t *testing.T, ns *server.Server) { + t.Helper() + deadline := time.Now().Add(8 * time.Second) + for time.Now().Before(deadline) { + if ns.NumRoutes() >= 1 { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("node %q never formed a route", ns.Name()) +} + +// portOf extracts the :port of a nats URL for matching ConnectedServer() (which +// may report a different host spelling than ClientURL()). +func portOf(natsURL string) string { + i := strings.LastIndex(natsURL, ":") + if i < 0 { + return "" + } + return natsURL[i+1:] +} + +// TestClientFailoverAcrossNodes is the issue's edge case: a client connected to +// node A keeps its session when A is killed — nats.go reconnects it to node B +// and it keeps receiving messages published on the surviving node. +func TestClientFailoverAcrossNodes(t *testing.T) { + rp0, rp1 := freePort(t), freePort(t) + p0, p1 := freePort(t), freePort(t) + n0 := startClusterNode(t, "n0", p0, rp0, []int{rp1}) + n1 := startClusterNode(t, "n1", p1, rp1, []int{rp0}) + waitClusterRoutes(t, n0) + waitClusterRoutes(t, n1) + nodes := map[string]*server.Server{strconv.Itoa(p0): n0, strconv.Itoa(p1): n1} + + // Control plane: one in-process membershipd (metadata only; the data plane is + // the NATS cluster). Auth off keeps the test focused on data-plane failover. + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + blobs, err := blobstore.New(filepath.Join(dir, "blobs")) + if err != nil { + t.Fatalf("blobs: %v", err) + } + ctrl := httptest.NewServer(membership.NewServer(store, blobs, membership.AuthOff)) + t.Cleanup(ctrl.Close) + + url0 := n0.ClientURL() + url1 := n1.ClientURL() + + // A seeds BOTH nodes (failover list); B connects directly to n1. + a, err := client.NewWithOptions(url0, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url1}}) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + b, err := client.NewWithOptions(url1, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url0}}) + if err != nil { + t.Fatalf("connect B: %v", err) + } + defer b.Close() + + roomID, err := a.CreateRoom("room.failover", room.ModeNATS) + if err != nil { + t.Fatalf("A create room: %v", err) + } + + var mu sync.Mutex + var got []string + sub, err := a.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { + mu.Lock() + got = append(got, string(plaintext)) + mu.Unlock() + }) + if err != nil { + t.Fatalf("A subscribe: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(200 * time.Millisecond) + + // Pre-kill sanity: B publishes, A receives across the cluster. + if err := b.Publish(roomID, []byte("before-kill")); err != nil { + t.Fatalf("B publish 1: %v", err) + } + if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "before-kill") }, 3*time.Second) { + t.Fatalf("A did not receive the pre-kill message; got %v", snapshot(&mu, &got)) + } + + // Identify and KILL the node A is attached to, forcing a reconnect. + attached := a.ConnectedServer() + killPort := portOf(attached) + victim, ok := nodes[killPort] + if !ok { + t.Fatalf("A is attached to an unknown node %q (port %q)", attached, killPort) + } + survivorURL := url1 + if killPort == strconv.Itoa(p1) { + survivorURL = url0 + } + victim.Shutdown() + victim.WaitForShutdown() + + // A must reconnect to the surviving node. + deadline := time.Now().Add(8 * time.Second) + for time.Now().Before(deadline) { + if a.IsConnected() && portOf(a.ConnectedServer()) == portOf(survivorURL) { + break + } + time.Sleep(100 * time.Millisecond) + } + if !a.IsConnected() || portOf(a.ConnectedServer()) != portOf(survivorURL) { + t.Fatalf("A did not fail over to the surviving node (now on %q, want port %s)", a.ConnectedServer(), portOf(survivorURL)) + } + + // Make B publish from the surviving node and confirm A still receives — + // the session (its subscription) survived the failover. + if survivorURL == url0 { + // B's primary was n1 (killed); ensure B is on the survivor too. + deadline := time.Now().Add(8 * time.Second) + for time.Now().Before(deadline) && portOf(b.ConnectedServer()) != portOf(survivorURL) { + time.Sleep(100 * time.Millisecond) + } + } + if err := b.Publish(roomID, []byte("after-kill")); err != nil { + t.Fatalf("B publish 2: %v", err) + } + if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "after-kill") }, 6*time.Second) { + t.Fatalf("A did not receive a message after failover; got %v", snapshot(&mu, &got)) + } +} + +func contains(rs []string, want string) bool { + for _, r := range rs { + if r == want { + return true + } + } + return false +} diff --git a/pkg/membership/acl.go b/pkg/membership/acl.go new file mode 100644 index 00000000..1443e24e --- /dev/null +++ b/pkg/membership/acl.go @@ -0,0 +1,52 @@ +package membership + +// Per-subject data-plane access control derived from room membership (issue +// 0003e, audit H4 residual). The control plane already authorizes metadata by +// membership; this is the matching restriction on the NATS data plane so a +// registered peer can only publish/subscribe on the subjects of the rooms it +// actually belongs to — not on every subject on the bus. + +import ( + "encoding/hex" + "fmt" + + "github.com/enmanuel/unibus/pkg/frame" +) + +// clientInfraSubjects are the subjects every peer needs regardless of room +// membership: the request/reply inbox space and the JetStream API (the durable +// plane of persisted rooms). They are granted to all authorized peers so +// request/reply and persisted-room history keep working under the subject ACL. +var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.>"} + +// SubjectACLFor returns a function that maps a signing public key (lowercase +// hex) to the data-plane subjects that identity may publish and subscribe to: +// the subject of every room it belongs to, plus the client infrastructure +// subjects. It reads the live membership store, so the permissions reflect the +// identity's rooms at the moment it connects. A decode error or a store failure +// is returned as an error so the caller can fail closed (deny the connection) +// rather than grant open access. +// +// Because NATS freezes permissions at connect time, a peer invited to a new room +// after connecting must reconnect (client.RefreshSession) to pick up the new +// room's subject. The bus is the authoritative directory of subjects, so an +// unlisted subject is simply absent from the allow set. +func SubjectACLFor(store Store) func(signPubHex string) ([]string, error) { + return func(signPubHex string) ([]string, error) { + pub, err := hex.DecodeString(signPubHex) + if err != nil || len(pub) != 32 { + return nil, fmt.Errorf("acl: malformed sign pub %q", signPubHex) + } + endpoint := frame.EndpointID(pub) + rooms, err := store.ListRoomsForEndpoint(endpoint) + if err != nil { + return nil, fmt.Errorf("acl: list rooms for %s: %w", endpoint, err) + } + subjects := make([]string, 0, len(rooms)+len(clientInfraSubjects)) + subjects = append(subjects, clientInfraSubjects...) + for _, r := range rooms { + subjects = append(subjects, r.Subject) + } + return subjects, nil + } +} diff --git a/pkg/membership/acl_test.go b/pkg/membership/acl_test.go new file mode 100644 index 00000000..3b0c5199 --- /dev/null +++ b/pkg/membership/acl_test.go @@ -0,0 +1,290 @@ +package membership_test + +import ( + "encoding/hex" + "net" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/nats-io/nats.go" + server "github.com/nats-io/nats-server/v2/server" +) + +func aclFreePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("free port: %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +func mustID(t *testing.T) cs.Identity { + t.Helper() + id, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity: %v", err) + } + return id +} + +// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc +// the ACL authenticator expects (same Allow set for publish and subscribe). +func aclPermsFunc(store membership.Store) busauth.PermissionsFunc { + derive := membership.SubjectACLFor(store) + return func(signPubHex string) (*server.Permissions, error) { + subs, err := derive(signPubHex) + if err != nil { + return nil, err + } + sp := &server.SubjectPermission{Allow: subs} + return &server.Permissions{Publish: sp, Subscribe: sp}, nil + } +} + +// startACLNats boots an embedded NATS whose authenticator confines each peer to +// the subjects of the rooms it belongs to (audit H4 residual). +func startACLNats(t *testing.T, store membership.Store) *server.Server { + t.Helper() + auth := busauth.NewNkeyAuthenticatorACL(store.IsAuthorized, aclPermsFunc(store)) + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: aclFreePort(t), Auth: auth, + }) + if err != nil { + t.Fatalf("acl nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + return ns +} + +func nkeyConn(t *testing.T, natsURL string, id cs.Identity, errCh chan error) *nats.Conn { + t.Helper() + pub, sign, err := busauth.ClientNkey(id.SignPriv) + if err != nil { + t.Fatalf("nkey: %v", err) + } + nc, err := nats.Connect(natsURL, + nats.Nkey(pub, sign), + nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) { + select { + case errCh <- e: + default: + } + }), + ) + if err != nil { + t.Fatalf("connect nkey: %v", err) + } + t.Cleanup(nc.Close) + return nc +} + +func mustAddUser(t *testing.T, store membership.Store, id cs.Identity, handle string) { + t.Helper() + if err := store.AddUser(hex.EncodeToString(id.SignPub), handle, membership.RoleMember); err != nil { + t.Fatalf("add user %s: %v", handle, err) + } +} + +func mustCreateRoom(t *testing.T, store membership.Store, roomID, subject, ownerEP string, owner cs.Identity) { + t.Helper() + info := membership.RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: ownerEP} + if err := store.CreateRoom(info, owner.SignPub, owner.KexPub, nil); err != nil { + t.Fatalf("create room %s: %v", roomID, err) + } +} + +func newCtrl(t *testing.T, store membership.Store, blobs blobstore.Store) string { + t.Helper() + ts := httptest.NewServer(membership.NewServer(store, blobs, membership.AuthOff)) + t.Cleanup(ts.Close) + return ts.URL +} + +func waitErr(ch chan error, d time.Duration) error { + select { + case e := <-ch: + return e + case <-time.After(d): + return nil + } +} + +func drain(ch chan error) { + for { + select { + case <-ch: + default: + return + } + } +} + +// TestSubjectACLIsolation closes the audit H4 residual: a registered peer is +// confined to the subjects of the rooms it belongs to. alice (member of room.A) +// may sub/pub room.A but is DENIED sub/pub on room.B, and never reads what bob +// (member of room.B) publishes there. +func TestSubjectACLIsolation(t *testing.T) { + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + + alice, bob := mustID(t), mustID(t) + aliceEP, bobEP := frame.EndpointID(alice.SignPub), frame.EndpointID(bob.SignPub) + mustAddUser(t, store, alice, "alice") + mustAddUser(t, store, bob, "bob") + const subjA, subjB = "room.acl.a", "room.acl.b" + mustCreateRoom(t, store, "ROOMA", subjA, aliceEP, alice) + mustCreateRoom(t, store, "ROOMB", subjB, bobEP, bob) + + srv := startACLNats(t, store) + url := srv.ClientURL() + aliceErr := make(chan error, 4) + bobErr := make(chan error, 4) + aliceNC := nkeyConn(t, url, alice, aliceErr) + bobNC := nkeyConn(t, url, bob, bobErr) + + // alice may subscribe to her own room (no error). + aliceGot := make(chan string, 4) + if _, err := aliceNC.Subscribe(subjA, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil { + t.Fatalf("alice sub A: %v", err) + } + _ = aliceNC.Flush() + if e := waitErr(aliceErr, 300*time.Millisecond); e != nil { + t.Fatalf("alice sub to her OWN room raised an error: %v", e) + } + + // alice subscribing to bob's room is a permissions violation. + if _, err := aliceNC.Subscribe(subjB, func(m *nats.Msg) { aliceGot <- "LEAK:" + string(m.Data) }); err != nil { + t.Fatalf("alice sub B (queue): %v", err) + } + _ = aliceNC.Flush() + if e := waitErr(aliceErr, 1*time.Second); e == nil { + t.Fatalf("alice subscribing to bob's room should raise a permissions violation") + } + + // bob publishes in his room; alice (denied) must not receive it. + bobGot := make(chan string, 4) + if _, err := bobNC.Subscribe(subjB, func(m *nats.Msg) { bobGot <- string(m.Data) }); err != nil { + t.Fatalf("bob sub B: %v", err) + } + _ = bobNC.Flush() + if err := bobNC.Publish(subjB, []byte("internal-bob")); err != nil { + t.Fatalf("bob pub B: %v", err) + } + _ = bobNC.Flush() + select { + case got := <-bobGot: + if got != "internal-bob" { + t.Fatalf("bob got %q", got) + } + case <-time.After(2 * time.Second): + t.Fatalf("bob did not receive his own message") + } + select { + case leak := <-aliceGot: + t.Fatalf("alice received bob's room traffic despite the ACL: %q", leak) + case <-time.After(500 * time.Millisecond): + // good: alice never got it + } + + // alice publishing into bob's room is denied; bob must not receive it. + drain(aliceErr) + if err := aliceNC.Publish(subjB, []byte("intruder")); err != nil { + t.Fatalf("alice pub B (queue): %v", err) + } + _ = aliceNC.Flush() + if e := waitErr(aliceErr, 1*time.Second); e == nil { + t.Fatalf("alice publishing into bob's room should raise a permissions violation") + } + select { + case got := <-bobGot: + t.Fatalf("bob received alice's cross-room publish despite the ACL: %q", got) + case <-time.After(500 * time.Millisecond): + // good + } +} + +// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path: +// alice is not in room B, so her connection has no permission for its subject; +// after she is added to room B and calls RefreshSession, the reconnect +// re-derives her permissions and she gains the room's subject. +func TestRefreshSessionGainsNewRoom(t *testing.T) { + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + + alice, bob := mustID(t), mustID(t) + aliceEP, bobEP := frame.EndpointID(alice.SignPub), frame.EndpointID(bob.SignPub) + mustAddUser(t, store, alice, "alice") + mustAddUser(t, store, bob, "bob") + const subjB = "room.refresh.b" + mustCreateRoom(t, store, "ROOMB", subjB, bobEP, bob) + + srv := startACLNats(t, store) + blobs, _ := blobstore.New(filepath.Join(dir, "blobs")) + ctrl := newCtrl(t, store, blobs) + + aliceC, err := client.NewWithOptions(srv.ClientURL(), ctrl, alice, client.Options{UseNkey: true}) + if err != nil { + t.Fatalf("connect alice: %v", err) + } + defer aliceC.Close() + + // Add alice to room B (as if invited), then RefreshSession so the + // authenticator re-derives her permissions on reconnect. + if _, err := store.GetMember("ROOMB", aliceEP); err == nil { + t.Fatalf("alice should not be a member yet") + } + if err := store.AddMember("ROOMB", membership.Member{Endpoint: aliceEP, Role: "member", SignPub: alice.SignPub, KexPub: alice.KexPub}, 1, nil); err != nil { + t.Fatalf("add alice to room B: %v", err) + } + if err := aliceC.RefreshSession(); err != nil { + t.Fatalf("refresh session: %v", err) + } + + bobErr := make(chan error, 2) + bobNC := nkeyConn(t, srv.ClientURL(), bob, bobErr) + + got := make(chan string, 2) + sub, err := aliceC.Subscribe("ROOMB", func(_ frame.Frame, plaintext []byte) { got <- string(plaintext) }) + if err != nil { + t.Fatalf("alice subscribe room B after refresh: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(200 * time.Millisecond) + + // bob publishes a minimal cleartext frame on subjB. + f := frame.Frame{Type: frame.PUB, Subject: subjB, Sender: bobEP, MsgID: "m1", Payload: []byte("hello-after-join")} + b, _ := f.Marshal() + if err := bobNC.Publish(subjB, b); err != nil { + t.Fatalf("bob publish: %v", err) + } + _ = bobNC.Flush() + + select { + case msg := <-got: + if msg != "hello-after-join" { + t.Fatalf("alice got %q", msg) + } + case <-time.After(3 * time.Second): + t.Fatalf("alice did not receive room B traffic after RefreshSession (permissions not refreshed)") + } +} diff --git a/pkg/membership/auth.go b/pkg/membership/auth.go index c4cf3541..e151bf40 100644 --- a/pkg/membership/auth.go +++ b/pkg/membership/auth.go @@ -95,16 +95,27 @@ func CanonicalRequest(method, path, ts, nonce string, body []byte) []byte { return []byte(method + "\n" + path + "\n" + ts + "\n" + nonce + "\n" + hex.EncodeToString(sum[:])) } -// nonceCache remembers recently-seen nonces to reject replays. It is an -// in-memory store guarded by a mutex — sufficient for a single membershipd -// process (the spec's chosen tradeoff over a server-issued nonce round-trip). A -// distributed deployment would need a shared store (tracked for issue 0003). +// nonceStore is the anti-replay backend: rememberOrReject records a nonce and +// reports whether it was unseen (true -> accept) or already seen (false -> +// reject the replay). It is an interface (issue 0003e) so the single-node +// in-memory cache can be swapped for a replicated KV store: a per-process cache +// is BROKEN under multi-node failover (a request captured and replayed to a +// DIFFERENT node whose cache never saw the nonce would be accepted), so a +// cluster MUST share the nonce state. Every implementation fails CLOSED — a +// backend it cannot reach rejects rather than admits. +type nonceStore interface { + rememberOrReject(nonce string, now time.Time) bool +} + +// memNonceCache remembers recently-seen nonces to reject replays. It is an +// in-memory store guarded by a mutex — sufficient for a SINGLE membershipd +// process. A clustered deployment uses kvNonceStore instead (issue 0003e). // // Pruning is O(expired), not O(n): because the TTL is constant, insertion order // equals expiry order, so the oldest entries (front of `order`) are exactly the // ones that expire first (audit H7 — the previous full-map scan under the mutex // was a CPU-amplification vector). A size cap bounds memory. -type nonceCache struct { +type memNonceCache struct { mu sync.Mutex seen map[string]time.Time // nonce -> expiry order []string // nonces in insertion order == expiry order @@ -112,13 +123,13 @@ type nonceCache struct { cap int } -func newNonceCache(ttl time.Duration, capacity int) *nonceCache { - return &nonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity} +func newMemNonceCache(ttl time.Duration, capacity int) *memNonceCache { + return &memNonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity} } // rememberOrReject records nonce and returns true if it was unseen, or false if // it is a replay (still live in the cache). -func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool { +func (n *memNonceCache) rememberOrReject(nonce string, now time.Time) bool { n.mu.Lock() defer n.mu.Unlock() diff --git a/pkg/membership/nonce_cache_test.go b/pkg/membership/nonce_cache_test.go index 0ff102ad..18ce0dc6 100644 --- a/pkg/membership/nonce_cache_test.go +++ b/pkg/membership/nonce_cache_test.go @@ -11,7 +11,7 @@ import ( // (error), and after the TTL the same nonce is accepted again because its entry // was pruned (edge). func TestNonceCacheRememberPrune(t *testing.T) { - nc := newNonceCache(50*time.Millisecond, 1000) + nc := newMemNonceCache(50*time.Millisecond, 1000) base := time.Now() if !nc.rememberOrReject("a", base) { @@ -31,7 +31,7 @@ func TestNonceCacheRememberPrune(t *testing.T) { // from the map. func TestNonceCacheCapBounded(t *testing.T) { const capacity = 100 - nc := newNonceCache(time.Hour, capacity) + nc := newMemNonceCache(time.Hour, capacity) base := time.Now() for i := 0; i < 500; i++ { nc.rememberOrReject("n"+strconv.Itoa(i), base) diff --git a/pkg/membership/nonce_kv.go b/pkg/membership/nonce_kv.go new file mode 100644 index 00000000..07f91e12 --- /dev/null +++ b/pkg/membership/nonce_kv.go @@ -0,0 +1,77 @@ +package membership + +// kvNonceStore is the replicated anti-replay backend (issue 0003e): seen nonces +// live in a JetStream KV bucket shared by every node, with a per-key TTL so they +// expire on their own. This closes the multi-node replay hole the auditor +// flagged: the per-process memNonceCache let an attacker replay a captured +// request to a DIFFERENT node, whose local cache never saw the nonce. With the +// shared bucket the first node to see a nonce wins the atomic Create, and every +// other node rejects the replay. + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +const bucketNonces = "UNIBUS_nonces" + +type kvNonceStore struct { + kv jetstream.KeyValue + opTimeout time.Duration +} + +// newKVNonceStore creates (or opens) the replicated nonce bucket. ttl is the +// per-key expiry — it must be >= the request acceptance window (2*clockSkew) so +// a replay can never outlive its memory, exactly like the in-memory cache's TTL. +func newKVNonceStore(js jetstream.JetStream, ttl time.Duration, replicas int, opTimeout time.Duration) (*kvNonceStore, error) { + if replicas <= 0 { + replicas = 1 + } + if opTimeout <= 0 { + opTimeout = defaultKVOpTime + } + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: bucketNonces, + TTL: ttl, + Replicas: replicas, + History: 1, + Storage: jetstream.FileStorage, + }) + if err != nil { + return nil, fmt.Errorf("membership: open nonce KV bucket (replicas=%d): %w", replicas, err) + } + return &kvNonceStore{kv: kv, opTimeout: opTimeout}, nil +} + +// nonceKVKey maps a raw nonce (std-base64, which contains '+' '/' '=' that KV +// keys forbid) to a KV-safe token: the hex of its sha256. Deterministic, so the +// same nonce always maps to the same key, and collision-free in practice. +func nonceKVKey(nonce string) string { + sum := sha256.Sum256([]byte(nonce)) + return hex.EncodeToString(sum[:]) +} + +// rememberOrReject atomically claims the nonce: Create succeeds only if the key +// is absent, so the first sight returns true (accept) and any later sight (a +// replay, on this or any other node sharing the bucket) returns false. A backend +// error fails CLOSED — reject — so a KV outage never silently disables +// anti-replay. The TTL on the bucket expires the key, reopening the window. +func (s *kvNonceStore) rememberOrReject(nonce string, _ time.Time) bool { + ctx, cancel := context.WithTimeout(context.Background(), s.opTimeout) + defer cancel() + if _, err := s.kv.Create(ctx, nonceKVKey(nonce), nil); err != nil { + if errors.Is(err, jetstream.ErrKeyExists) { + return false // replay: already claimed + } + return false // backend unreachable: fail closed + } + return true // first sight: accept +} diff --git a/pkg/membership/nonce_kv_test.go b/pkg/membership/nonce_kv_test.go new file mode 100644 index 00000000..2f9a88f5 --- /dev/null +++ b/pkg/membership/nonce_kv_test.go @@ -0,0 +1,117 @@ +package membership + +import ( + "crypto/rand" + "encoding/base64" + "encoding/hex" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// TestReplicatedNonceRejectsCrossNodeReplay is the issue's mandated error path: +// with the shared KV nonce store, a request accepted on node A is rejected as a +// replay when the SAME signed bytes are sent to node B. This closes the +// multi-node replay hole that the per-process cache left open. +func TestReplicatedNonceRejectsCrossNodeReplay(t *testing.T) { + // One NATS+JetStream backing the shared nonce bucket. + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: kvFreePort(t), + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + t.Fatalf("connect: %v", err) + } + t.Cleanup(nc.Close) + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("jetstream: %v", err) + } + + // One shared SQLite store (simulating the replicated control-plane state) and + // two membershipd servers (two nodes) that BOTH use the shared KV nonce store. + dir := t.TempDir() + store, err := Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + alice, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity: %v", err) + } + alicePub := hex.EncodeToString(alice.SignPub) + if err := store.AddUser(alicePub, "alice", RoleAdmin); err != nil { + t.Fatalf("add alice: %v", err) + } + blobs, _ := blobstore.New(filepath.Join(dir, "blobs")) + + mkNode := func() *httptest.Server { + srv := NewServer(store, blobs, AuthEnforce) + if err := srv.UseReplicatedNonces(js, 1); err != nil { + t.Fatalf("UseReplicatedNonces: %v", err) + } + return httptest.NewServer(srv) + } + nodeA := mkNode() + t.Cleanup(nodeA.Close) + nodeB := mkNode() + t.Cleanup(nodeB.Close) + + // Build ONE signed request (fixed ts+nonce) and send the identical bytes to + // both nodes. Authenticated path: alice listing her own rooms (200, empty). + ts := time.Now().Unix() + nonceRaw := make([]byte, 16) + if _, err := rand.Read(nonceRaw); err != nil { + t.Fatalf("nonce: %v", err) + } + nonce := base64.StdEncoding.EncodeToString(nonceRaw) + path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms" + + reqA := signedReq(t, nodeA.URL, "GET", path, nil, alice, ts, nonce) + respA, err := http.DefaultClient.Do(reqA) + if err != nil { + t.Fatalf("do A: %v", err) + } + respA.Body.Close() + if respA.StatusCode != http.StatusOK { + t.Fatalf("node A first use: status %d, want 200 (auth should pass, nonce fresh)", respA.StatusCode) + } + + // Replay the SAME ts+nonce to node B: the shared bucket already holds the + // nonce, so node B must reject it. + reqB := signedReq(t, nodeB.URL, "GET", path, nil, alice, ts, nonce) + respB, err := http.DefaultClient.Do(reqB) + if err != nil { + t.Fatalf("do B: %v", err) + } + respB.Body.Close() + if respB.StatusCode != http.StatusUnauthorized { + t.Fatalf("cross-node replay to node B: status %d, want 401 (replayed nonce)", respB.StatusCode) + } + + // And replaying to node A again is likewise rejected (same bucket). + reqA2 := signedReq(t, nodeA.URL, "GET", path, nil, alice, ts, nonce) + respA2, err := http.DefaultClient.Do(reqA2) + if err != nil { + t.Fatalf("do A2: %v", err) + } + respA2.Body.Close() + if respA2.StatusCode != http.StatusUnauthorized { + t.Fatalf("replay to node A: status %d, want 401", respA2.StatusCode) + } +} diff --git a/pkg/membership/server.go b/pkg/membership/server.go index 1b5758bd..21596057 100644 --- a/pkg/membership/server.go +++ b/pkg/membership/server.go @@ -19,6 +19,7 @@ import ( "github.com/enmanuel/unibus/pkg/blobstore" "github.com/enmanuel/unibus/pkg/frame" + "github.com/nats-io/nats.go/jetstream" ) // Body-size ceilings for the control plane. They bound how much an unauthenticated @@ -59,7 +60,7 @@ type Server struct { blobs blobstore.Store mux *http.ServeMux authMode AuthMode - nonces *nonceCache + nonces nonceStore limiter *ipRateLimiter // RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS) @@ -84,13 +85,29 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server { blobs: blobs, mux: http.NewServeMux(), authMode: authMode, - nonces: newNonceCache(nonceTTL, maxNonceCacheEntries), + nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries), limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL), } s.routes() return s } +// UseReplicatedNonces switches the server's anti-replay store from the +// per-process in-memory cache to a JetStream KV bucket shared across the cluster +// (issue 0003e). It MUST be called on every node of a multi-node deployment: +// otherwise a request captured on one node can be replayed to another whose +// local cache never saw the nonce. replicas is the bucket's replication factor +// (R1..R3). The TTL matches the in-memory cache (nonceTTL = 2*clockSkew), so a +// replay can never outlive its memory. +func (s *Server) UseReplicatedNonces(js jetstream.JetStream, replicas int) error { + ns, err := newKVNonceStore(js, nonceTTL, replicas, 0) + if err != nil { + return err + } + s.nonces = ns + return nil +} + // ServeHTTP satisfies http.Handler. It runs the control-plane auth middleware // (signature verification + anti-replay + allowlist) ahead of the router // according to authMode, then dispatches to the matched handler.