// Package client is the unibus client library: the single API that every peer // (process worker, human chat UI, LLM agent) uses to talk to the bus. // // It hides the two planes behind one object: // - control plane (HTTP to membershipd): create/join rooms, invite, fetch // sealed keys, rekey on kick. // - data plane (NATS): publish/subscribe/request/reply of frames. // // In encrypted rooms it transparently seals/opens payloads with the room key K, // distributes K to invitees via sealed boxes, signs and verifies messages, and // rotates K to a new epoch on kick (forward secrecy). All crypto comes from the // fn-registry cybersecurity package; this library never reimplements primitives. package client import ( "bytes" "crypto/rand" "encoding/json" "fmt" "io" "net/http" "sync" "time" cs "fn-registry/functions/cybersecurity" "github.com/enmanuel/unibus/pkg/frame" "github.com/enmanuel/unibus/pkg/room" "github.com/nats-io/nats.go" ) // Endpoint is the public identity of a peer. type Endpoint struct { ID string SignPub []byte KexPub []byte } // Client is a connected unibus peer. type Client struct { id cs.Identity endpoint string nc *nats.Conn ctrlURL string http *http.Client mu sync.RWMutex keyCache map[string]map[int][]byte // roomID -> epoch -> K signCache map[string][]byte // sender endpoint -> sign pub (for verification) } // New connects to NATS and records the control-plane URL. The identity holds // the peer's long-term keypairs. func New(natsURL, ctrlURL string, id cs.Identity) (*Client, error) { nc, err := nats.Connect(natsURL, nats.Name("unibus-client")) if err != nil { return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err) } return &Client{ id: id, endpoint: frame.EndpointID(id.SignPub), nc: nc, ctrlURL: ctrlURL, http: &http.Client{Timeout: 10 * time.Second}, keyCache: map[string]map[int][]byte{}, signCache: map[string][]byte{}, }, nil } // Endpoint returns this client's public identity. func (c *Client) Endpoint() Endpoint { return Endpoint{ID: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub} } // Close releases the NATS connection. func (c *Client) Close() error { c.nc.Close() return nil } // ---- key cache ------------------------------------------------------------ func (c *Client) cacheKey(roomID string, epoch int, k []byte) { c.mu.Lock() defer c.mu.Unlock() m := c.keyCache[roomID] if m == nil { m = map[int][]byte{} c.keyCache[roomID] = m } m[epoch] = k } func (c *Client) getCachedKey(roomID string, epoch int) ([]byte, bool) { c.mu.RLock() defer c.mu.RUnlock() if m := c.keyCache[roomID]; m != nil { k, ok := m[epoch] return k, ok } return nil, false } // ---- control-plane HTTP helpers ------------------------------------------ func (c *Client) doJSON(method, path string, body, out any) error { var rdr io.Reader if body != nil { b, err := json.Marshal(body) if err != nil { return fmt.Errorf("client: marshal request: %w", err) } rdr = bytes.NewReader(b) } req, err := http.NewRequest(method, c.ctrlURL+path, rdr) if err != nil { return fmt.Errorf("client: new request: %w", err) } if body != nil { req.Header.Set("Content-Type", "application/json") } resp, err := c.http.Do(req) if err != nil { return fmt.Errorf("client: do %s %s: %w", method, path, err) } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) if resp.StatusCode >= 300 { return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody)) } if out != nil { if err := json.Unmarshal(respBody, out); err != nil { return fmt.Errorf("client: decode response: %w", err) } } return nil } // signRequest signs the canonical bytes of req (req must already have its Sig // field cleared) with the client's Ed25519 key. It is symmetric with the // server's verifyOwnerSig. func (c *Client) signRequest(req any) []byte { b, _ := json.Marshal(req) return cs.SignEd25519(c.id.SignPriv, b) } // ---- mirror of server wire types (control plane) ------------------------- type policyJSON struct { Encrypt bool `json:"encrypt"` Persist bool `json:"persist"` SignMsgs bool `json:"sign_msgs"` } type endpointJSON struct { Endpoint string `json:"endpoint"` SignPub []byte `json:"sign_pub"` KexPub []byte `json:"kex_pub"` } type createRoomReq struct { Subject string `json:"subject"` Policy policyJSON `json:"policy"` Owner endpointJSON `json:"owner"` SealedKeySelf []byte `json:"sealed_key_self"` } type createRoomResp struct { RoomID string `json:"room_id"` } type inviteReq struct { By string `json:"by"` Sig []byte `json:"sig"` Member endpointJSON `json:"member"` SealedKey []byte `json:"sealed_key"` } type keyResp struct { Epoch int `json:"epoch"` SealedKey []byte `json:"sealed_key"` } type memberJSON struct { Endpoint string `json:"endpoint"` Role string `json:"role"` SignPub []byte `json:"sign_pub"` KexPub []byte `json:"kex_pub"` } type roomResp struct { Subject string `json:"subject"` Epoch int `json:"epoch"` Policy policyJSON `json:"policy"` } type rekeyKey struct { Endpoint string `json:"endpoint"` SealedKey []byte `json:"sealed_key"` } type rekeyReq struct { By string `json:"by"` Sig []byte `json:"sig"` NewEpoch int `json:"new_epoch"` Keys []rekeyKey `json:"keys"` Remove []string `json:"remove"` } type blobResp struct { Hash string `json:"hash"` } // ---- room operations ------------------------------------------------------ // newRoomKey returns 32 random bytes for a symmetric room key. func newRoomKey() ([]byte, error) { k := make([]byte, 32) if _, err := rand.Read(k); err != nil { return nil, fmt.Errorf("client: generate room key: %w", err) } return k, nil } // CreateRoom creates a room with the given subject and policy. For encrypted // rooms it generates K, seals K to itself, and caches K at epoch 1. func (c *Client) CreateRoom(subject string, p room.Policy) (string, error) { req := createRoomReq{ Subject: subject, Policy: policyJSON{Encrypt: p.Encrypt, Persist: p.Persist, SignMsgs: p.SignMsgs}, Owner: endpointJSON{Endpoint: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub}, } var k []byte if p.Encrypt { var err error if k, err = newRoomKey(); err != nil { return "", err } sealed, err := cs.SealKeyBox(c.id.KexPub, k) if err != nil { return "", fmt.Errorf("client: seal own key: %w", err) } req.SealedKeySelf = sealed } var resp createRoomResp if err := c.doJSON("POST", "/rooms", req, &resp); err != nil { return "", err } if p.Encrypt { c.cacheKey(resp.RoomID, 1, k) } return resp.RoomID, nil } // Invite adds a member to a room. It seals the current-epoch room key to the // invitee's X25519 public key and signs the request as the owner. func (c *Client) Invite(roomID string, m Endpoint) error { info, err := c.fetchRoom(roomID) if err != nil { return err } var sealed []byte if info.Policy.Encrypt { k, ok := c.getCachedKey(roomID, info.Epoch) if !ok { return fmt.Errorf("client: invite: no cached key for room %s epoch %d", roomID, info.Epoch) } if sealed, err = cs.SealKeyBox(m.KexPub, k); err != nil { return fmt.Errorf("client: seal key for invitee: %w", err) } } req := inviteReq{ By: c.endpoint, Member: endpointJSON{Endpoint: m.ID, SignPub: m.SignPub, KexPub: m.KexPub}, SealedKey: sealed, } req.Sig = c.signRequest(req) // Sig is zero-valued at sign time return c.doJSON("POST", "/rooms/"+roomID+"/invite", req, nil) } // roomView is the resolved room metadata. type roomView struct { Subject string Epoch int Policy room.Policy } func (c *Client) fetchRoom(roomID string) (roomView, error) { var resp roomResp if err := c.doJSON("GET", "/rooms/"+roomID, nil, &resp); err != nil { return roomView{}, err } return roomView{ Subject: resp.Subject, Epoch: resp.Epoch, Policy: room.Policy{Encrypt: resp.Policy.Encrypt, Persist: resp.Policy.Persist, SignMsgs: resp.Policy.SignMsgs}, }, nil } // fetchKey retrieves and caches the room key K for the given epoch (epoch <= 0 // means latest). It opens the sealed box with the client's own X25519 keypair. func (c *Client) fetchKey(roomID string, epoch int) ([]byte, int, error) { if epoch > 0 { if k, ok := c.getCachedKey(roomID, epoch); ok { return k, epoch, nil } } path := fmt.Sprintf("/rooms/%s/key?endpoint=%s", roomID, c.endpoint) if epoch > 0 { path += fmt.Sprintf("&epoch=%d", epoch) } var resp keyResp if err := c.doJSON("GET", path, nil, &resp); err != nil { return nil, 0, err } k, err := cs.OpenKeyBox(c.id.KexPub, c.id.KexPriv, resp.SealedKey) if err != nil { return nil, 0, fmt.Errorf("client: open room key: %w", err) } c.cacheKey(roomID, resp.Epoch, k) return k, resp.Epoch, nil } // Join resolves room metadata and, for encrypted rooms, fetches and caches the // current room key. It does not subscribe to NATS (use Subscribe for that). func (c *Client) Join(roomID string) error { info, err := c.fetchRoom(roomID) if err != nil { return err } if info.Policy.Encrypt { if _, _, err := c.fetchKey(roomID, info.Epoch); err != nil { return err } } return nil } // signerPub returns the sign public key of a sender endpoint, fetching the // member list once and caching it. func (c *Client) signerPub(roomID, sender string) ([]byte, error) { c.mu.RLock() pub, ok := c.signCache[sender] c.mu.RUnlock() if ok { return pub, nil } var members []memberJSON if err := c.doJSON("GET", "/rooms/"+roomID+"/members", nil, &members); err != nil { return nil, err } c.mu.Lock() for _, m := range members { c.signCache[m.Endpoint] = m.SignPub } pub, ok = c.signCache[sender] c.mu.Unlock() if !ok { return nil, fmt.Errorf("client: no sign key for sender %q", sender) } return pub, nil } // ---- data plane: publish/subscribe --------------------------------------- // Publish sends plaintext to a room. For encrypted rooms it seals the payload // with the current K using the subject as AEAD additional-authenticated-data; // for signed rooms it attaches an Ed25519 signature. func (c *Client) Publish(roomID string, plaintext []byte) error { info, err := c.fetchRoom(roomID) if err != nil { return err } f := frame.Frame{ Type: frame.PUB, Subject: info.Subject, Sender: c.endpoint, MsgID: newULID(), Epoch: info.Epoch, } if info.Policy.Encrypt { k, ep, err := c.fetchKey(roomID, info.Epoch) if err != nil { return err } nonce, ct, err := cs.SealAEAD(k, plaintext, []byte(info.Subject)) if err != nil { return fmt.Errorf("client: seal payload: %w", err) } f.Epoch, f.Nonce, f.Payload = ep, nonce, ct } else { f.Payload = plaintext } if info.Policy.SignMsgs { f.Sig = cs.SignEd25519(c.id.SignPriv, f.SigningBytes()) } b, err := f.Marshal() if err != nil { return fmt.Errorf("client: marshal frame: %w", err) } return c.nc.Publish(info.Subject, b) } // Subscribe subscribes to a room and invokes handler for each message with the // decoded frame and (for encrypted rooms) the decrypted plaintext. Signature // verification and epoch-driven key refresh happen transparently. Messages that // fail verification or decryption are dropped (handler not called). func (c *Client) Subscribe(roomID string, handler func(f frame.Frame, plaintext []byte)) (*nats.Subscription, error) { info, err := c.fetchRoom(roomID) if err != nil { return nil, err } return c.nc.Subscribe(info.Subject, func(msg *nats.Msg) { f, err := frame.Unmarshal(msg.Data) if err != nil { return } if info.Policy.SignMsgs && f.Sig != nil { pub, err := c.signerPub(roomID, f.Sender) if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) { return // unauthenticated frame: drop } } plaintext := f.Payload // Decrypt only inline payloads. Media frames carry their bytes in the // blob store (referenced by f.Blob) with the nonce in BlobRef.Nonce; // the handler decrypts those on demand via FetchMedia. A frame with an // inline ciphertext always has a non-empty Nonce. if info.Policy.Encrypt && len(f.Nonce) > 0 && len(f.Payload) > 0 { k, ok := c.getCachedKey(roomID, f.Epoch) if !ok { // Sender used a newer (or unknown) epoch: refresh K from the control plane. k, _, err = c.fetchKey(roomID, f.Epoch) if err != nil { return // cannot obtain key for this epoch (e.g. we were kicked): drop } } pt, err := cs.OpenAEAD(k, f.Nonce, f.Payload, []byte(info.Subject)) if err != nil { return // cannot decrypt (wrong epoch/kicked): drop } plaintext = pt } handler(f, plaintext) }) } // ---- request/reply (cleartext v1) ---------------------------------------- // Request performs a NATS request/reply on subject (cleartext in v1, intended // for rpc.* subjects). func (c *Client) Request(subject string, plaintext []byte, timeout time.Duration) ([]byte, error) { msg, err := c.nc.Request(subject, plaintext, timeout) if err != nil { return nil, fmt.Errorf("client: request %q: %w", subject, err) } return msg.Data, nil } // Reply registers a responder for subject; handler receives the request bytes // and returns the reply bytes (cleartext in v1). func (c *Client) Reply(subject string, handler func([]byte) []byte) (*nats.Subscription, error) { return c.nc.Subscribe(subject, func(msg *nats.Msg) { if msg.Reply == "" { return } _ = c.nc.Publish(msg.Reply, handler(msg.Data)) }) } // ---- kick / forward secrecy ---------------------------------------------- // Kick removes a member and rotates the room key to a new epoch, re-sealing it // only for the remaining members. The kicked member can no longer decrypt // messages published after the rotation (forward secrecy). func (c *Client) Kick(roomID string, endpoint string) error { info, err := c.fetchRoom(roomID) if err != nil { return err } newEpoch := info.Epoch + 1 if !info.Policy.Encrypt { // Unencrypted room: just remove the member, no key rotation needed. req := rekeyReq{By: c.endpoint, NewEpoch: newEpoch, Remove: []string{endpoint}} req.Sig = c.signRequest(req) return c.doJSON("POST", "/rooms/"+roomID+"/rekey", req, nil) } kPrime, err := newRoomKey() if err != nil { return err } var members []memberJSON if err := c.doJSON("GET", "/rooms/"+roomID+"/members", nil, &members); err != nil { return err } var keys []rekeyKey for _, m := range members { if m.Endpoint == endpoint { continue // exclude the kicked member } sealed, err := cs.SealKeyBox(m.KexPub, kPrime) if err != nil { return fmt.Errorf("client: seal new key for %q: %w", m.Endpoint, err) } keys = append(keys, rekeyKey{Endpoint: m.Endpoint, SealedKey: sealed}) } req := rekeyReq{By: c.endpoint, NewEpoch: newEpoch, Keys: keys, Remove: []string{endpoint}} req.Sig = c.signRequest(req) if err := c.doJSON("POST", "/rooms/"+roomID+"/rekey", req, nil); err != nil { return err } c.cacheKey(roomID, newEpoch, kPrime) return nil } // ---- media (object store) ------------------------------------------------- // PublishMedia encrypts data with the room key, uploads the ciphertext to the // blob store, and publishes a frame carrying only a BlobRef. Receivers whose // handler sees f.Blob != nil should GET /blobs/{hash} and OpenAEAD it. func (c *Client) PublishMedia(roomID string, data []byte) error { info, err := c.fetchRoom(roomID) if err != nil { return err } f := frame.Frame{ Type: frame.PUB, Subject: info.Subject, Sender: c.endpoint, MsgID: newULID(), Epoch: info.Epoch, } var ciphertext []byte var nonce []byte if info.Policy.Encrypt { k, ep, err := c.fetchKey(roomID, info.Epoch) if err != nil { return err } nonce, ciphertext, err = cs.SealAEAD(k, data, []byte(info.Subject)) if err != nil { return fmt.Errorf("client: seal media: %w", err) } f.Epoch = ep } else { ciphertext = data } hash, err := c.putBlob(ciphertext) if err != nil { return err } f.Blob = &frame.BlobRef{Hash: hash, Nonce: nonce, Size: int64(len(ciphertext))} if info.Policy.SignMsgs { f.Sig = cs.SignEd25519(c.id.SignPriv, f.SigningBytes()) } b, err := f.Marshal() if err != nil { return fmt.Errorf("client: marshal media frame: %w", err) } return c.nc.Publish(info.Subject, b) } // FetchMedia downloads and (for encrypted rooms) decrypts a blob referenced by // a received frame. It is a convenience for handlers that see f.Blob != nil. func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) { if f.Blob == nil { return nil, fmt.Errorf("client: frame has no blob ref") } ct, err := c.getBlob(f.Blob.Hash) if err != nil { return nil, err } info, err := c.fetchRoom(roomID) if err != nil { return nil, err } if !info.Policy.Encrypt { return ct, nil } k, ok := c.getCachedKey(roomID, f.Epoch) if !ok { if k, _, err = c.fetchKey(roomID, f.Epoch); err != nil { return nil, err } } return cs.OpenAEAD(k, f.Blob.Nonce, ct, []byte(info.Subject)) } func (c *Client) putBlob(ciphertext []byte) (string, error) { req, err := http.NewRequest("POST", c.ctrlURL+"/blobs", bytes.NewReader(ciphertext)) if err != nil { return "", fmt.Errorf("client: new blob request: %w", err) } req.Header.Set("Content-Type", "application/octet-stream") resp, err := c.http.Do(req) if err != nil { return "", fmt.Errorf("client: put blob: %w", err) } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) if resp.StatusCode >= 300 { return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body)) } var r blobResp if err := json.Unmarshal(body, &r); err != nil { return "", fmt.Errorf("client: decode blob resp: %w", err) } return r.Hash, nil } func (c *Client) getBlob(hash string) ([]byte, error) { resp, err := c.http.Get(c.ctrlURL + "/blobs/" + hash) if err != nil { return nil, fmt.Errorf("client: get blob: %w", err) } defer resp.Body.Close() if resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body)) } return io.ReadAll(resp.Body) }