b5a867ca5a
Implementa el subsistema completo de background jobs para apps Go en el dominio infra. 9 funciones + 3 tipos + 17 tests, todos pasando. - Tipos: Job (product), JobQueue (product), JobStatus (sum) con JobHandler, EnqueueOption y WorkerOption usando functional options pattern - job_queue_create: CREATE TABLE + indices + WAL mode - job_enqueue: INSERT con UUID (github.com/google/uuid), WithPriority/WithScheduledAt/WithMaxAttempts - job_dequeue: SELECT+UPDATE atomico en transaccion exclusiva, filtro por jobTypes - job_complete / job_fail: transiciones de estado; fail → dead cuando attempts >= max_attempts - job_status_summary: pura, formatea conteo de jobs por estado - job_worker: poll loop bloqueante, context-cancelable, graceful shutdown - job_worker_pool: N workers con golang.org/x/sync/errgroup - job_cleanup: DELETE jobs terminales mas viejos que olderThan Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
71 lines
1.5 KiB
Go
71 lines
1.5 KiB
Go
package infra
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// JobWorker runs a poll loop in the calling goroutine, dequeuing and processing
|
|
// jobs until ctx is cancelled. handler is called for each job; its return value
|
|
// drives complete vs fail logic. The worker sleeps pollInterval between empty
|
|
// dequeue attempts.
|
|
//
|
|
// Options: WithPollInterval (default 1s), WithJobTypes (default all types).
|
|
//
|
|
// This function blocks until ctx is done. Run it in a goroutine:
|
|
//
|
|
// go JobWorker(ctx, q, handler, WithPollInterval(2*time.Second))
|
|
func JobWorker(ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error {
|
|
if q == nil {
|
|
return fmt.Errorf("job_worker: queue must not be nil")
|
|
}
|
|
if handler == nil {
|
|
return fmt.Errorf("job_worker: handler must not be nil")
|
|
}
|
|
|
|
cfg := &workerConfig{
|
|
pollInterval: time.Second,
|
|
}
|
|
for _, o := range opts {
|
|
o(cfg)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
job, err := JobDequeue(q, cfg.jobTypes)
|
|
if err != nil {
|
|
// Transient DB error — wait and retry.
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(cfg.pollInterval):
|
|
}
|
|
continue
|
|
}
|
|
|
|
if job == nil {
|
|
// Queue empty — wait before polling again.
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(cfg.pollInterval):
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Process the job.
|
|
handlerErr := handler(*job)
|
|
if handlerErr != nil {
|
|
_ = JobFail(q, job.ID, handlerErr.Error())
|
|
} else {
|
|
_ = JobComplete(q, job.ID, "")
|
|
}
|
|
}
|
|
}
|