Compare commits
15 Commits
5df99fa4c4
...
926b8e96af
| Author | SHA1 | Date | |
|---|---|---|---|
| 926b8e96af | |||
| ae39e35fb4 | |||
| 48a3d6be33 | |||
| 24ff45ca7e | |||
| b8201a82cd | |||
| 3a33656cac | |||
| 2f5b372a80 | |||
| 32bec75665 | |||
| 9b96537aa6 | |||
| 18ee7c469b | |||
| e9ad719424 | |||
| d1e1a478f8 | |||
| cacf608fde | |||
| a9c245d468 | |||
| 8b6a01d280 |
@@ -2,7 +2,7 @@
|
||||
name: unibus
|
||||
lang: go
|
||||
domain: infra
|
||||
version: 0.7.0
|
||||
version: 0.8.0
|
||||
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
||||
tags: [service, messaging, nats, e2e]
|
||||
uses_functions:
|
||||
@@ -154,6 +154,30 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v0.8.0 (2026-06-07) — completar y endurecer el cluster (issue 0006, fases
|
||||
0006a–0006g) que cierra los bloqueantes de la auditoría dedicada del cluster
|
||||
(report 0008) y cablea el control plane descentralizado que 0003 dejó a medias.
|
||||
(0006a) Se cablea el nonce replicado en el binario: un nodo con `--cluster-name`
|
||||
usa el bucket JetStream KV compartido obligatoriamente (fail-fast si no se crea),
|
||||
cerrando el replay cross-node (N3); el "ciclo bootstrap" se resuelve con una
|
||||
identidad interna efímera que el authenticator reconoce (full perms) y una
|
||||
conexión in-process privilegiada. (0006b) Se cierra la fuga del control plane
|
||||
por `$JS.API.>` (N2): la ACL pasa a un allow-set cerrado por-room (JS API solo de
|
||||
los streams `UNIBUS_<room>` del peer), dejando `KV_UNIBUS_*`/`OBJ_*` fuera del
|
||||
set y, por tanto, denegados. (0006c) Se cablea el store KV descentralizado
|
||||
(`--store kv|sqlite`, default sqlite = baseline idéntico) con un `storeHolder`
|
||||
fail-closed que rompe el ciclo bootstrap del authenticator. (0006d) Posture
|
||||
homogénea: un nodo rechaza unirse al cluster sin `enforce`, y `/healthz` publica
|
||||
la posture (N1). (0006e) Todos los clientes llaman `RefreshSession` tras cambios
|
||||
de membresía (N4), de modo que la ACL es usable bajo enforce sin desactivarla.
|
||||
(0006f) Bajos: secreto de cluster fuera de argv (`--cluster-pass-file`/env +
|
||||
inyección en routes), `migrate-to-kv` rechaza target remoto sin `--ca`, y docs
|
||||
de CA separada para routes + R1 SPOF vs R3 HA. (0006g) Material de deploy del
|
||||
cluster de 3 nodos (magnus+homer+datardos) en `deploy/cluster/` (certs, unit,
|
||||
script de despliegue dry-run, runbook) — sin tocar ningún VPS. Toda la
|
||||
regresión de auditorías previas + los ataques 0008 siguen verdes; govulncheck 0
|
||||
alcanzables. Branch-by-abstraction: con `--store sqlite` el single-node sigue
|
||||
idéntico y desplegable en todo momento.
|
||||
- v0.7.0 (2026-06-07) — hardening de seguridad 2 (issue 0005, fases 0005a–0005e)
|
||||
que cierra los hallazgos nuevos de la re-auditoría red-team (report 0006) y
|
||||
lleva el veredicto de exposición pública a "sí-con-condiciones". (0005a) Bump de
|
||||
|
||||
@@ -69,6 +69,12 @@ func runSimple(natsURL, ctrlURL, roomSub, idFile, caFile string) {
|
||||
if err := c.Join(roomID); err != nil {
|
||||
log.Fatalf("join: %v", err)
|
||||
}
|
||||
// Membership-change contract (issue 0006e): refresh so the just-created room's
|
||||
// subject is subscribable under enforce+ACL (permissions are frozen at connect
|
||||
// time). Must run BEFORE Subscribe — RefreshSession drops active subscriptions.
|
||||
if err := c.RefreshSession(); err != nil {
|
||||
log.Fatalf("refresh session after create room: %v", err)
|
||||
}
|
||||
sub, err := c.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
|
||||
fmt.Printf("[%s] %s: %s\n", f.Subject, shortID(f.Sender), string(plaintext))
|
||||
})
|
||||
@@ -122,12 +128,21 @@ func runEncryptedDemo(natsURL, ctrlURL, caFile string) {
|
||||
must(err, "A create room")
|
||||
fmt.Printf(" room.test -> %s (E2E, persisted, signed)\n", roomID)
|
||||
|
||||
// Membership-change contract (issue 0006e): A only became a member of this room
|
||||
// after connecting, so refresh to gain its subject + per-room JetStream API
|
||||
// under enforce+ACL before publishing.
|
||||
must(a.RefreshSession(), "A refresh after create room")
|
||||
|
||||
// A invites B (seals K to B's X25519 key).
|
||||
must(a.Invite(roomID, b.Endpoint()), "A invite B")
|
||||
|
||||
// B joins (fetches + decrypts K).
|
||||
must(b.Join(roomID), "B join")
|
||||
|
||||
// B became a member via the invite above; refresh so B can subscribe to the
|
||||
// room's subject under enforce+ACL (before subscribing — refresh drops subs).
|
||||
must(b.RefreshSession(), "B refresh after join")
|
||||
|
||||
// B subscribes; capture received plaintexts.
|
||||
recv := make(chan string, 4)
|
||||
subB, err := b.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
|
||||
|
||||
@@ -0,0 +1,221 @@
|
||||
package main
|
||||
|
||||
// Regression for audit report 0008, vector N3: the binary must wire the
|
||||
// replicated nonce store on a clustered node so a signed request accepted on one
|
||||
// node cannot be replayed to another. The auditor's ephemeral attack showed the
|
||||
// OLD binary never called UseReplicatedNonces (each node kept a per-process
|
||||
// cache), so a captured request replayed to a second node succeeded (200+200). These
|
||||
// tests drive the SAME helper the binary uses (wireReplicatedNonces) so they
|
||||
// prove the WIRING, not just the underlying API.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// freePort binds a loopback TCP listener on port 0, reads back the port that
// was assigned, and closes the listener on return. The test fails immediately
// if the listener cannot be opened.
func freePort(t *testing.T) int {
	t.Helper()
	ln, listenErr := net.Listen("tcp", "127.0.0.1:0")
	if listenErr != nil {
		t.Fatalf("free port: %v", listenErr)
	}
	defer ln.Close()
	tcpAddr := ln.Addr().(*net.TCPAddr)
	return tcpAddr.Port
}
|
||||
|
||||
// signed008 builds a transport-signed control-plane request with a caller-chosen
|
||||
// ts+nonce, so a test can reuse the exact same signed bytes against two nodes to
|
||||
// exercise replay.
|
||||
func signed008(t *testing.T, baseURL, method, path string, body []byte, id cs.Identity, ts int64, nonce string) *http.Request {
|
||||
t.Helper()
|
||||
canonical := membership.CanonicalRequest(method, path, strconv.FormatInt(ts, 10), nonce, body)
|
||||
sig := cs.SignEd25519(id.SignPriv, canonical)
|
||||
var rdr io.Reader
|
||||
if body != nil {
|
||||
rdr = bytes.NewReader(body)
|
||||
}
|
||||
req, err := http.NewRequest(method, baseURL+path, rdr)
|
||||
if err != nil {
|
||||
t.Fatalf("new request: %v", err)
|
||||
}
|
||||
req.Header.Set("X-Unibus-Pub", hex.EncodeToString(id.SignPub))
|
||||
req.Header.Set("X-Unibus-Ts", strconv.FormatInt(ts, 10))
|
||||
req.Header.Set("X-Unibus-Nonce", nonce)
|
||||
req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig))
|
||||
return req
|
||||
}
|
||||
|
||||
// randNonce reads 16 bytes from crypto/rand and returns them in standard base64.
// The test fails immediately if the random source returns an error.
func randNonce(t *testing.T) string {
	t.Helper()
	var buf [16]byte
	_, readErr := rand.Read(buf[:])
	if readErr != nil {
		t.Fatalf("nonce: %v", readErr)
	}
	return base64.StdEncoding.EncodeToString(buf[:])
}
|
||||
|
||||
// TestAttack0008_N3 is the blocker regression: two clustered membershipd nodes
|
||||
// wired through wireReplicatedNonces share a JetStream KV nonce bucket, so a
|
||||
// request accepted on node A is rejected (401) when replayed to node B. Before
|
||||
// the fix the binary never wired this and the replay returned 200.
|
||||
func TestAttack0008_N3(t *testing.T) {
|
||||
// One NATS+JetStream backing the shared nonce bucket (no client auth needed:
|
||||
// the test drives the membership.Server's nonce store directly via HTTP).
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
nc, err := nats.Connect(ns.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("connect: %v", err)
|
||||
}
|
||||
t.Cleanup(nc.Close)
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatalf("jetstream: %v", err)
|
||||
}
|
||||
|
||||
// Shared control-plane state (stand-in for the replicated store) + two nodes.
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
alice, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity: %v", err)
|
||||
}
|
||||
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleAdmin); err != nil {
|
||||
t.Fatalf("add alice: %v", err)
|
||||
}
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
|
||||
// Each node is wired EXACTLY as the binary wires a clustered node.
|
||||
mkNode := func() *httptest.Server {
|
||||
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
|
||||
if err := wireReplicatedNonces(srv, js, true /*clustered*/, 1); err != nil {
|
||||
t.Fatalf("wireReplicatedNonces: %v", err)
|
||||
}
|
||||
return httptest.NewServer(srv)
|
||||
}
|
||||
nodeA := mkNode()
|
||||
t.Cleanup(nodeA.Close)
|
||||
nodeB := mkNode()
|
||||
t.Cleanup(nodeB.Close)
|
||||
|
||||
ts := time.Now().Unix()
|
||||
nonce := randNonce(t)
|
||||
path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms"
|
||||
|
||||
// Golden: alice's signed request is accepted on node A.
|
||||
respA, err := http.DefaultClient.Do(signed008(t, nodeA.URL, "GET", path, nil, alice, ts, nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("do A: %v", err)
|
||||
}
|
||||
respA.Body.Close()
|
||||
if respA.StatusCode != http.StatusOK {
|
||||
t.Fatalf("node A first use: status %d, want 200", respA.StatusCode)
|
||||
}
|
||||
|
||||
// Error path (the attack): replay the SAME signed bytes to node B → 401.
|
||||
respB, err := http.DefaultClient.Do(signed008(t, nodeB.URL, "GET", path, nil, alice, ts, nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("do B: %v", err)
|
||||
}
|
||||
respB.Body.Close()
|
||||
if respB.StatusCode != http.StatusUnauthorized {
|
||||
t.Fatalf("cross-node replay to node B: status %d, want 401 (replayed nonce must be rejected)", respB.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAttack0008_N3_StandaloneKeepsLocalCache is the edge: a NON-clustered node
|
||||
// must NOT require JetStream — wireReplicatedNonces is a no-op and the node keeps
|
||||
// its in-memory cache, which still rejects a same-node replay (the single-node
|
||||
// guarantee is unchanged). This proves the fix does not add a JetStream
|
||||
// dependency to standalone deployments.
|
||||
func TestAttack0008_N3_StandaloneKeepsLocalCache(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
alice, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity: %v", err)
|
||||
}
|
||||
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleAdmin); err != nil {
|
||||
t.Fatalf("add alice: %v", err)
|
||||
}
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
|
||||
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
|
||||
// Standalone: clustered=false, js=nil. Must succeed (no JetStream needed).
|
||||
if err := wireReplicatedNonces(srv, nil, false /*clustered*/, 1); err != nil {
|
||||
t.Fatalf("standalone wireReplicatedNonces must be a no-op, got: %v", err)
|
||||
}
|
||||
node := httptest.NewServer(srv)
|
||||
t.Cleanup(node.Close)
|
||||
|
||||
ts := time.Now().Unix()
|
||||
nonce := randNonce(t)
|
||||
path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms"
|
||||
|
||||
resp1, err := http.DefaultClient.Do(signed008(t, node.URL, "GET", path, nil, alice, ts, nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("do 1: %v", err)
|
||||
}
|
||||
resp1.Body.Close()
|
||||
if resp1.StatusCode != http.StatusOK {
|
||||
t.Fatalf("first use: status %d, want 200", resp1.StatusCode)
|
||||
}
|
||||
// Same-node replay is still rejected by the in-memory cache.
|
||||
resp2, err := http.DefaultClient.Do(signed008(t, node.URL, "GET", path, nil, alice, ts, nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("do 2: %v", err)
|
||||
}
|
||||
resp2.Body.Close()
|
||||
if resp2.StatusCode != http.StatusUnauthorized {
|
||||
t.Fatalf("same-node replay: status %d, want 401", resp2.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAttack0008_N3_ClusteredRequiresJetStream proves the hard rule: a clustered
|
||||
// node with NO JetStream available refuses (error), so the binary fails fast
|
||||
// instead of silently running with a per-process cache.
|
||||
func TestAttack0008_N3_ClusteredRequiresJetStream(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
|
||||
if err := wireReplicatedNonces(srv, nil, true /*clustered*/, 1); err == nil {
|
||||
t.Fatalf("clustered node with no JetStream must fail, got nil")
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,8 @@ package main
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
@@ -21,6 +23,74 @@ func splitRoutes(csv string) []string {
|
||||
return out
|
||||
}
|
||||
|
||||
// resolveClusterPass resolves the cluster route secret WITHOUT leaking it through
// argv (audit 0008 N1-low: --cluster-pass in argv is visible in ps/journald).
//
// Precedence:
//  1. passFile (--cluster-pass-file): the file is read and surrounding
//     whitespace is trimmed. A read failure, or a file that is empty after
//     trimming, is an error — an operator who points at a secret file expects a
//     secret, and silently continuing with "" would hide the misconfiguration.
//  2. env: the value of UNIBUS_CLUSTER_PASS, injected by the caller (the
//     os.Getenv result) so the function stays testable. Used verbatim.
//  3. passFlag: the legacy --cluster-pass flag (argv-visible, kept for
//     dev/compat). Used verbatim.
//
// It returns the secret and a short source label for logging ("file", "env",
// "flag", or "none" when nothing was supplied) — never log the secret itself.
func resolveClusterPass(passFlag, passFile, env string) (secret, source string, err error) {
	if passFile != "" {
		raw, rerr := os.ReadFile(passFile)
		if rerr != nil {
			return "", "", fmt.Errorf("read --cluster-pass-file %q: %w", passFile, rerr)
		}
		trimmed := strings.TrimSpace(string(raw))
		if trimmed == "" {
			return "", "", fmt.Errorf("--cluster-pass-file %q is empty", passFile)
		}
		return trimmed, "file", nil
	}
	switch {
	case env != "":
		return env, "env", nil
	case passFlag != "":
		return passFlag, "flag", nil
	}
	return "", "none", nil
}
|
||||
|
||||
// injectRouteCreds rewrites each route URL that carries NO userinfo to embed
// user:pass, so the cluster secret is supplied once (via file/env) instead of
// repeated in every --routes argv entry where ps/journald would expose it.
//
//   - With an empty user it is a no-op: routes is returned unchanged.
//   - A route that already carries userinfo is left untouched (operator
//     override).
//   - A route that fails to parse, or that parses without a host, is an error
//     (configuration bug) rather than a silently dropped or unauthenticated
//     peer. The host check matters because a schemeless "host:6222" parses as
//     scheme "host" + opaque "6222"; such a URL serializes without userinfo, so
//     the credentials would be silently discarded.
//
// On success the returned slice has one re-serialized URL per input route, in
// the same order.
func injectRouteCreds(routes []string, user, pass string) ([]string, error) {
	if user == "" {
		return routes, nil
	}
	out := make([]string, 0, len(routes))
	for _, r := range routes {
		u, err := url.Parse(r)
		if err != nil {
			return nil, fmt.Errorf("parse route %q: %w", r, err)
		}
		if u.Host == "" {
			return nil, fmt.Errorf("parse route %q: missing host (want scheme://host:port)", r)
		}
		if u.User == nil {
			u.User = url.UserPassword(user, pass)
		}
		out = append(out, u.String())
	}
	return out, nil
}
|
||||
|
||||
// isLoopbackURL reports whether a NATS url targets this host only (loopback). It
// guards migrate-to-kv (audit 0008 N6): pushing the allowlist to a REMOTE NATS
// without TLS would send handles/roles/sign-pubs in cleartext, so a remote
// target must be TLS-pinned (--ca).
//
// The host counts as loopback when it is exactly "localhost" or an IP literal
// in a loopback range. Anything that cannot be classified — a parse failure, an
// empty host, any other hostname — is treated as NON-loopback (conservative: it
// then requires --ca).
func isLoopbackURL(natsURL string) bool {
	parsed, parseErr := url.Parse(natsURL)
	if parseErr != nil {
		return false
	}
	hostname := parsed.Hostname()
	if hostname == "localhost" {
		return true
	}
	// An empty or non-IP hostname makes ParseIP return nil, which lands on the
	// conservative "not loopback" answer below.
	if addr := net.ParseIP(hostname); addr != nil {
		return addr.IsLoopback()
	}
	return false
}
|
||||
|
||||
// isLoopbackBind reports whether the --bind value keeps the service reachable
|
||||
// only from this host. An empty bind means "all interfaces" (public), and a
|
||||
// hostname we cannot resolve to a loopback literal is treated as public — the
|
||||
@@ -83,7 +153,17 @@ func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey s
|
||||
// The three route-TLS paths are all-or-nothing (mutual TLS needs the node cert,
|
||||
// its key, and the CA together), independent of the bind, so a partial TLS
|
||||
// config never silently degrades to plaintext routes.
|
||||
func validateClusterConfig(clusterName, bind, user, pass, rtCert, rtKey, rtCA string) error {
|
||||
//
|
||||
// Homogeneous posture (issue 0006d, audit 0008 N1): a cluster is only as secure
|
||||
// as its weakest node — the data plane forwards every subject between nodes, so a
|
||||
// single node running without enforced auth lets an unauthenticated peer
|
||||
// Subscribe(">") on it and harvest the traffic forwarded from the ACL'd nodes.
|
||||
// This node therefore REFUSES to join a cluster unless it runs --bus-auth enforce,
|
||||
// regardless of bind: a clustered node is a production node, and there is no safe
|
||||
// "dev cluster without auth". (A peer running a tampered binary is out of this
|
||||
// node's control; /healthz exposes each node's posture so a monitor can detect
|
||||
// one that is not enforce+ACL — see Server.Posture.)
|
||||
func validateClusterConfig(clusterName, bind, user, pass, rtCert, rtKey, rtCA string, mode membership.AuthMode) error {
|
||||
rtAny := rtCert != "" || rtKey != "" || rtCA != ""
|
||||
rtAll := rtCert != "" && rtKey != "" && rtCA != ""
|
||||
if rtAny && !rtAll {
|
||||
@@ -93,6 +173,13 @@ func validateClusterConfig(clusterName, bind, user, pass, rtCert, rtKey, rtCA st
|
||||
if clusterName == "" {
|
||||
return nil // standalone: no route layer to secure
|
||||
}
|
||||
// A clustered node MUST enforce auth (homogeneous posture). Checked before the
|
||||
// loopback shortcut so even a loopback cluster cannot form without enforce.
|
||||
if mode != membership.AuthEnforce {
|
||||
return fmt.Errorf(
|
||||
"refusing to start: cluster %q requires --bus-auth enforce; a cluster node without enforced auth+ACL lets an unauthenticated peer harvest the traffic forwarded from the other nodes (audit 0008 N1) — every node must run the same enforce+ACL+TLS posture",
|
||||
clusterName)
|
||||
}
|
||||
if isLoopbackBind(bind) {
|
||||
return nil // loopback cluster is dev-only and unreachable from outside
|
||||
}
|
||||
|
||||
@@ -108,31 +108,40 @@ func TestBootConfigPolicy(t *testing.T) {
|
||||
// route-TLS flags are all-or-nothing regardless of bind.
|
||||
func TestClusterConfigPolicy(t *testing.T) {
|
||||
const c, k, ca = "node.crt", "node.key", "ca.crt"
|
||||
en := membership.AuthEnforce
|
||||
off := membership.AuthOff
|
||||
soft := membership.AuthSoft
|
||||
cases := []struct {
|
||||
name string
|
||||
clusterName, bind string
|
||||
user, pass string
|
||||
rtCert, rtKey, rtCA string
|
||||
wantErr bool
|
||||
name string
|
||||
clusterName, bind string
|
||||
user, pass string
|
||||
rtCert, rtKey, rtCA string
|
||||
mode membership.AuthMode
|
||||
wantErr bool
|
||||
}{
|
||||
// Standalone (no cluster name) is always allowed, even on a public bind.
|
||||
{"standalone-public", "", "0.0.0.0", "", "", "", "", "", false},
|
||||
// Loopback dev cluster: unguarded (unreachable from outside).
|
||||
{"loopback-cluster-bare", "unibus", "127.0.0.1", "", "", "", "", "", false},
|
||||
// Golden: full public HA config.
|
||||
{"public-full", "unibus", "0.0.0.0", "u", "p", c, k, ca, false},
|
||||
// Error: public cluster without a route secret.
|
||||
{"public-no-secret", "unibus", "0.0.0.0", "", "", c, k, ca, true},
|
||||
{"public-half-secret", "unibus", "0.0.0.0", "u", "", c, k, ca, true},
|
||||
// Standalone (no cluster name) is always allowed, even on a public bind and
|
||||
// without enforce — the cluster posture rule does not apply to a single node.
|
||||
{"standalone-public-off", "", "0.0.0.0", "", "", "", "", "", off, false},
|
||||
// Loopback dev cluster WITH enforce: allowed (unreachable from outside).
|
||||
{"loopback-cluster-enforce", "unibus", "127.0.0.1", "", "", "", "", "", en, false},
|
||||
// Golden: full public HA config under enforce.
|
||||
{"public-full-enforce", "unibus", "0.0.0.0", "u", "p", c, k, ca, en, false},
|
||||
// N1 (audit 0008): a clustered node WITHOUT enforce is refused — even on
|
||||
// loopback — so no weak node can join the cluster.
|
||||
{"cluster-off-refused", "unibus", "127.0.0.1", "", "", "", "", "", off, true},
|
||||
{"cluster-soft-refused", "unibus", "0.0.0.0", "u", "p", c, k, ca, soft, true},
|
||||
// Error: public cluster without a route secret (enforce on, fails on secret).
|
||||
{"public-no-secret", "unibus", "0.0.0.0", "", "", c, k, ca, en, true},
|
||||
{"public-half-secret", "unibus", "0.0.0.0", "u", "", c, k, ca, en, true},
|
||||
// Error: public cluster without mutual route TLS.
|
||||
{"public-no-tls", "unibus", "10.0.0.1", "u", "p", "", "", "", true},
|
||||
// Error: partial route-TLS flags trip regardless of bind.
|
||||
{"loopback-partial-tls", "unibus", "127.0.0.1", "", "", c, "", "", true},
|
||||
{"standalone-partial-tls", "", "127.0.0.1", "", "", c, k, "", true},
|
||||
{"public-no-tls", "unibus", "10.0.0.1", "u", "p", "", "", "", en, true},
|
||||
// Error: partial route-TLS flags trip regardless of bind/mode.
|
||||
{"loopback-partial-tls", "unibus", "127.0.0.1", "", "", c, "", "", en, true},
|
||||
{"standalone-partial-tls", "", "127.0.0.1", "", "", c, k, "", off, true},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := validateClusterConfig(tc.clusterName, tc.bind, tc.user, tc.pass, tc.rtCert, tc.rtKey, tc.rtCA)
|
||||
err := validateClusterConfig(tc.clusterName, tc.bind, tc.user, tc.pass, tc.rtCert, tc.rtKey, tc.rtCA, tc.mode)
|
||||
if tc.wantErr && err == nil {
|
||||
t.Fatalf("cluster config %+v should be refused", tc)
|
||||
}
|
||||
@@ -143,6 +152,22 @@ func TestClusterConfigPolicy(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestAttack0008_N1 is the regression for audit 0008 N1 scenario 2: a node
|
||||
// configured to join a cluster while NOT enforcing auth (the weak node that lets
|
||||
// an unauthenticated peer harvest the cluster's forwarded traffic) must be refused
|
||||
// at startup. The homogeneous-posture rule makes this binary unable to BE that
|
||||
// weak node.
|
||||
func TestAttack0008_N1(t *testing.T) {
|
||||
// Weak node: clustered but --bus-auth off -> refused.
|
||||
if err := validateClusterConfig("unibus", "0.0.0.0", "u", "p", "n.crt", "n.key", "ca.crt", membership.AuthOff); err == nil {
|
||||
t.Fatalf("a clustered node without enforce must be refused (audit 0008 N1)")
|
||||
}
|
||||
// Same node WITH enforce + full route security -> allowed.
|
||||
if err := validateClusterConfig("unibus", "0.0.0.0", "u", "p", "n.crt", "n.key", "ca.crt", membership.AuthEnforce); err != nil {
|
||||
t.Fatalf("a clustered enforce node with full route security must be allowed, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitRoutes(t *testing.T) {
|
||||
cases := []struct {
|
||||
in string
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
)
|
||||
|
||||
// connectInternalJS opens a privileged JetStream client from membershipd to its
|
||||
// OWN embedded NATS server. This is the resolution of the "bootstrap cycle"
|
||||
// (issue 0006a/c): the service needs JetStream to create the replicated nonce
|
||||
// bucket and the control-plane KV, but under enforce the data plane only accepts
|
||||
// allowlisted clients confined to their rooms. The connection therefore
|
||||
// authenticates with the process's ephemeral internal identity — the identity the
|
||||
// authenticator was built to recognize (NewNkeyAuthenticatorACLInternal) and
|
||||
// grant full permissions — without ever appearing in the user allowlist.
|
||||
//
|
||||
// It uses the in-process transport (nats.InProcessServer), a Go pipe inside the
|
||||
// process, so it bypasses TLS entirely: no CA wiring is needed for this
|
||||
// self-connection even when the public data plane is TLS-only. useNkey mirrors
|
||||
// whether the embedded server enforces auth: under enforce the internal identity
|
||||
// presents its nkey; without enforce the server accepts an unauthenticated
|
||||
// in-process client and the nkey is omitted.
|
||||
//
|
||||
// The caller owns the returned connection and must Close it on shutdown (after
|
||||
// the JetStream context is no longer used).
|
||||
func connectInternalJS(ns *server.Server, internalID cs.Identity, useNkey bool) (*nats.Conn, jetstream.JetStream, error) {
|
||||
opts := []nats.Option{
|
||||
nats.Name("membershipd-internal"),
|
||||
nats.InProcessServer(ns),
|
||||
}
|
||||
if useNkey {
|
||||
pub, sign, err := busauth.ClientNkey(internalID.SignPriv)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("internal nkey: %w", err)
|
||||
}
|
||||
opts = append(opts, nats.Nkey(pub, sign))
|
||||
}
|
||||
// The URL is ignored for an in-process connection; the InProcessServer option
|
||||
// supplies the transport.
|
||||
nc, err := nats.Connect("", opts...)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("connect internal nats: %w", err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, nil, fmt.Errorf("internal jetstream: %w", err)
|
||||
}
|
||||
return nc, js, nil
|
||||
}
|
||||
|
||||
// connectExternalJS opens a JetStream client to an EXTERNAL NATS the operator
|
||||
// runs (membershipd started with --nats-url). Unlike the embedded path there is
|
||||
// no in-process transport and no internal identity: the external server enforces
|
||||
// its own auth, so membershipd connects as a plain client (optionally TLS-pinned
|
||||
// to the bus CA). It is best-effort and intended for an operator-managed cluster;
|
||||
// the standard unibus deploy uses the embedded server (connectInternalJS).
|
||||
func connectExternalJS(natsURL, caPath string) (*nats.Conn, jetstream.JetStream, error) {
|
||||
opts := []nats.Option{nats.Name("membershipd-internal")}
|
||||
if caPath != "" {
|
||||
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("load CA %q: %w", caPath, err)
|
||||
}
|
||||
opts = append(opts, nats.Secure(tlsCfg))
|
||||
}
|
||||
nc, err := nats.Connect(natsURL, opts...)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("connect external nats %q: %w", natsURL, err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, nil, fmt.Errorf("external jetstream: %w", err)
|
||||
}
|
||||
return nc, js, nil
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
package main
|
||||
|
||||
// Bootstrap test for issue 0006a/c: under enforce, membershipd must still reach
|
||||
// JetStream on its OWN embedded server to create the nonce/KV buckets. It does so
|
||||
// with an ephemeral internal identity the authenticator grants full permissions
|
||||
// (NewNkeyAuthenticatorACLInternal). These tests prove that privileged
|
||||
// self-connection works AND that no other identity can claim it.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// icFreePort binds a loopback TCP listener on port 0, reads back the port that
// was assigned, and closes the listener on return. The test fails immediately
// if the listener cannot be opened.
func icFreePort(t *testing.T) int {
	t.Helper()
	ln, listenErr := net.Listen("tcp", "127.0.0.1:0")
	if listenErr != nil {
		t.Fatalf("free port: %v", listenErr)
	}
	defer ln.Close()
	tcpAddr := ln.Addr().(*net.TCPAddr)
	return tcpAddr.Port
}
|
||||
|
||||
// TestInternalConnPrivilegedUnderEnforce: with an enforce authenticator that
|
||||
// authorizes NO bus user, the internal identity still connects in-process and has
|
||||
// full permissions — it creates a KV bucket and round-trips a value. This is the
|
||||
// resolution of the bootstrap cycle the audit flagged as the reason the KV store
|
||||
// was never wired.
|
||||
func TestInternalConnPrivilegedUnderEnforce(t *testing.T) {
|
||||
internalID, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("internal identity: %v", err)
|
||||
}
|
||||
internalPubHex := hex.EncodeToString(internalID.SignPub)
|
||||
|
||||
// Authenticator: no bus user is authorized; only the internal identity passes.
|
||||
auth := busauth.NewNkeyAuthenticatorACLInternal(
|
||||
func(string) bool { return false },
|
||||
busauth.PermissionsFromSubjects(func(string) ([]string, error) { return []string{"_INBOX.>"}, nil }),
|
||||
internalPubHex,
|
||||
)
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: icFreePort(t), Auth: auth,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
|
||||
nc, js, err := connectInternalJS(ns, internalID, true /*useNkey*/)
|
||||
if err != nil {
|
||||
t.Fatalf("connectInternalJS: %v", err)
|
||||
}
|
||||
t.Cleanup(nc.Close)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KV_UNIBUS_test", Replicas: 1})
|
||||
if err != nil {
|
||||
t.Fatalf("internal conn could not create KV bucket (full perms expected): %v", err)
|
||||
}
|
||||
if _, err := kv.Put(ctx, "k", []byte("v")); err != nil {
|
||||
t.Fatalf("kv put: %v", err)
|
||||
}
|
||||
e, err := kv.Get(ctx, "k")
|
||||
if err != nil || string(e.Value()) != "v" {
|
||||
t.Fatalf("kv get: val=%q err=%v", e, err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInternalConnOutsiderRejected: an identity that is neither the internal one
|
||||
// nor an allowlisted bus user cannot connect — proving the internal bypass is
|
||||
// scoped to the exact internal key, not a blanket hole.
|
||||
func TestInternalConnOutsiderRejected(t *testing.T) {
|
||||
internalID, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("internal identity: %v", err)
|
||||
}
|
||||
auth := busauth.NewNkeyAuthenticatorACLInternal(
|
||||
func(string) bool { return false },
|
||||
busauth.PermissionsFromSubjects(func(string) ([]string, error) { return []string{"_INBOX.>"}, nil }),
|
||||
hex.EncodeToString(internalID.SignPub),
|
||||
)
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: icFreePort(t), Auth: auth,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
|
||||
outsider, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("outsider identity: %v", err)
|
||||
}
|
||||
pub, sign, err := busauth.ClientNkey(outsider.SignPriv)
|
||||
if err != nil {
|
||||
t.Fatalf("outsider nkey: %v", err)
|
||||
}
|
||||
conn, err := nats.Connect(ns.ClientURL(),
|
||||
nats.Nkey(pub, sign),
|
||||
nats.MaxReconnects(0),
|
||||
nats.Timeout(2*time.Second),
|
||||
)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
t.Fatalf("outsider (unauthorized, non-internal) must be rejected, but connected")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,154 @@
|
||||
package main
|
||||
|
||||
// Wiring tests for issue 0006c: --store kv selects the replicated JetStream KV
|
||||
// control plane, the authenticator serves from it through the storeHolder, and a
|
||||
// new node sees state created by another (the divergence that per-node SQLite
|
||||
// caused — audit 0008 N5 — is gone). Branch-by-abstraction is verified elsewhere
|
||||
// (the SQLite default path is the unchanged baseline covered by the existing
|
||||
// suite).
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// TestKVStoreBootstrapUnderEnforce drives the exact decentralized boot the binary
|
||||
// performs: build the authenticator over an empty holder, start NATS, open the
|
||||
// privileged internal connection, open the KV store, publish it into the holder,
|
||||
// then a real bus user (seeded into the KV store) authenticates over nkey. This
|
||||
// proves the bootstrap cycle is broken correctly — the KV-backed control plane
|
||||
// authorizes live clients under enforce.
|
||||
func TestKVStoreBootstrapUnderEnforce(t *testing.T) {
|
||||
internalID, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("internal identity: %v", err)
|
||||
}
|
||||
holder := &storeHolder{}
|
||||
auth := busauth.NewNkeyAuthenticatorACLInternal(
|
||||
holder.IsAuthorized,
|
||||
busauth.PermissionsFromSubjects(holder.subjectACL),
|
||||
hex.EncodeToString(internalID.SignPub),
|
||||
)
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
|
||||
// Privileged internal connection opens the KV store while the holder still
|
||||
// denies every normal client.
|
||||
intNC, js, err := connectInternalJS(ns, internalID, true)
|
||||
if err != nil {
|
||||
t.Fatalf("connectInternalJS: %v", err)
|
||||
}
|
||||
t.Cleanup(intNC.Close)
|
||||
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
|
||||
if err != nil {
|
||||
t.Fatalf("open kv store: %v", err)
|
||||
}
|
||||
holder.set(kvStore)
|
||||
|
||||
// Seed a bus user into the KV control plane.
|
||||
alice, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("alice: %v", err)
|
||||
}
|
||||
if err := kvStore.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleMember); err != nil {
|
||||
t.Fatalf("seed alice: %v", err)
|
||||
}
|
||||
|
||||
// alice authenticates over nkey — authorized via the KV store through the holder.
|
||||
pub, sign, err := busauth.ClientNkey(alice.SignPriv)
|
||||
if err != nil {
|
||||
t.Fatalf("alice nkey: %v", err)
|
||||
}
|
||||
aliceNC, err := nats.Connect(ns.ClientURL(), nats.Nkey(pub, sign), nats.MaxReconnects(0), nats.Timeout(2*time.Second))
|
||||
if err != nil {
|
||||
t.Fatalf("alice (KV-authorized) must connect under enforce: %v", err)
|
||||
}
|
||||
aliceNC.Close()
|
||||
|
||||
// An outsider not in the KV store is denied (fail closed).
|
||||
outsider, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("outsider: %v", err)
|
||||
}
|
||||
opub, osign, err := busauth.ClientNkey(outsider.SignPriv)
|
||||
if err != nil {
|
||||
t.Fatalf("outsider nkey: %v", err)
|
||||
}
|
||||
if oc, err := nats.Connect(ns.ClientURL(), nats.Nkey(opub, osign), nats.MaxReconnects(0), nats.Timeout(2*time.Second)); err == nil {
|
||||
oc.Close()
|
||||
t.Fatalf("an outsider absent from the KV store must be rejected")
|
||||
}
|
||||
}
|
||||
|
||||
// TestKVStoreDecentralizedConsistency: a room/user created via one node's KV store
|
||||
// is immediately visible to another node's KV store over the same JetStream — the
|
||||
// shared, replicated control plane that ends the per-node SQLite divergence.
|
||||
func TestKVStoreDecentralizedConsistency(t *testing.T) {
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
|
||||
open := func() membership.Store {
|
||||
nc, err := nats.Connect(ns.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("connect: %v", err)
|
||||
}
|
||||
t.Cleanup(nc.Close)
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatalf("jetstream: %v", err)
|
||||
}
|
||||
st, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
|
||||
if err != nil {
|
||||
t.Fatalf("open kv: %v", err)
|
||||
}
|
||||
return st
|
||||
}
|
||||
nodeA := open()
|
||||
nodeB := open()
|
||||
|
||||
owner, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("owner: %v", err)
|
||||
}
|
||||
ownerPub := hex.EncodeToString(owner.SignPub)
|
||||
if err := nodeA.AddUser(ownerPub, "owner", membership.RoleAdmin); err != nil {
|
||||
t.Fatalf("nodeA add user: %v", err)
|
||||
}
|
||||
if err := nodeA.CreateRoom(
|
||||
membership.RoomInfo{RoomID: "ROOMX", Subject: "room.shared.x", OwnerEndpoint: "owner-ep"},
|
||||
owner.SignPub, owner.KexPub, nil,
|
||||
); err != nil {
|
||||
t.Fatalf("nodeA create room: %v", err)
|
||||
}
|
||||
|
||||
// nodeB (a different connection, same buckets) sees both immediately.
|
||||
if !nodeB.IsAuthorized(ownerPub) {
|
||||
t.Fatalf("nodeB must see the user created on nodeA (decentralized state divergence)")
|
||||
}
|
||||
got, err := nodeB.GetRoom("ROOMX")
|
||||
if err != nil {
|
||||
t.Fatalf("nodeB must see the room created on nodeA: %v", err)
|
||||
}
|
||||
if got.Subject != "room.shared.x" {
|
||||
t.Fatalf("nodeB read wrong room subject: %q", got.Subject)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestResolveClusterPass verifies the secret resolution precedence
|
||||
// (file > env > flag) that keeps the cluster password out of argv (issue 0006f).
|
||||
func TestResolveClusterPass(t *testing.T) {
|
||||
// file wins over env and flag, and is trimmed.
|
||||
f := filepath.Join(t.TempDir(), "pass")
|
||||
if err := os.WriteFile(f, []byte("filesecret\n"), 0o600); err != nil {
|
||||
t.Fatalf("write: %v", err)
|
||||
}
|
||||
if got, src, err := resolveClusterPass("flagsecret", f, "envsecret"); err != nil || got != "filesecret" || src != "file" {
|
||||
t.Fatalf("file precedence: got %q src %q err %v", got, src, err)
|
||||
}
|
||||
// env wins over flag when no file.
|
||||
if got, src, err := resolveClusterPass("flagsecret", "", "envsecret"); err != nil || got != "envsecret" || src != "env" {
|
||||
t.Fatalf("env precedence: got %q src %q err %v", got, src, err)
|
||||
}
|
||||
// flag is the last resort.
|
||||
if got, src, err := resolveClusterPass("flagsecret", "", ""); err != nil || got != "flagsecret" || src != "flag" {
|
||||
t.Fatalf("flag fallback: got %q src %q err %v", got, src, err)
|
||||
}
|
||||
// none set.
|
||||
if got, src, err := resolveClusterPass("", "", ""); err != nil || got != "" || src != "none" {
|
||||
t.Fatalf("none: got %q src %q err %v", got, src, err)
|
||||
}
|
||||
// missing file is an error.
|
||||
if _, _, err := resolveClusterPass("", filepath.Join(t.TempDir(), "nope"), ""); err == nil {
|
||||
t.Fatalf("missing file must error")
|
||||
}
|
||||
}
|
||||
|
||||
// TestInjectRouteCreds verifies the secret is injected only into routes that omit
|
||||
// userinfo, so --routes argv need not carry the password (issue 0006f).
|
||||
func TestInjectRouteCreds(t *testing.T) {
|
||||
in := []string{"nats://10.0.0.2:6250", "nats://override:pw@10.0.0.3:6250"}
|
||||
out, err := injectRouteCreds(in, "user", "secret")
|
||||
if err != nil {
|
||||
t.Fatalf("inject: %v", err)
|
||||
}
|
||||
if !strings.Contains(out[0], "user:secret@10.0.0.2:6250") {
|
||||
t.Fatalf("creds not injected into bare route: %q", out[0])
|
||||
}
|
||||
if !strings.Contains(out[1], "override:pw@10.0.0.3:6250") {
|
||||
t.Fatalf("existing userinfo must be preserved: %q", out[1])
|
||||
}
|
||||
// empty user is a no-op.
|
||||
noop, err := injectRouteCreds(in, "", "")
|
||||
if err != nil || noop[0] != in[0] {
|
||||
t.Fatalf("empty user must be a no-op: %v %q", err, noop[0])
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsLoopbackURL guards migrate-to-kv against pushing the allowlist cleartext
|
||||
// to a remote NATS (issue 0006f, audit 0008 N6).
|
||||
func TestIsLoopbackURL(t *testing.T) {
|
||||
loop := []string{"nats://127.0.0.1:4250", "nats://localhost:4250", "nats://[::1]:4250"}
|
||||
for _, u := range loop {
|
||||
if !isLoopbackURL(u) {
|
||||
t.Fatalf("%q should be loopback", u)
|
||||
}
|
||||
}
|
||||
remote := []string{"nats://10.0.0.2:4250", "nats://bus.example.com:4250", "::not-a-url"}
|
||||
for _, u := range remote {
|
||||
if isLoopbackURL(u) {
|
||||
t.Fatalf("%q should NOT be loopback", u)
|
||||
}
|
||||
}
|
||||
}
|
||||
+153
-14
@@ -7,6 +7,7 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/hex"
|
||||
"flag"
|
||||
"log"
|
||||
"net/http"
|
||||
@@ -15,6 +16,10 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
@@ -58,10 +63,26 @@ func main() {
|
||||
clusterPort = flag.Int("cluster-port", 6250, "route listener port for server-to-server cluster traffic")
|
||||
routesCSV = flag.String("routes", "", "comma-separated nats-route URLs of the OTHER nodes, e.g. nats://user:pass@10.0.0.2:6250")
|
||||
clusterUser = flag.String("cluster-user", "", "shared route secret username (gates the route listener)")
|
||||
clusterPass = flag.String("cluster-pass", "", "shared route secret password")
|
||||
clusterPass = flag.String("cluster-pass", "", "shared route secret password (argv-visible — prefer --cluster-pass-file or UNIBUS_CLUSTER_PASS)")
|
||||
// Secret out of argv (issue 0006f, audit 0008 N1-low): a password in
|
||||
// --cluster-pass / --routes is visible in ps/journald. Prefer a file or the
|
||||
// UNIBUS_CLUSTER_PASS env var; routes may then omit userinfo and the secret
|
||||
// is injected from here.
|
||||
clusterPassFile = flag.String("cluster-pass-file", "", "path to a file holding the cluster route password (preferred over --cluster-pass; keeps the secret out of argv)")
|
||||
routeTLSCert = flag.String("route-tls-cert", "", "this node's route certificate (CA-signed); enables mutual route TLS with --route-tls-key/--route-tls-ca")
|
||||
routeTLSKey = flag.String("route-tls-key", "", "this node's route private key")
|
||||
routeTLSCA = flag.String("route-tls-ca", "", "bus CA that signs every node's route certificate (deploy/tls/ca.crt)")
|
||||
// Replicated control plane (issue 0006a/c): the JetStream replication factor
|
||||
// for the shared nonce bucket (and, with --store kv, the control-plane KV).
|
||||
// 1 for a 1-2 node rollout, 3 for real HA quorum (raise in place with
|
||||
// `nats stream update --replicas 3` when the third node joins).
|
||||
kvReplicas = flag.Int("kv-replicas", 1, "JetStream replication factor for the shared nonce/KV buckets (1..3)")
|
||||
caFile = flag.String("ca", "", "bus CA cert; only used to pin TLS on the internal JetStream connection to an EXTERNAL --nats-url (the embedded server uses an in-process connection that needs no CA)")
|
||||
// Control-plane store backend (issue 0006c, feature flag decentralized):
|
||||
// "sqlite" (default) keeps the local single-node SQLite control plane;
|
||||
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
|
||||
// in the cluster serves the same state.
|
||||
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
@@ -69,6 +90,17 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatalf("%v", err)
|
||||
}
|
||||
if *storeBackend != "sqlite" && *storeBackend != "kv" {
|
||||
log.Fatalf("--store must be \"sqlite\" or \"kv\", got %q", *storeBackend)
|
||||
}
|
||||
|
||||
// Resolve the cluster route secret out of argv (file/env preferred). The
|
||||
// resolved value (not *clusterPass) is what guards the route layer and is
|
||||
// injected into peer route URLs below.
|
||||
clusterPassResolved, passSource, err := resolveClusterPass(*clusterPass, *clusterPassFile, os.Getenv("UNIBUS_CLUSTER_PASS"))
|
||||
if err != nil {
|
||||
log.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
// Fail-open guard (audit H2): a non-loopback bind, or any TLS flag, demands
|
||||
// --bus-auth enforce. This makes an insecure public startup impossible rather
|
||||
@@ -78,21 +110,62 @@ func main() {
|
||||
}
|
||||
// Cluster route guard (issue 0003a): a public cluster needs a route secret
|
||||
// and mutual route TLS, and the route-TLS flags are all-or-nothing.
|
||||
if err := validateClusterConfig(*clusterName, *bind, *clusterUser, *clusterPass, *routeTLSCert, *routeTLSKey, *routeTLSCA); err != nil {
|
||||
if err := validateClusterConfig(*clusterName, *bind, *clusterUser, clusterPassResolved, *routeTLSCert, *routeTLSKey, *routeTLSCA, authMode); err != nil {
|
||||
log.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
||||
log.SetPrefix("[membershipd] ")
|
||||
|
||||
// Control plane store first: the NATS authenticator consults IsAuthorized, so
|
||||
// the store must exist before the embedded server starts.
|
||||
store, err := membership.Open(*dbPath)
|
||||
if err != nil {
|
||||
log.Fatalf("open membership store: %v", err)
|
||||
// A clustered node shares its control plane with peers, so it needs a JetStream
|
||||
// client to manage the replicated nonce bucket (issue 0006a). --store kv (issue
|
||||
// 0006c) also needs JetStream, for the control-plane KV itself. A standalone
|
||||
// single-node SQLite deployment needs none of this and keeps the in-process,
|
||||
// in-memory behavior unchanged.
|
||||
clustered := *clusterName != ""
|
||||
decentralized := *storeBackend == "kv"
|
||||
needJS := clustered || decentralized
|
||||
enforce := authMode == membership.AuthEnforce
|
||||
|
||||
// Internal service identity (issue 0006a): when the embedded data plane enforces
|
||||
// auth, membershipd must still connect to its OWN server to manage JetStream.
|
||||
// It does so with this ephemeral identity, which the authenticator is built to
|
||||
// recognize and grant full permissions (it never enters the user allowlist). It
|
||||
// is only generated when actually needed (JetStream required AND enforce on AND
|
||||
// the server is embedded), so a standalone or non-enforce node is unchanged.
|
||||
var internalID cs.Identity
|
||||
var internalPubHex string
|
||||
if needJS && enforce && *natsURL == "" {
|
||||
internalID, err = cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
log.Fatalf("generate internal identity: %v", err)
|
||||
}
|
||||
internalPubHex = hex.EncodeToString(internalID.SignPub)
|
||||
}
|
||||
defer store.Close()
|
||||
log.Printf("membership store: %s", *dbPath)
|
||||
|
||||
// The authenticator consults the store through a holder so it can be built
|
||||
// before the store exists: with --store kv the JetStream KV store opens only
|
||||
// after NATS is up (the bootstrap cycle). In the default SQLite path the store
|
||||
// is opened and set into the holder right here, before the server starts, so
|
||||
// behavior is identical to the pre-0006c baseline. `store` is the final store
|
||||
// used by the HTTP server (set below for the KV path).
|
||||
holder := &storeHolder{}
|
||||
var store membership.Store
|
||||
if !decentralized {
|
||||
store, err = membership.Open(*dbPath)
|
||||
if err != nil {
|
||||
log.Fatalf("open membership store: %v", err)
|
||||
}
|
||||
holder.set(store)
|
||||
log.Printf("membership store: sqlite %s", *dbPath)
|
||||
}
|
||||
// Close whichever store ends up final (SQLite closes its file; the JetStream KV
|
||||
// store's Close is a no-op — its NATS connection is closed separately).
|
||||
defer func() {
|
||||
if store != nil {
|
||||
store.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
blobs, err := blobstore.New(*storeDir)
|
||||
if err != nil {
|
||||
@@ -118,14 +191,21 @@ func main() {
|
||||
}
|
||||
// Cluster (issue 0003a): with a cluster name, join the route layer for HA.
|
||||
if *clusterName != "" {
|
||||
// Inject the resolved secret into peer route URLs that omit userinfo, so
|
||||
// the password need not appear in --routes argv (issue 0006f).
|
||||
routes, rerr := injectRouteCreds(splitRoutes(*routesCSV), *clusterUser, clusterPassResolved)
|
||||
if rerr != nil {
|
||||
log.Fatalf("%v", rerr)
|
||||
}
|
||||
cc := &embeddednats.ClusterConfig{
|
||||
Name: *clusterName,
|
||||
Host: *bind,
|
||||
Port: *clusterPort,
|
||||
Routes: splitRoutes(*routesCSV),
|
||||
Routes: routes,
|
||||
Username: *clusterUser,
|
||||
Password: *clusterPass,
|
||||
Password: clusterPassResolved,
|
||||
}
|
||||
log.Printf("cluster route secret source: %s", passSource)
|
||||
if *routeTLSCert != "" {
|
||||
rtls, err := busauth.RouteTLSConfig(*routeTLSCert, *routeTLSKey, *routeTLSCA)
|
||||
if err != nil {
|
||||
@@ -145,9 +225,10 @@ func main() {
|
||||
// Subscribe(">") and harvest every room's subject and JetStream activity.
|
||||
// NATS freezes permissions at connect time, so a peer that joins a room
|
||||
// after connecting must client.RefreshSession to gain that room's subject.
|
||||
cfg.Auth = busauth.NewNkeyAuthenticatorACL(
|
||||
store.IsAuthorized,
|
||||
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
|
||||
cfg.Auth = busauth.NewNkeyAuthenticatorACLInternal(
|
||||
holder.IsAuthorized,
|
||||
busauth.PermissionsFromSubjects(holder.subjectACL),
|
||||
internalPubHex,
|
||||
)
|
||||
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
|
||||
}
|
||||
@@ -172,6 +253,38 @@ func main() {
|
||||
log.Printf("using external NATS: %s", natsClientURL)
|
||||
}
|
||||
|
||||
// JetStream client + decentralized store (issue 0006a/c). needJS is set for a
|
||||
// clustered node (shared nonce bucket) and for --store kv (the KV control
|
||||
// plane). Open the privileged JetStream client first (in-process for the
|
||||
// embedded server, a plain client for external NATS), then — for --store kv —
|
||||
// open the replicated KV store and publish it into the holder so the
|
||||
// authenticator and HTTP server serve from it. The privileged connection is the
|
||||
// only client that can connect in this window (the holder still denies everyone
|
||||
// else; the internal identity bypasses the store).
|
||||
var js jetstream.JetStream
|
||||
if needJS {
|
||||
var internalNC *nats.Conn
|
||||
if *natsURL == "" {
|
||||
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
|
||||
} else {
|
||||
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatalf("internal JetStream connection (required by --cluster-name/--store kv): %v", err)
|
||||
}
|
||||
defer internalNC.Close()
|
||||
|
||||
if decentralized {
|
||||
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: *kvReplicas})
|
||||
if err != nil {
|
||||
log.Fatalf("open decentralized control-plane KV store: %v", err)
|
||||
}
|
||||
store = kvStore
|
||||
holder.set(store)
|
||||
log.Printf("membership store: jetstream KV (replicas=%d)", *kvReplicas)
|
||||
}
|
||||
}
|
||||
|
||||
srv := membership.NewServer(store, blobs, authMode)
|
||||
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
|
||||
// has no per-subject ACL, so cleartext content would be readable by any
|
||||
@@ -181,6 +294,32 @@ func main() {
|
||||
srv.RequireEncryptedRooms = true
|
||||
log.Printf("cleartext rooms: DISABLED (public bind requires end-to-end encryption)")
|
||||
}
|
||||
// Publish this node's posture on /healthz so a monitor (or a peer) can detect a
|
||||
// cluster member not running the homogeneous enforce+ACL+TLS posture (audit
|
||||
// 0008 N1). enforce implies the per-subject ACL in this binary (they are wired
|
||||
// together above).
|
||||
srv.Posture = membership.Posture{
|
||||
Enforce: enforce,
|
||||
ACL: enforce,
|
||||
TLS: *tlsCert != "",
|
||||
Cluster: clustered,
|
||||
Store: *storeBackend,
|
||||
}
|
||||
|
||||
// Replicated anti-replay (issue 0006a, audit 0008 N3): a clustered node MUST
|
||||
// share its nonce store across the cluster, or a request accepted on one node
|
||||
// can be replayed to another. HARD requirement: if the bucket cannot be created
|
||||
// the node refuses to start rather than run with a per-process cache that leaves
|
||||
// the replay hole open.
|
||||
if needJS {
|
||||
if err := wireReplicatedNonces(srv, js, clustered, *kvReplicas); err != nil {
|
||||
log.Fatalf("%v", err)
|
||||
}
|
||||
if clustered {
|
||||
log.Printf("anti-replay: replicated nonce bucket \"KV_UNIBUS_nonces\" (replicas=%d) — cluster-safe", *kvReplicas)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("control-plane auth: %s", authMode)
|
||||
addr := *bind + ":" + *httpPort
|
||||
httpSrv := &http.Server{
|
||||
|
||||
@@ -33,6 +33,14 @@ func runMigrateCLI(args []string) {
|
||||
fmt.Fprintln(os.Stderr, "membershipd migrate-to-kv: --nats-url is required (the cluster to write the KV buckets into)")
|
||||
os.Exit(2)
|
||||
}
|
||||
// Confidentiality guard (issue 0006f, audit 0008 N6): the migration writes the
|
||||
// allowlist (handles, roles, signing pubkeys) into the KV. Against a REMOTE NATS
|
||||
// without TLS that metadata would travel in cleartext, so a remote target MUST
|
||||
// be TLS-pinned with --ca. A loopback target is local-only and exempt.
|
||||
if !isLoopbackURL(*natsURL) && *ca == "" {
|
||||
fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: refusing to migrate to remote %q without --ca; the allowlist (handles/roles/sign pubs) would travel in cleartext — pin TLS with --ca, or run against a loopback nats-url\n", *natsURL)
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
// Back up the SQLite database first so a botched migration can be undone.
|
||||
var backupPath string
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// storeHolder is a concurrency-safe slot for the control-plane store, used to
|
||||
// break the decentralized bootstrap cycle (issue 0006c): the NATS authenticator
|
||||
// must be built BEFORE the embedded server starts, but the JetStream KV store can
|
||||
// only be opened AFTER NATS is up (it needs a JetStream client). The authenticator
|
||||
// therefore consults the holder instead of a concrete store.
|
||||
//
|
||||
// Fail-closed by construction: until the store is set, IsAuthorized denies and
|
||||
// SubjectACL errors, so any client connecting in the startup window is rejected.
|
||||
// The only connection expected in that window is membershipd's own internal
|
||||
// service identity, which the authenticator recognizes by key and lets through
|
||||
// without consulting the store at all. In the SQLite (default) path the store is
|
||||
// set before StartServer, so the window does not exist and behavior is identical
|
||||
// to the pre-0006c baseline.
|
||||
type storeHolder struct {
|
||||
mu sync.RWMutex
|
||||
s membership.Store
|
||||
}
|
||||
|
||||
func (h *storeHolder) set(s membership.Store) {
|
||||
h.mu.Lock()
|
||||
h.s = s
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *storeHolder) get() membership.Store {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return h.s
|
||||
}
|
||||
|
||||
// IsAuthorized reports whether signPubHex is an active bus user, denying while the
|
||||
// store is not yet set (fail closed). It is the predicate the nkey authenticator
|
||||
// uses for every connecting client.
|
||||
func (h *storeHolder) IsAuthorized(signPubHex string) bool {
|
||||
s := h.get()
|
||||
if s == nil {
|
||||
return false
|
||||
}
|
||||
return s.IsAuthorized(signPubHex)
|
||||
}
|
||||
|
||||
// subjectACL derives the per-subject permissions for signPubHex via the live
|
||||
// store, erroring (so the caller fails closed and denies the connection) while the
|
||||
// store is not yet set.
|
||||
func (h *storeHolder) subjectACL(signPubHex string) ([]string, error) {
|
||||
s := h.get()
|
||||
if s == nil {
|
||||
return nil, fmt.Errorf("control-plane store not ready")
|
||||
}
|
||||
return membership.SubjectACLFor(s)(signPubHex)
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// TestStoreHolderFailClosed: an empty holder denies everything (the bootstrap
|
||||
// window before the store is set), and starts serving once a store is published.
|
||||
func TestStoreHolderFailClosed(t *testing.T) {
|
||||
h := &storeHolder{}
|
||||
|
||||
// Empty: deny + error (fail closed).
|
||||
if h.IsAuthorized("anything") {
|
||||
t.Fatalf("empty holder must deny IsAuthorized")
|
||||
}
|
||||
if _, err := h.subjectACL("anything"); err == nil {
|
||||
t.Fatalf("empty holder must error from subjectACL (fail closed)")
|
||||
}
|
||||
|
||||
// After set: serves from the real store.
|
||||
store, err := membership.Open(filepath.Join(t.TempDir(), "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
id, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity: %v", err)
|
||||
}
|
||||
pub := hex.EncodeToString(id.SignPub)
|
||||
if err := store.AddUser(pub, "alice", membership.RoleMember); err != nil {
|
||||
t.Fatalf("add user: %v", err)
|
||||
}
|
||||
h.set(store)
|
||||
|
||||
if !h.IsAuthorized(pub) {
|
||||
t.Fatalf("after set, an active user must be authorized")
|
||||
}
|
||||
if _, err := h.subjectACL(pub); err != nil {
|
||||
t.Fatalf("after set, subjectACL must succeed: %v", err)
|
||||
}
|
||||
if h.IsAuthorized("deadbeef") {
|
||||
t.Fatalf("a non-user must not be authorized")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// wireReplicatedNonces applies the cluster anti-replay policy to srv. It is the
|
||||
// single piece of wiring the binary uses to decide whether a node must share its
|
||||
// nonce store, extracted so a regression test exercises the EXACT decision the
|
||||
// running binary makes (issue 0006a, audit 0008 N3).
|
||||
//
|
||||
// Policy:
|
||||
// - A clustered node (clustered == true) MUST use the shared JetStream KV nonce
|
||||
// bucket. Every node sees the same bucket, so a request accepted on one node
|
||||
// cannot be replayed to another whose per-process cache never saw the nonce.
|
||||
// A missing JetStream context, or a failure to create the bucket, is a FATAL
|
||||
// configuration error returned to the caller — a clustered node running with a
|
||||
// per-process nonce cache is precisely the replay hole the audit flagged, so
|
||||
// it must refuse to start rather than serve insecurely.
|
||||
// - A standalone node (clustered == false) keeps the in-memory cache that
|
||||
// NewServer installed: there is no second node to replay to, so the shared
|
||||
// bucket would only add a JetStream dependency for no security gain.
|
||||
//
|
||||
// replicas is the nonce bucket's replication factor (R1..R3). Returns nil when no
|
||||
// action is required (standalone).
|
||||
func wireReplicatedNonces(srv *membership.Server, js jetstream.JetStream, clustered bool, replicas int) error {
|
||||
if !clustered {
|
||||
return nil // standalone: the in-memory nonce cache is sufficient and safe
|
||||
}
|
||||
if js == nil {
|
||||
return fmt.Errorf("clustered node requires JetStream for the shared nonce bucket, but none is available")
|
||||
}
|
||||
if err := srv.UseReplicatedNonces(js, replicas); err != nil {
|
||||
return fmt.Errorf("replicated nonces: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -47,6 +47,13 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatalf("create room: %v", err)
|
||||
}
|
||||
// Membership-change contract (issue 0006e): the bus freezes per-subject
|
||||
// permissions at connect time, and this room did not exist then. Refresh the
|
||||
// session so the new room's subject becomes publishable under enforce+ACL. On
|
||||
// an unsecured/dev bus this is a harmless reconnect.
|
||||
if err := c.RefreshSession(); err != nil {
|
||||
log.Fatalf("refresh session after create room: %v", err)
|
||||
}
|
||||
log.Printf("room %q -> %s (subject %s, cleartext)", *roomSub, roomID, *roomSub)
|
||||
|
||||
stop := make(chan os.Signal, 1)
|
||||
|
||||
@@ -65,3 +65,34 @@ curl -fsS http://<host-lan-ip>:8470/healthz
|
||||
- To run against an external NATS instead of the embedded one, append
|
||||
`--nats-url nats://<host>:4222` to `ExecStart` and re-run `daemon-reload` +
|
||||
`restart`.
|
||||
|
||||
## Clustering (HA) — see `deploy/cluster/`
|
||||
|
||||
The single-node service above is secure on its own. Running unibus as a
|
||||
multi-node **cluster** has extra hardening rules (issues 0006a–0006f); the full
|
||||
runbook and the generated material live in `deploy/cluster/`. Key points an
|
||||
operator must know:
|
||||
|
||||
- **Homogeneous posture (0006d).** Every node MUST run `--bus-auth enforce` (the
|
||||
binary refuses to join a cluster otherwise) and present mutual route TLS on a
|
||||
public bind. `/healthz` publishes each node's `posture` so a monitor can flag a
|
||||
node that is not `enforce`+`acl`+`tls`.
|
||||
- **Separate route CA (0006f).** The cluster route layer authenticates *nodes*,
|
||||
not bus users — sign the route certs with a **dedicated cluster CA**
|
||||
(`--route-tls-ca`), NOT the client data-plane CA (`--tls-cert`'s CA). Keeping
|
||||
the two trust roots separate means a client cert can never be presented to the
|
||||
route port. `deploy/cluster/generate-cluster-certs.sh` builds this CA.
|
||||
- **Secret out of argv (0006f).** Pass the route password via
|
||||
`--cluster-pass-file` or the `UNIBUS_CLUSTER_PASS` env var, NOT `--cluster-pass`
|
||||
or a `nats://user:pass@host` in `--routes` (both are visible in `ps`/journald).
|
||||
When the secret comes from a file/env, list peers as bare `--routes
|
||||
nats://<host>:6250` and the binary injects the credentials.
|
||||
- **`migrate-to-kv` confidentiality (0006f).** The migration writes the allowlist
|
||||
(handles/roles/sign pubs) into KV. Run it only against a **loopback** nats-url,
|
||||
or pin TLS with `--ca` for a remote target — otherwise that metadata travels in
|
||||
cleartext. The binary refuses a remote target without `--ca`.
|
||||
- **R1 is NOT HA (0006a/N3-DoS).** With `--kv-replicas 1` the control plane
|
||||
(including the nonce bucket) is a single point of failure: if the node owning
|
||||
the stream dies, every authenticated request fails closed (auth DoS). Real HA
|
||||
needs **R3** (quorum 2/3): raise replicas in place with `nats stream update
|
||||
--replicas 3` once the third node has joined. Do not advertise R1 as HA.
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
# Generated TLS material and secrets — NEVER commit (audit 0008: keys/secret).
|
||||
out/
|
||||
build/
|
||||
secrets/
|
||||
*.key
|
||||
*.srl
|
||||
cluster-ca.crt
|
||||
@@ -0,0 +1,181 @@
|
||||
# unibus cluster — 3-node deploy runbook (issue 0006g)
|
||||
|
||||
This directory holds the material to bring up unibus as a **3-node cluster**
|
||||
(`magnus` + `homer` + `datardos`) for real HA: with **R3** replication the control
|
||||
plane (rooms/members/keys/users on JetStream KV + the anti-replay nonce bucket)
|
||||
survives the loss of any one node (quorum 2/3).
|
||||
|
||||
> **The agent that authored this never touched a VPS.** Every step that changes a
|
||||
> remote host is marked **HUMAN** and is executed by the operator. `deploy-cluster.sh`
|
||||
> defaults to a dry run.
|
||||
|
||||
## Files
|
||||
|
||||
| File | What it is |
|
||||
|---|---|
|
||||
| `nodes.env` | Topology: cluster name, ports, and the per-node rows (name, ssh host, public IP, WG IP). **HUMAN fills the placeholders.** |
|
||||
| `generate-cluster-certs.sh` | Mints a **separate cluster route CA** + a route cert per node, and a data-plane server cert per node signed by the **client CA** (`../tls/ca.*`). |
|
||||
| `membershipd-cluster.service` | One systemd unit, parameterized per node by `/opt/unibus/cluster.env`. enforce + per-subject ACL + TLS + `--store kv`, `Restart=always`. |
|
||||
| `deploy-cluster.sh` | Cross-builds the linux binary, generates each node's `cluster.env`, and (with `--yes`) rsyncs everything + installs the unit. Staggered start is manual. |
|
||||
|
||||
Generated keys/secrets (`out/`, `build/`, `secrets/`) are **gitignored** — they are
|
||||
secret and never leave the operator's trusted machine except over the secure
|
||||
rsync channel.
|
||||
|
||||
## Topology
|
||||
|
||||
| Node | SSH | Public IP | WireGuard IP | Role |
|
||||
|---|---|---|---|---|
|
||||
| magnus | `magnus` | `<MAGNUS_PUBLIC_IP>` | `<MAGNUS_WG_IP>` | seed (first up) |
|
||||
| homer | `homer` | `141.94.69.66` | `<HOMER_WG_IP>` | replica |
|
||||
| datardos | `dd` | `51.91.100.142` | `<DATARDOS_WG_IP>` (10.21.0.x) | replica |
|
||||
|
||||
The route layer (server-to-server) prefers the **WireGuard mesh**
|
||||
(`ROUTE_NETWORK=wg`); the client data plane and the HTTP control plane are reached
|
||||
over the public IPs. The route CA is **separate** from the client CA, so a client
|
||||
cert can never be presented to the route port.
|
||||
|
||||
## Prerequisites (HUMAN, once)
|
||||
|
||||
1. **Fill `nodes.env`** — replace every `<PLACEHOLDER>` (magnus public IP, all WG
|
||||
IPs). The scripts refuse to run while any remain.
|
||||
2. **Client CA exists** — `../tls/ca.crt` + `../tls/ca.key`. If not, run
|
||||
`../tls/generate-certs.sh` on the CA host (om) first. The cluster reuses this CA
|
||||
for the data plane so existing clients keep trusting the bus.
|
||||
3. **Mint cluster TLS**:
|
||||
```bash
|
||||
./generate-cluster-certs.sh # writes out/<name>/ ; --force to rotate the cluster CA
|
||||
```
|
||||
4. **Create the route secret** (out of argv, shared by all nodes):
|
||||
```bash
|
||||
mkdir -p secrets && openssl rand -hex 32 > secrets/cluster.pass
|
||||
```
|
||||
5. **SSH** to each node's SSH host as `root` works (`ssh magnus true`, `ssh dd true`, ...).
|
||||
|
||||
## Stage the nodes
|
||||
|
||||
```bash
|
||||
./deploy-cluster.sh # DRY RUN — prints the full plan, touches nothing
|
||||
./deploy-cluster.sh --yes # HUMAN: actually rsync + install the unit on all 3 nodes
|
||||
```
|
||||
|
||||
This cross-builds `membershipd` (linux/amd64, `CGO_ENABLED=0`), writes each node's
|
||||
`cluster.env` (its `NODE_NAME` and the `--routes` to the OTHER two nodes), and
|
||||
ships the binary, the node's TLS material, the secret, the env file and the unit.
|
||||
It does **not** start anything.
|
||||
|
||||
## Seed the first admin into the KV (HUMAN — loopback bootstrap)
|
||||
|
||||
The empty KV control plane has no users, and under `enforce` no external tool can
|
||||
write the FIRST admin over NATS (it would need to be an admin already — a
|
||||
chicken-and-egg). The `user` CLI also writes only to a local SQLite file, not the
|
||||
KV. So the first admin is seeded on the seed node through a **loopback, no-auth
|
||||
bootstrap** that populates the same JetStream store the cluster unit then reuses:
|
||||
|
||||
```bash
|
||||
ssh root@magnus 'bash -s' <<'SEED'
|
||||
set -euo pipefail
|
||||
cd /opt/unibus
|
||||
# a) Put the first admin into a local SQLite seed file.
|
||||
./membershipd user add --db ./seed.db --handle root --sign-pub <ADMIN_SIGN_PUB_HEX> --role admin
|
||||
# b) Bring up a TEMPORARY loopback, no-auth, single-node KV server on the cluster's
|
||||
# own JetStream store dir (not exposed; bus-auth off is allowed on 127.0.0.1).
|
||||
./membershipd --store kv --bus-auth off --bind 127.0.0.1 \
|
||||
--nats-store ./local_files/jetstream --db ./seed.db >/tmp/seed-boot.log 2>&1 &
|
||||
BOOT=$!; sleep 2
|
||||
# c) Migrate the admin from SQLite into the replicated KV (loopback — no --ca needed).
|
||||
./membershipd migrate-to-kv --db ./seed.db --nats-url nats://127.0.0.1:4250 --replicas 1
|
||||
# d) Stop the bootstrap server. The KV buckets persist in ./local_files/jetstream.
|
||||
kill "$BOOT"; wait "$BOOT" 2>/dev/null || true
|
||||
rm -f ./seed.db
|
||||
SEED
|
||||
```
|
||||
|
||||
> The KV written here lives in `./local_files/jetstream`, which the cluster unit
|
||||
> reuses (`--nats-store` default), so the admin is present when the enforce cluster
|
||||
> starts. Additional users are added the same loopback way until a
|
||||
> `user add --store kv` exists (see GAP in report 0009).
|
||||
|
||||
## Bring up (HUMAN — staggered)
|
||||
|
||||
Bring up the seed first, then the replicas one at a time, checking each joins.
|
||||
|
||||
```bash
|
||||
# 1. Seed node (after the seed step above).
|
||||
ssh root@magnus 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@magnus 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
|
||||
|
||||
# 2. Replicas, one at a time.
|
||||
ssh root@homer 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@dd 'systemctl enable --now membershipd-cluster'   # datardos (its SSH host is `dd`)
|
||||
```
|
||||
|
||||
> Initial rollout runs at **R1** (`KV_REPLICAS=1` in `nodes.env`): the buckets live
|
||||
> on the seed only. This is NOT HA yet — see "Scale to R3".
|
||||
|
||||
## Promote an existing single-node (SQLite) deployment (HUMAN, optional)
|
||||
|
||||
Instead of seeding fresh, you can migrate an existing single-node `unibus.db` into
|
||||
the KV — **loopback only** (the allowlist would otherwise travel cleartext; the
|
||||
command refuses a remote target without `--ca`). Use the same loopback-bootstrap
|
||||
shape as the seed step (temporary `--bus-auth off` server on 127.0.0.1, then
|
||||
`migrate-to-kv --db /opt/unibus/local_files/unibus.db`).
|
||||
|
||||
## Verify
|
||||
|
||||
```bash
|
||||
# Posture on every node — all must be enforce+acl+tls+cluster, store=kv.
|
||||
for h in magnus homer dd; do   # SSH hosts from the Topology table (datardos is reached as `dd`)
|
||||
echo "== $h =="
|
||||
ssh root@$h 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
|
||||
done
|
||||
|
||||
# Cluster + JetStream meta-group health (needs the `nats` CLI on a node):
|
||||
ssh root@magnus 'nats --server nats://127.0.0.1:4250 server report jetstream'
|
||||
ssh root@magnus 'nats --server nats://127.0.0.1:4250 server list' # 3 servers, routes up
|
||||
```
|
||||
|
||||
A healthy cluster shows 3 routed servers and a JetStream meta-group with a leader.
|
||||
|
||||
## Scale to R3 (HUMAN — real HA)
|
||||
|
||||
Once all three nodes are up and routed, raise the replication factor of every
|
||||
control-plane stream from 1 to 3 IN PLACE (no data loss), then flip `KV_REPLICAS=3`
|
||||
in `nodes.env` so future (re)deploys keep it:
|
||||
|
||||
```bash
|
||||
for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members KV_UNIBUS_room_keys \
|
||||
KV_UNIBUS_rooms_by_member KV_UNIBUS_nonces; do
|
||||
ssh root@magnus "nats --server nats://127.0.0.1:4250 stream update $s --replicas 3 -f"
|
||||
done
|
||||
# (also OBJ_UNIBUS_blobs if the object store is in use)
|
||||
```
|
||||
|
||||
Until this is done, R1 means the seed node is a **single point of failure for
|
||||
authentication**: if it dies, the nonce/KV control plane is unreachable and every
|
||||
authenticated request fails closed (auth DoS). R1 is a rollout step, not HA.
|
||||
|
||||
## Chaos test (HUMAN — requires the 3 live VPS; NOT run here)
|
||||
|
||||
Validate quorum tolerance after R3:
|
||||
|
||||
```bash
|
||||
# Kill one node; the cluster keeps serving (quorum 2/3).
|
||||
ssh root@dd 'systemctl stop membershipd-cluster'    # datardos (its SSH host is `dd`)
|
||||
# -> clients fail over (multiple seed URLs); reads/writes still succeed.
|
||||
ssh root@dd 'systemctl start membershipd-cluster'   # datardos rejoins, catches up
|
||||
|
||||
# Kill two nodes; quorum is LOST — the control plane should fail CLOSED (deny),
|
||||
# never fail open. Verify a request is rejected, not silently served.
|
||||
```
|
||||
|
||||
This network-level chaos test (kill 1/3, kill 2/3, partition/split-brain) is part
|
||||
of the deploy validation (issue 0003f) and runs against the real VPS — it is
|
||||
deliberately out of scope for the authoring agent.
|
||||
|
||||
## Rollback
|
||||
|
||||
`membershipd` does not delete data. To revert a node to standalone SQLite, stop
|
||||
the unit and start it without `--store kv`/`--cluster-name`; the KV buckets remain
|
||||
for a later retry. To rotate the cluster CA, re-run `generate-cluster-certs.sh
|
||||
--force` and re-stage (every node must get the new `cluster-ca.crt` together).
|
||||
Executable
+126
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# deploy-cluster.sh — cross-build membershipd and stage it onto the three cluster
|
||||
# nodes (issue 0006g). DEFAULT IS DRY-RUN: it prints the plan and touches nothing.
|
||||
# Pass --yes to actually rsync + run remote commands. Steps that a HUMAN must run
|
||||
# (or confirm) are marked "HUMAN:".
|
||||
#
|
||||
# Prerequisites (HUMAN, once):
|
||||
# 1. Fill nodes.env (no <PLACEHOLDER> left).
|
||||
# 2. ./generate-cluster-certs.sh (mints out/<name>/ TLS material)
|
||||
# 3. Create the route secret locally: mkdir -p secrets && openssl rand -hex 32 > secrets/cluster.pass
|
||||
# (secrets/ is gitignored; it is rsynced to each node as cluster.pass)
|
||||
# 4. SSH access to every node's SSH_HOST with sudo-less root (SSH_USER=root).
|
||||
#
|
||||
# What it does per node (with --yes):
|
||||
# - rsync the membershipd binary, the node's TLS material, the unit, the
|
||||
# generated cluster.env and the route secret into REMOTE_DIR.
|
||||
# - install + daemon-reload the systemd unit.
|
||||
# Start is STAGGERED and left to the human (see README): seed the first admin
# into the KV via the loopback bootstrap, start the seed node, then start the rest.
|
||||
set -euo pipefail
|
||||
|
||||
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
cd "$DIR"
|
||||
|
||||
# shellcheck source=/dev/null
|
||||
source ./nodes.env
|
||||
|
||||
APPLY=0
|
||||
[[ "${1:-}" == "--yes" ]] && APPLY=1
|
||||
|
||||
# Refuse to run while nodes.env still carries an unfilled <PLACEHOLDER> VALUE.
# Only text before any '#' is inspected: the comments in nodes.env themselves
# mention "<PLACEHOLDER>" and "<SSH_HOST>", and matching those would make this
# guard fire forever, even after every real value has been filled in.
if grep -Eq '^[^#]*<[A-Z_]+>' nodes.env; then
  echo "ERROR: nodes.env still has <PLACEHOLDER> values — fill them in first." >&2
  exit 2
fi
|
||||
|
||||
SECRET_FILE="secrets/cluster.pass"
|
||||
if [[ ! -f "$SECRET_FILE" ]]; then
|
||||
echo "ERROR: $SECRET_FILE missing. HUMAN: mkdir -p secrets && openssl rand -hex 32 > $SECRET_FILE" >&2
|
||||
exit 2
|
||||
fi
|
||||
|
||||
# run <cmd> [args...] — announce the command, and execute it only when the
# script was invoked with --yes (APPLY=1). In dry-run mode it just prints the
# plan line and succeeds.
run() {
  echo " + $*"
  # Dry-run: report success without executing anything.
  [[ $APPLY -eq 1 ]] || return 0
  "$@"
}
|
||||
|
||||
echo "==> [1/3] cross-build membershipd (linux/amd64, CGO disabled)"
|
||||
run env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/membershipd ../../cmd/membershipd
|
||||
|
||||
# Build the comma-separated route list for a node = the OTHER nodes' addresses on
# the chosen network, with NO userinfo (the secret is injected by membershipd from
# the file). Prints only the list on stdout, so callers capture it with $(...).
|
||||
# routes_for <self> — print the comma-separated nats:// route URLs of every
# node in CLUSTER_NODES except <self>, using the public IP when
# ROUTE_NETWORK=public and the WireGuard IP otherwise.
routes_for() {
  local self="$1"
  local row name _ssh pub wg host
  local joined="" sep=""
  for row in "${CLUSTER_NODES[@]}"; do
    read -r name _ssh pub wg <<<"$row"
    # A node never routes to itself.
    if [[ "$name" == "$self" ]]; then
      continue
    fi
    case "$ROUTE_NETWORK" in
      public) host="$pub" ;;
      *)      host="$wg" ;;
    esac
    joined+="${sep}nats://${host}:${NATS_ROUTE_PORT}"
    sep=","
  done
  echo "$joined"
}
|
||||
|
||||
echo "==> [2/3] stage each node (REMOTE_DIR=$REMOTE_DIR)"
|
||||
for row in "${CLUSTER_NODES[@]}"; do
|
||||
read -r name ssh _pub _wg <<<"$row"
|
||||
target="${SSH_USER}@${ssh}"
|
||||
nodedir="out/${name}"
|
||||
if [[ ! -d "$nodedir" ]]; then
|
||||
echo "ERROR: $nodedir missing — run ./generate-cluster-certs.sh first." >&2
|
||||
exit 2
|
||||
fi
|
||||
routes="$(routes_for "$name")"
|
||||
|
||||
echo "-- node ${name} (ssh ${ssh}) routes=${routes}"
|
||||
|
||||
# Generate this node's cluster.env locally, then ship it.
|
||||
envfile="build/cluster-${name}.env"
|
||||
mkdir -p build
|
||||
cat > "$envfile" <<EOF
|
||||
NODE_NAME=${name}
|
||||
CLUSTER_NAME=${CLUSTER_NAME}
|
||||
CLUSTER_USER=${CLUSTER_USER}
|
||||
KV_REPLICAS=${KV_REPLICAS}
|
||||
HTTP_PORT=${HTTP_PORT}
|
||||
NATS_CLIENT_PORT=${NATS_CLIENT_PORT}
|
||||
NATS_ROUTE_PORT=${NATS_ROUTE_PORT}
|
||||
ROUTES=${routes}
|
||||
CLUSTER_PASS_FILE=${REMOTE_DIR}/secrets/cluster.pass
|
||||
TLS_CERT=${REMOTE_DIR}/tls/server-${name}.crt
|
||||
TLS_KEY=${REMOTE_DIR}/tls/server-${name}.key
|
||||
ROUTE_TLS_CERT=${REMOTE_DIR}/tls/route-${name}.crt
|
||||
ROUTE_TLS_KEY=${REMOTE_DIR}/tls/route-${name}.key
|
||||
ROUTE_TLS_CA=${REMOTE_DIR}/tls/cluster-ca.crt
|
||||
EOF
|
||||
|
||||
run ssh "$target" "mkdir -p ${REMOTE_DIR}/tls ${REMOTE_DIR}/secrets"
|
||||
run rsync -az build/membershipd "${target}:${REMOTE_DIR}/membershipd"
|
||||
run rsync -az "${nodedir}/" "${target}:${REMOTE_DIR}/tls/"
|
||||
run rsync -az "$SECRET_FILE" "${target}:${REMOTE_DIR}/secrets/cluster.pass"
|
||||
run rsync -az "$envfile" "${target}:${REMOTE_DIR}/cluster.env"
|
||||
run rsync -az membershipd-cluster.service "${target}:/etc/systemd/system/membershipd-cluster.service"
|
||||
run ssh "$target" "chmod 600 ${REMOTE_DIR}/secrets/cluster.pass ${REMOTE_DIR}/tls/*.key && systemctl daemon-reload"
|
||||
done
|
||||
|
||||
echo "==> [3/3] staged."
|
||||
if [[ $APPLY -eq 0 ]]; then
|
||||
echo " DRY-RUN: nothing was sent. Re-run with --yes to apply."
|
||||
fi
|
||||
# Final operator hint. The order matches the README: the first admin is seeded
# into the KV through the loopback bootstrap BEFORE the seed node's unit is
# started (`membershipd user add` only writes a local SQLite file, so it cannot
# seed a running enforce cluster). SSH hosts are the ones from nodes.env.
cat <<'NEXT'

HUMAN — staggered start (do NOT enable all at once; see README "Bring up"):
  0. Seed the first admin into the KV BEFORE the first start, using the loopback
     bootstrap on the seed node (README "Seed the first admin into the KV").
  1. Seed node first (e.g. magnus):
       ssh root@magnus 'systemctl enable --now membershipd-cluster'
  2. Then the other two, one at a time, checking quorum after each:
       ssh root@homer 'systemctl enable --now membershipd-cluster'
       ssh root@dd 'systemctl enable --now membershipd-cluster'   # datardos
  3. Verify posture + quorum (README "Verify").
  4. Scale replicas 1 -> 3 once all three are up (README "Scale to R3").
NEXT
|
||||
Executable
+120
@@ -0,0 +1,120 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# generate-cluster-certs.sh — mint the TLS material for a unibus 3-node cluster
|
||||
# (issue 0006g). Run ONCE on a trusted machine (e.g. om, which custodies the bus
|
||||
# CA); distribute the per-node output to each node over a secure channel. This
|
||||
# script touches NO remote host.
|
||||
#
|
||||
# It produces two trust roots, kept SEPARATE on purpose (audit 0008 N1-low):
|
||||
#
|
||||
# 1. The CLUSTER route CA (cluster-ca.crt/key, generated here): signs each
|
||||
# node's ROUTE certificate. The route layer authenticates NODES, not bus
|
||||
# users, so it must NOT share the client data-plane CA — a client cert can
|
||||
# then never be presented to the route port.
|
||||
# 2. The CLIENT data-plane CA (../tls/ca.crt/key, the one clients pin): signs
|
||||
# each node's DATA-PLANE server certificate. Reused, not regenerated, so
|
||||
# existing clients keep trusting the bus.
|
||||
#
|
||||
# Per node it emits, under out/<name>/:
|
||||
# route-<name>.crt/key route cert (cluster CA), EKU server+clientAuth
|
||||
# (each node is BOTH server and dialer to its peers)
|
||||
# server-<name>.crt/key data-plane cert (client CA), EKU serverAuth
|
||||
# cluster-ca.crt the route CA cert (for --route-tls-ca)
|
||||
# ca.crt the client CA cert (for clients / control-plane TLS)
|
||||
#
|
||||
# SANs per node = its public IP + its WireGuard IP + its hostname + localhost.
|
||||
#
|
||||
# Key material: EC P-256 (Go crypto/tls + nats-server friendly), matching
|
||||
# ../tls/generate-certs.sh.
|
||||
set -euo pipefail
|
||||
|
||||
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
cd "$DIR"
|
||||
|
||||
# shellcheck source=/dev/null
|
||||
source ./nodes.env
|
||||
|
||||
# Refuse to run while any placeholder remains (HUMAN must fill nodes.env first).
|
||||
# Only text before any '#' is inspected: the comments in nodes.env themselves
# mention "<PLACEHOLDER>" and "<SSH_HOST>", and matching those would make this
# guard fire forever, even after every real value has been filled in.
if grep -Eq '^[^#]*<[A-Z_]+>' nodes.env; then
  echo "ERROR: nodes.env still has <PLACEHOLDER> values — fill them in first." >&2
  # Show the offending (non-comment) lines so the operator knows what to fill.
  grep -En '^[^#]*<[A-Z_]+>' nodes.env >&2
  exit 2
fi
|
||||
|
||||
CLIENT_CA_CRT="../tls/ca.crt"
|
||||
CLIENT_CA_KEY="../tls/ca.key"
|
||||
if [[ ! -f "$CLIENT_CA_CRT" || ! -f "$CLIENT_CA_KEY" ]]; then
|
||||
echo "ERROR: client data-plane CA not found at ../tls/ca.{crt,key}." >&2
|
||||
echo " Run ../tls/generate-certs.sh first (it mints the client CA)." >&2
|
||||
exit 2
|
||||
fi
|
||||
|
||||
DAYS_CA=3650
|
||||
DAYS_CRT=825
|
||||
|
||||
force=0
|
||||
[[ "${1:-}" == "--force" ]] && force=1
|
||||
|
||||
# --- cluster route CA (separate trust root) ---
|
||||
if [[ ! -f cluster-ca.crt || ! -f cluster-ca.key || $force -eq 1 ]]; then
|
||||
echo "==> generating cluster route CA (separate from the client CA)"
|
||||
openssl ecparam -name prime256v1 -genkey -noout -out cluster-ca.key
|
||||
chmod 600 cluster-ca.key
|
||||
openssl req -x509 -new -key cluster-ca.key -sha256 -days "$DAYS_CA" \
|
||||
-subj "/CN=unibus-cluster-ca" -out cluster-ca.crt
|
||||
else
|
||||
echo "==> reusing existing cluster route CA (pass --force to regenerate)"
|
||||
fi
|
||||
|
||||
# mint_cert <out_key> <out_crt> <subject_cn> <san> <eku> <ca_crt> <ca_key>
#
# Generates an EC P-256 private key at <out_key> and a certificate at <out_crt>
# for CN=<subject_cn>, signed by <ca_crt>/<ca_key>, carrying the given
# subjectAltName and extendedKeyUsage. Valid for DAYS_CRT days.
#
# Returns the status of the first failing step (0 on success). The temporary
# CSR and extension files are removed on both the success and the failure path;
# the steps are chained with && because `set -e` would otherwise abort the
# script before the cleanup runs.
mint_cert() {
  local out_key="$1" out_crt="$2" cn="$3" san="$4" eku="$5" ca_crt="$6" ca_key="$7"
  local csr ext rc=0
  csr="$(mktemp)"
  ext="$(mktemp)"
  {
    # Create the key under a restrictive umask so it is never world-readable,
    # not even for the instant before chmod; chmod still covers the case of an
    # existing file being overwritten with looser permissions.
    ( umask 077 && openssl ecparam -name prime256v1 -genkey -noout -out "$out_key" ) &&
    chmod 600 "$out_key" &&
    openssl req -new -key "$out_key" -subj "/CN=${cn}" -out "$csr" &&
    printf '%s\n' \
      "subjectAltName=${san}" \
      "extendedKeyUsage=${eku}" \
      "keyUsage=digitalSignature,keyEncipherment" > "$ext" &&
    openssl x509 -req -in "$csr" -CA "$ca_crt" -CAkey "$ca_key" -CAcreateserial \
      -sha256 -days "$DAYS_CRT" -extfile "$ext" -out "$out_crt"
  } || rc=$?
  rm -f "$csr" "$ext"
  return "$rc"
}
|
||||
|
||||
for row in "${CLUSTER_NODES[@]}"; do
|
||||
read -r name _ssh pub wg <<<"$row"
|
||||
echo "==> node ${name}: SAN IP:${pub}, IP:${wg}, DNS:${name}, localhost, 127.0.0.1"
|
||||
nodedir="out/${name}"
|
||||
mkdir -p "$nodedir"
|
||||
san="IP:${pub},IP:${wg},DNS:${name},DNS:localhost,IP:127.0.0.1"
|
||||
|
||||
# Route cert: signed by the cluster CA; server+client auth (mutual routes).
|
||||
mint_cert "${nodedir}/route-${name}.key" "${nodedir}/route-${name}.crt" \
|
||||
"unibus-route-${name}" "$san" "serverAuth,clientAuth" \
|
||||
cluster-ca.crt cluster-ca.key
|
||||
|
||||
# Data-plane server cert: signed by the client CA; serverAuth only.
|
||||
mint_cert "${nodedir}/server-${name}.key" "${nodedir}/server-${name}.crt" \
|
||||
"unibus-${name}" "$san" "serverAuth" \
|
||||
"$CLIENT_CA_CRT" "$CLIENT_CA_KEY"
|
||||
|
||||
# Co-locate the two CA certs each node needs.
|
||||
cp cluster-ca.crt "${nodedir}/cluster-ca.crt"
|
||||
cp "$CLIENT_CA_CRT" "${nodedir}/ca.crt"
|
||||
done
|
||||
|
||||
rm -f cluster-ca.srl ../tls/ca.srl 2>/dev/null || true
|
||||
|
||||
echo
|
||||
echo "==> done. Per-node material under out/<name>/ (KEYS ARE SECRET — never git):"
|
||||
for row in "${CLUSTER_NODES[@]}"; do
|
||||
read -r name _rest <<<"$row"
|
||||
echo " out/${name}/ (route-${name}.*, server-${name}.*, cluster-ca.crt, ca.crt)"
|
||||
done
|
||||
echo
|
||||
echo "verify a SAN with:"
|
||||
echo " openssl x509 -in out/<name>/server-<name>.crt -noout -text | grep -A1 'Subject Alternative Name'"
|
||||
@@ -0,0 +1,45 @@
|
||||
[Unit]
|
||||
# unibus membershipd — cluster node (issue 0006g).
|
||||
#
|
||||
# One unit, parameterized per node by /opt/unibus/cluster.env (generated by
|
||||
# deploy-cluster.sh): NODE_NAME, ROUTES and the cert paths differ per node, the
|
||||
# rest of the posture (enforce + per-subject ACL + TLS + --store kv) is identical
|
||||
# on every node, which is the homogeneous posture a secure cluster requires
|
||||
# (audit 0008 N1).
|
||||
Description=unibus membershipd (cluster node)
|
||||
After=network-online.target
|
||||
Wants=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
WorkingDirectory=/opt/unibus
|
||||
EnvironmentFile=/opt/unibus/cluster.env
|
||||
# The route password comes from a FILE referenced by ${CLUSTER_PASS_FILE}, never
|
||||
# from argv (audit 0008 N1-low). The peer --routes carry no userinfo; membershipd
|
||||
# injects the credentials from the file/user.
|
||||
ExecStart=/opt/unibus/membershipd \
|
||||
--bind 0.0.0.0 \
|
||||
--bus-auth enforce \
|
||||
--http-port ${HTTP_PORT} \
|
||||
--nats-port ${NATS_CLIENT_PORT} \
|
||||
--tls-cert ${TLS_CERT} \
|
||||
--tls-key ${TLS_KEY} \
|
||||
--cluster-name ${CLUSTER_NAME} \
|
||||
--server-name ${NODE_NAME} \
|
||||
--cluster-port ${NATS_ROUTE_PORT} \
|
||||
--routes ${ROUTES} \
|
||||
--cluster-user ${CLUSTER_USER} \
|
||||
--cluster-pass-file ${CLUSTER_PASS_FILE} \
|
||||
--route-tls-cert ${ROUTE_TLS_CERT} \
|
||||
--route-tls-key ${ROUTE_TLS_KEY} \
|
||||
--route-tls-ca ${ROUTE_TLS_CA} \
|
||||
--store kv \
|
||||
--kv-replicas ${KV_REPLICAS}
|
||||
# Restart=always (NOT on-failure): a clean SIGTERM exits success, and on-failure
|
||||
# would then NOT restart, leaving the node silently dead (see function_tags.md).
|
||||
Restart=always
|
||||
RestartSec=2
|
||||
LimitNOFILE=65536
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
@@ -0,0 +1,44 @@
|
||||
# Cluster topology for the unibus 3-node deployment (issue 0006g).
|
||||
#
|
||||
# This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh.
|
||||
#
|
||||
# HUMAN: fill in every <PLACEHOLDER> with the real value before running the
|
||||
# scripts. The public IPs known at authoring time are pre-filled; the WireGuard
|
||||
# mesh IPs and magnus's public IP must be supplied. The scripts refuse to run
|
||||
# while any <PLACEHOLDER> remains.
|
||||
|
||||
# Cluster identity (must be identical on every node).
|
||||
CLUSTER_NAME="unibus"
|
||||
# Route-secret username; the password is NOT here — it lives in a file (see
|
||||
# CLUSTER_PASS_FILE in deploy-cluster.sh) so it never lands in argv or git.
|
||||
CLUSTER_USER="unibus-cluster"
|
||||
|
||||
# KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise
|
||||
# to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only
|
||||
# set this to 3 here after the third node is up and you re-run the KV update.
|
||||
KV_REPLICAS=1
|
||||
|
||||
# Ports (same on every node; the route port is server-to-server only).
|
||||
NATS_CLIENT_PORT=4250
|
||||
NATS_ROUTE_PORT=6250
|
||||
HTTP_PORT=8470
|
||||
|
||||
# Remote install layout and SSH login user.
|
||||
REMOTE_DIR="/opt/unibus"
|
||||
SSH_USER="root"
|
||||
|
||||
# Which address family the inter-node routes use. "wg" builds --routes from the
|
||||
# WireGuard mesh IPs (private server-to-server links, preferred); "public" uses
|
||||
# the public IPs. The route layer is always mutual-TLS regardless.
|
||||
ROUTE_NETWORK="wg"
|
||||
|
||||
# One row per node: NAME SSH_HOST PUBLIC_IP WG_IP
|
||||
# NAME -> --server-name and the per-node cert filenames (unique).
|
||||
# SSH_HOST -> the `ssh <SSH_HOST>` alias (see ~/.ssh/config).
|
||||
# PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane).
|
||||
# WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg.
|
||||
CLUSTER_NODES=(
|
||||
"magnus magnus <MAGNUS_PUBLIC_IP> <MAGNUS_WG_IP>"
|
||||
"homer homer 141.94.69.66 <HOMER_WG_IP>"
|
||||
"datardos dd 51.91.100.142 <DATARDOS_WG_IP>"
|
||||
)
|
||||
@@ -18,7 +18,7 @@
|
||||
"decentralized": {
|
||||
"enabled": false,
|
||||
"issue": "0003",
|
||||
"description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default OFF, jetstreamStore ON). The route cluster (0003a) and the KV store (0003b) ship behind this flag; the membershipd boot wiring that selects the KV store completes with the session/reconnect redesign (0003e) and is activated on the multi-node deploy (0003f). OFF keeps the single-node SQLite control plane unchanged.",
|
||||
"description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default, jetstreamStore opt-in). The route cluster (0003a) and the KV store (0003b) shipped behind this flag; the membershipd boot wiring that selects the store is COMPLETE since issue 0006c and is realized at runtime with the server flag --store kv|sqlite (default sqlite). The internal-identity bootstrap (0006a) lets membershipd open the KV store on its own embedded NATS under enforce. Per-deploy opt-in: a node joins the decentralized control plane by starting with --store kv (and --cluster-name for HA). OFF (--store sqlite) keeps the single-node SQLite control plane unchanged.",
|
||||
"added": "2026-06-07",
|
||||
"enabled_at": null
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
---
|
||||
issue: 0006
|
||||
title: Completar y endurecer el cluster — wiring del control plane KV + N1-N6 de la auditoría 0008
|
||||
status: spec
|
||||
status: done
|
||||
created: 2026-06-07
|
||||
closed: 2026-06-07
|
||||
closed_by: fases 0006a–0006g (ver report 0009); unibus v0.8.0
|
||||
domain: security
|
||||
scope: unibus (cmd/membershipd, pkg/membership, pkg/embeddednats, pkg/busauth, pkg/client)
|
||||
depends_on: 0003 (completa su wiring), 0005 (hereda el bus single-node ya seguro)
|
||||
|
||||
@@ -85,6 +85,20 @@ func (s *Session) Join(roomID string) error {
|
||||
return s.c.Join(roomID)
|
||||
}
|
||||
|
||||
// RefreshSession reconnects the data plane so the bus re-derives this peer's
|
||||
// per-subject permissions from its current room membership.
|
||||
//
|
||||
// Membership-change contract (issue 0006e): a secured bus (--bus-auth enforce)
|
||||
// freezes a connection's permissions at connect time. After ANY membership change
|
||||
// — a room you just created, were invited to, or joined — call RefreshSession
|
||||
// BEFORE Publish/Subscribe on that room, or the bus denies the new room's subject.
|
||||
// It also drops active subscriptions, so re-Subscribe afterwards. On an unsecured
|
||||
// bus it is a harmless reconnect. A mobile/gateway caller wires this exactly like
|
||||
// cmd/chat and cmd/worker do: CreateRoom -> RefreshSession -> Subscribe/Publish.
|
||||
func (s *Session) RefreshSession() error {
|
||||
return s.c.RefreshSession()
|
||||
}
|
||||
|
||||
// Publish sends a UTF-8 text message to the room.
|
||||
func (s *Session) Publish(roomID, text string) error {
|
||||
return s.c.Publish(roomID, []byte(text))
|
||||
|
||||
@@ -82,6 +82,15 @@ type PermissionsFunc func(signPubHex string) (*server.Permissions, error)
|
||||
type nkeyAuthenticatorACL struct {
|
||||
isAuthorized func(signPubHex string) bool
|
||||
perms PermissionsFunc
|
||||
// internalPubHex is the lowercase-hex Ed25519 public key of membershipd's own
|
||||
// ephemeral internal service identity. A connection that proves that key is
|
||||
// granted full permissions WITHOUT consulting the allowlist, so the service can
|
||||
// bootstrap and manage JetStream (the replicated nonce bucket and, when
|
||||
// decentralized, the control-plane KV buckets) against its own embedded server
|
||||
// even while the data plane confines every client to its rooms. Empty disables
|
||||
// the internal-identity path entirely (behavior identical to a plain ACL
|
||||
// authenticator).
|
||||
internalPubHex string
|
||||
}
|
||||
|
||||
// NewNkeyAuthenticatorACL builds an authenticator that authorizes by the bus
|
||||
@@ -94,6 +103,29 @@ func NewNkeyAuthenticatorACL(isAuthorized func(signPubHex string) bool, perms Pe
|
||||
return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms}
|
||||
}
|
||||
|
||||
// NewNkeyAuthenticatorACLInternal is NewNkeyAuthenticatorACL that also recognizes
|
||||
// membershipd's internal service identity (internalPubHex, the lowercase hex of
|
||||
// its ephemeral Ed25519 public key): a connection proving that key is granted
|
||||
// full permissions without an allowlist lookup, so the service can create and
|
||||
// manage JetStream against its own embedded server under enforce (issue 0006a/c —
|
||||
// the replicated nonce bucket and the control-plane KV). Every other identity
|
||||
// goes through the allowlist + per-subject ACL unchanged. An empty internalPubHex
|
||||
// is identical to NewNkeyAuthenticatorACL, so this is a superset and safe to use
|
||||
// everywhere the plain constructor was used.
|
||||
func NewNkeyAuthenticatorACLInternal(isAuthorized func(signPubHex string) bool, perms PermissionsFunc, internalPubHex string) server.Authentication {
|
||||
return &nkeyAuthenticatorACL{isAuthorized: isAuthorized, perms: perms, internalPubHex: internalPubHex}
|
||||
}
|
||||
|
||||
// fullPermissions grants publish and subscribe on every subject (">"). It is the
|
||||
// permission set for membershipd's own internal service connection, which must
|
||||
// manage the JetStream control plane (nonce bucket + KV buckets) over NATS. It is
|
||||
// NEVER granted to a bus user — only to the process's own ephemeral internal
|
||||
// identity, recognized by exact public-key match in Check.
|
||||
func fullPermissions() *server.Permissions {
|
||||
sp := &server.SubjectPermission{Allow: []string{">"}}
|
||||
return &server.Permissions{Publish: sp, Subscribe: sp}
|
||||
}
|
||||
|
||||
// Check verifies the nkey, authorizes against the allowlist, then derives and
|
||||
// registers the connection's subject permissions. A permissions-derivation
|
||||
// error denies the connection (fail closed) rather than granting open access.
|
||||
@@ -102,6 +134,14 @@ func (a *nkeyAuthenticatorACL) Check(c server.ClientAuthentication) bool {
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
// membershipd's own internal service identity bypasses the allowlist and is
|
||||
// granted full permissions so the service can bootstrap JetStream under
|
||||
// enforce. The key is matched exactly against the cryptographically verified
|
||||
// connecting key, so no other identity can claim these permissions.
|
||||
if a.internalPubHex != "" && signPubHex == a.internalPubHex {
|
||||
c.RegisterUser(&server.User{Permissions: fullPermissions()})
|
||||
return true
|
||||
}
|
||||
if !a.isAuthorized(signPubHex) {
|
||||
return false
|
||||
}
|
||||
|
||||
+83
-17
@@ -1,31 +1,95 @@
|
||||
package membership
|
||||
|
||||
// Per-subject data-plane access control derived from room membership (issue
|
||||
// 0003e, audit H4 residual). The control plane already authorizes metadata by
|
||||
// membership; this is the matching restriction on the NATS data plane so a
|
||||
// registered peer can only publish/subscribe on the subjects of the rooms it
|
||||
// actually belongs to — not on every subject on the bus.
|
||||
// 0003e, audit H4 residual; tightened in issue 0006b for audit 0008 N2). The
|
||||
// control plane already authorizes metadata by membership; this is the matching
|
||||
// restriction on the NATS data plane so a registered peer can only
|
||||
// publish/subscribe on the subjects of the rooms it actually belongs to — and can
|
||||
// only reach the JetStream API of ITS OWN rooms' streams, never the control-plane
|
||||
// KV buckets.
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
)
|
||||
|
||||
// clientInfraSubjects are the subjects every peer needs regardless of room
|
||||
// membership: the request/reply inbox space and the JetStream API (the durable
|
||||
// plane of persisted rooms). They are granted to all authorized peers so
|
||||
// request/reply and persisted-room history keep working under the subject ACL.
|
||||
var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.>"}
|
||||
// clientInfraSubjects are the subjects every authorized peer needs regardless of
|
||||
// room membership, kept deliberately MINIMAL (issue 0006b, audit 0008 N2):
|
||||
//
|
||||
// - "_INBOX.>" — request/reply plus the JetStream pull-consumer delivery
|
||||
// and publish-ack inboxes.
|
||||
// - "$JS.API.INFO" — account-level JetStream info (limits/usage counters). It
|
||||
// exposes NO room/user/key contents, so granting it leaks nothing.
|
||||
//
|
||||
// It NO LONGER contains "$JS.API.>". That broad grant was the N2 leak: it let any
|
||||
// registered peer drive the whole JetStream API and read the control-plane KV
|
||||
// buckets (KV_UNIBUS_users/rooms/members/room_keys) and the object store directly
|
||||
// over NATS, bypassing the HTTP authorization (requireMember and the own-endpoint
|
||||
// checks). JetStream API access is now granted PER ROOM, scoped to the stream of
|
||||
// each room the peer belongs to (jsSubjectsFor). Because the control-plane KV
|
||||
// streams (KV_UNIBUS_*) and the object store (OBJ_UNIBUS_*) are never a room
|
||||
// stream, they fall outside the closed allow set and are denied by default.
|
||||
var clientInfraSubjects = []string{"_INBOX.>", "$JS.API.INFO"}
|
||||
|
||||
// SubjectACLFor returns a function that maps a signing public key (lowercase
|
||||
// hex) to the data-plane subjects that identity may publish and subscribe to:
|
||||
// the subject of every room it belongs to, plus the client infrastructure
|
||||
// subjects. It reads the live membership store, so the permissions reflect the
|
||||
// identity's rooms at the moment it connects. A decode error or a store failure
|
||||
// is returned as an error so the caller can fail closed (deny the connection)
|
||||
// rather than grant open access.
|
||||
// roomStreamName is the JetStream stream name a persisted room maps to. It MUST
|
||||
// stay identical to pkg/client.streamName ("UNIBUS_" + sanitized roomID) so the
|
||||
// per-room ACL grants exactly the subjects the client's JetStream calls use. Room
|
||||
// ids are ULIDs (no '.'), so the sanitizing is a no-op in practice, but the rule
|
||||
// is replicated defensively so the producer (client) and the authorizer (this
|
||||
// ACL) never drift apart.
|
||||
func roomStreamName(roomID string) string {
	var b strings.Builder
	b.Grow(len("UNIBUS_") + len(roomID))
	b.WriteString("UNIBUS_")
	for _, r := range roomID {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
			b.WriteRune(r)
		default:
			// Every other rune (notably '.', the NATS token separator) becomes a
			// single '_' so the name is always exactly one subject token.
			b.WriteRune('_')
		}
	}
	return b.String()
}

// jsSubjectsFor returns the MINIMAL JetStream API subjects a peer needs to use the
// durable stream of ONE persisted room: create/update/info the stream, manage and
// pull from its durable consumer, and ack deliveries.
//
// Every pattern pins this room's stream name at the token position where the
// JetStream API grammar carries the STREAM, so a grant cannot reach another
// room's stream nor any control-plane stream (KV_UNIBUS_* / OBJ_UNIBUS_*):
//
//	$JS.API.STREAM.<verb>.<stream>                  verb in {CREATE,UPDATE,INFO,DELETE,PURGE,...}
//	$JS.API.STREAM.<v1>.<v2>.<stream>               two-token verbs, e.g. MSG.GET / MSG.DELETE
//	$JS.API.CONSUMER.<verb>.<stream>                verb in {LIST,NAMES,CREATE(ephemeral)}
//	$JS.API.CONSUMER.<verb>.<stream>.<consumer>...  verb in {CREATE,INFO,DELETE}
//	$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>
//	$JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer>
//	$JS.ACK.<stream>.>                              message acknowledgements
//
// The two-token consumer verbs are spelled out on purpose. A generic
// "$JS.API.CONSUMER.*.*.<stream>.>" would also match the extended create subject
// "$JS.API.CONSUMER.CREATE.<ANY stream>.<consumer>.<filter>" whenever the consumer
// is simply NAMED like this room's stream, letting a member of any room attach a
// push consumer (delivering to its own _INBOX) to a foreign or control-plane
// stream.
func jsSubjectsFor(roomID string) []string {
	s := roomStreamName(roomID)
	return []string{
		"$JS.API.STREAM.*." + s,
		"$JS.API.STREAM.*.*." + s,
		"$JS.API.CONSUMER.*." + s,
		"$JS.API.CONSUMER.*." + s + ".>",
		// Stream is the 6th token here; the consumer name is exactly one token.
		"$JS.API.CONSUMER.MSG.NEXT." + s + ".*",
		"$JS.API.CONSUMER.DURABLE.CREATE." + s + ".*",
		"$JS.ACK." + s + ".>",
	}
}
|
||||
|
||||
// SubjectACLFor returns a function that maps a signing public key (lowercase hex)
|
||||
// to the data-plane subjects that identity may publish and subscribe to: the
|
||||
// subject of every room it belongs to, the per-room JetStream API subjects of
|
||||
// those rooms (so persisted-room history keeps working), plus the minimal client
|
||||
// infrastructure subjects. It reads the live membership store, so the permissions
|
||||
// reflect the identity's rooms at the moment it connects. A decode error or a
|
||||
// store failure is returned as an error so the caller can fail closed (deny the
|
||||
// connection) rather than grant open access.
|
||||
//
|
||||
// Because NATS freezes permissions at connect time, a peer invited to a new room
|
||||
// after connecting must reconnect (client.RefreshSession) to pick up the new
|
||||
@@ -42,10 +106,12 @@ func SubjectACLFor(store Store) func(signPubHex string) ([]string, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("acl: list rooms for %s: %w", endpoint, err)
|
||||
}
|
||||
subjects := make([]string, 0, len(rooms)+len(clientInfraSubjects))
|
||||
// clientInfra + per room: the room subject + that room's JetStream API.
|
||||
subjects := make([]string, 0, len(clientInfraSubjects)+len(rooms)*7)
|
||||
subjects = append(subjects, clientInfraSubjects...)
|
||||
for _, r := range rooms {
|
||||
subjects = append(subjects, r.Subject)
|
||||
subjects = append(subjects, jsSubjectsFor(r.RoomID)...)
|
||||
}
|
||||
return subjects, nil
|
||||
}
|
||||
|
||||
@@ -229,14 +229,11 @@ func TestSubjectACLIsolation(t *testing.T) {
|
||||
// - golden: the member still pub/subs her own room, and the non-member never
|
||||
// captures that traffic.
|
||||
//
|
||||
// Residual (DOCUMENTED, not closed here): the client-infra grant includes
|
||||
// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
|
||||
// subscribes specifically to "$JS.API.>" can still observe stream-management
|
||||
// requests whose subjects embed the stream name derived from a room id. Fully
|
||||
// closing that needs NATS accounts/permissions isolation per identity (deferred to
|
||||
// the 0003 decentralization line). The high-impact leak the auditor exploited —
|
||||
// the room subject itself and JetStream advisories captured via "Subscribe(\">\")"
|
||||
// — is closed.
|
||||
// Residual now CLOSED (issue 0006b, audit 0008 N2): the client-infra grant no
|
||||
// longer includes "$JS.API.>". JetStream API access is granted per-room only
|
||||
// (membership.jsSubjectsFor), so a peer can reach the API of its OWN rooms'
|
||||
// streams but not the control-plane KV buckets (KV_UNIBUS_*) nor another room's
|
||||
// stream. See TestAttack0008_N2 for the closed-leak regression.
|
||||
func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
|
||||
@@ -0,0 +1,152 @@
|
||||
package membership_test
|
||||
|
||||
// Regression for audit report 0008, vector N2: with the broad "$JS.API.>" grant
|
||||
// removed (issue 0006b), a registered peer that belongs to no room can no longer
|
||||
// read the control-plane KV buckets over NATS, while the per-room JetStream API of
|
||||
// a peer's OWN rooms keeps working. The auditor's ephemeral attack populated the
|
||||
// KV control plane and had a registered non-member harvest the allowlist, the room
|
||||
// graph and the sealed-key metadata directly through "$JS.API.>".
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
)
|
||||
|
||||
// startACLNatsInternal is startACLNats plus a recognized internal service identity
|
||||
// (so the test can seed the KV control plane with full permissions, exactly as the
|
||||
// decentralized membershipd does at bootstrap).
|
||||
func startACLNatsInternal(t *testing.T, store membership.Store, internalPubHex string) *server.Server {
|
||||
t.Helper()
|
||||
auth := busauth.NewNkeyAuthenticatorACLInternal(store.IsAuthorized, aclPermsFunc(store), internalPubHex)
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: aclFreePort(t), Auth: auth,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("acl nats: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
return ns
|
||||
}
|
||||
|
||||
// TestAttack0008_N2 reproduces the control-plane KV leak and proves it is closed.
|
||||
//
|
||||
// error : eve (registered, member of no room) cannot read the KV buckets — the
|
||||
// JetStream KV API and the raw $KV subject space are both denied.
|
||||
// golden: the owner of a persisted room can still drive the JetStream API of HER
|
||||
// OWN room's stream (so persisted-room history keeps working).
|
||||
// edge : eve cannot reach another room's stream API either (cross-room JS deny).
|
||||
func TestAttack0008_N2(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
// The HTTP control-plane store stays SQLite; the KV buckets below stand in for
|
||||
// the decentralized control plane the attack targets.
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
|
||||
ceo, eve, internalID := mustID(t), mustID(t), mustID(t)
|
||||
ceoEP := frame.EndpointID(ceo.SignPub)
|
||||
mustAddUser(t, store, ceo, "ceo-root-admin")
|
||||
mustAddUser(t, store, eve, "eve") // registered, member of nothing
|
||||
// A persisted room owned by ceo: ceo is a member, so her per-room JS is allowed.
|
||||
if err := store.CreateRoom(
|
||||
membership.RoomInfo{RoomID: "PRIVROOM", Subject: "room.board.ma-deal", Encrypt: true, Persist: true, OwnerEndpoint: ceoEP},
|
||||
ceo.SignPub, ceo.KexPub, []byte("sealed-self"),
|
||||
); err != nil {
|
||||
t.Fatalf("create room: %v", err)
|
||||
}
|
||||
|
||||
internalPubHex := hex.EncodeToString(internalID.SignPub)
|
||||
ns := startACLNatsInternal(t, store, internalPubHex)
|
||||
url := ns.ClientURL()
|
||||
|
||||
// Seed the KV control plane with the privileged internal identity (full perms),
|
||||
// simulating the decentralized buckets the attack reads.
|
||||
intErr := make(chan error, 4)
|
||||
intNC := nkeyConn(t, url, internalID, intErr)
|
||||
intJS, err := jetstream.New(intNC)
|
||||
if err != nil {
|
||||
t.Fatalf("internal jetstream: %v", err)
|
||||
}
|
||||
kvStore, err := membership.OpenJetStream(intJS, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
|
||||
if err != nil {
|
||||
t.Fatalf("open kv buckets: %v", err)
|
||||
}
|
||||
if err := kvStore.AddUser(hex.EncodeToString(ceo.SignPub), "ceo-root-admin", membership.RoleAdmin); err != nil {
|
||||
t.Fatalf("seed kv user: %v", err)
|
||||
}
|
||||
|
||||
// Each JetStream op gets its own short context: a DENIED request never gets a
|
||||
// reply, so it blocks until its own deadline — a shared context would be
|
||||
// exhausted by the first denied call and starve the rest.
|
||||
freshCtx := func(d time.Duration) (context.Context, context.CancelFunc) {
|
||||
return context.WithTimeout(context.Background(), d)
|
||||
}
|
||||
|
||||
// --- error: eve cannot read the control-plane KV buckets ------------------
|
||||
eveErr := make(chan error, 8)
|
||||
eveNC := nkeyConn(t, url, eve, eveErr)
|
||||
eveJS, err := jetstream.New(eveNC)
|
||||
if err != nil {
|
||||
t.Fatalf("eve jetstream: %v", err)
|
||||
}
|
||||
// (a) The KV API: binding the bucket requires STREAM.INFO.KV_UNIBUS_users, which
|
||||
// eve has no permission for, so this must fail (no leak of users).
|
||||
kvCtx, kvCancel := freshCtx(2 * time.Second)
|
||||
if kv, err := eveJS.KeyValue(kvCtx, "UNIBUS_users"); err == nil {
|
||||
if e, gerr := kv.Get(kvCtx, hex.EncodeToString(ceo.SignPub)); gerr == nil {
|
||||
kvCancel()
|
||||
t.Fatalf("eve read the control-plane KV users bucket: %q (N2 leak still open)", string(e.Value()))
|
||||
}
|
||||
kvCancel()
|
||||
t.Fatalf("eve was able to BIND the KV users bucket (N2 leak still open)")
|
||||
}
|
||||
kvCancel()
|
||||
// (b) The raw KV subject space: a direct subscribe must be a permissions
|
||||
// violation (delivered async to the error handler).
|
||||
drain(eveErr)
|
||||
if _, err := eveNC.Subscribe("$KV.UNIBUS_users.>", func(*nats.Msg) {}); err != nil {
|
||||
t.Fatalf("eve sub $KV: %v", err)
|
||||
}
|
||||
_ = eveNC.Flush()
|
||||
if e := waitErr(eveErr, 1*time.Second); e == nil {
|
||||
t.Fatalf("eve subscribing to $KV.UNIBUS_users.> must raise a permissions violation")
|
||||
}
|
||||
|
||||
// --- edge: eve cannot reach another room's stream API ---------------------
|
||||
edgeCtx, edgeCancel := freshCtx(2 * time.Second)
|
||||
if _, err := eveJS.Stream(edgeCtx, "UNIBUS_PRIVROOM"); err == nil {
|
||||
edgeCancel()
|
||||
t.Fatalf("eve reached the foreign room stream API (cross-room JS not isolated)")
|
||||
}
|
||||
edgeCancel()
|
||||
|
||||
// --- golden: ceo can drive the JetStream API of HER OWN room's stream ------
|
||||
ceoErr := make(chan error, 4)
|
||||
ceoNC := nkeyConn(t, url, ceo, ceoErr)
|
||||
ceoJS, err := jetstream.New(ceoNC)
|
||||
if err != nil {
|
||||
t.Fatalf("ceo jetstream: %v", err)
|
||||
}
|
||||
goldenCtx, goldenCancel := freshCtx(5 * time.Second)
|
||||
defer goldenCancel()
|
||||
if _, err := ceoJS.CreateOrUpdateStream(goldenCtx, jetstream.StreamConfig{
|
||||
Name: "UNIBUS_PRIVROOM",
|
||||
Subjects: []string{"room.board.ma-deal"},
|
||||
Storage: jetstream.FileStorage,
|
||||
}); err != nil {
|
||||
t.Fatalf("ceo could not manage her OWN room stream (per-room JS broken): %v", err)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package membership_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// TestHealthExposesPosture: /healthz publishes the node's security posture so a
|
||||
// monitor (or a peer) can detect a cluster member that is not enforce+ACL+TLS
|
||||
// (audit 0008 N1). The probe stays unauthenticated.
|
||||
func TestHealthExposesPosture(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
|
||||
srv := membership.NewServer(store, blobs, membership.AuthEnforce)
|
||||
srv.Posture = membership.Posture{Enforce: true, ACL: true, TLS: true, Cluster: true, Store: "kv"}
|
||||
ts := httptest.NewServer(srv)
|
||||
t.Cleanup(ts.Close)
|
||||
|
||||
resp, err := http.Get(ts.URL + "/healthz")
|
||||
if err != nil {
|
||||
t.Fatalf("get healthz: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("healthz status %d, want 200", resp.StatusCode)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
var got struct {
|
||||
Status string `json:"status"`
|
||||
Posture membership.Posture `json:"posture"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &got); err != nil {
|
||||
t.Fatalf("decode healthz %q: %v", string(body), err)
|
||||
}
|
||||
if got.Status != "ok" {
|
||||
t.Fatalf("status = %q, want ok", got.Status)
|
||||
}
|
||||
if !got.Posture.Enforce || !got.Posture.ACL || !got.Posture.TLS || !got.Posture.Cluster {
|
||||
t.Fatalf("posture not surfaced correctly: %+v", got.Posture)
|
||||
}
|
||||
if got.Posture.Store != "kv" {
|
||||
t.Fatalf("posture.store = %q, want kv", got.Posture.Store)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
package membership_test
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/enmanuel/unibus/pkg/room"
|
||||
)
|
||||
|
||||
// TestClientCreateRoomRefreshPublishFlow is the issue 0006e DoD: under enforce+ACL
|
||||
// a peer creates a room AFTER connecting, and pub/sub works without manual
|
||||
// intervention because the client follows the membership-change contract
|
||||
// (CreateRoom -> RefreshSession -> Subscribe/Publish), exactly as cmd/chat and
|
||||
// cmd/worker now do. This is the end-to-end flow through the client API, proving
|
||||
// the ACL is usable under enforce rather than something an operator must disable.
|
||||
func TestClientCreateRoomRefreshPublishFlow(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
|
||||
alice, bob := mustID(t), mustID(t)
|
||||
mustAddUser(t, store, alice, "alice")
|
||||
mustAddUser(t, store, bob, "bob")
|
||||
|
||||
srv := startACLNats(t, store) // data plane: enforce + per-subject ACL
|
||||
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
ctrl := newCtrl(t, store, blobs)
|
||||
|
||||
aliceC, err := client.NewWithOptions(srv.ClientURL(), ctrl, alice, client.Options{UseNkey: true})
|
||||
if err != nil {
|
||||
t.Fatalf("connect alice: %v", err)
|
||||
}
|
||||
defer aliceC.Close()
|
||||
bobC, err := client.NewWithOptions(srv.ClientURL(), ctrl, bob, client.Options{UseNkey: true})
|
||||
if err != nil {
|
||||
t.Fatalf("connect bob: %v", err)
|
||||
}
|
||||
defer bobC.Close()
|
||||
|
||||
// alice creates a room AFTER connecting: the subject was not in her ACL at
|
||||
// connect time, so she must refresh to publish on it (the worker contract).
|
||||
roomID, err := aliceC.CreateRoom("room.flow.x", room.ModeNATS)
|
||||
if err != nil {
|
||||
t.Fatalf("alice create room: %v", err)
|
||||
}
|
||||
if err := aliceC.RefreshSession(); err != nil {
|
||||
t.Fatalf("alice refresh: %v", err)
|
||||
}
|
||||
|
||||
// alice invites bob; bob joins then refreshes to gain the subject (the chat
|
||||
// subscriber contract), and only then subscribes.
|
||||
if err := aliceC.Invite(roomID, bobC.Endpoint()); err != nil {
|
||||
t.Fatalf("alice invite bob: %v", err)
|
||||
}
|
||||
if err := bobC.Join(roomID); err != nil {
|
||||
t.Fatalf("bob join: %v", err)
|
||||
}
|
||||
if err := bobC.RefreshSession(); err != nil {
|
||||
t.Fatalf("bob refresh: %v", err)
|
||||
}
|
||||
got := make(chan string, 4)
|
||||
sub, err := bobC.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { got <- string(plaintext) })
|
||||
if err != nil {
|
||||
t.Fatalf("bob subscribe after refresh: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(200 * time.Millisecond) // let the subscription settle
|
||||
|
||||
if err := aliceC.Publish(roomID, []byte("hello-under-acl")); err != nil {
|
||||
t.Fatalf("alice publish after refresh: %v", err)
|
||||
}
|
||||
select {
|
||||
case msg := <-got:
|
||||
if msg != "hello-under-acl" {
|
||||
t.Fatalf("bob got %q", msg)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("bob did not receive the message: the create->refresh->subscribe flow is broken under enforce+ACL")
|
||||
}
|
||||
}
|
||||
@@ -81,6 +81,25 @@ type Server struct {
|
||||
// (non-loopback) bind. See dev/0004d-dataplane-acl.md for the full rationale
|
||||
// and the residual metadata exposure this does NOT close.
|
||||
RequireEncryptedRooms bool
|
||||
|
||||
// Posture is the node's security posture, surfaced on /healthz so an operator
|
||||
// or a peer can detect a node NOT running the homogeneous enforce+ACL+TLS
|
||||
// posture a secure cluster requires (audit 0008 N1). It is set by the command;
|
||||
// the zero value (all false) reflects an unsecured dev node.
|
||||
Posture Posture
|
||||
}
|
||||
|
||||
// Posture describes the security posture a membershipd node runs with. It is
|
||||
// non-secret operational metadata (booleans + the store backend name), published
|
||||
// on /healthz so a monitor can flag a cluster member that is not enforce+ACL+TLS
|
||||
// — the weak node that would let an unauthenticated peer harvest the cluster's
|
||||
// forwarded traffic (audit 0008 N1).
|
||||
type Posture struct {
	// Enforce, ACL and TLS are the three flags of the "enforce+ACL+TLS" posture a
	// secure cluster member is expected to report as true. Their exact meaning is
	// defined by the command that populates this struct.
	Enforce bool `json:"enforce"`
	ACL     bool `json:"acl"`
	TLS     bool `json:"tls"`

	// Cluster is presumably true when the node runs as a cluster member
	// (NOTE(review): confirm against the command that sets it).
	Cluster bool `json:"cluster"`

	// Store names the control-plane store backend: "sqlite" or "kv".
	Store string `json:"store"`
}
|
||||
|
||||
// NewServer wires the membership store and blob store into an http.Handler. The
|
||||
@@ -390,7 +409,7 @@ func (s *Server) verifyOwnerSig(roomID, by string, sig, canonical []byte) (Membe
|
||||
// ---- handlers -------------------------------------------------------------
|
||||
|
||||
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "posture": s.Posture})
|
||||
}
|
||||
|
||||
func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
Reference in New Issue
Block a user