feat: implement multi-bot orchestration system with LLM routing

Implementa el sistema de orquestación para salas Matrix con múltiples bots.
El orquestador es un "special agent" sin identidad Matrix que coordina qué bot
responde y cuándo, usando LLM (Claude) para routing y evaluación de calidad.

Cambios principales:
- pkg/orchestration/task.go: tipos puros (TaskEvent, BotResponse, QualityScore, RoutingDecision)
- shell/orchestration/: runtime del orquestador (orchestrator.go, router.go, evaluator.go)
- agents/specials/orchestrator/: config + prompts (routing, quality, refinement)
- internal/config/: SpecialConfig, OrchestrationCfg, LoadSpecial()
- shell/bus/bus.go: protocolo request-reply (SendAndWait, Reply) para delegación
- shell/matrix/listener.go: InterceptFunc para interceptar eventos en salas orquestadas
- agents/runtime.go: SetBus, listenBus, handleTaskEvent para recibir tareas del orquestador
- cmd/launcher/main.go: creación de bus compartido, arranque del orquestador antes de bots

Incluye deduplicación para evitar que múltiples listeners en la misma sala
disparen el orquestador más de una vez por mensaje.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 09:05:42 +00:00
parent 6bef4283c6
commit 2667af52cc
14 changed files with 1001 additions and 7 deletions
+143
View File
@@ -16,7 +16,9 @@ import (
"github.com/enmanuel/agents/pkg/decision"
coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/pkg/memory"
"github.com/enmanuel/agents/pkg/orchestration"
"github.com/enmanuel/agents/pkg/personality"
"github.com/enmanuel/agents/shell/bus"
"github.com/enmanuel/agents/shell/effects"
shelllm "github.com/enmanuel/agents/shell/llm"
"github.com/enmanuel/agents/shell/matrix"
@@ -49,6 +51,9 @@ type Agent struct {
memStore memory.Store // nil when memory is disabled
windowSize int
roomCtx *tools.RoomContext
// Bus — set via SetBus() when running under the unified launcher
agentBus *bus.Bus
}
// ClearWindow resets the conversation window for a room and deletes persisted
@@ -181,6 +186,17 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*
return a, nil
}
// SetBus attaches the agent to the inter-agent bus for orchestration.
// Must be called before Run().
func (a *Agent) SetBus(b *bus.Bus) {
a.agentBus = b
}
// SetInterceptor configures the listener to skip events in orchestrated rooms.
func (a *Agent) SetInterceptor(fn matrix.InterceptFunc) {
a.listener.SetInterceptor(fn)
}
// Run starts the agent sync loop. Blocks until ctx is cancelled.
func (a *Agent) Run(ctx context.Context) error {
if a.cryptoStore != nil {
@@ -194,9 +210,136 @@ func (a *Agent) Run(ctx context.Context) error {
"name", a.cfg.Agent.Name,
"tools", a.toolReg.Names(),
)
// Start bus listener if connected to the orchestration bus
if a.agentBus != nil {
ch := a.agentBus.Subscribe(bus.AgentID(a.cfg.Agent.ID))
go a.listenBus(ctx, ch)
a.logger.Info("bus listener started")
}
return a.listener.Run(ctx)
}
// listenBus processes messages from the inter-agent bus.
func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) {
for {
select {
case <-ctx.Done():
return
case msg, ok := <-ch:
if !ok {
return
}
if msg.Kind == bus.KindTask {
a.handleTaskEvent(ctx, msg)
}
}
}
}
// handleTaskEvent processes a task delegated by the orchestrator.
// The bot generates a response and sends it both to Matrix and back via bus.
func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
taskJSON, ok := msg.Payload["task_json"]
if !ok {
a.logger.Error("task message missing task_json payload")
return
}
task, err := orchestration.UnmarshalTaskEvent(taskJSON)
if err != nil {
a.logger.Error("failed to unmarshal task event", "err", err)
return
}
a.logger.Info("handling orchestrated task",
"task_id", task.TaskID,
"room", task.TargetRoomID,
"sender", task.OriginalSender,
"iteration", task.Iteration,
)
roomID := task.TargetRoomID
// Update room context for memory tools
a.roomCtx.Set(roomID)
if a.cfg.Personality.Behavior.TypingIndicator {
_ = a.matrix.SendTyping(ctx, roomID, true)
defer a.matrix.SendTyping(ctx, roomID, false)
}
// Build a synthetic MessageContext from the task
msgCtx := decision.MessageContext{
SenderID: task.OriginalSender,
RoomID: roomID,
Content: task.OriginalQuestion,
IsDirectMsg: false,
IsMention: true, // treat orchestrated tasks like mentions
}
// If there are previous responses, prepend context
if len(task.PreviousResponses) > 0 {
var context string
for _, pr := range task.PreviousResponses {
context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text)
}
msgCtx.Content = context + "Original question: " + task.OriginalQuestion +
"\n\nPlease provide an improved or complementary answer."
}
// Load memory and run LLM
a.ensureWindowLoaded(ctx, roomID)
a.appendToWindow(roomID, coretypes.Message{
Role: coretypes.RoleUser, Content: msgCtx.Content,
})
reply, err := a.runLLM(ctx, msgCtx)
// Build the result to send back via bus
result := orchestration.TaskResult{
TaskID: task.TaskID,
BotID: a.cfg.Agent.ID,
}
if err != nil {
a.logger.Error("LLM error during orchestrated task", "err", err)
result.Error = err.Error()
reply = "Sorry, I encountered an error."
} else {
result.Text = reply
// Persist assistant reply
a.appendToWindow(roomID, coretypes.Message{
Role: coretypes.RoleAssistant, Content: reply,
})
a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
}
// Send reply to Matrix room
if sendErr := a.matrix.SendText(ctx, roomID, reply); sendErr != nil {
a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr)
}
// Send result back to orchestrator via bus
resultJSON, marshalErr := orchestration.MarshalTaskResult(result)
if marshalErr != nil {
a.logger.Error("failed to marshal task result", "err", marshalErr)
return
}
replyMsg := bus.AgentMessage{
From: bus.AgentID(a.cfg.Agent.ID),
To: msg.From,
Kind: bus.KindTaskResult,
Payload: map[string]string{"result_json": resultJSON},
}
if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil {
a.logger.Error("failed to send task result via bus", "err", busErr)
}
}
// handleEvent is called by the matrix Listener for each filtered incoming event.
func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) {
a.logger.Debug("handling event",
+23
View File
@@ -0,0 +1,23 @@
special:
id: orchestrator
type: orchestrator
enabled: true
description: "Middleware de coordinación multi-bot. Sin identidad Matrix."
llm:
primary:
provider: anthropic
model: claude-sonnet-4-6
api_key_env: ANTHROPIC_API_KEY
max_tokens: 512
temperature: 0.2
orchestration:
max_iterations: 3
quality_threshold: 0.8
delegation_timeout: 30s
rooms:
- room_id: "${MATRIX_ROOM_SHARED}"
participants:
- assistant-bot
- asistente-2
@@ -0,0 +1,11 @@
You are a quality evaluator for AI agent responses. Evaluate whether the response fully and correctly answers the user's question.
Criteria:
- Accuracy: Is the information correct?
- Completeness: Does it address all parts of the question?
- Usefulness: Is the response actionable and helpful?
Respond ONLY with valid JSON (no markdown, no extra text):
{"score": <0.0-1.0>, "continue": <true|false>, "reason": "<brief explanation>"}
Set "continue" to true only if the response is clearly incomplete or incorrect and another agent could do better.
@@ -0,0 +1,10 @@
The previous response needs improvement. Choose the best agent to complement or improve the answer.
Available agents (the previous respondent has been excluded):
{{PARTICIPANTS}}
Previous response that needs improvement:
{{LAST_RESPONSE}}
Respond ONLY with valid JSON (no markdown, no extra text):
{"bot_id": "<agent_id>", "reason": "<brief explanation of why this agent can improve>"}
@@ -0,0 +1,9 @@
You are an AI agent coordinator. Your job is to decide which agent should respond to a user's question.
Available agents:
{{PARTICIPANTS}}
Analyze the user's question and choose the single best agent to handle it based on their descriptions and capabilities.
Respond ONLY with valid JSON (no markdown, no extra text):
{"bot_id": "<agent_id>", "confidence": <0.0-1.0>, "reason": "<brief explanation>"}
+68
View File
@@ -22,6 +22,9 @@ import (
asistente2agent "github.com/enmanuel/agents/agents/asistente-2"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/decision"
"github.com/enmanuel/agents/pkg/orchestration"
"github.com/enmanuel/agents/shell/bus"
orchshell "github.com/enmanuel/agents/shell/orchestration"
)
// rulesRegistry maps agent IDs to their rule factories.
@@ -58,6 +61,21 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// ── Shared bus for inter-agent communication ──
agentBus := bus.New()
// ── Start special agents (orchestrator, etc.) BEFORE normal bots ──
orch, err := startOrchestrator(agentBus, logger)
if err != nil {
// Non-fatal: orchestration is optional
logger.Warn("orchestrator not started", "err", err)
} else {
logger.Info("orchestrator ready",
"managed_rooms", len(orch.cfg.Orchestration.Rooms),
)
}
// ── Start normal agents ──
var wg sync.WaitGroup
for _, path := range configPaths {
path := path
@@ -80,6 +98,25 @@ func main() {
continue
}
// Connect agent to bus for orchestration
a.SetBus(agentBus)
// If orchestrator is active, set interceptor so bots don't
// handle events directly in orchestrated rooms.
// The first bot's listener to receive the event will trigger orchestration.
if orch != nil {
a.SetInterceptor(orch.orchestrator.Intercept)
}
// Register this agent as a participant in the orchestrator
if orch != nil {
orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{
ID: cfg.Agent.ID,
Description: cfg.Agent.Description,
Capabilities: cfg.Agent.Tags,
})
}
wg.Add(1)
go func() {
defer wg.Done()
@@ -106,6 +143,37 @@ func main() {
}
}
// orchHandle wraps a running orchestrator with its config for the launcher.
type orchHandle struct {
orchestrator *orchshell.Orchestrator
cfg *config.SpecialConfig
}
// startOrchestrator scans agents/specials/orchestrator/config.yaml and
// initializes the orchestrator if found and enabled.
func startOrchestrator(agentBus *bus.Bus, logger *slog.Logger) (*orchHandle, error) {
cfgPath := filepath.Join("agents", "specials", "orchestrator", "config.yaml")
if _, err := os.Stat(cfgPath); os.IsNotExist(err) {
return nil, err
}
cfg, err := config.LoadSpecial(cfgPath)
if err != nil {
return nil, err
}
if !cfg.Special.Enabled {
return nil, nil
}
orchLogger := logger.With("component", "orchestrator")
orch, err := orchshell.New(cfg, agentBus, orchLogger)
if err != nil {
return nil, err
}
return &orchHandle{orchestrator: orch, cfg: cfg}, nil
}
func rulesFor(agentID string, logger *slog.Logger) []decision.Rule {
factory, ok := rulesRegistry[agentID]
if !ok {
+36
View File
@@ -47,6 +47,42 @@ func LoadMeta(path string) (*AgentConfig, error) {
return &cfg, nil
}
// LoadSpecial reads and parses a special agent config file.
// Special agents have no Matrix identity so validation is lighter.
func LoadSpecial(path string) (*SpecialConfig, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("read special config %s: %w", path, err)
}
expanded := os.ExpandEnv(string(data))
var cfg SpecialConfig
if err := yaml.Unmarshal([]byte(expanded), &cfg); err != nil {
return nil, fmt.Errorf("parse special config %s: %w", path, err)
}
if err := validateSpecial(&cfg); err != nil {
return nil, fmt.Errorf("invalid special config %s: %w", path, err)
}
return &cfg, nil
}
// validateSpecial applies sanity checks for special agent configs.
func validateSpecial(cfg *SpecialConfig) error {
if cfg.Special.ID == "" {
return fmt.Errorf("special.id is required")
}
if cfg.Special.Type == "" {
return fmt.Errorf("special.type is required")
}
if cfg.LLM.Primary.Provider == "" {
return fmt.Errorf("llm.primary.provider is required")
}
return nil
}
// validate applies basic sanity checks.
func validate(cfg *AgentConfig) error {
if cfg.Agent.ID == "" {
+31
View File
@@ -398,3 +398,34 @@ type MemoryCfg struct {
type MemoryToolCfg struct {
Enabled bool `yaml:"enabled"`
}
// ── Special Agents ────────────────────────────────────────────────────────
// SpecialConfig is the root configuration for a special agent (no Matrix identity).
type SpecialConfig struct {
Special SpecialMeta `yaml:"special"`
LLM LLMCfg `yaml:"llm"`
Orchestration OrchestrationCfg `yaml:"orchestration"`
}
// SpecialMeta identifies a special agent.
type SpecialMeta struct {
ID string `yaml:"id"`
Type string `yaml:"type"` // "orchestrator", "scheduler", etc.
Enabled bool `yaml:"enabled"`
Description string `yaml:"description"`
}
// OrchestrationCfg configures the multi-bot orchestrator.
type OrchestrationCfg struct {
MaxIterations int `yaml:"max_iterations"`
QualityThreshold float64 `yaml:"quality_threshold"`
DelegationTimeout time.Duration `yaml:"delegation_timeout"`
Rooms []OrchestratedRoomCfg `yaml:"rooms"`
}
// OrchestratedRoomCfg defines a room managed by the orchestrator.
type OrchestratedRoomCfg struct {
RoomID string `yaml:"room_id"`
Participants []string `yaml:"participants"` // bot IDs that participate in this room
}
+91
View File
@@ -0,0 +1,91 @@
// Package orchestration defines pure types for multi-bot coordination.
// Zero side effects — only data structures and helpers.
package orchestration
import "encoding/json"
// TaskEvent is sent by the orchestrator to a bot via the bus.
// It tells the bot: "answer this question in this room with this context."
type TaskEvent struct {
TaskID string `json:"task_id"`
TargetBotID string `json:"target_bot_id"`
TargetRoomID string `json:"target_room_id"`
OriginalSender string `json:"original_sender"`
OriginalQuestion string `json:"original_question"`
Iteration int `json:"iteration"`
PreviousResponses []BotResponse `json:"previous_responses,omitempty"`
RoomContext []ContextMessage `json:"room_context,omitempty"`
}
// BotResponse is a bot's reply to a TaskEvent.
type BotResponse struct {
BotID string `json:"bot_id"`
Text string `json:"text"`
}
// TaskResult is sent by a bot back to the orchestrator via the bus.
type TaskResult struct {
TaskID string `json:"task_id"`
BotID string `json:"bot_id"`
Text string `json:"text"`
Error string `json:"error,omitempty"`
}
// QualityScore is the LLM's evaluation of a bot's response.
type QualityScore struct {
Score float64 `json:"score"` // 0.01.0
Continue bool `json:"continue"` // should the pipeline continue?
Reason string `json:"reason"`
}
// RoutingDecision is the LLM's choice of which bot should respond.
type RoutingDecision struct {
TargetBotID string `json:"bot_id"`
Confidence float64 `json:"confidence"`
Reason string `json:"reason"`
}
// ContextMessage is a single message from the room's recent history.
type ContextMessage struct {
SenderID string `json:"sender_id"`
Content string `json:"content"`
}
// ParticipantInfo describes a bot available for routing.
type ParticipantInfo struct {
ID string `json:"id"`
Description string `json:"description"`
Capabilities []string `json:"capabilities,omitempty"`
}
// MarshalTaskEvent serializes a TaskEvent to JSON for bus transport.
func MarshalTaskEvent(t TaskEvent) (string, error) {
b, err := json.Marshal(t)
if err != nil {
return "", err
}
return string(b), nil
}
// UnmarshalTaskEvent deserializes a TaskEvent from JSON.
func UnmarshalTaskEvent(data string) (TaskEvent, error) {
var t TaskEvent
err := json.Unmarshal([]byte(data), &t)
return t, err
}
// MarshalTaskResult serializes a TaskResult to JSON for bus transport.
func MarshalTaskResult(r TaskResult) (string, error) {
b, err := json.Marshal(r)
if err != nil {
return "", err
}
return string(b), nil
}
// UnmarshalTaskResult deserializes a TaskResult from JSON.
func UnmarshalTaskResult(data string) (TaskResult, error) {
var r TaskResult
err := json.Unmarshal([]byte(data), &r)
return r, err
}
+66 -1
View File
@@ -2,8 +2,16 @@
package bus
import (
"context"
"fmt"
"sync"
"time"
)
// Well-known message kinds used by the orchestrator.
const (
KindTask = "task" // orchestrator → bot: handle this question
KindTaskResult = "task_result" // bot → orchestrator: here is my answer
)
// AgentID identifies an agent.
@@ -21,11 +29,17 @@ type AgentMessage struct {
type Bus struct {
mu sync.RWMutex
channels map[AgentID]chan AgentMessage
replyMu sync.Mutex
replyChs map[string]chan AgentMessage // taskID → one-shot reply channel
}
// New creates a new Bus.
func New() *Bus {
return &Bus{channels: make(map[AgentID]chan AgentMessage)}
return &Bus{
channels: make(map[AgentID]chan AgentMessage),
replyChs: make(map[string]chan AgentMessage),
}
}
// Subscribe registers an agent and returns its receive channel.
@@ -53,6 +67,57 @@ func (b *Bus) Send(msg AgentMessage) error {
}
}
// SendAndWait sends a task message and blocks until a reply with the matching
// taskID arrives or the context expires. The caller must ensure the reply is
// routed via Reply().
func (b *Bus) SendAndWait(ctx context.Context, msg AgentMessage, taskID string, timeout time.Duration) (AgentMessage, error) {
ch := make(chan AgentMessage, 1)
b.replyMu.Lock()
b.replyChs[taskID] = ch
b.replyMu.Unlock()
defer func() {
b.replyMu.Lock()
delete(b.replyChs, taskID)
b.replyMu.Unlock()
}()
if err := b.Send(msg); err != nil {
return AgentMessage{}, err
}
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case reply := <-ch:
return reply, nil
case <-timer.C:
return AgentMessage{}, fmt.Errorf("task %s: delegation timeout after %s", taskID, timeout)
case <-ctx.Done():
return AgentMessage{}, ctx.Err()
}
}
// Reply routes a task_result message to the waiting SendAndWait caller.
// If no one is waiting for this taskID, it falls back to regular Send.
func (b *Bus) Reply(taskID string, msg AgentMessage) error {
b.replyMu.Lock()
ch, ok := b.replyChs[taskID]
b.replyMu.Unlock()
if ok {
select {
case ch <- msg:
return nil
default:
return fmt.Errorf("reply channel full for task %s", taskID)
}
}
// Fallback: deliver via regular channel
return b.Send(msg)
}
// Unsubscribe removes an agent from the bus.
func (b *Bus) Unsubscribe(id AgentID) {
b.mu.Lock()
+26 -6
View File
@@ -18,14 +18,20 @@ import (
// EventHandler is called for each incoming Matrix message that passes filters.
type EventHandler func(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event)
// InterceptFunc is called for events in orchestrated rooms.
// It receives the parsed MessageContext. If it returns true, the event is not
// delivered to the bot's normal handler (the orchestrator handles it instead).
type InterceptFunc func(ctx context.Context, msgCtx decision.MessageContext) bool
// Listener attaches to a mautrix syncer and dispatches events to an EventHandler.
type Listener struct {
client *Client
cfg config.MatrixCfg
handler EventHandler
logger *slog.Logger
dmCache map[id.RoomID]bool
mu sync.RWMutex
client *Client
cfg config.MatrixCfg
handler EventHandler
logger *slog.Logger
dmCache map[id.RoomID]bool
mu sync.RWMutex
interceptor InterceptFunc // if set and returns true, event is forwarded to orchestrator
}
// NewListener creates a Listener for the given client.
@@ -39,6 +45,13 @@ func NewListener(client *Client, cfg config.MatrixCfg, handler EventHandler, log
}
}
// SetInterceptor registers a function that can intercept event delivery.
// If the function returns true, the event is handled by the orchestrator
// and not delivered to the bot's normal handler.
func (l *Listener) SetInterceptor(fn InterceptFunc) {
l.interceptor = fn
}
// Run starts the Matrix sync loop. Blocks until ctx is cancelled.
func (l *Listener) Run(ctx context.Context) error {
syncer := l.client.raw.Syncer.(*mautrix.DefaultSyncer)
@@ -112,6 +125,13 @@ func (l *Listener) Run(ctx context.Context) error {
"content_preview", truncate(msgCtx.Content, 80),
)
// Orchestrator intercept: if this room is managed, the orchestrator
// handles routing instead of the bot's normal handler.
if l.interceptor != nil && l.interceptor(ctx, msgCtx) {
l.logger.Debug("event intercepted by orchestrator", "room", evt.RoomID)
return
}
go l.handler(ctx, msgCtx, evt)
})
+48
View File
@@ -0,0 +1,48 @@
package orchestration
import (
"context"
"encoding/json"
"fmt"
"strings"
coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/pkg/orchestration"
)
// evaluate asks the LLM to score the quality of a bot's response.
func (o *Orchestrator) evaluate(ctx context.Context, question string, response orchestration.BotResponse) orchestration.QualityScore {
userContent := fmt.Sprintf("Question: %s\n\nResponse from %s:\n%s", question, response.BotID, response.Text)
resp, err := o.llm(ctx, coretypes.CompletionRequest{
Model: o.cfg.LLM.Primary.Model,
MaxTokens: o.cfg.LLM.Primary.MaxTokens,
Temperature: o.cfg.LLM.Primary.Temperature,
SystemPrompt: o.qualityPrompt,
Messages: []coretypes.Message{
{Role: coretypes.RoleUser, Content: userContent},
},
})
if err != nil {
o.logger.Error("quality evaluation LLM call failed", "err", err)
// On LLM failure, assume quality is good enough to stop the pipeline
return orchestration.QualityScore{
Score: 1.0,
Continue: false,
Reason: fmt.Sprintf("evaluation failed: %s, assuming good quality", err),
}
}
var qs orchestration.QualityScore
if err := json.Unmarshal([]byte(strings.TrimSpace(resp.Content)), &qs); err != nil {
o.logger.Warn("failed to parse quality score", "content", resp.Content, "err", err)
// On parse failure, assume good quality
return orchestration.QualityScore{
Score: 1.0,
Continue: false,
Reason: fmt.Sprintf("parse failed: %s", err),
}
}
return qs
}
+332
View File
@@ -0,0 +1,332 @@
// Package orchestration implements the multi-bot orchestrator runtime.
// The orchestrator intercepts Matrix events in managed rooms and coordinates
// which bot responds via the in-process bus.
package orchestration
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/decision"
coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/pkg/orchestration"
"github.com/enmanuel/agents/shell/bus"
shelllm "github.com/enmanuel/agents/shell/llm"
)
// Orchestrator coordinates multi-bot rooms. It has no Matrix identity —
// it intercepts events before they reach bots and delegates via the bus.
type Orchestrator struct {
cfg *config.SpecialConfig
llm coretypes.CompleteFunc
bus *bus.Bus
managedRooms map[string][]string // roomID → []botID
participants map[string]orchestration.ParticipantInfo // botID → info
logger *slog.Logger
// Prompts loaded from files
routingPrompt string
qualityPrompt string
refinementPrompt string
// Dedup: multiple bots in the same room will each trigger Intercept().
// We use a set of "room:sender:content" keys to ensure only one fires.
seenMu sync.Mutex
seen map[string]bool
}
// New creates an Orchestrator from its config.
func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Orchestrator, error) {
llmFunc, err := shelllm.FromConfig(cfg.LLM.Primary)
if err != nil {
return nil, fmt.Errorf("orchestrator LLM: %w", err)
}
managed := make(map[string][]string)
for _, room := range cfg.Orchestration.Rooms {
managed[room.RoomID] = room.Participants
}
o := &Orchestrator{
cfg: cfg,
llm: llmFunc,
bus: agentBus,
managedRooms: managed,
participants: make(map[string]orchestration.ParticipantInfo),
logger: logger,
seen: make(map[string]bool),
}
if err := o.loadPrompts(); err != nil {
return nil, fmt.Errorf("load prompts: %w", err)
}
return o, nil
}
// RegisterParticipant adds bot metadata used for LLM routing decisions.
func (o *Orchestrator) RegisterParticipant(info orchestration.ParticipantInfo) {
o.participants[info.ID] = info
o.logger.Debug("registered participant", "bot", info.ID, "desc", info.Description)
}
// ShouldIntercept returns true if the room is managed by this orchestrator.
func (o *Orchestrator) ShouldIntercept(roomID string) bool {
_, ok := o.managedRooms[roomID]
return ok
}
// Intercept is the InterceptFunc used by bot listeners. It checks if the
// room is managed and, if so, starts the orchestration pipeline asynchronously.
// Returns true if the event was intercepted (all bots in the room should return true,
// but only the first one triggers actual routing — the rest are deduped).
func (o *Orchestrator) Intercept(ctx context.Context, msgCtx decision.MessageContext) bool {
if !o.ShouldIntercept(msgCtx.RoomID) {
return false
}
// Dedup: multiple bots receive the same event. Only route once.
key := msgCtx.RoomID + ":" + msgCtx.SenderID + ":" + msgCtx.Content
o.seenMu.Lock()
if o.seen[key] {
o.seenMu.Unlock()
return true // still intercept (don't let the bot handle it) but don't route again
}
o.seen[key] = true
o.seenMu.Unlock()
// Route asynchronously so the listener isn't blocked.
// Clean up the dedup key after routing completes.
go func() {
defer func() {
o.seenMu.Lock()
delete(o.seen, key)
o.seenMu.Unlock()
}()
if err := o.Route(ctx, msgCtx); err != nil {
o.logger.Error("orchestration failed", "room", msgCtx.RoomID, "err", err)
}
}()
return true
}
// Route is the main entry point. Called when a human posts in a managed room.
// It decides which bot(s) should respond and dispatches tasks via the bus.
func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext) error {
participants, ok := o.managedRooms[msgCtx.RoomID]
if !ok {
return fmt.Errorf("room %s is not managed", msgCtx.RoomID)
}
o.logger.Info("orchestrating message",
"room", msgCtx.RoomID,
"sender", msgCtx.SenderID,
"participants", participants,
"content_preview", truncate(msgCtx.Content, 80),
)
// Optimization: single bot → dispatch directly without LLM
if len(participants) == 1 {
o.logger.Debug("single participant, dispatching directly", "bot", participants[0])
_, err := o.dispatchAndWait(ctx, participants[0], msgCtx, 0, nil)
return err
}
var responses []orchestration.BotResponse
var lastBot string
maxIter := o.cfg.Orchestration.MaxIterations
if maxIter <= 0 {
maxIter = 3
}
for i := 0; i < maxIter; i++ {
// Route: decide which bot responds
var target string
var err error
if i == 0 {
rd, routeErr := o.routeInitial(ctx, msgCtx.Content, participants)
if routeErr != nil {
o.logger.Error("routing failed, falling back to first participant", "err", routeErr)
target = participants[0]
} else {
target = rd.TargetBotID
o.logger.Info("routed to bot",
"bot", target,
"confidence", rd.Confidence,
"reason", rd.Reason,
"iteration", i,
)
}
} else {
rd, routeErr := o.routeRefinement(ctx, msgCtx.Content, responses, participants, lastBot)
if routeErr != nil {
o.logger.Warn("refinement routing failed, stopping pipeline", "err", routeErr)
break
}
target = rd.TargetBotID
o.logger.Info("refinement routed to bot",
"bot", target,
"reason", rd.Reason,
"iteration", i,
)
}
// Dispatch: send TaskEvent to bot via bus and wait for response
response, err := o.dispatchAndWait(ctx, target, msgCtx, i, responses)
if err != nil {
o.logger.Error("dispatch failed", "bot", target, "err", err)
break
}
responses = append(responses, response)
lastBot = target
o.logger.Info("bot responded",
"bot", target,
"response_len", len(response.Text),
"iteration", i,
)
// Evaluate quality (Fase 3)
score := o.evaluate(ctx, msgCtx.Content, response)
o.logger.Info("quality evaluated",
"score", score.Score,
"continue", score.Continue,
"reason", score.Reason,
"iteration", i,
)
if score.Score >= o.cfg.Orchestration.QualityThreshold || !score.Continue {
o.logger.Info("pipeline complete",
"iterations", i+1,
"final_score", score.Score,
)
break
}
}
return nil
}
// dispatchAndWait sends a TaskEvent to a bot and waits for its response.
func (o *Orchestrator) dispatchAndWait(
ctx context.Context,
botID string,
msgCtx decision.MessageContext,
iteration int,
previousResponses []orchestration.BotResponse,
) (orchestration.BotResponse, error) {
taskID := fmt.Sprintf("orch-%s-%s-%d", msgCtx.RoomID, botID, iteration)
task := orchestration.TaskEvent{
TaskID: taskID,
TargetBotID: botID,
TargetRoomID: msgCtx.RoomID,
OriginalSender: msgCtx.SenderID,
OriginalQuestion: msgCtx.Content,
Iteration: iteration,
PreviousResponses: previousResponses,
}
taskJSON, err := orchestration.MarshalTaskEvent(task)
if err != nil {
return orchestration.BotResponse{}, fmt.Errorf("marshal task: %w", err)
}
msg := bus.AgentMessage{
From: bus.AgentID(o.cfg.Special.ID),
To: bus.AgentID(botID),
Kind: bus.KindTask,
Payload: map[string]string{"task_json": taskJSON},
}
timeout := o.cfg.Orchestration.DelegationTimeout
if timeout <= 0 {
timeout = 30_000_000_000 // 30s default
}
reply, err := o.bus.SendAndWait(ctx, msg, taskID, timeout)
if err != nil {
return orchestration.BotResponse{}, err
}
resultJSON, ok := reply.Payload["result_json"]
if !ok {
return orchestration.BotResponse{}, fmt.Errorf("reply missing result_json")
}
result, err := orchestration.UnmarshalTaskResult(resultJSON)
if err != nil {
return orchestration.BotResponse{}, fmt.Errorf("unmarshal result: %w", err)
}
if result.Error != "" {
return orchestration.BotResponse{}, fmt.Errorf("bot %s error: %s", botID, result.Error)
}
return orchestration.BotResponse{
BotID: botID,
Text: result.Text,
}, nil
}
// loadPrompts reads the orchestrator's prompt files.
func (o *Orchestrator) loadPrompts() error {
base := filepath.Join("agents", "specials", "orchestrator", "prompts")
routing, err := os.ReadFile(filepath.Join(base, "routing.md"))
if err != nil {
return fmt.Errorf("routing prompt: %w", err)
}
o.routingPrompt = string(routing)
quality, err := os.ReadFile(filepath.Join(base, "quality.md"))
if err != nil {
return fmt.Errorf("quality prompt: %w", err)
}
o.qualityPrompt = string(quality)
refinement, err := os.ReadFile(filepath.Join(base, "refinement.md"))
if err != nil {
return fmt.Errorf("refinement prompt: %w", err)
}
o.refinementPrompt = string(refinement)
return nil
}
// buildParticipantsList formats participant info for LLM prompts.
func (o *Orchestrator) buildParticipantsList(botIDs []string, exclude string) string {
var sb strings.Builder
for _, id := range botIDs {
if id == exclude {
continue
}
info, ok := o.participants[id]
if !ok {
sb.WriteString(fmt.Sprintf("- %s: (no description available)\n", id))
continue
}
caps := ""
if len(info.Capabilities) > 0 {
caps = fmt.Sprintf(" (capabilities: %s)", strings.Join(info.Capabilities, ", "))
}
sb.WriteString(fmt.Sprintf("- %s: %s%s\n", info.ID, info.Description, caps))
}
return sb.String()
}
func truncate(s string, n int) string {
runes := []rune(s)
if len(runes) <= n {
return s
}
return string(runes[:n]) + "..."
}
+107
View File
@@ -0,0 +1,107 @@
package orchestration
import (
"context"
"encoding/json"
"fmt"
"strings"
coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/pkg/orchestration"
)
// routeInitial asks the LLM which bot should handle the question first.
func (o *Orchestrator) routeInitial(ctx context.Context, question string, participants []string) (orchestration.RoutingDecision, error) {
systemPrompt := strings.ReplaceAll(o.routingPrompt, "{{PARTICIPANTS}}", o.buildParticipantsList(participants, ""))
resp, err := o.llm(ctx, coretypes.CompletionRequest{
Model: o.cfg.LLM.Primary.Model,
MaxTokens: o.cfg.LLM.Primary.MaxTokens,
Temperature: o.cfg.LLM.Primary.Temperature,
SystemPrompt: systemPrompt,
Messages: []coretypes.Message{
{Role: coretypes.RoleUser, Content: question},
},
})
if err != nil {
return orchestration.RoutingDecision{}, fmt.Errorf("LLM routing call: %w", err)
}
var rd orchestration.RoutingDecision
if err := json.Unmarshal([]byte(strings.TrimSpace(resp.Content)), &rd); err != nil {
o.logger.Warn("failed to parse routing response, raw", "content", resp.Content, "err", err)
return orchestration.RoutingDecision{}, fmt.Errorf("parse routing decision: %w", err)
}
// Validate the chosen bot is actually a participant
if !contains(participants, rd.TargetBotID) {
o.logger.Warn("LLM chose unknown bot, falling back to first", "chosen", rd.TargetBotID)
rd.TargetBotID = participants[0]
rd.Confidence = 0.5
rd.Reason = "fallback: LLM chose unknown bot"
}
return rd, nil
}
// routeRefinement asks the LLM which bot should improve the response,
// excluding the last respondent.
func (o *Orchestrator) routeRefinement(
ctx context.Context,
question string,
responses []orchestration.BotResponse,
participants []string,
excludeBot string,
) (orchestration.RoutingDecision, error) {
lastResponse := ""
if len(responses) > 0 {
lastResponse = responses[len(responses)-1].Text
}
systemPrompt := strings.ReplaceAll(o.refinementPrompt, "{{PARTICIPANTS}}", o.buildParticipantsList(participants, excludeBot))
systemPrompt = strings.ReplaceAll(systemPrompt, "{{LAST_RESPONSE}}", lastResponse)
userContent := fmt.Sprintf("Original question: %s\n\nCurrent response that needs improvement:\n%s", question, lastResponse)
resp, err := o.llm(ctx, coretypes.CompletionRequest{
Model: o.cfg.LLM.Primary.Model,
MaxTokens: o.cfg.LLM.Primary.MaxTokens,
Temperature: o.cfg.LLM.Primary.Temperature,
SystemPrompt: systemPrompt,
Messages: []coretypes.Message{
{Role: coretypes.RoleUser, Content: userContent},
},
})
if err != nil {
return orchestration.RoutingDecision{}, fmt.Errorf("LLM refinement call: %w", err)
}
var rd orchestration.RoutingDecision
if err := json.Unmarshal([]byte(strings.TrimSpace(resp.Content)), &rd); err != nil {
o.logger.Warn("failed to parse refinement response", "content", resp.Content, "err", err)
return orchestration.RoutingDecision{}, fmt.Errorf("parse refinement decision: %w", err)
}
// Validate: must be a participant and not the excluded bot
if rd.TargetBotID == excludeBot || !contains(participants, rd.TargetBotID) {
// Pick first available that isn't excluded
for _, p := range participants {
if p != excludeBot {
rd.TargetBotID = p
rd.Reason = "fallback: LLM chose excluded or unknown bot"
break
}
}
}
return rd, nil
}
func contains(ss []string, s string) bool {
for _, v := range ss {
if v == s {
return true
}
}
return false
}