From 27bd96b950055a94e5d546b5f6333d57c6a00d4c Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 22 Jan 2025 14:46:30 -0600 Subject: [PATCH] Update and signal with start --- temporalio/.rubocop.yml | 6 +- temporalio/lib/temporalio/client.rb | 105 +++++++- .../lib/temporalio/client/interceptor.rb | 37 +++ .../client/with_start_workflow_operation.rb | 109 +++++++++ temporalio/lib/temporalio/error.rb | 1 + .../internal/client/implementation.rb | 227 +++++++++++++++++- .../lib/temporalio/worker/thread_pool.rb | 10 +- temporalio/sig/temporalio/client.rbs | 24 ++ .../sig/temporalio/client/interceptor.rbs | 38 +++ .../client/with_start_workflow_operation.rbs | 67 ++++++ temporalio/sig/temporalio/error.rbs | 2 +- .../internal/client/implementation.rbs | 15 ++ temporalio/test/client_schedule_test.rb | 2 +- temporalio/test/client_workflow_test.rb | 4 - .../test/worker_workflow_handler_test.rb | 200 +++++++++++++++ temporalio/test/worker_workflow_test.rb | 2 - 16 files changed, 831 insertions(+), 18 deletions(-) create mode 100644 temporalio/lib/temporalio/client/with_start_workflow_operation.rb create mode 100644 temporalio/sig/temporalio/client/with_start_workflow_operation.rbs create mode 100644 temporalio/sig/temporalio/internal/client/implementation.rbs diff --git a/temporalio/.rubocop.yml b/temporalio/.rubocop.yml index 75b430f2..96b29ab4 100644 --- a/temporalio/.rubocop.yml +++ b/temporalio/.rubocop.yml @@ -49,6 +49,10 @@ Metrics/AbcSize: Metrics/BlockLength: Max: 100 +# The default is too small +Metrics/BlockNesting: + Max: 5 + # The default is too small Metrics/ClassLength: Max: 1000 @@ -59,7 +63,7 @@ Metrics/CyclomaticComplexity: # The default is too small Metrics/MethodLength: - Max: 100 + Max: 200 # The default is too small Metrics/ModuleLength: diff --git a/temporalio/lib/temporalio/client.rb b/temporalio/lib/temporalio/client.rb index 291272aa..51fdb2d8 100644 --- a/temporalio/lib/temporalio/client.rb +++ b/temporalio/lib/temporalio/client.rb @@ -8,10 +8,13 @@ require 'temporalio/client/interceptor' require 'temporalio/client/schedule' require 'temporalio/client/schedule_handle' +require 'temporalio/client/with_start_workflow_operation' require 'temporalio/client/workflow_execution' require 'temporalio/client/workflow_execution_count' require 'temporalio/client/workflow_handle' require 'temporalio/client/workflow_query_reject_condition' +require 'temporalio/client/workflow_update_handle' +require 'temporalio/client/workflow_update_wait_stage' require 'temporalio/common_enums' require 'temporalio/converters' require 'temporalio/error' @@ -155,7 +158,7 @@ def initialize( default_workflow_query_reject_condition: ).freeze # Initialize interceptors - @impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int| + @impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int| # steep:ignore int.intercept_client(acc) end end @@ -334,6 +337,106 @@ def workflow_handle( WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:) end + # Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict + # policy). Note that in some cases this may fail but the workflow will still be started, and the handle can then be + # retrieved on the start workflow operation. + # + # @param update [Workflow::Definition::Update, Symbol, String] Update definition or name. + # @param args [Array] Update arguments. + # @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must + # have an `id_conflict_policy` set. + # @param wait_for_stage [WorkflowUpdateWaitStage] Required stage to wait until returning. ADMITTED is not + # currently supported. See https://docs.temporal.io/workflows#update for more details. + # @param id [String] ID of the update. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [WorkflowUpdateHandle] The update handle. + # @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow. + # @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't + # mean the update itself was timed out or canceled, and this doesn't mean the workflow did not start. + # @raise [Error::RPCError] RPC error from call. + def start_update_with_start_workflow( + update, + *args, + start_workflow_operation:, + wait_for_stage:, + id: SecureRandom.uuid, + rpc_options: nil + ) + @impl.start_update_with_start_workflow( + Interceptor::StartUpdateWithStartWorkflowInput.new( + update_id: id, + update:, + args:, + wait_for_stage:, + start_workflow_operation:, + headers: {}, + rpc_options: + ) + ) + end + + # Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict + # policy), and wait for update result. This is a shortcut for {start_update_with_start_workflow} + + # {WorkflowUpdateHandle.result}. + # + # @param update [Workflow::Definition::Update, Symbol, String] Update definition or name. + # @param args [Array] Update arguments. + # @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must + # have an `id_conflict_policy` set. + # @param id [String] ID of the update. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [Object] Successful update result. + # @raise [Error::WorkflowUpdateFailedError] If the update failed. + # @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow. + # @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't + # mean the update itself was timed out or canceled, and this doesn't mean the workflow did not start. + # @raise [Error::RPCError] RPC error from call. + def execute_update_with_start_workflow( + update, + *args, + start_workflow_operation:, + id: SecureRandom.uuid, + rpc_options: nil + ) + start_update_with_start_workflow( + update, + *args, + start_workflow_operation:, + wait_for_stage: WorkflowUpdateWaitStage::COMPLETED, + id:, + rpc_options: + ).result + end + + # Send a signal, possibly starting the workflow at the same time if it doesn't exist. + # + # @param signal [Workflow::Definition::Signal, Symbol, String] Signal definition or name. + # @param args [Array] Signal arguments. + # @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This may not + # support all `id_conflict_policy` options. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [WorkflowHandle] A workflow handle to the workflow. + # @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow. + # @raise [Error::RPCError] RPC error from call. + def signal_with_start_workflow( + signal, + *args, + start_workflow_operation:, + rpc_options: nil + ) + @impl.signal_with_start_workflow( + Interceptor::SignalWithStartWorkflowInput.new( + signal:, + args:, + start_workflow_operation:, + rpc_options: + ) + ) + end + # List workflows. # # @param query [String, nil] A Temporal visibility list filter. diff --git a/temporalio/lib/temporalio/client/interceptor.rb b/temporalio/lib/temporalio/client/interceptor.rb index 4ccf66cb..0a556624 100644 --- a/temporalio/lib/temporalio/client/interceptor.rb +++ b/temporalio/lib/temporalio/client/interceptor.rb @@ -38,6 +38,26 @@ def intercept_client(next_interceptor) :rpc_options ) + # Input for {Outbound.start_update_with_start_workflow}. + StartUpdateWithStartWorkflowInput = Data.define( + :update_id, + :update, + :args, + :wait_for_stage, + :start_workflow_operation, + :headers, + :rpc_options + ) + + # Input for {Outbound.signal_with_start_workflow}. + SignalWithStartWorkflowInput = Data.define( + :signal, + :args, + :start_workflow_operation, + # Headers intentionally not defined here, because they are not separate from start_workflow_operation + :rpc_options + ) + # Input for {Outbound.list_workflows}. ListWorkflowsInput = Data.define( :query, @@ -240,6 +260,23 @@ def start_workflow(input) next_interceptor.start_workflow(input) end + # Called for every {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow} + # call. + # + # @param input [StartUpdateWithStartWorkflowInput] Input. + # @return [WorkflowUpdateHandle] Workflow update handle. + def start_update_with_start_workflow(input) + next_interceptor.start_update_with_start_workflow(input) + end + + # Called for every {Client.signal_with_start_workflow}. + # + # @param input [SignalWithStartWorkflowInput] Input. + # @return [WorkflowHandle] Workflow handle. + def signal_with_start_workflow(input) + next_interceptor.signal_with_start_workflow(input) + end + # Called for every {Client.list_workflows} call. # # @param input [ListWorkflowsInput] Input. diff --git a/temporalio/lib/temporalio/client/with_start_workflow_operation.rb b/temporalio/lib/temporalio/client/with_start_workflow_operation.rb new file mode 100644 index 00000000..478227e7 --- /dev/null +++ b/temporalio/lib/temporalio/client/with_start_workflow_operation.rb @@ -0,0 +1,109 @@ +# frozen_string_literal: true + +require 'temporalio/common_enums' + +module Temporalio + class Client + # Start operation used by {Client.start_update_with_start_workflow}, {Client.execute_update_with_start_workflow}, + # and {Client.signal_with_start_workflow}. + class WithStartWorkflowOperation + Options = Data.define( + :workflow, + :args, + :id, + :task_queue, + :execution_timeout, + :run_timeout, + :task_timeout, + :id_reuse_policy, + :id_conflict_policy, + :retry_policy, + :cron_schedule, + :memo, + :search_attributes, + :start_delay, + :headers + ) + + # Options the operation was created with. + class Options; end # rubocop:disable Lint/EmptyClass + + # @return [Options] Options the operation was created with. + attr_accessor :options + + # Create a with-start workflow operation. These are mostly the same options as {Client.start_workflow}, see that + # documentation for more details. + # + # Note, for {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow}, + # `id_conflict_policy` is required. + def initialize( + workflow, + *args, + id:, + task_queue:, + execution_timeout: nil, + run_timeout: nil, + task_timeout: nil, + id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, + id_conflict_policy: WorkflowIDConflictPolicy::UNSPECIFIED, + retry_policy: nil, + cron_schedule: nil, + memo: nil, + search_attributes: nil, + start_delay: nil, + headers: {} + ) + @options = Options.new( + workflow:, + args:, + id:, + task_queue:, + execution_timeout:, + run_timeout:, + task_timeout:, + id_reuse_policy:, + id_conflict_policy:, + retry_policy:, + cron_schedule:, + memo:, + search_attributes:, + start_delay:, + headers: + ) + @workflow_handle_mutex = Mutex.new + @workflow_handle_cond_var = ConditionVariable.new + end + + # Get the workflow handle, possibly waiting until set, or raise an error if the workflow start was unsuccessful. + # + # @param wait [Boolean] True to wait until it is set, false to return immediately. + # + # @return [WorkflowHandle, nil] The workflow handle when available or `nil` if `wait` is false and it is not set + # yet. + # @raise [Error] Any error that occurred during the call before the workflow start returned. + def workflow_handle(wait: true) + @workflow_handle_mutex.synchronize do + @workflow_handle_cond_var.wait(@workflow_handle_mutex) unless @workflow_handle || !wait + raise @workflow_handle if @workflow_handle.is_a?(Exception) + + @workflow_handle + end + end + + # @!visibility private + def _set_workflow_handle(value) + @workflow_handle_mutex.synchronize do + @workflow_handle ||= value + @workflow_handle_cond_var.broadcast + end + end + + # @!visibility private + def _mark_used + raise 'Start operation already used' if @in_use + + @in_use = true + end + end + end +end diff --git a/temporalio/lib/temporalio/error.rb b/temporalio/lib/temporalio/error.rb index d23daf1b..44ab8407 100644 --- a/temporalio/lib/temporalio/error.rb +++ b/temporalio/lib/temporalio/error.rb @@ -124,6 +124,7 @@ def grpc_status def create_grpc_status return Api::Common::V1::GrpcStatus.new(code: @code) unless @raw_grpc_status + return @raw_grpc_status if @raw_grpc_status.is_a?(Api::Common::V1::GrpcStatus) Api::Common::V1::GrpcStatus.decode(@raw_grpc_status) end diff --git a/temporalio/lib/temporalio/internal/client/implementation.rb b/temporalio/lib/temporalio/internal/client/implementation.rb index 22732a98..174afc1b 100644 --- a/temporalio/lib/temporalio/internal/client/implementation.rb +++ b/temporalio/lib/temporalio/internal/client/implementation.rb @@ -9,6 +9,7 @@ require 'temporalio/client/interceptor' require 'temporalio/client/schedule' require 'temporalio/client/schedule_handle' +require 'temporalio/client/with_start_workflow_operation' require 'temporalio/client/workflow_execution' require 'temporalio/client/workflow_execution_count' require 'temporalio/client/workflow_handle' @@ -41,7 +42,7 @@ def self.with_default_rpc_options(user_rpc_options) end def initialize(client) - super(nil) + super(nil) # steep:ignore @client = client end @@ -104,6 +105,224 @@ def start_workflow(input) ) end + def start_update_with_start_workflow(input) + raise ArgumentError, 'Start operation is required' unless input.start_workflow_operation + + if input.start_workflow_operation.options.id_conflict_policy == WorkflowIDConflictPolicy::UNSPECIFIED + raise ArgumentError, 'ID conflict policy is required in start operation' + end + + # Try to mark used before using + input.start_workflow_operation._mark_used + + # Build request + start_options = input.start_workflow_operation.options + start_req = _start_workflow_request_from_with_start_options( + Api::WorkflowService::V1::StartWorkflowExecutionRequest, start_options + ) + req = Api::WorkflowService::V1::ExecuteMultiOperationRequest.new( + namespace: @client.namespace, + operations: [ + Api::WorkflowService::V1::ExecuteMultiOperationRequest::Operation.new(start_workflow: start_req), + Api::WorkflowService::V1::ExecuteMultiOperationRequest::Operation.new( + update_workflow: Api::WorkflowService::V1::UpdateWorkflowExecutionRequest.new( + namespace: @client.namespace, + workflow_execution: Api::Common::V1::WorkflowExecution.new( + workflow_id: start_options.id + ), + request: Api::Update::V1::Request.new( + meta: Api::Update::V1::Meta.new( + update_id: input.update_id, + identity: @client.connection.identity + ), + input: Api::Update::V1::Input.new( + name: Workflow::Definition::Update._name_from_parameter(input.update), + args: @client.data_converter.to_payloads(input.args), + header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter) + ) + ), + wait_policy: Api::Update::V1::WaitPolicy.new( + lifecycle_stage: input.wait_for_stage + ) + ) + ) + ] + ) + + # Continually try to start until an exception occurs, the user-asked stage is reached, or the stage is + # accepted. But we will set the workflow handle as soon as we can. + # @type var update_resp: untyped + update_resp = nil + run_id = nil + begin + loop do + resp = @client.workflow_service.execute_multi_operation( + req, rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + run_id = resp.responses.first.start_workflow.run_id + # Set workflow handle (no-op if already set) + input.start_workflow_operation._set_workflow_handle( + Temporalio::Client::WorkflowHandle.new( + client: @client, + id: start_options.id, + run_id: nil, + result_run_id: run_id, + first_execution_run_id: run_id + ) + ) + update_resp = resp.responses.last.update_workflow + + # We're only done if the response stage is at least accepted + if update_resp && Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage.resolve(update_resp.stage) >= + Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED + break + end + end + + # If the user wants to wait until completed, we must poll until outcome if not already there + if input.wait_for_stage == Temporalio::Client::WorkflowUpdateWaitStage::COMPLETED && update_resp.outcome + update_resp.outcome = @client._impl.poll_workflow_update( + Temporalio::Client::Interceptor::PollWorkflowUpdateInput.new( + workflow_id: start_options.id, + run_id:, + update_id: input.update_id, + rpc_options: input.rpc_options + ) + ) + end + rescue Error => e + # If this is a multi-operation failure, set exception to the first present, non-OK, non-aborted error + if e.is_a?(Error::RPCError) + multi_err = e.grpc_status.details&.first&.unpack(Api::ErrorDetails::V1::MultiOperationExecutionFailure) + if multi_err + non_aborted = multi_err.statuses.find do |s| + # Exists, not-ok, not-aborted + s && s.code != Error::RPCError::Code::OK && + !s.details&.first&.is(Api::Failure::V1::MultiOperationExecutionAborted) + end + if non_aborted + e = Error::RPCError.new( + non_aborted.message, + code: non_aborted.code, + raw_grpc_status: Api::Common::V1::GrpcStatus.new( + code: non_aborted.code, message: non_aborted.message, details: non_aborted.details.to_a + ) + ) + end + end + end + if e.is_a?(Error::RPCError) + # Deadline exceeded or cancel is a special error type + if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELED + e = Error::WorkflowUpdateRPCTimeoutOrCanceledError.new + elsif e.code == Error::RPCError::Code::ALREADY_EXISTS && e.grpc_status.details.first + # Unpack and set already started if that's the error + details = e.grpc_status.details.first.unpack( + Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure + ) + if details + e = Error::WorkflowAlreadyStartedError.new( + workflow_id: start_options.id, + workflow_type: start_req.workflow_type, + run_id: details.run_id + ) + end + end + end + # Cancel is a special type + e = Error::WorkflowUpdateRPCTimeoutOrCanceledError.new if e.is_a?(Error::CanceledError) + # Before we raise here, we want to try to set the start operation exception (no-op if already set with a + # handle) + input.start_workflow_operation._set_workflow_handle(e) + raise e + end + + # Return handle + Temporalio::Client::WorkflowUpdateHandle.new( + client: @client, + id: input.update_id, + workflow_id: start_options.id, + workflow_run_id: run_id, + known_outcome: update_resp.outcome + ) + end + + def signal_with_start_workflow(input) + raise ArgumentError, 'Start operation is required' unless input.start_workflow_operation + + # Try to mark used before using + input.start_workflow_operation._mark_used + + # Build req + start_options = input.start_workflow_operation.options + req = _start_workflow_request_from_with_start_options( + Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest, start_options + ) + req.signal_name = Workflow::Definition::Signal._name_from_parameter(input.signal) + req.signal_input = @client.data_converter.to_payloads(input.args) + + # Send request + begin + resp = @client.workflow_service.signal_with_start_workflow_execution( + req, + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + rescue Error::RPCError => e + # Unpack and raise already started if that's the error, otherwise default raise + if e.code == Error::RPCError::Code::ALREADY_EXISTS && e.grpc_status.details.first + details = e.grpc_status.details.first.unpack( + Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure + ) + if details + e = Error::WorkflowAlreadyStartedError.new( + workflow_id: req.workflow_id, + workflow_type: req.workflow_type.name, + run_id: details.run_id + ) + end + end + # Before we raise here, we want to the start operation exception + input.start_workflow_operation._set_workflow_handle(e) + raise e + end + + # Set handle and return handle + handle = Temporalio::Client::WorkflowHandle.new( + client: @client, + id: start_options.id, + run_id: nil, + result_run_id: resp.run_id, + first_execution_run_id: resp.run_id + ) + input.start_workflow_operation._set_workflow_handle(handle) + handle + end + + def _start_workflow_request_from_with_start_options(klass, start_options) + klass.new( + request_id: SecureRandom.uuid, + namespace: @client.namespace, + workflow_type: Api::Common::V1::WorkflowType.new( + name: Workflow::Definition._workflow_type_from_workflow_parameter(start_options.workflow) + ), + workflow_id: start_options.id, + task_queue: Api::TaskQueue::V1::TaskQueue.new(name: start_options.task_queue.to_s), + input: @client.data_converter.to_payloads(start_options.args), + workflow_execution_timeout: ProtoUtils.seconds_to_duration(start_options.execution_timeout), + workflow_run_timeout: ProtoUtils.seconds_to_duration(start_options.run_timeout), + workflow_task_timeout: ProtoUtils.seconds_to_duration(start_options.task_timeout), + identity: @client.connection.identity, + workflow_id_reuse_policy: start_options.id_reuse_policy, + workflow_id_conflict_policy: start_options.id_conflict_policy, + retry_policy: start_options.retry_policy&._to_proto, + cron_schedule: start_options.cron_schedule, + memo: ProtoUtils.memo_to_proto(start_options.memo, @client.data_converter), + search_attributes: start_options.search_attributes&._to_proto, + workflow_start_delay: ProtoUtils.seconds_to_duration(start_options.start_delay), + header: ProtoUtils.headers_to_proto(start_options.headers, @client.data_converter) + ) + end + def list_workflows(input) Enumerator.new do |yielder| req = Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new( @@ -284,11 +503,13 @@ def start_workflow_update(input) end rescue Error::RPCError => e # Deadline exceeded or cancel is a special error type - if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELLED + if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELED raise Error::WorkflowUpdateRPCTimeoutOrCanceledError end raise + rescue Error::CanceledError + raise Error::WorkflowUpdateRPCTimeoutOrCanceledError end # If the user wants to wait until completed, we must poll until outcome @@ -338,7 +559,7 @@ def poll_workflow_update(input) return resp.outcome if resp.outcome rescue Error::RPCError => e # Deadline exceeded or cancel is a special error type - if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELLED + if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELED raise Error::WorkflowUpdateRPCTimeoutOrCanceledError end diff --git a/temporalio/lib/temporalio/worker/thread_pool.rb b/temporalio/lib/temporalio/worker/thread_pool.rb index d9ffe829..1c3005d9 100644 --- a/temporalio/lib/temporalio/worker/thread_pool.rb +++ b/temporalio/lib/temporalio/worker/thread_pool.rb @@ -1,15 +1,15 @@ # frozen_string_literal: true -# Much of this logic taken from -# https://github.com/ruby-concurrency/concurrent-ruby/blob/044020f44b36930b863b930f3ee8fa1e9f750469/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, -# see MIT license at -# https://github.com/ruby-concurrency/concurrent-ruby/blob/044020f44b36930b863b930f3ee8fa1e9f750469/LICENSE.txt - module Temporalio class Worker # Implementation of a thread pool. This implementation is a stripped down form of Concurrent Ruby's # `CachedThreadPool`. class ThreadPool + # Much of this logic taken from + # https://github.com/ruby-concurrency/concurrent-ruby/blob/044020f44b36930b863b930f3ee8fa1e9f750469/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, + # see MIT license at + # https://github.com/ruby-concurrency/concurrent-ruby/blob/044020f44b36930b863b930f3ee8fa1e9f750469/LICENSE.txt + # @return [ThreadPool] Default/shared thread pool instance with unlimited max threads. def self.default @default ||= new diff --git a/temporalio/sig/temporalio/client.rbs b/temporalio/sig/temporalio/client.rbs index 2e07b373..d80aae2d 100644 --- a/temporalio/sig/temporalio/client.rbs +++ b/temporalio/sig/temporalio/client.rbs @@ -101,6 +101,30 @@ module Temporalio ?first_execution_run_id: String? ) -> WorkflowHandle + def start_update_with_start_workflow: ( + Workflow::Definition::Update | Symbol | String update, + *Object? args, + start_workflow_operation: WithStartWorkflowOperation, + wait_for_stage: WorkflowUpdateWaitStage::enum, + ?id: String, + ?rpc_options: RPCOptions? + ) -> WorkflowUpdateHandle + + def execute_update_with_start_workflow: ( + Workflow::Definition::Update | Symbol | String update, + *Object? args, + start_workflow_operation: WithStartWorkflowOperation, + ?id: String, + ?rpc_options: RPCOptions? + ) -> Object? + + def signal_with_start_workflow: ( + Workflow::Definition::Signal | Symbol | String signal, + *Object? args, + start_workflow_operation: WithStartWorkflowOperation, + ?rpc_options: RPCOptions? + ) -> WorkflowHandle + def list_workflows: ( ?String query, ?rpc_options: RPCOptions? diff --git a/temporalio/sig/temporalio/client/interceptor.rbs b/temporalio/sig/temporalio/client/interceptor.rbs index d3522f65..cb27f0e2 100644 --- a/temporalio/sig/temporalio/client/interceptor.rbs +++ b/temporalio/sig/temporalio/client/interceptor.rbs @@ -43,6 +43,40 @@ module Temporalio ) -> void end + class StartUpdateWithStartWorkflowInput + attr_reader update_id: String + attr_reader update: Workflow::Definition::Update | Symbol | String + attr_reader args: Array[Object?] + attr_reader wait_for_stage: WorkflowUpdateWaitStage::enum + attr_reader start_workflow_operation: WithStartWorkflowOperation + attr_reader headers: Hash[String, Object?] + attr_reader rpc_options: RPCOptions? + + def initialize: ( + update_id: String, + update: Workflow::Definition::Update | Symbol | String, + args: Array[Object?], + wait_for_stage: WorkflowUpdateWaitStage::enum, + start_workflow_operation: WithStartWorkflowOperation, + headers: Hash[String, Object?], + rpc_options: RPCOptions? + ) -> void + end + + class SignalWithStartWorkflowInput + attr_reader signal: Workflow::Definition::Signal | Symbol | String + attr_reader args: Array[Object?] + attr_reader start_workflow_operation: WithStartWorkflowOperation + attr_reader rpc_options: RPCOptions? + + def initialize: ( + signal: Workflow::Definition::Signal | Symbol | String, + args: Array[Object?], + start_workflow_operation: WithStartWorkflowOperation, + rpc_options: RPCOptions? + ) -> void + end + class ListWorkflowsInput attr_reader query: String? attr_reader rpc_options: RPCOptions? @@ -366,6 +400,10 @@ module Temporalio def start_workflow: (StartWorkflowInput input) -> WorkflowHandle + def start_update_with_start_workflow: (StartUpdateWithStartWorkflowInput input) -> WorkflowUpdateHandle + + def signal_with_start_workflow: (SignalWithStartWorkflowInput input) -> WorkflowHandle + def list_workflows: (ListWorkflowsInput input) -> Enumerator[WorkflowExecution, WorkflowExecution] def count_workflows: (CountWorkflowsInput input) -> WorkflowExecutionCount diff --git a/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs b/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs new file mode 100644 index 00000000..997ec95c --- /dev/null +++ b/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs @@ -0,0 +1,67 @@ +module Temporalio + class Client + class WithStartWorkflowOperation + class Options + attr_reader workflow: singleton(Workflow::Definition) | Workflow::Definition::Info | Symbol | String + attr_reader args: Array[Object?] + attr_reader id: String + attr_reader task_queue: String + attr_reader execution_timeout: duration? + attr_reader run_timeout: duration? + attr_reader task_timeout: duration? + attr_reader id_reuse_policy: WorkflowIDReusePolicy::enum + attr_reader id_conflict_policy: WorkflowIDConflictPolicy::enum + attr_reader retry_policy: RetryPolicy? + attr_reader cron_schedule: String? + attr_reader memo: Hash[String | Symbol, Object?]? + attr_reader search_attributes: SearchAttributes? + attr_reader start_delay: duration? + attr_reader headers: Hash[String, Object?] + + def initialize: ( + workflow: singleton(Workflow::Definition) | Workflow::Definition::Info | Symbol | String, + args: Array[Object?], + id: String, + task_queue: String, + execution_timeout: duration?, + run_timeout: duration?, + task_timeout: duration?, + id_reuse_policy: WorkflowIDReusePolicy::enum, + id_conflict_policy: WorkflowIDConflictPolicy::enum, + retry_policy: RetryPolicy?, + cron_schedule: String?, + memo: Hash[String | Symbol, Object?]?, + search_attributes: SearchAttributes?, + start_delay: duration?, + headers: Hash[String, Object?] + ) -> void + end + + attr_accessor options: Options + + def initialize: ( + singleton(Workflow::Definition) | Workflow::Definition::Info | Symbol | String workflow, + *Object? args, + id: String, + task_queue: String, + ?execution_timeout: duration?, + ?run_timeout: duration?, + ?task_timeout: duration?, + ?id_reuse_policy: WorkflowIDReusePolicy::enum, + ?id_conflict_policy: WorkflowIDConflictPolicy::enum, + ?retry_policy: RetryPolicy?, + ?cron_schedule: String?, + ?memo: Hash[String | Symbol, Object?]?, + ?search_attributes: SearchAttributes?, + ?start_delay: duration?, + ?headers: Hash[String, Object?] + ) -> void + + def workflow_handle: () -> WorkflowHandle + | (wait: bool) -> WorkflowHandle? + + def _set_workflow_handle: (WorkflowHandle | Exception value) -> void + def _mark_used: -> void + end + end +end \ No newline at end of file diff --git a/temporalio/sig/temporalio/error.rbs b/temporalio/sig/temporalio/error.rbs index 03638035..6c2e30a5 100644 --- a/temporalio/sig/temporalio/error.rbs +++ b/temporalio/sig/temporalio/error.rbs @@ -41,7 +41,7 @@ module Temporalio def initialize: ( String message, code: Code::enum, - raw_grpc_status: String? + raw_grpc_status: String? | untyped ) -> void def grpc_status: -> untyped diff --git a/temporalio/sig/temporalio/internal/client/implementation.rbs b/temporalio/sig/temporalio/internal/client/implementation.rbs new file mode 100644 index 00000000..06965b22 --- /dev/null +++ b/temporalio/sig/temporalio/internal/client/implementation.rbs @@ -0,0 +1,15 @@ +module Temporalio + module Internal + module Client + class Implementation < Temporalio::Client::Interceptor::Outbound + def self.with_default_rpc_options: (Temporalio::Client::RPCOptions? user_rpc_options) -> Temporalio::Client::RPCOptions + + def initialize: (Temporalio::Client client) -> void + + def _start_workflow_request_from_with_start_options: ( + untyped klass, Temporalio::Client::WithStartWorkflowOperation::Options start_options + ) -> untyped + end + end + end +end \ No newline at end of file diff --git a/temporalio/test/client_schedule_test.rb b/temporalio/test/client_schedule_test.rb index ad5dc0d3..cf9de672 100644 --- a/temporalio/test/client_schedule_test.rb +++ b/temporalio/test/client_schedule_test.rb @@ -9,7 +9,7 @@ class ClientScheduleTest < Test also_run_all_tests_in_fiber - def test_basics # rubocop:disable Metrics/AbcSize,Metrics/MethodLength + def test_basics # rubocop:disable Metrics/AbcSize assert_no_schedules # Create a schedule with lots of things diff --git a/temporalio/test/client_workflow_test.rb b/temporalio/test/client_workflow_test.rb index eb5618c7..8e4deccc 100644 --- a/temporalio/test/client_workflow_test.rb +++ b/temporalio/test/client_workflow_test.rb @@ -477,8 +477,4 @@ def test_rpc_cancellation assert_equal 'User canceled', err.message end end - - # TODO(cretz): Tests to write: - # * Workflow cloud test - # * Signal/update with start end diff --git a/temporalio/test/worker_workflow_handler_test.rb b/temporalio/test/worker_workflow_handler_test.rb index aa463c6b..94493f1c 100644 --- a/temporalio/test/worker_workflow_handler_test.rb +++ b/temporalio/test/worker_workflow_handler_test.rb @@ -618,4 +618,204 @@ def test_update_info l.include?('"UpdateInfoWorkflow"') && l.include?('update_id') && l.include?('"update-2"') end) end + + class UpdateWithStartWorkflow < Temporalio::Workflow::Definition + workflow_query_attr_reader :counter + + def initialize + @counter = 0 + end + + def execute(initial_increment) + @counter += initial_increment + Temporalio::Workflow.wait_condition { false } + end + + workflow_update + def increment_counter(value) + @counter += value + end + + workflow_update + def fail + raise Temporalio::Error::ApplicationError, 'Intentional failure' + end + + workflow_update + def start_waiting + Temporalio::Workflow.wait_condition { @finish_waiting } + end + + workflow_update + def finish_waiting + @finish_waiting = true + end + end + + def test_update_with_start_simple + # Run worker + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "tq-#{SecureRandom.uuid}", + workflows: [UpdateWithStartWorkflow] + ) + worker.run do + # Newly started + id = "wf-#{SecureRandom.uuid}" + start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new( + UpdateWithStartWorkflow, 123, + id:, task_queue: worker.task_queue, id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::FAIL + ) + # Run and confirm result of update is pre-workflow-execute + assert_equal 456, env.client.execute_update_with_start_workflow( + UpdateWithStartWorkflow.increment_counter, 456, start_workflow_operation: + ) + # Confirm query is total + handle = start_workflow_operation.workflow_handle + assert_equal 579, handle.query(UpdateWithStartWorkflow.counter) + + # Update with start 5 more times + 5.times do + env.client.execute_update_with_start_workflow( + UpdateWithStartWorkflow.increment_counter, 2, + start_workflow_operation: Temporalio::Client::WithStartWorkflowOperation.new( + UpdateWithStartWorkflow, 10_000, + id:, task_queue: worker.task_queue, id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::USE_EXISTING + ) + ) + end + # Confirm 10 (i.e. 5 * 2) was added + assert_equal 589, handle.query(UpdateWithStartWorkflow.counter) + + # Confirm we get already-exists error on start and on call if we set fail existing + start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new( + UpdateWithStartWorkflow, 123, + id:, task_queue: worker.task_queue, id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::FAIL + ) + assert_raises(Temporalio::Error::WorkflowAlreadyStartedError) do + env.client.execute_update_with_start_workflow( + UpdateWithStartWorkflow.increment_counter, 456, start_workflow_operation: + ) + end + assert_raises(Temporalio::Error::WorkflowAlreadyStartedError) do + start_workflow_operation.workflow_handle + end + end + end + + def test_update_with_start_update_failure + # Run worker + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "tq-#{SecureRandom.uuid}", + workflows: [UpdateWithStartWorkflow] + ) + worker.run do + # Update failed but workflow started + id = "wf-#{SecureRandom.uuid}" + start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new( + UpdateWithStartWorkflow, 123, + id:, task_queue: worker.task_queue, id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::FAIL + ) + err = assert_raises(Temporalio::Error::WorkflowUpdateFailedError) do + env.client.execute_update_with_start_workflow(UpdateWithStartWorkflow.fail, 456, start_workflow_operation:) + end + assert_instance_of Temporalio::Error::ApplicationError, err.cause + assert_equal 'Intentional failure', err.cause.message + assert_equal 123, start_workflow_operation.workflow_handle.query(UpdateWithStartWorkflow.counter) + end + end + + def test_update_with_start_cancel + # Run worker + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "tq-#{SecureRandom.uuid}", + workflows: [UpdateWithStartWorkflow] + ) + worker.run do + # Run update in background to start with cancellation + cancellation, cancel_proc = Temporalio::Cancellation.new + id = "wf-#{SecureRandom.uuid}" + start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new( + UpdateWithStartWorkflow, 123, + id:, task_queue: worker.task_queue, id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::FAIL + ) + background_exc_queue = Queue.new + run_in_background do + env.client.execute_update_with_start_workflow( + UpdateWithStartWorkflow.start_waiting, 456, + start_workflow_operation:, rpc_options: Temporalio::Client::RPCOptions.new(cancellation:) + ) + rescue Temporalio::Error => e + background_exc_queue << e + end + + # Wait until workflow ID exists + assert_eventually do + env.client.workflow_handle(id).describe + rescue Temporalio::Error::RPCError => e + flunk e.full_message + end + + # Now cancel token and confirm it is a proper cancellation + cancel_proc.call + assert_instance_of Temporalio::Error::WorkflowUpdateRPCTimeoutOrCanceledError, background_exc_queue.pop + end + end + + class SignalWithStartWorkflow < Temporalio::Workflow::Definition + workflow_query_attr_reader :events + + def initialize + @events = [] + end + + def execute(event) + @events << event + Temporalio::Workflow.wait_condition { false } + end + + workflow_signal + def add_event(event) + @events << event + end + end + + def test_signal_with_start + # Run worker + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "tq-#{SecureRandom.uuid}", + workflows: [SignalWithStartWorkflow] + ) + worker.run do + # Newly started + id = "wf-#{SecureRandom.uuid}" + start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new( + SignalWithStartWorkflow, 'workflow-start', + id:, task_queue: worker.task_queue + ) + handle = env.client.signal_with_start_workflow( + SignalWithStartWorkflow.add_event, 'signal', start_workflow_operation: + ) + # Confirm same handle + assert_same handle, start_workflow_operation.workflow_handle + # Confirm signal event came first + assert_equal %w[signal workflow-start], handle.query(SignalWithStartWorkflow.events) + + # Signal with start 3 more times + 3.times do |i| + env.client.signal_with_start_workflow( + SignalWithStartWorkflow.add_event, "signal-#{i}", + start_workflow_operation: Temporalio::Client::WithStartWorkflowOperation.new( + SignalWithStartWorkflow, 'not-used', + id:, task_queue: worker.task_queue + ) + ) + end + # Confirm events + assert_equal %w[signal workflow-start signal-0 signal-1 signal-2], handle.query(SignalWithStartWorkflow.events) + end + end end diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index ff6491ca..a2a30bdb 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -1750,8 +1750,6 @@ def test_confirm_garbage_collect # * Test workflow cancel causing other cancels at the same time but in different coroutines # * 0-sleep timers vs nil timers # * Interceptors - # * Handler - # * Signal/update with start # * Activity # * Local activity cancel (currently broken) end