From 9d5627a0477649bfa404b265cc9117103cc75df8 Mon Sep 17 00:00:00 2001 From: Grzesiek Kolodziejczyk Date: Tue, 28 Jan 2025 18:56:28 +0100 Subject: [PATCH 1/5] Add option to distribute jobs by example instead of by file The purpose of this change is to allow more optimal distribution when the suite contains outlier files. With enough cores on a CI run we noticed that the lower bound of the duration of the full run was the runtime of the slowest file. In order to be able to split by example, ExampleJobBuilder needs to load all spec files and extract examples from them. As a side effect, this unlocks more filtering options (such as by tag) which previously would silently not work as expected. --- features/rspec.feature | 43 ++++++ lib/flatware/rspec.rb | 8 +- lib/flatware/rspec/cli.rb | 21 ++- lib/flatware/rspec/example_job_builder.rb | 123 ++++++++++++++++++ .../{job_builder.rb => file_job_builder.rb} | 2 +- .../rspec/example_job_builder_spec.rb | 103 +++++++++++++++ ...ilder_spec.rb => file_job_builder_spec.rb} | 4 +- 7 files changed, 292 insertions(+), 12 deletions(-) create mode 100644 lib/flatware/rspec/example_job_builder.rb rename lib/flatware/rspec/{job_builder.rb => file_job_builder.rb} (99%) create mode 100644 spec/flatware/rspec/example_job_builder_spec.rb rename spec/flatware/rspec/{job_builder_spec.rb => file_job_builder_spec.rb} (95%) diff --git a/features/rspec.feature b/features/rspec.feature index 8b584ac..888b48d 100644 --- a/features/rspec.feature +++ b/features/rspec.feature @@ -107,3 +107,46 @@ Feature: rspec task """ 0 examples, 0 failures, 1 error occurred outside of examples """ + + @non-zero + Scenario: example job builder + Given spec "a" contains: + """ + describe "fail" do + it { expect(true).to eq false } + end + """ + And spec "b" contains: + """ + describe "pass" do + it { expect(true).to eq true } + end + """ + When I run flatware with "rspec -l --job-builder=ExampleJobBuilder" + Then the output contains the following: + """ + Run options: include {:ids=>{"./spec/a_spec.rb"=>["1:1"]}} + """ + And the output contains the following: + """ + Run options: include {:ids=>{"./spec/b_spec.rb"=>["1:1"]}} + """ + And the output contains the following: + """ + 2 examples, 1 failure + """ + + @non-zero + Scenario: failure outside of examples with example job builder + Given the following spec: + """ + throw :a_fit + describe 'fits' do + it('already threw one') + end + """ + When I run flatware with "rspec --job-builder=ExampleJobBuilder" + Then the output contains the following line: + """ + uncaught throw :a_fit + """ diff --git a/lib/flatware/rspec.rb b/lib/flatware/rspec.rb index 10ab6ff..0bac73e 100644 --- a/lib/flatware/rspec.rb +++ b/lib/flatware/rspec.rb @@ -7,12 +7,14 @@ module Flatware module RSpec require 'flatware/rspec/formatter' - require 'flatware/rspec/job_builder' + require 'flatware/rspec/file_job_builder' + require 'flatware/rspec/example_job_builder' module_function - def extract_jobs_from_args(args, workers:) - JobBuilder.new(args, workers: workers).jobs + def extract_jobs_from_args(args, workers:, job_builder:) + builder = const_get(job_builder) + builder.new(args, workers: workers).jobs end def runner diff --git a/lib/flatware/rspec/cli.rb b/lib/flatware/rspec/cli.rb index c0417f7..d019cb5 100644 --- a/lib/flatware/rspec/cli.rb +++ b/lib/flatware/rspec/cli.rb @@ -13,18 +13,27 @@ class CLI type: :string, default: 'drbunix:flatware-sink' ) + method_option( + 'job-builder', + type: :string, + default: 'FileJobBuilder' + ) desc 'rspec [FLATWARE_OPTS]', 'parallelizes rspec' def rspec(*rspec_args) - jobs = RSpec.extract_jobs_from_args rspec_args, workers: workers - - formatter = Flatware::RSpec::Formatters::Console.new( - ::RSpec.configuration.output_stream, - deprecation_stream: ::RSpec.configuration.deprecation_stream - ) + jobs = RSpec.extract_jobs_from_args rspec_args, workers: workers, job_builder: options['job-builder'] Flatware.verbose = options[:log] Worker.spawn count: workers, runner: RSpec, sink: options['sink-endpoint'] start_sink(jobs: jobs, workers: workers, formatter: formatter) end + + private + + def formatter + Flatware::RSpec::Formatters::Console.new( + ::RSpec.configuration.output_stream, + deprecation_stream: ::RSpec.configuration.deprecation_stream + ) + end end end diff --git a/lib/flatware/rspec/example_job_builder.rb b/lib/flatware/rspec/example_job_builder.rb new file mode 100644 index 0000000..f76f77b --- /dev/null +++ b/lib/flatware/rspec/example_job_builder.rb @@ -0,0 +1,123 @@ +# frozen_string_literal: true + +require 'rspec/core/sandbox' + +module Flatware + module RSpec + # groups examples into one job per worker. + # reads from persisted example statuses, if available, + # and attempts to ballence the jobs accordingly. + class ExampleJobBuilder + attr_reader :args, :workers + + def initialize(args, workers:) + @args = args + @workers = workers + + load_configuration_and_examples + end + + def jobs + timed_examples, untimed_examples = timed_and_untimed_examples + bucket_count = [@examples_to_run.size, workers].min + + balance_jobs( + bucket_count: bucket_count, + timed_examples: timed_examples, + untimed_examples: untimed_examples + ) + end + + private + + def balance_jobs(bucket_count:, timed_examples:, untimed_examples:) + buckets = Array.new(bucket_count) { Bucket.new } + + timed_examples.sort_by!(&:last).reverse_each do |(example_id, time)| + buckets.min_by(&:runtime).add_example(example_id, time) + end + + untimed_examples.each_with_index do |example_id, index| + buckets[index % bucket_count].add_example(example_id, 0) + end + + buckets.map { |bucket| Job.new(bucket.examples, args) } + end + + def timed_and_untimed_examples + @examples_to_run.reduce([[], []]) do |(timed, untimed), example_id| + if (time = example_runtimes[example_id]) + [timed + [[example_id, time]], untimed] + else + [timed, untimed + [example_id]] + end + end + end + + def load_persisted_example_statuses + ::RSpec::Core::ExampleStatusPersister.load_from( + @example_status_persistence_file_path || '' + ) + end + + def example_runtimes + @example_runtimes ||= load_persisted_example_statuses.each_with_object({}) do |status_entry, runtimes| + next unless status_entry.fetch(:status) =~ /pass/i + + runtimes[status_entry[:example_id]] = status_entry[:run_time].to_f + end + end + + def load_configuration_and_examples + configuration = ::RSpec.configuration + configuration.define_singleton_method(:command) { 'rspec' } + + ::RSpec::Core::ConfigurationOptions.new(args).configure(configuration) + + @example_status_persistence_file_path = configuration.example_status_persistence_file_path + + # Load spec files in a fork to avoid polluting the parent process, + # otherwise the actual execution will return warnings for redefining constants + # and shared example groups. + @examples_to_run = within_forked_process { load_examples_to_run(configuration) } + end + + def within_forked_process + reader, writer = IO.pipe(binmode: true) + + fork do + reader.close + $stdout = File.new(File::NULL, 'w') + + writer.write Marshal.dump(yield) + end + + writer.close + Marshal.load(reader.gets) # rubocop:disable Security/MarshalLoad + end + + def load_examples_to_run(configuration) + configuration.load_spec_files + + # If there's an error loading spec files, exit immediately. + exit(configuration.failure_exit_code) if ::RSpec.world.wants_to_quit + + ::RSpec.world.ordered_example_groups.flat_map(&:descendants).flat_map(&:filtered_examples).map(&:id) + end + + class Bucket + attr_reader :examples, :runtime + + def initialize + @examples = [] + @runtime = 0 + end + + def add_example(example_id, runtime) + @examples << example_id + @runtime += runtime + end + end + end + end +end diff --git a/lib/flatware/rspec/job_builder.rb b/lib/flatware/rspec/file_job_builder.rb similarity index 99% rename from lib/flatware/rspec/job_builder.rb rename to lib/flatware/rspec/file_job_builder.rb index 2691aef..e59e222 100644 --- a/lib/flatware/rspec/job_builder.rb +++ b/lib/flatware/rspec/file_job_builder.rb @@ -7,7 +7,7 @@ module RSpec # groups spec files into one job per worker. # reads from persisted example statuses, if available, # and attempts to ballence the jobs accordingly. - class JobBuilder + class FileJobBuilder extend Forwardable attr_reader :args, :workers, :configuration diff --git a/spec/flatware/rspec/example_job_builder_spec.rb b/spec/flatware/rspec/example_job_builder_spec.rb new file mode 100644 index 0000000..7e5b507 --- /dev/null +++ b/spec/flatware/rspec/example_job_builder_spec.rb @@ -0,0 +1,103 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'flatware/rspec/example_job_builder' + +describe Flatware::RSpec::ExampleJobBuilder do + before do + allow(RSpec::Core::ExampleStatusPersister).to( + receive(:load_from).and_return(persisted_examples) + ) + + allow_any_instance_of(RSpec::Core::World).to( + receive(:ordered_example_groups).and_return(ordered_example_groups) + ) + end + + let(:persisted_examples) { [] } + let(:examples_to_run) { [] } + let(:ordered_example_groups) do + examples_to_run + .group_by { |example_id| example_id.split('[').first } + .map do |_file_name, example_ids| + double(descendants: [double(filtered_examples: example_ids.map { |id| double(id: id) })]) + end + end + + subject do + described_class.new([], workers: 2).jobs + end + + context 'when this run includes persisted examples' do + let(:persisted_examples) do + [ + { example_id: './fast_1_spec.rb[1]', run_time: '1 second' }, + { example_id: './fast_2_spec.rb[1]', run_time: '1 second' }, + { example_id: './fast_3_spec.rb[1]', run_time: '1 second' }, + { example_id: './slow_spec.rb[1]', run_time: '2 seconds' } + ].map { |example| example.merge status: 'passed' } + end + + let(:examples_to_run) { %w(./fast_1_spec.rb[1] ./fast_2_spec.rb[1] ./slow_spec.rb[1]) } + + it 'groups them into equal time blocks' do + expect(subject).to match_array( + [ + have_attributes( + id: match_array(%w[./fast_1_spec.rb[1] ./fast_2_spec.rb[1]]) + ), + have_attributes(id: match_array(%w[./slow_spec.rb[1]])) + ] + ) + end + + context 'and this run includes examples that are not persisted' do + let(:examples_to_run) do + %w[ + ./fast_1_spec.rb[1] + ./fast_2_spec.rb[1] + ./slow_spec.rb[1] + ./new_1_spec.rb[1] + ./new_2_spec.rb[1] + ./new_3_spec.rb[1] + ] + end + + it 'assigns the remaining files round-robin' do + expect(subject).to match_array( + [ + have_attributes(id: include('./new_1_spec.rb[1]', './new_3_spec.rb[1]')), + have_attributes(id: include('./new_2_spec.rb[1]')) + ] + ) + end + end + + context 'and an example from one file takes longer than all other examples' do + let(:persisted_examples) do + [ + { example_id: './spec_1.rb[1]', run_time: '10 seconds' }, + { example_id: './spec_1.rb[2]', run_time: '1 second' }, + { example_id: './spec_1.rb[3]', run_time: '1 second' }, + { example_id: './spec_2.rb[1]', run_time: '1 second' }, + { example_id: './spec_2.rb[2]', run_time: '1 second' }, + { example_id: './spec_2.rb[3]', run_time: '1 second' } + ].map { |example| example.merge status: 'passed' } + end + + let(:examples_to_run) do + %w(./spec_1.rb[1] ./spec_1.rb[2] ./spec_1.rb[3] ./spec_2.rb[1] ./spec_2.rb[2] ./spec_2.rb[3]) + end + + it 'assigns that example as sole in one job' do + expect(subject).to match_array( + [ + have_attributes(id: ['./spec_1.rb[1]']), + have_attributes(id: match_array(%w[./spec_1.rb[2] ./spec_1.rb[3] ./spec_2.rb[1] ./spec_2.rb[2] + ./spec_2.rb[3]])) + ] + ) + end + end + end +end diff --git a/spec/flatware/rspec/job_builder_spec.rb b/spec/flatware/rspec/file_job_builder_spec.rb similarity index 95% rename from spec/flatware/rspec/job_builder_spec.rb rename to spec/flatware/rspec/file_job_builder_spec.rb index 6e35d14..d4eea0d 100644 --- a/spec/flatware/rspec/job_builder_spec.rb +++ b/spec/flatware/rspec/file_job_builder_spec.rb @@ -1,9 +1,9 @@ # frozen_string_literal: true require 'spec_helper' -require 'flatware/rspec/job_builder' +require 'flatware/rspec/file_job_builder' -describe Flatware::RSpec::JobBuilder do +describe Flatware::RSpec::FileJobBuilder do before do allow(RSpec::Core::ExampleStatusPersister).to( receive(:load_from).and_return(persisted_examples) From e2bee927831f2c54b711080d967e595eb39dc024 Mon Sep 17 00:00:00 2001 From: Grzesiek Kolodziejczyk Date: Thu, 13 Feb 2025 12:12:08 +0100 Subject: [PATCH 2/5] Fix sub-optimal distribution of untimed examples --- lib/flatware/rspec/example_job_builder.rb | 30 ++++++++++++----------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/lib/flatware/rspec/example_job_builder.rb b/lib/flatware/rspec/example_job_builder.rb index f76f77b..2e97732 100644 --- a/lib/flatware/rspec/example_job_builder.rb +++ b/lib/flatware/rspec/example_job_builder.rb @@ -19,10 +19,10 @@ def initialize(args, workers:) def jobs timed_examples, untimed_examples = timed_and_untimed_examples - bucket_count = [@examples_to_run.size, workers].min + buckets = Array.new([@examples_to_run.size, workers].min) { Bucket.new } balance_jobs( - bucket_count: bucket_count, + buckets: buckets, timed_examples: timed_examples, untimed_examples: untimed_examples ) @@ -30,34 +30,36 @@ def jobs private - def balance_jobs(bucket_count:, timed_examples:, untimed_examples:) - buckets = Array.new(bucket_count) { Bucket.new } - - timed_examples.sort_by!(&:last).reverse_each do |(example_id, time)| + def balance_jobs(buckets:, timed_examples:, untimed_examples:) + timed_examples.each do |(example_id, time)| buckets.min_by(&:runtime).add_example(example_id, time) end untimed_examples.each_with_index do |example_id, index| - buckets[index % bucket_count].add_example(example_id, 0) + offset = (timed_examples.size + index) % buckets.size + buckets[offset].add_example(example_id) end buckets.map { |bucket| Job.new(bucket.examples, args) } end def timed_and_untimed_examples - @examples_to_run.reduce([[], []]) do |(timed, untimed), example_id| + timed_examples = [] + untimed_examples = [] + + @examples_to_run.each do |example_id| if (time = example_runtimes[example_id]) - [timed + [[example_id, time]], untimed] + timed_examples << [example_id, time] else - [timed, untimed + [example_id]] + untimed_examples << example_id end end + + [timed_examples.sort_by! { |(_id, time)| -time }, untimed_examples] end def load_persisted_example_statuses - ::RSpec::Core::ExampleStatusPersister.load_from( - @example_status_persistence_file_path || '' - ) + ::RSpec::Core::ExampleStatusPersister.load_from(@example_status_persistence_file_path || '') end def example_runtimes @@ -113,7 +115,7 @@ def initialize @runtime = 0 end - def add_example(example_id, runtime) + def add_example(example_id, runtime = 0) @examples << example_id @runtime += runtime end From 4821967b92f6d6d22227db8308d65847117d4506 Mon Sep 17 00:00:00 2001 From: Grzesiek Kolodziejczyk Date: Wed, 9 Apr 2025 15:19:42 +0200 Subject: [PATCH 3/5] Make sure to read all IPC before de-marshaling --- lib/flatware/rspec/example_job_builder.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/flatware/rspec/example_job_builder.rb b/lib/flatware/rspec/example_job_builder.rb index 2e97732..55f98cf 100644 --- a/lib/flatware/rspec/example_job_builder.rb +++ b/lib/flatware/rspec/example_job_builder.rb @@ -95,7 +95,7 @@ def within_forked_process end writer.close - Marshal.load(reader.gets) # rubocop:disable Security/MarshalLoad + Marshal.load(reader.gets(nil)) # rubocop:disable Security/MarshalLoad end def load_examples_to_run(configuration) From f053906eca01f476e2fcd2be4b40316678f7d630 Mon Sep 17 00:00:00 2001 From: Navid EMAD Date: Fri, 19 Sep 2025 14:18:29 +0200 Subject: [PATCH 4/5] Refactor to use drb --- CLAUDE.md | 215 ++++++++++++++++++++++ lib/flatware/rspec/example_job_builder.rb | 94 +++++++++- 2 files changed, 300 insertions(+), 9 deletions(-) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..c0195d9 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,215 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview +Flatware is a test parallelization tool for RSpec and Cucumber that significantly reduces test execution time by distributing tests across multiple worker processes using DRb (distributed Ruby) for inter-process communication. + +## Architecture Deep Dive + +### Core Components and Their Interactions + +#### 1. Worker Process Management (`lib/flatware/worker.rb`) +- **Spawning**: Workers are created via `fork()` with unique IDs and `TEST_ENV_NUMBER` env vars +- **Communication**: Uses DRb over Unix sockets (`drbunix:flatware-sink`) +- **Retry Logic**: 10 retries with exponential backoff for DRb connections +- **Lifecycle**: Pull-based model - workers request jobs from sink until receiving sentinel job +- **Process Naming**: Sets `$0 = "flatware worker #{i}"` for easy identification + +#### 2. Sink Server (`lib/flatware/sink.rb`) +Central coordinator implementing pull-based work distribution: +- **Job Queue**: Maintains queue of jobs, distributes on worker request +- **Worker Tracking**: Uses Set to track active workers +- **Completion Detection**: Auto-stops when no workers remain and queue empty +- **Signal Handling**: Graceful shutdown on INT/CLD signals +- **Job Grouping**: Groups jobs by worker count using modulo for load balancing + +#### 3. Message Protocol +DRb RPC methods between workers and sink: +- `ready(worker_id)` - Worker requests work +- `started(job)` - Worker reports job start +- `finished(job)` - Worker reports job completion +- `checkpoint(data)` - Send test results/progress +- `progress(message)` - Real-time test output + +### Job Distribution Strategies + +#### FileJobBuilder (`lib/flatware/rspec/file_job_builder.rb`) +Default strategy - distributes spec files: +- Uses RSpec's persisted example status for time-based balancing +- Greedy algorithm minimizes total runtime variance across workers +- Untimed files distributed round-robin + +#### ExampleJobBuilder (`lib/flatware/rspec/example_job_builder.rb`) +Fine-grained strategy - distributes individual examples: +- Loads RSpec config in forked process to avoid pollution +- Prioritizes longer-running examples first +- Uses `within_forked_process` pattern for safe configuration loading + +### RSpec Integration Details + +#### Marshalable Object System (`lib/flatware/rspec/marshalable/`) +Solves the problem of RSpec objects containing unmarshalable references: +- `Example` - Strips RSpec examples to essential attributes +- `ExecutionResult` - Serializes test results with custom exception handling +- `SummaryNotification` - Aggregatable summary data across workers +- Uses dynamic method generation for event handling + +#### Exception Serialization (`lib/flatware/serialized_exception.rb`) +Handles unmarshalable exceptions by extracting: +- Class name as string (not class object) +- Message and backtrace +- Recursively serializes cause chain + +#### Checkpoint System (`lib/flatware/rspec/checkpoint.rb`) +- Aggregates test results from multiple workers +- Implements `+` operator for combining checkpoints +- Dynamically generates event handlers based on RSpec events + +### Process and Signal Management + +#### PID Tracking (`lib/flatware/pid.rb`) +- Cross-platform PS parsing (Darwin vs Linux) +- Process group tracking via pgid +- Provides cleanup utilities for all flatware processes + +#### Signal Handling (`lib/flatware/sink/signal.rb`) +- **INT**: Triggers graceful shutdown +- **CLD**: Detects worker failures +- Workers finish current jobs before exiting + +### Error Handling Patterns + +#### Worker Resilience +- DRb connection retries with exponential backoff +- Failed jobs remain in queue for other workers +- Worker crashes detected via CLD signal + +#### Serialization Edge Cases +- Exception cause chains properly handled +- Class names stored as strings to avoid loading issues +- Metadata filtered to exclude unmarshalable objects + +## Common Development Commands + +```bash +# Run tests +bundle exec rake spec # Run RSpec tests +bundle exec rake cucumber # Run Cucumber tests +bundle exec rake lint # Run RuboCop linter +bundle exec rake # Run all checks (lint, spec, cucumber) + +# Test a single spec file +bundle exec rspec spec/flatware/worker_spec.rb + +# Run Cucumber with specific tags +bundle exec cucumber --tags "not @wip" + +# Build gems +bundle exec rake build # Build all gems +bundle exec rake build:flatware # Build main gem +bundle exec rake build:flatware-rspec # Build RSpec runner +bundle exec rake build:flatware-cucumber # Build Cucumber runner + +# Test flatware itself in development +bundle exec flatware rspec +bundle exec flatware cucumber + +# Debug with specific workers +bundle exec flatware -w 2 rspec # Use 2 workers +bundle exec flatware --log rspec # Enable debug logging + +# Use example-based distribution +bundle exec flatware rspec --job-builder ExampleJobBuilder +``` + +## Database Configuration for Rails Apps +When using Flatware with Rails, configure test databases with `TEST_ENV_NUMBER`: +```yaml +test: + database: foo_test<%=ENV['TEST_ENV_NUMBER']%> +``` + +Then prepare databases: +```bash +flatware fan rake db:test:prepare +``` + +## Flatware Configuration Hooks +Create `spec/flatware_helper.rb` for lifecycle callbacks: +```ruby +Flatware.configure do |conf| + conf.before_fork do + require 'rails_helper' + ActiveRecord::Base.connection.disconnect! + end + + conf.after_fork do |test_env_number| + config = ActiveRecord::Base.connection_db_config.configuration_hash + ActiveRecord::Base.establish_connection( + config.merge(database: config.fetch(:database) + test_env_number.to_s) + ) + end +end +``` + +## Key Implementation Patterns + +1. **Defensive Marshaling**: All cross-process objects explicitly made marshalable +2. **Pull-based Distribution**: Workers request work (not pushed) +3. **Sentinel Jobs**: Special jobs signal workers to shutdown cleanly +4. **Broadcaster Pattern**: `method_missing` forwards to multiple formatters +5. **Process Title Setting**: Each component sets `$0` for identification +6. **Forked Configuration**: ExampleJobBuilder loads config in fork to avoid pollution +7. **Dynamic Method Generation**: Checkpoint generates handlers from RSpec events + +## Debugging Tips + +### Process Inspection +```bash +ps aux | grep flatware # See all flatware processes +``` + +### Common Issues +1. **Segfault in PG gem**: Add `ENV["PGGSSENCMODE"] = "disable"` to flatware helper +2. **ActiveRecord connection errors**: Ensure disconnect in before_fork, reconnect in after_fork +3. **SimpleCov integration**: Call `SimpleCov.at_fork.call(test_env_number)` in after_fork + +### Testing Flatware Development +Use Aruba for integration testing: +1. Add `@no-clobber` tag to `features/flatware.feature` +2. Run `cucumber features/flatware.feature` +3. CD to `./tmp/aruba` where flatware is in PATH + +## Important File Locations + +### Core Logic +- `lib/flatware/worker.rb` - Worker process implementation +- `lib/flatware/sink.rb` - Central coordinator server +- `lib/flatware/cli.rb` - Command-line interface (Thor) + +### RSpec Integration +- `lib/flatware/rspec/` - RSpec-specific code +- `lib/flatware/rspec/marshalable/` - Serialization layer +- `lib/flatware/rspec/formatter.rb` - RSpec formatter integration + +### Job Distribution +- `lib/flatware/rspec/file_job_builder.rb` - File-based distribution (default) +- `lib/flatware/rspec/example_job_builder.rb` - Example-based distribution + +### IPC/Messaging +- `lib/flatware/sink/client.rb` - Worker-to-sink communication +- `lib/flatware/broadcaster.rb` - Event broadcasting to multiple formatters + +### Process Management +- `lib/flatware/pid.rb` - Process tracking utilities +- `lib/flatware/sink/signal.rb` - Signal handling + +## Non-Obvious Implementation Details + +- **Round-robin with Offset**: Untimed work uses calculated offsets for even distribution +- **Retrying Helper**: Generic retry logic with exponential backoff used throughout +- **Job Sentinels**: Empty jobs with `sentinel?` true trigger worker shutdown +- **Configuration Isolation**: Each worker gets isolated RSpec configuration +- **Checkpoint Aggregation**: Results combined via `+` operator across workers +- **Process Group Management**: Uses `setpgrp` to manage related processes as group \ No newline at end of file diff --git a/lib/flatware/rspec/example_job_builder.rb b/lib/flatware/rspec/example_job_builder.rb index 55f98cf..c191de7 100644 --- a/lib/flatware/rspec/example_job_builder.rb +++ b/lib/flatware/rspec/example_job_builder.rb @@ -1,6 +1,9 @@ # frozen_string_literal: true require 'rspec/core/sandbox' +require 'drb/drb' +require 'fileutils' +require 'flatware/serialized_exception' module Flatware module RSpec @@ -85,24 +88,97 @@ def load_configuration_and_examples end def within_forked_process - reader, writer = IO.pipe(binmode: true) + # Generate unique endpoint for temporary DRb server + socket_file = "/tmp/flatware-example-builder-#{Process.pid}-#{Time.now.to_f}.sock" + endpoint = "drbunix:#{socket_file}" + + # Container to store result from child process + result_container = { data: nil, error: nil, completed: false } + + # Ensure any existing DRb thread is running + DRb.start_service unless DRb.thread + + # Start temporary DRb server in parent + server = DRb::DRbServer.new(endpoint, result_container, verbose: Flatware.verbose?) + + begin + pid = fork do + $stdout = File.new(File::NULL, 'w') + + # Start DRb in child process + DRb.start_service + + # Connect to parent's temporary DRb server + retrying_drb_connection(times: 10, wait: 0.1) do + remote_container = DRbObject.new_with_uri(endpoint) + + begin + # Execute block and store result + data = yield + remote_container[:data] = data + remote_container[:completed] = true + rescue StandardError => e + remote_container[:error] = Flatware::SerializedException.from(e) + remote_container[:completed] = true + end + end + + # Ensure child exits cleanly + exit(0) + end + + # Wait for child to complete with timeout + timeout_seconds = 30 + start_time = Time.now - fork do - reader.close - $stdout = File.new(File::NULL, 'w') + while !result_container[:completed] && (Time.now - start_time) < timeout_seconds + sleep 0.05 + # Check if child process is still alive + break unless Process.waitpid(pid, Process::WNOHANG).nil? + end + + # Ensure child process has exited + begin + Process.waitpid(pid) + rescue StandardError + nil + end - writer.write Marshal.dump(yield) + # Handle results + if result_container[:error] + # Re-raise the exception from the child process + error = result_container[:error] + raise "#{error.class}: #{error.message}" + end + raise 'Child process failed to return result' unless result_container[:completed] + + result_container[:data] + ensure + # Stop the temporary DRb server + server&.stop_service + # Clean up socket file + FileUtils.rm_f(socket_file) end + end - writer.close - Marshal.load(reader.gets(nil)) # rubocop:disable Security/MarshalLoad + def retrying_drb_connection(times:, wait:) + tries = 0 + begin + yield + rescue DRb::DRbConnError + raise if (tries += 1) >= times + + sleep wait + retry + end end def load_examples_to_run(configuration) configuration.load_spec_files - # If there's an error loading spec files, exit immediately. - exit(configuration.failure_exit_code) if ::RSpec.world.wants_to_quit + # If there's an error loading spec files, raise an error instead of exiting. + # The error will be captured by within_forked_process and properly handled. + raise 'RSpec configuration failed to load spec files' if ::RSpec.world.wants_to_quit ::RSpec.world.ordered_example_groups.flat_map(&:descendants).flat_map(&:filtered_examples).map(&:id) end From c4e87c7dcf3e7c5751f32c9441fadb52c9025828 Mon Sep 17 00:00:00 2001 From: Navid EMAD Date: Wed, 24 Sep 2025 17:54:18 +0200 Subject: [PATCH 5/5] Give DRb server thread time to finish processing any remaining operations --- lib/flatware/rspec/example_job_builder.rb | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lib/flatware/rspec/example_job_builder.rb b/lib/flatware/rspec/example_job_builder.rb index c191de7..74cbc48 100644 --- a/lib/flatware/rspec/example_job_builder.rb +++ b/lib/flatware/rspec/example_job_builder.rb @@ -120,6 +120,11 @@ def within_forked_process rescue StandardError => e remote_container[:error] = Flatware::SerializedException.from(e) remote_container[:completed] = true + ensure + # Explicitly stop DRb service in child to close connection cleanly + DRb.stop_service + # Small delay to ensure the DRb thread finishes sending data + sleep 0.05 end end @@ -144,6 +149,14 @@ def within_forked_process nil end + # Give DRb server thread time to finish processing any remaining operations + # This prevents "connection closed" errors in CI environments + if result_container[:completed] + sleep 0.1 + # Allow the DRb server thread to complete its work + Thread.pass + end + # Handle results if result_container[:error] # Re-raise the exception from the child process