feat: optional per-room JetStream persistence (history + offline replay), gated by RoomPolicy.Persist

This commit is contained in:
agent
2026-06-03 21:48:55 +02:00
parent bb2e412744
commit 8c680bc002
8 changed files with 578 additions and 45 deletions
+124 -31
View File
@@ -14,6 +14,7 @@ package client
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"fmt"
@@ -27,6 +28,7 @@ import (
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// Endpoint is the public identity of a peer.
@@ -41,6 +43,7 @@ type Client struct {
id cs.Identity
endpoint string
nc *nats.Conn
js jetstream.JetStream // durable plane for rooms with Policy.Persist
ctrlURL string
http *http.Client
@@ -56,10 +59,19 @@ func New(natsURL, ctrlURL string, id cs.Identity) (*Client, error) {
if err != nil {
return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err)
}
// JetStream context for the durable plane. Obtaining it does not require any
// stream to exist yet and has no effect on cleartext/ephemeral rooms — those
// keep using core nc.Publish / nc.Subscribe untouched.
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return nil, fmt.Errorf("client: init jetstream: %w", err)
}
return &Client{
id: id,
endpoint: frame.EndpointID(id.SignPub),
nc: nc,
js: js,
ctrlURL: ctrlURL,
http: &http.Client{Timeout: 10 * time.Second},
keyCache: map[string]map[int][]byte{},
@@ -257,6 +269,15 @@ func (c *Client) CreateRoom(subject string, p room.Policy) (string, error) {
if p.Encrypt {
c.cacheKey(resp.RoomID, 1, k)
}
// For persisted rooms, provision the durable JetStream stream up front so the
// first publish (even before any subscriber exists) is captured for history.
if p.Persist {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := c.ensureStream(ctx, resp.RoomID, subject); err != nil {
return "", err
}
}
return resp.RoomID, nil
}
@@ -406,51 +427,117 @@ func (c *Client) Publish(roomID string, plaintext []byte) error {
if err != nil {
return fmt.Errorf("client: marshal frame: %w", err)
}
// Persisted rooms go through JetStream (durable, acked); ephemeral rooms keep
// the exact core-NATS publish they had before.
if info.Policy.Persist {
return c.publishPersistent(roomID, info.Subject, b)
}
return c.nc.Publish(info.Subject, b)
}
// Sub is a transport-agnostic handle to an active room subscription. It wraps
// either a core NATS subscription (ephemeral rooms) or a JetStream durable
// consumer (persisted rooms) behind a single Unsubscribe() method, so callers
// (and tests) treat both planes uniformly. For a persisted room, Unsubscribe
// stops local delivery but leaves the durable consumer's ack position on the
// server, so a later Subscribe with the same peer resumes (offline replay).
type Sub struct {
nsub *nats.Subscription
cc jetstream.ConsumeContext
}
// Unsubscribe stops delivery for this subscription. The error return keeps the
// signature compatible with *nats.Subscription's Unsubscribe.
func (s *Sub) Unsubscribe() error {
if s == nil {
return nil
}
if s.nsub != nil {
return s.nsub.Unsubscribe()
}
if s.cc != nil {
s.cc.Stop()
}
return nil
}
// Subscribe subscribes to a room and invokes handler for each message with the
// decoded frame and (for encrypted rooms) the decrypted plaintext. Signature
// verification and epoch-driven key refresh happen transparently. Messages that
// fail verification or decryption are dropped (handler not called).
func (c *Client) Subscribe(roomID string, handler func(f frame.Frame, plaintext []byte)) (*nats.Subscription, error) {
//
// For ephemeral rooms (Policy.Persist == false) this is a plain core-NATS
// subscription, identical to before. For persisted rooms it binds a per-peer
// durable JetStream consumer with DeliverAll, so a late joiner receives the
// full history (scrollback) and a reconnecting peer resumes from its last ack
// (offline replay). The frame-decode / verify / decrypt logic is shared between
// both planes via processFrame.
func (c *Client) Subscribe(roomID string, handler func(f frame.Frame, plaintext []byte)) (*Sub, error) {
info, err := c.fetchRoom(roomID)
if err != nil {
return nil, err
}
return c.nc.Subscribe(info.Subject, func(msg *nats.Msg) {
f, err := frame.Unmarshal(msg.Data)
deliver := func(data []byte) {
c.processFrame(roomID, info, data, handler)
}
if info.Policy.Persist {
cc, err := c.subscribePersistent(roomID, info.Subject, deliver)
if err != nil {
return
return nil, err
}
if info.Policy.SignMsgs && f.Sig != nil {
pub, err := c.signerPub(roomID, f.Sender)
if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
return // unauthenticated frame: drop
}
}
plaintext := f.Payload
// Decrypt only inline payloads. Media frames carry their bytes in the
// blob store (referenced by f.Blob) with the nonce in BlobRef.Nonce;
// the handler decrypts those on demand via FetchMedia. A frame with an
// inline ciphertext always has a non-empty Nonce.
if info.Policy.Encrypt && len(f.Nonce) > 0 && len(f.Payload) > 0 {
k, ok := c.getCachedKey(roomID, f.Epoch)
if !ok {
// Sender used a newer (or unknown) epoch: refresh K from the control plane.
k, _, err = c.fetchKey(roomID, f.Epoch)
if err != nil {
return // cannot obtain key for this epoch (e.g. we were kicked): drop
}
}
pt, err := cs.OpenAEAD(k, f.Nonce, f.Payload, []byte(info.Subject))
if err != nil {
return // cannot decrypt (wrong epoch/kicked): drop
}
plaintext = pt
}
handler(f, plaintext)
return &Sub{cc: cc}, nil
}
nsub, err := c.nc.Subscribe(info.Subject, func(msg *nats.Msg) {
deliver(msg.Data)
})
if err != nil {
return nil, err
}
return &Sub{nsub: nsub}, nil
}
// processFrame decodes one wire message, verifies its signature and (for
// encrypted rooms) decrypts its inline payload, then invokes handler. Messages
// that fail verification or decryption are dropped (handler not called). This
// is the single code path shared by the ephemeral and persisted subscribe
// planes so their decode/verify/decrypt semantics never drift apart.
func (c *Client) processFrame(roomID string, info roomView, data []byte, handler func(f frame.Frame, plaintext []byte)) {
f, err := frame.Unmarshal(data)
if err != nil {
return
}
if info.Policy.SignMsgs && f.Sig != nil {
pub, err := c.signerPub(roomID, f.Sender)
if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
return // unauthenticated frame: drop
}
}
plaintext := f.Payload
// Decrypt only inline payloads. Media frames carry their bytes in the
// blob store (referenced by f.Blob) with the nonce in BlobRef.Nonce;
// the handler decrypts those on demand via FetchMedia. A frame with an
// inline ciphertext always has a non-empty Nonce.
if info.Policy.Encrypt && len(f.Nonce) > 0 && len(f.Payload) > 0 {
k, ok := c.getCachedKey(roomID, f.Epoch)
if !ok {
// Sender used a newer (or unknown) epoch: refresh K from the control plane.
k, _, err = c.fetchKey(roomID, f.Epoch)
if err != nil {
// Cannot obtain the key for this epoch. For persisted history this is
// expected and NOT fatal: a member invited at a later epoch reading
// older history (or a kicked peer) simply cannot read those frames.
// Skip this message and keep processing the rest (megolm semantics:
// new members do not read prior history).
return
}
}
pt, err := cs.OpenAEAD(k, f.Nonce, f.Payload, []byte(info.Subject))
if err != nil {
return // cannot decrypt (wrong epoch/kicked): skip this frame, continue
}
plaintext = pt
}
handler(f, plaintext)
}
// ---- request/reply (cleartext v1) ----------------------------------------
@@ -570,6 +657,12 @@ func (c *Client) PublishMedia(roomID string, data []byte) error {
if err != nil {
return fmt.Errorf("client: marshal media frame: %w", err)
}
// Persisted rooms route the media frame (the BlobRef envelope, not the blob
// bytes) through JetStream so it appears in history/replay like any other
// frame; ephemeral rooms keep the original core-NATS publish.
if info.Policy.Persist {
return c.publishPersistent(roomID, info.Subject, b)
}
return c.nc.Publish(info.Subject, b)
}