Merge issue/0005e-acl-wire: wire per-subject ACL into membershipd (audit H4)
This commit is contained in:
+12
-2
@@ -138,8 +138,18 @@ func main() {
|
||||
log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes))
|
||||
}
|
||||
if authMode == membership.AuthEnforce {
|
||||
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
|
||||
log.Printf("NATS nkey authentication: ON (enforce)")
|
||||
// Per-subject data-plane ACL (audit H4 / N4 residual): the authenticator
|
||||
// authorizes by the bus allowlist AND confines each connection to the
|
||||
// subjects of the rooms it belongs to (plus client-infra subjects). This
|
||||
// closes the wildcard metadata leak where a registered non-member could
|
||||
// Subscribe(">") and harvest every room's subject and JetStream activity.
|
||||
// NATS freezes permissions at connect time, so a peer that joins a room
|
||||
// after connecting must client.RefreshSession to gain that room's subject.
|
||||
cfg.Auth = busauth.NewNkeyAuthenticatorACL(
|
||||
store.IsAuthorized,
|
||||
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
|
||||
)
|
||||
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
|
||||
}
|
||||
if *tlsCert != "" || *tlsKey != "" {
|
||||
if *tlsCert == "" || *tlsKey == "" {
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package busauth
|
||||
|
||||
import server "github.com/nats-io/nats-server/v2/server"
|
||||
|
||||
// PermissionsFromSubjects adapts a subject-deriving function (e.g.
|
||||
// membership.SubjectACLFor, which maps an identity to the subjects of the rooms
|
||||
// it belongs to plus the client infrastructure subjects) into the PermissionsFunc
|
||||
// the ACL authenticator expects. The derived subjects are granted as BOTH the
|
||||
// publish and subscribe allow set, so a connection can only pub/sub on the
|
||||
// subjects it is entitled to. A derivation error is propagated so the caller
|
||||
// fails closed (denies the connection) rather than granting open access.
|
||||
//
|
||||
// This is the production wiring for the per-subject data-plane ACL (issue 0003e,
|
||||
// audit H4): membershipd passes PermissionsFromSubjects(membership.SubjectACLFor(
|
||||
// store)) to NewNkeyAuthenticatorACL. It lives in busauth (not membership) so the
|
||||
// membership package stays free of the nats-server dependency.
|
||||
func PermissionsFromSubjects(derive func(signPubHex string) ([]string, error)) PermissionsFunc {
|
||||
return func(signPubHex string) (*server.Permissions, error) {
|
||||
subjects, err := derive(signPubHex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sp := &server.SubjectPermission{Allow: subjects}
|
||||
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
|
||||
}
|
||||
}
|
||||
+103
-11
@@ -39,18 +39,12 @@ func mustID(t *testing.T) cs.Identity {
|
||||
return id
|
||||
}
|
||||
|
||||
// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc
|
||||
// the ACL authenticator expects (same Allow set for publish and subscribe).
|
||||
// aclPermsFunc builds the per-subject PermissionsFunc the ACL authenticator
|
||||
// expects. It delegates to the SAME production wiring membershipd uses
|
||||
// (busauth.PermissionsFromSubjects over membership.SubjectACLFor), so this test
|
||||
// exercises the real path rather than a test-only reimplementation.
|
||||
func aclPermsFunc(store membership.Store) busauth.PermissionsFunc {
|
||||
derive := membership.SubjectACLFor(store)
|
||||
return func(signPubHex string) (*server.Permissions, error) {
|
||||
subs, err := derive(signPubHex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sp := &server.SubjectPermission{Allow: subs}
|
||||
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
|
||||
}
|
||||
return busauth.PermissionsFromSubjects(membership.SubjectACLFor(store))
|
||||
}
|
||||
|
||||
// startACLNats boots an embedded NATS whose authenticator confines each peer to
|
||||
@@ -219,6 +213,104 @@ func TestSubjectACLIsolation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestReaudit_H4_WildcardMetadataLeak ports the re-auditor's H4 vector. Before
|
||||
// the per-subject ACL was WIRED into membershipd (it existed in pkg/membership and
|
||||
// pkg/busauth but the binary used the plain NewNkeyAuthenticator), a registered
|
||||
// NON-member could open a raw NATS connection, Subscribe(">"), and capture every
|
||||
// room's subject plus JetStream stream/advisory activity — the payload stayed E2E
|
||||
// ciphertext, but the metadata leaked. With NewNkeyAuthenticatorACL wired via the
|
||||
// production path (busauth.PermissionsFromSubjects(membership.SubjectACLFor)), a
|
||||
// non-member is confined to the client-infra subjects, so the wildcard and any
|
||||
// foreign room subject are denied.
|
||||
//
|
||||
// Coverage:
|
||||
// - error : a non-member's Subscribe(">") raises a permission violation;
|
||||
// - edge : a non-member subscribing to another room's exact subject is denied;
|
||||
// - golden: the member still pub/subs her own room, and the non-member never
|
||||
// captures that traffic.
|
||||
//
|
||||
// Residual (DOCUMENTED, not closed here): the client-infra grant includes
|
||||
// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
|
||||
// subscribes specifically to "$JS.API.>" can still observe stream-management
|
||||
// requests whose subjects embed the stream name derived from a room id. Fully
|
||||
// closing that needs NATS accounts/permissions isolation per identity (deferred to
|
||||
// the 0003 decentralization line). The high-impact leak the auditor exploited —
|
||||
// the room subject itself and JetStream advisories captured via "Subscribe(\">\")"
|
||||
// — is closed.
|
||||
func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
|
||||
alice, eve := mustID(t), mustID(t)
|
||||
aliceEP := frame.EndpointID(alice.SignPub)
|
||||
mustAddUser(t, store, alice, "alice")
|
||||
mustAddUser(t, store, eve, "eve") // eve is REGISTERED but never a member of alice's room
|
||||
const subject = "room.e2e.confidential"
|
||||
mustCreateRoom(t, store, "ROOMA", subject, aliceEP, alice)
|
||||
|
||||
srv := startACLNats(t, store)
|
||||
url := srv.ClientURL()
|
||||
|
||||
eveErr := make(chan error, 8)
|
||||
eveNC := nkeyConn(t, url, eve, eveErr)
|
||||
eveAll := make(chan *nats.Msg, 16)
|
||||
|
||||
// Error: eve's wildcard subscription is rejected. nats.go creates the local sub
|
||||
// object and the server rejects it asynchronously (delivered to ErrorHandler).
|
||||
if _, err := eveNC.Subscribe(">", func(m *nats.Msg) { eveAll <- m }); err != nil {
|
||||
t.Fatalf("eve sub >: %v", err)
|
||||
}
|
||||
_ = eveNC.Flush()
|
||||
if e := waitErr(eveErr, 1*time.Second); e == nil {
|
||||
t.Fatalf("a non-member's Subscribe(\">\") must raise a permissions violation (wildcard metadata leak still open)")
|
||||
}
|
||||
|
||||
// Edge: eve subscribing to the foreign room's EXACT subject is also denied.
|
||||
drain(eveErr)
|
||||
if _, err := eveNC.Subscribe(subject, func(m *nats.Msg) { eveAll <- m }); err != nil {
|
||||
t.Fatalf("eve sub subject: %v", err)
|
||||
}
|
||||
_ = eveNC.Flush()
|
||||
if e := waitErr(eveErr, 1*time.Second); e == nil {
|
||||
t.Fatalf("a non-member subscribing to another room's subject must be denied")
|
||||
}
|
||||
|
||||
// Golden: alice (the member) pub/subs her own room with no violation, and eve
|
||||
// never captured the traffic despite her (rejected) wildcard.
|
||||
aliceErr := make(chan error, 4)
|
||||
aliceNC := nkeyConn(t, url, alice, aliceErr)
|
||||
aliceGot := make(chan string, 4)
|
||||
if _, err := aliceNC.Subscribe(subject, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil {
|
||||
t.Fatalf("alice sub own room: %v", err)
|
||||
}
|
||||
_ = aliceNC.Flush()
|
||||
if e := waitErr(aliceErr, 300*time.Millisecond); e != nil {
|
||||
t.Fatalf("alice subscribing to her OWN room raised an error: %v", e)
|
||||
}
|
||||
if err := aliceNC.Publish(subject, []byte("members-only metadata")); err != nil {
|
||||
t.Fatalf("alice publish: %v", err)
|
||||
}
|
||||
_ = aliceNC.Flush()
|
||||
select {
|
||||
case got := <-aliceGot:
|
||||
if got != "members-only metadata" {
|
||||
t.Fatalf("alice got %q", got)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("alice did not receive her own room's message")
|
||||
}
|
||||
select {
|
||||
case m := <-eveAll:
|
||||
t.Fatalf("eve captured room traffic despite the ACL: subject=%q data=%q", m.Subject, m.Data)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
// good: eve captured nothing
|
||||
}
|
||||
}
|
||||
|
||||
// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path:
|
||||
// alice is not in room B, so her connection has no permission for its subject;
|
||||
// after she is added to room B and calls RefreshSession, the reconnect
|
||||
|
||||
Reference in New Issue
Block a user