Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Worker < Base
attr_reader :total

# Builds a worker from +redis+ and +config+, both of which are forwarded
# unchanged to the parent initializer.
def initialize(redis, config)
# NOTE(review): both a single-slot @reserved_test and a Set-based
# @reserved_tests are assigned here; this looks like the two sides of a diff
# hunk shown together — confirm which one is current.
@reserved_test = nil
@reserved_tests = Set.new
@shutdown_required = false
# Instance state is assigned before delegating to the parent initializer.
super(redis, config)
end
Expand Down Expand Up @@ -151,7 +151,7 @@ def requeue(test, offset: Redis.requeue_offset)
argv: [config.max_requeues, global_max_requeues, test_key, offset],
) == 1

@reserved_test = test_key unless requeued
reserved_tests << test_key unless requeued
requeued
end

Expand All @@ -168,25 +168,24 @@ def release!

attr_reader :index

# Set of the test keys this worker currently holds a reservation on.
# The Set is created the first time it is needed and reused afterwards.
def reserved_tests
  @reserved_tests = Set.new unless @reserved_tests
  @reserved_tests
end

# Identifier of this worker, read from the queue configuration.
def worker_id
  settings = config
  settings.worker_id
end

# Releases the reservation held on +test+ when it is acknowledged.
#
# The reservation is tracked in the +reserved_tests+ Set, so several tests may
# be reserved at once; the stale single-slot (@reserved_test) comparison that
# was interleaved with this check has been dropped.
#
# Returns nil when +test+ was reserved (it is removed from the Set).
# Raises ReservationError when +test+ is not among the reserved tests; the
# message lists what is still reserved.
def raise_on_mismatching_test(test)
  # Set#delete? returns nil only when the element was absent.
  return if reserved_tests.delete?(test)

  raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved"
end

# Reserves one test for this worker and remembers it in +reserved_tests+.
#
# try_to_reserve_lost_test is attempted first; try_to_reserve_test is only
# called when the former returns nil/false. Each helper is called at most once
# per invocation.
#
# Returns whatever the successful helper returned, or nil/false when neither
# produced a test. A nil/false result is NOT added to +reserved_tests+, so the
# Set only ever contains real reservations.
def reserve
  test = try_to_reserve_lost_test || try_to_reserve_test
  reserved_tests << test if test
  test
end

def try_to_reserve_test
Expand Down
13 changes: 9 additions & 4 deletions ruby/lib/ci/queue/static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ def remaining
end

# Number of tests reported as running: 1 when at least one test is reserved,
# 0 otherwise (the value is capped at 1 regardless of how many are reserved).
def running
  reserved_tests.empty? ? 0 : 1
end

# Shifts tests off @queue one at a time and yields the matching entry from
# +index+ to the caller's block.
#
# Each shifted test is added to +reserved_tests+ before it is yielded. The
# loop ends as soon as one of the following holds:
#   * @shutdown is set,
#   * any of config.circuit_breakers reports open?,
#   * max_test_failed? is true,
#   * @queue.shift returns nil/false (queue drained).
#
# +reserved_tests+ is cleared once the loop has finished normally.
def poll
  # The assignment in the condition is intentional: a nil from #shift ends the loop.
  while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && (reserved_test = @queue.shift)
    reserved_tests << reserved_test
    yield index.fetch(reserved_test)
  end
  reserved_tests.clear
end

def exhausted?
Expand Down Expand Up @@ -142,6 +143,10 @@ def should_requeue?(key)
# Lazily-built Hash whose missing keys read as 0.
# NOTE(review): presumably counts requeues per test key — confirm against the callers.
def requeues
  @requeues = Hash.new(0) unless @requeues
  @requeues
end

# Set of the tests currently reserved; built on first access and then reused.
def reserved_tests
  @reserved_tests || (@reserved_tests = Set.new)
end
end
end
end
142 changes: 81 additions & 61 deletions ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ module WithTimestamps
end

module Queue
include ::CI::Queue::OutputHelpers
extend ::CI::Queue::OutputHelpers
attr_writer :run_command_formatter, :project_root

def run_command_formatter
Expand Down Expand Up @@ -149,7 +149,86 @@ def self.relative_path(path, root: project_root)
path
end

