Files
agents_and_robots/shell/orchestration/orchestrator.go
T
egutierrez 5697b92ab8 feat: integrar structured logging en todos los componentes del shell
Se propaga *slog.Logger a todos los componentes impuros del shell:
- shell/bus/ — logs de subscribe, send, reply, timeout, unsubscribe
- shell/effects/ — duración y resultado de cada action ejecutada
- shell/llm/ (anthropic, openai, factory) — request/response con tokens, duración, fallback
- shell/memory/sqlite — open, save, recall, close con detalles
- shell/ssh/ — inicio, fin, errores y duración de comandos SSH
- tools/registry — registro, ejecución y errores de herramientas

Se usa el paquete shell/logger para field names consistentes (FieldDurationMS, FieldTokensUsed, etc.).
Cada componente recibe el logger por inyección de dependencias, sin globals.
Las firmas de New/FromConfig se actualizan para aceptar *slog.Logger.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 21:53:31 +00:00

558 lines
15 KiB
Go

// Package orchestration implements the multi-bot orchestrator runtime.
// The orchestrator intercepts Matrix events in managed rooms and coordinates
// which bot responds via the in-process bus.
package orchestration
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/id"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/decision"
coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/pkg/orchestration"
"github.com/enmanuel/agents/shell/bus"
shelllm "github.com/enmanuel/agents/shell/llm"
)
// RoomScanner is a read-only view of Matrix rooms and members, used by the
// orchestrator to discover which rooms contain registered bots.
// Satisfied by *mautrix.Client.
type RoomScanner interface {
// JoinedRooms returns the joined-rooms response; ScanExistingRooms evaluates
// every room ID listed in it.
JoinedRooms(ctx context.Context) (*mautrix.RespJoinedRooms, error)
// JoinedMembers returns the joined members of roomID; evaluateRoom looks up
// registered bots' Matrix user IDs in the response's Joined map.
JoinedMembers(ctx context.Context, roomID id.RoomID) (*mautrix.RespJoinedMembers, error)
}
// Orchestrator coordinates multi-bot rooms. It has no Matrix identity —
// it intercepts events before they reach bots and delegates via the bus.
//
// The struct embeds mutexes by value, so it must always be used through a
// pointer (New returns *Orchestrator).
type Orchestrator struct {
// cfg is the orchestrator's configuration (rooms, iteration limits,
// thresholds, timeouts).
cfg *config.SpecialConfig
// llm is the completion function built from cfg.LLM.Primary in New.
llm coretypes.CompleteFunc
// bus carries task messages to bots and their replies back.
bus *bus.Bus
// logger is the injected structured logger.
logger *slog.Logger
// mu protects managedRooms, participants, and knownBotIDs.
mu sync.RWMutex
managedRooms map[string][]string // roomID → []botID
staticRooms map[string]struct{} // rooms from YAML (never auto-removed)
participants map[string]orchestration.ParticipantInfo // botID → info
knownBotIDs map[string]string // matrixUserID → botID
// Scanner for room/member discovery (set via SetScanner after startup)
scanner RoomScanner
// Prompts loaded from files by loadPrompts during New.
routingPrompt string
qualityPrompt string
refinementPrompt string
// Dedup: multiple bots in the same room will each trigger Intercept().
// We use a set of "room:sender:content" keys to ensure only one fires.
// A key is removed again once routing for that message has finished.
// seenMu guards seen.
seenMu sync.Mutex
seen map[string]bool
}
// New builds an Orchestrator from cfg.
//
// It constructs the LLM completion function from cfg.LLM.Primary (with a
// logger tagged component=llm), seeds the managed-room table from the rooms
// listed in cfg.Orchestration.Rooms, and loads the prompt files. Rooms whose
// ID is empty are ignored. Every configured room is also recorded as static,
// which exempts it from automatic re-evaluation.
//
// It returns an error if the LLM cannot be created or a prompt file cannot
// be read.
func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Orchestrator, error) {
	complete, err := shelllm.FromConfig(cfg.LLM.Primary, logger.With("component", "llm"))
	if err != nil {
		return nil, fmt.Errorf("orchestrator LLM: %w", err)
	}

	orch := &Orchestrator{
		cfg:          cfg,
		llm:          complete,
		bus:          agentBus,
		logger:       logger,
		managedRooms: make(map[string][]string),
		staticRooms:  make(map[string]struct{}),
		participants: make(map[string]orchestration.ParticipantInfo),
		knownBotIDs:  make(map[string]string),
		seen:         make(map[string]bool),
	}

	for _, r := range cfg.Orchestration.Rooms {
		// An empty room ID typically means an unset env var; skip it.
		if r.RoomID != "" {
			orch.managedRooms[r.RoomID] = r.Participants
			orch.staticRooms[r.RoomID] = struct{}{}
		}
	}

	if err = orch.loadPrompts(); err != nil {
		return nil, fmt.Errorf("load prompts: %w", err)
	}
	return orch, nil
}
// SetScanner injects a Matrix client for room/member discovery.
// Must be called before ScanExistingRooms; while no scanner is set, room
// discovery and room re-evaluation are skipped.
// NOTE(review): the field is assigned without holding a lock, so this is
// presumably meant to be called once during startup before any listener
// goroutine can reach evaluateRoom — confirm against the caller.
func (o *Orchestrator) SetScanner(s RoomScanner) {
o.scanner = s
}
// RegisterParticipant stores a bot's metadata, keyed by its bot ID, for use
// when describing candidates to the LLM. When the bot has a Matrix user ID,
// that ID is also indexed so the bot's own messages and membership changes
// can be recognised. Registering the same ID again overwrites the entry.
func (o *Orchestrator) RegisterParticipant(info orchestration.ParticipantInfo) {
	// Scope the critical section so logging happens after the lock is released.
	func() {
		o.mu.Lock()
		defer o.mu.Unlock()

		o.participants[info.ID] = info
		if uid := info.MatrixUserID; uid != "" {
			o.knownBotIDs[uid] = info.ID
		}
	}()

	o.logger.Debug("registered participant", "bot", info.ID, "matrix_uid", info.MatrixUserID)
}
// ScanExistingRooms evaluates every room returned by the scanner and starts
// managing those in which at least two registered bots are members.
// Intended to run once at startup, after all participants are registered.
//
// It logs and returns without doing anything if no scanner has been set or
// if the joined-room listing fails.
func (o *Orchestrator) ScanExistingRooms(ctx context.Context) {
	if o.scanner == nil {
		o.logger.Warn("no scanner set, skipping room discovery")
		return
	}

	joined, err := o.scanner.JoinedRooms(ctx)
	if err != nil {
		o.logger.Error("failed to list joined rooms for discovery", "err", err)
		return
	}

	for i := range joined.JoinedRooms {
		o.evaluateRoom(ctx, joined.JoinedRooms[i].String())
	}

	o.mu.RLock()
	managed := len(o.managedRooms)
	o.mu.RUnlock()

	o.logger.Info("room discovery complete", "managed_rooms", managed)
}
// evaluateRoom checks whether a room has ≥2 registered bots and updates
// managedRooms accordingly:
//
//   - ≥2 bots, room not yet managed  → start managing it;
//   - ≥2 bots, membership changed    → replace the stored participant list;
//   - <2 bots, room currently managed → stop managing it.
//
// Static rooms (configured in YAML) are never touched. The function is a
// no-op when no scanner is set, and logs a warning and returns when the
// member lookup fails.
//
// The participant list is refreshed whenever the *set* of present bots
// differs from the stored one, not merely when its size differs, so a bot
// being replaced by another is picked up.
func (o *Orchestrator) evaluateRoom(ctx context.Context, roomID string) {
	if o.scanner == nil {
		return
	}
	members, err := o.scanner.JoinedMembers(ctx, id.RoomID(roomID))
	if err != nil {
		o.logger.Warn("evaluateRoom: failed to fetch members", "room", roomID, "err", err)
		return
	}

	// Collect which registered bots are in this room.
	o.mu.RLock()
	var presentBots []string
	for matrixUID, botID := range o.knownBotIDs {
		if _, ok := members.Joined[id.UserID(matrixUID)]; ok {
			presentBots = append(presentBots, botID)
		}
	}
	_, isStatic := o.staticRooms[roomID]
	o.mu.RUnlock()

	// Static rooms (from YAML) are never auto-managed.
	if isStatic {
		return
	}

	o.mu.Lock()
	defer o.mu.Unlock()

	prev, already := o.managedRooms[roomID]
	switch {
	case len(presentBots) < 2:
		if already {
			delete(o.managedRooms, roomID)
			o.logger.Info("stopped managing room", "room", roomID, "remaining_bots", len(presentBots))
		}
	case !already:
		o.managedRooms[roomID] = presentBots
		o.logger.Info("auto-managing room", "room", roomID, "bots", presentBots)
	case !sameBotSet(prev, presentBots):
		o.managedRooms[roomID] = presentBots
		o.logger.Info("updated room participants", "room", roomID, "bots", presentBots)
	}
}

