auto(0129): agents_dashboard — secret_store_cpp_infra + CMakeLists register #4
@@ -0,0 +1,409 @@
|
||||
# 0013 — Background Job Queue
|
||||
|
||||
## Metadata
|
||||
|
||||
| Campo | Valor |
|
||||
|-------|-------|
|
||||
| **ID** | 0013 |
|
||||
| **Estado** | pendiente |
|
||||
| **Prioridad** | alta |
|
||||
| **Tipo** | feature |
|
||||
|
||||
## Dependencias
|
||||
|
||||
Ninguna.
|
||||
|
||||
---
|
||||
|
||||
## Objetivo
|
||||
|
||||
Cola de trabajos asincrona basada en SQLite para apps Go del registry. Permite a cualquier app encolar tareas (enviar emails, procesar archivos, ejecutar pipelines) y procesarlas en background con workers concurrentes, reintentos automaticos y dead-letter queue — todo respaldado por SQLite sin depender de Redis, RabbitMQ ni ningun servicio externo.
|
||||
|
||||
## Contexto
|
||||
|
||||
- Actualmente no existen funciones de job queue en el registry. Cada app que necesita procesamiento asincrono tiene que construir su propia solucion ad-hoc.
|
||||
- SQLite encaja perfectamente con el stack existente: `registry.db` y `operations.db` ya demuestran que SQLite escala para los volúmenes del proyecto.
|
||||
- Las apps Go del registry ya usan `mattn/go-sqlite3` con FTS5 y WAL mode — no hay dependencias nuevas.
|
||||
- `map_concurrent_go_core` ofrece paralelismo sincrono pero no persistencia: si el proceso muere, se pierde el trabajo. Una cola persistente en SQLite sobrevive a reinicios.
|
||||
- Patron existente en el ecosistema: `gocraft/work`, `faktory`, `river` — pero todos requieren Redis o Postgres. Una solucion SQLite-native es mas coherente con el proyecto.
|
||||
|
||||
## Arquitectura
|
||||
|
||||
```
|
||||
functions/infra/
|
||||
├── job_queue_create.go — NEW: crea tabla de jobs en SQLite, retorna handle
|
||||
├── job_queue_create.md — NEW
|
||||
├── job_enqueue.go — NEW: añade job a la cola
|
||||
├── job_enqueue.md — NEW
|
||||
├── job_dequeue.go — NEW: reclama proximo job pendiente (atomico)
|
||||
├── job_dequeue.md — NEW
|
||||
├── job_complete.go — NEW: marca job completado con resultado
|
||||
├── job_complete.md — NEW
|
||||
├── job_fail.go — NEW: marca job fallido, incrementa intentos
|
||||
├── job_fail.md — NEW
|
||||
├── job_worker.go — NEW: goroutine que pollea y procesa jobs
|
||||
├── job_worker.md — NEW
|
||||
├── job_worker_pool.go — NEW: pool de N workers con graceful shutdown
|
||||
├── job_worker_pool.md — NEW
|
||||
├── job_cleanup.go — NEW: elimina jobs antiguos completados/fallidos
|
||||
├── job_cleanup.md — NEW
|
||||
├── job_status.go — NEW: resumen de estado de la cola (pura)
|
||||
├── job_status.md — NEW
|
||||
|
||||
types/infra/
|
||||
├── job.md — NEW: metadata del tipo Job
|
||||
├── job_queue.md — NEW: metadata del tipo JobQueue
|
||||
├── job_status.md — NEW: metadata del tipo JobStatus (sum)
|
||||
├── job_handler.md — NEW: metadata del tipo JobHandler
|
||||
```
|
||||
|
||||
### Patron pure core / impure shell
|
||||
|
||||
- **Pure:** `job_status` — formatea conteos en un resumen sin I/O.
|
||||
- **Impure:** Todo lo demas — interactúa con SQLite (I/O de disco) y goroutines (concurrencia).
|
||||
|
||||
## Diseno
|
||||
|
||||
### Tipos
|
||||
|
||||
```go
|
||||
// JobStatus es un sum type: estados posibles de un job
|
||||
type JobStatus string
|
||||
|
||||
const (
|
||||
JobStatusPending JobStatus = "pending"
|
||||
JobStatusRunning JobStatus = "running"
|
||||
JobStatusCompleted JobStatus = "completed"
|
||||
JobStatusFailed JobStatus = "failed"
|
||||
JobStatusDead JobStatus = "dead" // max_attempts superado
|
||||
)
|
||||
|
||||
// Job representa una unidad de trabajo en la cola
|
||||
type Job struct {
|
||||
ID string `json:"id"` // UUID generado al encolar
|
||||
Type string `json:"type"` // tipo del job (ej: "send_email", "process_file")
|
||||
Payload string `json:"payload"` // JSON arbitrario con los datos del job
|
||||
Status JobStatus `json:"status"` // pending | running | completed | failed | dead
|
||||
Priority int `json:"priority"` // 0 = normal, mayor = mas prioritario
|
||||
Attempts int `json:"attempts"` // intentos ejecutados hasta ahora
|
||||
MaxAttempts int `json:"max_attempts"` // maximo de intentos antes de dead-letter
|
||||
ScheduledAt time.Time `json:"scheduled_at"` // cuando debe ejecutarse (permite scheduling futuro)
|
||||
StartedAt *time.Time `json:"started_at"` // cuando empezo a ejecutarse (nil si pending)
|
||||
CompletedAt *time.Time `json:"completed_at"` // cuando termino (nil si no completado)
|
||||
Result string `json:"result"` // JSON con resultado de ejecucion exitosa
|
||||
Error string `json:"error"` // mensaje de error del ultimo intento
|
||||
CreatedAt time.Time `json:"created_at"` // timestamp de creacion
|
||||
}
|
||||
|
||||
// JobQueue es el handle a una cola respaldada por SQLite
|
||||
type JobQueue struct {
|
||||
DB *sql.DB // conexion SQLite con WAL mode
|
||||
TableName string // nombre de la tabla (permite multiples colas en una BD)
|
||||
}
|
||||
|
||||
// JobHandler es la funcion que procesa un job
|
||||
// Recibe el job completo y retorna error si falla.
|
||||
type JobHandler func(job Job) error
|
||||
```
|
||||
|
||||
### Schema SQLite
|
||||
|
||||
```sql
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
type TEXT NOT NULL,
|
||||
payload TEXT NOT NULL DEFAULT '{}',
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
priority INTEGER NOT NULL DEFAULT 0,
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
max_attempts INTEGER NOT NULL DEFAULT 3,
|
||||
scheduled_at TEXT NOT NULL,
|
||||
started_at TEXT,
|
||||
completed_at TEXT,
|
||||
result TEXT,
|
||||
error TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
|
||||
ON jobs (status, priority DESC, scheduled_at ASC)
|
||||
WHERE status = 'pending';
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs (status);
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_type ON jobs (type);
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_cleanup ON jobs (status, completed_at)
|
||||
WHERE status IN ('completed', 'failed', 'dead');
|
||||
```
|
||||
|
||||
El indice parcial `idx_jobs_dequeue` es clave: SQLite lo usa exclusivamente para el `SELECT` de dequeue, manteniendo el scan rapido incluso con millones de jobs historicos en la tabla.
|
||||
|
||||
### Funciones
|
||||
|
||||
| Funcion | Purity | Firma (simplificada) | Descripcion |
|
||||
|---------|--------|---------------------|-------------|
|
||||
| `job_queue_create` | impure | `(db *sql.DB, tableName string) (*JobQueue, error)` | Crea tabla de jobs con indices, activa WAL mode, retorna handle |
|
||||
| `job_enqueue` | impure | `(q *JobQueue, jobType string, payload string, opts ...EnqueueOption) (string, error)` | Inserta job con priority, scheduled_at, max_attempts. Retorna job ID |
|
||||
| `job_dequeue` | impure | `(q *JobQueue, jobTypes ...string) (*Job, error)` | Reclama atomicamente el proximo job pending. Nil si cola vacia |
|
||||
| `job_complete` | impure | `(q *JobQueue, jobID string, result string) error` | Marca job como completed con resultado JSON |
|
||||
| `job_fail` | impure | `(q *JobQueue, jobID string, errMsg string) error` | Marca como failed, incrementa attempts. Si attempts >= max_attempts, marca como dead |
|
||||
| `job_worker` | impure | `(ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error` | Goroutine que pollea la cola y ejecuta handler. Para con context |
|
||||
| `job_worker_pool` | impure | `(ctx context.Context, q *JobQueue, handler JobHandler, n int, opts ...WorkerOption) error` | Lanza N workers, espera a que todos terminen con graceful shutdown |
|
||||
| `job_cleanup` | impure | `(q *JobQueue, olderThan time.Duration) (int64, error)` | Elimina jobs completed/failed/dead mas antiguos que la retencion |
|
||||
| `job_status` | pure | `(counts map[JobStatus]int) string` | Formatea resumen legible: "pending: 5, running: 2, completed: 100, failed: 3, dead: 1" |
|
||||
|
||||
### Dequeue atomico
|
||||
|
||||
La operacion critica es `job_dequeue` — debe reclamar un job sin condiciones de carrera cuando hay multiples workers. SQLite no tiene `SELECT FOR UPDATE`, pero se puede lograr con una transaccion exclusiva:
|
||||
|
||||
```go
|
||||
func JobDequeue(q *JobQueue, jobTypes ...string) (*Job, error) {
|
||||
tx, err := q.DB.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// SELECT el proximo job elegible
|
||||
query := fmt.Sprintf(`
|
||||
SELECT id, type, payload, status, priority, attempts, max_attempts,
|
||||
scheduled_at, result, error, created_at
|
||||
FROM %s
|
||||
WHERE status = 'pending'
|
||||
AND scheduled_at <= strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now')
|
||||
ORDER BY priority DESC, scheduled_at ASC
|
||||
LIMIT 1
|
||||
`, q.TableName)
|
||||
|
||||
// Si se especifican tipos, filtrar
|
||||
if len(jobTypes) > 0 {
|
||||
// ... añadir AND type IN (...)
|
||||
}
|
||||
|
||||
var job Job
|
||||
err = tx.QueryRow(query).Scan(/* ... */)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil // cola vacia, no es error
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// UPDATE atomico dentro de la misma transaccion
|
||||
_, err = tx.Exec(fmt.Sprintf(`
|
||||
UPDATE %s SET status = 'running', started_at = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now')
|
||||
WHERE id = ?
|
||||
`, q.TableName), job.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &job, tx.Commit()
|
||||
}
|
||||
```
|
||||
|
||||
Con WAL mode, los writers serializan transacciones de escritura pero los readers no bloquean. Dado que `dequeue` es la unica operacion que necesita exclusividad de escritura y es extremadamente rapida (un SELECT indexado + un UPDATE por PK), la contención es minima incluso con varios workers.
|
||||
|
||||
### Backoff exponencial en reintentos
|
||||
|
||||
Cuando un job falla, `job_fail` incrementa `attempts` y lo marca como `failed`. El worker, al reclamar un job previamente fallido, respeta un backoff basado en el numero de intentos:
|
||||
|
||||
```go
|
||||
// Delay antes de reintentar: base * 2^(attempts-1)
|
||||
// Intento 1: 5s, intento 2: 10s, intento 3: 20s
|
||||
backoff := baseDelay * time.Duration(1<<uint(job.Attempts-1))
|
||||
```
|
||||
|
||||
Cuando `attempts >= max_attempts`, el job pasa a status `dead` (dead-letter). Los jobs dead no se reintentan automaticamente — quedan para inspeccion manual o re-enqueue explicito.
|
||||
|
||||
### Worker loop
|
||||
|
||||
```go
|
||||
func JobWorker(ctx context.Context, q *JobQueue, handler JobHandler, opts ...WorkerOption) error {
|
||||
cfg := defaultWorkerConfig() // pollInterval: 1s, jobTypes: nil (todos)
|
||||
for _, opt := range opts {
|
||||
opt(&cfg)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
job, err := JobDequeue(q, cfg.jobTypes...)
|
||||
if err != nil {
|
||||
// log error, backoff, continue
|
||||
time.Sleep(cfg.pollInterval)
|
||||
continue
|
||||
}
|
||||
|
||||
if job == nil {
|
||||
// cola vacia, esperar
|
||||
time.Sleep(cfg.pollInterval)
|
||||
continue
|
||||
}
|
||||
|
||||
// Ejecutar handler
|
||||
if err := handler(*job); err != nil {
|
||||
JobFail(q, job.ID, err.Error())
|
||||
} else {
|
||||
JobComplete(q, job.ID, "") // resultado vacio por defecto
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Tareas
|
||||
|
||||
### Fase 1: Tipos
|
||||
|
||||
- [ ] **1.1** Crear tipo `Job` en `functions/infra/job.go` con `.md` en `types/infra/job.md`
|
||||
- [ ] **1.2** Crear tipo `JobQueue` en `functions/infra/job_queue.go` con `.md` en `types/infra/job_queue.md`
|
||||
- [ ] **1.3** Crear tipo `JobStatus` (sum) en `functions/infra/job_status_type.go` con `.md` en `types/infra/job_status.md`
|
||||
- [ ] **1.4** Crear tipo `JobHandler` en `functions/infra/job_handler.go` con `.md` en `types/infra/job_handler.md`
|
||||
|
||||
### Fase 2: Funciones core (CRUD de jobs)
|
||||
|
||||
- [ ] **2.1** `job_queue_create` — crea tabla con schema e indices, configura WAL mode, retorna `*JobQueue`
|
||||
- [ ] **2.2** `job_enqueue` — INSERT con UUID generado, soporte para options (priority, scheduled_at, max_attempts)
|
||||
- [ ] **2.3** `job_dequeue` — SELECT + UPDATE atomico en transaccion, nil si cola vacia
|
||||
- [ ] **2.4** `job_complete` — UPDATE status=completed, set completed_at y result
|
||||
- [ ] **2.5** `job_fail` — UPDATE status=failed|dead, incrementar attempts, set error
|
||||
- [ ] **2.6** `job_status` (pure) — recibe map de conteos, retorna string formateado
|
||||
|
||||
### Fase 3: Workers y mantenimiento
|
||||
|
||||
- [ ] **3.1** `job_worker` — goroutine con poll loop, context cancelable, backoff configurable
|
||||
- [ ] **3.2** `job_worker_pool` — lanza N workers con `errgroup`, graceful shutdown via context
|
||||
- [ ] **3.3** `job_cleanup` — DELETE de jobs completed/failed/dead mas antiguos que la retencion
|
||||
|
||||
### Fase 4: Tests y validacion
|
||||
|
||||
- [ ] **4.1** Tests para `job_queue_create` — verifica tabla e indices creados
|
||||
- [ ] **4.2** Tests para enqueue/dequeue — verifica atomicidad, prioridad, scheduling futuro
|
||||
- [ ] **4.3** Tests para complete/fail — verifica transiciones de estado, dead-letter tras max_attempts
|
||||
- [ ] **4.4** Tests para worker — verifica procesamiento, reintentos, graceful shutdown con context
|
||||
- [ ] **4.5** Tests de concurrencia — multiples goroutines dequeue simultaneo, ningun job procesado dos veces
|
||||
- [ ] **4.6** `fn index` y verificar que todas las funciones y tipos aparecen en registry.db
|
||||
- [ ] **4.7** Verificar `go vet -tags fts5` y `go test -tags fts5 -race ./functions/infra/`
|
||||
|
||||
---
|
||||
|
||||
## Ejemplo de uso
|
||||
|
||||
### Cola de envio de emails
|
||||
|
||||
```go
|
||||
// En una app que necesita enviar emails en background
|
||||
|
||||
// 1. Crear la cola
|
||||
db, _ := sql.Open("sqlite3", "operations.db?_journal_mode=wal")
|
||||
queue, _ := infra.JobQueueCreate(db, "jobs")
|
||||
|
||||
// 2. Encolar emails desde un handler HTTP
|
||||
func handleSignup(w http.ResponseWriter, r *http.Request) {
|
||||
user := createUser(r)
|
||||
|
||||
payload, _ := json.Marshal(map[string]string{
|
||||
"to": user.Email,
|
||||
"subject": "Bienvenido",
|
||||
"body": "Gracias por registrarte...",
|
||||
})
|
||||
|
||||
jobID, _ := infra.JobEnqueue(queue, "send_email", string(payload),
|
||||
infra.WithPriority(1),
|
||||
infra.WithMaxAttempts(5),
|
||||
)
|
||||
|
||||
log.Printf("email encolado: job=%s user=%s", jobID, user.Email)
|
||||
infra.HttpJsonResponse(w, 201, user)
|
||||
}
|
||||
|
||||
// 3. Worker que procesa los emails
|
||||
func emailHandler(job infra.Job) error {
|
||||
var email struct {
|
||||
To string `json:"to"`
|
||||
Subject string `json:"subject"`
|
||||
Body string `json:"body"`
|
||||
}
|
||||
json.Unmarshal([]byte(job.Payload), &email)
|
||||
return sendEmail(email.To, email.Subject, email.Body)
|
||||
}
|
||||
|
||||
// 4. Arrancar pool de workers
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer cancel()
|
||||
|
||||
infra.JobWorkerPool(ctx, queue, emailHandler, 3,
|
||||
infra.WithPollInterval(2*time.Second),
|
||||
infra.WithJobTypes("send_email"),
|
||||
)
|
||||
```
|
||||
|
||||
### Cola de procesamiento de archivos
|
||||
|
||||
```go
|
||||
// Encolar procesamiento de archivos con prioridad y scheduling
|
||||
|
||||
// Job de alta prioridad: procesar ahora
|
||||
infra.JobEnqueue(queue, "process_csv", `{"path": "/data/ventas.csv", "format": "monthly"}`,
|
||||
infra.WithPriority(10),
|
||||
)
|
||||
|
||||
// Job schedulado para las 3am
|
||||
infra.JobEnqueue(queue, "generate_report", `{"type": "weekly", "format": "pdf"}`,
|
||||
infra.WithScheduledAt(nextAt3AM()),
|
||||
)
|
||||
|
||||
// Worker que despacha por tipo
|
||||
func fileHandler(job infra.Job) error {
|
||||
switch job.Type {
|
||||
case "process_csv":
|
||||
return processCSV(job.Payload)
|
||||
case "generate_report":
|
||||
return generateReport(job.Payload)
|
||||
default:
|
||||
return fmt.Errorf("tipo de job desconocido: %s", job.Type)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Monitoreo y limpieza
|
||||
|
||||
```go
|
||||
// Consultar estado de la cola
|
||||
rows, _ := db.Query(`
|
||||
SELECT status, COUNT(*) FROM jobs GROUP BY status
|
||||
`)
|
||||
counts := map[infra.JobStatus]int{}
|
||||
for rows.Next() {
|
||||
var s string; var c int
|
||||
rows.Scan(&s, &c)
|
||||
counts[infra.JobStatus(s)] = c
|
||||
}
|
||||
fmt.Println(infra.JobStatusSummary(counts))
|
||||
// → "pending: 12, running: 3, completed: 1458, failed: 7, dead: 2"
|
||||
|
||||
// Limpiar jobs viejos (retener 7 dias)
|
||||
deleted, _ := infra.JobCleanup(queue, 7*24*time.Hour)
|
||||
log.Printf("limpiados %d jobs antiguos", deleted)
|
||||
```
|
||||
|
||||
## Decisiones de diseno
|
||||
|
||||
- **SQLite como backend unico:** Coherente con el stack del proyecto. No introduce dependencias externas (Redis, Postgres). WAL mode permite lecturas concurrentes sin bloquear al writer.
|
||||
- **Una tabla por cola, no una BD por cola:** Permite que una app tenga su cola en su propio `operations.db` usando un table name custom, o que varias colas convivan en la misma BD con distintos table names.
|
||||
- **Dequeue con transaccion, no con SKIP LOCKED:** SQLite no soporta `SKIP LOCKED`. La transaccion exclusiva es correcta y suficiente para el nivel de concurrencia esperado (decenas de workers, no miles).
|
||||
- **Poll-based, no event-driven:** SQLite no tiene `LISTEN/NOTIFY`. El polling con intervalo configurable (default 1s) es simple y predecible. Para la escala de uso esperada (apps personales, no sistemas de alta frecuencia), el overhead es despreciable.
|
||||
- **Functional options pattern (WithPriority, WithMaxAttempts):** Permite extender sin romper firmas existentes. Idomático en Go.
|
||||
- **Dead-letter como status, no como tabla separada:** Los jobs dead quedan en la misma tabla para inspeccion. `job_cleanup` los borra despues del periodo de retencion.
|
||||
- **Tipos nativos en firmas Go:** Siguiendo la regla del registry, las funciones usan `*sql.DB`, `string`, `time.Duration` — no tipos del registry en la firma. Los tipos (`Job`, `JobQueue`, etc.) se documentan en `uses_types`/`returns` del frontmatter.
|
||||
|
||||
## Riesgos
|
||||
|
||||
- **Contención de escritura con muchos workers:** SQLite serializa escrituras. Con WAL mode y transacciones cortas (microsegundos), esto no es problema para decenas de workers. Si alguna app necesitara miles de dequeues/segundo, habria que migrar a Postgres — pero eso esta fuera del scope de este registry.
|
||||
- **Polling waste con cola vacia:** Workers consumen CPU haciendo SELECT cada segundo aunque no haya trabajo. Mitigado con backoff adaptativo: si N polls consecutivos retornan nil, el intervalo crece gradualmente (hasta un techo configurable). Se resetea al primer job encontrado.
|
||||
- **Jobs zombies (running forever):** Si un worker muere sin completar ni fallar el job, queda en status `running` indefinidamente. Solucion: `job_worker` registra un timeout por job; si el handler no retorna en ese tiempo, lo marca como failed. Complementar con un sweep periodico que busque jobs `running` con `started_at` mas antiguo que un umbral.
|
||||
- **Payload sin schema:** El payload es JSON libre (`string`). No hay validacion de estructura al encolar. Esto es intencional — cada `JobHandler` valida su propio payload. Pero un typo en el payload causa errores en runtime, no en compile time.
|
||||
- **Limpieza manual:** `job_cleanup` debe invocarse explicitamente (desde un cron, un job periodico, o al arrancar la app). No hay garbage collection automatico. Documentar en el ejemplo de uso.
|
||||
Reference in New Issue
Block a user