22092834bd
Chat bots need replies, threads and reactions. Add two optional, omitempty envelope fields (ThreadID, ReplyTo) plus a REACT frame type. The fields ride the cleartext envelope (message-id references, not secret content) and are omitted when unset, so non-threaded frames are byte-for-byte identical on the wire and their signatures unchanged — a non-breaking, additive change. Client gains PublishReply (threaded reply) and React (emoji reaction). The reaction content travels in the payload, so it is sealed like any message and stays confidential in E2E rooms; receivers dispatch on Frame.Type == REACT and read Frame.ReplyTo for the target. Publish is refactored to share one publishFrame path with the new helpers; its behavior is unchanged. Tests: frame round-trip of a threaded REACT frame (golden), non-threaded wire/sig back-compat asserting thr/re keys are absent (edge), Unmarshal of garbage errors (error path), and an end-to-end reply+reaction round-trip in an encrypted ModeMatrix room. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
765 lines
24 KiB
Go
765 lines
24 KiB
Go
// Package client is the unibus client library: the single API that every peer
|
|
// (process worker, human chat UI, LLM agent) uses to talk to the bus.
|
|
//
|
|
// It hides the two planes behind one object:
|
|
// - control plane (HTTP to membershipd): create/join rooms, invite, fetch
|
|
// sealed keys, rekey on kick.
|
|
// - data plane (NATS): publish/subscribe/request/reply of frames.
|
|
//
|
|
// In encrypted rooms it transparently seals/opens payloads with the room key K,
|
|
// distributes K to invitees via sealed boxes, signs and verifies messages, and
|
|
// rotates K to a new epoch on kick (forward secrecy). All crypto comes from the
|
|
// fn-registry cybersecurity package; this library never reimplements primitives.
|
|
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
cs "fn-registry/functions/cybersecurity"
|
|
|
|
"github.com/enmanuel/unibus/pkg/frame"
|
|
"github.com/enmanuel/unibus/pkg/room"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
)
|
|
|
|
// Endpoint is the public identity of a peer.
|
|
type Endpoint struct {
|
|
ID string
|
|
SignPub []byte
|
|
KexPub []byte
|
|
}
|
|
|
|
// Client is a connected unibus peer.
|
|
type Client struct {
|
|
id cs.Identity
|
|
endpoint string
|
|
nc *nats.Conn
|
|
js jetstream.JetStream // durable plane for rooms with Policy.Persist
|
|
ctrlURL string
|
|
http *http.Client
|
|
|
|
mu sync.RWMutex
|
|
keyCache map[string]map[int][]byte // roomID -> epoch -> K
|
|
signCache map[string][]byte // sender endpoint -> sign pub (for verification)
|
|
}
|
|
|
|
// New connects to NATS and records the control-plane URL. The identity holds
|
|
// the peer's long-term keypairs.
|
|
func New(natsURL, ctrlURL string, id cs.Identity) (*Client, error) {
|
|
nc, err := nats.Connect(natsURL, nats.Name("unibus-client"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err)
|
|
}
|
|
// JetStream context for the durable plane. Obtaining it does not require any
|
|
// stream to exist yet and has no effect on cleartext/ephemeral rooms — those
|
|
// keep using core nc.Publish / nc.Subscribe untouched.
|
|
js, err := jetstream.New(nc)
|
|
if err != nil {
|
|
nc.Close()
|
|
return nil, fmt.Errorf("client: init jetstream: %w", err)
|
|
}
|
|
return &Client{
|
|
id: id,
|
|
endpoint: frame.EndpointID(id.SignPub),
|
|
nc: nc,
|
|
js: js,
|
|
ctrlURL: ctrlURL,
|
|
http: &http.Client{Timeout: 10 * time.Second},
|
|
keyCache: map[string]map[int][]byte{},
|
|
signCache: map[string][]byte{},
|
|
}, nil
|
|
}
|
|
|
|
// Endpoint returns this client's public identity.
|
|
func (c *Client) Endpoint() Endpoint {
|
|
return Endpoint{ID: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub}
|
|
}
|
|
|
|
// Close releases the NATS connection.
|
|
func (c *Client) Close() error {
|
|
c.nc.Close()
|
|
return nil
|
|
}
|
|
|
|
// ---- key cache ------------------------------------------------------------
|
|
|
|
func (c *Client) cacheKey(roomID string, epoch int, k []byte) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
m := c.keyCache[roomID]
|
|
if m == nil {
|
|
m = map[int][]byte{}
|
|
c.keyCache[roomID] = m
|
|
}
|
|
m[epoch] = k
|
|
}
|
|
|
|
func (c *Client) getCachedKey(roomID string, epoch int) ([]byte, bool) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
if m := c.keyCache[roomID]; m != nil {
|
|
k, ok := m[epoch]
|
|
return k, ok
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
// ---- control-plane HTTP helpers ------------------------------------------
|
|
|
|
func (c *Client) doJSON(method, path string, body, out any) error {
|
|
var rdr io.Reader
|
|
if body != nil {
|
|
b, err := json.Marshal(body)
|
|
if err != nil {
|
|
return fmt.Errorf("client: marshal request: %w", err)
|
|
}
|
|
rdr = bytes.NewReader(b)
|
|
}
|
|
req, err := http.NewRequest(method, c.ctrlURL+path, rdr)
|
|
if err != nil {
|
|
return fmt.Errorf("client: new request: %w", err)
|
|
}
|
|
if body != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
resp, err := c.http.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("client: do %s %s: %w", method, path, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
respBody, _ := io.ReadAll(resp.Body)
|
|
if resp.StatusCode >= 300 {
|
|
// Surface the server's structured {"error": "..."} message when present,
|
|
// instead of leaking the raw HTTP envelope (method, path, status, JSON body).
|
|
var er struct {
|
|
Error string `json:"error"`
|
|
}
|
|
if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
|
|
return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
|
|
}
|
|
return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
|
|
}
|
|
if out != nil {
|
|
if err := json.Unmarshal(respBody, out); err != nil {
|
|
return fmt.Errorf("client: decode response: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// signRequest signs the canonical bytes of req (req must already have its Sig
|
|
// field cleared) with the client's Ed25519 key. It is symmetric with the
|
|
// server's verifyOwnerSig.
|
|
func (c *Client) signRequest(req any) []byte {
|
|
b, _ := json.Marshal(req)
|
|
return cs.SignEd25519(c.id.SignPriv, b)
|
|
}
|
|
|
|
// ---- mirror of server wire types (control plane) -------------------------
|
|
|
|
type policyJSON struct {
|
|
Encrypt bool `json:"encrypt"`
|
|
Persist bool `json:"persist"`
|
|
SignMsgs bool `json:"sign_msgs"`
|
|
}
|
|
|
|
type endpointJSON struct {
|
|
Endpoint string `json:"endpoint"`
|
|
SignPub []byte `json:"sign_pub"`
|
|
KexPub []byte `json:"kex_pub"`
|
|
}
|
|
|
|
type createRoomReq struct {
|
|
Subject string `json:"subject"`
|
|
Policy policyJSON `json:"policy"`
|
|
Owner endpointJSON `json:"owner"`
|
|
SealedKeySelf []byte `json:"sealed_key_self"`
|
|
}
|
|
|
|
type createRoomResp struct {
|
|
RoomID string `json:"room_id"`
|
|
}
|
|
|
|
type inviteReq struct {
|
|
By string `json:"by"`
|
|
Sig []byte `json:"sig"`
|
|
Member endpointJSON `json:"member"`
|
|
SealedKey []byte `json:"sealed_key"`
|
|
}
|
|
|
|
type keyResp struct {
|
|
Epoch int `json:"epoch"`
|
|
SealedKey []byte `json:"sealed_key"`
|
|
}
|
|
|
|
type memberJSON struct {
|
|
Endpoint string `json:"endpoint"`
|
|
Role string `json:"role"`
|
|
SignPub []byte `json:"sign_pub"`
|
|
KexPub []byte `json:"kex_pub"`
|
|
}
|
|
|
|
type roomResp struct {
|
|
Subject string `json:"subject"`
|
|
Epoch int `json:"epoch"`
|
|
Policy policyJSON `json:"policy"`
|
|
}
|
|
|
|
type rekeyKey struct {
|
|
Endpoint string `json:"endpoint"`
|
|
SealedKey []byte `json:"sealed_key"`
|
|
}
|
|
|
|
type rekeyReq struct {
|
|
By string `json:"by"`
|
|
Sig []byte `json:"sig"`
|
|
NewEpoch int `json:"new_epoch"`
|
|
Keys []rekeyKey `json:"keys"`
|
|
Remove []string `json:"remove"`
|
|
}
|
|
|
|
type blobResp struct {
|
|
Hash string `json:"hash"`
|
|
}
|
|
|
|
// ---- room operations ------------------------------------------------------
|
|
|
|
// newRoomKey returns 32 random bytes for a symmetric room key.
|
|
func newRoomKey() ([]byte, error) {
|
|
k := make([]byte, 32)
|
|
if _, err := rand.Read(k); err != nil {
|
|
return nil, fmt.Errorf("client: generate room key: %w", err)
|
|
}
|
|
return k, nil
|
|
}
|
|
|
|
// CreateRoom creates a room with the given subject and policy. For encrypted
|
|
// rooms it generates K, seals K to itself, and caches K at epoch 1.
|
|
func (c *Client) CreateRoom(subject string, p room.Policy) (string, error) {
|
|
req := createRoomReq{
|
|
Subject: subject,
|
|
Policy: policyJSON{Encrypt: p.Encrypt, Persist: p.Persist, SignMsgs: p.SignMsgs},
|
|
Owner: endpointJSON{Endpoint: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub},
|
|
}
|
|
var k []byte
|
|
if p.Encrypt {
|
|
var err error
|
|
if k, err = newRoomKey(); err != nil {
|
|
return "", err
|
|
}
|
|
sealed, err := cs.SealKeyBox(c.id.KexPub, k)
|
|
if err != nil {
|
|
return "", fmt.Errorf("client: seal own key: %w", err)
|
|
}
|
|
req.SealedKeySelf = sealed
|
|
}
|
|
var resp createRoomResp
|
|
if err := c.doJSON("POST", "/rooms", req, &resp); err != nil {
|
|
return "", err
|
|
}
|
|
if p.Encrypt {
|
|
c.cacheKey(resp.RoomID, 1, k)
|
|
}
|
|
// For persisted rooms, provision the durable JetStream stream up front so the
|
|
// first publish (even before any subscriber exists) is captured for history.
|
|
if p.Persist {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := c.ensureStream(ctx, resp.RoomID, subject); err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
return resp.RoomID, nil
|
|
}
|
|
|
|
// Invite adds a member to a room. It seals the current-epoch room key to the
|
|
// invitee's X25519 public key and signs the request as the owner.
|
|
func (c *Client) Invite(roomID string, m Endpoint) error {
|
|
info, err := c.fetchRoom(roomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var sealed []byte
|
|
if info.Policy.Encrypt {
|
|
k, ok := c.getCachedKey(roomID, info.Epoch)
|
|
if !ok {
|
|
return fmt.Errorf("client: invite: no cached key for room %s epoch %d", roomID, info.Epoch)
|
|
}
|
|
if sealed, err = cs.SealKeyBox(m.KexPub, k); err != nil {
|
|
return fmt.Errorf("client: seal key for invitee: %w", err)
|
|
}
|
|
}
|
|
req := inviteReq{
|
|
By: c.endpoint,
|
|
Member: endpointJSON{Endpoint: m.ID, SignPub: m.SignPub, KexPub: m.KexPub},
|
|
SealedKey: sealed,
|
|
}
|
|
req.Sig = c.signRequest(req) // Sig is zero-valued at sign time
|
|
return c.doJSON("POST", "/rooms/"+roomID+"/invite", req, nil)
|
|
}
|
|
|
|
// roomView is the resolved room metadata.
|
|
type roomView struct {
|
|
Subject string
|
|
Epoch int
|
|
Policy room.Policy
|
|
}
|
|
|
|
func (c *Client) fetchRoom(roomID string) (roomView, error) {
|
|
var resp roomResp
|
|
if err := c.doJSON("GET", "/rooms/"+roomID, nil, &resp); err != nil {
|
|
return roomView{}, err
|
|
}
|
|
return roomView{
|
|
Subject: resp.Subject,
|
|
Epoch: resp.Epoch,
|
|
Policy: room.Policy{Encrypt: resp.Policy.Encrypt, Persist: resp.Policy.Persist, SignMsgs: resp.Policy.SignMsgs},
|
|
}, nil
|
|
}
|
|
|
|
// fetchKey retrieves and caches the room key K for the given epoch (epoch <= 0
|
|
// means latest). It opens the sealed box with the client's own X25519 keypair.
|
|
func (c *Client) fetchKey(roomID string, epoch int) ([]byte, int, error) {
|
|
if epoch > 0 {
|
|
if k, ok := c.getCachedKey(roomID, epoch); ok {
|
|
return k, epoch, nil
|
|
}
|
|
}
|
|
path := fmt.Sprintf("/rooms/%s/key?endpoint=%s", roomID, c.endpoint)
|
|
if epoch > 0 {
|
|
path += fmt.Sprintf("&epoch=%d", epoch)
|
|
}
|
|
var resp keyResp
|
|
if err := c.doJSON("GET", path, nil, &resp); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
k, err := cs.OpenKeyBox(c.id.KexPub, c.id.KexPriv, resp.SealedKey)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("client: open room key: %w", err)
|
|
}
|
|
c.cacheKey(roomID, resp.Epoch, k)
|
|
return k, resp.Epoch, nil
|
|
}
|
|
|
|
// Join resolves room metadata and, for encrypted rooms, fetches and caches the
|
|
// current room key. It does not subscribe to NATS (use Subscribe for that).
|
|
func (c *Client) Join(roomID string) error {
|
|
info, err := c.fetchRoom(roomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if info.Policy.Encrypt {
|
|
if _, _, err := c.fetchKey(roomID, info.Epoch); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// signerPub returns the sign public key of a sender endpoint, fetching the
|
|
// member list once and caching it.
|
|
func (c *Client) signerPub(roomID, sender string) ([]byte, error) {
|
|
c.mu.RLock()
|
|
pub, ok := c.signCache[sender]
|
|
c.mu.RUnlock()
|
|
if ok {
|
|
return pub, nil
|
|
}
|
|
var members []memberJSON
|
|
if err := c.doJSON("GET", "/rooms/"+roomID+"/members", nil, &members); err != nil {
|
|
return nil, err
|
|
}
|
|
c.mu.Lock()
|
|
for _, m := range members {
|
|
c.signCache[m.Endpoint] = m.SignPub
|
|
}
|
|
pub, ok = c.signCache[sender]
|
|
c.mu.Unlock()
|
|
if !ok {
|
|
return nil, fmt.Errorf("client: no sign key for sender %q", sender)
|
|
}
|
|
return pub, nil
|
|
}
|
|
|
|
// ---- data plane: publish/subscribe ---------------------------------------
|
|
|
|
// threadMeta carries the optional threading/reaction routing of a published
|
|
// frame. The zero value yields a plain top-level message whose wire bytes are
|
|
// identical to a pre-threading frame (the fields are omitempty).
|
|
type threadMeta struct {
|
|
threadID string // thread root message id
|
|
replyTo string // message id being replied to / reacted to
|
|
}
|
|
|
|
// publishFrame is the single publish path shared by Publish, PublishReply and
|
|
// React. It builds the envelope, seals+signs per the room policy, and routes
|
|
// through JetStream (persisted rooms) or core NATS (ephemeral rooms). The only
|
|
// thing the callers vary is the frame type and the threading metadata.
|
|
func (c *Client) publishFrame(roomID string, ftype frame.FrameType, plaintext []byte, tm threadMeta) error {
|
|
info, err := c.fetchRoom(roomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
f := frame.Frame{
|
|
Type: ftype,
|
|
Subject: info.Subject,
|
|
Sender: c.endpoint,
|
|
MsgID: newULID(),
|
|
Epoch: info.Epoch,
|
|
ThreadID: tm.threadID,
|
|
ReplyTo: tm.replyTo,
|
|
}
|
|
if info.Policy.Encrypt {
|
|
k, ep, err := c.fetchKey(roomID, info.Epoch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
nonce, ct, err := cs.SealAEAD(k, plaintext, []byte(info.Subject))
|
|
if err != nil {
|
|
return fmt.Errorf("client: seal payload: %w", err)
|
|
}
|
|
f.Epoch, f.Nonce, f.Payload = ep, nonce, ct
|
|
} else {
|
|
f.Payload = plaintext
|
|
}
|
|
if info.Policy.SignMsgs {
|
|
f.Sig = cs.SignEd25519(c.id.SignPriv, f.SigningBytes())
|
|
}
|
|
b, err := f.Marshal()
|
|
if err != nil {
|
|
return fmt.Errorf("client: marshal frame: %w", err)
|
|
}
|
|
// Persisted rooms go through JetStream (durable, acked); ephemeral rooms keep
|
|
// the exact core-NATS publish they had before.
|
|
if info.Policy.Persist {
|
|
return c.publishPersistent(roomID, info.Subject, b)
|
|
}
|
|
return c.nc.Publish(info.Subject, b)
|
|
}
|
|
|
|
// Publish sends plaintext to a room. For encrypted rooms it seals the payload
|
|
// with the current K using the subject as AEAD additional-authenticated-data;
|
|
// for signed rooms it attaches an Ed25519 signature.
|
|
func (c *Client) Publish(roomID string, plaintext []byte) error {
|
|
return c.publishFrame(roomID, frame.PUB, plaintext, threadMeta{})
|
|
}
|
|
|
|
// PublishReply sends plaintext as a reply inside a thread. replyTo is the id of
|
|
// the message being replied to; threadID is the thread root — pass replyTo when
|
|
// you are starting a new thread off a top-level message, or the existing
|
|
// ThreadID to keep replying within one. Encryption and signing are identical to
|
|
// Publish; the threading metadata rides the cleartext envelope. Receivers read
|
|
// Frame.ReplyTo / Frame.ThreadID to render the conversation tree.
|
|
func (c *Client) PublishReply(roomID string, plaintext []byte, replyTo, threadID string) error {
|
|
return c.publishFrame(roomID, frame.PUB, plaintext, threadMeta{threadID: threadID, replyTo: replyTo})
|
|
}
|
|
|
|
// React publishes a reaction (emoji/shortcode) to a target message. The reaction
|
|
// content travels in the payload, so it is sealed exactly like a normal message
|
|
// and stays confidential in E2E rooms. Receivers dispatch on Frame.Type ==
|
|
// frame.REACT and read Frame.ReplyTo for the message being reacted to.
|
|
func (c *Client) React(roomID, targetMsgID, emoji string) error {
|
|
return c.publishFrame(roomID, frame.REACT, []byte(emoji), threadMeta{replyTo: targetMsgID})
|
|
}
|
|
|
|
// Sub is a transport-agnostic handle to an active room subscription. It wraps
|
|
// either a core NATS subscription (ephemeral rooms) or a JetStream durable
|
|
// consumer (persisted rooms) behind a single Unsubscribe() method, so callers
|
|
// (and tests) treat both planes uniformly. For a persisted room, Unsubscribe
|
|
// stops local delivery but leaves the durable consumer's ack position on the
|
|
// server, so a later Subscribe with the same peer resumes (offline replay).
|
|
type Sub struct {
|
|
nsub *nats.Subscription
|
|
cc jetstream.ConsumeContext
|
|
}
|
|
|
|
// Unsubscribe stops delivery for this subscription. The error return keeps the
|
|
// signature compatible with *nats.Subscription's Unsubscribe.
|
|
func (s *Sub) Unsubscribe() error {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
if s.nsub != nil {
|
|
return s.nsub.Unsubscribe()
|
|
}
|
|
if s.cc != nil {
|
|
s.cc.Stop()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Subscribe subscribes to a room and invokes handler for each message with the
|
|
// decoded frame and (for encrypted rooms) the decrypted plaintext. Signature
|
|
// verification and epoch-driven key refresh happen transparently. Messages that
|
|
// fail verification or decryption are dropped (handler not called).
|
|
//
|
|
// For ephemeral rooms (Policy.Persist == false) this is a plain core-NATS
|
|
// subscription, identical to before. For persisted rooms it binds a per-peer
|
|
// durable JetStream consumer with DeliverAll, so a late joiner receives the
|
|
// full history (scrollback) and a reconnecting peer resumes from its last ack
|
|
// (offline replay). The frame-decode / verify / decrypt logic is shared between
|
|
// both planes via processFrame.
|
|
func (c *Client) Subscribe(roomID string, handler func(f frame.Frame, plaintext []byte)) (*Sub, error) {
|
|
info, err := c.fetchRoom(roomID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
deliver := func(data []byte) {
|
|
c.processFrame(roomID, info, data, handler)
|
|
}
|
|
if info.Policy.Persist {
|
|
cc, err := c.subscribePersistent(roomID, info.Subject, deliver)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Sub{cc: cc}, nil
|
|
}
|
|
nsub, err := c.nc.Subscribe(info.Subject, func(msg *nats.Msg) {
|
|
deliver(msg.Data)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Sub{nsub: nsub}, nil
|
|
}
|
|
|
|
// processFrame decodes one wire message, verifies its signature and (for
|
|
// encrypted rooms) decrypts its inline payload, then invokes handler. Messages
|
|
// that fail verification or decryption are dropped (handler not called). This
|
|
// is the single code path shared by the ephemeral and persisted subscribe
|
|
// planes so their decode/verify/decrypt semantics never drift apart.
|
|
func (c *Client) processFrame(roomID string, info roomView, data []byte, handler func(f frame.Frame, plaintext []byte)) {
|
|
f, err := frame.Unmarshal(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if info.Policy.SignMsgs && f.Sig != nil {
|
|
pub, err := c.signerPub(roomID, f.Sender)
|
|
if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
|
|
return // unauthenticated frame: drop
|
|
}
|
|
}
|
|
plaintext := f.Payload
|
|
// Decrypt only inline payloads. Media frames carry their bytes in the
|
|
// blob store (referenced by f.Blob) with the nonce in BlobRef.Nonce;
|
|
// the handler decrypts those on demand via FetchMedia. A frame with an
|
|
// inline ciphertext always has a non-empty Nonce.
|
|
if info.Policy.Encrypt && len(f.Nonce) > 0 && len(f.Payload) > 0 {
|
|
k, ok := c.getCachedKey(roomID, f.Epoch)
|
|
if !ok {
|
|
// Sender used a newer (or unknown) epoch: refresh K from the control plane.
|
|
k, _, err = c.fetchKey(roomID, f.Epoch)
|
|
if err != nil {
|
|
// Cannot obtain the key for this epoch. For persisted history this is
|
|
// expected and NOT fatal: a member invited at a later epoch reading
|
|
// older history (or a kicked peer) simply cannot read those frames.
|
|
// Skip this message and keep processing the rest (megolm semantics:
|
|
// new members do not read prior history).
|
|
return
|
|
}
|
|
}
|
|
pt, err := cs.OpenAEAD(k, f.Nonce, f.Payload, []byte(info.Subject))
|
|
if err != nil {
|
|
return // cannot decrypt (wrong epoch/kicked): skip this frame, continue
|
|
}
|
|
plaintext = pt
|
|
}
|
|
handler(f, plaintext)
|
|
}
|
|
|
|
// ---- request/reply (cleartext v1) ----------------------------------------
|
|
|
|
// Request performs a NATS request/reply on subject (cleartext in v1, intended
|
|
// for rpc.* subjects).
|
|
func (c *Client) Request(subject string, plaintext []byte, timeout time.Duration) ([]byte, error) {
|
|
msg, err := c.nc.Request(subject, plaintext, timeout)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("client: request %q: %w", subject, err)
|
|
}
|
|
return msg.Data, nil
|
|
}
|
|
|
|
// Reply registers a responder for subject; handler receives the request bytes
|
|
// and returns the reply bytes (cleartext in v1).
|
|
func (c *Client) Reply(subject string, handler func([]byte) []byte) (*nats.Subscription, error) {
|
|
return c.nc.Subscribe(subject, func(msg *nats.Msg) {
|
|
if msg.Reply == "" {
|
|
return
|
|
}
|
|
_ = c.nc.Publish(msg.Reply, handler(msg.Data))
|
|
})
|
|
}
|
|
|
|
// ---- kick / forward secrecy ----------------------------------------------
|
|
|
|
// Kick removes a member and rotates the room key to a new epoch, re-sealing it
|
|
// only for the remaining members. The kicked member can no longer decrypt
|
|
// messages published after the rotation (forward secrecy).
|
|
func (c *Client) Kick(roomID string, endpoint string) error {
|
|
info, err := c.fetchRoom(roomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newEpoch := info.Epoch + 1
|
|
|
|
if !info.Policy.Encrypt {
|
|
// Unencrypted room: just remove the member, no key rotation needed.
|
|
req := rekeyReq{By: c.endpoint, NewEpoch: newEpoch, Remove: []string{endpoint}}
|
|
req.Sig = c.signRequest(req)
|
|
return c.doJSON("POST", "/rooms/"+roomID+"/rekey", req, nil)
|
|
}
|
|
|
|
kPrime, err := newRoomKey()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var members []memberJSON
|
|
if err := c.doJSON("GET", "/rooms/"+roomID+"/members", nil, &members); err != nil {
|
|
return err
|
|
}
|
|
var keys []rekeyKey
|
|
for _, m := range members {
|
|
if m.Endpoint == endpoint {
|
|
continue // exclude the kicked member
|
|
}
|
|
sealed, err := cs.SealKeyBox(m.KexPub, kPrime)
|
|
if err != nil {
|
|
return fmt.Errorf("client: seal new key for %q: %w", m.Endpoint, err)
|
|
}
|
|
keys = append(keys, rekeyKey{Endpoint: m.Endpoint, SealedKey: sealed})
|
|
}
|
|
req := rekeyReq{By: c.endpoint, NewEpoch: newEpoch, Keys: keys, Remove: []string{endpoint}}
|
|
req.Sig = c.signRequest(req)
|
|
if err := c.doJSON("POST", "/rooms/"+roomID+"/rekey", req, nil); err != nil {
|
|
return err
|
|
}
|
|
c.cacheKey(roomID, newEpoch, kPrime)
|
|
return nil
|
|
}
|
|
|
|
// ---- media (object store) -------------------------------------------------
|
|
|
|
// PublishMedia encrypts data with the room key, uploads the ciphertext to the
|
|
// blob store, and publishes a frame carrying only a BlobRef. Receivers whose
|
|
// handler sees f.Blob != nil should GET /blobs/{hash} and OpenAEAD it.
|
|
func (c *Client) PublishMedia(roomID string, data []byte) error {
|
|
info, err := c.fetchRoom(roomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
f := frame.Frame{
|
|
Type: frame.PUB,
|
|
Subject: info.Subject,
|
|
Sender: c.endpoint,
|
|
MsgID: newULID(),
|
|
Epoch: info.Epoch,
|
|
}
|
|
|
|
var ciphertext []byte
|
|
var nonce []byte
|
|
if info.Policy.Encrypt {
|
|
k, ep, err := c.fetchKey(roomID, info.Epoch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
nonce, ciphertext, err = cs.SealAEAD(k, data, []byte(info.Subject))
|
|
if err != nil {
|
|
return fmt.Errorf("client: seal media: %w", err)
|
|
}
|
|
f.Epoch = ep
|
|
} else {
|
|
ciphertext = data
|
|
}
|
|
|
|
hash, err := c.putBlob(ciphertext)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
f.Blob = &frame.BlobRef{Hash: hash, Nonce: nonce, Size: int64(len(ciphertext))}
|
|
|
|
if info.Policy.SignMsgs {
|
|
f.Sig = cs.SignEd25519(c.id.SignPriv, f.SigningBytes())
|
|
}
|
|
b, err := f.Marshal()
|
|
if err != nil {
|
|
return fmt.Errorf("client: marshal media frame: %w", err)
|
|
}
|
|
// Persisted rooms route the media frame (the BlobRef envelope, not the blob
|
|
// bytes) through JetStream so it appears in history/replay like any other
|
|
// frame; ephemeral rooms keep the original core-NATS publish.
|
|
if info.Policy.Persist {
|
|
return c.publishPersistent(roomID, info.Subject, b)
|
|
}
|
|
return c.nc.Publish(info.Subject, b)
|
|
}
|
|
|
|
// FetchMedia downloads and (for encrypted rooms) decrypts a blob referenced by
|
|
// a received frame. It is a convenience for handlers that see f.Blob != nil.
|
|
func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) {
|
|
if f.Blob == nil {
|
|
return nil, fmt.Errorf("client: frame has no blob ref")
|
|
}
|
|
ct, err := c.getBlob(f.Blob.Hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
info, err := c.fetchRoom(roomID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !info.Policy.Encrypt {
|
|
return ct, nil
|
|
}
|
|
k, ok := c.getCachedKey(roomID, f.Epoch)
|
|
if !ok {
|
|
if k, _, err = c.fetchKey(roomID, f.Epoch); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return cs.OpenAEAD(k, f.Blob.Nonce, ct, []byte(info.Subject))
|
|
}
|
|
|
|
func (c *Client) putBlob(ciphertext []byte) (string, error) {
|
|
req, err := http.NewRequest("POST", c.ctrlURL+"/blobs", bytes.NewReader(ciphertext))
|
|
if err != nil {
|
|
return "", fmt.Errorf("client: new blob request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/octet-stream")
|
|
resp, err := c.http.Do(req)
|
|
if err != nil {
|
|
return "", fmt.Errorf("client: put blob: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(resp.Body)
|
|
if resp.StatusCode >= 300 {
|
|
return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
var r blobResp
|
|
if err := json.Unmarshal(body, &r); err != nil {
|
|
return "", fmt.Errorf("client: decode blob resp: %w", err)
|
|
}
|
|
return r.Hash, nil
|
|
}
|
|
|
|
func (c *Client) getBlob(hash string) ([]byte, error) {
|
|
resp, err := c.http.Get(c.ctrlURL + "/blobs/" + hash)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("client: get blob: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
return io.ReadAll(resp.Body)
|
|
}
|