Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Worker < Base
attr_reader :total

def initialize(redis, config)
@reserved_test = nil
@reserved_tests = Set.new
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it would be difficult to send the process ID back up and scope by that?

Copy link
Member Author

@shioyama shioyama Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, not sure that would be possible 🤔

We have a master process that is reserving tests, which forked processes pull from. The master process fills the queue as soon as it has less than the max (currently 1).

When we are reserving a test, therefore, we don't know which process will end up running it, so we can't check for a "mismatch" at that point.

However, we might be able to improve the check in acknowledge such that when a worker process takes a test (pops it from the server via drb), we track its process ID and associate it with the reserved test. Then in theory when the result comes back, we could check that the reserved test exists, and that the pid we associated with it matches the worker that is trying to record the result.

I'm not sure that we can get the pid though from a request to the drb server. 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh never mind we can just make the worker send its pid.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so we can indeed check that the worker recording a result for a test matches the pid for the worker that asked for that same test. I don't think it would be easy to do that on the gem side of this, though. And I'm not really convinced that having that check would do much for us.

OTOH I don't see how we can do anything about reserve, given that when a server process reserves a test, it doesn't know which worker will end up taking it. So there's no check to be made at that moment.

@shutdown_required = false
super(redis, config)
end
Expand Down Expand Up @@ -135,7 +135,7 @@ def requeue(test, offset: Redis.requeue_offset)
argv: [config.max_requeues, global_max_requeues, test_key, offset],
) == 1

@reserved_test = test_key unless requeued
reserved_tests << test_key unless requeued
requeued
end

Expand All @@ -152,25 +152,24 @@ def release!

attr_reader :index

def reserved_tests
@reserved_tests ||= Set.new
end

def worker_id
config.worker_id
end

def raise_on_mismatching_test(test_key)
if @reserved_test == test_key
@reserved_test = nil
else
raise ReservationError, "Acknowledged #{test_key.inspect} but #{@reserved_test.inspect} was reserved"
def raise_on_mismatching_test(test)
unless reserved_tests.delete?(test)
raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved"
end
end

def reserve
if @reserved_test
raise ReservationError, "#{@reserved_test.inspect} is already reserved. " \
"You have to acknowledge it before you can reserve another one"
end

@reserved_test = (try_to_reserve_lost_test || try_to_reserve_test)
test = (try_to_reserve_lost_test || try_to_reserve_test)
reserved_tests << test
test
end

def try_to_reserve_test
Expand Down
13 changes: 9 additions & 4 deletions ruby/lib/ci/queue/static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ def remaining
end

def running
@reserved_test ? 1 : 0
reserved_tests.empty? ? 0 : 1
end

def poll
while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && @reserved_test = @queue.shift
yield index.fetch(@reserved_test)
while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && reserved_test = @queue.shift
reserved_tests << reserved_test
yield index.fetch(reserved_test)
end
@reserved_test = nil
reserved_tests.clear
end

def exhausted?
Expand Down Expand Up @@ -142,6 +143,10 @@ def should_requeue?(key)
def requeues
@requeues ||= Hash.new(0)
end

def reserved_tests
@reserved_tests ||= Set.new
end
end
end
end
2 changes: 1 addition & 1 deletion ruby/test/minitest/queue/build_status_recorder_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_retrying_test
private

def reserve(queue, method_name)
queue.instance_variable_set(:@reserved_test, Minitest::Queue::SingleExample.new("Minitest::Test", method_name).id)
queue.instance_variable_set(:@reserved_tests, Set.new([Minitest::Queue::SingleExample.new("Minitest::Test", method_name).id]))
end

def worker(id)
Expand Down
Loading