diff --git a/pkg/client/client.go b/pkg/client/client.go index 66ed0c5c..2bc4c357 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -392,20 +392,31 @@ func (c *Client) signerPub(roomID, sender string) ([]byte, error) { // ---- data plane: publish/subscribe --------------------------------------- -// Publish sends plaintext to a room. For encrypted rooms it seals the payload -// with the current K using the subject as AEAD additional-authenticated-data; -// for signed rooms it attaches an Ed25519 signature. -func (c *Client) Publish(roomID string, plaintext []byte) error { +// threadMeta carries the optional threading/reaction routing of a published +// frame. The zero value yields a plain top-level message whose wire bytes are +// identical to a pre-threading frame (the fields are omitempty). +type threadMeta struct { + threadID string // thread root message id + replyTo string // message id being replied to / reacted to +} + +// publishFrame is the single publish path shared by Publish, PublishReply and +// React. It builds the envelope, seals+signs per the room policy, and routes +// through JetStream (persisted rooms) or core NATS (ephemeral rooms). The only +// thing the callers vary is the frame type and the threading metadata. +func (c *Client) publishFrame(roomID string, ftype frame.FrameType, plaintext []byte, tm threadMeta) error { info, err := c.fetchRoom(roomID) if err != nil { return err } f := frame.Frame{ - Type: frame.PUB, - Subject: info.Subject, - Sender: c.endpoint, - MsgID: newULID(), - Epoch: info.Epoch, + Type: ftype, + Subject: info.Subject, + Sender: c.endpoint, + MsgID: newULID(), + Epoch: info.Epoch, + ThreadID: tm.threadID, + ReplyTo: tm.replyTo, } if info.Policy.Encrypt { k, ep, err := c.fetchKey(roomID, info.Epoch) @@ -435,6 +446,31 @@ func (c *Client) Publish(roomID string, plaintext []byte) error { return c.nc.Publish(info.Subject, b) } +// Publish sends plaintext to a room. For encrypted rooms it seals the payload +// with the current K using the subject as AEAD additional-authenticated-data; +// for signed rooms it attaches an Ed25519 signature. +func (c *Client) Publish(roomID string, plaintext []byte) error { + return c.publishFrame(roomID, frame.PUB, plaintext, threadMeta{}) +} + +// PublishReply sends plaintext as a reply inside a thread. replyTo is the id of +// the message being replied to; threadID is the thread root — pass replyTo when +// you are starting a new thread off a top-level message, or the existing +// ThreadID to keep replying within one. Encryption and signing are identical to +// Publish; the threading metadata rides the cleartext envelope. Receivers read +// Frame.ReplyTo / Frame.ThreadID to render the conversation tree. +func (c *Client) PublishReply(roomID string, plaintext []byte, replyTo, threadID string) error { + return c.publishFrame(roomID, frame.PUB, plaintext, threadMeta{threadID: threadID, replyTo: replyTo}) +} + +// React publishes a reaction (emoji/shortcode) to a target message. The reaction +// content travels in the payload, so it is sealed exactly like a normal message +// and stays confidential in E2E rooms. Receivers dispatch on Frame.Type == +// frame.REACT and read Frame.ReplyTo for the message being reacted to. +func (c *Client) React(roomID, targetMsgID, emoji string) error { + return c.publishFrame(roomID, frame.REACT, []byte(emoji), threadMeta{replyTo: targetMsgID}) +} + // Sub is a transport-agnostic handle to an active room subscription. It wraps // either a core NATS subscription (ephemeral rooms) or a JetStream durable // consumer (persisted rooms) behind a single Unsubscribe() method, so callers diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index ff0b0fd5..1e07d619 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -302,6 +302,106 @@ func TestMediaBlobRoundTrip(t *testing.T) { } } +// TestThreadedReplyAndReaction exercises the additive threading API end to end +// in an encrypted, persisted, signed room (ModeMatrix): A publishes a root +// message, replies to it within a thread, and reacts to it with an emoji. The +// loopback subscriber must observe the reply carrying ReplyTo/ThreadID and the +// reaction as a frame.REACT whose (decrypted) payload is the emoji — proving the +// reaction stays sealed like any message in an E2E room. +func TestThreadedReplyAndReaction(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + + roomID, err := a.CreateRoom("room.thread", room.ModeMatrix) + if err != nil { + t.Fatalf("create room: %v", err) + } + + type rec struct { + f frame.Frame + pt string + } + var mu sync.Mutex + var got []rec + sub, err := a.Subscribe(roomID, func(f frame.Frame, pt []byte) { + mu.Lock() + got = append(got, rec{f: f, pt: string(pt)}) + mu.Unlock() + }) + if err != nil { + t.Fatalf("subscribe: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(150 * time.Millisecond) + + find := func(pred func(rec) bool) (rec, bool) { + mu.Lock() + defer mu.Unlock() + for _, r := range got { + if pred(r) { + return r, true + } + } + return rec{}, false + } + waitRec := func(pred func(rec) bool) (rec, bool) { + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if r, ok := find(pred); ok { + return r, true + } + time.Sleep(25 * time.Millisecond) + } + return rec{}, false + } + + // 1. Root message. + if err := a.Publish(roomID, []byte("root")); err != nil { + t.Fatalf("publish root: %v", err) + } + rootRec, ok := waitRec(func(r rec) bool { return r.pt == "root" }) + if !ok { + t.Fatalf("never observed the root message") + } + rootID := rootRec.f.MsgID + if rootID == "" { + t.Fatalf("root frame has empty MsgID") + } + + // 2. Threaded reply to the root. + if err := a.PublishReply(roomID, []byte("child"), rootID, rootID); err != nil { + t.Fatalf("publish reply: %v", err) + } + reply, ok := waitRec(func(r rec) bool { return r.pt == "child" }) + if !ok { + t.Fatalf("never observed the threaded reply") + } + if reply.f.ReplyTo != rootID || reply.f.ThreadID != rootID { + t.Fatalf("reply threading lost: ReplyTo=%q ThreadID=%q want %q", reply.f.ReplyTo, reply.f.ThreadID, rootID) + } + + // 3. Reaction to the root (emoji rides the encrypted payload). + if err := a.React(roomID, rootID, "👍"); err != nil { + t.Fatalf("react: %v", err) + } + reaction, ok := waitRec(func(r rec) bool { return r.f.Type == frame.REACT }) + if !ok { + t.Fatalf("never observed the reaction frame") + } + if reaction.f.ReplyTo != rootID { + t.Fatalf("reaction target lost: ReplyTo=%q want %q", reaction.f.ReplyTo, rootID) + } + if reaction.pt != "👍" { + t.Fatalf("reaction payload mismatch: got %q want 👍 (decryption in E2E room)", reaction.pt) + } +} + // ---- test helpers --------------------------------------------------------- type collector struct { diff --git a/pkg/frame/frame.go b/pkg/frame/frame.go index d70524e9..2b5aafc4 100644 --- a/pkg/frame/frame.go +++ b/pkg/frame/frame.go @@ -36,6 +36,10 @@ const ( KICK // ACK acknowledges receipt of a previous frame. ACK + // REACT is a reaction to a previous message (an emoji/shortcode). The target + // message id travels in ReplyTo; the reaction content rides Payload, so in + // encrypted rooms the reaction is sealed exactly like any other message. + REACT ) // BlobRef references an out-of-band encrypted blob stored in the object store. @@ -47,16 +51,23 @@ type BlobRef struct { } // Frame is the unit of transport on the unibus message bus. +// +// Threading metadata (ThreadID, ReplyTo) is additive and optional: it travels in +// the cleartext envelope (these are message-id references, not secret content) +// and is omitted entirely when unset, so the wire format and signatures of +// non-threaded frames are byte-for-byte identical to before this field existed. type Frame struct { - Type FrameType `json:"t"` - Subject string `json:"s"` - Sender string `json:"from"` // endpoint id = EndpointID(signPub) - MsgID string `json:"id"` // ULID - Epoch int `json:"e"` // epoch of the room key K used to encrypt - Nonce []byte `json:"n,omitempty"` // AEAD nonce (encrypted rooms only) - Payload []byte `json:"p,omitempty"` // AEAD ciphertext (or cleartext if the room does not encrypt) - Blob *BlobRef `json:"b,omitempty"` - Sig []byte `json:"sig,omitempty"` // Ed25519 signature over SigningBytes() + Type FrameType `json:"t"` + Subject string `json:"s"` + Sender string `json:"from"` // endpoint id = EndpointID(signPub) + MsgID string `json:"id"` // ULID + Epoch int `json:"e"` // epoch of the room key K used to encrypt + ThreadID string `json:"thr,omitempty"` // root message id of the thread this frame belongs to + ReplyTo string `json:"re,omitempty"` // message id this frame replies to / reacts to + Nonce []byte `json:"n,omitempty"` // AEAD nonce (encrypted rooms only) + Payload []byte `json:"p,omitempty"` // AEAD ciphertext (or cleartext if the room does not encrypt) + Blob *BlobRef `json:"b,omitempty"` + Sig []byte `json:"sig,omitempty"` // Ed25519 signature over SigningBytes() } // Marshal serializes the frame to its wire representation (JSON in v1). diff --git a/pkg/frame/frame_test.go b/pkg/frame/frame_test.go index 12e121d2..0b817717 100644 --- a/pkg/frame/frame_test.go +++ b/pkg/frame/frame_test.go @@ -2,6 +2,7 @@ package frame import ( "bytes" + "strings" "testing" ) @@ -40,6 +41,67 @@ func TestMarshalUnmarshalRoundTrip(t *testing.T) { } } +// TestThreadingRoundTrip (golden) verifies that the additive threading fields +// survive a marshal/unmarshal cycle and that a REACT frame keeps its target. +func TestThreadingRoundTrip(t *testing.T) { + orig := Frame{ + Type: REACT, + Subject: "room.general", + Sender: "abc123", + MsgID: "01J000000000000000000002", + Epoch: 1, + ThreadID: "01J000000000000000000000", + ReplyTo: "01J000000000000000000001", + Payload: []byte("👍"), + } + b, err := orig.Marshal() + if err != nil { + t.Fatalf("Marshal: %v", err) + } + got, err := Unmarshal(b) + if err != nil { + t.Fatalf("Unmarshal: %v", err) + } + if got.Type != REACT { + t.Fatalf("type mismatch: got %d want REACT(%d)", got.Type, REACT) + } + if got.ThreadID != orig.ThreadID || got.ReplyTo != orig.ReplyTo { + t.Fatalf("threading fields lost: got thr=%q re=%q", got.ThreadID, got.ReplyTo) + } + if !bytes.Equal(got.Payload, orig.Payload) { + t.Fatalf("reaction payload mismatch: got %q", got.Payload) + } +} + +// TestNonThreadedWireBackCompat (edge) asserts that a frame without threading +// metadata serializes with NO thr/re keys at all, so its bytes — and therefore +// its signature — are identical to a pre-threading frame. This is the +// guarantee that makes the new fields a non-breaking, additive change. +func TestNonThreadedWireBackCompat(t *testing.T) { + f := Frame{Type: PUB, Subject: "room.general", Sender: "x", MsgID: "id", Epoch: 2, Payload: []byte("hi")} + b, err := f.Marshal() + if err != nil { + t.Fatalf("Marshal: %v", err) + } + s := string(b) + if strings.Contains(s, "\"thr\"") || strings.Contains(s, "\"re\"") { + t.Fatalf("threading keys leaked into a non-threaded frame: %s", s) + } + // SigningBytes of a non-threaded frame must also be free of the keys, so old + // signatures over equivalent frames still verify. + if sb := f.SigningBytes(); strings.Contains(string(sb), "\"thr\"") || strings.Contains(string(sb), "\"re\"") { + t.Fatalf("threading keys leaked into SigningBytes: %s", sb) + } +} + +// TestUnmarshalRejectsGarbage (error path) verifies that malformed wire bytes +// surface as an error rather than a silently zero-valued frame. +func TestUnmarshalRejectsGarbage(t *testing.T) { + if _, err := Unmarshal([]byte("{not valid json")); err == nil { + t.Fatalf("expected error unmarshaling garbage, got nil") + } +} + func TestEndpointIDDeterministic(t *testing.T) { pub := []byte("some-ed25519-public-key-bytes-32") a := EndpointID(pub)