From fb3bf241b19b25e686d95c86c812aa1943ebd5da Mon Sep 17 00:00:00 2001 From: Chris Salzberg Date: Fri, 23 May 2025 15:32:44 +0900 Subject: [PATCH 1/5] Expose SingleExample#runnable and SingleExample#method_name --- ruby/lib/minitest/queue.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index 91e464ff..384d9ee1 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -150,6 +150,7 @@ def self.relative_path(path, root: project_root) end class SingleExample + attr_reader :runnable, :method_name def initialize(runnable, method_name) @runnable = runnable From adc39eed4c1c23d1b7691888077b00bcaa2ff1a4 Mon Sep 17 00:00:00 2001 From: Chris Salzberg Date: Thu, 22 May 2025 11:54:03 +0900 Subject: [PATCH 2/5] Move Minitest.run_from_queue() to Minitest::Queue.run() This makes it easier to confine internal methods to Minitest::Queue and avoid polluting the top-level Minitest module. --- ruby/lib/minitest/queue.rb | 141 +++++++++++++++++++++---------------- 1 file changed, 80 insertions(+), 61 deletions(-) diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index 384d9ee1..86ec1e38 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -107,7 +107,7 @@ module WithTimestamps end module Queue - include ::CI::Queue::OutputHelpers + extend ::CI::Queue::OutputHelpers attr_writer :run_command_formatter, :project_root def run_command_formatter @@ -149,6 +149,84 @@ def self.relative_path(path, root: project_root) path end + class << self + def queue + Minitest.queue + end + + def run(reporter, *) + rescue_run_errors do + queue.poll do |example| + result = queue.with_heartbeat(example.id) do + example.run + end + + handle_test_result(reporter, example, result) + end + + queue.stop_heartbeat! + end + end + + def handle_test_result(reporter, example, result) + failed = !(result.passed? || result.skipped?) + + if example.flaky? + result.mark_as_flaked! + failed = false + end + + if failed && queue.config.failing_test && queue.config.failing_test != example.id + # When we do a bisect, we don't care about the result other than the test we're running the bisect on + result.mark_as_flaked! + failed = false + elsif failed + queue.report_failure! + else + queue.report_success! + end + + if failed && CI::Queue.requeueable?(result) && queue.requeue(example) + result.requeue! + reporter.record(result) + elsif queue.acknowledge(example) + reporter.record(result) + queue.increment_test_failed if failed + elsif !failed + # If the test was already acknowledged by another worker (we timed out) + # Then we only record it if it is successful. + reporter.record(result) + end + end + + private + + def rescue_run_errors(&block) + block.call + rescue Errno::EPIPE + # This happens when the heartbeat process dies + reopen_previous_step + puts red("The heartbeat process died. This worker is exiting early.") + exit!(41) + rescue CI::Queue::Error => error + reopen_previous_step + puts red("#{error.class}: #{error.message}") + error.backtrace.each do |frame| + puts red(frame) + end + exit!(41) + rescue => error + reopen_previous_step + Minitest.queue.report_worker_error(error) + puts red("This worker exited because of an uncaught application error:") + puts red("#{error.class}: #{error.message}") + error.backtrace.each do |frame| + puts red(frame) + end + exit!(42) + end + end + class SingleExample attr_reader :runnable, :method_name @@ -212,7 +290,7 @@ def loaded_tests def __run(*args) if queue - run_from_queue(*args) + Queue.run(*args) if queue.config.circuit_breakers.any?(&:open?) STDERR.puts queue.config.circuit_breakers.map(&:message).join(' ').strip @@ -225,65 +303,6 @@ def __run(*args) super end end - - def run_from_queue(reporter, *) - queue.poll do |example| - result = queue.with_heartbeat(example.id) do - example.run - end - - failed = !(result.passed? || result.skipped?) - - if example.flaky? - result.mark_as_flaked! - failed = false - end - - if failed && queue.config.failing_test && queue.config.failing_test != example.id - # When we do a bisect, we don't care about the result other than the test we're running the bisect on - result.mark_as_flaked! - failed = false - elsif failed - queue.report_failure! - else - queue.report_success! - end - - if failed && CI::Queue.requeueable?(result) && queue.requeue(example) - result.requeue! - reporter.record(result) - elsif queue.acknowledge(example) - reporter.record(result) - queue.increment_test_failed if failed - elsif !failed - # If the test was already acknowledged by another worker (we timed out) - # Then we only record it if it is successful. - reporter.record(result) - end - end - queue.stop_heartbeat! - rescue Errno::EPIPE - # This happens when the heartbeat process dies - reopen_previous_step - puts red("The heartbeat process died. This worker is exiting early.") - exit!(41) - rescue CI::Queue::Error => error - reopen_previous_step - puts red("#{error.class}: #{error.message}") - error.backtrace.each do |frame| - puts red(frame) - end - exit!(41) - rescue => error - reopen_previous_step - queue.report_worker_error(error) - puts red("This worker exited because of an uncaught application error:") - puts red("#{error.class}: #{error.message}") - error.backtrace.each do |frame| - puts red(frame) - end - exit!(42) - end end end From 90399e56f6ef722cddcba28e66ea1a95c319ea73 Mon Sep 17 00:00:00 2001 From: Chris Salzberg Date: Fri, 23 May 2025 13:45:30 +0900 Subject: [PATCH 3/5] Allow reserving multiple tests --- ruby/lib/ci/queue/redis/worker.rb | 23 ++++++++++++----------- ruby/lib/ci/queue/static.rb | 13 +++++++++---- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 9a0c73d2..b03e60ee 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -16,7 +16,7 @@ class Worker < Base attr_reader :total def initialize(redis, config) - @reserved_test = nil + @reserved_tests = Set.new @shutdown_required = false super(redis, config) end @@ -151,7 +151,7 @@ def requeue(test, offset: Redis.requeue_offset) argv: [config.max_requeues, global_max_requeues, test_key, offset], ) == 1 - @reserved_test = test_key unless requeued + reserved_tests << test_key unless requeued requeued end @@ -168,25 +168,26 @@ def release! attr_reader :index + def reserved_tests + @reserved_tests ||= Set.new + end + def worker_id config.worker_id end def raise_on_mismatching_test(test) - if @reserved_test == test - @reserved_test = nil + if reserved_tests.include?(test) + reserved_tests.delete(test) else - raise ReservationError, "Acknowledged #{test.inspect} but #{@reserved_test.inspect} was reserved" + raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved" end end def reserve - if @reserved_test - raise ReservationError, "#{@reserved_test.inspect} is already reserved. " \ - "You have to acknowledge it before you can reserve another one" - end - - @reserved_test = (try_to_reserve_lost_test || try_to_reserve_test) + test = (try_to_reserve_lost_test || try_to_reserve_test) + reserved_tests << test + test end def try_to_reserve_test diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 9b6260df..2e8643c5 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -89,14 +89,15 @@ def remaining end def running - @reserved_test ? 1 : 0 + reserved_tests.empty? ? 0 : 1 end def poll - while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && @reserved_test = @queue.shift - yield index.fetch(@reserved_test) + while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && reserved_test = @queue.shift + reserved_tests << reserved_test + yield index.fetch(reserved_test) end - @reserved_test = nil + @reserved_tests.clear end def exhausted? @@ -142,6 +143,10 @@ def should_requeue?(key) def requeues @requeues ||= Hash.new(0) end + + def reserved_tests + @reserved_tests ||= Set.new + end end end end From 5544e5b369b4f5d1d7a978048af302f9a31f4922 Mon Sep 17 00:00:00 2001 From: Chris Salzberg Date: Fri, 23 May 2025 16:01:59 +0900 Subject: [PATCH 4/5] Use reserved_tests reader --- ruby/lib/ci/queue/static.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 2e8643c5..8a44c632 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -97,7 +97,7 @@ def poll reserved_tests << reserved_test yield index.fetch(reserved_test) end - @reserved_tests.clear + reserved_tests.clear end def exhausted? From 222d4e1030bc9a6b9449c7cfcc84a6c1e99d8291 Mon Sep 17 00:00:00 2001 From: Chris Salzberg Date: Mon, 16 Jun 2025 17:22:51 +0900 Subject: [PATCH 5/5] Refactor using delete? --- ruby/lib/ci/queue/redis/worker.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index b03e60ee..aff6bc82 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -177,9 +177,7 @@ def worker_id end def raise_on_mismatching_test(test) - if reserved_tests.include?(test) - reserved_tests.delete(test) - else + unless reserved_tests.delete?(test) raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved" end end