Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,27 @@ def register_subscriber(subscriber_klass)
def register_publisher(subscriber_klass)
boot_registry.register_publisher(subscriber_klass)
end

def inflight_messages_count
@inflight_mutex ||= Mutex.new
@inflight_mutex.synchronize { @inflight_messages_count ||= 0 }
end

def increment_inflight_messages
@inflight_mutex ||= Mutex.new
@inflight_mutex.synchronize do
@inflight_messages_count ||= 0
@inflight_messages_count += 1
end
end

def decrement_inflight_messages
@inflight_mutex ||= Mutex.new
@inflight_mutex.synchronize do
@inflight_messages_count ||= 0
@inflight_messages_count = [@inflight_messages_count - 1, 0].max
end
end
end

class EventSourceLogger
Expand Down
7 changes: 7 additions & 0 deletions lib/event_source/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,12 @@ def add_subscribe_operation(async_api_channel_item)
)
logger.info " Subscribe Operation Added: #{operation_id}"
end

def cancel_consumers
subscribe_operations.each_value do |sub_op|
subject = sub_op.subject
subject.cancel_consumers! if subject.respond_to?(:cancel_consumers!)
end
end
end
end
3 changes: 2 additions & 1 deletion lib/event_source/configure/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ class Config

def initialize
@log_level = :warn
@shutdown_timeouts = { amqp_drain: 5, http_drain: 5 }
end

# TODO: add default for pub_sub_root
attr_writer :pub_sub_root, :protocols, :server_configurations
attr_accessor :app_name, :log_level
attr_accessor :app_name, :log_level, :auto_shutdown, :shutdown_timeouts

def load_protocols
@protocols.each do |protocol|
Expand Down
46 changes: 46 additions & 0 deletions lib/event_source/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,54 @@ def drop_connection(connection_uri)
connections
end

def cancel_consumers_for(protocol, timeout: 5)
protocol_value = protocol.to_s.upcase
logger.info "Cancelling #{protocol_value} consumers prior to shutdown"
connections = connections_for(protocol)
any_consumers = connections.any? { |connection| connection_has_consumers?(connection) }
logger.info "#{protocol_value} inflight handlers (before_cancel): #{EventSource.inflight_messages_count}, any_consumers=#{any_consumers}"

connections.each do |connection|
connection.channels.each_value do |channel|
channel.cancel_consumers
end
end

wait_for_connections_to_drain(connections, timeout)

logger.info "#{protocol_value} consumer cancellation complete"
logger.info "#{protocol_value} inflight handlers (end): #{EventSource.inflight_messages_count}"
end

private

def wait_for_connections_to_drain(connections, timeout)
return if connections.empty?

protocol = connections.first.protocol.to_s.upcase
start = Time.now

loop do
inflight = EventSource.inflight_messages_count
any_consumers = connections.any? { |connection| connection_has_consumers?(connection) }

logger.info "#{protocol} inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}"
break if inflight.zero? && !any_consumers
break if (Time.now - start) >= timeout
sleep 0.1
end
end

def connection_has_consumers?(connection)
connection.channels.values.any? do |channel|
begin
channel.channel_proxy.subject.any_consumers?
rescue StandardError
false
end
end
end

# Find connection proxy class for given protocol
# @param [Symbol] protocol the protocol name, `:http` or `:amqp`
# @return [Class] Protocol Specific Connection Proxy Class
Expand Down
17 changes: 16 additions & 1 deletion lib/event_source/protocols/amqp/bunny_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,15 @@ def spawn_thread(options)
end

def add_consumer(subscriber_klass, options)
@subject.subscribe(options) do |delivery_info, metadata, payload|
consumer = @subject.subscribe(options) do |delivery_info, metadata, payload|
on_receive_message(
subscriber_klass,
delivery_info,
metadata,
payload
)
end
@consumers << consumer if consumer
end

def convert_to_subscribe_options(options)
Expand All @@ -124,6 +125,7 @@ def on_receive_message(
metadata,
payload
)
EventSource.increment_inflight_messages
logger.debug '**************************'
logger.debug subscriber_klass.inspect
logger.debug delivery_info.inspect
Expand Down Expand Up @@ -158,6 +160,7 @@ def on_receive_message(
logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}"
ensure
subscriber = nil
EventSource.decrement_inflight_messages
end

# Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings.
Expand Down Expand Up @@ -198,6 +201,18 @@ def method_missing(name, *args)
@subject.send(name, *args)
end

# Cancel all registered consumers for this queue
def cancel_consumers!
@consumers.each do |consumer|
begin
consumer.cancel
rescue StandardError => e
logger.info "Consumer cancellation error: #{e.message}"
end
end
@consumers.clear
end

private

