// Package orchestration implements the multi-bot orchestrator runtime.
// The orchestrator intercepts Matrix events in managed rooms and coordinates
// which bot responds via the in-process bus.
package orchestration

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"sync"

	"maunium.net/go/mautrix"
	"maunium.net/go/mautrix/id"

	"github.com/enmanuel/agents/internal/config"
	"github.com/enmanuel/agents/pkg/decision"
	coretypes "github.com/enmanuel/agents/pkg/llm"
	"github.com/enmanuel/agents/pkg/orchestration"
	"github.com/enmanuel/agents/shell/bus"
	shelllm "github.com/enmanuel/agents/shell/llm"
)

// RoomScanner is a read-only view of Matrix rooms and members.
// Satisfied by *mautrix.Client.
type RoomScanner interface {
	// JoinedRooms lists the rooms the scanning account has joined.
	JoinedRooms(ctx context.Context) (*mautrix.RespJoinedRooms, error)
	// JoinedMembers lists the joined members of a single room.
	JoinedMembers(ctx context.Context, roomID id.RoomID) (*mautrix.RespJoinedMembers, error)
}

// Orchestrator coordinates multi-bot rooms. It has no Matrix identity —
// it intercepts events before they reach bots and delegates via the bus.
//
// The struct embeds mutexes and must therefore be used through a pointer
// and never copied.
type Orchestrator struct {
	cfg    *config.SpecialConfig  // orchestrator configuration (rooms, limits, timeouts)
	llm    coretypes.CompleteFunc // completion function built from cfg.LLM.Primary
	bus    *bus.Bus               // in-process bus used to dispatch tasks to bots
	logger *slog.Logger

	// mu protects managedRooms, participants, and knownBotIDs.
	// staticRooms is filled once by New and only read afterwards; the
	// readers in this file still take mu for consistency.
	mu           sync.RWMutex
	managedRooms map[string][]string                     // roomID → []botID
	staticRooms  map[string]struct{}                     // rooms from YAML (never auto-removed)
	participants map[string]orchestration.ParticipantInfo // botID → info
	knownBotIDs  map[string]string                       // matrixUserID → botID

	// Scanner for room/member discovery (set via SetScanner after startup).
	// It is assigned without holding mu, so it has to be set before any
	// goroutine starts evaluating rooms.
	scanner RoomScanner

	// Prompts loaded from files
	routingPrompt    string
	qualityPrompt    string
	refinementPrompt string

	// Dedup: multiple bots in the same room will each trigger Intercept().
	// We use a set of "room:sender:content" keys to ensure only one fires.
	// A key is inserted when routing starts and removed when it finishes.
	seenMu sync.Mutex
	seen   map[string]bool
}

