Merge branch 'issue/room-history-endpoint'
Server owns the JetStream stream of persisted rooms + GET /rooms/{id}/history so
clients without JetStream (uniweb) can read the backlog over plain HTTP.
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
name: unibus
|
||||
lang: go
|
||||
domain: infra
|
||||
version: 0.15.1
|
||||
version: 0.16.0
|
||||
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
||||
tags: [service, messaging, nats, e2e]
|
||||
uses_functions:
|
||||
@@ -225,6 +225,20 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v0.16.0 (2026-06-14) — feat: el server asegura el stream JetStream de las rooms
|
||||
persist + `GET /rooms/{id}/history` para que clientes sin JetStream (uniweb) lean
|
||||
el histórico. (1) `handleCreateRoom` crea (idempotente, `CreateOrUpdateStream`) el
|
||||
stream durable `UNIBUS_<roomID>` de una room persist ANTES de responder, así su
|
||||
subject se captura desde el minuto cero venga el mensaje de un cliente Go o de un
|
||||
cliente browser que solo habla core NATS (antes el stream lo creaba solo el cliente
|
||||
Go, así que los mensajes de uniweb se perdían). (2) Nuevo endpoint member-only
|
||||
`GET /rooms/{id}/history?limit=N` (default 200, cap 1000): lee el stream
|
||||
server-side y devuelve `{messages:[<base64-std del frame marshalado>]}` en orden
|
||||
oldest→newest; el server jamás descifra (relay del ciphertext E2E). Backfill de
|
||||
rooms persist existentes: lazy-ensure del stream en el endpoint (empiezan a
|
||||
capturar desde ahora; los mensajes previos al stream no son recuperables). El
|
||||
control plane abre ahora su propio contexto JetStream también en single-node
|
||||
embebido. Todo aditivo; build/vet/test verdes.
|
||||
- v0.15.1 (2026-06-14) — fix: la ruta del directorio se registraba con prefijo /api y Caddy lo stripeaba (404 en prod); corregida a /directory.
|
||||
- v0.15.0 (2026-06-14) — nombres legibles + provisioning de bots de un comando.
|
||||
(1) Nuevo `GET /api/directory` en el control-plane: cualquier usuario activo del
|
||||
|
||||
+21
-3
@@ -150,6 +150,16 @@ func main() {
|
||||
decentralized := *storeBackend == "kv"
|
||||
needJS := clustered || decentralized
|
||||
enforce := authMode == membership.AuthEnforce
|
||||
embedded := *natsURL == ""
|
||||
// The control plane also needs a privileged JetStream client to OWN the durable
|
||||
// per-room streams of persisted rooms (ensure the stream on room creation so the
|
||||
// subject is captured from the first message — even from a JetStream-less browser
|
||||
// client — and read it back for GET /rooms/{id}/history). The embedded NATS
|
||||
// always ships JetStream, so open the client whenever we run embedded, even for a
|
||||
// standalone SQLite node. For an EXTERNAL NATS we only reach for JetStream when a
|
||||
// cluster/KV feature explicitly requires it (unchanged), so an operator-managed
|
||||
// external deployment without those features behaves exactly as before.
|
||||
openJS := needJS || embedded
|
||||
|
||||
// Internal service identity (issue 0006a): when the embedded data plane enforces
|
||||
// auth, membershipd must still connect to its OWN server to manage JetStream.
|
||||
@@ -159,7 +169,7 @@ func main() {
|
||||
// the server is embedded), so a standalone or non-enforce node is unchanged.
|
||||
var internalID cs.Identity
|
||||
var internalPubHex string
|
||||
if needJS && enforce && *natsURL == "" {
|
||||
if openJS && enforce && embedded {
|
||||
if *internalIDFile != "" {
|
||||
// Persisted identity: load it, generating + writing it (0600) on first
|
||||
// start. A stable internal key is what `user add --store kv` presents to
|
||||
@@ -316,9 +326,9 @@ func main() {
|
||||
// only client that can connect in this window (the holder still denies everyone
|
||||
// else; the internal identity bypasses the store).
|
||||
var js jetstream.JetStream
|
||||
if needJS {
|
||||
if openJS {
|
||||
var internalNC *nats.Conn
|
||||
if *natsURL == "" {
|
||||
if embedded {
|
||||
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
|
||||
} else {
|
||||
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
|
||||
@@ -340,6 +350,14 @@ func main() {
|
||||
}
|
||||
|
||||
srv := membership.NewServer(store, blobs, authMode)
|
||||
// Wire the privileged JetStream context so the control plane owns persisted
|
||||
// rooms' durable streams (ensure on create + serve GET /rooms/{id}/history). The
|
||||
// stream replication factor matches the control-plane KV replication so a room's
|
||||
// history is as available as its metadata. js is nil only for an external NATS
|
||||
// without a cluster/KV feature, where history degrades to empty (see openJS).
|
||||
if js != nil {
|
||||
srv.SetJetStream(js, *kvReplicas)
|
||||
}
|
||||
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
|
||||
// has no per-subject ACL, so cleartext content would be readable by any
|
||||
// registered peer. Forcing E2E keeps message content confidential regardless
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
package membership
|
||||
|
||||
// Server-side durable history for persisted rooms (room.ModeMatrix / Persist).
|
||||
//
|
||||
// A persisted room's messages ride a file-backed JetStream stream named
|
||||
// "UNIBUS_<roomID>" (roomStreamName, identical to pkg/client.streamName). Until
|
||||
// now that stream was created only by the Go client's first publish/subscribe; a
|
||||
// client that speaks only core NATS (the browser client uniweb, which has no
|
||||
// JetStream) therefore never created it, so its messages were captured nowhere and
|
||||
// vanished on reload. This file moves stream ownership to the server: the control
|
||||
// plane ensures the stream when a persisted room is created (so capture starts at
|
||||
// minute zero whoever publishes) and exposes GET /rooms/{id}/history so a
|
||||
// JetStream-less client can read the backlog over plain HTTP.
|
||||
//
|
||||
// The server never decrypts: each stored message is the E2E frame exactly as it
|
||||
// was published (ciphertext for an encrypted room). The history endpoint returns
|
||||
// those bytes verbatim (base64-encoded for JSON safety), so end-to-end encryption
|
||||
// is preserved — the server only relays the bytes it already holds.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
const (
|
||||
// defaultHistoryLimit is the number of most-recent messages returned when the
|
||||
// caller does not specify ?limit.
|
||||
defaultHistoryLimit = 200
|
||||
// maxHistoryLimit is the hard ceiling on a single history response, so a caller
|
||||
// cannot ask the server to buffer an unbounded backlog into one JSON payload.
|
||||
maxHistoryLimit = 1000
|
||||
// historyOpTimeout bounds each JetStream operation the history path performs
|
||||
// (stream lookup/ensure, info, per-message get) so a stalled data plane cannot
|
||||
// hang a control-plane request indefinitely.
|
||||
historyOpTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// historyResp is the GET /rooms/{id}/history response envelope. messages is the
|
||||
// ordered (oldest→newest) list of the room's most recent frames, each the base64
|
||||
// (standard encoding) of the marshaled, still-encrypted frame as it was published.
|
||||
// The key is a stable contract consumed by the browser client; do not rename it.
|
||||
type historyResp struct {
|
||||
Messages []string `json:"messages"`
|
||||
}
|
||||
|
||||
// streamConfigForRoom builds the JetStream stream config for a persisted room.
|
||||
//
|
||||
// It MUST stay byte-for-byte compatible with pkg/client/persist.go's ensureStream
|
||||
// (the original owner of this format): same name derivation (roomStreamName ==
|
||||
// pkg/client.streamName), same single subject, LimitsPolicy retention, file
|
||||
// storage. pkg/client is the source of truth for the format; we copy it here
|
||||
// rather than import it because pkg/client imports pkg/membership and importing it
|
||||
// back would be a cycle. The only addition is Replicas, matched to the cluster's
|
||||
// control-plane replication so a persisted room's history is as available as its
|
||||
// metadata (1 standalone, up to 3 in an HA cluster). CreateOrUpdateStream treats a
|
||||
// matching config as a no-op, so the client's later ensureStream is harmless.
|
||||
func streamConfigForRoom(roomID, subject string, replicas int) jetstream.StreamConfig {
|
||||
if replicas < 1 {
|
||||
replicas = 1
|
||||
}
|
||||
return jetstream.StreamConfig{
|
||||
Name: roomStreamName(roomID),
|
||||
Subjects: []string{subject},
|
||||
Retention: jetstream.LimitsPolicy,
|
||||
Storage: jetstream.FileStorage,
|
||||
Replicas: replicas,
|
||||
}
|
||||
}
|
||||
|
||||
// ensureRoomStream idempotently creates (or no-ops on) the durable stream that
|
||||
// captures a persisted room's subject. CreateOrUpdateStream returns the existing
|
||||
// stream unchanged when the config matches, so this is safe to call on every room
|
||||
// creation and on every history read (lazy backfill of pre-existing rooms).
|
||||
func ensureRoomStream(ctx context.Context, js jetstream.JetStream, roomID, subject string, replicas int) error {
|
||||
if _, err := js.CreateOrUpdateStream(ctx, streamConfigForRoom(roomID, subject, replicas)); err != nil {
|
||||
return fmt.Errorf("membership: ensure stream for room %s: %w", roomID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readRoomHistory returns the last `limit` messages of a room's durable stream in
|
||||
// chronological order (oldest→newest), each base64-encoded (standard encoding). A
|
||||
// stream that does not exist yet, or that holds no messages, yields an empty slice
|
||||
// (not an error): a freshly created or never-used room simply has no history. It
|
||||
// reads by sequence via the stream MSG.GET API rather than binding a consumer, so
|
||||
// it has no side effects on any peer's durable ack position. A gap in the sequence
|
||||
// range (a purged/deleted message) is skipped rather than failing the whole read,
|
||||
// so the result length is bounded by `limit` but may be smaller.
|
||||
func readRoomHistory(ctx context.Context, js jetstream.JetStream, roomID string, limit int) ([]string, error) {
|
||||
out := []string{}
|
||||
stream, err := js.Stream(ctx, roomStreamName(roomID))
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrStreamNotFound) {
|
||||
return out, nil
|
||||
}
|
||||
return nil, fmt.Errorf("membership: lookup stream for room %s: %w", roomID, err)
|
||||
}
|
||||
si, err := stream.Info(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: stream info for room %s: %w", roomID, err)
|
||||
}
|
||||
first, last := si.State.FirstSeq, si.State.LastSeq
|
||||
if si.State.Msgs == 0 || last == 0 {
|
||||
return out, nil
|
||||
}
|
||||
// Window of the last `limit` sequence numbers, clamped to the first stored seq.
|
||||
// last >= limit guards the unsigned subtraction against underflow.
|
||||
start := first
|
||||
if last >= uint64(limit) {
|
||||
if cand := last - uint64(limit) + 1; cand > start {
|
||||
start = cand
|
||||
}
|
||||
}
|
||||
for seq := start; seq <= last; seq++ {
|
||||
raw, err := stream.GetMsg(ctx, seq)
|
||||
if err != nil {
|
||||
// A purged/deleted sequence leaves a gap; skip it rather than abort.
|
||||
continue
|
||||
}
|
||||
out = append(out, base64.StdEncoding.EncodeToString(raw.Data))
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// parseHistoryLimit reads the ?limit query value, applying the default when it is
|
||||
// absent and clamping out-of-range / malformed values to [1, maxHistoryLimit].
|
||||
func parseHistoryLimit(q string) int {
|
||||
if q == "" {
|
||||
return defaultHistoryLimit
|
||||
}
|
||||
n, err := strconv.Atoi(q)
|
||||
if err != nil || n <= 0 {
|
||||
return defaultHistoryLimit
|
||||
}
|
||||
if n > maxHistoryLimit {
|
||||
return maxHistoryLimit
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// handleRoomHistory serves GET /rooms/{id}/history: the last ?limit (default 200,
|
||||
// hard cap 1000) messages of a persisted room, oldest→newest, each the base64 of
|
||||
// the still-encrypted frame as published. The server never decrypts — it relays
|
||||
// the ciphertext bytes the stream already holds, preserving E2E.
|
||||
//
|
||||
// Authorization mirrors the sibling room reads (/key, /members): the request must
|
||||
// be a member of the room (requireMember; allowed under AuthOff/dev where no signer
|
||||
// is verified). A missing room is 404; a non-member is 403; an unsigned request
|
||||
// under enforce is rejected with 401 by the auth middleware before this runs.
|
||||
//
|
||||
// For a persisted room the stream is ensured first (lazy backfill): a room created
|
||||
// before the server managed streams begins capturing from now on. Messages sent
|
||||
// before the stream existed were never captured and are unrecoverable — only
|
||||
// messages from stream creation onward appear here.
|
||||
func (s *Server) handleRoomHistory(w http.ResponseWriter, r *http.Request) {
|
||||
roomID := r.PathValue("id")
|
||||
// Existence first so a missing room is a clean 404 (the documented contract),
|
||||
// distinct from a 403 for an existing room the caller is not a member of.
|
||||
info, err := s.store.GetRoom(roomID)
|
||||
if err != nil {
|
||||
writeErr(w, http.StatusNotFound, "room not found")
|
||||
return
|
||||
}
|
||||
if _, ok := s.requireMember(w, r, roomID); !ok {
|
||||
return
|
||||
}
|
||||
limit := parseHistoryLimit(r.URL.Query().Get("limit"))
|
||||
|
||||
// No JetStream wired (e.g. an external-NATS deployment without a cluster/KV
|
||||
// feature): there is no durable stream to read, so report an empty history
|
||||
// rather than 500 — a client degrades to "no backlog" gracefully.
|
||||
if s.js == nil {
|
||||
writeJSON(w, http.StatusOK, historyResp{Messages: []string{}})
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout)
|
||||
defer cancel()
|
||||
if info.Persist {
|
||||
if err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas); err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
msgs, err := readRoomHistory(ctx, s.js, roomID, limit)
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, historyResp{Messages: msgs})
|
||||
}
|
||||
@@ -0,0 +1,400 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// historyHarness is an enforce-mode control plane wired to a real embedded NATS
|
||||
// JetStream, so the history path exercises the production code: the server ensures
|
||||
// and reads actual durable streams. alice is a seeded admin (and any room's owner),
|
||||
// bob is a registered user added as a room member, and carol is a registered user
|
||||
// that is NOT a member of the test room (to exercise the 403 path).
|
||||
type historyHarness struct {
|
||||
ts *httptest.Server
|
||||
store Store
|
||||
js jetstream.JetStream
|
||||
nc *nats.Conn
|
||||
alice cs.Identity // admin + room owner
|
||||
bob cs.Identity // room member
|
||||
carol cs.Identity // registered, non-member
|
||||
}
|
||||
|
||||
func newHistoryHarness(t *testing.T) *historyHarness {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: filepath.Join(dir, "jetstream"),
|
||||
Host: "127.0.0.1",
|
||||
Port: kvFreePort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("embedded nats: %v", err)
|
||||
}
|
||||
nc, err := nats.Connect(ns.ClientURL())
|
||||
if err != nil {
|
||||
ns.Shutdown()
|
||||
t.Fatalf("nats connect: %v", err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("jetstream: %v", err)
|
||||
}
|
||||
store, err := Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
if err != nil {
|
||||
store.Close()
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("open blobs: %v", err)
|
||||
}
|
||||
mustID := func(name string) cs.Identity {
|
||||
id, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity %s: %v", name, err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
alice, bob, carol := mustID("alice"), mustID("bob"), mustID("carol")
|
||||
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", RoleAdmin); err != nil {
|
||||
t.Fatalf("seed admin: %v", err)
|
||||
}
|
||||
for _, u := range []struct {
|
||||
id cs.Identity
|
||||
handle string
|
||||
}{{bob, "bob"}, {carol, "carol"}} {
|
||||
if err := store.AddUser(hex.EncodeToString(u.id.SignPub), u.handle, RoleMember); err != nil {
|
||||
t.Fatalf("register %s: %v", u.handle, err)
|
||||
}
|
||||
}
|
||||
|
||||
srv := NewServer(store, blobs, AuthEnforce)
|
||||
srv.SetJetStream(js, 1)
|
||||
ts := httptest.NewServer(srv)
|
||||
t.Cleanup(func() {
|
||||
ts.Close()
|
||||
store.Close()
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
ns.WaitForShutdown()
|
||||
})
|
||||
return &historyHarness{ts: ts, store: store, js: js, nc: nc, alice: alice, bob: bob, carol: carol}
|
||||
}
|
||||
|
||||
// seedPersistRoom creates a persisted (Matrix-policy) room directly in the store
|
||||
// with alice as owner and bob as a member, returning its id and subject. It does
|
||||
// NOT create the stream — that is left to the code under test (handleCreateRoom or
|
||||
// the lazy ensure in the history endpoint), which is exactly what we want to verify.
|
||||
func (h *historyHarness) seedPersistRoom(t *testing.T) (roomID, subject string) {
|
||||
t.Helper()
|
||||
roomID = newULID()
|
||||
subject = "unibus.room." + roomID
|
||||
aliceEp := frame.EndpointID(h.alice.SignPub)
|
||||
info := RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: aliceEp, Encrypt: true, Persist: true}
|
||||
if err := h.store.CreateRoom(info, h.alice.SignPub, h.alice.KexPub, []byte("alice-sealed")); err != nil {
|
||||
t.Fatalf("seed room: %v", err)
|
||||
}
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
bobM := Member{Endpoint: bobEp, Role: RoleMember, SignPub: h.bob.SignPub, KexPub: h.bob.KexPub}
|
||||
if err := h.store.AddMember(roomID, bobM, 0, []byte("bob-sealed")); err != nil {
|
||||
t.Fatalf("add member bob: %v", err)
|
||||
}
|
||||
return roomID, subject
|
||||
}
|
||||
|
||||
// makeFrame builds a marshaled PUB frame whose payload identifies it, so a test can
|
||||
// assert exact bytes and ordering after a round trip through the stream + endpoint.
|
||||
func makeFrame(t *testing.T, subject, sender string, i int) []byte {
|
||||
t.Helper()
|
||||
f := frame.Frame{
|
||||
Type: frame.PUB,
|
||||
Subject: subject,
|
||||
Sender: sender,
|
||||
MsgID: fmt.Sprintf("msg-%02d", i),
|
||||
Payload: []byte(fmt.Sprintf("ciphertext-%02d", i)),
|
||||
}
|
||||
b, err := f.Marshal()
|
||||
if err != nil {
|
||||
t.Fatalf("marshal frame %d: %v", i, err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// getHistory signs a GET /rooms/{id}/history request as id and returns the status,
|
||||
// the raw body, and the decoded envelope. query is the raw query string (e.g.
|
||||
// "limit=2") or "". The signed path includes the query because the server verifies
|
||||
// the signature over r.URL.RequestURI(), which carries it.
|
||||
func (h *historyHarness) getHistory(t *testing.T, id cs.Identity, roomID, query string, n int) (int, string, historyResp) {
|
||||
t.Helper()
|
||||
path := "/rooms/" + roomID + "/history"
|
||||
if query != "" {
|
||||
path += "?" + query
|
||||
}
|
||||
req := signedReq(t, h.ts.URL, "GET", path, nil, id, time.Now().Unix(), nonceN(n))
|
||||
code, body := do(t, req)
|
||||
var out historyResp
|
||||
if code == 200 {
|
||||
if err := json.Unmarshal([]byte(body), &out); err != nil {
|
||||
t.Fatalf("decode history: %v (%s)", err, body)
|
||||
}
|
||||
}
|
||||
return code, body, out
|
||||
}
|
||||
|
||||
// TestCreateRoomEnsuresStream verifies handleCreateRoom creates the durable stream
|
||||
// for a persisted room before responding, so capture starts at room creation.
|
||||
func TestCreateRoomEnsuresStream(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
aliceEp := frame.EndpointID(h.alice.SignPub)
|
||||
reqBody := createRoomReq{
|
||||
Subject: "unibus.room.created",
|
||||
Policy: policyJSON{Encrypt: true, Persist: true},
|
||||
Owner: endpointJSON{Endpoint: aliceEp, SignPub: h.alice.SignPub, KexPub: h.alice.KexPub},
|
||||
SealedKeySelf: []byte("alice-sealed"),
|
||||
}
|
||||
body, _ := json.Marshal(reqBody)
|
||||
req := signedReq(t, h.ts.URL, "POST", "/rooms", body, h.alice, time.Now().Unix(), nonceN(1))
|
||||
code, respBody := do(t, req)
|
||||
if code != 201 {
|
||||
t.Fatalf("create room: want 201, got %d (%s)", code, respBody)
|
||||
}
|
||||
var cr createRoomResp
|
||||
if err := json.Unmarshal([]byte(respBody), &cr); err != nil {
|
||||
t.Fatalf("decode create resp: %v (%s)", err, respBody)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
if _, err := h.js.Stream(ctx, roomStreamName(cr.RoomID)); err != nil {
|
||||
t.Fatalf("stream for created persist room should exist: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryGolden is the golden path: three frames published to a persisted
|
||||
// room's stream come back from the endpoint base64-encoded, in chronological order,
|
||||
// and decode to the exact frames that were published.
|
||||
func TestRoomHistoryGolden(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
want := make([][]byte, 3)
|
||||
for i := 0; i < 3; i++ {
|
||||
want[i] = makeFrame(t, subject, bobEp, i)
|
||||
// js.Publish waits for the stream ack, so the message is durably stored before
|
||||
// the next iteration — no sleeps, deterministic ordering.
|
||||
if _, err := h.js.Publish(ctx, subject, want[i]); err != nil {
|
||||
t.Fatalf("publish %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 10)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 3 {
|
||||
t.Fatalf("want 3 messages, got %d (%s)", len(hr.Messages), raw)
|
||||
}
|
||||
for i, m := range hr.Messages {
|
||||
decoded, err := base64.StdEncoding.DecodeString(m)
|
||||
if err != nil {
|
||||
t.Fatalf("message %d not valid base64: %v", i, err)
|
||||
}
|
||||
if string(decoded) != string(want[i]) {
|
||||
t.Fatalf("message %d bytes mismatch (order or content)", i)
|
||||
}
|
||||
f, err := frame.Unmarshal(decoded)
|
||||
if err != nil {
|
||||
t.Fatalf("message %d does not decode to a frame: %v", i, err)
|
||||
}
|
||||
if f.MsgID != fmt.Sprintf("msg-%02d", i) {
|
||||
t.Fatalf("message %d: want MsgID msg-%02d, got %q", i, i, f.MsgID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryCapturesCoreNATSPublish proves the central fix: a message
|
||||
// published over PLAIN core NATS (as the JetStream-less browser client uniweb does)
|
||||
// is captured by the server-owned stream and served by the endpoint. Without the
|
||||
// server ensuring the stream, this message would be captured nowhere.
|
||||
func TestRoomHistoryCapturesCoreNATSPublish(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
sent := makeFrame(t, subject, bobEp, 7)
|
||||
if err := h.nc.Publish(subject, sent); err != nil {
|
||||
t.Fatalf("core publish: %v", err)
|
||||
}
|
||||
if err := h.nc.Flush(); err != nil {
|
||||
t.Fatalf("flush: %v", err)
|
||||
}
|
||||
// Core NATS publish has no stream ack; poll the stream until the message lands.
|
||||
h.waitMsgs(t, roomID, 1)
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 11)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 1 {
|
||||
t.Fatalf("want 1 captured message, got %d (%s)", len(hr.Messages), raw)
|
||||
}
|
||||
decoded, err := base64.StdEncoding.DecodeString(hr.Messages[0])
|
||||
if err != nil || string(decoded) != string(sent) {
|
||||
t.Fatalf("captured core-NATS message round-trip mismatch (err=%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryLimit verifies ?limit caps the response to the most recent N
|
||||
// messages, oldest→newest within the window.
|
||||
func TestRoomHistoryLimit(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
if _, err := h.js.Publish(ctx, subject, makeFrame(t, subject, bobEp, i)); err != nil {
|
||||
t.Fatalf("publish %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "limit=2", 12)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 2 {
|
||||
t.Fatalf("limit=2 over 5 messages: want 2, got %d", len(hr.Messages))
|
||||
}
|
||||
// The window is the last two messages (indices 3 and 4), in order.
|
||||
for off, m := range hr.Messages {
|
||||
decoded, _ := base64.StdEncoding.DecodeString(m)
|
||||
f, err := frame.Unmarshal(decoded)
|
||||
if err != nil {
|
||||
t.Fatalf("limited message %d does not decode: %v", off, err)
|
||||
}
|
||||
want := fmt.Sprintf("msg-%02d", off+3)
|
||||
if f.MsgID != want {
|
||||
t.Fatalf("limited message %d: want MsgID %s, got %q", off, want, f.MsgID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryEmptyRoom verifies a persisted room with no messages returns an
|
||||
// empty (non-null) array, lazily ensuring the stream on the way.
|
||||
func TestRoomHistoryEmptyRoom(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 13)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if hr.Messages == nil {
|
||||
t.Fatalf("empty room must return [] not null (%s)", raw)
|
||||
}
|
||||
if len(hr.Messages) != 0 {
|
||||
t.Fatalf("empty room: want 0 messages, got %d", len(hr.Messages))
|
||||
}
|
||||
// The lazy ensure should have created the stream even though no message exists.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
if _, err := h.js.Stream(ctx, roomStreamName(roomID)); err != nil {
|
||||
t.Fatalf("lazy ensure should have created the stream: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryUnauthenticated verifies an unsigned request is rejected with 401
|
||||
// under enforce, before the handler runs.
|
||||
func TestRoomHistoryUnauthenticated(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
// No signing headers: plain GET against the enforce-mode control plane.
|
||||
req, err := http.NewRequest("GET", h.ts.URL+"/rooms/"+roomID+"/history", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("new request: %v", err)
|
||||
}
|
||||
code, body := do(t, req)
|
||||
if code != 401 {
|
||||
t.Fatalf("unauthenticated history: want 401, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryNonMember verifies a registered user who is NOT a member of the
|
||||
// room is rejected with 403.
|
||||
func TestRoomHistoryNonMember(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
code, body, _ := h.getHistory(t, h.carol, roomID, "", 14)
|
||||
if code != 403 {
|
||||
t.Fatalf("non-member history: want 403, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryRoomNotFound verifies a request for a non-existent room is a 404,
|
||||
// distinct from the 403 a non-member of an existing room gets.
|
||||
func TestRoomHistoryRoomNotFound(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
code, body, _ := h.getHistory(t, h.alice, newULID(), "", 15)
|
||||
if code != 404 {
|
||||
t.Fatalf("missing room history: want 404, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// waitMsgs polls the room's stream until it holds at least want messages or a short
|
||||
// deadline elapses, so a core-NATS publish (which carries no stream ack) is observed
|
||||
// deterministically without a fixed sleep.
|
||||
func (h *historyHarness) waitMsgs(t *testing.T, roomID string, want uint64) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
st, err := h.js.Stream(ctx, roomStreamName(roomID))
|
||||
if err == nil {
|
||||
si, ierr := st.Info(ctx)
|
||||
if ierr == nil && si.State.Msgs >= want {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("stream for room %s never reached %d message(s)", roomID, want)
|
||||
}
|
||||
@@ -109,6 +109,21 @@ type Server struct {
|
||||
// the RemoteAddr-only behavior that predates the flag. Set by the command via
|
||||
// SetTrustedProxies. See clientIP.
|
||||
trustedProxies trustedProxyMatcher
|
||||
|
||||
// js is the privileged JetStream context the server uses to own the durable
|
||||
// per-room streams of persisted rooms: it ensures a room's stream on creation
|
||||
// so the room's subject is captured from the first message — even from a
|
||||
// JetStream-less browser client (uniweb) that speaks only core NATS — and reads
|
||||
// it back for GET /rooms/{id}/history. It is wired by the command via
|
||||
// SetJetStream whenever a JetStream-capable data plane is available (always for
|
||||
// the embedded server). nil leaves history empty and stream-ensure a no-op,
|
||||
// preserving the pre-feature behavior for a deployment without JetStream.
|
||||
js jetstream.JetStream
|
||||
// streamReplicas is the replication factor for the room streams the server
|
||||
// creates, matched to the cluster's control-plane (KV) replication — 1 for a
|
||||
// standalone node, up to 3 in an HA cluster — so a persisted room's history is
|
||||
// as available as its metadata. Used only when js != nil. See SetJetStream.
|
||||
streamReplicas int
|
||||
}
|
||||
|
||||
// Posture describes the security posture a membershipd node runs with. It is
|
||||
@@ -143,6 +158,19 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// SetJetStream wires the privileged JetStream context (and the room-stream
|
||||
// replication factor) the server uses to ensure and read the durable streams of
|
||||
// persisted rooms. replicas below 1 is clamped to 1. It must be called once at
|
||||
// startup, before the server begins serving; leaving it unset keeps history empty
|
||||
// and stream-ensure a no-op, the behavior for a deployment without JetStream.
|
||||
func (s *Server) SetJetStream(js jetstream.JetStream, replicas int) {
|
||||
if replicas < 1 {
|
||||
replicas = 1
|
||||
}
|
||||
s.js = js
|
||||
s.streamReplicas = replicas
|
||||
}
|
||||
|
||||
// UseReplicatedNonces switches the server's anti-replay store from the
|
||||
// per-process in-memory cache to a JetStream KV bucket shared across the cluster
|
||||
// (issue 0003e). It MUST be called on every node of a multi-node deployment:
|
||||
@@ -403,6 +431,13 @@ func (s *Server) routes() {
|
||||
s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite)
|
||||
s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey)
|
||||
s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers)
|
||||
// Durable message history for a persisted room, read server-side from the room's
|
||||
// JetStream stream so a client without JetStream (the browser client uniweb) can
|
||||
// load the backlog over plain HTTP. Member-only, like /key and /members.
|
||||
// Registered without the /api prefix like every other control-plane route: Caddy
|
||||
// strips /api via handle_path /api/* before forwarding, so the SPA's
|
||||
// GET /api/rooms/{id}/history arrives here as GET /rooms/{id}/history.
|
||||
s.mux.HandleFunc("GET /rooms/{id}/history", s.handleRoomHistory)
|
||||
s.mux.HandleFunc("GET /members/{endpoint}/rooms", s.handleListMemberRooms)
|
||||
s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey)
|
||||
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
|
||||
@@ -632,6 +667,21 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
|
||||
SignMsgs: req.Policy.SignMsgs,
|
||||
OwnerEndpoint: req.Owner.Endpoint,
|
||||
}
|
||||
// Own the durable stream for a persisted room (issue room-history): ensure it
|
||||
// BEFORE the room row is written so the subject is captured from the very first
|
||||
// message whoever publishes it — a Go client OR a JetStream-less browser client.
|
||||
// Done first so a stream failure aborts cleanly with no orphan room row (the
|
||||
// rare orphan empty stream it can leave is harmless and idempotently reused).
|
||||
// Skipped when no JetStream is wired: the room still works, just without history.
|
||||
if info.Persist && s.js != nil {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout)
|
||||
err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas)
|
||||
cancel()
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user