7 Commits

Author SHA1 Message Date
egutierrez e1a7402ff1 chore: bump unibus to 0.9.0 (live user-add + clientcheck)
New capability membershipd user add --store kv against a live cluster plus
cmd/clientcheck end-to-end verification (issue 0011 gaps, report 0012). Adds
the capability growth log entry.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez ce72131ddf docs(cluster): correct runbook + wire --internal-id-file into deploy
Corrections learned from the real 0011 deploy:
- Bring up: the "start magnus alone and verify healthz" order deadlocks — a
  lone node of a 3-node cluster has no meta-group quorum and never serves
  healthz until a second node joins. Document a quorum-forming start and that
  a node never self-serves.
- Replication: R1 is an unusable SPOF (all six control-plane buckets on one
  node) and the cold start only converges with the three cold-start fixes;
  go straight to R3 once the cluster forms.
- Add a "user add --store kv" section: the live user-add path that replaces
  stop-seed-restart, with its security model and idempotency/HA/no-delete
  semantics.
- Topology: real IPs, ROUTE_NETWORK=public (no WireGuard mesh exists).
- Chaos test: mark the data-plane client + failover proofs as validated (0012).

Deploy machinery now emits the persisted internal identity: the unit gains
--internal-id-file ${INTERNAL_ID_FILE} and deploy-cluster.sh writes
INTERNAL_ID_FILE into each node's cluster.env, so a fresh deploy enables the
live user-add path on every node.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez 3aa5a2c9a9 feat(clientcheck): end-to-end client verification (E2E room + failover)
The 0011 chaos test validated only the control plane (healthz + leader
failover + KV readable with 2/3); it never connected an authenticated bus
client to the data plane. cmd/clientcheck is a reusable verification tool: it
connects with a real identity (nkey + TLS on both planes, multi-node seed
lists), creates an ephemeral E2E room (encrypted + signed, no durable stream),
and either publishes N messages and asserts all come back decrypted (golden)
or publishes a counter for a duration while logging the attached node (loop),
so stopping a node mid-run shows the client fail over to a survivor and keep
receiving with quorum 2/3.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez 02c2004ebd feat(membershipd): user add/list/revoke --store kv against a live cluster
Closes the most valuable 0011 deploy gap: adding users to the running
cluster's replicated allowlist with no stop-seed-restart. Under enforce the
per-subject ACL confines every bus user to its own rooms, so no ordinary
identity may write the control-plane KV buckets; the only identity the
authenticator grants full JetStream permissions is membershipd's internal
service identity.

- main.go: --internal-id-file persists that identity (load-or-create, 0600)
  instead of a fresh ephemeral key, so the same nkey is available out of
  process. Empty keeps the ephemeral default (single-node/dev unchanged).
- users_kv.go: connectKVStore loads the persisted identity, presents its
  nkey (recognized as internal -> full perms), opens the KV store and
  writes. Defaults assume an on-node loopback invocation; a remote target
  without --ca is refused (allowlist must not travel cleartext, audit N6).
  Prints KV_UNIBUS_users replication (followers_current) after a write.
- users_cli.go: --store kv on add/list/revoke. Re-adding a key is an explicit
  ErrUserExists (no silent overwrite / role flip); revoke is a status flip.
- pkg/client: LoadIdentity (load-only) extracted from LoadOrCreateIdentity,
  preserving its "corrupt file is an error, not silently regenerated" guard.
- kv_useradd_test.go: golden write under enforce, idempotency, unreachable
  endpoint, and remote-without-CA refusal against an embedded node.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:38 +02:00
egutierrez ff580ac031 Merge quick/cluster-coldstart-fixes: 3-node cluster cold-start fixes + real topology 2026-06-07 18:56:28 +02:00
egutierrez 9fbff79df4 chore(deploy): fill cluster nodes.env with the real 3-node topology
Set magnus's public IP (135.125.201.30) and switch ROUTE_NETWORK to "public":
the three nodes have no WireGuard mesh (homer/datardos do not even have wg
installed), so server-to-server routes go over the public IPs, still protected
by the separate cluster route CA (mutual TLS). KV_REPLICAS is raised to 3 now
that the cluster runs at R3.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 18:56:28 +02:00
egutierrez 33746d9962 fix(cluster): make the JetStream control-plane survive a cold multi-node start
Bringing up the 3-node cluster from clean stores never converged: every node
looped on `open KV bucket "UNIBUS_rooms" (replicas=1): context deadline exceeded`.
Three independent defects in the clustered bootstrap path, none of which surface
on a single node (where JetStream is ready instantly), caused it:

1. embeddednats: route connection pooling (nats-server 2.10 default pool of 3)
   churned with "duplicate route"/"client closed" reconnects on the small cluster,
   interrupting the meta-group RAFT heartbeats and forcing perpetual leader
   re-elections. Set Cluster.PoolSize = -1 (single route per peer).

2. embeddednats: the cluster nodes are Docker hosts, so NATS advertised the docker
   bridge IPs (172.x / 10.0.x) to peers, which then tried to dial those private,
   mutually-unreachable addresses. Set Cluster.NoAdvertise = true so only the
   explicit public-IP routes are used. Also added a UNIBUS_NATS_DEBUG env toggle
   (off by default) that enables the embedded server's logger and loopback
   monitoring port for debugging the route/meta layer.

3. membership.OpenJetStream: a KV op is a NATS request/reply; on a cold cluster the
   op was published once, before the node had contact with the meta leader, so the
   request was dropped and the single long-context call just blocked until timeout.
   Retry each bucket op with short per-attempt contexts until it succeeds or an
   overall bootstrap budget (120s) is exhausted, so it lands once the meta settles.

