feat: integrar structured logging en todos los componentes del shell

Se propaga *slog.Logger a todos los componentes impuros del shell:
- shell/bus/ — logs de subscribe, send, reply, timeout, unsubscribe
- shell/effects/ — duración y resultado de cada action ejecutada
- shell/llm/ (anthropic, openai, factory) — request/response con tokens, duración, fallback
- shell/memory/sqlite — open, save, recall, close con detalles
- shell/ssh/ — inicio, fin, errores y duración de comandos SSH
- tools/registry — registro, ejecución y errores de herramientas

Se usa el paquete shell/logger para field names consistentes (FieldDurationMS, FieldTokensUsed, etc.).
Cada componente recibe el logger por inyección de dependencias, sin globals.
Las firmas de New/FromConfig se actualizan para aceptar *slog.Logger.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 21:53:31 +00:00
parent 6858a5f13e
commit 5697b92ab8
11 changed files with 175 additions and 30 deletions
+7 -6
View File
@@ -114,21 +114,22 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*
} }
// SSH executor // SSH executor
sshExec := ssh.NewExecutor(cfg.SSH) sshExec := ssh.NewExecutor(cfg.SSH, logger)
// LLM client // LLM client
primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary) llmLog := logger.With("component", "llm")
primaryLLM, err := shelllm.FromConfig(cfg.LLM.Primary, llmLog)
if err != nil { if err != nil {
return nil, fmt.Errorf("primary LLM: %w", err) return nil, fmt.Errorf("primary LLM: %w", err)
} }
var llmFunc coretypes.CompleteFunc = primaryLLM var llmFunc coretypes.CompleteFunc = primaryLLM
if cfg.LLM.Fallback.Provider != "" { if cfg.LLM.Fallback.Provider != "" {
fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback) fallbackLLM, err := shelllm.FromConfig(cfg.LLM.Fallback, llmLog)
if err != nil { if err != nil {
logger.Warn("fallback LLM config error", "err", err) logger.Warn("fallback LLM config error", "err", err)
} else { } else {
llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM) llmFunc = shelllm.WithFallback(primaryLLM, fallbackLLM, llmLog)
} }
} }
@@ -150,7 +151,7 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*
if dbPath == "" { if dbPath == "" {
dbPath = filepath.Join("agents", cfg.Agent.ID, "data", "memory.db") dbPath = filepath.Join("agents", cfg.Agent.ID, "data", "memory.db")
} }
store, err := shellmem.New(dbPath) store, err := shellmem.New(dbPath, logger)
if err != nil { if err != nil {
return nil, fmt.Errorf("memory store: %w", err) return nil, fmt.Errorf("memory store: %w", err)
} }
@@ -607,7 +608,7 @@ func buildToolRegistry(
roomCtx *tools.RoomContext, roomCtx *tools.RoomContext,
logger *slog.Logger, logger *slog.Logger,
) *tools.Registry { ) *tools.Registry {
reg := tools.NewRegistry() reg := tools.NewRegistry(logger)
if cfg.Tools.HTTP.Enabled { if cfg.Tools.HTTP.Enabled {
reg.Register(tools.NewHTTPGet(cfg.Tools.HTTP)) reg.Register(tools.NewHTTPGet(cfg.Tools.HTTP))
+1 -1
View File
@@ -87,7 +87,7 @@ func main() {
defer stop() defer stop()
// ── Shared bus for inter-agent communication ── // ── Shared bus for inter-agent communication ──
agentBus := bus.New() agentBus := bus.New(logger)
// ── Start special agents (orchestrator, etc.) BEFORE normal bots ── // ── Start special agents (orchestrator, etc.) BEFORE normal bots ──
orch, err := startOrchestrator(agentBus, logger) orch, err := startOrchestrator(agentBus, logger)
+17 -3
View File
@@ -4,6 +4,7 @@ package bus
import ( import (
"context" "context"
"fmt" "fmt"
"log/slog"
"sync" "sync"
"time" "time"
) )
@@ -30,15 +31,18 @@ type Bus struct {
mu sync.RWMutex mu sync.RWMutex
channels map[AgentID]chan AgentMessage channels map[AgentID]chan AgentMessage
replyMu sync.Mutex replyMu sync.Mutex
replyChs map[string]chan AgentMessage // taskID → one-shot reply channel replyChs map[string]chan AgentMessage // taskID → one-shot reply channel
logger *slog.Logger
} }
// New creates a new Bus. // New creates a new Bus.
func New() *Bus { func New(logger *slog.Logger) *Bus {
return &Bus{ return &Bus{
channels: make(map[AgentID]chan AgentMessage), channels: make(map[AgentID]chan AgentMessage),
replyChs: make(map[string]chan AgentMessage), replyChs: make(map[string]chan AgentMessage),
logger: logger.With("component", "bus"),
} }
} }
@@ -48,6 +52,7 @@ func (b *Bus) Subscribe(id AgentID) <-chan AgentMessage {
defer b.mu.Unlock() defer b.mu.Unlock()
ch := make(chan AgentMessage, 64) ch := make(chan AgentMessage, 64)
b.channels[id] = ch b.channels[id] = ch
b.logger.Info("bus_subscribe", "agent", id)
return ch return ch
} }
@@ -57,12 +62,15 @@ func (b *Bus) Send(msg AgentMessage) error {
ch, ok := b.channels[msg.To] ch, ok := b.channels[msg.To]
b.mu.RUnlock() b.mu.RUnlock()
if !ok { if !ok {
b.logger.Warn("bus_not_found", "to", msg.To, "from", msg.From, "kind", msg.Kind)
return fmt.Errorf("agent %q not registered on bus", msg.To) return fmt.Errorf("agent %q not registered on bus", msg.To)
} }
select { select {
case ch <- msg: case ch <- msg:
b.logger.Debug("bus_send", "from", msg.From, "to", msg.To, "kind", msg.Kind)
return nil return nil
default: default:
b.logger.Warn("bus_queue_full", "to", msg.To, "from", msg.From, "kind", msg.Kind)
return fmt.Errorf("agent %q message queue full", msg.To) return fmt.Errorf("agent %q message queue full", msg.To)
} }
} }
@@ -86,6 +94,8 @@ func (b *Bus) SendAndWait(ctx context.Context, msg AgentMessage, taskID string,
return AgentMessage{}, err return AgentMessage{}, err
} }
b.logger.Debug("bus_send_and_wait", "task", taskID, "to", msg.To, "timeout", timeout)
timer := time.NewTimer(timeout) timer := time.NewTimer(timeout)
defer timer.Stop() defer timer.Stop()
@@ -93,6 +103,7 @@ func (b *Bus) SendAndWait(ctx context.Context, msg AgentMessage, taskID string,
case reply := <-ch: case reply := <-ch:
return reply, nil return reply, nil
case <-timer.C: case <-timer.C:
b.logger.Warn("bus_timeout", "task", taskID, "to", msg.To, "timeout", timeout)
return AgentMessage{}, fmt.Errorf("task %s: delegation timeout after %s", taskID, timeout) return AgentMessage{}, fmt.Errorf("task %s: delegation timeout after %s", taskID, timeout)
case <-ctx.Done(): case <-ctx.Done():
return AgentMessage{}, ctx.Err() return AgentMessage{}, ctx.Err()
@@ -109,8 +120,10 @@ func (b *Bus) Reply(taskID string, msg AgentMessage) error {
if ok { if ok {
select { select {
case ch <- msg: case ch <- msg:
b.logger.Debug("bus_reply", "task", taskID, "from", msg.From)
return nil return nil
default: default:
b.logger.Warn("bus_reply_full", "task", taskID)
return fmt.Errorf("reply channel full for task %s", taskID) return fmt.Errorf("reply channel full for task %s", taskID)
} }
} }
@@ -125,5 +138,6 @@ func (b *Bus) Unsubscribe(id AgentID) {
if ch, ok := b.channels[id]; ok { if ch, ok := b.channels[id]; ok {
close(ch) close(ch)
delete(b.channels, id) delete(b.channels, id)
b.logger.Info("bus_unsubscribe", "agent", id)
} }
} }
+8 -1
View File
@@ -5,8 +5,10 @@ import (
"context" "context"
"fmt" "fmt"
"log/slog" "log/slog"
"time"
"github.com/enmanuel/agents/pkg/decision" "github.com/enmanuel/agents/pkg/decision"
"github.com/enmanuel/agents/shell/logger"
"github.com/enmanuel/agents/shell/ssh" "github.com/enmanuel/agents/shell/ssh"
) )
@@ -37,12 +39,17 @@ func NewRunner(matrix MatrixSender, ssh *ssh.Executor, logger *slog.Logger) *Run
// Execute runs each action sequentially and returns results. // Execute runs each action sequentially and returns results.
func (r *Runner) Execute(ctx context.Context, roomID string, actions []decision.Action) []Result { func (r *Runner) Execute(ctx context.Context, roomID string, actions []decision.Action) []Result {
r.logger.Debug("effects_batch", "room", roomID, "count", len(actions))
results := make([]Result, 0, len(actions)) results := make([]Result, 0, len(actions))
for _, a := range actions { for _, a := range actions {
start := time.Now()
res := r.executeOne(ctx, roomID, a) res := r.executeOne(ctx, roomID, a)
ms := time.Since(start).Milliseconds()
results = append(results, res) results = append(results, res)
if res.Err != nil { if res.Err != nil {
r.logger.Error("action failed", "kind", a.Kind, "err", res.Err) r.logger.Error("action_failed", logger.FieldAction, a.Kind, logger.FieldDurationMS, ms, "err", res.Err)
} else {
r.logger.Info("action_done", logger.FieldAction, a.Kind, logger.FieldDurationMS, ms)
} }
} }
return results return results
+35 -2
View File
@@ -7,17 +7,20 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log/slog"
"net/http" "net/http"
"os" "os"
"time"
coretypes "github.com/enmanuel/agents/pkg/llm" coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/shell/logger"
) )
const anthropicAPIBase = "https://api.anthropic.com/v1" const anthropicAPIBase = "https://api.anthropic.com/v1"
const anthropicVersion = "2023-06-01" const anthropicVersion = "2023-06-01"
// NewAnthropicComplete returns a CompleteFunc backed by the Anthropic API. // NewAnthropicComplete returns a CompleteFunc backed by the Anthropic API.
func NewAnthropicComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc { func NewAnthropicComplete(apiKeyEnv, baseURL string, log *slog.Logger) coretypes.CompleteFunc {
if baseURL == "" { if baseURL == "" {
baseURL = anthropicAPIBase baseURL = anthropicAPIBase
} }
@@ -28,6 +31,13 @@ func NewAnthropicComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc {
return coretypes.CompletionResponse{}, fmt.Errorf("env var %s is not set", apiKeyEnv) return coretypes.CompletionResponse{}, fmt.Errorf("env var %s is not set", apiKeyEnv)
} }
log.Info("llm_request",
"provider", "anthropic",
"model", req.Model,
"messages", len(req.Messages),
"tools", len(req.Tools),
)
body := toAnthropicRequest(req) body := toAnthropicRequest(req)
raw, err := json.Marshal(body) raw, err := json.Marshal(body)
if err != nil { if err != nil {
@@ -42,8 +52,11 @@ func NewAnthropicComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc {
httpReq.Header.Set("anthropic-version", anthropicVersion) httpReq.Header.Set("anthropic-version", anthropicVersion)
httpReq.Header.Set("content-type", "application/json") httpReq.Header.Set("content-type", "application/json")
start := time.Now()
resp, err := http.DefaultClient.Do(httpReq) resp, err := http.DefaultClient.Do(httpReq)
if err != nil { if err != nil {
ms := time.Since(start).Milliseconds()
log.Error("llm_error", "provider", "anthropic", logger.FieldDurationMS, ms, "err", err)
return coretypes.CompletionResponse{}, fmt.Errorf("anthropic request: %w", err) return coretypes.CompletionResponse{}, fmt.Errorf("anthropic request: %w", err)
} }
defer resp.Body.Close() defer resp.Body.Close()
@@ -52,11 +65,31 @@ func NewAnthropicComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc {
if err != nil { if err != nil {
return coretypes.CompletionResponse{}, fmt.Errorf("read response: %w", err) return coretypes.CompletionResponse{}, fmt.Errorf("read response: %w", err)
} }
ms := time.Since(start).Milliseconds()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
log.Error("llm_error", "provider", "anthropic", logger.FieldDurationMS, ms, "status", resp.StatusCode)
return coretypes.CompletionResponse{}, fmt.Errorf("anthropic error %d: %s", resp.StatusCode, respBytes) return coretypes.CompletionResponse{}, fmt.Errorf("anthropic error %d: %s", resp.StatusCode, respBytes)
} }
return fromAnthropicResponse(respBytes) result, err := fromAnthropicResponse(respBytes)
if err != nil {
log.Error("llm_error", "provider", "anthropic", logger.FieldDurationMS, ms, "err", err)
return result, err
}
log.Info("llm_response",
"provider", "anthropic",
"model", req.Model,
logger.FieldDurationMS, ms,
logger.FieldTokensUsed, result.Usage.TotalTokens,
"input_tokens", result.Usage.InputTokens,
"output_tokens", result.Usage.OutputTokens,
"tool_calls", len(result.ToolCalls),
"finish_reason", result.FinishReason,
)
return result, nil
} }
} }
+8 -5
View File
@@ -3,24 +3,26 @@ package llm
import ( import (
"context" "context"
"fmt" "fmt"
"log/slog"
"github.com/enmanuel/agents/internal/config" "github.com/enmanuel/agents/internal/config"
coretypes "github.com/enmanuel/agents/pkg/llm" coretypes "github.com/enmanuel/agents/pkg/llm"
) )
// FromConfig builds a CompleteFunc from an LLMProviderCfg. // FromConfig builds a CompleteFunc from an LLMProviderCfg.
func FromConfig(cfg config.LLMProviderCfg) (coretypes.CompleteFunc, error) { func FromConfig(cfg config.LLMProviderCfg, log *slog.Logger) (coretypes.CompleteFunc, error) {
log.Info("llm_provider_init", "provider", cfg.Provider, "model", cfg.Model)
switch cfg.Provider { switch cfg.Provider {
case "anthropic": case "anthropic":
return NewAnthropicComplete(cfg.APIKeyEnv, cfg.BaseURL), nil return NewAnthropicComplete(cfg.APIKeyEnv, cfg.BaseURL, log), nil
case "openai": case "openai":
return NewOpenAIComplete(cfg.APIKeyEnv, cfg.BaseURL), nil return NewOpenAIComplete(cfg.APIKeyEnv, cfg.BaseURL, log), nil
case "ollama": case "ollama":
base := cfg.BaseURL base := cfg.BaseURL
if base == "" { if base == "" {
base = "http://localhost:11434/v1" base = "http://localhost:11434/v1"
} }
return NewOpenAIComplete("OLLAMA_API_KEY", base), nil return NewOpenAIComplete("OLLAMA_API_KEY", base, log), nil
default: default:
return nil, fmt.Errorf("unknown LLM provider: %s", cfg.Provider) return nil, fmt.Errorf("unknown LLM provider: %s", cfg.Provider)
} }
@@ -28,10 +30,11 @@ func FromConfig(cfg config.LLMProviderCfg) (coretypes.CompleteFunc, error) {
// WithFallback wraps primary with a fallback CompleteFunc. // WithFallback wraps primary with a fallback CompleteFunc.
// If primary returns an error, fallback is tried. // If primary returns an error, fallback is tried.
func WithFallback(primary, fallback coretypes.CompleteFunc) coretypes.CompleteFunc { func WithFallback(primary, fallback coretypes.CompleteFunc, log *slog.Logger) coretypes.CompleteFunc {
return func(ctx context.Context, req coretypes.CompletionRequest) (coretypes.CompletionResponse, error) { return func(ctx context.Context, req coretypes.CompletionRequest) (coretypes.CompletionResponse, error) {
resp, err := primary(ctx, req) resp, err := primary(ctx, req)
if err != nil { if err != nil {
log.Warn("llm_fallback_triggered", "primary_err", err)
return fallback(ctx, req) return fallback(ctx, req)
} }
return resp, nil return resp, nil
+28 -1
View File
@@ -4,16 +4,19 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog"
"os" "os"
"time"
openai "github.com/sashabaranov/go-openai" openai "github.com/sashabaranov/go-openai"
coretypes "github.com/enmanuel/agents/pkg/llm" coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/shell/logger"
) )
// NewOpenAIComplete returns a CompleteFunc backed by the OpenAI-compatible API. // NewOpenAIComplete returns a CompleteFunc backed by the OpenAI-compatible API.
// Works with OpenAI, Ollama, vLLM, LMStudio — just change baseURL. // Works with OpenAI, Ollama, vLLM, LMStudio — just change baseURL.
func NewOpenAIComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc { func NewOpenAIComplete(apiKeyEnv, baseURL string, log *slog.Logger) coretypes.CompleteFunc {
return func(ctx context.Context, req coretypes.CompletionRequest) (coretypes.CompletionResponse, error) { return func(ctx context.Context, req coretypes.CompletionRequest) (coretypes.CompletionResponse, error) {
apiKey := os.Getenv(apiKeyEnv) apiKey := os.Getenv(apiKeyEnv)
if apiKey == "" { if apiKey == "" {
@@ -49,11 +52,24 @@ func NewOpenAIComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc {
openReq.Tools = toOpenAITools(req.Tools) openReq.Tools = toOpenAITools(req.Tools)
} }
log.Info("llm_request",
"provider", "openai",
"model", req.Model,
"messages", len(req.Messages),
"tools", len(req.Tools),
)
start := time.Now()
resp, err := client.CreateChatCompletion(ctx, openReq) resp, err := client.CreateChatCompletion(ctx, openReq)
if err != nil { if err != nil {
ms := time.Since(start).Milliseconds()
log.Error("llm_error", "provider", "openai", logger.FieldDurationMS, ms, "err", err)
return coretypes.CompletionResponse{}, fmt.Errorf("openai completion: %w", err) return coretypes.CompletionResponse{}, fmt.Errorf("openai completion: %w", err)
} }
ms := time.Since(start).Milliseconds()
if len(resp.Choices) == 0 { if len(resp.Choices) == 0 {
log.Error("llm_error", "provider", "openai", logger.FieldDurationMS, ms, "err", "empty choices")
return coretypes.CompletionResponse{}, fmt.Errorf("openai: empty choices") return coretypes.CompletionResponse{}, fmt.Errorf("openai: empty choices")
} }
@@ -67,6 +83,17 @@ func NewOpenAIComplete(apiKeyEnv, baseURL string) coretypes.CompleteFunc {
}) })
} }
log.Info("llm_response",
"provider", "openai",
"model", req.Model,
logger.FieldDurationMS, ms,
logger.FieldTokensUsed, resp.Usage.TotalTokens,
"input_tokens", resp.Usage.PromptTokens,
"output_tokens", resp.Usage.CompletionTokens,
"tool_calls", len(toolCalls),
"finish_reason", string(choice.FinishReason),
)
return coretypes.CompletionResponse{ return coretypes.CompletionResponse{
Content: choice.Message.Content, Content: choice.Message.Content,
ToolCalls: toolCalls, ToolCalls: toolCalls,
+19 -3
View File
@@ -5,6 +5,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"log/slog"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
@@ -38,11 +39,14 @@ CREATE INDEX IF NOT EXISTS idx_facts_subject ON facts(agent_id, subject);
// SQLiteStore implements memory.Store using SQLite. // SQLiteStore implements memory.Store using SQLite.
type SQLiteStore struct { type SQLiteStore struct {
db *sql.DB db *sql.DB
logger *slog.Logger
} }
// New opens (or creates) a SQLite database at dbPath and runs migrations. // New opens (or creates) a SQLite database at dbPath and runs migrations.
func New(dbPath string) (*SQLiteStore, error) { func New(dbPath string, logger *slog.Logger) (*SQLiteStore, error) {
log := logger.With("component", "memory", "db_path", dbPath)
log.Info("memory_open")
if err := os.MkdirAll(filepath.Dir(dbPath), 0o755); err != nil { if err := os.MkdirAll(filepath.Dir(dbPath), 0o755); err != nil {
return nil, fmt.Errorf("create memory db dir: %w", err) return nil, fmt.Errorf("create memory db dir: %w", err)
} }
@@ -54,15 +58,20 @@ func New(dbPath string) (*SQLiteStore, error) {
db.Close() db.Close()
return nil, fmt.Errorf("migrate memory db: %w", err) return nil, fmt.Errorf("migrate memory db: %w", err)
} }
return &SQLiteStore{db: db}, nil log.Info("memory_ready")
return &SQLiteStore{db: db, logger: log}, nil
} }
func (s *SQLiteStore) SaveFact(ctx context.Context, f memory.Fact) error { func (s *SQLiteStore) SaveFact(ctx context.Context, f memory.Fact) error {
s.logger.Debug("memory_save_fact", "subject", f.Subject, "key", f.Key)
_, err := s.db.ExecContext(ctx, _, err := s.db.ExecContext(ctx,
`INSERT OR REPLACE INTO facts (agent_id, subject, key, value, updated_at) `INSERT OR REPLACE INTO facts (agent_id, subject, key, value, updated_at)
VALUES (?, ?, ?, ?, ?)`, VALUES (?, ?, ?, ?, ?)`,
f.AgentID, f.Subject, f.Key, f.Value, time.Now().UTC(), f.AgentID, f.Subject, f.Key, f.Value, time.Now().UTC(),
) )
if err != nil {
s.logger.Error("memory_save_fact_error", "subject", f.Subject, "key", f.Key, "err", err)
}
return err return err
} }
@@ -95,6 +104,7 @@ func (s *SQLiteStore) RecallFacts(ctx context.Context, agentID, subject string,
} }
facts = append(facts, f) facts = append(facts, f)
} }
s.logger.Debug("memory_recall", "subject", subject, "count", len(facts))
return facts, rows.Err() return facts, rows.Err()
} }
@@ -114,11 +124,15 @@ func (s *SQLiteStore) DeleteFacts(ctx context.Context, agentID, subject string,
} }
func (s *SQLiteStore) SaveMessage(ctx context.Context, m memory.HistoryMessage) error { func (s *SQLiteStore) SaveMessage(ctx context.Context, m memory.HistoryMessage) error {
s.logger.Debug("memory_save_msg", "room", m.RoomID, "role", m.Role)
_, err := s.db.ExecContext(ctx, _, err := s.db.ExecContext(ctx,
`INSERT INTO messages (agent_id, room_id, role, content, created_at) `INSERT INTO messages (agent_id, room_id, role, content, created_at)
VALUES (?, ?, ?, ?, ?)`, VALUES (?, ?, ?, ?, ?)`,
m.AgentID, m.RoomID, string(m.Role), m.Content, time.Now().UTC(), m.AgentID, m.RoomID, string(m.Role), m.Content, time.Now().UTC(),
) )
if err != nil {
s.logger.Error("memory_save_msg_error", "room", m.RoomID, "err", err)
}
return err return err
} }
@@ -152,6 +166,7 @@ func (s *SQLiteStore) LoadMessages(ctx context.Context, agentID, roomID string,
for i, j := 0, len(msgs)-1; i < j; i, j = i+1, j-1 { for i, j := 0, len(msgs)-1; i < j; i, j = i+1, j-1 {
msgs[i], msgs[j] = msgs[j], msgs[i] msgs[i], msgs[j] = msgs[j], msgs[i]
} }
s.logger.Debug("memory_load_msgs", "room", roomID, "count", len(msgs))
return msgs, nil return msgs, nil
} }
@@ -171,5 +186,6 @@ func (s *SQLiteStore) DeleteMessages(ctx context.Context, agentID string, roomID
} }
func (s *SQLiteStore) Close() error { func (s *SQLiteStore) Close() error {
s.logger.Info("memory_closed")
return s.db.Close() return s.db.Close()
} }
+1 -1
View File
@@ -61,7 +61,7 @@ type Orchestrator struct {
// New creates an Orchestrator from its config. // New creates an Orchestrator from its config.
func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Orchestrator, error) { func New(cfg *config.SpecialConfig, agentBus *bus.Bus, logger *slog.Logger) (*Orchestrator, error) {
llmFunc, err := shelllm.FromConfig(cfg.LLM.Primary) llmFunc, err := shelllm.FromConfig(cfg.LLM.Primary, logger.With("component", "llm"))
if err != nil { if err != nil {
return nil, fmt.Errorf("orchestrator LLM: %w", err) return nil, fmt.Errorf("orchestrator LLM: %w", err)
} }
+26 -3
View File
@@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"log/slog"
"net" "net"
"os" "os"
"time" "time"
@@ -13,6 +14,7 @@ import (
"github.com/enmanuel/agents/internal/config" "github.com/enmanuel/agents/internal/config"
"github.com/enmanuel/agents/pkg/tools" "github.com/enmanuel/agents/pkg/tools"
"github.com/enmanuel/agents/shell/logger"
) )
// Result holds the output of an SSH command execution. // Result holds the output of an SSH command execution.
@@ -25,22 +27,32 @@ type Result struct {
// Executor runs SSH commands against configured targets. // Executor runs SSH commands against configured targets.
type Executor struct { type Executor struct {
cfg config.SSHCfg cfg config.SSHCfg
logger *slog.Logger
} }
// NewExecutor creates an Executor from the SSH config section. // NewExecutor creates an Executor from the SSH config section.
func NewExecutor(cfg config.SSHCfg) *Executor { func NewExecutor(cfg config.SSHCfg, log *slog.Logger) *Executor {
return &Executor{cfg: cfg} return &Executor{cfg: cfg, logger: log.With(logger.FieldComponent, "ssh")}
} }
// Execute runs the SSH command described by spec. Impure. // Execute runs the SSH command described by spec. Impure.
func (e *Executor) Execute(ctx context.Context, spec tools.SSHCommandSpec) Result { func (e *Executor) Execute(ctx context.Context, spec tools.SSHCommandSpec) Result {
cmdPreview := spec.Command
if len(cmdPreview) > 80 {
cmdPreview = cmdPreview[:80] + "..."
}
e.logger.Info("ssh_exec_start", "target", spec.Target, "command", cmdPreview)
start := time.Now()
target, ok := e.cfg.Targets[spec.Target] target, ok := e.cfg.Targets[spec.Target]
if !ok { if !ok {
e.logger.Error("ssh_exec_error", "target", spec.Target, "err", "unknown target")
return Result{Err: fmt.Errorf("unknown SSH target: %s", spec.Target)} return Result{Err: fmt.Errorf("unknown SSH target: %s", spec.Target)}
} }
if len(target.Hosts) == 0 { if len(target.Hosts) == 0 {
e.logger.Error("ssh_exec_error", "target", spec.Target, "err", "no hosts")
return Result{Err: fmt.Errorf("no hosts for target: %s", spec.Target)} return Result{Err: fmt.Errorf("no hosts for target: %s", spec.Target)}
} }
@@ -65,6 +77,8 @@ func (e *Executor) Execute(ctx context.Context, spec tools.SSHCommandSpec) Resul
signer, err := loadSigner(keyEnv) signer, err := loadSigner(keyEnv)
if err != nil { if err != nil {
ms := time.Since(start).Milliseconds()
e.logger.Error("ssh_exec_error", "target", spec.Target, logger.FieldDurationMS, ms, "err", err)
return Result{Err: fmt.Errorf("load SSH key: %w", err)} return Result{Err: fmt.Errorf("load SSH key: %w", err)}
} }
@@ -81,12 +95,16 @@ func (e *Executor) Execute(ctx context.Context, spec tools.SSHCommandSpec) Resul
addr := fmt.Sprintf("%s:%d", host, port) addr := fmt.Sprintf("%s:%d", host, port)
conn, err := gossh.Dial("tcp", addr, sshCfg) conn, err := gossh.Dial("tcp", addr, sshCfg)
if err != nil { if err != nil {
ms := time.Since(start).Milliseconds()
e.logger.Error("ssh_exec_error", "target", spec.Target, "host", addr, logger.FieldDurationMS, ms, "err", err)
return Result{Err: fmt.Errorf("ssh dial %s: %w", addr, err)} return Result{Err: fmt.Errorf("ssh dial %s: %w", addr, err)}
} }
defer conn.Close() defer conn.Close()
session, err := conn.NewSession() session, err := conn.NewSession()
if err != nil { if err != nil {
ms := time.Since(start).Milliseconds()
e.logger.Error("ssh_exec_error", "target", spec.Target, logger.FieldDurationMS, ms, "err", err)
return Result{Err: fmt.Errorf("ssh session: %w", err)} return Result{Err: fmt.Errorf("ssh session: %w", err)}
} }
defer session.Close() defer session.Close()
@@ -102,17 +120,22 @@ func (e *Executor) Execute(ctx context.Context, spec tools.SSHCommandSpec) Resul
select { select {
case <-ctx.Done(): case <-ctx.Done():
session.Signal(gossh.SIGTERM) session.Signal(gossh.SIGTERM)
ms := time.Since(start).Milliseconds()
e.logger.Warn("ssh_exec_cancelled", "target", spec.Target, logger.FieldDurationMS, ms)
return Result{Err: ctx.Err()} return Result{Err: ctx.Err()}
case err := <-done: case err := <-done:
ms := time.Since(start).Milliseconds()
code := 0 code := 0
if err != nil { if err != nil {
var exitErr *gossh.ExitError var exitErr *gossh.ExitError
if ok := asExitError(err, &exitErr); ok { if ok := asExitError(err, &exitErr); ok {
code = exitErr.ExitStatus() code = exitErr.ExitStatus()
} else { } else {
e.logger.Error("ssh_exec_error", "target", spec.Target, logger.FieldDurationMS, ms, "err", err)
return Result{Err: err} return Result{Err: err}
} }
} }
e.logger.Info("ssh_exec_end", "target", spec.Target, "exit_code", code, logger.FieldDurationMS, ms)
return Result{ return Result{
Stdout: stdout.String(), Stdout: stdout.String(),
Stderr: stderr.String(), Stderr: stderr.String(),
+25 -4
View File
@@ -4,24 +4,32 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog"
"sort" "sort"
"time"
coretypes "github.com/enmanuel/agents/pkg/llm" coretypes "github.com/enmanuel/agents/pkg/llm"
"github.com/enmanuel/agents/shell/logger"
) )
// Registry holds available tools keyed by name. // Registry holds available tools keyed by name.
type Registry struct { type Registry struct {
tools map[string]Tool tools map[string]Tool
logger *slog.Logger
} }
// NewRegistry creates an empty registry. // NewRegistry creates an empty registry.
func NewRegistry() *Registry { func NewRegistry(log *slog.Logger) *Registry {
return &Registry{tools: make(map[string]Tool)} return &Registry{
tools: make(map[string]Tool),
logger: log.With(logger.FieldComponent, "tools"),
}
} }
// Register adds a tool to the registry. // Register adds a tool to the registry.
func (r *Registry) Register(t Tool) { func (r *Registry) Register(t Tool) {
r.tools[t.Def.Name] = t r.tools[t.Def.Name] = t
r.logger.Debug("tool_registered", "name", t.Def.Name)
} }
// Get looks up a tool by name. // Get looks up a tool by name.
@@ -49,17 +57,30 @@ func (r *Registry) Len() int {
func (r *Registry) Execute(ctx context.Context, name string, argsJSON string) Result { func (r *Registry) Execute(ctx context.Context, name string, argsJSON string) Result {
t, ok := r.tools[name] t, ok := r.tools[name]
if !ok { if !ok {
r.logger.Warn("tool_not_found", "tool", name)
return Result{Err: fmt.Errorf("tool %q not found", name)} return Result{Err: fmt.Errorf("tool %q not found", name)}
} }
var args map[string]any var args map[string]any
if argsJSON != "" { if argsJSON != "" {
if err := json.Unmarshal([]byte(argsJSON), &args); err != nil { if err := json.Unmarshal([]byte(argsJSON), &args); err != nil {
r.logger.Warn("tool_args_invalid", "tool", name, "err", err)
return Result{Err: fmt.Errorf("parse args for %q: %w", name, err)} return Result{Err: fmt.Errorf("parse args for %q: %w", name, err)}
} }
} }
return t.Exec(ctx, args) r.logger.Info("tool_exec_start", "tool", name)
start := time.Now()
result := t.Exec(ctx, args)
ms := time.Since(start).Milliseconds()
if result.Err != nil {
r.logger.Warn("tool_exec_error", "tool", name, "err", result.Err, logger.FieldDurationMS, ms)
} else {
r.logger.Info("tool_exec_end", "tool", name, logger.FieldDurationMS, ms)
}
return result
} }
// ToLLMSpecs converts all registered tools to the LLM-compatible ToolSpec format. // ToLLMSpecs converts all registered tools to the LLM-compatible ToolSpec format.