package infra import ( "database/sql" "errors" "fmt" "strings" "time" ) // JobDequeue atomically claims the next available job by running a // SELECT + UPDATE inside an EXCLUSIVE transaction. Returns nil, nil when the // queue is empty (or no job matches the jobTypes filter). // jobTypes restricts which job types are dequeued; an empty slice means all types. func JobDequeue(q *JobQueue, jobTypes []string) (*Job, error) { if q == nil { return nil, fmt.Errorf("job_dequeue: queue must not be nil") } tx, err := q.DB.Begin() if err != nil { return nil, fmt.Errorf("job_dequeue: begin tx: %w", err) } defer tx.Rollback() //nolint:errcheck // Build the SELECT query. now := time.Now().UTC().Format(time.RFC3339) var selectQ string var args []any if len(jobTypes) > 0 { placeholders := make([]string, len(jobTypes)) for i, t := range jobTypes { placeholders[i] = "?" args = append(args, t) } selectQ = fmt.Sprintf(` SELECT id, type, payload, status, priority, attempts, max_attempts, scheduled_at, started_at, completed_at, result, error, created_at FROM %s WHERE status = 'pending' AND scheduled_at <= ? AND type IN (%s) ORDER BY priority DESC, scheduled_at ASC LIMIT 1 `, q.TableName, strings.Join(placeholders, ",")) args = append(args, now) // rearrange: now goes before jobTypes in the query newArgs := []any{now} newArgs = append(newArgs, args[:len(args)-1]...) args = newArgs } else { selectQ = fmt.Sprintf(` SELECT id, type, payload, status, priority, attempts, max_attempts, scheduled_at, started_at, completed_at, result, error, created_at FROM %s WHERE status = 'pending' AND scheduled_at <= ? ORDER BY priority DESC, scheduled_at ASC LIMIT 1 `, q.TableName) args = []any{now} } row := tx.QueryRow(selectQ, args...) var job Job var scheduledAt string var startedAt, completedAt sql.NullString var result, jobError sql.NullString var createdAt string err = row.Scan( &job.ID, &job.Type, &job.Payload, (*string)(&job.Status), &job.Priority, &job.Attempts, &job.MaxAttempts, &scheduledAt, &startedAt, &completedAt, &result, &jobError, &createdAt, ) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, fmt.Errorf("job_dequeue: scan: %w", err) } // Mark as running. updateQ := fmt.Sprintf(` UPDATE %s SET status = 'running', started_at = ? WHERE id = ? `, q.TableName) if _, err := tx.Exec(updateQ, now, job.ID); err != nil { return nil, fmt.Errorf("job_dequeue: update status: %w", err) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("job_dequeue: commit: %w", err) } // Hydrate optional fields. if t, err := time.Parse(time.RFC3339, scheduledAt); err == nil { job.ScheduledAt = t } if t, err := time.Parse(time.RFC3339, createdAt); err == nil { job.CreatedAt = t } if startedAt.Valid { if t, err := time.Parse(time.RFC3339, startedAt.String); err == nil { job.StartedAt = &t } } if completedAt.Valid { if t, err := time.Parse(time.RFC3339, completedAt.String); err == nil { job.CompletedAt = &t } } if result.Valid { job.Result = &result.String } if jobError.Valid { job.Error = &jobError.String } job.Status = JobStatusRunning return &job, nil }