15 Commits

Author SHA1 Message Date
egutierrez 926b8e96af chore(0006): bump unibus to 0.8.0, close issue 0006 (cluster hardening + wiring)
All seven phases (0006a–0006g) merged: blockers N3 (replicated nonce) and N2
($JS.API.> KV leak) closed, decentralized KV store wired (--store kv), homogeneous
cluster posture enforced (N1), RefreshSession in all clients (N4), the lows
(secret out of argv, migrate guard, R1/CA docs), and the 3-node deploy material.

Full suite + every audit-0008 attack regression green; govulncheck 0 reachable.
See report 0009.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 17:33:03 +02:00
egutierrez ae39e35fb4 Merge issue/0006g-deploy: cluster deploy material (magnus+homer+datardos, R3 HA) 2026-06-07 17:31:13 +02:00
egutierrez 48a3d6be33 docs(0006g): cluster deploy material for magnus+homer+datardos (R3 HA)
Parameterized, NO-VPS-touched material to bring up unibus as a 3-node cluster.
The authoring agent ran none of it on a host; every remote-changing step is
marked HUMAN and deploy-cluster.sh defaults to a dry run.

deploy/cluster/:
- nodes.env — topology (cluster name, ports, per-node rows). Public IPs known
  (homer 141.94.69.66, datardos 51.91.100.142) pre-filled; magnus public IP and
  all WireGuard IPs are <PLACEHOLDER> for the human; scripts refuse to run while
  any remain.
- generate-cluster-certs.sh — mints a SEPARATE cluster route CA + a route cert per
  node (server+clientAuth, mutual routes) and a data-plane server cert per node
  signed by the reused client CA (../tls/ca.*); SAN = public + WG + hostname.
- membershipd-cluster.service — one unit, parameterized per node via
  /opt/unibus/cluster.env: enforce + per-subject ACL + TLS + --store kv,
  --cluster-pass-file (secret out of argv), Restart=always.
- deploy-cluster.sh — cross-build linux/amd64, generate each node's cluster.env
  (routes to the other two on the WG mesh, no userinfo), rsync + install (only
  with --yes); staggered start is manual.
- README.md — runbook: prerequisites, loopback bootstrap to seed the first admin
  into the KV (works around the user-CLI/KV chicken-and-egg), staggered bring-up,
  verify posture+quorum, scale R1->R3 in place, and the chaos test (left to 0003f
  on the real VPS).
- .gitignore — out/, build/, secrets/, *.key never committed.

bash -n passes on both scripts; go build/test unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 17:31:13 +02:00
egutierrez 24ff45ca7e Merge issue/0006f-lows: cluster secret out of argv + migrate guard + docs (audit 0008 lows) 2026-06-07 17:24:46 +02:00
egutierrez b8201a82cd fix(0006f): cluster secret out of argv, migrate-to-kv TLS guard, R1/CA docs (audit 0008 lows)
Low-severity cluster hardening from audit 0008:

- Route secret out of argv (N1-low): --cluster-pass and a nats://user:pass@host in
  --routes are visible in ps/journald. New --cluster-pass-file and the
  UNIBUS_CLUSTER_PASS env var (precedence file > env > flag); the resolved secret
  guards the route layer and is injected into bare --routes entries
  (injectRouteCreds), so peers can be listed as nats://host:6250 with no secret in
  argv. The legacy --cluster-pass stays for dev/compat.
- migrate-to-kv confidentiality (N6): refuse a remote --nats-url without --ca (the
  allowlist would travel cleartext); loopback targets are exempt (isLoopbackURL).
- Docs (N1 route CA, N3 DoS): deploy/README gains a Clustering section — use a
  SEPARATE cluster CA for routes (not the client CA), keep the secret out of argv,
  run migrate-to-kv loopback/TLS only, and R1 is a SPOF of auth (not HA); R3
  quorum is real HA. The generated cert material lives in deploy/cluster/ (0006g).

Tests:
- TestResolveClusterPass (file > env > flag precedence; missing file errors),
- TestInjectRouteCreds (injects only into userinfo-less routes; preserves overrides),
- TestIsLoopbackURL (loopback vs remote vs malformed).

