20 Commits

Author SHA1 Message Date
egutierrez 19bb0e56a6 chore: gitignore go.work for /tmp worktree resolution
go.work resolves the fn-registry replace to an absolute path when the worktree
lives outside the parent tree; it is local-only and must not be committed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 22:14:44 +02:00
egutierrez 7e2f62520d docs(unibus): bump 0.12.0 — accounts via single-use invites + hard-delete
Document the wallet-model account layer: invite-link account creation and real
hard-delete, both gotcha and capability growth log entries.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 22:14:44 +02:00
egutierrez 52c80ac010 test(membership,client): invite lifecycle, register, hard-delete
- Store-level suite over BOTH backends (SQLite + JetStream KV): golden redeem,
  single-use rejection, unknown token, expired token (forced past), cancel, and
  hard-delete. Plus the burn-on-claim edge (redeem with an already-registered
  key spends the invite and returns ErrUserExists on both backends).
- HTTP suite: admin mints an invite, a brand-new identity redeems it UNSIGNED
  via /register, the user appears in the allowlist, a second redeem is 409,
  expired is 410, malformed keys are 400, a non-admin is 403 on all four admin
  routes, and DELETE /users purges (vs revoke's status flip).
- Client end-to-end: admin mints an invite, an unregistered joiner redeems it
  without any admin signature, appears in the allowlist, then is hard-deleted.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 22:14:44 +02:00
egutierrez ca801d16af feat(client): invite and hard-delete admin methods, unsigned Register
Add the client-library counterparts the admin panel and the /join client page
consume:

- CreateInvite, ListInvites, CancelInvite, DeleteUser: signed as admin.
- Register(token, signPub, kexPub): UNSIGNED, via a new doUnsigned helper that
  fails over across control-plane endpoints and surfaces the server's structured
  error. The registering peer is not in the allowlist, so it cannot sign; the
  bearer token is the authorization.
- InviteInfo flat view for the panel.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 22:14:44 +02:00
egutierrez 18987bbd2f feat(membership): invite, register and hard-delete HTTP endpoints
Expose the account-creation surface over the signed control plane:

- POST /invites, GET /invites (pending only), DELETE /invites/{token},
  DELETE /users/{signpub}: all admin-only via requireAdmin (default-deny by role).
- POST /register: the wallet-model join path. It is the ONLY allowlist-mutating
  route exempt from the admin signature, because the registering identity is not
  yet in the allowlist and cannot sign — authorization is the bearer invite token.
  It validates both public keys (sign_pub Ed25519, kex_pub X25519, 64-hex) BEFORE
  spending the token, fixes handle/role from the invite (no client escalation),
  and maps state errors to precise codes (unknown 404, used 409, expired 410,
  already-registered 409).

Split isRateExempt (only /healthz) from isAuthExempt (/healthz + POST /register)
so /register skips the admin-signature middleware but stays per-IP rate limited.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 22:14:44 +02:00
egutierrez d64b0c052d feat(membership): single-use invites + hard-delete in the Store (SQLite + KV)
Add the data layer for WhatsApp-style accounts on the wallet model: the admin
mints a single-use invitation link, the new user redeems it by publishing only
its public keys, and the admin can hard-delete a user.

- Invite type and lifecycle (invites.go): 32-byte crypto/rand hex token, 7-day
  default TTL, fail-closed expiry parsing. Methods CreateInvite/GetInvite/
  ListInvites/ConsumeInvite/CancelInvite on both backends. ConsumeInvite is
  atomic and single-use: SQLite uses a transaction guarded by `used = 0`, the KV
  store uses a compare-and-swap on the entry revision (mark-first). Both burn the
  token on claim, so an already-registered key surfaces ErrUserExists with the
  invite spent — identical semantics across backends.
- DeleteUser (users.go + jetstream_store.go): hard-delete of the allowlist row,
  distinct from RevokeUser's status flip. Room memberships of the ex-user are
  intentionally left inert (they can no longer authenticate); no partial cleanup.
- Migration 003_invites.sql (root + embedded copy, byte-identical): additive
  `invites` table with audit columns, per db_migrations rules.
- Store interface gains DeleteUser, CreateInvite, GetInvite, ListInvites,
  ConsumeInvite, CancelInvite. New UNIBUS_invites KV bucket.
- Consistency fix: SQLite GetUser now maps sql.ErrNoRows to ErrNotFound, matching
  the KV backend and the storage-agnostic contract documented in store.go.
- ValidateKexPubHex added alongside ValidateSignPubHex for /register key checks.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 22:14:25 +02:00
egutierrez f31580deec Merge quick/nats-monitor-flag: UNIBUS_NATS_MONITOR loopback monitoring decoupled from debug log (bump 0.11.0) 2026-06-07 21:18:59 +02:00
Egutierrez 1c9325104c feat(embeddednats): UNIBUS_NATS_MONITOR flag decoupled from debug log
Add a dedicated UNIBUS_NATS_MONITOR=1 toggle that opens the embedded
nats-server monitoring HTTP endpoint (127.0.0.1:8222, loopback only) so a
local metrics scraper can read /varz, /connz and /jsz for server-level
metrics (msgs/s, connections, KV bucket msgs, RAFT leader per stream,
restarts).

Previously the monitoring endpoint was only reachable via UNIBUS_NATS_DEBUG=1,
which is coupled to the verbose nats-server debug log: enabling the endpoint
also wrote routes/RAFT/room subjects to journald in clear, which regresses the
hardened posture (issue 0007). The two concerns are now decoupled.

The toggle computation is extracted to a pure function
natsLogOpts(debugEnv, monitorEnv) (noLog, debug, trace, monitor): MONITOR=1
opens the endpoint while keeping the log quiet (NoLog true / Debug false). The
inverse coupling is preserved for backward compatibility (DEBUG still implies
MONITOR). The 127.0.0.1 bind stays hardcoded — the monitoring endpoint has no
auth and must never be reachable from the network.

Deploy wiring versioned: additive systemd drop-in
membershipd-cluster.service.d/nats-monitor.conf (Environment=UNIBUS_NATS_MONITOR=1)
plus a "NATS server metrics" section in the cluster README with the rolling
activation runbook (magnus -> homer -> datardos) gated on R3 reconvergence
(followers 2/2) between nodes.

Tests: pure decoupling table (monitor on => log NOT debug; debug => monitor;
default closed) + a real embedded server with MONITOR=1 asserting /varz answers
200 on loopback:8222, and a server without the flag with the endpoint closed.
100% additive: behavior is identical without the flag. Bump app.md 0.10.0 ->
0.11.0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:57:46 +02:00
egutierrez b4f3118e85 Merge quick/users-http-admin: HTTP admin-only users API + client methods (report 0014) 2026-06-07 20:46:44 +02:00
egutierrez e9053169da Merge quick/0011-deploy-gaps: live user-add --store kv + clientcheck E2E + runbook fixes (report 0012) 2026-06-07 20:46:44 +02:00
Egutierrez b983e43090 docs(0007): spec encryption-at-rest del control plane (JetStream/SQLite en disco) 2026-06-07 20:34:35 +02:00
egutierrez b379730225 docs(app): document users HTTP admin model, bump 0.10.0
Add a gotcha describing the unified-storage model (the server writes
users to the same store/KV as rooms), the admin-only HTTP surface, and
the CLI-seeds-admin-#0 bootstrap. Bump the version 0.9.0 -> 0.10.0 and
add the capability growth log entry for the new HTTP admin users API.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:32:05 +02:00
egutierrez 450ca01baf feat(membership,client): HTTP admin-only users API
Close the last control-plane asymmetry: rooms had a signed HTTP surface
but users were only manageable via the local CLI or direct store access.
Add admin-only HTTP endpoints, symmetric with rooms, executed against the
same privileged store the server already serves (SQLite single-node, the
replicated JetStream KV in cluster) — no new KV connection, no internal
identity, so the admin panel can manage the allowlist by signing as an
admin instead of needing --db / direct KV access.

Endpoints (all behind requireAdmin, on top of the existing
signature+nonce+TLS+enforce middleware):
  - GET  /users                    list the full allowlist (incl. revoked)
  - POST /users                    add {sign_pub, handle, role}
  - POST /users/{signpub}/revoke   revoke (status flip, no hard delete)

requireAdmin is default-deny with no dev relaxation: it allows a request
only when the authenticated signer is confirmed by the store as an active
admin; any other case (no signer, non-admin, revoked, store error) is 403,
fail-closed. The request context now also carries the signer's sign_pub
hex, because the endpoint id is a one-way hash of the key and cannot be
reversed to look the signer up in the allowlist.

Validation/idempotency mirror the CLL: sign_pub must be 64-hex, role must
be admin|member (empty defaults to member), re-adding an existing key is a
409 that leaves the row untouched. The hex check is unified into
membership.ValidateSignPubHex, reused by the CLI and the handlers.

pkg/client gains ListUsers/AddUser/RevokeUser (flat UserInfo type) signed
via doJSON, so the panel plugs in directly.

Tests: non-admin -> 403 on all three endpoints; admin add->list->revoke
roundtrip; validation (400 hex, 400 role, 409 re-add, row untouched); plus
a client test against an embedded membershipd under enforce.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:31:57 +02:00
egutierrez e1a7402ff1 chore: bump unibus to 0.9.0 (live user-add + clientcheck)
New capability membershipd user add --store kv against a live cluster plus
cmd/clientcheck end-to-end verification (issue 0011 gaps, report 0012). Adds
the capability growth log entry.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez ce72131ddf docs(cluster): correct runbook + wire --internal-id-file into deploy
Corrections learned from the real 0011 deploy:
- Bring up: the "start magnus alone and verify healthz" order deadlocks — a
  lone node of a 3-node cluster has no meta-group quorum and never serves
  healthz until a second node joins. Document a quorum-forming start and that
  a node never self-serves.
- Replication: R1 is an unusable SPOF (all six control-plane buckets on one
  node) and the cold start only converges with the three cold-start fixes;
  go straight to R3 once the cluster forms.
- Add a "user add --store kv" section: the live user-add path that replaces
  stop-seed-restart, with its security model and idempotency/HA/no-delete
  semantics.
- Topology: real IPs, ROUTE_NETWORK=public (no WireGuard mesh exists).
- Chaos test: mark the data-plane client + failover proofs as validated (0012).

Deploy machinery now emits the persisted internal identity: the unit gains
--internal-id-file ${INTERNAL_ID_FILE} and deploy-cluster.sh writes
INTERNAL_ID_FILE into each node's cluster.env, so a fresh deploy enables the
live user-add path on every node.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez 3aa5a2c9a9 feat(clientcheck): end-to-end client verification (E2E room + failover)
The 0011 chaos test validated only the control plane (healthz + leader
failover + KV readable with 2/3); it never connected an authenticated bus
client to the data plane. cmd/clientcheck is a reusable verification tool: it
connects with a real identity (nkey + TLS on both planes, multi-node seed
lists), creates an ephemeral E2E room (encrypted + signed, no durable stream),
and either publishes N messages and asserts all come back decrypted (golden)
or publishes a counter for a duration while logging the attached node (loop),
so stopping a node mid-run shows the client fail over to a survivor and keep
receiving with quorum 2/3.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez 02c2004ebd feat(membershipd): user add/list/revoke --store kv against a live cluster
Closes the most valuable 0011 deploy gap: adding users to the running
cluster's replicated allowlist with no stop-seed-restart. Under enforce the
per-subject ACL confines every bus user to its own rooms, so no ordinary
identity may write the control-plane KV buckets; the only identity the
authenticator grants full JetStream permissions is membershipd's internal
service identity.

- main.go: --internal-id-file persists that identity (load-or-create, 0600)
  instead of a fresh ephemeral key, so the same nkey is available out of
  process. Empty keeps the ephemeral default (single-node/dev unchanged).
- users_kv.go: connectKVStore loads the persisted identity, presents its
  nkey (recognized as internal -> full perms), opens the KV store and
  writes. Defaults assume an on-node loopback invocation; a remote target
  without --ca is refused (allowlist must not travel cleartext, audit N6).
  Prints KV_UNIBUS_users replication (followers_current) after a write.
- users_cli.go: --store kv on add/list/revoke. Re-adding a key is an explicit
  ErrUserExists (no silent overwrite / role flip); revoke is a status flip.
- pkg/client: LoadIdentity (load-only) extracted from LoadOrCreateIdentity,
  preserving its "corrupt file is an error, not silently regenerated" guard.
- kv_useradd_test.go: golden write under enforce, idempotency, unreachable
  endpoint, and remote-without-CA refusal against an embedded node.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:38 +02:00
egutierrez ff580ac031 Merge quick/cluster-coldstart-fixes: 3-node cluster cold-start fixes + real topology 2026-06-07 18:56:28 +02:00
egutierrez 9fbff79df4 chore(deploy): fill cluster nodes.env with the real 3-node topology
Set magnus's public IP (135.125.201.30) and switch ROUTE_NETWORK to "public":
the three nodes have no WireGuard mesh (homer/datardos do not even have wg
installed), so server-to-server routes go over the public IPs, still protected
by the separate cluster route CA (mutual TLS). KV_REPLICAS is raised to 3 now
that the cluster runs at R3.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 18:56:28 +02:00
egutierrez 33746d9962 fix(cluster): make the JetStream control-plane survive a cold multi-node start
Bringing up the 3-node cluster from clean stores never converged: every node
looped on `open KV bucket "UNIBUS_rooms" (replicas=1): context deadline exceeded`.
Three independent defects in the clustered bootstrap path, none of which surface
on a single node (where JetStream is ready instantly), caused it:

1. embeddednats: route connection pooling (nats-server 2.10 default pool of 3)
   churned with "duplicate route"/"client closed" reconnects on the small cluster,
   interrupting the meta-group RAFT heartbeats and forcing perpetual leader
   re-elections. Set Cluster.PoolSize = -1 (single route per peer).

2. embeddednats: the cluster nodes are Docker hosts, so NATS advertised the docker
   bridge IPs (172.x / 10.0.x) to peers, which then tried to dial those private,
   mutually-unreachable addresses. Set Cluster.NoAdvertise = true so only the
   explicit public-IP routes are used. Also added a UNIBUS_NATS_DEBUG env toggle
   (off by default) that enables the embedded server's logger and loopback
   monitoring port for debugging the route/meta layer.

3. membership.OpenJetStream: a KV op is a NATS request/reply; on a cold cluster the
   op was published once, before the node had contact with the meta leader, so the
   request was dropped and the single long-context call just blocked until timeout.
   Retry each bucket op with short per-attempt contexts until it succeeds or an
   overall bootstrap budget (120s) is exhausted, so it lands once the meta settles.

With these the cluster forms cleanly, creates the KV buckets, scales R1->R3 in
place, and survives loss of one node (quorum 2/3). Verified on magnus+homer+datardos.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 18:56:28 +02:00
29 changed files with 3410 additions and 114 deletions
+4
View File
@@ -14,3 +14,7 @@ worker.id
/chat
*.exe
registry.db
# local worktree resolution (do not commit)
go.work
go.work.sum
+131 -1
View File
@@ -2,7 +2,7 @@
name: unibus
lang: go
domain: infra
version: 0.8.0
version: 0.12.0
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
tags: [service, messaging, nats, e2e]
uses_functions:
@@ -122,6 +122,41 @@ Para apuntar a un NATS externo en producción: `--nats-url nats://host:4222` en
las rutas GET de lectura. Confía en la red interna. Las rutas mutantes
(`/rooms`, `/invite`, `/rekey`) sí exigen firma Ed25519 del owner sobre los
bytes canónicos de la request. Endurecer es fase posterior.
- **Gestión de usuarios: storage unificado, alta por dos vías.** El allowlist de
usuarios vive en el MISMO store que las rooms (`pkg/membership.Store`): SQLite en
single-node, JetStream KV replicado (`UNIBUS_users`) en cluster. El `Server` ya
tiene ese store privilegiado abierto (es quien sirve el KV en cada nodo), así que
expone `GET/POST /users` y `POST /users/{signpub}/revoke` como API HTTP admin-only,
simétrica con las rutas de rooms: el panel de administración firma como admin y el
server ejecuta la mutación contra el mismo store. El panel NO necesita `--db`, ni la
identidad interna, ni correr en un nodo del cluster; funciona idéntico en single-node
y cluster. La autorización es default-deny: solo un firmante que el store confirma como
`role == "admin"` activo pasa, cualquier otro recibe 403 (encima de la firma+nonce+TLS
ya existentes). La CLI `membershipd user add --store kv` sigue existiendo SOLO para
sembrar el admin #0 (bootstrap del huevo-gallina: sin un admin sembrado no hay quién
firme el primer `POST /users`); a partir de ahí toda la gestión es HTTP admin-only. El
alta es idempotente igual que la CLI: re-alta de una clave ya registrada = 409, sin
sobrescribir ni elevar rol; el revoke es un flip de status (sin hard-delete), auditable.
- **Cuentas estilo WhatsApp: alta por invitación, baja por hard-delete.** Sobre la API
admin anterior, `unibus` añade el modelo wallet de cuentas. El admin NO genera claves:
`POST /invites` (admin-only) acuña un enlace de invitación de un solo uso con caducidad
(token de 32 bytes `crypto/rand` en hex; TTL default 7 días), fijando `handle` y `role`.
El nuevo usuario abre el enlace en SU cliente, que genera el par de claves localmente
(la privada nunca sale del dispositivo) y llama `POST /register` con `{token, sign_pub,
kex_pub}`. `/register` es la ÚNICA ruta que añade al allowlist sin firma admin —
autorizada por el TOKEN, porque la identidad nueva aún no está en el allowlist y no puede
firmar. Está endurecida: token fuerte de un solo uso (consumo atómico, doble uso → 409),
caducidad (→ 410), `handle`/`role` fijados por el invite (sin escalado), validación
estricta de ambas claves hex de 64 chars, y rate-limit por IP heredado del control plane
(solo `/healthz` está exento). El borrado de cuenta es `DELETE /users/{signpub}`
(admin-only): hard-delete real del allowlist, distinto del `revoke` (que se mantiene:
revoke = quitar acceso dejando rastro auditable; delete = purga). Tras hard-delete, las
membresías de rooms del ex-usuario quedan inertes (ya no puede autenticarse en ningún
plano); NO se limpian a medias — un owner expulsa/rekey su room si quiere forward secrecy.
Invites y users viven en el MISMO store (SQLite `invites`/`users`, KV `UNIBUS_invites`/
`UNIBUS_users`). `pkg/client` gana `CreateInvite/ListInvites/CancelInvite/Register/
DeleteUser`; solo `Register` va sin firmar. Recovery: hard-delete del último admin se
recupera con la CLI local `membershipd user add` (mismo seam que siembra el admin #0).
- **Identidad = secreto crítico.** El archivo de identidad (`worker.id`,
`chat.id`) contiene las claves privadas (Ed25519 + X25519). Se escribe 0600.
Perderlo = mensajes ilegibles, sin recuperación. Trátalo como una clave SSH.
@@ -154,6 +189,101 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
## Capability growth log
- v0.12.0 (2026-06-07) — capa de CUENTAS estilo WhatsApp sobre el modelo wallet: alta de
usuario por enlace de invitación de un solo uso + baja por hard-delete real. El admin
nunca ve la clave privada del usuario. (1) **Invites**: nuevo backend de datos en ambos
stores (SQLite `invites` vía migración aditiva `003_invites.sql`; KV `UNIBUS_invites`).
Tipo `Invite{Token, Handle, Role, ExpiresAt, Used, CreatedAt}` + campos de auditoría del
consumo (`UsedAt/UsedSignPub/UsedKexPub`). Métodos `Store.CreateInvite` (token 32 bytes
`crypto/rand` hex, TTL default 7d), `GetInvite`, `ListInvites`, `ConsumeInvite` (valida
existe/no-usado/no-caducado → registra el sign_pub con el handle/role del invite → marca
usado, atómico) y `CancelInvite`. Consumo single-use garantizado en ambos backends: tx
SQLite (mark guard `used=0` + insert) y CAS sobre la revisión KV (mark-first); burn-on-
claim idéntico si la clave ya existe. (2) **Hard-delete**: `Store.DeleteUser` (SQLite
`DELETE FROM users`, KV `users.Delete`) purga el allowlist — distinto del `revoke`
(status flip, conservado). Las membresías de rooms del ex-usuario quedan inertes
(documentado, sin limpieza parcial). (3) **Endpoints HTTP**: `POST /invites`, `GET
/invites` (solo pendientes), `DELETE /invites/{token}`, `DELETE /users/{signpub}`
(todos admin-only vía `requireAdmin`) y `POST /register` — la única ruta auth-exempt de
firma admin (autorizada por el token), rate-limited (se separa `isRateExempt`, solo
`/healthz`, de `isAuthExempt`) y con validación hex estricta de `sign_pub`+`kex_pub`
ANTES de gastar el token. Errores mapeados: token desconocido 404, usado 409, caducado
410, identidad ya registrada 409. (4) **pkg/client**: `CreateInvite/ListInvites/
CancelInvite/Register/DeleteUser`; `Register` va sin firma vía un helper `doUnsigned`.
(5) Fix de consistencia: el `GetUser` de SQLite ahora mapea `sql.ErrNoRows``ErrNotFound`
como el KV y como documenta `store.go`. Tests nuevos: suite de invites store-level en
AMBOS backends (golden + single-use + token desconocido + caducado + cancel + hard-delete
+ burn-on-claim), suite HTTP (crear invite → register sin auth → aparece en allowlist →
re-register 409 → caducado 410 → no-admin 403 en las 4 rutas admin → hard-delete purga),
y test de cliente end-to-end (admin acuña invite → joiner no-registrado redime sin firma →
aparece → hard-delete desaparece). Cambios 100% aditivos: el comportamiento previo no
cambia; build/vet/test verdes (`CGO_ENABLED=0`).
- v0.11.0 (2026-06-07) — flag dedicado `UNIBUS_NATS_MONITOR` que abre el endpoint
de monitoring HTTP del nats-server embebido (`127.0.0.1:8222`, loopback only) de
forma DESACOPLADA del debug-log. Antes el monitoring solo se abría con
`UNIBUS_NATS_DEBUG=1`, que además encendía el log verboso del nats-server
(rutas/RAFT/subjects a journald en claro) — incompatible con el endurecimiento
del issue 0007. El cómputo de los toggles se extrae a una función pura
`natsLogOpts(debugEnv, monitorEnv) (noLog, debug, trace, monitor)`: `MONITOR=1`
abre el endpoint dejando el log en silencio (`NoLog` true / `Debug` false), y se
mantiene el acoplamiento inverso por compatibilidad (`DEBUG` sigue implicando
`MONITOR`). El bind loopback `127.0.0.1` queda hardcoded — el monitoring NUNCA es
público y no lleva auth; lo lee un scraper local que empuja a VictoriaMetrics
(dashboard `unibus-nats` en `fleet_monitoring`). Se versiona el cableado de
deploy: drop-in systemd aditivo `membershipd-cluster.service.d/nats-monitor.conf`
(`Environment=UNIBUS_NATS_MONITOR=1`) + sección "NATS server metrics" en el
README del cluster con el runbook de activación rolling (magnus→homer→datardos)
y gate de reconvergencia R3 (`followers 2/2`) entre nodos. Tests nuevos: tabla
pura del desacoplamiento (monitor on ⇒ log NO debug; debug ⇒ monitor; default
cerrado) + server real con `MONITOR=1` que confirma `/varz` 200 en loopback:8222
y server sin flag con el endpoint cerrado. Cambios 100% aditivos: sin el flag el
comportamiento es idéntico; build/test verdes.
- v0.10.0 (2026-06-07) — API HTTP admin-only de gestión de usuarios, cerrando la
última asimetría del control plane: las rooms tenían superficie HTTP firmada
(`POST /rooms`, etc.) pero los users solo se gestionaban por CLI local o acceso
directo al store. Se añaden `GET /users` (lista completa, incluidos revocados),
`POST /users` (alta `{sign_pub, handle, role}`: valida hex de 64 chars + role en
`{admin, member}`, 409 idempotente que no sobrescribe ni eleva rol) y
`POST /users/{signpub}/revoke` (flip de status, sin hard-delete). Los tres pasan por
un helper `requireAdmin` default-deny que confirma contra el store que el firmante
autenticado es un user `role == "admin"` activo (el endpoint id es un hash one-way de
la clave, así que el contexto lleva ahora también el `sign_pub` hex del firmante para
resolver `GetUser`); cualquier otro firmante recibe 403, encima de la firma+nonce+TLS+
enforce ya heredadas del middleware. NO se abre conexión KV nueva ni se usa la identidad
interna: el server escribe vía su `s.store` privilegiado, el MISMO que las rooms (SQLite
single-node, KV `UNIBUS_users` en cluster). `pkg/client` gana `ListUsers/AddUser/RevokeUser`
(tipo plano `UserInfo`) firmando como admin, así la pestaña Users del panel deja de
necesitar `--db`/acceso KV directo. La CLI `membershipd user add --store kv` queda SOLO
para sembrar el admin #0 (bootstrap). La validación de `sign_pub` se unifica en
`membership.ValidateSignPubHex`, reusada por la CLI y los handlers. Tests nuevos:
no-admin → 403 en los tres endpoints, roundtrip admin add→list→revoke, y validación
(hex inválido → 400, role inválido → 400, re-alta → 409), más un test de cliente contra
un membershipd embebido. Cambios 100% aditivos: el comportamiento single-node y de las
rutas de rooms no cambia; vet/build/test verdes.
- v0.9.0 (2026-06-07) — cierre de los gaps que el despliegue del cluster (report
0011) dejó abiertos (report 0012). (GAP A) Nueva capability `membershipd user
add|list|revoke --store kv`: alta/baja de usuarios contra el KV replicado del
cluster EN MARCHA, sin el procedimiento de parar-sembrar-rearrancar. Usa la
conexión interna privilegiada — el daemon persiste su identidad de servicio con
`--internal-id-file` (cada nodo genera/carga la suya, 0600 junto a las claves TLS)
y la CLI, ejecutada por loopback en un nodo, presenta esa nkey que el
autenticador reconoce con permisos plenos de JetStream; ninguna identidad de
usuario normal puede tocar los buckets `KV_UNIBUS_*` bajo la ACL por-subject. El
alta es idempotente (re-alta de la misma clave = `ErrUserExists` explícito, sin
sobrescribir ni elevar rol), commitea con quórum 2/3 (HA, imprime
`followers_current`) y rechaza un destino remoto sin `--ca` (igual que
`migrate-to-kv`). (GAP B) Nuevo `cmd/clientcheck`: verificación end-to-end real
con un cliente autenticado (identidad operator, nkey+TLS+https) que crea una room
E2E, publica y recibe descifrado contra el cluster vivo, incluido un nodo parado a
media transmisión donde el cliente hace failover a un superviviente y sigue
recibiendo con cero pérdida (quórum 2/3) — el plano de datos que el chaos test del
0011 nunca probó. (GAP C) Runbook `deploy/cluster/README.md` corregido: el orden
de arranque "magnus solo y verifica healthz" deadlockeaba (un nodo solo no tiene
quórum del meta-group y nunca sirve healthz); se documenta el arranque por quórum,
que R1 es un SPOF inservible (ir directo a R3) y la nueva vía de alta con el
cluster vivo. La plantilla de deploy (unit + `deploy-cluster.sh`) emite ya
`INTERNAL_ID_FILE` y el flag. Verificado contra los 3 VPS reales (magnus + homer +
datardos); posture enforce+ACL+TLS+R3 intacta.
- v0.8.0 (2026-06-07) — completar y endurecer el cluster (issue 0006, fases
0006a0006g) que cierra los bloqueantes de la auditoría dedicada del cluster
(report 0008) y cablea el control plane descentralizado que 0003 dejó a medias.
+260
View File
@@ -0,0 +1,260 @@
// Command clientcheck is an end-to-end verification client for a live unibus
// cluster (issue 0011 GAP B). The 0011 chaos test validated only the control
// plane (healthz + meta/stream-leader failover + KV readable with 2/3); it never
// connected an authenticated bus client (nkey + TLS) to create a room and
// publish/subscribe through it, least of all across a node loss. clientcheck does
// exactly that with a real identity (the operator), so the data-plane end-to-end
// path — connect, create an E2E room, publish, receive decrypted — is exercised
// against the running cluster, including while a node is stopped.
//
// It is a reusable tool, not a throwaway script: point it at the cluster's CA,
// an identity file, and the NATS + control-plane seed lists.
//
// # golden: connect, create an E2E room, publish N, confirm N decrypted back
// clientcheck --ca ca.crt --identity-file operator.id \
// --nats-seeds nats://A:4250,nats://B:4250,nats://C:4250 \
// --ctrl-seeds https://A:8470,https://B:8470,https://C:8470 --messages 5
//
// # loop: publish a counter every interval for the duration, logging the node
// # it is attached to — stop a node mid-run (systemctl stop membershipd-cluster)
// # and watch it fail over to a survivor and keep receiving (quorum 2/3).
// clientcheck ... --mode loop --duration 45s --interval 1s
package main
import (
"crypto/rand"
"encoding/hex"
"flag"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
)
func main() {
var (
caPath = flag.String("ca", "", "bus CA cert pinning TLS on both planes (required for a secured cluster)")
idFile = flag.String("identity-file", "", "path to the client identity JSON (e.g. `pass show unibus/operator-identity` written 0600) (required)")
natsSeeds = flag.String("nats-seeds", "", "comma-separated NATS urls of the cluster nodes (required)")
ctrlSeeds = flag.String("ctrl-seeds", "", "comma-separated control-plane https urls of the cluster nodes (required)")
subject = flag.String("subject", "test.gapcheck", "test room subject PREFIX; a random token is appended so runs never collide with real rooms")
messages = flag.Int("messages", 5, "golden mode: number of messages to publish and expect back")
mode = flag.String("mode", "golden", "golden (publish N, verify N decrypted) | loop (publish a counter for --duration, for failover testing)")
duration = flag.Duration("duration", 30*time.Second, "loop mode: how long to keep publishing")
interval = flag.Duration("interval", 1*time.Second, "loop mode: delay between published messages")
)
flag.Parse()
if *idFile == "" || *natsSeeds == "" || *ctrlSeeds == "" {
log.Fatalf("clientcheck: --identity-file, --nats-seeds and --ctrl-seeds are required")
}
id, err := client.LoadIdentity(*idFile)
if err != nil {
log.Fatalf("clientcheck: load identity: %v", err)
}
natsList := splitCSV(*natsSeeds)
ctrlList := splitCSV(*ctrlSeeds)
if len(natsList) == 0 || len(ctrlList) == 0 {
log.Fatalf("clientcheck: empty --nats-seeds or --ctrl-seeds")
}
// Build the secure client options: nkey on the data plane, TLS pinned to the
// bus CA on both planes, and the FULL seed lists so nats.go fails over to a
// surviving node when the attached one dies (the failover this tool verifies).
opts := client.Options{
NatsServers: natsList[1:],
CtrlURLs: ctrlList[1:],
}
if *caPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(*caPath)
if err != nil {
log.Fatalf("clientcheck: load CA: %v", err)
}
opts.UseNkey = true
opts.TLS = tlsCfg
opts.CtrlTLS = tlsCfg
for _, u := range ctrlList {
if !strings.HasPrefix(u, "https://") {
log.Fatalf("clientcheck: control URL %q must be https:// when --ca is set", u)
}
}
}
c, err := client.NewWithOptions(natsList[0], ctrlList[0], id, opts)
if err != nil {
log.Fatalf("clientcheck: connect: %v", err)
}
defer c.Close()
log.Printf("connected: endpoint=%s nats=%s", c.Endpoint().ID, c.ConnectedServer())
// Create an EPHEMERAL E2E room (encrypted + signed, NOT persisted): the test
// stays end-to-end encrypted (the cluster requires encryption on a public
// bind) while leaving no durable JetStream stream behind. The random subject
// token guarantees the room is unique and never a real room.
rnd := make([]byte, 8)
if _, err := rand.Read(rnd); err != nil {
log.Fatalf("clientcheck: random: %v", err)
}
subj := fmt.Sprintf("%s.%s", *subject, hex.EncodeToString(rnd))
policy := room.Policy{Encrypt: true, Persist: false, SignMsgs: true}
roomID, err := c.CreateRoom(subj, policy)
if err != nil {
log.Fatalf("clientcheck: create room: %v", err)
}
log.Printf("created E2E room: id=%s subject=%s (encrypt=%v sign=%v persist=%v)", roomID, subj, policy.Encrypt, policy.SignMsgs, policy.Persist)
// Under the per-subject ACL, NATS freezes permissions at connect time, so the
// just-created room's subject is not yet publishable/subscribable on the live
// connection. RefreshSession reconnects so the authenticator re-derives the
// ACL (now including this room) — the post-0006 contract every client follows
// after a membership change.
if err := c.RefreshSession(); err != nil {
log.Fatalf("clientcheck: refresh session: %v", err)
}
switch *mode {
case "golden":
runGolden(c, roomID, *messages)
case "loop":
runLoop(c, roomID, *duration, *interval)
default:
log.Fatalf("clientcheck: --mode must be golden or loop, got %q", *mode)
}
}
// runGolden subscribes, publishes n messages, and asserts all n come back
// decrypted. Exits non-zero if any are missing.
func runGolden(c *client.Client, roomID string, n int) {
var mu sync.Mutex
got := map[string]bool{}
sub, err := c.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
mu.Lock()
got[string(plaintext)] = true
mu.Unlock()
})
if err != nil {
log.Fatalf("clientcheck: subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(300 * time.Millisecond) // let the subscription settle
want := make([]string, n)
for i := 0; i < n; i++ {
msg := fmt.Sprintf("gapcheck-e2e-%d", i)
want[i] = msg
if err := c.Publish(roomID, []byte(msg)); err != nil {
log.Fatalf("clientcheck: publish %d: %v", i, err)
}
}
log.Printf("published %d messages to %s; waiting for decrypted echoes...", n, roomID)
deadline := time.Now().Add(15 * time.Second)
for time.Now().Before(deadline) {
mu.Lock()
have := len(got)
mu.Unlock()
if have >= n {
break
}
time.Sleep(100 * time.Millisecond)
}
mu.Lock()
defer mu.Unlock()
missing := 0
for _, w := range want {
if !got[w] {
missing++
log.Printf(" MISSING: %q", w)
}
}
log.Printf("connected node at finish: %s", c.ConnectedServer())
if missing > 0 {
log.Fatalf("GOLDEN FAIL: %d/%d messages not received decrypted", missing, n)
}
log.Printf("GOLDEN OK: all %d messages received and decrypted end-to-end", n)
}
// runLoop publishes a numbered message every interval for the duration and logs
// the count received plus the node currently attached, so an operator stopping a
// cluster node mid-run sees the client fail over to a survivor and keep receiving
// (quorum 2/3). It is the live failover-with-a-connected-client test the 0011
// chaos run never performed.
func runLoop(c *client.Client, roomID string, duration, interval time.Duration) {
var mu sync.Mutex
received := 0
servers := map[string]int{} // node -> #ticks observed attached
sub, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) {
mu.Lock()
received++
mu.Unlock()
})
if err != nil {
log.Fatalf("clientcheck: subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(300 * time.Millisecond)
log.Printf("loop: publishing every %s for %s — stop a node now to test failover", interval, duration)
end := time.Now().Add(duration)
sent := 0
for time.Now().Before(end) {
msg := fmt.Sprintf("gapcheck-loop-%d", sent)
err := c.Publish(roomID, []byte(msg))
sent++
mu.Lock()
recv := received
mu.Unlock()
node := c.ConnectedServer()
up := c.IsConnected()
if node != "" {
mu.Lock()
servers[node]++
mu.Unlock()
}
pubStatus := "ok"
if err != nil {
pubStatus = "ERR:" + err.Error()
}
log.Printf(" t=%2ds sent=%d recv=%d up=%v node=%s publish=%s",
sent, sent, recv, up, node, pubStatus)
time.Sleep(interval)
}
mu.Lock()
defer mu.Unlock()
log.Printf("loop done: sent=%d received=%d", sent, received)
nodes := make([]string, 0, len(servers))
for n := range servers {
nodes = append(nodes, n)
}
sort.Strings(nodes)
for _, n := range nodes {
log.Printf(" attached to %s for %d ticks", n, servers[n])
}
if len(servers) > 1 {
log.Printf("FAILOVER OBSERVED: client was attached to %d distinct nodes across the run", len(servers))
}
if received == 0 {
log.Fatalf("LOOP FAIL: received 0 messages")
}
log.Printf("LOOP OK: client kept receiving across the run (received=%d)", received)
}
func splitCSV(s string) []string {
var out []string
for _, p := range strings.Split(s, ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
+152
View File
@@ -0,0 +1,152 @@
package main
// Integration tests for issue 0011 GAP A: `membershipd user add --store kv`
// adds users to a RUNNING cluster's replicated allowlist via the privileged
// internal connection, instead of the stop-seed-restart procedure the 0011
// deploy required. These exercise the real connectKVStore path (load the
// persisted internal identity from a file, present its nkey, open the KV store,
// write the user) against an embedded enforce node, plus the idempotency and
// error semantics the DoD calls for. Multi-node replication and node-down quorum
// are validated against the live cluster (report 0012).
import (
"encoding/hex"
"errors"
"path/filepath"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
)
// startEnforceKVNode boots a single embedded enforce node whose authenticator
// recognizes internalPubHex as the privileged internal identity, bootstraps the
// KV control-plane store over the in-process internal connection, and publishes
// it into the holder — the exact sequence main.go performs for --store kv. It
// returns the client URL the CLI connects to.
func startEnforceKVNode(t *testing.T, internalID cs.Identity) string {
t.Helper()
holder := &storeHolder{}
auth := busauth.NewNkeyAuthenticatorACLInternal(
holder.IsAuthorized,
busauth.PermissionsFromSubjects(holder.subjectACL),
hex.EncodeToString(internalID.SignPub),
)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("start enforce node: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
intNC, js, err := connectInternalJS(ns, internalID, true)
if err != nil {
t.Fatalf("bootstrap internal connection: %v", err)
}
t.Cleanup(intNC.Close)
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("bootstrap KV store: %v", err)
}
holder.set(kvStore)
return ns.ClientURL()
}
// TestUserAddStoreKV_GoldenAndIdempotent is the GAP A golden + edge-1: the CLI
// connection (real connectKVStore, loading the internal identity from a file and
// presenting its nkey) writes a user into the live KV allowlist, the user is
// authorized afterward, and re-adding the same key is an explicit ErrUserExists
// with no corruption (the unchanged row is still authorized).
func TestUserAddStoreKV_GoldenAndIdempotent(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
internalID, err := client.LoadOrCreateIdentity(idFile) // persists 0600
if err != nil {
t.Fatalf("persist internal identity: %v", err)
}
url := startEnforceKVNode(t, internalID)
// Golden: connect as the privileged internal identity (loopback, no TLS) and
// add a new user, exactly as `user add --store kv` does.
kv, err := connectKVStore(url, idFile, "", 1)
if err != nil {
t.Fatalf("connectKVStore (privileged): %v", err)
}
defer kv.Close()
newUser, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("new user identity: %v", err)
}
pub := hex.EncodeToString(newUser.SignPub)
if err := kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember); err != nil {
t.Fatalf("add user to live KV: %v", err)
}
if !kv.store.IsAuthorized(pub) {
t.Fatalf("user added to KV must be authorized")
}
// Edge 1: re-adding the same key is a clean, non-destructive ErrUserExists.
err = kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember)
if !errors.Is(err, membership.ErrUserExists) {
t.Fatalf("re-add must return ErrUserExists (idempotent), got %v", err)
}
// A different handle/role with the SAME key is also rejected — the row is not
// silently overwritten (no role flip).
if err := kv.store.AddUser(pub, "impostor", membership.RoleAdmin); !errors.Is(err, membership.ErrUserExists) {
t.Fatalf("re-add with a different role must NOT overwrite; want ErrUserExists, got %v", err)
}
u, err := kv.store.GetUser(pub)
if err != nil {
t.Fatalf("get user: %v", err)
}
if u.Handle != "gapcheck_user" || u.Role != membership.RoleMember || u.Status != membership.StatusActive {
t.Fatalf("idempotent re-add corrupted the row: %+v", u)
}
}
// TestUserAddStoreKV_RequiresInternalIdentity: --store kv without a usable
// internal identity file fails loudly (missing file, empty path) rather than
// silently connecting unprivileged.
func TestUserAddStoreKV_RequiresInternalIdentity(t *testing.T) {
if _, err := connectKVStore("nats://127.0.0.1:4250", "", "", 1); err == nil {
t.Fatalf("empty --internal-id-file must be an error")
}
missing := filepath.Join(t.TempDir(), "nope.id")
if _, err := connectKVStore("nats://127.0.0.1:4250", missing, "", 1); err == nil {
t.Fatalf("missing internal identity file must be an error")
}
}
// TestUserAddStoreKV_UnreachableKV is the GAP A error case: pointing --store kv
// at a dead endpoint yields a clear, handled error (no crash, no silent success).
func TestUserAddStoreKV_UnreachableKV(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
t.Fatalf("persist internal identity: %v", err)
}
// A loopback port with nothing listening: connect must fail fast and wrapped.
_, err := connectKVStore("nats://127.0.0.1:1/", idFile, "", 1)
if err == nil {
t.Fatalf("connecting to a dead endpoint must error")
}
}
// TestUserAddStoreKV_RemoteWithoutCARefused: a non-loopback target without --ca
// is refused so the allowlist write never travels in cleartext (audit 0008 N6,
// same guard as migrate-to-kv).
func TestUserAddStoreKV_RemoteWithoutCARefused(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
t.Fatalf("persist internal identity: %v", err)
}
_, err := connectKVStore("nats://203.0.113.1:4250", idFile, "", 1)
if err == nil {
t.Fatalf("remote target without --ca must be refused")
}
}
+27 -3
View File
@@ -24,6 +24,7 @@ import (
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
)
@@ -83,6 +84,17 @@ func main() {
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
// in the cluster serves the same state.
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
// Persisted internal service identity (issue 0011 gaps, GAP A): when set, the
// privileged internal identity used to manage JetStream is LOADED from this
// file (generated and persisted on first start) instead of being a fresh
// ephemeral key each boot. Persisting it is what lets `membershipd user add
// --store kv` write the replicated allowlist of a LIVE cluster: that CLI,
// run over loopback on a node, loads the SAME identity and presents the nkey
// this node's authenticator already grants full permissions. Empty keeps the
// ephemeral-per-process behavior (single-node/dev default, unchanged). The
// file holds a private key: it is written 0600 and belongs next to the node's
// TLS keys (deploy keeps it under secrets/, gitignored).
internalIDFile = flag.String("internal-id-file", "", "path to a persisted internal service identity (JSON); enables `membershipd user add --store kv` against the live cluster. Empty = ephemeral per-process identity (dev default)")
)
flag.Parse()
@@ -136,9 +148,21 @@ func main() {
var internalID cs.Identity
var internalPubHex string
if needJS && enforce && *natsURL == "" {
internalID, err = cs.GenerateIdentity()
if err != nil {
log.Fatalf("generate internal identity: %v", err)
if *internalIDFile != "" {
// Persisted identity: load it, generating + writing it (0600) on first
// start. A stable internal key is what `user add --store kv` presents to
// add users to a live cluster (GAP A); rotate it by deleting the file and
// restarting.
internalID, err = client.LoadOrCreateIdentity(*internalIDFile)
if err != nil {
log.Fatalf("load internal service identity %q: %v", *internalIDFile, err)
}
log.Printf("internal service identity: persisted (%s)", *internalIDFile)
} else {
internalID, err = cs.GenerateIdentity()
if err != nil {
log.Fatalf("generate internal identity: %v", err)
}
}
internalPubHex = hex.EncodeToString(internalID.SignPub)
}
+83 -16
View File
@@ -1,7 +1,7 @@
package main
import (
"encoding/hex"
"errors"
"flag"
"fmt"
"os"
@@ -50,13 +50,26 @@ commands:
list List all registered users
revoke Revoke a user (denies access on both planes immediately)
store backends (--store):
sqlite local SQLite database (default; seeds the first admin offline)
kv the RUNNING cluster's replicated JetStream KV allowlist, via the
privileged internal connection — add users with the cluster live,
no stop-seed-restart needed (run over loopback/SSH on a node)
examples:
membershipd user add --handle alice --sign-pub <64-hex> --role admin
membershipd user list
membershipd user add --store kv --handle bob --sign-pub <64-hex> --role member
membershipd user list --store kv
membershipd user revoke <64-hex>
common flags:
--db <path> SQLite database path (default ./local_files/unibus.db)
--db <path> SQLite database path (--store sqlite; default ./local_files/unibus.db)
--store kv flags (defaults assume an on-node invocation):
--nats-url <url> cluster NATS (default nats://127.0.0.1:4250)
--internal-id-file <path> persisted internal service identity (default /opt/unibus/secrets/internal.id)
--ca <path> CA cert pinning the data-plane TLS (default /opt/unibus/tls/ca.crt)
--kv-replicas <n> KV replication factor, match the cluster (default 3)
`)
}
@@ -76,16 +89,56 @@ func openStore(path string) membership.Store {
// validateSignPubHex ensures the key is exactly a 32-byte Ed25519 public key in
// hex (64 hex chars). Catching this here turns a silent "authorized nobody" into
// an explicit error at seed time.
// an explicit error at seed time. It delegates to membership.ValidateSignPubHex
// so the CLI and the HTTP user-management handlers share one rule.
func validateSignPubHex(signPub string) error {
b, err := hex.DecodeString(signPub)
if err != nil {
return fmt.Errorf("sign-pub is not valid hex: %w", err)
return membership.ValidateSignPubHex(signPub)
}
// kvFlags holds the connection flags shared by the --store kv path of the user
// subcommands. registerKVFlags wires them onto a flag set so add and list expose
// an identical interface.
type kvFlags struct {
store *string
natsURL *string
internalID *string
ca *string
replicas *int
}
func registerKVFlags(fs *flag.FlagSet) kvFlags {
return kvFlags{
store: fs.String("store", "sqlite", "user store backend: sqlite (local DB) | kv (the live cluster's replicated allowlist)"),
natsURL: fs.String("nats-url", defaultClusterNatsURL, "cluster NATS url for --store kv"),
internalID: fs.String("internal-id-file", defaultInternalIDFile, "persisted internal service identity for --store kv"),
ca: fs.String("ca", defaultClusterCAFile, "CA cert pinning TLS on the --store kv NATS connection"),
replicas: fs.Int("kv-replicas", 3, "KV replication factor for --store kv (match the cluster)"),
}
if len(b) != 32 {
return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
}
// resolveStore returns the membership store for the chosen backend plus a cleanup
// func. For --store kv it opens the privileged connection to the live cluster; for
// sqlite it opens the local file. It exits the process with a clear message on any
// failure (a dead NATS, a missing identity file), so a broken --store kv add fails
// loudly instead of silently — Error case of the GAP A DoD. The returned *kvConn
// is non-nil only for the kv backend (so the caller can report replication).
func resolveStore(cmd string, kf kvFlags, dbPath string) (membership.Store, *kvConn, func()) {
switch *kf.store {
case "sqlite":
store := openStore(dbPath)
return store, nil, func() { store.Close() }
case "kv":
kv, err := connectKVStore(*kf.natsURL, *kf.internalID, *kf.ca, *kf.replicas)
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd %s: --store kv: %v\n", cmd, err)
os.Exit(1)
}
return kv.store, kv, kv.Close
default:
fmt.Fprintf(os.Stderr, "membershipd %s: --store must be \"sqlite\" or \"kv\", got %q\n", cmd, *kf.store)
os.Exit(2)
return nil, nil, func() {}
}
return nil
}
func userAdd(args []string) {
@@ -94,6 +147,7 @@ func userAdd(args []string) {
signPub := fs.String("sign-pub", "", "Ed25519 signing public key in hex (required)")
role := fs.String("role", membership.RoleMember, "role: admin or member")
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
_ = fs.Parse(args)
if *handle == "" || *signPub == "" {
@@ -105,23 +159,35 @@ func userAdd(args []string) {
os.Exit(2)
}
store := openStore(*dbPath)
defer store.Close()
store, kv, closeStore := resolveStore("user add", kf, *dbPath)
defer closeStore()
if err := store.AddUser(*signPub, *handle, *role); err != nil {
if errors.Is(err, membership.ErrUserExists) {
// Idempotency contract (GAP A): re-adding the same key is an EXPLICIT,
// non-destructive error — the existing row is left untouched (no silent
// upsert that could flip a role or clobber status, which would corrupt the
// allowlist). To replace a user, `user revoke <key>` then add again.
fmt.Fprintf(os.Stderr, "membershipd user add: user %s already registered (unchanged); revoke it first to replace\n", *signPub)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "membershipd user add: %v\n", err)
os.Exit(1)
}
fmt.Printf("added user %q (%s) role=%s\n", *handle, *signPub, *role)
if kv != nil {
reportKVReplication(kv.js)
}
}
func userList(args []string) {
fs := flag.NewFlagSet("user list", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
_ = fs.Parse(args)
store := openStore(*dbPath)
defer store.Close()
store, _, closeStore := resolveStore("user list", kf, *dbPath)
defer closeStore()
users, err := store.ListUsers()
if err != nil {
@@ -143,6 +209,7 @@ func userList(args []string) {
func userRevoke(args []string) {
fs := flag.NewFlagSet("user revoke", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
// Go's flag package stops at the first non-flag argument, so `revoke <key>
// --db path` would otherwise leave --db unparsed. Pull a leading positional
@@ -167,8 +234,8 @@ func userRevoke(args []string) {
os.Exit(2)
}
store := openStore(*dbPath)
defer store.Close()
store, _, closeStore := resolveStore("user revoke", kf, *dbPath)
defer closeStore()
if err := store.RevokeUser(signPub); err != nil {
fmt.Fprintf(os.Stderr, "membershipd user revoke: %v\n", err)
+151
View File
@@ -0,0 +1,151 @@
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// users_kv.go is the `--store kv` half of the user administration CLI (issue 0011
// gaps, GAP A): adding and listing bus users directly against the RUNNING
// cluster's replicated JetStream KV allowlist, with no need to stop the cluster,
// seed a standalone node, and restart (the procedure the 0011 deploy required).
//
// The mechanism is the cluster's own privileged internal connection. Under
// enforce every bus user is confined by the per-subject ACL to the JetStream API
// of its own rooms, so no ordinary identity may touch the control-plane buckets
// (KV_UNIBUS_*). The ONLY identity the authenticator grants full JetStream
// permissions is membershipd's internal service identity. By persisting that
// identity to a file (membershipd --internal-id-file) the same key becomes
// available to this CLI, which presents it as its NATS nkey and is therefore
// recognized as the privileged internal client and allowed to read/write the KV.
//
// Intended invocation is over loopback on a cluster node (SSH): the data-plane
// TLS certificate's SAN covers 127.0.0.1/localhost and the internal identity file
// lives 0600 next to the node's TLS keys. Using the file requires root on the
// node, which already implies full control of that node — so co-locating it adds
// no practical exposure beyond what the TLS server key and cluster password
// already represent.
// defaultClusterNatsURL is the node-local NATS listener. The CLI is meant to run
// on a cluster node over SSH, talking to that node's own embedded server.
const defaultClusterNatsURL = "nats://127.0.0.1:4250"
// Deploy-default paths for the privileged identity and the data-plane CA, so an
// on-node invocation needs only --handle/--sign-pub/--role. Override for other
// layouts.
const (
defaultInternalIDFile = "/opt/unibus/secrets/internal.id"
defaultClusterCAFile = "/opt/unibus/tls/ca.crt"
)
// kvConn bundles the privileged NATS connection to a live cluster and the
// KV-backed control-plane store opened over it. Close releases both.
type kvConn struct {
nc *nats.Conn
js jetstream.JetStream
store membership.Store
}
func (k *kvConn) Close() {
if k == nil {
return
}
if k.store != nil {
_ = k.store.Close()
}
if k.nc != nil {
k.nc.Close()
}
}
// connectKVStore opens the privileged internal connection to the cluster's NATS
// and the JetStream KV control-plane store on top of it. internalIDFile is the
// membershipd-persisted internal service identity whose nkey the authenticator
// grants full permissions; caPath pins the data-plane TLS (empty only for a
// non-TLS dev cluster). A non-loopback target without --ca is refused, mirroring
// migrate-to-kv (audit 0008 N6): the allowlist write must not travel in cleartext.
func connectKVStore(natsURL, internalIDFile, caPath string, replicas int) (*kvConn, error) {
if internalIDFile == "" {
return nil, fmt.Errorf("--internal-id-file is required for --store kv (the privileged identity membershipd persists with --internal-id-file)")
}
// Confidentiality guard: a remote NATS without TLS would expose the allowlist
// (handles/roles/sign-pubs) and the privileged nkey handshake in cleartext.
if !isLoopbackURL(natsURL) && caPath == "" {
return nil, fmt.Errorf("refusing to connect to remote %q without --ca: the allowlist write would travel in cleartext — pin TLS with --ca, or run over a loopback --nats-url on a node", natsURL)
}
id, err := client.LoadIdentity(internalIDFile)
if err != nil {
return nil, fmt.Errorf("load internal identity: %w", err)
}
nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv)
if err != nil {
return nil, fmt.Errorf("derive nkey from internal identity: %w", err)
}
opts := []nats.Option{
nats.Name("membershipd-user-cli"),
nats.Nkey(nkeyPub, nkeySign),
}
if caPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
if err != nil {
return nil, fmt.Errorf("load CA %q: %w", caPath, err)
}
opts = append(opts, nats.Secure(tlsCfg))
}
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
return nil, fmt.Errorf("connect cluster NATS %q: %w", natsURL, err)
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return nil, fmt.Errorf("jetstream: %w", err)
}
store, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: replicas})
if err != nil {
nc.Close()
return nil, fmt.Errorf("open KV control-plane store: %w", err)
}
return &kvConn{nc: nc, js: js, store: store}, nil
}
// reportKVReplication prints the replication status of the allowlist bucket
// stream (KV_UNIBUS_users) right after a write, so the operator sees the add
// landed on a quorum and replicated to the followers — executable evidence that
// the live-cluster add is HA, not single-node. Best-effort: a read failure is a
// note, not an error (the write itself already succeeded).
func reportKVReplication(js jetstream.JetStream) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
st, err := js.Stream(ctx, "KV_UNIBUS_users")
if err != nil {
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
return
}
info, err := st.Info(ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
return
}
if info.Cluster == nil {
fmt.Printf("KV_UNIBUS_users: standalone (R1, no cluster replication); msgs=%d\n", info.State.Msgs)
return
}
current := 0
for _, r := range info.Cluster.Replicas {
if r.Current {
current++
}
}
fmt.Printf("KV_UNIBUS_users: leader=%s followers_current=%d/%d msgs=%d\n",
info.Cluster.Leader, current, len(info.Cluster.Replicas), info.State.Msgs)
}
+201 -39
View File
@@ -5,9 +5,12 @@ This directory holds the material to bring up unibus as a **3-node cluster**
plane (rooms/members/keys/users on JetStream KV + the anti-replay nonce bucket)
survives the loss of any one node (quorum 2/3).
> **The agent that authored this never touched a VPS.** Every step that changes a
> remote host is marked **HUMAN** and is executed by the operator. `deploy-cluster.sh`
> defaults to a dry run.
> **Status: this cluster is DEPLOYED in production** (magnus + homer + datardos,
> R3, enforce+ACL+TLS) — see report 0011. The runbook below was authored before any
> VPS existed and has since been **corrected against the real deploy** (report 0012):
> the start ordering, the R1→R3 reality, and the live user-add path were all wrong
> or missing. Steps that change a remote host are marked **HUMAN**; `deploy-cluster.sh`
> still defaults to a dry run.
## Files
@@ -22,18 +25,22 @@ Generated keys/secrets (`out/`, `build/`, `secrets/`) are **gitignored** — the
secret and never leave the operator's trusted machine except over the secure
rsync channel.
## Topology
## Topology (as deployed, report 0011)
| Node | SSH | Public IP | WireGuard IP | Role |
|---|---|---|---|---|
| magnus | `magnus` | `<MAGNUS_PUBLIC_IP>` | `<MAGNUS_WG_IP>` | seed (first up) |
| homer | `homer` | `141.94.69.66` | `<HOMER_WG_IP>` | replica |
| datardos | `dd` | `51.91.100.142` | `<DATARDOS_WG_IP>` (10.21.0.x) | replica |
| Node | SSH | Public IP | Role |
|---|---|---|---|
| magnus | `magnus` (root) | `135.125.201.30` | node — **= organic-machine.com = `om`**, the critical host (caddy + gitea + registry-api + monitoring); the bus runs alongside, untouched |
| homer | `homer` (ubuntu+sudo) | `141.94.69.66` | node |
| datardos | `dd` (ubuntu+sudo) | `51.91.100.142` | node |
The route layer (server-to-server) prefers the **WireGuard mesh**
(`ROUTE_NETWORK=wg`); the client data plane and the HTTP control plane are reached
over the public IPs. The route CA is **separate** from the client CA, so a client
cert can never be presented to the route port.
`ROUTE_NETWORK=public`, **not `wg`**: there is no WireGuard mesh between the three
nodes (homer and datardos do not even have the `wg` binary; om's only WG peers are
the operator's PCs). The server-to-server routes therefore travel over the public
IPs, protected by the **separate cluster route CA** (mutual route TLS) — a client
data-plane cert can never be presented to the route port. The client data plane and
the HTTP control plane are also reached over the public IPs. There is no fixed
"seed" node: with R3 the three are peers (see "Bring up" for why a lone node cannot
self-serve).
## Prerequisites (HUMAN, once)
@@ -93,25 +100,48 @@ SEED
> The KV written here lives in `./local_files/jetstream`, which the cluster unit
> reuses (`--nats-store` default), so the admin is present when the enforce cluster
> starts. Additional users are added the same loopback way until a
> `user add --store kv` exists (see GAP in report 0009).
> starts. This loopback bootstrap is needed ONLY for the very first admin (the
> chicken-and-egg). **Every user after that is added with the cluster live** — no
> stop-seed-restart — via `user add --store kv` (see "Add users to the live
> cluster" below, report 0012).
## Bring up (HUMAN — staggered)
## Bring up (HUMAN)
Bring up the seed first, then the replicas one at a time, checking each joins.
> **CORRECTION (report 0012).** The original instruction — "start magnus alone and
> verify healthz, then add the others" — is **WRONG and will look like a hung
> deploy.** A 3-node JetStream cluster forms a RAFT meta-group that needs a quorum
> (2 of 3) to elect a leader. A single started node has no quorum, so its JetStream
> meta never becomes current: `--store kv` blocks creating the KV buckets and
> **`/healthz` never returns ok** until a second node joins. Waiting for magnus to
> "go green" before starting the others therefore deadlocks the rollout.
Start the nodes so a quorum forms. On a **clean cluster** the simplest correct
procedure is to start all three close together and let the meta-group converge:
```bash
# 1. Seed node (after the seed step above).
ssh root@magnus 'systemctl enable --now membershipd-cluster'
ssh root@magnus 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
# Start all three (order does not matter); each blocks on the others until a
# 2/3 quorum elects a JetStream meta leader, then the KV buckets are created.
for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
# 2. Replicas, one at a time.
ssh root@homer 'systemctl enable --now membershipd-cluster'
ssh root@datardos 'systemctl enable --now membershipd-cluster'
# Only NOW does healthz return ok — once the meta-group has a leader (give it
# ~10-30s on a cold start). Poll, do not assume the first node is broken.
for h in magnus homer datardos; do
echo "== $h =="; ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt || echo "(not ready yet — needs quorum)"'
done
```
> Initial rollout runs at **R1** (`KV_REPLICAS=1` in `nodes.env`): the buckets live
> on the seed only. This is NOT HA yet — see "Scale to R3".
A **staggered** start also works, but only because `membershipd`'s KV open RETRIES
the bucket creation for a 120s bootstrap budget (issue 0006g, fix #3): the first
node sits in that retry loop — NOT serving healthz — until the second node makes a
quorum, then both converge and the third catches up. Either way, a lone node never
self-serves; do not gate the next node's start on the previous one's healthz.
> A cold multi-node start only converges because of **three cold-start fixes**
> (report 0011): route pooling off (`PoolSize=-1`), `NoAdvertise=true` (Docker
> bridge IPs not gossiped), and the KV-open retry loop above. Without them the
> meta-group re-elects leaders forever and bucket creation hangs. If a fresh
> cluster will not form, confirm the running binary contains these fixes before
> touching config.
## Promote an existing single-node (SQLite) deployment (HUMAN, optional)
@@ -137,11 +167,80 @@ ssh root@magnus 'nats --server nats://127.0.0.1:4250 server list' # 3 servers,
A healthy cluster shows 3 routed servers and a JetStream meta-group with a leader.
## Scale to R3 (HUMAN — real HA)
## Add users to the live cluster (HUMAN — `user add --store kv`)
Once all three nodes are up and routed, raise the replication factor of every
control-plane stream from 1 to 3 IN PLACE (no data loss), then flip `KV_REPLICAS=3`
in `nodes.env` so future (re)deploys keep it:
With the cluster up, add (and revoke) bus users **without stopping anything**,
directly against the replicated KV allowlist. This replaces the stop-seed-restart
procedure the original runbook implied for every user beyond the first admin.
The mechanism is the cluster's own **privileged internal connection**: under
`enforce` every bus user is confined by the per-subject ACL to its own rooms, so no
ordinary identity may write the control-plane buckets. The only identity the
authenticator grants full JetStream permissions is `membershipd`'s internal service
identity. The unit persists that identity to `${INTERNAL_ID_FILE}`
(`/opt/unibus/secrets/internal.id`, 0600) via `--internal-id-file`, so the same key
is available to the CLI. Run the CLI **on a node, over loopback** (the data-plane
TLS cert SAN covers `127.0.0.1`); reading the identity file requires root on that
node, which already implies full control of it, so this adds no practical exposure.
```bash
# Add a member to the live cluster's replicated allowlist (run on any node).
ssh root@magnus 'sudo /opt/unibus/membershipd user add --store kv \
--handle alice --role member --sign-pub <64-hex-ed25519-pub>'
# -> added user "alice" (...) role=member
# -> KV_UNIBUS_users: leader=<node> followers_current=2/2 msgs=N (replicated, HA)
# List / revoke against the same live KV:
ssh root@magnus 'sudo /opt/unibus/membershipd user list --store kv'
ssh root@magnus 'sudo /opt/unibus/membershipd user revoke --store kv <64-hex-ed25519-pub>'
```
Defaults assume an on-node invocation (`--nats-url nats://127.0.0.1:4250`,
`--internal-id-file /opt/unibus/secrets/internal.id`, `--ca /opt/unibus/tls/ca.crt`,
`--kv-replicas 3`). Semantics:
- **Idempotent / non-destructive**: re-adding the same key is an explicit
`already registered` error (exit 1), never a silent overwrite — a re-add cannot
flip a member to admin. To replace a user, `revoke` then add.
- **HA**: the write commits through the JetStream quorum, so it succeeds even with
one node down (2/3); the printed `followers_current` shows replication.
- **No hard delete**: `revoke` flips status to `revoked` (denied on both planes,
auditable); the KV has no row deletion, matching the SQLite store.
> **Rollout note (report 0012):** the live verification deployed this binary +
> `--internal-id-file` to **datardos only** (the non-critical node). magnus and
> homer still run the 0011 binary. To make the capability available (and the unit)
> on all three — recommended, the posture is identical so there is no urgency — roll
> the new binary with backups, one node at a time, verifying healthz between each:
> ```bash
> for h in homer magnus; do
> ssh "$h" 'sudo cp -a /opt/unibus/membershipd /opt/unibus/membershipd.bak' # backup
> scp build/membershipd "$h:/tmp/m" && ssh "$h" 'sudo install -o ubuntu -g ubuntu -m0775 /tmp/m /opt/unibus/membershipd'
> # add INTERNAL_ID_FILE=/opt/unibus/secrets/internal.id to /opt/unibus/cluster.env
> # add `--internal-id-file ${INTERNAL_ID_FILE} \` to the unit before `--store kv`
> ssh "$h" 'sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster'
> ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt' # green before next
> done
> ```
> (`deploy-cluster.sh` + the unit template already emit `INTERNAL_ID_FILE` and the
> flag, so a fresh `./deploy-cluster.sh --yes` is correct for all three.)
## Replication: go straight to R3 (HUMAN — real HA)
> **CORRECTION (report 0012).** The original "start at R1, then scale to R3" plan
> assumed R1 is a usable interim state. **It is not, in this cluster.** At R1 all six
> control-plane buckets (`KV_UNIBUS_users/rooms/members/room_keys/rooms_by_member`
> + `KV_UNIBUS_nonces`) live on a SINGLE node — a hard **SPOF for authentication**:
> if that node dies, the nonce/KV control plane is unreachable and EVERY
> authenticated request fails closed (auth DoS). Worse, the cold multi-node start
> only converges at all because of the three cold-start fixes (see "Bring up"); the
> real deploy never ran a healthy R1 and **jumped straight to R3 once the cluster
> formed.** Treat R1 as a transient artifact of bucket creation, not a milestone.
The deployed config already sets `KV_REPLICAS=3` in `nodes.env`. If buckets were
created at R1 (e.g. only one node was up when `--store kv` first opened them), raise
every control-plane stream to R3 IN PLACE (no data loss) once all three nodes are
routed:
```bash
for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members KV_UNIBUS_room_keys \
@@ -151,27 +250,32 @@ done
# (also OBJ_UNIBUS_blobs if the object store is in use)
```
Until this is done, R1 means the seed node is a **single point of failure for
authentication**: if it dies, the nonce/KV control plane is unreachable and every
authenticated request fails closed (auth DoS). R1 is a rollout step, not HA.
After this each bucket shows `followers_current=2/2` (quorum 2/3). The
`user add --store kv` command prints that figure for `KV_UNIBUS_users` on every add,
which is a cheap live HA check.
## Chaos test (HUMAN — requires the 3 live VPS; NOT run here)
## Chaos test (HUMAN — requires the 3 live VPS)
Validate quorum tolerance after R3:
```bash
# Kill one node; the cluster keeps serving (quorum 2/3).
ssh root@datardos 'systemctl stop membershipd-cluster'
# Kill one node; the cluster keeps serving (quorum 2/3). On ubuntu nodes use sudo.
ssh dd 'sudo systemctl stop membershipd-cluster'
# -> clients fail over (multiple seed URLs); reads/writes still succeed.
ssh root@datardos 'systemctl start membershipd-cluster' # rejoins, catches up
ssh dd 'sudo systemctl start membershipd-cluster' # rejoins, catches up
# Kill two nodes; quorum is LOST — the control plane should fail CLOSED (deny),
# never fail open. Verify a request is rejected, not silently served.
```
This network-level chaos test (kill 1/3, kill 2/3, partition/split-brain) is part
of the deploy validation (issue 0003f) and runs against the real VPS — it is
deliberately out of scope for the authoring agent.
> **Validated (report 0012).** The 0011 chaos run checked only the control plane
> (healthz + meta/stream-leader failover + KV readable with 2/3). Report 0012 added
> the missing data-plane proofs against the live cluster: a real authenticated
> client (`cmd/clientcheck`, operator identity, nkey+TLS) creating an E2E room and
> publishing/subscribing — including a node stopped mid-stream, where the client
> failed over to a survivor and kept receiving with zero loss (quorum 2/3) — and
> `user add --store kv` committing with one node (the KV leader) down. The kill-2/3
> fail-closed case remains a documented manual step.
## Rollback
@@ -179,3 +283,61 @@ deliberately out of scope for the authoring agent.
the unit and start it without `--store kv`/`--cluster-name`; the KV buckets remain
for a later retry. To rotate the cluster CA, re-run `generate-cluster-certs.sh
--force` and re-stage (every node must get the new `cluster-ca.crt` together).
## NATS server metrics (loopback monitoring — optional)
The embedded NATS server can expose its own monitoring HTTP endpoint so a local
scraper reads server-level metrics that `/healthz` does not surface: msgs/s,
connections, slow consumers, memory, KV bucket message counts, the RAFT leader per
stream and per-stream restarts. This feeds the `unibus-nats` dashboard in
`fleet_monitoring` (the scraper hits `127.0.0.1:8222/varz|/connz|/jsz` over
loopback and pushes to VictoriaMetrics).
The endpoint is opened by the **dedicated** environment toggle `UNIBUS_NATS_MONITOR=1`
(0.11.0+ binary). It is **decoupled** from `UNIBUS_NATS_DEBUG`: it opens the
monitoring endpoint WITHOUT enabling the verbose nats-server debug log, so no room
subjects or routing metadata leak to journald (keeps the hardened posture, issue
0007). The endpoint binds `127.0.0.1:8222` **only** — the binary hardcodes the
loopback bind, so it is never reachable from the network and needs no auth. Never
use `UNIBUS_NATS_DEBUG` in production just to get the endpoint.
### Enable it (HUMAN — requires the 0.11.0+ binary on the node)
The clean way is the additive systemd drop-in in this directory:
```bash
# On each node, AFTER the 0.11.0+ binary is in /opt/unibus/membershipd:
ssh <node> 'sudo mkdir -p /etc/systemd/system/membershipd-cluster.service.d'
scp membershipd-cluster.service.d/nats-monitor.conf <node>:/tmp/nats-monitor.conf
ssh <node> 'sudo cp /tmp/nats-monitor.conf /etc/systemd/system/membershipd-cluster.service.d/ \
&& sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster'
```
(Equivalently, add `UNIBUS_NATS_MONITOR=1` to `/opt/unibus/cluster.env`, which the
unit already sources via `EnvironmentFile`; the drop-in is preferred because it is
self-documenting and does not edit the generated env file.)
### Rolling restart with the R3 reconvergence gate (CRITICAL)
`systemctl restart membershipd-cluster` restarts that node's JetStream RAFT member.
**Never restart two nodes at once** — that would drop the cluster below quorum
(2/3) and fail the control plane closed. Roll **one node at a time**, in the order
`magnus → homer → datardos`, and between each node wait until the cluster has
reconverged to R3 (every control-plane bucket back to `followers_current=2/2`):
```bash
# After restarting ONE node, gate on R3 reconvergence before touching the next:
ssh root@magnus 'for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members \
KV_UNIBUS_room_keys KV_UNIBUS_rooms_by_member KV_UNIBUS_nonces; do
nats --server nats://127.0.0.1:4250 stream info "$s" -j \
| jq -r --arg s "$s" \"\\($s): replicas=\\(.cluster.replicas|length) leader=\\(.cluster.leader)\"
done'
# Proceed to the next node ONLY when all six show 3 replicas with a leader
# (i.e. 2/2 followers current). Also confirm healthz is green on the just-restarted
# node first:
ssh <node> 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
```
This restart is normally **not** done as a standalone step: the 0.11.0 binary that
carries the flag is rolled to the three nodes in the consolidated rollout, and the
drop-in is installed during that same rolling restart.
+12 -8
View File
@@ -97,6 +97,7 @@ TLS_KEY=${REMOTE_DIR}/tls/server-${name}.key
ROUTE_TLS_CERT=${REMOTE_DIR}/tls/route-${name}.crt
ROUTE_TLS_KEY=${REMOTE_DIR}/tls/route-${name}.key
ROUTE_TLS_CA=${REMOTE_DIR}/tls/cluster-ca.crt
INTERNAL_ID_FILE=${REMOTE_DIR}/secrets/internal.id
EOF
run ssh "$target" "mkdir -p ${REMOTE_DIR}/tls ${REMOTE_DIR}/secrets"
@@ -114,13 +115,16 @@ if [[ $APPLY -eq 0 ]]; then
fi
cat <<'NEXT'
HUMAN — staggered start (do NOT enable all at once; see README "Bring up"):
1. Seed node first (e.g. magnus):
ssh root@magnus 'systemctl enable --now membershipd-cluster'
ssh root@magnus '/opt/unibus/membershipd user add --admin ...' # seed admin
2. Then the other two, one at a time, checking quorum after each:
ssh root@homer 'systemctl enable --now membershipd-cluster'
ssh root@datardos 'systemctl enable --now membershipd-cluster'
HUMAN — bring up (see README "Bring up" — a LONE node has no quorum and never
serves healthz, so do NOT gate the next node on the previous one going green):
1. Seed the FIRST admin into the KV via the loopback bootstrap (README
"Seed the first admin"); this is needed only for the chicken-and-egg admin.
2. Start all three so a 2/3 quorum forms (order does not matter); healthz
turns ok only once the meta-group elects a leader (~10-30s cold):
for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
3. Verify posture + quorum (README "Verify").
4. Scale replicas 1 -> 3 once all three are up (README "Scale to R3").
4. Ensure R3 on every control-plane stream (README "Replication: go straight to
R3"); R1 is a SPOF, not a milestone.
5. Add further users with the cluster LIVE — no restart — via
`membershipd user add --store kv` (README "Add users to the live cluster").
NEXT
@@ -33,6 +33,7 @@ ExecStart=/opt/unibus/membershipd \
--route-tls-cert ${ROUTE_TLS_CERT} \
--route-tls-key ${ROUTE_TLS_KEY} \
--route-tls-ca ${ROUTE_TLS_CA} \
--internal-id-file ${INTERNAL_ID_FILE} \
--store kv \
--kv-replicas ${KV_REPLICAS}
# Restart=always (NOT on-failure): a clean SIGTERM exits success, and on-failure
@@ -0,0 +1,27 @@
# Drop-in: enable the embedded NATS server monitoring HTTP endpoint so a local
# metrics scraper can read /varz, /connz and /jsz for server-level metrics
# (msgs/s, connections, KV bucket msgs, RAFT leader per stream, restarts).
#
# ADDITIVE and minimal: it only sets one environment variable; the base unit
# (membershipd-cluster.service) is otherwise unchanged.
#
# UNIBUS_NATS_MONITOR is DECOUPLED from UNIBUS_NATS_DEBUG: it opens the monitoring
# endpoint WITHOUT enabling the verbose nats-server debug log, so no room subjects
# or routing metadata are written to journald (keeps the hardened posture, issue
# 0007). Do NOT use UNIBUS_NATS_DEBUG in production just to get the endpoint.
#
# The endpoint binds 127.0.0.1:8222 ONLY — the binary hardcodes the loopback bind,
# so it is never reachable from the network and needs no auth. The scraper runs on
# the same host and reads it over loopback.
#
# Requires the 0.11.0+ membershipd binary (the one that honors UNIBUS_NATS_MONITOR).
# Install on a node:
# sudo mkdir -p /etc/systemd/system/membershipd-cluster.service.d
# sudo cp nats-monitor.conf /etc/systemd/system/membershipd-cluster.service.d/
# sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster
#
# Restarting a node restarts its JetStream RAFT member, so roll ONE node at a time
# and wait for R3 reconvergence (followers 2/2) before touching the next. See the
# "NATS server metrics" section of this directory's README for the full runbook.
[Service]
Environment=UNIBUS_NATS_MONITOR=1
+21 -8
View File
@@ -2,10 +2,10 @@
#
# This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh.
#
# HUMAN: fill in every <PLACEHOLDER> with the real value before running the
# HUMAN: fill in every placeholder with the real value before running the
# scripts. The public IPs known at authoring time are pre-filled; the WireGuard
# mesh IPs and magnus's public IP must be supplied. The scripts refuse to run
# while any <PLACEHOLDER> remains.
# while any unfilled placeholder remains.
# Cluster identity (must be identical on every node).
CLUSTER_NAME="unibus"
@@ -16,7 +16,7 @@ CLUSTER_USER="unibus-cluster"
# KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise
# to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only
# set this to 3 here after the third node is up and you re-run the KV update.
KV_REPLICAS=1
KV_REPLICAS=3
# Ports (same on every node; the route port is server-to-server only).
NATS_CLIENT_PORT=4250
@@ -30,15 +30,28 @@ SSH_USER="root"
# Which address family the inter-node routes use. "wg" builds --routes from the
# WireGuard mesh IPs (private server-to-server links, preferred); "public" uses
# the public IPs. The route layer is always mutual-TLS regardless.
ROUTE_NETWORK="wg"
#
# DEPLOY DECISION (2026-06-07): set to "public". No WireGuard mesh exists between
# the three cluster nodes — homer and datardos do not even have the `wg` binary
# installed, and om's only WG peers are the operator's personal PCs, not the VPS.
# Rather than stand up a fresh mesh blindly, the routes go over the public IPs,
# still protected by the separate cluster route CA (mutual-TLS). On magnus (the
# only node with ufw active) the route port 6250 is restricted to the homer and
# datardos public IPs; homer/datardos run ufw inactive (Docker hosts) and rely on
# the route mutual-TLS for 6250.
ROUTE_NETWORK="public"
# One row per node: NAME SSH_HOST PUBLIC_IP WG_IP
# NAME -> --server-name and the per-node cert filenames (unique).
# SSH_HOST -> the `ssh <SSH_HOST>` alias (see ~/.ssh/config).
# SSH_HOST -> the `ssh ALIAS` alias (see ~/.ssh/config).
# PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane).
# WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg.
# NOTE: with ROUTE_NETWORK=public and no WireGuard mesh, the WG_IP column is set to
# each node's public IP so the cert SAN covers the address actually used by the
# public routes and no unfilled placeholder remains (scripts refuse to run otherwise).
# magnus == organic-machine.com == om (135.125.201.30); SSH alias `magnus` enters as root.
CLUSTER_NODES=(
"magnus magnus <MAGNUS_PUBLIC_IP> <MAGNUS_WG_IP>"
"homer homer 141.94.69.66 <HOMER_WG_IP>"
"datardos dd 51.91.100.142 <DATARDOS_WG_IP>"
"magnus magnus 135.125.201.30 135.125.201.30"
"homer homer 141.94.69.66 141.94.69.66"
"datardos dd 51.91.100.142 51.91.100.142"
)
@@ -0,0 +1,78 @@
---
issue: 0007
title: Cifrado at-rest del control plane (JetStream KV / SQLite en disco)
status: spec
created: 2026-06-07
domain: security
scope: unibus (pkg/embeddednats, cmd/membershipd, deploy/cluster) + procedimiento de migración del store existente
---
# Objetivo
Cifrar en reposo el almacenamiento del plano de control para que un nodo comprometido
(root en el VPS) o un disco robado no exponga los metadatos de control en claro.
Estado actual (auditado el 07/06/2026, report 0012 y siguientes):
- **Contenido de los mensajes**: cifrado E2E por room (megolm/olm). El servidor nunca ve el
plaintext; no vive en el plano de control. **No es el objeto de este issue.**
- **Claves de room** (`UNIBUS_room_keys`): guardadas **selladas** (sealed box X25519, cifradas
para cada miembro). El servidor las almacena y reparte pero no puede abrirlas. **Ya protegidas.**
- **Metadatos de control** (`UNIBUS_rooms`, `UNIBUS_members`, `UNIBUS_rooms_by_member`,
`UNIBUS_users`): se serializan con `json.Marshal` y se escriben **en claro** en el store. En
cluster ese store es el directorio `local_files/jetstream/` de cada nodo; en single-node es el
archivo SQLite `local_files/unibus.db`. Hoy **no hay cifrado at-rest**: con root en un nodo se
pueden leer subjects de salas, la pertenencia (quién está en qué sala con qué rol), los handles
y roles de los usuarios, y las claves públicas (signPub/kexPub). No se exponen mensajes (E2E) ni
se pueden descifrar salas (claves selladas), pero sí toda la topología.
Tras este issue, los buckets/archivos del control plane quedan cifrados en disco con una clave por
nodo gestionada fuera de git. El modelo de amenaza pasa de "root del nodo ve la topología" a "root
del nodo necesita además la clave at-rest (que puede vivir en un secreto separado / TPM / variable
de entorno inyectada) para leer cualquier cosa".
# Contexto técnico
- NATS Server / JetStream soporta **encryption at-rest** nativo: se configura una cifra
(`aes` o `chacha20`) y una clave; JetStream cifra los ficheros de los streams/KV en disco. El
bus usa un NATS **embebido** (`pkg/embeddednats`), así que la activación es por opciones del
servidor embebido, no por un `nats-server.conf` externo.
- Para el backend SQLite (single-node) el equivalente sería SQLCipher o cifrado a nivel de
archivo/FS; queda como sub-tarea de menor prioridad porque el despliegue real es cluster (KV).
# Tareas
1. Confirmar la API de encryption-at-rest del NATS embebido en la versión usada (opción de
servidor para cipher + clave; cómo se pasa la clave de forma que no quede en argv ni en git).
2. Activar el cifrado en `pkg/embeddednats` detrás de una opción de configuración. La clave se
inyecta por archivo (`--jetstream-encryption-key-file`, 0600, junto a las claves TLS del nodo)
o variable de entorno desde el unit systemd; nunca en argv ni commiteada.
3. `cmd/membershipd`: flag/env para la clave + reflejar el estado en la posture publicada en
`/healthz` (p.ej. `"at_rest":true`) para que el monitor lo verifique.
4. `deploy/cluster`: provisionar la clave at-rest por nodo (generación + `pass`/secrets gitignored)
y cablearla en `cluster.env` + el unit. Documentar en el runbook.
5. **Migración del store existente** (gotcha crítico): JetStream no re-cifra retroactivamente los
datos ya escritos en claro. Diseñar y documentar el procedimiento seguro para el cluster en
producción (probable: backup → exportar snapshot del control plane → parar nodo → recrear el
store con la clave activa → re-importar; o rotación nodo a nodo aprovechando la replicación R3).
Respetar la regla de migraciones (aditivo, sin pérdida de datos).
6. Tests: arrancar un nodo con clave at-rest, escribir un user/room, y verificar que el fichero en
disco **no** contiene en claro un subject/handle conocido (grep negativo), y que el nodo sigue
leyéndolos con la clave. Verificar que sin la clave el store no se abre.
# Definition of Done
- Cifrado at-rest activo en los 3 nodos del cluster; `/healthz` lo refleja en la posture.
- Evidencia ejecutable: un valor conocido (subject de sala / handle de usuario) **no** aparece en
claro al hacer `grep` sobre `local_files/jetstream/`; el nodo lo sigue sirviendo con la clave.
- Procedimiento de migración probado sobre datos reales sin pérdida (snapshot/restore verificado).
- La clave at-rest nunca está en git ni en argv; vive en archivo 0600 / secreto inyectado.
- No baja ninguna otra capa de seguridad (enforce + ACL + TLS + E2E + sealed keys intactas).
# Notas
Aditivo y ortogonal al resto de la seguridad: TLS protege en tránsito, E2E el contenido, las claves
de room van selladas; este issue cierra el último hueco (metadatos de control en claro en disco)
para el modelo de amenaza "VPS comprometido / disco robado". Prioridad media: el despliegue ya es
seguro frente a ataques de red (enforce+TLS+ACL); esto endurece frente a compromiso físico/root del
host. Relacionado con el endurecimiento de los issues 0004/0005/0006.
+28
View File
@@ -0,0 +1,28 @@
-- 003_invites.sql — single-use registration invites (issue: user accounts / wallet model).
--
-- An admin mints an invite so a brand-new identity can join the bus allowlist
-- WITHOUT the admin ever handling its private key. The token is the bearer
-- secret that authorizes POST /register: the registering client generates its
-- keypair locally and publishes only its public keys, fixing the link between an
-- invite and the identity it creates via the audit columns below. The handle and
-- role are fixed by the admin at mint time and cannot be changed by the client
-- (no privilege escalation).
--
-- Additive and idempotent: safe to apply repeatedly. Never modify this file;
-- further schema changes go in new numbered migrations (see
-- .claude/rules/db_migrations.md). The embedded copy under
-- pkg/membership/migrations/003_invites.sql mirrors this file byte-for-byte.
CREATE TABLE IF NOT EXISTS invites (
token TEXT PRIMARY KEY, -- 32 random bytes in lowercase hex (the bearer secret)
handle TEXT NOT NULL, -- handle the new user will get (fixed by admin)
role TEXT NOT NULL DEFAULT 'member', -- 'admin' | 'member' (fixed by admin)
expires_at TEXT NOT NULL, -- RFC3339; past this the invite is dead
used INTEGER NOT NULL DEFAULT 0, -- 0 pending, 1 consumed (single-use)
created_at TEXT NOT NULL,
used_at TEXT, -- RFC3339 when consumed (NULL until used)
used_sign_pub TEXT, -- Ed25519 key that consumed it (audit; NULL until used)
used_kex_pub TEXT -- X25519 key presented at registration (audit; NULL until used)
);
CREATE INDEX IF NOT EXISTS idx_invites_used ON invites(used);
+229
View File
@@ -331,6 +331,60 @@ func (c *Client) doJSON(method, path string, body, out any) error {
return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr)
}
// doUnsigned performs a control-plane request WITHOUT the transport signature
// headers, for the one endpoint a not-yet-registered identity must reach: POST
// /register. The registering peer is not in the allowlist, so it cannot produce
// an accepted signature; authorization is the single-use invite token inside the
// body. Like doJSON it fails over across the control-plane endpoints (any node
// serves the same state) and surfaces the server's structured error message.
func (c *Client) doUnsigned(method, path string, body, out any) error {
var bodyBytes []byte
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("client: marshal request: %w", err)
}
bodyBytes = b
}
var lastErr error
for _, base := range c.ctrlURLs {
var rdr io.Reader
if bodyBytes != nil {
rdr = bytes.NewReader(bodyBytes)
}
req, err := http.NewRequest(method, base+path, rdr)
if err != nil {
return fmt.Errorf("client: new request: %w", err)
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
resp, err := c.http.Do(req)
if err != nil {
lastErr = err
continue // dead node: try the next control plane
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 300 {
var er struct {
Error string `json:"error"`
}
if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
}
return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
}
if out != nil {
if err := json.Unmarshal(respBody, out); err != nil {
return fmt.Errorf("client: decode response: %w", err)
}
}
return nil
}
return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr)
}
// signRequest signs the canonical bytes of req (req must already have its Sig
// field cleared) with the client's Ed25519 key. It is symmetric with the
// server's verifyOwnerSig. This is the PAYLOAD-level owner signature that
@@ -456,6 +510,52 @@ type memberRoomJSON struct {
Role string `json:"role"`
}
// userJSON mirrors the server's wire type on the admin user-management endpoints.
type userJSON struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
Status string `json:"status"`
CreatedAt string `json:"created_at"`
RevokedAt string `json:"revoked_at,omitempty"`
}
// addUserReq is the POST /users body (mirror of the server type).
type addUserReq struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
}
// createInviteReq / createInviteResp mirror the server's POST /invites types.
type createInviteReq struct {
Handle string `json:"handle"`
Role string `json:"role"`
TTLSecs int `json:"ttl_secs"`
}
type createInviteResp struct {
Token string `json:"token"`
ExpiresAt string `json:"expires_at"`
}
// inviteJSON mirrors the server's GET /invites row.
type inviteJSON struct {
Token string `json:"token"`
Handle string `json:"handle"`
Role string `json:"role"`
ExpiresAt string `json:"expires_at"`
Used bool `json:"used"`
CreatedAt string `json:"created_at"`
}
// registerReq mirrors the server's POST /register body.
type registerReq struct {
Token string `json:"token"`
SignPub string `json:"sign_pub"`
KexPub string `json:"kex_pub"`
}
// ---- room operations ------------------------------------------------------
// RoomRef is a room this peer belongs to, returned by ListMyRooms. It is the
@@ -490,6 +590,135 @@ func (c *Client) ListMyRooms() ([]RoomRef, error) {
return out, nil
}
// ---- user administration (admin-only) ------------------------------------
// UserInfo is a bus user as returned by the admin user-management endpoints. It
// is a flat view (no nested types) for the admin panel: the signing key
// (lowercase hex), handle, role ("admin"|"member"), status ("active"|"revoked"),
// and timestamps. RevokedAt is empty for an active user.
type UserInfo struct {
SignPub string
Handle string
Role string
Status string
CreatedAt string
RevokedAt string
}
// ListUsers returns the full bus allowlist, including revoked users. The caller
// must be signing as an admin: a non-admin signer is rejected by the server with
// 403, surfaced here as an error.
func (c *Client) ListUsers() ([]UserInfo, error) {
var resp []userJSON
if err := c.doJSON("GET", "/users", nil, &resp); err != nil {
return nil, err
}
out := make([]UserInfo, 0, len(resp))
for _, u := range resp {
out = append(out, UserInfo{
SignPub: u.SignPub,
Handle: u.Handle,
Role: u.Role,
Status: u.Status,
CreatedAt: u.CreatedAt,
RevokedAt: u.RevokedAt,
})
}
return out, nil
}
// AddUser registers a bus user from their Ed25519 signing public key (64-hex).
// role is "admin" or "member" (empty defaults to member, matching the server).
// The caller must be signing as an admin. Re-adding an already-registered key
// returns an error (the server replies 409 and leaves the existing row
// untouched — no silent role/status change).
func (c *Client) AddUser(signPub, handle, role string) error {
return c.doJSON("POST", "/users", addUserReq{SignPub: signPub, Handle: handle, Role: role}, nil)
}
// RevokeUser revokes a bus user by their signing public key (64-hex). Revocation
// is a status flip (no hard delete): the identity stays auditable and is denied
// on both planes immediately. The caller must be signing as an admin.
func (c *Client) RevokeUser(signPub string) error {
return c.doJSON("POST", "/users/"+signPub+"/revoke", nil, nil)
}
// DeleteUser hard-deletes a bus user by their signing public key (64-hex) — the
// purge counterpart of RevokeUser. The allowlist row is removed entirely (no
// audit trail); the ex-user can no longer authenticate, so their room
// memberships become inert. The caller must be signing as an admin.
func (c *Client) DeleteUser(signPub string) error {
return c.doJSON("DELETE", "/users/"+signPub, nil, nil)
}
// InviteInfo is a single-use registration invite as returned by the admin invite
// endpoints. It is a flat view for the admin panel: the bearer token (to build
// the join link), the handle and role the new user will receive, the absolute
// expiry, whether it has been used, and when it was minted.
type InviteInfo struct {
Token string
Handle string
Role string
ExpiresAt string
Used bool
CreatedAt string
}
// CreateInvite mints a single-use registration invite. handle and role are fixed
// here (the registering client cannot change them); role is "admin" or "member"
// (empty defaults to member). ttlSecs sets the link lifetime (non-positive uses
// the server's 7-day default). The returned InviteInfo carries the token and
// expiry; the caller turns the token into a join link. Caller must sign as admin.
func (c *Client) CreateInvite(handle, role string, ttlSecs int) (InviteInfo, error) {
var resp createInviteResp
if err := c.doJSON("POST", "/invites", createInviteReq{Handle: handle, Role: role, TTLSecs: ttlSecs}, &resp); err != nil {
return InviteInfo{}, err
}
r := role
if r == "" {
r = "member"
}
return InviteInfo{Token: resp.Token, Handle: handle, Role: r, ExpiresAt: resp.ExpiresAt}, nil
}
// ListInvites returns the pending invites (not used, not expired). Caller must
// sign as admin.
func (c *Client) ListInvites() ([]InviteInfo, error) {
var resp []inviteJSON
if err := c.doJSON("GET", "/invites", nil, &resp); err != nil {
return nil, err
}
out := make([]InviteInfo, 0, len(resp))
for _, inv := range resp {
out = append(out, InviteInfo{
Token: inv.Token,
Handle: inv.Handle,
Role: inv.Role,
ExpiresAt: inv.ExpiresAt,
Used: inv.Used,
CreatedAt: inv.CreatedAt,
})
}
return out, nil
}
// CancelInvite cancels (deletes) a pending invite by its token, so an admin can
// revoke a link before it is redeemed. Caller must sign as admin.
func (c *Client) CancelInvite(token string) error {
return c.doJSON("DELETE", "/invites/"+token, nil, nil)
}
// Register redeems a single-use invite token, joining the bus allowlist. It is
// the wallet-model join call: the registering peer generated its own keypair
// locally and publishes ONLY its public keys here (signPub Ed25519, kexPub
// X25519, both 64-hex). It is UNSIGNED — the bearer token is the authorization,
// because this identity is not yet in the allowlist and so cannot sign an
// accepted request. On success the identity is registered with the invite's
// handle and role and can connect like any other peer.
func (c *Client) Register(token, signPub, kexPub string) error {
return c.doUnsigned("POST", "/register", registerReq{Token: token, SignPub: signPub, KexPub: kexPub}, nil)
}
// newRoomKey returns 32 random bytes for a symmetric room key.
func newRoomKey() ([]byte, error) {
k := make([]byte, 32)
+27 -11
View File
@@ -33,20 +33,36 @@ type identityFile struct {
KexPriv string `json:"kex_priv"`
}
// LoadIdentity loads an existing identity from path. Unlike LoadOrCreateIdentity
// it NEVER creates one: a missing or unreadable file is an error. It is for
// callers that must consume a specific, pre-provisioned identity rather than mint
// a fresh one — for example membershipd's persisted internal service identity,
// which `membershipd user add --store kv` reads to present the privileged nkey
// the cluster authenticator recognizes.
func LoadIdentity(path string) (cs.Identity, error) {
data, err := os.ReadFile(path)
if err != nil {
return cs.Identity{}, fmt.Errorf("client: read identity %q: %w", path, err)
}
var f identityFile
if err := json.Unmarshal(data, &f); err != nil {
return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err)
}
id, err := f.toIdentity()
if err != nil {
return cs.Identity{}, fmt.Errorf("client: decode identity %q: %w", path, err)
}
return id, nil
}
// LoadOrCreateIdentity loads the identity at path, or generates and persists a
// new one if the file does not exist. The file is written with 0600
// permissions because it holds private keys.
// permissions because it holds private keys. A file that exists but is
// unreadable or corrupt is an error (NOT silently regenerated), so a damaged
// identity surfaces instead of minting a new key that cannot decrypt old data.
func LoadOrCreateIdentity(path string) (cs.Identity, error) {
if data, err := os.ReadFile(path); err == nil {
var f identityFile
if err := json.Unmarshal(data, &f); err != nil {
return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err)
}
id, err := f.toIdentity()
if err != nil {
return cs.Identity{}, fmt.Errorf("client: decode identity %q: %w", path, err)
}
return id, nil
if _, statErr := os.Stat(path); statErr == nil {
return LoadIdentity(path)
}
id, err := cs.GenerateIdentity()
+104
View File
@@ -0,0 +1,104 @@
package client_test
import (
"encoding/hex"
"testing"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/membership"
)
// TestClientInvitesAdminAPI drives the wallet-model account flow through the real
// pkg/client methods against an in-process membershipd under enforce: an admin
// mints an invite, a brand-new identity redeems it via the UNSIGNED Register call
// (it is not yet in the allowlist), the admin then sees the user, and finally the
// admin hard-deletes it and it vanishes. This is the exact path the admin panel +
// the /join client page depend on, so it locks the client/server contract.
func TestClientInvitesAdminAPI(t *testing.T) {
h := newHarnessMode(t, membership.AuthEnforce)
waitHealth(t, h.ctrlURL)
admin, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect admin: %v", err)
}
defer admin.Close()
registerClient(t, h, admin, "admin", membership.RoleAdmin)
// Admin mints a single-use invite fixing handle + role.
inv, err := admin.CreateInvite("dora", membership.RoleMember, 0)
if err != nil {
t.Fatalf("admin CreateInvite: %v", err)
}
if len(inv.Token) != 64 || inv.ExpiresAt == "" {
t.Fatalf("invite malformed: %+v", inv)
}
if inv.Handle != "dora" || inv.Role != membership.RoleMember {
t.Fatalf("invite echo wrong: %+v", inv)
}
// It appears among the pending invites.
pend, err := admin.ListInvites()
if err != nil {
t.Fatalf("admin ListInvites: %v", err)
}
if !containsToken(pend, inv.Token) {
t.Fatalf("minted invite not pending: %+v", pend)
}
// A brand-new identity (NOT in the allowlist) redeems the invite via the
// UNSIGNED Register. We model its locally-generated keypair with a fresh
// identity and present its two public keys. Redeeming through this joiner
// client — which never registered and never seeded an admin — proves Register
// needs no admin signature; the bearer token is the sole authorization.
newID := mustIdentity(t)
signPub := hex.EncodeToString(newID.SignPub)
kexPub := hex.EncodeToString(newID.KexPub)
joiner, err := client.New(h.natsURL, h.ctrlURL, newID)
if err != nil {
t.Fatalf("connect joiner: %v", err)
}
defer joiner.Close()
if err := joiner.Register(inv.Token, signPub, kexPub); err != nil {
t.Fatalf("joiner Register: %v", err)
}
// Admin now sees dora in the allowlist with the invite's handle/role.
users, err := admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers: %v", err)
}
row, ok := findUserInfo(users, signPub)
if !ok {
t.Fatalf("registered dora missing from allowlist: %+v", users)
}
if row.Handle != "dora" || row.Role != membership.RoleMember || row.Status != membership.StatusActive {
t.Fatalf("dora row wrong: %+v", row)
}
// Single-use: redeeming again is an error.
if err := joiner.Register(inv.Token, signPub, kexPub); err == nil {
t.Fatalf("second Register should error (used token)")
}
// Admin hard-deletes dora; she vanishes from the allowlist entirely.
if err := admin.DeleteUser(signPub); err != nil {
t.Fatalf("admin DeleteUser: %v", err)
}
users, err = admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers after delete: %v", err)
}
if _, ok := findUserInfo(users, signPub); ok {
t.Fatalf("hard-deleted dora must NOT appear: %+v", users)
}
}
func containsToken(invites []client.InviteInfo, token string) bool {
for _, i := range invites {
if i.Token == token {
return true
}
}
return false
}
+99
View File
@@ -0,0 +1,99 @@
package client_test
import (
"encoding/hex"
"strings"
"testing"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/membership"
)
// findUserInfo returns the row with the given signing key (case-insensitive).
func findUserInfo(users []client.UserInfo, signPub string) (client.UserInfo, bool) {
want := strings.ToLower(signPub)
for _, u := range users {
if strings.ToLower(u.SignPub) == want {
return u, true
}
}
return client.UserInfo{}, false
}
// TestClientUsersAdminAPI drives the admin user-management API through the real
// pkg/client methods against an in-process membershipd under enforce: an admin
// client adds a user, lists it, revokes it, and sees the status flip — and a
// non-admin client is denied. This is the path the admin panel uses, so it locks
// the client/server contract the panel depends on.
func TestClientUsersAdminAPI(t *testing.T) {
h := newHarnessMode(t, membership.AuthEnforce)
waitHealth(t, h.ctrlURL)
admin, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect admin: %v", err)
}
defer admin.Close()
registerClient(t, h, admin, "admin", membership.RoleAdmin)
member, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect member: %v", err)
}
defer member.Close()
registerClient(t, h, member, "member", membership.RoleMember)
// A brand-new identity the admin will register over HTTP.
carol := mustIdentity(t)
carolPub := hex.EncodeToString(carol.SignPub)
// Admin adds carol as a member.
if err := admin.AddUser(carolPub, "carol", membership.RoleMember); err != nil {
t.Fatalf("admin AddUser: %v", err)
}
// Admin lists: carol present and active.
users, err := admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers: %v", err)
}
row, ok := findUserInfo(users, carolPub)
if !ok {
t.Fatalf("carol missing from list after add: %+v", users)
}
if row.Status != membership.StatusActive || row.Role != membership.RoleMember {
t.Fatalf("carol row wrong after add: %+v", row)
}
// Re-adding the same key is a conflict surfaced as an error (no silent upsert).
if err := admin.AddUser(carolPub, "carol-again", membership.RoleAdmin); err == nil {
t.Fatalf("re-adding carol should error (409), got nil")
}
// Admin revokes carol; list shows the status flip (no hard delete).
if err := admin.RevokeUser(carolPub); err != nil {
t.Fatalf("admin RevokeUser: %v", err)
}
users, err = admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers after revoke: %v", err)
}
row, ok = findUserInfo(users, carolPub)
if !ok {
t.Fatalf("carol vanished after revoke (should be a status flip): %+v", users)
}
if row.Status != membership.StatusRevoked {
t.Fatalf("carol should be revoked, got status %q", row.Status)
}
// A non-admin (member) is denied on every user-management method.
if _, err := member.ListUsers(); err == nil {
t.Fatalf("non-admin ListUsers should error (403), got nil")
}
if err := member.AddUser(carolPub, "x", membership.RoleMember); err == nil {
t.Fatalf("non-admin AddUser should error (403), got nil")
}
if err := member.RevokeUser(carolPub); err == nil {
t.Fatalf("non-admin RevokeUser should error (403), got nil")
}
}
+61 -2
View File
@@ -9,6 +9,7 @@ import (
"crypto/tls"
"fmt"
"net/url"
"os"
"time"
server "github.com/nats-io/nats-server/v2/server"
@@ -102,10 +103,38 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication)
return StartServer(ServerConfig{StoreDir: storeDir, Host: host, Port: port, Auth: auth})
}
// natsLogOpts maps the two independent environment toggles to the embedded
// nats-server logging and monitoring flags. It is a pure function (no I/O) so the
// decoupling between the two toggles can be unit-tested directly.
//
// - UNIBUS_NATS_DEBUG="1" enables the nats-server logger (route/RAFT/JetStream
// errors); "2" additionally enables protocol tracing. Off by default so the
// server stays silent (NoLog) and production behavior is unchanged.
// - UNIBUS_NATS_MONITOR="1" opens the monitoring HTTP endpoint (loopback only)
// for a local metrics scraper to read /varz, /connz and /jsz.
//
// The two are DECOUPLED on purpose: enabling the monitoring endpoint must NOT turn
// on the verbose debug log, which would write room subjects and routing metadata
// to journald in clear and regress the hardened posture (issue 0007). The reverse
// coupling is kept for backward compatibility: debug mode still exposes the
// monitoring endpoint as well (debug implies monitor), so existing debugging
// workflows are unchanged.
func natsLogOpts(debugEnv, monitorEnv string) (noLog, debug, trace, monitor bool) {
debug = debugEnv == "1" || debugEnv == "2"
trace = debugEnv == "2"
monitor = monitorEnv == "1" || debug
noLog = !debug
return noLog, debug, trace, monitor
}
// StartServer launches an embedded nats-server with JetStream from cfg. It
// blocks until the server is ready to accept connections (up to 5s) and returns
// the running server; the caller must Shutdown it.
func StartServer(cfg ServerConfig) (*server.Server, error) {
// Map the two independent env toggles to the nats-server logging + monitoring
// flags. See natsLogOpts for the decoupling rationale (issue 0007).
noLog, debugNATS, traceNATS, monitorNATS := natsLogOpts(
os.Getenv("UNIBUS_NATS_DEBUG"), os.Getenv("UNIBUS_NATS_MONITOR"))
opts := &server.Options{
JetStream: true,
StoreDir: cfg.StoreDir,
@@ -114,8 +143,19 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
ServerName: cfg.ServerName,
DontListen: false,
// Keep the embedded server quiet by default; the host app logs the URLs.
NoLog: true,
NoSigs: true,
NoLog: noLog,
Debug: debugNATS,
Trace: traceNATS,
Logtime: true,
NoSigs: true,
}
if monitorNATS {
// Expose the nats-server monitoring endpoint on LOOPBACK ONLY (never public):
// the operator (or a local metrics scraper) inspects /varz, /connz, /jsz,
// /routez. The 127.0.0.1 bind is mandatory because this endpoint has no auth;
// it must stay unreachable from the network.
opts.HTTPHost = "127.0.0.1"
opts.HTTPPort = 8222
}
if cfg.Auth != nil {
opts.CustomClientAuthentication = cfg.Auth
@@ -141,6 +181,10 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
return nil, fmt.Errorf("embeddednats: new server: %w", err)
}
if debugNATS {
ns.ConfigureLogger()
}
go ns.Start()
if !ns.ReadyForConnections(5 * time.Second) {
@@ -162,6 +206,21 @@ func applyClusterOpts(opts *server.Options, c *ClusterConfig) error {
Port: c.Port,
Username: c.Username,
Password: c.Password,
// Disable route connection pooling (nats-server 2.10+ defaults to a pool of
// 3 connections per peer). On a small cluster the pool churns with
// "duplicate route"/"client closed" reconnects that interrupt the meta-group
// RAFT heartbeats, causing perpetual leader re-elections so the JetStream
// meta never becomes current and stream/KV creation hangs (issue 0006g).
// PoolSize=-1 forces the classic single route per peer, which is stable for
// the 3-node unibus cluster.
PoolSize: -1,
// NoAdvertise stops the server from gossiping its locally-discovered IPs to
// peers. The cluster nodes are Docker hosts, so without this NATS advertises
// the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers
// then try to dial those private, mutually-unreachable IPs, churning the
// route layer and destabilizing the JetStream meta-group. With NoAdvertise
// the nodes use ONLY the explicit public-IP routes we configure (issue 0006g).
NoAdvertise: true,
}
if c.TLS != nil {
opts.Cluster.TLSConfig = c.TLS
+134
View File
@@ -0,0 +1,134 @@
package embeddednats
import (
"io"
"net"
"net/http"
"testing"
"time"
)
// TestNatsLogOptsDecoupled is the core regression guard for issue 0007: turning
// on the monitoring endpoint must NEVER turn on the verbose nats-server debug log
// (which would leak room subjects/routing metadata to journald). It also checks
// the backward-compatible coupling (debug still implies monitoring) and the quiet
// default.
func TestNatsLogOptsDecoupled(t *testing.T) {
cases := []struct {
name string
debugEnv, monitorEnv string
noLog, debug, trace, monitor bool
}{
{"default off — quiet, no monitor", "", "", true, false, false, false},
{"monitor only — endpoint on, log stays quiet", "", "1", true, false, false, true},
{"debug implies monitor", "1", "", false, true, false, true},
{"trace implies debug+monitor", "2", "", false, true, true, true},
{"both set", "1", "1", false, true, false, true},
{"monitor garbage value ignored", "", "yes", true, false, false, false},
{"debug garbage value ignored", "true", "", true, false, false, false},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
noLog, debug, trace, monitor := natsLogOpts(c.debugEnv, c.monitorEnv)
if noLog != c.noLog || debug != c.debug || trace != c.trace || monitor != c.monitor {
t.Fatalf("natsLogOpts(%q,%q) = (noLog=%v debug=%v trace=%v monitor=%v), want (noLog=%v debug=%v trace=%v monitor=%v)",
c.debugEnv, c.monitorEnv, noLog, debug, trace, monitor,
c.noLog, c.debug, c.trace, c.monitor)
}
})
}
// Explicit golden assertion of the security property: monitor on, log off.
noLog, debug, _, monitor := natsLogOpts("", "1")
if !monitor {
t.Fatal("UNIBUS_NATS_MONITOR=1 must open the monitoring endpoint")
}
if debug || !noLog {
t.Fatalf("UNIBUS_NATS_MONITOR=1 must NOT enable the debug log (got debug=%v noLog=%v)", debug, noLog)
}
}
// TestMonitorEndpointLoopback boots a real embedded server with
// UNIBUS_NATS_MONITOR=1 (and DEBUG explicitly off) and proves the monitoring HTTP
// endpoint answers on loopback only — the exact contract the metrics scraper
// relies on. The pure decoupling check above already guarantees the log stays out
// of debug mode for this same env combination.
func TestMonitorEndpointLoopback(t *testing.T) {
t.Setenv("UNIBUS_NATS_DEBUG", "")
t.Setenv("UNIBUS_NATS_MONITOR", "1")
ns, err := StartServer(ServerConfig{
StoreDir: t.TempDir(),
Host: "127.0.0.1",
Port: freeLoopbackPort(t),
})
if err != nil {
t.Fatalf("start server with monitoring: %v", err)
}
defer func() { ns.Shutdown(); ns.WaitForShutdown() }()
addr := ns.MonitorAddr()
if addr == nil {
t.Fatal("monitoring endpoint not open with UNIBUS_NATS_MONITOR=1 (MonitorAddr is nil)")
}
if !addr.IP.IsLoopback() {
t.Fatalf("monitoring endpoint bound to %s, must be loopback only", addr.IP)
}
if addr.Port != 8222 {
t.Fatalf("monitoring endpoint on port %d, want the fixed loopback port 8222", addr.Port)
}
// /varz must answer 200 with a non-empty body on loopback.
url := "http://" + addr.String() + "/varz"
var resp *http.Response
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
resp, err = http.Get(url) //nolint:gosec // loopback monitoring endpoint, no auth by design
if err == nil {
break
}
time.Sleep(50 * time.Millisecond)
}
if err != nil {
t.Fatalf("GET %s: %v", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("GET %s -> %d, want 200", url, resp.StatusCode)
}
body, _ := io.ReadAll(resp.Body)
if len(body) == 0 {
t.Fatalf("GET %s returned an empty body", url)
}
}
// TestMonitorDisabledByDefault proves a server started without either toggle does
// NOT open the monitoring endpoint, so production stays closed unless opted in.
func TestMonitorDisabledByDefault(t *testing.T) {
t.Setenv("UNIBUS_NATS_DEBUG", "")
t.Setenv("UNIBUS_NATS_MONITOR", "")
ns, err := StartServer(ServerConfig{
StoreDir: t.TempDir(),
Host: "127.0.0.1",
Port: freeLoopbackPort(t),
})
if err != nil {
t.Fatalf("start server: %v", err)
}
defer func() { ns.Shutdown(); ns.WaitForShutdown() }()
if addr := ns.MonitorAddr(); addr != nil {
t.Fatalf("monitoring endpoint open (%s) without UNIBUS_NATS_MONITOR — must stay closed by default", addr)
}
}
func freeLoopbackPort(t *testing.T) int {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("free port: %v", err)
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
+296
View File
@@ -0,0 +1,296 @@
package membership
import (
"crypto/rand"
"database/sql"
"encoding/hex"
"errors"
"fmt"
"strings"
"time"
)
// Invite is a single-use registration token the admin mints so a brand-new
// identity can join the bus allowlist WITHOUT the admin ever handling its
// private key (the wallet model: the key is born and stays on the user's
// device; only the public key is published, via POST /register).
//
// The admin fixes the handle and role at mint time; the registering client may
// NOT change them (no privilege escalation). Token is 32 random bytes in
// lowercase hex (64 chars). ExpiresAt and CreatedAt are RFC3339Nano UTC. Used
// flips to true the instant the invite is consumed, and an invite can be
// consumed at most once. The audit fields (UsedAt/UsedSignPub/UsedKexPub) are
// empty until the invite is consumed; they record which keys claimed it, so the
// link between an invite and the identity it created stays traceable even though
// the allowlist row itself stores only the signing key.
type Invite struct {
Token string `json:"token"`
Handle string `json:"handle"`
Role string `json:"role"`
ExpiresAt string `json:"expires_at"`
Used bool `json:"used"`
CreatedAt string `json:"created_at"`
// Audit (populated on consume; omitted on the wire while pending).
UsedAt string `json:"used_at,omitempty"`
UsedSignPub string `json:"used_sign_pub,omitempty"`
UsedKexPub string `json:"used_kex_pub,omitempty"`
}
// Invite-flow sentinels. They let callers (and the HTTP layer) map a failed
// consume to a precise status code without string-matching: an unknown token is
// ErrNotFound (reused from the store), a spent token is ErrInviteUsed, a
// past-deadline token is ErrInviteExpired. ErrUserExists (from users.go) is
// reused when the presented signing key is already registered.
var (
ErrInviteUsed = errors.New("membership: invite already used")
ErrInviteExpired = errors.New("membership: invite expired")
)
// defaultInviteTTL is the lifetime of an invite when the caller passes a
// non-positive ttlSecs. Seven days mirrors a typical "share this link this
// week" expectation while keeping the un-authenticated /register window bounded.
const defaultInviteTTL = 7 * 24 * time.Hour
// newInviteToken returns 32 cryptographically-random bytes as lowercase hex (64
// chars). The token IS the bearer secret that authorizes /register, so it must
// be unguessable; crypto/rand is the only acceptable source.
func newInviteToken() (string, error) {
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
return "", fmt.Errorf("membership: generate invite token: %w", err)
}
return hex.EncodeToString(b), nil
}
// inviteTTL resolves a caller-supplied ttlSecs into a concrete duration,
// defaulting to defaultInviteTTL when non-positive.
func inviteTTL(ttlSecs int) time.Duration {
if ttlSecs <= 0 {
return defaultInviteTTL
}
return time.Duration(ttlSecs) * time.Second
}
// inviteIsExpired reports whether the RFC3339 expiry has passed. A token whose
// expiry cannot be parsed is treated as expired (fail closed): a corrupt
// deadline must never widen the unauthenticated registration window.
func inviteIsExpired(expiresAt string) bool {
exp, err := time.Parse(time.RFC3339Nano, expiresAt)
if err != nil {
return true
}
return time.Now().UTC().After(exp)
}
// validateInviteRole normalizes and validates the role an invite may carry. It
// mirrors AddUser: empty defaults to member, and only admin|member are allowed
// (an admin minting an admin invite is deliberate and permitted).
func validateInviteRole(role string) (string, error) {
if role == "" {
return RoleMember, nil
}
if role != RoleAdmin && role != RoleMember {
return "", fmt.Errorf("membership: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)
}
return role, nil
}
// ---- SQLite implementation ------------------------------------------------
// CreateInvite mints a single-use invite for a future user. handle is required;
// role defaults to member and must be admin|member. ttlSecs sets the lifetime
// (non-positive uses the 7-day default). The token is 32 random bytes in hex.
func (s *sqliteStore) CreateInvite(handle, role string, ttlSecs int) (Invite, error) {
if handle == "" {
return Invite{}, fmt.Errorf("membership: CreateInvite: handle required")
}
role, err := validateInviteRole(role)
if err != nil {
return Invite{}, err
}
token, err := newInviteToken()
if err != nil {
return Invite{}, err
}
now := time.Now().UTC()
inv := Invite{
Token: token,
Handle: handle,
Role: role,
ExpiresAt: now.Add(inviteTTL(ttlSecs)).Format(time.RFC3339Nano),
Used: false,
CreatedAt: now.Format(time.RFC3339Nano),
}
if _, err := s.db.Exec(
`INSERT INTO invites (token, handle, role, expires_at, used, created_at) VALUES (?, ?, ?, ?, 0, ?)`,
inv.Token, inv.Handle, inv.Role, inv.ExpiresAt, inv.CreatedAt,
); err != nil {
return Invite{}, fmt.Errorf("membership: insert invite: %w", err)
}
return inv, nil
}
// GetInvite returns the invite with the given token, or ErrNotFound (wrapped)
// when there is none.
func (s *sqliteStore) GetInvite(token string) (Invite, error) {
var inv Invite
var used int
var usedAt, usedSign, usedKex sql.NullString
err := s.db.QueryRow(
`SELECT token, handle, role, expires_at, used, created_at, used_at, used_sign_pub, used_kex_pub
FROM invites WHERE token = ?`, token,
).Scan(&inv.Token, &inv.Handle, &inv.Role, &inv.ExpiresAt, &used, &inv.CreatedAt, &usedAt, &usedSign, &usedKex)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, ErrNotFound)
}
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, err)
}
inv.Used = used != 0
inv.UsedAt, inv.UsedSignPub, inv.UsedKexPub = usedAt.String, usedSign.String, usedKex.String
return inv, nil
}
// ListInvites returns every invite ordered newest-first (by created_at). It
// includes consumed invites so the admin panel can show the full picture; the
// caller filters to "pending" when it wants only live links.
func (s *sqliteStore) ListInvites() ([]Invite, error) {
rows, err := s.db.Query(
`SELECT token, handle, role, expires_at, used, created_at, used_at, used_sign_pub, used_kex_pub
FROM invites ORDER BY created_at DESC, token`,
)
if err != nil {
return nil, fmt.Errorf("membership: list invites: %w", err)
}
defer rows.Close()
var out []Invite
for rows.Next() {
var inv Invite
var used int
var usedAt, usedSign, usedKex sql.NullString
if err := rows.Scan(&inv.Token, &inv.Handle, &inv.Role, &inv.ExpiresAt, &used, &inv.CreatedAt, &usedAt, &usedSign, &usedKex); err != nil {
return nil, fmt.Errorf("membership: scan invite: %w", err)
}
inv.Used = used != 0
inv.UsedAt, inv.UsedSignPub, inv.UsedKexPub = usedAt.String, usedSign.String, usedKex.String
out = append(out, inv)
}
return out, rows.Err()
}
// ConsumeInvite atomically validates and spends an invite, registering the
// presented signing key as a bus user with the invite's handle and role. It is
// the ONLY path that adds to the allowlist without an admin signature: the
// bearer token is the authorization, so the checks here are the security
// boundary.
//
// Atomicity (single transaction): the invite is marked used FIRST (guarded by
// `used = 0`, so two concurrent consumers cannot both win), then the user is
// inserted. A token that passes validation is therefore spent exactly once.
// Special case: if the signing key is already registered, the user INSERT hits
// the PRIMARY KEY and we return ErrUserExists — but the invite stays SPENT (we
// commit the mark), matching the JetStream backend's burn-on-claim semantics so
// the two stores behave identically. A genuine backend error rolls everything
// back, leaving the invite reusable.
func (s *sqliteStore) ConsumeInvite(token, signPub, kexPub string) error {
signPub = normalizeSignPub(signPub)
kexPub = normalizeSignPub(kexPub)
if signPub == "" {
return fmt.Errorf("membership: ConsumeInvite: sign_pub required")
}
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("membership: ConsumeInvite: begin: %w", err)
}
defer tx.Rollback()
var handle, role, expiresAt string
var used int
err = tx.QueryRow(
`SELECT handle, role, expires_at, used FROM invites WHERE token = ?`, token,
).Scan(&handle, &role, &expiresAt, &used)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrNotFound)
}
return fmt.Errorf("membership: consume invite %q: %w", token, err)
}
if used != 0 {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
}
if inviteIsExpired(expiresAt) {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteExpired)
}
// Mark used first, guarded by used = 0 so a concurrent consumer that already
// flipped it (rows affected = 0) is rejected as used rather than double-spending.
now := nowRFC3339()
res, err := tx.Exec(
`UPDATE invites SET used = 1, used_at = ?, used_sign_pub = ?, used_kex_pub = ? WHERE token = ? AND used = 0`,
now, signPub, kexPub, token,
)
if err != nil {
return fmt.Errorf("membership: consume invite %q: mark used: %w", token, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("membership: consume invite %q: rows affected: %w", token, err)
}
if n == 0 {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
}
// Register the user with the invite-fixed handle and role.
_, err = tx.Exec(
`INSERT INTO users (sign_pub, handle, role, status, created_at) VALUES (?, ?, ?, ?, ?)`,
signPub, handle, role, StatusActive, now,
)
if err != nil {
// Already-registered key: the invite is still spent (commit the mark) so
// the burn-on-claim contract matches the KV store. Any other failure rolls back.
if isUniqueViolation(err) {
if cErr := tx.Commit(); cErr != nil {
return fmt.Errorf("membership: consume invite %q: commit: %w", token, cErr)
}
return ErrUserExists
}
return fmt.Errorf("membership: consume invite %q: insert user: %w", token, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("membership: consume invite %q: commit: %w", token, err)
}
return nil
}
// CancelInvite removes a pending invite (the admin revoked the link before it
// was used). It hard-deletes the row; a consumed invite stays for audit only if
// the caller targets a pending token. Deleting an unknown token returns
// ErrNotFound so the HTTP layer can answer 404.
func (s *sqliteStore) CancelInvite(token string) error {
res, err := s.db.Exec(`DELETE FROM invites WHERE token = ?`, token)
if err != nil {
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("membership: cancel invite %q: rows affected: %w", token, err)
}
if n == 0 {
return fmt.Errorf("membership: cancel invite %q: %w", token, ErrNotFound)
}
return nil
}
// isUniqueViolation reports whether err is a SQLite UNIQUE/PRIMARY KEY conflict.
// modernc.org/sqlite surfaces it as a message fragment; matching it here keeps
// the string-matching in one place (the same fragments AddUser checks inline).
func isUniqueViolation(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "UNIQUE constraint") || strings.Contains(msg, "PRIMARY KEY")
}
+194
View File
@@ -0,0 +1,194 @@
package membership
import (
"bytes"
"encoding/hex"
"encoding/json"
"io"
"net/http"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
)
// postRegister posts an UNSIGNED /register request (the wallet-model join: the
// new identity is not yet in the allowlist, so it cannot sign). It returns the
// status and body so a test can assert the precise code.
func postRegister(t *testing.T, h *authHarness, body registerReq) (int, string) {
t.Helper()
b, err := json.Marshal(body)
if err != nil {
t.Fatalf("marshal register: %v", err)
}
resp, err := http.Post(h.ts.URL+"/register", "application/json", bytes.NewReader(b))
if err != nil {
t.Fatalf("post register: %v", err)
}
defer resp.Body.Close()
rb, _ := io.ReadAll(resp.Body)
return resp.StatusCode, string(rb)
}
// TestInvitesHTTP_Golden is the end-to-end wallet-model flow over real HTTP:
// alice (admin) mints an invite, a brand-new identity redeems it UNSIGNED via
// /register, the user then appears in the admin allowlist, and a second redeem of
// the same token is rejected as used.
func TestInvitesHTTP_Golden(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
// Admin mints an invite.
var inv createInviteResp
code, body := signedJSON(t, h, "POST", "/invites",
createInviteReq{Handle: "newbie", Role: RoleMember}, h.alice, 1)
if code != http.StatusCreated {
t.Fatalf("admin create invite should be 201, got %d (%s)", code, body)
}
if err := json.Unmarshal([]byte(body), &inv); err != nil {
t.Fatalf("decode invite: %v (%s)", err, body)
}
if len(inv.Token) != 64 || inv.ExpiresAt == "" {
t.Fatalf("invite token/expiry malformed: %+v", inv)
}
// A brand-new identity redeems it WITHOUT any admin signature.
id, _ := cs.GenerateIdentity()
signPub := hex.EncodeToString(id.SignPub)
kexPub := hex.EncodeToString(id.KexPub)
if code, body := postRegister(t, h, registerReq{Token: inv.Token, SignPub: signPub, KexPub: kexPub}); code != http.StatusCreated {
t.Fatalf("register should be 201, got %d (%s)", code, body)
}
// The user now appears in the admin allowlist with the invite's handle/role.
users := listUsers(t, h, 2)
row, ok := findUser(users, signPub)
if !ok {
t.Fatalf("registered user missing from allowlist: %+v", users)
}
if row.Handle != "newbie" || row.Role != RoleMember || row.Status != StatusActive {
t.Fatalf("registered user row wrong: %+v", row)
}
// The invite is no longer pending.
if code, body := signedJSON(t, h, "GET", "/invites", nil, h.alice, 3); code == http.StatusOK {
var pend []inviteJSON
_ = json.Unmarshal([]byte(body), &pend)
for _, p := range pend {
if p.Token == inv.Token {
t.Fatalf("consumed invite should not be listed as pending: %+v", pend)
}
}
}
// Single-use: a second redeem of the same token is 409 used.
id2, _ := cs.GenerateIdentity()
if code, body := postRegister(t, h, registerReq{
Token: inv.Token, SignPub: hex.EncodeToString(id2.SignPub), KexPub: hex.EncodeToString(id2.KexPub),
}); code != http.StatusConflict {
t.Fatalf("second redeem should be 409, got %d (%s)", code, body)
}
}
// TestInvitesHTTP_RegisterValidation covers /register input + state errors: an
// unknown token is 404, an expired token is 410, and malformed hex keys are 400 —
// each WITHOUT registering anything.
func TestInvitesHTTP_RegisterValidation(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
id, _ := cs.GenerateIdentity()
signPub := hex.EncodeToString(id.SignPub)
kexPub := hex.EncodeToString(id.KexPub)
// Unknown token -> 404.
if code, body := postRegister(t, h, registerReq{Token: "deadbeef", SignPub: signPub, KexPub: kexPub}); code != http.StatusNotFound {
t.Fatalf("unknown token should be 404, got %d (%s)", code, body)
}
// Malformed sign_pub -> 400.
if code, body := postRegister(t, h, registerReq{Token: "x", SignPub: "abcd", KexPub: kexPub}); code != http.StatusBadRequest {
t.Fatalf("malformed sign_pub should be 400, got %d (%s)", code, body)
}
// Malformed kex_pub -> 400.
if code, body := postRegister(t, h, registerReq{Token: "x", SignPub: signPub, KexPub: "zzzz"}); code != http.StatusBadRequest {
t.Fatalf("malformed kex_pub should be 400, got %d (%s)", code, body)
}
// Expired token -> 410. Mint via the admin API, then force its deadline past
// directly in the store (white-box).
var inv createInviteResp
_, body := signedJSON(t, h, "POST", "/invites", createInviteReq{Handle: "late", Role: RoleMember}, h.alice, 1)
if err := json.Unmarshal([]byte(body), &inv); err != nil {
t.Fatalf("decode invite: %v (%s)", err, body)
}
ss, ok := h.store.(*sqliteStore)
if !ok {
t.Fatalf("expected sqliteStore harness")
}
past := time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
if _, err := ss.db.Exec(`UPDATE invites SET expires_at = ? WHERE token = ?`, past, inv.Token); err != nil {
t.Fatalf("force expire: %v", err)
}
if code, rb := postRegister(t, h, registerReq{Token: inv.Token, SignPub: signPub, KexPub: kexPub}); code != http.StatusGone {
t.Fatalf("expired token should be 410, got %d (%s)", code, rb)
}
}
// TestInvitesHTTP_NonAdminForbidden is the security spine for the new endpoints:
// a REGISTERED non-admin (bob) is denied on POST /invites, GET /invites,
// DELETE /invites/{token}, and DELETE /users/{signpub} — each a 403 by role.
func TestInvitesHTTP_NonAdminForbidden(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, _ := cs.GenerateIdentity()
register(t, h, bob, "bob") // role member
bobPub := hex.EncodeToString(bob.SignPub)
checks := []struct {
name string
method string
path string
body any
}{
{"create invite", "POST", "/invites", createInviteReq{Handle: "x", Role: RoleMember}},
{"list invites", "GET", "/invites", nil},
{"cancel invite", "DELETE", "/invites/sometoken", nil},
{"delete user", "DELETE", "/users/" + bobPub, nil},
}
for i, c := range checks {
code, body := signedJSON(t, h, c.method, c.path, c.body, bob, i+1)
if code != http.StatusForbidden {
t.Fatalf("non-admin %s should be 403, got %d (%s)", c.name, code, body)
}
}
}
// TestUsersHTTP_HardDelete proves DELETE /users/{signpub} purges a user (distinct
// from revoke's status flip): alice adds carol, hard-deletes her, and carol then
// vanishes from the allowlist entirely (not merely flagged revoked).
func TestUsersHTTP_HardDelete(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
carol, _ := cs.GenerateIdentity()
carolPub := hex.EncodeToString(carol.SignPub)
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: carolPub, Handle: "carol", Role: RoleMember}, h.alice, 1); code != http.StatusCreated {
t.Fatalf("add carol should be 201, got %d (%s)", code, body)
}
// Hard-delete carol.
if code, body := signedJSON(t, h, "DELETE", "/users/"+carolPub, nil, h.alice, 2); code != http.StatusOK {
t.Fatalf("hard-delete carol should be 200, got %d (%s)", code, body)
}
// She is gone entirely — not present in the list at all (vs revoke, which
// keeps her as status=revoked).
users := listUsers(t, h, 3)
if _, ok := findUser(users, carolPub); ok {
t.Fatalf("hard-deleted carol must NOT appear in the allowlist: %+v", users)
}
// Deleting her again is a 404.
if code, body := signedJSON(t, h, "DELETE", "/users/"+carolPub, nil, h.alice, 4); code != http.StatusNotFound {
t.Fatalf("re-delete should be 404, got %d (%s)", code, body)
}
}
+186
View File
@@ -0,0 +1,186 @@
package membership
import (
"encoding/hex"
"encoding/json"
"errors"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
)
// newIDHex generates a fresh identity and returns its signing and key-exchange
// public keys as lowercase hex — the two keys a client presents to /register.
func newIDHex(t *testing.T) (signPub, kexPub string) {
t.Helper()
id, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
return hex.EncodeToString(id.SignPub), hex.EncodeToString(id.KexPub)
}
// inviteSuite drives the full invite lifecycle against any Store backend: mint,
// look up, redeem (which registers the user), reject a second redeem (single-use)
// and a non-existent token, reject an expired token (forced past via the
// backend-specific forceExpire closure), and hard-delete a user. It is shared by
// the SQLite and JetStream tests so both backends prove identical behavior.
func inviteSuite(t *testing.T, s Store, forceExpire func(token string)) {
t.Helper()
// Mint an invite fixing handle + role.
inv, err := s.CreateInvite("alice-new", RoleMember, 3600)
if err != nil {
t.Fatalf("CreateInvite: %v", err)
}
if len(inv.Token) != 64 {
t.Fatalf("token should be 64 hex chars, got %d (%q)", len(inv.Token), inv.Token)
}
if inv.Used {
t.Fatalf("fresh invite must not be used")
}
// GetInvite round-trips it.
got, err := s.GetInvite(inv.Token)
if err != nil || got.Handle != "alice-new" || got.Role != RoleMember {
t.Fatalf("GetInvite mismatch: %+v err=%v", got, err)
}
// Redeem it: the presented signing key joins the allowlist with the invite's
// handle and role.
signPub, kexPub := newIDHex(t)
if err := s.ConsumeInvite(inv.Token, signPub, kexPub); err != nil {
t.Fatalf("ConsumeInvite (golden): %v", err)
}
u, err := s.GetUser(signPub)
if err != nil {
t.Fatalf("GetUser after register: %v", err)
}
if u.Handle != "alice-new" || u.Role != RoleMember || u.Status != StatusActive {
t.Fatalf("registered user wrong: %+v", u)
}
if !s.IsAuthorized(signPub) {
t.Fatalf("registered user should be authorized")
}
// Single-use: redeeming the same token again (even with a different identity)
// is rejected as used.
sp2, kp2 := newIDHex(t)
if err := s.ConsumeInvite(inv.Token, sp2, kp2); !errors.Is(err, ErrInviteUsed) {
t.Fatalf("second redeem should be ErrInviteUsed, got %v", err)
}
if _, err := s.GetUser(sp2); !errors.Is(err, ErrNotFound) {
t.Fatalf("second identity must NOT be registered, got %v", err)
}
// Unknown token is ErrNotFound.
if err := s.ConsumeInvite("deadbeef", "ab", "cd"); !errors.Is(err, ErrNotFound) {
t.Fatalf("unknown token should be ErrNotFound, got %v", err)
}
// Expired invite: mint one, force its deadline into the past, redeem -> rejected.
exp, err := s.CreateInvite("late", RoleMember, 3600)
if err != nil {
t.Fatalf("CreateInvite expired: %v", err)
}
forceExpire(exp.Token)
sp3, kp3 := newIDHex(t)
if err := s.ConsumeInvite(exp.Token, sp3, kp3); !errors.Is(err, ErrInviteExpired) {
t.Fatalf("expired redeem should be ErrInviteExpired, got %v", err)
}
// CancelInvite removes a pending invite; redeeming it afterward is ErrNotFound.
canc, err := s.CreateInvite("cancelme", RoleMember, 3600)
if err != nil {
t.Fatalf("CreateInvite cancel: %v", err)
}
if err := s.CancelInvite(canc.Token); err != nil {
t.Fatalf("CancelInvite: %v", err)
}
if err := s.ConsumeInvite(canc.Token, sp3, kp3); !errors.Is(err, ErrNotFound) {
t.Fatalf("cancelled invite redeem should be ErrNotFound, got %v", err)
}
// Hard-delete the registered user: it disappears from the allowlist entirely.
if err := s.DeleteUser(signPub); err != nil {
t.Fatalf("DeleteUser: %v", err)
}
if _, err := s.GetUser(signPub); !errors.Is(err, ErrNotFound) {
t.Fatalf("deleted user should be ErrNotFound, got %v", err)
}
if s.IsAuthorized(signPub) {
t.Fatalf("deleted user must not be authorized")
}
// Deleting an unknown key is ErrNotFound.
if err := s.DeleteUser(signPub); !errors.Is(err, ErrNotFound) {
t.Fatalf("re-delete should be ErrNotFound, got %v", err)
}
}
// TestInvitesSQLite runs the suite against the default SQLite backend, forcing
// expiry with a direct UPDATE on the embedded DB (white-box, same package).
func TestInvitesSQLite(t *testing.T) {
s := openTestStore(t)
inviteSuite(t, s, func(token string) {
past := time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
if _, err := s.db.Exec(`UPDATE invites SET expires_at = ? WHERE token = ?`, past, token); err != nil {
t.Fatalf("force expire: %v", err)
}
})
}
// TestInvitesJetStream runs the same suite against the replicated KV backend,
// forcing expiry by re-Putting the invite JSON with a past deadline.
func TestInvitesJetStream(t *testing.T) {
s, _, _ := newKVStore(t)
inviteSuite(t, s, func(token string) {
inv, err := s.GetInvite(token)
if err != nil {
t.Fatalf("force expire: get invite: %v", err)
}
inv.ExpiresAt = time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
b, err := json.Marshal(inv)
if err != nil {
t.Fatalf("force expire: marshal: %v", err)
}
ctx, cancel := s.ctx()
defer cancel()
if _, err := s.invites.Put(ctx, token, b); err != nil {
t.Fatalf("force expire: put: %v", err)
}
})
}
// TestConsumeInvite_AlreadyRegistered covers the burn-on-claim edge: redeeming a
// valid invite with a signing key that is already registered surfaces
// ErrUserExists AND spends the invite (both backends behave identically).
func TestConsumeInvite_AlreadyRegistered(t *testing.T) {
for _, tc := range []struct {
name string
open func(t *testing.T) Store
}{
{"sqlite", func(t *testing.T) Store { return openTestStore(t) }},
{"jetstream", func(t *testing.T) Store { s, _, _ := newKVStore(t); return s }},
} {
t.Run(tc.name, func(t *testing.T) {
s := tc.open(t)
signPub, kexPub := newIDHex(t)
if err := s.AddUser(signPub, "existing", RoleMember); err != nil {
t.Fatalf("seed user: %v", err)
}
inv, err := s.CreateInvite("dup", RoleMember, 3600)
if err != nil {
t.Fatalf("CreateInvite: %v", err)
}
if err := s.ConsumeInvite(inv.Token, signPub, kexPub); !errors.Is(err, ErrUserExists) {
t.Fatalf("redeem with registered key should be ErrUserExists, got %v", err)
}
// The invite is spent (burn-on-claim): a fresh identity cannot reuse it.
sp2, kp2 := newIDHex(t)
if err := s.ConsumeInvite(inv.Token, sp2, kp2); !errors.Is(err, ErrInviteUsed) {
t.Fatalf("invite should be spent after a burned claim, got %v", err)
}
})
}
}
+225 -10
View File
@@ -50,6 +50,7 @@ const (
bucketByMember = "UNIBUS_rooms_by_member"
bucketRoomKeys = "UNIBUS_room_keys"
bucketUsers = "UNIBUS_users"
bucketInvites = "UNIBUS_invites"
defaultKVOpTime = 5 * time.Second
)
@@ -71,6 +72,7 @@ type jetstreamStore struct {
byMember jetstream.KeyValue
keys jetstream.KeyValue
users jetstream.KeyValue
invites jetstream.KeyValue
opTimeout time.Duration
}
@@ -85,8 +87,18 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
if opTimeout <= 0 {
opTimeout = defaultKVOpTime
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// Bootstrap budget for creating/opening the buckets. On a single node JetStream
// is ready the instant the server starts, so the first attempt succeeds. On a
// COLD multi-node cluster the JetStream meta-group must first elect a leader and
// each node must establish contact with it before its $JS.API responds. A KV
// op is a NATS request/reply: if it is published before the node's JetStream is
// ready the request is dropped (not queued), and a single long-context call then
// just blocks until it times out (issue 0006g). So we RETRY each bucket op with
// short per-attempt contexts until it succeeds or the overall bootstrap budget
// is exhausted; once the cluster is ready the next retry lands and the buckets
// are created, after which they persist and every node opens them quickly.
bootstrapBudget := 120 * time.Second
deadline := time.Now().Add(bootstrapBudget)
s := &jetstreamStore{opTimeout: opTimeout}
for _, b := range []struct {
@@ -98,15 +110,29 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
{bucketByMember, &s.byMember},
{bucketRoomKeys, &s.keys},
{bucketUsers, &s.users},
{bucketInvites, &s.invites},
} {
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: b.name,
Replicas: cfg.Replicas,
History: 1,
Storage: jetstream.FileStorage,
})
if err != nil {
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err)
var kv jetstream.KeyValue
var lastErr error
for {
opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{
Bucket: b.name,
Replicas: cfg.Replicas,
History: 1,
Storage: jetstream.FileStorage,
})
cancel()
if lastErr == nil {
break
}
if time.Now().After(deadline) {
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr)
}
// JetStream not ready yet (no meta leader / request dropped). Wait and
// re-publish the op; in a cluster cold start this lands once the meta
// group settles.
time.Sleep(1 * time.Second)
}
*b.dst = kv
}
@@ -475,6 +501,28 @@ func (s *jetstreamStore) RevokeUser(signPub string) error {
return nil
}
// DeleteUser hard-deletes a user from the KV allowlist (the purge counterpart of
// RevokeUser's status flip). It checks existence first so deleting an unknown key
// is ErrNotFound (KV Delete is otherwise idempotent and would not signal a miss).
// Only the allowlist key is removed; room memberships the ex-user holds become
// inert because they can no longer authenticate — see the SQLite DeleteUser for
// the full rationale on why room state is left untouched.
func (s *jetstreamStore) DeleteUser(signPub string) error {
signPub = normalizeSignPub(signPub)
ctx, cancel := s.ctx()
defer cancel()
if _, err := s.users.Get(ctx, signPub); err != nil {
if errors.Is(err, jetstream.ErrKeyNotFound) {
return fmt.Errorf("membership: delete user %q: %w", signPub, ErrNotFound)
}
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
}
if err := s.users.Delete(ctx, signPub); err != nil {
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
}
return nil
}
// IsAuthorized reports whether signPub is an active bus user. Any backend error
// (including a KV quorum loss or timeout) yields false: fail closed.
func (s *jetstreamStore) IsAuthorized(signPub string) bool {
@@ -510,6 +558,173 @@ func (s *jetstreamStore) HasAdmin() bool {
return false
}
// ---- invites (single-use registration tokens) ----------------------------
func (s *jetstreamStore) CreateInvite(handle, role string, ttlSecs int) (Invite, error) {
if handle == "" {
return Invite{}, fmt.Errorf("membership: CreateInvite: handle required")
}
role, err := validateInviteRole(role)
if err != nil {
return Invite{}, err
}
token, err := newInviteToken()
if err != nil {
return Invite{}, err
}
now := time.Now().UTC()
inv := Invite{
Token: token,
Handle: handle,
Role: role,
ExpiresAt: now.Add(inviteTTL(ttlSecs)).Format(time.RFC3339Nano),
Used: false,
CreatedAt: now.Format(time.RFC3339Nano),
}
b, err := json.Marshal(inv)
if err != nil {
return Invite{}, fmt.Errorf("membership: marshal invite: %w", err)
}
ctx, cancel := s.ctx()
defer cancel()
// Create (not Put) so a token collision is rejected rather than silently
// overwriting a live invite — a 32-byte random collision is astronomically
// unlikely, but Create makes the single-use guarantee unconditional.
if _, err := s.invites.Create(ctx, token, b); err != nil {
if errors.Is(err, jetstream.ErrKeyExists) {
return Invite{}, fmt.Errorf("membership: create invite: token collision")
}
return Invite{}, fmt.Errorf("membership: create invite: %w", err)
}
return inv, nil
}
func (s *jetstreamStore) GetInvite(token string) (Invite, error) {
ctx, cancel := s.ctx()
defer cancel()
e, err := s.invites.Get(ctx, token)
if err != nil {
if errors.Is(err, jetstream.ErrKeyNotFound) {
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, ErrNotFound)
}
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, err)
}
var inv Invite
if err := json.Unmarshal(e.Value(), &inv); err != nil {
return Invite{}, fmt.Errorf("membership: unmarshal invite: %w", err)
}
return inv, nil
}
func (s *jetstreamStore) ListInvites() ([]Invite, error) {
ctx, cancel := s.ctx()
w, err := s.invites.WatchAll(ctx, jetstream.IgnoreDeletes())
if err != nil {
cancel()
return nil, fmt.Errorf("membership: list invites: %w", err)
}
defer cancel()
defer w.Stop()
var out []Invite
for {
select {
case e := <-w.Updates():
if e == nil {
sort.Slice(out, func(i, j int) bool {
if out[i].CreatedAt != out[j].CreatedAt {
return out[i].CreatedAt > out[j].CreatedAt // newest first
}
return out[i].Token < out[j].Token
})
return out, nil
}
var inv Invite
if err := json.Unmarshal(e.Value(), &inv); err != nil {
return nil, fmt.Errorf("membership: unmarshal invite: %w", err)
}
out = append(out, inv)
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// ConsumeInvite spends a KV invite and registers the presented signing key. With
// no multi-key transaction, single-use is enforced by a compare-and-swap on the
// invite: the token is marked used via Update against the revision read by Get,
// so only ONE concurrent consumer can win the swap; the loser sees a revision
// mismatch and is rejected as used. The user is registered AFTER the successful
// swap. Burn-on-claim: if the signing key is already registered the swap has
// already spent the token and we surface ErrUserExists — the SQLite store commits
// the same way, so both backends behave identically.
func (s *jetstreamStore) ConsumeInvite(token, signPub, kexPub string) error {
signPub = normalizeSignPub(signPub)
kexPub = normalizeSignPub(kexPub)
if signPub == "" {
return fmt.Errorf("membership: ConsumeInvite: sign_pub required")
}
ctx, cancel := s.ctx()
defer cancel()
e, err := s.invites.Get(ctx, token)
if err != nil {
if errors.Is(err, jetstream.ErrKeyNotFound) {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrNotFound)
}
return fmt.Errorf("membership: consume invite %q: %w", token, err)
}
var inv Invite
if err := json.Unmarshal(e.Value(), &inv); err != nil {
return fmt.Errorf("membership: unmarshal invite: %w", err)
}
if inv.Used {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
}
if inviteIsExpired(inv.ExpiresAt) {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteExpired)
}
inv.Used = true
inv.UsedAt = nowRFC3339()
inv.UsedSignPub = signPub
inv.UsedKexPub = kexPub
b, err := json.Marshal(inv)
if err != nil {
return fmt.Errorf("membership: marshal invite: %w", err)
}
// CAS: Update only succeeds if the invite is still at the revision we read, so
// a racing consumer that already flipped it loses here. A failed swap is
// conservatively treated as "already used" (the common cause); the caller can
// re-read to learn the precise state.
if _, err := s.invites.Update(ctx, token, b, e.Revision()); err != nil {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
}
// Token is now spent. Register the user with the invite-fixed handle and role.
if err := s.AddUser(signPub, inv.Handle, inv.Role); err != nil {
if errors.Is(err, ErrUserExists) {
return ErrUserExists
}
return fmt.Errorf("membership: consume invite %q: register user: %w", token, err)
}
return nil
}
func (s *jetstreamStore) CancelInvite(token string) error {
ctx, cancel := s.ctx()
defer cancel()
if _, err := s.invites.Get(ctx, token); err != nil {
if errors.Is(err, jetstream.ErrKeyNotFound) {
return fmt.Errorf("membership: cancel invite %q: %w", token, ErrNotFound)
}
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
}
if err := s.invites.Delete(ctx, token); err != nil {
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
}
return nil
}
// ---- snapshot import / export (issue 0003c migration) ---------------------
// importSnapshot writes a full Snapshot into the KV buckets, preserving each
+28
View File
@@ -0,0 +1,28 @@
-- 003_invites.sql — single-use registration invites (issue: user accounts / wallet model).
--
-- An admin mints an invite so a brand-new identity can join the bus allowlist
-- WITHOUT the admin ever handling its private key. The token is the bearer
-- secret that authorizes POST /register: the registering client generates its
-- keypair locally and publishes only its public keys, fixing the link between an
-- invite and the identity it creates via the audit columns below. The handle and
-- role are fixed by the admin at mint time and cannot be changed by the client
-- (no privilege escalation).
--
-- Additive and idempotent: safe to apply repeatedly. Never modify this file;
-- further schema changes go in new numbered migrations (see
-- .claude/rules/db_migrations.md). The embedded copy under
-- pkg/membership/migrations/003_invites.sql mirrors this file byte-for-byte.
CREATE TABLE IF NOT EXISTS invites (
token TEXT PRIMARY KEY, -- 32 random bytes in lowercase hex (the bearer secret)
handle TEXT NOT NULL, -- handle the new user will get (fixed by admin)
role TEXT NOT NULL DEFAULT 'member', -- 'admin' | 'member' (fixed by admin)
expires_at TEXT NOT NULL, -- RFC3339; past this the invite is dead
used INTEGER NOT NULL DEFAULT 0, -- 0 pending, 1 consumed (single-use)
created_at TEXT NOT NULL,
used_at TEXT, -- RFC3339 when consumed (NULL until used)
used_sign_pub TEXT, -- Ed25519 key that consumed it (audit; NULL until used)
used_kex_pub TEXT -- X25519 key presented at registration (audit; NULL until used)
);
CREATE INDEX IF NOT EXISTS idx_invites_used ON invites(used);
+406 -14
View File
@@ -144,9 +144,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
now := time.Now()
// Per-IP rate limit runs first, ahead of auth and body reads, so a flood is
// shed at the cheapest possible point. The health probe is exempt so liveness
// checks are never throttled.
if !isAuthExempt(r) && !s.limiter.allow(clientIP(r), now) {
// shed at the cheapest possible point. ONLY the health probe is exempt so
// liveness checks are never throttled — note this is isRateExempt, NOT
// isAuthExempt: POST /register is auth-exempt (no admin signature) but stays
// rate-limited, since it is the one un-signed path that mutates the allowlist.
if !isRateExempt(r) && !s.limiter.allow(clientIP(r), now) {
writeErr(w, http.StatusTooManyRequests, "rate limit exceeded")
return
}
@@ -213,9 +215,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
return
}
// Carry the authenticated signer's endpoint into the handler so room handlers
// can authorize by membership (audit H3). Only set on a verified identity.
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint)))
// Carry the authenticated signer's endpoint AND signing key into the handler.
// Room handlers authorize by membership via the endpoint (audit H3); the
// user-management handlers authorize by role via the signing key (the endpoint
// id is a one-way hash of the key, so it cannot be reversed to look the signer
// up in the user allowlist). Both are set only on a verified identity.
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint, res.pubHex)))
}
// isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader
@@ -229,11 +234,19 @@ func isBodyTooLarge(err error) bool {
// values cannot collide with keys set by other packages.
type ctxKey int
const ctxSignerEndpoint ctxKey = iota
const (
ctxSignerEndpoint ctxKey = iota
ctxSignerPub
)
// withSigner returns a context carrying the authenticated signer's endpoint id.
func withSigner(ctx context.Context, endpoint string) context.Context {
return context.WithValue(ctx, ctxSignerEndpoint, endpoint)
// withSigner returns a context carrying the authenticated signer's endpoint id
// and signing public key (lowercase hex). The endpoint authorizes room
// membership; the signing key authorizes user-management by role, because the
// endpoint id is a one-way hash of the key (base64url(sha256(signPub))) and so
// cannot be reversed to look the signer up in the user allowlist.
func withSigner(ctx context.Context, endpoint, pubHex string) context.Context {
ctx = context.WithValue(ctx, ctxSignerEndpoint, endpoint)
return context.WithValue(ctx, ctxSignerPub, pubHex)
}
// signerEndpoint returns the authenticated signer's endpoint id and whether one
@@ -245,6 +258,16 @@ func signerEndpoint(r *http.Request) (string, bool) {
return v, ok && v != ""
}
// signerPubHex returns the authenticated signer's signing public key (lowercase
// hex) and whether one is present. Like signerEndpoint it is absent under
// AuthOff and on a soft-mode pass-through; the user-management handlers treat
// that absence as "no admin identity" and deny (default-deny), since a
// privilege-granting operation must never run without a verified admin.
func signerPubHex(r *http.Request) (string, bool) {
v, ok := r.Context().Value(ctxSignerPub).(string)
return v, ok && v != ""
}
// requireMember authorizes a room request by membership (audit H3): it returns
// the signer endpoint and true when the request may proceed, or writes 403 and
// returns false when an authenticated signer is not a member of roomID. When no
@@ -262,13 +285,54 @@ func (s *Server) requireMember(w http.ResponseWriter, r *http.Request, roomID st
return signer, true
}
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
// Only the unauthenticated health probe qualifies: it carries no data and is
// needed by load balancers / smoke checks / systemd before any identity exists.
func isAuthExempt(r *http.Request) bool {
// requireAdmin authorizes a user-management request: it returns the signer's
// signing-key hex and true ONLY when the authenticated signer is a user with
// role admin and active status; otherwise it writes 403 and returns false.
//
// Default-deny, with no dev relaxation: unlike requireMember (which allows a
// request when no authenticated signer is present, preserving AuthOff/dev
// behavior for room reads), this denies whenever the signer is absent or is not
// a verified active admin. The user-management endpoints grant and revoke bus
// access, so they must never be reachable without a verified admin identity —
// the store is consulted on every call so a just-revoked admin is denied
// immediately, and any store error fails closed.
func (s *Server) requireAdmin(w http.ResponseWriter, r *http.Request) (string, bool) {
pubHex, ok := signerPubHex(r)
if !ok {
writeErr(w, http.StatusForbidden, "forbidden: admin role required")
return "", false
}
u, err := s.store.GetUser(pubHex)
if err != nil || u.Role != RoleAdmin || u.Status != StatusActive {
writeErr(w, http.StatusForbidden, "forbidden: admin role required")
return "", false
}
return pubHex, true
}
// isRateExempt lists requests that bypass the per-IP rate limiter. Only the
// health probe qualifies: a load balancer / systemd / smoke check polls it and
// must never be throttled. Everything else — including POST /register — is rate
// limited.
func isRateExempt(r *http.Request) bool {
return r.Method == http.MethodGet && r.URL.Path == "/healthz"
}
// isAuthExempt lists requests that bypass control-plane signature auth even under
// enforce. Two qualify:
// - GET /healthz: carries no data, needed before any identity exists.
// - POST /register: the wallet-model join path. The registering identity is not
// yet in the allowlist, so it CANNOT produce an accepted admin signature;
// authorization is the single-use bearer invite token, validated inside the
// handler (ConsumeInvite). It stays rate-limited (see isRateExempt) and
// strictly validates the hex keys before spending the token.
func isAuthExempt(r *http.Request) bool {
if r.Method == http.MethodGet && r.URL.Path == "/healthz" {
return true
}
return r.Method == http.MethodPost && r.URL.Path == "/register"
}
func (s *Server) routes() {
s.mux.HandleFunc("GET /healthz", s.handleHealth)
s.mux.HandleFunc("POST /rooms", s.handleCreateRoom)
@@ -280,6 +344,23 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
s.mux.HandleFunc("POST /blobs", s.handlePutBlob)
s.mux.HandleFunc("GET /blobs/{hash}", s.handleGetBlob)
// User-management (admin-only) — the HTTP-signed equivalent of the local
// `membershipd user` CLI, so the admin panel manages the bus allowlist by
// signing as an admin instead of needing direct store/KV access. All three
// pass through requireAdmin; they hit the same store the room handlers do.
s.mux.HandleFunc("GET /users", s.handleListUsers)
s.mux.HandleFunc("POST /users", s.handleAddUser)
s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser)
// Hard-delete (purge) a user — distinct from revoke (status flip). Admin-only.
s.mux.HandleFunc("DELETE /users/{signpub}", s.handleDeleteUser)
// Invites — the wallet-model account-creation path. The admin mints a
// single-use link (POST /invites, admin-only); the new user's client redeems
// it without an admin signature (POST /register, token-authorized). Listing
// and cancelling a pending invite are admin-only.
s.mux.HandleFunc("POST /invites", s.handleCreateInvite)
s.mux.HandleFunc("GET /invites", s.handleListInvites)
s.mux.HandleFunc("DELETE /invites/{token}", s.handleCancelInvite)
s.mux.HandleFunc("POST /register", s.handleRegister)
}
// ---- wire types -----------------------------------------------------------
@@ -357,6 +438,67 @@ type blobResp struct {
Hash string `json:"hash"`
}
// userJSON is the wire representation of a bus user on the admin endpoints. It
// carries the full record the panel needs to render the allowlist, including
// status (so revoked users are visible) and the timestamps. revoked_at is
// omitted for an active user.
type userJSON struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
Status string `json:"status"`
CreatedAt string `json:"created_at"`
RevokedAt string `json:"revoked_at,omitempty"`
}
// addUserReq is the POST /users body: the new user's Ed25519 signing key
// (64-hex), human handle, and role. role is optional and defaults to member.
type addUserReq struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
}
// createInviteReq is the POST /invites body (admin-only): the handle and role the
// future user will receive (fixed here, NOT chosen by the registering client) and
// an optional TTL in seconds (non-positive uses the 7-day default).
type createInviteReq struct {
Handle string `json:"handle"`
Role string `json:"role"`
TTLSecs int `json:"ttl_secs"`
}
// createInviteResp is the POST /invites reply: the bearer token to put in the
// join link and its absolute expiry. The token is shown ONCE here; the admin
// copies the link immediately.
type createInviteResp struct {
Token string `json:"token"`
ExpiresAt string `json:"expires_at"`
}
// inviteJSON is the wire representation of a pending invite on GET /invites. It
// omits the audit fields (used_*) because the listing is of pending invites only;
// used_at is carried so a client can render "expires in N".
type inviteJSON struct {
Token string `json:"token"`
Handle string `json:"handle"`
Role string `json:"role"`
ExpiresAt string `json:"expires_at"`
Used bool `json:"used"`
CreatedAt string `json:"created_at"`
}
// registerReq is the POST /register body. It is the ONLY allowlist-mutating
// request that carries no admin signature: the bearer Token authorizes it. The
// client supplies its freshly-generated public keys (sign_pub = Ed25519 identity,
// kex_pub = X25519 key-exchange), both 64-hex. The handle and role come from the
// invite, never from this body — the client cannot escalate.
type registerReq struct {
Token string `json:"token"`
SignPub string `json:"sign_pub"`
KexPub string `json:"kex_pub"`
}
// ---- helpers --------------------------------------------------------------
func writeJSON(w http.ResponseWriter, code int, v any) {
@@ -674,3 +816,253 @@ func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
}
// ---- user-management handlers (admin-only) --------------------------------
// handleListUsers returns the full bus allowlist, including revoked users, so an
// admin sees the complete picture (a revoked identity stays auditable). Admin-only.
func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
users, err := s.store.ListUsers()
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
out := make([]userJSON, 0, len(users))
for _, u := range users {
out = append(out, userJSON{
SignPub: u.SignPub,
Handle: u.Handle,
Role: u.Role,
Status: u.Status,
CreatedAt: u.CreatedAt,
RevokedAt: u.RevokedAt,
})
}
writeJSON(w, http.StatusOK, out)
}
// handleAddUser registers a new bus user from an admin-supplied Ed25519 signing
// key. It mirrors the `membershipd user add` CLI: the key must be 64-hex, the
// role must be admin or member (empty defaults to member), and re-adding an
// already-registered key is a 409 that leaves the existing row untouched — no
// silent upsert that could flip a role or clobber status. Admin-only.
func (s *Server) handleAddUser(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
var req addUserReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return
}
if req.SignPub == "" || req.Handle == "" {
writeErr(w, http.StatusBadRequest, "sign_pub and handle required")
return
}
if err := ValidateSignPubHex(req.SignPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
role := req.Role
if role == "" {
role = RoleMember
}
if role != RoleAdmin && role != RoleMember {
writeErr(w, http.StatusBadRequest,
fmt.Sprintf("invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember))
return
}
if err := s.store.AddUser(req.SignPub, req.Handle, role); err != nil {
if errors.Is(err, ErrUserExists) {
// Idempotency contract (mirrors the CLI): re-adding a key is an explicit,
// non-destructive conflict. To replace a user, revoke then add again.
writeErr(w, http.StatusConflict,
"user already registered (unchanged); revoke it first to replace")
return
}
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusCreated, map[string]string{"status": "added"})
}
// handleRevokeUser revokes a bus user by signing key. Revocation is a status
// flip (no hard delete) so the identity stays auditable and IsAuthorized denies
// it on both planes immediately. Revoking an unknown or already-revoked key is a
// 404. Admin-only.
func (s *Server) handleRevokeUser(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
signPub := r.PathValue("signpub")
if err := ValidateSignPubHex(signPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
if err := s.store.RevokeUser(signPub); err != nil {
writeServerErr(w, r, http.StatusNotFound, "no active user with that key", err)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "revoked"})
}
// handleDeleteUser hard-deletes a bus user by signing key — the purge that the
// admin panel's "Eliminar" (permanent) action maps to, distinct from revoke's
// status flip. The row is removed entirely (no audit trail kept); use revoke when
// an auditable record must remain. Deleting an unknown key is a 404. Admin-only.
//
// Security note: like revoke, this does NOT special-case the last admin — an
// admin can delete the final admin and lock the HTTP user-management surface. The
// recovery seam is the local `membershipd user add` CLI (which re-seeds an admin
// directly against the store), the same chicken-egg breaker that seeds the first
// admin.
func (s *Server) handleDeleteUser(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
signPub := r.PathValue("signpub")
if err := ValidateSignPubHex(signPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
if err := s.store.DeleteUser(signPub); err != nil {
if errors.Is(err, ErrNotFound) {
writeErr(w, http.StatusNotFound, "no user with that key")
return
}
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "deleted"})
}
// ---- invite handlers ------------------------------------------------------
// handleCreateInvite mints a single-use registration invite. The handle and role
// are fixed here by the admin; the role is validated (admin|member, empty ->
// member) so an unknown role is a clean 400 rather than an opaque 500. The reply
// carries the bearer token and its expiry — the admin turns the token into the
// join link. Admin-only.
func (s *Server) handleCreateInvite(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
var req createInviteReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return
}
if req.Handle == "" {
writeErr(w, http.StatusBadRequest, "handle required")
return
}
if req.Role != "" && req.Role != RoleAdmin && req.Role != RoleMember {
writeErr(w, http.StatusBadRequest,
fmt.Sprintf("invalid role %q (want %q or %q)", req.Role, RoleAdmin, RoleMember))
return
}
inv, err := s.store.CreateInvite(req.Handle, req.Role, req.TTLSecs)
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusCreated, createInviteResp{Token: inv.Token, ExpiresAt: inv.ExpiresAt})
}
// handleListInvites returns the PENDING invites (not yet used and not expired), so
// the admin panel shows only live links worth copying. Consumed/expired invites
// are filtered out here rather than at the store, which exposes the full set for
// other callers. Admin-only.
func (s *Server) handleListInvites(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
invites, err := s.store.ListInvites()
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
out := make([]inviteJSON, 0, len(invites))
for _, inv := range invites {
if inv.Used || inviteIsExpired(inv.ExpiresAt) {
continue // pending only
}
out = append(out, inviteJSON{
Token: inv.Token,
Handle: inv.Handle,
Role: inv.Role,
ExpiresAt: inv.ExpiresAt,
Used: inv.Used,
CreatedAt: inv.CreatedAt,
})
}
writeJSON(w, http.StatusOK, out)
}
// handleCancelInvite cancels (hard-deletes) a pending invite, so an admin can
// revoke a link before it is redeemed. Cancelling an unknown token is a 404.
// Admin-only.
func (s *Server) handleCancelInvite(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
token := r.PathValue("token")
if token == "" {
writeErr(w, http.StatusBadRequest, "token required")
return
}
if err := s.store.CancelInvite(token); err != nil {
if errors.Is(err, ErrNotFound) {
writeErr(w, http.StatusNotFound, "no such invite")
return
}
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "cancelled"})
}
// handleRegister redeems an invite: the wallet-model join path. It is auth-exempt
// (no admin signature; see isAuthExempt) but rate-limited and strictly validated.
// The client presents the single-use token plus its freshly-generated public keys
// (sign_pub Ed25519, kex_pub X25519). Both keys are validated as 64-hex BEFORE the
// token is spent, the handle and role come from the invite (never this body), and
// ConsumeInvite enforces single-use atomically. Errors map to precise codes so a
// client can tell "unknown" from "used" from "expired".
func (s *Server) handleRegister(w http.ResponseWriter, r *http.Request) {
var req registerReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return
}
if req.Token == "" {
writeErr(w, http.StatusBadRequest, "token required")
return
}
if err := ValidateSignPubHex(req.SignPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
if err := ValidateKexPubHex(req.KexPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
err := s.store.ConsumeInvite(req.Token, req.SignPub, req.KexPub)
switch {
case err == nil:
writeJSON(w, http.StatusCreated, map[string]string{"status": "registered"})
case errors.Is(err, ErrNotFound):
writeErr(w, http.StatusNotFound, "invalid or unknown invite token")
case errors.Is(err, ErrInviteUsed):
writeErr(w, http.StatusConflict, "invite already used")
case errors.Is(err, ErrInviteExpired):
writeErr(w, http.StatusGone, "invite expired")
case errors.Is(err, ErrUserExists):
writeErr(w, http.StatusConflict, "identity already registered")
default:
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
}
}
+14
View File
@@ -80,9 +80,23 @@ type Store interface {
GetUser(signPub string) (User, error)
ListUsers() ([]User, error)
RevokeUser(signPub string) error
// DeleteUser hard-deletes a user (the purge counterpart of RevokeUser's
// status flip): the row is removed, not just flagged. The ex-user can no
// longer authenticate, so any room memberships they hold become inert.
DeleteUser(signPub string) error
IsAuthorized(signPub string) bool
HasAdmin() bool
// Invites (single-use registration tokens; the wallet-model join path).
// CreateInvite mints a token fixing handle+role; ConsumeInvite is the only
// path that adds to the allowlist without an admin signature (the bearer
// token is the authorization), spending the token exactly once.
CreateInvite(handle, role string, ttlSecs int) (Invite, error)
GetInvite(token string) (Invite, error)
ListInvites() ([]Invite, error)
ConsumeInvite(token, signPub, kexPub string) error
CancelInvite(token string) error
// Lifecycle.
Close() error
}
+67 -2
View File
@@ -2,6 +2,7 @@ package membership
import (
"database/sql"
"encoding/hex"
"errors"
"fmt"
"strings"
@@ -35,6 +36,40 @@ type User struct {
RevokedAt string // empty unless revoked
}
// ValidateSignPubHex ensures signPub is exactly a 32-byte Ed25519 public key in
// hex (64 hex chars). It is the single source of truth for that check, shared by
// the local admin CLI (which validates before seeding the first admin) and the
// HTTP user-management handlers (which validate an admin-supplied key before it
// reaches the store). Catching a malformed key here turns a silent "authorized
// nobody" into an explicit error at the boundary.
func ValidateSignPubHex(signPub string) error {
b, err := hex.DecodeString(signPub)
if err != nil {
return fmt.Errorf("sign-pub is not valid hex: %w", err)
}
if len(b) != 32 {
return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
}
return nil
}
// ValidateKexPubHex ensures kexPub is exactly a 32-byte X25519 public key in hex
// (64 hex chars). It is the registration-side counterpart of ValidateSignPubHex:
// POST /register receives both the new identity's signing key and its key-exchange
// key, and both must be well-formed before the invite is consumed. An X25519
// public key is 32 bytes, identical in length to Ed25519, so the check is the
// same shape with a key-exchange-specific message.
func ValidateKexPubHex(kexPub string) error {
b, err := hex.DecodeString(kexPub)
if err != nil {
return fmt.Errorf("kex-pub is not valid hex: %w", err)
}
if len(b) != 32 {
return fmt.Errorf("kex-pub must be a 32-byte X25519 public key (64 hex chars), got %d bytes", len(b))
}
return nil
}
// normalizeSignPub lowercases the hex key so lookups are case-insensitive: the
// primary key is stored lowercase and every query normalizes its input the same
// way, so a caller passing uppercase hex still matches.
@@ -72,8 +107,10 @@ func (s *sqliteStore) AddUser(signPub, handle, role string) error {
return nil
}
// GetUser returns the user with the given signing public key. It returns
// sql.ErrNoRows (wrapped) when there is no such user.
// GetUser returns the user with the given signing public key. A miss returns
// ErrNotFound (wrapped), matching the storage-agnostic contract in store.go and
// the JetStream backend, so callers can branch on ErrNotFound regardless of which
// store is active (the SQLite-specific sql.ErrNoRows is mapped here).
func (s *sqliteStore) GetUser(signPub string) (User, error) {
signPub = normalizeSignPub(signPub)
var u User
@@ -83,6 +120,9 @@ func (s *sqliteStore) GetUser(signPub string) (User, error) {
signPub,
).Scan(&u.SignPub, &u.Handle, &u.Role, &u.Status, &u.CreatedAt, &revoked)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, ErrNotFound)
}
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err)
}
u.RevokedAt = revoked.String
@@ -135,6 +175,31 @@ func (s *sqliteStore) RevokeUser(signPub string) error {
return nil
}
// DeleteUser hard-deletes a user from the allowlist (admin "remove user", the
// purge counterpart of RevokeUser's status flip). It removes ONLY the allowlist
// row: the ex-user can no longer authenticate on either plane, so any room
// memberships they still hold become inert (they cannot fetch a sealed key, sign
// a request, or open a NATS connection). We deliberately do NOT chase down and
// rewrite those room memberships here — that would be a partial, racy cleanup of
// state owned by each room's owner; a room owner kicks/rekeys to achieve forward
// secrecy when needed. Deleting an unknown key returns ErrNotFound (wrapped) so
// the HTTP layer can answer 404.
func (s *sqliteStore) DeleteUser(signPub string) error {
signPub = normalizeSignPub(signPub)
res, err := s.db.Exec(`DELETE FROM users WHERE sign_pub = ?`, signPub)
if err != nil {
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("membership: delete user %q: rows affected: %w", signPub, err)
}
if n == 0 {
return fmt.Errorf("membership: delete user %q: %w", signPub, ErrNotFound)
}
return nil
}
// IsAuthorized reports whether signPub belongs to an active (non-revoked) bus
// user. It is the single authorization predicate consulted by both the control
// plane (HTTP request middleware) and the data plane (NATS nkey authenticator),
+164
View File
@@ -0,0 +1,164 @@
package membership
import (
"encoding/hex"
"encoding/json"
"net/http"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
)
// signedJSON is signedReq for a JSON body: it marshals v and signs the request
// as id with a distinct nonce. It returns the response status and body, reusing
// the auth_test harness so these tests exercise the real signed wire contract.
func signedJSON(t *testing.T, h *authHarness, method, path string, v any, id cs.Identity, n int) (int, string) {
t.Helper()
var body []byte
if v != nil {
b, err := json.Marshal(v)
if err != nil {
t.Fatalf("marshal body: %v", err)
}
body = b
}
return do(t, signedReq(t, h.ts.URL, method, path, body, id, time.Now().Unix(), nonceN(n)))
}
// TestUsersHTTP_NonAdminForbidden is the security spine: a REGISTERED but
// non-admin signer (bob, role member) is denied on every user-management
// endpoint. His signature clears auth (he is in the allowlist), so each request
// reaches the handler, where requireAdmin returns 403 — default-deny by role.
func TestUsersHTTP_NonAdminForbidden(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, _ := cs.GenerateIdentity()
register(t, h, bob, "bob") // role member (see register in authz_test.go)
bobPub := hex.EncodeToString(bob.SignPub)
victim, _ := cs.GenerateIdentity()
victimPub := hex.EncodeToString(victim.SignPub)
checks := []struct {
name string
method string
path string
body any
}{
{"list users", "GET", "/users", nil},
{"add user", "POST", "/users", addUserReq{SignPub: victimPub, Handle: "mallory", Role: RoleMember}},
{"revoke user", "POST", "/users/" + bobPub + "/revoke", nil},
}
for i, c := range checks {
code, body := signedJSON(t, h, c.method, c.path, c.body, bob, i+1)
if code != http.StatusForbidden {
t.Fatalf("non-admin %s should be 403, got %d (%s)", c.name, code, body)
}
}
}
// TestUsersHTTP_AdminRoundtrip exercises the golden path end to end: alice (the
// seeded admin) adds carol, sees her in the list as active, revokes her, then
// sees her status flip to revoked (no hard delete — she stays in the list).
func TestUsersHTTP_AdminRoundtrip(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
carol, _ := cs.GenerateIdentity()
carolPub := hex.EncodeToString(carol.SignPub)
// Add carol as a member.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: carolPub, Handle: "carol", Role: RoleMember}, h.alice, 1); code != http.StatusCreated {
t.Fatalf("admin add carol should be 201, got %d (%s)", code, body)
}
// List: carol present and active; alice (the seed admin) also present.
users := listUsers(t, h, 2)
carolRow, ok := findUser(users, carolPub)
if !ok {
t.Fatalf("carol missing from list after add: %+v", users)
}
if carolRow.Status != StatusActive || carolRow.Role != RoleMember || carolRow.Handle != "carol" {
t.Fatalf("carol row wrong after add: %+v", carolRow)
}
if _, ok := findUser(users, h.alicePub); !ok {
t.Fatalf("seeded admin alice missing from list: %+v", users)
}
// Revoke carol.
if code, body := signedJSON(t, h, "POST", "/users/"+carolPub+"/revoke", nil, h.alice, 3); code != http.StatusOK {
t.Fatalf("admin revoke carol should be 200, got %d (%s)", code, body)
}
// List again: carol still present, now revoked (status flip, not delete).
users = listUsers(t, h, 4)
carolRow, ok = findUser(users, carolPub)
if !ok {
t.Fatalf("carol vanished from list after revoke (should be a status flip): %+v", users)
}
if carolRow.Status != StatusRevoked {
t.Fatalf("carol should be revoked, got status %q", carolRow.Status)
}
}
// TestUsersHTTP_Validation covers the input-validation contract: a malformed hex
// key is 400, an unknown role is 400, and re-adding an already-registered key is
// 409 (the existing row is left untouched — no silent upsert).
func TestUsersHTTP_Validation(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
good, _ := cs.GenerateIdentity()
goodPub := hex.EncodeToString(good.SignPub)
// Invalid hex (too short) -> 400.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: "abcd", Handle: "shorty", Role: RoleMember}, h.alice, 1); code != http.StatusBadRequest {
t.Fatalf("malformed sign_pub should be 400, got %d (%s)", code, body)
}
// Invalid role -> 400.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: goodPub, Handle: "weirdrole", Role: "superuser"}, h.alice, 2); code != http.StatusBadRequest {
t.Fatalf("invalid role should be 400, got %d (%s)", code, body)
}
// Re-adding the seeded admin's own key -> 409 (idempotency, no overwrite).
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: h.alicePub, Handle: "alice-again", Role: RoleMember}, h.alice, 3); code != http.StatusConflict {
t.Fatalf("re-adding an existing key should be 409, got %d (%s)", code, body)
}
// And the existing row is untouched: alice is still an active admin.
u, err := h.store.GetUser(h.alicePub)
if err != nil {
t.Fatalf("get alice after conflicting re-add: %v", err)
}
if u.Role != RoleAdmin || u.Status != StatusActive || u.Handle != "alice" {
t.Fatalf("conflicting re-add mutated the existing row: %+v", u)
}
}
// listUsers signs a GET /users as alice and decodes the response.
func listUsers(t *testing.T, h *authHarness, n int) []userJSON {
t.Helper()
code, body := signedJSON(t, h, "GET", "/users", nil, h.alice, n)
if code != http.StatusOK {
t.Fatalf("admin list users should be 200, got %d (%s)", code, body)
}
var users []userJSON
if err := json.Unmarshal([]byte(body), &users); err != nil {
t.Fatalf("decode users: %v (%s)", err, body)
}
return users
}
// findUser returns the row with the given signing key (case-insensitive).
func findUser(users []userJSON, signPub string) (userJSON, bool) {
want := normalizeSignPub(signPub)
for _, u := range users {
if normalizeSignPub(u.SignPub) == want {
return u, true
}
}
return userJSON{}, false
}