feat: implement memory management system with SQLite persistence, including conversation windows and episodic facts
This commit is contained in:
+165
-8
@@ -8,21 +8,27 @@ import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"maunium.net/go/mautrix/event"
|
||||
|
||||
"github.com/enmanuel/agents/internal/config"
|
||||
"github.com/enmanuel/agents/pkg/decision"
|
||||
coretypes "github.com/enmanuel/agents/pkg/llm"
|
||||
"github.com/enmanuel/agents/pkg/memory"
|
||||
"github.com/enmanuel/agents/pkg/personality"
|
||||
"github.com/enmanuel/agents/shell/effects"
|
||||
shelllm "github.com/enmanuel/agents/shell/llm"
|
||||
"github.com/enmanuel/agents/shell/matrix"
|
||||
shellmem "github.com/enmanuel/agents/shell/memory"
|
||||
"github.com/enmanuel/agents/shell/ssh"
|
||||
"github.com/enmanuel/agents/tools"
|
||||
)
|
||||
|
||||
const defaultMaxToolIterations = 5
|
||||
const (
|
||||
defaultMaxToolIterations = 5
|
||||
defaultWindowSize = 20
|
||||
)
|
||||
|
||||
// Agent is the assembled runtime: pure core + impure shell.
|
||||
type Agent struct {
|
||||
@@ -36,6 +42,20 @@ type Agent struct {
|
||||
toolReg *tools.Registry
|
||||
logger *slog.Logger
|
||||
cryptoStore io.Closer // non-nil when E2EE is enabled; closed on shutdown
|
||||
|
||||
// Memory
|
||||
windows map[string]memory.Window
|
||||
windowsMu sync.RWMutex
|
||||
memStore memory.Store // nil when memory is disabled
|
||||
windowSize int
|
||||
roomCtx *tools.RoomContext
|
||||
}
|
||||
|
||||
// ClearWindow resets the conversation window for a room. Implements tools.WindowClearer.
|
||||
func (a *Agent) ClearWindow(roomID string) {
|
||||
a.windowsMu.Lock()
|
||||
defer a.windowsMu.Unlock()
|
||||
a.windows[roomID] = memory.NewWindow(a.windowSize)
|
||||
}
|
||||
|
||||
// New assembles an Agent from its config, rules, and logger.
|
||||
@@ -100,8 +120,31 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*
|
||||
// Effects runner
|
||||
runner := effects.NewRunner(matrixClient, sshExec, logger)
|
||||
|
||||
// Memory subsystem
|
||||
var memStore memory.Store
|
||||
windowSize := defaultWindowSize
|
||||
roomCtx := &tools.RoomContext{}
|
||||
|
||||
if cfg.Memory.Enabled {
|
||||
windowSize = cfg.Memory.WindowSize
|
||||
if windowSize <= 0 {
|
||||
windowSize = defaultWindowSize
|
||||
}
|
||||
|
||||
dbPath := cfg.Memory.DBPath
|
||||
if dbPath == "" {
|
||||
dbPath = filepath.Join("agents", cfg.Agent.ID, "data", "memory.db")
|
||||
}
|
||||
store, err := shellmem.New(dbPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("memory store: %w", err)
|
||||
}
|
||||
memStore = store
|
||||
logger.Info("memory enabled", "window_size", windowSize, "db", dbPath)
|
||||
}
|
||||
|
||||
// Tool registry — register tools enabled in config
|
||||
toolReg := buildToolRegistry(cfg, sshExec, matrixClient, logger)
|
||||
toolReg := buildToolRegistry(cfg, sshExec, matrixClient, memStore, roomCtx, logger)
|
||||
|
||||
a := &Agent{
|
||||
cfg: cfg,
|
||||
@@ -112,6 +155,15 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*
|
||||
toolReg: toolReg,
|
||||
logger: logger,
|
||||
cryptoStore: cryptoStore,
|
||||
windows: make(map[string]memory.Window),
|
||||
memStore: memStore,
|
||||
windowSize: windowSize,
|
||||
roomCtx: roomCtx,
|
||||
}
|
||||
|
||||
// Register memory_clear_context with self as WindowClearer (after a is created)
|
||||
if cfg.Tools.Memory.Enabled && memStore != nil {
|
||||
toolReg.Register(tools.NewMemoryClearContext(a, roomCtx))
|
||||
}
|
||||
|
||||
// Matrix event listener
|
||||
@@ -125,6 +177,9 @@ func (a *Agent) Run(ctx context.Context) error {
|
||||
if a.cryptoStore != nil {
|
||||
defer a.cryptoStore.Close()
|
||||
}
|
||||
if a.memStore != nil {
|
||||
defer a.memStore.Close()
|
||||
}
|
||||
a.logger.Info("agent starting",
|
||||
"id", a.cfg.Agent.ID,
|
||||
"name", a.cfg.Agent.Name,
|
||||
@@ -142,9 +197,14 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
|
||||
"command", msgCtx.Command,
|
||||
)
|
||||
|
||||
roomID := evt.RoomID.String()
|
||||
|
||||
// Update room context for memory tools
|
||||
a.roomCtx.Set(roomID)
|
||||
|
||||
if a.cfg.Personality.Behavior.TypingIndicator {
|
||||
_ = a.matrix.SendTyping(ctx, evt.RoomID.String(), true)
|
||||
defer a.matrix.SendTyping(ctx, evt.RoomID.String(), false)
|
||||
_ = a.matrix.SendTyping(ctx, roomID, true)
|
||||
defer a.matrix.SendTyping(ctx, roomID, false)
|
||||
}
|
||||
|
||||
actions := decision.Evaluate(msgCtx, a.rules)
|
||||
@@ -171,6 +231,13 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
|
||||
expanded := make([]decision.Action, 0, len(actions))
|
||||
for _, act := range actions {
|
||||
if act.Kind == decision.ActionKindLLM {
|
||||
// Memory: load window + append user message before LLM call
|
||||
a.ensureWindowLoaded(ctx, roomID)
|
||||
a.appendToWindow(roomID, coretypes.Message{
|
||||
Role: coretypes.RoleUser, Content: msgCtx.Content,
|
||||
})
|
||||
a.persistMessage(ctx, roomID, coretypes.RoleUser, msgCtx.Content)
|
||||
|
||||
reply, err := a.runLLM(ctx, msgCtx)
|
||||
if err != nil {
|
||||
a.logger.Error("llm error", "err", err)
|
||||
@@ -183,13 +250,19 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext,
|
||||
Kind: decision.ActionKindReply,
|
||||
Reply: &decision.ReplyAction{Content: reply},
|
||||
})
|
||||
|
||||
// Memory: append assistant reply after LLM call
|
||||
a.appendToWindow(roomID, coretypes.Message{
|
||||
Role: coretypes.RoleAssistant, Content: reply,
|
||||
})
|
||||
a.persistMessage(ctx, roomID, coretypes.RoleAssistant, reply)
|
||||
}
|
||||
} else {
|
||||
expanded = append(expanded, act)
|
||||
}
|
||||
}
|
||||
|
||||
a.runner.Execute(ctx, evt.RoomID.String(), expanded)
|
||||
a.runner.Execute(ctx, roomID, expanded)
|
||||
}
|
||||
|
||||
func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (string, error) {
|
||||
@@ -201,8 +274,13 @@ func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (str
|
||||
// Load system prompt from file if configured, else use description
|
||||
systemPrompt := a.cfg.Agent.Description
|
||||
|
||||
messages := []coretypes.Message{
|
||||
{Role: coretypes.RoleUser, Content: msgCtx.Content},
|
||||
// Build messages: conversation history from window (includes current user msg)
|
||||
messages := a.getWindowMessages(msgCtx.RoomID)
|
||||
if len(messages) == 0 {
|
||||
// Fallback if memory is disabled: just the current message
|
||||
messages = []coretypes.Message{
|
||||
{Role: coretypes.RoleUser, Content: msgCtx.Content},
|
||||
}
|
||||
}
|
||||
|
||||
// Build tool specs for the LLM if tool_use is enabled
|
||||
@@ -294,8 +372,78 @@ func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext) (str
|
||||
return "I've reached the maximum number of tool iterations. Here's what I found so far.", nil
|
||||
}
|
||||
|
||||
// ── Memory helpers ───────────────────────────────────────────────────────
|
||||
|
||||
// ensureWindowLoaded loads the conversation window from SQLite on first access for a room.
|
||||
func (a *Agent) ensureWindowLoaded(ctx context.Context, roomID string) {
|
||||
a.windowsMu.Lock()
|
||||
defer a.windowsMu.Unlock()
|
||||
if _, ok := a.windows[roomID]; ok {
|
||||
return
|
||||
}
|
||||
w := memory.NewWindow(a.windowSize)
|
||||
if a.memStore != nil {
|
||||
msgs, err := a.memStore.LoadMessages(ctx, a.cfg.Agent.ID, roomID, a.windowSize)
|
||||
if err != nil {
|
||||
a.logger.Warn("failed to load message history", "room", roomID, "err", err)
|
||||
} else {
|
||||
for _, m := range msgs {
|
||||
w = w.Append(coretypes.Message{Role: m.Role, Content: m.Content})
|
||||
}
|
||||
if len(msgs) > 0 {
|
||||
a.logger.Debug("loaded message history", "room", roomID, "count", len(msgs))
|
||||
}
|
||||
}
|
||||
}
|
||||
a.windows[roomID] = w
|
||||
}
|
||||
|
||||
// appendToWindow adds a message to the in-memory conversation window.
|
||||
func (a *Agent) appendToWindow(roomID string, msg coretypes.Message) {
|
||||
a.windowsMu.Lock()
|
||||
defer a.windowsMu.Unlock()
|
||||
w, ok := a.windows[roomID]
|
||||
if !ok {
|
||||
w = memory.NewWindow(a.windowSize)
|
||||
}
|
||||
a.windows[roomID] = w.Append(msg)
|
||||
}
|
||||
|
||||
// getWindowMessages returns a copy of the conversation window for a room.
|
||||
func (a *Agent) getWindowMessages(roomID string) []coretypes.Message {
|
||||
a.windowsMu.RLock()
|
||||
defer a.windowsMu.RUnlock()
|
||||
w, ok := a.windows[roomID]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return w.ToLLMMessages()
|
||||
}
|
||||
|
||||
// persistMessage saves a message to the SQLite store (no-op if store is nil).
|
||||
func (a *Agent) persistMessage(ctx context.Context, roomID string, role coretypes.Role, content string) {
|
||||
if a.memStore == nil {
|
||||
return
|
||||
}
|
||||
if err := a.memStore.SaveMessage(ctx, memory.HistoryMessage{
|
||||
AgentID: a.cfg.Agent.ID,
|
||||
RoomID: roomID,
|
||||
Role: role,
|
||||
Content: content,
|
||||
}); err != nil {
|
||||
a.logger.Warn("failed to persist message", "room", roomID, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// buildToolRegistry creates a Registry with tools enabled in the agent's config.
|
||||
func buildToolRegistry(cfg *config.AgentConfig, sshExec *ssh.Executor, matrixClient *matrix.Client, logger *slog.Logger) *tools.Registry {
|
||||
func buildToolRegistry(
|
||||
cfg *config.AgentConfig,
|
||||
sshExec *ssh.Executor,
|
||||
matrixClient *matrix.Client,
|
||||
memStore memory.Store,
|
||||
roomCtx *tools.RoomContext,
|
||||
logger *slog.Logger,
|
||||
) *tools.Registry {
|
||||
reg := tools.NewRegistry()
|
||||
|
||||
if cfg.Tools.HTTP.Enabled {
|
||||
@@ -322,5 +470,14 @@ func buildToolRegistry(cfg *config.AgentConfig, sshExec *ssh.Executor, matrixCli
|
||||
reg.Register(tools.NewMatrixSend(matrixClient))
|
||||
logger.Debug("registered matrix tool")
|
||||
|
||||
// Memory tools (memory_clear_context registered later since it needs the Agent)
|
||||
if cfg.Tools.Memory.Enabled && memStore != nil {
|
||||
reg.Register(tools.NewMemorySave(cfg.Agent.ID, memStore))
|
||||
reg.Register(tools.NewMemoryRecall(cfg.Agent.ID, memStore))
|
||||
reg.Register(tools.NewMemoryForget(cfg.Agent.ID, memStore))
|
||||
reg.Register(tools.NewMemorySummary(cfg.Agent.ID, memStore))
|
||||
logger.Debug("registered memory tools")
|
||||
}
|
||||
|
||||
return reg
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user