diff --git a/lib/event_source.rb b/lib/event_source.rb
index 5b3d8429..7597298e 100644
--- a/lib/event_source.rb
+++ b/lib/event_source.rb
@@ -112,6 +112,32 @@ def register_subscriber(subscriber_klass)
     def register_publisher(subscriber_klass)
       boot_registry.register_publisher(subscriber_klass)
     end
+
+    # Guards the in-flight message counter. Built eagerly when this file loads:
+    # creating it lazily with `||=` inside each method is itself a race, since
+    # two threads can each build their own Mutex and so never exclude each other.
+    INFLIGHT_MUTEX = Mutex.new
+
+    # @return [Integer] number of messages whose handlers are currently running
+    def inflight_messages_count
+      INFLIGHT_MUTEX.synchronize { @inflight_messages_count ||= 0 }
+    end
+
+    # Record that a handler has started processing a message.
+    # @return [Integer] the updated in-flight count
+    def increment_inflight_messages
+      INFLIGHT_MUTEX.synchronize do
+        @inflight_messages_count = (@inflight_messages_count || 0) + 1
+      end
+    end
+
+    # Record that a handler has finished; the count never goes below zero.
+    # @return [Integer] the updated in-flight count
+    def decrement_inflight_messages
+      INFLIGHT_MUTEX.synchronize do
+        @inflight_messages_count = [(@inflight_messages_count || 0) - 1, 0].max
+      end
+    end
   end
 
   class EventSourceLogger
diff --git a/lib/event_source/channel.rb b/lib/event_source/channel.rb
index 87ca29f0..94af0f88 100644
--- a/lib/event_source/channel.rb
+++ b/lib/event_source/channel.rb
@@ -110,5 +110,14 @@ def add_subscribe_operation(async_api_channel_item)
       )
       logger.info " Subscribe Operation Added: #{operation_id}"
     end
+
+    # Cancel the consumers of every subscribe operation on this channel.
+    # Subjects that do not implement `cancel_consumers!` are skipped.
+    def cancel_consumers
+      subscribe_operations.each_value do |sub_op|
+        subject = sub_op.subject
+        subject.cancel_consumers! if subject.respond_to?(:cancel_consumers!)
+      end
+    end
   end
 end
diff --git a/lib/event_source/configure/config.rb b/lib/event_source/configure/config.rb
index fbdea5cc..f5cd74f6 100644
--- a/lib/event_source/configure/config.rb
+++ b/lib/event_source/configure/config.rb
@@ -10,11 +10,12 @@ class Config
 
       def initialize
         @log_level = :warn
+        @shutdown_timeouts = { amqp_drain: 5, http_drain: 5 }
       end
 
       # TODO: add default for pub_sub_root
       attr_writer :pub_sub_root, :protocols, :server_configurations
-      attr_accessor :app_name, :log_level
+      attr_accessor :app_name, :log_level, :auto_shutdown, :shutdown_timeouts
 
       def load_protocols
         @protocols.each do |protocol|
diff --git a/lib/event_source/connection_manager.rb b/lib/event_source/connection_manager.rb
index 4af587f0..7d11b3b7 100644
--- a/lib/event_source/connection_manager.rb
+++ b/lib/event_source/connection_manager.rb
@@ -160,8 +160,67 @@ def drop_connection(connection_uri)
       connections
     end
 
+    # Cancel every consumer registered on the given protocol's connections,
+    # then wait (up to +timeout+ seconds) for in-flight handlers to finish.
+    # @param [Symbol] protocol the protocol name, e.g. `:amqp`
+    # @param [Numeric] timeout maximum number of seconds to wait for the drain
+    def cancel_consumers_for(protocol, timeout: 5)
+      protocol_value = protocol.to_s.upcase
+      logger.info "Cancelling #{protocol_value} consumers prior to shutdown"
+      connections = connections_for(protocol)
+      any_consumers = connections.any? { |connection| connection_has_consumers?(connection) }
+      logger.info "#{protocol_value} inflight handlers (before_cancel): #{EventSource.inflight_messages_count}, any_consumers=#{any_consumers}"
+
+      connections.each do |connection|
+        connection.channels.each_value do |channel|
+          channel.cancel_consumers
+        end
+      end
+
+      wait_for_connections_to_drain(connections, timeout)
+
+      logger.info "#{protocol_value} consumer cancellation complete"
+      logger.info "#{protocol_value} inflight handlers (end): #{EventSource.inflight_messages_count}"
+    end
+
     private
 
