diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..c0195d9 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,215 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview +Flatware is a test parallelization tool for RSpec and Cucumber that significantly reduces test execution time by distributing tests across multiple worker processes using DRb (distributed Ruby) for inter-process communication. + +## Architecture Deep Dive + +### Core Components and Their Interactions + +#### 1. Worker Process Management (`lib/flatware/worker.rb`) +- **Spawning**: Workers are created via `fork()` with unique IDs and `TEST_ENV_NUMBER` env vars +- **Communication**: Uses DRb over Unix sockets (`drbunix:flatware-sink`) +- **Retry Logic**: 10 retries with exponential backoff for DRb connections +- **Lifecycle**: Pull-based model - workers request jobs from sink until receiving sentinel job +- **Process Naming**: Sets `$0 = "flatware worker #{i}"` for easy identification + +#### 2. Sink Server (`lib/flatware/sink.rb`) +Central coordinator implementing pull-based work distribution: +- **Job Queue**: Maintains queue of jobs, distributes on worker request +- **Worker Tracking**: Uses Set to track active workers +- **Completion Detection**: Auto-stops when no workers remain and queue empty +- **Signal Handling**: Graceful shutdown on INT/CLD signals +- **Job Grouping**: Groups jobs by worker count using modulo for load balancing + +#### 3. 
Message Protocol +DRb RPC methods between workers and sink: +- `ready(worker_id)` - Worker requests work +- `started(job)` - Worker reports job start +- `finished(job)` - Worker reports job completion +- `checkpoint(data)` - Send test results/progress +- `progress(message)` - Real-time test output + +### Job Distribution Strategies + +#### FileJobBuilder (`lib/flatware/rspec/file_job_builder.rb`) +Default strategy - distributes spec files: +- Uses RSpec's persisted example status for time-based balancing +- Greedy algorithm minimizes total runtime variance across workers +- Untimed files distributed round-robin + +#### ExampleJobBuilder (`lib/flatware/rspec/example_job_builder.rb`) +Fine-grained strategy - distributes individual examples: +- Loads RSpec config in forked process to avoid pollution +- Prioritizes longer-running examples first +- Uses `within_forked_process` pattern for safe configuration loading + +### RSpec Integration Details + +#### Marshalable Object System (`lib/flatware/rspec/marshalable/`) +Solves the problem of RSpec objects containing unmarshalable references: +- `Example` - Strips RSpec examples to essential attributes +- `ExecutionResult` - Serializes test results with custom exception handling +- `SummaryNotification` - Aggregatable summary data across workers +- Uses dynamic method generation for event handling + +#### Exception Serialization (`lib/flatware/serialized_exception.rb`) +Handles unmarshalable exceptions by extracting: +- Class name as string (not class object) +- Message and backtrace +- Recursively serializes cause chain + +#### Checkpoint System (`lib/flatware/rspec/checkpoint.rb`) +- Aggregates test results from multiple workers +- Implements `+` operator for combining checkpoints +- Dynamically generates event handlers based on RSpec events + +### Process and Signal Management + +#### PID Tracking (`lib/flatware/pid.rb`) +- Cross-platform PS parsing (Darwin vs Linux) +- Process group tracking via pgid +- Provides cleanup 
utilities for all flatware processes + +#### Signal Handling (`lib/flatware/sink/signal.rb`) +- **INT**: Triggers graceful shutdown +- **CLD**: Detects worker failures +- Workers finish current jobs before exiting + +### Error Handling Patterns + +#### Worker Resilience +- DRb connection retries with exponential backoff +- Failed jobs remain in queue for other workers +- Worker crashes detected via CLD signal + +#### Serialization Edge Cases +- Exception cause chains properly handled +- Class names stored as strings to avoid loading issues +- Metadata filtered to exclude unmarshalable objects + +## Common Development Commands + +```bash +# Run tests +bundle exec rake spec # Run RSpec tests +bundle exec rake cucumber # Run Cucumber tests +bundle exec rake lint # Run RuboCop linter +bundle exec rake # Run all checks (lint, spec, cucumber) + +# Test a single spec file +bundle exec rspec spec/flatware/worker_spec.rb + +# Run Cucumber with specific tags +bundle exec cucumber --tags "not @wip" + +# Build gems +bundle exec rake build # Build all gems +bundle exec rake build:flatware # Build main gem +bundle exec rake build:flatware-rspec # Build RSpec runner +bundle exec rake build:flatware-cucumber # Build Cucumber runner + +# Test flatware itself in development +bundle exec flatware rspec +bundle exec flatware cucumber + +# Debug with specific workers +bundle exec flatware -w 2 rspec # Use 2 workers +bundle exec flatware --log rspec # Enable debug logging + +# Use example-based distribution +bundle exec flatware rspec --job-builder ExampleJobBuilder +``` + +## Database Configuration for Rails Apps +When using Flatware with Rails, configure test databases with `TEST_ENV_NUMBER`: +```yaml +test: + database: foo_test<%=ENV['TEST_ENV_NUMBER']%> +``` + +Then prepare databases: +```bash +flatware fan rake db:test:prepare +``` + +## Flatware Configuration Hooks +Create `spec/flatware_helper.rb` for lifecycle callbacks: +```ruby +Flatware.configure do |conf| + conf.before_fork 
do + require 'rails_helper' + ActiveRecord::Base.connection.disconnect! + end + + conf.after_fork do |test_env_number| + config = ActiveRecord::Base.connection_db_config.configuration_hash + ActiveRecord::Base.establish_connection( + config.merge(database: config.fetch(:database) + test_env_number.to_s) + ) + end +end +``` + +## Key Implementation Patterns + +1. **Defensive Marshaling**: All cross-process objects explicitly made marshalable +2. **Pull-based Distribution**: Workers request work (not pushed) +3. **Sentinel Jobs**: Special jobs signal workers to shut down cleanly +4. **Broadcaster Pattern**: `method_missing` forwards to multiple formatters +5. **Process Title Setting**: Each component sets `$0` for identification +6. **Forked Configuration**: ExampleJobBuilder loads config in fork to avoid pollution +7. **Dynamic Method Generation**: Checkpoint generates handlers from RSpec events + +## Debugging Tips + +### Process Inspection +```bash +ps aux | grep flatware # See all flatware processes +``` + +### Common Issues +1. **Segfault in PG gem**: Add `ENV["PGGSSENCMODE"] = "disable"` to flatware helper +2. **ActiveRecord connection errors**: Ensure disconnect in before_fork, reconnect in after_fork +3. **SimpleCov integration**: Call `SimpleCov.at_fork.call(test_env_number)` in after_fork + +### Testing Flatware Development +Use Aruba for integration testing: +1. Add `@no-clobber` tag to `features/flatware.feature` +2. Run `cucumber features/flatware.feature` +3. 
CD to `./tmp/aruba` where flatware is in PATH + +## Important File Locations + +### Core Logic +- `lib/flatware/worker.rb` - Worker process implementation +- `lib/flatware/sink.rb` - Central coordinator server +- `lib/flatware/cli.rb` - Command-line interface (Thor) + +### RSpec Integration +- `lib/flatware/rspec/` - RSpec-specific code +- `lib/flatware/rspec/marshalable/` - Serialization layer +- `lib/flatware/rspec/formatter.rb` - RSpec formatter integration + +### Job Distribution +- `lib/flatware/rspec/file_job_builder.rb` - File-based distribution (default) +- `lib/flatware/rspec/example_job_builder.rb` - Example-based distribution + +### IPC/Messaging +- `lib/flatware/sink/client.rb` - Worker-to-sink communication +- `lib/flatware/broadcaster.rb` - Event broadcasting to multiple formatters + +### Process Management +- `lib/flatware/pid.rb` - Process tracking utilities +- `lib/flatware/sink/signal.rb` - Signal handling + +## Non-Obvious Implementation Details + +- **Round-robin with Offset**: Untimed work uses calculated offsets for even distribution +- **Retrying Helper**: Generic retry logic with exponential backoff used throughout +- **Job Sentinels**: Empty jobs with `sentinel?` true trigger worker shutdown +- **Configuration Isolation**: Each worker gets isolated RSpec configuration +- **Checkpoint Aggregation**: Results combined via `+` operator across workers +- **Process Group Management**: Uses `setpgrp` to manage related processes as group \ No newline at end of file diff --git a/features/rspec.feature b/features/rspec.feature index 8b584ac..888b48d 100644 --- a/features/rspec.feature +++ b/features/rspec.feature @@ -107,3 +107,46 @@ Feature: rspec task """ 0 examples, 0 failures, 1 error occurred outside of examples """ + + @non-zero + Scenario: example job builder + Given spec "a" contains: + """ + describe "fail" do + it { expect(true).to eq false } + end + """ + And spec "b" contains: + """ + describe "pass" do + it { expect(true).to eq true } + 
end + """ + When I run flatware with "rspec -l --job-builder=ExampleJobBuilder" + Then the output contains the following: + """ + Run options: include {:ids=>{"./spec/a_spec.rb"=>["1:1"]}} + """ + And the output contains the following: + """ + Run options: include {:ids=>{"./spec/b_spec.rb"=>["1:1"]}} + """ + And the output contains the following: + """ + 2 examples, 1 failure + """ + + @non-zero + Scenario: failure outside of examples with example job builder + Given the following spec: + """ + throw :a_fit + describe 'fits' do + it('already threw one') + end + """ + When I run flatware with "rspec --job-builder=ExampleJobBuilder" + Then the output contains the following line: + """ + uncaught throw :a_fit + """ diff --git a/lib/flatware/rspec.rb b/lib/flatware/rspec.rb index 10ab6ff..0bac73e 100644 --- a/lib/flatware/rspec.rb +++ b/lib/flatware/rspec.rb @@ -7,12 +7,14 @@ module Flatware module RSpec require 'flatware/rspec/formatter' - require 'flatware/rspec/job_builder' + require 'flatware/rspec/file_job_builder' + require 'flatware/rspec/example_job_builder' module_function - def extract_jobs_from_args(args, workers:) - JobBuilder.new(args, workers: workers).jobs + def extract_jobs_from_args(args, workers:, job_builder:) + builder = const_get(job_builder) + builder.new(args, workers: workers).jobs end def runner diff --git a/lib/flatware/rspec/cli.rb b/lib/flatware/rspec/cli.rb index c0417f7..d019cb5 100644 --- a/lib/flatware/rspec/cli.rb +++ b/lib/flatware/rspec/cli.rb @@ -13,18 +13,27 @@ class CLI type: :string, default: 'drbunix:flatware-sink' ) + method_option( + 'job-builder', + type: :string, + default: 'FileJobBuilder' + ) desc 'rspec [FLATWARE_OPTS]', 'parallelizes rspec' def rspec(*rspec_args) - jobs = RSpec.extract_jobs_from_args rspec_args, workers: workers - - formatter = Flatware::RSpec::Formatters::Console.new( - ::RSpec.configuration.output_stream, - deprecation_stream: ::RSpec.configuration.deprecation_stream - ) + jobs = 
RSpec.extract_jobs_from_args rspec_args, workers: workers, job_builder: options['job-builder'] Flatware.verbose = options[:log] Worker.spawn count: workers, runner: RSpec, sink: options['sink-endpoint'] start_sink(jobs: jobs, workers: workers, formatter: formatter) end + + private + + def formatter + Flatware::RSpec::Formatters::Console.new( + ::RSpec.configuration.output_stream, + deprecation_stream: ::RSpec.configuration.deprecation_stream + ) + end end end diff --git a/lib/flatware/rspec/example_job_builder.rb b/lib/flatware/rspec/example_job_builder.rb new file mode 100644 index 0000000..74cbc48 --- /dev/null +++ b/lib/flatware/rspec/example_job_builder.rb @@ -0,0 +1,214 @@ +# frozen_string_literal: true + +require 'rspec/core/sandbox' +require 'drb/drb' +require 'fileutils' +require 'flatware/serialized_exception' + +module Flatware + module RSpec + # groups examples into one job per worker. + # reads from persisted example statuses, if available, + # and attempts to balance the jobs accordingly. 
+ class ExampleJobBuilder + attr_reader :args, :workers + + def initialize(args, workers:) + @args = args + @workers = workers + + load_configuration_and_examples + end + + def jobs + timed_examples, untimed_examples = timed_and_untimed_examples + buckets = Array.new([@examples_to_run.size, workers].min) { Bucket.new } + + balance_jobs( + buckets: buckets, + timed_examples: timed_examples, + untimed_examples: untimed_examples + ) + end + + private + + def balance_jobs(buckets:, timed_examples:, untimed_examples:) + timed_examples.each do |(example_id, time)| + buckets.min_by(&:runtime).add_example(example_id, time) + end + + untimed_examples.each_with_index do |example_id, index| + offset = (timed_examples.size + index) % buckets.size + buckets[offset].add_example(example_id) + end + + buckets.map { |bucket| Job.new(bucket.examples, args) } + end + + def timed_and_untimed_examples + timed_examples = [] + untimed_examples = [] + + @examples_to_run.each do |example_id| + if (time = example_runtimes[example_id]) + timed_examples << [example_id, time] + else + untimed_examples << example_id + end + end + + [timed_examples.sort_by! 
{ |(_id, time)| -time }, untimed_examples] + end + + def load_persisted_example_statuses + ::RSpec::Core::ExampleStatusPersister.load_from(@example_status_persistence_file_path || '') + end + + def example_runtimes + @example_runtimes ||= load_persisted_example_statuses.each_with_object({}) do |status_entry, runtimes| + next unless status_entry.fetch(:status) =~ /pass/i + + runtimes[status_entry[:example_id]] = status_entry[:run_time].to_f + end + end + + def load_configuration_and_examples + configuration = ::RSpec.configuration + configuration.define_singleton_method(:command) { 'rspec' } + + ::RSpec::Core::ConfigurationOptions.new(args).configure(configuration) + + @example_status_persistence_file_path = configuration.example_status_persistence_file_path + + # Load spec files in a fork to avoid polluting the parent process, + # otherwise the actual execution will return warnings for redefining constants + # and shared example groups. + @examples_to_run = within_forked_process { load_examples_to_run(configuration) } + end + + def within_forked_process + # Generate unique endpoint for temporary DRb server + socket_file = "/tmp/flatware-example-builder-#{Process.pid}-#{Time.now.to_f}.sock" + endpoint = "drbunix:#{socket_file}" + + # Container to store result from child process + result_container = { data: nil, error: nil, completed: false } + + # Ensure any existing DRb thread is running + DRb.start_service unless DRb.thread + + # Start temporary DRb server in parent + server = DRb::DRbServer.new(endpoint, result_container, verbose: Flatware.verbose?) 
+ + begin + pid = fork do + $stdout = File.new(File::NULL, 'w') + + # Start DRb in child process + DRb.start_service + + # Connect to parent's temporary DRb server + retrying_drb_connection(times: 10, wait: 0.1) do + remote_container = DRbObject.new_with_uri(endpoint) + + begin + # Execute block and store result + data = yield + remote_container[:data] = data + remote_container[:completed] = true + rescue StandardError => e + remote_container[:error] = Flatware::SerializedException.from(e) + remote_container[:completed] = true + ensure + # Explicitly stop DRb service in child to close connection cleanly + DRb.stop_service + # Small delay to ensure the DRb thread finishes sending data + sleep 0.05 + end + end + + # Ensure child exits cleanly + exit(0) + end + + # Wait for child to complete with timeout + timeout_seconds = 30 + start_time = Time.now + + while !result_container[:completed] && (Time.now - start_time) < timeout_seconds + sleep 0.05 + # Check if child process is still alive + break unless Process.waitpid(pid, Process::WNOHANG).nil? 
+ end + + # Ensure child process has exited + begin + Process.waitpid(pid) + rescue StandardError + nil + end + + # Give DRb server thread time to finish processing any remaining operations + # This prevents "connection closed" errors in CI environments + if result_container[:completed] + sleep 0.1 + # Allow the DRb server thread to complete its work + Thread.pass + end + + # Handle results + if result_container[:error] + # Re-raise the exception from the child process + error = result_container[:error] + raise "#{error.class}: #{error.message}" + end + raise 'Child process failed to return result' unless result_container[:completed] + + result_container[:data] + ensure + # Stop the temporary DRb server + server&.stop_service + # Clean up socket file + FileUtils.rm_f(socket_file) + end + end + + def retrying_drb_connection(times:, wait:) + tries = 0 + begin + yield + rescue DRb::DRbConnError + raise if (tries += 1) >= times + + sleep wait + retry + end + end + + def load_examples_to_run(configuration) + configuration.load_spec_files + + # If there's an error loading spec files, raise an error instead of exiting. + # The error will be captured by within_forked_process and properly handled. 
+ raise 'RSpec configuration failed to load spec files' if ::RSpec.world.wants_to_quit + + ::RSpec.world.ordered_example_groups.flat_map(&:descendants).flat_map(&:filtered_examples).map(&:id) + end + + class Bucket + attr_reader :examples, :runtime + + def initialize + @examples = [] + @runtime = 0 + end + + def add_example(example_id, runtime = 0) + @examples << example_id + @runtime += runtime + end + end + end + end +end diff --git a/lib/flatware/rspec/job_builder.rb b/lib/flatware/rspec/file_job_builder.rb similarity index 99% rename from lib/flatware/rspec/job_builder.rb rename to lib/flatware/rspec/file_job_builder.rb index 2691aef..e59e222 100644 --- a/lib/flatware/rspec/job_builder.rb +++ b/lib/flatware/rspec/file_job_builder.rb @@ -7,7 +7,7 @@ module RSpec # groups spec files into one job per worker. # reads from persisted example statuses, if available, # and attempts to ballence the jobs accordingly. - class JobBuilder + class FileJobBuilder extend Forwardable attr_reader :args, :workers, :configuration diff --git a/spec/flatware/rspec/example_job_builder_spec.rb b/spec/flatware/rspec/example_job_builder_spec.rb new file mode 100644 index 0000000..7e5b507 --- /dev/null +++ b/spec/flatware/rspec/example_job_builder_spec.rb @@ -0,0 +1,103 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'flatware/rspec/example_job_builder' + +describe Flatware::RSpec::ExampleJobBuilder do + before do + allow(RSpec::Core::ExampleStatusPersister).to( + receive(:load_from).and_return(persisted_examples) + ) + + allow_any_instance_of(RSpec::Core::World).to( + receive(:ordered_example_groups).and_return(ordered_example_groups) + ) + end + + let(:persisted_examples) { [] } + let(:examples_to_run) { [] } + let(:ordered_example_groups) do + examples_to_run + .group_by { |example_id| example_id.split('[').first } + .map do |_file_name, example_ids| + double(descendants: [double(filtered_examples: example_ids.map { |id| double(id: id) })]) + end + end + + subject 
do + described_class.new([], workers: 2).jobs + end + + context 'when this run includes persisted examples' do + let(:persisted_examples) do + [ + { example_id: './fast_1_spec.rb[1]', run_time: '1 second' }, + { example_id: './fast_2_spec.rb[1]', run_time: '1 second' }, + { example_id: './fast_3_spec.rb[1]', run_time: '1 second' }, + { example_id: './slow_spec.rb[1]', run_time: '2 seconds' } + ].map { |example| example.merge status: 'passed' } + end + + let(:examples_to_run) { %w(./fast_1_spec.rb[1] ./fast_2_spec.rb[1] ./slow_spec.rb[1]) } + + it 'groups them into equal time blocks' do + expect(subject).to match_array( + [ + have_attributes( + id: match_array(%w[./fast_1_spec.rb[1] ./fast_2_spec.rb[1]]) + ), + have_attributes(id: match_array(%w[./slow_spec.rb[1]])) + ] + ) + end + + context 'and this run includes examples that are not persisted' do + let(:examples_to_run) do + %w[ + ./fast_1_spec.rb[1] + ./fast_2_spec.rb[1] + ./slow_spec.rb[1] + ./new_1_spec.rb[1] + ./new_2_spec.rb[1] + ./new_3_spec.rb[1] + ] + end + + it 'assigns the remaining files round-robin' do + expect(subject).to match_array( + [ + have_attributes(id: include('./new_1_spec.rb[1]', './new_3_spec.rb[1]')), + have_attributes(id: include('./new_2_spec.rb[1]')) + ] + ) + end + end + + context 'and an example from one file takes longer than all other examples' do + let(:persisted_examples) do + [ + { example_id: './spec_1.rb[1]', run_time: '10 seconds' }, + { example_id: './spec_1.rb[2]', run_time: '1 second' }, + { example_id: './spec_1.rb[3]', run_time: '1 second' }, + { example_id: './spec_2.rb[1]', run_time: '1 second' }, + { example_id: './spec_2.rb[2]', run_time: '1 second' }, + { example_id: './spec_2.rb[3]', run_time: '1 second' } + ].map { |example| example.merge status: 'passed' } + end + + let(:examples_to_run) do + %w(./spec_1.rb[1] ./spec_1.rb[2] ./spec_1.rb[3] ./spec_2.rb[1] ./spec_2.rb[2] ./spec_2.rb[3]) + end + + it 'assigns that example as sole in one job' do + expect(subject).to 
match_array( + [ + have_attributes(id: ['./spec_1.rb[1]']), + have_attributes(id: match_array(%w[./spec_1.rb[2] ./spec_1.rb[3] ./spec_2.rb[1] ./spec_2.rb[2] + ./spec_2.rb[3]])) + ] + ) + end + end + end +end diff --git a/spec/flatware/rspec/job_builder_spec.rb b/spec/flatware/rspec/file_job_builder_spec.rb similarity index 95% rename from spec/flatware/rspec/job_builder_spec.rb rename to spec/flatware/rspec/file_job_builder_spec.rb index 6e35d14..d4eea0d 100644 --- a/spec/flatware/rspec/job_builder_spec.rb +++ b/spec/flatware/rspec/file_job_builder_spec.rb @@ -1,9 +1,9 @@ # frozen_string_literal: true require 'spec_helper' -require 'flatware/rspec/job_builder' +require 'flatware/rspec/file_job_builder' -describe Flatware::RSpec::JobBuilder do +describe Flatware::RSpec::FileJobBuilder do before do allow(RSpec::Core::ExampleStatusPersister).to( receive(:load_from).and_return(persisted_examples)
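
Note on the balancing strategy in `ExampleJobBuilder#balance_jobs`: the sketch below reproduces it in isolation so that the expectations in `example_job_builder_spec.rb` are easier to follow. It is a minimal standalone illustration, not part of the patch. `Bucket` mirrors the class in the diff, while `balance` and the sample example IDs and runtimes are hypothetical names and data introduced only for this sketch. Timed examples are placed longest-first into whichever bucket currently has the smallest total runtime; untimed examples are then spread round-robin, starting at an offset equal to the number of timed examples.

```ruby
# Standalone sketch of the greedy balancing used by ExampleJobBuilder.
# `balance` and the sample data are hypothetical; Bucket mirrors the diff.
class Bucket
  attr_reader :examples, :runtime

  def initialize
    @examples = []
    @runtime = 0
  end

  def add_example(example_id, runtime = 0)
    @examples << example_id
    @runtime += runtime
  end
end

# timed:   array of [example_id, seconds] pairs
# untimed: array of example ids with no recorded runtime
def balance(timed, untimed, workers:)
  total = timed.size + untimed.size
  buckets = Array.new([total, workers].min) { Bucket.new }
  return [] if buckets.empty? # guard added for the sketch only

  # Longest examples first, each into the currently lightest bucket.
  timed.sort_by { |(_id, time)| -time }.each do |(id, time)|
    buckets.min_by(&:runtime).add_example(id, time)
  end

  # Untimed examples go round-robin, offset by the number of timed ones.
  untimed.each_with_index do |id, index|
    buckets[(timed.size + index) % buckets.size].add_example(id)
  end

  buckets.map(&:examples)
end

timed = [['./spec_1.rb[1]', 10.0], ['./spec_1.rb[2]', 2.0], ['./spec_2.rb[1]', 1.0]]
jobs = balance(timed, ['./new_spec.rb[1]'], workers: 2)
p jobs
# => [["./spec_1.rb[1]"], ["./spec_1.rb[2]", "./spec_2.rb[1]", "./new_spec.rb[1]"]]
```

The single slow example ends up alone in its job, which matches the "assigns that example as sole in one job" expectation in the spec. Because untimed examples carry a runtime of 0, they do not influence where later timed examples would land; the offset only staggers where the round-robin starts.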