diff --git a/pkg/busauth/authenticator.go b/pkg/busauth/authenticator.go index 3de74a0..ddd1726 100644 --- a/pkg/busauth/authenticator.go +++ b/pkg/busauth/authenticator.go @@ -27,31 +27,88 @@ func NewNkeyAuthenticator(isAuthorized func(signPubHex string) bool) server.Auth // Check verifies the client's nkey signature against the nonce the server // presented, then maps the nkey to its allowlist key and checks authorization. -// Any malformed input or failed verification yields false (fail closed). The -// signature decoding mirrors nats-server's own (raw-url base64, then std base64 -// fallback) so genuine clients using nats.Nkey are accepted unchanged. +// Any malformed input or failed verification yields false (fail closed). func (a *nkeyAuthenticator) Check(c server.ClientAuthentication) bool { + signPubHex, ok := verifyNkey(c) + if !ok { + return false + } + return a.isAuthorized(signPubHex) +} + +// verifyNkey performs the shared nkey verification: it checks the client's +// signature against the server-presented nonce and returns the lowercase-hex +// Ed25519 public key behind the nkey. ok is false on any malformed input or +// failed verification (fail closed). The signature decoding mirrors +// nats-server's own (raw-url base64, then std base64 fallback) so genuine +// clients using nats.Nkey are accepted unchanged. +func verifyNkey(c server.ClientAuthentication) (signPubHex string, ok bool) { opts := c.GetOpts() if opts.Nkey == "" { - return false + return "", false } sig, err := base64.RawURLEncoding.DecodeString(opts.Sig) if err != nil { sig, err = base64.StdEncoding.DecodeString(opts.Sig) if err != nil { - return false + return "", false } } pub, err := nkeys.FromPublicKey(opts.Nkey) if err != nil { - return false + return "", false } if err := pub.Verify(c.GetNonce(), sig); err != nil { - return false + return "", false } - signPubHex, err := SignPubHexFromNkey(opts.Nkey) + signPubHex, err = SignPubHexFromNkey(opts.Nkey) if err != nil { + return "", false + } + return signPubHex, true +} + +// PermissionsFunc maps a connecting identity (lowercase-hex Ed25519 signing key) +// to the NATS permissions it should be granted for this connection. Returning an +// error denies the connection (fail closed). It is how the data plane enforces +// per-subject access from room membership (issue 0003e, audit H4 residual). +type PermissionsFunc func(signPubHex string) (*server.Permissions, error) + +// nkeyAuthenticatorACL is the nkey authenticator that ALSO scopes the connection +// to per-subject permissions derived from room membership. NATS evaluates +// permissions once, at connect time, so a peer that joins a room after +// connecting must reconnect (client.RefreshSession) to gain that room's subject +// — the dynamic-membership reconnection model the audit deferred to this issue. +type nkeyAuthenticatorACL struct { + isAuthorized func(signPubHex string) bool + perms PermissionsFunc +} + +// NewNkeyAuthenticatorACL builds an authenticator that authorizes by the bus +// allowlist AND registers per-subject permissions from perms. A registered but +// permission-less peer can no longer subscribe to or publish on arbitrary +// subjects: it is confined to the subjects of the rooms it belongs to (plus the +// client infrastructure subjects perms includes). This is the per-subject ACL +// the 0004 hardening left as a residual. +func NewNkeyAuthenticatorACL(isAuthorized func(signPubHex string) bool, perms PermissionsFunc) server.Authentication { + return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms} +} + +// Check verifies the nkey, authorizes against the allowlist, then derives and +// registers the connection's subject permissions. A permissions-derivation +// error denies the connection (fail closed) rather than granting open access. +func (a *nkeyAuthenticatorACL) Check(c server.ClientAuthentication) bool { + signPubHex, ok := verifyNkey(c) + if !ok { return false } - return a.isAuthorized(signPubHex) + if !a.isAuthorized(signPubHex) { + return false + } + perms, err := a.perms(signPubHex) + if err != nil { + return false // fail closed: never grant open access on a derivation error + } + c.RegisterUser(&server.User{Permissions: perms}) + return true } diff --git a/pkg/client/client.go b/pkg/client/client.go index bcd7f95..2768f40 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -54,6 +54,11 @@ type Client struct { ctrlURLs []string // control-plane HTTP endpoints, tried in order (failover) http *http.Client + // natsServers + natsOpts are retained so RefreshSession can rebuild the + // data-plane connection (re-triggering the server's subject-ACL evaluation). + natsServers []string + natsOpts []nats.Option + mu sync.RWMutex keyCache map[string]map[int][]byte // roomID -> epoch -> K signCache map[string][]byte // sender endpoint -> sign pub (for verification) @@ -187,17 +192,50 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli httpClient.Transport = &http.Transport{TLSClientConfig: opts.CtrlTLS.Clone()} } return &Client{ - id: id, - endpoint: frame.EndpointID(id.SignPub), - nc: nc, - js: js, - ctrlURLs: dedupNonEmpty(append([]string{ctrlURL}, opts.CtrlURLs...)), - http: httpClient, - keyCache: map[string]map[int][]byte{}, - signCache: map[string][]byte{}, + id: id, + endpoint: frame.EndpointID(id.SignPub), + nc: nc, + js: js, + ctrlURLs: dedupNonEmpty(append([]string{ctrlURL}, opts.CtrlURLs...)), + http: httpClient, + natsServers: natsServers, + natsOpts: natsOpts, + keyCache: map[string]map[int][]byte{}, + signCache: map[string][]byte{}, }, nil } +// RefreshSession rebuilds the data-plane NATS connection so the server's +// subject-ACL authenticator re-evaluates this peer's room membership (issue +// 0003e, audit H4 residual). Call it after a membership change — a room you +// created, were invited to, or joined — when the bus enforces per-subject +// permissions, so the new room's subject becomes publishable and subscribable +// (NATS freezes permissions at connect time, so the prior connection cannot see +// the new room). +// +// It opens a fresh connection with the same seeds/options and swaps it in. +// IMPORTANT: active subscriptions from the previous connection are dropped — +// re-subscribe (client.Subscribe) to your rooms after calling this. The key and +// signer caches are preserved. On a non-ACL bus this is a no-op-safe reconnect. +func (c *Client) RefreshSession() error { + nc, err := nats.Connect(strings.Join(c.natsServers, ","), c.natsOpts...) + if err != nil { + return fmt.Errorf("client: refresh session: reconnect nats: %w", err) + } + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + return fmt.Errorf("client: refresh session: init jetstream: %w", err) + } + old := c.nc + c.mu.Lock() + c.nc = nc + c.js = js + c.mu.Unlock() + old.Close() + return nil +} + // Endpoint returns this client's public identity. func (c *Client) Endpoint() Endpoint { return Endpoint{ID: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub} diff --git a/pkg/membership/acl.go b/pkg/membership/acl.go new file mode 100644 index 0000000..1443e24 --- /dev/null +++ b/pkg/membership/acl.go @@ -0,0 +1,52 @@ +package membership + +// Per-subject data-plane access control derived from room membership (issue +// 0003e, audit H4 residual). The control plane already authorizes metadata by +// membership; this is the matching restriction on the NATS data plane so a +// registered peer can only publish/subscribe on the subjects of the rooms it +// actually belongs to — not on every subject on the bus. + +import ( + "encoding/hex" + "fmt" + + "github.com/enmanuel/unibus/pkg/frame" +) + +// clientInfraSubjects are the subjects every peer needs regardless of room +// membership: the request/reply inbox space and the JetStream API (the durable +// plane of persisted rooms). They are granted to all authorized peers so +// request/reply and persisted-room history keep working under the subject ACL. +var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.>"} + +// SubjectACLFor returns a function that maps a signing public key (lowercase +// hex) to the data-plane subjects that identity may publish and subscribe to: +// the subject of every room it belongs to, plus the client infrastructure +// subjects. It reads the live membership store, so the permissions reflect the +// identity's rooms at the moment it connects. A decode error or a store failure +// is returned as an error so the caller can fail closed (deny the connection) +// rather than grant open access. +// +// Because NATS freezes permissions at connect time, a peer invited to a new room +// after connecting must reconnect (client.RefreshSession) to pick up the new +// room's subject. The bus is the authoritative directory of subjects, so an +// unlisted subject is simply absent from the allow set. +func SubjectACLFor(store Store) func(signPubHex string) ([]string, error) { + return func(signPubHex string) ([]string, error) { + pub, err := hex.DecodeString(signPubHex) + if err != nil || len(pub) != 32 { + return nil, fmt.Errorf("acl: malformed sign pub %q", signPubHex) + } + endpoint := frame.EndpointID(pub) + rooms, err := store.ListRoomsForEndpoint(endpoint) + if err != nil { + return nil, fmt.Errorf("acl: list rooms for %s: %w", endpoint, err) + } + subjects := make([]string, 0, len(rooms)+len(clientInfraSubjects)) + subjects = append(subjects, clientInfraSubjects...) + for _, r := range rooms { + subjects = append(subjects, r.Subject) + } + return subjects, nil + } +} diff --git a/pkg/membership/acl_test.go b/pkg/membership/acl_test.go new file mode 100644 index 0000000..3b0c519 --- /dev/null +++ b/pkg/membership/acl_test.go @@ -0,0 +1,290 @@ +package membership_test + +import ( + "encoding/hex" + "net" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/nats-io/nats.go" + server "github.com/nats-io/nats-server/v2/server" +) + +func aclFreePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("free port: %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +func mustID(t *testing.T) cs.Identity { + t.Helper() + id, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity: %v", err) + } + return id +} + +// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc +// the ACL authenticator expects (same Allow set for publish and subscribe). +func aclPermsFunc(store membership.Store) busauth.PermissionsFunc { + derive := membership.SubjectACLFor(store) + return func(signPubHex string) (*server.Permissions, error) { + subs, err := derive(signPubHex) + if err != nil { + return nil, err + } + sp := &server.SubjectPermission{Allow: subs} + return &server.Permissions{Publish: sp, Subscribe: sp}, nil + } +} + +// startACLNats boots an embedded NATS whose authenticator confines each peer to +// the subjects of the rooms it belongs to (audit H4 residual). +func startACLNats(t *testing.T, store membership.Store) *server.Server { + t.Helper() + auth := busauth.NewNkeyAuthenticatorACL(store.IsAuthorized, aclPermsFunc(store)) + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: aclFreePort(t), Auth: auth, + }) + if err != nil { + t.Fatalf("acl nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + return ns +} + +func nkeyConn(t *testing.T, natsURL string, id cs.Identity, errCh chan error) *nats.Conn { + t.Helper() + pub, sign, err := busauth.ClientNkey(id.SignPriv) + if err != nil { + t.Fatalf("nkey: %v", err) + } + nc, err := nats.Connect(natsURL, + nats.Nkey(pub, sign), + nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) { + select { + case errCh <- e: + default: + } + }), + ) + if err != nil { + t.Fatalf("connect nkey: %v", err) + } + t.Cleanup(nc.Close) + return nc +} + +func mustAddUser(t *testing.T, store membership.Store, id cs.Identity, handle string) { + t.Helper() + if err := store.AddUser(hex.EncodeToString(id.SignPub), handle, membership.RoleMember); err != nil { + t.Fatalf("add user %s: %v", handle, err) + } +} + +func mustCreateRoom(t *testing.T, store membership.Store, roomID, subject, ownerEP string, owner cs.Identity) { + t.Helper() + info := membership.RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: ownerEP} + if err := store.CreateRoom(info, owner.SignPub, owner.KexPub, nil); err != nil { + t.Fatalf("create room %s: %v", roomID, err) + } +} + +func newCtrl(t *testing.T, store membership.Store, blobs blobstore.Store) string { + t.Helper() + ts := httptest.NewServer(membership.NewServer(store, blobs, membership.AuthOff)) + t.Cleanup(ts.Close) + return ts.URL +} + +func waitErr(ch chan error, d time.Duration) error { + select { + case e := <-ch: + return e + case <-time.After(d): + return nil + } +} + +func drain(ch chan error) { + for { + select { + case <-ch: + default: + return + } + } +} + +// TestSubjectACLIsolation closes the audit H4 residual: a registered peer is +// confined to the subjects of the rooms it belongs to. alice (member of room.A) +// may sub/pub room.A but is DENIED sub/pub on room.B, and never reads what bob +// (member of room.B) publishes there. +func TestSubjectACLIsolation(t *testing.T) { + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + + alice, bob := mustID(t), mustID(t) + aliceEP, bobEP := frame.EndpointID(alice.SignPub), frame.EndpointID(bob.SignPub) + mustAddUser(t, store, alice, "alice") + mustAddUser(t, store, bob, "bob") + const subjA, subjB = "room.acl.a", "room.acl.b" + mustCreateRoom(t, store, "ROOMA", subjA, aliceEP, alice) + mustCreateRoom(t, store, "ROOMB", subjB, bobEP, bob) + + srv := startACLNats(t, store) + url := srv.ClientURL() + aliceErr := make(chan error, 4) + bobErr := make(chan error, 4) + aliceNC := nkeyConn(t, url, alice, aliceErr) + bobNC := nkeyConn(t, url, bob, bobErr) + + // alice may subscribe to her own room (no error). + aliceGot := make(chan string, 4) + if _, err := aliceNC.Subscribe(subjA, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil { + t.Fatalf("alice sub A: %v", err) + } + _ = aliceNC.Flush() + if e := waitErr(aliceErr, 300*time.Millisecond); e != nil { + t.Fatalf("alice sub to her OWN room raised an error: %v", e) + } + + // alice subscribing to bob's room is a permissions violation. + if _, err := aliceNC.Subscribe(subjB, func(m *nats.Msg) { aliceGot <- "LEAK:" + string(m.Data) }); err != nil { + t.Fatalf("alice sub B (queue): %v", err) + } + _ = aliceNC.Flush() + if e := waitErr(aliceErr, 1*time.Second); e == nil { + t.Fatalf("alice subscribing to bob's room should raise a permissions violation") + } + + // bob publishes in his room; alice (denied) must not receive it. + bobGot := make(chan string, 4) + if _, err := bobNC.Subscribe(subjB, func(m *nats.Msg) { bobGot <- string(m.Data) }); err != nil { + t.Fatalf("bob sub B: %v", err) + } + _ = bobNC.Flush() + if err := bobNC.Publish(subjB, []byte("internal-bob")); err != nil { + t.Fatalf("bob pub B: %v", err) + } + _ = bobNC.Flush() + select { + case got := <-bobGot: + if got != "internal-bob" { + t.Fatalf("bob got %q", got) + } + case <-time.After(2 * time.Second): + t.Fatalf("bob did not receive his own message") + } + select { + case leak := <-aliceGot: + t.Fatalf("alice received bob's room traffic despite the ACL: %q", leak) + case <-time.After(500 * time.Millisecond): + // good: alice never got it + } + + // alice publishing into bob's room is denied; bob must not receive it. + drain(aliceErr) + if err := aliceNC.Publish(subjB, []byte("intruder")); err != nil { + t.Fatalf("alice pub B (queue): %v", err) + } + _ = aliceNC.Flush() + if e := waitErr(aliceErr, 1*time.Second); e == nil { + t.Fatalf("alice publishing into bob's room should raise a permissions violation") + } + select { + case got := <-bobGot: + t.Fatalf("bob received alice's cross-room publish despite the ACL: %q", got) + case <-time.After(500 * time.Millisecond): + // good + } +} + +// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path: +// alice is not in room B, so her connection has no permission for its subject; +// after she is added to room B and calls RefreshSession, the reconnect +// re-derives her permissions and she gains the room's subject. +func TestRefreshSessionGainsNewRoom(t *testing.T) { + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + + alice, bob := mustID(t), mustID(t) + aliceEP, bobEP := frame.EndpointID(alice.SignPub), frame.EndpointID(bob.SignPub) + mustAddUser(t, store, alice, "alice") + mustAddUser(t, store, bob, "bob") + const subjB = "room.refresh.b" + mustCreateRoom(t, store, "ROOMB", subjB, bobEP, bob) + + srv := startACLNats(t, store) + blobs, _ := blobstore.New(filepath.Join(dir, "blobs")) + ctrl := newCtrl(t, store, blobs) + + aliceC, err := client.NewWithOptions(srv.ClientURL(), ctrl, alice, client.Options{UseNkey: true}) + if err != nil { + t.Fatalf("connect alice: %v", err) + } + defer aliceC.Close() + + // Add alice to room B (as if invited), then RefreshSession so the + // authenticator re-derives her permissions on reconnect. + if _, err := store.GetMember("ROOMB", aliceEP); err == nil { + t.Fatalf("alice should not be a member yet") + } + if err := store.AddMember("ROOMB", membership.Member{Endpoint: aliceEP, Role: "member", SignPub: alice.SignPub, KexPub: alice.KexPub}, 1, nil); err != nil { + t.Fatalf("add alice to room B: %v", err) + } + if err := aliceC.RefreshSession(); err != nil { + t.Fatalf("refresh session: %v", err) + } + + bobErr := make(chan error, 2) + bobNC := nkeyConn(t, srv.ClientURL(), bob, bobErr) + + got := make(chan string, 2) + sub, err := aliceC.Subscribe("ROOMB", func(_ frame.Frame, plaintext []byte) { got <- string(plaintext) }) + if err != nil { + t.Fatalf("alice subscribe room B after refresh: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(200 * time.Millisecond) + + // bob publishes a minimal cleartext frame on subjB. + f := frame.Frame{Type: frame.PUB, Subject: subjB, Sender: bobEP, MsgID: "m1", Payload: []byte("hello-after-join")} + b, _ := f.Marshal() + if err := bobNC.Publish(subjB, b); err != nil { + t.Fatalf("bob publish: %v", err) + } + _ = bobNC.Flush() + + select { + case msg := <-got: + if msg != "hello-after-join" { + t.Fatalf("alice got %q", msg) + } + case <-time.After(3 * time.Second): + t.Fatalf("alice did not receive room B traffic after RefreshSession (permissions not refreshed)") + } +}