def subscriber_klass_name_to_suffix(subscriber_klass)
Expand Down
32 changes: 32 additions & 0 deletions lib/event_source/railtie.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,41 @@
# frozen_string_literal: true

require 'logger'

module EventSource
# :nodoc:
module Railtie
Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do
logger = Logger.new($stdout)
logger.progname = 'EventSource graceful shutdown'
timeouts = EventSource.config.shutdown_timeouts || {}
amqp_timeout = timeouts[:amqp_drain] || 5

# Perform shutdown work outside of trap/at_exit context to avoid
# ThreadError from mutex operations within Bunny (AMQP client).
shutdown = lambda do |reason|
Thread.new do
begin
logger.info "#{reason}, starting graceful shutdown"
logger.info "AMQP inflight handlers at shutdown start: #{EventSource.inflight_messages_count}"
cm = EventSource::ConnectionManager.instance

# Stop consuming and allow in-flight handlers to drain briefly
cm.cancel_consumers_for(:amqp, timeout: amqp_timeout)
cm.drop_connections_for(:amqp)
rescue => e
logger.error "graceful shutdown error: #{e.class}: #{e.message}"
end
end.join
end

if EventSource.config.auto_shutdown
at_exit { shutdown.call('at_exit received') }

%w[TERM INT].each do |sig|
Signal.trap(sig) { shutdown.call("signal=#{sig} received") }
end
end
EventSource.initialize!
end
end
Expand Down
18 changes: 18 additions & 0 deletions spec/event_source/channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,23 @@
expect(audit_log_queue.pop.last).to eq 'test message from enterprise events!!'
expect(audit_log_queue.pop.last).to eq 'test message from enrollment events!!'
end

context '.cancel_consumers' do
before do
subject.subscribe_operations.values.each do |sub_op|
subscriber_klass = double('SubscriberKlass')
sub_op.subscribe(subscriber_klass)
end
end

it 'cancels consumers via channel' do
channel = subject
expect(channel.channel_proxy.any_consumers?).to be_truthy
expect(channel.subscribe_operations.values.first.subject.consumers.count).to be > 0
channel.cancel_consumers
expect(channel.channel_proxy.any_consumers?).to be_falsey
expect(channel.subscribe_operations.values.first.subject.consumers.count).to eq 0
end
end
end
end
90 changes: 90 additions & 0 deletions spec/event_source/connection_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,70 @@
).to eq Hash.new
end
end

context '.cancel_consumers_for' do
let(:connection) { connection_manager.connections[connection_url] }

let(:async_api_file) do
Pathname.pwd.join('spec', 'support', 'asyncapi', 'polypress_amqp.yml')
end

let(:async_api_channels) do
EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath
.new
.call(path: async_api_file)
.success
.channels
end

let(:channel) do
connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities']
end

before do
connection.start unless connection.active?
connection.add_channels(async_api_channels)
channel.subscribe_operations.values.each do |sub_op|
subscriber_klass = double('SubscriberKlass')
sub_op.subscribe(subscriber_klass)
end
end

after { connection.disconnect if connection.active? }

context 'when inflight messages are present' do
before do
allow(EventSource).to receive(:inflight_messages_count).and_return(5)
end

it 'waits for timeout' do
expect(channel).to receive(:cancel_consumers).and_call_original
connection_manager.cancel_consumers_for(protocol, timeout: 1)
end
end

context 'when inflight messages are draining' do
before do
allow(EventSource).to receive(:inflight_messages_count).and_return(1, 0)
end

it 'waits for drain' do
expect(channel).to receive(:cancel_consumers).and_call_original
connection_manager.cancel_consumers_for(protocol, timeout: 5)
end
end

context 'when no inflight messages are present' do
before do
allow(EventSource).to receive(:inflight_messages_count).and_return(0)
end

it 'cancels AMQP consumers on each channel without waiting' do
expect(channel).to receive(:cancel_consumers).and_call_original
connection_manager.cancel_consumers_for(protocol, timeout: 1)
end
end
end
end
end
end
Expand Down Expand Up @@ -163,5 +227,31 @@
end
end
end

context 'when no connections are present
- .cancel_consumers_for' do
let(:protocol) { :amqp }
let(:connection) { instance_double('EventSource::Connection', protocol: protocol, channels: { default: channel }) }
let(:channel) { instance_double('EventSource::Channel') }

before do
allow(EventSource).to receive(:inflight_messages_count).and_return(0)
end

context 'does not raise error' do
before do
allow(connection_manager).to receive(:connections_for).with(protocol).and_return([])
allow(connection_manager).to receive(:wait_for_connections_to_drain)
end

it 'and still calls drain helper' do
expect do
connection_manager.cancel_consumers_for(protocol, timeout: 1)
end.not_to raise_error

expect(connection_manager).to have_received(:wait_for_connections_to_drain).with([], 1)
end
end
end
end
end
Loading