9d1ab2d28e
Se instancia shellcron.Scheduler en agents.New() cuando cfg.Schedules tiene entradas (scheduler queda nil en agentes sin schedules, sin overhead). En agents.Run() se arranca el scheduler en una goroutine independiente que termina cuando el ctx del agente es cancelado — el shutdown es limpio gracias a cron.Stop() que devuelve un contexto que se espera. La integración no rompe agentes existentes: el campo scheduler es nil por defecto y todo el código nuevo está tras if a.scheduler != nil.
1040 lines
33 KiB
Go
1040 lines
33 KiB
Go
// Package agents defines the Agent runtime that ties core and shell together.
|
|
package agents
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"maunium.net/go/mautrix"
|
|
"maunium.net/go/mautrix/event"
|
|
|
|
"github.com/enmanuel/agents/internal/config"
|
|
"github.com/enmanuel/agents/pkg/acl"
|
|
"github.com/enmanuel/agents/pkg/command"
|
|
"github.com/enmanuel/agents/pkg/decision"
|
|
coretypes "github.com/enmanuel/agents/pkg/llm"
|
|
"github.com/enmanuel/agents/pkg/memory"
|
|
"github.com/enmanuel/agents/pkg/orchestration"
|
|
"github.com/enmanuel/agents/pkg/personality"
|
|
"github.com/enmanuel/agents/pkg/sanitize"
|
|
"github.com/enmanuel/agents/shell/bus"
|
|
shellcron "github.com/enmanuel/agents/shell/cron"
|
|
"github.com/enmanuel/agents/shell/effects"
|
|
shellknowledge "github.com/enmanuel/agents/shell/knowledge"
|
|
shelllm "github.com/enmanuel/agents/shell/llm"
|
|
"github.com/enmanuel/agents/shell/matrix"
|
|
shellmem "github.com/enmanuel/agents/shell/memory"
|
|
"github.com/enmanuel/agents/shell/ssh"
|
|
"github.com/enmanuel/agents/tools"
|
|
toolclock "github.com/enmanuel/agents/tools/clock"
|
|
toolfile "github.com/enmanuel/agents/tools/file"
|
|
toolhttp "github.com/enmanuel/agents/tools/http"
|
|
toolknowledge "github.com/enmanuel/agents/tools/knowledgetools"
|
|
toolmatrix "github.com/enmanuel/agents/tools/matrix"
|
|
toolmemory "github.com/enmanuel/agents/tools/memorytools"
|
|
toolssh "github.com/enmanuel/agents/tools/ssh"
|
|
toolweather "github.com/enmanuel/agents/tools/weather"
|
|
)
|
|
|
|
// Fallback limits used when the agent configuration leaves the matching
// setting at zero or a negative value.

// defaultMaxToolIterations bounds the LLM tool-use loop.
const defaultMaxToolIterations = 5

