diff --git a/cmd/chat/main.go b/cmd/chat/main.go index 33985cb..fb9b30d 100644 --- a/cmd/chat/main.go +++ b/cmd/chat/main.go @@ -69,6 +69,12 @@ func runSimple(natsURL, ctrlURL, roomSub, idFile, caFile string) { if err := c.Join(roomID); err != nil { log.Fatalf("join: %v", err) } + // Membership-change contract (issue 0006e): refresh so the just-created room's + // subject is subscribable under enforce+ACL (permissions are frozen at connect + // time). Must run BEFORE Subscribe — RefreshSession drops active subscriptions. + if err := c.RefreshSession(); err != nil { + log.Fatalf("refresh session after create room: %v", err) + } sub, err := c.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { fmt.Printf("[%s] %s: %s\n", f.Subject, shortID(f.Sender), string(plaintext)) }) @@ -122,12 +128,21 @@ func runEncryptedDemo(natsURL, ctrlURL, caFile string) { must(err, "A create room") fmt.Printf(" room.test -> %s (E2E, persisted, signed)\n", roomID) + // Membership-change contract (issue 0006e): A only became a member of this room + // after connecting, so refresh to gain its subject + per-room JetStream API + // under enforce+ACL before publishing. + must(a.RefreshSession(), "A refresh after create room") + // A invites B (seals K to B's X25519 key). must(a.Invite(roomID, b.Endpoint()), "A invite B") // B joins (fetches + decrypts K). must(b.Join(roomID), "B join") + // B became a member via the invite above; refresh so B can subscribe to the + // room's subject under enforce+ACL (before subscribing — refresh drops subs). + must(b.RefreshSession(), "B refresh after join") + // B subscribes; capture received plaintexts. recv := make(chan string, 4) subB, err := b.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 1f422be..cd349a3 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -47,6 +47,13 @@ func main() { if err != nil { log.Fatalf("create room: %v", err) } + // Membership-change contract (issue 0006e): the bus freezes per-subject + // permissions at connect time, and this room did not exist then. Refresh the + // session so the new room's subject becomes publishable under enforce+ACL. On + // an unsecured/dev bus this is a harmless reconnect. + if err := c.RefreshSession(); err != nil { + log.Fatalf("refresh session after create room: %v", err) + } log.Printf("room %q -> %s (subject %s, cleartext)", *roomSub, roomID, *roomSub) stop := make(chan os.Signal, 1) diff --git a/mobile/unibus.go b/mobile/unibus.go index 8956d80..31f4cf0 100644 --- a/mobile/unibus.go +++ b/mobile/unibus.go @@ -85,6 +85,20 @@ func (s *Session) Join(roomID string) error { return s.c.Join(roomID) } +// RefreshSession reconnects the data plane so the bus re-derives this peer's +// per-subject permissions from its current room membership. +// +// Membership-change contract (issue 0006e): a secured bus (--bus-auth enforce) +// freezes a connection's permissions at connect time. After ANY membership change +// — a room you just created, were invited to, or joined — call RefreshSession +// BEFORE Publish/Subscribe on that room, or the bus denies the new room's subject. +// It also drops active subscriptions, so re-Subscribe afterwards. On an unsecured +// bus it is a harmless reconnect. A mobile/gateway caller wires this exactly like +// cmd/chat and cmd/worker do: CreateRoom -> RefreshSession -> Subscribe/Publish. +func (s *Session) RefreshSession() error { + return s.c.RefreshSession() +} + // Publish sends a UTF-8 text message to the room. func (s *Session) Publish(roomID, text string) error { return s.c.Publish(roomID, []byte(text)) diff --git a/pkg/membership/refresh_flow_test.go b/pkg/membership/refresh_flow_test.go new file mode 100644 index 0000000..f31c273 --- /dev/null +++ b/pkg/membership/refresh_flow_test.go @@ -0,0 +1,88 @@ +package membership_test + +import ( + "path/filepath" + "testing" + "time" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/enmanuel/unibus/pkg/room" +) + +// TestClientCreateRoomRefreshPublishFlow is the issue 0006e DoD: under enforce+ACL +// a peer creates a room AFTER connecting, and pub/sub works without manual +// intervention because the client follows the membership-change contract +// (CreateRoom -> RefreshSession -> Subscribe/Publish), exactly as cmd/chat and +// cmd/worker now do. This is the end-to-end flow through the client API, proving +// the ACL is usable under enforce rather than something an operator must disable. +func TestClientCreateRoomRefreshPublishFlow(t *testing.T) { + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + + alice, bob := mustID(t), mustID(t) + mustAddUser(t, store, alice, "alice") + mustAddUser(t, store, bob, "bob") + + srv := startACLNats(t, store) // data plane: enforce + per-subject ACL + blobs, _ := blobstore.New(filepath.Join(dir, "blobs")) + ctrl := newCtrl(t, store, blobs) + + aliceC, err := client.NewWithOptions(srv.ClientURL(), ctrl, alice, client.Options{UseNkey: true}) + if err != nil { + t.Fatalf("connect alice: %v", err) + } + defer aliceC.Close() + bobC, err := client.NewWithOptions(srv.ClientURL(), ctrl, bob, client.Options{UseNkey: true}) + if err != nil { + t.Fatalf("connect bob: %v", err) + } + defer bobC.Close() + + // alice creates a room AFTER connecting: the subject was not in her ACL at + // connect time, so she must refresh to publish on it (the worker contract). + roomID, err := aliceC.CreateRoom("room.flow.x", room.ModeNATS) + if err != nil { + t.Fatalf("alice create room: %v", err) + } + if err := aliceC.RefreshSession(); err != nil { + t.Fatalf("alice refresh: %v", err) + } + + // alice invites bob; bob joins then refreshes to gain the subject (the chat + // subscriber contract), and only then subscribes. + if err := aliceC.Invite(roomID, bobC.Endpoint()); err != nil { + t.Fatalf("alice invite bob: %v", err) + } + if err := bobC.Join(roomID); err != nil { + t.Fatalf("bob join: %v", err) + } + if err := bobC.RefreshSession(); err != nil { + t.Fatalf("bob refresh: %v", err) + } + got := make(chan string, 4) + sub, err := bobC.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { got <- string(plaintext) }) + if err != nil { + t.Fatalf("bob subscribe after refresh: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(200 * time.Millisecond) // let the subscription settle + + if err := aliceC.Publish(roomID, []byte("hello-under-acl")); err != nil { + t.Fatalf("alice publish after refresh: %v", err) + } + select { + case msg := <-got: + if msg != "hello-under-acl" { + t.Fatalf("bob got %q", msg) + } + case <-time.After(3 * time.Second): + t.Fatalf("bob did not receive the message: the create->refresh->subscribe flow is broken under enforce+ACL") + } +}