From f0697671938b5ad53257cefccd2be6e7726acd10 Mon Sep 17 00:00:00 2001 From: vkghub Date: Thu, 29 Jan 2026 11:12:04 -0500 Subject: [PATCH 01/14] handle graceful shutdown of Bunny (AMQP client) --- lib/event_source.rb | 21 +++++++++++ lib/event_source/configure/config.rb | 3 +- lib/event_source/connection_manager.rb | 37 +++++++++++++++++++ .../protocols/amqp/bunny_queue_proxy.rb | 19 +++++++++- lib/event_source/queue.rb | 16 ++++++++ lib/event_source/railtie.rb | 31 ++++++++++++++++ 6 files changed, 125 insertions(+), 2 deletions(-) diff --git a/lib/event_source.rb b/lib/event_source.rb index 5b3d8429..5b901910 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -112,6 +112,27 @@ def register_subscriber(subscriber_klass) def register_publisher(subscriber_klass) boot_registry.register_publisher(subscriber_klass) end + + def amqp_inflight_count + @amqp_inflight_mutex ||= Mutex.new + @amqp_inflight_mutex.synchronize { @amqp_inflight_count ||= 0 } + end + + def increment_amqp_inflight + @amqp_inflight_mutex ||= Mutex.new + @amqp_inflight_mutex.synchronize do + @amqp_inflight_count ||= 0 + @amqp_inflight_count += 1 + end + end + + def decrement_amqp_inflight + @amqp_inflight_mutex ||= Mutex.new + @amqp_inflight_mutex.synchronize do + @amqp_inflight_count ||= 0 + @amqp_inflight_count = [@amqp_inflight_count - 1, 0].max + end + end end class EventSourceLogger diff --git a/lib/event_source/configure/config.rb b/lib/event_source/configure/config.rb index fbdea5cc..784eb9b7 100644 --- a/lib/event_source/configure/config.rb +++ b/lib/event_source/configure/config.rb @@ -10,11 +10,12 @@ class Config def initialize @log_level = :warn + @shutdown_timeouts = { amqp_drain: 15, http_drain: 15 } end # TODO: add default for pub_sub_root attr_writer :pub_sub_root, :protocols, :server_configurations - attr_accessor :app_name, :log_level + attr_accessor :app_name, :log_level, :auto_shutdown, :shutdown_timeouts def load_protocols @protocols.each do |protocol| diff --git a/lib/event_source/connection_manager.rb b/lib/event_source/connection_manager.rb index 4af587f0..9c33cf53 100644 --- a/lib/event_source/connection_manager.rb +++ b/lib/event_source/connection_manager.rb @@ -160,6 +160,43 @@ def drop_connection(connection_uri) connections end + # Cancel AMQP consumers and wait briefly for drain + def cancel_amqp_consumers!(timeout: 5) + logger.info "Cancelling AMQP consumers prior to shutdown" + logger.info "AMQP inflight handlers (before_cancel): #{EventSource.amqp_inflight_count}" + + connections_for(:amqp).each do |connection| + connection.channels.each_value do |channel| + channel.subscribe_operations.each_value do |sub_op| + subject = sub_op.subject + if subject.respond_to?(:cancel_consumers!) + subject.cancel_consumers! + end + end + end + end + + start = Time.now + loop do + inflight = EventSource.amqp_inflight_count + any_consumers = connections_for(:amqp).any? do |connection| + connection.channels.values.any? do |channel| + begin + channel.channel_proxy.subject.any_consumers? + rescue StandardError + false + end + end + end + logger.info "AMQP inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}" + break if inflight == 0 && !any_consumers + break if (Time.now - start) >= timeout + sleep 0.1 + end + logger.info "AMQP consumer cancellation complete" + logger.info "AMQP inflight handlers (end): #{EventSource.amqp_inflight_count}" + end + private # Find connection proxy class for given protocol diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 7416dc8e..cd708d31 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -91,7 +91,7 @@ def spawn_thread(options) end def add_consumer(subscriber_klass, options) - @subject.subscribe(options) do |delivery_info, metadata, payload| + consumer = @subject.subscribe(options) do |delivery_info, metadata, payload| on_receive_message( subscriber_klass, delivery_info, @@ -99,6 +99,7 @@ def add_consumer(subscriber_klass, options) payload ) end + @consumers << consumer if consumer end def convert_to_subscribe_options(options) @@ -124,6 +125,7 @@ def on_receive_message( metadata, payload ) + EventSource.increment_amqp_inflight logger.debug '**************************' logger.debug subscriber_klass.inspect logger.debug delivery_info.inspect @@ -158,6 +160,7 @@ def on_receive_message( logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}" ensure subscriber = nil + EventSource.decrement_amqp_inflight end # Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings. @@ -247,6 +250,20 @@ def channel_item_queue_bindings_for(bindings) def exchange_name_from_queue(queue_name) queue_name.match(/^\w+\.(.+)/)[1] end + + public + + # Cancel all registered consumers for this queue + def cancel_consumers! + @consumers.each do |consumer| + begin + consumer.cancel + rescue StandardError => e + logger.info "Consumer cancellation error: #{e.message}" + end + end + @consumers.clear + end end end end diff --git a/lib/event_source/queue.rb b/lib/event_source/queue.rb index bafeba03..7ed1301b 100644 --- a/lib/event_source/queue.rb +++ b/lib/event_source/queue.rb @@ -49,6 +49,22 @@ def closed? @subject.closed? end + # Are there any items left to process in the queue? + # @return [Boolean] + def empty? + @subject.empty? + end + + # Current number of items waiting in the queue + # @return [Integer] + def size + if @subject.respond_to?(:length) + @subject.length + else + @subject.size + end + end + # Register an action to be performed, with a resolver class and key. def register_action(resolver, key) @registered_actions << [resolver, key] diff --git a/lib/event_source/railtie.rb b/lib/event_source/railtie.rb index 77dcc66b..493b41d0 100644 --- a/lib/event_source/railtie.rb +++ b/lib/event_source/railtie.rb @@ -1,9 +1,40 @@ # frozen_string_literal: true +require 'logger' + module EventSource # :nodoc: module Railtie Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do + logger = Logger.new($stdout) + logger.progname = 'EventSource graceful shutdown' + timeouts = EventSource.config.shutdown_timeouts || {} + amqp_timeout = timeouts[:amqp_drain] || 5 + + # Perform shutdown work outside of trap/at_exit context to avoid + # ThreadError from mutex operations within Bunny (AMQP client). + shutdown = lambda do |reason| + Thread.new do + begin + logger.info "#{reason}, starting graceful shutdown" + logger.info "AMQP inflight handlers at shutdown start: #{EventSource.amqp_inflight_count}" + cm = EventSource::ConnectionManager.instance + # Stop consuming and allow in-flight handlers to drain briefly + cm.cancel_amqp_consumers!(timeout: amqp_timeout) + cm.drop_connections_for(:amqp) + rescue => e + logger.error "graceful shutdown error: #{e.class}: #{e.message}" + end + end.join + end + + if EventSource.config.auto_shutdown + at_exit { shutdown.call('at_exit received') } + + %w[TERM INT].each do |sig| + Signal.trap(sig) { shutdown.call("signal=#{sig} received") } + end + end EventSource.initialize! end end From 942f84c84f3f9b620d5fedce4c5315ff078c282b Mon Sep 17 00:00:00 2001 From: vkghub Date: Thu, 29 Jan 2026 18:21:02 -0500 Subject: [PATCH 02/14] rearrange methods --- lib/event_source/connection_manager.rb | 17 ++++++++---- .../protocols/amqp/bunny_queue_proxy.rb | 26 +++++++++---------- lib/event_source/queue.rb | 6 ----- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/lib/event_source/connection_manager.rb b/lib/event_source/connection_manager.rb index 9c33cf53..5504d08f 100644 --- a/lib/event_source/connection_manager.rb +++ b/lib/event_source/connection_manager.rb @@ -160,7 +160,7 @@ def drop_connection(connection_uri) connections end - # Cancel AMQP consumers and wait briefly for drain + # Cancel AMQP consumers and wait briefly for drain def cancel_amqp_consumers!(timeout: 5) logger.info "Cancelling AMQP consumers prior to shutdown" logger.info "AMQP inflight handlers (before_cancel): #{EventSource.amqp_inflight_count}" @@ -176,6 +176,17 @@ def cancel_amqp_consumers!(timeout: 5) end end + wait_for_amqp_drain(timeout) + + logger.info "AMQP consumer cancellation complete" + logger.info "AMQP inflight handlers (end): #{EventSource.amqp_inflight_count}" + end + + private + + def wait_for_amqp_drain(timeout) + logger.info "Waiting for AMQP inflight handlers to drain" + start = Time.now loop do inflight = EventSource.amqp_inflight_count @@ -193,12 +204,8 @@ def cancel_amqp_consumers!(timeout: 5) break if (Time.now - start) >= timeout sleep 0.1 end - logger.info "AMQP consumer cancellation complete" - logger.info "AMQP inflight handlers (end): #{EventSource.amqp_inflight_count}" end - private - # Find connection proxy class for given protocol # @param [Symbol] protocol the protocol name, `:http` or `:amqp` # @return [Class] Protocol Specific Connection Proxy Class diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index cd708d31..e80b7284 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -201,6 +201,18 @@ def method_missing(name, *args) @subject.send(name, *args) end + # Cancel all registered consumers for this queue + def cancel_consumers! + @consumers.each do |consumer| + begin + consumer.cancel + rescue StandardError => e + logger.info "Consumer cancellation error: #{e.message}" + end + end + @consumers.clear + end + private def subscriber_klass_name_to_suffix(subscriber_klass) @@ -250,20 +262,6 @@ def channel_item_queue_bindings_for(bindings) def exchange_name_from_queue(queue_name) queue_name.match(/^\w+\.(.+)/)[1] end - - public - - # Cancel all registered consumers for this queue - def cancel_consumers! - @consumers.each do |consumer| - begin - consumer.cancel - rescue StandardError => e - logger.info "Consumer cancellation error: #{e.message}" - end - end - @consumers.clear - end end end end diff --git a/lib/event_source/queue.rb b/lib/event_source/queue.rb index 7ed1301b..00b57ad7 100644 --- a/lib/event_source/queue.rb +++ b/lib/event_source/queue.rb @@ -49,12 +49,6 @@ def closed? @subject.closed? end - # Are there any items left to process in the queue? - # @return [Boolean] - def empty? - @subject.empty? - end - # Current number of items waiting in the queue # @return [Integer] def size From dbabe5c281cdd871be4b0761190693b7e8e7da68 Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 12:53:16 -0500 Subject: [PATCH 03/14] add more changes and specs --- lib/event_source.rb | 26 +++--- lib/event_source/channel.rb | 7 ++ lib/event_source/connection_manager.rb | 60 +++++++------ .../protocols/amqp/bunny_queue_proxy.rb | 4 +- lib/event_source/railtie.rb | 5 +- spec/event_source/connection_manager_spec.rb | 88 +++++++++++++++++++ 6 files changed, 144 insertions(+), 46 deletions(-) diff --git a/lib/event_source.rb b/lib/event_source.rb index 5b901910..7597298e 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -113,24 +113,24 @@ def register_publisher(subscriber_klass) boot_registry.register_publisher(subscriber_klass) end - def amqp_inflight_count - @amqp_inflight_mutex ||= Mutex.new - @amqp_inflight_mutex.synchronize { @amqp_inflight_count ||= 0 } + def inflight_messages_count + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize { @inflight_messages_count ||= 0 } end - def increment_amqp_inflight - @amqp_inflight_mutex ||= Mutex.new - @amqp_inflight_mutex.synchronize do - @amqp_inflight_count ||= 0 - @amqp_inflight_count += 1 + def increment_inflight_messages + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize do + @inflight_messages_count ||= 0 + @inflight_messages_count += 1 end end - def decrement_amqp_inflight - @amqp_inflight_mutex ||= Mutex.new - @amqp_inflight_mutex.synchronize do - @amqp_inflight_count ||= 0 - @amqp_inflight_count = [@amqp_inflight_count - 1, 0].max + def decrement_inflight_messages + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize do + @inflight_messages_count ||= 0 + @inflight_messages_count = [@inflight_messages_count - 1, 0].max end end end diff --git a/lib/event_source/channel.rb b/lib/event_source/channel.rb index 87ca29f0..94af0f88 100644 --- a/lib/event_source/channel.rb +++ b/lib/event_source/channel.rb @@ -110,5 +110,12 @@ def add_subscribe_operation(async_api_channel_item) ) logger.info " Subscribe Operation Added: #{operation_id}" end + + def cancel_consumers + subscribe_operations.each_value do |sub_op| + subject = sub_op.subject + subject.cancel_consumers! if subject.respond_to?(:cancel_consumers!) + end + end end end diff --git a/lib/event_source/connection_manager.rb b/lib/event_source/connection_manager.rb index 5504d08f..7d11b3b7 100644 --- a/lib/event_source/connection_manager.rb +++ b/lib/event_source/connection_manager.rb @@ -160,52 +160,54 @@ def drop_connection(connection_uri) connections end - # Cancel AMQP consumers and wait briefly for drain - def cancel_amqp_consumers!(timeout: 5) - logger.info "Cancelling AMQP consumers prior to shutdown" - logger.info "AMQP inflight handlers (before_cancel): #{EventSource.amqp_inflight_count}" - - connections_for(:amqp).each do |connection| + def cancel_consumers_for(protocol, timeout: 5) + protocol_value = protocol.to_s.upcase + logger.info "Cancelling #{protocol_value} consumers prior to shutdown" + connections = connections_for(protocol) + any_consumers = connections.any? { |connection| connection_has_consumers?(connection) } + logger.info "#{protocol_value} inflight handlers (before_cancel): #{EventSource.inflight_messages_count}, any_consumers=#{any_consumers}" + + connections.each do |connection| connection.channels.each_value do |channel| - channel.subscribe_operations.each_value do |sub_op| - subject = sub_op.subject - if subject.respond_to?(:cancel_consumers!) - subject.cancel_consumers! - end - end + channel.cancel_consumers end end - wait_for_amqp_drain(timeout) + wait_for_connections_to_drain(connections, timeout) - logger.info "AMQP consumer cancellation complete" - logger.info "AMQP inflight handlers (end): #{EventSource.amqp_inflight_count}" + logger.info "#{protocol_value} consumer cancellation complete" + logger.info "#{protocol_value} inflight handlers (end): #{EventSource.inflight_messages_count}" end private - def wait_for_amqp_drain(timeout) - logger.info "Waiting for AMQP inflight handlers to drain" + def wait_for_connections_to_drain(connections, timeout) + return if connections.empty? + protocol = connections.first.protocol.to_s.upcase start = Time.now + loop do - inflight = EventSource.amqp_inflight_count - any_consumers = connections_for(:amqp).any? do |connection| - connection.channels.values.any? do |channel| - begin - channel.channel_proxy.subject.any_consumers? - rescue StandardError - false - end - end - end - logger.info "AMQP inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}" - break if inflight == 0 && !any_consumers + inflight = EventSource.inflight_messages_count + any_consumers = connections.any? { |connection| connection_has_consumers?(connection) } + + logger.info "#{protocol} inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}" + break if inflight.zero? && !any_consumers break if (Time.now - start) >= timeout sleep 0.1 end end + def connection_has_consumers?(connection) + connection.channels.values.any? do |channel| + begin + channel.channel_proxy.subject.any_consumers? + rescue StandardError + false + end + end + end + # Find connection proxy class for given protocol # @param [Symbol] protocol the protocol name, `:http` or `:amqp` # @return [Class] Protocol Specific Connection Proxy Class diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index e80b7284..8eff6fe7 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -125,7 +125,7 @@ def on_receive_message( metadata, payload ) - EventSource.increment_amqp_inflight + EventSource.increment_inflight_messages logger.debug '**************************' logger.debug subscriber_klass.inspect logger.debug delivery_info.inspect @@ -160,7 +160,7 @@ def on_receive_message( logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}" ensure subscriber = nil - EventSource.decrement_amqp_inflight + EventSource.decrement_inflight_messages end # Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings. diff --git a/lib/event_source/railtie.rb b/lib/event_source/railtie.rb index 493b41d0..3b0e7254 100644 --- a/lib/event_source/railtie.rb +++ b/lib/event_source/railtie.rb @@ -17,10 +17,11 @@ module Railtie Thread.new do begin logger.info "#{reason}, starting graceful shutdown" - logger.info "AMQP inflight handlers at shutdown start: #{EventSource.amqp_inflight_count}" + logger.info "AMQP inflight handlers at shutdown start: #{EventSource.inflight_messages_count}" cm = EventSource::ConnectionManager.instance + # Stop consuming and allow in-flight handlers to drain briefly - cm.cancel_amqp_consumers!(timeout: amqp_timeout) + cm.cancel_consumers_for(:amqp, timeout: amqp_timeout) cm.drop_connections_for(:amqp) rescue => e logger.error "graceful shutdown error: #{e.class}: #{e.message}" diff --git a/spec/event_source/connection_manager_spec.rb b/spec/event_source/connection_manager_spec.rb index 10100d71..51825ebe 100644 --- a/spec/event_source/connection_manager_spec.rb +++ b/spec/event_source/connection_manager_spec.rb @@ -92,6 +92,68 @@ ).to eq Hash.new end end + + context '.cancel_consumers_for' do + let(:connection) { connection_manager.connections[connection_url] } + + let(:async_api_file) do + Pathname.pwd.join('spec', 'support', 'asyncapi', 'polypress_amqp.yml') + end + + let(:async_api_channels) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: async_api_file) + .success + .channels + end + + let(:channel) do + connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities'] + end + + before do + connection.start unless connection.active? + connection.add_channels(async_api_channels) + channel.subscribe_operations.values.each do |sub_op| + subscriber_klass = double('SubscriberKlass') + sub_op.subscribe(subscriber_klass) + end + end + + context 'when inflight messages are present' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(5) + end + + it 'waits for timeout' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end + end + + context 'when inflight messages are draining' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(1, 0) + end + + it 'waits for drain' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 5) + end + end + + context 'when no inflight messages are present' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + end + + it 'cancels AMQP consumers on each channel without waiting' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end + end + end end end end @@ -163,5 +225,31 @@ end end end + + context 'when no connections are present + - .cancel_consumers_for' do + let(:protocol) { :amqp } + let(:connection) { instance_double('EventSource::Connection', protocol: protocol, channels: { default: channel }) } + let(:channel) { instance_double('EventSource::Channel') } + + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + end + + context 'does not raise error' do + before do + allow(connection_manager).to receive(:connections_for).with(protocol).and_return([]) + allow(connection_manager).to receive(:wait_for_connections_to_drain) + end + + it 'and still calls drain helper' do + expect do + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end.not_to raise_error + + expect(connection_manager).to have_received(:wait_for_connections_to_drain).with([], 1) + end + end + end end end From 70df6a3c68daf7dfbf5bd334b31dc281de21c621 Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 12:58:39 -0500 Subject: [PATCH 04/14] remove unused --- lib/event_source/queue.rb | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/lib/event_source/queue.rb b/lib/event_source/queue.rb index 00b57ad7..bafeba03 100644 --- a/lib/event_source/queue.rb +++ b/lib/event_source/queue.rb @@ -49,16 +49,6 @@ def closed? @subject.closed? end - # Current number of items waiting in the queue - # @return [Integer] - def size - if @subject.respond_to?(:length) - @subject.length - else - @subject.size - end - end - # Register an action to be performed, with a resolver class and key. def register_action(resolver, key) @registered_actions << [resolver, key] From cd976f863b372fb2829fa993804a5f092ce7586c Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 14:17:09 -0500 Subject: [PATCH 05/14] add more specs --- spec/event_source/channel_spec.rb | 18 ++++++++++++++++++ spec/event_source/connection_manager_spec.rb | 2 ++ 2 files changed, 20 insertions(+) diff --git a/spec/event_source/channel_spec.rb b/spec/event_source/channel_spec.rb index 3f1020cc..ada8f37e 100644 --- a/spec/event_source/channel_spec.rb +++ b/spec/event_source/channel_spec.rb @@ -92,5 +92,23 @@ expect(audit_log_queue.pop.last).to eq 'test message from enterprise events!!' expect(audit_log_queue.pop.last).to eq 'test message from enrollment events!!' end + + context '.cancel_consumers' do + before do + subject.subscribe_operations.values.each do |sub_op| + subscriber_klass = double('SubscriberKlass') + sub_op.subscribe(subscriber_klass) + end + end + + it 'cancels consumers via channel' do + channel = subject + expect(channel.channel_proxy.any_consumers?).to be_truthy + expect(channel.subscribe_operations.values.first.subject.consumers.count).to be > 0 + channel.cancel_consumers + expect(channel.channel_proxy.any_consumers?).to be_falsey + expect(channel.subscribe_operations.values.first.subject.consumers.count).to eq 0 + end + end end end diff --git a/spec/event_source/connection_manager_spec.rb b/spec/event_source/connection_manager_spec.rb index 51825ebe..3d6f14bb 100644 --- a/spec/event_source/connection_manager_spec.rb +++ b/spec/event_source/connection_manager_spec.rb @@ -121,6 +121,8 @@ end end + after { connection.disconnect if connection.active? } + context 'when inflight messages are present' do before do allow(EventSource).to receive(:inflight_messages_count).and_return(5) From 8f3a32f2a1619b35994a2d0e665bf02760a5475c Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 17:05:15 -0500 Subject: [PATCH 06/14] add railtie spec --- lib/event_source/configure/config.rb | 2 +- spec/config_helper.rb | 2 +- spec/rails_app/spec/railtie_spec.rb | 88 +++++++++++++++++++++++++++- 3 files changed, 89 insertions(+), 3 deletions(-) diff --git a/lib/event_source/configure/config.rb b/lib/event_source/configure/config.rb index 784eb9b7..f5cd74f6 100644 --- a/lib/event_source/configure/config.rb +++ b/lib/event_source/configure/config.rb @@ -10,7 +10,7 @@ class Config def initialize @log_level = :warn - @shutdown_timeouts = { amqp_drain: 15, http_drain: 15 } + @shutdown_timeouts = { amqp_drain: 5, http_drain: 5 } end # TODO: add default for pub_sub_root diff --git a/spec/config_helper.rb b/spec/config_helper.rb index d3d11654..58678da5 100644 --- a/spec/config_helper.rb +++ b/spec/config_helper.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true EventSource.configure do |config| - config.protocols = %w[amqp http] + config.protocols = %w[amqp http arn] config.log_level = :warn end diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index 7c93c25f..462fd9c9 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true -require_relative './rails_helper' +require_relative 'rails_helper' +require 'config_helper' +require 'event_source/railtie' RSpec.describe EventSource::Railtie do it "runs when invoked" do @@ -8,4 +10,88 @@ connection = manager.connections_for(:amqp).first expect(connection).not_to be_nil end + + context 'auto_shutdown' do + before(:all) do + manager = EventSource::ConnectionManager.instance + manager.drop_connections_for(:amqp) + manager.drop_connections_for(:http) + manager.drop_connections_for(:arn) + end + + let(:protocol) { :amqp } + let(:url) { 'amqp://localhost:5672/' } + let(:protocol_version) { '0.9.1' } + let(:description) { 'Development RabbitMQ Server' } + + let(:server_config) do + { + ref: url, + url: url, + protocol: protocol, + protocol_version: protocol_version, + description: description + } + end + + let(:connection_manager) { EventSource::ConnectionManager.instance } + + let(:connection) do + connection_manager.add_connection(server_config) + connection_manager.connections_for(:amqp).first + end + + let(:async_api_file) do + Pathname.pwd.join('spec', 'support', 'asyncapi', 'polypress_amqp.yml') + end + + let(:async_api_channels) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: async_api_file) + .success + .channels + end + + let(:channel) do + connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities'] + end + + before do + connection.start unless connection.active? + connection.add_channels(async_api_channels) + + channel.subscribe_operations.each_value do |subscribe_operation| + subscriber_klass = double('SubscriberKlass') + subscribe_operation.subscribe(subscriber_klass) + end + + @original_timeouts = EventSource.config.shutdown_timeouts + @original_auto_shutdown = EventSource.config.auto_shutdown + + EventSource.config.shutdown_timeouts = { amqp_drain: 2 } + EventSource.config.auto_shutdown = true + + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| + blk.call + end + end + + after do + EventSource.config.shutdown_timeouts = @original_timeouts + EventSource.config.auto_shutdown = @original_auto_shutdown + end + + it 'cancels AMQP consumers and drops AMQP connections on shutdown' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + expect(connection.channels).not_to be_empty + expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty + + initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } + initializer.run(Rails.application) + + expect(connection_manager.connections_for(:amqp)).to be_empty + end + end end From 3141786ae0c40d54762fcd4df349afd5c287e3e4 Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 17:08:26 -0500 Subject: [PATCH 07/14] fix typo --- spec/rails_app/spec/railtie_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index 462fd9c9..1b64ac2d 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true require_relative 'rails_helper' -require 'config_helper' +require_relative 'config_helper' require 'event_source/railtie' RSpec.describe EventSource::Railtie do From eff3927e4e85be72346046ba7d430c66198bd363 Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 18:27:04 -0500 Subject: [PATCH 08/14] spec fix --- spec/rails_app/spec/railtie_spec.rb | 52 +++++++++++++++++------------ 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index 1b64ac2d..4f63c114 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -1,24 +1,25 @@ # frozen_string_literal: true require_relative 'rails_helper' -require_relative 'config_helper' require 'event_source/railtie' RSpec.describe EventSource::Railtie do - it "runs when invoked" do + before do + @original_auto_shutdown = EventSource.config.auto_shutdown + EventSource.config.auto_shutdown = auto_shutdown_enabled + @at_exit_handler = nil + allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| + @at_exit_handler = blk + end + initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } + initializer.run(Rails.application) manager = EventSource::ConnectionManager.instance - connection = manager.connections_for(:amqp).first - expect(connection).not_to be_nil + manager.drop_connections_for(:amqp) + manager.drop_connections_for(:http) + manager.drop_connections_for(:arn) end - context 'auto_shutdown' do - before(:all) do - manager = EventSource::ConnectionManager.instance - manager.drop_connections_for(:amqp) - manager.drop_connections_for(:http) - manager.drop_connections_for(:arn) - end - + context '.auto_shutdown' do let(:protocol) { :amqp } let(:url) { 'amqp://localhost:5672/' } let(:protocol_version) { '0.9.1' } @@ -73,9 +74,6 @@ EventSource.config.auto_shutdown = true allow(EventSource).to receive(:inflight_messages_count).and_return(0) - allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| - blk.call - end end after do @@ -83,15 +81,25 @@ EventSource.config.auto_shutdown = @original_auto_shutdown end - it 'cancels AMQP consumers and drops AMQP connections on shutdown' do - expect(connection_manager.connections_for(:amqp)).not_to be_empty - expect(connection.channels).not_to be_empty - expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty + context 'when auto_shutdown is enabled' do + let(:auto_shutdown_enabled) { true } + it 'cancels AMQP consumers and drops AMQP connections on shutdown' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + expect(connection.channels).not_to be_empty + expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty + expect(@at_exit_handler).not_to be_nil + @at_exit_handler.call - initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } - initializer.run(Rails.application) + expect(connection_manager.connections_for(:amqp)).to be_empty + end + end - expect(connection_manager.connections_for(:amqp)).to be_empty + context 'when auto_shutdown is disabled' do + let(:auto_shutdown_enabled) { false } + + it 'does not register at_exit handler' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + end end end end From 84812878e041737fee4fa9845f6008f663c956da Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 21:45:02 -0500 Subject: [PATCH 09/14] fix path --- spec/rails_app/spec/railtie_spec.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index 4f63c114..d0a1efb0 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -43,7 +43,9 @@ end let(:async_api_file) do - Pathname.pwd.join('spec', 'support', 'asyncapi', 'polypress_amqp.yml') + Pathname.new(__dir__) + .join('..', '..', 'support', 'asyncapi', 'polypress_amqp.yml') + .expand_path end let(:async_api_channels) do From cea920392fcb9dcdff689772eb339d29dd926759 Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 22:00:02 -0500 Subject: [PATCH 10/14] revert spec config --- spec/config_helper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/config_helper.rb b/spec/config_helper.rb index 58678da5..d3d11654 100644 --- a/spec/config_helper.rb +++ b/spec/config_helper.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true EventSource.configure do |config| - config.protocols = %w[amqp http arn] + config.protocols = %w[amqp http] config.log_level = :warn end From 4484cb5f9d8073feb4ee167de213a84a2f152063 Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 22:02:52 -0500 Subject: [PATCH 11/14] revert spec --- spec/rails_app/spec/railtie_spec.rb | 106 ++-------------------------- 1 file changed, 5 insertions(+), 101 deletions(-) diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index d0a1efb0..5df64968 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -1,107 +1,11 @@ # frozen_string_literal: true -require_relative 'rails_helper' -require 'event_source/railtie' +require_relative './rails_helper' RSpec.describe EventSource::Railtie do - before do - @original_auto_shutdown = EventSource.config.auto_shutdown - EventSource.config.auto_shutdown = auto_shutdown_enabled - @at_exit_handler = nil - allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| - @at_exit_handler = blk - end - initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } - initializer.run(Rails.application) + it "runs when invoked" do manager = EventSource::ConnectionManager.instance - manager.drop_connections_for(:amqp) - manager.drop_connections_for(:http) - manager.drop_connections_for(:arn) + connection = manager.connections_for(:amqp).first + expect(connection).not_to be_nil end - - context '.auto_shutdown' do - let(:protocol) { :amqp } - let(:url) { 'amqp://localhost:5672/' } - let(:protocol_version) { '0.9.1' } - let(:description) { 'Development RabbitMQ Server' } - - let(:server_config) do - { - ref: url, - url: url, - protocol: protocol, - protocol_version: protocol_version, - description: description - } - end - - let(:connection_manager) { EventSource::ConnectionManager.instance } - - let(:connection) do - connection_manager.add_connection(server_config) - connection_manager.connections_for(:amqp).first - end - - let(:async_api_file) do - Pathname.new(__dir__) - .join('..', '..', 'support', 'asyncapi', 'polypress_amqp.yml') - .expand_path - end - - let(:async_api_channels) do - EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath - .new - .call(path: async_api_file) - .success - .channels - end - - let(:channel) do - connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities'] - end - - before do - connection.start unless connection.active? - connection.add_channels(async_api_channels) - - channel.subscribe_operations.each_value do |subscribe_operation| - subscriber_klass = double('SubscriberKlass') - subscribe_operation.subscribe(subscriber_klass) - end - - @original_timeouts = EventSource.config.shutdown_timeouts - @original_auto_shutdown = EventSource.config.auto_shutdown - - EventSource.config.shutdown_timeouts = { amqp_drain: 2 } - EventSource.config.auto_shutdown = true - - allow(EventSource).to receive(:inflight_messages_count).and_return(0) - end - - after do - EventSource.config.shutdown_timeouts = @original_timeouts - EventSource.config.auto_shutdown = @original_auto_shutdown - end - - context 'when auto_shutdown is enabled' do - let(:auto_shutdown_enabled) { true } - it 'cancels AMQP consumers and drops AMQP connections on shutdown' do - expect(connection_manager.connections_for(:amqp)).not_to be_empty - expect(connection.channels).not_to be_empty - expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty - expect(@at_exit_handler).not_to be_nil - @at_exit_handler.call - - expect(connection_manager.connections_for(:amqp)).to be_empty - end - end - - context 'when auto_shutdown is disabled' do - let(:auto_shutdown_enabled) { false } - - it 'does not register at_exit handler' do - expect(connection_manager.connections_for(:amqp)).not_to be_empty - end - end - end -end +end \ No newline at end of file From 6711ec76d4ac5a1cf6ac9a3f197c7eba4679ad8c Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 22:17:15 -0500 Subject: [PATCH 12/14] test spec --- spec/config_helper.rb | 2 +- spec/rails_app/spec/railtie_spec.rb | 91 ++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/spec/config_helper.rb b/spec/config_helper.rb index d3d11654..58678da5 100644 --- a/spec/config_helper.rb +++ b/spec/config_helper.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true EventSource.configure do |config| - config.protocols = %w[amqp http] + config.protocols = %w[amqp http arn] config.log_level = :warn end diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index 5df64968..d97b4a86 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true -require_relative './rails_helper' +require_relative 'rails_helper' +require_relative '../../config_helper' +require 'event_source/railtie' RSpec.describe EventSource::Railtie do it "runs when invoked" do @@ -8,4 +10,89 @@ connection = manager.connections_for(:amqp).first expect(connection).not_to be_nil end -end \ No newline at end of file + + context 'auto_shutdown' do + before(:all) do + manager = EventSource::ConnectionManager.instance + manager.drop_connections_for(:amqp) + manager.drop_connections_for(:http) + end + + let(:protocol) { :amqp } + let(:url) { 'amqp://localhost:5672/' } + let(:protocol_version) { '0.9.1' } + let(:description) { 'Development RabbitMQ Server' } + + let(:server_config) do + { + ref: url, + url: url, + protocol: protocol, + protocol_version: protocol_version, + description: description + } + end + + let(:connection_manager) { EventSource::ConnectionManager.instance } + + let(:connection) do + connection_manager.add_connection(server_config) + connection_manager.connections_for(:amqp).first + end + + let(:async_api_file) do + Pathname.new(__dir__) + .join('..', '..', 'support', 'asyncapi', 'polypress_amqp.yml') + .expand_path + end + + let(:async_api_channels) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: async_api_file) + .success + .channels + end + + let(:channel) do + connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities'] + end + + before do + connection.start unless connection.active? + connection.add_channels(async_api_channels) + + channel.subscribe_operations.each_value do |subscribe_operation| + subscriber_klass = double('SubscriberKlass') + subscribe_operation.subscribe(subscriber_klass) + end + + @original_timeouts = EventSource.config.shutdown_timeouts + @original_auto_shutdown = EventSource.config.auto_shutdown + + EventSource.config.shutdown_timeouts = { amqp_drain: 2 } + EventSource.config.auto_shutdown = true + + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| + blk.call + end + end + + after do + EventSource.config.shutdown_timeouts = @original_timeouts + EventSource.config.auto_shutdown = @original_auto_shutdown + end + + it 'cancels AMQP consumers and drops AMQP connections on shutdown' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + expect(connection.channels).not_to be_empty + expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty + + initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } + initializer.run(Rails.application) + + expect(connection_manager.connections_for(:amqp)).to be_empty + end + end +end From d9d7466b7fe112ad6b3dd673887445f2b19ef531 Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 22:42:08 -0500 Subject: [PATCH 13/14] test one more change --- spec/rails_app/spec/railtie_spec.rb | 51 ++++++++++++++++------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index d97b4a86..f99af70e 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -1,23 +1,23 @@ # frozen_string_literal: true require_relative 'rails_helper' -require_relative '../../config_helper' -require 'event_source/railtie' RSpec.describe EventSource::Railtie do - it "runs when invoked" do + before do + @original_auto_shutdown = EventSource.config.auto_shutdown + EventSource.config.auto_shutdown = auto_shutdown_enabled + @at_exit_handler = nil + allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| + @at_exit_handler = blk + end + initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } + initializer.run(Rails.application) manager = EventSource::ConnectionManager.instance - connection = manager.connections_for(:amqp).first - expect(connection).not_to be_nil + manager.drop_connections_for(:amqp) + manager.drop_connections_for(:http) end - context 'auto_shutdown' do - before(:all) do - manager = EventSource::ConnectionManager.instance - manager.drop_connections_for(:amqp) - manager.drop_connections_for(:http) - end - + context '.auto_shutdown' do let(:protocol) { :amqp } let(:url) { 'amqp://localhost:5672/' } let(:protocol_version) { '0.9.1' } @@ -74,9 +74,6 @@ EventSource.config.auto_shutdown = true allow(EventSource).to receive(:inflight_messages_count).and_return(0) - allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| - blk.call - end end after do @@ -84,15 +81,25 @@ EventSource.config.auto_shutdown = @original_auto_shutdown end - it 'cancels AMQP consumers and drops AMQP connections on shutdown' do - expect(connection_manager.connections_for(:amqp)).not_to be_empty - expect(connection.channels).not_to be_empty - expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty + context 'when auto_shutdown is enabled' do + let(:auto_shutdown_enabled) { true } + it 'cancels AMQP consumers and drops AMQP connections on shutdown' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + expect(connection.channels).not_to be_empty + expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty + expect(@at_exit_handler).not_to be_nil + @at_exit_handler.call - initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } - initializer.run(Rails.application) + expect(connection_manager.connections_for(:amqp)).to be_empty + end + end - expect(connection_manager.connections_for(:amqp)).to be_empty + context 'when auto_shutdown is disabled' do + let(:auto_shutdown_enabled) { false } + + it 'does not register at_exit handler' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + end end end end From b5b49f9a609f350903e625d7b0d8e0193d49ccb9 Mon Sep 17 00:00:00 2001 From: vkghub Date: Tue, 3 Feb 2026 22:49:34 -0500 Subject: [PATCH 14/14] remove unused code --- spec/config_helper.rb | 2 +- spec/rails_app/spec/railtie_spec.rb | 16 ++++------------ 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/spec/config_helper.rb b/spec/config_helper.rb index 58678da5..d3d11654 100644 --- a/spec/config_helper.rb +++ b/spec/config_helper.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true EventSource.configure do |config| - config.protocols = %w[amqp http arn] + config.protocols = %w[amqp http] config.log_level = :warn end diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index f99af70e..ce6c8a1f 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -12,9 +12,6 @@ end initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } initializer.run(Rails.application) - manager = EventSource::ConnectionManager.instance - manager.drop_connections_for(:amqp) - manager.drop_connections_for(:http) end context '.auto_shutdown' do @@ -22,7 +19,6 @@ let(:url) { 'amqp://localhost:5672/' } let(:protocol_version) { '0.9.1' } let(:description) { 'Development RabbitMQ Server' } - let(:server_config) do { ref: url, @@ -34,16 +30,13 @@ end let(:connection_manager) { EventSource::ConnectionManager.instance } - let(:connection) do connection_manager.add_connection(server_config) connection_manager.connections_for(:amqp).first end let(:async_api_file) do - Pathname.new(__dir__) - .join('..', '..', 'support', 'asyncapi', 'polypress_amqp.yml') - .expand_path + Pathname.new(__dir__).join('..', '..', 'support', 'asyncapi', 'polypress_amqp.yml').expand_path end let(:async_api_channels) do @@ -59,6 +52,8 @@ end before do + connection_manager.drop_connections_for(:amqp) + connection_manager.drop_connections_for(:http) connection.start unless connection.active? connection.add_channels(async_api_channels) @@ -68,11 +63,7 @@ end @original_timeouts = EventSource.config.shutdown_timeouts - @original_auto_shutdown = EventSource.config.auto_shutdown - EventSource.config.shutdown_timeouts = { amqp_drain: 2 } - EventSource.config.auto_shutdown = true - allow(EventSource).to receive(:inflight_messages_count).and_return(0) end @@ -83,6 +74,7 @@ context 'when auto_shutdown is enabled' do let(:auto_shutdown_enabled) { true } + it 'cancels AMQP consumers and drops AMQP connections on shutdown' do expect(connection_manager.connections_for(:amqp)).not_to be_empty expect(connection.channels).not_to be_empty