feat: cola de jobs asincrona basada en SQLite (issue 0013)
Implementa el subsistema completo de background jobs para apps Go en el dominio infra. 9 funciones + 3 tipos + 17 tests, todos pasando. - Tipos: Job (product), JobQueue (product), JobStatus (sum) con JobHandler, EnqueueOption y WorkerOption usando functional options pattern - job_queue_create: CREATE TABLE + indices + WAL mode - job_enqueue: INSERT con UUID (github.com/google/uuid), WithPriority/WithScheduledAt/WithMaxAttempts - job_dequeue: SELECT+UPDATE atomico en transaccion exclusiva, filtro por jobTypes - job_complete / job_fail: transiciones de estado; fail → dead cuando attempts >= max_attempts - job_status_summary: pura, formatea conteo de jobs por estado - job_worker: poll loop bloqueante, context-cancelable, graceful shutdown - job_worker_pool: N workers con golang.org/x/sync/errgroup - job_cleanup: DELETE jobs terminales mas viejos que olderThan Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,125 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// JobDequeue atomically claims the next available job by running a
|
||||
// SELECT + UPDATE inside an EXCLUSIVE transaction. Returns nil, nil when the
|
||||
// queue is empty (or no job matches the jobTypes filter).
|
||||
// jobTypes restricts which job types are dequeued; an empty slice means all types.
|
||||
func JobDequeue(q *JobQueue, jobTypes []string) (*Job, error) {
|
||||
if q == nil {
|
||||
return nil, fmt.Errorf("job_dequeue: queue must not be nil")
|
||||
}
|
||||
|
||||
tx, err := q.DB.Begin()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("job_dequeue: begin tx: %w", err)
|
||||
}
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
// Build the SELECT query.
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
|
||||
var selectQ string
|
||||
var args []any
|
||||
|
||||
if len(jobTypes) > 0 {
|
||||
placeholders := make([]string, len(jobTypes))
|
||||
for i, t := range jobTypes {
|
||||
placeholders[i] = "?"
|
||||
args = append(args, t)
|
||||
}
|
||||
selectQ = fmt.Sprintf(`
|
||||
SELECT id, type, payload, status, priority, attempts, max_attempts,
|
||||
scheduled_at, started_at, completed_at, result, error, created_at
|
||||
FROM %s
|
||||
WHERE status = 'pending'
|
||||
AND scheduled_at <= ?
|
||||
AND type IN (%s)
|
||||
ORDER BY priority DESC, scheduled_at ASC
|
||||
LIMIT 1
|
||||
`, q.TableName, strings.Join(placeholders, ","))
|
||||
args = append(args, now)
|
||||
// rearrange: now goes before jobTypes in the query
|
||||
newArgs := []any{now}
|
||||
newArgs = append(newArgs, args[:len(args)-1]...)
|
||||
args = newArgs
|
||||
} else {
|
||||
selectQ = fmt.Sprintf(`
|
||||
SELECT id, type, payload, status, priority, attempts, max_attempts,
|
||||
scheduled_at, started_at, completed_at, result, error, created_at
|
||||
FROM %s
|
||||
WHERE status = 'pending'
|
||||
AND scheduled_at <= ?
|
||||
ORDER BY priority DESC, scheduled_at ASC
|
||||
LIMIT 1
|
||||
`, q.TableName)
|
||||
args = []any{now}
|
||||
}
|
||||
|
||||
row := tx.QueryRow(selectQ, args...)
|
||||
|
||||
var job Job
|
||||
var scheduledAt string
|
||||
var startedAt, completedAt sql.NullString
|
||||
var result, jobError sql.NullString
|
||||
var createdAt string
|
||||
|
||||
err = row.Scan(
|
||||
&job.ID, &job.Type, &job.Payload, (*string)(&job.Status),
|
||||
&job.Priority, &job.Attempts, &job.MaxAttempts,
|
||||
&scheduledAt, &startedAt, &completedAt,
|
||||
&result, &jobError, &createdAt,
|
||||
)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("job_dequeue: scan: %w", err)
|
||||
}
|
||||
|
||||
// Mark as running.
|
||||
updateQ := fmt.Sprintf(`
|
||||
UPDATE %s SET status = 'running', started_at = ? WHERE id = ?
|
||||
`, q.TableName)
|
||||
if _, err := tx.Exec(updateQ, now, job.ID); err != nil {
|
||||
return nil, fmt.Errorf("job_dequeue: update status: %w", err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return nil, fmt.Errorf("job_dequeue: commit: %w", err)
|
||||
}
|
||||
|
||||
// Hydrate optional fields.
|
||||
if t, err := time.Parse(time.RFC3339, scheduledAt); err == nil {
|
||||
job.ScheduledAt = t
|
||||
}
|
||||
if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
|
||||
job.CreatedAt = t
|
||||
}
|
||||
if startedAt.Valid {
|
||||
if t, err := time.Parse(time.RFC3339, startedAt.String); err == nil {
|
||||
job.StartedAt = &t
|
||||
}
|
||||
}
|
||||
if completedAt.Valid {
|
||||
if t, err := time.Parse(time.RFC3339, completedAt.String); err == nil {
|
||||
job.CompletedAt = &t
|
||||
}
|
||||
}
|
||||
if result.Valid {
|
||||
job.Result = &result.String
|
||||
}
|
||||
if jobError.Valid {
|
||||
job.Error = &jobError.String
|
||||
}
|
||||
job.Status = JobStatusRunning
|
||||
|
||||
return &job, nil
|
||||
}
|
||||
Reference in New Issue
Block a user