From 22092834bdce088c9b025044f11d70df79817a78 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sat, 6 Jun 2026 18:10:44 +0200 Subject: [PATCH] =?UTF-8?q?feat(frame):=20additive=20threading=20=E2=80=94?= =?UTF-8?q?=20ThreadID,=20ReplyTo=20+=20REACT=20type?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Chat bots need replies, threads and reactions. Add two optional, omitempty envelope fields (ThreadID, ReplyTo) plus a REACT frame type. The fields ride the cleartext envelope (message-id references, not secret content) and are omitted when unset, so non-threaded frames are byte-for-byte identical on the wire and their signatures unchanged — a non-breaking, additive change. Client gains PublishReply (threaded reply) and React (emoji reaction). The reaction content travels in the payload, so it is sealed like any message and stays confidential in E2E rooms; receivers dispatch on Frame.Type == REACT and read Frame.ReplyTo for the target. Publish is refactored to share one publishFrame path with the new helpers; its behavior is unchanged. Tests: frame round-trip of a threaded REACT frame (golden), non-threaded wire/sig back-compat asserting thr/re keys are absent (edge), Unmarshal of garbage errors (error path), and an end-to-end reply+reaction round-trip in an encrypted ModeMatrix room. Co-Authored-By: Claude Opus 4.8 (1M context) --- pkg/client/client.go | 54 ++++++++++++++++---- pkg/client/client_test.go | 100 ++++++++++++++++++++++++++++++++++++++ pkg/frame/frame.go | 29 +++++++---- pkg/frame/frame_test.go | 62 +++++++++++++++++++++++ 4 files changed, 227 insertions(+), 18 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 66ed0c5c..2bc4c357 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -392,20 +392,31 @@ func (c *Client) signerPub(roomID, sender string) ([]byte, error) { // ---- data plane: publish/subscribe --------------------------------------- -// Publish sends plaintext to a room. For encrypted rooms it seals the payload -// with the current K using the subject as AEAD additional-authenticated-data; -// for signed rooms it attaches an Ed25519 signature. -func (c *Client) Publish(roomID string, plaintext []byte) error { +// threadMeta carries the optional threading/reaction routing of a published +// frame. The zero value yields a plain top-level message whose wire bytes are +// identical to a pre-threading frame (the fields are omitempty). +type threadMeta struct { + threadID string // thread root message id + replyTo string // message id being replied to / reacted to +} + +// publishFrame is the single publish path shared by Publish, PublishReply and +// React. It builds the envelope, seals+signs per the room policy, and routes +// through JetStream (persisted rooms) or core NATS (ephemeral rooms). The only +// thing the callers vary is the frame type and the threading metadata. +func (c *Client) publishFrame(roomID string, ftype frame.FrameType, plaintext []byte, tm threadMeta) error { info, err := c.fetchRoom(roomID) if err != nil { return err } f := frame.Frame{ - Type: frame.PUB, - Subject: info.Subject, - Sender: c.endpoint, - MsgID: newULID(), - Epoch: info.Epoch, + Type: ftype, + Subject: info.Subject, + Sender: c.endpoint, + MsgID: newULID(), + Epoch: info.Epoch, + ThreadID: tm.threadID, + ReplyTo: tm.replyTo, } if info.Policy.Encrypt { k, ep, err := c.fetchKey(roomID, info.Epoch) @@ -435,6 +446,31 @@ func (c *Client) Publish(roomID string, plaintext []byte) error { return c.nc.Publish(info.Subject, b) } +// Publish sends plaintext to a room. For encrypted rooms it seals the payload +// with the current K using the subject as AEAD additional-authenticated-data; +// for signed rooms it attaches an Ed25519 signature. +func (c *Client) Publish(roomID string, plaintext []byte) error { + return c.publishFrame(roomID, frame.PUB, plaintext, threadMeta{}) +} + +// PublishReply sends plaintext as a reply inside a thread. replyTo is the id of +// the message being replied to; threadID is the thread root — pass replyTo when +// you are starting a new thread off a top-level message, or the existing +// ThreadID to keep replying within one. Encryption and signing are identical to +// Publish; the threading metadata rides the cleartext envelope. Receivers read +// Frame.ReplyTo / Frame.ThreadID to render the conversation tree. +func (c *Client) PublishReply(roomID string, plaintext []byte, replyTo, threadID string) error { + return c.publishFrame(roomID, frame.PUB, plaintext, threadMeta{threadID: threadID, replyTo: replyTo}) +} + +// React publishes a reaction (emoji/shortcode) to a target message. The reaction +// content travels in the payload, so it is sealed exactly like a normal message +// and stays confidential in E2E rooms. Receivers dispatch on Frame.Type == +// frame.REACT and read Frame.ReplyTo for the message being reacted to. +func (c *Client) React(roomID, targetMsgID, emoji string) error { + return c.publishFrame(roomID, frame.REACT, []byte(emoji), threadMeta{replyTo: targetMsgID}) +} + // Sub is a transport-agnostic handle to an active room subscription. It wraps // either a core NATS subscription (ephemeral rooms) or a JetStream durable // consumer (persisted rooms) behind a single Unsubscribe() method, so callers diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index ff0b0fd5..1e07d619 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -302,6 +302,106 @@ func TestMediaBlobRoundTrip(t *testing.T) { } } +// TestThreadedReplyAndReaction exercises the additive threading API end to end +// in an encrypted, persisted, signed room (ModeMatrix): A publishes a root +// message, replies to it within a thread, and reacts to it with an emoji. The +// loopback subscriber must observe the reply carrying ReplyTo/ThreadID and the +// reaction as a frame.REACT whose (decrypted) payload is the emoji — proving the +// reaction stays sealed like any message in an E2E room. +func TestThreadedReplyAndReaction(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + + roomID, err := a.CreateRoom("room.thread", room.ModeMatrix) + if err != nil { + t.Fatalf("create room: %v", err) + } + + type rec struct { + f frame.Frame + pt string + } + var mu sync.Mutex + var got []rec + sub, err := a.Subscribe(roomID, func(f frame.Frame, pt []byte) { + mu.Lock() + got = append(got, rec{f: f, pt: string(pt)}) + mu.Unlock() + }) + if err != nil { + t.Fatalf("subscribe: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(150 * time.Millisecond) + + find := func(pred func(rec) bool) (rec, bool) { + mu.Lock() + defer mu.Unlock() + for _, r := range got { + if pred(r) { + return r, true + } + } + return rec{}, false + } + waitRec := func(pred func(rec) bool) (rec, bool) { + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if r, ok := find(pred); ok { + return r, true + } + time.Sleep(25 * time.Millisecond) + } + return rec{}, false + } + + // 1. Root message. + if err := a.Publish(roomID, []byte("root")); err != nil { + t.Fatalf("publish root: %v", err) + } + rootRec, ok := waitRec(func(r rec) bool { return r.pt == "root" }) + if !ok { + t.Fatalf("never observed the root message") + } + rootID := rootRec.f.MsgID + if rootID == "" { + t.Fatalf("root frame has empty MsgID") + } + + // 2. Threaded reply to the root. + if err := a.PublishReply(roomID, []byte("child"), rootID, rootID); err != nil { + t.Fatalf("publish reply: %v", err) + } + reply, ok := waitRec(func(r rec) bool { return r.pt == "child" }) + if !ok { + t.Fatalf("never observed the threaded reply") + } + if reply.f.ReplyTo != rootID || reply.f.ThreadID != rootID { + t.Fatalf("reply threading lost: ReplyTo=%q ThreadID=%q want %q", reply.f.ReplyTo, reply.f.ThreadID, rootID) + } + + // 3. Reaction to the root (emoji rides the encrypted payload). + if err := a.React(roomID, rootID, "👍"); err != nil { + t.Fatalf("react: %v", err) + } + reaction, ok := waitRec(func(r rec) bool { return r.f.Type == frame.REACT }) + if !ok { + t.Fatalf("never observed the reaction frame") + } + if reaction.f.ReplyTo != rootID { + t.Fatalf("reaction target lost: ReplyTo=%q want %q", reaction.f.ReplyTo, rootID) + } + if reaction.pt != "👍" { + t.Fatalf("reaction payload mismatch: got %q want 👍 (decryption in E2E room)", reaction.pt) + } +} + // ---- test helpers --------------------------------------------------------- type collector struct { diff --git a/pkg/frame/frame.go b/pkg/frame/frame.go index d70524e9..2b5aafc4 100644 --- a/pkg/frame/frame.go +++ b/pkg/frame/frame.go @@ -36,6 +36,10 @@ const ( KICK // ACK acknowledges receipt of a previous frame. ACK + // REACT is a reaction to a previous message (an emoji/shortcode). The target + // message id travels in ReplyTo; the reaction content rides Payload, so in + // encrypted rooms the reaction is sealed exactly like any other message. + REACT ) // BlobRef references an out-of-band encrypted blob stored in the object store. @@ -47,16 +51,23 @@ type BlobRef struct { } // Frame is the unit of transport on the unibus message bus. +// +// Threading metadata (ThreadID, ReplyTo) is additive and optional: it travels in +// the cleartext envelope (these are message-id references, not secret content) +// and is omitted entirely when unset, so the wire format and signatures of +// non-threaded frames are byte-for-byte identical to before this field existed. type Frame struct { - Type FrameType `json:"t"` - Subject string `json:"s"` - Sender string `json:"from"` // endpoint id = EndpointID(signPub) - MsgID string `json:"id"` // ULID - Epoch int `json:"e"` // epoch of the room key K used to encrypt - Nonce []byte `json:"n,omitempty"` // AEAD nonce (encrypted rooms only) - Payload []byte `json:"p,omitempty"` // AEAD ciphertext (or cleartext if the room does not encrypt) - Blob *BlobRef `json:"b,omitempty"` - Sig []byte `json:"sig,omitempty"` // Ed25519 signature over SigningBytes() + Type FrameType `json:"t"` + Subject string `json:"s"` + Sender string `json:"from"` // endpoint id = EndpointID(signPub) + MsgID string `json:"id"` // ULID + Epoch int `json:"e"` // epoch of the room key K used to encrypt + ThreadID string `json:"thr,omitempty"` // root message id of the thread this frame belongs to + ReplyTo string `json:"re,omitempty"` // message id this frame replies to / reacts to + Nonce []byte `json:"n,omitempty"` // AEAD nonce (encrypted rooms only) + Payload []byte `json:"p,omitempty"` // AEAD ciphertext (or cleartext if the room does not encrypt) + Blob *BlobRef `json:"b,omitempty"` + Sig []byte `json:"sig,omitempty"` // Ed25519 signature over SigningBytes() } // Marshal serializes the frame to its wire representation (JSON in v1). diff --git a/pkg/frame/frame_test.go b/pkg/frame/frame_test.go index 12e121d2..0b817717 100644 --- a/pkg/frame/frame_test.go +++ b/pkg/frame/frame_test.go @@ -2,6 +2,7 @@ package frame import ( "bytes" + "strings" "testing" ) @@ -40,6 +41,67 @@ func TestMarshalUnmarshalRoundTrip(t *testing.T) { } } +// TestThreadingRoundTrip (golden) verifies that the additive threading fields +// survive a marshal/unmarshal cycle and that a REACT frame keeps its target. +func TestThreadingRoundTrip(t *testing.T) { + orig := Frame{ + Type: REACT, + Subject: "room.general", + Sender: "abc123", + MsgID: "01J000000000000000000002", + Epoch: 1, + ThreadID: "01J000000000000000000000", + ReplyTo: "01J000000000000000000001", + Payload: []byte("👍"), + } + b, err := orig.Marshal() + if err != nil { + t.Fatalf("Marshal: %v", err) + } + got, err := Unmarshal(b) + if err != nil { + t.Fatalf("Unmarshal: %v", err) + } + if got.Type != REACT { + t.Fatalf("type mismatch: got %d want REACT(%d)", got.Type, REACT) + } + if got.ThreadID != orig.ThreadID || got.ReplyTo != orig.ReplyTo { + t.Fatalf("threading fields lost: got thr=%q re=%q", got.ThreadID, got.ReplyTo) + } + if !bytes.Equal(got.Payload, orig.Payload) { + t.Fatalf("reaction payload mismatch: got %q", got.Payload) + } +} + +// TestNonThreadedWireBackCompat (edge) asserts that a frame without threading +// metadata serializes with NO thr/re keys at all, so its bytes — and therefore +// its signature — are identical to a pre-threading frame. This is the +// guarantee that makes the new fields a non-breaking, additive change. +func TestNonThreadedWireBackCompat(t *testing.T) { + f := Frame{Type: PUB, Subject: "room.general", Sender: "x", MsgID: "id", Epoch: 2, Payload: []byte("hi")} + b, err := f.Marshal() + if err != nil { + t.Fatalf("Marshal: %v", err) + } + s := string(b) + if strings.Contains(s, "\"thr\"") || strings.Contains(s, "\"re\"") { + t.Fatalf("threading keys leaked into a non-threaded frame: %s", s) + } + // SigningBytes of a non-threaded frame must also be free of the keys, so old + // signatures over equivalent frames still verify. + if sb := f.SigningBytes(); strings.Contains(string(sb), "\"thr\"") || strings.Contains(string(sb), "\"re\"") { + t.Fatalf("threading keys leaked into SigningBytes: %s", sb) + } +} + +// TestUnmarshalRejectsGarbage (error path) verifies that malformed wire bytes +// surface as an error rather than a silently zero-valued frame. +func TestUnmarshalRejectsGarbage(t *testing.T) { + if _, err := Unmarshal([]byte("{not valid json")); err == nil { + t.Fatalf("expected error unmarshaling garbage, got nil") + } +} + func TestEndpointIDDeterministic(t *testing.T) { pub := []byte("some-ed25519-public-key-bytes-32") a := EndpointID(pub)