feat: implementar shell/cron — scheduler autónomo para bots

Nuevo paquete shell/cron con dos archivos:

shell/cron/scheduler.go — Scheduler struct con método Start(ctx) que:
  - Registra todas las entradas de config.ScheduleCfg como jobs de robfig/cron
  - Omite schedules sin output_room o sin action.kind (warn en log)
  - Bloquea hasta que ctx sea cancelado, luego detiene el cron limpiamente
  - Recibe MatrixSender, CompleteFunc y *slog.Logger como dependencias (sin importar agents/)

shell/cron/actions.go — ejecutores para fase 1:
  - send_message: resuelve contenido desde Message (inline) o Template (archivo .md),
    luego llama a matrix.SendMarkdown
  - llm_prompt: resuelve prompt desde Prompt o Template, llama al LLM y envía
    la respuesta al room configurado; no-op silencioso si no hay LLM

resolveContent() prioriza texto inline sobre ruta de archivo, lo que permite
tanto mensajes cortos en YAML como prompts largos en archivos .md separados.

Fase 2 (run_tool) y fase 3 (inter-bot) quedan pendientes según el issue.
This commit is contained in:
2026-03-08 19:00:32 +00:00
parent 46d85109fe
commit 4dfc6cf0b9
3 changed files with 480 additions and 0 deletions
+116
View File
@@ -0,0 +1,116 @@
package cron
import (
"context"
"fmt"
"os"
"strings"
"github.com/enmanuel/agents/internal/config"
coretypes "github.com/enmanuel/agents/pkg/llm"
)
const actionKindSendMessage = "send_message"
const actionKindLLMPrompt = "llm_prompt"
// handler is a function that fires when a schedule triggers.
type handler func(ctx context.Context, room string)
// buildHandler returns the handler for a schedule, or nil for unsupported kinds.
func (s *Scheduler) buildHandler(sc config.ScheduleCfg) handler {
switch sc.Action.Kind {
case actionKindSendMessage:
return s.sendMessageHandler(sc)
case actionKindLLMPrompt:
return s.llmPromptHandler(sc)
default:
return nil
}
}
// sendMessageHandler returns a handler that sends a static message to a Matrix room.
// The message content is resolved in priority order: Message > Template file.
func (s *Scheduler) sendMessageHandler(sc config.ScheduleCfg) handler {
return func(ctx context.Context, room string) {
content, err := resolveContent(sc.Action.Message, sc.Action.Template)
if err != nil {
s.logger.Error("send_message: failed to resolve content",
"name", sc.Name, "err", err)
return
}
if content == "" {
s.logger.Warn("send_message: empty content, skipping", "name", sc.Name)
return
}
s.logger.Info("cron_fire", "name", sc.Name, "kind", actionKindSendMessage, "room", room)
if err := s.matrix.SendMarkdown(ctx, room, content); err != nil {
s.logger.Error("send_message: matrix send failed",
"name", sc.Name, "room", room, "err", err)
}
}
}
// llmPromptHandler returns a handler that calls the LLM with a prompt and sends
// the response to a Matrix room.
func (s *Scheduler) llmPromptHandler(sc config.ScheduleCfg) handler {
return func(ctx context.Context, room string) {
if s.llm == nil {
s.logger.Warn("llm_prompt: no LLM configured, skipping", "name", sc.Name)
return
}
prompt, err := resolveContent(sc.Action.Prompt, sc.Action.Template)
if err != nil {
s.logger.Error("llm_prompt: failed to resolve prompt",
"name", sc.Name, "err", err)
return
}
if prompt == "" {
s.logger.Warn("llm_prompt: empty prompt, skipping", "name", sc.Name)
return
}
s.logger.Info("cron_fire", "name", sc.Name, "kind", actionKindLLMPrompt, "room", room)
req := coretypes.CompletionRequest{
Model: s.model,
Messages: []coretypes.Message{
{Role: coretypes.RoleUser, Content: prompt},
},
}
resp, err := s.llm(ctx, req)
if err != nil {
s.logger.Error("llm_prompt: LLM call failed",
"name", sc.Name, "err", err)
return
}
content := strings.TrimSpace(resp.Content)
if content == "" {
s.logger.Warn("llm_prompt: LLM returned empty response", "name", sc.Name)
return
}
if err := s.matrix.SendMarkdown(ctx, room, content); err != nil {
s.logger.Error("llm_prompt: matrix send failed",
"name", sc.Name, "room", room, "err", err)
}
}
}
// resolveContent returns the inline text if non-empty, otherwise reads the file at templatePath.
func resolveContent(inline, templatePath string) (string, error) {
if inline != "" {
return inline, nil
}
if templatePath == "" {
return "", nil
}
data, err := os.ReadFile(templatePath)
if err != nil {
return "", fmt.Errorf("reading template %q: %w", templatePath, err)
}
return strings.TrimSpace(string(data)), nil
}
+93
View File
@@ -0,0 +1,93 @@
// Package cron provides a scheduler for autonomous bot activity.
// It is part of the impure shell: it reads files, calls LLMs, and sends Matrix messages.
package cron
import (
"context"
"log/slog"
"github.com/robfig/cron/v3"
"github.com/enmanuel/agents/internal/config"
coretypes "github.com/enmanuel/agents/pkg/llm"
)
// MatrixSender is the subset of matrix.Client needed by the scheduler.
type MatrixSender interface {
SendMarkdown(ctx context.Context, roomID, markdown string) error
}
// Scheduler fires configured schedules and executes send_message or llm_prompt actions.
type Scheduler struct {
cfg []config.ScheduleCfg
matrix MatrixSender
llm coretypes.CompleteFunc // nil when agent has no LLM
model string
logger *slog.Logger
cron *cron.Cron
}
// New creates a Scheduler. llm and model are optional (nil/empty for agents without LLM).
func New(
cfg []config.ScheduleCfg,
matrix MatrixSender,
llm coretypes.CompleteFunc,
model string,
logger *slog.Logger,
) *Scheduler {
return &Scheduler{
cfg: cfg,
matrix: matrix,
llm: llm,
model: model,
logger: logger.With("component", "cron"),
cron: cron.New(),
}
}
// Start registers all schedules and starts the cron loop.
// It returns when ctx is cancelled, stopping the cron runner.
func (s *Scheduler) Start(ctx context.Context) {
for _, sc := range s.cfg {
sc := sc // capture range var
if sc.Cron == "" || sc.Action.Kind == "" {
s.logger.Warn("skipping invalid schedule", "name", sc.Name, "cron", sc.Cron, "kind", sc.Action.Kind)
continue
}
room := sc.OutputRoom
if room == "" {
s.logger.Warn("schedule has no output_room, skipping", "name", sc.Name)
continue
}
handler := s.buildHandler(sc)
if handler == nil {
s.logger.Warn("unsupported action kind, skipping", "name", sc.Name, "kind", sc.Action.Kind)
continue
}
_, err := s.cron.AddFunc(sc.Cron, func() {
handler(ctx, room)
})
if err != nil {
s.logger.Error("failed to register schedule",
"name", sc.Name,
"cron", sc.Cron,
"err", err,
)
continue
}
s.logger.Info("schedule registered", "name", sc.Name, "cron", sc.Cron, "kind", sc.Action.Kind, "room", room)
}
s.cron.Start()
s.logger.Info("cron scheduler started", "schedules", len(s.cfg))
<-ctx.Done()
s.logger.Info("cron scheduler stopping")
cronCtx := s.cron.Stop()
<-cronCtx.Done()
s.logger.Info("cron scheduler stopped")
}
+271
View File
@@ -0,0 +1,271 @@
package cron_test
import (
"context"
"errors"
"log/slog"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"
"github.com/enmanuel/agents/internal/config"
coretypes "github.com/enmanuel/agents/pkg/llm"
shellcron "github.com/enmanuel/agents/shell/cron"
)
// ── fakes ──────────────────────────────────────────────────────────────────
type fakeSender struct {
calls atomic.Int32
lastMD string
lastRM string
}
func (f *fakeSender) SendMarkdown(_ context.Context, room, md string) error {
f.calls.Add(1)
f.lastRM = room
f.lastMD = md
return nil
}
type errSender struct{}
func (e *errSender) SendMarkdown(_ context.Context, _, _ string) error {
return errors.New("matrix unavailable")
}
func fakeLLM(reply string) coretypes.CompleteFunc {
return func(_ context.Context, _ coretypes.CompletionRequest) (coretypes.CompletionResponse, error) {
return coretypes.CompletionResponse{Content: reply}, nil
}
}
func newTestLogger(t *testing.T) *slog.Logger {
t.Helper()
return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
}
// ── helpers ────────────────────────────────────────────────────────────────
// waitCalls blocks until the sender has received at least n calls or the deadline passes.
func waitCalls(t *testing.T, f *fakeSender, n int32) {
t.Helper()
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
if f.calls.Load() >= n {
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("expected %d call(s) to SendMarkdown, got %d", n, f.calls.Load())
}
// ── tests ──────────────────────────────────────────────────────────────────
func TestScheduler_SendMessage_Inline(t *testing.T) {
sender := &fakeSender{}
cfg := []config.ScheduleCfg{
{
Name: "test-inline",
Cron: "@every 100ms",
OutputRoom: "!room:server.com",
Action: config.ScheduledAction{
Kind: "send_message",
Message: "hola mundo",
},
},
}
s := shellcron.New(cfg, sender, nil, "", newTestLogger(t))
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
s.Start(ctx)
}()
waitCalls(t, sender, 1)
cancel()
<-done
if sender.lastRM != "!room:server.com" {
t.Errorf("unexpected room: %s", sender.lastRM)
}
if sender.lastMD != "hola mundo" {
t.Errorf("unexpected message: %s", sender.lastMD)
}
}
func TestScheduler_SendMessage_Template(t *testing.T) {
// Write a temporary template file
dir := t.TempDir()
tmpl := filepath.Join(dir, "greeting.md")
if err := os.WriteFile(tmpl, []byte("buenos días"), 0o600); err != nil {
t.Fatal(err)
}
sender := &fakeSender{}
cfg := []config.ScheduleCfg{
{
Name: "test-template",
Cron: "@every 100ms",
OutputRoom: "!room2:server.com",
Action: config.ScheduledAction{
Kind: "send_message",
Template: tmpl,
},
},
}
s := shellcron.New(cfg, sender, nil, "", newTestLogger(t))
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
s.Start(ctx)
}()
waitCalls(t, sender, 1)
cancel()
<-done
if sender.lastMD != "buenos días" {
t.Errorf("unexpected message: %q", sender.lastMD)
}
}
func TestScheduler_LLMPrompt(t *testing.T) {
sender := &fakeSender{}
cfg := []config.ScheduleCfg{
{
Name: "test-llm",
Cron: "@every 100ms",
OutputRoom: "!room3:server.com",
Action: config.ScheduledAction{
Kind: "llm_prompt",
Prompt: "resume el día",
},
},
}
llm := fakeLLM("resumen generado por LLM")
s := shellcron.New(cfg, sender, llm, "gpt-4o", newTestLogger(t))
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
s.Start(ctx)
}()
waitCalls(t, sender, 1)
cancel()
<-done
if sender.lastMD != "resumen generado por LLM" {
t.Errorf("unexpected LLM reply: %q", sender.lastMD)
}
}
func TestScheduler_LLMPrompt_NoLLM(t *testing.T) {
// When no LLM is configured, llm_prompt should be skipped gracefully (no panic).
sender := &fakeSender{}
cfg := []config.ScheduleCfg{
{
Name: "no-llm",
Cron: "@every 100ms",
OutputRoom: "!room:server.com",
Action: config.ScheduledAction{
Kind: "llm_prompt",
Prompt: "hello",
},
},
}
s := shellcron.New(cfg, sender, nil, "", newTestLogger(t))
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
s.Start(ctx)
}()
// Wait a bit to confirm nothing is sent
time.Sleep(350 * time.Millisecond)
cancel()
<-done
if sender.calls.Load() != 0 {
t.Errorf("expected 0 calls without LLM, got %d", sender.calls.Load())
}
}
func TestScheduler_SkipsInvalidSchedule(t *testing.T) {
// Schedules without output_room or without action kind must be skipped silently.
sender := &fakeSender{}
cfg := []config.ScheduleCfg{
{
Name: "no-room",
Cron: "@every 100ms",
// missing OutputRoom
Action: config.ScheduledAction{Kind: "send_message", Message: "hi"},
},
{
Name: "no-kind",
Cron: "@every 100ms",
OutputRoom: "!room:server.com",
// missing Action.Kind
},
}
s := shellcron.New(cfg, sender, nil, "", newTestLogger(t))
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
s.Start(ctx)
}()
time.Sleep(350 * time.Millisecond)
cancel()
<-done
if sender.calls.Load() != 0 {
t.Errorf("expected 0 calls for invalid schedules, got %d", sender.calls.Load())
}
}
func TestScheduler_MatrixSendError(t *testing.T) {
// If matrix.SendMarkdown returns an error, the scheduler should log it and not panic.
cfg := []config.ScheduleCfg{
{
Name: "err-send",
Cron: "@every 100ms",
OutputRoom: "!room:server.com",
Action: config.ScheduledAction{
Kind: "send_message",
Message: "trigger error",
},
},
}
s := shellcron.New(cfg, &errSender{}, nil, "", newTestLogger(t))
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
s.Start(ctx)
}()
// Let it fire at least once without panicking
time.Sleep(250 * time.Millisecond)
cancel()
<-done
}