// Package transportunibus implements transport.Transport over the unibus message
// bus (github.com/enmanuel/unibus). A bot built on the neutral
// transport.Transport speaks unibus instead of Matrix: it discovers the rooms it
// has been invited to, joins them, and replies in the room a message arrived on.
//
// Room-based model ("everything is a room"):
//
//   - There is no inbox/outbox subject convention. A conversation is a unibus
//     room; a 1:1 DM is just a room with two members. A human peer creates an
//     encrypted room (room.ModeMatrix), invites the bot by its endpoint id, and
//     publishes a message. The bot finds the room by polling ListMyRooms,
//     Joins (fetching the sealed room key), Subscribes, and answers in place.
//   - The control plane is pull-based: there is no server push of invitations,
//     so the bot polls ListMyRooms on a ticker and reacts to rooms it has not
//     seen before.
//
// This adapter carries no Matrix (mautrix) types, so the agent core driving it
// stays transport-neutral.
package transportunibus

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/enmanuel/agents/internal/config"
	"github.com/enmanuel/agents/pkg/message"
	"github.com/enmanuel/agents/pkg/transport"
	"github.com/enmanuel/unibus/pkg/client"
	"github.com/enmanuel/unibus/pkg/frame"
)

// defaultCommandPrefix marks a command message (e.g. "!ping") when the bot's
// config does not override it.
const defaultCommandPrefix = "!"

// discoveryInterval is how often the bot polls the control plane for rooms it
// has been invited to. The control plane has no push, so this is the latency a
// human waits between inviting the bot and the bot joining.
const discoveryInterval = 2 * time.Second

// Transport is a unibus-backed transport.Transport for one bot. It discovers
// rooms, subscribes to them, and replies in the room each message came from.
type Transport struct { handle string commandPrefix string client *client.Client endpoint string // this bot's own endpoint id, to skip its own messages ctrlURL string http *http.Client logger *slog.Logger mu sync.Mutex subscribed map[string]*client.Sub // roomID -> active subscription memberCount map[string]int // roomID -> cached member count (for IsDirectMsg) } // compile-time assertion that Transport satisfies the neutral interface. var _ transport.Transport = (*Transport)(nil) // New connects to a unibus deployment using the bot's BusCfg. It loads (or // creates) the bot's long-term identity, connects to the NATS data plane and // the membershipd control plane, and records the handle used for mention // detection. It does not create or join any room: rooms are discovered at Run // time as the bot is invited to them. func New(busCfg config.BusCfg, logger *slog.Logger) (*Transport, error) { id, err := client.LoadOrCreateIdentity(busCfg.IdentityPath) if err != nil { return nil, fmt.Errorf("transportunibus: identity: %w", err) } c, err := client.New(busCfg.NatsURL, busCfg.CtrlURL, id) if err != nil { return nil, fmt.Errorf("transportunibus: connect: %w", err) } if logger == nil { logger = slog.Default() } prefix := busCfg.CommandPrefix if prefix == "" { prefix = defaultCommandPrefix } return &Transport{ handle: busCfg.Handle, commandPrefix: prefix, client: c, endpoint: c.Endpoint().ID, ctrlURL: busCfg.CtrlURL, http: &http.Client{Timeout: 10 * time.Second}, logger: logger, subscribed: map[string]*client.Sub{}, memberCount: map[string]int{}, }, nil } // Endpoint returns this bot's public endpoint id. A human peer needs it to // invite the bot to a room (the bot logs it at startup; a directory is a later // step). func (t *Transport) Endpoint() string { return t.endpoint } // BusEndpoint returns this bot's full public endpoint (id + signing/key-exchange // public keys). A peer inviting the bot to an encrypted room needs the public // keys to seal the room key for it. 
func (t *Transport) BusEndpoint() client.Endpoint {
	return t.client.Endpoint()
}

// Run polls the control plane for rooms the bot has been invited to, joins and
// subscribes to each new one, and delivers every decrypted frame to handler as
// a neutral InboundMessage.
//
// It performs one discovery pass immediately and then one per
// discoveryInterval. It blocks until ctx is cancelled, at which point every
// active subscription is cancelled and ctx.Err() is returned.
func (t *Transport) Run(ctx context.Context, handler transport.Handler) error {
	t.logger.Info("unibus transport running", "handle", t.handle, "endpoint", t.endpoint)

	ticker := time.NewTicker(discoveryInterval)
	defer ticker.Stop()
	defer t.unsubscribeAll()

	// Discover immediately so we don't wait a full interval on startup.
	t.discover(ctx, handler)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			t.discover(ctx, handler)
		}
	}
}

