7 Commits

Author SHA1 Message Date
agent 5ea8fa1c20 feat(web): wire the SPA to the live bus via the gateway (drop mock)
Replace the mock data source with a real data layer that talks to the webgw
gateway over REST + SSE. The UI components keep their look and props; only
where the data comes from changed.

- src/api.ts: the single repository layer. fetch wrappers (same-origin cookie)
  for login/logout/me and rooms list/create/join/send, plus streamRoom() which
  opens an EventSource and yields each decrypted message. Wire->UI mappers
  (roomFromWire, messageFromWire).
- src/types.ts: add the gateway wire shapes (MeInfo, RoomWire, MsgWire) next to
  the existing UI types.
- App.tsx: probe /api/me on mount to resume an existing session; otherwise show
  Login. Logout calls the gateway.
- Login.tsx: the password field now unlocks the gateway session (operator
  passphrase); shows a basic error and a loading state. Wallet-per-browser is
  phase 2.
- ChatShell.tsx: load rooms from /api/rooms with loading / empty / error states;
  same Flex layout.
- ChatPanel.tsx: stream messages over SSE for the active room (dedup by id),
  composer sends through the gateway; no optimistic insert (the peer's own echo
  returns over SSE with the real frame id).
- vite.config.ts: dev proxy /api (REST + SSE) -> the gateway on :8481.

mock.ts is left untouched (no longer imported) to avoid churn with the parallel
styling work on master.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:14:19 +02:00
agent fb8a03cf0c feat(webgw): web gateway peer (REST + SSE) for the chat SPA
Add cmd/webgw: a single Go binary that holds the operator's bus identity,
connects to the bus as a real authenticated peer (pkg/client), and exposes a
small REST + SSE API the browser consumes. The browser never signs, never
speaks NATS, and never sees a private key.

Endpoints (all under /api, gated by a session cookie except login):
  POST /api/login            unlock a session with the operator passphrase
  POST /api/logout
  GET  /api/me               operator identity the gateway acts as
  GET  /api/rooms            ListMyRooms
  POST /api/rooms            CreateRoom (default policy: encrypted+persisted+signed)
  POST /api/rooms/{id}/join  Join (fetch room key)
  POST /api/rooms/{id}/send  Publish (sealed + signed by the peer)
  GET  /api/rooms/{id}/stream  SSE of decrypted frames (history then live)

Design notes:
- One fan-out hub per room: a single bus subscription is multiplexed to N SSE
  clients, avoiding the per-(room,endpoint) durable-consumer contention that
  multiple Subscribe calls would cause.
- Posture seam mirrors unibus_admin/clientcheck: empty --ca = plaintext dev,
  non-empty = TLS+nkey on both planes; RefreshSession after a membership change
  only under the secured (ACL) posture.
- Identity loaded from `pass` or a 0600 file, held only in memory.
- Session auth: passphrase compared in constant time; opaque HttpOnly cookie
  so EventSource (which cannot set headers) can authenticate the stream.

TRUST MODEL: room content stays end-to-end encrypted on the bus. The gateway
reads plaintext only because it acts AS the operator's client — a legitimate
member of each room holding the room key. The per-browser wallet (WebCrypto)
that moves decryption into the browser is phase 2.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:14:08 +02:00
egutierrez b4f3118e85 Merge quick/users-http-admin: HTTP admin-only users API + client methods (report 0014) 2026-06-07 20:46:44 +02:00
egutierrez e9053169da Merge quick/0011-deploy-gaps: live user-add --store kv + clientcheck E2E + runbook fixes (report 0012) 2026-06-07 20:46:44 +02:00
Egutierrez b983e43090 docs(0007): spec encryption-at-rest del control plane (JetStream/SQLite en disco) 2026-06-07 20:34:35 +02:00
egutierrez b379730225 docs(app): document users HTTP admin model, bump 0.10.0
Add a gotcha describing the unified-storage model (the server writes
users to the same store/KV as rooms), the admin-only HTTP surface, and
the CLI-seeds-admin-#0 bootstrap. Bump the version 0.9.0 -> 0.10.0 and
add the capability growth log entry for the new HTTP admin users API.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:32:05 +02:00
egutierrez 450ca01baf feat(membership,client): HTTP admin-only users API
Close the last control-plane asymmetry: rooms had a signed HTTP surface
but users were only manageable via the local CLI or direct store access.
Add admin-only HTTP endpoints, symmetric with rooms, executed against the
same privileged store the server already serves (SQLite single-node, the
replicated JetStream KV in cluster) — no new KV connection, no internal
identity, so the admin panel can manage the allowlist by signing as an
admin instead of needing --db / direct KV access.

Endpoints (all behind requireAdmin, on top of the existing
signature+nonce+TLS+enforce middleware):
  - GET  /users                    list the full allowlist (incl. revoked)
  - POST /users                    add {sign_pub, handle, role}
  - POST /users/{signpub}/revoke   revoke (status flip, no hard delete)

requireAdmin is default-deny with no dev relaxation: it allows a request
only when the authenticated signer is confirmed by the store as an active
admin; any other case (no signer, non-admin, revoked, store error) is 403,
fail-closed. The request context now also carries the signer's sign_pub
hex, because the endpoint id is a one-way hash of the key and cannot be
reversed to look the signer up in the allowlist.

Validation/idempotency mirror the CLL: sign_pub must be 64-hex, role must
be admin|member (empty defaults to member), re-adding an existing key is a
409 that leaves the row untouched. The hex check is unified into
membership.ValidateSignPubHex, reused by the CLI and the handlers.

pkg/client gains ListUsers/AddUser/RevokeUser (flat UserInfo type) signed
via doJSON, so the panel plugs in directly.

