Repo iniciado

This commit is contained in:
2026-03-03 23:19:23 +00:00
commit c126187c5a
32 changed files with 2719 additions and 0 deletions
+64
View File
@@ -0,0 +1,64 @@
// Package bus provides in-process agent-to-agent message passing.
package bus
import (
"fmt"
"sync"
)
// AgentID identifies an agent.
type AgentID string
// AgentMessage is a message between agents.
type AgentMessage struct {
From AgentID
To AgentID
Kind string
Payload map[string]string
}
// Bus manages channels for inter-agent communication.
type Bus struct {
mu sync.RWMutex
channels map[AgentID]chan AgentMessage
}
// New creates a new Bus.
func New() *Bus {
return &Bus{channels: make(map[AgentID]chan AgentMessage)}
}
// Subscribe registers an agent and returns its receive channel.
func (b *Bus) Subscribe(id AgentID) <-chan AgentMessage {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan AgentMessage, 64)
b.channels[id] = ch
return ch
}
// Send delivers a message to an agent's channel.
func (b *Bus) Send(msg AgentMessage) error {
b.mu.RLock()
ch, ok := b.channels[msg.To]
b.mu.RUnlock()
if !ok {
return fmt.Errorf("agent %q not registered on bus", msg.To)
}
select {
case ch <- msg:
return nil
default:
return fmt.Errorf("agent %q message queue full", msg.To)
}
}
// Unsubscribe removes an agent from the bus.
func (b *Bus) Unsubscribe(id AgentID) {
b.mu.Lock()
defer b.mu.Unlock()
if ch, ok := b.channels[id]; ok {
close(ch)
delete(b.channels, id)
}
}
+78
View File
@@ -0,0 +1,78 @@
// Package effects interprets pure []decision.Action values into real side effects.
package effects
import (
"context"
"fmt"
"log/slog"
"github.com/enmanuel/agents/pkg/decision"
"github.com/enmanuel/agents/shell/ssh"
)
// Result holds the outcome of executing a single action.
type Result struct {
Action decision.Action
Output string
Err error
}
// MatrixSender is satisfied by shell/matrix.Client.
type MatrixSender interface {
SendText(ctx context.Context, roomID, text string) error
SendTyping(ctx context.Context, roomID string, typing bool) error
}
// Runner interprets actions and executes them.
type Runner struct {
matrix MatrixSender
ssh *ssh.Executor
logger *slog.Logger
}
// NewRunner creates a Runner with the provided dependencies.
func NewRunner(matrix MatrixSender, ssh *ssh.Executor, logger *slog.Logger) *Runner {
return &Runner{matrix: matrix, ssh: ssh, logger: logger}
}
// Execute runs each action sequentially and returns results.
func (r *Runner) Execute(ctx context.Context, roomID string, actions []decision.Action) []Result {
results := make([]Result, 0, len(actions))
for _, a := range actions {
res := r.executeOne(ctx, roomID, a)
results = append(results, res)
if res.Err != nil {
r.logger.Error("action failed", "kind", a.Kind, "err", res.Err)
}
}
return results
}
func (r *Runner) executeOne(ctx context.Context, roomID string, a decision.Action) Result {
switch a.Kind {
case decision.ActionKindReply:
if a.Reply == nil {
return Result{Action: a, Err: fmt.Errorf("nil reply action")}
}
target := roomID
if a.Reply.ThreadID != "" {
target = a.Reply.ThreadID
}
err := r.matrix.SendText(ctx, target, a.Reply.Content)
return Result{Action: a, Output: a.Reply.Content, Err: err}
case decision.ActionKindSSH:
if a.SSH == nil {
return Result{Action: a, Err: fmt.Errorf("nil ssh action")}
}
res := r.ssh.Execute(ctx, *a.SSH)
output := res.Stdout
if res.Stderr != "" {
output += "\nstderr: " + res.Stderr
}
return Result{Action: a, Output: output, Err: res.Err}
default:
return Result{Action: a, Err: fmt.Errorf("unhandled action kind: %s", a.Kind)}
}
}
+146
View File
@@ -0,0 +1,146 @@
// Package llm contains impure LLM provider implementations.
package llm
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
coretypes "github.com/enmanuel/agents/pkg/llm"
)
const anthropicAPIBase = "https://api.anthropic.com/v1"
const anthropicVersion = "2023-06-01"
// NewAnthropicComplete returns a CompleteFunc backed by the Anthropic API.
func NewAnthropicComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc {
if baseURL == "" {
baseURL = anthropicAPIBase
}
return func(ctx context.Context, req coretypes.CompletionRequest) (coretypes.CompletionResponse, error) {
apiKey := os.Getenv(apiKeyEnv)
if apiKey == "" {
return coretypes.CompletionResponse{}, fmt.Errorf("env var %s is not set", apiKeyEnv)
}
body := toAnthropicRequest(req)
raw, err := json.Marshal(body)
if err != nil {
return coretypes.CompletionResponse{}, fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/messages", bytes.NewReader(raw))
if err != nil {
return coretypes.CompletionResponse{}, err
}
httpReq.Header.Set("x-api-key", apiKey)
httpReq.Header.Set("anthropic-version", anthropicVersion)
httpReq.Header.Set("content-type", "application/json")
resp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return coretypes.CompletionResponse{}, fmt.Errorf("anthropic request: %w", err)
}
defer resp.Body.Close()
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return coretypes.CompletionResponse{}, fmt.Errorf("read response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return coretypes.CompletionResponse{}, fmt.Errorf("anthropic error %d: %s", resp.StatusCode, respBytes)
}
return fromAnthropicResponse(respBytes)
}
}
// ── private conversion helpers ────────────────────────────────────────────
type anthropicRequest struct {
Model string `json:"model"`
MaxTokens int `json:"max_tokens"`
System string `json:"system,omitempty"`
Messages []anthropicMessage `json:"messages"`
Tools []anthropicTool `json:"tools,omitempty"`
}
type anthropicMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}
type anthropicTool struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema map[string]any `json:"input_schema"`
}
type anthropicResponse struct {
Content []struct {
Type string `json:"type"`
Text string `json:"text"`
} `json:"content"`
Usage struct {
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
} `json:"usage"`
StopReason string `json:"stop_reason"`
}
func toAnthropicRequest(req coretypes.CompletionRequest) anthropicRequest {
msgs := make([]anthropicMessage, 0, len(req.Messages))
for _, m := range req.Messages {
if m.Role == coretypes.RoleSystem {
continue // handled as top-level system param
}
msgs = append(msgs, anthropicMessage{
Role: string(m.Role),
Content: m.Content,
})
}
tools := make([]anthropicTool, len(req.Tools))
for i, t := range req.Tools {
tools[i] = anthropicTool{
Name: t.Name,
Description: t.Description,
InputSchema: t.InputSchema,
}
}
return anthropicRequest{
Model: req.Model,
MaxTokens: req.MaxTokens,
System: req.SystemPrompt,
Messages: msgs,
Tools: tools,
}
}
func fromAnthropicResponse(raw []byte) (coretypes.CompletionResponse, error) {
var ar anthropicResponse
if err := json.Unmarshal(raw, &ar); err != nil {
return coretypes.CompletionResponse{}, fmt.Errorf("unmarshal response: %w", err)
}
var content string
for _, c := range ar.Content {
if c.Type == "text" {
content += c.Text
}
}
return coretypes.CompletionResponse{
Content: content,
FinishReason: ar.StopReason,
Usage: coretypes.TokenUsage{
InputTokens: ar.Usage.InputTokens,
OutputTokens: ar.Usage.OutputTokens,
TotalTokens: ar.Usage.InputTokens + ar.Usage.OutputTokens,
},
}, nil
}
+39
View File
@@ -0,0 +1,39 @@
package llm
import (
"context"
"fmt"
"github.com/enmanuel/agents/internal/config"
coretypes "github.com/enmanuel/agents/pkg/llm"
)
// FromConfig builds a CompleteFunc from an LLMProviderCfg.
func FromConfig(cfg config.LLMProviderCfg) (coretypes.CompleteFunc, error) {
switch cfg.Provider {
case "anthropic":
return NewAnthropicComplete(cfg.APIKeyEnv, cfg.BaseURL), nil
case "openai":
return NewOpenAIComplete(cfg.APIKeyEnv, cfg.BaseURL), nil
case "ollama":
base := cfg.BaseURL
if base == "" {
base = "http://localhost:11434/v1"
}
return NewOpenAIComplete("OLLAMA_API_KEY", base), nil
default:
return nil, fmt.Errorf("unknown LLM provider: %s", cfg.Provider)
}
}
// WithFallback wraps primary with a fallback CompleteFunc.
// If primary returns an error, fallback is tried.
func WithFallback(primary, fallback coretypes.CompleteFunc) coretypes.CompleteFunc {
return func(ctx context.Context, req coretypes.CompletionRequest) (coretypes.CompletionResponse, error) {
resp, err := primary(ctx, req)
if err != nil {
return fallback(ctx, req)
}
return resp, nil
}
}
+76
View File
@@ -0,0 +1,76 @@
package llm
import (
"context"
"fmt"
"os"
openai "github.com/sashabaranov/go-openai"
coretypes "github.com/enmanuel/agents/pkg/llm"
)
// NewOpenAIComplete returns a CompleteFunc backed by the OpenAI-compatible API.
// Works with OpenAI, Ollama, vLLM, LMStudio — just change baseURL.
func NewOpenAIComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc {
return func(ctx context.Context, req coretypes.CompletionRequest) (coretypes.CompletionResponse, error) {
apiKey := os.Getenv(apiKeyEnv)
if apiKey == "" {
apiKey = "ollama" // Ollama doesn't require a real key
}
cfg := openai.DefaultConfig(apiKey)
if baseURL != "" {
cfg.BaseURL = baseURL
}
client := openai.NewClientWithConfig(cfg)
msgs := make([]openai.ChatCompletionMessage, 0, len(req.Messages)+1)
if req.SystemPrompt != "" {
msgs = append(msgs, openai.ChatCompletionMessage{
Role: openai.ChatMessageRoleSystem,
Content: req.SystemPrompt,
})
}
for _, m := range req.Messages {
role := openai.ChatMessageRoleUser
switch m.Role {
case coretypes.RoleAssistant:
role = openai.ChatMessageRoleAssistant
case coretypes.RoleSystem:
role = openai.ChatMessageRoleSystem
case coretypes.RoleTool:
role = openai.ChatMessageRoleTool
}
msgs = append(msgs, openai.ChatCompletionMessage{
Role: role,
Content: m.Content,
})
}
openReq := openai.ChatCompletionRequest{
Model: req.Model,
Messages: msgs,
MaxTokens: req.MaxTokens,
Temperature: float32(req.Temperature),
}
resp, err := client.CreateChatCompletion(ctx, openReq)
if err != nil {
return coretypes.CompletionResponse{}, fmt.Errorf("openai completion: %w", err)
}
if len(resp.Choices) == 0 {
return coretypes.CompletionResponse{}, fmt.Errorf("openai: empty choices")
}
return coretypes.CompletionResponse{
Content: resp.Choices[0].Message.Content,
FinishReason: string(resp.Choices[0].FinishReason),
Usage: coretypes.TokenUsage{
InputTokens: resp.Usage.PromptTokens,
OutputTokens: resp.Usage.CompletionTokens,
TotalTokens: resp.Usage.TotalTokens,
},
}, nil
}
}
+75
View File
@@ -0,0 +1,75 @@
// Package matrix wraps mautrix-go for agent use.
package matrix
import (
"context"
"fmt"
"os"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
"github.com/enmanuel/agents/internal/config"
)
// Client wraps a mautrix client with agent-relevant helpers.
type Client struct {
raw *mautrix.Client
cfg config.MatrixCfg
}
// New creates and authenticates a Matrix client from config.
// The access token is read from the env var specified in cfg.AccessTokenEnv.
func New(cfg config.MatrixCfg) (*Client, error) {
token := os.Getenv(cfg.AccessTokenEnv)
if token == "" {
return nil, fmt.Errorf("env var %s is not set", cfg.AccessTokenEnv)
}
raw, err := mautrix.NewClient(cfg.Homeserver, id.UserID(cfg.UserID), token)
if err != nil {
return nil, fmt.Errorf("create matrix client: %w", err)
}
if cfg.DeviceID != "" {
raw.DeviceID = id.DeviceID(cfg.DeviceID)
}
return &Client{raw: raw, cfg: cfg}, nil
}
// SendText sends a plain-text message to a room.
func (c *Client) SendText(ctx context.Context, roomID, text string) error {
_, err := c.raw.SendText(ctx, id.RoomID(roomID), text)
return err
}
// SendMarkdown sends a formatted (Markdown) message to a room.
func (c *Client) SendMarkdown(ctx context.Context, roomID, markdown string) error {
content := event.MessageEventContent{
MsgType: event.MsgText,
Body: markdown,
Format: event.FormatHTML,
FormattedBody: markdown, // mautrix can render markdown -> HTML if needed
}
_, err := c.raw.SendMessageEvent(ctx, id.RoomID(roomID), event.EventMessage, content)
return err
}
// SendReaction sends a reaction to an event.
func (c *Client) SendReaction(ctx context.Context, roomID, eventID, reaction string) error {
_, err := c.raw.SendReaction(ctx, id.RoomID(roomID), id.EventID(eventID), reaction)
return err
}
// SendTyping sets the typing indicator in a room.
func (c *Client) SendTyping(ctx context.Context, roomID string, typing bool) error {
_, err := c.raw.UserTyping(ctx, id.RoomID(roomID), typing, 5000)
return err
}
// Raw returns the underlying mautrix.Client for advanced use.
func (c *Client) Raw() *mautrix.Client {
return c.raw
}
+122
View File
@@ -0,0 +1,122 @@
package matrix
import (
"context"
"log/slog"
"strings"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/decision"
"github.com/enmanuel/agents/pkg/message"
)
// EventHandler is called for each incoming Matrix message that passes filters.
type EventHandler func(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event)
// Listener attaches to a mautrix syncer and dispatches events to an EventHandler.
type Listener struct {
client *Client
cfg config.MatrixCfg
handler EventHandler
logger *slog.Logger
}
// NewListener creates a Listener for the given client.
func NewListener(client *Client, cfg config.MatrixCfg, handler EventHandler, logger *slog.Logger) *Listener {
return &Listener{
client: client,
cfg: cfg,
handler: handler,
logger: logger,
}
}
// Run starts the Matrix sync loop. Blocks until ctx is cancelled.
func (l *Listener) Run(ctx context.Context) error {
syncer := l.client.raw.Syncer.(*mautrix.DefaultSyncer)
syncer.OnEventType(event.EventMessage, func(ctx context.Context, evt *event.Event) {
if !l.shouldHandle(evt) {
return
}
body, ok := evt.Content.Raw["body"].(string)
if !ok || body == "" {
return
}
// Determine power level (simplified — full impl fetches from room state)
powerLevel := 0
isDM := false // TODO: detect DMs via room member count
opts := message.ParseOptions{
CommandPrefix: l.cfg.Filters.CommandPrefix,
BotUserID: l.cfg.UserID,
}
msgCtx := message.Parse(
body,
evt.Sender.String(),
evt.RoomID.String(),
powerLevel,
isDM,
opts,
)
go l.handler(ctx, msgCtx, evt)
})
l.logger.Info("starting matrix sync")
return l.client.raw.SyncWithContext(ctx)
}
// shouldHandle applies the configured filters to an event.
func (l *Listener) shouldHandle(evt *event.Event) bool {
f := l.cfg.Filters
// Don't handle our own messages
if evt.Sender == id.UserID(l.cfg.UserID) {
return false
}
// Ignore bots
if f.IgnoreBots && strings.HasSuffix(evt.Sender.String(), "-bot:"+serverName(l.cfg.UserID)) {
return false
}
// Ignore specific users
for _, u := range f.IgnoreUsers {
if evt.Sender.String() == u {
return false
}
}
// Check if room is in the listen list
if len(l.cfg.Rooms.Listen) > 0 {
allowed := false
for _, r := range l.cfg.Rooms.Listen {
if evt.RoomID.String() == r {
allowed = true
break
}
}
if !allowed {
return false
}
}
return true
}
func serverName(userID string) string {
parts := strings.SplitN(userID, ":", 2)
if len(parts) == 2 {
return parts[1]
}
return ""
}
+43
View File
@@ -0,0 +1,43 @@
// Package protocols contains adapters for external agent protocols.
package protocols
import (
"context"
"fmt"
"log/slog"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/enmanuel/agents/pkg/tools"
)
// MCPServer exposes agent tools as an MCP server.
type MCPServer struct {
srv *server.MCPServer
logger *slog.Logger
}
// NewMCPServer creates an MCP server exposing the given tool specs.
func NewMCPServer(name, version string, specs []tools.ToolSpec, logger *slog.Logger) *MCPServer {
srv := server.NewMCPServer(name, version)
for _, spec := range specs {
spec := spec // capture
tool := mcp.NewTool(spec.Name,
mcp.WithDescription(spec.Description),
)
srv.AddTool(tool, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Placeholder handler — wire real execution here
return mcp.NewToolResultText(fmt.Sprintf("tool %s called", spec.Name)), nil
})
}
return &MCPServer{srv: srv, logger: logger}
}
// ServeStdio runs the MCP server over stdin/stdout (for Claude Desktop / CLI integration).
func (m *MCPServer) ServeStdio(ctx context.Context) error {
m.logger.Info("mcp server starting on stdio")
return server.ServeStdio(m.srv)
}
+146
View File
@@ -0,0 +1,146 @@
// Package ssh provides impure SSH command execution.
package ssh
import (
"bytes"
"context"
"fmt"
"net"
"os"
"time"
gossh "golang.org/x/crypto/ssh"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/tools"
)
// Result holds the output of an SSH command execution.
type Result struct {
Stdout string
Stderr string
ExitCode int
Err error
}
// Executor runs SSH commands against configured targets.
type Executor struct {
cfg config.SSHCfg
}
// NewExecutor creates an Executor from the SSH config section.
func NewExecutor(cfg config.SSHCfg) *Executor {
return &Executor{cfg: cfg}
}
// Execute runs the SSH command described by spec. Impure.
func (e *Executor) Execute(ctx context.Context, spec tools.SSHCommandSpec) Result {
target, ok := e.cfg.Targets[spec.Target]
if !ok {
return Result{Err: fmt.Errorf("unknown SSH target: %s", spec.Target)}
}
if len(target.Hosts) == 0 {
return Result{Err: fmt.Errorf("no hosts for target: %s", spec.Target)}
}
// Use first host (round-robin or load balancing can be added later)
host := target.Hosts[0]
user := target.User
if user == "" {
user = e.cfg.Defaults.User
}
port := target.Port
if port == 0 {
port = e.cfg.Defaults.Port
}
if port == 0 {
port = 22
}
keyEnv := target.KeyFileEnv
if keyEnv == "" {
keyEnv = e.cfg.Defaults.KeyFileEnv
}
signer, err := loadSigner(keyEnv)
if err != nil {
return Result{Err: fmt.Errorf("load SSH key: %w", err)}
}
sshCfg := &gossh.ClientConfig{
User: user,
Auth: []gossh.AuthMethod{gossh.PublicKeys(signer)},
HostKeyCallback: gossh.InsecureIgnoreHostKey(), // TODO: use known_hosts
Timeout: e.cfg.Defaults.Timeout,
}
if sshCfg.Timeout == 0 {
sshCfg.Timeout = 10 * time.Second
}
addr := fmt.Sprintf("%s:%d", host, port)
conn, err := gossh.Dial("tcp", addr, sshCfg)
if err != nil {
return Result{Err: fmt.Errorf("ssh dial %s: %w", addr, err)}
}
defer conn.Close()
session, err := conn.NewSession()
if err != nil {
return Result{Err: fmt.Errorf("ssh session: %w", err)}
}
defer session.Close()
var stdout, stderr bytes.Buffer
session.Stdout = &stdout
session.Stderr = &stderr
// Respect context cancellation via a goroutine
done := make(chan error, 1)
go func() { done <- session.Run(spec.Command) }()
select {
case <-ctx.Done():
session.Signal(gossh.SIGTERM)
return Result{Err: ctx.Err()}
case err := <-done:
code := 0
if err != nil {
var exitErr *gossh.ExitError
if ok := asExitError(err, &exitErr); ok {
code = exitErr.ExitStatus()
} else {
return Result{Err: err}
}
}
return Result{
Stdout: stdout.String(),
Stderr: stderr.String(),
ExitCode: code,
}
}
}
func loadSigner(keyFileEnv string) (gossh.Signer, error) {
keyPath := os.Getenv(keyFileEnv)
if keyPath == "" {
return nil, fmt.Errorf("env var %s not set", keyFileEnv)
}
raw, err := os.ReadFile(keyPath)
if err != nil {
return nil, err
}
return gossh.ParsePrivateKey(raw)
}
// asExitError is a helper for type-asserting ssh.ExitError.
func asExitError(err error, target **gossh.ExitError) bool {
e, ok := err.(*gossh.ExitError)
if ok {
*target = e
}
return ok
}
// Ensure net is used (for future jump host support)
var _ = net.Dial