Files
fn_registry/functions/infra/job_queue_test.go
T
egutierrez b5a867ca5a feat: cola de jobs asincrona basada en SQLite (issue 0013)
Implementa el subsistema completo de background jobs para apps Go en el
dominio infra. 9 funciones + 3 tipos + 17 tests, todos pasando.

- Tipos: Job (product), JobQueue (product), JobStatus (sum) con
  JobHandler, EnqueueOption y WorkerOption usando functional options pattern
- job_queue_create: CREATE TABLE + indices + WAL mode
- job_enqueue: INSERT con UUID (github.com/google/uuid), WithPriority/WithScheduledAt/WithMaxAttempts
- job_dequeue: SELECT+UPDATE atomico en transaccion exclusiva, filtro por jobTypes
- job_complete / job_fail: transiciones de estado; fail → dead cuando attempts >= max_attempts
- job_status_summary: pura, formatea conteo de jobs por estado
- job_worker: poll loop bloqueante, context-cancelable, graceful shutdown
- job_worker_pool: N workers con golang.org/x/sync/errgroup
- job_cleanup: DELETE jobs terminales mas viejos que olderThan

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 02:00:44 +02:00

419 lines
10 KiB
Go

package infra
import (
"context"
"database/sql"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
_ "github.com/mattn/go-sqlite3"
)
// openTestDB returns a handle to a fresh in-memory SQLite database and
// registers a cleanup that closes it when the calling test finishes.
//
// The pool is capped at a single connection so that every goroutine goes
// through the same connection; an in-memory SQLite database is otherwise
// private to each connection.
func openTestDB(t *testing.T) *sql.DB {
	t.Helper()

	const dsn = ":memory:?_journal_mode=WAL"

	handle, openErr := sql.Open("sqlite3", dsn)
	if openErr != nil {
		t.Fatalf("open test db: %v", openErr)
	}
	handle.SetMaxOpenConns(1)
	t.Cleanup(func() { handle.Close() })
	return handle
}
// TestJobQueueCreate checks JobQueueCreate for three inputs: an explicit table
// name, an empty table name (the test expects "jobs" to be used), and a nil
// database handle (the test expects an error).
func TestJobQueueCreate(t *testing.T) {
	const wantTable = "jobs"

	t.Run("test_queue_create", func(t *testing.T) {
		conn := openTestDB(t)
		queue, createErr := JobQueueCreate(conn, wantTable)
		switch {
		case createErr != nil:
			t.Fatalf("JobQueueCreate: %v", createErr)
		case queue == nil:
			t.Fatal("expected non-nil JobQueue")
		}
		if got := queue.TableName; got != wantTable {
			t.Errorf("TableName = %q, want %q", got, wantTable)
		}

		// A direct INSERT can only succeed if the table really exists.
		stamp := time.Now().UTC().Format(time.RFC3339)
		if _, insertErr := conn.Exec(`INSERT INTO jobs (id, type, scheduled_at) VALUES ('x', 'test', ?)`, stamp); insertErr != nil {
			t.Fatalf("table not created: %v", insertErr)
		}
	})

	t.Run("test_queue_create_default_table_name", func(t *testing.T) {
		queue, createErr := JobQueueCreate(openTestDB(t), "")
		if createErr != nil {
			t.Fatalf("JobQueueCreate with empty name: %v", createErr)
		}
		if got := queue.TableName; got != wantTable {
			t.Errorf("TableName = %q, want %q", got, wantTable)
		}
	})

	t.Run("test_queue_create_nil_db", func(t *testing.T) {
		if _, createErr := JobQueueCreate(nil, wantTable); createErr == nil {
			t.Fatal("expected error for nil db")
		}
	})
}
// TestJobEnqueueDequeue exercises the enqueue → dequeue path: a basic round
// trip, dequeuing from an empty queue, priority ordering, filtering by job
// type, and jobs scheduled in the future.
//
// Every setup call is checked so that a failure in queue creation or enqueue
// is reported at its source instead of surfacing later as a nil-pointer panic
// or a misleading assertion failure.
func TestJobEnqueueDequeue(t *testing.T) {
	t.Run("enqueue_dequeue_atomicidad", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		id, err := JobEnqueue(q, "send_email", `{"to":"a@b.com"}`)
		if err != nil {
			t.Fatalf("enqueue: %v", err)
		}
		if id == "" {
			t.Fatal("expected non-empty job ID")
		}
		job, err := JobDequeue(q, nil)
		if err != nil {
			t.Fatalf("dequeue: %v", err)
		}
		if job == nil {
			t.Fatal("expected a job, got nil")
		}
		if job.ID != id {
			t.Errorf("job ID = %q, want %q", job.ID, id)
		}
		if job.Type != "send_email" {
			t.Errorf("job Type = %q, want %q", job.Type, "send_email")
		}
		if job.Status != JobStatusRunning {
			t.Errorf("job Status = %q, want %q", job.Status, JobStatusRunning)
		}
		// Second dequeue should return nil (job is now running, not pending).
		job2, err := JobDequeue(q, nil)
		if err != nil {
			t.Fatalf("second dequeue: %v", err)
		}
		if job2 != nil {
			t.Errorf("expected nil second dequeue, got job %q", job2.ID)
		}
	})

	t.Run("dequeue_empty_queue_returns_nil", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		job, err := JobDequeue(q, nil)
		if err != nil {
			t.Fatalf("dequeue empty: %v", err)
		}
		if job != nil {
			t.Errorf("expected nil, got job %q", job.ID)
		}
	})

	t.Run("dequeue_priority_order", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		// The highest priority is enqueued second, so neither first-in nor
		// last-in ordering can satisfy the assertion by accident.
		if _, err := JobEnqueue(q, "low", "{}", WithPriority(0)); err != nil {
			t.Fatalf("enqueue low: %v", err)
		}
		if _, err := JobEnqueue(q, "high", "{}", WithPriority(10)); err != nil {
			t.Fatalf("enqueue high: %v", err)
		}
		if _, err := JobEnqueue(q, "mid", "{}", WithPriority(5)); err != nil {
			t.Fatalf("enqueue mid: %v", err)
		}
		job, err := JobDequeue(q, nil)
		if err != nil {
			t.Fatalf("dequeue: %v", err)
		}
		if job == nil || job.Type != "high" {
			t.Errorf("expected high-priority job first, got %v", job)
		}
	})

	t.Run("dequeue_jobtype_filter", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		if _, err := JobEnqueue(q, "email", "{}"); err != nil {
			t.Fatalf("enqueue email: %v", err)
		}
		if _, err := JobEnqueue(q, "sms", "{}"); err != nil {
			t.Fatalf("enqueue sms: %v", err)
		}
		// Only dequeue "sms" jobs.
		job, err := JobDequeue(q, []string{"sms"})
		if err != nil {
			t.Fatalf("dequeue with filter: %v", err)
		}
		if job == nil || job.Type != "sms" {
			t.Errorf("expected sms job, got %v", job)
		}
	})

	t.Run("dequeue_scheduled_in_future_waits", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		future := time.Now().Add(1 * time.Hour)
		if _, err := JobEnqueue(q, "future", "{}", WithScheduledAt(future)); err != nil {
			t.Fatalf("enqueue future: %v", err)
		}
		job, err := JobDequeue(q, nil)
		if err != nil {
			t.Fatalf("dequeue future: %v", err)
		}
		if job != nil {
			t.Errorf("expected nil (future job not yet due), got job %q", job.ID)
		}
	})
}
// TestJobCompleteAndFail verifies the state transitions driven by JobComplete
// and JobFail by reading the status (and attempts) columns straight from the
// jobs table.
//
// Setup calls, direct SQL statements and row scans are all checked: an ignored
// Scan error would leave status empty and report a wrong-status failure that
// hides the real cause.
func TestJobCompleteAndFail(t *testing.T) {
	t.Run("complete_fail_transitions", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		id, err := JobEnqueue(q, "work", "{}", WithMaxAttempts(3))
		if err != nil {
			t.Fatalf("enqueue: %v", err)
		}
		job, err := JobDequeue(q, nil)
		if err != nil {
			t.Fatalf("dequeue: %v", err)
		}
		if job == nil {
			t.Fatal("expected job")
		}
		// Complete it.
		if err := JobComplete(q, id, `{"ok":true}`); err != nil {
			t.Fatalf("complete: %v", err)
		}
		// Verify status in DB.
		var status string
		if err := db.QueryRow(`SELECT status FROM jobs WHERE id = ?`, id).Scan(&status); err != nil {
			t.Fatalf("query status: %v", err)
		}
		if status != "completed" {
			t.Errorf("status = %q, want completed", status)
		}
	})

	t.Run("fail_increments_attempts", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		id, err := JobEnqueue(q, "work", "{}", WithMaxAttempts(3))
		if err != nil {
			t.Fatalf("enqueue: %v", err)
		}
		if _, err := JobDequeue(q, nil); err != nil {
			t.Fatalf("dequeue: %v", err)
		}
		// First failure → status = failed (attempts=1 < max=3).
		if err := JobFail(q, id, "oops"); err != nil {
			t.Fatalf("fail: %v", err)
		}
		var status string
		var attempts int
		if err := db.QueryRow(`SELECT status, attempts FROM jobs WHERE id = ?`, id).Scan(&status, &attempts); err != nil {
			t.Fatalf("query status/attempts: %v", err)
		}
		if status != "failed" {
			t.Errorf("status = %q, want failed", status)
		}
		if attempts != 1 {
			t.Errorf("attempts = %d, want 1", attempts)
		}
	})

	t.Run("fail_transitions_to_dead", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		id, err := JobEnqueue(q, "work", "{}", WithMaxAttempts(2))
		if err != nil {
			t.Fatalf("enqueue: %v", err)
		}
		// Fail twice → dead.
		for i := 0; i < 2; i++ {
			// Re-set to pending so we can dequeue again (simulate retry logic).
			if i > 0 {
				if _, err := db.Exec(`UPDATE jobs SET status = 'pending' WHERE id = ?`, id); err != nil {
					t.Fatalf("reset to pending (round %d): %v", i+1, err)
				}
			}
			// Only the error is checked here; whether the dequeue hands the
			// job back is not asserted, matching the original test.
			if _, err := JobDequeue(q, nil); err != nil {
				t.Fatalf("dequeue (round %d): %v", i+1, err)
			}
			if err := JobFail(q, id, fmt.Sprintf("error %d", i+1)); err != nil {
				t.Fatalf("fail (round %d): %v", i+1, err)
			}
		}
		var status string
		if err := db.QueryRow(`SELECT status FROM jobs WHERE id = ?`, id).Scan(&status); err != nil {
			t.Fatalf("query status: %v", err)
		}
		if status != "dead" {
			t.Errorf("status = %q, want dead after max_attempts reached", status)
		}
	})
}
// TestJobStatusSummaryFn checks the string produced by JobStatusSummary for a
// fully populated count map and for an empty one (where the test expects every
// status to be reported as zero).
func TestJobStatusSummaryFn(t *testing.T) {
	cases := []struct {
		name   string
		counts map[string]int
		want   string
	}{
		{
			name: "job_status_summary_format",
			counts: map[string]int{
				"pending":   5,
				"running":   2,
				"completed": 10,
				"failed":    1,
				"dead":      0,
			},
			want: "pending: 5, running: 2, completed: 10, failed: 1, dead: 0",
		},
		{
			name:   "job_status_summary_empty_map",
			counts: map[string]int{},
			want:   "pending: 0, running: 0, completed: 0, failed: 0, dead: 0",
		},
	}

	for _, tc := range cases {
		// Subtests run synchronously, so using tc inside the closure is safe.
		t.Run(tc.name, func(t *testing.T) {
			if got := JobStatusSummary(tc.counts); got != tc.want {
				t.Errorf("got %q, want %q", got, tc.want)
			}
		})
	}
}
// TestJobWorkerGracefulShutdown checks two properties of JobWorker: that it
// returns once its context is cancelled, and that it invokes the handler for
// every job that was enqueued before it started.
//
// Both subtests wait for the worker goroutine to exit before returning.
// Otherwise the goroutine could still be polling while the test's cleanup
// closes the database underneath it.
func TestJobWorkerGracefulShutdown(t *testing.T) {
	t.Run("worker_graceful_shutdown", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel() // harmless after the explicit cancel below
		done := make(chan struct{})
		go func() {
			defer close(done)
			// The value returned on cancellation is not asserted here; this
			// subtest only cares that the call returns at all.
			_ = JobWorker(ctx, q, func(j Job) error {
				return nil
			}, WithPollInterval(10*time.Millisecond))
		}()
		// Let the worker poll a couple of times.
		time.Sleep(50 * time.Millisecond)
		cancel()
		select {
		case <-done:
			// OK — worker exited cleanly.
		case <-time.After(2 * time.Second):
			t.Fatal("worker did not stop within timeout after context cancellation")
		}
	})

	t.Run("worker_processes_jobs", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		var processed atomic.Int32
		for i := 0; i < 3; i++ {
			if _, err := JobEnqueue(q, "task", `{}`); err != nil {
				t.Fatalf("enqueue %d: %v", i, err)
			}
		}
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel() // also stops the worker if the test bails out early
		done := make(chan struct{})
		go func() {
			defer close(done)
			// Return value on cancellation is not part of this assertion.
			_ = JobWorker(ctx, q, func(j Job) error {
				processed.Add(1)
				return nil
			}, WithPollInterval(10*time.Millisecond))
		}()
		// Wait for all 3 to be processed (with timeout).
		deadline := time.Now().Add(2 * time.Second)
		for processed.Load() < 3 && time.Now().Before(deadline) {
			time.Sleep(10 * time.Millisecond)
		}
		cancel()
		// Do not return while the worker may still be touching the database.
		select {
		case <-done:
		case <-time.After(2 * time.Second):
			t.Fatal("worker did not stop within timeout after context cancellation")
		}
		if n := processed.Load(); n < 3 {
			t.Errorf("processed %d jobs, want 3", n)
		}
	})
}
// TestJobConcurrency runs several JobWorker goroutines against one queue and
// verifies that every enqueued job reaches the handler exactly once.
//
// A bare total is not enough to prove that: one job handled twice plus one job
// never handled still adds up to the expected count. The handler therefore
// records each job ID it sees and counts any ID that shows up a second time.
func TestJobConcurrency(t *testing.T) {
	t.Run("concurrency_multiples_goroutines_dequeue", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		const numJobs = 50
		for i := 0; i < numJobs; i++ {
			if _, err := JobEnqueue(q, "task", fmt.Sprintf(`{"i":%d}`, i)); err != nil {
				t.Fatalf("enqueue %d: %v", i, err)
			}
		}

		var (
			processed  atomic.Int32
			duplicates atomic.Int32
			seen       sync.Map // job ID → struct{}; each key is stored once
			wg         sync.WaitGroup
		)
		handler := func(j Job) error {
			if _, loaded := seen.LoadOrStore(j.ID, struct{}{}); loaded {
				duplicates.Add(1)
			}
			processed.Add(1)
			return nil
		}

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel() // harmless after the explicit cancel below
		const numWorkers = 5
		for i := 0; i < numWorkers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Return value on cancellation is not part of this assertion.
				_ = JobWorker(ctx, q, handler, WithPollInterval(5*time.Millisecond))
			}()
		}
		// Wait until all jobs are processed.
		deadline := time.Now().Add(5 * time.Second)
		for processed.Load() < numJobs && time.Now().Before(deadline) {
			time.Sleep(10 * time.Millisecond)
		}
		cancel()
		wg.Wait()
		if n := processed.Load(); n != numJobs {
			t.Errorf("processed %d, want %d (no double-processing with concurrent workers)", n, numJobs)
		}
		if d := duplicates.Load(); d != 0 {
			t.Errorf("%d job(s) were handed to a handler more than once", d)
		}
	})
}
// TestJobCleanupFn verifies JobCleanup with a 24-hour threshold: a completed
// job whose created_at is pushed 25 hours into the past must be deleted, and a
// completed job that was just created must be kept.
//
// Every setup step is checked. In particular, if the UPDATE that back-dates
// the job failed silently, the test would report "deleted 0 rows" without any
// hint of the real cause.
func TestJobCleanupFn(t *testing.T) {
	t.Run("job_cleanup_removes_old_terminal_jobs", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		id, err := JobEnqueue(q, "work", "{}")
		if err != nil {
			t.Fatalf("enqueue: %v", err)
		}
		if _, err := JobDequeue(q, nil); err != nil {
			t.Fatalf("dequeue: %v", err)
		}
		if err := JobComplete(q, id, ""); err != nil {
			t.Fatalf("complete: %v", err)
		}
		// Make the job look old (set created_at in the past, always UTC to match JobCleanup).
		past := time.Now().UTC().Add(-25 * time.Hour).Format(time.RFC3339)
		if _, err := db.Exec(`UPDATE jobs SET created_at = ? WHERE id = ?`, past, id); err != nil {
			t.Fatalf("back-date job: %v", err)
		}
		n, err := JobCleanup(q, 24*time.Hour)
		if err != nil {
			t.Fatalf("cleanup: %v", err)
		}
		if n != 1 {
			t.Errorf("deleted %d rows, want 1", n)
		}
		// Job should be gone.
		var count int
		if err := db.QueryRow(`SELECT COUNT(*) FROM jobs WHERE id = ?`, id).Scan(&count); err != nil {
			t.Fatalf("count remaining rows: %v", err)
		}
		if count != 0 {
			t.Errorf("job still in DB after cleanup")
		}
	})

	t.Run("job_cleanup_keeps_recent_jobs", func(t *testing.T) {
		db := openTestDB(t)
		q, err := JobQueueCreate(db, "jobs")
		if err != nil {
			t.Fatalf("create: %v", err)
		}
		id, err := JobEnqueue(q, "work", "{}")
		if err != nil {
			t.Fatalf("enqueue: %v", err)
		}
		if _, err := JobDequeue(q, nil); err != nil {
			t.Fatalf("dequeue: %v", err)
		}
		if err := JobComplete(q, id, ""); err != nil {
			t.Fatalf("complete: %v", err)
		}
		// Job was just created — should not be cleaned up.
		n, err := JobCleanup(q, 24*time.Hour)
		if err != nil {
			t.Fatalf("cleanup: %v", err)
		}
		if n != 0 {
			t.Errorf("deleted %d rows, want 0 (job is recent)", n)
		}
	})
}