From 7740389a52579e19caf5c8cb2e3448926ff6890a Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Thu, 25 Jan 2024 23:00:10 -0800 Subject: [PATCH 1/9] Add google-cloud-scheduler to gemspec --- cloudtasker.gemspec | 1 + 1 file changed, 1 insertion(+) diff --git a/cloudtasker.gemspec b/cloudtasker.gemspec index da6544ea..50b3e802 100644 --- a/cloudtasker.gemspec +++ b/cloudtasker.gemspec @@ -34,6 +34,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'connection_pool' spec.add_dependency 'fugit' spec.add_dependency 'google-cloud-tasks' + spec.add_dependency 'google-cloud-scheduler' spec.add_dependency 'jwt' spec.add_dependency 'redis' spec.add_dependency 'retriable' From aae1090ac30c4a7854eb7a5ecc23e6126c75b5fe Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Thu, 25 Jan 2024 23:00:43 -0800 Subject: [PATCH 2/9] Implement the cloud scheduler --- lib/cloudtasker.rb | 1 + lib/cloudtasker/cloud_scheduler.rb | 11 ++ lib/cloudtasker/cloud_scheduler/job.rb | 130 ++++++++++++++++++++ lib/cloudtasker/cloud_scheduler/manager.rb | 91 ++++++++++++++ lib/cloudtasker/cloud_scheduler/schedule.rb | 97 +++++++++++++++ 5 files changed, 330 insertions(+) create mode 100644 lib/cloudtasker/cloud_scheduler.rb create mode 100644 lib/cloudtasker/cloud_scheduler/job.rb create mode 100644 lib/cloudtasker/cloud_scheduler/manager.rb create mode 100644 lib/cloudtasker/cloud_scheduler/schedule.rb diff --git a/lib/cloudtasker.rb b/lib/cloudtasker.rb index 4a29ed6e..946bbc93 100644 --- a/lib/cloudtasker.rb +++ b/lib/cloudtasker.rb @@ -15,6 +15,7 @@ require 'cloudtasker/middleware/chain' require 'cloudtasker/authenticator' +require 'cloudtasker/cloud_scheduler' require 'cloudtasker/cloud_task' require 'cloudtasker/worker_logger' require 'cloudtasker/worker_handler' diff --git a/lib/cloudtasker/cloud_scheduler.rb b/lib/cloudtasker/cloud_scheduler.rb new file mode 100644 index 00000000..1310248f --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require_relative 'cloud_scheduler/schedule' +require_relative 'cloud_scheduler/job' +require_relative 'cloud_scheduler/manager' + +module Cloudtasker + # Schedule jobs using GCP Cloud Scheduler + module CloudScheduler + end +end diff --git a/lib/cloudtasker/cloud_scheduler/job.rb b/lib/cloudtasker/cloud_scheduler/job.rb new file mode 100644 index 00000000..118219ef --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/job.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +require 'cloudtasker/worker_handler' + +module Cloudtasker + module CloudScheduler + # Manage cron jobs + class Job + # + # Return all jobs from a hash. + # + # @param [Hash] hash The hash to load jobs from. + # + # @return [Array] The list of jobs. + # + def self.load_from_hash!(hash) + Schedule.load_from_hash!(hash).map do |schedule| + new(schedule) + end + end + + attr_reader :schedule + + # + # Build a new instance of the class. + # + # @param [Cloudtasker::CloudScheduler::Schedule] schedule The schedule to run. + # + def initialize(schedule) + @schedule = schedule + end + + # + # Parent folder for all jobs. + # + # @return [String] The parent folder. + # + def parent + @parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id) + end + + # + # Prefix for all jobs. + # + # @return [String] The job prefix. + # + def prefix + "#{parent}/jobs/#{config.gcp_queue_prefix}--" + end + + # + # Return name of the job in the remote scheduler. + # + # @return [String] The job name. + # + def remote_name + "#{prefix}#{schedule.id}" + end + + # + # Return the job name. + # + # @return [String] The job name. + # + def name + schedule.id + end + + # + # Create the job in the remote scheduler. + # + # @return [Google::Cloud::Scheduler::V1::Job] The job instance. + # + def create! + client.create_job(parent: parent, job: to_request_body) + end + + # + # Update the job in the remote scheduler. + # + # @return [Google::Cloud::Scheduler::V1::Job] The job instance. + # + def update! + client.update_job(job: to_request_body) + end + + # + # Delete the job from the remote scheduler. + # + # @return [Google::Protobuf::Empty] The job instance. + # + def delete! + client.delete_job(name: remote_name) + end + + # + # Return a hash that can be used to create/update a job in the remote scheduler. + # + # @return [Hash] The job hash. + # + def to_request_body + { + name: remote_name, + schedule: schedule.cron, + time_zone: schedule.time_zone, + http_target: { + uri: request_config[:url], + http_method: request_config[:http_method], + headers: request_config[:headers], + body: request_config[:body] + } + } + end + + private + + def request_config + schedule.worker_handler.task_payload[:http_request] + end + + def config + @config ||= Cloudtasker.config + end + + def client + @client ||= Google::Cloud::Scheduler.cloud_scheduler + end + end + end +end diff --git a/lib/cloudtasker/cloud_scheduler/manager.rb b/lib/cloudtasker/cloud_scheduler/manager.rb new file mode 100644 index 00000000..9ff2b4ca --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/manager.rb @@ -0,0 +1,91 @@ +# frozen_string_literal: true + +require 'google/cloud/scheduler' + +module Cloudtasker + module CloudScheduler + # Manage the synchronization of jobs between the + # local configuration and the remote scheduler. + class Manager + # + # Synchronize the local configuration with the remote scheduler. + # + # @param [String] file The path to the schedule configuration file. + # + def self.synchronize!(file) + config = YAML.load_file(file) + jobs = Job.load_from_hash!(config) + + new(jobs).synchronize! + end + + attr_reader :jobs + + # + # Build a new instance of the class. + # + # @param [Array] jobs The list of jobs to synchronize. + # + def initialize(jobs) + @jobs = jobs + end + + # + # Synchronize the local configuration with the remote scheduler. + # + # @return [nil] + # + def synchronize! + new_jobs.map(&:create!) + stale_jobs.map(&:update!) + deleted_jobs.map { |job| client.delete_job(name: job) } + + nil + end + + private + + def remote_jobs + @remote_jobs ||= client.list_jobs(parent: parent) + .response + .jobs + .map(&:name) + .select do |job| + job.start_with?(job_prefix) + end + end + + def new_jobs + jobs.reject do |job| + remote_jobs.include?(job.remote_name) + end + end + + def stale_jobs + jobs.select do |job| + remote_jobs.include?(job.remote_name) + end + end + + def deleted_jobs + remote_jobs - jobs.map(&:remote_name) + end + + def job_prefix + "#{parent}/jobs/#{config.gcp_queue_prefix}--" + end + + def parent + @parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id) + end + + def config + @config ||= Cloudtasker.config + end + + def client + @client ||= Google::Cloud::Scheduler.cloud_scheduler + end + end + end +end diff --git a/lib/cloudtasker/cloud_scheduler/schedule.rb b/lib/cloudtasker/cloud_scheduler/schedule.rb new file mode 100644 index 00000000..4f2582b6 --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/schedule.rb @@ -0,0 +1,97 @@ +# frozen_string_literal: true + +require 'fugit' + +module Cloudtasker + module CloudScheduler + # Error raised when a schedule is invalid + class InvalidScheduleError < StandardError; end + + # Manage cron schedules + class Schedule + DEFAULT_TIME_ZONE = 'Etc/UTC' + + attr_accessor :id, :cron, :worker, :queue, :args, :time_zone + + # + # Return all valid schedules while raising an error if any schedule is invalid. + # + # @return [Array] The list of valid schedules. + # + # @raise [RuntimeError] If any schedule is invalid. + def self.load_from_hash!(hash) + return [] if hash.blank? + + hash.map do |id, config| + schedule = new( + id: id.to_s, + cron: config["cron"], + worker: config["worker"], + args: config["args"], + queue: config["queue"], + time_zone: config["time_zone"] || DEFAULT_TIME_ZONE + ) + + raise InvalidScheduleError, "Invalid schedule: #{schedule.id}" unless schedule.valid? + + schedule + end + end + + # + # Build a new instance of the class. + # + # @param [String] id The schedule ID. + # @param [String] cron The cron expression. + # @param [Class] worker The worker class to run. + # @param [Array] args The worker arguments. + # @param [String] queue The queue to use for the cron job. + # @param [String] time_zone The time zone to use for the cron job. + # + def initialize(id:, cron:, worker:, **opts) + @id = id + @cron = cron + @worker = worker + @args = opts[:args] + @queue = opts[:queue] + @time_zone = opts[:time_zone] + end + + # + # Validate the schedule + # + # @return [Boolean] True if the schedule is valid, false otherwise. + # + def valid? + id && cron_schedule && worker + end + + # + # Return the cron schedule to use for the job. + # + # @return [Fugit::Cron] The cron schedule. + # + def cron_schedule + @cron_schedule ||= Fugit::Cron.parse(cron) + end + + # + # Return an instance of the underlying worker. + # + # @return [Cloudtasker::WorkerWrapper] The worker instance + # + def worker_instance + @worker_instance ||= worker.safe_constantize.new(job_queue: queue, job_args: args) + end + + # + # Return an instance of the worker handler. + # + # @return [Cloudtasker::WorkerHandler] The worker handler. + # + def worker_handler + @worker_handler ||= Cloudtasker::WorkerHandler.new(worker_instance) + end + end + end +end From d81e05807c323cabdcc9eb7aedc718b16552a684 Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Thu, 25 Jan 2024 23:02:20 -0800 Subject: [PATCH 3/9] Correct rubocop offenses --- cloudtasker.gemspec | 2 +- lib/cloudtasker/cloud_scheduler/schedule.rb | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cloudtasker.gemspec b/cloudtasker.gemspec index 50b3e802..907d6467 100644 --- a/cloudtasker.gemspec +++ b/cloudtasker.gemspec @@ -33,8 +33,8 @@ Gem::Specification.new do |spec| spec.add_dependency 'activesupport' spec.add_dependency 'connection_pool' spec.add_dependency 'fugit' - spec.add_dependency 'google-cloud-tasks' spec.add_dependency 'google-cloud-scheduler' + spec.add_dependency 'google-cloud-tasks' spec.add_dependency 'jwt' spec.add_dependency 'redis' spec.add_dependency 'retriable' diff --git a/lib/cloudtasker/cloud_scheduler/schedule.rb b/lib/cloudtasker/cloud_scheduler/schedule.rb index 4f2582b6..6d1196ea 100644 --- a/lib/cloudtasker/cloud_scheduler/schedule.rb +++ b/lib/cloudtasker/cloud_scheduler/schedule.rb @@ -25,11 +25,11 @@ def self.load_from_hash!(hash) hash.map do |id, config| schedule = new( id: id.to_s, - cron: config["cron"], - worker: config["worker"], - args: config["args"], - queue: config["queue"], - time_zone: config["time_zone"] || DEFAULT_TIME_ZONE + cron: config['cron'], + worker: config['worker'], + args: config['args'], + queue: config['queue'], + time_zone: config['time_zone'] || DEFAULT_TIME_ZONE ) raise InvalidScheduleError, "Invalid schedule: #{schedule.id}" unless schedule.valid? From 6d0883319659c63d9a52fc795b2fc0162122843e Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Thu, 25 Jan 2024 23:42:55 -0800 Subject: [PATCH 4/9] Add OIDC support --- lib/cloudtasker/cloud_scheduler/job.rb | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/lib/cloudtasker/cloud_scheduler/job.rb b/lib/cloudtasker/cloud_scheduler/job.rb index 118219ef..34618464 100644 --- a/lib/cloudtasker/cloud_scheduler/job.rb +++ b/lib/cloudtasker/cloud_scheduler/job.rb @@ -107,13 +107,27 @@ def to_request_body uri: request_config[:url], http_method: request_config[:http_method], headers: request_config[:headers], - body: request_config[:body] - } + body: request_config[:body], + oidc_token: oidc_config + }.compact } end private + def oidc_config + return unless oidc? + + { + service_account_email: config.oidc[:service_account_email], + audience: config.oidc[:audience] + } + end + + def oidc? + config.oidc && config.oidc[:service_account_email] && config.oidc[:audience] + end + def request_config schedule.worker_handler.task_payload[:http_request] end From 1ddbffa3e75ed604b4d1639f962a4e50fca79542 Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Fri, 26 Jan 2024 08:13:30 -0800 Subject: [PATCH 5/9] Refactor OIDC support --- lib/cloudtasker/cloud_scheduler/job.rb | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/lib/cloudtasker/cloud_scheduler/job.rb b/lib/cloudtasker/cloud_scheduler/job.rb index 34618464..26979814 100644 --- a/lib/cloudtasker/cloud_scheduler/job.rb +++ b/lib/cloudtasker/cloud_scheduler/job.rb @@ -108,26 +108,13 @@ def to_request_body http_method: request_config[:http_method], headers: request_config[:headers], body: request_config[:body], - oidc_token: oidc_config + oidc_token: config.oidc }.compact } end private - def oidc_config - return unless oidc? - - { - service_account_email: config.oidc[:service_account_email], - audience: config.oidc[:audience] - } - end - - def oidc? - config.oidc && config.oidc[:service_account_email] && config.oidc[:audience] - end - def request_config schedule.worker_handler.task_payload[:http_request] end From 9172f4f9f2a78918d8b79ba0f6680eeb4617ac5a Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Fri, 26 Jan 2024 10:05:52 -0800 Subject: [PATCH 6/9] Add ActiveJob support --- lib/cloudtasker/cloud_scheduler/job.rb | 19 ++++---- .../cloud_scheduler/job/active_job_payload.rb | 26 +++++++++++ .../cloud_scheduler/job/worker_payload.rb | 32 ++++++++++++++ lib/cloudtasker/cloud_scheduler/schedule.rb | 44 +++++++++++++++---- 4 files changed, 105 insertions(+), 16 deletions(-) create mode 100644 lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb create mode 100644 lib/cloudtasker/cloud_scheduler/job/worker_payload.rb diff --git a/lib/cloudtasker/cloud_scheduler/job.rb b/lib/cloudtasker/cloud_scheduler/job.rb index 26979814..9e1a57b3 100644 --- a/lib/cloudtasker/cloud_scheduler/job.rb +++ b/lib/cloudtasker/cloud_scheduler/job.rb @@ -72,7 +72,7 @@ def name # @return [Google::Cloud::Scheduler::V1::Job] The job instance. # def create! - client.create_job(parent: parent, job: to_request_body) + client.create_job(parent: parent, job: payload) end # @@ -81,7 +81,7 @@ def create! # @return [Google::Cloud::Scheduler::V1::Job] The job instance. # def update! - client.update_job(job: to_request_body) + client.update_job(job: payload) end # @@ -98,17 +98,20 @@ def delete! # # @return [Hash] The job hash. # - def to_request_body + def payload { name: remote_name, schedule: schedule.cron, time_zone: schedule.time_zone, http_target: { - uri: request_config[:url], - http_method: request_config[:http_method], - headers: request_config[:headers], - body: request_config[:body], - oidc_token: config.oidc + http_method: 'POST', + uri: config.processor_url, + oidc_token: config.oidc, + body: schedule.job_payload.to_json, + headers: { + Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json', + Cloudtasker::Config::CT_AUTHORIZATION_HEADER => Authenticator.bearer_token + }.compact }.compact } end diff --git a/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb b/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb new file mode 100644 index 00000000..d14117c4 --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module Cloudtasker + module CloudScheduler + class Job + # Payload used to schedule ActiveJob jobs on Cloud Scheduler + class ActiveJobPayload + attr_reader :worker + + def initialize(worker) + @worker = worker + end + + def to_h + { + 'worker' => 'ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper', + 'job_queue' => worker.queue_name, + 'job_id' => worker.job_id, + 'job_meta' => {}, + 'job_args' => [worker.serialize] + } + end + end + end + end +end diff --git a/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb b/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb new file mode 100644 index 00000000..d7573f68 --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +require 'cloudtasker/worker_handler' + +module Cloudtasker + module CloudScheduler + class Job + # Payload used to schedule Cloudtasker Workers on Cloud Scheduler + class WorkerPayload + attr_reader :worker + + def initialize(worker) + @worker = worker + end + + def to_h + JSON.parse(request_config[:body]) + end + + private + + def request_config + worker_handler.task_payload[:http_request] + end + + def worker_handler + @worker_handler ||= Cloudtasker::WorkerHandler.new(worker) + end + end + end + end +end diff --git a/lib/cloudtasker/cloud_scheduler/schedule.rb b/lib/cloudtasker/cloud_scheduler/schedule.rb index 6d1196ea..28c7f931 100644 --- a/lib/cloudtasker/cloud_scheduler/schedule.rb +++ b/lib/cloudtasker/cloud_scheduler/schedule.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true require 'fugit' +require 'cloudtasker/cloud_scheduler/job/active_job_payload' +require 'cloudtasker/cloud_scheduler/job/worker_payload' module Cloudtasker module CloudScheduler @@ -29,6 +31,7 @@ def self.load_from_hash!(hash) worker: config['worker'], args: config['args'], queue: config['queue'], + active_job: config['active_job'] || false, time_zone: config['time_zone'] || DEFAULT_TIME_ZONE ) @@ -54,6 +57,7 @@ def initialize(id:, cron:, worker:, **opts) @worker = worker @args = opts[:args] @queue = opts[:queue] + @active_job = opts[:active_job] @time_zone = opts[:time_zone] end @@ -76,21 +80,45 @@ def cron_schedule end # - # Return an instance of the underlying worker. + # Return if the specified worker is an ActiveJob. # - # @return [Cloudtasker::WorkerWrapper] The worker instance + # @return [Boolean] True if the worker is an ActiveJob, false otherwise. # - def worker_instance - @worker_instance ||= worker.safe_constantize.new(job_queue: queue, job_args: args) + def active_job? + @active_job end # - # Return an instance of the worker handler. + # Return the job payload to make requests to the remote scheduler. # - # @return [Cloudtasker::WorkerHandler] The worker handler. + # @return [Hash] The job payload. # - def worker_handler - @worker_handler ||= Cloudtasker::WorkerHandler.new(worker_instance) + def job_payload + if active_job? + Job::ActiveJobPayload.new(active_job_instance).to_h + else + Job::WorkerPayload.new(worker_instance).to_h + end + end + + private + + def worker_instance + @worker_instance ||= worker_class.new(job_queue: queue, job_args: args) + end + + def active_job_instance + @active_job_instance ||= begin + instance = worker_class.new(args) + instance.queue_name = queue if queue.present? + instance.timezone = time_zone if time_zone.present? + + instance + end + end + + def worker_class + worker.safe_constantize end end end From aa0303e10cfe4d6c97f50566bc4f9ba42fe444c2 Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Fri, 26 Jan 2024 19:15:09 -0800 Subject: [PATCH 7/9] Add tests --- lib/cloudtasker/cloud_scheduler/job.rb | 14 +- lib/cloudtasker/cloud_scheduler/manager.rb | 1 + lib/cloudtasker/cloud_scheduler/schedule.rb | 13 +- .../job/active_job_payload_spec.rb | 26 +++ .../job/worker_payload_spec.rb | 19 ++ spec/cloudtasker/cloud_scheduler/job_spec.rb | 128 +++++++++++++ .../cloud_scheduler/manager_spec.rb | 77 ++++++++ .../cloud_scheduler/schedule_spec.rb | 173 ++++++++++++++++++ spec/support/test_active_job.rb | 5 + 9 files changed, 444 insertions(+), 12 deletions(-) create mode 100644 spec/cloudtasker/cloud_scheduler/job/active_job_payload_spec.rb create mode 100644 spec/cloudtasker/cloud_scheduler/job/worker_payload_spec.rb create mode 100644 spec/cloudtasker/cloud_scheduler/job_spec.rb create mode 100644 spec/cloudtasker/cloud_scheduler/manager_spec.rb create mode 100644 spec/cloudtasker/cloud_scheduler/schedule_spec.rb create mode 100644 spec/support/test_active_job.rb diff --git a/lib/cloudtasker/cloud_scheduler/job.rb b/lib/cloudtasker/cloud_scheduler/job.rb index 9e1a57b3..5754172d 100644 --- a/lib/cloudtasker/cloud_scheduler/job.rb +++ b/lib/cloudtasker/cloud_scheduler/job.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require 'google/cloud/scheduler/v1' require 'cloudtasker/worker_handler' module Cloudtasker @@ -30,15 +31,6 @@ def initialize(schedule) @schedule = schedule end - # - # Parent folder for all jobs. - # - # @return [String] The parent folder. - # - def parent - @parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id) - end - # # Prefix for all jobs. # @@ -118,8 +110,8 @@ def payload private - def request_config - schedule.worker_handler.task_payload[:http_request] + def parent + @parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id) end def config diff --git a/lib/cloudtasker/cloud_scheduler/manager.rb b/lib/cloudtasker/cloud_scheduler/manager.rb index 9ff2b4ca..c719394c 100644 --- a/lib/cloudtasker/cloud_scheduler/manager.rb +++ b/lib/cloudtasker/cloud_scheduler/manager.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require 'google/cloud/scheduler/v1' require 'google/cloud/scheduler' module Cloudtasker diff --git a/lib/cloudtasker/cloud_scheduler/schedule.rb b/lib/cloudtasker/cloud_scheduler/schedule.rb index 28c7f931..f14acea4 100644 --- a/lib/cloudtasker/cloud_scheduler/schedule.rb +++ b/lib/cloudtasker/cloud_scheduler/schedule.rb @@ -67,7 +67,7 @@ def initialize(id:, cron:, worker:, **opts) # @return [Boolean] True if the schedule is valid, false otherwise. # def valid? - id && cron_schedule && worker + id.present? && cron_schedule.present? && worker.present? end # @@ -101,6 +101,17 @@ def job_payload end end + # + # Equality operator. + # + # @param [Any] other The object to compare. + # + # @return [Boolean] True if the object is equal. + # + def ==(other) + other.is_a?(self.class) && id == other.id + end + private def worker_instance diff --git a/spec/cloudtasker/cloud_scheduler/job/active_job_payload_spec.rb b/spec/cloudtasker/cloud_scheduler/job/active_job_payload_spec.rb new file mode 100644 index 00000000..f53afa6d --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/job/active_job_payload_spec.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +RSpec.describe Cloudtasker::CloudScheduler::Job::ActiveJobPayload do + let(:worker) { TestActiveJob.new } + let(:payload) { described_class.new(worker) } + + before do + allow(worker).to receive(:job_id).and_return('123') + allow(worker).to receive(:queue_name).and_return('my_queue') + allow(worker).to receive(:serialize).and_return('serialized_job') + end + + describe '#to_h' do + subject(:hash) { payload.to_h } + + it { is_expected.to be_a(Hash) } + + it { + expect(hash).to include( + 'worker' => 'ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper', + 'job_id' => '123', 'job_args' => ['serialized_job'], + 'job_queue' => 'my_queue', 'job_meta' => {} + ) + } + end +end diff --git a/spec/cloudtasker/cloud_scheduler/job/worker_payload_spec.rb b/spec/cloudtasker/cloud_scheduler/job/worker_payload_spec.rb new file mode 100644 index 00000000..f1170b04 --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/job/worker_payload_spec.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +RSpec.describe Cloudtasker::CloudScheduler::Job::WorkerPayload do + let(:worker) { TestWorker.new(job_args: ['foo'], job_queue: 'my_queue') } + let(:payload) { described_class.new(worker) } + + describe '#to_h' do + subject(:hash) { payload.to_h } + + it { is_expected.to be_a(Hash) } + + it { + expect(hash).to include( + 'worker' => 'TestWorker', 'job_id' => worker.job_id, + 'job_args' => ['foo'], 'job_queue' => 'my_queue' + ) + } + end +end diff --git a/spec/cloudtasker/cloud_scheduler/job_spec.rb b/spec/cloudtasker/cloud_scheduler/job_spec.rb new file mode 100644 index 00000000..c6802493 --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/job_spec.rb @@ -0,0 +1,128 @@ +# frozen_string_literal: true + +require 'google/cloud/scheduler/v1' + +RSpec.describe Cloudtasker::CloudScheduler::Job do + let(:job) { described_class.new(scheduler) } + let(:parent_path) { '/path/to/parent' } + let(:queue_prefix) { Cloudtasker.config.gcp_queue_prefix } + let(:client) do + instance_double( + Google::Cloud::Scheduler::V1::CloudScheduler::Client, + location_path: parent_path, + create_job: Google::Cloud::Scheduler::V1::Job.new, + update_job: Google::Cloud::Scheduler::V1::Job.new, + delete_job: Google::Protobuf::Empty.new + ) + end + let(:scheduler) do + Cloudtasker::CloudScheduler::Schedule.new( + id: 'test', + cron: '* * * * *', + worker: 'TestWorker', + args: 'foo', + queue: 'default', + time_zone: 'America/New_York' + ) + end + + before do + allow(Google::Cloud::Scheduler).to receive(:cloud_scheduler).and_return(client) + end + + describe '.load_from_hash!' do + subject(:jobs) { described_class.load_from_hash!(hash) } + + let(:hash) do + { + 'test' => { + 'worker' => 'DummyWorker', + 'cron' => '* * * * *', + 'args' => { 'foo' => 'bar' }, + 'queue' => 'default', + 'time_zone' => 'America/New_York', + 'active_job' => true + } + } + end + + context 'with an empty hash' do + let(:hash) { {} } + + it { is_expected.to eq([]) } + end + + context 'with a valid schedule' do + it { is_expected.to be_a(Array) } + it { expect(jobs.size).to eq(1) } + it { expect(jobs.first).to be_a(described_class) } + end + end + + describe '.new' do + it { expect(job.schedule).to eq(scheduler) } + end + + describe '#prefix' do + subject(:parent) { job.prefix } + + it { is_expected.to eq("#{parent_path}/jobs/#{queue_prefix}--") } + end + + describe '#remote_name' do + subject(:name) { job.remote_name } + + it { is_expected.to eq("#{parent_path}/jobs/#{queue_prefix}--#{scheduler.id}") } + end + + describe '#name' do + subject(:name) { job.name } + + it { is_expected.to eq(scheduler.id) } + end + + describe '#create!' do + subject(:create!) { job.create! } + + after { expect(client).to have_received(:create_job) } + + it { is_expected.to be_a(Google::Cloud::Scheduler::V1::Job) } + end + + describe '#update!' do + subject(:update!) { job.update! } + + after { expect(client).to have_received(:update_job) } + + it { is_expected.to be_a(Google::Cloud::Scheduler::V1::Job) } + end + + describe '#delete!' do + subject(:delete!) { job.delete! } + + after { expect(client).to have_received(:delete_job) } + + it { is_expected.to be_a(Google::Protobuf::Empty) } + end + + describe '#payload' do + subject(:payload) { job.payload } + + before { allow(Cloudtasker::Authenticator).to receive(:bearer_token).and_return('token') } + + it { expect(payload[:name]).to eq(job.remote_name) } + it { expect(payload[:schedule]).to eq(scheduler.cron) } + it { expect(payload[:time_zone]).to eq(scheduler.time_zone) } + it { expect(payload[:http_target][:http_method]).to eq('POST') } + it { expect(payload[:http_target][:uri]).to eq(Cloudtasker.config.processor_url) } + it { expect(payload[:http_target][:oidc_token]).to eq(Cloudtasker.config.oidc) } + it { expect(payload[:http_target][:body]).to eq(scheduler.job_payload.to_json) } + + it { + expect(payload[:http_target][:headers]).to eq({ + Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json', + Cloudtasker::Config::CT_AUTHORIZATION_HEADER => 'token' + }) + } + end +end diff --git a/spec/cloudtasker/cloud_scheduler/manager_spec.rb b/spec/cloudtasker/cloud_scheduler/manager_spec.rb new file mode 100644 index 00000000..d6bf2b65 --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/manager_spec.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +require 'google/cloud/scheduler/v1' + +RSpec.describe Cloudtasker::CloudScheduler::Manager do + let(:manager) { described_class.new(jobs) } + let(:parent_path) { '/path/to/parent' } + let(:queue_prefix) { Cloudtasker.config.gcp_queue_prefix } + let(:client) do + instance_double( + Google::Cloud::Scheduler::V1::CloudScheduler::Client, + location_path: parent_path, + create_job: Google::Cloud::Scheduler::V1::Job.new, + update_job: Google::Cloud::Scheduler::V1::Job.new, + delete_job: Google::Protobuf::Empty.new + ) + end + let(:jobs) do + [ + Cloudtasker::CloudScheduler::Job.new( + Cloudtasker::CloudScheduler::Schedule.new( + id: 'test', + cron: '* * * * *', + worker: 'TestWorker', + args: 'foo', + queue: 'default', + time_zone: 'America/New_York' + ) + ) + ] + end + + before do + allow(Google::Cloud::Scheduler).to receive(:cloud_scheduler).and_return(client) + end + + describe '.synchronize!' do + subject(:synchronize!) { described_class.synchronize!(file) } + + let(:file) { 'path/to/file' } + let(:config) { { 'test' => { 'worker' => 'TestWorker' } } } + let(:jobs) { [instance_double(Cloudtasker::CloudScheduler::Job, create!: nil, update!: nil)] } + let(:manager) { instance_double(described_class, synchronize!: nil) } + + before do + allow(YAML).to receive(:load_file).with(file).and_return(config) + allow(Cloudtasker::CloudScheduler::Job).to receive(:load_from_hash!).with(config).and_return(jobs) + allow(described_class).to receive(:new).with(jobs).and_return(manager) + end + + after { expect(manager).to have_received(:synchronize!) } + + it { is_expected.to be_nil } + end + + describe '#synchronize!' do + subject(:synchronize!) { manager.synchronize! } + + let(:new_jobs) { [instance_double(Cloudtasker::CloudScheduler::Job, create!: nil)] } + let(:stale_jobs) { [instance_double(Cloudtasker::CloudScheduler::Job, update!: nil)] } + let(:deleted_jobs) { ['path/to/deleted/job'] } + + before do + allow(manager).to receive(:new_jobs).and_return(new_jobs) + allow(manager).to receive(:stale_jobs).and_return(stale_jobs) + allow(manager).to receive(:deleted_jobs).and_return(deleted_jobs) + end + + after do + expect(new_jobs.first).to have_received(:create!) + expect(stale_jobs.first).to have_received(:update!) + expect(client).to have_received(:delete_job).with(name: deleted_jobs.first) + end + + it { is_expected.to be_nil } + end +end diff --git a/spec/cloudtasker/cloud_scheduler/schedule_spec.rb b/spec/cloudtasker/cloud_scheduler/schedule_spec.rb new file mode 100644 index 00000000..5c68cd50 --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/schedule_spec.rb @@ -0,0 +1,173 @@ +# frozen_string_literal: true + +RSpec.describe Cloudtasker::CloudScheduler::Schedule do + describe '.load_from_hash!' do + subject(:schedules) { described_class.load_from_hash!(hash) } + + let(:hash) do + { + 'test' => { + 'worker' => 'TestWorker', + 'cron' => '* * * * *', + 'args' => { 'foo' => 'bar' }, + 'queue' => 'default', + 'time_zone' => 'America/New_York', + 'active_job' => true + } + } + end + + context 'with an empty hash' do + let(:hash) { {} } + + it { is_expected.to eq([]) } + end + + context 'with an invalid schedule' do + let(:hash) { { 'test' => { 'worker' => 'TestWorker' } } } + + it 'raises an error' do + expect { schedules }.to raise_error(Cloudtasker::CloudScheduler::InvalidScheduleError) + end + end + + context 'with a valid schedule' do + it { is_expected.to be_a(Array) } + it { expect(schedules.size).to eq(1) } + it { expect(schedules.first).to be_a(described_class) } + end + end + + describe '.new' do + subject(:schedule) do + described_class.new( + id: id, + cron: cron, + worker: worker, + args: args, + queue: queue, + time_zone: time_zone + ) + end + + let(:id) { 'test' } + let(:cron) { '* * * * *' } + let(:worker) { 'TestWorker' } + let(:args) { { foo: 'bar' } } + let(:queue) { 'default' } + let(:time_zone) { 'America/New_York' } + + it { expect(schedule.id).to eq(id) } + it { expect(schedule.cron).to eq(cron) } + it { expect(schedule.worker).to eq(worker) } + it { expect(schedule.args).to eq(args) } + it { expect(schedule.queue).to eq(queue) } + it { expect(schedule.time_zone).to eq(time_zone) } + end + + describe '#valid?' do + subject { described_class.new(id: id, cron: cron, worker: worker).valid? } + + let(:id) { 'test' } + let(:cron) { '* * * * *' } + let(:worker) { 'TestWorker' } + + context 'with blank id' do + let(:id) { '' } + + it { is_expected.not_to be_truthy } + end + + context 'with an invalid cron' do + let(:cron) { 'invalid' } + + it { is_expected.not_to be_truthy } + end + + context 'with a blank worker' do + let(:worker) { '' } + + it { is_expected.not_to be_truthy } + end + + context 'with a valid id, cron and worker' do + it { is_expected.to be_truthy } + end + end + + describe '#cron_schedule' do + subject { described_class.new(id: 'test', cron: cron, worker: 'TestWorker').cron_schedule } + + let(:cron) { '* * * * *' } + + it { is_expected.to be_a(Fugit::Cron) } + end + + describe '#active_job?' do + subject do + described_class.new(id: 'test', cron: '* * * * *', worker: 'TestWorker', active_job: active_job).active_job? + end + + let(:active_job) { true } + + context 'with active_job set to true' do + it { is_expected.to be_truthy } + end + + context 'with active_job set to false' do + let(:active_job) { false } + + it { is_expected.not_to be_truthy } + end + end + + describe '#job_payload' do + subject(:payload) do + described_class.new( + id: 'test', + cron: '* * * * *', + worker: worker, + args: args, + queue: queue, + active_job: active_job, + time_zone: time_zone + ).job_payload + end + + let(:worker) { 'TestWorker' } + let(:args) { { 'foo' => 'bar' } } + let(:queue) { 'default' } + let(:active_job) { false } + let(:time_zone) { 'America/New_York' } + + # TODO: is there a better way of doing this? + before do + allow(TestActiveJob).to receive(:new).and_return(TestActiveJob.new) + allow_any_instance_of(TestActiveJob).to receive(:queue_name=).and_return(queue) + allow_any_instance_of(TestActiveJob).to receive(:queue_name).and_return(queue) + allow_any_instance_of(TestActiveJob).to receive(:timezone=).and_return(time_zone) + allow_any_instance_of(TestActiveJob).to receive(:timezone).and_return(time_zone) + allow_any_instance_of(TestActiveJob).to receive(:job_id).and_return('1234') + allow_any_instance_of(TestActiveJob).to receive(:serialize).and_return('{}') + end + + context 'with a regular worker' do + it { is_expected.to be_a(Hash) } + it { expect(payload['worker']).to eq(worker) } + it { expect(payload['job_queue']).to eq(queue) } + it { expect(payload['job_args']).to eq(args) } + end + + context 'with an ActiveJob worker' do + let(:worker) { 'TestActiveJob' } + let(:active_job) { true } + + it { is_expected.to be_a(Hash) } + it { expect(payload['worker']).to eq('ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper') } + it { expect(payload['job_queue']).to eq(queue) } + it { expect(payload['job_args']).to eq(['{}']) } + it { expect(payload['job_id']).to eq('1234') } + it { expect(payload['job_meta']).to eq({}) } + end + end +end diff --git a/spec/support/test_active_job.rb b/spec/support/test_active_job.rb new file mode 100644 index 00000000..f7732278 --- /dev/null +++ b/spec/support/test_active_job.rb @@ -0,0 +1,5 @@ +# frozen_string_literal: true + +class TestActiveJob + def perform(args); end +end From 328633871ec1db4e9ee7df9fb0278b29be08a34e Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Fri, 26 Jan 2024 19:22:09 -0800 Subject: [PATCH 8/9] Add comments for private methods --- lib/cloudtasker/cloud_scheduler/job.rb | 17 +++++++- lib/cloudtasker/cloud_scheduler/manager.rb | 40 +++++++++++++++++++ lib/cloudtasker/cloud_scheduler/schedule.rb | 15 +++++++ .../cloud_scheduler/schedule_spec.rb | 1 - 4 files changed, 71 insertions(+), 2 deletions(-) diff --git a/lib/cloudtasker/cloud_scheduler/job.rb b/lib/cloudtasker/cloud_scheduler/job.rb index 5754172d..b4acaca4 100644 --- a/lib/cloudtasker/cloud_scheduler/job.rb +++ b/lib/cloudtasker/cloud_scheduler/job.rb @@ -32,7 +32,7 @@ def initialize(schedule) end # - # Prefix for all jobs. + # Prefix for all jobs that includes the parent path and the queue prefix. # # @return [String] The job prefix. # @@ -110,14 +110,29 @@ def payload private + # + # Return the parent path for all jobs. + # + # @return [String] The parent path. + # def parent @parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id) end + # + # Return the Cloudtasker configuration. + # + # @return [Cloudtasker::Config] The configuration. + # def config @config ||= Cloudtasker.config end + # + # Return the Cloud Scheduler client. + # + # @return [Google::Cloud::Scheduler::V1::CloudSchedulerClient] The client. + # def client @client ||= Google::Cloud::Scheduler.cloud_scheduler end diff --git a/lib/cloudtasker/cloud_scheduler/manager.rb b/lib/cloudtasker/cloud_scheduler/manager.rb index c719394c..d4a4da6b 100644 --- a/lib/cloudtasker/cloud_scheduler/manager.rb +++ b/lib/cloudtasker/cloud_scheduler/manager.rb @@ -46,6 +46,11 @@ def synchronize! private + # + # Return all jobs from the remote scheduler. + # + # @return [Array] The list of job names. + # def remote_jobs @remote_jobs ||= client.list_jobs(parent: parent) .response @@ -56,34 +61,69 @@ def remote_jobs end end + # + # Return all jobs that are not yet created in the remote scheduler. + # + # @return [Array] The list of jobs. + # def new_jobs jobs.reject do |job| remote_jobs.include?(job.remote_name) end end + # + # Return all jobs that are present in both local config and remote scheduler. + # + # @return [Array] The list of jobs. + # def stale_jobs jobs.select do |job| remote_jobs.include?(job.remote_name) end end + # + # Return all jobs that are present in the remote scheduler but not in the local config. + # + # @return [Array] The list of job names. + # def deleted_jobs remote_jobs - jobs.map(&:remote_name) end + # + # Prefix for all jobs that includes the parent path and the queue prefix. + # + # @return [String] The job prefix. + # def job_prefix "#{parent}/jobs/#{config.gcp_queue_prefix}--" end + # + # Return the parent path for all jobs. + # + # @return [String] The parent path. + # def parent @parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id) end + # + # Return the Cloudtasker configuration. + # + # @return [Cloudtasker::Config] The configuration. + # def config @config ||= Cloudtasker.config end + # + # Return the Cloud Scheduler client. + # + # @return [Google::Cloud::Scheduler::V1::CloudSchedulerClient] The client. + # def client @client ||= Google::Cloud::Scheduler.cloud_scheduler end diff --git a/lib/cloudtasker/cloud_scheduler/schedule.rb b/lib/cloudtasker/cloud_scheduler/schedule.rb index f14acea4..d4b4ab85 100644 --- a/lib/cloudtasker/cloud_scheduler/schedule.rb +++ b/lib/cloudtasker/cloud_scheduler/schedule.rb @@ -114,10 +114,20 @@ def ==(other) private + # + # Return an instance of the Cloudtasker Worker class. + # + # @return [Object] The worker instance. + # def worker_instance @worker_instance ||= worker_class.new(job_queue: queue, job_args: args) end + # + # Return an instance of the ActiveJob class. + # + # @return [Object] The ActiveJob instance. + # def active_job_instance @active_job_instance ||= begin instance = worker_class.new(args) @@ -128,6 +138,11 @@ def active_job_instance end end + # + # Return the worker class. + # + # @return [Class] The worker class. + # def worker_class worker.safe_constantize end diff --git a/spec/cloudtasker/cloud_scheduler/schedule_spec.rb b/spec/cloudtasker/cloud_scheduler/schedule_spec.rb index 5c68cd50..1904fdd3 100644 --- a/spec/cloudtasker/cloud_scheduler/schedule_spec.rb +++ b/spec/cloudtasker/cloud_scheduler/schedule_spec.rb @@ -140,7 +140,6 @@ let(:active_job) { false } let(:time_zone) { 'America/New_York' } - # TODO: is there a better way of doing this? before do allow(TestActiveJob).to receive(:new).and_return(TestActiveJob.new) allow_any_instance_of(TestActiveJob).to receive(:queue_name=).and_return(queue) From 4f18e4c766d8fad69638ea84262ca686f263f8b6 Mon Sep 17 00:00:00 2001 From: Mohamed Beydoun Date: Fri, 26 Jan 2024 19:25:19 -0800 Subject: [PATCH 9/9] Add comments to the job payloads --- .../cloud_scheduler/job/active_job_payload.rb | 10 ++++++++++ .../cloud_scheduler/job/worker_payload.rb | 20 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb b/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb index d14117c4..561b7344 100644 --- a/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb +++ b/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb @@ -7,10 +7,20 @@ class Job class ActiveJobPayload attr_reader :worker + # + # Build a new instance of the class. + # + # @param [ActiveJob::Base] worker The ActiveJob instance. + # def initialize(worker) @worker = worker end + # + # Return the Hash representation of the job payload. + # + # @return [Hash] The job payload. + # def to_h { 'worker' => 'ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper', diff --git a/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb b/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb index d7573f68..f7499d6f 100644 --- a/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb +++ b/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb @@ -9,20 +9,40 @@ class Job class WorkerPayload attr_reader :worker + # + # Build a new instance of the class. + # + # @param [Cloudtasker::Worker] worker The Cloudtasker Worker instance. + # def initialize(worker) @worker = worker end + # + # Return the Hash representation of the job payload. + # + # @return [Hash] The job payload. + # def to_h JSON.parse(request_config[:body]) end private + # + # Return the HTTP request configuration for a Cloud Task. + # + # @return [Hash] The request configuration. + # def request_config worker_handler.task_payload[:http_request] end + # + # Return the worker handler. + # + # @return [Cloudtasker::WorkerHandler] The worker handler. + # def worker_handler @worker_handler ||= Cloudtasker::WorkerHandler.new(worker) end