diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 9a0c73d2..aff6bc82 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -16,7 +16,7 @@ class Worker < Base attr_reader :total def initialize(redis, config) - @reserved_test = nil + @reserved_tests = Set.new @shutdown_required = false super(redis, config) end @@ -151,7 +151,7 @@ def requeue(test, offset: Redis.requeue_offset) argv: [config.max_requeues, global_max_requeues, test_key, offset], ) == 1 - @reserved_test = test_key unless requeued + reserved_tests << test_key unless requeued requeued end @@ -168,25 +168,24 @@ def release! attr_reader :index + def reserved_tests + @reserved_tests ||= Set.new + end + def worker_id config.worker_id end def raise_on_mismatching_test(test) - if @reserved_test == test - @reserved_test = nil - else - raise ReservationError, "Acknowledged #{test.inspect} but #{@reserved_test.inspect} was reserved" + unless reserved_tests.delete?(test) + raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved" end end def reserve - if @reserved_test - raise ReservationError, "#{@reserved_test.inspect} is already reserved. " \ - "You have to acknowledge it before you can reserve another one" - end - - @reserved_test = (try_to_reserve_lost_test || try_to_reserve_test) + test = (try_to_reserve_lost_test || try_to_reserve_test) + reserved_tests << test + test end def try_to_reserve_test diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 9b6260df..8a44c632 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -89,14 +89,15 @@ def remaining end def running - @reserved_test ? 1 : 0 + reserved_tests.empty? ? 0 : 1 end def poll - while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && @reserved_test = @queue.shift - yield index.fetch(@reserved_test) + while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && reserved_test = @queue.shift + reserved_tests << reserved_test + yield index.fetch(reserved_test) end - @reserved_test = nil + reserved_tests.clear end def exhausted? @@ -142,6 +143,10 @@ def should_requeue?(key) def requeues @requeues ||= Hash.new(0) end + + def reserved_tests + @reserved_tests ||= Set.new + end end end end diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index 91e464ff..86ec1e38 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -107,7 +107,7 @@ module WithTimestamps end module Queue - include ::CI::Queue::OutputHelpers + extend ::CI::Queue::OutputHelpers attr_writer :run_command_formatter, :project_root def run_command_formatter @@ -149,7 +149,86 @@ def self.relative_path(path, root: project_root) path end + class << self + def queue + Minitest.queue + end + + def run(reporter, *) + rescue_run_errors do + queue.poll do |example| + result = queue.with_heartbeat(example.id) do + example.run + end + + handle_test_result(reporter, example, result) + end + + queue.stop_heartbeat! + end + end + + def handle_test_result(reporter, example, result) + failed = !(result.passed? || result.skipped?) + + if example.flaky? + result.mark_as_flaked! + failed = false + end + + if failed && queue.config.failing_test && queue.config.failing_test != example.id + # When we do a bisect, we don't care about the result other than the test we're running the bisect on + result.mark_as_flaked! + failed = false + elsif failed + queue.report_failure! + else + queue.report_success! + end + + if failed && CI::Queue.requeueable?(result) && queue.requeue(example) + result.requeue! + reporter.record(result) + elsif queue.acknowledge(example) + reporter.record(result) + queue.increment_test_failed if failed + elsif !failed + # If the test was already acknowledged by another worker (we timed out) + # Then we only record it if it is successful. + reporter.record(result) + end + end + + private + + def rescue_run_errors(&block) + block.call + rescue Errno::EPIPE + # This happens when the heartbeat process dies + reopen_previous_step + puts red("The heartbeat process died. This worker is exiting early.") + exit!(41) + rescue CI::Queue::Error => error + reopen_previous_step + puts red("#{error.class}: #{error.message}") + error.backtrace.each do |frame| + puts red(frame) + end + exit!(41) + rescue => error + reopen_previous_step + Minitest.queue.report_worker_error(error) + puts red("This worker exited because of an uncaught application error:") + puts red("#{error.class}: #{error.message}") + error.backtrace.each do |frame| + puts red(frame) + end + exit!(42) + end + end + class SingleExample + attr_reader :runnable, :method_name def initialize(runnable, method_name) @runnable = runnable @@ -211,7 +290,7 @@ def loaded_tests def __run(*args) if queue - run_from_queue(*args) + Queue.run(*args) if queue.config.circuit_breakers.any?(&:open?) STDERR.puts queue.config.circuit_breakers.map(&:message).join(' ').strip @@ -224,65 +303,6 @@ def __run(*args) super end end - - def run_from_queue(reporter, *) - queue.poll do |example| - result = queue.with_heartbeat(example.id) do - example.run - end - - failed = !(result.passed? || result.skipped?) - - if example.flaky? - result.mark_as_flaked! - failed = false - end - - if failed && queue.config.failing_test && queue.config.failing_test != example.id - # When we do a bisect, we don't care about the result other than the test we're running the bisect on - result.mark_as_flaked! - failed = false - elsif failed - queue.report_failure! - else - queue.report_success! - end - - if failed && CI::Queue.requeueable?(result) && queue.requeue(example) - result.requeue! - reporter.record(result) - elsif queue.acknowledge(example) - reporter.record(result) - queue.increment_test_failed if failed - elsif !failed - # If the test was already acknowledged by another worker (we timed out) - # Then we only record it if it is successful. - reporter.record(result) - end - end - queue.stop_heartbeat! - rescue Errno::EPIPE - # This happens when the heartbeat process dies - reopen_previous_step - puts red("The heartbeat process died. This worker is exiting early.") - exit!(41) - rescue CI::Queue::Error => error - reopen_previous_step - puts red("#{error.class}: #{error.message}") - error.backtrace.each do |frame| - puts red(frame) - end - exit!(41) - rescue => error - reopen_previous_step - queue.report_worker_error(error) - puts red("This worker exited because of an uncaught application error:") - puts red("#{error.class}: #{error.message}") - error.backtrace.each do |frame| - puts red(frame) - end - exit!(42) - end end end