Tests: non-admin -> 403 on all three endpoints; admin add->list->revoke
roundtrip; validation (400 hex, 400 role, 409 re-add, row untouched); plus
a client test against an embedded membershipd under enforce.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:31:57 +02:00
20 changed files with 1939 additions and 59 deletions
+38 -1
View File
@@ -2,7 +2,7 @@
name: unibus name: unibus
lang: go lang: go
domain: infra domain: infra
version: 0.9.0 version: 0.10.0
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo." description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
tags: [service, messaging, nats, e2e] tags: [service, messaging, nats, e2e]
uses_functions: uses_functions:
@@ -122,6 +122,21 @@ Para apuntar a un NATS externo en producción: `--nats-url nats://host:4222` en
las rutas GET de lectura. Confía en la red interna. Las rutas mutantes las rutas GET de lectura. Confía en la red interna. Las rutas mutantes
(`/rooms`, `/invite`, `/rekey`) sí exigen firma Ed25519 del owner sobre los (`/rooms`, `/invite`, `/rekey`) sí exigen firma Ed25519 del owner sobre los
bytes canónicos de la request. Endurecer es fase posterior. bytes canónicos de la request. Endurecer es fase posterior.
- **Gestión de usuarios: storage unificado, alta por dos vías.** El allowlist de
usuarios vive en el MISMO store que las rooms (`pkg/membership.Store`): SQLite en
single-node, JetStream KV replicado (`UNIBUS_users`) en cluster. El `Server` ya
tiene ese store privilegiado abierto (es quien sirve el KV en cada nodo), así que
expone `GET/POST /users` y `POST /users/{signpub}/revoke` como API HTTP admin-only,
simétrica con las rutas de rooms: el panel de administración firma como admin y el
server ejecuta la mutación contra el mismo store. El panel NO necesita `--db`, ni la
identidad interna, ni correr en un nodo del cluster; funciona idéntico en single-node
y cluster. La autorización es default-deny: solo un firmante que el store confirma como
`role == "admin"` activo pasa, cualquier otro recibe 403 (encima de la firma+nonce+TLS
ya existentes). La CLI `membershipd user add --store kv` sigue existiendo SOLO para
sembrar el admin #0 (bootstrap del huevo-gallina: sin un admin sembrado no hay quién
firme el primer `POST /users`); a partir de ahí toda la gestión es HTTP admin-only. El
alta es idempotente igual que la CLI: re-alta de una clave ya registrada = 409, sin
sobrescribir ni elevar rol; el revoke es un flip de status (sin hard-delete), auditable.
- **Identidad = secreto crítico.** El archivo de identidad (`worker.id`, - **Identidad = secreto crítico.** El archivo de identidad (`worker.id`,
`chat.id`) contiene las claves privadas (Ed25519 + X25519). Se escribe 0600. `chat.id`) contiene las claves privadas (Ed25519 + X25519). Se escribe 0600.
Perderlo = mensajes ilegibles, sin recuperación. Trátalo como una clave SSH. Perderlo = mensajes ilegibles, sin recuperación. Trátalo como una clave SSH.
@@ -154,6 +169,28 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
## Capability growth log ## Capability growth log
- v0.10.0 (2026-06-07) — API HTTP admin-only de gestión de usuarios, cerrando la
última asimetría del control plane: las rooms tenían superficie HTTP firmada
(`POST /rooms`, etc.) pero los users solo se gestionaban por CLI local o acceso
directo al store. Se añaden `GET /users` (lista completa, incluidos revocados),
`POST /users` (alta `{sign_pub, handle, role}`: valida hex de 64 chars + role en
`{admin, member}`, 409 idempotente que no sobrescribe ni eleva rol) y
`POST /users/{signpub}/revoke` (flip de status, sin hard-delete). Los tres pasan por
un helper `requireAdmin` default-deny que confirma contra el store que el firmante
autenticado es un user `role == "admin"` activo (el endpoint id es un hash one-way de
la clave, así que el contexto lleva ahora también el `sign_pub` hex del firmante para
resolver `GetUser`); cualquier otro firmante recibe 403, encima de la firma+nonce+TLS+
enforce ya heredadas del middleware. NO se abre conexión KV nueva ni se usa la identidad
interna: el server escribe vía su `s.store` privilegiado, el MISMO que las rooms (SQLite
single-node, KV `UNIBUS_users` en cluster). `pkg/client` gana `ListUsers/AddUser/RevokeUser`
(tipo plano `UserInfo`) firmando como admin, así la pestaña Users del panel deja de
necesitar `--db`/acceso KV directo. La CLI `membershipd user add --store kv` queda SOLO
para sembrar el admin #0 (bootstrap). La validación de `sign_pub` se unifica en
`membership.ValidateSignPubHex`, reusada por la CLI y los handlers. Tests nuevos:
no-admin → 403 en los tres endpoints, roundtrip admin add→list→revoke, y validación
(hex inválido → 400, role inválido → 400, re-alta → 409), más un test de cliente contra
un membershipd embebido. Cambios 100% aditivos: el comportamiento single-node y de las
rutas de rooms no cambia; vet/build/test verdes.
- v0.9.0 (2026-06-07) — cierre de los gaps que el despliegue del cluster (report - v0.9.0 (2026-06-07) — cierre de los gaps que el despliegue del cluster (report
0011) dejó abiertos (report 0012). (GAP A) Nueva capability `membershipd user 0011) dejó abiertos (report 0012). (GAP A) Nueva capability `membershipd user
add|list|revoke --store kv`: alta/baja de usuarios contra el KV replicado del add|list|revoke --store kv`: alta/baja de usuarios contra el KV replicado del
+3 -10
View File
@@ -1,7 +1,6 @@
package main package main
import ( import (
"encoding/hex"
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
@@ -90,16 +89,10 @@ func openStore(path string) membership.Store {
// validateSignPubHex ensures the key is exactly a 32-byte Ed25519 public key in // validateSignPubHex ensures the key is exactly a 32-byte Ed25519 public key in
// hex (64 hex chars). Catching this here turns a silent "authorized nobody" into // hex (64 hex chars). Catching this here turns a silent "authorized nobody" into
// an explicit error at seed time. // an explicit error at seed time. It delegates to membership.ValidateSignPubHex
// so the CLI and the HTTP user-management handlers share one rule.
func validateSignPubHex(signPub string) error { func validateSignPubHex(signPub string) error {
b, err := hex.DecodeString(signPub) return membership.ValidateSignPubHex(signPub)
if err != nil {
return fmt.Errorf("sign-pub is not valid hex: %w", err)
}
if len(b) != 32 {
return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
}
return nil
} }
// kvFlags holds the connection flags shared by the --store kv path of the user // kvFlags holds the connection flags shared by the --store kv path of the user
+246
View File
@@ -0,0 +1,246 @@
package main
import (
"encoding/hex"
"fmt"
"strings"
"sync"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
)
// gateway is the live web gateway: it owns the operator's identity and a single
// connected unibus client, and turns the bus's crypto-bearing API into the plain
// REST/SSE surface the browser consumes. The browser never signs, never speaks
// NATS, and never sees a private key — the gateway is the legitimate room member
// that seals/opens payloads on the browser's behalf.
//
// TRUST MODEL: content stays end-to-end encrypted on the wire. The gateway can
// read plaintext because it acts AS the operator's client — a real member of
// each room, holding the room key K like any peer. It is the same trust a native
// desktop client has. In the wallet phase (per-browser WebCrypto identity) the
// decryption can move into the browser; today, for the single-operator MVP, the
// gateway decrypts server-side and pushes cleartext over a loopback/authenticated
// SSE channel.
type gateway struct {
id cs.Identity
endpoint string
cli *client.Client
refreshACL bool // call RefreshSession after a membership change (needed under a per-subject ACL bus)
mu sync.Mutex
hubs map[string]*roomHub // roomID -> live fan-out of decrypted frames to SSE clients
}
// gatewayConfig wires a live gateway.
type gatewayConfig struct {
Identity cs.Identity
NatsURL string
CtrlURL string
CtrlURLs []string
NatsURLs []string
CAPath string // bus CA; empty => plaintext dev connection (matches a loopback membershipd)
}
// newGateway connects the unibus client with the operator identity following the
// same posture seam every peer uses: a non-empty CA path means TLS + nkey, empty
// means plaintext dev. When a CA is configured the bus is assumed to enforce a
// per-subject ACL, so membership changes trigger a session refresh.
func newGateway(cfg gatewayConfig) (*gateway, error) {
opts := client.Options{
CtrlURLs: cfg.CtrlURLs,
NatsServers: cfg.NatsURLs,
}
if cfg.CAPath != "" {
tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath)
if err != nil {
return nil, fmt.Errorf("webgw: load bus CA %q: %w", cfg.CAPath, err)
}
opts.UseNkey = true
opts.TLS = tlsCfg
opts.CtrlTLS = tlsCfg
}
cli, err := client.NewWithOptions(cfg.NatsURL, cfg.CtrlURL, cfg.Identity, opts)
if err != nil {
return nil, fmt.Errorf("webgw: connect bus client: %w", err)
}
return &gateway{
id: cfg.Identity,
endpoint: frame.EndpointID(cfg.Identity.SignPub),
cli: cli,
refreshACL: cfg.CAPath != "",
hubs: map[string]*roomHub{},
}, nil
}
// Close stops every hub and releases the bus client connection.
func (g *gateway) Close() error {
g.mu.Lock()
for _, h := range g.hubs {
h.stop()
}
g.hubs = map[string]*roomHub{}
g.mu.Unlock()
if g.cli != nil {
return g.cli.Close()
}
return nil
}
// ---- wire types (browser-facing JSON) ------------------------------------
// meInfo is what GET /api/me returns: the operator identity the gateway acts as.
type meInfo struct {
Endpoint string `json:"endpoint"`
SignPub string `json:"sign_pub"`
}
// roomWire is the browser view of a room. It deliberately omits messages: those
// stream over SSE (GET /api/rooms/{id}/stream), not in the room list.
type roomWire struct {
ID string `json:"id"`
Subject string `json:"subject"`
Name string `json:"name"`
Epoch int `json:"epoch"`
Encrypt bool `json:"encrypt"`
Persist bool `json:"persist"`
SignMsgs bool `json:"sign_msgs"`
Role string `json:"role"`
}
// createRoomReq is the POST /api/rooms body. Encrypt/Persist/SignMsgs are
// pointers so an omitted field falls back to the chat default rather than to the
// Go zero value (false). The common case — the browser sending only {subject,
// encrypted} — maps encrypted onto all three (the Matrix-like chat policy).
type createRoomReq struct {
Subject string `json:"subject"`
Encrypted *bool `json:"encrypted,omitempty"`
Encrypt *bool `json:"encrypt,omitempty"`
Persist *bool `json:"persist,omitempty"`
SignMsgs *bool `json:"sign_msgs,omitempty"`
}
// policy resolves the requested policy. A bare {subject} defaults to the
// Matrix-like chat room (encrypted + persisted + signed) so a created room keeps
// durable, end-to-end-encrypted, authored history. Callers can override any leg.
func (r createRoomReq) policy() room.Policy {
enc, per, sig := true, true, true
if r.Encrypted != nil {
enc, per, sig = *r.Encrypted, *r.Encrypted, *r.Encrypted
}
if r.Encrypt != nil {
enc = *r.Encrypt
}
if r.Persist != nil {
per = *r.Persist
}
if r.SignMsgs != nil {
sig = *r.SignMsgs
}
return room.Policy{Encrypt: enc, Persist: per, SignMsgs: sig}
}
// sendReq is the POST /api/rooms/{id}/send body.
type sendReq struct {
Body string `json:"body"`
}
// msgWire is one decrypted message pushed over SSE.
type msgWire struct {
ID string `json:"id"`
Sender string `json:"sender"`
Body string `json:"body"`
TS int64 `json:"ts"` // epoch ms (decoded from the frame's ULID id)
Mine bool `json:"mine"`
}
// ---- operations -----------------------------------------------------------
func (g *gateway) me() meInfo {
return meInfo{Endpoint: g.endpoint, SignPub: hex.EncodeToString(g.id.SignPub)}
}
// subjectName derives a short, human-friendly room name from its bus subject by
// dropping the leading namespace segment (room., test., proc., agent.). It is a
// display nicety only; the canonical identity stays the subject/room id.
func subjectName(subject string) string {
for _, p := range []string{"room.", "test.", "proc.", "agent.", "rpc."} {
if strings.HasPrefix(subject, p) {
return strings.TrimPrefix(subject, p)
}
}
return subject
}
func (g *gateway) listRooms() ([]roomWire, error) {
rooms, err := g.cli.ListMyRooms()
if err != nil {
return nil, err
}
out := make([]roomWire, 0, len(rooms))
for _, rm := range rooms {
out = append(out, roomWire{
ID: rm.RoomID,
Subject: rm.Subject,
Name: subjectName(rm.Subject),
Epoch: rm.Epoch,
Encrypt: rm.Policy.Encrypt,
Persist: rm.Policy.Persist,
SignMsgs: rm.Policy.SignMsgs,
Role: rm.Role,
})
}
return out, nil
}
func (g *gateway) createRoom(req createRoomReq) (roomWire, error) {
subject := strings.TrimSpace(req.Subject)
if subject == "" {
return roomWire{}, fmt.Errorf("webgw: subject required")
}
p := req.policy()
roomID, err := g.cli.CreateRoom(subject, p)
if err != nil {
return roomWire{}, err
}
// Under a per-subject ACL the operator's frozen NATS permissions do not yet
// cover the new room's subject; refresh so subsequent data-plane use works. On
// a plaintext/non-ACL dev bus this is unnecessary and would needlessly drop any
// live SSE subscriptions, so it is gated on the secured posture.
if g.refreshACL {
_ = g.cli.RefreshSession()
}
return roomWire{
ID: roomID,
Subject: subject,
Name: subjectName(subject),
Epoch: 1,
Encrypt: p.Encrypt,
Persist: p.Persist,
SignMsgs: p.SignMsgs,
Role: "owner",
}, nil
}
// join resolves room metadata and (for encrypted rooms) fetches the room key so
// the gateway can later open payloads. Idempotent.
func (g *gateway) join(roomID string) error {
if err := g.cli.Join(roomID); err != nil {
return err
}
if g.refreshACL {
_ = g.cli.RefreshSession()
}
return nil
}
// send publishes plaintext to a room. The unibus client seals it with the room
// key (encrypted rooms) and signs it (signed rooms) before it leaves the process.
func (g *gateway) send(roomID, body string) error {
return g.cli.Publish(roomID, []byte(body))
}
+140
View File
@@ -0,0 +1,140 @@
package main
import (
"sync"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/oklog/ulid/v2"
)
// roomHub multiplexes ONE unibus room subscription to MANY SSE clients. The
// unibus client derives a per-(room, endpoint) durable consumer name, so a
// second Subscribe for the same room from the same operator would contend for
// the same durable (load-balanced delivery) rather than each browser receiving
// every message. The hub holds a single subscription per room and fans each
// decrypted frame out to every connected browser, which also means the gateway
// opens at most one bus subscription per room regardless of how many tabs watch
// it.
type roomHub struct {
roomID string
myEndpoint string
sub *client.Sub
mu sync.Mutex
clients map[chan msgWire]struct{}
}
// frameTS decodes the millisecond timestamp embedded in a frame's ULID id. A
// malformed id (should not happen for bus-produced frames) yields 0, which the
// browser renders without crashing.
func frameTS(msgID string) int64 {
id, err := ulid.Parse(msgID)
if err != nil {
return 0
}
return int64(id.Time())
}
// newRoomHub opens the single bus subscription for roomID and starts fanning
// decrypted frames out to registered clients. The room must already be joined
// (so the gateway holds the room key) before this is called.
func newRoomHub(cli *client.Client, roomID, myEndpoint string) (*roomHub, error) {
h := &roomHub{
roomID: roomID,
myEndpoint: myEndpoint,
clients: map[chan msgWire]struct{}{},
}
sub, err := cli.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
m := msgWire{
ID: f.MsgID,
Sender: f.Sender,
Body: string(plaintext),
TS: frameTS(f.MsgID),
Mine: f.Sender == myEndpoint,
}
h.broadcast(m)
})
if err != nil {
return nil, err
}
h.sub = sub
return h, nil
}
// broadcast delivers a message to every registered client without blocking the
// NATS delivery goroutine: a client whose buffer is full (a stalled browser)
// drops this frame rather than stalling the whole room.
func (h *roomHub) broadcast(m msgWire) {
h.mu.Lock()
defer h.mu.Unlock()
for ch := range h.clients {
select {
case ch <- m:
default:
}
}
}
// add registers a new SSE client channel.
func (h *roomHub) add(ch chan msgWire) {
h.mu.Lock()
defer h.mu.Unlock()
h.clients[ch] = struct{}{}
}
// stop unsubscribes from the bus. Local delivery ends; for a persisted room the
// durable consumer's ack position stays on the server, so a later subscription
// with the same operator resumes from where it left off.
func (h *roomHub) stop() {
if h.sub != nil {
_ = h.sub.Unsubscribe()
}
}
// openStream joins the room (idempotent; fetches the room key for encrypted
// rooms), attaches an SSE client to the room's hub (creating it on first watcher),
// and returns the client's message channel plus a cleanup func. The cleanup
// detaches the client and, when it was the last watcher, tears down the room's
// single bus subscription.
func (g *gateway) openStream(roomID string) (chan msgWire, func(), error) {
if err := g.join(roomID); err != nil {
return nil, nil, err
}
g.mu.Lock()
h := g.hubs[roomID]
if h == nil {
var err error
h, err = newRoomHub(g.cli, roomID, g.endpoint)
if err != nil {
g.mu.Unlock()
return nil, nil, err
}
g.hubs[roomID] = h
}
g.mu.Unlock()
// Buffer so a brief render hitch in the browser does not drop live frames; a
// sustained stall still drops (broadcast is non-blocking) rather than wedging
// the room.
ch := make(chan msgWire, 64)
h.add(ch)
// cleanup takes g.mu before h.mu (the single, consistent lock order) so a
// concurrent openStream that re-creates the hub cannot race the teardown.
cleanup := func() {
g.mu.Lock()
defer g.mu.Unlock()
h.mu.Lock()
delete(h.clients, ch)
empty := len(h.clients) == 0
h.mu.Unlock()
if empty {
if cur := g.hubs[roomID]; cur == h {
delete(g.hubs, roomID)
h.stop()
}
}
}
return ch, cleanup, nil
}
+98
View File
@@ -0,0 +1,98 @@
package main
import (
"encoding/base64"
"encoding/json"
"fmt"
"os"
"os/exec"
cs "fn-registry/functions/cybersecurity"
)
// identityJSON mirrors the on-disk / pass-stored identity format shared across
// the unibus tooling: the four keypair halves, each std-base64. It is the SAME
// shape the bus client persists (pkg/client identity file) and the operator's
// `pass` entry unibus/operator-identity, so the web gateway loads the operator's
// identity without a divergent serialization. Kept in lockstep with
// unibus_admin/internal/admin/identity.go.
type identityJSON struct {
SignPub string `json:"sign_pub"`
SignPriv string `json:"sign_priv"`
KexPub string `json:"kex_pub"`
KexPriv string `json:"kex_priv"`
}
// decodeIdentity turns the JSON identity bytes into a cs.Identity. The private
// halves stay only in memory; this never writes them anywhere.
func decodeIdentity(raw []byte) (cs.Identity, error) {
var f identityJSON
if err := json.Unmarshal(raw, &f); err != nil {
return cs.Identity{}, fmt.Errorf("webgw: parse identity json: %w", err)
}
dec := base64.StdEncoding.DecodeString
signPub, err := dec(f.SignPub)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode sign_pub: %w", err)
}
signPriv, err := dec(f.SignPriv)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode sign_priv: %w", err)
}
kexPub, err := dec(f.KexPub)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode kex_pub: %w", err)
}
kexPriv, err := dec(f.KexPriv)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: decode kex_priv: %w", err)
}
if len(signPub) != 32 || len(signPriv) != 64 || len(kexPub) != 32 || len(kexPriv) != 32 {
return cs.Identity{}, fmt.Errorf("webgw: identity has wrong key sizes (sign_pub=%d sign_priv=%d kex_pub=%d kex_priv=%d)",
len(signPub), len(signPriv), len(kexPub), len(kexPriv))
}
return cs.Identity{SignPub: signPub, SignPriv: signPriv, KexPub: kexPub, KexPriv: kexPriv}, nil
}
// loadIdentityFromFile reads a 0600 identity JSON file (the same format the bus
// client writes) and decodes it. Used on a deploy host where `pass` is not
// available and the operator identity is delivered as a protected file.
func loadIdentityFromFile(path string) (cs.Identity, error) {
raw, err := os.ReadFile(path)
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: read identity file %q: %w", path, err)
}
return decodeIdentity(raw)
}
// loadIdentityFromPass shells out to `pass show <entry>` and decodes the JSON
// identity it returns. The secret is held only in memory; this process never
// writes it to disk or argv. Used in local operator workflows where the GNU
// password store holds unibus/operator-identity.
func loadIdentityFromPass(entry string) (cs.Identity, error) {
out, err := exec.Command("pass", "show", entry).Output()
if err != nil {
return cs.Identity{}, fmt.Errorf("webgw: pass show %q: %w", entry, err)
}
return decodeIdentity(out)
}
// loadPassValue returns the first line of a `pass show <entry>` for non-identity
// secrets (e.g. the unlock passphrase). Empty entry yields an empty string and
// no error, so callers can treat "no pass entry configured" as "not set".
func loadPassValue(entry string) (string, error) {
if entry == "" {
return "", nil
}
out, err := exec.Command("pass", "show", entry).Output()
if err != nil {
return "", fmt.Errorf("webgw: pass show %q: %w", entry, err)
}
s := string(out)
for i := 0; i < len(s); i++ {
if s[i] == '\n' || s[i] == '\r' {
return s[:i], nil
}
}
return s, nil
}
+180
View File
@@ -0,0 +1,180 @@
// Command webgw is the web gateway for the unibus chat SPA. It is a single Go
// binary that holds the operator's bus identity, connects to the bus as a real
// authenticated peer (pkg/client), and exposes a small REST + SSE API the
// browser consumes. The browser never signs, never speaks NATS, and never sees a
// private key: it authenticates to the gateway with a passphrase and thereafter
// holds only an opaque session cookie.
//
// TRUST MODEL (MVP, single operator): room content stays end-to-end encrypted on
// the bus. The gateway can read plaintext because it acts AS the operator's
// client — a legitimate member of each room holding the room key. Decryption
// happens server-side in this process; cleartext then crosses an authenticated
// (loopback or TLS-fronted) SSE channel to the browser. The wallet phase (issue:
// per-browser WebCrypto identity) can move decryption into the browser; see the
// report for the FASE 2 plan.
//
// # local dev against a loopback membershipd (plaintext), operator from pass:
// webgw --identity-pass unibus/operator-identity \
// --ctrl-url http://127.0.0.1:8470 --nats-url nats://127.0.0.1:4250
//
// # secured cluster (TLS + nkey on both planes), identity from a 0600 file:
// webgw --ca ca.crt --identity-file operator.id \
// --ctrl-url https://node-a:8470 --nats-url nats://node-a:4250 \
// --ctrl-urls https://node-b:8470,https://node-c:8470 \
// --nats-urls nats://node-b:4250,nats://node-c:4250
package main
import (
"context"
"flag"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
cs "fn-registry/functions/cybersecurity"
)
func main() {
var (
bind = flag.String("bind", "127.0.0.1", "interface to bind the gateway HTTP server to (loopback by default)")
port = flag.String("port", "8481", "gateway HTTP port")
ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "primary unibus control-plane base URL")
ctrlURLs = flag.String("ctrl-urls", "", "comma-separated ADDITIONAL control-plane base URLs (cluster failover)")
natsURL = flag.String("nats-url", "nats://127.0.0.1:4250", "primary NATS URL")
natsURLs = flag.String("nats-urls", "", "comma-separated ADDITIONAL NATS seed URLs (cluster failover)")
caPath = flag.String("ca", "", "bus CA cert path; set to talk TLS+nkey to a secured bus (empty = plaintext dev)")
identityFile = flag.String("identity-file", "", "path to the operator identity JSON file (0600). Mutually exclusive with --identity-pass")
identityPass = flag.String("identity-pass", "", "pass(1) entry holding the operator identity JSON, e.g. unibus/operator-identity")
unlockPass = flag.String("unlock-pass", "", "literal passphrase the browser must send to unlock a session (dev). Prefer --unlock-pass-entry")
unlockEntry = flag.String("unlock-pass-entry", "unibus/admin-panel-password", "pass(1) entry holding the unlock passphrase (used when --unlock-pass is empty)")
webDir = flag.String("web-dir", "", "OPTIONAL path to the built SPA (web/dist) to serve. Empty = API only (use vite dev server)")
)
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[webgw] ")
id, err := loadIdentity(*identityFile, *identityPass)
if err != nil {
log.Fatalf("%v", err)
}
unlock := *unlockPass
if unlock == "" {
unlock, err = loadPassValue(*unlockEntry)
if err != nil {
log.Fatalf("resolve unlock passphrase: %v", err)
}
}
if unlock == "" {
log.Fatalf("an unlock passphrase is required: set --unlock-pass or a non-empty --unlock-pass-entry (default unibus/admin-panel-password)")
}
resolvedWebDir := resolveWebDir(*webDir)
gw, err := newGateway(gatewayConfig{
Identity: id,
NatsURL: *natsURL,
CtrlURL: *ctrlURL,
CtrlURLs: splitCSV(*ctrlURLs),
NatsURLs: splitCSV(*natsURLs),
CAPath: *caPath,
})
if err != nil {
log.Fatalf("%v", err)
}
defer gw.Close()
log.Printf("operator endpoint: %s", gw.endpoint)
log.Printf("control plane: %s (+%d failover)", *ctrlURL, len(splitCSV(*ctrlURLs)))
tls := "OFF (plaintext dev)"
if *caPath != "" {
tls = "ON (CA " + *caPath + ")"
}
log.Printf("bus TLS+nkey: %s", tls)
if resolvedWebDir != "" {
log.Printf("serving SPA from: %s", resolvedWebDir)
} else {
log.Printf("API only (no --web-dir): use the vite dev server with a /api+stream proxy")
}
srv := newServer(gw, unlock, resolvedWebDir)
addr := *bind + ":" + *port
httpSrv := &http.Server{
Addr: addr,
Handler: srv,
// No global write timeout: SSE streams are long-lived. Header timeout still
// bounds slowloris on the request line/headers.
ReadHeaderTimeout: 10 * time.Second,
}
go func() {
log.Printf("web gateway: http://%s", addr)
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("http server: %v", err)
}
}()
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop
log.Printf("shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = httpSrv.Shutdown(ctx)
log.Printf("bye")
}
// loadIdentity resolves the operator identity from exactly one of --identity-file
// or --identity-pass.
func loadIdentity(file, passEntry string) (cs.Identity, error) {
switch {
case file != "" && passEntry != "":
return cs.Identity{}, errFlag("set only one of --identity-file or --identity-pass")
case file != "":
return loadIdentityFromFile(file)
case passEntry != "":
return loadIdentityFromPass(passEntry)
default:
return cs.Identity{}, errFlag("an identity is required: pass --identity-file <path> or --identity-pass <entry>")
}
}
// resolveWebDir validates the --web-dir flag. An empty flag means API-only. A
// non-empty dir is kept only if it actually holds an index.html, so a typo logs
// "API only" rather than serving 404s.
func resolveWebDir(dir string) string {
if dir == "" {
return ""
}
abs, err := filepath.Abs(dir)
if err != nil {
log.Printf("WARN --web-dir %q: %v; serving API only", dir, err)
return ""
}
if !statFile(filepath.Join(abs, "index.html")) {
log.Printf("WARN --web-dir %q has no index.html; serving API only", abs)
return ""
}
return abs
}
type flagErr string
func (e flagErr) Error() string { return string(e) }
func errFlag(s string) error { return flagErr("webgw: " + s) }
func splitCSV(s string) []string {
var out []string
for _, p := range strings.Split(s, ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
+301
View File
@@ -0,0 +1,301 @@
package main
import (
"crypto/rand"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// sessionCookie is the name of the gateway's session cookie. The browser sends
// it automatically on same-origin fetches AND on EventSource (SSE) connections —
// EventSource cannot set custom headers, so a cookie is the only way to
// authenticate the stream. It is HttpOnly so page JS can never read the token.
const sessionCookie = "unibus_session"
// server is the gateway's HTTP surface: a small REST/SSE API under /api gated by
// a session cookie, plus an optional static file server for the built SPA. The
// gateway's privileged operator identity never leaves the process; the browser
// authenticates with a passphrase and thereafter holds only an opaque session
// token.
type server struct {
gw *gateway
unlock string // passphrase that unlocks a session (compared in constant time)
webDir string // optional path to the built SPA (web/dist); empty = API only
mux *http.ServeMux
mu sync.Mutex
sessions map[string]time.Time // token -> issued-at
}
func newServer(gw *gateway, unlock, webDir string) *server {
s := &server{
gw: gw,
unlock: unlock,
webDir: webDir,
mux: http.NewServeMux(),
sessions: map[string]time.Time{},
}
s.routes()
return s
}
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) }
func (s *server) routes() {
// Liveness, unauthenticated (systemd / deploy smoke).
s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
})
// Auth: login is the only /api route reachable without a session.
s.mux.HandleFunc("POST /api/login", s.handleLogin)
s.mux.HandleFunc("POST /api/logout", s.auth(s.handleLogout))
s.mux.HandleFunc("GET /api/me", s.auth(s.handleMe))
s.mux.HandleFunc("GET /api/rooms", s.auth(s.handleListRooms))
s.mux.HandleFunc("POST /api/rooms", s.auth(s.handleCreateRoom))
s.mux.HandleFunc("POST /api/rooms/{id}/join", s.auth(s.handleJoin))
s.mux.HandleFunc("POST /api/rooms/{id}/send", s.auth(s.handleSend))
s.mux.HandleFunc("GET /api/rooms/{id}/stream", s.auth(s.handleStream))
// Everything else is the SPA (when --web-dir is set). Registered last.
if s.webDir != "" {
s.mux.Handle("/", s.spaHandler())
}
}
// ---- auth -----------------------------------------------------------------
// auth wraps a handler so it runs only with a valid session cookie. A missing or
// unknown token yields 401, which the SPA treats as "show the login screen".
func (s *server) auth(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
c, err := r.Cookie(sessionCookie)
if err != nil || !s.validSession(c.Value) {
writeErr(w, http.StatusUnauthorized, "not authenticated")
return
}
next(w, r)
}
}
func (s *server) validSession(token string) bool {
if token == "" {
return false
}
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.sessions[token]
return ok
}
func (s *server) handleLogin(w http.ResponseWriter, r *http.Request) {
var req struct {
Passphrase string `json:"passphrase"`
}
if !decode(w, r, &req) {
return
}
// Constant-time compare so a wrong passphrase cannot be timed character by
// character. An empty configured passphrase never matches (main refuses to
// start without one, so this is defense in depth).
if s.unlock == "" || subtle.ConstantTimeCompare([]byte(req.Passphrase), []byte(s.unlock)) != 1 {
writeErr(w, http.StatusUnauthorized, "wrong passphrase")
return
}
tok := newToken()
s.mu.Lock()
s.sessions[tok] = time.Now()
s.mu.Unlock()
http.SetCookie(w, &http.Cookie{
Name: sessionCookie,
Value: tok,
Path: "/",
HttpOnly: true,
SameSite: http.SameSiteLaxMode,
})
writeJSON(w, http.StatusOK, s.gw.me())
}
func (s *server) handleLogout(w http.ResponseWriter, r *http.Request) {
if c, err := r.Cookie(sessionCookie); err == nil {
s.mu.Lock()
delete(s.sessions, c.Value)
s.mu.Unlock()
}
http.SetCookie(w, &http.Cookie{Name: sessionCookie, Value: "", Path: "/", MaxAge: -1, HttpOnly: true})
writeJSON(w, http.StatusOK, map[string]string{"status": "logged_out"})
}
func (s *server) handleMe(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, http.StatusOK, s.gw.me())
}
// ---- rooms ----------------------------------------------------------------
func (s *server) handleListRooms(w http.ResponseWriter, _ *http.Request) {
rooms, err := s.gw.listRooms()
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, rooms)
}
func (s *server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
var req createRoomReq
if !decode(w, r, &req) {
return
}
rv, err := s.gw.createRoom(req)
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusCreated, rv)
}
func (s *server) handleJoin(w http.ResponseWriter, r *http.Request) {
if err := s.gw.join(r.PathValue("id")); err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "joined"})
}
func (s *server) handleSend(w http.ResponseWriter, r *http.Request) {
var req sendReq
if !decode(w, r, &req) {
return
}
if strings.TrimSpace(req.Body) == "" {
writeErr(w, http.StatusBadRequest, "body required")
return
}
if err := s.gw.send(r.PathValue("id"), req.Body); err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "sent"})
}
// handleStream is the SSE endpoint: it joins the room, attaches to the room's
// fan-out hub, and streams each decrypted message as a `data:` event. For a
// persisted room the hub's underlying subscription delivers history first
// (scrollback) and then live messages; for an ephemeral room only live messages
// flow. The stream ends when the browser disconnects (ctx cancelled).
func (s *server) handleStream(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
writeErr(w, http.StatusInternalServerError, "streaming unsupported")
return
}
ch, cleanup, err := s.gw.openStream(r.PathValue("id"))
if err != nil {
writeErr(w, http.StatusBadGateway, err.Error())
return
}
defer cleanup()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no") // disable proxy buffering (nginx/caddy)
w.WriteHeader(http.StatusOK)
// An initial comment opens the stream immediately so the browser's
// EventSource fires `onopen` without waiting for the first message.
_, _ = w.Write([]byte(": connected\n\n"))
flusher.Flush()
ctx := r.Context()
ping := time.NewTicker(25 * time.Second)
defer ping.Stop()
for {
select {
case <-ctx.Done():
return
case <-ping.C:
// Comment line keeps idle proxies from closing the connection.
if _, err := w.Write([]byte(": ping\n\n")); err != nil {
return
}
flusher.Flush()
case m := <-ch:
b, err := json.Marshal(m)
if err != nil {
continue
}
if _, err := w.Write([]byte("data: " + string(b) + "\n\n")); err != nil {
return
}
flusher.Flush()
}
}
}
// ---- SPA serving (optional) -----------------------------------------------
// spaHandler serves the built SPA from s.webDir. A request for an existing asset
// is served directly; any other path (a client-side route) falls back to
// index.html so the SPA router can take over. /api and /healthz are matched first.
func (s *server) spaHandler() http.Handler {
root := http.Dir(s.webDir)
fileServer := http.FileServer(root)
index := filepath.Join(s.webDir, "index.html")
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/")
if p == "" {
http.ServeFile(w, r, index)
return
}
if f, err := root.Open(p); err == nil {
_ = f.Close()
fileServer.ServeHTTP(w, r)
return
}
http.ServeFile(w, r, index) // unknown path -> SPA client-side routing
})
}
// ---- helpers --------------------------------------------------------------
func newToken() string {
b := make([]byte, 32)
_, _ = rand.Read(b)
return hex.EncodeToString(b)
}
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
func writeErr(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]string{"error": msg})
}
// decode reads a JSON body into v, writing a 400 and returning false on failure.
func decode(w http.ResponseWriter, r *http.Request, v any) bool {
defer r.Body.Close()
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(v); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return false
}
return true
}
// statFile reports whether path exists and is a regular file (used to validate
// --web-dir at startup so a typo surfaces as a clear log line, not 404s later).
func statFile(path string) bool {
fi, err := os.Stat(path)
return err == nil && !fi.IsDir()
}
@@ -0,0 +1,78 @@
---
issue: 0007
title: Cifrado at-rest del control plane (JetStream KV / SQLite en disco)
status: spec
created: 2026-06-07
domain: security
scope: unibus (pkg/embeddednats, cmd/membershipd, deploy/cluster) + procedimiento de migración del store existente
---
# Objetivo
Cifrar en reposo el almacenamiento del plano de control para que un nodo comprometido
(root en el VPS) o un disco robado no exponga los metadatos de control en claro.
Estado actual (auditado el 07/06/2026, report 0012 y siguientes):
- **Contenido de los mensajes**: cifrado E2E por room (megolm/olm). El servidor nunca ve el
plaintext; no vive en el plano de control. **No es el objeto de este issue.**
- **Claves de room** (`UNIBUS_room_keys`): guardadas **selladas** (sealed box X25519, cifradas
para cada miembro). El servidor las almacena y reparte pero no puede abrirlas. **Ya protegidas.**
- **Metadatos de control** (`UNIBUS_rooms`, `UNIBUS_members`, `UNIBUS_rooms_by_member`,
`UNIBUS_users`): se serializan con `json.Marshal` y se escriben **en claro** en el store. En
cluster ese store es el directorio `local_files/jetstream/` de cada nodo; en single-node es el
archivo SQLite `local_files/unibus.db`. Hoy **no hay cifrado at-rest**: con root en un nodo se
pueden leer subjects de salas, la pertenencia (quién está en qué sala con qué rol), los handles
y roles de los usuarios, y las claves públicas (signPub/kexPub). No se exponen mensajes (E2E) ni
se pueden descifrar salas (claves selladas), pero sí toda la topología.
Tras este issue, los buckets/archivos del control plane quedan cifrados en disco con una clave por
nodo gestionada fuera de git. El modelo de amenaza pasa de "root del nodo ve la topología" a "root
del nodo necesita además la clave at-rest (que puede vivir en un secreto separado / TPM / variable
de entorno inyectada) para leer cualquier cosa".
# Contexto técnico
- NATS Server / JetStream soporta **encryption at-rest** nativo: se configura una cifra
(`aes` o `chacha20`) y una clave; JetStream cifra los ficheros de los streams/KV en disco. El
bus usa un NATS **embebido** (`pkg/embeddednats`), así que la activación es por opciones del
servidor embebido, no por un `nats-server.conf` externo.
- Para el backend SQLite (single-node) el equivalente sería SQLCipher o cifrado a nivel de
archivo/FS; queda como sub-tarea de menor prioridad porque el despliegue real es cluster (KV).
# Tareas
1. Confirmar la API de encryption-at-rest del NATS embebido en la versión usada (opción de
servidor para cipher + clave; cómo se pasa la clave de forma que no quede en argv ni en git).
2. Activar el cifrado en `pkg/embeddednats` detrás de una opción de configuración. La clave se
inyecta por archivo (`--jetstream-encryption-key-file`, 0600, junto a las claves TLS del nodo)
o variable de entorno desde el unit systemd; nunca en argv ni commiteada.
3. `cmd/membershipd`: flag/env para la clave + reflejar el estado en la posture publicada en
`/healthz` (p.ej. `"at_rest":true`) para que el monitor lo verifique.
4. `deploy/cluster`: provisionar la clave at-rest por nodo (generación + `pass`/secrets gitignored)
y cablearla en `cluster.env` + el unit. Documentar en el runbook.
5. **Migración del store existente** (gotcha crítico): JetStream no re-cifra retroactivamente los
datos ya escritos en claro. Diseñar y documentar el procedimiento seguro para el cluster en
producción (probable: backup → exportar snapshot del control plane → parar nodo → recrear el
store con la clave activa → re-importar; o rotación nodo a nodo aprovechando la replicación R3).
Respetar la regla de migraciones (aditivo, sin pérdida de datos).
6. Tests: arrancar un nodo con clave at-rest, escribir un user/room, y verificar que el fichero en
disco **no** contiene en claro un subject/handle conocido (grep negativo), y que el nodo sigue
leyéndolos con la clave. Verificar que sin la clave el store no se abre.
# Definition of Done
- Cifrado at-rest activo en los 3 nodos del cluster; `/healthz` lo refleja en la posture.
- Evidencia ejecutable: un valor conocido (subject de sala / handle de usuario) **no** aparece en
claro al hacer `grep` sobre `local_files/jetstream/`; el nodo lo sigue sirviendo con la clave.
- Procedimiento de migración probado sobre datos reales sin pérdida (snapshot/restore verificado).
- La clave at-rest nunca está en git ni en argv; vive en archivo 0600 / secreto inyectado.
- No baja ninguna otra capa de seguridad (enforce + ACL + TLS + E2E + sealed keys intactas).
# Notas
Aditivo y ortogonal al resto de la seguridad: TLS protege en tránsito, E2E el contenido, las claves
de room van selladas; este issue cierra el último hueco (metadatos de control en claro en disco)
para el modelo de amenaza "VPS comprometido / disco robado". Prioridad media: el despliegue ya es
seguro frente a ataques de red (enforce+TLS+ACL); esto endurece frente a compromiso físico/root del
host. Relacionado con el endurecimiento de los issues 0004/0005/0006.
+70
View File
@@ -456,6 +456,23 @@ type memberRoomJSON struct {
Role string `json:"role"` Role string `json:"role"`
} }
// userJSON mirrors the server's wire type on the admin user-management endpoints.
type userJSON struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
Status string `json:"status"`
CreatedAt string `json:"created_at"`
RevokedAt string `json:"revoked_at,omitempty"`
}
// addUserReq is the POST /users body (mirror of the server type).
type addUserReq struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
}
// ---- room operations ------------------------------------------------------ // ---- room operations ------------------------------------------------------
// RoomRef is a room this peer belongs to, returned by ListMyRooms. It is the // RoomRef is a room this peer belongs to, returned by ListMyRooms. It is the
@@ -490,6 +507,59 @@ func (c *Client) ListMyRooms() ([]RoomRef, error) {
return out, nil return out, nil
} }
// ---- user administration (admin-only) ------------------------------------
// UserInfo is a bus user as returned by the admin user-management endpoints. It
// is a flat view (no nested types) for the admin panel: the signing key
// (lowercase hex), handle, role ("admin"|"member"), status ("active"|"revoked"),
// and timestamps. RevokedAt is empty for an active user.
type UserInfo struct {
SignPub string
Handle string
Role string
Status string
CreatedAt string
RevokedAt string
}
// ListUsers returns the full bus allowlist, including revoked users. The caller
// must be signing as an admin: a non-admin signer is rejected by the server with
// 403, surfaced here as an error.
func (c *Client) ListUsers() ([]UserInfo, error) {
var resp []userJSON
if err := c.doJSON("GET", "/users", nil, &resp); err != nil {
return nil, err
}
out := make([]UserInfo, 0, len(resp))
for _, u := range resp {
out = append(out, UserInfo{
SignPub: u.SignPub,
Handle: u.Handle,
Role: u.Role,
Status: u.Status,
CreatedAt: u.CreatedAt,
RevokedAt: u.RevokedAt,
})
}
return out, nil
}
// AddUser registers a bus user from their Ed25519 signing public key (64-hex).
// role is "admin" or "member" (empty defaults to member, matching the server).
// The caller must be signing as an admin. Re-adding an already-registered key
// returns an error (the server replies 409 and leaves the existing row
// untouched — no silent role/status change).
func (c *Client) AddUser(signPub, handle, role string) error {
return c.doJSON("POST", "/users", addUserReq{SignPub: signPub, Handle: handle, Role: role}, nil)
}
// RevokeUser revokes a bus user by their signing public key (64-hex). Revocation
// is a status flip (no hard delete): the identity stays auditable and is denied
// on both planes immediately. The caller must be signing as an admin.
func (c *Client) RevokeUser(signPub string) error {
return c.doJSON("POST", "/users/"+signPub+"/revoke", nil, nil)
}
// newRoomKey returns 32 random bytes for a symmetric room key. // newRoomKey returns 32 random bytes for a symmetric room key.
func newRoomKey() ([]byte, error) { func newRoomKey() ([]byte, error) {
k := make([]byte, 32) k := make([]byte, 32)
+99
View File
@@ -0,0 +1,99 @@
package client_test
import (
"encoding/hex"
"strings"
"testing"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/membership"
)
// findUserInfo returns the row with the given signing key (case-insensitive).
func findUserInfo(users []client.UserInfo, signPub string) (client.UserInfo, bool) {
want := strings.ToLower(signPub)
for _, u := range users {
if strings.ToLower(u.SignPub) == want {
return u, true
}
}
return client.UserInfo{}, false
}
// TestClientUsersAdminAPI drives the admin user-management API through the real
// pkg/client methods against an in-process membershipd under enforce: an admin
// client adds a user, lists it, revokes it, and sees the status flip — and a
// non-admin client is denied. This is the path the admin panel uses, so it locks
// the client/server contract the panel depends on.
func TestClientUsersAdminAPI(t *testing.T) {
h := newHarnessMode(t, membership.AuthEnforce)
waitHealth(t, h.ctrlURL)
admin, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect admin: %v", err)
}
defer admin.Close()
registerClient(t, h, admin, "admin", membership.RoleAdmin)
member, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect member: %v", err)
}
defer member.Close()
registerClient(t, h, member, "member", membership.RoleMember)
// A brand-new identity the admin will register over HTTP.
carol := mustIdentity(t)
carolPub := hex.EncodeToString(carol.SignPub)
// Admin adds carol as a member.
if err := admin.AddUser(carolPub, "carol", membership.RoleMember); err != nil {
t.Fatalf("admin AddUser: %v", err)
}
// Admin lists: carol present and active.
users, err := admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers: %v", err)
}
row, ok := findUserInfo(users, carolPub)
if !ok {
t.Fatalf("carol missing from list after add: %+v", users)
}
if row.Status != membership.StatusActive || row.Role != membership.RoleMember {
t.Fatalf("carol row wrong after add: %+v", row)
}
// Re-adding the same key is a conflict surfaced as an error (no silent upsert).
if err := admin.AddUser(carolPub, "carol-again", membership.RoleAdmin); err == nil {
t.Fatalf("re-adding carol should error (409), got nil")
}
// Admin revokes carol; list shows the status flip (no hard delete).
if err := admin.RevokeUser(carolPub); err != nil {
t.Fatalf("admin RevokeUser: %v", err)
}
users, err = admin.ListUsers()
if err != nil {
t.Fatalf("admin ListUsers after revoke: %v", err)
}
row, ok = findUserInfo(users, carolPub)
if !ok {
t.Fatalf("carol vanished after revoke (should be a status flip): %+v", users)
}
if row.Status != membership.StatusRevoked {
t.Fatalf("carol should be revoked, got status %q", row.Status)
}
// A non-admin (member) is denied on every user-management method.
if _, err := member.ListUsers(); err == nil {
t.Fatalf("non-admin ListUsers should error (403), got nil")
}
if err := member.AddUser(carolPub, "x", membership.RoleMember); err == nil {
t.Fatalf("non-admin AddUser should error (403), got nil")
}
if err := member.RevokeUser(carolPub); err == nil {
t.Fatalf("non-admin RevokeUser should error (403), got nil")
}
}
+173 -7
View File
@@ -213,9 +213,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error()) writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
return return
} }
// Carry the authenticated signer's endpoint into the handler so room handlers // Carry the authenticated signer's endpoint AND signing key into the handler.
// can authorize by membership (audit H3). Only set on a verified identity. // Room handlers authorize by membership via the endpoint (audit H3); the
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint))) // user-management handlers authorize by role via the signing key (the endpoint
// id is a one-way hash of the key, so it cannot be reversed to look the signer
// up in the user allowlist). Both are set only on a verified identity.
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint, res.pubHex)))
} }
// isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader // isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader
@@ -229,11 +232,19 @@ func isBodyTooLarge(err error) bool {
// values cannot collide with keys set by other packages. // values cannot collide with keys set by other packages.
type ctxKey int type ctxKey int
const ctxSignerEndpoint ctxKey = iota const (
ctxSignerEndpoint ctxKey = iota
ctxSignerPub
)
// withSigner returns a context carrying the authenticated signer's endpoint id. // withSigner returns a context carrying the authenticated signer's endpoint id
func withSigner(ctx context.Context, endpoint string) context.Context { // and signing public key (lowercase hex). The endpoint authorizes room
return context.WithValue(ctx, ctxSignerEndpoint, endpoint) // membership; the signing key authorizes user-management by role, because the
// endpoint id is a one-way hash of the key (base64url(sha256(signPub))) and so
// cannot be reversed to look the signer up in the user allowlist.
func withSigner(ctx context.Context, endpoint, pubHex string) context.Context {
ctx = context.WithValue(ctx, ctxSignerEndpoint, endpoint)
return context.WithValue(ctx, ctxSignerPub, pubHex)
} }
// signerEndpoint returns the authenticated signer's endpoint id and whether one // signerEndpoint returns the authenticated signer's endpoint id and whether one
@@ -245,6 +256,16 @@ func signerEndpoint(r *http.Request) (string, bool) {
return v, ok && v != "" return v, ok && v != ""
} }
// signerPubHex returns the authenticated signer's signing public key (lowercase
// hex) and whether one is present. Like signerEndpoint it is absent under
// AuthOff and on a soft-mode pass-through; the user-management handlers treat
// that absence as "no admin identity" and deny (default-deny), since a
// privilege-granting operation must never run without a verified admin.
func signerPubHex(r *http.Request) (string, bool) {
v, ok := r.Context().Value(ctxSignerPub).(string)
return v, ok && v != ""
}
// requireMember authorizes a room request by membership (audit H3): it returns // requireMember authorizes a room request by membership (audit H3): it returns
// the signer endpoint and true when the request may proceed, or writes 403 and // the signer endpoint and true when the request may proceed, or writes 403 and
// returns false when an authenticated signer is not a member of roomID. When no // returns false when an authenticated signer is not a member of roomID. When no
@@ -262,6 +283,31 @@ func (s *Server) requireMember(w http.ResponseWriter, r *http.Request, roomID st
return signer, true return signer, true
} }
// requireAdmin authorizes a user-management request: it returns the signer's
// signing-key hex and true ONLY when the authenticated signer is a user with
// role admin and active status; otherwise it writes 403 and returns false.
//
// Default-deny, with no dev relaxation: unlike requireMember (which allows a
// request when no authenticated signer is present, preserving AuthOff/dev
// behavior for room reads), this denies whenever the signer is absent or is not
// a verified active admin. The user-management endpoints grant and revoke bus
// access, so they must never be reachable without a verified admin identity —
// the store is consulted on every call so a just-revoked admin is denied
// immediately, and any store error fails closed.
func (s *Server) requireAdmin(w http.ResponseWriter, r *http.Request) (string, bool) {
pubHex, ok := signerPubHex(r)
if !ok {
writeErr(w, http.StatusForbidden, "forbidden: admin role required")
return "", false
}
u, err := s.store.GetUser(pubHex)
if err != nil || u.Role != RoleAdmin || u.Status != StatusActive {
writeErr(w, http.StatusForbidden, "forbidden: admin role required")
return "", false
}
return pubHex, true
}
// isAuthExempt lists requests that bypass control-plane auth even under enforce. // isAuthExempt lists requests that bypass control-plane auth even under enforce.
// Only the unauthenticated health probe qualifies: it carries no data and is // Only the unauthenticated health probe qualifies: it carries no data and is
// needed by load balancers / smoke checks / systemd before any identity exists. // needed by load balancers / smoke checks / systemd before any identity exists.
@@ -280,6 +326,13 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom) s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
s.mux.HandleFunc("POST /blobs", s.handlePutBlob) s.mux.HandleFunc("POST /blobs", s.handlePutBlob)
s.mux.HandleFunc("GET /blobs/{hash}", s.handleGetBlob) s.mux.HandleFunc("GET /blobs/{hash}", s.handleGetBlob)
// User-management (admin-only) — the HTTP-signed equivalent of the local
// `membershipd user` CLI, so the admin panel manages the bus allowlist by
// signing as an admin instead of needing direct store/KV access. All three
// pass through requireAdmin; they hit the same store the room handlers do.
s.mux.HandleFunc("GET /users", s.handleListUsers)
s.mux.HandleFunc("POST /users", s.handleAddUser)
s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser)
} }
// ---- wire types ----------------------------------------------------------- // ---- wire types -----------------------------------------------------------
@@ -357,6 +410,27 @@ type blobResp struct {
Hash string `json:"hash"` Hash string `json:"hash"`
} }
// userJSON is the wire representation of a bus user on the admin endpoints. It
// carries the full record the panel needs to render the allowlist, including
// status (so revoked users are visible) and the timestamps. revoked_at is
// omitted for an active user.
type userJSON struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
Status string `json:"status"`
CreatedAt string `json:"created_at"`
RevokedAt string `json:"revoked_at,omitempty"`
}
// addUserReq is the POST /users body: the new user's Ed25519 signing key
// (64-hex), human handle, and role. role is optional and defaults to member.
type addUserReq struct {
SignPub string `json:"sign_pub"`
Handle string `json:"handle"`
Role string `json:"role"`
}
// ---- helpers -------------------------------------------------------------- // ---- helpers --------------------------------------------------------------
func writeJSON(w http.ResponseWriter, code int, v any) { func writeJSON(w http.ResponseWriter, code int, v any) {
@@ -674,3 +748,95 @@ func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
_, _ = w.Write(data) _, _ = w.Write(data)
} }
// ---- user-management handlers (admin-only) --------------------------------
// handleListUsers returns the full bus allowlist, including revoked users, so an
// admin sees the complete picture (a revoked identity stays auditable). Admin-only.
func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
users, err := s.store.ListUsers()
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
out := make([]userJSON, 0, len(users))
for _, u := range users {
out = append(out, userJSON{
SignPub: u.SignPub,
Handle: u.Handle,
Role: u.Role,
Status: u.Status,
CreatedAt: u.CreatedAt,
RevokedAt: u.RevokedAt,
})
}
writeJSON(w, http.StatusOK, out)
}
// handleAddUser registers a new bus user from an admin-supplied Ed25519 signing
// key. It mirrors the `membershipd user add` CLI: the key must be 64-hex, the
// role must be admin or member (empty defaults to member), and re-adding an
// already-registered key is a 409 that leaves the existing row untouched — no
// silent upsert that could flip a role or clobber status. Admin-only.
func (s *Server) handleAddUser(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
var req addUserReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
return
}
if req.SignPub == "" || req.Handle == "" {
writeErr(w, http.StatusBadRequest, "sign_pub and handle required")
return
}
if err := ValidateSignPubHex(req.SignPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
role := req.Role
if role == "" {
role = RoleMember
}
if role != RoleAdmin && role != RoleMember {
writeErr(w, http.StatusBadRequest,
fmt.Sprintf("invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember))
return
}
if err := s.store.AddUser(req.SignPub, req.Handle, role); err != nil {
if errors.Is(err, ErrUserExists) {
// Idempotency contract (mirrors the CLI): re-adding a key is an explicit,
// non-destructive conflict. To replace a user, revoke then add again.
writeErr(w, http.StatusConflict,
"user already registered (unchanged); revoke it first to replace")
return
}
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusCreated, map[string]string{"status": "added"})
}
// handleRevokeUser revokes a bus user by signing key. Revocation is a status
// flip (no hard delete) so the identity stays auditable and IsAuthorized denies
// it on both planes immediately. Revoking an unknown or already-revoked key is a
// 404. Admin-only.
func (s *Server) handleRevokeUser(w http.ResponseWriter, r *http.Request) {
if _, ok := s.requireAdmin(w, r); !ok {
return
}
signPub := r.PathValue("signpub")
if err := ValidateSignPubHex(signPub); err != nil {
writeErr(w, http.StatusBadRequest, err.Error())
return
}
if err := s.store.RevokeUser(signPub); err != nil {
writeServerErr(w, r, http.StatusNotFound, "no active user with that key", err)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "revoked"})
}
+18
View File
@@ -2,6 +2,7 @@ package membership
import ( import (
"database/sql" "database/sql"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
@@ -35,6 +36,23 @@ type User struct {
RevokedAt string // empty unless revoked RevokedAt string // empty unless revoked
} }
// ValidateSignPubHex ensures signPub is exactly a 32-byte Ed25519 public key in
// hex (64 hex chars). It is the single source of truth for that check, shared by
// the local admin CLI (which validates before seeding the first admin) and the
// HTTP user-management handlers (which validate an admin-supplied key before it
// reaches the store). Catching a malformed key here turns a silent "authorized
// nobody" into an explicit error at the boundary.
func ValidateSignPubHex(signPub string) error {
b, err := hex.DecodeString(signPub)
if err != nil {
return fmt.Errorf("sign-pub is not valid hex: %w", err)
}
if len(b) != 32 {
return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
}
return nil
}
// normalizeSignPub lowercases the hex key so lookups are case-insensitive: the // normalizeSignPub lowercases the hex key so lookups are case-insensitive: the
// primary key is stored lowercase and every query normalizes its input the same // primary key is stored lowercase and every query normalizes its input the same
// way, so a caller passing uppercase hex still matches. // way, so a caller passing uppercase hex still matches.
+164
View File
@@ -0,0 +1,164 @@
package membership
import (
"encoding/hex"
"encoding/json"
"net/http"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
)
// signedJSON is signedReq for a JSON body: it marshals v and signs the request
// as id with a distinct nonce. It returns the response status and body, reusing
// the auth_test harness so these tests exercise the real signed wire contract.
func signedJSON(t *testing.T, h *authHarness, method, path string, v any, id cs.Identity, n int) (int, string) {
t.Helper()
var body []byte
if v != nil {
b, err := json.Marshal(v)
if err != nil {
t.Fatalf("marshal body: %v", err)
}
body = b
}
return do(t, signedReq(t, h.ts.URL, method, path, body, id, time.Now().Unix(), nonceN(n)))
}
// TestUsersHTTP_NonAdminForbidden is the security spine: a REGISTERED but
// non-admin signer (bob, role member) is denied on every user-management
// endpoint. His signature clears auth (he is in the allowlist), so each request
// reaches the handler, where requireAdmin returns 403 — default-deny by role.
func TestUsersHTTP_NonAdminForbidden(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, _ := cs.GenerateIdentity()
register(t, h, bob, "bob") // role member (see register in authz_test.go)
bobPub := hex.EncodeToString(bob.SignPub)
victim, _ := cs.GenerateIdentity()
victimPub := hex.EncodeToString(victim.SignPub)
checks := []struct {
name string
method string
path string
body any
}{
{"list users", "GET", "/users", nil},
{"add user", "POST", "/users", addUserReq{SignPub: victimPub, Handle: "mallory", Role: RoleMember}},
{"revoke user", "POST", "/users/" + bobPub + "/revoke", nil},
}
for i, c := range checks {
code, body := signedJSON(t, h, c.method, c.path, c.body, bob, i+1)
if code != http.StatusForbidden {
t.Fatalf("non-admin %s should be 403, got %d (%s)", c.name, code, body)
}
}
}
// TestUsersHTTP_AdminRoundtrip exercises the golden path end to end: alice (the
// seeded admin) adds carol, sees her in the list as active, revokes her, then
// sees her status flip to revoked (no hard delete — she stays in the list).
func TestUsersHTTP_AdminRoundtrip(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
carol, _ := cs.GenerateIdentity()
carolPub := hex.EncodeToString(carol.SignPub)
// Add carol as a member.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: carolPub, Handle: "carol", Role: RoleMember}, h.alice, 1); code != http.StatusCreated {
t.Fatalf("admin add carol should be 201, got %d (%s)", code, body)
}
// List: carol present and active; alice (the seed admin) also present.
users := listUsers(t, h, 2)
carolRow, ok := findUser(users, carolPub)
if !ok {
t.Fatalf("carol missing from list after add: %+v", users)
}
if carolRow.Status != StatusActive || carolRow.Role != RoleMember || carolRow.Handle != "carol" {
t.Fatalf("carol row wrong after add: %+v", carolRow)
}
if _, ok := findUser(users, h.alicePub); !ok {
t.Fatalf("seeded admin alice missing from list: %+v", users)
}
// Revoke carol.
if code, body := signedJSON(t, h, "POST", "/users/"+carolPub+"/revoke", nil, h.alice, 3); code != http.StatusOK {
t.Fatalf("admin revoke carol should be 200, got %d (%s)", code, body)
}
// List again: carol still present, now revoked (status flip, not delete).
users = listUsers(t, h, 4)
carolRow, ok = findUser(users, carolPub)
if !ok {
t.Fatalf("carol vanished from list after revoke (should be a status flip): %+v", users)
}
if carolRow.Status != StatusRevoked {
t.Fatalf("carol should be revoked, got status %q", carolRow.Status)
}
}
// TestUsersHTTP_Validation covers the input-validation contract: a malformed hex
// key is 400, an unknown role is 400, and re-adding an already-registered key is
// 409 (the existing row is left untouched — no silent upsert).
func TestUsersHTTP_Validation(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
good, _ := cs.GenerateIdentity()
goodPub := hex.EncodeToString(good.SignPub)
// Invalid hex (too short) -> 400.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: "abcd", Handle: "shorty", Role: RoleMember}, h.alice, 1); code != http.StatusBadRequest {
t.Fatalf("malformed sign_pub should be 400, got %d (%s)", code, body)
}
// Invalid role -> 400.
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: goodPub, Handle: "weirdrole", Role: "superuser"}, h.alice, 2); code != http.StatusBadRequest {
t.Fatalf("invalid role should be 400, got %d (%s)", code, body)
}
// Re-adding the seeded admin's own key -> 409 (idempotency, no overwrite).
if code, body := signedJSON(t, h, "POST", "/users",
addUserReq{SignPub: h.alicePub, Handle: "alice-again", Role: RoleMember}, h.alice, 3); code != http.StatusConflict {
t.Fatalf("re-adding an existing key should be 409, got %d (%s)", code, body)
}
// And the existing row is untouched: alice is still an active admin.
u, err := h.store.GetUser(h.alicePub)
if err != nil {
t.Fatalf("get alice after conflicting re-add: %v", err)
}
if u.Role != RoleAdmin || u.Status != StatusActive || u.Handle != "alice" {
t.Fatalf("conflicting re-add mutated the existing row: %+v", u)
}
}
// listUsers signs a GET /users as alice and decodes the response.
func listUsers(t *testing.T, h *authHarness, n int) []userJSON {
t.Helper()
code, body := signedJSON(t, h, "GET", "/users", nil, h.alice, n)
if code != http.StatusOK {
t.Fatalf("admin list users should be 200, got %d (%s)", code, body)
}
var users []userJSON
if err := json.Unmarshal([]byte(body), &users); err != nil {
t.Fatalf("decode users: %v (%s)", err, body)
}
return users
}
// findUser returns the row with the given signing key (case-insensitive).
func findUser(users []userJSON, signPub string) (userJSON, bool) {
want := normalizeSignPub(signPub)
for _, u := range users {
if normalizeSignPub(u.SignPub) == want {
return u, true
}
}
return userJSON{}, false
}
+36 -3
View File
@@ -1,11 +1,44 @@
import { useState } from "react"; import { useEffect, useState } from "react";
import { Center, Loader } from "@mantine/core";
import { Login } from "./Login"; import { Login } from "./Login";
import { ChatShell } from "./ChatShell"; import { ChatShell } from "./ChatShell";
import { api } from "./api";
import type { User } from "./types"; import type { User } from "./types";
// shortEndpoint hace legible el endpoint id del operador para mostrarlo como
// handle por defecto cuando no se escribió uno en el login.
function shortEndpoint(ep: string) {
return ep.slice(0, 8);
}
export function App() { export function App() {
const [user, setUser] = useState<User | null>(null); const [user, setUser] = useState<User | null>(null);
const [checking, setChecking] = useState(true);
if (!user) return <Login onLogin={setUser} />; // Al montar, comprueba si ya hay una sesión viva en el gateway (cookie). Si la
return <ChatShell user={user} onLogout={() => setUser(null)} />; // hay, entra directo; si no (401), muestra el login.
useEffect(() => {
api
.me()
.then((me) =>
setUser({ id: me.endpoint, handle: shortEndpoint(me.endpoint) }),
)
.catch(() => {})
.finally(() => setChecking(false));
}, []);
const logout = () => {
void api.logout().catch(() => {});
setUser(null);
};
if (checking) {
return (
<Center h="100vh" bg="dark.9">
<Loader color="brand" />
</Center>
);
}
if (!user) return <Login onLogin={setUser} />;
return <ChatShell user={user} onLogout={logout} />;
} }
+38 -23
View File
@@ -19,7 +19,8 @@ import {
IconDotsVertical, IconDotsVertical,
IconPaperclip, IconPaperclip,
} from "@tabler/icons-react"; } from "@tabler/icons-react";
import type { Message, Room, User } from "./types"; import { api, streamRoom } from "./api";
import type { Message, Room } from "./types";
function initials(s: string) { function initials(s: string) {
return s.replace(/[^a-z0-9]/gi, "").slice(0, 2).toUpperCase() || "?"; return s.replace(/[^a-z0-9]/gi, "").slice(0, 2).toUpperCase() || "?";
@@ -54,22 +55,30 @@ function MessageRow({ msg }: { msg: Message }) {
); );
} }
export function ChatPanel({ export function ChatPanel({ room }: { room: Room | undefined }) {
room,
user,
}: {
room: Room | undefined;
user: User;
}) {
const [draft, setDraft] = useState(""); const [draft, setDraft] = useState("");
const [extra, setExtra] = useState<Record<string, Message[]>>({}); const [messages, setMessages] = useState<Message[]>([]);
const [sendError, setSendError] = useState<string | null>(null);
const viewport = useRef<HTMLDivElement>(null); const viewport = useRef<HTMLDivElement>(null);
const msgs = room ? [...room.messages, ...(extra[room.id] ?? [])] : []; // Abre el stream SSE de la room activa. El gateway entrega historia (rooms
// persistidas) y luego mensajes en vivo, ya descifrados. Dedup por id porque
// un re-render no debe duplicar y el eco del propio envío llega por aquí.
useEffect(() => {
setMessages([]);
setSendError(null);
if (!room) return;
const close = streamRoom(room.id, (m) => {
setMessages((prev) =>
prev.some((p) => p.id === m.id) ? prev : [...prev, m],
);
});
return close;
}, [room?.id]);
useEffect(() => { useEffect(() => {
viewport.current?.scrollTo({ top: viewport.current.scrollHeight }); viewport.current?.scrollTo({ top: viewport.current.scrollHeight });
}, [room?.id, msgs.length]); }, [room?.id, messages.length]);
if (!room) { if (!room) {
return ( return (
@@ -79,18 +88,19 @@ export function ChatPanel({
); );
} }
const send = () => { const send = async () => {
const body = draft.trim(); const body = draft.trim();
if (!body) return; if (!body) return;
const msg: Message = {
id: `local-${Date.now()}`,
sender: user.handle,
body,
ts: Date.now(),
mine: true,
};
setExtra((e) => ({ ...e, [room.id]: [...(e[room.id] ?? []), msg] }));
setDraft(""); setDraft("");
setSendError(null);
try {
// No optimista: el mensaje propio vuelve por SSE con su id real (mine:true),
// evitando duplicados.
await api.send(room.id, body);
} catch (e) {
setDraft(body); // restaura el borrador si el envío falló
setSendError(e instanceof Error ? e.message : "No se pudo enviar");
}
}; };
return ( return (
@@ -126,13 +136,18 @@ export function ChatPanel({
<ScrollArea style={{ flex: 1 }} viewportRef={viewport}> <ScrollArea style={{ flex: 1 }} viewportRef={viewport}>
<Stack gap="lg" p="md"> <Stack gap="lg" p="md">
{msgs.map((m) => ( {messages.map((m) => (
<MessageRow key={m.id} msg={m} /> <MessageRow key={m.id} msg={m} />
))} ))}
</Stack> </Stack>
</ScrollArea> </ScrollArea>
<Divider color="dark.4" /> <Divider color="dark.4" />
{sendError && (
<Text c="red" size="xs" px="sm" pt={4}>
{sendError}
</Text>
)}
<Group p="sm" gap="xs" wrap="nowrap"> <Group p="sm" gap="xs" wrap="nowrap">
<ActionIcon variant="subtle" color="gray" size="lg"> <ActionIcon variant="subtle" color="gray" size="lg">
<IconPaperclip size={18} /> <IconPaperclip size={18} />
@@ -143,14 +158,14 @@ export function ChatPanel({
placeholder={`Mensaje a ${room.name}`} placeholder={`Mensaje a ${room.name}`}
value={draft} value={draft}
onChange={(e) => setDraft(e.currentTarget.value)} onChange={(e) => setDraft(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && send()} onKeyDown={(e) => e.key === "Enter" && void send()}
/> />
<ActionIcon <ActionIcon
size="lg" size="lg"
radius="xl" radius="xl"
variant="filled" variant="filled"
color="brand" color="brand"
onClick={send} onClick={() => void send()}
disabled={!draft.trim()} disabled={!draft.trim()}
> >
<IconSend size={18} /> <IconSend size={18} />
+56 -7
View File
@@ -1,9 +1,9 @@
import { useState } from "react"; import { useCallback, useEffect, useState } from "react";
import { Flex, Box } from "@mantine/core"; import { Flex, Box, Center, Loader, Stack, Text, Button } from "@mantine/core";
import { Sidebar } from "./Sidebar"; import { Sidebar } from "./Sidebar";
import { ChatPanel } from "./ChatPanel"; import { ChatPanel } from "./ChatPanel";
import { MOCK_ROOMS } from "./mock"; import { api } from "./api";
import type { User } from "./types"; import type { Room, User } from "./types";
export function ChatShell({ export function ChatShell({
user, user,
@@ -12,10 +12,59 @@ export function ChatShell({
user: User; user: User;
onLogout: () => void; onLogout: () => void;
}) { }) {
const [rooms] = useState(MOCK_ROOMS); const [rooms, setRooms] = useState<Room[]>([]);
const [activeId, setActiveId] = useState<string>(rooms[0]?.id ?? ""); const [activeId, setActiveId] = useState<string>("");
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const load = useCallback(() => {
setLoading(true);
api
.listRooms()
.then((rs) => {
setRooms(rs);
setActiveId((cur) => cur || rs[0]?.id || "");
setError(null);
})
.catch((e) => setError(e?.message ?? "No se pudieron cargar las rooms"))
.finally(() => setLoading(false));
}, []);
useEffect(() => {
load();
}, [load]);
const active = rooms.find((r) => r.id === activeId); const active = rooms.find((r) => r.id === activeId);
// El panel derecho muestra el estado de carga/error/empty sin tocar el layout.
let panel = <ChatPanel room={active} />;
if (loading && rooms.length === 0) {
panel = (
<Center h="100%">
<Loader color="brand" />
</Center>
);
} else if (error) {
panel = (
<Center h="100%">
<Stack align="center" gap="sm">
<Text c="red" size="sm">
{error}
</Text>
<Button variant="light" color="brand" onClick={load}>
Reintentar
</Button>
</Stack>
</Center>
);
} else if (rooms.length === 0) {
panel = (
<Center h="100%">
<Text c="dimmed">No perteneces a ninguna room todavía</Text>
</Center>
);
}
return ( return (
<Flex h="100vh" w="100vw" style={{ overflow: "hidden" }}> <Flex h="100vh" w="100vw" style={{ overflow: "hidden" }}>
<Box <Box
@@ -36,7 +85,7 @@ export function ChatShell({
/> />
</Box> </Box>
<Box flex={1} h="100%" bg="dark.7" style={{ minWidth: 0 }}> <Box flex={1} h="100%" bg="dark.7" style={{ minWidth: 0 }}>
<ChatPanel room={active} user={user} /> {panel}
</Box> </Box>
</Flex> </Flex>
); );
+30 -5
View File
@@ -11,15 +11,29 @@ import {
Title, Title,
} from "@mantine/core"; } from "@mantine/core";
import { IconShieldLock, IconKey } from "@tabler/icons-react"; import { IconShieldLock, IconKey } from "@tabler/icons-react";
import { api, ApiError } from "./api";
import type { User } from "./types"; import type { User } from "./types";
export function Login({ onLogin }: { onLogin: (u: User) => void }) { export function Login({ onLogin }: { onLogin: (u: User) => void }) {
const [handle, setHandle] = useState(""); const [handle, setHandle] = useState("");
const [password, setPassword] = useState(""); const [password, setPassword] = useState("");
const [busy, setBusy] = useState(false);
const [error, setError] = useState<string | null>(null);
const ready = handle.trim().length > 0 && password.length > 0; const ready = handle.trim().length > 0 && password.length > 0;
const connect = () => { const connect = async () => {
const h = handle.trim(); if (!ready || busy) return;
if (ready) onLogin({ id: h, handle: h }); setBusy(true);
setError(null);
try {
// La contraseña desbloquea la sesión del gateway (passphrase del operador).
// El handle es solo el nombre a mostrar en esta iteración (wallet = fase 2).
const me = await api.login(password);
const h = handle.trim() || me.endpoint.slice(0, 8);
onLogin({ id: me.endpoint, handle: h });
} catch (e) {
setError(e instanceof ApiError ? e.message : "No se pudo conectar al gateway");
setBusy(false);
}
}; };
return ( return (
@@ -52,9 +66,20 @@ export function Login({ onLogin }: { onLogin: (u: User) => void }) {
leftSection={<IconKey size={16} />} leftSection={<IconKey size={16} />}
value={password} value={password}
onChange={(e) => setPassword(e.currentTarget.value)} onChange={(e) => setPassword(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && connect()} onKeyDown={(e) => e.key === "Enter" && void connect()}
/> />
<Button w="100%" size="md" onClick={connect} disabled={!ready}> {error && (
<Text c="red" size="sm" ta="center">
{error}
</Text>
)}
<Button
w="100%"
size="md"
onClick={() => void connect()}
disabled={!ready}
loading={busy}
>
Conectar Conectar
</Button> </Button>
</Stack> </Stack>
+131
View File
@@ -0,0 +1,131 @@
// La única capa por la que la SPA habla con el bus. Cada llamada va al gateway Go
// bajo /api; el gateway mantiene la sesión `pkg/client` (peer autenticado del
// bus), cifra/descifra por room y traduce a REST/SSE. El navegador nunca firma,
// nunca habla NATS y nunca ve una clave privada: solo guarda una cookie de
// sesión opaca (HttpOnly) que el gateway emite tras el login.
import type { MeInfo, Message, MsgWire, Room, RoomWire } from "./types";
export class ApiError extends Error {
status: number;
constructor(message: string, status: number) {
super(message);
this.status = status;
}
}
async function req<T>(path: string, init?: RequestInit): Promise<T> {
const res = await fetch(path, {
// same-origin envía la cookie de sesión automáticamente (también detrás del
// proxy de vite en dev).
credentials: "same-origin",
headers: { "Content-Type": "application/json" },
...init,
});
const text = await res.text();
let body: unknown = null;
if (text) {
try {
body = JSON.parse(text);
} catch {
body = text;
}
}
if (!res.ok) {
const msg =
body && typeof body === "object" && "error" in body
? String((body as { error: unknown }).error)
: `HTTP ${res.status}`;
throw new ApiError(msg, res.status);
}
return body as T;
}
// roomFromWire mapea la fila del gateway al tipo Room que consume la UI. Los
// mensajes NO viven aquí: llegan por stream(). lastMessage/lastTs/unread se
// rellenan de forma neutra para no inventar datos (la cabecera de la sidebar se
// alimentará del stream en una iteración futura).
export function roomFromWire(r: RoomWire): Room {
return {
id: r.id,
name: r.name || r.subject,
encrypted: r.encrypt,
lastMessage: "",
lastTs: 0,
unread: 0,
messages: [],
};
}
// messageFromWire mapea un frame descifrado del SSE al tipo Message de la UI.
export function messageFromWire(m: MsgWire): Message {
return {
id: m.id,
sender: m.sender,
body: m.body,
ts: m.ts,
mine: m.mine,
};
}
export const api = {
// ---- sesión -------------------------------------------------------------
// login desbloquea la sesión del gateway con la passphrase del operador. El
// gateway responde con una cookie de sesión; me() comprueba si ya hay una.
login: (passphrase: string) =>
req<MeInfo>("/api/login", {
method: "POST",
body: JSON.stringify({ passphrase }),
}),
logout: () => req<{ status: string }>("/api/logout", { method: "POST" }),
me: () => req<MeInfo>("/api/me"),
// ---- rooms --------------------------------------------------------------
listRooms: async (): Promise<Room[]> => {
const wire = await req<RoomWire[]>("/api/rooms");
return wire.map(roomFromWire);
},
// createRoom: {subject, encrypted} basta — el gateway deriva la policy
// Matrix-like (cifrada + persistida + firmada) por defecto.
createRoom: async (subject: string, encrypted = true): Promise<Room> => {
const r = await req<RoomWire>("/api/rooms", {
method: "POST",
body: JSON.stringify({ subject, encrypted }),
});
return roomFromWire(r);
},
join: (roomID: string) =>
req<{ status: string }>(
`/api/rooms/${encodeURIComponent(roomID)}/join`,
{ method: "POST" },
),
send: (roomID: string, body: string) =>
req<{ status: string }>(
`/api/rooms/${encodeURIComponent(roomID)}/send`,
{ method: "POST", body: JSON.stringify({ body }) },
),
};
// streamRoom abre el SSE de una room y llama onMessage por cada frame descifrado
// (historia primero en rooms persistidas, luego en vivo). Devuelve una función
// de cierre. EventSource manda la cookie de sesión automáticamente y reconecta
// solo si la conexión cae; onError se invoca en cada corte para que la UI pueda
// reflejar el estado.
export function streamRoom(
roomID: string,
onMessage: (m: Message) => void,
onError?: (e: Event) => void,
): () => void {
const es = new EventSource(
`/api/rooms/${encodeURIComponent(roomID)}/stream`,
);
es.onmessage = (ev) => {
try {
const wire = JSON.parse(ev.data) as MsgWire;
onMessage(messageFromWire(wire));
} catch {
// frame malformado: se ignora, el stream sigue.
}
};
if (onError) es.onerror = onError;
return () => es.close();
}
+33 -3
View File
@@ -1,5 +1,5 @@
// Tipos de dominio de la UI. En la iteración 1 se llenan con datos mock; // Tipos de dominio de la UI. Los datos vienen del gateway Go (REST/SSE), que es
// más adelante vendrán del gateway (REST/SSE) que es un peer del bus. // un peer autenticado del bus. El navegador nunca firma ni habla NATS.
export interface User { export interface User {
id: string; id: string;
@@ -8,7 +8,7 @@ export interface User {
export interface Message { export interface Message {
id: string; id: string;
sender: string; // handle sender: string; // endpoint id del remitente (handle legible es fase 2)
body: string; body: string;
ts: number; // epoch ms ts: number; // epoch ms
mine?: boolean; mine?: boolean;
@@ -23,3 +23,33 @@ export interface Room {
unread: number; unread: number;
messages: Message[]; messages: Message[];
} }
// ---- formas de la API del gateway (wire) ---------------------------------
// MeInfo es la identidad del operador que el gateway encarna (GET /api/me).
export interface MeInfo {
endpoint: string;
sign_pub: string;
}
// RoomWire es la fila de room que devuelve el gateway (GET /api/rooms). No trae
// mensajes: estos llegan por SSE (GET /api/rooms/{id}/stream).
export interface RoomWire {
id: string;
subject: string;
name: string;
epoch: number;
encrypt: boolean;
persist: boolean;
sign_msgs: boolean;
role: string;
}
// MsgWire es un mensaje ya descifrado que el gateway empuja por SSE.
export interface MsgWire {
id: string;
sender: string;
body: string;
ts: number;
mine: boolean;
}
+8 -1
View File
@@ -3,5 +3,12 @@ import react from "@vitejs/plugin-react";
export default defineConfig({ export default defineConfig({
plugins: [react()], plugins: [react()],
server: { host: true, port: 5181 }, // En dev, /api (REST + SSE) se proxea al gateway Go (cmd/webgw, puerto 8481).
// El proxy hace streaming, así que el SSE de /api/rooms/{id}/stream funciona a
// través de él. En producción el gateway sirve el dist embebido y no hay proxy.
server: {
host: true,
port: 5181,
proxy: { "/api": "http://127.0.0.1:8481" },
},
}); });