// New creates an Orchestrator from its config.
func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Orchestrator, error) { llmFunc, err := shelllm.FromConfig(cfg.LLM.Primary, logger.With("component", "llm")) if err != nil { return nil, fmt.Errorf("orchestrator LLM: %w", err) } managed := make(map[string][]string) static := make(map[string]struct{}) for _, room := range cfg.Orchestration.Rooms { if room.RoomID == "" { continue // skip empty room IDs (unset env vars) } managed[room.RoomID] = room.Participants static[room.RoomID] = struct{}{} } o := &Orchestrator{ cfg: cfg, llm: llmFunc, bus: agentBus, managedRooms: managed, staticRooms: static, participants: make(map[string]orchestration.ParticipantInfo), knownBotIDs: make(map[string]string), logger: logger, seen: make(map[string]bool), } if err := o.loadPrompts(); err != nil { return nil, fmt.Errorf("load prompts: %w", err) } return o, nil } // SetScanner injects a Matrix client for room/member discovery. // Must be called before ScanExistingRooms. func (o *Orchestrator) SetScanner(s RoomScanner) { o.scanner = s } // RegisterParticipant adds bot metadata used for LLM routing decisions. func (o *Orchestrator) RegisterParticipant(info orchestration.ParticipantInfo) { o.mu.Lock() o.participants[info.ID] = info if info.MatrixUserID != "" { o.knownBotIDs[info.MatrixUserID] = info.ID } o.mu.Unlock() o.logger.Debug("registered participant", "bot", info.ID, "matrix_uid", info.MatrixUserID) } // ScanExistingRooms discovers rooms where ≥2 registered bots are members. // Called once at startup after all participants are registered. 
func (o *Orchestrator) ScanExistingRooms(ctx context.Context) { if o.scanner == nil { o.logger.Warn("no scanner set, skipping room discovery") return } resp, err := o.scanner.JoinedRooms(ctx) if err != nil { o.logger.Error("failed to list joined rooms for discovery", "err", err) return } for _, roomID := range resp.JoinedRooms { o.evaluateRoom(ctx, roomID.String()) } o.mu.RLock() count := len(o.managedRooms) o.mu.RUnlock() o.logger.Info("room discovery complete", "managed_rooms", count) } // evaluateRoom checks if a room has ≥2 registered bots and updates managedRooms. func (o *Orchestrator) evaluateRoom(ctx context.Context, roomID string) { if o.scanner == nil { return } members, err := o.scanner.JoinedMembers(ctx, id.RoomID(roomID)) if err != nil { o.logger.Warn("evaluateRoom: failed to fetch members", "room", roomID, "err", err) return } // Collect which registered bots are in this room o.mu.RLock() var presentBots []string for matrixUID, botID := range o.knownBotIDs { if _, ok := members.Joined[id.UserID(matrixUID)]; ok { presentBots = append(presentBots, botID) } } _, isStatic := o.staticRooms[roomID] o.mu.RUnlock() // Static rooms (from YAML) are never auto-managed if isStatic { return } o.mu.Lock() defer o.mu.Unlock() if len(presentBots) >= 2 { prev, already := o.managedRooms[roomID] if !already { o.managedRooms[roomID] = presentBots o.logger.Info("auto-managing room", "room", roomID, "bots", presentBots) } else if len(prev) != len(presentBots) { o.managedRooms[roomID] = presentBots o.logger.Info("updated room participants", "room", roomID, "bots", presentBots) } } else { if _, was := o.managedRooms[roomID]; was { delete(o.managedRooms, roomID) o.logger.Info("stopped managing room", "room", roomID, "remaining_bots", len(presentBots)) } } } // NotifyMembership is called by bot listeners when a room membership changes. // It re-evaluates whether the room should be auto-managed. 
func (o *Orchestrator) NotifyMembership(ctx context.Context, roomID, userID, membership string) { o.mu.RLock() _, isBot := o.knownBotIDs[userID] o.mu.RUnlock() if !isBot { return // only care about bot membership changes } o.logger.Debug("bot membership change, re-evaluating room", "room", roomID, "user", userID, "membership", membership) go o.evaluateRoom(ctx, roomID) } // ShouldIntercept returns true if the room is managed by this orchestrator. func (o *Orchestrator) ShouldIntercept(roomID string) bool { o.mu.RLock() _, ok := o.managedRooms[roomID] o.mu.RUnlock() return ok } // Intercept is the InterceptFunc used by bot listeners. It checks if the // room is managed and, if so, starts the orchestration pipeline asynchronously. // Returns true if the event was intercepted (all bots in the room should return true, // but only the first one triggers actual routing — the rest are deduped). func (o *Orchestrator) Intercept(ctx context.Context, msgCtx decision.MessageContext) bool { if !o.ShouldIntercept(msgCtx.RoomID) { return false } // Ignore messages from known bots to prevent feedback loops. o.mu.RLock() _, senderIsBot := o.knownBotIDs[msgCtx.SenderID] o.mu.RUnlock() if senderIsBot { return true // suppress but don't route — bot's own message } // Dedup: multiple bots receive the same event. Only route once. key := msgCtx.RoomID + ":" + msgCtx.SenderID + ":" + msgCtx.Content o.seenMu.Lock() if o.seen[key] { o.seenMu.Unlock() return true // still intercept (don't let the bot handle it) but don't route again } o.seen[key] = true o.seenMu.Unlock() // Route asynchronously so the listener isn't blocked. // Clean up the dedup key after routing completes. go func() { defer func() { o.seenMu.Lock() delete(o.seen, key) o.seenMu.Unlock() }() if err := o.Route(ctx, msgCtx); err != nil { o.logger.Error("orchestration failed", "room", msgCtx.RoomID, "err", err) } }() return true } // Route is the main entry point. Called when a human posts in a managed room. 
// It decides which bot(s) should respond and dispatches tasks via the bus. func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext) error { o.mu.RLock() participants, ok := o.managedRooms[msgCtx.RoomID] participantsCopy := append([]string(nil), participants...) o.mu.RUnlock() if !ok { return fmt.Errorf("room %s is not managed", msgCtx.RoomID) } o.logger.Info("orchestrating message", "room", msgCtx.RoomID, "sender", msgCtx.SenderID, "participants", participantsCopy, "content_preview", truncate(msgCtx.Content, 80), ) // Optimization: single bot → dispatch directly without LLM if len(participantsCopy) == 1 { o.logger.Debug("single participant, dispatching directly", "bot", participantsCopy[0]) _, err := o.dispatchAndWait(ctx, participantsCopy[0], msgCtx, 0, nil) return err } var responses []orchestration.BotResponse var lastBot string maxIter := o.cfg.Orchestration.MaxIterations if maxIter <= 0 { maxIter = 3 } for i := 0; i < maxIter; i++ { // Route: decide which bot responds var target string var err error if i == 0 { rd, routeErr := o.routeInitial(ctx, msgCtx.Content, participantsCopy) if routeErr != nil { o.logger.Error("routing failed, falling back to first participant", "err", routeErr) target = participantsCopy[0] } else { target = rd.TargetBotID o.logger.Info("routed to bot", "bot", target, "confidence", rd.Confidence, "reason", rd.Reason, "iteration", i, ) } } else { rd, routeErr := o.routeRefinement(ctx, msgCtx.Content, responses, participantsCopy, lastBot) if routeErr != nil { o.logger.Warn("refinement routing failed, stopping pipeline", "err", routeErr) break } target = rd.TargetBotID o.logger.Info("refinement routed to bot", "bot", target, "reason", rd.Reason, "iteration", i, ) } // Dispatch: send TaskEvent to bot via bus and wait for response response, err := o.dispatchAndWait(ctx, target, msgCtx, i, responses) if err != nil { o.logger.Error("dispatch failed", "bot", target, "err", err) break } responses = append(responses, 
response) lastBot = target o.logger.Info("bot responded", "bot", target, "response_len", len(response.Text), "iteration", i, ) // Fallback: detect circular conversations before quality evaluation if o.detectRepetition(responses) { o.logger.Warn("repetition detected, stopping pipeline to prevent circular conversation", "iteration", i+1, "total_responses", len(responses), ) break } // Evaluate quality (Fase 3) score := o.evaluate(ctx, msgCtx.Content, response) o.logger.Info("quality evaluated", "score", score.Score, "continue", score.Continue, "reason", score.Reason, "iteration", i, ) if score.Score >= o.cfg.Orchestration.QualityThreshold || !score.Continue { o.logger.Info("pipeline complete", "iterations", i+1, "final_score", score.Score, ) break } } return nil } // dispatchAndWait sends a TaskEvent to a bot and waits for its response. func (o *Orchestrator) dispatchAndWait( ctx context.Context, botID string, msgCtx decision.MessageContext, iteration int, previousResponses []orchestration.BotResponse, ) (orchestration.BotResponse, error) { taskID := fmt.Sprintf("orch-%s-%s-%d", msgCtx.RoomID, botID, iteration) task := orchestration.TaskEvent{ TaskID: taskID, TargetBotID: botID, TargetRoomID: msgCtx.RoomID, OriginalSender: msgCtx.SenderID, OriginalQuestion: msgCtx.Content, Iteration: iteration, PreviousResponses: previousResponses, } taskJSON, err := orchestration.MarshalTaskEvent(task) if err != nil { return orchestration.BotResponse{}, fmt.Errorf("marshal task: %w", err) } msg := bus.AgentMessage{ From: bus.AgentID(o.cfg.Special.ID), To: bus.AgentID(botID), Kind: bus.KindTask, Payload: map[string]string{"task_json": taskJSON}, } timeout := o.cfg.Orchestration.DelegationTimeout if timeout <= 0 { timeout = 30_000_000_000 // 30s default } reply, err := o.bus.SendAndWait(ctx, msg, taskID, timeout) if err != nil { return orchestration.BotResponse{}, err } resultJSON, ok := reply.Payload["result_json"] if !ok { return orchestration.BotResponse{}, fmt.Errorf("reply 
missing result_json") } result, err := orchestration.UnmarshalTaskResult(resultJSON) if err != nil { return orchestration.BotResponse{}, fmt.Errorf("unmarshal result: %w", err) } if result.Error != "" { return orchestration.BotResponse{}, fmt.Errorf("bot %s error: %s", botID, result.Error) } return orchestration.BotResponse{ BotID: botID, Text: result.Text, }, nil } // loadPrompts reads the orchestrator's prompt files. func (o *Orchestrator) loadPrompts() error { base := filepath.Join("agents", "specials", "orchestrator", "prompts") routing, err := os.ReadFile(filepath.Join(base, "routing.md")) if err != nil { return fmt.Errorf("routing prompt: %w", err) } o.routingPrompt = string(routing) quality, err := os.ReadFile(filepath.Join(base, "quality.md")) if err != nil { return fmt.Errorf("quality prompt: %w", err) } o.qualityPrompt = string(quality) refinement, err := os.ReadFile(filepath.Join(base, "refinement.md")) if err != nil { return fmt.Errorf("refinement prompt: %w", err) } o.refinementPrompt = string(refinement) return nil } // buildParticipantsList formats participant info for LLM prompts. func (o *Orchestrator) buildParticipantsList(botIDs []string, exclude string) string { o.mu.RLock() defer o.mu.RUnlock() var sb strings.Builder for _, id := range botIDs { if id == exclude { continue } info, ok := o.participants[id] if !ok { sb.WriteString(fmt.Sprintf("- %s: (no description available)\n", id)) continue } caps := "" if len(info.Capabilities) > 0 { caps = fmt.Sprintf(" (capabilities: %s)", strings.Join(info.Capabilities, ", ")) } sb.WriteString(fmt.Sprintf("- %s: %s%s\n", info.ID, info.Description, caps)) } return sb.String() } // detectRepetition checks if a new response is too similar to previous responses, // indicating a circular conversation that should be stopped. // Returns true if the conversation should be terminated. 
func (o *Orchestrator) detectRepetition(responses []orchestration.BotResponse) bool { if len(responses) < 2 { return false } threshold := o.cfg.Orchestration.RepetitionThreshold if threshold <= 0 { threshold = 0.6 // default } latest := responses[len(responses)-1].Text for i := 0; i < len(responses)-1; i++ { if similarity(latest, responses[i].Text) >= threshold { return true } } return false } // similarity computes a simple bigram-based similarity ratio between two strings. // Returns a value between 0.0 (completely different) and 1.0 (identical). func similarity(a, b string) float64 { if a == b { return 1.0 } a = strings.ToLower(strings.TrimSpace(a)) b = strings.ToLower(strings.TrimSpace(b)) if a == b { return 1.0 } if len(a) < 2 || len(b) < 2 { return 0.0 } bigramsA := makeBigrams(a) bigramsB := makeBigrams(b) // Count intersection intersection := 0 for bg, countA := range bigramsA { if countB, ok := bigramsB[bg]; ok { if countA < countB { intersection += countA } else { intersection += countB } } } totalA := 0 for _, c := range bigramsA { totalA += c } totalB := 0 for _, c := range bigramsB { totalB += c } if totalA+totalB == 0 { return 0.0 } return float64(2*intersection) / float64(totalA+totalB) } func makeBigrams(s string) map[string]int { runes := []rune(s) bgs := make(map[string]int, len(runes)) for i := 0; i < len(runes)-1; i++ { bg := string(runes[i : i+2]) bgs[bg]++ } return bgs } func truncate(s string, n int) string { runes := []rune(s) if len(runes) <= n { return s } return string(runes[:n]) + "..." }