diff --git a/agents/runtime.go b/agents/runtime.go index 19ab667..9eacd54 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -24,6 +24,7 @@ import ( "github.com/enmanuel/agents/pkg/personality" "github.com/enmanuel/agents/pkg/sanitize" "github.com/enmanuel/agents/shell/bus" + shellcron "github.com/enmanuel/agents/shell/cron" "github.com/enmanuel/agents/shell/effects" shellknowledge "github.com/enmanuel/agents/shell/knowledge" shelllm "github.com/enmanuel/agents/shell/llm" @@ -93,6 +94,9 @@ type Agent struct { // Bus — set via SetBus() when running under the unified launcher agentBus *bus.Bus + + // Scheduler — nil when no schedules are configured + scheduler *shellcron.Scheduler } // ClearWindow resets the conversation window for a room and deletes persisted @@ -308,6 +312,12 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (* toolReg.Register(toolmemory.NewMemoryClearContext(a, roomCtx)) } + // Cron scheduler — only when schedules are configured + if len(cfg.Schedules) > 0 { + a.scheduler = shellcron.New(cfg.Schedules, matrixClient, llmFunc, cfg.LLM.Primary.Model, logger) + logger.Info("cron scheduler configured", "schedules", len(cfg.Schedules)) + } + // Matrix event listener a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger) @@ -420,6 +430,11 @@ func (a *Agent) Run(ctx context.Context) error { a.logger.Info("bus listener started") } + // Start cron scheduler in background goroutine (blocks until ctx cancelled) + if a.scheduler != nil { + go a.scheduler.Start(ctx) + } + return a.listener.Run(ctx) } diff --git a/dev/issues/README.md b/dev/issues/README.md index 1684d6f..23bc6eb 100644 --- a/dev/issues/README.md +++ b/dev/issues/README.md @@ -9,7 +9,7 @@ afectados y notas de implementacion. | 2 | Memoria para los bots | [0002-bot-memory.md](completed/0002-bot-memory.md) | completado | | 3 | Interaccion entre bots | [0003-bot-interaction.md](completed/0003-bot-interaction.md) | completado | | 4 | Fotos de perfil | [0004-bot-avatar.md](completed/0004-bot-avatar.md) | completado | -| 5 | Cron scheduler | [0005-bot-cron.md](0005-bot-cron.md) | pendiente | +| 5 | Cron scheduler | [0005-bot-cron.md](completed/0005-bot-cron.md) | completado | | 6 | Anadir Claude provider | [0006-anadir-claude-p.md](completed/0006-añadir-claude-p.md) | completado | | 7 | Logs mejorados | [0007-logs-mejorados.md](completed/0007-logs-mejorados.md) | completado | | 8 | Knowledge por agente | [0008-knowledge_por_agente.md](completed/0008-knowledge_por_agente.md) | completado | diff --git a/dev/issues/0005-bot-cron.md b/dev/issues/completed/0005-bot-cron.md similarity index 100% rename from dev/issues/0005-bot-cron.md rename to dev/issues/completed/0005-bot-cron.md diff --git a/go.mod b/go.mod index 061ecf7..2ce3317 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rs/zerolog v1.33.0 // indirect github.com/spf13/cast v1.7.1 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index cbe556d..7778810 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/internal/config/schema.go b/internal/config/schema.go index b461d1c..41df2e5 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -340,6 +340,11 @@ type ScheduledAction struct { Target string `yaml:"target"` Command string `yaml:"command"` Script string `yaml:"script"` + + // Phase 1: send_message and llm_prompt fields + Message string `yaml:"message"` // inline text for send_message + Template string `yaml:"template"` // path to .md file for send_message + Prompt string `yaml:"prompt"` // inline prompt text for llm_prompt } type FailureAction struct { diff --git a/shell/cron/actions.go b/shell/cron/actions.go new file mode 100644 index 0000000..f62d288 --- /dev/null +++ b/shell/cron/actions.go @@ -0,0 +1,116 @@ +package cron + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" +) + +const actionKindSendMessage = "send_message" +const actionKindLLMPrompt = "llm_prompt" + +// handler is a function that fires when a schedule triggers. +type handler func(ctx context.Context, room string) + +// buildHandler returns the handler for a schedule, or nil for unsupported kinds. +func (s *Scheduler) buildHandler(sc config.ScheduleCfg) handler { + switch sc.Action.Kind { + case actionKindSendMessage: + return s.sendMessageHandler(sc) + case actionKindLLMPrompt: + return s.llmPromptHandler(sc) + default: + return nil + } +} + +// sendMessageHandler returns a handler that sends a static message to a Matrix room. +// The message content is resolved in priority order: Message > Template file. +func (s *Scheduler) sendMessageHandler(sc config.ScheduleCfg) handler { + return func(ctx context.Context, room string) { + content, err := resolveContent(sc.Action.Message, sc.Action.Template) + if err != nil { + s.logger.Error("send_message: failed to resolve content", + "name", sc.Name, "err", err) + return + } + if content == "" { + s.logger.Warn("send_message: empty content, skipping", "name", sc.Name) + return + } + + s.logger.Info("cron_fire", "name", sc.Name, "kind", actionKindSendMessage, "room", room) + if err := s.matrix.SendMarkdown(ctx, room, content); err != nil { + s.logger.Error("send_message: matrix send failed", + "name", sc.Name, "room", room, "err", err) + } + } +} + +// llmPromptHandler returns a handler that calls the LLM with a prompt and sends +// the response to a Matrix room. +func (s *Scheduler) llmPromptHandler(sc config.ScheduleCfg) handler { + return func(ctx context.Context, room string) { + if s.llm == nil { + s.logger.Warn("llm_prompt: no LLM configured, skipping", "name", sc.Name) + return + } + + prompt, err := resolveContent(sc.Action.Prompt, sc.Action.Template) + if err != nil { + s.logger.Error("llm_prompt: failed to resolve prompt", + "name", sc.Name, "err", err) + return + } + if prompt == "" { + s.logger.Warn("llm_prompt: empty prompt, skipping", "name", sc.Name) + return + } + + s.logger.Info("cron_fire", "name", sc.Name, "kind", actionKindLLMPrompt, "room", room) + + req := coretypes.CompletionRequest{ + Model: s.model, + Messages: []coretypes.Message{ + {Role: coretypes.RoleUser, Content: prompt}, + }, + } + + resp, err := s.llm(ctx, req) + if err != nil { + s.logger.Error("llm_prompt: LLM call failed", + "name", sc.Name, "err", err) + return + } + + content := strings.TrimSpace(resp.Content) + if content == "" { + s.logger.Warn("llm_prompt: LLM returned empty response", "name", sc.Name) + return + } + + if err := s.matrix.SendMarkdown(ctx, room, content); err != nil { + s.logger.Error("llm_prompt: matrix send failed", + "name", sc.Name, "room", room, "err", err) + } + } +} + +// resolveContent returns the inline text if non-empty, otherwise reads the file at templatePath. +func resolveContent(inline, templatePath string) (string, error) { + if inline != "" { + return inline, nil + } + if templatePath == "" { + return "", nil + } + data, err := os.ReadFile(templatePath) + if err != nil { + return "", fmt.Errorf("reading template %q: %w", templatePath, err) + } + return strings.TrimSpace(string(data)), nil +} diff --git a/shell/cron/scheduler.go b/shell/cron/scheduler.go new file mode 100644 index 0000000..d2ead76 --- /dev/null +++ b/shell/cron/scheduler.go @@ -0,0 +1,93 @@ +// Package cron provides a scheduler for autonomous bot activity. +// It is part of the impure shell: it reads files, calls LLMs, and sends Matrix messages. +package cron + +import ( + "context" + "log/slog" + + "github.com/robfig/cron/v3" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" +) + +// MatrixSender is the subset of matrix.Client needed by the scheduler. +type MatrixSender interface { + SendMarkdown(ctx context.Context, roomID, markdown string) error +} + +// Scheduler fires configured schedules and executes send_message or llm_prompt actions. +type Scheduler struct { + cfg []config.ScheduleCfg + matrix MatrixSender + llm coretypes.CompleteFunc // nil when agent has no LLM + model string + logger *slog.Logger + cron *cron.Cron +} + +// New creates a Scheduler. llm and model are optional (nil/empty for agents without LLM). +func New( + cfg []config.ScheduleCfg, + matrix MatrixSender, + llm coretypes.CompleteFunc, + model string, + logger *slog.Logger, +) *Scheduler { + return &Scheduler{ + cfg: cfg, + matrix: matrix, + llm: llm, + model: model, + logger: logger.With("component", "cron"), + cron: cron.New(), + } +} + +// Start registers all schedules and starts the cron loop. +// It returns when ctx is cancelled, stopping the cron runner. +func (s *Scheduler) Start(ctx context.Context) { + for _, sc := range s.cfg { + sc := sc // capture range var + if sc.Cron == "" || sc.Action.Kind == "" { + s.logger.Warn("skipping invalid schedule", "name", sc.Name, "cron", sc.Cron, "kind", sc.Action.Kind) + continue + } + + room := sc.OutputRoom + if room == "" { + s.logger.Warn("schedule has no output_room, skipping", "name", sc.Name) + continue + } + + handler := s.buildHandler(sc) + if handler == nil { + s.logger.Warn("unsupported action kind, skipping", "name", sc.Name, "kind", sc.Action.Kind) + continue + } + + _, err := s.cron.AddFunc(sc.Cron, func() { + handler(ctx, room) + }) + if err != nil { + s.logger.Error("failed to register schedule", + "name", sc.Name, + "cron", sc.Cron, + "err", err, + ) + continue + } + + s.logger.Info("schedule registered", "name", sc.Name, "cron", sc.Cron, "kind", sc.Action.Kind, "room", room) + } + + s.cron.Start() + s.logger.Info("cron scheduler started", "schedules", len(s.cfg)) + + <-ctx.Done() + s.logger.Info("cron scheduler stopping") + cronCtx := s.cron.Stop() + <-cronCtx.Done() + s.logger.Info("cron scheduler stopped") +} diff --git a/shell/cron/scheduler_test.go b/shell/cron/scheduler_test.go new file mode 100644 index 0000000..12cb5ae --- /dev/null +++ b/shell/cron/scheduler_test.go @@ -0,0 +1,271 @@ +package cron_test + +import ( + "context" + "errors" + "log/slog" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" + shellcron "github.com/enmanuel/agents/shell/cron" +) + +// ── fakes ────────────────────────────────────────────────────────────────── + +type fakeSender struct { + calls atomic.Int32 + lastMD string + lastRM string +} + +func (f *fakeSender) SendMarkdown(_ context.Context, room, md string) error { + f.calls.Add(1) + f.lastRM = room + f.lastMD = md + return nil +} + +type errSender struct{} + +func (e *errSender) SendMarkdown(_ context.Context, _, _ string) error { + return errors.New("matrix unavailable") +} + +func fakeLLM(reply string) coretypes.CompleteFunc { + return func(_ context.Context, _ coretypes.CompletionRequest) (coretypes.CompletionResponse, error) { + return coretypes.CompletionResponse{Content: reply}, nil + } +} + +func newTestLogger(t *testing.T) *slog.Logger { + t.Helper() + return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +// ── helpers ──────────────────────────────────────────────────────────────── + +// waitCalls blocks until the sender has received at least n calls or the deadline passes. +func waitCalls(t *testing.T, f *fakeSender, n int32) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if f.calls.Load() >= n { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("expected %d call(s) to SendMarkdown, got %d", n, f.calls.Load()) +} + +// ── tests ────────────────────────────────────────────────────────────────── + +func TestScheduler_SendMessage_Inline(t *testing.T) { + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-inline", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Message: "hola mundo", + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastRM != "!room:server.com" { + t.Errorf("unexpected room: %s", sender.lastRM) + } + if sender.lastMD != "hola mundo" { + t.Errorf("unexpected message: %s", sender.lastMD) + } +} + +func TestScheduler_SendMessage_Template(t *testing.T) { + // Write a temporary template file + dir := t.TempDir() + tmpl := filepath.Join(dir, "greeting.md") + if err := os.WriteFile(tmpl, []byte("buenos días"), 0o600); err != nil { + t.Fatal(err) + } + + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-template", + Cron: "@every 100ms", + OutputRoom: "!room2:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Template: tmpl, + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastMD != "buenos días" { + t.Errorf("unexpected message: %q", sender.lastMD) + } +} + +func TestScheduler_LLMPrompt(t *testing.T) { + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-llm", + Cron: "@every 100ms", + OutputRoom: "!room3:server.com", + Action: config.ScheduledAction{ + Kind: "llm_prompt", + Prompt: "resume el día", + }, + }, + } + + llm := fakeLLM("resumen generado por LLM") + s := shellcron.New(cfg, sender, llm, "gpt-4o", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastMD != "resumen generado por LLM" { + t.Errorf("unexpected LLM reply: %q", sender.lastMD) + } +} + +func TestScheduler_LLMPrompt_NoLLM(t *testing.T) { + // When no LLM is configured, llm_prompt should be skipped gracefully (no panic). + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "no-llm", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "llm_prompt", + Prompt: "hello", + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + // Wait a bit to confirm nothing is sent + time.Sleep(350 * time.Millisecond) + cancel() + <-done + + if sender.calls.Load() != 0 { + t.Errorf("expected 0 calls without LLM, got %d", sender.calls.Load()) + } +} + +func TestScheduler_SkipsInvalidSchedule(t *testing.T) { + // Schedules without output_room or without action kind must be skipped silently. + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "no-room", + Cron: "@every 100ms", + // missing OutputRoom + Action: config.ScheduledAction{Kind: "send_message", Message: "hi"}, + }, + { + Name: "no-kind", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + // missing Action.Kind + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + time.Sleep(350 * time.Millisecond) + cancel() + <-done + + if sender.calls.Load() != 0 { + t.Errorf("expected 0 calls for invalid schedules, got %d", sender.calls.Load()) + } +} + +func TestScheduler_MatrixSendError(t *testing.T) { + // If matrix.SendMarkdown returns an error, the scheduler should log it and not panic. + cfg := []config.ScheduleCfg{ + { + Name: "err-send", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Message: "trigger error", + }, + }, + } + + s := shellcron.New(cfg, &errSender{}, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + // Let it fire at least once without panicking + time.Sleep(250 * time.Millisecond) + cancel() + <-done +}