Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@
- [x] Redis implementation (fast queue operations, locks)
- [x] Unit tests for backend behavior

## Phase 2 – Worker Pool & Execution
## Phase 2 – Worker Pool & Execution

- [ ] Implement worker pool:
- [ ] Multiple workers per queue
- [ ] Graceful shutdown
- [ ] Heartbeats / worker status
- [ ] Implement retry & backoff strategies:
- [ ] Fixed backoff
- [ ] Exponential backoff
- [ ] Max attempts → DLQ
- [ ] Worker handler registration:
- [ ] Map job type → handler func
- [ ] Context with job metadata
- [ ] Basic logging and instrumentation hooks
- [x] Implement worker pool:
- [x] Multiple workers per queue (configurable concurrency per queue)
- [x] Graceful shutdown (context cancellation, waits for in-flight jobs)
- [x] Worker status snapshots (idle / running / stopped + last job ID)
- [x] Implement retry & backoff strategies:
- [x] Fixed backoff (`FixedBackoff`)
- [x] Exponential backoff (`ExponentialBackoff`, capped at MaxDelay)
- [x] Max attempts → DLQ (handled in backend `Nack`)
- [x] Worker handler registration:
- [x] Map job type → handler func (`Registry`)
- [x] Context with job metadata passed to every handler
- [x] Basic logging and instrumentation hooks (`log/slog`, per-event structured logs)
- [x] Fix retry bug: backends now re-schedule non-final failures as `pending` with backoff delay

## Phase 3 – HTTP API

Expand Down
8 changes: 5 additions & 3 deletions internal/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"time"

