Skip to content
Closed

DRB #328

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ruby/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: .
specs:
ci-queue (0.66.0)
drb
logger

GEM
Expand Down
2 changes: 1 addition & 1 deletion ruby/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Rake::TestTask.new(:test) do |t|
t.libs << 'lib'
selected_files = ENV["TEST_FILES"].to_s.strip.split(/\s+/)
selected_files = nil if selected_files.empty?
t.test_files = selected_files || FileList['test/**/*_test.rb'] - FileList['test/fixtures/**/*_test.rb']
t.test_files = ["test/integration/minitest_redis_test.rb"]
end

task :default => :test
Expand Down
1 change: 1 addition & 0 deletions ruby/ci-queue.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Gem::Specification.new do |spec|
spec.metadata['allowed_push_host'] = 'https://rubygems.org'

spec.add_runtime_dependency 'logger'
spec.add_runtime_dependency 'drb'

spec.add_development_dependency 'bundler'
spec.add_development_dependency 'rake'
Expand Down
46 changes: 29 additions & 17 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ def master?
@master
end

# Tells callers whether work on this queue should stop.
#
# Evaluates, in order and short-circuiting on the first truthy answer, the
# same four conditions that terminate the #poll loop: a requested shutdown,
# any open circuit breaker, an exhausted queue, or the maximum number of
# failed tests being reached.
#
# Returns the first truthy condition value, or the (falsy) result of
# max_test_failed? when none of the earlier ones hold.
def stop?
  shutdown_required? ||
    config.circuit_breakers.any? { |breaker| breaker.open? } ||
    exhausted? ||
    max_test_failed?
end

# Reserves the next test on behalf of +worker_id+ and looks it up in the index.
#
# worker_id - identifier forwarded unchanged to #reserve.
#
# Returns the result of index.fetch for the reserved test. Returns nil when
# #reserve yields nothing, or when one of CONNECTION_ERRORS is raised while
# reserving or fetching (the error is swallowed, not re-raised).
def pop(worker_id)
  reserved = reserve(worker_id)
  return unless reserved

  index.fetch(reserved)
rescue *CONNECTION_ERRORS
  nil
end

def poll
wait_for_master
until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
Expand Down Expand Up @@ -97,19 +108,19 @@ def report_worker_error(error)
build.report_worker_error(error)
end

def acknowledge(test)
def acknowledge(test, worker_id)
test_key = test.id
raise_on_mismatching_test(test_key)
raise_on_mismatching_test(test_key, worker_id)
eval_script(
:acknowledge,
keys: [key('running'), key('processed'), key('owners')],
argv: [test_key],
) == 1
end

def requeue(test, offset: Redis.requeue_offset)
def requeue(test, worker_id, offset: Redis.requeue_offset)
test_key = test.id
raise_on_mismatching_test(test_key)
raise_on_mismatching_test(test_key, worker_id)
global_max_requeues = config.global_max_requeues(total)

requeued = config.max_requeues > 0 && global_max_requeues > 0 && eval_script(
Expand All @@ -125,7 +136,7 @@ def requeue(test, offset: Redis.requeue_offset)
argv: [config.max_requeues, global_max_requeues, test_key, offset],
) == 1

@reserved_test = test_key unless requeued
@reserved_test[worker_id] = test_key unless requeued
requeued
end

Expand All @@ -138,29 +149,30 @@ def release!
nil
end

private

attr_reader :index

def worker_id
config.worker_id
end
attr_reader :index

private


def raise_on_mismatching_test(test)
if @reserved_test == test
@reserved_test = nil
def raise_on_mismatching_test(test, worker_id)
if @reserved_test[worker_id] == test
@reserved_test[worker_id] = nil
else
raise ReservationError, "Acknowledged #{test.inspect} but #{@reserved_test.inspect} was reserved"
raise ReservationError, "Acknowledged #{test.inspect} but #{@reserved_test[worker_id].inspect} was reserved"
end
end

def reserve
if @reserved_test
raise ReservationError, "#{@reserved_test.inspect} is already reserved. " \
def reserve(worker_id)
@reserved_test ||= {}
if @reserved_test[worker_id]
raise ReservationError, "#{@reserved_test[worker_id].inspect} is already reserved. " \
"You have to acknowledge it before you can reserve another one"
end

@reserved_test = (try_to_reserve_lost_test || try_to_reserve_test)
@reserved_test[worker_id] = (try_to_reserve_lost_test || try_to_reserve_test)
end

