Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,22 @@ jobs:
working-directory: ./temporalio
# Timeout just in case there's a hanging part in rake
timeout-minutes: 20
# Set env vars for cloud tests. If secrets aren't present, tests will be skipped.
env:
# For mTLS tests
TEMPORAL_CLOUD_MTLS_TEST_TARGET_HOST: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
TEMPORAL_CLOUD_MTLS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}

# For API key tests
TEMPORAL_CLOUD_API_KEY_TEST_TARGET_HOST: us-west-2.aws.api.temporal.io:7233
TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_API_KEY_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}

# For cloud ops tests
TEMPORAL_CLOUD_OPS_TEST_TARGET_HOST: saas-api.tmprl.cloud:443
TEMPORAL_CLOUD_OPS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00
run: bundle exec rake TESTOPTS="--verbose"
20 changes: 18 additions & 2 deletions temporalio/ext/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
class.const_set("SERVICE_HEALTH", SERVICE_HEALTH)?;
class.define_singleton_method("async_new", function!(Client::async_new, 3))?;
class.define_method("async_invoke_rpc", method!(Client::async_invoke_rpc, -1))?;
class.define_method("update_metadata", method!(Client::update_metadata, 1))?;
class.define_method("update_api_key", method!(Client::update_api_key, 1))?;