// defaultWindowSize is the size passed to memory.NewWindow for conversation windows.
const defaultWindowSize = 20
|
|
|
|
// CommandHandler executes a built-in command and returns the response text.
|
|
type CommandHandler func(ctx context.Context, msgCtx decision.MessageContext) string
|
|
|
|
// Agent is the assembled runtime: pure core + impure shell.
|
|
type Agent struct {
|
|
cfg *config.AgentConfig
|
|
personality personality.Personality
|
|
rules []decision.Rule
|
|
llm coretypes.CompleteFunc // nil when no LLM configured (simple_bot)
|
|
matrix *matrix.Client
|
|
runner *effects.Runner
|
|
listener *matrix.Listener
|
|
toolReg *tools.Registry
|
|
logger *slog.Logger
|
|
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
|
|
|
|
// Lifecycle — cancel stops this agent individually; done is closed when Run returns.
|
|
cancel context.CancelFunc
|
|
done chan struct{}
|
|
|
|
// Access control
|
|
acl acl.ACL
|
|
|
|
// Commands — handlers keyed by canonical name; cmdAliases maps alias → canonical
|
|
commands map[string]CommandHandler
|
|
cmdAliases map[string]string // alias → canonical name
|
|
customSpecs []command.Spec // specs from RegisterCommand (for !help)
|
|
startTime time.Time
|
|
|
|
// Memory
|
|
windows map[string]memory.Window
|
|
windowsMu sync.RWMutex
|
|
memStore memory.Store // nil when memory is disabled
|
|
windowSize int
|
|
roomCtx *toolmemory.RoomContext
|
|
|
|
// Prompt-commands — loaded from prompts/*.md at startup
|
|
promptCmds map[string]string // name → prompt content
|
|
|
|
// Knowledge store — non-nil when knowledge is enabled
|
|
knowledgeStore *shellknowledge.FileStore
|
|
|
|
// Sanitization options — nil when sanitization is disabled
|
|
sanitizeOpts *sanitize.Options
|
|
|
|
// Bus — set via SetBus() when running under the unified launcher
|
|
agentBus *bus.Bus
|
|
|
|
// Scheduler — nil when no schedules are configured
|
|
scheduler *shellcron.Scheduler
|
|
}
|
|
|
|
// ClearWindow resets the conversation window for a room and deletes persisted
|
|
// messages from SQLite so the agent starts fresh. Implements toolmemory.WindowClearer.
|
|
func (a *Agent) ClearWindow(roomID string) {
|
|
a.windowsMu.Lock()
|
|
a.windows[roomID] = memory.NewWindow(a.windowSize)
|
|
a.windowsMu.Unlock()
|
|
|
|
if a.memStore != nil {
|
|
if err := a.memStore.DeleteMessages(
|
|
context.Background(), a.cfg.Agent.ID, &roomID,
|
|
); err != nil {
|
|
a.logger.Warn("failed to delete persisted messages on clear", "room", roomID, "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// New assembles an Agent from its config, rules, and logger.
|
|
func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*Agent, error) {
|
|
// Matrix client
|
|
matrixClient, err := matrix.New(cfg.Matrix)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("matrix client: %w", err)
|
|
}
|
|
|
|
// E2EE — initialize before the sync loop starts
|
|
var cryptoStore io.Closer
|
|
if cfg.Matrix.Encryption.Enabled {
|
|
storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db")
|
|
pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv)
|
|
logger.Info("initializing e2ee", "store", storePath)
|
|
cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("e2ee init: %w", err)
|
|
}
|
|
|
|
// Auto-fetch cross-signing private keys from SSSS if recovery key is configured.
|
|
if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" {
|
|
if rk := os.Getenv(envName); rk != "" {
|
|
if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil {
|
|
logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err)
|
|
} else {
|
|
logger.Info("cross-signing private keys fetched from SSSS")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sign own device with the self-signing key so Element shows it as verified.
|
|
if err := matrixClient.SignOwnDevice(context.Background()); err != nil {
|
|
logger.Warn("failed to sign own device (non-fatal)", "err", err)
|
|
} else {
|
|
logger.Info("own device signed with cross-signing key")
|
|
}
|
|
|
|
logger.Info("e2ee ready")
|
|
}
|
|
|
|
// SSH executor
|
|
sshExec := ssh.NewExecutor(cfg.SSH, logger)
|
|
|
|
// LLM client — optional; if no provider is configured, the agent runs as simple_bot
|
|
var llmFunc coretypes.CompleteFunc
|
|
if cfg.LLM.Primary.Provider != "" {
|
|
llmLog := logger.With("component", "llm")
|
|
primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary, llmLog)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("primary LLM: %w", err)
|
|
}
|
|
|
|
llmFunc = primaryLLM
|
|
if cfg.LLM.Fallback.Provider != "" {
|
|
fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback, llmLog)
|
|
if err != nil {
|
|
logger.Warn("fallback LLM config error", "err", err)
|
|
} else {
|
|
llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM, cfg.LLM.Fallback, llmLog)
|
|
}
|
|
}
|
|
} else {
|
|
logger.Info("no LLM configured, running as command-only bot")
|
|
}
|
|
|
|
// Effects runner
|
|
runner := effects.NewRunner(matrixClient, sshExec, logger)
|
|
|
|
// Resolve base data path for this agent.
|
|
// Priority: config storage.base_path > $AGENTS_DATA_DIR/<id> > agents/<id>/data
|
|
dataBase := resolveDataBase(cfg)
|
|
logger.Debug("data base path", "path", dataBase)
|
|
|
|
// Memory subsystem
|
|
var memStore memory.Store
|
|
windowSize := defaultWindowSize
|
|
roomCtx := &toolmemory.RoomContext{}
|
|
|
|
if cfg.Memory.Enabled {
|
|
windowSize = cfg.Memory.WindowSize
|
|
if windowSize <= 0 {
|
|
windowSize = defaultWindowSize
|
|
}
|
|
|
|
dbPath := cfg.Memory.DBPath
|
|
if dbPath == "" {
|
|
dbPath = filepath.Join(dataBase, "memory.db")
|
|
}
|
|
store, err := shellmem.New(dbPath, logger)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("memory store: %w", err)
|
|
}
|
|
memStore = store
|
|
logger.Info("memory enabled", "window_size", windowSize, "db", dbPath)
|
|
}
|
|
|
|
// Knowledge store
|
|
var kStore *shellknowledge.FileStore
|
|
if cfg.Tools.Knowledge.Enabled {
|
|
knowledgeDir := cfg.Tools.Knowledge.Dir
|
|
if knowledgeDir == "" {
|
|
knowledgeDir = filepath.Join("agents", cfg.Agent.ID, "knowledge")
|
|
}
|
|
knowledgeDBPath := filepath.Join(dataBase, "knowledge.db")
|
|
var kErr error
|
|
kStore, kErr = shellknowledge.New(knowledgeDir, knowledgeDBPath, logger)
|
|
if kErr != nil {
|
|
logger.Error("knowledge_store_init_failed", "err", kErr)
|
|
} else {
|
|
if syncErr := kStore.Sync(context.Background()); syncErr != nil {
|
|
logger.Error("knowledge_sync_failed", "err", syncErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build ACL from security roles config
|
|
aclRoles := make(map[string]acl.RoleDef, len(cfg.Security.Roles))
|
|
for name, r := range cfg.Security.Roles {
|
|
aclRoles[name] = acl.RoleDef{Users: r.Users, Actions: r.Actions}
|
|
}
|
|
agentACL := acl.FromMap(aclRoles)
|
|
if !agentACL.Empty() {
|
|
logger.Info("acl enabled", "roles", len(cfg.Security.Roles))
|
|
}
|
|
|
|
// Tool registry — register tools enabled in config
|
|
toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memStore, kStore, roomCtx, logger)
|
|
|
|
// Rate limiting for tools
|
|
if cfg.Security.ToolRateLimit.Enabled {
|
|
maxCalls := cfg.Security.ToolRateLimit.MaxCallsPerMin
|
|
if maxCalls <= 0 {
|
|
maxCalls = 10
|
|
}
|
|
rl := tools.NewRateLimiter(maxCalls, time.Minute)
|
|
toolReg.SetRateLimiter(rl)
|
|
|
|
cleanupInterval := cfg.Security.ToolRateLimit.CleanupIntervalS
|
|
if cleanupInterval <= 0 {
|
|
cleanupInterval = 60
|
|
}
|
|
go func() {
|
|
ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Second)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
rl.Cleanup()
|
|
}
|
|
}()
|
|
logger.Info("tool rate limiting enabled", "max_calls_per_min", maxCalls)
|
|
}
|
|
|
|
a := &Agent{
|
|
cfg: cfg,
|
|
acl: agentACL,
|
|
rules: rules,
|
|
llm: llmFunc,
|
|
matrix: matrixClient,
|
|
runner: runner,
|
|
toolReg: toolReg,
|
|
logger: logger,
|
|
cryptoStore: cryptoStore,
|
|
done: make(chan struct{}),
|
|
commands: make(map[string]CommandHandler),
|
|
cmdAliases: command.BuiltinNames(),
|
|
startTime: time.Now(),
|
|
windows: make(map[string]memory.Window),
|
|
memStore: memStore,
|
|
knowledgeStore: kStore,
|
|
windowSize: windowSize,
|
|
roomCtx: roomCtx,
|
|
}
|
|
|
|
// Configure sanitization if enabled
|
|
if cfg.Security.Sanitize.Enabled {
|
|
minSev := parseSeverity(cfg.Security.Sanitize.MinSeverity)
|
|
a.sanitizeOpts = &sanitize.Options{
|
|
Mode: sanitize.ParseMode(cfg.Security.Sanitize.Mode),
|
|
MinSeverity: minSev,
|
|
DisabledPatterns: cfg.Security.Sanitize.DisabledPatterns,
|
|
}
|
|
logger.Info("input sanitization enabled",
|
|
"mode", a.sanitizeOpts.Mode,
|
|
"min_severity", minSev,
|
|
)
|
|
}
|
|
|
|
// Register built-in command handlers
|
|
a.registerBuiltinCommands()
|
|
|
|
// Load prompt-commands from prompts/ directory
|
|
a.loadPromptCommands()
|
|
|
|
// Register memory_clear_context with self as WindowClearer (after a is created)
|
|
if cfg.Tools.Memory.Enabled && memStore != nil {
|
|
toolReg.Register(toolmemory.NewMemoryClearContext(a, roomCtx))
|
|
}
|
|
|
|
// Cron scheduler — only when schedules are configured
|
|
if len(cfg.Schedules) > 0 {
|
|
a.scheduler = shellcron.New(cfg.Schedules, matrixClient, llmFunc, cfg.LLM.Primary.Model, logger)
|
|
logger.Info("cron scheduler configured", "schedules", len(cfg.Schedules))
|
|
}
|
|
|
|
// Matrix event listener
|
|
a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger)
|
|
|
|
return a, nil
|
|
}
|
|
|
|
// RegisterCommand adds a custom command handler for this agent.
|
|
// The spec provides metadata (aliases, description, usage) for !help.
|
|
// Must be called before Run().
|
|
func (a *Agent) RegisterCommand(spec command.Spec, handler CommandHandler) {
|
|
a.commands[spec.Name] = handler
|
|
a.cmdAliases[spec.Name] = spec.Name
|
|
for _, alias := range spec.Aliases {
|
|
a.cmdAliases[alias] = spec.Name
|
|
}
|
|
a.customSpecs = append(a.customSpecs, spec)
|
|
a.logger.Info("command_registered", "command", spec.Name, "aliases", spec.Aliases)
|
|
}
|
|
|
|
// loadPromptCommands scans the project-root prompts/ directory and loads all .md files.
|
|
func (a *Agent) loadPromptCommands() {
|
|
prompts, err := command.LoadPromptCommands("prompts")
|
|
if err != nil {
|
|
a.logger.Warn("failed to load prompt-commands", "err", err)
|
|
return
|
|
}
|
|
a.promptCmds = make(map[string]string, len(prompts))
|
|
for _, p := range prompts {
|
|
a.promptCmds[p.Name] = p.Content
|
|
}
|
|
if len(a.promptCmds) > 0 {
|
|
names := make([]string, 0, len(a.promptCmds))
|
|
for n := range a.promptCmds {
|
|
names = append(names, n)
|
|
}
|
|
a.logger.Info("prompt-commands loaded", "count", len(a.promptCmds), "names", names)
|
|
}
|
|
}
|
|
|
|
// SetBus attaches the agent to the inter-agent bus for orchestration.
|
|
// Must be called before Run().
|
|
func (a *Agent) SetBus(b *bus.Bus) {
|
|
a.agentBus = b
|
|
}
|
|
|
|
// SetInterceptor configures the listener to skip events in orchestrated rooms.
|
|
func (a *Agent) SetInterceptor(fn matrix.InterceptFunc) {
|
|
a.listener.SetInterceptor(fn)
|
|
}
|
|
|
|
// SetMembershipNotify registers a callback for room membership changes.
|
|
func (a *Agent) SetMembershipNotify(fn matrix.MembershipNotifyFunc) {
|
|
a.listener.SetMembershipNotify(fn)
|
|
}
|
|
|
|
// RawMatrixClient returns the underlying *mautrix.Client for room scanning.
|
|
func (a *Agent) RawMatrixClient() *mautrix.Client {
|
|
return a.matrix.Raw()
|
|
}
|
|
|
|
// Stop cancels this agent's individual context, causing Run to return.
|
|
// Safe to call multiple times.
|
|
func (a *Agent) Stop() {
|
|
if a.cancel != nil {
|
|
a.cancel()
|
|
}
|
|
}
|
|
|
|
// Done returns a channel that is closed when Run has returned.
|
|
func (a *Agent) Done() <-chan struct{} {
|
|
return a.done
|
|
}
|
|
|
|
// Run starts the agent sync loop. Blocks until ctx is cancelled.
|
|
func (a *Agent) Run(ctx context.Context) error {
|
|
ctx, a.cancel = context.WithCancel(ctx)
|
|
defer close(a.done)
|
|
|
|
if a.cryptoStore != nil {
|
|
defer a.cryptoStore.Close()
|
|
}
|
|
if a.memStore != nil {
|
|
defer a.memStore.Close()
|
|
}
|
|
if a.knowledgeStore != nil {
|
|
defer a.knowledgeStore.Close()
|
|
}
|
|
a.logger.Info("agent starting",
|
|
"id", a.cfg.Agent.ID,
|
|
"name", a.cfg.Agent.Name,
|
|
"tools", a.toolReg.Names(),
|
|
)
|
|
|
|
// Set presence to online
|
|
if err := a.matrix.SetPresence(ctx, event.PresenceOnline); err != nil {
|
|
a.logger.Warn("failed to set presence online", "err", err)
|
|
}
|
|
defer func() {
|
|
// Use background context since ctx is already cancelled at shutdown
|
|
offlineCtx := context.Background()
|
|
if err := a.matrix.SetPresence(offlineCtx, event.PresenceOffline); err != nil {
|
|
a.logger.Warn("failed to set presence offline", "err", err)
|
|
}
|
|
}()
|
|
|
|
// Start bus listener if connected to the orchestration bus
|
|
if a.agentBus != nil {
|
|
ch := a.agentBus.Subscribe(bus.AgentID(a.cfg.Agent.ID))
|
|
go a.listenBus(ctx, ch)
|
|
a.logger.Info("bus listener started")
|
|
}
|
|
|
|
// Start cron scheduler in background goroutine (blocks until ctx cancelled)
|
|
if a.scheduler != nil {
|
|
go a.scheduler.Start(ctx)
|
|
}
|
|
|
|
return a.listener.Run(ctx)
|
|
}
|
|
|
|
// listenBus processes messages from the inter-agent bus.
|
|
func (a *Agent) listenBus(ctx context.Context, ch <-chan bus.AgentMessage) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
if msg.Kind == bus.KindTask {
|
|
a.handleTaskEvent(ctx, msg)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleTaskEvent processes a task delegated by the orchestrator.
|
|
// The bot generates a response and sends it both to Matrix and back via bus.
|
|
func (a *Agent) handleTaskEvent(ctx context.Context, msg bus.AgentMessage) {
|
|
taskJSON, ok := msg.Payload["task_json"]
|
|
if !ok {
|
|
a.logger.Error("task message missing task_json payload")
|
|
return
|
|
}
|
|
|
|
task, err := orchestration.UnmarshalTaskEvent(taskJSON)
|
|
if err != nil {
|
|
a.logger.Error("failed to unmarshal task event", "err", err)
|
|
return
|
|
}
|
|
|
|
a.logger.Info("handling orchestrated task",
|
|
"task_id", task.TaskID,
|
|
"room", task.TargetRoomID,
|
|
"sender", task.OriginalSender,
|
|
"iteration", task.Iteration,
|
|
)
|
|
|
|
roomID := task.TargetRoomID
|
|
|
|
// Update room context for memory tools
|
|
a.roomCtx.Set(roomID)
|
|
|
|
if a.cfg.Personality.Behavior.TypingIndicator {
|
|
_ = a.matrix.SendTyping(ctx, roomID, true)
|
|
defer a.matrix.SendTyping(ctx, roomID, false)
|
|
}
|
|
|
|
// Build a synthetic MessageContext from the task
|
|
msgCtx := decision.MessageContext{
|
|
SenderID: task.OriginalSender,
|
|
RoomID: roomID,
|
|
Content: task.OriginalQuestion,
|
|
IsDirectMsg: false,
|
|
IsMention: true, // treat orchestrated tasks like mentions
|
|
}
|
|
|
|
// If there are previous responses, prepend context
|
|
if len(task.PreviousResponses) > 0 {
|
|
var context string
|
|
for _, pr := range task.PreviousResponses {
|
|
context += fmt.Sprintf("[Previous response from %s]: %s\n\n", pr.BotID, pr.Text)
|
|
}
|
|
msgCtx.Content = context + "Original question: " + task.OriginalQuestion +
|
|
"\n\nPlease provide an improved or complementary answer."
|
|
}
|
|
|
|
// Sanitize orchestrated input
|
|
sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
|
|
if rejected {
|
|
a.logger.Warn("orchestrated task rejected by sanitizer",
|
|
"task_id", task.TaskID, "sender", task.OriginalSender)
|
|
_ = a.matrix.SendMarkdown(ctx, roomID, "El mensaje fue rechazado por el filtro de seguridad.")
|
|
return
|
|
}
|
|
msgCtx.Content = sanitized
|
|
|
|
// Load memory and run LLM
|
|
a.ensureWindowLoaded(ctx, roomID)
|
|
a.appendToWindow(roomID, coretypes.Message{
|
|
Role: coretypes.RoleUser, Content: msgCtx.Content,
|
|
})
|
|
|
|
reply, err := a.runLLM(ctx, msgCtx, roomID)
|
|
|
|
// Build the result to send back via bus
|
|
result := orchestration.TaskResult{
|
|
TaskID: task.TaskID,
|
|
BotID: a.cfg.Agent.ID,
|
|
}
|
|
|
|
if err != nil {
|
|
a.logger.Error("LLM error during orchestrated task", "err", err)
|
|
result.Error = err.Error()
|
|
reply = "Sorry, I encountered an error."
|
|
} else {
|
|
result.Text = reply
|
|
// Persist assistant reply
|
|
a.appendToWindow(roomID, coretypes.Message{
|
|
Role: coretypes.RoleAssistant, Content: reply,
|
|
})
|
|
a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
|
|
}
|
|
|
|
// Send reply to Matrix room
|
|
if sendErr := a.matrix.SendMarkdown(ctx, roomID, reply); sendErr != nil {
|
|
a.logger.Error("failed to send orchestrated reply to Matrix", "err", sendErr)
|
|
}
|
|
|
|
// Send result back to orchestrator via bus
|
|
resultJSON, marshalErr := orchestration.MarshalTaskResult(result)
|
|
if marshalErr != nil {
|
|
a.logger.Error("failed to marshal task result", "err", marshalErr)
|
|
return
|
|
}
|
|
|
|
replyMsg := bus.AgentMessage{
|
|
From: bus.AgentID(a.cfg.Agent.ID),
|
|
To: msg.From,
|
|
Kind: bus.KindTaskResult,
|
|
Payload: map[string]string{"result_json": resultJSON},
|
|
}
|
|
|
|
if busErr := a.agentBus.Reply(task.TaskID, replyMsg); busErr != nil {
|
|
a.logger.Error("failed to send task result via bus", "err", busErr)
|
|
}
|
|
}
|
|
|
|
// handleEvent is called by the matrix Listener for each filtered incoming event.
|
|
func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, evt *event.Event) {
|
|
a.logger.Debug("handling event",
|
|
"sender", msgCtx.SenderID,
|
|
"is_dm", msgCtx.IsDirectMsg,
|
|
"is_mention", msgCtx.IsMention,
|
|
"command", msgCtx.Command,
|
|
)
|
|
|
|
roomID := evt.RoomID.String()
|
|
|
|
// Update room context for memory tools
|
|
a.roomCtx.Set(roomID)
|
|
|
|
if a.cfg.Personality.Behavior.TypingIndicator {
|
|
_ = a.matrix.SendTyping(ctx, roomID, true)
|
|
defer a.matrix.SendTyping(ctx, roomID, false)
|
|
}
|
|
|
|
// ── Command flow ─────────────────────────────────────────────────
|
|
// Commands (!xxx) always resolve before rules or LLM. Never reach the LLM.
|
|
// Priority: built-in → unknown (agent-specific commands can be added via RegisterCommand).
|
|
if msgCtx.Command != "" {
|
|
a.logger.Info("command_received",
|
|
"command", msgCtx.Command,
|
|
"sender", msgCtx.SenderID,
|
|
"room", roomID,
|
|
"args", msgCtx.Args,
|
|
)
|
|
|
|
// Resolve aliases
|
|
cmdName := msgCtx.Command
|
|
if canonical, ok := a.cmdAliases[cmdName]; ok {
|
|
cmdName = canonical
|
|
}
|
|
|
|
if handler, ok := a.commands[cmdName]; ok {
|
|
// RBAC check for commands
|
|
if !a.acl.CanDo(msgCtx.SenderID, "command:"+cmdName) {
|
|
a.logger.Info("command_denied", "command", cmdName, "sender", msgCtx.SenderID)
|
|
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
|
|
"No tienes permisos para ejecutar este comando.")
|
|
return
|
|
}
|
|
a.logger.Info("command_executed", "command", cmdName)
|
|
reply := handler(ctx, msgCtx)
|
|
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply)
|
|
return
|
|
}
|
|
|
|
// Prompt-command: expand .md content and pass to LLM
|
|
if content, ok := a.promptCmds[cmdName]; ok {
|
|
a.logger.Info("prompt_command_expanded", "command", cmdName)
|
|
msgCtx.Content = command.ExpandPrompt(content, msgCtx.Args)
|
|
msgCtx.Command = ""
|
|
msgCtx.Args = nil
|
|
// Fall through to rules/LLM flow below
|
|
} else {
|
|
// Unknown command — never falls through to rules or LLM
|
|
a.logger.Info("command_unknown", "command", msgCtx.Command)
|
|
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
|
|
fmt.Sprintf("Comando desconocido: `!%s`. Usa `!help` para ver comandos disponibles.", msgCtx.Command))
|
|
return
|
|
}
|
|
}
|
|
|
|
// ── Non-command flow ─────────────────────────────────────────────
|
|
// RBAC check for LLM access ("ask" action)
|
|
if !a.acl.CanDo(msgCtx.SenderID, "ask") {
|
|
a.logger.Info("ask_denied", "sender", msgCtx.SenderID)
|
|
_ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID,
|
|
"No tienes permisos para interactuar con este agente.")
|
|
return
|
|
}
|
|
|
|
actions := decision.Evaluate(msgCtx, a.rules)
|
|
a.logger.Debug("rules evaluated", "matched_actions", len(actions))
|
|
|
|
// If no rules matched and the message mentions the bot or is a DM, use LLM.
|
|
if len(actions) == 0 && (msgCtx.IsMention || msgCtx.IsDirectMsg) {
|
|
if a.llm == nil {
|
|
// Simple bot: no LLM, ignore non-command messages
|
|
a.logger.Debug("no LLM configured, ignoring non-command message")
|
|
return
|
|
}
|
|
a.logger.Debug("no rules matched, falling back to LLM")
|
|
actions = []decision.Action{{
|
|
Kind: decision.ActionKindLLM,
|
|
LLM: &decision.LLMAction{ContextKey: msgCtx.RoomID},
|
|
}}
|
|
}
|
|
|
|
if len(actions) == 0 {
|
|
a.logger.Debug("no actions, ignoring message",
|
|
"is_dm", msgCtx.IsDirectMsg,
|
|
"is_mention", msgCtx.IsMention,
|
|
)
|
|
return
|
|
}
|
|
|
|
a.executeActions(ctx, roomID, msgCtx, actions)
|
|
}
|
|
|
|
// executeActions expands LLM actions and runs the effects runner.
|
|
func (a *Agent) executeActions(ctx context.Context, roomID string, msgCtx decision.MessageContext, actions []decision.Action) {
|
|
// Auto-thread: if configured and message is not already in a thread,
|
|
// start a new thread rooted at the user's message.
|
|
if a.cfg.Matrix.Threads.AutoThread && msgCtx.ThreadID == "" && msgCtx.EventID != "" {
|
|
msgCtx.ThreadID = msgCtx.EventID
|
|
}
|
|
|
|
// Sanitize user input before sending to LLM
|
|
sanitized, rejected := a.sanitizeInput(msgCtx.Content, roomID, msgCtx.SenderID)
|
|
if rejected {
|
|
a.runner.Execute(ctx, roomID, []decision.Action{{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: "Tu mensaje fue rechazado por el filtro de seguridad.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
|
|
}})
|
|
return
|
|
}
|
|
msgCtx.Content = sanitized
|
|
|
|
// Resolve memory key: use thread root as context key when inside a thread,
|
|
// so parallel threads in the same room have independent conversation windows.
|
|
memKey := roomID
|
|
if msgCtx.ThreadID != "" {
|
|
memKey = msgCtx.ThreadID
|
|
}
|
|
|
|
expanded := make([]decision.Action, 0, len(actions))
|
|
for _, act := range actions {
|
|
if act.Kind == decision.ActionKindLLM {
|
|
if a.llm == nil {
|
|
a.logger.Warn("LLM action requested but no LLM configured")
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: "Este bot no tiene LLM configurado.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
|
|
})
|
|
continue
|
|
}
|
|
// Memory: load window + append user message before LLM call
|
|
a.ensureWindowLoaded(ctx, memKey)
|
|
a.appendToWindow(memKey, coretypes.Message{
|
|
Role: coretypes.RoleUser, Content: msgCtx.Content,
|
|
})
|
|
a.persistMessage(ctx, memKey, coretypes.RoleUser, msgCtx.Content)
|
|
|
|
reply, err := a.runLLM(ctx, msgCtx, memKey)
|
|
if err != nil {
|
|
a.logger.Error("llm error", "err", err)
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: "Sorry, I encountered an error.", InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
|
|
})
|
|
} else {
|
|
expanded = append(expanded, decision.Action{
|
|
Kind: decision.ActionKindReply,
|
|
Reply: &decision.ReplyAction{Content: reply, InReplyTo: msgCtx.EventID, ThreadID: msgCtx.ThreadID},
|
|
})
|
|
|
|
// Memory: append assistant reply after LLM call
|
|
a.appendToWindow(memKey, coretypes.Message{
|
|
Role: coretypes.RoleAssistant, Content: reply,
|
|
})
|
|
a.persistMessage(ctx, memKey, coretypes.RoleAssistant, reply)
|
|
}
|
|
} else {
|
|
expanded = append(expanded, act)
|
|
}
|
|
}
|
|
|
|
a.runner.Execute(ctx, roomID, expanded)
|
|
}
|
|
|
|
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memKey string) (string, error) {
|
|
a.logger.Debug("calling LLM",
|
|
"model", a.cfg.LLM.Primary.Model,
|
|
"provider", a.cfg.LLM.Primary.Provider,
|
|
)
|
|
|
|
// Load system prompt from file if configured, else use description
|
|
systemPrompt := a.cfg.Agent.Description
|
|
|
|
// Build messages: conversation history from window (includes current user msg)
|
|
messages := a.getWindowMessages(memKey)
|
|
if len(messages) == 0 {
|
|
// Fallback if memory is disabled: just the current message
|
|
messages = []coretypes.Message{
|
|
{Role: coretypes.RoleUser, Content: msgCtx.Content},
|
|
}
|
|
}
|
|
|
|
// Build tool specs for the LLM if tool_use is enabled
|
|
var llmTools []coretypes.ToolSpec
|
|
if a.cfg.LLM.ToolUse.Enabled && a.toolReg.Len() > 0 {
|
|
llmTools = a.toolReg.ToLLMSpecs()
|
|
a.logger.Debug("tools available for LLM", "count", len(llmTools))
|
|
}
|
|
|
|
maxIter := a.cfg.LLM.ToolUse.MaxIterations
|
|
if maxIter <= 0 {
|
|
maxIter = defaultMaxToolIterations
|
|
}
|
|
|
|
// Tool-use loop: call LLM → execute tools → feed results back → repeat
|
|
for i := 0; i < maxIter; i++ {
|
|
req := coretypes.CompletionRequest{
|
|
Model: a.cfg.LLM.Primary.Model,
|
|
MaxTokens: a.cfg.LLM.Primary.MaxTokens,
|
|
Temperature: a.cfg.LLM.Primary.Temperature,
|
|
SystemPrompt: systemPrompt,
|
|
Messages: messages,
|
|
Tools: llmTools,
|
|
}
|
|
|
|
resp, err := a.llm(ctx, req)
|
|
if err != nil {
|
|
a.logger.Error("LLM call failed", "model", req.Model, "err", err)
|
|
return "", err
|
|
}
|
|
|
|
a.logger.Debug("LLM responded",
|
|
"content_len", len(resp.Content),
|
|
"tool_calls", len(resp.ToolCalls),
|
|
"finish_reason", resp.FinishReason,
|
|
)
|
|
|
|
// No tool calls — return the text response
|
|
if len(resp.ToolCalls) == 0 {
|
|
return resp.Content, nil
|
|
}
|
|
|
|
// Append assistant message with tool calls to conversation
|
|
messages = append(messages, coretypes.Message{
|
|
Role: coretypes.RoleAssistant,
|
|
Content: resp.Content,
|
|
ToolCalls: resp.ToolCalls,
|
|
})
|
|
|
|
// Execute each tool and append results
|
|
for _, tc := range resp.ToolCalls {
|
|
a.logger.Info("executing tool",
|
|
"tool", tc.Name,
|
|
"call_id", tc.ID,
|
|
)
|
|
|
|
// RBAC check for tool execution
|
|
if !a.acl.CanDo(msgCtx.SenderID, "tool:"+tc.Name) {
|
|
a.logger.Info("tool_denied", "tool", tc.Name, "sender", msgCtx.SenderID)
|
|
messages = append(messages, coretypes.Message{
|
|
Role: coretypes.RoleTool,
|
|
Content: "error: permission denied for tool " + tc.Name,
|
|
ToolCallID: tc.ID,
|
|
})
|
|
continue
|
|
}
|
|
|
|
// Notify the room that a tool is being called (respect thread context)
|
|
toolNotice := fmt.Sprintf("🔨 <em>%s</em>", tc.Name)
|
|
if err := a.sendReply(ctx, msgCtx.RoomID, msgCtx.EventID, msgCtx.ThreadID, toolNotice); err != nil {
|
|
a.logger.Warn("failed to send tool call notice", "tool", tc.Name, "err", err)
|
|
}
|
|
|
|
result := a.toolReg.ExecuteForRoom(ctx, tc.Name, tc.Arguments, msgCtx.RoomID)
|
|
|
|
output := result.Output
|
|
if result.Err != nil {
|
|
output = fmt.Sprintf("error: %s", result.Err)
|
|
a.logger.Warn("tool execution error",
|
|
"tool", tc.Name,
|
|
"err", result.Err,
|
|
)
|
|
} else {
|
|
a.logger.Debug("tool executed",
|
|
"tool", tc.Name,
|
|
"output_len", len(output),
|
|
)
|
|
}
|
|
|
|
messages = append(messages, coretypes.Message{
|
|
Role: coretypes.RoleTool,
|
|
Content: output,
|
|
ToolCallID: tc.ID,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Max iterations reached — return whatever we have
|
|
a.logger.Warn("tool-use loop reached max iterations", "max", maxIter)
|
|
return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil
|
|
}
|
|
|
|
// ── Memory helpers ───────────────────────────────────────────────────────
|
|
|
|
// ensureWindowLoaded loads the conversation window from SQLite on first access for a room.
|
|
func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) {
|
|
a.windowsMu.Lock()
|
|
defer a.windowsMu.Unlock()
|
|
if _, ok := a.windows[roomID]; ok {
|
|
return
|
|
}
|
|
w := memory.NewWindow(a.windowSize)
|
|
if a.memStore != nil {
|
|
msgs, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize)
|
|
if err != nil {
|
|
a.logger.Warn("failed to load message history", "room", roomID, "err", err)
|
|
} else {
|
|
for _, m := range msgs {
|
|
w = w.Append(coretypes.Message{Role: m.Role, Content: m.Content})
|
|
}
|
|
if len(msgs) > 0 {
|
|
a.logger.Debug("loaded message history", "room", roomID, "count", len(msgs))
|
|
}
|
|
}
|
|
}
|
|
a.windows[roomID] = w
|
|
}
|
|
|
|
// appendToWindow adds a message to the in-memory conversation window.
|
|
func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) {
|
|
a.windowsMu.Lock()
|
|
defer a.windowsMu.Unlock()
|
|
w, ok := a.windows[roomID]
|
|
if !ok {
|
|
w = memory.NewWindow(a.windowSize)
|
|
}
|
|
a.windows[roomID] = w.Append(msg)
|
|
}
|
|
|
|
// getWindowMessages returns a copy of the conversation window for a room.
|
|
func (a *Agent) getWindowMessages(roomID string) []coretypes.Message {
|
|
a.windowsMu.RLock()
|
|
defer a.windowsMu.RUnlock()
|
|
w, ok := a.windows[roomID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return w.ToLLMMessages()
|
|
}
|
|
|
|
// persistMessage saves a message to the SQLite store (no-op if store is nil).
|
|
func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) {
|
|
if a.memStore == nil {
|
|
return
|
|
}
|
|
if err := a.memStore.SaveMessage(ctx, memory.HistoryMessage{
|
|
AgentID: a.cfg.Agent.ID,
|
|
RoomID: roomID,
|
|
Role: role,
|
|
Content: content,
|
|
}); err != nil {
|
|
a.logger.Warn("failed to persist message", "room", roomID, "err", err)
|
|
}
|
|
}
|
|
|
|
// sendReply sends a markdown reply that respects thread context.
|
|
// If threadID is non-empty, the reply is sent as part of that thread.
|
|
func (a *Agent) sendReply(ctx context.Context, roomID, eventID, threadID, markdown string) error {
|
|
if threadID != "" {
|
|
return a.matrix.SendThreadMarkdown(ctx, roomID, threadID, eventID, markdown)
|
|
}
|
|
return a.matrix.SendReplyMarkdown(ctx, roomID, eventID, markdown)
|
|
}
|
|
|
|
// parseSeverity converts a config string to sanitize.Severity.
|
|
func parseSeverity(s string) sanitize.Severity {
|
|
switch s {
|
|
case "high":
|
|
return sanitize.SeverityHigh
|
|
case "low":
|
|
return sanitize.SeverityLow
|
|
default:
|
|
return sanitize.SeverityMedium
|
|
}
|
|
}
|
|
|
|
// sanitizeInput runs prompt injection detection on the message content.
|
|
// Returns the (possibly modified) content and true if the message should be rejected.
|
|
func (a *Agent) sanitizeInput(content, roomID, senderID string) (string, bool) {
|
|
if a.sanitizeOpts == nil {
|
|
return content, false
|
|
}
|
|
|
|
result := sanitize.Sanitize(content, *a.sanitizeOpts)
|
|
|
|
for _, w := range result.Warnings {
|
|
a.logger.Warn("prompt_injection_detected",
|
|
"pattern", w.PatternName,
|
|
"severity", w.Severity,
|
|
"matched", w.Matched,
|
|
"sender", senderID,
|
|
"room", roomID,
|
|
)
|
|
}
|
|
|
|
return result.Output, result.Rejected
|
|
}
|
|
|
|
// resolveDataBase returns the base directory for agent runtime data.
|
|
// Priority: config storage.base_path > $AGENTS_DATA_DIR/<id> > agents/<id>/data
|
|
func resolveDataBase(cfg *config.AgentConfig) string {
|
|
if cfg.Storage.BasePath != "" {
|
|
return cfg.Storage.BasePath
|
|
}
|
|
if envDir := os.Getenv("AGENTS_DATA_DIR"); envDir != "" {
|
|
return filepath.Join(envDir, cfg.Agent.ID)
|
|
}
|
|
return filepath.Join("agents", cfg.Agent.ID, "data")
|
|
}
|
|
|
|
// buildToolRegistry creates a Registry with tools enabled in the agent's config.
|
|
func buildToolRegistry(
|
|
cfg *config.AgentConfig,
|
|
sshExec *ssh.Executor,
|
|
matrixClient *matrix.Client,
|
|
memStore memory.Store,
|
|
kStore *shellknowledge.FileStore,
|
|
roomCtx *toolmemory.RoomContext,
|
|
logger *slog.Logger,
|
|
) *tools.Registry {
|
|
reg := tools.NewRegistry(logger)
|
|
|
|
if cfg.Tools.HTTP.Enabled {
|
|
reg.Register(toolhttp.NewHTTPGet(cfg.Tools.HTTP))
|
|
reg.Register(toolhttp.NewHTTPPost(cfg.Tools.HTTP))
|
|
logger.Debug("registered http tools")
|
|
}
|
|
|
|
if cfg.Tools.SSH.Enabled {
|
|
reg.Register(toolssh.NewSSHCommand(cfg.Tools.SSH, sshExec))
|
|
logger.Debug("registered ssh tool")
|
|
}
|
|
|
|
if cfg.Tools.FileOps.Enabled {
|
|
reg.Register(toolfile.NewReadFile(cfg.Tools.FileOps))
|
|
logger.Debug("registered file tool")
|
|
}
|
|
|
|
// current_time is always available
|
|
reg.Register(toolclock.NewCurrentTime())
|
|
logger.Debug("registered current_time tool")
|
|
|
|
// weather tool is always available
|
|
reg.Register(toolweather.NewWeather())
|
|
logger.Debug("registered weather tool")
|
|
|
|
// matrix_send is always available
|
|
reg.Register(toolmatrix.NewMatrixSend(matrixClient, cfg.Tools.Matrix))
|
|
logger.Debug("registered matrix tool")
|
|
|
|
// Memory tools (memory_clear_context registered later since it needs the Agent)
|
|
if cfg.Tools.Memory.Enabled && memStore != nil {
|
|
reg.Register(toolmemory.NewMemorySave(cfg.Agent.ID, memStore))
|
|
reg.Register(toolmemory.NewMemoryRecall(cfg.Agent.ID, memStore))
|
|
reg.Register(toolmemory.NewMemoryForget(cfg.Agent.ID, memStore))
|
|
reg.Register(toolmemory.NewMemorySummary(cfg.Agent.ID, memStore))
|
|
logger.Debug("registered memory tools")
|
|
}
|
|
|
|
// Knowledge tools
|
|
if cfg.Tools.Knowledge.Enabled && kStore != nil {
|
|
reg.Register(toolknowledge.NewKnowledgeSearch(kStore))
|
|
reg.Register(toolknowledge.NewKnowledgeRead(kStore))
|
|
reg.Register(toolknowledge.NewKnowledgeWrite(kStore))
|
|
reg.Register(toolknowledge.NewKnowledgeList(kStore))
|
|
logger.Debug("registered knowledge tools")
|
|
}
|
|
|
|
return reg
|
|
}
|