Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ option(WITH_RDMA "With RDMA" OFF)
option(BUILD_UNIT_TESTS "Whether to build unit tests" OFF)
option(BUILD_BRPC_TOOLS "Whether to build brpc tools" ON)
option(DOWNLOAD_GTEST "Download and build a fresh copy of googletest. Requires Internet access." ON)
option(WITH_BOOST_STACKTRACE "Link libbrpc.so with boost stack trace" OFF)
message("WITH_BOOST_STACKTRACE: " ${WITH_BOOST_STACKTRACE})

option(IO_URING_ENABLED "Enable IO uring based network" OFF)
if (IO_URING_ENABLED)
Expand Down Expand Up @@ -256,6 +258,16 @@ endif()

set(BRPC_PRIVATE_LIBS "-lgflags -lprotobuf -lleveldb -lprotoc -lssl -lcrypto -ldl -lz")

if(WITH_BOOST_STACKTRACE)
find_library(BOOST_STACKTRACE_LIB NAMES boost_stacktrace_backtrace)
if(NOT BOOST_STACKTRACE_LIB)
message(FATAL_ERROR "Fail to find boost_stacktrace_backtrace")
endif()
list(APPEND DYNAMIC_LIB ${BOOST_STACKTRACE_LIB})
set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -lboost_stacktrace_backtrace")
add_compile_definitions(BOOST_STACKTRACE_LINK)
endif()

if(WITH_GLOG)
set(DYNAMIC_LIB ${DYNAMIC_LIB} ${GLOG_LIB})
set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -lglog")
Expand Down
12 changes: 12 additions & 0 deletions example/echo_c++/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ set(DYNAMIC_LIB
dl
)

find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h)
find_library(GLOG_LIB NAMES glog VERSION ">=0.6.0" REQUIRED)

if((NOT GLOG_INCLUDE_PATH) OR(NOT GLOG_LIB))
message(FATAL_ERROR "Fail to find glog")
endif()

include_directories(${GLOG_INCLUDE_PATH})
set(DYNAMIC_LIB ${DYNAMIC_LIB} ${GLOG_LIB})

if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(DYNAMIC_LIB ${DYNAMIC_LIB}
pthread
Expand All @@ -134,6 +144,8 @@ endif()

add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})
add_executable(benchmark_echo benchmark.cpp ${PROTO_SRC} ${PROTO_HEADER})

target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB})
target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB})
target_link_libraries(benchmark_echo ${BRPC_LIB} ${DYNAMIC_LIB})
137 changes: 137 additions & 0 deletions example/echo_c++/benchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Benchmark echo-server by multiple threads.

#include <pthread.h>
#include <unistd.h>

#include <vector>

#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <bvar/bvar.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include <brpc/server.h>

#include "echo.pb.h"

DEFINE_string(message, "hello world", "Message body sent to server");
DEFINE_string(attachment, "", "Carry this along with requests");
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8000", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(thread_num, 50, "Number of threads to send requests");
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_bool(ssl, true, "Enable ssl");
DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");

bvar::LatencyRecorder g_latency_recorder("echo_client");

static void* sender(void* arg) {
brpc::Channel* channel = static_cast<brpc::Channel*>(arg);
example::EchoService_Stub stub(channel);

while (!brpc::IsAskedToQuit()) {
example::EchoRequest request;
example::EchoResponse response;
brpc::Controller cntl;

request.set_message(FLAGS_message);
cntl.set_timeout_ms(FLAGS_timeout_ms);
cntl.set_max_retry(FLAGS_max_retry);
cntl.request_attachment().append(FLAGS_attachment);

stub.Echo(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
g_latency_recorder << cntl.latency_us();
LOG(INFO) << "Echo response: " << response.message();
} else {
LOG(WARNING) << "Fail to echo: " << cntl.ErrorText()
<< " latency=" << cntl.latency_us();
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
<< "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us();
bthread_usleep(100000);
}
}
return NULL;
}

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);

brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;
if (FLAGS_ssl) {
options.mutable_ssl_options();
}

if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}

std::vector<bthread_t> bids;
std::vector<pthread_t> pids;
if (!FLAGS_use_bthread) {
pids.resize(FLAGS_thread_num);
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (pthread_create(&pids[i], NULL, sender, &channel) != 0) {
LOG(ERROR) << "Fail to create pthread";
return -1;
}
}
} else {
bids.resize(FLAGS_thread_num);
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (bthread_start_background(&bids[i], NULL, sender, &channel) != 0) {
LOG(ERROR) << "Fail to create bthread";
return -1;
}
}
}

