Skip to content

Conversation

@rayz
Copy link
Contributor

@rayz rayz commented Feb 24, 2025

This pr introduces a change to handle the case in which sender_thread fails to start and a ThreadError is returned.

Addresses issue #306 authored by @mperham

@rayz rayz requested a review from a team as a code owner February 24, 2025 03:40
# start background thread
@sender_thread = @thread_class.new(&method(:send_loop))
@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
rescue ThreadError

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to try to distinguish the error on shutdown from other possible errors? Thread.new can fail for other reasons, such as no memory or too many processes, and it would be unfortunate to bury that in debug messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I've updated it to log the error message. Let me know if that's sufficient enough

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging the message is better, but it still a debug message. It may not be enabled and it would be difficult to find if failure to start the sender thread results in loss of observability.

Would it be possible to check if the sender thread was terminated or the process is exiting and skip re-starting sender thread in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a check for sender_thread.status.nil? to check for thread termination. I am not too familiar with ruby so I'm unsure how to check if the process is exiting. I found the at_exit function but I don't think it would apply here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ruby's Thread documentation is unfortunately lacking in its descriptions of what happens when, but this program still crashes on my machine sometimes, so it looks like status and alive? are not (always) updated together:

sender = Thread.new do
  sleep 600
end

task = Thread.new do
  begin
    sleep 600
  rescue Interrupt
  ensure
    if not sender.alive? and not sender.status.nil?
      Thread.new { puts "OK" }.join
    end
  end
end

sleep 0.01

task.raise Interrupt

This, however, seems to work fine:

done = false

sender = Thread.new do
  sleep 600
ensure
  done = true
end

task = Thread.new do
  begin
    sleep 600
  rescue Interrupt
  ensure
    if not sender.alive? and not done
      Thread.new { puts "OK" }.join
    end
  end
end

sleep 0.01

task.raise Interrupt

Variable assignment appears to be atomic at least in MRI, but we'd need to double-check this for all implementations we support, or we'd need an explicit atomic or a mutex. Although it seems to happen in practice, I also could not find any guarantee that ensure blocks must run when process terminates the threads, so this might not be entirely valid approach either.

@p-datadog
Copy link
Member

p-datadog commented Mar 3, 2025

I have done some testing on this.

If I understand correctly the following script reproduces the problem (on MRI):

Thread.new do
  begin
    loop do
      sleep 1
    end
  ensure
    puts 'ensure'

    Thread.new do
      puts 'in thread'
    end.join
  end
end
sleep 0.1

You can add an explicit exit at the end but it is not necessary:

Thread.new do
  begin
    loop do
      sleep 1
    end
  ensure
    puts 'ensure'

    Thread.new do
      puts 'in thread'
    end.join
  end
end
sleep 0.1

puts 'exit'
exit

Result:

