Merge issue/0001e-migrate-clients: secure-by-default clients, bus-auth enforce
Phase 0001e of issue 0001. client.Connect(caPath) is the single seam every peer uses: with the bundled ca.crt it connects with TLS + nkey and signs the control plane (enforce); without it, legacy plaintext dev. worker/chat gain --ca, the mobile NewSession gains caPath, membershipd gains --tls-cert/--tls-key and turns on the nkey authenticator under enforce. dev/feature_flags.json declares the target state (bus-auth enforce, bus-tls on); the gateway and unibots migrations are documented as notes (dev/0001e-remaining-clients.md). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+13
-12
@@ -27,11 +27,12 @@ import (
|
||||
|
||||
func main() {
|
||||
var (
|
||||
natsURL = flag.String("nats-url", "nats://127.0.0.1:4250", "NATS url")
|
||||
ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "membershipd control-plane url")
|
||||
roomSub = flag.String("room", "proc.test.ticks", "room subject to subscribe to")
|
||||
idFile = flag.String("id-file", "./local_files/chat.id", "identity file path")
|
||||
demoEnc = flag.Bool("demo-encrypted", false, "run the encrypted forward-secrecy demo")
|
||||
natsURL = flag.String("nats-url", "nats://127.0.0.1:4250", "NATS url")
|
||||
ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "membershipd control-plane url")
|
||||
roomSub = flag.String("room", "proc.test.ticks", "room subject to subscribe to")
|
||||
idFile = flag.String("id-file", "./local_files/chat.id", "identity file path")
|
||||
demoEnc = flag.Bool("demo-encrypted", false, "run the encrypted forward-secrecy demo")
|
||||
caFile = flag.String("ca", "", "path to the bus CA cert (ca.crt); set to connect with TLS + nkey to a secured bus")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
@@ -39,19 +40,19 @@ func main() {
|
||||
log.SetPrefix("[chat] ")
|
||||
|
||||
if *demoEnc {
|
||||
runEncryptedDemo(*natsURL, *ctrlURL)
|
||||
runEncryptedDemo(*natsURL, *ctrlURL, *caFile)
|
||||
return
|
||||
}
|
||||
runSimple(*natsURL, *ctrlURL, *roomSub, *idFile)
|
||||
runSimple(*natsURL, *ctrlURL, *roomSub, *idFile, *caFile)
|
||||
}
|
||||
|
||||
// runSimple subscribes to a cleartext subject and prints messages live.
|
||||
func runSimple(natsURL, ctrlURL, roomSub, idFile string) {
|
||||
func runSimple(natsURL, ctrlURL, roomSub, idFile, caFile string) {
|
||||
id, err := client.LoadOrCreateIdentity(idFile)
|
||||
if err != nil {
|
||||
log.Fatalf("identity: %v", err)
|
||||
}
|
||||
c, err := client.New(natsURL, ctrlURL, id)
|
||||
c, err := client.Connect(natsURL, ctrlURL, id, caFile)
|
||||
if err != nil {
|
||||
log.Fatalf("connect: %v", err)
|
||||
}
|
||||
@@ -91,7 +92,7 @@ func shortID(id string) string {
|
||||
}
|
||||
|
||||
// runEncryptedDemo proves E2E encryption + forward secrecy end-to-end.
|
||||
func runEncryptedDemo(natsURL, ctrlURL string) {
|
||||
func runEncryptedDemo(natsURL, ctrlURL, caFile string) {
|
||||
log.Printf("=== encrypted forward-secrecy demo ===")
|
||||
pass := true
|
||||
check := func(name string, ok bool) {
|
||||
@@ -109,10 +110,10 @@ func runEncryptedDemo(natsURL, ctrlURL string) {
|
||||
idB, err := newEphemeralIdentity()
|
||||
must(err, "generate B identity")
|
||||
|
||||
a, err := client.New(natsURL, ctrlURL, idA)
|
||||
a, err := client.Connect(natsURL, ctrlURL, idA, caFile)
|
||||
must(err, "connect A")
|
||||
defer a.Close()
|
||||
b, err := client.New(natsURL, ctrlURL, idB)
|
||||
b, err := client.Connect(natsURL, ctrlURL, idB, caFile)
|
||||
must(err, "connect B")
|
||||
defer b.Close()
|
||||
|
||||
|
||||
+45
-19
@@ -17,6 +17,7 @@ import (
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
@@ -41,6 +42,8 @@ func main() {
|
||||
natsPort = flag.Int("nats-port", 4250, "embedded NATS listen port (when --nats-url empty)")
|
||||
natsStore = flag.String("nats-store", "./local_files/jetstream", "embedded JetStream store dir")
|
||||
busAuth = flag.String("bus-auth", "off", "control-plane auth rollout: off|soft|enforce (feature flag bus-auth)")
|
||||
tlsCert = flag.String("tls-cert", "", "path to the NATS server certificate (deploy/tls/server.crt); enables TLS on the embedded data plane")
|
||||
tlsKey = flag.String("tls-key", "", "path to the NATS server private key (deploy/tls/server.key); required with --tls-cert")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
@@ -52,25 +55,8 @@ func main() {
|
||||
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
||||
log.SetPrefix("[membershipd] ")
|
||||
|
||||
// Data plane: embedded or external NATS.
|
||||
var ns *server.Server
|
||||
natsClientURL := *natsURL
|
||||
if natsClientURL == "" {
|
||||
var err error
|
||||
// Bind the embedded NATS to the same interface as the HTTP API so a single
|
||||
// --bind flag governs reachability: 127.0.0.1 keeps the whole stack
|
||||
// loopback-only; 0.0.0.0 exposes both planes to the LAN.
|
||||
ns, err = embeddednats.StartHost(*natsStore, *bind, *natsPort)
|
||||
if err != nil {
|
||||
log.Fatalf("start embedded nats: %v", err)
|
||||
}
|
||||
natsClientURL = embeddednats.ClientURL(ns)
|
||||
log.Printf("embedded NATS (JetStream) ready: %s", natsClientURL)
|
||||
} else {
|
||||
log.Printf("using external NATS: %s", natsClientURL)
|
||||
}
|
||||
|
||||
// Control plane: SQLite store + blob store + HTTP API.
|
||||
// Control plane store first: the NATS authenticator consults IsAuthorized, so
|
||||
// the store must exist before the embedded server starts.
|
||||
store, err := membership.Open(*dbPath)
|
||||
if err != nil {
|
||||
log.Fatalf("open membership store: %v", err)
|
||||
@@ -84,6 +70,46 @@ func main() {
|
||||
}
|
||||
log.Printf("blob store: %s", *storeDir)
|
||||
|
||||
// Data plane: embedded or external NATS. For the embedded server, enforce
|
||||
// turns on the nkey authenticator (only allowlisted identities may connect)
|
||||
// and --tls-cert/--tls-key turn on TLS. An external NATS manages its own
|
||||
// auth/TLS, so those flags do not apply to it.
|
||||
var ns *server.Server
|
||||
natsClientURL := *natsURL
|
||||
if natsClientURL == "" {
|
||||
cfg := embeddednats.ServerConfig{
|
||||
// Bind the embedded NATS to the same interface as the HTTP API so a
|
||||
// single --bind flag governs reachability: 127.0.0.1 keeps the whole
|
||||
// stack loopback-only; 0.0.0.0 exposes both planes to the LAN.
|
||||
StoreDir: *natsStore,
|
||||
Host: *bind,
|
||||
Port: *natsPort,
|
||||
}
|
||||
if authMode == membership.AuthEnforce {
|
||||
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
|
||||
log.Printf("NATS nkey authentication: ON (enforce)")
|
||||
}
|
||||
if *tlsCert != "" || *tlsKey != "" {
|
||||
if *tlsCert == "" || *tlsKey == "" {
|
||||
log.Fatalf("--tls-cert and --tls-key must be set together")
|
||||
}
|
||||
tlsCfg, err := busauth.ServerTLSConfig(*tlsCert, *tlsKey)
|
||||
if err != nil {
|
||||
log.Fatalf("load NATS TLS: %v", err)
|
||||
}
|
||||
cfg.TLS = tlsCfg
|
||||
log.Printf("NATS TLS: ON (%s)", *tlsCert)
|
||||
}
|
||||
ns, err = embeddednats.StartServer(cfg)
|
||||
if err != nil {
|
||||
log.Fatalf("start embedded nats: %v", err)
|
||||
}
|
||||
natsClientURL = embeddednats.ClientURL(ns)
|
||||
log.Printf("embedded NATS (JetStream) ready: %s", natsClientURL)
|
||||
} else {
|
||||
log.Printf("using external NATS: %s", natsClientURL)
|
||||
}
|
||||
|
||||
srv := membership.NewServer(store, blobs, authMode)
|
||||
log.Printf("control-plane auth: %s", authMode)
|
||||
addr := *bind + ":" + *httpPort
|
||||
|
||||
+2
-1
@@ -23,6 +23,7 @@ func main() {
|
||||
ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "membershipd control-plane url")
|
||||
roomSub = flag.String("room", "proc.test.ticks", "room subject to publish to")
|
||||
idFile = flag.String("id-file", "./local_files/worker.id", "identity file path")
|
||||
caFile = flag.String("ca", "", "path to the bus CA cert (ca.crt); set to connect with TLS + nkey to a secured bus")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
@@ -33,7 +34,7 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatalf("identity: %v", err)
|
||||
}
|
||||
c, err := client.New(*natsURL, *ctrlURL, id)
|
||||
c, err := client.Connect(*natsURL, *ctrlURL, id, *caFile)
|
||||
if err != nil {
|
||||
log.Fatalf("connect: %v", err)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
# Issue 0001e — remaining client migrations (notes, NOT implemented)
|
||||
|
||||
Phase 0001e migrated the first-class Go clients and the mobile binding to the
|
||||
secure connection path (`client.Connect(caPath)` → TLS + nkey; control-plane
|
||||
requests are always signed). Two consumers are intentionally **left as notes**
|
||||
because they live outside this sub-repo or need their own coordination:
|
||||
|
||||
## 1. Web gateway (`playground/server.go`)
|
||||
|
||||
The playground is a local dev gateway that embeds its own membershipd
|
||||
(`membership.NewServer(..., AuthOff)`) and an open embedded NATS, and connects
|
||||
browser sessions through an in-process client. To run it against a **secured**
|
||||
bus it would need:
|
||||
|
||||
- Connect its internal client via `client.Connect(natsURL, ctrlURL, id, caPath)`
|
||||
with the bundled `ca.crt` (it currently builds the client without options).
|
||||
- If it should itself enforce auth on the browser-facing side, start its
|
||||
embedded membershipd with an auth mode and its embedded NATS with
|
||||
`embeddednats.StartServer(ServerConfig{Auth: ..., TLS: ...})` — but a local
|
||||
dev gateway typically stays open and only the *upstream* bus is secured.
|
||||
- The gateway's own bus identity must be registered in the upstream allowlist
|
||||
(`membershipd user add`).
|
||||
|
||||
Decision: left at `AuthOff` + plaintext for now (local dev tool). Migrate when
|
||||
the gateway is pointed at the public bus.
|
||||
|
||||
## 2. unibots (`shell/transportunibus`, in the agents repo — NOT this sub-repo)
|
||||
|
||||
The bot transport lives in the `agents_and_robots` / message_bus consumer, not
|
||||
in `dataforge/unibus`. To talk to the secured bus it must, after recompiling
|
||||
against this `pkg/client`:
|
||||
|
||||
- Switch its connect call to `client.Connect(natsURL, ctrlURL, id, caPath)`,
|
||||
passing the path to the bundled `ca.crt`.
|
||||
- Ship `ca.crt` alongside the bot binary (read-only) and point `caPath` at it.
|
||||
- Register each bot's identity (`hex(SignPub)`) in the bus allowlist via
|
||||
`membershipd user add --handle <bot> --sign-pub <hex>` on the bus host.
|
||||
- Run as a `systemd --user` service with `caPath` set, per the deploy plan (0001f).
|
||||
|
||||
No code change is possible from this sub-repo; this is the contract the bot
|
||||
transport consumes.
|
||||
|
||||
## Server enablement (operator, phase 0001f)
|
||||
|
||||
`membershipd` now accepts:
|
||||
|
||||
- `--bus-auth enforce` — verify signed control-plane requests AND turn on the
|
||||
NATS nkey authenticator (only allowlisted identities connect).
|
||||
- `--tls-cert deploy/tls/server.crt --tls-key deploy/tls/server.key` — present
|
||||
the server certificate and require TLS on the embedded NATS.
|
||||
|
||||
`dev/feature_flags.json` now declares both `bus-auth: enforce` and
|
||||
`bus-tls: enabled` as the project's target state. The feature flags are declarative;
|
||||
the operator activates them at deploy time with the CLI flags above. The CLI
|
||||
defaults remain off so local dev and the test suite are unaffected.
|
||||
@@ -1,19 +1,19 @@
|
||||
{
|
||||
"flags": {
|
||||
"bus-auth": {
|
||||
"enabled": false,
|
||||
"state": "off",
|
||||
"enabled": true,
|
||||
"state": "enforce",
|
||||
"issue": "0001",
|
||||
"description": "Signed control-plane auth + NATS nkey auth. Rollout: off -> soft (verify+log, allow) -> enforce (reject). 'enabled' mirrors state!=off.",
|
||||
"description": "Signed control-plane auth + NATS nkey auth. Rollout: off -> soft (verify+log, allow) -> enforce (reject). 'enabled' mirrors state!=off. Server opts in via membershipd --bus-auth; clients via client.Connect(caPath).",
|
||||
"added": "2026-06-07",
|
||||
"enabled_at": null
|
||||
"enabled_at": "2026-06-07"
|
||||
},
|
||||
"bus-tls": {
|
||||
"enabled": false,
|
||||
"enabled": true,
|
||||
"issue": "0001",
|
||||
"description": "TLS on the NATS data plane using the project's self-signed CA (deploy/tls/). When enabled the server presents its cert and clients pin the CA.",
|
||||
"description": "TLS on the NATS data plane using the project's self-signed CA (deploy/tls/). Server opts in via membershipd --tls-cert/--tls-key; clients pin ca.crt via client.Connect(caPath).",
|
||||
"added": "2026-06-07",
|
||||
"enabled_at": null
|
||||
"enabled_at": "2026-06-07"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+8
-4
@@ -44,14 +44,18 @@ func GenerateIdentity(path string) error {
|
||||
}
|
||||
|
||||
// NewSession loads the identity at idPath and connects to the bus. natsURL is
|
||||
// the data plane (for example nats://host:4250) and ctrlURL is the control
|
||||
// plane HTTP endpoint (for example http://host:8470).
|
||||
func NewSession(idPath, natsURL, ctrlURL string) (*Session, error) {
|
||||
// the data plane (for example tls://host:4250) and ctrlURL is the control plane
|
||||
// HTTP endpoint (for example http://host:8470). caPath is the path to the bus
|
||||
// CA certificate (ca.crt) bundled with the app: when set, the session connects
|
||||
// securely (TLS pinned to that CA + nkey authentication on the data plane),
|
||||
// matching a bus running with auth + TLS. Pass an empty caPath to connect in
|
||||
// plaintext to an unsecured (dev) bus.
|
||||
func NewSession(idPath, natsURL, ctrlURL, caPath string) (*Session, error) {
|
||||
id, err := client.LoadOrCreateIdentity(idPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c, err := client.New(natsURL, ctrlURL, id)
|
||||
c, err := client.Connect(natsURL, ctrlURL, id, caPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -79,6 +79,24 @@ func New(natsURL, ctrlURL string, id cs.Identity) (*Client, error) {
|
||||
return NewWithOptions(natsURL, ctrlURL, id, Options{})
|
||||
}
|
||||
|
||||
// Connect is the single migration seam every peer (worker, chat, mobile,
|
||||
// gateway) uses to pick its security posture from one input: the CA path. With
|
||||
// a non-empty caPath it connects securely — TLS pinned to that CA plus nkey
|
||||
// authentication on the data plane — matching a bus running with bus-auth
|
||||
// enforce + bus-tls. With an empty caPath it falls back to the legacy plaintext,
|
||||
// no-nkey connection for local dev against an unsecured bus. The control-plane
|
||||
// HTTP requests are signed in both cases (that signing is unconditional).
|
||||
func Connect(natsURL, ctrlURL string, id cs.Identity, caPath string) (*Client, error) {
|
||||
if caPath == "" {
|
||||
return New(natsURL, ctrlURL, id)
|
||||
}
|
||||
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("client: load CA %q: %w", caPath, err)
|
||||
}
|
||||
return NewWithOptions(natsURL, ctrlURL, id, Options{UseNkey: true, TLS: tlsCfg})
|
||||
}
|
||||
|
||||
// NewWithOptions is New with explicit connection options (nkey auth, and, from
|
||||
// phase 0001d, TLS). It is the single place the data-plane connection is built,
|
||||
// so every peer (worker, chat, mobile, gateway) gets identical behavior by
|
||||
|
||||
@@ -7,13 +7,16 @@ import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/hex"
|
||||
"encoding/pem"
|
||||
"math/big"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/enmanuel/unibus/pkg/room"
|
||||
)
|
||||
@@ -111,3 +114,72 @@ func TestNatsTLS(t *testing.T) {
|
||||
t.Fatalf("client without the CA must fail the TLS handshake")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSecureBusEndToEnd is the headline golden of issue 0001: with ALL three
|
||||
// layers active at once — control-plane request signing (enforce), NATS nkey
|
||||
// auth, and TLS — two registered peers run an encrypted room end to end. A
|
||||
// creates a Matrix-policy room, invites B, A publishes and B decrypts. This
|
||||
// proves the layers compose: signed HTTP control plane + authenticated,
|
||||
// encrypted data plane + E2E room content.
|
||||
func TestSecureBusEndToEnd(t *testing.T) {
|
||||
serverTLS, caPool := genTestCA(t)
|
||||
h := bootHarness(t, membership.AuthEnforce, true, serverTLS)
|
||||
waitHealth(t, h.ctrlURL)
|
||||
|
||||
clientTLS := &tls.Config{RootCAs: caPool, MinVersion: tls.VersionTLS12}
|
||||
secure := func(t *testing.T, handle string) (*client.Client, membership.AuthMode) {
|
||||
id := mustIdentity(t)
|
||||
if err := h.store.AddUser(hex.EncodeToString(id.SignPub), handle, membership.RoleMember); err != nil {
|
||||
t.Fatalf("register %s: %v", handle, err)
|
||||
}
|
||||
c, err := client.NewWithOptions(h.natsURL, h.ctrlURL, id, client.Options{UseNkey: true, TLS: clientTLS})
|
||||
if err != nil {
|
||||
t.Fatalf("connect %s securely: %v", handle, err)
|
||||
}
|
||||
return c, 0
|
||||
}
|
||||
|
||||
a, _ := secure(t, "alice")
|
||||
defer a.Close()
|
||||
b, _ := secure(t, "bob")
|
||||
defer b.Close()
|
||||
|
||||
roomID, err := a.CreateRoom("room.secure", room.ModeMatrix)
|
||||
if err != nil {
|
||||
t.Fatalf("A create encrypted room over secure bus: %v", err)
|
||||
}
|
||||
if err := a.Invite(roomID, b.Endpoint()); err != nil {
|
||||
t.Fatalf("A invite B: %v", err)
|
||||
}
|
||||
if err := b.Join(roomID); err != nil {
|
||||
t.Fatalf("B join: %v", err)
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
var got []string
|
||||
sub, err := b.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
|
||||
mu.Lock()
|
||||
got = append(got, string(plaintext))
|
||||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("B subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
const msg = "mensaje sobre bus seguro (auth+TLS+E2E)"
|
||||
if err := a.Publish(roomID, []byte(msg)); err != nil {
|
||||
t.Fatalf("A publish: %v", err)
|
||||
}
|
||||
if !waitFor(&mu, &got, func(rs []string) bool {
|
||||
for _, r := range rs {
|
||||
if r == msg {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 2*time.Second) {
|
||||
t.Fatalf("B did not receive/decrypt the message over the secured bus; got %v", snapshot(&mu, &got))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -861,7 +861,8 @@ func main() {
|
||||
log.Fatalf("open blob store: %v", err)
|
||||
}
|
||||
// AuthOff: the playground is a local dev gateway that has not migrated to
|
||||
// signed control-plane requests yet (tracked in phase 0001e of issue 0001).
|
||||
// signed control-plane requests or a secured upstream bus yet. What it would
|
||||
// need is written up in dev/0001e-remaining-clients.md (issue 0001, phase 0001e).
|
||||
ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs, membership.AuthOff)}
|
||||
go func() {
|
||||
if err := ctrlSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
|
||||
Reference in New Issue
Block a user