Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 19bb0e56a6 | |||
| 7e2f62520d | |||
| 52c80ac010 | |||
| ca801d16af | |||
| 18987bbd2f | |||
| d64b0c052d | |||
| f31580deec | |||
| 1c9325104c | |||
| b4f3118e85 | |||
| e9053169da | |||
| b983e43090 | |||
| b379730225 | |||
| 450ca01baf | |||
| e1a7402ff1 | |||
| ce72131ddf | |||
| 3aa5a2c9a9 | |||
| 02c2004ebd | |||
| ff580ac031 | |||
| 9fbff79df4 | |||
| 33746d9962 |
@@ -14,3 +14,7 @@ worker.id
|
||||
/chat
|
||||
*.exe
|
||||
registry.db
|
||||
|
||||
# local worktree resolution (do not commit)
|
||||
go.work
|
||||
go.work.sum
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
name: unibus
|
||||
lang: go
|
||||
domain: infra
|
||||
version: 0.8.0
|
||||
version: 0.12.0
|
||||
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
||||
tags: [service, messaging, nats, e2e]
|
||||
uses_functions:
|
||||
@@ -122,6 +122,41 @@ Para apuntar a un NATS externo en producción: `--nats-url nats://host:4222` en
|
||||
las rutas GET de lectura. Confía en la red interna. Las rutas mutantes
|
||||
(`/rooms`, `/invite`, `/rekey`) sí exigen firma Ed25519 del owner sobre los
|
||||
bytes canónicos de la request. Endurecer es fase posterior.
|
||||
- **Gestión de usuarios: storage unificado, alta por dos vías.** El allowlist de
|
||||
usuarios vive en el MISMO store que las rooms (`pkg/membership.Store`): SQLite en
|
||||
single-node, JetStream KV replicado (`UNIBUS_users`) en cluster. El `Server` ya
|
||||
tiene ese store privilegiado abierto (es quien sirve el KV en cada nodo), así que
|
||||
expone `GET/POST /users` y `POST /users/{signpub}/revoke` como API HTTP admin-only,
|
||||
simétrica con las rutas de rooms: el panel de administración firma como admin y el
|
||||
server ejecuta la mutación contra el mismo store. El panel NO necesita `--db`, ni la
|
||||
identidad interna, ni correr en un nodo del cluster; funciona idéntico en single-node
|
||||
y cluster. La autorización es default-deny: solo un firmante que el store confirma como
|
||||
`role == "admin"` activo pasa, cualquier otro recibe 403 (encima de la firma+nonce+TLS
|
||||
ya existentes). La CLI `membershipd user add --store kv` sigue existiendo SOLO para
|
||||
sembrar el admin #0 (bootstrap del huevo-gallina: sin un admin sembrado no hay quién
|
||||
firme el primer `POST /users`); a partir de ahí toda la gestión es HTTP admin-only. El
|
||||
alta es idempotente igual que la CLI: re-alta de una clave ya registrada = 409, sin
|
||||
sobrescribir ni elevar rol; el revoke es un flip de status (sin hard-delete), auditable.
|
||||
- **Cuentas estilo WhatsApp: alta por invitación, baja por hard-delete.** Sobre la API
|
||||
admin anterior, `unibus` añade el modelo wallet de cuentas. El admin NO genera claves:
|
||||
`POST /invites` (admin-only) acuña un enlace de invitación de un solo uso con caducidad
|
||||
(token de 32 bytes `crypto/rand` en hex; TTL default 7 días), fijando `handle` y `role`.
|
||||
El nuevo usuario abre el enlace en SU cliente, que genera el par de claves localmente
|
||||
(la privada nunca sale del dispositivo) y llama `POST /register` con `{token, sign_pub,
|
||||
kex_pub}`. `/register` es la ÚNICA ruta que añade al allowlist sin firma admin —
|
||||
autorizada por el TOKEN, porque la identidad nueva aún no está en el allowlist y no puede
|
||||
firmar. Está endurecida: token fuerte de un solo uso (consumo atómico, doble uso → 409),
|
||||
caducidad (→ 410), `handle`/`role` fijados por el invite (sin escalado), validación
|
||||
estricta de ambas claves hex de 64 chars, y rate-limit por IP heredado del control plane
|
||||
(solo `/healthz` está exento). El borrado de cuenta es `DELETE /users/{signpub}`
|
||||
(admin-only): hard-delete real del allowlist, distinto del `revoke` (que se mantiene:
|
||||
revoke = quitar acceso dejando rastro auditable; delete = purga). Tras hard-delete, las
|
||||
membresías de rooms del ex-usuario quedan inertes (ya no puede autenticarse en ningún
|
||||
plano); NO se limpian a medias — un owner expulsa/rekey su room si quiere forward secrecy.
|
||||
Invites y users viven en el MISMO store (SQLite `invites`/`users`, KV `UNIBUS_invites`/
|
||||
`UNIBUS_users`). `pkg/client` gana `CreateInvite/ListInvites/CancelInvite/Register/
|
||||
DeleteUser`; solo `Register` va sin firmar. Recovery: hard-delete del último admin se
|
||||
recupera con la CLI local `membershipd user add` (mismo seam que siembra el admin #0).
|
||||
- **Identidad = secreto crítico.** El archivo de identidad (`worker.id`,
|
||||
`chat.id`) contiene las claves privadas (Ed25519 + X25519). Se escribe 0600.
|
||||
Perderlo = mensajes ilegibles, sin recuperación. Trátalo como una clave SSH.
|
||||
@@ -154,6 +189,101 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v0.12.0 (2026-06-07) — capa de CUENTAS estilo WhatsApp sobre el modelo wallet: alta de
|
||||
usuario por enlace de invitación de un solo uso + baja por hard-delete real. El admin
|
||||
nunca ve la clave privada del usuario. (1) **Invites**: nuevo backend de datos en ambos
|
||||
stores (SQLite `invites` vía migración aditiva `003_invites.sql`; KV `UNIBUS_invites`).
|
||||
Tipo `Invite{Token, Handle, Role, ExpiresAt, Used, CreatedAt}` + campos de auditoría del
|
||||
consumo (`UsedAt/UsedSignPub/UsedKexPub`). Métodos `Store.CreateInvite` (token 32 bytes
|
||||
`crypto/rand` hex, TTL default 7d), `GetInvite`, `ListInvites`, `ConsumeInvite` (valida
|
||||
existe/no-usado/no-caducado → registra el sign_pub con el handle/role del invite → marca
|
||||
usado, atómico) y `CancelInvite`. Consumo single-use garantizado en ambos backends: tx
|
||||
SQLite (mark guard `used=0` + insert) y CAS sobre la revisión KV (mark-first); burn-on-
|
||||
claim idéntico si la clave ya existe. (2) **Hard-delete**: `Store.DeleteUser` (SQLite
|
||||
`DELETE FROM users`, KV `users.Delete`) purga el allowlist — distinto del `revoke`
|
||||
(status flip, conservado). Las membresías de rooms del ex-usuario quedan inertes
|
||||
(documentado, sin limpieza parcial). (3) **Endpoints HTTP**: `POST /invites`, `GET
|
||||
/invites` (solo pendientes), `DELETE /invites/{token}`, `DELETE /users/{signpub}`
|
||||
(todos admin-only vía `requireAdmin`) y `POST /register` — la única ruta auth-exempt de
|
||||
firma admin (autorizada por el token), rate-limited (se separa `isRateExempt`, solo
|
||||
`/healthz`, de `isAuthExempt`) y con validación hex estricta de `sign_pub`+`kex_pub`
|
||||
ANTES de gastar el token. Errores mapeados: token desconocido 404, usado 409, caducado
|
||||
410, identidad ya registrada 409. (4) **pkg/client**: `CreateInvite/ListInvites/
|
||||
CancelInvite/Register/DeleteUser`; `Register` va sin firma vía un helper `doUnsigned`.
|
||||
(5) Fix de consistencia: el `GetUser` de SQLite ahora mapea `sql.ErrNoRows` → `ErrNotFound`
|
||||
como el KV y como documenta `store.go`. Tests nuevos: suite de invites store-level en
|
||||
AMBOS backends (golden + single-use + token desconocido + caducado + cancel + hard-delete
|
||||
+ burn-on-claim), suite HTTP (crear invite → register sin auth → aparece en allowlist →
|
||||
re-register 409 → caducado 410 → no-admin 403 en las 4 rutas admin → hard-delete purga),
|
||||
y test de cliente end-to-end (admin acuña invite → joiner no-registrado redime sin firma →
|
||||
aparece → hard-delete desaparece). Cambios 100% aditivos: el comportamiento previo no
|
||||
cambia; build/vet/test verdes (`CGO_ENABLED=0`).
|
||||
- v0.11.0 (2026-06-07) — flag dedicado `UNIBUS_NATS_MONITOR` que abre el endpoint
|
||||
de monitoring HTTP del nats-server embebido (`127.0.0.1:8222`, loopback only) de
|
||||
forma DESACOPLADA del debug-log. Antes el monitoring solo se abría con
|
||||
`UNIBUS_NATS_DEBUG=1`, que además encendía el log verboso del nats-server
|
||||
(rutas/RAFT/subjects a journald en claro) — incompatible con el endurecimiento
|
||||
del issue 0007. El cómputo de los toggles se extrae a una función pura
|
||||
`natsLogOpts(debugEnv, monitorEnv) (noLog, debug, trace, monitor)`: `MONITOR=1`
|
||||
abre el endpoint dejando el log en silencio (`NoLog` true / `Debug` false), y se
|
||||
mantiene el acoplamiento inverso por compatibilidad (`DEBUG` sigue implicando
|
||||
`MONITOR`). El bind loopback `127.0.0.1` queda hardcoded — el monitoring NUNCA es
|
||||
público y no lleva auth; lo lee un scraper local que empuja a VictoriaMetrics
|
||||
(dashboard `unibus-nats` en `fleet_monitoring`). Se versiona el cableado de
|
||||
deploy: drop-in systemd aditivo `membershipd-cluster.service.d/nats-monitor.conf`
|
||||
(`Environment=UNIBUS_NATS_MONITOR=1`) + sección "NATS server metrics" en el
|
||||
README del cluster con el runbook de activación rolling (magnus→homer→datardos)
|
||||
y gate de reconvergencia R3 (`followers 2/2`) entre nodos. Tests nuevos: tabla
|
||||
pura del desacoplamiento (monitor on ⇒ log NO debug; debug ⇒ monitor; default
|
||||
cerrado) + server real con `MONITOR=1` que confirma `/varz` 200 en loopback:8222
|
||||
y server sin flag con el endpoint cerrado. Cambios 100% aditivos: sin el flag el
|
||||
comportamiento es idéntico; build/test verdes.
|
||||
- v0.10.0 (2026-06-07) — API HTTP admin-only de gestión de usuarios, cerrando la
|
||||
última asimetría del control plane: las rooms tenían superficie HTTP firmada
|
||||
(`POST /rooms`, etc.) pero los users solo se gestionaban por CLI local o acceso
|
||||
directo al store. Se añaden `GET /users` (lista completa, incluidos revocados),
|
||||
`POST /users` (alta `{sign_pub, handle, role}`: valida hex de 64 chars + role en
|
||||
`{admin, member}`, 409 idempotente que no sobrescribe ni eleva rol) y
|
||||
`POST /users/{signpub}/revoke` (flip de status, sin hard-delete). Los tres pasan por
|
||||
un helper `requireAdmin` default-deny que confirma contra el store que el firmante
|
||||
autenticado es un user `role == "admin"` activo (el endpoint id es un hash one-way de
|
||||
la clave, así que el contexto lleva ahora también el `sign_pub` hex del firmante para
|
||||
resolver `GetUser`); cualquier otro firmante recibe 403, encima de la firma+nonce+TLS+
|
||||
enforce ya heredadas del middleware. NO se abre conexión KV nueva ni se usa la identidad
|
||||
interna: el server escribe vía su `s.store` privilegiado, el MISMO que las rooms (SQLite
|
||||
single-node, KV `UNIBUS_users` en cluster). `pkg/client` gana `ListUsers/AddUser/RevokeUser`
|
||||
(tipo plano `UserInfo`) firmando como admin, así la pestaña Users del panel deja de
|
||||
necesitar `--db`/acceso KV directo. La CLI `membershipd user add --store kv` queda SOLO
|
||||
para sembrar el admin #0 (bootstrap). La validación de `sign_pub` se unifica en
|
||||
`membership.ValidateSignPubHex`, reusada por la CLI y los handlers. Tests nuevos:
|
||||
no-admin → 403 en los tres endpoints, roundtrip admin add→list→revoke, y validación
|
||||
(hex inválido → 400, role inválido → 400, re-alta → 409), más un test de cliente contra
|
||||
un membershipd embebido. Cambios 100% aditivos: el comportamiento single-node y de las
|
||||
rutas de rooms no cambia; vet/build/test verdes.
|
||||
- v0.9.0 (2026-06-07) — cierre de los gaps que el despliegue del cluster (report
|
||||
0011) dejó abiertos (report 0012). (GAP A) Nueva capability `membershipd user
|
||||
add|list|revoke --store kv`: alta/baja de usuarios contra el KV replicado del
|
||||
cluster EN MARCHA, sin el procedimiento de parar-sembrar-rearrancar. Usa la
|
||||
conexión interna privilegiada — el daemon persiste su identidad de servicio con
|
||||
`--internal-id-file` (cada nodo genera/carga la suya, 0600 junto a las claves TLS)
|
||||
y la CLI, ejecutada por loopback en un nodo, presenta esa nkey que el
|
||||
autenticador reconoce con permisos plenos de JetStream; ninguna identidad de
|
||||
usuario normal puede tocar los buckets `KV_UNIBUS_*` bajo la ACL por-subject. El
|
||||
alta es idempotente (re-alta de la misma clave = `ErrUserExists` explícito, sin
|
||||
sobrescribir ni elevar rol), commitea con quórum 2/3 (HA, imprime
|
||||
`followers_current`) y rechaza un destino remoto sin `--ca` (igual que
|
||||
`migrate-to-kv`). (GAP B) Nuevo `cmd/clientcheck`: verificación end-to-end real
|
||||
con un cliente autenticado (identidad operator, nkey+TLS+https) que crea una room
|
||||
E2E, publica y recibe descifrado contra el cluster vivo, incluido un nodo parado a
|
||||
media transmisión donde el cliente hace failover a un superviviente y sigue
|
||||
recibiendo con cero pérdida (quórum 2/3) — el plano de datos que el chaos test del
|
||||
0011 nunca probó. (GAP C) Runbook `deploy/cluster/README.md` corregido: el orden
|
||||
de arranque "magnus solo y verifica healthz" deadlockeaba (un nodo solo no tiene
|
||||
quórum del meta-group y nunca sirve healthz); se documenta el arranque por quórum,
|
||||
que R1 es un SPOF inservible (ir directo a R3) y la nueva vía de alta con el
|
||||
cluster vivo. La plantilla de deploy (unit + `deploy-cluster.sh`) emite ya
|
||||
`INTERNAL_ID_FILE` y el flag. Verificado contra los 3 VPS reales (magnus + homer +
|
||||
datardos); posture enforce+ACL+TLS+R3 intacta.
|
||||
- v0.8.0 (2026-06-07) — completar y endurecer el cluster (issue 0006, fases
|
||||
0006a–0006g) que cierra los bloqueantes de la auditoría dedicada del cluster
|
||||
(report 0008) y cablea el control plane descentralizado que 0003 dejó a medias.
|
||||
|
||||
@@ -0,0 +1,260 @@
|
||||
// Command clientcheck is an end-to-end verification client for a live unibus
|
||||
// cluster (issue 0011 GAP B). The 0011 chaos test validated only the control
|
||||
// plane (healthz + meta/stream-leader failover + KV readable with 2/3); it never
|
||||
// connected an authenticated bus client (nkey + TLS) to create a room and
|
||||
// publish/subscribe through it, least of all across a node loss. clientcheck does
|
||||
// exactly that with a real identity (the operator), so the data-plane end-to-end
|
||||
// path — connect, create an E2E room, publish, receive decrypted — is exercised
|
||||
// against the running cluster, including while a node is stopped.
|
||||
//
|
||||
// It is a reusable tool, not a throwaway script: point it at the cluster's CA,
|
||||
// an identity file, and the NATS + control-plane seed lists.
|
||||
//
|
||||
// # golden: connect, create an E2E room, publish N, confirm N decrypted back
|
||||
// clientcheck --ca ca.crt --identity-file operator.id \
|
||||
// --nats-seeds nats://A:4250,nats://B:4250,nats://C:4250 \
|
||||
// --ctrl-seeds https://A:8470,https://B:8470,https://C:8470 --messages 5
|
||||
//
|
||||
// # loop: publish a counter every interval for the duration, logging the node
|
||||
// # it is attached to — stop a node mid-run (systemctl stop membershipd-cluster)
|
||||
// # and watch it fail over to a survivor and keep receiving (quorum 2/3).
|
||||
// clientcheck ... --mode loop --duration 45s --interval 1s
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/room"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
caPath = flag.String("ca", "", "bus CA cert pinning TLS on both planes (required for a secured cluster)")
|
||||
idFile = flag.String("identity-file", "", "path to the client identity JSON (e.g. `pass show unibus/operator-identity` written 0600) (required)")
|
||||
natsSeeds = flag.String("nats-seeds", "", "comma-separated NATS urls of the cluster nodes (required)")
|
||||
ctrlSeeds = flag.String("ctrl-seeds", "", "comma-separated control-plane https urls of the cluster nodes (required)")
|
||||
subject = flag.String("subject", "test.gapcheck", "test room subject PREFIX; a random token is appended so runs never collide with real rooms")
|
||||
messages = flag.Int("messages", 5, "golden mode: number of messages to publish and expect back")
|
||||
mode = flag.String("mode", "golden", "golden (publish N, verify N decrypted) | loop (publish a counter for --duration, for failover testing)")
|
||||
duration = flag.Duration("duration", 30*time.Second, "loop mode: how long to keep publishing")
|
||||
interval = flag.Duration("interval", 1*time.Second, "loop mode: delay between published messages")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
if *idFile == "" || *natsSeeds == "" || *ctrlSeeds == "" {
|
||||
log.Fatalf("clientcheck: --identity-file, --nats-seeds and --ctrl-seeds are required")
|
||||
}
|
||||
|
||||
id, err := client.LoadIdentity(*idFile)
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: load identity: %v", err)
|
||||
}
|
||||
natsList := splitCSV(*natsSeeds)
|
||||
ctrlList := splitCSV(*ctrlSeeds)
|
||||
if len(natsList) == 0 || len(ctrlList) == 0 {
|
||||
log.Fatalf("clientcheck: empty --nats-seeds or --ctrl-seeds")
|
||||
}
|
||||
|
||||
// Build the secure client options: nkey on the data plane, TLS pinned to the
|
||||
// bus CA on both planes, and the FULL seed lists so nats.go fails over to a
|
||||
// surviving node when the attached one dies (the failover this tool verifies).
|
||||
opts := client.Options{
|
||||
NatsServers: natsList[1:],
|
||||
CtrlURLs: ctrlList[1:],
|
||||
}
|
||||
if *caPath != "" {
|
||||
tlsCfg, err := busauth.LoadCATLSConfig(*caPath)
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: load CA: %v", err)
|
||||
}
|
||||
opts.UseNkey = true
|
||||
opts.TLS = tlsCfg
|
||||
opts.CtrlTLS = tlsCfg
|
||||
for _, u := range ctrlList {
|
||||
if !strings.HasPrefix(u, "https://") {
|
||||
log.Fatalf("clientcheck: control URL %q must be https:// when --ca is set", u)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c, err := client.NewWithOptions(natsList[0], ctrlList[0], id, opts)
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: connect: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
log.Printf("connected: endpoint=%s nats=%s", c.Endpoint().ID, c.ConnectedServer())
|
||||
|
||||
// Create an EPHEMERAL E2E room (encrypted + signed, NOT persisted): the test
|
||||
// stays end-to-end encrypted (the cluster requires encryption on a public
|
||||
// bind) while leaving no durable JetStream stream behind. The random subject
|
||||
// token guarantees the room is unique and never a real room.
|
||||
rnd := make([]byte, 8)
|
||||
if _, err := rand.Read(rnd); err != nil {
|
||||
log.Fatalf("clientcheck: random: %v", err)
|
||||
}
|
||||
subj := fmt.Sprintf("%s.%s", *subject, hex.EncodeToString(rnd))
|
||||
policy := room.Policy{Encrypt: true, Persist: false, SignMsgs: true}
|
||||
roomID, err := c.CreateRoom(subj, policy)
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: create room: %v", err)
|
||||
}
|
||||
log.Printf("created E2E room: id=%s subject=%s (encrypt=%v sign=%v persist=%v)", roomID, subj, policy.Encrypt, policy.SignMsgs, policy.Persist)
|
||||
|
||||
// Under the per-subject ACL, NATS freezes permissions at connect time, so the
|
||||
// just-created room's subject is not yet publishable/subscribable on the live
|
||||
// connection. RefreshSession reconnects so the authenticator re-derives the
|
||||
// ACL (now including this room) — the post-0006 contract every client follows
|
||||
// after a membership change.
|
||||
if err := c.RefreshSession(); err != nil {
|
||||
log.Fatalf("clientcheck: refresh session: %v", err)
|
||||
}
|
||||
|
||||
switch *mode {
|
||||
case "golden":
|
||||
runGolden(c, roomID, *messages)
|
||||
case "loop":
|
||||
runLoop(c, roomID, *duration, *interval)
|
||||
default:
|
||||
log.Fatalf("clientcheck: --mode must be golden or loop, got %q", *mode)
|
||||
}
|
||||
}
|
||||
|
||||
// runGolden subscribes, publishes n messages, and asserts all n come back
|
||||
// decrypted. Exits non-zero if any are missing.
|
||||
func runGolden(c *client.Client, roomID string, n int) {
|
||||
var mu sync.Mutex
|
||||
got := map[string]bool{}
|
||||
sub, err := c.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
|
||||
mu.Lock()
|
||||
got[string(plaintext)] = true
|
||||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(300 * time.Millisecond) // let the subscription settle
|
||||
|
||||
want := make([]string, n)
|
||||
for i := 0; i < n; i++ {
|
||||
msg := fmt.Sprintf("gapcheck-e2e-%d", i)
|
||||
want[i] = msg
|
||||
if err := c.Publish(roomID, []byte(msg)); err != nil {
|
||||
log.Fatalf("clientcheck: publish %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
log.Printf("published %d messages to %s; waiting for decrypted echoes...", n, roomID)
|
||||
|
||||
deadline := time.Now().Add(15 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
mu.Lock()
|
||||
have := len(got)
|
||||
mu.Unlock()
|
||||
if have >= n {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
missing := 0
|
||||
for _, w := range want {
|
||||
if !got[w] {
|
||||
missing++
|
||||
log.Printf(" MISSING: %q", w)
|
||||
}
|
||||
}
|
||||
log.Printf("connected node at finish: %s", c.ConnectedServer())
|
||||
if missing > 0 {
|
||||
log.Fatalf("GOLDEN FAIL: %d/%d messages not received decrypted", missing, n)
|
||||
}
|
||||
log.Printf("GOLDEN OK: all %d messages received and decrypted end-to-end", n)
|
||||
}
|
||||
|
||||
// runLoop publishes a numbered message every interval for the duration and logs
|
||||
// the count received plus the node currently attached, so an operator stopping a
|
||||
// cluster node mid-run sees the client fail over to a survivor and keep receiving
|
||||
// (quorum 2/3). It is the live failover-with-a-connected-client test the 0011
|
||||
// chaos run never performed.
|
||||
func runLoop(c *client.Client, roomID string, duration, interval time.Duration) {
|
||||
var mu sync.Mutex
|
||||
received := 0
|
||||
servers := map[string]int{} // node -> #ticks observed attached
|
||||
sub, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) {
|
||||
mu.Lock()
|
||||
received++
|
||||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("clientcheck: subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
log.Printf("loop: publishing every %s for %s — stop a node now to test failover", interval, duration)
|
||||
end := time.Now().Add(duration)
|
||||
sent := 0
|
||||
for time.Now().Before(end) {
|
||||
msg := fmt.Sprintf("gapcheck-loop-%d", sent)
|
||||
err := c.Publish(roomID, []byte(msg))
|
||||
sent++
|
||||
mu.Lock()
|
||||
recv := received
|
||||
mu.Unlock()
|
||||
node := c.ConnectedServer()
|
||||
up := c.IsConnected()
|
||||
if node != "" {
|
||||
mu.Lock()
|
||||
servers[node]++
|
||||
mu.Unlock()
|
||||
}
|
||||
pubStatus := "ok"
|
||||
if err != nil {
|
||||
pubStatus = "ERR:" + err.Error()
|
||||
}
|
||||
log.Printf(" t=%2ds sent=%d recv=%d up=%v node=%s publish=%s",
|
||||
sent, sent, recv, up, node, pubStatus)
|
||||
time.Sleep(interval)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
log.Printf("loop done: sent=%d received=%d", sent, received)
|
||||
nodes := make([]string, 0, len(servers))
|
||||
for n := range servers {
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
sort.Strings(nodes)
|
||||
for _, n := range nodes {
|
||||
log.Printf(" attached to %s for %d ticks", n, servers[n])
|
||||
}
|
||||
if len(servers) > 1 {
|
||||
log.Printf("FAILOVER OBSERVED: client was attached to %d distinct nodes across the run", len(servers))
|
||||
}
|
||||
if received == 0 {
|
||||
log.Fatalf("LOOP FAIL: received 0 messages")
|
||||
}
|
||||
log.Printf("LOOP OK: client kept receiving across the run (received=%d)", received)
|
||||
}
|
||||
|
||||
func splitCSV(s string) []string {
|
||||
var out []string
|
||||
for _, p := range strings.Split(s, ",") {
|
||||
if p = strings.TrimSpace(p); p != "" {
|
||||
out = append(out, p)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
package main
|
||||
|
||||
// Integration tests for issue 0011 GAP A: `membershipd user add --store kv`
|
||||
// adds users to a RUNNING cluster's replicated allowlist via the privileged
|
||||
// internal connection, instead of the stop-seed-restart procedure the 0011
|
||||
// deploy required. These exercise the real connectKVStore path (load the
|
||||
// persisted internal identity from a file, present its nkey, open the KV store,
|
||||
// write the user) against an embedded enforce node, plus the idempotency and
|
||||
// error semantics the DoD calls for. Multi-node replication and node-down quorum
|
||||
// are validated against the live cluster (report 0012).
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// startEnforceKVNode boots a single embedded enforce node whose authenticator
|
||||
// recognizes internalPubHex as the privileged internal identity, bootstraps the
|
||||
// KV control-plane store over the in-process internal connection, and publishes
|
||||
// it into the holder — the exact sequence main.go performs for --store kv. It
|
||||
// returns the client URL the CLI connects to.
|
||||
func startEnforceKVNode(t *testing.T, internalID cs.Identity) string {
|
||||
t.Helper()
|
||||
holder := &storeHolder{}
|
||||
auth := busauth.NewNkeyAuthenticatorACLInternal(
|
||||
holder.IsAuthorized,
|
||||
busauth.PermissionsFromSubjects(holder.subjectACL),
|
||||
hex.EncodeToString(internalID.SignPub),
|
||||
)
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("start enforce node: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
|
||||
intNC, js, err := connectInternalJS(ns, internalID, true)
|
||||
if err != nil {
|
||||
t.Fatalf("bootstrap internal connection: %v", err)
|
||||
}
|
||||
t.Cleanup(intNC.Close)
|
||||
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
|
||||
if err != nil {
|
||||
t.Fatalf("bootstrap KV store: %v", err)
|
||||
}
|
||||
holder.set(kvStore)
|
||||
return ns.ClientURL()
|
||||
}
|
||||
|
||||
// TestUserAddStoreKV_GoldenAndIdempotent is the GAP A golden + edge-1: the CLI
|
||||
// connection (real connectKVStore, loading the internal identity from a file and
|
||||
// presenting its nkey) writes a user into the live KV allowlist, the user is
|
||||
// authorized afterward, and re-adding the same key is an explicit ErrUserExists
|
||||
// with no corruption (the unchanged row is still authorized).
|
||||
func TestUserAddStoreKV_GoldenAndIdempotent(t *testing.T) {
|
||||
idFile := filepath.Join(t.TempDir(), "internal.id")
|
||||
internalID, err := client.LoadOrCreateIdentity(idFile) // persists 0600
|
||||
if err != nil {
|
||||
t.Fatalf("persist internal identity: %v", err)
|
||||
}
|
||||
url := startEnforceKVNode(t, internalID)
|
||||
|
||||
// Golden: connect as the privileged internal identity (loopback, no TLS) and
|
||||
// add a new user, exactly as `user add --store kv` does.
|
||||
kv, err := connectKVStore(url, idFile, "", 1)
|
||||
if err != nil {
|
||||
t.Fatalf("connectKVStore (privileged): %v", err)
|
||||
}
|
||||
defer kv.Close()
|
||||
|
||||
newUser, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("new user identity: %v", err)
|
||||
}
|
||||
pub := hex.EncodeToString(newUser.SignPub)
|
||||
if err := kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember); err != nil {
|
||||
t.Fatalf("add user to live KV: %v", err)
|
||||
}
|
||||
if !kv.store.IsAuthorized(pub) {
|
||||
t.Fatalf("user added to KV must be authorized")
|
||||
}
|
||||
|
||||
// Edge 1: re-adding the same key is a clean, non-destructive ErrUserExists.
|
||||
err = kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember)
|
||||
if !errors.Is(err, membership.ErrUserExists) {
|
||||
t.Fatalf("re-add must return ErrUserExists (idempotent), got %v", err)
|
||||
}
|
||||
// A different handle/role with the SAME key is also rejected — the row is not
|
||||
// silently overwritten (no role flip).
|
||||
if err := kv.store.AddUser(pub, "impostor", membership.RoleAdmin); !errors.Is(err, membership.ErrUserExists) {
|
||||
t.Fatalf("re-add with a different role must NOT overwrite; want ErrUserExists, got %v", err)
|
||||
}
|
||||
u, err := kv.store.GetUser(pub)
|
||||
if err != nil {
|
||||
t.Fatalf("get user: %v", err)
|
||||
}
|
||||
if u.Handle != "gapcheck_user" || u.Role != membership.RoleMember || u.Status != membership.StatusActive {
|
||||
t.Fatalf("idempotent re-add corrupted the row: %+v", u)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUserAddStoreKV_RequiresInternalIdentity: --store kv without a usable
|
||||
// internal identity file fails loudly (missing file, empty path) rather than
|
||||
// silently connecting unprivileged.
|
||||
func TestUserAddStoreKV_RequiresInternalIdentity(t *testing.T) {
|
||||
if _, err := connectKVStore("nats://127.0.0.1:4250", "", "", 1); err == nil {
|
||||
t.Fatalf("empty --internal-id-file must be an error")
|
||||
}
|
||||
missing := filepath.Join(t.TempDir(), "nope.id")
|
||||
if _, err := connectKVStore("nats://127.0.0.1:4250", missing, "", 1); err == nil {
|
||||
t.Fatalf("missing internal identity file must be an error")
|
||||
}
|
||||
}
|
||||
|
||||
// TestUserAddStoreKV_UnreachableKV is the GAP A error case: pointing --store kv
|
||||
// at a dead endpoint yields a clear, handled error (no crash, no silent success).
|
||||
func TestUserAddStoreKV_UnreachableKV(t *testing.T) {
|
||||
idFile := filepath.Join(t.TempDir(), "internal.id")
|
||||
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
|
||||
t.Fatalf("persist internal identity: %v", err)
|
||||
}
|
||||
// A loopback port with nothing listening: connect must fail fast and wrapped.
|
||||
_, err := connectKVStore("nats://127.0.0.1:1/", idFile, "", 1)
|
||||
if err == nil {
|
||||
t.Fatalf("connecting to a dead endpoint must error")
|
||||
}
|
||||
}
|
||||
|
||||
// TestUserAddStoreKV_RemoteWithoutCARefused: a non-loopback target without --ca
|
||||
// is refused so the allowlist write never travels in cleartext (audit 0008 N6,
|
||||
// same guard as migrate-to-kv).
|
||||
func TestUserAddStoreKV_RemoteWithoutCARefused(t *testing.T) {
|
||||
idFile := filepath.Join(t.TempDir(), "internal.id")
|
||||
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
|
||||
t.Fatalf("persist internal identity: %v", err)
|
||||
}
|
||||
_, err := connectKVStore("nats://203.0.113.1:4250", idFile, "", 1)
|
||||
if err == nil {
|
||||
t.Fatalf("remote target without --ca must be refused")
|
||||
}
|
||||
}
|
||||
+27
-3
@@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
@@ -83,6 +84,17 @@ func main() {
|
||||
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
|
||||
// in the cluster serves the same state.
|
||||
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
|
||||
// Persisted internal service identity (issue 0011 gaps, GAP A): when set, the
|
||||
// privileged internal identity used to manage JetStream is LOADED from this
|
||||
// file (generated and persisted on first start) instead of being a fresh
|
||||
// ephemeral key each boot. Persisting it is what lets `membershipd user add
|
||||
// --store kv` write the replicated allowlist of a LIVE cluster: that CLI,
|
||||
// run over loopback on a node, loads the SAME identity and presents the nkey
|
||||
// this node's authenticator already grants full permissions. Empty keeps the
|
||||
// ephemeral-per-process behavior (single-node/dev default, unchanged). The
|
||||
// file holds a private key: it is written 0600 and belongs next to the node's
|
||||
// TLS keys (deploy keeps it under secrets/, gitignored).
|
||||
internalIDFile = flag.String("internal-id-file", "", "path to a persisted internal service identity (JSON); enables `membershipd user add --store kv` against the live cluster. Empty = ephemeral per-process identity (dev default)")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
@@ -136,9 +148,21 @@ func main() {
|
||||
var internalID cs.Identity
|
||||
var internalPubHex string
|
||||
if needJS && enforce && *natsURL == "" {
|
||||
internalID, err = cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
log.Fatalf("generate internal identity: %v", err)
|
||||
if *internalIDFile != "" {
|
||||
// Persisted identity: load it, generating + writing it (0600) on first
|
||||
// start. A stable internal key is what `user add --store kv` presents to
|
||||
// add users to a live cluster (GAP A); rotate it by deleting the file and
|
||||
// restarting.
|
||||
internalID, err = client.LoadOrCreateIdentity(*internalIDFile)
|
||||
if err != nil {
|
||||
log.Fatalf("load internal service identity %q: %v", *internalIDFile, err)
|
||||
}
|
||||
log.Printf("internal service identity: persisted (%s)", *internalIDFile)
|
||||
} else {
|
||||
internalID, err = cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
log.Fatalf("generate internal identity: %v", err)
|
||||
}
|
||||
}
|
||||
internalPubHex = hex.EncodeToString(internalID.SignPub)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -50,13 +50,26 @@ commands:
|
||||
list List all registered users
|
||||
revoke Revoke a user (denies access on both planes immediately)
|
||||
|
||||
store backends (--store):
|
||||
sqlite local SQLite database (default; seeds the first admin offline)
|
||||
kv the RUNNING cluster's replicated JetStream KV allowlist, via the
|
||||
privileged internal connection — add users with the cluster live,
|
||||
no stop-seed-restart needed (run over loopback/SSH on a node)
|
||||
|
||||
examples:
|
||||
membershipd user add --handle alice --sign-pub <64-hex> --role admin
|
||||
membershipd user list
|
||||
membershipd user add --store kv --handle bob --sign-pub <64-hex> --role member
|
||||
membershipd user list --store kv
|
||||
membershipd user revoke <64-hex>
|
||||
|
||||
common flags:
|
||||
--db <path> SQLite database path (default ./local_files/unibus.db)
|
||||
--db <path> SQLite database path (--store sqlite; default ./local_files/unibus.db)
|
||||
|
||||
--store kv flags (defaults assume an on-node invocation):
|
||||
--nats-url <url> cluster NATS (default nats://127.0.0.1:4250)
|
||||
--internal-id-file <path> persisted internal service identity (default /opt/unibus/secrets/internal.id)
|
||||
--ca <path> CA cert pinning the data-plane TLS (default /opt/unibus/tls/ca.crt)
|
||||
--kv-replicas <n> KV replication factor, match the cluster (default 3)
|
||||
`)
|
||||
}
|
||||
|
||||
@@ -76,16 +89,56 @@ func openStore(path string) membership.Store {
|
||||
|
||||
// validateSignPubHex ensures the key is exactly a 32-byte Ed25519 public key in
|
||||
// hex (64 hex chars). Catching this here turns a silent "authorized nobody" into
|
||||
// an explicit error at seed time.
|
||||
// an explicit error at seed time. It delegates to membership.ValidateSignPubHex
|
||||
// so the CLI and the HTTP user-management handlers share one rule.
|
||||
func validateSignPubHex(signPub string) error {
|
||||
b, err := hex.DecodeString(signPub)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sign-pub is not valid hex: %w", err)
|
||||
return membership.ValidateSignPubHex(signPub)
|
||||
}
|
||||
|
||||
// kvFlags holds the connection flags shared by the --store kv path of the user
|
||||
// subcommands. registerKVFlags wires them onto a flag set so add and list expose
|
||||
// an identical interface.
|
||||
type kvFlags struct {
|
||||
store *string
|
||||
natsURL *string
|
||||
internalID *string
|
||||
ca *string
|
||||
replicas *int
|
||||
}
|
||||
|
||||
func registerKVFlags(fs *flag.FlagSet) kvFlags {
|
||||
return kvFlags{
|
||||
store: fs.String("store", "sqlite", "user store backend: sqlite (local DB) | kv (the live cluster's replicated allowlist)"),
|
||||
natsURL: fs.String("nats-url", defaultClusterNatsURL, "cluster NATS url for --store kv"),
|
||||
internalID: fs.String("internal-id-file", defaultInternalIDFile, "persisted internal service identity for --store kv"),
|
||||
ca: fs.String("ca", defaultClusterCAFile, "CA cert pinning TLS on the --store kv NATS connection"),
|
||||
replicas: fs.Int("kv-replicas", 3, "KV replication factor for --store kv (match the cluster)"),
|
||||
}
|
||||
if len(b) != 32 {
|
||||
return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
|
||||
}
|
||||
|
||||
// resolveStore returns the membership store for the chosen backend plus a cleanup
|
||||
// func. For --store kv it opens the privileged connection to the live cluster; for
|
||||
// sqlite it opens the local file. It exits the process with a clear message on any
|
||||
// failure (a dead NATS, a missing identity file), so a broken --store kv add fails
|
||||
// loudly instead of silently — Error case of the GAP A DoD. The returned *kvConn
|
||||
// is non-nil only for the kv backend (so the caller can report replication).
|
||||
func resolveStore(cmd string, kf kvFlags, dbPath string) (membership.Store, *kvConn, func()) {
|
||||
switch *kf.store {
|
||||
case "sqlite":
|
||||
store := openStore(dbPath)
|
||||
return store, nil, func() { store.Close() }
|
||||
case "kv":
|
||||
kv, err := connectKVStore(*kf.natsURL, *kf.internalID, *kf.ca, *kf.replicas)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "membershipd %s: --store kv: %v\n", cmd, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
return kv.store, kv, kv.Close
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "membershipd %s: --store must be \"sqlite\" or \"kv\", got %q\n", cmd, *kf.store)
|
||||
os.Exit(2)
|
||||
return nil, nil, func() {}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func userAdd(args []string) {
|
||||
@@ -94,6 +147,7 @@ func userAdd(args []string) {
|
||||
signPub := fs.String("sign-pub", "", "Ed25519 signing public key in hex (required)")
|
||||
role := fs.String("role", membership.RoleMember, "role: admin or member")
|
||||
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
|
||||
kf := registerKVFlags(fs)
|
||||
_ = fs.Parse(args)
|
||||
|
||||
if *handle == "" || *signPub == "" {
|
||||
@@ -105,23 +159,35 @@ func userAdd(args []string) {
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
store := openStore(*dbPath)
|
||||
defer store.Close()
|
||||
store, kv, closeStore := resolveStore("user add", kf, *dbPath)
|
||||
defer closeStore()
|
||||
|
||||
if err := store.AddUser(*signPub, *handle, *role); err != nil {
|
||||
if errors.Is(err, membership.ErrUserExists) {
|
||||
// Idempotency contract (GAP A): re-adding the same key is an EXPLICIT,
|
||||
// non-destructive error — the existing row is left untouched (no silent
|
||||
// upsert that could flip a role or clobber status, which would corrupt the
|
||||
// allowlist). To replace a user, `user revoke <key>` then add again.
|
||||
fmt.Fprintf(os.Stderr, "membershipd user add: user %s already registered (unchanged); revoke it first to replace\n", *signPub)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "membershipd user add: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("added user %q (%s) role=%s\n", *handle, *signPub, *role)
|
||||
if kv != nil {
|
||||
reportKVReplication(kv.js)
|
||||
}
|
||||
}
|
||||
|
||||
func userList(args []string) {
|
||||
fs := flag.NewFlagSet("user list", flag.ExitOnError)
|
||||
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
|
||||
kf := registerKVFlags(fs)
|
||||
_ = fs.Parse(args)
|
||||
|
||||
store := openStore(*dbPath)
|
||||
defer store.Close()
|
||||
store, _, closeStore := resolveStore("user list", kf, *dbPath)
|
||||
defer closeStore()
|
||||
|
||||
users, err := store.ListUsers()
|
||||
if err != nil {
|
||||
@@ -143,6 +209,7 @@ func userList(args []string) {
|
||||
func userRevoke(args []string) {
|
||||
fs := flag.NewFlagSet("user revoke", flag.ExitOnError)
|
||||
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
|
||||
kf := registerKVFlags(fs)
|
||||
|
||||
// Go's flag package stops at the first non-flag argument, so `revoke <key>
|
||||
// --db path` would otherwise leave --db unparsed. Pull a leading positional
|
||||
@@ -167,8 +234,8 @@ func userRevoke(args []string) {
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
store := openStore(*dbPath)
|
||||
defer store.Close()
|
||||
store, _, closeStore := resolveStore("user revoke", kf, *dbPath)
|
||||
defer closeStore()
|
||||
|
||||
if err := store.RevokeUser(signPub); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "membershipd user revoke: %v\n", err)
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/busauth"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// users_kv.go is the `--store kv` half of the user administration CLI (issue 0011
|
||||
// gaps, GAP A): adding and listing bus users directly against the RUNNING
|
||||
// cluster's replicated JetStream KV allowlist, with no need to stop the cluster,
|
||||
// seed a standalone node, and restart (the procedure the 0011 deploy required).
|
||||
//
|
||||
// The mechanism is the cluster's own privileged internal connection. Under
|
||||
// enforce every bus user is confined by the per-subject ACL to the JetStream API
|
||||
// of its own rooms, so no ordinary identity may touch the control-plane buckets
|
||||
// (KV_UNIBUS_*). The ONLY identity the authenticator grants full JetStream
|
||||
// permissions is membershipd's internal service identity. By persisting that
|
||||
// identity to a file (membershipd --internal-id-file) the same key becomes
|
||||
// available to this CLI, which presents it as its NATS nkey and is therefore
|
||||
// recognized as the privileged internal client and allowed to read/write the KV.
|
||||
//
|
||||
// Intended invocation is over loopback on a cluster node (SSH): the data-plane
|
||||
// TLS certificate's SAN covers 127.0.0.1/localhost and the internal identity file
|
||||
// lives 0600 next to the node's TLS keys. Using the file requires root on the
|
||||
// node, which already implies full control of that node — so co-locating it adds
|
||||
// no practical exposure beyond what the TLS server key and cluster password
|
||||
// already represent.
|
||||
|
||||
// defaultClusterNatsURL is the node-local NATS listener. The CLI is meant to run
|
||||
// on a cluster node over SSH, talking to that node's own embedded server.
|
||||
const defaultClusterNatsURL = "nats://127.0.0.1:4250"
|
||||
|
||||
// Deploy-default paths for the privileged identity and the data-plane CA, so an
|
||||
// on-node invocation needs only --handle/--sign-pub/--role. Override for other
|
||||
// layouts.
|
||||
const (
|
||||
defaultInternalIDFile = "/opt/unibus/secrets/internal.id"
|
||||
defaultClusterCAFile = "/opt/unibus/tls/ca.crt"
|
||||
)
|
||||
|
||||
// kvConn bundles the privileged NATS connection to a live cluster and the
|
||||
// KV-backed control-plane store opened over it. Close releases both.
|
||||
type kvConn struct {
|
||||
nc *nats.Conn
|
||||
js jetstream.JetStream
|
||||
store membership.Store
|
||||
}
|
||||
|
||||
func (k *kvConn) Close() {
|
||||
if k == nil {
|
||||
return
|
||||
}
|
||||
if k.store != nil {
|
||||
_ = k.store.Close()
|
||||
}
|
||||
if k.nc != nil {
|
||||
k.nc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// connectKVStore opens the privileged internal connection to the cluster's NATS
|
||||
// and the JetStream KV control-plane store on top of it. internalIDFile is the
|
||||
// membershipd-persisted internal service identity whose nkey the authenticator
|
||||
// grants full permissions; caPath pins the data-plane TLS (empty only for a
|
||||
// non-TLS dev cluster). A non-loopback target without --ca is refused, mirroring
|
||||
// migrate-to-kv (audit 0008 N6): the allowlist write must not travel in cleartext.
|
||||
func connectKVStore(natsURL, internalIDFile, caPath string, replicas int) (*kvConn, error) {
|
||||
if internalIDFile == "" {
|
||||
return nil, fmt.Errorf("--internal-id-file is required for --store kv (the privileged identity membershipd persists with --internal-id-file)")
|
||||
}
|
||||
// Confidentiality guard: a remote NATS without TLS would expose the allowlist
|
||||
// (handles/roles/sign-pubs) and the privileged nkey handshake in cleartext.
|
||||
if !isLoopbackURL(natsURL) && caPath == "" {
|
||||
return nil, fmt.Errorf("refusing to connect to remote %q without --ca: the allowlist write would travel in cleartext — pin TLS with --ca, or run over a loopback --nats-url on a node", natsURL)
|
||||
}
|
||||
|
||||
id, err := client.LoadIdentity(internalIDFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load internal identity: %w", err)
|
||||
}
|
||||
nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("derive nkey from internal identity: %w", err)
|
||||
}
|
||||
opts := []nats.Option{
|
||||
nats.Name("membershipd-user-cli"),
|
||||
nats.Nkey(nkeyPub, nkeySign),
|
||||
}
|
||||
if caPath != "" {
|
||||
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load CA %q: %w", caPath, err)
|
||||
}
|
||||
opts = append(opts, nats.Secure(tlsCfg))
|
||||
}
|
||||
nc, err := nats.Connect(natsURL, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect cluster NATS %q: %w", natsURL, err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, fmt.Errorf("jetstream: %w", err)
|
||||
}
|
||||
store, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: replicas})
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, fmt.Errorf("open KV control-plane store: %w", err)
|
||||
}
|
||||
return &kvConn{nc: nc, js: js, store: store}, nil
|
||||
}
|
||||
|
||||
// reportKVReplication prints the replication status of the allowlist bucket
|
||||
// stream (KV_UNIBUS_users) right after a write, so the operator sees the add
|
||||
// landed on a quorum and replicated to the followers — executable evidence that
|
||||
// the live-cluster add is HA, not single-node. Best-effort: a read failure is a
|
||||
// note, not an error (the write itself already succeeded).
|
||||
func reportKVReplication(js jetstream.JetStream) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
st, err := js.Stream(ctx, "KV_UNIBUS_users")
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
|
||||
return
|
||||
}
|
||||
info, err := st.Info(ctx)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
|
||||
return
|
||||
}
|
||||
if info.Cluster == nil {
|
||||
fmt.Printf("KV_UNIBUS_users: standalone (R1, no cluster replication); msgs=%d\n", info.State.Msgs)
|
||||
return
|
||||
}
|
||||
current := 0
|
||||
for _, r := range info.Cluster.Replicas {
|
||||
if r.Current {
|
||||
current++
|
||||
}
|
||||
}
|
||||
fmt.Printf("KV_UNIBUS_users: leader=%s followers_current=%d/%d msgs=%d\n",
|
||||
info.Cluster.Leader, current, len(info.Cluster.Replicas), info.State.Msgs)
|
||||
}
|
||||
+201
-39
@@ -5,9 +5,12 @@ This directory holds the material to bring up unibus as a **3-node cluster**
|
||||
plane (rooms/members/keys/users on JetStream KV + the anti-replay nonce bucket)
|
||||
survives the loss of any one node (quorum 2/3).
|
||||
|
||||
> **The agent that authored this never touched a VPS.** Every step that changes a
|
||||
> remote host is marked **HUMAN** and is executed by the operator. `deploy-cluster.sh`
|
||||
> defaults to a dry run.
|
||||
> **Status: this cluster is DEPLOYED in production** (magnus + homer + datardos,
|
||||
> R3, enforce+ACL+TLS) — see report 0011. The runbook below was authored before any
|
||||
> VPS existed and has since been **corrected against the real deploy** (report 0012):
|
||||
> the start ordering, the R1→R3 reality, and the live user-add path were all wrong
|
||||
> or missing. Steps that change a remote host are marked **HUMAN**; `deploy-cluster.sh`
|
||||
> still defaults to a dry run.
|
||||
|
||||
## Files
|
||||
|
||||
@@ -22,18 +25,22 @@ Generated keys/secrets (`out/`, `build/`, `secrets/`) are **gitignored** — the
|
||||
secret and never leave the operator's trusted machine except over the secure
|
||||
rsync channel.
|
||||
|
||||
## Topology
|
||||
## Topology (as deployed, report 0011)
|
||||
|
||||
| Node | SSH | Public IP | WireGuard IP | Role |
|
||||
|---|---|---|---|---|
|
||||
| magnus | `magnus` | `<MAGNUS_PUBLIC_IP>` | `<MAGNUS_WG_IP>` | seed (first up) |
|
||||
| homer | `homer` | `141.94.69.66` | `<HOMER_WG_IP>` | replica |
|
||||
| datardos | `dd` | `51.91.100.142` | `<DATARDOS_WG_IP>` (10.21.0.x) | replica |
|
||||
| Node | SSH | Public IP | Role |
|
||||
|---|---|---|---|
|
||||
| magnus | `magnus` (root) | `135.125.201.30` | node — **= organic-machine.com = `om`**, the critical host (caddy + gitea + registry-api + monitoring); the bus runs alongside, untouched |
|
||||
| homer | `homer` (ubuntu+sudo) | `141.94.69.66` | node |
|
||||
| datardos | `dd` (ubuntu+sudo) | `51.91.100.142` | node |
|
||||
|
||||
The route layer (server-to-server) prefers the **WireGuard mesh**
|
||||
(`ROUTE_NETWORK=wg`); the client data plane and the HTTP control plane are reached
|
||||
over the public IPs. The route CA is **separate** from the client CA, so a client
|
||||
cert can never be presented to the route port.
|
||||
`ROUTE_NETWORK=public`, **not `wg`**: there is no WireGuard mesh between the three
|
||||
nodes (homer and datardos do not even have the `wg` binary; om's only WG peers are
|
||||
the operator's PCs). The server-to-server routes therefore travel over the public
|
||||
IPs, protected by the **separate cluster route CA** (mutual route TLS) — a client
|
||||
data-plane cert can never be presented to the route port. The client data plane and
|
||||
the HTTP control plane are also reached over the public IPs. There is no fixed
|
||||
"seed" node: with R3 the three are peers (see "Bring up" for why a lone node cannot
|
||||
self-serve).
|
||||
|
||||
## Prerequisites (HUMAN, once)
|
||||
|
||||
@@ -93,25 +100,48 @@ SEED
|
||||
|
||||
> The KV written here lives in `./local_files/jetstream`, which the cluster unit
|
||||
> reuses (`--nats-store` default), so the admin is present when the enforce cluster
|
||||
> starts. Additional users are added the same loopback way until a
|
||||
> `user add --store kv` exists (see GAP in report 0009).
|
||||
> starts. This loopback bootstrap is needed ONLY for the very first admin (the
|
||||
> chicken-and-egg). **Every user after that is added with the cluster live** — no
|
||||
> stop-seed-restart — via `user add --store kv` (see "Add users to the live
|
||||
> cluster" below, report 0012).
|
||||
|
||||
## Bring up (HUMAN — staggered)
|
||||
## Bring up (HUMAN)
|
||||
|
||||
Bring up the seed first, then the replicas one at a time, checking each joins.
|
||||
> **CORRECTION (report 0012).** The original instruction — "start magnus alone and
|
||||
> verify healthz, then add the others" — is **WRONG and will look like a hung
|
||||
> deploy.** A 3-node JetStream cluster forms a RAFT meta-group that needs a quorum
|
||||
> (2 of 3) to elect a leader. A single started node has no quorum, so its JetStream
|
||||
> meta never becomes current: `--store kv` blocks creating the KV buckets and
|
||||
> **`/healthz` never returns ok** until a second node joins. Waiting for magnus to
|
||||
> "go green" before starting the others therefore deadlocks the rollout.
|
||||
|
||||
Start the nodes so a quorum forms. On a **clean cluster** the simplest correct
|
||||
procedure is to start all three close together and let the meta-group converge:
|
||||
|
||||
```bash
|
||||
# 1. Seed node (after the seed step above).
|
||||
ssh root@magnus 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@magnus 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
|
||||
# Start all three (order does not matter); each blocks on the others until a
|
||||
# 2/3 quorum elects a JetStream meta leader, then the KV buckets are created.
|
||||
for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
|
||||
|
||||
# 2. Replicas, one at a time.
|
||||
ssh root@homer 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@datardos 'systemctl enable --now membershipd-cluster'
|
||||
# Only NOW does healthz return ok — once the meta-group has a leader (give it
|
||||
# ~10-30s on a cold start). Poll, do not assume the first node is broken.
|
||||
for h in magnus homer datardos; do
|
||||
echo "== $h =="; ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt || echo "(not ready yet — needs quorum)"'
|
||||
done
|
||||
```
|
||||
|
||||
> Initial rollout runs at **R1** (`KV_REPLICAS=1` in `nodes.env`): the buckets live
|
||||
> on the seed only. This is NOT HA yet — see "Scale to R3".
|
||||
A **staggered** start also works, but only because `membershipd`'s KV open RETRIES
|
||||
the bucket creation for a 120s bootstrap budget (issue 0006g, fix #3): the first
|
||||
node sits in that retry loop — NOT serving healthz — until the second node makes a
|
||||
quorum, then both converge and the third catches up. Either way, a lone node never
|
||||
self-serves; do not gate the next node's start on the previous one's healthz.
|
||||
|
||||
> A cold multi-node start only converges because of **three cold-start fixes**
|
||||
> (report 0011): route pooling off (`PoolSize=-1`), `NoAdvertise=true` (Docker
|
||||
> bridge IPs not gossiped), and the KV-open retry loop above. Without them the
|
||||
> meta-group re-elects leaders forever and bucket creation hangs. If a fresh
|
||||
> cluster will not form, confirm the running binary contains these fixes before
|
||||
> touching config.
|
||||
|
||||
## Promote an existing single-node (SQLite) deployment (HUMAN, optional)
|
||||
|
||||
@@ -137,11 +167,80 @@ ssh root@magnus 'nats --server nats://127.0.0.1:4250 server list' # 3 servers,
|
||||
|
||||
A healthy cluster shows 3 routed servers and a JetStream meta-group with a leader.
|
||||
|
||||
## Scale to R3 (HUMAN — real HA)
|
||||
## Add users to the live cluster (HUMAN — `user add --store kv`)
|
||||
|
||||
Once all three nodes are up and routed, raise the replication factor of every
|
||||
control-plane stream from 1 to 3 IN PLACE (no data loss), then flip `KV_REPLICAS=3`
|
||||
in `nodes.env` so future (re)deploys keep it:
|
||||
With the cluster up, add (and revoke) bus users **without stopping anything**,
|
||||
directly against the replicated KV allowlist. This replaces the stop-seed-restart
|
||||
procedure the original runbook implied for every user beyond the first admin.
|
||||
|
||||
The mechanism is the cluster's own **privileged internal connection**: under
|
||||
`enforce` every bus user is confined by the per-subject ACL to its own rooms, so no
|
||||
ordinary identity may write the control-plane buckets. The only identity the
|
||||
authenticator grants full JetStream permissions is `membershipd`'s internal service
|
||||
identity. The unit persists that identity to `${INTERNAL_ID_FILE}`
|
||||
(`/opt/unibus/secrets/internal.id`, 0600) via `--internal-id-file`, so the same key
|
||||
is available to the CLI. Run the CLI **on a node, over loopback** (the data-plane
|
||||
TLS cert SAN covers `127.0.0.1`); reading the identity file requires root on that
|
||||
node, which already implies full control of it, so this adds no practical exposure.
|
||||
|
||||
```bash
|
||||
# Add a member to the live cluster's replicated allowlist (run on any node).
|
||||
ssh root@magnus 'sudo /opt/unibus/membershipd user add --store kv \
|
||||
--handle alice --role member --sign-pub <64-hex-ed25519-pub>'
|
||||
# -> added user "alice" (...) role=member
|
||||
# -> KV_UNIBUS_users: leader=<node> followers_current=2/2 msgs=N (replicated, HA)
|
||||
|
||||
# List / revoke against the same live KV:
|
||||
ssh root@magnus 'sudo /opt/unibus/membershipd user list --store kv'
|
||||
ssh root@magnus 'sudo /opt/unibus/membershipd user revoke --store kv <64-hex-ed25519-pub>'
|
||||
```
|
||||
|
||||
Defaults assume an on-node invocation (`--nats-url nats://127.0.0.1:4250`,
|
||||
`--internal-id-file /opt/unibus/secrets/internal.id`, `--ca /opt/unibus/tls/ca.crt`,
|
||||
`--kv-replicas 3`). Semantics:
|
||||
|
||||
- **Idempotent / non-destructive**: re-adding the same key is an explicit
|
||||
`already registered` error (exit 1), never a silent overwrite — a re-add cannot
|
||||
flip a member to admin. To replace a user, `revoke` then add.
|
||||
- **HA**: the write commits through the JetStream quorum, so it succeeds even with
|
||||
one node down (2/3); the printed `followers_current` shows replication.
|
||||
- **No hard delete**: `revoke` flips status to `revoked` (denied on both planes,
|
||||
auditable); the KV has no row deletion, matching the SQLite store.
|
||||
|
||||
> **Rollout note (report 0012):** the live verification deployed this binary +
|
||||
> `--internal-id-file` to **datardos only** (the non-critical node). magnus and
|
||||
> homer still run the 0011 binary. To make the capability available (and the unit)
|
||||
> on all three — recommended, the posture is identical so there is no urgency — roll
|
||||
> the new binary with backups, one node at a time, verifying healthz between each:
|
||||
> ```bash
|
||||
> for h in homer magnus; do
|
||||
> ssh "$h" 'sudo cp -a /opt/unibus/membershipd /opt/unibus/membershipd.bak' # backup
|
||||
> scp build/membershipd "$h:/tmp/m" && ssh "$h" 'sudo install -o ubuntu -g ubuntu -m0775 /tmp/m /opt/unibus/membershipd'
|
||||
> # add INTERNAL_ID_FILE=/opt/unibus/secrets/internal.id to /opt/unibus/cluster.env
|
||||
> # add `--internal-id-file ${INTERNAL_ID_FILE} \` to the unit before `--store kv`
|
||||
> ssh "$h" 'sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster'
|
||||
> ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt' # green before next
|
||||
> done
|
||||
> ```
|
||||
> (`deploy-cluster.sh` + the unit template already emit `INTERNAL_ID_FILE` and the
|
||||
> flag, so a fresh `./deploy-cluster.sh --yes` is correct for all three.)
|
||||
|
||||
## Replication: go straight to R3 (HUMAN — real HA)
|
||||
|
||||
> **CORRECTION (report 0012).** The original "start at R1, then scale to R3" plan
|
||||
> assumed R1 is a usable interim state. **It is not, in this cluster.** At R1 all six
|
||||
> control-plane buckets (`KV_UNIBUS_users/rooms/members/room_keys/rooms_by_member`
|
||||
> + `KV_UNIBUS_nonces`) live on a SINGLE node — a hard **SPOF for authentication**:
|
||||
> if that node dies, the nonce/KV control plane is unreachable and EVERY
|
||||
> authenticated request fails closed (auth DoS). Worse, the cold multi-node start
|
||||
> only converges at all because of the three cold-start fixes (see "Bring up"); the
|
||||
> real deploy never ran a healthy R1 and **jumped straight to R3 once the cluster
|
||||
> formed.** Treat R1 as a transient artifact of bucket creation, not a milestone.
|
||||
|
||||
The deployed config already sets `KV_REPLICAS=3` in `nodes.env`. If buckets were
|
||||
created at R1 (e.g. only one node was up when `--store kv` first opened them), raise
|
||||
every control-plane stream to R3 IN PLACE (no data loss) once all three nodes are
|
||||
routed:
|
||||
|
||||
```bash
|
||||
for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members KV_UNIBUS_room_keys \
|
||||
@@ -151,27 +250,32 @@ done
|
||||
# (also OBJ_UNIBUS_blobs if the object store is in use)
|
||||
```
|
||||
|
||||
Until this is done, R1 means the seed node is a **single point of failure for
|
||||
authentication**: if it dies, the nonce/KV control plane is unreachable and every
|
||||
authenticated request fails closed (auth DoS). R1 is a rollout step, not HA.
|
||||
After this each bucket shows `followers_current=2/2` (quorum 2/3). The
|
||||
`user add --store kv` command prints that figure for `KV_UNIBUS_users` on every add,
|
||||
which is a cheap live HA check.
|
||||
|
||||
## Chaos test (HUMAN — requires the 3 live VPS; NOT run here)
|
||||
## Chaos test (HUMAN — requires the 3 live VPS)
|
||||
|
||||
Validate quorum tolerance after R3:
|
||||
|
||||
```bash
|
||||
# Kill one node; the cluster keeps serving (quorum 2/3).
|
||||
ssh root@datardos 'systemctl stop membershipd-cluster'
|
||||
# Kill one node; the cluster keeps serving (quorum 2/3). On ubuntu nodes use sudo.
|
||||
ssh dd 'sudo systemctl stop membershipd-cluster'
|
||||
# -> clients fail over (multiple seed URLs); reads/writes still succeed.
|
||||
ssh root@datardos 'systemctl start membershipd-cluster' # rejoins, catches up
|
||||
ssh dd 'sudo systemctl start membershipd-cluster' # rejoins, catches up
|
||||
|
||||
# Kill two nodes; quorum is LOST — the control plane should fail CLOSED (deny),
|
||||
# never fail open. Verify a request is rejected, not silently served.
|
||||
```
|
||||
|
||||
This network-level chaos test (kill 1/3, kill 2/3, partition/split-brain) is part
|
||||
of the deploy validation (issue 0003f) and runs against the real VPS — it is
|
||||
deliberately out of scope for the authoring agent.
|
||||
> **Validated (report 0012).** The 0011 chaos run checked only the control plane
|
||||
> (healthz + meta/stream-leader failover + KV readable with 2/3). Report 0012 added
|
||||
> the missing data-plane proofs against the live cluster: a real authenticated
|
||||
> client (`cmd/clientcheck`, operator identity, nkey+TLS) creating an E2E room and
|
||||
> publishing/subscribing — including a node stopped mid-stream, where the client
|
||||
> failed over to a survivor and kept receiving with zero loss (quorum 2/3) — and
|
||||
> `user add --store kv` committing with one node (the KV leader) down. The kill-2/3
|
||||
> fail-closed case remains a documented manual step.
|
||||
|
||||
## Rollback
|
||||
|
||||
@@ -179,3 +283,61 @@ deliberately out of scope for the authoring agent.
|
||||
the unit and start it without `--store kv`/`--cluster-name`; the KV buckets remain
|
||||
for a later retry. To rotate the cluster CA, re-run `generate-cluster-certs.sh
|
||||
--force` and re-stage (every node must get the new `cluster-ca.crt` together).
|
||||
|
||||
## NATS server metrics (loopback monitoring — optional)
|
||||
|
||||
The embedded NATS server can expose its own monitoring HTTP endpoint so a local
|
||||
scraper reads server-level metrics that `/healthz` does not surface: msgs/s,
|
||||
connections, slow consumers, memory, KV bucket message counts, the RAFT leader per
|
||||
stream and per-stream restarts. This feeds the `unibus-nats` dashboard in
|
||||
`fleet_monitoring` (the scraper hits `127.0.0.1:8222/varz|/connz|/jsz` over
|
||||
loopback and pushes to VictoriaMetrics).
|
||||
|
||||
The endpoint is opened by the **dedicated** environment toggle `UNIBUS_NATS_MONITOR=1`
|
||||
(0.11.0+ binary). It is **decoupled** from `UNIBUS_NATS_DEBUG`: it opens the
|
||||
monitoring endpoint WITHOUT enabling the verbose nats-server debug log, so no room
|
||||
subjects or routing metadata leak to journald (keeps the hardened posture, issue
|
||||
0007). The endpoint binds `127.0.0.1:8222` **only** — the binary hardcodes the
|
||||
loopback bind, so it is never reachable from the network and needs no auth. Never
|
||||
use `UNIBUS_NATS_DEBUG` in production just to get the endpoint.
|
||||
|
||||
### Enable it (HUMAN — requires the 0.11.0+ binary on the node)
|
||||
|
||||
The clean way is the additive systemd drop-in in this directory:
|
||||
|
||||
```bash
|
||||
# On each node, AFTER the 0.11.0+ binary is in /opt/unibus/membershipd:
|
||||
ssh <node> 'sudo mkdir -p /etc/systemd/system/membershipd-cluster.service.d'
|
||||
scp membershipd-cluster.service.d/nats-monitor.conf <node>:/tmp/nats-monitor.conf
|
||||
ssh <node> 'sudo cp /tmp/nats-monitor.conf /etc/systemd/system/membershipd-cluster.service.d/ \
|
||||
&& sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster'
|
||||
```
|
||||
|
||||
(Equivalently, add `UNIBUS_NATS_MONITOR=1` to `/opt/unibus/cluster.env`, which the
|
||||
unit already sources via `EnvironmentFile`; the drop-in is preferred because it is
|
||||
self-documenting and does not edit the generated env file.)
|
||||
|
||||
### Rolling restart with the R3 reconvergence gate (CRITICAL)
|
||||
|
||||
`systemctl restart membershipd-cluster` restarts that node's JetStream RAFT member.
|
||||
**Never restart two nodes at once** — that would drop the cluster below quorum
|
||||
(2/3) and fail the control plane closed. Roll **one node at a time**, in the order
|
||||
`magnus → homer → datardos`, and between each node wait until the cluster has
|
||||
reconverged to R3 (every control-plane bucket back to `followers_current=2/2`):
|
||||
|
||||
```bash
|
||||
# After restarting ONE node, gate on R3 reconvergence before touching the next:
|
||||
ssh root@magnus 'for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members \
|
||||
KV_UNIBUS_room_keys KV_UNIBUS_rooms_by_member KV_UNIBUS_nonces; do
|
||||
nats --server nats://127.0.0.1:4250 stream info "$s" -j \
|
||||
| jq -r --arg s "$s" \"\\($s): replicas=\\(.cluster.replicas|length) leader=\\(.cluster.leader)\"
|
||||
done'
|
||||
# Proceed to the next node ONLY when all six show 3 replicas with a leader
|
||||
# (i.e. 2/2 followers current). Also confirm healthz is green on the just-restarted
|
||||
# node first:
|
||||
ssh <node> 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
|
||||
```
|
||||
|
||||
This restart is normally **not** done as a standalone step: the 0.11.0 binary that
|
||||
carries the flag is rolled to the three nodes in the consolidated rollout, and the
|
||||
drop-in is installed during that same rolling restart.
|
||||
|
||||
@@ -97,6 +97,7 @@ TLS_KEY=${REMOTE_DIR}/tls/server-${name}.key
|
||||
ROUTE_TLS_CERT=${REMOTE_DIR}/tls/route-${name}.crt
|
||||
ROUTE_TLS_KEY=${REMOTE_DIR}/tls/route-${name}.key
|
||||
ROUTE_TLS_CA=${REMOTE_DIR}/tls/cluster-ca.crt
|
||||
INTERNAL_ID_FILE=${REMOTE_DIR}/secrets/internal.id
|
||||
EOF
|
||||
|
||||
run ssh "$target" "mkdir -p ${REMOTE_DIR}/tls ${REMOTE_DIR}/secrets"
|
||||
@@ -114,13 +115,16 @@ if [[ $APPLY -eq 0 ]]; then
|
||||
fi
|
||||
cat <<'NEXT'
|
||||
|
||||
HUMAN — staggered start (do NOT enable all at once; see README "Bring up"):
|
||||
1. Seed node first (e.g. magnus):
|
||||
ssh root@magnus 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@magnus '/opt/unibus/membershipd user add --admin ...' # seed admin
|
||||
2. Then the other two, one at a time, checking quorum after each:
|
||||
ssh root@homer 'systemctl enable --now membershipd-cluster'
|
||||
ssh root@datardos 'systemctl enable --now membershipd-cluster'
|
||||
HUMAN — bring up (see README "Bring up" — a LONE node has no quorum and never
|
||||
serves healthz, so do NOT gate the next node on the previous one going green):
|
||||
1. Seed the FIRST admin into the KV via the loopback bootstrap (README
|
||||
"Seed the first admin"); this is needed only for the chicken-and-egg admin.
|
||||
2. Start all three so a 2/3 quorum forms (order does not matter); healthz
|
||||
turns ok only once the meta-group elects a leader (~10-30s cold):
|
||||
for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
|
||||
3. Verify posture + quorum (README "Verify").
|
||||
4. Scale replicas 1 -> 3 once all three are up (README "Scale to R3").
|
||||
4. Ensure R3 on every control-plane stream (README "Replication: go straight to
|
||||
R3"); R1 is a SPOF, not a milestone.
|
||||
5. Add further users with the cluster LIVE — no restart — via
|
||||
`membershipd user add --store kv` (README "Add users to the live cluster").
|
||||
NEXT
|
||||
|
||||
@@ -33,6 +33,7 @@ ExecStart=/opt/unibus/membershipd \
|
||||
--route-tls-cert ${ROUTE_TLS_CERT} \
|
||||
--route-tls-key ${ROUTE_TLS_KEY} \
|
||||
--route-tls-ca ${ROUTE_TLS_CA} \
|
||||
--internal-id-file ${INTERNAL_ID_FILE} \
|
||||
--store kv \
|
||||
--kv-replicas ${KV_REPLICAS}
|
||||
# Restart=always (NOT on-failure): a clean SIGTERM exits success, and on-failure
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
# Drop-in: enable the embedded NATS server monitoring HTTP endpoint so a local
|
||||
# metrics scraper can read /varz, /connz and /jsz for server-level metrics
|
||||
# (msgs/s, connections, KV bucket msgs, RAFT leader per stream, restarts).
|
||||
#
|
||||
# ADDITIVE and minimal: it only sets one environment variable; the base unit
|
||||
# (membershipd-cluster.service) is otherwise unchanged.
|
||||
#
|
||||
# UNIBUS_NATS_MONITOR is DECOUPLED from UNIBUS_NATS_DEBUG: it opens the monitoring
|
||||
# endpoint WITHOUT enabling the verbose nats-server debug log, so no room subjects
|
||||
# or routing metadata are written to journald (keeps the hardened posture, issue
|
||||
# 0007). Do NOT use UNIBUS_NATS_DEBUG in production just to get the endpoint.
|
||||
#
|
||||
# The endpoint binds 127.0.0.1:8222 ONLY — the binary hardcodes the loopback bind,
|
||||
# so it is never reachable from the network and needs no auth. The scraper runs on
|
||||
# the same host and reads it over loopback.
|
||||
#
|
||||
# Requires the 0.11.0+ membershipd binary (the one that honors UNIBUS_NATS_MONITOR).
|
||||
# Install on a node:
|
||||
# sudo mkdir -p /etc/systemd/system/membershipd-cluster.service.d
|
||||
# sudo cp nats-monitor.conf /etc/systemd/system/membershipd-cluster.service.d/
|
||||
# sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster
|
||||
#
|
||||
# Restarting a node restarts its JetStream RAFT member, so roll ONE node at a time
|
||||
# and wait for R3 reconvergence (followers 2/2) before touching the next. See the
|
||||
# "NATS server metrics" section of this directory's README for the full runbook.
|
||||
[Service]
|
||||
Environment=UNIBUS_NATS_MONITOR=1
|
||||
@@ -2,10 +2,10 @@
|
||||
#
|
||||
# This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh.
|
||||
#
|
||||
# HUMAN: fill in every <PLACEHOLDER> with the real value before running the
|
||||
# HUMAN: fill in every placeholder with the real value before running the
|
||||
# scripts. The public IPs known at authoring time are pre-filled; the WireGuard
|
||||
# mesh IPs and magnus's public IP must be supplied. The scripts refuse to run
|
||||
# while any <PLACEHOLDER> remains.
|
||||
# while any unfilled placeholder remains.
|
||||
|
||||
# Cluster identity (must be identical on every node).
|
||||
CLUSTER_NAME="unibus"
|
||||
@@ -16,7 +16,7 @@ CLUSTER_USER="unibus-cluster"
|
||||
# KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise
|
||||
# to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only
|
||||
# set this to 3 here after the third node is up and you re-run the KV update.
|
||||
KV_REPLICAS=1
|
||||
KV_REPLICAS=3
|
||||
|
||||
# Ports (same on every node; the route port is server-to-server only).
|
||||
NATS_CLIENT_PORT=4250
|
||||
@@ -30,15 +30,28 @@ SSH_USER="root"
|
||||
# Which address family the inter-node routes use. "wg" builds --routes from the
|
||||
# WireGuard mesh IPs (private server-to-server links, preferred); "public" uses
|
||||
# the public IPs. The route layer is always mutual-TLS regardless.
|
||||
ROUTE_NETWORK="wg"
|
||||
#
|
||||
# DEPLOY DECISION (2026-06-07): set to "public". No WireGuard mesh exists between
|
||||
# the three cluster nodes — homer and datardos do not even have the `wg` binary
|
||||
# installed, and om's only WG peers are the operator's personal PCs, not the VPS.
|
||||
# Rather than stand up a fresh mesh blindly, the routes go over the public IPs,
|
||||
# still protected by the separate cluster route CA (mutual-TLS). On magnus (the
|
||||
# only node with ufw active) the route port 6250 is restricted to the homer and
|
||||
# datardos public IPs; homer/datardos run ufw inactive (Docker hosts) and rely on
|
||||
# the route mutual-TLS for 6250.
|
||||
ROUTE_NETWORK="public"
|
||||
|
||||
# One row per node: NAME SSH_HOST PUBLIC_IP WG_IP
|
||||
# NAME -> --server-name and the per-node cert filenames (unique).
|
||||
# SSH_HOST -> the `ssh <SSH_HOST>` alias (see ~/.ssh/config).
|
||||
# SSH_HOST -> the `ssh ALIAS` alias (see ~/.ssh/config).
|
||||
# PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane).
|
||||
# WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg.
|
||||
# NOTE: with ROUTE_NETWORK=public and no WireGuard mesh, the WG_IP column is set to
|
||||
# each node's public IP so the cert SAN covers the address actually used by the
|
||||
# public routes and no unfilled placeholder remains (scripts refuse to run otherwise).
|
||||
# magnus == organic-machine.com == om (135.125.201.30); SSH alias `magnus` enters as root.
|
||||
CLUSTER_NODES=(
|
||||
"magnus magnus <MAGNUS_PUBLIC_IP> <MAGNUS_WG_IP>"
|
||||
"homer homer 141.94.69.66 <HOMER_WG_IP>"
|
||||
"datardos dd 51.91.100.142 <DATARDOS_WG_IP>"
|
||||
"magnus magnus 135.125.201.30 135.125.201.30"
|
||||
"homer homer 141.94.69.66 141.94.69.66"
|
||||
"datardos dd 51.91.100.142 51.91.100.142"
|
||||
)
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
---
|
||||
issue: 0007
|
||||
title: Cifrado at-rest del control plane (JetStream KV / SQLite en disco)
|
||||
status: spec
|
||||
created: 2026-06-07
|
||||
domain: security
|
||||
scope: unibus (pkg/embeddednats, cmd/membershipd, deploy/cluster) + procedimiento de migración del store existente
|
||||
---
|
||||
|
||||
# Objetivo
|
||||
|
||||
Cifrar en reposo el almacenamiento del plano de control para que un nodo comprometido
|
||||
(root en el VPS) o un disco robado no exponga los metadatos de control en claro.
|
||||
|
||||
Estado actual (auditado el 07/06/2026, report 0012 y siguientes):
|
||||
|
||||
- **Contenido de los mensajes**: cifrado E2E por room (megolm/olm). El servidor nunca ve el
|
||||
plaintext; no vive en el plano de control. **No es el objeto de este issue.**
|
||||
- **Claves de room** (`UNIBUS_room_keys`): guardadas **selladas** (sealed box X25519, cifradas
|
||||
para cada miembro). El servidor las almacena y reparte pero no puede abrirlas. **Ya protegidas.**
|
||||
- **Metadatos de control** (`UNIBUS_rooms`, `UNIBUS_members`, `UNIBUS_rooms_by_member`,
|
||||
`UNIBUS_users`): se serializan con `json.Marshal` y se escriben **en claro** en el store. En
|
||||
cluster ese store es el directorio `local_files/jetstream/` de cada nodo; en single-node es el
|
||||
archivo SQLite `local_files/unibus.db`. Hoy **no hay cifrado at-rest**: con root en un nodo se
|
||||
pueden leer subjects de salas, la pertenencia (quién está en qué sala con qué rol), los handles
|
||||
y roles de los usuarios, y las claves públicas (signPub/kexPub). No se exponen mensajes (E2E) ni
|
||||
se pueden descifrar salas (claves selladas), pero sí toda la topología.
|
||||
|
||||
Tras este issue, los buckets/archivos del control plane quedan cifrados en disco con una clave por
|
||||
nodo gestionada fuera de git. El modelo de amenaza pasa de "root del nodo ve la topología" a "root
|
||||
del nodo necesita además la clave at-rest (que puede vivir en un secreto separado / TPM / variable
|
||||
de entorno inyectada) para leer cualquier cosa".
|
||||
|
||||
# Contexto técnico
|
||||
|
||||
- NATS Server / JetStream soporta **encryption at-rest** nativo: se configura una cifra
|
||||
(`aes` o `chacha20`) y una clave; JetStream cifra los ficheros de los streams/KV en disco. El
|
||||
bus usa un NATS **embebido** (`pkg/embeddednats`), así que la activación es por opciones del
|
||||
servidor embebido, no por un `nats-server.conf` externo.
|
||||
- Para el backend SQLite (single-node) el equivalente sería SQLCipher o cifrado a nivel de
|
||||
archivo/FS; queda como sub-tarea de menor prioridad porque el despliegue real es cluster (KV).
|
||||
|
||||
# Tareas
|
||||
|
||||
1. Confirmar la API de encryption-at-rest del NATS embebido en la versión usada (opción de
|
||||
servidor para cipher + clave; cómo se pasa la clave de forma que no quede en argv ni en git).
|
||||
2. Activar el cifrado en `pkg/embeddednats` detrás de una opción de configuración. La clave se
|
||||
inyecta por archivo (`--jetstream-encryption-key-file`, 0600, junto a las claves TLS del nodo)
|
||||
o variable de entorno desde el unit systemd; nunca en argv ni commiteada.
|
||||
3. `cmd/membershipd`: flag/env para la clave + reflejar el estado en la posture publicada en
|
||||
`/healthz` (p.ej. `"at_rest":true`) para que el monitor lo verifique.
|
||||
4. `deploy/cluster`: provisionar la clave at-rest por nodo (generación + `pass`/secrets gitignored)
|
||||
y cablearla en `cluster.env` + el unit. Documentar en el runbook.
|
||||
5. **Migración del store existente** (gotcha crítico): JetStream no re-cifra retroactivamente los
|
||||
datos ya escritos en claro. Diseñar y documentar el procedimiento seguro para el cluster en
|
||||
producción (probable: backup → exportar snapshot del control plane → parar nodo → recrear el
|
||||
store con la clave activa → re-importar; o rotación nodo a nodo aprovechando la replicación R3).
|
||||
Respetar la regla de migraciones (aditivo, sin pérdida de datos).
|
||||
6. Tests: arrancar un nodo con clave at-rest, escribir un user/room, y verificar que el fichero en
|
||||
disco **no** contiene en claro un subject/handle conocido (grep negativo), y que el nodo sigue
|
||||
leyéndolos con la clave. Verificar que sin la clave el store no se abre.
|
||||
|
||||
# Definition of Done
|
||||
|
||||
- Cifrado at-rest activo en los 3 nodos del cluster; `/healthz` lo refleja en la posture.
|
||||
- Evidencia ejecutable: un valor conocido (subject de sala / handle de usuario) **no** aparece en
|
||||
claro al hacer `grep` sobre `local_files/jetstream/`; el nodo lo sigue sirviendo con la clave.
|
||||
- Procedimiento de migración probado sobre datos reales sin pérdida (snapshot/restore verificado).
|
||||
- La clave at-rest nunca está en git ni en argv; vive en archivo 0600 / secreto inyectado.
|
||||
- No baja ninguna otra capa de seguridad (enforce + ACL + TLS + E2E + sealed keys intactas).
|
||||
|
||||
# Notas
|
||||
|
||||
Aditivo y ortogonal al resto de la seguridad: TLS protege en tránsito, E2E el contenido, las claves
|
||||
de room van selladas; este issue cierra el último hueco (metadatos de control en claro en disco)
|
||||
para el modelo de amenaza "VPS comprometido / disco robado". Prioridad media: el despliegue ya es
|
||||
seguro frente a ataques de red (enforce+TLS+ACL); esto endurece frente a compromiso físico/root del
|
||||
host. Relacionado con el endurecimiento de los issues 0004/0005/0006.
|
||||
@@ -0,0 +1,28 @@
|
||||
-- 003_invites.sql — single-use registration invites (issue: user accounts / wallet model).
|
||||
--
|
||||
-- An admin mints an invite so a brand-new identity can join the bus allowlist
|
||||
-- WITHOUT the admin ever handling its private key. The token is the bearer
|
||||
-- secret that authorizes POST /register: the registering client generates its
|
||||
-- keypair locally and publishes only its public keys, fixing the link between an
|
||||
-- invite and the identity it creates via the audit columns below. The handle and
|
||||
-- role are fixed by the admin at mint time and cannot be changed by the client
|
||||
-- (no privilege escalation).
|
||||
--
|
||||
-- Additive and idempotent: safe to apply repeatedly. Never modify this file;
|
||||
-- further schema changes go in new numbered migrations (see
|
||||
-- .claude/rules/db_migrations.md). The embedded copy under
|
||||
-- pkg/membership/migrations/003_invites.sql mirrors this file byte-for-byte.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS invites (
|
||||
token TEXT PRIMARY KEY, -- 32 random bytes in lowercase hex (the bearer secret)
|
||||
handle TEXT NOT NULL, -- handle the new user will get (fixed by admin)
|
||||
role TEXT NOT NULL DEFAULT 'member', -- 'admin' | 'member' (fixed by admin)
|
||||
expires_at TEXT NOT NULL, -- RFC3339; past this the invite is dead
|
||||
used INTEGER NOT NULL DEFAULT 0, -- 0 pending, 1 consumed (single-use)
|
||||
created_at TEXT NOT NULL,
|
||||
used_at TEXT, -- RFC3339 when consumed (NULL until used)
|
||||
used_sign_pub TEXT, -- Ed25519 key that consumed it (audit; NULL until used)
|
||||
used_kex_pub TEXT -- X25519 key presented at registration (audit; NULL until used)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_invites_used ON invites(used);
|
||||
@@ -331,6 +331,60 @@ func (c *Client) doJSON(method, path string, body, out any) error {
|
||||
return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr)
|
||||
}
|
||||
|
||||
// doUnsigned performs a control-plane request WITHOUT the transport signature
|
||||
// headers, for the one endpoint a not-yet-registered identity must reach: POST
|
||||
// /register. The registering peer is not in the allowlist, so it cannot produce
|
||||
// an accepted signature; authorization is the single-use invite token inside the
|
||||
// body. Like doJSON it fails over across the control-plane endpoints (any node
|
||||
// serves the same state) and surfaces the server's structured error message.
|
||||
func (c *Client) doUnsigned(method, path string, body, out any) error {
|
||||
var bodyBytes []byte
|
||||
if body != nil {
|
||||
b, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("client: marshal request: %w", err)
|
||||
}
|
||||
bodyBytes = b
|
||||
}
|
||||
var lastErr error
|
||||
for _, base := range c.ctrlURLs {
|
||||
var rdr io.Reader
|
||||
if bodyBytes != nil {
|
||||
rdr = bytes.NewReader(bodyBytes)
|
||||
}
|
||||
req, err := http.NewRequest(method, base+path, rdr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("client: new request: %w", err)
|
||||
}
|
||||
if body != nil {
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue // dead node: try the next control plane
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
if resp.StatusCode >= 300 {
|
||||
var er struct {
|
||||
Error string `json:"error"`
|
||||
}
|
||||
if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
|
||||
return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
|
||||
}
|
||||
return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
|
||||
}
|
||||
if out != nil {
|
||||
if err := json.Unmarshal(respBody, out); err != nil {
|
||||
return fmt.Errorf("client: decode response: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr)
|
||||
}
|
||||
|
||||
// signRequest signs the canonical bytes of req (req must already have its Sig
|
||||
// field cleared) with the client's Ed25519 key. It is symmetric with the
|
||||
// server's verifyOwnerSig. This is the PAYLOAD-level owner signature that
|
||||
@@ -456,6 +510,52 @@ type memberRoomJSON struct {
|
||||
Role string `json:"role"`
|
||||
}
|
||||
|
||||
// userJSON mirrors the server's wire type on the admin user-management endpoints.
|
||||
type userJSON struct {
|
||||
SignPub string `json:"sign_pub"`
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
Status string `json:"status"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
RevokedAt string `json:"revoked_at,omitempty"`
|
||||
}
|
||||
|
||||
// addUserReq is the POST /users body (mirror of the server type).
|
||||
type addUserReq struct {
|
||||
SignPub string `json:"sign_pub"`
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
}
|
||||
|
||||
// createInviteReq / createInviteResp mirror the server's POST /invites types.
|
||||
type createInviteReq struct {
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
TTLSecs int `json:"ttl_secs"`
|
||||
}
|
||||
|
||||
type createInviteResp struct {
|
||||
Token string `json:"token"`
|
||||
ExpiresAt string `json:"expires_at"`
|
||||
}
|
||||
|
||||
// inviteJSON mirrors the server's GET /invites row.
|
||||
type inviteJSON struct {
|
||||
Token string `json:"token"`
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
ExpiresAt string `json:"expires_at"`
|
||||
Used bool `json:"used"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// registerReq mirrors the server's POST /register body.
|
||||
type registerReq struct {
|
||||
Token string `json:"token"`
|
||||
SignPub string `json:"sign_pub"`
|
||||
KexPub string `json:"kex_pub"`
|
||||
}
|
||||
|
||||
// ---- room operations ------------------------------------------------------
|
||||
|
||||
// RoomRef is a room this peer belongs to, returned by ListMyRooms. It is the
|
||||
@@ -490,6 +590,135 @@ func (c *Client) ListMyRooms() ([]RoomRef, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ---- user administration (admin-only) ------------------------------------
|
||||
|
||||
// UserInfo is a bus user as returned by the admin user-management endpoints. It
|
||||
// is a flat view (no nested types) for the admin panel: the signing key
|
||||
// (lowercase hex), handle, role ("admin"|"member"), status ("active"|"revoked"),
|
||||
// and timestamps. RevokedAt is empty for an active user.
|
||||
type UserInfo struct {
|
||||
SignPub string
|
||||
Handle string
|
||||
Role string
|
||||
Status string
|
||||
CreatedAt string
|
||||
RevokedAt string
|
||||
}
|
||||
|
||||
// ListUsers returns the full bus allowlist, including revoked users. The caller
|
||||
// must be signing as an admin: a non-admin signer is rejected by the server with
|
||||
// 403, surfaced here as an error.
|
||||
func (c *Client) ListUsers() ([]UserInfo, error) {
|
||||
var resp []userJSON
|
||||
if err := c.doJSON("GET", "/users", nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := make([]UserInfo, 0, len(resp))
|
||||
for _, u := range resp {
|
||||
out = append(out, UserInfo{
|
||||
SignPub: u.SignPub,
|
||||
Handle: u.Handle,
|
||||
Role: u.Role,
|
||||
Status: u.Status,
|
||||
CreatedAt: u.CreatedAt,
|
||||
RevokedAt: u.RevokedAt,
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// AddUser registers a bus user from their Ed25519 signing public key (64-hex).
|
||||
// role is "admin" or "member" (empty defaults to member, matching the server).
|
||||
// The caller must be signing as an admin. Re-adding an already-registered key
|
||||
// returns an error (the server replies 409 and leaves the existing row
|
||||
// untouched — no silent role/status change).
|
||||
func (c *Client) AddUser(signPub, handle, role string) error {
|
||||
return c.doJSON("POST", "/users", addUserReq{SignPub: signPub, Handle: handle, Role: role}, nil)
|
||||
}
|
||||
|
||||
// RevokeUser revokes a bus user by their signing public key (64-hex). Revocation
|
||||
// is a status flip (no hard delete): the identity stays auditable and is denied
|
||||
// on both planes immediately. The caller must be signing as an admin.
|
||||
func (c *Client) RevokeUser(signPub string) error {
|
||||
return c.doJSON("POST", "/users/"+signPub+"/revoke", nil, nil)
|
||||
}
|
||||
|
||||
// DeleteUser hard-deletes a bus user by their signing public key (64-hex) — the
|
||||
// purge counterpart of RevokeUser. The allowlist row is removed entirely (no
|
||||
// audit trail); the ex-user can no longer authenticate, so their room
|
||||
// memberships become inert. The caller must be signing as an admin.
|
||||
func (c *Client) DeleteUser(signPub string) error {
|
||||
return c.doJSON("DELETE", "/users/"+signPub, nil, nil)
|
||||
}
|
||||
|
||||
// InviteInfo is a single-use registration invite as returned by the admin invite
|
||||
// endpoints. It is a flat view for the admin panel: the bearer token (to build
|
||||
// the join link), the handle and role the new user will receive, the absolute
|
||||
// expiry, whether it has been used, and when it was minted.
|
||||
type InviteInfo struct {
|
||||
Token string
|
||||
Handle string
|
||||
Role string
|
||||
ExpiresAt string
|
||||
Used bool
|
||||
CreatedAt string
|
||||
}
|
||||
|
||||
// CreateInvite mints a single-use registration invite. handle and role are fixed
|
||||
// here (the registering client cannot change them); role is "admin" or "member"
|
||||
// (empty defaults to member). ttlSecs sets the link lifetime (non-positive uses
|
||||
// the server's 7-day default). The returned InviteInfo carries the token and
|
||||
// expiry; the caller turns the token into a join link. Caller must sign as admin.
|
||||
func (c *Client) CreateInvite(handle, role string, ttlSecs int) (InviteInfo, error) {
|
||||
var resp createInviteResp
|
||||
if err := c.doJSON("POST", "/invites", createInviteReq{Handle: handle, Role: role, TTLSecs: ttlSecs}, &resp); err != nil {
|
||||
return InviteInfo{}, err
|
||||
}
|
||||
r := role
|
||||
if r == "" {
|
||||
r = "member"
|
||||
}
|
||||
return InviteInfo{Token: resp.Token, Handle: handle, Role: r, ExpiresAt: resp.ExpiresAt}, nil
|
||||
}
|
||||
|
||||
// ListInvites returns the pending invites (not used, not expired). Caller must
|
||||
// sign as admin.
|
||||
func (c *Client) ListInvites() ([]InviteInfo, error) {
|
||||
var resp []inviteJSON
|
||||
if err := c.doJSON("GET", "/invites", nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := make([]InviteInfo, 0, len(resp))
|
||||
for _, inv := range resp {
|
||||
out = append(out, InviteInfo{
|
||||
Token: inv.Token,
|
||||
Handle: inv.Handle,
|
||||
Role: inv.Role,
|
||||
ExpiresAt: inv.ExpiresAt,
|
||||
Used: inv.Used,
|
||||
CreatedAt: inv.CreatedAt,
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// CancelInvite cancels (deletes) a pending invite by its token, so an admin can
|
||||
// revoke a link before it is redeemed. Caller must sign as admin.
|
||||
func (c *Client) CancelInvite(token string) error {
|
||||
return c.doJSON("DELETE", "/invites/"+token, nil, nil)
|
||||
}
|
||||
|
||||
// Register redeems a single-use invite token, joining the bus allowlist. It is
|
||||
// the wallet-model join call: the registering peer generated its own keypair
|
||||
// locally and publishes ONLY its public keys here (signPub Ed25519, kexPub
|
||||
// X25519, both 64-hex). It is UNSIGNED — the bearer token is the authorization,
|
||||
// because this identity is not yet in the allowlist and so cannot sign an
|
||||
// accepted request. On success the identity is registered with the invite's
|
||||
// handle and role and can connect like any other peer.
|
||||
func (c *Client) Register(token, signPub, kexPub string) error {
|
||||
return c.doUnsigned("POST", "/register", registerReq{Token: token, SignPub: signPub, KexPub: kexPub}, nil)
|
||||
}
|
||||
|
||||
// newRoomKey returns 32 random bytes for a symmetric room key.
|
||||
func newRoomKey() ([]byte, error) {
|
||||
k := make([]byte, 32)
|
||||
|
||||
+27
-11
@@ -33,20 +33,36 @@ type identityFile struct {
|
||||
KexPriv string `json:"kex_priv"`
|
||||
}
|
||||
|
||||
// LoadIdentity loads an existing identity from path. Unlike LoadOrCreateIdentity
|
||||
// it NEVER creates one: a missing or unreadable file is an error. It is for
|
||||
// callers that must consume a specific, pre-provisioned identity rather than mint
|
||||
// a fresh one — for example membershipd's persisted internal service identity,
|
||||
// which `membershipd user add --store kv` reads to present the privileged nkey
|
||||
// the cluster authenticator recognizes.
|
||||
func LoadIdentity(path string) (cs.Identity, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: read identity %q: %w", path, err)
|
||||
}
|
||||
var f identityFile
|
||||
if err := json.Unmarshal(data, &f); err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err)
|
||||
}
|
||||
id, err := f.toIdentity()
|
||||
if err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: decode identity %q: %w", path, err)
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// LoadOrCreateIdentity loads the identity at path, or generates and persists a
|
||||
// new one if the file does not exist. The file is written with 0600
|
||||
// permissions because it holds private keys.
|
||||
// permissions because it holds private keys. A file that exists but is
|
||||
// unreadable or corrupt is an error (NOT silently regenerated), so a damaged
|
||||
// identity surfaces instead of minting a new key that cannot decrypt old data.
|
||||
func LoadOrCreateIdentity(path string) (cs.Identity, error) {
|
||||
if data, err := os.ReadFile(path); err == nil {
|
||||
var f identityFile
|
||||
if err := json.Unmarshal(data, &f); err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err)
|
||||
}
|
||||
id, err := f.toIdentity()
|
||||
if err != nil {
|
||||
return cs.Identity{}, fmt.Errorf("client: decode identity %q: %w", path, err)
|
||||
}
|
||||
return id, nil
|
||||
if _, statErr := os.Stat(path); statErr == nil {
|
||||
return LoadIdentity(path)
|
||||
}
|
||||
|
||||
id, err := cs.GenerateIdentity()
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
package client_test
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"testing"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// TestClientInvitesAdminAPI drives the wallet-model account flow through the real
|
||||
// pkg/client methods against an in-process membershipd under enforce: an admin
|
||||
// mints an invite, a brand-new identity redeems it via the UNSIGNED Register call
|
||||
// (it is not yet in the allowlist), the admin then sees the user, and finally the
|
||||
// admin hard-deletes it and it vanishes. This is the exact path the admin panel +
|
||||
// the /join client page depend on, so it locks the client/server contract.
|
||||
func TestClientInvitesAdminAPI(t *testing.T) {
|
||||
h := newHarnessMode(t, membership.AuthEnforce)
|
||||
waitHealth(t, h.ctrlURL)
|
||||
|
||||
admin, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||
if err != nil {
|
||||
t.Fatalf("connect admin: %v", err)
|
||||
}
|
||||
defer admin.Close()
|
||||
registerClient(t, h, admin, "admin", membership.RoleAdmin)
|
||||
|
||||
// Admin mints a single-use invite fixing handle + role.
|
||||
inv, err := admin.CreateInvite("dora", membership.RoleMember, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("admin CreateInvite: %v", err)
|
||||
}
|
||||
if len(inv.Token) != 64 || inv.ExpiresAt == "" {
|
||||
t.Fatalf("invite malformed: %+v", inv)
|
||||
}
|
||||
if inv.Handle != "dora" || inv.Role != membership.RoleMember {
|
||||
t.Fatalf("invite echo wrong: %+v", inv)
|
||||
}
|
||||
|
||||
// It appears among the pending invites.
|
||||
pend, err := admin.ListInvites()
|
||||
if err != nil {
|
||||
t.Fatalf("admin ListInvites: %v", err)
|
||||
}
|
||||
if !containsToken(pend, inv.Token) {
|
||||
t.Fatalf("minted invite not pending: %+v", pend)
|
||||
}
|
||||
|
||||
// A brand-new identity (NOT in the allowlist) redeems the invite via the
|
||||
// UNSIGNED Register. We model its locally-generated keypair with a fresh
|
||||
// identity and present its two public keys. Redeeming through this joiner
|
||||
// client — which never registered and never seeded an admin — proves Register
|
||||
// needs no admin signature; the bearer token is the sole authorization.
|
||||
newID := mustIdentity(t)
|
||||
signPub := hex.EncodeToString(newID.SignPub)
|
||||
kexPub := hex.EncodeToString(newID.KexPub)
|
||||
joiner, err := client.New(h.natsURL, h.ctrlURL, newID)
|
||||
if err != nil {
|
||||
t.Fatalf("connect joiner: %v", err)
|
||||
}
|
||||
defer joiner.Close()
|
||||
if err := joiner.Register(inv.Token, signPub, kexPub); err != nil {
|
||||
t.Fatalf("joiner Register: %v", err)
|
||||
}
|
||||
|
||||
// Admin now sees dora in the allowlist with the invite's handle/role.
|
||||
users, err := admin.ListUsers()
|
||||
if err != nil {
|
||||
t.Fatalf("admin ListUsers: %v", err)
|
||||
}
|
||||
row, ok := findUserInfo(users, signPub)
|
||||
if !ok {
|
||||
t.Fatalf("registered dora missing from allowlist: %+v", users)
|
||||
}
|
||||
if row.Handle != "dora" || row.Role != membership.RoleMember || row.Status != membership.StatusActive {
|
||||
t.Fatalf("dora row wrong: %+v", row)
|
||||
}
|
||||
|
||||
// Single-use: redeeming again is an error.
|
||||
if err := joiner.Register(inv.Token, signPub, kexPub); err == nil {
|
||||
t.Fatalf("second Register should error (used token)")
|
||||
}
|
||||
|
||||
// Admin hard-deletes dora; she vanishes from the allowlist entirely.
|
||||
if err := admin.DeleteUser(signPub); err != nil {
|
||||
t.Fatalf("admin DeleteUser: %v", err)
|
||||
}
|
||||
users, err = admin.ListUsers()
|
||||
if err != nil {
|
||||
t.Fatalf("admin ListUsers after delete: %v", err)
|
||||
}
|
||||
if _, ok := findUserInfo(users, signPub); ok {
|
||||
t.Fatalf("hard-deleted dora must NOT appear: %+v", users)
|
||||
}
|
||||
}
|
||||
|
||||
func containsToken(invites []client.InviteInfo, token string) bool {
|
||||
for _, i := range invites {
|
||||
if i.Token == token {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package client_test
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// findUserInfo returns the row with the given signing key (case-insensitive).
|
||||
func findUserInfo(users []client.UserInfo, signPub string) (client.UserInfo, bool) {
|
||||
want := strings.ToLower(signPub)
|
||||
for _, u := range users {
|
||||
if strings.ToLower(u.SignPub) == want {
|
||||
return u, true
|
||||
}
|
||||
}
|
||||
return client.UserInfo{}, false
|
||||
}
|
||||
|
||||
// TestClientUsersAdminAPI drives the admin user-management API through the real
|
||||
// pkg/client methods against an in-process membershipd under enforce: an admin
|
||||
// client adds a user, lists it, revokes it, and sees the status flip — and a
|
||||
// non-admin client is denied. This is the path the admin panel uses, so it locks
|
||||
// the client/server contract the panel depends on.
|
||||
func TestClientUsersAdminAPI(t *testing.T) {
|
||||
h := newHarnessMode(t, membership.AuthEnforce)
|
||||
waitHealth(t, h.ctrlURL)
|
||||
|
||||
admin, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||
if err != nil {
|
||||
t.Fatalf("connect admin: %v", err)
|
||||
}
|
||||
defer admin.Close()
|
||||
registerClient(t, h, admin, "admin", membership.RoleAdmin)
|
||||
|
||||
member, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||
if err != nil {
|
||||
t.Fatalf("connect member: %v", err)
|
||||
}
|
||||
defer member.Close()
|
||||
registerClient(t, h, member, "member", membership.RoleMember)
|
||||
|
||||
// A brand-new identity the admin will register over HTTP.
|
||||
carol := mustIdentity(t)
|
||||
carolPub := hex.EncodeToString(carol.SignPub)
|
||||
|
||||
// Admin adds carol as a member.
|
||||
if err := admin.AddUser(carolPub, "carol", membership.RoleMember); err != nil {
|
||||
t.Fatalf("admin AddUser: %v", err)
|
||||
}
|
||||
|
||||
// Admin lists: carol present and active.
|
||||
users, err := admin.ListUsers()
|
||||
if err != nil {
|
||||
t.Fatalf("admin ListUsers: %v", err)
|
||||
}
|
||||
row, ok := findUserInfo(users, carolPub)
|
||||
if !ok {
|
||||
t.Fatalf("carol missing from list after add: %+v", users)
|
||||
}
|
||||
if row.Status != membership.StatusActive || row.Role != membership.RoleMember {
|
||||
t.Fatalf("carol row wrong after add: %+v", row)
|
||||
}
|
||||
|
||||
// Re-adding the same key is a conflict surfaced as an error (no silent upsert).
|
||||
if err := admin.AddUser(carolPub, "carol-again", membership.RoleAdmin); err == nil {
|
||||
t.Fatalf("re-adding carol should error (409), got nil")
|
||||
}
|
||||
|
||||
// Admin revokes carol; list shows the status flip (no hard delete).
|
||||
if err := admin.RevokeUser(carolPub); err != nil {
|
||||
t.Fatalf("admin RevokeUser: %v", err)
|
||||
}
|
||||
users, err = admin.ListUsers()
|
||||
if err != nil {
|
||||
t.Fatalf("admin ListUsers after revoke: %v", err)
|
||||
}
|
||||
row, ok = findUserInfo(users, carolPub)
|
||||
if !ok {
|
||||
t.Fatalf("carol vanished after revoke (should be a status flip): %+v", users)
|
||||
}
|
||||
if row.Status != membership.StatusRevoked {
|
||||
t.Fatalf("carol should be revoked, got status %q", row.Status)
|
||||
}
|
||||
|
||||
// A non-admin (member) is denied on every user-management method.
|
||||
if _, err := member.ListUsers(); err == nil {
|
||||
t.Fatalf("non-admin ListUsers should error (403), got nil")
|
||||
}
|
||||
if err := member.AddUser(carolPub, "x", membership.RoleMember); err == nil {
|
||||
t.Fatalf("non-admin AddUser should error (403), got nil")
|
||||
}
|
||||
if err := member.RevokeUser(carolPub); err == nil {
|
||||
t.Fatalf("non-admin RevokeUser should error (403), got nil")
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
@@ -102,10 +103,38 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication)
|
||||
return StartServer(ServerConfig{StoreDir: storeDir, Host: host, Port: port, Auth: auth})
|
||||
}
|
||||
|
||||
// natsLogOpts maps the two independent environment toggles to the embedded
|
||||
// nats-server logging and monitoring flags. It is a pure function (no I/O) so the
|
||||
// decoupling between the two toggles can be unit-tested directly.
|
||||
//
|
||||
// - UNIBUS_NATS_DEBUG="1" enables the nats-server logger (route/RAFT/JetStream
|
||||
// errors); "2" additionally enables protocol tracing. Off by default so the
|
||||
// server stays silent (NoLog) and production behavior is unchanged.
|
||||
// - UNIBUS_NATS_MONITOR="1" opens the monitoring HTTP endpoint (loopback only)
|
||||
// for a local metrics scraper to read /varz, /connz and /jsz.
|
||||
//
|
||||
// The two are DECOUPLED on purpose: enabling the monitoring endpoint must NOT turn
|
||||
// on the verbose debug log, which would write room subjects and routing metadata
|
||||
// to journald in clear and regress the hardened posture (issue 0007). The reverse
|
||||
// coupling is kept for backward compatibility: debug mode still exposes the
|
||||
// monitoring endpoint as well (debug implies monitor), so existing debugging
|
||||
// workflows are unchanged.
|
||||
func natsLogOpts(debugEnv, monitorEnv string) (noLog, debug, trace, monitor bool) {
|
||||
debug = debugEnv == "1" || debugEnv == "2"
|
||||
trace = debugEnv == "2"
|
||||
monitor = monitorEnv == "1" || debug
|
||||
noLog = !debug
|
||||
return noLog, debug, trace, monitor
|
||||
}
|
||||
|
||||
// StartServer launches an embedded nats-server with JetStream from cfg. It
|
||||
// blocks until the server is ready to accept connections (up to 5s) and returns
|
||||
// the running server; the caller must Shutdown it.
|
||||
func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
// Map the two independent env toggles to the nats-server logging + monitoring
|
||||
// flags. See natsLogOpts for the decoupling rationale (issue 0007).
|
||||
noLog, debugNATS, traceNATS, monitorNATS := natsLogOpts(
|
||||
os.Getenv("UNIBUS_NATS_DEBUG"), os.Getenv("UNIBUS_NATS_MONITOR"))
|
||||
opts := &server.Options{
|
||||
JetStream: true,
|
||||
StoreDir: cfg.StoreDir,
|
||||
@@ -114,8 +143,19 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
ServerName: cfg.ServerName,
|
||||
DontListen: false,
|
||||
// Keep the embedded server quiet by default; the host app logs the URLs.
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
NoLog: noLog,
|
||||
Debug: debugNATS,
|
||||
Trace: traceNATS,
|
||||
Logtime: true,
|
||||
NoSigs: true,
|
||||
}
|
||||
if monitorNATS {
|
||||
// Expose the nats-server monitoring endpoint on LOOPBACK ONLY (never public):
|
||||
// the operator (or a local metrics scraper) inspects /varz, /connz, /jsz,
|
||||
// /routez. The 127.0.0.1 bind is mandatory because this endpoint has no auth;
|
||||
// it must stay unreachable from the network.
|
||||
opts.HTTPHost = "127.0.0.1"
|
||||
opts.HTTPPort = 8222
|
||||
}
|
||||
if cfg.Auth != nil {
|
||||
opts.CustomClientAuthentication = cfg.Auth
|
||||
@@ -141,6 +181,10 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
return nil, fmt.Errorf("embeddednats: new server: %w", err)
|
||||
}
|
||||
|
||||
if debugNATS {
|
||||
ns.ConfigureLogger()
|
||||
}
|
||||
|
||||
go ns.Start()
|
||||
|
||||
if !ns.ReadyForConnections(5 * time.Second) {
|
||||
@@ -162,6 +206,21 @@ func applyClusterOpts(opts *server.Options, c *ClusterConfig) error {
|
||||
Port: c.Port,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
// Disable route connection pooling (nats-server 2.10+ defaults to a pool of
|
||||
// 3 connections per peer). On a small cluster the pool churns with
|
||||
// "duplicate route"/"client closed" reconnects that interrupt the meta-group
|
||||
// RAFT heartbeats, causing perpetual leader re-elections so the JetStream
|
||||
// meta never becomes current and stream/KV creation hangs (issue 0006g).
|
||||
// PoolSize=-1 forces the classic single route per peer, which is stable for
|
||||
// the 3-node unibus cluster.
|
||||
PoolSize: -1,
|
||||
// NoAdvertise stops the server from gossiping its locally-discovered IPs to
|
||||
// peers. The cluster nodes are Docker hosts, so without this NATS advertises
|
||||
// the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers
|
||||
// then try to dial those private, mutually-unreachable IPs, churning the
|
||||
// route layer and destabilizing the JetStream meta-group. With NoAdvertise
|
||||
// the nodes use ONLY the explicit public-IP routes we configure (issue 0006g).
|
||||
NoAdvertise: true,
|
||||
}
|
||||
if c.TLS != nil {
|
||||
opts.Cluster.TLSConfig = c.TLS
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
package embeddednats
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestNatsLogOptsDecoupled is the core regression guard for issue 0007: turning
|
||||
// on the monitoring endpoint must NEVER turn on the verbose nats-server debug log
|
||||
// (which would leak room subjects/routing metadata to journald). It also checks
|
||||
// the backward-compatible coupling (debug still implies monitoring) and the quiet
|
||||
// default.
|
||||
func TestNatsLogOptsDecoupled(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
debugEnv, monitorEnv string
|
||||
noLog, debug, trace, monitor bool
|
||||
}{
|
||||
{"default off — quiet, no monitor", "", "", true, false, false, false},
|
||||
{"monitor only — endpoint on, log stays quiet", "", "1", true, false, false, true},
|
||||
{"debug implies monitor", "1", "", false, true, false, true},
|
||||
{"trace implies debug+monitor", "2", "", false, true, true, true},
|
||||
{"both set", "1", "1", false, true, false, true},
|
||||
{"monitor garbage value ignored", "", "yes", true, false, false, false},
|
||||
{"debug garbage value ignored", "true", "", true, false, false, false},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
noLog, debug, trace, monitor := natsLogOpts(c.debugEnv, c.monitorEnv)
|
||||
if noLog != c.noLog || debug != c.debug || trace != c.trace || monitor != c.monitor {
|
||||
t.Fatalf("natsLogOpts(%q,%q) = (noLog=%v debug=%v trace=%v monitor=%v), want (noLog=%v debug=%v trace=%v monitor=%v)",
|
||||
c.debugEnv, c.monitorEnv, noLog, debug, trace, monitor,
|
||||
c.noLog, c.debug, c.trace, c.monitor)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Explicit golden assertion of the security property: monitor on, log off.
|
||||
noLog, debug, _, monitor := natsLogOpts("", "1")
|
||||
if !monitor {
|
||||
t.Fatal("UNIBUS_NATS_MONITOR=1 must open the monitoring endpoint")
|
||||
}
|
||||
if debug || !noLog {
|
||||
t.Fatalf("UNIBUS_NATS_MONITOR=1 must NOT enable the debug log (got debug=%v noLog=%v)", debug, noLog)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMonitorEndpointLoopback boots a real embedded server with
|
||||
// UNIBUS_NATS_MONITOR=1 (and DEBUG explicitly off) and proves the monitoring HTTP
|
||||
// endpoint answers on loopback only — the exact contract the metrics scraper
|
||||
// relies on. The pure decoupling check above already guarantees the log stays out
|
||||
// of debug mode for this same env combination.
|
||||
func TestMonitorEndpointLoopback(t *testing.T) {
|
||||
t.Setenv("UNIBUS_NATS_DEBUG", "")
|
||||
t.Setenv("UNIBUS_NATS_MONITOR", "1")
|
||||
|
||||
ns, err := StartServer(ServerConfig{
|
||||
StoreDir: t.TempDir(),
|
||||
Host: "127.0.0.1",
|
||||
Port: freeLoopbackPort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("start server with monitoring: %v", err)
|
||||
}
|
||||
defer func() { ns.Shutdown(); ns.WaitForShutdown() }()
|
||||
|
||||
addr := ns.MonitorAddr()
|
||||
if addr == nil {
|
||||
t.Fatal("monitoring endpoint not open with UNIBUS_NATS_MONITOR=1 (MonitorAddr is nil)")
|
||||
}
|
||||
if !addr.IP.IsLoopback() {
|
||||
t.Fatalf("monitoring endpoint bound to %s, must be loopback only", addr.IP)
|
||||
}
|
||||
if addr.Port != 8222 {
|
||||
t.Fatalf("monitoring endpoint on port %d, want the fixed loopback port 8222", addr.Port)
|
||||
}
|
||||
|
||||
// /varz must answer 200 with a non-empty body on loopback.
|
||||
url := "http://" + addr.String() + "/varz"
|
||||
var resp *http.Response
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
resp, err = http.Get(url) //nolint:gosec // loopback monitoring endpoint, no auth by design
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("GET %s: %v", url, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("GET %s -> %d, want 200", url, resp.StatusCode)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if len(body) == 0 {
|
||||
t.Fatalf("GET %s returned an empty body", url)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMonitorDisabledByDefault proves a server started without either toggle does
|
||||
// NOT open the monitoring endpoint, so production stays closed unless opted in.
|
||||
func TestMonitorDisabledByDefault(t *testing.T) {
|
||||
t.Setenv("UNIBUS_NATS_DEBUG", "")
|
||||
t.Setenv("UNIBUS_NATS_MONITOR", "")
|
||||
|
||||
ns, err := StartServer(ServerConfig{
|
||||
StoreDir: t.TempDir(),
|
||||
Host: "127.0.0.1",
|
||||
Port: freeLoopbackPort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("start server: %v", err)
|
||||
}
|
||||
defer func() { ns.Shutdown(); ns.WaitForShutdown() }()
|
||||
|
||||
if addr := ns.MonitorAddr(); addr != nil {
|
||||
t.Fatalf("monitoring endpoint open (%s) without UNIBUS_NATS_MONITOR — must stay closed by default", addr)
|
||||
}
|
||||
}
|
||||
|
||||
func freeLoopbackPort(t *testing.T) int {
|
||||
t.Helper()
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("free port: %v", err)
|
||||
}
|
||||
defer l.Close()
|
||||
return l.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
@@ -0,0 +1,296 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Invite is a single-use registration token the admin mints so a brand-new
|
||||
// identity can join the bus allowlist WITHOUT the admin ever handling its
|
||||
// private key (the wallet model: the key is born and stays on the user's
|
||||
// device; only the public key is published, via POST /register).
|
||||
//
|
||||
// The admin fixes the handle and role at mint time; the registering client may
|
||||
// NOT change them (no privilege escalation). Token is 32 random bytes in
|
||||
// lowercase hex (64 chars). ExpiresAt and CreatedAt are RFC3339Nano UTC. Used
|
||||
// flips to true the instant the invite is consumed, and an invite can be
|
||||
// consumed at most once. The audit fields (UsedAt/UsedSignPub/UsedKexPub) are
|
||||
// empty until the invite is consumed; they record which keys claimed it, so the
|
||||
// link between an invite and the identity it created stays traceable even though
|
||||
// the allowlist row itself stores only the signing key.
|
||||
type Invite struct {
|
||||
Token string `json:"token"`
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
ExpiresAt string `json:"expires_at"`
|
||||
Used bool `json:"used"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
|
||||
// Audit (populated on consume; omitted on the wire while pending).
|
||||
UsedAt string `json:"used_at,omitempty"`
|
||||
UsedSignPub string `json:"used_sign_pub,omitempty"`
|
||||
UsedKexPub string `json:"used_kex_pub,omitempty"`
|
||||
}
|
||||
|
||||
// Invite-flow sentinels. They let callers (and the HTTP layer) map a failed
|
||||
// consume to a precise status code without string-matching: an unknown token is
|
||||
// ErrNotFound (reused from the store), a spent token is ErrInviteUsed, a
|
||||
// past-deadline token is ErrInviteExpired. ErrUserExists (from users.go) is
|
||||
// reused when the presented signing key is already registered.
|
||||
var (
|
||||
ErrInviteUsed = errors.New("membership: invite already used")
|
||||
ErrInviteExpired = errors.New("membership: invite expired")
|
||||
)
|
||||
|
||||
// defaultInviteTTL is the lifetime of an invite when the caller passes a
|
||||
// non-positive ttlSecs. Seven days mirrors a typical "share this link this
|
||||
// week" expectation while keeping the un-authenticated /register window bounded.
|
||||
const defaultInviteTTL = 7 * 24 * time.Hour
|
||||
|
||||
// newInviteToken returns 32 cryptographically-random bytes as lowercase hex (64
|
||||
// chars). The token IS the bearer secret that authorizes /register, so it must
|
||||
// be unguessable; crypto/rand is the only acceptable source.
|
||||
func newInviteToken() (string, error) {
|
||||
b := make([]byte, 32)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
return "", fmt.Errorf("membership: generate invite token: %w", err)
|
||||
}
|
||||
return hex.EncodeToString(b), nil
|
||||
}
|
||||
|
||||
// inviteTTL resolves a caller-supplied ttlSecs into a concrete duration,
|
||||
// defaulting to defaultInviteTTL when non-positive.
|
||||
func inviteTTL(ttlSecs int) time.Duration {
|
||||
if ttlSecs <= 0 {
|
||||
return defaultInviteTTL
|
||||
}
|
||||
return time.Duration(ttlSecs) * time.Second
|
||||
}
|
||||
|
||||
// inviteIsExpired reports whether the RFC3339 expiry has passed. A token whose
|
||||
// expiry cannot be parsed is treated as expired (fail closed): a corrupt
|
||||
// deadline must never widen the unauthenticated registration window.
|
||||
func inviteIsExpired(expiresAt string) bool {
|
||||
exp, err := time.Parse(time.RFC3339Nano, expiresAt)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
return time.Now().UTC().After(exp)
|
||||
}
|
||||
|
||||
// validateInviteRole normalizes and validates the role an invite may carry. It
|
||||
// mirrors AddUser: empty defaults to member, and only admin|member are allowed
|
||||
// (an admin minting an admin invite is deliberate and permitted).
|
||||
func validateInviteRole(role string) (string, error) {
|
||||
if role == "" {
|
||||
return RoleMember, nil
|
||||
}
|
||||
if role != RoleAdmin && role != RoleMember {
|
||||
return "", fmt.Errorf("membership: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)
|
||||
}
|
||||
return role, nil
|
||||
}
|
||||
|
||||
// ---- SQLite implementation ------------------------------------------------
|
||||
|
||||
// CreateInvite mints a single-use invite for a future user. handle is required;
|
||||
// role defaults to member and must be admin|member. ttlSecs sets the lifetime
|
||||
// (non-positive uses the 7-day default). The token is 32 random bytes in hex.
|
||||
func (s *sqliteStore) CreateInvite(handle, role string, ttlSecs int) (Invite, error) {
|
||||
if handle == "" {
|
||||
return Invite{}, fmt.Errorf("membership: CreateInvite: handle required")
|
||||
}
|
||||
role, err := validateInviteRole(role)
|
||||
if err != nil {
|
||||
return Invite{}, err
|
||||
}
|
||||
token, err := newInviteToken()
|
||||
if err != nil {
|
||||
return Invite{}, err
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
inv := Invite{
|
||||
Token: token,
|
||||
Handle: handle,
|
||||
Role: role,
|
||||
ExpiresAt: now.Add(inviteTTL(ttlSecs)).Format(time.RFC3339Nano),
|
||||
Used: false,
|
||||
CreatedAt: now.Format(time.RFC3339Nano),
|
||||
}
|
||||
if _, err := s.db.Exec(
|
||||
`INSERT INTO invites (token, handle, role, expires_at, used, created_at) VALUES (?, ?, ?, ?, 0, ?)`,
|
||||
inv.Token, inv.Handle, inv.Role, inv.ExpiresAt, inv.CreatedAt,
|
||||
); err != nil {
|
||||
return Invite{}, fmt.Errorf("membership: insert invite: %w", err)
|
||||
}
|
||||
return inv, nil
|
||||
}
|
||||
|
||||
// GetInvite returns the invite with the given token, or ErrNotFound (wrapped)
|
||||
// when there is none.
|
||||
func (s *sqliteStore) GetInvite(token string) (Invite, error) {
|
||||
var inv Invite
|
||||
var used int
|
||||
var usedAt, usedSign, usedKex sql.NullString
|
||||
err := s.db.QueryRow(
|
||||
`SELECT token, handle, role, expires_at, used, created_at, used_at, used_sign_pub, used_kex_pub
|
||||
FROM invites WHERE token = ?`, token,
|
||||
).Scan(&inv.Token, &inv.Handle, &inv.Role, &inv.ExpiresAt, &used, &inv.CreatedAt, &usedAt, &usedSign, &usedKex)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, ErrNotFound)
|
||||
}
|
||||
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, err)
|
||||
}
|
||||
inv.Used = used != 0
|
||||
inv.UsedAt, inv.UsedSignPub, inv.UsedKexPub = usedAt.String, usedSign.String, usedKex.String
|
||||
return inv, nil
|
||||
}
|
||||
|
||||
// ListInvites returns every invite ordered newest-first (by created_at). It
|
||||
// includes consumed invites so the admin panel can show the full picture; the
|
||||
// caller filters to "pending" when it wants only live links.
|
||||
func (s *sqliteStore) ListInvites() ([]Invite, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT token, handle, role, expires_at, used, created_at, used_at, used_sign_pub, used_kex_pub
|
||||
FROM invites ORDER BY created_at DESC, token`,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: list invites: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var out []Invite
|
||||
for rows.Next() {
|
||||
var inv Invite
|
||||
var used int
|
||||
var usedAt, usedSign, usedKex sql.NullString
|
||||
if err := rows.Scan(&inv.Token, &inv.Handle, &inv.Role, &inv.ExpiresAt, &used, &inv.CreatedAt, &usedAt, &usedSign, &usedKex); err != nil {
|
||||
return nil, fmt.Errorf("membership: scan invite: %w", err)
|
||||
}
|
||||
inv.Used = used != 0
|
||||
inv.UsedAt, inv.UsedSignPub, inv.UsedKexPub = usedAt.String, usedSign.String, usedKex.String
|
||||
out = append(out, inv)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// ConsumeInvite atomically validates and spends an invite, registering the
|
||||
// presented signing key as a bus user with the invite's handle and role. It is
|
||||
// the ONLY path that adds to the allowlist without an admin signature: the
|
||||
// bearer token is the authorization, so the checks here are the security
|
||||
// boundary.
|
||||
//
|
||||
// Atomicity (single transaction): the invite is marked used FIRST (guarded by
|
||||
// `used = 0`, so two concurrent consumers cannot both win), then the user is
|
||||
// inserted. A token that passes validation is therefore spent exactly once.
|
||||
// Special case: if the signing key is already registered, the user INSERT hits
|
||||
// the PRIMARY KEY and we return ErrUserExists — but the invite stays SPENT (we
|
||||
// commit the mark), matching the JetStream backend's burn-on-claim semantics so
|
||||
// the two stores behave identically. A genuine backend error rolls everything
|
||||
// back, leaving the invite reusable.
|
||||
func (s *sqliteStore) ConsumeInvite(token, signPub, kexPub string) error {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
kexPub = normalizeSignPub(kexPub)
|
||||
if signPub == "" {
|
||||
return fmt.Errorf("membership: ConsumeInvite: sign_pub required")
|
||||
}
|
||||
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: ConsumeInvite: begin: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
var handle, role, expiresAt string
|
||||
var used int
|
||||
err = tx.QueryRow(
|
||||
`SELECT handle, role, expires_at, used FROM invites WHERE token = ?`, token,
|
||||
).Scan(&handle, &role, &expiresAt, &used)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, ErrNotFound)
|
||||
}
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, err)
|
||||
}
|
||||
if used != 0 {
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
|
||||
}
|
||||
if inviteIsExpired(expiresAt) {
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteExpired)
|
||||
}
|
||||
|
||||
// Mark used first, guarded by used = 0 so a concurrent consumer that already
|
||||
// flipped it (rows affected = 0) is rejected as used rather than double-spending.
|
||||
now := nowRFC3339()
|
||||
res, err := tx.Exec(
|
||||
`UPDATE invites SET used = 1, used_at = ?, used_sign_pub = ?, used_kex_pub = ? WHERE token = ? AND used = 0`,
|
||||
now, signPub, kexPub, token,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: consume invite %q: mark used: %w", token, err)
|
||||
}
|
||||
n, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: consume invite %q: rows affected: %w", token, err)
|
||||
}
|
||||
if n == 0 {
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
|
||||
}
|
||||
|
||||
// Register the user with the invite-fixed handle and role.
|
||||
_, err = tx.Exec(
|
||||
`INSERT INTO users (sign_pub, handle, role, status, created_at) VALUES (?, ?, ?, ?, ?)`,
|
||||
signPub, handle, role, StatusActive, now,
|
||||
)
|
||||
if err != nil {
|
||||
// Already-registered key: the invite is still spent (commit the mark) so
|
||||
// the burn-on-claim contract matches the KV store. Any other failure rolls back.
|
||||
if isUniqueViolation(err) {
|
||||
if cErr := tx.Commit(); cErr != nil {
|
||||
return fmt.Errorf("membership: consume invite %q: commit: %w", token, cErr)
|
||||
}
|
||||
return ErrUserExists
|
||||
}
|
||||
return fmt.Errorf("membership: consume invite %q: insert user: %w", token, err)
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return fmt.Errorf("membership: consume invite %q: commit: %w", token, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CancelInvite removes a pending invite (the admin revoked the link before it
|
||||
// was used). It hard-deletes the row; a consumed invite stays for audit only if
|
||||
// the caller targets a pending token. Deleting an unknown token returns
|
||||
// ErrNotFound so the HTTP layer can answer 404.
|
||||
func (s *sqliteStore) CancelInvite(token string) error {
|
||||
res, err := s.db.Exec(`DELETE FROM invites WHERE token = ?`, token)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
|
||||
}
|
||||
n, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: cancel invite %q: rows affected: %w", token, err)
|
||||
}
|
||||
if n == 0 {
|
||||
return fmt.Errorf("membership: cancel invite %q: %w", token, ErrNotFound)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// isUniqueViolation reports whether err is a SQLite UNIQUE/PRIMARY KEY conflict.
|
||||
// modernc.org/sqlite surfaces it as a message fragment; matching it here keeps
|
||||
// the string-matching in one place (the same fragments AddUser checks inline).
|
||||
func isUniqueViolation(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
msg := err.Error()
|
||||
return strings.Contains(msg, "UNIQUE constraint") || strings.Contains(msg, "PRIMARY KEY")
|
||||
}
|
||||
@@ -0,0 +1,194 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
)
|
||||
|
||||
// postRegister posts an UNSIGNED /register request (the wallet-model join: the
|
||||
// new identity is not yet in the allowlist, so it cannot sign). It returns the
|
||||
// status and body so a test can assert the precise code.
|
||||
func postRegister(t *testing.T, h *authHarness, body registerReq) (int, string) {
|
||||
t.Helper()
|
||||
b, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal register: %v", err)
|
||||
}
|
||||
resp, err := http.Post(h.ts.URL+"/register", "application/json", bytes.NewReader(b))
|
||||
if err != nil {
|
||||
t.Fatalf("post register: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
rb, _ := io.ReadAll(resp.Body)
|
||||
return resp.StatusCode, string(rb)
|
||||
}
|
||||
|
||||
// TestInvitesHTTP_Golden is the end-to-end wallet-model flow over real HTTP:
|
||||
// alice (admin) mints an invite, a brand-new identity redeems it UNSIGNED via
|
||||
// /register, the user then appears in the admin allowlist, and a second redeem of
|
||||
// the same token is rejected as used.
|
||||
func TestInvitesHTTP_Golden(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
|
||||
// Admin mints an invite.
|
||||
var inv createInviteResp
|
||||
code, body := signedJSON(t, h, "POST", "/invites",
|
||||
createInviteReq{Handle: "newbie", Role: RoleMember}, h.alice, 1)
|
||||
if code != http.StatusCreated {
|
||||
t.Fatalf("admin create invite should be 201, got %d (%s)", code, body)
|
||||
}
|
||||
if err := json.Unmarshal([]byte(body), &inv); err != nil {
|
||||
t.Fatalf("decode invite: %v (%s)", err, body)
|
||||
}
|
||||
if len(inv.Token) != 64 || inv.ExpiresAt == "" {
|
||||
t.Fatalf("invite token/expiry malformed: %+v", inv)
|
||||
}
|
||||
|
||||
// A brand-new identity redeems it WITHOUT any admin signature.
|
||||
id, _ := cs.GenerateIdentity()
|
||||
signPub := hex.EncodeToString(id.SignPub)
|
||||
kexPub := hex.EncodeToString(id.KexPub)
|
||||
if code, body := postRegister(t, h, registerReq{Token: inv.Token, SignPub: signPub, KexPub: kexPub}); code != http.StatusCreated {
|
||||
t.Fatalf("register should be 201, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// The user now appears in the admin allowlist with the invite's handle/role.
|
||||
users := listUsers(t, h, 2)
|
||||
row, ok := findUser(users, signPub)
|
||||
if !ok {
|
||||
t.Fatalf("registered user missing from allowlist: %+v", users)
|
||||
}
|
||||
if row.Handle != "newbie" || row.Role != RoleMember || row.Status != StatusActive {
|
||||
t.Fatalf("registered user row wrong: %+v", row)
|
||||
}
|
||||
|
||||
// The invite is no longer pending.
|
||||
if code, body := signedJSON(t, h, "GET", "/invites", nil, h.alice, 3); code == http.StatusOK {
|
||||
var pend []inviteJSON
|
||||
_ = json.Unmarshal([]byte(body), &pend)
|
||||
for _, p := range pend {
|
||||
if p.Token == inv.Token {
|
||||
t.Fatalf("consumed invite should not be listed as pending: %+v", pend)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Single-use: a second redeem of the same token is 409 used.
|
||||
id2, _ := cs.GenerateIdentity()
|
||||
if code, body := postRegister(t, h, registerReq{
|
||||
Token: inv.Token, SignPub: hex.EncodeToString(id2.SignPub), KexPub: hex.EncodeToString(id2.KexPub),
|
||||
}); code != http.StatusConflict {
|
||||
t.Fatalf("second redeem should be 409, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInvitesHTTP_RegisterValidation covers /register input + state errors: an
|
||||
// unknown token is 404, an expired token is 410, and malformed hex keys are 400 —
|
||||
// each WITHOUT registering anything.
|
||||
func TestInvitesHTTP_RegisterValidation(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
id, _ := cs.GenerateIdentity()
|
||||
signPub := hex.EncodeToString(id.SignPub)
|
||||
kexPub := hex.EncodeToString(id.KexPub)
|
||||
|
||||
// Unknown token -> 404.
|
||||
if code, body := postRegister(t, h, registerReq{Token: "deadbeef", SignPub: signPub, KexPub: kexPub}); code != http.StatusNotFound {
|
||||
t.Fatalf("unknown token should be 404, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// Malformed sign_pub -> 400.
|
||||
if code, body := postRegister(t, h, registerReq{Token: "x", SignPub: "abcd", KexPub: kexPub}); code != http.StatusBadRequest {
|
||||
t.Fatalf("malformed sign_pub should be 400, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// Malformed kex_pub -> 400.
|
||||
if code, body := postRegister(t, h, registerReq{Token: "x", SignPub: signPub, KexPub: "zzzz"}); code != http.StatusBadRequest {
|
||||
t.Fatalf("malformed kex_pub should be 400, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// Expired token -> 410. Mint via the admin API, then force its deadline past
|
||||
// directly in the store (white-box).
|
||||
var inv createInviteResp
|
||||
_, body := signedJSON(t, h, "POST", "/invites", createInviteReq{Handle: "late", Role: RoleMember}, h.alice, 1)
|
||||
if err := json.Unmarshal([]byte(body), &inv); err != nil {
|
||||
t.Fatalf("decode invite: %v (%s)", err, body)
|
||||
}
|
||||
ss, ok := h.store.(*sqliteStore)
|
||||
if !ok {
|
||||
t.Fatalf("expected sqliteStore harness")
|
||||
}
|
||||
past := time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
|
||||
if _, err := ss.db.Exec(`UPDATE invites SET expires_at = ? WHERE token = ?`, past, inv.Token); err != nil {
|
||||
t.Fatalf("force expire: %v", err)
|
||||
}
|
||||
if code, rb := postRegister(t, h, registerReq{Token: inv.Token, SignPub: signPub, KexPub: kexPub}); code != http.StatusGone {
|
||||
t.Fatalf("expired token should be 410, got %d (%s)", code, rb)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInvitesHTTP_NonAdminForbidden is the security spine for the new endpoints:
|
||||
// a REGISTERED non-admin (bob) is denied on POST /invites, GET /invites,
|
||||
// DELETE /invites/{token}, and DELETE /users/{signpub} — each a 403 by role.
|
||||
func TestInvitesHTTP_NonAdminForbidden(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
|
||||
bob, _ := cs.GenerateIdentity()
|
||||
register(t, h, bob, "bob") // role member
|
||||
bobPub := hex.EncodeToString(bob.SignPub)
|
||||
|
||||
checks := []struct {
|
||||
name string
|
||||
method string
|
||||
path string
|
||||
body any
|
||||
}{
|
||||
{"create invite", "POST", "/invites", createInviteReq{Handle: "x", Role: RoleMember}},
|
||||
{"list invites", "GET", "/invites", nil},
|
||||
{"cancel invite", "DELETE", "/invites/sometoken", nil},
|
||||
{"delete user", "DELETE", "/users/" + bobPub, nil},
|
||||
}
|
||||
for i, c := range checks {
|
||||
code, body := signedJSON(t, h, c.method, c.path, c.body, bob, i+1)
|
||||
if code != http.StatusForbidden {
|
||||
t.Fatalf("non-admin %s should be 403, got %d (%s)", c.name, code, body)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestUsersHTTP_HardDelete proves DELETE /users/{signpub} purges a user (distinct
|
||||
// from revoke's status flip): alice adds carol, hard-deletes her, and carol then
|
||||
// vanishes from the allowlist entirely (not merely flagged revoked).
|
||||
func TestUsersHTTP_HardDelete(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
|
||||
carol, _ := cs.GenerateIdentity()
|
||||
carolPub := hex.EncodeToString(carol.SignPub)
|
||||
if code, body := signedJSON(t, h, "POST", "/users",
|
||||
addUserReq{SignPub: carolPub, Handle: "carol", Role: RoleMember}, h.alice, 1); code != http.StatusCreated {
|
||||
t.Fatalf("add carol should be 201, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// Hard-delete carol.
|
||||
if code, body := signedJSON(t, h, "DELETE", "/users/"+carolPub, nil, h.alice, 2); code != http.StatusOK {
|
||||
t.Fatalf("hard-delete carol should be 200, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// She is gone entirely — not present in the list at all (vs revoke, which
|
||||
// keeps her as status=revoked).
|
||||
users := listUsers(t, h, 3)
|
||||
if _, ok := findUser(users, carolPub); ok {
|
||||
t.Fatalf("hard-deleted carol must NOT appear in the allowlist: %+v", users)
|
||||
}
|
||||
|
||||
// Deleting her again is a 404.
|
||||
if code, body := signedJSON(t, h, "DELETE", "/users/"+carolPub, nil, h.alice, 4); code != http.StatusNotFound {
|
||||
t.Fatalf("re-delete should be 404, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,186 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
)
|
||||
|
||||
// newIDHex generates a fresh identity and returns its signing and key-exchange
|
||||
// public keys as lowercase hex — the two keys a client presents to /register.
|
||||
func newIDHex(t *testing.T) (signPub, kexPub string) {
|
||||
t.Helper()
|
||||
id, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity: %v", err)
|
||||
}
|
||||
return hex.EncodeToString(id.SignPub), hex.EncodeToString(id.KexPub)
|
||||
}
|
||||
|
||||
// inviteSuite drives the full invite lifecycle against any Store backend: mint,
|
||||
// look up, redeem (which registers the user), reject a second redeem (single-use)
|
||||
// and a non-existent token, reject an expired token (forced past via the
|
||||
// backend-specific forceExpire closure), and hard-delete a user. It is shared by
|
||||
// the SQLite and JetStream tests so both backends prove identical behavior.
|
||||
func inviteSuite(t *testing.T, s Store, forceExpire func(token string)) {
|
||||
t.Helper()
|
||||
|
||||
// Mint an invite fixing handle + role.
|
||||
inv, err := s.CreateInvite("alice-new", RoleMember, 3600)
|
||||
if err != nil {
|
||||
t.Fatalf("CreateInvite: %v", err)
|
||||
}
|
||||
if len(inv.Token) != 64 {
|
||||
t.Fatalf("token should be 64 hex chars, got %d (%q)", len(inv.Token), inv.Token)
|
||||
}
|
||||
if inv.Used {
|
||||
t.Fatalf("fresh invite must not be used")
|
||||
}
|
||||
|
||||
// GetInvite round-trips it.
|
||||
got, err := s.GetInvite(inv.Token)
|
||||
if err != nil || got.Handle != "alice-new" || got.Role != RoleMember {
|
||||
t.Fatalf("GetInvite mismatch: %+v err=%v", got, err)
|
||||
}
|
||||
|
||||
// Redeem it: the presented signing key joins the allowlist with the invite's
|
||||
// handle and role.
|
||||
signPub, kexPub := newIDHex(t)
|
||||
if err := s.ConsumeInvite(inv.Token, signPub, kexPub); err != nil {
|
||||
t.Fatalf("ConsumeInvite (golden): %v", err)
|
||||
}
|
||||
u, err := s.GetUser(signPub)
|
||||
if err != nil {
|
||||
t.Fatalf("GetUser after register: %v", err)
|
||||
}
|
||||
if u.Handle != "alice-new" || u.Role != RoleMember || u.Status != StatusActive {
|
||||
t.Fatalf("registered user wrong: %+v", u)
|
||||
}
|
||||
if !s.IsAuthorized(signPub) {
|
||||
t.Fatalf("registered user should be authorized")
|
||||
}
|
||||
|
||||
// Single-use: redeeming the same token again (even with a different identity)
|
||||
// is rejected as used.
|
||||
sp2, kp2 := newIDHex(t)
|
||||
if err := s.ConsumeInvite(inv.Token, sp2, kp2); !errors.Is(err, ErrInviteUsed) {
|
||||
t.Fatalf("second redeem should be ErrInviteUsed, got %v", err)
|
||||
}
|
||||
if _, err := s.GetUser(sp2); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("second identity must NOT be registered, got %v", err)
|
||||
}
|
||||
|
||||
// Unknown token is ErrNotFound.
|
||||
if err := s.ConsumeInvite("deadbeef", "ab", "cd"); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("unknown token should be ErrNotFound, got %v", err)
|
||||
}
|
||||
|
||||
// Expired invite: mint one, force its deadline into the past, redeem -> rejected.
|
||||
exp, err := s.CreateInvite("late", RoleMember, 3600)
|
||||
if err != nil {
|
||||
t.Fatalf("CreateInvite expired: %v", err)
|
||||
}
|
||||
forceExpire(exp.Token)
|
||||
sp3, kp3 := newIDHex(t)
|
||||
if err := s.ConsumeInvite(exp.Token, sp3, kp3); !errors.Is(err, ErrInviteExpired) {
|
||||
t.Fatalf("expired redeem should be ErrInviteExpired, got %v", err)
|
||||
}
|
||||
|
||||
// CancelInvite removes a pending invite; redeeming it afterward is ErrNotFound.
|
||||
canc, err := s.CreateInvite("cancelme", RoleMember, 3600)
|
||||
if err != nil {
|
||||
t.Fatalf("CreateInvite cancel: %v", err)
|
||||
}
|
||||
if err := s.CancelInvite(canc.Token); err != nil {
|
||||
t.Fatalf("CancelInvite: %v", err)
|
||||
}
|
||||
if err := s.ConsumeInvite(canc.Token, sp3, kp3); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("cancelled invite redeem should be ErrNotFound, got %v", err)
|
||||
}
|
||||
|
||||
// Hard-delete the registered user: it disappears from the allowlist entirely.
|
||||
if err := s.DeleteUser(signPub); err != nil {
|
||||
t.Fatalf("DeleteUser: %v", err)
|
||||
}
|
||||
if _, err := s.GetUser(signPub); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("deleted user should be ErrNotFound, got %v", err)
|
||||
}
|
||||
if s.IsAuthorized(signPub) {
|
||||
t.Fatalf("deleted user must not be authorized")
|
||||
}
|
||||
// Deleting an unknown key is ErrNotFound.
|
||||
if err := s.DeleteUser(signPub); !errors.Is(err, ErrNotFound) {
|
||||
t.Fatalf("re-delete should be ErrNotFound, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInvitesSQLite runs the suite against the default SQLite backend, forcing
|
||||
// expiry with a direct UPDATE on the embedded DB (white-box, same package).
|
||||
func TestInvitesSQLite(t *testing.T) {
|
||||
s := openTestStore(t)
|
||||
inviteSuite(t, s, func(token string) {
|
||||
past := time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
|
||||
if _, err := s.db.Exec(`UPDATE invites SET expires_at = ? WHERE token = ?`, past, token); err != nil {
|
||||
t.Fatalf("force expire: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestInvitesJetStream runs the same suite against the replicated KV backend,
|
||||
// forcing expiry by re-Putting the invite JSON with a past deadline.
|
||||
func TestInvitesJetStream(t *testing.T) {
|
||||
s, _, _ := newKVStore(t)
|
||||
inviteSuite(t, s, func(token string) {
|
||||
inv, err := s.GetInvite(token)
|
||||
if err != nil {
|
||||
t.Fatalf("force expire: get invite: %v", err)
|
||||
}
|
||||
inv.ExpiresAt = time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
|
||||
b, err := json.Marshal(inv)
|
||||
if err != nil {
|
||||
t.Fatalf("force expire: marshal: %v", err)
|
||||
}
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
if _, err := s.invites.Put(ctx, token, b); err != nil {
|
||||
t.Fatalf("force expire: put: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestConsumeInvite_AlreadyRegistered covers the burn-on-claim edge: redeeming a
|
||||
// valid invite with a signing key that is already registered surfaces
|
||||
// ErrUserExists AND spends the invite (both backends behave identically).
|
||||
func TestConsumeInvite_AlreadyRegistered(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
open func(t *testing.T) Store
|
||||
}{
|
||||
{"sqlite", func(t *testing.T) Store { return openTestStore(t) }},
|
||||
{"jetstream", func(t *testing.T) Store { s, _, _ := newKVStore(t); return s }},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s := tc.open(t)
|
||||
signPub, kexPub := newIDHex(t)
|
||||
if err := s.AddUser(signPub, "existing", RoleMember); err != nil {
|
||||
t.Fatalf("seed user: %v", err)
|
||||
}
|
||||
inv, err := s.CreateInvite("dup", RoleMember, 3600)
|
||||
if err != nil {
|
||||
t.Fatalf("CreateInvite: %v", err)
|
||||
}
|
||||
if err := s.ConsumeInvite(inv.Token, signPub, kexPub); !errors.Is(err, ErrUserExists) {
|
||||
t.Fatalf("redeem with registered key should be ErrUserExists, got %v", err)
|
||||
}
|
||||
// The invite is spent (burn-on-claim): a fresh identity cannot reuse it.
|
||||
sp2, kp2 := newIDHex(t)
|
||||
if err := s.ConsumeInvite(inv.Token, sp2, kp2); !errors.Is(err, ErrInviteUsed) {
|
||||
t.Fatalf("invite should be spent after a burned claim, got %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -50,6 +50,7 @@ const (
|
||||
bucketByMember = "UNIBUS_rooms_by_member"
|
||||
bucketRoomKeys = "UNIBUS_room_keys"
|
||||
bucketUsers = "UNIBUS_users"
|
||||
bucketInvites = "UNIBUS_invites"
|
||||
defaultKVOpTime = 5 * time.Second
|
||||
)
|
||||
|
||||
@@ -71,6 +72,7 @@ type jetstreamStore struct {
|
||||
byMember jetstream.KeyValue
|
||||
keys jetstream.KeyValue
|
||||
users jetstream.KeyValue
|
||||
invites jetstream.KeyValue
|
||||
opTimeout time.Duration
|
||||
}
|
||||
|
||||
@@ -85,8 +87,18 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
||||
if opTimeout <= 0 {
|
||||
opTimeout = defaultKVOpTime
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
// Bootstrap budget for creating/opening the buckets. On a single node JetStream
|
||||
// is ready the instant the server starts, so the first attempt succeeds. On a
|
||||
// COLD multi-node cluster the JetStream meta-group must first elect a leader and
|
||||
// each node must establish contact with it before its $JS.API responds. A KV
|
||||
// op is a NATS request/reply: if it is published before the node's JetStream is
|
||||
// ready the request is dropped (not queued), and a single long-context call then
|
||||
// just blocks until it times out (issue 0006g). So we RETRY each bucket op with
|
||||
// short per-attempt contexts until it succeeds or the overall bootstrap budget
|
||||
// is exhausted; once the cluster is ready the next retry lands and the buckets
|
||||
// are created, after which they persist and every node opens them quickly.
|
||||
bootstrapBudget := 120 * time.Second
|
||||
deadline := time.Now().Add(bootstrapBudget)
|
||||
|
||||
s := &jetstreamStore{opTimeout: opTimeout}
|
||||
for _, b := range []struct {
|
||||
@@ -98,15 +110,29 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
||||
{bucketByMember, &s.byMember},
|
||||
{bucketRoomKeys, &s.keys},
|
||||
{bucketUsers, &s.users},
|
||||
{bucketInvites, &s.invites},
|
||||
} {
|
||||
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
|
||||
Bucket: b.name,
|
||||
Replicas: cfg.Replicas,
|
||||
History: 1,
|
||||
Storage: jetstream.FileStorage,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err)
|
||||
var kv jetstream.KeyValue
|
||||
var lastErr error
|
||||
for {
|
||||
opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{
|
||||
Bucket: b.name,
|
||||
Replicas: cfg.Replicas,
|
||||
History: 1,
|
||||
Storage: jetstream.FileStorage,
|
||||
})
|
||||
cancel()
|
||||
if lastErr == nil {
|
||||
break
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr)
|
||||
}
|
||||
// JetStream not ready yet (no meta leader / request dropped). Wait and
|
||||
// re-publish the op; in a cluster cold start this lands once the meta
|
||||
// group settles.
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
*b.dst = kv
|
||||
}
|
||||
@@ -475,6 +501,28 @@ func (s *jetstreamStore) RevokeUser(signPub string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteUser hard-deletes a user from the KV allowlist (the purge counterpart of
|
||||
// RevokeUser's status flip). It checks existence first so deleting an unknown key
|
||||
// is ErrNotFound (KV Delete is otherwise idempotent and would not signal a miss).
|
||||
// Only the allowlist key is removed; room memberships the ex-user holds become
|
||||
// inert because they can no longer authenticate — see the SQLite DeleteUser for
|
||||
// the full rationale on why room state is left untouched.
|
||||
func (s *jetstreamStore) DeleteUser(signPub string) error {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
if _, err := s.users.Get(ctx, signPub); err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return fmt.Errorf("membership: delete user %q: %w", signPub, ErrNotFound)
|
||||
}
|
||||
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
|
||||
}
|
||||
if err := s.users.Delete(ctx, signPub); err != nil {
|
||||
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsAuthorized reports whether signPub is an active bus user. Any backend error
|
||||
// (including a KV quorum loss or timeout) yields false: fail closed.
|
||||
func (s *jetstreamStore) IsAuthorized(signPub string) bool {
|
||||
@@ -510,6 +558,173 @@ func (s *jetstreamStore) HasAdmin() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// ---- invites (single-use registration tokens) ----------------------------
|
||||
|
||||
func (s *jetstreamStore) CreateInvite(handle, role string, ttlSecs int) (Invite, error) {
|
||||
if handle == "" {
|
||||
return Invite{}, fmt.Errorf("membership: CreateInvite: handle required")
|
||||
}
|
||||
role, err := validateInviteRole(role)
|
||||
if err != nil {
|
||||
return Invite{}, err
|
||||
}
|
||||
token, err := newInviteToken()
|
||||
if err != nil {
|
||||
return Invite{}, err
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
inv := Invite{
|
||||
Token: token,
|
||||
Handle: handle,
|
||||
Role: role,
|
||||
ExpiresAt: now.Add(inviteTTL(ttlSecs)).Format(time.RFC3339Nano),
|
||||
Used: false,
|
||||
CreatedAt: now.Format(time.RFC3339Nano),
|
||||
}
|
||||
b, err := json.Marshal(inv)
|
||||
if err != nil {
|
||||
return Invite{}, fmt.Errorf("membership: marshal invite: %w", err)
|
||||
}
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
// Create (not Put) so a token collision is rejected rather than silently
|
||||
// overwriting a live invite — a 32-byte random collision is astronomically
|
||||
// unlikely, but Create makes the single-use guarantee unconditional.
|
||||
if _, err := s.invites.Create(ctx, token, b); err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyExists) {
|
||||
return Invite{}, fmt.Errorf("membership: create invite: token collision")
|
||||
}
|
||||
return Invite{}, fmt.Errorf("membership: create invite: %w", err)
|
||||
}
|
||||
return inv, nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) GetInvite(token string) (Invite, error) {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
e, err := s.invites.Get(ctx, token)
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, ErrNotFound)
|
||||
}
|
||||
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, err)
|
||||
}
|
||||
var inv Invite
|
||||
if err := json.Unmarshal(e.Value(), &inv); err != nil {
|
||||
return Invite{}, fmt.Errorf("membership: unmarshal invite: %w", err)
|
||||
}
|
||||
return inv, nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) ListInvites() ([]Invite, error) {
|
||||
ctx, cancel := s.ctx()
|
||||
w, err := s.invites.WatchAll(ctx, jetstream.IgnoreDeletes())
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("membership: list invites: %w", err)
|
||||
}
|
||||
defer cancel()
|
||||
defer w.Stop()
|
||||
var out []Invite
|
||||
for {
|
||||
select {
|
||||
case e := <-w.Updates():
|
||||
if e == nil {
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
if out[i].CreatedAt != out[j].CreatedAt {
|
||||
return out[i].CreatedAt > out[j].CreatedAt // newest first
|
||||
}
|
||||
return out[i].Token < out[j].Token
|
||||
})
|
||||
return out, nil
|
||||
}
|
||||
var inv Invite
|
||||
if err := json.Unmarshal(e.Value(), &inv); err != nil {
|
||||
return nil, fmt.Errorf("membership: unmarshal invite: %w", err)
|
||||
}
|
||||
out = append(out, inv)
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ConsumeInvite spends a KV invite and registers the presented signing key. With
|
||||
// no multi-key transaction, single-use is enforced by a compare-and-swap on the
|
||||
// invite: the token is marked used via Update against the revision read by Get,
|
||||
// so only ONE concurrent consumer can win the swap; the loser sees a revision
|
||||
// mismatch and is rejected as used. The user is registered AFTER the successful
|
||||
// swap. Burn-on-claim: if the signing key is already registered the swap has
|
||||
// already spent the token and we surface ErrUserExists — the SQLite store commits
|
||||
// the same way, so both backends behave identically.
|
||||
func (s *jetstreamStore) ConsumeInvite(token, signPub, kexPub string) error {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
kexPub = normalizeSignPub(kexPub)
|
||||
if signPub == "" {
|
||||
return fmt.Errorf("membership: ConsumeInvite: sign_pub required")
|
||||
}
|
||||
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
e, err := s.invites.Get(ctx, token)
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, ErrNotFound)
|
||||
}
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, err)
|
||||
}
|
||||
var inv Invite
|
||||
if err := json.Unmarshal(e.Value(), &inv); err != nil {
|
||||
return fmt.Errorf("membership: unmarshal invite: %w", err)
|
||||
}
|
||||
if inv.Used {
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
|
||||
}
|
||||
if inviteIsExpired(inv.ExpiresAt) {
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteExpired)
|
||||
}
|
||||
|
||||
inv.Used = true
|
||||
inv.UsedAt = nowRFC3339()
|
||||
inv.UsedSignPub = signPub
|
||||
inv.UsedKexPub = kexPub
|
||||
b, err := json.Marshal(inv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: marshal invite: %w", err)
|
||||
}
|
||||
// CAS: Update only succeeds if the invite is still at the revision we read, so
|
||||
// a racing consumer that already flipped it loses here. A failed swap is
|
||||
// conservatively treated as "already used" (the common cause); the caller can
|
||||
// re-read to learn the precise state.
|
||||
if _, err := s.invites.Update(ctx, token, b, e.Revision()); err != nil {
|
||||
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
|
||||
}
|
||||
|
||||
// Token is now spent. Register the user with the invite-fixed handle and role.
|
||||
if err := s.AddUser(signPub, inv.Handle, inv.Role); err != nil {
|
||||
if errors.Is(err, ErrUserExists) {
|
||||
return ErrUserExists
|
||||
}
|
||||
return fmt.Errorf("membership: consume invite %q: register user: %w", token, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *jetstreamStore) CancelInvite(token string) error {
|
||||
ctx, cancel := s.ctx()
|
||||
defer cancel()
|
||||
if _, err := s.invites.Get(ctx, token); err != nil {
|
||||
if errors.Is(err, jetstream.ErrKeyNotFound) {
|
||||
return fmt.Errorf("membership: cancel invite %q: %w", token, ErrNotFound)
|
||||
}
|
||||
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
|
||||
}
|
||||
if err := s.invites.Delete(ctx, token); err != nil {
|
||||
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ---- snapshot import / export (issue 0003c migration) ---------------------
|
||||
|
||||
// importSnapshot writes a full Snapshot into the KV buckets, preserving each
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
-- 003_invites.sql — single-use registration invites (issue: user accounts / wallet model).
|
||||
--
|
||||
-- An admin mints an invite so a brand-new identity can join the bus allowlist
|
||||
-- WITHOUT the admin ever handling its private key. The token is the bearer
|
||||
-- secret that authorizes POST /register: the registering client generates its
|
||||
-- keypair locally and publishes only its public keys, fixing the link between an
|
||||
-- invite and the identity it creates via the audit columns below. The handle and
|
||||
-- role are fixed by the admin at mint time and cannot be changed by the client
|
||||
-- (no privilege escalation).
|
||||
--
|
||||
-- Additive and idempotent: safe to apply repeatedly. Never modify this file;
|
||||
-- further schema changes go in new numbered migrations (see
|
||||
-- .claude/rules/db_migrations.md). The embedded copy under
|
||||
-- pkg/membership/migrations/003_invites.sql mirrors this file byte-for-byte.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS invites (
|
||||
token TEXT PRIMARY KEY, -- 32 random bytes in lowercase hex (the bearer secret)
|
||||
handle TEXT NOT NULL, -- handle the new user will get (fixed by admin)
|
||||
role TEXT NOT NULL DEFAULT 'member', -- 'admin' | 'member' (fixed by admin)
|
||||
expires_at TEXT NOT NULL, -- RFC3339; past this the invite is dead
|
||||
used INTEGER NOT NULL DEFAULT 0, -- 0 pending, 1 consumed (single-use)
|
||||
created_at TEXT NOT NULL,
|
||||
used_at TEXT, -- RFC3339 when consumed (NULL until used)
|
||||
used_sign_pub TEXT, -- Ed25519 key that consumed it (audit; NULL until used)
|
||||
used_kex_pub TEXT -- X25519 key presented at registration (audit; NULL until used)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_invites_used ON invites(used);
|
||||
+406
-14
@@ -144,9 +144,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
now := time.Now()
|
||||
|
||||
// Per-IP rate limit runs first, ahead of auth and body reads, so a flood is
|
||||
// shed at the cheapest possible point. The health probe is exempt so liveness
|
||||
// checks are never throttled.
|
||||
if !isAuthExempt(r) && !s.limiter.allow(clientIP(r), now) {
|
||||
// shed at the cheapest possible point. ONLY the health probe is exempt so
|
||||
// liveness checks are never throttled — note this is isRateExempt, NOT
|
||||
// isAuthExempt: POST /register is auth-exempt (no admin signature) but stays
|
||||
// rate-limited, since it is the one un-signed path that mutates the allowlist.
|
||||
if !isRateExempt(r) && !s.limiter.allow(clientIP(r), now) {
|
||||
writeErr(w, http.StatusTooManyRequests, "rate limit exceeded")
|
||||
return
|
||||
}
|
||||
@@ -213,9 +215,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
|
||||
return
|
||||
}
|
||||
// Carry the authenticated signer's endpoint into the handler so room handlers
|
||||
// can authorize by membership (audit H3). Only set on a verified identity.
|
||||
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint)))
|
||||
// Carry the authenticated signer's endpoint AND signing key into the handler.
|
||||
// Room handlers authorize by membership via the endpoint (audit H3); the
|
||||
// user-management handlers authorize by role via the signing key (the endpoint
|
||||
// id is a one-way hash of the key, so it cannot be reversed to look the signer
|
||||
// up in the user allowlist). Both are set only on a verified identity.
|
||||
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint, res.pubHex)))
|
||||
}
|
||||
|
||||
// isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader
|
||||
@@ -229,11 +234,19 @@ func isBodyTooLarge(err error) bool {
|
||||
// values cannot collide with keys set by other packages.
|
||||
type ctxKey int
|
||||
|
||||
const ctxSignerEndpoint ctxKey = iota
|
||||
const (
|
||||
ctxSignerEndpoint ctxKey = iota
|
||||
ctxSignerPub
|
||||
)
|
||||
|
||||
// withSigner returns a context carrying the authenticated signer's endpoint id.
|
||||
func withSigner(ctx context.Context, endpoint string) context.Context {
|
||||
return context.WithValue(ctx, ctxSignerEndpoint, endpoint)
|
||||
// withSigner returns a context carrying the authenticated signer's endpoint id
|
||||
// and signing public key (lowercase hex). The endpoint authorizes room
|
||||
// membership; the signing key authorizes user-management by role, because the
|
||||
// endpoint id is a one-way hash of the key (base64url(sha256(signPub))) and so
|
||||
// cannot be reversed to look the signer up in the user allowlist.
|
||||
func withSigner(ctx context.Context, endpoint, pubHex string) context.Context {
|
||||
ctx = context.WithValue(ctx, ctxSignerEndpoint, endpoint)
|
||||
return context.WithValue(ctx, ctxSignerPub, pubHex)
|
||||
}
|
||||
|
||||
// signerEndpoint returns the authenticated signer's endpoint id and whether one
|
||||
@@ -245,6 +258,16 @@ func signerEndpoint(r *http.Request) (string, bool) {
|
||||
return v, ok && v != ""
|
||||
}
|
||||
|
||||
// signerPubHex returns the authenticated signer's signing public key (lowercase
|
||||
// hex) and whether one is present. Like signerEndpoint it is absent under
|
||||
// AuthOff and on a soft-mode pass-through; the user-management handlers treat
|
||||
// that absence as "no admin identity" and deny (default-deny), since a
|
||||
// privilege-granting operation must never run without a verified admin.
|
||||
func signerPubHex(r *http.Request) (string, bool) {
|
||||
v, ok := r.Context().Value(ctxSignerPub).(string)
|
||||
return v, ok && v != ""
|
||||
}
|
||||
|
||||
// requireMember authorizes a room request by membership (audit H3): it returns
|
||||
// the signer endpoint and true when the request may proceed, or writes 403 and
|
||||
// returns false when an authenticated signer is not a member of roomID. When no
|
||||
@@ -262,13 +285,54 @@ func (s *Server) requireMember(w http.ResponseWriter, r *http.Request, roomID st
|
||||
return signer, true
|
||||
}
|
||||
|
||||
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
|
||||
// Only the unauthenticated health probe qualifies: it carries no data and is
|
||||
// needed by load balancers / smoke checks / systemd before any identity exists.
|
||||
func isAuthExempt(r *http.Request) bool {
|
||||
// requireAdmin authorizes a user-management request: it returns the signer's
|
||||
// signing-key hex and true ONLY when the authenticated signer is a user with
|
||||
// role admin and active status; otherwise it writes 403 and returns false.
|
||||
//
|
||||
// Default-deny, with no dev relaxation: unlike requireMember (which allows a
|
||||
// request when no authenticated signer is present, preserving AuthOff/dev
|
||||
// behavior for room reads), this denies whenever the signer is absent or is not
|
||||
// a verified active admin. The user-management endpoints grant and revoke bus
|
||||
// access, so they must never be reachable without a verified admin identity —
|
||||
// the store is consulted on every call so a just-revoked admin is denied
|
||||
// immediately, and any store error fails closed.
|
||||
func (s *Server) requireAdmin(w http.ResponseWriter, r *http.Request) (string, bool) {
|
||||
pubHex, ok := signerPubHex(r)
|
||||
if !ok {
|
||||
writeErr(w, http.StatusForbidden, "forbidden: admin role required")
|
||||
return "", false
|
||||
}
|
||||
u, err := s.store.GetUser(pubHex)
|
||||
if err != nil || u.Role != RoleAdmin || u.Status != StatusActive {
|
||||
writeErr(w, http.StatusForbidden, "forbidden: admin role required")
|
||||
return "", false
|
||||
}
|
||||
return pubHex, true
|
||||
}
|
||||
|
||||
// isRateExempt lists requests that bypass the per-IP rate limiter. Only the
|
||||
// health probe qualifies: a load balancer / systemd / smoke check polls it and
|
||||
// must never be throttled. Everything else — including POST /register — is rate
|
||||
// limited.
|
||||
func isRateExempt(r *http.Request) bool {
|
||||
return r.Method == http.MethodGet && r.URL.Path == "/healthz"
|
||||
}
|
||||
|
||||
// isAuthExempt lists requests that bypass control-plane signature auth even under
|
||||
// enforce. Two qualify:
|
||||
// - GET /healthz: carries no data, needed before any identity exists.
|
||||
// - POST /register: the wallet-model join path. The registering identity is not
|
||||
// yet in the allowlist, so it CANNOT produce an accepted admin signature;
|
||||
// authorization is the single-use bearer invite token, validated inside the
|
||||
// handler (ConsumeInvite). It stays rate-limited (see isRateExempt) and
|
||||
// strictly validates the hex keys before spending the token.
|
||||
func isAuthExempt(r *http.Request) bool {
|
||||
if r.Method == http.MethodGet && r.URL.Path == "/healthz" {
|
||||
return true
|
||||
}
|
||||
return r.Method == http.MethodPost && r.URL.Path == "/register"
|
||||
}
|
||||
|
||||
func (s *Server) routes() {
|
||||
s.mux.HandleFunc("GET /healthz", s.handleHealth)
|
||||
s.mux.HandleFunc("POST /rooms", s.handleCreateRoom)
|
||||
@@ -280,6 +344,23 @@ func (s *Server) routes() {
|
||||
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
|
||||
s.mux.HandleFunc("POST /blobs", s.handlePutBlob)
|
||||
s.mux.HandleFunc("GET /blobs/{hash}", s.handleGetBlob)
|
||||
// User-management (admin-only) — the HTTP-signed equivalent of the local
|
||||
// `membershipd user` CLI, so the admin panel manages the bus allowlist by
|
||||
// signing as an admin instead of needing direct store/KV access. All three
|
||||
// pass through requireAdmin; they hit the same store the room handlers do.
|
||||
s.mux.HandleFunc("GET /users", s.handleListUsers)
|
||||
s.mux.HandleFunc("POST /users", s.handleAddUser)
|
||||
s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser)
|
||||
// Hard-delete (purge) a user — distinct from revoke (status flip). Admin-only.
|
||||
s.mux.HandleFunc("DELETE /users/{signpub}", s.handleDeleteUser)
|
||||
// Invites — the wallet-model account-creation path. The admin mints a
|
||||
// single-use link (POST /invites, admin-only); the new user's client redeems
|
||||
// it without an admin signature (POST /register, token-authorized). Listing
|
||||
// and cancelling a pending invite are admin-only.
|
||||
s.mux.HandleFunc("POST /invites", s.handleCreateInvite)
|
||||
s.mux.HandleFunc("GET /invites", s.handleListInvites)
|
||||
s.mux.HandleFunc("DELETE /invites/{token}", s.handleCancelInvite)
|
||||
s.mux.HandleFunc("POST /register", s.handleRegister)
|
||||
}
|
||||
|
||||
// ---- wire types -----------------------------------------------------------
|
||||
@@ -357,6 +438,67 @@ type blobResp struct {
|
||||
Hash string `json:"hash"`
|
||||
}
|
||||
|
||||
// userJSON is the wire representation of a bus user on the admin endpoints. It
|
||||
// carries the full record the panel needs to render the allowlist, including
|
||||
// status (so revoked users are visible) and the timestamps. revoked_at is
|
||||
// omitted for an active user.
|
||||
type userJSON struct {
|
||||
SignPub string `json:"sign_pub"`
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
Status string `json:"status"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
RevokedAt string `json:"revoked_at,omitempty"`
|
||||
}
|
||||
|
||||
// addUserReq is the POST /users body: the new user's Ed25519 signing key
|
||||
// (64-hex), human handle, and role. role is optional and defaults to member.
|
||||
type addUserReq struct {
|
||||
SignPub string `json:"sign_pub"`
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
}
|
||||
|
||||
// createInviteReq is the POST /invites body (admin-only): the handle and role the
|
||||
// future user will receive (fixed here, NOT chosen by the registering client) and
|
||||
// an optional TTL in seconds (non-positive uses the 7-day default).
|
||||
type createInviteReq struct {
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
TTLSecs int `json:"ttl_secs"`
|
||||
}
|
||||
|
||||
// createInviteResp is the POST /invites reply: the bearer token to put in the
|
||||
// join link and its absolute expiry. The token is shown ONCE here; the admin
|
||||
// copies the link immediately.
|
||||
type createInviteResp struct {
|
||||
Token string `json:"token"`
|
||||
ExpiresAt string `json:"expires_at"`
|
||||
}
|
||||
|
||||
// inviteJSON is the wire representation of a pending invite on GET /invites. It
|
||||
// omits the audit fields (used_*) because the listing is of pending invites only;
|
||||
// used_at is carried so a client can render "expires in N".
|
||||
type inviteJSON struct {
|
||||
Token string `json:"token"`
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
ExpiresAt string `json:"expires_at"`
|
||||
Used bool `json:"used"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// registerReq is the POST /register body. It is the ONLY allowlist-mutating
|
||||
// request that carries no admin signature: the bearer Token authorizes it. The
|
||||
// client supplies its freshly-generated public keys (sign_pub = Ed25519 identity,
|
||||
// kex_pub = X25519 key-exchange), both 64-hex. The handle and role come from the
|
||||
// invite, never from this body — the client cannot escalate.
|
||||
type registerReq struct {
|
||||
Token string `json:"token"`
|
||||
SignPub string `json:"sign_pub"`
|
||||
KexPub string `json:"kex_pub"`
|
||||
}
|
||||
|
||||
// ---- helpers --------------------------------------------------------------
|
||||
|
||||
func writeJSON(w http.ResponseWriter, code int, v any) {
|
||||
@@ -674,3 +816,253 @@ func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write(data)
|
||||
}
|
||||
|
||||
// ---- user-management handlers (admin-only) --------------------------------
|
||||
|
||||
// handleListUsers returns the full bus allowlist, including revoked users, so an
|
||||
// admin sees the complete picture (a revoked identity stays auditable). Admin-only.
|
||||
func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := s.requireAdmin(w, r); !ok {
|
||||
return
|
||||
}
|
||||
users, err := s.store.ListUsers()
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
out := make([]userJSON, 0, len(users))
|
||||
for _, u := range users {
|
||||
out = append(out, userJSON{
|
||||
SignPub: u.SignPub,
|
||||
Handle: u.Handle,
|
||||
Role: u.Role,
|
||||
Status: u.Status,
|
||||
CreatedAt: u.CreatedAt,
|
||||
RevokedAt: u.RevokedAt,
|
||||
})
|
||||
}
|
||||
writeJSON(w, http.StatusOK, out)
|
||||
}
|
||||
|
||||
// handleAddUser registers a new bus user from an admin-supplied Ed25519 signing
|
||||
// key. It mirrors the `membershipd user add` CLI: the key must be 64-hex, the
|
||||
// role must be admin or member (empty defaults to member), and re-adding an
|
||||
// already-registered key is a 409 that leaves the existing row untouched — no
|
||||
// silent upsert that could flip a role or clobber status. Admin-only.
|
||||
func (s *Server) handleAddUser(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := s.requireAdmin(w, r); !ok {
|
||||
return
|
||||
}
|
||||
var req addUserReq
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req.SignPub == "" || req.Handle == "" {
|
||||
writeErr(w, http.StatusBadRequest, "sign_pub and handle required")
|
||||
return
|
||||
}
|
||||
if err := ValidateSignPubHex(req.SignPub); err != nil {
|
||||
writeErr(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
role := req.Role
|
||||
if role == "" {
|
||||
role = RoleMember
|
||||
}
|
||||
if role != RoleAdmin && role != RoleMember {
|
||||
writeErr(w, http.StatusBadRequest,
|
||||
fmt.Sprintf("invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember))
|
||||
return
|
||||
}
|
||||
if err := s.store.AddUser(req.SignPub, req.Handle, role); err != nil {
|
||||
if errors.Is(err, ErrUserExists) {
|
||||
// Idempotency contract (mirrors the CLI): re-adding a key is an explicit,
|
||||
// non-destructive conflict. To replace a user, revoke then add again.
|
||||
writeErr(w, http.StatusConflict,
|
||||
"user already registered (unchanged); revoke it first to replace")
|
||||
return
|
||||
}
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, map[string]string{"status": "added"})
|
||||
}
|
||||
|
||||
// handleRevokeUser revokes a bus user by signing key. Revocation is a status
|
||||
// flip (no hard delete) so the identity stays auditable and IsAuthorized denies
|
||||
// it on both planes immediately. Revoking an unknown or already-revoked key is a
|
||||
// 404. Admin-only.
|
||||
func (s *Server) handleRevokeUser(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := s.requireAdmin(w, r); !ok {
|
||||
return
|
||||
}
|
||||
signPub := r.PathValue("signpub")
|
||||
if err := ValidateSignPubHex(signPub); err != nil {
|
||||
writeErr(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
if err := s.store.RevokeUser(signPub); err != nil {
|
||||
writeServerErr(w, r, http.StatusNotFound, "no active user with that key", err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "revoked"})
|
||||
}
|
||||
|
||||
// handleDeleteUser hard-deletes a bus user by signing key — the purge that the
|
||||
// admin panel's "Eliminar" (permanent) action maps to, distinct from revoke's
|
||||
// status flip. The row is removed entirely (no audit trail kept); use revoke when
|
||||
// an auditable record must remain. Deleting an unknown key is a 404. Admin-only.
|
||||
//
|
||||
// Security note: like revoke, this does NOT special-case the last admin — an
|
||||
// admin can delete the final admin and lock the HTTP user-management surface. The
|
||||
// recovery seam is the local `membershipd user add` CLI (which re-seeds an admin
|
||||
// directly against the store), the same chicken-egg breaker that seeds the first
|
||||
// admin.
|
||||
func (s *Server) handleDeleteUser(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := s.requireAdmin(w, r); !ok {
|
||||
return
|
||||
}
|
||||
signPub := r.PathValue("signpub")
|
||||
if err := ValidateSignPubHex(signPub); err != nil {
|
||||
writeErr(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
if err := s.store.DeleteUser(signPub); err != nil {
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
writeErr(w, http.StatusNotFound, "no user with that key")
|
||||
return
|
||||
}
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "deleted"})
|
||||
}
|
||||
|
||||
// ---- invite handlers ------------------------------------------------------
|
||||
|
||||
// handleCreateInvite mints a single-use registration invite. The handle and role
|
||||
// are fixed here by the admin; the role is validated (admin|member, empty ->
|
||||
// member) so an unknown role is a clean 400 rather than an opaque 500. The reply
|
||||
// carries the bearer token and its expiry — the admin turns the token into the
|
||||
// join link. Admin-only.
|
||||
func (s *Server) handleCreateInvite(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := s.requireAdmin(w, r); !ok {
|
||||
return
|
||||
}
|
||||
var req createInviteReq
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req.Handle == "" {
|
||||
writeErr(w, http.StatusBadRequest, "handle required")
|
||||
return
|
||||
}
|
||||
if req.Role != "" && req.Role != RoleAdmin && req.Role != RoleMember {
|
||||
writeErr(w, http.StatusBadRequest,
|
||||
fmt.Sprintf("invalid role %q (want %q or %q)", req.Role, RoleAdmin, RoleMember))
|
||||
return
|
||||
}
|
||||
inv, err := s.store.CreateInvite(req.Handle, req.Role, req.TTLSecs)
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, createInviteResp{Token: inv.Token, ExpiresAt: inv.ExpiresAt})
|
||||
}
|
||||
|
||||
// handleListInvites returns the PENDING invites (not yet used and not expired), so
|
||||
// the admin panel shows only live links worth copying. Consumed/expired invites
|
||||
// are filtered out here rather than at the store, which exposes the full set for
|
||||
// other callers. Admin-only.
|
||||
func (s *Server) handleListInvites(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := s.requireAdmin(w, r); !ok {
|
||||
return
|
||||
}
|
||||
invites, err := s.store.ListInvites()
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
out := make([]inviteJSON, 0, len(invites))
|
||||
for _, inv := range invites {
|
||||
if inv.Used || inviteIsExpired(inv.ExpiresAt) {
|
||||
continue // pending only
|
||||
}
|
||||
out = append(out, inviteJSON{
|
||||
Token: inv.Token,
|
||||
Handle: inv.Handle,
|
||||
Role: inv.Role,
|
||||
ExpiresAt: inv.ExpiresAt,
|
||||
Used: inv.Used,
|
||||
CreatedAt: inv.CreatedAt,
|
||||
})
|
||||
}
|
||||
writeJSON(w, http.StatusOK, out)
|
||||
}
|
||||
|
||||
// handleCancelInvite cancels (hard-deletes) a pending invite, so an admin can
|
||||
// revoke a link before it is redeemed. Cancelling an unknown token is a 404.
|
||||
// Admin-only.
|
||||
func (s *Server) handleCancelInvite(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := s.requireAdmin(w, r); !ok {
|
||||
return
|
||||
}
|
||||
token := r.PathValue("token")
|
||||
if token == "" {
|
||||
writeErr(w, http.StatusBadRequest, "token required")
|
||||
return
|
||||
}
|
||||
if err := s.store.CancelInvite(token); err != nil {
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
writeErr(w, http.StatusNotFound, "no such invite")
|
||||
return
|
||||
}
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "cancelled"})
|
||||
}
|
||||
|
||||
// handleRegister redeems an invite: the wallet-model join path. It is auth-exempt
|
||||
// (no admin signature; see isAuthExempt) but rate-limited and strictly validated.
|
||||
// The client presents the single-use token plus its freshly-generated public keys
|
||||
// (sign_pub Ed25519, kex_pub X25519). Both keys are validated as 64-hex BEFORE the
|
||||
// token is spent, the handle and role come from the invite (never this body), and
|
||||
// ConsumeInvite enforces single-use atomically. Errors map to precise codes so a
|
||||
// client can tell "unknown" from "used" from "expired".
|
||||
func (s *Server) handleRegister(w http.ResponseWriter, r *http.Request) {
|
||||
var req registerReq
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req.Token == "" {
|
||||
writeErr(w, http.StatusBadRequest, "token required")
|
||||
return
|
||||
}
|
||||
if err := ValidateSignPubHex(req.SignPub); err != nil {
|
||||
writeErr(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
if err := ValidateKexPubHex(req.KexPub); err != nil {
|
||||
writeErr(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
err := s.store.ConsumeInvite(req.Token, req.SignPub, req.KexPub)
|
||||
switch {
|
||||
case err == nil:
|
||||
writeJSON(w, http.StatusCreated, map[string]string{"status": "registered"})
|
||||
case errors.Is(err, ErrNotFound):
|
||||
writeErr(w, http.StatusNotFound, "invalid or unknown invite token")
|
||||
case errors.Is(err, ErrInviteUsed):
|
||||
writeErr(w, http.StatusConflict, "invite already used")
|
||||
case errors.Is(err, ErrInviteExpired):
|
||||
writeErr(w, http.StatusGone, "invite expired")
|
||||
case errors.Is(err, ErrUserExists):
|
||||
writeErr(w, http.StatusConflict, "identity already registered")
|
||||
default:
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,9 +80,23 @@ type Store interface {
|
||||
GetUser(signPub string) (User, error)
|
||||
ListUsers() ([]User, error)
|
||||
RevokeUser(signPub string) error
|
||||
// DeleteUser hard-deletes a user (the purge counterpart of RevokeUser's
|
||||
// status flip): the row is removed, not just flagged. The ex-user can no
|
||||
// longer authenticate, so any room memberships they hold become inert.
|
||||
DeleteUser(signPub string) error
|
||||
IsAuthorized(signPub string) bool
|
||||
HasAdmin() bool
|
||||
|
||||
// Invites (single-use registration tokens; the wallet-model join path).
|
||||
// CreateInvite mints a token fixing handle+role; ConsumeInvite is the only
|
||||
// path that adds to the allowlist without an admin signature (the bearer
|
||||
// token is the authorization), spending the token exactly once.
|
||||
CreateInvite(handle, role string, ttlSecs int) (Invite, error)
|
||||
GetInvite(token string) (Invite, error)
|
||||
ListInvites() ([]Invite, error)
|
||||
ConsumeInvite(token, signPub, kexPub string) error
|
||||
CancelInvite(token string) error
|
||||
|
||||
// Lifecycle.
|
||||
Close() error
|
||||
}
|
||||
|
||||
+67
-2
@@ -2,6 +2,7 @@ package membership
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
@@ -35,6 +36,40 @@ type User struct {
|
||||
RevokedAt string // empty unless revoked
|
||||
}
|
||||
|
||||
// ValidateSignPubHex ensures signPub is exactly a 32-byte Ed25519 public key in
|
||||
// hex (64 hex chars). It is the single source of truth for that check, shared by
|
||||
// the local admin CLI (which validates before seeding the first admin) and the
|
||||
// HTTP user-management handlers (which validate an admin-supplied key before it
|
||||
// reaches the store). Catching a malformed key here turns a silent "authorized
|
||||
// nobody" into an explicit error at the boundary.
|
||||
func ValidateSignPubHex(signPub string) error {
|
||||
b, err := hex.DecodeString(signPub)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sign-pub is not valid hex: %w", err)
|
||||
}
|
||||
if len(b) != 32 {
|
||||
return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateKexPubHex ensures kexPub is exactly a 32-byte X25519 public key in hex
|
||||
// (64 hex chars). It is the registration-side counterpart of ValidateSignPubHex:
|
||||
// POST /register receives both the new identity's signing key and its key-exchange
|
||||
// key, and both must be well-formed before the invite is consumed. An X25519
|
||||
// public key is 32 bytes, identical in length to Ed25519, so the check is the
|
||||
// same shape with a key-exchange-specific message.
|
||||
func ValidateKexPubHex(kexPub string) error {
|
||||
b, err := hex.DecodeString(kexPub)
|
||||
if err != nil {
|
||||
return fmt.Errorf("kex-pub is not valid hex: %w", err)
|
||||
}
|
||||
if len(b) != 32 {
|
||||
return fmt.Errorf("kex-pub must be a 32-byte X25519 public key (64 hex chars), got %d bytes", len(b))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// normalizeSignPub lowercases the hex key so lookups are case-insensitive: the
|
||||
// primary key is stored lowercase and every query normalizes its input the same
|
||||
// way, so a caller passing uppercase hex still matches.
|
||||
@@ -72,8 +107,10 @@ func (s *sqliteStore) AddUser(signPub, handle, role string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetUser returns the user with the given signing public key. It returns
|
||||
// sql.ErrNoRows (wrapped) when there is no such user.
|
||||
// GetUser returns the user with the given signing public key. A miss returns
|
||||
// ErrNotFound (wrapped), matching the storage-agnostic contract in store.go and
|
||||
// the JetStream backend, so callers can branch on ErrNotFound regardless of which
|
||||
// store is active (the SQLite-specific sql.ErrNoRows is mapped here).
|
||||
func (s *sqliteStore) GetUser(signPub string) (User, error) {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
var u User
|
||||
@@ -83,6 +120,9 @@ func (s *sqliteStore) GetUser(signPub string) (User, error) {
|
||||
signPub,
|
||||
).Scan(&u.SignPub, &u.Handle, &u.Role, &u.Status, &u.CreatedAt, &revoked)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, ErrNotFound)
|
||||
}
|
||||
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err)
|
||||
}
|
||||
u.RevokedAt = revoked.String
|
||||
@@ -135,6 +175,31 @@ func (s *sqliteStore) RevokeUser(signPub string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteUser hard-deletes a user from the allowlist (admin "remove user", the
|
||||
// purge counterpart of RevokeUser's status flip). It removes ONLY the allowlist
|
||||
// row: the ex-user can no longer authenticate on either plane, so any room
|
||||
// memberships they still hold become inert (they cannot fetch a sealed key, sign
|
||||
// a request, or open a NATS connection). We deliberately do NOT chase down and
|
||||
// rewrite those room memberships here — that would be a partial, racy cleanup of
|
||||
// state owned by each room's owner; a room owner kicks/rekeys to achieve forward
|
||||
// secrecy when needed. Deleting an unknown key returns ErrNotFound (wrapped) so
|
||||
// the HTTP layer can answer 404.
|
||||
func (s *sqliteStore) DeleteUser(signPub string) error {
|
||||
signPub = normalizeSignPub(signPub)
|
||||
res, err := s.db.Exec(`DELETE FROM users WHERE sign_pub = ?`, signPub)
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
|
||||
}
|
||||
n, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("membership: delete user %q: rows affected: %w", signPub, err)
|
||||
}
|
||||
if n == 0 {
|
||||
return fmt.Errorf("membership: delete user %q: %w", signPub, ErrNotFound)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsAuthorized reports whether signPub belongs to an active (non-revoked) bus
|
||||
// user. It is the single authorization predicate consulted by both the control
|
||||
// plane (HTTP request middleware) and the data plane (NATS nkey authenticator),
|
||||
|
||||
@@ -0,0 +1,164 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
)
|
||||
|
||||
// signedJSON is signedReq for a JSON body: it marshals v and signs the request
|
||||
// as id with a distinct nonce. It returns the response status and body, reusing
|
||||
// the auth_test harness so these tests exercise the real signed wire contract.
|
||||
func signedJSON(t *testing.T, h *authHarness, method, path string, v any, id cs.Identity, n int) (int, string) {
|
||||
t.Helper()
|
||||
var body []byte
|
||||
if v != nil {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal body: %v", err)
|
||||
}
|
||||
body = b
|
||||
}
|
||||
return do(t, signedReq(t, h.ts.URL, method, path, body, id, time.Now().Unix(), nonceN(n)))
|
||||
}
|
||||
|
||||
// TestUsersHTTP_NonAdminForbidden is the security spine: a REGISTERED but
|
||||
// non-admin signer (bob, role member) is denied on every user-management
|
||||
// endpoint. His signature clears auth (he is in the allowlist), so each request
|
||||
// reaches the handler, where requireAdmin returns 403 — default-deny by role.
|
||||
func TestUsersHTTP_NonAdminForbidden(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
|
||||
bob, _ := cs.GenerateIdentity()
|
||||
register(t, h, bob, "bob") // role member (see register in authz_test.go)
|
||||
bobPub := hex.EncodeToString(bob.SignPub)
|
||||
|
||||
victim, _ := cs.GenerateIdentity()
|
||||
victimPub := hex.EncodeToString(victim.SignPub)
|
||||
|
||||
checks := []struct {
|
||||
name string
|
||||
method string
|
||||
path string
|
||||
body any
|
||||
}{
|
||||
{"list users", "GET", "/users", nil},
|
||||
{"add user", "POST", "/users", addUserReq{SignPub: victimPub, Handle: "mallory", Role: RoleMember}},
|
||||
{"revoke user", "POST", "/users/" + bobPub + "/revoke", nil},
|
||||
}
|
||||
for i, c := range checks {
|
||||
code, body := signedJSON(t, h, c.method, c.path, c.body, bob, i+1)
|
||||
if code != http.StatusForbidden {
|
||||
t.Fatalf("non-admin %s should be 403, got %d (%s)", c.name, code, body)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestUsersHTTP_AdminRoundtrip exercises the golden path end to end: alice (the
|
||||
// seeded admin) adds carol, sees her in the list as active, revokes her, then
|
||||
// sees her status flip to revoked (no hard delete — she stays in the list).
|
||||
func TestUsersHTTP_AdminRoundtrip(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
|
||||
carol, _ := cs.GenerateIdentity()
|
||||
carolPub := hex.EncodeToString(carol.SignPub)
|
||||
|
||||
// Add carol as a member.
|
||||
if code, body := signedJSON(t, h, "POST", "/users",
|
||||
addUserReq{SignPub: carolPub, Handle: "carol", Role: RoleMember}, h.alice, 1); code != http.StatusCreated {
|
||||
t.Fatalf("admin add carol should be 201, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// List: carol present and active; alice (the seed admin) also present.
|
||||
users := listUsers(t, h, 2)
|
||||
carolRow, ok := findUser(users, carolPub)
|
||||
if !ok {
|
||||
t.Fatalf("carol missing from list after add: %+v", users)
|
||||
}
|
||||
if carolRow.Status != StatusActive || carolRow.Role != RoleMember || carolRow.Handle != "carol" {
|
||||
t.Fatalf("carol row wrong after add: %+v", carolRow)
|
||||
}
|
||||
if _, ok := findUser(users, h.alicePub); !ok {
|
||||
t.Fatalf("seeded admin alice missing from list: %+v", users)
|
||||
}
|
||||
|
||||
// Revoke carol.
|
||||
if code, body := signedJSON(t, h, "POST", "/users/"+carolPub+"/revoke", nil, h.alice, 3); code != http.StatusOK {
|
||||
t.Fatalf("admin revoke carol should be 200, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// List again: carol still present, now revoked (status flip, not delete).
|
||||
users = listUsers(t, h, 4)
|
||||
carolRow, ok = findUser(users, carolPub)
|
||||
if !ok {
|
||||
t.Fatalf("carol vanished from list after revoke (should be a status flip): %+v", users)
|
||||
}
|
||||
if carolRow.Status != StatusRevoked {
|
||||
t.Fatalf("carol should be revoked, got status %q", carolRow.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUsersHTTP_Validation covers the input-validation contract: a malformed hex
|
||||
// key is 400, an unknown role is 400, and re-adding an already-registered key is
|
||||
// 409 (the existing row is left untouched — no silent upsert).
|
||||
func TestUsersHTTP_Validation(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
|
||||
good, _ := cs.GenerateIdentity()
|
||||
goodPub := hex.EncodeToString(good.SignPub)
|
||||
|
||||
// Invalid hex (too short) -> 400.
|
||||
if code, body := signedJSON(t, h, "POST", "/users",
|
||||
addUserReq{SignPub: "abcd", Handle: "shorty", Role: RoleMember}, h.alice, 1); code != http.StatusBadRequest {
|
||||
t.Fatalf("malformed sign_pub should be 400, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// Invalid role -> 400.
|
||||
if code, body := signedJSON(t, h, "POST", "/users",
|
||||
addUserReq{SignPub: goodPub, Handle: "weirdrole", Role: "superuser"}, h.alice, 2); code != http.StatusBadRequest {
|
||||
t.Fatalf("invalid role should be 400, got %d (%s)", code, body)
|
||||
}
|
||||
|
||||
// Re-adding the seeded admin's own key -> 409 (idempotency, no overwrite).
|
||||
if code, body := signedJSON(t, h, "POST", "/users",
|
||||
addUserReq{SignPub: h.alicePub, Handle: "alice-again", Role: RoleMember}, h.alice, 3); code != http.StatusConflict {
|
||||
t.Fatalf("re-adding an existing key should be 409, got %d (%s)", code, body)
|
||||
}
|
||||
// And the existing row is untouched: alice is still an active admin.
|
||||
u, err := h.store.GetUser(h.alicePub)
|
||||
if err != nil {
|
||||
t.Fatalf("get alice after conflicting re-add: %v", err)
|
||||
}
|
||||
if u.Role != RoleAdmin || u.Status != StatusActive || u.Handle != "alice" {
|
||||
t.Fatalf("conflicting re-add mutated the existing row: %+v", u)
|
||||
}
|
||||
}
|
||||
|
||||
// listUsers signs a GET /users as alice and decodes the response.
|
||||
func listUsers(t *testing.T, h *authHarness, n int) []userJSON {
|
||||
t.Helper()
|
||||
code, body := signedJSON(t, h, "GET", "/users", nil, h.alice, n)
|
||||
if code != http.StatusOK {
|
||||
t.Fatalf("admin list users should be 200, got %d (%s)", code, body)
|
||||
}
|
||||
var users []userJSON
|
||||
if err := json.Unmarshal([]byte(body), &users); err != nil {
|
||||
t.Fatalf("decode users: %v (%s)", err, body)
|
||||
}
|
||||
return users
|
||||
}
|
||||
|
||||
// findUser returns the row with the given signing key (case-insensitive).
|
||||
func findUser(users []userJSON, signPub string) (userJSON, bool) {
|
||||
want := normalizeSignPub(signPub)
|
||||
for _, u := range users {
|
||||
if normalizeSignPub(u.SignPub) == want {
|
||||
return u, true
|
||||
}
|
||||
}
|
||||
return userJSON{}, false
|
||||
}
|
||||
Reference in New Issue
Block a user