# Singleton-level entry points for running examples pulled from the CI queue.
# They are invoked as module methods, e.g. `Queue.run(reporter)`.
class << self
  # The queue examples are pulled from (delegates to Minitest.queue).
  def queue
    Minitest.queue
  end

  # Runs every example yielded by queue.poll.
  #
  # Each example is executed inside queue.with_heartbeat(example.id) and its
  # result is passed to #handle_test_result. The heartbeat is stopped once
  # polling returns. Any error raised along the way is handled by
  # #rescue_run_errors, which terminates the process.
  #
  # reporter - object that receives results through #record.
  def run(reporter, *)
    rescue_run_errors do
      queue.poll do |example|
        result = queue.with_heartbeat(example.id) { example.run }

        handle_test_result(reporter, example, result)
      end

      queue.stop_heartbeat!
    end
  end

  # Reports a single result to the queue, then requeues or acknowledges the
  # example and records the result on +reporter+ when appropriate.
  #
  # reporter - object that receives results through #record.
  # example  - the example that was run (responds to #id and #flaky?).
  # result   - its result (responds to #passed?, #skipped?, #mark_as_flaked!, #requeue!).
  def handle_test_result(reporter, example, result)
    failed = !(result.passed? || result.skipped?)

    # Results of examples declared flaky are always marked as flaked and never count as failures.
    if example.flaky?
      result.mark_as_flaked!
      failed = false
    end

    if failed && queue.config.failing_test && queue.config.failing_test != example.id
      # When we do a bisect, we don't care about the result other than the test we're running the bisect on
      result.mark_as_flaked!
      failed = false
    elsif failed
      queue.report_failure!
    else
      queue.report_success!
    end

    if failed && CI::Queue.requeueable?(result) && queue.requeue(example)
      result.requeue!
      reporter.record(result)
    elsif queue.acknowledge(example)
      reporter.record(result)
      queue.increment_test_failed if failed
    elsif !failed
      # If the test was already acknowledged by another worker (we timed out)
      # Then we only record it if it is successful.
      reporter.record(result)
    end
  end

  private

  # Yields to the given block and turns any StandardError into a process exit:
  #   * Errno::EPIPE and CI::Queue::Error exit with status 41,
  #   * any other StandardError is reported through queue.report_worker_error
  #     and exits with status 42.
  # Returns the block's value when nothing is raised.
  def rescue_run_errors
    yield
  rescue Errno::EPIPE
    # This happens when the heartbeat process dies
    reopen_previous_step
    puts red("The heartbeat process died. This worker is exiting early.")
    exit!(41)
  rescue CI::Queue::Error => error
    reopen_previous_step
    print_error_with_backtrace(error)
    exit!(41)
  rescue => error
    reopen_previous_step
    queue.report_worker_error(error)
    puts red("This worker exited because of an uncaught application error:")
    print_error_with_backtrace(error)
    exit!(42)
  end

  # Prints "<class>: <message>" followed by one line per backtrace frame, all in red.
  def print_error_with_backtrace(error)
    puts red("#{error.class}: #{error.message}")
    error.backtrace.each { |frame| puts red(frame) }
  end
end

class SingleExample
attr_reader :runnable, :method_name

def initialize(runnable, method_name)
@runnable = runnable
Expand Down Expand Up @@ -211,7 +290,7 @@ def loaded_tests

def __run(*args)
if queue
run_from_queue(*args)
Queue.run(*args)

if queue.config.circuit_breakers.any?(&:open?)
STDERR.puts queue.config.circuit_breakers.map(&:message).join(' ').strip
Expand All @@ -224,65 +303,6 @@ def __run(*args)
super
end
end

# Drains the queue: every polled example is run under a heartbeat, its outcome
# is reported to the queue, and the result is requeued or acknowledged and
# recorded on +reporter+. The heartbeat is stopped once polling returns.
#
# Any StandardError terminates the process through exit!: status 41 for
# Errno::EPIPE and CI::Queue::Error, status 42 (after
# queue.report_worker_error) for anything else.
def run_from_queue(reporter, *)
  queue.poll do |example|
    result = queue.with_heartbeat(example.id) { example.run }

    counts_as_failure = !result.passed? && !result.skipped?

    if example.flaky?
      result.mark_as_flaked!
      counts_as_failure = false
    end

    if !counts_as_failure
      queue.report_success!
    elsif queue.config.failing_test && queue.config.failing_test != example.id
      # When we do a bisect, we don't care about the result other than the test we're running the bisect on
      result.mark_as_flaked!
      counts_as_failure = false
    else
      queue.report_failure!
    end

    requeued = counts_as_failure && CI::Queue.requeueable?(result) && queue.requeue(example)
    if requeued
      result.requeue!
      reporter.record(result)
    elsif queue.acknowledge(example)
      reporter.record(result)
      queue.increment_test_failed if counts_as_failure
    else
      # If the test was already acknowledged by another worker (we timed out)
      # Then we only record it if it is successful.
      reporter.record(result) unless counts_as_failure
    end
  end
  queue.stop_heartbeat!
rescue Errno::EPIPE
  # This happens when the heartbeat process dies
  reopen_previous_step
  puts red("The heartbeat process died. This worker is exiting early.")
  exit!(41)
rescue CI::Queue::Error => queue_error
  reopen_previous_step
  puts red("#{queue_error.class}: #{queue_error.message}")
  queue_error.backtrace.each { |frame| puts red(frame) }
  exit!(41)
rescue => app_error
  reopen_previous_step
  queue.report_worker_error(app_error)
  puts red("This worker exited because of an uncaught application error:")
  puts red("#{app_error.class}: #{app_error.message}")
  app_error.backtrace.each { |frame| puts red(frame) }
  exit!(42)
end
end
end

Expand Down
Loading