From 46d85109fe111d8e13482912d34bc1672d2f1a23 Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 19:00:18 +0000 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20a=C3=B1adir=20campo=20Message/Templ?= =?UTF-8?q?ate/Prompt=20a=20ScheduledAction=20y=20dependencia=20cron?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Se añaden tres campos nuevos a ScheduledAction en internal/config/schema.go: - Message: texto inline para send_message - Template: ruta a archivo .md para send_message o llm_prompt - Prompt: texto inline del prompt para llm_prompt Se agrega github.com/robfig/cron/v3 v3.0.1 como dependencia. No hay cambios de ruptura: los campos son opcionales y el schema existente sigue siendo compatible. --- go.mod | 1 + go.sum | 2 ++ internal/config/schema.go | 5 +++++ 3 files changed, 8 insertions(+) diff --git a/go.mod b/go.mod index 061ecf7..2ce3317 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rs/zerolog v1.33.0 // indirect github.com/spf13/cast v1.7.1 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index cbe556d..7778810 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/internal/config/schema.go b/internal/config/schema.go index b461d1c..41df2e5 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -340,6 +340,11 @@ type ScheduledAction struct { Target string `yaml:"target"` Command string `yaml:"command"` Script string `yaml:"script"` + + // Phase 1: send_message and llm_prompt fields + Message string `yaml:"message"` // inline text for send_message + Template string `yaml:"template"` // path to .md file for send_message + Prompt string `yaml:"prompt"` // inline prompt text for llm_prompt } type FailureAction struct { From 4dfc6cf0b9201a9740cb4c22f965ae8cc0342f5e Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 19:00:32 +0000 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20implementar=20shell/cron=20?= =?UTF-8?q?=E2=80=94=20scheduler=20aut=C3=B3nomo=20para=20bots?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Nuevo paquete shell/cron con dos archivos: shell/cron/scheduler.go — Scheduler struct con método Start(ctx) que: - Registra todas las entradas de config.ScheduleCfg como jobs de robfig/cron - Omite schedules sin output_room o sin action.kind (warn en log) - Bloquea hasta que ctx sea cancelado, luego detiene el cron limpiamente - Recibe MatrixSender, CompleteFunc y *slog.Logger como dependencias (sin importar agents/) shell/cron/actions.go — ejecutores para fase 1: - send_message: resuelve contenido desde Message (inline) o Template (archivo .md), luego llama a matrix.SendMarkdown - llm_prompt: resuelve prompt desde Prompt o Template, llama al LLM y envía la respuesta al room configurado; no-op silencioso si no hay LLM resolveContent() prioriza texto inline sobre ruta de archivo, lo que permite tanto mensajes cortos en YAML como prompts largos en archivos .md separados. Fase 2 (run_tool) y fase 3 (inter-bot) quedan pendientes según el issue. --- shell/cron/actions.go | 116 +++++++++++++++ shell/cron/scheduler.go | 93 ++++++++++++ shell/cron/scheduler_test.go | 271 +++++++++++++++++++++++++++++++++++ 3 files changed, 480 insertions(+) create mode 100644 shell/cron/actions.go create mode 100644 shell/cron/scheduler.go create mode 100644 shell/cron/scheduler_test.go diff --git a/shell/cron/actions.go b/shell/cron/actions.go new file mode 100644 index 0000000..f62d288 --- /dev/null +++ b/shell/cron/actions.go @@ -0,0 +1,116 @@ +package cron + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" +) + +const actionKindSendMessage = "send_message" +const actionKindLLMPrompt = "llm_prompt" + +// handler is a function that fires when a schedule triggers. +type handler func(ctx context.Context, room string) + +// buildHandler returns the handler for a schedule, or nil for unsupported kinds. +func (s *Scheduler) buildHandler(sc config.ScheduleCfg) handler { + switch sc.Action.Kind { + case actionKindSendMessage: + return s.sendMessageHandler(sc) + case actionKindLLMPrompt: + return s.llmPromptHandler(sc) + default: + return nil + } +} + +// sendMessageHandler returns a handler that sends a static message to a Matrix room. +// The message content is resolved in priority order: Message > Template file. +func (s *Scheduler) sendMessageHandler(sc config.ScheduleCfg) handler { + return func(ctx context.Context, room string) { + content, err := resolveContent(sc.Action.Message, sc.Action.Template) + if err != nil { + s.logger.Error("send_message: failed to resolve content", + "name", sc.Name, "err", err) + return + } + if content == "" { + s.logger.Warn("send_message: empty content, skipping", "name", sc.Name) + return + } + + s.logger.Info("cron_fire", "name", sc.Name, "kind", actionKindSendMessage, "room", room) + if err := s.matrix.SendMarkdown(ctx, room, content); err != nil { + s.logger.Error("send_message: matrix send failed", + "name", sc.Name, "room", room, "err", err) + } + } +} + +// llmPromptHandler returns a handler that calls the LLM with a prompt and sends +// the response to a Matrix room. +func (s *Scheduler) llmPromptHandler(sc config.ScheduleCfg) handler { + return func(ctx context.Context, room string) { + if s.llm == nil { + s.logger.Warn("llm_prompt: no LLM configured, skipping", "name", sc.Name) + return + } + + prompt, err := resolveContent(sc.Action.Prompt, sc.Action.Template) + if err != nil { + s.logger.Error("llm_prompt: failed to resolve prompt", + "name", sc.Name, "err", err) + return + } + if prompt == "" { + s.logger.Warn("llm_prompt: empty prompt, skipping", "name", sc.Name) + return + } + + s.logger.Info("cron_fire", "name", sc.Name, "kind", actionKindLLMPrompt, "room", room) + + req := coretypes.CompletionRequest{ + Model: s.model, + Messages: []coretypes.Message{ + {Role: coretypes.RoleUser, Content: prompt}, + }, + } + + resp, err := s.llm(ctx, req) + if err != nil { + s.logger.Error("llm_prompt: LLM call failed", + "name", sc.Name, "err", err) + return + } + + content := strings.TrimSpace(resp.Content) + if content == "" { + s.logger.Warn("llm_prompt: LLM returned empty response", "name", sc.Name) + return + } + + if err := s.matrix.SendMarkdown(ctx, room, content); err != nil { + s.logger.Error("llm_prompt: matrix send failed", + "name", sc.Name, "room", room, "err", err) + } + } +} + +// resolveContent returns the inline text if non-empty, otherwise reads the file at templatePath. +func resolveContent(inline, templatePath string) (string, error) { + if inline != "" { + return inline, nil + } + if templatePath == "" { + return "", nil + } + data, err := os.ReadFile(templatePath) + if err != nil { + return "", fmt.Errorf("reading template %q: %w", templatePath, err) + } + return strings.TrimSpace(string(data)), nil +} diff --git a/shell/cron/scheduler.go b/shell/cron/scheduler.go new file mode 100644 index 0000000..d2ead76 --- /dev/null +++ b/shell/cron/scheduler.go @@ -0,0 +1,93 @@ +// Package cron provides a scheduler for autonomous bot activity. +// It is part of the impure shell: it reads files, calls LLMs, and sends Matrix messages. +package cron + +import ( + "context" + "log/slog" + + "github.com/robfig/cron/v3" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" +) + +// MatrixSender is the subset of matrix.Client needed by the scheduler. +type MatrixSender interface { + SendMarkdown(ctx context.Context, roomID, markdown string) error +} + +// Scheduler fires configured schedules and executes send_message or llm_prompt actions. +type Scheduler struct { + cfg []config.ScheduleCfg + matrix MatrixSender + llm coretypes.CompleteFunc // nil when agent has no LLM + model string + logger *slog.Logger + cron *cron.Cron +} + +// New creates a Scheduler. llm and model are optional (nil/empty for agents without LLM). +func New( + cfg []config.ScheduleCfg, + matrix MatrixSender, + llm coretypes.CompleteFunc, + model string, + logger *slog.Logger, +) *Scheduler { + return &Scheduler{ + cfg: cfg, + matrix: matrix, + llm: llm, + model: model, + logger: logger.With("component", "cron"), + cron: cron.New(), + } +} + +// Start registers all schedules and starts the cron loop. +// It returns when ctx is cancelled, stopping the cron runner. +func (s *Scheduler) Start(ctx context.Context) { + for _, sc := range s.cfg { + sc := sc // capture range var + if sc.Cron == "" || sc.Action.Kind == "" { + s.logger.Warn("skipping invalid schedule", "name", sc.Name, "cron", sc.Cron, "kind", sc.Action.Kind) + continue + } + + room := sc.OutputRoom + if room == "" { + s.logger.Warn("schedule has no output_room, skipping", "name", sc.Name) + continue + } + + handler := s.buildHandler(sc) + if handler == nil { + s.logger.Warn("unsupported action kind, skipping", "name", sc.Name, "kind", sc.Action.Kind) + continue + } + + _, err := s.cron.AddFunc(sc.Cron, func() { + handler(ctx, room) + }) + if err != nil { + s.logger.Error("failed to register schedule", + "name", sc.Name, + "cron", sc.Cron, + "err", err, + ) + continue + } + + s.logger.Info("schedule registered", "name", sc.Name, "cron", sc.Cron, "kind", sc.Action.Kind, "room", room) + } + + s.cron.Start() + s.logger.Info("cron scheduler started", "schedules", len(s.cfg)) + + <-ctx.Done() + s.logger.Info("cron scheduler stopping") + cronCtx := s.cron.Stop() + <-cronCtx.Done() + s.logger.Info("cron scheduler stopped") +} diff --git a/shell/cron/scheduler_test.go b/shell/cron/scheduler_test.go new file mode 100644 index 0000000..12cb5ae --- /dev/null +++ b/shell/cron/scheduler_test.go @@ -0,0 +1,271 @@ +package cron_test + +import ( + "context" + "errors" + "log/slog" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" + shellcron "github.com/enmanuel/agents/shell/cron" +) + +// ── fakes ────────────────────────────────────────────────────────────────── + +type fakeSender struct { + calls atomic.Int32 + lastMD string + lastRM string +} + +func (f *fakeSender) SendMarkdown(_ context.Context, room, md string) error { + f.calls.Add(1) + f.lastRM = room + f.lastMD = md + return nil +} + +type errSender struct{} + +func (e *errSender) SendMarkdown(_ context.Context, _, _ string) error { + return errors.New("matrix unavailable") +} + +func fakeLLM(reply string) coretypes.CompleteFunc { + return func(_ context.Context, _ coretypes.CompletionRequest) (coretypes.CompletionResponse, error) { + return coretypes.CompletionResponse{Content: reply}, nil + } +} + +func newTestLogger(t *testing.T) *slog.Logger { + t.Helper() + return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +// ── helpers ──────────────────────────────────────────────────────────────── + +// waitCalls blocks until the sender has received at least n calls or the deadline passes. +func waitCalls(t *testing.T, f *fakeSender, n int32) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if f.calls.Load() >= n { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("expected %d call(s) to SendMarkdown, got %d", n, f.calls.Load()) +} + +// ── tests ────────────────────────────────────────────────────────────────── + +func TestScheduler_SendMessage_Inline(t *testing.T) { + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-inline", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Message: "hola mundo", + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastRM != "!room:server.com" { + t.Errorf("unexpected room: %s", sender.lastRM) + } + if sender.lastMD != "hola mundo" { + t.Errorf("unexpected message: %s", sender.lastMD) + } +} + +func TestScheduler_SendMessage_Template(t *testing.T) { + // Write a temporary template file + dir := t.TempDir() + tmpl := filepath.Join(dir, "greeting.md") + if err := os.WriteFile(tmpl, []byte("buenos días"), 0o600); err != nil { + t.Fatal(err) + } + + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-template", + Cron: "@every 100ms", + OutputRoom: "!room2:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Template: tmpl, + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastMD != "buenos días" { + t.Errorf("unexpected message: %q", sender.lastMD) + } +} + +func TestScheduler_LLMPrompt(t *testing.T) { + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-llm", + Cron: "@every 100ms", + OutputRoom: "!room3:server.com", + Action: config.ScheduledAction{ + Kind: "llm_prompt", + Prompt: "resume el día", + }, + }, + } + + llm := fakeLLM("resumen generado por LLM") + s := shellcron.New(cfg, sender, llm, "gpt-4o", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastMD != "resumen generado por LLM" { + t.Errorf("unexpected LLM reply: %q", sender.lastMD) + } +} + +func TestScheduler_LLMPrompt_NoLLM(t *testing.T) { + // When no LLM is configured, llm_prompt should be skipped gracefully (no panic). + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "no-llm", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "llm_prompt", + Prompt: "hello", + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + // Wait a bit to confirm nothing is sent + time.Sleep(350 * time.Millisecond) + cancel() + <-done + + if sender.calls.Load() != 0 { + t.Errorf("expected 0 calls without LLM, got %d", sender.calls.Load()) + } +} + +func TestScheduler_SkipsInvalidSchedule(t *testing.T) { + // Schedules without output_room or without action kind must be skipped silently. + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "no-room", + Cron: "@every 100ms", + // missing OutputRoom + Action: config.ScheduledAction{Kind: "send_message", Message: "hi"}, + }, + { + Name: "no-kind", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + // missing Action.Kind + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + time.Sleep(350 * time.Millisecond) + cancel() + <-done + + if sender.calls.Load() != 0 { + t.Errorf("expected 0 calls for invalid schedules, got %d", sender.calls.Load()) + } +} + +func TestScheduler_MatrixSendError(t *testing.T) { + // If matrix.SendMarkdown returns an error, the scheduler should log it and not panic. + cfg := []config.ScheduleCfg{ + { + Name: "err-send", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Message: "trigger error", + }, + }, + } + + s := shellcron.New(cfg, &errSender{}, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + // Let it fire at least once without panicking + time.Sleep(250 * time.Millisecond) + cancel() + <-done +} From 9d1ab2d28e5a4b5634f3db306c7f84276552744d Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 19:00:38 +0000 Subject: [PATCH 3/4] feat: integrar Scheduler en agents/runtime.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Se instancia shellcron.Scheduler en agents.New() cuando cfg.Schedules tiene entradas (scheduler queda nil en agentes sin schedules, sin overhead). En agents.Run() se arranca el scheduler en una goroutine independiente que termina cuando el ctx del agente es cancelado — el shutdown es limpio gracias a cron.Stop() que devuelve un contexto que se espera. La integración no rompe agentes existentes: el campo scheduler es nil por defecto y todo el código nuevo está tras if a.scheduler != nil. --- agents/runtime.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/agents/runtime.go b/agents/runtime.go index 19ab667..9eacd54 100644 --- a/agents/runtime.go +++ b/agents/runtime.go @@ -24,6 +24,7 @@ import ( "github.com/enmanuel/agents/pkg/personality" "github.com/enmanuel/agents/pkg/sanitize" "github.com/enmanuel/agents/shell/bus" + shellcron "github.com/enmanuel/agents/shell/cron" "github.com/enmanuel/agents/shell/effects" shellknowledge "github.com/enmanuel/agents/shell/knowledge" shelllm "github.com/enmanuel/agents/shell/llm" @@ -93,6 +94,9 @@ type Agent struct { // Bus — set via SetBus() when running under the unified launcher agentBus *bus.Bus + + // Scheduler — nil when no schedules are configured + scheduler *shellcron.Scheduler } // ClearWindow resets the conversation window for a room and deletes persisted @@ -308,6 +312,12 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (* toolReg.Register(toolmemory.NewMemoryClearContext(a, roomCtx)) } + // Cron scheduler — only when schedules are configured + if len(cfg.Schedules) > 0 { + a.scheduler = shellcron.New(cfg.Schedules, matrixClient, llmFunc, cfg.LLM.Primary.Model, logger) + logger.Info("cron scheduler configured", "schedules", len(cfg.Schedules)) + } + // Matrix event listener a.listener = matrix.NewListener(matrixClient, cfg.Matrix, a.handleEvent, logger) @@ -420,6 +430,11 @@ func (a *Agent) Run(ctx context.Context) error { a.logger.Info("bus listener started") } + // Start cron scheduler in background goroutine (blocks until ctx cancelled) + if a.scheduler != nil { + go a.scheduler.Start(ctx) + } + return a.listener.Run(ctx) } From 769f648778f4a21f52a677b99927bc42ca0b35e5 Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 19:00:41 +0000 Subject: [PATCH 4/4] chore: cerrar issue 0005-bot-cron MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Se mueve el issue a dev/issues/completed/ y se actualiza el índice en README.md. El estado cambia de 'pendiente' a 'completado'. --- dev/issues/README.md | 2 +- dev/issues/{ => completed}/0005-bot-cron.md | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename dev/issues/{ => completed}/0005-bot-cron.md (100%) diff --git a/dev/issues/README.md b/dev/issues/README.md index 1684d6f..23bc6eb 100644 --- a/dev/issues/README.md +++ b/dev/issues/README.md @@ -9,7 +9,7 @@ afectados y notas de implementacion. | 2 | Memoria para los bots | [0002-bot-memory.md](completed/0002-bot-memory.md) | completado | | 3 | Interaccion entre bots | [0003-bot-interaction.md](completed/0003-bot-interaction.md) | completado | | 4 | Fotos de perfil | [0004-bot-avatar.md](completed/0004-bot-avatar.md) | completado | -| 5 | Cron scheduler | [0005-bot-cron.md](0005-bot-cron.md) | pendiente | +| 5 | Cron scheduler | [0005-bot-cron.md](completed/0005-bot-cron.md) | completado | | 6 | Anadir Claude provider | [0006-anadir-claude-p.md](completed/0006-añadir-claude-p.md) | completado | | 7 | Logs mejorados | [0007-logs-mejorados.md](completed/0007-logs-mejorados.md) | completado | | 8 | Knowledge por agente | [0008-knowledge_por_agente.md](completed/0008-knowledge_por_agente.md) | completado | diff --git a/dev/issues/0005-bot-cron.md b/dev/issues/completed/0005-bot-cron.md similarity index 100% rename from dev/issues/0005-bot-cron.md rename to dev/issues/completed/0005-bot-cron.md