b5a867ca5a
Implementa el subsistema completo de background jobs para apps Go en el dominio infra. 9 funciones + 3 tipos + 17 tests, todos pasando. - Tipos: Job (product), JobQueue (product), JobStatus (sum) con JobHandler, EnqueueOption y WorkerOption usando functional options pattern - job_queue_create: CREATE TABLE + indices + WAL mode - job_enqueue: INSERT con UUID (github.com/google/uuid), WithPriority/WithScheduledAt/WithMaxAttempts - job_dequeue: SELECT+UPDATE atomico en transaccion exclusiva, filtro por jobTypes - job_complete / job_fail: transiciones de estado; fail → dead cuando attempts >= max_attempts - job_status_summary: pura, formatea conteo de jobs por estado - job_worker: poll loop bloqueante, context-cancelable, graceful shutdown - job_worker_pool: N workers con golang.org/x/sync/errgroup - job_cleanup: DELETE jobs terminales mas viejos que olderThan Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
73 lines
2.2 KiB
Go
73 lines
2.2 KiB
Go
package infra
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
)
|
|
|
|
// JobQueueCreate creates (or verifies) the jobs table and required indices in
|
|
// the given SQLite database, activates WAL mode, and returns a ready *JobQueue.
|
|
// tableName is typically "jobs" but can be any valid SQLite identifier.
|
|
//
|
|
// Schema created:
|
|
//
|
|
// CREATE TABLE IF NOT EXISTS jobs (
|
|
// id TEXT PRIMARY KEY,
|
|
// type TEXT NOT NULL,
|
|
// payload TEXT NOT NULL DEFAULT '{}',
|
|
// status TEXT NOT NULL DEFAULT 'pending',
|
|
// priority INTEGER NOT NULL DEFAULT 0,
|
|
// attempts INTEGER NOT NULL DEFAULT 0,
|
|
// max_attempts INTEGER NOT NULL DEFAULT 3,
|
|
// scheduled_at TEXT NOT NULL,
|
|
// started_at TEXT,
|
|
// completed_at TEXT,
|
|
// result TEXT,
|
|
// error TEXT,
|
|
// created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
|
|
// );
|
|
func JobQueueCreate(db *sql.DB, tableName string) (*JobQueue, error) {
|
|
if db == nil {
|
|
return nil, fmt.Errorf("job_queue_create: db must not be nil")
|
|
}
|
|
if tableName == "" {
|
|
tableName = "jobs"
|
|
}
|
|
|
|
// Enable WAL mode for concurrent read/write access.
|
|
if _, err := db.Exec(`PRAGMA journal_mode=WAL`); err != nil {
|
|
return nil, fmt.Errorf("job_queue_create: enable WAL: %w", err)
|
|
}
|
|
|
|
schema := fmt.Sprintf(`
|
|
CREATE TABLE IF NOT EXISTS %s (
|
|
id TEXT PRIMARY KEY,
|
|
type TEXT NOT NULL,
|
|
payload TEXT NOT NULL DEFAULT '{}',
|
|
status TEXT NOT NULL DEFAULT 'pending',
|
|
priority INTEGER NOT NULL DEFAULT 0,
|
|
attempts INTEGER NOT NULL DEFAULT 0,
|
|
max_attempts INTEGER NOT NULL DEFAULT 3,
|
|
scheduled_at TEXT NOT NULL,
|
|
started_at TEXT,
|
|
completed_at TEXT,
|
|
result TEXT,
|
|
error TEXT,
|
|
created_at TEXT NOT NULL DEFAULT (strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now'))
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_%s_dequeue ON %s (status, priority DESC, scheduled_at ASC) WHERE status = 'pending';
|
|
CREATE INDEX IF NOT EXISTS idx_%s_status ON %s (status);
|
|
CREATE INDEX IF NOT EXISTS idx_%s_type ON %s (type);
|
|
`, tableName,
|
|
tableName, tableName,
|
|
tableName, tableName,
|
|
tableName, tableName,
|
|
)
|
|
|
|
if _, err := db.Exec(schema); err != nil {
|
|
return nil, fmt.Errorf("job_queue_create: create schema: %w", err)
|
|
}
|
|
|
|
return &JobQueue{DB: db, TableName: tableName}, nil
|
|
}
|