if (FLAGS_dummy_port >= 0) {
brpc::StartDummyServerAt(FLAGS_dummy_port);
}

while (!brpc::IsAskedToQuit()) {
sleep(1);
LOG(INFO) << "Sending echo requests at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}

LOG(INFO) << "Echo benchmark is going to quit";
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (!FLAGS_use_bthread) {
pthread_join(pids[i], NULL);
} else {
bthread_join(bids[i], NULL);
}
}

return 0;
}
26 changes: 26 additions & 0 deletions example/echo_c++/cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-----BEGIN CERTIFICATE-----
MIIEUTCCAzmgAwIBAgIBADANBgkqhkiG9w0BAQQFADB9MQswCQYDVQQGEwJDTjER
MA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5naGFpMQ4wDAYDVQQKEwVC
YWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQxHDAaBgkqhkiG9w0BCQEW
DXNhdEBiYWlkdS5jb20wHhcNMTUwNzE2MDMxOTUxWhcNMTgwNTA1MDMxOTUxWjB9
MQswCQYDVQQGEwJDTjERMA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5n
aGFpMQ4wDAYDVQQKEwVCYWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQx
HDAaBgkqhkiG9w0BCQEWDXNhdEBiYWlkdS5jb20wggEiMA0GCSqGSIb3DQEBAQUA
A4IBDwAwggEKAoIBAQCqdyAeHY39tqY1RYVbfpqZjZlJDtZb04znxjgQrX+mKmLb
mwvXgJojlfn2Qcgp4NKYFqDFb9tU/Gbb436dRvkHyWOz0RPMspR0TTRU1NIY8wRy
0A1LOCgLHsbRJHqktGjylejALdgsspFWyDY9bEfb4oWsnKGzJqcvIDXrPmMOOY4o
pbA9SufSzwRZN7Yzc5jAedpaF9SK78RQXtvV0+JfCUwBsBWPKevRFFUrN7rQBYjP
cgV/HgDuquPrqnESVSYyfEBKZba6cmNb+xzO3cB1brPTtobSXh+0o/0CtRA+2m63
ODexxCLntgkPm42IYCJLM15xTatcfVX/3LHQ31DrAgMBAAGjgdswgdgwHQYDVR0O
BBYEFGcd7lA//bSAoSC/NbWRx/H+O1zpMIGoBgNVHSMEgaAwgZ2AFGcd7lA//bSA
oSC/NbWRx/H+O1zpoYGBpH8wfTELMAkGA1UEBhMCQ04xETAPBgNVBAgTCFNoYW5n
aGFpMREwDwYDVQQHEwhTaGFuZ2hhaTEOMAwGA1UEChMFQmFpZHUxDDAKBgNVBAsT
A0lORjEMMAoGA1UEAxMDU0FUMRwwGgYJKoZIhvcNAQkBFg1zYXRAYmFpZHUuY29t
ggEAMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEEBQADggEBAKfoCn8SpLk3uQyT
X+oygcRWfTeJtN3D5J69NCMJ7wB+QPfpEBPwiqMgdbp4bRJ98H7x5UQsHT+EDOT/
9OmipomHInFY4W1ew11zNKwuENeRrnZwTcCiVLZsxZsAU41ZeI5Yq+2WdtxnePCR
VL1/NjKOq+WoRdb2nLSNDWgYMkLRVlt32hyzryyrBbmaxUl8BxnPqUiWduMwsZUz
HNpXkoa1xTSd+En1SHYWfMg8BOVuV0I0/fjUUG9AXVqYpuogfbjAvibVNWAmxOfo
fOjCPCGoJC1ET3AxYkgXGwioobz0pK/13k2pV+wu7W4g+6iTfz+hwZbPsUk2a/5I
f6vXFB0=
-----END CERTIFICATE-----
4 changes: 4 additions & 0 deletions example/echo_c++/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
DEFINE_bool(ssl, true, "Enable ssl");

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
Expand All @@ -46,6 +47,9 @@ int main(int argc, char* argv[]) {
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (FLAGS_ssl) {
options.mutable_ssl_options();
}
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
Expand Down
27 changes: 27 additions & 0 deletions example/echo_c++/key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAqncgHh2N/bamNUWFW36amY2ZSQ7WW9OM58Y4EK1/pipi25sL
14CaI5X59kHIKeDSmBagxW/bVPxm2+N+nUb5B8ljs9ETzLKUdE00VNTSGPMEctAN
SzgoCx7G0SR6pLRo8pXowC3YLLKRVsg2PWxH2+KFrJyhsyanLyA16z5jDjmOKKWw
PUrn0s8EWTe2M3OYwHnaWhfUiu/EUF7b1dPiXwlMAbAVjynr0RRVKze60AWIz3IF
fx4A7qrj66pxElUmMnxASmW2unJjW/sczt3AdW6z07aG0l4ftKP9ArUQPtputzg3
scQi57YJD5uNiGAiSzNecU2rXH1V/9yx0N9Q6wIDAQABAoIBADN3khflnnhKzDXr
To9IU08nRG+dbjT9U16rJ0RJze+SfpSFZHblWiSCZJzoUZHrUkofEt1pn1QyfK/J
KPI9enTSZirlZk/4XwAaS0GNm/1yahZsIIdkZhqtaSO+GtVdrw4HGuXjMZCVPXJx
MocrCSsnYmqyQ9P+SJ3e4Mis5mVllwDiUVlnTIamSSt16qkPdamLSJrxvI4LirQK
9MZWNLoDFpRU1MJxQ/QzrEC3ONTq4j++AfbGzYTmDDtLeM8OSH5o72YXZ2JkaA4c
xCzHFT+NaJYxF7esn/ctzGg50LYl8IF2UQtzOkX2l3l/OktIB1w+jGV6ONb1EWx5
4zkkzNkCgYEA2EXj7GMsyNE3OYdMw8zrqQKUMON2CNnD+mBseGlr22/bhXtzpqK8
uNel8WF1ezOnVvNsU8pml/W/mKUu6KQt5JfaDzen3OKjzTABVlbJxwFhPvwAeaIA
q/tmSKyqiCgOMbR7Cq4UEwGf2A9/RII4JEC0/aipRU5srF65OYPUOJcCgYEAycco
DFVG6jUw9w68t/X4f7NT4IYP96hSAqLUPuVz2fWwXKLWEX8JiMI+Ue3PbMz6mPcs
4vMu364u4R3IuzrrI+PRK9iTa/pahBP6eF6ZpbY1ObI8CVLTrqUS9p22rr9lBm8V
EZA9hwcHLYt+PWzaKcsFpbP4+AeY7nBBbL9CAM0CgYAzuJsmeB1ItUgIuQOxu7sM
AzLfcjZTLYkBwreOIGAL7XdJN9nTmw2ZAvGLhWwsF5FIaRSaAUiBxOKaJb7PIhxb
k7kxdHTvjT/xHS7ksAK3VewkvO18KTMR7iBq9ugdgb7LQkc+qZzhYr0QVbxw7Ndy
TAs8sm4wxe2VV13ilFVXZwKBgDfU6ZnwBr1Llo7l/wYQA4CiSDU6IzTt2DNuhrgY
mWPX/cLEM+OHeUXkKYZV/S0n0rd8vWjWzUOLWOFlcmOMPAAkS36MYM5h6aXeOVIR
KwaVUkjyrnYN+xC6EHM41JGp1/RdzECd3sh8A1pw3K92bS9fQ+LD18IZqBFh8lh6
23KJAoGAe48SwAsaGvqRO61Taww/Wf+YpGc9lnVbCvNFGScYaycPMqaRBUBmz/U3
QQgpQY8T7JIECbA8sf78SlAZ9x93r0UQ70RekV3WzKAQHfHK8nqTjd3T0+i4aySO
yQpYYCgE24zYO6rQgwrhzI0S4rWe7izDDlg0RmLtQh7Xw+rlkAQ=
-----END RSA PRIVATE KEY-----
10 changes: 10 additions & 0 deletions example/echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS."
" If this is set, the flag port will be ignored");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_bool(ssl, true, "Enable TLS server");
DEFINE_string(certificate, "../cert.pem", "Certificate file path to enable SSL");
DEFINE_string(private_key, "../key.pem", "Private key file path to enable SSL");
DEFINE_string(ciphers, "", "Cipher suite used for SSL connections");