+    # Poll until no handler is in flight and no consumer remains on the given
+    # connections, or until +timeout+ seconds have elapsed.
+    def wait_for_connections_to_drain(connections, timeout)
+      return if connections.empty?
+
+      protocol = connections.first.protocol.to_s.upcase
+      # Monotonic clock: a wall-clock (Time.now) adjustment during shutdown
+      # must not shorten or stretch the drain window.
+      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
+
+      loop do
+        inflight = EventSource.inflight_messages_count
+        any_consumers = connections.any? { |connection| connection_has_consumers?(connection) }
+
+        # Emitted on every 100ms poll, so kept below info to avoid flooding the log.
+        logger.debug "#{protocol} inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}"
+        break if inflight.zero? && !any_consumers
+        break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
+
+        sleep 0.1
+      end
+    end
+
+    # @return [Boolean] whether any channel on the connection still has consumers
+    def connection_has_consumers?(connection)
+      connection.channels.values.any? { |channel| channel_has_consumers?(channel) }
+    end
+
+    # A channel whose consumer state cannot be read (the lookup raises) is
+    # treated as having no consumers, so it cannot stall the drain loop.
+    def channel_has_consumers?(channel)
+      channel.channel_proxy.subject.any_consumers?
+    rescue StandardError
+      false
+    end
+
     # Find connection proxy class for given protocol
     # @param [Symbol] protocol the protocol name, `:http` or `:amqp`
     # @return [Class] Protocol Specific Connection Proxy Class
diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb
index 7416dc8e..8eff6fe7 100644
--- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb
+++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb
@@ -91,7 +91,7 @@ def spawn_thread(options)
         end
 
         def add_consumer(subscriber_klass, options)
-          @subject.subscribe(options) do |delivery_info, metadata, payload|
+          consumer = @subject.subscribe(options) do |delivery_info, metadata, payload|
             on_receive_message(
               subscriber_klass,
               delivery_info,
@@ -99,6 +99,7 @@ def add_consumer(subscriber_klass, options)
               payload
             )
           end
+          @consumers << consumer if consumer
         end
 
         def convert_to_subscribe_options(options)
@@ -124,6 +125,7 @@ def on_receive_message(
           metadata,
           payload
         )
+          EventSource.increment_inflight_messages
           logger.debug '**************************'
           logger.debug subscriber_klass.inspect
           logger.debug delivery_info.inspect
@@ -158,6 +160,7 @@ def on_receive_message(
           logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}"
         ensure
           subscriber = nil
+          EventSource.decrement_inflight_messages
         end
 
         # Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings.
@@ -198,6 +201,18 @@ def method_missing(name, *args)
           @subject.send(name, *args)
         end
 
+        # Cancel all registered consumers for this queue
+        def cancel_consumers!
+          @consumers.each do |consumer|
+            begin
+              consumer.cancel
+            rescue StandardError => e
+              logger.info "Consumer cancellation error: #{e.message}"
+            end
+          end
+          @consumers.clear
+        end
+
         private
 
         def subscriber_klass_name_to_suffix(subscriber_klass)
diff --git a/lib/event_source/railtie.rb b/lib/event_source/railtie.rb
index 77dcc66b..3b0e7254 100644
--- a/lib/event_source/railtie.rb
+++ b/lib/event_source/railtie.rb
@@ -1,9 +1,55 @@
 # frozen_string_literal: true
+require 'logger'
+
 module EventSource
   # :nodoc:
   module Railtie
     Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do
+      logger = Logger.new($stdout)
+      logger.progname = 'EventSource graceful shutdown'
+      timeouts = EventSource.config.shutdown_timeouts || {}
+      amqp_timeout = timeouts[:amqp_drain] || 5
+      shutdown_started = false
+
+      # Perform shutdown work outside of trap/at_exit context to avoid
+      # ThreadError from mutex operations within Bunny (AMQP client).
+      shutdown = lambda do |reason|
+        # A trapped signal is usually followed by at_exit; drain only once.
+        next if shutdown_started
+
+        shutdown_started = true
+        Thread.new do
+          begin
+            logger.info "#{reason}, starting graceful shutdown"
+            logger.info "AMQP inflight handlers at shutdown start: #{EventSource.inflight_messages_count}"
+            cm = EventSource::ConnectionManager.instance
+
+            # Stop consuming and allow in-flight handlers to drain briefly
+            cm.cancel_consumers_for(:amqp, timeout: amqp_timeout)
+            cm.drop_connections_for(:amqp)
+          rescue => e
+            logger.error "graceful shutdown error: #{e.class}: #{e.message}"
+          end
+        end.join
+      end
+
+      if EventSource.config.auto_shutdown
+        at_exit { shutdown.call('at_exit received') }
+
+        %w[TERM INT].each do |sig|
+          previous_handler = Signal.trap(sig) do
+            shutdown.call("signal=#{sig} received")
+
+            # Trapping a signal replaces its default action, so without this the
+            # process would keep running after the drain. Hand the signal back to
+            # whatever handled it before (a handler installed earlier, or Ruby's
+            # default) and deliver it again.
+            Signal.trap(sig, previous_handler || 'DEFAULT')
+            Process.kill(sig, Process.pid)
+          end
+        end
+      end
       EventSource.initialize!
     end
   end
 end
