Files
agents_and_robots/agents/runtime.go
T
egutierrez 2667af52cc feat: implement multi-bot orchestration system with LLM routing
Implementa el sistema de orquestación para salas Matrix con múltiples bots.
El orquestador es un "special agent" sin identidad Matrix que coordina qué bot
responde y cuándo, usando LLM (Claude) para routing y evaluación de calidad.

Cambios principales:
- pkg/orchestration/task.go: tipos puros (TaskEvent, BotResponse, QualityScore, RoutingDecision)
- shell/orchestration/: runtime del orquestador (orchestrator.go, router.go, evaluator.go)
- agents/specials/orchestrator/: config + prompts (routing, quality, refinement)
- internal/config/: SpecialConfig, OrchestrationCfg, LoadSpecial()
- shell/bus/bus.go: protocolo request-reply (SendAndWait, Reply) para delegación
- shell/matrix/listener.go: InterceptFunc para interceptar eventos en salas orquestadas
- agents/runtime.go: SetBus, listenBus, handleTaskEvent para recibir tareas del orquestador
- cmd/launcher/main.go: creación de bus compartido, arranque del orquestador antes de bots

Incluye deduplicación para evitar que múltiples listeners en la misma sala
disparen el orquestador más de una vez por mensaje.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 09:05:42 +00:00

636 lines
18 KiB
Go

