From 94c48936ce0595b5054db1d71d2dd411e45e1392 Mon Sep 17 00:00:00 2001 From: Christian Bruckmayer Date: Tue, 17 Jun 2025 21:54:36 +0100 Subject: [PATCH] Check we executed all tests --- ruby/lib/ci/queue/redis/build_record.rb | 2 ++ ruby/lib/ci/queue/redis/supervisor.rb | 12 ++++++++++-- ruby/lib/ci/queue/redis/worker.rb | 21 +++++++++++++++++++-- ruby/test/ci/queue/redis_supervisor_test.rb | 4 +++- 4 files changed, 34 insertions(+), 5 deletions(-) diff --git a/ruby/lib/ci/queue/redis/build_record.rb b/ruby/lib/ci/queue/redis/build_record.rb index 1f0fac38..23726922 100644 --- a/ruby/lib/ci/queue/redis/build_record.rb +++ b/ruby/lib/ci/queue/redis/build_record.rb @@ -72,6 +72,8 @@ def record_success(id, stats: nil, skip_flaky_record: false, acknowledge: true) error_reports_deleted_count, requeued_count, _ = redis.pipelined do |pipeline| pipeline.hdel(key('error-reports'), id) pipeline.hget(key('requeues-count'), id) + pipeline.sadd(key('success-reports'), id) + pipeline.expire(key('success-reports'), config.redis_ttl) record_stats(stats, pipeline: pipeline) end record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0) diff --git a/ruby/lib/ci/queue/redis/supervisor.rb b/ruby/lib/ci/queue/redis/supervisor.rb index 0fcc47d5..d184dedd 100644 --- a/ruby/lib/ci/queue/redis/supervisor.rb +++ b/ruby/lib/ci/queue/redis/supervisor.rb @@ -26,7 +26,7 @@ def wait_for_workers @time_left = config.report_timeout - duration.to_i @time_left_with_no_workers = config.inactive_workers_timeout - until exhausted? || @time_left <= 0 || max_test_failed? || @time_left_with_no_workers <= 0 + until finished? || @time_left <= 0 || max_test_failed? || @time_left_with_no_workers <= 0 @time_left -= 1 sleep 1 @@ -39,7 +39,7 @@ def wait_for_workers yield if block_given? end - exhausted? + finished? rescue CI::Queue::Redis::LostMaster false end @@ -48,6 +48,14 @@ def wait_for_workers private + def finished? + exhausted? && master_confirmed_all_tests_executed? + end + + def master_confirmed_all_tests_executed? + redis.get(key('master-confirmed-all-tests-executed')) == 'true' + end + def active_workers? # if there are running jobs we assume there are still agents active redis.zrangebyscore(key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0 diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index db87e566..9d0ea699 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -27,8 +27,8 @@ def distributed? def populate(tests, random: Random.new) @index = tests.map { |t| [t.id, t] }.to_h - tests = Queue.shuffle(tests, random) - push(tests.map(&:id)) + @shuffled_tests = Queue.shuffle(tests, random).map(&:id) + push(@shuffled_tests) self end @@ -69,6 +69,23 @@ def poll pipeline.expire(key('worker', worker_id, 'queue'), config.redis_ttl) pipeline.expire(key('processed'), config.redis_ttl) end + + # check we executed all tests + # only the master should perform this check because otherwise we will DDoS Redis when all workers + # try to fetch the processed tests at the same time + if master? && exhausted? + executed_tests = (redis.smembers(key('success-reports')) + redis.hkeys(key('error-reports'))) + missing_tests = @shuffled_tests - executed_tests + + if missing_tests.size > 0 + puts "ci-queue did not process all tests - #{missing_tests.count} tests not executed!" + puts missing_tests.first(10) + report_worker_error(StandardError.new("ci-queue did not process all tests!")) + end + + redis.set(key('master-confirmed-all-tests-executed'), true) + redis.expire(key('master-confirmed-all-tests-executed'), config.redis_ttl) + end rescue *CONNECTION_ERRORS end diff --git a/ruby/test/ci/queue/redis_supervisor_test.rb b/ruby/test/ci/queue/redis_supervisor_test.rb index f46baa31..7b64ce2b 100644 --- a/ruby/test/ci/queue/redis_supervisor_test.rb +++ b/ruby/test/ci/queue/redis_supervisor_test.rb @@ -34,7 +34,9 @@ def test_wait_for_workers workers_done = @supervisor.wait_for_workers end thread.wakeup - poll(worker(1)) + worker = worker(1) + worker.instance_variable_set(:@master, true) + poll(worker) thread.join assert_equal true, workers_done end