diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index a99b6957..50c92e79 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -1,159 +1,130 @@ # frozen_string_literal: true module Datadog + class FlushQueue < Queue + end + class CloseQueue < Queue + end class Statsd - # Sender is using a companion thread to flush and pack messages + # Sender is using a background thread to flush and pack messages # in a `MessageBuffer`. # The communication with this thread is done using a `Queue`. # If the thread is dead, it is starting a new one to avoid having a blocked - # Sender with no companion thread to communicate with (most of the time, having - # a dead companion thread means that a fork just happened and that we are - # running in the child process). + # Sender with no background thread to communicate with (most of the time, + # having a dead background thread means that a fork just happened and that we + # are running in the child process). class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) def initialize(message_buffer, logger: nil) @message_buffer = message_buffer @logger = logger - @mx = Mutex.new - end - def flush(sync: false) - # keep a copy around in case another thread is calling #stop while this method is running - current_message_queue = message_queue - - # don't try to flush if there is no message_queue instantiated or - # no companion thread running - if !current_message_queue - @logger.debug { "Statsd: can't flush: no message queue ready" } if @logger - return - end - if !sender_thread.alive? - @logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger - return - end + # communication and synchronization with the background thread + # @mux is also used to not having multiple threads fighting for + # closing the Sender or creating a new background thread + @channel = Queue.new + @mux = Mutex.new + + @is_closed = false - current_message_queue.push(:flush) - rendez_vous if sync + # start background thread immediately + @sender_thread = Thread.new(&method(:send_loop)) end - def rendez_vous - # could happen if #start hasn't be called - return unless message_queue - - # Initialize and get the thread's sync queue - queue = (Thread.current[:statsd_sync_queue] ||= Queue.new) - # tell sender-thread to notify us in the current - # thread's queue - message_queue.push(queue) - # wait for the sender thread to send a message - # once the flush is done - queue.pop + def flush(sync: false) + @mux.synchronize { + # we don't want to send a flush action to the bg thread if: + # - there is no bg thread running + # - the sender has been closed + return if !sender_thread.alive? || @is_closed + + if sync + # blocking flush + blocking_queue = FlushQueue.new + channel << blocking_queue + blocking_queue.pop # wait for the bg thread to finish its work + blocking_queue.close if CLOSEABLE_QUEUES + else + # asynchronous flush + channel << :flush + end + } end def add(message) - raise ArgumentError, 'Start sender first' unless message_queue + return if @is_closed # don't send a message to the bg thread if the sender has been closed - # if the thread does not exist, we assume we are running in a forked process, - # empty the message queue and message buffers (these messages belong to - # the parent process) and spawn a new companion thread. + # the bg thread is not running anymore, this is happening if the main process has forked and + # we are running in the child, we will spawn a bg thread and reset buffers (containing parents' messages) if !sender_thread.alive? - @mx.synchronize { - # a call from another thread has already re-created - # the companion thread before this one acquired the lock + @mux.synchronize { + return if @is_closed + # test if a call from another thread has already re-created + # the background thread before this one acquired the lock break if sender_thread.alive? - @logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger - message_queue.close if CLOSEABLE_QUEUES - @message_queue = nil - message_buffer.reset - start + # re-create the channel of communication since we will spawn a new bg thread + channel.close if CLOSEABLE_QUEUES + @channel = Queue.new + message_buffer.reset # don't use messages appended by another fork + @sender_thread = Thread.new(&method(:send_loop)) } end - message_queue << message + channel << message end - def start - raise ArgumentError, 'Sender already started' if message_queue - - # initialize a new message queue for the background thread - @message_queue = Queue.new - # start background thread - @sender_thread = Thread.new(&method(:send_loop)) + # Compatibility with `Sender` + def start() end - if CLOSEABLE_QUEUES - # when calling stop, make sure that no other threads is trying - # to close the sender nor trying to continue to `#add` more message - # into the sender. - def stop(join_worker: true) - message_queue = @message_queue - message_queue.close if message_queue - - sender_thread = @sender_thread - sender_thread.join if sender_thread && join_worker - end - else - # when calling stop, make sure that no other threads is trying - # to close the sender nor trying to continue to `#add` more message - # into the sender. - def stop(join_worker: true) - message_queue = @message_queue - message_queue << :close if message_queue - - sender_thread = @sender_thread - sender_thread.join if sender_thread && join_worker - end + def stop() + return if @is_closed + # use this lock to both: not having another thread stopping this instance nor + # having a #add call creating a new thread + @mux.synchronize { + @is_closed = true + if sender_thread.alive? # no reasons to stop the bg thread is none is running already + blocking_queue = CloseQueue.new + channel << blocking_queue + blocking_queue.pop # wait for the bg thread to finish its work + blocking_queue.close if CLOSEABLE_QUEUES + sender_thread.join(3) # wait for completion, timeout after 3 seconds + # TODO(remy): should I close `channel` here? + end + } end private attr_reader :message_buffer - attr_reader :message_queue + attr_reader :channel + attr_reader :mux attr_reader :sender_thread - if CLOSEABLE_QUEUES - def send_loop - until (message = message_queue.pop).nil? && message_queue.closed? - # skip if message is nil, e.g. when message_queue - # is empty and closed - next unless message - - case message - when :flush - message_buffer.flush - when Queue - message.push(:go_on) - else - message_buffer.add(message) - end + def send_loop + until (message = channel.pop).nil? && (CLOSEABLE_QUEUES && channel.closed?) + # skip if message is nil, e.g. when the channel is empty and closed + next unless message + + case message + # if a FlushQueue is received, the background thread has to flush the message + # buffer and to send an :unblock to let the caller know that it has finished + when FlushQueue + message_buffer.flush + message << :unblock + # if a :flush is received, the background thread has to flush asynchronously + when :flush + message_buffer.flush + # if a CloseQueue is received, the background thread has to do a last flush + # and to send an :unblock to let the caller know that it has finished + when CloseQueue + message << :unblock + return + else + message_buffer.add(message) end - - @message_queue = nil - @sender_thread = nil - end - else - def send_loop - loop do - message = message_queue.pop - - next unless message - - case message - when :close - break - when :flush - message_buffer.flush - when Queue - message.push(:go_on) - else - message_buffer.add(message) - end - end - - @message_queue = nil - @sender_thread = nil end end end