diff --git a/README.md b/README.md index 1537c1e..20a864d 100644 --- a/README.md +++ b/README.md @@ -169,18 +169,20 @@ wrapper.join ### Caveats -Ractor::Wrapper is subject to some limitations (and bugs) of Ractors, as of -Ruby 4.0.0. - -* You can run blocks in the object's Ractor only if the block does not - access any data outside the block. Otherwise, the block must be run in - the calling Ractor. * Certain types cannot be used as method arguments or return values - because Ractor does not allow them to be moved between Ractors. These - include threads, backtraces, and a few others. -* Any exceptions raised are always copied (rather than moved) back to the - calling Ractor, and the backtrace is cleared out. This is due to - https://bugs.ruby-lang.org/issues/21818 + because they cannot be moved between Ractors. As of Ruby 4.0.0, these + include threads, backtraces, procs, and a few others. +* As of Ruby 4.0.0, any exceptions raised are always copied (rather than + moved) back to the calling Ractor, and the backtrace is cleared out. + This is due to https://bugs.ruby-lang.org/issues/21818 +* Blocks can be run "in place" (i.e. in the wrapped object context) only + if the block does not access any data outside the block. Otherwise, the + block must be run in the caller's context. +* Blocks configured to run in the caller's context can only be run while + a method is executing. They cannot be "saved" as a proc to be run + later unless they are configured to run "in place". In particular, + using blocks as a syntax to define callbacks can generally not be done + through a wrapper. ## Contributing diff --git a/lib/ractor/wrapper.rb b/lib/ractor/wrapper.rb index c70d8b4..c532fd9 100644 --- a/lib/ractor/wrapper.rb +++ b/lib/ractor/wrapper.rb @@ -156,18 +156,20 @@ class Ractor # # ## Caveats # - # Ractor::Wrapper is subject to some limitations (and bugs) of Ractors, as of - # Ruby 4.0.0. 
- # - # * You can run blocks in the object Ractor only if the block does not - # access any data outside the block. Otherwise, the block must be run in - # the calling Ractor. # * Certain types cannot be used as method arguments or return values - # because they cannot be moved between Ractors. These include threads, - # backtraces, and a few others. - # * Any exceptions raised are always copied (rather than moved) back to the - # calling Ractor, and the backtrace is cleared out. This is due to - # https://bugs.ruby-lang.org/issues/21818 + # because they cannot be moved between Ractors. As of Ruby 4.0.0, these + # include threads, backtraces, procs, and a few others. + # * As of Ruby 4.0.0, any exceptions raised are always copied (rather than + # moved) back to the calling Ractor, and the backtrace is cleared out. + # This is due to https://bugs.ruby-lang.org/issues/21818 + # * Blocks can be run "in place" (i.e. in the wrapped object context) only + # if the block does not access any data outside the block. Otherwise, the + # block must be run in the caller's context. + # * Blocks configured to run in the caller's context can only be run while + # a method is executing. They cannot be "saved" as a proc to be run + # later unless they are configured to run "in place". In particular, + # using blocks as a syntax to define callbacks can generally not be done + # through a wrapper. # class Wrapper ## @@ -284,8 +286,8 @@ def interpret_setting(setting, default) # cannot be moved or must run in the main Ractor. # @param name [String] A name for this wrapper. Used during logging. # @param threads [Integer] The number of worker threads to run. - # Defaults to 1, which causes the worker to serialize calls into a - # single thread. + # Defaults to 0, which causes the wrapper to run sequentially without + # spawning workers. # @param move_data [boolean] If true, all communication will by default # move instead of copy arguments and return values. Default is false. 
# This setting can be overridden by other `:move_*` settings. @@ -311,7 +313,7 @@ def interpret_setting(setting, default) def initialize(object, use_current_ractor: false, name: nil, - threads: 1, + threads: 0, move_data: false, move_arguments: nil, move_results: nil, @@ -346,9 +348,10 @@ def initialize(object, ## # Set the number of threads to run in the wrapper. If the underlying object - # is thread-safe, this allows concurrent calls to it. If the underlying - # object is not thread-safe, you should leave this set to its default of 1, - # which effectively causes calls to be serialized. + # is thread-safe, setting a value of 2 or more allows concurrent calls to + # it. If the underlying object is not thread-safe, you should leave this + # set to its default of 0, which disables worker threads and handles all + # calls sequentially. # # This method can be called only during an initialization block. # All settings are frozen once the wrapper is active. @@ -357,7 +360,7 @@ def initialize(object, # def threads=(value) value = value.to_i - value = 1 if value < 1 + value = 0 if value.negative? @threads = value end @@ -459,7 +462,7 @@ def enable_logging? end ## - # Return the number of threads used by the wrapper. + # Return the number of worker threads used by the wrapper. 
# # @return [Integer] # @@ -516,9 +519,11 @@ def call(method_name, *args, **kwargs, &) handle_yield(reply_message, transaction, settings, method_name, &) when ReturnMessage maybe_log("Received result", method_name: method_name, transaction: transaction) + reply_port.close return reply_message.value when ExceptionMessage maybe_log("Received exception", method_name: method_name, transaction: transaction) + reply_port.close raise reply_message.exception end end @@ -554,6 +559,7 @@ def join reply_port = ::Ractor::Port.new @port.send(JoinMessage.new(reply_port)) reply_port.receive + reply_port.close end self rescue ::Ractor::ClosedError @@ -633,6 +639,10 @@ def recover_object private + ## + # Start a server in the current Ractor. + # Passes the object directly to the server. + # def setup_local_server(object) maybe_log("Starting local server") @ractor = nil @@ -646,6 +656,11 @@ def setup_local_server(object) end end + ## + # Start a server in an isolated Ractor. + # This must send the object separately since it must be moved into the + # server's Ractor. + # def setup_isolated_server(object) maybe_log("Starting isolated server") @ractor = ::Ractor.new(name, enable_logging?, threads, name: "wrapper:#{name}") do |name, enable_logging, threads| @@ -657,10 +672,16 @@ def setup_isolated_server(object) @port.send(object, move: true) end + ## + # Create a transaction ID, used for logging + # def make_transaction ::Random.rand(7_958_661_109_946_400_884_391_936).to_s(36).freeze end + ## + # Create the shareable object representing a block in a method call + # def make_block_arg(settings, &) if !block_given? nil @@ -671,6 +692,9 @@ def make_block_arg(settings, &) end end + ## + # Handle a call to a block directed to run in the caller environment. 
+ # def handle_yield(message, transaction, settings, method_name) maybe_log("Yielding to block", method_name: method_name, transaction: transaction) begin @@ -691,6 +715,9 @@ def handle_yield(message, transaction, settings, method_name) end end + ## + # Prints out a log message + # def maybe_log(str, transaction: nil, method_name: nil) return unless enable_logging? metadata = [::Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%L"), "Ractor::Wrapper/#{name}"] @@ -702,22 +729,23 @@ def maybe_log(str, transaction: nil, method_name: nil) end ## - # This is the backend implementation of a wrapper. A Server runs within a - # Ractor, and manages a shared object. It handles communication with - # clients, translating those messages into method calls on the object. It - # runs worker threads internally to handle actual method calls. + # @private # - # See the {#run} method for an overview of the Server implementation and - # lifecycle. + # Server is the backend implementation of a wrapper. It listens for method + # call requests on a port, and calls the wrapped object in a controlled + # environment. # - # @private + # It can run: + # + # * Either hosted by an external Ractor or isolated in a dedicated Ractor + # * Either sequentially or concurrently using worker threads. 
# class Server ## # @private - # Create and run a server in the current Ractor + # Create and run a server hosted in the current Ractor # - def self.run_local(object:, port:, name:, enable_logging: false, threads: 1) + def self.run_local(object:, port:, name:, enable_logging: false, threads: 0) server = new(isolated: false, object:, port:, name:, enable_logging:, threads:) server.run end @@ -726,39 +754,35 @@ def self.run_local(object:, port:, name:, enable_logging: false, threads: 1) # @private # Create and run a server in an isolated Ractor # - def self.run_isolated(name:, enable_logging: false, threads: 1) + def self.run_isolated(name:, enable_logging: false, threads: 0) port = ::Ractor.current.default_port server = new(isolated: true, object: nil, port:, name:, enable_logging:, threads:) server.run end + # @private def initialize(isolated:, object:, port:, name:, enable_logging:, threads:) @isolated = isolated @object = object @port = port @name = name @enable_logging = enable_logging - @threads = threads - @queue = ::Queue.new + @threads = threads.positive? ? threads : nil @join_requests = [] end ## - # Handle the server lifecycle, running through the following phases: - # - # * **init**: Setup and spawning of worker threads. - # * **running**: Normal operation, until a stop request is received. - # * **stopping**: Waiting for worker threads to terminate. - # * **cleanup**: Clearing out of any lingering messages. - # - # The server returns the wrapped object, so one client can recover it. + # @private + # Handle the server lifecycle. + # Returns the wrapped object, so it can be recovered if the server is run + # in a Ractor. 
# def run receive_remote_object if @isolated - start_threads - running_phase - stopping_phase - cleanup_phase + start_workers if @threads + main_loop + stop_workers if @threads + cleanup @object rescue ::StandardError => e maybe_log("Unexpected error: #{e.inspect}") @@ -767,12 +791,21 @@ def run private + ## + # Receive the moved remote object. Called if the server is run in a + # separate Ractor. + # def receive_remote_object maybe_log("Waiting for remote object") @object = @port.receive end - def start_threads + ## + # Start the worker threads. Each thread picks up methods to run from a + # shared queue. Called only if worker threading is enabled. + # + def start_workers + @queue = ::Queue.new maybe_log("Spawning #{@threads} worker threads") (1..@threads).map do |worker_num| ::Thread.new { worker_thread(worker_num) } @@ -780,29 +813,35 @@ def start_threads end ## - # In the **running phase**, the Server listens on the Ractor's inbox and - # handles messages for normal operation: + # This is the main loop, listening on the inbox and handling messages for + # normal operation: # - # * If it receives a `call` message, it adds it to the job queue from - # which a worker thread will pick it up. - # * If it receives a `stop` message, we proceed to the stopping phase. - # * If it receives a `thread_stopped` message, that indicates one of - # the worker threads has unexpectedly stopped. We don't expect this - # to happen until the stopping phase, so if we do see it here, we - # conclude that something has gone wrong, and we proceed to the - # stopping phase. + # * If it receives a CallMessage, it either runs the method (when in + # sequential mode) or adds it to the job queue (when in worker mode). + # * If it receives a StopMessage, it exits the main loop and proceeds + # to the termination logic. + # * If it receives a JoinMessage, it adds it to the list of join ports + # to notify once the wrapper completes. 
+ # * If it receives a WorkerStoppedMessage, that indicates a worker + # thread has unexpectedly stopped. We conclude something has gone + # wrong with a worker, and we bail, stopping the remaining workers + # and proceeding to termination logic. # - def running_phase + def main_loop loop do maybe_log("Waiting for message in running phase") message = @port.receive case message when CallMessage maybe_log("Received CallMessage", call_message: message) - @queue.enq(message) + if @threads + @queue.enq(message) + else + handle_method(message) + end when WorkerStoppedMessage maybe_log("Received unexpected WorkerStoppedMessage") - @threads -= 1 + @threads -= 1 if @threads break when StopMessage maybe_log("Received stop") @@ -815,14 +854,27 @@ def running_phase end ## - # In the **stopping phase**, we close the job queue, which signals to all - # worker threads that they should finish their current task and then - # terminate. We then wait for acknowledgement messages from all workers - # before proceeding to the next phase. Any `call` requests received - # during stopping are refused (i.e. we send back an error response.) Any - # further `stop` requests are ignored. + # This signals workers to stop by closing the queue, and then waits for + # all workers to report in that they have stopped. It is called only if + # worker threading is enabled. + # + # Responds to messages to indicate the wrapper is stopping and no longer + # accepting new method requests: + # + # * If it receives a CallMessage, it sends back a refusal exception. + # * If it receives a StopMessage, it does nothing (i.e. the stop + # operation is idempotent). + # * If it receives a JoinMessage, it adds it to the list of join ports + # to notify once the wrapper completes. At this point the wrapper is + # not yet considered complete because workers are still processing + # earlier method calls. + # * If it receives a WorkerStoppedMessage, it updates its count of + # running workers. 
# - def stopping_phase + # This phase continues until all workers have signaled that they have + # stopped. + # + def stop_workers @queue.close while @threads.positive? maybe_log("Waiting for message in stopping phase") @@ -843,11 +895,10 @@ def stopping_phase end ## - # In the **cleanup phase**, The Server closes its inbox, and iterates - # through one final time to ensure it has responded to all remaining - # requests with a refusal. + # This is called when the Server is ready to terminate completely. + # It closes the inbox and responds to any remaining contents. # - def cleanup_phase + def cleanup maybe_log("Closing inbox") @port.close maybe_log("Responding to join requests") @@ -860,7 +911,7 @@ def cleanup_phase maybe_log("Inbox is empty") nil end - return if message.nil? + break if message.nil? case message when CallMessage refuse_method(message) @@ -901,18 +952,19 @@ def worker_thread(worker_num) end ## - # This is called within a worker thread to handle a method call request. + # This is called to handle a method call request. # It calls the method on the wrapped object, and then sends back a # response to the caller. If an exception was raised, it sends back an # error response. It tries very hard always to send a response of some # kind; if an error occurs while constructing or sending a response, it - # will catch the exception and try to send a simpler response. + # will catch the exception and try to send a simpler response. If a block + # was passed to the method, it is also handled here. 
# def handle_method(message, worker_num: nil) block = make_block(message) maybe_log("Running method", worker_num: worker_num, call_message: message) begin - result = @object.send(message.method_name, *message.args, **message.kwargs, &block) + result = @object.__send__(message.method_name, *message.args, **message.kwargs, &block) maybe_log("Sending return value", worker_num: worker_num, call_message: message) message.reply_port.send(ReturnMessage.new(result), move: message.settings.move_results?) rescue ::Exception => e # rubocop:disable Lint/RescueException @@ -929,6 +981,16 @@ def handle_method(message, worker_num: nil) end end + ## + # Creates a block appropriate to the block specification received with + # the method call message. This could return: + # + # * nil if there was no block + # * the proc itself, if a shareable proc was received + # * otherwise a proc that sends a message back to the caller, along + # with the block arguments, to run the block in the caller's + # environment + # def make_block(message) return message.block_arg unless message.block_arg == :send_block_message proc do |*args, **kwargs| @@ -936,6 +998,7 @@ def make_block(message) yield_message = YieldMessage.new(args: args, kwargs: kwargs, reply_port: reply_port) message.reply_port.send(yield_message, move: message.settings.move_block_arguments?) reply_message = reply_port.receive + reply_port.close case reply_message when ExceptionMessage raise reply_message.exception @@ -956,16 +1019,22 @@ def refuse_method(message) error = ::Ractor::ClosedError.new("Wrapper is shutting down") message.reply_port.send(ExceptionMessage.new(error)) rescue ::Ractor::Error - maybe_log("Failure to send refusal message", call_message: message) + maybe_log("Failed to send refusal message", call_message: message) end end + ## + # This attempts to send a signal that a wrapper join has completed. 
+ # def send_join_reply(port) port.send(nil) rescue ::Ractor::ClosedError maybe_log("Join reply port is closed") end + ## + # Print out a log message + # def maybe_log(str, call_message: nil, worker_num: nil, transaction: nil, method_name: nil) return unless @enable_logging transaction ||= call_message&.transaction diff --git a/test/test_wrapper.rb b/test/test_wrapper.rb index 2ad9191..bb03dcc 100644 --- a/test/test_wrapper.rb +++ b/test/test_wrapper.rb @@ -5,56 +5,77 @@ describe ::Ractor::Wrapper do let(:remote) { RemoteObject.new } - describe "an isolated wrapper" do - let(:wrapper) { Ractor::Wrapper.new(remote) } + wrapper_types = [ + { + desc: "an isolated wrapper", + opts: {use_current_ractor: false}, + }, + { + desc: "a local wrapper", + opts: {use_current_ractor: true}, + }, + ] + threading_types = [ + { + desc: "sequentially", + opts: {threads: 0}, + }, + { + desc: "with worker threads", + opts: {threads: 2}, + }, + ] + + threading_types.each do |config| + describe "an isolated wrapper running #{config[:desc]}" do + let(:base_opts) { config[:opts] } + let(:wrapper) { Ractor::Wrapper.new(remote, **base_opts) } - after { wrapper.async_stop.join } + after { wrapper.async_stop.join } - it "moves a wrapped object" do - wrapper - assert_raises(Ractor::MovedError) do - remote.echo_args + it "moves a wrapped object" do + wrapper + assert_raises(Ractor::MovedError) do + remote.echo_args + end end - end - it "recovers the object" do - wrapper.async_stop - recovered = wrapper.recover_object - assert_equal("[], {}", recovered.echo_args) + it "recovers the object" do + assert_equal("[], {}", remote.echo_args) + wrapper + assert_raises(Ractor::MovedError) do + remote.echo_args + end + wrapper.async_stop + recovered = wrapper.recover_object + assert_equal("[], {}", recovered.echo_args) + end end - end - describe "a local wrapper" do - let(:wrapper) { Ractor::Wrapper.new(remote, use_current_ractor: true) } + describe "a local wrapper" do + let(:base_opts) { config[:opts] } 
+ let(:wrapper) { Ractor::Wrapper.new(remote, **base_opts, use_current_ractor: true) } - after { wrapper.async_stop.join } + after { wrapper.async_stop.join } - it "does not move a wrapped object" do - wrapper - assert_equal("[], {}", remote.echo_args) - end + it "does not move a wrapped object" do + wrapper + assert_equal("[], {}", remote.echo_args) + end - it "refuses to recover" do - wrapper.async_stop - error = assert_raises(Ractor::Error) do - wrapper.recover_object + it "refuses to recover" do + wrapper.async_stop + error = assert_raises(Ractor::Error) do + wrapper.recover_object + end + assert_equal("cannot recover an object from a local wrapper", error.message) end - assert_equal("cannot recover an object from a local wrapper", error.message) end end - [ - { - desc: "an isolated wrapper", - opts: {use_current_ractor: false}, - }, - { - desc: "a local wrapper", - opts: {use_current_ractor: true}, - }, - ].each do |config| - describe "basic behavior of #{config[:desc]}" do - let(:base_opts) { config[:opts] } + wrapper_types.product(threading_types).each do |(config1, config2)| + describe "basic behavior of #{config1[:desc]} running #{config2[:desc]}" do + let(:base_opts) { config1[:opts].merge(config2[:opts]) } before { @wrapper = nil } after { @wrapper&.async_stop&.join } @@ -76,8 +97,8 @@ end end - describe "method features of #{config[:desc]}" do - let(:base_opts) { config[:opts] } + describe "method features of #{config1[:desc]} running #{config2[:desc]}" do + let(:base_opts) { config1[:opts].merge(config2[:opts]) } def wrapper(**) @wrapper ||= Ractor::Wrapper.new(remote, **base_opts, **) @@ -116,8 +137,8 @@ def wrapper(**) end end - describe "object moving and copying in #{config[:desc]}" do - let(:base_opts) { config[:opts] } + describe "object moving and copying in #{config1[:desc]} running #{config2[:desc]}" do + let(:base_opts) { config1[:opts].merge(config2[:opts]) } def wrapper(**) @wrapper ||= Ractor::Wrapper.new(remote, **base_opts, **) @@ 
-242,101 +263,78 @@ def wrapper(**) refute_equal(obj.object_id, id) end end - end - describe "stubs" do - let(:wrapper) { Ractor::Wrapper.new(remote) } + describe "stubs in #{config1[:desc]} running #{config2[:desc]}" do + let(:base_opts) { config1[:opts].merge(config2[:opts]) } + let(:wrapper) { Ractor::Wrapper.new(remote, **base_opts) } - after { wrapper.async_stop.join } + after { wrapper.async_stop.join } - it "converts method calls with arguments and return values" do - result = wrapper.stub.echo_args(1, 2, a: "b", c: "d") - assert_equal("[1, 2], {a: \"b\", c: \"d\"}", result) - end + it "converts method calls with arguments and return values" do + result = wrapper.stub.echo_args(1, 2, a: "b", c: "d") + assert_equal("[1, 2], {a: \"b\", c: \"d\"}", result) + end - it "converts exceptions" do - exception = assert_raises(RuntimeError) do - wrapper.stub.whoops + it "converts exceptions" do + exception = assert_raises(RuntimeError) do + wrapper.stub.whoops + end + assert_equal("Whoops", exception.message) end - assert_equal("Whoops", exception.message) - end - it "converts respond_to" do - assert(wrapper.stub.respond_to?(:echo_args)) - refute(wrapper.stub.respond_to?(:nonexistent_method)) + it "converts respond_to" do + assert(wrapper.stub.respond_to?(:echo_args)) + refute(wrapper.stub.respond_to?(:nonexistent_method)) + end end end - describe "single-thread lifecycle" do - let(:wrapper) { Ractor::Wrapper.new(remote) } - - after { wrapper.async_stop.join } - - it "recovers the remote" do - assert_equal("[], {}", remote.echo_args) - wrapper - assert_raises(Ractor::MovedError) { remote.echo_args } - wrapper.async_stop - recovered = wrapper.recover_object - assert_equal("[], {}", recovered.echo_args) - end + wrapper_types.each do |config| + describe "non-threaded lifecycle in #{config[:desc]}" do + let(:base_opts) { config[:opts] } + let(:wrapper) { Ractor::Wrapper.new(remote, **base_opts) } - it "calls multiple methods" do - assert_equal("[1], {}", 
wrapper.call(:echo_args, 1)) - assert_equal("[2], {}", wrapper.call(:echo_args, 2)) - end + after { wrapper.async_stop.join } - it "serializes long-running methods" do - r1 = Ractor.new(wrapper) do |w| - result = w.call(:slow_echo, "hello") - [result, Time.now.to_f] - end - r2 = Ractor.new(wrapper) do |w| - result = w.call(:slow_echo, "world") - [result, Time.now.to_f] + it "serializes long-running methods" do + r1 = Ractor.new(wrapper) do |w| + result = w.call(:slow_echo, "hello") + [result, Time.now.to_f] + end + r2 = Ractor.new(wrapper) do |w| + result = w.call(:slow_echo, "world") + [result, Time.now.to_f] + end + result1, time1 = r1.value + result2, time2 = r2.value + assert_equal("hello", result1) + assert_equal("world", result2) + assert_operator((time1 - time2).abs, :>, 0.9) end - result1, time1 = r1.value - result2, time2 = r2.value - assert_equal("hello", result1) - assert_equal("world", result2) - assert_operator((time1 - time2).abs, :>, 0.9) end - end - - describe "2-thread lifecycle" do - let(:wrapper) { Ractor::Wrapper.new(remote, threads: 2) } - after { wrapper.async_stop.join } - - it "recovers the remote" do - assert_equal("[], {}", remote.echo_args) - wrapper - assert_raises(Ractor::MovedError) { remote.echo_args } - wrapper.async_stop - recovered = wrapper.recover_object - assert_equal("[], {}", recovered.echo_args) - end + describe "2-thread lifecycle in #{config[:desc]}" do + let(:base_opts) { config[:opts] } + let(:wrapper) { Ractor::Wrapper.new(remote, **base_opts, threads: 2) } - it "calls multiple methods" do - assert_equal("[1], {}", wrapper.call(:echo_args, 1)) - assert_equal("[2], {}", wrapper.call(:echo_args, 2)) - end + after { wrapper.async_stop.join } - it "parallelizes long-running methods" do - wrapper.call(:echo_args, 1) - r1 = Ractor.new(wrapper) do |w| - result = w.call(:slow_echo, "hello") - [result, Time.now.to_f] - end - r2 = Ractor.new(wrapper) do |w| - result = w.call(:slow_echo, "world") - [result, Time.now.to_f] - end - 
result1, time1 = r1.value - result2, time2 = r2.value - assert_equal("hello", result1) - assert_equal("world", result2) - assert_operator((time1 - time2).abs, :<, 0.8) + it "parallelizes long-running methods" do + wrapper.call(:echo_args, 1) + r1 = Ractor.new(wrapper) do |w| + result = w.call(:slow_echo, "hello") + [result, Time.now.to_f] + end + r2 = Ractor.new(wrapper) do |w| + result = w.call(:slow_echo, "world") + [result, Time.now.to_f] + end + result1, time1 = r1.value + result2, time2 = r2.value + assert_equal("hello", result1) + assert_equal("world", result2) + assert_operator((time1 - time2).abs, :<, 0.8) + end end end end