16 Commits

Author SHA1 Message Date
egutierrez 4994ea1483 feat(web): wallet join/recover/login (BIP39 seed identity)
Add the device-local wallet onboarding to the SPA. The user's identity
is derived deterministically from a 12-word BIP39 mnemonic and lives on
the device; the browser never signs, never talks NATS, and never sends
the seed to the server.

Wallet layer (web/src/wallet/):
- derive.ts: deterministic identity from a mnemonic. seed = BIP39 seed,
  then HKDF-SHA256 domain-separated into an Ed25519 signing key
  (info "unibus-sign-v1") and an X25519 key-exchange key (info
  "unibus-kex-v1"). The same mnemonic always yields the same sign_pub,
  which is what makes recovery possible without admin intervention. The
  four halves match cs.Identity on the Go side exactly.
- bip39.ts: thin wrappers over @scure/bip39 (generate, validate,
  normalize) so the checksum logic stays in the audited library.
- crypto.ts: at-rest encryption of the private key with WebCrypto only
  (PBKDF2-SHA256 210k iters -> AES-256-GCM). The password never leaves
  the device and only protects the local key copy.
- store.ts: IndexedDB persistence of the encrypted identity (private key
  encrypted; public halves + handle in the clear for display).
- account.ts: saveAndOpen / unlockAndOpen / localIdentity compose the
  primitives with the gateway session API.
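
The derive.ts derivation described above can be sketched in Go, the language of the gateway side. This is a minimal sketch rather than the SPA's code. It assumes an empty HKDF salt, one 32-byte output per key, and an input that is the 64-byte BIP39 seed already computed from the mnemonic. `hkdfSHA256`, `Identity` and `deriveIdentity` are illustrative names; only the two info strings come from the commit message.

```go
package main

import (
	"crypto/ecdh"
	"crypto/ed25519"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hkdfSHA256 is a minimal RFC 5869 HKDF (extract, then one expand block).
// It returns exactly 32 bytes, which is all this sketch needs.
func hkdfSHA256(secret, salt, info []byte) []byte {
	if salt == nil {
		salt = make([]byte, sha256.Size) // RFC 5869: an absent salt is HashLen zero bytes
	}
	ext := hmac.New(sha256.New, salt)
	ext.Write(secret)
	prk := ext.Sum(nil)

	exp := hmac.New(sha256.New, prk)
	exp.Write(info)
	exp.Write([]byte{0x01}) // T(1) = HMAC(PRK, info || 0x01)
	return exp.Sum(nil)
}

// Identity holds the four key halves named in the commit message.
type Identity struct {
	SignPub, SignPriv, KexPub, KexPriv []byte
}

// deriveIdentity maps a BIP39 seed to a deterministic identity.
// The salt choice (none) is an assumption of this sketch.
func deriveIdentity(bip39Seed []byte) (Identity, error) {
	signSeed := hkdfSHA256(bip39Seed, nil, []byte("unibus-sign-v1"))
	kexSeed := hkdfSHA256(bip39Seed, nil, []byte("unibus-kex-v1"))

	signPriv := ed25519.NewKeyFromSeed(signSeed) // 64 bytes: seed || public key
	kexPriv, err := ecdh.X25519().NewPrivateKey(kexSeed)
	if err != nil {
		return Identity{}, err
	}
	return Identity{
		SignPub:  signPriv.Public().(ed25519.PublicKey),
		SignPriv: signPriv,
		KexPub:   kexPriv.PublicKey().Bytes(),
		KexPriv:  kexPriv.Bytes(),
	}, nil
}

func main() {
	seed := make([]byte, 64) // placeholder all-zero seed, for illustration only
	a, _ := deriveIdentity(seed)
	b, _ := deriveIdentity(seed)
	// The same seed always yields the same sign_pub, which is what recovery relies on.
	fmt.Println(hex.EncodeToString(a.SignPub) == hex.EncodeToString(b.SignPub))
}
```

The distinct info strings keep the signing and key-exchange keys independent even though both come from one seed.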

Screens:
- Welcome: choose invite link or recover-with-seed on an empty device.
- Join: generate seed, show it once behind an acknowledge gate, confirm
  3 random words, set a local password, register the PUBLIC key with the
  bus via the invite token, then open the session.
- Recover: paste the 12 words, validate, show the reconstructed sign_pub,
  set a new local password, open the session. No register (the identity
  is already in the allowlist).
- WalletLogin: unlock the device's stored identity with the password.
- AuthShell: shared card/header for all pre-chat screens.
- App.tsx: route between join / welcome / login / recover / chat based on
  the invite link, a live gateway session, and any stored identity.

api.ts/types.ts: add register() and session() against the gateway
contract; vite dev server on :5183.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 21:21:50 +02:00
egutierrez 7d93d550d1 feat(webgw): per-user wallet sessions + invite register
Add the gateway backend for the wallet onboarding flow so each browser
session carries its OWN bus identity instead of sharing the single
operator client.

- POST /api/session (session.go): the browser hands its full wallet
  keypair (unlocked from the local encrypted key, over TLS) and the
  gateway spins up a dedicated bus client that acts AS that user. The
  private key lives only in process memory for the life of the session
  and is dropped on logout/shutdown. identityFromHex enforces the exact
  key sizes (sign_pub 32, sign_priv 64, kex_pub 32, kex_priv 32) that
  match cs.Identity on the Go side.
- POST /api/register (register.go): unauthenticated onboarding gated by
  a one-shot invite token. Validates the two PUBLIC key halves, then
  either consumes a configured --mock-tokens invite (local testing) or
  proxies to the bus POST /register (--register-url, bus >= 0.12.0). The
  handle/role come from the invite, never from the client.
- server.go: sessions move from a token->time map to a sessionStore of
  per-user *session records; auth() now resolves the session and passes
  its gateway to each handler. The legacy operator passphrase login
  (POST /api/login) is kept, bound to the shared operator gateway.
- main.go: build a busTemplate config that wallet sessions clone with
  their own Identity; wire --register-url / --mock-tokens.
- webgw_test.go: identity-size validation, hex-key validation, mock
  token parsing, and single-use register (201 then 409) using a fixed
  browser-derived wallet vector.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 21:21:33 +02:00
agent 5ea8fa1c20 feat(web): wire the SPA to the live bus via the gateway (drop mock)
Replace the mock data source with a real data layer that talks to the webgw
gateway over REST + SSE. The UI components keep their look and props; only
where the data comes from changed.

- src/api.ts: the single repository layer. fetch wrappers (same-origin cookie)
  for login/logout/me and rooms list/create/join/send, plus streamRoom() which
  opens an EventSource and yields each decrypted message. Wire->UI mappers
  (roomFromWire, messageFromWire).
- src/types.ts: add the gateway wire shapes (MeInfo, RoomWire, MsgWire) next to
  the existing UI types.
- App.tsx: probe /api/me on mount to resume an existing session; otherwise show
  Login. Logout calls the gateway.
- Login.tsx: the password field now unlocks the gateway session (operator
  passphrase); shows a basic error and a loading state. Wallet-per-browser is
  phase 2.
- ChatShell.tsx: load rooms from /api/rooms with loading / empty / error states;
  same Flex layout.
- ChatPanel.tsx: stream messages over SSE for the active room (dedup by id),
  composer sends through the gateway; no optimistic insert (the peer's own echo
  returns over SSE with the real frame id).
- vite.config.ts: dev proxy /api (REST + SSE) -> the gateway on :8481.

mock.ts is left untouched (no longer imported) to avoid churn with the parallel
styling work on master.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:14:19 +02:00
agent fb8a03cf0c feat(webgw): web gateway peer (REST + SSE) for the chat SPA
Add cmd/webgw: a single Go binary that holds the operator's bus identity,
connects to the bus as a real authenticated peer (pkg/client), and exposes a
small REST + SSE API the browser consumes. The browser never signs, never
speaks NATS, and never sees a private key.

Endpoints (all under /api, gated by a session cookie except login):
  POST /api/login            unlock a session with the operator passphrase
  POST /api/logout
  GET  /api/me               operator identity the gateway acts as
  GET  /api/rooms            ListMyRooms
  POST /api/rooms            CreateRoom (default policy: encrypted+persisted+signed)
  POST /api/rooms/{id}/join  Join (fetch room key)
  POST /api/rooms/{id}/send  Publish (sealed + signed by the peer)
  GET  /api/rooms/{id}/stream  SSE of decrypted frames (history then live)

Design notes:
- One fan-out hub per room: a single bus subscription is multiplexed to N SSE
  clients, avoiding the per-(room,endpoint) durable-consumer contention that
  multiple Subscribe calls would cause.
- Posture seam mirrors unibus_admin/clientcheck: empty --ca = plaintext dev,
  non-empty = TLS+nkey on both planes; RefreshSession after a membership change
  only under the secured (ACL) posture.
- Identity loaded from `pass` or a 0600 file, held only in memory.
- Session auth: passphrase compared in constant time; opaque HttpOnly cookie
  so EventSource (which cannot set headers) can authenticate the stream.
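
The one-hub-per-room idea from the design notes can be illustrated with a small fan-out type. This is a sketch under assumptions: the gateway's actual hub is not part of this diff, messages are plain strings here, and the drop-on-full-buffer policy is a choice made for the example, not something the commit states.

```go
package main

import (
	"fmt"
	"sync"
)

// hub multiplexes one upstream message source to many listeners,
// so a room needs a single bus subscription regardless of SSE client count.
type hub struct {
	mu   sync.Mutex
	subs map[chan string]struct{}
}

func newHub() *hub { return &hub{subs: map[chan string]struct{}{}} }

// add registers a listener and returns its channel plus an idempotent cancel func.
func (h *hub) add() (<-chan string, func()) {
	ch := make(chan string, 16)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	return ch, func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		if _, ok := h.subs[ch]; ok {
			delete(h.subs, ch)
			close(ch)
		}
	}
}

// broadcast delivers msg to every listener. A listener whose buffer is full
// misses the message instead of stalling the others (assumed policy).
func (h *hub) broadcast(msg string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.subs {
		select {
		case ch <- msg:
		default:
		}
	}
}

func main() {
	h := newHub()
	a, cancelA := h.add()
	b, cancelB := h.add()
	defer cancelA()
	defer cancelB()

	h.broadcast("hello") // stands in for one decrypted frame from the bus subscription
	fmt.Println(<-a, <-b)
}
```

Each SSE handler would call `add`, stream from the channel, and call the cancel func when the browser disconnects.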

TRUST MODEL: room content stays end-to-end encrypted on the bus. The gateway
reads plaintext only because it acts AS the operator's client — a legitimate
member of each room holding the room key. The per-browser wallet (WebCrypto)
that moves decryption into the browser is phase 2.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:14:08 +02:00
egutierrez b4f3118e85 Merge quick/users-http-admin: HTTP admin-only users API + client methods (report 0014) 2026-06-07 20:46:44 +02:00
egutierrez e9053169da Merge quick/0011-deploy-gaps: live user-add --store kv + clientcheck E2E + runbook fixes (report 0012) 2026-06-07 20:46:44 +02:00
Egutierrez b983e43090 docs(0007): spec encryption-at-rest for the control plane (JetStream/SQLite on disk) 2026-06-07 20:34:35 +02:00
egutierrez b379730225 docs(app): document users HTTP admin model, bump 0.10.0
Add a gotcha describing the unified-storage model (the server writes
users to the same store/KV as rooms), the admin-only HTTP surface, and
the CLI-seeds-admin-#0 bootstrap. Bump the version 0.9.0 -> 0.10.0 and
add the capability growth log entry for the new HTTP admin users API.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:32:05 +02:00
egutierrez 450ca01baf feat(membership,client): HTTP admin-only users API
Close the last control-plane asymmetry: rooms had a signed HTTP surface
but users were only manageable via the local CLI or direct store access.
Add admin-only HTTP endpoints, symmetric with rooms, executed against the
same privileged store the server already serves (SQLite single-node, the
replicated JetStream KV in cluster) — no new KV connection, no internal
identity, so the admin panel can manage the allowlist by signing as an
admin instead of needing --db / direct KV access.

Endpoints (all behind requireAdmin, on top of the existing
signature+nonce+TLS+enforce middleware):
  - GET  /users                    list the full allowlist (incl. revoked)
  - POST /users                    add {sign_pub, handle, role}
  - POST /users/{signpub}/revoke   revoke (status flip, no hard delete)

requireAdmin is default-deny with no dev relaxation: it allows a request
only when the authenticated signer is confirmed by the store as an active
admin; any other case (no signer, non-admin, revoked, store error) is 403,
fail-closed. The request context now also carries the signer's sign_pub
hex, because the endpoint id is a one-way hash of the key and cannot be
reversed to look the signer up in the allowlist.
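
A default-deny guard of the kind described above might be structured as follows. This is a hedged sketch, not the repository's requireAdmin: the `user` type, `lookupFunc`, the `signerOf` callback and the `X-Demo-Signer` header are all assumptions made so the example runs on its own. In the real server the signer comes from the verified request signature.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// user is a hypothetical allowlist row.
type user struct {
	Role    string
	Revoked bool
}

// lookupFunc stands in for the store query keyed by the signer's sign_pub hex.
type lookupFunc func(signPubHex string) (user, error)

// requireAdmin lets a request through only when the signer is confirmed as an
// active admin. No signer, unknown signer, store error, non-admin and revoked
// all end in 403 (fail-closed).
func requireAdmin(signerOf func(*http.Request) string, lookup lookupFunc, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		pub := signerOf(r)
		if pub == "" {
			http.Error(w, "forbidden", http.StatusForbidden)
			return
		}
		u, err := lookup(pub)
		if err != nil || u.Revoked || u.Role != "admin" {
			http.Error(w, "forbidden", http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// demoHandler wires the guard to an in-memory allowlist for the example.
func demoHandler() http.Handler {
	store := map[string]user{
		"aa": {Role: "admin"},
		"bb": {Role: "member"},
		"cc": {Role: "admin", Revoked: true},
	}
	lookup := func(pub string) (user, error) {
		u, ok := store[pub]
		if !ok {
			return user{}, errors.New("unknown signer")
		}
		return u, nil
	}
	// Demo only: a header stands in for the authenticated signer.
	signerOf := func(r *http.Request) string { return r.Header.Get("X-Demo-Signer") }
	ok := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) })
	return requireAdmin(signerOf, lookup, ok)
}

// status sends one GET /users as the given signer and returns the HTTP status.
func status(h http.Handler, signer string) int {
	req := httptest.NewRequest(http.MethodGet, "/users", nil)
	req.Header.Set("X-Demo-Signer", signer)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := demoHandler()
	for _, s := range []string{"aa", "bb", "cc", "zz", ""} {
		fmt.Printf("%q -> %d\n", s, status(h, s))
	}
}
```

Only the active admin "aa" gets 200; the member, the revoked admin, the unknown key and the missing signer all get 403.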

Validation/idempotency mirror the CLI: sign_pub must be 64-hex, role must
be admin|member (empty defaults to member), re-adding an existing key is a
409 that leaves the row untouched. The hex check is unified into
membership.ValidateSignPubHex, reused by the CLI and the handlers.

pkg/client gains ListUsers/AddUser/RevokeUser (flat UserInfo type) signed
via doJSON, so the panel plugs in directly.

Tests: non-admin -> 403 on all three endpoints; admin add->list->revoke
roundtrip; validation (400 hex, 400 role, 409 re-add, row untouched); plus
a client test against an embedded membershipd under enforce.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:31:57 +02:00
egutierrez e1a7402ff1 chore: bump unibus to 0.9.0 (live user-add + clientcheck)
New capability membershipd user add --store kv against a live cluster plus
cmd/clientcheck end-to-end verification (issue 0011 gaps, report 0012). Adds
the capability growth log entry.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez ce72131ddf docs(cluster): correct runbook + wire --internal-id-file into deploy
Corrections learned from the real 0011 deploy:
- Bring up: the "start magnus alone and verify healthz" order deadlocks — a
  lone node of a 3-node cluster has no meta-group quorum and never serves
  healthz until a second node joins. Document a quorum-forming start and that
  a node never self-serves.
- Replication: R1 is an unusable SPOF (all six control-plane buckets on one
  node) and the cold start only converges with the three cold-start fixes;
  go straight to R3 once the cluster forms.
- Add a "user add --store kv" section: the live user-add path that replaces
  stop-seed-restart, with its security model and idempotency/HA/no-delete
  semantics.
- Topology: real IPs, ROUTE_NETWORK=public (no WireGuard mesh exists).
- Chaos test: mark the data-plane client + failover proofs as validated (0012).

Deploy machinery now emits the persisted internal identity: the unit gains
--internal-id-file ${INTERNAL_ID_FILE} and deploy-cluster.sh writes
INTERNAL_ID_FILE into each node's cluster.env, so a fresh deploy enables the
live user-add path on every node.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez 3aa5a2c9a9 feat(clientcheck): end-to-end client verification (E2E room + failover)
The 0011 chaos test validated only the control plane (healthz + leader
failover + KV readable with 2/3); it never connected an authenticated bus
client to the data plane. cmd/clientcheck is a reusable verification tool: it
connects with a real identity (nkey + TLS on both planes, multi-node seed
lists), creates an ephemeral E2E room (encrypted + signed, no durable stream),
and either publishes N messages and asserts all come back decrypted (golden)
or publishes a counter for a duration while logging the attached node (loop),
so stopping a node mid-run shows the client fail over to a survivor and keep
receiving with quorum 2/3.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:56 +02:00
egutierrez 02c2004ebd feat(membershipd): user add/list/revoke --store kv against a live cluster
Closes the most valuable 0011 deploy gap: adding users to the running
cluster's replicated allowlist with no stop-seed-restart. Under enforce the
per-subject ACL confines every bus user to its own rooms, so no ordinary
identity may write the control-plane KV buckets; the only identity the
authenticator grants full JetStream permissions is membershipd's internal
service identity.

- main.go: --internal-id-file persists that identity (load-or-create, 0600)
  instead of a fresh ephemeral key, so the same nkey is available out of
  process. Empty keeps the ephemeral default (single-node/dev unchanged).
- users_kv.go: connectKVStore loads the persisted identity, presents its
  nkey (recognized as internal -> full perms), opens the KV store and
  writes. Defaults assume an on-node loopback invocation; a remote target
  without --ca is refused (allowlist must not travel cleartext, audit N6).
  Prints KV_UNIBUS_users replication (followers_current) after a write.
- users_cli.go: --store kv on add/list/revoke. Re-adding a key is an explicit
  ErrUserExists (no silent overwrite / role flip); revoke is a status flip.
- pkg/client: LoadIdentity (load-only) extracted from LoadOrCreateIdentity,
  preserving its "corrupt file is an error, not silently regenerated" guard.
- kv_useradd_test.go: golden write under enforce, idempotency, unreachable
  endpoint, and remote-without-CA refusal against an embedded node.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:41:38 +02:00
egutierrez ff580ac031 Merge quick/cluster-coldstart-fixes: 3-node cluster cold-start fixes + real topology 2026-06-07 18:56:28 +02:00
egutierrez 9fbff79df4 chore(deploy): fill cluster nodes.env with the real 3-node topology
Set magnus's public IP (135.125.201.30) and switch ROUTE_NETWORK to "public":
the three nodes have no WireGuard mesh (homer/datardos do not even have wg
installed), so server-to-server routes go over the public IPs, still protected
by the separate cluster route CA (mutual TLS). KV_REPLICAS is raised to 3 now
that the cluster runs at R3.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 18:56:28 +02:00
egutierrez 33746d9962 fix(cluster): make the JetStream control-plane survive a cold multi-node start
Bringing up the 3-node cluster from clean stores never converged: every node
looped on `open KV bucket "UNIBUS_rooms" (replicas=1): context deadline exceeded`.
Three independent defects in the clustered bootstrap path, none of which surface
on a single node (where JetStream is ready instantly), caused it:

1. embeddednats: route connection pooling (nats-server 2.10 default pool of 3)
   churned with "duplicate route"/"client closed" reconnects on the small cluster,
   interrupting the meta-group RAFT heartbeats and forcing perpetual leader
   re-elections. Set Cluster.PoolSize = -1 (single route per peer).

2. embeddednats: the cluster nodes are Docker hosts, so NATS advertised the docker
   bridge IPs (172.x / 10.0.x) to peers, which then tried to dial those private,
   mutually-unreachable addresses. Set Cluster.NoAdvertise = true so only the
   explicit public-IP routes are used. Also added a UNIBUS_NATS_DEBUG env toggle
   (off by default) that enables the embedded server's logger and loopback
   monitoring port for debugging the route/meta layer.

3. membership.OpenJetStream: a KV op is a NATS request/reply; on a cold cluster the
   op was published once, before the node had contact with the meta leader, so the
   request was dropped and the single long-context call just blocked until timeout.
   Retry each bucket op with short per-attempt contexts until it succeeds or an
   overall bootstrap budget (120s) is exhausted, so it lands once the meta settles.

With these the cluster forms cleanly, creates the KV buckets, scales R1->R3 in
place, and survives loss of one node (quorum 2/3). Verified on magnus+homer+datardos.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 18:56:28 +02:00
47 changed files with 4688 additions and 150 deletions
+4
@@ -12,5 +12,9 @@ worker.id
/membershipd
/worker
/chat
/webgw
*.exe
registry.db
# Local session infra (machine-specific absolute paths; never distributed).
.mcp.json
+62 -1
@@ -2,7 +2,7 @@
name: unibus
lang: go
domain: infra
version: 0.8.0
version: 0.10.0
description: "Unified messaging bus on NATS+JetStream with per-room E2E encryption (reduced megolm/olm): membership/key service, client library and demo peers."
tags: [service, messaging, nats, e2e]
uses_functions:
@@ -122,6 +122,21 @@ To point at an external NATS in production: `--nats-url nats://host:4222` in
the read-only GET routes. It trusts the internal network. The mutating routes
(`/rooms`, `/invite`, `/rekey`) do require the owner's Ed25519 signature over the
canonical bytes of the request. Hardening is a later phase.
- **User management: unified storage, two ways to add users.** The user allowlist
lives in the SAME store as the rooms (`pkg/membership.Store`): SQLite in
single-node, replicated JetStream KV (`UNIBUS_users`) in cluster. The `Server` already
has that privileged store open (it is what serves the KV on each node), so it
exposes `GET/POST /users` and `POST /users/{signpub}/revoke` as an admin-only HTTP API,
symmetric with the room routes: the admin panel signs as an admin and the
server runs the mutation against the same store. The panel does NOT need `--db`, nor the
internal identity, nor to run on a cluster node; it works identically in single-node
and cluster. Authorization is default-deny: only a signer that the store confirms as an
active `role == "admin"` passes; anyone else gets 403 (on top of the existing
signature+nonce+TLS). The CLI `membershipd user add --store kv` still exists ONLY to
seed admin #0 (the chicken-and-egg bootstrap: without a seeded admin nobody can
sign the first `POST /users`); from then on all management is admin-only HTTP.
Adding is idempotent just like the CLI: re-adding an already registered key = 409, with no
overwrite and no role elevation; revoke is a status flip (no hard delete), auditable.
- **Identity = critical secret.** The identity file (`worker.id`,
`chat.id`) contains the private keys (Ed25519 + X25519). It is written 0600.
Losing it = unreadable messages, no recovery. Treat it like an SSH key.
@@ -154,6 +169,52 @@ agent.<name>.{in,out} LLM agent inbox/outbox (agent.scout.in)
## Capability growth log
- v0.10.0 (2026-06-07): admin-only HTTP API for user management, closing the
last control-plane asymmetry: rooms had a signed HTTP surface
(`POST /rooms`, etc.) but users were only manageable via the local CLI or direct
store access. Adds `GET /users` (full list, including revoked),
`POST /users` (add `{sign_pub, handle, role}`: validates 64-char hex + role in
`{admin, member}`, idempotent 409 that neither overwrites nor elevates the role) and
`POST /users/{signpub}/revoke` (status flip, no hard delete). All three go through
a default-deny `requireAdmin` helper that confirms against the store that the
authenticated signer is an active `role == "admin"` user (the endpoint id is a one-way hash of
the key, so the context now also carries the signer's `sign_pub` hex to
resolve `GetUser`); any other signer gets 403, on top of the signature+nonce+TLS+
enforce already inherited from the middleware. NO new KV connection is opened and the
internal identity is not used: the server writes through its privileged `s.store`, the SAME one as the rooms (SQLite
single-node, KV `UNIBUS_users` in cluster). `pkg/client` gains `ListUsers/AddUser/RevokeUser`
(flat `UserInfo` type) signing as admin, so the panel's Users tab no longer
needs `--db`/direct KV access. The CLI `membershipd user add --store kv` remains ONLY
for seeding admin #0 (bootstrap). `sign_pub` validation is unified into
`membership.ValidateSignPubHex`, reused by the CLI and the handlers. New tests:
non-admin → 403 on all three endpoints, admin add→list→revoke roundtrip, and validation
(invalid hex → 400, invalid role → 400, re-add → 409), plus a client test against
an embedded membershipd. Changes are 100% additive: single-node behavior and the
room routes are unchanged; vet/build/test green.
- v0.9.0 (2026-06-07): closes the gaps that the cluster deployment (report
0011) left open (report 0012). (GAP A) New capability `membershipd user
add|list|revoke --store kv`: add/revoke users against the replicated KV of the
RUNNING cluster, without the stop-seed-restart procedure. It uses the
privileged internal connection: the daemon persists its service identity with
`--internal-id-file` (each node generates/loads its own, 0600 next to the TLS keys)
and the CLI, run over loopback on a node, presents that nkey, which the
authenticator recognizes with full JetStream permissions; no ordinary user
identity can touch the `KV_UNIBUS_*` buckets under the per-subject ACL.
Adding is idempotent (re-adding the same key = explicit `ErrUserExists`, no
overwrite and no role elevation), commits with quorum 2/3 (HA, prints
`followers_current`) and refuses a remote target without `--ca` (same as
`migrate-to-kv`). (GAP B) New `cmd/clientcheck`: real end-to-end verification
with an authenticated client (operator identity, nkey+TLS+https) that creates an E2E
room, publishes and receives decrypted against the live cluster, including a node stopped
mid-transmission where the client fails over to a survivor and keeps
receiving with zero loss (quorum 2/3): the data plane that the 0011 chaos test
never exercised. (GAP C) Runbook `deploy/cluster/README.md` corrected: the
"magnus alone and verify healthz" start order deadlocked (a lone node has no
meta-group quorum and never serves healthz); it now documents the quorum-forming start,
that R1 is an unusable SPOF (go straight to R3) and the new way to add users with the
cluster live. The deploy template (unit + `deploy-cluster.sh`) now emits
`INTERNAL_ID_FILE` and the flag. Verified against the 3 real VPSs (magnus + homer +
datardos); enforce+ACL+TLS+R3 posture intact.
- v0.8.0 (2026-06-07): complete and harden the cluster (issue 0006, phases
0006a-0006g), closing the blockers from the dedicated cluster audit
(report 0008) and wiring up the decentralized control plane that 0003 left half-done.
+260
@@ -0,0 +1,260 @@
// Command clientcheck is an end-to-end verification client for a live unibus
// cluster (issue 0011 GAP B). The 0011 chaos test validated only the control
// plane (healthz + meta/stream-leader failover + KV readable with 2/3); it never
// connected an authenticated bus client (nkey + TLS) to create a room and
// publish/subscribe through it, least of all across a node loss. clientcheck does
// exactly that with a real identity (the operator), so the data-plane end-to-end
// path (connect, create an E2E room, publish, receive decrypted) is exercised
// against the running cluster, including while a node is stopped.
//
// It is a reusable tool, not a throwaway script: point it at the cluster's CA,
// an identity file, and the NATS + control-plane seed lists.
//
//	# golden: connect, create an E2E room, publish N, confirm N decrypted back
//	clientcheck --ca ca.crt --identity-file operator.id \
//	  --nats-seeds nats://A:4250,nats://B:4250,nats://C:4250 \
//	  --ctrl-seeds https://A:8470,https://B:8470,https://C:8470 --messages 5
//
//	# loop: publish a counter every interval for the duration, logging the node
//	# it is attached to; stop a node mid-run (systemctl stop membershipd-cluster)
//	# and watch it fail over to a survivor and keep receiving (quorum 2/3).
//	clientcheck ... --mode loop --duration 45s --interval 1s
package main

import (
	"crypto/rand"
	"encoding/hex"
	"flag"
	"fmt"
	"log"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/enmanuel/unibus/pkg/busauth"
	"github.com/enmanuel/unibus/pkg/client"
	"github.com/enmanuel/unibus/pkg/frame"
	"github.com/enmanuel/unibus/pkg/room"
)

func main() {
	var (
		caPath    = flag.String("ca", "", "bus CA cert pinning TLS on both planes (required for a secured cluster)")
		idFile    = flag.String("identity-file", "", "path to the client identity JSON (e.g. `pass show unibus/operator-identity` written 0600) (required)")
		natsSeeds = flag.String("nats-seeds", "", "comma-separated NATS urls of the cluster nodes (required)")
		ctrlSeeds = flag.String("ctrl-seeds", "", "comma-separated control-plane https urls of the cluster nodes (required)")
		subject   = flag.String("subject", "test.gapcheck", "test room subject PREFIX; a random token is appended so runs never collide with real rooms")
		messages  = flag.Int("messages", 5, "golden mode: number of messages to publish and expect back")
		mode      = flag.String("mode", "golden", "golden (publish N, verify N decrypted) | loop (publish a counter for --duration, for failover testing)")
		duration  = flag.Duration("duration", 30*time.Second, "loop mode: how long to keep publishing")
		interval  = flag.Duration("interval", 1*time.Second, "loop mode: delay between published messages")
	)
	flag.Parse()
	if *idFile == "" || *natsSeeds == "" || *ctrlSeeds == "" {
		log.Fatalf("clientcheck: --identity-file, --nats-seeds and --ctrl-seeds are required")
	}
	id, err := client.LoadIdentity(*idFile)
	if err != nil {
		log.Fatalf("clientcheck: load identity: %v", err)
	}
	natsList := splitCSV(*natsSeeds)
	ctrlList := splitCSV(*ctrlSeeds)
	if len(natsList) == 0 || len(ctrlList) == 0 {
		log.Fatalf("clientcheck: empty --nats-seeds or --ctrl-seeds")
	}
	// Build the secure client options: nkey on the data plane, TLS pinned to the
	// bus CA on both planes, and the FULL seed lists so nats.go fails over to a
	// surviving node when the attached one dies (the failover this tool verifies).
	opts := client.Options{
		NatsServers: natsList,
		CtrlURLs:    ctrlList,
	}
	if *caPath != "" {
		tlsCfg, err := busauth.LoadCATLSConfig(*caPath)
		if err != nil {
			log.Fatalf("clientcheck: load CA: %v", err)
		}
		opts.UseNkey = true
		opts.TLS = tlsCfg
		opts.CtrlTLS = tlsCfg
		for _, u := range ctrlList {
			if !strings.HasPrefix(u, "https://") {
				log.Fatalf("clientcheck: control URL %q must be https:// when --ca is set", u)
			}
		}
	}
	c, err := client.NewWithOptions(natsList[0], ctrlList[0], id, opts)
	if err != nil {
		log.Fatalf("clientcheck: connect: %v", err)
	}
	defer c.Close()
	log.Printf("connected: endpoint=%s nats=%s", c.Endpoint().ID, c.ConnectedServer())

	// Create an EPHEMERAL E2E room (encrypted + signed, NOT persisted): the test
	// stays end-to-end encrypted (the cluster requires encryption on a public
	// bind) while leaving no durable JetStream stream behind. The random subject
	// token guarantees the room is unique and never a real room.
	rnd := make([]byte, 8)
	if _, err := rand.Read(rnd); err != nil {
		log.Fatalf("clientcheck: random: %v", err)
	}
	subj := fmt.Sprintf("%s.%s", *subject, hex.EncodeToString(rnd))
	policy := room.Policy{Encrypt: true, Persist: false, SignMsgs: true}
	roomID, err := c.CreateRoom(subj, policy)
	if err != nil {
		log.Fatalf("clientcheck: create room: %v", err)
	}
	log.Printf("created E2E room: id=%s subject=%s (encrypt=%v sign=%v persist=%v)", roomID, subj, policy.Encrypt, policy.SignMsgs, policy.Persist)

	// Under the per-subject ACL, NATS freezes permissions at connect time, so the
	// just-created room's subject is not yet publishable/subscribable on the live
	// connection. RefreshSession reconnects so the authenticator re-derives the
	// ACL (now including this room), the post-0006 contract every client follows
	// after a membership change.
	if err := c.RefreshSession(); err != nil {
		log.Fatalf("clientcheck: refresh session: %v", err)
	}

	switch *mode {
	case "golden":
		runGolden(c, roomID, *messages)
	case "loop":
		runLoop(c, roomID, *duration, *interval)
	default:
		log.Fatalf("clientcheck: --mode must be golden or loop, got %q", *mode)
	}
}
// runGolden subscribes, publishes n messages, and asserts all n come back
// decrypted. Exits non-zero if any are missing.
func runGolden(c *client.Client, roomID string, n int) {
var mu sync.Mutex
got := map[string]bool{}
sub, err := c.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
mu.Lock()
got[string(plaintext)] = true
mu.Unlock()
})
if err != nil {
log.Fatalf("clientcheck: subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(300 * time.Millisecond) // let the subscription settle
want := make([]string, n)
for i := 0; i < n; i++ {
msg := fmt.Sprintf("gapcheck-e2e-%d", i)
want[i] = msg
if err := c.Publish(roomID, []byte(msg)); err != nil {
log.Fatalf("clientcheck: publish %d: %v", i, err)
}
}
log.Printf("published %d messages to %s; waiting for decrypted echoes...", n, roomID)
deadline := time.Now().Add(15 * time.Second)
for time.Now().Before(deadline) {
mu.Lock()
have := len(got)
mu.Unlock()
if have >= n {
break
}
time.Sleep(100 * time.Millisecond)
}
mu.Lock()
defer mu.Unlock()
missing := 0
for _, w := range want {
if !got[w] {
missing++
log.Printf(" MISSING: %q", w)
}
}
log.Printf("connected node at finish: %s", c.ConnectedServer())
if missing > 0 {
log.Fatalf("GOLDEN FAIL: %d/%d messages not received decrypted", missing, n)
}
log.Printf("GOLDEN OK: all %d messages received and decrypted end-to-end", n)
}
// runLoop publishes a numbered message every interval for the duration and logs
// the count received plus the node currently attached, so an operator stopping a
// cluster node mid-run sees the client fail over to a survivor and keep receiving
// (quorum 2/3). It is the live failover-with-a-connected-client test the 0011
// chaos run never performed.
func runLoop(c *client.Client, roomID string, duration, interval time.Duration) {
var mu sync.Mutex
received := 0
servers := map[string]int{} // node -> #ticks observed attached
sub, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) {
mu.Lock()
received++
mu.Unlock()
})
if err != nil {
log.Fatalf("clientcheck: subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(300 * time.Millisecond)
log.Printf("loop: publishing every %s for %s — stop a node now to test failover", interval, duration)
end := time.Now().Add(duration)
sent := 0
for time.Now().Before(end) {
msg := fmt.Sprintf("gapcheck-loop-%d", sent)
err := c.Publish(roomID, []byte(msg))
sent++
mu.Lock()
recv := received
mu.Unlock()
node := c.ConnectedServer()
up := c.IsConnected()
if node != "" {
mu.Lock()
servers[node]++
mu.Unlock()
}
pubStatus := "ok"
if err != nil {
pubStatus = "ERR:" + err.Error()
}
log.Printf(" t=%2ds sent=%d recv=%d up=%v node=%s publish=%s",
sent, sent, recv, up, node, pubStatus)
time.Sleep(interval)
}
mu.Lock()
defer mu.Unlock()
log.Printf("loop done: sent=%d received=%d", sent, received)
nodes := make([]string, 0, len(servers))
for n := range servers {
nodes = append(nodes, n)
}
sort.Strings(nodes)
for _, n := range nodes {
log.Printf(" attached to %s for %d ticks", n, servers[n])
}
if len(servers) > 1 {
log.Printf("FAILOVER OBSERVED: client was attached to %d distinct nodes across the run", len(servers))
}
if received == 0 {
log.Fatalf("LOOP FAIL: received 0 messages")
}
log.Printf("LOOP OK: client kept receiving across the run (received=%d)", received)
}
func splitCSV(s string) []string {
var out []string
for _, p := range strings.Split(s, ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
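The end-of-run report in runLoop (sorted per-node tick counts plus the failover verdict) can be isolated into a small pure function, which makes it easy to check without a live cluster. The sketch below is illustrative only: summarizeNodes and the node names are hypothetical and are not part of the commit.

```go
package main

import (
	"fmt"
	"sort"
)

// summarizeNodes is a hypothetical helper (not in the commit). It turns the
// node -> tick-count tally into sorted report lines and reports whether the
// client was attached to more than one node during the run.
func summarizeNodes(servers map[string]int) (lines []string, failover bool) {
	nodes := make([]string, 0, len(servers))
	for n := range servers {
		nodes = append(nodes, n)
	}
	sort.Strings(nodes) // map iteration order is random, so sort for a stable report
	for _, n := range nodes {
		lines = append(lines, fmt.Sprintf("attached to %s for %d ticks", n, servers[n]))
	}
	return lines, len(servers) > 1
}

func main() {
	// Example tally: the client spent 6 ticks on node-a and 4 on node-b.
	lines, failover := summarizeNodes(map[string]int{"node-b": 4, "node-a": 6})
	for _, l := range lines {
		fmt.Println(l)
	}
	fmt.Println("failover observed:", failover)
}
```

Sorting the keys first keeps the log output deterministic between runs, which matters when an operator compares two chaos runs side by side.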
+152
@@ -0,0 +1,152 @@
package main
// Integration tests for issue 0011 GAP A: `membershipd user add --store kv`
// adds users to a RUNNING cluster's replicated allowlist via the privileged
// internal connection, instead of the stop-seed-restart procedure the 0011
// deploy required. These exercise the real connectKVStore path (load the
// persisted internal identity from a file, present its nkey, open the KV store,
// write the user) against an embedded enforce node, plus the idempotency and
// error semantics the DoD calls for. Multi-node replication and node-down quorum
// are validated against the live cluster (report 0012).
import (
"encoding/hex"
"errors"
"path/filepath"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
)
// startEnforceKVNode boots a single embedded enforce node whose authenticator
// recognizes internalID's signing public key as the privileged internal identity, bootstraps the
// KV control-plane store over the in-process internal connection, and publishes
// it into the holder — the exact sequence main.go performs for --store kv. It
// returns the client URL the CLI connects to.
func startEnforceKVNode(t *testing.T, internalID cs.Identity) string {
t.Helper()
holder := &storeHolder{}
auth := busauth.NewNkeyAuthenticatorACLInternal(
holder.IsAuthorized,
busauth.PermissionsFromSubjects(holder.subjectACL),
hex.EncodeToString(internalID.SignPub),
)
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth,
})
if err != nil {
t.Fatalf("start enforce node: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
intNC, js, err := connectInternalJS(ns, internalID, true)
if err != nil {
t.Fatalf("bootstrap internal connection: %v", err)
}
t.Cleanup(intNC.Close)
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second})
if err != nil {
t.Fatalf("bootstrap KV store: %v", err)
}
holder.set(kvStore)
return ns.ClientURL()
}
// TestUserAddStoreKV_GoldenAndIdempotent is the GAP A golden + edge-1: the CLI
// connection (real connectKVStore, loading the internal identity from a file and
// presenting its nkey) writes a user into the live KV allowlist, the user is
// authorized afterward, and re-adding the same key is an explicit ErrUserExists
// with no corruption (the unchanged row is still authorized).
func TestUserAddStoreKV_GoldenAndIdempotent(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
internalID, err := client.LoadOrCreateIdentity(idFile) // persists 0600
if err != nil {
t.Fatalf("persist internal identity: %v", err)
}
url := startEnforceKVNode(t, internalID)
// Golden: connect as the privileged internal identity (loopback, no TLS) and
// add a new user, exactly as `user add --store kv` does.
kv, err := connectKVStore(url, idFile, "", 1)
if err != nil {
t.Fatalf("connectKVStore (privileged): %v", err)
}
defer kv.Close()
newUser, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("new user identity: %v", err)
}
pub := hex.EncodeToString(newUser.SignPub)
if err := kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember); err != nil {
t.Fatalf("add user to live KV: %v", err)
}
if !kv.store.IsAuthorized(pub) {
t.Fatalf("user added to KV must be authorized")
}
// Edge 1: re-adding the same key is a clean, non-destructive ErrUserExists.
err = kv.store.AddUser(pub, "gapcheck_user", membership.RoleMember)
if !errors.Is(err, membership.ErrUserExists) {
t.Fatalf("re-add must return ErrUserExists (idempotent), got %v", err)
}
// A different handle/role with the SAME key is also rejected — the row is not
// silently overwritten (no role flip).
if err := kv.store.AddUser(pub, "impostor", membership.RoleAdmin); !errors.Is(err, membership.ErrUserExists) {
t.Fatalf("re-add with a different role must NOT overwrite; want ErrUserExists, got %v", err)
}
u, err := kv.store.GetUser(pub)
if err != nil {
t.Fatalf("get user: %v", err)
}
if u.Handle != "gapcheck_user" || u.Role != membership.RoleMember || u.Status != membership.StatusActive {
t.Fatalf("idempotent re-add corrupted the row: %+v", u)
}
}
// TestUserAddStoreKV_RequiresInternalIdentity: --store kv without a usable
// internal identity file fails loudly (missing file, empty path) rather than
// silently connecting unprivileged.
func TestUserAddStoreKV_RequiresInternalIdentity(t *testing.T) {
if _, err := connectKVStore("nats://127.0.0.1:4250", "", "", 1); err == nil {
t.Fatalf("empty --internal-id-file must be an error")
}
missing := filepath.Join(t.TempDir(), "nope.id")
if _, err := connectKVStore("nats://127.0.0.1:4250", missing, "", 1); err == nil {
t.Fatalf("missing internal identity file must be an error")
}
}
// TestUserAddStoreKV_UnreachableKV is the GAP A error case: pointing --store kv
// at a dead endpoint yields a clear, handled error (no crash, no silent success).
func TestUserAddStoreKV_UnreachableKV(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
t.Fatalf("persist internal identity: %v", err)
}
// A loopback port with nothing listening: connect must fail fast and return a wrapped error.
_, err := connectKVStore("nats://127.0.0.1:1/", idFile, "", 1)
if err == nil {
t.Fatalf("connecting to a dead endpoint must error")
}
}
// TestUserAddStoreKV_RemoteWithoutCARefused: a non-loopback target without --ca
// is refused so the allowlist write never travels in cleartext (audit 0008 N6,
// same guard as migrate-to-kv).
func TestUserAddStoreKV_RemoteWithoutCARefused(t *testing.T) {
idFile := filepath.Join(t.TempDir(), "internal.id")
if _, err := client.LoadOrCreateIdentity(idFile); err != nil {
t.Fatalf("persist internal identity: %v", err)
}
_, err := connectKVStore("nats://203.0.113.1:4250", idFile, "", 1)
if err == nil {
t.Fatalf("remote target without --ca must be refused")
}
}
+24
@@ -24,6 +24,7 @@ import (
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
)
@@ -83,6 +84,17 @@ func main() {
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
// in the cluster serves the same state.
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
// Persisted internal service identity (issue 0011 gaps, GAP A): when set, the
// privileged internal identity used to manage JetStream is LOADED from this
// file (generated and persisted on first start) instead of being a fresh
// ephemeral key each boot. Persisting it is what lets `membershipd user add
// --store kv` write the replicated allowlist of a LIVE cluster: that CLI,
// run over loopback on a node, loads the SAME identity and presents the nkey
// this node's authenticator already grants full permissions. Empty keeps the
// ephemeral-per-process behavior (single-node/dev default, unchanged). The
// file holds a private key: it is written 0600 and belongs next to the node's
// TLS keys (deploy keeps it under secrets/, gitignored).
internalIDFile = flag.String("internal-id-file", "", "path to a persisted internal service identity (JSON); enables `membershipd user add --store kv` against the live cluster. Empty = ephemeral per-process identity (dev default)")
)
flag.Parse()
@@ -136,10 +148,22 @@ func main() {
var internalID cs.Identity
var internalPubHex string
if needJS && enforce && *natsURL == "" {
if *internalIDFile != "" {
// Persisted identity: load it, generating + writing it (0600) on first
// start. A stable internal key is what `user add --store kv` presents to
// add users to a live cluster (GAP A); rotate it by deleting the file and
// restarting.
internalID, err = client.LoadOrCreateIdentity(*internalIDFile)
if err != nil {
log.Fatalf("load internal service identity %q: %v", *internalIDFile, err)
}
log.Printf("internal service identity: persisted (%s)", *internalIDFile)
} else {
internalID, err = cs.GenerateIdentity()
if err != nil {
log.Fatalf("generate internal identity: %v", err)
}
}
internalPubHex = hex.EncodeToString(internalID.SignPub)
}
+82 -15
@@ -1,7 +1,7 @@
package main
import (
-"encoding/hex"
"errors"
"flag"
"fmt"
"os"
@@ -50,13 +50,26 @@ commands:
list List all registered users
revoke Revoke a user (denies access on both planes immediately)
store backends (--store):
sqlite local SQLite database (default; seeds the first admin offline)
kv the RUNNING cluster's replicated JetStream KV allowlist, via the
privileged internal connection — add users with the cluster live,
no stop-seed-restart needed (run over loopback/SSH on a node)
examples:
membershipd user add --handle alice --sign-pub <64-hex> --role admin
membershipd user list
membershipd user add --store kv --handle bob --sign-pub <64-hex> --role member
membershipd user list --store kv
membershipd user revoke <64-hex>
common flags:
-  --db <path> SQLite database path (default ./local_files/unibus.db)
--db <path> SQLite database path (--store sqlite; default ./local_files/unibus.db)
--store kv flags (defaults assume an on-node invocation):
--nats-url <url> cluster NATS (default nats://127.0.0.1:4250)
--internal-id-file <path> persisted internal service identity (default /opt/unibus/secrets/internal.id)
--ca <path> CA cert pinning the data-plane TLS (default /opt/unibus/tls/ca.crt)
--kv-replicas <n> KV replication factor, match the cluster (default 3)
`)
}
@@ -76,16 +89,56 @@ func openStore(path string) membership.Store {
// validateSignPubHex ensures the key is exactly a 32-byte Ed25519 public key in
// hex (64 hex chars). Catching this here turns a silent "authorized nobody" into
-// an explicit error at seed time.
// an explicit error at seed time. It delegates to membership.ValidateSignPubHex
// so the CLI and the HTTP user-management handlers share one rule.
func validateSignPubHex(signPub string) error {
-b, err := hex.DecodeString(signPub)
return membership.ValidateSignPubHex(signPub)
}
// kvFlags holds the connection flags shared by the --store kv path of the user
// subcommands. registerKVFlags wires them onto a flag set so add and list expose
// an identical interface.
type kvFlags struct {
store *string
natsURL *string
internalID *string
ca *string
replicas *int
}
func registerKVFlags(fs *flag.FlagSet) kvFlags {
return kvFlags{
store: fs.String("store", "sqlite", "user store backend: sqlite (local DB) | kv (the live cluster's replicated allowlist)"),
natsURL: fs.String("nats-url", defaultClusterNatsURL, "cluster NATS url for --store kv"),
internalID: fs.String("internal-id-file", defaultInternalIDFile, "persisted internal service identity for --store kv"),
ca: fs.String("ca", defaultClusterCAFile, "CA cert pinning TLS on the --store kv NATS connection"),
replicas: fs.Int("kv-replicas", 3, "KV replication factor for --store kv (match the cluster)"),
}
}
// resolveStore returns the membership store for the chosen backend plus a cleanup
// func. For --store kv it opens the privileged connection to the live cluster; for
// sqlite it opens the local file. It exits the process with a clear message on any
// failure (a dead NATS, a missing identity file), so a broken --store kv add fails
// loudly instead of silently — Error case of the GAP A DoD. The returned *kvConn
// is non-nil only for the kv backend (so the caller can report replication).
func resolveStore(cmd string, kf kvFlags, dbPath string) (membership.Store, *kvConn, func()) {
switch *kf.store {
case "sqlite":
store := openStore(dbPath)
return store, nil, func() { store.Close() }
case "kv":
kv, err := connectKVStore(*kf.natsURL, *kf.internalID, *kf.ca, *kf.replicas)
if err != nil {
-return fmt.Errorf("sign-pub is not valid hex: %w", err)
fmt.Fprintf(os.Stderr, "membershipd %s: --store kv: %v\n", cmd, err)
os.Exit(1)
}
-if len(b) != 32 {
-return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
return kv.store, kv, kv.Close
default:
fmt.Fprintf(os.Stderr, "membershipd %s: --store must be \"sqlite\" or \"kv\", got %q\n", cmd, *kf.store)
os.Exit(2)
return nil, nil, func() {}
}
-return nil
}
func userAdd(args []string) {
@@ -94,6 +147,7 @@ func userAdd(args []string) {
signPub := fs.String("sign-pub", "", "Ed25519 signing public key in hex (required)")
role := fs.String("role", membership.RoleMember, "role: admin or member")
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
_ = fs.Parse(args)
if *handle == "" || *signPub == "" {
@@ -105,23 +159,35 @@ func userAdd(args []string) {
os.Exit(2)
}
-store := openStore(*dbPath)
-defer store.Close()
store, kv, closeStore := resolveStore("user add", kf, *dbPath)
defer closeStore()
if err := store.AddUser(*signPub, *handle, *role); err != nil {
if errors.Is(err, membership.ErrUserExists) {
// Idempotency contract (GAP A): re-adding the same key is an EXPLICIT,
// non-destructive error — the existing row is left untouched (no silent
// upsert that could flip a role or clobber status, which would corrupt the
// allowlist). To replace a user, `user revoke <key>` then add again.
fmt.Fprintf(os.Stderr, "membershipd user add: user %s already registered (unchanged); revoke it first to replace\n", *signPub)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "membershipd user add: %v\n", err)
os.Exit(1)
}
fmt.Printf("added user %q (%s) role=%s\n", *handle, *signPub, *role)
if kv != nil {
reportKVReplication(kv.js)
}
}
func userList(args []string) {
fs := flag.NewFlagSet("user list", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
_ = fs.Parse(args)
-store := openStore(*dbPath)
-defer store.Close()
store, _, closeStore := resolveStore("user list", kf, *dbPath)
defer closeStore()
users, err := store.ListUsers()
if err != nil {
@@ -143,6 +209,7 @@ func userList(args []string) {
func userRevoke(args []string) {
fs := flag.NewFlagSet("user revoke", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
// Go's flag package stops at the first non-flag argument, so `revoke <key>
// --db path` would otherwise leave --db unparsed. Pull a leading positional
@@ -167,8 +234,8 @@ func userRevoke(args []string) {
os.Exit(2)
}
-store := openStore(*dbPath)
-defer store.Close()
store, _, closeStore := resolveStore("user revoke", kf, *dbPath)
defer closeStore()
if err := store.RevokeUser(signPub); err != nil {
fmt.Fprintf(os.Stderr, "membershipd user revoke: %v\n", err)
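The userRevoke comment above notes that Go's flag package stops parsing at the first non-flag argument. One way to accept both `revoke <key> --db path` and `revoke --db path <key>` is to pull a leading positional off before calling Parse. The following is a minimal sketch of that idea, not the commit's actual code: parseRevoke and the default path are hypothetical.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// parseRevoke is a hypothetical helper showing the "pull the leading
// positional first" technique; it returns the key and the resolved --db value.
func parseRevoke(args []string) (key, db string, err error) {
	fs := flag.NewFlagSet("user revoke", flag.ContinueOnError)
	dbPath := fs.String("db", "./unibus.db", "SQLite database path")

	// flag.Parse stops at the first non-flag argument, so take a leading
	// positional out of the way before parsing the remaining flags.
	if len(args) > 0 && !strings.HasPrefix(args[0], "-") {
		key, args = args[0], args[1:]
	}
	if perr := fs.Parse(args); perr != nil {
		return "", "", perr
	}
	// Also accept the key after the flags (revoke --db path <key>).
	if key == "" && fs.NArg() > 0 {
		key = fs.Arg(0)
	}
	if key == "" {
		return "", "", fmt.Errorf("missing <sign-pub>")
	}
	return key, *dbPath, nil
}

func main() {
	key, db, err := parseRevoke([]string{"abcd", "--db", "x.db"})
	fmt.Println(key, db, err)
}
```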
+151
@@ -0,0 +1,151 @@
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// users_kv.go is the `--store kv` half of the user administration CLI (issue 0011
// gaps, GAP A): adding and listing bus users directly against the RUNNING
// cluster's replicated JetStream KV allowlist, with no need to stop the cluster,
// seed a standalone node, and restart (the procedure the 0011 deploy required).
//
// The mechanism is the cluster's own privileged internal connection. Under
// enforce every bus user is confined by the per-subject ACL to the JetStream API
// of its own rooms, so no ordinary identity may touch the control-plane buckets
// (KV_UNIBUS_*). The ONLY identity the authenticator grants full JetStream
// permissions is membershipd's internal service identity. By persisting that
// identity to a file (membershipd --internal-id-file) the same key becomes
// available to this CLI, which presents it as its NATS nkey and is therefore
// recognized as the privileged internal client and allowed to read/write the KV.
//
// Intended invocation is over loopback on a cluster node (SSH): the data-plane
// TLS certificate's SAN covers 127.0.0.1/localhost and the internal identity file
// lives 0600 next to the node's TLS keys. Using the file requires root on the
// node, which already implies full control of that node — so co-locating it adds
// no practical exposure beyond what the TLS server key and cluster password
// already represent.
// defaultClusterNatsURL is the node-local NATS listener. The CLI is meant to run
// on a cluster node over SSH, talking to that node's own embedded server.
const defaultClusterNatsURL = "nats://127.0.0.1:4250"
// Deploy-default paths for the privileged identity and the data-plane CA, so an
// on-node invocation needs only --handle/--sign-pub/--role. Override for other
// layouts.
const (
defaultInternalIDFile = "/opt/unibus/secrets/internal.id"
defaultClusterCAFile = "/opt/unibus/tls/ca.crt"
)
// kvConn bundles the privileged NATS connection to a live cluster and the
// KV-backed control-plane store opened over it. Close releases both.
type kvConn struct {
nc *nats.Conn
js jetstream.JetStream
store membership.Store
}
func (k *kvConn) Close() {
if k == nil {
return
}
if k.store != nil {
_ = k.store.Close()
}
if k.nc != nil {
k.nc.Close()
}
}
// connectKVStore opens the privileged internal connection to the cluster's NATS
// and the JetStream KV control-plane store on top of it. internalIDFile is the
// membershipd-persisted internal service identity whose nkey the authenticator
// grants full permissions; caPath pins the data-plane TLS (empty only for a
// non-TLS dev cluster). A non-loopback target without --ca is refused, mirroring
// migrate-to-kv (audit 0008 N6): the allowlist write must not travel in cleartext.
func connectKVStore(natsURL, internalIDFile, caPath string, replicas int) (*kvConn, error) {
if internalIDFile == "" {
return nil, fmt.Errorf("--internal-id-file is required for --store kv (the privileged identity membershipd persists with --internal-id-file)")
}
// Confidentiality guard: a remote NATS without TLS would expose the allowlist
// (handles/roles/sign-pubs) and the privileged nkey handshake in cleartext.
if !isLoopbackURL(natsURL) && caPath == "" {
return nil, fmt.Errorf("refusing to connect to remote %q without --ca: the allowlist write would travel in cleartext — pin TLS with --ca, or run over a loopback --nats-url on a node", natsURL)
}
id, err := client.LoadIdentity(internalIDFile)
if err != nil {
return nil, fmt.Errorf("load internal identity: %w", err)
}
nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv)
if err != nil {
return nil, fmt.Errorf("derive nkey from internal identity: %w", err)
}
opts := []nats.Option{
nats.Name("membershipd-user-cli"),
nats.Nkey(nkeyPub, nkeySign),
}
if caPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
if err != nil {
return nil, fmt.Errorf("load CA %q: %w", caPath, err)
}
opts = append(opts, nats.Secure(tlsCfg))
}
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
return nil, fmt.Errorf("connect cluster NATS %q: %w", natsURL, err)
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return nil, fmt.Errorf("jetstream: %w", err)
}
store, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: replicas})
if err != nil {
nc.Close()
return nil, fmt.Errorf("open KV control-plane store: %w", err)
}
return &kvConn{nc: nc, js: js, store: store}, nil
}
// reportKVReplication prints the replication status of the allowlist bucket
// stream (KV_UNIBUS_users) right after a write, so the operator sees the add
// landed on a quorum and replicated to the followers — executable evidence that
// the live-cluster add is HA, not single-node. Best-effort: a read failure is a
// note, not an error (the write itself already succeeded).
func reportKVReplication(js jetstream.JetStream) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
st, err := js.Stream(ctx, "KV_UNIBUS_users")
if err != nil {
fmt.Fprintf(os.Stderr, "note: could not look up KV_UNIBUS_users stream: %v\n", err)
return
}
info, err := st.Info(ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err)
return
}
if info.Cluster == nil {
fmt.Printf("KV_UNIBUS_users: standalone (R1, no cluster replication); msgs=%d\n", info.State.Msgs)
return
}
current := 0
for _, r := range info.Cluster.Replicas {
if r.Current {
current++
}
}
fmt.Printf("KV_UNIBUS_users: leader=%s followers_current=%d/%d msgs=%d\n",
info.Cluster.Leader, current, len(info.Cluster.Replicas), info.State.Msgs)
}
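connectKVStore calls isLoopbackURL, which is defined outside this diff. A plausible shape for such a guard, using only the standard library, is sketched below. The names isLoopback and checkTarget are hypothetical and the real helper may differ (for example in how it treats hostnames other than localhost).

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// isLoopback is a hypothetical stand-in for the isLoopbackURL helper referenced
// above. It treats "localhost" and any loopback IP literal as local.
func isLoopback(raw string) bool {
	u, err := url.Parse(raw)
	if err != nil {
		return false // unparseable targets are never treated as local
	}
	host := u.Hostname() // strips the port and IPv6 brackets
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

// checkTarget mirrors the confidentiality guard: a remote target is only
// acceptable when a CA path is supplied to pin TLS.
func checkTarget(natsURL, caPath string) error {
	if !isLoopback(natsURL) && caPath == "" {
		return fmt.Errorf("refusing remote %q without a CA", natsURL)
	}
	return nil
}

func main() {
	fmt.Println(checkTarget("nats://127.0.0.1:4250", ""))
	fmt.Println(checkTarget("nats://203.0.113.1:4250", ""))
	fmt.Println(checkTarget("nats://203.0.113.1:4250", "/opt/unibus/tls/ca.crt"))
}
```

Failing closed on a parse error is a deliberate choice in this sketch: a malformed URL is then subject to the CA requirement instead of bypassing it.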
+246
@@ -0,0 +1,246 @@
package main
import (
"encoding/hex"
"fmt"
"strings"
"sync"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
)
// gateway is the live web gateway: it owns the operator's identity and a single
// connected unibus client, and turns the bus's crypto-bearing API into the plain
// REST/SSE surface the browser consumes. The browser never signs, never speaks
// NATS, and never sees a private key — the gateway is the legitimate room member
// that seals/opens payloads on the browser's behalf.
//
// TRUST MODEL: content stays end-to-end encrypted on the wire. The gateway can
// read plaintext because it acts AS the operator's client — a real member of
// each room, holding the room key K like any peer. It is the same trust a native
// desktop client has. In the wallet phase (per-browser WebCrypto identity) the
// decryption can move into the browser; today, for the single-operator MVP, the
// gateway decrypts server-side and pushes cleartext over a loopback/authenticated
// SSE channel.
type gateway struct {
id cs.Identity
endpoint string
cli *client.Client
refreshACL bool // call RefreshSession after a membership change (needed under a per-subject ACL bus)
mu sync.Mutex
hubs map[string]*roomHub // roomID -> live fan-out of decrypted frames to SSE clients
}
// gatewayConfig wires a live gateway.
type gatewayConfig struct {
Identity cs.Identity
NatsURL string
CtrlURL string
CtrlURLs []string
NatsURLs []string
CAPath string // bus CA; empty => plaintext dev connection (matches a loopback membershipd)
}
// newGateway connects the unibus client with the operator identity following the
// same posture seam every peer uses: a non-empty CA path means TLS + nkey, empty
// means plaintext dev. When a CA is configured the bus is assumed to enforce a
// per-subject ACL, so membership changes trigger a session refresh.
func newGateway(cfg gatewayConfig) (*gateway, error) {
opts := client.Options{
CtrlURLs: cfg.CtrlURLs,
NatsServers: cfg.NatsURLs,
}
if cfg.CAPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath)
if err != nil {
return nil, fmt.Errorf("webgw: load bus CA %q: %w", cfg.CAPath, err)
}
opts.UseNkey = true
opts.TLS = tlsCfg
opts.CtrlTLS = tlsCfg
}
cli, err := client.NewWithOptions(cfg.NatsURL, cfg.CtrlURL, cfg.Identity, opts)
if err != nil {
return nil, fmt.Errorf("webgw: connect bus client: %w", err)
}
return &gateway{
id: cfg.Identity,
endpoint: frame.EndpointID(cfg.Identity.SignPub),
cli: cli,
refreshACL: cfg.CAPath != "",
hubs: map[string]*roomHub{},
}, nil
}
// Close stops every hub and releases the bus client connection.
func (g *gateway) Close() error {
g.mu.Lock()
for _, h := range g.hubs {
h.stop()
}
g.hubs = map[string]*roomHub{}
g.mu.Unlock()
if g.cli != nil {
return g.cli.Close()
}
return nil
}
// ---- wire types (browser-facing JSON) ------------------------------------
// meInfo is what GET /api/me returns: the operator identity the gateway acts as.
type meInfo struct {
Endpoint string `json:"endpoint"`
SignPub string `json:"sign_pub"`
}
// roomWire is the browser view of a room. It deliberately omits messages: those
// stream over SSE (GET /api/rooms/{id}/stream), not in the room list.
type roomWire struct {
ID string `json:"id"`
Subject string `json:"subject"`
Name string `json:"name"`
Epoch int `json:"epoch"`
Encrypt bool `json:"encrypt"`
Persist bool `json:"persist"`
SignMsgs bool `json:"sign_msgs"`
Role string `json:"role"`
}
// createRoomReq is the POST /api/rooms body. Encrypt/Persist/SignMsgs are
// pointers so an omitted field falls back to the chat default rather than to the
// Go zero value (false). The common case — the browser sending only {subject,
// encrypted} — maps encrypted onto all three (the Matrix-like chat policy).
type createRoomReq struct {
Subject string `json:"subject"`
Encrypted *bool `json:"encrypted,omitempty"`
Encrypt *bool `json:"encrypt,omitempty"`
Persist *bool `json:"persist,omitempty"`
SignMsgs *bool `json:"sign_msgs,omitempty"`
}
// policy resolves the requested policy. A bare {subject} defaults to the
// Matrix-like chat room (encrypted + persisted + signed) so a created room keeps
// durable, end-to-end-encrypted, authored history. Callers can override any leg.
func (r createRoomReq) policy() room.Policy {
enc, per, sig := true, true, true
if r.Encrypted != nil {
enc, per, sig = *r.Encrypted, *r.Encrypted, *r.Encrypted
}
if r.Encrypt != nil {
enc = *r.Encrypt
}
if r.Persist != nil {
per = *r.Persist
}
if r.SignMsgs != nil {
sig = *r.SignMsgs
}
return room.Policy{Encrypt: enc, Persist: per, SignMsgs: sig}
}
// sendReq is the POST /api/rooms/{id}/send body.
type sendReq struct {
Body string `json:"body"`
}
// msgWire is one decrypted message pushed over SSE.
type msgWire struct {
ID string `json:"id"`
Sender string `json:"sender"`
Body string `json:"body"`
TS int64 `json:"ts"` // epoch ms (decoded from the frame's ULID id)
Mine bool `json:"mine"`
}
// ---- operations -----------------------------------------------------------
func (g *gateway) me() meInfo {
return meInfo{Endpoint: g.endpoint, SignPub: hex.EncodeToString(g.id.SignPub)}
}
// subjectName derives a short, human-friendly room name from its bus subject by
// dropping the leading namespace segment (room., test., proc., agent., rpc.). It is a
// display nicety only; the canonical identity stays the subject/room id.
func subjectName(subject string) string {
for _, p := range []string{"room.", "test.", "proc.", "agent.", "rpc."} {
if strings.HasPrefix(subject, p) {
return strings.TrimPrefix(subject, p)
}
}
return subject
}
func (g *gateway) listRooms() ([]roomWire, error) {
rooms, err := g.cli.ListMyRooms()
if err != nil {
return nil, err
}
out := make([]roomWire, 0, len(rooms))
for _, rm := range rooms {
out = append(out, roomWire{
ID: rm.RoomID,
Subject: rm.Subject,
Name: subjectName(rm.Subject),
Epoch: rm.Epoch,
Encrypt: rm.Policy.Encrypt,
Persist: rm.Policy.Persist,
SignMsgs: rm.Policy.SignMsgs,
Role: rm.Role,
})
}
return out, nil
}
func (g *gateway) createRoom(req createRoomReq) (roomWire, error) {
subject := strings.TrimSpace(req.Subject)
if subject == "" {
return roomWire{}, fmt.Errorf("webgw: subject required")
}
p := req.policy()
roomID, err := g.cli.CreateRoom(subject, p)
if err != nil {
return roomWire{}, err
}
// Under a per-subject ACL the operator's frozen NATS permissions do not yet
// cover the new room's subject; refresh so subsequent data-plane use works. On
// a plaintext/non-ACL dev bus this is unnecessary and would needlessly drop any
// live SSE subscriptions, so it is gated on the secured posture.
if g.refreshACL {
_ = g.cli.RefreshSession()
}
return roomWire{
ID: roomID,
Subject: subject,
Name: subjectName(subject),
Epoch: 1,
Encrypt: p.Encrypt,
Persist: p.Persist,
SignMsgs: p.SignMsgs,
Role: "owner",
}, nil
}
// join resolves room metadata and (for encrypted rooms) fetches the room key so
// the gateway can later open payloads. Idempotent.
func (g *gateway) join(roomID string) error {
if err := g.cli.Join(roomID); err != nil {
return err
}
if g.refreshACL {
_ = g.cli.RefreshSession()
}
return nil
}
// send publishes plaintext to a room. The unibus client seals it with the room
// key (encrypted rooms) and signs it (signed rooms) before it leaves the process.
func (g *gateway) send(roomID, body string) error {
return g.cli.Publish(roomID, []byte(body))
}
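The pointer-typed fields in createRoomReq exist so that an omitted JSON field can be told apart from an explicit false. The sketch below replays that resolution order on raw JSON bodies. It uses local stand-in types (policy, createReq, resolve are hypothetical names) so it runs without the room package, and it is meant as an illustration of the defaulting rule rather than the gateway's handler.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// policy is a local stand-in for room.Policy.
type policy struct{ Encrypt, Persist, SignMsgs bool }

// createReq mirrors the request shape: nil pointer means "field omitted".
type createReq struct {
	Subject   string `json:"subject"`
	Encrypted *bool  `json:"encrypted,omitempty"`
	Encrypt   *bool  `json:"encrypt,omitempty"`
	Persist   *bool  `json:"persist,omitempty"`
	SignMsgs  *bool  `json:"sign_msgs,omitempty"`
}

// resolve applies the precedence: chat defaults, then the "encrypted"
// shorthand for all three legs, then any per-leg override.
func resolve(body string) (policy, error) {
	var r createReq
	if err := json.Unmarshal([]byte(body), &r); err != nil {
		return policy{}, err
	}
	p := policy{Encrypt: true, Persist: true, SignMsgs: true}
	if r.Encrypted != nil {
		p = policy{Encrypt: *r.Encrypted, Persist: *r.Encrypted, SignMsgs: *r.Encrypted}
	}
	if r.Encrypt != nil {
		p.Encrypt = *r.Encrypt
	}
	if r.Persist != nil {
		p.Persist = *r.Persist
	}
	if r.SignMsgs != nil {
		p.SignMsgs = *r.SignMsgs
	}
	return p, nil
}

func main() {
	for _, body := range []string{
		`{"subject":"room.general"}`,
		`{"subject":"room.scratch","encrypted":false}`,
		`{"subject":"room.log","encrypted":false,"persist":true}`,
	} {
		p, err := resolve(body)
		fmt.Printf("%s -> %+v (err=%v)\n", body, p, err)
	}
}
```

With plain bool fields the first body would resolve to an all-false policy, because the Go zero value is indistinguishable from an explicit false after decoding.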
+140
@@ -0,0 +1,140 @@
package main
import (
"sync"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/oklog/ulid/v2"
)
// roomHub multiplexes ONE unibus room subscription to MANY SSE clients. The
// unibus client derives a per-(room, endpoint) durable consumer name, so a
// second Subscribe for the same room from the same operator would contend for
// the same durable (load-balanced delivery) rather than each browser receiving
// every message. The hub holds a single subscription per room and fans each
// decrypted frame out to every connected browser, which also means the gateway
// opens at most one bus subscription per room regardless of how many tabs watch
// it.
type roomHub struct {
roomID string
myEndpoint string
sub *client.Sub
mu sync.Mutex
clients map[chan msgWire]struct{}
}
// frameTS decodes the millisecond timestamp embedded in a frame's ULID id. A
// malformed id (should not happen for bus-produced frames) yields 0, which the
// browser renders without crashing.
func frameTS(msgID string) int64 {
id, err := ulid.Parse(msgID)
if err != nil {
return 0
}
return int64(id.Time())
}
// newRoomHub opens the single bus subscription for roomID and starts fanning
// decrypted frames out to registered clients. The room must already be joined
// (so the gateway holds the room key) before this is called.
func newRoomHub(cli *client.Client, roomID, myEndpoint string) (*roomHub, error) {
h := &roomHub{
roomID: roomID,
myEndpoint: myEndpoint,
clients: map[chan msgWire]struct{}{},
}
sub, err := cli.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
m := msgWire{
ID: f.MsgID,
Sender: f.Sender,
Body: string(plaintext),
TS: frameTS(f.MsgID),
Mine: f.Sender == myEndpoint,
}
h.broadcast(m)
})
if err != nil {
return nil, err
}
h.sub = sub
return h, nil
}
// broadcast delivers a message to every registered client without blocking the
// NATS delivery goroutine: a client whose buffer is full (a stalled browser)
// drops this frame rather than stalling the whole room.
func (h *roomHub) broadcast(m msgWire) {
h.mu.Lock()
defer h.mu.Unlock()
for ch := range h.clients {
select {
case ch <- m:
default:
}
}
}
// add registers a new SSE client channel.
func (h *roomHub) add(ch chan msgWire) {
h.mu.Lock()
defer h.mu.Unlock()
h.clients[ch] = struct{}{}
}
// stop unsubscribes from the bus. Local delivery ends; for a persisted room the
// durable consumer's ack position stays on the server, so a later subscription
// with the same operator resumes from where it left off.
func (h *roomHub) stop() {
if h.sub != nil {
_ = h.sub.Unsubscribe()
}
}
// openStream joins the room (idempotent; fetches the room key for encrypted
// rooms), attaches an SSE client to the room's hub (creating it on first watcher),
// and returns the client's message channel plus a cleanup func. The cleanup
// detaches the client and, when it was the last watcher, tears down the room's
// single bus subscription.
func (g *gateway) openStream(roomID string) (chan msgWire, func(), error) {
if err := g.join(roomID); err != nil {
return nil, nil, err
}
// Buffer so a brief render hitch in the browser does not drop live frames; a
// sustained stall still drops (broadcast is non-blocking) rather than wedging
// the room.
ch := make(chan msgWire, 64)
g.mu.Lock()
h := g.hubs[roomID]
if h == nil {
var err error
h, err = newRoomHub(g.cli, roomID, g.endpoint)
if err != nil {
g.mu.Unlock()
return nil, nil, err
}
g.hubs[roomID] = h
}
// Register the client while still holding g.mu: otherwise another watcher's
// cleanup could tear the hub down between the lookup and the add, leaving this
// client attached to a stopped hub.
h.add(ch)
g.mu.Unlock()
// cleanup takes g.mu before h.mu (the single, consistent lock order) so a
// concurrent openStream that re-creates the hub cannot race the teardown.
cleanup := func() {
g.mu.Lock()
defer g.mu.Unlock()
h.mu.Lock()
delete(h.clients, ch)
empty := len(h.clients) == 0
h.mu.Unlock()
if empty {
if cur := g.hubs[roomID]; cur == h {
delete(g.hubs, roomID)
h.stop()
}
}
}
return ch, cleanup, nil
}
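The non-blocking fan-out used by broadcast can be tried in isolation. The following is a minimal sketch, not the gateway's code: the hub type, its string payload and the dropped-frame counter are hypothetical stand-ins for roomHub and msgWire, introduced only to make the drop behaviour observable.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical, stripped-down stand-in for roomHub: a set of client
// channels guarded by a mutex.
type hub struct {
	mu      sync.Mutex
	clients map[chan string]struct{}
}

func newHub() *hub { return &hub{clients: map[chan string]struct{}{}} }

// add registers a client channel with the hub.
func (h *hub) add(ch chan string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.clients[ch] = struct{}{}
}

// broadcast offers m to every client and returns how many clients dropped the
// frame because their buffer was full. It never blocks on a slow client.
func (h *hub) broadcast(m string) (dropped int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.clients {
		select {
		case ch <- m:
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	h := newHub()
	fast := make(chan string, 4) // roomy buffer: keeps every frame
	slow := make(chan string, 1) // tiny buffer and nobody reading: a stalled client
	h.add(fast)
	h.add(slow)

	total := 0
	for i := 0; i < 3; i++ {
		total += h.broadcast(fmt.Sprintf("msg-%d", i))
	}
	fmt.Println("dropped:", total, "fast queued:", len(fast), "slow queued:", len(slow))
}
```

The stalled client loses frames while the healthy one receives all three, which is the trade-off the comment on broadcast describes: a slow browser degrades only its own stream.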
@@ -0,0 +1,98 @@
package main
import (
"encoding/base64"
"encoding/json"
"fmt"
"os"
"os/exec"
cs "fn-registry/functions/cybersecurity"
)
// identityJSON mirrors the on-disk / pass-stored identity format shared across
// the unibus tooling: the four keypair halves, each std-base64. It is the SAME
// shape the bus client persists (pkg/client identity file) and the operator's
// `pass` entry unibus/operator-identity, so the web gateway loads the operator's
// identity without a divergent serialization. Kept in lockstep with
// unibus_admin/internal/admin/identity.go.
type identityJSON struct {
SignPub string `json:"sign_pub"`
SignPriv string `json:"sign_priv"`
KexPub string `json:"kex_pub"`
KexPriv string `json:"kex_priv"`
}
// decodeIdentity turns the JSON identity bytes into a cs.Identity. The private
// halves stay only in memory; this never writes them anywhere.
func decodeIdentity(raw []byte) (cs.Identity, error) {
var f identityJSON
if err := json.Unmarshal(raw, &f); err != nil {
return cs.Identity{}, fmt.Errorf("webgw: parse identity json: %w", err)
}
dec := base64.StdEncoding.DecodeString
signPub, err := dec(f.SignPub)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode sign_pub: %w", err)
}
signPriv, err := dec(f.SignPriv)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode sign_priv: %w", err)
}
kexPub, err := dec(f.KexPub)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode kex_pub: %w", err)
}
kexPriv, err := dec(f.KexPriv)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode kex_priv: %w", err)
}
if len(signPub) != 32 || len(signPriv) != 64 || len(kexPub) != 32 || len(kexPriv) != 32 {
return cs.Identity{}, fmt.Errorf("webgw: identity has wrong key sizes (sign_pub=%d sign_priv=%d kex_pub=%d kex_priv=%d)",
len(signPub), len(signPriv), len(kexPub), len(kexPriv))
}
return cs.Identity{SignPub: signPub, SignPriv: signPriv, KexPub: kexPub, KexPriv: kexPriv}, nil
}
// loadIdentityFromFile reads a 0600 identity JSON file (the same format the bus
// client writes) and decodes it. Used on a deploy host where `pass` is not
// available and the operator identity is delivered as a protected file.
func loadIdentityFromFile(path string) (cs.Identity, error) {
raw, err := os.ReadFile(path)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: read identity file %q: %w", path, err)
}
return decodeIdentity(raw)
}
// loadIdentityFromPass shells out to `pass show <entry>` and decodes the JSON
// identity it returns. The secret is held only in memory; this process never
// writes it to disk or argv. Used in local operator workflows where the GNU
// password store holds unibus/operator-identity.
func loadIdentityFromPass(entry string) (cs.Identity, error) {
out, err := exec.Command("pass", "show", entry).Output()
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: pass show %q: %w", entry, err)
}
return decodeIdentity(out)
}
// loadPassValue returns the first line of a `pass show <entry>` for non-identity
// secrets (e.g. the unlock passphrase). Empty entry yields an empty string and
// no error, so callers can treat "no pass entry configured" as "not set".
func loadPassValue(entry string) (string, error) {
if entry == "" {
return "", nil
}
out, err := exec.Command("pass", "show", entry).Output()
if err != nil {
return "", fmt.Errorf("webgw: pass show %q: %w", entry, err)
}
s := string(out)
for i := 0; i < len(s); i++ {
if s[i] == '\n' || s[i] == '\r' {
return s[:i], nil
}
}
return s, nil
}
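To exercise decodeIdentity locally, an identity file in the same shape can be produced with the standard library alone. This is a sketch under assumptions: it presumes the signing half is a standard Ed25519 keypair and the key-exchange half is X25519, as the commit message describes, and the helper names newIdentityJSON and keySizes are hypothetical. It is not how the unibus tooling itself generates identities.

```go
package main

import (
	"crypto/ecdh"
	"crypto/ed25519"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// identityFile mirrors the four std-base64 fields of identityJSON.
type identityFile struct {
	SignPub  string `json:"sign_pub"`
	SignPriv string `json:"sign_priv"`
	KexPub   string `json:"kex_pub"`
	KexPriv  string `json:"kex_priv"`
}

// newIdentityJSON generates a throwaway Ed25519 + X25519 identity and returns
// it serialized in the identity-file shape. Hypothetical helper for testing.
func newIdentityJSON() ([]byte, error) {
	signPub, signPriv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, err
	}
	kex, err := ecdh.X25519().GenerateKey(rand.Reader)
	if err != nil {
		return nil, err
	}
	enc := base64.StdEncoding.EncodeToString
	return json.Marshal(identityFile{
		SignPub:  enc(signPub),
		SignPriv: enc(signPriv),
		KexPub:   enc(kex.PublicKey().Bytes()),
		KexPriv:  enc(kex.Bytes()),
	})
}

// keySizes decodes the four fields and reports their byte lengths in the order
// sign_pub, sign_priv, kex_pub, kex_priv.
func keySizes(raw []byte) ([4]int, error) {
	var f identityFile
	if err := json.Unmarshal(raw, &f); err != nil {
		return [4]int{}, err
	}
	var sizes [4]int
	for i, s := range []string{f.SignPub, f.SignPriv, f.KexPub, f.KexPriv} {
		b, err := base64.StdEncoding.DecodeString(s)
		if err != nil {
			return [4]int{}, fmt.Errorf("field %d: %w", i, err)
		}
		sizes[i] = len(b)
	}
	return sizes, nil
}

func main() {
	raw, err := newIdentityJSON()
	if err != nil {
		panic(err)
	}
	sizes, err := keySizes(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(sizes) // [32 64 32 32]
}
```

The 32/64/32/32 lengths are the ones decodeIdentity enforces before it builds a cs.Identity.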
@@ -0,0 +1,199 @@
// Command webgw is the web gateway for the unibus chat SPA. It is a single Go
// binary that holds the operator's bus identity, connects to the bus as a real
// authenticated peer (pkg/client), and exposes a small REST + SSE API the
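// browser consumes. The browser never signs and never speaks NATS. In the legacy
// operator flow it never sees a private key either: it authenticates to the
// gateway with a passphrase and thereafter holds only an opaque session cookie.
// In the wallet flow (POST /api/session) the browser hands its own unlocked
// keypair to the gateway, which holds it in memory for the life of the session.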
//
// TRUST MODEL (MVP, single operator): room content stays end-to-end encrypted on
// the bus. The gateway can read plaintext because it acts AS the operator's
// client — a legitimate member of each room holding the room key. Decryption
// happens server-side in this process; cleartext then crosses an authenticated
// (loopback or TLS-fronted) SSE channel to the browser. The wallet phase (issue:
// per-browser WebCrypto identity) can move decryption into the browser; see the
// report for the Phase 2 plan.
//
// # local dev against a loopback membershipd (plaintext), operator from pass:
// webgw --identity-pass unibus/operator-identity \
// --ctrl-url http://127.0.0.1:8470 --nats-url nats://127.0.0.1:4250
//
// # secured cluster (TLS + nkey on both planes), identity from a 0600 file:
// webgw --ca ca.crt --identity-file operator.id \
// --ctrl-url https://node-a:8470 --nats-url nats://node-a:4250 \
// --ctrl-urls https://node-b:8470,https://node-c:8470 \
// --nats-urls nats://node-b:4250,nats://node-c:4250
package main
import (
"context"
"flag"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
cs "fn-registry/functions/cybersecurity"
)
func main() {
var (
bind = flag.String("bind", "127.0.0.1", "interface to bind the gateway HTTP server to (loopback by default)")
port = flag.String("port", "8481", "gateway HTTP port")
ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "primary unibus control-plane base URL")
ctrlURLs = flag.String("ctrl-urls", "", "comma-separated ADDITIONAL control-plane base URLs (cluster failover)")
natsURL = flag.String("nats-url", "nats://127.0.0.1:4250", "primary NATS URL")
natsURLs = flag.String("nats-urls", "", "comma-separated ADDITIONAL NATS seed URLs (cluster failover)")
caPath = flag.String("ca", "", "bus CA cert path; set to talk TLS+nkey to a secured bus (empty = plaintext dev)")
identityFile = flag.String("identity-file", "", "path to the operator identity JSON file (0600). Mutually exclusive with --identity-pass")
identityPass = flag.String("identity-pass", "", "pass(1) entry holding the operator identity JSON, e.g. unibus/operator-identity")
unlockPass = flag.String("unlock-pass", "", "literal passphrase the browser must send to unlock a LEGACY operator session (dev). Prefer --unlock-pass-entry")
unlockEntry = flag.String("unlock-pass-entry", "unibus/admin-panel-password", "pass(1) entry holding the operator unlock passphrase (used when --unlock-pass is empty)")
registerURL = flag.String("register-url", "", "bus POST /register URL for wallet onboarding. Empty = derive from --ctrl-url (<ctrl-url>/register)")
mockTokens = flag.String("mock-tokens", "", "DEV ONLY: comma-separated one-shot invite tokens for local testing, 'token=handle:role'. Empty in production (real invites come from the bus). Example: demo=demo:member")
webDir = flag.String("web-dir", "", "OPTIONAL path to the built SPA (web/dist) to serve. Empty = API only (use vite dev server)")
)
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[webgw] ")
id, err := loadIdentity(*identityFile, *identityPass)
if err != nil {
log.Fatalf("%v", err)
}
unlock := *unlockPass
if unlock == "" {
unlock, err = loadPassValue(*unlockEntry)
if err != nil {
log.Fatalf("resolve unlock passphrase: %v", err)
}
}
if unlock == "" {
log.Fatalf("an unlock passphrase is required: set --unlock-pass or a non-empty --unlock-pass-entry (default unibus/admin-panel-password)")
}
resolvedWebDir := resolveWebDir(*webDir)
// busTemplate is the connection config every bus client uses. The operator
// gateway uses it as-is; each wallet session clones it and overrides Identity
// with the logged-in user's keypair.
busTemplate := gatewayConfig{
Identity: id,
NatsURL: *natsURL,
CtrlURL: *ctrlURL,
CtrlURLs: splitCSV(*ctrlURLs),
NatsURLs: splitCSV(*natsURLs),
CAPath: *caPath,
}
gw, err := newGateway(busTemplate)
if err != nil {
log.Fatalf("%v", err)
}
defer gw.Close()
// Wallet onboarding backend: POST /api/register targets the bus's /register
// (added by the user-accounts work). When --register-url is empty we derive it
// from --ctrl-url; --mock-tokens supplies one-shot invites for local testing
// before that endpoint is deployed.
regURL := *registerURL
if regURL == "" {
regURL = strings.TrimRight(*ctrlURL, "/") + "/register"
}
registrar := newRegistrar(regURL, *mockTokens)
log.Printf("operator endpoint: %s", gw.endpoint)
log.Printf("control plane: %s (+%d failover)", *ctrlURL, len(splitCSV(*ctrlURLs)))
tls := "OFF (plaintext dev)"
if *caPath != "" {
tls = "ON (CA " + *caPath + ")"
}
log.Printf("bus TLS+nkey: %s", tls)
if resolvedWebDir != "" {
log.Printf("serving SPA from: %s", resolvedWebDir)
} else {
log.Printf("API only (no --web-dir): use the vite dev server with a /api+stream proxy")
}
log.Printf("wallet register: %s (mock tokens: %d)", regURL, mockTokenCount(*mockTokens))
srv := newServer(gw, busTemplate, registrar, unlock, resolvedWebDir)
addr := *bind + ":" + *port
httpSrv := &http.Server{
Addr: addr,
Handler: srv,
// No global write timeout: SSE streams are long-lived. Header timeout still
// bounds slowloris on the request line/headers.
ReadHeaderTimeout: 10 * time.Second,
}
go func() {
log.Printf("web gateway: http://%s", addr)
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("http server: %v", err)
}
}()
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop
log.Printf("shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
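_ = httpSrv.Shutdown(ctx)
// Close the per-user bus clients so wallet private keys and NATS connections
// are released on shutdown; the shared operator gateway is closed by the
// deferred gw.Close above.
srv.sessions.closeAll()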
log.Printf("bye")
}
// loadIdentity resolves the operator identity from exactly one of --identity-file
// or --identity-pass.
func loadIdentity(file, passEntry string) (cs.Identity, error) {
switch {
case file != "" && passEntry != "":
return cs.Identity{}, errFlag("set only one of --identity-file or --identity-pass")
case file != "":
return loadIdentityFromFile(file)
case passEntry != "":
return loadIdentityFromPass(passEntry)
default:
return cs.Identity{}, errFlag("an identity is required: pass --identity-file <path> or --identity-pass <entry>")
}
}
// resolveWebDir validates the --web-dir flag. An empty flag means API-only. A
// non-empty dir is kept only if it actually holds an index.html, so a typo logs
// "API only" rather than serving 404s.
func resolveWebDir(dir string) string {
if dir == "" {
return ""
}
abs, err := filepath.Abs(dir)
if err != nil {
log.Printf("WARN --web-dir %q: %v; serving API only", dir, err)
return ""
}
if !statFile(filepath.Join(abs, "index.html")) {
log.Printf("WARN --web-dir %q has no index.html; serving API only", abs)
return ""
}
return abs
}
type flagErr string
func (e flagErr) Error() string { return string(e) }
func errFlag(s string) error { return flagErr("webgw: " + s) }
func splitCSV(s string) []string {
var out []string
for _, p := range strings.Split(s, ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
@@ -0,0 +1,193 @@
package main
import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
)
// registerReq is the POST /api/register body. It mirrors the bus contract exactly
// (token + the two PUBLIC key halves, each 64 hex chars). The private key never
// appears here — registration only publishes the public identity. The handle and
// role are NOT accepted from the client; they are fixed by the invite the token
// belongs to (no privilege escalation).
type registerReq struct {
Token string `json:"token"`
SignPub string `json:"sign_pub"`
KexPub string `json:"kex_pub"`
}
// registerResp is what we return to the browser on success. The bus's /register
// (issue: user-accounts) decides handle/role from the invite; in mock mode the
// gateway echoes the configured pair so the SPA can greet the new user.
type registerResp struct {
Handle string `json:"handle"`
Role string `json:"role"`
}
// registrar fulfils POST /api/register. It targets the bus's POST /register
// endpoint (added by the user-accounts work, bus >= 0.12.0). Until that endpoint
// is rolled out, a built-in mock validates against a configured set of one-shot
// tokens so the whole wallet flow is testable locally. Mock tokens are checked
// first; anything else is proxied to the real bus when --register-url is set.
type registrar struct {
mu sync.Mutex
registerURL string // bus POST /register; empty => mock-only
httpc *http.Client // for proxying to the bus
mockTokens map[string]*mockToken // configured one-shot invites for local testing
}
// mockToken is a local stand-in for a bus invite: a token that maps to a fixed
// handle+role and can be consumed exactly once.
type mockToken struct {
handle string
role string
used bool
}
// newRegistrar parses the --mock-tokens spec ("tok=handle:role,tok2=h2:role2")
// and configures the optional proxy target.
func newRegistrar(registerURL, mockSpec string) *registrar {
r := &registrar{
registerURL: strings.TrimSpace(registerURL),
httpc: &http.Client{Timeout: 10 * time.Second},
mockTokens: map[string]*mockToken{},
}
for _, part := range strings.Split(mockSpec, ",") {
part = strings.TrimSpace(part)
if part == "" {
continue
}
// tok=handle:role (role optional, defaults to member)
eq := strings.IndexByte(part, '=')
if eq < 0 {
continue
}
tok := strings.TrimSpace(part[:eq])
hr := strings.TrimSpace(part[eq+1:])
handle, role := hr, "member"
if c := strings.IndexByte(hr, ':'); c >= 0 {
handle, role = strings.TrimSpace(hr[:c]), strings.TrimSpace(hr[c+1:])
}
if tok != "" && handle != "" {
r.mockTokens[tok] = &mockToken{handle: handle, role: role}
}
}
return r
}
// mockTokenCount counts configured mock tokens in a --mock-tokens spec (for the
// startup log line).
func mockTokenCount(spec string) int {
n := 0
for _, part := range strings.Split(spec, ",") {
if p := strings.TrimSpace(part); p != "" && strings.ContainsRune(p, '=') {
n++
}
}
return n
}
// validHexKey reports whether s is exactly 64 lowercase/uppercase hex chars (a
// 32-byte key). Both sign_pub and kex_pub are 32-byte keys.
func validHexKey(s string) bool {
if len(s) != 64 {
return false
}
_, err := hex.DecodeString(s)
return err == nil
}
// handleRegister validates the keys and consumes the token. Order of resolution:
// 1. strict validation of the public keys (defends both mock and proxy paths);
// 2. mock token (one-shot) if configured;
// 3. proxy to the bus /register if --register-url is set;
// 4. otherwise reject with a clear error.
func (s *server) handleRegister(w http.ResponseWriter, r *http.Request) {
var req registerReq
if !decode(w, r, &req) {
return
}
req.Token = strings.TrimSpace(req.Token)
if req.Token == "" {
writeErr(w, http.StatusBadRequest, "token required")
return
}
if !validHexKey(req.SignPub) {
writeErr(w, http.StatusBadRequest, "sign_pub must be 64 hex chars (32 bytes)")
return
}
if !validHexKey(req.KexPub) {
writeErr(w, http.StatusBadRequest, "kex_pub must be 64 hex chars (32 bytes)")
return
}
reg := s.registrar
// 2) mock one-shot token.
reg.mu.Lock()
mt, isMock := reg.mockTokens[req.Token]
if isMock {
if mt.used {
reg.mu.Unlock()
writeErr(w, http.StatusConflict, "invite already used")
return
}
mt.used = true
handle, role := mt.handle, mt.role
reg.mu.Unlock()
writeJSON(w, http.StatusCreated, registerResp{Handle: handle, Role: role})
return
}
reg.mu.Unlock()
// 3) proxy to the real bus /register when configured.
if reg.registerURL != "" {
s.proxyRegister(w, req)
return
}
// 4) no mock match, no proxy target.
writeErr(w, http.StatusBadRequest, "invalid or unknown token (and no bus /register configured)")
}
// proxyRegister forwards the registration to the bus's POST /register. The bus
// validates the invite (existence, not-used, not-expired) and adds the public
// identity to the allowlist with the invite's handle+role. This is unsigned by
// design: the TOKEN authorizes the call, not an admin signature.
func (s *server) proxyRegister(w http.ResponseWriter, req registerReq) {
body, _ := json.Marshal(req)
resp, err := s.registrar.httpc.Post(
s.registrar.registerURL,
"application/json",
bytes.NewReader(body),
)
if err != nil {
writeErr(w, http.StatusBadGateway, "bus register unreachable: "+err.Error())
return
}
defer resp.Body.Close()
raw, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
// On success, try to pass through the bus's handle/role if it returned them;
// otherwise a bare 201 is still success.
if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusOK {
var rr registerResp
_ = json.Unmarshal(raw, &rr)
writeJSON(w, http.StatusCreated, rr)
return
}
// Forward the bus's error verbatim where possible.
msg := strings.TrimSpace(string(raw))
if msg == "" {
msg = fmt.Sprintf("bus register failed (HTTP %d)", resp.StatusCode)
}
writeErr(w, resp.StatusCode, msg)
}
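The --mock-tokens grammar ("tok=handle:role", role optional) can also be parsed with strings.Cut. The sketch below is an alternative formulation for illustration only; parseMockSpec and the invite type are hypothetical names, and the one-shot "used" flag of the real mockToken is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// invite is a hypothetical, exported-field version of mockToken without the
// one-shot "used" flag.
type invite struct {
	Handle string
	Role   string
}

// parseMockSpec parses "tok=handle:role,tok2=handle2" into a token map.
// Entries without '=' or with an empty token or handle are skipped, and a
// missing ":role" part defaults the role to "member".
func parseMockSpec(spec string) map[string]invite {
	out := map[string]invite{}
	for _, part := range strings.Split(spec, ",") {
		tok, rest, ok := strings.Cut(strings.TrimSpace(part), "=")
		if !ok {
			continue
		}
		handle, role, hasRole := strings.Cut(rest, ":")
		if !hasRole {
			role = "member"
		}
		tok = strings.TrimSpace(tok)
		handle = strings.TrimSpace(handle)
		role = strings.TrimSpace(role)
		if tok != "" && handle != "" {
			out[tok] = invite{Handle: handle, Role: role}
		}
	}
	return out
}

func main() {
	for tok, inv := range parseMockSpec("demo=demo:member, guest=alice") {
		fmt.Printf("%s -> %s (%s)\n", tok, inv.Handle, inv.Role)
	}
}
```

Malformed entries are dropped silently here, as in newRegistrar; a stricter variant could return an error so a typo in the flag is noticed at startup.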
@@ -0,0 +1,327 @@
package main
import (
"crypto/rand"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"net/http"
"os"
"path/filepath"
"strings"
"time"
)
// sessionCookie is the name of the gateway's session cookie. The browser sends
// it automatically on same-origin fetches AND on EventSource (SSE) connections —
// EventSource cannot set custom headers, so a cookie is the only way to
// authenticate the stream. It is HttpOnly so page JS can never read the token.
const sessionCookie = "unibus_session"
// server is the gateway's HTTP surface: a small REST/SSE API under /api plus an
// optional static file server for the built SPA.
//
// Two ways to get a session:
// - POST /api/session: the WALLET model. The browser hands its own bus
// identity (unlocked from its local encrypted key) and the gateway connects a
// dedicated bus client AS that user. Per-user, the primary path.
// - POST /api/login: the legacy operator passphrase. Binds the session to the
// single shared operator gateway. Kept for backward compatibility.
//
// One onboarding route, which issues no session:
// - POST /api/register: the WALLET onboarding. Unauthenticated (the invite
// token authorizes), it consumes a token and publishes the new user's PUBLIC
// identity to the bus allowlist.
type server struct {
operatorGW *gateway // shared operator client (legacy passphrase login)
busTemplate gatewayConfig // bus connection config; Identity is overridden per user session
registrar *registrar // POST /api/register backend (mock + proxy)
unlock string // passphrase that unlocks an operator session (constant-time compare)
webDir string // optional path to the built SPA (web/dist); empty = API only
mux *http.ServeMux
sessions *sessionStore
}
func newServer(operatorGW *gateway, busTemplate gatewayConfig, registrar *registrar, unlock, webDir string) *server {
s := &server{
operatorGW: operatorGW,
busTemplate: busTemplate,
registrar: registrar,
unlock: unlock,
webDir: webDir,
mux: http.NewServeMux(),
sessions: newSessionStore(),
}
s.routes()
return s
}
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) }
func (s *server) routes() {
// Liveness, unauthenticated (systemd / deploy smoke).
s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
})
// Unauthenticated onboarding / auth routes.
s.mux.HandleFunc("POST /api/register", s.handleRegister) // invite token authorizes
s.mux.HandleFunc("POST /api/session", s.handleSession) // wallet: per-user identity
s.mux.HandleFunc("POST /api/login", s.handleLogin) // legacy operator passphrase
// Session-gated routes.
s.mux.HandleFunc("POST /api/logout", s.auth(s.handleLogout))
s.mux.HandleFunc("GET /api/me", s.auth(s.handleMe))
s.mux.HandleFunc("GET /api/rooms", s.auth(s.handleListRooms))
s.mux.HandleFunc("POST /api/rooms", s.auth(s.handleCreateRoom))
s.mux.HandleFunc("POST /api/rooms/{id}/join", s.auth(s.handleJoin))
s.mux.HandleFunc("POST /api/rooms/{id}/send", s.auth(s.handleSend))
s.mux.HandleFunc("GET /api/rooms/{id}/stream", s.auth(s.handleStream))
// Everything else is the SPA (when --web-dir is set). Registered last.
if s.webDir != "" {
s.mux.Handle("/", s.spaHandler())
}
}
// meResp is the identity view returned by /api/session, /api/login and /api/me:
// the bus endpoint the session acts as, its signing public key, and the display
// handle.
type meResp struct {
Endpoint string `json:"endpoint"`
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
}
// ---- auth -----------------------------------------------------------------
// auth wraps a handler so it runs only with a valid session cookie, resolving the
// session (and thus the per-user gateway) it belongs to. A missing or unknown
// token yields 401, which the SPA treats as "show the login screen".
func (s *server) auth(next func(http.ResponseWriter, *http.Request, *session)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
c, err := r.Cookie(sessionCookie)
if err != nil {
writeErr(w, http.StatusUnauthorized, "not authenticated")
return
}
sess, ok := s.sessions.get(c.Value)
if !ok {
writeErr(w, http.StatusUnauthorized, "not authenticated")
return
}
next(w, r, sess)
}
}
// handleLogin is the legacy operator passphrase login: it unlocks a session bound
// to the shared operator gateway. The wallet path (POST /api/session) is
// preferred; this remains for backward compatibility with the single-operator MVP.
func (s *server) handleLogin(w http.ResponseWriter, r *http.Request) {
var req struct {
Passphrase string `json:"passphrase"`
}
if !decode(w, r, &req) {
return
}
// Constant-time compare so a wrong passphrase cannot be timed character by
// character. An empty configured passphrase never matches.
if s.unlock == "" || subtle.ConstantTimeCompare([]byte(req.Passphrase), []byte(s.unlock)) != 1 {
writeErr(w, http.StatusUnauthorized, "wrong passphrase")
return
}
tok := newToken()
handle := s.operatorGW.endpoint
if len(handle) > 8 {
handle = handle[:8]
}
s.sessions.put(tok, &session{gw: s.operatorGW, owned: false, handle: handle, issuedAt: time.Now()})
http.SetCookie(w, &http.Cookie{
Name: sessionCookie,
Value: tok,
Path: "/",
HttpOnly: true,
SameSite: http.SameSiteLaxMode,
})
writeJSON(w, http.StatusOK, meResp{Endpoint: s.operatorGW.endpoint, SignPub: hex.EncodeToString(s.operatorGW.id.SignPub), Handle: handle})
}
func (s *server) handleLogout(w http.ResponseWriter, r *http.Request, _ *session) {
if c, err := r.Cookie(sessionCookie); err == nil {
if sess, ok := s.sessions.drop(c.Value); ok && sess.owned && sess.gw != nil {
// Per-user session: tear down its bus client so the private key and the
// NATS connection do not outlive the session.
_ = sess.gw.Close()
}
}
http.SetCookie(w, &http.Cookie{Name: sessionCookie, Value: "", Path: "/", MaxAge: -1, HttpOnly: true})
writeJSON(w, http.StatusOK, map[string]string{"status": "logged_out"})
}
func (s *server) handleMe(w http.ResponseWriter, _ *http.Request, sess *session) {
writeJSON(w, http.StatusOK, meResp{
Endpoint: sess.gw.endpoint,
SignPub: hex.EncodeToString(sess.gw.id.SignPub),
Handle: sess.handle,
})
}
// ---- rooms ----------------------------------------------------------------
func (s *server) handleListRooms(w http.ResponseWriter, _ *http.Request, sess *session) {
rooms, err := sess.gw.listRooms()
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, rooms)
}
func (s *server) handleCreateRoom(w http.ResponseWriter, r *http.Request, sess *session) {
var req createRoomReq
if !decode(w, r, &req) {
return
}
rv, err := sess.gw.createRoom(req)
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusCreated, rv)
}
func (s *server) handleJoin(w http.ResponseWriter, r *http.Request, sess *session) {
if err := sess.gw.join(r.PathValue("id")); err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "joined"})
}
func (s *server) handleSend(w http.ResponseWriter, r *http.Request, sess *session) {
var req sendReq
if !decode(w, r, &req) {
return
}
if strings.TrimSpace(req.Body) == "" {
writeErr(w, http.StatusBadRequest, "body required")
return
}
if err := sess.gw.send(r.PathValue("id"), req.Body); err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "sent"})
}
// handleStream is the SSE endpoint: it joins the room, attaches to the session's
// fan-out hub, and streams each decrypted message as a `data:` event. For a
// persisted room the hub's underlying subscription delivers history first
// (scrollback) and then live messages; for an ephemeral room only live messages
// flow. The stream ends when the browser disconnects (ctx cancelled).
func (s *server) handleStream(w http.ResponseWriter, r *http.Request, sess *session) {
flusher, ok := w.(http.Flusher)
if !ok {
writeErr(w, http.StatusInternalServerError, "streaming unsupported")
return
}
ch, cleanup, err := sess.gw.openStream(r.PathValue("id"))
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
defer cleanup()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no") // disable proxy buffering (nginx/caddy)
w.WriteHeader(http.StatusOK)
// An initial comment opens the stream immediately so the browser's
// EventSource fires `onopen` without waiting for the first message.
_, _ = w.Write([]byte(": connected\n\n"))
flusher.Flush()
ctx := r.Context()
ping := time.NewTicker(25 * time.Second)
defer ping.Stop()
for {
select {
case <-ctx.Done():
return
case <-ping.C:
// Comment line keeps idle proxies from closing the connection.
if _, err := w.Write([]byte(": ping\n\n")); err != nil {
return
}
flusher.Flush()
case m := <-ch:
b, err := json.Marshal(m)
if err != nil {
continue
}
if _, err := w.Write([]byte("data: " + string(b) + "\n\n")); err != nil {
return
}
flusher.Flush()
}
}
}
// ---- SPA serving (optional) -----------------------------------------------
// spaHandler serves the built SPA from s.webDir. A request for an existing asset
// is served directly; any other path (a client-side route) falls back to
// index.html so the SPA router can take over. /api and /healthz are matched first.
func (s *server) spaHandler() http.Handler {
root := http.Dir(s.webDir)
fileServer := http.FileServer(root)
index := filepath.Join(s.webDir, "index.html")
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/")
if p == "" {
http.ServeFile(w, r, index)
return
}
if f, err := root.Open(p); err == nil {
_ = f.Close()
fileServer.ServeHTTP(w, r)
return
}
http.ServeFile(w, r, index) // unknown path -> SPA client-side routing
})
}
// ---- helpers --------------------------------------------------------------
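// newToken returns a 256-bit random session token, hex-encoded. A failing
// system RNG is fatal: a predictable token must never be issued.
func newToken() string {
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
panic("webgw: crypto/rand unavailable: " + err.Error())
}
return hex.EncodeToString(b)
}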
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
func writeErr(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]string{"error": msg})
}
// decode reads a JSON body into v, writing a 400 and returning false on failure.
func decode(w http.ResponseWriter, r *http.Request, v any) bool {
defer r.Body.Close()
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(v); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return false
}
return true
}
// statFile reports whether path exists and is a regular file (used to validate
// --web-dir at startup so a typo surfaces as a clear log line, not 404s later).
func statFile(path string) bool {
fi, err := os.Stat(path)
return err == nil && fi.Mode().IsRegular()
}
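The framing handleStream writes (": connected" and ": ping" comment lines, "data: <json>" events, each ended by a blank line) can be consumed from Go as well as from a browser EventSource, for instance in an integration test. This is a minimal reader sketch; readEvents and eventsFromString are hypothetical helpers, and the "body" field in the sample payload is an assumption because msgWire's fields are not shown here.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readEvents returns the payload of every `data:` event in an SSE stream.
// Comment lines (leading ':') are skipped and a blank line ends the current
// event. Lines longer than bufio.Scanner's default limit would need a larger
// buffer; that is left out of this sketch.
func readEvents(r io.Reader) ([]string, error) {
	var events, cur []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			if len(cur) > 0 {
				events = append(events, strings.Join(cur, "\n"))
				cur = nil
			}
		case strings.HasPrefix(line, ":"):
			// greeting or keep-alive comment: nothing to deliver
		case strings.HasPrefix(line, "data:"):
			payload := strings.TrimPrefix(line, "data:")
			cur = append(cur, strings.TrimPrefix(payload, " "))
		}
	}
	return events, sc.Err()
}

// eventsFromString is a convenience wrapper for feeding a captured stream.
func eventsFromString(s string) ([]string, error) {
	return readEvents(strings.NewReader(s))
}

func main() {
	stream := ": connected\n\n" +
		"data: {\"body\":\"hello\"}\n\n" +
		": ping\n\n" +
		"data: {\"body\":\"world\"}\n\n"
	events, err := eventsFromString(stream)
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		fmt.Println(e)
	}
}
```

Against a live gateway the reader would wrap the response body of GET /api/rooms/{id}/stream, sent with the session cookie attached.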
@@ -0,0 +1,146 @@
package main
import (
"encoding/hex"
"fmt"
"net/http"
"sync"
"time"
cs "fn-registry/functions/cybersecurity"
)
// session is one logged-in browser. In the wallet model each session carries the
// user's OWN bus identity: the browser unlocks its locally-encrypted private key
// and hands the full keypair to the gateway over TLS, and the gateway spins up a
// dedicated bus client (a *gateway) that acts AS that user. The private key lives
// only in this process's memory for the life of the session — it is never written
// to disk and is dropped when the session ends.
//
// A session may instead point at the shared operator gateway (the legacy
// passphrase login); `owned` distinguishes the two so logout only closes the bus
// client it created.
type session struct {
gw *gateway
owned bool // true => gw was built for this session and must be Closed on logout
handle string
issuedAt time.Time
}
// sessionStore is the gateway's set of live browser sessions, keyed by the opaque
// cookie token. It is independent of any single bus identity.
type sessionStore struct {
mu sync.Mutex
m map[string]*session
}
func newSessionStore() *sessionStore { return &sessionStore{m: map[string]*session{}} }
func (st *sessionStore) put(token string, s *session) {
st.mu.Lock()
st.m[token] = s
st.mu.Unlock()
}
func (st *sessionStore) get(token string) (*session, bool) {
st.mu.Lock()
defer st.mu.Unlock()
s, ok := st.m[token]
return s, ok
}
// drop removes a session and returns it so the caller can close an owned gateway.
func (st *sessionStore) drop(token string) (*session, bool) {
st.mu.Lock()
defer st.mu.Unlock()
s, ok := st.m[token]
if ok {
delete(st.m, token)
}
return s, ok
}
// closeAll closes every owned per-user gateway (used at shutdown). The shared
// operator gateway is owned by main and closed separately.
func (st *sessionStore) closeAll() {
st.mu.Lock()
defer st.mu.Unlock()
for tok, s := range st.m {
if s.owned && s.gw != nil {
_ = s.gw.Close()
}
delete(st.m, tok)
}
}
// identityFromHex builds a cs.Identity from the four hex halves the browser sends
// on POST /api/session. It enforces the exact key sizes (sign_pub 32, sign_priv
// 64, kex_pub 32, kex_priv 32) so a malformed body cannot produce a half-built
// identity that fails opaquely deep in the bus client.
func identityFromHex(signPub, signPriv, kexPub, kexPriv string) (cs.Identity, error) {
sp, err := hex.DecodeString(signPub)
if err != nil {
return cs.Identity{}, fmt.Errorf("sign_pub: %w", err)
}
spriv, err := hex.DecodeString(signPriv)
if err != nil {
return cs.Identity{}, fmt.Errorf("sign_priv: %w", err)
}
kp, err := hex.DecodeString(kexPub)
if err != nil {
return cs.Identity{}, fmt.Errorf("kex_pub: %w", err)
}
kpriv, err := hex.DecodeString(kexPriv)
if err != nil {
return cs.Identity{}, fmt.Errorf("kex_priv: %w", err)
}
if len(sp) != 32 || len(spriv) != 64 || len(kp) != 32 || len(kpriv) != 32 {
return cs.Identity{}, fmt.Errorf("wrong key sizes (sign_pub=%d sign_priv=%d kex_pub=%d kex_priv=%d; want 32/64/32/32)",
len(sp), len(spriv), len(kp), len(kpriv))
}
return cs.Identity{SignPub: sp, SignPriv: spriv, KexPub: kp, KexPriv: kpriv}, nil
}
// sessionReq is the POST /api/session body: the user's full wallet identity (hex)
// plus a display handle. The private halves arrive only over TLS and are held in
// memory for the session; they are never persisted server-side.
type sessionReq struct {
Handle string `json:"handle"`
SignPub string `json:"sign_pub"`
SignPriv string `json:"sign_priv"`
KexPub string `json:"kex_pub"`
KexPriv string `json:"kex_priv"`
}
// handleSession opens a per-user session. It builds the user's bus identity from
// the posted keypair, connects a dedicated bus client as that user, and issues a
// session cookie bound to it. This is the wallet-model replacement for the
// operator passphrase login.
func (s *server) handleSession(w http.ResponseWriter, r *http.Request) {
var req sessionReq
if !decode(w, r, &req) {
return
}
id, err := identityFromHex(req.SignPub, req.SignPriv, req.KexPub, req.KexPriv)
if err != nil {
writeErr(w, http.StatusBadRequest, "bad identity: "+err.Error())
return
}
cfg := s.busTemplate
cfg.Identity = id
gw, err := newGateway(cfg)
if err != nil {
writeErr(w, http.StatusBadGateway, "connect bus as user: "+err.Error())
return
}
tok := newToken()
s.sessions.put(tok, &session{gw: gw, owned: true, handle: req.Handle, issuedAt: time.Now()})
http.SetCookie(w, &http.Cookie{
Name: sessionCookie,
Value: tok,
Path: "/",
HttpOnly: true,
SameSite: http.SameSiteLaxMode,
})
writeJSON(w, http.StatusOK, meResp{Endpoint: gw.endpoint, SignPub: req.SignPub, Handle: req.Handle})
}
+114
@@ -0,0 +1,114 @@
package main
import (
"encoding/json"
"net/http/httptest"
"strings"
"testing"
)
// fixed wallet vector derived in the browser from the mnemonic
// "legal winner thank year wave sausage worth useful legal winner thank yellow"
// using the unibus-sign-v1 / unibus-kex-v1 HKDF scheme. Used to assert the Go
// side accepts the browser-derived key sizes.
const (
fixSignPub = "3d594317212e53a3685b305539f6789eb8c538579e350ca795278b180ebb53db"
fixSignPriv = "94485d66ac958e23546be2e3b7575a47e1264bdf082e09abb7ad02ab32fcd55e3d594317212e53a3685b305539f6789eb8c538579e350ca795278b180ebb53db"
fixKexPub = "f3561ca116e4444b8880b8c0a35f2c9e85804d8628006facd84b1a6146208257"
fixKexPriv = "f6ffdf15e5ee2af0494897ff43e61a06d632af425a0372cb53a7c3e0f84c2bb2"
)
func TestIdentityFromHex(t *testing.T) {
id, err := identityFromHex(fixSignPub, fixSignPriv, fixKexPub, fixKexPriv)
if err != nil {
t.Fatalf("identityFromHex valid vector: %v", err)
}
if len(id.SignPub) != 32 || len(id.SignPriv) != 64 || len(id.KexPub) != 32 || len(id.KexPriv) != 32 {
t.Fatalf("wrong sizes: %d/%d/%d/%d", len(id.SignPub), len(id.SignPriv), len(id.KexPub), len(id.KexPriv))
}
// Wrong sign_priv size (32 instead of 64) must be rejected.
if _, err := identityFromHex(fixSignPub, fixSignPub, fixKexPub, fixKexPriv); err == nil {
t.Fatalf("expected error for short sign_priv")
}
// Non-hex must be rejected.
if _, err := identityFromHex("zz", fixSignPriv, fixKexPub, fixKexPriv); err == nil {
t.Fatalf("expected error for non-hex sign_pub")
}
}
func TestValidHexKey(t *testing.T) {
if !validHexKey(fixSignPub) {
t.Fatalf("fixSignPub should be a valid 32-byte hex key")
}
if validHexKey("abcd") {
t.Fatalf("short key should be invalid")
}
if validHexKey(strings.Repeat("z", 64)) {
t.Fatalf("non-hex key should be invalid")
}
}
func TestNewRegistrarParsesMockTokens(t *testing.T) {
r := newRegistrar("", "demo=demo:member, bob=bob, alice=alice:admin")
if len(r.mockTokens) != 3 {
t.Fatalf("want 3 mock tokens, got %d", len(r.mockTokens))
}
if r.mockTokens["demo"].role != "member" || r.mockTokens["demo"].handle != "demo" {
t.Fatalf("demo token parsed wrong: %+v", r.mockTokens["demo"])
}
if r.mockTokens["bob"].role != "member" {
t.Fatalf("bob should default to role member, got %q", r.mockTokens["bob"].role)
}
if r.mockTokens["alice"].role != "admin" {
t.Fatalf("alice should be admin, got %q", r.mockTokens["alice"].role)
}
}
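The test above implies a `token=handle[:role]` format with comma-separated entries and a default role of `member`. A minimal parsing sketch under that assumption follows; `parseMockTokens` and `mockInvite` are hypothetical names, and the real `newRegistrar` may treat malformed entries differently.

```go
package main

import "strings"

// mockInvite is a hypothetical stand-in for the registrar's per-token record.
type mockInvite struct {
	handle string
	role   string
}

// parseMockTokens parses comma-separated "token=handle[:role]" entries.
// A missing or empty role defaults to "member"; entries without a token or
// a handle are skipped (an assumption, not confirmed by the commit).
func parseMockTokens(spec string) map[string]mockInvite {
	out := make(map[string]mockInvite)
	for _, entry := range strings.Split(spec, ",") {
		entry = strings.TrimSpace(entry)
		token, rest, ok := strings.Cut(entry, "=")
		if !ok || token == "" || rest == "" {
			continue
		}
		handle, role, hasRole := strings.Cut(rest, ":")
		if !hasRole || role == "" {
			role = "member"
		}
		out[token] = mockInvite{handle: handle, role: role}
	}
	return out
}
```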
// postRegister runs one POST /api/register against s and returns the status plus
// the decoded JSON body. The register path does not touch a gateway, so callers
// pass a server built with only a registrar.
func postRegister(t *testing.T, s *server, body string) (int, map[string]string) {
t.Helper()
req := httptest.NewRequest("POST", "/api/register", strings.NewReader(body))
w := httptest.NewRecorder()
s.handleRegister(w, req)
var m map[string]string
_ = json.Unmarshal(w.Body.Bytes(), &m)
return w.Code, m
}
func TestHandleRegisterMockSingleUse(t *testing.T) {
s := &server{registrar: newRegistrar("", "demo=demo:member")}
// 1) valid token + valid keys => 201 with the invite's handle/role.
code, body := postRegister(t, s, `{"token":"demo","sign_pub":"`+fixSignPub+`","kex_pub":"`+fixKexPub+`"}`)
if code != 201 {
t.Fatalf("first register: want 201, got %d (%v)", code, body)
}
if body["handle"] != "demo" || body["role"] != "member" {
t.Fatalf("first register body: %v", body)
}
// 2) same token again => 409 (single-use consumed).
code, _ = postRegister(t, s, `{"token":"demo","sign_pub":"`+fixSignPub+`","kex_pub":"`+fixKexPub+`"}`)
if code != 409 {
t.Fatalf("reused token: want 409, got %d", code)
}
}
func TestHandleRegisterValidation(t *testing.T) {
s := &server{registrar: newRegistrar("", "demo=demo:member")}
// bad sign_pub (too short) => 400
if code, _ := postRegister(t, s, `{"token":"demo","sign_pub":"abcd","kex_pub":"`+fixKexPub+`"}`); code != 400 {
t.Fatalf("short sign_pub: want 400, got %d", code)
}
// missing token => 400
if code, _ := postRegister(t, s, `{"sign_pub":"`+fixSignPub+`","kex_pub":"`+fixKexPub+`"}`); code != 400 {
t.Fatalf("missing token: want 400, got %d", code)
}
// unknown token with no mock match and no register-url => 400
if code, _ := postRegister(t, s, `{"token":"nope","sign_pub":"`+fixSignPub+`","kex_pub":"`+fixKexPub+`"}`); code != 400 {
t.Fatalf("unknown token: want 400, got %d", code)
}
}
+143 -39
@@ -5,9 +5,12 @@ This directory holds the material to bring up unibus as a **3-node cluster**
plane (rooms/members/keys/users on JetStream KV + the anti-replay nonce bucket)
survives the loss of any one node (quorum 2/3).
> **The agent that authored this never touched a VPS.** Every step that changes a
> remote host is marked **HUMAN** and is executed by the operator. `deploy-cluster.sh`
> defaults to a dry run.
> **Status: this cluster is DEPLOYED in production** (magnus + homer + datardos,
> R3, enforce+ACL+TLS) — see report 0011. The runbook below was authored before any
> VPS existed and has since been **corrected against the real deploy** (report 0012):
> the start ordering, the R1→R3 reality, and the live user-add path were all wrong
> or missing. Steps that change a remote host are marked **HUMAN**; `deploy-cluster.sh`
> still defaults to a dry run.
## Files
@@ -22,18 +25,22 @@ Generated keys/secrets (`out/`, `build/`, `secrets/`) are **gitignored** — the
secret and never leave the operator's trusted machine except over the secure
rsync channel.
## Topology
## Topology (as deployed, report 0011)
| Node | SSH | Public IP | WireGuard IP | Role |
|---|---|---|---|---|
| magnus | `magnus` | `<MAGNUS_PUBLIC_IP>` | `<MAGNUS_WG_IP>` | seed (first up) |
| homer | `homer` | `141.94.69.66` | `<HOMER_WG_IP>` | replica |
| datardos | `dd` | `51.91.100.142` | `<DATARDOS_WG_IP>` (10.21.0.x) | replica |
| Node | SSH | Public IP | Role |
|---|---|---|---|
| magnus | `magnus` (root) | `135.125.201.30` | node — **= organic-machine.com = `om`**, the critical host (caddy + gitea + registry-api + monitoring); the bus runs alongside, untouched |
| homer | `homer` (ubuntu+sudo) | `141.94.69.66` | node |
| datardos | `dd` (ubuntu+sudo) | `51.91.100.142` | node |
The route layer (server-to-server) prefers the **WireGuard mesh**
(`ROUTE_NETWORK=wg`); the client data plane and the HTTP control plane are reached
over the public IPs. The route CA is **separate** from the client CA, so a client
cert can never be presented to the route port.
`ROUTE_NETWORK=public`, **not `wg`**: there is no WireGuard mesh between the three
nodes (homer and datardos do not even have the `wg` binary; om's only WG peers are
the operator's PCs). The server-to-server routes therefore travel over the public
IPs, protected by the **separate cluster route CA** (mutual route TLS) — a client
data-plane cert can never be presented to the route port. The client data plane and
the HTTP control plane are also reached over the public IPs. There is no fixed
"seed" node: with R3 the three are peers (see "Bring up" for why a lone node cannot
self-serve).
## Prerequisites (HUMAN, once)
@@ -93,25 +100,48 @@ SEED
> The KV written here lives in `./local_files/jetstream`, which the cluster unit
> reuses (`--nats-store` default), so the admin is present when the enforce cluster
> starts. Additional users are added the same loopback way until a
> `user add --store kv` exists (see GAP in report 0009).
> starts. This loopback bootstrap is needed ONLY for the very first admin (the
> chicken-and-egg). **Every user after that is added with the cluster live** — no
> stop-seed-restart — via `user add --store kv` (see "Add users to the live
> cluster" below, report 0012).
## Bring up (HUMAN — staggered)
## Bring up (HUMAN)
Bring up the seed first, then the replicas one at a time, checking each joins.
> **CORRECTION (report 0012).** The original instruction — "start magnus alone and
> verify healthz, then add the others" — is **WRONG and will look like a hung
> deploy.** A 3-node JetStream cluster forms a RAFT meta-group that needs a quorum
> (2 of 3) to elect a leader. A single started node has no quorum, so its JetStream
> meta never becomes current: `--store kv` blocks creating the KV buckets and
> **`/healthz` never returns ok** until a second node joins. Waiting for magnus to
> "go green" before starting the others therefore deadlocks the rollout.
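The quorum arithmetic behind this correction can be written down in a few lines. This is an illustrative sketch of strict-majority quorum, not code from the repository; the function names are hypothetical.

```go
package main

// quorum returns the number of nodes a RAFT group of size n needs in order
// to elect a leader: a strict majority.
func quorum(n int) int { return n/2 + 1 }

// canElectLeader reports whether `up` running nodes out of `total` configured
// nodes are enough to form a quorum.
func canElectLeader(up, total int) bool { return up >= quorum(total) }

// tolerated returns how many nodes may fail while a quorum still survives.
func tolerated(n int) int { return n - quorum(n) }
```

For the 3-node cluster, `quorum(3)` is 2, so `canElectLeader(1, 3)` is false (the lone first node) and `tolerated(3)` is 1.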
Start the nodes so a quorum forms. On a **clean cluster** the simplest correct
procedure is to start all three close together and let the meta-group converge:
```bash
# 1. Seed node (after the seed step above).
ssh root@magnus 'systemctl enable --now membershipd-cluster'
ssh root@magnus 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt'
# Start all three (order does not matter); each blocks on the others until a
# 2/3 quorum elects a JetStream meta leader, then the KV buckets are created.
for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
# 2. Replicas, one at a time.
ssh root@homer 'systemctl enable --now membershipd-cluster'
ssh root@datardos 'systemctl enable --now membershipd-cluster'
# Only NOW does healthz return ok — once the meta-group has a leader (give it
# ~10-30s on a cold start). Poll, do not assume the first node is broken.
for h in magnus homer datardos; do
echo "== $h =="; ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt || echo "(not ready yet — needs quorum)"'
done
```
> Initial rollout runs at **R1** (`KV_REPLICAS=1` in `nodes.env`): the buckets live
> on the seed only. This is NOT HA yet — see "Scale to R3".
A **staggered** start also works, but only because `membershipd`'s KV open RETRIES
the bucket creation for a 120s bootstrap budget (issue 0006g, fix #3): the first
node sits in that retry loop — NOT serving healthz — until the second node makes a
quorum, then both converge and the third catches up. Either way, a lone node never
self-serves; do not gate the next node's start on the previous one's healthz.
> A cold multi-node start only converges because of **three cold-start fixes**
> (report 0011): route pooling off (`PoolSize=-1`), `NoAdvertise=true` (Docker
> bridge IPs not gossiped), and the KV-open retry loop above. Without them the
> meta-group re-elects leaders forever and bucket creation hangs. If a fresh
> cluster will not form, confirm the running binary contains these fixes before
> touching config.
## Promote an existing single-node (SQLite) deployment (HUMAN, optional)
@@ -137,11 +167,80 @@ ssh root@magnus 'nats --server nats://127.0.0.1:4250 server list' # 3 servers,
A healthy cluster shows 3 routed servers and a JetStream meta-group with a leader.
## Scale to R3 (HUMAN — real HA)
## Add users to the live cluster (HUMAN — `user add --store kv`)
Once all three nodes are up and routed, raise the replication factor of every
control-plane stream from 1 to 3 IN PLACE (no data loss), then flip `KV_REPLICAS=3`
in `nodes.env` so future (re)deploys keep it:
With the cluster up, add (and revoke) bus users **without stopping anything**,
directly against the replicated KV allowlist. This replaces the stop-seed-restart
procedure the original runbook implied for every user beyond the first admin.
The mechanism is the cluster's own **privileged internal connection**: under
`enforce` every bus user is confined by the per-subject ACL to its own rooms, so no
ordinary identity may write the control-plane buckets. The only identity the
authenticator grants full JetStream permissions is `membershipd`'s internal service
identity. The unit persists that identity to `${INTERNAL_ID_FILE}`
(`/opt/unibus/secrets/internal.id`, 0600) via `--internal-id-file`, so the same key
is available to the CLI. Run the CLI **on a node, over loopback** (the data-plane
TLS cert SAN covers `127.0.0.1`); reading the identity file requires root on that
node, which already implies full control of it, so this adds no practical exposure.
```bash
# Add a member to the live cluster's replicated allowlist (run on any node).
ssh root@magnus 'sudo /opt/unibus/membershipd user add --store kv \
--handle alice --role member --sign-pub <64-hex-ed25519-pub>'
# -> added user "alice" (...) role=member
# -> KV_UNIBUS_users: leader=<node> followers_current=2/2 msgs=N (replicated, HA)
# List / revoke against the same live KV:
ssh root@magnus 'sudo /opt/unibus/membershipd user list --store kv'
ssh root@magnus 'sudo /opt/unibus/membershipd user revoke --store kv <64-hex-ed25519-pub>'
```
Defaults assume an on-node invocation (`--nats-url nats://127.0.0.1:4250`,
`--internal-id-file /opt/unibus/secrets/internal.id`, `--ca /opt/unibus/tls/ca.crt`,
`--kv-replicas 3`). Semantics:
- **Idempotent / non-destructive**: re-adding the same key is an explicit
`already registered` error (exit 1), never a silent overwrite — a re-add cannot
flip a member to admin. To replace a user, `revoke` then add.
- **HA**: the write commits through the JetStream quorum, so it succeeds even with
one node down (2/3); the printed `followers_current` shows replication.
- **No hard delete**: `revoke` flips status to `revoked` (denied on both planes,
auditable); the KV has no row deletion, matching the SQLite store.
> **Rollout note (report 0012):** the live verification deployed this binary and
> `--internal-id-file` to **datardos only** (the non-critical node); magnus and
> homer still run the 0011 binary. Making the capability (and the updated unit)
> available on all three is recommended but not urgent, since the security posture
> is identical. To do so, roll the new binary with backups, one node at a time,
> verifying healthz between each:
> ```bash
> for h in homer magnus; do
> ssh "$h" 'sudo cp -a /opt/unibus/membershipd /opt/unibus/membershipd.bak' # backup
> scp build/membershipd "$h:/tmp/m" && ssh "$h" 'sudo install -o ubuntu -g ubuntu -m0775 /tmp/m /opt/unibus/membershipd'
> # add INTERNAL_ID_FILE=/opt/unibus/secrets/internal.id to /opt/unibus/cluster.env
> # add `--internal-id-file ${INTERNAL_ID_FILE} \` to the unit before `--store kv`
> ssh "$h" 'sudo systemctl daemon-reload && sudo systemctl restart membershipd-cluster'
> ssh "$h" 'curl -fsS https://127.0.0.1:8470/healthz --cacert /opt/unibus/tls/ca.crt' # green before next
> done
> ```
> (`deploy-cluster.sh` + the unit template already emit `INTERNAL_ID_FILE` and the
> flag, so a fresh `./deploy-cluster.sh --yes` is correct for all three.)
## Replication: go straight to R3 (HUMAN — real HA)
> **CORRECTION (report 0012).** The original "start at R1, then scale to R3" plan
> assumed R1 is a usable interim state. **It is not, in this cluster.** At R1 all six
> control-plane buckets (`KV_UNIBUS_users/rooms/members/room_keys/rooms_by_member`
> + `KV_UNIBUS_nonces`) live on a SINGLE node — a hard **SPOF for authentication**:
> if that node dies, the nonce/KV control plane is unreachable and EVERY
> authenticated request fails closed (auth DoS). Worse, the cold multi-node start
> only converges at all because of the three cold-start fixes (see "Bring up"); the
> real deploy never ran a healthy R1 and **jumped straight to R3 once the cluster
> formed.** Treat R1 as a transient artifact of bucket creation, not a milestone.
The deployed config already sets `KV_REPLICAS=3` in `nodes.env`. If buckets were
created at R1 (e.g. only one node was up when `--store kv` first opened them), raise
every control-plane stream to R3 IN PLACE (no data loss) once all three nodes are
routed:
```bash
for s in KV_UNIBUS_users KV_UNIBUS_rooms KV_UNIBUS_members KV_UNIBUS_room_keys \
@@ -151,27 +250,32 @@ done
# (also OBJ_UNIBUS_blobs if the object store is in use)
```
Until this is done, R1 means the seed node is a **single point of failure for
authentication**: if it dies, the nonce/KV control plane is unreachable and every
authenticated request fails closed (auth DoS). R1 is a rollout step, not HA.
After this each bucket shows `followers_current=2/2` (quorum 2/3). The
`user add --store kv` command prints that figure for `KV_UNIBUS_users` on every add,
which is a cheap live HA check.
## Chaos test (HUMAN — requires the 3 live VPS; NOT run here)
## Chaos test (HUMAN — requires the 3 live VPS)
Validate quorum tolerance after R3:
```bash
# Kill one node; the cluster keeps serving (quorum 2/3).
ssh root@datardos 'systemctl stop membershipd-cluster'
# Kill one node; the cluster keeps serving (quorum 2/3). On ubuntu nodes use sudo.
ssh dd 'sudo systemctl stop membershipd-cluster'
# -> clients fail over (multiple seed URLs); reads/writes still succeed.
ssh root@datardos 'systemctl start membershipd-cluster' # rejoins, catches up
ssh dd 'sudo systemctl start membershipd-cluster' # rejoins, catches up
# Kill two nodes; quorum is LOST — the control plane should fail CLOSED (deny),
# never fail open. Verify a request is rejected, not silently served.
```
This network-level chaos test (kill 1/3, kill 2/3, partition/split-brain) is part
of the deploy validation (issue 0003f) and runs against the real VPS — it is
deliberately out of scope for the authoring agent.
> **Validated (report 0012).** The 0011 chaos run checked only the control plane
> (healthz + meta/stream-leader failover + KV readable with 2/3). Report 0012 added
> the missing data-plane proofs against the live cluster: a real authenticated
> client (`cmd/clientcheck`, operator identity, nkey+TLS) creating an E2E room and
> publishing/subscribing — including a node stopped mid-stream, where the client
> failed over to a survivor and kept receiving with zero loss (quorum 2/3) — and
> `user add --store kv` committing with one node (the KV leader) down. The kill-2/3
> fail-closed case remains a documented manual step.
## Rollback
+12 -8
@@ -97,6 +97,7 @@ TLS_KEY=${REMOTE_DIR}/tls/server-${name}.key
ROUTE_TLS_CERT=${REMOTE_DIR}/tls/route-${name}.crt
ROUTE_TLS_KEY=${REMOTE_DIR}/tls/route-${name}.key
ROUTE_TLS_CA=${REMOTE_DIR}/tls/cluster-ca.crt
INTERNAL_ID_FILE=${REMOTE_DIR}/secrets/internal.id
EOF
run ssh "$target" "mkdir -p ${REMOTE_DIR}/tls ${REMOTE_DIR}/secrets"
@@ -114,13 +115,16 @@ if [[ $APPLY -eq 0 ]]; then
fi
cat <<'NEXT'
HUMAN — staggered start (do NOT enable all at once; see README "Bring up"):
1. Seed node first (e.g. magnus):
ssh root@magnus 'systemctl enable --now membershipd-cluster'
ssh root@magnus '/opt/unibus/membershipd user add --admin ...' # seed admin
2. Then the other two, one at a time, checking quorum after each:
ssh root@homer 'systemctl enable --now membershipd-cluster'
ssh root@datardos 'systemctl enable --now membershipd-cluster'
HUMAN — bring up (see README "Bring up" — a LONE node has no quorum and never
serves healthz, so do NOT gate the next node on the previous one going green):
1. Seed the FIRST admin into the KV via the loopback bootstrap (README
"Seed the first admin"); this is needed only for the chicken-and-egg admin.
2. Start all three so a 2/3 quorum forms (order does not matter); healthz
turns ok only once the meta-group elects a leader (~10-30s cold):
for h in magnus homer datardos; do ssh "$h" 'sudo systemctl enable --now membershipd-cluster'; done
3. Verify posture + quorum (README "Verify").
4. Scale replicas 1 -> 3 once all three are up (README "Scale to R3").
4. Ensure R3 on every control-plane stream (README "Replication: go straight to
R3"); R1 is a SPOF, not a milestone.
5. Add further users with the cluster LIVE — no restart — via
`membershipd user add --store kv` (README "Add users to the live cluster").
NEXT
@@ -33,6 +33,7 @@ ExecStart=/opt/unibus/membershipd \
--route-tls-cert ${ROUTE_TLS_CERT} \
--route-tls-key ${ROUTE_TLS_KEY} \
--route-tls-ca ${ROUTE_TLS_CA} \
--internal-id-file ${INTERNAL_ID_FILE} \
--store kv \
--kv-replicas ${KV_REPLICAS}
# Restart=always (NOT on-failure): a clean SIGTERM exits success, and on-failure
+21 -8
@@ -2,10 +2,10 @@
#
# This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh.
#
# HUMAN: fill in every <PLACEHOLDER> with the real value before running the
# HUMAN: fill in every placeholder with the real value before running the
# scripts. The public IPs known at authoring time are pre-filled; the WireGuard
# mesh IPs and magnus's public IP must be supplied. The scripts refuse to run
# while any <PLACEHOLDER> remains.
# while any unfilled placeholder remains.
# Cluster identity (must be identical on every node).
CLUSTER_NAME="unibus"
@@ -16,7 +16,7 @@ CLUSTER_USER="unibus-cluster"
# KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise
# to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only
# set this to 3 here after the third node is up and you re-run the KV update.
KV_REPLICAS=1
KV_REPLICAS=3
# Ports (same on every node; the route port is server-to-server only).
NATS_CLIENT_PORT=4250
@@ -30,15 +30,28 @@ SSH_USER="root"
# Which address family the inter-node routes use. "wg" builds --routes from the
# WireGuard mesh IPs (private server-to-server links, preferred); "public" uses
# the public IPs. The route layer is always mutual-TLS regardless.
ROUTE_NETWORK="wg"
#
# DEPLOY DECISION (2026-06-07): set to "public". No WireGuard mesh exists between
# the three cluster nodes — homer and datardos do not even have the `wg` binary
# installed, and om's only WG peers are the operator's personal PCs, not the VPS.
# Rather than stand up a fresh mesh blindly, the routes go over the public IPs,
# still protected by the separate cluster route CA (mutual-TLS). On magnus (the
# only node with ufw active) the route port 6250 is restricted to the homer and
# datardos public IPs; homer/datardos run ufw inactive (Docker hosts) and rely on
# the route mutual-TLS for 6250.
ROUTE_NETWORK="public"
# One row per node: NAME SSH_HOST PUBLIC_IP WG_IP
# NAME -> --server-name and the per-node cert filenames (unique).
# SSH_HOST -> the `ssh <SSH_HOST>` alias (see ~/.ssh/config).
# SSH_HOST -> the `ssh ALIAS` alias (see ~/.ssh/config).
# PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane).
# WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg.
# NOTE: with ROUTE_NETWORK=public and no WireGuard mesh, the WG_IP column is set to
# each node's public IP so the cert SAN covers the address actually used by the
# public routes and no unfilled placeholder remains (scripts refuse to run otherwise).
# magnus == organic-machine.com == om (135.125.201.30); SSH alias `magnus` enters as root.
CLUSTER_NODES=(
"magnus magnus <MAGNUS_PUBLIC_IP> <MAGNUS_WG_IP>"
"homer homer 141.94.69.66 <HOMER_WG_IP>"
"datardos dd 51.91.100.142 <DATARDOS_WG_IP>"
"magnus magnus 135.125.201.30 135.125.201.30"
"homer homer 141.94.69.66 141.94.69.66"
"datardos dd 51.91.100.142 51.91.100.142"
)
@@ -0,0 +1,78 @@
---
issue: 0007
title: At-rest encryption of the control plane (JetStream KV / SQLite on disk)
status: spec
created: 2026-06-07
domain: security
scope: unibus (pkg/embeddednats, cmd/membershipd, deploy/cluster) + migration procedure for the existing store
---
# Goal
Encrypt the control-plane storage at rest so that a compromised node
(root on the VPS) or a stolen disk does not expose the control metadata in the clear.
Current state (audited on 2026-06-07, report 0012 and later):
- **Message content**: end-to-end encrypted per room (megolm/olm). The server never sees the
plaintext, and it does not live in the control plane. **Not the subject of this issue.**
- **Room keys** (`UNIBUS_room_keys`): stored **sealed** (X25519 sealed box, encrypted
for each member). The server stores and distributes them but cannot open them. **Already protected.**
- **Control metadata** (`UNIBUS_rooms`, `UNIBUS_members`, `UNIBUS_rooms_by_member`,
`UNIBUS_users`): serialized with `json.Marshal` and written **in the clear** to the store. In
a cluster that store is each node's `local_files/jetstream/` directory; in single-node mode it is the
SQLite file `local_files/unibus.db`. Today there is **no at-rest encryption**: with root on a node
one can read room subjects, membership (who is in which room with which role), user handles
and roles, and the public keys (signPub/kexPub). Messages are not exposed (E2E) and rooms
cannot be decrypted (sealed keys), but the entire topology is.
After this issue, the control-plane buckets/files are encrypted on disk with a per-node key
managed outside git. The threat model moves from "root on the node sees the topology" to "root
on the node also needs the at-rest key (which can live in a separate secret / TPM / injected
environment variable) to read anything".
# Technical context
- NATS Server / JetStream supports native **encryption at rest**: a cipher
(`aes` or `chacha20`) and a key are configured, and JetStream encrypts the stream/KV files on disk. The
bus uses an **embedded** NATS (`pkg/embeddednats`), so it is enabled through the embedded
server's options, not through an external `nats-server.conf`.
- For the SQLite backend (single-node) the equivalent would be SQLCipher or file/FS-level
encryption; it remains a lower-priority sub-task because the real deployment is the cluster (KV).
# Tasks
1. Confirm the encryption-at-rest API of the embedded NATS in the version in use (server
option for cipher + key; how to pass the key so that it ends up neither in argv nor in git).
2. Enable encryption in `pkg/embeddednats` behind a configuration option. The key is
injected from a file (`--jetstream-encryption-key-file`, 0600, next to the node's TLS keys)
or an environment variable set by the systemd unit; never in argv and never committed.
3. `cmd/membershipd`: flag/env for the key + reflect the state in the posture published at
`/healthz` (e.g. `"at_rest":true`) so the monitor can verify it.
4. `deploy/cluster`: provision the per-node at-rest key (generation + `pass`/gitignored secrets)
and wire it into `cluster.env` + the unit. Document it in the runbook.
5. **Migration of the existing store** (critical gotcha): JetStream does not retroactively re-encrypt
data already written in the clear. Design and document the safe procedure for the production
cluster (likely: backup → export a snapshot of the control plane → stop the node → recreate the
store with the key active → re-import; or a node-by-node rotation that leans on R3 replication).
Follow the migrations rule (additive, no data loss).
6. Tests: start a node with an at-rest key, write a user/room, and verify that the file on
disk does **not** contain a known subject/handle in the clear (negative grep), and that the node can
still read them with the key. Verify that the store does not open without the key.
# Definition of Done
- At-rest encryption active on all 3 cluster nodes; `/healthz` reflects it in the posture.
- Executable evidence: a known value (room subject / user handle) does **not** appear in
the clear when running `grep` over `local_files/jetstream/`; the node still serves it with the key.
- Migration procedure tested on real data with no loss (snapshot/restore verified).
- The at-rest key is never in git or in argv; it lives in a 0600 file / injected secret.
- No other security layer is weakened (enforce + ACL + TLS + E2E + sealed keys intact).
# Notes
Additive and orthogonal to the rest of the security model: TLS protects data in transit, E2E protects
the content, and room keys are sealed; this issue closes the last gap (control metadata in the clear on
disk) for the "compromised VPS / stolen disk" threat model. Medium priority: the deployment is already
secure against network attacks (enforce+TLS+ACL); this hardens it against physical/root compromise of
the host. Related to the hardening in issues 0004/0005/0006.
+70
@@ -456,6 +456,23 @@ type memberRoomJSON struct {
Role string `json:"role"`
}
// userJSON mirrors the server's wire type on the admin user-management endpoints.
type userJSON struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
Status string `json:"status"`
CreatedAt string `json:"created_at"`
RevokedAt string `json:"revoked_at,omitempty"`
}
// addUserReq is the POST /users body (mirror of the server type).
type addUserReq struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
}
// ---- room operations ------------------------------------------------------
// RoomRef is a room this peer belongs to, returned by ListMyRooms. It is the
@@ -490,6 +507,59 @@ func (c *Client) ListMyRooms() ([]RoomRef, error) {
return out, nil
}
// ---- user administration (admin-only) ------------------------------------
// UserInfo is a bus user as returned by the admin user-management endpoints. It
// is a flat view (no nested types) for the admin panel: the signing key
// (lowercase hex), handle, role ("admin"|"member"), status ("active"|"revoked"),
// and timestamps. RevokedAt is empty for an active user.
type UserInfo struct {
SignPub string
Handle string
Role string
Status string
CreatedAt string
RevokedAt string
}
// ListUsers returns the full bus allowlist, including revoked users. The caller
// must be signing as an admin: a non-admin signer is rejected by the server with
// 403, surfaced here as an error.
func (c *Client) ListUsers() ([]UserInfo, error) {
var resp []userJSON
if err := c.doJSON("GET", "/users", nil, &resp); err != nil {
return nil, err
}
out := make([]UserInfo, 0, len(resp))
for _, u := range resp {
out = append(out, UserInfo{
SignPub: u.SignPub,
Handle: u.Handle,
Role: u.Role,
Status: u.Status,
CreatedAt: u.CreatedAt,
RevokedAt: u.RevokedAt,
})
}
return out, nil
}
// AddUser registers a bus user from their Ed25519 signing public key (64-hex).
// role is "admin" or "member" (empty defaults to member, matching the server).
// The caller must be signing as an admin. Re-adding an already-registered key
// returns an error (the server replies 409 and leaves the existing row
// untouched — no silent role/status change).
func (c *Client) AddUser(signPub, handle, role string) error {
return c.doJSON("POST", "/users", addUserReq{SignPub: signPub, Handle: handle, Role: role}, nil)
}
// RevokeUser revokes a bus user by their signing public key (64-hex). Revocation
// is a status flip (no hard delete): the identity stays auditable and is denied
// on both planes immediately. The caller must be signing as an admin.
func (c *Client) RevokeUser(signPub string) error {
return c.doJSON("POST", "/users/"+signPub+"/revoke", nil, nil)
}
// newRoomKey returns 32 random bytes for a symmetric room key.
func newRoomKey() ([]byte, error) {
k := make([]byte, 32)
+21 -5
@@ -33,11 +33,17 @@ type identityFile struct {
KexPriv string `json:"kex_priv"`
}
// LoadOrCreateIdentity loads the identity at path, or generates and persists a
// new one if the file does not exist. The file is written with 0600
// permissions because it holds private keys.
func LoadOrCreateIdentity(path string) (cs.Identity, error) {
if data, err := os.ReadFile(path); err == nil {
// LoadIdentity loads an existing identity from path. Unlike LoadOrCreateIdentity
// it NEVER creates one: a missing or unreadable file is an error. It is for
// callers that must consume a specific, pre-provisioned identity rather than mint
// a fresh one — for example membershipd's persisted internal service identity,
// which `membershipd user add --store kv` reads to present the privileged nkey
// the cluster authenticator recognizes.
func LoadIdentity(path string) (cs.Identity, error) {
data, err := os.ReadFile(path)
if err != nil {
return cs.Identity{}, fmt.Errorf("client: read identity %q: %w", path, err)
}
var f identityFile
if err := json.Unmarshal(data, &f); err != nil {
return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err)
@@ -49,6 +55,16 @@ func LoadOrCreateIdentity(path string) (cs.Identity, error) {
return id, nil
}
// LoadOrCreateIdentity loads the identity at path, or generates and persists a
// new one if the file does not exist. The file is written with 0600
// permissions because it holds private keys. A file that exists but is
// unreadable or corrupt is an error (NOT silently regenerated), so a damaged
// identity surfaces instead of minting a new key that cannot decrypt old data.
func LoadOrCreateIdentity(path string) (cs.Identity, error) {
if _, statErr := os.Stat(path); statErr == nil {
return LoadIdentity(path)
}
id, err := cs.GenerateIdentity()
if err != nil {
return cs.Identity{}, fmt.Errorf("client: generate identity: %w", err)
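The load-versus-create split above can be sketched in isolation. This is a minimal illustration, not the package's code: `keyFile`, `loadKey`, `loadOrCreateKey`, and `runDemo` are hypothetical names, and a random hex string stands in for `cs.Identity`. One deliberate difference from the diff: the sketch also returns an error when `os.Stat` fails for a reason other than "file does not exist", whereas the diff only checks `statErr == nil`.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// keyFile is a hypothetical stand-in for the on-disk identity format.
type keyFile struct {
	Secret string `json:"secret"`
}

// loadKey never creates a key: a missing, unreadable, or corrupt file is an error.
func loadKey(path string) (string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return "", fmt.Errorf("read key %q: %w", path, err)
	}
	var f keyFile
	if err := json.Unmarshal(data, &f); err != nil {
		return "", fmt.Errorf("parse key %q: %w", path, err)
	}
	if f.Secret == "" {
		return "", fmt.Errorf("parse key %q: empty secret", path)
	}
	return f.Secret, nil
}

// loadOrCreateKey mints a new key only when the file is definitely absent.
// An existing file is always handed to loadKey, so damage surfaces as an error.
func loadOrCreateKey(path string) (string, error) {
	_, statErr := os.Stat(path)
	if statErr == nil {
		return loadKey(path)
	}
	// Assumption of this sketch (not in the diff): other stat failures are errors.
	if !errors.Is(statErr, fs.ErrNotExist) {
		return "", fmt.Errorf("stat key %q: %w", path, statErr)
	}
	raw := make([]byte, 32)
	if _, err := rand.Read(raw); err != nil {
		return "", fmt.Errorf("generate key: %w", err)
	}
	f := keyFile{Secret: hex.EncodeToString(raw)}
	data, err := json.Marshal(f)
	if err != nil {
		return "", fmt.Errorf("encode key: %w", err)
	}
	// 0600 because the file holds private material.
	if err := os.WriteFile(path, data, 0o600); err != nil {
		return "", fmt.Errorf("write key %q: %w", path, err)
	}
	return f.Secret, nil
}

// runDemo creates a key, reloads it, then corrupts the file and loads again.
func runDemo() (created, reloaded string, corruptErr error) {
	dir, err := os.MkdirTemp("", "identity-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "identity.json")
	if created, err = loadOrCreateKey(path); err != nil {
		panic(err)
	}
	if reloaded, err = loadOrCreateKey(path); err != nil {
		panic(err)
	}
	if err := os.WriteFile(path, []byte("{not json"), 0o600); err != nil {
		panic(err)
	}
	_, corruptErr = loadOrCreateKey(path)
	return created, reloaded, corruptErr
}

func main() {
	created, reloaded, corruptErr := runDemo()
	fmt.Println("same key on second load:", created == reloaded)
	fmt.Println("corrupt file error:", corruptErr)
}
```

The point of the split is that a damaged file is reported rather than replaced, so a key that cannot decrypt old data is never minted silently.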
+99
@@ -0,0 +1,99 @@
package client_test
import (
"encoding/hex"
"strings"
"testing"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/membership"
)
// findUserInfo returns the row with the given signing key (case-insensitive).
func findUserInfo(users []client.UserInfo, signPub string) (client.UserInfo, bool) {
want := strings.ToLower(signPub)
for _, u := range users {
if strings.ToLower(u.SignPub) == want {
return u, true
}
}
return client.UserInfo{}, false
}
// TestClientUsersAdminAPI drives the admin user-management API through the real
// pkg/client methods against an in-process membershipd under enforce: an admin
// client adds a user, lists it, revokes it, and sees the status flip — and a
// non-admin client is denied. This is the path the admin panel uses, so it locks
// the client/server contract the panel depends on.
func TestClientUsersAdminAPI(t *testing.T) {
h := newHarnessMode(t, membership.AuthEnforce)
waitHealth(t, h.ctrlURL)
admin, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect admin: %v", err)
}
defer admin.Close()
registerClient(t, h, admin, "admin", membership.RoleAdmin)
member, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect member: %v", err)
}
defer member.Close()
registerClient(t, h, member, "member", membership.RoleMember)
// A brand-new identity the admin will register over HTTP.
carol := mustIdentity(t)
carolPub := hex.EncodeToString(carol.SignPub)
// Admin adds carol as a member.
if err := admin.AddUser(carolPub, "carol", membership.RoleMember); err != nil {
t.Fatalf("admin AddUser: %v", err)
}
// Admin lists: carol present and active.
users, err := admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers: %v", err)
}
row, ok := findUserInfo(users, carolPub)
if !ok {
t.Fatalf("carol missing from list after add: %+v", users)
}
if row.Status != membership.StatusActive || row.Role != membership.RoleMember {
t.Fatalf("carol row wrong after add: %+v", row)
}
// Re-adding the same key is a conflict surfaced as an error (no silent upsert).
if err := admin.AddUser(carolPub, "carol-again", membership.RoleAdmin); err == nil {
t.Fatalf("re-adding carol should error (409), got nil")
}
// Admin revokes carol; list shows the status flip (no hard delete).
if err := admin.RevokeUser(carolPub); err != nil {
t.Fatalf("admin RevokeUser: %v", err)
}
users, err = admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers after revoke: %v", err)
}
row, ok = findUserInfo(users, carolPub)
if !ok {
t.Fatalf("carol vanished after revoke (should be a status flip): %+v", users)
}
if row.Status != membership.StatusRevoked {
t.Fatalf("carol should be revoked, got status %q", row.Status)
}
// A non-admin (member) is denied on every user-management method.
if _, err := member.ListUsers(); err == nil {
t.Fatalf("non-admin ListUsers should error (403), got nil")
}
if err := member.AddUser(carolPub, "x", membership.RoleMember); err == nil {
t.Fatalf("non-admin AddUser should error (403), got nil")
}
if err := member.RevokeUser(carolPub); err == nil {
t.Fatalf("non-admin RevokeUser should error (403), got nil")
}
}
+37 -1
@@ -9,6 +9,7 @@ import (
"crypto/tls"
"fmt"
"net/url"
"os"
"time"
server "github.com/nats-io/nats-server/v2/server"
@@ -106,6 +107,13 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication)
// blocks until the server is ready to accept connections (up to 5s) and returns
// the running server; the caller must Shutdown it.
func StartServer(cfg ServerConfig) (*server.Server, error) {
// Diagnostic toggle: UNIBUS_NATS_DEBUG=1 enables the embedded nats-server's own
// logger (route/RAFT/JetStream errors), which is otherwise silenced; =2 also
// enables protocol tracing. Any other value leaves logging off, so production
// behavior is unchanged. Set it only when debugging the cluster route layer.
debugLevel := os.Getenv("UNIBUS_NATS_DEBUG")
debugNATS := debugLevel == "1" || debugLevel == "2"
traceNATS := debugLevel == "2"
opts := &server.Options{
JetStream: true,
StoreDir: cfg.StoreDir,
@@ -114,9 +122,18 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
ServerName: cfg.ServerName,
DontListen: false,
// Keep the embedded server quiet by default; the host app logs the URLs.
NoLog: true,
NoLog: !debugNATS,
Debug: debugNATS,
Trace: traceNATS,
Logtime: true,
NoSigs: true,
}
if debugNATS {
// Expose the nats-server monitoring endpoint (loopback) so the operator can
// inspect /jsz, /routez, /varz while debugging the cluster meta-group.
opts.HTTPHost = "127.0.0.1"
opts.HTTPPort = 8222
}
if cfg.Auth != nil {
opts.CustomClientAuthentication = cfg.Auth
// A CustomClientAuthentication alone does not make the server advertise a
@@ -141,6 +158,10 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
return nil, fmt.Errorf("embeddednats: new server: %w", err)
}
if debugNATS {
ns.ConfigureLogger()
}
go ns.Start()
if !ns.ReadyForConnections(5 * time.Second) {
@@ -162,6 +183,21 @@ func applyClusterOpts(opts *server.Options, c *ClusterConfig) error {
Port: c.Port,
Username: c.Username,
Password: c.Password,
// Disable route connection pooling (nats-server 2.10+ defaults to a pool of
// 3 connections per peer). On a small cluster the pool churns with
// "duplicate route"/"client closed" reconnects that interrupt the meta-group
// RAFT heartbeats, causing perpetual leader re-elections so the JetStream
// meta never becomes current and stream/KV creation hangs (issue 0006g).
// PoolSize=-1 forces the classic single route per peer, which is stable for
// the 3-node unibus cluster.
PoolSize: -1,
// NoAdvertise stops the server from gossiping its locally-discovered IPs to
// peers. The cluster nodes are Docker hosts, so without this NATS advertises
// the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers
// then try to dial those private, mutually-unreachable IPs, churning the
// route layer and destabilizing the JetStream meta-group. With NoAdvertise
// the nodes use ONLY the explicit public-IP routes we configure (issue 0006g).
NoAdvertise: true,
}
if c.TLS != nil {
opts.Cluster.TLSConfig = c.TLS
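The mapping from `UNIBUS_NATS_DEBUG` to the three logging options in the hunk above can be checked on its own. The sketch below is illustrative only: `logFlags` and `parseDebugLevel` are hypothetical names, and the struct merely mirrors the `NoLog`, `Debug`, and `Trace` fields set in the diff rather than using `server.Options`.

```go
package main

import (
	"fmt"
	"os"
)

// logFlags mirrors the three logging fields the diff sets on server.Options.
// It is a hypothetical stand-in so the sketch needs no nats-server import.
type logFlags struct {
	NoLog bool
	Debug bool
	Trace bool
}

// parseDebugLevel applies the same rule as the diff: "1" turns on debug
// logging, "2" turns on debug plus trace, anything else keeps the server quiet.
func parseDebugLevel(level string) logFlags {
	debug := level == "1" || level == "2"
	return logFlags{
		NoLog: !debug,
		Debug: debug,
		Trace: level == "2",
	}
}

func main() {
	for _, lvl := range []string{"", "1", "2", "verbose"} {
		fmt.Printf("%q -> %+v\n", lvl, parseDebugLevel(lvl))
	}
	// The real code reads the level from the environment.
	fmt.Printf("env -> %+v\n", parseDebugLevel(os.Getenv("UNIBUS_NATS_DEBUG")))
}
```

Keeping the parse in one small pure function makes the "off unless explicitly 1 or 2" default easy to test without starting a server.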
+28 -5
@@ -85,8 +85,18 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
if opTimeout <= 0 {
opTimeout = defaultKVOpTime
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// Bootstrap budget for creating/opening the buckets. On a single node JetStream
// is ready the instant the server starts, so the first attempt succeeds. On a
// COLD multi-node cluster the JetStream meta-group must first elect a leader and
// each node must establish contact with it before its $JS.API responds. A KV
// op is a NATS request/reply: if it is published before the node's JetStream is
// ready the request is dropped (not queued), and a single long-context call then
// just blocks until it times out (issue 0006g). So we RETRY each bucket op with
// short per-attempt contexts until it succeeds or the overall bootstrap budget
// is exhausted; once the cluster is ready the next retry lands and the buckets
// are created, after which they persist and every node opens them quickly.
bootstrapBudget := 120 * time.Second
deadline := time.Now().Add(bootstrapBudget)
s := &jetstreamStore{opTimeout: opTimeout}
for _, b := range []struct {
@@ -99,14 +109,27 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
{bucketRoomKeys, &s.keys},
{bucketUsers, &s.users},
} {
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
var kv jetstream.KeyValue
var lastErr error
for {
opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{
Bucket: b.name,
Replicas: cfg.Replicas,
History: 1,
Storage: jetstream.FileStorage,
})
if err != nil {
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err)
cancel()
if lastErr == nil {
break
}
if time.Now().After(deadline) {
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr)
}
// JetStream not ready yet (no meta leader / request dropped). Wait and
// re-publish the op; in a cluster cold start this lands once the meta
// group settles.
time.Sleep(1 * time.Second)
}
*b.dst = kv
}
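The retry shape used for the bucket bootstrap (short per-attempt contexts inside an overall budget) can be sketched without JetStream. This is a minimal illustration under stated assumptions: `retryUntil`, `flakyOp`, `errNotReady`, and `runRetryDemo` are hypothetical names, and the fake operation simply fails a fixed number of times in place of `CreateOrUpdateKeyValue`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryUntil runs op with a fresh per-attempt context until it succeeds or the
// overall budget is spent. It returns the number of attempts made.
func retryUntil(budget, perAttempt, pause time.Duration, op func(ctx context.Context) error) (int, error) {
	deadline := time.Now().Add(budget)
	attempts := 0
	for {
		attempts++
		ctx, cancel := context.WithTimeout(context.Background(), perAttempt)
		err := op(ctx)
		cancel() // release the attempt's timer right away, not via defer in a loop
		if err == nil {
			return attempts, nil
		}
		if time.Now().After(deadline) {
			return attempts, fmt.Errorf("gave up after %s (%d attempts): %w", budget, attempts, err)
		}
		time.Sleep(pause)
	}
}

var errNotReady = errors.New("not ready")

// flakyOp returns an operation that fails `failures` times, then succeeds.
func flakyOp(failures int) func(context.Context) error {
	calls := 0
	return func(ctx context.Context) error {
		calls++
		if calls <= failures {
			return errNotReady
		}
		return ctx.Err() // nil while the attempt's context is still live
	}
}

// runRetryDemo exercises one case that recovers and one that exhausts its budget.
func runRetryDemo() (attempts int, okErr, failErr error) {
	attempts, okErr = retryUntil(time.Second, 50*time.Millisecond, time.Millisecond, flakyOp(2))
	_, failErr = retryUntil(20*time.Millisecond, 10*time.Millisecond, 5*time.Millisecond, flakyOp(1<<30))
	return attempts, okErr, failErr
}

func main() {
	attempts, okErr, failErr := runRetryDemo()
	fmt.Println("recovered after attempts:", attempts, "err:", okErr)
	fmt.Println("exhausted budget:", failErr)
}
```

Calling `cancel()` explicitly inside the loop, as the diff does, avoids stacking deferred cancels across attempts; the last error is wrapped so callers can still inspect it with `errors.Is`.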
+173 -7
@@ -213,9 +213,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
return
}
// Carry the authenticated signer's endpoint into the handler so room handlers
// can authorize by membership (audit H3). Only set on a verified identity.
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint)))
// Carry the authenticated signer's endpoint AND signing key into the handler.
// Room handlers authorize by membership via the endpoint (audit H3); the
// user-management handlers authorize by role via the signing key (the endpoint
// id is a one-way hash of the key, so it cannot be reversed to look the signer
// up in the user allowlist). Both are set only on a verified identity.
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint, res.pubHex)))
}
// isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader
@@ -229,11 +232,19 @@ func isBodyTooLarge(err error) bool {
// values cannot collide with keys set by other packages.
type ctxKey int
const ctxSignerEndpoint ctxKey = iota
const (
ctxSignerEndpoint ctxKey = iota
ctxSignerPub
)
// withSigner returns a context carrying the authenticated signer's endpoint id.
func withSigner(ctx context.Context, endpoint string) context.Context {
return context.WithValue(ctx, ctxSignerEndpoint, endpoint)
// withSigner returns a context carrying the authenticated signer's endpoint id
// and signing public key (lowercase hex). The endpoint authorizes room
// membership; the signing key authorizes user-management by role, because the
// endpoint id is a one-way hash of the key (base64url(sha256(signPub))) and so
// cannot be reversed to look the signer up in the user allowlist.
func withSigner(ctx context.Context, endpoint, pubHex string) context.Context {
ctx = context.WithValue(ctx, ctxSignerEndpoint, endpoint)
return context.WithValue(ctx, ctxSignerPub, pubHex)
}
// signerEndpoint returns the authenticated signer's endpoint id and whether one
@@ -245,6 +256,16 @@ func signerEndpoint(r *http.Request) (string, bool) {
return v, ok && v != ""
}
// signerPubHex returns the authenticated signer's signing public key (lowercase
// hex) and whether one is present. Like signerEndpoint it is absent under
// AuthOff and on a soft-mode pass-through; the user-management handlers treat
// that absence as "no admin identity" and deny (default-deny), since a
// privilege-granting operation must never run without a verified admin.
func signerPubHex(r *http.Request) (string, bool) {
v, ok := r.Context().Value(ctxSignerPub).(string)
return v, ok && v != ""
}
// requireMember authorizes a room request by membership (audit H3): it returns
// the signer endpoint and true when the request may proceed, or writes 403 and
// returns false when an authenticated signer is not a member of roomID. When no
@@ -262,6 +283,31 @@ func (s *Server) requireMember(w http.ResponseWriter, r *http.Request, roomID st
return signer, true
}
// requireAdmin authorizes a user-management request: it returns the signer's
// signing-key hex and true ONLY when the authenticated signer is a user with
// role admin and active status; otherwise it writes 403 and returns false.
//
// Default-deny, with no dev relaxation: unlike requireMember (which allows a
// request when no authenticated signer is present, preserving AuthOff/dev
// behavior for room reads), this denies whenever the signer is absent or is not
// a verified active admin. The user-management endpoints grant and revoke bus
// access, so they must never be reachable without a verified admin identity —
// the store is consulted on every call so a just-revoked admin is denied
// immediately, and any store error fails closed.
func (s *Server) requireAdmin(w http.ResponseWriter, r *http.Request) (string, bool) {
pubHex, ok := signerPubHex(r)
if !ok {
writeErr(w, http.StatusForbidden, "forbidden: admin role required")
return "", false
}
u, err := s.store.GetUser(pubHex)
if err != nil || u.Role != RoleAdmin || u.Status != StatusActive {
writeErr(w, http.StatusForbidden, "forbidden: admin role required")
return "", false
}
return pubHex, true
}
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
// Only the unauthenticated health probe qualifies: it carries no data and is
// needed by load balancers / smoke checks / systemd before any identity exists.
@@ -280,6 +326,13 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
s.mux.HandleFunc("POST /blobs", s.handlePutBlob)
s.mux.HandleFunc("GET /blobs/{hash}", s.handleGetBlob)
// User-management (admin-only) — the HTTP-signed equivalent of the local
// `membershipd user` CLI, so the admin panel manages the bus allowlist by
// signing as an admin instead of needing direct store/KV access. All three
// pass through requireAdmin; they hit the same store the room handlers do.
s.mux.HandleFunc("GET /users", s.handleListUsers)
s.mux.HandleFunc("POST /users", s.handleAddUser)
s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser)
}
// ---- wire types -----------------------------------------------------------
@@ -357,6 +410,27 @@ type blobResp struct {
Hash string `json:"hash"`
}
// userJSON is the wire representation of a bus user on the admin endpoints. It
// carries the full record the panel needs to render the allowlist, including
// status (so revoked users are visible) and the timestamps. revoked_at is
// omitted for an active user.
type userJSON struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
Status string `json:"status"`
CreatedAt string `json:"created_at"`
RevokedAt string `json:"revoked_at,omitempty"`
}
// addUserReq is the POST /users body: the new user's Ed25519 signing key
// (64-hex), human handle, and role. role is optional and defaults to member.
type addUserReq struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
}
// ---- helpers --------------------------------------------------------------
func writeJSON(w http.ResponseWriter, code int, v any) {
@@ -674,3 +748,95 @@ func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
}
// ---- user-management handlers (admin-only) --------------------------------
// handleListUsers returns the full bus allowlist, including revoked users, so an
// admin sees the complete picture (a revoked identity stays auditable). Admin-only.
func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
users, err := s.store.ListUsers()
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
out := make([]userJSON, 0, len(users))
for _, u := range users {
out = append(out, userJSON{
SignPub: u.SignPub,
Handle: u.Handle,
Role: u.Role,
Status: u.Status,
CreatedAt: u.CreatedAt,
RevokedAt: u.RevokedAt,
})
}
writeJSON(w, http.StatusOK, out)
}
// handleAddUser registers a new bus user from an admin-supplied Ed25519 signing
// key. It mirrors the `membershipd user add` CLI: the key must be 64-hex, the
// role must be admin or member (empty defaults to member), and re-adding an
// already-registered key is a 409 that leaves the existing row untouched — no
// silent upsert that could flip a role or clobber status. Admin-only.
func (s *Server) handleAddUser(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
var req addUserReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return
}
if req.SignPub == "" || req.Handle == "" {
writeErr(w, http.StatusBadRequest, "sign_pub and handle required")
return
}
if err := ValidateSignPubHex(req.SignPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
role := req.Role
if role == "" {
role = RoleMember
}
if role != RoleAdmin && role != RoleMember {
writeErr(w, http.StatusBadRequest,
fmt.Sprintf("invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember))
return
}
if err := s.store.AddUser(req.SignPub, req.Handle, role); err != nil {
if errors.Is(err, ErrUserExists) {
// Idempotency contract (mirrors the CLI): re-adding a key is an explicit,
// non-destructive conflict. To replace a user, revoke then add again.
writeErr(w, http.StatusConflict,
"user already registered (unchanged); revoke it first to replace")
return
}
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusCreated, map[string]string{"status": "added"})
}
// handleRevokeUser revokes a bus user by signing key. Revocation is a status
// flip (no hard delete) so the identity stays auditable and IsAuthorized denies
// it on both planes immediately. Revoking an unknown or already-revoked key is a
// 404, and so is any other error the store returns from RevokeUser. Admin-only.
func (s *Server) handleRevokeUser(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
signPub := r.PathValue("signpub")
if err := ValidateSignPubHex(signPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
if err := s.store.RevokeUser(signPub); err != nil {
writeServerErr(w, r, http.StatusNotFound, "no active user with that key", err)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "revoked"})
}
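The default-deny admin gate in this file combines two pieces: typed context keys that carry the verified signer, and a role check that refuses when the signer is missing. The sketch below shows that combination with the standard library only. It is not the server's code: `adminOnly`, `statusFor`, `demoHandler`, the `user` struct, and the in-memory map are hypothetical, and the status strings `"active"` and `"revoked"` are assumed values for the `StatusActive` and `StatusRevoked` constants.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// ctxKey is unexported so these keys cannot collide with other packages' keys.
type ctxKey int

const (
	ctxSignerEndpoint ctxKey = iota
	ctxSignerPub
)

// withSigner stores both the endpoint id and the signing key on the context.
func withSigner(ctx context.Context, endpoint, pubHex string) context.Context {
	ctx = context.WithValue(ctx, ctxSignerEndpoint, endpoint)
	return context.WithValue(ctx, ctxSignerPub, pubHex)
}

// signerPubHex reports the signing key and whether a verified signer is present.
func signerPubHex(r *http.Request) (string, bool) {
	v, ok := r.Context().Value(ctxSignerPub).(string)
	return v, ok && v != ""
}

// user is a hypothetical allowlist row; role and status strings are assumed.
type user struct{ Role, Status string }

// adminOnly denies unless the signer is present, known, an admin, and active.
func adminOnly(users map[string]user, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		pubHex, ok := signerPubHex(r)
		u, found := users[pubHex]
		if !ok || !found || u.Role != "admin" || u.Status != "active" {
			http.Error(w, "forbidden: admin role required", http.StatusForbidden)
			return
		}
		next(w, r)
	}
}

// demoHandler wires a tiny allowlist in front of a handler that returns 200.
func demoHandler() http.Handler {
	users := map[string]user{
		"aa": {Role: "admin", Status: "active"},
		"bb": {Role: "member", Status: "active"},
		"cc": {Role: "admin", Status: "revoked"},
	}
	return adminOnly(users, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
}

// statusFor sends one request; an empty pubHex means "no verified signer".
func statusFor(h http.Handler, pubHex string) int {
	r := httptest.NewRequest(http.MethodGet, "/users", nil)
	if pubHex != "" {
		r = r.WithContext(withSigner(r.Context(), "endpoint-id", pubHex))
	}
	w := httptest.NewRecorder()
	h.ServeHTTP(w, r)
	return w.Code
}

func main() {
	h := demoHandler()
	for _, who := range []string{"aa", "bb", "cc", ""} {
		fmt.Printf("signer %q -> %d\n", who, statusFor(h, who))
	}
}
```

Because the lookup happens on every request, a revoked admin is refused on the next call, which matches the behavior the `requireAdmin` comment describes.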
+18
@@ -2,6 +2,7 @@ package membership
import (
"database/sql"
"encoding/hex"
"errors"
"fmt"
"strings"
@@ -35,6 +36,23 @@ type User struct {
RevokedAt string // empty unless revoked
}
// ValidateSignPubHex ensures signPub is exactly a 32-byte Ed25519 public key in
// hex (64 hex chars). It is the single source of truth for that check, shared by
// the local admin CLI (which validates before seeding the first admin) and the
// HTTP user-management handlers (which validate an admin-supplied key before it
// reaches the store). Catching a malformed key here turns a silent "authorized
// nobody" into an explicit error at the boundary.
func ValidateSignPubHex(signPub string) error {
b, err := hex.DecodeString(signPub)
if err != nil {
return fmt.Errorf("sign-pub is not valid hex: %w", err)
}
if len(b) != 32 {
return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
}
return nil
}
// normalizeSignPub lowercases the hex key so lookups are case-insensitive: the
// primary key is stored lowercase and every query normalizes its input the same
// way, so a caller passing uppercase hex still matches.
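A short sketch of how the validation and normalization steps behave on sample inputs. `validateSignPubHex` copies the logic of `ValidateSignPubHex` from the hunk above under a lowercase name; the body of `normalizeSignPub` is an assumption (a plain `strings.ToLower`) inferred from its comment, since the diff does not show it. The sample keys are arbitrary hex, not real Ed25519 keys.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// Arbitrary 64-hex sample values (32 bytes); not real Ed25519 public keys.
const sampleKey = "0123456789abcdef" + "0123456789abcdef" + "0123456789abcdef" + "0123456789abcdef"
const sampleKeyUpper = "0123456789ABCDEF" + "0123456789ABCDEF" + "0123456789ABCDEF" + "0123456789ABCDEF"

// validateSignPubHex mirrors the check in the diff: valid hex, exactly 32 bytes.
func validateSignPubHex(signPub string) error {
	b, err := hex.DecodeString(signPub)
	if err != nil {
		return fmt.Errorf("sign-pub is not valid hex: %w", err)
	}
	if len(b) != 32 {
		return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
	}
	return nil
}

// normalizeSignPub is an assumed implementation: lowercase the hex so that
// lookups are case-insensitive against a lowercase primary key.
func normalizeSignPub(signPub string) string {
	return strings.ToLower(signPub)
}

func main() {
	for _, in := range []string{sampleKey, sampleKeyUpper, "abcd", "zz"} {
		fmt.Printf("%-8.8s... valid=%v\n", in, validateSignPubHex(in) == nil)
	}
	fmt.Println("normalized match:", normalizeSignPub(sampleKeyUpper) == sampleKey)
}
```

`hex.DecodeString` accepts both upper- and lowercase digits, so validation alone does not make keys comparable; that is why normalization is a separate step before any lookup.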
+164
@@ -0,0 +1,164 @@
package membership
import (
"encoding/hex"
"encoding/json"
"net/http"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
)
// signedJSON is signedReq for a JSON body: it marshals v and signs the request
// as id with a distinct nonce. It returns the response status and body, reusing
// the auth_test harness so these tests exercise the real signed wire contract.
func signedJSON(t *testing.T, h *authHarness, method, path string, v any, id cs.Identity, n int) (int, string) {
t.Helper()
var body []byte
if v != nil {
b, err := json.Marshal(v)
if err != nil {
t.Fatalf("marshal body: %v", err)
}
body = b
}
return do(t, signedReq(t, h.ts.URL, method, path, body, id, time.Now().Unix(), nonceN(n)))
}
// TestUsersHTTP_NonAdminForbidden is the security spine: a REGISTERED but
// non-admin signer (bob, role member) is denied on every user-management
// endpoint. His signature clears auth (he is in the allowlist), so each request
// reaches the handler, where requireAdmin returns 403 — default-deny by role.
func TestUsersHTTP_NonAdminForbidden(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, _ := cs.GenerateIdentity()
register(t, h, bob, "bob") // role member (see register in authz_test.go)
bobPub := hex.EncodeToString(bob.SignPub)
victim, _ := cs.GenerateIdentity()
victimPub := hex.EncodeToString(victim.SignPub)
checks := []struct {
name string
method string
path string
body any
}{
{"list users", "GET", "/users", nil},
{"add user", "POST", "/users", addUserReq{SignPub: victimPub, Handle: "mallory", Role: RoleMember}},
{"revoke user", "POST", "/users/" + bobPub + "/revoke", nil},
}
for i, c := range checks {
code, body := signedJSON(t, h, c.method, c.path, c.body, bob, i+1)
if code != http.StatusForbidden {
t.Fatalf("non-admin %s should be 403, got %d (%s)", c.name, code, body)
}
}
}
// TestUsersHTTP_AdminRoundtrip exercises the golden path end to end: alice (the
// seeded admin) adds carol, sees her in the list as active, revokes her, then
// sees her status flip to revoked (no hard delete — she stays in the list).
func TestUsersHTTP_AdminRoundtrip(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
carol, _ := cs.GenerateIdentity()
carolPub := hex.EncodeToString(carol.SignPub)
// Add carol as a member.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: carolPub, Handle: "carol", Role: RoleMember}, h.alice, 1); code != http.StatusCreated {
t.Fatalf("admin add carol should be 201, got %d (%s)", code, body)
}
// List: carol present and active; alice (the seed admin) also present.
users := listUsers(t, h, 2)
carolRow, ok := findUser(users, carolPub)
if !ok {
t.Fatalf("carol missing from list after add: %+v", users)
}
if carolRow.Status != StatusActive || carolRow.Role != RoleMember || carolRow.Handle != "carol" {
t.Fatalf("carol row wrong after add: %+v", carolRow)
}
if _, ok := findUser(users, h.alicePub); !ok {
t.Fatalf("seeded admin alice missing from list: %+v", users)
}
// Revoke carol.
if code, body := signedJSON(t, h, "POST", "/users/"+carolPub+"/revoke", nil, h.alice, 3); code != http.StatusOK {
t.Fatalf("admin revoke carol should be 200, got %d (%s)", code, body)
}
// List again: carol still present, now revoked (status flip, not delete).
users = listUsers(t, h, 4)
carolRow, ok = findUser(users, carolPub)
if !ok {
t.Fatalf("carol vanished from list after revoke (should be a status flip): %+v", users)
}
if carolRow.Status != StatusRevoked {
t.Fatalf("carol should be revoked, got status %q", carolRow.Status)
}
}
// TestUsersHTTP_Validation covers the input-validation contract: a malformed hex
// key is 400, an unknown role is 400, and re-adding an already-registered key is
// 409 (the existing row is left untouched — no silent upsert).
func TestUsersHTTP_Validation(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
good, _ := cs.GenerateIdentity()
goodPub := hex.EncodeToString(good.SignPub)
// Invalid hex (too short) -> 400.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: "abcd", Handle: "shorty", Role: RoleMember}, h.alice, 1); code != http.StatusBadRequest {
t.Fatalf("malformed sign_pub should be 400, got %d (%s)", code, body)
}
// Invalid role -> 400.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: goodPub, Handle: "weirdrole", Role: "superuser"}, h.alice, 2); code != http.StatusBadRequest {
t.Fatalf("invalid role should be 400, got %d (%s)", code, body)
}
// Re-adding the seeded admin's own key -> 409 (idempotency, no overwrite).
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: h.alicePub, Handle: "alice-again", Role: RoleMember}, h.alice, 3); code != http.StatusConflict {
t.Fatalf("re-adding an existing key should be 409, got %d (%s)", code, body)
}
// And the existing row is untouched: alice is still an active admin.
u, err := h.store.GetUser(h.alicePub)
if err != nil {
t.Fatalf("get alice after conflicting re-add: %v", err)
}
if u.Role != RoleAdmin || u.Status != StatusActive || u.Handle != "alice" {
t.Fatalf("conflicting re-add mutated the existing row: %+v", u)
}
}
// listUsers signs a GET /users as alice and decodes the response.
func listUsers(t *testing.T, h *authHarness, n int) []userJSON {
t.Helper()
code, body := signedJSON(t, h, "GET", "/users", nil, h.alice, n)
if code != http.StatusOK {
t.Fatalf("admin list users should be 200, got %d (%s)", code, body)
}
var users []userJSON
if err := json.Unmarshal([]byte(body), &users); err != nil {
t.Fatalf("decode users: %v (%s)", err, body)
}
return users
}
// findUser returns the row with the given signing key (case-insensitive).
func findUser(users []userJSON, signPub string) (userJSON, bool) {
want := normalizeSignPub(signPub)
for _, u := range users {
if normalizeSignPub(u.SignPub) == want {
return u, true
}
}
return userJSON{}, false
}
+3
@@ -11,6 +11,9 @@
"dependencies": {
"@mantine/core": "^9.3.0",
"@mantine/hooks": "^9.3.0",
"@noble/curves": "^2.2.0",
"@noble/hashes": "^2.2.0",
"@scure/bip39": "^2.2.0",
"@tabler/icons-react": "^3.36.0",
"react": "^19.2.0",
"react-dom": "^19.2.0"
+36
@@ -14,6 +14,15 @@ importers:
'@mantine/hooks':
specifier: ^9.3.0
version: 9.3.0(react@19.2.7)
'@noble/curves':
specifier: ^2.2.0
version: 2.2.0
'@noble/hashes':
specifier: ^2.2.0
version: 2.2.0
'@scure/bip39':
specifier: ^2.2.0
version: 2.2.0
'@tabler/icons-react':
specifier: ^3.36.0
version: 3.44.0(react@19.2.7)
@@ -339,6 +348,14 @@ packages:
peerDependencies:
react: ^19.2.0
'@noble/curves@2.2.0':
resolution: {integrity: sha512-T/BoHgFXirb0ENSPBquzX0rcjXeM6Lo892a2jlYJkqk83LqZx0l1Of7DzlKJ6jkpvMrkHSnAcgb5JegL8SeIkQ==}
engines: {node: '>= 20.19.0'}
'@noble/hashes@2.2.0':
resolution: {integrity: sha512-IYqDGiTXab6FniAgnSdZwgWbomxpy9FtYvLKs7wCUs2a8RkITG+DFGO1DM9cr+E3/RgADRpFjrKVaJ1z6sjtEg==}
engines: {node: '>= 20.19.0'}
'@rolldown/pluginutils@1.0.0-beta.27':
resolution: {integrity: sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA==}
@@ -480,6 +497,12 @@ packages:
cpu: [x64]
os: [win32]
'@scure/base@2.2.0':
resolution: {integrity: sha512-b8XEupJibegiXV+tDUseI8oLQc8ei3d/4Jkb2RpbHh3MfE054ov3uIz2dhFkB3FI8iwYkEh0gGCApkrYggkPNg==}
'@scure/bip39@2.2.0':
resolution: {integrity: sha512-T/Bj/YvYMNkIPq6EENO6/rcs2e7qTNuyoUXf0KBFDmp0ZDu0H2X4Lq6yC3i0c8PcWkov5EbW+yQZZbdMmk154A==}
'@tabler/icons-react@3.44.0':
resolution: {integrity: sha512-8+rvzBbVm/1Z3sG3x7GUNAaxIKxwgz8xaMhRs23nrCnMTKRFAhEC+82zAIFeAA0seXdrAGX5HFCkaLpGK2rVHg==}
peerDependencies:
@@ -1086,6 +1109,12 @@ snapshots:
dependencies:
react: 19.2.7
'@noble/curves@2.2.0':
dependencies:
'@noble/hashes': 2.2.0
'@noble/hashes@2.2.0': {}
'@rolldown/pluginutils@1.0.0-beta.27': {}
'@rollup/rollup-android-arm-eabi@4.61.1':
@@ -1163,6 +1192,13 @@ snapshots:
'@rollup/rollup-win32-x64-msvc@4.61.1':
optional: true
'@scure/base@2.2.0': {}
'@scure/bip39@2.2.0':
dependencies:
'@noble/hashes': 2.2.0
'@scure/base': 2.2.0
'@tabler/icons-react@3.44.0(react@19.2.7)':
dependencies:
'@tabler/icons': 3.44.0
+134 -6
@@ -1,11 +1,139 @@
import { useState } from "react";
import { Login } from "./Login";
import { useEffect, useState } from "react";
import { Center, Loader } from "@mantine/core";
import { ChatShell } from "./ChatShell";
import { Join } from "./Join";
import { Recover } from "./Recover";
import { WalletLogin } from "./WalletLogin";
import { Welcome } from "./Welcome";
import { api } from "./api";
import { localIdentity } from "./wallet/account";
import type { User } from "./types";
export function App() {
const [user, setUser] = useState<User | null>(null);
type Route = "loading" | "join" | "welcome" | "login" | "recover" | "chat";
if (!user) return <Login onLogin={setUser} />;
return <ChatShell user={user} onLogout={() => setUser(null)} />;
// readJoinToken returns the invite token if the current URL is /join?token=XXX.
function readJoinToken(): string | null {
if (window.location.pathname !== "/join") return null;
return new URLSearchParams(window.location.search).get("token");
}
// clearUrl drops any /join?token from the address bar once consumed, so a refresh
// or a shared screenshot does not replay the (single-use) token.
function clearUrl() {
if (window.location.pathname !== "/") {
window.history.replaceState(null, "", "/");
}
}
export function App() {
const [route, setRoute] = useState<Route>("loading");
const [user, setUser] = useState<User | null>(null);
const [token, setToken] = useState("");
const [storedHandle, setStoredHandle] = useState("");
// Decide the entry screen on mount: an invite link goes straight to join; a live
// gateway session resumes the chat; a device with a stored identity shows the
// password unlock; an empty device shows the welcome chooser.
useEffect(() => {
const t = readJoinToken();
if (t) {
setToken(t);
setRoute("join");
return;
}
let cancelled = false;
(async () => {
try {
const me = await api.me();
if (cancelled) return;
setUser({ id: me.endpoint, handle: me.handle || me.endpoint.slice(0, 8) });
setRoute("chat");
return;
} catch {
// no live session — fall through
}
const stored = await localIdentity();
if (cancelled) return;
if (stored) {
setStoredHandle(stored.handle);
setRoute("login");
} else {
setRoute("welcome");
}
})();
return () => {
cancelled = true;
};
}, []);
const enterChat = (u: User) => {
setUser(u);
setRoute("chat");
clearUrl();
};
const logout = () => {
void api.logout().catch(() => {});
setUser(null);
// Keep the encrypted identity on the device: logging out returns to the
// password unlock, not a full reset.
void localIdentity().then((stored) => {
if (stored) {
setStoredHandle(stored.handle);
setRoute("login");
} else {
setRoute("welcome");
}
});
};
switch (route) {
case "loading":
return (
<Center h="100vh" bg="dark.9">
<Loader color="brand" />
</Center>
);
case "join":
return (
<Join
token={token}
onJoined={enterChat}
onRecover={() => setRoute("recover")}
/>
);
case "welcome":
return (
<Welcome
onJoinToken={(t) => {
setToken(t);
setRoute("join");
}}
onRecover={() => setRoute("recover")}
/>
);
case "login":
return (
<WalletLogin
handle={storedHandle}
onLoggedIn={enterChat}
onRecover={() => setRoute("recover")}
/>
);
case "recover":
return (
<Recover
onRecovered={enterChat}
onBack={() => setRoute(storedHandle ? "login" : "welcome")}
/>
);
case "chat":
return user ? (
<ChatShell user={user} onLogout={logout} />
) : (
<Center h="100vh" bg="dark.9">
<Loader color="brand" />
</Center>
);
}
}
+47
@@ -0,0 +1,47 @@
import type { ReactNode } from "react";
import { Card, Center, Stack, Text, ThemeIcon, Title } from "@mantine/core";
// AuthCard is the shared centered card used by every pre-chat screen (welcome,
// join, recover, wallet login) so they all look like one flow.
export function AuthCard({
width = 460,
children,
}: {
width?: number;
children: ReactNode;
}) {
return (
<Center h="100vh" bg="dark.9" p="md">
<Card w={width} p="xl" radius="lg" withBorder bg="dark.7">
<Stack gap="lg">{children}</Stack>
</Card>
</Center>
);
}
// AuthHeader is the icon + title + subtitle block at the top of an auth card.
export function AuthHeader({
icon,
title,
subtitle,
}: {
icon: ReactNode;
title: string;
subtitle?: string;
}) {
return (
<Stack align="center" gap="xs">
<ThemeIcon size={56} radius="xl" variant="light" color="brand">
{icon}
</ThemeIcon>
<Title order={3} ta="center">
{title}
</Title>
{subtitle && (
<Text c="dimmed" size="sm" ta="center">
{subtitle}
</Text>
)}
</Stack>
);
}
+38 -23
@@ -19,7 +19,8 @@ import {
IconDotsVertical,
IconPaperclip,
} from "@tabler/icons-react";
import type { Message, Room, User } from "./types";
import { api, streamRoom } from "./api";
import type { Message, Room } from "./types";
function initials(s: string) {
return s.replace(/[^a-z0-9]/gi, "").slice(0, 2).toUpperCase() || "?";
@@ -54,22 +55,30 @@ function MessageRow({ msg }: { msg: Message }) {
);
}
export function ChatPanel({
room,
user,
}: {
room: Room | undefined;
user: User;
}) {
export function ChatPanel({ room }: { room: Room | undefined }) {
const [draft, setDraft] = useState("");
const [extra, setExtra] = useState<Record<string, Message[]>>({});
const [messages, setMessages] = useState<Message[]>([]);
const [sendError, setSendError] = useState<string | null>(null);
const viewport = useRef<HTMLDivElement>(null);
const msgs = room ? [...room.messages, ...(extra[room.id] ?? [])] : [];
// Open the SSE stream for the active room. The gateway delivers history (for
// persisted rooms) and then live messages, already decrypted. Deduplicate by id:
// a re-render must not duplicate messages, and the echo of our own send arrives here.
useEffect(() => {
setMessages([]);
setSendError(null);
if (!room) return;
const close = streamRoom(room.id, (m) => {
setMessages((prev) =>
prev.some((p) => p.id === m.id) ? prev : [...prev, m],
);
});
return close;
}, [room?.id]);
useEffect(() => {
viewport.current?.scrollTo({ top: viewport.current.scrollHeight });
}, [room?.id, msgs.length]);
}, [room?.id, messages.length]);
if (!room) {
return (
@@ -79,18 +88,19 @@ export function ChatPanel({
);
}
const send = () => {
const send = async () => {
const body = draft.trim();
if (!body) return;
const msg: Message = {
id: `local-${Date.now()}`,
sender: user.handle,
body,
ts: Date.now(),
mine: true,
};
setExtra((e) => ({ ...e, [room.id]: [...(e[room.id] ?? []), msg] }));
setDraft("");
setSendError(null);
try {
// Not optimistic: our own message comes back over SSE with its real id
// (mine: true), which avoids duplicates.
await api.send(room.id, body);
} catch (e) {
setDraft(body); // restore the draft if the send failed
setSendError(e instanceof Error ? e.message : "No se pudo enviar");
}
};
return (
@@ -126,13 +136,18 @@ export function ChatPanel({
<ScrollArea style={{ flex: 1 }} viewportRef={viewport}>
<Stack gap="lg" p="md">
{msgs.map((m) => (
{messages.map((m) => (
<MessageRow key={m.id} msg={m} />
))}
</Stack>
</ScrollArea>
<Divider color="dark.4" />
{sendError && (
<Text c="red" size="xs" px="sm" pt={4}>
{sendError}
</Text>
)}
<Group p="sm" gap="xs" wrap="nowrap">
<ActionIcon variant="subtle" color="gray" size="lg">
<IconPaperclip size={18} />
@@ -143,14 +158,14 @@ export function ChatPanel({
placeholder={`Mensaje a ${room.name}`}
value={draft}
onChange={(e) => setDraft(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && send()}
onKeyDown={(e) => e.key === "Enter" && void send()}
/>
<ActionIcon
size="lg"
radius="xl"
variant="filled"
color="brand"
onClick={send}
onClick={() => void send()}
disabled={!draft.trim()}
>
<IconSend size={18} />
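The id-based deduplication used in the stream callback above can be isolated as a pure helper, which makes the behavior easy to check outside React. This is a minimal sketch under assumptions: `StreamMessage` is a reduced stand-in for the real `Message` type, and `appendUnique` is a hypothetical name that does not appear in the commit.

```typescript
// Hypothetical, reduced message shape for illustration only.
interface StreamMessage {
  id: string;
  body: string;
}

// Return the list unchanged (same reference) when the id is already present,
// otherwise a new array with the message appended. Returning the same
// reference from a setState updater lets React bail out of the re-render.
function appendUnique(prev: StreamMessage[], m: StreamMessage): StreamMessage[] {
  return prev.some((p) => p.id === m.id) ? prev : [...prev, m];
}

const first = appendUnique([], { id: "a", body: "hello" });
const second = appendUnique(first, { id: "a", body: "hello" }); // replayed frame
const third = appendUnique(second, { id: "b", body: "world" });

console.log(second === first); // true: the duplicate was ignored
console.log(third.map((m) => m.id)); // [ 'a', 'b' ]
```

The linear `some` scan is fine for a chat-sized list; a `Set` of seen ids would be the obvious swap if rooms grew large.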
+56 -7
@@ -1,9 +1,9 @@
import { useState } from "react";
import { Flex, Box } from "@mantine/core";
import { useCallback, useEffect, useState } from "react";
import { Flex, Box, Center, Loader, Stack, Text, Button } from "@mantine/core";
import { Sidebar } from "./Sidebar";
import { ChatPanel } from "./ChatPanel";
import { MOCK_ROOMS } from "./mock";
import type { User } from "./types";
import { api } from "./api";
import type { Room, User } from "./types";
export function ChatShell({
user,
@@ -12,10 +12,59 @@ export function ChatShell({
user: User;
onLogout: () => void;
}) {
const [rooms] = useState(MOCK_ROOMS);
const [activeId, setActiveId] = useState<string>(rooms[0]?.id ?? "");
const [rooms, setRooms] = useState<Room[]>([]);
const [activeId, setActiveId] = useState<string>("");
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const load = useCallback(() => {
setLoading(true);
api
.listRooms()
.then((rs) => {
setRooms(rs);
setActiveId((cur) => cur || rs[0]?.id || "");
setError(null);
})
.catch((e) => setError(e?.message ?? "No se pudieron cargar las rooms"))
.finally(() => setLoading(false));
}, []);
useEffect(() => {
load();
}, [load]);
const active = rooms.find((r) => r.id === activeId);
// The right-hand panel shows the loading/error/empty state without touching the layout.
let panel = <ChatPanel room={active} />;
if (loading && rooms.length === 0) {
panel = (
<Center h="100%">
<Loader color="brand" />
</Center>
);
} else if (error) {
panel = (
<Center h="100%">
<Stack align="center" gap="sm">
<Text c="red" size="sm">
{error}
</Text>
<Button variant="light" color="brand" onClick={load}>
Reintentar
</Button>
</Stack>
</Center>
);
} else if (rooms.length === 0) {
panel = (
<Center h="100%">
<Text c="dimmed">No perteneces a ninguna room todavía</Text>
</Center>
);
}
return (
<Flex h="100vh" w="100vw" style={{ overflow: "hidden" }}>
<Box
@@ -36,7 +85,7 @@ export function ChatShell({
/>
</Box>
<Box flex={1} h="100%" bg="dark.7" style={{ minWidth: 0 }}>
<ChatPanel room={active} user={user} />
{panel}
</Box>
</Flex>
);
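The order of the `if`/`else if` branches that pick the right-hand panel matters: the loader only wins while there are no rooms yet, and an error wins over the empty state. A small sketch of that precedence as a pure function follows; `panelState` and `PanelState` are hypothetical names introduced for illustration, not part of the commit.

```typescript
type PanelState = "loading" | "error" | "empty" | "chat";

// Mirrors the branch order in ChatShell: loader first (only when nothing is
// loaded yet), then error, then the empty state, otherwise the chat panel.
function panelState(
  loading: boolean,
  error: string | null,
  roomCount: number,
): PanelState {
  if (loading && roomCount === 0) return "loading";
  if (error) return "error";
  if (roomCount === 0) return "empty";
  return "chat";
}

console.log(panelState(true, null, 0)); // loading
console.log(panelState(true, null, 2)); // chat (a reload keeps the rooms visible)
console.log(panelState(false, "boom", 2)); // error
console.log(panelState(false, null, 0)); // empty
```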
+322
@@ -0,0 +1,322 @@
import { useEffect, useMemo, useState } from "react";
import {
Alert,
Button,
Card,
Center,
Checkbox,
CopyButton,
Group,
Loader,
PasswordInput,
SimpleGrid,
Stack,
Text,
TextInput,
} from "@mantine/core";
import {
IconAlertTriangle,
IconCheck,
IconCopy,
IconKey,
IconShieldLock,
} from "@tabler/icons-react";
import { api, ApiError } from "./api";
import { AuthCard, AuthHeader } from "./AuthShell";
import type { User } from "./types";
import { newMnemonic, mnemonicWords } from "./wallet/bip39";
import { deriveIdentity, type WalletIdentity } from "./wallet/derive";
import { saveAndOpen } from "./wallet/account";
type Step = "generating" | "show-seed" | "confirm-seed" | "password" | "joining";
// pickPositions chooses `count` distinct word positions (0-based) to ask the user
// to confirm. This is a UI choice, not key material, so Math.random is fine.
function pickPositions(total: number, count: number): number[] {
const all = Array.from({ length: total }, (_, i) => i);
for (let i = all.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[all[i], all[j]] = [all[j], all[i]];
}
return all.slice(0, count).sort((a, b) => a - b);
}
// Join is the onboarding page reached from an invite link (/join?token=XXX). It
// generates a brand-new BIP39 seed, derives the identity, shows the seed exactly
// once with a confirmation gate, takes a local password, registers the PUBLIC key
// with the bus using the token, and enters the chat. The seed is never persisted
// and never sent to the server.
export function Join({
token,
onJoined,
onRecover,
}: {
token: string;
onJoined: (u: User) => void;
onRecover: () => void;
}) {
const [step, setStep] = useState<Step>("generating");
const [mnemonic, setMnemonic] = useState("");
const [identity, setIdentity] = useState<WalletIdentity | null>(null);
const [error, setError] = useState<string | null>(null);
// Generate the seed + identity once on mount. Deriving is fast and pure.
useEffect(() => {
if (!token) {
setError("Enlace de invitación inválido: falta el token.");
return;
}
try {
const m = newMnemonic();
setMnemonic(m);
setIdentity(deriveIdentity(m));
setStep("show-seed");
} catch {
setError("No se pudo generar la identidad en este navegador.");
}
}, [token]);
const words = useMemo(() => mnemonicWords(mnemonic), [mnemonic]);
if (error && step === "generating") {
return (
<AuthCard>
<Alert color="red" icon={<IconAlertTriangle size={18} />} title="Error">
{error}
</Alert>
<Button variant="light" mt="md" onClick={onRecover}>
Recuperar con mi seed
</Button>
</AuthCard>
);
}
if (step === "generating" || !identity) {
return (
<Center h="100vh" bg="dark.9">
<Loader color="brand" />
</Center>
);
}
if (step === "show-seed") {
return (
<ShowSeed words={words} onContinue={() => setStep("confirm-seed")} />
);
}
if (step === "confirm-seed") {
return (
<ConfirmSeed
words={words}
onBack={() => setStep("show-seed")}
onConfirmed={() => setStep("password")}
/>
);
}
// step === "password" | "joining"
return (
<SetPassword
busy={step === "joining"}
error={error}
onSubmit={async (password) => {
setStep("joining");
setError(null);
try {
// Register the PUBLIC identity with the bus (token authorizes), then
// encrypt the private key locally and open the per-user session.
const res = await api.register(token, identity.signPub, identity.kexPub);
const user = await saveAndOpen(identity, res.handle, password);
onJoined(user);
} catch (e) {
setError(
e instanceof ApiError ? e.message : "No se pudo completar el alta.",
);
setStep("password");
}
}}
/>
);
}
// ---- sub-screens ----------------------------------------------------------
function ShowSeed({
words,
onContinue,
}: {
words: string[];
onContinue: () => void;
}) {
const [acknowledged, setAcknowledged] = useState(false);
const phrase = words.join(" ");
return (
<AuthCard>
<AuthHeader
icon={<IconShieldLock size={30} />}
title="Guarda tu frase de recuperación"
subtitle="Estas 12 palabras son tu ÚNICA forma de recuperar tu cuenta si olvidas la contraseña o cambias de dispositivo. No las compartas con nadie."
/>
<Card bg="dark.8" radius="md" p="md" withBorder>
<SimpleGrid cols={3} spacing="xs" verticalSpacing="xs">
{words.map((w, i) => (
<Group gap={6} wrap="nowrap" key={i}>
<Text size="xs" c="dimmed" w={18} ta="right">
{i + 1}
</Text>
<Text size="sm" ff="monospace" fw={600}>
{w}
</Text>
</Group>
))}
</SimpleGrid>
</Card>
<Group justify="space-between">
<CopyButton value={phrase}>
{({ copied, copy }) => (
<Button
variant="subtle"
size="xs"
color={copied ? "teal" : "gray"}
leftSection={
copied ? <IconCheck size={14} /> : <IconCopy size={14} />
}
onClick={copy}
>
{copied ? "Copiada" : "Copiar"}
</Button>
)}
</CopyButton>
</Group>
<Alert color="yellow" variant="light" icon={<IconAlertTriangle size={16} />}>
unibus NO guarda esta frase. Si la pierdes y olvidas la contraseña, solo
el administrador podrá darte de alta de nuevo.
</Alert>
<Checkbox
checked={acknowledged}
onChange={(e) => setAcknowledged(e.currentTarget.checked)}
label="He guardado mi frase de recuperación en un lugar seguro"
/>
<Button disabled={!acknowledged} onClick={onContinue}>
Continuar
</Button>
</AuthCard>
);
}
function ConfirmSeed({
words,
onBack,
onConfirmed,
}: {
words: string[];
onBack: () => void;
onConfirmed: () => void;
}) {
// Ask the user to re-type 3 random words from their phrase. This proves they
// actually wrote the seed down rather than clicking through.
const positions = useMemo(() => pickPositions(words.length, 3), [words.length]);
const [inputs, setInputs] = useState<Record<number, string>>({});
const allCorrect = positions.every(
(p) => (inputs[p] ?? "").trim().toLowerCase() === words[p],
);
const anyTyped = positions.some((p) => (inputs[p] ?? "").length > 0);
return (
<AuthCard>
<AuthHeader
icon={<IconCheck size={30} />}
title="Confirma tu frase"
subtitle="Escribe las palabras solicitadas para confirmar que la guardaste bien."
/>
<Stack gap="sm">
{positions.map((p) => (
<TextInput
key={p}
label={`Palabra #${p + 1}`}
placeholder={`palabra ${p + 1}`}
value={inputs[p] ?? ""}
error={
(inputs[p] ?? "").length > 0 &&
(inputs[p] ?? "").trim().toLowerCase() !== words[p]
? "No coincide"
: undefined
}
onChange={(e) => {
// Capture the value synchronously: React nulls e.currentTarget
// after dispatch, so reading it inside the state updater (which runs
// later) would throw "Cannot read properties of null".
const v = e.currentTarget.value;
setInputs((prev) => ({ ...prev, [p]: v }));
}}
autoComplete="off"
spellCheck={false}
/>
))}
</Stack>
{!allCorrect && anyTyped && (
<Text size="xs" c="dimmed">
Revisa el orden y la ortografía de las palabras.
</Text>
)}
<Group grow>
<Button variant="default" onClick={onBack}>
Volver a ver
</Button>
<Button disabled={!allCorrect} onClick={onConfirmed}>
Confirmar
</Button>
</Group>
</AuthCard>
);
}
function SetPassword({
busy,
error,
onSubmit,
}: {
busy: boolean;
error: string | null;
onSubmit: (password: string) => void;
}) {
const [pw, setPw] = useState("");
const [pw2, setPw2] = useState("");
const tooShort = pw.length > 0 && pw.length < 8;
const mismatch = pw2.length > 0 && pw !== pw2;
const ready = pw.length >= 8 && pw === pw2 && !busy;
return (
<AuthCard>
<AuthHeader
icon={<IconKey size={30} />}
title="Protege tu identidad"
subtitle="Elige una contraseña para cifrar tu clave en ESTE dispositivo. No se guarda ni se envía a ningún servidor; solo desbloquea tu clave local."
/>
<PasswordInput
label="Contraseña"
description="Mínimo 8 caracteres"
leftSection={<IconKey size={16} />}
value={pw}
error={tooShort ? "Demasiado corta" : undefined}
onChange={(e) => setPw(e.currentTarget.value)}
data-autofocus
/>
<PasswordInput
label="Repite la contraseña"
leftSection={<IconKey size={16} />}
value={pw2}
error={mismatch ? "No coincide" : undefined}
onChange={(e) => setPw2(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && ready && onSubmit(pw)}
/>
{error && (
<Text c="red" size="sm" ta="center">
{error}
</Text>
)}
<Button disabled={!ready} loading={busy} onClick={() => onSubmit(pw)}>
Crear cuenta y entrar
</Button>
</AuthCard>
);
}
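The confirmation gate in `ConfirmSeed` combines two pieces: a Fisher-Yates shuffle that picks distinct positions, and a case- and whitespace-insensitive comparison of the typed words. The sketch below reproduces `pickPositions` from the file and adds `confirmsSeed`, a hypothetical helper name for the `allCorrect` expression; the placeholder words are not a real mnemonic.

```typescript
// Same algorithm as pickPositions in Join.tsx: shuffle all indices, take the
// first `count`, and return them in ascending order for display.
function pickPositions(total: number, count: number): number[] {
  const all = Array.from({ length: total }, (_, i) => i);
  for (let i = all.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    [all[i], all[j]] = [all[j], all[i]];
  }
  return all.slice(0, count).sort((a, b) => a - b);
}

// Hypothetical helper: true when every requested position was typed correctly,
// ignoring surrounding whitespace and letter case.
function confirmsSeed(
  words: string[],
  positions: number[],
  inputs: Record<number, string>,
): boolean {
  return positions.every(
    (p) => (inputs[p] ?? "").trim().toLowerCase() === words[p],
  );
}

// Placeholder words, NOT a valid BIP39 phrase.
const sampleWords = Array.from({ length: 12 }, (_, i) => `word${i}`);
const positions = pickPositions(sampleWords.length, 3);

const goodInputs: Record<number, string> = {};
for (const p of positions) goodInputs[p] = `  ${sampleWords[p].toUpperCase()} `;

const badInputs: Record<number, string> = { ...goodInputs };
badInputs[positions[0]] = "nope";

console.log(confirmsSeed(sampleWords, positions, goodInputs)); // true
console.log(confirmsSeed(sampleWords, positions, badInputs)); // false
```

Because the positions are a UI prompt and not key material, `Math.random` is adequate here, as the original comment notes.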
+30 -5
@@ -11,15 +11,29 @@ import {
Title,
} from "@mantine/core";
import { IconShieldLock, IconKey } from "@tabler/icons-react";
import { api, ApiError } from "./api";
import type { User } from "./types";
export function Login({ onLogin }: { onLogin: (u: User) => void }) {
const [handle, setHandle] = useState("");
const [password, setPassword] = useState("");
const [busy, setBusy] = useState(false);
const [error, setError] = useState<string | null>(null);
const ready = handle.trim().length > 0 && password.length > 0;
const connect = () => {
const h = handle.trim();
if (ready) onLogin({ id: h, handle: h });
const connect = async () => {
if (!ready || busy) return;
setBusy(true);
setError(null);
try {
// The password unlocks the gateway session (the operator's passphrase).
// The handle is only the display name in this iteration (wallet = phase 2).
const me = await api.login(password);
const h = handle.trim() || me.endpoint.slice(0, 8);
onLogin({ id: me.endpoint, handle: h });
} catch (e) {
setError(e instanceof ApiError ? e.message : "No se pudo conectar al gateway");
setBusy(false);
}
};
return (
@@ -52,9 +66,20 @@ export function Login({ onLogin }: { onLogin: (u: User) => void }) {
leftSection={<IconKey size={16} />}
value={password}
onChange={(e) => setPassword(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && connect()}
onKeyDown={(e) => e.key === "Enter" && void connect()}
/>
<Button w="100%" size="md" onClick={connect} disabled={!ready}>
{error && (
<Text c="red" size="sm" ta="center">
{error}
</Text>
)}
<Button
w="100%"
size="md"
onClick={() => void connect()}
disabled={!ready}
loading={busy}
>
Conectar
</Button>
</Stack>
+175
@@ -0,0 +1,175 @@
import { useMemo, useState } from "react";
import {
Alert,
Anchor,
Button,
Code,
Group,
PasswordInput,
Stack,
Text,
Textarea,
TextInput,
} from "@mantine/core";
import { IconKey, IconRotateClockwise } from "@tabler/icons-react";
import { AuthCard, AuthHeader } from "./AuthShell";
import { ApiError } from "./api";
import type { User } from "./types";
import { isValidMnemonic, mnemonicWords, normalizeMnemonic } from "./wallet/bip39";
import { deriveIdentity } from "./wallet/derive";
import { saveAndOpen } from "./wallet/account";
type Step = "phrase" | "password";
// Recover re-creates an existing identity from its 12-word seed — no admin needed.
// Validating the BIP39 phrase and re-deriving yields the SAME keypair (same
// sign_pub) the bus already authorizes, so the user lands back in the allowlist
// with their place intact. A new local password then re-encrypts the key on this
// device. Only if the user loses BOTH the password AND the seed must the admin
// re-provision them.
export function Recover({
onRecovered,
onBack,
}: {
onRecovered: (u: User) => void;
onBack: () => void;
}) {
const [step, setStep] = useState<Step>("phrase");
const [phrase, setPhrase] = useState("");
const [handle, setHandle] = useState("");
const [pw, setPw] = useState("");
const [pw2, setPw2] = useState("");
const [busy, setBusy] = useState(false);
const [error, setError] = useState<string | null>(null);
const normalized = normalizeMnemonic(phrase);
const wordCount = mnemonicWords(phrase).length;
const valid = isValidMnemonic(phrase);
// Re-derive as soon as the phrase is valid, so we can show the user which
// identity (sign_pub) it maps to before they commit a new password.
const identity = useMemo(
() => (valid ? deriveIdentity(normalized) : null),
[valid, normalized],
);
if (step === "phrase") {
return (
<AuthCard>
<AuthHeader
icon={<IconRotateClockwise size={30} />}
title="Recuperar con tu frase"
subtitle="Introduce tus 12 palabras de recuperación. Se quedan en este navegador: nunca se envían al servidor."
/>
<Textarea
label="Frase de recuperación (12 palabras)"
placeholder="palabra1 palabra2 palabra3 …"
autosize
minRows={3}
value={phrase}
onChange={(e) => setPhrase(e.currentTarget.value)}
spellCheck={false}
autoComplete="off"
/>
<Text size="xs" c={valid ? "teal" : "dimmed"}>
{wordCount > 0
? valid
? "Frase válida ✓"
: `${wordCount}/12 palabras — frase aún no válida`
: "Separadas por espacios."}
</Text>
{identity && (
<Alert color="brand" variant="light" title="Identidad reconstruida">
<Text size="xs">Tu clave pública de firma (sign_pub):</Text>
<Code block>{identity.signPub}</Code>
</Alert>
)}
<Group grow>
<Button variant="default" onClick={onBack}>
Volver
</Button>
<Button disabled={!valid} onClick={() => setStep("password")}>
Continuar
</Button>
</Group>
</AuthCard>
);
}
// step === "password"
const tooShort = pw.length > 0 && pw.length < 8;
const mismatch = pw2.length > 0 && pw !== pw2;
const ready = pw.length >= 8 && pw === pw2 && !busy && identity !== null;
const finish = async () => {
if (!ready || !identity) return;
setBusy(true);
setError(null);
try {
// No register here: the identity is already in the allowlist. Just re-encrypt
// locally and open the session as the recovered user.
const user = await saveAndOpen(identity, handle.trim(), pw);
onRecovered(user);
} catch (e) {
setError(
e instanceof ApiError
? e.message
: "No se pudo abrir la sesión con la identidad recuperada.",
);
setBusy(false);
}
};
return (
<AuthCard>
<AuthHeader
icon={<IconKey size={30} />}
title="Nueva contraseña"
subtitle="Elige una contraseña para cifrar tu clave recuperada en este dispositivo."
/>
<Stack gap="sm">
<TextInput
label="Nombre a mostrar (opcional)"
placeholder="tu-handle"
value={handle}
onChange={(e) => setHandle(e.currentTarget.value)}
/>
<PasswordInput
label="Contraseña"
description="Mínimo 8 caracteres"
leftSection={<IconKey size={16} />}
value={pw}
error={tooShort ? "Demasiado corta" : undefined}
onChange={(e) => setPw(e.currentTarget.value)}
data-autofocus
/>
<PasswordInput
label="Repite la contraseña"
leftSection={<IconKey size={16} />}
value={pw2}
error={mismatch ? "No coincide" : undefined}
onChange={(e) => setPw2(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && void finish()}
/>
</Stack>
{error && (
<Text c="red" size="sm" ta="center">
{error}
</Text>
)}
<Group grow>
<Button variant="default" onClick={() => setStep("phrase")}>
Volver
</Button>
<Button disabled={!ready} loading={busy} onClick={() => void finish()}>
Recuperar y entrar
</Button>
</Group>
<Group justify="center">
<Anchor size="xs" c="dimmed" onClick={onBack}>
Cancelar
</Anchor>
</Group>
</AuthCard>
);
}
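The phrase step depends on the pasted text being normalized before it is counted and validated. The bodies of `normalizeMnemonic` and `mnemonicWords` are not visible in this part of the diff, so the following is only a sketch of what such helpers might do, under the assumption that they apply NFKD normalization, lowercase the text, and collapse whitespace. `normalizePhrase` and `phraseWords` are hypothetical names, and the sample text is not a checksum-valid phrase.

```typescript
// Hypothetical normalizer: NFKD (the form BIP39 specifies for mnemonics),
// lowercase, and single spaces between words.
function normalizePhrase(input: string): string {
  return input
    .normalize("NFKD")
    .trim()
    .toLowerCase()
    .split(/\s+/)
    .filter(Boolean)
    .join(" ");
}

// Hypothetical word splitter built on the normalizer; empty input gives [].
function phraseWords(input: string): string[] {
  const n = normalizePhrase(input);
  return n ? n.split(" ") : [];
}

const pasted = "  Abandon   ability\nABLE ";
console.log(normalizePhrase(pasted)); // abandon ability able
console.log(`${phraseWords(pasted).length}/12`); // 3/12
console.log(phraseWords("   ").length); // 0
```

Checksum validation is deliberately left out of the sketch; in the commit that stays inside `@scure/bip39`.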
+77
@@ -0,0 +1,77 @@
import { useState } from "react";
import { Anchor, Button, Group, PasswordInput, Text } from "@mantine/core";
import { IconKey, IconWallet } from "@tabler/icons-react";
import { AuthCard, AuthHeader } from "./AuthShell";
import { ApiError } from "./api";
import type { User } from "./types";
import { unlockAndOpen } from "./wallet/account";
import { WrongPasswordError } from "./wallet/crypto";
// WalletLogin is shown when this device already holds an encrypted identity. The
// password decrypts the local private key and opens a per-user gateway session.
// The password is never stored and never sent to the server.
export function WalletLogin({
handle,
onLoggedIn,
onRecover,
}: {
handle: string;
onLoggedIn: (u: User) => void;
onRecover: () => void;
}) {
const [password, setPassword] = useState("");
const [busy, setBusy] = useState(false);
const [error, setError] = useState<string | null>(null);
const unlock = async () => {
if (!password || busy) return;
setBusy(true);
setError(null);
try {
const user = await unlockAndOpen(password);
onLoggedIn(user);
} catch (e) {
if (e instanceof WrongPasswordError) {
setError("Contraseña incorrecta.");
} else if (e instanceof ApiError) {
setError(e.message);
} else {
setError("No se pudo abrir tu identidad.");
}
setBusy(false);
}
};
return (
<AuthCard width={400}>
<AuthHeader
icon={<IconWallet size={30} />}
title="unibus"
subtitle={`Desbloquea la identidad de ${handle || "este dispositivo"}`}
/>
<PasswordInput
label="Contraseña"
description="Descifra tu clave guardada en este dispositivo"
placeholder="••••••••"
leftSection={<IconKey size={16} />}
value={password}
onChange={(e) => setPassword(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && void unlock()}
data-autofocus
/>
{error && (
<Text c="red" size="sm" ta="center">
{error}
</Text>
)}
<Button fullWidth onClick={() => void unlock()} disabled={!password} loading={busy}>
Entrar
</Button>
<Group justify="center">
<Anchor size="xs" c="dimmed" onClick={onRecover}>
¿Olvidaste la contraseña? Recupera con tu frase de 12 palabras
</Anchor>
</Group>
</AuthCard>
);
}
+70
@@ -0,0 +1,70 @@
import { useState } from "react";
import { Button, Divider, Stack, Text, TextInput } from "@mantine/core";
import { IconLink, IconRotateClockwise, IconShieldLock } from "@tabler/icons-react";
import { AuthCard, AuthHeader } from "./AuthShell";
// extractToken pulls the invite token out of whatever the user pastes: a full
// link (.../join?token=XXX), a bare "token=XXX", or just the token itself.
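function extractToken(input: string): string {
const s = input.trim();
if (!s) return "";
const m = s.match(/[?&]token=([^&\s]+)/);
if (m) {
// decodeURIComponent throws URIError on a malformed %-escape. This function
// runs on every render, so fall back to the raw value instead of crashing.
try {
return decodeURIComponent(m[1]);
} catch {
return m[1];
}
}
if (s.startsWith("token=")) return s.slice("token=".length);
return s;
}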
// Welcome is the entry screen on a device with no local identity. It offers the
// two ways in: open an invite link (new account) or recover an existing account
// from its 12-word seed.
export function Welcome({
onJoinToken,
onRecover,
}: {
onJoinToken: (token: string) => void;
onRecover: () => void;
}) {
const [link, setLink] = useState("");
const token = extractToken(link);
return (
<AuthCard width={420}>
<AuthHeader
icon={<IconShieldLock size={30} />}
title="unibus"
subtitle="Mensajería cifrada de extremo a extremo. Tu identidad vive en tu dispositivo."
/>
<Stack gap="xs">
<Text size="sm" fw={600}>
Tengo un enlace de invitación
</Text>
<TextInput
placeholder="Pega aquí tu enlace /join?token=…"
leftSection={<IconLink size={16} />}
value={link}
onChange={(e) => setLink(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && token && onJoinToken(token)}
/>
<Button disabled={!token} onClick={() => onJoinToken(token)}>
Crear mi cuenta
</Button>
</Stack>
<Divider label="o" labelPosition="center" color="dark.4" />
<Stack gap="xs">
<Text size="sm" fw={600}>
Ya tengo una cuenta
</Text>
<Button
variant="default"
leftSection={<IconRotateClockwise size={16} />}
onClick={onRecover}
>
Recuperar con mi seed (12 palabras)
</Button>
</Stack>
</AuthCard>
);
}
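`extractToken` accepts three input shapes: a full link, a bare `token=XXX`, or the token alone. The sketch below is a standalone copy for illustration with a few sample inputs; the `try`/`catch` around decoding is a defensive choice for this sketch, and `chat.example` is a made-up host.

```typescript
function extractToken(input: string): string {
  const s = input.trim();
  if (!s) return "";
  const m = s.match(/[?&]token=([^&\s]+)/);
  if (m) {
    // A malformed %-escape makes decodeURIComponent throw; keep the raw value.
    try {
      return decodeURIComponent(m[1]);
    } catch {
      return m[1];
    }
  }
  if (s.startsWith("token=")) return s.slice("token=".length);
  return s;
}

console.log(extractToken("https://chat.example/join?token=abc123")); // abc123
console.log(extractToken("token=abc123")); // abc123
console.log(extractToken("  abc123  ")); // abc123
console.log(extractToken("/join?token=a%2Fb&x=1")); // a/b
console.log(extractToken("/join?token=%E0%A4%A")); // %E0%A4%A
```

Note that only the link form is percent-decoded; a bare `token=a%2Fb` is returned as typed, which matches the original function.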
+167
@@ -0,0 +1,167 @@
// The only layer through which the SPA talks to the bus. Every call goes to the Go
// gateway under /api; the gateway holds the `pkg/client` session (an authenticated
// bus peer), encrypts/decrypts per room, and translates to REST/SSE. The browser
// never signs and never speaks NATS. In the wallet flow it does hold the derived
// private key and hands it to the gateway in session(); apart from that it keeps
// only an opaque (HttpOnly) session cookie that the gateway issues after login.
import type {
MeInfo,
Message,
MsgWire,
RegisterResult,
Room,
RoomWire,
} from "./types";
import type { WalletIdentity } from "./wallet/derive";
export class ApiError extends Error {
status: number;
constructor(message: string, status: number) {
super(message);
this.status = status;
}
}
async function req<T>(path: string, init?: RequestInit): Promise<T> {
const res = await fetch(path, {
// same-origin sends the session cookie automatically (also behind the vite
// proxy in dev).
credentials: "same-origin",
headers: { "Content-Type": "application/json" },
...init,
});
const text = await res.text();
let body: unknown = null;
if (text) {
try {
body = JSON.parse(text);
} catch {
body = text;
}
}
if (!res.ok) {
const msg =
body && typeof body === "object" && "error" in body
? String((body as { error: unknown }).error)
: `HTTP ${res.status}`;
throw new ApiError(msg, res.status);
}
return body as T;
}
// roomFromWire maps the gateway's room row to the Room type the UI consumes.
// Messages do NOT live here: they arrive via streamRoom(). lastMessage/lastTs/unread
// are filled with neutral values so that no data is invented (the sidebar header
// will be fed from the stream in a future iteration).
export function roomFromWire(r: RoomWire): Room {
return {
id: r.id,
name: r.name || r.subject,
encrypted: r.encrypt,
lastMessage: "",
lastTs: 0,
unread: 0,
messages: [],
};
}
// messageFromWire maps a decrypted SSE frame to the UI's Message type.
export function messageFromWire(m: MsgWire): Message {
return {
id: m.id,
sender: m.sender,
body: m.body,
ts: m.ts,
mine: m.mine,
};
}
export const api = {
// ---- onboarding wallet --------------------------------------------------
// register publishes the new user's PUBLIC identity to the bus allowlist using
// the invite-link token. It does NOT require a session: the token authorizes.
// The handle and role are set by the invite, not by the client. The private key
// is NEVER sent here.
register: (token: string, signPub: string, kexPub: string) =>
req<RegisterResult>("/api/register", {
method: "POST",
body: JSON.stringify({ token, sign_pub: signPub, kex_pub: kexPub }),
}),
// session opens a PER-USER session: the browser hands over its full wallet
// identity (including the private key, over TLS only) and the gateway connects a
// bus client that acts AS that user. The private key lives in gateway memory for
// the duration of the session; it is not persisted on the server.
session: (id: WalletIdentity, handle: string) =>
req<MeInfo>("/api/session", {
method: "POST",
body: JSON.stringify({
handle,
sign_pub: id.signPub,
sign_priv: id.signPriv,
kex_pub: id.kexPub,
kex_priv: id.kexPriv,
}),
}),
// ---- session (legacy operator) ------------------------------------------
// login unlocks a session bound to the operator's gateway using its passphrase.
// The main path is now the wallet (session); login is kept for compatibility
// with the single-operator MVP.
login: (passphrase: string) =>
req<MeInfo>("/api/login", {
method: "POST",
body: JSON.stringify({ passphrase }),
}),
logout: () => req<{ status: string }>("/api/logout", { method: "POST" }),
me: () => req<MeInfo>("/api/me"),
// ---- rooms --------------------------------------------------------------
listRooms: async (): Promise<Room[]> => {
const wire = await req<RoomWire[]>("/api/rooms");
return wire.map(roomFromWire);
},
// createRoom: {subject, encrypted} is enough; by default the gateway derives the
// Matrix-like policy (encrypted + persisted + signed).
createRoom: async (subject: string, encrypted = true): Promise<Room> => {
const r = await req<RoomWire>("/api/rooms", {
method: "POST",
body: JSON.stringify({ subject, encrypted }),
});
return roomFromWire(r);
},
join: (roomID: string) =>
req<{ status: string }>(
`/api/rooms/${encodeURIComponent(roomID)}/join`,
{ method: "POST" },
),
send: (roomID: string, body: string) =>
req<{ status: string }>(
`/api/rooms/${encodeURIComponent(roomID)}/send`,
{ method: "POST", body: JSON.stringify({ body }) },
),
};
// streamRoom opens a room's SSE stream and calls onMessage for each decrypted frame
// (history first for persisted rooms, then live). It returns a close function.
// EventSource sends the session cookie automatically and reconnects on its own if
// the connection drops; onError is invoked on every drop so the UI can reflect
// the state.
export function streamRoom(
roomID: string,
onMessage: (m: Message) => void,
onError?: (e: Event) => void,
): () => void {
const es = new EventSource(
`/api/rooms/${encodeURIComponent(roomID)}/stream`,
);
es.onmessage = (ev) => {
try {
const wire = JSON.parse(ev.data) as MsgWire;
onMessage(messageFromWire(wire));
} catch {
// malformed frame: ignore it, the stream continues.
}
};
if (onError) es.onerror = onError;
return () => es.close();
}
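The error path of `req` turns any non-2xx response into an `ApiError` whose message comes from a JSON `error` field when present and falls back to `HTTP <status>` otherwise. The sketch below pulls that logic into two pure functions so the cases can be seen side by side; `parseBody` and `errorMessage` are hypothetical names, and the sample payloads are invented.

```typescript
// Parse a response body the way req does: empty text becomes null, valid JSON
// is parsed, and anything else is kept as the raw string.
function parseBody(text: string): unknown {
  if (!text) return null;
  try {
    return JSON.parse(text);
  } catch {
    return text;
  }
}

// Prefer the gateway's {"error": "..."} field; otherwise report the status.
function errorMessage(body: unknown, status: number): string {
  return body && typeof body === "object" && "error" in body
    ? String((body as { error: unknown }).error)
    : `HTTP ${status}`;
}

console.log(errorMessage(parseBody('{"error":"invite expired"}'), 403)); // invite expired
console.log(errorMessage(parseBody("Bad Gateway"), 502)); // HTTP 502
console.log(errorMessage(parseBody(""), 500)); // HTTP 500
```

Callers such as `Join` and `WalletLogin` then only need `e instanceof ApiError ? e.message : fallback` to show something meaningful.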
+43 -3
@@ -1,5 +1,5 @@
// UI domain types. In iteration 1 they are filled with mock data;
// later they will come from the gateway (REST/SSE), which is a bus peer.
// UI domain types. The data comes from the Go gateway (REST/SSE), which is an
// authenticated bus peer. The browser never signs and never speaks NATS.
export interface User {
id: string;
@@ -8,7 +8,7 @@ export interface User {
export interface Message {
id: string;
sender: string; // handle
sender: string; // sender's endpoint id (a readable handle is phase 2)
body: string;
ts: number; // epoch ms
mine?: boolean;
@@ -23,3 +23,43 @@ export interface Room {
unread: number;
messages: Message[];
}
// ---- gateway API shapes (wire) -----------------------------------------
// MeInfo is the identity the gateway embodies in the current session (GET /api/me,
// POST /api/session, POST /api/login). In the wallet model it is the identity of
// the logged-in USER; `handle` is their display name.
endpoint: string;
sign_pub: string;
handle: string;
}
// RegisterResult es la respuesta de POST /api/register: el handle y rol que el
// invite (token) fijó para el nuevo usuario.
export interface RegisterResult {
handle: string;
role: string;
}
// RoomWire es la fila de room que devuelve el gateway (GET /api/rooms). No trae
// mensajes: estos llegan por SSE (GET /api/rooms/{id}/stream).
export interface RoomWire {
id: string;
subject: string;
name: string;
epoch: number;
encrypt: boolean;
persist: boolean;
sign_msgs: boolean;
role: string;
}
// MsgWire es un mensaje ya descifrado que el gateway empuja por SSE.
export interface MsgWire {
id: string;
sender: string;
body: string;
ts: number;
mine: boolean;
}
+60
@@ -0,0 +1,60 @@
// High-level wallet account operations shared by the join, recover and login
// flows. These compose the low-level primitives (derive / crypto / store) with
// the gateway API so the page components stay thin.
import { api } from "../api";
import type { MeInfo, User } from "../types";
import { decryptJSON, encryptJSON } from "./crypto";
import type { WalletIdentity } from "./derive";
import { getIdentity, putIdentity, type StoredIdentity } from "./store";
function toUser(me: MeInfo): User {
return { id: me.endpoint, handle: me.handle || me.endpoint.slice(0, 8) };
}
// saveAndOpen encrypts the identity under `password`, stores it on this device,
// and opens a gateway session as that user. Used by join (new identity) and
// recover (re-derived identity): both end with a locally-encrypted key plus a
// live per-user session. The mnemonic/seed is NOT touched here — only the derived
// keypair is persisted (encrypted).
export async function saveAndOpen(
identity: WalletIdentity,
handle: string,
password: string,
): Promise<User> {
const enc = await encryptJSON(identity, password);
await putIdentity({
handle,
signPub: identity.signPub,
kexPub: identity.kexPub,
enc,
createdAt: Date.now(),
});
const me = await api.session(identity, handle);
return toUser(me);
}
// unlockAndOpen reads this device's stored identity, decrypts the private key with
// `password`, and opens a gateway session. Throws WrongPasswordError on a bad
// password (GCM auth failure) and NoLocalIdentityError if the device has none.
export async function unlockAndOpen(password: string): Promise<User> {
const stored = await getIdentity();
if (!stored) throw new NoLocalIdentityError();
const identity = await decryptJSON<WalletIdentity>(stored.enc, password);
const me = await api.session(identity, stored.handle);
return toUser(me);
}
// localIdentity returns the device's stored identity record (or null), for the
// router to decide between the password-unlock screen and the welcome screen, and
// to greet the user by handle before unlocking.
export async function localIdentity(): Promise<StoredIdentity | null> {
return getIdentity();
}
export class NoLocalIdentityError extends Error {
constructor() {
super("no local identity on this device");
this.name = "NoLocalIdentityError";
}
}
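A login screen has to tell the two failure modes of `unlockAndOpen` apart. The sketch below shows one way to do that. `describeUnlockFailure`, the `UnlockOutcome` type and the message text are hypothetical, and the two error classes are local stand-ins for the ones defined in this commit. It matches on `name` rather than `instanceof`, which is a design assumption that keeps the check working when the code is compiled to an ES5 target.

```typescript
// Local stand-ins for WrongPasswordError (crypto.ts) and NoLocalIdentityError
// (account.ts); both set `name` exactly as the originals do.
class WrongPasswordError extends Error {
  constructor() {
    super("wrong password");
    this.name = "WrongPasswordError";
  }
}
class NoLocalIdentityError extends Error {
  constructor() {
    super("no local identity on this device");
    this.name = "NoLocalIdentityError";
  }
}

// UnlockOutcome (hypothetical) is what the login screen does next.
type UnlockOutcome =
  | { kind: "retry"; message: string } // stay on the password form
  | { kind: "welcome" } // nothing stored: go to invite/recover
  | { kind: "error"; message: string }; // network or gateway failure

function describeUnlockFailure(err: unknown): UnlockOutcome {
  const name = err instanceof Error ? err.name : "";
  if (name === "WrongPasswordError") {
    return { kind: "retry", message: "Wrong password, try again." };
  }
  if (name === "NoLocalIdentityError") {
    return { kind: "welcome" };
  }
  return { kind: "error", message: err instanceof Error ? err.message : String(err) };
}
```

A caller would wrap `await unlockAndOpen(password)` in `try`/`catch` and pass the caught value to this function.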
+55
@@ -0,0 +1,55 @@
// Thin wrappers over @scure/bip39 (a small, audited BIP39 implementation that
// ships the English wordlist and the mnemonic<->entropy conversions). We do not
// roll our own checksum logic — getting the BIP39 checksum wrong silently is a
// classic footgun, so the conversion stays in the library.
import {
generateMnemonic,
validateMnemonic,
mnemonicToEntropy,
} from "@scure/bip39";
import { wordlist } from "@scure/bip39/wordlists/english.js";
// MNEMONIC_STRENGTH_BITS = 128 bits of entropy => exactly 12 words.
export const MNEMONIC_STRENGTH_BITS = 128;
export const MNEMONIC_WORD_COUNT = 12;
// newMnemonic returns a fresh 12-word mnemonic from a CSPRNG (crypto.getRandomValues
// inside @scure). The caller must show it to the user once and never persist it.
export function newMnemonic(): string {
return generateMnemonic(wordlist, MNEMONIC_STRENGTH_BITS);
}
// normalizeMnemonic lowercases, trims and collapses whitespace so a phrase the
// user typed (extra spaces, trailing newline, mixed case) validates exactly like
// the phrase that was originally generated.
export function normalizeMnemonic(input: string): string {
return input.trim().toLowerCase().split(/\s+/).filter(Boolean).join(" ");
}
// mnemonicWords splits a phrase into its individual words (normalized).
export function mnemonicWords(input: string): string[] {
const n = normalizeMnemonic(input);
return n ? n.split(" ") : [];
}
// isValidMnemonic checks word count, that every word is in the wordlist, and the
// BIP39 checksum. A phrase that fails this must not be used to derive an identity.
export function isValidMnemonic(input: string): boolean {
const n = normalizeMnemonic(input);
if (mnemonicWords(n).length !== MNEMONIC_WORD_COUNT) return false;
try {
return validateMnemonic(n, wordlist);
} catch {
return false;
}
}
// entropyHex returns the underlying entropy (hex) of a valid mnemonic; it throws
// if the phrase is invalid. Used only for diagnostics / tests, never sent anywhere.
export function entropyHex(input: string): string {
const bytes = mnemonicToEntropy(normalizeMnemonic(input), wordlist);
return Array.from(bytes)
.map((b) => b.toString(16).padStart(2, "0"))
.join("");
}
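The relation between `MNEMONIC_STRENGTH_BITS` and `MNEMONIC_WORD_COUNT` follows from the BIP39 rules: the checksum is one bit per 32 bits of entropy, and each word encodes 11 bits. The sketch below works through that arithmetic and shows what the normalization does to typed input. `checksumBits` and `wordCount` are illustrative helpers, not part of the commit, and `normalizeMnemonic` is copied from above so the block stands alone.

```typescript
// BIP39: checksum length in bits is ENT / 32.
function checksumBits(entropyBits: number): number {
  return entropyBits / 32;
}

// Each mnemonic word indexes a 2048-word list, i.e. carries 11 bits.
function wordCount(entropyBits: number): number {
  return (entropyBits + checksumBits(entropyBits)) / 11;
}

// Same body as normalizeMnemonic in this file.
function normalizeMnemonic(input: string): string {
  return input.trim().toLowerCase().split(/\s+/).filter(Boolean).join(" ");
}

// 128 bits of entropy + 4 checksum bits = 132 bits = 12 words.
const words128 = wordCount(128);
// 256 bits of entropy + 8 checksum bits = 264 bits = 24 words.
const words256 = wordCount(256);

// Mixed case, repeated spaces and newlines all collapse to single spaces.
const cleaned = normalizeMnemonic("  Abandon   ABILITY\nable \n");
```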
+124
@@ -0,0 +1,124 @@
// Local at-rest encryption of the wallet's private key, using only the platform
// WebCrypto (crypto.subtle) — no extra dependency, no WASM. The password derives
// an AES-GCM key via PBKDF2; the password itself is never stored, never sent to
// the server, and is not part of the identity (it only protects the local copy
// of the private key). The identity's source of truth is the BIP39 seed.
// PBKDF2 work factor. Note that 210k is the OWASP 2023 figure for
// PBKDF2-HMAC-SHA512; for PBKDF2-HMAC-SHA256 OWASP lists 600k, so this value sits
// below that recommendation. The count is stored alongside the blob so a future
// bump stays readable.
const PBKDF2_ITERS = 210_000;
// EncryptedBlob is the at-rest form of a secret: AES-256-GCM ciphertext plus the
// public KDF parameters needed to re-derive the key from the password. None of
// these fields is secret on its own — only the password (never stored) unlocks it.
export interface EncryptedBlob {
kdf: "PBKDF2-SHA256";
iters: number;
salt: string; // hex, 16 random bytes (PBKDF2 salt)
iv: string; // hex, 12 random bytes (AES-GCM nonce)
ciphertext: string; // hex (includes the GCM auth tag)
}
function toHex(b: Uint8Array): string {
let s = "";
for (const x of b) s += x.toString(16).padStart(2, "0");
return s;
}
function fromHex(h: string): Uint8Array {
const out = new Uint8Array(h.length / 2);
for (let i = 0; i < out.length; i++) {
out[i] = parseInt(h.slice(i * 2, i * 2 + 2), 16);
}
return out;
}
async function deriveAesKey(
password: string,
salt: Uint8Array,
iters: number,
): Promise<CryptoKey> {
const enc = new TextEncoder();
const baseKey = await crypto.subtle.importKey(
"raw",
enc.encode(password),
"PBKDF2",
false,
["deriveKey"],
);
return crypto.subtle.deriveKey(
{ name: "PBKDF2", salt: salt as BufferSource, iterations: iters, hash: "SHA-256" },
baseKey,
{ name: "AES-GCM", length: 256 },
false,
["encrypt", "decrypt"],
);
}
// encryptSecret seals `plaintext` under `password` with a fresh random salt+iv.
export async function encryptSecret(
plaintext: Uint8Array,
password: string,
): Promise<EncryptedBlob> {
const salt = crypto.getRandomValues(new Uint8Array(16));
const iv = crypto.getRandomValues(new Uint8Array(12));
const key = await deriveAesKey(password, salt, PBKDF2_ITERS);
const ct = await crypto.subtle.encrypt(
{ name: "AES-GCM", iv: iv as BufferSource },
key,
plaintext as BufferSource,
);
return {
kdf: "PBKDF2-SHA256",
iters: PBKDF2_ITERS,
salt: toHex(salt),
iv: toHex(iv),
ciphertext: toHex(new Uint8Array(ct)),
};
}
// WrongPasswordError is thrown when GCM authentication fails on decrypt, which
// almost always means a wrong password (or, rarely, a corrupted blob). Callers
// map it to a friendly "contraseña incorrecta" ("wrong password") message.
export class WrongPasswordError extends Error {
constructor() {
super("wrong password");
this.name = "WrongPasswordError";
}
}
// decryptSecret re-derives the key from `password` and opens the blob. A wrong
// password makes GCM verification fail, surfaced as WrongPasswordError.
export async function decryptSecret(
blob: EncryptedBlob,
password: string,
): Promise<Uint8Array> {
const key = await deriveAesKey(password, fromHex(blob.salt), blob.iters);
try {
const pt = await crypto.subtle.decrypt(
{ name: "AES-GCM", iv: fromHex(blob.iv) as BufferSource },
key,
fromHex(blob.ciphertext) as BufferSource,
);
return new Uint8Array(pt);
} catch {
throw new WrongPasswordError();
}
}
// JSON convenience: encrypt/decrypt a JS value as UTF-8 JSON. We use this to seal
// the whole WalletIdentity object (the private halves) under the password.
export async function encryptJSON(
value: unknown,
password: string,
): Promise<EncryptedBlob> {
return encryptSecret(new TextEncoder().encode(JSON.stringify(value)), password);
}
export async function decryptJSON<T>(
blob: EncryptedBlob,
password: string,
): Promise<T> {
const bytes = await decryptSecret(blob, password);
return JSON.parse(new TextDecoder().decode(bytes)) as T;
}
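`fromHex` above trusts its input: a non-hex character pair parses as `NaN`, which a `Uint8Array` silently stores as 0. For a stored blob this would most likely surface later as a GCM failure, but a stricter decoder makes the cause visible. The sketch below is one possible hardening; `fromHexStrict` is a hypothetical name and is not part of the commit.

```typescript
// fromHexStrict (hypothetical) decodes a hex string and rejects anything that
// is not an even-length run of hex digits, instead of decoding it to zeros.
function fromHexStrict(h: string): Uint8Array {
  if (h.length % 2 !== 0 || !/^[0-9a-fA-F]*$/.test(h)) {
    throw new Error("invalid hex string");
  }
  const out = new Uint8Array(h.length / 2);
  for (let i = 0; i < out.length; i++) {
    out[i] = parseInt(h.slice(i * 2, i * 2 + 2), 16);
  }
  return out;
}

// Helper for the usage lines below: returns true if decoding throws.
function rejects(h: string): boolean {
  try {
    fromHexStrict(h);
    return false;
  } catch {
    return true;
  }
}

const decoded = fromHexStrict("00ff10"); // bytes 0, 255, 16
const badChars = rejects("zz"); // not hex digits
const oddLength = rejects("abc"); // three characters, not whole bytes
```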
+69
@@ -0,0 +1,69 @@
// Deterministic identity derivation from a BIP39 mnemonic.
//
// The identity is NOT a loose random keypair: it is derived deterministically
// and reproducibly from a 12-word BIP39 mnemonic (128 bits of entropy). The
// SAME mnemonic always yields the SAME keypair (same sign_pub), which is what
// lets a user recover their account on a new device — or after forgetting the
// local password — without admin intervention: the re-derived identity is byte
// for byte the one already in the bus allowlist.
//
// SCHEME (must be identical at create time and at recovery time):
//
// 1. mnemonic 12 BIP39 words (128-bit entropy + 4-bit checksum)
// 2. seed = BIP39_seed(mnemonic)
// = PBKDF2(HMAC-SHA512, password = NFKD(mnemonic),
// salt = "mnemonic", iterations = 2048, dkLen = 64)
// (the standard BIP39 seed; no extra passphrase)
// 3. signSeed = HKDF-SHA256(ikm = seed, salt = "", info = "unibus-sign-v1", L = 32)
// 4. Ed25519 signing key from signSeed:
// sign_pub = Ed25519.publicKey(signSeed) (32 bytes)
// sign_priv = signSeed || sign_pub (64 bytes; Go's
// ed25519.PrivateKey layout = seed||pub, what the gateway expects)
// 5. kexSeed = HKDF-SHA256(ikm = seed, salt = "", info = "unibus-kex-v1", L = 32)
// 6. X25519 key-exchange key from kexSeed:
// kex_priv = kexSeed (32 bytes; X25519 clamps internally)
// kex_pub = X25519.publicKey(kexSeed) (32 bytes)
//
// The two distinct HKDF `info` labels domain-separate the signing key from the
// key-exchange key: the same seed yields two independent 32-byte secrets, so
// neither key doubles as the other. All four key halves match cs.Identity
// on the Go side exactly (sign_pub 32, sign_priv 64, kex_pub 32, kex_priv 32),
// so the gateway can act as the user's peer with the derived keys.
import { ed25519, x25519 } from "@noble/curves/ed25519.js";
import { hkdf } from "@noble/hashes/hkdf.js";
import { sha256 } from "@noble/hashes/sha2.js";
import { bytesToHex, concatBytes } from "@noble/hashes/utils.js";
import { mnemonicToSeedSync } from "@scure/bip39";
export const INFO_SIGN = "unibus-sign-v1";
export const INFO_KEX = "unibus-kex-v1";
// WalletIdentity holds the four keypair halves, each lowercase hex. This is the
// shape the gateway's POST /api/session consumes (and a subset — the two public
// halves — is what POST /api/register sends to the bus).
export interface WalletIdentity {
signPub: string; // 64 hex (32-byte Ed25519 public key)
signPriv: string; // 128 hex (64-byte Ed25519 private key, seed||pub)
kexPub: string; // 64 hex (32-byte X25519 public key)
kexPriv: string; // 64 hex (32-byte X25519 private key)
}
// deriveIdentity turns a validated BIP39 mnemonic into the deterministic
// keypair. Pure: the same mnemonic in always produces the same identity out.
export function deriveIdentity(mnemonic: string): WalletIdentity {
const seed = mnemonicToSeedSync(mnemonic.normalize("NFKD")); // 64 bytes
const enc = new TextEncoder();
const signSeed = hkdf(sha256, seed, undefined, enc.encode(INFO_SIGN), 32);
const kexSeed = hkdf(sha256, seed, undefined, enc.encode(INFO_KEX), 32);
const signPub = ed25519.getPublicKey(signSeed);
const signPriv = concatBytes(signSeed, signPub); // Go ed25519.PrivateKey = seed||pub
const kexPub = x25519.getPublicKey(kexSeed);
return {
signPub: bytesToHex(signPub),
signPriv: bytesToHex(signPriv),
kexPub: bytesToHex(kexPub),
kexPriv: bytesToHex(kexSeed),
};
}
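Steps 3 and 5 of the scheme can be checked in isolation. The sketch below derives both sub-seeds from one input and shows that they differ from each other and are reproducible. It uses Node's built-in `hkdfSync` instead of `@noble/hashes` so that it runs without dependencies. Both implement RFC 5869 and an empty salt is equivalent to the default all-zero salt, so the bytes should match, but that equivalence is an assumption this block does not test. The 64-byte input is a placeholder, not a real BIP39 seed, and `subSeed` is a hypothetical helper.

```typescript
import { hkdfSync } from "node:crypto";

const INFO_SIGN = "unibus-sign-v1";
const INFO_KEX = "unibus-kex-v1";

// subSeed (hypothetical) runs HKDF-SHA256 with an empty salt and the given
// info label, returning 32 bytes as lowercase hex.
function subSeed(seed: Uint8Array, info: string): string {
  const out = hkdfSync("sha256", seed, new Uint8Array(0), info, 32);
  return Buffer.from(out).toString("hex");
}

// Placeholder input standing in for the 64-byte BIP39 seed. NOT a real seed.
const seed = new Uint8Array(64).fill(7);

const signSeed = subSeed(seed, INFO_SIGN);
const kexSeed = subSeed(seed, INFO_KEX);

// Same seed, different label: the two outputs are unrelated byte strings.
const separated = signSeed !== kexSeed;
// Same seed, same label: the output is reproducible, which is what makes
// recovery from the mnemonic possible.
const reproducible = subSeed(seed, INFO_SIGN) === signSeed;
```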
+95
@@ -0,0 +1,95 @@
// IndexedDB persistence of the device-local wallet. Only the encrypted private
// key plus the public halves and the display handle are stored — never the
// password, never the BIP39 seed. The private key never leaves the device except
// over TLS to the gateway to open a session (see api.session).
//
// MVP: one active identity per device (keyed by a fixed id). Multi-account on a
// single device is a documented gap.
import type { EncryptedBlob } from "./crypto";
const DB_NAME = "unibus-wallet";
const DB_VERSION = 1;
const STORE = "identity";
const ACTIVE_ID = "active";
// StoredIdentity is one row in IndexedDB. `enc` is the encrypted WalletIdentity
// (all four hex halves); signPub/kexPub are kept in the clear for display and so
// the UI can show who you are without unlocking.
export interface StoredIdentity {
id: string; // always ACTIVE_ID for the single-identity MVP
handle: string;
signPub: string; // 64 hex (public, safe to store in the clear)
kexPub: string; // 64 hex (public)
enc: EncryptedBlob; // encrypted private identity (the secret material)
createdAt: number;
}
function openDB(): Promise<IDBDatabase> {
return new Promise((resolve, reject) => {
const req = indexedDB.open(DB_NAME, DB_VERSION);
req.onupgradeneeded = () => {
const db = req.result;
if (!db.objectStoreNames.contains(STORE)) {
db.createObjectStore(STORE, { keyPath: "id" });
}
};
req.onsuccess = () => resolve(req.result);
req.onerror = () => reject(req.error);
});
}
function tx<T>(
db: IDBDatabase,
mode: IDBTransactionMode,
fn: (store: IDBObjectStore) => IDBRequest<T>,
): Promise<T> {
return new Promise((resolve, reject) => {
const t = db.transaction(STORE, mode);
const req = fn(t.objectStore(STORE));
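// Settle on the transaction, not the request: a write request can succeed and
// the transaction still abort afterwards (for example, on a quota error).
t.oncomplete = () => resolve(req.result);
t.onerror = () => reject(req.error ?? t.error);
t.onabort = () => reject(t.error ?? req.error);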
});
}
// getIdentity returns the device's active identity, or null if this device has
// no wallet yet (first visit, or a fresh device awaiting recovery/invite).
export async function getIdentity(): Promise<StoredIdentity | null> {
const db = await openDB();
try {
const row = await tx<StoredIdentity | undefined>(db, "readonly", (s) =>
s.get(ACTIVE_ID),
);
return row ?? null;
} finally {
db.close();
}
}
// hasIdentity is a cheap check for the router: does this device hold a wallet?
export async function hasIdentity(): Promise<boolean> {
return (await getIdentity()) !== null;
}
// putIdentity stores (or replaces) the active identity. Used by both join (new)
// and recover (re-derived): both end with an encrypted private key on the device.
export async function putIdentity(
rec: Omit<StoredIdentity, "id">,
): Promise<void> {
const db = await openDB();
try {
await tx(db, "readwrite", (s) => s.put({ id: ACTIVE_ID, ...rec }));
} finally {
db.close();
}
}
// clearIdentity removes the wallet from this device (e.g. "forget this device").
export async function clearIdentity(): Promise<void> {
const db = await openDB();
try {
await tx(db, "readwrite", (s) => s.delete(ACTIVE_ID));
} finally {
db.close();
}
}
+8 -1
@@ -3,5 +3,12 @@ import react from "@vitejs/plugin-react";
export default defineConfig({
plugins: [react()],
server: { host: true, port: 5181 },
// In dev, /api (REST + SSE) is proxied to the Go gateway (cmd/webgw, port 8481).
// The proxy streams, so the SSE at /api/rooms/{id}/stream works through it.
// In production the gateway serves the embedded dist and there is no proxy.
server: {
host: true,
port: 5183,
proxy: { "/api": "http://127.0.0.1:8481" },
},
});