// Your implementation of example::EchoService
// Notice that implementing brpc::Describable grants the ability to put
Expand Down Expand Up @@ -104,6 +108,12 @@ int main(int argc, char* argv[]) {
// Start the server.
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
if (FLAGS_ssl) {
options.mutable_ssl_options()->default_cert.certificate = FLAGS_certificate;
options.mutable_ssl_options()->default_cert.private_key = FLAGS_private_key;
options.mutable_ssl_options()->ciphers = FLAGS_ciphers;
}
options.num_threads = 1;
if (server.Start(point, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/event_dispatcher_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "bthread/task_control.h"
#include "bthread/task_group.h"
#include <unordered_map>
#include <boost/stacktrace/stacktrace.hpp>

extern "C" {
extern void bthread_flush();
Expand Down Expand Up @@ -210,6 +211,7 @@ int EventDispatcher::RemoveConsumer(int fd) {
// epoll_wait will keep returning events of the fd continuously, making
// program abnormal.
if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
LOG(INFO) << "RemoveConsumer fd: " << fd << " stack trace: " << boost::stacktrace::stacktrace();
PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
return -1;
}
Expand Down
16 changes: 16 additions & 0 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
#include "brpc/protocol.h" // ListProtocols
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/input_messenger.h"

#include <boost/stacktrace/stacktrace.hpp>

#include "brpc/socket.h"

DECLARE_bool(use_io_uring);

namespace brpc {

Expand Down Expand Up @@ -417,10 +421,12 @@ void InputMessenger::OnNewMessagesFromRing(Socket *m) {
InputMessageClosure last_msg;
bool read_eof = false;
while (!read_eof && m->buf_idx_ < m->in_bufs_.size()) {
// LOG(INFO) << "m->buf_idx: " << m->buf_idx_ << ", m->in_bufs_.size(): " << m->in_bufs_.size();
const int64_t received_us = butil::cpuwide_time_us();
const int64_t base_realtime = butil::gettimeofday_us() - received_us;

const ssize_t nr = m->CopyDataRead();
// LOG(INFO) << "CopyDataRead nr: " << nr << ", read_buf size: " << m->_read_buf.size();

if (nr <= 0) {
if (0 == nr) {
Expand All @@ -439,6 +445,7 @@ void InputMessenger::OnNewMessagesFromRing(Socket *m) {
m->ClearInboundBuf();
return;
}
continue;
}
}

Expand Down Expand Up @@ -547,7 +554,11 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,
SocketOptions options;
options.remote_side = remote_side;
options.user = this;
#ifdef IO_URING_ENABLED
options.on_edge_triggered_events = FLAGS_use_io_uring ? OnNewMessagesFromRing : OnNewMessages;
#else
options.on_edge_triggered_events = OnNewMessages;
#endif
options.health_check_interval_s = health_check_interval_s;
if (FLAGS_socket_keepalive) {
options.keepalive_options = std::make_shared<SocketKeepaliveOptions>();
Expand All @@ -571,7 +582,12 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) {
#else
{
#endif
#ifdef IO_URING_ENABLED
// LOG(INFO) << "InputMessenger::Create, " << boost::stacktrace::stacktrace();
options.on_edge_triggered_events = FLAGS_use_io_uring ? OnNewMessagesFromRing : OnNewMessages;
#else
options.on_edge_triggered_events = OnNewMessages;
#endif
}
// Enable keepalive by options or Gflag.
// Priority: options > Gflag.
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/policy/redis_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "bthread/ring_write_buf_pool.h"

DECLARE_bool(use_io_uring);
DECLARE_bool(enable_ssl_io_uring);

namespace bthread {
extern BAIDU_THREAD_LOCAL TaskGroup *tls_task_group;
Expand Down Expand Up @@ -173,7 +174,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
#ifdef IO_URING_ENABLED
char *ring_buf = nullptr;
uint16_t ring_buf_idx = UINT16_MAX;
if (FLAGS_use_io_uring) {
if (FLAGS_use_io_uring && !FLAGS_enable_ssl_io_uring) {
std::tie(ring_buf, ring_buf_idx) = cur_group->GetRingWriteBuf();
if (ring_buf_idx != UINT16_MAX) {
appender.set_ring_buffer(ring_buf, RingWriteBufferPool::buf_length);
Expand Down
Loading
Loading