diff --git a/.github/workflows/unit_test-pr.yaml b/.github/workflows/unit_test-pr.yaml index aa8d25d5..1e9c558c 100644 --- a/.github/workflows/unit_test-pr.yaml +++ b/.github/workflows/unit_test-pr.yaml @@ -11,7 +11,6 @@ jobs: matrix: dependency_file: - Gemfile.latest - - Gemfile.ruby27 - Gemfile.ruby31 - Gemfile.ruby32 uses: ./.github/workflows/unit_test-run.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 047b65fd..a8ff8561 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ ## [master](https://github.com/Tapjoy/chore/tree/master) +**Features** +- Google Pubsub support + +**Fixed bugs** +- N/A + +**Cleanups** +- Drops Ruby 2.7 support + +## [v4.7.5](https://github.com/Tapjoy/chore/tree/v4.7.5) + **Features** - N/A diff --git a/Gemfile.latest.lock b/Gemfile.latest.lock index d67038ea..a4a31551 100644 --- a/Gemfile.latest.lock +++ b/Gemfile.latest.lock @@ -1,14 +1,15 @@ PATH remote: . specs: - chore-core (4.7.0) - aws-sdk-sqs (>= 1) + chore-core (5.0.0) get_process_mem (>= 0.2.0) multi_json GEM remote: https://rubygems.org/ specs: + addressable (2.8.7) + public_suffix (>= 2.0.2, < 7.0) aws-eventstream (1.3.0) aws-partitions (1.983.0) aws-sdk-core (3.209.1) @@ -21,15 +22,90 @@ GEM aws-sigv4 (~> 1.5) aws-sigv4 (1.10.0) aws-eventstream (~> 1, >= 1.0.2) + base64 (0.3.0) bigdecimal (3.1.8) + concurrent-ruby (1.3.5) dalli (3.2.8) diff-lcs (1.5.1) + faraday (2.13.4) + faraday-net_http (>= 2.0, < 3.5) + json + logger + faraday-net_http (3.4.1) + net-http (>= 0.5.0) + faraday-retry (2.3.2) + faraday (~> 2.0) ffi (1.17.0) + gapic-common (1.1.0) + faraday (>= 1.9, < 3.a) + faraday-retry (>= 1.0, < 3.a) + google-cloud-env (~> 2.2) + google-logging-utils (~> 0.1) + google-protobuf (>= 3.25, < 5.a) + googleapis-common-protos (~> 1.6) + googleapis-common-protos-types (~> 1.15) + googleauth (~> 1.12) + grpc (~> 1.66) get_process_mem (1.0.0) bigdecimal (>= 2.0) ffi (~> 1.0) + google-cloud-core (1.8.0) + google-cloud-env (>= 1.0, < 3.a) + google-cloud-errors (~> 1.0) + google-cloud-env (2.3.1) + base64 (~> 0.2) + faraday (>= 1.0, < 3.a) + google-cloud-errors (1.5.0) + google-cloud-pubsub (3.0.2) + concurrent-ruby (~> 1.3) + google-cloud-core (~> 1.8) + google-cloud-pubsub-v1 (~> 1.11) + retriable (~> 3.1) + google-cloud-pubsub-v1 (1.11.1) + gapic-common (~> 1.0) + google-cloud-errors (~> 1.0) + google-iam-v1 (~> 1.3) + google-iam-v1 (1.4.0) + gapic-common (~> 1.0) + google-cloud-errors (~> 1.0) + grpc-google-iam-v1 (~> 1.11) + google-logging-utils (0.2.0) + google-protobuf (4.32.0) + bigdecimal + rake (>= 13) + googleapis-common-protos (1.7.0) + google-protobuf (>= 3.18, < 5.a) + googleapis-common-protos-types (~> 1.7) + grpc (~> 1.41) + googleapis-common-protos-types (1.20.0) + google-protobuf (>= 3.18, < 5.a) + googleauth (1.14.0) + faraday (>= 1.0, < 3.a) + google-cloud-env (~> 2.2) + google-logging-utils (~> 0.1) + jwt (>= 1.4, < 3.0) + multi_json (~> 1.11) + os (>= 0.9, < 2.0) + signet (>= 0.16, < 2.a) + grpc (1.74.1) + google-protobuf (>= 3.25, < 5.0) + googleapis-common-protos-types (~> 1.0) + grpc-google-iam-v1 (1.11.0) + google-protobuf (>= 3.18, < 5.a) + googleapis-common-protos (~> 1.7.0) + grpc (~> 1.41) jmespath (1.6.2) + json (2.13.2) + jwt (2.10.2) + base64 + logger (1.7.0) multi_json (1.15.0) + net-http (0.6.0) + uri + os (1.1.4) + public_suffix (6.0.2) + rake (13.3.0) + retriable (3.1.2) rspec (3.13.0) rspec-core (~> 3.13.0) rspec-expectations (~> 3.13.0) @@ -43,14 +119,22 @@ GEM diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.13.0) rspec-support (3.13.1) + signet (0.20.0) + 
addressable (~> 2.8) + faraday (>= 0.17.5, < 3.a) + jwt (>= 1.5, < 3.0) + multi_json (~> 1.10) timecop (0.9.10) + uri (1.0.3) PLATFORMS ruby DEPENDENCIES + aws-sdk-sqs (>= 1) chore-core! dalli (>= 2) + google-cloud-pubsub (>= 3.0, < 4.0) rspec (>= 3) timecop diff --git a/Gemfile.ruby27 b/Gemfile.ruby27 deleted file mode 100644 index 0847268c..00000000 --- a/Gemfile.ruby27 +++ /dev/null @@ -1,4 +0,0 @@ -source 'https://rubygems.org' -ruby '2.7.6' - -gemspec name: 'chore-core' diff --git a/Gemfile.ruby27.lock b/Gemfile.ruby27.lock deleted file mode 100644 index 6bb4bffa..00000000 --- a/Gemfile.ruby27.lock +++ /dev/null @@ -1,61 +0,0 @@ -PATH - remote: . - specs: - chore-core (4.7.0) - aws-sdk-sqs (>= 1) - get_process_mem (>= 0.2.0) - multi_json - -GEM - remote: https://rubygems.org/ - specs: - aws-eventstream (1.3.0) - aws-partitions (1.983.0) - aws-sdk-core (3.209.1) - aws-eventstream (~> 1, >= 1.3.0) - aws-partitions (~> 1, >= 1.651.0) - aws-sigv4 (~> 1.9) - jmespath (~> 1, >= 1.6.1) - aws-sdk-sqs (1.86.0) - aws-sdk-core (~> 3, >= 3.207.0) - aws-sigv4 (~> 1.5) - aws-sigv4 (1.10.0) - aws-eventstream (~> 1, >= 1.0.2) - bigdecimal (3.1.8) - dalli (3.2.8) - diff-lcs (1.5.1) - ffi (1.17.0) - get_process_mem (1.0.0) - bigdecimal (>= 2.0) - ffi (~> 1.0) - jmespath (1.6.2) - multi_json (1.15.0) - rspec (3.13.0) - rspec-core (~> 3.13.0) - rspec-expectations (~> 3.13.0) - rspec-mocks (~> 3.13.0) - rspec-core (3.13.1) - rspec-support (~> 3.13.0) - rspec-expectations (3.13.3) - diff-lcs (>= 1.2.0, < 2.0) - rspec-support (~> 3.13.0) - rspec-mocks (3.13.2) - diff-lcs (>= 1.2.0, < 2.0) - rspec-support (~> 3.13.0) - rspec-support (3.13.1) - timecop (0.9.10) - -PLATFORMS - ruby - -DEPENDENCIES - chore-core! - dalli (>= 2) - rspec (>= 3) - timecop - -RUBY VERSION - ruby 2.7.6p219 - -BUNDLED WITH - 2.4.22 diff --git a/Gemfile.ruby31.lock b/Gemfile.ruby31.lock index 76517a7e..8ba829de 100644 --- a/Gemfile.ruby31.lock +++ b/Gemfile.ruby31.lock @@ -1,14 +1,15 @@ PATH remote: . 
specs: - chore-core (4.7.0) - aws-sdk-sqs (>= 1) + chore-core (5.0.0) get_process_mem (>= 0.2.0) multi_json GEM remote: https://rubygems.org/ specs: + addressable (2.8.7) + public_suffix (>= 2.0.2, < 7.0) aws-eventstream (1.3.0) aws-partitions (1.983.0) aws-sdk-core (3.209.1) @@ -21,15 +22,90 @@ GEM aws-sigv4 (~> 1.5) aws-sigv4 (1.10.0) aws-eventstream (~> 1, >= 1.0.2) + base64 (0.3.0) bigdecimal (3.1.8) + concurrent-ruby (1.3.5) dalli (3.2.8) diff-lcs (1.5.1) + faraday (2.13.4) + faraday-net_http (>= 2.0, < 3.5) + json + logger + faraday-net_http (3.4.1) + net-http (>= 0.5.0) + faraday-retry (2.3.2) + faraday (~> 2.0) ffi (1.17.0) + gapic-common (1.1.0) + faraday (>= 1.9, < 3.a) + faraday-retry (>= 1.0, < 3.a) + google-cloud-env (~> 2.2) + google-logging-utils (~> 0.1) + google-protobuf (>= 3.25, < 5.a) + googleapis-common-protos (~> 1.6) + googleapis-common-protos-types (~> 1.15) + googleauth (~> 1.12) + grpc (~> 1.66) get_process_mem (1.0.0) bigdecimal (>= 2.0) ffi (~> 1.0) + google-cloud-core (1.8.0) + google-cloud-env (>= 1.0, < 3.a) + google-cloud-errors (~> 1.0) + google-cloud-env (2.3.1) + base64 (~> 0.2) + faraday (>= 1.0, < 3.a) + google-cloud-errors (1.5.0) + google-cloud-pubsub (3.0.2) + concurrent-ruby (~> 1.3) + google-cloud-core (~> 1.8) + google-cloud-pubsub-v1 (~> 1.11) + retriable (~> 3.1) + google-cloud-pubsub-v1 (1.11.1) + gapic-common (~> 1.0) + google-cloud-errors (~> 1.0) + google-iam-v1 (~> 1.3) + google-iam-v1 (1.4.0) + gapic-common (~> 1.0) + google-cloud-errors (~> 1.0) + grpc-google-iam-v1 (~> 1.11) + google-logging-utils (0.2.0) + google-protobuf (4.32.0) + bigdecimal + rake (>= 13) + googleapis-common-protos (1.7.0) + google-protobuf (>= 3.18, < 5.a) + googleapis-common-protos-types (~> 1.7) + grpc (~> 1.41) + googleapis-common-protos-types (1.20.0) + google-protobuf (>= 3.18, < 5.a) + googleauth (1.14.0) + faraday (>= 1.0, < 3.a) + google-cloud-env (~> 2.2) + google-logging-utils (~> 0.1) + jwt (>= 1.4, < 3.0) + multi_json (~> 1.11) + os (>= 0.9, < 2.0) + signet (>= 0.16, < 2.a) + grpc (1.74.1) + google-protobuf (>= 3.25, < 5.0) + googleapis-common-protos-types (~> 1.0) + grpc-google-iam-v1 (1.11.0) + google-protobuf (>= 3.18, < 5.a) + googleapis-common-protos (~> 1.7.0) + grpc (~> 1.41) jmespath (1.6.2) + json (2.13.2) + jwt (2.10.2) + base64 + logger (1.7.0) multi_json (1.15.0) + net-http (0.6.0) + uri + os (1.1.4) + public_suffix (6.0.2) + rake (13.3.0) + retriable (3.1.2) rspec (3.13.0) rspec-core (~> 3.13.0) rspec-expectations (~> 3.13.0) @@ -43,14 +119,22 @@ GEM diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.13.0) rspec-support (3.13.1) + signet (0.20.0) + addressable (~> 2.8) + faraday (>= 0.17.5, < 3.a) + jwt (>= 1.5, < 3.0) + multi_json (~> 1.10) timecop (0.9.10) + uri (1.0.3) PLATFORMS ruby DEPENDENCIES + aws-sdk-sqs (>= 1) chore-core! dalli (>= 2) + google-cloud-pubsub (>= 3.0, < 4.0) rspec (>= 3) timecop diff --git a/Gemfile.ruby32.lock b/Gemfile.ruby32.lock index 9216534a..e0b925bd 100644 --- a/Gemfile.ruby32.lock +++ b/Gemfile.ruby32.lock @@ -1,14 +1,15 @@ PATH remote: . 
specs: - chore-core (4.7.0) - aws-sdk-sqs (>= 1) + chore-core (5.0.0) get_process_mem (>= 0.2.0) multi_json GEM remote: https://rubygems.org/ specs: + addressable (2.8.7) + public_suffix (>= 2.0.2, < 7.0) aws-eventstream (1.3.0) aws-partitions (1.983.0) aws-sdk-core (3.209.1) @@ -21,15 +22,90 @@ GEM aws-sigv4 (~> 1.5) aws-sigv4 (1.10.0) aws-eventstream (~> 1, >= 1.0.2) + base64 (0.3.0) bigdecimal (3.1.8) + concurrent-ruby (1.3.5) dalli (3.2.8) diff-lcs (1.5.1) + faraday (2.13.4) + faraday-net_http (>= 2.0, < 3.5) + json + logger + faraday-net_http (3.4.1) + net-http (>= 0.5.0) + faraday-retry (2.3.2) + faraday (~> 2.0) ffi (1.17.0) + gapic-common (1.1.0) + faraday (>= 1.9, < 3.a) + faraday-retry (>= 1.0, < 3.a) + google-cloud-env (~> 2.2) + google-logging-utils (~> 0.1) + google-protobuf (>= 3.25, < 5.a) + googleapis-common-protos (~> 1.6) + googleapis-common-protos-types (~> 1.15) + googleauth (~> 1.12) + grpc (~> 1.66) get_process_mem (1.0.0) bigdecimal (>= 2.0) ffi (~> 1.0) + google-cloud-core (1.8.0) + google-cloud-env (>= 1.0, < 3.a) + google-cloud-errors (~> 1.0) + google-cloud-env (2.3.1) + base64 (~> 0.2) + faraday (>= 1.0, < 3.a) + google-cloud-errors (1.5.0) + google-cloud-pubsub (3.0.2) + concurrent-ruby (~> 1.3) + google-cloud-core (~> 1.8) + google-cloud-pubsub-v1 (~> 1.11) + retriable (~> 3.1) + google-cloud-pubsub-v1 (1.11.1) + gapic-common (~> 1.0) + google-cloud-errors (~> 1.0) + google-iam-v1 (~> 1.3) + google-iam-v1 (1.4.0) + gapic-common (~> 1.0) + google-cloud-errors (~> 1.0) + grpc-google-iam-v1 (~> 1.11) + google-logging-utils (0.2.0) + google-protobuf (4.32.0) + bigdecimal + rake (>= 13) + googleapis-common-protos (1.7.0) + google-protobuf (>= 3.18, < 5.a) + googleapis-common-protos-types (~> 1.7) + grpc (~> 1.41) + googleapis-common-protos-types (1.20.0) + google-protobuf (>= 3.18, < 5.a) + googleauth (1.14.0) + faraday (>= 1.0, < 3.a) + google-cloud-env (~> 2.2) + google-logging-utils (~> 0.1) + jwt (>= 1.4, < 3.0) + multi_json (~> 1.11) + os (>= 0.9, < 2.0) + signet (>= 0.16, < 2.a) + grpc (1.74.1) + google-protobuf (>= 3.25, < 5.0) + googleapis-common-protos-types (~> 1.0) + grpc-google-iam-v1 (1.11.0) + google-protobuf (>= 3.18, < 5.a) + googleapis-common-protos (~> 1.7.0) + grpc (~> 1.41) jmespath (1.6.2) + json (2.13.2) + jwt (2.10.2) + base64 + logger (1.7.0) multi_json (1.15.0) + net-http (0.6.0) + uri + os (1.1.4) + public_suffix (6.0.2) + rake (13.3.0) + retriable (3.1.2) rspec (3.13.0) rspec-core (~> 3.13.0) rspec-expectations (~> 3.13.0) @@ -43,14 +119,22 @@ GEM diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.13.0) rspec-support (3.13.1) + signet (0.20.0) + addressable (~> 2.8) + faraday (>= 0.17.5, < 3.a) + jwt (>= 1.5, < 3.0) + multi_json (~> 1.10) timecop (0.9.10) + uri (1.0.3) PLATFORMS ruby DEPENDENCIES + aws-sdk-sqs (>= 1) chore-core! dalli (>= 2) + google-cloud-pubsub (>= 3.0, < 4.0) rspec (>= 3) timecop diff --git a/README.md b/README.md index d25c6294..937f60a6 100644 --- a/README.md +++ b/README.md @@ -16,11 +16,24 @@ Chore can be integrated with any Ruby-based project by following these instructi 1. 
Add `chore-core` to the Gemfile
   ```ruby
-  gem 'chore-core', '~> 4.0.0'
+  gem 'chore-core', '~> 5.0.0'
   ```
-  When using SQS, also add `dalli` to use for `memcached`-based deduplication:
+  **Queue Provider Dependencies**: Add the appropriate gem for your queue provider:
+
+  For **Amazon SQS**:
+  ```ruby
+  gem 'aws-sdk-sqs'
+  ```
+
+  For **Google Cloud Pub/Sub** (requires Ruby 3.1+ and google-cloud-pubsub 3.0+):
+  ```ruby
+  gem 'google-cloud-pubsub', '>= 3.0'
+  ```
+
+  For **Filesystem queues**: No additional gems required.
+
+  **Optional: Message Deduplication**: If you want to use memcached-based message deduplication:
   ```ruby
   gem 'dalli'
   ```
@@ -40,6 +53,13 @@ dependencies and required code.
   `AWS_SECRET_ACCESS_KEY` environment variables) and an AWS region is set (e.g. `AWS_REGION` environment
   variable) so that Chore can authenticate with AWS.
+1. When using Google Cloud Pub/Sub: Chore will use Google Cloud's automatic credential discovery (environment variables, service account, etc.). See the [Google Cloud PubSub Authentication documentation](https://cloud.google.com/ruby/docs/reference/google-cloud-pubsub/latest/AUTHENTICATION) for details. Otherwise, you can configure `Google::Cloud::PubSub` directly.
+   **Note**: Pub/Sub requires gRPC, which can have threading issues in some environments. google-cloud-pubsub 3.0+ includes improved gRPC handling, but if you use Pub/Sub in a threaded environment (such as with threaded consumer strategies), ensure you use gRPC version 1.74.1 or higher to avoid potential deadlocks and connection issues.
+
+1. Queue configuration
+   **Configuration priority**: global configuration, then environment variables.
+
By default, Chore will run over all queues it detects among the required files. If different behavior is
desired, use one of the following flags:
@@ -97,6 +117,7 @@ Chore.configure do |c|
end
```
+
Because it is likely that the same application serves as the basis for both producing and consuming messages, and
there will already be a considerable amount of configuration in the Producer, it makes sense to use Chorefile to
simply provide the `require` option and stick to the initializer for the rest of the configuration to keep things
DRY.
@@ -117,12 +138,27 @@ This section assumes `foreman` is being used to execute (or export the run comma
1. If the queues do not exist, they must be created before the application can produce/consume Chore jobs:
+   For SQS:
   ```ruby
   require 'aws-sdk-sqs'
   sqs = Aws::SQS::Client.new
   sqs.create_queue(queue_name: "test_queue")
   ```
+   For GCP Pub/Sub (google-cloud-pubsub 3.0+ manages topics and subscriptions through its admin clients):
+   ```ruby
+   require 'google/cloud/pubsub'
+   pubsub = Google::Cloud::PubSub.new
+   topic_path = pubsub.topic_path("test_queue")
+   pubsub.topic_admin.create_topic(name: topic_path)
+   pubsub.subscription_admin.create_subscription(
+     name: pubsub.subscription_path("test_queue"),
+     topic: topic_path
+   )
+   ```
+
+   Alternatively, you can use Chore's built-in queue management, which creates a topic and a subscription for every queue defined by your loaded Chore jobs:
+   ```ruby
+   Chore::Queues::PubSub.create_queues!
+   ```
+
1. Finally, start the application as usual
```
@@ -179,15 +215,59 @@ Chore.configure do |c|
end
```
+Chore provides the following built-in publishers:
+
+* `Chore::Queues::SQS::Publisher` - For Amazon SQS
+* `Chore::Queues::PubSub::Publisher` - For Google Cloud Pub/Sub
+* `Chore::Queues::Filesystem::Publisher` - For filesystem-based queues
+
It is worth noting that any option that can be set via config file or command-line args can also be set in a
configure block.
-If a global publisher is set, it can be overridden on a per-job basis by specifying the publisher in `queue_options`.
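+For example, a minimal configure-block sketch (the `consumer`, `queue_polling_size`, and `consumer_sleep_interval` options appear elsewhere in this README and in Chore's defaults; the `publisher` key is assumed from the global-publisher behavior described below; values are illustrative):
+
+```ruby
+Chore.configure do |c|
+  c.publisher = Chore::Queues::PubSub::Publisher
+  c.consumer = Chore::Queues::PubSub::Consumer
+  c.queue_polling_size = 100      # messages per pull; Pub/Sub allows up to 1000
+  c.consumer_sleep_interval = 1   # seconds to sleep when a poll returns nothing
+end
+```
+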
+If a global publisher is set, it can be overridden on a per-job basis by specifying the publisher in `queue_options`:
+
+```ruby
+class MyJob
+  include Chore::Job
+  queue_options :name => 'my_queue', :publisher => Some::Other::Publisher
+
+  def perform(args={})
+    # Job logic here
+  end
+end
+```
+
+**Note**: Setting the publisher does not automatically set the consumer. If you are not using the defaults and plan to run the publisher and consumer concurrently, you must configure both separately.
+
+**Warning**: Using different queue types for the consumer and publisher (e.g., a Pub/Sub consumer with an SQS publisher) can be confusing and is generally not recommended. Mixed configurations should only be used carefully for specific scenarios like migrations, validations, or no-op queues.
+
+### Consumer Configuration
+
+Unlike publishers, **consumers are configured globally and cannot be set per-job**. Consumers operate at the process level, polling queues and dispatching work to workers. They handle all queues in the system.
+
+Consumers can be configured in three ways:
+
+1. **Command line**: `--consumer CLASS_NAME`
+2. **Global configuration**:
+   ```ruby
+   Chore.configure do |c|
+     c.consumer = Your::Custom::Consumer
+   end
+   ```
+3. **Default**: `Chore::Queues::SQS::Consumer` if not specified
+
+Chore provides the following built-in consumers:
+
+* `Chore::Queues::SQS::Consumer` - For Amazon SQS (default)
+* `Chore::Queues::PubSub::Consumer` - For Google Cloud Pub/Sub
+* `Chore::Queues::Filesystem::Consumer` - For filesystem-based queues
+
+**Note**: Consumers cannot be specified in `queue_options` like publishers can. The consumer configuration applies to all queues processed by the Chore instance, which means each Chore instance can only use one consumer at a time. To use multiple consumers (e.g., both SQS and Pub/Sub), you would need to run separate Chore instances, each configured with its intended consumer.

## Retry Backoff Strategy

-Chore has basic support for delaying retries of a failed job using a step function. Currently the only queue that
-supports this functionality is SQS; all others will simply ignore the delay setting.
+Chore has basic support for delaying retries of a failed job using a step function. Currently SQS and Pub/Sub
+support this functionality; other queue types will simply ignore the delay setting.

### Setup
@@ -207,6 +287,17 @@ If there is a `:backoff` option supplied, any failures will delay the next attem
Read more details about SQS and Delays [here](docs/Delayed%20Jobs.md)
+### Notes On GCP Pub/Sub
+
+GCP Pub/Sub uses a topic and subscription model. When using Chore with Pub/Sub:
+
+* Each queue corresponds to a Pub/Sub topic
+* Subscriptions are automatically created with the same name as their topic (`{topic-name}`)
+* Message delays are handled using subscription-level ack deadline modification instead of SQS visibility timeouts
+* Messages are acknowledged using subscription-level acknowledgment instead of being deleted
+* Pull-based consumption is used with configurable batch sizes (up to 1000 messages)
+* A message's delivery attempt count is not guaranteed to be present. Generally this is not an issue; behavior falls back to the subscription's configuration for redrive policy, backoff, etc.
+
## Hooks

A number of hooks, both global and per-job, exist in Chore for flexibility and convenience.
Hooks should be named
@@ -232,6 +323,10 @@ A number of hooks, both global and per-job, exist in Chore for flexibility and c
* `on_fetch(handle, body)`
+#### GCP Pub/Sub Consumer Hooks
+
+* `on_fetch(received_message, body)`
### Per Job
* `before_publish`
diff --git a/chore-core.gemspec b/chore-core.gemspec
index 9f36fee9..1d2a173f 100644
--- a/chore-core.gemspec
+++ b/chore-core.gemspec
@@ -36,9 +36,10 @@ Gem::Specification.new do |s|
  s.summary = "Job processing... for the future!"
  s.add_runtime_dependency(%q<multi_json>, [">= 0"])
- s.add_runtime_dependency(%q<aws-sdk-sqs>, [">= 1"])
  s.add_runtime_dependency('get_process_mem', [">= 0.2.0"])
  s.add_development_dependency(%q<rspec>, [">= 3"])
+ s.add_development_dependency(%q<aws-sdk-sqs>, [">= 1"])
+ s.add_development_dependency(%q<google-cloud-pubsub>, [">= 3.0", "< 4.0"])
  s.add_development_dependency(%q<dalli>, [">= 2"])
  s.add_development_dependency(%q<timecop>, [">= 0"])
end
diff --git a/chore.gemspec b/chore.gemspec
index 4378a97f..a7f14223 100644
--- a/chore.gemspec
+++ b/chore.gemspec
@@ -36,10 +36,11 @@ Gem::Specification.new do |s|
  s.summary = "Job processing... for the future!"
  s.add_runtime_dependency(%q, [">= 0"])
- s.add_runtime_dependency(%q<aws-sdk-sqs>, [">= 1"])
  s.add_runtime_dependency(%q, [">= 0"])
  s.add_runtime_dependency(%q, [">= 0"])
  s.add_development_dependency(%q<rspec>, [">= 3"])
+ s.add_development_dependency(%q<aws-sdk-sqs>, [">= 1"])
+ s.add_development_dependency(%q<google-cloud-pubsub>, [">= 3.0"])
  s.add_development_dependency(%q<dalli>, [">= 2"])
  s.add_development_dependency(%q<timecop>, [">= 0"])
end
diff --git a/lib/chore.rb b/lib/chore.rb
index 8d6a2323..32900209 100644
--- a/lib/chore.rb
+++ b/lib/chore.rb
@@ -14,6 +14,7 @@ require 'chore/util'
require 'chore/worker'
require 'chore/publisher'
+require 'chore/payload_handler'

# We have a number of things that can live here. I don't want to track
['queues/**','strategies/**'].each do |p|
@@ -42,7 +43,7 @@ module Chore #:nodoc:
    :max_attempts => 1.0 / 0.0, # Infinity
    :dupe_on_cache_failure => false,
    :queue_polling_size => 10,
-   :payload_handler => Chore::Job,
+   :payload_handler => Chore::PayloadHandler,
    :master_procline => "chore-master-#{Chore::VERSION}",
    :worker_procline => "chore-worker-#{Chore::VERSION}",
    :consumer_sleep_interval => 1
diff --git a/lib/chore/job.rb b/lib/chore/job.rb
index 53f6dc70..12a6a80c 100644
--- a/lib/chore/job.rb
+++ b/lib/chore/job.rb
@@ -7,7 +7,6 @@ module Chore
  # Chore::Job is the module which gives your job classes the methods they need to be published
  # and run within Chore. You cannot have a Job in Chore that does not include this module
  module Job
-   extend Util

    # An exception to represent a job choosing to forcibly reject a given instance of itself.
    # The reasoning behind rejecting the job and the message that spawned it are left to
@@ -28,18 +27,6 @@ def self.included(base) #:nodoc:
      base.extend(Hooks)
    end

-   def self.payload_class(message)
-     constantize(message['class'])
-   end
-
-   def self.decode(data)
-     Encoder::JsonEncoder.decode(data)
-   end
-
-   def self.payload(message)
-     message['args']
-   end
-
    module ClassMethods
      DEFAULT_OPTIONS = { }
@@ -99,11 +86,6 @@ def perform_async(*args)
        job.perform_async(*args)
      end

-     # Resque/Sidekiq compatible serialization. No reason to change what works
-     def job_hash(job_params)
-       {:class => self.to_s, :args => job_params}
-     end
-
      # The name of the configured queue, combined with an optional prefix
      #
      # @return [String]
@@ -142,15 +124,16 @@ def perform(*args)
    # Use the current configured publisher to send this job into a queue.
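+   # Publishing flow, for orientation (mirrors the body below): run this
+   # class's :before_publish hooks, memoize the configured publisher, build the
+   # payload via the globally configured payload_handler's job_hash, publish
+   # inside the global :around_publish hook, then run this class's
+   # :after_publish hooks.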
def perform_async(*args)
-     self.class.run_hooks_for(:before_publish,*args)
-     @chore_publisher ||= self.class.options[:publisher]
+     klass = self.class
+     klass.run_hooks_for(:before_publish,*args)
+     @chore_publisher ||= klass.options[:publisher]

-     publish_job_hash = self.class.job_hash(args)
-     Chore.run_hooks_for(:around_publish, self.class.prefixed_queue_name, publish_job_hash) do
-       @chore_publisher.publish(self.class.prefixed_queue_name,publish_job_hash)
+     publish_job_hash = Chore.config.payload_handler.job_hash(klass.to_s, args)
+     Chore.run_hooks_for(:around_publish, klass.prefixed_queue_name, publish_job_hash) do
+       @chore_publisher.publish(klass.prefixed_queue_name,publish_job_hash)
      end

-     self.class.run_hooks_for(:after_publish,*args)
+     klass.run_hooks_for(:after_publish,*args)
    end
  end #Job
diff --git a/lib/chore/payload_handler.rb b/lib/chore/payload_handler.rb
new file mode 100644
index 00000000..838c3bb8
--- /dev/null
+++ b/lib/chore/payload_handler.rb
@@ -0,0 +1,24 @@
+module Chore
+  class PayloadHandler
+    extend Util
+
+    def self.payload_class(message)
+      constantize(message["class"])
+    end
+
+    # Takes a UnitOfWork and returns the decoded message
+    def self.decode(item)
+      Encoder::JsonEncoder.decode(item.message)
+    end
+
+    def self.payload(message)
+      message["args"]
+    end
+
+    # Resque/Sidekiq compatible serialization. No reason to change what works
+    def self.job_hash(klass, job_params)
+      # JSON only recognizes string keys, so use strings as keys in our hash for consistency in encoding/decoding
+      {"class" => klass, "args" => job_params}
+    end
+  end
+end
diff --git a/lib/chore/queues/pubsub.rb b/lib/chore/queues/pubsub.rb
new file mode 100644
index 00000000..69be40e4
--- /dev/null
+++ b/lib/chore/queues/pubsub.rb
@@ -0,0 +1,127 @@
+module Chore
+  module Queues
+    module PubSub
+      REQUIRED_LIBRARY = "google/cloud/pubsub".freeze
+      MIN_VERSION = Gem::Version.new('3.0.0')
+
+      # Creates a GCP Pub/Sub client using global configuration
+      def self.pubsub_client
+        require REQUIRED_LIBRARY
+
+        gem_version = Gem::Version.new(Google::Cloud::PubSub::VERSION)
+        if gem_version < MIN_VERSION
+          Chore.logger.error "#{REQUIRED_LIBRARY} version #{gem_version} is not supported. Please use version >= #{MIN_VERSION}" if defined?(Chore.logger)
+          exit
+        end
+
+        Google::Cloud::PubSub.new
+      end
+
+      # Helper method to create topics and subscriptions based on the currently known list as provided by your configured Chore::Jobs
+      # This is meant to be invoked from a rake task, and not directly.
+      # These topics and subscriptions will be created with the default settings, which may not be ideal.
+      # This is meant only as a convenience helper for testing, and not as a way to create production quality topics/subscriptions in Pub/Sub
+      #
+      # @param [TrueClass, FalseClass] halt_on_existing Raise an exception if any topic/subscription already exists
+      #
+      # @return [Array]
+      def self.create_queues!(halt_on_existing=false)
+        raise RuntimeError.new('You must have at least one Chore Job before attempting to create Pub/Sub topics') unless Chore.prefixed_queue_names.length > 0
+
+        if halt_on_existing
+          existing = self.existing_queues
+          if existing.size > 0
+            raise <<-ERROR.gsub(/^\s+/, '')
+              We found topics/subscriptions that already exist! Verify your queue names or prefix are set up correctly.
+ + The following queue names were found: + #{existing.join("\n")} + ERROR + end + end + client = pubsub_client + + Chore.prefixed_queue_names.each do |queue_name| + Chore.logger.info "Chore Creating Pub/Sub Topic and Subscription: #{queue_name}" + topic_path = client.topic_path(queue_name) + subscription_path = client.subscription_path(queue_name) + + # We rescue in separate blocks because in cases where topic was created + # but the subscription was not, we still want to remove the subscription. + # + # Create topic first. (Reverse on delete) + begin + # Create topic using topic admin + client.topic_admin.create_topic(name: topic_path) + rescue Google::Cloud::AlreadyExistsError => e + Chore.logger.info "Topic already exists: #{e}" + end + + begin + # Create subscription using subscription admin + client.subscription_admin.create_subscription( + name: subscription_path, + topic: topic_path + ) + rescue Google::Cloud::AlreadyExistsError => e + Chore.logger.info "Subscription already exists: #{e}" + end + end + + Chore.prefixed_queue_names + end + + # Helper method to delete all known topics and subscriptions based on the list as provided by your configured Chore::Jobs + # This is meant to be invoked from a rake task, and not directly. + # + # @return [Array] + def self.delete_queues! + raise RuntimeError.new('You must have at least one Chore Job before attempting to delete Pub/Sub topics') unless Chore.prefixed_queue_names.length > 0 + + client = pubsub_client + Chore.prefixed_queue_names.each do |queue_name| + Chore.logger.info "Chore Deleting Pub/Sub Topic and Subscription: #{queue_name}" + + # We rescue in separate blocks because in cases where subscription was removed + # but the topic was not, we still want to remove the topic. + # + # Delete subscription first + begin + path = client.subscription_path(queue_name) + client.subscription_admin.delete_subscription(subscription: path) + rescue Google::Cloud::NotFoundError => e + Chore.logger.error "Deleting Subscription: #{queue_name} failed because #{e}" + end + + # Then delete topic + begin + path = client.topic_path(queue_name) + client.topic_admin.delete_topic(topic: path) + rescue Google::Cloud::NotFoundError => e + Chore.logger.error "Deleting Topic: #{queue_name} failed because #{e}" + end + end + + Chore.prefixed_queue_names + end + + # Collect a list of topics/subscriptions that already exist + # + # @return [Array] + def self.existing_queues + client = pubsub_client + Chore.prefixed_queue_names.select do |queue_name| + begin + client.publisher(queue_name) + client.subscriber(queue_name) + # if both publisher/subscriber successfully load, then assume exists + true + rescue Google::Cloud::NotFoundError + # google api throws Google::Cloud::NotFoundError if topic/subscription does not exist + false + end + end + end + end + end +end diff --git a/lib/chore/queues/pubsub/consumer.rb b/lib/chore/queues/pubsub/consumer.rb new file mode 100644 index 00000000..b1f04849 --- /dev/null +++ b/lib/chore/queues/pubsub/consumer.rb @@ -0,0 +1,144 @@ +require 'google/cloud/pubsub' +require 'chore/duplicate_detector' + +module Chore + module Queues + module PubSub + # GCP Pub/Sub Consumer for Chore. Requests messages from GCP Pub/Sub and passes them to be worked on. + # Also controls acknowledging completed messages within GCP Pub/Sub. 
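+      # A minimal wiring sketch (usage implied by the README, not part of this
+      # class): set this consumer globally, e.g.
+      #   Chore.configure { |c| c.consumer = Chore::Queues::PubSub::Consumer }
+      # The queue_name passed to #initialize doubles as the subscription name.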
class Consumer < Chore::Consumer
+        MAX_PUBSUB_POLLING_SIZE = 1000
+        DEFAULT_DEADLINE_SECONDS = 600
+
+        # @param [String] queue_name Name of GCP Pub/Sub topic
+        # @param [Hash] opts Options
+        def initialize(queue_name, opts={})
+          super(queue_name, opts)
+          @subscription_name = queue_name
+        end
+
+        # Ensure that the consumer is capable of running
+        def verify_connection!
+          unless subscriber.exists?
+            raise "Subscription #{@subscription_name} does not exist"
+          end
+        end
+
+        # Begins requesting messages from GCP Pub/Sub, which will invoke the +&handler+ over each message
+        #
+        # @param [Block] &handler Message handler, used by the calling context (worker) to create & assign a UnitOfWork
+        def consume(&handler)
+          while running?
+            begin
+              messages = handle_messages(&handler)
+              sleep(Chore.config.consumer_sleep_interval) if messages.empty?
+            rescue => e
+              Chore.logger.error { "PubSubConsumer#consume: #{e.inspect} #{e.backtrace * "\n"}" }
+            end
+          end
+        end
+
+        # Rejects the given message from GCP Pub/Sub. Intentionally a no-op.
+        #
+        # @param [String] message_id Unique ID of the Pub/Sub message
+        #
+        # @return nil
+        def reject(message_id)
+          # In Pub/Sub, we can simply not acknowledge the message and it will be redelivered
+        end
+
+        # Acknowledges the given message from the GCP Pub/Sub subscriber
+        #
+        # @param [String] message_id Unique ID of the Pub/Sub message
+        # @param [String] ack_id Acknowledgment ID of the Pub/Sub message
+        def complete(message_id, ack_id)
+          Chore.logger.debug "Completing (acknowledging): #{message_id} ack_id: #{ack_id}"
+          # Acknowledge the message by its ack_id
+          subscriber.acknowledge(ack_id)
+        end
+
+        # Delays retry of a job by +backoff_calc+ seconds.
+        # In Pub/Sub, we modify the ack deadline to delay the message
+        #
+        # @param [UnitOfWork] item Item to be delayed
+        # @param [Block] backoff_calc Code that determines the backoff.
+        def delay(item, backoff_calc)
+          delay = backoff_calc.call(item)
+          Chore.logger.debug "Delaying #{item.id} by #{delay} seconds"
+
+          # Push out the message's ack deadline by the computed delay
+          subscriber.modify_ack_deadline(delay, item.receipt_handle)
+
+          delay
+        end
+
+        private
+
+        # Requests messages from GCP Pub/Sub, and invokes the provided +&block+ over each one. Afterwards, the :on_fetch
+        # hook will be invoked, per message
+        #
+        # @param [Block] &handler Message handler, passed along by #consume
+        #
+        # @return [Array]
+        def handle_messages(&block)
+          begin
+            verify_connection!
+          rescue => e
+            Chore.logger.error "There was a problem verifying the connection to the subscription: #{e.message}. Shutting down..."
+            raise Chore::TerribleMistake
+          end
+
+          messages = subscriber.pull(max: pubsub_polling_amount)
+          received_timestamp = Time.now
+
+          messages.each do |message|
+            unless duplicate_message?(message.message_id, @subscription_name, queue_timeout, received_timestamp)
+              # delivery_attempt is available but may be nil for older messages
+              attempt_count = (message.delivery_attempt || 1) - 1
+              block.call(message.message_id, message.ack_id, queue_name, queue_timeout, message.data, attempt_count, received_timestamp)
+            end
+            Chore.run_hooks_for(:on_fetch, message.ack_id, message.data)
+          end
+
+          messages
+        end
+
+        # Retrieves the GCP Pub/Sub subscriber object.
The method will cache the results to prevent round trips on subsequent calls + # + # @return [Google::Cloud::PubSub::Subscriber] + def subscriber + @subscriber ||= pubsub.subscriber(@subscription_name) + end + + # The ack deadline (in seconds) of the subscriber + # + # @return [Integer] + def queue_timeout + @queue_timeout ||= subscriber.deadline || DEFAULT_DEADLINE_SECONDS # Default to 10 minutes + end + + # GCP Pub/Sub client object + # + # @return [Google::Cloud::PubSub::Project] + def pubsub + @pubsub ||= Chore::Queues::PubSub.pubsub_client + end + + # Maximum number of messages to retrieve on each request from config + # Validates that the value doesn't exceed Pub/Sub's limit of MAX_PUBSUB_POLLING_SIZE messages + # + # @return [Integer] + # @raise [Chore::TerribleMistake] if queue_polling_size exceeds MAX_PUBSUB_POLLING_SIZE + def pubsub_polling_amount + polling_size = Chore.config.queue_polling_size + if polling_size > MAX_PUBSUB_POLLING_SIZE + raise Chore::TerribleMistake, "queue_polling_size (#{polling_size}) exceeds Google Cloud Pub/Sub maximum limit of #{MAX_PUBSUB_POLLING_SIZE} messages" + end + polling_size + end + end + end + end +end diff --git a/lib/chore/queues/pubsub/publisher.rb b/lib/chore/queues/pubsub/publisher.rb new file mode 100644 index 00000000..11293ea1 --- /dev/null +++ b/lib/chore/queues/pubsub/publisher.rb @@ -0,0 +1,47 @@ +require 'chore/publisher' + +module Chore + module Queues + module PubSub + # GCP Pub/Sub Publisher, for writing messages to GCP Pub/Sub from Chore + class Publisher < Chore::Publisher + # @param [Hash] opts Publisher options + def initialize(opts={}) + super + @pubsub_publisher = {} + end + + # Publishes a message to a GCP Pub/Sub topic + # + # @param [String] queue_name Name of the GCP Pub/Sub topic + # @param [Hash] job Job instance definition, will be encoded to JSON + # + # @return [Google::Cloud::PubSub::Message] + def publish(queue_name, job) + publisher = get_publisher(queue_name) + encoded_job = encode_job(job) + publisher.publish(encoded_job) + end + + + private + + # GCP Pub/Sub client object + # + # @return [Google::Cloud::PubSub::Project] + def pubsub + @pubsub ||= Chore::Queues::PubSub.pubsub_client + end + + # Retrieves the GCP Pub/Sub publisher object. The method will cache the results to prevent round trips on subsequent calls + # + # @param [String] name Name of GCP Pub/Sub topic + # + # @return [Google::Cloud::PubSub::Publisher] + def get_publisher(name) + @pubsub_publisher[name] ||= pubsub.publisher(name) + end + end + end + end +end diff --git a/lib/chore/queues/sqs.rb b/lib/chore/queues/sqs.rb index 4b7413c2..836566ee 100644 --- a/lib/chore/queues/sqs.rb +++ b/lib/chore/queues/sqs.rb @@ -1,7 +1,19 @@ module Chore module Queues module SQS + REQUIRED_LIBRARY = "aws-sdk-sqs".freeze + MIN_VERSION = Gem::Version.new("1") + def self.sqs_client + require REQUIRED_LIBRARY + + # Verify compatible version + gem_version = Gem::Version.new(Aws::SQS::GEM_VERSION) + if gem_version < MIN_VERSION + Chore.logger.error "#{REQUIRED_LIBRARY} version #{gem_version} is not supported. 
Please use version >= #{MIN_VERSION}" if defined?(Chore.logger) + exit + end + Aws::SQS::Client.new( logger: Chore.logger, log_level: Chore.log_level_to_sym, @@ -19,7 +31,7 @@ def self.sqs_client # # @return [Array] def self.create_queues!(halt_on_existing=false) - raise 'You must have atleast one Chore Job configured and loaded before attempting to create queues' unless Chore.prefixed_queue_names.length > 0 + raise RuntimeError.new('You must have at least one Chore Job before attempting to create SQS queues') unless Chore.prefixed_queue_names.length > 0 if halt_on_existing existing = self.existing_queues @@ -34,7 +46,7 @@ def self.create_queues!(halt_on_existing=false) end Chore.prefixed_queue_names.each do |queue_name| - Chore.logger.info "Chore Creating Queue: #{queue_name}" + Chore.logger.info "Chore Creating SQS Queue: #{queue_name}" begin sqs_client.create_queue(queue_name: queue_name) rescue Aws::SQS::Errors::QueueAlreadyExists @@ -51,11 +63,11 @@ def self.create_queues!(halt_on_existing=false) # @return [Array] def self.delete_queues! - raise 'You must have atleast one Chore Job configured and loaded before attempting to create queues' unless Chore.prefixed_queue_names.length > 0 + raise RuntimeError.new('You must have at least one Chore Job before attempting to delete SQS queues') unless Chore.prefixed_queue_names.length > 0 Chore.prefixed_queue_names.each do |queue_name| begin - Chore.logger.info "Chore Deleting Queue: #{queue_name}" + Chore.logger.info "Chore Deleting SQS Queue: #{queue_name}" url = sqs_client.get_queue_url(queue_name: queue_name).queue_url sqs_client.delete_queue(queue_url: url) rescue => e diff --git a/lib/chore/queues/sqs/consumer.rb b/lib/chore/queues/sqs/consumer.rb index 3834e72f..33084b9f 100644 --- a/lib/chore/queues/sqs/consumer.rb +++ b/lib/chore/queues/sqs/consumer.rb @@ -34,8 +34,6 @@ def verify_connection! # Begins requesting messages from SQS, which will invoke the +&handler+ over each message # # @param [Block] &handler Message handler, used by the calling context (worker) to create & assigns a UnitOfWork - # - # @return [Array] def consume(&handler) while running? begin diff --git a/lib/chore/tasks/queues.task b/lib/chore/tasks/queues.task index c0e6ce63..4743529a 100644 --- a/lib/chore/tasks/queues.task +++ b/lib/chore/tasks/queues.task @@ -1,18 +1,35 @@ namespace :chore do - desc <<-DESC.gsub(/^\s+/, '') - Create all defined queues. If the halt_on_existing argument is set (defaults to off) the task will abort if a single - queue already exists without attempting to create any. + namespace :queues do + desc <<-DESC.gsub(/^\s+/, '') + Create all defined SQS queues. + If the halt_on_existing argument is set (defaults to off) the task will abort if a single + queue already exists without attempting to create any. - This flag is specifically provided for our integration testing platform to ensure we don't deploy to an incorrect environment. - DESC - task :create, :halt_on_existing do |t, args| - halt_on_existing = %w(1 true yes t y).include?(args[:halt_on_existing]) + This flag is specifically provided for our integration testing platform to ensure we don't deploy to an incorrect environment. 
+ DESC + task :create_sqs, :halt_on_existing do |t, args| + halt_on_existing = %w(1 true yes t y).include?(args[:halt_on_existing]) + created_queues = Chore::Queues::SQS.create_queues!(halt_on_existing) + Chore.logger.info "Successfully created #{created_queues.length} SQS queues: #{created_queues.join(', ')}" + end - Chore::Queues::SQS.create_queues!(halt_on_existing) - end + desc "Create all defined Pub/Sub topics and subscriptions" + task :create_pubsub, :halt_on_existing do |t, args| + halt_on_existing = %w(1 true yes t y).include?(args[:halt_on_existing]) + created_queues = Chore::Queues::PubSub.create_queues!(halt_on_existing) + Chore.logger.info "Successfully created #{created_queues.length} Pub/Sub topics: #{created_queues.join(', ')}" + end + + desc "Remove all defined SQS queues" + task :remove_sqs do + deleted_queues = Chore::Queues::SQS.delete_queues! + Chore.logger.info "Successfully deleted #{deleted_queues.length} SQS queues: #{deleted_queues.join(', ')}" + end - desc "Remove all defined queues" - task :remove do - Chore::Queues::SQS.delete_queues! + desc "Remove all defined Pub/Sub topics and subscriptions" + task :remove_pubsub do + deleted_queues = Chore::Queues::PubSub.delete_queues! + Chore.logger.info "Successfully deleted #{deleted_queues.length} Pub/Sub topics: #{deleted_queues.join(', ')}" + end end end diff --git a/lib/chore/version.rb b/lib/chore/version.rb index 71be4a0e..8cab0d3e 100644 --- a/lib/chore/version.rb +++ b/lib/chore/version.rb @@ -1,7 +1,7 @@ module Chore module Version #:nodoc: - MAJOR = 4 - MINOR = 7 + MAJOR = 5 + MINOR = 0 PATCH = 0 STRING = [ MAJOR, MINOR, PATCH ].join('.') diff --git a/lib/chore/worker.rb b/lib/chore/worker.rb index d9aef668..1a74270c 100644 --- a/lib/chore/worker.rb +++ b/lib/chore/worker.rb @@ -77,7 +77,7 @@ def start @work.each do |item| return if @stopping begin - item.decoded_message = options[:payload_handler].decode(item.message) + item.decoded_message = options[:payload_handler].decode(item) item.klass = options[:payload_handler].payload_class(item.decoded_message) next if duplicate_work?(item) diff --git a/spec/chore/job_spec.rb b/spec/chore/job_spec.rb index ff54bf0c..433c9497 100644 --- a/spec/chore/job_spec.rb +++ b/spec/chore/job_spec.rb @@ -66,15 +66,15 @@ it 'should call an instance of the queue_options publisher' do args = [1,2,{:h => 'ash'}] TestJob.queue_options(:publisher => Chore::Publisher) - expect_any_instance_of(Chore::Publisher).to receive(:publish).with('test_queue',{:class => 'TestJob',:args => args}).and_return(true) + expect_any_instance_of(Chore::Publisher).to receive(:publish).with('test_queue',{"class" => 'TestJob',"args" => args}).and_return(true) TestJob.perform_async(*args) end it 'calls the around_publish hook with the correct parameters' do args = [1,2,{:h => 'ash'}] - expect(Chore).to receive(:run_hooks_for).with(:around_publish, 'test_queue', {:class => 'TestJob',:args => args}).and_call_original + expect(Chore).to receive(:run_hooks_for).with(:around_publish, 'test_queue', {"class" => 'TestJob',"args" => args}).and_call_original TestJob.queue_options(:publisher => Chore::Publisher) - expect_any_instance_of(Chore::Publisher).to receive(:publish).with('test_queue',{:class => 'TestJob',:args => args}).and_return(true) + expect_any_instance_of(Chore::Publisher).to receive(:publish).with('test_queue',{"class" => 'TestJob',"args" => args}).and_return(true) TestJob.perform_async(*args) end end diff --git a/spec/chore/queues/pubsub/consumer_spec.rb b/spec/chore/queues/pubsub/consumer_spec.rb new 
file mode 100644 index 00000000..845f0384 --- /dev/null +++ b/spec/chore/queues/pubsub/consumer_spec.rb @@ -0,0 +1,233 @@ +require 'spec_helper' + +describe Chore::Queues::PubSub::Consumer do + include_context 'fake pubsub objects' + + let(:options) { {} } + let(:consumer) { Chore::Queues::PubSub::Consumer.new(queue_name) } + let(:job) { {'class' => 'TestJob', 'args'=>[1,2,'3']} } + let(:backoff_func) { Proc.new { 2 + 2 } } + + # Since a message handler is required (but not validated), this convenience method lets us + # effectively stub the block. + def consume(&block) + block = Proc.new{} unless block_given? + consumer.consume(&block) + end + + before do + allow(Chore::Queues::PubSub).to receive(:pubsub_client).and_return(pubsub_client) + allow(subscriber).to receive(:pull).and_return([received_message]) + end + + describe "consuming messages" do + before do + allow(consumer).to receive(:running?).and_return(true, false) + end + + context "should create objects for interacting with the Pub/Sub API" do + it 'should create a pubsub client' do + expect(subscriber).to receive(:pull) + consume + end + + it "should only create a pubsub client when one doesn't exist" do + allow(consumer).to receive(:running?).and_return(true, true, true, true, false, true, true) + expect(Chore::Queues::PubSub).to receive(:pubsub_client).exactly(:once) + consume + end + + it 'should look up the subscriber based on the queue name' do + expect(pubsub_client).to receive(:subscriber).with(subscription_name) + consume + end + + it 'should create a subscriber object' do + expect(consumer.send(:subscriber)).to_not be_nil + consume + end + end + + context "should receive a message from the subscriber" do + it 'should use the default size of 10 when no queue_polling_size is specified' do + expect(subscriber).to receive(:pull).with(max: 10).and_return([received_message]) + consume + end + + it 'should respect the queue_polling_size when specified' do + allow(Chore.config).to receive(:queue_polling_size).and_return(5) + expect(subscriber).to receive(:pull).with(max: 5) + consume + end + + it 'should allow exactly 1000 messages (Pub/Sub maximum)' do + allow(Chore.config).to receive(:queue_polling_size).and_return(1000) + expect(subscriber).to receive(:pull).with(max: 1000) + consume + end + + it 'should raise error when queue_polling_size exceeds Pub/Sub max limit of 1000 messages' do + allow(Chore.config).to receive(:queue_polling_size).and_return(2000) + expect { consumer.send(:pubsub_polling_amount) }.to raise_error(Chore::TerribleMistake, /queue_polling_size \(2000\) exceeds Google Cloud Pub\/Sub maximum limit of 1000 messages/) + end + end + + context 'with no messages' do + before do + allow(consumer).to receive(:handle_messages).and_return([]) + end + + it 'should sleep' do + expect(consumer).to receive(:sleep).with(1) + consume + end + end + + context 'with messages' do + before do + allow(consumer).to receive(:duplicate_message?).and_return(false) + allow(subscriber).to receive(:pull).and_return([received_message]) + allow(Time).to receive(:now).and_return(Time.utc(2024, 5, 10, 12, 0, 0)) + end + + it "should check the uniqueness of the message" do + expect(consumer).to receive(:duplicate_message?) 
+ consume + end + + let(:received_timestamp) { Time.utc(2024, 5, 10, 12, 0, 0) } + + it "should yield the message to the handler block" do + expect { |b| consume(&b) } + .to yield_with_args( + received_message.message_id, + received_message.ack_id, + queue_name, + subscriber.deadline, + received_message.data, + 0, # delivery_attempt - 1 (1 - 1 = 0) + received_timestamp + ) + end + + it 'should not sleep' do + expect(consumer).to_not receive(:sleep) + consume + end + + it 'should not yield for a dupe message' do + allow(consumer).to receive(:duplicate_message?).and_return(true) + expect {|b| consume(&b) }.not_to yield_control + end + + context 'with delivery attempt count' do + it 'should calculate attempt count as delivery_attempt - 1' do + allow(received_message).to receive(:delivery_attempt).and_return(3) + expect { |b| consume(&b) } + .to yield_with_args( + received_message.message_id, + received_message.ack_id, + queue_name, + subscriber.deadline, + received_message.data, + 2, # delivery_attempt - 1 (3 - 1 = 2) + received_timestamp + ) + end + + it 'should default to attempt count of 0' do + allow(received_message).to receive(:delivery_attempt).and_return(nil) + expect { |b| consume(&b) } + .to yield_with_args( + received_message.message_id, + received_message.ack_id, + queue_name, + subscriber.deadline, + received_message.data, + 0, # (nil || 1) - 1 = 0 + received_timestamp + ) + end + end + end + + context "failure scenarios" do + it 'should raise exception on subscriber lookup failure' do + allow(consumer).to receive(:verify_connection!).and_raise(Google::Cloud::NotFoundError.new('Subscription not found')) + expect { consume }.to raise_error(Chore::TerribleMistake) + end + + it 'should raise exception for gcp credential failure' do + allow(consumer).to receive(:verify_connection!).and_raise(Google::Cloud::PermissionDeniedError.new('Permission denied')) + expect { consume }.to raise_error(Chore::TerribleMistake) + end + + it 'should raise exception during unexpected failure' do + allow(consumer).to receive(:verify_connection!).and_raise(StandardError.new('Connection error')) + expect { consume }.to raise_error(Chore::TerribleMistake) + end + end + end + + describe "completing work" do + it 'acknowledges the message in the subscriber' do + expect(subscriber).to receive(:acknowledge).with(received_message.ack_id) + consumer.complete(received_message.message_id, received_message.ack_id) + end + + it 'handles missing message gracefully' do + expect { consumer.complete('unknown-id', 'unknown-ack') }.not_to raise_error + end + end + + describe '#delay' do + let(:item) { Chore::UnitOfWork.new(received_message.message_id, received_message.ack_id, queue_name, 600, received_message.data, 0, consumer) } + + it 'modifies the ack deadline of the message' do + delay_seconds = backoff_func.call(item) + expect(subscriber).to receive(:modify_ack_deadline).with(delay_seconds, received_message.ack_id) + consumer.delay(item, backoff_func) + end + + it 'returns the delay value' do + delay_seconds = backoff_func.call(item) + result = consumer.delay(item, backoff_func) + expect(result).to eq(delay_seconds) + end + + it 'handles missing message gracefully' do + unknown_item = Chore::UnitOfWork.new('unknown-id', 'unknown-ack', queue_name, 600, 'data', 0, consumer) + expect { consumer.delay(unknown_item, backoff_func) }.not_to raise_error + end + end + + describe '#reject' do + it 'should be implemented but do nothing (Pub/Sub handles redelivery automatically)' do + expect { consumer.reject('message-id') 
}.not_to raise_error + end + end + + + describe '#verify_connection!' do + it 'should verify subscriber exists' do + expect(subscriber).to receive(:exists?).and_return(true) + expect { consumer.verify_connection! }.not_to raise_error + end + + it 'should raise error if subscriber does not exist' do + allow(subscriber).to receive(:exists?).and_return(false) + expect { consumer.verify_connection! }.to raise_error + end + end + + describe 'queue timeout' do + it 'should return subscriber ack deadline' do + expect(consumer.send(:queue_timeout)).to eq(600) + end + + it 'should default to 600 seconds if deadline is nil' do + allow(subscriber).to receive(:deadline).and_return(nil) + expect(consumer.send(:queue_timeout)).to eq(600) + end + end +end diff --git a/spec/chore/queues/pubsub/publisher_spec.rb b/spec/chore/queues/pubsub/publisher_spec.rb new file mode 100644 index 00000000..2208118b --- /dev/null +++ b/spec/chore/queues/pubsub/publisher_spec.rb @@ -0,0 +1,60 @@ +require 'spec_helper' + +describe Chore::Queues::PubSub::Publisher do + include_context 'fake pubsub objects' + + let(:spec_publisher) { Chore::Queues::PubSub::Publisher.new } + let(:job) { {'class' => 'TestJob', 'args'=>[1,2,'3']} } + let(:publish_result) { double('Google::Cloud::PubSub::Message', message_id: message_id) } + + before(:each) do + allow(Chore::Queues::PubSub).to receive(:pubsub_client).and_return(pubsub_client) + allow(publisher).to receive(:publish).and_return(publish_result) + end + + it 'should configure pubsub client' do + expect(Chore::Queues::PubSub).to receive(:pubsub_client) + spec_publisher.publish(queue_name, job) + end + + it 'should not create a new Pub/Sub client before every publish' do + expect(Chore::Queues::PubSub).to receive(:pubsub_client).once + 2.times { spec_publisher.send(:get_publisher, queue_name) } + end + + it 'should lookup the topic when publishing' do + expect(pubsub_client).to receive(:publisher).with(queue_name).and_return(publisher) + spec_publisher.publish(queue_name, job) + end + + it 'should publish an encoded message to the specified topic' do + expect(publisher).to receive(:publish).with(job.to_json) + spec_publisher.publish(queue_name, job) + end + + it 'should lookup multiple topics if specified' do + second_queue_name = queue_name + '2' + second_publisher = double('Google::Cloud::PubSub::Publisher') + + expect(pubsub_client).to receive(:publisher).with(queue_name).and_return(publisher) + expect(pubsub_client).to receive(:publisher).with(second_queue_name).and_return(second_publisher) + expect(publisher).to receive(:publish) + expect(second_publisher).to receive(:publish) + + spec_publisher.publish(queue_name, job) + spec_publisher.publish(second_queue_name, job) + end + + it 'should only lookup a named topic once' do + expect(pubsub_client).to receive(:publisher).with(queue_name).once.and_return(publisher) + expect(publisher).to receive(:publish).exactly(4).times + 4.times { spec_publisher.publish(queue_name, job) } + end + + describe 'encoding' do + it 'should encode job to JSON' do + encoded_job = spec_publisher.send(:encode_job, job) + expect(encoded_job).to eq(job.to_json) + end + end +end diff --git a/spec/chore/queues/pubsub_spec.rb b/spec/chore/queues/pubsub_spec.rb new file mode 100644 index 00000000..7d881692 --- /dev/null +++ b/spec/chore/queues/pubsub_spec.rb @@ -0,0 +1,130 @@ +require 'spec_helper' + +describe Chore::Queues::PubSub do + include_context 'fake pubsub objects' + + let(:topic_admin) { double('topic_admin') } + let(:subscription_admin) { 
double('subscription_admin') } + let(:topic_path) { "projects/test/topics/#{queue_name}" } + let(:subscription_path) { "projects/test/subscriptions/#{subscription_name}" } + + before(:each) do + allow(Chore::Queues::PubSub).to receive(:pubsub_client).and_return(pubsub_client) + allow(Chore).to receive(:prefixed_queue_names).and_return([queue_name]) + end + + context "when managing queues" do + before(:each) do + allow(pubsub_client).to receive(:topic_admin).and_return(topic_admin) + allow(pubsub_client).to receive(:subscription_admin).and_return(subscription_admin) + allow(pubsub_client).to receive(:topic_path).with(queue_name).and_return(topic_path) + allow(pubsub_client).to receive(:subscription_path).with(subscription_name).and_return(subscription_path) + end + it 'should create topics and subscriptions that are defined in its internal job name list' do + expect(topic_admin).to receive(:create_topic).with(name: topic_path).and_return(topic) + expect(subscription_admin).to receive(:create_subscription).with( + name: subscription_path, + topic: topic_path + ) + Chore::Queues::PubSub.create_queues! + end + + it 'should delete topics and subscriptions that are defined in its internal job name list' do + expect(subscription_admin).to receive(:delete_subscription).with(subscription: subscription_path) + expect(topic_admin).to receive(:delete_topic).with(topic: topic_path) + + Chore::Queues::PubSub.delete_queues! + end + + context 'when handling delete errors' do + it 'should continue deleting topic even if subscription delete fails' do + allow(subscription_admin).to receive(:delete_subscription).and_raise(Google::Cloud::NotFoundError.new('Subscription not found')) + expect(topic_admin).to receive(:delete_topic).with(topic: topic_path) + expect(Chore.logger).to receive(:error).with("Deleting Subscription: #{queue_name} failed because Subscription not found") + + Chore::Queues::PubSub.delete_queues! + end + + it 'should continue even if topic delete fails' do + expect(subscription_admin).to receive(:delete_subscription).with(subscription: subscription_path) + allow(topic_admin).to receive(:delete_topic).and_raise(Google::Cloud::NotFoundError.new('Topic not found')) + expect(Chore.logger).to receive(:error).with("Deleting Topic: #{queue_name} failed because Topic not found") + + Chore::Queues::PubSub.delete_queues! + end + end + + context 'and checking for existing queues' do + it 'checks for existing topics/subscriptions' do + allow(pubsub_client).to receive(:topic_path).with(queue_name).and_return(topic_path) + allow(pubsub_client).to receive(:subscription_path).with(subscription_name).and_return(subscription_path) + expect(described_class).to receive(:existing_queues).and_return([]) + # Since existing_queues returns [], create_queues! 
will proceed to create topics/subscriptions + allow(topic_admin).to receive(:create_topic).and_return(topic) + allow(subscription_admin).to receive(:create_subscription) + Chore::Queues::PubSub.create_queues!(true) + end + + it 'raises an error if a topic/subscription does exist' do + allow(described_class).to receive(:existing_queues).and_return([queue_name]) + expect{Chore::Queues::PubSub.create_queues!(true)}.to raise_error(RuntimeError) + end + + it 'does not raise an error if a topic/subscription does not exist' do + allow(topic_admin).to receive(:create_topic).and_return(topic) + allow(subscription_admin).to receive(:create_subscription) + allow(described_class).to receive(:existing_queues).and_return([]) + expect{Chore::Queues::PubSub.create_queues!(true)}.not_to raise_error + end + end + + context 'when handling already exists errors' do + it 'should continue when topic already exists' do + allow(topic_admin).to receive(:create_topic).and_raise(Google::Cloud::AlreadyExistsError.new('Topic already exists')) + allow(subscription_admin).to receive(:create_subscription).and_return(subscriber) + expect { Chore::Queues::PubSub.create_queues! }.not_to raise_error + end + + it 'should continue when subscription already exists' do + allow(topic_admin).to receive(:create_topic).and_return(topic) + allow(subscription_admin).to receive(:create_subscription).and_raise(Google::Cloud::AlreadyExistsError.new('Subscription already exists')) + expect { Chore::Queues::PubSub.create_queues! }.not_to raise_error + end + + it 'should continue when both topic and subscription already exist' do + allow(topic_admin).to receive(:create_topic).and_raise(Google::Cloud::AlreadyExistsError.new('Topic already exists')) + allow(subscription_admin).to receive(:create_subscription).and_raise(Google::Cloud::AlreadyExistsError.new('Subscription already exists')) + expect { Chore::Queues::PubSub.create_queues! 
}.not_to raise_error
+      end
+    end
+  end
+
+  describe '.existing_queues' do
+    it 'returns queues that exist when both publisher and subscriber calls succeed' do
+      allow(pubsub_client).to receive(:publisher).with(queue_name).and_return(topic)
+      allow(pubsub_client).to receive(:subscriber).with(queue_name).and_return(subscriber)
+      expect(described_class.existing_queues).to eq([queue_name])
+    end
+
+    it 'filters out queues when publisher call raises NotFoundError' do
+      allow(pubsub_client).to receive(:publisher).with(queue_name).and_raise(Google::Cloud::NotFoundError.new('Topic not found'))
+      expect(described_class.existing_queues).to eq([])
+    end
+
+    it 'filters out queues when subscriber call raises NotFoundError' do
+      allow(pubsub_client).to receive(:publisher).with(queue_name).and_return(topic)
+      allow(pubsub_client).to receive(:subscriber).with(queue_name).and_raise(Google::Cloud::NotFoundError.new('Subscription not found'))
+      expect(described_class.existing_queues).to eq([])
+    end
+
+    it 'filters out queues regardless of the NotFoundError message' do
+      allow(pubsub_client).to receive(:publisher).with(queue_name).and_raise(Google::Cloud::NotFoundError.new('Connection error'))
+      expect(described_class.existing_queues).to eq([])
+    end
+  end
+end
diff --git a/spec/chore/strategies/worker/forked_worker_strategy_spec.rb b/spec/chore/strategies/worker/forked_worker_strategy_spec.rb
index 8266f318..f2a4e4ec 100644
--- a/spec/chore/strategies/worker/forked_worker_strategy_spec.rb
+++ b/spec/chore/strategies/worker/forked_worker_strategy_spec.rb
@@ -16,7 +16,7 @@
      nil,
      'test',
      job_timeout,
-     Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])),
+     Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])),
      0,
      consumer
    )
diff --git a/spec/chore/strategies/worker/helpers/work_distributor_spec.rb b/spec/chore/strategies/worker/helpers/work_distributor_spec.rb
index fd7fae8e..cbd475f0 100644
--- a/spec/chore/strategies/worker/helpers/work_distributor_spec.rb
+++ b/spec/chore/strategies/worker/helpers/work_distributor_spec.rb
@@ -11,7 +11,7 @@
      SecureRandom.uuid,
      'test',
      60,
-     Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])),
+     Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])),
      0
    )
  end
@@ -89,7 +89,7 @@
      SecureRandom.uuid,
      'test',
      60,
-     Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])),
+     Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])),
      0
    )
    worker2 = Chore::Strategy::WorkerInfo.new(2)
diff --git a/spec/chore/strategies/worker/single_worker_strategy_spec.rb b/spec/chore/strategies/worker/single_worker_strategy_spec.rb
index 33f74312..bbc0bd29 100644
--- a/spec/chore/strategies/worker/single_worker_strategy_spec.rb
+++ b/spec/chore/strategies/worker/single_worker_strategy_spec.rb
@@ -3,7 +3,7 @@ describe Chore::Strategy::SingleWorkerStrategy do
  let(:manager) { double('Manager') }
  let(:job_timeout) { 60 }
- let(:job) { Chore::UnitOfWork.new(SecureRandom.uuid, nil, 'test', job_timeout, Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])), 0) }
+ let(:job) { Chore::UnitOfWork.new(SecureRandom.uuid, nil, 'test', job_timeout, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])), 0) }
diff --git a/spec/chore/strategies/worker/forked_worker_strategy_spec.rb b/spec/chore/strategies/worker/forked_worker_strategy_spec.rb
index 8266f318..f2a4e4ec 100644
--- a/spec/chore/strategies/worker/forked_worker_strategy_spec.rb
+++ b/spec/chore/strategies/worker/forked_worker_strategy_spec.rb
@@ -16,7 +16,7 @@
       nil,
       'test',
       job_timeout,
-      Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])),
+      Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])),
       0,
       consumer
     )
diff --git a/spec/chore/strategies/worker/helpers/work_distributor_spec.rb b/spec/chore/strategies/worker/helpers/work_distributor_spec.rb
index fd7fae8e..cbd475f0 100644
--- a/spec/chore/strategies/worker/helpers/work_distributor_spec.rb
+++ b/spec/chore/strategies/worker/helpers/work_distributor_spec.rb
@@ -11,7 +11,7 @@
       SecureRandom.uuid,
       'test',
       60,
-      Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])),
+      Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])),
       0
     )
   end
@@ -89,7 +89,7 @@
       SecureRandom.uuid,
       'test',
       60,
-      Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])),
+      Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])),
       0
     )
     worker2 = Chore::Strategy::WorkerInfo.new(2)
diff --git a/spec/chore/strategies/worker/single_worker_strategy_spec.rb b/spec/chore/strategies/worker/single_worker_strategy_spec.rb
index 33f74312..bbc0bd29 100644
--- a/spec/chore/strategies/worker/single_worker_strategy_spec.rb
+++ b/spec/chore/strategies/worker/single_worker_strategy_spec.rb
@@ -3,7 +3,7 @@
 describe Chore::Strategy::SingleWorkerStrategy do
   let(:manager) { double('Manager') }
   let(:job_timeout) { 60 }
-  let(:job) { Chore::UnitOfWork.new(SecureRandom.uuid, nil, 'test', job_timeout, Chore::Encoder::JsonEncoder.encode(TestJob.job_hash([1,2,"3"])), 0) }
+  let(:job) { Chore::UnitOfWork.new(SecureRandom.uuid, nil, 'test', job_timeout, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(TestJob, [1,2,"3"])), 0) }
   subject { described_class.new(manager) }
 
   describe '#stop!' do
diff --git a/spec/chore/worker_spec.rb b/spec/chore/worker_spec.rb
index 586121a1..9b3e22d8 100644
--- a/spec/chore/worker_spec.rb
+++ b/spec/chore/worker_spec.rb
@@ -43,11 +43,11 @@ def perform(first, second)
     let(:consumer) { double('consumer', :complete => nil, :reject => nil) }
     let(:job_args) { [1,2,'3'] }
-    let(:job) { SimpleJob.job_hash(job_args) }
+    let(:job) { Chore::PayloadHandler.job_hash(SimpleJob, job_args) }
 
     it 'should use a default payload handler' do
       worker = Chore::Worker.new
-      expect(worker.options[:payload_handler]).to eq(Chore::Job)
+      expect(worker.options[:payload_handler]).to eq(Chore::PayloadHandler)
     end
 
     shared_examples_for "a worker" do
@@ -82,11 +82,11 @@ def perform(first, second)
      context 'when the value being deduped on is unique' do
        let(:job_args) { [rand,2,'3'] }
        let(:encoded_job) { Chore::Encoder::JsonEncoder.encode(job) }
-       let(:job) { SimpleDedupeJob.job_hash(job_args) }
+       let(:job) { Chore::PayloadHandler.job_hash(SimpleDedupeJob, job_args) }
        it 'should call complete for each unique value' do
          allow(consumer).to receive(:duplicate_message?).and_return(false)
          work = []
-         work << Chore::UnitOfWork.new(1, nil, 'dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(SimpleDedupeJob.job_hash([rand,2,'3'])), 0, consumer)
+         work << Chore::UnitOfWork.new(1, nil, 'dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(SimpleDedupeJob, [rand,2,'3'])), 0, consumer)
          expect(SimpleDedupeJob).to receive(:perform).exactly(1).times
          expect(consumer).to receive(:complete).exactly(1).times
          Chore::Worker.start(work, {:payload_handler => payload_handler})
@@ -96,9 +96,9 @@ def perform(first, second)
      context 'when the dedupe lambda does not take the same number of arguments as perform' do
        it 'should raise an error and not complete the job' do
          work = []
-         work << Chore::UnitOfWork.new(1, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(InvalidDedupeJob.job_hash([rand,2,'3'])), 0, consumer)
-         work << Chore::UnitOfWork.new(2, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(InvalidDedupeJob.job_hash([rand,2,'3'])), 0, consumer)
-         work << Chore::UnitOfWork.new(1, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(InvalidDedupeJob.job_hash([rand,2,'3'])), 0, consumer)
+         work << Chore::UnitOfWork.new(1, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(InvalidDedupeJob, [rand,2,'3'])), 0, consumer)
+         work << Chore::UnitOfWork.new(2, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(InvalidDedupeJob, [rand,2,'3'])), 0, consumer)
+         work << Chore::UnitOfWork.new(1, nil, 'invalid_dedupe_test', 60, Chore::Encoder::JsonEncoder.encode(Chore::PayloadHandler.job_hash(InvalidDedupeJob, [rand,2,'3'])), 0, consumer)
          expect(consumer).not_to receive(:complete)
          Chore::Worker.start(work, {:payload_handler => payload_handler})
        end
@@ -108,7 +108,7 @@ def perform(first, second)
 
    describe "with default payload handler" do
      let(:encoded_job) { Chore::Encoder::JsonEncoder.encode(job) }
-     let(:payload_handler) { Chore::Job }
+     let(:payload_handler) { Chore::PayloadHandler }
      let(:payload) {job_args}
      it_behaves_like "a worker"
    end
diff --git a/spec/support/queues/pubsub/fake_objects.rb b/spec/support/queues/pubsub/fake_objects.rb
new file mode 100644
index 00000000..7a08461c
--- /dev/null
+++ b/spec/support/queues/pubsub/fake_objects.rb
@@ -0,0 +1,71 @@
+RSpec.shared_context 'fake pubsub objects' do
+  let(:queue_name) { 'test_queue' }
+  let(:subscription_name) { queue_name }
+  let(:project_id) { 'test-project' }
+
+  let(:message_data) { {'class' => 'TestJob', 'args' => [1, 2, '3']}.to_json }
+  let(:message_id) { 'message-id-123' }
+  let(:ack_id) { 'ack-id-456' }
+
+  let(:received_message) do
+    double('Google::Cloud::PubSub::ReceivedMessage',
+      message_id: message_id,
+      ack_id: ack_id,
+      data: message_data,
+      delivery_attempt: 1
+    )
+  end
+
+  let(:topic) do
+    double('Google::Cloud::PubSub::Topic',
+      name: queue_name,
+      exists?: true,
+      publish: received_message,
+      create_subscription: subscriber,
+      delete: true
+    )
+  end
+
+  let(:publisher) do
+    double('Google::Cloud::PubSub::Publisher',
+      name: queue_name,
+      exists?: true,
+      publish: received_message,
+      create_subscription: subscriber,
+      delete: true
+    )
+  end
+
+  let(:subscriber) do
+    double('Google::Cloud::PubSub::Subscriber',
+      name: subscription_name,
+      exists?: true,
+      deadline: 600,
+      pull: [received_message],
+      delete: true,
+      acknowledge: true,
+      modify_ack_deadline: true
+    )
+  end
+
+  let(:pubsub_client) do
+    topic_admin = double('topic_admin',
+      create_topic: topic,
+      delete_topic: true
+    )
+    subscription_admin = double('subscription_admin',
+      create_subscription: subscriber,
+      delete_subscription: true
+    )
+
+    double('Google::Cloud::PubSub::Project',
+      project_id: project_id,
+      publisher: publisher,
+      subscriber: subscriber,
+      topic_admin: topic_admin,
+      subscription_admin: subscription_admin,
+      topic_path: "projects/#{project_id}/topics/#{queue_name}",
+      subscription_path: "projects/#{project_id}/subscriptions/#{queue_name}"
+    )
+  end
+end
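
The fake_objects support file above replaces every network-facing Pub/Sub object with an RSpec double, so the queue specs never open a connection. A hypothetical spec consuming the shared context might look like the sketch below; stubbing Google::Cloud::PubSub.new is an assumption about how the production code obtains its client, not something this diff shows.

  require 'spec_helper'

  describe Chore::Queues::PubSub do
    include_context 'fake pubsub objects'

    before do
      # Assumption: production code builds its client via Google::Cloud::PubSub.new,
      # so routing that call to the fake project double keeps specs offline.
      allow(Google::Cloud::PubSub).to receive(:new).and_return(pubsub_client)
    end

    it 'pulls the faked message through the stubbed subscriber' do
      expect(pubsub_client.subscriber(queue_name).pull).to eq([received_message])
    end
  end

Because topic, publisher, subscriber, and the admin clients are plain doubles, individual examples can re-stub a single method (as the specs above do with allow(topic_admin).to receive(:create_topic)) without rebuilding the whole client.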