Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ruby/lib/ci/queue/redis/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def record_success(id, stats: nil, skip_flaky_record: false, acknowledge: true)
error_reports_deleted_count, requeued_count, _ = redis.pipelined do |pipeline|
pipeline.hdel(key('error-reports'), id)
pipeline.hget(key('requeues-count'), id)
pipeline.sadd(key('success-reports'), id)
pipeline.expire(key('success-reports'), config.redis_ttl)
record_stats(stats, pipeline: pipeline)
end
record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
Expand Down
12 changes: 10 additions & 2 deletions ruby/lib/ci/queue/redis/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def wait_for_workers

@time_left = config.report_timeout - duration.to_i
@time_left_with_no_workers = config.inactive_workers_timeout
until exhausted? || @time_left <= 0 || max_test_failed? || @time_left_with_no_workers <= 0
until finished? || @time_left <= 0 || max_test_failed? || @time_left_with_no_workers <= 0
@time_left -= 1
sleep 1

Expand All @@ -39,7 +39,7 @@ def wait_for_workers
yield if block_given?
end

exhausted?
finished?
rescue CI::Queue::Redis::LostMaster
false
end
Expand All @@ -48,6 +48,14 @@ def wait_for_workers

private

# Whether the supervisor may stop waiting for workers.
#
# Requires both conditions, checked in this order:
#   1. `exhausted?` is truthy, and
#   2. the master-confirmation flag is set (see
#      `master_confirmed_all_tests_executed?`).
#
# The confirmation lookup is skipped entirely while `exhausted?` is falsy,
# and in that case the falsy value from `exhausted?` is returned as-is.
def finished?
  queue_drained = exhausted?
  return queue_drained unless queue_drained

  master_confirmed_all_tests_executed?
end

# Reads the 'master-confirmed-all-tests-executed' key from Redis and reports
# whether it currently holds the string 'true'.
#
# A missing key (nil) or any other stored value yields false.
# NOTE(review): the flag is presumably written by the master worker once it
# has verified the executed tests -- confirm against the worker's poll code.
def master_confirmed_all_tests_executed?
  store = redis
  confirmation = store.get(key('master-confirmed-all-tests-executed'))
  confirmation == 'true'
end

def active_workers?
# if there are running jobs we assume there are still agents active
redis.zrangebyscore(key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0
Expand Down
21 changes: 19 additions & 2 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def distributed?

# Indexes the given tests by id, shuffles them, and hands the shuffled ids to
# `push`.
#
# The shuffled id list is also kept in @shuffled_tests so that later code in
# this class can compare it against the tests that were actually executed.
#
# The tests are shuffled and pushed exactly once: the pre-change lines
# (`tests = Queue.shuffle(...)` / `push(tests.map(&:id))`) belong to the
# removed side of the diff and must not coexist with the lines below.
#
# @param tests [Enumerable] objects responding to #id
# @param random [Random] randomness source passed through to Queue.shuffle
# @return [self] to allow chaining
def populate(tests, random: Random.new)
  @index = tests.map { |t| [t.id, t] }.to_h
  @shuffled_tests = Queue.shuffle(tests, random).map(&:id)
  push(@shuffled_tests)
  self
end

Expand Down Expand Up @@ -69,6 +69,23 @@ def poll
pipeline.expire(key('worker', worker_id, 'queue'), config.redis_ttl)
pipeline.expire(key('processed'), config.redis_ttl)
end

# check we executed all tests
# only the master should perform this check because otherwise we will DDoS Redis when all workers
# try to fetch the processed tests at the same time
if master? && exhausted?
executed_tests = (redis.smembers(key('success-reports')) + redis.hkeys(key('error-reports')))
missing_tests = @shuffled_tests - executed_tests

if missing_tests.size > 0
puts "ci-queue did not process all tests - #{missing_tests.count} tests not executed!"
puts missing_tests.first(10)
report_worker_error(StandardError.new("ci-queue did not process all tests!"))
end

redis.set(key('master-confirmed-all-tests-executed'), true)
redis.expire(key('master-confirmed-all-tests-executed'), config.redis_ttl)
end
rescue *CONNECTION_ERRORS
end

Expand Down
4 changes: 3 additions & 1 deletion ruby/test/ci/queue/redis_supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ def test_wait_for_workers
workers_done = @supervisor.wait_for_workers
end
thread.wakeup
poll(worker(1))
worker = worker(1)
worker.instance_variable_set(:@master, true)
poll(worker)
thread.join
assert_equal true, workers_done
end
Expand Down
Loading