Files

729 lines
22 KiB
Go

// Package client is the unibus client library: the single API that every peer
// (process worker, human chat UI, LLM agent) uses to talk to the bus.
//
// It hides the two planes behind one object:
// - control plane (HTTP to membershipd): create/join rooms, invite, fetch
// sealed keys, rekey on kick.
// - data plane (NATS): publish/subscribe/request/reply of frames.
//
// In encrypted rooms it transparently seals/opens payloads with the room key K,
// distributes K to invitees via sealed boxes, signs and verifies messages, and
// rotates K to a new epoch on kick (forward secrecy). All crypto comes from the
// fn-registry cybersecurity package; this library never reimplements primitives.
package client
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// Endpoint describes a peer as other peers see it: an endpoint identifier
// together with the peer's two public keys.
type Endpoint struct {
	// ID is the endpoint identifier string.
	ID string
	// SignPub is the public signing key and KexPub the public key-exchange
	// key (the one room keys are sealed to when the peer is invited).
	SignPub, KexPub []byte
}
// Client is a connected unibus peer. It bundles the peer's identity, the NATS
// connection (core and JetStream) used for the data plane, the HTTP client used
// for the control plane, and two in-memory caches guarded by mu.
type Client struct {
// id holds the peer's long-term keypairs (signing and key exchange).
id cs.Identity
// endpoint is this peer's endpoint ID, derived from id.SignPub in New.
endpoint string
// nc is the core NATS connection (ephemeral publish/subscribe, request/reply).
nc *nats.Conn
js jetstream.JetStream // durable plane for rooms with Policy.Persist
// ctrlURL is the base URL prepended to every control-plane request path.
ctrlURL string
// http is the shared HTTP client for control-plane and blob requests.
http *http.Client
// mu guards keyCache and signCache.
mu sync.RWMutex
keyCache map[string]map[int][]byte // roomID -> epoch -> K
signCache map[string][]byte // sender endpoint -> sign pub (for verification)
}
// New dials NATS at natsURL, obtains a JetStream handle on that connection and
// returns a Client that will use ctrlURL as the base of every control-plane
// request. id carries the peer's long-term keypairs; the client's endpoint ID
// is derived from id.SignPub.
//
// If the JetStream handle cannot be created the NATS connection is closed
// before the error is returned, so no connection leaks on failure.
func New(natsURL, ctrlURL string, id cs.Identity) (*Client, error) {
	conn, err := nats.Connect(natsURL, nats.Name("unibus-client"))
	if err != nil {
		return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err)
	}

	// Getting the JetStream handle needs no stream to exist yet. Ephemeral
	// rooms never touch it: they keep using conn.Publish / conn.Subscribe.
	durable, err := jetstream.New(conn)
	if err != nil {
		conn.Close()
		return nil, fmt.Errorf("client: init jetstream: %w", err)
	}

	c := &Client{
		id:        id,
		nc:        conn,
		js:        durable,
		ctrlURL:   ctrlURL,
		http:      &http.Client{Timeout: 10 * time.Second},
		keyCache:  make(map[string]map[int][]byte),
		signCache: make(map[string][]byte),
	}
	c.endpoint = frame.EndpointID(id.SignPub)
	return c, nil
}
// Endpoint reports the public half of this client's identity: its endpoint ID
// and its public signing and key-exchange keys. The returned byte slices are
// the client's own (not copies).
func (c *Client) Endpoint() Endpoint {
	var self Endpoint
	self.ID = c.endpoint
	self.SignPub = c.id.SignPub
	self.KexPub = c.id.KexPub
	return self
}
// Close shuts down the client's NATS connection. It always returns nil; the
// error result exists only so the method has the conventional Close shape.
func (c *Client) Close() error {
	conn := c.nc
	conn.Close()
	return nil
}
// ---- key cache ------------------------------------------------------------
// cacheKey remembers room key k for (roomID, epoch), creating the per-room
// map on first use. The slice is stored as-is (not copied). Safe for
// concurrent use: it takes the write lock.
func (c *Client) cacheKey(roomID string, epoch int, k []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.keyCache[roomID] == nil {
		c.keyCache[roomID] = make(map[int][]byte)
	}
	c.keyCache[roomID][epoch] = k
}
// getCachedKey looks up the cached room key for (roomID, epoch). The boolean
// is false when either the room or the epoch is unknown. Safe for concurrent
// use: it takes the read lock.
func (c *Client) getCachedKey(roomID string, epoch int) ([]byte, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Reading from a missing (nil) inner map is legal in Go and simply
	// reports "not found", so no separate room-level check is needed.
	key, found := c.keyCache[roomID][epoch]
	return key, found
}
// ---- control-plane HTTP helpers ------------------------------------------
// doJSON performs one control-plane HTTP call.
//
// method and path select the endpoint (path is appended verbatim to ctrlURL).
// When body is non-nil it is JSON-encoded and sent with Content-Type
// application/json. When out is non-nil a successful response body is
// JSON-decoded into it.
//
// Any status >= 300 is returned as an error. If the server replied with a
// structured {"error": "..."} document, only that message and the status code
// are surfaced; otherwise the error carries method, path, status and raw body.
// A failure while reading the response body is reported as such instead of
// being mistaken for an empty or undecodable response.
func (c *Client) doJSON(method, path string, body, out any) error {
	var rdr io.Reader
	if body != nil {
		b, err := json.Marshal(body)
		if err != nil {
			return fmt.Errorf("client: marshal request: %w", err)
		}
		rdr = bytes.NewReader(b)
	}
	req, err := http.NewRequest(method, c.ctrlURL+path, rdr)
	if err != nil {
		return fmt.Errorf("client: new request: %w", err)
	}
	if body != nil {
		req.Header.Set("Content-Type", "application/json")
	}
	resp, err := c.http.Do(req)
	if err != nil {
		return fmt.Errorf("client: do %s %s: %w", method, path, err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		// A truncated body would otherwise show up later as a misleading
		// "decode response" error or as an empty server error message.
		return fmt.Errorf("client: read response %s %s (HTTP %d): %w", method, path, resp.StatusCode, err)
	}

	if resp.StatusCode >= 300 {
		// Surface the server's structured {"error": "..."} message when present,
		// instead of leaking the raw HTTP envelope (method, path, status, JSON body).
		var er struct {
			Error string `json:"error"`
		}
		if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
			return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
		}
		return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
	}
	if out != nil {
		if err := json.Unmarshal(respBody, out); err != nil {
			return fmt.Errorf("client: decode response: %w", err)
		}
	}
	return nil
}
// signRequest produces the client's Ed25519 signature over the JSON encoding
// of req. Callers must pass req with its Sig field still zero-valued, so the
// signed bytes do not include a signature. Per the original author this
// mirrors the server's verifyOwnerSig.
//
// The json.Marshal error is discarded because the method has no error result;
// if encoding ever failed, the signature would cover empty input.
func (c *Client) signRequest(req any) []byte {
	canonical, _ := json.Marshal(req)
	sig := cs.SignEd25519(c.id.SignPriv, canonical)
	return sig
}
// ---- mirror of server wire types (control plane) -------------------------
// policyJSON is the wire form of a room policy (encrypt / persist / sign).
type policyJSON struct {
Encrypt bool `json:"encrypt"`
Persist bool `json:"persist"`
SignMsgs bool `json:"sign_msgs"`
}
// endpointJSON is the wire form of a peer's public identity: endpoint ID plus
// its public signing and key-exchange keys.
type endpointJSON struct {
Endpoint string `json:"endpoint"`
SignPub []byte `json:"sign_pub"`
KexPub []byte `json:"kex_pub"`
}
// createRoomReq is the body of POST /rooms. SealedKeySelf is only set for
// encrypted rooms: it is the room key sealed to the owner's own public key.
type createRoomReq struct {
Subject string `json:"subject"`
Policy policyJSON `json:"policy"`
Owner endpointJSON `json:"owner"`
SealedKeySelf []byte `json:"sealed_key_self"`
}
// createRoomResp is the reply to POST /rooms.
type createRoomResp struct {
RoomID string `json:"room_id"`
}
// inviteReq is the body of POST /rooms/{id}/invite. Sig is the inviter's
// signature over the JSON encoding of this struct with Sig left empty, so the
// field order here is part of what gets signed.
type inviteReq struct {
By string `json:"by"`
Sig []byte `json:"sig"`
Member endpointJSON `json:"member"`
SealedKey []byte `json:"sealed_key"`
}
// keyResp is the reply to GET /rooms/{id}/key: the room key sealed to the
// requesting endpoint, and the epoch that key belongs to.
type keyResp struct {
Epoch int `json:"epoch"`
SealedKey []byte `json:"sealed_key"`
}
// memberJSON is one element of the reply to GET /rooms/{id}/members.
// NOTE(review): Role is not read anywhere in this part of the file; its
// possible values are defined by the server.
type memberJSON struct {
Endpoint string `json:"endpoint"`
Role string `json:"role"`
SignPub []byte `json:"sign_pub"`
KexPub []byte `json:"kex_pub"`
}
// roomResp is the reply to GET /rooms/{id}: subject, current epoch and policy.
type roomResp struct {
Subject string `json:"subject"`
Epoch int `json:"epoch"`
Policy policyJSON `json:"policy"`
}
// rekeyKey carries the new room key sealed for one remaining member.
type rekeyKey struct {
Endpoint string `json:"endpoint"`
SealedKey []byte `json:"sealed_key"`
}
// rekeyReq is the body of POST /rooms/{id}/rekey. Keys lists the new key
// sealed per remaining member (left nil for unencrypted rooms); Remove lists
// the endpoints being kicked. Sig is computed over the JSON encoding of this
// struct with Sig left empty, so the field order here is part of what gets
// signed.
type rekeyReq struct {
By string `json:"by"`
Sig []byte `json:"sig"`
NewEpoch int `json:"new_epoch"`
Keys []rekeyKey `json:"keys"`
Remove []string `json:"remove"`
}
// blobResp is the reply to POST /blobs: the hash under which the uploaded
// bytes can later be fetched from /blobs/{hash}.
type blobResp struct {
Hash string `json:"hash"`
}
// ---- room operations ------------------------------------------------------
// newRoomKey draws a fresh 32-byte symmetric room key from crypto/rand.
// It returns a wrapped error if the random source fails.
func newRoomKey() ([]byte, error) {
	const roomKeyLen = 32

	key := make([]byte, roomKeyLen)
	_, err := rand.Read(key)
	if err != nil {
		return nil, fmt.Errorf("client: generate room key: %w", err)
	}
	return key, nil
}
// CreateRoom registers a new room with the control plane and returns its ID.
//
// subject is the NATS subject the room publishes on and p is the room policy.
// This client is recorded as the owner. For encrypted rooms a fresh room key
// is generated, sealed to the client's own key-exchange key, sent along with
// the request, and cached locally under epoch 1 once the room exists. For
// persisted rooms the durable stream is provisioned right away (5-second
// timeout) so that even a publish preceding any subscriber lands in history.
//
// On any failure the returned ID is empty. That includes a stream provisioning
// failure, even though the room was already created on the server by then.
func (c *Client) CreateRoom(subject string, p room.Policy) (string, error) {
	owner := endpointJSON{Endpoint: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub}
	body := createRoomReq{
		Subject: subject,
		Policy:  policyJSON{Encrypt: p.Encrypt, Persist: p.Persist, SignMsgs: p.SignMsgs},
		Owner:   owner,
	}

	var roomKey []byte
	if p.Encrypt {
		var err error
		roomKey, err = newRoomKey()
		if err != nil {
			return "", err
		}
		body.SealedKeySelf, err = cs.SealKeyBox(c.id.KexPub, roomKey)
		if err != nil {
			return "", fmt.Errorf("client: seal own key: %w", err)
		}
	}

	var created createRoomResp
	if err := c.doJSON("POST", "/rooms", body, &created); err != nil {
		return "", err
	}
	if p.Encrypt {
		c.cacheKey(created.RoomID, 1, roomKey)
	}
	if !p.Persist {
		return created.RoomID, nil
	}

	// Persisted room: make sure the JetStream stream exists before returning.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := c.ensureStream(ctx, created.RoomID, subject); err != nil {
		return "", err
	}
	return created.RoomID, nil
}
// Invite asks the control plane to add m to room roomID, signing the request
// with this client's key. For encrypted rooms the room key of the current
// epoch must already be in the local cache; it is sealed to m.KexPub and
// shipped with the invite. For unencrypted rooms no key is attached.
func (c *Client) Invite(roomID string, m Endpoint) error {
	info, err := c.fetchRoom(roomID)
	if err != nil {
		return err
	}

	invite := inviteReq{
		By:     c.endpoint,
		Member: endpointJSON{Endpoint: m.ID, SignPub: m.SignPub, KexPub: m.KexPub},
	}
	if info.Policy.Encrypt {
		roomKey, cached := c.getCachedKey(roomID, info.Epoch)
		if !cached {
			return fmt.Errorf("client: invite: no cached key for room %s epoch %d", roomID, info.Epoch)
		}
		sealed, sealErr := cs.SealKeyBox(m.KexPub, roomKey)
		if sealErr != nil {
			return fmt.Errorf("client: seal key for invitee: %w", sealErr)
		}
		invite.SealedKey = sealed
	}

	// The signature covers the request with Sig still empty.
	invite.Sig = c.signRequest(invite)
	return c.doJSON("POST", "/rooms/"+roomID+"/invite", invite, nil)
}
// roomView is the resolved room metadata, as returned by fetchRoom: the wire
// reply converted to the library's own room.Policy type.
type roomView struct {
// Subject is the NATS subject the room's frames are published on.
Subject string
// Epoch is the room's current key epoch as reported by the control plane.
Epoch int
// Policy holds the room's encrypt / persist / sign settings.
Policy room.Policy
}
// fetchRoom retrieves the metadata of roomID from the control plane
// (GET /rooms/{id}) and converts it into a roomView. Every call hits the
// server; nothing is cached here.
func (c *Client) fetchRoom(roomID string) (roomView, error) {
	var wire roomResp
	err := c.doJSON("GET", "/rooms/"+roomID, nil, &wire)
	if err != nil {
		return roomView{}, err
	}

	view := roomView{Subject: wire.Subject, Epoch: wire.Epoch}
	view.Policy = room.Policy{
		Encrypt:  wire.Policy.Encrypt,
		Persist:  wire.Policy.Persist,
		SignMsgs: wire.Policy.SignMsgs,
	}
	return view, nil
}
// fetchKey returns the room key for roomID together with the epoch it belongs
// to. A positive epoch asks for that specific epoch and is served from the
// local cache when possible; epoch <= 0 asks the server for the latest key and
// always goes to the network. The sealed key from the server is opened with
// the client's own key-exchange keypair and cached under the epoch the server
// reported.
func (c *Client) fetchKey(roomID string, epoch int) ([]byte, int, error) {
	specific := epoch > 0
	if specific {
		if cached, hit := c.getCachedKey(roomID, epoch); hit {
			return cached, epoch, nil
		}
	}

	path := "/rooms/" + roomID + "/key?endpoint=" + c.endpoint
	if specific {
		path += fmt.Sprintf("&epoch=%d", epoch)
	}

	var reply keyResp
	if err := c.doJSON("GET", path, nil, &reply); err != nil {
		return nil, 0, err
	}
	roomKey, err := cs.OpenKeyBox(c.id.KexPub, c.id.KexPriv, reply.SealedKey)
	if err != nil {
		return nil, 0, fmt.Errorf("client: open room key: %w", err)
	}
	c.cacheKey(roomID, reply.Epoch, roomKey)
	return roomKey, reply.Epoch, nil
}
// Join looks up the room and, when the room is encrypted, makes sure the key
// for its current epoch is in the local cache (fetching it if needed). No NATS
// subscription is created; call Subscribe for that.
func (c *Client) Join(roomID string) error {
	info, err := c.fetchRoom(roomID)
	if err != nil {
		return err
	}
	if !info.Policy.Encrypt {
		return nil
	}
	_, _, err = c.fetchKey(roomID, info.Epoch)
	return err
}
// signerPub resolves the public signing key of sender. A cache hit is answered
// locally. On a miss the member list of roomID is fetched from the control
// plane, every listed member's key is written into the cache, and the lookup
// is retried. A sender that is still unknown after that yields an error (and
// will trigger another fetch on the next call).
//
// The cache is keyed by endpoint only, not by room.
func (c *Client) signerPub(roomID, sender string) ([]byte, error) {
	c.mu.RLock()
	cached, hit := c.signCache[sender]
	c.mu.RUnlock()
	if hit {
		return cached, nil
	}

	var roster []memberJSON
	if err := c.doJSON("GET", "/rooms/"+roomID+"/members", nil, &roster); err != nil {
		return nil, err
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	for i := range roster {
		c.signCache[roster[i].Endpoint] = roster[i].SignPub
	}
	if pub, found := c.signCache[sender]; found {
		return pub, nil
	}
	return nil, fmt.Errorf("client: no sign key for sender %q", sender)
}
// ---- data plane: publish/subscribe ---------------------------------------
// Publish sends one message to room roomID.
//
// The room's metadata is fetched on every call. In encrypted rooms the payload
// is sealed with the room key, using the room subject as AEAD associated data,
// and the frame records the epoch of the key actually used. In rooms that sign
// messages an Ed25519 signature over the frame's signing bytes is attached.
// Persisted rooms publish through JetStream; all others use a plain core-NATS
// publish.
func (c *Client) Publish(roomID string, plaintext []byte) error {
	info, err := c.fetchRoom(roomID)
	if err != nil {
		return err
	}

	out := frame.Frame{
		Type:    frame.PUB,
		Subject: info.Subject,
		Sender:  c.endpoint,
		MsgID:   newULID(),
		Epoch:   info.Epoch,
	}

	// Cleartext is the default; the encrypted branch replaces it below.
	out.Payload = plaintext
	if info.Policy.Encrypt {
		roomKey, keyEpoch, err := c.fetchKey(roomID, info.Epoch)
		if err != nil {
			return err
		}
		nonce, sealed, err := cs.SealAEAD(roomKey, plaintext, []byte(info.Subject))
		if err != nil {
			return fmt.Errorf("client: seal payload: %w", err)
		}
		out.Epoch = keyEpoch
		out.Nonce = nonce
		out.Payload = sealed
	}

	if info.Policy.SignMsgs {
		out.Sig = cs.SignEd25519(c.id.SignPriv, out.SigningBytes())
	}

	wire, err := out.Marshal()
	if err != nil {
		return fmt.Errorf("client: marshal frame: %w", err)
	}

	if !info.Policy.Persist {
		// Ephemeral room: plain core-NATS publish.
		return c.nc.Publish(info.Subject, wire)
	}
	// Persisted room: durable, acknowledged publish via JetStream.
	return c.publishPersistent(roomID, info.Subject, wire)
}
// Sub is a transport-agnostic handle to an active room subscription. It wraps
// either a core NATS subscription (ephemeral rooms) or a JetStream durable
// consumer (persisted rooms) behind a single Unsubscribe() method, so callers
// (and tests) treat both planes uniformly. For a persisted room, Unsubscribe
// stops local delivery but leaves the durable consumer's ack position on the
// server, so a later Subscribe with the same peer resumes (offline replay).
//
// Subscribe sets exactly one of the two fields.
type Sub struct {
// nsub is set for ephemeral rooms (core NATS subscription).
nsub *nats.Subscription
// cc is set for persisted rooms (JetStream consume context).
cc jetstream.ConsumeContext
}
// Unsubscribe ends delivery for this subscription. A nil receiver is a no-op.
// For a core-NATS subscription the underlying Unsubscribe error is returned;
// for a JetStream consumer the consume context is stopped and nil is returned.
// The error result keeps the method shape compatible with
// *nats.Subscription's Unsubscribe.
func (s *Sub) Unsubscribe() error {
	switch {
	case s == nil:
		return nil
	case s.nsub != nil:
		return s.nsub.Unsubscribe()
	case s.cc != nil:
		s.cc.Stop()
	}
	return nil
}
// Subscribe starts receiving messages from room roomID and calls handler for
// each accepted message with the decoded frame and its payload (decrypted for
// encrypted rooms). Decoding, signature verification and decryption are done
// by processFrame; messages it rejects never reach handler.
//
// The room metadata is fetched once, here, and that snapshot is used for the
// lifetime of the subscription.
//
// Ephemeral rooms (Policy.Persist == false) get a plain core-NATS
// subscription. Persisted rooms are served through subscribePersistent, which
// per the original author binds a per-peer durable JetStream consumer with
// DeliverAll, giving late joiners the full history and reconnecting peers a
// resume from their last ack.
func (c *Client) Subscribe(roomID string, handler func(f frame.Frame, plaintext []byte)) (*Sub, error) {
	info, err := c.fetchRoom(roomID)
	if err != nil {
		return nil, err
	}

	// Both planes funnel raw message bytes through the same callback.
	onData := func(raw []byte) {
		c.processFrame(roomID, info, raw, handler)
	}

	if !info.Policy.Persist {
		sub, err := c.nc.Subscribe(info.Subject, func(m *nats.Msg) {
			onData(m.Data)
		})
		if err != nil {
			return nil, err
		}
		return &Sub{nsub: sub}, nil
	}

	consume, err := c.subscribePersistent(roomID, info.Subject, onData)
	if err != nil {
		return nil, err
	}
	return &Sub{cc: consume}, nil
}
// processFrame decodes one wire message, authenticates it, decrypts its inline
// payload when the room is encrypted, and then invokes handler. It is the
// single code path shared by the ephemeral and persisted subscribe planes so
// their decode/verify/decrypt semantics never drift apart.
//
// roomID identifies the room, info is the room metadata snapshot taken at
// Subscribe time, data is the raw message, and handler receives the decoded
// frame plus the payload (plaintext for encrypted rooms).
//
// A message is silently dropped (handler not called) when:
//   - it cannot be decoded as a frame;
//   - the room requires signatures and the frame carries none, the sender's
//     key cannot be resolved, or the signature does not verify;
//   - the room is encrypted and the key for the frame's epoch cannot be
//     obtained, or AEAD opening fails.
func (c *Client) processFrame(roomID string, info roomView, data []byte, handler func(f frame.Frame, plaintext []byte)) {
	f, err := frame.Unmarshal(data)
	if err != nil {
		return
	}

	if info.Policy.SignMsgs {
		// A signed room must not accept unsigned frames: otherwise anyone able
		// to publish on the subject could bypass verification simply by
		// omitting the signature.
		if len(f.Sig) == 0 {
			return // unauthenticated frame: drop
		}
		pub, err := c.signerPub(roomID, f.Sender)
		if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
			return // unauthenticated frame: drop
		}
	}

	plaintext := f.Payload
	// Decrypt only inline payloads. Media frames carry their bytes in the
	// blob store (referenced by f.Blob) with the nonce in BlobRef.Nonce;
	// the handler decrypts those on demand via FetchMedia. A frame with an
	// inline ciphertext always has a non-empty Nonce.
	//
	// NOTE(review): an inline payload WITHOUT a nonce in an encrypted room is
	// passed to handler as-is (unauthenticated unless the room also signs) —
	// confirm that this is intended.
	if info.Policy.Encrypt && len(f.Nonce) > 0 && len(f.Payload) > 0 {
		k, ok := c.getCachedKey(roomID, f.Epoch)
		if !ok {
			// Sender used a newer (or unknown) epoch: refresh K from the control plane.
			k, _, err = c.fetchKey(roomID, f.Epoch)
			if err != nil {
				// Cannot obtain the key for this epoch. For persisted history this is
				// expected and NOT fatal: a member invited at a later epoch reading
				// older history (or a kicked peer) simply cannot read those frames.
				// Skip this message and keep processing the rest (megolm semantics:
				// new members do not read prior history).
				return
			}
		}
		pt, err := cs.OpenAEAD(k, f.Nonce, f.Payload, []byte(info.Subject))
		if err != nil {
			return // cannot decrypt (wrong epoch/kicked): skip this frame, continue
		}
		plaintext = pt
	}
	handler(f, plaintext)
}
// ---- request/reply (cleartext v1) ----------------------------------------
// Request sends plaintext on subject using NATS request/reply and waits up to
// timeout for the answer, whose raw bytes are returned. Nothing is encrypted
// or signed here (cleartext in v1, intended for rpc.* subjects).
func (c *Client) Request(subject string, plaintext []byte, timeout time.Duration) ([]byte, error) {
	reply, err := c.nc.Request(subject, plaintext, timeout)
	if err == nil {
		return reply.Data, nil
	}
	return nil, fmt.Errorf("client: request %q: %w", subject, err)
}
// Reply installs a responder on subject. For every incoming request that
// carries a reply subject, handler is called with the request bytes and its
// return value is published back (cleartext in v1). Requests without a reply
// subject are ignored, and handler is not invoked for them.
func (c *Client) Reply(subject string, handler func([]byte) []byte) (*nats.Subscription, error) {
	respond := func(req *nats.Msg) {
		if req.Reply != "" {
			// Best effort: a failed reply publish is deliberately ignored;
			// the requester will simply time out.
			_ = c.nc.Publish(req.Reply, handler(req.Data))
		}
	}
	return c.nc.Subscribe(subject, respond)
}
// ---- kick / forward secrecy ----------------------------------------------
// Kick removes endpoint from room roomID and advances the room to the next
// epoch (current epoch + 1) through a signed rekey request.
//
// In unencrypted rooms the request only names the member to remove. In
// encrypted rooms a fresh room key is generated and sealed individually for
// every current member except the kicked one, so the kicked member cannot
// decrypt anything published after the rotation. The new key is cached
// locally only after the server accepted the request.
func (c *Client) Kick(roomID string, endpoint string) error {
	info, err := c.fetchRoom(roomID)
	if err != nil {
		return err
	}

	rekeyPath := "/rooms/" + roomID + "/rekey"
	rekey := rekeyReq{
		By:       c.endpoint,
		NewEpoch: info.Epoch + 1,
		Remove:   []string{endpoint},
	}

	if !info.Policy.Encrypt {
		// No key to rotate: just remove the member.
		rekey.Sig = c.signRequest(rekey)
		return c.doJSON("POST", rekeyPath, rekey, nil)
	}

	nextKey, err := newRoomKey()
	if err != nil {
		return err
	}

	var roster []memberJSON
	if err := c.doJSON("GET", "/rooms/"+roomID+"/members", nil, &roster); err != nil {
		return err
	}
	for _, member := range roster {
		if member.Endpoint == endpoint {
			continue // the kicked member gets no copy of the new key
		}
		sealed, err := cs.SealKeyBox(member.KexPub, nextKey)
		if err != nil {
			return fmt.Errorf("client: seal new key for %q: %w", member.Endpoint, err)
		}
		rekey.Keys = append(rekey.Keys, rekeyKey{Endpoint: member.Endpoint, SealedKey: sealed})
	}

	// Sign last, once the key list is complete and with Sig still empty.
	rekey.Sig = c.signRequest(rekey)
	if err := c.doJSON("POST", rekeyPath, rekey, nil); err != nil {
		return err
	}
	c.cacheKey(roomID, rekey.NewEpoch, nextKey)
	return nil
}
// ---- media (object store) -------------------------------------------------
// PublishMedia uploads data to the blob store and publishes a frame that only
// references it.
//
// In encrypted rooms data is first sealed with the room key (room subject as
// AEAD associated data); the nonce travels in the frame's BlobRef and the
// frame records the epoch of the key used. In unencrypted rooms the bytes are
// uploaded as-is and the BlobRef nonce is empty. The BlobRef also carries the
// hash returned by the blob store and the size of the uploaded bytes. Rooms
// that sign messages get an Ed25519 signature on the frame.
//
// Receivers that see f.Blob != nil can obtain the content with FetchMedia.
func (c *Client) PublishMedia(roomID string, data []byte) error {
	info, err := c.fetchRoom(roomID)
	if err != nil {
		return err
	}

	env := frame.Frame{
		Type:    frame.PUB,
		Subject: info.Subject,
		Sender:  c.endpoint,
		MsgID:   newULID(),
		Epoch:   info.Epoch,
	}

	// What gets uploaded: the raw data, unless the room is encrypted.
	blobBytes := data
	var blobNonce []byte
	if info.Policy.Encrypt {
		roomKey, keyEpoch, err := c.fetchKey(roomID, info.Epoch)
		if err != nil {
			return err
		}
		blobNonce, blobBytes, err = cs.SealAEAD(roomKey, data, []byte(info.Subject))
		if err != nil {
			return fmt.Errorf("client: seal media: %w", err)
		}
		env.Epoch = keyEpoch
	}

	hash, err := c.putBlob(blobBytes)
	if err != nil {
		return err
	}
	env.Blob = &frame.BlobRef{
		Hash:  hash,
		Nonce: blobNonce,
		Size:  int64(len(blobBytes)),
	}

	if info.Policy.SignMsgs {
		env.Sig = cs.SignEd25519(c.id.SignPriv, env.SigningBytes())
	}

	wire, err := env.Marshal()
	if err != nil {
		return fmt.Errorf("client: marshal media frame: %w", err)
	}

	if !info.Policy.Persist {
		// Ephemeral room: plain core-NATS publish.
		return c.nc.Publish(info.Subject, wire)
	}
	// Persisted room: the envelope (not the blob bytes) goes through JetStream
	// so it shows up in history/replay like any other frame.
	return c.publishPersistent(roomID, info.Subject, wire)
}
// FetchMedia retrieves the blob that frame f refers to and returns its
// content. It fails if f has no blob reference. The blob is downloaded first;
// then the room metadata decides what to do with it: for unencrypted rooms the
// bytes are returned unchanged, for encrypted rooms they are opened with the
// room key of f.Epoch (taken from the cache, or fetched when missing) using
// the nonce from the blob reference and the room subject as associated data.
func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) {
	ref := f.Blob
	if ref == nil {
		return nil, fmt.Errorf("client: frame has no blob ref")
	}

	stored, err := c.getBlob(ref.Hash)
	if err != nil {
		return nil, err
	}

	info, err := c.fetchRoom(roomID)
	if err != nil {
		return nil, err
	}
	if !info.Policy.Encrypt {
		return stored, nil
	}

	roomKey, cached := c.getCachedKey(roomID, f.Epoch)
	if !cached {
		roomKey, _, err = c.fetchKey(roomID, f.Epoch)
		if err != nil {
			return nil, err
		}
	}
	return cs.OpenAEAD(roomKey, ref.Nonce, stored, []byte(info.Subject))
}
// putBlob uploads ciphertext to the blob store (POST {ctrlURL}/blobs, sent as
// application/octet-stream) and returns the hash the store reports for it.
// Despite the parameter name, callers in unencrypted rooms pass raw bytes.
//
// Errors: request construction or transport failure, failure to read the
// response body, any status >= 300 (the body is included in the message), or
// a response that is not valid JSON.
func (c *Client) putBlob(ciphertext []byte) (string, error) {
	req, err := http.NewRequest("POST", c.ctrlURL+"/blobs", bytes.NewReader(ciphertext))
	if err != nil {
		return "", fmt.Errorf("client: new blob request: %w", err)
	}
	req.Header.Set("Content-Type", "application/octet-stream")
	resp, err := c.http.Do(req)
	if err != nil {
		return "", fmt.Errorf("client: put blob: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		// Report the read failure itself rather than letting a truncated body
		// surface as a misleading JSON decode error.
		return "", fmt.Errorf("client: read blob resp (HTTP %d): %w", resp.StatusCode, err)
	}
	if resp.StatusCode >= 300 {
		return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body))
	}
	var r blobResp
	if err := json.Unmarshal(body, &r); err != nil {
		return "", fmt.Errorf("client: decode blob resp: %w", err)
	}
	return r.Hash, nil
}
// getBlob downloads the blob stored under hash (GET {ctrlURL}/blobs/{hash})
// and returns its raw bytes. A status >= 300 is turned into an error that
// includes the status code and the response body. The whole blob is read into
// memory.
func (c *Client) getBlob(hash string) ([]byte, error) {
	resp, err := c.http.Get(c.ctrlURL + "/blobs/" + hash)
	if err != nil {
		return nil, fmt.Errorf("client: get blob: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 300 {
		return io.ReadAll(resp.Body)
	}
	// Error path: the body is only used for the message, so a read failure
	// here is ignored on purpose.
	detail, _ := io.ReadAll(resp.Body)
	return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(detail))
}