let inner_class = class.define_error("RPCFailure", ruby.get_inner(&ROOT_ERR))?;
inner_class.define_method("code", method!(RpcFailure::code, 0))?;
Expand Down Expand Up @@ -83,10 +85,16 @@ impl Client {
pub fn async_new(runtime: &Runtime, options: Struct, queue: Value) -> Result<(), Error> {
// Build options
let mut opts_build = ClientOptionsBuilder::default();
let tls = options.child(id!("tls"))?;
opts_build
.target_url(
Url::parse(
format!("http://{}", options.member::<String>(id!("target_host"))?).as_str(),
format!(
"{}://{}",
if tls.is_some() { "https" } else { "http" },
options.member::<String>(id!("target_host"))?
)
.as_str(),
)
.map_err(|err| error!("Failed parsing host: {}", err))?,
)
Expand All @@ -95,7 +103,7 @@ impl Client {
.headers(Some(options.member(id!("rpc_metadata"))?))
.api_key(options.member(id!("api_key"))?)
.identity(options.member(id!("identity"))?);
if let Some(tls) = options.child(id!("tls"))? {
if let Some(tls) = tls {
opts_build.tls_cfg(TlsConfig {
client_tls_config: match (
tls.member::<Option<RString>>(id!("client_cert"))?,
Expand Down Expand Up @@ -223,6 +231,14 @@ impl Client {
let callback = AsyncCallback::from_queue(queue);
self.invoke_rpc(service, callback, call)
}

pub fn update_metadata(&self, headers: HashMap<String, String>) {
self.core.get_client().set_headers(headers);
}

pub fn update_api_key(&self, api_key: Option<String>) {
self.core.get_client().set_api_key(api_key);
}
}

#[derive(DataTypeFunctions, TypedData)]
Expand Down
50 changes: 47 additions & 3 deletions temporalio/lib/temporalio/client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@ class Options; end # rubocop:disable Lint/EmptyClass
# @!attribute domain
# @return [String, nil] SNI override. This is only needed for self-hosted servers with certificates that do not
# match the hostname being connected to.
class TLSOptions; end # rubocop:disable Lint/EmptyClass
class TLSOptions
def initialize(
client_cert: nil,
client_private_key: nil,
server_root_ca_cert: nil,
domain: nil
)
super
end
end

RPCRetryOptions = Data.define(
:initial_interval,
Expand Down Expand Up @@ -122,7 +131,9 @@ def initialize(interval: 30.0, timeout: 15.0)
# @return [String, nil] Pass for HTTP basic auth for the proxy, must be combined with {basic_auth_user}.
class HTTPConnectProxyOptions; end # rubocop:disable Lint/EmptyClass

# @return [Options] Frozen options for this client which has the same attributes as {initialize}.
# @return [Options] Frozen options for this client which has the same attributes as {initialize}. Note that if
# {api_key=} or {rpc_metadata=} are updated, the options object is replaced with those changes (it is not
# mutated in place).
attr_reader :options

# @return [WorkflowService] Raw gRPC workflow service.
Expand Down Expand Up @@ -183,6 +194,7 @@ def initialize(
lazy_connect:
).freeze
# Create core client now if not lazy
@core_client_mutex = Mutex.new
_core_client unless lazy_connect
# Create service instances
@workflow_service = WorkflowService.new(self)
Expand All @@ -206,11 +218,43 @@ def connected?
!@core_client.nil?
end

# @return [String, nil] API key. This is a shortcut for `options.api_key`.
def api_key
@options.api_key
end

# Set the API key for all future calls. This also makes a new object for {options} with the changes.
#
# @param new_key [String, nil] New API key.
def api_key=(new_key)
# Mutate the client if connected then mutate options
@core_client_mutex.synchronize do
@core_client&.update_api_key(new_key)
@options = @options.with(api_key: new_key)
end
end

# @return [Hash<String, String>] RPC metadata (aka HTTP headers). This is a shortcut for `options.rpc_metadata`.
def rpc_metadata
@options.rpc_metadata
end

# Set the RPC metadata (aka HTTP headers) for all future calls. This also makes a new object for {options} with
# the changes.
#
# @param rpc_metadata [Hash<String, String>] New API key.
def rpc_metadata=(rpc_metadata)
# Mutate the client if connected then mutate options
@core_client_mutex.synchronize do
@core_client&.update_metadata(rpc_metadata)
@options = @options.with(rpc_metadata: rpc_metadata)
end
end

# @!visibility private
def _core_client
# If lazy, this needs to be done under mutex
if @options.lazy_connect
@core_client_mutex ||= Mutex.new
@core_client_mutex.synchronize do
@core_client ||= new_core_client
end
Expand Down
4 changes: 4 additions & 0 deletions temporalio/sig/temporalio/client/connection.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ module Temporalio
def target_host: -> String
def identity: -> String
def connected?: -> bool
def api_key: -> String?
def api_key=: (String? new_key) -> void
def rpc_metadata: -> Hash[String, String]
def rpc_metadata=: (Hash[String, String] rpc_metadata) -> void
def _core_client: -> Internal::Bridge::Client
private def new_core_client: -> Internal::Bridge::Client
end
Expand Down
3 changes: 3 additions & 0 deletions temporalio/sig/temporalio/internal/bridge/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ module Temporalio
queue: Queue
) -> void

def update_metadata: (Hash[String, String]) -> void
def update_api_key: (String?) -> void

class RPCFailure < Error
def code: -> Temporalio::Error::RPCError::Code::enum
def message: -> String
Expand Down
73 changes: 73 additions & 0 deletions temporalio/test/client_cloud_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

require 'securerandom'
require 'temporalio/api'
require 'temporalio/client'
require 'test'

class ClientCloudTest < Test
class SimpleWorkflow < Temporalio::Workflow::Definition
def execute(name)
"Hello, #{name}!"
end
end

def test_mtls
client_private_key = ENV.fetch('TEMPORAL_CLOUD_MTLS_TEST_CLIENT_KEY', '')
skip('No cloud mTLS key') if client_private_key.empty?

client = Temporalio::Client.connect(
ENV.fetch('TEMPORAL_CLOUD_MTLS_TEST_TARGET_HOST'),
ENV.fetch('TEMPORAL_CLOUD_MTLS_TEST_NAMESPACE'),
tls: Temporalio::Client::Connection::TLSOptions.new(
client_cert: ENV.fetch('TEMPORAL_CLOUD_MTLS_TEST_CLIENT_CERT'),
client_private_key:
)
)
assert_equal 'Hello, Temporal!', execute_workflow(SimpleWorkflow, 'Temporal', client:)
end

def test_api_key
api_key = ENV.fetch('TEMPORAL_CLOUD_API_KEY_TEST_API_KEY', '')
skip('No cloud API key') if api_key.empty?

client = Temporalio::Client.connect(
ENV.fetch('TEMPORAL_CLOUD_API_KEY_TEST_TARGET_HOST'),
ENV.fetch('TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE'),
api_key:,
tls: true,
rpc_metadata: { 'temporal-namespace' => ENV.fetch('TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE') }
)
# Run workflow
id = "wf-#{SecureRandom.uuid}"
assert_equal 'Hello, Temporal!', execute_workflow(SimpleWorkflow, 'Temporal', id:, client:)
handle = client.workflow_handle(id)

# Confirm it can be described
assert_equal 'SimpleWorkflow', handle.describe.workflow_type

# Change API and confirm failure
client.connection.api_key = 'wrong'
assert_raises(Temporalio::Error::RPCError) { handle.describe.workflow_type }
end

def test_cloud_ops
api_key = ENV.fetch('TEMPORAL_CLOUD_OPS_TEST_API_KEY', '')
skip('No cloud API key') if api_key.empty?

# Create connection
conn = Temporalio::Client::Connection.new(
target_host: ENV.fetch('TEMPORAL_CLOUD_OPS_TEST_TARGET_HOST'),
api_key:,
tls: true,
rpc_metadata: { 'temporal-cloud-api-version' => ENV.fetch('TEMPORAL_CLOUD_OPS_TEST_API_VERSION') }
)

# Simple call
namespace = ENV.fetch('TEMPORAL_CLOUD_OPS_TEST_NAMESPACE')
res = conn.cloud_service.get_namespace(
Temporalio::Api::Cloud::CloudService::V1::GetNamespaceRequest.new(namespace:)
)
assert_equal namespace, res.namespace.namespace
end
end
Loading