Files

135 lines
5.1 KiB
Go

// This file adds optional per-room durable persistence on top of the core
// (ephemeral) NATS data plane. Persistence is gated entirely by the per-room
// RoomPolicy.Persist flag:
//
// - Persist == false (room.ModeNATS): nothing here runs. Publish/Subscribe
// use plain core NATS exactly as before — fast, fan-out, no history.
// - Persist == true (room.ModeMatrix): the room's subject is captured by a
// durable JetStream stream. Late joiners receive the full backlog
// (scrollback) and a peer that was offline resumes from its last ack
// (replay). The Frame, the encryption and the signature are identical to
// the ephemeral path; only the transport changes.
//
// We use the modern github.com/nats-io/nats.go/jetstream API throughout (not
// the legacy nc.JetStream() context), and we are consistent about it across
// the whole client.
package client
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/nats-io/nats.go/jetstream"
)
// streamName derives a JetStream stream name from a room id. JetStream stream
// (and durable consumer) names cannot contain '.', '*', '>', path separators
// or whitespace, so we sanitize any char outside [A-Za-z0-9_] to '_' and prefix
// with "UNIBUS_" to namespace the bus's streams.
func streamName(roomID string) string {
return "UNIBUS_" + sanitizeJSName(roomID)
}
// sanitizeJSName replaces every rune outside [A-Za-z0-9_] with '_' so the
// result is a legal JetStream stream/consumer name token.
func sanitizeJSName(s string) string {
var b strings.Builder
b.Grow(len(s))
for _, r := range s {
switch {
case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
b.WriteRune(r)
default:
b.WriteRune('_')
}
}
return b.String()
}
// durableName derives a per-peer durable consumer name for a room. The same
// (roomID, endpointID) pair always maps to the same durable, so a peer that
// reconnects continues from its last acked message → replay of what it missed.
func durableName(roomID, endpointID string) string {
return sanitizeJSName(roomID) + "_" + sanitizeJSName(endpointID)
}
// ensureStream idempotently creates (or no-ops on) a file-backed JetStream
// stream that captures the given subject for a persisted room. It is safe to
// call repeatedly: CreateOrUpdateStream returns the existing stream unchanged
// when the config matches. Called from CreateRoom(Persist), and defensively
// from the first Publish/Subscribe to a persisted room.
func (c *Client) ensureStream(ctx context.Context, roomID, subject string) error {
if c.js == nil {
return errors.New("client: JetStream not initialized")
}
cfg := jetstream.StreamConfig{
Name: streamName(roomID),
Subjects: []string{subject},
Retention: jetstream.LimitsPolicy,
Storage: jetstream.FileStorage,
}
if _, err := c.js.CreateOrUpdateStream(ctx, cfg); err != nil {
return fmt.Errorf("client: ensure stream for room %s: %w", roomID, err)
}
return nil
}
// publishPersistent publishes a frame to a persisted room via JetStream and
// waits for the server ack, guaranteeing the message is durably stored before
// returning. The stream is ensured first so the very first publish to a freshly
// created room (or after a restart) cannot be lost for lack of a stream.
func (c *Client) publishPersistent(roomID, subject string, frameBytes []byte) error {
if c.js == nil {
return errors.New("client: JetStream not initialized")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := c.ensureStream(ctx, roomID, subject); err != nil {
return err
}
if _, err := c.js.Publish(ctx, subject, frameBytes); err != nil {
return fmt.Errorf("client: jetstream publish to room %s: %w", roomID, err)
}
return nil
}
// subscribePersistent binds a per-peer durable consumer to the room's stream
// with DeliverAll + explicit ack. DeliverAll delivers the full backlog
// (scrollback) on first bind and then live messages; the durable name makes a
// reconnecting peer resume from its last ack (offline replay). Each delivered
// message is acked only after the supplied onMsg callback has processed it.
func (c *Client) subscribePersistent(roomID, subject string, onMsg func(data []byte)) (jetstream.ConsumeContext, error) {
if c.js == nil {
return nil, errors.New("client: JetStream not initialized")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := c.ensureStream(ctx, roomID, subject); err != nil {
return nil, err
}
stream, err := c.js.Stream(ctx, streamName(roomID))
if err != nil {
return nil, fmt.Errorf("client: lookup stream for room %s: %w", roomID, err)
}
cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: durableName(roomID, c.endpoint),
DeliverPolicy: jetstream.DeliverAllPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
return nil, fmt.Errorf("client: ensure durable consumer for room %s: %w", roomID, err)
}
cc, err := cons.Consume(func(msg jetstream.Msg) {
onMsg(msg.Data())
_ = msg.Ack()
})
if err != nil {
return nil, fmt.Errorf("client: consume room %s: %w", roomID, err)
}
return cc, nil
}