// Package agents defines the Agent runtime that ties core and shell together.
package agents
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sync"
"maunium.net/go/mautrix/event"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/decision"
coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/pkg/memory"
"github.com/enmanuel/agents/pkg/orchestration"
"github.com/enmanuel/agents/pkg/personality"
"github.com/enmanuel/agents/shell/bus"
"github.com/enmanuel/agents/shell/effects"
shelllm "github.com/enmanuel/agents/shell/llm"
"github.com/enmanuel/agents/shell/matrix"
shellmem "github.com/enmanuel/agents/shell/memory"
"github.com/enmanuel/agents/shell/ssh"
"github.com/enmanuel/agents/tools"
)
// Defaults used when the agent configuration leaves a value unset or
// non-positive.
const (
// defaultMaxToolIterations bounds the LLM tool-use loop in runLLM when
// cfg.LLM.ToolUse.MaxIterations is zero or negative.
defaultMaxToolIterations = 5
// defaultWindowSize is the conversation window length used by New when memory
// is disabled or cfg.Memory.WindowSize is zero or negative.
defaultWindowSize = 20
)
// Agent is the assembled runtime: pure core + impure shell.
//
// An Agent is built by New, optionally attached to the inter-agent bus with
// SetBus, and then driven by Run. Incoming Matrix events arrive through
// handleEvent; tasks delegated over the bus arrive through handleTaskEvent.
type Agent struct {
// cfg is the agent configuration handed to New.
cfg *config.AgentConfig
// personality is not assigned by New in this file.
// NOTE(review): confirm whether it is populated elsewhere or unused.
personality personality.Personality
// rules are evaluated against every incoming Matrix event.
rules []decision.Rule
// llm is the completion function: the primary client, wrapped with the
// fallback client when one is configured.
llm coretypes.CompleteFunc
// matrix is the client used to send typing notices, text and tool notices.
matrix *matrix.Client
// runner executes the actions produced for a Matrix event.
runner *effects.Runner
// listener delivers Matrix events to handleEvent and is what Run blocks on.
listener *matrix.Listener
// toolReg holds the tools offered to the LLM and executes tool calls.
toolReg *tools.Registry
logger *slog.Logger
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
// Memory
// windows maps a room ID to its in-memory conversation window.
windows map[string]memory.Window
// windowsMu guards windows.
windowsMu sync.RWMutex
memStore memory.Store // nil when memory is disabled
// windowSize is the capacity passed to memory.NewWindow and the number of
// messages requested from the store when a room is first loaded.
windowSize int
// roomCtx is shared with the memory tools and is set to the current room at
// the start of every handled event or task.
roomCtx *tools.RoomContext
// Bus — set via SetBus() when running under the unified launcher
agentBus *bus.Bus
}
// ClearWindow gives roomID a brand-new, empty conversation window and, when a
// memory store is configured, deletes the messages persisted for this agent in
// that room. A failed delete is logged as a warning and otherwise ignored.
// Implements tools.WindowClearer.
func (a *Agent) ClearWindow(roomID string) {
	// Swap in the empty window under the write lock; the lock is released
	// before any store I/O happens.
	func() {
		a.windowsMu.Lock()
		defer a.windowsMu.Unlock()
		a.windows[roomID] = memory.NewWindow(a.windowSize)
	}()

	if a.memStore == nil {
		return
	}

	err := a.memStore.DeleteMessages(context.Background(), a.cfg.Agent.ID, &roomID)
	if err != nil {
		a.logger.Warn("failed to delete persisted messages on clear", "room", roomID, "err", err)
	}
}
// New assembles an Agent from its config, rules, and logger.
//
// Construction order: Matrix client, optional E2EE crypto store, SSH executor,
// LLM completion function (primary, optionally wrapped with a fallback),
// effects runner, optional memory store, tool registry, and finally the Matrix
// listener bound to handleEvent.
//
// An error is returned when the Matrix client, E2EE initialization, the
// primary LLM, or the memory store cannot be created. A misconfigured fallback
// LLM, a failed cross-signing key fetch, and a failed own-device signature are
// logged as warnings and do not abort construction.
//
// If construction fails after the crypto store has been opened, the store is
// closed before returning: the caller receives a nil Agent and would have no
// other way to release it.
func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*Agent, error) {
	// Matrix client
	matrixClient, err := matrix.New(cfg.Matrix)
	if err != nil {
		return nil, fmt.Errorf("matrix client: %w", err)
	}

	// E2EE — initialize before the sync loop starts
	var cryptoStore io.Closer

	// closeCrypto releases the crypto store on error paths that occur after a
	// successful E2EE init. It is a no-op when E2EE is disabled.
	closeCrypto := func() {
		if cryptoStore == nil {
			return
		}
		if closeErr := cryptoStore.Close(); closeErr != nil {
			logger.Warn("failed to close crypto store after init error", "err", closeErr)
		}
	}

	if cfg.Matrix.Encryption.Enabled {
		storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db")
		pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv)
		logger.Info("initializing e2ee", "store", storePath)

		cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
		if err != nil {
			return nil, fmt.Errorf("e2ee init: %w", err)
		}

		// Auto-fetch cross-signing private keys from SSSS if recovery key is configured.
		if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" {
			if rk := os.Getenv(envName); rk != "" {
				if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil {
					logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err)
				} else {
					logger.Info("cross-signing private keys fetched from SSSS")
				}
			}
		}

		// Sign own device with the self-signing key so Element shows it as verified.
		if err := matrixClient.SignOwnDevice(context.Background()); err != nil {
			logger.Warn("failed to sign own device (non-fatal)", "err", err)
		} else {
			logger.Info("own device signed with cross-signing key")
		}
		logger.Info("e2ee ready")
	}

	// SSH executor
	sshExec := ssh.NewExecutor(cfg.SSH)

	// LLM client
	primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary)
	if err != nil {
		closeCrypto()
		return nil, fmt.Errorf("primary LLM: %w", err)
	}
	var llmFunc coretypes.CompleteFunc = primaryLLM
	if cfg.LLM.Fallback.Provider != "" {
		// A broken fallback is not fatal: the agent keeps running on the
		// primary client alone.
		fallbackLLM, fallbackErr := shelllm.FromConfig(cfg.LLM.Fallback)
		if fallbackErr != nil {
			logger.Warn("fallback LLM config error", "err", fallbackErr)
		} else {
			llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM)
		}
	}

	// Effects runner
	runner := effects.NewRunner(matrixClient, sshExec, logger)

	// Memory subsystem
	var memStore memory.Store
	windowSize := defaultWindowSize
	roomCtx := &tools.RoomContext{}
	if cfg.Memory.Enabled {
		windowSize = cfg.Memory.WindowSize
		if windowSize <= 0 {
			windowSize = defaultWindowSize
		}
		dbPath := cfg.Memory.DBPath
		if dbPath == "" {
			dbPath = filepath.Join("agents", cfg.Agent.ID, "data", "memory.db")
		}
		store, storeErr := shellmem.New(dbPath)
		if storeErr != nil {
			closeCrypto()
			return nil, fmt.Errorf("memory store: %w", storeErr)
		}
		memStore = store
		logger.Info("memory enabled", "window_size", windowSize, "db", dbPath)
	}

	// Tool registry — register tools enabled in config
	toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memStore, roomCtx, logger)

	a := &Agent{
		cfg:         cfg,
		rules:       rules,
		llm:         llmFunc,
		matrix:      matrixClient,
		runner:      runner,
		toolReg:     toolReg,
		logger:      logger,
		cryptoStore: cryptoStore,
		windows:     make(map[string]memory.Window),
		memStore:    memStore,
		windowSize:  windowSize,
		roomCtx:     roomCtx,
	}

	// Register memory_clear_context with self as WindowClearer (after a is created)
	if cfg.Tools.Memory.Enabled && memStore != nil {
		toolReg.Register(tools.NewMemoryClearContext(a, roomCtx))
	}

	// Matrix event listener
	a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger)
	return a, nil
}
// SetBus attaches the agent to the inter-agent bus for orchestration.
// Must be called before Run(): Run reads the field once at startup to decide
// whether to subscribe, and the field is written here without synchronization.
// Leaving the bus unset (nil) means Run starts no bus listener.
func (a *Agent) SetBus(b *bus.Bus) {
a.agentBus = b
}
// SetInterceptor forwards fn to the Matrix listener created by New.
// It is intended to make the listener skip events in orchestrated rooms; the
// interception itself is implemented by matrix.Listener, not in this file.
func (a *Agent) SetInterceptor(fn matrix.InterceptFunc) {
a.listener.SetInterceptor(fn)
}
// Run starts the agent sync loop. Blocks until ctx is cancelled or the Matrix
// listener returns, and returns the listener's error.
//
// When a bus was attached with SetBus, Run also subscribes to it and processes
// delegated tasks in a background goroutine. That goroutine is stopped and
// waited for before Run returns, so an in-flight task can never touch the
// memory store after it has been closed, and the goroutine cannot outlive Run
// when the listener exits while ctx is still live.
//
// The crypto store and memory store (when present) are closed on return; their
// Close errors are ignored.
func (a *Agent) Run(ctx context.Context) error {
	// Deferred calls run last-in-first-out: the store closers are registered
	// first so they execute after the bus listener has been joined below.
	if a.cryptoStore != nil {
		defer a.cryptoStore.Close()
	}
	if a.memStore != nil {
		defer a.memStore.Close()
	}

	a.logger.Info("agent starting",
		"id", a.cfg.Agent.ID,
		"name", a.cfg.Agent.Name,
		"tools", a.toolReg.Names(),
	)

	// Start bus listener if connected to the orchestration bus
	if a.agentBus != nil {
		// busCtx is cancelled when Run returns for any reason, not only when
		// the caller's ctx is cancelled.
		busCtx, stopBus := context.WithCancel(ctx)
		var busDone sync.WaitGroup

		ch := a.agentBus.Subscribe(bus.AgentID(a.cfg.Agent.ID))
		busDone.Add(1)
		go func() {
			defer busDone.Done()
			a.listenBus(busCtx, ch)
		}()
		defer func() {
			stopBus()
			busDone.Wait()
		}()
		a.logger.Info("bus listener started")
	}

	return a.listener.Run(ctx)
}
// listenBus drains ch until ctx is cancelled or ch is closed. Messages of kind
// bus.KindTask are handed to handleTaskEvent one at a time, on this goroutine;
// every other kind is dropped.
func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) {
	cancelled := ctx.Done()
	for {
		select {
		case <-cancelled:
			return
		case incoming, open := <-ch:
			switch {
			case !open:
				return
			case incoming.Kind == bus.KindTask:
				a.handleTaskEvent(ctx, incoming)
			}
		}
	}
}
// handleTaskEvent processes a task delegated by the orchestrator.
// The bot generates a response and sends it both to Matrix and back via bus.
//
// Flow:
//  1. Decode the task from msg.Payload["task_json"]; a missing or malformed
//     payload is logged and the message is dropped without a bus reply.
//  2. Build the prompt. When the task carries previous responses they are
//     prepended to the original question together with a request for an
//     improved or complementary answer.
//  3. Record the prompt as a user turn (in the window and in the store), run
//     the LLM, and record the assistant turn on success.
//  4. Post the reply (or a generic apology on LLM failure) to the target room.
//  5. Send a TaskResult to msg.From through the bus, keyed by the task ID.
//
// The user turn is persisted as well as the assistant turn so that history
// reloaded from the store keeps the same user/assistant pairing that
// handleEvent produces.
func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
	taskJSON, ok := msg.Payload["task_json"]
	if !ok {
		a.logger.Error("task message missing task_json payload")
		return
	}
	task, err := orchestration.UnmarshalTaskEvent(taskJSON)
	if err != nil {
		a.logger.Error("failed to unmarshal task event", "err", err)
		return
	}

	a.logger.Info("handling orchestrated task",
		"task_id", task.TaskID,
		"room", task.TargetRoomID,
		"sender", task.OriginalSender,
		"iteration", task.Iteration,
	)

	roomID := task.TargetRoomID

	// Update room context for memory tools
	a.roomCtx.Set(roomID)

	if a.cfg.Personality.Behavior.TypingIndicator {
		// Typing notices are best-effort; a failure must not block the task.
		_ = a.matrix.SendTyping(ctx, roomID, true)
		defer a.matrix.SendTyping(ctx, roomID, false)
	}

	// Build a synthetic MessageContext from the task
	msgCtx := decision.MessageContext{
		SenderID:    task.OriginalSender,
		RoomID:      roomID,
		Content:     task.OriginalQuestion,
		IsDirectMsg: false,
		IsMention:   true, // treat orchestrated tasks like mentions
	}

	// If there are previous responses, prepend context
	if len(task.PreviousResponses) > 0 {
		// Accumulate into one buffer instead of re-allocating a string per
		// response.
		var prior []byte
		for _, pr := range task.PreviousResponses {
			prior = fmt.Appendf(prior, "[Previous response from %s]: %s\n\n", pr.BotID, pr.Text)
		}
		msgCtx.Content = string(prior) + "Original question: " + task.OriginalQuestion +
			"\n\nPlease provide an improved or complementary answer."
	}

	// Load memory, record the user turn, and run the LLM
	a.ensureWindowLoaded(ctx, roomID)
	a.appendToWindow(roomID, coretypes.Message{
		Role: coretypes.RoleUser, Content: msgCtx.Content,
	})
	a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content)

	reply, err := a.runLLM(ctx, msgCtx)

	// Build the result to send back via bus
	result := orchestration.TaskResult{
		TaskID: task.TaskID,
		BotID:  a.cfg.Agent.ID,
	}
	if err != nil {
		a.logger.Error("LLM error during orchestrated task", "err", err)
		result.Error = err.Error()
		reply = "Sorry, I encountered an error."
	} else {
		result.Text = reply
		// Persist assistant reply
		a.appendToWindow(roomID, coretypes.Message{
			Role: coretypes.RoleAssistant, Content: reply,
		})
		a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
	}

	// Send reply to Matrix room
	if sendErr := a.matrix.SendText(ctx, roomID, reply); sendErr != nil {
		a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr)
	}

	// Send result back to orchestrator via bus
	resultJSON, marshalErr := orchestration.MarshalTaskResult(result)
	if marshalErr != nil {
		a.logger.Error("failed to marshal task result", "err", marshalErr)
		return
	}
	replyMsg := bus.AgentMessage{
		From:    bus.AgentID(a.cfg.Agent.ID),
		To:      msg.From,
		Kind:    bus.KindTaskResult,
		Payload: map[string]string{"result_json": resultJSON},
	}
	if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil {
		a.logger.Error("failed to send task result via bus", "err", busErr)
	}
}
// handleEvent is called by the matrix Listener for each filtered incoming event.
//
// The rules are evaluated first. When none match and the message is a mention
// or a direct message, a single LLM action is synthesized. When there is still
// nothing to do the event is ignored.
//
// Each LLM action is expanded in place into a reply action: the user message
// is appended to the room window and persisted, the LLM (with its tool-use
// loop) is run, and on success the assistant reply is appended and persisted
// too. On LLM failure a generic apology is sent instead. The resulting actions
// are handed to the effects runner.
//
// The typing indicator is only started once it is known that at least one
// action will run, so ignored messages neither cost Matrix requests nor make
// the bot appear to be typing.
func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) {
	a.logger.Debug("handling event",
		"sender", msgCtx.SenderID,
		"is_dm", msgCtx.IsDirectMsg,
		"is_mention", msgCtx.IsMention,
		"command", msgCtx.Command,
	)

	roomID := evt.RoomID.String()

	// Update room context for memory tools
	a.roomCtx.Set(roomID)

	actions := decision.Evaluate(msgCtx, a.rules)
	a.logger.Debug("rules evaluated", "matched_actions", len(actions))

	// If no rules matched and the message mentions the bot or is a DM, use LLM.
	if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) {
		a.logger.Debug("no rules matched, falling back to LLM")
		actions = []decision.Action{{
			Kind: decision.ActionKindLLM,
			LLM:  &decision.LLMAction{ContextKey: msgCtx.RoomID},
		}}
	}
	if len(actions) == 0 {
		a.logger.Debug("no actions, ignoring message",
			"is_dm", msgCtx.IsDirectMsg,
			"is_mention", msgCtx.IsMention,
		)
		return
	}

	if a.cfg.Personality.Behavior.TypingIndicator {
		// Typing notices are best-effort; a failure must not block the reply.
		_ = a.matrix.SendTyping(ctx, roomID, true)
		defer a.matrix.SendTyping(ctx, roomID, false)
	}

	// Expand LLM actions inline — with tool-use loop when enabled
	expanded := make([]decision.Action, 0, len(actions))
	for _, act := range actions {
		if act.Kind != decision.ActionKindLLM {
			expanded = append(expanded, act)
			continue
		}

		// Memory: load window + append user message before LLM call
		a.ensureWindowLoaded(ctx, roomID)
		a.appendToWindow(roomID, coretypes.Message{
			Role: coretypes.RoleUser, Content: msgCtx.Content,
		})
		a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content)

		reply, err := a.runLLM(ctx, msgCtx)
		if err != nil {
			a.logger.Error("llm error", "err", err)
			expanded = append(expanded, decision.Action{
				Kind:  decision.ActionKindReply,
				Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error."},
			})
			continue
		}

		expanded = append(expanded, decision.Action{
			Kind:  decision.ActionKindReply,
			Reply: &decision.ReplyAction{Content: reply},
		})
		// Memory: append assistant reply after LLM call
		a.appendToWindow(roomID, coretypes.Message{
			Role: coretypes.RoleAssistant, Content: reply,
		})
		a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
	}

	a.runner.Execute(ctx, roomID, expanded)
}
// runLLM obtains a reply for msgCtx from the agent's completion function,
// driving the tool-use loop whenever the model answers with tool calls.
//
// The conversation sent to the model is the room's window (looked up by
// msgCtx.RoomID); if that is empty, msgCtx.Content is sent as the only user
// message. Tool specs are attached only when tool use is enabled in the config
// and the registry is non-empty.
//
// Each round calls the model once. A response without tool calls ends the loop
// and its content is returned. Otherwise every requested tool is announced in
// the room, executed, and its output (or "error: …" on failure) is fed back
// for the next round. After the configured number of rounds
// (defaultMaxToolIterations when unset) a fixed notice is returned instead.
//
// A failed model call is logged and returned as the error.
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) {
	a.logger.Debug("calling LLM",
		"model", a.cfg.LLM.Primary.Model,
		"provider", a.cfg.LLM.Primary.Provider,
	)

	// The agent description serves as the system prompt.
	systemPrompt := a.cfg.Agent.Description

	// Conversation so far; the caller has already appended the current user
	// message to the window.
	conversation := a.getWindowMessages(msgCtx.RoomID)
	if len(conversation) == 0 {
		// Nothing in the window: send just the current message.
		conversation = []coretypes.Message{{
			Role:    coretypes.RoleUser,
			Content: msgCtx.Content,
		}}
	}

	// Tool specs are only offered when tool use is on and tools exist.
	var toolSpecs []coretypes.ToolSpec
	if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 {
		toolSpecs = a.toolReg.ToLLMSpecs()
		a.logger.Debug("tools available for LLM", "count", len(toolSpecs))
	}

	roundLimit := a.cfg.LLM.ToolUse.MaxIterations
	if roundLimit <= 0 {
		roundLimit = defaultMaxToolIterations
	}

	// Tool-use loop: call LLM → execute tools → feed results back → repeat
	for remaining := roundLimit; remaining > 0; remaining-- {
		request := coretypes.CompletionRequest{
			Model:        a.cfg.LLM.Primary.Model,
			MaxTokens:    a.cfg.LLM.Primary.MaxTokens,
			Temperature:  a.cfg.LLM.Primary.Temperature,
			SystemPrompt: systemPrompt,
			Messages:     conversation,
			Tools:        toolSpecs,
		}

		answer, err := a.llm(ctx, request)
		if err != nil {
			a.logger.Error("LLM call failed", "model", request.Model, "err", err)
			return "", err
		}
		a.logger.Debug("LLM responded",
			"content_len", len(answer.Content),
			"tool_calls", len(answer.ToolCalls),
			"finish_reason", answer.FinishReason,
		)

		// A plain text answer ends the loop.
		if len(answer.ToolCalls) == 0 {
			return answer.Content, nil
		}

		// Record the assistant turn that requested the tools.
		conversation = append(conversation, coretypes.Message{
			Role:      coretypes.RoleAssistant,
			Content:   answer.Content,
			ToolCalls: answer.ToolCalls,
		})

		// Run the requested tools in order and record each outcome.
		for _, call := range answer.ToolCalls {
			a.logger.Info("executing tool",
				"tool", call.Name,
				"call_id", call.ID,
			)

			// Let the room know which tool is running.
			notice := fmt.Sprintf("🔨 <em>%s</em>", call.Name)
			if noticeErr := a.matrix.SendMarkdown(ctx, msgCtx.RoomID, notice); noticeErr != nil {
				a.logger.Warn("failed to send tool call notice", "tool", call.Name, "err", noticeErr)
			}

			outcome := a.toolReg.Execute(ctx, call.Name, call.Arguments)
			toolOutput := outcome.Output
			if outcome.Err == nil {
				a.logger.Debug("tool executed",
					"tool", call.Name,
					"output_len", len(toolOutput),
				)
			} else {
				// The model sees the failure text in place of the output.
				toolOutput = fmt.Sprintf("error: %s", outcome.Err)
				a.logger.Warn("tool execution error",
					"tool", call.Name,
					"err", outcome.Err,
				)
			}

			conversation = append(conversation, coretypes.Message{
				Role:       coretypes.RoleTool,
				Content:    toolOutput,
				ToolCallID: call.ID,
			})
		}
	}

	// Round budget exhausted without a final text answer.
	a.logger.Warn("tool-use loop reached max iterations", "max", roundLimit)
	return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil
}
// ── Memory helpers ───────────────────────────────────────────────────────
// ensureWindowLoaded makes sure roomID has an entry in the window map.
//
// If the room already has a window nothing happens. Otherwise a new window is
// created and, when a memory store is configured, seeded with up to windowSize
// messages loaded from it. A load failure is logged as a warning and the room
// starts with an empty window. The write lock is held for the whole call,
// including the store read.
func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) {
	a.windowsMu.Lock()
	defer a.windowsMu.Unlock()

	if _, present := a.windows[roomID]; present {
		return
	}

	win := memory.NewWindow(a.windowSize)
	if a.memStore != nil {
		history, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize)
		switch {
		case err != nil:
			a.logger.Warn("failed to load message history", "room", roomID, "err", err)
		default:
			for _, past := range history {
				win = win.Append(coretypes.Message{Role: past.Role, Content: past.Content})
			}
			if loaded := len(history); loaded > 0 {
				a.logger.Debug("loaded message history", "room", roomID, "count", loaded)
			}
		}
	}

	a.windows[roomID] = win
}
// appendToWindow appends msg to roomID's in-memory conversation window under
// the write lock, starting from a new empty window when the room has none yet.
// Nothing is written to the persistent store.
func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) {
	a.windowsMu.Lock()
	defer a.windowsMu.Unlock()

	current, exists := a.windows[roomID]
	if !exists {
		current = memory.NewWindow(a.windowSize)
	}

	updated := current.Append(msg)
	a.windows[roomID] = updated
}
// getWindowMessages returns roomID's conversation window rendered through
// ToLLMMessages, or nil when the room has no window. The lookup and the
// conversion both happen under the read lock.
func (a *Agent) getWindowMessages(roomID string) []coretypes.Message {
	a.windowsMu.RLock()
	defer a.windowsMu.RUnlock()

	if win, found := a.windows[roomID]; found {
		return win.ToLLMMessages()
	}
	return nil
}
// persistMessage writes one conversation message for this agent and room to
// the memory store. It does nothing when no store is configured; a save
// failure is logged as a warning and not returned.
func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) {
	store := a.memStore
	if store == nil {
		return
	}

	record := memory.HistoryMessage{
		AgentID: a.cfg.Agent.ID,
		RoomID:  roomID,
		Role:    role,
		Content: content,
	}
	err := store.SaveMessage(ctx, record)
	if err != nil {
		a.logger.Warn("failed to persist message", "room", roomID, "err", err)
	}
}
// buildToolRegistry returns a new Registry populated according to cfg.
//
// Registration order: HTTP get/post, SSH command and file read (each only when
// enabled in cfg.Tools), then current_time and matrix_send (always), then the
// memory save/recall/forget/summary tools when memory tools are enabled and a
// store is available. memory_clear_context is not registered here because it
// needs the Agent itself. roomCtx is accepted but not used by any tool
// registered in this function.
func buildToolRegistry(
	cfg *config.AgentConfig,
	sshExec *ssh.Executor,
	matrixClient *matrix.Client,
	memStore memory.Store,
	roomCtx *tools.RoomContext,
	logger *slog.Logger,
) *tools.Registry {
	registry := tools.NewRegistry()

	// Config-gated tools.
	if cfg.Tools.HTTP.Enabled {
		httpGet := tools.NewHTTPGet(cfg.Tools.HTTP)
		registry.Register(httpGet)
		httpPost := tools.NewHTTPPost(cfg.Tools.HTTP)
		registry.Register(httpPost)
		logger.Debug("registered http tools")
	}
	if cfg.Tools.SSH.Enabled {
		sshTool := tools.NewSSHCommand(cfg.Tools.SSH, sshExec)
		registry.Register(sshTool)
		logger.Debug("registered ssh tool")
	}
	if cfg.Tools.FileOps.Enabled {
		fileTool := tools.NewReadFile(cfg.Tools.FileOps)
		registry.Register(fileTool)
		logger.Debug("registered file tool")
	}

	// Tools that every agent gets regardless of configuration.
	registry.Register(tools.NewCurrentTime())
	logger.Debug("registered current_time tool")
	registry.Register(tools.NewMatrixSend(matrixClient))
	logger.Debug("registered matrix tool")

	// Memory tools need both the config switch and a live store.
	if !cfg.Tools.Memory.Enabled || memStore == nil {
		return registry
	}
	agentID := cfg.Agent.ID
	registry.Register(tools.NewMemorySave(agentID, memStore))
	registry.Register(tools.NewMemoryRecall(agentID, memStore))
	registry.Register(tools.NewMemoryForget(agentID, memStore))
	registry.Register(tools.NewMemorySummary(agentID, memStore))
	logger.Debug("registered memory tools")

	return registry
}