With these the cluster forms cleanly, creates the KV buckets, scales R1->R3 in
place, and survives loss of one node (quorum 2/3). Verified on magnus+homer+datardos.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 18:56:28 +02:00
13 changed files with 972 additions and 90 deletions
+25 -1
View File
@@ -2,7 +2,7 @@
name: unibus name: unibus
lang: go lang: go
domain: infra domain: infra
version: 0.8.0 version: 0.9.0
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo." description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
tags: [service, messaging, nats, e2e] tags: [service, messaging, nats, e2e]
uses_functions: uses_functions:
@@ -154,6 +154,30 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
## Capability growth log ## Capability growth log
- v0.9.0 (2026-06-07) — cierre de los gaps que el despliegue del cluster (report
0011) dejó abiertos (report 0012). (GAP A) Nueva capability `membershipd user
add|list|revoke --store kv`: alta/baja de usuarios contra el KV replicado del
cluster EN MARCHA, sin el procedimiento de parar-sembrar-rearrancar. Usa la
conexión interna privilegiada — el daemon persiste su identidad de servicio con
`--internal-id-file` (cada nodo genera/carga la suya, 0600 junto a las claves TLS)
y la CLI, ejecutada por loopback en un nodo, presenta esa nkey que el
autenticador reconoce con permisos plenos de JetStream; ninguna identidad de
usuario normal puede tocar los buckets `KV_UNIBUS_*` bajo la ACL por-subject. El
alta es idempotente (re-alta de la misma clave = `ErrUserExists` explícito, sin
sobrescribir ni elevar rol), commitea con quórum 2/3 (HA, imprime
`followers_current`) y rechaza un destino remoto sin `--ca` (igual que
`migrate-to-kv`). (GAP B) Nuevo `cmd/clientcheck`: verificación end-to-end real
con un cliente autenticado (identidad operator, nkey+TLS+https) que crea una room
E2E, publica y recibe descifrado contra el cluster vivo, incluido un nodo parado a
media transmisión donde el cliente hace failover a un superviviente y sigue
recibiendo con cero pérdida (quórum 2/3) — el plano de datos que el chaos test del
0011 nunca probó. (GAP C) Runbook `deploy/cluster/README.md` corregido: el orden
de arranque "magnus solo y verifica healthz" deadlockeaba (un nodo solo no tiene
quórum del meta-group y nunca sirve healthz); se documenta el arranque por quórum,
que R1 es un SPOF inservible (ir directo a R3) y la nueva vía de alta con el
cluster vivo. La plantilla de deploy (unit + `deploy-cluster.sh`) emite ya
`INTERNAL_ID_FILE` y el flag. Verificado contra los 3 VPS reales (magnus + homer +
datardos); posture enforce+ACL+TLS+R3 intacta.
- v0.8.0 (2026-06-07) — completar y endurecer el cluster (issue 0006, fases - v0.8.0 (2026-06-07) — completar y endurecer el cluster (issue 0006, fases
0006a0006g) que cierra los bloqueantes de la auditoría dedicada del cluster 0006a0006g) que cierra los bloqueantes de la auditoría dedicada del cluster
(report 0008) y cablea el control plane descentralizado que 0003 dejó a medias. (report 0008) y cablea el control plane descentralizado que 0003 dejó a medias.
+260
View File
@@ -0,0 +1,260 @@
// Command clientcheck is an end-to-end verification client for a live unibus
// cluster (issue 0011 GAP B). The 0011 chaos test validated only the control
// plane (healthz + meta/stream-leader failover + KV readable with 2/3); it never
// connected an authenticated bus client (nkey + TLS) to create a room and
// publish/subscribe through it, least of all across a node loss. clientcheck does
// exactly that with a real identity (the operator), so the data-plane end-to-end
// path — connect, create an E2E room, publish, receive decrypted — is exercised
// against the running cluster, including while a node is stopped.
//
// It is a reusable tool, not a throwaway script: point it at the cluster's CA,
// an identity file, and the NATS + control-plane seed lists.
//
// # golden: connect, create an E2E room, publish N, confirm N decrypted back
// clientcheck --ca ca.crt --identity-file operator.id \
// --nats-seeds nats://A:4250,nats://B:4250,nats://C:4250 \
// --ctrl-seeds https://A:8470,https://B:8470,https://C:8470 --messages 5
//
// # loop: publish a counter every interval for the duration, logging the node
// # it is attached to — stop a node mid-run (systemctl stop membershipd-cluster)
// # and watch it fail over to a survivor and keep receiving (quorum 2/3).
// clientcheck ... --mode loop --duration 45s --interval 1s
package main
import (
"crypto/rand"
"encoding/hex"
"flag"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
)
func main() {
var (
caPath = flag.String("ca", "", "bus CA cert pinning TLS on both planes (required for a secured cluster)")
idFile = flag.String("identity-file", "", "path to the client identity JSON (e.g. `pass show unibus/operator-identity` written 0600) (required)")
natsSeeds = flag.String("nats-seeds", "", "comma-separated NATS urls of the cluster nodes (required)")
ctrlSeeds = flag.String("ctrl-seeds", "", "comma-separated control-plane https urls of the cluster nodes (required)")
subject = flag.String("subject", "test.gapcheck", "test room subject PREFIX; a random token is appended so runs never collide with real rooms")
messages = flag.Int("messages", 5, "golden mode: number of messages to publish and expect back")
mode = flag.String("mode", "golden", "golden (publish N, verify N decrypted) | loop (publish a counter for --duration, for failover testing)")
duration = flag.Duration("duration", 30*time.Second, "loop mode: how long to keep publishing")
interval = flag.Duration("interval", 1*time.Second, "loop mode: delay between published messages")
)
flag.Parse()
if *idFile == "" || *natsSeeds == "" || *ctrlSeeds == "" {
log.Fatalf("clientcheck: --identity-file, --nats-seeds and --ctrl-seeds are required")
}
id, err := client.LoadIdentity(*idFile)
if err != nil {
log.Fatalf("clientcheck: load identity: %v", err)
}
natsList := splitCSV(*natsSeeds)
ctrlList := splitCSV(*ctrlSeeds)
if len(natsList) == 0 || len(ctrlList) == 0 {
log.Fatalf("clientcheck: empty --nats-seeds or --ctrl-seeds")
}
// Build the secure client options: nkey on the data plane, TLS pinned to the
// bus CA on both planes, and the FULL seed lists so nats.go fails over to a
// surviving node when the attached one dies (the failover this tool verifies).
opts := client.Options{
NatsServers: natsList[1:],
CtrlURLs: ctrlList[1:],
}
if *caPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(*caPath)
if err != nil {
log.Fatalf("clientcheck: load CA: %v", err)
}
opts.UseNkey = true
opts.TLS = tlsCfg
opts.CtrlTLS = tlsCfg
for _, u := range ctrlList {
if !strings.HasPrefix(u, "https://") {
log.Fatalf("clientcheck: control URL %q must be https:// when --ca is set", u)
}
}
}
c, err := client.NewWithOptions(natsList[0], ctrlList[0], id, opts)
if err != nil {
log.Fatalf("clientcheck: connect: %v", err)
}
defer c.Close()
log.Printf("connected: endpoint=%s nats=%s", c.Endpoint().ID, c.ConnectedServer())
// Create an EPHEMERAL E2E room (encrypted + signed, NOT persisted): the test
// stays end-to-end encrypted (the cluster requires encryption on a public
// bind) while leaving no durable JetStream stream behind. The random subject
// token guarantees the room is unique and never a real room.
rnd := make([]byte, 8)
if _, err := rand.Read(rnd); err != nil {
log.Fatalf("clientcheck: random: %v", err)
}
subj := fmt.Sprintf("%s.%s", *subject, hex.EncodeToString(rnd))
policy := room.Policy{Encrypt: true, Persist: false, SignMsgs: true}
roomID, err := c.CreateRoom(subj, policy)
if err != nil {
log.Fatalf("clientcheck: create room: %v", err)
}
log.Printf("created E2E room: id=%s subject=%s (encrypt=%v sign=%v persist=%v)", roomID, subj, policy.Encrypt, policy.SignMsgs, policy.Persist)
// Under the per-subject ACL, NATS freezes permissions at connect time, so the
// just-created room's subject is not yet publishable/subscribable on the live
// connection. RefreshSession reconnects so the authenticator re-derives the
// ACL (now including this room) — the post-0006 contract every client follows
// after a membership change.
if err := c.RefreshSession(); err != nil {
log.Fatalf("clientcheck: refresh session: %v", err)
}
switch *mode {
case "golden":
runGolden(c, roomID, *messages)
case "loop":
runLoop(c, roomID, *duration, *interval)
default:
log.Fatalf("clientcheck: --mode must be golden or loop, got %q", *mode)
}
}
// runGolden subscribes, publishes n messages, and asserts all n come back
// decrypted. Exits non-zero if any are missing.
func runGolden(c *client.Client, roomID string, n int) {
var mu sync.Mutex
got := map[string]bool{}
sub, err := c.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
mu.Lock()
got[string(plaintext)] = true
mu.Unlock()
})
if err != nil {
log.Fatalf("clientcheck: subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(300 * time.Millisecond) // let the subscription settle
want := make([]string, n)
for i := 0; i < n; i++ {
msg := fmt.Sprintf("gapcheck-e2e-%d", i)
want[i] = msg
if err := c.Publish(roomID, []byte(msg)); err != nil {
log.Fatalf("clientcheck: publish %d: %v", i, err)
}
}
log.Printf("published %d messages to %s; waiting for decrypted echoes...", n, roomID)
deadline := time.Now().Add(15 * time.Second)
for time.Now().Before(deadline) {
mu.Lock()
have := len(got)
mu.Unlock()
if have >= n {
break
}
time.Sleep(100 * time.Millisecond)
}
mu.Lock()
defer mu.Unlock()
missing := 0
for _, w := range want {
if !got[w] {
missing++
log.Printf(" MISSING: %q", w)
}
}
log.Printf("connected node at finish: %s", c.ConnectedServer())
if missing > 0 {
log.Fatalf("GOLDEN FAIL: %d/%d messages not received decrypted", missing, n)
}
log.Printf("GOLDEN OK: all %d messages received and decrypted end-to-end", n)
}
// runLoop publishes a numbered message every interval for the duration and logs
// the count received plus the node currently attached, so an operator stopping a
// cluster node mid-run sees the client fail over to a survivor and keep receiving
// (quorum 2/3). It is the live failover-with-a-connected-client test the 0011
// chaos run never performed.
func runLoop(c *client.Client, roomID string, duration, interval time.Duration) {
var mu sync.Mutex
received := 0
servers := map[string]int{} // node -> #ticks observed attached
sub, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) {
mu.Lock()
received++
mu.Unlock()
})
if err != nil {
log.Fatalf("clientcheck: subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(300 * time.Millisecond)
log.Printf("loop: publishing every %s for %s — stop a node now to test failover", interval, duration)
end := time.Now().Add(duration)
sent := 0
for time.Now().Before(end) {
msg := fmt.Sprintf("gapcheck-loop-%d", sent)
err := c.Publish(roomID, []byte(msg))
sent++
mu.Lock()
recv := received
mu.Unlock()
node := c.ConnectedServer()
up := c.IsConnected()
if node != "" {
mu.Lock()
servers[node]++
mu.Unlock()
}
pubStatus := "ok"
if err != nil {
pubStatus = "ERR:" + err.Error()
}
log.Printf(" t=%2ds sent=%d recv=%d up=%v node=%s publish=%s",
sent, sent, recv, up, node, pubStatus)
time.Sleep(interval)
}
mu.Lock()
defer mu.Unlock()
log.Printf("loop done: sent=%d received=%d", sent, received)
nodes := make([]string, 0, len(servers))
for n := range servers {
nodes = append(nodes, n)
}
sort.Strings(nodes)
for _, n := range nodes {
log.Printf(" attached to %s for %d ticks", n, servers[n])
}
if len(servers) > 1 {
log.Printf("FAILOVER OBSERVED: client was attached to %d distinct nodes across the run", len(servers))
}
if received == 0 {
log.Fatalf("LOOP FAIL: received 0 messages")
}
log.Printf("LOOP OK: client kept receiving across the run (received=%d)", received)
}
func splitCSV(s string) []string {
var out []string
for _, p := range strings.Split(s, ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
+152
View File
@@ -0,0 +1,152 @@
package main
// Integration tests for issue 0011 GAP A: `membershipd user add --store kv`
// adds users to a RUNNING cluster's replicated allowlist via the privileged
// internal connection, instead of the stop-seed-restart procedure the 0011
// deploy required. These exercise the real connectKVStore path (load the
// persisted internal identity from a file, present its nkey, open the KV store,
// write the user) against an embedded enforce node, plus the idempotency and
// error semantics the DoD calls for. Multi-node replication and node-down quorum
// are validated against the live cluster (report 0012).
import (
"encoding/hex"
"errors"
"path/filepath"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
)
// startEnforceKVNode boots a single embedded enforce node whose authenticator
// recognizes internalPubHex as the privileged internal identity, bootstraps the
// KV control-plane store over the in-process internal connection, and publishes
// it into the holder — the exact sequence main.go performs for --store kv. It
// returns the client URL the CLI connects to.
func startEnforceKVNode(t *testing.T, internalID cs.Identity) string {
t.Helper()
holder := &storeHolder{}
auth := busauth.NewNkeyAuthenticatorACLInternal(
holder.IsAuthorized,
busauth.PermissionsFromSubjects(holder.subjectACL),
hex.EncodeToString(internalID.SignPub),
)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("start enforce node: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
intNC, js, err := connectInternalJS(ns, internalID, true)
if err != nil {
t.Fatalf("bootstrap internal connection: %v", err)
}
t.Cleanup(intNC.Close)
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("bootstrap KV store: %v", err)
}
holder.set(kvStore)
return ns.ClientURL()
}
// TestUserAddStoreKV_GoldenAndIdempotent is the GAP A golden + edge-1: the CLI
// connection (real connectKVStore, loading the internal identity from a file and
// presenting its nkey) writes a user into the live KV allowlist, the user is
// authorized afterward, and re-adding the same key is an explicit ErrUserExists
// with no corruption (the unchanged row is still authorized).
func TestUserAddStoreKV_GoldenAndIdempotent(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
internalID, err := client.LoadOrCreateIdentity(idFile) // persists 0600
if err != nil {
t.Fatalf("persist internal identity: %v", err)
}
url := startEnforceKVNode(t, internalID)
// Golden: connect as the privileged internal identity (loopback, no TLS) and
// add a new user, exactly as `user add --store kv` does.
kv, err := connectKVStore(url, idFile, "", 1)
if err != nil {
t.Fatalf("connectKVStore (privileged): %v", err)
}
defer kv.Close()
newUser, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("new user identity: %v", err)
}
pub := hex.EncodeToString(newUser.SignPub)
if err := kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember); err != nil {
t.Fatalf("add user to live KV: %v", err)
}
if !kv.store.IsAuthorized(pub) {
t.Fatalf("user added to KV must be authorized")
}
// Edge 1: re-adding the same key is a clean, non-destructive ErrUserExists.
err = kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember)
if !errors.Is(err, membership.ErrUserExists) {
t.Fatalf("re-add must return ErrUserExists (idempotent), got %v", err)
}
// A different handle/role with the SAME key is also rejected — the row is not
// silently overwritten (no role flip).
if err := kv.store.AddUser(pub, "impostor", membership.RoleAdmin); !errors.Is(err, membership.ErrUserExists) {
t.Fatalf("re-add with a different role must NOT overwrite; want ErrUserExists, got %v", err)
}
u, err := kv.store.GetUser(pub)
if err != nil {
t.Fatalf("get user: %v", err)
}
if u.Handle != "gapcheck_user" || u.Role != membership.RoleMember || u.Status != membership.StatusActive {
t.Fatalf("idempotent re-add corrupted the row: %+v", u)
}
}
// TestUserAddStoreKV_RequiresInternalIdentity: --store kv without a usable
// internal identity file fails loudly (missing file, empty path) rather than
// silently connecting unprivileged.
func TestUserAddStoreKV_RequiresInternalIdentity(t *testing.T) {
if _, err := connectKVStore("nats://127.0.0.1:4250", "", "", 1); err == nil {
t.Fatalf("empty --internal-id-file must be an error")
}
missing := filepath.Join(t.TempDir(), "nope.id")
if _, err := connectKVStore("nats://127.0.0.1:4250", missing, "", 1); err == nil {
t.Fatalf("missing internal identity file must be an error")
}
}
// TestUserAddStoreKV_UnreachableKV is the GAP A error case: pointing --store kv
// at a dead endpoint yields a clear, handled error (no crash, no silent success).
func TestUserAddStoreKV_UnreachableKV(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
t.Fatalf("persist internal identity: %v", err)
}
// A loopback port with nothing listening: connect must fail fast and wrapped.
_, err := connectKVStore("nats://127.0.0.1:1/", idFile, "", 1)
if err == nil {
t.Fatalf("connecting to a dead endpoint must error")
}
}
// TestUserAddStoreKV_RemoteWithoutCARefused: a non-loopback target without --ca
// is refused so the allowlist write never travels in cleartext (audit 0008 N6,
// same guard as migrate-to-kv).
func TestUserAddStoreKV_RemoteWithoutCARefused(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
t.Fatalf("persist internal identity: %v", err)
}
_, err := connectKVStore("nats://203.0.113.1:4250", idFile, "", 1)
if err == nil {
t.Fatalf("remote target without --ca must be refused")
}
}
+24
View File
@@ -24,6 +24,7 @@ import (
"github.com/enmanuel/unibus/pkg/blobstore" "github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/busauth" "github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats" "github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership" "github.com/enmanuel/unibus/pkg/membership"
) )
@@ -83,6 +84,17 @@ func main() {
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node // "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
// in the cluster serves the same state. // in the cluster serves the same state.
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)") storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
// Persisted internal service identity (issue 0011 gaps, GAP A): when set, the
// privileged internal identity used to manage JetStream is LOADED from this
// file (generated and persisted on first start) instead of being a fresh
// ephemeral key each boot. Persisting it is what lets `membershipd user add
// --store kv` write the replicated allowlist of a LIVE cluster: that CLI,
// run over loopback on a node, loads the SAME identity and presents the nkey
// this node's authenticator already grants full permissions. Empty keeps the
// ephemeral-per-process behavior (single-node/dev default, unchanged). The
// file holds a private key: it is written 0600 and belongs next to the node's
// TLS keys (deploy keeps it under secrets/, gitignored).
internalIDFile = flag.String("internal-id-file", "", "path to a persisted internal service identity (JSON); enables `membershipd user add --store kv` against the live cluster. Empty = ephemeral per-process identity (dev default)")
) )
flag.Parse() flag.Parse()
@@ -136,10 +148,22 @@ func main() {
var internalID cs.Identity var internalID cs.Identity
var internalPubHex string var internalPubHex string
if needJS && enforce && *natsURL == "" { if needJS && enforce && *natsURL == "" {
if *internalIDFile != "" {
// Persisted identity: load it, generating + writing it (0600) on first
// start. A stable internal key is what `user add --store kv` presents to
// add users to a live cluster (GAP A); rotate it by deleting the file and
// restarting.
internalID, err = client.LoadOrCreateIdentity(*internalIDFile)
if err != nil {
log.Fatalf("load internal service identity %q: %v", *internalIDFile, err)
}
log.Printf("internal service identity: persisted (%s)", *internalIDFile)
} else {
internalID, err = cs.GenerateIdentity() internalID, err = cs.GenerateIdentity()
if err != nil { if err != nil {
log.Fatalf("generate internal identity: %v", err) log.Fatalf("generate internal identity: %v", err)
} }
}
internalPubHex = hex.EncodeToString(internalID.SignPub) internalPubHex = hex.EncodeToString(internalID.SignPub)
} }
+82 -8
View File
@@ -2,6 +2,7 @@ package main
import ( import (
"encoding/hex" "encoding/hex"
"errors"
"flag" "flag"
"fmt" "fmt"
"os" "os"
@@ -50,13 +51,26 @@ commands:
list List all registered users list List all registered users
revoke Revoke a user (denies access on both planes immediately) revoke Revoke a user (denies access on both planes immediately)
store backends (--store):
sqlite local SQLite database (default; seeds the first admin offline)
kv the RUNNING cluster's replicated JetStream KV allowlist, via the
privileged internal connection — add users with the cluster live,
no stop-seed-restart needed (run over loopback/SSH on a node)
examples: examples:
membershipd user add --handle alice --sign-pub <64-hex> --role admin membershipd user add --handle alice --sign-pub <64-hex> --role admin
membershipd user list membershipd user add --store kv --handle bob --sign-pub <64-hex> --role member
membershipd user list --store kv
membershipd user revoke <64-hex> membershipd user revoke <64-hex>
common flags: common flags:
--db <path> SQLite database path (default ./local_files/unibus.db) --db <path> SQLite database path (--store sqlite; default ./local_files/unibus.db)
--store kv flags (defaults assume an on-node invocation):
--nats-url <url> cluster NATS (default nats://127.0.0.1:4250)
--internal-id-file <path> persisted internal service identity (default /opt/unibus/secrets/internal.id)
--ca <path> CA cert pinning the data-plane TLS (default /opt/unibus/tls/ca.crt)
--kv-replicas <n> KV replication factor, match the cluster (default 3)
`) `)
} }
@@ -88,12 +102,59 @@ func validateSignPubHex(signPub string) error {
return nil return nil
} }
// kvFlags holds the connection flags shared by the --store kv path of the user
// subcommands. registerKVFlags wires them onto a flag set so add and list expose
// an identical interface.
type kvFlags struct {
store *string
natsURL *string
internalID *string
ca *string
replicas *int
}
func registerKVFlags(fs *flag.FlagSet) kvFlags {
return kvFlags{
store: fs.String("store", "sqlite", "user store backend: sqlite (local DB) | kv (the live cluster's replicated allowlist)"),
natsURL: fs.String("nats-url", defaultClusterNatsURL, "cluster NATS url for --store kv"),
internalID: fs.String("internal-id-file", defaultInternalIDFile, "persisted internal service identity for --store kv"),
ca: fs.String("ca", defaultClusterCAFile, "CA cert pinning TLS on the --store kv NATS connection"),
replicas: fs.Int("kv-replicas", 3, "KV replication factor for --store kv (match the cluster)"),
}
}
// resolveStore returns the membership store for the chosen backend plus a cleanup
// func. For --store kv it opens the privileged connection to the live cluster; for
// sqlite it opens the local file. It exits the process with a clear message on any
// failure (a dead NATS, a missing identity file), so a broken --store kv add fails
// loudly instead of silently — Error case of the GAP A DoD. The returned *kvConn
// is non-nil only for the kv backend (so the caller can report replication).
func resolveStore(cmd string, kf kvFlags, dbPath string) (membership.Store, *kvConn, func()) {
switch *kf.store {
case "sqlite":
store := openStore(dbPath)
return store, nil, func() { store.Close() }
case "kv":
kv, err := connectKVStore(*kf.natsURL, *kf.internalID, *kf.ca, *kf.replicas)
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd %s: --store kv: %v\n", cmd, err)
os.Exit(1)
}
return kv.store, kv, kv.Close
default:
fmt.Fprintf(os.Stderr, "membershipd %s: --store must be \"sqlite\" or \"kv\", got %q\n", cmd, *kf.store)
os.Exit(2)
return nil, nil, func() {}
}
}
func userAdd(args []string) { func userAdd(args []string) {
fs := flag.NewFlagSet("user add", flag.ExitOnError) fs := flag.NewFlagSet("user add", flag.ExitOnError)
handle := fs.String("handle", "", "human-readable user name (required)") handle := fs.String("handle", "", "human-readable user name (required)")
signPub := fs.String("sign-pub", "", "Ed25519 signing public key in hex (required)") signPub := fs.String("sign-pub", "", "Ed25519 signing public key in hex (required)")
role := fs.String("role", membership.RoleMember, "role: admin or member") role := fs.String("role", membership.RoleMember, "role: admin or member")
dbPath := fs.String("db", defaultDBPath, "SQLite database path") dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
_ = fs.Parse(args) _ = fs.Parse(args)
if *handle == "" || *signPub == "" { if *handle == "" || *signPub == "" {
@@ -105,23 +166,35 @@ func userAdd(args []string) {
os.Exit(2) os.Exit(2)
} }
store := openStore(*dbPath) store, kv, closeStore := resolveStore("user add", kf, *dbPath)
defer store.Close() defer closeStore()
if err := store.AddUser(*signPub, *handle, *role); err != nil { if err := store.AddUser(*signPub, *handle, *role); err != nil {
if errors.Is(err, membership.ErrUserExists) {
// Idempotency contract (GAP A): re-adding the same key is an EXPLICIT,
// non-destructive error — the existing row is left untouched (no silent
// upsert that could flip a role or clobber status, which would corrupt the
// allowlist). To replace a user, `user revoke <key>` then add again.
fmt.Fprintf(os.Stderr, "membershipd user add: user %s already registered (unchanged); revoke it first to replace\n", *signPub)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "membershipd user add: %v\n", err) fmt.Fprintf(os.Stderr, "membershipd user add: %v\n", err)
os.Exit(1) os.Exit(1)
} }
fmt.Printf("added user %q (%s) role=%s\n", *handle, *signPub, *role) fmt.Printf("added user %q (%s) role=%s\n", *handle, *signPub, *role)
if kv != nil {
reportKVReplication(kv.js)
}
} }
func userList(args []string) { func userList(args []string) {
fs := flag.NewFlagSet("user list", flag.ExitOnError) fs := flag.NewFlagSet("user list", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path") dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
_ = fs.Parse(args) _ = fs.Parse(args)
store := openStore(*dbPath) store, _, closeStore := resolveStore("user list", kf, *dbPath)
defer store.Close() defer closeStore()
users, err := store.ListUsers() users, err := store.ListUsers()
if err != nil { if err != nil {
@@ -143,6 +216,7 @@ func userList(args []string) {
func userRevoke(args []string) { func userRevoke(args []string) {
fs := flag.NewFlagSet("user revoke", flag.ExitOnError) fs := flag.NewFlagSet("user revoke", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path") dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
// Go's flag package stops at the first non-flag argument, so `revoke <key> // Go's flag package stops at the first non-flag argument, so `revoke <key>
// --db path` would otherwise leave --db unparsed. Pull a leading positional // --db path` would otherwise leave --db unparsed. Pull a leading positional
@@ -167,8 +241,8 @@ func userRevoke(args []string) {
os.Exit(2) os.Exit(2)
} }
store := openStore(*dbPath) store, _, closeStore := resolveStore("user revoke", kf, *dbPath)
defer store.Close() defer closeStore()
if err := store.RevokeUser(signPub); err != nil { if err := store.RevokeUser(signPub); err != nil {
fmt.Fprintf(os.Stderr, "membershipd user revoke: %v\n", err) fmt.Fprintf(os.Stderr, "membershipd user revoke: %v\n", err)
+151
View File
@@ -0,0 +1,151 @@
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// users_kv.go is the `--store kv` half of the user administration CLI (issue 0011
// gaps, GAP A): adding and listing bus users directly against the RUNNING
// cluster's replicated JetStream KV allowlist, with no need to stop the cluster,
// seed a standalone node, and restart (the procedure the 0011 deploy required).
//
// The mechanism is the cluster's own privileged internal connection. Under
// enforce every bus user is confined by the per-subject ACL to the JetStream API
// of its own rooms, so no ordinary identity may touch the control-plane buckets
// (KV_UNIBUS_*). The ONLY identity the authenticator grants full JetStream
// permissions is membershipd's internal service identity. By persisting that
// identity to a file (membershipd --internal-id-file) the same key becomes
// available to this CLI, which presents it as its NATS nkey and is therefore
// recognized as the privileged internal client and allowed to read/write the KV.
//
// Intended invocation is over loopback on a cluster node (SSH): the data-plane
// TLS certificate's SAN covers 127.0.0.1/localhost and the internal identity file
// lives 0600 next to the node's TLS keys. Using the file requires root on the
// node, which already implies full control of that node — so co-locating it adds
// no practical exposure beyond what the TLS server key and cluster password
// already represent.
// defaultClusterNatsURL is the node-local NATS listener. The CLI is meant to run
// on a cluster node over SSH, talking to that node's own embedded server.
const defaultClusterNatsURL = "nats://127.0.0.1:4250"
// Deploy-default paths for the privileged identity and the data-plane CA, so an
// on-node invocation needs only --handle/--sign-pub/--role. Override for other
// layouts.
const (
defaultInternalIDFile = "/opt/unibus/secrets/internal.id"
defaultClusterCAFile = "/opt/unibus/tls/ca.crt"
)
// kvConn bundles the privileged NATS connection to a live cluster and the
// KV-backed control-plane store opened over it. Close releases both.
type kvConn struct {
nc *nats.Conn
js jetstream.JetStream
store membership.Store
}
func (k *kvConn) Close() {
if k == nil {
return
}
if k.store != nil {
_ = k.store.Close()
}
if k.nc != nil {
k.nc.Close()
}
}
// connectKVStore opens the privileged internal connection to the cluster's NATS
// and the JetStream KV control-plane store on top of it. internalIDFile is the
// membershipd-persisted internal service identity whose nkey the authenticator
// grants full permissions; caPath pins the data-plane TLS (empty only for a
// non-TLS dev cluster). A non-loopback target without --ca is refused, mirroring
// migrate-to-kv (audit 0008 N6): the allowlist write must not travel in cleartext.
func connectKVStore(natsURL, internalIDFile, caPath string, replicas int) (*kvConn, error) {
if internalIDFile == "" {
return nil, fmt.Errorf("--internal-id-file is required for --store kv (the privileged identity membershipd persists with --internal-id-file)")
}
// Confidentiality guard: a remote NATS without TLS would expose the allowlist
// (handles/roles/sign-pubs) and the privileged nkey handshake in cleartext.
if !isLoopbackURL(natsURL) && caPath == "" {
return nil, fmt.Errorf("refusing to connect to remote %q without --ca: the allowlist write would travel in cleartext — pin TLS with --ca, or run over a loopback --nats-url on a node", natsURL)
}
id, err := client.LoadIdentity(internalIDFile)
if err != nil {
return nil, fmt.Errorf("load internal identity: %w", err)
}
nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv)
if err != nil {
return nil, fmt.Errorf("derive nkey from internal identity: %w", err)
}
opts := []nats.Option{
nats.Name("membershipd-user-cli"),
nats.Nkey(nkeyPub, nkeySign),
}
if caPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
if err != nil {
return nil, fmt.Errorf("load CA %q: %w", caPath, err)
}
opts = append(opts, nats.Secure(tlsCfg))
}
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
return nil, fmt.Errorf("connect cluster NATS %q: %w", natsURL, err)
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return nil, fmt.Errorf("jetstream: %w", err)
}
store, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: replicas})
if err != nil {
nc.Close()
return nil, fmt.Errorf("open KV control-plane store: %w", err)
}
return &kvConn{nc: nc, js: js, store: store}, nil
}
// reportKVReplication prints the replication status of the allowlist bucket
// stream (KV_UNIBUS_users) right after a write, so the operator sees the add
// landed on a quorum and replicated to the followers — executable evidence that
// the live-cluster add is HA, not single-node. Best-effort: a read failure is a
// note, not an error (the write itself already succeeded).
func reportKVReplication(js jetstream.JetStream) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
st, err := js.Stream(ctx, "KV_UNIBUS_users")
if err != nil {
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
return
}
info, err := st.Info(ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
return
}
if info.Cluster == nil {
fmt.Printf("KV_UNIBUS_users: standalone (R1, no cluster replication); msgs=%d\n", info.State.Msgs)
return
}
current := 0
for _, r := range info.Cluster.Replicas {
if r.Current {
current++
}
}
fmt.Printf("KV_UNIBUS_users: leader=%s followers_current=%d/%d msgs=%d\n",
info.Cluster.Leader, current, len(info.Cluster.Replicas), info.State.Msgs)
}
+143 -39
View File
@@ -5,9 +5,12 @@ This directory holds the material to bring up unibus as a **3-node cluster**
plane (rooms/members/keys/users on JetStream KV + the anti-replay nonce bucket) plane (rooms/members/keys/users on JetStream KV + the anti-replay nonce bucket)
survives the loss of any one node (quorum 2/3). survives the loss of any one node (quorum 2/3).
> **The agent that authored this never touched a VPS.** Every step that changes a > **Status: this cluster is DEPLOYED in production** (magnus + homer + datardos,
> remote host is marked **HUMAN** and is executed by the operator. `deploy-cluster.sh` > R3, enforce+ACL+TLS) — see report 0011. The runbook below was authored before any
> defaults to a dry run. > VPS existed and has since been **corrected against the real deploy** (report 0012):
> the start ordering, the R1→R3 reality, and the live user-add path were all wrong
> or missing. Steps that change a remote host are marked **HUMAN**; `deploy-cluster.sh`
> still defaults to a dry run.
## Files ## Files
@@ -22,18 +25,22 @@ Generated keys/secrets (`out/`, `build/`, `secrets/`) are **gitignored** — the
secret and never leave the operator's trusted machine except over the secure secret and never leave the operator's trusted machine except over the secure
rsync channel. rsync channel.
## Topology ## Topology (as deployed, report 0011)
| Node | SSH | Public IP | WireGuard IP | Role | | Node | SSH | Public IP | Role |
|---|---|---|---|---| |---|---|---|---|
| magnus | `magnus` | `<MAGNUS_PUBLIC_IP>` | `<MAGNUS_WG_IP>` | seed (first up) | | magnus | `magnus` (root) | `135.125.201.30` | node — **= organic-machine.com = `om`**, the critical host (caddy + gitea + registry-api + monitoring); the bus runs alongside, untouched |
| homer | `homer` | `141.94.69.66` | `<HOMER_WG_IP>` | replica | | homer | `homer` (ubuntu+sudo) | `141.94.69.66` | node |
| datardos | `dd` | `51.91.100.142` | `<DATARDOS_WG_IP>` (10.21.0.x) | replica | | datardos | `dd` (ubuntu+sudo) | `51.91.100.142` | node |
The route layer (server-to-server) prefers the **WireGuard mesh** `ROUTE_NETWORK=public`, **not `wg`**: there is no WireGuard mesh between the three
(`ROUTE_NETWORK=wg`); the client data plane and the HTTP control plane are reached nodes (homer and datardos do not even have the `wg` binary; om's only WG peers are
over the public IPs. The route CA is **separate** from the client CA, so a client the operator's PCs). The server-to-server routes therefore travel over the public
cert can never be presented to the route port. IPs, protected by the **separate cluster route CA** (mutual route TLS) — a client
data-plane cert can never be presented to the route port. The client data plane and
the HTTP control plane are also reached over the public IPs. There is no fixed
"seed" node: with R3 the three are peers (see "Bring up" for why a lone node cannot
self-serve).
## Prerequisites (HUMAN, once) ## Prerequisites (HUMAN, once)
@@ -93,25 +100,48 @@ SEED
> The KV written here lives in `./local_files/jetstream`, which the cluster unit > The KV written here lives in `./local_files/jetstream`, which the cluster unit
> reuses (`--nats-store` default), so the admin is present when the enforce cluster > reuses (`--nats-store` default), so the admin is present when the enforce cluster
> starts. Additional users are added the same loopback way until a > starts. This loopback bootstrap is needed ONLY for the very first admin (the
> `user add --store kv` exists (see GAP in report 0009). > chicken-and-egg). **Every user after that is added with the cluster live** — no
> stop-seed-restart — via `user add --store kv` (see "Add users to the live
> cluster" below, report 0012).
## Bring up (HUMAN — staggered) ## Bring up (HUMAN)
Bring up the seed first, then the replicas one at a time, checking each joins. > **CORRECTION (report 0012).** The original instruction — "start magnus alone and
> verify healthz, then add the others" — is **WRONG and will look like a hung
> deploy.** A 3-node JetStream cluster forms a RAFT meta-group that needs a quorum
> (2 of 3) to elect a leader. A single started node has no quorum, so its JetStream
> meta never becomes current: `--store kv` blocks creating the KV buckets and
> **`/healthz` never returns ok** until a second node joins. Waiting for magnus to
> "go green" before starting the others therefore deadlocks the rollout.
Start the nodes so a quorum forms. On a **clean cluster** the simplest correct
procedure is to start all three close together and let the meta-group converge:
```bash ```bash
# 1. Seed node (after the seed step above). # Start all three (order does not matter); each blocks on the others until a
ssh root@magnus 'systemctl enable --now membershipd-cluster' # 2/3 quorum elects a JetStream meta leader, then the KV buckets are created.
ssh root@magnus 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt' for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
# 2. Replicas, one at a time. # Only NOW does healthz return ok — once the meta-group has a leader (give it
ssh root@homer 'systemctl enable --now membershipd-cluster' # ~10-30s on a cold start). Poll, do not assume the first node is broken.
ssh root@datardos 'systemctl enable --now membershipd-cluster' for h in magnus homer datardos; do
echo "== $h =="; ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt || echo "(not ready yet — needs quorum)"'
done
``` ```
> Initial rollout runs at **R1** (`KV_REPLICAS=1` in `nodes.env`): the buckets live A **staggered** start also works, but only because `membershipd`'s KV open RETRIES
> on the seed only. This is NOT HA yet — see "Scale to R3". the bucket creation for a 120s bootstrap budget (issue 0006g, fix #3): the first
node sits in that retry loop — NOT serving healthz — until the second node makes a
quorum, then both converge and the third catches up. Either way, a lone node never
self-serves; do not gate the next node's start on the previous one's healthz.
> A cold multi-node start only converges because of **three cold-start fixes**
> (report 0011): route pooling off (`PoolSize=-1`), `NoAdvertise=true` (Docker
> bridge IPs not gossiped), and the KV-open retry loop above. Without them the
> meta-group re-elects leaders forever and bucket creation hangs. If a fresh
> cluster will not form, confirm the running binary contains these fixes before
> touching config.
## Promote an existing single-node (SQLite) deployment (HUMAN, optional) ## Promote an existing single-node (SQLite) deployment (HUMAN, optional)
@@ -137,11 +167,80 @@ ssh root@magnus 'nats --server nats://127.0.0.1:4250 server list' # 3 servers,
A healthy cluster shows 3 routed servers and a JetStream meta-group with a leader. A healthy cluster shows 3 routed servers and a JetStream meta-group with a leader.
## Scale to R3 (HUMAN — real HA) ## Add users to the live cluster (HUMAN — `user add --store kv`)
Once all three nodes are up and routed, raise the replication factor of every With the cluster up, add (and revoke) bus users **without stopping anything**,
control-plane stream from 1 to 3 IN PLACE (no data loss), then flip `KV_REPLICAS=3` directly against the replicated KV allowlist. This replaces the stop-seed-restart
in `nodes.env` so future (re)deploys keep it: procedure the original runbook implied for every user beyond the first admin.
The mechanism is the cluster's own **privileged internal connection**: under
`enforce` every bus user is confined by the per-subject ACL to its own rooms, so no
ordinary identity may write the control-plane buckets. The only identity the
authenticator grants full JetStream permissions is `membershipd`'s internal service
identity. The unit persists that identity to `${INTERNAL_ID_FILE}`
(`/opt/unibus/secrets/internal.id`, 0600) via `--internal-id-file`, so the same key
is available to the CLI. Run the CLI **on a node, over loopback** (the data-plane
TLS cert SAN covers `127.0.0.1`); reading the identity file requires root on that
node, which already implies full control of it, so this adds no practical exposure.
```bash
# Add a member to the live cluster's replicated allowlist (run on any node).
ssh root@magnus 'sudo /opt/unibus/membershipd user add --store kv \
--handle alice --role member --sign-pub <64-hex-ed25519-pub>'
# -> added user "alice" (...) role=member
# -> KV_UNIBUS_users: leader=<node> followers_current=2/2 msgs=N (replicated, HA)
# List / revoke against the same live KV:
ssh root@magnus 'sudo /opt/unibus/membershipd user list --store kv'
ssh root@magnus 'sudo /opt/unibus/membershipd user revoke --store kv <64-hex-ed25519-pub>'
```
Defaults assume an on-node invocation (`--nats-url nats://127.0.0.1:4250`,
`--internal-id-file /opt/unibus/secrets/internal.id`, `--ca /opt/unibus/tls/ca.crt`,
`--kv-replicas 3`). Semantics:
- **Idempotent / non-destructive**: re-adding the same key is an explicit
`already registered` error (exit 1), never a silent overwrite — a re-add cannot
flip a member to admin. To replace a user, `revoke` then add.
- **HA**: the write commits through the JetStream quorum, so it succeeds even with
one node down (2/3); the printed `followers_current` shows replication.
- **No hard delete**: `revoke` flips status to `revoked` (denied on both planes,
auditable); the KV has no row deletion, matching the SQLite store.
> **Rollout note (report 0012):** the live verification deployed this binary +
> `--internal-id-file` to **datardos only** (the non-critical node). magnus and
> homer still run the 0011 binary. To make the capability available (and the unit)
> on all three — recommended, the posture is identical so there is no urgency — roll
> the new binary with backups, one node at a time, verifying healthz between each:
> ```bash
> for h in homer magnus; do
> ssh "$h" 'sudo cp -a /opt/unibus/membershipd /opt/unibus/membershipd.bak' # backup
> scp build/membershipd "$h:/tmp/m" && ssh "$h" 'sudo install -o ubuntu -g ubuntu -m0775 /tmp/m /opt/unibus/membershipd'
> # add INTERNAL_ID_FILE=/opt/unibus/secrets/internal.id to /opt/unibus/cluster.env
> # add `--internal-id-file ${INTERNAL_ID_FILE} \` to the unit before `--store kv`
> ssh "$h" 'sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster'
> ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt' # green before next
> done
> ```
> (`deploy-cluster.sh` + the unit template already emit `INTERNAL_ID_FILE` and the
> flag, so a fresh `./deploy-cluster.sh --yes` is correct for all three.)
## Replication: go straight to R3 (HUMAN — real HA)
> **CORRECTION (report 0012).** The original "start at R1, then scale to R3" plan
> assumed R1 is a usable interim state. **It is not, in this cluster.** At R1 all six
> control-plane buckets (`KV_UNIBUS_users/rooms/members/room_keys/rooms_by_member`
> + `KV_UNIBUS_nonces`) live on a SINGLE node — a hard **SPOF for authentication**:
> if that node dies, the nonce/KV control plane is unreachable and EVERY
> authenticated request fails closed (auth DoS). Worse, the cold multi-node start
> only converges at all because of the three cold-start fixes (see "Bring up"); the
> real deploy never ran a healthy R1 and **jumped straight to R3 once the cluster
> formed.** Treat R1 as a transient artifact of bucket creation, not a milestone.
The deployed config already sets `KV_REPLICAS=3` in `nodes.env`. If buckets were
created at R1 (e.g. only one node was up when `--store kv` first opened them), raise
every control-plane stream to R3 IN PLACE (no data loss) once all three nodes are
routed:
```bash ```bash
for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members KV_UNIBUS_room_keys \ for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members KV_UNIBUS_room_keys \
@@ -151,27 +250,32 @@ done
# (also OBJ_UNIBUS_blobs if the object store is in use) # (also OBJ_UNIBUS_blobs if the object store is in use)
``` ```
Until this is done, R1 means the seed node is a **single point of failure for After this each bucket shows `followers_current=2/2` (quorum 2/3). The
authentication**: if it dies, the nonce/KV control plane is unreachable and every `user add --store kv` command prints that figure for `KV_UNIBUS_users` on every add,
authenticated request fails closed (auth DoS). R1 is a rollout step, not HA. which is a cheap live HA check.
## Chaos test (HUMAN — requires the 3 live VPS; NOT run here) ## Chaos test (HUMAN — requires the 3 live VPS)
Validate quorum tolerance after R3: Validate quorum tolerance after R3:
```bash ```bash
# Kill one node; the cluster keeps serving (quorum 2/3). # Kill one node; the cluster keeps serving (quorum 2/3). On ubuntu nodes use sudo.
ssh root@datardos 'systemctl stop membershipd-cluster' ssh dd 'sudo systemctl stop membershipd-cluster'
# -> clients fail over (multiple seed URLs); reads/writes still succeed. # -> clients fail over (multiple seed URLs); reads/writes still succeed.
ssh root@datardos 'systemctl start membershipd-cluster' # rejoins, catches up ssh dd 'sudo systemctl start membershipd-cluster' # rejoins, catches up
# Kill two nodes; quorum is LOST — the control plane should fail CLOSED (deny), # Kill two nodes; quorum is LOST — the control plane should fail CLOSED (deny),
# never fail open. Verify a request is rejected, not silently served. # never fail open. Verify a request is rejected, not silently served.
``` ```
This network-level chaos test (kill 1/3, kill 2/3, partition/split-brain) is part > **Validated (report 0012).** The 0011 chaos run checked only the control plane
of the deploy validation (issue 0003f) and runs against the real VPS — it is > (healthz + meta/stream-leader failover + KV readable with 2/3). Report 0012 added
deliberately out of scope for the authoring agent. > the missing data-plane proofs against the live cluster: a real authenticated
> client (`cmd/clientcheck`, operator identity, nkey+TLS) creating an E2E room and
> publishing/subscribing — including a node stopped mid-stream, where the client
> failed over to a survivor and kept receiving with zero loss (quorum 2/3) — and
> `user add --store kv` committing with one node (the KV leader) down. The kill-2/3
> fail-closed case remains a documented manual step.
## Rollback ## Rollback
+12 -8
View File
@@ -97,6 +97,7 @@ TLS_KEY=${REMOTE_DIR}/tls/server-${name}.key
ROUTE_TLS_CERT=${REMOTE_DIR}/tls/route-${name}.crt ROUTE_TLS_CERT=${REMOTE_DIR}/tls/route-${name}.crt
ROUTE_TLS_KEY=${REMOTE_DIR}/tls/route-${name}.key ROUTE_TLS_KEY=${REMOTE_DIR}/tls/route-${name}.key
ROUTE_TLS_CA=${REMOTE_DIR}/tls/cluster-ca.crt ROUTE_TLS_CA=${REMOTE_DIR}/tls/cluster-ca.crt
INTERNAL_ID_FILE=${REMOTE_DIR}/secrets/internal.id
EOF EOF
run ssh "$target" "mkdir -p ${REMOTE_DIR}/tls ${REMOTE_DIR}/secrets" run ssh "$target" "mkdir -p ${REMOTE_DIR}/tls ${REMOTE_DIR}/secrets"
@@ -114,13 +115,16 @@ if [[ $APPLY -eq 0 ]]; then
fi fi
cat <<'NEXT' cat <<'NEXT'
HUMAN — staggered start (do NOT enable all at once; see README "Bring up"): HUMAN — bring up (see README "Bring up" — a LONE node has no quorum and never
1. Seed node first (e.g. magnus): serves healthz, so do NOT gate the next node on the previous one going green):
ssh root@magnus 'systemctl enable --now membershipd-cluster' 1. Seed the FIRST admin into the KV via the loopback bootstrap (README
ssh root@magnus '/opt/unibus/membershipd user add --admin ...' # seed admin "Seed the first admin"); this is needed only for the chicken-and-egg admin.
2. Then the other two, one at a time, checking quorum after each: 2. Start all three so a 2/3 quorum forms (order does not matter); healthz
ssh root@homer 'systemctl enable --now membershipd-cluster' turns ok only once the meta-group elects a leader (~10-30s cold):
ssh root@datardos 'systemctl enable --now membershipd-cluster' for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
3. Verify posture + quorum (README "Verify"). 3. Verify posture + quorum (README "Verify").
4. Scale replicas 1 -> 3 once all three are up (README "Scale to R3"). 4. Ensure R3 on every control-plane stream (README "Replication: go straight to
R3"); R1 is a SPOF, not a milestone.
5. Add further users with the cluster LIVE — no restart — via
`membershipd user add --store kv` (README "Add users to the live cluster").
NEXT NEXT
@@ -33,6 +33,7 @@ ExecStart=/opt/unibus/membershipd \
--route-tls-cert ${ROUTE_TLS_CERT} \ --route-tls-cert ${ROUTE_TLS_CERT} \
--route-tls-key ${ROUTE_TLS_KEY} \ --route-tls-key ${ROUTE_TLS_KEY} \
--route-tls-ca ${ROUTE_TLS_CA} \ --route-tls-ca ${ROUTE_TLS_CA} \
--internal-id-file ${INTERNAL_ID_FILE} \
--store kv \ --store kv \
--kv-replicas ${KV_REPLICAS} --kv-replicas ${KV_REPLICAS}
# Restart=always (NOT on-failure): a clean SIGTERM exits success, and on-failure # Restart=always (NOT on-failure): a clean SIGTERM exits success, and on-failure
+21 -8
View File
@@ -2,10 +2,10 @@
# #
# This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh. # This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh.
# #
# HUMAN: fill in every <PLACEHOLDER> with the real value before running the # HUMAN: fill in every placeholder with the real value before running the
# scripts. The public IPs known at authoring time are pre-filled; the WireGuard # scripts. The public IPs known at authoring time are pre-filled; the WireGuard
# mesh IPs and magnus's public IP must be supplied. The scripts refuse to run # mesh IPs and magnus's public IP must be supplied. The scripts refuse to run
# while any <PLACEHOLDER> remains. # while any unfilled placeholder remains.
# Cluster identity (must be identical on every node). # Cluster identity (must be identical on every node).
CLUSTER_NAME="unibus" CLUSTER_NAME="unibus"
@@ -16,7 +16,7 @@ CLUSTER_USER="unibus-cluster"
# KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise # KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise
# to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only # to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only
# set this to 3 here after the third node is up and you re-run the KV update. # set this to 3 here after the third node is up and you re-run the KV update.
KV_REPLICAS=1 KV_REPLICAS=3
# Ports (same on every node; the route port is server-to-server only). # Ports (same on every node; the route port is server-to-server only).
NATS_CLIENT_PORT=4250 NATS_CLIENT_PORT=4250
@@ -30,15 +30,28 @@ SSH_USER="root"
# Which address family the inter-node routes use. "wg" builds --routes from the # Which address family the inter-node routes use. "wg" builds --routes from the
# WireGuard mesh IPs (private server-to-server links, preferred); "public" uses # WireGuard mesh IPs (private server-to-server links, preferred); "public" uses
# the public IPs. The route layer is always mutual-TLS regardless. # the public IPs. The route layer is always mutual-TLS regardless.
ROUTE_NETWORK="wg" #
# DEPLOY DECISION (2026-06-07): set to "public". No WireGuard mesh exists between
# the three cluster nodes — homer and datardos do not even have the `wg` binary
# installed, and om's only WG peers are the operator's personal PCs, not the VPS.
# Rather than stand up a fresh mesh blindly, the routes go over the public IPs,
# still protected by the separate cluster route CA (mutual-TLS). On magnus (the
# only node with ufw active) the route port 6250 is restricted to the homer and
# datardos public IPs; homer/datardos run ufw inactive (Docker hosts) and rely on
# the route mutual-TLS for 6250.
ROUTE_NETWORK="public"
# One row per node: NAME SSH_HOST PUBLIC_IP WG_IP # One row per node: NAME SSH_HOST PUBLIC_IP WG_IP
# NAME -> --server-name and the per-node cert filenames (unique). # NAME -> --server-name and the per-node cert filenames (unique).
# SSH_HOST -> the `ssh <SSH_HOST>` alias (see ~/.ssh/config). # SSH_HOST -> the `ssh ALIAS` alias (see ~/.ssh/config).
# PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane). # PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane).
# WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg. # WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg.
# NOTE: with ROUTE_NETWORK=public and no WireGuard mesh, the WG_IP column is set to
# each node's public IP so the cert SAN covers the address actually used by the
# public routes and no unfilled placeholder remains (scripts refuse to run otherwise).
# magnus == organic-machine.com == om (135.125.201.30); SSH alias `magnus` enters as root.
CLUSTER_NODES=( CLUSTER_NODES=(
"magnus magnus <MAGNUS_PUBLIC_IP> <MAGNUS_WG_IP>" "magnus magnus 135.125.201.30 135.125.201.30"
"homer homer 141.94.69.66 <HOMER_WG_IP>" "homer homer 141.94.69.66 141.94.69.66"
"datardos dd 51.91.100.142 <DATARDOS_WG_IP>" "datardos dd 51.91.100.142 51.91.100.142"
) )
+21 -5
View File
@@ -33,11 +33,17 @@ type identityFile struct {
KexPriv string `json:"kex_priv"` KexPriv string `json:"kex_priv"`
} }
// LoadOrCreateIdentity loads the identity at path, or generates and persists a // LoadIdentity loads an existing identity from path. Unlike LoadOrCreateIdentity
// new one if the file does not exist. The file is written with 0600 // it NEVER creates one: a missing or unreadable file is an error. It is for
// permissions because it holds private keys. // callers that must consume a specific, pre-provisioned identity rather than mint
func LoadOrCreateIdentity(path string) (cs.Identity, error) { // a fresh one — for example membershipd's persisted internal service identity,
if data, err := os.ReadFile(path); err == nil { // which `membershipd user add --store kv` reads to present the privileged nkey
// the cluster authenticator recognizes.
func LoadIdentity(path string) (cs.Identity, error) {
data, err := os.ReadFile(path)
if err != nil {
return cs.Identity{}, fmt.Errorf("client: read identity %q: %w", path, err)
}
var f identityFile var f identityFile
if err := json.Unmarshal(data, &f); err != nil { if err := json.Unmarshal(data, &f); err != nil {
return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err) return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err)
@@ -49,6 +55,16 @@ func LoadOrCreateIdentity(path string) (cs.Identity, error) {
return id, nil return id, nil
} }
// LoadOrCreateIdentity loads the identity at path, or generates and persists a
// new one if the file does not exist. The file is written with 0600
// permissions because it holds private keys. A file that exists but is
// unreadable or corrupt is an error (NOT silently regenerated), so a damaged
// identity surfaces instead of minting a new key that cannot decrypt old data.
func LoadOrCreateIdentity(path string) (cs.Identity, error) {
if _, statErr := os.Stat(path); statErr == nil {
return LoadIdentity(path)
}
id, err := cs.GenerateIdentity() id, err := cs.GenerateIdentity()
if err != nil { if err != nil {
return cs.Identity{}, fmt.Errorf("client: generate identity: %w", err) return cs.Identity{}, fmt.Errorf("client: generate identity: %w", err)
+37 -1
View File
@@ -9,6 +9,7 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net/url" "net/url"
"os"
"time" "time"
server "github.com/nats-io/nats-server/v2/server" server "github.com/nats-io/nats-server/v2/server"
@@ -106,6 +107,13 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication)
// blocks until the server is ready to accept connections (up to 5s) and returns // blocks until the server is ready to accept connections (up to 5s) and returns
// the running server; the caller must Shutdown it. // the running server; the caller must Shutdown it.
func StartServer(cfg ServerConfig) (*server.Server, error) { func StartServer(cfg ServerConfig) (*server.Server, error) {
// Diagnostic toggle: UNIBUS_NATS_DEBUG=1 enables the embedded nats-server's own
// logger (route/RAFT/JetStream errors), which is otherwise silenced. Off by
// default so production behavior is unchanged; only set it when debugging the
// cluster route layer.
debugLevel := os.Getenv("UNIBUS_NATS_DEBUG")
debugNATS := debugLevel == "1" || debugLevel == "2"
traceNATS := debugLevel == "2"
opts := &server.Options{ opts := &server.Options{
JetStream: true, JetStream: true,
StoreDir: cfg.StoreDir, StoreDir: cfg.StoreDir,
@@ -114,9 +122,18 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
ServerName: cfg.ServerName, ServerName: cfg.ServerName,
DontListen: false, DontListen: false,
// Keep the embedded server quiet by default; the host app logs the URLs. // Keep the embedded server quiet by default; the host app logs the URLs.
NoLog: true, NoLog: !debugNATS,
Debug: debugNATS,
Trace: traceNATS,
Logtime: true,
NoSigs: true, NoSigs: true,
} }
if debugNATS {
// Expose the nats-server monitoring endpoint (loopback) so the operator can
// inspect /jsz, /routez, /varz while debugging the cluster meta-group.
opts.HTTPHost = "127.0.0.1"
opts.HTTPPort = 8222
}
if cfg.Auth != nil { if cfg.Auth != nil {
opts.CustomClientAuthentication = cfg.Auth opts.CustomClientAuthentication = cfg.Auth
// A CustomClientAuthentication alone does not make the server advertise a // A CustomClientAuthentication alone does not make the server advertise a
@@ -141,6 +158,10 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
return nil, fmt.Errorf("embeddednats: new server: %w", err) return nil, fmt.Errorf("embeddednats: new server: %w", err)
} }
if debugNATS {
ns.ConfigureLogger()
}
go ns.Start() go ns.Start()
if !ns.ReadyForConnections(5 * time.Second) { if !ns.ReadyForConnections(5 * time.Second) {
@@ -162,6 +183,21 @@ func applyClusterOpts(opts *server.Options, c *ClusterConfig) error {
Port: c.Port, Port: c.Port,
Username: c.Username, Username: c.Username,
Password: c.Password, Password: c.Password,
// Disable route connection pooling (nats-server 2.10+ defaults to a pool of
// 3 connections per peer). On a small cluster the pool churns with
// "duplicate route"/"client closed" reconnects that interrupt the meta-group
// RAFT heartbeats, causing perpetual leader re-elections so the JetStream
// meta never becomes current and stream/KV creation hangs (issue 0006g).
// PoolSize=-1 forces the classic single route per peer, which is stable for
// the 3-node unibus cluster.
PoolSize: -1,
// NoAdvertise stops the server from gossiping its locally-discovered IPs to
// peers. The cluster nodes are Docker hosts, so without this NATS advertises
// the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers
// then try to dial those private, mutually-unreachable IPs, churning the
// route layer and destabilizing the JetStream meta-group. With NoAdvertise
// the nodes use ONLY the explicit public-IP routes we configure (issue 0006g).
NoAdvertise: true,
} }
if c.TLS != nil { if c.TLS != nil {
opts.Cluster.TLSConfig = c.TLS opts.Cluster.TLSConfig = c.TLS
+28 -5
View File
@@ -85,8 +85,18 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
if opTimeout <= 0 { if opTimeout <= 0 {
opTimeout = defaultKVOpTime opTimeout = defaultKVOpTime
} }
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) // Bootstrap budget for creating/opening the buckets. On a single node JetStream
defer cancel() // is ready the instant the server starts, so the first attempt succeeds. On a
// COLD multi-node cluster the JetStream meta-group must first elect a leader and
// each node must establish contact with it before its $JS.API responds. A KV
// op is a NATS request/reply: if it is published before the node's JetStream is
// ready the request is dropped (not queued), and a single long-context call then
// just blocks until it times out (issue 0006g). So we RETRY each bucket op with
// short per-attempt contexts until it succeeds or the overall bootstrap budget
// is exhausted; once the cluster is ready the next retry lands and the buckets
// are created, after which they persist and every node opens them quickly.
bootstrapBudget := 120 * time.Second
deadline := time.Now().Add(bootstrapBudget)
s := &jetstreamStore{opTimeout: opTimeout} s := &jetstreamStore{opTimeout: opTimeout}
for _, b := range []struct { for _, b := range []struct {
@@ -99,14 +109,27 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
{bucketRoomKeys, &s.keys}, {bucketRoomKeys, &s.keys},
{bucketUsers, &s.users}, {bucketUsers, &s.users},
} { } {
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{ var kv jetstream.KeyValue
var lastErr error
for {
opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{
Bucket: b.name, Bucket: b.name,
Replicas: cfg.Replicas, Replicas: cfg.Replicas,
History: 1, History: 1,
Storage: jetstream.FileStorage, Storage: jetstream.FileStorage,
}) })
if err != nil { cancel()
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err) if lastErr == nil {
break
}
if time.Now().After(deadline) {
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr)
}
// JetStream not ready yet (no meta leader / request dropped). Wait and
// re-publish the op; in a cluster cold start this lands once the meta
// group settles.
time.Sleep(1 * time.Second)
} }
*b.dst = kv *b.dst = kv
} }