def try_to_reserve_test
Expand Down
123 changes: 95 additions & 28 deletions ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'shellwords'
require 'minitest'
require 'minitest/reporters'
require 'drb/drb'

require 'minitest/queue/failure_formatter'
require 'minitest/queue/error_report'
Expand Down Expand Up @@ -225,41 +226,107 @@ def __run(*args)
end
end

def run_from_queue(reporter, *)
queue.poll do |example|
result = queue.with_heartbeat(example.id) do
example.run
end
# Address on which run_from_queue starts the DRb service and to which the
# forked test processes connect.
# NOTE(review): the constant name URI collides with Ruby's standard-library
# URI module. In whatever scope this is defined, a later `URI.parse`-style
# call would resolve to this String instead. Consider renaming it (e.g.
# DRB_SERVER_URI) together with its users in run_from_queue.
URI = "druby://localhost:8780"
# NOTE(review): not referenced anywhere in the visible code; confirm it is
# still needed.
URI_2 = "druby://localhost:8787"

failed = !(result.passed? || result.skipped?)
class TestServer

if example.flaky?
result.mark_as_flaked!
failed = false
end
# Builds the front object that run_from_queue serves over DRb.
#
# queue    - the queue the forked test processes pull work from and report to.
# reporter - receives each recorded result via #record.
#
# A fresh Mutex is created per instance; #record takes it around its whole
# body.
def initialize(queue, reporter)
  @queue, @reporter = queue, reporter
  @mutex = Mutex.new
end

attr_accessor :queue, :reporter, :mutex

# Delegates to queue.wait_for_master.
# Each forked process in run_from_queue calls this through its DRb proxy
# before it starts reserving tests.
def wait_for_master
queue.wait_for_master
end

# Asks the queue for the next test on behalf of +worker_id+.
#
# worker_id - identifier of the calling process; run_from_queue passes the
#             fork index.
#
# Returns whatever queue.pop(worker_id) returns. Callers treat a falsy
# result as "nothing available right now" and retry after a short sleep.
def reserve(worker_id)
queue.pop(worker_id)
end

# Delegates to queue.stop?.
# The forked processes in run_from_queue loop until this returns a truthy
# value.
def stop?
queue.stop?
end

if failed && queue.config.failing_test && queue.config.failing_test != example.id
# When we do a bisect, we don't care about the result other than the test we're running the bisect on
result.mark_as_flaked!
failed = false
elsif failed
queue.report_failure!
else
queue.report_success!
def record(worker_id, example, result)
mutex.synchronize do
failed = !(result.passed? || result.skipped?)

if example.flaky?
result.mark_as_flaked!
failed = false
end

if failed && queue.config.failing_test && queue.config.failing_test != example.id
# When we do a bisect, we don't care about the result other than the test we're running the bisect on
result.mark_as_flaked!
failed = false
elsif failed
queue.report_failure!
else
queue.report_success!
end

if failed && CI::Queue.requeueable?(result) && queue.requeue(example, worker_id)
result.requeue!
reporter.record(result)
elsif queue.acknowledge(example, worker_id)
reporter.record(result)
queue.increment_test_failed if failed
elsif !failed
# If the test was already acknowledged by another worker (we timed out)
# Then we only record it if it is successful.
reporter.record(result)
end
end

if failed && CI::Queue.requeueable?(result) && queue.requeue(example)
result.requeue!
reporter.record(result)
elsif queue.acknowledge(example)
reporter.record(result)
queue.increment_test_failed if failed
elsif !failed
# If the test was already acknowledged by another worker (we timed out)
# Then we only record it if it is successful.
reporter.record(result)
end
end

def run_from_queue(reporter, *)
# The object that handles requests on the server
server = TestServer.new(queue, reporter)

DRb.start_service(URI, server)

ENV.fetch("PARALLEL_WORKERS", 2).to_i.times do |i|
fork do
puts "Forking #{i}"
DRb.start_service
server = DRbObject.new_with_uri(URI)
server.wait_for_master
until server.stop?
if test = server.reserve(i)
puts "Process #{i} running: #{test.id}"
result = test.run
puts "Process #{i} result: #{result}"
server.record(i, test, result)
else
sleep 0.05
end
end

puts "Done"
end
end

until queue.stop?
print "."
sleep 0.05
end
puts "Stopping DRb"
DRb.stop_service

puts "Waiting for processes"
Process.waitall
# Wait for the drb server thread to finish before exiting.

puts "Workers finished"

queue.stop_heartbeat!
rescue Errno::EPIPE
# This happens when the heartbeat process dies
Expand Down
Loading
Loading