diff --git a/.gitignore b/.gitignore index 8d55a529..0a0cab00 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ bazel-bin bazel-distbench bazel-out bazel-testlogs +gen-cpp/ diff --git a/BUILD b/BUILD index aa2adc33..cd48b745 100644 --- a/BUILD +++ b/BUILD @@ -17,6 +17,16 @@ config_setting( flag_values = {":with-mercury": 'True'} ) +bool_flag( + name = "with-thrift", + build_setting_default = False +) + +config_setting( + name = "with_thrift", + flag_values = {":with-thrift": 'True'} +) + cc_library( name = "distbench_netutils", srcs = [ @@ -115,9 +125,18 @@ cc_library( + select({ "with_mercury": [":protocol_driver_mercury", ], "//conditions:default": [] - }), + }) + + select({ + "with_thrift": [":protocol_driver_thrift"], + "//conditions:default": [] + } + ), copts = select({ - ":with_mercury":["-DWITH_MERCURY"], + ":with_thrift": ["-DWITH_THRIFT"], + "//conditions:default": [] + }) + + select({ + ":with_mercury": ["-DWITH_MERCURY"], "//conditions:default": [] }) ) @@ -188,6 +207,10 @@ cc_test( ":with_mercury":["-DWITH_MERCURY"], "//conditions:default": [] }) + + select({ + ":with_thrift":["-DWITH_THRIFT"], + "//conditions:default": [] + }) ) cc_binary( @@ -263,6 +286,10 @@ cc_test( copts = select({ ":with_mercury":["-DWITH_MERCURY"], "//conditions:default": [] + }) + + select({ + ":with_thrift":["-DWITH_THRIFT"], + "//conditions:default": [] }), ) @@ -320,6 +347,21 @@ cc_test( ], ) +cc_library( + name = "distbench_thrift_lib", + srcs = ["gen-cpp/Distbench.cpp", + # "gen-cpp/distbench_types.cpp" + ], + hdrs = ["gen-cpp/Distbench.h", "gen-cpp/distbench_types.h"], + strip_include_prefix = "gen-cpp/", + deps = [ + "@apache_thrift//:thrift", + ], + tags = [ + "manual" + ], +) + cc_test( name = "distbench_engine_test", size = "medium", @@ -400,4 +442,21 @@ cc_test( ], ) - +cc_library( + name = "protocol_driver_thrift", + srcs = [ + "protocol_driver_thrift.cc", + ], + hdrs = [ + "protocol_driver_thrift.h", + ], + deps = [ + ":distbench_utils", + ":protocol_driver_api", + ":distbench_thrift_lib", + "@apache_thrift//:thrift", + ], + tags = [ + "manual" + ], +) diff --git a/Makefile b/Makefile index 5ec806de..6407ee04 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,8 @@ default: all +thrift_proto: distbench.thrift + ../../opt/thrift/bin/thrift --gen cpp distbench.thrift + testlog: bazel test --test_output=all :all @@ -28,7 +31,13 @@ clang-format: done all: - bazel build :all + bazel build :distbench + +test_with_thrift: thrift_proto + bazel test :all --//:with-thrift + +all_with_thrift: thrift_proto + bazel build :distbench --//:with-thrift clean: bazel clean diff --git a/WORKSPACE.bazel b/WORKSPACE.bazel index 4bc6c1ba..b95f1157 100644 --- a/WORKSPACE.bazel +++ b/WORKSPACE.bazel @@ -74,3 +74,9 @@ new_local_repository( build_file = "libfabric.BUILD", ) +new_local_repository( + name = "apache_thrift", + path = "../opt/thrift", + build_file = "thrift.BUILD", +) + diff --git a/distbench.thrift b/distbench.thrift new file mode 100644 index 00000000..f31b7767 --- /dev/null +++ b/distbench.thrift @@ -0,0 +1,5 @@ +namespace cpp thrift + +service Distbench { + string GenericRPC(1: string payload); +} diff --git a/distbench_test_sequencer_test.cc b/distbench_test_sequencer_test.cc index ee7124ef..bdbcba90 100644 --- a/distbench_test_sequencer_test.cc +++ b/distbench_test_sequencer_test.cc @@ -429,6 +429,20 @@ TEST(DistBenchTestSequencer, RunIntenseTrafficMaxDurationMaxIteration("grpc_async_callback"); } +#ifdef WITH_THRIFT +TEST(DistBenchTestSequencer, RunIntenseTrafficMaxDurationThrift) { + RunIntenseTrafficMaxDuration("thrift"); +} + +TEST(DistBenchTestSequencer, RunIntenseTrafficMaxIterationThrift) { + RunIntenseTrafficMaxIteration("thrift"); +} + +TEST(DistBenchTestSequencer, RunIntenseTrafficMaxDurationMaxIterationThrift) { + RunIntenseTrafficMaxDurationMaxIteration("thrift"); +} +#endif + #ifdef WITH_MERCURY TEST(DistBenchTestSequencer, RunIntenseTrafficMaxDurationMercury) { RunIntenseTrafficMaxDuration("mercury"); diff --git a/docs/thrift_how_to.md b/docs/thrift_how_to.md new file mode 100644 index 00000000..a18a08a8 --- /dev/null +++ b/docs/thrift_how_to.md @@ -0,0 +1,47 @@ +# Running Distbench with Thrift + +## Introduction + +[Apache Thrift](https://thrift.apache.org/) can be used as transport for the +RPCs using the thrift protocol driver. + +Note: Thrift is only used as a transport for the RPC the +serialization/deserialization is still performed using Protobuf. + +## Building the Thrift library + +Build the Apache Thrift library in the parent folder `opt/thrift`. + +```bash +cd .. +git clone https://github.com/apache/thrift.git thrift_src +cd thrift_src +./bootstrap.sh +./configure --without-java --without-go --without-python --without-py3 --prefix=`pwd`/../opt/thrift +make +make check +make install +``` + +Run the Distbench tests: +```bash +cd ../distbench +make test_with_thrift +``` + +## Running with the Thrift protocol driver + +Use the `thrift` protocol driver: +``` +tests { + default_protocol: "thrift" +``` + +Run the test: +```bash +export DISTBENCH_EXTRA_BAZEL_OPTIONS="--//:with-thrift" +./start_distbench_localhost.sh -n 1 +./simple_test.sh +``` + + diff --git a/protocol_driver_allocator.cc b/protocol_driver_allocator.cc index fc93170c..04e79b85 100644 --- a/protocol_driver_allocator.cc +++ b/protocol_driver_allocator.cc @@ -21,6 +21,9 @@ #ifdef WITH_MERCURY #include "protocol_driver_mercury.h" #endif +#ifdef WITH_THRIFT +#include "protocol_driver_thrift.h" +#endif namespace distbench { @@ -53,6 +56,10 @@ absl::StatusOr> AllocateProtocolDriver( #ifdef WITH_MERCURY } else if (opts.protocol_name() == "mercury") { pd = std::make_unique(); +#endif +#ifdef WITH_THRIFT + } else if (opts.protocol_name() == "thrift") { + pd = std::make_unique(); #endif } else { if (alias_resolver_ == nullptr) { diff --git a/protocol_driver_test.cc b/protocol_driver_test.cc index 7c313d2f..9f82c785 100644 --- a/protocol_driver_test.cc +++ b/protocol_driver_test.cc @@ -286,6 +286,12 @@ ProtocolDriverOptions MercuryOptions() { return ret; } +ProtocolDriverOptions ThriftOptions() { + ProtocolDriverOptions ret; + ret.set_protocol_name("thrift"); + return ret; +} + void BM_GrpcEcho(benchmark::State& state) { Echo(state, GrpcOptions()); } void BM_GrpcCallbackEcho(benchmark::State& state) { @@ -302,6 +308,9 @@ INSTANTIATE_TEST_SUITE_P(ProtocolDriverTests, ProtocolDriverTest, GrpcPollingClientHandoffServer(), GrpcPollingClientPollingServer(), GrpcCallbackClientInlineServer(), +#ifdef WITH_THRIFT + ThriftOptions(), +#endif #ifdef WITH_MERCURY MercuryOptions(), #endif diff --git a/protocol_driver_thrift.cc b/protocol_driver_thrift.cc new file mode 100644 index 00000000..30c011ae --- /dev/null +++ b/protocol_driver_thrift.cc @@ -0,0 +1,227 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "protocol_driver_thrift.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "Distbench.h" +#include "distbench_utils.h" + +namespace distbench { + +using namespace ::apache::thrift; +using namespace ::apache::thrift::protocol; +using namespace ::apache::thrift::transport; +using namespace ::apache::thrift::server; + +absl::Status ThriftPeerClient::HandleConnect(std::string ip_address, int port) { + socket_ = std::make_shared(ip_address, port); + transport_ = std::make_shared(socket_); + protocol_ = std::make_shared(transport_); + client_ = std::make_unique(protocol_); + + try { + transport_->open(); + } catch (TException& tx) { + return absl::UnknownError(tx.what()); + } + + return absl::OkStatus(); +} + +ThriftPeerClient::~ThriftPeerClient() { + client_.reset(); + protocol_.reset(); + transport_.reset(); + socket_.reset(); +} + +void DistbenchThriftHandler::GenericRPC(std::string& _return, + const std::string& payload) { + ServerRpcState* rpc_state = new ServerRpcState; + + distbench::GenericRequest* request = new distbench::GenericRequest(); + bool success = request->ParseFromString(payload); + if (!success) { + LOG(ERROR) << "Unable to decode payload of received GenericRPC (Thrift) !"; + } + rpc_state->request = request; + rpc_state->have_dedicated_thread = true; + rpc_state->send_response = [&]() { + rpc_state->response.SerializeToString(&_return); + }; + rpc_state->free_state = [=]() { delete rpc_state; }; + handler_(rpc_state); +} + +void DistbenchThriftHandler::SetHandler( + std::function handler) { + handler_ = handler; +} + +ProtocolDriverThrift::ProtocolDriverThrift() {} + +absl::Status ProtocolDriverThrift::Initialize( + const ProtocolDriverOptions& pd_opts, int* port) { + absl::MutexLock m(&mutex_server_); + + if (server_initialized_) { + LOG(ERROR) << "Server already initialized !"; + return absl::UnknownError("Thrift server already initialized !"); + } + + std::string netdev_name = pd_opts.netdev_name(); + auto maybe_ip = IpAddressForDevice(netdev_name); + if (!maybe_ip.ok()) return maybe_ip.status(); + server_ip_address_ = maybe_ip.value(); + + thrift_handler_ = std::make_unique(); + thrift_processor_ = std::make_unique(thrift_handler_); + TServerSocket* socket = new TServerSocket(server_ip_address_.ip(), *port); + thrift_serverTransport_ = std::shared_ptr(socket); + thrift_transportFactory_ = std::make_unique(); + thrift_protocolFactory_ = std::make_unique(); + thrift_server_ = std::make_unique( + thrift_processor_, thrift_serverTransport_, thrift_transportFactory_, + thrift_protocolFactory_); + server_thread_ = std::thread{[&]() { + thrift_server_->serve(); + LOG(INFO) << "thrift_server_ stopped serving"; + }}; + + while (!socket->isOpen()) + ; + *port = socket->getPort(); + server_port_ = *port; + server_socket_address_ = SocketAddressForIp(server_ip_address_, *port); + + server_initialized_ = true; + LOG(INFO) << "Thrift server listening on " << server_socket_address_; + return absl::OkStatus(); +} + +void ProtocolDriverThrift::SetHandler( + std::function handler) { + thrift_handler_->SetHandler(handler); +} + +void ProtocolDriverThrift::SetNumPeers(int num_peers) { + thrift_peer_clients_.resize(num_peers); +} + +ProtocolDriverThrift::~ProtocolDriverThrift() { + ShutdownServer(); + thrift_peer_clients_.clear(); +} + +absl::StatusOr ProtocolDriverThrift::HandlePreConnect( + std::string_view remote_connection_info, int peer) { + ServerAddress addr; + addr.set_ip_address(server_ip_address_.ip()); + addr.set_port(server_port_); + addr.set_socket_address(server_socket_address_); + std::string ret; + addr.AppendToString(&ret); + return ret; +} + +absl::Status ProtocolDriverThrift::HandleConnect( + std::string remote_connection_info, int peer) { + CHECK_GE(peer, 0); + CHECK_LT(static_cast(peer), thrift_peer_clients_.size()); + ServerAddress addr; + addr.ParseFromString(remote_connection_info); + LOG(INFO) << "Thrift HandleConnect to " << addr.socket_address(); + + return thrift_peer_clients_[peer].HandleConnect(addr.ip_address(), + addr.port()); +} + +std::vector ProtocolDriverThrift::GetTransportStats() { + return {}; +} + +namespace { +struct PendingRpc { + GenericRequest request; + GenericResponse response; + std::function done_callback; + ClientRpcState* state; +}; +} // anonymous namespace + +void ProtocolDriverThrift::InitiateRpc( + int peer_index, ClientRpcState* state, + std::function done_callback) { + CHECK_GE(peer_index, 0); + CHECK_LT(static_cast(peer_index), thrift_peer_clients_.size()); + + ++pending_rpcs_; + + PendingRpc* new_rpc = new PendingRpc; + new_rpc->done_callback = done_callback; + new_rpc->state = state; + new_rpc->request = std::move(state->request); + + std::string request_encoded; + new_rpc->request.SerializeToString(&request_encoded); + + std::string response_encoded; + thrift_peer_clients_[peer_index].client_->GenericRPC(response_encoded, + request_encoded); + bool success = new_rpc->response.ParseFromString(response_encoded); + if (!success) { + LOG(ERROR) << "Unable to decode payload"; + } else { + new_rpc->state->request = std::move(new_rpc->request); + new_rpc->state->response = std::move(new_rpc->response); + } + new_rpc->state->success = success; + new_rpc->done_callback(); + + delete new_rpc; + --pending_rpcs_; +} + +void ProtocolDriverThrift::ChurnConnection(int peer) {} + +void ProtocolDriverThrift::ShutdownClient() { + while (pending_rpcs_) { + } +} + +void ProtocolDriverThrift::ShutdownServer() { + absl::MutexLock m(&mutex_server_); + + if (!server_initialized_) { + LOG(INFO) << "ShutdownServer called while uninitialized !"; + return; + } + + server_initialized_ = false; + + thrift_server_->stop(); + + // Wait for thrift_server_ to shutdown + server_thread_.join(); +} + +} // namespace distbench diff --git a/protocol_driver_thrift.h b/protocol_driver_thrift.h new file mode 100644 index 00000000..412943ae --- /dev/null +++ b/protocol_driver_thrift.h @@ -0,0 +1,117 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DISTBENCH_PROTOCOL_DRIVER_THRIFT_H_ +#define DISTBENCH_PROTOCOL_DRIVER_THRIFT_H_ + +#include +#include +#include +#include +#include +#include + +#include "Distbench.h" +#include "distbench_utils.h" +#include "protocol_driver.h" + +namespace distbench { + +using namespace ::apache::thrift; +using namespace ::apache::thrift::protocol; +using namespace ::apache::thrift::transport; +using namespace ::apache::thrift::server; + +using namespace ::thrift; + +class DistbenchThriftHandler : virtual public DistbenchIf { + public: + DistbenchThriftHandler() {} + + void GenericRPC(std::string& _return, const std::string& generic_request); + + void SetHandler(std::function handler); + + private: + std::function handler_; +}; + +class TrafficService; + +class ThriftPeerClient { + public: + ThriftPeerClient() = default; + ThriftPeerClient(const ThriftPeerClient& value) = default; + ThriftPeerClient(ThriftPeerClient&& value) = default; + ~ThriftPeerClient(); + + absl::Status HandleConnect(std::string ip_address, int port); + std::unique_ptr client_; + + private: + std::shared_ptr socket_; + std::shared_ptr transport_; + std::shared_ptr protocol_; +}; + +class ProtocolDriverThrift : public ProtocolDriver { + public: + ProtocolDriverThrift(); + ~ProtocolDriverThrift() override; + + absl::Status Initialize(const ProtocolDriverOptions& pd_opts, + int* port) override; + + void SetHandler(std::function handler) override; + void SetNumPeers(int num_peers) override; + + // Connects to the actual Thrift service. + absl::Status HandleConnect(std::string remote_connection_info, + int peer) override; + + // Returns the address of the Thrift service. + absl::StatusOr HandlePreConnect( + std::string_view remote_connection_info, int peer) override; + + std::vector GetTransportStats() override; + void InitiateRpc(int peer_index, ClientRpcState* state, + std::function done_callback) override; + void ChurnConnection(int peer) override; + void ShutdownServer() override; + void ShutdownClient() override; + + private: + std::atomic pending_rpcs_ = 0; + std::vector thrift_peer_clients_; + int server_port_ = 0; + DeviceIpAddress server_ip_address_; + std::string server_socket_address_; + + std::thread server_thread_; + + std::shared_ptr thrift_handler_; + std::shared_ptr thrift_processor_; + std::shared_ptr thrift_serverTransport_; + std::shared_ptr thrift_transportFactory_; + std::shared_ptr thrift_protocolFactory_; + + std::unique_ptr thrift_server_; + + mutable absl::Mutex mutex_server_; + bool server_initialized_ ABSL_GUARDED_BY(mutex_server_) = false; +}; + +} // namespace distbench + +#endif // DISTBENCH_PROTOCOL_DRIVER_THRIFT_H_ diff --git a/start_distbench_localhost.sh b/start_distbench_localhost.sh index 1997661a..bfff7202 100755 --- a/start_distbench_localhost.sh +++ b/start_distbench_localhost.sh @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +DISTBENCH_EXTRA_BAZEL_OPTIONS="${DISTBENCH_EXTRA_BAZEL_OPTIONS:-}" + check_dependencies() { # Verify that the needed tools are presents # @@ -28,7 +30,7 @@ build_distbench() { # Build Distbench # echo Attempting to build DistBench... - if ! echo_and_run bazel build :distbench $BAZEL_COMPILATION_OPTIONS + if ! echo_and_run bazel build :distbench -c $COMPILATION_MODE $DISTBENCH_EXTRA_BAZEL_OPTIONS then echo DistBench did not build successfully. exit 2 diff --git a/thrift.BUILD b/thrift.BUILD new file mode 100644 index 00000000..f18ff9b1 --- /dev/null +++ b/thrift.BUILD @@ -0,0 +1,14 @@ +cc_library( + name = "thrift", + hdrs = glob(["**/*.h", "**/*.tcc"]), + strip_include_prefix = "include/", + includes = ["include/"], + visibility = ["//visibility:public"], + deps = [":thrift_static"], + linkopts = ["-lm","-lpthread"], +) + +cc_import( + name = "thrift_static", + static_library = "lib/libthrift.a", +)