agents_and_robots/shell/bus/bus.go
egutierrez 2667af52cc feat: implement multi-bot orchestration system with LLM routing
Implements the orchestration system for Matrix rooms with multiple bots.
The orchestrator is a "special agent" with no Matrix identity that coordinates which bot
responds and when, using an LLM (Claude) for routing and quality evaluation.

Main changes:
- pkg/orchestration/task.go: pure types (TaskEvent, BotResponse, QualityScore, RoutingDecision)
- shell/orchestration/: orchestrator runtime (orchestrator.go, router.go, evaluator.go)
- agents/specials/orchestrator/: config + prompts (routing, quality, refinement)
- internal/config/: SpecialConfig, OrchestrationCfg, LoadSpecial()
- shell/bus/bus.go: request-reply protocol (SendAndWait, Reply) for delegation
- shell/matrix/listener.go: InterceptFunc to intercept events in orchestrated rooms
- agents/runtime.go: SetBus, listenBus, handleTaskEvent to receive tasks from the orchestrator
- cmd/launcher/main.go: creation of the shared bus; the orchestrator starts before the bots

Includes deduplication so that multiple listeners in the same room do not
trigger the orchestrator more than once per message.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 09:05:42 +00:00

130 lines
3.0 KiB
Go

// Package bus provides in-process agent-to-agent message passing.
package bus

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Well-known message kinds used by the orchestrator.
const (
	KindTask       = "task"        // orchestrator → bot: handle this question
	KindTaskResult = "task_result" // bot → orchestrator: here is my answer
)

// AgentID identifies an agent.
type AgentID string

// AgentMessage is a message between agents.
type AgentMessage struct {
	From    AgentID
	To      AgentID
	Kind    string
	Payload map[string]string
}

// Bus manages channels for inter-agent communication.
type Bus struct {
	mu       sync.RWMutex
	channels map[AgentID]chan AgentMessage

	replyMu  sync.Mutex
	replyChs map[string]chan AgentMessage // taskID → one-shot reply channel
}

// New creates a new Bus.
func New() *Bus {
	return &Bus{
		channels: make(map[AgentID]chan AgentMessage),
		replyChs: make(map[string]chan AgentMessage),
	}
}
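
// Subscribe registers an agent and returns its receive channel.
// The channel buffers up to 64 messages. Subscribing again with the same ID
// replaces the previous channel on the bus; the old channel is not closed.
func (b *Bus) Subscribe(id AgentID) <-chan AgentMessage {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan AgentMessage, 64)
	b.channels[id] = ch
	return ch
}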
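
// Send delivers a message to an agent's channel without blocking.
// It returns an error if the recipient is not registered or its queue is full.
func (b *Bus) Send(msg AgentMessage) error {
	// Hold the read lock across the send so that Unsubscribe cannot close the
	// channel between the lookup and the send (sending on a closed channel
	// panics). The send never blocks, so the lock is held only briefly.
	b.mu.RLock()
	defer b.mu.RUnlock()
	ch, ok := b.channels[msg.To]
	if !ok {
		return fmt.Errorf("agent %q not registered on bus", msg.To)
	}
	select {
	case ch <- msg:
		return nil
	default:
		return fmt.Errorf("agent %q message queue full", msg.To)
	}
}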

// SendAndWait sends a task message and blocks until a reply with the matching
// taskID arrives, the timeout elapses, or ctx is cancelled. The caller must
// ensure the reply is routed via Reply().
func (b *Bus) SendAndWait(ctx context.Context, msg AgentMessage, taskID string, timeout time.Duration) (AgentMessage, error) {
	// Capacity 1 lets Reply complete even if the waiter has already given up.
	ch := make(chan AgentMessage, 1)
	b.replyMu.Lock()
	if _, busy := b.replyChs[taskID]; busy {
		// Overwriting the entry would orphan the earlier waiter, and its
		// deferred cleanup would then delete this caller's channel.
		b.replyMu.Unlock()
		return AgentMessage{}, fmt.Errorf("task %s: already awaiting a reply", taskID)
	}
	b.replyChs[taskID] = ch
	b.replyMu.Unlock()
	defer func() {
		b.replyMu.Lock()
		delete(b.replyChs, taskID)
		b.replyMu.Unlock()
	}()
	if err := b.Send(msg); err != nil {
		return AgentMessage{}, err
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case reply := <-ch:
		return reply, nil
	case <-timer.C:
		return AgentMessage{}, fmt.Errorf("task %s: delegation timeout after %s", taskID, timeout)
	case <-ctx.Done():
		return AgentMessage{}, ctx.Err()
	}
}

// Reply routes a task_result message to the waiting SendAndWait caller.
// If no one is waiting for this taskID, it falls back to regular Send.
func (b *Bus) Reply(taskID string, msg AgentMessage) error {
	b.replyMu.Lock()
	ch, ok := b.replyChs[taskID]
	b.replyMu.Unlock()
	if ok {
		select {
		case ch <- msg:
			return nil
		default:
			return fmt.Errorf("reply channel full for task %s", taskID)
		}
	}
	// Fallback: deliver via regular channel
	return b.Send(msg)
}

// Unsubscribe removes an agent from the bus.
func (b *Bus) Unsubscribe(id AgentID) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if ch, ok := b.channels[id]; ok {
		close(ch)
		delete(b.channels, id)
	}
}