diff --git a/spec/event_source/channel_spec.rb b/spec/event_source/channel_spec.rb
index 3f1020cc..ada8f37e 100644
--- a/spec/event_source/channel_spec.rb
+++ b/spec/event_source/channel_spec.rb
@@ -92,5 +92,23 @@
       expect(audit_log_queue.pop.last).to eq 'test message from enterprise events!!'
       expect(audit_log_queue.pop.last).to eq 'test message from enrollment events!!'
     end
+
+    context '.cancel_consumers' do
+      before do
+        subject.subscribe_operations.values.each do |sub_op|
+          subscriber_klass = double('SubscriberKlass')
+          sub_op.subscribe(subscriber_klass)
+        end
+      end
+
+      it 'cancels consumers via channel' do
+        channel = subject
+        expect(channel.channel_proxy.any_consumers?).to be_truthy
+        expect(channel.subscribe_operations.values.first.subject.consumers.count).to be > 0
+        channel.cancel_consumers
+        expect(channel.channel_proxy.any_consumers?).to be_falsey
+        expect(channel.subscribe_operations.values.first.subject.consumers.count).to eq 0
+      end
+    end
   end
 end
diff --git a/spec/event_source/connection_manager_spec.rb b/spec/event_source/connection_manager_spec.rb
index 10100d71..3d6f14bb 100644
--- a/spec/event_source/connection_manager_spec.rb
+++ b/spec/event_source/connection_manager_spec.rb
@@ -92,6 +92,70 @@
             ).to eq Hash.new
           end
         end
+
+        context '.cancel_consumers_for' do
+          let(:connection) { connection_manager.connections[connection_url] }
+
+          let(:async_api_file) do
+            Pathname.pwd.join('spec', 'support', 'asyncapi', 'polypress_amqp.yml')
+          end
+
+          let(:async_api_channels) do
+            EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath
+              .new
+              .call(path: async_api_file)
+              .success
+              .channels
+          end
+
+          let(:channel) do
+            connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities']
+          end
+
+          before do
+            connection.start unless connection.active?
+            connection.add_channels(async_api_channels)
+            channel.subscribe_operations.values.each do |sub_op|
+              subscriber_klass = double('SubscriberKlass')
+              sub_op.subscribe(subscriber_klass)
+            end
+          end
+
+          after { connection.disconnect if connection.active? }
+
+          context 'when inflight messages are present' do
+            before do
+              allow(EventSource).to receive(:inflight_messages_count).and_return(5)
+            end
+
+            it 'waits for timeout' do
+              expect(channel).to receive(:cancel_consumers).and_call_original
+              connection_manager.cancel_consumers_for(protocol, timeout: 1)
+            end
+          end
+
+          context 'when inflight messages are draining' do
+            before do
+              allow(EventSource).to receive(:inflight_messages_count).and_return(1, 0)
+            end
+
+            it 'waits for drain' do
+              expect(channel).to receive(:cancel_consumers).and_call_original
+              connection_manager.cancel_consumers_for(protocol, timeout: 5)
+            end
+          end
+
+          context 'when no inflight messages are present' do
+            before do
+              allow(EventSource).to receive(:inflight_messages_count).and_return(0)
+            end
+
+            it 'cancels AMQP consumers on each channel without waiting' do
+              expect(channel).to receive(:cancel_consumers).and_call_original
+              connection_manager.cancel_consumers_for(protocol, timeout: 1)
+            end
+          end
+        end
       end
     end
   end
@@ -163,5 +227,30 @@
         end
       end
     end
