Files
egutierrez 61606d450d feat(0144c): launcher wiring + adapter al tool-use loop LLM
Schema DeviceMeshConfig en AgentConfig. Adapter ToolsForLLM convierte
ToolSpec → tools.Tool transparente al LLM existente. URL via env var
override. tools_allowed filter. agent-wsl-lucas blank import en launcher.

LLM ve los tools como cualquier otra herramienta. Effects runner ya
soporta ActionKindDeviceMesh como fallback. Build + tests verdes.
2026-05-24 14:07:13 +02:00

415 lines
14 KiB
Go

package devagents
import (
"context"
"log/slog"
"os"
"path/filepath"
"time"
"github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/memory"
devicemeshtools "github.com/enmanuel/agents/pkg/tools/devicemesh"
shellknowledge "github.com/enmanuel/agents/shell/knowledge"
shellmcp "github.com/enmanuel/agents/shell/mcp"
shellskills "github.com/enmanuel/agents/shell/skills"
"github.com/enmanuel/agents/shell/ssh"
"github.com/enmanuel/agents/tools"
toolclock "github.com/enmanuel/agents/tools/clock"
toolfile "github.com/enmanuel/agents/tools/file"
toolhttp "github.com/enmanuel/agents/tools/http"
toolimdb "github.com/enmanuel/agents/tools/imdb"
toolknowledge "github.com/enmanuel/agents/tools/knowledgetools"
toolmatrix "github.com/enmanuel/agents/tools/matrix"
toolmcp "github.com/enmanuel/agents/tools/mcptools"
toolmemory "github.com/enmanuel/agents/tools/memorytools"
toolskills "github.com/enmanuel/agents/tools/skilltools"
toolssh "github.com/enmanuel/agents/tools/ssh"
toolweather "github.com/enmanuel/agents/tools/weather"
toolwikipedia "github.com/enmanuel/agents/tools/wikipedia"
toolexchange "github.com/enmanuel/agents/tools/exchange"
"github.com/enmanuel/agents/shell/matrix"
)
// toolDeps holds external subsystem instances needed by the tool registry.
type toolDeps struct {
kStore *shellknowledge.FileStore
sharedKStore *shellknowledge.FileStore
mcpManager *shellmcp.Manager
skillLoader *shellskills.Loader
skillExecutor *shellskills.Executor
}
// initToolDeps initializes knowledge stores, MCP manager, and skills loader
// based on the agent config. All results are optional (nil when disabled).
func initToolDeps(cfg *config.AgentConfig, dataBase string, logger *slog.Logger) toolDeps {
var deps toolDeps
// Knowledge store
if cfg.Tools.Knowledge.Enabled {
knowledgeDir := cfg.Tools.Knowledge.Dir
if knowledgeDir == "" {
knowledgeDir = filepath.Join("agents", cfg.Agent.ID, "knowledge")
}
knowledgeDBPath := filepath.Join(dataBase, "knowledge.db")
kStore, kErr := shellknowledge.New(knowledgeDir, knowledgeDBPath, logger)
if kErr != nil {
logger.Error("knowledge_store_init_failed", "err", kErr)
} else {
if syncErr := kStore.Sync(context.Background()); syncErr != nil {
logger.Error("knowledge_sync_failed", "err", syncErr)
}
deps.kStore = kStore
}
}
// Shared knowledge store
if cfg.Tools.SharedKnowledge.Enabled {
sharedDir := cfg.Tools.SharedKnowledge.Dir
if sharedDir == "" {
sharedDir = "knowledges"
}
sharedDBPath := cfg.Tools.SharedKnowledge.DBPath
if sharedDBPath == "" {
sharedDBPath = "knowledges/data/knowledge.db"
}
sharedKStore, skErr := shellknowledge.New(sharedDir, sharedDBPath, logger)
if skErr != nil {
logger.Error("shared_knowledge_store_init_failed", "err", skErr)
} else {
if syncErr := sharedKStore.Sync(context.Background()); syncErr != nil {
logger.Error("shared_knowledge_sync_failed", "err", syncErr)
}
logger.Info("shared knowledge enabled", "dir", sharedDir, "db", sharedDBPath)
deps.sharedKStore = sharedKStore
}
}
// MCP client manager — connects to external MCP servers
if cfg.Tools.MCP.Enabled && len(cfg.Tools.MCP.Servers) > 0 {
mcpManager, mcpErr := shellmcp.NewManager(context.Background(), cfg.Tools.MCP.Servers, logger)
if mcpErr != nil {
logger.Error("mcp_manager_init_failed", "err", mcpErr)
} else {
logger.Info("mcp manager initialized", "servers", len(cfg.Tools.MCP.Servers))
deps.mcpManager = mcpManager
}
}
// Skills loader
if cfg.Skills.Enabled {
skillsPath := cfg.Skills.SkillsPath
if skillsPath == "" {
skillsPath = "skills/"
}
deps.skillLoader = shellskills.NewLoader(skillsPath)
// Skills executor for scripts
allowedInterpreters := cfg.Tools.Skills.AllowedInterpreters
timeout := cfg.Skills.Timeout
if timeout == 0 {
timeout = 60 * time.Second
}
deps.skillExecutor = shellskills.NewExecutor(allowedInterpreters, timeout)
logger.Info("skills enabled", "path", skillsPath, "categories", cfg.Skills.Categories)
}
return deps
}
// initRateLimiter configures the rate limiter on the tool registry if enabled.
func initRateLimiter(cfg *config.AgentConfig, toolReg *tools.Registry, logger *slog.Logger) {
if !cfg.Security.ToolRateLimit.Enabled {
return
}
maxCalls := cfg.Security.ToolRateLimit.MaxCallsPerMin
if maxCalls <= 0 {
maxCalls = 10
}
rl := tools.NewRateLimiter(maxCalls, time.Minute)
toolReg.SetRateLimiter(rl)
cleanupInterval := cfg.Security.ToolRateLimit.CleanupIntervalS
if cleanupInterval <= 0 {
cleanupInterval = 60
}
go func() {
ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
rl.Cleanup()
}
}()
logger.Info("tool rate limiting enabled", "max_calls_per_min", maxCalls)
}
// buildToolRegistry creates a Registry with tools enabled in the agent's config.
func buildToolRegistry(
cfg *config.AgentConfig,
sshExec *ssh.Executor,
matrixClient *matrix.Client,
memStore memory.Store,
kStore *shellknowledge.FileStore,
sharedKStore *shellknowledge.FileStore,
mcpManager *shellmcp.Manager,
skillLoader *shellskills.Loader,
skillExecutor *shellskills.Executor,
roomCtx *toolmemory.RoomContext,
logger *slog.Logger,
) *tools.Registry {
reg := tools.NewRegistry(logger)
if cfg.Tools.HTTP.Enabled {
reg.Register(toolhttp.NewHTTPGet(cfg.Tools.HTTP))
reg.Register(toolhttp.NewHTTPPost(cfg.Tools.HTTP))
logger.Debug("registered http tools")
}
if cfg.Tools.SSH.Enabled {
reg.Register(toolssh.NewSSHCommand(cfg.Tools.SSH, sshExec))
logger.Debug("registered ssh tool")
}
if cfg.Tools.FileOps.Enabled {
reg.Register(toolfile.NewReadFile(cfg.Tools.FileOps))
reg.Register(toolfile.NewListDirectory(cfg.Tools.FileOps))
if !cfg.Tools.FileOps.ReadOnly {
reg.Register(toolfile.NewWriteFile(cfg.Tools.FileOps))
reg.Register(toolfile.NewAppendFile(cfg.Tools.FileOps))
reg.Register(toolfile.NewDeleteFile(cfg.Tools.FileOps))
}
logger.Debug("registered file tools")
}
// current_time is always available
reg.Register(toolclock.NewCurrentTime())
logger.Debug("registered current_time tool")
// weather tool is always available
reg.Register(toolweather.NewWeather())
logger.Debug("registered weather tool")
// wikipedia_search tool is always available
reg.Register(toolwikipedia.NewWikipediaSearch())
logger.Debug("registered wikipedia_search tool")
// imdb tool (enabled via config)
if cfg.Tools.IMDb.Enabled {
reg.Register(toolimdb.NewIMDbSearch(cfg.Tools.IMDb))
logger.Debug("registered imdb tool")
}
// exchange rate tools (enabled via config)
if cfg.Tools.ExchangeRate.Enabled {
if t, err := toolexchange.NewExchangeRateGet(cfg.Tools.ExchangeRate); err != nil {
logger.Warn("exchange_rate_get disabled: API key not configured", "err", err)
} else {
reg.Register(t)
}
if t, err := toolexchange.NewExchangeRateConvert(cfg.Tools.ExchangeRate); err != nil {
logger.Warn("exchange_rate_convert disabled: API key not configured", "err", err)
} else {
reg.Register(t)
}
if t, err := toolexchange.NewExchangeRateList(cfg.Tools.ExchangeRate); err != nil {
logger.Warn("exchange_rate_list disabled: API key not configured", "err", err)
} else {
reg.Register(t)
}
if t, err := toolexchange.NewExchangeRateHistorical(cfg.Tools.ExchangeRate); err != nil {
logger.Warn("exchange_rate_historical disabled: API key not configured", "err", err)
} else {
reg.Register(t)
}
logger.Debug("registered exchange rate tools")
}
// matrix_send is always available
reg.Register(toolmatrix.NewMatrixSend(matrixClient, cfg.Tools.Matrix))
logger.Debug("registered matrix tool")
// Memory tools (memory_clear_context registered later since it needs the Agent)
if cfg.Tools.Memory.Enabled && memStore != nil {
reg.Register(toolmemory.NewMemorySave(cfg.Agent.ID, memStore))
reg.Register(toolmemory.NewMemoryRecall(cfg.Agent.ID, memStore))
reg.Register(toolmemory.NewMemoryForget(cfg.Agent.ID, memStore))
reg.Register(toolmemory.NewMemorySummary(cfg.Agent.ID, memStore))
logger.Debug("registered memory tools")
}
// Knowledge tools
if cfg.Tools.Knowledge.Enabled && kStore != nil {
reg.Register(toolknowledge.NewKnowledgeSearch(kStore))
reg.Register(toolknowledge.NewKnowledgeRead(kStore))
reg.Register(toolknowledge.NewKnowledgeWrite(kStore))
reg.Register(toolknowledge.NewKnowledgeList(kStore))
logger.Debug("registered knowledge tools")
}
// Shared knowledge tools
if cfg.Tools.SharedKnowledge.Enabled && sharedKStore != nil {
sharedTools := toolknowledge.NewSharedKnowledgeTools(sharedKStore)
for _, tool := range sharedTools {
reg.Register(tool)
}
logger.Debug("registered shared knowledge tools", "count", len(sharedTools))
}
// MCP tools — register tools from all connected MCP servers
if mcpManager != nil {
for serverName, mcpClient := range mcpManager.AllClients() {
// Find the config for this server to get prefix, filter, timeout
var serverCfg *config.MCPServerCfg
for i := range cfg.Tools.MCP.Servers {
if cfg.Tools.MCP.Servers[i].Name == serverName {
serverCfg = &cfg.Tools.MCP.Servers[i]
break
}
}
if serverCfg == nil {
logger.Warn("no config found for MCP server", "name", serverName)
continue
}
// Convert and register MCP tools
mcpTools := toolmcp.FromMCPServer(mcpClient, serverCfg.Prefix, serverCfg.Tools, serverCfg.Timeout, logger)
for _, tool := range mcpTools {
reg.Register(tool)
}
logger.Debug("registered MCP tools", "server", serverName, "count", len(mcpTools))
}
}
// Skills tools — register skill search, load, read, and run tools
if skillLoader != nil {
reg.Register(toolskills.NewSkillSearch(skillLoader, cfg.Skills.Categories))
reg.Register(toolskills.NewSkillLoad(skillLoader))
reg.Register(toolskills.NewSkillReadResource(skillLoader))
if skillExecutor != nil {
reg.Register(toolskills.NewSkillRunScript(skillLoader, skillExecutor))
}
logger.Debug("registered skills tools")
}
// Device-mesh tools — exposed when the agent's config has a populated
// `device_mesh:` block with enabled=true. The builtin catalog (issue 0144
// §2.1) is filtered by Mode and then narrowed by ToolsAllowed; each
// surviving spec is adapted to a tools.Tool whose Exec routes through
// the devicemesh.ToolRegistry (validate → ArgMapping → HTTP dispatch →
// ResultMapping). See pkg/tools/devicemesh/adapter.go.
if dmReg := buildDeviceMeshRegistry(cfg, logger); dmReg != nil {
for _, t := range devicemeshtools.ToolsForLLM(dmReg) {
reg.Register(t)
}
logger.Info("device_mesh tools registered",
"host", cfg.DeviceMesh.ResolvedHost(),
"mode", normalizeMeshMode(cfg.DeviceMesh.Mode),
"count", dmReg.Len(),
"names", dmReg.Names(),
)
}
return reg
}
// buildDeviceMeshRegistry constructs the per-agent devicemesh.ToolRegistry
// from cfg.DeviceMesh and returns it ready to be adapted. Returns nil when
// the block is absent, disabled, or yields zero tools so the caller can
// skip registration cleanly. Pure(-ish) — only side effect is os.Getenv
// for the URL override; the rest is pure data shuffling.
func buildDeviceMeshRegistry(cfg *config.AgentConfig, logger *slog.Logger) *devicemeshtools.ToolRegistry {
if cfg == nil || cfg.DeviceMesh == nil || !cfg.DeviceMesh.Enabled {
return nil
}
dm := cfg.DeviceMesh
// Resolve the device_agent URL: env override wins when present and
// non-empty; otherwise fall back to the literal URL from YAML. This
// keeps endpoints out of git while staying explicit.
url := dm.DeviceAgentURL
if dm.URLEnv != "" {
if v := os.Getenv(dm.URLEnv); v != "" {
url = v
}
}
if url == "" {
logger.Warn("device_mesh enabled but no URL resolved (neither device_agent_url nor URLEnv)",
"url_env", dm.URLEnv,
"host", dm.ResolvedHost(),
)
return nil
}
client := devicemeshtools.NewClient(url)
if t := dm.ResolvedTimeoutSeconds(); t > 0 {
client.Timeout = time.Duration(t) * time.Second
}
mode := normalizeMeshMode(dm.Mode)
reg := devicemeshtools.NewToolRegistry(client)
registered := devicemeshtools.RegisterBuiltins(reg, mode)
logger.Debug("device_mesh builtins registered", "mode", mode, "count", len(registered), "names", registered)
// Narrow by tools_allowed if the config asks for it. The filter is a
// pure transform — same Client, fewer specs.
if len(dm.ToolsAllowed) > 0 {
filtered := devicemeshtools.FilterByAllowed(reg, dm.ToolsAllowed)
// Warn on names that the config asked for but the catalog does not
// provide — typical drift between template and code after a new
// builtin lands.
present := make(map[string]bool, len(registered))
for _, n := range registered {
present[n] = true
}
for _, n := range dm.ToolsAllowed {
if !present[n] {
logger.Warn("device_mesh tools_allowed lists unknown tool",
"name", n,
"mode", mode,
)
}
}
reg = filtered
}
if reg.Len() == 0 {
logger.Warn("device_mesh registry empty after filter — skipping",
"host", dm.ResolvedHost(),
)
return nil
}
return reg
}
// normalizeMeshMode maps the YAML "mode" string to the RegistrationMode
// enum, defaulting to ModeUser. Pure function — used by both the registry
// builder and tests.
func normalizeMeshMode(s string) devicemeshtools.RegistrationMode {
switch s {
case "sudo":
return devicemeshtools.ModeSudo
case "all":
return devicemeshtools.ModeAll
case "user", "":
return devicemeshtools.ModeUser
default:
return devicemeshtools.ModeUser
}
}
// resolveDataBase returns the base directory for agent runtime data.
// Priority: config storage.base_path > $AGENTS_DATA_DIR/<id> > <config-dir>/data
func resolveDataBase(cfg *config.AgentConfig) string {
if cfg.Storage.BasePath != "" {
return cfg.Storage.BasePath
}
if envDir := os.Getenv("AGENTS_DATA_DIR"); envDir != "" {
return filepath.Join(envDir, cfg.Agent.ID)
}
if cfg.ConfigDir != "" {
return filepath.Join(cfg.ConfigDir, "data")
}
return filepath.Join("agents", cfg.Agent.ID, "data")
}