// discover lists the bot's rooms and joins+subscribes to any it has not seen.
//
// Failures are logged and skipped rather than returned: a room whose Join or
// Subscribe fails is not recorded, so it is attempted again on the next pass.
// The pass stops as soon as ctx is cancelled, so shutdown is not delayed by
// Join/Subscribe calls for the remaining rooms.
func (t *Transport) discover(ctx context.Context, handler transport.Handler) {
	// select in Run may pick the ticker case even when ctx is already done.
	if ctx.Err() != nil {
		return
	}

	rooms, err := t.client.ListMyRooms()
	if err != nil {
		t.logger.Warn("unibus discover: list rooms failed", "err", err)
		return
	}

	for _, r := range rooms {
		if ctx.Err() != nil {
			return // cancelled mid-pass: do not start work on further rooms
		}

		// Per-iteration copy captured by the subscription callback below.
		roomID := r.RoomID

		t.mu.Lock()
		_, already := t.subscribed[roomID]
		t.mu.Unlock()
		if already {
			continue
		}

		if err := t.client.Join(roomID); err != nil {
			t.logger.Warn("unibus discover: join failed", "room", roomID, "err", err)
			continue
		}

		sub, err := t.client.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
			t.onFrame(ctx, handler, roomID, f, plaintext)
		})
		if err != nil {
			t.logger.Warn("unibus discover: subscribe failed", "room", roomID, "err", err)
			continue
		}

		t.mu.Lock()
		t.subscribed[roomID] = sub
		t.mu.Unlock()
		t.logger.Info("joined and subscribed to room", "room", roomID, "subject", r.Subject)
	}
}

