diff --git a/agents/runtime.go b/agents/runtime.go index 1603a70..53fd95e 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -16,7 +16,9 @@ import ( "github.com/enmanuel/agents/pkg/decision" coretypes "github.com/enmanuel/agents/pkg/llm" "github.com/enmanuel/agents/pkg/memory" + "github.com/enmanuel/agents/pkg/orchestration" "github.com/enmanuel/agents/pkg/personality" + "github.com/enmanuel/agents/shell/bus" "github.com/enmanuel/agents/shell/effects" shelllm "github.com/enmanuel/agents/shell/llm" "github.com/enmanuel/agents/shell/matrix" @@ -49,6 +51,9 @@ type Agent struct { memStore memory.Store // nil when memory is disabled windowSize int roomCtx *tools.RoomContext + + // Bus — set via SetBus() when running under the unified launcher + agentBus *bus.Bus } // ClearWindow resets the conversation window for a room and deletes persisted @@ -181,6 +186,17 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (* return a, nil } +// SetBus attaches the agent to the inter-agent bus for orchestration. +// Must be called before Run(). +func (a *Agent) SetBus(b *bus.Bus) { + a.agentBus = b +} + +// SetInterceptor configures the listener to skip events in orchestrated rooms. +func (a *Agent) SetInterceptor(fn matrix.InterceptFunc) { + a.listener.SetInterceptor(fn) +} + // Run starts the agent sync loop. Blocks until ctx is cancelled. func (a *Agent) Run(ctx context.Context) error { if a.cryptoStore != nil { @@ -194,9 +210,136 @@ func (a *Agent) Run(ctx context.Context) error { "name", a.cfg.Agent.Name, "tools", a.toolReg.Names(), ) + + // Start bus listener if connected to the orchestration bus + if a.agentBus != nil { + ch := a.agentBus.Subscribe(bus.AgentID(a.cfg.Agent.ID)) + go a.listenBus(ctx, ch) + a.logger.Info("bus listener started") + } + return a.listener.Run(ctx) } +// listenBus processes messages from the inter-agent bus. +func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) { + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-ch: + if !ok { + return + } + if msg.Kind == bus.KindTask { + a.handleTaskEvent(ctx, msg) + } + } + } +} + +// handleTaskEvent processes a task delegated by the orchestrator. +// The bot generates a response and sends it both to Matrix and back via bus. +func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) { + taskJSON, ok := msg.Payload["task_json"] + if !ok { + a.logger.Error("task message missing task_json payload") + return + } + + task, err := orchestration.UnmarshalTaskEvent(taskJSON) + if err != nil { + a.logger.Error("failed to unmarshal task event", "err", err) + return + } + + a.logger.Info("handling orchestrated task", + "task_id", task.TaskID, + "room", task.TargetRoomID, + "sender", task.OriginalSender, + "iteration", task.Iteration, + ) + + roomID := task.TargetRoomID + + // Update room context for memory tools + a.roomCtx.Set(roomID) + + if a.cfg.Personality.Behavior.TypingIndicator { + _ = a.matrix.SendTyping(ctx, roomID, true) + defer a.matrix.SendTyping(ctx, roomID, false) + } + + // Build a synthetic MessageContext from the task + msgCtx := decision.MessageContext{ + SenderID: task.OriginalSender, + RoomID: roomID, + Content: task.OriginalQuestion, + IsDirectMsg: false, + IsMention: true, // treat orchestrated tasks like mentions + } + + // If there are previous responses, prepend context + if len(task.PreviousResponses) > 0 { + var context string + for _, pr := range task.PreviousResponses { + context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text) + } + msgCtx.Content = context + "Original question: " + task.OriginalQuestion + + "\n\nPlease provide an improved or complementary answer." + } + + // Load memory and run LLM + a.ensureWindowLoaded(ctx, roomID) + a.appendToWindow(roomID, coretypes.Message{ + Role: coretypes.RoleUser, Content: msgCtx.Content, + }) + + reply, err := a.runLLM(ctx, msgCtx) + + // Build the result to send back via bus + result := orchestration.TaskResult{ + TaskID: task.TaskID, + BotID: a.cfg.Agent.ID, + } + + if err != nil { + a.logger.Error("LLM error during orchestrated task", "err", err) + result.Error = err.Error() + reply = "Sorry, I encountered an error." + } else { + result.Text = reply + // Persist assistant reply + a.appendToWindow(roomID, coretypes.Message{ + Role: coretypes.RoleAssistant, Content: reply, + }) + a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply) + } + + // Send reply to Matrix room + if sendErr := a.matrix.SendText(ctx, roomID, reply); sendErr != nil { + a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr) + } + + // Send result back to orchestrator via bus + resultJSON, marshalErr := orchestration.MarshalTaskResult(result) + if marshalErr != nil { + a.logger.Error("failed to marshal task result", "err", marshalErr) + return + } + + replyMsg := bus.AgentMessage{ + From: bus.AgentID(a.cfg.Agent.ID), + To: msg.From, + Kind: bus.KindTaskResult, + Payload: map[string]string{"result_json": resultJSON}, + } + + if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil { + a.logger.Error("failed to send task result via bus", "err", busErr) + } +} + // handleEvent is called by the matrix Listener for each filtered incoming event. func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) { a.logger.Debug("handling event", diff --git a/agents/specials/orchestrator/config.yaml b/agents/specials/orchestrator/config.yaml new file mode 100644 index 0000000..3379e08 --- /dev/null +++ b/agents/specials/orchestrator/config.yaml @@ -0,0 +1,23 @@ +special: + id: orchestrator + type: orchestrator + enabled: true + description: "Middleware de coordinación multi-bot. Sin identidad Matrix." + +llm: + primary: + provider: anthropic + model: claude-sonnet-4-6 + api_key_env: ANTHROPIC_API_KEY + max_tokens: 512 + temperature: 0.2 + +orchestration: + max_iterations: 3 + quality_threshold: 0.8 + delegation_timeout: 30s + rooms: + - room_id: "${MATRIX_ROOM_SHARED}" + participants: + - assistant-bot + - asistente-2 diff --git a/agents/specials/orchestrator/prompts/quality.md b/agents/specials/orchestrator/prompts/quality.md new file mode 100644 index 0000000..d56ce3e --- /dev/null +++ b/agents/specials/orchestrator/prompts/quality.md @@ -0,0 +1,11 @@ +You are a quality evaluator for AI agent responses. Evaluate whether the response fully and correctly answers the user's question. + +Criteria: +- Accuracy: Is the information correct? +- Completeness: Does it address all parts of the question? +- Usefulness: Is the response actionable and helpful? + +Respond ONLY with valid JSON (no markdown, no extra text): +{"score": <0.0-1.0>, "continue": , "reason": ""} + +Set "continue" to true only if the response is clearly incomplete or incorrect and another agent could do better. diff --git a/agents/specials/orchestrator/prompts/refinement.md b/agents/specials/orchestrator/prompts/refinement.md new file mode 100644 index 0000000..5ec511f --- /dev/null +++ b/agents/specials/orchestrator/prompts/refinement.md @@ -0,0 +1,10 @@ +The previous response needs improvement. Choose the best agent to complement or improve the answer. + +Available agents (the previous respondent has been excluded): +{{PARTICIPANTS}} + +Previous response that needs improvement: +{{LAST_RESPONSE}} + +Respond ONLY with valid JSON (no markdown, no extra text): +{"bot_id": "", "reason": ""} diff --git a/agents/specials/orchestrator/prompts/routing.md b/agents/specials/orchestrator/prompts/routing.md new file mode 100644 index 0000000..0bad261 --- /dev/null +++ b/agents/specials/orchestrator/prompts/routing.md @@ -0,0 +1,9 @@ +You are an AI agent coordinator. Your job is to decide which agent should respond to a user's question. + +Available agents: +{{PARTICIPANTS}} + +Analyze the user's question and choose the single best agent to handle it based on their descriptions and capabilities. + +Respond ONLY with valid JSON (no markdown, no extra text): +{"bot_id": "", "confidence": <0.0-1.0>, "reason": ""} diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index 7187d7b..9b64e1d 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -22,6 +22,9 @@ import ( asistente2agent "github.com/enmanuel/agents/agents/asistente-2" "github.com/enmanuel/agents/internal/config" "github.com/enmanuel/agents/pkg/decision" + "github.com/enmanuel/agents/pkg/orchestration" + "github.com/enmanuel/agents/shell/bus" + orchshell "github.com/enmanuel/agents/shell/orchestration" ) // rulesRegistry maps agent IDs to their rule factories. @@ -58,6 +61,21 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() + // ── Shared bus for inter-agent communication ── + agentBus := bus.New() + + // ── Start special agents (orchestrator, etc.) BEFORE normal bots ── + orch, err := startOrchestrator(agentBus, logger) + if err != nil { + // Non-fatal: orchestration is optional + logger.Warn("orchestrator not started", "err", err) + } else { + logger.Info("orchestrator ready", + "managed_rooms", len(orch.cfg.Orchestration.Rooms), + ) + } + + // ── Start normal agents ── var wg sync.WaitGroup for _, path := range configPaths { path := path @@ -80,6 +98,25 @@ func main() { continue } + // Connect agent to bus for orchestration + a.SetBus(agentBus) + + // If orchestrator is active, set interceptor so bots don't + // handle events directly in orchestrated rooms. + // The first bot's listener to receive the event will trigger orchestration. + if orch != nil { + a.SetInterceptor(orch.orchestrator.Intercept) + } + + // Register this agent as a participant in the orchestrator + if orch != nil { + orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{ + ID: cfg.Agent.ID, + Description: cfg.Agent.Description, + Capabilities: cfg.Agent.Tags, + }) + } + wg.Add(1) go func() { defer wg.Done() @@ -106,6 +143,37 @@ func main() { } } +// orchHandle wraps a running orchestrator with its config for the launcher. +type orchHandle struct { + orchestrator *orchshell.Orchestrator + cfg *config.SpecialConfig +} + +// startOrchestrator scans agents/specials/orchestrator/config.yaml and +// initializes the orchestrator if found and enabled. +func startOrchestrator(agentBus *bus.Bus, logger *slog.Logger) (*orchHandle, error) { + cfgPath := filepath.Join("agents", "specials", "orchestrator", "config.yaml") + if _, err := os.Stat(cfgPath); os.IsNotExist(err) { + return nil, err + } + + cfg, err := config.LoadSpecial(cfgPath) + if err != nil { + return nil, err + } + if !cfg.Special.Enabled { + return nil, nil + } + + orchLogger := logger.With("component", "orchestrator") + orch, err := orchshell.New(cfg, agentBus, orchLogger) + if err != nil { + return nil, err + } + + return &orchHandle{orchestrator: orch, cfg: cfg}, nil +} + func rulesFor(agentID string, logger *slog.Logger) []decision.Rule { factory, ok := rulesRegistry[agentID] if !ok { diff --git a/internal/config/loader.go b/internal/config/loader.go index 6e88ba3..0f2275a 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -47,6 +47,42 @@ func LoadMeta(path string) (*AgentConfig, error) { return &cfg, nil } +// LoadSpecial reads and parses a special agent config file. +// Special agents have no Matrix identity so validation is lighter. +func LoadSpecial(path string) (*SpecialConfig, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read special config %s: %w", path, err) + } + + expanded := os.ExpandEnv(string(data)) + + var cfg SpecialConfig + if err := yaml.Unmarshal([]byte(expanded), &cfg); err != nil { + return nil, fmt.Errorf("parse special config %s: %w", path, err) + } + + if err := validateSpecial(&cfg); err != nil { + return nil, fmt.Errorf("invalid special config %s: %w", path, err) + } + + return &cfg, nil +} + +// validateSpecial applies sanity checks for special agent configs. +func validateSpecial(cfg *SpecialConfig) error { + if cfg.Special.ID == "" { + return fmt.Errorf("special.id is required") + } + if cfg.Special.Type == "" { + return fmt.Errorf("special.type is required") + } + if cfg.LLM.Primary.Provider == "" { + return fmt.Errorf("llm.primary.provider is required") + } + return nil +} + // validate applies basic sanity checks. func validate(cfg *AgentConfig) error { if cfg.Agent.ID == "" { diff --git a/internal/config/schema.go b/internal/config/schema.go index e979eec..e26b59c 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -398,3 +398,34 @@ type MemoryCfg struct { type MemoryToolCfg struct { Enabled bool `yaml:"enabled"` } + +// ── Special Agents ──────────────────────────────────────────────────────── + +// SpecialConfig is the root configuration for a special agent (no Matrix identity). +type SpecialConfig struct { + Special SpecialMeta `yaml:"special"` + LLM LLMCfg `yaml:"llm"` + Orchestration OrchestrationCfg `yaml:"orchestration"` +} + +// SpecialMeta identifies a special agent. +type SpecialMeta struct { + ID string `yaml:"id"` + Type string `yaml:"type"` // "orchestrator", "scheduler", etc. + Enabled bool `yaml:"enabled"` + Description string `yaml:"description"` +} + +// OrchestrationCfg configures the multi-bot orchestrator. +type OrchestrationCfg struct { + MaxIterations int `yaml:"max_iterations"` + QualityThreshold float64 `yaml:"quality_threshold"` + DelegationTimeout time.Duration `yaml:"delegation_timeout"` + Rooms []OrchestratedRoomCfg `yaml:"rooms"` +} + +// OrchestratedRoomCfg defines a room managed by the orchestrator. +type OrchestratedRoomCfg struct { + RoomID string `yaml:"room_id"` + Participants []string `yaml:"participants"` // bot IDs that participate in this room +} diff --git a/pkg/orchestration/task.go b/pkg/orchestration/task.go new file mode 100644 index 0000000..e2ebb4f --- /dev/null +++ b/pkg/orchestration/task.go @@ -0,0 +1,91 @@ +// Package orchestration defines pure types for multi-bot coordination. +// Zero side effects — only data structures and helpers. +package orchestration + +import "encoding/json" + +// TaskEvent is sent by the orchestrator to a bot via the bus. +// It tells the bot: "answer this question in this room with this context." +type TaskEvent struct { + TaskID string `json:"task_id"` + TargetBotID string `json:"target_bot_id"` + TargetRoomID string `json:"target_room_id"` + OriginalSender string `json:"original_sender"` + OriginalQuestion string `json:"original_question"` + Iteration int `json:"iteration"` + PreviousResponses []BotResponse `json:"previous_responses,omitempty"` + RoomContext []ContextMessage `json:"room_context,omitempty"` +} + +// BotResponse is a bot's reply to a TaskEvent. +type BotResponse struct { + BotID string `json:"bot_id"` + Text string `json:"text"` +} + +// TaskResult is sent by a bot back to the orchestrator via the bus. +type TaskResult struct { + TaskID string `json:"task_id"` + BotID string `json:"bot_id"` + Text string `json:"text"` + Error string `json:"error,omitempty"` +} + +// QualityScore is the LLM's evaluation of a bot's response. +type QualityScore struct { + Score float64 `json:"score"` // 0.0–1.0 + Continue bool `json:"continue"` // should the pipeline continue? + Reason string `json:"reason"` +} + +// RoutingDecision is the LLM's choice of which bot should respond. +type RoutingDecision struct { + TargetBotID string `json:"bot_id"` + Confidence float64 `json:"confidence"` + Reason string `json:"reason"` +} + +// ContextMessage is a single message from the room's recent history. +type ContextMessage struct { + SenderID string `json:"sender_id"` + Content string `json:"content"` +} + +// ParticipantInfo describes a bot available for routing. +type ParticipantInfo struct { + ID string `json:"id"` + Description string `json:"description"` + Capabilities []string `json:"capabilities,omitempty"` +} + +// MarshalTaskEvent serializes a TaskEvent to JSON for bus transport. +func MarshalTaskEvent(t TaskEvent) (string, error) { + b, err := json.Marshal(t) + if err != nil { + return "", err + } + return string(b), nil +} + +// UnmarshalTaskEvent deserializes a TaskEvent from JSON. +func UnmarshalTaskEvent(data string) (TaskEvent, error) { + var t TaskEvent + err := json.Unmarshal([]byte(data), &t) + return t, err +} + +// MarshalTaskResult serializes a TaskResult to JSON for bus transport. +func MarshalTaskResult(r TaskResult) (string, error) { + b, err := json.Marshal(r) + if err != nil { + return "", err + } + return string(b), nil +} + +// UnmarshalTaskResult deserializes a TaskResult from JSON. +func UnmarshalTaskResult(data string) (TaskResult, error) { + var r TaskResult + err := json.Unmarshal([]byte(data), &r) + return r, err +} diff --git a/shell/bus/bus.go b/shell/bus/bus.go index 9305406..2c93feb 100644 --- a/shell/bus/bus.go +++ b/shell/bus/bus.go @@ -2,8 +2,16 @@ package bus import ( + "context" "fmt" "sync" + "time" +) + +// Well-known message kinds used by the orchestrator. +const ( + KindTask = "task" // orchestrator → bot: handle this question + KindTaskResult = "task_result" // bot → orchestrator: here is my answer ) // AgentID identifies an agent. @@ -21,11 +29,17 @@ type AgentMessage struct { type Bus struct { mu sync.RWMutex channels map[AgentID]chan AgentMessage + + replyMu sync.Mutex + replyChs map[string]chan AgentMessage // taskID → one-shot reply channel } // New creates a new Bus. func New() *Bus { - return &Bus{channels: make(map[AgentID]chan AgentMessage)} + return &Bus{ + channels: make(map[AgentID]chan AgentMessage), + replyChs: make(map[string]chan AgentMessage), + } } // Subscribe registers an agent and returns its receive channel. @@ -53,6 +67,57 @@ func (b *Bus) Send(msg AgentMessage) error { } } +// SendAndWait sends a task message and blocks until a reply with the matching +// taskID arrives or the context expires. The caller must ensure the reply is +// routed via Reply(). +func (b *Bus) SendAndWait(ctx context.Context, msg AgentMessage, taskID string, timeout time.Duration) (AgentMessage, error) { + ch := make(chan AgentMessage, 1) + b.replyMu.Lock() + b.replyChs[taskID] = ch + b.replyMu.Unlock() + + defer func() { + b.replyMu.Lock() + delete(b.replyChs, taskID) + b.replyMu.Unlock() + }() + + if err := b.Send(msg); err != nil { + return AgentMessage{}, err + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case reply := <-ch: + return reply, nil + case <-timer.C: + return AgentMessage{}, fmt.Errorf("task %s: delegation timeout after %s", taskID, timeout) + case <-ctx.Done(): + return AgentMessage{}, ctx.Err() + } +} + +// Reply routes a task_result message to the waiting SendAndWait caller. +// If no one is waiting for this taskID, it falls back to regular Send. +func (b *Bus) Reply(taskID string, msg AgentMessage) error { + b.replyMu.Lock() + ch, ok := b.replyChs[taskID] + b.replyMu.Unlock() + + if ok { + select { + case ch <- msg: + return nil + default: + return fmt.Errorf("reply channel full for task %s", taskID) + } + } + // Fallback: deliver via regular channel + return b.Send(msg) +} + // Unsubscribe removes an agent from the bus. func (b *Bus) Unsubscribe(id AgentID) { b.mu.Lock() diff --git a/shell/matrix/listener.go b/shell/matrix/listener.go index a161918..8b8edd7 100644 --- a/shell/matrix/listener.go +++ b/shell/matrix/listener.go @@ -18,14 +18,20 @@ import ( // EventHandler is called for each incoming Matrix message that passes filters. type EventHandler func(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) +// InterceptFunc is called for events in orchestrated rooms. +// It receives the parsed MessageContext. If it returns true, the event is not +// delivered to the bot's normal handler (the orchestrator handles it instead). +type InterceptFunc func(ctx context.Context, msgCtx decision.MessageContext) bool + // Listener attaches to a mautrix syncer and dispatches events to an EventHandler. type Listener struct { - client *Client - cfg config.MatrixCfg - handler EventHandler - logger *slog.Logger - dmCache map[id.RoomID]bool - mu sync.RWMutex + client *Client + cfg config.MatrixCfg + handler EventHandler + logger *slog.Logger + dmCache map[id.RoomID]bool + mu sync.RWMutex + interceptor InterceptFunc // if set and returns true, event is forwarded to orchestrator } // NewListener creates a Listener for the given client. @@ -39,6 +45,13 @@ func NewListener(client *Client, cfg config.MatrixCfg, handler EventHandler, log } } +// SetInterceptor registers a function that can intercept event delivery. +// If the function returns true, the event is handled by the orchestrator +// and not delivered to the bot's normal handler. +func (l *Listener) SetInterceptor(fn InterceptFunc) { + l.interceptor = fn +} + // Run starts the Matrix sync loop. Blocks until ctx is cancelled. func (l *Listener) Run(ctx context.Context) error { syncer := l.client.raw.Syncer.(*mautrix.DefaultSyncer) @@ -112,6 +125,13 @@ func (l *Listener) Run(ctx context.Context) error { "content_preview", truncate(msgCtx.Content, 80), ) + // Orchestrator intercept: if this room is managed, the orchestrator + // handles routing instead of the bot's normal handler. + if l.interceptor != nil && l.interceptor(ctx, msgCtx) { + l.logger.Debug("event intercepted by orchestrator", "room", evt.RoomID) + return + } + go l.handler(ctx, msgCtx, evt) }) diff --git a/shell/orchestration/evaluator.go b/shell/orchestration/evaluator.go new file mode 100644 index 0000000..4a5f0d1 --- /dev/null +++ b/shell/orchestration/evaluator.go @@ -0,0 +1,48 @@ +package orchestration + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + coretypes "github.com/enmanuel/agents/pkg/llm" + "github.com/enmanuel/agents/pkg/orchestration" +) + +// evaluate asks the LLM to score the quality of a bot's response. +func (o *Orchestrator) evaluate(ctx context.Context, question string, response orchestration.BotResponse) orchestration.QualityScore { + userContent := fmt.Sprintf("Question: %s\n\nResponse from %s:\n%s", question, response.BotID, response.Text) + + resp, err := o.llm(ctx, coretypes.CompletionRequest{ + Model: o.cfg.LLM.Primary.Model, + MaxTokens: o.cfg.LLM.Primary.MaxTokens, + Temperature: o.cfg.LLM.Primary.Temperature, + SystemPrompt: o.qualityPrompt, + Messages: []coretypes.Message{ + {Role: coretypes.RoleUser, Content: userContent}, + }, + }) + if err != nil { + o.logger.Error("quality evaluation LLM call failed", "err", err) + // On LLM failure, assume quality is good enough to stop the pipeline + return orchestration.QualityScore{ + Score: 1.0, + Continue: false, + Reason: fmt.Sprintf("evaluation failed: %s, assuming good quality", err), + } + } + + var qs orchestration.QualityScore + if err := json.Unmarshal([]byte(strings.TrimSpace(resp.Content)), &qs); err != nil { + o.logger.Warn("failed to parse quality score", "content", resp.Content, "err", err) + // On parse failure, assume good quality + return orchestration.QualityScore{ + Score: 1.0, + Continue: false, + Reason: fmt.Sprintf("parse failed: %s", err), + } + } + + return qs +} diff --git a/shell/orchestration/orchestrator.go b/shell/orchestration/orchestrator.go new file mode 100644 index 0000000..9e116cd --- /dev/null +++ b/shell/orchestration/orchestrator.go @@ -0,0 +1,332 @@ +// Package orchestration implements the multi-bot orchestrator runtime. +// The orchestrator intercepts Matrix events in managed rooms and coordinates +// which bot responds via the in-process bus. +package orchestration + +import ( + "context" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/enmanuel/agents/internal/config" + "github.com/enmanuel/agents/pkg/decision" + coretypes "github.com/enmanuel/agents/pkg/llm" + "github.com/enmanuel/agents/pkg/orchestration" + "github.com/enmanuel/agents/shell/bus" + shelllm "github.com/enmanuel/agents/shell/llm" +) + +// Orchestrator coordinates multi-bot rooms. It has no Matrix identity — +// it intercepts events before they reach bots and delegates via the bus. +type Orchestrator struct { + cfg *config.SpecialConfig + llm coretypes.CompleteFunc + bus *bus.Bus + managedRooms map[string][]string // roomID → []botID + participants map[string]orchestration.ParticipantInfo // botID → info + logger *slog.Logger + + // Prompts loaded from files + routingPrompt string + qualityPrompt string + refinementPrompt string + + // Dedup: multiple bots in the same room will each trigger Intercept(). + // We use a set of "room:sender:content" keys to ensure only one fires. + seenMu sync.Mutex + seen map[string]bool +} + +// New creates an Orchestrator from its config. +func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Orchestrator, error) { + llmFunc, err := shelllm.FromConfig(cfg.LLM.Primary) + if err != nil { + return nil, fmt.Errorf("orchestrator LLM: %w", err) + } + + managed := make(map[string][]string) + for _, room := range cfg.Orchestration.Rooms { + managed[room.RoomID] = room.Participants + } + + o := &Orchestrator{ + cfg: cfg, + llm: llmFunc, + bus: agentBus, + managedRooms: managed, + participants: make(map[string]orchestration.ParticipantInfo), + logger: logger, + seen: make(map[string]bool), + } + + if err := o.loadPrompts(); err != nil { + return nil, fmt.Errorf("load prompts: %w", err) + } + + return o, nil +} + +// RegisterParticipant adds bot metadata used for LLM routing decisions. +func (o *Orchestrator) RegisterParticipant(info orchestration.ParticipantInfo) { + o.participants[info.ID] = info + o.logger.Debug("registered participant", "bot", info.ID, "desc", info.Description) +} + +// ShouldIntercept returns true if the room is managed by this orchestrator. +func (o *Orchestrator) ShouldIntercept(roomID string) bool { + _, ok := o.managedRooms[roomID] + return ok +} + +// Intercept is the InterceptFunc used by bot listeners. It checks if the +// room is managed and, if so, starts the orchestration pipeline asynchronously. +// Returns true if the event was intercepted (all bots in the room should return true, +// but only the first one triggers actual routing — the rest are deduped). +func (o *Orchestrator) Intercept(ctx context.Context, msgCtx decision.MessageContext) bool { + if !o.ShouldIntercept(msgCtx.RoomID) { + return false + } + + // Dedup: multiple bots receive the same event. Only route once. + key := msgCtx.RoomID + ":" + msgCtx.SenderID + ":" + msgCtx.Content + o.seenMu.Lock() + if o.seen[key] { + o.seenMu.Unlock() + return true // still intercept (don't let the bot handle it) but don't route again + } + o.seen[key] = true + o.seenMu.Unlock() + + // Route asynchronously so the listener isn't blocked. + // Clean up the dedup key after routing completes. + go func() { + defer func() { + o.seenMu.Lock() + delete(o.seen, key) + o.seenMu.Unlock() + }() + if err := o.Route(ctx, msgCtx); err != nil { + o.logger.Error("orchestration failed", "room", msgCtx.RoomID, "err", err) + } + }() + return true +} + +// Route is the main entry point. Called when a human posts in a managed room. +// It decides which bot(s) should respond and dispatches tasks via the bus. +func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext) error { + participants, ok := o.managedRooms[msgCtx.RoomID] + if !ok { + return fmt.Errorf("room %s is not managed", msgCtx.RoomID) + } + + o.logger.Info("orchestrating message", + "room", msgCtx.RoomID, + "sender", msgCtx.SenderID, + "participants", participants, + "content_preview", truncate(msgCtx.Content, 80), + ) + + // Optimization: single bot → dispatch directly without LLM + if len(participants) == 1 { + o.logger.Debug("single participant, dispatching directly", "bot", participants[0]) + _, err := o.dispatchAndWait(ctx, participants[0], msgCtx, 0, nil) + return err + } + + var responses []orchestration.BotResponse + var lastBot string + maxIter := o.cfg.Orchestration.MaxIterations + if maxIter <= 0 { + maxIter = 3 + } + + for i := 0; i < maxIter; i++ { + // Route: decide which bot responds + var target string + var err error + + if i == 0 { + rd, routeErr := o.routeInitial(ctx, msgCtx.Content, participants) + if routeErr != nil { + o.logger.Error("routing failed, falling back to first participant", "err", routeErr) + target = participants[0] + } else { + target = rd.TargetBotID + o.logger.Info("routed to bot", + "bot", target, + "confidence", rd.Confidence, + "reason", rd.Reason, + "iteration", i, + ) + } + } else { + rd, routeErr := o.routeRefinement(ctx, msgCtx.Content, responses, participants, lastBot) + if routeErr != nil { + o.logger.Warn("refinement routing failed, stopping pipeline", "err", routeErr) + break + } + target = rd.TargetBotID + o.logger.Info("refinement routed to bot", + "bot", target, + "reason", rd.Reason, + "iteration", i, + ) + } + + // Dispatch: send TaskEvent to bot via bus and wait for response + response, err := o.dispatchAndWait(ctx, target, msgCtx, i, responses) + if err != nil { + o.logger.Error("dispatch failed", "bot", target, "err", err) + break + } + + responses = append(responses, response) + lastBot = target + + o.logger.Info("bot responded", + "bot", target, + "response_len", len(response.Text), + "iteration", i, + ) + + // Evaluate quality (Fase 3) + score := o.evaluate(ctx, msgCtx.Content, response) + o.logger.Info("quality evaluated", + "score", score.Score, + "continue", score.Continue, + "reason", score.Reason, + "iteration", i, + ) + + if score.Score >= o.cfg.Orchestration.QualityThreshold || !score.Continue { + o.logger.Info("pipeline complete", + "iterations", i+1, + "final_score", score.Score, + ) + break + } + } + + return nil +} + +// dispatchAndWait sends a TaskEvent to a bot and waits for its response. +func (o *Orchestrator) dispatchAndWait( + ctx context.Context, + botID string, + msgCtx decision.MessageContext, + iteration int, + previousResponses []orchestration.BotResponse, +) (orchestration.BotResponse, error) { + taskID := fmt.Sprintf("orch-%s-%s-%d", msgCtx.RoomID, botID, iteration) + + task := orchestration.TaskEvent{ + TaskID: taskID, + TargetBotID: botID, + TargetRoomID: msgCtx.RoomID, + OriginalSender: msgCtx.SenderID, + OriginalQuestion: msgCtx.Content, + Iteration: iteration, + PreviousResponses: previousResponses, + } + + taskJSON, err := orchestration.MarshalTaskEvent(task) + if err != nil { + return orchestration.BotResponse{}, fmt.Errorf("marshal task: %w", err) + } + + msg := bus.AgentMessage{ + From: bus.AgentID(o.cfg.Special.ID), + To: bus.AgentID(botID), + Kind: bus.KindTask, + Payload: map[string]string{"task_json": taskJSON}, + } + + timeout := o.cfg.Orchestration.DelegationTimeout + if timeout <= 0 { + timeout = 30_000_000_000 // 30s default + } + + reply, err := o.bus.SendAndWait(ctx, msg, taskID, timeout) + if err != nil { + return orchestration.BotResponse{}, err + } + + resultJSON, ok := reply.Payload["result_json"] + if !ok { + return orchestration.BotResponse{}, fmt.Errorf("reply missing result_json") + } + + result, err := orchestration.UnmarshalTaskResult(resultJSON) + if err != nil { + return orchestration.BotResponse{}, fmt.Errorf("unmarshal result: %w", err) + } + + if result.Error != "" { + return orchestration.BotResponse{}, fmt.Errorf("bot %s error: %s", botID, result.Error) + } + + return orchestration.BotResponse{ + BotID: botID, + Text: result.Text, + }, nil +} + +// loadPrompts reads the orchestrator's prompt files. +func (o *Orchestrator) loadPrompts() error { + base := filepath.Join("agents", "specials", "orchestrator", "prompts") + + routing, err := os.ReadFile(filepath.Join(base, "routing.md")) + if err != nil { + return fmt.Errorf("routing prompt: %w", err) + } + o.routingPrompt = string(routing) + + quality, err := os.ReadFile(filepath.Join(base, "quality.md")) + if err != nil { + return fmt.Errorf("quality prompt: %w", err) + } + o.qualityPrompt = string(quality) + + refinement, err := os.ReadFile(filepath.Join(base, "refinement.md")) + if err != nil { + return fmt.Errorf("refinement prompt: %w", err) + } + o.refinementPrompt = string(refinement) + + return nil +} + +// buildParticipantsList formats participant info for LLM prompts. +func (o *Orchestrator) buildParticipantsList(botIDs []string, exclude string) string { + var sb strings.Builder + for _, id := range botIDs { + if id == exclude { + continue + } + info, ok := o.participants[id] + if !ok { + sb.WriteString(fmt.Sprintf("- %s: (no description available)\n", id)) + continue + } + caps := "" + if len(info.Capabilities) > 0 { + caps = fmt.Sprintf(" (capabilities: %s)", strings.Join(info.Capabilities, ", ")) + } + sb.WriteString(fmt.Sprintf("- %s: %s%s\n", info.ID, info.Description, caps)) + } + return sb.String() +} + +func truncate(s string, n int) string { + runes := []rune(s) + if len(runes) <= n { + return s + } + return string(runes[:n]) + "..." +} diff --git a/shell/orchestration/router.go b/shell/orchestration/router.go new file mode 100644 index 0000000..d3bdabe --- /dev/null +++ b/shell/orchestration/router.go @@ -0,0 +1,107 @@ +package orchestration + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + coretypes "github.com/enmanuel/agents/pkg/llm" + "github.com/enmanuel/agents/pkg/orchestration" +) + +// routeInitial asks the LLM which bot should handle the question first. +func (o *Orchestrator) routeInitial(ctx context.Context, question string, participants []string) (orchestration.RoutingDecision, error) { + systemPrompt := strings.ReplaceAll(o.routingPrompt, "{{PARTICIPANTS}}", o.buildParticipantsList(participants, "")) + + resp, err := o.llm(ctx, coretypes.CompletionRequest{ + Model: o.cfg.LLM.Primary.Model, + MaxTokens: o.cfg.LLM.Primary.MaxTokens, + Temperature: o.cfg.LLM.Primary.Temperature, + SystemPrompt: systemPrompt, + Messages: []coretypes.Message{ + {Role: coretypes.RoleUser, Content: question}, + }, + }) + if err != nil { + return orchestration.RoutingDecision{}, fmt.Errorf("LLM routing call: %w", err) + } + + var rd orchestration.RoutingDecision + if err := json.Unmarshal([]byte(strings.TrimSpace(resp.Content)), &rd); err != nil { + o.logger.Warn("failed to parse routing response, raw", "content", resp.Content, "err", err) + return orchestration.RoutingDecision{}, fmt.Errorf("parse routing decision: %w", err) + } + + // Validate the chosen bot is actually a participant + if !contains(participants, rd.TargetBotID) { + o.logger.Warn("LLM chose unknown bot, falling back to first", "chosen", rd.TargetBotID) + rd.TargetBotID = participants[0] + rd.Confidence = 0.5 + rd.Reason = "fallback: LLM chose unknown bot" + } + + return rd, nil +} + +// routeRefinement asks the LLM which bot should improve the response, +// excluding the last respondent. +func (o *Orchestrator) routeRefinement( + ctx context.Context, + question string, + responses []orchestration.BotResponse, + participants []string, + excludeBot string, +) (orchestration.RoutingDecision, error) { + lastResponse := "" + if len(responses) > 0 { + lastResponse = responses[len(responses)-1].Text + } + + systemPrompt := strings.ReplaceAll(o.refinementPrompt, "{{PARTICIPANTS}}", o.buildParticipantsList(participants, excludeBot)) + systemPrompt = strings.ReplaceAll(systemPrompt, "{{LAST_RESPONSE}}", lastResponse) + + userContent := fmt.Sprintf("Original question: %s\n\nCurrent response that needs improvement:\n%s", question, lastResponse) + + resp, err := o.llm(ctx, coretypes.CompletionRequest{ + Model: o.cfg.LLM.Primary.Model, + MaxTokens: o.cfg.LLM.Primary.MaxTokens, + Temperature: o.cfg.LLM.Primary.Temperature, + SystemPrompt: systemPrompt, + Messages: []coretypes.Message{ + {Role: coretypes.RoleUser, Content: userContent}, + }, + }) + if err != nil { + return orchestration.RoutingDecision{}, fmt.Errorf("LLM refinement call: %w", err) + } + + var rd orchestration.RoutingDecision + if err := json.Unmarshal([]byte(strings.TrimSpace(resp.Content)), &rd); err != nil { + o.logger.Warn("failed to parse refinement response", "content", resp.Content, "err", err) + return orchestration.RoutingDecision{}, fmt.Errorf("parse refinement decision: %w", err) + } + + // Validate: must be a participant and not the excluded bot + if rd.TargetBotID == excludeBot || !contains(participants, rd.TargetBotID) { + // Pick first available that isn't excluded + for _, p := range participants { + if p != excludeBot { + rd.TargetBotID = p + rd.Reason = "fallback: LLM chose excluded or unknown bot" + break + } + } + } + + return rd, nil +} + +func contains(ss []string, s string) bool { + for _, v := range ss { + if v == s { + return true + } + } + return false +}