Files
agents_and_robots/agents/runtime.go
T

309 lines
8.8 KiB
Go

// Package agents defines the Agent runtime that ties core and shell together.
package agents
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"maunium.net/go/mautrix/event"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/decision"
coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/pkg/personality"
"github.com/enmanuel/agents/shell/effects"
shelllm "github.com/enmanuel/agents/shell/llm"
"github.com/enmanuel/agents/shell/matrix"
"github.com/enmanuel/agents/shell/ssh"
"github.com/enmanuel/agents/tools"
)
// defaultMaxToolIterations is the cap on LLM/tool round-trips that runLLM
// falls back to when the configured LLM.ToolUse.MaxIterations is zero or
// negative.
const defaultMaxToolIterations = 5
// Agent is the assembled runtime: pure core + impure shell.
//
// An Agent is built by New and driven by Run. Incoming Matrix events are
// delivered to handleEvent, which evaluates the decision rules, optionally
// calls the LLM (with a tool-use loop), and hands the resulting actions to
// the effects runner.
type Agent struct {
// cfg is the agent configuration passed to New.
cfg *config.AgentConfig
// NOTE(review): personality is never assigned in New and is not read
// anywhere in this file — confirm whether it is still needed.
personality personality.Personality
// rules are evaluated against every handled message.
rules []decision.Rule
// llm is the completion function: the primary LLM, wrapped with a
// fallback when one is configured and loads successfully.
llm coretypes.CompleteFunc
// matrix is the Matrix client; used here for typing notifications.
matrix *matrix.Client
// runner executes the actions produced for an event.
runner *effects.Runner
// listener dispatches incoming events to handleEvent; Run delegates to it.
listener *matrix.Listener
// toolReg holds the tools that can be offered to the LLM.
toolReg *tools.Registry
logger *slog.Logger
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
}
// New assembles an Agent from its config, rules, and logger.
//
// Components are built in this order: Matrix client, optional E2EE crypto
// store, SSH executor, LLM completion function (primary, optionally wrapped
// with a fallback), effects runner, tool registry, and finally the Matrix
// listener bound to the agent's handleEvent callback.
//
// An error is returned when the Matrix client, the E2EE initialization, or
// the primary LLM cannot be set up. A fallback LLM configuration error is
// only logged as a warning and the agent continues with the primary alone.
// If the primary LLM fails after the crypto store was opened, the store is
// closed again before returning so it does not leak.
func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*Agent, error) {
	// Matrix client
	matrixClient, err := matrix.New(cfg.Matrix)
	if err != nil {
		return nil, fmt.Errorf("matrix client: %w", err)
	}

	// E2EE — initialize before the sync loop starts
	var cryptoStore io.Closer
	if cfg.Matrix.Encryption.Enabled {
		storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db")
		// NOTE(review): pickleKey is empty when the env var is unset; confirm
		// that InitCrypto rejects or otherwise handles an empty key.
		pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv)
		logger.Info("initializing e2ee", "store", storePath)
		cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
		if err != nil {
			// This particular failure gets an extra operator hint; it is
			// detected by message text.
			if strings.Contains(err.Error(), "not marked as shared") {
				logger.Error("crypto store is inconsistent with server — need a fresh device",
					"store", storePath,
					"fix", "delete crypto.db, login with password to get new token+device, update .env, restart",
				)
			}
			return nil, fmt.Errorf("e2ee init: %w", err)
		}
		logger.Info("e2ee ready")
	}

	// SSH executor
	sshExec := ssh.NewExecutor(cfg.SSH)

	// LLM client
	primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary)
	if err != nil {
		// The crypto store may already be open at this point; release it so
		// a failed construction does not leak the handle.
		if cryptoStore != nil {
			if closeErr := cryptoStore.Close(); closeErr != nil {
				logger.Warn("closing crypto store after failed init", "err", closeErr)
			}
		}
		return nil, fmt.Errorf("primary LLM: %w", err)
	}
	var llmFunc coretypes.CompleteFunc = primaryLLM
	if cfg.LLM.Fallback.Provider != "" {
		// A broken fallback is not fatal: warn and keep the primary only.
		fallbackLLM, fbErr := shelllm.FromConfig(cfg.LLM.Fallback)
		if fbErr != nil {
			logger.Warn("fallback LLM config error", "err", fbErr)
		} else {
			llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM)
		}
	}

	// Effects runner
	runner := effects.NewRunner(matrixClient, sshExec, logger)

	// Tool registry — register tools enabled in config
	toolReg := buildToolRegistry(cfg, sshExec, matrixClient, logger)

	a := &Agent{
		cfg:         cfg,
		rules:       rules,
		llm:         llmFunc,
		matrix:      matrixClient,
		runner:      runner,
		toolReg:     toolReg,
		logger:      logger,
		cryptoStore: cryptoStore,
	}

	// Matrix event listener; created last because it needs the agent's
	// handleEvent method as its callback.
	a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger)
	return a, nil
}
// Run starts the agent sync loop by delegating to the Matrix listener and
// returns whatever the listener's Run returns. The listener is expected to
// block until ctx is cancelled.
//
// When E2EE is enabled, the crypto store is closed as Run returns; a failure
// to close it is logged rather than silently dropped.
func (a *Agent) Run(ctx context.Context) error {
	if a.cryptoStore != nil {
		defer func() {
			if err := a.cryptoStore.Close(); err != nil {
				a.logger.Warn("closing crypto store", "err", err)
			}
		}()
	}

	a.logger.Info("agent starting",
		"id", a.cfg.Agent.ID,
		"name", a.cfg.Agent.Name,
		"tools", a.toolReg.Names(),
	)
	return a.listener.Run(ctx)
}
// handleEvent is the callback the matrix Listener invokes for each filtered
// incoming event.
//
// Flow: optionally show a typing indicator, evaluate the decision rules, fall
// back to a single LLM action when nothing matched but the message is a DM or
// mentions the bot, turn every LLM action into a reply action (a fixed apology
// when the LLM call fails), and pass the final action list to the runner.
func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) {
	a.logger.Debug("handling event",
		"sender", msgCtx.SenderID,
		"is_dm", msgCtx.IsDirectMsg,
		"is_mention", msgCtx.IsMention,
		"command", msgCtx.Command,
	)

	// Typing indicator is best-effort: switched on now, off when we return.
	if a.cfg.Personality.Behavior.TypingIndicator {
		typingRoom := evt.RoomID.String()
		_ = a.matrix.SendTyping(ctx, typingRoom, true)
		defer a.matrix.SendTyping(ctx, typingRoom, false)
	}

	matched := decision.Evaluate(msgCtx, a.rules)
	a.logger.Debug("rules evaluated", "matched_actions", len(matched))

	if len(matched) == 0 {
		// Nothing matched: only messages addressed to the bot get an answer.
		if !msgCtx.IsMention && !msgCtx.IsDirectMsg {
			a.logger.Debug("no actions, ignoring message",
				"is_dm", msgCtx.IsDirectMsg,
				"is_mention", msgCtx.IsMention,
			)
			return
		}
		a.logger.Debug("no rules matched, falling back to LLM")
		matched = []decision.Action{{
			Kind: decision.ActionKindLLM,
			LLM:  &decision.LLMAction{ContextKey: msgCtx.RoomID},
		}}
	}

	// Replace each LLM action with the reply it produces; other actions pass
	// through untouched and keep their position.
	final := make([]decision.Action, 0, len(matched))
	for _, act := range matched {
		if act.Kind != decision.ActionKindLLM {
			final = append(final, act)
			continue
		}

		text, err := a.runLLM(ctx, msgCtx)
		if err != nil {
			a.logger.Error("llm error", "err", err)
			text = "Sorry, I encountered an error."
		}
		final = append(final, decision.Action{
			Kind:  decision.ActionKindReply,
			Reply: &decision.ReplyAction{Content: text},
		})
	}

	a.runner.Execute(ctx, evt.RoomID.String(), final)
}
// runLLM answers msgCtx.Content with the configured LLM and returns the
// reply text.
//
// When tool use is enabled and the registry is non-empty, the tool specs are
// offered to the model and a loop runs: call the LLM, execute every requested
// tool, append the results to the conversation, and call again. The loop ends
// as soon as a response carries no tool calls, or after MaxIterations rounds
// (defaultMaxToolIterations when that setting is zero or negative), in which
// case a fixed notice is returned instead of model output.
//
// An error is returned when an LLM call fails (wrapped with the model name)
// or when ctx is cancelled before a requested tool is executed. Errors are
// not logged here; the caller is expected to log them once.
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) {
	a.logger.Debug("calling LLM",
		"model", a.cfg.LLM.Primary.Model,
		"provider", a.cfg.LLM.Primary.Provider,
	)

	// The agent description doubles as the system prompt. Loading a prompt
	// from a file is not implemented here.
	systemPrompt := a.cfg.Agent.Description

	messages := []coretypes.Message{
		{Role: coretypes.RoleUser, Content: msgCtx.Content},
	}

	// Build tool specs for the LLM if tool_use is enabled
	var llmTools []coretypes.ToolSpec
	if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 {
		llmTools = a.toolReg.ToLLMSpecs()
		a.logger.Debug("tools available for LLM", "count", len(llmTools))
	}

	maxIter := a.cfg.LLM.ToolUse.MaxIterations
	if maxIter <= 0 {
		maxIter = defaultMaxToolIterations
	}

	// Tool-use loop: call LLM → execute tools → feed results back → repeat
	for i := 0; i < maxIter; i++ {
		req := coretypes.CompletionRequest{
			Model:        a.cfg.LLM.Primary.Model,
			MaxTokens:    a.cfg.LLM.Primary.MaxTokens,
			Temperature:  a.cfg.LLM.Primary.Temperature,
			SystemPrompt: systemPrompt,
			Messages:     messages,
			Tools:        llmTools,
		}

		resp, err := a.llm(ctx, req)
		if err != nil {
			// Wrap instead of logging: the caller logs the returned error,
			// so logging here too would report the same failure twice.
			return "", fmt.Errorf("llm completion (model %q): %w", req.Model, err)
		}

		a.logger.Debug("LLM responded",
			"content_len", len(resp.Content),
			"tool_calls", len(resp.ToolCalls),
			"finish_reason", resp.FinishReason,
		)

		// No tool calls — return the text response
		if len(resp.ToolCalls) == 0 {
			return resp.Content, nil
		}

		// Append assistant message with tool calls to conversation
		messages = append(messages, coretypes.Message{
			Role:      coretypes.RoleAssistant,
			Content:   resp.Content,
			ToolCalls: resp.ToolCalls,
		})

		// Execute each tool and append results
		for _, tc := range resp.ToolCalls {
			// Tools may have side effects; do not start another one once the
			// caller has cancelled the request.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return "", fmt.Errorf("tool-use loop interrupted: %w", ctxErr)
			}

			a.logger.Info("executing tool",
				"tool", tc.Name,
				"call_id", tc.ID,
			)

			result := a.toolReg.Execute(ctx, tc.Name, tc.Arguments)

			// A failed tool is reported back to the model as text rather
			// than aborting the whole exchange.
			output := result.Output
			if result.Err != nil {
				output = fmt.Sprintf("error: %s", result.Err)
				a.logger.Warn("tool execution error",
					"tool", tc.Name,
					"err", result.Err,
				)
			} else {
				a.logger.Debug("tool executed",
					"tool", tc.Name,
					"output_len", len(output),
				)
			}

			messages = append(messages, coretypes.Message{
				Role:       coretypes.RoleTool,
				Content:    output,
				ToolCallID: tc.ID,
			})
		}
	}

	// Max iterations reached — the model never produced a final answer, so a
	// fixed notice is returned in its place.
	a.logger.Warn("tool-use loop reached max iterations", "max", maxIter)
	return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil
}
// buildToolRegistry returns a fresh Registry populated according to the
// agent's config.
//
// HTTP (get + post), SSH and file-read tools are added only when their
// respective config section is enabled; the current-time and Matrix-send
// tools are added unconditionally. Registration order is fixed: HTTP, SSH,
// file, current time, Matrix. A debug line is logged after each group.
func buildToolRegistry(cfg *config.AgentConfig, sshExec *ssh.Executor, matrixClient *matrix.Client, logger *slog.Logger) *tools.Registry {
	registry := tools.NewRegistry()

	// Ordered registration steps; install runs only when enabled is true, so
	// tool constructors for disabled groups are never invoked.
	steps := []struct {
		enabled bool
		install func()
		note    string
	}{
		{
			enabled: cfg.Tools.HTTP.Enabled,
			install: func() {
				registry.Register(tools.NewHTTPGet(cfg.Tools.HTTP))
				registry.Register(tools.NewHTTPPost(cfg.Tools.HTTP))
			},
			note: "registered http tools",
		},
		{
			enabled: cfg.Tools.SSH.Enabled,
			install: func() { registry.Register(tools.NewSSHCommand(cfg.Tools.SSH, sshExec)) },
			note:    "registered ssh tool",
		},
		{
			enabled: cfg.Tools.FileOps.Enabled,
			install: func() { registry.Register(tools.NewReadFile(cfg.Tools.FileOps)) },
			note:    "registered file tool",
		},
		{
			// always available
			enabled: true,
			install: func() { registry.Register(tools.NewCurrentTime()) },
			note:    "registered current_time tool",
		},
		{
			// always available
			enabled: true,
			install: func() { registry.Register(tools.NewMatrixSend(matrixClient)) },
			note:    "registered matrix tool",
		},
	}

	for _, step := range steps {
		if !step.enabled {
			continue
		}
		step.install()
		logger.Debug(step.note)
	}

	return registry
}