+
+    context 'when no connections are present - .cancel_consumers_for' do
+      let(:protocol) { :amqp }
+      let(:connection) { instance_double('EventSource::Connection', protocol: protocol, channels: { default: channel }) }
+      let(:channel) { instance_double('EventSource::Channel') }
+
+      before do
+        allow(EventSource).to receive(:inflight_messages_count).and_return(0)
+      end
+
+      context 'does not raise error' do
+        before do
+          allow(connection_manager).to receive(:connections_for).with(protocol).and_return([])
+          allow(connection_manager).to receive(:wait_for_connections_to_drain)
+        end
+
+        it 'and still calls drain helper' do
+          expect do
+            connection_manager.cancel_consumers_for(protocol, timeout: 1)
+          end.not_to raise_error
+
+          expect(connection_manager).to have_received(:wait_for_connections_to_drain).with([], 1)
+        end
+      end
+    end
   end
 end
diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb
index 7c93c25f..ce6c8a1f 100644
--- a/spec/rails_app/spec/railtie_spec.rb
+++ b/spec/rails_app/spec/railtie_spec.rb
@@ -1,11 +1,98 @@
 # frozen_string_literal: true
 
-require_relative './rails_helper'
+require_relative 'rails_helper'
 
 RSpec.describe EventSource::Railtie do
-  it "runs when invoked" do
-    manager = EventSource::ConnectionManager.instance
-    connection = manager.connections_for(:amqp).first
-    expect(connection).not_to be_nil
+  before do
+    @original_auto_shutdown = EventSource.config.auto_shutdown
+    EventSource.config.auto_shutdown = auto_shutdown_enabled
+    @at_exit_handler = nil
+    allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk|
+      @at_exit_handler = blk
+    end
+    initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' }
+    initializer.run(Rails.application)
+  end
+
+  context '.auto_shutdown' do
+    let(:protocol) { :amqp }
+    let(:url) { 'amqp://localhost:5672/' }
+    let(:protocol_version) { '0.9.1' }
+    let(:description) { 'Development RabbitMQ Server' }
+    let(:server_config) do
+      {
+        ref: url,
+        url: url,
+        protocol: protocol,
+        protocol_version: protocol_version,
+        description: description
+      }
+    end
+
+    let(:connection_manager) { EventSource::ConnectionManager.instance }
+    let(:connection) do
+      connection_manager.add_connection(server_config)
+      connection_manager.connections_for(:amqp).first
+    end
+
+    let(:async_api_file) do
+      Pathname.new(__dir__).join('..', '..', 'support', 'asyncapi', 'polypress_amqp.yml').expand_path
+    end
+
+    let(:async_api_channels) do
+      EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath
+        .new
+        .call(path: async_api_file)
+        .success
+        .channels
+    end
+
+    let(:channel) do
+      connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities']
+    end
+
+    before do
+      connection_manager.drop_connections_for(:amqp)
+      connection_manager.drop_connections_for(:http)
+      connection.start unless connection.active?
+      connection.add_channels(async_api_channels)
+
+      channel.subscribe_operations.each_value do |subscribe_operation|
+        subscriber_klass = double('SubscriberKlass')
+        subscribe_operation.subscribe(subscriber_klass)
+      end
+
+      @original_timeouts = EventSource.config.shutdown_timeouts
+      EventSource.config.shutdown_timeouts = { amqp_drain: 2 }
+      allow(EventSource).to receive(:inflight_messages_count).and_return(0)
+    end
+
+    after do
+      EventSource.config.shutdown_timeouts = @original_timeouts
+      EventSource.config.auto_shutdown = @original_auto_shutdown
+    end
+
+    context 'when auto_shutdown is enabled' do
+      let(:auto_shutdown_enabled) { true }
+
+      it 'cancels AMQP consumers and drops AMQP connections on shutdown' do
+        expect(connection_manager.connections_for(:amqp)).not_to be_empty
+        expect(connection.channels).not_to be_empty
+        expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty
+        expect(@at_exit_handler).not_to be_nil
+        @at_exit_handler.call
+
+        expect(connection_manager.connections_for(:amqp)).to be_empty
+      end
+    end
+
+    context 'when auto_shutdown is disabled' do
+      let(:auto_shutdown_enabled) { false }
+
+      it 'does not register at_exit handler' do
+        expect(@at_exit_handler).to be_nil
+        expect(connection_manager.connections_for(:amqp)).not_to be_empty
+      end
+    end
   end
 end