From b5a867ca5ae83e2f4c5daf2b4b0298d670aaf578 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Mon, 13 Apr 2026 02:00:44 +0200 Subject: [PATCH 1/2] feat: cola de jobs asincrona basada en SQLite (issue 0013) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementa el subsistema completo de background jobs para apps Go en el dominio infra. 9 funciones + 3 tipos + 17 tests, todos pasando. - Tipos: Job (product), JobQueue (product), JobStatus (sum) con JobHandler, EnqueueOption y WorkerOption usando functional options pattern - job_queue_create: CREATE TABLE + indices + WAL mode - job_enqueue: INSERT con UUID (github.com/google/uuid), WithPriority/WithScheduledAt/WithMaxAttempts - job_dequeue: SELECT+UPDATE atomico en transaccion exclusiva, filtro por jobTypes - job_complete / job_fail: transiciones de estado; fail → dead cuando attempts >= max_attempts - job_status_summary: pura, formatea conteo de jobs por estado - job_worker: poll loop bloqueante, context-cancelable, graceful shutdown - job_worker_pool: N workers con golang.org/x/sync/errgroup - job_cleanup: DELETE jobs terminales mas viejos que olderThan Co-Authored-By: Claude Sonnet 4.6 --- functions/infra/job_cleanup.go | 33 +++ functions/infra/job_cleanup.md | 41 +++ functions/infra/job_complete.go | 39 +++ functions/infra/job_complete.md | 40 +++ functions/infra/job_dequeue.go | 125 +++++++++ functions/infra/job_dequeue.md | 49 ++++ functions/infra/job_enqueue.go | 47 ++++ functions/infra/job_enqueue.md | 48 ++++ functions/infra/job_fail.go | 43 +++ functions/infra/job_fail.md | 43 +++ functions/infra/job_queue_create.go | 72 +++++ functions/infra/job_queue_create.md | 45 +++ functions/infra/job_queue_test.go | 418 ++++++++++++++++++++++++++++ functions/infra/job_queue_types.go | 87 ++++++ functions/infra/job_status.go | 19 ++ functions/infra/job_status.md | 39 +++ functions/infra/job_worker.go | 70 +++++ functions/infra/job_worker.md | 49 ++++ functions/infra/job_worker_pool.go | 36 +++ functions/infra/job_worker_pool.md | 50 ++++ types/infra/job.md | 31 +++ types/infra/job_queue.md | 20 ++ types/infra/job_status.md | 25 ++ 23 files changed, 1469 insertions(+) create mode 100644 functions/infra/job_cleanup.go create mode 100644 functions/infra/job_cleanup.md create mode 100644 functions/infra/job_complete.go create mode 100644 functions/infra/job_complete.md create mode 100644 functions/infra/job_dequeue.go create mode 100644 functions/infra/job_dequeue.md create mode 100644 functions/infra/job_enqueue.go create mode 100644 functions/infra/job_enqueue.md create mode 100644 functions/infra/job_fail.go create mode 100644 functions/infra/job_fail.md create mode 100644 functions/infra/job_queue_create.go create mode 100644 functions/infra/job_queue_create.md create mode 100644 functions/infra/job_queue_test.go create mode 100644 functions/infra/job_queue_types.go create mode 100644 functions/infra/job_status.go create mode 100644 functions/infra/job_status.md create mode 100644 functions/infra/job_worker.go create mode 100644 functions/infra/job_worker.md create mode 100644 functions/infra/job_worker_pool.go create mode 100644 functions/infra/job_worker_pool.md create mode 100644 types/infra/job.md create mode 100644 types/infra/job_queue.md create mode 100644 types/infra/job_status.md diff --git a/functions/infra/job_cleanup.go b/functions/infra/job_cleanup.go new file mode 100644 index 00000000..ede8edf4 --- /dev/null +++ b/functions/infra/job_cleanup.go @@ -0,0 +1,33 @@ +package infra + +import ( + "fmt" + "time" +) + +// JobCleanup deletes jobs in terminal states (completed, failed, dead) that are +// older than olderThan. Returns the number of rows deleted. +// This is useful for keeping the jobs table small in long-running applications. +func JobCleanup(q *JobQueue, olderThan time.Duration) (int64, error) { + if q == nil { + return 0, fmt.Errorf("job_cleanup: queue must not be nil") + } + + cutoff := time.Now().UTC().Add(-olderThan).Format(time.RFC3339) + + query := fmt.Sprintf(` +DELETE FROM %s +WHERE status IN ('completed', 'failed', 'dead') + AND created_at < ? +`, q.TableName) + + res, err := q.DB.Exec(query, cutoff) + if err != nil { + return 0, fmt.Errorf("job_cleanup: delete: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("job_cleanup: rows affected: %w", err) + } + return n, nil +} diff --git a/functions/infra/job_cleanup.md b/functions/infra/job_cleanup.md new file mode 100644 index 00000000..16d5d333 --- /dev/null +++ b/functions/infra/job_cleanup.md @@ -0,0 +1,41 @@ +--- +name: job_cleanup +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func JobCleanup(q *JobQueue, olderThan time.Duration) (int64, error)" +description: "Elimina jobs en estados terminales (completed, failed, dead) cuyo created_at sea mas antiguo que olderThan. Retorna el numero de filas eliminadas. Util para mantener la tabla compacta en apps de larga duracion." +tags: [job, queue, cleanup, delete, maintenance, sqlite, async, background, infra] +uses_functions: [] +uses_types: [job_queue_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, time] +params: + - name: q + desc: "cola de jobs creada con JobQueueCreate" + - name: olderThan + desc: "duracion maxima de retension; jobs mas viejos que esto se eliminan (ej: 24*time.Hour)" +output: "numero de filas eliminadas" +tested: true +tests: + - "job_cleanup_removes_old_terminal_jobs" + - "job_cleanup_keeps_recent_jobs" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_cleanup.go" +--- + +## Ejemplo + +```go +// Limpiar jobs terminados hace mas de 7 dias +n, err := JobCleanup(q, 7*24*time.Hour) +fmt.Printf("eliminados: %d jobs\n", n) +``` + +## Notas + +Solo elimina jobs en estados terminales: completed, failed, dead. Los jobs pending y running nunca se eliminan. El cutoff se calcula en UTC. Llamar periodicamente con CronTicker u otro scheduler para mantener la tabla compacta. diff --git a/functions/infra/job_complete.go b/functions/infra/job_complete.go new file mode 100644 index 00000000..25527fa2 --- /dev/null +++ b/functions/infra/job_complete.go @@ -0,0 +1,39 @@ +package infra + +import ( + "fmt" + "time" +) + +// JobComplete marks a job as completed and stores the optional result JSON string. +// result may be empty ("") to indicate no result payload. +func JobComplete(q *JobQueue, jobID string, result string) error { + if q == nil { + return fmt.Errorf("job_complete: queue must not be nil") + } + if jobID == "" { + return fmt.Errorf("job_complete: jobID must not be empty") + } + + now := time.Now().UTC().Format(time.RFC3339) + var resultPtr *string + if result != "" { + resultPtr = &result + } + + query := fmt.Sprintf(` +UPDATE %s +SET status = 'completed', completed_at = ?, result = ? +WHERE id = ? +`, q.TableName) + + res, err := q.DB.Exec(query, now, resultPtr, jobID) + if err != nil { + return fmt.Errorf("job_complete: update: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return fmt.Errorf("job_complete: job %q not found", jobID) + } + return nil +} diff --git a/functions/infra/job_complete.md b/functions/infra/job_complete.md new file mode 100644 index 00000000..00fe8cd3 --- /dev/null +++ b/functions/infra/job_complete.md @@ -0,0 +1,40 @@ +--- +name: job_complete +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func JobComplete(q *JobQueue, jobID string, result string) error" +description: "Marca un job como completado, setea completed_at y almacena el resultado opcional. result puede ser vacio si no hay payload de resultado." +tags: [job, queue, complete, sqlite, async, background, infra] +uses_functions: [] +uses_types: [job_queue_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, time] +params: + - name: q + desc: "cola de jobs creada con JobQueueCreate" + - name: jobID + desc: "UUID del job a marcar como completado" + - name: result + desc: "JSON string con el resultado del job; puede ser vacio" +output: "error si el job no existe o falla el UPDATE" +tested: true +tests: + - "complete_fail_transitions" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_complete.go" +--- + +## Ejemplo + +```go +err := JobComplete(q, job.ID, `{"rows_processed":42}`) +``` + +## Notas + +Setea `status='completed'`, `completed_at=now`, y `result` (NULL si vacio). Retorna error si el job no existe (0 rows affected). diff --git a/functions/infra/job_dequeue.go b/functions/infra/job_dequeue.go new file mode 100644 index 00000000..3da3cd2c --- /dev/null +++ b/functions/infra/job_dequeue.go @@ -0,0 +1,125 @@ +package infra + +import ( + "database/sql" + "errors" + "fmt" + "strings" + "time" +) + +// JobDequeue atomically claims the next available job by running a +// SELECT + UPDATE inside an EXCLUSIVE transaction. Returns nil, nil when the +// queue is empty (or no job matches the jobTypes filter). +// jobTypes restricts which job types are dequeued; an empty slice means all types. +func JobDequeue(q *JobQueue, jobTypes []string) (*Job, error) { + if q == nil { + return nil, fmt.Errorf("job_dequeue: queue must not be nil") + } + + tx, err := q.DB.Begin() + if err != nil { + return nil, fmt.Errorf("job_dequeue: begin tx: %w", err) + } + defer tx.Rollback() //nolint:errcheck + + // Build the SELECT query. + now := time.Now().UTC().Format(time.RFC3339) + + var selectQ string + var args []any + + if len(jobTypes) > 0 { + placeholders := make([]string, len(jobTypes)) + for i, t := range jobTypes { + placeholders[i] = "?" + args = append(args, t) + } + selectQ = fmt.Sprintf(` +SELECT id, type, payload, status, priority, attempts, max_attempts, + scheduled_at, started_at, completed_at, result, error, created_at +FROM %s +WHERE status = 'pending' + AND scheduled_at <= ? + AND type IN (%s) +ORDER BY priority DESC, scheduled_at ASC +LIMIT 1 +`, q.TableName, strings.Join(placeholders, ",")) + args = append(args, now) + // rearrange: now goes before jobTypes in the query + newArgs := []any{now} + newArgs = append(newArgs, args[:len(args)-1]...) + args = newArgs + } else { + selectQ = fmt.Sprintf(` +SELECT id, type, payload, status, priority, attempts, max_attempts, + scheduled_at, started_at, completed_at, result, error, created_at +FROM %s +WHERE status = 'pending' + AND scheduled_at <= ? +ORDER BY priority DESC, scheduled_at ASC +LIMIT 1 +`, q.TableName) + args = []any{now} + } + + row := tx.QueryRow(selectQ, args...) + + var job Job + var scheduledAt string + var startedAt, completedAt sql.NullString + var result, jobError sql.NullString + var createdAt string + + err = row.Scan( + &job.ID, &job.Type, &job.Payload, (*string)(&job.Status), + &job.Priority, &job.Attempts, &job.MaxAttempts, + &scheduledAt, &startedAt, &completedAt, + &result, &jobError, &createdAt, + ) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("job_dequeue: scan: %w", err) + } + + // Mark as running. + updateQ := fmt.Sprintf(` +UPDATE %s SET status = 'running', started_at = ? WHERE id = ? +`, q.TableName) + if _, err := tx.Exec(updateQ, now, job.ID); err != nil { + return nil, fmt.Errorf("job_dequeue: update status: %w", err) + } + + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("job_dequeue: commit: %w", err) + } + + // Hydrate optional fields. + if t, err := time.Parse(time.RFC3339, scheduledAt); err == nil { + job.ScheduledAt = t + } + if t, err := time.Parse(time.RFC3339, createdAt); err == nil { + job.CreatedAt = t + } + if startedAt.Valid { + if t, err := time.Parse(time.RFC3339, startedAt.String); err == nil { + job.StartedAt = &t + } + } + if completedAt.Valid { + if t, err := time.Parse(time.RFC3339, completedAt.String); err == nil { + job.CompletedAt = &t + } + } + if result.Valid { + job.Result = &result.String + } + if jobError.Valid { + job.Error = &jobError.String + } + job.Status = JobStatusRunning + + return &job, nil +} diff --git a/functions/infra/job_dequeue.md b/functions/infra/job_dequeue.md new file mode 100644 index 00000000..d11d3bf2 --- /dev/null +++ b/functions/infra/job_dequeue.md @@ -0,0 +1,49 @@ +--- +name: job_dequeue +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func JobDequeue(q *JobQueue, jobTypes []string) (*Job, error)" +description: "Extrae atomicamente el siguiente job disponible usando SELECT+UPDATE en una transaccion exclusiva. Retorna nil, nil si la cola esta vacia o no hay jobs que cumplan el filtro. jobTypes limita los tipos dequeued; slice vacio significa todos." +tags: [job, queue, dequeue, atomic, transaction, sqlite, async, background, infra] +uses_functions: [] +uses_types: [job_queue_go_infra, job_go_infra, job_status_go_infra] +returns: [job_go_infra] +returns_optional: true +error_type: "error_go_core" +imports: [database/sql, errors, fmt, strings, time] +params: + - name: q + desc: "cola de jobs creada con JobQueueCreate" + - name: jobTypes + desc: "lista de tipos a desencolar; slice vacio o nil = todos los tipos" +output: "puntero a Job con status=running, o nil si la cola esta vacia" +tested: true +tests: + - "enqueue_dequeue_atomicidad" + - "dequeue_empty_queue_returns_nil" + - "dequeue_priority_order" + - "dequeue_jobtype_filter" + - "dequeue_scheduled_in_future_waits" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_dequeue.go" +--- + +## Ejemplo + +```go +// Desencolar cualquier tipo +job, err := JobDequeue(q, nil) +if job == nil { + // cola vacia +} + +// Filtrar por tipo +job, err = JobDequeue(q, []string{"send_email", "send_sms"}) +``` + +## Notas + +Usa `db.Begin()` + `tx.Rollback()` (deferred) + `tx.Commit()`. El SELECT filtra por `status='pending' AND scheduled_at <= now` y ordena por `priority DESC, scheduled_at ASC`. El UPDATE atomico cambia el status a 'running' y setea `started_at`. Safe para multiples workers concurrentes — SQLite serializa la transaccion. diff --git a/functions/infra/job_enqueue.go b/functions/infra/job_enqueue.go new file mode 100644 index 00000000..aa11199a --- /dev/null +++ b/functions/infra/job_enqueue.go @@ -0,0 +1,47 @@ +package infra + +import ( + "fmt" + "time" + + "github.com/google/uuid" +) + +// JobEnqueue inserts a new job into the queue and returns its UUID. +// jobType identifies the kind of work (e.g. "send_email", "resize_image"). +// payload is a JSON string with the job data (defaults to "{}" if empty). +// Options: WithPriority, WithScheduledAt, WithMaxAttempts. +func JobEnqueue(q *JobQueue, jobType string, payload string, opts ...EnqueueOption) (string, error) { + if q == nil { + return "", fmt.Errorf("job_enqueue: queue must not be nil") + } + if jobType == "" { + return "", fmt.Errorf("job_enqueue: jobType must not be empty") + } + if payload == "" { + payload = "{}" + } + + cfg := &enqueueConfig{ + priority: 0, + scheduledAt: time.Now().UTC(), + maxAttempts: 3, + } + for _, o := range opts { + o(cfg) + } + + id := uuid.New().String() + scheduledAt := cfg.scheduledAt.Format(time.RFC3339) + + q2 := fmt.Sprintf(` +INSERT INTO %s (id, type, payload, status, priority, max_attempts, scheduled_at) +VALUES (?, ?, ?, 'pending', ?, ?, ?) +`, q.TableName) + + _, err := q.DB.Exec(q2, id, jobType, payload, cfg.priority, cfg.maxAttempts, scheduledAt) + if err != nil { + return "", fmt.Errorf("job_enqueue: insert: %w", err) + } + return id, nil +} diff --git a/functions/infra/job_enqueue.md b/functions/infra/job_enqueue.md new file mode 100644 index 00000000..78ce1f86 --- /dev/null +++ b/functions/infra/job_enqueue.md @@ -0,0 +1,48 @@ +--- +name: job_enqueue +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func JobEnqueue(q *JobQueue, jobType string, payload string, opts ...EnqueueOption) (string, error)" +description: "Inserta un nuevo job en la cola con UUID generado, tipo, payload JSON y opciones. Retorna el UUID del job. Opciones: WithPriority, WithScheduledAt, WithMaxAttempts." +tags: [job, queue, enqueue, insert, async, background, infra, sqlite] +uses_functions: [] +uses_types: [job_queue_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, time, github.com/google/uuid] +params: + - name: q + desc: "cola de jobs creada con JobQueueCreate" + - name: jobType + desc: "identificador del tipo de trabajo (ej: 'send_email', 'resize_image')" + - name: payload + desc: "JSON string con los datos del job; si vacio se usa '{}'" + - name: opts + desc: "opciones: WithPriority(n), WithScheduledAt(t), WithMaxAttempts(n)" +output: "UUID del job insertado como string" +tested: true +tests: + - "enqueue_dequeue_atomicidad" + - "dequeue_priority_order" + - "dequeue_jobtype_filter" + - "dequeue_scheduled_in_future_waits" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_enqueue.go" +--- + +## Ejemplo + +```go +id, err := JobEnqueue(q, "send_email", `{"to":"user@example.com"}`, + WithPriority(5), + WithMaxAttempts(5), +) +``` + +## Notas + +Usa `github.com/google/uuid` para generar el ID. El payload por defecto es "{}". El campo `scheduled_at` por defecto es `time.Now().UTC()`. La prioridad mas alta se procesa primero (ORDER BY priority DESC). diff --git a/functions/infra/job_fail.go b/functions/infra/job_fail.go new file mode 100644 index 00000000..99efebb8 --- /dev/null +++ b/functions/infra/job_fail.go @@ -0,0 +1,43 @@ +package infra + +import ( + "fmt" + "time" +) + +// JobFail increments the attempt counter and transitions the job to: +// - "failed" when attempts < max_attempts (eligible for retry) +// - "dead" when attempts >= max_attempts (no more retries) +// +// errMsg is stored in the error column for debugging. +func JobFail(q *JobQueue, jobID string, errMsg string) error { + if q == nil { + return fmt.Errorf("job_fail: queue must not be nil") + } + if jobID == "" { + return fmt.Errorf("job_fail: jobID must not be empty") + } + + now := time.Now().UTC().Format(time.RFC3339) + + // Atomically increment attempts and decide status. + query := fmt.Sprintf(` +UPDATE %s +SET + attempts = attempts + 1, + status = CASE WHEN (attempts + 1) >= max_attempts THEN 'dead' ELSE 'failed' END, + error = ?, + completed_at = ? +WHERE id = ? +`, q.TableName) + + res, err := q.DB.Exec(query, errMsg, now, jobID) + if err != nil { + return fmt.Errorf("job_fail: update: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return fmt.Errorf("job_fail: job %q not found", jobID) + } + return nil +} diff --git a/functions/infra/job_fail.md b/functions/infra/job_fail.md new file mode 100644 index 00000000..094b995a --- /dev/null +++ b/functions/infra/job_fail.md @@ -0,0 +1,43 @@ +--- +name: job_fail +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func JobFail(q *JobQueue, jobID string, errMsg string) error" +description: "Incrementa el contador de intentos y transiciona el job a 'failed' (reintentable) o 'dead' (sin mas intentos) segun si attempts >= max_attempts. Almacena el mensaje de error." +tags: [job, queue, fail, retry, dead, sqlite, async, background, infra] +uses_functions: [] +uses_types: [job_queue_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, time] +params: + - name: q + desc: "cola de jobs creada con JobQueueCreate" + - name: jobID + desc: "UUID del job que fallo" + - name: errMsg + desc: "mensaje de error para almacenar en la columna error" +output: "error si el job no existe o falla el UPDATE" +tested: true +tests: + - "fail_increments_attempts" + - "fail_transitions_to_dead" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_fail.go" +--- + +## Ejemplo + +```go +if err := handler(job); err != nil { + _ = JobFail(q, job.ID, err.Error()) +} +``` + +## Notas + +El UPDATE es atomico: incrementa attempts y decide el nuevo status en un solo SQL con CASE WHEN. Si `(attempts + 1) >= max_attempts` → status='dead', sino → status='failed'. Los jobs failed pueden ser reintentados manualmente reseteando su status a 'pending'. Los jobs dead no se procesan automaticamente. diff --git a/functions/infra/job_queue_create.go b/functions/infra/job_queue_create.go new file mode 100644 index 00000000..9a99dd8c --- /dev/null +++ b/functions/infra/job_queue_create.go @@ -0,0 +1,72 @@ +package infra + +import ( + "database/sql" + "fmt" +) + +// JobQueueCreate creates (or verifies) the jobs table and required indices in +// the given SQLite database, activates WAL mode, and returns a ready *JobQueue. +// tableName is typically "jobs" but can be any valid SQLite identifier. +// +// Schema created: +// +// CREATE TABLE IF NOT EXISTS jobs ( +// id TEXT PRIMARY KEY, +// type TEXT NOT NULL, +// payload TEXT NOT NULL DEFAULT '{}', +// status TEXT NOT NULL DEFAULT 'pending', +// priority INTEGER NOT NULL DEFAULT 0, +// attempts INTEGER NOT NULL DEFAULT 0, +// max_attempts INTEGER NOT NULL DEFAULT 3, +// scheduled_at TEXT NOT NULL, +// started_at TEXT, +// completed_at TEXT, +// result TEXT, +// error TEXT, +// created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) +// ); +func JobQueueCreate(db *sql.DB, tableName string) (*JobQueue, error) { + if db == nil { + return nil, fmt.Errorf("job_queue_create: db must not be nil") + } + if tableName == "" { + tableName = "jobs" + } + + // Enable WAL mode for concurrent read/write access. + if _, err := db.Exec(`PRAGMA journal_mode=WAL`); err != nil { + return nil, fmt.Errorf("job_queue_create: enable WAL: %w", err) + } + + schema := fmt.Sprintf(` +CREATE TABLE IF NOT EXISTS %s ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + payload TEXT NOT NULL DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'pending', + priority INTEGER NOT NULL DEFAULT 0, + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 3, + scheduled_at TEXT NOT NULL, + started_at TEXT, + completed_at TEXT, + result TEXT, + error TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now')) +); +CREATE INDEX IF NOT EXISTS idx_%s_dequeue ON %s (status, priority DESC, scheduled_at ASC) WHERE status = 'pending'; +CREATE INDEX IF NOT EXISTS idx_%s_status ON %s (status); +CREATE INDEX IF NOT EXISTS idx_%s_type ON %s (type); +`, tableName, + tableName, tableName, + tableName, tableName, + tableName, tableName, + ) + + if _, err := db.Exec(schema); err != nil { + return nil, fmt.Errorf("job_queue_create: create schema: %w", err) + } + + return &JobQueue{DB: db, TableName: tableName}, nil +} diff --git a/functions/infra/job_queue_create.md b/functions/infra/job_queue_create.md new file mode 100644 index 00000000..965a6f50 --- /dev/null +++ b/functions/infra/job_queue_create.md @@ -0,0 +1,45 @@ +--- +name: job_queue_create +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func JobQueueCreate(db *sql.DB, tableName string) (*JobQueue, error)" +description: "Crea (o verifica) la tabla de jobs con indices en SQLite, activa WAL mode, y retorna un *JobQueue listo para usar. tableName puede ser cualquier identificador SQLite valido (default: 'jobs')." +tags: [job, queue, sqlite, async, background, infra, create, schema] +uses_functions: [] +uses_types: [job_queue_go_infra] +returns: [job_queue_go_infra] +returns_optional: false +error_type: "error_go_core" +imports: [database/sql, fmt] +params: + - name: db + desc: "conexion SQLite abierta (puede ser :memory: o file-based)" + - name: tableName + desc: "nombre de la tabla de jobs; si esta vacio se usa 'jobs'" +output: "puntero a JobQueue con DB y TableName configurados, listo para enqueue/dequeue" +tested: true +tests: + - "test_queue_create" + - "test_queue_create_default_table_name" + - "test_queue_create_nil_db" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_queue_create.go" +--- + +## Ejemplo + +```go +db, _ := sql.Open("sqlite3", "jobs.db?_journal_mode=WAL") +q, err := JobQueueCreate(db, "jobs") +if err != nil { + log.Fatal(err) +} +// q esta listo para JobEnqueue, JobDequeue, etc. +``` + +## Notas + +Activa WAL mode en la DB. Usa `CREATE TABLE IF NOT EXISTS` y `CREATE INDEX IF NOT EXISTS` — es seguro llamarlo multiples veces. Indices creados: `idx_{table}_dequeue` (filtrado partial por status='pending'), `idx_{table}_status`, `idx_{table}_type`. diff --git a/functions/infra/job_queue_test.go b/functions/infra/job_queue_test.go new file mode 100644 index 00000000..f478bbc0 --- /dev/null +++ b/functions/infra/job_queue_test.go @@ -0,0 +1,418 @@ +package infra + +import ( + "context" + "database/sql" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +// openTestDB opens an in-memory SQLite database for testing. +// MaxOpenConns is set to 1 to ensure all goroutines share the same connection +// (required for in-memory SQLite which is per-connection otherwise). +func openTestDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("sqlite3", ":memory:?_journal_mode=WAL") + if err != nil { + t.Fatalf("open test db: %v", err) + } + db.SetMaxOpenConns(1) + t.Cleanup(func() { db.Close() }) + return db +} + +func TestJobQueueCreate(t *testing.T) { + t.Run("test_queue_create", func(t *testing.T) { + db := openTestDB(t) + q, err := JobQueueCreate(db, "jobs") + if err != nil { + t.Fatalf("JobQueueCreate: %v", err) + } + if q == nil { + t.Fatal("expected non-nil JobQueue") + } + if q.TableName != "jobs" { + t.Errorf("TableName = %q, want %q", q.TableName, "jobs") + } + + // Verify table exists by inserting a row directly. + _, err = db.Exec(`INSERT INTO jobs (id, type, scheduled_at) VALUES ('x', 'test', ?)`, + time.Now().UTC().Format(time.RFC3339)) + if err != nil { + t.Fatalf("table not created: %v", err) + } + }) + + t.Run("test_queue_create_default_table_name", func(t *testing.T) { + db := openTestDB(t) + q, err := JobQueueCreate(db, "") + if err != nil { + t.Fatalf("JobQueueCreate with empty name: %v", err) + } + if q.TableName != "jobs" { + t.Errorf("TableName = %q, want %q", q.TableName, "jobs") + } + }) + + t.Run("test_queue_create_nil_db", func(t *testing.T) { + _, err := JobQueueCreate(nil, "jobs") + if err == nil { + t.Fatal("expected error for nil db") + } + }) +} + +func TestJobEnqueueDequeue(t *testing.T) { + t.Run("enqueue_dequeue_atomicidad", func(t *testing.T) { + db := openTestDB(t) + q, err := JobQueueCreate(db, "jobs") + if err != nil { + t.Fatalf("create: %v", err) + } + + id, err := JobEnqueue(q, "send_email", `{"to":"a@b.com"}`) + if err != nil { + t.Fatalf("enqueue: %v", err) + } + if id == "" { + t.Fatal("expected non-empty job ID") + } + + job, err := JobDequeue(q, nil) + if err != nil { + t.Fatalf("dequeue: %v", err) + } + if job == nil { + t.Fatal("expected a job, got nil") + } + if job.ID != id { + t.Errorf("job ID = %q, want %q", job.ID, id) + } + if job.Type != "send_email" { + t.Errorf("job Type = %q, want %q", job.Type, "send_email") + } + if job.Status != JobStatusRunning { + t.Errorf("job Status = %q, want %q", job.Status, JobStatusRunning) + } + + // Second dequeue should return nil (job is now running, not pending). + job2, err := JobDequeue(q, nil) + if err != nil { + t.Fatalf("second dequeue: %v", err) + } + if job2 != nil { + t.Errorf("expected nil second dequeue, got job %q", job2.ID) + } + }) + + t.Run("dequeue_empty_queue_returns_nil", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + job, err := JobDequeue(q, nil) + if err != nil { + t.Fatalf("dequeue empty: %v", err) + } + if job != nil { + t.Errorf("expected nil, got job %q", job.ID) + } + }) + + t.Run("dequeue_priority_order", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + _, _ = JobEnqueue(q, "low", "{}", WithPriority(0)) + _, _ = JobEnqueue(q, "high", "{}", WithPriority(10)) + _, _ = JobEnqueue(q, "mid", "{}", WithPriority(5)) + + job, _ := JobDequeue(q, nil) + if job == nil || job.Type != "high" { + t.Errorf("expected high-priority job first, got %v", job) + } + }) + + t.Run("dequeue_jobtype_filter", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + _, _ = JobEnqueue(q, "email", "{}") + _, _ = JobEnqueue(q, "sms", "{}") + + // Only dequeue "sms" jobs. + job, err := JobDequeue(q, []string{"sms"}) + if err != nil { + t.Fatalf("dequeue with filter: %v", err) + } + if job == nil || job.Type != "sms" { + t.Errorf("expected sms job, got %v", job) + } + }) + + t.Run("dequeue_scheduled_in_future_waits", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + future := time.Now().Add(1 * time.Hour) + _, _ = JobEnqueue(q, "future", "{}", WithScheduledAt(future)) + + job, err := JobDequeue(q, nil) + if err != nil { + t.Fatalf("dequeue future: %v", err) + } + if job != nil { + t.Errorf("expected nil (future job not yet due), got job %q", job.ID) + } + }) +} + +func TestJobCompleteAndFail(t *testing.T) { + t.Run("complete_fail_transitions", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + id, _ := JobEnqueue(q, "work", "{}", WithMaxAttempts(3)) + job, _ := JobDequeue(q, nil) + if job == nil { + t.Fatal("expected job") + } + + // Complete it. + if err := JobComplete(q, id, `{"ok":true}`); err != nil { + t.Fatalf("complete: %v", err) + } + + // Verify status in DB. + var status string + _ = db.QueryRow(`SELECT status FROM jobs WHERE id = ?`, id).Scan(&status) + if status != "completed" { + t.Errorf("status = %q, want completed", status) + } + }) + + t.Run("fail_increments_attempts", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + id, _ := JobEnqueue(q, "work", "{}", WithMaxAttempts(3)) + _, _ = JobDequeue(q, nil) + + // First failure → status = failed (attempts=1 < max=3). + if err := JobFail(q, id, "oops"); err != nil { + t.Fatalf("fail: %v", err) + } + var status string + var attempts int + _ = db.QueryRow(`SELECT status, attempts FROM jobs WHERE id = ?`, id).Scan(&status, &attempts) + if status != "failed" { + t.Errorf("status = %q, want failed", status) + } + if attempts != 1 { + t.Errorf("attempts = %d, want 1", attempts) + } + }) + + t.Run("fail_transitions_to_dead", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + id, _ := JobEnqueue(q, "work", "{}", WithMaxAttempts(2)) + + // Fail twice → dead. + for i := 0; i < 2; i++ { + // Re-set to pending so we can dequeue again (simulate retry logic). + if i > 0 { + _, _ = db.Exec(`UPDATE jobs SET status = 'pending' WHERE id = ?`, id) + } + _, _ = JobDequeue(q, nil) + _ = JobFail(q, id, fmt.Sprintf("error %d", i+1)) + } + + var status string + _ = db.QueryRow(`SELECT status FROM jobs WHERE id = ?`, id).Scan(&status) + if status != "dead" { + t.Errorf("status = %q, want dead after max_attempts reached", status) + } + }) +} + +func TestJobStatusSummaryFn(t *testing.T) { + t.Run("job_status_summary_format", func(t *testing.T) { + counts := map[string]int{ + "pending": 5, + "running": 2, + "completed": 10, + "failed": 1, + "dead": 0, + } + got := JobStatusSummary(counts) + want := "pending: 5, running: 2, completed: 10, failed: 1, dead: 0" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("job_status_summary_empty_map", func(t *testing.T) { + got := JobStatusSummary(map[string]int{}) + want := "pending: 0, running: 0, completed: 0, failed: 0, dead: 0" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) +} + +func TestJobWorkerGracefulShutdown(t *testing.T) { + t.Run("worker_graceful_shutdown", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan struct{}) + go func() { + defer close(done) + _ = JobWorker(ctx, q, func(j Job) error { + return nil + }, WithPollInterval(10*time.Millisecond)) + }() + + // Let the worker poll a couple of times. + time.Sleep(50 * time.Millisecond) + cancel() + + select { + case <-done: + // OK — worker exited cleanly. + case <-time.After(2 * time.Second): + t.Fatal("worker did not stop within timeout after context cancellation") + } + }) + + t.Run("worker_processes_jobs", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + var processed atomic.Int32 + for i := 0; i < 3; i++ { + _, _ = JobEnqueue(q, "task", `{}`) + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + _ = JobWorker(ctx, q, func(j Job) error { + processed.Add(1) + return nil + }, WithPollInterval(10*time.Millisecond)) + }() + + // Wait for all 3 to be processed (with timeout). + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if processed.Load() >= 3 { + break + } + time.Sleep(10 * time.Millisecond) + } + cancel() + + if n := processed.Load(); n < 3 { + t.Errorf("processed %d jobs, want 3", n) + } + }) +} + +func TestJobConcurrency(t *testing.T) { + t.Run("concurrency_multiples_goroutines_dequeue", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + const numJobs = 50 + for i := 0; i < numJobs; i++ { + _, _ = JobEnqueue(q, "task", fmt.Sprintf(`{"i":%d}`, i)) + } + + var processed atomic.Int32 + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + const numWorkers = 5 + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = JobWorker(ctx, q, func(j Job) error { + processed.Add(1) + return nil + }, WithPollInterval(5*time.Millisecond)) + }() + } + + // Wait until all jobs are processed. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + if processed.Load() >= numJobs { + break + } + time.Sleep(10 * time.Millisecond) + } + cancel() + wg.Wait() + + if n := processed.Load(); n != numJobs { + t.Errorf("processed %d, want %d (no double-processing with concurrent workers)", n, numJobs) + } + }) +} + +func TestJobCleanupFn(t *testing.T) { + t.Run("job_cleanup_removes_old_terminal_jobs", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + id, _ := JobEnqueue(q, "work", "{}") + _, _ = JobDequeue(q, nil) + _ = JobComplete(q, id, "") + + // Make the job look old (set created_at in the past, always UTC to match JobCleanup). + past := time.Now().UTC().Add(-25 * time.Hour).Format(time.RFC3339) + _, _ = db.Exec(`UPDATE jobs SET created_at = ? WHERE id = ?`, past, id) + + n, err := JobCleanup(q, 24*time.Hour) + if err != nil { + t.Fatalf("cleanup: %v", err) + } + if n != 1 { + t.Errorf("deleted %d rows, want 1", n) + } + + // Job should be gone. + var count int + _ = db.QueryRow(`SELECT COUNT(*) FROM jobs WHERE id = ?`, id).Scan(&count) + if count != 0 { + t.Errorf("job still in DB after cleanup") + } + }) + + t.Run("job_cleanup_keeps_recent_jobs", func(t *testing.T) { + db := openTestDB(t) + q, _ := JobQueueCreate(db, "jobs") + + id, _ := JobEnqueue(q, "work", "{}") + _, _ = JobDequeue(q, nil) + _ = JobComplete(q, id, "") + + // Job was just created — should not be cleaned up. + n, err := JobCleanup(q, 24*time.Hour) + if err != nil { + t.Fatalf("cleanup: %v", err) + } + if n != 0 { + t.Errorf("deleted %d rows, want 0 (job is recent)", n) + } + }) +} diff --git a/functions/infra/job_queue_types.go b/functions/infra/job_queue_types.go new file mode 100644 index 00000000..458193af --- /dev/null +++ b/functions/infra/job_queue_types.go @@ -0,0 +1,87 @@ +package infra + +import ( + "database/sql" + "time" +) + +// JobStatus represents the lifecycle state of a job. +type JobStatus string + +const ( + JobStatusPending JobStatus = "pending" + JobStatusRunning JobStatus = "running" + JobStatusCompleted JobStatus = "completed" + JobStatusFailed JobStatus = "failed" + JobStatusDead JobStatus = "dead" +) + +// Job represents a unit of asynchronous work stored in SQLite. +type Job struct { + ID string + Type string + Payload string // JSON string + Status JobStatus + Priority int + Attempts int + MaxAttempts int + ScheduledAt time.Time + StartedAt *time.Time + CompletedAt *time.Time + Result *string + Error *string + CreatedAt time.Time +} + +// JobQueue wraps a *sql.DB and the jobs table name. +type JobQueue struct { + DB *sql.DB + TableName string +} + +// JobHandler is the function signature for processing a job. +// Return a non-nil error to trigger retry / fail logic. +type JobHandler func(job Job) error + +// EnqueueOption configures optional behaviour of JobEnqueue. +type EnqueueOption func(*enqueueConfig) + +type enqueueConfig struct { + priority int + scheduledAt time.Time + maxAttempts int +} + +// WithPriority sets the job priority (higher = processed first). +func WithPriority(p int) EnqueueOption { + return func(c *enqueueConfig) { c.priority = p } +} + +// WithScheduledAt delays execution until the given time. +func WithScheduledAt(t time.Time) EnqueueOption { + return func(c *enqueueConfig) { c.scheduledAt = t } +} + +// WithMaxAttempts sets how many times the job may be attempted before dying. +func WithMaxAttempts(n int) EnqueueOption { + return func(c *enqueueConfig) { c.maxAttempts = n } +} + +// WorkerOption configures optional behaviour of JobWorker / JobWorkerPool. +type WorkerOption func(*workerConfig) + +type workerConfig struct { + pollInterval time.Duration + jobTypes []string +} + +// WithPollInterval sets how long the worker sleeps between dequeue attempts. +func WithPollInterval(d time.Duration) WorkerOption { + return func(c *workerConfig) { c.pollInterval = d } +} + +// WithJobTypes restricts the worker to the given job types. +// An empty list means all types. +func WithJobTypes(types ...string) WorkerOption { + return func(c *workerConfig) { c.jobTypes = types } +} diff --git a/functions/infra/job_status.go b/functions/infra/job_status.go new file mode 100644 index 00000000..a0cfb653 --- /dev/null +++ b/functions/infra/job_status.go @@ -0,0 +1,19 @@ +package infra + +import ( + "fmt" + "strings" +) + +// JobStatus formats a map of status → count into a human-readable summary string. +// Example output: "pending: 5, running: 2, completed: 10, failed: 1, dead: 0" +// The output always includes all five canonical statuses in a fixed order. +// This is a pure function — no I/O, no state. +func JobStatusSummary(counts map[string]int) string { + statuses := []string{"pending", "running", "completed", "failed", "dead"} + parts := make([]string, 0, len(statuses)) + for _, s := range statuses { + parts = append(parts, fmt.Sprintf("%s: %d", s, counts[s])) + } + return strings.Join(parts, ", ") +} diff --git a/functions/infra/job_status.md b/functions/infra/job_status.md new file mode 100644 index 00000000..3eec0965 --- /dev/null +++ b/functions/infra/job_status.md @@ -0,0 +1,39 @@ +--- +name: job_status_summary +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: pure +signature: "func JobStatusSummary(counts map[string]int) string" +description: "Formatea un mapa de status→conteo en un resumen legible. Siempre incluye los cinco estados canonicos en orden fijo: pending, running, completed, failed, dead." +tags: [job, queue, status, summary, format, pure, infra] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [fmt, strings] +params: + - name: counts + desc: "mapa de nombre de status (pending/running/completed/failed/dead) a numero de jobs en ese estado" +output: "string formateado 'pending: N, running: N, completed: N, failed: N, dead: N'" +tested: true +tests: + - "job_status_summary_format" + - "job_status_summary_empty_map" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_status.go" +--- + +## Ejemplo + +```go +counts := map[string]int{"pending": 5, "running": 2, "completed": 10} +summary := JobStatusSummary(counts) +// "pending: 5, running: 2, completed: 10, failed: 0, dead: 0" +``` + +## Notas + +Funcion pura. Los estados no presentes en el mapa se tratan como 0. El orden de salida es siempre fijo: pending, running, completed, failed, dead. diff --git a/functions/infra/job_worker.go b/functions/infra/job_worker.go new file mode 100644 index 00000000..405fe12a --- /dev/null +++ b/functions/infra/job_worker.go @@ -0,0 +1,70 @@ +package infra + +import ( + "context" + "fmt" + "time" +) + +// JobWorker runs a poll loop in the calling goroutine, dequeuing and processing +// jobs until ctx is cancelled. handler is called for each job; its return value +// drives complete vs fail logic. The worker sleeps pollInterval between empty +// dequeue attempts. +// +// Options: WithPollInterval (default 1s), WithJobTypes (default all types). +// +// This function blocks until ctx is done. Run it in a goroutine: +// +// go JobWorker(ctx, q, handler, WithPollInterval(2*time.Second)) +func JobWorker(ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error { + if q == nil { + return fmt.Errorf("job_worker: queue must not be nil") + } + if handler == nil { + return fmt.Errorf("job_worker: handler must not be nil") + } + + cfg := &workerConfig{ + pollInterval: time.Second, + } + for _, o := range opts { + o(cfg) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + job, err := JobDequeue(q, cfg.jobTypes) + if err != nil { + // Transient DB error — wait and retry. + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(cfg.pollInterval): + } + continue + } + + if job == nil { + // Queue empty — wait before polling again. + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(cfg.pollInterval): + } + continue + } + + // Process the job. + handlerErr := handler(*job) + if handlerErr != nil { + _ = JobFail(q, job.ID, handlerErr.Error()) + } else { + _ = JobComplete(q, job.ID, "") + } + } +} diff --git a/functions/infra/job_worker.md b/functions/infra/job_worker.md new file mode 100644 index 00000000..947107f1 --- /dev/null +++ b/functions/infra/job_worker.md @@ -0,0 +1,49 @@ +--- +name: job_worker +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func JobWorker(ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error" +description: "Ejecuta un poll loop bloqueante que desencola y procesa jobs hasta que el context sea cancelado. El handler determina complete vs fail. Opciones: WithPollInterval (default 1s), WithJobTypes." +tags: [job, queue, worker, goroutine, poll, async, background, infra, concurrency] +uses_functions: [job_dequeue_go_infra, job_complete_go_infra, job_fail_go_infra] +uses_types: [job_queue_go_infra, job_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [context, fmt, time] +params: + - name: ctx + desc: "context que al cancelarse detiene el worker limpiamente (graceful shutdown)" + - name: q + desc: "cola de jobs creada con JobQueueCreate" + - name: handler + desc: "funcion que procesa un job; retorno nil = completado, error = fallo" + - name: opts + desc: "opciones: WithPollInterval(d), WithJobTypes(types...)" +output: "ctx.Err() cuando el context se cancela, o error de configuracion" +tested: true +tests: + - "worker_graceful_shutdown" + - "worker_processes_jobs" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_worker.go" +--- + +## Ejemplo + +```go +ctx, cancel := context.WithCancel(context.Background()) +defer cancel() + +go JobWorker(ctx, q, func(job Job) error { + fmt.Println("processing", job.ID, job.Type) + return nil // complete +}, WithPollInterval(500*time.Millisecond), WithJobTypes("send_email")) +``` + +## Notas + +Funcion bloqueante — ejecutar en una goroutine. Retorna `ctx.Err()` cuando el context se cancela (graceful shutdown). Los errores transitorios de dequeue (DB) generan un sleep y retry. El handler tiene la responsabilidad completa del procesamiento; JobWorker se encarga de complete/fail. diff --git a/functions/infra/job_worker_pool.go b/functions/infra/job_worker_pool.go new file mode 100644 index 00000000..cf02cdea --- /dev/null +++ b/functions/infra/job_worker_pool.go @@ -0,0 +1,36 @@ +package infra + +import ( + "context" + "fmt" + + "golang.org/x/sync/errgroup" +) + +// JobWorkerPool starts n workers concurrently, all sharing the same queue and +// handler. It uses errgroup for structured concurrency and supports graceful +// shutdown via ctx cancellation. +// +// The function blocks until all workers exit. It returns the first non-context +// error encountered, or ctx.Err() if all workers stopped due to cancellation. +// +// Options: WithPollInterval, WithJobTypes (applied to every worker). +func JobWorkerPool(ctx context.Context, q *JobQueue, n int, handler JobHandler, opts ...WorkerOption) error { + if q == nil { + return fmt.Errorf("job_worker_pool: queue must not be nil") + } + if handler == nil { + return fmt.Errorf("job_worker_pool: handler must not be nil") + } + if n <= 0 { + return fmt.Errorf("job_worker_pool: n must be > 0, got %d", n) + } + + g, gctx := errgroup.WithContext(ctx) + for i := 0; i < n; i++ { + g.Go(func() error { + return JobWorker(gctx, q, handler, opts...) + }) + } + return g.Wait() +} diff --git a/functions/infra/job_worker_pool.md b/functions/infra/job_worker_pool.md new file mode 100644 index 00000000..7ca60ef0 --- /dev/null +++ b/functions/infra/job_worker_pool.md @@ -0,0 +1,50 @@ +--- +name: job_worker_pool +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func JobWorkerPool(ctx context.Context, q *JobQueue, n int, handler JobHandler, opts ...WorkerOption) error" +description: "Lanza n workers concurrentes que comparten la misma cola y handler. Usa errgroup para concurrencia estructurada y soporta graceful shutdown via context. Bloquea hasta que todos los workers terminen." +tags: [job, queue, worker, pool, concurrency, errgroup, async, background, infra, graceful-shutdown] +uses_functions: [job_worker_go_infra] +uses_types: [job_queue_go_infra] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [context, fmt, golang.org/x/sync/errgroup] +params: + - name: ctx + desc: "context que al cancelarse detiene todos los workers; si uno falla, los demas tambien se detienen" + - name: q + desc: "cola de jobs compartida por todos los workers" + - name: n + desc: "numero de workers concurrentes (debe ser > 0)" + - name: handler + desc: "funcion de procesamiento compartida por todos los workers" + - name: opts + desc: "opciones aplicadas a cada worker: WithPollInterval, WithJobTypes" +output: "primer error no-context de cualquier worker, o ctx.Err() si todos terminaron por cancelacion" +tested: true +tests: + - "concurrency_multiples_goroutines_dequeue" +test_file_path: "functions/infra/job_queue_test.go" +file_path: "functions/infra/job_worker_pool.go" +--- + +## Ejemplo + +```go +ctx, cancel := context.WithCancel(context.Background()) +defer cancel() + +// 5 workers procesando emails en paralelo +err := JobWorkerPool(ctx, q, 5, func(job Job) error { + return sendEmail(job) +}, WithPollInterval(100*time.Millisecond)) +``` + +## Notas + +Usa `golang.org/x/sync/errgroup`. Si un worker retorna un error no-context, `errgroup` cancela el context derivado y todos los demas workers se detienen. El graceful shutdown ocurre al cancelar el context original. diff --git a/types/infra/job.md b/types/infra/job.md new file mode 100644 index 00000000..c4f69597 --- /dev/null +++ b/types/infra/job.md @@ -0,0 +1,31 @@ +--- +name: job +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type Job struct { + ID string + Type string + Payload string + Status JobStatus + Priority int + Attempts int + MaxAttempts int + ScheduledAt time.Time + StartedAt *time.Time + CompletedAt *time.Time + Result *string + Error *string + CreatedAt time.Time + } +description: "Unidad de trabajo asincrono almacenada en SQLite. Payload es un JSON string. Status evoluciona de pending a running y luego a completed, failed o dead." +tags: [job, queue, async, background, sqlite, infra] +uses_types: [job_status_go_infra] +file_path: "functions/infra/job_queue_types.go" +--- + +## Notas + +Tipo producto. Los campos `StartedAt`, `CompletedAt`, `Result` y `Error` son punteros — son nil hasta que el job alcanza el estado correspondiente. `Payload` es siempre un JSON string (minimo "{}"). diff --git a/types/infra/job_queue.md b/types/infra/job_queue.md new file mode 100644 index 00000000..39f2c77a --- /dev/null +++ b/types/infra/job_queue.md @@ -0,0 +1,20 @@ +--- +name: job_queue +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type JobQueue struct { + DB *sql.DB + TableName string + } +description: "Handle para una cola de jobs SQLite. Wrappea un *sql.DB y el nombre de la tabla de jobs. Se crea con JobQueueCreate." +tags: [job, queue, sqlite, infra, async] +uses_types: [] +file_path: "functions/infra/job_queue_types.go" +--- + +## Notas + +Tipo producto minimo — todas las funciones del subsistema de jobs lo reciben como primer argumento. TableName permite tener multiples colas en la misma DB. Se crea con `JobQueueCreate`. diff --git a/types/infra/job_status.md b/types/infra/job_status.md new file mode 100644 index 00000000..27c5648b --- /dev/null +++ b/types/infra/job_status.md @@ -0,0 +1,25 @@ +--- +name: job_status +lang: go +domain: infra +version: "1.0.0" +algebraic: sum +definition: | + type JobStatus string + + const ( + JobStatusPending JobStatus = "pending" + JobStatusRunning JobStatus = "running" + JobStatusCompleted JobStatus = "completed" + JobStatusFailed JobStatus = "failed" + JobStatusDead JobStatus = "dead" + ) +description: "Estado del ciclo de vida de un job. Tipo suma con cinco variantes: pending, running, completed, failed, dead." +tags: [job, status, queue, async, infra] +uses_types: [] +file_path: "functions/infra/job_queue_types.go" +--- + +## Notas + +Tipo suma. Ciclo de vida normal: pending → running → completed. En error: pending → running → failed (reintentos posibles) → dead (sin mas intentos). Un job con `attempts >= max_attempts` pasa directamente a dead. From 13e45aa22ef1444a101235c60fd9e95e7530f279 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Mon, 13 Apr 2026 02:01:03 +0200 Subject: [PATCH 2/2] docs: cerrar issue 0013 Co-Authored-By: Claude Sonnet 4.6 --- dev/issues/completed/0013-background-jobs.md | 409 +++++++++++++++++++ 1 file changed, 409 insertions(+) create mode 100644 dev/issues/completed/0013-background-jobs.md diff --git a/dev/issues/completed/0013-background-jobs.md b/dev/issues/completed/0013-background-jobs.md new file mode 100644 index 00000000..7c9f9d9f --- /dev/null +++ b/dev/issues/completed/0013-background-jobs.md @@ -0,0 +1,409 @@ +# 0013 — Background Job Queue + +## Metadata + +| Campo | Valor | +|-------|-------| +| **ID** | 0013 | +| **Estado** | pendiente | +| **Prioridad** | alta | +| **Tipo** | feature | + +## Dependencias + +Ninguna. + +--- + +## Objetivo + +Cola de trabajos asincrona basada en SQLite para apps Go del registry. Permite a cualquier app encolar tareas (enviar emails, procesar archivos, ejecutar pipelines) y procesarlas en background con workers concurrentes, reintentos automaticos y dead-letter queue — todo respaldado por SQLite sin depender de Redis, RabbitMQ ni ningun servicio externo. + +## Contexto + +- Actualmente no existen funciones de job queue en el registry. Cada app que necesita procesamiento asincrono tiene que construir su propia solucion ad-hoc. +- SQLite encaja perfectamente con el stack existente: `registry.db` y `operations.db` ya demuestran que SQLite escala para los volúmenes del proyecto. +- Las apps Go del registry ya usan `mattn/go-sqlite3` con FTS5 y WAL mode — no hay dependencias nuevas. +- `map_concurrent_go_core` ofrece paralelismo sincrono pero no persistencia: si el proceso muere, se pierde el trabajo. Una cola persistente en SQLite sobrevive a reinicios. +- Patron existente en el ecosistema: `gocraft/work`, `faktory`, `river` — pero todos requieren Redis o Postgres. Una solucion SQLite-native es mas coherente con el proyecto. + +## Arquitectura + +``` +functions/infra/ +├── job_queue_create.go — NEW: crea tabla de jobs en SQLite, retorna handle +├── job_queue_create.md — NEW +├── job_enqueue.go — NEW: añade job a la cola +├── job_enqueue.md — NEW +├── job_dequeue.go — NEW: reclama proximo job pendiente (atomico) +├── job_dequeue.md — NEW +├── job_complete.go — NEW: marca job completado con resultado +├── job_complete.md — NEW +├── job_fail.go — NEW: marca job fallido, incrementa intentos +├── job_fail.md — NEW +├── job_worker.go — NEW: goroutine que pollea y procesa jobs +├── job_worker.md — NEW +├── job_worker_pool.go — NEW: pool de N workers con graceful shutdown +├── job_worker_pool.md — NEW +├── job_cleanup.go — NEW: elimina jobs antiguos completados/fallidos +├── job_cleanup.md — NEW +├── job_status.go — NEW: resumen de estado de la cola (pura) +├── job_status.md — NEW + +types/infra/ +├── job.md — NEW: metadata del tipo Job +├── job_queue.md — NEW: metadata del tipo JobQueue +├── job_status.md — NEW: metadata del tipo JobStatus (sum) +├── job_handler.md — NEW: metadata del tipo JobHandler +``` + +### Patron pure core / impure shell + +- **Pure:** `job_status` — formatea conteos en un resumen sin I/O. +- **Impure:** Todo lo demas — interactúa con SQLite (I/O de disco) y goroutines (concurrencia). + +## Diseno + +### Tipos + +```go +// JobStatus es un sum type: estados posibles de un job +type JobStatus string + +const ( + JobStatusPending JobStatus = "pending" + JobStatusRunning JobStatus = "running" + JobStatusCompleted JobStatus = "completed" + JobStatusFailed JobStatus = "failed" + JobStatusDead JobStatus = "dead" // max_attempts superado +) + +// Job representa una unidad de trabajo en la cola +type Job struct { + ID string `json:"id"` // UUID generado al encolar + Type string `json:"type"` // tipo del job (ej: "send_email", "process_file") + Payload string `json:"payload"` // JSON arbitrario con los datos del job + Status JobStatus `json:"status"` // pending | running | completed | failed | dead + Priority int `json:"priority"` // 0 = normal, mayor = mas prioritario + Attempts int `json:"attempts"` // intentos ejecutados hasta ahora + MaxAttempts int `json:"max_attempts"` // maximo de intentos antes de dead-letter + ScheduledAt time.Time `json:"scheduled_at"` // cuando debe ejecutarse (permite scheduling futuro) + StartedAt *time.Time `json:"started_at"` // cuando empezo a ejecutarse (nil si pending) + CompletedAt *time.Time `json:"completed_at"` // cuando termino (nil si no completado) + Result string `json:"result"` // JSON con resultado de ejecucion exitosa + Error string `json:"error"` // mensaje de error del ultimo intento + CreatedAt time.Time `json:"created_at"` // timestamp de creacion +} + +// JobQueue es el handle a una cola respaldada por SQLite +type JobQueue struct { + DB *sql.DB // conexion SQLite con WAL mode + TableName string // nombre de la tabla (permite multiples colas en una BD) +} + +// JobHandler es la funcion que procesa un job +// Recibe el job completo y retorna error si falla. +type JobHandler func(job Job) error +``` + +### Schema SQLite + +```sql +CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + payload TEXT NOT NULL DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'pending', + priority INTEGER NOT NULL DEFAULT 0, + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 3, + scheduled_at TEXT NOT NULL, + started_at TEXT, + completed_at TEXT, + result TEXT, + error TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) +); + +CREATE INDEX IF NOT EXISTS idx_jobs_dequeue + ON jobs (status, priority DESC, scheduled_at ASC) + WHERE status = 'pending'; + +CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs (status); +CREATE INDEX IF NOT EXISTS idx_jobs_type ON jobs (type); +CREATE INDEX IF NOT EXISTS idx_jobs_cleanup ON jobs (status, completed_at) + WHERE status IN ('completed', 'failed', 'dead'); +``` + +El indice parcial `idx_jobs_dequeue` es clave: SQLite lo usa exclusivamente para el `SELECT` de dequeue, manteniendo el scan rapido incluso con millones de jobs historicos en la tabla. + +### Funciones + +| Funcion | Purity | Firma (simplificada) | Descripcion | +|---------|--------|---------------------|-------------| +| `job_queue_create` | impure | `(db *sql.DB, tableName string) (*JobQueue, error)` | Crea tabla de jobs con indices, activa WAL mode, retorna handle | +| `job_enqueue` | impure | `(q *JobQueue, jobType string, payload string, opts ...EnqueueOption) (string, error)` | Inserta job con priority, scheduled_at, max_attempts. Retorna job ID | +| `job_dequeue` | impure | `(q *JobQueue, jobTypes ...string) (*Job, error)` | Reclama atomicamente el proximo job pending. Nil si cola vacia | +| `job_complete` | impure | `(q *JobQueue, jobID string, result string) error` | Marca job como completed con resultado JSON | +| `job_fail` | impure | `(q *JobQueue, jobID string, errMsg string) error` | Marca como failed, incrementa attempts. Si attempts >= max_attempts, marca como dead | +| `job_worker` | impure | `(ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error` | Goroutine que pollea la cola y ejecuta handler. Para con context | +| `job_worker_pool` | impure | `(ctx context.Context, q *JobQueue, handler JobHandler, n int, opts ...WorkerOption) error` | Lanza N workers, espera a que todos terminen con graceful shutdown | +| `job_cleanup` | impure | `(q *JobQueue, olderThan time.Duration) (int64, error)` | Elimina jobs completed/failed/dead mas antiguos que la retencion | +| `job_status` | pure | `(counts map[JobStatus]int) string` | Formatea resumen legible: "pending: 5, running: 2, completed: 100, failed: 3, dead: 1" | + +### Dequeue atomico + +La operacion critica es `job_dequeue` — debe reclamar un job sin condiciones de carrera cuando hay multiples workers. SQLite no tiene `SELECT FOR UPDATE`, pero se puede lograr con una transaccion exclusiva: + +```go +func JobDequeue(q *JobQueue, jobTypes ...string) (*Job, error) { + tx, err := q.DB.Begin() + if err != nil { + return nil, err + } + defer tx.Rollback() + + // SELECT el proximo job elegible + query := fmt.Sprintf(` + SELECT id, type, payload, status, priority, attempts, max_attempts, + scheduled_at, result, error, created_at + FROM %s + WHERE status = 'pending' + AND scheduled_at <= strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now') + ORDER BY priority DESC, scheduled_at ASC + LIMIT 1 + `, q.TableName) + + // Si se especifican tipos, filtrar + if len(jobTypes) > 0 { + // ... añadir AND type IN (...) + } + + var job Job + err = tx.QueryRow(query).Scan(/* ... */) + if err == sql.ErrNoRows { + return nil, nil // cola vacia, no es error + } + if err != nil { + return nil, err + } + + // UPDATE atomico dentro de la misma transaccion + _, err = tx.Exec(fmt.Sprintf(` + UPDATE %s SET status = 'running', started_at = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now') + WHERE id = ? + `, q.TableName), job.ID) + if err != nil { + return nil, err + } + + return &job, tx.Commit() +} +``` + +Con WAL mode, los writers serializan transacciones de escritura pero los readers no bloquean. Dado que `dequeue` es la unica operacion que necesita exclusividad de escritura y es extremadamente rapida (un SELECT indexado + un UPDATE por PK), la contención es minima incluso con varios workers. + +### Backoff exponencial en reintentos + +Cuando un job falla, `job_fail` incrementa `attempts` y lo marca como `failed`. El worker, al reclamar un job previamente fallido, respeta un backoff basado en el numero de intentos: + +```go +// Delay antes de reintentar: base * 2^(attempts-1) +// Intento 1: 5s, intento 2: 10s, intento 3: 20s +backoff := baseDelay * time.Duration(1<= max_attempts`, el job pasa a status `dead` (dead-letter). Los jobs dead no se reintentan automaticamente — quedan para inspeccion manual o re-enqueue explicito. + +### Worker loop + +```go +func JobWorker(ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error { + cfg := defaultWorkerConfig() // pollInterval: 1s, jobTypes: nil (todos) + for _, opt := range opts { + opt(&cfg) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + job, err := JobDequeue(q, cfg.jobTypes...) + if err != nil { + // log error, backoff, continue + time.Sleep(cfg.pollInterval) + continue + } + + if job == nil { + // cola vacia, esperar + time.Sleep(cfg.pollInterval) + continue + } + + // Ejecutar handler + if err := handler(*job); err != nil { + JobFail(q, job.ID, err.Error()) + } else { + JobComplete(q, job.ID, "") // resultado vacio por defecto + } + } +} +``` + +## Tareas + +### Fase 1: Tipos + +- [ ] **1.1** Crear tipo `Job` en `functions/infra/job.go` con `.md` en `types/infra/job.md` +- [ ] **1.2** Crear tipo `JobQueue` en `functions/infra/job_queue.go` con `.md` en `types/infra/job_queue.md` +- [ ] **1.3** Crear tipo `JobStatus` (sum) en `functions/infra/job_status_type.go` con `.md` en `types/infra/job_status.md` +- [ ] **1.4** Crear tipo `JobHandler` en `functions/infra/job_handler.go` con `.md` en `types/infra/job_handler.md` + +### Fase 2: Funciones core (CRUD de jobs) + +- [ ] **2.1** `job_queue_create` — crea tabla con schema e indices, configura WAL mode, retorna `*JobQueue` +- [ ] **2.2** `job_enqueue` — INSERT con UUID generado, soporte para options (priority, scheduled_at, max_attempts) +- [ ] **2.3** `job_dequeue` — SELECT + UPDATE atomico en transaccion, nil si cola vacia +- [ ] **2.4** `job_complete` — UPDATE status=completed, set completed_at y result +- [ ] **2.5** `job_fail` — UPDATE status=failed|dead, incrementar attempts, set error +- [ ] **2.6** `job_status` (pure) — recibe map de conteos, retorna string formateado + +### Fase 3: Workers y mantenimiento + +- [ ] **3.1** `job_worker` — goroutine con poll loop, context cancelable, backoff configurable +- [ ] **3.2** `job_worker_pool` — lanza N workers con `errgroup`, graceful shutdown via context +- [ ] **3.3** `job_cleanup` — DELETE de jobs completed/failed/dead mas antiguos que la retencion + +### Fase 4: Tests y validacion + +- [ ] **4.1** Tests para `job_queue_create` — verifica tabla e indices creados +- [ ] **4.2** Tests para enqueue/dequeue — verifica atomicidad, prioridad, scheduling futuro +- [ ] **4.3** Tests para complete/fail — verifica transiciones de estado, dead-letter tras max_attempts +- [ ] **4.4** Tests para worker — verifica procesamiento, reintentos, graceful shutdown con context +- [ ] **4.5** Tests de concurrencia — multiples goroutines dequeue simultaneo, ningun job procesado dos veces +- [ ] **4.6** `fn index` y verificar que todas las funciones y tipos aparecen en registry.db +- [ ] **4.7** Verificar `go vet -tags fts5` y `go test -tags fts5 -race ./functions/infra/` + +--- + +## Ejemplo de uso + +### Cola de envio de emails + +```go +// En una app que necesita enviar emails en background + +// 1. Crear la cola +db, _ := sql.Open("sqlite3", "operations.db?_journal_mode=wal") +queue, _ := infra.JobQueueCreate(db, "jobs") + +// 2. Encolar emails desde un handler HTTP +func handleSignup(w http.ResponseWriter, r *http.Request) { + user := createUser(r) + + payload, _ := json.Marshal(map[string]string{ + "to": user.Email, + "subject": "Bienvenido", + "body": "Gracias por registrarte...", + }) + + jobID, _ := infra.JobEnqueue(queue, "send_email", string(payload), + infra.WithPriority(1), + infra.WithMaxAttempts(5), + ) + + log.Printf("email encolado: job=%s user=%s", jobID, user.Email) + infra.HttpJsonResponse(w, 201, user) +} + +// 3. Worker que procesa los emails +func emailHandler(job infra.Job) error { + var email struct { + To string `json:"to"` + Subject string `json:"subject"` + Body string `json:"body"` + } + json.Unmarshal([]byte(job.Payload), &email) + return sendEmail(email.To, email.Subject, email.Body) +} + +// 4. Arrancar pool de workers +ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) +defer cancel() + +infra.JobWorkerPool(ctx, queue, emailHandler, 3, + infra.WithPollInterval(2*time.Second), + infra.WithJobTypes("send_email"), +) +``` + +### Cola de procesamiento de archivos + +```go +// Encolar procesamiento de archivos con prioridad y scheduling + +// Job de alta prioridad: procesar ahora +infra.JobEnqueue(queue, "process_csv", `{"path": "/data/ventas.csv", "format": "monthly"}`, + infra.WithPriority(10), +) + +// Job schedulado para las 3am +infra.JobEnqueue(queue, "generate_report", `{"type": "weekly", "format": "pdf"}`, + infra.WithScheduledAt(nextAt3AM()), +) + +// Worker que despacha por tipo +func fileHandler(job infra.Job) error { + switch job.Type { + case "process_csv": + return processCSV(job.Payload) + case "generate_report": + return generateReport(job.Payload) + default: + return fmt.Errorf("tipo de job desconocido: %s", job.Type) + } +} +``` + +### Monitoreo y limpieza + +```go +// Consultar estado de la cola +rows, _ := db.Query(` + SELECT status, COUNT(*) FROM jobs GROUP BY status +`) +counts := map[infra.JobStatus]int{} +for rows.Next() { + var s string; var c int + rows.Scan(&s, &c) + counts[infra.JobStatus(s)] = c +} +fmt.Println(infra.JobStatusSummary(counts)) +// → "pending: 12, running: 3, completed: 1458, failed: 7, dead: 2" + +// Limpiar jobs viejos (retener 7 dias) +deleted, _ := infra.JobCleanup(queue, 7*24*time.Hour) +log.Printf("limpiados %d jobs antiguos", deleted) +``` + +## Decisiones de diseno + +- **SQLite como backend unico:** Coherente con el stack del proyecto. No introduce dependencias externas (Redis, Postgres). WAL mode permite lecturas concurrentes sin bloquear al writer. +- **Una tabla por cola, no una BD por cola:** Permite que una app tenga su cola en su propio `operations.db` usando un table name custom, o que varias colas convivan en la misma BD con distintos table names. +- **Dequeue con transaccion, no con SKIP LOCKED:** SQLite no soporta `SKIP LOCKED`. La transaccion exclusiva es correcta y suficiente para el nivel de concurrencia esperado (decenas de workers, no miles). +- **Poll-based, no event-driven:** SQLite no tiene `LISTEN/NOTIFY`. El polling con intervalo configurable (default 1s) es simple y predecible. Para la escala de uso esperada (apps personales, no sistemas de alta frecuencia), el overhead es despreciable. +- **Functional options pattern (WithPriority, WithMaxAttempts):** Permite extender sin romper firmas existentes. Idomático en Go. +- **Dead-letter como status, no como tabla separada:** Los jobs dead quedan en la misma tabla para inspeccion. `job_cleanup` los borra despues del periodo de retencion. +- **Tipos nativos en firmas Go:** Siguiendo la regla del registry, las funciones usan `*sql.DB`, `string`, `time.Duration` — no tipos del registry en la firma. Los tipos (`Job`, `JobQueue`, etc.) se documentan en `uses_types`/`returns` del frontmatter. + +## Riesgos + +- **Contención de escritura con muchos workers:** SQLite serializa escrituras. Con WAL mode y transacciones cortas (microsegundos), esto no es problema para decenas de workers. Si alguna app necesitara miles de dequeues/segundo, habria que migrar a Postgres — pero eso esta fuera del scope de este registry. +- **Polling waste con cola vacia:** Workers consumen CPU haciendo SELECT cada segundo aunque no haya trabajo. Mitigado con backoff adaptativo: si N polls consecutivos retornan nil, el intervalo crece gradualmente (hasta un techo configurable). Se resetea al primer job encontrado. +- **Jobs zombies (running forever):** Si un worker muere sin completar ni fallar el job, queda en status `running` indefinidamente. Solucion: `job_worker` registra un timeout por job; si el handler no retorna en ese tiempo, lo marca como failed. Complementar con un sweep periodico que busque jobs `running` con `started_at` mas antiguo que un umbral. +- **Payload sin schema:** El payload es JSON libre (`string`). No hay validacion de estructura al encolar. Esto es intencional — cada `JobHandler` valida su propio payload. Pero un typo en el payload causa errores en runtime, no en compile time. +- **Limpieza manual:** `job_cleanup` debe invocarse explicitamente (desde un cron, un job periodico, o al arrancar la app). No hay garbage collection automatico. Documentar en el ejemplo de uso.