0eb9e8741d
Nuevo agente Matrix especializado en consultas meteorológicas. Usa la API pública Open-Meteo (sin API key) para obtener condiciones actuales y previsión de 3 días para cualquier ciudad. Incluye: - agents/meteorologo/ — reglas puras, config.yaml, system prompt - tools/weather.go — tool get_weather (geocoding + forecast) - Registro en runtime.go (tool registry) y launcher (rulesRegistry) El agente responde a DMs y menciones delegando al LLM con tool_use habilitado. No tiene comandos directos (!xxx). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
773 lines
24 KiB
Go
773 lines
24 KiB
Go
// Package agents defines the Agent runtime that ties core and shell together.
|
|
package agents
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"maunium.net/go/mautrix"
|
|
"maunium.net/go/mautrix/event"
|
|
|
|
"github.com/enmanuel/agents/internal/config"
|
|
"github.com/enmanuel/agents/pkg/command"
|
|
"github.com/enmanuel/agents/pkg/decision"
|
|
coretypes "github.com/enmanuel/agents/pkg/llm"
|
|
"github.com/enmanuel/agents/pkg/memory"
|
|
"github.com/enmanuel/agents/pkg/orchestration"
|
|
"github.com/enmanuel/agents/pkg/personality"
|
|
"github.com/enmanuel/agents/shell/bus"
|
|
"github.com/enmanuel/agents/shell/effects"
|
|
shellknowledge "github.com/enmanuel/agents/shell/knowledge"
|
|
shelllm "github.com/enmanuel/agents/shell/llm"
|
|
"github.com/enmanuel/agents/shell/matrix"
|
|
shellmem "github.com/enmanuel/agents/shell/memory"
|
|
"github.com/enmanuel/agents/shell/ssh"
|
|
"github.com/enmanuel/agents/tools"
|
|
)
|
|
|
|
const (
|
|
defaultMaxToolIterations = 5
|
|
defaultWindowSize = 20
|
|
)
|
|
|
|
// CommandHandler executes a built-in command and returns the response text.
|
|
type CommandHandler func(ctx context.Context, msgCtx decision.MessageContext) string
|
|
|
|
// Agent is the assembled runtime: pure core + impure shell.
|
|
type Agent struct {
|
|
cfg *config.AgentConfig
|
|
personality personality.Personality
|
|
rules []decision.Rule
|
|
llm coretypes.CompleteFunc // nil when no LLM configured (simple_bot)
|
|
matrix *matrix.Client
|
|
runner *effects.Runner
|
|
listener *matrix.Listener
|
|
toolReg *tools.Registry
|
|
logger *slog.Logger
|
|
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
|
|
|
|
// Commands — handlers keyed by canonical name; cmdAliases maps alias → canonical
|
|
commands map[string]CommandHandler
|
|
cmdAliases map[string]string // alias → canonical name
|
|
customSpecs []command.Spec // specs from RegisterCommand (for !help)
|
|
startTime time.Time
|
|
|
|
// Memory
|
|
windows map[string]memory.Window
|
|
windowsMu sync.RWMutex
|
|
memStore memory.Store // nil when memory is disabled
|
|
windowSize int
|
|
roomCtx *tools.RoomContext
|
|
|
|
// Knowledge store — non-nil when knowledge is enabled
|
|
knowledgeStore *shellknowledge.FileStore
|
|
|
|
// Bus — set via SetBus() when running under the unified launcher
|
|
agentBus *bus.Bus
|
|
}
|
|
|
|
// ClearWindow resets the conversation window for a room and deletes persisted
|
|
// messages from SQLite so the agent starts fresh. Implements tools.WindowClearer.
|
|
func (a *Agent) ClearWindow(roomID string) {
|
|
a.windowsMu.Lock()
|
|
a.windows[roomID] = memory.NewWindow(a.windowSize)
|
|
a.windowsMu.Unlock()
|
|
|
|
if a.memStore != nil {
|
|
if err := a.memStore.DeleteMessages(
|
|
context.Background(), a.cfg.Agent.ID, &roomID,
|
|
); err != nil {
|
|
a.logger.Warn("failed to delete persisted messages on clear", "room", roomID, "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// New assembles an Agent from its config, rules, and logger.
|
|
func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*Agent, error) {
|
|
// Matrix client
|
|
matrixClient, err := matrix.New(cfg.Matrix)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("matrix client: %w", err)
|
|
}
|
|
|
|
// E2EE — initialize before the sync loop starts
|
|
var cryptoStore io.Closer
|
|
if cfg.Matrix.Encryption.Enabled {
|
|
storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db")
|
|
pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv)
|
|
logger.Info("initializing e2ee", "store", storePath)
|
|
cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("e2ee init: %w", err)
|
|
}
|
|
|
|
// Auto-fetch cross-signing private keys from SSSS if recovery key is configured.
|
|
if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" {
|
|
if rk := os.Getenv(envName); rk != "" {
|
|
if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil {
|
|
logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err)
|
|
} else {
|
|
logger.Info("cross-signing private keys fetched from SSSS")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sign own device with the self-signing key so Element shows it as verified.
|
|
if err := matrixClient.SignOwnDevice(context.Background()); err != nil {
|
|
logger.Warn("failed to sign own device (non-fatal)", "err", err)
|
|
} else {
|
|
logger.Info("own device signed with cross-signing key")
|
|
}
|
|
|
|
logger.Info("e2ee ready")
|
|
}
|
|
|
|
// SSH executor
|
|
sshExec := ssh.NewExecutor(cfg.SSH, logger)
|
|
|
|
// LLM client — optional; if no provider is configured, the agent runs as simple_bot
|
|
var llmFunc coretypes.CompleteFunc
|
|
if cfg.LLM.Primary.Provider != "" {
|
|
llmLog := logger.With("component", "llm")
|
|
primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary, llmLog)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("primary LLM: %w", err)
|
|
}
|
|
|
|
llmFunc = primaryLLM
|
|
if cfg.LLM.Fallback.Provider != "" {
|
|
fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback, llmLog)
|
|
if err != nil {
|
|
logger.Warn("fallback LLM config error", "err", err)
|
|
} else {
|
|
llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM, cfg.LLM.Fallback, llmLog)
|
|
}
|
|
}
|
|
} else {
|
|
logger.Info("no LLM configured, running as command-only bot")
|
|
}
|
|
|
|
// Effects runner
|
|
runner := effects.NewRunner(matrixClient, sshExec, logger)
|
|
|
|
// Memory subsystem
|
|
var memStore memory.Store
|
|
windowSize := defaultWindowSize
|
|
roomCtx := &tools.RoomContext{}
|
|
|
|
if cfg.Memory.Enabled {
|
|
windowSize = cfg.Memory.WindowSize
|
|
if windowSize <= 0 {
|
|
windowSize = defaultWindowSize
|
|
}
|
|
|
|
dbPath := cfg.Memory.DBPath
|
|
if dbPath == "" {
|
|
dbPath = filepath.Join("agents", cfg.Agent.ID, "data", "memory.db")
|
|
}
|
|
store, err := shellmem.New(dbPath, logger)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("memory store: %w", err)
|
|
}
|
|
memStore = store
|
|
logger.Info("memory enabled", "window_size", windowSize, "db", dbPath)
|
|
}
|
|
|
|
// Knowledge store
|
|
var kStore *shellknowledge.FileStore
|
|
if cfg.Tools.Knowledge.Enabled {
|
|
knowledgeDir := cfg.Tools.Knowledge.Dir
|
|
if knowledgeDir == "" {
|
|
knowledgeDir = filepath.Join("agents", cfg.Agent.ID, "knowledge")
|
|
}
|
|
knowledgeDBPath := filepath.Join("agents", cfg.Agent.ID, "data", "knowledge.db")
|
|
var kErr error
|
|
kStore, kErr = shellknowledge.New(knowledgeDir, knowledgeDBPath, logger)
|
|
if kErr != nil {
|
|
logger.Error("knowledge_store_init_failed", "err", kErr)
|
|
} else {
|
|
if syncErr := kStore.Sync(context.Background()); syncErr != nil {
|
|
logger.Error("knowledge_sync_failed", "err", syncErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Tool registry — register tools enabled in config
|
|
toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memStore, kStore, roomCtx, logger)
|
|
|
|
a := &Agent{
|
|
cfg: cfg,
|
|
rules: rules,
|
|
llm: llmFunc,
|
|
matrix: matrixClient,
|
|
runner: runner,
|
|
toolReg: toolReg,
|
|
logger: logger,
|
|
cryptoStore: cryptoStore,
|
|
commands: make(map[string]CommandHandler),
|
|
cmdAliases: command.BuiltinNames(),
|
|
startTime: time.Now(),
|
|
windows: make(map[string]memory.Window),
|
|
memStore: memStore,
|
|
knowledgeStore: kStore,
|
|
windowSize: windowSize,
|
|
roomCtx: roomCtx,
|
|
}
|
|
|
|
// Register built-in command handlers
|
|
a.registerBuiltinCommands()
|
|
|
|
// Register memory_clear_context with self as WindowClearer (after a is created)
|
|
if cfg.Tools.Memory.Enabled && memStore != nil {
|
|
toolReg.Register(tools.NewMemoryClearContext(a, roomCtx))
|
|
}
|
|
|
|
// Matrix event listener
|
|
a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger)
|
|
|
|
return a, nil
|
|
}
|
|
|
|
// RegisterCommand adds a custom command handler for this agent.
|
|
// The spec provides metadata (aliases, description, usage) for !help.
|
|
// Must be called before Run().
|
|
func (a *Agent) RegisterCommand(spec command.Spec, handler CommandHandler) {
|
|
a.commands[spec.Name] = handler
|
|
a.cmdAliases[spec.Name] = spec.Name
|
|
for _, alias := range spec.Aliases {
|
|
a.cmdAliases[alias] = spec.Name
|
|
}
|
|
a.customSpecs = append(a.customSpecs, spec)
|
|
a.logger.Info("command_registered", "command", spec.Name, "aliases", spec.Aliases)
|
|
}
|
|
|
|
// SetBus attaches the agent to the inter-agent bus for orchestration.
|
|
// Must be called before Run().
|
|
func (a *Agent) SetBus(b *bus.Bus) {
|
|
a.agentBus = b
|
|
}
|
|
|
|
// SetInterceptor configures the listener to skip events in orchestrated rooms.
|
|
func (a *Agent) SetInterceptor(fn matrix.InterceptFunc) {
|
|
a.listener.SetInterceptor(fn)
|
|
}
|
|
|
|
// SetMembershipNotify registers a callback for room membership changes.
|
|
func (a *Agent) SetMembershipNotify(fn matrix.MembershipNotifyFunc) {
|
|
a.listener.SetMembershipNotify(fn)
|
|
}
|
|
|
|
// RawMatrixClient returns the underlying *mautrix.Client for room scanning.
|
|
func (a *Agent) RawMatrixClient() *mautrix.Client {
|
|
return a.matrix.Raw()
|
|
}
|
|
|
|
// Run starts the agent sync loop. Blocks until ctx is cancelled.
|
|
func (a *Agent) Run(ctx context.Context) error {
|
|
if a.cryptoStore != nil {
|
|
defer a.cryptoStore.Close()
|
|
}
|
|
if a.memStore != nil {
|
|
defer a.memStore.Close()
|
|
}
|
|
if a.knowledgeStore != nil {
|
|
defer a.knowledgeStore.Close()
|
|
}
|
|
a.logger.Info("agent starting",
|
|
"id", a.cfg.Agent.ID,
|
|
"name", a.cfg.Agent.Name,
|
|
"tools", a.toolReg.Names(),
|
|
)
|
|
|
|
// Start bus listener if connected to the orchestration bus
|
|
if a.agentBus != nil {
|
|
ch := a.agentBus.Subscribe(bus.AgentID(a.cfg.Agent.ID))
|
|
go a.listenBus(ctx, ch)
|
|
a.logger.Info("bus listener started")
|
|
}
|
|
|
|
return a.listener.Run(ctx)
|
|
}
|
|
|
|
// listenBus processes messages from the inter-agent bus.
|
|
func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
if msg.Kind == bus.KindTask {
|
|
a.handleTaskEvent(ctx, msg)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleTaskEvent processes a task delegated by the orchestrator.
|
|
// The bot generates a response and sends it both to Matrix and back via bus.
|
|
func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
|
|
taskJSON, ok := msg.Payload["task_json"]
|
|
if !ok {
|
|
a.logger.Error("task message missing task_json payload")
|
|
return
|
|
}
|
|
|
|
task, err := orchestration.UnmarshalTaskEvent(taskJSON)
|
|
if err != nil {
|
|
a.logger.Error("failed to unmarshal task event", "err", err)
|
|
return
|
|
}
|
|
|
|
a.logger.Info("handling orchestrated task",
|
|
"task_id", task.TaskID,
|
|
"room", task.TargetRoomID,
|
|
"sender", task.OriginalSender,
|
|
"iteration", task.Iteration,
|
|
)
|
|
|
|
roomID := task.TargetRoomID
|
|
|
|
// Update room context for memory tools
|
|
a.roomCtx.Set(roomID)
|
|
|
|
if a.cfg.Personality.Behavior.TypingIndicator {
|
|
_ = a.matrix.SendTyping(ctx, roomID, true)
|
|
defer a.matrix.SendTyping(ctx, roomID, false)
|
|
}
|
|
|
|
// Build a synthetic MessageContext from the task
|
|
msgCtx := decision.MessageContext{
|
|
SenderID: task.OriginalSender,
|
|
RoomID: roomID,
|
|
Content: task.OriginalQuestion,
|
|
IsDirectMsg: false,
|
|
IsMention: true, // treat orchestrated tasks like mentions
|
|
}
|
|
|
|
// If there are previous responses, prepend context
|
|
if len(task.PreviousResponses) > 0 {
|
|
var context string
|
|
for _, pr := range task.PreviousResponses {
|
|
context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text)
|
|
}
|
|
msgCtx.Content = context + "Original question: " + task.OriginalQuestion +
|
|
"\n\nPlease provide an improved or complementary answer."
|
|
}
|
|
|
|
// Load memory and run LLM
|
|
a.ensureWindowLoaded(ctx, roomID)
|
|
a.appendToWindow(roomID, coretypes.Message{
|
|
Role: coretypes.RoleUser, Content: msgCtx.Content,
|
|
})
|
|
|
|
reply, err := a.runLLM(ctx, msgCtx)
|
|
|
|
// Build the result to send back via bus
|
|
result := orchestration.TaskResult{
|
|
TaskID: task.TaskID,
|
|
BotID: a.cfg.Agent.ID,
|
|
}
|
|
|
|
if err != nil {
|
|
a.logger.Error("LLM error during orchestrated task", "err", err)
|
|
result.Error = err.Error()
|
|
reply = "Sorry, I encountered an error."
|
|
} else {
|
|
result.Text = reply
|
|
// Persist assistant reply
|
|
a.appendToWindow(roomID, coretypes.Message{
|
|
Role: coretypes.RoleAssistant, Content: reply,
|
|
})
|
|
a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
|
|
}
|
|
|
|
// Send reply to Matrix room
|
|
if sendErr := a.matrix.SendMarkdown(ctx, roomID, reply); sendErr != nil {
|
|
a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr)
|
|
}
|
|
|
|
// Send result back to orchestrator via bus
|
|
resultJSON, marshalErr := orchestration.MarshalTaskResult(result)
|
|
if marshalErr != nil {
|
|
a.logger.Error("failed to marshal task result", "err", marshalErr)
|
|
return
|
|
}
|
|
|
|
replyMsg := bus.AgentMessage{
|
|
From: bus.AgentID(a.cfg.Agent.ID),
|
|
To: msg.From,
|
|
Kind: bus.KindTaskResult,
|
|
Payload: map[string]string{"result_json": resultJSON},
|
|
}
|
|
|
|
if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil {
|
|
a.logger.Error("failed to send task result via bus", "err", busErr)
|
|
}
|
|
}
|
|
|
|
// handleEvent is called by the matrix Listener for each filtered incoming event.
|
|
func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) {
|
|
a.logger.Debug("handling event",
|
|
"sender", msgCtx.SenderID,
|
|
"is_dm", msgCtx.IsDirectMsg,
|
|
"is_mention", msgCtx.IsMention,
|
|
"command", msgCtx.Command,
|
|
)
|
|
|
|
roomID := evt.RoomID.String()
|
|
|
|
// Update room context for memory tools
|
|
a.roomCtx.Set(roomID)
|
|
|
|
if a.cfg.Personality.Behavior.TypingIndicator {
|
|
_ = a.matrix.SendTyping(ctx, roomID, true)
|
|
defer a.matrix.SendTyping(ctx, roomID, false)
|
|
}
|
|
|
|
// ── Command flow ─────────────────────────────────────────────────
|
|
// Commands (!xxx) always resolve before rules or LLM. Never reach the LLM.
|
|
// Priority: built-in → unknown (agent-specific commands can be added via RegisterCommand).
|
|
if msgCtx.Command != "" {
|
|
a.logger.Info("command_received",
|
|
"command", msgCtx.Command,
|
|
"sender", msgCtx.SenderID,
|
|
"room", roomID,
|
|
"args", msgCtx.Args,
|
|
)
|
|
|
|
// Resolve aliases
|
|
cmdName := msgCtx.Command
|
|
if canonical, ok := a.cmdAliases[cmdName]; ok {
|
|
cmdName = canonical
|
|
}
|
|
|
|
if handler, ok := a.commands[cmdName]; ok {
|
|
a.logger.Info("command_executed", "command", cmdName)
|
|
reply := handler(ctx, msgCtx)
|
|
_ = a.matrix.SendMarkdown(ctx, roomID, reply)
|
|
return
|
|
}
|
|
|
|
// Unknown command — never falls through to rules or LLM
|
|
a.logger.Info("command_unknown", "command", msgCtx.Command)
|
|
_ = a.matrix.SendMarkdown(ctx, roomID,
|
|
fmt.Sprintf("Comando desconocido: !%s. Usa !help para ver comandos disponibles.", msgCtx.Command))
|
|
return
|
|
}
|
|
|
|
// ── Non-command flow ─────────────────────────────────────────────
|
|
actions := decision.Evaluate(msgCtx, a.rules)
|
|
a.logger.Debug("rules evaluated", "matched_actions", len(actions))
|
|
|
|
// If no rules matched and the message mentions the bot or is a DM, use LLM.
|
|
if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) {
|
|
if a.llm == nil {
|
|
// Simple bot: no LLM, ignore non-command messages
|
|
a.logger.Debug("no LLM configured, ignoring non-command message")
|
|
return
|
|
}
|
|
a.logger.Debug("no rules matched, falling back to LLM")
|
|
actions = []decision.Action{{
|
|
Kind: decision.ActionKindLLM,
|
|
LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID},
|
|
}}
|
|
}
|
|
|
|
if len(actions) == 0 {
|
|
a.logger.Debug("no actions, ignoring message",
|
|
"is_dm", msgCtx.IsDirectMsg,
|
|
"is_mention", msgCtx.IsMention,
|
|
)
|
|
return
|
|
}
|
|
|
|
a.executeActions(ctx, roomID, msgCtx, actions)
|
|
}
|
|
|
|
// executeActions expands LLM actions and runs the effects runner.
|
|
func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) {
|
|
expanded := make([]decision.Action, 0, len(actions))
|
|
for _, act := range actions {
|
|
if act.Kind == decision.ActionKindLLM {
|
|
if a.llm == nil {
|
|
a.logger.Warn("LLM action requested but no LLM configured")
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado."},
|
|
})
|
|
continue
|
|
}
|
|
// Memory: load window + append user message before LLM call
|
|
a.ensureWindowLoaded(ctx, roomID)
|
|
a.appendToWindow(roomID, coretypes.Message{
|
|
Role: coretypes.RoleUser, Content: msgCtx.Content,
|
|
})
|
|
a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content)
|
|
|
|
reply, err := a.runLLM(ctx, msgCtx)
|
|
if err != nil {
|
|
a.logger.Error("llm error", "err", err)
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error."},
|
|
})
|
|
} else {
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: reply},
|
|
})
|
|
|
|
// Memory: append assistant reply after LLM call
|
|
a.appendToWindow(roomID, coretypes.Message{
|
|
Role: coretypes.RoleAssistant, Content: reply,
|
|
})
|
|
a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
|
|
}
|
|
} else {
|
|
expanded = append(expanded, act)
|
|
}
|
|
}
|
|
|
|
a.runner.Execute(ctx, roomID, expanded)
|
|
}
|
|
|
|
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) {
|
|
a.logger.Debug("calling LLM",
|
|
"model", a.cfg.LLM.Primary.Model,
|
|
"provider", a.cfg.LLM.Primary.Provider,
|
|
)
|
|
|
|
// Load system prompt from file if configured, else use description
|
|
systemPrompt := a.cfg.Agent.Description
|
|
|
|
// Build messages: conversation history from window (includes current user msg)
|
|
messages := a.getWindowMessages(msgCtx.RoomID)
|
|
if len(messages) == 0 {
|
|
// Fallback if memory is disabled: just the current message
|
|
messages = []coretypes.Message{
|
|
{Role: coretypes.RoleUser, Content: msgCtx.Content},
|
|
}
|
|
}
|
|
|
|
// Build tool specs for the LLM if tool_use is enabled
|
|
var llmTools []coretypes.ToolSpec
|
|
if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 {
|
|
llmTools = a.toolReg.ToLLMSpecs()
|
|
a.logger.Debug("tools available for LLM", "count", len(llmTools))
|
|
}
|
|
|
|
maxIter := a.cfg.LLM.ToolUse.MaxIterations
|
|
if maxIter <= 0 {
|
|
maxIter = defaultMaxToolIterations
|
|
}
|
|
|
|
// Tool-use loop: call LLM → execute tools → feed results back → repeat
|
|
for i := 0; i < maxIter; i++ {
|
|
req := coretypes.CompletionRequest{
|
|
Model: a.cfg.LLM.Primary.Model,
|
|
MaxTokens: a.cfg.LLM.Primary.MaxTokens,
|
|
Temperature: a.cfg.LLM.Primary.Temperature,
|
|
SystemPrompt: systemPrompt,
|
|
Messages: messages,
|
|
Tools: llmTools,
|
|
}
|
|
|
|
resp, err := a.llm(ctx, req)
|
|
if err != nil {
|
|
a.logger.Error("LLM call failed", "model", req.Model, "err", err)
|
|
return "", err
|
|
}
|
|
|
|
a.logger.Debug("LLM responded",
|
|
"content_len", len(resp.Content),
|
|
"tool_calls", len(resp.ToolCalls),
|
|
"finish_reason", resp.FinishReason,
|
|
)
|
|
|
|
// No tool calls — return the text response
|
|
if len(resp.ToolCalls) == 0 {
|
|
return resp.Content, nil
|
|
}
|
|
|
|
// Append assistant message with tool calls to conversation
|
|
messages = append(messages, coretypes.Message{
|
|
Role: coretypes.RoleAssistant,
|
|
Content: resp.Content,
|
|
ToolCalls: resp.ToolCalls,
|
|
})
|
|
|
|
// Execute each tool and append results
|
|
for _, tc := range resp.ToolCalls {
|
|
a.logger.Info("executing tool",
|
|
"tool", tc.Name,
|
|
"call_id", tc.ID,
|
|
)
|
|
|
|
// Notify the room that a tool is being called
|
|
toolNotice := fmt.Sprintf("🔨 <em>%s</em>", tc.Name)
|
|
if err := a.matrix.SendMarkdown(ctx, msgCtx.RoomID, toolNotice); err != nil {
|
|
a.logger.Warn("failed to send tool call notice", "tool", tc.Name, "err", err)
|
|
}
|
|
|
|
result := a.toolReg.Execute(ctx, tc.Name, tc.Arguments)
|
|
|
|
output := result.Output
|
|
if result.Err != nil {
|
|
output = fmt.Sprintf("error: %s", result.Err)
|
|
a.logger.Warn("tool execution error",
|
|
"tool", tc.Name,
|
|
"err", result.Err,
|
|
)
|
|
} else {
|
|
a.logger.Debug("tool executed",
|
|
"tool", tc.Name,
|
|
"output_len", len(output),
|
|
)
|
|
}
|
|
|
|
messages = append(messages, coretypes.Message{
|
|
Role: coretypes.RoleTool,
|
|
Content: output,
|
|
ToolCallID: tc.ID,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Max iterations reached — return whatever we have
|
|
a.logger.Warn("tool-use loop reached max iterations", "max", maxIter)
|
|
return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil
|
|
}
|
|
|
|
// ── Memory helpers ───────────────────────────────────────────────────────
|
|
|
|
// ensureWindowLoaded loads the conversation window from SQLite on first access for a room.
|
|
func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) {
|
|
a.windowsMu.Lock()
|
|
defer a.windowsMu.Unlock()
|
|
if _, ok := a.windows[roomID]; ok {
|
|
return
|
|
}
|
|
w := memory.NewWindow(a.windowSize)
|
|
if a.memStore != nil {
|
|
msgs, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize)
|
|
if err != nil {
|
|
a.logger.Warn("failed to load message history", "room", roomID, "err", err)
|
|
} else {
|
|
for _, m := range msgs {
|
|
w = w.Append(coretypes.Message{Role: m.Role, Content: m.Content})
|
|
}
|
|
if len(msgs) > 0 {
|
|
a.logger.Debug("loaded message history", "room", roomID, "count", len(msgs))
|
|
}
|
|
}
|
|
}
|
|
a.windows[roomID] = w
|
|
}
|
|
|
|
// appendToWindow adds a message to the in-memory conversation window.
|
|
func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) {
|
|
a.windowsMu.Lock()
|
|
defer a.windowsMu.Unlock()
|
|
w, ok := a.windows[roomID]
|
|
if !ok {
|
|
w = memory.NewWindow(a.windowSize)
|
|
}
|
|
a.windows[roomID] = w.Append(msg)
|
|
}
|
|
|
|
// getWindowMessages returns a copy of the conversation window for a room.
|
|
func (a *Agent) getWindowMessages(roomID string) []coretypes.Message {
|
|
a.windowsMu.RLock()
|
|
defer a.windowsMu.RUnlock()
|
|
w, ok := a.windows[roomID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return w.ToLLMMessages()
|
|
}
|
|
|
|
// persistMessage saves a message to the SQLite store (no-op if store is nil).
|
|
func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) {
|
|
if a.memStore == nil {
|
|
return
|
|
}
|
|
if err := a.memStore.SaveMessage(ctx, memory.HistoryMessage{
|
|
AgentID: a.cfg.Agent.ID,
|
|
RoomID: roomID,
|
|
Role: role,
|
|
Content: content,
|
|
}); err != nil {
|
|
a.logger.Warn("failed to persist message", "room", roomID, "err", err)
|
|
}
|
|
}
|
|
|
|
// buildToolRegistry creates a Registry with tools enabled in the agent's config.
|
|
func buildToolRegistry(
|
|
cfg *config.AgentConfig,
|
|
sshExec *ssh.Executor,
|
|
matrixClient *matrix.Client,
|
|
memStore memory.Store,
|
|
kStore *shellknowledge.FileStore,
|
|
roomCtx *tools.RoomContext,
|
|
logger *slog.Logger,
|
|
) *tools.Registry {
|
|
reg := tools.NewRegistry(logger)
|
|
|
|
if cfg.Tools.HTTP.Enabled {
|
|
reg.Register(tools.NewHTTPGet(cfg.Tools.HTTP))
|
|
reg.Register(tools.NewHTTPPost(cfg.Tools.HTTP))
|
|
logger.Debug("registered http tools")
|
|
}
|
|
|
|
if cfg.Tools.SSH.Enabled {
|
|
reg.Register(tools.NewSSHCommand(cfg.Tools.SSH, sshExec))
|
|
logger.Debug("registered ssh tool")
|
|
}
|
|
|
|
if cfg.Tools.FileOps.Enabled {
|
|
reg.Register(tools.NewReadFile(cfg.Tools.FileOps))
|
|
logger.Debug("registered file tool")
|
|
}
|
|
|
|
// current_time is always available
|
|
reg.Register(tools.NewCurrentTime())
|
|
logger.Debug("registered current_time tool")
|
|
|
|
// weather tool is always available
|
|
reg.Register(tools.NewWeather())
|
|
logger.Debug("registered weather tool")
|
|
|
|
// matrix_send is always available
|
|
reg.Register(tools.NewMatrixSend(matrixClient))
|
|
logger.Debug("registered matrix tool")
|
|
|
|
// Memory tools (memory_clear_context registered later since it needs the Agent)
|
|
if cfg.Tools.Memory.Enabled && memStore != nil {
|
|
reg.Register(tools.NewMemorySave(cfg.Agent.ID, memStore))
|
|
reg.Register(tools.NewMemoryRecall(cfg.Agent.ID, memStore))
|
|
reg.Register(tools.NewMemoryForget(cfg.Agent.ID, memStore))
|
|
reg.Register(tools.NewMemorySummary(cfg.Agent.ID, memStore))
|
|
logger.Debug("registered memory tools")
|
|
}
|
|
|
|
// Knowledge tools
|
|
if cfg.Tools.Knowledge.Enabled && kStore != nil {
|
|
reg.Register(tools.NewKnowledgeSearch(kStore))
|
|
reg.Register(tools.NewKnowledgeRead(kStore))
|
|
reg.Register(tools.NewKnowledgeWrite(kStore))
|
|
reg.Register(tools.NewKnowledgeList(kStore))
|
|
logger.Debug("registered knowledge tools")
|
|
}
|
|
|
|
return reg
|
|
}
|