diff --git a/.rubocop.yml b/.rubocop.yml index 57a8912..2e04f09 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -237,6 +237,9 @@ Style/FileTouch: Enabled: true Style/FileWrite: Enabled: true +Style/GlobalVars: + Exclude: + - "test/**/*" Style/GuardClause: Enabled: false Style/HashConversion: diff --git a/Gemfile b/Gemfile index ec945f3..8f68548 100644 --- a/Gemfile +++ b/Gemfile @@ -4,7 +4,6 @@ source "https://rubygems.org" gemspec -gem "faraday", "~> 2.14" gem "minitest", "~> 6.0", ">= 6.0.1" gem "minitest-focus", "~> 1.4", ">= 1.4.1" gem "minitest-rg", "~> 5.4" diff --git a/README.md b/README.md index e3c31d5..1537c1e 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,7 @@ # Ractor::Wrapper -`Ractor::Wrapper` is an experimental class that wraps a non-shareable object, -allowing multiple Ractors to access it concurrently. This can make it possible -for Ractors to share a "plain" object such as a database connection. +Ractor::Wrapper is an experimental class that wraps a non-shareable object in +an actor, allowing multiple Ractors to access it concurrently. **WARNING:** This is a highly experimental library, and currently _not_ recommended for production use. (As of Ruby 4.0.0, the same can be said of @@ -20,73 +19,121 @@ Require it in your code: You can then create wrappers for objects. See the example below. -`Ractor::Wrapper` requires Ruby 4.0.0 or later. +Ractor::Wrapper requires Ruby 4.0.0 or later. -## About Ractor::Wrapper +## What is Ractor::Wrapper? -Ractors for the most part cannot access objects concurrently with other Ractors -unless the object is _shareable_, which generally means deeply immutable along -with a few other restrictions. If multiple Ractors need to interact with a -shared resource that is stateful or otherwise not shareable that resource must -itself be implemented and accessed as a Ractor. +For the most part, unless an object is _sharable_, which generally means +deeply immutable along with a few other restrictions, it cannot be accessed +directly from another Ractor. This makes it difficult for multiple Ractors +to share a resource that is stateful. Such a resource must typically itself +be implemented as a Ractor and accessed via message passing. -`Ractor::Wrapper` makes it possible for such a shared resource to be -implemented as an object and accessed using ordinary method calls. It does this -by "wrapping" the object in a Ractor, and mapping method calls to message -passing. This may make it easier to implement such a resource with a simple -class rather than a full-blown Ractor with message passing, and it may also be -useful for adapting existing object-based resources. +Ractor::Wrapper makes it possible for an ordinary non-shareable object to +be accessed from multiple Ractors. It does this by "wrapping" the object +with an actor that listens for messages and invokes the object's methods in +a controlled single-Ractor environment. It then provides a stub object that +reproduces the interface of the original object, but responds to method +calls by sending messages to the wrapper. Ractor::Wrapper can be used to +implement simple actors by writing "plain" Ruby objects, or to adapt +existing non-shareable objects to a multi-Ractor world. -Given a shared resource object, `Ractor::Wrapper` starts a new Ractor and -"runs" the object within that Ractor. It provides you with a stub object on -which you can invoke methods. The wrapper responds to these method calls by -sending messages to the internal Ractor, which invokes the shared object and -then sends back the result. If the underlying object is thread-safe, you can -configure the wrapper to run multiple threads that can run methods -concurrently. Or, if not, the wrapper can serialize requests to the object. +### Net::HTTP example -### Example usage - -The following example shows how to share a single `Faraday::Conection` -object among multiple Ractors. Because `Faraday::Connection` is not itself -thread-safe, this example serializes all calls to it. +The following example shows how to share a single Net::HTTP session object +among multiple Ractors. ```ruby require "ractor/wrapper" -require "faraday" +require "net/http" -# Create a Faraday connection. Faraday connections are not shareable, +# Create a Net::HTTP session. Net::HTTP sessions are not shareable, # so normally only one Ractor can access them at a time. -connection = Faraday.new("http://example.com") +http = Net::HTTP.new("example.com") +http.start -# Create a wrapper around the connection. This starts up an internal -# Ractor and "moves" the connection object to that Ractor. -wrapper = Ractor::Wrapper.new(connection) +# Create a wrapper around the session. This moves the session into an +# internal Ractor and listens for method call requests. By default, a +# wrapper serializes calls, handling one at a time, for compatibility +# with non-thread-safe objects. +wrapper = Ractor::Wrapper.new(http) -# At this point, the connection object can no longer be accessed -# directly because it is now owned by the wrapper's internal Ractor. -# connection.get("/whoops") # <= raises an error +# At this point, the session object can no longer be accessed directly +# because it is now owned by the wrapper's internal Ractor. +# http.get("/whoops") # <= raises Ractor::MovedError -# However, you can access the connection via the stub object provided -# by the wrapper. This stub proxies the call to the wrapper's internal +# However, you can access the session via the stub object provided by +# the wrapper. This stub proxies the call to the wrapper's internal # Ractor. And it's shareable, so any number of Ractors can use it. -wrapper.stub.get("/hello") +response = wrapper.stub.get("/") # Here, we start two Ractors, and pass the stub to each one. Each # Ractor can simply call methods on the stub as if it were the original -# connection object. (Internally, of course, the calls are proxied back -# to the wrapper.) By default, all calls are serialized. However, if -# you know that the underlying object is thread-safe, you can configure -# a wrapper to run calls concurrently. -r1 = Ractor.new(wrapper.stub) do |conn| - 10.times do - conn.get("/hello") +# connection object. Internally, of course, the calls are proxied to +# the original object via the wrapper, and execution is serialized. +r1 = Ractor.new(wrapper.stub) do |stub| + 5.times do + stub.get("/hello") + end + :ok +end +r2 = Ractor.new(wrapper.stub) do |stub| + 5.times do + stub.get("/ruby") + end + :ok +end + +# Wait for the two above Ractors to finish. +r1.join +r2.join + +# After you stop the wrapper, you can retrieve the underlying session +# object and access it directly again. +wrapper.async_stop +http = wrapper.recover_object +http.finish +``` + +### SQLite3 example + +The following example shows how to share a SQLite3 database among multiple +Ractors. + +```ruby +require "ractor/wrapper" +require "sqlite3" + +# Create a SQLite3 database. These objects are not shareable, so +# normally only one Ractor can access them. +db = SQLite3::Database.new($my_database_path) + +# Create a wrapper around the database. A SQLite3::Database object +# cannot be moved between Ractors, so we configure the wrapper to run +# in the current Ractor. You can also configure it to run multiple +# worker threads because the database object itself is thread-safe. +wrapper = Ractor::Wrapper.new(db, use_current_ractor: true, threads: 2) + +# At this point, the database object can still be accessed directly +# because it hasn't been moved to a different Ractor. +rows = db.execute("select * from numbers") + +# You can also access the database via the stub object provided by the +# wrapper. +rows = wrapper.stub.execute("select * from numbers") + +# Here, we start two Ractors, and pass the stub to each one. The +# wrapper's two worker threads will handle the requests in the order +# received. +r1 = Ractor.new(wrapper.stub) do |stub| + 5.times do + stub.execute("select * from numbers") end :ok end -r2 = Ractor.new(wrapper.stub) do |conn| - 10.times do - conn.get("/ruby") +r2 = Ractor.new(wrapper.stub) do |stub| + 5.times do + stub.execute("select * from numbers") end :ok end @@ -95,17 +142,23 @@ end r1.join r2.join -# After you stop the wrapper, you can retrieve the underlying -# connection object and access it directly again. +# After stopping the wrapper, you can call the join method to wait for +# it to completely finish. wrapper.async_stop -connection = wrapper.recover_object -connection.get("/finally") +wrapper.join + +# When running a wrapper with :use_current_ractor, you do not need to +# recover the object, because it was never moved. The recover_object +# method is not available. +# db2 = wrapper.recover_object # <= raises Ractor::Error ``` ### Features -* Provides a method interface to an object running in its own Ractor. +* Provides a Ractor-shareable method interface to a non-shareable object. * Supports arbitrary method arguments and return values. +* Can be configured to run in its own isolated Ractor or in a Thread in + the current Ractor. * Can be configured per method whether to copy or move arguments and return values. * Blocks can be run in the calling Ractor or in the object Ractor. @@ -125,8 +178,8 @@ Ruby 4.0.0. * Certain types cannot be used as method arguments or return values because Ractor does not allow them to be moved between Ractors. These include threads, backtraces, and a few others. -* Any exceptions raised are always copied back to the calling Ractor, and - the backtrace is cleared out. This is due to +* Any exceptions raised are always copied (rather than moved) back to the + calling Ractor, and the backtrace is cleared out. This is due to https://bugs.ruby-lang.org/issues/21818 ## Contributing @@ -142,7 +195,7 @@ Development is done in GitHub at https://github.com/dazuma/ractor-wrapper. The library uses [toys](https://dazuma.github.io/toys) for testing and CI. To run the test suite, `gem install toys` and then run `toys ci`. You can also run -unit tests, rubocop, and builds independently. +unit tests, rubocop, and build tests independently. ## License diff --git a/lib/ractor/wrapper.rb b/lib/ractor/wrapper.rb index 8aadc06..c70d8b4 100644 --- a/lib/ractor/wrapper.rb +++ b/lib/ractor/wrapper.rb @@ -5,9 +5,8 @@ # class Ractor ## - # An experimental class that wraps a non-shareable object, allowing multiple - # Ractors to access it concurrently. This can make it possible for Ractors to - # share a "plain" object such as a database connection. + # An experimental class that wraps a non-shareable object in an actor, + # allowing multiple Ractors to access it concurrently. # # WARNING: This is a highly experimental library, and currently _not_ # recommended for production use. (As of Ruby 4.0.0, the same can be said of @@ -15,68 +14,114 @@ class Ractor # # ## What is Ractor::Wrapper? # - # Ractors for the most part cannot access objects concurrently with other - # Ractors unless the object is _shareable_, which generally means deeply - # immutable along with a few other restrictions. If multiple Ractors need to - # interact with a shared resource that is stateful or otherwise not shareable - # that resource must itself be implemented and accessed as a Ractor. + # For the most part, unless an object is _sharable_, which generally means + # deeply immutable along with a few other restrictions, it cannot be accessed + # directly from another Ractor. This makes it difficult for multiple Ractors + # to share a resource that is stateful. Such a resource must typically itself + # be implemented as a Ractor and accessed via message passing. # - # `Ractor::Wrapper` makes it possible for such a shared resource to be - # implemented as an object and accessed using ordinary method calls. It does - # this by "wrapping" the object in a Ractor, and mapping method calls to - # message passing. This may make it easier to implement such a resource with - # a simple class rather than a full-blown Ractor with message passing, and it - # may also be useful for adapting existing object-based resources. + # Ractor::Wrapper makes it possible for an ordinary non-shareable object to + # be accessed from multiple Ractors. It does this by "wrapping" the object + # with an actor that listens for messages and invokes the object's methods in + # a controlled single-Ractor environment. It then provides a stub object that + # reproduces the interface of the original object, but responds to method + # calls by sending messages to the wrapper. Ractor::Wrapper can be used to + # implement simple actors by writing "plain" Ruby objects, or to adapt + # existing non-shareable objects to a multi-Ractor world. # - # Given a shared resource object, `Ractor::Wrapper` starts a new Ractor and - # "runs" the object within that Ractor. It provides you with a stub object - # on which you can invoke methods. The wrapper responds to these method calls - # by sending messages to the internal Ractor, which invokes the shared object - # and then sends back the result. If the underlying object is thread-safe, - # you can configure the wrapper to run multiple threads that can run methods - # concurrently. Or, if not, the wrapper can serialize requests to the object. + # ## Net::HTTP example # - # ## Example usage - # - # The following example shows how to share a single `Faraday::Conection` - # object among multiple Ractors. Because `Faraday::Connection` is not itself - # thread-safe, this example serializes all calls to it. + # The following example shows how to share a single Net::HTTP session object + # among multiple Ractors. # # require "ractor/wrapper" - # require "faraday" + # require "net/http" # - # # Create a Faraday connection. Faraday connections are not shareable, + # # Create a Net::HTTP session. Net::HTTP sessions are not shareable, # # so normally only one Ractor can access them at a time. - # connection = Faraday.new("http://example.com") + # http = Net::HTTP.new("example.com") + # http.start # - # # Create a wrapper around the connection. This starts up an internal - # # Ractor and "moves" the connection object to that Ractor. - # wrapper = Ractor::Wrapper.new(connection) + # # Create a wrapper around the session. This moves the session into an + # # internal Ractor and listens for method call requests. By default, a + # # wrapper serializes calls, handling one at a time, for compatibility + # # with non-thread-safe objects. + # wrapper = Ractor::Wrapper.new(http) # - # # At this point, the connection object can no longer be accessed - # # directly because it is now owned by the wrapper's internal Ractor. - # # connection.get("/whoops") # <= raises an error + # # At this point, the session object can no longer be accessed directly + # # because it is now owned by the wrapper's internal Ractor. + # # http.get("/whoops") # <= raises Ractor::MovedError # - # # However, you can access the connection via the stub object provided - # # by the wrapper. This stub proxies the call to the wrapper's internal + # # However, you can access the session via the stub object provided by + # # the wrapper. This stub proxies the call to the wrapper's internal # # Ractor. And it's shareable, so any number of Ractors can use it. - # wrapper.stub.get("/hello") + # response = wrapper.stub.get("/") # # # Here, we start two Ractors, and pass the stub to each one. Each # # Ractor can simply call methods on the stub as if it were the original - # # connection object. (Internally, of course, the calls are proxied back - # # to the wrapper.) By default, all calls are serialized. However, if - # # you know that the underlying object is thread-safe, you can configure - # # a wrapper to run calls concurrently. - # r1 = Ractor.new(wrapper.stub) do |conn| - # 10.times do - # conn.get("/hello") + # # connection object. Internally, of course, the calls are proxied to + # # the original object via the wrapper, and execution is serialized. + # r1 = Ractor.new(wrapper.stub) do |stub| + # 5.times do + # stub.get("/hello") + # end + # :ok + # end + # r2 = Ractor.new(wrapper.stub) do |stub| + # 5.times do + # stub.get("/ruby") + # end + # :ok + # end + # + # # Wait for the two above Ractors to finish. + # r1.join + # r2.join + # + # # After you stop the wrapper, you can retrieve the underlying session + # # object and access it directly again. + # wrapper.async_stop + # http = wrapper.recover_object + # http.finish + # + # ## SQLite3 example + # + # The following example shows how to share a SQLite3 database among multiple + # Ractors. + # + # require "ractor/wrapper" + # require "sqlite3" + # + # # Create a SQLite3 database. These objects are not shareable, so + # # normally only one Ractor can access them. + # db = SQLite3::Database.new($my_database_path) + # + # # Create a wrapper around the database. A SQLite3::Database object + # # cannot be moved between Ractors, so we configure the wrapper to run + # # in the current Ractor. You can also configure it to run multiple + # # worker threads because the database object itself is thread-safe. + # wrapper = Ractor::Wrapper.new(db, use_current_ractor: true, threads: 2) + # + # # At this point, the database object can still be accessed directly + # # because it hasn't been moved to a different Ractor. + # rows = db.execute("select * from numbers") + # + # # You can also access the database via the stub object provided by the + # # wrapper. + # rows = wrapper.stub.execute("select * from numbers") + # + # # Here, we start two Ractors, and pass the stub to each one. The + # # wrapper's two worker threads will handle the requests in the order + # # received. + # r1 = Ractor.new(wrapper.stub) do |db_stub| + # 5.times do + # rows = db_stub.execute("select * from numbers") # end # :ok # end - # r2 = Ractor.new(wrapper.stub) do |conn| - # 10.times do - # conn.get("/ruby") + # r2 = Ractor.new(wrapper.stub) do |db_stub| + # 5.times do + # rows = db_stub.execute("select * from numbers") # end # :ok # end @@ -85,16 +130,22 @@ class Ractor # r1.join # r2.join # - # # After you stop the wrapper, you can retrieve the underlying - # # connection object and access it directly again. + # # After stopping the wrapper, you can call the join method to wait for + # # it to completely finish. # wrapper.async_stop - # connection = wrapper.recover_object - # connection.get("/finally") + # wrapper.join + # + # # When running a wrapper with :use_current_ractor, you do not need to + # # recover the object, because it was never moved. The recover_object + # # method is not available. + # # db2 = wrapper.recover_object # <= raises Ractor::Error # # ## Features # - # * Provides a method interface to an object running in its own Ractor. + # * Provides a Ractor-shareable method interface to a non-shareable object. # * Supports arbitrary method arguments and return values. + # * Can be configured to run in its own isolated Ractor or in a Thread in + # the current Ractor. # * Can be configured per method whether to copy or move arguments and # return values. # * Blocks can be run in the calling Ractor or in the object Ractor. @@ -112,10 +163,10 @@ class Ractor # access any data outside the block. Otherwise, the block must be run in # the calling Ractor. # * Certain types cannot be used as method arguments or return values - # because Ractor does not allow them to be moved between Ractors. These - # include threads, backtraces, and a few others. - # * Any exceptions raised are always copied back to the calling Ractor, and - # the backtrace is cleared out. This is due to + # because they cannot be moved between Ractors. These include threads, + # backtraces, and a few others. + # * Any exceptions raised are always copied (rather than moved) back to the + # calling Ractor, and the backtrace is cleared out. This is due to # https://bugs.ruby-lang.org/issues/21818 # class Wrapper @@ -158,17 +209,17 @@ def respond_to_missing?(name, include_all) # class MethodSettings # @private - def initialize(move: false, + def initialize(move_data: false, move_arguments: nil, - move_result: nil, - execute_block_in_ractor: nil, + move_results: nil, move_block_arguments: nil, - move_block_result: nil) - @move_arguments = interpret_setting(move_arguments, move) - @move_result = interpret_setting(move_result, move) - @execute_block_in_ractor = interpret_setting(execute_block_in_ractor, false) - @move_block_arguments = interpret_setting(move_block_arguments, move) - @move_block_result = interpret_setting(move_block_result, move) + move_block_results: nil, + execute_blocks_in_place: nil) + @move_arguments = interpret_setting(move_arguments, move_data) + @move_results = interpret_setting(move_results, move_data) + @move_block_arguments = interpret_setting(move_block_arguments, move_data) + @move_block_results = interpret_setting(move_block_results, move_data) + @execute_blocks_in_place = interpret_setting(execute_blocks_in_place, false) freeze end @@ -182,15 +233,8 @@ def move_arguments? ## # @return [Boolean] Whether to move return values # - def move_result? - @move_result - end - - ## - # @return [Boolean] Whether to call blocks in-ractor - # - def execute_block_in_ractor? - @execute_block_in_ractor + def move_results? + @move_results end ## @@ -203,8 +247,15 @@ def move_block_arguments? ## # @return [Boolean] Whether to move block results # - def move_block_result? - @move_block_result + def move_block_results? + @move_block_results + end + + ## + # @return [Boolean] Whether to call blocks in-place + # + def execute_blocks_in_place? + @execute_blocks_in_place end private @@ -227,53 +278,69 @@ def interpret_setting(setting, default) # The configuration is frozen once the object is constructed. # # @param object [Object] The non-shareable object to wrap. + # @param use_current_ractor [boolean] If true, the wrapper is run in a + # thread in the current Ractor instead of spawning a new Ractor (the + # default behavior). This option can be used if the wrapped object + # cannot be moved or must run in the main Ractor. # @param name [String] A name for this wrapper. Used during logging. - # @param logging_enabled [boolean] Set to true to enable logging. Default - # is false. - # @param thread_count [Integer] The number of worker threads to run. + # @param threads [Integer] The number of worker threads to run. # Defaults to 1, which causes the worker to serialize calls into a # single thread. - # @param move [boolean] If true, all communication will by default move - # instead of copy arguments and return values. Default is false. + # @param move_data [boolean] If true, all communication will by default + # move instead of copy arguments and return values. Default is false. + # This setting can be overridden by other `:move_*` settings. # @param move_arguments [boolean] If true, all arguments will be moved - # instead of copied by default. If not set, uses the `:move` setting. - # @param move_result [boolean] If true, return values will be moved instead - # of copied by default. If not set, uses the `:move` setting. + # instead of copied by default. If not set, uses the `:move_data` + # setting. + # @param move_results [boolean] If true, return values are moved instead of + # copied by default. If not set, uses the `:move_data` setting. + # @param move_block_arguments [boolean] If true, arguments to blocks are + # moved instead of copied by default. If not set, uses the `:move_data` + # setting. + # @param move_block_results [boolean] If true, result values from blocks + # are moved instead of copied by default. If not set, uses the + # `:move_data` setting. + # @param execute_blocks_in_place [boolean] If true, blocks passed to + # methods are made shareable and passed into the wrapper to be executed + # in the wrapped environment. If false (the default), blocks are + # replaced by a proc that passes messages back out to the caller and + # executes the block in the caller's environment. + # @param enable_logging [boolean] Set to true to enable logging. Default + # is false. # def initialize(object, + use_current_ractor: false, name: nil, - logging_enabled: false, - thread_count: 1, - move: false, + threads: 1, + move_data: false, move_arguments: nil, - move_result: nil, - execute_block_in_ractor: nil, + move_results: nil, move_block_arguments: nil, - move_block_result: nil) - raise ::Ractor::MovedError, "can not wrap a moved object" if ::Ractor::MovedObject === object + move_block_results: nil, + execute_blocks_in_place: nil, + enable_logging: false) + raise ::Ractor::MovedError, "cannot wrap a moved object" if ::Ractor::MovedObject === object @method_settings = {} self.name = name || object_id.to_s - self.logging_enabled = logging_enabled - self.thread_count = thread_count - configure_method(move: move, + self.enable_logging = enable_logging + self.threads = threads + configure_method(move_data: move_data, move_arguments: move_arguments, - move_result: move_result, - execute_block_in_ractor: execute_block_in_ractor, + move_results: move_results, move_block_arguments: move_block_arguments, - move_block_result: move_block_result) + move_block_results: move_block_results, + execute_blocks_in_place: execute_blocks_in_place) yield self if block_given? @method_settings.freeze - maybe_log("Starting server") - @ractor = ::Ractor.new(self.name, name: "wrapper: #{name}") do |wrapper_name| - Server.new(wrapper_name).run + if use_current_ractor + setup_local_server(object) + else + setup_isolated_server(object) end - init_message = InitMessage.new(object: object, - logging_enabled: self.logging_enabled, - thread_count: self.thread_count) - @ractor.send(init_message, move: true) @stub = Stub.new(self) + freeze end @@ -288,10 +355,10 @@ def initialize(object, # # @param value [Integer] # - def thread_count=(value) + def threads=(value) value = value.to_i value = 1 if value < 1 - @thread_count = value + @threads = value end ## @@ -302,8 +369,8 @@ def thread_count=(value) # # @param value [Boolean] # - def logging_enabled=(value) - @logging_enabled = value ? true : false + def enable_logging=(value) + @enable_logging = value ? true : false end ## @@ -330,57 +397,73 @@ def name=(value) # # @param method_name [Symbol, nil] The name of the method being configured, # or `nil` to set defaults for all methods not configured explicitly. - # @param move [Boolean] Whether to move all communication. This value, if - # given, is used if `move_arguments`, `move_result`, or - # `move_exceptions` are not set. - # @param move_arguments [Boolean] Whether to move arguments. - # @param move_result [Boolean] Whether to move return values. + # @param move_data [boolean] If true, communication for this method will + # move instead of copy arguments and return values. Default is false. + # This setting can be overridden by other `:move_*` settings. + # @param move_arguments [boolean] If true, arguments for this method are + # moved instead of copied. If not set, uses the `:move_data` setting. + # @param move_results [boolean] If true, return values for this method are + # moved instead of copied. If not set, uses the `:move_data` setting. + # @param move_block_arguments [boolean] If true, arguments to blocks passed + # to this method are moved instead of copied. If not set, uses the + # `:move_data` setting. + # @param move_block_results [boolean] If true, result values from blocks + # passed to this method are moved instead of copied. If not set, uses + # the `:move_data` setting. + # @param execute_blocks_in_place [boolean] If true, blocks passed to this + # method are made shareable and passed into the wrapper to be executed + # in the wrapped environment. If false (the default), blocks are + # replaced by a proc that passes messages back out to the caller and + # executes the block in the caller's environment. # def configure_method(method_name = nil, - move: false, + move_data: false, move_arguments: nil, - move_result: nil, - execute_block_in_ractor: nil, + move_results: nil, move_block_arguments: nil, - move_block_result: nil) + move_block_results: nil, + execute_blocks_in_place: nil) method_name = method_name.to_sym unless method_name.nil? @method_settings[method_name] = - MethodSettings.new(move: move, + MethodSettings.new(move_data: move_data, move_arguments: move_arguments, - move_result: move_result, - execute_block_in_ractor: execute_block_in_ractor, + move_results: move_results, move_block_arguments: move_block_arguments, - move_block_result: move_block_result) + move_block_results: move_block_results, + execute_blocks_in_place: execute_blocks_in_place) end ## - # Return the wrapper stub. This is an object that responds to the same - # methods as the wrapped object, providing an easy way to call a wrapper. + # Return the name of this wrapper. # - # @return [Ractor::Wrapper::Stub] + # @return [String] # - attr_reader :stub + attr_reader :name ## - # Return the number of threads used by the wrapper. + # Determine whether this wrapper runs in the current Ractor # - # @return [Integer] + # @return [boolean] # - attr_reader :thread_count + def use_current_ractor? + @ractor.nil? + end ## # Return whether logging is enabled for this wrapper. # # @return [Boolean] # - attr_reader :logging_enabled + def enable_logging? + @enable_logging + end ## - # Return the name of this wrapper. + # Return the number of threads used by the wrapper. # - # @return [String] + # @return [Integer] # - attr_reader :name + attr_reader :threads ## # Return the method settings for the given method name. This returns the @@ -396,6 +479,14 @@ def method_settings(method_name) @method_settings[method_name] || @method_settings[nil] end + ## + # Return the wrapper stub. This is an object that responds to the same + # methods as the wrapped object, providing an easy way to call a wrapper. + # + # @return [Ractor::Wrapper::Stub] + # + attr_reader :stub + ## # A lower-level interface for calling methods through the wrapper. # @@ -408,7 +499,7 @@ def call(method_name, *args, **kwargs, &) reply_port = ::Ractor::Port.new transaction = ::Random.rand(7_958_661_109_946_400_884_391_936).to_s(36).freeze settings = method_settings(method_name) - block_arg = settings.execute_block_in_ractor? ? ::Ractor.shareable_proc(&) : :message + block_arg = make_block_arg(settings, &) message = CallMessage.new(method_name: method_name, args: args, kwargs: kwargs, @@ -417,7 +508,7 @@ def call(method_name, *args, **kwargs, &) settings: settings, reply_port: reply_port) maybe_log("Sending method", method_name: method_name, transaction: transaction) - @ractor.send(message, move: settings.move_arguments?) + @port.send(message, move: settings.move_arguments?) loop do reply_message = reply_port.receive case reply_message @@ -444,7 +535,7 @@ def call(method_name, *args, **kwargs, &) # def async_stop maybe_log("Stopping wrapper") - @ractor.send(StopMessage.new.freeze) + @port.send(StopMessage.new.freeze) self rescue ::Ractor::ClosedError # Ignore to allow stops to be idempotent. @@ -457,7 +548,15 @@ def async_stop # @return [self] # def join - @ractor.join + if @ractor + @ractor.join + else + reply_port = ::Ractor::Port.new + @port.send(JoinMessage.new(reply_port)) + reply_port.receive + end + self + rescue ::Ractor::ClosedError self end @@ -466,11 +565,18 @@ def join # only after a stop request has been issued using {#async_stop}, and may # block until the wrapper has fully stopped. # - # Only one ractor may call this method; any additional calls will fail. + # This can be called only if the wrapper was *not* configured with + # `use_current_ractor: true`. If the wrapper had that configuration, the + # object will not be moved, and does not need to be recovered. In such a + # case, any calls to this method will raise Ractor::Error. + # + # Only one ractor may call this method; any additional calls will fail with + # a Ractor::Error. # # @return [Object] The original wrapped object # def recover_object + raise ::Ractor::Error, "cannot recover an object from a local wrapper" unless @ractor @ractor.value end @@ -480,7 +586,7 @@ def recover_object # @private # Message sent to initialize a server. # - InitMessage = ::Data.define(:object, :logging_enabled, :thread_count) + InitMessage = ::Data.define(:object, :enable_logging, :threads) ## # @private @@ -501,6 +607,12 @@ def recover_object # StopMessage = ::Data.define + ## + # @private + # Message sent to a server to request a join response + # + JoinMessage = ::Data.define(:reply_port) + ## # @private # Message sent to report a return value @@ -521,12 +633,50 @@ def recover_object private + def setup_local_server(object) + maybe_log("Starting local server") + @ractor = nil + @port = ::Ractor::Port.new + ::Thread.new do + Server.run_local(object: object, + port: @port, + name: name, + enable_logging: enable_logging?, + threads: threads) + end + end + + def setup_isolated_server(object) + maybe_log("Starting isolated server") + @ractor = ::Ractor.new(name, enable_logging?, threads, name: "wrapper:#{name}") do |name, enable_logging, threads| + Server.run_isolated(name: name, + enable_logging: enable_logging, + threads: threads) + end + @port = @ractor.default_port + @port.send(object, move: true) + end + + def make_transaction + ::Random.rand(7_958_661_109_946_400_884_391_936).to_s(36).freeze + end + + def make_block_arg(settings, &) + if !block_given? + nil + elsif settings.execute_blocks_in_place? + ::Ractor.shareable_proc(&) + else + :send_block_message + end + end + def handle_yield(message, transaction, settings, method_name) maybe_log("Yielding to block", method_name: method_name, transaction: transaction) begin block_result = yield(*message.args, **message.kwargs) maybe_log("Sending block result", method_name: method_name, transaction: transaction) - message.reply_port.send(ReturnMessage.new(block_result), move: settings.move_block_result?) + message.reply_port.send(ReturnMessage.new(block_result), move: settings.move_block_results?) rescue ::Exception => e # rubocop:disable Lint/RescueException maybe_log("Sending block exception", method_name: method_name, transaction: transaction) begin @@ -542,7 +692,7 @@ def handle_yield(message, transaction, settings, method_name) end def maybe_log(str, transaction: nil, method_name: nil) - return unless logging_enabled + return unless enable_logging? metadata = [::Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%L"), "Ractor::Wrapper/#{name}"] metadata << "Transaction/#{transaction}" if transaction metadata << "Method/#{method_name}" if method_name @@ -558,13 +708,39 @@ def maybe_log(str, transaction: nil, method_name: nil) # runs worker threads internally to handle actual method calls. # # See the {#run} method for an overview of the Server implementation and - # lifecycle. Server is stateful and not shareable. + # lifecycle. # # @private # class Server - def initialize(name) + ## + # @private + # Create and run a server in the current Ractor + # + def self.run_local(object:, port:, name:, enable_logging: false, threads: 1) + server = new(isolated: false, object:, port:, name:, enable_logging:, threads:) + server.run + end + + ## + # @private + # Create and run a server in an isolated Ractor + # + def self.run_isolated(name:, enable_logging: false, threads: 1) + port = ::Ractor.current.default_port + server = new(isolated: true, object: nil, port:, name:, enable_logging:, threads:) + server.run + end + + def initialize(isolated:, object:, port:, name:, enable_logging:, threads:) + @isolated = isolated + @object = object + @port = port @name = name + @enable_logging = enable_logging + @threads = threads + @queue = ::Queue.new + @join_requests = [] end ## @@ -578,7 +754,8 @@ def initialize(name) # The server returns the wrapped object, so one client can recover it. # def run - init_phase + receive_remote_object if @isolated + start_threads running_phase stopping_phase cleanup_phase @@ -590,24 +767,14 @@ def run private - ## - # In the **init phase**, the Server: - # - # * Receives an initial message providing the object to wrap, and - # server configuration such as thread count and communications - # settings. - # * Initializes the job queue. - # * Spawns worker threads. - # - def init_phase - maybe_log("Waiting for initialization") - init_message = ::Ractor.receive - @object = init_message.object - @logging_enabled = init_message.logging_enabled - @thread_count = init_message.thread_count - @queue = ::Queue.new - maybe_log("Spawning #{@thread_count} worker threads") - (1..@thread_count).map do |worker_num| + def receive_remote_object + maybe_log("Waiting for remote object") + @object = @port.receive + end + + def start_threads + maybe_log("Spawning #{@threads} worker threads") + (1..@threads).map do |worker_num| ::Thread.new { worker_thread(worker_num) } end end @@ -627,19 +794,22 @@ def init_phase # def running_phase loop do - maybe_log("Waiting for message") - message = ::Ractor.receive + maybe_log("Waiting for message in running phase") + message = @port.receive case message when CallMessage maybe_log("Received CallMessage", call_message: message) @queue.enq(message) when WorkerStoppedMessage maybe_log("Received unexpected WorkerStoppedMessage") - @thread_count -= 1 + @threads -= 1 break when StopMessage maybe_log("Received stop") break + when JoinMessage + maybe_log("Received and queueing join request") + @join_requests << message.reply_port end end end @@ -654,21 +824,20 @@ def running_phase # def stopping_phase @queue.close - while @thread_count.positive? - maybe_log("Refusing incoming messages while stopping") - message = ::Ractor.receive + while @threads.positive? + maybe_log("Waiting for message in stopping phase") + message = @port.receive case message when CallMessage - begin - refuse_method(message) - rescue ::Ractor::ClosedError - maybe_log("Reply port is closed", call_message: message) - end + refuse_method(message) when WorkerStoppedMessage maybe_log("Acknowledged WorkerStoppedMessage: #{message.worker_num}") - @thread_count -= 1 + @threads -= 1 when StopMessage maybe_log("Stop received when already stopping") + when JoinMessage + maybe_log("Received and queueing join request") + @join_requests << message.reply_port end end end @@ -679,25 +848,31 @@ def stopping_phase # requests with a refusal. # def cleanup_phase - ::Ractor.current.close - maybe_log("Checking message queue for cleanup") + maybe_log("Closing inbox") + @port.close + maybe_log("Responding to join requests") + @join_requests.each { |port| send_join_reply(port) } + maybe_log("Draining inbox") loop do - message = ::Ractor.receive + message = begin + @port.receive + rescue ::Ractor::ClosedError + maybe_log("Inbox is empty") + nil + end + return if message.nil? case message when CallMessage - begin - refuse_method(message) - rescue ::Ractor::ClosedError - maybe_log("Reply port is closed", call_message: message) - end + refuse_method(message) when WorkerStoppedMessage maybe_log("Unexpected WorkerStoppedMessage when in cleanup") when StopMessage maybe_log("Stop received when already stopping") + when JoinMessage + maybe_log("Received and responding immediately to join request") + send_join_reply(message.reply_port) end end - rescue ::Ractor::ClosedError - maybe_log("Message queue is empty") end ## @@ -719,7 +894,7 @@ def worker_thread(worker_num) ensure maybe_log("Worker stopping", worker_num: worker_num) begin - ::Ractor.current.send(WorkerStoppedMessage.new(worker_num)) + @port.send(WorkerStoppedMessage.new(worker_num)) rescue ::Ractor::ClosedError maybe_log("Orphaned worker thread", worker_num: worker_num) end @@ -734,13 +909,12 @@ def worker_thread(worker_num) # will catch the exception and try to send a simpler response. # def handle_method(message, worker_num: nil) - block = message.block_arg - block = make_proxy_block(message.reply_port, message.settings) if block == :message + block = make_block(message) maybe_log("Running method", worker_num: worker_num, call_message: message) begin result = @object.send(message.method_name, *message.args, **message.kwargs, &block) maybe_log("Sending return value", worker_num: worker_num, call_message: message) - message.reply_port.send(ReturnMessage.new(result), move: message.settings.move_result?) + message.reply_port.send(ReturnMessage.new(result), move: message.settings.move_results?) rescue ::Exception => e # rubocop:disable Lint/RescueException maybe_log("Sending exception", worker_num: worker_num, call_message: message) begin @@ -755,11 +929,12 @@ def handle_method(message, worker_num: nil) end end - def make_proxy_block(port, settings) + def make_block(message) + return message.block_arg unless message.block_arg == :send_block_message proc do |*args, **kwargs| reply_port = ::Ractor::Port.new yield_message = YieldMessage.new(args: args, kwargs: kwargs, reply_port: reply_port) - port.send(yield_message, move: settings.move_block_arguments?) + message.reply_port.send(yield_message, move: message.settings.move_block_arguments?) reply_message = reply_port.receive case reply_message when ExceptionMessage @@ -780,13 +955,19 @@ def refuse_method(message) begin error = ::Ractor::ClosedError.new("Wrapper is shutting down") message.reply_port.send(ExceptionMessage.new(error)) - rescue ::StandardError + rescue ::Ractor::Error maybe_log("Failure to send refusal message", call_message: message) end end + def send_join_reply(port) + port.send(nil) + rescue ::Ractor::ClosedError + maybe_log("Join reply port is closed") + end + def maybe_log(str, call_message: nil, worker_num: nil, transaction: nil, method_name: nil) - return unless @logging_enabled + return unless @enable_logging transaction ||= call_message&.transaction method_name ||= call_message&.method_name metadata = [::Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%L"), "Ractor::Wrapper/#{@name}"] diff --git a/ractor-wrapper.gemspec b/ractor-wrapper.gemspec index bec41fb..13cd5b1 100644 --- a/ractor-wrapper.gemspec +++ b/ractor-wrapper.gemspec @@ -26,5 +26,4 @@ require "ractor/wrapper/version" spec.metadata["changelog_uri"] = "https://rubydoc.info/gems/ractor-wrapper/#{::Ractor::Wrapper::VERSION}/file/CHANGELOG.md" spec.metadata["documentation_uri"] = "https://rubydoc.info/gems/ractor-wrapper/#{::Ractor::Wrapper::VERSION}" spec.metadata["homepage_uri"] = "https://github.com/dazuma/ractor-wrapper" - spec.metadata["source_code_uri"] = "https://github.com/dazuma/ractor-wrapper" end diff --git a/test/helper.rb b/test/helper.rb index 77143c0..d21f8ba 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -18,6 +18,10 @@ def run_block(*, **) yield(*, **) end + def run_block_with_id(obj) + yield(obj, obj.object_id) + end + def whoops raise "Whoops" end diff --git a/test/test_example.rb b/test/test_example.rb index 9ca8c27..e5b3adc 100644 --- a/test/test_example.rb +++ b/test/test_example.rb @@ -3,35 +3,22 @@ require "helper" describe ::Ractor::Wrapper do - it "runs the README example" do - # Faraday cannot be run outside the main Ractor - skip - readme_content = ::File.read(::File.join(::File.dirname(__dir__), "README.md")) - script = /\n```ruby\n(.*\n)```\n/m.match(readme_content)[1] - eval(script) # rubocop:disable Security/Eval + let(:ruby_block_finder) { /\n```ruby\n((?:(?:[^`][^\n]*)?\n)+)```\n/m } - require "net/http" - connection = Faraday.new("http://example.com") - wrapper = Ractor::Wrapper.new(connection) - begin - wrapper.stub.get("/hello") - ensure - wrapper.async_stop - end + it "runs the Net::HTTP README example" do + readme_content = ::File.read(::File.join(::File.dirname(__dir__), "README.md")) + script = readme_content.scan(ruby_block_finder)[0][0] + script = "#{script}\nresponse\n" + response = eval(script) # rubocop:disable Security/Eval + assert_kind_of(Net::HTTPOK, response) end - it "wraps a SQLite3 database" do - # SQLite3::Database is not movable - skip - require "sqlite3" - path = File.join(__dir__, "data", "numbers.db") - db = SQLite3::Database.new(path) - wrapper = Ractor::Wrapper.new(db) - begin - rows = wrapper.stub.execute("select * from numbers") - assert_equal([["one", 1], ["two", 2]], rows) - ensure - wrapper.async_stop - end + it "runs the SQLite3 README example" do + $my_database_path = File.join(__dir__, "data", "numbers.db") + readme_content = ::File.read(::File.join(::File.dirname(__dir__), "README.md")) + script = readme_content.scan(ruby_block_finder)[1][0] + script = "#{script}\nrows\n" + rows = eval(script) # rubocop:disable Security/Eval + assert_equal([["one", 1], ["two", 2]], rows) end end diff --git a/test/test_wrapper.rb b/test/test_wrapper.rb index 08462c8..2ad9191 100644 --- a/test/test_wrapper.rb +++ b/test/test_wrapper.rb @@ -5,100 +5,249 @@ describe ::Ractor::Wrapper do let(:remote) { RemoteObject.new } - describe "wrapper features" do + describe "an isolated wrapper" do let(:wrapper) { Ractor::Wrapper.new(remote) } - after { wrapper.async_stop } + after { wrapper.async_stop.join } it "moves a wrapped object" do wrapper assert_raises(Ractor::MovedError) do - remote.to_s + remote.echo_args end end - it "refuses to wrap a moved object" do - wrapper - assert_raises(Ractor::MovedError) do - Ractor::Wrapper.new(remote) - end + it "recovers the object" do + wrapper.async_stop + recovered = wrapper.recover_object + assert_equal("[], {}", recovered.echo_args) end end - describe "method features" do - def wrapper(**) - @wrapper ||= Ractor::Wrapper.new(remote, **) - end + describe "a local wrapper" do + let(:wrapper) { Ractor::Wrapper.new(remote, use_current_ractor: true) } - after { wrapper.async_stop } + after { wrapper.async_stop.join } - it "passes arguments and return values" do - result = wrapper.call(:echo_args, 1, 2, a: "b", c: "d") - assert_equal("[1, 2], {a: \"b\", c: \"d\"}", result) + it "does not move a wrapped object" do + wrapper + assert_equal("[], {}", remote.echo_args) end - it "gets exceptions" do - exception = assert_raises(RuntimeError) do - wrapper.call(:whoops) + it "refuses to recover" do + wrapper.async_stop + error = assert_raises(Ractor::Error) do + wrapper.recover_object end - assert_equal("Whoops", exception.message) + assert_equal("cannot recover an object from a local wrapper", error.message) end + end - it "yields to a local block" do - local_var = false - result = wrapper.call(:run_block, 1, 2, a: "b", c: "d") do |one, two, a:, c:| - local_var = true - "result #{one} #{two} #{a} #{c}" + [ + { + desc: "an isolated wrapper", + opts: {use_current_ractor: false}, + }, + { + desc: "a local wrapper", + opts: {use_current_ractor: true}, + }, + ].each do |config| + describe "basic behavior of #{config[:desc]}" do + let(:base_opts) { config[:opts] } + + before { @wrapper = nil } + after { @wrapper&.async_stop&.join } + + it "refuses to wrap a moved object" do + port = Ractor::Port.new + port.send(remote, move: true) + port.receive + port.close + error = assert_raises(Ractor::MovedError) do + Ractor::Wrapper.new(remote, **base_opts) + end + assert_equal("cannot wrap a moved object", error.message) end - assert_equal("result 1 2 b d", result) - assert_equal(true, local_var) - end - it "yields to a remote block" do - wrapper(execute_block_in_ractor: true) - result = wrapper.call(:run_block, 1, 2, a: "b", c: "d") do |one, two, a:, c:| - "result #{one} #{two} #{a} #{c} #{inspect}" + it "is shareable" do + @wrapper = Ractor::Wrapper.new(remote, **base_opts) + assert(Ractor.shareable?(@wrapper)) end - assert_equal("result 1 2 b d nil", result) end - end - describe "object moving and copying" do - after { @wrapper&.async_stop } + describe "method features of #{config[:desc]}" do + let(:base_opts) { config[:opts] } - it "copies arguments by default" do - @wrapper = Ractor::Wrapper.new(remote) - str = "hello".dup - @wrapper.call(:echo_args, str) - str.to_s # Would fail if str was moved - end + def wrapper(**) + @wrapper ||= Ractor::Wrapper.new(remote, **base_opts, **) + end - it "moves arguments when move_arguments is set to true" do - @wrapper = Ractor::Wrapper.new(remote, move_arguments: true) - str = "hello".dup - @wrapper.call(:echo_args, str) - assert_raises(Ractor::MovedError) { str.to_s } - end + after { wrapper.async_stop.join } + + it "passes arguments and return values" do + result = wrapper.call(:echo_args, 1, 2, a: "b", c: "d") + assert_equal("[1, 2], {a: \"b\", c: \"d\"}", result) + end + + it "gets exceptions" do + exception = assert_raises(RuntimeError) do + wrapper.call(:whoops) + end + assert_equal("Whoops", exception.message) + end - it "moves arguments when move is set to true" do - @wrapper = Ractor::Wrapper.new(remote, move: true) - str = "hello".dup - @wrapper.call(:echo_args, str) - assert_raises(Ractor::MovedError) { str.to_s } + it "yields to a local block" do + local_var = false + result = wrapper.call(:run_block, 1, 2, a: "b", c: "d") do |one, two, a:, c:| + local_var = true + "result #{one} #{two} #{a} #{c}" + end + assert_equal("result 1 2 b d", result) + assert_equal(true, local_var) + end + + it "yields to an in-place block" do + wrapper(execute_blocks_in_place: true) + result = wrapper.call(:run_block, 1, 2, a: "b", c: "d") do |one, two, a:, c:| + "result #{one} #{two} #{a} #{c} #{inspect}" + end + assert_equal("result 1 2 b d nil", result) + end end - it "honors move_arguments over move" do - @wrapper = Ractor::Wrapper.new(remote, move: true, move_arguments: false) - str = "hello".dup - @wrapper.call(:echo_args, str) - str.to_s + describe "object moving and copying in #{config[:desc]}" do + let(:base_opts) { config[:opts] } + + def wrapper(**) + @wrapper ||= Ractor::Wrapper.new(remote, **base_opts, **) + end + + after { wrapper.async_stop.join } + + it "copies arguments by default" do + str = "hello".dup + wrapper.call(:echo_args, str) + str.to_s # Would fail if str was moved + end + + it "moves arguments when move_arguments is set to true" do + wrapper(move_arguments: true) + str = "hello".dup + wrapper.call(:echo_args, str) + assert_raises(Ractor::MovedError) { str.to_s } + end + + it "moves arguments when move_data is set to true" do + wrapper(move_data: true) + str = "hello".dup + wrapper.call(:echo_args, str) + assert_raises(Ractor::MovedError) { str.to_s } + end + + it "honors move_arguments over move_data" do + wrapper(move_data: true, move_arguments: false) + str = "hello".dup + wrapper.call(:echo_args, str) + str.to_s # Would fail if str was moved + end + + it "copies return values by default" do + obj, id = wrapper.call(:object_and_id, "hello".dup) + refute_equal(obj.object_id, id) + end + + it "moves return values when move_results is set to true" do + wrapper(move_results: true) + obj, id = wrapper.call(:object_and_id, "hello".dup) + assert_equal(obj.object_id, id) + end + + it "moves return values when move_data is set to true" do + wrapper(move_data: true) + obj, id = wrapper.call(:object_and_id, "hello".dup) + assert_equal(obj.object_id, id) + end + + it "honors move_results over move_data" do + wrapper(move_data: true, move_results: false) + obj, id = wrapper.call(:object_and_id, "hello".dup) + refute_equal(obj.object_id, id) + end + + it "copies block arguments by default" do + arg_id, orig_id = wrapper.call(:run_block_with_id, "hello".dup) do |str, str_id| + [str.object_id, str_id] + end + refute_equal(orig_id, arg_id) + end + + it "moves block arguments when move_block_arguments is set" do + wrapper(move_block_arguments: true) + arg_id, orig_id = wrapper.call(:run_block_with_id, "hello".dup) do |str, str_id| + [str.object_id, str_id] + end + assert_equal(orig_id, arg_id) + end + + it "moves block arguments when move_data is set" do + wrapper(move_data: true) + arg_id, orig_id = wrapper.call(:run_block_with_id, "hello".dup) do |str, str_id| + [str.object_id, str_id] + end + assert_equal(orig_id, arg_id) + end + + it "honors move_block_arguments over move_data" do + wrapper(move_data: true, move_block_arguments: false) + arg_id, orig_id = wrapper.call(:run_block_with_id, "hello".dup) do |str, str_id| + [str.object_id, str_id] + end + refute_equal(orig_id, arg_id) + end + + it "copies block results by default" do + wrapper(move_results: true) + obj, id = wrapper.call(:run_block) do + str = "hello".dup + [str, str.object_id] + end + refute_equal(obj.object_id, id) + end + + it "moves block results when move_block_results is set" do + wrapper(move_results: true, move_block_results: true) + obj, id = wrapper.call(:run_block) do + str = "hello".dup + [str, str.object_id] + end + assert_equal(obj.object_id, id) + end + + it "moves block results when move_data is set" do + wrapper(move_data: true) + obj, id = wrapper.call(:run_block) do + str = "hello".dup + [str, str.object_id] + end + assert_equal(obj.object_id, id) + end + + it "honors move_block_results over move_data" do + wrapper(move_data: true, move_block_results: false) + obj, id = wrapper.call(:run_block) do + str = "hello".dup + [str, str.object_id] + end + refute_equal(obj.object_id, id) + end end end describe "stubs" do let(:wrapper) { Ractor::Wrapper.new(remote) } - after { wrapper.async_stop } + after { wrapper.async_stop.join } it "converts method calls with arguments and return values" do result = wrapper.stub.echo_args(1, 2, a: "b", c: "d") @@ -121,7 +270,7 @@ def wrapper(**) describe "single-thread lifecycle" do let(:wrapper) { Ractor::Wrapper.new(remote) } - after { wrapper.async_stop } + after { wrapper.async_stop.join } it "recovers the remote" do assert_equal("[], {}", remote.echo_args) @@ -150,14 +299,14 @@ def wrapper(**) result2, time2 = r2.value assert_equal("hello", result1) assert_equal("world", result2) - assert_operator((time1 - time2).abs, :>, 0.8) + assert_operator((time1 - time2).abs, :>, 0.9) end end describe "2-thread lifecycle" do - let(:wrapper) { Ractor::Wrapper.new(remote, thread_count: 2) } + let(:wrapper) { Ractor::Wrapper.new(remote, threads: 2) } - after { wrapper.async_stop } + after { wrapper.async_stop.join } it "recovers the remote" do assert_equal("[], {}", remote.echo_args) @@ -187,7 +336,7 @@ def wrapper(**) result2, time2 = r2.value assert_equal("hello", result1) assert_equal("world", result2) - assert_operator((time1 - time2).abs, :<, 0.4) + assert_operator((time1 - time2).abs, :<, 0.8) end end end