"github.com/reckziegelwilliam/queuekit/internal/queue"
)
Expand All @@ -18,9 +19,10 @@ type Backend interface {
// Ack marks a job as successfully completed
Ack(ctx context.Context, jobID string) error

// Nack marks a job as failed and increments its attempt count
// If the job has exceeded max attempts, it should be moved to DLQ automatically
Nack(ctx context.Context, jobID string, err error) error
// Nack marks a job as failed, increments its attempt count, and schedules a retry.
// retryDelay controls how long before the job becomes eligible for re-processing.
// If the job has exceeded max attempts it is moved to the dead-letter queue instead.
Nack(ctx context.Context, jobID string, err error, retryDelay time.Duration) error

// MoveToDLQ moves a job to the dead-letter queue
MoveToDLQ(ctx context.Context, jobID string) error
Expand Down
19 changes: 11 additions & 8 deletions internal/backend/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,10 @@ func (p *PostgresBackend) Ack(ctx context.Context, jobID string) error {
return nil
}

// Nack marks a job as failed and increments its attempt count
func (p *PostgresBackend) Nack(ctx context.Context, jobID string, jobErr error) error {
// Nack marks a job as failed, increments its attempt count, and schedules a retry.
// If attempts >= max_attempts the job is moved to the dead-letter queue instead.
// retryDelay controls when the job becomes eligible for re-processing.
func (p *PostgresBackend) Nack(ctx context.Context, jobID string, jobErr error, retryDelay time.Duration) error {
tx, err := p.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
Expand Down Expand Up @@ -183,20 +185,21 @@ func (p *PostgresBackend) Nack(ctx context.Context, jobID string, jobErr error)
if attempts >= maxAttempts {
query := `
UPDATE jobs
SET status = 'dead', attempts = $1, last_error = $2,
SET status = 'dead', attempts = $1, last_error = $2,
failed_at = $3, updated_at = $3
WHERE id = $4
`
_, err = tx.Exec(ctx, query, attempts, lastError, now, jobID)
} else {
// Otherwise mark as failed and allow retry
// Schedule retry: put back to pending with a future scheduled_at
retryAt := now.Add(retryDelay)
query := `
UPDATE jobs
SET status = 'failed', attempts = $1, last_error = $2,
failed_at = $3, updated_at = $3
WHERE id = $4
SET status = 'pending', attempts = $1, last_error = $2,
failed_at = $3, updated_at = $3, scheduled_at = $4
WHERE id = $5
`
_, err = tx.Exec(ctx, query, attempts, lastError, now, jobID)
_, err = tx.Exec(ctx, query, attempts, lastError, now, retryAt, jobID)
}

if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions internal/backend/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,21 @@
require.NoError(t, err)

testErr := &testError{msg: "processing failed"}
err = backend.Nack(ctx, reserved.ID, testErr)
err = backend.Nack(ctx, reserved.ID, testErr, 0)
require.NoError(t, err)

// Check job was marked as failed
failed, err := backend.GetJob(ctx, reserved.ID)
// Job should be rescheduled as pending (retryable, attempts=1, max=2)
retrying, err := backend.GetJob(ctx, reserved.ID)
require.NoError(t, err)
assert.Equal(t, queue.StatusFailed, failed.Status)
assert.Equal(t, 1, failed.Attempts)
assert.Equal(t, "processing failed", failed.LastError)
assert.Equal(t, queue.StatusPending, retrying.Status)
assert.Equal(t, 1, retrying.Attempts)
assert.Equal(t, "processing failed", retrying.LastError)

// Nack again - should move to dead letter queue
err = backend.Nack(ctx, failed.ID, testErr)
// Nack again - should move to dead letter queue (attempts=2 >= max=2)
err = backend.Nack(ctx, retrying.ID, testErr, 0)
require.NoError(t, err)

dead, err := backend.GetJob(ctx, failed.ID)

Check failure on line 207 in internal/backend/postgres/postgres_test.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: failed (typecheck)

Check failure on line 207 in internal/backend/postgres/postgres_test.go

View workflow job for this annotation

GitHub Actions / Test

undefined: failed
require.NoError(t, err)
assert.Equal(t, queue.StatusDead, dead.Status)
assert.Equal(t, 2, dead.Attempts)
Expand Down
10 changes: 6 additions & 4 deletions internal/backend/redis/redis.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package redis

Check failure on line 1 in internal/backend/redis/redis.go

View workflow job for this annotation

GitHub Actions / Lint

: # github.com/reckziegelwilliam/queuekit/internal/backend/redis [github.com/reckziegelwilliam/queuekit/internal/backend/redis.test]

import (
"context"
Expand Down Expand Up @@ -176,8 +176,10 @@
return nil
}

// Nack marks a job as failed and increments its attempt count
func (r *RedisBackend) Nack(ctx context.Context, jobID string, jobErr error) error {
// Nack marks a job as failed, increments its attempt count, and schedules a retry.
// If attempts >= max_attempts the job is moved to the dead-letter queue instead.
// retryDelay controls when the job becomes eligible for re-processing.
func (r *RedisBackend) Nack(ctx context.Context, jobID string, jobErr error, retryDelay time.Duration) error {
// Get current job data
jobData, err := r.client.HGetAll(ctx, jobKey(jobID)).Result()
if err != nil {
Expand All @@ -191,22 +193,22 @@
attempts, _ := strconv.Atoi(jobData["attempts"])
maxAttempts, _ := strconv.Atoi(jobData["max_attempts"])
queueName := jobData["queue"]
scheduledAt := jobData["scheduled_at"]

lastError := ""
if jobErr != nil {
lastError = jobErr.Error()
}

now := time.Now().UTC()
retryAt := now.Add(retryDelay)

_, err = nackScript.Run(ctx, r.client,
[]string{jobKey(jobID), queueName},
attempts,
maxAttempts,
lastError,
now.Unix(),
scheduledAt,
retryAt.Unix(),
).Result()

if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions internal/backend/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,21 @@
require.NoError(t, err)

testErr := &testError{msg: "processing failed"}
err = backend.Nack(ctx, reserved.ID, testErr)
err = backend.Nack(ctx, reserved.ID, testErr, 0)
require.NoError(t, err)

// Check job was marked as failed
failed, err := backend.GetJob(ctx, reserved.ID)
// Job should be rescheduled as pending (retryable, attempts=1, max=2)
retrying, err := backend.GetJob(ctx, reserved.ID)
require.NoError(t, err)
assert.Equal(t, queue.StatusFailed, failed.Status)
assert.Equal(t, 1, failed.Attempts)
assert.Equal(t, "processing failed", failed.LastError)
assert.Equal(t, queue.StatusPending, retrying.Status)
assert.Equal(t, 1, retrying.Attempts)
assert.Equal(t, "processing failed", retrying.LastError)

// Nack again - should move to dead letter queue
err = backend.Nack(ctx, failed.ID, testErr)
// Nack again - should move to dead letter queue (attempts=2 >= max=2)
err = backend.Nack(ctx, retrying.ID, testErr, 0)
require.NoError(t, err)

dead, err := backend.GetJob(ctx, failed.ID)

Check failure on line 205 in internal/backend/redis/redis_test.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: failed (typecheck)

Check failure on line 205 in internal/backend/redis/redis_test.go

View workflow job for this annotation

GitHub Actions / Test

undefined: failed
require.NoError(t, err)
assert.Equal(t, queue.StatusDead, dead.Status)
assert.Equal(t, 2, dead.Attempts)
Expand Down
28 changes: 14 additions & 14 deletions internal/backend/redis/scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,49 +43,49 @@ var reserveScript = redis.NewScript(`
// Lua script to atomically nack a job
// KEYS[1] = job hash key
// KEYS[2] = queue name
// ARGV[1] = current attempts
// ARGV[1] = current attempts (before increment)
// ARGV[2] = max_attempts
// ARGV[3] = last_error
// ARGV[4] = current timestamp
// ARGV[5] = scheduled_at (for re-enqueue)
// ARGV[4] = current timestamp (unix seconds)
// ARGV[5] = retry_at timestamp (unix seconds, now + retryDelay)
var nackScript = redis.NewScript(`
local job_key = KEYS[1]
local queue_name = KEYS[2]
local attempts = tonumber(ARGV[1]) + 1
local max_attempts = tonumber(ARGV[2])
local last_error = ARGV[3]
local now = ARGV[4]
local scheduled_at = tonumber(ARGV[5])
local retry_at = tonumber(ARGV[5])

-- Get job ID
local job_id = redis.call('HGET', job_key, 'id')
if not job_id then
return redis.error_reply('job not found')
end

-- Update attempts and error
redis.call('HSET', job_key,
redis.call('HSET', job_key,
'attempts', attempts,
'last_error', last_error,
'failed_at', now,
'updated_at', now
)

-- Remove from running set
redis.call('SREM', 'status:queue:' .. queue_name .. ':running', job_id)

-- Check if exceeded max attempts
if attempts >= max_attempts then
-- Move to dead letter queue
redis.call('HSET', job_key, 'status', 'dead')
redis.call('SADD', 'status:queue:' .. queue_name .. ':dead', job_id)
return 'dead'
else
-- Re-enqueue as failed (can be retried)
redis.call('HSET', job_key, 'status', 'failed')
redis.call('ZADD', 'queue:' .. queue_name, scheduled_at, job_id)
redis.call('SADD', 'status:queue:' .. queue_name .. ':failed', job_id)
return 'failed'
-- Re-schedule as pending with backoff delay so Reserve can pick it up
redis.call('HSET', job_key, 'status', 'pending', 'scheduled_at', retry_at)
redis.call('ZADD', 'queue:' .. queue_name, retry_at, job_id)
redis.call('SADD', 'status:queue:' .. queue_name .. ':pending', job_id)
return 'pending'
end
`)

Expand Down
65 changes: 65 additions & 0 deletions internal/worker/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package worker

import (
"math"
"time"
)

// BackoffStrategy determines how long to wait before retrying a failed job.
//
// NextDelay receives the number of attempts already made against the job
// (0 means no attempt has completed yet) and returns the delay to apply
// before the job becomes eligible for the next attempt.
type BackoffStrategy interface {
	NextDelay(attempts int) time.Duration
}

// FixedBackoff waits a constant duration between every retry.
type FixedBackoff struct {
Delay time.Duration
}

// NextDelay always returns the fixed delay regardless of attempt count.
func (f *FixedBackoff) NextDelay(_ int) time.Duration {
return f.Delay
}

// ExponentialBackoff doubles (or scales by Factor) the wait time after each failure,
// capped at MaxDelay.
type ExponentialBackoff struct {
// InitialDelay is the wait time after the first failure.
InitialDelay time.Duration
// MaxDelay is the upper bound on the computed delay.
MaxDelay time.Duration
// Factor is the multiplier applied per attempt. Defaults to 2.0 if zero.
Factor float64
}

// NextDelay returns InitialDelay * Factor^(attempts-1), capped at MaxDelay.
func (e *ExponentialBackoff) NextDelay(attempts int) time.Duration {
factor := e.Factor
if factor <= 0 {
factor = 2.0
}
if attempts <= 0 {
return e.InitialDelay
}
delay := float64(e.InitialDelay) * math.Pow(factor, float64(attempts-1))
if delay > float64(e.MaxDelay) {
return e.MaxDelay
}
return time.Duration(delay)
}

// DefaultExponentialBackoff returns a sensible exponential backoff suitable for
// most production workloads: starts at 5 s and caps at 1 h.
func DefaultExponentialBackoff() *ExponentialBackoff {
return &ExponentialBackoff{
InitialDelay: 5 * time.Second,
MaxDelay: 1 * time.Hour,
Factor: 2.0,
}
}

// NoBackoff returns a strategy with zero delay, so failed jobs become
// eligible for retry immediately. Handy in tests or for jobs that should
// be re-run as fast as possible.
func NoBackoff() *FixedBackoff {
	return &FixedBackoff{}
}
44 changes: 44 additions & 0 deletions internal/worker/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package worker

import (
"context"
"fmt"

"github.com/reckziegelwilliam/queuekit/internal/queue"
)

// Handler processes a single job. The context it receives may be cancelled
// on shutdown; a nil return marks the job successful, while a non-nil error
// triggers a retry (or the DLQ once max attempts is reached).
type Handler func(ctx context.Context, job *queue.Job) error

// Registry associates job type names with the Handler that processes them.
type Registry struct {
	// handlers is keyed by job type string.
	handlers map[string]Handler
}

// NewRegistry returns a Registry with no handlers registered.
func NewRegistry() *Registry {
	reg := &Registry{}
	reg.handlers = map[string]Handler{}
	return reg
}

// Register binds h to jobType. A second Register call for the same job
// type replaces the earlier handler.
func (reg *Registry) Register(jobType string, h Handler) {
	reg.handlers[jobType] = h
}

// Get looks up the Handler for jobType; the boolean reports whether one
// has been registered.
func (reg *Registry) Get(jobType string) (Handler, bool) {
	handler, found := reg.handlers[jobType]
	return handler, found
}

// ErrNoHandler reports that a job was dequeued whose type has no handler
// registered in the Registry.
//
// NOTE(review): Go convention names error *types* XxxError (ErrXxx is for
// sentinel values); renaming would break callers, so the name is kept.
type ErrNoHandler struct {
	// JobType is the unrecognized job type string.
	JobType string
}

// Error implements the error interface.
func (e *ErrNoHandler) Error() string {
	msg := fmt.Sprintf("no handler registered for job type %q", e.JobType)
	return msg
}
Loading
Loading