// onFrame maps a decrypted frame to a neutral InboundMessage and delivers it.
// It skips the bot's own messages (to avoid replying to itself), parses any
// command, and computes IsDirectMsg (2-member room) and IsMention (handle in
// body) so the agent core's command/LLM flow behaves exactly as it did on
// Matrix.
func (t *Transport) onFrame(ctx context.Context, handler transport.Handler, roomID string, f frame.Frame, plaintext []byte) { if f.Sender == t.endpoint { return // never react to our own messages } body := string(plaintext) isDM := t.roomMemberCount(roomID) == 2 isMention := t.handle != "" && strings.Contains(strings.ToLower(body), strings.ToLower(t.handle)) // Reuse the pure command parser so "!cmd args" is split the same way the // Matrix listener split it. parsed := message.Parse(body, f.Sender, roomID, 0, isDM, message.ParseOptions{ CommandPrefix: t.commandPrefix, }) handler(ctx, transport.InboundMessage{ RoomID: roomID, Subject: f.Subject, SenderID: f.Sender, MsgID: f.MsgID, ThreadID: f.ThreadID, ReplyTo: f.ReplyTo, Body: body, Command: parsed.Command, Args: parsed.Args, IsDirectMsg: isDM, IsMention: isMention, }) } // Reply publishes a reply into the room the message came from. When the reply // carries a ReplyTo / ThreadID anchor it is published as a threaded reply so // receivers can render the conversation tree. func (t *Transport) Reply(_ context.Context, out transport.OutboundReply) error { if out.ReplyTo != "" || out.ThreadID != "" { return t.client.PublishReply(out.RoomID, []byte(out.Markdown), out.ReplyTo, out.ThreadID) } return t.client.Publish(out.RoomID, []byte(out.Markdown)) } // Send posts a standalone message into a room. func (t *Transport) Send(_ context.Context, roomID, markdown string) error { return t.client.Publish(roomID, []byte(markdown)) } // Close unsubscribes from every room and releases the unibus client connection. func (t *Transport) Close() error { t.unsubscribeAll() return t.client.Close() } // Sender returns an adapter that satisfies the effects/cron/tools Sender // interface, letting the agent's effects runner, scheduler, and bus_send tool // publish into rooms over this transport. func (t *Transport) Sender() *busSender { return &busSender{t: t} } // unsubscribeAll cancels every active room subscription. 
func (t *Transport) unsubscribeAll() { t.mu.Lock() subs := t.subscribed t.subscribed = map[string]*client.Sub{} t.mu.Unlock() for roomID, sub := range subs { if err := sub.Unsubscribe(); err != nil { t.logger.Warn("unibus: unsubscribe failed", "room", roomID, "err", err) } } } // roomMemberCount returns the number of members in a room, used to decide // IsDirectMsg. The control plane exposes GET /rooms/{id}/members; the result is // cached per room since membership rarely changes during a conversation. func (t *Transport) roomMemberCount(roomID string) int { t.mu.Lock() if n, ok := t.memberCount[roomID]; ok { t.mu.Unlock() return n } t.mu.Unlock() n, err := t.fetchMemberCount(roomID) if err != nil { t.logger.Warn("unibus: member count fetch failed", "room", roomID, "err", err) return 0 // unknown → treat as not-a-DM (mention still drives the LLM) } t.mu.Lock() t.memberCount[roomID] = n t.mu.Unlock() return n } // memberJSON mirrors the membership server's GET /rooms/{id}/members element. // Only the count matters here, so the body fields are ignored. type memberJSON struct { Endpoint string `json:"endpoint"` } // fetchMemberCount calls the membershipd control plane directly to count the // members of a room. unibus's client does not expose this, and the task forbids // modifying unibus, so the minimal HTTP GET lives here. 
func (t *Transport) fetchMemberCount(roomID string) (int, error) { url := strings.TrimRight(t.ctrlURL, "/") + "/rooms/" + roomID + "/members" resp, err := t.http.Get(url) if err != nil { return 0, fmt.Errorf("get members: %w", err) } defer resp.Body.Close() if resp.StatusCode >= 300 { return 0, fmt.Errorf("get members: status %d", resp.StatusCode) } var members []memberJSON if err := json.NewDecoder(resp.Body).Decode(&members); err != nil { return 0, fmt.Errorf("decode members: %w", err) } return len(members), nil } // busSender adapts a *Transport to the effects.Sender / cron.Sender / tools // Sender interface (SendText/SendMarkdown/SendReplyMarkdown/SendThreadMarkdown/ // SendTyping). All sends publish into the given room; SendTyping is a no-op // because unibus has no typing-indicator concept. type busSender struct{ t *Transport } func (s *busSender) SendText(_ context.Context, roomID, text string) error { return s.t.client.Publish(roomID, []byte(text)) } func (s *busSender) SendMarkdown(_ context.Context, roomID, markdown string) error { return s.t.client.Publish(roomID, []byte(markdown)) } func (s *busSender) SendReplyMarkdown(_ context.Context, roomID, inReplyTo, markdown string) error { return s.t.client.PublishReply(roomID, []byte(markdown), inReplyTo, "") } func (s *busSender) SendThreadMarkdown(_ context.Context, roomID, threadRootID, inReplyTo, markdown string) error { return s.t.client.PublishReply(roomID, []byte(markdown), inReplyTo, threadRootID) } // SendTyping is a no-op: unibus has no typing indicator. func (s *busSender) SendTyping(_ context.Context, _ string, _ bool) error { return nil }