Skip to content
Closed
5 changes: 5 additions & 0 deletions ruby/lib/ci/queue/redis/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def record_success(id, stats: nil, skip_flaky_record: false)
error_reports_deleted_count, requeued_count, _ = redis.pipelined do |pipeline|
pipeline.hdel(key('error-reports'), id.dup.force_encoding(Encoding::BINARY))
pipeline.hget(key('requeues-count'), id.b)
pipeline.sadd(
key('success-reports'),
id.dup.force_encoding(Encoding::BINARY),
)
pipeline.expire(key('success-reports'), config.redis_ttl)
record_stats(stats, pipeline: pipeline)
end
record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
Expand Down
28 changes: 27 additions & 1 deletion ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ module Queue
module Redis
# Module-level settings, exposed as accessors on the module's singleton so callers can override them.
class << self
# NOTE(review): requeue_offset is not used anywhere in this view — confirm its meaning against the requeue logic.
attr_accessor :requeue_offset
# Upper bound, in seconds, on the sleep between empty `reserve` attempts in Worker#poll.
attr_accessor :max_sleep_time
end
# Defaults, assigned when the module body is evaluated.
self.requeue_offset = 42
self.max_sleep_time = 2

class Worker < Base
attr_reader :total
Expand Down Expand Up @@ -46,15 +48,39 @@ def master?
@master
end

# Base delay, in seconds, for the backoff in `poll`: the delay doubles after each consecutive
# empty `reserve` and is capped at Redis.max_sleep_time.
DEFAULT_SLEEP_SECONDS = 0.5

def poll
wait_for_master
attempt = 0
until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
if test = reserve
attempt = 0
yield index.fetch(test)
else
sleep 0.05
# Back off exponentially to avoid hammering Redis.
# We only stay online here in case a test gets retried or times out, so we can afford to wait.
sleep_time = [DEFAULT_SLEEP_SECONDS * (2 ** attempt), Redis.max_sleep_time].min
attempt += 1
sleep sleep_time
end
end

# Check that all tests were executed.
# Only the master should perform this check; otherwise we would DDoS Redis when all workers
# try to fetch the processed tests at the same time.
if master? && exhausted?
executed_tests = (redis.smembers(key('success-reports')) + redis.hkeys(key('error-reports')))
executed_tests = executed_tests.map { |id| id.force_encoding(Encoding::BINARY) }
ids = @index.keys.map { |id| id.dup.force_encoding(Encoding::BINARY) }
missing_tests = ids - executed_tests

if missing_tests.size > 0
puts "ci-queue did not process all tests!"
puts missing_tests
end
end

redis.pipelined do |pipeline|
pipeline.expire(key('worker', worker_id, 'queue'), config.redis_ttl)
pipeline.expire(key('processed'), config.redis_ttl)
Expand Down
16 changes: 15 additions & 1 deletion ruby/lib/minitest/queue/build_status_reporter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,16 @@ def requeued_tests
build.requeued_tests
end

# Exit codes returned by `report`, so callers can tell why a run did not succeed.
# Set when a worker crash error is printed ("Worker ... crashed").
APPLICATION_ERROR_EXIT_CODE = 42
# Set in the `report` branch that lists the remaining tests (branch condition not visible in this view).
TIMED_OUT_EXIT_CODE = 43
# Set when supervisor.max_test_failed? is true and the run was ended early.
TOO_MANY_FAILED_TESTS_EXIT_CODE = 44
# Set when supervisor.time_left_with_no_workers is <= 0 ("All workers died.").
WORKERS_DIED_EXIT_CODE = 45
# Returned whenever success? is true, regardless of the codes above.
SUCCESS_EXIT_CODE = 0
# Initial value of exit_code in `report`; returned when success? is false and nothing more specific was set.
TEST_FAILURE_EXIT_CODE = 1

def report
exit_code = TEST_FAILURE_EXIT_CODE

if requeued_tests.to_a.any?
step("Requeued #{requeued_tests.size} tests")
requeued_tests.to_a.sort.each do |test_id, count|
Expand All @@ -131,10 +140,14 @@ def report
if remaining_tests.size > 10
puts " ..."
end

exit_code = TIMED_OUT_EXIT_CODE
elsif supervisor.time_left_with_no_workers.to_i <= 0
puts red("All workers died.")
exit_code = WORKERS_DIED_EXIT_CODE
elsif supervisor.max_test_failed?
puts red("Encountered too many failed tests. Test run was ended early.")
exit_code = TOO_MANY_FAILED_TESTS_EXIT_CODE
end

puts
Expand All @@ -146,9 +159,10 @@ def report
puts red("Worker #{worker_id } crashed")
puts error
puts ""
exit_code = APPLICATION_ERROR_EXIT_CODE
end

success?
success? ? SUCCESS_EXIT_CODE : exit_code
end

def success?
Expand Down
11 changes: 5 additions & 6 deletions ruby/lib/minitest/queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -253,25 +253,24 @@ def report_command

unless supervisor.wait_for_workers { display_warnings(supervisor.build) }
unless supervisor.queue_initialized?
abort! "No master was elected. Did all workers crash?", 40
abort! "No leader was elected. This typically means no worker was able to start. Were there any errors during application boot?", 40
end

unless supervisor.exhausted?
reporter = BuildStatusReporter.new(supervisor: supervisor)
reporter.report
exit_code = reporter.report
reporter.write_failure_file(queue_config.failure_file) if queue_config.failure_file
reporter.write_flaky_tests_file(queue_config.export_flaky_tests_file) if queue_config.export_flaky_tests_file

abort!("#{supervisor.size} tests weren't run.")
abort!("#{supervisor.size} tests weren't run.", exit_code)
end
end

reporter = BuildStatusReporter.new(supervisor: supervisor)
reporter.write_failure_file(queue_config.failure_file) if queue_config.failure_file
reporter.write_flaky_tests_file(queue_config.export_flaky_tests_file) if queue_config.export_flaky_tests_file
reporter.report

exit! reporter.success? ? 0 : 1
exit_code = reporter.report
exit! exit_code
end

def report_grind_command
Expand Down
2 changes: 1 addition & 1 deletion ruby/lib/rspec/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def call(options, stdout, stderr)

unless supervisor.wait_for_workers
unless supervisor.queue_initialized?
abort! "No master was elected. Did all workers crash?"
abort! "No leader was elected. This typically means no worker was able to start. Were there any errors during application boot?"
end

unless supervisor.exhausted?
Expand Down
3 changes: 3 additions & 0 deletions ruby/test/fixtures/test/lost_test.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# frozen_string_literal: true
require 'test_helper'

# Cap the worker poll backoff at 0.05s for this fixture (the library default is 2).
# NOTE(review): presumably so an idle worker re-polls quickly while test_foo sleeps — confirm.
CI::Queue::Redis.max_sleep_time = 0.05

class LostTest < Minitest::Test

# Sleeps for 3 seconds and makes no assertions.
# NOTE(review): presumably long enough for the queue to consider this test lost — confirm
# against the integration test that uses this fixture.
def test_foo
sleep 3
end
Expand Down
6 changes: 4 additions & 2 deletions ruby/test/integration/minitest_redis_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ def test_max_test_failed
end

refute_predicate $?, :success?
assert_equal 44, $?.exitstatus
assert_empty err
expected = <<~EXPECTED
Waiting for workers to complete
Expand Down Expand Up @@ -264,10 +265,11 @@ def test_all_workers_died
end

refute_predicate $?, :success?
assert_equal 40, $?.exitstatus
assert_empty err
expected = <<~EXPECTED
Waiting for workers to complete
No master was elected. Did all workers crash?
No leader was elected. This typically means no worker was able to start. Were there any errors during application boot?
EXPECTED
assert_equal expected.strip, normalize(out.lines[0..2].join.strip)
end
Expand Down Expand Up @@ -1018,7 +1020,7 @@ def test_application_error
assert_includes out, "Worker 1 crashed"
assert_includes out, "Some error in the test framework"

assert_equal 1, $?.exitstatus
assert_equal 42, $?.exitstatus
end

private
Expand Down
Loading