Merge quick/0011-deploy-gaps: live user-add --store kv + clientcheck E2E + runbook fixes (report 0012)
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
name: unibus
|
||||
lang: go
|
||||
domain: infra
|
||||
version: 0.8.0
|
||||
version: 0.9.0
|
||||
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
||||
tags: [service, messaging, nats, e2e]
|
||||
uses_functions:
|
||||
@@ -154,6 +154,30 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v0.9.0 (2026-06-07) — cierre de los gaps que el despliegue del cluster (report
|
||||
0011) dejó abiertos (report 0012). (GAP A) Nueva capability `membershipd user
|
||||
add|list|revoke --store kv`: alta/baja de usuarios contra el KV replicado del
|
||||
cluster EN MARCHA, sin el procedimiento de parar-sembrar-rearrancar. Usa la
|
||||
conexión interna privilegiada — el daemon persiste su identidad de servicio con
|
||||
`--internal-id-file` (cada nodo genera/carga la suya, 0600 junto a las claves TLS)
|
||||
y la CLI, ejecutada por loopback en un nodo, presenta esa nkey que el
|
||||
autenticador reconoce con permisos plenos de JetStream; ninguna identidad de
|
||||
usuario normal puede tocar los buckets `KV_UNIBUS_*` bajo la ACL por-subject. El
|
||||
alta es idempotente (re-alta de la misma clave = `ErrUserExists` explícito, sin
|
||||
sobrescribir ni elevar rol), commitea con quórum 2/3 (HA, imprime
|
||||
`followers_current`) y rechaza un destino remoto sin `--ca` (igual que
|
||||
`migrate-to-kv`). (GAP B) Nuevo `cmd/clientcheck`: verificación end-to-end real
|
||||
con un cliente autenticado (identidad operator, nkey+TLS+https) que crea una room
|
||||
E2E, publica y recibe descifrado contra el cluster vivo, incluido un nodo parado a
|
||||
media transmisión donde el cliente hace failover a un superviviente y sigue
|
||||
recibiendo con cero pérdida (quórum 2/3) — el plano de datos que el chaos test del
|
||||
0011 nunca probó. (GAP C) Runbook `deploy/cluster/README.md` corregido: el orden
|
||||
de arranque "magnus solo y verifica healthz" deadlockeaba (un nodo solo no tiene
|
||||
quórum del meta-group y nunca sirve healthz); se documenta el arranque por quórum,
|
||||
que R1 es un SPOF inservible (ir directo a R3) y la nueva vía de alta con el
|
||||
cluster vivo. La plantilla de deploy (unit + `deploy-cluster.sh`) emite ya
|
||||
`INTERNAL_ID_FILE` y el flag. Verificado contra los 3 VPS reales (magnus + homer +
|
||||
datardos); posture enforce+ACL+TLS+R3 intacta.
|
||||
- v0.8.0 (2026-06-07) — completar y endurecer el cluster (issue 0006, fases
|
||||
0006a–0006g) que cierra los bloqueantes de la auditoría dedicada del cluster
|
||||
(report 0008) y cablea el control plane descentralizado que 0003 dejó a medias.
|
||||
|
||||
@@ -0,0 +1,260 @@
|
||||
// Command clientcheck is an end-to-end verification client for a live unibus
|
||||
// cluster (issue 0011 GAP B). The 0011 chaos test validated only the control
|
||||
// plane (healthz + meta/stream-leader failover + KV readable with 2/3); it never
|
||||
// connected an authenticated bus client (nkey + TLS) to create a room and
|
||||
// publish/subscribe through it, least of all across a node loss. clientcheck does
|
||||
// exactly that with a real identity (the operator), so the data-plane end-to-end
|
||||
// path — connect, create an E2E room, publish, receive decrypted — is exercised
|
||||
// against the running cluster, including while a node is stopped.
|
||||
//
|
||||
// It is a reusable tool, not a throwaway script: point it at the cluster's CA,
|
||||
// an identity file, and the NATS + control-plane seed lists.
|
||||
//
|
||||
// # golden: connect, create an E2E room, publish N, confirm N decrypted back
|
||||
// clientcheck --ca ca.crt --identity-file operator.id \
|
||||
// --nats-seeds nats://A:4250,nats://B:4250,nats://C:4250 \
|
||||
// --ctrl-seeds https://A:8470,https://B:8470,https://C:8470 --messages 5
|
||||
//
|
||||
// # loop: publish a counter every interval for the duration, logging the node
|
||||
// # it is attached to — stop a node mid-run (systemctl stop membershipd-cluster)
|
||||
// # and watch it fail over to a survivor and keep receiving (quorum 2/3).
|
||||
// clientcheck ... --mode loop --duration 45s --interval 1s
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/room"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
caPath = flag.String("ca", "", "bus CA cert pinning TLS on both planes (required for a secured cluster)")
|
||||
idFile = flag.String("identity-file", "", "path to the client identity JSON (e.g. `pass show unibus/operator-identity` written 0600) (required)")
|
||||
natsSeeds = flag.String("nats-seeds", "", "comma-separated NATS urls of the cluster nodes (required)")
|
||||
ctrlSeeds = flag.String("ctrl-seeds", "", "comma-separated control-plane https urls of the cluster nodes (required)")
|
||||
subject = flag.String("subject", "test.gapcheck", "test room subject PREFIX; a random token is appended so runs never collide with real rooms")
|
||||
messages = flag.Int("messages", 5, "golden mode: number of messages to publish and expect back")
|
||||
mode = flag.String("mode", "golden", "golden (publish N, verify N decrypted) | loop (publish a counter for --duration, for failover testing)")
|
||||
duration = flag.Duration("duration", 30*time.Second, "loop mode: how long to keep publishing")
|
||||
interval = flag.Duration("interval", 1*time.Second, "loop mode: delay between published messages")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
if *idFile == "" || *natsSeeds == "" || *ctrlSeeds == "" {
|
||||
log.Fatalf("clientcheck: --identity-file, --nats-seeds and --ctrl-seeds are required")
|
||||
}
|
||||
|
||||
id, err := client.LoadIdentity(*idFile)
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: load identity: %v", err)
|
||||
}
|
||||
natsList := splitCSV(*natsSeeds)
|
||||
ctrlList := splitCSV(*ctrlSeeds)
|
||||
if len(natsList) == 0 || len(ctrlList) == 0 {
|
||||
log.Fatalf("clientcheck: empty --nats-seeds or --ctrl-seeds")
|
||||
}
|
||||
|
||||
// Build the secure client options: nkey on the data plane, TLS pinned to the
|
||||
// bus CA on both planes, and the FULL seed lists so nats.go fails over to a
|
||||
// surviving node when the attached one dies (the failover this tool verifies).
|
||||
opts := client.Options{
|
||||
NatsServers: natsList[1:],
|
||||
CtrlURLs: ctrlList[1:],
|
||||
}
|
||||
if *caPath != "" {
|
||||
tlsCfg, err := busauth.LoadCATLSConfig(*caPath)
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: load CA: %v", err)
|
||||
}
|
||||
opts.UseNkey = true
|
||||
opts.TLS = tlsCfg
|
||||
opts.CtrlTLS = tlsCfg
|
||||
for _, u := range ctrlList {
|
||||
if !strings.HasPrefix(u, "https://") {
|
||||
log.Fatalf("clientcheck: control URL %q must be https:// when --ca is set", u)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c, err := client.NewWithOptions(natsList[0], ctrlList[0], id, opts)
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: connect: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
log.Printf("connected: endpoint=%s nats=%s", c.Endpoint().ID, c.ConnectedServer())
|
||||
|
||||
// Create an EPHEMERAL E2E room (encrypted + signed, NOT persisted): the test
|
||||
// stays end-to-end encrypted (the cluster requires encryption on a public
|
||||
// bind) while leaving no durable JetStream stream behind. The random subject
|
||||
// token guarantees the room is unique and never a real room.
|
||||
rnd := make([]byte, 8)
|
||||
if _, err := rand.Read(rnd); err != nil {
|
||||
log.Fatalf("clientcheck: random: %v", err)
|
||||
}
|
||||
subj := fmt.Sprintf("%s.%s", *subject, hex.EncodeToString(rnd))
|
||||
policy := room.Policy{Encrypt: true, Persist: false, SignMsgs: true}
|
||||
roomID, err := c.CreateRoom(subj, policy)
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: create room: %v", err)
|
||||
}
|
||||
log.Printf("created E2E room: id=%s subject=%s (encrypt=%v sign=%v persist=%v)", roomID, subj, policy.Encrypt, policy.SignMsgs, policy.Persist)
|
||||
|
||||
// Under the per-subject ACL, NATS freezes permissions at connect time, so the
|
||||
// just-created room's subject is not yet publishable/subscribable on the live
|
||||
// connection. RefreshSession reconnects so the authenticator re-derives the
|
||||
// ACL (now including this room) — the post-0006 contract every client follows
|
||||
// after a membership change.
|
||||
if err := c.RefreshSession(); err != nil {
|
||||
log.Fatalf("clientcheck: refresh session: %v", err)
|
||||
}
|
||||
|
||||
switch *mode {
|
||||
case "golden":
|
||||
runGolden(c, roomID, *messages)
|
||||
case "loop":
|
||||
runLoop(c, roomID, *duration, *interval)
|
||||
default:
|
||||
log.Fatalf("clientcheck: --mode must be golden or loop, got %q", *mode)
|
||||
}
|
||||
}
|
||||
|
||||
// runGolden subscribes, publishes n messages, and asserts all n come back
|
||||
// decrypted. Exits non-zero if any are missing.
|
||||
func runGolden(c *client.Client, roomID string, n int) {
|
||||
var mu sync.Mutex
|
||||
got := map[string]bool{}
|
||||
sub, err := c.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
|
||||
mu.Lock()
|
||||
got[string(plaintext)] = true
|
||||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(300 * time.Millisecond) // let the subscription settle
|
||||
|
||||
want := make([]string, n)
|
||||
for i := 0; i < n; i++ {
|
||||
msg := fmt.Sprintf("gapcheck-e2e-%d", i)
|
||||
want[i] = msg
|
||||
if err := c.Publish(roomID, []byte(msg)); err != nil {
|
||||
log.Fatalf("clientcheck: publish %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
log.Printf("published %d messages to %s; waiting for decrypted echoes...", n, roomID)
|
||||
|
||||
deadline := time.Now().Add(15 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
mu.Lock()
|
||||
have := len(got)
|
||||
mu.Unlock()
|
||||
if have >= n {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
missing := 0
|
||||
for _, w := range want {
|
||||
if !got[w] {
|
||||
missing++
|
||||
log.Printf(" MISSING: %q", w)
|
||||
}
|
||||
}
|
||||
log.Printf("connected node at finish: %s", c.ConnectedServer())
|
||||
if missing > 0 {
|
||||
log.Fatalf("GOLDEN FAIL: %d/%d messages not received decrypted", missing, n)
|
||||
}
|
||||
log.Printf("GOLDEN OK: all %d messages received and decrypted end-to-end", n)
|
||||
}
|
||||
|
||||
// runLoop publishes a numbered message every interval for the duration and logs
|
||||
// the count received plus the node currently attached, so an operator stopping a
|
||||
// cluster node mid-run sees the client fail over to a survivor and keep receiving
|
||||
// (quorum 2/3). It is the live failover-with-a-connected-client test the 0011
|
||||
// chaos run never performed.
|
||||
func runLoop(c *client.Client, roomID string, duration, interval time.Duration) {
|
||||
var mu sync.Mutex
|
||||
received := 0
|
||||
servers := map[string]int{} // node -> #ticks observed attached
|
||||
sub, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) {
|
||||
mu.Lock()
|
||||
received++
|
||||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
log.Printf("loop: publishing every %s for %s — stop a node now to test failover", interval, duration)
|
||||
end := time.Now().Add(duration)
|
||||
sent := 0
|
||||
for time.Now().Before(end) {
|
||||
msg := fmt.Sprintf("gapcheck-loop-%d", sent)
|
||||
err := c.Publish(roomID, []byte(msg))
|
||||
sent++
|
||||
mu.Lock()
|
||||
recv := received
|
||||
mu.Unlock()
|
||||
node := c.ConnectedServer()
|
||||
up := c.IsConnected()
|
||||
if node != "" {
|
||||
mu.Lock()
|
||||
servers[node]++
|
||||
mu.Unlock()
|
||||
}
|
||||
pubStatus := "ok"
|
||||
if err != nil {
|
||||
pubStatus = "ERR:" + err.Error()
|
||||
}
|
||||
log.Printf(" t=%2ds sent=%d recv=%d up=%v node=%s publish=%s",
|
||||
sent, sent, recv, up, node, pubStatus)
|
||||
time.Sleep(interval)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
log.Printf("loop done: sent=%d received=%d", sent, received)
|
||||
nodes := make([]string, 0, len(servers))
|
||||
for n := range servers {
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
sort.Strings(nodes)
|
||||
for _, n := range nodes {
|
||||
log.Printf(" attached to %s for %d ticks", n, servers[n])
|
||||
}
|
||||
if len(servers) > 1 {
|
||||
log.Printf("FAILOVER OBSERVED: client was attached to %d distinct nodes across the run", len(servers))
|
||||
}
|
||||
if received == 0 {
|
||||
log.Fatalf("LOOP FAIL: received 0 messages")
|
||||
}
|
||||
log.Printf("LOOP OK: client kept receiving across the run (received=%d)", received)
|
||||
}
|
||||
|
||||
func splitCSV(s string) []string {
|
||||
var out []string
|
||||
for _, p := range strings.Split(s, ",") {
|
||||
if p = strings.TrimSpace(p); p != "" {
|
||||
out = append(out, p)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
package main
|
||||
|
||||
// Integration tests for issue 0011 GAP A: `membershipd user add --store kv`
|
||||
// adds users to a RUNNING cluster's replicated allowlist via the privileged
|
||||
// internal connection, instead of the stop-seed-restart procedure the 0011
|
||||
// deploy required. These exercise the real connectKVStore path (load the
|
||||
// persisted internal identity from a file, present its nkey, open the KV store,
|
||||
// write the user) against an embedded enforce node, plus the idempotency and
|
||||
// error semantics the DoD calls for. Multi-node replication and node-down quorum
|
||||
// are validated against the live cluster (report 0012).
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// startEnforceKVNode boots a single embedded enforce node whose authenticator
|
||||
// recognizes internalPubHex as the privileged internal identity, bootstraps the
|
||||
// KV control-plane store over the in-process internal connection, and publishes
|
||||
// it into the holder — the exact sequence main.go performs for --store kv. It
|
||||
// returns the client URL the CLI connects to.
|
||||
func startEnforceKVNode(t *testing.T, internalID cs.Identity) string {
|
||||
t.Helper()
|
||||
holder := &storeHolder{}
|
||||
auth := busauth.NewNkeyAuthenticatorACLInternal(
|
||||
holder.IsAuthorized,
|
||||
busauth.PermissionsFromSubjects(holder.subjectACL),
|
||||
hex.EncodeToString(internalID.SignPub),
|
||||
)
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("start enforce node: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
|
||||
intNC, js, err := connectInternalJS(ns, internalID, true)
|
||||
if err != nil {
|
||||
t.Fatalf("bootstrap internal connection: %v", err)
|
||||
}
|
||||
t.Cleanup(intNC.Close)
|
||||
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
|
||||
if err != nil {
|
||||
t.Fatalf("bootstrap KV store: %v", err)
|
||||
}
|
||||
holder.set(kvStore)
|
||||
return ns.ClientURL()
|
||||
}
|
||||
|
||||
// TestUserAddStoreKV_GoldenAndIdempotent is the GAP A golden + edge-1: the CLI
|
||||
// connection (real connectKVStore, loading the internal identity from a file and
|
||||
// presenting its nkey) writes a user into the live KV allowlist, the user is
|
||||
// authorized afterward, and re-adding the same key is an explicit ErrUserExists
|
||||
// with no corruption (the unchanged row is still authorized).
|
||||
func TestUserAddStoreKV_GoldenAndIdempotent(t *testing.T) {
|
||||
idFile := filepath.Join(t.TempDir(), "internal.id")
|
||||
internalID, err := client.LoadOrCreateIdentity(idFile) // persists 0600
|
||||
if err != nil {
|
||||
t.Fatalf("persist internal identity: %v", err)
|
||||
}
|
||||
url := startEnforceKVNode(t, internalID)
|
||||
|
||||
// Golden: connect as the privileged internal identity (loopback, no TLS) and
|
||||
// add a new user, exactly as `user add --store kv` does.
|
||||
kv, err := connectKVStore(url, idFile, "", 1)
|
||||
if err != nil {
|
||||
t.Fatalf("connectKVStore (privileged): %v", err)
|
||||
}
|
||||
defer kv.Close()
|
||||
|
||||
newUser, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("new user identity: %v", err)
|
||||
}
|
||||
pub := hex.EncodeToString(newUser.SignPub)
|
||||
if err := kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember); err != nil {
|
||||
t.Fatalf("add user to live KV: %v", err)
|
||||
}
|
||||
if !kv.store.IsAuthorized(pub) {
|
||||
t.Fatalf("user added to KV must be authorized")
|
||||
}
|
||||
|
||||
// Edge 1: re-adding the same key is a clean, non-destructive ErrUserExists.
|
||||
err = kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember)
|
||||
if !errors.Is(err, membership.ErrUserExists) {
|
||||
t.Fatalf("re-add must return ErrUserExists (idempotent), got %v", err)
|
||||
}
|
||||
// A different handle/role with the SAME key is also rejected — the row is not
|
||||
// silently overwritten (no role flip).
|
||||
if err := kv.store.AddUser(pub, "impostor", membership.RoleAdmin); !errors.Is(err, membership.ErrUserExists) {
|
||||
t.Fatalf("re-add with a different role must NOT overwrite; want ErrUserExists, got %v", err)
|
||||
}
|
||||
u, err := kv.store.GetUser(pub)
|
||||
if err != nil {
|
||||
t.Fatalf("get user: %v", err)
|
||||
}
|
||||
if u.Handle != "gapcheck_user" || u.Role != membership.RoleMember || u.Status != membership.StatusActive {
|
||||
t.Fatalf("idempotent re-add corrupted the row: %+v", u)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUserAddStoreKV_RequiresInternalIdentity: --store kv without a usable
|
||||
// internal identity file fails loudly (missing file, empty path) rather than
|
||||
// silently connecting unprivileged.
|
||||
func TestUserAddStoreKV_RequiresInternalIdentity(t *testing.T) {
|
||||
if _, err := connectKVStore("nats://127.0.0.1:4250", "", "", 1); err == nil {
|
||||
t.Fatalf("empty --internal-id-file must be an error")
|
||||
}
|
||||
missing := filepath.Join(t.TempDir(), "nope.id")
|
||||
if _, err := connectKVStore("nats://127.0.0.1:4250", missing, "", 1); err == nil {
|
||||
t.Fatalf("missing internal identity file must be an error")
|
||||
}
|
||||
}
|
||||
|
||||
// TestUserAddStoreKV_UnreachableKV is the GAP A error case: pointing --store kv
|
||||
// at a dead endpoint yields a clear, handled error (no crash, no silent success).
|
||||
func TestUserAddStoreKV_UnreachableKV(t *testing.T) {
|
||||
idFile := filepath.Join(t.TempDir(), "internal.id")
|
||||
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
|
||||
t.Fatalf("persist internal identity: %v", err)
|
||||
}
|
||||
// A loopback port with nothing listening: connect must fail fast and wrapped.
|
||||
_, err := connectKVStore("nats://127.0.0.1:1/", idFile, "", 1)
|
||||
if err == nil {
|
||||
t.Fatalf("connecting to a dead endpoint must error")
|
||||
}
|
||||
}
|
||||
|
||||
// TestUserAddStoreKV_RemoteWithoutCARefused: a non-loopback target without --ca
|
||||
// is refused so the allowlist write never travels in cleartext (audit 0008 N6,
|
||||
// same guard as migrate-to-kv).
|
||||
func TestUserAddStoreKV_RemoteWithoutCARefused(t *testing.T) {
|
||||
idFile := filepath.Join(t.TempDir(), "internal.id")
|
||||
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
|
||||
t.Fatalf("persist internal identity: %v", err)
|
||||
}
|
||||
_, err := connectKVStore("nats://203.0.113.1:4250", idFile, "", 1)
|
||||
if err == nil {
|
||||
t.Fatalf("remote target without --ca must be refused")
|
||||
}
|
||||
}
|
||||
+27
-3
@@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
@@ -83,6 +84,17 @@ func main() {
|
||||
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
|
||||
// in the cluster serves the same state.
|
||||
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
|
||||
// Persisted internal service identity (issue 0011 gaps, GAP A): when set, the
|
||||
// privileged internal identity used to manage JetStream is LOADED from this
|
||||
// file (generated and persisted on first start) instead of being a fresh
|
||||
// ephemeral key each boot. Persisting it is what lets `membershipd user add
|
||||
// --store kv` write the replicated allowlist of a LIVE cluster: that CLI,
|
||||
// run over loopback on a node, loads the SAME identity and presents the nkey
|
||||
// this node's authenticator already grants full permissions. Empty keeps the
|
||||
// ephemeral-per-process behavior (single-node/dev default, unchanged). The
|
||||
// file holds a private key: it is written 0600 and belongs next to the node's
|
||||
// TLS keys (deploy keeps it under secrets/, gitignored).
|
||||
internalIDFile = flag.String("internal-id-file", "", "path to a persisted internal service identity (JSON); enables `membershipd user add --store kv` against the live cluster. Empty = ephemeral per-process identity (dev default)")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
@@ -136,9 +148,21 @@ func main() {
|
||||
var internalID cs.Identity
|
||||
var internalPubHex string
|
||||
if needJS && enforce && *natsURL == "" {
|
||||
internalID, err = cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
log.Fatalf("generate internal identity: %v", err)
|
||||
if *internalIDFile != "" {
|
||||
// Persisted identity: load it, generating + writing it (0600) on first
|
||||
// start. A stable internal key is what `user add --store kv` presents to
|
||||
// add users to a live cluster (GAP A); rotate it by deleting the file and
|
||||
// restarting.
|
||||
internalID, err = client.LoadOrCreateIdentity(*internalIDFile)
|
||||
if err != nil {
|
||||
log.Fatalf("load internal service identity %q: %v", *internalIDFile, err)
|
||||
}
|
||||
log.Printf("internal service identity: persisted (%s)", *internalIDFile)
|
||||
} else {
|
||||
internalID, err = cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
log.Fatalf("generate internal identity: %v", err)
|
||||
}
|
||||
}
|
||||
internalPubHex = hex.EncodeToString(internalID.SignPub)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -50,13 +51,26 @@ commands:
|
||||
list List all registered users
|
||||
revoke Revoke a user (denies access on both planes immediately)
|
||||
|
||||
store backends (--store):
|
||||
sqlite local SQLite database (default; seeds the first admin offline)
|
||||
kv the RUNNING cluster's replicated JetStream KV allowlist, via the
|
||||
privileged internal connection — add users with the cluster live,
|
||||
no stop-seed-restart needed (run over loopback/SSH on a node)
|
||||
|
||||
examples:
|
||||
membershipd user add --handle alice --sign-pub <64-hex> --role admin
|
||||
membershipd user list
|
||||
membershipd user add --store kv --handle bob --sign-pub <64-hex> --role member
|
||||
membershipd user list --store kv
|
||||
membershipd user revoke <64-hex>
|
||||
|
||||
common flags:
|
||||
--db <path> SQLite database path (default ./local_files/unibus.db)
|
||||
--db <path> SQLite database path (--store sqlite; default ./local_files/unibus.db)
|
||||
|
||||
--store kv flags (defaults assume an on-node invocation):
|
||||
--nats-url <url> cluster NATS (default nats://127.0.0.1:4250)
|
||||
--internal-id-file <path> persisted internal service identity (default /opt/unibus/secrets/internal.id)
|
||||
--ca <path> CA cert pinning the data-plane TLS (default /opt/unibus/tls/ca.crt)
|
||||
--kv-replicas <n> KV replication factor, match the cluster (default 3)
|
||||
`)
|
||||
}
|
||||
|
||||
@@ -88,12 +102,59 @@ func validateSignPubHex(signPub string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// kvFlags holds the connection flags shared by the --store kv path of the user
|
||||
// subcommands. registerKVFlags wires them onto a flag set so add and list expose
|
||||
// an identical interface.
|
||||
type kvFlags struct {
|
||||
store *string
|
||||
natsURL *string
|
||||
internalID *string
|
||||
ca *string
|
||||
replicas *int
|
||||
}
|
||||
|
||||
func registerKVFlags(fs *flag.FlagSet) kvFlags {
|
||||
return kvFlags{
|
||||
store: fs.String("store", "sqlite", "user store backend: sqlite (local DB) | kv (the live cluster's replicated allowlist)"),
|
||||
natsURL: fs.String("nats-url", defaultClusterNatsURL, "cluster NATS url for --store kv"),
|
||||
internalID: fs.String("internal-id-file", defaultInternalIDFile, "persisted internal service identity for --store kv"),
|
||||
ca: fs.String("ca", defaultClusterCAFile, "CA cert pinning TLS on the --store kv NATS connection"),
|
||||
replicas: fs.Int("kv-replicas", 3, "KV replication factor for --store kv (match the cluster)"),
|
||||
}
|
||||
}
|
||||
|
||||
// resolveStore returns the membership store for the chosen backend plus a cleanup
|
||||
// func. For --store kv it opens the privileged connection to the live cluster; for
|
||||
// sqlite it opens the local file. It exits the process with a clear message on any
|
||||
// failure (a dead NATS, a missing identity file), so a broken --store kv add fails
|
||||
// loudly instead of silently — Error case of the GAP A DoD. The returned *kvConn
|
||||
// is non-nil only for the kv backend (so the caller can report replication).
|
||||
func resolveStore(cmd string, kf kvFlags, dbPath string) (membership.Store, *kvConn, func()) {
|
||||
switch *kf.store {
|
||||
case "sqlite":
|
||||
store := openStore(dbPath)
|
||||
return store, nil, func() { store.Close() }
|
||||
case "kv":
|
||||
kv, err := connectKVStore(*kf.natsURL, *kf.internalID, *kf.ca, *kf.replicas)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "membershipd %s: --store kv: %v\n", cmd, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
return kv.store, kv, kv.Close
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "membershipd %s: --store must be \"sqlite\" or \"kv\", got %q\n", cmd, *kf.store)
|
||||
os.Exit(2)
|
||||
return nil, nil, func() {}
|
||||
}
|
||||
}
|
||||
|
||||
func userAdd(args []string) {
|
||||
fs := flag.NewFlagSet("user add", flag.ExitOnError)
|
||||
handle := fs.String("handle", "", "human-readable user name (required)")
|
||||
signPub := fs.String("sign-pub", "", "Ed25519 signing public key in hex (required)")
|
||||
role := fs.String("role", membership.RoleMember, "role: admin or member")
|
||||
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
|
||||
kf := registerKVFlags(fs)
|
||||
_ = fs.Parse(args)
|
||||
|
||||
if *handle == "" || *signPub == "" {
|
||||
@@ -105,23 +166,35 @@ func userAdd(args []string) {
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
store := openStore(*dbPath)
|
||||
defer store.Close()
|
||||
store, kv, closeStore := resolveStore("user add", kf, *dbPath)
|
||||
defer closeStore()
|
||||
|
||||
if err := store.AddUser(*signPub, *handle, *role); err != nil {
|
||||
if errors.Is(err, membership.ErrUserExists) {
|
||||
// Idempotency contract (GAP A): re-adding the same key is an EXPLICIT,
|
||||
// non-destructive error — the existing row is left untouched (no silent
|
||||
// upsert that could flip a role or clobber status, which would corrupt the
|
||||
// allowlist). To replace a user, `user revoke <key>` then add again.
|
||||
fmt.Fprintf(os.Stderr, "membershipd user add: user %s already registered (unchanged); revoke it first to replace\n", *signPub)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "membershipd user add: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("added user %q (%s) role=%s\n", *handle, *signPub, *role)
|
||||
if kv != nil {
|
||||
reportKVReplication(kv.js)
|
||||
}
|
||||
}
|
||||
|
||||
func userList(args []string) {
|
||||
fs := flag.NewFlagSet("user list", flag.ExitOnError)
|
||||
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
|
||||
kf := registerKVFlags(fs)
|
||||
_ = fs.Parse(args)
|
||||
|
||||
store := openStore(*dbPath)
|
||||
defer store.Close()
|
||||
store, _, closeStore := resolveStore("user list", kf, *dbPath)
|
||||
defer closeStore()
|
||||
|
||||
users, err := store.ListUsers()
|
||||
if err != nil {
|
||||
@@ -143,6 +216,7 @@ func userList(args []string) {
|
||||
func userRevoke(args []string) {
|
||||
fs := flag.NewFlagSet("user revoke", flag.ExitOnError)
|
||||
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
|
||||
kf := registerKVFlags(fs)
|
||||
|
||||
// Go's flag package stops at the first non-flag argument, so `revoke <key>
|
||||
// --db path` would otherwise leave --db unparsed. Pull a leading positional
|
||||
@@ -167,8 +241,8 @@ func userRevoke(args []string) {
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
store := openStore(*dbPath)
|
||||
defer store.Close()
|
||||
store, _, closeStore := resolveStore("user revoke", kf, *dbPath)
|
||||
defer closeStore()
|
||||
|
||||
if err := store.RevokeUser(signPub); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "membershipd user revoke: %v\n", err)
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// users_kv.go is the `--store kv` half of the user administration CLI (issue 0011
|
||||
// gaps, GAP A): adding and listing bus users directly against the RUNNING
|
||||
// cluster's replicated JetStream KV allowlist, with no need to stop the cluster,
|
||||
// seed a standalone node, and restart (the procedure the 0011 deploy required).
|
||||
//
|
||||
// The mechanism is the cluster's own privileged internal connection. Under
|
||||
// enforce every bus user is confined by the per-subject ACL to the JetStream API
|
||||
// of its own rooms, so no ordinary identity may touch the control-plane buckets
|
||||
// (KV_UNIBUS_*). The ONLY identity the authenticator grants full JetStream
|
||||
// permissions is membershipd's internal service identity. By persisting that
|
||||
// identity to a file (membershipd --internal-id-file) the same key becomes
|
||||
// available to this CLI, which presents it as its NATS nkey and is therefore
|
||||
// recognized as the privileged internal client and allowed to read/write the KV.
|
||||
//
|
||||
// Intended invocation is over loopback on a cluster node (SSH): the data-plane
|
||||
// TLS certificate's SAN covers 127.0.0.1/localhost and the internal identity file
|
||||
// lives 0600 next to the node's TLS keys. Using the file requires root on the
|
||||
// node, which already implies full control of that node — so co-locating it adds
|
||||
// no practical exposure beyond what the TLS server key and cluster password
|
||||
// already represent.
|
||||
|
||||
// defaultClusterNatsURL is the node-local NATS listener. The CLI is meant to run
|
||||
// on a cluster node over SSH, talking to that node's own embedded server.
|
||||
const defaultClusterNatsURL = "nats://127.0.0.1:4250"
|
||||
|
||||
// Deploy-default paths for the privileged identity and the data-plane CA, so an
|
||||
// on-node invocation needs only --handle/--sign-pub/--role. Override for other
|
||||
// layouts.
|
||||
const (
|
||||
defaultInternalIDFile = "/opt/unibus/secrets/internal.id"
|
||||
defaultClusterCAFile = "/opt/unibus/tls/ca.crt"
|
||||
)
|
||||
|
||||
// kvConn bundles the privileged NATS connection to a live cluster and the
|
||||
// KV-backed control-plane store opened over it. Close releases both.
|
||||
type kvConn struct {
|
||||
nc *nats.Conn
|
||||
js jetstream.JetStream
|
||||
store membership.Store
|
||||
}
|
||||
|
||||
func (k *kvConn) Close() {
|
||||
if k == nil {
|
||||
return
|
||||
}
|
||||
if k.store != nil {
|
||||
_ = k.store.Close()
|
||||
}
|
||||
if k.nc != nil {
|
||||
k.nc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// connectKVStore opens the privileged internal connection to the cluster's NATS
|
||||
// and the JetStream KV control-plane store on top of it. internalIDFile is the
|
||||
// membershipd-persisted internal service identity whose nkey the authenticator
|
||||
// grants full permissions; caPath pins the data-plane TLS (empty only for a
|
||||
// non-TLS dev cluster). A non-loopback target without --ca is refused, mirroring
|
||||
// migrate-to-kv (audit 0008 N6): the allowlist write must not travel in cleartext.
|
||||
func connectKVStore(natsURL, internalIDFile, caPath string, replicas int) (*kvConn, error) {
|
||||
if internalIDFile == "" {
|
||||
return nil, fmt.Errorf("--internal-id-file is required for --store kv (the privileged identity membershipd persists with --internal-id-file)")
|
||||
}
|
||||
// Confidentiality guard: a remote NATS without TLS would expose the allowlist
|
||||
// (handles/roles/sign-pubs) and the privileged nkey handshake in cleartext.
|
||||
if !isLoopbackURL(natsURL) && caPath == "" {
|
||||
return nil, fmt.Errorf("refusing to connect to remote %q without --ca: the allowlist write would travel in cleartext — pin TLS with --ca, or run over a loopback --nats-url on a node", natsURL)
|
||||
}
|
||||
|
||||
id, err := client.LoadIdentity(internalIDFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load internal identity: %w", err)
|
||||
}
|
||||
nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("derive nkey from internal identity: %w", err)
|
||||
}
|
||||
opts := []nats.Option{
|
||||
nats.Name("membershipd-user-cli"),
|
||||
nats.Nkey(nkeyPub, nkeySign),
|
||||
}
|
||||
if caPath != "" {
|
||||
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load CA %q: %w", caPath, err)
|
||||
}
|
||||
opts = append(opts, nats.Secure(tlsCfg))
|
||||
}
|
||||
nc, err := nats.Connect(natsURL, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect cluster NATS %q: %w", natsURL, err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, fmt.Errorf("jetstream: %w", err)
|
||||
}
|
||||
store, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: replicas})
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, fmt.Errorf("open KV control-plane store: %w", err)
|
||||
}
|
||||
return &kvConn{nc: nc, js: js, store: store}, nil
|
||||
}
|
||||
|
||||
// reportKVReplication prints the replication status of the allowlist bucket
|
||||
// stream (KV_UNIBUS_users) right after a write, so the operator sees the add
|
||||
// landed on a quorum and replicated to the followers — executable evidence that
|
||||
// the live-cluster add is HA, not single-node. Best-effort: a read failure is a
|
||||
// note, not an error (the write itself already succeeded).
|
||||
func reportKVReplication(js jetstream.JetStream) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
st, err := js.Stream(ctx, "KV_UNIBUS_users")
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
|
||||
return
|
||||
}
|
||||
info, err := st.Info(ctx)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
|
||||
return
|
||||
}
|
||||
if info.Cluster == nil {
|
||||
fmt.Printf("KV_UNIBUS_users: standalone (R1, no cluster replication); msgs=%d\n", info.State.Msgs)
|
||||
return
|
||||
}
|
||||
current := 0
|
||||
for _, r := range info.Cluster.Replicas {
|
||||
if r.Current {
|
||||
current++
|
||||
}
|
||||
}
|
||||
fmt.Printf("KV_UNIBUS_users: leader=%s followers_current=%d/%d msgs=%d\n",
|
||||
info.Cluster.Leader, current, len(info.Cluster.Replicas), info.State.Msgs)
|
||||
}
|
||||
+143
-39
@@ -5,9 +5,12 @@ This directory holds the material to bring up unibus as a **3-node cluster**
|
||||
plane (rooms/members/keys/users on JetStream KV + the anti-replay nonce bucket)
|
||||
survives the loss of any one node (quorum 2/3).
|
||||
|
||||
> **The agent that authored this never touched a VPS.** Every step that changes a
|
||||
> remote host is marked **HUMAN** and is executed by the operator. `deploy-cluster.sh`
|
||||
> defaults to a dry run.
|
||||
> **Status: this cluster is DEPLOYED in production** (magnus + homer + datardos,
|
||||
> R3, enforce+ACL+TLS) — see report 0011. The runbook below was authored before any
|
||||
> VPS existed and has since been **corrected against the real deploy** (report 0012):
|
||||
> the start ordering, the R1→R3 reality, and the live user-add path were all wrong
|
||||
> or missing. Steps that change a remote host are marked **HUMAN**; `deploy-cluster.sh`
|
||||
> still defaults to a dry run.
|
||||
|
||||
## Files
|
||||
|
||||
@@ -22,18 +25,22 @@ Generated keys/secrets (`out/`, `build/`, `secrets/`) are **gitignored** — the
|
||||
secret and never leave the operator's trusted machine except over the secure
|
||||
rsync channel.
|
||||
|
||||
## Topology
|
||||
## Topology (as deployed, report 0011)
|
||||
|
||||
| Node | SSH | Public IP | WireGuard IP | Role |
|
||||
|---|---|---|---|---|
|
||||
| magnus | `magnus` | `<MAGNUS_PUBLIC_IP>` | `<MAGNUS_WG_IP>` | seed (first up) |
|
||||
| homer | `homer` | `141.94.69.66` | `<HOMER_WG_IP>` | replica |
|
||||
| datardos | `dd` | `51.91.100.142` | `<DATARDOS_WG_IP>` (10.21.0.x) | replica |
|
||||
| Node | SSH | Public IP | Role |
|
||||
|---|---|---|---|
|
||||
| magnus | `magnus` (root) | `135.125.201.30` | node — **= organic-machine.com = `om`**, the critical host (caddy + gitea + registry-api + monitoring); the bus runs alongside, untouched |
|
||||
| homer | `homer` (ubuntu+sudo) | `141.94.69.66` | node |
|
||||
| datardos | `dd` (ubuntu+sudo) | `51.91.100.142` | node |
|
||||
|
||||
The route layer (server-to-server) prefers the **WireGuard mesh**
|
||||
(`ROUTE_NETWORK=wg`); the client data plane and the HTTP control plane are reached
|
||||
over the public IPs. The route CA is **separate** from the client CA, so a client
|
||||
cert can never be presented to the route port.
|
||||
`ROUTE_NETWORK=public`, **not `wg`**: there is no WireGuard mesh between the three
|
||||
nodes (homer and datardos do not even have the `wg` binary; om's only WG peers are
|
||||
the operator's PCs). The server-to-server routes therefore travel over the public
|
||||
IPs, protected by the **separate cluster route CA** (mutual route TLS) — a client
|
||||
data-plane cert can never be presented to the route port. The client data plane and
|
||||
the HTTP control plane are also reached over the public IPs. There is no fixed
|
||||
"seed" node: with R3 the three are peers (see "Bring up" for why a lone node cannot
|
||||
self-serve).
|
||||
|
||||
## Prerequisites (HUMAN, once)
|
||||
|
||||
@@ -93,25 +100,48 @@ SEED
|
||||
|
||||
> The KV written here lives in `./local_files/jetstream`, which the cluster unit
|
||||
> reuses (`--nats-store` default), so the admin is present when the enforce cluster
|
||||
> starts. Additional users are added the same loopback way until a
|
||||
> `user add --store kv` exists (see GAP in report 0009).
|
||||
> starts. This loopback bootstrap is needed ONLY for the very first admin (the
|
||||
> chicken-and-egg). **Every user after that is added with the cluster live** — no
|
||||
> stop-seed-restart — via `user add --store kv` (see "Add users to the live
|
||||
> cluster" below, report 0012).
|
||||
|
||||
## Bring up (HUMAN — staggered)
|
||||
## Bring up (HUMAN)
|
||||
|
||||
Bring up the seed first, then the replicas one at a time, checking each joins.
|
||||
> **CORRECTION (report 0012).** The original instruction — "start magnus alone and
|
||||
> verify healthz, then add the others" — is **WRONG and will look like a hung
|
||||
> deploy.** A 3-node JetStream cluster forms a RAFT meta-group that needs a quorum
|
||||
> (2 of 3) to elect a leader. A single started node has no quorum, so its JetStream
|
||||
> meta never becomes current: `--store kv` blocks creating the KV buckets and
|
||||
> **`/healthz` never returns ok** until a second node joins. Waiting for magnus to
|
||||
> "go green" before starting the others therefore deadlocks the rollout.
|
||||
|
||||
Start the nodes so a quorum forms. On a **clean cluster** the simplest correct
|
||||
procedure is to start all three close together and let the meta-group converge:
|
||||
|
||||
```bash
|
||||
# 1. Seed node (after the seed step above).
|
||||
ssh root@magnus 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@magnus 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
|
||||
# Start all three (order does not matter); each blocks on the others until a
|
||||
# 2/3 quorum elects a JetStream meta leader, then the KV buckets are created.
|
||||
for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
|
||||
|
||||
# 2. Replicas, one at a time.
|
||||
ssh root@homer 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@datardos 'systemctl enable --now membershipd-cluster'
|
||||
# Only NOW does healthz return ok — once the meta-group has a leader (give it
|
||||
# ~10-30s on a cold start). Poll, do not assume the first node is broken.
|
||||
for h in magnus homer datardos; do
|
||||
echo "== $h =="; ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt || echo "(not ready yet — needs quorum)"'
|
||||
done
|
||||
```
|
||||
|
||||
> Initial rollout runs at **R1** (`KV_REPLICAS=1` in `nodes.env`): the buckets live
|
||||
> on the seed only. This is NOT HA yet — see "Scale to R3".
|
||||
A **staggered** start also works, but only because `membershipd`'s KV open RETRIES
|
||||
the bucket creation for a 120s bootstrap budget (issue 0006g, fix #3): the first
|
||||
node sits in that retry loop — NOT serving healthz — until the second node makes a
|
||||
quorum, then both converge and the third catches up. Either way, a lone node never
|
||||
self-serves; do not gate the next node's start on the previous one's healthz.
|
||||
|
||||
> A cold multi-node start only converges because of **three cold-start fixes**
|
||||
> (report 0011): route pooling off (`PoolSize=-1`), `NoAdvertise=true` (Docker
|
||||
> bridge IPs not gossiped), and the KV-open retry loop above. Without them the
|
||||
> meta-group re-elects leaders forever and bucket creation hangs. If a fresh
|
||||
> cluster will not form, confirm the running binary contains these fixes before
|
||||
> touching config.
|
||||
|
||||
## Promote an existing single-node (SQLite) deployment (HUMAN, optional)
|
||||
|
||||
@@ -137,11 +167,80 @@ ssh root@magnus 'nats --server nats://127.0.0.1:4250 server list' # 3 servers,
|
||||
|
||||
A healthy cluster shows 3 routed servers and a JetStream meta-group with a leader.
|
||||
|
||||
## Scale to R3 (HUMAN — real HA)
|
||||
## Add users to the live cluster (HUMAN — `user add --store kv`)
|
||||
|
||||
Once all three nodes are up and routed, raise the replication factor of every
|
||||
control-plane stream from 1 to 3 IN PLACE (no data loss), then flip `KV_REPLICAS=3`
|
||||
in `nodes.env` so future (re)deploys keep it:
|
||||
With the cluster up, add (and revoke) bus users **without stopping anything**,
|
||||
directly against the replicated KV allowlist. This replaces the stop-seed-restart
|
||||
procedure the original runbook implied for every user beyond the first admin.
|
||||
|
||||
The mechanism is the cluster's own **privileged internal connection**: under
|
||||
`enforce` every bus user is confined by the per-subject ACL to its own rooms, so no
|
||||
ordinary identity may write the control-plane buckets. The only identity the
|
||||
authenticator grants full JetStream permissions is `membershipd`'s internal service
|
||||
identity. The unit persists that identity to `${INTERNAL_ID_FILE}`
|
||||
(`/opt/unibus/secrets/internal.id`, 0600) via `--internal-id-file`, so the same key
|
||||
is available to the CLI. Run the CLI **on a node, over loopback** (the data-plane
|
||||
TLS cert SAN covers `127.0.0.1`); reading the identity file requires root on that
|
||||
node, which already implies full control of it, so this adds no practical exposure.
|
||||
|
||||
```bash
|
||||
# Add a member to the live cluster's replicated allowlist (run on any node).
|
||||
ssh root@magnus 'sudo /opt/unibus/membershipd user add --store kv \
|
||||
--handle alice --role member --sign-pub <64-hex-ed25519-pub>'
|
||||
# -> added user "alice" (...) role=member
|
||||
# -> KV_UNIBUS_users: leader=<node> followers_current=2/2 msgs=N (replicated, HA)
|
||||
|
||||
# List / revoke against the same live KV:
|
||||
ssh root@magnus 'sudo /opt/unibus/membershipd user list --store kv'
|
||||
ssh root@magnus 'sudo /opt/unibus/membershipd user revoke --store kv <64-hex-ed25519-pub>'
|
||||
```
|
||||
|
||||
Defaults assume an on-node invocation (`--nats-url nats://127.0.0.1:4250`,
|
||||
`--internal-id-file /opt/unibus/secrets/internal.id`, `--ca /opt/unibus/tls/ca.crt`,
|
||||
`--kv-replicas 3`). Semantics:
|
||||
|
||||
- **Idempotent / non-destructive**: re-adding the same key is an explicit
|
||||
`already registered` error (exit 1), never a silent overwrite — a re-add cannot
|
||||
flip a member to admin. To replace a user, `revoke` then add.
|
||||
- **HA**: the write commits through the JetStream quorum, so it succeeds even with
|
||||
one node down (2/3); the printed `followers_current` shows replication.
|
||||
- **No hard delete**: `revoke` flips status to `revoked` (denied on both planes,
|
||||
auditable); the KV has no row deletion, matching the SQLite store.
|
||||
|
||||
> **Rollout note (report 0012):** the live verification deployed this binary +
|
||||
> `--internal-id-file` to **datardos only** (the non-critical node). magnus and
|
||||
> homer still run the 0011 binary. To make the capability available (and the unit)
|
||||
> on all three — recommended, the posture is identical so there is no urgency — roll
|
||||
> the new binary with backups, one node at a time, verifying healthz between each:
|
||||
> ```bash
|
||||
> for h in homer magnus; do
|
||||
> ssh "$h" 'sudo cp -a /opt/unibus/membershipd /opt/unibus/membershipd.bak' # backup
|
||||
> scp build/membershipd "$h:/tmp/m" && ssh "$h" 'sudo install -o ubuntu -g ubuntu -m0775 /tmp/m /opt/unibus/membershipd'
|
||||
> # add INTERNAL_ID_FILE=/opt/unibus/secrets/internal.id to /opt/unibus/cluster.env
|
||||
> # add `--internal-id-file ${INTERNAL_ID_FILE} \` to the unit before `--store kv`
|
||||
> ssh "$h" 'sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster'
|
||||
> ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt' # green before next
|
||||
> done
|
||||
> ```
|
||||
> (`deploy-cluster.sh` + the unit template already emit `INTERNAL_ID_FILE` and the
|
||||
> flag, so a fresh `./deploy-cluster.sh --yes` is correct for all three.)
|
||||
|
||||
## Replication: go straight to R3 (HUMAN — real HA)
|
||||
|
||||
> **CORRECTION (report 0012).** The original "start at R1, then scale to R3" plan
|
||||
> assumed R1 is a usable interim state. **It is not, in this cluster.** At R1 all six
|
||||
> control-plane buckets (`KV_UNIBUS_users/rooms/members/room_keys/rooms_by_member`
|
||||
> + `KV_UNIBUS_nonces`) live on a SINGLE node — a hard **SPOF for authentication**:
|
||||
> if that node dies, the nonce/KV control plane is unreachable and EVERY
|
||||
> authenticated request fails closed (auth DoS). Worse, the cold multi-node start
|
||||
> only converges at all because of the three cold-start fixes (see "Bring up"); the
|
||||
> real deploy never ran a healthy R1 and **jumped straight to R3 once the cluster
|
||||
> formed.** Treat R1 as a transient artifact of bucket creation, not a milestone.
|
||||
|
||||
The deployed config already sets `KV_REPLICAS=3` in `nodes.env`. If buckets were
|
||||
created at R1 (e.g. only one node was up when `--store kv` first opened them), raise
|
||||
every control-plane stream to R3 IN PLACE (no data loss) once all three nodes are
|
||||
routed:
|
||||
|
||||
```bash
|
||||
for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members KV_UNIBUS_room_keys \
|
||||
@@ -151,27 +250,32 @@ done
|
||||
# (also OBJ_UNIBUS_blobs if the object store is in use)
|
||||
```
|
||||
|
||||
Until this is done, R1 means the seed node is a **single point of failure for
|
||||
authentication**: if it dies, the nonce/KV control plane is unreachable and every
|
||||
authenticated request fails closed (auth DoS). R1 is a rollout step, not HA.
|
||||
After this each bucket shows `followers_current=2/2` (quorum 2/3). The
|
||||
`user add --store kv` command prints that figure for `KV_UNIBUS_users` on every add,
|
||||
which is a cheap live HA check.
|
||||
|
||||
## Chaos test (HUMAN — requires the 3 live VPS; NOT run here)
|
||||
## Chaos test (HUMAN — requires the 3 live VPS)
|
||||
|
||||
Validate quorum tolerance after R3:
|
||||
|
||||
```bash
|
||||
# Kill one node; the cluster keeps serving (quorum 2/3).
|
||||
ssh root@datardos 'systemctl stop membershipd-cluster'
|
||||
# Kill one node; the cluster keeps serving (quorum 2/3). On ubuntu nodes use sudo.
|
||||
ssh dd 'sudo systemctl stop membershipd-cluster'
|
||||
# -> clients fail over (multiple seed URLs); reads/writes still succeed.
|
||||
ssh root@datardos 'systemctl start membershipd-cluster' # rejoins, catches up
|
||||
ssh dd 'sudo systemctl start membershipd-cluster' # rejoins, catches up
|
||||
|
||||
# Kill two nodes; quorum is LOST — the control plane should fail CLOSED (deny),
|
||||
# never fail open. Verify a request is rejected, not silently served.
|
||||
```
|
||||
|
||||
This network-level chaos test (kill 1/3, kill 2/3, partition/split-brain) is part
|
||||
of the deploy validation (issue 0003f) and runs against the real VPS — it is
|
||||
deliberately out of scope for the authoring agent.
|
||||
> **Validated (report 0012).** The 0011 chaos run checked only the control plane
|
||||
> (healthz + meta/stream-leader failover + KV readable with 2/3). Report 0012 added
|
||||
> the missing data-plane proofs against the live cluster: a real authenticated
|
||||
> client (`cmd/clientcheck`, operator identity, nkey+TLS) creating an E2E room and
|
||||
> publishing/subscribing — including a node stopped mid-stream, where the client
|
||||
> failed over to a survivor and kept receiving with zero loss (quorum 2/3) — and
|
||||
> `user add --store kv` committing with one node (the KV leader) down. The kill-2/3
|
||||
> fail-closed case remains a documented manual step.
|
||||
|
||||
## Rollback
|
||||
|
||||
|
||||
@@ -97,6 +97,7 @@ TLS_KEY=${REMOTE_DIR}/tls/server-${name}.key
|
||||
ROUTE_TLS_CERT=${REMOTE_DIR}/tls/route-${name}.crt
|
||||
ROUTE_TLS_KEY=${REMOTE_DIR}/tls/route-${name}.key
|
||||
ROUTE_TLS_CA=${REMOTE_DIR}/tls/cluster-ca.crt
|
||||
INTERNAL_ID_FILE=${REMOTE_DIR}/secrets/internal.id
|
||||
EOF
|
||||
|
||||
run ssh "$target" "mkdir -p ${REMOTE_DIR}/tls ${REMOTE_DIR}/secrets"
|
||||
@@ -114,13 +115,16 @@ if [[ $APPLY -eq 0 ]]; then
|
||||
fi
|
||||
cat <<'NEXT'
|
||||
|
||||
HUMAN — staggered start (do NOT enable all at once; see README "Bring up"):
|
||||
1. Seed node first (e.g. magnus):
|
||||
ssh root@magnus 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@magnus '/opt/unibus/membershipd user add --admin ...' # seed admin
|
||||
2. Then the other two, one at a time, checking quorum after each:
|
||||
ssh root@homer 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@datardos 'systemctl enable --now membershipd-cluster'
|
||||
HUMAN — bring up (see README "Bring up" — a LONE node has no quorum and never
|
||||
serves healthz, so do NOT gate the next node on the previous one going green):
|
||||
1. Seed the FIRST admin into the KV via the loopback bootstrap (README
|
||||
"Seed the first admin"); this is needed only for the chicken-and-egg admin.
|
||||
2. Start all three so a 2/3 quorum forms (order does not matter); healthz
|
||||
turns ok only once the meta-group elects a leader (~10-30s cold):
|
||||
for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
|
||||
3. Verify posture + quorum (README "Verify").
|
||||
4. Scale replicas 1 -> 3 once all three are up (README "Scale to R3").
|
||||
4. Ensure R3 on every control-plane stream (README "Replication: go straight to
|
||||
R3"); R1 is a SPOF, not a milestone.
|
||||
5. Add further users with the cluster LIVE — no restart — via
|
||||
`membershipd user add --store kv` (README "Add users to the live cluster").
|
||||
NEXT
|
||||
|
||||
@@ -33,6 +33,7 @@ ExecStart=/opt/unibus/membershipd \
|
||||
--route-tls-cert ${ROUTE_TLS_CERT} \
|
||||
--route-tls-key ${ROUTE_TLS_KEY} \
|
||||
--route-tls-ca ${ROUTE_TLS_CA} \
|
||||
--internal-id-file ${INTERNAL_ID_FILE} \
|
||||
--store kv \
|
||||
--kv-replicas ${KV_REPLICAS}
|
||||
# Restart=always (NOT on-failure): a clean SIGTERM exits success, and on-failure
|
||||
|
||||
+27
-11
@@ -33,20 +33,36 @@ type identityFile struct {
|
||||
KexPriv string `json:"kex_priv"`
|
||||
}
|
||||
|
||||
// LoadIdentity loads an existing identity from path. Unlike LoadOrCreateIdentity
|
||||
// it NEVER creates one: a missing or unreadable file is an error. It is for
|
||||
// callers that must consume a specific, pre-provisioned identity rather than mint
|
||||
// a fresh one — for example membershipd's persisted internal service identity,
|
||||
// which `membershipd user add --store kv` reads to present the privileged nkey
|
||||
// the cluster authenticator recognizes.
|
||||
func LoadIdentity(path string) (cs.Identity, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: read identity %q: %w", path, err)
|
||||
}
|
||||
var f identityFile
|
||||
if err := json.Unmarshal(data, &f); err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err)
|
||||
}
|
||||
id, err := f.toIdentity()
|
||||
if err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: decode identity %q: %w", path, err)
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// LoadOrCreateIdentity loads the identity at path, or generates and persists a
|
||||
// new one if the file does not exist. The file is written with 0600
|
||||
// permissions because it holds private keys.
|
||||
// permissions because it holds private keys. A file that exists but is
|
||||
// unreadable or corrupt is an error (NOT silently regenerated), so a damaged
|
||||
// identity surfaces instead of minting a new key that cannot decrypt old data.
|
||||
func LoadOrCreateIdentity(path string) (cs.Identity, error) {
|
||||
if data, err := os.ReadFile(path); err == nil {
|
||||
var f identityFile
|
||||
if err := json.Unmarshal(data, &f); err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err)
|
||||
}
|
||||
id, err := f.toIdentity()
|
||||
if err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: decode identity %q: %w", path, err)
|
||||
}
|
||||
return id, nil
|
||||
if _, statErr := os.Stat(path); statErr == nil {
|
||||
return LoadIdentity(path)
|
||||
}
|
||||
|
||||
id, err := cs.GenerateIdentity()
|
||||
|
||||
Reference in New Issue
Block a user