From a6901703d3cdd7e4cc3a67b0bfdec1632d8e4911 Mon Sep 17 00:00:00 2001 From: Liam Reckziegel Date: Tue, 31 Mar 2026 14:15:22 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20Phase=202=20=E2=80=94=20wor?= =?UTF-8?q?ker=20pool,=20backoff=20strategies,=20and=20retry=20fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Core changes: - Add internal/worker package: Handler registry, BackoffStrategy interface, FixedBackoff, ExponentialBackoff, Worker (single consumer), Pool (multi-queue multi-worker orchestrator) with graceful shutdown and per-worker state snapshots - Fix retry bug in both backends: Nack previously set non-final failures to "failed" status, which Reserve never picks up (it only selects "pending" jobs). Non-final failures now go back to "pending" with scheduled_at = now + retryDelay - Extend Backend.Nack signature to accept retryDelay time.Duration so the worker can pass its computed backoff to the backend atomically - Update all backend tests to use the new Nack signature and assert "pending" status (not "failed") for retryable jobs - Mark Phase 2 complete in PLAN.md Co-Authored-By: Claude Sonnet 4.6 --- PLAN.md | 27 +- internal/backend/backend.go | 8 +- internal/backend/postgres/postgres.go | 19 +- internal/backend/postgres/postgres_test.go | 16 +- internal/backend/redis/redis.go | 10 +- internal/backend/redis/redis_test.go | 16 +- internal/backend/redis/scripts.go | 28 +- internal/worker/backoff.go | 65 +++ internal/worker/handler.go | 44 ++ internal/worker/pool.go | 150 +++++++ internal/worker/worker.go | 205 ++++++++++ internal/worker/worker_test.go | 455 +++++++++++++++++++++ 12 files changed, 985 insertions(+), 58 deletions(-) create mode 100644 internal/worker/backoff.go create mode 100644 internal/worker/handler.go create mode 100644 internal/worker/pool.go create mode 100644 internal/worker/worker.go create mode 100644 internal/worker/worker_test.go diff --git a/PLAN.md b/PLAN.md index 
9318ddb..92704e8 100644 --- a/PLAN.md +++ b/PLAN.md @@ -25,20 +25,21 @@ - [x] Redis implementation (fast queue operations, locks) - [x] Unit tests for backend behavior -## Phase 2 – Worker Pool & Execution +## Phase 2 – Worker Pool & Execution ✅ -- [ ] Implement worker pool: - - [ ] Multiple workers per queue - - [ ] Graceful shutdown - - [ ] Heartbeats / worker status -- [ ] Implement retry & backoff strategies: - - [ ] Fixed backoff - - [ ] Exponential backoff - - [ ] Max attempts → DLQ -- [ ] Worker handler registration: - - [ ] Map job type → handler func - - [ ] Context with job metadata -- [ ] Basic logging and instrumentation hooks +- [x] Implement worker pool: + - [x] Multiple workers per queue (configurable concurrency per queue) + - [x] Graceful shutdown (context cancellation, waits for in-flight jobs) + - [x] Worker status snapshots (idle / running / stopped + last job ID) +- [x] Implement retry & backoff strategies: + - [x] Fixed backoff (`FixedBackoff`) + - [x] Exponential backoff (`ExponentialBackoff`, capped at MaxDelay) + - [x] Max attempts → DLQ (handled in backend `Nack`) +- [x] Worker handler registration: + - [x] Map job type → handler func (`Registry`) + - [x] Context with job metadata passed to every handler +- [x] Basic logging and instrumentation hooks (`log/slog`, per-event structured logs) +- [x] Fix retry bug: backends now re-schedule non-final failures as `pending` with backoff delay ## Phase 3 – HTTP API diff --git a/internal/backend/backend.go b/internal/backend/backend.go index 1c64291..160d768 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -2,6 +2,7 @@ package backend import ( "context" + "time" "github.com/reckziegelwilliam/queuekit/internal/queue" ) @@ -18,9 +19,10 @@ type Backend interface { // Ack marks a job as successfully completed Ack(ctx context.Context, jobID string) error - // Nack marks a job as failed and increments its attempt count - // If the job has exceeded max attempts, it should be 
moved to DLQ automatically - Nack(ctx context.Context, jobID string, err error) error + // Nack marks a job as failed, increments its attempt count, and schedules a retry. + // retryDelay controls how long before the job becomes eligible for re-processing. + // If the job has exceeded max attempts it is moved to the dead-letter queue instead. + Nack(ctx context.Context, jobID string, err error, retryDelay time.Duration) error // MoveToDLQ moves a job to the dead-letter queue MoveToDLQ(ctx context.Context, jobID string) error diff --git a/internal/backend/postgres/postgres.go b/internal/backend/postgres/postgres.go index 23fa468..5f181cb 100644 --- a/internal/backend/postgres/postgres.go +++ b/internal/backend/postgres/postgres.go @@ -152,8 +152,10 @@ func (p *PostgresBackend) Ack(ctx context.Context, jobID string) error { return nil } -// Nack marks a job as failed and increments its attempt count -func (p *PostgresBackend) Nack(ctx context.Context, jobID string, jobErr error) error { +// Nack marks a job as failed, increments its attempt count, and schedules a retry. +// If attempts >= max_attempts the job is moved to the dead-letter queue instead. +// retryDelay controls when the job becomes eligible for re-processing. 
+func (p *PostgresBackend) Nack(ctx context.Context, jobID string, jobErr error, retryDelay time.Duration) error { tx, err := p.pool.Begin(ctx) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) @@ -183,20 +185,21 @@ func (p *PostgresBackend) Nack(ctx context.Context, jobID string, jobErr error) if attempts >= maxAttempts { query := ` UPDATE jobs - SET status = 'dead', attempts = $1, last_error = $2, + SET status = 'dead', attempts = $1, last_error = $2, failed_at = $3, updated_at = $3 WHERE id = $4 ` _, err = tx.Exec(ctx, query, attempts, lastError, now, jobID) } else { - // Otherwise mark as failed and allow retry + // Schedule retry: put back to pending with a future scheduled_at + retryAt := now.Add(retryDelay) query := ` UPDATE jobs - SET status = 'failed', attempts = $1, last_error = $2, - failed_at = $3, updated_at = $3 - WHERE id = $4 + SET status = 'pending', attempts = $1, last_error = $2, + failed_at = $3, updated_at = $3, scheduled_at = $4 + WHERE id = $5 ` - _, err = tx.Exec(ctx, query, attempts, lastError, now, jobID) + _, err = tx.Exec(ctx, query, attempts, lastError, now, retryAt, jobID) } if err != nil { diff --git a/internal/backend/postgres/postgres_test.go b/internal/backend/postgres/postgres_test.go index 5fb6e13..37e35b3 100644 --- a/internal/backend/postgres/postgres_test.go +++ b/internal/backend/postgres/postgres_test.go @@ -190,18 +190,18 @@ func TestPostgresBackend_Nack(t *testing.T) { require.NoError(t, err) testErr := &testError{msg: "processing failed"} - err = backend.Nack(ctx, reserved.ID, testErr) + err = backend.Nack(ctx, reserved.ID, testErr, 0) require.NoError(t, err) - // Check job was marked as failed - failed, err := backend.GetJob(ctx, reserved.ID) + // Job should be rescheduled as pending (retryable, attempts=1, max=2) + retrying, err := backend.GetJob(ctx, reserved.ID) require.NoError(t, err) - assert.Equal(t, queue.StatusFailed, failed.Status) - assert.Equal(t, 1, failed.Attempts) - assert.Equal(t, 
"processing failed", failed.LastError) + assert.Equal(t, queue.StatusPending, retrying.Status) + assert.Equal(t, 1, retrying.Attempts) + assert.Equal(t, "processing failed", retrying.LastError) - // Nack again - should move to dead letter queue - err = backend.Nack(ctx, failed.ID, testErr) + // Nack again - should move to dead letter queue (attempts=2 >= max=2) + err = backend.Nack(ctx, retrying.ID, testErr, 0) require.NoError(t, err) dead, err := backend.GetJob(ctx, failed.ID) diff --git a/internal/backend/redis/redis.go b/internal/backend/redis/redis.go index 0c6bece..5bc07e1 100644 --- a/internal/backend/redis/redis.go +++ b/internal/backend/redis/redis.go @@ -176,8 +176,10 @@ func (r *RedisBackend) Ack(ctx context.Context, jobID string) error { return nil } -// Nack marks a job as failed and increments its attempt count -func (r *RedisBackend) Nack(ctx context.Context, jobID string, jobErr error) error { +// Nack marks a job as failed, increments its attempt count, and schedules a retry. +// If attempts >= max_attempts the job is moved to the dead-letter queue instead. +// retryDelay controls when the job becomes eligible for re-processing. 
+func (r *RedisBackend) Nack(ctx context.Context, jobID string, jobErr error, retryDelay time.Duration) error { // Get current job data jobData, err := r.client.HGetAll(ctx, jobKey(jobID)).Result() if err != nil { @@ -191,7 +193,6 @@ func (r *RedisBackend) Nack(ctx context.Context, jobID string, jobErr error) err attempts, _ := strconv.Atoi(jobData["attempts"]) maxAttempts, _ := strconv.Atoi(jobData["max_attempts"]) queueName := jobData["queue"] - scheduledAt := jobData["scheduled_at"] lastError := "" if jobErr != nil { @@ -199,6 +200,7 @@ func (r *RedisBackend) Nack(ctx context.Context, jobID string, jobErr error) err } now := time.Now().UTC() + retryAt := now.Add(retryDelay) _, err = nackScript.Run(ctx, r.client, []string{jobKey(jobID), queueName}, @@ -206,7 +208,7 @@ func (r *RedisBackend) Nack(ctx context.Context, jobID string, jobErr error) err maxAttempts, lastError, now.Unix(), - scheduledAt, + retryAt.Unix(), ).Result() if err != nil { diff --git a/internal/backend/redis/redis_test.go b/internal/backend/redis/redis_test.go index a22b254..8933e17 100644 --- a/internal/backend/redis/redis_test.go +++ b/internal/backend/redis/redis_test.go @@ -188,18 +188,18 @@ func TestRedisBackend_Nack(t *testing.T) { require.NoError(t, err) testErr := &testError{msg: "processing failed"} - err = backend.Nack(ctx, reserved.ID, testErr) + err = backend.Nack(ctx, reserved.ID, testErr, 0) require.NoError(t, err) - // Check job was marked as failed - failed, err := backend.GetJob(ctx, reserved.ID) + // Job should be rescheduled as pending (retryable, attempts=1, max=2) + retrying, err := backend.GetJob(ctx, reserved.ID) require.NoError(t, err) - assert.Equal(t, queue.StatusFailed, failed.Status) - assert.Equal(t, 1, failed.Attempts) - assert.Equal(t, "processing failed", failed.LastError) + assert.Equal(t, queue.StatusPending, retrying.Status) + assert.Equal(t, 1, retrying.Attempts) + assert.Equal(t, "processing failed", retrying.LastError) - // Nack again - should move to dead 
letter queue - err = backend.Nack(ctx, failed.ID, testErr) + // Nack again - should move to dead letter queue (attempts=2 >= max=2) + err = backend.Nack(ctx, retrying.ID, testErr, 0) require.NoError(t, err) - dead, err := backend.GetJob(ctx, failed.ID) + dead, err := backend.GetJob(ctx, retrying.ID) diff --git a/internal/backend/redis/scripts.go b/internal/backend/redis/scripts.go index f29ef0d..dbbbff1 100644 --- a/internal/backend/redis/scripts.go +++ b/internal/backend/redis/scripts.go @@ -43,11 +43,11 @@ var reserveScript = redis.NewScript(` // Lua script to atomically nack a job // KEYS[1] = job hash key // KEYS[2] = queue name -// ARGV[1] = current attempts +// ARGV[1] = attempt count to persist (caller passes the already-incremented value) // ARGV[2] = max_attempts // ARGV[3] = last_error -// ARGV[4] = current timestamp -// ARGV[5] = scheduled_at (for re-enqueue) +// ARGV[4] = current timestamp (unix seconds) +// ARGV[5] = retry_at timestamp (unix seconds, now + retryDelay) var nackScript = redis.NewScript(` local job_key = KEYS[1] local queue_name = KEYS[2] @@ -55,25 +55,25 @@ var nackScript = redis.NewScript(` local max_attempts = tonumber(ARGV[2]) local last_error = ARGV[3] local now = ARGV[4] - local scheduled_at = tonumber(ARGV[5]) - + local retry_at = tonumber(ARGV[5]) + -- Get job ID local job_id = redis.call('HGET', job_key, 'id') if not job_id then return redis.error_reply('job not found') end + -- Update attempts and error - redis.call('HSET', job_key, + redis.call('HSET', job_key, 'attempts', attempts, 'last_error', last_error, 'failed_at', now, 'updated_at', now ) + -- Remove from running set redis.call('SREM', 'status:queue:' .. queue_name .. ':running', job_id) + -- Check if exceeded max attempts if attempts >= max_attempts then -- Move to dead letter queue @@ -81,11 +81,11 @@ redis.call('SADD', 'status:queue:' .. queue_name .. ':dead', job_id) return 'dead' else - -- Re-enqueue as failed (can be retried) - redis.call('HSET', job_key, 'status', 'failed') - redis.call('ZADD', 'queue:' .. 
queue_name, scheduled_at, job_id) - redis.call('SADD', 'status:queue:' .. queue_name .. ':failed', job_id) - return 'failed' + -- Re-schedule as pending with backoff delay so Reserve can pick it up + redis.call('HSET', job_key, 'status', 'pending', 'scheduled_at', retry_at) + redis.call('ZADD', 'queue:' .. queue_name, retry_at, job_id) + redis.call('SADD', 'status:queue:' .. queue_name .. ':pending', job_id) + return 'pending' end `) diff --git a/internal/worker/backoff.go b/internal/worker/backoff.go new file mode 100644 index 0000000..1bf679d --- /dev/null +++ b/internal/worker/backoff.go @@ -0,0 +1,65 @@ +package worker + +import ( + "math" + "time" +) + +// BackoffStrategy determines how long to wait before retrying a failed job. +// attempts is the number of attempts already made (0-based before the next attempt). +type BackoffStrategy interface { + NextDelay(attempts int) time.Duration +} + +// FixedBackoff waits a constant duration between every retry. +type FixedBackoff struct { + Delay time.Duration +} + +// NextDelay always returns the fixed delay regardless of attempt count. +func (f *FixedBackoff) NextDelay(_ int) time.Duration { + return f.Delay +} + +// ExponentialBackoff doubles (or scales by Factor) the wait time after each failure, +// capped at MaxDelay. +type ExponentialBackoff struct { + // InitialDelay is the wait time after the first failure. + InitialDelay time.Duration + // MaxDelay is the upper bound on the computed delay. + MaxDelay time.Duration + // Factor is the multiplier applied per attempt. Defaults to 2.0 if zero. + Factor float64 +} + +// NextDelay returns InitialDelay * Factor^(attempts-1), capped at MaxDelay. 
+func (e *ExponentialBackoff) NextDelay(attempts int) time.Duration { + factor := e.Factor + if factor <= 0 { + factor = 2.0 + } + if attempts <= 0 { + return e.InitialDelay + } + delay := float64(e.InitialDelay) * math.Pow(factor, float64(attempts-1)) + if delay > float64(e.MaxDelay) { + return e.MaxDelay + } + return time.Duration(delay) +} + +// DefaultExponentialBackoff returns a sensible exponential backoff suitable for +// most production workloads: starts at 5 s and caps at 1 h. +func DefaultExponentialBackoff() *ExponentialBackoff { + return &ExponentialBackoff{ + InitialDelay: 5 * time.Second, + MaxDelay: 1 * time.Hour, + Factor: 2.0, + } +} + +// NoBackoff returns a zero-delay backoff — retries are immediate. +// Useful in tests or for jobs that should be retried as fast as possible. +func NoBackoff() *FixedBackoff { + return &FixedBackoff{Delay: 0} +} diff --git a/internal/worker/handler.go b/internal/worker/handler.go new file mode 100644 index 0000000..b0b1dea --- /dev/null +++ b/internal/worker/handler.go @@ -0,0 +1,44 @@ +package worker + +import ( + "context" + "fmt" + + "github.com/reckziegelwilliam/queuekit/internal/queue" +) + +// Handler is a function that processes a single job. +// It receives a context (which may be cancelled on shutdown) and the job to process. +// Return nil to indicate success, or a non-nil error to trigger a retry/DLQ. +type Handler func(ctx context.Context, job *queue.Job) error + +// Registry maps job types to their handlers. +type Registry struct { + handlers map[string]Handler +} + +// NewRegistry creates an empty handler registry. +func NewRegistry() *Registry { + return &Registry{handlers: make(map[string]Handler)} +} + +// Register associates a Handler with the given job type. +// Registering the same type twice overwrites the previous handler. +func (r *Registry) Register(jobType string, h Handler) { + r.handlers[jobType] = h +} + +// Get returns the Handler registered for the given job type, or false if none. 
+func (r *Registry) Get(jobType string) (Handler, bool) { + h, ok := r.handlers[jobType] + return h, ok +} + +// ErrNoHandler is returned when a job type has no registered handler. +type ErrNoHandler struct { + JobType string +} + +func (e *ErrNoHandler) Error() string { + return fmt.Sprintf("no handler registered for job type %q", e.JobType) +} diff --git a/internal/worker/pool.go b/internal/worker/pool.go new file mode 100644 index 0000000..8d35798 --- /dev/null +++ b/internal/worker/pool.go @@ -0,0 +1,150 @@ +package worker + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/reckziegelwilliam/queuekit/internal/backend" +) + +// QueueConfig holds the processing configuration for a single queue. +type QueueConfig struct { + // Name is the queue to consume from. + Name string + // Concurrency is the number of workers to run in parallel for this queue. + // Defaults to 1 if zero or negative. + Concurrency int +} + +// Pool manages a group of workers across one or more queues. +// Start it once; Stop waits for all in-flight jobs to complete before returning. +type Pool struct { + backend backend.Backend + registry *Registry + queues []QueueConfig + logger *slog.Logger + workerOpts []Option + + mu sync.Mutex + workers []*Worker + cancel context.CancelFunc + wg sync.WaitGroup + started bool +} + +// PoolOption is a functional option for Pool. +type PoolOption func(*Pool) + +// WithPoolLogger sets a custom logger for the pool and all workers it spawns. +func WithPoolLogger(l *slog.Logger) PoolOption { + return func(p *Pool) { + p.logger = l + p.workerOpts = append(p.workerOpts, WithWorkerLogger(l)) + } +} + +// WithPoolWorkerOptions appends worker-level options applied to every worker the pool creates. +func WithPoolWorkerOptions(opts ...Option) PoolOption { + return func(p *Pool) { p.workerOpts = append(p.workerOpts, opts...) } +} + +// NewPool creates a Pool that will dispatch jobs from the listed queues. 
+func NewPool(b backend.Backend, r *Registry, queues []QueueConfig, opts ...PoolOption) *Pool { + p := &Pool{ + backend: b, + registry: r, + queues: queues, + logger: slog.Default(), + } + for _, opt := range opts { + opt(p) + } + return p +} + +// Start launches all workers as goroutines. It returns immediately; workers run +// in the background until Stop is called or the parent context is cancelled. +// +// Start returns an error if the pool is already running. +func (p *Pool) Start(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.started { + return fmt.Errorf("pool is already running") + } + + poolCtx, cancel := context.WithCancel(ctx) + p.cancel = cancel + p.started = true + + workerIdx := 0 + for _, q := range p.queues { + concurrency := q.Concurrency + if concurrency <= 0 { + concurrency = 1 + } + for i := 0; i < concurrency; i++ { + id := fmt.Sprintf("worker-%d", workerIdx) + workerIdx++ + + w := NewWorker(id, q.Name, p.backend, p.registry, p.workerOpts...) + p.workers = append(p.workers, w) + + p.wg.Add(1) + go func(w *Worker) { + defer p.wg.Done() + w.Run(poolCtx) + }(w) + } + } + + p.logger.Info("worker pool started", + "queues", len(p.queues), + "total_workers", workerIdx, + ) + return nil +} + +// Stop cancels the pool's context and waits for all workers to finish their +// current job before returning. It is safe to call Stop multiple times. +func (p *Pool) Stop() { + p.mu.Lock() + cancel := p.cancel + p.mu.Unlock() + + if cancel != nil { + cancel() + } + p.wg.Wait() + + p.mu.Lock() + p.started = false + p.workers = nil + p.cancel = nil + p.mu.Unlock() + + p.logger.Info("worker pool stopped") +} + +// States returns a snapshot of every worker's current state. Safe to call at +// any time, including while the pool is running. 
+func (p *Pool) States() []WorkerState { + p.mu.Lock() + workers := make([]*Worker, len(p.workers)) + copy(workers, p.workers) + p.mu.Unlock() + + states := make([]WorkerState, len(workers)) + for i, w := range workers { + states[i] = w.State() + } + return states +} + +// Register is a convenience method that delegates to the pool's Registry. +func (p *Pool) Register(jobType string, h Handler) { + p.registry.Register(jobType, h) +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go new file mode 100644 index 0000000..1f6ff0f --- /dev/null +++ b/internal/worker/worker.go @@ -0,0 +1,205 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/reckziegelwilliam/queuekit/internal/backend" + "github.com/reckziegelwilliam/queuekit/internal/queue" +) + +// Status represents the current operational state of a Worker. +type Status string + +const ( + // StatusIdle means the worker is polling for jobs but none are available. + StatusIdle Status = "idle" + // StatusRunning means the worker is currently executing a job. + StatusRunning Status = "running" + // StatusStopped means the worker has exited its run loop. + StatusStopped Status = "stopped" +) + +// WorkerState is a point-in-time snapshot of a worker's status. +type WorkerState struct { + ID string + Queue string + Status Status + LastJobID string + UpdatedAt time.Time +} + +// Worker is a single job consumer. It polls one queue, executes handlers, and +// calls Ack/Nack on the backend based on the outcome. +type Worker struct { + id string + queueName string + backend backend.Backend + registry *Registry + backoff BackoffStrategy + logger *slog.Logger + pollInterval time.Duration + + mu sync.Mutex + status Status + lastJobID string + updatedAt time.Time +} + +// Option is a functional option for configuring a Worker. +type Option func(*Worker) + +// WithBackoff sets a custom BackoffStrategy (default: exponential 5 s → 1 h). 
+func WithBackoff(b BackoffStrategy) Option { + return func(w *Worker) { w.backoff = b } +} + +// WithPollInterval sets how often the worker checks for new jobs (default: 1 s). +func WithPollInterval(d time.Duration) Option { + return func(w *Worker) { w.pollInterval = d } +} + +// WithWorkerLogger sets a custom structured logger. +func WithWorkerLogger(l *slog.Logger) Option { + return func(w *Worker) { w.logger = l } +} + +// NewWorker creates a Worker that processes jobs from queueName. +func NewWorker(id, queueName string, b backend.Backend, r *Registry, opts ...Option) *Worker { + w := &Worker{ + id: id, + queueName: queueName, + backend: b, + registry: r, + backoff: DefaultExponentialBackoff(), + logger: slog.Default(), + pollInterval: 1 * time.Second, + status: StatusIdle, + updatedAt: time.Now(), + } + for _, opt := range opts { + opt(w) + } + return w +} + +// State returns a thread-safe snapshot of the worker's current state. +func (w *Worker) State() WorkerState { + w.mu.Lock() + defer w.mu.Unlock() + return WorkerState{ + ID: w.id, + Queue: w.queueName, + Status: w.status, + LastJobID: w.lastJobID, + UpdatedAt: w.updatedAt, + } +} + +// Run starts the worker's polling loop. It blocks until ctx is cancelled. +func (w *Worker) Run(ctx context.Context) { + w.setState(StatusIdle, "") + w.logger.Info("worker started", "worker_id", w.id, "queue", w.queueName) + + defer func() { + w.setState(StatusStopped, "") + w.logger.Info("worker stopped", "worker_id", w.id, "queue", w.queueName) + }() + + ticker := time.NewTicker(w.pollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := w.processNext(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + w.logger.Error("worker error", "worker_id", w.id, "error", err) + } + } + } + } +} + +// processNext attempts to claim and execute the next available job. 
+func (w *Worker) processNext(ctx context.Context) error { + job, err := w.backend.Reserve(ctx, w.queueName) + if err != nil { + if errors.Is(err, context.Canceled) { + return err + } + return fmt.Errorf("reserve: %w", err) + } + if job == nil { + return nil // queue is empty + } + return w.execute(ctx, job) +} + +// execute runs the handler for a job and issues Ack or Nack based on the result. +func (w *Worker) execute(ctx context.Context, job *queue.Job) error { + w.setState(StatusRunning, job.ID) + defer w.setState(StatusIdle, "") + + w.logger.Info("executing job", + "worker_id", w.id, + "job_id", job.ID, + "job_type", job.Type, + "queue", job.Queue, + "attempt", job.Attempts+1, + "max_attempts", job.MaxAttempts, + ) + + handler, ok := w.registry.Get(job.Type) + if !ok { + // No handler — nack immediately so attempts are counted and DLQ applies. + noHandlerErr := &ErrNoHandler{JobType: job.Type} + w.logger.Error("no handler for job type", + "worker_id", w.id, "job_id", job.ID, "job_type", job.Type) + if err := w.backend.Nack(ctx, job.ID, noHandlerErr, 0); err != nil { + return fmt.Errorf("nack (no handler): %w", err) + } + return nil + } + + execErr := handler(ctx, job) + if execErr == nil { + w.logger.Info("job completed", + "worker_id", w.id, "job_id", job.ID, "job_type", job.Type) + if err := w.backend.Ack(ctx, job.ID); err != nil { + return fmt.Errorf("ack: %w", err) + } + return nil + } + + // Handler returned an error — calculate backoff and nack. + delay := w.backoff.NextDelay(job.Attempts + 1) + w.logger.Warn("job failed", + "worker_id", w.id, + "job_id", job.ID, + "job_type", job.Type, + "attempt", job.Attempts+1, + "retry_delay", delay, + "error", execErr, + ) + + if err := w.backend.Nack(ctx, job.ID, execErr, delay); err != nil { + return fmt.Errorf("nack: %w", err) + } + return nil +} + +// setState updates the worker's internal status (thread-safe). 
+func (w *Worker) setState(s Status, jobID string) { + w.mu.Lock() + defer w.mu.Unlock() + w.status = s + w.lastJobID = jobID + w.updatedAt = time.Now() +} diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go new file mode 100644 index 0000000..33ad0dc --- /dev/null +++ b/internal/worker/worker_test.go @@ -0,0 +1,455 @@ +package worker + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/reckziegelwilliam/queuekit/internal/queue" +) + +// --------------------------------------------------------------------------- +// Mock backend +// --------------------------------------------------------------------------- + +type mockBackend struct { + mu sync.Mutex + jobs map[string]*queue.Job + queued []*queue.Job + acked []string + nacked []string +} + +func newMockBackend() *mockBackend { + return &mockBackend{jobs: make(map[string]*queue.Job)} +} + +func (m *mockBackend) Enqueue(_ context.Context, job *queue.Job) error { + m.mu.Lock() + defer m.mu.Unlock() + m.jobs[job.ID] = job + m.queued = append(m.queued, job) + return nil +} + +func (m *mockBackend) Reserve(_ context.Context, queueName string) (*queue.Job, error) { + m.mu.Lock() + defer m.mu.Unlock() + for i, j := range m.queued { + if j.Queue == queueName && j.Status == queue.StatusPending { + m.queued = append(m.queued[:i], m.queued[i+1:]...) 
+ j.Status = queue.StatusRunning + return j, nil + } + } + return nil, nil +} + +func (m *mockBackend) Ack(_ context.Context, jobID string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.acked = append(m.acked, jobID) + if j, ok := m.jobs[jobID]; ok { + j.Status = queue.StatusCompleted + } + return nil +} + +func (m *mockBackend) Nack(_ context.Context, jobID string, err error, _ time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() + m.nacked = append(m.nacked, jobID) + if j, ok := m.jobs[jobID]; ok { + j.Attempts++ + if j.Attempts >= j.MaxAttempts { + j.Status = queue.StatusDead + } else { + // Re-schedule for immediate retry in the mock + j.Status = queue.StatusPending + m.queued = append(m.queued, j) + } + if err != nil { + j.LastError = err.Error() + } + } + return nil +} + +func (m *mockBackend) MoveToDLQ(_ context.Context, jobID string) error { + m.mu.Lock() + defer m.mu.Unlock() + if j, ok := m.jobs[jobID]; ok { + j.Status = queue.StatusDead + } + return nil +} + +func (m *mockBackend) ListQueues(_ context.Context) ([]queue.Queue, error) { return nil, nil } +func (m *mockBackend) ListJobs(_ context.Context, _, _ string, _, _ int) ([]*queue.Job, error) { + return nil, nil +} +func (m *mockBackend) GetJob(_ context.Context, jobID string) (*queue.Job, error) { + m.mu.Lock() + defer m.mu.Unlock() + j, ok := m.jobs[jobID] + if !ok { + return nil, fmt.Errorf("job not found: %s", jobID) + } + return j, nil +} +func (m *mockBackend) DeleteJob(_ context.Context, jobID string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.jobs, jobID) + return nil +} +func (m *mockBackend) Close() error { return nil } + +// --------------------------------------------------------------------------- +// Backoff tests +// --------------------------------------------------------------------------- + +func TestFixedBackoff(t *testing.T) { + b := &FixedBackoff{Delay: 5 * time.Second} + for _, attempt := range []int{0, 1, 5, 100} { + assert.Equal(t, 5*time.Second, 
		b.NextDelay(attempt),
			"fixed backoff should always return the same delay")
	}
}

// TestExponentialBackoff verifies the growth curve of ExponentialBackoff:
// attempts 0 and 1 both yield InitialDelay, each later attempt multiplies the
// delay by Factor (2.0 here), and the result never exceeds MaxDelay.
func TestExponentialBackoff(t *testing.T) {
	b := &ExponentialBackoff{
		InitialDelay: 1 * time.Second,
		MaxDelay:     30 * time.Second,
		Factor:       2.0,
	}

	assert.Equal(t, 1*time.Second, b.NextDelay(0), "attempt 0 should equal InitialDelay")
	assert.Equal(t, 1*time.Second, b.NextDelay(1), "attempt 1 should equal InitialDelay")
	assert.Equal(t, 2*time.Second, b.NextDelay(2))
	assert.Equal(t, 4*time.Second, b.NextDelay(3))
	assert.Equal(t, 8*time.Second, b.NextDelay(4))
	// Cap at MaxDelay: uncapped, attempt 10 would be 1s * 2^9 = 512s.
	assert.Equal(t, 30*time.Second, b.NextDelay(10), "should be capped at MaxDelay")
}

// TestDefaultExponentialBackoff checks the package-provided default strategy:
// it exists, its delay grows with the attempt count, and a very large attempt
// number saturates at the configured MaxDelay.
func TestDefaultExponentialBackoff(t *testing.T) {
	b := DefaultExponentialBackoff()
	assert.NotNil(t, b)
	assert.Greater(t, b.NextDelay(5), b.NextDelay(1), "delay should increase with attempts")
	assert.Equal(t, b.MaxDelay, b.NextDelay(100), "very high attempt should return MaxDelay")
}

// TestNoBackoff verifies the zero-delay strategy returns 0 regardless of the
// attempt number (only attempt 5 is sampled here).
func TestNoBackoff(t *testing.T) {
	b := NoBackoff()
	assert.Equal(t, time.Duration(0), b.NextDelay(5))
}

// ---------------------------------------------------------------------------
// Handler registry tests
// ---------------------------------------------------------------------------

// TestRegistry_RegisterAndGet registers a handler under a job type, retrieves
// it by the same key, and confirms the returned func is the one registered
// (observed via the `called` side effect).
func TestRegistry_RegisterAndGet(t *testing.T) {
	r := NewRegistry()

	called := false
	h := func(_ context.Context, _ *queue.Job) error {
		called = true
		return nil
	}

	r.Register("email.send", h)

	got, ok := r.Get("email.send")
	require.True(t, ok)
	require.NotNil(t, got)
	// Handler tolerates a nil job here; we only care that it runs.
	_ = got(context.Background(), nil)
	assert.True(t, called)
}

// TestRegistry_GetMissing verifies Get reports ok=false for an unknown type.
func TestRegistry_GetMissing(t *testing.T) {
	r := NewRegistry()
	_, ok := r.Get("nonexistent")
	assert.False(t, ok)
}

// TestRegistry_Overwrite verifies last-write-wins semantics: registering a
// second handler under an existing type replaces the first.
func TestRegistry_Overwrite(t *testing.T) {
	r := NewRegistry()
	r.Register("type.a", func(_ context.Context, _ *queue.Job) error { return nil })

	var called string
	r.Register("type.a", func(_ context.Context, _ *queue.Job) error {
		called = "second"
		return nil
	})

	got, ok := r.Get("type.a")
	require.True(t, ok)
	_ = got(context.Background(), nil)
	assert.Equal(t, "second", called, "second registration should overwrite the first")
}

// ---------------------------------------------------------------------------
// Worker tests
// ---------------------------------------------------------------------------

// newTestJob builds a job of the given type/queue with a fixed tiny JSON
// payload, for use across the worker and pool tests.
func newTestJob(jobType, queueName string) *queue.Job {
	return queue.NewJob(jobType, queueName, json.RawMessage(`{"test":true}`))
}

// TestWorker_SuccessfulJob runs a single worker against one enqueued job,
// asserts the registered handler receives that job's ID, and then checks the
// mock backend recorded an Ack for it.
func TestWorker_SuccessfulJob(t *testing.T) {
	b := newMockBackend()
	r := NewRegistry()

	job := newTestJob("email.send", "default")
	require.NoError(t, b.Enqueue(context.Background(), job))

	// Buffered so the handler never blocks on the test goroutine.
	executed := make(chan string, 1)
	r.Register("email.send", func(_ context.Context, j *queue.Job) error {
		executed <- j.ID
		return nil
	})

	w := NewWorker("w1", "default", b, r,
		WithBackoff(NoBackoff()),
		WithPollInterval(10*time.Millisecond),
	)

	// The timeout doubles as the failure deadline for the select below.
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	go w.Run(ctx)

	select {
	case gotID := <-executed:
		assert.Equal(t, job.ID, gotID)
	case <-ctx.Done():
		t.Fatal("handler was never called")
	}

	// Give worker time to call Ack
	// NOTE(review): sleep-based synchronization is racy in principle; an
	// assert.Eventually poll on b.acked (under b.mu) would be more robust.
	time.Sleep(50 * time.Millisecond)

	b.mu.Lock()
	defer b.mu.Unlock()
	assert.Contains(t, b.acked, job.ID, "job should have been acked")
}

// TestWorker_FailedJobNacked registers a handler that always errors and
// verifies the worker reports the failure to the backend via Nack at least
// once within the run window.
func TestWorker_FailedJobNacked(t *testing.T) {
	b := newMockBackend()
	r := NewRegistry()

	job := newTestJob("risky.task", "default")
	job.MaxAttempts = 3
	require.NoError(t, b.Enqueue(context.Background(), job))

	r.Register("risky.task", func(_ context.Context, _ *queue.Job) error {
		return errors.New("transient error")
	})

	w := NewWorker("w1", "default", b, r,
		WithBackoff(NoBackoff()),
		WithPollInterval(10*time.Millisecond),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	go w.Run(ctx)
	// Wait out the whole window so the worker has a chance to poll and Nack.
	<-ctx.Done()

	b.mu.Lock()
	// NOTE(review): if b.nacked is a map/slice, this copies only the header —
	// the asserts below read shared storage after unlock while the worker
	// goroutine may still be finishing a Nack. Consider copying the contents
	// under the lock, or joining the worker before asserting.
	nacked := b.nacked
	b.mu.Unlock()

	assert.NotEmpty(t, nacked, "job should have been nacked at least once")
	assert.Contains(t, nacked, job.ID)
}

// TestWorker_NoHandlerNacks enqueues a job whose type has no registered
// handler and verifies the worker Nacks it rather than dropping it.
func TestWorker_NoHandlerNacks(t *testing.T) {
	b := newMockBackend()
	r := NewRegistry() // no handlers registered

	job := newTestJob("unknown.type", "default")
	require.NoError(t, b.Enqueue(context.Background(), job))

	w := NewWorker("w1", "default", b, r,
		WithBackoff(NoBackoff()),
		WithPollInterval(10*time.Millisecond),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	go w.Run(ctx)
	<-ctx.Done()

	b.mu.Lock()
	// NOTE(review): same shared-reference concern as in
	// TestWorker_FailedJobNacked — verify the copy semantics of b.nacked.
	nacked := b.nacked
	b.mu.Unlock()

	assert.Contains(t, nacked, job.ID, "job with no handler should be nacked")
}

// TestWorker_StateTransitions walks the worker through its observable
// lifecycle: running while a handler blocks, idle once the handler returns,
// and stopped after context cancellation. Each step is polled with
// assert.Eventually rather than asserted at a fixed instant.
func TestWorker_StateTransitions(t *testing.T) {
	b := newMockBackend()
	r := NewRegistry()

	// Handler parks on this channel so the worker stays in StatusRunning
	// until the test closes it.
	block := make(chan struct{})
	r.Register("blocking.job", func(_ context.Context, _ *queue.Job) error {
		<-block
		return nil
	})

	job := newTestJob("blocking.job", "default")
	require.NoError(t, b.Enqueue(context.Background(), job))

	w := NewWorker("w1", "default", b, r,
		WithPollInterval(10*time.Millisecond),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go w.Run(ctx)

	// Worker should eventually transition to running
	assert.Eventually(t, func() bool {
		return w.State().Status == StatusRunning
	}, 500*time.Millisecond, 10*time.Millisecond)

	// Unblock the job
	close(block)

	// Worker should return to idle
	assert.Eventually(t, func() bool {
		return w.State().Status == StatusIdle
	}, 500*time.Millisecond, 10*time.Millisecond)

	cancel()

	// Worker should stop
	assert.Eventually(t, func() bool {
		return w.State().Status == StatusStopped
	}, 500*time.Millisecond, 10*time.Millisecond)
}

// ---------------------------------------------------------------------------
// Pool tests
//
--------------------------------------------------------------------------- + +func TestPool_StartStop(t *testing.T) { + b := newMockBackend() + r := NewRegistry() + r.Register("noop", func(_ context.Context, _ *queue.Job) error { return nil }) + + p := NewPool(b, r, []QueueConfig{ + {Name: "default", Concurrency: 2}, + {Name: "priority", Concurrency: 1}, + }) + + ctx := context.Background() + require.NoError(t, p.Start(ctx)) + + states := p.States() + assert.Len(t, states, 3, "should have 2+1=3 workers") + + p.Stop() + + // After Stop, States returns empty slice + assert.Empty(t, p.States()) +} + +func TestPool_DoubleStartReturnsError(t *testing.T) { + b := newMockBackend() + r := NewRegistry() + p := NewPool(b, r, []QueueConfig{{Name: "default", Concurrency: 1}}) + + ctx := context.Background() + require.NoError(t, p.Start(ctx)) + defer p.Stop() + + err := p.Start(ctx) + assert.Error(t, err, "starting an already-running pool should return an error") +} + +func TestPool_ProcessesJobs(t *testing.T) { + b := newMockBackend() + r := NewRegistry() + + const numJobs = 5 + processed := make(chan string, numJobs) + + r.Register("test.job", func(_ context.Context, j *queue.Job) error { + processed <- j.ID + return nil + }) + + for i := 0; i < numJobs; i++ { + job := newTestJob("test.job", "default") + require.NoError(t, b.Enqueue(context.Background(), job)) + } + + p := NewPool(b, r, []QueueConfig{{Name: "default", Concurrency: 3}}, + WithPoolWorkerOptions( + WithBackoff(NoBackoff()), + WithPollInterval(10*time.Millisecond), + ), + ) + + ctx := context.Background() + require.NoError(t, p.Start(ctx)) + + // Collect all processed job IDs + seen := make(map[string]bool) + timeout := time.After(2 * time.Second) + for len(seen) < numJobs { + select { + case id := <-processed: + seen[id] = true + case <-timeout: + t.Fatalf("only %d/%d jobs processed before timeout", len(seen), numJobs) + } + } + + p.Stop() + assert.Len(t, seen, numJobs, "all jobs should have been processed 
exactly once") +} + +func TestPool_Register(t *testing.T) { + b := newMockBackend() + r := NewRegistry() + p := NewPool(b, r, []QueueConfig{{Name: "q", Concurrency: 1}}) + + called := false + p.Register("my.type", func(_ context.Context, _ *queue.Job) error { + called = true + return nil + }) + + h, ok := r.Get("my.type") + require.True(t, ok) + _ = h(context.Background(), nil) + assert.True(t, called) +} + +func TestPool_DefaultConcurrency(t *testing.T) { + b := newMockBackend() + r := NewRegistry() + + // Concurrency 0 should default to 1 + p := NewPool(b, r, []QueueConfig{{Name: "default", Concurrency: 0}}) + + ctx := context.Background() + require.NoError(t, p.Start(ctx)) + defer p.Stop() + + assert.Len(t, p.States(), 1) +}