From f4efdf540f76c491e4080e274af6e270be2508ed Mon Sep 17 00:00:00 2001 From: James Hong Date: Wed, 10 Sep 2025 10:59:58 -0700 Subject: [PATCH] [NPE-719] Broaden/Refactor Payloadhandling --- lib/chore.rb | 3 +- lib/chore/job.rb | 31 +++++-------------- lib/chore/payload_handler.rb | 24 ++++++++++++++ lib/chore/worker.rb | 2 +- spec/chore/job_spec.rb | 6 ++-- .../worker/forked_worker_strategy_spec.rb | 2 +- .../worker/helpers/work_distributor_spec.rb | 4 +-- .../worker/single_worker_strategy_spec.rb | 2 +- spec/chore/worker_spec.rb | 16 +++++----- 9 files changed, 49 insertions(+), 41 deletions(-) create mode 100644 lib/chore/payload_handler.rb diff --git a/lib/chore.rb b/lib/chore.rb index 8d6a2323..32900209 100644 --- a/lib/chore.rb +++ b/lib/chore.rb @@ -14,6 +14,7 @@ require 'chore/util' require 'chore/worker' require 'chore/publisher' +require 'chore/payload_handler' # We have a number of things that can live here. I don't want to track ['queues/**','strategies/**'].each do |p| @@ -42,7 +43,7 @@ module Chore #:nodoc: :max_attempts => 1.0 / 0.0, # Infinity :dupe_on_cache_failure => false, :queue_polling_size => 10, - :payload_handler => Chore::Job, + :payload_handler => Chore::PayloadHandler, :master_procline => "chore-master-#{Chore::VERSION}", :worker_procline => "chore-worker-#{Chore::VERSION}", :consumer_sleep_interval => 1 diff --git a/lib/chore/job.rb b/lib/chore/job.rb index 53f6dc70..12a6a80c 100644 --- a/lib/chore/job.rb +++ b/lib/chore/job.rb @@ -7,7 +7,6 @@ module Chore # Chore::Job is the module which gives your job classes the methods they need to be published # and run within Chore. You cannot have a Job in Chore that does not include this module module Job - extend Util # An exception to represent a job choosing to forcibly reject a given instance of itself. # The reasoning behind rejecting the job and the message that spawned it are left to @@ -28,18 +27,6 @@ def self.included(base) #:nodoc: base.extend(Hooks) end - def self.payload_class(message) - constantize(message['class']) - end - - def self.decode(data) - Encoder::JsonEncoder.decode(data) - end - - def self.payload(message) - message['args'] - end - module ClassMethods DEFAULT_OPTIONS = { } @@ -99,11 +86,6 @@ def perform_async(*args) job.perform_async(*args) end - # Resque/Sidekiq compatible serialization. No reason to change what works - def job_hash(job_params) - {:class => self.to_s, :args => job_params} - end - # The name of the configured queue, combined with an optional prefix # # @return [String] @@ -142,15 +124,16 @@ def perform(*args) # Use the current configured publisher to send this job into a queue. def perform_async(*args) - self.class.run_hooks_for(:before_publish,*args) - @chore_publisher ||= self.class.options[:publisher] + klass = self.class + klass.run_hooks_for(:before_publish,*args) + @chore_publisher ||= klass.options[:publisher] - publish_job_hash = self.class.job_hash(args) - Chore.run_hooks_for(:around_publish, self.class.prefixed_queue_name, publish_job_hash) do - @chore_publisher.publish(self.class.prefixed_queue_name,publish_job_hash) + publish_job_hash = Chore.config.payload_handler.job_hash(klass.to_s, args) + Chore.run_hooks_for(:around_publish, klass.prefixed_queue_name, publish_job_hash) do + @chore_publisher.publish(klass.prefixed_queue_name,publish_job_hash) end - self.class.run_hooks_for(:after_publish,*args) + klass.run_hooks_for(:after_publish,*args) end end #Job diff --git a/lib/chore/payload_handler.rb b/lib/chore/payload_handler.rb new file mode 100644 index 00000000..838c3bb8 --- /dev/null +++ b/lib/chore/payload_handler.rb @@ -0,0 +1,24 @@ +module Chore + class PayloadHandler + extend Util + + def self.payload_class(message) + constantize(message["class"]) + end + + # Takes UnitOfWork and return decoded message + def self.decode(item) + Encoder::JsonEncoder.decode(item.message) + end + + def self.payload(message) + message["args"] + end + + # Resque/Sidekiq compatible serialization. No reason to change what works + def self.job_hash(klass, job_params) + # JSON only recognizes string keys, so use strings as keys in our hash for consistency in encoding/decoding + {"class" => klass, "args" => job_params} + end + end +end diff --git a/lib/chore/worker.rb b/lib/chore/worker.rb index d9aef668..1a74270c 100644 --- a/lib/chore/worker.rb +++ b/lib/chore/worker.rb @@ -77,7 +77,7 @@ def start @work.each do |item| return if @stopping begin - item.decoded_message = options[:payload_handler].decode(item.message) + item.decoded_message = options[:payload_handler].decode(item) item.klass = options[:payload_handler].payload_class(item.decoded_message) next if duplicate_work?(item) diff --git a/spec/chore/job_spec.rb b/spec/chore/job_spec.rb index ff54bf0c..433c9497 100644 --- a/spec/chore/job_spec.rb +++ b/spec/chore/job_spec.rb @@ -66,15 +66,15 @@ it 'should call an instance of the queue_options publisher' do args = [1,2,{:h => 'ash'}] TestJob.queue_options(:publisher => Chore::Publisher) - expect_any_instance_of(Chore::Publisher).to receive(:publish).with('test_queue',{:class => 'TestJob',:args => args}).and_return(true) + expect_any_instance_of(Chore::Publisher).to receive(:publish).with('test_queue',{"class" => 'TestJob',"args" => args}).and_return(true) TestJob.perform_async(*args) end it 'calls the around_publish hook with the correct parameters' do args = [1,2,{:h => 'ash'}] - expect(Chore).to receive(:run_hooks_for).with(:around_publish, 'test_queue', {:class => 'TestJob',:args => args}).and_call_original + expect(Chore).to receive(:run_hooks_for).with(:around_publish, 'test_queue', {"class" => 'TestJob',"args" => args}).and_call_original TestJob.queue_options(:publisher => Chore::Publisher) - expect_any_instance_of(Chore::Publisher).to receive(:publish).with('test_queue',{:class => 'TestJob',:args => args}).and_return(true) + expect_any_instance_of(Chore::Publisher).to receive(:publish).with('test_queue',{"class" => 'TestJob',"args" => args}).and_return(true) TestJob.perform_async(*args) end end diff --git a/spec/chore/strategies/worker/forked_worker_strategy_spec.rb b/spec/chore/strategies/worker/forked_worker_strategy_spec.rb index 8266f318..f2a4e4ec 100644 --- a/spec/chore/strategies/worker/forked_worker_strategy_spec.rb +++ b/spec/chore/strategies/worker/forked_worker_strategy_spec.rb @@ -16,7 +16,7 @@ nil, 'test', job_timeout, - Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])), + Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])), 0, consumer ) diff --git a/spec/chore/strategies/worker/helpers/work_distributor_spec.rb b/spec/chore/strategies/worker/helpers/work_distributor_spec.rb index fd7fae8e..cbd475f0 100644 --- a/spec/chore/strategies/worker/helpers/work_distributor_spec.rb +++ b/spec/chore/strategies/worker/helpers/work_distributor_spec.rb @@ -11,7 +11,7 @@ SecureRandom.uuid, 'test', 60, - Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])), + Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])), 0 ) end @@ -89,7 +89,7 @@ SecureRandom.uuid, 'test', 60, - Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])), + Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])), 0 ) worker2 = Chore::Strategy::WorkerInfo.new(2) diff --git a/spec/chore/strategies/worker/single_worker_strategy_spec.rb b/spec/chore/strategies/worker/single_worker_strategy_spec.rb index 33f74312..bbc0bd29 100644 --- a/spec/chore/strategies/worker/single_worker_strategy_spec.rb +++ b/spec/chore/strategies/worker/single_worker_strategy_spec.rb @@ -3,7 +3,7 @@ describe Chore::Strategy::SingleWorkerStrategy do let(:manager) { double('Manager') } let(:job_timeout) { 60 } - let(:job) { Chore::UnitOfWork.new(SecureRandom.uuid, nil, 'test', job_timeout, Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])), 0) } + let(:job) { Chore::UnitOfWork.new(SecureRandom.uuid, nil, 'test', job_timeout, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])), 0) } subject { described_class.new(manager) } describe '#stop!' do diff --git a/spec/chore/worker_spec.rb b/spec/chore/worker_spec.rb index 586121a1..9b3e22d8 100644 --- a/spec/chore/worker_spec.rb +++ b/spec/chore/worker_spec.rb @@ -43,11 +43,11 @@ def perform(first, second) let(:consumer) { double('consumer', :complete => nil, :reject => nil) } let(:job_args) { [1,2,'3'] } - let(:job) { SimpleJob.job_hash(job_args) } + let(:job) { Chore::PayloadHandler.job_hash(SimpleJob, job_args) } it 'should use a default payload handler' do worker = Chore::Worker.new - expect(worker.options[:payload_handler]).to eq(Chore::Job) + expect(worker.options[:payload_handler]).to eq(Chore::PayloadHandler) end shared_examples_for "a worker" do @@ -82,11 +82,11 @@ def perform(first, second) context 'when the value being deduped on is unique' do let(:job_args) { [rand,2,'3'] } let(:encoded_job) { Chore::Encoder::JsonEncoder.encode(job) } - let(:job) { SimpleDedupeJob.job_hash(job_args) } + let(:job) { Chore::PayloadHandler.job_hash(SimpleDedupeJob, job_args) } it 'should call complete for each unique value' do allow(consumer).to receive(:duplicate_message?).and_return(false) work = [] - work << Chore::UnitOfWork.new(1, nil, 'dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(SimpleDedupeJob.job_hash([rand,2,'3'])), 0, consumer) + work << Chore::UnitOfWork.new(1, nil, 'dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(SimpleDedupeJob, [rand,2,'3'])), 0, consumer) expect(SimpleDedupeJob).to receive(:perform).exactly(1).times expect(consumer).to receive(:complete).exactly(1).times Chore::Worker.start(work, {:payload_handler => payload_handler}) @@ -96,9 +96,9 @@ def perform(first, second) context 'when the dedupe lambda does not take the same number of arguments as perform' do it 'should raise an error and not complete the job' do work = [] - work << Chore::UnitOfWork.new(1, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(InvalidDedupeJob.job_hash([rand,2,'3'])), 0, consumer) - work << Chore::UnitOfWork.new(2, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(InvalidDedupeJob.job_hash([rand,2,'3'])), 0, consumer) - work << Chore::UnitOfWork.new(1, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(InvalidDedupeJob.job_hash([rand,2,'3'])), 0, consumer) + work << Chore::UnitOfWork.new(1, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(InvalidDedupeJob, [rand,2,'3'])), 0, consumer) + work << Chore::UnitOfWork.new(2, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(InvalidDedupeJob, [rand,2,'3'])), 0, consumer) + work << Chore::UnitOfWork.new(1, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(InvalidDedupeJob, [rand,2,'3'])), 0, consumer) expect(consumer).not_to receive(:complete) Chore::Worker.start(work, {:payload_handler => payload_handler}) end @@ -108,7 +108,7 @@ def perform(first, second) describe "with default payload handler" do let(:encoded_job) { Chore::Encoder::JsonEncoder.encode(job) } - let(:payload_handler) { Chore::Job } + let(:payload_handler) { Chore::PayloadHandler } let(:payload) {job_args} it_behaves_like "a worker" end