From 5d631d697474d8c2bfe3b3c7a6305dc7c5eae669 Mon Sep 17 00:00:00 2001 From: Baicheng Yu Date: Thu, 13 Feb 2020 00:11:54 +0100 Subject: [PATCH 1/5] Tuning retry by using ActiveJob Exceptions implementation --- lib/quiq/job.rb | 3 +-- lib/quiq/scheduler.rb | 23 +++++++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/lib/quiq/job.rb b/lib/quiq/job.rb index b4de0bc..5be3878 100644 --- a/lib/quiq/job.rb +++ b/lib/quiq/job.rb @@ -17,10 +17,9 @@ def run # Then load the definition of the job + its arguments klass = Object.const_get(payload['job_class']) - args = payload['arguments'] # Then run the task - klass.new.perform(*args) + klass.deserialize(payload).perform_now rescue JSON::ParserError => e Quiq.logger.warn("Invalid format: #{e}") send_to_dlq(@raw, e) diff --git a/lib/quiq/scheduler.rb b/lib/quiq/scheduler.rb index 754e48c..6dfcaf3 100644 --- a/lib/quiq/scheduler.rb +++ b/lib/quiq/scheduler.rb @@ -36,17 +36,20 @@ def self.enqueue_at(job, scheduled_at) # Push the job in its queue and remove from scheduler_queue def enqueue(job) - begin - payload = JSON.parse(job) - rescue JSON::ParserError => e - Quiq.logger.warn("Invalid format: #{e}") - Queue.send_to_dlq(job) - end + Async do + begin + payload = JSON.parse(job) + rescue JSON::ParserError => e + Quiq.logger.warn("Invalid format: #{e}") + Queue.send_to_dlq(job) + end - Quiq.redis.transaction do |multi| - multi.lpush(Queue.formatted_name(payload['queue_name']), job) - multi.zrem(SCHEDULER_KEY, job) - end + puts "job payload: #{payload}" + Quiq.redis.transaction do |multi| + multi.lpush(Queue.formatted_name(payload['queue_name']), job) + multi.zrem(SCHEDULER_KEY, job) + end + end.wait end end end From a4ed318842029bbd6784e58989d7dbc71359aadd Mon Sep 17 00:00:00 2001 From: Baicheng Yu Date: Sun, 16 Feb 2020 17:38:16 +0100 Subject: [PATCH 2/5] Add retry example to testapp --- testapp/app/jobs/test_job.rb | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/testapp/app/jobs/test_job.rb b/testapp/app/jobs/test_job.rb index 6e4cf48..48dc9e0 100644 --- a/testapp/app/jobs/test_job.rb +++ b/testapp/app/jobs/test_job.rb @@ -1,9 +1,16 @@ # frozen_string_literal: true class TestJob < ApplicationJob + class CustomError < StandardError; end + + retry_on(CustomError, wait: 5, attempts: 3, queue: :retry) do + raise + end + def perform(data, wait) puts "[Worker ##{$$}] Receiving new job: #{data}" Quiq.current_task.sleep wait puts "[Worker ##{$$}] Time to wake up after #{wait} seconds" + raise CustomError, 'Caught CustomError' end end From a5a1060fc9666f1969d16961fb85e70fd0757c84 Mon Sep 17 00:00:00 2001 From: Baicheng Yu Date: Sun, 16 Feb 2020 17:47:01 +0100 Subject: [PATCH 3/5] Update comments --- lib/quiq/job.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/quiq/job.rb b/lib/quiq/job.rb index 5be3878..c2d6c18 100644 --- a/lib/quiq/job.rb +++ b/lib/quiq/job.rb @@ -15,11 +15,12 @@ def run # First parse the raw message from redis payload = JSON.parse(@raw) - # Then load the definition of the job + its arguments + # Then load the definition of the job + deserialize it to a job object klass = Object.const_get(payload['job_class']) + job = klass.deserialize(payload) # Then run the task - klass.deserialize(payload).perform_now + job.perform_now rescue JSON::ParserError => e Quiq.logger.warn("Invalid format: #{e}") send_to_dlq(@raw, e) From 499ef9382a9c7516cfccc863bfd7c0996ba2f238 Mon Sep 17 00:00:00 2001 From: Baicheng Yu Date: Mon, 17 Feb 2020 22:15:38 +0100 Subject: [PATCH 4/5] Non blocking enqueue --- lib/quiq/scheduler.rb | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/lib/quiq/scheduler.rb b/lib/quiq/scheduler.rb index 6dfcaf3..754e48c 100644 --- a/lib/quiq/scheduler.rb +++ b/lib/quiq/scheduler.rb @@ -36,20 +36,17 @@ def self.enqueue_at(job, scheduled_at) # Push the job in its queue and remove from scheduler_queue def enqueue(job) - Async do - begin - payload = JSON.parse(job) - rescue JSON::ParserError => e - Quiq.logger.warn("Invalid format: #{e}") - Queue.send_to_dlq(job) - end + begin + payload = JSON.parse(job) + rescue JSON::ParserError => e + Quiq.logger.warn("Invalid format: #{e}") + Queue.send_to_dlq(job) + end - puts "job payload: #{payload}" - Quiq.redis.transaction do |multi| - multi.lpush(Queue.formatted_name(payload['queue_name']), job) - multi.zrem(SCHEDULER_KEY, job) - end - end.wait + Quiq.redis.transaction do |multi| + multi.lpush(Queue.formatted_name(payload['queue_name']), job) + multi.zrem(SCHEDULER_KEY, job) + end end end end From ae3c0e80944eb1d8a3c0097617ce83a8965d2df2 Mon Sep 17 00:00:00 2001 From: Baicheng Yu Date: Mon, 17 Feb 2020 22:21:41 +0100 Subject: [PATCH 5/5] Move retry example to RetryJob --- testapp/app/jobs/retry_job.rb | 16 ++++++++++++++++ testapp/app/jobs/test_job.rb | 7 ------- 2 files changed, 16 insertions(+), 7 deletions(-) create mode 100644 testapp/app/jobs/retry_job.rb diff --git a/testapp/app/jobs/retry_job.rb b/testapp/app/jobs/retry_job.rb new file mode 100644 index 0000000..1379a9e --- /dev/null +++ b/testapp/app/jobs/retry_job.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +class RetryJob < ApplicationJob + class CustomError < StandardError; end + + retry_on(CustomError, wait: 5, attempts: 3, queue: :retry) do + raise + end + + def perform(data, wait) + puts "[Worker ##{$$}] Receiving new job: #{data}" + Quiq.current_task.sleep wait + puts "[Worker ##{$$}] Time to wake up after #{wait} seconds" + raise CustomError, 'Caught CustomError' + end +end diff --git a/testapp/app/jobs/test_job.rb b/testapp/app/jobs/test_job.rb index 48dc9e0..6e4cf48 100644 --- a/testapp/app/jobs/test_job.rb +++ b/testapp/app/jobs/test_job.rb @@ -1,16 +1,9 @@ # frozen_string_literal: true class TestJob < ApplicationJob - class CustomError < StandardError; end - - retry_on(CustomError, wait: 5, attempts: 3, queue: :retry) do - raise - end - def perform(data, wait) puts "[Worker ##{$$}] Receiving new job: #{data}" Quiq.current_task.sleep wait puts "[Worker ##{$$}] Time to wake up after #{wait} seconds" - raise CustomError, 'Caught CustomError' end end