From 4dfc6cf0b9201a9740cb4c22f965ae8cc0342f5e Mon Sep 17 00:00:00 2001 From: Enmanuel Date: Sun, 8 Mar 2026 19:00:32 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20implementar=20shell/cron=20=E2=80=94=20?= =?UTF-8?q?scheduler=20aut=C3=B3nomo=20para=20bots?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Nuevo paquete shell/cron con dos archivos: shell/cron/scheduler.go — Scheduler struct con método Start(ctx) que: - Registra todas las entradas de config.ScheduleCfg como jobs de robfig/cron - Omite schedules sin output_room o sin action.kind (warn en log) - Bloquea hasta que ctx sea cancelado, luego detiene el cron limpiamente - Recibe MatrixSender, CompleteFunc y *slog.Logger como dependencias (sin importar agents/) shell/cron/actions.go — ejecutores para fase 1: - send_message: resuelve contenido desde Message (inline) o Template (archivo .md), luego llama a matrix.SendMarkdown - llm_prompt: resuelve prompt desde Prompt o Template, llama al LLM y envía la respuesta al room configurado; no-op silencioso si no hay LLM resolveContent() prioriza texto inline sobre ruta de archivo, lo que permite tanto mensajes cortos en YAML como prompts largos en archivos .md separados. Fase 2 (run_tool) y fase 3 (inter-bot) quedan pendientes según el issue. --- shell/cron/actions.go | 116 +++++++++++++++ shell/cron/scheduler.go | 93 ++++++++++++ shell/cron/scheduler_test.go | 271 +++++++++++++++++++++++++++++++++++ 3 files changed, 480 insertions(+) create mode 100644 shell/cron/actions.go create mode 100644 shell/cron/scheduler.go create mode 100644 shell/cron/scheduler_test.go diff --git a/shell/cron/actions.go b/shell/cron/actions.go new file mode 100644 index 0000000..f62d288 --- /dev/null +++ b/shell/cron/actions.go @@ -0,0 +1,116 @@ +package cron + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" +) + +const actionKindSendMessage = "send_message" +const actionKindLLMPrompt = "llm_prompt" + +// handler is a function that fires when a schedule triggers. +type handler func(ctx context.Context, room string) + +// buildHandler returns the handler for a schedule, or nil for unsupported kinds. +func (s *Scheduler) buildHandler(sc config.ScheduleCfg) handler { + switch sc.Action.Kind { + case actionKindSendMessage: + return s.sendMessageHandler(sc) + case actionKindLLMPrompt: + return s.llmPromptHandler(sc) + default: + return nil + } +} + +// sendMessageHandler returns a handler that sends a static message to a Matrix room. +// The message content is resolved in priority order: Message > Template file. +func (s *Scheduler) sendMessageHandler(sc config.ScheduleCfg) handler { + return func(ctx context.Context, room string) { + content, err := resolveContent(sc.Action.Message, sc.Action.Template) + if err != nil { + s.logger.Error("send_message: failed to resolve content", + "name", sc.Name, "err", err) + return + } + if content == "" { + s.logger.Warn("send_message: empty content, skipping", "name", sc.Name) + return + } + + s.logger.Info("cron_fire", "name", sc.Name, "kind", actionKindSendMessage, "room", room) + if err := s.matrix.SendMarkdown(ctx, room, content); err != nil { + s.logger.Error("send_message: matrix send failed", + "name", sc.Name, "room", room, "err", err) + } + } +} + +// llmPromptHandler returns a handler that calls the LLM with a prompt and sends +// the response to a Matrix room. +func (s *Scheduler) llmPromptHandler(sc config.ScheduleCfg) handler { + return func(ctx context.Context, room string) { + if s.llm == nil { + s.logger.Warn("llm_prompt: no LLM configured, skipping", "name", sc.Name) + return + } + + prompt, err := resolveContent(sc.Action.Prompt, sc.Action.Template) + if err != nil { + s.logger.Error("llm_prompt: failed to resolve prompt", + "name", sc.Name, "err", err) + return + } + if prompt == "" { + s.logger.Warn("llm_prompt: empty prompt, skipping", "name", sc.Name) + return + } + + s.logger.Info("cron_fire", "name", sc.Name, "kind", actionKindLLMPrompt, "room", room) + + req := coretypes.CompletionRequest{ + Model: s.model, + Messages: []coretypes.Message{ + {Role: coretypes.RoleUser, Content: prompt}, + }, + } + + resp, err := s.llm(ctx, req) + if err != nil { + s.logger.Error("llm_prompt: LLM call failed", + "name", sc.Name, "err", err) + return + } + + content := strings.TrimSpace(resp.Content) + if content == "" { + s.logger.Warn("llm_prompt: LLM returned empty response", "name", sc.Name) + return + } + + if err := s.matrix.SendMarkdown(ctx, room, content); err != nil { + s.logger.Error("llm_prompt: matrix send failed", + "name", sc.Name, "room", room, "err", err) + } + } +} + +// resolveContent returns the inline text if non-empty, otherwise reads the file at templatePath. +func resolveContent(inline, templatePath string) (string, error) { + if inline != "" { + return inline, nil + } + if templatePath == "" { + return "", nil + } + data, err := os.ReadFile(templatePath) + if err != nil { + return "", fmt.Errorf("reading template %q: %w", templatePath, err) + } + return strings.TrimSpace(string(data)), nil +} diff --git a/shell/cron/scheduler.go b/shell/cron/scheduler.go new file mode 100644 index 0000000..d2ead76 --- /dev/null +++ b/shell/cron/scheduler.go @@ -0,0 +1,93 @@ +// Package cron provides a scheduler for autonomous bot activity. +// It is part of the impure shell: it reads files, calls LLMs, and sends Matrix messages. +package cron + +import ( + "context" + "log/slog" + + "github.com/robfig/cron/v3" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" +) + +// MatrixSender is the subset of matrix.Client needed by the scheduler. +type MatrixSender interface { + SendMarkdown(ctx context.Context, roomID, markdown string) error +} + +// Scheduler fires configured schedules and executes send_message or llm_prompt actions. +type Scheduler struct { + cfg []config.ScheduleCfg + matrix MatrixSender + llm coretypes.CompleteFunc // nil when agent has no LLM + model string + logger *slog.Logger + cron *cron.Cron +} + +// New creates a Scheduler. llm and model are optional (nil/empty for agents without LLM). +func New( + cfg []config.ScheduleCfg, + matrix MatrixSender, + llm coretypes.CompleteFunc, + model string, + logger *slog.Logger, +) *Scheduler { + return &Scheduler{ + cfg: cfg, + matrix: matrix, + llm: llm, + model: model, + logger: logger.With("component", "cron"), + cron: cron.New(), + } +} + +// Start registers all schedules and starts the cron loop. +// It returns when ctx is cancelled, stopping the cron runner. +func (s *Scheduler) Start(ctx context.Context) { + for _, sc := range s.cfg { + sc := sc // capture range var + if sc.Cron == "" || sc.Action.Kind == "" { + s.logger.Warn("skipping invalid schedule", "name", sc.Name, "cron", sc.Cron, "kind", sc.Action.Kind) + continue + } + + room := sc.OutputRoom + if room == "" { + s.logger.Warn("schedule has no output_room, skipping", "name", sc.Name) + continue + } + + handler := s.buildHandler(sc) + if handler == nil { + s.logger.Warn("unsupported action kind, skipping", "name", sc.Name, "kind", sc.Action.Kind) + continue + } + + _, err := s.cron.AddFunc(sc.Cron, func() { + handler(ctx, room) + }) + if err != nil { + s.logger.Error("failed to register schedule", + "name", sc.Name, + "cron", sc.Cron, + "err", err, + ) + continue + } + + s.logger.Info("schedule registered", "name", sc.Name, "cron", sc.Cron, "kind", sc.Action.Kind, "room", room) + } + + s.cron.Start() + s.logger.Info("cron scheduler started", "schedules", len(s.cfg)) + + <-ctx.Done() + s.logger.Info("cron scheduler stopping") + cronCtx := s.cron.Stop() + <-cronCtx.Done() + s.logger.Info("cron scheduler stopped") +} diff --git a/shell/cron/scheduler_test.go b/shell/cron/scheduler_test.go new file mode 100644 index 0000000..12cb5ae --- /dev/null +++ b/shell/cron/scheduler_test.go @@ -0,0 +1,271 @@ +package cron_test + +import ( + "context" + "errors" + "log/slog" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/enmanuel/agents/internal/config" + coretypes "github.com/enmanuel/agents/pkg/llm" + shellcron "github.com/enmanuel/agents/shell/cron" +) + +// ── fakes ────────────────────────────────────────────────────────────────── + +type fakeSender struct { + calls atomic.Int32 + lastMD string + lastRM string +} + +func (f *fakeSender) SendMarkdown(_ context.Context, room, md string) error { + f.calls.Add(1) + f.lastRM = room + f.lastMD = md + return nil +} + +type errSender struct{} + +func (e *errSender) SendMarkdown(_ context.Context, _, _ string) error { + return errors.New("matrix unavailable") +} + +func fakeLLM(reply string) coretypes.CompleteFunc { + return func(_ context.Context, _ coretypes.CompletionRequest) (coretypes.CompletionResponse, error) { + return coretypes.CompletionResponse{Content: reply}, nil + } +} + +func newTestLogger(t *testing.T) *slog.Logger { + t.Helper() + return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +// ── helpers ──────────────────────────────────────────────────────────────── + +// waitCalls blocks until the sender has received at least n calls or the deadline passes. +func waitCalls(t *testing.T, f *fakeSender, n int32) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if f.calls.Load() >= n { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("expected %d call(s) to SendMarkdown, got %d", n, f.calls.Load()) +} + +// ── tests ────────────────────────────────────────────────────────────────── + +func TestScheduler_SendMessage_Inline(t *testing.T) { + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-inline", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Message: "hola mundo", + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastRM != "!room:server.com" { + t.Errorf("unexpected room: %s", sender.lastRM) + } + if sender.lastMD != "hola mundo" { + t.Errorf("unexpected message: %s", sender.lastMD) + } +} + +func TestScheduler_SendMessage_Template(t *testing.T) { + // Write a temporary template file + dir := t.TempDir() + tmpl := filepath.Join(dir, "greeting.md") + if err := os.WriteFile(tmpl, []byte("buenos días"), 0o600); err != nil { + t.Fatal(err) + } + + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-template", + Cron: "@every 100ms", + OutputRoom: "!room2:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Template: tmpl, + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastMD != "buenos días" { + t.Errorf("unexpected message: %q", sender.lastMD) + } +} + +func TestScheduler_LLMPrompt(t *testing.T) { + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "test-llm", + Cron: "@every 100ms", + OutputRoom: "!room3:server.com", + Action: config.ScheduledAction{ + Kind: "llm_prompt", + Prompt: "resume el día", + }, + }, + } + + llm := fakeLLM("resumen generado por LLM") + s := shellcron.New(cfg, sender, llm, "gpt-4o", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + waitCalls(t, sender, 1) + cancel() + <-done + + if sender.lastMD != "resumen generado por LLM" { + t.Errorf("unexpected LLM reply: %q", sender.lastMD) + } +} + +func TestScheduler_LLMPrompt_NoLLM(t *testing.T) { + // When no LLM is configured, llm_prompt should be skipped gracefully (no panic). + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "no-llm", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "llm_prompt", + Prompt: "hello", + }, + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + // Wait a bit to confirm nothing is sent + time.Sleep(350 * time.Millisecond) + cancel() + <-done + + if sender.calls.Load() != 0 { + t.Errorf("expected 0 calls without LLM, got %d", sender.calls.Load()) + } +} + +func TestScheduler_SkipsInvalidSchedule(t *testing.T) { + // Schedules without output_room or without action kind must be skipped silently. + sender := &fakeSender{} + cfg := []config.ScheduleCfg{ + { + Name: "no-room", + Cron: "@every 100ms", + // missing OutputRoom + Action: config.ScheduledAction{Kind: "send_message", Message: "hi"}, + }, + { + Name: "no-kind", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + // missing Action.Kind + }, + } + + s := shellcron.New(cfg, sender, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + time.Sleep(350 * time.Millisecond) + cancel() + <-done + + if sender.calls.Load() != 0 { + t.Errorf("expected 0 calls for invalid schedules, got %d", sender.calls.Load()) + } +} + +func TestScheduler_MatrixSendError(t *testing.T) { + // If matrix.SendMarkdown returns an error, the scheduler should log it and not panic. + cfg := []config.ScheduleCfg{ + { + Name: "err-send", + Cron: "@every 100ms", + OutputRoom: "!room:server.com", + Action: config.ScheduledAction{ + Kind: "send_message", + Message: "trigger error", + }, + }, + } + + s := shellcron.New(cfg, &errSender{}, nil, "", newTestLogger(t)) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + s.Start(ctx) + }() + + // Let it fire at least once without panicking + time.Sleep(250 * time.Millisecond) + cancel() + <-done +}