From c2d47eea3002926ec80528af2557cc5419073537 Mon Sep 17 00:00:00 2001
From: Artem Krasotin
Date: Thu, 22 Jan 2026 15:18:10 +0200
Subject: [PATCH 1/4] adjust unique job comments and add tests

---
 README.md       |  3 ++
 enqueue.go      | 16 +++++---
 enqueue_test.go | 99 ++++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index 28ac79fe..efcf3ac7 100644
--- a/README.md
+++ b/README.md
@@ -220,6 +220,8 @@ job, err = enqueuer.EnqueueUniqueInByKey("clear_cache", 300, work.Q{"object_id_"
 ```
 For information on how this map will be serialized to form a unique key, see (https://golang.org/pkg/encoding/json/#Marshal).
 
+Note: the unique Redis key that prevents duplicates is set atomically when the job is enqueued and remains set while the job is on the queue, being processed, or sitting in the retry queue. The key is removed only after the job finishes or is moved to the dead queue, at which point another job with the same name/arguments may be enqueued.
+
 ### Periodic Enqueueing (Cron)
 
 You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and day of the week, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
@@ -318,6 +320,7 @@ You'll see a view that looks like this:
 * Both normal queues and the scheduled queue are considered.
 * When a unique job is enqueued, we'll atomically set a Redis key that includes the job name and arguments and enqueue the job.
 * After the job has been finished or put into the dead queue, we'll delete that key to permit another job to be enqueued.
+* The unique key remains set while the job is on the queue, being processed, or sitting in the retry queue; it is removed only after the job completes or is moved to the dead queue.
 
 ### Periodic jobs
 
diff --git a/enqueue.go b/enqueue.go
index 0d2f763e..bcc6e7f0 100644
--- a/enqueue.go
+++ b/enqueue.go
@@ -100,9 +100,11 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
 
 // EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments.
 // The already-enqueued job can be in the normal work queue or in the scheduled job queue.
-// Once a worker begins processing a job, another job with the same name and arguments can be enqueued again.
-// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
-// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
+// While a job is enqueued, being processed, or present in the retry queue, the unique lock is held
+// and another job with the same name and arguments cannot be enqueued. The unique key is removed
+// only after the job finishes (or is moved to the dead queue), at which point a new unique job may
+// be enqueued. In order to add robustness to the system, jobs are only unique for 24 hours after
+// they're enqueued; this is mostly relevant for scheduled jobs.
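+//
+// A minimal usage sketch (illustrative only; assumes an Enqueuer built with NewEnqueuer,
+// and the "send_email" job name is hypothetical). A rejected duplicate returns a nil job
+// and a nil error:
+//
+//	job, _ := enqueuer.EnqueueUnique("send_email", Q{"address": "test@example.com"})
+//	dup, _ := enqueuer.EnqueueUnique("send_email", Q{"address": "test@example.com"})
+//	// job != nil; dup == nil because the first job still holds the unique lock.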
 // EnqueueUnique returns the job if it was enqueued and nil if it wasn't
 func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) {
 	return e.EnqueueUniqueByKey(jobName, args, nil)
 }
@@ -115,9 +117,11 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma
 
 // EnqueueUniqueByKey enqueues a job unless a job is already enqueued with the same name and key, updating arguments.
 // The already-enqueued job can be in the normal work queue or in the scheduled job queue.
-// Once a worker begins processing a job, another job with the same name and key can be enqueued again.
-// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
-// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
+// While a job is enqueued, being processed, or present in the retry queue, the unique lock is held
+// and another job with the same name and key cannot be enqueued. The unique key is removed only after
+// the job finishes (or is moved to the dead queue), at which point a new unique job may be enqueued.
+// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued;
+// this is mostly relevant for scheduled jobs.
 // EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't
 func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error) {
 	enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
diff --git a/enqueue_test.go b/enqueue_test.go
index 37f97778..a5a79d26 100644
--- a/enqueue_test.go
+++ b/enqueue_test.go
@@ -175,7 +175,11 @@ func TestEnqueueUnique(t *testing.T) {
 	assert.NotNil(t, job)
 }
 
-// Tests that unique jobs are removed only after job is done or put in dead queue.
+// Tests that unique jobs remain exclusive while the original is enqueued,
+// in progress, or sitting in the retry queue, and are only cleared after
+// the job finishes or is moved to the dead queue. Specifically, this test
+// ensures a duplicate enqueue is rejected while the job is being retried
+// and allowed after the job is dead.
 func TestOrderEnqueueUnique(t *testing.T) {
 	pool := newTestPool(":6379")
 	ns := "work"
@@ -371,7 +375,10 @@ func TestEnqueueUniqueByKey(t *testing.T) {
 	assert.NotNil(t, job)
 }
 
-// Tests that unique by key jobs are removed only after job is done or put in dead queue.
+// Tests that unique-by-key jobs remain exclusive while the original is enqueued,
+// in progress, or sitting in the retry queue, and are only cleared after the
+// job finishes or is moved to the dead queue. This mirrors the non-keyed test
+// but verifies behavior when uniqueness is defined by a provided key map.
 func TestOrderEnqueueUniqueByKey(t *testing.T) {
 	pool := newTestPool(":6379")
 	ns := "work"
@@ -468,6 +475,94 @@ func TestEnqueueUniqueInByKey(t *testing.T) {
 	assert.True(t, j.Unique)
 }
 
+// Tests that while a unique job is being processed, another job with
+// the same name and key cannot be enqueued (unique lock held).
+func TestEnqueueUniqueByKeyLockedWhileProcessing(t *testing.T) {
+	pool := newTestPool(":6379")
+	ns := "work"
+	cleanKeyspace(ns, pool)
+
+	enqueuer := NewEnqueuer(ns, pool)
+
+	// Enqueue a unique job by key.
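+	// This first enqueue takes the unique lock (job name "proc" plus the key map).
+	// The lock stays held while the handler below blocks on the done channel, so
+	// the duplicate enqueue attempted mid-processing must return a nil job.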
+ job, err := enqueuer.EnqueueUniqueByKey("proc", Q{"a": 1}, Q{"key": "123"}) + require.NoError(t, err) + require.NotNil(t, job) + + started := make(chan struct{}) + done := make(chan struct{}) + + wp := NewWorkerPool(TestContext{}, 1, ns, pool) + wp.JobWithOptions("proc", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + close(started) + <-done + return nil + }) + + wp.Start() + + // Wait for the job to be started (moved to in-progress and handler invoked). + select { + case <-time.After(5 * time.Second): + require.FailNow(t, "timed out waiting for job to start") + case <-started: + } + + // While the original is processing, we should NOT be able to enqueue another + // job with the same unique key. + job2, err := enqueuer.EnqueueUniqueByKey("proc", Q{"a": 2}, Q{"key": "123"}) + assert.NoError(t, err) + assert.Nil(t, job2) + + close(done) + wp.Drain() + wp.Stop() +} + +// Tests that while a unique job (keyed by arguments) is being processed, +// another job with the same name and arguments cannot be enqueued (unique lock held). +func TestEnqueueUniqueLockedWhileProcessing(t *testing.T) { + pool := newTestPool(":6379") + ns := "work" + cleanKeyspace(ns, pool) + + enqueuer := NewEnqueuer(ns, pool) + + // Enqueue a unique job (uniqueness based on args). + job, err := enqueuer.EnqueueUnique("proc", Q{"a": 1}) + require.NoError(t, err) + require.NotNil(t, job) + + started := make(chan struct{}) + done := make(chan struct{}) + + wp := NewWorkerPool(TestContext{}, 1, ns, pool) + wp.JobWithOptions("proc", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + close(started) + <-done + return nil + }) + + wp.Start() + + // Wait for the job handler to start running. + select { + case <-time.After(5 * time.Second): + require.FailNow(t, "timed out waiting for job to start") + case <-started: + } + + // While the original is processing, attempting to enqueue the same args + // should be rejected. + job2, err := enqueuer.EnqueueUnique("proc", Q{"a": 1}) + assert.NoError(t, err) + assert.Nil(t, job2) + + close(done) + wp.Drain() + wp.Stop() +} + func TestRunEnqueueUniqueInByKey(t *testing.T) { pool := newTestPool(":6379") ns := "work" From 4dd17e5e647aad995700c527e09b2c8fe84e23d1 Mon Sep 17 00:00:00 2001 From: Artem Krasotin Date: Thu, 22 Jan 2026 15:20:30 +0200 Subject: [PATCH 2/4] Fix Makefile to use consistent Docker command syntax --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index df91bb08..33c32ca6 100644 --- a/Makefile +++ b/Makefile @@ -4,12 +4,12 @@ help: test-setup: ## Prepare infrastructure for tests @echo "+ $@" - docker-compose up -d + docker compose up -d .PHONY: test-setup test-teardown: ## Bring down test infrastructure @echo "+ $@" - docker-compose rm -fsv + docker compose rm -fsv .PHONY: test-teardown test-run: ## Run tests From 07462b0a223df26999909313452bcda9b0cc4cc0 Mon Sep 17 00:00:00 2001 From: Artem Krasotin Date: Thu, 22 Jan 2026 16:35:42 +0300 Subject: [PATCH 3/4] Update README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index efcf3ac7..6c9f2333 100644 --- a/README.md +++ b/README.md @@ -220,7 +220,7 @@ job, err = enqueuer.EnqueueUniqueInByKey("clear_cache", 300, work.Q{"object_id_" ``` For information on how this map will be serialized to form a unique key, see (https://golang.org/pkg/encoding/json/#Marshal). 
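+
+For example (an illustrative sketch; the exact format of the Redis key is an internal detail), two enqueues that share a key map collide even when their args differ:
+
+```go
+job, _ := enqueuer.EnqueueUniqueByKey("clear_cache", work.Q{"object_id": "123"}, work.Q{"object_id": "123"})
+dup, _ := enqueuer.EnqueueUniqueByKey("clear_cache", work.Q{"force": true}, work.Q{"object_id": "123"})
+// job != nil, dup == nil: uniqueness is decided by the job name and key map, not the args.
+```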
-Note: the unique Redis key that prevents duplicates is set atomically when the job is enqueued and remains set while the job is on the queue, being processed, or sitting in the retry queue. The key is removed only after the job finishes or is moved to the dead queue, at which point another job with the same name/arguments may be enqueued.
+Note: for `EnqueueUnique`, the unique Redis key is derived from the job name and its arguments; for `EnqueueUniqueByKey`, it is derived from the job name and the provided key map. The key is set atomically when the job is enqueued and is intended to remain set while the job is on the queue, being processed, or sitting in the retry queue. However, this unique key currently has a 24-hour TTL; if it expires before the job finishes or is moved to the dead queue (for example, for very long-running jobs or jobs scheduled more than 24 hours in the future), another job with the same name/arguments or name/key map may be enqueued.
 
 ### Periodic Enqueueing (Cron)
 

From d703623b1a210069530f4342aad426251eda9e5c Mon Sep 17 00:00:00 2001
From: Artem Krasotin
Date: Thu, 22 Jan 2026 16:36:36 +0300
Subject: [PATCH 4/4] Update README.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 6c9f2333..f7f68c53 100644
--- a/README.md
+++ b/README.md
@@ -320,7 +320,7 @@ You'll see a view that looks like this:
 * Both normal queues and the scheduled queue are considered.
 * When a unique job is enqueued, we'll atomically set a Redis key that includes the job name and arguments and enqueue the job.
 * After the job has been finished or put into the dead queue, we'll delete that key to permit another job to be enqueued.
-* The unique key remains set while the job is on the queue, being processed, or sitting in the retry queue; it is removed only after the job completes or is moved to the dead queue.
+* The unique key remains set while the job is on the queue, being processed, or sitting in the retry queue; it is removed only after the job completes or is moved to the dead queue. Note that in the current implementation this Redis key is created with a 24-hour TTL, so it may expire before the job reaches either state, at which point another job with the same name/arguments can be enqueued.
 
 ### Periodic jobs
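To make the lifecycle described above concrete, here is a minimal, self-contained sketch (assuming a local Redis on :6379; the job name and arguments are illustrative):

```go
package main

import (
	"fmt"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

func main() {
	// Redis pool pointing at the same instance the tests use (:6379).
	pool := &redis.Pool{
		Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") },
	}
	enqueuer := work.NewEnqueuer("work", pool)

	// The first enqueue wins and atomically sets the unique key.
	job, err := enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id": "123"})
	fmt.Println(job != nil, err) // true <nil>

	// While that job is queued, in progress, or awaiting retry (and the
	// 24-hour TTL has not expired), an identical enqueue is rejected:
	// a nil job and a nil error are returned.
	dup, err := enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id": "123"})
	fmt.Println(dup == nil, err) // true <nil>
}
```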