CGO_ENABLED=0 go build/vet/test green; govulncheck 0 reachable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 17:24:46 +02:00
egutierrez 3a33656cac Merge issue/0006e-refresh: RefreshSession in all clients (audit 0008 N4) 2026-06-07 17:21:14 +02:00
egutierrez 2f5b372a80 fix(0006e): call RefreshSession after membership changes in all clients (audit 0008 N4)
A secured bus freezes per-subject permissions at connect time, so a peer that
creates or joins a room after connecting cannot pub/sub on it until it reconnects
(RefreshSession). No client called it, so under enforce+ACL the demos failed
closed — pushing the operator to disable the ACL (a security regression at the
operator's discretion).

Wire the membership-change contract into every client:
- cmd/worker: RefreshSession after CreateRoom, before publishing.
- cmd/chat (simple): RefreshSession after CreateRoom+Join, before Subscribe.
- cmd/chat (encrypted demo): A refreshes after CreateRoom; B refreshes after the
  invite+join, both before pub/sub.
- local_files/bridge (gateway): RefreshSession after CreateRoom+Join, before Subscribe.
- mobile: new Session.RefreshSession wrapper + the contract documented for callers.

Contract (documented on the wrappers): after ANY membership change, call
RefreshSession BEFORE pub/sub on the new room (it drops active subs, so it must
precede Subscribe). On an unsecured/dev bus it is a harmless reconnect.

Test:
- TestClientCreateRoomRefreshPublishFlow: end-to-end under enforce+ACL, a peer
  creates a room, refreshes, invites a second peer who joins+refreshes+subscribes,
  and the publish is received — no manual intervention, the ACL stays on.

CGO_ENABLED=0 go build/vet/test green; govulncheck 0 reachable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 17:21:14 +02:00
egutierrez 32bec75665 Merge issue/0006d-posture: homogeneous cluster posture + /healthz posture (audit 0008 N1) 2026-06-07 17:17:37 +02:00
egutierrez 9b96537aa6 fix(0006d): enforce homogeneous cluster posture + publish posture on /healthz (audit 0008 N1)
A cluster is only as secure as its weakest node: the data plane forwards every
subject between nodes, so one node running without enforced auth lets an
unauthenticated peer Subscribe(">") on it and harvest the traffic forwarded from
the ACL'd nodes.

- validateClusterConfig now takes the auth mode and REFUSES to join a cluster
  unless --bus-auth enforce, regardless of bind (a clustered node is a production
  node; there is no safe dev cluster without auth). This binary therefore cannot
  BE the weak node.
- Server.Posture {enforce,acl,tls,cluster,store} is published on /healthz (non
  secret operational metadata, probe stays unauthenticated) so a monitor or peer
  can detect a cluster member not running enforce+ACL+TLS — covering a peer that
  runs a tampered/old binary outside this node's control.

Tests:
- TestAttack0008_N1: a clustered node with --bus-auth off is refused; the same
  node with enforce + full route security is allowed.
- TestClusterConfigPolicy: extended with off/soft clustered cases (refused) and
  the mode parameter throughout.
- TestHealthExposesPosture: /healthz returns the posture booleans + store backend.

CGO_ENABLED=0 go build/vet/test green; govulncheck 0 reachable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 17:17:37 +02:00
egutierrez 18ee7c469b Merge issue/0006c-kv-store: wire decentralized control-plane KV store (--store kv) 2026-06-07 17:14:20 +02:00
egutierrez e9ad719424 feat(0006c): wire the decentralized control-plane KV store (--store kv)
0003 built the JetStream KV store (jetstreamStore) but the binary never selected
it: membership.Open (SQLite) was hardcoded and OpenJetStream was only reached by
migrate-to-kv. This completes the wiring so a node actually serves its control
plane from the replicated KV.

- New flag --store kv|sqlite (default sqlite). kv opens the JetStream KV control
  plane over the privileged internal connection; sqlite is the unchanged baseline
  (branch-by-abstraction: the full suite's SQLite paths are untouched).
- Bootstrap cycle resolved with storeHolder: the authenticator consults the holder
  (fail-closed until set), so it can be built before the KV store exists. The KV
  store opens after NATS is up and is published into the holder. The only client
  that can connect in that window is the internal identity, which bypasses the
  store by key. In SQLite mode the store is set before StartServer, so the window
  does not exist.
- needJS now covers --store kv as well as --cluster-name; the JetStream client is
  shared by the KV store and the replicated nonce bucket.
- feature_flags.json: decentralized wiring documented as complete, realized via
  --store kv (opt-in per deploy; default stays sqlite).

Fail-closed preserved: jetstreamStore.IsAuthorized already denies on any backend
error; the holder denies while unset.

Tests:
- TestStoreHolderFailClosed: empty holder denies; serves after set.
- TestKVStoreBootstrapUnderEnforce: end-to-end decentralized boot — KV-seeded user
  authenticates over nkey under enforce; outsider denied.
- TestKVStoreDecentralizedConsistency: a room/user created on one node's KV store
  is visible to another's (ends the per-node SQLite divergence, audit 0008 N5).

CGO_ENABLED=0 go build/vet/test green; govulncheck 0 reachable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 17:14:20 +02:00
egutierrez d1e1a478f8 Merge issue/0006b-kv-acl: scope JetStream ACL per-room (audit 0008 N2) 2026-06-07 17:08:54 +02:00
egutierrez cacf608fde fix(0006b): scope JetStream ACL per-room, close $JS.API.> KV leak (audit 0008 N2)
The client-infra grant was {"_INBOX.>", "$JS.API.>"}. The broad "$JS.API.>" let
any registered peer drive the whole JetStream API and read the control-plane KV
buckets (KV_UNIBUS_users/rooms/members/room_keys) and the object store directly
over NATS, bypassing the HTTP authorization (requireMember + own-endpoint
checks): a full leak of the allowlist, room graph and sealed-key metadata once the
decentralized control plane is active.

Fix: replace the broad grant with a CLOSED, per-room allow set.
- clientInfraSubjects shrinks to {"_INBOX.>", "$JS.API.INFO"} ($JS.API.INFO is
  account counters only — no room/user/key contents).
- SubjectACLFor now grants, per room the peer belongs to, the room subject plus
  the minimal JetStream API subjects of THAT room's stream (jsSubjectsFor:
  STREAM.*, CONSUMER.*, $JS.ACK scoped to UNIBUS_<roomID>).
- Because KV_UNIBUS_* and OBJ_UNIBUS_* are never a room stream, they fall outside
  the closed allow set and are denied by default. Clients reach blobs over the
  HTTP control plane, not the NATS object store, so OBJ needs no client grant.

roomStreamName mirrors pkg/client.streamName so the authorizer and the producer
never drift.

Tests:
- TestAttack0008_N2: eve (registered, member of no room) cannot bind the KV users
  bucket nor subscribe $KV.UNIBUS_users.> (permissions violation); golden: the
  room owner can still drive her OWN room stream's JetStream API; edge: eve cannot
  reach a foreign room's stream.
- TestReaudit_H4 residual note updated: the $JS.API.> leak it deferred is closed.

CGO_ENABLED=0 go build/vet/test green; govulncheck 0 reachable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 17:08:54 +02:00
egutierrez a9c245d468 Merge issue/0006a-replicated-nonce: wire replicated nonce store (audit 0008 N3) 2026-06-07 17:02:19 +02:00
egutierrez 8b6a01d280 fix(0006a): wire replicated nonce store on clustered nodes (audit 0008 N3)
membershipd never called Server.UseReplicatedNonces, so every node kept a
per-process anti-replay cache and a signed request accepted on node A could be
replayed to node B (200+200). This wires the shared JetStream KV nonce bucket on
any clustered node, closing the cross-node replay hole.

Bootstrap: under enforce the service needs JetStream on its own embedded server,
but the data plane only accepts allowlisted clients. Resolved with an ephemeral
internal service identity the authenticator recognizes and grants full
permissions (NewNkeyAuthenticatorACLInternal), connected over the in-process
transport (no TLS/CA needed for the self-connection).

Hard rule: --cluster-name != "" means the replicated nonce bucket is mandatory;
if it cannot be created the node refuses to start (wireReplicatedNonces returns a
fatal error) rather than run insecurely. Standalone nodes keep the in-memory
cache unchanged (branch-by-abstraction: no JetStream dependency added).

Changes:
- busauth: NewNkeyAuthenticatorACLInternal + fullPermissions for the internal id.
- cmd/membershipd: connectInternalJS (in-process, privileged) / connectExternalJS;
  wireReplicatedNonces helper; main wires it when clustered; --kv-replicas flag.

Tests (regression of audit 0008 N3):
- TestAttack0008_N3: 2 clustered nodes share the bucket, cross-node replay -> 401.
- TestAttack0008_N3_StandaloneKeepsLocalCache: standalone needs no JetStream,
  same-node replay still 401.
- TestAttack0008_N3_ClusteredRequiresJetStream: clustered + no JetStream -> fatal.
- TestInternalConnPrivilegedUnderEnforce / ...OutsiderRejected: the privileged
  self-connection works under enforce and no other identity can claim it.

CGO_ENABLED=0 go build/vet/test green; govulncheck 0 reachable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 17:02:19 +02:00
32 changed files with 2161 additions and 63 deletions
+25 -1
View File
@@ -2,7 +2,7 @@
name: unibus
lang: go
domain: infra
version: 0.7.0
version: 0.8.0
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
tags: [service, messaging, nats, e2e]
uses_functions:
@@ -154,6 +154,30 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
## Capability growth log
- v0.8.0 (2026-06-07) — completar y endurecer el cluster (issue 0006, fases
0006a0006g) que cierra los bloqueantes de la auditoría dedicada del cluster
(report 0008) y cablea el control plane descentralizado que 0003 dejó a medias.
(0006a) Se cablea el nonce replicado en el binario: un nodo con `--cluster-name`
usa el bucket JetStream KV compartido obligatoriamente (fail-fast si no se crea),
cerrando el replay cross-node (N3); el "ciclo bootstrap" se resuelve con una
identidad interna efímera que el authenticator reconoce (full perms) y una
conexión in-process privilegiada. (0006b) Se cierra la fuga del control plane
por `$JS.API.>` (N2): la ACL pasa a un allow-set cerrado por-room (JS API solo de
los streams `UNIBUS_<room>` del peer), dejando `KV_UNIBUS_*`/`OBJ_*` fuera del
set y, por tanto, denegados. (0006c) Se cablea el store KV descentralizado
(`--store kv|sqlite`, default sqlite = baseline idéntico) con un `storeHolder`
fail-closed que rompe el ciclo bootstrap del authenticator. (0006d) Posture
homogénea: un nodo rechaza unirse al cluster sin `enforce`, y `/healthz` publica
la posture (N1). (0006e) Todos los clientes llaman `RefreshSession` tras cambios
de membresía (N4), de modo que la ACL es usable bajo enforce sin desactivarla.
(0006f) Bajos: secreto de cluster fuera de argv (`--cluster-pass-file`/env +
inyección en routes), `migrate-to-kv` rechaza target remoto sin `--ca`, y docs
de CA separada para routes + R1 SPOF vs R3 HA. (0006g) Material de deploy del
cluster de 3 nodos (magnus+homer+datardos) en `deploy/cluster/` (certs, unit,
script de despliegue dry-run, runbook) — sin tocar ningún VPS. Toda la
regresión de auditorías previas + los ataques 0008 siguen verdes; govulncheck 0
alcanzables. Branch-by-abstraction: con `--store sqlite` el single-node sigue
idéntico y desplegable en todo momento.
- v0.7.0 (2026-06-07) — hardening de seguridad 2 (issue 0005, fases 0005a0005e)
que cierra los hallazgos nuevos de la re-auditoría red-team (report 0006) y
lleva el veredicto de exposición pública a "sí-con-condiciones". (0005a) Bump de
+15
View File
@@ -69,6 +69,12 @@ func runSimple(natsURL, ctrlURL, roomSub, idFile, caFile string) {
if err := c.Join(roomID); err != nil {
log.Fatalf("join: %v", err)
}
// Membership-change contract (issue 0006e): refresh so the just-created room's
// subject is subscribable under enforce+ACL (permissions are frozen at connect
// time). Must run BEFORE Subscribe — RefreshSession drops active subscriptions.
if err := c.RefreshSession(); err != nil {
log.Fatalf("refresh session after create room: %v", err)
}
sub, err := c.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
fmt.Printf("[%s] %s: %s\n", f.Subject, shortID(f.Sender), string(plaintext))
})
@@ -122,12 +128,21 @@ func runEncryptedDemo(natsURL, ctrlURL, caFile string) {
must(err, "A create room")
fmt.Printf(" room.test -> %s (E2E, persisted, signed)\n", roomID)
// Membership-change contract (issue 0006e): A only became a member of this room
// after connecting, so refresh to gain its subject + per-room JetStream API
// under enforce+ACL before publishing.
must(a.RefreshSession(), "A refresh after create room")
// A invites B (seals K to B's X25519 key).
must(a.Invite(roomID, b.Endpoint()), "A invite B")
// B joins (fetches + decrypts K).
must(b.Join(roomID), "B join")
// B became a member via the invite above; refresh so B can subscribe to the
// room's subject under enforce+ACL (before subscribing — refresh drops subs).
must(b.RefreshSession(), "B refresh after join")
// B subscribes; capture received plaintexts.
recv := make(chan string, 4)
subB, err := b.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
+221
View File
@@ -0,0 +1,221 @@
package main
// Regression for audit report 0008, vector N3: the binary must wire the
// replicated nonce store on a clustered node so a signed request accepted on one
// node cannot be replayed to another. The auditor's ephemeral attack showed the
// OLD binary never called UseReplicatedNonces (each node kept a per-process
// cache), so a captured request replayed to a second node with 200+200. These
// tests drive the SAME helper the binary uses (wireReplicatedNonces) so they
// prove the WIRING, not just the underlying API.
import (
"bytes"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"io"
"net"
"net/http"
"net/http/httptest"
"path/filepath"
"strconv"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
func freePort(t *testing.T) int {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("free port: %v", err)
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
// signed008 builds a transport-signed control-plane request with a caller-chosen
// ts+nonce, so a test can reuse the exact same signed bytes against two nodes to
// exercise replay.
func signed008(t *testing.T, baseURL, method, path string, body []byte, id cs.Identity, ts int64, nonce string) *http.Request {
t.Helper()
canonical := membership.CanonicalRequest(method, path, strconv.FormatInt(ts, 10), nonce, body)
sig := cs.SignEd25519(id.SignPriv, canonical)
var rdr io.Reader
if body != nil {
rdr = bytes.NewReader(body)
}
req, err := http.NewRequest(method, baseURL+path, rdr)
if err != nil {
t.Fatalf("new request: %v", err)
}
req.Header.Set("X-Unibus-Pub", hex.EncodeToString(id.SignPub))
req.Header.Set("X-Unibus-Ts", strconv.FormatInt(ts, 10))
req.Header.Set("X-Unibus-Nonce", nonce)
req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig))
return req
}
func randNonce(t *testing.T) string {
t.Helper()
raw := make([]byte, 16)
if _, err := rand.Read(raw); err != nil {
t.Fatalf("nonce: %v", err)
}
return base64.StdEncoding.EncodeToString(raw)
}
// TestAttack0008_N3 is the blocker regression: two clustered membershipd nodes
// wired through wireReplicatedNonces share a JetStream KV nonce bucket, so a
// request accepted on node A is rejected (401) when replayed to node B. Before
// the fix the binary never wired this and the replay returned 200.
func TestAttack0008_N3(t *testing.T) {
// One NATS+JetStream backing the shared nonce bucket (no client auth needed:
// the test drives the membership.Server's nonce store directly via HTTP).
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t),
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
nc, err := nats.Connect(ns.ClientURL())
if err != nil {
t.Fatalf("connect: %v", err)
}
t.Cleanup(nc.Close)
js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("jetstream: %v", err)
}
// Shared control-plane state (stand-in for the replicated store) + two nodes.
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
alice, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleAdmin); err != nil {
t.Fatalf("add alice: %v", err)
}
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
// Each node is wired EXACTLY as the binary wires a clustered node.
mkNode := func() *httptest.Server {
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
if err := wireReplicatedNonces(srv, js, true /*clustered*/, 1); err != nil {
t.Fatalf("wireReplicatedNonces: %v", err)
}
return httptest.NewServer(srv)
}
nodeA := mkNode()
t.Cleanup(nodeA.Close)
nodeB := mkNode()
t.Cleanup(nodeB.Close)
ts := time.Now().Unix()
nonce := randNonce(t)
path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms"
// Golden: alice's signed request is accepted on node A.
respA, err := http.DefaultClient.Do(signed008(t, nodeA.URL, "GET", path, nil, alice, ts, nonce))
if err != nil {
t.Fatalf("do A: %v", err)
}
respA.Body.Close()
if respA.StatusCode != http.StatusOK {
t.Fatalf("node A first use: status %d, want 200", respA.StatusCode)
}
// Error path (the attack): replay the SAME signed bytes to node B → 401.
respB, err := http.DefaultClient.Do(signed008(t, nodeB.URL, "GET", path, nil, alice, ts, nonce))
if err != nil {
t.Fatalf("do B: %v", err)
}
respB.Body.Close()
if respB.StatusCode != http.StatusUnauthorized {
t.Fatalf("cross-node replay to node B: status %d, want 401 (replayed nonce must be rejected)", respB.StatusCode)
}
}
// TestAttack0008_N3_StandaloneKeepsLocalCache is the edge: a NON-clustered node
// must NOT require JetStream — wireReplicatedNonces is a no-op and the node keeps
// its in-memory cache, which still rejects a same-node replay (the single-node
// guarantee is unchanged). This proves the fix does not add a JetStream
// dependency to standalone deployments.
func TestAttack0008_N3_StandaloneKeepsLocalCache(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
alice, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleAdmin); err != nil {
t.Fatalf("add alice: %v", err)
}
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
// Standalone: clustered=false, js=nil. Must succeed (no JetStream needed).
if err := wireReplicatedNonces(srv, nil, false /*clustered*/, 1); err != nil {
t.Fatalf("standalone wireReplicatedNonces must be a no-op, got: %v", err)
}
node := httptest.NewServer(srv)
t.Cleanup(node.Close)
ts := time.Now().Unix()
nonce := randNonce(t)
path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms"
resp1, err := http.DefaultClient.Do(signed008(t, node.URL, "GET", path, nil, alice, ts, nonce))
if err != nil {
t.Fatalf("do 1: %v", err)
}
resp1.Body.Close()
if resp1.StatusCode != http.StatusOK {
t.Fatalf("first use: status %d, want 200", resp1.StatusCode)
}
// Same-node replay is still rejected by the in-memory cache.
resp2, err := http.DefaultClient.Do(signed008(t, node.URL, "GET", path, nil, alice, ts, nonce))
if err != nil {
t.Fatalf("do 2: %v", err)
}
resp2.Body.Close()
if resp2.StatusCode != http.StatusUnauthorized {
t.Fatalf("same-node replay: status %d, want 401", resp2.StatusCode)
}
}
// TestAttack0008_N3_ClusteredRequiresJetStream proves the hard rule: a clustered
// node with NO JetStream available refuses (error), so the binary fails fast
// instead of silently running with a per-process cache.
func TestAttack0008_N3_ClusteredRequiresJetStream(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
if err := wireReplicatedNonces(srv, nil, true /*clustered*/, 1); err == nil {
t.Fatalf("clustered node with no JetStream must fail, got nil")
}
}
+88 -1
View File
@@ -3,6 +3,8 @@ package main
import (
"fmt"
"net"
"net/url"
"os"
"strings"
"github.com/enmanuel/unibus/pkg/membership"
@@ -21,6 +23,74 @@ func splitRoutes(csv string) []string {
return out
}
// resolveClusterPass resolves the cluster route secret WITHOUT leaking it through
// argv (audit 0008 N1-low: --cluster-pass in argv is visible in ps/journald).
// Precedence: --cluster-pass-file (read + trim the file), then the env var
// UNIBUS_CLUSTER_PASS, then the legacy --cluster-pass flag (argv-visible, kept for
// dev/compat). env is injected (os.Getenv result) so the function stays testable.
// It returns the secret and a short source label for logging (never the secret).
func resolveClusterPass(passFlag, passFile, env string) (secret, source string, err error) {
if passFile != "" {
b, rerr := os.ReadFile(passFile)
if rerr != nil {
return "", "", fmt.Errorf("read --cluster-pass-file %q: %w", passFile, rerr)
}
return strings.TrimSpace(string(b)), "file", nil
}
if env != "" {
return env, "env", nil
}
if passFlag != "" {
return passFlag, "flag", nil
}
return "", "none", nil
}
// injectRouteCreds rewrites each route URL that carries NO userinfo to embed
// user:pass, so the cluster secret is supplied once (via file/env) instead of
// repeated in every --routes argv entry where ps/journald would expose it. A route
// that already carries userinfo is left untouched (operator override). With an
// empty user it is a no-op. A malformed route URL is an error (configuration bug)
// rather than a silently dropped peer.
func injectRouteCreds(routes []string, user, pass string) ([]string, error) {
if user == "" {
return routes, nil
}
out := make([]string, 0, len(routes))
for _, r := range routes {
u, err := url.Parse(r)
if err != nil {
return nil, fmt.Errorf("parse route %q: %w", r, err)
}
if u.User == nil {
u.User = url.UserPassword(user, pass)
}
out = append(out, u.String())
}
return out, nil
}
// isLoopbackURL reports whether a NATS url targets this host only (loopback). Used
// to guard migrate-to-kv (audit 0008 N6): pushing the allowlist to a REMOTE NATS
// without TLS would send handles/roles/sign-pubs in cleartext, so a remote target
// must be TLS-pinned (--ca). A url we cannot classify is treated as NON-loopback
// (conservative: it then requires --ca).
func isLoopbackURL(natsURL string) bool {
u, err := url.Parse(natsURL)
if err != nil {
return false
}
host := u.Hostname()
switch host {
case "localhost":
return true
case "":
return false
}
ip := net.ParseIP(host)
return ip != nil && ip.IsLoopback()
}
// isLoopbackBind reports whether the --bind value keeps the service reachable
// only from this host. An empty bind means "all interfaces" (public), and a
// hostname we cannot resolve to a loopback literal is treated as public — the
@@ -83,7 +153,17 @@ func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey s
// The three route-TLS paths are all-or-nothing (mutual TLS needs the node cert,
// its key, and the CA together), independent of the bind, so a partial TLS
// config never silently degrades to plaintext routes.
func validateClusterConfig(clusterName, bind, user, pass, rtCert, rtKey, rtCA string) error {
//
// Homogeneous posture (issue 0006d, audit 0008 N1): a cluster is only as secure
// as its weakest node — the data plane forwards every subject between nodes, so a
// single node running without enforced auth lets an unauthenticated peer
// Subscribe(">") on it and harvest the traffic forwarded from the ACL'd nodes.
// This node therefore REFUSES to join a cluster unless it runs --bus-auth enforce,
// regardless of bind: a clustered node is a production node, and there is no safe
// "dev cluster without auth". (A peer running a tampered binary is out of this
// node's control; /healthz exposes each node's posture so a monitor can detect
// one that is not enforce+ACL — see Server.Posture.)
func validateClusterConfig(clusterName, bind, user, pass, rtCert, rtKey, rtCA string, mode membership.AuthMode) error {
rtAny := rtCert != "" || rtKey != "" || rtCA != ""
rtAll := rtCert != "" && rtKey != "" && rtCA != ""
if rtAny && !rtAll {
@@ -93,6 +173,13 @@ func validateClusterConfig(clusterName, bind, user, pass, rtCert, rtKey, rtCA st
if clusterName == "" {
return nil // standalone: no route layer to secure
}
// A clustered node MUST enforce auth (homogeneous posture). Checked before the
// loopback shortcut so even a loopback cluster cannot form without enforce.
if mode != membership.AuthEnforce {
return fmt.Errorf(
"refusing to start: cluster %q requires --bus-auth enforce; a cluster node without enforced auth+ACL lets an unauthenticated peer harvest the traffic forwarded from the other nodes (audit 0008 N1) — every node must run the same enforce+ACL+TLS posture",
clusterName)
}
if isLoopbackBind(bind) {
return nil // loopback cluster is dev-only and unreachable from outside
}
+44 -19
View File
@@ -108,31 +108,40 @@ func TestBootConfigPolicy(t *testing.T) {
// route-TLS flags are all-or-nothing regardless of bind.
func TestClusterConfigPolicy(t *testing.T) {
const c, k, ca = "node.crt", "node.key", "ca.crt"
en := membership.AuthEnforce
off := membership.AuthOff
soft := membership.AuthSoft
cases := []struct {
name string
clusterName, bind string
user, pass string
rtCert, rtKey, rtCA string
wantErr bool
name string
clusterName, bind string
user, pass string
rtCert, rtKey, rtCA string
mode membership.AuthMode
wantErr bool
}{
// Standalone (no cluster name) is always allowed, even on a public bind.
{"standalone-public", "", "0.0.0.0", "", "", "", "", "", false},
// Loopback dev cluster: unguarded (unreachable from outside).
{"loopback-cluster-bare", "unibus", "127.0.0.1", "", "", "", "", "", false},
// Golden: full public HA config.
{"public-full", "unibus", "0.0.0.0", "u", "p", c, k, ca, false},
// Error: public cluster without a route secret.
{"public-no-secret", "unibus", "0.0.0.0", "", "", c, k, ca, true},
{"public-half-secret", "unibus", "0.0.0.0", "u", "", c, k, ca, true},
// Standalone (no cluster name) is always allowed, even on a public bind and
// without enforce — the cluster posture rule does not apply to a single node.
{"standalone-public-off", "", "0.0.0.0", "", "", "", "", "", off, false},
// Loopback dev cluster WITH enforce: allowed (unreachable from outside).
{"loopback-cluster-enforce", "unibus", "127.0.0.1", "", "", "", "", "", en, false},
// Golden: full public HA config under enforce.
{"public-full-enforce", "unibus", "0.0.0.0", "u", "p", c, k, ca, en, false},
// N1 (audit 0008): a clustered node WITHOUT enforce is refused — even on
// loopback — so no weak node can join the cluster.
{"cluster-off-refused", "unibus", "127.0.0.1", "", "", "", "", "", off, true},
{"cluster-soft-refused", "unibus", "0.0.0.0", "u", "p", c, k, ca, soft, true},
// Error: public cluster without a route secret (enforce on, fails on secret).
{"public-no-secret", "unibus", "0.0.0.0", "", "", c, k, ca, en, true},
{"public-half-secret", "unibus", "0.0.0.0", "u", "", c, k, ca, en, true},
// Error: public cluster without mutual route TLS.
{"public-no-tls", "unibus", "10.0.0.1", "u", "p", "", "", "", true},
// Error: partial route-TLS flags trip regardless of bind.
{"loopback-partial-tls", "unibus", "127.0.0.1", "", "", c, "", "", true},
{"standalone-partial-tls", "", "127.0.0.1", "", "", c, k, "", true},
{"public-no-tls", "unibus", "10.0.0.1", "u", "p", "", "", "", en, true},
// Error: partial route-TLS flags trip regardless of bind/mode.
{"loopback-partial-tls", "unibus", "127.0.0.1", "", "", c, "", "", en, true},
{"standalone-partial-tls", "", "127.0.0.1", "", "", c, k, "", off, true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := validateClusterConfig(tc.clusterName, tc.bind, tc.user, tc.pass, tc.rtCert, tc.rtKey, tc.rtCA)
err := validateClusterConfig(tc.clusterName, tc.bind, tc.user, tc.pass, tc.rtCert, tc.rtKey, tc.rtCA, tc.mode)
if tc.wantErr && err == nil {
t.Fatalf("cluster config %+v should be refused", tc)
}
@@ -143,6 +152,22 @@ func TestClusterConfigPolicy(t *testing.T) {
}
}
// TestAttack0008_N1 is the regression for audit 0008 N1 scenario 2: a node
// configured to join a cluster while NOT enforcing auth (the weak node that lets
// an unauthenticated peer harvest the cluster's forwarded traffic) must be refused
// at startup. The homogeneous-posture rule makes this binary unable to BE that
// weak node.
func TestAttack0008_N1(t *testing.T) {
// Weak node: clustered but --bus-auth off -> refused.
if err := validateClusterConfig("unibus", "0.0.0.0", "u", "p", "n.crt", "n.key", "ca.crt", membership.AuthOff); err == nil {
t.Fatalf("a clustered node without enforce must be refused (audit 0008 N1)")
}
// Same node WITH enforce + full route security -> allowed.
if err := validateClusterConfig("unibus", "0.0.0.0", "u", "p", "n.crt", "n.key", "ca.crt", membership.AuthEnforce); err != nil {
t.Fatalf("a clustered enforce node with full route security must be allowed, got: %v", err)
}
}
func TestSplitRoutes(t *testing.T) {
cases := []struct {
in string
+84
View File
@@ -0,0 +1,84 @@
package main
import (
"fmt"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
server "github.com/nats-io/nats-server/v2/server"
)
// connectInternalJS opens a privileged JetStream client from membershipd to its
// OWN embedded NATS server. This is the resolution of the "bootstrap cycle"
// (issue 0006a/c): the service needs JetStream to create the replicated nonce
// bucket and the control-plane KV, but under enforce the data plane only accepts
// allowlisted clients confined to their rooms. The connection therefore
// authenticates with the process's ephemeral internal identity — the identity the
// authenticator was built to recognize (NewNkeyAuthenticatorACLInternal) and
// grant full permissions — without ever appearing in the user allowlist.
//
// It uses the in-process transport (nats.InProcessServer), a Go pipe inside the
// process, so it bypasses TLS entirely: no CA wiring is needed for this
// self-connection even when the public data plane is TLS-only. useNkey mirrors
// whether the embedded server enforces auth: under enforce the internal identity
// presents its nkey; without enforce the server accepts an unauthenticated
// in-process client and the nkey is omitted.
//
// The caller owns the returned connection and must Close it on shutdown (after
// the JetStream context is no longer used).
func connectInternalJS(ns *server.Server, internalID cs.Identity, useNkey bool) (*nats.Conn, jetstream.JetStream, error) {
opts := []nats.Option{
nats.Name("membershipd-internal"),
nats.InProcessServer(ns),
}
if useNkey {
pub, sign, err := busauth.ClientNkey(internalID.SignPriv)
if err != nil {
return nil, nil, fmt.Errorf("internal nkey: %w", err)
}
opts = append(opts, nats.Nkey(pub, sign))
}
// The URL is ignored for an in-process connection; the InProcessServer option
// supplies the transport.
nc, err := nats.Connect("", opts...)
if err != nil {
return nil, nil, fmt.Errorf("connect internal nats: %w", err)
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return nil, nil, fmt.Errorf("internal jetstream: %w", err)
}
return nc, js, nil
}
// connectExternalJS opens a JetStream client to an EXTERNAL NATS the operator
// runs (membershipd started with --nats-url). Unlike the embedded path there is
// no in-process transport and no internal identity: the external server enforces
// its own auth, so membershipd connects as a plain client (optionally TLS-pinned
// to the bus CA). It is best-effort and intended for an operator-managed cluster;
// the standard unibus deploy uses the embedded server (connectInternalJS).
func connectExternalJS(natsURL, caPath string) (*nats.Conn, jetstream.JetStream, error) {
opts := []nats.Option{nats.Name("membershipd-internal")}
if caPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
if err != nil {
return nil, nil, fmt.Errorf("load CA %q: %w", caPath, err)
}
opts = append(opts, nats.Secure(tlsCfg))
}
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
return nil, nil, fmt.Errorf("connect external nats %q: %w", natsURL, err)
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return nil, nil, fmt.Errorf("external jetstream: %w", err)
}
return nc, js, nil
}
+119
View File
@@ -0,0 +1,119 @@
package main
// Bootstrap test for issue 0006a/c: under enforce, membershipd must still reach
// JetStream on its OWN embedded server to create the nonce/KV buckets. It does so
// with an ephemeral internal identity the authenticator grants full permissions
// (NewNkeyAuthenticatorACLInternal). These tests prove that privileged
// self-connection works AND that no other identity can claim it.
import (
"context"
"encoding/hex"
"net"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
func icFreePort(t *testing.T) int {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("free port: %v", err)
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
// TestInternalConnPrivilegedUnderEnforce: with an enforce authenticator that
// authorizes NO bus user, the internal identity still connects in-process and has
// full permissions — it creates a KV bucket and round-trips a value. This is the
// resolution of the bootstrap cycle the audit flagged as the reason the KV store
// was never wired.
func TestInternalConnPrivilegedUnderEnforce(t *testing.T) {
internalID, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("internal identity: %v", err)
}
internalPubHex := hex.EncodeToString(internalID.SignPub)
// Authenticator: no bus user is authorized; only the internal identity passes.
auth := busauth.NewNkeyAuthenticatorACLInternal(
func(string) bool { return false },
busauth.PermissionsFromSubjects(func(string) ([]string, error) { return []string{"_INBOX.>"}, nil }),
internalPubHex,
)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: icFreePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
nc, js, err := connectInternalJS(ns, internalID, true /*useNkey*/)
if err != nil {
t.Fatalf("connectInternalJS: %v", err)
}
t.Cleanup(nc.Close)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KV_UNIBUS_test", Replicas: 1})
if err != nil {
t.Fatalf("internal conn could not create KV bucket (full perms expected): %v", err)
}
if _, err := kv.Put(ctx, "k", []byte("v")); err != nil {
t.Fatalf("kv put: %v", err)
}
e, err := kv.Get(ctx, "k")
if err != nil || string(e.Value()) != "v" {
t.Fatalf("kv get: val=%q err=%v", e, err)
}
}
// TestInternalConnOutsiderRejected: an identity that is neither the internal one
// nor an allowlisted bus user cannot connect — proving the internal bypass is
// scoped to the exact internal key, not a blanket hole.
func TestInternalConnOutsiderRejected(t *testing.T) {
internalID, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("internal identity: %v", err)
}
auth := busauth.NewNkeyAuthenticatorACLInternal(
func(string) bool { return false },
busauth.PermissionsFromSubjects(func(string) ([]string, error) { return []string{"_INBOX.>"}, nil }),
hex.EncodeToString(internalID.SignPub),
)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: icFreePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
outsider, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("outsider identity: %v", err)
}
pub, sign, err := busauth.ClientNkey(outsider.SignPriv)
if err != nil {
t.Fatalf("outsider nkey: %v", err)
}
conn, err := nats.Connect(ns.ClientURL(),
nats.Nkey(pub, sign),
nats.MaxReconnects(0),
nats.Timeout(2*time.Second),
)
if err == nil {
conn.Close()
t.Fatalf("outsider (unauthorized, non-internal) must be rejected, but connected")
}
}
+154
View File
@@ -0,0 +1,154 @@
package main
// Wiring tests for issue 0006c: --store kv selects the replicated JetStream KV
// control plane, the authenticator serves from it through the storeHolder, and a
// new node sees state created by another (the divergence that per-node SQLite
// caused — audit 0008 N5 — is gone). Branch-by-abstraction is verified elsewhere
// (the SQLite default path is the unchanged baseline covered by the existing
// suite).
import (
"encoding/hex"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// TestKVStoreBootstrapUnderEnforce drives the exact decentralized boot the binary
// performs: build the authenticator over an empty holder, start NATS, open the
// privileged internal connection, open the KV store, publish it into the holder,
// then a real bus user (seeded into the KV store) authenticates over nkey. This
// proves the bootstrap cycle is broken correctly — the KV-backed control plane
// authorizes live clients under enforce.
func TestKVStoreBootstrapUnderEnforce(t *testing.T) {
internalID, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("internal identity: %v", err)
}
holder := &storeHolder{}
auth := busauth.NewNkeyAuthenticatorACLInternal(
holder.IsAuthorized,
busauth.PermissionsFromSubjects(holder.subjectACL),
hex.EncodeToString(internalID.SignPub),
)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
// Privileged internal connection opens the KV store while the holder still
// denies every normal client.
intNC, js, err := connectInternalJS(ns, internalID, true)
if err != nil {
t.Fatalf("connectInternalJS: %v", err)
}
t.Cleanup(intNC.Close)
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("open kv store: %v", err)
}
holder.set(kvStore)
// Seed a bus user into the KV control plane.
alice, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("alice: %v", err)
}
if err := kvStore.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleMember); err != nil {
t.Fatalf("seed alice: %v", err)
}
// alice authenticates over nkey — authorized via the KV store through the holder.
pub, sign, err := busauth.ClientNkey(alice.SignPriv)
if err != nil {
t.Fatalf("alice nkey: %v", err)
}
aliceNC, err := nats.Connect(ns.ClientURL(), nats.Nkey(pub, sign), nats.MaxReconnects(0), nats.Timeout(2*time.Second))
if err != nil {
t.Fatalf("alice (KV-authorized) must connect under enforce: %v", err)
}
aliceNC.Close()
// An outsider not in the KV store is denied (fail closed).
outsider, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("outsider: %v", err)
}
opub, osign, err := busauth.ClientNkey(outsider.SignPriv)
if err != nil {
t.Fatalf("outsider nkey: %v", err)
}
if oc, err := nats.Connect(ns.ClientURL(), nats.Nkey(opub, osign), nats.MaxReconnects(0), nats.Timeout(2*time.Second)); err == nil {
oc.Close()
t.Fatalf("an outsider absent from the KV store must be rejected")
}
}
// TestKVStoreDecentralizedConsistency: a room/user created via one node's KV store
// is immediately visible to another node's KV store over the same JetStream — the
// shared, replicated control plane that ends the per-node SQLite divergence.
func TestKVStoreDecentralizedConsistency(t *testing.T) {
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t),
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
open := func() membership.Store {
nc, err := nats.Connect(ns.ClientURL())
if err != nil {
t.Fatalf("connect: %v", err)
}
t.Cleanup(nc.Close)
js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("jetstream: %v", err)
}
st, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("open kv: %v", err)
}
return st
}
nodeA := open()
nodeB := open()
owner, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("owner: %v", err)
}
ownerPub := hex.EncodeToString(owner.SignPub)
if err := nodeA.AddUser(ownerPub, "owner", membership.RoleAdmin); err != nil {
t.Fatalf("nodeA add user: %v", err)
}
if err := nodeA.CreateRoom(
membership.RoomInfo{RoomID: "ROOMX", Subject: "room.shared.x", OwnerEndpoint: "owner-ep"},
owner.SignPub, owner.KexPub, nil,
); err != nil {
t.Fatalf("nodeA create room: %v", err)
}
// nodeB (a different connection, same buckets) sees both immediately.
if !nodeB.IsAuthorized(ownerPub) {
t.Fatalf("nodeB must see the user created on nodeA (decentralized state divergence)")
}
got, err := nodeB.GetRoom("ROOMX")
if err != nil {
t.Fatalf("nodeB must see the room created on nodeA: %v", err)
}
if got.Subject != "room.shared.x" {
t.Fatalf("nodeB read wrong room subject: %q", got.Subject)
}
}
+75
View File
@@ -0,0 +1,75 @@
package main
import (
"os"
"path/filepath"
"strings"
"testing"
)
// TestResolveClusterPass verifies the secret resolution precedence
// (file > env > flag) that keeps the cluster password out of argv (issue 0006f).
func TestResolveClusterPass(t *testing.T) {
// file wins over env and flag, and is trimmed.
f := filepath.Join(t.TempDir(), "pass")
if err := os.WriteFile(f, []byte("filesecret\n"), 0o600); err != nil {
t.Fatalf("write: %v", err)
}
if got, src, err := resolveClusterPass("flagsecret", f, "envsecret"); err != nil || got != "filesecret" || src != "file" {
t.Fatalf("file precedence: got %q src %q err %v", got, src, err)
}
// env wins over flag when no file.
if got, src, err := resolveClusterPass("flagsecret", "", "envsecret"); err != nil || got != "envsecret" || src != "env" {
t.Fatalf("env precedence: got %q src %q err %v", got, src, err)
}
// flag is the last resort.
if got, src, err := resolveClusterPass("flagsecret", "", ""); err != nil || got != "flagsecret" || src != "flag" {
t.Fatalf("flag fallback: got %q src %q err %v", got, src, err)
}
// none set.
if got, src, err := resolveClusterPass("", "", ""); err != nil || got != "" || src != "none" {
t.Fatalf("none: got %q src %q err %v", got, src, err)
}
// missing file is an error.
if _, _, err := resolveClusterPass("", filepath.Join(t.TempDir(), "nope"), ""); err == nil {
t.Fatalf("missing file must error")
}
}
// TestInjectRouteCreds verifies the secret is injected only into routes that omit
// userinfo, so --routes argv need not carry the password (issue 0006f).
func TestInjectRouteCreds(t *testing.T) {
in := []string{"nats://10.0.0.2:6250", "nats://override:pw@10.0.0.3:6250"}
out, err := injectRouteCreds(in, "user", "secret")
if err != nil {
t.Fatalf("inject: %v", err)
}
if !strings.Contains(out[0], "user:secret@10.0.0.2:6250") {
t.Fatalf("creds not injected into bare route: %q", out[0])
}
if !strings.Contains(out[1], "override:pw@10.0.0.3:6250") {
t.Fatalf("existing userinfo must be preserved: %q", out[1])
}
// empty user is a no-op.
noop, err := injectRouteCreds(in, "", "")
if err != nil || noop[0] != in[0] {
t.Fatalf("empty user must be a no-op: %v %q", err, noop[0])
}
}
// TestIsLoopbackURL guards migrate-to-kv against pushing the allowlist cleartext
// to a remote NATS (issue 0006f, audit 0008 N6).
func TestIsLoopbackURL(t *testing.T) {
loop := []string{"nats://127.0.0.1:4250", "nats://localhost:4250", "nats://[::1]:4250"}
for _, u := range loop {
if !isLoopbackURL(u) {
t.Fatalf("%q should be loopback", u)
}
}
remote := []string{"nats://10.0.0.2:4250", "nats://bus.example.com:4250", "::not-a-url"}
for _, u := range remote {
if isLoopbackURL(u) {
t.Fatalf("%q should NOT be loopback", u)
}
}
}
+153 -14
View File
@@ -7,6 +7,7 @@ package main
import (
"context"
"crypto/tls"
"encoding/hex"
"flag"
"log"
"net/http"
@@ -15,6 +16,10 @@ import (
"syscall"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
server "github.com/nats-io/nats-server/v2/server"
"github.com/enmanuel/unibus/pkg/blobstore"
@@ -58,10 +63,26 @@ func main() {
clusterPort = flag.Int("cluster-port", 6250, "route listener port for server-to-server cluster traffic")
routesCSV = flag.String("routes", "", "comma-separated nats-route URLs of the OTHER nodes, e.g. nats://user:pass@10.0.0.2:6250")
clusterUser = flag.String("cluster-user", "", "shared route secret username (gates the route listener)")
clusterPass = flag.String("cluster-pass", "", "shared route secret password")
clusterPass = flag.String("cluster-pass", "", "shared route secret password (argv-visible — prefer --cluster-pass-file or UNIBUS_CLUSTER_PASS)")
// Secret out of argv (issue 0006f, audit 0008 N1-low): a password in
// --cluster-pass / --routes is visible in ps/journald. Prefer a file or the
// UNIBUS_CLUSTER_PASS env var; routes may then omit userinfo and the secret
// is injected from here.
clusterPassFile = flag.String("cluster-pass-file", "", "path to a file holding the cluster route password (preferred over --cluster-pass; keeps the secret out of argv)")
routeTLSCert = flag.String("route-tls-cert", "", "this node's route certificate (CA-signed); enables mutual route TLS with --route-tls-key/--route-tls-ca")
routeTLSKey = flag.String("route-tls-key", "", "this node's route private key")
routeTLSCA = flag.String("route-tls-ca", "", "bus CA that signs every node's route certificate (deploy/tls/ca.crt)")
// Replicated control plane (issue 0006a/c): the JetStream replication factor
// for the shared nonce bucket (and, with --store kv, the control-plane KV).
// 1 for a 1-2 node rollout, 3 for real HA quorum (raise in place with
// `nats stream update --replicas 3` when the third node joins).
kvReplicas = flag.Int("kv-replicas", 1, "JetStream replication factor for the shared nonce/KV buckets (1..3)")
caFile = flag.String("ca", "", "bus CA cert; only used to pin TLS on the internal JetStream connection to an EXTERNAL --nats-url (the embedded server uses an in-process connection that needs no CA)")
// Control-plane store backend (issue 0006c, feature flag decentralized):
// "sqlite" (default) keeps the local single-node SQLite control plane;
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
// in the cluster serves the same state.
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
)
flag.Parse()
@@ -69,6 +90,17 @@ func main() {
if err != nil {
log.Fatalf("%v", err)
}
if *storeBackend != "sqlite" && *storeBackend != "kv" {
log.Fatalf("--store must be \"sqlite\" or \"kv\", got %q", *storeBackend)
}
// Resolve the cluster route secret out of argv (file/env preferred). The
// resolved value (not *clusterPass) is what guards the route layer and is
// injected into peer route URLs below.
clusterPassResolved, passSource, err := resolveClusterPass(*clusterPass, *clusterPassFile, os.Getenv("UNIBUS_CLUSTER_PASS"))
if err != nil {
log.Fatalf("%v", err)
}
// Fail-open guard (audit H2): a non-loopback bind, or any TLS flag, demands
// --bus-auth enforce. This makes an insecure public startup impossible rather
@@ -78,21 +110,62 @@ func main() {
}
// Cluster route guard (issue 0003a): a public cluster needs a route secret
// and mutual route TLS, and the route-TLS flags are all-or-nothing.
if err := validateClusterConfig(*clusterName, *bind, *clusterUser, *clusterPass, *routeTLSCert, *routeTLSKey, *routeTLSCA); err != nil {
if err := validateClusterConfig(*clusterName, *bind, *clusterUser, clusterPassResolved, *routeTLSCert, *routeTLSKey, *routeTLSCA, authMode); err != nil {
log.Fatalf("%v", err)
}
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[membershipd] ")
// Control plane store first: the NATS authenticator consults IsAuthorized, so
// the store must exist before the embedded server starts.
store, err := membership.Open(*dbPath)
if err != nil {
log.Fatalf("open membership store: %v", err)
// A clustered node shares its control plane with peers, so it needs a JetStream
// client to manage the replicated nonce bucket (issue 0006a). --store kv (issue
// 0006c) also needs JetStream, for the control-plane KV itself. A standalone
// single-node SQLite deployment needs none of this and keeps the in-process,
// in-memory behavior unchanged.
clustered := *clusterName != ""
decentralized := *storeBackend == "kv"
needJS := clustered || decentralized
enforce := authMode == membership.AuthEnforce
// Internal service identity (issue 0006a): when the embedded data plane enforces
// auth, membershipd must still connect to its OWN server to manage JetStream.
// It does so with this ephemeral identity, which the authenticator is built to
// recognize and grant full permissions (it never enters the user allowlist). It
// is only generated when actually needed (JetStream required AND enforce on AND
// the server is embedded), so a standalone or non-enforce node is unchanged.
var internalID cs.Identity
var internalPubHex string
if needJS && enforce && *natsURL == "" {
internalID, err = cs.GenerateIdentity()
if err != nil {
log.Fatalf("generate internal identity: %v", err)
}
internalPubHex = hex.EncodeToString(internalID.SignPub)
}
defer store.Close()
log.Printf("membership store: %s", *dbPath)
// The authenticator consults the store through a holder so it can be built
// before the store exists: with --store kv the JetStream KV store opens only
// after NATS is up (the bootstrap cycle). In the default SQLite path the store
// is opened and set into the holder right here, before the server starts, so
// behavior is identical to the pre-0006c baseline. `store` is the final store
// used by the HTTP server (set below for the KV path).
holder := &storeHolder{}
var store membership.Store
if !decentralized {
store, err = membership.Open(*dbPath)
if err != nil {
log.Fatalf("open membership store: %v", err)
}
holder.set(store)
log.Printf("membership store: sqlite %s", *dbPath)
}
// Close whichever store ends up final (SQLite closes its file; the JetStream KV
// store's Close is a no-op — its NATS connection is closed separately).
defer func() {
if store != nil {
store.Close()
}
}()
blobs, err := blobstore.New(*storeDir)
if err != nil {
@@ -118,14 +191,21 @@ func main() {
}
// Cluster (issue 0003a): with a cluster name, join the route layer for HA.
if *clusterName != "" {
// Inject the resolved secret into peer route URLs that omit userinfo, so
// the password need not appear in --routes argv (issue 0006f).
routes, rerr := injectRouteCreds(splitRoutes(*routesCSV), *clusterUser, clusterPassResolved)
if rerr != nil {
log.Fatalf("%v", rerr)
}
cc := &embeddednats.ClusterConfig{
Name: *clusterName,
Host: *bind,
Port: *clusterPort,
Routes: splitRoutes(*routesCSV),
Routes: routes,
Username: *clusterUser,
Password: *clusterPass,
Password: clusterPassResolved,
}
log.Printf("cluster route secret source: %s", passSource)
if *routeTLSCert != "" {
rtls, err := busauth.RouteTLSConfig(*routeTLSCert, *routeTLSKey, *routeTLSCA)
if err != nil {
@@ -145,9 +225,10 @@ func main() {
// Subscribe(">") and harvest every room's subject and JetStream activity.
// NATS freezes permissions at connect time, so a peer that joins a room
// after connecting must client.RefreshSession to gain that room's subject.
cfg.Auth = busauth.NewNkeyAuthenticatorACL(
store.IsAuthorized,
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
cfg.Auth = busauth.NewNkeyAuthenticatorACLInternal(
holder.IsAuthorized,
busauth.PermissionsFromSubjects(holder.subjectACL),
internalPubHex,
)
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
}
@@ -172,6 +253,38 @@ func main() {
log.Printf("using external NATS: %s", natsClientURL)
}
// JetStream client + decentralized store (issue 0006a/c). needJS is set for a
// clustered node (shared nonce bucket) and for --store kv (the KV control
// plane). Open the privileged JetStream client first (in-process for the
// embedded server, a plain client for external NATS), then — for --store kv —
// open the replicated KV store and publish it into the holder so the
// authenticator and HTTP server serve from it. The privileged connection is the
// only client that can connect in this window (the holder still denies everyone
// else; the internal identity bypasses the store).
var js jetstream.JetStream
if needJS {
var internalNC *nats.Conn
if *natsURL == "" {
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
} else {
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
}
if err != nil {
log.Fatalf("internal JetStream connection (required by --cluster-name/--store kv): %v", err)
}
defer internalNC.Close()
if decentralized {
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: *kvReplicas})
if err != nil {
log.Fatalf("open decentralized control-plane KV store: %v", err)
}
store = kvStore
holder.set(store)
log.Printf("membership store: jetstream KV (replicas=%d)", *kvReplicas)
}
}
srv := membership.NewServer(store, blobs, authMode)
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
// has no per-subject ACL, so cleartext content would be readable by any
@@ -181,6 +294,32 @@ func main() {
srv.RequireEncryptedRooms = true
log.Printf("cleartext rooms: DISABLED (public bind requires end-to-end encryption)")
}
// Publish this node's posture on /healthz so a monitor (or a peer) can detect a
// cluster member not running the homogeneous enforce+ACL+TLS posture (audit
// 0008 N1). enforce implies the per-subject ACL in this binary (they are wired
// together above).
srv.Posture = membership.Posture{
Enforce: enforce,
ACL: enforce,
TLS: *tlsCert != "",
Cluster: clustered,
Store: *storeBackend,
}
// Replicated anti-replay (issue 0006a, audit 0008 N3): a clustered node MUST
// share its nonce store across the cluster, or a request accepted on one node
// can be replayed to another. HARD requirement: if the bucket cannot be created
// the node refuses to start rather than run with a per-process cache that leaves
// the replay hole open.
if needJS {
if err := wireReplicatedNonces(srv, js, clustered, *kvReplicas); err != nil {
log.Fatalf("%v", err)
}
if clustered {
log.Printf("anti-replay: replicated nonce bucket \"KV_UNIBUS_nonces\" (replicas=%d) — cluster-safe", *kvReplicas)
}
}
log.Printf("control-plane auth: %s", authMode)
addr := *bind + ":" + *httpPort
httpSrv := &http.Server{
+8
View File
@@ -33,6 +33,14 @@ func runMigrateCLI(args []string) {
fmt.Fprintln(os.Stderr, "membershipd migrate-to-kv: --nats-url is required (the cluster to write the KV buckets into)")
os.Exit(2)
}
// Confidentiality guard (issue 0006f, audit 0008 N6): the migration writes the
// allowlist (handles, roles, signing pubkeys) into the KV. Against a REMOTE NATS
// without TLS that metadata would travel in cleartext, so a remote target MUST
// be TLS-pinned with --ca. A loopback target is local-only and exempt.
if !isLoopbackURL(*natsURL) && *ca == "" {
fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: refusing to migrate to remote %q without --ca; the allowlist (handles/roles/sign pubs) would travel in cleartext — pin TLS with --ca, or run against a loopback nats-url\n", *natsURL)
os.Exit(2)
}
// Back up the SQLite database first so a botched migration can be undone.
var backupPath string
+60
View File
@@ -0,0 +1,60 @@
package main
import (
"fmt"
"sync"
"github.com/enmanuel/unibus/pkg/membership"
)
// storeHolder is a concurrency-safe slot for the control-plane store, used to
// break the decentralized bootstrap cycle (issue 0006c): the NATS authenticator
// must be built BEFORE the embedded server starts, but the JetStream KV store can
// only be opened AFTER NATS is up (it needs a JetStream client). The authenticator
// therefore consults the holder instead of a concrete store.
//
// Fail-closed by construction: until the store is set, IsAuthorized denies and
// SubjectACL errors, so any client connecting in the startup window is rejected.
// The only connection expected in that window is membershipd's own internal
// service identity, which the authenticator recognizes by key and lets through
// without consulting the store at all. In the SQLite (default) path the store is
// set before StartServer, so the window does not exist and behavior is identical
// to the pre-0006c baseline.
type storeHolder struct {
mu sync.RWMutex
s membership.Store
}
func (h *storeHolder) set(s membership.Store) {
h.mu.Lock()
h.s = s
h.mu.Unlock()
}
func (h *storeHolder) get() membership.Store {
h.mu.RLock()
defer h.mu.RUnlock()
return h.s
}
// IsAuthorized reports whether signPubHex is an active bus user, denying while the
// store is not yet set (fail closed). It is the predicate the nkey authenticator
// uses for every connecting client.
func (h *storeHolder) IsAuthorized(signPubHex string) bool {
s := h.get()
if s == nil {
return false
}
return s.IsAuthorized(signPubHex)
}
// subjectACL derives the per-subject permissions for signPubHex via the live
// store, erroring (so the caller fails closed and denies the connection) while the
// store is not yet set.
func (h *storeHolder) subjectACL(signPubHex string) ([]string, error) {
s := h.get()
if s == nil {
return nil, fmt.Errorf("control-plane store not ready")
}
return membership.SubjectACLFor(s)(signPubHex)
}
+51
View File
@@ -0,0 +1,51 @@
package main
import (
"encoding/hex"
"path/filepath"
"testing"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/membership"
)
// TestStoreHolderFailClosed: an empty holder denies everything (the bootstrap
// window before the store is set), and starts serving once a store is published.
func TestStoreHolderFailClosed(t *testing.T) {
h := &storeHolder{}
// Empty: deny + error (fail closed).
if h.IsAuthorized("anything") {
t.Fatalf("empty holder must deny IsAuthorized")
}
if _, err := h.subjectACL("anything"); err == nil {
t.Fatalf("empty holder must error from subjectACL (fail closed)")
}
// After set: serves from the real store.
store, err := membership.Open(filepath.Join(t.TempDir(), "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
id, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
pub := hex.EncodeToString(id.SignPub)
if err := store.AddUser(pub, "alice", membership.RoleMember); err != nil {
t.Fatalf("add user: %v", err)
}
h.set(store)
if !h.IsAuthorized(pub) {
t.Fatalf("after set, an active user must be authorized")
}
if _, err := h.subjectACL(pub); err != nil {
t.Fatalf("after set, subjectACL must succeed: %v", err)
}
if h.IsAuthorized("deadbeef") {
t.Fatalf("a non-user must not be authorized")
}
}
+40
View File
@@ -0,0 +1,40 @@
package main
import (
"fmt"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go/jetstream"
)
// wireReplicatedNonces applies the cluster anti-replay policy to srv. It is the
// single piece of wiring the binary uses to decide whether a node must share its
// nonce store, extracted so a regression test exercises the EXACT decision the
// running binary makes (issue 0006a, audit 0008 N3).
//
// Policy:
// - A clustered node (clustered == true) MUST use the shared JetStream KV nonce
// bucket. Every node sees the same bucket, so a request accepted on one node
// cannot be replayed to another whose per-process cache never saw the nonce.
// A missing JetStream context, or a failure to create the bucket, is a FATAL
// configuration error returned to the caller — a clustered node running with a
// per-process nonce cache is precisely the replay hole the audit flagged, so
// it must refuse to start rather than serve insecurely.
// - A standalone node (clustered == false) keeps the in-memory cache that
// NewServer installed: there is no second node to replay to, so the shared
// bucket would only add a JetStream dependency for no security gain.
//
// replicas is the nonce bucket's replication factor (R1..R3). Returns nil when no
// action is required (standalone).
func wireReplicatedNonces(srv *membership.Server, js jetstream.JetStream, clustered bool, replicas int) error {
if !clustered {
return nil // standalone: the in-memory nonce cache is sufficient and safe
}
if js == nil {
return fmt.Errorf("clustered node requires JetStream for the shared nonce bucket, but none is available")
}
if err := srv.UseReplicatedNonces(js, replicas); err != nil {
return fmt.Errorf("replicated nonces: %w", err)
}
return nil
}
+7
View File
@@ -47,6 +47,13 @@ func main() {
if err != nil {
log.Fatalf("create room: %v", err)
}
// Membership-change contract (issue 0006e): the bus freezes per-subject
// permissions at connect time, and this room did not exist then. Refresh the
// session so the new room's subject becomes publishable under enforce+ACL. On
// an unsecured/dev bus this is a harmless reconnect.
if err := c.RefreshSession(); err != nil {
log.Fatalf("refresh session after create room: %v", err)
}
log.Printf("room %q -> %s (subject %s, cleartext)", *roomSub, roomID, *roomSub)
stop := make(chan os.Signal, 1)
+31
View File
@@ -65,3 +65,34 @@ curl -fsS http://<host-lan-ip>:8470/healthz
- To run against an external NATS instead of the embedded one, append
`--nats-url nats://<host>:4222` to `ExecStart` and re-run `daemon-reload` +
`restart`.
## Clustering (HA) — see `deploy/cluster/`
The single-node service above is secure on its own. Running unibus as a
multi-node **cluster** has extra hardening rules (issues 0006a0006f); the full
runbook and the generated material live in `deploy/cluster/`. Key points an
operator must know:
- **Homogeneous posture (0006d).** Every node MUST run `--bus-auth enforce` (the
binary refuses to join a cluster otherwise) and present mutual route TLS on a
public bind. `/healthz` publishes each node's `posture` so a monitor can flag a
node that is not `enforce`+`acl`+`tls`.
- **Separate route CA (0006f).** The cluster route layer authenticates *nodes*,
not bus users — sign the route certs with a **dedicated cluster CA**
(`--route-tls-ca`), NOT the client data-plane CA (`--tls-cert`'s CA). Keeping
the two trust roots separate means a client cert can never be presented to the
route port. `deploy/cluster/generate-cluster-certs.sh` builds this CA.
- **Secret out of argv (0006f).** Pass the route password via
`--cluster-pass-file` or the `UNIBUS_CLUSTER_PASS` env var, NOT `--cluster-pass`
or a `nats://user:pass@host` in `--routes` (both are visible in `ps`/journald).
When the secret comes from a file/env, list peers as bare `--routes
nats://<host>:6250` and the binary injects the credentials.
- **`migrate-to-kv` confidentiality (0006f).** The migration writes the allowlist
(handles/roles/sign pubs) into KV. Run it only against a **loopback** nats-url,
or pin TLS with `--ca` for a remote target — otherwise that metadata travels in
cleartext. The binary refuses a remote target without `--ca`.
- **R1 is NOT HA (0006a/N3-DoS).** With `--kv-replicas 1` the control plane
(including the nonce bucket) is a single point of failure: if the node owning
the stream dies, every authenticated request fails closed (auth DoS). Real HA
needs **R3** (quorum 2/3): raise replicas in place with `nats stream update
--replicas 3` once the third node has joined. Do not advertise R1 as HA.
+7
View File
@@ -0,0 +1,7 @@
# Generated TLS material and secrets — NEVER commit (audit 0008: keys/secret).
out/
build/
secrets/
*.key
*.srl
cluster-ca.crt
+181
View File
@@ -0,0 +1,181 @@
# unibus cluster — 3-node deploy runbook (issue 0006g)
This directory holds the material to bring up unibus as a **3-node cluster**
(`magnus` + `homer` + `datardos`) for real HA: with **R3** replication the control
plane (rooms/members/keys/users on JetStream KV + the anti-replay nonce bucket)
survives the loss of any one node (quorum 2/3).
> **The agent that authored this never touched a VPS.** Every step that changes a
> remote host is marked **HUMAN** and is executed by the operator. `deploy-cluster.sh`
> defaults to a dry run.
## Files
| File | What it is |
|---|---|
| `nodes.env` | Topology: cluster name, ports, and the per-node rows (name, ssh host, public IP, WG IP). **HUMAN fills the placeholders.** |
| `generate-cluster-certs.sh` | Mints a **separate cluster route CA** + a route cert per node, and a data-plane server cert per node signed by the **client CA** (`../tls/ca.*`). |
| `membershipd-cluster.service` | One systemd unit, parameterized per node by `/opt/unibus/cluster.env`. enforce + per-subject ACL + TLS + `--store kv`, `Restart=always`. |
| `deploy-cluster.sh` | Cross-builds the linux binary, generates each node's `cluster.env`, and (with `--yes`) rsyncs everything + installs the unit. Staggered start is manual. |
Generated keys/secrets (`out/`, `build/`, `secrets/`) are **gitignored** — they are
secret and never leave the operator's trusted machine except over the secure
rsync channel.
## Topology
| Node | SSH | Public IP | WireGuard IP | Role |
|---|---|---|---|---|
| magnus | `magnus` | `<MAGNUS_PUBLIC_IP>` | `<MAGNUS_WG_IP>` | seed (first up) |
| homer | `homer` | `141.94.69.66` | `<HOMER_WG_IP>` | replica |
| datardos | `dd` | `51.91.100.142` | `<DATARDOS_WG_IP>` (10.21.0.x) | replica |
The route layer (server-to-server) prefers the **WireGuard mesh**
(`ROUTE_NETWORK=wg`); the client data plane and the HTTP control plane are reached
over the public IPs. The route CA is **separate** from the client CA, so a client
cert can never be presented to the route port.
## Prerequisites (HUMAN, once)
1. **Fill `nodes.env`** — replace every `<PLACEHOLDER>` (magnus public IP, all WG
IPs). The scripts refuse to run while any remain.
2. **Client CA exists**`../tls/ca.crt` + `../tls/ca.key`. If not, run
`../tls/generate-certs.sh` on the CA host (om) first. The cluster reuses this CA
for the data plane so existing clients keep trusting the bus.
3. **Mint cluster TLS**:
```bash
./generate-cluster-certs.sh # writes out/<name>/ ; --force to rotate the cluster CA
```
4. **Create the route secret** (out of argv, shared by all nodes):
```bash
mkdir -p secrets && openssl rand -hex 32 > secrets/cluster.pass
```
5. **SSH** to each node's SSH host as `root` works (`ssh magnus true`, `ssh dd true`, ...).
## Stage the nodes
```bash
./deploy-cluster.sh # DRY RUN — prints the full plan, touches nothing
./deploy-cluster.sh --yes # HUMAN: actually rsync + install the unit on all 3 nodes
```
This cross-builds `membershipd` (linux/amd64, `CGO_ENABLED=0`), writes each node's
`cluster.env` (its `NODE_NAME` and the `--routes` to the OTHER two nodes), and
ships the binary, the node's TLS material, the secret, the env file and the unit.
It does **not** start anything.
## Seed the first admin into the KV (HUMAN — loopback bootstrap)
The empty KV control plane has no users, and under `enforce` no external tool can
write the FIRST admin over NATS (it would need to be an admin already — a
chicken-and-egg). The `user` CLI also writes only to a local SQLite file, not the
KV. So the first admin is seeded on the seed node through a **loopback, no-auth
bootstrap** that populates the same JetStream store the cluster unit then reuses:
```bash
ssh root@magnus 'bash -s' <<'SEED'
set -euo pipefail
cd /opt/unibus
# a) Put the first admin into a local SQLite seed file.
./membershipd user add --db ./seed.db --handle root --sign-pub <ADMIN_SIGN_PUB_HEX> --role admin
# b) Bring up a TEMPORARY loopback, no-auth, single-node KV server on the cluster's
# own JetStream store dir (not exposed; bus-auth off is allowed on 127.0.0.1).
./membershipd --store kv --bus-auth off --bind 127.0.0.1 \
--nats-store ./local_files/jetstream --db ./seed.db >/tmp/seed-boot.log 2>&1 &
BOOT=$!; sleep 2
# c) Migrate the admin from SQLite into the replicated KV (loopback — no --ca needed).
./membershipd migrate-to-kv --db ./seed.db --nats-url nats://127.0.0.1:4250 --replicas 1
# d) Stop the bootstrap server. The KV buckets persist in ./local_files/jetstream.
kill "$BOOT"; wait "$BOOT" 2>/dev/null || true
rm -f ./seed.db
SEED
```
> The KV written here lives in `./local_files/jetstream`, which the cluster unit
> reuses (`--nats-store` default), so the admin is present when the enforce cluster
> starts. Additional users are added the same loopback way until a
> `user add --store kv` exists (see GAP in report 0009).
## Bring up (HUMAN — staggered)
Bring up the seed first, then the replicas one at a time, checking each joins.
```bash
# 1. Seed node (after the seed step above).
ssh root@magnus 'systemctl enable --now membershipd-cluster'
ssh root@magnus 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
# 2. Replicas, one at a time.
ssh root@homer 'systemctl enable --now membershipd-cluster'
ssh root@datardos 'systemctl enable --now membershipd-cluster'
```
> Initial rollout runs at **R1** (`KV_REPLICAS=1` in `nodes.env`): the buckets live
> on the seed only. This is NOT HA yet — see "Scale to R3".
## Promote an existing single-node (SQLite) deployment (HUMAN, optional)
Instead of seeding fresh, you can migrate an existing single-node `unibus.db` into
the KV — **loopback only** (the allowlist would otherwise travel cleartext; the
command refuses a remote target without `--ca`). Use the same loopback-bootstrap
shape as the seed step (temporary `--bus-auth off` server on 127.0.0.1, then
`migrate-to-kv --db /opt/unibus/local_files/unibus.db`).
## Verify
```bash
# Posture on every node — all must be enforce+acl+tls+cluster, store=kv.
for h in magnus homer datardos; do
echo "== $h =="
ssh root@$h 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
done
# Cluster + JetStream meta-group health (needs the `nats` CLI on a node):
ssh root@magnus 'nats --server nats://127.0.0.1:4250 server report jetstream'
ssh root@magnus 'nats --server nats://127.0.0.1:4250 server list' # 3 servers, routes up
```
A healthy cluster shows 3 routed servers and a JetStream meta-group with a leader.
## Scale to R3 (HUMAN — real HA)
Once all three nodes are up and routed, raise the replication factor of every
control-plane stream from 1 to 3 IN PLACE (no data loss), then flip `KV_REPLICAS=3`
in `nodes.env` so future (re)deploys keep it:
```bash
for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members KV_UNIBUS_room_keys \
KV_UNIBUS_rooms_by_member KV_UNIBUS_nonces; do
ssh root@magnus "nats --server nats://127.0.0.1:4250 stream update $s --replicas 3 -f"
done
# (also OBJ_UNIBUS_blobs if the object store is in use)
```
Until this is done, R1 means the seed node is a **single point of failure for
authentication**: if it dies, the nonce/KV control plane is unreachable and every
authenticated request fails closed (auth DoS). R1 is a rollout step, not HA.
## Chaos test (HUMAN — requires the 3 live VPS; NOT run here)
Validate quorum tolerance after R3:
```bash
# Kill one node; the cluster keeps serving (quorum 2/3).
ssh root@datardos 'systemctl stop membershipd-cluster'
# -> clients fail over (multiple seed URLs); reads/writes still succeed.
ssh root@datardos 'systemctl start membershipd-cluster' # rejoins, catches up
# Kill two nodes; quorum is LOST — the control plane should fail CLOSED (deny),
# never fail open. Verify a request is rejected, not silently served.
```
This network-level chaos test (kill 1/3, kill 2/3, partition/split-brain) is part
of the deploy validation (issue 0003f) and runs against the real VPS — it is
deliberately out of scope for the authoring agent.
## Rollback
`membershipd` does not delete data. To revert a node to standalone SQLite, stop
the unit and start it without `--store kv`/`--cluster-name`; the KV buckets remain
for a later retry. To rotate the cluster CA, re-run `generate-cluster-certs.sh
--force` and re-stage (every node must get the new `cluster-ca.crt` together).
+126
View File
@@ -0,0 +1,126 @@
#!/usr/bin/env bash
#
# deploy-cluster.sh — cross-build membershipd and stage it onto the three cluster
# nodes (issue 0006g). DEFAULT IS DRY-RUN: it prints the plan and touches nothing.
# Pass --yes to actually rsync + run remote commands. Steps that a HUMAN must run
# (or confirm) are marked "HUMAN:".
#
# Prerequisites (HUMAN, once):
# 1. Fill nodes.env (no <PLACEHOLDER> left).
# 2. ./generate-cluster-certs.sh (mints out/<name>/ TLS material)
# 3. Create the route secret locally: mkdir -p secrets && openssl rand -hex 32 > secrets/cluster.pass
# (secrets/ is gitignored; it is rsynced to each node as cluster.pass)
# 4. SSH access to every node's SSH_HOST with sudo-less root (SSH_USER=root).
#
# What it does per node (with --yes):
# - rsync the membershipd binary, the node's TLS material, the unit, the
# generated cluster.env and the route secret into REMOTE_DIR.
# - install + daemon-reload the systemd unit.
# Start is STAGGERED and left to the human (see README): start the seed node,
# seed the admin, then start the rest.
set -euo pipefail
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$DIR"
# shellcheck source=/dev/null
source ./nodes.env
APPLY=0
[[ "${1:-}" == "--yes" ]] && APPLY=1
if grep -q '<[A-Z_]\+>' nodes.env; then
echo "ERROR: nodes.env still has <PLACEHOLDER> values — fill them in first." >&2
exit 2
fi
SECRET_FILE="secrets/cluster.pass"
if [[ ! -f "$SECRET_FILE" ]]; then
echo "ERROR: $SECRET_FILE missing. HUMAN: mkdir -p secrets && openssl rand -hex 32 > $SECRET_FILE" >&2
exit 2
fi
run() {
# Echo every action; only execute it under --yes.
echo " + $*"
if [[ $APPLY -eq 1 ]]; then
"$@"
fi
}
echo "==> [1/3] cross-build membershipd (linux/amd64, CGO disabled)"
run env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/membershipd ../../cmd/membershipd
# Build the comma-separated route list for a node = the OTHER nodes' addresses on
# the chosen network, with NO userinfo (the secret is injected by membershipd from
# the file). Echoes nothing; prints the value.
routes_for() {
local self="$1" out=""
local row name _ssh pub wg addr
for row in "${CLUSTER_NODES[@]}"; do
read -r name _ssh pub wg <<<"$row"
[[ "$name" == "$self" ]] && continue
if [[ "$ROUTE_NETWORK" == "public" ]]; then addr="$pub"; else addr="$wg"; fi
out+="nats://${addr}:${NATS_ROUTE_PORT},"
done
echo "${out%,}"
}
echo "==> [2/3] stage each node (REMOTE_DIR=$REMOTE_DIR)"
for row in "${CLUSTER_NODES[@]}"; do
read -r name ssh _pub _wg <<<"$row"
target="${SSH_USER}@${ssh}"
nodedir="out/${name}"
if [[ ! -d "$nodedir" ]]; then
echo "ERROR: $nodedir missing — run ./generate-cluster-certs.sh first." >&2
exit 2
fi
routes="$(routes_for "$name")"
echo "-- node ${name} (ssh ${ssh}) routes=${routes}"
# Generate this node's cluster.env locally, then ship it.
envfile="build/cluster-${name}.env"
mkdir -p build
cat > "$envfile" <<EOF
NODE_NAME=${name}
CLUSTER_NAME=${CLUSTER_NAME}
CLUSTER_USER=${CLUSTER_USER}
KV_REPLICAS=${KV_REPLICAS}
HTTP_PORT=${HTTP_PORT}
NATS_CLIENT_PORT=${NATS_CLIENT_PORT}
NATS_ROUTE_PORT=${NATS_ROUTE_PORT}
ROUTES=${routes}
CLUSTER_PASS_FILE=${REMOTE_DIR}/secrets/cluster.pass
TLS_CERT=${REMOTE_DIR}/tls/server-${name}.crt
TLS_KEY=${REMOTE_DIR}/tls/server-${name}.key
ROUTE_TLS_CERT=${REMOTE_DIR}/tls/route-${name}.crt
ROUTE_TLS_KEY=${REMOTE_DIR}/tls/route-${name}.key
ROUTE_TLS_CA=${REMOTE_DIR}/tls/cluster-ca.crt
EOF
run ssh "$target" "mkdir -p ${REMOTE_DIR}/tls ${REMOTE_DIR}/secrets"
run rsync -az build/membershipd "${target}:${REMOTE_DIR}/membershipd"
run rsync -az "${nodedir}/" "${target}:${REMOTE_DIR}/tls/"
run rsync -az "$SECRET_FILE" "${target}:${REMOTE_DIR}/secrets/cluster.pass"
run rsync -az "$envfile" "${target}:${REMOTE_DIR}/cluster.env"
run rsync -az membershipd-cluster.service "${target}:/etc/systemd/system/membershipd-cluster.service"
run ssh "$target" "chmod 600 ${REMOTE_DIR}/secrets/cluster.pass ${REMOTE_DIR}/tls/*.key && systemctl daemon-reload"
done
echo "==> [3/3] staged."
if [[ $APPLY -eq 0 ]]; then
echo " DRY-RUN: nothing was sent. Re-run with --yes to apply."
fi
cat <<'NEXT'
HUMAN — staggered start (do NOT enable all at once; see README "Bring up"):
1. Seed node first (e.g. magnus):
ssh root@magnus 'systemctl enable --now membershipd-cluster'
ssh root@magnus '/opt/unibus/membershipd user add --admin ...' # seed admin
2. Then the other two, one at a time, checking quorum after each:
ssh root@homer 'systemctl enable --now membershipd-cluster'
ssh root@datardos 'systemctl enable --now membershipd-cluster'
3. Verify posture + quorum (README "Verify").
4. Scale replicas 1 -> 3 once all three are up (README "Scale to R3").
NEXT
+120
View File
@@ -0,0 +1,120 @@
#!/usr/bin/env bash
#
# generate-cluster-certs.sh — mint the TLS material for a unibus 3-node cluster
# (issue 0006g). Run ONCE on a trusted machine (e.g. om, which custodies the bus
# CA); distribute the per-node output to each node over a secure channel. This
# script touches NO remote host.
#
# It produces two trust roots, kept SEPARATE on purpose (audit 0008 N1-low):
#
# 1. The CLUSTER route CA (cluster-ca.crt/key, generated here): signs each
# node's ROUTE certificate. The route layer authenticates NODES, not bus
# users, so it must NOT share the client data-plane CA — a client cert can
# then never be presented to the route port.
# 2. The CLIENT data-plane CA (../tls/ca.crt/key, the one clients pin): signs
# each node's DATA-PLANE server certificate. Reused, not regenerated, so
# existing clients keep trusting the bus.
#
# Per node it emits, under out/<name>/:
# route-<name>.crt/key route cert (cluster CA), EKU server+clientAuth
# (each node is BOTH server and dialer to its peers)
# server-<name>.crt/key data-plane cert (client CA), EKU serverAuth
# cluster-ca.crt the route CA cert (for --route-tls-ca)
# ca.crt the client CA cert (for clients / control-plane TLS)
#
# SANs per node = its public IP + its WireGuard IP + its hostname + localhost.
#
# Key material: EC P-256 (Go crypto/tls + nats-server friendly), matching
# ../tls/generate-certs.sh.
set -euo pipefail
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$DIR"
# shellcheck source=/dev/null
source ./nodes.env
# Refuse to run while any placeholder remains (HUMAN must fill nodes.env first).
if grep -q '<[A-Z_]\+>' nodes.env; then
echo "ERROR: nodes.env still has <PLACEHOLDER> values — fill them in first." >&2
grep -n '<[A-Z_]\+>' nodes.env >&2
exit 2
fi
CLIENT_CA_CRT="../tls/ca.crt"
CLIENT_CA_KEY="../tls/ca.key"
if [[ ! -f "$CLIENT_CA_CRT" || ! -f "$CLIENT_CA_KEY" ]]; then
echo "ERROR: client data-plane CA not found at ../tls/ca.{crt,key}." >&2
echo " Run ../tls/generate-certs.sh first (it mints the client CA)." >&2
exit 2
fi
DAYS_CA=3650
DAYS_CRT=825
force=0
[[ "${1:-}" == "--force" ]] && force=1
# --- cluster route CA (separate trust root) ---
if [[ ! -f cluster-ca.crt || ! -f cluster-ca.key || $force -eq 1 ]]; then
echo "==> generating cluster route CA (separate from the client CA)"
openssl ecparam -name prime256v1 -genkey -noout -out cluster-ca.key
chmod 600 cluster-ca.key
openssl req -x509 -new -key cluster-ca.key -sha256 -days "$DAYS_CA" \
-subj "/CN=unibus-cluster-ca" -out cluster-ca.crt
else
echo "==> reusing existing cluster route CA (pass --force to regenerate)"
fi
# mint <out_key> <out_crt> <subject_cn> <san> <eku> <ca_crt> <ca_key>
mint_cert() {
local out_key="$1" out_crt="$2" cn="$3" san="$4" eku="$5" ca_crt="$6" ca_key="$7"
local csr ext
csr="$(mktemp)"
ext="$(mktemp)"
openssl ecparam -name prime256v1 -genkey -noout -out "$out_key"
chmod 600 "$out_key"
openssl req -new -key "$out_key" -subj "/CN=${cn}" -out "$csr"
cat > "$ext" <<EOF
subjectAltName=${san}
extendedKeyUsage=${eku}
keyUsage=digitalSignature,keyEncipherment
EOF
openssl x509 -req -in "$csr" -CA "$ca_crt" -CAkey "$ca_key" -CAcreateserial \
-sha256 -days "$DAYS_CRT" -extfile "$ext" -out "$out_crt"
rm -f "$csr" "$ext"
}
for row in "${CLUSTER_NODES[@]}"; do
read -r name _ssh pub wg <<<"$row"
echo "==> node ${name}: SAN IP:${pub}, IP:${wg}, DNS:${name}, localhost, 127.0.0.1"
nodedir="out/${name}"
mkdir -p "$nodedir"
san="IP:${pub},IP:${wg},DNS:${name},DNS:localhost,IP:127.0.0.1"
# Route cert: signed by the cluster CA; server+client auth (mutual routes).
mint_cert "${nodedir}/route-${name}.key" "${nodedir}/route-${name}.crt" \
"unibus-route-${name}" "$san" "serverAuth,clientAuth" \
cluster-ca.crt cluster-ca.key
# Data-plane server cert: signed by the client CA; serverAuth only.
mint_cert "${nodedir}/server-${name}.key" "${nodedir}/server-${name}.crt" \
"unibus-${name}" "$san" "serverAuth" \
"$CLIENT_CA_CRT" "$CLIENT_CA_KEY"
# Co-locate the two CA certs each node needs.
cp cluster-ca.crt "${nodedir}/cluster-ca.crt"
cp "$CLIENT_CA_CRT" "${nodedir}/ca.crt"
done
rm -f cluster-ca.srl ../tls/ca.srl 2>/dev/null || true
echo
echo "==> done. Per-node material under out/<name>/ (KEYS ARE SECRET — never git):"
for row in "${CLUSTER_NODES[@]}"; do
read -r name _rest <<<"$row"
echo " out/${name}/ (route-${name}.*, server-${name}.*, cluster-ca.crt, ca.crt)"
done
echo
echo "verify a SAN with:"
echo " openssl x509 -in out/<name>/server-<name>.crt -noout -text | grep -A1 'Subject Alternative Name'"
@@ -0,0 +1,45 @@
[Unit]
# unibus membershipd — cluster node (issue 0006g).
#
# One unit, parameterized per node by /opt/unibus/cluster.env (generated by
# deploy-cluster.sh): NODE_NAME, ROUTES and the cert paths differ per node, the
# rest of the posture (enforce + per-subject ACL + TLS + --store kv) is identical
# on every node, which is the homogeneous posture a secure cluster requires
# (audit 0008 N1).
Description=unibus membershipd (cluster node)
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
WorkingDirectory=/opt/unibus
EnvironmentFile=/opt/unibus/cluster.env
# The route password comes from a FILE referenced by ${CLUSTER_PASS_FILE}, never
# from argv (audit 0008 N1-low). The peer --routes carry no userinfo; membershipd
# injects the credentials from the file/user.
ExecStart=/opt/unibus/membershipd \
--bind 0.0.0.0 \
--bus-auth enforce \
--http-port ${HTTP_PORT} \
--nats-port ${NATS_CLIENT_PORT} \
--tls-cert ${TLS_CERT} \
--tls-key ${TLS_KEY} \
--cluster-name ${CLUSTER_NAME} \
--server-name ${NODE_NAME} \
--cluster-port ${NATS_ROUTE_PORT} \
--routes ${ROUTES} \
--cluster-user ${CLUSTER_USER} \
--cluster-pass-file ${CLUSTER_PASS_FILE} \
--route-tls-cert ${ROUTE_TLS_CERT} \
--route-tls-key ${ROUTE_TLS_KEY} \
--route-tls-ca ${ROUTE_TLS_CA} \
--store kv \
--kv-replicas ${KV_REPLICAS}
# Restart=always (NOT on-failure): a clean SIGTERM exits success, and on-failure
# would then NOT restart, leaving the node silently dead (see function_tags.md).
Restart=always
RestartSec=2
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
+44
View File
@@ -0,0 +1,44 @@
# Cluster topology for the unibus 3-node deployment (issue 0006g).
#
# This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh.
#
# HUMAN: fill in every <PLACEHOLDER> with the real value before running the
# scripts. The public IPs known at authoring time are pre-filled; the WireGuard
# mesh IPs and magnus's public IP must be supplied. The scripts refuse to run
# while any <PLACEHOLDER> remains.
# Cluster identity (must be identical on every node).
CLUSTER_NAME="unibus"
# Route-secret username; the password is NOT here — it lives in a file (see
# CLUSTER_PASS_FILE in deploy-cluster.sh) so it never lands in argv or git.
CLUSTER_USER="unibus-cluster"
# KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise
# to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only
# set this to 3 here after the third node is up and you re-run the KV update.
KV_REPLICAS=1
# Ports (same on every node; the route port is server-to-server only).
NATS_CLIENT_PORT=4250
NATS_ROUTE_PORT=6250
HTTP_PORT=8470
# Remote install layout and SSH login user.
REMOTE_DIR="/opt/unibus"
SSH_USER="root"
# Which address family the inter-node routes use. "wg" builds --routes from the
# WireGuard mesh IPs (private server-to-server links, preferred); "public" uses
# the public IPs. The route layer is always mutual-TLS regardless.
ROUTE_NETWORK="wg"
# One row per node: NAME SSH_HOST PUBLIC_IP WG_IP
# NAME -> --server-name and the per-node cert filenames (unique).
# SSH_HOST -> the `ssh <SSH_HOST>` alias (see ~/.ssh/config).
# PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane).
# WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg.
CLUSTER_NODES=(
"magnus magnus <MAGNUS_PUBLIC_IP> <MAGNUS_WG_IP>"
"homer homer 141.94.69.66 <HOMER_WG_IP>"
"datardos dd 51.91.100.142 <DATARDOS_WG_IP>"
)
+1 -1
View File
@@ -18,7 +18,7 @@
"decentralized": {
"enabled": false,
"issue": "0003",
"description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default OFF, jetstreamStore ON). The route cluster (0003a) and the KV store (0003b) ship behind this flag; the membershipd boot wiring that selects the KV store completes with the session/reconnect redesign (0003e) and is activated on the multi-node deploy (0003f). OFF keeps the single-node SQLite control plane unchanged.",
"description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default, jetstreamStore opt-in). The route cluster (0003a) and the KV store (0003b) shipped behind this flag; the membershipd boot wiring that selects the store is COMPLETE since issue 0006c and is realized at runtime with the server flag --store kv|sqlite (default sqlite). The internal-identity bootstrap (0006a) lets membershipd open the KV store on its own embedded NATS under enforce. Per-deploy opt-in: a node joins the decentralized control plane by starting with --store kv (and --cluster-name for HA). OFF (--store sqlite) keeps the single-node SQLite control plane unchanged.",
"added": "2026-06-07",
"enabled_at": null
}
@@ -1,8 +1,10 @@
---
issue: 0006
title: Completar y endurecer el cluster — wiring del control plane KV + N1-N6 de la auditoría 0008
status: spec
status: done
created: 2026-06-07
closed: 2026-06-07
closed_by: fases 0006a0006g (ver report 0009); unibus v0.8.0
domain: security
scope: unibus (cmd/membershipd, pkg/membership, pkg/embeddednats, pkg/busauth, pkg/client)
depends_on: 0003 (completa su wiring), 0005 (hereda el bus single-node ya seguro)
+14
View File
@@ -85,6 +85,20 @@ func (s *Session) Join(roomID string) error {
return s.c.Join(roomID)
}
// RefreshSession reconnects the data plane so the bus re-derives this peer's
// per-subject permissions from its current room membership.
//
// Membership-change contract (issue 0006e): a secured bus (--bus-auth enforce)
// freezes a connection's permissions at connect time. After ANY membership change
// — a room you just created, were invited to, or joined — call RefreshSession
// BEFORE Publish/Subscribe on that room, or the bus denies the new room's subject.
// It also drops active subscriptions, so re-Subscribe afterwards. On an unsecured
// bus it is a harmless reconnect. A mobile/gateway caller wires this exactly like
// cmd/chat and cmd/worker do: CreateRoom -> RefreshSession -> Subscribe/Publish.
func (s *Session) RefreshSession() error {
return s.c.RefreshSession()
}
// Publish sends a UTF-8 text message to the room.
func (s *Session) Publish(roomID, text string) error {
return s.c.Publish(roomID, []byte(text))
+40
View File
@@ -82,6 +82,15 @@ type PermissionsFunc func(signPubHex string) (*server.Permissions, error)
type nkeyAuthenticatorACL struct {
isAuthorized func(signPubHex string) bool
perms PermissionsFunc
// internalPubHex is the lowercase-hex Ed25519 public key of membershipd's own
// ephemeral internal service identity. A connection that proves that key is
// granted full permissions WITHOUT consulting the allowlist, so the service can
// bootstrap and manage JetStream (the replicated nonce bucket and, when
// decentralized, the control-plane KV buckets) against its own embedded server
// even while the data plane confines every client to its rooms. Empty disables
// the internal-identity path entirely (behavior identical to a plain ACL
// authenticator).
internalPubHex string
}
// NewNkeyAuthenticatorACL builds an authenticator that authorizes by the bus
@@ -94,6 +103,29 @@ func NewNkeyAuthenticatorACL(isAuthorized func(signPubHex string) bool, perms Pe
return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms}
}
// NewNkeyAuthenticatorACLInternal is NewNkeyAuthenticatorACL that also recognizes
// membershipd's internal service identity (internalPubHex, the lowercase hex of
// its ephemeral Ed25519 public key): a connection proving that key is granted
// full permissions without an allowlist lookup, so the service can create and
// manage JetStream against its own embedded server under enforce (issue 0006a/c —
// the replicated nonce bucket and the control-plane KV). Every other identity
// goes through the allowlist + per-subject ACL unchanged. An empty internalPubHex
// is identical to NewNkeyAuthenticatorACL, so this is a superset and safe to use
// everywhere the plain constructor was used.
func NewNkeyAuthenticatorACLInternal(isAuthorized func(signPubHex string) bool, perms PermissionsFunc, internalPubHex string) server.Authentication {
return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms, internalPubHex: internalPubHex}
}
// fullPermissions grants publish and subscribe on every subject (">"). It is the
// permission set for membershipd's own internal service connection, which must
// manage the JetStream control plane (nonce bucket + KV buckets) over NATS. It is
// NEVER granted to a bus user — only to the process's own ephemeral internal
// identity, recognized by exact public-key match in Check.
func fullPermissions() *server.Permissions {
sp := &server.SubjectPermission{Allow: []string{">"}}
return &server.Permissions{Publish: sp, Subscribe: sp}
}
// Check verifies the nkey, authorizes against the allowlist, then derives and
// registers the connection's subject permissions. A permissions-derivation
// error denies the connection (fail closed) rather than granting open access.
@@ -102,6 +134,14 @@ func (a *nkeyAuthenticatorACL) Check(c server.ClientAuthentication) bool {
if !ok {
return false
}
// membershipd's own internal service identity bypasses the allowlist and is
// granted full permissions so the service can bootstrap JetStream under
// enforce. The key is matched exactly against the cryptographically verified
// connecting key, so no other identity can claim these permissions.
if a.internalPubHex != "" && signPubHex == a.internalPubHex {
c.RegisterUser(&server.User{Permissions: fullPermissions()})
return true
}
if !a.isAuthorized(signPubHex) {
return false
}
+83 -17
View File
@@ -1,31 +1,95 @@
package membership
// Per-subject data-plane access control derived from room membership (issue
// 0003e, audit H4 residual). The control plane already authorizes metadata by
// membership; this is the matching restriction on the NATS data plane so a
// registered peer can only publish/subscribe on the subjects of the rooms it
// actually belongs to — not on every subject on the bus.
// 0003e, audit H4 residual; tightened in issue 0006b for audit 0008 N2). The
// control plane already authorizes metadata by membership; this is the matching
// restriction on the NATS data plane so a registered peer can only
// publish/subscribe on the subjects of the rooms it actually belongs to — and can
// only reach the JetStream API of ITS OWN rooms' streams, never the control-plane
// KV buckets.
import (
"encoding/hex"
"fmt"
"strings"
"github.com/enmanuel/unibus/pkg/frame"
)
// clientInfraSubjects are the subjects every peer needs regardless of room
// membership: the request/reply inbox space and the JetStream API (the durable
// plane of persisted rooms). They are granted to all authorized peers so
// request/reply and persisted-room history keep working under the subject ACL.
var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.>"}
// clientInfraSubjects are the subjects every authorized peer needs regardless of
// room membership, kept deliberately MINIMAL (issue 0006b, audit 0008 N2):
//
// - "_INBOX.>" — request/reply plus the JetStream pull-consumer delivery
// and publish-ack inboxes.
// - "$JS.API.INFO" — account-level JetStream info (limits/usage counters). It
// exposes NO room/user/key contents, so granting it leaks nothing.
//
// It NO LONGER contains "$JS.API.>". That broad grant was the N2 leak: it let any
// registered peer drive the whole JetStream API and read the control-plane KV
// buckets (KV_UNIBUS_users/rooms/members/room_keys) and the object store directly
// over NATS, bypassing the HTTP authorization (requireMember and the own-endpoint
// checks). JetStream API access is now granted PER ROOM, scoped to the stream of
// each room the peer belongs to (jsSubjectsFor). Because the control-plane KV
// streams (KV_UNIBUS_*) and the object store (OBJ_UNIBUS_*) are never a room
// stream, they fall outside the closed allow set and are denied by default.
var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.INFO"}
// SubjectACLFor returns a function that maps a signing public key (lowercase
// hex) to the data-plane subjects that identity may publish and subscribe to:
// the subject of every room it belongs to, plus the client infrastructure
// subjects. It reads the live membership store, so the permissions reflect the
// identity's rooms at the moment it connects. A decode error or a store failure
// is returned as an error so the caller can fail closed (deny the connection)
// rather than grant open access.
// roomStreamName is the JetStream stream name a persisted room maps to. It MUST
// stay identical to pkg/client.streamName ("UNIBUS_" + sanitized roomID) so the
// per-room ACL grants exactly the subjects the client's JetStream calls use. Room
// ids are ULIDs (no '.'), so the sanitizing is a no-op in practice, but the rule
// is replicated defensively so the producer (client) and the authorizer (this
// ACL) never drift apart.
func roomStreamName(roomID string) string {
var b strings.Builder
b.Grow(len("UNIBUS_") + len(roomID))
b.WriteString("UNIBUS_")
for _, r := range roomID {
switch {
case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
b.WriteRune(r)
default:
b.WriteRune('_')
}
}
return b.String()
}
// jsSubjectsFor returns the MINIMAL JetStream API subjects a peer needs to use the
// durable stream of ONE persisted room: create/update/info the stream, manage and
// pull from its durable consumer, and ack deliveries. Every subject embeds this
// room's stream name, so the grant cannot reach another room's stream nor any
// control-plane stream (KV_UNIBUS_* / OBJ_UNIBUS_*). The wildcard layout matches
// the NATS JetStream API subject grammar (the stream name is the trailing token
// of single-verb requests and follows a two-token verb for MSG.GET / MSG.NEXT /
// DURABLE.CREATE):
//
// $JS.API.STREAM.<verb>.<stream> verb in {CREATE,UPDATE,INFO,DELETE,PURGE,...}
// $JS.API.STREAM.MSG.<op>.<stream> op in {GET,DELETE}
// $JS.API.CONSUMER.<verb>.<stream> verb in {LIST,NAMES,CREATE(ephemeral)}
// $JS.API.CONSUMER.<verb>.<stream>.<consumer>... verb in {CREATE,INFO,DELETE}
// $JS.API.CONSUMER.<v1>.<v2>.<stream>.<cons> {MSG.NEXT, DURABLE.CREATE}
// $JS.ACK.<stream>.> message acknowledgements
func jsSubjectsFor(roomID string) []string {
s := roomStreamName(roomID)
return []string{
"$JS.API.STREAM.*." + s,
"$JS.API.STREAM.*.*." + s,
"$JS.API.CONSUMER.*." + s,
"$JS.API.CONSUMER.*." + s + ".>",
"$JS.API.CONSUMER.*.*." + s + ".>",
"$JS.ACK." + s + ".>",
}
}
// SubjectACLFor returns a function that maps a signing public key (lowercase hex)
// to the data-plane subjects that identity may publish and subscribe to: the
// subject of every room it belongs to, the per-room JetStream API subjects of
// those rooms (so persisted-room history keeps working), plus the minimal client
// infrastructure subjects. It reads the live membership store, so the permissions
// reflect the identity's rooms at the moment it connects. A decode error or a
// store failure is returned as an error so the caller can fail closed (deny the
// connection) rather than grant open access.
//
// Because NATS freezes permissions at connect time, a peer invited to a new room
// after connecting must reconnect (client.RefreshSession) to pick up the new
@@ -42,10 +106,12 @@ func SubjectACLFor(store Store) func(signPubHex string) ([]string, error) {
if err != nil {
return nil, fmt.Errorf("acl: list rooms for %s: %w", endpoint, err)
}
subjects := make([]string, 0, len(rooms)+len(clientInfraSubjects))
// clientInfra + per room: the room subject + that room's JetStream API.
subjects := make([]string, 0, len(clientInfraSubjects)+len(rooms)*7)
subjects = append(subjects, clientInfraSubjects...)
for _, r := range rooms {
subjects = append(subjects, r.Subject)
subjects = append(subjects, jsSubjectsFor(r.RoomID)...)
}
return subjects, nil
}
+5 -8
View File
@@ -229,14 +229,11 @@ func TestSubjectACLIsolation(t *testing.T) {
// - golden: the member still pub/subs her own room, and the non-member never
// captures that traffic.
//
// Residual (DOCUMENTED, not closed here): the client-infra grant includes
// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
// subscribes specifically to "$JS.API.>" can still observe stream-management
// requests whose subjects embed the stream name derived from a room id. Fully
// closing that needs NATS accounts/permissions isolation per identity (deferred to
// the 0003 decentralization line). The high-impact leak the auditor exploited —
// the room subject itself and JetStream advisories captured via "Subscribe(\">\")"
// — is closed.
// Residual now CLOSED (issue 0006b, audit 0008 N2): the client-infra grant no
// longer includes "$JS.API.>". JetStream API access is granted per-room only
// (membership.jsSubjectsFor), so a peer can reach the API of its OWN rooms'
// streams but not the control-plane KV buckets (KV_UNIBUS_*) nor another room's
// stream. See TestAttack0008_N2 for the closed-leak regression.
func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
+152
View File
@@ -0,0 +1,152 @@
package membership_test
// Regression for audit report 0008, vector N2: with the broad "$JS.API.>" grant
// removed (issue 0006b), a registered peer that belongs to no room can no longer
// read the control-plane KV buckets over NATS, while the per-room JetStream API of
// a peer's OWN rooms keeps working. The auditor's ephemeral attack populated the
// KV control plane and had a registered non-member harvest the allowlist, the room
// graph and the sealed-key metadata directly through "$JS.API.>".
import (
"context"
"encoding/hex"
"path/filepath"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
server "github.com/nats-io/nats-server/v2/server"
)
// startACLNatsInternal is startACLNats plus a recognized internal service identity
// (so the test can seed the KV control plane with full permissions, exactly as the
// decentralized membershipd does at bootstrap).
func startACLNatsInternal(t *testing.T, store membership.Store, internalPubHex string) *server.Server {
t.Helper()
auth := busauth.NewNkeyAuthenticatorACLInternal(store.IsAuthorized, aclPermsFunc(store), internalPubHex)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: aclFreePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("acl nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
return ns
}
// TestAttack0008_N2 reproduces the control-plane KV leak and proves it is closed.
//
// error : eve (registered, member of no room) cannot read the KV buckets — the
// JetStream KV API and the raw $KV subject space are both denied.
// golden: the owner of a persisted room can still drive the JetStream API of HER
// OWN room's stream (so persisted-room history keeps working).
// edge : eve cannot reach another room's stream API either (cross-room JS deny).
func TestAttack0008_N2(t *testing.T) {
dir := t.TempDir()
// The HTTP control-plane store stays SQLite; the KV buckets below stand in for
// the decentralized control plane the attack targets.
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
ceo, eve, internalID := mustID(t), mustID(t), mustID(t)
ceoEP := frame.EndpointID(ceo.SignPub)
mustAddUser(t, store, ceo, "ceo-root-admin")
mustAddUser(t, store, eve, "eve") // registered, member of nothing
// A persisted room owned by ceo: ceo is a member, so her per-room JS is allowed.
if err := store.CreateRoom(
membership.RoomInfo{RoomID: "PRIVROOM", Subject: "room.board.ma-deal", Encrypt: true, Persist: true, OwnerEndpoint: ceoEP},
ceo.SignPub, ceo.KexPub, []byte("sealed-self"),
); err != nil {
t.Fatalf("create room: %v", err)
}
internalPubHex := hex.EncodeToString(internalID.SignPub)
ns := startACLNatsInternal(t, store, internalPubHex)
url := ns.ClientURL()
// Seed the KV control plane with the privileged internal identity (full perms),
// simulating the decentralized buckets the attack reads.
intErr := make(chan error, 4)
intNC := nkeyConn(t, url, internalID, intErr)
intJS, err := jetstream.New(intNC)
if err != nil {
t.Fatalf("internal jetstream: %v", err)
}
kvStore, err := membership.OpenJetStream(intJS, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("open kv buckets: %v", err)
}
if err := kvStore.AddUser(hex.EncodeToString(ceo.SignPub), "ceo-root-admin", membership.RoleAdmin); err != nil {
t.Fatalf("seed kv user: %v", err)
}
// Each JetStream op gets its own short context: a DENIED request never gets a
// reply, so it blocks until its own deadline — a shared context would be
// exhausted by the first denied call and starve the rest.
freshCtx := func(d time.Duration) (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), d)
}
// --- error: eve cannot read the control-plane KV buckets ------------------
eveErr := make(chan error, 8)
eveNC := nkeyConn(t, url, eve, eveErr)
eveJS, err := jetstream.New(eveNC)
if err != nil {
t.Fatalf("eve jetstream: %v", err)
}
// (a) The KV API: binding the bucket requires STREAM.INFO.KV_UNIBUS_users, which
// eve has no permission for, so this must fail (no leak of users).
kvCtx, kvCancel := freshCtx(2 * time.Second)
if kv, err := eveJS.KeyValue(kvCtx, "UNIBUS_users"); err == nil {
if e, gerr := kv.Get(kvCtx, hex.EncodeToString(ceo.SignPub)); gerr == nil {
kvCancel()
t.Fatalf("eve read the control-plane KV users bucket: %q (N2 leak still open)", string(e.Value()))
}
kvCancel()
t.Fatalf("eve was able to BIND the KV users bucket (N2 leak still open)")
}
kvCancel()
// (b) The raw KV subject space: a direct subscribe must be a permissions
// violation (delivered async to the error handler).
drain(eveErr)
if _, err := eveNC.Subscribe("$KV.UNIBUS_users.>", func(*nats.Msg) {}); err != nil {
t.Fatalf("eve sub $KV: %v", err)
}
_ = eveNC.Flush()
if e := waitErr(eveErr, 1*time.Second); e == nil {
t.Fatalf("eve subscribing to $KV.UNIBUS_users.> must raise a permissions violation")
}
// --- edge: eve cannot reach another room's stream API ---------------------
edgeCtx, edgeCancel := freshCtx(2 * time.Second)
if _, err := eveJS.Stream(edgeCtx, "UNIBUS_PRIVROOM"); err == nil {
edgeCancel()
t.Fatalf("eve reached the foreign room stream API (cross-room JS not isolated)")
}
edgeCancel()
// --- golden: ceo can drive the JetStream API of HER OWN room's stream ------
ceoErr := make(chan error, 4)
ceoNC := nkeyConn(t, url, ceo, ceoErr)
ceoJS, err := jetstream.New(ceoNC)
if err != nil {
t.Fatalf("ceo jetstream: %v", err)
}
goldenCtx, goldenCancel := freshCtx(5 * time.Second)
defer goldenCancel()
if _, err := ceoJS.CreateOrUpdateStream(goldenCtx, jetstream.StreamConfig{
Name: "UNIBUS_PRIVROOM",
Subjects: []string{"room.board.ma-deal"},
Storage: jetstream.FileStorage,
}); err != nil {
t.Fatalf("ceo could not manage her OWN room stream (per-room JS broken): %v", err)
}
}
+57
View File
@@ -0,0 +1,57 @@
package membership_test
import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/membership"
)
// TestHealthExposesPosture: /healthz publishes the node's security posture so a
// monitor (or a peer) can detect a cluster member that is not enforce+ACL+TLS
// (audit 0008 N1). The probe stays unauthenticated.
func TestHealthExposesPosture(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
srv.Posture = membership.Posture{Enforce: true, ACL: true, TLS: true, Cluster: true, Store: "kv"}
ts := httptest.NewServer(srv)
t.Cleanup(ts.Close)
resp, err := http.Get(ts.URL + "/healthz")
if err != nil {
t.Fatalf("get healthz: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("healthz status %d, want 200", resp.StatusCode)
}
body, _ := io.ReadAll(resp.Body)
var got struct {
Status string `json:"status"`
Posture membership.Posture `json:"posture"`
}
if err := json.Unmarshal(body, &got); err != nil {
t.Fatalf("decode healthz %q: %v", string(body), err)
}
if got.Status != "ok" {
t.Fatalf("status = %q, want ok", got.Status)
}
if !got.Posture.Enforce || !got.Posture.ACL || !got.Posture.TLS || !got.Posture.Cluster {
t.Fatalf("posture not surfaced correctly: %+v", got.Posture)
}
if got.Posture.Store != "kv" {
t.Fatalf("posture.store = %q, want kv", got.Posture.Store)
}
}
+88
View File
@@ -0,0 +1,88 @@
package membership_test
import (
"path/filepath"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
)
// TestClientCreateRoomRefreshPublishFlow is the issue 0006e DoD: under enforce+ACL
// a peer creates a room AFTER connecting, and pub/sub works without manual
// intervention because the client follows the membership-change contract
// (CreateRoom -> RefreshSession -> Subscribe/Publish), exactly as cmd/chat and
// cmd/worker now do. This is the end-to-end flow through the client API, proving
// the ACL is usable under enforce rather than something an operator must disable.
func TestClientCreateRoomRefreshPublishFlow(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
alice, bob := mustID(t), mustID(t)
mustAddUser(t, store, alice, "alice")
mustAddUser(t, store, bob, "bob")
srv := startACLNats(t, store) // data plane: enforce + per-subject ACL
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
ctrl := newCtrl(t, store, blobs)
aliceC, err := client.NewWithOptions(srv.ClientURL(), ctrl, alice, client.Options{UseNkey: true})
if err != nil {
t.Fatalf("connect alice: %v", err)
}
defer aliceC.Close()
bobC, err := client.NewWithOptions(srv.ClientURL(), ctrl, bob, client.Options{UseNkey: true})
if err != nil {
t.Fatalf("connect bob: %v", err)
}
defer bobC.Close()
// alice creates a room AFTER connecting: the subject was not in her ACL at
// connect time, so she must refresh to publish on it (the worker contract).
roomID, err := aliceC.CreateRoom("room.flow.x", room.ModeNATS)
if err != nil {
t.Fatalf("alice create room: %v", err)
}
if err := aliceC.RefreshSession(); err != nil {
t.Fatalf("alice refresh: %v", err)
}
// alice invites bob; bob joins then refreshes to gain the subject (the chat
// subscriber contract), and only then subscribes.
if err := aliceC.Invite(roomID, bobC.Endpoint()); err != nil {
t.Fatalf("alice invite bob: %v", err)
}
if err := bobC.Join(roomID); err != nil {
t.Fatalf("bob join: %v", err)
}
if err := bobC.RefreshSession(); err != nil {
t.Fatalf("bob refresh: %v", err)
}
got := make(chan string, 4)
sub, err := bobC.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { got <- string(plaintext) })
if err != nil {
t.Fatalf("bob subscribe after refresh: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(200 * time.Millisecond) // let the subscription settle
if err := aliceC.Publish(roomID, []byte("hello-under-acl")); err != nil {
t.Fatalf("alice publish after refresh: %v", err)
}
select {
case msg := <-got:
if msg != "hello-under-acl" {
t.Fatalf("bob got %q", msg)
}
case <-time.After(3 * time.Second):
t.Fatalf("bob did not receive the message: the create->refresh->subscribe flow is broken under enforce+ACL")
}
}
+20 -1
View File
@@ -81,6 +81,25 @@ type Server struct {
// (non-loopback) bind. See dev/0004d-dataplane-acl.md for the full rationale
// and the residual metadata exposure this does NOT close.
RequireEncryptedRooms bool
// Posture is the node's security posture, surfaced on /healthz so an operator
// or a peer can detect a node NOT running the homogeneous enforce+ACL+TLS
// posture a secure cluster requires (audit 0008 N1). It is set by the command;
// the zero value (all false) reflects an unsecured dev node.
Posture Posture
}
// Posture describes the security posture a membershipd node runs with. It is
// non-secret operational metadata (booleans + the store backend name), published
// on /healthz so a monitor can flag a cluster member that is not enforce+ACL+TLS
// — the weak node that would let an unauthenticated peer harvest the cluster's
// forwarded traffic (audit 0008 N1).
type Posture struct {
Enforce bool `json:"enforce"`
ACL bool `json:"acl"`
TLS bool `json:"tls"`
Cluster bool `json:"cluster"`
Store string `json:"store"` // "sqlite" | "kv"
}
// NewServer wires the membership store and blob store into an http.Handler. The
@@ -390,7 +409,7 @@ func (s *Server) verifyOwnerSig(roomID, by string, sig, canonical []byte) (Membe
// ---- handlers -------------------------------------------------------------
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "posture": s.Posture})
}
func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {