Files
fn_registry/dev/issues/completed/0013-background-jobs.md
T

18 KiB

id, title, status, type, domain, scope, priority, depends, blocks, related, created, updated, tags
id title status type domain scope priority depends blocks related created updated tags
0013 Background Job Queue completado feature
multi-app alta
2026-05-17 2026-05-17

0013 — Background Job Queue

Metadata

Campo Valor
ID 0013
Estado pendiente
Prioridad alta
Tipo feature

Dependencias

Ninguna.


Objetivo

Cola de trabajos asincrona basada en SQLite para apps Go del registry. Permite a cualquier app encolar tareas (enviar emails, procesar archivos, ejecutar pipelines) y procesarlas en background con workers concurrentes, reintentos automaticos y dead-letter queue — todo respaldado por SQLite sin depender de Redis, RabbitMQ ni ningun servicio externo.

Contexto

  • Actualmente no existen funciones de job queue en el registry. Cada app que necesita procesamiento asincrono tiene que construir su propia solucion ad-hoc.
  • SQLite encaja perfectamente con el stack existente: registry.db y operations.db ya demuestran que SQLite escala para los volúmenes del proyecto.
  • Las apps Go del registry ya usan mattn/go-sqlite3 con FTS5 y WAL mode — no hay dependencias nuevas.
  • map_concurrent_go_core ofrece paralelismo sincrono pero no persistencia: si el proceso muere, se pierde el trabajo. Una cola persistente en SQLite sobrevive a reinicios.
  • Patron existente en el ecosistema: gocraft/work, faktory, river — pero todos requieren Redis o Postgres. Una solucion SQLite-native es mas coherente con el proyecto.

Arquitectura

functions/infra/
├── job_queue_create.go         — NEW: crea tabla de jobs en SQLite, retorna handle
├── job_queue_create.md         — NEW
├── job_enqueue.go              — NEW: añade job a la cola
├── job_enqueue.md              — NEW
├── job_dequeue.go              — NEW: reclama proximo job pendiente (atomico)
├── job_dequeue.md              — NEW
├── job_complete.go             — NEW: marca job completado con resultado
├── job_complete.md             — NEW
├── job_fail.go                 — NEW: marca job fallido, incrementa intentos
├── job_fail.md                 — NEW
├── job_worker.go               — NEW: goroutine que pollea y procesa jobs
├── job_worker.md               — NEW
├── job_worker_pool.go          — NEW: pool de N workers con graceful shutdown
├── job_worker_pool.md          — NEW
├── job_cleanup.go              — NEW: elimina jobs antiguos completados/fallidos
├── job_cleanup.md              — NEW
├── job_status.go               — NEW: resumen de estado de la cola (pura)
├── job_status.md               — NEW

types/infra/
├── job.md                      — NEW: metadata del tipo Job
├── job_queue.md                — NEW: metadata del tipo JobQueue
├── job_status.md               — NEW: metadata del tipo JobStatus (sum)
├── job_handler.md              — NEW: metadata del tipo JobHandler

Patron pure core / impure shell

  • Pure: job_status — formatea conteos en un resumen sin I/O.
  • Impure: Todo lo demas — interactúa con SQLite (I/O de disco) y goroutines (concurrencia).

Diseno

Tipos

// JobStatus es un sum type: estados posibles de un job
type JobStatus string

const (
    JobStatusPending   JobStatus = "pending"
    JobStatusRunning   JobStatus = "running"
    JobStatusCompleted JobStatus = "completed"
    JobStatusFailed    JobStatus = "failed"
    JobStatusDead      JobStatus = "dead" // max_attempts superado
)

// Job representa una unidad de trabajo en la cola
type Job struct {
    ID           string    `json:"id"`            // UUID generado al encolar
    Type         string    `json:"type"`          // tipo del job (ej: "send_email", "process_file")
    Payload      string    `json:"payload"`       // JSON arbitrario con los datos del job
    Status       JobStatus `json:"status"`        // pending | running | completed | failed | dead
    Priority     int       `json:"priority"`      // 0 = normal, mayor = mas prioritario
    Attempts     int       `json:"attempts"`      // intentos ejecutados hasta ahora
    MaxAttempts  int       `json:"max_attempts"`  // maximo de intentos antes de dead-letter
    ScheduledAt  time.Time `json:"scheduled_at"`  // cuando debe ejecutarse (permite scheduling futuro)
    StartedAt    *time.Time `json:"started_at"`   // cuando empezo a ejecutarse (nil si pending)
    CompletedAt  *time.Time `json:"completed_at"` // cuando termino (nil si no completado)
    Result       string    `json:"result"`        // JSON con resultado de ejecucion exitosa
    Error        string    `json:"error"`         // mensaje de error del ultimo intento
    CreatedAt    time.Time `json:"created_at"`    // timestamp de creacion
}

// JobQueue es el handle a una cola respaldada por SQLite
type JobQueue struct {
    DB        *sql.DB // conexion SQLite con WAL mode
    TableName string  // nombre de la tabla (permite multiples colas en una BD)
}

// JobHandler es la funcion que procesa un job
// Recibe el job completo y retorna error si falla.
type JobHandler func(job Job) error

Schema SQLite

CREATE TABLE IF NOT EXISTS jobs (
    id           TEXT PRIMARY KEY,
    type         TEXT NOT NULL,
    payload      TEXT NOT NULL DEFAULT '{}',
    status       TEXT NOT NULL DEFAULT 'pending',
    priority     INTEGER NOT NULL DEFAULT 0,
    attempts     INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3,
    scheduled_at TEXT NOT NULL,
    started_at   TEXT,
    completed_at TEXT,
    result       TEXT,
    error        TEXT,
    created_at   TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);

CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
    ON jobs (status, priority DESC, scheduled_at ASC)
    WHERE status = 'pending';

CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs (status);
CREATE INDEX IF NOT EXISTS idx_jobs_type ON jobs (type);
CREATE INDEX IF NOT EXISTS idx_jobs_cleanup ON jobs (status, completed_at)
    WHERE status IN ('completed', 'failed', 'dead');

El indice parcial idx_jobs_dequeue es clave: SQLite lo usa exclusivamente para el SELECT de dequeue, manteniendo el scan rapido incluso con millones de jobs historicos en la tabla.

Funciones

Funcion Purity Firma (simplificada) Descripcion
job_queue_create impure (db *sql.DB, tableName string) (*JobQueue, error) Crea tabla de jobs con indices, activa WAL mode, retorna handle
job_enqueue impure (q *JobQueue, jobType string, payload string, opts ...EnqueueOption) (string, error) Inserta job con priority, scheduled_at, max_attempts. Retorna job ID
job_dequeue impure (q *JobQueue, jobTypes ...string) (*Job, error) Reclama atomicamente el proximo job pending. Nil si cola vacia
job_complete impure (q *JobQueue, jobID string, result string) error Marca job como completed con resultado JSON
job_fail impure (q *JobQueue, jobID string, errMsg string) error Marca como failed, incrementa attempts. Si attempts >= max_attempts, marca como dead
job_worker impure (ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error Goroutine que pollea la cola y ejecuta handler. Para con context
job_worker_pool impure (ctx context.Context, q *JobQueue, handler JobHandler, n int, opts ...WorkerOption) error Lanza N workers, espera a que todos terminen con graceful shutdown
job_cleanup impure (q *JobQueue, olderThan time.Duration) (int64, error) Elimina jobs completed/failed/dead mas antiguos que la retencion
job_status pure (counts map[JobStatus]int) string Formatea resumen legible: "pending: 5, running: 2, completed: 100, failed: 3, dead: 1"

Dequeue atomico

La operacion critica es job_dequeue — debe reclamar un job sin condiciones de carrera cuando hay multiples workers. SQLite no tiene SELECT FOR UPDATE, pero se puede lograr con una transaccion exclusiva:

func JobDequeue(q *JobQueue, jobTypes ...string) (*Job, error) {
    tx, err := q.DB.Begin()
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    // SELECT el proximo job elegible
    query := fmt.Sprintf(`
        SELECT id, type, payload, status, priority, attempts, max_attempts,
               scheduled_at, result, error, created_at
        FROM %s
        WHERE status = 'pending'
          AND scheduled_at <= strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now')
        ORDER BY priority DESC, scheduled_at ASC
        LIMIT 1
    `, q.TableName)

    // Si se especifican tipos, filtrar
    if len(jobTypes) > 0 {
        // ... añadir AND type IN (...)
    }

    var job Job
    err = tx.QueryRow(query).Scan(/* ... */)
    if err == sql.ErrNoRows {
        return nil, nil // cola vacia, no es error
    }
    if err != nil {
        return nil, err
    }

    // UPDATE atomico dentro de la misma transaccion
    _, err = tx.Exec(fmt.Sprintf(`
        UPDATE %s SET status = 'running', started_at = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now')
        WHERE id = ?
    `, q.TableName), job.ID)
    if err != nil {
        return nil, err
    }

    return &job, tx.Commit()
}

Con WAL mode, los writers serializan transacciones de escritura pero los readers no bloquean. Dado que dequeue es la unica operacion que necesita exclusividad de escritura y es extremadamente rapida (un SELECT indexado + un UPDATE por PK), la contención es minima incluso con varios workers.

Backoff exponencial en reintentos

Cuando un job falla, job_fail incrementa attempts y lo marca como failed. El worker, al reclamar un job previamente fallido, respeta un backoff basado en el numero de intentos:

// Delay antes de reintentar: base * 2^(attempts-1)
// Intento 1: 5s, intento 2: 10s, intento 3: 20s
backoff := baseDelay * time.Duration(1<<uint(job.Attempts-1))

Cuando attempts >= max_attempts, el job pasa a status dead (dead-letter). Los jobs dead no se reintentan automaticamente — quedan para inspeccion manual o re-enqueue explicito.

Worker loop

func JobWorker(ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error {
    cfg := defaultWorkerConfig() // pollInterval: 1s, jobTypes: nil (todos)
    for _, opt := range opts {
        opt(&cfg)
    }

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        job, err := JobDequeue(q, cfg.jobTypes...)
        if err != nil {
            // log error, backoff, continue
            time.Sleep(cfg.pollInterval)
            continue
        }

        if job == nil {
            // cola vacia, esperar
            time.Sleep(cfg.pollInterval)
            continue
        }

        // Ejecutar handler
        if err := handler(*job); err != nil {
            JobFail(q, job.ID, err.Error())
        } else {
            JobComplete(q, job.ID, "") // resultado vacio por defecto
        }
    }
}

Tareas

Fase 1: Tipos

  • 1.1 Crear tipo Job en functions/infra/job.go con .md en types/infra/job.md
  • 1.2 Crear tipo JobQueue en functions/infra/job_queue.go con .md en types/infra/job_queue.md
  • 1.3 Crear tipo JobStatus (sum) en functions/infra/job_status_type.go con .md en types/infra/job_status.md
  • 1.4 Crear tipo JobHandler en functions/infra/job_handler.go con .md en types/infra/job_handler.md

Fase 2: Funciones core (CRUD de jobs)

  • 2.1 job_queue_create — crea tabla con schema e indices, configura WAL mode, retorna *JobQueue
  • 2.2 job_enqueue — INSERT con UUID generado, soporte para options (priority, scheduled_at, max_attempts)
  • 2.3 job_dequeue — SELECT + UPDATE atomico en transaccion, nil si cola vacia
  • 2.4 job_complete — UPDATE status=completed, set completed_at y result
  • 2.5 job_fail — UPDATE status=failed|dead, incrementar attempts, set error
  • 2.6 job_status (pure) — recibe map de conteos, retorna string formateado

Fase 3: Workers y mantenimiento

  • 3.1 job_worker — goroutine con poll loop, context cancelable, backoff configurable
  • 3.2 job_worker_pool — lanza N workers con errgroup, graceful shutdown via context
  • 3.3 job_cleanup — DELETE de jobs completed/failed/dead mas antiguos que la retencion

Fase 4: Tests y validacion

  • 4.1 Tests para job_queue_create — verifica tabla e indices creados
  • 4.2 Tests para enqueue/dequeue — verifica atomicidad, prioridad, scheduling futuro
  • 4.3 Tests para complete/fail — verifica transiciones de estado, dead-letter tras max_attempts
  • 4.4 Tests para worker — verifica procesamiento, reintentos, graceful shutdown con context
  • 4.5 Tests de concurrencia — multiples goroutines dequeue simultaneo, ningun job procesado dos veces
  • 4.6 fn index y verificar que todas las funciones y tipos aparecen en registry.db
  • 4.7 Verificar go vet -tags fts5 y go test -tags fts5 -race ./functions/infra/

Ejemplo de uso

Cola de envio de emails

// En una app que necesita enviar emails en background

// 1. Crear la cola
db, _ := sql.Open("sqlite3", "operations.db?_journal_mode=wal")
queue, _ := infra.JobQueueCreate(db, "jobs")

// 2. Encolar emails desde un handler HTTP
func handleSignup(w http.ResponseWriter, r *http.Request) {
    user := createUser(r)

    payload, _ := json.Marshal(map[string]string{
        "to":      user.Email,
        "subject": "Bienvenido",
        "body":    "Gracias por registrarte...",
    })

    jobID, _ := infra.JobEnqueue(queue, "send_email", string(payload),
        infra.WithPriority(1),
        infra.WithMaxAttempts(5),
    )

    log.Printf("email encolado: job=%s user=%s", jobID, user.Email)
    infra.HttpJsonResponse(w, 201, user)
}

// 3. Worker que procesa los emails
func emailHandler(job infra.Job) error {
    var email struct {
        To      string `json:"to"`
        Subject string `json:"subject"`
        Body    string `json:"body"`
    }
    json.Unmarshal([]byte(job.Payload), &email)
    return sendEmail(email.To, email.Subject, email.Body)
}

// 4. Arrancar pool de workers
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

infra.JobWorkerPool(ctx, queue, emailHandler, 3,
    infra.WithPollInterval(2*time.Second),
    infra.WithJobTypes("send_email"),
)

Cola de procesamiento de archivos

// Encolar procesamiento de archivos con prioridad y scheduling

// Job de alta prioridad: procesar ahora
infra.JobEnqueue(queue, "process_csv", `{"path": "/data/ventas.csv", "format": "monthly"}`,
    infra.WithPriority(10),
)

// Job schedulado para las 3am
infra.JobEnqueue(queue, "generate_report", `{"type": "weekly", "format": "pdf"}`,
    infra.WithScheduledAt(nextAt3AM()),
)

// Worker que despacha por tipo
func fileHandler(job infra.Job) error {
    switch job.Type {
    case "process_csv":
        return processCSV(job.Payload)
    case "generate_report":
        return generateReport(job.Payload)
    default:
        return fmt.Errorf("tipo de job desconocido: %s", job.Type)
    }
}

Monitoreo y limpieza

// Consultar estado de la cola
rows, _ := db.Query(`
    SELECT status, COUNT(*) FROM jobs GROUP BY status
`)
counts := map[infra.JobStatus]int{}
for rows.Next() {
    var s string; var c int
    rows.Scan(&s, &c)
    counts[infra.JobStatus(s)] = c
}
fmt.Println(infra.JobStatusSummary(counts))
// → "pending: 12, running: 3, completed: 1458, failed: 7, dead: 2"

// Limpiar jobs viejos (retener 7 dias)
deleted, _ := infra.JobCleanup(queue, 7*24*time.Hour)
log.Printf("limpiados %d jobs antiguos", deleted)

Decisiones de diseno

  • SQLite como backend unico: Coherente con el stack del proyecto. No introduce dependencias externas (Redis, Postgres). WAL mode permite lecturas concurrentes sin bloquear al writer.
  • Una tabla por cola, no una BD por cola: Permite que una app tenga su cola en su propio operations.db usando un table name custom, o que varias colas convivan en la misma BD con distintos table names.
  • Dequeue con transaccion, no con SKIP LOCKED: SQLite no soporta SKIP LOCKED. La transaccion exclusiva es correcta y suficiente para el nivel de concurrencia esperado (decenas de workers, no miles).
  • Poll-based, no event-driven: SQLite no tiene LISTEN/NOTIFY. El polling con intervalo configurable (default 1s) es simple y predecible. Para la escala de uso esperada (apps personales, no sistemas de alta frecuencia), el overhead es despreciable.
  • Functional options pattern (WithPriority, WithMaxAttempts): Permite extender sin romper firmas existentes. Idomático en Go.
  • Dead-letter como status, no como tabla separada: Los jobs dead quedan en la misma tabla para inspeccion. job_cleanup los borra despues del periodo de retencion.
  • Tipos nativos en firmas Go: Siguiendo la regla del registry, las funciones usan *sql.DB, string, time.Duration — no tipos del registry en la firma. Los tipos (Job, JobQueue, etc.) se documentan en uses_types/returns del frontmatter.

Riesgos

  • Contención de escritura con muchos workers: SQLite serializa escrituras. Con WAL mode y transacciones cortas (microsegundos), esto no es problema para decenas de workers. Si alguna app necesitara miles de dequeues/segundo, habria que migrar a Postgres — pero eso esta fuera del scope de este registry.
  • Polling waste con cola vacia: Workers consumen CPU haciendo SELECT cada segundo aunque no haya trabajo. Mitigado con backoff adaptativo: si N polls consecutivos retornan nil, el intervalo crece gradualmente (hasta un techo configurable). Se resetea al primer job encontrado.
  • Jobs zombies (running forever): Si un worker muere sin completar ni fallar el job, queda en status running indefinidamente. Solucion: job_worker registra un timeout por job; si el handler no retorna en ese tiempo, lo marca como failed. Complementar con un sweep periodico que busque jobs running con started_at mas antiguo que un umbral.
  • Payload sin schema: El payload es JSON libre (string). No hay validacion de estructura al encolar. Esto es intencional — cada JobHandler valida su propio payload. Pero un typo en el payload causa errores en runtime, no en compile time.
  • Limpieza manual: job_cleanup debe invocarse explicitamente (desde un cron, un job periodico, o al arrancar la app). No hay garbage collection automatico. Documentar en el ejemplo de uso.