Merge issue/0006e-refresh: RefreshSession in all clients (audit 0008 N4)
This commit is contained in:
@@ -69,6 +69,12 @@ func runSimple(natsURL, ctrlURL, roomSub, idFile, caFile string) {
|
||||
if err := c.Join(roomID); err != nil {
|
||||
log.Fatalf("join: %v", err)
|
||||
}
|
||||
// Membership-change contract (issue 0006e): refresh so the just-created room's
|
||||
// subject is subscribable under enforce+ACL (permissions are frozen at connect
|
||||
// time). Must run BEFORE Subscribe — RefreshSession drops active subscriptions.
|
||||
if err := c.RefreshSession(); err != nil {
|
||||
log.Fatalf("refresh session after create room: %v", err)
|
||||
}
|
||||
sub, err := c.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
|
||||
fmt.Printf("[%s] %s: %s\n", f.Subject, shortID(f.Sender), string(plaintext))
|
||||
})
|
||||
@@ -122,12 +128,21 @@ func runEncryptedDemo(natsURL, ctrlURL, caFile string) {
|
||||
must(err, "A create room")
|
||||
fmt.Printf(" room.test -> %s (E2E, persisted, signed)\n", roomID)
|
||||
|
||||
// Membership-change contract (issue 0006e): A only became a member of this room
|
||||
// after connecting, so refresh to gain its subject + per-room JetStream API
|
||||
// under enforce+ACL before publishing.
|
||||
must(a.RefreshSession(), "A refresh after create room")
|
||||
|
||||
// A invites B (seals K to B's X25519 key).
|
||||
must(a.Invite(roomID, b.Endpoint()), "A invite B")
|
||||
|
||||
// B joins (fetches + decrypts K).
|
||||
must(b.Join(roomID), "B join")
|
||||
|
||||
// B became a member via the invite above; refresh so B can subscribe to the
|
||||
// room's subject under enforce+ACL (before subscribing — refresh drops subs).
|
||||
must(b.RefreshSession(), "B refresh after join")
|
||||
|
||||
// B subscribes; capture received plaintexts.
|
||||
recv := make(chan string, 4)
|
||||
subB, err := b.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
|
||||
|
||||
@@ -47,6 +47,13 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatalf("create room: %v", err)
|
||||
}
|
||||
// Membership-change contract (issue 0006e): the bus freezes per-subject
|
||||
// permissions at connect time, and this room did not exist then. Refresh the
|
||||
// session so the new room's subject becomes publishable under enforce+ACL. On
|
||||
// an unsecured/dev bus this is a harmless reconnect.
|
||||
if err := c.RefreshSession(); err != nil {
|
||||
log.Fatalf("refresh session after create room: %v", err)
|
||||
}
|
||||
log.Printf("room %q -> %s (subject %s, cleartext)", *roomSub, roomID, *roomSub)
|
||||
|
||||
stop := make(chan os.Signal, 1)
|
||||
|
||||
@@ -85,6 +85,20 @@ func (s *Session) Join(roomID string) error {
|
||||
return s.c.Join(roomID)
|
||||
}
|
||||
|
||||
// RefreshSession reconnects the data plane so the bus re-derives this peer's
|
||||
// per-subject permissions from its current room membership.
|
||||
//
|
||||
// Membership-change contract (issue 0006e): a secured bus (--bus-auth enforce)
|
||||
// freezes a connection's permissions at connect time. After ANY membership change
|
||||
// — a room you just created, were invited to, or joined — call RefreshSession
|
||||
// BEFORE Publish/Subscribe on that room, or the bus denies the new room's subject.
|
||||
// It also drops active subscriptions, so re-Subscribe afterwards. On an unsecured
|
||||
// bus it is a harmless reconnect. A mobile/gateway caller wires this exactly like
|
||||
// cmd/chat and cmd/worker do: CreateRoom -> RefreshSession -> Subscribe/Publish.
|
||||
func (s *Session) RefreshSession() error {
|
||||
return s.c.RefreshSession()
|
||||
}
|
||||
|
||||
// Publish sends a UTF-8 text message to the room.
|
||||
func (s *Session) Publish(roomID, text string) error {
|
||||
return s.c.Publish(roomID, []byte(text))
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
package membership_test
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/enmanuel/unibus/pkg/room"
|
||||
)
|
||||
|
||||
// TestClientCreateRoomRefreshPublishFlow is the issue 0006e DoD: under enforce+ACL
|
||||
// a peer creates a room AFTER connecting, and pub/sub works without manual
|
||||
// intervention because the client follows the membership-change contract
|
||||
// (CreateRoom -> RefreshSession -> Subscribe/Publish), exactly as cmd/chat and
|
||||
// cmd/worker now do. This is the end-to-end flow through the client API, proving
|
||||
// the ACL is usable under enforce rather than something an operator must disable.
|
||||
func TestClientCreateRoomRefreshPublishFlow(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
|
||||
alice, bob := mustID(t), mustID(t)
|
||||
mustAddUser(t, store, alice, "alice")
|
||||
mustAddUser(t, store, bob, "bob")
|
||||
|
||||
srv := startACLNats(t, store) // data plane: enforce + per-subject ACL
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
ctrl := newCtrl(t, store, blobs)
|
||||
|
||||
aliceC, err := client.NewWithOptions(srv.ClientURL(), ctrl, alice, client.Options{UseNkey: true})
|
||||
if err != nil {
|
||||
t.Fatalf("connect alice: %v", err)
|
||||
}
|
||||
defer aliceC.Close()
|
||||
bobC, err := client.NewWithOptions(srv.ClientURL(), ctrl, bob, client.Options{UseNkey: true})
|
||||
if err != nil {
|
||||
t.Fatalf("connect bob: %v", err)
|
||||
}
|
||||
defer bobC.Close()
|
||||
|
||||
// alice creates a room AFTER connecting: the subject was not in her ACL at
|
||||
// connect time, so she must refresh to publish on it (the worker contract).
|
||||
roomID, err := aliceC.CreateRoom("room.flow.x", room.ModeNATS)
|
||||
if err != nil {
|
||||
t.Fatalf("alice create room: %v", err)
|
||||
}
|
||||
if err := aliceC.RefreshSession(); err != nil {
|
||||
t.Fatalf("alice refresh: %v", err)
|
||||
}
|
||||
|
||||
// alice invites bob; bob joins then refreshes to gain the subject (the chat
|
||||
// subscriber contract), and only then subscribes.
|
||||
if err := aliceC.Invite(roomID, bobC.Endpoint()); err != nil {
|
||||
t.Fatalf("alice invite bob: %v", err)
|
||||
}
|
||||
if err := bobC.Join(roomID); err != nil {
|
||||
t.Fatalf("bob join: %v", err)
|
||||
}
|
||||
if err := bobC.RefreshSession(); err != nil {
|
||||
t.Fatalf("bob refresh: %v", err)
|
||||
}
|
||||
got := make(chan string, 4)
|
||||
sub, err := bobC.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { got <- string(plaintext) })
|
||||
if err != nil {
|
||||
t.Fatalf("bob subscribe after refresh: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(200 * time.Millisecond) // let the subscription settle
|
||||
|
||||
if err := aliceC.Publish(roomID, []byte("hello-under-acl")); err != nil {
|
||||
t.Fatalf("alice publish after refresh: %v", err)
|
||||
}
|
||||
select {
|
||||
case msg := <-got:
|
||||
if msg != "hello-under-acl" {
|
||||
t.Fatalf("bob got %q", msg)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("bob did not receive the message: the create->refresh->subscribe flow is broken under enforce+ACL")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user