Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SI
@mx = Mutex.new
@queue_class = queue_class
@thread_class = thread_class
@done = false
@flush_timer = if flush_interval
Datadog::Statsd::Timer.new(flush_interval) { flush(sync: true) }
else
Expand Down Expand Up @@ -66,8 +67,11 @@ def add(message)
# if the thread does not exist, we assume we are running in a forked process,
# empty the message queue and message buffers (these messages belong to
# the parent process) and spawn a new companion thread.
if !sender_thread.alive?
if sender_thread.nil? || !sender_thread.alive?
@mx.synchronize {
# an attempt was previously made to start the sender thread but failed.
# skipping re-start
return if @done
# a call from another thread has already re-created
# the companion thread before this one acquired the lock
break if sender_thread.alive?
Expand Down Expand Up @@ -96,9 +100,15 @@ def start

# initialize a new message queue for the background thread
@message_queue = @queue_class.new
# start background thread
@sender_thread = @thread_class.new(&method(:send_loop))
@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
begin
# start background thread
@sender_thread = @thread_class.new(&method(:send_loop))
@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
rescue ThreadError => e
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest adding a test for this, as it's a bit brittle to maintain and understand what led to it. It looks like it'd be easy to do so: the @thread_class is already being injected, and thus it can be replaced by something that always raises a dummy ThreadError when #new is called on it.

@logger.debug { "Statsd: Failed to start sender thread: #{e.message}" } if @logger
@mx.synchronize { @done = true }
end

@flush_timer.start if @flush_timer
end

Expand Down
39 changes: 39 additions & 0 deletions spec/statsd/sender_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,45 @@
end
end

context 'when a ThreadError is raised starting the sender thread' do
let(:thread_class) do
class_double(Thread)
end

before do
expect(thread_class).to receive(:new).and_raise(ThreadError, "ThreadError")
end

it 'ignores the thread error' do
expect do
subject.start
end.not_to raise_error
end

context 'when add' do
let(:fake_queue_length) { queue_size }
let(:fake_queue) do
if Queue.instance_methods.include?(:close)
instance_double(Queue, { "length" => fake_queue_length, "<<" => true, "close" => true })
else
instance_double(Queue, { "length" => fake_queue_length, "<<" => true })
end
end
let(:queue_class) do
class_double(Queue, new: fake_queue)
end

it 'does not store the message' do
subject.start
expect(fake_queue).not_to receive(:<<).with('message')
if not Queue.instance_methods.include?(:close)
expect(fake_queue).to receive(:<<).with(:close)
end
subject.add('message')
end
end
end

context 'when flush_interval is set' do
let(:flush_interval) { 0.001 }

Expand Down
Loading