// sameBotSet reports whether a and b contain the same bot IDs with the same
// multiplicities, regardless of order.
func sameBotSet(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	counts := make(map[string]int, len(a))
	for _, botID := range a {
		counts[botID]++
	}
	for _, botID := range b {
		if counts[botID] == 0 {
			return false
		}
		counts[botID]--
	}
	return true
}
// NotifyMembership is invoked by bot listeners whenever a room membership
// event is seen. Events about users that are not registered bots are
// ignored; for a registered bot the room is re-evaluated in a background
// goroutine so the listener is not blocked.
func (o *Orchestrator) NotifyMembership(ctx context.Context, roomID, userID, membership string) {
	o.mu.RLock()
	_, known := o.knownBotIDs[userID]
	o.mu.RUnlock()

	// Only bot membership changes can alter whether a room is managed.
	if known {
		o.logger.Debug("bot membership change, re-evaluating room",
			"room", roomID, "user", userID, "membership", membership)
		go o.evaluateRoom(ctx, roomID)
	}
}
// ShouldIntercept reports whether roomID is currently in the orchestrator's
// managed-room table.
func (o *Orchestrator) ShouldIntercept(roomID string) bool {
	o.mu.RLock()
	defer o.mu.RUnlock()

	_, managed := o.managedRooms[roomID]
	return managed
}
// Intercept is the InterceptFunc handed to bot listeners.
//
// It returns false when the room is not managed, leaving the event to the
// bot. For a managed room it always returns true, so that no bot handles the
// event itself, and then:
//
//   - messages sent by a registered bot are suppressed without routing, to
//     avoid feedback loops;
//   - the first listener to report a given "room:sender:content" message
//     starts Route in a background goroutine;
//   - later listeners reporting the same message while it is still being
//     routed are deduplicated and do nothing.
//
// The dedup entry is dropped when routing finishes.
func (o *Orchestrator) Intercept(ctx context.Context, msgCtx decision.MessageContext) bool {
	if !o.ShouldIntercept(msgCtx.RoomID) {
		return false
	}

	o.mu.RLock()
	_, fromBot := o.knownBotIDs[msgCtx.SenderID]
	o.mu.RUnlock()
	if fromBot {
		// A bot's own message: swallow it, but never route it.
		return true
	}

	// Every bot in the room receives the same event; claim it exactly once.
	dedupKey := msgCtx.RoomID + ":" + msgCtx.SenderID + ":" + msgCtx.Content
	o.seenMu.Lock()
	inFlight := o.seen[dedupKey]
	if !inFlight {
		o.seen[dedupKey] = true
	}
	o.seenMu.Unlock()
	if inFlight {
		// Still intercepted, but another listener already started routing.
		return true
	}

	// Route in the background so the listener is not blocked.
	go func() {
		defer func() {
			o.seenMu.Lock()
			defer o.seenMu.Unlock()
			delete(o.seen, dedupKey)
		}()

		err := o.Route(ctx, msgCtx)
		if err != nil {
			o.logger.Error("orchestration failed", "room", msgCtx.RoomID, "err", err)
		}
	}()
	return true
}
// Route is the main entry point. Called when a human posts in a managed room.
// It decides which bot(s) should respond and dispatches tasks via the bus.
//
// Pipeline:
//
//  1. Snapshot the room's participants. An unmanaged room or a room with no
//     participants is an error.
//  2. With a single participant, dispatch to it directly (no LLM call) and
//     return that dispatch's error.
//  3. Otherwise iterate up to cfg.Orchestration.MaxIterations times
//     (default 3): pick a target bot (initial routing on the first pass,
//     refinement routing afterwards), dispatch and wait for its response,
//     stop on repetition, then evaluate quality and stop once the score
//     reaches the threshold or the evaluator says not to continue.
//
// In the multi-bot pipeline, routing/dispatch failures are logged and end
// the loop; they are not returned to the caller. If the initial routing call
// fails, the first participant is used as a fallback.
func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext) error {
	o.mu.RLock()
	participants, ok := o.managedRooms[msgCtx.RoomID]
	// Copy under the lock so later membership updates cannot race with us.
	participantsCopy := append([]string(nil), participants...)
	o.mu.RUnlock()
	if !ok {
		return fmt.Errorf("room %s is not managed", msgCtx.RoomID)
	}
	// A room configured without participants would otherwise reach the
	// participantsCopy[0] fallback below and panic inside the routing goroutine.
	if len(participantsCopy) == 0 {
		return fmt.Errorf("room %s has no participants", msgCtx.RoomID)
	}

	o.logger.Info("orchestrating message",
		"room", msgCtx.RoomID,
		"sender", msgCtx.SenderID,
		"participants", participantsCopy,
		"content_preview", truncate(msgCtx.Content, 80),
	)

	// Optimization: single bot → dispatch directly without LLM.
	if len(participantsCopy) == 1 {
		o.logger.Debug("single participant, dispatching directly", "bot", participantsCopy[0])
		_, err := o.dispatchAndWait(ctx, participantsCopy[0], msgCtx, 0, nil)
		return err
	}

	var responses []orchestration.BotResponse
	var lastBot string
	maxIter := o.cfg.Orchestration.MaxIterations
	if maxIter <= 0 {
		maxIter = 3
	}

	for i := 0; i < maxIter; i++ {
		// Route: decide which bot responds.
		var target string
		if i == 0 {
			rd, routeErr := o.routeInitial(ctx, msgCtx.Content, participantsCopy)
			if routeErr != nil {
				o.logger.Error("routing failed, falling back to first participant", "err", routeErr)
				target = participantsCopy[0]
			} else {
				target = rd.TargetBotID
				o.logger.Info("routed to bot",
					"bot", target,
					"confidence", rd.Confidence,
					"reason", rd.Reason,
					"iteration", i,
				)
			}
		} else {
			rd, routeErr := o.routeRefinement(ctx, msgCtx.Content, responses, participantsCopy, lastBot)
			if routeErr != nil {
				o.logger.Warn("refinement routing failed, stopping pipeline", "err", routeErr)
				break
			}
			target = rd.TargetBotID
			o.logger.Info("refinement routed to bot",
				"bot", target,
				"reason", rd.Reason,
				"iteration", i,
			)
		}

		// Dispatch: send TaskEvent to bot via bus and wait for response.
		response, err := o.dispatchAndWait(ctx, target, msgCtx, i, responses)
		if err != nil {
			o.logger.Error("dispatch failed", "bot", target, "err", err)
			break
		}
		responses = append(responses, response)
		lastBot = target
		o.logger.Info("bot responded",
			"bot", target,
			"response_len", len(response.Text),
			"iteration", i,
		)

		// Fallback: detect circular conversations before quality evaluation.
		if o.detectRepetition(responses) {
			o.logger.Warn("repetition detected, stopping pipeline to prevent circular conversation",
				"iteration", i+1,
				"total_responses", len(responses),
			)
			break
		}

		// Evaluate quality (phase 3).
		score := o.evaluate(ctx, msgCtx.Content, response)
		o.logger.Info("quality evaluated",
			"score", score.Score,
			"continue", score.Continue,
			"reason", score.Reason,
			"iteration", i,
		)
		if score.Score >= o.cfg.Orchestration.QualityThreshold || !score.Continue {
			o.logger.Info("pipeline complete",
				"iterations", i+1,
				"final_score", score.Score,
			)
			break
		}
	}
	return nil
}
// dispatchAndWait delivers a task to botID over the bus and blocks until the
// bot replies or the delegation timeout elapses.
//
// The task carries the original question and sender, the current iteration
// and any responses gathered so far. The wait is bounded by
// cfg.Orchestration.DelegationTimeout, falling back to a built-in default
// when that is not positive.
//
// An error is returned when the task cannot be marshalled, the bus call
// fails, the reply has no "result_json" payload, the payload cannot be
// decoded, or the bot reported an error of its own. On error the returned
// BotResponse is the zero value.
func (o *Orchestrator) dispatchAndWait(
	ctx context.Context,
	botID string,
	msgCtx decision.MessageContext,
	iteration int,
	previousResponses []orchestration.BotResponse,
) (orchestration.BotResponse, error) {
	var none orchestration.BotResponse

	// The task ID doubles as the correlation key for the reply.
	taskID := fmt.Sprintf("orch-%s-%s-%d", msgCtx.RoomID, botID, iteration)
	taskJSON, err := orchestration.MarshalTaskEvent(orchestration.TaskEvent{
		TaskID:            taskID,
		TargetBotID:       botID,
		TargetRoomID:      msgCtx.RoomID,
		OriginalSender:    msgCtx.SenderID,
		OriginalQuestion:  msgCtx.Content,
		Iteration:         iteration,
		PreviousResponses: previousResponses,
	})
	if err != nil {
		return none, fmt.Errorf("marshal task: %w", err)
	}

	wait := o.cfg.Orchestration.DelegationTimeout
	if wait <= 0 {
		wait = 30_000_000_000 // 30s default (nanoseconds, assuming a time.Duration field)
	}

	request := bus.AgentMessage{
		From:    bus.AgentID(o.cfg.Special.ID),
		To:      bus.AgentID(botID),
		Kind:    bus.KindTask,
		Payload: map[string]string{"task_json": taskJSON},
	}
	reply, err := o.bus.SendAndWait(ctx, request, taskID, wait)
	if err != nil {
		return none, err
	}

	resultJSON, present := reply.Payload["result_json"]
	if !present {
		return none, fmt.Errorf("reply missing result_json")
	}
	result, err := orchestration.UnmarshalTaskResult(resultJSON)
	if err != nil {
		return none, fmt.Errorf("unmarshal result: %w", err)
	}
	if result.Error != "" {
		return none, fmt.Errorf("bot %s error: %s", botID, result.Error)
	}

	return orchestration.BotResponse{BotID: botID, Text: result.Text}, nil
}
// loadPrompts reads the routing, quality and refinement prompt files from
// agents/specials/orchestrator/prompts (relative to the process's working
// directory) into the orchestrator. Files are read in that order; the first
// read failure is returned, wrapped with the name of the prompt that failed,
// and later prompts are left unset.
func (o *Orchestrator) loadPrompts() error {
	dir := filepath.Join("agents", "specials", "orchestrator", "prompts")

	prompts := []struct {
		label string
		file  string
		dst   *string
	}{
		{"routing", "routing.md", &o.routingPrompt},
		{"quality", "quality.md", &o.qualityPrompt},
		{"refinement", "refinement.md", &o.refinementPrompt},
	}

	for _, p := range prompts {
		raw, err := os.ReadFile(filepath.Join(dir, p.file))
		if err != nil {
			return fmt.Errorf("%s prompt: %w", p.label, err)
		}
		*p.dst = string(raw)
	}
	return nil
}
// buildParticipantsList renders one "- id: description" line per bot in
// botIDs, for inclusion in LLM prompts. The bot equal to exclude is left
// out. Bots without registered metadata get a placeholder description, and
// registered capabilities are appended in parentheses when present.
func (o *Orchestrator) buildParticipantsList(botIDs []string, exclude string) string {
	o.mu.RLock()
	defer o.mu.RUnlock()

	var out strings.Builder
	for _, botID := range botIDs {
		if botID == exclude {
			continue
		}

		info, known := o.participants[botID]
		if !known {
			fmt.Fprintf(&out, "- %s: (no description available)\n", botID)
			continue
		}

		var caps string
		if len(info.Capabilities) > 0 {
			caps = fmt.Sprintf(" (capabilities: %s)", strings.Join(info.Capabilities, ", "))
		}
		fmt.Fprintf(&out, "- %s: %s%s\n", info.ID, info.Description, caps)
	}
	return out.String()
}
// detectRepetition reports whether the most recent response is at least as
// similar to any earlier response as the configured repetition threshold
// (cfg.Orchestration.RepetitionThreshold, defaulting to 0.6 when not
// positive). A true result signals a circular conversation that should be
// stopped. With fewer than two responses it always returns false.
func (o *Orchestrator) detectRepetition(responses []orchestration.BotResponse) bool {
	last := len(responses) - 1
	if last < 1 {
		return false
	}

	limit := o.cfg.Orchestration.RepetitionThreshold
	if limit <= 0 {
		limit = 0.6 // default
	}

	newest := responses[last].Text
	for _, earlier := range responses[:last] {
		if similarity(newest, earlier.Text) >= limit {
			return true
		}
	}
	return false
}
// similarity returns a bigram-overlap ratio between a and b in the range
// 0.0 (nothing in common) to 1.0 (identical).
//
// Both strings are trimmed and lower-cased before comparison. The score is
// 2*shared / (bigrams(a) + bigrams(b)), where shared counts each common
// bigram as many times as it occurs in both strings. Strings that are equal
// (before or after normalisation) score 1.0; otherwise, if either
// normalised string is shorter than two bytes the score is 0.0.
func similarity(a, b string) float64 {
	if a == b {
		return 1.0
	}

	a = strings.ToLower(strings.TrimSpace(a))
	b = strings.ToLower(strings.TrimSpace(b))
	switch {
	case a == b:
		return 1.0
	case len(a) < 2, len(b) < 2:
		return 0.0
	}

	left, right := makeBigrams(a), makeBigrams(b)

	var shared, total int
	for gram, n := range left {
		total += n
		// A bigram missing from right has count 0 and contributes nothing.
		if m := right[gram]; m < n {
			shared += m
		} else {
			shared += n
		}
	}
	for _, m := range right {
		total += m
	}

	// Possible when both inputs are a single multi-byte rune.
	if total == 0 {
		return 0.0
	}
	return float64(2*shared) / float64(total)
}

// makeBigrams counts every pair of adjacent runes in s. Strings with fewer
// than two runes yield an empty map.
func makeBigrams(s string) map[string]int {
	chars := []rune(s)
	counts := make(map[string]int, len(chars))
	for end := 2; end <= len(chars); end++ {
		counts[string(chars[end-2:end])]++
	}
	return counts
}
// truncate shortens s to at most n runes, appending "..." when anything was
// cut off. Strings of n runes or fewer are returned unchanged. Counting is
// done in runes, so multi-byte characters are never split.
func truncate(s string, n int) string {
	if chars := []rune(s); len(chars) > n {
		return string(chars[:n]) + "..."
	}
	return s
}