big% ruby test.rb
exit
ensure
#<Thread:0x000077bb22f77c40 test.rb:1 aborting> terminated with exception (report_on_exception is true):
test.rb:9:in `new': can't alloc thread (ThreadError)
	from test.rb:9:in `ensure in block in <main>'
	from test.rb:11:in `block in <main>

The "can't alloc thread" message in ThreadError is, funnily enough, used only in the case of the main thread being dead (and is, therefore, never a failure to allocate anything):

https://github.com/ruby/ruby/blob/master/thread.c#L908

    if (GET_RACTOR()->threads.main->status == THREAD_KILLED) {
        rb_raise(rb_eThreadError, "can't alloc thread");
    }

To detect the case of process exiting in the existing Ruby releases, it should then be sufficient to simply check for the ThreadError's message being "can't alloc thread". Of course, this is a pretty fragile way of going about it, and our code also wouldn't make sense to people reading it (since the issue is not allocation of thread).

A more reliable way of detecting this situation would be performing the check quoted above for main thread status, however I am not sure how to get at it from Ruby code (since dogstatsd doesn't have any C extensions that I can currently spot).

On JRuby, threads can be created after the main thread starts exiting (no exception is raised). The reproducer above produces:

big% rbenv shell jruby            
big% ruby test.rb 
exit
ensure
in thread

Copy link
Member

@p-datadog p-datadog left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that the PR will attempt to flush the messages one time but if there are multiple calls to add after the process begins to exit, the second and subsequent ones would not attempt to flush the data out synchronously (due to the @done flag check).

# the parent process) and spawn a new companion thread.
if !sender_thread.alive?
@mx.synchronize {
break if @done
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this added line?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect the intention is to not try to start again if the previous start failed.... But yeah it's probably not a bad idea to make this a lot more clearer.

@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
rescue ThreadError => e
@logger.debug { "Statsd: Failed to start sender thread: #{e.message}, flushing message buffer" } if @logger
message_buffer.flush
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line can raise exceptions, that the add method previously wouldn't be raising (i.e. those that are a result of a sending attempt); are they already appropriately handled by upstream code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't completely clear to me on the first pass that the intention here is to flush the message_buffer synchronously (aka in the current thread).

Yet... if the intention is that we get there from being called either when the Sender is originally created (nothing in the buffer) or from add (in the forked process scenario), is message_buffer ever going to contain anything at all?

The control flow to this method seems to be a bit confusing, e.g. it may be worth clarifying when and how this gets called (with a comment or a refactoring?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, if forked, message_queue and message_buffer is emptied. I've removed the additional flush.

Copy link
Member

@ivoanjo ivoanjo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely a weird one, thanks for looking into it! I left a few notes/suggestions.

@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
rescue ThreadError => e
@logger.debug { "Statsd: Failed to start sender thread: #{e.message}, flushing message buffer" } if @logger
message_buffer.flush
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't completely clear to me on the first pass that the intention here is to flush the message_buffer synchronously (aka in the current thread).

Yet... if the intention is that we get there from being called either when the Sender is originally created (nothing in the buffer) or from add (in the forked process scenario), is message_buffer ever going to contain anything at all?

The control flow to this method seems to be a bit confusing, e.g. it may be worth clarifying when and how this gets called (with a comment or a refactoring?).

# the parent process) and spawn a new companion thread.
if !sender_thread.alive?
@mx.synchronize {
break if @done
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect the intention is to not try to start again if the previous start failed.... But yeah it's probably not a bad idea to make this a lot more clearer.

end
end
@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
rescue ThreadError => e
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest adding a test for this, as it's a bit brittle to maintain and understand what led to it. It looks like it'd be easy to do so: the @thread_class is already being injected, and thus it can be replaced by something that always raises a dummy ThreadError when #new is called on it.

Copy link
Member

@ivoanjo ivoanjo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few extra notes :)

Comment on lines 108 to 110
ensure
@mx.synchronize { @done = true }
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the intention is to handle ThreadErrors... I don't think this is doing it. Specifically, if the ThreadError is raised by Thread.new, then the ensure will never run, so @done is never going to be set, and every add will result in the attempt to recreate the thread.

end

before do
allow(thread_class).to receive(:new).and_raise(ThreadError, "ThreadError")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor:

Suggested change
allow(thread_class).to receive(:new).and_raise(ThreadError, "ThreadError")
expect(thread_class).to receive(:new).and_raise(ThreadError, "ThreadError")

Using expect here adds a tiny extra level of checks against the test passing trivially, e.g. if the method does not do anything. This is useful in tests that check "it doesn't break" as otherwise they can pass for the wrong reasons.

Alternatively, you can expect on the logger receiving a message as a way of "observing" that the correct path was exercised in the test.

@mx.synchronize {
# an attempt was previously made to start the sender thread but failed.
# skipping re-start
break if @done
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A question occurred to me on a second pass on this one: is break really the intention here, or would return be more correct?

Since we're inside a Ruby block, break only pops out of the @mx.synchronize, and then executes the rest of the method (e.g. from the if message_queue.length <= @queue_size on).

Which means we still store all messages whenever there's a failure. Is that intended? (May be worth adding a spec to clarify this -- e.g. that the queue is either empty or not empty after the failure)

Copy link
Member

@ivoanjo ivoanjo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

Comment on lines 105 to 110
@sender_thread = @thread_class.new do
begin
send_loop
ensure
end
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: I guess this can go back to the earlier @sender_thread = @thread_class.new(&method(:send_loop))

@rayz rayz merged commit 64d9855 into master Mar 13, 2025
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants