diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go
index 8d0e767d..01727bf4 100644
--- a/cmd/membershipd/main.go
+++ b/cmd/membershipd/main.go
@@ -138,8 +138,18 @@ func main() {
 		log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes))
 	}
 	if authMode == membership.AuthEnforce {
-		cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
-		log.Printf("NATS nkey authentication: ON (enforce)")
+		// Per-subject data-plane ACL (audit H4 / N4 residual): the authenticator
+		// authorizes by the bus allowlist AND confines each connection to the
+		// subjects of the rooms it belongs to (plus client-infra subjects). This
+		// closes the wildcard metadata leak where a registered non-member could
+		// Subscribe(">") and harvest every room's subject and JetStream activity.
+		// NATS freezes permissions at connect time, so a peer that joins a room
+		// after connecting must call client.RefreshSession to gain that room's subject.
+		cfg.Auth = busauth.NewNkeyAuthenticatorACL(
+			store.IsAuthorized,
+			busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
+		)
+		log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
 	}
 	if *tlsCert != "" || *tlsKey != "" {
 		if *tlsCert == "" || *tlsKey == "" {
diff --git a/pkg/busauth/perms.go b/pkg/busauth/perms.go
new file mode 100644
index 00000000..971a9ca5
--- /dev/null
+++ b/pkg/busauth/perms.go
@@ -0,0 +1,36 @@
+package busauth
+
+import server "github.com/nats-io/nats-server/v2/server"
+
+// PermissionsFromSubjects adapts a subject-deriving function (e.g.
+// membership.SubjectACLFor, which maps an identity to the subjects of the rooms
+// it belongs to plus the client infrastructure subjects) into the PermissionsFunc
+// the ACL authenticator expects. The derived subjects are granted as BOTH the
+// publish and subscribe allow set, so a connection can only pub/sub on the
+// subjects it is entitled to. A derivation error is propagated so the caller
+// fails closed (denies the connection) rather than granting open access.
+//
+// An empty derivation also fails closed. A nil or empty Allow list is not a
+// reliable "allow nothing" (nats-server treats an absent allow list as
+// unrestricted), so in that case every subject is denied explicitly via Deny ">".
+//
+// This is the production wiring for the per-subject data-plane ACL (issue 0003e,
+// audit H4): membershipd passes PermissionsFromSubjects(membership.SubjectACLFor(
+// store)) to NewNkeyAuthenticatorACL. It lives in busauth (not membership) so the
+// membership package stays free of the nats-server dependency.
+func PermissionsFromSubjects(derive func(signPubHex string) ([]string, error)) PermissionsFunc {
+	return func(signPubHex string) (*server.Permissions, error) {
+		subjects, err := derive(signPubHex)
+		if err != nil {
+			return nil, err
+		}
+		if len(subjects) == 0 {
+			// No entitlement at all: without subjects there is no allow list to
+			// enforce, so confine the connection with a catch-all deny instead.
+			none := &server.SubjectPermission{Deny: []string{">"}}
+			return &server.Permissions{Publish: none, Subscribe: none}, nil
+		}
+		sp := &server.SubjectPermission{Allow: subjects}
+		return &server.Permissions{Publish: sp, Subscribe: sp}, nil
+	}
+}
diff --git a/pkg/membership/acl_test.go b/pkg/membership/acl_test.go
index 3b0c5199..ab8db7a2 100644
--- a/pkg/membership/acl_test.go
+++ b/pkg/membership/acl_test.go
@@ -39,18 +39,12 @@ func mustID(t *testing.T) cs.Identity {
 	return id
 }
 
-// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc
-// the ACL authenticator expects (same Allow set for publish and subscribe).
+// aclPermsFunc builds the per-subject PermissionsFunc the ACL authenticator
+// expects. It delegates to the SAME production wiring membershipd uses
+// (busauth.PermissionsFromSubjects over membership.SubjectACLFor), so this test
+// exercises the real path rather than a test-only reimplementation.
 func aclPermsFunc(store membership.Store) busauth.PermissionsFunc {
-	derive := membership.SubjectACLFor(store)
-	return func(signPubHex string) (*server.Permissions, error) {
-		subs, err := derive(signPubHex)
-		if err != nil {
-			return nil, err
-		}
-		sp := &server.SubjectPermission{Allow: subs}
-		return &server.Permissions{Publish: sp, Subscribe: sp}, nil
-	}
+	return busauth.PermissionsFromSubjects(membership.SubjectACLFor(store))
 }
 
 // startACLNats boots an embedded NATS whose authenticator confines each peer to
@@ -219,6 +213,104 @@ func TestSubjectACLIsolation(t *testing.T) {
 	}
 }
 
+// TestReaudit_H4_WildcardMetadataLeak ports the re-auditor's H4 vector. Before
+// the per-subject ACL was WIRED into membershipd (it existed in pkg/membership and
+// pkg/busauth but the binary used the plain NewNkeyAuthenticator), a registered
+// NON-member could open a raw NATS connection, Subscribe(">"), and capture every
+// room's subject plus JetStream stream/advisory activity — the payload stayed E2E
+// ciphertext, but the metadata leaked. With NewNkeyAuthenticatorACL wired via the
+// production path (busauth.PermissionsFromSubjects(membership.SubjectACLFor)), a
+// non-member is confined to the client-infra subjects, so the wildcard and any
+// foreign room subject are denied.
+//
+// Coverage:
+//   - error : a non-member's Subscribe(">") raises a permission violation;
+//   - edge  : a non-member subscribing to another room's exact subject is denied;
+//   - golden: the member still pub/subs her own room, and the non-member never
+//     captures that traffic.
+//
+// Residual (DOCUMENTED, not closed here): the client-infra grant includes
+// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
+// subscribes specifically to "$JS.API.>" can still observe stream-management
+// requests whose subjects embed the stream name derived from a room id. Fully
+// closing that needs NATS accounts/permissions isolation per identity (deferred to
+// the 0003 decentralization line). The high-impact leak the auditor exploited —
+// the room subject itself and JetStream advisories captured via Subscribe(">")
+// — is closed.
+func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
+	dir := t.TempDir()
+	store, err := membership.Open(filepath.Join(dir, "unibus.db"))
+	if err != nil {
+		t.Fatalf("store: %v", err)
+	}
+	t.Cleanup(func() { store.Close() }) // Close result is discarded: best-effort teardown
+
+	alice, eve := mustID(t), mustID(t)
+	aliceEP := frame.EndpointID(alice.SignPub)
+	mustAddUser(t, store, alice, "alice")
+	mustAddUser(t, store, eve, "eve") // eve is REGISTERED but never a member of alice's room
+	const subject = "room.e2e.confidential"
+	mustCreateRoom(t, store, "ROOMA", subject, aliceEP, alice)
+
+	srv := startACLNats(t, store)
+	url := srv.ClientURL()
+
+	eveErr := make(chan error, 8)
+	eveNC := nkeyConn(t, url, eve, eveErr)
+	eveAll := make(chan *nats.Msg, 16)
+
+	// Error: eve's wildcard subscription is rejected. nats.go creates the local sub
+	// object and the server rejects it asynchronously (delivered to ErrorHandler).
+	if _, err := eveNC.Subscribe(">", func(m *nats.Msg) { eveAll <- m }); err != nil {
+		t.Fatalf("eve sub >: %v", err)
+	}
+	_ = eveNC.Flush() // result ignored on purpose: the verdict is read from eveErr below
+	if e := waitErr(eveErr, 1*time.Second); e == nil {
+		t.Fatalf("a non-member's Subscribe(\">\") must raise a permissions violation (wildcard metadata leak still open)")
+	}
+
+	// Edge: eve subscribing to the foreign room's EXACT subject is also denied.
+	drain(eveErr) // presumably discards errors left over from the wildcard attempt — confirm against the helper
+	if _, err := eveNC.Subscribe(subject, func(m *nats.Msg) { eveAll <- m }); err != nil {
+		t.Fatalf("eve sub subject: %v", err)
+	}
+	_ = eveNC.Flush() // result ignored on purpose: the verdict is read from eveErr below
+	if e := waitErr(eveErr, 1*time.Second); e == nil {
+		t.Fatalf("a non-member subscribing to another room's subject must be denied")
+	}
+
+	// Golden: alice (the member) pub/subs her own room with no violation, and eve
+	// never captured the traffic despite her (rejected) wildcard.
+	aliceErr := make(chan error, 4)
+	aliceNC := nkeyConn(t, url, alice, aliceErr)
+	aliceGot := make(chan string, 4)
+	if _, err := aliceNC.Subscribe(subject, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil {
+		t.Fatalf("alice sub own room: %v", err)
+	}
+	_ = aliceNC.Flush() // result ignored on purpose: the verdict is read from aliceErr below
+	if e := waitErr(aliceErr, 300*time.Millisecond); e != nil { // here the expected outcome is NO async error
+		t.Fatalf("alice subscribing to her OWN room raised an error: %v", e)
+	}
+	if err := aliceNC.Publish(subject, []byte("members-only metadata")); err != nil {
+		t.Fatalf("alice publish: %v", err)
+	}
+	_ = aliceNC.Flush() // result ignored: delivery is asserted through aliceGot below
+	select {
+	case got := <-aliceGot:
+		if got != "members-only metadata" {
+			t.Fatalf("alice got %q", got)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatalf("alice did not receive her own room's message")
+	}
+	select {
+	case m := <-eveAll:
+		t.Fatalf("eve captured room traffic despite the ACL: subject=%q data=%q", m.Subject, m.Data)
+	case <-time.After(500 * time.Millisecond):
+		// good: eve captured nothing
+	}
+}
+
 // TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path:
 // alice is not in room B, so her connection has no permission for its subject;
 // after she is added to room B and calls RefreshSession, the reconnect