From 7176afde0ae0b50f161dfd2ee94c7ceb3f6578e0 Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Fri, 6 Mar 2026 17:03:08 +0000 Subject: [PATCH] feat: update orchestrator for enhanced multi-bot management and room discovery --- agents/asistente-2/config.yaml | 2 +- agents/assistant-bot/config.yaml | 2 +- agents/runtime.go | 11 ++ agents/specials/orchestrator/config.yaml | 12 +- cmd/launcher/main.go | 36 +++-- pkg/orchestration/task.go | 1 + shell/matrix/listener.go | 37 ++++-- shell/orchestration/orchestrator.go | 159 +++++++++++++++++++++-- 8 files changed, 220 insertions(+), 40 deletions(-) diff --git a/agents/asistente-2/config.yaml b/agents/asistente-2/config.yaml index b8c83b2..4cee4b5 100644 --- a/agents/asistente-2/config.yaml +++ b/agents/asistente-2/config.yaml @@ -126,7 +126,7 @@ matrix: homeserver: "https://matrix-af2f3d.organic-machine.com" user_id: "@asistente-2:matrix-af2f3d.organic-machine.com" access_token_env: MATRIX_TOKEN_ASISTENTE_2 - device_id: "XUGTSZJYFQ" + device_id: "IVECMVQWNZ" encryption: enabled: true diff --git a/agents/assistant-bot/config.yaml b/agents/assistant-bot/config.yaml index 55a29b2..b0832b0 100644 --- a/agents/assistant-bot/config.yaml +++ b/agents/assistant-bot/config.yaml @@ -127,7 +127,7 @@ matrix: homeserver: "https://matrix-af2f3d.organic-machine.com" user_id: "@assistant-bot:matrix-af2f3d.organic-machine.com" access_token_env: MATRIX_TOKEN_ASSISTANT_BOT - device_id: "SMWMRKMHDH" + device_id: "WXAKFKILMR" encryption: enabled: true diff --git a/agents/runtime.go b/agents/runtime.go index 53fd95e..ffc3342 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -10,6 +10,7 @@ import ( "path/filepath" "sync" + "maunium.net/go/mautrix" "maunium.net/go/mautrix/event" "github.com/enmanuel/agents/internal/config" @@ -197,6 +198,16 @@ func (a *Agent) SetInterceptor(fn matrix.InterceptFunc) { a.listener.SetInterceptor(fn) } +// SetMembershipNotify registers a callback for room membership changes. +func (a *Agent) SetMembershipNotify(fn matrix.MembershipNotifyFunc) { + a.listener.SetMembershipNotify(fn) +} + +// RawMatrixClient returns the underlying *mautrix.Client for room scanning. +func (a *Agent) RawMatrixClient() *mautrix.Client { + return a.matrix.Raw() +} + // Run starts the agent sync loop. Blocks until ctx is cancelled. func (a *Agent) Run(ctx context.Context) error { if a.cryptoStore != nil { diff --git a/agents/specials/orchestrator/config.yaml b/agents/specials/orchestrator/config.yaml index 3379e08..d7d6619 100644 --- a/agents/specials/orchestrator/config.yaml +++ b/agents/specials/orchestrator/config.yaml @@ -6,9 +6,9 @@ special: llm: primary: - provider: anthropic - model: claude-sonnet-4-6 - api_key_env: ANTHROPIC_API_KEY + provider: openai + model: gpt-4o + api_key_env: OPENAI_API_KEY max_tokens: 512 temperature: 0.2 @@ -16,8 +16,4 @@ orchestration: max_iterations: 3 quality_threshold: 0.8 delegation_timeout: 30s - rooms: - - room_id: "${MATRIX_ROOM_SHARED}" - participants: - - assistant-bot - - asistente-2 + rooms: [] # auto-detected: any room with ≥2 registered bots is managed automatically diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index 9b64e1d..97659d4 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -14,6 +14,9 @@ import ( "path/filepath" "sync" "syscall" + "time" + + "maunium.net/go/mautrix" "github.com/spf13/cobra" @@ -70,13 +73,14 @@ func main() { // Non-fatal: orchestration is optional logger.Warn("orchestrator not started", "err", err) } else { - logger.Info("orchestrator ready", - "managed_rooms", len(orch.cfg.Orchestration.Rooms), - ) + logger.Info("orchestrator initialized") } // ── Start normal agents ── var wg sync.WaitGroup + var scannerOnce sync.Once + var scanner *mautrix.Client + for _, path := range configPaths { path := path cfg, err := config.Load(path) @@ -101,20 +105,22 @@ func main() { // Connect agent to bus for orchestration a.SetBus(agentBus) - // If orchestrator is active, set interceptor so bots don't - // handle events directly in orchestrated rooms. - // The first bot's listener to receive the event will trigger orchestration. + // If orchestrator is active, wire interceptor and membership notify if orch != nil { a.SetInterceptor(orch.orchestrator.Intercept) - } + a.SetMembershipNotify(orch.orchestrator.NotifyMembership) - // Register this agent as a participant in the orchestrator - if orch != nil { orch.orchestrator.RegisterParticipant(orchestration.ParticipantInfo{ - ID: cfg.Agent.ID, - Description: cfg.Agent.Description, + ID: cfg.Agent.ID, + MatrixUserID: cfg.Matrix.UserID, + Description: cfg.Agent.Description, Capabilities: cfg.Agent.Tags, }) + + // Grab the first available Matrix client for room scanning + scannerOnce.Do(func() { + scanner = a.RawMatrixClient() + }) } wg.Add(1) @@ -127,6 +133,14 @@ func main() { }() } + // ── Startup room scan (after all participants are registered) ── + if orch != nil && scanner != nil { + orch.orchestrator.SetScanner(scanner) + scanCtx, scanCancel := context.WithTimeout(ctx, 30*time.Second) + orch.orchestrator.ScanExistingRooms(scanCtx) + scanCancel() + } + wg.Wait() logger.Info("all agents stopped") return nil diff --git a/pkg/orchestration/task.go b/pkg/orchestration/task.go index e2ebb4f..0e2bc62 100644 --- a/pkg/orchestration/task.go +++ b/pkg/orchestration/task.go @@ -54,6 +54,7 @@ type ContextMessage struct { // ParticipantInfo describes a bot available for routing. type ParticipantInfo struct { ID string `json:"id"` + MatrixUserID string `json:"matrix_user_id"` // e.g. "@assistant-bot:server" Description string `json:"description"` Capabilities []string `json:"capabilities,omitempty"` } diff --git a/shell/matrix/listener.go b/shell/matrix/listener.go index 8b8edd7..672f240 100644 --- a/shell/matrix/listener.go +++ b/shell/matrix/listener.go @@ -23,15 +23,20 @@ type EventHandler func(ctx context.Context, msgCtx decision.MessageContext, evt // delivered to the bot's normal handler (the orchestrator handles it instead). type InterceptFunc func(ctx context.Context, msgCtx decision.MessageContext) bool +// MembershipNotifyFunc is called when a StateMember event fires. +// Used by the orchestrator to detect when bots join or leave rooms. +type MembershipNotifyFunc func(ctx context.Context, roomID, userID, membership string) + // Listener attaches to a mautrix syncer and dispatches events to an EventHandler. type Listener struct { - client *Client - cfg config.MatrixCfg - handler EventHandler - logger *slog.Logger - dmCache map[id.RoomID]bool - mu sync.RWMutex - interceptor InterceptFunc // if set and returns true, event is forwarded to orchestrator + client *Client + cfg config.MatrixCfg + handler EventHandler + logger *slog.Logger + dmCache map[id.RoomID]bool + mu sync.RWMutex + interceptor InterceptFunc // if set and returns true, event is forwarded to orchestrator + membershipNotify MembershipNotifyFunc // if set, called on all StateMember events } // NewListener creates a Listener for the given client. @@ -52,6 +57,11 @@ func (l *Listener) SetInterceptor(fn InterceptFunc) { l.interceptor = fn } +// SetMembershipNotify registers a callback for StateMember events. +func (l *Listener) SetMembershipNotify(fn MembershipNotifyFunc) { + l.membershipNotify = fn +} + // Run starts the Matrix sync loop. Blocks until ctx is cancelled. func (l *Listener) Run(ctx context.Context) error { syncer := l.client.raw.Syncer.(*mautrix.DefaultSyncer) @@ -59,10 +69,19 @@ func (l *Listener) Run(ctx context.Context) error { // Auto-join rooms when invited. Without this, the bot stays in "invited" // state and never receives m.room.message events. syncer.OnEventType(event.StateMember, func(ctx context.Context, evt *event.Event) { - if evt.GetStateKey() != l.cfg.UserID { + stateKey := evt.GetStateKey() + membership := evt.Content.AsMember().Membership + + // Notify orchestrator of all membership changes (for any user) + if l.membershipNotify != nil { + l.membershipNotify(ctx, evt.RoomID.String(), stateKey, string(membership)) + } + + // Auto-join: only process invites for ourselves + if stateKey != l.cfg.UserID { return } - if evt.Content.AsMember().Membership != event.MembershipInvite { + if membership != event.MembershipInvite { return } l.logger.Info("received room invite, joining", "room", evt.RoomID, "inviter", evt.Sender) diff --git a/shell/orchestration/orchestrator.go b/shell/orchestration/orchestrator.go index 9e116cd..39d23de 100644 --- a/shell/orchestration/orchestrator.go +++ b/shell/orchestration/orchestrator.go @@ -12,6 +12,9 @@ import ( "strings" "sync" + "maunium.net/go/mautrix" + "maunium.net/go/mautrix/id" + "github.com/enmanuel/agents/internal/config" "github.com/enmanuel/agents/pkg/decision" coretypes "github.com/enmanuel/agents/pkg/llm" @@ -20,16 +23,31 @@ import ( shelllm "github.com/enmanuel/agents/shell/llm" ) +// RoomScanner is a read-only view of Matrix rooms and members. +// Satisfied by *mautrix.Client. +type RoomScanner interface { + JoinedRooms(ctx context.Context) (*mautrix.RespJoinedRooms, error) + JoinedMembers(ctx context.Context, roomID id.RoomID) (*mautrix.RespJoinedMembers, error) +} + // Orchestrator coordinates multi-bot rooms. It has no Matrix identity — // it intercepts events before they reach bots and delegates via the bus. type Orchestrator struct { cfg *config.SpecialConfig llm coretypes.CompleteFunc bus *bus.Bus - managedRooms map[string][]string // roomID → []botID - participants map[string]orchestration.ParticipantInfo // botID → info logger *slog.Logger + // mu protects managedRooms, participants, and knownBotIDs. + mu sync.RWMutex + managedRooms map[string][]string // roomID → []botID + staticRooms map[string]struct{} // rooms from YAML (never auto-removed) + participants map[string]orchestration.ParticipantInfo // botID → info + knownBotIDs map[string]string // matrixUserID → botID + + // Scanner for room/member discovery (set via SetScanner after startup) + scanner RoomScanner + // Prompts loaded from files routingPrompt string qualityPrompt string @@ -49,8 +67,13 @@ func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Or } managed := make(map[string][]string) + static := make(map[string]struct{}) for _, room := range cfg.Orchestration.Rooms { + if room.RoomID == "" { + continue // skip empty room IDs (unset env vars) + } managed[room.RoomID] = room.Participants + static[room.RoomID] = struct{}{} } o := &Orchestrator{ @@ -58,7 +81,9 @@ func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Or llm: llmFunc, bus: agentBus, managedRooms: managed, + staticRooms: static, participants: make(map[string]orchestration.ParticipantInfo), + knownBotIDs: make(map[string]string), logger: logger, seen: make(map[string]bool), } @@ -70,15 +95,115 @@ func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Or return o, nil } +// SetScanner injects a Matrix client for room/member discovery. +// Must be called before ScanExistingRooms. +func (o *Orchestrator) SetScanner(s RoomScanner) { + o.scanner = s +} + // RegisterParticipant adds bot metadata used for LLM routing decisions. func (o *Orchestrator) RegisterParticipant(info orchestration.ParticipantInfo) { + o.mu.Lock() o.participants[info.ID] = info - o.logger.Debug("registered participant", "bot", info.ID, "desc", info.Description) + if info.MatrixUserID != "" { + o.knownBotIDs[info.MatrixUserID] = info.ID + } + o.mu.Unlock() + o.logger.Debug("registered participant", "bot", info.ID, "matrix_uid", info.MatrixUserID) +} + +// ScanExistingRooms discovers rooms where ≥2 registered bots are members. +// Called once at startup after all participants are registered. +func (o *Orchestrator) ScanExistingRooms(ctx context.Context) { + if o.scanner == nil { + o.logger.Warn("no scanner set, skipping room discovery") + return + } + + resp, err := o.scanner.JoinedRooms(ctx) + if err != nil { + o.logger.Error("failed to list joined rooms for discovery", "err", err) + return + } + + for _, roomID := range resp.JoinedRooms { + o.evaluateRoom(ctx, roomID.String()) + } + + o.mu.RLock() + count := len(o.managedRooms) + o.mu.RUnlock() + o.logger.Info("room discovery complete", "managed_rooms", count) +} + +// evaluateRoom checks if a room has ≥2 registered bots and updates managedRooms. +func (o *Orchestrator) evaluateRoom(ctx context.Context, roomID string) { + if o.scanner == nil { + return + } + + members, err := o.scanner.JoinedMembers(ctx, id.RoomID(roomID)) + if err != nil { + o.logger.Warn("evaluateRoom: failed to fetch members", "room", roomID, "err", err) + return + } + + // Collect which registered bots are in this room + o.mu.RLock() + var presentBots []string + for matrixUID, botID := range o.knownBotIDs { + if _, ok := members.Joined[id.UserID(matrixUID)]; ok { + presentBots = append(presentBots, botID) + } + } + _, isStatic := o.staticRooms[roomID] + o.mu.RUnlock() + + // Static rooms (from YAML) are never auto-managed + if isStatic { + return + } + + o.mu.Lock() + defer o.mu.Unlock() + + if len(presentBots) >= 2 { + prev, already := o.managedRooms[roomID] + if !already { + o.managedRooms[roomID] = presentBots + o.logger.Info("auto-managing room", "room", roomID, "bots", presentBots) + } else if len(prev) != len(presentBots) { + o.managedRooms[roomID] = presentBots + o.logger.Info("updated room participants", "room", roomID, "bots", presentBots) + } + } else { + if _, was := o.managedRooms[roomID]; was { + delete(o.managedRooms, roomID) + o.logger.Info("stopped managing room", "room", roomID, "remaining_bots", len(presentBots)) + } + } +} + +// NotifyMembership is called by bot listeners when a room membership changes. +// It re-evaluates whether the room should be auto-managed. +func (o *Orchestrator) NotifyMembership(ctx context.Context, roomID, userID, membership string) { + o.mu.RLock() + _, isBot := o.knownBotIDs[userID] + o.mu.RUnlock() + if !isBot { + return // only care about bot membership changes + } + + o.logger.Debug("bot membership change, re-evaluating room", + "room", roomID, "user", userID, "membership", membership) + go o.evaluateRoom(ctx, roomID) } // ShouldIntercept returns true if the room is managed by this orchestrator. func (o *Orchestrator) ShouldIntercept(roomID string) bool { + o.mu.RLock() _, ok := o.managedRooms[roomID] + o.mu.RUnlock() return ok } @@ -91,6 +216,14 @@ func (o *Orchestrator) Intercept(ctx context.Context, msgCtx decision.MessageCon return false } + // Ignore messages from known bots to prevent feedback loops. + o.mu.RLock() + _, senderIsBot := o.knownBotIDs[msgCtx.SenderID] + o.mu.RUnlock() + if senderIsBot { + return true // suppress but don't route — bot's own message + } + // Dedup: multiple bots receive the same event. Only route once. key := msgCtx.RoomID + ":" + msgCtx.SenderID + ":" + msgCtx.Content o.seenMu.Lock() @@ -119,7 +252,11 @@ func (o *Orchestrator) Intercept(ctx context.Context, msgCtx decision.MessageCon // Route is the main entry point. Called when a human posts in a managed room. // It decides which bot(s) should respond and dispatches tasks via the bus. func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext) error { + o.mu.RLock() participants, ok := o.managedRooms[msgCtx.RoomID] + participantsCopy := append([]string(nil), participants...) + o.mu.RUnlock() + if !ok { return fmt.Errorf("room %s is not managed", msgCtx.RoomID) } @@ -127,14 +264,14 @@ func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext o.logger.Info("orchestrating message", "room", msgCtx.RoomID, "sender", msgCtx.SenderID, - "participants", participants, + "participants", participantsCopy, "content_preview", truncate(msgCtx.Content, 80), ) // Optimization: single bot → dispatch directly without LLM - if len(participants) == 1 { - o.logger.Debug("single participant, dispatching directly", "bot", participants[0]) - _, err := o.dispatchAndWait(ctx, participants[0], msgCtx, 0, nil) + if len(participantsCopy) == 1 { + o.logger.Debug("single participant, dispatching directly", "bot", participantsCopy[0]) + _, err := o.dispatchAndWait(ctx, participantsCopy[0], msgCtx, 0, nil) return err } @@ -151,10 +288,10 @@ func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext var err error if i == 0 { - rd, routeErr := o.routeInitial(ctx, msgCtx.Content, participants) + rd, routeErr := o.routeInitial(ctx, msgCtx.Content, participantsCopy) if routeErr != nil { o.logger.Error("routing failed, falling back to first participant", "err", routeErr) - target = participants[0] + target = participantsCopy[0] } else { target = rd.TargetBotID o.logger.Info("routed to bot", @@ -165,7 +302,7 @@ func (o *Orchestrator) Route(ctx context.Context, msgCtx decision.MessageContext ) } } else { - rd, routeErr := o.routeRefinement(ctx, msgCtx.Content, responses, participants, lastBot) + rd, routeErr := o.routeRefinement(ctx, msgCtx.Content, responses, participantsCopy, lastBot) if routeErr != nil { o.logger.Warn("refinement routing failed, stopping pipeline", "err", routeErr) break @@ -304,6 +441,8 @@ func (o *Orchestrator) loadPrompts() error { // buildParticipantsList formats participant info for LLM prompts. func (o *Orchestrator) buildParticipantsList(botIDs []string, exclude string) string { + o.mu.RLock() + defer o.mu.RUnlock() var sb strings.Builder for _, id := range botIDs { if id == exclude {