Files
agents_and_robots/shell/bus/bus.go
T
2026-03-03 23:19:23 +00:00

65 lines
1.3 KiB
Go

// Package bus provides in-process agent-to-agent message passing.
package bus
import (
"fmt"
"sync"
)
// AgentID identifies an agent.
type AgentID string
// AgentMessage is a message between agents.
type AgentMessage struct {
From AgentID
To AgentID
Kind string
Payload map[string]string
}
// Bus manages channels for inter-agent communication.
type Bus struct {
mu sync.RWMutex
channels map[AgentID]chan AgentMessage
}
// New creates a new Bus.
func New() *Bus {
return &Bus{channels: make(map[AgentID]chan AgentMessage)}
}
// Subscribe registers an agent and returns its receive channel.
func (b *Bus) Subscribe(id AgentID) <-chan AgentMessage {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan AgentMessage, 64)
b.channels[id] = ch
return ch
}
// Send delivers a message to an agent's channel.
func (b *Bus) Send(msg AgentMessage) error {
b.mu.RLock()
ch, ok := b.channels[msg.To]
b.mu.RUnlock()
if !ok {
return fmt.Errorf("agent %q not registered on bus", msg.To)
}
select {
case ch <- msg:
return nil
default:
return fmt.Errorf("agent %q message queue full", msg.To)
}
}
// Unsubscribe removes an agent from the bus.
func (b *Bus) Unsubscribe(id AgentID) {
b.mu.Lock()
defer b.mu.Unlock()
if ch, ok := b.channels[id]; ok {
close(ch)
delete(b.channels, id)
}
}