feat: update orchestrator for enhanced multi-bot management and room discovery
This commit is contained in:
@@ -12,6 +12,9 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"maunium.net/go/mautrix"
|
||||
"maunium.net/go/mautrix/id"
|
||||
|
||||
"github.com/enmanuel/agents/internal/config"
|
||||
"github.com/enmanuel/agents/pkg/decision"
|
||||
coretypes "github.com/enmanuel/agents/pkg/llm"
|
||||
@@ -20,16 +23,31 @@ import (
|
||||
shelllm "github.com/enmanuel/agents/shell/llm"
|
||||
)
|
||||
|
||||
// RoomScanner is a read-only view of Matrix rooms and members.
|
||||
// Satisfied by *mautrix.Client.
|
||||
type RoomScanner interface {
|
||||
JoinedRooms(ctx context.Context) (*mautrix.RespJoinedRooms, error)
|
||||
JoinedMembers(ctx context.Context, roomID id.RoomID) (*mautrix.RespJoinedMembers, error)
|
||||
}
|
||||
|
||||
// Orchestrator coordinates multi-bot rooms. It has no Matrix identity —
|
||||
// it intercepts events before they reach bots and delegates via the bus.
|
||||
type Orchestrator struct {
|
||||
cfg *config.SpecialConfig
|
||||
llm coretypes.CompleteFunc
|
||||
bus *bus.Bus
|
||||
managedRooms map[string][]string // roomID → []botID
|
||||
participants map[string]orchestration.ParticipantInfo // botID → info
|
||||
logger *slog.Logger
|
||||
|
||||
// mu protects managedRooms, participants, and knownBotIDs.
|
||||
mu sync.RWMutex
|
||||
managedRooms map[string][]string // roomID → []botID
|
||||
staticRooms map[string]struct{} // rooms from YAML (never auto-removed)
|
||||
participants map[string]orchestration.ParticipantInfo // botID → info
|
||||
knownBotIDs map[string]string // matrixUserID → botID
|
||||
|
||||
// Scanner for room/member discovery (set via SetScanner after startup)
|
||||
scanner RoomScanner
|
||||
|
||||
// Prompts loaded from files
|
||||
routingPrompt string
|
||||
qualityPrompt string
|
||||
@@ -49,8 +67,13 @@ func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Or
|
||||
}
|
||||
|
||||
managed := make(map[string][]string)
|
||||
static := make(map[string]struct{})
|
||||
for _, room := range cfg.Orchestration.Rooms {
|
||||
if room.RoomID == "" {
|
||||
continue // skip empty room IDs (unset env vars)
|
||||
}
|
||||
managed[room.RoomID] = room.Participants
|
||||
static[room.RoomID] = struct{}{}
|
||||
}
|
||||
|
||||
o := &Orchestrator{
|
||||
@@ -58,7 +81,9 @@ func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Or
|
||||
llm: llmFunc,
|
||||
bus: agentBus,
|
||||
managedRooms: managed,
|
||||
staticRooms: static,
|
||||
participants: make(map[string]orchestration.ParticipantInfo),
|
||||
knownBotIDs: make(map[string]string),
|
||||
logger: logger,
|
||||
seen: make(map[string]bool),
|
||||
}
|
||||
@@ -70,15 +95,115 @@ func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Or
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// SetScanner injects a Matrix client for room/member discovery.
|
||||
// Must be called before ScanExistingRooms.
|
||||
func (o *Orchestrator) SetScanner(s RoomScanner) {
|
||||
o.scanner = s
|
||||
}
|
||||
|
||||
// RegisterParticipant adds bot metadata used for LLM routing decisions.
|
||||
func (o *Orchestrator) RegisterParticipant(info orchestration.ParticipantInfo) {
|
||||
o.mu.Lock()
|
||||
o.participants[info.ID] = info
|
||||
o.logger.Debug("registered participant", "bot", info.ID, "desc", info.Description)
|
||||
if info.MatrixUserID != "" {
|
||||
o.knownBotIDs[info.MatrixUserID] = info.ID
|
||||
}
|
||||
o.mu.Unlock()
|
||||
o.logger.Debug("registered participant", "bot", info.ID, "matrix_uid", info.MatrixUserID)
|
||||
}
|
||||
|
||||
// ScanExistingRooms discovers rooms where ≥2 registered bots are members.
|
||||
// Called once at startup after all participants are registered.
|
||||
func (o *Orchestrator) ScanExistingRooms(ctx context.Context) {
|
||||
if o.scanner == nil {
|
||||
o.logger.Warn("no scanner set, skipping room discovery")
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := o.scanner.JoinedRooms(ctx)
|
||||
if err != nil {
|
||||
o.logger.Error("failed to list joined rooms for discovery", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, roomID := range resp.JoinedRooms {
|
||||
o.evaluateRoom(ctx, roomID.String())
|
||||
}
|
||||
|
||||
o.mu.RLock()
|
||||
count := len(o.managedRooms)
|
||||
o.mu.RUnlock()
|
||||
o.logger.Info("room discovery complete", "managed_rooms", count)
|
||||
}
|
||||
|
||||
// evaluateRoom checks if a room has ≥2 registered bots and updates managedRooms.
|
||||
func (o *Orchestrator) evaluateRoom(ctx context.Context, roomID string) {
|
||||
if o.scanner == nil {
|
||||
return
|
||||
}
|
||||
|
||||
members, err := o.scanner.JoinedMembers(ctx, id.RoomID(roomID))
|
||||
if err != nil {
|
||||
o.logger.Warn("evaluateRoom: failed to fetch members", "room", roomID, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Collect which registered bots are in this room
|
||||
o.mu.RLock()
|
||||
var presentBots []string
|
||||
for matrixUID, botID := range o.knownBotIDs {
|
||||
if _, ok := members.Joined[id.UserID(matrixUID)]; ok {
|
||||
presentBots = append(presentBots, botID)
|
||||
}
|
||||
}
|
||||
_, isStatic := o.staticRooms[roomID]
|
||||
o.mu.RUnlock()
|
||||
|
||||
// Static rooms (from YAML) are never auto-managed
|
||||
if isStatic {
|
||||
return
|
||||
}
|
||||
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
if len(presentBots) >= 2 {
|
||||
prev, already := o.managedRooms[roomID]
|
||||
if !already {
|
||||
o.managedRooms[roomID] = presentBots
|
||||
o.logger.Info("auto-managing room", "room", roomID, "bots", presentBots)
|
||||
} else if len(prev) != len(presentBots) {
|
||||
o.managedRooms[roomID] = presentBots
|
||||
o.logger.Info("updated room participants", "room", roomID, "bots", presentBots)
|
||||
}
|
||||
} else {
|
||||
if _, was := o.managedRooms[roomID]; was {
|
||||
delete(o.managedRooms, roomID)
|
||||
o.logger.Info("stopped managing room", "room", roomID, "remaining_bots", len(presentBots))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyMembership is called by bot listeners when a room membership changes.
|
||||
// It re-evaluates whether the room should be auto-managed.
|
||||
func (o *Orchestrator) NotifyMembership(ctx context.Context, roomID, userID, membership string) {
|
||||
o.mu.RLock()
|
||||
_, isBot := o.knownBotIDs[userID]
|
||||
o.mu.RUnlock()
|
||||
if !isBot {
|
||||
return // only care about bot membership changes
|
||||
}
|
||||
|
||||
o.logger.Debug("bot membership change, re-evaluating room",
|
||||
"room", roomID, "user", userID, "membership", membership)
|
||||
go o.evaluateRoom(ctx, roomID)
|
||||
}
|
||||
|
||||
// ShouldIntercept returns true if the room is managed by this orchestrator.
|
||||
func (o *Orchestrator) ShouldIntercept(roomID string) bool {
|
||||
o.mu.RLock()
|
||||
_, ok := o.managedRooms[roomID]
|
||||
o.mu.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
@@ -91,6 +216,14 @@ func (o *Orchestrator) Intercept(ctx context.Context, msgCtx decision.MessageCon
|
||||
return false
|
||||
}
|
||||
|
||||
// Ignore messages from known bots to prevent feedback loops.
|
||||
o.mu.RLock()
|
||||
_, senderIsBot := o.knownBotIDs[msgCtx.SenderID]
|
||||
o.mu.RUnlock()
|
||||
if senderIsBot {
|
||||
return true // suppress but don't route — bot's own message
|
||||
}
|
||||
|
||||
// Dedup: multiple bots receive the same event. Only route once.
|
||||
key := msgCtx.RoomID + ":" + msgCtx.SenderID + ":" + msgCtx.Content
|
||||
o.seenMu.Lock()
|
||||
@@ -119,7 +252,11 @@ func (o *Orchestrator) Intercept(ctx context.Context, msgCtx decision.MessageCon
|
||||
// Route is the main entry point. Called when a human posts in a managed room.
|
||||
// It decides which bot(s) should respond and dispatches tasks via the bus.
|
||||
func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext) error {
|
||||
o.mu.RLock()
|
||||
participants, ok := o.managedRooms[msgCtx.RoomID]
|
||||
participantsCopy := append([]string(nil), participants...)
|
||||
o.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("room %s is not managed", msgCtx.RoomID)
|
||||
}
|
||||
@@ -127,14 +264,14 @@ func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext
|
||||
o.logger.Info("orchestrating message",
|
||||
"room", msgCtx.RoomID,
|
||||
"sender", msgCtx.SenderID,
|
||||
"participants", participants,
|
||||
"participants", participantsCopy,
|
||||
"content_preview", truncate(msgCtx.Content, 80),
|
||||
)
|
||||
|
||||
// Optimization: single bot → dispatch directly without LLM
|
||||
if len(participants) == 1 {
|
||||
o.logger.Debug("single participant, dispatching directly", "bot", participants[0])
|
||||
_, err := o.dispatchAndWait(ctx, participants[0], msgCtx, 0, nil)
|
||||
if len(participantsCopy) == 1 {
|
||||
o.logger.Debug("single participant, dispatching directly", "bot", participantsCopy[0])
|
||||
_, err := o.dispatchAndWait(ctx, participantsCopy[0], msgCtx, 0, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -151,10 +288,10 @@ func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext
|
||||
var err error
|
||||
|
||||
if i == 0 {
|
||||
rd, routeErr := o.routeInitial(ctx, msgCtx.Content, participants)
|
||||
rd, routeErr := o.routeInitial(ctx, msgCtx.Content, participantsCopy)
|
||||
if routeErr != nil {
|
||||
o.logger.Error("routing failed, falling back to first participant", "err", routeErr)
|
||||
target = participants[0]
|
||||
target = participantsCopy[0]
|
||||
} else {
|
||||
target = rd.TargetBotID
|
||||
o.logger.Info("routed to bot",
|
||||
@@ -165,7 +302,7 @@ func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext
|
||||
)
|
||||
}
|
||||
} else {
|
||||
rd, routeErr := o.routeRefinement(ctx, msgCtx.Content, responses, participants, lastBot)
|
||||
rd, routeErr := o.routeRefinement(ctx, msgCtx.Content, responses, participantsCopy, lastBot)
|
||||
if routeErr != nil {
|
||||
o.logger.Warn("refinement routing failed, stopping pipeline", "err", routeErr)
|
||||
break
|
||||
@@ -304,6 +441,8 @@ func (o *Orchestrator) loadPrompts() error {
|
||||
|
||||
// buildParticipantsList formats participant info for LLM prompts.
|
||||
func (o *Orchestrator) buildParticipantsList(botIDs []string, exclude string) string {
|
||||
o.mu.RLock()
|
||||
defer o.mu.RUnlock()
|
||||
var sb strings.Builder
|
||||
for _, id := range botIDs {
|
||||
if id == exclude {
|
||||
|
||||
Reference in New Issue
Block a user