docs: crear issues 0036-0041 — nuevas features del sistema
Issues planificados: - 0036: Claude Code streaming de progreso en Matrix - 0037: Agente que crea otros agentes/bots via Matrix - 0038: Webapps y dashboards embebidos en Element via widgets - 0039: Recordatorios dinámicos y crons que invocan agentes - 0040: Soporte para mensajes de voz (audio → STT) - 0041: Videollamadas con agentes via LiveKit Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,414 @@
|
||||
// Package agents defines the Agent runtime that ties core and shell together.
|
||||
package agents
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"maunium.net/go/mautrix"
|
||||
"maunium.net/go/mautrix/event"
|
||||
|
||||
"github.com/enmanuel/agents/internal/config"
|
||||
"github.com/enmanuel/agents/pkg/acl"
|
||||
"github.com/enmanuel/agents/pkg/command"
|
||||
"github.com/enmanuel/agents/pkg/decision"
|
||||
coretypes "github.com/enmanuel/agents/pkg/llm"
|
||||
"github.com/enmanuel/agents/pkg/memory"
|
||||
"github.com/enmanuel/agents/pkg/personality"
|
||||
"github.com/enmanuel/agents/pkg/sanitize"
|
||||
"github.com/enmanuel/agents/shell/audit"
|
||||
"github.com/enmanuel/agents/shell/bus"
|
||||
shellcron "github.com/enmanuel/agents/shell/cron"
|
||||
"github.com/enmanuel/agents/shell/effects"
|
||||
shellknowledge "github.com/enmanuel/agents/shell/knowledge"
|
||||
"github.com/enmanuel/agents/shell/matrix"
|
||||
shellmcp "github.com/enmanuel/agents/shell/mcp"
|
||||
shellskills "github.com/enmanuel/agents/shell/skills"
|
||||
"github.com/enmanuel/agents/shell/ssh"
|
||||
"github.com/enmanuel/agents/tools"
|
||||
toolmemory "github.com/enmanuel/agents/tools/memorytools"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxToolIterations = 5
|
||||
defaultWindowSize = 20
|
||||
)
|
||||
|
||||
// Option configures optional Agent behaviour.
|
||||
type Option func(*Agent)
|
||||
|
||||
// WithLogDir sets the base directory for JSONL logs (used by !metrics command).
|
||||
func WithLogDir(dir string) Option {
|
||||
return func(a *Agent) { a.logDir = dir }
|
||||
}
|
||||
|
||||
// CommandHandler executes a built-in command and returns the response text.
|
||||
type CommandHandler func(ctx context.Context, msgCtx decision.MessageContext) string
|
||||
|
||||
// Agent is the assembled runtime: pure core + impure shell.
|
||||
type Agent struct {
|
||||
cfg *config.AgentConfig
|
||||
personality personality.Personality
|
||||
rules []decision.Rule
|
||||
llm coretypes.CompleteFunc // nil when no LLM configured (simple_bot)
|
||||
matrix *matrix.Client
|
||||
sender effects.MatrixSender // used by sendReply; same object as matrix in production
|
||||
runner *effects.Runner
|
||||
listener *matrix.Listener
|
||||
toolReg *tools.Registry
|
||||
logger *slog.Logger
|
||||
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
|
||||
mcpManager *shellmcp.Manager // nil when MCP client is disabled
|
||||
|
||||
// Lifecycle — cancel stops this agent individually; done is closed when Run returns.
|
||||
cancel context.CancelFunc
|
||||
done chan struct{}
|
||||
|
||||
// Access control
|
||||
acl acl.ACL
|
||||
|
||||
// Commands — handlers keyed by canonical name; cmdAliases maps alias → canonical
|
||||
commands map[string]CommandHandler
|
||||
cmdAliases map[string]string // alias → canonical name
|
||||
customSpecs []command.Spec // specs from RegisterCommand (for !help)
|
||||
startTime time.Time
|
||||
|
||||
// Memory
|
||||
windows map[string]memory.Window
|
||||
windowsMu sync.RWMutex
|
||||
memStore memory.Store // nil when memory is disabled
|
||||
windowSize int
|
||||
roomCtx *toolmemory.RoomContext
|
||||
|
||||
// Prompt-commands — loaded from prompts/*.md at startup
|
||||
promptCmds map[string]string // name → prompt content
|
||||
|
||||
// Knowledge store — non-nil when knowledge is enabled
|
||||
knowledgeStore *shellknowledge.FileStore
|
||||
|
||||
// Shared knowledge store — non-nil when shared_knowledge is enabled
|
||||
sharedKnowledgeStore *shellknowledge.FileStore
|
||||
|
||||
// Skills loader — non-nil when skills are enabled
|
||||
skillLoader *shellskills.Loader
|
||||
|
||||
// Sanitization options — nil when sanitization is disabled
|
||||
sanitizeOpts *sanitize.Options
|
||||
|
||||
// Bus — set via SetBus() when running under the unified launcher
|
||||
agentBus *bus.Bus
|
||||
|
||||
// Scheduler — nil when no schedules are configured
|
||||
scheduler *shellcron.Scheduler
|
||||
|
||||
// Audit writer — nil when audit is disabled
|
||||
auditWriter *audit.Writer
|
||||
|
||||
// LogDir — base directory for JSONL logs (used by !metrics)
|
||||
logDir string
|
||||
}
|
||||
|
||||
// New assembles an Agent from its config, rules, pre-resolved ACL, and logger.
|
||||
// The ACL is resolved externally (e.g. from security/ YAML files) and injected here.
|
||||
// Pass acl.ACL{} (empty) for open access (no restrictions).
|
||||
// logDir is the base directory for JSONL logs (used by !metrics command); empty disables metrics.
|
||||
func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logger *slog.Logger, opts ...Option) (*Agent, error) {
|
||||
// Matrix client
|
||||
matrixClient, err := matrix.New(cfg.Matrix)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("matrix client: %w", err)
|
||||
}
|
||||
|
||||
// E2EE — initialize before the sync loop starts
|
||||
cryptoStore, err := initCrypto(cfg, matrixClient, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// SSH executor
|
||||
sshExec := ssh.NewExecutor(cfg.SSH, logger)
|
||||
|
||||
// LLM client — optional; if no provider is configured, the agent runs as simple_bot
|
||||
llmFunc, err := initLLM(cfg, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Effects runner
|
||||
runner := effects.NewRunner(matrixClient, sshExec, logger)
|
||||
|
||||
// Resolve base data path for this agent
|
||||
dataBase := resolveDataBase(cfg)
|
||||
logger.Debug("data base path", "path", dataBase)
|
||||
|
||||
// Memory subsystem
|
||||
memInit, err := initMemoryStore(cfg.Memory.Enabled, cfg.Memory.WindowSize, cfg.Memory.DBPath, dataBase, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Tool dependencies (knowledge, MCP, skills)
|
||||
deps := initToolDeps(cfg, dataBase, logger)
|
||||
|
||||
if !agentACL.Empty() {
|
||||
logger.Info("acl enabled (centralized security policy)")
|
||||
}
|
||||
|
||||
// Tool registry — register tools enabled in config
|
||||
roomCtx := &toolmemory.RoomContext{}
|
||||
toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memInit.store, deps.kStore, deps.sharedKStore, deps.mcpManager, deps.skillLoader, deps.skillExecutor, roomCtx, logger)
|
||||
|
||||
// Rate limiting for tools
|
||||
initRateLimiter(cfg, toolReg, logger)
|
||||
|
||||
a := &Agent{
|
||||
cfg: cfg,
|
||||
acl: agentACL,
|
||||
personality: personality.FromConfig(cfg.Personality),
|
||||
rules: rules,
|
||||
llm: llmFunc,
|
||||
matrix: matrixClient,
|
||||
sender: matrixClient,
|
||||
runner: runner,
|
||||
toolReg: toolReg,
|
||||
logger: logger,
|
||||
cryptoStore: cryptoStore,
|
||||
mcpManager: deps.mcpManager,
|
||||
done: make(chan struct{}),
|
||||
commands: make(map[string]CommandHandler),
|
||||
cmdAliases: command.BuiltinNames(),
|
||||
startTime: time.Now(),
|
||||
windows: make(map[string]memory.Window),
|
||||
memStore: memInit.store,
|
||||
knowledgeStore: deps.kStore,
|
||||
sharedKnowledgeStore: deps.sharedKStore,
|
||||
skillLoader: deps.skillLoader,
|
||||
windowSize: memInit.windowSize,
|
||||
roomCtx: roomCtx,
|
||||
}
|
||||
|
||||
// Apply optional configuration
|
||||
for _, opt := range opts {
|
||||
opt(a)
|
||||
}
|
||||
|
||||
// Initialize audit writer if enabled
|
||||
if cfg.Security.Audit.Enabled {
|
||||
var matrixSender audit.MatrixSender
|
||||
if cfg.Security.Audit.LogToRoom != "" {
|
||||
mc := matrixClient // capture for closure
|
||||
matrixSender = func(roomID, msg string) {
|
||||
if err := mc.SendMarkdown(context.Background(), roomID, msg); err != nil {
|
||||
logger.Warn("audit_matrix_send_error", "room", roomID, "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
aw, auditErr := audit.New(cfg.Security.Audit, matrixSender, logger)
|
||||
if auditErr != nil {
|
||||
logger.Error("audit_writer_init_failed", "err", auditErr)
|
||||
} else {
|
||||
a.auditWriter = aw
|
||||
logger.Info("audit trail enabled",
|
||||
"log_file", cfg.Security.Audit.LogFile,
|
||||
"log_to_room", cfg.Security.Audit.LogToRoom,
|
||||
"include", cfg.Security.Audit.Include,
|
||||
)
|
||||
|
||||
// Wire tool_exec audit into the tool registry
|
||||
agentID := cfg.Agent.ID
|
||||
toolReg.SetAuditFunc(func(toolName string, durationMS int64, toolErr error) {
|
||||
detail := fmt.Sprintf("tool=%s duration_ms=%d", toolName, durationMS)
|
||||
if toolErr != nil {
|
||||
detail += " error=" + toolErr.Error()
|
||||
}
|
||||
a.emitAudit(audit.Event{
|
||||
AgentID: agentID,
|
||||
EventType: audit.EventToolExec,
|
||||
Detail: detail,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Configure sanitization if enabled
|
||||
if cfg.Security.Sanitize.Enabled {
|
||||
minSev := parseSeverity(cfg.Security.Sanitize.MinSeverity)
|
||||
a.sanitizeOpts = &sanitize.Options{
|
||||
Mode: sanitize.ParseMode(cfg.Security.Sanitize.Mode),
|
||||
MinSeverity: minSev,
|
||||
DisabledPatterns: cfg.Security.Sanitize.DisabledPatterns,
|
||||
}
|
||||
logger.Info("input sanitization enabled",
|
||||
"mode", a.sanitizeOpts.Mode,
|
||||
"min_severity", minSev,
|
||||
)
|
||||
}
|
||||
|
||||
// Register built-in command handlers
|
||||
a.registerBuiltinCommands()
|
||||
|
||||
// Load prompt-commands from prompts/ directory
|
||||
a.loadPromptCommands()
|
||||
|
||||
// Register memory_clear_context with self as WindowClearer (after a is created)
|
||||
if cfg.Tools.Memory.Enabled && memInit.store != nil {
|
||||
toolReg.Register(toolmemory.NewMemoryClearContext(a, roomCtx))
|
||||
}
|
||||
|
||||
// Cron scheduler — only when schedules are configured
|
||||
if len(cfg.Schedules) > 0 {
|
||||
a.scheduler = shellcron.New(cfg.Schedules, matrixClient, llmFunc, cfg.LLM.Primary.Model, logger)
|
||||
logger.Info("cron scheduler configured", "schedules", len(cfg.Schedules))
|
||||
}
|
||||
|
||||
// Matrix event listener
|
||||
a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger)
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// initCrypto initializes E2EE if enabled and returns the crypto store closer.
|
||||
func initCrypto(cfg *config.AgentConfig, matrixClient *matrix.Client, logger *slog.Logger) (io.Closer, error) {
|
||||
if !cfg.Matrix.Encryption.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
storePath := filepath.Join(cfg.Matrix.Encryption.StorePath, "crypto.db")
|
||||
pickleKey := os.Getenv(cfg.Matrix.Encryption.PickleKeyEnv)
|
||||
logger.Info("initializing e2ee", "store", storePath)
|
||||
|
||||
cryptoStore, err := matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("e2ee init: %w", err)
|
||||
}
|
||||
|
||||
// Auto-fetch cross-signing private keys from SSSS if recovery key is configured.
|
||||
if envName := cfg.Matrix.Encryption.RecoveryKeyEnv; envName != "" {
|
||||
if rk := os.Getenv(envName); rk != "" {
|
||||
if err := matrixClient.FetchCrossSigningKeys(context.Background(), rk); err != nil {
|
||||
logger.Warn("failed to fetch cross-signing keys from SSSS (non-fatal)", "err", err)
|
||||
} else {
|
||||
logger.Info("cross-signing private keys fetched from SSSS")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sign own device with the self-signing key so Element shows it as verified.
|
||||
if err := matrixClient.SignOwnDevice(context.Background()); err != nil {
|
||||
logger.Warn("failed to sign own device (non-fatal)", "err", err)
|
||||
} else {
|
||||
logger.Info("own device signed with cross-signing key")
|
||||
}
|
||||
|
||||
logger.Info("e2ee ready")
|
||||
return cryptoStore, nil
|
||||
}
|
||||
|
||||
// RegisterCommand adds a custom command handler for this agent.
|
||||
// The spec provides metadata (aliases, description, usage) for !help.
|
||||
// Must be called before Run().
|
||||
func (a *Agent) RegisterCommand(spec command.Spec, handler CommandHandler) {
|
||||
a.commands[spec.Name] = handler
|
||||
a.cmdAliases[spec.Name] = spec.Name
|
||||
for _, alias := range spec.Aliases {
|
||||
a.cmdAliases[alias] = spec.Name
|
||||
}
|
||||
a.customSpecs = append(a.customSpecs, spec)
|
||||
a.logger.Info("command_registered", "command", spec.Name, "aliases", spec.Aliases)
|
||||
}
|
||||
|
||||
// SetBus attaches the agent to the inter-agent bus for orchestration.
|
||||
// Must be called before Run().
|
||||
func (a *Agent) SetBus(b *bus.Bus) {
|
||||
a.agentBus = b
|
||||
}
|
||||
|
||||
// SetInterceptor configures the listener to skip events in orchestrated rooms.
|
||||
func (a *Agent) SetInterceptor(fn matrix.InterceptFunc) {
|
||||
a.listener.SetInterceptor(fn)
|
||||
}
|
||||
|
||||
// SetMembershipNotify registers a callback for room membership changes.
|
||||
func (a *Agent) SetMembershipNotify(fn matrix.MembershipNotifyFunc) {
|
||||
a.listener.SetMembershipNotify(fn)
|
||||
}
|
||||
|
||||
// RawMatrixClient returns the underlying *mautrix.Client for room scanning.
|
||||
func (a *Agent) RawMatrixClient() *mautrix.Client {
|
||||
return a.matrix.Raw()
|
||||
}
|
||||
|
||||
// Stop cancels this agent's individual context, causing Run to return.
|
||||
// Safe to call multiple times.
|
||||
func (a *Agent) Stop() {
|
||||
if a.cancel != nil {
|
||||
a.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
// Done returns a channel that is closed when Run has returned.
|
||||
func (a *Agent) Done() <-chan struct{} {
|
||||
return a.done
|
||||
}
|
||||
|
||||
// Run starts the agent sync loop. Blocks until ctx is cancelled.
|
||||
func (a *Agent) Run(ctx context.Context) error {
|
||||
ctx, a.cancel = context.WithCancel(ctx)
|
||||
defer close(a.done)
|
||||
|
||||
if a.cryptoStore != nil {
|
||||
defer a.cryptoStore.Close()
|
||||
}
|
||||
if a.memStore != nil {
|
||||
defer a.memStore.Close()
|
||||
}
|
||||
if a.knowledgeStore != nil {
|
||||
defer a.knowledgeStore.Close()
|
||||
}
|
||||
if a.sharedKnowledgeStore != nil {
|
||||
defer a.sharedKnowledgeStore.Close()
|
||||
}
|
||||
if a.mcpManager != nil {
|
||||
defer a.mcpManager.Close()
|
||||
}
|
||||
if a.auditWriter != nil {
|
||||
defer a.auditWriter.Close()
|
||||
}
|
||||
a.logger.Info("agent starting",
|
||||
"id", a.cfg.Agent.ID,
|
||||
"name", a.cfg.Agent.Name,
|
||||
"tools", a.toolReg.Names(),
|
||||
)
|
||||
|
||||
// Set presence to online
|
||||
if err := a.matrix.SetPresence(ctx, event.PresenceOnline); err != nil {
|
||||
a.logger.Warn("failed to set presence online", "err", err)
|
||||
}
|
||||
defer func() {
|
||||
// Use background context since ctx is already cancelled at shutdown
|
||||
offlineCtx := context.Background()
|
||||
if err := a.matrix.SetPresence(offlineCtx, event.PresenceOffline); err != nil {
|
||||
a.logger.Warn("failed to set presence offline", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Start bus listener if connected to the orchestration bus
|
||||
if a.agentBus != nil {
|
||||
ch := a.agentBus.Subscribe(bus.AgentID(a.cfg.Agent.ID))
|
||||
go a.listenBus(ctx, ch)
|
||||
a.logger.Info("bus listener started")
|
||||
}
|
||||
|
||||
// Start cron scheduler in background goroutine (blocks until ctx cancelled)
|
||||
if a.scheduler != nil {
|
||||
go a.scheduler.Start(ctx)
|
||||
}
|
||||
|
||||
return a.listener.Run(ctx)
|
||||
}
|
||||
Reference in New Issue
Block a user