Merge issue/0006b-kv-acl: scope JetStream ACL per-room (audit 0008 N2)

This commit is contained in:
2026-06-07 17:08:54 +02:00
3 changed files with 240 additions and 25 deletions
+83 -17
View File
@@ -1,31 +1,95 @@
package membership
// Per-subject data-plane access control derived from room membership (issue
// 0003e, audit H4 residual). The control plane already authorizes metadata by
// membership; this is the matching restriction on the NATS data plane so a
// registered peer can only publish/subscribe on the subjects of the rooms it
// actually belongs to — not on every subject on the bus.
// 0003e, audit H4 residual; tightened in issue 0006b for audit 0008 N2). The
// control plane already authorizes metadata by membership; this is the matching
// restriction on the NATS data plane so a registered peer can only
// publish/subscribe on the subjects of the rooms it actually belongs to — and can
// only reach the JetStream API of ITS OWN rooms' streams, never the control-plane
// KV buckets.
import (
"encoding/hex"
"fmt"
"strings"
"github.com/enmanuel/unibus/pkg/frame"
)
// clientInfraSubjects are the subjects every peer needs regardless of room
// membership: the request/reply inbox space and the JetStream API (the durable
// plane of persisted rooms). They are granted to all authorized peers so
// request/reply and persisted-room history keep working under the subject ACL.
var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.>"}
// clientInfraSubjects are the subjects every authorized peer needs regardless of
// room membership, kept deliberately MINIMAL (issue 0006b, audit 0008 N2):
//
// - "_INBOX.>" — request/reply plus the JetStream pull-consumer delivery
// and publish-ack inboxes.
// - "$JS.API.INFO" — account-level JetStream info (limits/usage counters). It
// exposes NO room/user/key contents, so granting it leaks nothing.
//
// It NO LONGER contains "$JS.API.>". That broad grant was the N2 leak: it let any
// registered peer drive the whole JetStream API and read the control-plane KV
// buckets (KV_UNIBUS_users/rooms/members/room_keys) and the object store directly
// over NATS, bypassing the HTTP authorization (requireMember and the own-endpoint
// checks). JetStream API access is now granted PER ROOM, scoped to the stream of
// each room the peer belongs to (jsSubjectsFor). Because the control-plane KV
// streams (KV_UNIBUS_*) and the object store (OBJ_UNIBUS_*) are never a room
// stream, they fall outside the closed allow set and are denied by default.
var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.INFO"}
// SubjectACLFor returns a function that maps a signing public key (lowercase
// hex) to the data-plane subjects that identity may publish and subscribe to:
// the subject of every room it belongs to, plus the client infrastructure
// subjects. It reads the live membership store, so the permissions reflect the
// identity's rooms at the moment it connects. A decode error or a store failure
// is returned as an error so the caller can fail closed (deny the connection)
// rather than grant open access.
// roomStreamName is the JetStream stream name a persisted room maps to. It MUST
// stay identical to pkg/client.streamName ("UNIBUS_" + sanitized roomID) so the
// per-room ACL grants exactly the subjects the client's JetStream calls use. Room
// ids are ULIDs (no '.'), so the sanitizing is a no-op in practice, but the rule
// is replicated defensively so the producer (client) and the authorizer (this
// ACL) never drift apart.
func roomStreamName(roomID string) string {
var b strings.Builder
b.Grow(len("UNIBUS_") + len(roomID))
b.WriteString("UNIBUS_")
for _, r := range roomID {
switch {
case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
b.WriteRune(r)
default:
b.WriteRune('_')
}
}
return b.String()
}
// jsSubjectsFor returns the MINIMAL JetStream API subjects a peer needs to use the
// durable stream of ONE persisted room: create/update/info the stream, manage and
// pull from its durable consumer, and ack deliveries. Every subject embeds this
// room's stream name, so the grant cannot reach another room's stream nor any
// control-plane stream (KV_UNIBUS_* / OBJ_UNIBUS_*). The wildcard layout matches
// the NATS JetStream API subject grammar (the stream name is the trailing token
// of single-verb requests and follows a two-token verb for MSG.GET / MSG.NEXT /
// DURABLE.CREATE):
//
// $JS.API.STREAM.<verb>.<stream> verb in {CREATE,UPDATE,INFO,DELETE,PURGE,...}
// $JS.API.STREAM.MSG.<op>.<stream> op in {GET,DELETE}
// $JS.API.CONSUMER.<verb>.<stream> verb in {LIST,NAMES,CREATE(ephemeral)}
// $JS.API.CONSUMER.<verb>.<stream>.<consumer>... verb in {CREATE,INFO,DELETE}
// $JS.API.CONSUMER.<v1>.<v2>.<stream>.<cons> {MSG.NEXT, DURABLE.CREATE}
// $JS.ACK.<stream>.> message acknowledgements
func jsSubjectsFor(roomID string) []string {
s := roomStreamName(roomID)
return []string{
"$JS.API.STREAM.*." + s,
"$JS.API.STREAM.*.*." + s,
"$JS.API.CONSUMER.*." + s,
"$JS.API.CONSUMER.*." + s + ".>",
"$JS.API.CONSUMER.*.*." + s + ".>",
"$JS.ACK." + s + ".>",
}
}
// SubjectACLFor returns a function that maps a signing public key (lowercase hex)
// to the data-plane subjects that identity may publish and subscribe to: the
// subject of every room it belongs to, the per-room JetStream API subjects of
// those rooms (so persisted-room history keeps working), plus the minimal client
// infrastructure subjects. It reads the live membership store, so the permissions
// reflect the identity's rooms at the moment it connects. A decode error or a
// store failure is returned as an error so the caller can fail closed (deny the
// connection) rather than grant open access.
//
// Because NATS freezes permissions at connect time, a peer invited to a new room
// after connecting must reconnect (client.RefreshSession) to pick up the new
@@ -42,10 +106,12 @@ func SubjectACLFor(store Store) func(signPubHex string) ([]string, error) {
if err != nil {
return nil, fmt.Errorf("acl: list rooms for %s: %w", endpoint, err)
}
subjects := make([]string, 0, len(rooms)+len(clientInfraSubjects))
// clientInfra + per room: the room subject + that room's JetStream API.
subjects := make([]string, 0, len(clientInfraSubjects)+len(rooms)*7)
subjects = append(subjects, clientInfraSubjects...)
for _, r := range rooms {
subjects = append(subjects, r.Subject)
subjects = append(subjects, jsSubjectsFor(r.RoomID)...)
}
return subjects, nil
}
+5 -8
View File
@@ -229,14 +229,11 @@ func TestSubjectACLIsolation(t *testing.T) {
// - golden: the member still pub/subs her own room, and the non-member never
// captures that traffic.
//
// Residual (DOCUMENTED, not closed here): the client-infra grant includes
// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
// subscribes specifically to "$JS.API.>" can still observe stream-management
// requests whose subjects embed the stream name derived from a room id. Fully
// closing that needs NATS accounts/permissions isolation per identity (deferred to
// the 0003 decentralization line). The high-impact leak the auditor exploited —
// the room subject itself and JetStream advisories captured via "Subscribe(\">\")"
// — is closed.
// Residual now CLOSED (issue 0006b, audit 0008 N2): the client-infra grant no
// longer includes "$JS.API.>". JetStream API access is granted per-room only
// (membership.jsSubjectsFor), so a peer can reach the API of its OWN rooms'
// streams but not the control-plane KV buckets (KV_UNIBUS_*) nor another room's
// stream. See TestAttack0008_N2 for the closed-leak regression.
func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
+152
View File
@@ -0,0 +1,152 @@
package membership_test
// Regression for audit report 0008, vector N2: with the broad "$JS.API.>" grant
// removed (issue 0006b), a registered peer that belongs to no room can no longer
// read the control-plane KV buckets over NATS, while the per-room JetStream API of
// a peer's OWN rooms keeps working. The auditor's ephemeral attack populated the
// KV control plane and had a registered non-member harvest the allowlist, the room
// graph and the sealed-key metadata directly through "$JS.API.>".
import (
"context"
"encoding/hex"
"path/filepath"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
server "github.com/nats-io/nats-server/v2/server"
)
// startACLNatsInternal is startACLNats plus a recognized internal service identity
// (so the test can seed the KV control plane with full permissions, exactly as the
// decentralized membershipd does at bootstrap).
func startACLNatsInternal(t *testing.T, store membership.Store, internalPubHex string) *server.Server {
t.Helper()
auth := busauth.NewNkeyAuthenticatorACLInternal(store.IsAuthorized, aclPermsFunc(store), internalPubHex)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: aclFreePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("acl nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
return ns
}
// TestAttack0008_N2 reproduces the control-plane KV leak and proves it is closed.
//
// error : eve (registered, member of no room) cannot read the KV buckets — the
// JetStream KV API and the raw $KV subject space are both denied.
// golden: the owner of a persisted room can still drive the JetStream API of HER
// OWN room's stream (so persisted-room history keeps working).
// edge : eve cannot reach another room's stream API either (cross-room JS deny).
func TestAttack0008_N2(t *testing.T) {
dir := t.TempDir()
// The HTTP control-plane store stays SQLite; the KV buckets below stand in for
// the decentralized control plane the attack targets.
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
ceo, eve, internalID := mustID(t), mustID(t), mustID(t)
ceoEP := frame.EndpointID(ceo.SignPub)
mustAddUser(t, store, ceo, "ceo-root-admin")
mustAddUser(t, store, eve, "eve") // registered, member of nothing
// A persisted room owned by ceo: ceo is a member, so her per-room JS is allowed.
if err := store.CreateRoom(
membership.RoomInfo{RoomID: "PRIVROOM", Subject: "room.board.ma-deal", Encrypt: true, Persist: true, OwnerEndpoint: ceoEP},
ceo.SignPub, ceo.KexPub, []byte("sealed-self"),
); err != nil {
t.Fatalf("create room: %v", err)
}
internalPubHex := hex.EncodeToString(internalID.SignPub)
ns := startACLNatsInternal(t, store, internalPubHex)
url := ns.ClientURL()
// Seed the KV control plane with the privileged internal identity (full perms),
// simulating the decentralized buckets the attack reads.
intErr := make(chan error, 4)
intNC := nkeyConn(t, url, internalID, intErr)
intJS, err := jetstream.New(intNC)
if err != nil {
t.Fatalf("internal jetstream: %v", err)
}
kvStore, err := membership.OpenJetStream(intJS, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("open kv buckets: %v", err)
}
if err := kvStore.AddUser(hex.EncodeToString(ceo.SignPub), "ceo-root-admin", membership.RoleAdmin); err != nil {
t.Fatalf("seed kv user: %v", err)
}
// Each JetStream op gets its own short context: a DENIED request never gets a
// reply, so it blocks until its own deadline — a shared context would be
// exhausted by the first denied call and starve the rest.
freshCtx := func(d time.Duration) (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), d)
}
// --- error: eve cannot read the control-plane KV buckets ------------------
eveErr := make(chan error, 8)
eveNC := nkeyConn(t, url, eve, eveErr)
eveJS, err := jetstream.New(eveNC)
if err != nil {
t.Fatalf("eve jetstream: %v", err)
}
// (a) The KV API: binding the bucket requires STREAM.INFO.KV_UNIBUS_users, which
// eve has no permission for, so this must fail (no leak of users).
kvCtx, kvCancel := freshCtx(2 * time.Second)
if kv, err := eveJS.KeyValue(kvCtx, "UNIBUS_users"); err == nil {
if e, gerr := kv.Get(kvCtx, hex.EncodeToString(ceo.SignPub)); gerr == nil {
kvCancel()
t.Fatalf("eve read the control-plane KV users bucket: %q (N2 leak still open)", string(e.Value()))
}
kvCancel()
t.Fatalf("eve was able to BIND the KV users bucket (N2 leak still open)")
}
kvCancel()
// (b) The raw KV subject space: a direct subscribe must be a permissions
// violation (delivered async to the error handler).
drain(eveErr)
if _, err := eveNC.Subscribe("$KV.UNIBUS_users.>", func(*nats.Msg) {}); err != nil {
t.Fatalf("eve sub $KV: %v", err)
}
_ = eveNC.Flush()
if e := waitErr(eveErr, 1*time.Second); e == nil {
t.Fatalf("eve subscribing to $KV.UNIBUS_users.> must raise a permissions violation")
}
// --- edge: eve cannot reach another room's stream API ---------------------
edgeCtx, edgeCancel := freshCtx(2 * time.Second)
if _, err := eveJS.Stream(edgeCtx, "UNIBUS_PRIVROOM"); err == nil {
edgeCancel()
t.Fatalf("eve reached the foreign room stream API (cross-room JS not isolated)")
}
edgeCancel()
// --- golden: ceo can drive the JetStream API of HER OWN room's stream ------
ceoErr := make(chan error, 4)
ceoNC := nkeyConn(t, url, ceo, ceoErr)
ceoJS, err := jetstream.New(ceoNC)
if err != nil {
t.Fatalf("ceo jetstream: %v", err)
}
goldenCtx, goldenCancel := freshCtx(5 * time.Second)
defer goldenCancel()
if _, err := ceoJS.CreateOrUpdateStream(goldenCtx, jetstream.StreamConfig{
Name: "UNIBUS_PRIVROOM",
Subjects: []string{"room.board.ma-deal"},
Storage: jetstream.FileStorage,
}); err != nil {
t.Fatalf("ceo could not manage her OWN room stream (per-room JS broken): %v", err)
}
}