Merged
4 changes: 2 additions & 2 deletions Makefile
@@ -4,12 +4,12 @@ help:

test-setup: ## Prepare infrastructure for tests
@echo "+ $@"
docker-compose up -d
docker compose up -d
.PHONY: test-setup

test-teardown: ## Bring down test infrastructure
@echo "+ $@"
docker-compose rm -fsv
docker compose rm -fsv
.PHONY: test-teardown

test-run: ## Run tests
3 changes: 3 additions & 0 deletions README.md
@@ -220,6 +220,8 @@ job, err = enqueuer.EnqueueUniqueInByKey("clear_cache", 300, work.Q{"object_id_"
```
For information on how this map is serialized to form a unique key, see https://golang.org/pkg/encoding/json/#Marshal.

Note: for `EnqueueUnique`, the unique Redis key is derived from the job name and its arguments; for `EnqueueUniqueByKey`, it is derived from the job name and the provided key map. The key is set atomically when the job is enqueued and is intended to remain set while the job is on the queue, being processed, or sitting in the retry queue. However, this unique key currently has a 24‑hour TTL; if it expires before the job finishes or is moved to the dead queue (for example, for very long‑running jobs or jobs scheduled more than 24 hours in the future), another job with the same name/arguments or name/key map may be enqueued.
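Since the key map is JSON-marshaled as described above, its serialized form is deterministic (`encoding/json` writes map keys in sorted order). A minimal sketch of how a key map could serialize into a stable unique-key suffix; the function name and key shape here are illustrative assumptions, not the library's internals:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// uniqueKeySuffix is a hypothetical helper showing how a key map could be
// serialized into a deterministic suffix for a unique Redis key.
// encoding/json sorts map keys alphabetically, so the same map always
// produces the same bytes regardless of insertion order.
func uniqueKeySuffix(keyMap map[string]interface{}) (string, error) {
	b, err := json.Marshal(keyMap)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, _ := uniqueKeySuffix(map[string]interface{}{"zzz": 1, "object_id_": "12345"})
	fmt.Println(s) // {"object_id_":"12345","zzz":1}
}
```

Because the serialization is order-independent, two enqueues with logically equal key maps collide on the same unique key, which is what makes the dedup check reliable.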

### Periodic Enqueueing (Cron)

You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and week of the day, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
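The six-field, seconds-resolution spec described above is easy to misread; a small illustrative helper (not part of the gocraft/work API) that labels each field of a spec string:

```go
package main

import (
	"fmt"
	"strings"
)

// fieldNames labels the six fields of the seconds-resolution cron format
// used for periodic enqueueing, in order.
var fieldNames = []string{"seconds", "minutes", "hours", "day-of-month", "month", "day-of-week"}

// describeSpec pairs each field of a 6-field cron spec with its name.
// This is an explanatory sketch, not a validator of field contents.
func describeSpec(spec string) ([]string, error) {
	fields := strings.Fields(spec)
	if len(fields) != len(fieldNames) {
		return nil, fmt.Errorf("expected 6 fields, got %d", len(fields))
	}
	out := make([]string, len(fields))
	for i, f := range fields {
		out[i] = fieldNames[i] + "=" + f
	}
	return out, nil
}

func main() {
	// "0 */5 * * * *" fires at second 0 of every fifth minute.
	desc, _ := describeSpec("0 */5 * * * *")
	fmt.Println(strings.Join(desc, " "))
}
```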
@@ -318,6 +320,7 @@ You'll see a view that looks like this:
* Both normal queues and the scheduled queue are considered.
* When a unique job is enqueued, we'll atomically set a Redis key that includes the job name and arguments, and enqueue the job.
* After the job has finished or been put into the dead queue, we'll delete that key to permit another job to be enqueued.
* The unique key remains set while the job is on the queue, being processed, or sitting in the retry queue; it is removed only after the job completes or is moved to the dead queue. Note that in the current implementation this Redis key is created with a 24-hour TTL, so it may expire before completion/dead, at which point another job with the same name/arguments can be enqueued.

### Periodic jobs

16 changes: 10 additions & 6 deletions enqueue.go
@@ -100,9 +100,11 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri

// EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments.
// The already-enqueued job can be in the normal work queue or in the scheduled job queue.
// Once a worker begins processing a job, another job with the same name and arguments can be enqueued again.
// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// While a job is enqueued, being processed, or present in the retry queue, the unique lock is held
// and another job with the same name and arguments cannot be enqueued. The unique key is removed
// only after the job finishes (or is moved to the dead queue), at which point a new unique job may
// be enqueued. For robustness, the unique key is set with a 24-hour TTL and is not refreshed, so
// uniqueness is only guaranteed for 24 hours after enqueueing; this is mostly relevant for
// scheduled and long-running jobs.
Comment on lines +103 to +107 (Copilot AI, Jan 22, 2026):
The comment says the unique key is removed only after the job finishes or is moved to the dead queue, but the Redis unique key is also set with a 24h TTL (see redis.go:366) and is not refreshed during processing/retries. That means the lock can expire before completion for long-running or long-delayed jobs. Please clarify the docstring to mention the TTL/possible expiry (or adjust the implementation to refresh the TTL while the job exists).
// EnqueueUnique returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) {
return e.EnqueueUniqueByKey(jobName, args, nil)
@@ -115,9 +117,11 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma

// EnqueueUniqueByKey enqueues a job unless a job is already enqueued with the same name and key, updating arguments.
// The already-enqueued job can be in the normal work queue or in the scheduled job queue.
// Once a worker begins processing a job, another job with the same name and key can be enqueued again.
// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// While a job is enqueued, being processed, or present in the retry queue, the unique lock is held
// and another job with the same name and key cannot be enqueued. The unique key is removed only after
// the job finishes (or is moved to the dead queue), at which point a new unique job may be enqueued.
// For robustness, the unique key is set with a 24-hour TTL and is not refreshed, so uniqueness is
// only guaranteed for 24 hours after enqueueing; this is mostly relevant for scheduled and
// long-running jobs.
Comment on lines +120 to +124 (Copilot AI, Jan 22, 2026):
Same documentation issue as EnqueueUnique: the unique key has a 24h TTL in the Lua script (redis.go:381) and can expire before the job completes/reaches dead, so the lock is not guaranteed to persist for the full lifecycle. Please reflect this TTL/expiry behavior in this doc comment (or change the implementation to refresh TTL as needed).
// EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error) {
enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
99 changes: 97 additions & 2 deletions enqueue_test.go
@@ -175,7 +175,11 @@ func TestEnqueueUnique(t *testing.T) {
assert.NotNil(t, job)
}

// Tests that unique jobs are removed only after job is done or put in dead queue.
// Tests that unique jobs remain exclusive while the original is enqueued,
// in-progress, or sitting in the retry queue, and are only cleared after
// the job finishes or is moved to the dead queue. Specifically this test
// ensures a duplicate enqueue is rejected while the job is retried and
// allowed after the job is dead.
func TestOrderEnqueueUnique(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
@@ -371,7 +375,10 @@ func TestEnqueueUniqueByKey(t *testing.T) {
assert.NotNil(t, job)
}

// Tests that unique by key jobs are removed only after job is done or put in dead queue.
// Tests that unique-by-key jobs remain exclusive while the original is enqueued,
// in-progress, or sitting in the retry queue, and are only cleared after the
// job finishes or is moved to the dead queue. This mirrors the non-keyed test
// but verifies behavior when uniqueness is defined by a provided key map.
func TestOrderEnqueueUniqueByKey(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
@@ -468,6 +475,94 @@ func TestEnqueueUniqueInByKey(t *testing.T) {
assert.True(t, j.Unique)
}

// Tests that while a unique job is being processed, another job with
// the same name and key cannot be enqueued (unique lock held).
func TestEnqueueUniqueByKeyLockedWhileProcessing(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)

enqueuer := NewEnqueuer(ns, pool)

// Enqueue a unique job by key.
job, err := enqueuer.EnqueueUniqueByKey("proc", Q{"a": 1}, Q{"key": "123"})
require.NoError(t, err)
require.NotNil(t, job)

started := make(chan struct{})
done := make(chan struct{})

wp := NewWorkerPool(TestContext{}, 1, ns, pool)
wp.JobWithOptions("proc", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
close(started)
<-done
return nil
})

wp.Start()

// Wait for the job to be started (moved to in-progress and handler invoked).
select {
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out waiting for job to start")
case <-started:
}

// While the original is processing, we should NOT be able to enqueue another
// job with the same unique key.
job2, err := enqueuer.EnqueueUniqueByKey("proc", Q{"a": 2}, Q{"key": "123"})
assert.NoError(t, err)
assert.Nil(t, job2)

close(done)
wp.Drain()
wp.Stop()
}

// Tests that while a unique job (keyed by arguments) is being processed,
// another job with the same name and arguments cannot be enqueued (unique lock held).
func TestEnqueueUniqueLockedWhileProcessing(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)

enqueuer := NewEnqueuer(ns, pool)

// Enqueue a unique job (uniqueness based on args).
job, err := enqueuer.EnqueueUnique("proc", Q{"a": 1})
require.NoError(t, err)
require.NotNil(t, job)

started := make(chan struct{})
done := make(chan struct{})

wp := NewWorkerPool(TestContext{}, 1, ns, pool)
wp.JobWithOptions("proc", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
close(started)
<-done
return nil
})

wp.Start()

// Wait for the job handler to start running.
select {
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out waiting for job to start")
case <-started:
}

// While the original is processing, attempting to enqueue the same args
// should be rejected.
job2, err := enqueuer.EnqueueUnique("proc", Q{"a": 1})
assert.NoError(t, err)
assert.Nil(t, job2)

close(done)
wp.Drain()
wp.Stop()
}

func TestRunEnqueueUniqueInByKey(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"