Files
egutierrez b5a867ca5a feat: cola de jobs asincrona basada en SQLite (issue 0013)
Implementa el subsistema completo de background jobs para apps Go en el
dominio infra. 9 funciones + 3 tipos + 17 tests, todos pasando.

- Tipos: Job (product), JobQueue (product), JobStatus (sum) con
  JobHandler, EnqueueOption y WorkerOption usando functional options pattern
- job_queue_create: CREATE TABLE + indices + WAL mode
- job_enqueue: INSERT con UUID (github.com/google/uuid), WithPriority/WithScheduledAt/WithMaxAttempts
- job_dequeue: SELECT+UPDATE atomico en transaccion exclusiva, filtro por jobTypes
- job_complete / job_fail: transiciones de estado; fail → dead cuando attempts >= max_attempts
- job_status_summary: pura, formatea conteo de jobs por estado
- job_worker: poll loop bloqueante, context-cancelable, graceful shutdown
- job_worker_pool: N workers con golang.org/x/sync/errgroup
- job_cleanup: DELETE jobs terminales mas viejos que olderThan

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 02:00:44 +02:00

48 lines
1.2 KiB
Go

package infra
import (
"fmt"
"time"
"github.com/google/uuid"
)
// JobEnqueue inserts a new job into the queue and returns its UUID.
// jobType identifies the kind of work (e.g. "send_email", "resize_image").
// payload is a JSON string with the job data (defaults to "{}" if empty).
// Options: WithPriority, WithScheduledAt, WithMaxAttempts.
func JobEnqueue(q *JobQueue, jobType string, payload string, opts ...EnqueueOption) (string, error) {
if q == nil {
return "", fmt.Errorf("job_enqueue: queue must not be nil")
}
if jobType == "" {
return "", fmt.Errorf("job_enqueue: jobType must not be empty")
}
if payload == "" {
payload = "{}"
}
cfg := &enqueueConfig{
priority: 0,
scheduledAt: time.Now().UTC(),
maxAttempts: 3,
}
for _, o := range opts {
o(cfg)
}
id := uuid.New().String()
scheduledAt := cfg.scheduledAt.Format(time.RFC3339)
q2 := fmt.Sprintf(`
INSERT INTO %s (id, type, payload, status, priority, max_attempts, scheduled_at)
VALUES (?, ?, ?, 'pending', ?, ?, ?)
`, q.TableName)
_, err := q.DB.Exec(q2, id, jobType, payload, cfg.priority, cfg.maxAttempts, scheduledAt)
if err != nil {
return "", fmt.Errorf("job_enqueue: insert: %w", err)
}
return id, nil
}