diff --git a/agents/handler.go b/agents/handler.go index 6e83dd3..3ed43be 100644 --- a/agents/handler.go +++ b/agents/handler.go @@ -11,6 +11,7 @@ import ( coretypes "github.com/enmanuel/agents/pkg/llm" "github.com/enmanuel/agents/pkg/orchestration" "github.com/enmanuel/agents/pkg/sanitize" + "github.com/enmanuel/agents/shell/audit" "github.com/enmanuel/agents/shell/bus" ) @@ -25,6 +26,15 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, roomID := evt.RoomID.String() + // Audit: message_received + a.emitAudit(audit.Event{ + AgentID: a.cfg.Agent.ID, + EventType: audit.EventMessageReceived, + SenderID: msgCtx.SenderID, + RoomID: roomID, + Detail: fmt.Sprintf("is_dm=%v is_mention=%v", msgCtx.IsDirectMsg, msgCtx.IsMention), + }) + // Update room context for memory tools a.roomCtx.Set(roomID) @@ -59,6 +69,16 @@ func (a *Agent) handleEvent(ctx context.Context, msgCtx decision.MessageContext, return } a.logger.Info("command_executed", "command", cmdName) + + // Audit: command_exec + a.emitAudit(audit.Event{ + AgentID: a.cfg.Agent.ID, + EventType: audit.EventCommandExec, + SenderID: msgCtx.SenderID, + RoomID: roomID, + Detail: fmt.Sprintf("command=%s", cmdName), + }) + reply := handler(ctx, msgCtx) _ = a.sendReply(ctx, roomID, msgCtx.EventID, msgCtx.ThreadID, reply) return @@ -341,6 +361,13 @@ func parseSeverity(s string) sanitize.Severity { } } +// emitAudit writes an audit event if the audit writer is enabled. +func (a *Agent) emitAudit(evt audit.Event) { + if a.auditWriter != nil { + a.auditWriter.Emit(evt) + } +} + // sanitizeInput runs prompt injection detection on the message content. // Returns the (possibly modified) content and true if the message should be rejected. func (a *Agent) sanitizeInput(content, roomID, senderID string) (string, bool) { diff --git a/agents/llm.go b/agents/llm.go index e223020..073261f 100644 --- a/agents/llm.go +++ b/agents/llm.go @@ -12,6 +12,7 @@ import ( "github.com/enmanuel/agents/pkg/decision" coretypes "github.com/enmanuel/agents/pkg/llm" "github.com/enmanuel/agents/pkg/personality" + "github.com/enmanuel/agents/shell/audit" shelllm "github.com/enmanuel/agents/shell/llm" ) @@ -75,6 +76,12 @@ func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memK resp, err := a.llm(ctx, req) if err != nil { a.logger.Error("LLM call failed", "model", req.Model, "err", err) + // Audit: llm_error + a.emitAudit(audit.Event{ + AgentID: a.cfg.Agent.ID, + EventType: audit.EventLLMError, + Detail: fmt.Sprintf("provider=%s model=%s error=%s", a.cfg.LLM.Primary.Provider, req.Model, err), + }) return "", err } @@ -84,6 +91,13 @@ func (a *Agent) runLLM(ctx context.Context, msgCtx decision.MessageContext, memK "finish_reason", resp.FinishReason, ) + // Audit: llm_request + a.emitAudit(audit.Event{ + AgentID: a.cfg.Agent.ID, + EventType: audit.EventLLMRequest, + Detail: fmt.Sprintf("provider=%s model=%s content_len=%d tool_calls=%d", a.cfg.LLM.Primary.Provider, req.Model, len(resp.Content), len(resp.ToolCalls)), + }) + // No tool calls — return the text response if len(resp.ToolCalls) == 0 { return resp.Content, nil diff --git a/agents/runtime.go b/agents/runtime.go index 5df8171..94f844b 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -22,6 +22,7 @@ import ( "github.com/enmanuel/agents/pkg/memory" "github.com/enmanuel/agents/pkg/personality" "github.com/enmanuel/agents/pkg/sanitize" + "github.com/enmanuel/agents/shell/audit" "github.com/enmanuel/agents/shell/bus" shellcron "github.com/enmanuel/agents/shell/cron" "github.com/enmanuel/agents/shell/effects" @@ -39,6 +40,14 @@ const ( defaultWindowSize = 20 ) +// Option configures optional Agent behaviour. +type Option func(*Agent) + +// WithLogDir sets the base directory for JSONL logs (used by !metrics command). +func WithLogDir(dir string) Option { + return func(a *Agent) { a.logDir = dir } +} + // CommandHandler executes a built-in command and returns the response text. type CommandHandler func(ctx context.Context, msgCtx decision.MessageContext) string @@ -97,12 +106,19 @@ type Agent struct { // Scheduler — nil when no schedules are configured scheduler *shellcron.Scheduler + + // Audit writer — nil when audit is disabled + auditWriter *audit.Writer + + // LogDir — base directory for JSONL logs (used by !metrics) + logDir string } // New assembles an Agent from its config, rules, pre-resolved ACL, and logger. // The ACL is resolved externally (e.g. from security/ YAML files) and injected here. // Pass acl.ACL{} (empty) for open access (no restrictions). -func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logger *slog.Logger) (*Agent, error) { +// logDir is the base directory for JSONL logs (used by !metrics command); empty disables metrics. +func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logger *slog.Logger, opts ...Option) (*Agent, error) { // Matrix client matrixClient, err := matrix.New(cfg.Matrix) if err != nil { @@ -177,6 +193,49 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, agentACL acl.ACL, logge roomCtx: roomCtx, } + // Apply optional configuration + for _, opt := range opts { + opt(a) + } + + // Initialize audit writer if enabled + if cfg.Security.Audit.Enabled { + var matrixSender audit.MatrixSender + if cfg.Security.Audit.LogToRoom != "" { + mc := matrixClient // capture for closure + matrixSender = func(roomID, msg string) { + if err := mc.SendMarkdown(context.Background(), roomID, msg); err != nil { + logger.Warn("audit_matrix_send_error", "room", roomID, "err", err) + } + } + } + aw, auditErr := audit.New(cfg.Security.Audit, matrixSender, logger) + if auditErr != nil { + logger.Error("audit_writer_init_failed", "err", auditErr) + } else { + a.auditWriter = aw + logger.Info("audit trail enabled", + "log_file", cfg.Security.Audit.LogFile, + "log_to_room", cfg.Security.Audit.LogToRoom, + "include", cfg.Security.Audit.Include, + ) + + // Wire tool_exec audit into the tool registry + agentID := cfg.Agent.ID + toolReg.SetAuditFunc(func(toolName string, durationMS int64, toolErr error) { + detail := fmt.Sprintf("tool=%s duration_ms=%d", toolName, durationMS) + if toolErr != nil { + detail += " error=" + toolErr.Error() + } + a.emitAudit(audit.Event{ + AgentID: agentID, + EventType: audit.EventToolExec, + Detail: detail, + }) + }) + } + } + // Configure sanitization if enabled if cfg.Security.Sanitize.Enabled { minSev := parseSeverity(cfg.Security.Sanitize.MinSeverity) @@ -318,6 +377,9 @@ func (a *Agent) Run(ctx context.Context) error { if a.mcpManager != nil { defer a.mcpManager.Close() } + if a.auditWriter != nil { + defer a.auditWriter.Close() + } a.logger.Info("agent starting", "id", a.cfg.Agent.ID, "name", a.cfg.Agent.Name, diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index a5c7d5c..678b04b 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -192,7 +192,7 @@ func main() { "acl_empty", agentACL.Empty(), ) - a, cErr := agents.New(cfg, rules, agentACL, agentLogger) + a, cErr := agents.New(cfg, rules, agentACL, agentLogger, agents.WithLogDir(logDir)) if cErr != nil { logger.Error("failed to create agent", "id", cfg.Agent.ID, "err", cErr) agentCleanup() diff --git a/shell/audit/writer.go b/shell/audit/writer.go new file mode 100644 index 0000000..208ebe7 --- /dev/null +++ b/shell/audit/writer.go @@ -0,0 +1,133 @@ +// Package audit provides an audit event writer for compliance and review. +// Events are written to a JSONL file and/or sent to a Matrix room. +// This is fully impure (I/O): belongs in shell/. +package audit + +import ( + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "sync" + "time" + + "github.com/enmanuel/agents/internal/config" +) + +// Event types emitted by the audit trail. +const ( + EventMessageReceived = "message_received" + EventCommandExec = "command_exec" + EventToolExec = "tool_exec" + EventLLMRequest = "llm_request" + EventLLMError = "llm_error" +) + +// Event represents a single audit trail entry. +type Event struct { + Time time.Time `json:"time"` + AgentID string `json:"agent_id"` + EventType string `json:"event_type"` + SenderID string `json:"sender_id,omitempty"` + RoomID string `json:"room_id,omitempty"` + Detail string `json:"detail,omitempty"` +} + +// MatrixSender is a function that sends a message to a Matrix room. +// Decouples audit from the Matrix client. +type MatrixSender func(roomID, msg string) + +// Writer writes audit events to a JSONL file and/or a Matrix room. +type Writer struct { + cfg config.AuditCfg + sender MatrixSender // may be nil + logger *slog.Logger + + include map[string]bool // allowlist of event types; empty = all + + mu sync.Mutex + file *os.File +} + +// New creates an AuditWriter from the given config. +// matrixSender may be nil if LogToRoom is not configured. +func New(cfg config.AuditCfg, sender MatrixSender, logger *slog.Logger) (*Writer, error) { + w := &Writer{ + cfg: cfg, + sender: sender, + logger: logger.With("component", "audit"), + } + + // Build include allowlist + if len(cfg.Include) > 0 { + w.include = make(map[string]bool, len(cfg.Include)) + for _, t := range cfg.Include { + w.include[t] = true + } + } + + // Open log file if configured + if cfg.LogFile != "" { + dir := filepath.Dir(cfg.LogFile) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("create audit log dir %s: %w", dir, err) + } + f, err := os.OpenFile(cfg.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return nil, fmt.Errorf("open audit log %s: %w", cfg.LogFile, err) + } + w.file = f + } + + return w, nil +} + +// Emit writes an audit event. If the event type is not in the include list +// (when non-empty), the event is silently dropped. Thread-safe. +func (w *Writer) Emit(evt Event) { + // Filter by include allowlist (empty = pass all) + if len(w.include) > 0 && !w.include[evt.EventType] { + return + } + + // Ensure time is set + if evt.Time.IsZero() { + evt.Time = time.Now().UTC() + } + + // Write to JSONL file + if w.file != nil { + data, err := json.Marshal(evt) + if err != nil { + w.logger.Error("audit_marshal_error", "err", err) + return + } + data = append(data, '\n') + + w.mu.Lock() + _, writeErr := w.file.Write(data) + w.mu.Unlock() + + if writeErr != nil { + w.logger.Error("audit_write_error", "err", writeErr) + } + } + + // Send to Matrix room + if w.sender != nil && w.cfg.LogToRoom != "" { + msg := fmt.Sprintf("**[audit]** `%s` | agent=%s sender=%s room=%s | %s", + evt.EventType, evt.AgentID, evt.SenderID, evt.RoomID, evt.Detail) + w.sender(w.cfg.LogToRoom, msg) + } +} + +// Close closes the underlying log file. +func (w *Writer) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.file != nil { + return w.file.Close() + } + return nil +} diff --git a/shell/audit/writer_test.go b/shell/audit/writer_test.go new file mode 100644 index 0000000..675343c --- /dev/null +++ b/shell/audit/writer_test.go @@ -0,0 +1,271 @@ +package audit + +import ( + "encoding/json" + "log/slog" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/enmanuel/agents/internal/config" +) + +func testLogger() *slog.Logger { + return slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) +} + +func TestEmit_WritesToFile(t *testing.T) { + dir := t.TempDir() + logFile := filepath.Join(dir, "audit.jsonl") + + cfg := config.AuditCfg{ + Enabled: true, + LogFile: logFile, + } + + w, err := New(cfg, nil, testLogger()) + if err != nil { + t.Fatalf("New: %v", err) + } + defer w.Close() + + w.Emit(Event{ + Time: time.Date(2026, 4, 9, 12, 0, 0, 0, time.UTC), + AgentID: "test-bot", + EventType: EventCommandExec, + SenderID: "@user:example.com", + RoomID: "!room:example.com", + Detail: "command=help", + }) + + data, err := os.ReadFile(logFile) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(lines) != 1 { + t.Fatalf("expected 1 line, got %d", len(lines)) + } + + var evt Event + if err := json.Unmarshal([]byte(lines[0]), &evt); err != nil { + t.Fatalf("Unmarshal: %v", err) + } + + if evt.AgentID != "test-bot" { + t.Errorf("AgentID = %q, want %q", evt.AgentID, "test-bot") + } + if evt.EventType != EventCommandExec { + t.Errorf("EventType = %q, want %q", evt.EventType, EventCommandExec) + } + if evt.SenderID != "@user:example.com" { + t.Errorf("SenderID = %q, want %q", evt.SenderID, "@user:example.com") + } + if evt.Detail != "command=help" { + t.Errorf("Detail = %q, want %q", evt.Detail, "command=help") + } +} + +func TestEmit_IncludeFilter(t *testing.T) { + dir := t.TempDir() + logFile := filepath.Join(dir, "audit.jsonl") + + cfg := config.AuditCfg{ + Enabled: true, + LogFile: logFile, + Include: []string{EventCommandExec, EventToolExec}, + } + + w, err := New(cfg, nil, testLogger()) + if err != nil { + t.Fatalf("New: %v", err) + } + defer w.Close() + + // Should be written (in include list) + w.Emit(Event{AgentID: "bot", EventType: EventCommandExec, Detail: "included"}) + + // Should NOT be written (not in include list) + w.Emit(Event{AgentID: "bot", EventType: EventLLMRequest, Detail: "excluded"}) + + // Should be written + w.Emit(Event{AgentID: "bot", EventType: EventToolExec, Detail: "also-included"}) + + data, err := os.ReadFile(logFile) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(lines) != 2 { + t.Fatalf("expected 2 lines (filtered), got %d: %s", len(lines), string(data)) + } + + // Verify content + var evt0, evt1 Event + json.Unmarshal([]byte(lines[0]), &evt0) + json.Unmarshal([]byte(lines[1]), &evt1) + + if evt0.EventType != EventCommandExec { + t.Errorf("line 0 EventType = %q, want %q", evt0.EventType, EventCommandExec) + } + if evt1.EventType != EventToolExec { + t.Errorf("line 1 EventType = %q, want %q", evt1.EventType, EventToolExec) + } +} + +func TestEmit_NoLogFile_OnlyRoom(t *testing.T) { + var sent []string + var mu sync.Mutex + sender := func(roomID, msg string) { + mu.Lock() + sent = append(sent, roomID+"|"+msg) + mu.Unlock() + } + + cfg := config.AuditCfg{ + Enabled: true, + LogToRoom: "!audit:example.com", + // No LogFile + } + + w, err := New(cfg, sender, testLogger()) + if err != nil { + t.Fatalf("New: %v", err) + } + defer w.Close() + + w.Emit(Event{AgentID: "bot", EventType: EventMessageReceived, Detail: "test"}) + + mu.Lock() + defer mu.Unlock() + if len(sent) != 1 { + t.Fatalf("expected 1 message sent, got %d", len(sent)) + } + if !strings.HasPrefix(sent[0], "!audit:example.com|") { + t.Errorf("message sent to wrong room: %s", sent[0]) + } +} + +func TestEmit_NoRoom_OnlyFile(t *testing.T) { + dir := t.TempDir() + logFile := filepath.Join(dir, "audit.jsonl") + + senderCalled := false + sender := func(roomID, msg string) { + senderCalled = true + } + + cfg := config.AuditCfg{ + Enabled: true, + LogFile: logFile, + // No LogToRoom + } + + w, err := New(cfg, sender, testLogger()) + if err != nil { + t.Fatalf("New: %v", err) + } + defer w.Close() + + w.Emit(Event{AgentID: "bot", EventType: EventCommandExec, Detail: "test"}) + + if senderCalled { + t.Error("sender was called despite no LogToRoom configured") + } + + data, err := os.ReadFile(logFile) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + if len(strings.TrimSpace(string(data))) == 0 { + t.Error("expected data in log file, got empty") + } +} + +func TestEmit_SetsTimeIfZero(t *testing.T) { + dir := t.TempDir() + logFile := filepath.Join(dir, "audit.jsonl") + + cfg := config.AuditCfg{ + Enabled: true, + LogFile: logFile, + } + + w, err := New(cfg, nil, testLogger()) + if err != nil { + t.Fatalf("New: %v", err) + } + defer w.Close() + + before := time.Now().UTC() + w.Emit(Event{AgentID: "bot", EventType: EventCommandExec}) + after := time.Now().UTC() + + data, err := os.ReadFile(logFile) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + + var evt Event + json.Unmarshal([]byte(strings.TrimSpace(string(data))), &evt) + + if evt.Time.Before(before) || evt.Time.After(after) { + t.Errorf("Time %v not in range [%v, %v]", evt.Time, before, after) + } +} + +func TestEmit_EmptyInclude_PassesAll(t *testing.T) { + dir := t.TempDir() + logFile := filepath.Join(dir, "audit.jsonl") + + cfg := config.AuditCfg{ + Enabled: true, + LogFile: logFile, + Include: []string{}, // empty = pass all + } + + w, err := New(cfg, nil, testLogger()) + if err != nil { + t.Fatalf("New: %v", err) + } + defer w.Close() + + w.Emit(Event{AgentID: "bot", EventType: EventCommandExec}) + w.Emit(Event{AgentID: "bot", EventType: EventLLMRequest}) + w.Emit(Event{AgentID: "bot", EventType: EventToolExec}) + + data, err := os.ReadFile(logFile) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(lines) != 3 { + t.Errorf("expected 3 lines (all passed), got %d", len(lines)) + } +} + +func TestNew_CreatesDirectory(t *testing.T) { + dir := t.TempDir() + logFile := filepath.Join(dir, "subdir", "nested", "audit.jsonl") + + cfg := config.AuditCfg{ + Enabled: true, + LogFile: logFile, + } + + w, err := New(cfg, nil, testLogger()) + if err != nil { + t.Fatalf("New: %v", err) + } + w.Close() + + if _, err := os.Stat(logFile); os.IsNotExist(err) { + t.Error("log file was not created") + } +} diff --git a/tools/registry.go b/tools/registry.go index 2c8761d..6a3a1bb 100644 --- a/tools/registry.go +++ b/tools/registry.go @@ -12,11 +12,16 @@ import ( "github.com/enmanuel/agents/shell/logger" ) +// AuditFunc is called after each tool execution for audit purposes. +// The registry does not depend on the audit package directly. +type AuditFunc func(toolName string, durationMS int64, err error) + // Registry holds available tools keyed by name. type Registry struct { tools map[string]Tool logger *slog.Logger rateLimiter *RateLimiter // nil when rate limiting is disabled + auditFn AuditFunc // nil when audit is disabled } // NewRegistry creates an empty registry. @@ -60,6 +65,12 @@ func (r *Registry) SetRateLimiter(rl *RateLimiter) { r.rateLimiter = rl } +// SetAuditFunc attaches an audit callback to the registry. +// When set, it is called after each tool execution. +func (r *Registry) SetAuditFunc(fn AuditFunc) { + r.auditFn = fn +} + // ExecuteForRoom is like Execute but checks the per-room rate limit first. // If the rate limit is exceeded, it returns an error result without executing. func (r *Registry) ExecuteForRoom(ctx context.Context, name, argsJSON, roomID string) Result { @@ -99,6 +110,11 @@ func (r *Registry) Execute(ctx context.Context, name string, argsJSON string) Re r.logger.Info("tool_exec_end", "tool", name, logger.FieldDurationMS, ms) } + // Audit callback + if r.auditFn != nil { + r.auditFn(name, ms, result.Err) + } + return result }