2 Commits

Author SHA1 Message Date
agent 5ea8fa1c20 feat(web): wire the SPA to the live bus via the gateway (drop mock)
Replace the mock data source with a real data layer that talks to the webgw
gateway over REST + SSE. The UI components keep their look and props; only
where the data comes from changed.

- src/api.ts: the single repository layer. fetch wrappers (same-origin cookie)
  for login/logout/me and rooms list/create/join/send, plus streamRoom() which
  opens an EventSource and yields each decrypted message. Wire->UI mappers
  (roomFromWire, messageFromWire).
- src/types.ts: add the gateway wire shapes (MeInfo, RoomWire, MsgWire) next to
  the existing UI types.
- App.tsx: probe /api/me on mount to resume an existing session; otherwise show
  Login. Logout calls the gateway.
- Login.tsx: the password field now unlocks the gateway session (operator
  passphrase); shows a basic error and a loading state. Wallet-per-browser is
  phase 2.
- ChatShell.tsx: load rooms from /api/rooms with loading / empty / error states;
  same Flex layout.
- ChatPanel.tsx: stream messages over SSE for the active room (dedup by id),
  composer sends through the gateway; no optimistic insert (the peer's own echo
  returns over SSE with the real frame id).
- vite.config.ts: dev proxy /api (REST + SSE) -> the gateway on :8481.

mock.ts is left untouched (no longer imported) to avoid churn with the parallel
styling work on master.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:14:19 +02:00
agent fb8a03cf0c feat(webgw): web gateway peer (REST + SSE) for the chat SPA
Add cmd/webgw: a single Go binary that holds the operator's bus identity,
connects to the bus as a real authenticated peer (pkg/client), and exposes a
small REST + SSE API the browser consumes. The browser never signs, never
speaks NATS, and never sees a private key.

Endpoints (all under /api, gated by a session cookie except login):
  POST /api/login            unlock a session with the operator passphrase
  POST /api/logout
  GET  /api/me               operator identity the gateway acts as
  GET  /api/rooms            ListMyRooms
  POST /api/rooms            CreateRoom (default policy: encrypted+persisted+signed)
  POST /api/rooms/{id}/join  Join (fetch room key)
  POST /api/rooms/{id}/send  Publish (sealed + signed by the peer)
  GET  /api/rooms/{id}/stream  SSE of decrypted frames (history then live)

Design notes:
- One fan-out hub per room: a single bus subscription is multiplexed to N SSE
  clients, avoiding the per-(room,endpoint) durable-consumer contention that
  multiple Subscribe calls would cause.
- Posture seam mirrors unibus_admin/clientcheck: empty --ca = plaintext dev,
  non-empty = TLS+nkey on both planes; RefreshSession after a membership change
  only under the secured (ACL) posture.
- Identity loaded from `pass` or a 0600 file, held only in memory.
- Session auth: passphrase compared in constant time; opaque HttpOnly cookie
  so EventSource (which cannot set headers) can authenticate the stream.
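
The session-auth note can be illustrated with a minimal sketch. Only the constant-time comparison, the opaque HttpOnly cookie, and the cookie name `unibus_session` come from the commit. Hashing both inputs before comparing, the 32-byte token size, and the `Path`/`SameSite` settings are assumptions of this sketch, and `passphraseOK` and `newSessionCookie` are hypothetical names.

```go
package main

import (
	"crypto/rand"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"fmt"
	"net/http"
)

// passphraseOK compares the submitted passphrase with the expected one in
// constant time. Hashing first gives both inputs the same length, so the
// comparison does not reveal the passphrase length (an assumption of this
// sketch, not something the commit states).
func passphraseOK(got, want string) bool {
	g := sha256.Sum256([]byte(got))
	w := sha256.Sum256([]byte(want))
	return subtle.ConstantTimeCompare(g[:], w[:]) == 1
}

// newSessionCookie mints an opaque random token and wraps it in an HttpOnly
// cookie, which the browser also sends on EventSource connections.
func newSessionCookie() (*http.Cookie, error) {
	buf := make([]byte, 32)
	if _, err := rand.Read(buf); err != nil {
		return nil, err
	}
	return &http.Cookie{
		Name:     "unibus_session",
		Value:    hex.EncodeToString(buf),
		Path:     "/",
		HttpOnly: true,
		SameSite: http.SameSiteStrictMode,
	}, nil
}

func main() {
	fmt.Println(passphraseOK("hunter2", "hunter2"), passphraseOK("nope", "hunter2"))
	c, err := newSessionCookie()
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Name, len(c.Value), c.HttpOnly)
}
```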

TRUST MODEL: room content stays end-to-end encrypted on the bus. The gateway
reads plaintext only because it acts AS the operator's client — a legitimate
member of each room holding the room key. The per-browser wallet (WebCrypto)
that moves decryption into the browser is phase 2.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:14:08 +02:00
29 changed files with 1317 additions and 1851 deletions
-4
@@ -14,7 +14,3 @@ worker.id
/chat
*.exe
registry.db
# local worktree resolution (do not commit)
go.work
go.work.sum
+1 -70
@@ -2,7 +2,7 @@
name: unibus
lang: go
domain: infra
-version: 0.12.0
+version: 0.10.0
description: "Unified messaging bus over NATS+JetStream with per-room E2E encryption (reduced megolm/olm): membership/key service, client library, and demo peers."
tags: [service, messaging, nats, e2e]
uses_functions:
@@ -137,26 +137,6 @@ To point at an external NATS in production: `--nats-url nats://host:4222` in
signs the first `POST /users`); from then on all management is HTTP admin-only.
Registration is idempotent just like the CLI: re-registering an already registered key = 409,
without overwriting or elevating the role; revoke is a status flip (no hard-delete), auditable.
- **WhatsApp-style accounts: sign-up by invitation, removal by hard-delete.** On top of the
previous admin API, `unibus` adds the wallet account model. The admin does NOT generate keys:
`POST /invites` (admin-only) mints a single-use invitation link with an expiry
(32-byte `crypto/rand` token in hex; default TTL 7 days), fixing `handle` and `role`.
The new user opens the link in THEIR client, which generates the key pair locally
(the private key never leaves the device) and calls `POST /register` with `{token, sign_pub,
kex_pub}`. `/register` is the ONLY route that adds to the allowlist without an admin signature;
it is authorized by the TOKEN, because the new identity is not yet in the allowlist and cannot
sign. It is hardened: strong single-use token (atomic consumption, double use → 409),
expiry (→ 410), `handle`/`role` fixed by the invite (no escalation), strict validation
of both 64-char hex keys, and a per-IP rate limit inherited from the control plane
(only `/healthz` is exempt). Account deletion is `DELETE /users/{signpub}`
(admin-only): a real hard-delete from the allowlist, distinct from `revoke` (which is kept:
revoke = remove access while leaving an auditable trail; delete = purge). After a hard-delete,
the ex-user's room memberships become inert (the user can no longer authenticate on any
plane); they are NOT partially cleaned up: an owner kicks/rekeys the room if forward secrecy
is wanted. Invites and users live in the SAME store (SQLite `invites`/`users`, KV
`UNIBUS_invites`/`UNIBUS_users`). `pkg/client` gains `CreateInvite/ListInvites/CancelInvite/
Register/DeleteUser`; only `Register` goes unsigned. Recovery: a hard-delete of the last admin
is recovered with the local CLI `membershipd user add` (the same seam that seeds admin #0).
- **Identity = critical secret.** The identity file (`worker.id`,
`chat.id`) contains the private keys (Ed25519 + X25519). It is written 0600.
Losing it = unreadable messages, with no recovery. Treat it like an SSH key.
@@ -189,55 +169,6 @@ agent.<name>.{in,out} LLM agent inbox/outbox (agent.scout.in)
## Capability growth log
- v0.12.0 (2026-06-07): WhatsApp-style ACCOUNTS layer on top of the wallet model: user
sign-up through a single-use invitation link plus removal by real hard-delete. The admin
never sees the user's private key. (1) **Invites**: new data backend in both
stores (SQLite `invites` via the additive migration `003_invites.sql`; KV `UNIBUS_invites`).
Type `Invite{Token, Handle, Role, ExpiresAt, Used, CreatedAt}` plus audit fields for the
consumption (`UsedAt/UsedSignPub/UsedKexPub`). Methods `Store.CreateInvite` (32-byte
`crypto/rand` hex token, default TTL 7d), `GetInvite`, `ListInvites`, `ConsumeInvite` (validates
exists/unused/unexpired → registers the sign_pub with the invite's handle/role → marks it
used, atomically) and `CancelInvite`. Single-use consumption is guaranteed in both backends:
SQLite tx (mark guarded by `used=0` + insert) and CAS on the KV revision (mark-first);
identical burn-on-claim if the key already exists. (2) **Hard-delete**: `Store.DeleteUser`
(SQLite `DELETE FROM users`, KV `users.Delete`) purges the allowlist, unlike `revoke`
(status flip, kept). The ex-user's room memberships become inert
(documented, no partial cleanup). (3) **HTTP endpoints**: `POST /invites`, `GET
/invites` (pending only), `DELETE /invites/{token}`, `DELETE /users/{signpub}`
(all admin-only via `requireAdmin`) and `POST /register`, the only route exempt from
admin-signature auth (authorized by the token), rate-limited (`isRateExempt`, only
`/healthz`, is split from `isAuthExempt`) and with strict hex validation of `sign_pub`+`kex_pub`
BEFORE spending the token. Mapped errors: unknown token 404, used 409, expired
410, identity already registered 409. (4) **pkg/client**: `CreateInvite/ListInvites/
CancelInvite/Register/DeleteUser`; `Register` goes unsigned via a `doUnsigned` helper.
(5) Consistency fix: SQLite's `GetUser` now maps `sql.ErrNoRows` → `ErrNotFound`,
like the KV and as `store.go` documents. New tests: store-level invites suite on
BOTH backends (golden + single-use + unknown token + expired + cancel + hard-delete
+ burn-on-claim), HTTP suite (create invite → register without auth → appears in allowlist →
re-register 409 → expired 410 → non-admin 403 on the 4 admin routes → hard-delete purges),
and an end-to-end client test (admin mints invite → unregistered joiner redeems unsigned →
appears → hard-delete, disappears). Changes are 100% additive: previous behavior is
unchanged; build/vet/test green (`CGO_ENABLED=0`).
- v0.11.0 (2026-06-07): dedicated `UNIBUS_NATS_MONITOR` flag that opens the HTTP
monitoring endpoint of the embedded nats-server (`127.0.0.1:8222`, loopback only),
DECOUPLED from the debug log. Previously monitoring was only opened by
`UNIBUS_NATS_DEBUG=1`, which also turned on the nats-server's verbose log
(routes/RAFT/subjects to journald in the clear), incompatible with the hardening
from issue 0007. The toggle computation is extracted into a pure function
`natsLogOpts(debugEnv, monitorEnv) (noLog, debug, trace, monitor)`: `MONITOR=1`
opens the endpoint while keeping the log silent (`NoLog` true / `Debug` false), and
the reverse coupling is kept for compatibility (`DEBUG` still implies
`MONITOR`). The loopback bind `127.0.0.1` stays hardcoded: monitoring is NEVER
public and has no auth; it is read by a local scraper that pushes to VictoriaMetrics
(`unibus-nats` dashboard in `fleet_monitoring`). The deploy wiring is versioned:
additive systemd drop-in `membershipd-cluster.service.d/nats-monitor.conf`
(`Environment=UNIBUS_NATS_MONITOR=1`) + a "NATS server metrics" section in the
cluster README with the rolling activation runbook (magnus→homer→datardos)
and an R3 reconvergence gate (`followers 2/2`) between nodes. New tests: pure
table of the decoupling (monitor on ⇒ log NOT debug; debug ⇒ monitor; default
closed) + a real server with `MONITOR=1` confirming `/varz` 200 on loopback:8222
and a server without the flag with the endpoint closed. Changes are 100% additive:
without the flag behavior is identical; build/test green.
- v0.10.0 (2026-06-07): admin-only HTTP API for user management, closing the
last asymmetry of the control plane: rooms had a signed HTTP surface
(`POST /rooms`, etc.) but users were only managed through the local CLI or access
+246
@@ -0,0 +1,246 @@
package main
import (
"encoding/hex"
"fmt"
"strings"
"sync"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
)
// gateway is the live web gateway: it owns the operator's identity and a single
// connected unibus client, and turns the bus's crypto-bearing API into the plain
// REST/SSE surface the browser consumes. The browser never signs, never speaks
// NATS, and never sees a private key — the gateway is the legitimate room member
// that seals/opens payloads on the browser's behalf.
//
// TRUST MODEL: content stays end-to-end encrypted on the wire. The gateway can
// read plaintext because it acts AS the operator's client — a real member of
// each room, holding the room key K like any peer. It is the same trust a native
// desktop client has. In the wallet phase (per-browser WebCrypto identity) the
// decryption can move into the browser; today, for the single-operator MVP, the
// gateway decrypts server-side and pushes cleartext over a loopback/authenticated
// SSE channel.
type gateway struct {
id cs.Identity
endpoint string
cli *client.Client
refreshACL bool // call RefreshSession after a membership change (needed under a per-subject ACL bus)
mu sync.Mutex
hubs map[string]*roomHub // roomID -> live fan-out of decrypted frames to SSE clients
}
// gatewayConfig wires a live gateway.
type gatewayConfig struct {
Identity cs.Identity
NatsURL string
CtrlURL string
CtrlURLs []string
NatsURLs []string
CAPath string // bus CA; empty => plaintext dev connection (matches a loopback membershipd)
}
// newGateway connects the unibus client with the operator identity following the
// same posture seam every peer uses: a non-empty CA path means TLS + nkey, empty
// means plaintext dev. When a CA is configured the bus is assumed to enforce a
// per-subject ACL, so membership changes trigger a session refresh.
func newGateway(cfg gatewayConfig) (*gateway, error) {
opts := client.Options{
CtrlURLs: cfg.CtrlURLs,
NatsServers: cfg.NatsURLs,
}
if cfg.CAPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath)
if err != nil {
return nil, fmt.Errorf("webgw: load bus CA %q: %w", cfg.CAPath, err)
}
opts.UseNkey = true
opts.TLS = tlsCfg
opts.CtrlTLS = tlsCfg
}
cli, err := client.NewWithOptions(cfg.NatsURL, cfg.CtrlURL, cfg.Identity, opts)
if err != nil {
return nil, fmt.Errorf("webgw: connect bus client: %w", err)
}
return &gateway{
id: cfg.Identity,
endpoint: frame.EndpointID(cfg.Identity.SignPub),
cli: cli,
refreshACL: cfg.CAPath != "",
hubs: map[string]*roomHub{},
}, nil
}
// Close stops every hub and releases the bus client connection.
func (g *gateway) Close() error {
g.mu.Lock()
for _, h := range g.hubs {
h.stop()
}
g.hubs = map[string]*roomHub{}
g.mu.Unlock()
if g.cli != nil {
return g.cli.Close()
}
return nil
}
// ---- wire types (browser-facing JSON) ------------------------------------
// meInfo is what GET /api/me returns: the operator identity the gateway acts as.
type meInfo struct {
Endpoint string `json:"endpoint"`
SignPub string `json:"sign_pub"`
}
// roomWire is the browser view of a room. It deliberately omits messages: those
// stream over SSE (GET /api/rooms/{id}/stream), not in the room list.
type roomWire struct {
ID string `json:"id"`
Subject string `json:"subject"`
Name string `json:"name"`
Epoch int `json:"epoch"`
Encrypt bool `json:"encrypt"`
Persist bool `json:"persist"`
SignMsgs bool `json:"sign_msgs"`
Role string `json:"role"`
}
// createRoomReq is the POST /api/rooms body. Encrypt/Persist/SignMsgs are
// pointers so an omitted field falls back to the chat default rather than to the
// Go zero value (false). The common case — the browser sending only {subject,
// encrypted} — maps encrypted onto all three (the Matrix-like chat policy).
type createRoomReq struct {
Subject string `json:"subject"`
Encrypted *bool `json:"encrypted,omitempty"`
Encrypt *bool `json:"encrypt,omitempty"`
Persist *bool `json:"persist,omitempty"`
SignMsgs *bool `json:"sign_msgs,omitempty"`
}
// policy resolves the requested policy. A bare {subject} defaults to the
// Matrix-like chat room (encrypted + persisted + signed) so a created room keeps
// durable, end-to-end-encrypted, authored history. Callers can override any leg.
func (r createRoomReq) policy() room.Policy {
enc, per, sig := true, true, true
if r.Encrypted != nil {
enc, per, sig = *r.Encrypted, *r.Encrypted, *r.Encrypted
}
if r.Encrypt != nil {
enc = *r.Encrypt
}
if r.Persist != nil {
per = *r.Persist
}
if r.SignMsgs != nil {
sig = *r.SignMsgs
}
return room.Policy{Encrypt: enc, Persist: per, SignMsgs: sig}
}
// sendReq is the POST /api/rooms/{id}/send body.
type sendReq struct {
Body string `json:"body"`
}
// msgWire is one decrypted message pushed over SSE.
type msgWire struct {
ID string `json:"id"`
Sender string `json:"sender"`
Body string `json:"body"`
TS int64 `json:"ts"` // epoch ms (decoded from the frame's ULID id)
Mine bool `json:"mine"`
}
// ---- operations -----------------------------------------------------------
func (g *gateway) me() meInfo {
return meInfo{Endpoint: g.endpoint, SignPub: hex.EncodeToString(g.id.SignPub)}
}
// subjectName derives a short, human-friendly room name from its bus subject by
// dropping the leading namespace segment (room., test., proc., agent., rpc.). It
// is a display nicety only; the canonical identity stays the subject/room id.
func subjectName(subject string) string {
for _, p := range []string{"room.", "test.", "proc.", "agent.", "rpc."} {
if strings.HasPrefix(subject, p) {
return strings.TrimPrefix(subject, p)
}
}
return subject
}
func (g *gateway) listRooms() ([]roomWire, error) {
rooms, err := g.cli.ListMyRooms()
if err != nil {
return nil, err
}
out := make([]roomWire, 0, len(rooms))
for _, rm := range rooms {
out = append(out, roomWire{
ID: rm.RoomID,
Subject: rm.Subject,
Name: subjectName(rm.Subject),
Epoch: rm.Epoch,
Encrypt: rm.Policy.Encrypt,
Persist: rm.Policy.Persist,
SignMsgs: rm.Policy.SignMsgs,
Role: rm.Role,
})
}
return out, nil
}
func (g *gateway) createRoom(req createRoomReq) (roomWire, error) {
subject := strings.TrimSpace(req.Subject)
if subject == "" {
return roomWire{}, fmt.Errorf("webgw: subject required")
}
p := req.policy()
roomID, err := g.cli.CreateRoom(subject, p)
if err != nil {
return roomWire{}, err
}
// Under a per-subject ACL the operator's frozen NATS permissions do not yet
// cover the new room's subject; refresh so subsequent data-plane use works. On
// a plaintext/non-ACL dev bus this is unnecessary and would needlessly drop any
// live SSE subscriptions, so it is gated on the secured posture.
if g.refreshACL {
_ = g.cli.RefreshSession()
}
return roomWire{
ID: roomID,
Subject: subject,
Name: subjectName(subject),
Epoch: 1,
Encrypt: p.Encrypt,
Persist: p.Persist,
SignMsgs: p.SignMsgs,
Role: "owner",
}, nil
}
// join resolves room metadata and (for encrypted rooms) fetches the room key so
// the gateway can later open payloads. Idempotent.
func (g *gateway) join(roomID string) error {
if err := g.cli.Join(roomID); err != nil {
return err
}
if g.refreshACL {
_ = g.cli.RefreshSession()
}
return nil
}
// send publishes plaintext to a room. The unibus client seals it with the room
// key (encrypted rooms) and signs it (signed rooms) before it leaves the process.
func (g *gateway) send(roomID, body string) error {
return g.cli.Publish(roomID, []byte(body))
}
+140
@@ -0,0 +1,140 @@
package main
import (
"sync"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/oklog/ulid/v2"
)
// roomHub multiplexes ONE unibus room subscription to MANY SSE clients. The
// unibus client derives a per-(room, endpoint) durable consumer name, so a
// second Subscribe for the same room from the same operator would contend for
// the same durable (load-balanced delivery) rather than each browser receiving
// every message. The hub holds a single subscription per room and fans each
// decrypted frame out to every connected browser, which also means the gateway
// opens at most one bus subscription per room regardless of how many tabs watch
// it.
type roomHub struct {
roomID string
myEndpoint string
sub *client.Sub
mu sync.Mutex
clients map[chan msgWire]struct{}
}
// frameTS decodes the millisecond timestamp embedded in a frame's ULID id. A
// malformed id (should not happen for bus-produced frames) yields 0, which the
// browser renders without crashing.
func frameTS(msgID string) int64 {
id, err := ulid.Parse(msgID)
if err != nil {
return 0
}
return int64(id.Time())
}
// newRoomHub opens the single bus subscription for roomID and starts fanning
// decrypted frames out to registered clients. The room must already be joined
// (so the gateway holds the room key) before this is called.
func newRoomHub(cli *client.Client, roomID, myEndpoint string) (*roomHub, error) {
h := &roomHub{
roomID: roomID,
myEndpoint: myEndpoint,
clients: map[chan msgWire]struct{}{},
}
sub, err := cli.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
m := msgWire{
ID: f.MsgID,
Sender: f.Sender,
Body: string(plaintext),
TS: frameTS(f.MsgID),
Mine: f.Sender == myEndpoint,
}
h.broadcast(m)
})
if err != nil {
return nil, err
}
h.sub = sub
return h, nil
}
// broadcast delivers a message to every registered client without blocking the
// NATS delivery goroutine: a client whose buffer is full (a stalled browser)
// drops this frame rather than stalling the whole room.
func (h *roomHub) broadcast(m msgWire) {
h.mu.Lock()
defer h.mu.Unlock()
for ch := range h.clients {
select {
case ch <- m:
default:
}
}
}
// add registers a new SSE client channel.
func (h *roomHub) add(ch chan msgWire) {
h.mu.Lock()
defer h.mu.Unlock()
h.clients[ch] = struct{}{}
}
// stop unsubscribes from the bus. Local delivery ends; for a persisted room the
// durable consumer's ack position stays on the server, so a later subscription
// with the same operator resumes from where it left off.
func (h *roomHub) stop() {
if h.sub != nil {
_ = h.sub.Unsubscribe()
}
}
// openStream joins the room (idempotent; fetches the room key for encrypted
// rooms), attaches an SSE client to the room's hub (creating it on first watcher),
// and returns the client's message channel plus a cleanup func. The cleanup
// detaches the client and, when it was the last watcher, tears down the room's
// single bus subscription.
func (g *gateway) openStream(roomID string) (chan msgWire, func(), error) {
if err := g.join(roomID); err != nil {
return nil, nil, err
}
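g.mu.Lock()
h := g.hubs[roomID]
if h == nil {
var err error
h, err = newRoomHub(g.cli, roomID, g.endpoint)
if err != nil {
g.mu.Unlock()
return nil, nil, err
}
g.hubs[roomID] = h
}
// Buffer so a brief render hitch in the browser does not drop live frames; a
// sustained stall still drops (broadcast is non-blocking) rather than wedging
// the room.
ch := make(chan msgWire, 64)
// Register the client while still holding g.mu. Cleanup also takes g.mu, so
// the last-watcher teardown of another client cannot stop this hub between
// the lookup above and the add (which would leave this stream silent).
h.add(ch)
g.mu.Unlock()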
// cleanup takes g.mu before h.mu (the single, consistent lock order) so a
// concurrent openStream that re-creates the hub cannot race the teardown.
cleanup := func() {
g.mu.Lock()
defer g.mu.Unlock()
h.mu.Lock()
delete(h.clients, ch)
empty := len(h.clients) == 0
h.mu.Unlock()
if empty {
if cur := g.hubs[roomID]; cur == h {
delete(g.hubs, roomID)
h.stop()
}
}
}
return ch, cleanup, nil
}
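
The drop-on-full behaviour that `broadcast` and the 64-slot buffer rely on can be seen in isolation. The following is a minimal sketch, not the gateway's code: it uses plain string channels, tiny buffers, and a slice of clients, and it returns a drop count that the real hub does not track.

```go
package main

import "fmt"

// broadcast offers m to every client channel without blocking. A channel
// whose buffer is full misses this message; the count of such drops is
// returned for illustration only.
func broadcast(clients []chan string, m string) (dropped int) {
	for _, ch := range clients {
		select {
		case ch <- m:
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	fast := make(chan string, 2) // keeps up: room for two pending messages
	slow := make(chan string, 1) // stalled: room for one pending message
	clients := []chan string{fast, slow}

	fmt.Println(broadcast(clients, "one")) // both buffers have space
	fmt.Println(broadcast(clients, "two")) // slow is full, so one drop
}
```

A stalled client therefore costs only its own messages, while the delivery goroutine and every other watcher keep moving.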
+98
@@ -0,0 +1,98 @@
package main
import (
"encoding/base64"
"encoding/json"
"fmt"
"os"
"os/exec"
cs "fn-registry/functions/cybersecurity"
)
// identityJSON mirrors the on-disk / pass-stored identity format shared across
// the unibus tooling: the four keypair halves, each std-base64. It is the SAME
// shape the bus client persists (pkg/client identity file) and the operator's
// `pass` entry unibus/operator-identity, so the web gateway loads the operator's
// identity without a divergent serialization. Kept in lockstep with
// unibus_admin/internal/admin/identity.go.
type identityJSON struct {
SignPub string `json:"sign_pub"`
SignPriv string `json:"sign_priv"`
KexPub string `json:"kex_pub"`
KexPriv string `json:"kex_priv"`
}
// decodeIdentity turns the JSON identity bytes into a cs.Identity. The private
// halves stay only in memory; this never writes them anywhere.
func decodeIdentity(raw []byte) (cs.Identity, error) {
var f identityJSON
if err := json.Unmarshal(raw, &f); err != nil {
return cs.Identity{}, fmt.Errorf("webgw: parse identity json: %w", err)
}
dec := base64.StdEncoding.DecodeString
signPub, err := dec(f.SignPub)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode sign_pub: %w", err)
}
signPriv, err := dec(f.SignPriv)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode sign_priv: %w", err)
}
kexPub, err := dec(f.KexPub)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode kex_pub: %w", err)
}
kexPriv, err := dec(f.KexPriv)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode kex_priv: %w", err)
}
if len(signPub) != 32 || len(signPriv) != 64 || len(kexPub) != 32 || len(kexPriv) != 32 {
return cs.Identity{}, fmt.Errorf("webgw: identity has wrong key sizes (sign_pub=%d sign_priv=%d kex_pub=%d kex_priv=%d)",
len(signPub), len(signPriv), len(kexPub), len(kexPriv))
}
return cs.Identity{SignPub: signPub, SignPriv: signPriv, KexPub: kexPub, KexPriv: kexPriv}, nil
}
// loadIdentityFromFile reads a 0600 identity JSON file (the same format the bus
// client writes) and decodes it. Used on a deploy host where `pass` is not
// available and the operator identity is delivered as a protected file.
func loadIdentityFromFile(path string) (cs.Identity, error) {
raw, err := os.ReadFile(path)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: read identity file %q: %w", path, err)
}
return decodeIdentity(raw)
}
// loadIdentityFromPass shells out to `pass show <entry>` and decodes the JSON
// identity it returns. The secret is held only in memory; this process never
// writes it to disk or argv. Used in local operator workflows where the GNU
// password store holds unibus/operator-identity.
func loadIdentityFromPass(entry string) (cs.Identity, error) {
out, err := exec.Command("pass", "show", entry).Output()
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: pass show %q: %w", entry, err)
}
return decodeIdentity(out)
}
// loadPassValue returns the first line of a `pass show <entry>` for non-identity
// secrets (e.g. the unlock passphrase). Empty entry yields an empty string and
// no error, so callers can treat "no pass entry configured" as "not set".
func loadPassValue(entry string) (string, error) {
if entry == "" {
return "", nil
}
out, err := exec.Command("pass", "show", entry).Output()
if err != nil {
return "", fmt.Errorf("webgw: pass show %q: %w", entry, err)
}
s := string(out)
for i := 0; i < len(s); i++ {
if s[i] == '\n' || s[i] == '\r' {
return s[:i], nil
}
}
return s, nil
}
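
For local testing, an identity file with the key sizes `decodeIdentity` checks (32/64/32/32) could be produced as sketched below. This assumes the bus uses standard Ed25519 and X25519 keys as generated by Go's `crypto/ed25519` and `crypto/ecdh`. Whether the real unibus tooling accepts such a file is not confirmed by the commit, and `newIdentityJSON` and `keySizes` are hypothetical helpers.

```go
package main

import (
	"crypto/ecdh"
	"crypto/ed25519"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// identityJSON copies the field names of the gateway's identity format.
type identityJSON struct {
	SignPub  string `json:"sign_pub"`
	SignPriv string `json:"sign_priv"`
	KexPub   string `json:"kex_pub"`
	KexPriv  string `json:"kex_priv"`
}

// newIdentityJSON generates fresh signing and key-exchange pairs and encodes
// each half as std-base64, matching the documented on-disk shape.
func newIdentityJSON() ([]byte, error) {
	signPub, signPriv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, err
	}
	kex, err := ecdh.X25519().GenerateKey(rand.Reader)
	if err != nil {
		return nil, err
	}
	enc := base64.StdEncoding.EncodeToString
	return json.Marshal(identityJSON{
		SignPub:  enc(signPub),
		SignPriv: enc(signPriv),
		KexPub:   enc(kex.PublicKey().Bytes()),
		KexPriv:  enc(kex.Bytes()),
	})
}

// keySizes decodes the four halves and reports their byte lengths in the
// order sign_pub, sign_priv, kex_pub, kex_priv.
func keySizes(raw []byte) ([4]int, error) {
	var sizes [4]int
	var f identityJSON
	if err := json.Unmarshal(raw, &f); err != nil {
		return sizes, err
	}
	for i, s := range []string{f.SignPub, f.SignPriv, f.KexPub, f.KexPriv} {
		b, err := base64.StdEncoding.DecodeString(s)
		if err != nil {
			return sizes, err
		}
		sizes[i] = len(b)
	}
	return sizes, nil
}

func main() {
	raw, err := newIdentityJSON()
	if err != nil {
		panic(err)
	}
	sizes, err := keySizes(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(sizes)
}
```

A file written from this output should be created with mode 0600, as the loader's comments expect.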
+180
@@ -0,0 +1,180 @@
// Command webgw is the web gateway for the unibus chat SPA. It is a single Go
// binary that holds the operator's bus identity, connects to the bus as a real
// authenticated peer (pkg/client), and exposes a small REST + SSE API the
// browser consumes. The browser never signs, never speaks NATS, and never sees a
// private key: it authenticates to the gateway with a passphrase and thereafter
// holds only an opaque session cookie.
//
// TRUST MODEL (MVP, single operator): room content stays end-to-end encrypted on
// the bus. The gateway can read plaintext because it acts AS the operator's
// client — a legitimate member of each room holding the room key. Decryption
// happens server-side in this process; cleartext then crosses an authenticated
// (loopback or TLS-fronted) SSE channel to the browser. The wallet phase (issue:
// per-browser WebCrypto identity) can move decryption into the browser; see the
// report for the phase 2 plan.
//
// # local dev against a loopback membershipd (plaintext), operator from pass:
// webgw --identity-pass unibus/operator-identity \
// --ctrl-url http://127.0.0.1:8470 --nats-url nats://127.0.0.1:4250
//
// # secured cluster (TLS + nkey on both planes), identity from a 0600 file:
// webgw --ca ca.crt --identity-file operator.id \
// --ctrl-url https://node-a:8470 --nats-url nats://node-a:4250 \
// --ctrl-urls https://node-b:8470,https://node-c:8470 \
// --nats-urls nats://node-b:4250,nats://node-c:4250
package main
import (
"context"
"flag"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
cs "fn-registry/functions/cybersecurity"
)
func main() {
var (
bind = flag.String("bind", "127.0.0.1", "interface to bind the gateway HTTP server to (loopback by default)")
port = flag.String("port", "8481", "gateway HTTP port")
ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "primary unibus control-plane base URL")
ctrlURLs = flag.String("ctrl-urls", "", "comma-separated ADDITIONAL control-plane base URLs (cluster failover)")
natsURL = flag.String("nats-url", "nats://127.0.0.1:4250", "primary NATS URL")
natsURLs = flag.String("nats-urls", "", "comma-separated ADDITIONAL NATS seed URLs (cluster failover)")
caPath = flag.String("ca", "", "bus CA cert path; set to talk TLS+nkey to a secured bus (empty = plaintext dev)")
identityFile = flag.String("identity-file", "", "path to the operator identity JSON file (0600). Mutually exclusive with --identity-pass")
identityPass = flag.String("identity-pass", "", "pass(1) entry holding the operator identity JSON, e.g. unibus/operator-identity")
unlockPass = flag.String("unlock-pass", "", "literal passphrase the browser must send to unlock a session (dev). Prefer --unlock-pass-entry")
unlockEntry = flag.String("unlock-pass-entry", "unibus/admin-panel-password", "pass(1) entry holding the unlock passphrase (used when --unlock-pass is empty)")
webDir = flag.String("web-dir", "", "OPTIONAL path to the built SPA (web/dist) to serve. Empty = API only (use vite dev server)")
)
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[webgw] ")
id, err := loadIdentity(*identityFile, *identityPass)
if err != nil {
log.Fatalf("%v", err)
}
unlock := *unlockPass
if unlock == "" {
unlock, err = loadPassValue(*unlockEntry)
if err != nil {
log.Fatalf("resolve unlock passphrase: %v", err)
}
}
if unlock == "" {
log.Fatalf("an unlock passphrase is required: set --unlock-pass or a non-empty --unlock-pass-entry (default unibus/admin-panel-password)")
}
resolvedWebDir := resolveWebDir(*webDir)
gw, err := newGateway(gatewayConfig{
Identity: id,
NatsURL: *natsURL,
CtrlURL: *ctrlURL,
CtrlURLs: splitCSV(*ctrlURLs),
NatsURLs: splitCSV(*natsURLs),
CAPath: *caPath,
})
if err != nil {
log.Fatalf("%v", err)
}
defer gw.Close()
log.Printf("operator endpoint: %s", gw.endpoint)
log.Printf("control plane: %s (+%d failover)", *ctrlURL, len(splitCSV(*ctrlURLs)))
tls := "OFF (plaintext dev)"
if *caPath != "" {
tls = "ON (CA " + *caPath + ")"
}
log.Printf("bus TLS+nkey: %s", tls)
if resolvedWebDir != "" {
log.Printf("serving SPA from: %s", resolvedWebDir)
} else {
log.Printf("API only (no --web-dir): use the vite dev server with a /api+stream proxy")
}
srv := newServer(gw, unlock, resolvedWebDir)
addr := *bind + ":" + *port
httpSrv := &http.Server{
Addr: addr,
Handler: srv,
// No global write timeout: SSE streams are long-lived. Header timeout still
// bounds slowloris on the request line/headers.
ReadHeaderTimeout: 10 * time.Second,
}
go func() {
log.Printf("web gateway: http://%s", addr)
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("http server: %v", err)
}
}()
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop
log.Printf("shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = httpSrv.Shutdown(ctx)
log.Printf("bye")
}
// loadIdentity resolves the operator identity from exactly one of --identity-file
// or --identity-pass.
func loadIdentity(file, passEntry string) (cs.Identity, error) {
switch {
case file != "" && passEntry != "":
return cs.Identity{}, errFlag("set only one of --identity-file or --identity-pass")
case file != "":
return loadIdentityFromFile(file)
case passEntry != "":
return loadIdentityFromPass(passEntry)
default:
return cs.Identity{}, errFlag("an identity is required: pass --identity-file <path> or --identity-pass <entry>")
}
}
// resolveWebDir validates the --web-dir flag. An empty flag means API-only. A
// non-empty dir is kept only if it actually holds an index.html, so a typo logs
// "API only" rather than serving 404s.
func resolveWebDir(dir string) string {
if dir == "" {
return ""
}
abs, err := filepath.Abs(dir)
if err != nil {
log.Printf("WARN --web-dir %q: %v; serving API only", dir, err)
return ""
}
if !statFile(filepath.Join(abs, "index.html")) {
log.Printf("WARN --web-dir %q has no index.html; serving API only", abs)
return ""
}
return abs
}
type flagErr string
func (e flagErr) Error() string { return string(e) }
func errFlag(s string) error { return flagErr("webgw: " + s) }
func splitCSV(s string) []string {
var out []string
for _, p := range strings.Split(s, ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
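A quick usage sketch for `splitCSV` as defined above: it trims whitespace and drops empty items, so an unset flag yields a nil slice and the logged failover count is zero. The endpoint URLs are made-up placeholders, not values from this repository.

```go
package main

import (
	"fmt"
	"strings"
)

// splitCSV is copied from the diff above: split on commas, trim each item,
// and drop the empty ones.
func splitCSV(s string) []string {
	var out []string
	for _, p := range strings.Split(s, ",") {
		if p = strings.TrimSpace(p); p != "" {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	// Hypothetical flag values, for illustration only.
	fmt.Println(len(splitCSV("")))                              // 0
	fmt.Println(splitCSV(" https://a:8470 , ,https://b:8470,")) // two entries
}
```

Because empty items are dropped, a trailing comma or stray spaces in `--ctrl-urls` or `--nats-urls` should not produce a blank endpoint.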
+301
@@ -0,0 +1,301 @@
package main
import (
"crypto/rand"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// sessionCookie is the name of the gateway's session cookie. The browser sends
// it automatically on same-origin fetches AND on EventSource (SSE) connections —
// EventSource cannot set custom headers, so a cookie is the only way to
// authenticate the stream. It is HttpOnly so page JS can never read the token.
const sessionCookie = "unibus_session"
// server is the gateway's HTTP surface: a small REST/SSE API under /api gated by
// a session cookie, plus an optional static file server for the built SPA. The
// gateway's privileged operator identity never leaves the process; the browser
// authenticates with a passphrase and thereafter holds only an opaque session
// token.
type server struct {
gw *gateway
unlock string // passphrase that unlocks a session (compared in constant time)
webDir string // optional path to the built SPA (web/dist); empty = API only
mux *http.ServeMux
mu sync.Mutex
sessions map[string]time.Time // token -> issued-at
}
func newServer(gw *gateway, unlock, webDir string) *server {
s := &server{
gw: gw,
unlock: unlock,
webDir: webDir,
mux: http.NewServeMux(),
sessions: map[string]time.Time{},
}
s.routes()
return s
}
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) }
func (s *server) routes() {
// Liveness, unauthenticated (systemd / deploy smoke).
s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
})
// Auth: login is the only /api route reachable without a session.
s.mux.HandleFunc("POST /api/login", s.handleLogin)
s.mux.HandleFunc("POST /api/logout", s.auth(s.handleLogout))
s.mux.HandleFunc("GET /api/me", s.auth(s.handleMe))
s.mux.HandleFunc("GET /api/rooms", s.auth(s.handleListRooms))
s.mux.HandleFunc("POST /api/rooms", s.auth(s.handleCreateRoom))
s.mux.HandleFunc("POST /api/rooms/{id}/join", s.auth(s.handleJoin))
s.mux.HandleFunc("POST /api/rooms/{id}/send", s.auth(s.handleSend))
s.mux.HandleFunc("GET /api/rooms/{id}/stream", s.auth(s.handleStream))
// Everything else is the SPA (when --web-dir is set). Registered last.
if s.webDir != "" {
s.mux.Handle("/", s.spaHandler())
}
}
// ---- auth -----------------------------------------------------------------
// auth wraps a handler so it runs only with a valid session cookie. A missing or
// unknown token yields 401, which the SPA treats as "show the login screen".
func (s *server) auth(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
c, err := r.Cookie(sessionCookie)
if err != nil || !s.validSession(c.Value) {
writeErr(w, http.StatusUnauthorized, "not authenticated")
return
}
next(w, r)
}
}
func (s *server) validSession(token string) bool {
if token == "" {
return false
}
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.sessions[token]
return ok
}
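The `sessions` map records an issued-at time, but `validSession` as shown only checks that the token is present. The following is a minimal sketch of how that timestamp could bound a session's lifetime. `sessionStore`, `maxAge`, and the injected clock are hypothetical names introduced for illustration; none of them is part of this commit.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sessionStore is a hypothetical stand-in for the server's sessions map.
type sessionStore struct {
	mu       sync.Mutex
	sessions map[string]time.Time // token -> issued-at
	maxAge   time.Duration        // assumed lifetime; not defined in the source
	now      func() time.Time     // injectable clock so the sketch is testable
}

func newSessionStore(maxAge time.Duration, now func() time.Time) *sessionStore {
	return &sessionStore{sessions: map[string]time.Time{}, maxAge: maxAge, now: now}
}

func (s *sessionStore) issue(token string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sessions[token] = s.now()
}

// valid reports whether token is known and younger than maxAge. An expired
// token is deleted when it is looked up.
func (s *sessionStore) valid(token string) bool {
	if token == "" {
		return false
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	issued, ok := s.sessions[token]
	if !ok {
		return false
	}
	if s.now().Sub(issued) > s.maxAge {
		delete(s.sessions, token)
		return false
	}
	return true
}

// demoExpiry checks one token before and after the assumed lifetime elapses.
func demoExpiry() (fresh, stale bool) {
	clock := time.Unix(0, 0)
	st := newSessionStore(time.Hour, func() time.Time { return clock })
	st.issue("tok")
	fresh = st.valid("tok")
	clock = clock.Add(2 * time.Hour)
	stale = st.valid("tok")
	return fresh, stale
}

func main() {
	fmt.Println(demoExpiry()) // true false
}
```

Tokens that are never presented again stay in the map under this scheme, so a periodic sweep would still be needed if that matters.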
func (s *server) handleLogin(w http.ResponseWriter, r *http.Request) {
var req struct {
Passphrase string `json:"passphrase"`
}
if !decode(w, r, &req) {
return
}
// Constant-time compare so a wrong passphrase cannot be timed character by
// character. An empty configured passphrase never matches (main refuses to
// start without one, so this is defense in depth).
if s.unlock == "" || subtle.ConstantTimeCompare([]byte(req.Passphrase), []byte(s.unlock)) != 1 {
writeErr(w, http.StatusUnauthorized, "wrong passphrase")
return
}
tok := newToken()
s.mu.Lock()
s.sessions[tok] = time.Now()
s.mu.Unlock()
http.SetCookie(w, &http.Cookie{
Name: sessionCookie,
Value: tok,
Path: "/",
HttpOnly: true,
SameSite: http.SameSiteLaxMode,
})
writeJSON(w, http.StatusOK, s.gw.me())
}
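To make the login contract concrete, here is a self-contained sketch that exercises a toy version of the flow with `net/http/httptest`: post a passphrase, receive a cookie, replay it on a protected route. It is not the gateway's code. The fixed `demo-token`, the `newMux` and `loginThenMe` helpers, and the plain-text `ok` response are assumptions made for the example.

```go
package main

import (
	"bytes"
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

const cookieName = "unibus_session" // same value as sessionCookie in the diff

// newMux builds a toy server. The fixed token "demo-token" is a placeholder:
// the real handler issues a fresh random token per login.
func newMux(unlock string) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/api/login", func(w http.ResponseWriter, r *http.Request) {
		var req struct {
			Passphrase string `json:"passphrase"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "bad json", http.StatusBadRequest)
			return
		}
		if unlock == "" || subtle.ConstantTimeCompare([]byte(req.Passphrase), []byte(unlock)) != 1 {
			http.Error(w, "wrong passphrase", http.StatusUnauthorized)
			return
		}
		http.SetCookie(w, &http.Cookie{Name: cookieName, Value: "demo-token", Path: "/", HttpOnly: true})
	})
	mux.HandleFunc("/api/me", func(w http.ResponseWriter, r *http.Request) {
		c, err := r.Cookie(cookieName)
		if err != nil || c.Value != "demo-token" {
			http.Error(w, "not authenticated", http.StatusUnauthorized)
			return
		}
		fmt.Fprint(w, "ok")
	})
	return mux
}

// loginThenMe posts pass to /api/login, replays any cookie it got back on
// /api/me, and returns both status codes.
func loginThenMe(unlock, pass string) (loginCode, meCode int) {
	mux := newMux(unlock)
	body, _ := json.Marshal(map[string]string{"passphrase": pass})
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/api/login", bytes.NewReader(body)))

	req := httptest.NewRequest(http.MethodGet, "/api/me", nil)
	for _, c := range rec.Result().Cookies() {
		req.AddCookie(c)
	}
	rec2 := httptest.NewRecorder()
	mux.ServeHTTP(rec2, req)
	return rec.Code, rec2.Code
}

func main() {
	fmt.Println(loginThenMe("s3cret", "s3cret")) // 200 200
	fmt.Println(loginThenMe("s3cret", "nope"))   // 401 401
}
```

A 401 on the protected route is the signal the SPA uses to show the login screen, which is why the sketch reports both codes.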
func (s *server) handleLogout(w http.ResponseWriter, r *http.Request) {
if c, err := r.Cookie(sessionCookie); err == nil {
s.mu.Lock()
delete(s.sessions, c.Value)
s.mu.Unlock()
}
http.SetCookie(w, &http.Cookie{Name: sessionCookie, Value: "", Path: "/", MaxAge: -1, HttpOnly: true})
writeJSON(w, http.StatusOK, map[string]string{"status": "logged_out"})
}
func (s *server) handleMe(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, http.StatusOK, s.gw.me())
}
// ---- rooms ----------------------------------------------------------------
func (s *server) handleListRooms(w http.ResponseWriter, _ *http.Request) {
rooms, err := s.gw.listRooms()
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, rooms)
}
func (s *server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
var req createRoomReq
if !decode(w, r, &req) {
return
}
rv, err := s.gw.createRoom(req)
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusCreated, rv)
}
func (s *server) handleJoin(w http.ResponseWriter, r *http.Request) {
if err := s.gw.join(r.PathValue("id")); err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "joined"})
}
func (s *server) handleSend(w http.ResponseWriter, r *http.Request) {
var req sendReq
if !decode(w, r, &req) {
return
}
if strings.TrimSpace(req.Body) == "" {
writeErr(w, http.StatusBadRequest, "body required")
return
}
if err := s.gw.send(r.PathValue("id"), req.Body); err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "sent"})
}
// handleStream is the SSE endpoint: it joins the room, attaches to the room's
// fan-out hub, and streams each decrypted message as a `data:` event. For a
// persisted room the hub's underlying subscription delivers history first
// (scrollback) and then live messages; for an ephemeral room only live messages
// flow. The stream ends when the browser disconnects (ctx cancelled).
func (s *server) handleStream(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
writeErr(w, http.StatusInternalServerError, "streaming unsupported")
return
}
ch, cleanup, err := s.gw.openStream(r.PathValue("id"))
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
defer cleanup()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no") // ask nginx-style reverse proxies not to buffer the stream
w.WriteHeader(http.StatusOK)
// An initial comment opens the stream immediately so the browser's
// EventSource fires `onopen` without waiting for the first message.
_, _ = w.Write([]byte(": connected\n\n"))
flusher.Flush()
ctx := r.Context()
ping := time.NewTicker(25 * time.Second)
defer ping.Stop()
for {
select {
case <-ctx.Done():
return
case <-ping.C:
// Comment line keeps idle proxies from closing the connection.
if _, err := w.Write([]byte(": ping\n\n")); err != nil {
return
}
flusher.Flush()
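case m, ok := <-ch:
if !ok {
return // channel closed by the hub: end the stream instead of spinning on zero values
}
b, err := json.Marshal(m)
if err != nil {
continue
}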
if _, err := w.Write([]byte("data: " + string(b) + "\n\n")); err != nil {
return
}
flusher.Flush()
}
}
}
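The framing that `handleStream` emits is simple enough to consume without a browser. Below is a hedged sketch of a reader that keeps `data:` payloads and skips the `: connected` and `: ping` comment lines. It handles only the single-line frames shown above, not the full SSE grammar, and the JSON field names in the sample are invented for the example.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readSSEData collects the payload of every `data: ` line from r. Blank lines
// (frame separators) and lines starting with ':' (comments) are skipped.
// bufio.Scanner's default line limit applies, so very large messages would
// need a bigger buffer.
func readSSEData(r io.Reader) ([]string, error) {
	var events []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "", strings.HasPrefix(line, ":"):
			continue
		case strings.HasPrefix(line, "data: "):
			events = append(events, strings.TrimPrefix(line, "data: "))
		}
	}
	return events, sc.Err()
}

// parseSSEString is a convenience wrapper for in-memory input.
func parseSSEString(s string) ([]string, error) {
	return readSSEData(strings.NewReader(s))
}

func main() {
	// Sample stream; the message fields are made up for this sketch.
	stream := ": connected\n\n" +
		"data: {\"id\":\"1\",\"body\":\"hi\"}\n\n" +
		": ping\n\n" +
		"data: {\"id\":\"2\",\"body\":\"yo\"}\n\n"
	events, err := parseSSEString(stream)
	fmt.Println(len(events), err) // 2 <nil>
}
```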
// ---- SPA serving (optional) -----------------------------------------------
// spaHandler serves the built SPA from s.webDir. A request for an existing asset
// is served directly; any other path (a client-side route) falls back to
// index.html so the SPA router can take over. /api and /healthz are matched first.
func (s *server) spaHandler() http.Handler {
root := http.Dir(s.webDir)
fileServer := http.FileServer(root)
index := filepath.Join(s.webDir, "index.html")
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/")
if p == "" {
http.ServeFile(w, r, index)
return
}
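if f, err := root.Open(p); err == nil {
fi, statErr := f.Stat()
_ = f.Close()
if statErr == nil && !fi.IsDir() { // only regular assets; directories fall through to index.html
fileServer.ServeHTTP(w, r)
return
}
}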
http.ServeFile(w, r, index) // unknown path -> SPA client-side routing
})
}
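The fallback rule described in the `spaHandler` comment can be checked in isolation. The sketch below models it against an in-memory filesystem from `testing/fstest`. `pickFile` and `demoFS` are hypothetical helpers, and treating directories as non-assets is a choice made in this sketch.

```go
package main

import (
	"fmt"
	"io/fs"
	"strings"
	"testing/fstest"
)

// pickFile returns the file that would be served for urlPath: an existing
// regular file is served as-is, anything else resolves to index.html.
func pickFile(fsys fs.FS, urlPath string) string {
	p := strings.TrimPrefix(urlPath, "/")
	if p == "" {
		return "index.html"
	}
	if fi, err := fs.Stat(fsys, p); err == nil && !fi.IsDir() {
		return p
	}
	return "index.html"
}

// demoFS is a made-up build output used only for this sketch.
func demoFS() fs.FS {
	return fstest.MapFS{
		"index.html":    {Data: []byte("<html></html>")},
		"assets/app.js": {Data: []byte("console.log(1)")},
	}
}

func main() {
	fsys := demoFS()
	fmt.Println(pickFile(fsys, "/assets/app.js")) // assets/app.js
	fmt.Println(pickFile(fsys, "/rooms/42"))      // index.html
}
```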
// ---- helpers --------------------------------------------------------------
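func newToken() string {
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
// Never hand out a predictable all-zero token if the system RNG fails.
panic("webgw: crypto/rand: " + err.Error())
}
return hex.EncodeToString(b)
}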
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
func writeErr(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]string{"error": msg})
}
// decode reads a JSON body into v, writing a 400 and returning false on failure.
func decode(w http.ResponseWriter, r *http.Request, v any) bool {
defer r.Body.Close()
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(v); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return false
}
return true
}
// statFile reports whether path exists and is a regular file (used to validate
// --web-dir at startup so a typo surfaces as a clear log line, not 404s later).
func statFile(path string) bool {
fi, err := os.Stat(path)
return err == nil && fi.Mode().IsRegular()
}
-58
@@ -283,61 +283,3 @@ ssh dd 'sudo systemctl start membershipd-cluster' # rejoins, catches up
the unit and start it without `--store kv`/`--cluster-name`; the KV buckets remain
for a later retry. To rotate the cluster CA, re-run `generate-cluster-certs.sh
--force` and re-stage (every node must get the new `cluster-ca.crt` together).
## NATS server metrics (loopback monitoring — optional)
The embedded NATS server can expose its own monitoring HTTP endpoint so a local
scraper reads server-level metrics that `/healthz` does not surface: msgs/s,
connections, slow consumers, memory, KV bucket message counts, the RAFT leader per
stream and per-stream restarts. This feeds the `unibus-nats` dashboard in
`fleet_monitoring` (the scraper hits `127.0.0.1:8222/varz|/connz|/jsz` over
loopback and pushes to VictoriaMetrics).
The endpoint is opened by the **dedicated** environment toggle `UNIBUS_NATS_MONITOR=1`
(0.11.0+ binary). It is **decoupled** from `UNIBUS_NATS_DEBUG`: it opens the
monitoring endpoint WITHOUT enabling the verbose nats-server debug log, so no room
subjects or routing metadata leak to journald (keeps the hardened posture, issue
0007). The endpoint binds `127.0.0.1:8222` **only** — the binary hardcodes the
loopback bind, so it is never reachable from the network and needs no auth. Never
use `UNIBUS_NATS_DEBUG` in production just to get the endpoint.
### Enable it (HUMAN — requires the 0.11.0+ binary on the node)
The clean way is the additive systemd drop-in in this directory:
```bash
# On each node, AFTER the 0.11.0+ binary is in /opt/unibus/membershipd:
ssh <node> 'sudo mkdir -p /etc/systemd/system/membershipd-cluster.service.d'
scp membershipd-cluster.service.d/nats-monitor.conf <node>:/tmp/nats-monitor.conf
ssh <node> 'sudo cp /tmp/nats-monitor.conf /etc/systemd/system/membershipd-cluster.service.d/ \
&& sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster'
```
(Equivalently, add `UNIBUS_NATS_MONITOR=1` to `/opt/unibus/cluster.env`, which the
unit already sources via `EnvironmentFile`; the drop-in is preferred because it is
self-documenting and does not edit the generated env file.)
### Rolling restart with the R3 reconvergence gate (CRITICAL)
`systemctl restart membershipd-cluster` restarts that node's JetStream RAFT member.
**Never restart two nodes at once** — that would drop the cluster below quorum
(2/3) and fail the control plane closed. Roll **one node at a time**, in the order
`magnus → homer → datardos`, and between each node wait until the cluster has
reconverged to R3 (every control-plane bucket back to `followers_current=2/2`):
```bash
# After restarting ONE node, gate on R3 reconvergence before touching the next:
ssh root@magnus 'for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members \
KV_UNIBUS_room_keys KV_UNIBUS_rooms_by_member KV_UNIBUS_nonces; do
nats --server nats://127.0.0.1:4250 stream info "$s" -j \
| jq -r --arg s "$s" \"\\($s): replicas=\\(.cluster.replicas|length) leader=\\(.cluster.leader)\"
done'
# Proceed to the next node ONLY when all six report a leader and replicas=2:
# the `replicas` array lists follower peers only, so R3 is the leader plus two
# followers (i.e. 2/2 followers current). Also confirm healthz is green on the
# just-restarted node first:
ssh <node> 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
```
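The reconvergence gate can also be expressed as a small check over the JSON that `nats stream info -j` prints. The sketch below is one way to do it under stated assumptions: the shape `cluster.leader` plus `cluster.replicas[].current` is assumed from the jq expression above and from common NATS output, the `replicas` array is assumed to list follower peers only, and the node names in the sample are invented. Verify the shape against your CLI version before relying on it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// streamInfo models only the fields the gate needs (assumed JSON shape).
type streamInfo struct {
	Cluster struct {
		Leader   string `json:"leader"`
		Replicas []struct {
			Name    string `json:"name"`
			Current bool   `json:"current"`
		} `json:"replicas"`
	} `json:"cluster"`
}

// reconverged reports whether one stream has a leader and exactly
// wantFollowers followers, all of them current.
func reconverged(raw []byte, wantFollowers int) (bool, error) {
	var si streamInfo
	if err := json.Unmarshal(raw, &si); err != nil {
		return false, err
	}
	if si.Cluster.Leader == "" || len(si.Cluster.Replicas) != wantFollowers {
		return false, nil
	}
	for _, r := range si.Cluster.Replicas {
		if !r.Current {
			return false, nil
		}
	}
	return true, nil
}

// reconvergedString wraps reconverged for string input and folds a parse
// error into "not reconverged".
func reconvergedString(s string, wantFollowers int) bool {
	ok, err := reconverged([]byte(s), wantFollowers)
	return err == nil && ok
}

func main() {
	// Made-up sample: n3 has not caught up yet.
	sample := `{"cluster":{"leader":"n1","replicas":[{"name":"n2","current":true},{"name":"n3","current":false}]}}`
	fmt.Println(reconvergedString(sample, 2)) // false
}
```

Checking `current` per follower is stricter than counting replicas, which is the property the "2/2 followers current" wording asks for.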
This restart is normally **not** done as a standalone step: the 0.11.0 binary that
carries the flag is rolled to the three nodes in the consolidated rollout, and the
drop-in is installed during that same rolling restart.
@@ -1,27 +0,0 @@
# Drop-in: enable the embedded NATS server monitoring HTTP endpoint so a local
# metrics scraper can read /varz, /connz and /jsz for server-level metrics
# (msgs/s, connections, KV bucket msgs, RAFT leader per stream, restarts).
#
# ADDITIVE and minimal: it only sets one environment variable; the base unit
# (membershipd-cluster.service) is otherwise unchanged.
#
# UNIBUS_NATS_MONITOR is DECOUPLED from UNIBUS_NATS_DEBUG: it opens the monitoring
# endpoint WITHOUT enabling the verbose nats-server debug log, so no room subjects
# or routing metadata are written to journald (keeps the hardened posture, issue
# 0007). Do NOT use UNIBUS_NATS_DEBUG in production just to get the endpoint.
#
# The endpoint binds 127.0.0.1:8222 ONLY — the binary hardcodes the loopback bind,
# so it is never reachable from the network and needs no auth. The scraper runs on
# the same host and reads it over loopback.
#
# Requires the 0.11.0+ membershipd binary (the one that honors UNIBUS_NATS_MONITOR).
# Install on a node:
# sudo mkdir -p /etc/systemd/system/membershipd-cluster.service.d
# sudo cp nats-monitor.conf /etc/systemd/system/membershipd-cluster.service.d/
# sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster
#
# Restarting a node restarts its JetStream RAFT member, so roll ONE node at a time
# and wait for R3 reconvergence (followers 2/2) before touching the next. See the
# "NATS server metrics" section of this directory's README for the full runbook.
[Service]
Environment=UNIBUS_NATS_MONITOR=1
-28
@@ -1,28 +0,0 @@
-- 003_invites.sql — single-use registration invites (issue: user accounts / wallet model).
--
-- An admin mints an invite so a brand-new identity can join the bus allowlist
-- WITHOUT the admin ever handling its private key. The token is the bearer
-- secret that authorizes POST /register: the registering client generates its
-- keypair locally and publishes only its public keys, fixing the link between an
-- invite and the identity it creates via the audit columns below. The handle and
-- role are fixed by the admin at mint time and cannot be changed by the client
-- (no privilege escalation).
--
-- Additive and idempotent: safe to apply repeatedly. Never modify this file;
-- further schema changes go in new numbered migrations (see
-- .claude/rules/db_migrations.md). The embedded copy under
-- pkg/membership/migrations/003_invites.sql mirrors this file byte-for-byte.
CREATE TABLE IF NOT EXISTS invites (
token TEXT PRIMARY KEY, -- 32 random bytes in lowercase hex (the bearer secret)
handle TEXT NOT NULL, -- handle the new user will get (fixed by admin)
role TEXT NOT NULL DEFAULT 'member', -- 'admin' | 'member' (fixed by admin)
expires_at TEXT NOT NULL, -- RFC3339; past this the invite is dead
used INTEGER NOT NULL DEFAULT 0, -- 0 pending, 1 consumed (single-use)
created_at TEXT NOT NULL,
used_at TEXT, -- RFC3339 when consumed (NULL until used)
used_sign_pub TEXT, -- Ed25519 key that consumed it (audit; NULL until used)
used_kex_pub TEXT -- X25519 key presented at registration (audit; NULL until used)
);
CREATE INDEX IF NOT EXISTS idx_invites_used ON invites(used);
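The single-use semantics that the column comments describe can be sketched in memory. This is an illustration of the rules only (32-byte hex token, fixed handle and role, expiry, one redemption), not the server's storage code. `inviteStore`, `mint`, and `redeem` are hypothetical names, and the sketch is not safe for concurrent use.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"time"
)

type invite struct {
	Handle, Role string
	ExpiresAt    time.Time
	Used         bool
}

// inviteStore is an in-memory stand-in for the invites table.
type inviteStore map[string]*invite

var errInvite = errors.New("invite unknown, expired, or already used")

// mint stores a new invite under a token of 32 random bytes in lowercase hex.
func (s inviteStore) mint(handle, role string, ttl time.Duration, now time.Time) (string, error) {
	b := make([]byte, 32)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	tok := hex.EncodeToString(b)
	s[tok] = &invite{Handle: handle, Role: role, ExpiresAt: now.Add(ttl)}
	return tok, nil
}

// redeem consumes a token once. Handle and role come from the stored row,
// never from the caller, so the client cannot escalate its role.
func (s inviteStore) redeem(tok string, now time.Time) (handle, role string, err error) {
	inv, ok := s[tok]
	if !ok || inv.Used || !now.Before(inv.ExpiresAt) {
		return "", "", errInvite
	}
	inv.Used = true
	return inv.Handle, inv.Role, nil
}

// demoInvite redeems one invite twice and another one after it has expired.
func demoInvite() (tokenLen int, firstOK, secondOK, expiredOK bool) {
	now := time.Unix(1_700_000_000, 0)
	s := inviteStore{}
	tok, _ := s.mint("dora", "member", time.Hour, now)
	_, _, err1 := s.redeem(tok, now)
	_, _, err2 := s.redeem(tok, now)
	late, _ := s.mint("eve", "member", time.Hour, now)
	_, _, err3 := s.redeem(late, now.Add(2*time.Hour))
	return len(tok), err1 == nil, err2 == nil, err3 == nil
}

func main() {
	fmt.Println(demoInvite()) // 64 true false false
}
```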
-159
@@ -331,60 +331,6 @@ func (c *Client) doJSON(method, path string, body, out any) error {
return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr)
}
// doUnsigned performs a control-plane request WITHOUT the transport signature
// headers, for the one endpoint a not-yet-registered identity must reach: POST
// /register. The registering peer is not in the allowlist, so it cannot produce
// an accepted signature; authorization is the single-use invite token inside the
// body. Like doJSON it fails over across the control-plane endpoints (any node
// serves the same state) and surfaces the server's structured error message.
func (c *Client) doUnsigned(method, path string, body, out any) error {
var bodyBytes []byte
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("client: marshal request: %w", err)
}
bodyBytes = b
}
var lastErr error
for _, base := range c.ctrlURLs {
var rdr io.Reader
if bodyBytes != nil {
rdr = bytes.NewReader(bodyBytes)
}
req, err := http.NewRequest(method, base+path, rdr)
if err != nil {
return fmt.Errorf("client: new request: %w", err)
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
resp, err := c.http.Do(req)
if err != nil {
lastErr = err
continue // dead node: try the next control plane
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 300 {
var er struct {
Error string `json:"error"`
}
if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
}
return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
}
if out != nil {
if err := json.Unmarshal(respBody, out); err != nil {
return fmt.Errorf("client: decode response: %w", err)
}
}
return nil
}
return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr)
}
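The failover loop in `doUnsigned` follows a common pattern: a transport error means "dead node, try the next base URL", while any HTTP response ends the loop. A compact sketch of that pattern follows. It fakes the network with a custom `http.RoundTripper` so it runs without sockets. The hostnames, `firstHealthy`, and `fakeClient` are made up for the example.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// roundTripFunc lets a plain function act as an http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// firstHealthy tries each base URL in order and returns the first one that
// answers at the transport level.
func firstHealthy(c *http.Client, bases []string, path string) (string, error) {
	var lastErr error
	for _, base := range bases {
		resp, err := c.Get(base + path)
		if err != nil {
			lastErr = err
			continue // dead node: try the next one
		}
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		return base, nil
	}
	return "", fmt.Errorf("all control planes failed: %w", lastErr)
}

// fakeClient treats any host whose name starts with "dead" as unreachable.
func fakeClient() *http.Client {
	return &http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) {
		if strings.HasPrefix(r.URL.Host, "dead") {
			return nil, errors.New("connection refused")
		}
		return &http.Response{
			StatusCode: http.StatusOK,
			Header:     http.Header{},
			Body:       io.NopCloser(strings.NewReader("{}")),
			Request:    r,
		}, nil
	})}
}

// demoFailover runs one list with a live node and one list without.
func demoFailover() (winner string, allDeadErr error) {
	c := fakeClient()
	winner, _ = firstHealthy(c, []string{"http://dead-1:8470", "http://live-2:8470"}, "/healthz")
	_, allDeadErr = firstHealthy(c, []string{"http://dead-1:8470"}, "/healthz")
	return winner, allDeadErr
}

func main() {
	winner, err := demoFailover()
	fmt.Println(winner)     // http://live-2:8470
	fmt.Println(err != nil) // true
}
```

Closing the body inside the loop, rather than deferring it, keeps connections from piling up if the loop ever continues after a response.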
// signRequest signs the canonical bytes of req (req must already have its Sig
// field cleared) with the client's Ed25519 key. It is symmetric with the
// server's verifyOwnerSig. This is the PAYLOAD-level owner signature that
@@ -527,35 +473,6 @@ type addUserReq struct {
Role string `json:"role"`
}
// createInviteReq / createInviteResp mirror the server's POST /invites types.
type createInviteReq struct {
Handle string `json:"handle"`
Role string `json:"role"`
TTLSecs int `json:"ttl_secs"`
}
type createInviteResp struct {
Token string `json:"token"`
ExpiresAt string `json:"expires_at"`
}
// inviteJSON mirrors the server's GET /invites row.
type inviteJSON struct {
Token string `json:"token"`
Handle string `json:"handle"`
Role string `json:"role"`
ExpiresAt string `json:"expires_at"`
Used bool `json:"used"`
CreatedAt string `json:"created_at"`
}
// registerReq mirrors the server's POST /register body.
type registerReq struct {
Token string `json:"token"`
SignPub string `json:"sign_pub"`
KexPub string `json:"kex_pub"`
}
// ---- room operations ------------------------------------------------------
// RoomRef is a room this peer belongs to, returned by ListMyRooms. It is the
@@ -643,82 +560,6 @@ func (c *Client) RevokeUser(signPub string) error {
return c.doJSON("POST", "/users/"+signPub+"/revoke", nil, nil)
}
// DeleteUser hard-deletes a bus user by their signing public key (64-hex) — the
// purge counterpart of RevokeUser. The allowlist row is removed entirely (no
// audit trail); the ex-user can no longer authenticate, so their room
// memberships become inert. The caller must be signing as an admin.
func (c *Client) DeleteUser(signPub string) error {
return c.doJSON("DELETE", "/users/"+signPub, nil, nil)
}
// InviteInfo is a single-use registration invite as returned by the admin invite
// endpoints. It is a flat view for the admin panel: the bearer token (to build
// the join link), the handle and role the new user will receive, the absolute
// expiry, whether it has been used, and when it was minted.
type InviteInfo struct {
Token string
Handle string
Role string
ExpiresAt string
Used bool
CreatedAt string
}
// CreateInvite mints a single-use registration invite. handle and role are fixed
// here (the registering client cannot change them); role is "admin" or "member"
// (empty defaults to member). ttlSecs sets the link lifetime (non-positive uses
// the server's 7-day default). The returned InviteInfo carries the token and
// expiry; the caller turns the token into a join link. Caller must sign as admin.
func (c *Client) CreateInvite(handle, role string, ttlSecs int) (InviteInfo, error) {
var resp createInviteResp
if err := c.doJSON("POST", "/invites", createInviteReq{Handle: handle, Role: role, TTLSecs: ttlSecs}, &resp); err != nil {
return InviteInfo{}, err
}
r := role
if r == "" {
r = "member"
}
return InviteInfo{Token: resp.Token, Handle: handle, Role: r, ExpiresAt: resp.ExpiresAt}, nil
}
// ListInvites returns the pending invites (not used, not expired). Caller must
// sign as admin.
func (c *Client) ListInvites() ([]InviteInfo, error) {
var resp []inviteJSON
if err := c.doJSON("GET", "/invites", nil, &resp); err != nil {
return nil, err
}
out := make([]InviteInfo, 0, len(resp))
for _, inv := range resp {
out = append(out, InviteInfo{
Token: inv.Token,
Handle: inv.Handle,
Role: inv.Role,
ExpiresAt: inv.ExpiresAt,
Used: inv.Used,
CreatedAt: inv.CreatedAt,
})
}
return out, nil
}
// CancelInvite cancels (deletes) a pending invite by its token, so an admin can
// revoke a link before it is redeemed. Caller must sign as admin.
func (c *Client) CancelInvite(token string) error {
return c.doJSON("DELETE", "/invites/"+token, nil, nil)
}
// Register redeems a single-use invite token, joining the bus allowlist. It is
// the wallet-model join call: the registering peer generated its own keypair
// locally and publishes ONLY its public keys here (signPub Ed25519, kexPub
// X25519, both 64-hex). It is UNSIGNED — the bearer token is the authorization,
// because this identity is not yet in the allowlist and so cannot sign an
// accepted request. On success the identity is registered with the invite's
// handle and role and can connect like any other peer.
func (c *Client) Register(token, signPub, kexPub string) error {
return c.doUnsigned("POST", "/register", registerReq{Token: token, SignPub: signPub, KexPub: kexPub}, nil)
}
// newRoomKey returns 32 random bytes for a symmetric room key.
func newRoomKey() ([]byte, error) {
k := make([]byte, 32)
-104
@@ -1,104 +0,0 @@
package client_test
import (
"encoding/hex"
"testing"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/membership"
)
// TestClientInvitesAdminAPI drives the wallet-model account flow through the real
// pkg/client methods against an in-process membershipd under enforce: an admin
// mints an invite, a brand-new identity redeems it via the UNSIGNED Register call
// (it is not yet in the allowlist), the admin then sees the user, and finally the
// admin hard-deletes it and it vanishes. This is the exact path the admin panel +
// the /join client page depend on, so it locks the client/server contract.
func TestClientInvitesAdminAPI(t *testing.T) {
h := newHarnessMode(t, membership.AuthEnforce)
waitHealth(t, h.ctrlURL)
admin, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect admin: %v", err)
}
defer admin.Close()
registerClient(t, h, admin, "admin", membership.RoleAdmin)
// Admin mints a single-use invite fixing handle + role.
inv, err := admin.CreateInvite("dora", membership.RoleMember, 0)
if err != nil {
t.Fatalf("admin CreateInvite: %v", err)
}
if len(inv.Token) != 64 || inv.ExpiresAt == "" {
t.Fatalf("invite malformed: %+v", inv)
}
if inv.Handle != "dora" || inv.Role != membership.RoleMember {
t.Fatalf("invite echo wrong: %+v", inv)
}
// It appears among the pending invites.
pend, err := admin.ListInvites()
if err != nil {
t.Fatalf("admin ListInvites: %v", err)
}
if !containsToken(pend, inv.Token) {
t.Fatalf("minted invite not pending: %+v", pend)
}
// A brand-new identity (NOT in the allowlist) redeems the invite via the
// UNSIGNED Register. We model its locally-generated keypair with a fresh
// identity and present its two public keys. Redeeming through this joiner
// client — which never registered and never seeded an admin — proves Register
// needs no admin signature; the bearer token is the sole authorization.
newID := mustIdentity(t)
signPub := hex.EncodeToString(newID.SignPub)
kexPub := hex.EncodeToString(newID.KexPub)
joiner, err := client.New(h.natsURL, h.ctrlURL, newID)
if err != nil {
t.Fatalf("connect joiner: %v", err)
}
defer joiner.Close()
if err := joiner.Register(inv.Token, signPub, kexPub); err != nil {
t.Fatalf("joiner Register: %v", err)
}
// Admin now sees dora in the allowlist with the invite's handle/role.
users, err := admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers: %v", err)
}
row, ok := findUserInfo(users, signPub)
if !ok {
t.Fatalf("registered dora missing from allowlist: %+v", users)
}
if row.Handle != "dora" || row.Role != membership.RoleMember || row.Status != membership.StatusActive {
t.Fatalf("dora row wrong: %+v", row)
}
// Single-use: redeeming again is an error.
if err := joiner.Register(inv.Token, signPub, kexPub); err == nil {
t.Fatalf("second Register should error (used token)")
}
// Admin hard-deletes dora; she vanishes from the allowlist entirely.
if err := admin.DeleteUser(signPub); err != nil {
t.Fatalf("admin DeleteUser: %v", err)
}
users, err = admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers after delete: %v", err)
}
if _, ok := findUserInfo(users, signPub); ok {
t.Fatalf("hard-deleted dora must NOT appear: %+v", users)
}
}
func containsToken(invites []client.InviteInfo, token string) bool {
for _, i := range invites {
if i.Token == token {
return true
}
}
return false
}
+11 -34
@@ -103,38 +103,17 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication)
return StartServer(ServerConfig{StoreDir: storeDir, Host: host, Port: port, Auth: auth})
}
// natsLogOpts maps the two independent environment toggles to the embedded
// nats-server logging and monitoring flags. It is a pure function (no I/O) so the
// decoupling between the two toggles can be unit-tested directly.
//
// - UNIBUS_NATS_DEBUG="1" enables the nats-server logger (route/RAFT/JetStream
// errors); "2" additionally enables protocol tracing. Off by default so the
// server stays silent (NoLog) and production behavior is unchanged.
// - UNIBUS_NATS_MONITOR="1" opens the monitoring HTTP endpoint (loopback only)
// for a local metrics scraper to read /varz, /connz and /jsz.
//
// The two are DECOUPLED on purpose: enabling the monitoring endpoint must NOT turn
// on the verbose debug log, which would write room subjects and routing metadata
// to journald in clear and regress the hardened posture (issue 0007). The reverse
// coupling is kept for backward compatibility: debug mode still exposes the
// monitoring endpoint as well (debug implies monitor), so existing debugging
// workflows are unchanged.
func natsLogOpts(debugEnv, monitorEnv string) (noLog, debug, trace, monitor bool) {
debug = debugEnv == "1" || debugEnv == "2"
trace = debugEnv == "2"
monitor = monitorEnv == "1" || debug
noLog = !debug
return noLog, debug, trace, monitor
}
// StartServer launches an embedded nats-server with JetStream from cfg. It
// blocks until the server is ready to accept connections (up to 5s) and returns
// the running server; the caller must Shutdown it.
func StartServer(cfg ServerConfig) (*server.Server, error) {
-// Map the two independent env toggles to the nats-server logging + monitoring
-// flags. See natsLogOpts for the decoupling rationale (issue 0007).
-noLog, debugNATS, traceNATS, monitorNATS := natsLogOpts(
-os.Getenv("UNIBUS_NATS_DEBUG"), os.Getenv("UNIBUS_NATS_MONITOR"))
+// Diagnostic toggle: UNIBUS_NATS_DEBUG=1 enables the embedded nats-server's own
+// logger (route/RAFT/JetStream errors), which is otherwise silenced. Off by
+// default so production behavior is unchanged; only set it when debugging the
+// cluster route layer.
+debugLevel := os.Getenv("UNIBUS_NATS_DEBUG")
+debugNATS := debugLevel == "1" || debugLevel == "2"
+traceNATS := debugLevel == "2"
opts := &server.Options{
JetStream: true,
StoreDir: cfg.StoreDir,
@@ -143,17 +122,15 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
ServerName: cfg.ServerName,
DontListen: false,
// Keep the embedded server quiet by default; the host app logs the URLs.
-NoLog: noLog,
+NoLog: !debugNATS,
Debug: debugNATS,
Trace: traceNATS,
Logtime: true,
NoSigs: true,
}
-if monitorNATS {
-// Expose the nats-server monitoring endpoint on LOOPBACK ONLY (never public):
-// the operator (or a local metrics scraper) inspects /varz, /connz, /jsz,
-// /routez. The 127.0.0.1 bind is mandatory because this endpoint has no auth;
-// it must stay unreachable from the network.
+if debugNATS {
+// Expose the nats-server monitoring endpoint (loopback) so the operator can
+// inspect /jsz, /routez, /varz while debugging the cluster meta-group.
opts.HTTPHost = "127.0.0.1"
opts.HTTPPort = 8222
}
-134
@@ -1,134 +0,0 @@
package embeddednats
import (
"io"
"net"
"net/http"
"testing"
"time"
)
// TestNatsLogOptsDecoupled is the core regression guard for issue 0007: turning
// on the monitoring endpoint must NEVER turn on the verbose nats-server debug log
// (which would leak room subjects/routing metadata to journald). It also checks
// the backward-compatible coupling (debug still implies monitoring) and the quiet
// default.
func TestNatsLogOptsDecoupled(t *testing.T) {
cases := []struct {
name string
debugEnv, monitorEnv string
noLog, debug, trace, monitor bool
}{
{"default off — quiet, no monitor", "", "", true, false, false, false},
{"monitor only — endpoint on, log stays quiet", "", "1", true, false, false, true},
{"debug implies monitor", "1", "", false, true, false, true},
{"trace implies debug+monitor", "2", "", false, true, true, true},
{"both set", "1", "1", false, true, false, true},
{"monitor garbage value ignored", "", "yes", true, false, false, false},
{"debug garbage value ignored", "true", "", true, false, false, false},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
noLog, debug, trace, monitor := natsLogOpts(c.debugEnv, c.monitorEnv)
if noLog != c.noLog || debug != c.debug || trace != c.trace || monitor != c.monitor {
t.Fatalf("natsLogOpts(%q,%q) = (noLog=%v debug=%v trace=%v monitor=%v), want (noLog=%v debug=%v trace=%v monitor=%v)",
c.debugEnv, c.monitorEnv, noLog, debug, trace, monitor,
c.noLog, c.debug, c.trace, c.monitor)
}
})
}
// Explicit golden assertion of the security property: monitor on, log off.
noLog, debug, _, monitor := natsLogOpts("", "1")
if !monitor {
t.Fatal("UNIBUS_NATS_MONITOR=1 must open the monitoring endpoint")
}
if debug || !noLog {
t.Fatalf("UNIBUS_NATS_MONITOR=1 must NOT enable the debug log (got debug=%v noLog=%v)", debug, noLog)
}
}
// TestMonitorEndpointLoopback boots a real embedded server with
// UNIBUS_NATS_MONITOR=1 (and DEBUG explicitly off) and proves the monitoring HTTP
// endpoint answers on loopback only — the exact contract the metrics scraper
// relies on. The pure decoupling check above already guarantees the log stays out
// of debug mode for this same env combination.
func TestMonitorEndpointLoopback(t *testing.T) {
t.Setenv("UNIBUS_NATS_DEBUG", "")
t.Setenv("UNIBUS_NATS_MONITOR", "1")
ns, err := StartServer(ServerConfig{
StoreDir: t.TempDir(),
Host: "127.0.0.1",
Port: freeLoopbackPort(t),
})
if err != nil {
t.Fatalf("start server with monitoring: %v", err)
}
defer func() { ns.Shutdown(); ns.WaitForShutdown() }()
addr := ns.MonitorAddr()
if addr == nil {
t.Fatal("monitoring endpoint not open with UNIBUS_NATS_MONITOR=1 (MonitorAddr is nil)")
}
if !addr.IP.IsLoopback() {
t.Fatalf("monitoring endpoint bound to %s, must be loopback only", addr.IP)
}
if addr.Port != 8222 {
t.Fatalf("monitoring endpoint on port %d, want the fixed loopback port 8222", addr.Port)
}
// /varz must answer 200 with a non-empty body on loopback.
url := "http://" + addr.String() + "/varz"
var resp *http.Response
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
resp, err = http.Get(url) //nolint:gosec // loopback monitoring endpoint, no auth by design
if err == nil {
break
}
time.Sleep(50 * time.Millisecond)
}
if err != nil {
t.Fatalf("GET %s: %v", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("GET %s -> %d, want 200", url, resp.StatusCode)
}
body, _ := io.ReadAll(resp.Body)
if len(body) == 0 {
t.Fatalf("GET %s returned an empty body", url)
}
}
// TestMonitorDisabledByDefault proves a server started without either toggle does
// NOT open the monitoring endpoint, so production stays closed unless opted in.
func TestMonitorDisabledByDefault(t *testing.T) {
t.Setenv("UNIBUS_NATS_DEBUG", "")
t.Setenv("UNIBUS_NATS_MONITOR", "")
ns, err := StartServer(ServerConfig{
StoreDir: t.TempDir(),
Host: "127.0.0.1",
Port: freeLoopbackPort(t),
})
if err != nil {
t.Fatalf("start server: %v", err)
}
defer func() { ns.Shutdown(); ns.WaitForShutdown() }()
if addr := ns.MonitorAddr(); addr != nil {
t.Fatalf("monitoring endpoint open (%s) without UNIBUS_NATS_MONITOR — must stay closed by default", addr)
}
}
func freeLoopbackPort(t *testing.T) int {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("free port: %v", err)
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
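The table in TestNatsLogOptsDecoupled pins down what natsLogOpts must return, but the function body itself is not part of this diff. A minimal sketch of one function that would satisfy every case in that table, assuming only the literal values "1" and "2" are recognized; the name natsLogOptsSketch is hypothetical and this is not the project's implementation:

```go
package main

// natsLogOptsSketch is a hypothetical stand-in for natsLogOpts, reconstructed
// only from the test table above. debugEnv and monitorEnv are the raw values
// of UNIBUS_NATS_DEBUG and UNIBUS_NATS_MONITOR.
func natsLogOptsSketch(debugEnv, monitorEnv string) (noLog, debug, trace, monitor bool) {
	// Trace is the most verbose level and implies debug.
	trace = debugEnv == "2"
	debug = debugEnv == "1" || trace
	// Monitoring opens on its own toggle, or as a side effect of debug.
	// It never feeds back into debug, which is the decoupling under test.
	monitor = monitorEnv == "1" || debug
	// The log stays quiet unless debug was requested explicitly.
	noLog = !debug
	return noLog, debug, trace, monitor
}
```

Any value other than "1" or "2" (for example "yes" or "true") falls through to the quiet default, which matches the two "garbage value ignored" cases.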
-296
@@ -1,296 +0,0 @@
package membership
import (
"crypto/rand"
"database/sql"
"encoding/hex"
"errors"
"fmt"
"strings"
"time"
)
// Invite is a single-use registration token the admin mints so a brand-new
// identity can join the bus allowlist WITHOUT the admin ever handling its
// private key (the wallet model: the key is born and stays on the user's
// device; only the public key is published, via POST /register).
//
// The admin fixes the handle and role at mint time; the registering client may
// NOT change them (no privilege escalation). Token is 32 random bytes in
// lowercase hex (64 chars). ExpiresAt and CreatedAt are RFC3339Nano UTC. Used
// flips to true the instant the invite is consumed, and an invite can be
// consumed at most once. The audit fields (UsedAt/UsedSignPub/UsedKexPub) are
// empty until the invite is consumed; they record which keys claimed it, so the
// link between an invite and the identity it created stays traceable even though
// the allowlist row itself stores only the signing key.
type Invite struct {
Token string `json:"token"`
Handle string `json:"handle"`
Role string `json:"role"`
ExpiresAt string `json:"expires_at"`
Used bool `json:"used"`
CreatedAt string `json:"created_at"`
// Audit (populated on consume; omitted on the wire while pending).
UsedAt string `json:"used_at,omitempty"`
UsedSignPub string `json:"used_sign_pub,omitempty"`
UsedKexPub string `json:"used_kex_pub,omitempty"`
}
// Invite-flow sentinels. They let callers (and the HTTP layer) map a failed
// consume to a precise status code without string-matching: an unknown token is
// ErrNotFound (reused from the store), a spent token is ErrInviteUsed, a
// past-deadline token is ErrInviteExpired. ErrUserExists (from users.go) is
// reused when the presented signing key is already registered.
var (
ErrInviteUsed = errors.New("membership: invite already used")
ErrInviteExpired = errors.New("membership: invite expired")
)
// defaultInviteTTL is the lifetime of an invite when the caller passes a
// non-positive ttlSecs. Seven days mirrors a typical "share this link this
// week" expectation while keeping the un-authenticated /register window bounded.
const defaultInviteTTL = 7 * 24 * time.Hour
// newInviteToken returns 32 cryptographically-random bytes as lowercase hex (64
// chars). The token IS the bearer secret that authorizes /register, so it must
// be unguessable; crypto/rand is the only acceptable source.
func newInviteToken() (string, error) {
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
return "", fmt.Errorf("membership: generate invite token: %w", err)
}
return hex.EncodeToString(b), nil
}
// inviteTTL resolves a caller-supplied ttlSecs into a concrete duration,
// defaulting to defaultInviteTTL when non-positive.
func inviteTTL(ttlSecs int) time.Duration {
if ttlSecs <= 0 {
return defaultInviteTTL
}
return time.Duration(ttlSecs) * time.Second
}
// inviteIsExpired reports whether the RFC3339 expiry has passed. A token whose
// expiry cannot be parsed is treated as expired (fail closed): a corrupt
// deadline must never widen the unauthenticated registration window.
func inviteIsExpired(expiresAt string) bool {
exp, err := time.Parse(time.RFC3339Nano, expiresAt)
if err != nil {
return true
}
return time.Now().UTC().After(exp)
}
// validateInviteRole normalizes and validates the role an invite may carry. It
// mirrors AddUser: empty defaults to member, and only admin|member are allowed
// (an admin minting an admin invite is deliberate and permitted).
func validateInviteRole(role string) (string, error) {
if role == "" {
return RoleMember, nil
}
if role != RoleAdmin && role != RoleMember {
return "", fmt.Errorf("membership: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)
}
return role, nil
}
// ---- SQLite implementation ------------------------------------------------
// CreateInvite mints a single-use invite for a future user. handle is required;
// role defaults to member and must be admin|member. ttlSecs sets the lifetime
// (non-positive uses the 7-day default). The token is 32 random bytes in hex.
func (s *sqliteStore) CreateInvite(handle, role string, ttlSecs int) (Invite, error) {
if handle == "" {
return Invite{}, fmt.Errorf("membership: CreateInvite: handle required")
}
role, err := validateInviteRole(role)
if err != nil {
return Invite{}, err
}
token, err := newInviteToken()
if err != nil {
return Invite{}, err
}
now := time.Now().UTC()
inv := Invite{
Token: token,
Handle: handle,
Role: role,
ExpiresAt: now.Add(inviteTTL(ttlSecs)).Format(time.RFC3339Nano),
Used: false,
CreatedAt: now.Format(time.RFC3339Nano),
}
if _, err := s.db.Exec(
`INSERT INTO invites (token, handle, role, expires_at, used, created_at) VALUES (?, ?, ?, ?, 0, ?)`,
inv.Token, inv.Handle, inv.Role, inv.ExpiresAt, inv.CreatedAt,
); err != nil {
return Invite{}, fmt.Errorf("membership: insert invite: %w", err)
}
return inv, nil
}
// GetInvite returns the invite with the given token, or ErrNotFound (wrapped)
// when there is none.
func (s *sqliteStore) GetInvite(token string) (Invite, error) {
var inv Invite
var used int
var usedAt, usedSign, usedKex sql.NullString
err := s.db.QueryRow(
`SELECT token, handle, role, expires_at, used, created_at, used_at, used_sign_pub, used_kex_pub
FROM invites WHERE token = ?`, token,
).Scan(&inv.Token, &inv.Handle, &inv.Role, &inv.ExpiresAt, &used, &inv.CreatedAt, &usedAt, &usedSign, &usedKex)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, ErrNotFound)
}
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, err)
}
inv.Used = used != 0
inv.UsedAt, inv.UsedSignPub, inv.UsedKexPub = usedAt.String, usedSign.String, usedKex.String
return inv, nil
}
// ListInvites returns every invite ordered newest-first (by created_at). It
// includes consumed invites so the admin panel can show the full picture; the
// caller filters to "pending" when it wants only live links.
func (s *sqliteStore) ListInvites() ([]Invite, error) {
rows, err := s.db.Query(
`SELECT token, handle, role, expires_at, used, created_at, used_at, used_sign_pub, used_kex_pub
FROM invites ORDER BY created_at DESC, token`,
)
if err != nil {
return nil, fmt.Errorf("membership: list invites: %w", err)
}
defer rows.Close()
var out []Invite
for rows.Next() {
var inv Invite
var used int
var usedAt, usedSign, usedKex sql.NullString
if err := rows.Scan(&inv.Token, &inv.Handle, &inv.Role, &inv.ExpiresAt, &used, &inv.CreatedAt, &usedAt, &usedSign, &usedKex); err != nil {
return nil, fmt.Errorf("membership: scan invite: %w", err)
}
inv.Used = used != 0
inv.UsedAt, inv.UsedSignPub, inv.UsedKexPub = usedAt.String, usedSign.String, usedKex.String
out = append(out, inv)
}
return out, rows.Err()
}
// ConsumeInvite atomically validates and spends an invite, registering the
// presented signing key as a bus user with the invite's handle and role. It is
// the ONLY path that adds to the allowlist without an admin signature: the
// bearer token is the authorization, so the checks here are the security
// boundary.
//
// Atomicity (single transaction): the invite is marked used FIRST (guarded by
// `used = 0`, so two concurrent consumers cannot both win), then the user is
// inserted. A token that passes validation is therefore spent exactly once.
// Special case: if the signing key is already registered, the user INSERT hits
// the PRIMARY KEY and we return ErrUserExists — but the invite stays SPENT (we
// commit the mark), matching the JetStream backend's burn-on-claim semantics so
// the two stores behave identically. A genuine backend error rolls everything
// back, leaving the invite reusable.
func (s *sqliteStore) ConsumeInvite(token, signPub, kexPub string) error {
signPub = normalizeSignPub(signPub)
kexPub = normalizeSignPub(kexPub)
if signPub == "" {
return fmt.Errorf("membership: ConsumeInvite: sign_pub required")
}
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("membership: ConsumeInvite: begin: %w", err)
}
defer tx.Rollback()
var handle, role, expiresAt string
var used int
err = tx.QueryRow(
`SELECT handle, role, expires_at, used FROM invites WHERE token = ?`, token,
).Scan(&handle, &role, &expiresAt, &used)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrNotFound)
}
return fmt.Errorf("membership: consume invite %q: %w", token, err)
}
if used != 0 {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
}
if inviteIsExpired(expiresAt) {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteExpired)
}
// Mark used first, guarded by used = 0 so a concurrent consumer that already
// flipped it (rows affected = 0) is rejected as used rather than double-spending.
now := nowRFC3339()
res, err := tx.Exec(
`UPDATE invites SET used = 1, used_at = ?, used_sign_pub = ?, used_kex_pub = ? WHERE token = ? AND used = 0`,
now, signPub, kexPub, token,
)
if err != nil {
return fmt.Errorf("membership: consume invite %q: mark used: %w", token, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("membership: consume invite %q: rows affected: %w", token, err)
}
if n == 0 {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
}
// Register the user with the invite-fixed handle and role.
_, err = tx.Exec(
`INSERT INTO users (sign_pub, handle, role, status, created_at) VALUES (?, ?, ?, ?, ?)`,
signPub, handle, role, StatusActive, now,
)
if err != nil {
// Already-registered key: the invite is still spent (commit the mark) so
// the burn-on-claim contract matches the KV store. Any other failure rolls back.
if isUniqueViolation(err) {
if cErr := tx.Commit(); cErr != nil {
return fmt.Errorf("membership: consume invite %q: commit: %w", token, cErr)
}
return ErrUserExists
}
return fmt.Errorf("membership: consume invite %q: insert user: %w", token, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("membership: consume invite %q: commit: %w", token, err)
}
return nil
}
// CancelInvite removes an invite (the admin revoked the link before it was
// used). It hard-deletes the row whatever its state, so a consumed invite is
// kept for audit only as long as callers pass pending tokens here. Deleting an
// unknown token returns ErrNotFound so the HTTP layer can answer 404.
func (s *sqliteStore) CancelInvite(token string) error {
res, err := s.db.Exec(`DELETE FROM invites WHERE token = ?`, token)
if err != nil {
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("membership: cancel invite %q: rows affected: %w", token, err)
}
if n == 0 {
return fmt.Errorf("membership: cancel invite %q: %w", token, ErrNotFound)
}
return nil
}
// isUniqueViolation reports whether err is a SQLite UNIQUE/PRIMARY KEY conflict.
// modernc.org/sqlite surfaces it as a message fragment; matching it here keeps
// the string-matching in one place (the same fragments AddUser checks inline).
func isUniqueViolation(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "UNIQUE constraint") || strings.Contains(msg, "PRIMARY KEY")
}
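The sentinels above exist so the HTTP layer can pick a status code with errors.Is instead of string-matching. The handler is not shown in this file, so the following is only a sketch of such a mapping. The 404, 409 (used) and 410 codes come from the /register tests in this change; the 409 for an already-registered key and the 500 fallback are assumptions, and every identifier here (statusForConsumeError, wrapConsume, the lowercase sentinels) is hypothetical:

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local stand-ins for the package sentinels (hypothetical names).
var (
	errNotFound      = errors.New("sketch: not found")
	errInviteUsed    = errors.New("sketch: invite already used")
	errInviteExpired = errors.New("sketch: invite expired")
	errUserExists    = errors.New("sketch: user exists")
)

// wrapConsume mimics how the store wraps a sentinel with %w, so the
// sentinel stays reachable through errors.Is.
func wrapConsume(token string, err error) error {
	return fmt.Errorf("consume invite %q: %w", token, err)
}

// statusForConsumeError maps a consume result to an HTTP status code.
func statusForConsumeError(err error) int {
	switch {
	case err == nil:
		return http.StatusCreated
	case errors.Is(err, errNotFound):
		return http.StatusNotFound
	case errors.Is(err, errInviteUsed):
		return http.StatusConflict
	case errors.Is(err, errUserExists):
		return http.StatusConflict // assumption: the source does not state this code
	case errors.Is(err, errInviteExpired):
		return http.StatusGone
	default:
		return http.StatusInternalServerError // assumption
	}
}
```

Because the store wraps with %w, the mapping keeps working even when the error message carries the token or extra context.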
-194
@@ -1,194 +0,0 @@
package membership
import (
"bytes"
"encoding/hex"
"encoding/json"
"io"
"net/http"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
)
// postRegister posts an UNSIGNED /register request (the wallet-model join: the
// new identity is not yet in the allowlist, so it cannot sign). It returns the
// status and body so a test can assert the precise code.
func postRegister(t *testing.T, h *authHarness, body registerReq) (int, string) {
t.Helper()
b, err := json.Marshal(body)
if err != nil {
t.Fatalf("marshal register: %v", err)
}
resp, err := http.Post(h.ts.URL+"/register", "application/json", bytes.NewReader(b))
if err != nil {
t.Fatalf("post register: %v", err)
}
defer resp.Body.Close()
rb, _ := io.ReadAll(resp.Body)
return resp.StatusCode, string(rb)
}
// TestInvitesHTTP_Golden is the end-to-end wallet-model flow over real HTTP:
// alice (admin) mints an invite, a brand-new identity redeems it UNSIGNED via
// /register, the user then appears in the admin allowlist, and a second redeem of
// the same token is rejected as used.
func TestInvitesHTTP_Golden(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
// Admin mints an invite.
var inv createInviteResp
code, body := signedJSON(t, h, "POST", "/invites",
createInviteReq{Handle: "newbie", Role: RoleMember}, h.alice, 1)
if code != http.StatusCreated {
t.Fatalf("admin create invite should be 201, got %d (%s)", code, body)
}
if err := json.Unmarshal([]byte(body), &inv); err != nil {
t.Fatalf("decode invite: %v (%s)", err, body)
}
if len(inv.Token) != 64 || inv.ExpiresAt == "" {
t.Fatalf("invite token/expiry malformed: %+v", inv)
}
// A brand-new identity redeems it WITHOUT any admin signature.
id, _ := cs.GenerateIdentity()
signPub := hex.EncodeToString(id.SignPub)
kexPub := hex.EncodeToString(id.KexPub)
if code, body := postRegister(t, h, registerReq{Token: inv.Token, SignPub: signPub, KexPub: kexPub}); code != http.StatusCreated {
t.Fatalf("register should be 201, got %d (%s)", code, body)
}
// The user now appears in the admin allowlist with the invite's handle/role.
users := listUsers(t, h, 2)
row, ok := findUser(users, signPub)
if !ok {
t.Fatalf("registered user missing from allowlist: %+v", users)
}
if row.Handle != "newbie" || row.Role != RoleMember || row.Status != StatusActive {
t.Fatalf("registered user row wrong: %+v", row)
}
// The invite is no longer pending.
if code, body := signedJSON(t, h, "GET", "/invites", nil, h.alice, 3); code == http.StatusOK {
var pend []inviteJSON
_ = json.Unmarshal([]byte(body), &pend)
for _, p := range pend {
if p.Token == inv.Token {
t.Fatalf("consumed invite should not be listed as pending: %+v", pend)
}
}
}
// Single-use: a second redeem of the same token is 409 used.
id2, _ := cs.GenerateIdentity()
if code, body := postRegister(t, h, registerReq{
Token: inv.Token, SignPub: hex.EncodeToString(id2.SignPub), KexPub: hex.EncodeToString(id2.KexPub),
}); code != http.StatusConflict {
t.Fatalf("second redeem should be 409, got %d (%s)", code, body)
}
}
// TestInvitesHTTP_RegisterValidation covers /register input + state errors: an
// unknown token is 404, an expired token is 410, and malformed hex keys are 400 —
// each WITHOUT registering anything.
func TestInvitesHTTP_RegisterValidation(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
id, _ := cs.GenerateIdentity()
signPub := hex.EncodeToString(id.SignPub)
kexPub := hex.EncodeToString(id.KexPub)
// Unknown token -> 404.
if code, body := postRegister(t, h, registerReq{Token: "deadbeef", SignPub: signPub, KexPub: kexPub}); code != http.StatusNotFound {
t.Fatalf("unknown token should be 404, got %d (%s)", code, body)
}
// Malformed sign_pub -> 400.
if code, body := postRegister(t, h, registerReq{Token: "x", SignPub: "abcd", KexPub: kexPub}); code != http.StatusBadRequest {
t.Fatalf("malformed sign_pub should be 400, got %d (%s)", code, body)
}
// Malformed kex_pub -> 400.
if code, body := postRegister(t, h, registerReq{Token: "x", SignPub: signPub, KexPub: "zzzz"}); code != http.StatusBadRequest {
t.Fatalf("malformed kex_pub should be 400, got %d (%s)", code, body)
}
// Expired token -> 410. Mint via the admin API, then force its deadline past
// directly in the store (white-box).
var inv createInviteResp
_, body := signedJSON(t, h, "POST", "/invites", createInviteReq{Handle: "late", Role: RoleMember}, h.alice, 1)
if err := json.Unmarshal([]byte(body), &inv); err != nil {
t.Fatalf("decode invite: %v (%s)", err, body)
}
ss, ok := h.store.(*sqliteStore)
if !ok {
t.Fatalf("expected sqliteStore harness")
}
past := time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
if _, err := ss.db.Exec(`UPDATE invites SET expires_at = ? WHERE token = ?`, past, inv.Token); err != nil {
t.Fatalf("force expire: %v", err)
}
if code, rb := postRegister(t, h, registerReq{Token: inv.Token, SignPub: signPub, KexPub: kexPub}); code != http.StatusGone {
t.Fatalf("expired token should be 410, got %d (%s)", code, rb)
}
}
// TestInvitesHTTP_NonAdminForbidden is the security spine for the new endpoints:
// a REGISTERED non-admin (bob) is denied on POST /invites, GET /invites,
// DELETE /invites/{token}, and DELETE /users/{signpub} — each a 403 by role.
func TestInvitesHTTP_NonAdminForbidden(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, _ := cs.GenerateIdentity()
register(t, h, bob, "bob") // role member
bobPub := hex.EncodeToString(bob.SignPub)
checks := []struct {
name string
method string
path string
body any
}{
{"create invite", "POST", "/invites", createInviteReq{Handle: "x", Role: RoleMember}},
{"list invites", "GET", "/invites", nil},
{"cancel invite", "DELETE", "/invites/sometoken", nil},
{"delete user", "DELETE", "/users/" + bobPub, nil},
}
for i, c := range checks {
code, body := signedJSON(t, h, c.method, c.path, c.body, bob, i+1)
if code != http.StatusForbidden {
t.Fatalf("non-admin %s should be 403, got %d (%s)", c.name, code, body)
}
}
}
// TestUsersHTTP_HardDelete proves DELETE /users/{signpub} purges a user (distinct
// from revoke's status flip): alice adds carol, hard-deletes her, and carol then
// vanishes from the allowlist entirely (not merely flagged revoked).
func TestUsersHTTP_HardDelete(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
carol, _ := cs.GenerateIdentity()
carolPub := hex.EncodeToString(carol.SignPub)
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: carolPub, Handle: "carol", Role: RoleMember}, h.alice, 1); code != http.StatusCreated {
t.Fatalf("add carol should be 201, got %d (%s)", code, body)
}
// Hard-delete carol.
if code, body := signedJSON(t, h, "DELETE", "/users/"+carolPub, nil, h.alice, 2); code != http.StatusOK {
t.Fatalf("hard-delete carol should be 200, got %d (%s)", code, body)
}
// She is gone entirely — not present in the list at all (vs revoke, which
// keeps her as status=revoked).
users := listUsers(t, h, 3)
if _, ok := findUser(users, carolPub); ok {
t.Fatalf("hard-deleted carol must NOT appear in the allowlist: %+v", users)
}
// Deleting her again is a 404.
if code, body := signedJSON(t, h, "DELETE", "/users/"+carolPub, nil, h.alice, 4); code != http.StatusNotFound {
t.Fatalf("re-delete should be 404, got %d (%s)", code, body)
}
}
-186
@@ -1,186 +0,0 @@
package membership
import (
"encoding/hex"
"encoding/json"
"errors"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
)
// newIDHex generates a fresh identity and returns its signing and key-exchange
// public keys as lowercase hex — the two keys a client presents to /register.
func newIDHex(t *testing.T) (signPub, kexPub string) {
t.Helper()
id, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
return hex.EncodeToString(id.SignPub), hex.EncodeToString(id.KexPub)
}
// inviteSuite drives the full invite lifecycle against any Store backend: mint,
// look up, redeem (which registers the user), reject a second redeem (single-use)
// and a non-existent token, reject an expired token (forced past via the
// backend-specific forceExpire closure), and hard-delete a user. It is shared by
// the SQLite and JetStream tests so both backends prove identical behavior.
func inviteSuite(t *testing.T, s Store, forceExpire func(token string)) {
t.Helper()
// Mint an invite fixing handle + role.
inv, err := s.CreateInvite("alice-new", RoleMember, 3600)
if err != nil {
t.Fatalf("CreateInvite: %v", err)
}
if len(inv.Token) != 64 {
t.Fatalf("token should be 64 hex chars, got %d (%q)", len(inv.Token), inv.Token)
}
if inv.Used {
t.Fatalf("fresh invite must not be used")
}
// GetInvite round-trips it.
got, err := s.GetInvite(inv.Token)
if err != nil || got.Handle != "alice-new" || got.Role != RoleMember {
t.Fatalf("GetInvite mismatch: %+v err=%v", got, err)
}
// Redeem it: the presented signing key joins the allowlist with the invite's
// handle and role.
signPub, kexPub := newIDHex(t)
if err := s.ConsumeInvite(inv.Token, signPub, kexPub); err != nil {
t.Fatalf("ConsumeInvite (golden): %v", err)
}
u, err := s.GetUser(signPub)
if err != nil {
t.Fatalf("GetUser after register: %v", err)
}
if u.Handle != "alice-new" || u.Role != RoleMember || u.Status != StatusActive {
t.Fatalf("registered user wrong: %+v", u)
}
if !s.IsAuthorized(signPub) {
t.Fatalf("registered user should be authorized")
}
// Single-use: redeeming the same token again (even with a different identity)
// is rejected as used.
sp2, kp2 := newIDHex(t)
if err := s.ConsumeInvite(inv.Token, sp2, kp2); !errors.Is(err, ErrInviteUsed) {
t.Fatalf("second redeem should be ErrInviteUsed, got %v", err)
}
if _, err := s.GetUser(sp2); !errors.Is(err, ErrNotFound) {
t.Fatalf("second identity must NOT be registered, got %v", err)
}
// Unknown token is ErrNotFound.
if err := s.ConsumeInvite("deadbeef", "ab", "cd"); !errors.Is(err, ErrNotFound) {
t.Fatalf("unknown token should be ErrNotFound, got %v", err)
}
// Expired invite: mint one, force its deadline into the past, redeem -> rejected.
exp, err := s.CreateInvite("late", RoleMember, 3600)
if err != nil {
t.Fatalf("CreateInvite expired: %v", err)
}
forceExpire(exp.Token)
sp3, kp3 := newIDHex(t)
if err := s.ConsumeInvite(exp.Token, sp3, kp3); !errors.Is(err, ErrInviteExpired) {
t.Fatalf("expired redeem should be ErrInviteExpired, got %v", err)
}
// CancelInvite removes a pending invite; redeeming it afterward is ErrNotFound.
canc, err := s.CreateInvite("cancelme", RoleMember, 3600)
if err != nil {
t.Fatalf("CreateInvite cancel: %v", err)
}
if err := s.CancelInvite(canc.Token); err != nil {
t.Fatalf("CancelInvite: %v", err)
}
if err := s.ConsumeInvite(canc.Token, sp3, kp3); !errors.Is(err, ErrNotFound) {
t.Fatalf("cancelled invite redeem should be ErrNotFound, got %v", err)
}
// Hard-delete the registered user: it disappears from the allowlist entirely.
if err := s.DeleteUser(signPub); err != nil {
t.Fatalf("DeleteUser: %v", err)
}
if _, err := s.GetUser(signPub); !errors.Is(err, ErrNotFound) {
t.Fatalf("deleted user should be ErrNotFound, got %v", err)
}
if s.IsAuthorized(signPub) {
t.Fatalf("deleted user must not be authorized")
}
// Deleting an unknown key is ErrNotFound.
if err := s.DeleteUser(signPub); !errors.Is(err, ErrNotFound) {
t.Fatalf("re-delete should be ErrNotFound, got %v", err)
}
}
// TestInvitesSQLite runs the suite against the default SQLite backend, forcing
// expiry with a direct UPDATE on the embedded DB (white-box, same package).
func TestInvitesSQLite(t *testing.T) {
s := openTestStore(t)
inviteSuite(t, s, func(token string) {
past := time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
if _, err := s.db.Exec(`UPDATE invites SET expires_at = ? WHERE token = ?`, past, token); err != nil {
t.Fatalf("force expire: %v", err)
}
})
}
// TestInvitesJetStream runs the same suite against the replicated KV backend,
// forcing expiry by re-Putting the invite JSON with a past deadline.
func TestInvitesJetStream(t *testing.T) {
s, _, _ := newKVStore(t)
inviteSuite(t, s, func(token string) {
inv, err := s.GetInvite(token)
if err != nil {
t.Fatalf("force expire: get invite: %v", err)
}
inv.ExpiresAt = time.Now().Add(-time.Hour).UTC().Format(time.RFC3339Nano)
b, err := json.Marshal(inv)
if err != nil {
t.Fatalf("force expire: marshal: %v", err)
}
ctx, cancel := s.ctx()
defer cancel()
if _, err := s.invites.Put(ctx, token, b); err != nil {
t.Fatalf("force expire: put: %v", err)
}
})
}
// TestConsumeInvite_AlreadyRegistered covers the burn-on-claim edge: redeeming a
// valid invite with a signing key that is already registered surfaces
// ErrUserExists AND spends the invite (both backends behave identically).
func TestConsumeInvite_AlreadyRegistered(t *testing.T) {
for _, tc := range []struct {
name string
open func(t *testing.T) Store
}{
{"sqlite", func(t *testing.T) Store { return openTestStore(t) }},
{"jetstream", func(t *testing.T) Store { s, _, _ := newKVStore(t); return s }},
} {
t.Run(tc.name, func(t *testing.T) {
s := tc.open(t)
signPub, kexPub := newIDHex(t)
if err := s.AddUser(signPub, "existing", RoleMember); err != nil {
t.Fatalf("seed user: %v", err)
}
inv, err := s.CreateInvite("dup", RoleMember, 3600)
if err != nil {
t.Fatalf("CreateInvite: %v", err)
}
if err := s.ConsumeInvite(inv.Token, signPub, kexPub); !errors.Is(err, ErrUserExists) {
t.Fatalf("redeem with registered key should be ErrUserExists, got %v", err)
}
// The invite is spent (burn-on-claim): a fresh identity cannot reuse it.
sp2, kp2 := newIDHex(t)
if err := s.ConsumeInvite(inv.Token, sp2, kp2); !errors.Is(err, ErrInviteUsed) {
t.Fatalf("invite should be spent after a burned claim, got %v", err)
}
})
}
}
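The burn-on-claim contract exercised by TestConsumeInvite_AlreadyRegistered can be illustrated without either real backend. The sketch below is an in-memory toy, not the SQLite or JetStream store: a mutex stands in for the transaction or compare-and-swap, and all names (memStore, mint, consume, the lowercase sentinels) are hypothetical.

```go
package main

import (
	"errors"
	"sync"
	"time"
)

// Local stand-ins for the package sentinels (hypothetical names).
var (
	errNotFound      = errors.New("sketch: not found")
	errInviteUsed    = errors.New("sketch: invite already used")
	errInviteExpired = errors.New("sketch: invite expired")
	errUserExists    = errors.New("sketch: user exists")
)

type memInvite struct {
	handle    string
	expiresAt time.Time
	used      bool
}

// memStore is a toy allowlist plus invite table guarded by one mutex.
type memStore struct {
	mu      sync.Mutex
	invites map[string]*memInvite
	users   map[string]string // signPub -> handle
}

func newMemStore() *memStore {
	return &memStore{invites: map[string]*memInvite{}, users: map[string]string{}}
}

// mint records a pending invite that expires ttlSecs seconds from now.
func (s *memStore) mint(token, handle string, ttlSecs int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	exp := time.Now().Add(time.Duration(ttlSecs) * time.Second)
	s.invites[token] = &memInvite{handle: handle, expiresAt: exp}
}

// consume validates the invite, marks it used FIRST, then registers the key.
// If the key already exists the invite stays spent (burn-on-claim).
func (s *memStore) consume(token, signPub string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	inv, ok := s.invites[token]
	if !ok {
		return errNotFound
	}
	if inv.used {
		return errInviteUsed
	}
	if time.Now().After(inv.expiresAt) {
		return errInviteExpired
	}
	inv.used = true // spend the token before touching the allowlist
	if _, exists := s.users[signPub]; exists {
		return errUserExists
	}
	s.users[signPub] = inv.handle
	return nil
}
```

Marking the invite before inserting the user is what makes a failed claim with a duplicate key still consume the token, so a second identity presenting the same token is rejected as used.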
-192
@@ -50,7 +50,6 @@ const (
 bucketByMember = "UNIBUS_rooms_by_member"
 bucketRoomKeys = "UNIBUS_room_keys"
 bucketUsers = "UNIBUS_users"
-bucketInvites = "UNIBUS_invites"
 defaultKVOpTime = 5 * time.Second
 )
@@ -72,7 +71,6 @@ type jetstreamStore struct {
 byMember jetstream.KeyValue
 keys jetstream.KeyValue
 users jetstream.KeyValue
-invites jetstream.KeyValue
 opTimeout time.Duration
 }
@@ -110,7 +108,6 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
 {bucketByMember, &s.byMember},
 {bucketRoomKeys, &s.keys},
 {bucketUsers, &s.users},
-{bucketInvites, &s.invites},
 } {
 var kv jetstream.KeyValue
 var lastErr error
@@ -501,28 +498,6 @@ func (s *jetstreamStore) RevokeUser(signPub string) error {
 return nil
 }
-// DeleteUser hard-deletes a user from the KV allowlist (the purge counterpart of
-// RevokeUser's status flip). It checks existence first so deleting an unknown key
-// is ErrNotFound (KV Delete is otherwise idempotent and would not signal a miss).
-// Only the allowlist key is removed; room memberships the ex-user holds become
-// inert because they can no longer authenticate; see the SQLite DeleteUser for
-// the full rationale on why room state is left untouched.
-func (s *jetstreamStore) DeleteUser(signPub string) error {
-signPub = normalizeSignPub(signPub)
-ctx, cancel := s.ctx()
-defer cancel()
-if _, err := s.users.Get(ctx, signPub); err != nil {
-if errors.Is(err, jetstream.ErrKeyNotFound) {
-return fmt.Errorf("membership: delete user %q: %w", signPub, ErrNotFound)
-}
-return fmt.Errorf("membership: delete user %q: %w", signPub, err)
-}
-if err := s.users.Delete(ctx, signPub); err != nil {
-return fmt.Errorf("membership: delete user %q: %w", signPub, err)
-}
-return nil
-}
 // IsAuthorized reports whether signPub is an active bus user. Any backend error
 // (including a KV quorum loss or timeout) yields false: fail closed.
 func (s *jetstreamStore) IsAuthorized(signPub string) bool {
@@ -558,173 +533,6 @@ func (s *jetstreamStore) HasAdmin() bool {
 return false
 }
// ---- invites (single-use registration tokens) ----------------------------
func (s *jetstreamStore) CreateInvite(handle, role string, ttlSecs int) (Invite, error) {
if handle == "" {
return Invite{}, fmt.Errorf("membership: CreateInvite: handle required")
}
role, err := validateInviteRole(role)
if err != nil {
return Invite{}, err
}
token, err := newInviteToken()
if err != nil {
return Invite{}, err
}
now := time.Now().UTC()
inv := Invite{
Token: token,
Handle: handle,
Role: role,
ExpiresAt: now.Add(inviteTTL(ttlSecs)).Format(time.RFC3339Nano),
Used: false,
CreatedAt: now.Format(time.RFC3339Nano),
}
b, err := json.Marshal(inv)
if err != nil {
return Invite{}, fmt.Errorf("membership: marshal invite: %w", err)
}
ctx, cancel := s.ctx()
defer cancel()
// Create (not Put) so a token collision is rejected rather than silently
// overwriting a live invite — a 32-byte random collision is astronomically
// unlikely, but Create makes the single-use guarantee unconditional.
if _, err := s.invites.Create(ctx, token, b); err != nil {
if errors.Is(err, jetstream.ErrKeyExists) {
return Invite{}, fmt.Errorf("membership: create invite: token collision")
}
return Invite{}, fmt.Errorf("membership: create invite: %w", err)
}
return inv, nil
}
func (s *jetstreamStore) GetInvite(token string) (Invite, error) {
ctx, cancel := s.ctx()
defer cancel()
e, err := s.invites.Get(ctx, token)
if err != nil {
if errors.Is(err, jetstream.ErrKeyNotFound) {
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, ErrNotFound)
}
return Invite{}, fmt.Errorf("membership: get invite %q: %w", token, err)
}
var inv Invite
if err := json.Unmarshal(e.Value(), &inv); err != nil {
return Invite{}, fmt.Errorf("membership: unmarshal invite: %w", err)
}
return inv, nil
}
func (s *jetstreamStore) ListInvites() ([]Invite, error) {
ctx, cancel := s.ctx()
w, err := s.invites.WatchAll(ctx, jetstream.IgnoreDeletes())
if err != nil {
cancel()
return nil, fmt.Errorf("membership: list invites: %w", err)
}
defer cancel()
defer w.Stop()
var out []Invite
for {
select {
case e := <-w.Updates():
if e == nil {
sort.Slice(out, func(i, j int) bool {
if out[i].CreatedAt != out[j].CreatedAt {
return out[i].CreatedAt > out[j].CreatedAt // newest first
}
return out[i].Token < out[j].Token
})
return out, nil
}
var inv Invite
if err := json.Unmarshal(e.Value(), &inv); err != nil {
return nil, fmt.Errorf("membership: unmarshal invite: %w", err)
}
out = append(out, inv)
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// ConsumeInvite spends a KV invite and registers the presented signing key. With
// no multi-key transaction, single-use is enforced by a compare-and-swap on the
// invite: the token is marked used via Update against the revision read by Get,
// so only ONE concurrent consumer can win the swap; the loser sees a revision
// mismatch and is rejected as used. The user is registered AFTER the successful
// swap. Burn-on-claim: if the signing key is already registered the swap has
// already spent the token and we surface ErrUserExists — the SQLite store commits
// the same way, so both backends behave identically.
func (s *jetstreamStore) ConsumeInvite(token, signPub, kexPub string) error {
signPub = normalizeSignPub(signPub)
kexPub = normalizeSignPub(kexPub)
if signPub == "" {
return fmt.Errorf("membership: ConsumeInvite: sign_pub required")
}
ctx, cancel := s.ctx()
defer cancel()
e, err := s.invites.Get(ctx, token)
if err != nil {
if errors.Is(err, jetstream.ErrKeyNotFound) {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrNotFound)
}
return fmt.Errorf("membership: consume invite %q: %w", token, err)
}
var inv Invite
if err := json.Unmarshal(e.Value(), &inv); err != nil {
return fmt.Errorf("membership: unmarshal invite: %w", err)
}
if inv.Used {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
}
if inviteIsExpired(inv.ExpiresAt) {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteExpired)
}
inv.Used = true
inv.UsedAt = nowRFC3339()
inv.UsedSignPub = signPub
inv.UsedKexPub = kexPub
b, err := json.Marshal(inv)
if err != nil {
return fmt.Errorf("membership: marshal invite: %w", err)
}
// CAS: Update only succeeds if the invite is still at the revision we read, so
// a racing consumer that already flipped it loses here. A failed swap is
// conservatively treated as "already used" (the common cause); the caller can
// re-read to learn the precise state.
if _, err := s.invites.Update(ctx, token, b, e.Revision()); err != nil {
return fmt.Errorf("membership: consume invite %q: %w", token, ErrInviteUsed)
}
// Token is now spent. Register the user with the invite-fixed handle and role.
if err := s.AddUser(signPub, inv.Handle, inv.Role); err != nil {
if errors.Is(err, ErrUserExists) {
return ErrUserExists
}
return fmt.Errorf("membership: consume invite %q: register user: %w", token, err)
}
return nil
}
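The compare-and-swap described in the ConsumeInvite comment can be modelled without JetStream. What follows is a minimal sketch, assuming a hypothetical in-memory `RevisionedKV` class that stands in for the KV bucket (its `create`/`get`/`update` names are illustrative, not the NATS API). It is written in TypeScript so it runs standalone, and it models only the single-use property, not user registration.

```typescript
// Hypothetical in-memory stand-in for a revisioned KV bucket (NOT the NATS API).
interface Entry<T> {
  value: T;
  revision: number;
}

class RevisionedKV<T> {
  private entries = new Map<string, Entry<T>>();

  create(key: string, value: T): void {
    if (this.entries.has(key)) throw new Error("key exists");
    this.entries.set(key, { value, revision: 1 });
  }

  get(key: string): Entry<T> | undefined {
    const e = this.entries.get(key);
    return e ? { value: e.value, revision: e.revision } : undefined;
  }

  // update succeeds only if the stored revision still equals expectedRevision.
  update(key: string, value: T, expectedRevision: number): boolean {
    const cur = this.entries.get(key);
    if (!cur || cur.revision !== expectedRevision) return false;
    this.entries.set(key, { value, revision: cur.revision + 1 });
    return true;
  }
}

interface InviteState {
  used: boolean;
  usedBy?: string;
}

type SpendResult = "ok" | "used";

// spend marks the invite used against the revision the caller read earlier.
// A lost swap is reported as "used", mirroring the conservative mapping above.
function spend(
  kv: RevisionedKV<InviteState>,
  token: string,
  read: Entry<InviteState>,
  signPub: string,
): SpendResult {
  if (read.value.used) return "used";
  const swapped = kv.update(token, { used: true, usedBy: signPub }, read.revision);
  return swapped ? "ok" : "used";
}

// Two consumers read the same revision, then race to spend the token.
const kv = new RevisionedKV<InviteState>();
kv.create("tok", { used: false });
const readA = kv.get("tok")!;
const readB = kv.get("tok")!;
const resultA = spend(kv, "tok", readA, "alice-key");
const resultB = spend(kv, "tok", readB, "bob-key");
```

Both consumers see an unused invite, yet only the first update matches the stored revision. The second one fails the revision check and is rejected, which is the property the Go code relies on.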
func (s *jetstreamStore) CancelInvite(token string) error {
ctx, cancel := s.ctx()
defer cancel()
if _, err := s.invites.Get(ctx, token); err != nil {
if errors.Is(err, jetstream.ErrKeyNotFound) {
return fmt.Errorf("membership: cancel invite %q: %w", token, ErrNotFound)
}
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
}
if err := s.invites.Delete(ctx, token); err != nil {
return fmt.Errorf("membership: cancel invite %q: %w", token, err)
}
return nil
}
// ---- snapshot import / export (issue 0003c migration) --------------------- // ---- snapshot import / export (issue 0003c migration) ---------------------
// importSnapshot writes a full Snapshot into the KV buckets, preserving each // importSnapshot writes a full Snapshot into the KV buckets, preserving each
-28
@@ -1,28 +0,0 @@
-- 003_invites.sql — single-use registration invites (issue: user accounts / wallet model).
--
-- An admin mints an invite so a brand-new identity can join the bus allowlist
-- WITHOUT the admin ever handling its private key. The token is the bearer
-- secret that authorizes POST /register: the registering client generates its
-- keypair locally and publishes only its public keys, fixing the link between an
-- invite and the identity it creates via the audit columns below. The handle and
-- role are fixed by the admin at mint time and cannot be changed by the client
-- (no privilege escalation).
--
-- Additive and idempotent: safe to apply repeatedly. Never modify this file;
-- further schema changes go in new numbered migrations (see
-- .claude/rules/db_migrations.md). The embedded copy under
-- pkg/membership/migrations/003_invites.sql mirrors this file byte-for-byte.
CREATE TABLE IF NOT EXISTS invites (
token TEXT PRIMARY KEY, -- 32 random bytes in lowercase hex (the bearer secret)
handle TEXT NOT NULL, -- handle the new user will get (fixed by admin)
role TEXT NOT NULL DEFAULT 'member', -- 'admin' | 'member' (fixed by admin)
expires_at TEXT NOT NULL, -- RFC3339; past this the invite is dead
used INTEGER NOT NULL DEFAULT 0, -- 0 pending, 1 consumed (single-use)
created_at TEXT NOT NULL,
used_at TEXT, -- RFC3339 when consumed (NULL until used)
used_sign_pub TEXT, -- Ed25519 key that consumed it (audit; NULL until used)
used_kex_pub TEXT -- X25519 key presented at registration (audit; NULL until used)
);
CREATE INDEX IF NOT EXISTS idx_invites_used ON invites(used);
+7 -233
@@ -144,11 +144,9 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
now := time.Now() now := time.Now()
// Per-IP rate limit runs first, ahead of auth and body reads, so a flood is // Per-IP rate limit runs first, ahead of auth and body reads, so a flood is
// shed at the cheapest possible point. ONLY the health probe is exempt so // shed at the cheapest possible point. The health probe is exempt so liveness
// liveness checks are never throttled — note this is isRateExempt, NOT // checks are never throttled.
// isAuthExempt: POST /register is auth-exempt (no admin signature) but stays if !isAuthExempt(r) && !s.limiter.allow(clientIP(r), now) {
// rate-limited, since it is the one un-signed path that mutates the allowlist.
if !isRateExempt(r) && !s.limiter.allow(clientIP(r), now) {
writeErr(w, http.StatusTooManyRequests, "rate limit exceeded") writeErr(w, http.StatusTooManyRequests, "rate limit exceeded")
return return
} }
@@ -310,27 +308,11 @@ func (s *Server) requireAdmin(w http.ResponseWriter, r *http.Request) (string, b
return pubHex, true return pubHex, true
} }
// isRateExempt lists requests that bypass the per-IP rate limiter. Only the // isAuthExempt lists requests that bypass control-plane auth even under enforce.
// health probe qualifies: a load balancer / systemd / smoke check polls it and // Only the unauthenticated health probe qualifies: it carries no data and is
// must never be throttled. Everything else — including POST /register — is rate // needed by load balancers / smoke checks / systemd before any identity exists.
// limited.
func isRateExempt(r *http.Request) bool {
return r.Method == http.MethodGet && r.URL.Path == "/healthz"
}
// isAuthExempt lists requests that bypass control-plane signature auth even under
// enforce. Two qualify:
// - GET /healthz: carries no data, needed before any identity exists.
// - POST /register: the wallet-model join path. The registering identity is not
// yet in the allowlist, so it CANNOT produce an accepted admin signature;
// authorization is the single-use bearer invite token, validated inside the
// handler (ConsumeInvite). It stays rate-limited (see isRateExempt) and
// strictly validates the hex keys before spending the token.
func isAuthExempt(r *http.Request) bool { func isAuthExempt(r *http.Request) bool {
if r.Method == http.MethodGet && r.URL.Path == "/healthz" { return r.Method == http.MethodGet && r.URL.Path == "/healthz"
return true
}
return r.Method == http.MethodPost && r.URL.Path == "/register"
} }
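The two-predicate variant in this hunk separates "skips the rate limiter" from "skips signature auth". Below is a minimal TypeScript sketch of that separation, assuming a hypothetical `Req` shape with only `method` and `path` (the real predicates are Go and take `*http.Request`).

```typescript
// Hypothetical request shape for illustration only.
interface Req {
  method: string;
  path: string;
}

// Only the health probe bypasses the per-IP limiter.
function isRateExempt(r: Req): boolean {
  return r.method === "GET" && r.path === "/healthz";
}

// The health probe and the token-authorized join path bypass signature auth.
function isAuthExempt(r: Req): boolean {
  if (r.method === "GET" && r.path === "/healthz") return true;
  return r.method === "POST" && r.path === "/register";
}

const health: Req = { method: "GET", path: "/healthz" };
const register: Req = { method: "POST", path: "/register" };
const listUsers: Req = { method: "GET", path: "/users" };
```

Keeping the predicates separate means a path can be unsigned and still throttled. When the limiter is gated on `isAuthExempt` alone, the two sets are the same by construction, which is only equivalent while `/healthz` is the sole exempt path.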
func (s *Server) routes() { func (s *Server) routes() {
@@ -351,16 +333,6 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /users", s.handleListUsers) s.mux.HandleFunc("GET /users", s.handleListUsers)
s.mux.HandleFunc("POST /users", s.handleAddUser) s.mux.HandleFunc("POST /users", s.handleAddUser)
s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser) s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser)
// Hard-delete (purge) a user — distinct from revoke (status flip). Admin-only.
s.mux.HandleFunc("DELETE /users/{signpub}", s.handleDeleteUser)
// Invites — the wallet-model account-creation path. The admin mints a
// single-use link (POST /invites, admin-only); the new user's client redeems
// it without an admin signature (POST /register, token-authorized). Listing
// and cancelling a pending invite are admin-only.
s.mux.HandleFunc("POST /invites", s.handleCreateInvite)
s.mux.HandleFunc("GET /invites", s.handleListInvites)
s.mux.HandleFunc("DELETE /invites/{token}", s.handleCancelInvite)
s.mux.HandleFunc("POST /register", s.handleRegister)
} }
// ---- wire types ----------------------------------------------------------- // ---- wire types -----------------------------------------------------------
@@ -459,46 +431,6 @@ type addUserReq struct {
Role string `json:"role"` Role string `json:"role"`
} }
// createInviteReq is the POST /invites body (admin-only): the handle and role the
// future user will receive (fixed here, NOT chosen by the registering client) and
// an optional TTL in seconds (non-positive uses the 7-day default).
type createInviteReq struct {
Handle string `json:"handle"`
Role string `json:"role"`
TTLSecs int `json:"ttl_secs"`
}
// createInviteResp is the POST /invites reply: the bearer token to put in the
// join link and its absolute expiry. The token is shown ONCE here; the admin
// copies the link immediately.
type createInviteResp struct {
Token string `json:"token"`
ExpiresAt string `json:"expires_at"`
}
// inviteJSON is the wire representation of a pending invite on GET /invites. It
// omits the audit fields (used_*) because the listing is of pending invites only;
// expires_at is carried so a client can render "expires in N".
type inviteJSON struct {
Token string `json:"token"`
Handle string `json:"handle"`
Role string `json:"role"`
ExpiresAt string `json:"expires_at"`
Used bool `json:"used"`
CreatedAt string `json:"created_at"`
}
// registerReq is the POST /register body. It is the ONLY allowlist-mutating
// request that carries no admin signature: the bearer Token authorizes it. The
// client supplies its freshly-generated public keys (sign_pub = Ed25519 identity,
// kex_pub = X25519 key-exchange), both 64-hex. The handle and role come from the
// invite, never from this body — the client cannot escalate.
type registerReq struct {
Token string `json:"token"`
SignPub string `json:"sign_pub"`
KexPub string `json:"kex_pub"`
}
// ---- helpers -------------------------------------------------------------- // ---- helpers --------------------------------------------------------------
func writeJSON(w http.ResponseWriter, code int, v any) { func writeJSON(w http.ResponseWriter, code int, v any) {
@@ -908,161 +840,3 @@ func (s *Server) handleRevokeUser(w http.ResponseWriter, r *http.Request) {
} }
writeJSON(w, http.StatusOK, map[string]string{"status": "revoked"}) writeJSON(w, http.StatusOK, map[string]string{"status": "revoked"})
} }
// handleDeleteUser hard-deletes a bus user by signing key — the purge that the
// admin panel's "Eliminar" (permanent) action maps to, distinct from revoke's
// status flip. The row is removed entirely (no audit trail kept); use revoke when
// an auditable record must remain. Deleting an unknown key is a 404. Admin-only.
//
// Security note: like revoke, this does NOT special-case the last admin — an
// admin can delete the final admin and lock the HTTP user-management surface. The
// recovery seam is the local `membershipd user add` CLI (which re-seeds an admin
// directly against the store), the same chicken-egg breaker that seeds the first
// admin.
func (s *Server) handleDeleteUser(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
signPub := r.PathValue("signpub")
if err := ValidateSignPubHex(signPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
if err := s.store.DeleteUser(signPub); err != nil {
if errors.Is(err, ErrNotFound) {
writeErr(w, http.StatusNotFound, "no user with that key")
return
}
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "deleted"})
}
// ---- invite handlers ------------------------------------------------------
// handleCreateInvite mints a single-use registration invite. The handle and role
// are fixed here by the admin; the role is validated (admin|member, empty ->
// member) so an unknown role is a clean 400 rather than an opaque 500. The reply
// carries the bearer token and its expiry — the admin turns the token into the
// join link. Admin-only.
func (s *Server) handleCreateInvite(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
var req createInviteReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return
}
if req.Handle == "" {
writeErr(w, http.StatusBadRequest, "handle required")
return
}
if req.Role != "" && req.Role != RoleAdmin && req.Role != RoleMember {
writeErr(w, http.StatusBadRequest,
fmt.Sprintf("invalid role %q (want %q or %q)", req.Role, RoleAdmin, RoleMember))
return
}
inv, err := s.store.CreateInvite(req.Handle, req.Role, req.TTLSecs)
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusCreated, createInviteResp{Token: inv.Token, ExpiresAt: inv.ExpiresAt})
}
// handleListInvites returns the PENDING invites (not yet used and not expired), so
// the admin panel shows only live links worth copying. Consumed/expired invites
// are filtered out here rather than at the store, which exposes the full set for
// other callers. Admin-only.
func (s *Server) handleListInvites(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
invites, err := s.store.ListInvites()
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
out := make([]inviteJSON, 0, len(invites))
for _, inv := range invites {
if inv.Used || inviteIsExpired(inv.ExpiresAt) {
continue // pending only
}
out = append(out, inviteJSON{
Token: inv.Token,
Handle: inv.Handle,
Role: inv.Role,
ExpiresAt: inv.ExpiresAt,
Used: inv.Used,
CreatedAt: inv.CreatedAt,
})
}
writeJSON(w, http.StatusOK, out)
}
// handleCancelInvite cancels (hard-deletes) a pending invite, so an admin can
// revoke a link before it is redeemed. Cancelling an unknown token is a 404.
// Admin-only.
func (s *Server) handleCancelInvite(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
token := r.PathValue("token")
if token == "" {
writeErr(w, http.StatusBadRequest, "token required")
return
}
if err := s.store.CancelInvite(token); err != nil {
if errors.Is(err, ErrNotFound) {
writeErr(w, http.StatusNotFound, "no such invite")
return
}
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "cancelled"})
}
// handleRegister redeems an invite: the wallet-model join path. It is auth-exempt
// (no admin signature; see isAuthExempt) but rate-limited and strictly validated.
// The client presents the single-use token plus its freshly-generated public keys
// (sign_pub Ed25519, kex_pub X25519). Both keys are validated as 64-hex BEFORE the
// token is spent, the handle and role come from the invite (never this body), and
// ConsumeInvite enforces single-use atomically. Errors map to precise codes so a
// client can tell "unknown" from "used" from "expired".
func (s *Server) handleRegister(w http.ResponseWriter, r *http.Request) {
var req registerReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return
}
if req.Token == "" {
writeErr(w, http.StatusBadRequest, "token required")
return
}
if err := ValidateSignPubHex(req.SignPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
if err := ValidateKexPubHex(req.KexPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
err := s.store.ConsumeInvite(req.Token, req.SignPub, req.KexPub)
switch {
case err == nil:
writeJSON(w, http.StatusCreated, map[string]string{"status": "registered"})
case errors.Is(err, ErrNotFound):
writeErr(w, http.StatusNotFound, "invalid or unknown invite token")
case errors.Is(err, ErrInviteUsed):
writeErr(w, http.StatusConflict, "invite already used")
case errors.Is(err, ErrInviteExpired):
writeErr(w, http.StatusGone, "invite expired")
case errors.Is(err, ErrUserExists):
writeErr(w, http.StatusConflict, "identity already registered")
default:
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
}
}
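The switch in handleRegister gives each failure its own status so a client can branch on it. Here is a minimal sketch of the client side of that contract, assuming a hypothetical `describeRegisterStatus` helper. The status codes are the ones used by the handler and the rate limiter in this diff; the outcome names are illustrative.

```typescript
type RegisterOutcome =
  | "registered"
  | "bad_request"
  | "unknown_token"
  | "conflict"
  | "expired"
  | "rate_limited"
  | "other_error";

// Hypothetical helper: classify the HTTP status returned by POST /register.
function describeRegisterStatus(status: number): RegisterOutcome {
  switch (status) {
    case 201:
      return "registered";
    case 400:
      return "bad_request"; // bad JSON, missing token, or malformed key
    case 404:
      return "unknown_token";
    case 409:
      return "conflict"; // invite already used OR identity already registered
    case 410:
      return "expired";
    case 429:
      return "rate_limited";
    default:
      return "other_error";
  }
}
```

Both "invite already used" and "identity already registered" arrive as 409, so a client that needs to tell them apart has to read the response message as well as the status.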
-14
@@ -80,23 +80,9 @@ type Store interface {
GetUser(signPub string) (User, error) GetUser(signPub string) (User, error)
ListUsers() ([]User, error) ListUsers() ([]User, error)
RevokeUser(signPub string) error RevokeUser(signPub string) error
// DeleteUser hard-deletes a user (the purge counterpart of RevokeUser's
// status flip): the row is removed, not just flagged. The ex-user can no
// longer authenticate, so any room memberships they hold become inert.
DeleteUser(signPub string) error
IsAuthorized(signPub string) bool IsAuthorized(signPub string) bool
HasAdmin() bool HasAdmin() bool
// Invites (single-use registration tokens; the wallet-model join path).
// CreateInvite mints a token fixing handle+role; ConsumeInvite is the only
// path that adds to the allowlist without an admin signature (the bearer
// token is the authorization), spending the token exactly once.
CreateInvite(handle, role string, ttlSecs int) (Invite, error)
GetInvite(token string) (Invite, error)
ListInvites() ([]Invite, error)
ConsumeInvite(token, signPub, kexPub string) error
CancelInvite(token string) error
// Lifecycle. // Lifecycle.
Close() error Close() error
} }
+2 -49
@@ -53,23 +53,6 @@ func ValidateSignPubHex(signPub string) error {
return nil return nil
} }
// ValidateKexPubHex ensures kexPub is exactly a 32-byte X25519 public key in hex
// (64 hex chars). It is the registration-side counterpart of ValidateSignPubHex:
// POST /register receives both the new identity's signing key and its key-exchange
// key, and both must be well-formed before the invite is consumed. An X25519
// public key is 32 bytes, identical in length to Ed25519, so the check is the
// same shape with a key-exchange-specific message.
func ValidateKexPubHex(kexPub string) error {
b, err := hex.DecodeString(kexPub)
if err != nil {
return fmt.Errorf("kex-pub is not valid hex: %w", err)
}
if len(b) != 32 {
return fmt.Errorf("kex-pub must be a 32-byte X25519 public key (64 hex chars), got %d bytes", len(b))
}
return nil
}
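A client that wants to reject malformed keys before calling POST /register can mirror the same 32-byte check. This is a minimal sketch with a hypothetical `validateKeyHex` helper. It assumes, as Go's `hex.DecodeString` does, that both upper- and lowercase hex digits are accepted and that an odd-length string is invalid.

```typescript
// Hypothetical client-side mirror of the server's 64-hex key check.
// Returns null when the key is well-formed, otherwise an error message.
function validateKeyHex(label: string, key: string): string | null {
  if (!/^[0-9a-fA-F]*$/.test(key) || key.length % 2 !== 0) {
    return `${label} is not valid hex`;
  }
  const bytes = key.length / 2;
  if (bytes !== 32) {
    return `${label} must be a 32-byte public key (64 hex chars), got ${bytes} bytes`;
  }
  return null;
}

const goodKey = "ab".repeat(32); // 64 hex characters
const shortKey = "abcd"; // valid hex, but only 2 bytes
const notHex = "zz".repeat(32); // right length, wrong alphabet
```

Because Ed25519 and X25519 public keys are both 32 bytes, one helper with a label covers `sign_pub` and `kex_pub` alike.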
// normalizeSignPub lowercases the hex key so lookups are case-insensitive: the // normalizeSignPub lowercases the hex key so lookups are case-insensitive: the
// primary key is stored lowercase and every query normalizes its input the same // primary key is stored lowercase and every query normalizes its input the same
// way, so a caller passing uppercase hex still matches. // way, so a caller passing uppercase hex still matches.
@@ -107,10 +90,8 @@ func (s *sqliteStore) AddUser(signPub, handle, role string) error {
return nil return nil
} }
// GetUser returns the user with the given signing public key. A miss returns // GetUser returns the user with the given signing public key. It returns
// ErrNotFound (wrapped), matching the storage-agnostic contract in store.go and // sql.ErrNoRows (wrapped) when there is no such user.
// the JetStream backend, so callers can branch on ErrNotFound regardless of which
// store is active (the SQLite-specific sql.ErrNoRows is mapped here).
func (s *sqliteStore) GetUser(signPub string) (User, error) { func (s *sqliteStore) GetUser(signPub string) (User, error) {
signPub = normalizeSignPub(signPub) signPub = normalizeSignPub(signPub)
var u User var u User
@@ -120,9 +101,6 @@ func (s *sqliteStore) GetUser(signPub string) (User, error) {
signPub, signPub,
).Scan(&u.SignPub, &u.Handle, &u.Role, &u.Status, &u.CreatedAt, &revoked) ).Scan(&u.SignPub, &u.Handle, &u.Role, &u.Status, &u.CreatedAt, &revoked)
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, ErrNotFound)
}
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err) return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err)
} }
u.RevokedAt = revoked.String u.RevokedAt = revoked.String
@@ -175,31 +153,6 @@ func (s *sqliteStore) RevokeUser(signPub string) error {
return nil return nil
} }
// DeleteUser hard-deletes a user from the allowlist (admin "remove user", the
// purge counterpart of RevokeUser's status flip). It removes ONLY the allowlist
// row: the ex-user can no longer authenticate on either plane, so any room
// memberships they still hold become inert (they cannot fetch a sealed key, sign
// a request, or open a NATS connection). We deliberately do NOT chase down and
// rewrite those room memberships here — that would be a partial, racy cleanup of
// state owned by each room's owner; a room owner kicks/rekeys to achieve forward
// secrecy when needed. Deleting an unknown key returns ErrNotFound (wrapped) so
// the HTTP layer can answer 404.
func (s *sqliteStore) DeleteUser(signPub string) error {
signPub = normalizeSignPub(signPub)
res, err := s.db.Exec(`DELETE FROM users WHERE sign_pub = ?`, signPub)
if err != nil {
return fmt.Errorf("membership: delete user %q: %w", signPub, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("membership: delete user %q: rows affected: %w", signPub, err)
}
if n == 0 {
return fmt.Errorf("membership: delete user %q: %w", signPub, ErrNotFound)
}
return nil
}
// IsAuthorized reports whether signPub belongs to an active (non-revoked) bus // IsAuthorized reports whether signPub belongs to an active (non-revoked) bus
// user. It is the single authorization predicate consulted by both the control // user. It is the single authorization predicate consulted by both the control
// plane (HTTP request middleware) and the data plane (NATS nkey authenticator), // plane (HTTP request middleware) and the data plane (NATS nkey authenticator),
+35 -2
@@ -1,11 +1,44 @@
import { useState } from "react"; import { useEffect, useState } from "react";
import { Center, Loader } from "@mantine/core";
import { Login } from "./Login"; import { Login } from "./Login";
import { ChatShell } from "./ChatShell"; import { ChatShell } from "./ChatShell";
import { api } from "./api";
import type { User } from "./types"; import type { User } from "./types";
// shortEndpoint makes the operator's endpoint id readable so it can be shown as
// the default handle when none was typed at login.
function shortEndpoint(ep: string) {
return ep.slice(0, 8);
}
export function App() { export function App() {
const [user, setUser] = useState<User | null>(null); const [user, setUser] = useState<User | null>(null);
const [checking, setChecking] = useState(true);
// On mount, check whether a live session already exists on the gateway (cookie).
// If so, go straight in; if not (401), show the login.
useEffect(() => {
api
.me()
.then((me) =>
setUser({ id: me.endpoint, handle: shortEndpoint(me.endpoint) }),
)
.catch(() => {})
.finally(() => setChecking(false));
}, []);
const logout = () => {
void api.logout().catch(() => {});
setUser(null);
};
if (checking) {
return (
<Center h="100vh" bg="dark.9">
<Loader color="brand" />
</Center>
);
}
if (!user) return <Login onLogin={setUser} />; if (!user) return <Login onLogin={setUser} />;
return <ChatShell user={user} onLogout={() => setUser(null)} />; return <ChatShell user={user} onLogout={logout} />;
} }
+38 -23
@@ -19,7 +19,8 @@ import {
IconDotsVertical, IconDotsVertical,
IconPaperclip, IconPaperclip,
} from "@tabler/icons-react"; } from "@tabler/icons-react";
import type { Message, Room, User } from "./types"; import { api, streamRoom } from "./api";
import type { Message, Room } from "./types";
function initials(s: string) { function initials(s: string) {
return s.replace(/[^a-z0-9]/gi, "").slice(0, 2).toUpperCase() || "?"; return s.replace(/[^a-z0-9]/gi, "").slice(0, 2).toUpperCase() || "?";
@@ -54,22 +55,30 @@ function MessageRow({ msg }: { msg: Message }) {
); );
} }
export function ChatPanel({ export function ChatPanel({ room }: { room: Room | undefined }) {
room,
user,
}: {
room: Room | undefined;
user: User;
}) {
const [draft, setDraft] = useState(""); const [draft, setDraft] = useState("");
const [extra, setExtra] = useState<Record<string, Message[]>>({}); const [messages, setMessages] = useState<Message[]>([]);
const [sendError, setSendError] = useState<string | null>(null);
const viewport = useRef<HTMLDivElement>(null); const viewport = useRef<HTMLDivElement>(null);
const msgs = room ? [...room.messages, ...(extra[room.id] ?? [])] : []; // Opens the SSE stream for the active room. The gateway delivers history (persisted
// rooms) and then live messages, already decrypted. Dedup by id because a
// re-render must not duplicate and the echo of our own send arrives here.
useEffect(() => {
setMessages([]);
setSendError(null);
if (!room) return;
const close = streamRoom(room.id, (m) => {
setMessages((prev) =>
prev.some((p) => p.id === m.id) ? prev : [...prev, m],
);
});
return close;
}, [room?.id]);
useEffect(() => { useEffect(() => {
viewport.current?.scrollTo({ top: viewport.current.scrollHeight }); viewport.current?.scrollTo({ top: viewport.current.scrollHeight });
}, [room?.id, msgs.length]); }, [room?.id, messages.length]);
if (!room) { if (!room) {
return ( return (
@@ -79,18 +88,19 @@ export function ChatPanel({
); );
} }
const send = () => { const send = async () => {
const body = draft.trim(); const body = draft.trim();
if (!body) return; if (!body) return;
const msg: Message = {
id: `local-${Date.now()}`,
sender: user.handle,
body,
ts: Date.now(),
mine: true,
};
setExtra((e) => ({ ...e, [room.id]: [...(e[room.id] ?? []), msg] }));
setDraft(""); setDraft("");
setSendError(null);
try {
// Not optimistic: our own message comes back over SSE with its real id (mine:true),
// avoiding duplicates.
await api.send(room.id, body);
} catch (e) {
setDraft(body); // restore the draft if the send failed
setSendError(e instanceof Error ? e.message : "No se pudo enviar");
}
}; };
return ( return (
@@ -126,13 +136,18 @@ export function ChatPanel({
<ScrollArea style={{ flex: 1 }} viewportRef={viewport}> <ScrollArea style={{ flex: 1 }} viewportRef={viewport}>
<Stack gap="lg" p="md"> <Stack gap="lg" p="md">
{msgs.map((m) => ( {messages.map((m) => (
<MessageRow key={m.id} msg={m} /> <MessageRow key={m.id} msg={m} />
))} ))}
</Stack> </Stack>
</ScrollArea> </ScrollArea>
<Divider color="dark.4" /> <Divider color="dark.4" />
{sendError && (
<Text c="red" size="xs" px="sm" pt={4}>
{sendError}
</Text>
)}
<Group p="sm" gap="xs" wrap="nowrap"> <Group p="sm" gap="xs" wrap="nowrap">
<ActionIcon variant="subtle" color="gray" size="lg"> <ActionIcon variant="subtle" color="gray" size="lg">
<IconPaperclip size={18} /> <IconPaperclip size={18} />
@@ -143,14 +158,14 @@ export function ChatPanel({
placeholder={`Mensaje a ${room.name}`} placeholder={`Mensaje a ${room.name}`}
value={draft} value={draft}
onChange={(e) => setDraft(e.currentTarget.value)} onChange={(e) => setDraft(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && send()} onKeyDown={(e) => e.key === "Enter" && void send()}
/> />
<ActionIcon <ActionIcon
size="lg" size="lg"
radius="xl" radius="xl"
variant="filled" variant="filled"
color="brand" color="brand"
onClick={send} onClick={() => void send()}
disabled={!draft.trim()} disabled={!draft.trim()}
> >
<IconSend size={18} /> <IconSend size={18} />
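The dedup-by-id rule in the ChatPanel SSE effect can be isolated as a pure function. Below is a minimal sketch, assuming a hypothetical `appendUnique` helper and a trimmed `Msg` type with only `id` and `body`; the component applies the same rule inline in its `setMessages` updater.

```typescript
// Trimmed message shape for illustration; the real Message type has more fields.
interface Msg {
  id: string;
  body: string;
}

// Append m unless a message with the same id is already present.
// Returning the SAME array on a duplicate leaves the state reference unchanged.
function appendUnique(prev: Msg[], m: Msg): Msg[] {
  return prev.some((p) => p.id === m.id) ? prev : [...prev, m];
}

// The echo of our own send ("1") arrives twice; it must appear once.
const incoming: Msg[] = [
  { id: "1", body: "hello" },
  { id: "2", body: "world" },
  { id: "1", body: "hello" },
];
let state: Msg[] = [];
for (const m of incoming) {
  state = appendUnique(state, m);
}
```

The linear `some` scan is fine for a chat-sized list; a `Set` of seen ids would be the usual replacement if the history grew large.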
+56 -7
@@ -1,9 +1,9 @@
import { useState } from "react"; import { useCallback, useEffect, useState } from "react";
import { Flex, Box } from "@mantine/core"; import { Flex, Box, Center, Loader, Stack, Text, Button } from "@mantine/core";
import { Sidebar } from "./Sidebar"; import { Sidebar } from "./Sidebar";
import { ChatPanel } from "./ChatPanel"; import { ChatPanel } from "./ChatPanel";
import { MOCK_ROOMS } from "./mock"; import { api } from "./api";
import type { User } from "./types"; import type { Room, User } from "./types";
export function ChatShell({ export function ChatShell({
user, user,
@@ -12,10 +12,59 @@ export function ChatShell({
user: User; user: User;
onLogout: () => void; onLogout: () => void;
}) { }) {
const [rooms] = useState(MOCK_ROOMS); const [rooms, setRooms] = useState<Room[]>([]);
const [activeId, setActiveId] = useState<string>(rooms[0]?.id ?? ""); const [activeId, setActiveId] = useState<string>("");
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const load = useCallback(() => {
setLoading(true);
api
.listRooms()
.then((rs) => {
setRooms(rs);
setActiveId((cur) => cur || rs[0]?.id || "");
setError(null);
})
.catch((e) => setError(e?.message ?? "No se pudieron cargar las rooms"))
.finally(() => setLoading(false));
}, []);
useEffect(() => {
load();
}, [load]);
const active = rooms.find((r) => r.id === activeId); const active = rooms.find((r) => r.id === activeId);
// The right panel shows the loading/error/empty state without touching the layout.
let panel = <ChatPanel room={active} />;
if (loading && rooms.length === 0) {
panel = (
<Center h="100%">
<Loader color="brand" />
</Center>
);
} else if (error) {
panel = (
<Center h="100%">
<Stack align="center" gap="sm">
<Text c="red" size="sm">
{error}
</Text>
<Button variant="light" color="brand" onClick={load}>
Reintentar
</Button>
</Stack>
</Center>
);
} else if (rooms.length === 0) {
panel = (
<Center h="100%">
<Text c="dimmed">No perteneces a ninguna room todavía</Text>
</Center>
);
}
return ( return (
<Flex h="100vh" w="100vw" style={{ overflow: "hidden" }}> <Flex h="100vh" w="100vw" style={{ overflow: "hidden" }}>
<Box <Box
@@ -36,7 +85,7 @@ export function ChatShell({
/> />
</Box> </Box>
<Box flex={1} h="100%" bg="dark.7" style={{ minWidth: 0 }}> <Box flex={1} h="100%" bg="dark.7" style={{ minWidth: 0 }}>
<ChatPanel room={active} user={user} /> {panel}
</Box> </Box>
</Flex> </Flex>
); );
+30 -5
@@ -11,15 +11,29 @@ import {
   Title,
 } from "@mantine/core";
 import { IconShieldLock, IconKey } from "@tabler/icons-react";
+import { api, ApiError } from "./api";
 import type { User } from "./types";
 export function Login({ onLogin }: { onLogin: (u: User) => void }) {
   const [handle, setHandle] = useState("");
   const [password, setPassword] = useState("");
+  const [busy, setBusy] = useState(false);
+  const [error, setError] = useState<string | null>(null);
   const ready = handle.trim().length > 0 && password.length > 0;
-  const connect = () => {
-    const h = handle.trim();
-    if (ready) onLogin({ id: h, handle: h });
+  const connect = async () => {
+    if (!ready || busy) return;
+    setBusy(true);
+    setError(null);
+    try {
+      // The password unlocks the gateway session (operator passphrase).
+      // The handle is only the display name in this iteration (wallet = phase 2).
+      const me = await api.login(password);
+      const h = handle.trim() || me.endpoint.slice(0, 8);
+      onLogin({ id: me.endpoint, handle: h });
+    } catch (e) {
+      setError(e instanceof ApiError ? e.message : "No se pudo conectar al gateway");
+      setBusy(false);
+    }
   };
   return (
@@ -52,9 +66,20 @@ export function Login({ onLogin }: { onLogin: (u: User) => void }) {
             leftSection={<IconKey size={16} />}
             value={password}
             onChange={(e) => setPassword(e.currentTarget.value)}
-            onKeyDown={(e) => e.key === "Enter" && connect()}
+            onKeyDown={(e) => e.key === "Enter" && void connect()}
           />
-          <Button w="100%" size="md" onClick={connect} disabled={!ready}>
+          {error && (
+            <Text c="red" size="sm" ta="center">
+              {error}
+            </Text>
+          )}
+          <Button
+            w="100%"
+            size="md"
+            onClick={() => void connect()}
+            disabled={!ready}
+            loading={busy}
+          >
             Conectar
           </Button>
         </Stack>
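The display-name rule used in `connect` (trimmed handle, otherwise the first 8 characters of the endpoint id) can be isolated as a pure function. This is a minimal sketch; `displayHandle` is a hypothetical name that does not exist in the commit. Since `ready` already requires a non-empty handle, the fallback branch appears to be defensive only.

```typescript
// Hypothetical helper (not in the commit): same expression as in Login.tsx,
// extracted so the fallback can be exercised on its own.
function displayHandle(handle: string, endpoint: string): string {
  // An all-whitespace handle trims to "", which is falsy, so the endpoint
  // prefix is used instead.
  return handle.trim() || endpoint.slice(0, 8);
}
```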
+131
@@ -0,0 +1,131 @@
+// The only layer through which the SPA talks to the bus. Every call goes to the
+// Go gateway under /api; the gateway holds the `pkg/client` session (an
+// authenticated bus peer), encrypts/decrypts per room, and translates to
+// REST/SSE. The browser never signs, never speaks NATS, and never sees a private
+// key: it only stores an opaque (HttpOnly) session cookie that the gateway
+// issues after login.
+import type { MeInfo, Message, MsgWire, Room, RoomWire } from "./types";
+export class ApiError extends Error {
+  status: number;
+  constructor(message: string, status: number) {
+    super(message);
+    this.status = status;
+  }
+}
+async function req<T>(path: string, init?: RequestInit): Promise<T> {
+  const res = await fetch(path, {
+    // same-origin sends the session cookie automatically (also behind the
+    // vite proxy in dev).
+    credentials: "same-origin",
+    headers: { "Content-Type": "application/json" },
+    ...init,
+  });
+  const text = await res.text();
+  let body: unknown = null;
+  if (text) {
+    try {
+      body = JSON.parse(text);
+    } catch {
+      body = text;
+    }
+  }
+  if (!res.ok) {
+    const msg =
+      body && typeof body === "object" && "error" in body
+        ? String((body as { error: unknown }).error)
+        : `HTTP ${res.status}`;
+    throw new ApiError(msg, res.status);
+  }
+  return body as T;
+}
+// roomFromWire maps the gateway row to the Room type the UI consumes. Messages
+// do NOT live here: they arrive via streamRoom(). lastMessage/lastTs/unread are
+// filled with neutral values so no data is invented (the sidebar header will be
+// fed from the stream in a future iteration).
+export function roomFromWire(r: RoomWire): Room {
+  return {
+    id: r.id,
+    name: r.name || r.subject,
+    encrypted: r.encrypt,
+    lastMessage: "",
+    lastTs: 0,
+    unread: 0,
+    messages: [],
+  };
+}
+// messageFromWire maps a decrypted SSE frame to the UI's Message type.
+export function messageFromWire(m: MsgWire): Message {
+  return {
+    id: m.id,
+    sender: m.sender,
+    body: m.body,
+    ts: m.ts,
+    mine: m.mine,
+  };
+}
+export const api = {
+  // ---- session ------------------------------------------------------------
+  // login unlocks the gateway session with the operator passphrase. The
+  // gateway responds with a session cookie; me() checks whether one already exists.
+  login: (passphrase: string) =>
+    req<MeInfo>("/api/login", {
+      method: "POST",
+      body: JSON.stringify({ passphrase }),
+    }),
+  logout: () => req<{ status: string }>("/api/logout", { method: "POST" }),
+  me: () => req<MeInfo>("/api/me"),
+  // ---- rooms --------------------------------------------------------------
+  listRooms: async (): Promise<Room[]> => {
+    const wire = await req<RoomWire[]>("/api/rooms");
+    return wire.map(roomFromWire);
+  },
+  // createRoom: {subject, encrypted} is enough; the gateway derives the
+  // Matrix-like policy (encrypted + persisted + signed) by default.
+  createRoom: async (subject: string, encrypted = true): Promise<Room> => {
+    const r = await req<RoomWire>("/api/rooms", {
+      method: "POST",
+      body: JSON.stringify({ subject, encrypted }),
+    });
+    return roomFromWire(r);
+  },
+  join: (roomID: string) =>
+    req<{ status: string }>(
+      `/api/rooms/${encodeURIComponent(roomID)}/join`,
+      { method: "POST" },
+    ),
+  send: (roomID: string, body: string) =>
+    req<{ status: string }>(
+      `/api/rooms/${encodeURIComponent(roomID)}/send`,
+      { method: "POST", body: JSON.stringify({ body }) },
+    ),
+};
+// streamRoom opens a room's SSE stream and calls onMessage for each decrypted
+// frame (history first in persisted rooms, then live). It returns a close
+// function. EventSource sends the session cookie automatically and reconnects
+// on its own if the connection drops; onError is invoked on every drop so the
+// UI can reflect the state.
+export function streamRoom(
+  roomID: string,
+  onMessage: (m: Message) => void,
+  onError?: (e: Event) => void,
+): () => void {
+  const es = new EventSource(
+    `/api/rooms/${encodeURIComponent(roomID)}/stream`,
+  );
+  es.onmessage = (ev) => {
+    try {
+      const wire = JSON.parse(ev.data) as MsgWire;
+      onMessage(messageFromWire(wire));
+    } catch {
+      // Malformed frame: ignored, the stream continues.
+    }
+  };
+  if (onError) es.onerror = onError;
+  return () => es.close();
+}
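The commit message says ChatPanel deduplicates streamed messages by id. ChatPanel.tsx is not shown in this part of the diff, so the following is only a sketch of how that could look, under the assumption that a reconnecting stream may replay frames the UI already holds. `appendUnique` is a hypothetical name, and the `Message` interface is repeated here only to keep the block self-contained.

```typescript
// Same shape as the UI Message type in types.ts, copied for self-containment.
interface Message {
  id: string;
  sender: string;
  body: string;
  ts: number;
  mine?: boolean;
}

// Hypothetical helper (not in the commit): appends `incoming` unless a message
// with the same id is already present. Returning the same array reference for
// a duplicate lets a React state setter skip the re-render.
function appendUnique(messages: Message[], incoming: Message): Message[] {
  return messages.some((m) => m.id === incoming.id)
    ? messages
    : [...messages, incoming];
}

// Assumed usage inside a component effect (setMessages is illustrative):
//   const close = streamRoom(room.id, (m) =>
//     setMessages((prev) => appendUnique(prev, m)),
//   );
//   return close; // closes the EventSource on unmount or room change
```

The linear scan is fine for a sketch; a `Set` of seen ids would avoid rescanning long histories.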
+33 -3
@@ -1,5 +1,5 @@
-// UI domain types. In iteration 1 they are filled with mock data;
-// later they will come from the gateway (REST/SSE), which is a bus peer.
+// UI domain types. The data comes from the Go gateway (REST/SSE), which is
+// an authenticated bus peer. The browser never signs or speaks NATS.
 export interface User {
   id: string;
@@ -8,7 +8,7 @@ export interface User {
 export interface Message {
   id: string;
-  sender: string; // handle
+  sender: string; // sender's endpoint id (a readable handle is phase 2)
   body: string;
   ts: number; // epoch ms
   mine?: boolean;
@@ -23,3 +23,33 @@ export interface Room {
   unread: number;
   messages: Message[];
 }
+// ---- gateway API shapes (wire) --------------------------------------------
+// MeInfo is the identity of the operator that the gateway embodies (GET /api/me).
+export interface MeInfo {
+  endpoint: string;
+  sign_pub: string;
+}
+// RoomWire is the room row returned by the gateway (GET /api/rooms). It carries
+// no messages: those arrive over SSE (GET /api/rooms/{id}/stream).
+export interface RoomWire {
+  id: string;
+  subject: string;
+  name: string;
+  epoch: number;
+  encrypt: boolean;
+  persist: boolean;
+  sign_msgs: boolean;
+  role: string;
+}
+// MsgWire is an already-decrypted message that the gateway pushes over SSE.
+export interface MsgWire {
+  id: string;
+  sender: string;
+  body: string;
+  ts: number;
+  mine: boolean;
+}
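These wire interfaces exist only at compile time, and `streamRoom` casts the parsed frame with `as MsgWire`, so a well-formed JSON frame with the wrong shape would pass through unchecked. A runtime guard is one way to close that gap. The sketch below is an assumption about how it might be done, not something this commit contains; `isMsgWire` is a hypothetical name and the interface is repeated for self-containment.

```typescript
// Same shape as MsgWire in types.ts, copied for self-containment.
interface MsgWire {
  id: string;
  sender: string;
  body: string;
  ts: number;
  mine: boolean;
}

// Hypothetical type guard (not in the commit): checks every field of MsgWire
// at runtime so a malformed frame can be dropped before it reaches the UI.
function isMsgWire(v: unknown): v is MsgWire {
  if (typeof v !== "object" || v === null) return false;
  const o = v as Record<string, unknown>;
  return (
    typeof o.id === "string" &&
    typeof o.sender === "string" &&
    typeof o.body === "string" &&
    typeof o.ts === "number" &&
    typeof o.mine === "boolean"
  );
}
```

In `es.onmessage`, the parsed value could be passed through `isMsgWire` before `messageFromWire`, with frames that fail the check ignored just like frames that fail `JSON.parse`.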
+8 -1
@@ -3,5 +3,12 @@ import react from "@vitejs/plugin-react";
 export default defineConfig({
   plugins: [react()],
-  server: { host: true, port: 5181 },
+  // In dev, /api (REST + SSE) is proxied to the Go gateway (cmd/webgw, port 8481).
+  // The proxy streams responses, so the SSE at /api/rooms/{id}/stream works
+  // through it. In production the gateway serves the embedded dist and there is
+  // no proxy.
+  server: {
+    host: true,
+    port: 5181,
+    proxy: { "/api": "http://127.0.0.1:8481" },
+  },
 });