Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 88 additions & 117 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
@@ -1,159 +1,130 @@
# frozen_string_literal: true

module Datadog
class FlushQueue < Queue
end
class CloseQueue < Queue
end
Comment on lines +4 to +7
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these aren't used elsewhere, I suggest putting them inside the Sender class itself.

Also (very minor) if you want a single-line definition, you can use:

FlushQueue = Class.new(Queue) # OR
class FlushQueue < Queue; end

class Statsd
# Sender is using a companion thread to flush and pack messages
# Sender is using a background thread to flush and pack messages
# in a `MessageBuffer`.
# The communication with this thread is done using a `Queue`.
# If the thread is dead, it is starting a new one to avoid having a blocked
# Sender with no companion thread to communicate with (most of the time, having
# a dead companion thread means that a fork just happened and that we are
# running in the child process).
# Sender with no background thread to communicate with (most of the time,
# having a dead background thread means that a fork just happened and that we
# are running in the child process).
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
end

def flush(sync: false)
# keep a copy around in case another thread is calling #stop while this method is running
current_message_queue = message_queue

# don't try to flush if there is no message_queue instantiated or
# no companion thread running
if !current_message_queue
@logger.debug { "Statsd: can't flush: no message queue ready" } if @logger
return
end
if !sender_thread.alive?
@logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger
return
end
# communication and synchronization with the background thread
# @mux is also used to not having multiple threads fighting for
# closing the Sender or creating a new background thread
@channel = Queue.new
@mux = Mutex.new

@is_closed = false

current_message_queue.push(:flush)
rendez_vous if sync
# start background thread immediately
@sender_thread = Thread.new(&method(:send_loop))
end

def rendez_vous
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The forwarder will probably need to be updated to not use #rendez_vous anymore, right? (To be honest, I don't quite understand the use-case of having both #sync_with_outbound_io and #flush at the top-level).

# could happen if #start hasn't be called
return unless message_queue

# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
Comment on lines -44 to -45
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lost this caching-the-queue behavior in the refactoring, which doesn't seem like an issue, but just doublechecking by asking if this is ok from a performance pov (I actually am not sure how expensive it is to create queues, probably not a lot)

# tell sender-thread to notify us in the current
# thread's queue
message_queue.push(queue)
# wait for the sender thread to send a message
# once the flush is done
queue.pop
def flush(sync: false)
@mux.synchronize {
# we don't want to send a flush action to the bg thread if:
# - there is no bg thread running
# - the sender has been closed
return if !sender_thread.alive? || @is_closed

if sync
# blocking flush
blocking_queue = FlushQueue.new
channel << blocking_queue
blocking_queue.pop # wait for the bg thread to finish its work
blocking_queue.close if CLOSEABLE_QUEUES
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I'm growing increasingly unconvinced this whole business with the CLOSEABLE_QUEUES is worth it. It effectively creates two code paths for different Ruby versions, but it's not like we're going to drop support for the old Rubies soon (#close is a Ruby 2.3 feature, and we're still fighting to drop 2.0).

Would it be simpler to just remove this entirely? It doesn't even seem that it would particularly improve performance either.

else
# asynchronous flush
channel << :flush
end
}
end

def add(message)
raise ArgumentError, 'Start sender first' unless message_queue
return if @is_closed # don't send a message to the bg thread if the sender has been closed

# if the thread does not exist, we assume we are running in a forked process,
# empty the message queue and message buffers (these messages belong to
# the parent process) and spawn a new companion thread.
# the bg thread is not running anymore, this is happening if the main process has forked and
# we are running in the child, we will spawn a bg thread and reset buffers (containing parents' messages)
if !sender_thread.alive?
@mx.synchronize {
# a call from another thread has already re-created
# the companion thread before this one acquired the lock
@mux.synchronize {
return if @is_closed
# test if a call from another thread has already re-created
# the background thread before this one acquired the lock
break if sender_thread.alive?
@logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger

message_queue.close if CLOSEABLE_QUEUES
@message_queue = nil
message_buffer.reset
start
# re-create the channel of communication since we will spawn a new bg thread
channel.close if CLOSEABLE_QUEUES
@channel = Queue.new
message_buffer.reset # don't use messages appended by another fork
@sender_thread = Thread.new(&method(:send_loop))
}
end

message_queue << message
channel << message
end

def start
raise ArgumentError, 'Sender already started' if message_queue

# initialize a new message queue for the background thread
@message_queue = Queue.new
# start background thread
@sender_thread = Thread.new(&method(:send_loop))
# Compatibility with `Sender`
def start()
end
Comment on lines +78 to 80
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, neither Sender nor SingleThreadedSender use #start, so perhaps it would make sense to just remove them?


if CLOSEABLE_QUEUES
# when calling stop, make sure that no other threads is trying
# to close the sender nor trying to continue to `#add` more message
# into the sender.
def stop(join_worker: true)
message_queue = @message_queue
message_queue.close if message_queue

sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
end
else
# when calling stop, make sure that no other threads is trying
# to close the sender nor trying to continue to `#add` more message
# into the sender.
def stop(join_worker: true)
message_queue = @message_queue
message_queue << :close if message_queue

sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
end
def stop()
return if @is_closed
# use this lock to both: not having another thread stopping this instance nor
# having a #add call creating a new thread
@mux.synchronize {
@is_closed = true
if sender_thread.alive? # no reasons to stop the bg thread is none is running already
blocking_queue = CloseQueue.new
channel << blocking_queue
blocking_queue.pop # wait for the bg thread to finish its work
blocking_queue.close if CLOSEABLE_QUEUES
sender_thread.join(3) # wait for completion, timeout after 3 seconds
# TODO(remy): should I close `channel` here?
Comment on lines +90 to +94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I suggested above, I think it'd be simpler to just not use close; if we do, calling close here may be problematic if two stops get called concurrently, but the background thread is taking long to finish. E.g. something like T1: acquire mutex -> tell background thread to stop -> timeout join -> call close -> release mutex; T2: acquire mutex -> does not see previous @is_closed -> tries to write to channel -> channel has been closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, should the stop behavior be a bit more flexible? E.g. configurable timeout, or optionally run a block to decide what to do.

end
}
end

private

attr_reader :message_buffer
attr_reader :message_queue
attr_reader :channel
attr_reader :mux
attr_reader :sender_thread

if CLOSEABLE_QUEUES
def send_loop
until (message = message_queue.pop).nil? && message_queue.closed?
# skip if message is nil, e.g. when message_queue
# is empty and closed
next unless message

case message
when :flush
message_buffer.flush
when Queue
message.push(:go_on)
else
message_buffer.add(message)
end
def send_loop
until (message = channel.pop).nil? && (CLOSEABLE_QUEUES && channel.closed?)
# skip if message is nil, e.g. when the channel is empty and closed
next unless message

case message
# if a FlushQueue is received, the background thread has to flush the message
# buffer and to send an :unblock to let the caller know that it has finished
when FlushQueue
message_buffer.flush
message << :unblock
# if a :flush is received, the background thread has to flush asynchronously
when :flush
message_buffer.flush
# if a CloseQueue is received, the background thread has to do a last flush
# and to send an :unblock to let the caller know that it has finished
when CloseQueue
message << :unblock
return
else
message_buffer.add(message)
end

@message_queue = nil
@sender_thread = nil
end
else
def send_loop
loop do
message = message_queue.pop

next unless message

case message
when :close
break
when :flush
message_buffer.flush
when Queue
message.push(:go_on)
else
message_buffer.add(message)
end
end

@message_queue = nil
@sender_thread = nil
end
end
end
Expand Down