From e23fac796d7f733a74535b43c6990c5889d43f07 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 13 Jan 2026 17:00:46 +0800 Subject: [PATCH 01/14] boost stacktrace --- CMakeLists.txt | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index d42a7120..48111b6f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,8 @@ option(WITH_RDMA "With RDMA" OFF) option(BUILD_UNIT_TESTS "Whether to build unit tests" OFF) option(BUILD_BRPC_TOOLS "Whether to build brpc tools" ON) option(DOWNLOAD_GTEST "Download and build a fresh copy of googletest. Requires Internet access." ON) +option(WITH_BOOST_STACKTRACE "Link libbrpc.so with boost stack trace" OFF) +message("WITH_BOOST_STACKTRACE: " ${WITH_BOOST_STACKTRACE}) option(IO_URING_ENABLED "Enable IO uring based network" OFF) if (IO_URING_ENABLED) @@ -256,6 +258,16 @@ endif() set(BRPC_PRIVATE_LIBS "-lgflags -lprotobuf -lleveldb -lprotoc -lssl -lcrypto -ldl -lz") +if(WITH_BOOST_STACKTRACE) + find_library(BOOST_STACKTRACE_LIB NAMES boost_stacktrace_backtrace) + if(NOT BOOST_STACKTRACE_LIB) + message(FATAL_ERROR "Fail to find boost_stacktrace_backtrace") + endif() + list(APPEND DYNAMIC_LIB ${BOOST_STACKTRACE_LIB}) + set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -lboost_stacktrace_backtrace") + add_compile_definitions(BOOST_STACKTRACE_LINK) +endif() + if(WITH_GLOG) set(DYNAMIC_LIB ${DYNAMIC_LIB} ${GLOG_LIB}) set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -lglog") From 799ed1e8b8b43045bb017729ca38ead0c7db793e Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 15 Jan 2026 14:39:42 +0800 Subject: [PATCH 02/14] feat: add TLS io_uring write path using memory BIOs - add TLS ring context + helpers (Feed/Drain/Flush ciphertext) - rebind SSL sessions to memory BIO when io_uring TLS is enabled - implement DoWriteTlsRing: feed plaintext to SSL, drain BIO, flush via io_uring - guard logic behind --enable_ssl_io_uring flag, document helpers --- src/brpc/socket.cpp | 227 +++++++++++++++++++++++++++++++++++++++++++- src/brpc/socket.h | 26 +++++ 2 files changed, 248 insertions(+), 5 deletions(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 9c1acfea..8db48e4c 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -64,8 +64,11 @@ DEFINE_bool(dispatch_lazily, false, "dispatcher lazily creates task"); #ifdef IO_URING_ENABLED DEFINE_bool(use_io_uring, false, "Use IO URING to do the polling."); +DEFINE_bool(enable_ssl_io_uring, false, "Use memory BIO + io_uring for TLS sockets."); #endif +#include + namespace bthread { size_t __attribute__((weak)) get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n); @@ -360,6 +363,25 @@ struct RegisteredRingBuffer { }; #endif +#ifdef IO_URING_ENABLED +struct Socket::TlsRingContext { + TlsRingContext() + : mem_rbio(NULL) + , mem_wbio(NULL) {} + + ~TlsRingContext() { + // Actual BIO lifetime is now owned by SSL after SSL_set_bio. + mem_rbio = NULL; + mem_wbio = NULL; + } + + BIO* mem_rbio; + BIO* mem_wbio; + // Ciphertext drained from memory BIO waiting to be submitted via io_uring. + butil::IOBuf pending_cipher_out; +}; +#endif + struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest { static WriteRequest* const UNCONNECTED; @@ -534,6 +556,10 @@ Socket::Socket(Forbidden) , _unwritten_bytes(0) , _epollout_butex(NULL) , _write_head(NULL) +#ifdef IO_URING_ENABLED + , _tls_ring_ctx() + , _tls_uses_ring(false) +#endif , _stream_set(NULL) , _total_streams_unconsumed_size(0) , _ninflight_app_health_check(0) @@ -544,6 +570,9 @@ Socket::Socket(Forbidden) } Socket::~Socket() { +#ifdef IO_URING_ENABLED + DestroyTlsRingContext(); +#endif pthread_mutex_destroy(&_id_wait_list_mutex); bthread::butex_destroy(_epollout_butex); } @@ -618,6 +647,136 @@ void Socket::ReleaseAllFailedWriteRequests(Socket::WriteRequest* req) { ReturnFailedWriteRequest(req, error_code, error_text); } +#ifdef IO_URING_ENABLED +int Socket::InitTlsRingContext(int /*fd*/) { + if (!FLAGS_use_io_uring || !FLAGS_enable_ssl_io_uring) { + return 0; + } + if (_tls_ring_ctx) { + return 0; + } + _tls_ring_ctx = std::make_unique(); + if (_tls_ring_ctx == NULL) { + LOG(ERROR) << "Fail to allocate TlsRingContext"; + return -1; + } + _tls_uses_ring = true; + return 0; +} + +void Socket::DestroyTlsRingContext() { + _tls_ring_ctx.reset(); + _tls_uses_ring = false; +} + +int Socket::AddMemoryBIO(int fd) { + if (!FLAGS_use_io_uring || !FLAGS_enable_ssl_io_uring) { + return 0; + } + if (InitTlsRingContext(fd) != 0) { + return -1; + } + BIO* mem_rbio = BIO_new(BIO_s_mem()); + BIO* mem_wbio = BIO_new(BIO_s_mem()); + if (!mem_rbio || !mem_wbio) { + if (mem_rbio) { + BIO_free(mem_rbio); + } + if (mem_wbio) { + BIO_free(mem_wbio); + } + LOG(ERROR) << "Fail to create memory BIO for TLS ring"; + return -1; + } + { + BAIDU_SCOPED_LOCK(_ssl_session_mutex); + BIO* old_wbio = SSL_get_wbio(_ssl_session); + if (old_wbio) { + BIO_flush(old_wbio); + } + SSL_set_bio(_ssl_session, mem_rbio, mem_wbio); + } + _tls_ring_ctx->mem_rbio = mem_rbio; + _tls_ring_ctx->mem_wbio = mem_wbio; + return 0; +} + +int Socket::FeedTlsCiphertext(const void* data, size_t len) { + if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio || len == 0) { + return 0; + } + const int nw = BIO_write(_tls_ring_ctx->mem_rbio, data, len); + if (nw <= 0) { + return -1; + } + return nw; +} + +int Socket::DrainTlsCiphertext() { + if (!_tls_ring_ctx || !_tls_ring_ctx->mem_wbio) { + return 0; + } + char buf[16 * 1024]; + int total = 0; + while (BIO_pending(_tls_ring_ctx->mem_wbio) > 0) { + int rc = BIO_read(_tls_ring_ctx->mem_wbio, buf, sizeof(buf)); + if (rc <= 0) { + break; + } + _tls_ring_ctx->pending_cipher_out.append(buf, rc); + total += rc; + } + return total; +} + +int Socket::FlushTlsCiphertext() { + if (!_tls_ring_ctx) { + return 0; + } + ssize_t total = 0; + while (!_tls_ring_ctx->pending_cipher_out.empty()) { + _tls_ring_ctx->pending_cipher_out.cut_into_iovecs(&iovecs_); + if (iovecs_.empty()) { + break; + } + bthread::TaskGroup* g = bthread::TaskGroup::VolatileTLSTaskGroup(); + int rc = g->SocketWaitingNonFixedWrite(this); + if (rc < 0) { + errno = -rc; + iovecs_.clear(); + return -1; + } + total += rc; + _tls_ring_ctx->pending_cipher_out.pop_front(rc); + iovecs_.clear(); + } + return total; +} + +ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { + if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio) { + return 0; + } + ssize_t total = 0; + while (total < (ssize_t)size_hint) { + char buf[4 * 1024]; + ERR_clear_error(); + int rc = SSL_read(_ssl_session, buf, std::min(sizeof(buf), size_hint - total)); + if (rc <= 0) { + int ssl_err = SSL_get_error(_ssl_session, rc); + if (ssl_err == SSL_ERROR_WANT_READ) { + break; + } + errno = (ssl_err == SSL_ERROR_ZERO_RETURN) ? ECONNRESET : ESSL; + return -1; + } + _read_buf.append(buf, rc); + total += rc; + } + return total; +} +#endif + int Socket::ResetFileDescriptor(int fd, size_t bound_gid) { // Reset message sizes when fd is changed. _last_msg_size = 0; @@ -1197,6 +1356,9 @@ int Socket::Status(SocketId id, int32_t* nref) { } void Socket::OnRecycle() { +#ifdef IO_URING_ENABLED + DestroyTlsRingContext(); +#endif const bool create_by_connect = CreatedByConnect(); if (_app_connect) { std::shared_ptr tmp; @@ -2271,17 +2433,17 @@ ssize_t Socket::DoWrite(WriteRequest* req) { } } -#ifdef IO_URING_ENABLED - // io_uring and SSL is not supported. - CHECK(!use_mixed_data_list); -#endif - CHECK_EQ(SSL_CONNECTED, ssl_state()); if (_conn) { // TODO: Separate SSL stuff from SocketConnection BAIDU_SCOPED_LOCK(_ssl_session_mutex); return _conn->CutMessageIntoSSLChannel(_ssl_session, data_list, ndata); } +#ifdef IO_URING_ENABLED + if (FLAGS_use_io_uring && FLAGS_enable_ssl_io_uring && _tls_ring_ctx) { + return DoWriteTlsRing(req, data_list, ndata); + } +#endif int ssl_error = 0; ssize_t nw = 0; { @@ -2317,6 +2479,50 @@ ssize_t Socket::DoWrite(WriteRequest* req) { return nw; } +#ifdef IO_URING_ENABLED +ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], size_t ndata) { + ssize_t total_plain = 0; + // SSL objects are not thread-safe. Guard all SSL_write operations. + BAIDU_SCOPED_LOCK(_ssl_session_mutex); + for (size_t i = 0; i < ndata; ++i) { + butil::IOBuf* buf = data_list[i]; + while (!buf->empty()) { + int ssl_error = SSL_ERROR_NONE; + // cut_into_SSL_channel sends plaintext into SSL_write and removes + // consumed bytes from |buf| when successful. + const ssize_t nw = buf->cut_into_SSL_channel(_ssl_session, &ssl_error); + if (nw > 0) { + total_plain += nw; + continue; + } + + if (ssl_error == SSL_ERROR_WANT_WRITE) { + // Memory BIO is full. Drain existing ciphertext and push to + // io_uring before retrying the remaining plaintext. + DrainTlsCiphertext(); + if (FlushTlsCiphertext() < 0) { + return -1; + } + continue; + } + + if (ssl_error == SSL_ERROR_WANT_READ) { + errno = EPROTO; + } else { + errno = ESSL; + } + return -1; + } + } + DrainTlsCiphertext(); + if (FlushTlsCiphertext() < 0) { + return -1; + } + + return total_plain; +} +#endif + int Socket::SSLHandshake(int fd, bool server_mode) { if (_ssl_ctx == NULL) { if (server_mode) { @@ -2349,10 +2555,20 @@ int Socket::SSLHandshake(int fd, bool server_mode) { // as it may confuse the origin event processing code. while (true) { ERR_clear_error(); + LOG(INFO) << "SSLHandshake: " << *this << ", " << boost::stacktrace::stacktrace(); int rc = SSL_do_handshake(_ssl_session); if (rc == 1) { _ssl_state = SSL_CONNECTED; +#ifdef IO_URING_ENABLED + if (FLAGS_use_io_uring && FLAGS_enable_ssl_io_uring) { + if (AddMemoryBIO(fd) != 0) { + return -1; + } + return 0; + } +#else AddBIOBuffer(_ssl_session, fd, FLAGS_ssl_bio_buffer_size); +#endif return 0; } @@ -2437,6 +2653,7 @@ ssize_t Socket::DoRead(size_t size_hint) { int ssl_error = 0; ssize_t nr = 0; { + LOG(INFO) << "DoRead: " << *this << ", " << boost::stacktrace::stacktrace(); BAIDU_SCOPED_LOCK(_ssl_session_mutex); nr = _read_buf.append_from_SSL_channel(_ssl_session, &ssl_error, size_hint); } diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 7749bcf0..7d9d81f0 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -21,6 +21,7 @@ #include // std::ostream #include // std::deque +#include // std::unique_ptr #include // std::set #include "butil/atomicops.h" // butil::atomic #include "bthread/types.h" // bthread_id_t @@ -49,6 +50,8 @@ namespace bthread { class TaskGroup; } +typedef struct bio_st BIO; + namespace brpc { namespace policy { class ConsistentHashingLoadBalancer; @@ -277,6 +280,9 @@ friend class policy::H2GlobalStreamCreator; class SharedPart; struct Forbidden {}; struct WriteRequest; +#ifdef IO_URING_ENABLED + struct TlsRingContext; +#endif public: const static int STREAM_FAKE_FD = INT_MAX; @@ -352,6 +358,7 @@ friend class policy::H2GlobalStreamCreator; #endif }; + #ifdef IO_URING_ENABLED int Write(char *ring_buf, uint16_t ring_buf_idx, uint32_t ring_buf_size, const WriteOptions* options = NULL); @@ -642,6 +649,18 @@ friend class policy::H2GlobalStreamCreator; int CopyDataRead(); void ClearInboundBuf(); bool RecycleInBackgroundIfNecessary(); + // TLS memory BIO helpers (io_uring path only). + int InitTlsRingContext(int fd); + void DestroyTlsRingContext(); + int AddMemoryBIO(int fd); + // Push ciphertext read from io_uring into SSL's memory BIO. + int FeedTlsCiphertext(const void* data, size_t len); + // Drain encrypted bytes from SSL's memory BIO to pending buffer. + int DrainTlsCiphertext(); + // Push pending ciphertext to io_uring (synchronous submit). + int FlushTlsCiphertext(); + // Pull decrypted plaintext from SSL into _read_buf. + ssize_t ConsumeTlsPlaintext(size_t size_hint); #endif private: DISALLOW_COPY_AND_ASSIGN(Socket); @@ -678,6 +697,9 @@ friend void DereferenceSocket(Socket*); // `req' using the corresponding method. Returns written bytes on // success, -1 otherwise and errno is set ssize_t DoWrite(WriteRequest* req); +#ifdef IO_URING_ENABLED + ssize_t DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], size_t ndata); +#endif // Called before returning to pool. void OnRecycle(); @@ -976,6 +998,10 @@ friend void DereferenceSocket(Socket*); // Storing data that are not flushed into `fd' yet. butil::atomic _write_head; +#ifdef IO_URING_ENABLED + std::unique_ptr _tls_ring_ctx; + bool _tls_uses_ring{}; +#endif butil::Mutex _stream_mutex; std::set *_stream_set; From ecd651bc06185c5e6fb6e9dd2fcd4dee6481300f Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 15 Jan 2026 15:31:24 +0800 Subject: [PATCH 03/14] fix DoWrite data_list, add debugging logs --- src/brpc/socket.cpp | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 8db48e4c..ddf31f86 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -695,6 +695,8 @@ int Socket::AddMemoryBIO(int fd) { BIO_flush(old_wbio); } SSL_set_bio(_ssl_session, mem_rbio, mem_wbio); + LOG_IF(INFO, FLAGS_enable_ssl_io_uring) + << "Socket=" << *this << " switched SSL BIO to memory (fd=" << fd << ")"; } _tls_ring_ctx->mem_rbio = mem_rbio; _tls_ring_ctx->mem_wbio = mem_wbio; @@ -726,6 +728,7 @@ int Socket::DrainTlsCiphertext() { _tls_ring_ctx->pending_cipher_out.append(buf, rc); total += rc; } + LOG_IF(INFO, total > 0) << "Socket=" << *this << " drained " << total << " bytes TLS ciphertext"; return total; } @@ -749,6 +752,7 @@ int Socket::FlushTlsCiphertext() { total += rc; _tls_ring_ctx->pending_cipher_out.pop_front(rc); iovecs_.clear(); + LOG(INFO) << "Socket=" << *this << " flushed " << rc << " bytes TLS ciphertext via io_uring"; } return total; } @@ -2364,7 +2368,7 @@ ssize_t Socket::DoWrite(WriteRequest* req) { #ifdef IO_URING_ENABLED std::variant mixed_data_list[DATA_LIST_MAX]; // If io_uring is used and this is not a stream, use mixed_data_list. - bool use_mixed_data_list = FLAGS_use_io_uring && _conn == nullptr; + bool use_mixed_data_list = FLAGS_use_io_uring && _conn == nullptr && !FLAGS_enable_ssl_io_uring; #endif size_t ndata = 0; for (WriteRequest* p = req; p != NULL && ndata < DATA_LIST_MAX; @@ -2501,8 +2505,12 @@ ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], siz // io_uring before retrying the remaining plaintext. DrainTlsCiphertext(); if (FlushTlsCiphertext() < 0) { + LOG(WARNING) << "Socket=" << *this + << " FlushTlsCiphertext failed during WANT_WRITE: " + << berror(errno); return -1; } + LOG(INFO) << "Socket=" << *this << " TLS WANT_WRITE resolved, retrying"; continue; } @@ -2511,13 +2519,19 @@ ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], siz } else { errno = ESSL; } + LOG(WARNING) << "Socket=" << *this << " TLS write failed: " + << SSLError(ERR_get_error()) << " errno=" << errno; return -1; } } DrainTlsCiphertext(); if (FlushTlsCiphertext() < 0) { + LOG(WARNING) << "Socket=" << *this + << " FlushTlsCiphertext failed after draining: " + << berror(errno); return -1; } + LOG(INFO) << "Socket=" << *this << " wrote TLS plaintext=" << total_plain; return total_plain; } @@ -2653,7 +2667,7 @@ ssize_t Socket::DoRead(size_t size_hint) { int ssl_error = 0; ssize_t nr = 0; { - LOG(INFO) << "DoRead: " << *this << ", " << boost::stacktrace::stacktrace(); + // LOG(INFO) << "DoRead: " << *this << ", " << boost::stacktrace::stacktrace(); BAIDU_SCOPED_LOCK(_ssl_session_mutex); nr = _read_buf.append_from_SSL_channel(_ssl_session, &ssl_error, size_hint); } From 14b612e30fc9241fefd836926b6cf93a4636c03d Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 15 Jan 2026 19:41:07 +0800 Subject: [PATCH 04/14] feat: add io_uring TLS handshake support - handle TLS detection in CopyDataRead via a cached buffer and AddMemoryBIO-based helpers - add EnsureTlsSession/ContinueTlsHandshake/ProcessTlsRingData to drive handshake + decryption through memory BIO - refactor io_uring TLS write path (DoWriteTlsRing) with drain/flush helpers and add debug logs - add sample TLS flags to echo client/server for testing --- src/brpc/input_messenger.cpp | 1 + src/brpc/socket.cpp | 154 ++++++++++++++++++++++++++++++++++- src/brpc/socket.h | 8 ++ 3 files changed, 161 insertions(+), 2 deletions(-) diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index c6a19cd6..12242ecb 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -422,6 +422,7 @@ void InputMessenger::OnNewMessagesFromRing(Socket *m) { const ssize_t nr = m->CopyDataRead(); + // TODO(zkl): EAGAIN should check buf_idx_ and return if no new data if (nr <= 0) { if (0 == nr) { // Set `read_eof' flag and proceed to feed EOF into `Protocol' diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index ddf31f86..254624ca 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -779,6 +779,140 @@ ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { } return total; } + +int Socket::EnsureTlsSessionForRing() { + // Create or reuse the SSL session and switch it to memory BIO so that + // handshake/read/write can be driven entirely by io_uring buffers. + if (!_ssl_ctx) { + LOG(ERROR) << "Socket=" << *this << " lacks SSL context to handle TLS data"; + errno = ESSL; + return -1; + } + if (_ssl_session == NULL) { + _ssl_session = CreateSSLSession(_ssl_ctx->raw_ctx, id(), fd(), true); + if (_ssl_session == NULL) { + LOG(ERROR) << "Socket=" << *this << " failed to create SSL session"; + errno = ESSL; + return -1; + } + SSL_set_accept_state(_ssl_session); + } + if (AddMemoryBIO(fd()) != 0) { + errno = ESSL; + return -1; + } + return 0; +} + +int Socket::ContinueTlsHandshake() { + // Drive SSL handshake until it completes or requires more data. Any + // handshake output is drained and submitted via io_uring. + while (_ssl_state == SSL_CONNECTING) { + ERR_clear_error(); + int rc = SSL_do_handshake(_ssl_session); + if (rc == 1) { + _ssl_state = SSL_CONNECTED; + break; + } + const int ssl_err = SSL_get_error(_ssl_session, rc); + if (ssl_err == SSL_ERROR_WANT_READ) { + break; + } else if (ssl_err == SSL_ERROR_WANT_WRITE) { + DrainTlsCiphertext(); + if (FlushTlsCiphertext() < 0) { + LOG(WARNING) << "Socket=" << *this + << " failed to flush TLS handshake: " << berror(errno); + return -1; + } + } else { + LOG(WARNING) << "Socket=" << *this << " TLS handshake error: " + << SSLError(ERR_get_error()); + errno = ESSL; + return -1; + } + } + return 0; +} + +ssize_t Socket::ProcessTlsRingData(const char* data, size_t len) { + // Feed ciphertext into the SSL read BIO and, if handshake is still in + // progress, continue driving it. When fully connected, decrypt plaintext + // into _read_buf. + if (len > 0) { + if (FeedTlsCiphertext(data, len) < 0) { + errno = ESSL; + return -1; + } + } + if (_ssl_state == SSL_CONNECTING) { + if (ContinueTlsHandshake() != 0) { + return -1; + } + } + if (_ssl_state != SSL_CONNECTED) { + return 0; + } + ssize_t plain = ConsumeTlsPlaintext(len); + return plain < 0 ? -1 : plain; +} + +#ifdef IO_URING_ENABLED +static SSLState DetectSSLStateInBuffer(const char* header, size_t len) { + if (len < 5) { + return SSL_UNKNOWN; + } + if (header[0] == 0x16 && len >= 6 && header[5] == 0x01) { + return SSL_CONNECTING; + } + if ((header[0] & 0x80) == 0x80 && len >= 3 && header[2] == 0x01) { + return SSL_CONNECTING; + } + return SSL_OFF; +} +#endif + +ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { + if (_ssl_state == SSL_UNKNOWN) { + // Accumulate bytes until we can determine whether this connection is + // using TLS. The first handshake record needs at least 6 bytes. + _tls_detect_buf.append(data, len); + if (_tls_detect_buf.length() < 6) { + return len; + } + char header[6]; + _tls_detect_buf.copy_to(header, 6); + SSLState s = DetectSSLStateInBuffer(header, 6); + if (s == SSL_CONNECTING) { + if (EnsureTlsSessionForRing() != 0) { + errno = ESSL; + return -1; + } + _ssl_state = SSL_CONNECTING; + std::string cached; + cached.resize(_tls_detect_buf.size()); + _tls_detect_buf.copy_to(cached.data(), cached.size()); + _tls_detect_buf.clear(); + return ProcessTlsRingData(cached.data(), cached.size()); + } else { + _ssl_state = SSL_OFF; + _read_buf.append(_tls_detect_buf); + ssize_t plain = _tls_detect_buf.length(); + _tls_detect_buf.clear(); + if (_force_ssl) { + errno = ESSL; + return -1; + } + return plain; + } + } + if (_ssl_state == SSL_CONNECTING || _ssl_state == SSL_CONNECTED) { + // Once we know the connection is TLS, all incoming ciphertext goes + // through ProcessTlsRingData. + return ProcessTlsRingData(data, len); + } + _read_buf.append(data, len); + return len; +} #endif int Socket::ResetFileDescriptor(int fd, size_t bound_gid) { @@ -2537,6 +2671,7 @@ ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], siz } #endif + int Socket::SSLHandshake(int fd, bool server_mode) { if (_ssl_ctx == NULL) { if (server_mode) { @@ -3642,11 +3777,26 @@ int Socket::CopyDataRead() { CHECK(static_cast(buf_idx_) < in_bufs_.size()); auto &rbuf = in_bufs_[buf_idx_]; int nw = rbuf.bytes_; + int ret = nw; if (rbuf.bytes_ > 0) { const char *buf_head = cur_group->GetRingReadBuf(rbuf.buf_id_); - _read_buf.append(buf_head, rbuf.bytes_); +#ifdef IO_URING_ENABLED + if (FLAGS_use_io_uring && FLAGS_enable_ssl_io_uring) { + ssize_t plain = HandleTlsRingRead(buf_head, rbuf.bytes_); + if (plain < 0) { + ret = -errno; + goto END; + } + ret = plain; + } else +#endif + { + _read_buf.append(buf_head, rbuf.bytes_); + ret = rbuf.bytes_; + } } +END: if (rbuf.need_rearm_ && rbuf.bytes_ != 0) { cur_group->SocketRecv(this); } @@ -3655,7 +3805,7 @@ int Socket::CopyDataRead() { cur_group->RecycleRingReadBuf(rbuf.buf_id_, rbuf.bytes_); } buf_idx_++; - return nw; + return ret; } void Socket::ClearInboundBuf() { diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 7d9d81f0..a7de64b6 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -699,6 +699,11 @@ friend void DereferenceSocket(Socket*); ssize_t DoWrite(WriteRequest* req); #ifdef IO_URING_ENABLED ssize_t DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], size_t ndata); + // io_uring TLS helpers for read path + int EnsureTlsSessionForRing(); + int ContinueTlsHandshake(); + ssize_t ProcessTlsRingData(const char* data, size_t len); + ssize_t HandleTlsRingRead(const char* data, size_t len); #endif // Called before returning to pool. @@ -998,6 +1003,9 @@ friend void DereferenceSocket(Socket*); // Storing data that are not flushed into `fd' yet. butil::atomic _write_head; +#ifdef IO_URING_ENABLED + butil::IOBuf _tls_detect_buf; +#endif #ifdef IO_URING_ENABLED std::unique_ptr _tls_ring_ctx; bool _tls_uses_ring{}; From 1b6e727bef16cbb06d5fd4e0b944ef1252c279c9 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Fri, 16 Jan 2026 19:11:17 +0800 Subject: [PATCH 05/14] Connect side use iouring --- src/brpc/input_messenger.cpp | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index 12242ecb..64572ae7 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -29,8 +29,12 @@ #include "brpc/protocol.h" // ListProtocols #include "brpc/rdma/rdma_endpoint.h" #include "brpc/input_messenger.h" + +#include + #include "brpc/socket.h" +DECLARE_bool(use_io_uring); namespace brpc { @@ -417,12 +421,13 @@ void InputMessenger::OnNewMessagesFromRing(Socket *m) { InputMessageClosure last_msg; bool read_eof = false; while (!read_eof && m->buf_idx_ < m->in_bufs_.size()) { + LOG(INFO) << "m->buf_idx: " << m->buf_idx_ << ", m->in_bufs_.size(): " << m->in_bufs_.size(); const int64_t received_us = butil::cpuwide_time_us(); const int64_t base_realtime = butil::gettimeofday_us() - received_us; const ssize_t nr = m->CopyDataRead(); + LOG(INFO) << "CopyDataRead nr: " << nr << ", read_buf size: " << m->_read_buf.size(); - // TODO(zkl): EAGAIN should check buf_idx_ and return if no new data if (nr <= 0) { if (0 == nr) { // Set `read_eof' flag and proceed to feed EOF into `Protocol' @@ -440,6 +445,7 @@ void InputMessenger::OnNewMessagesFromRing(Socket *m) { m->ClearInboundBuf(); return; } + continue; } } @@ -548,7 +554,11 @@ int InputMessenger::Create(const butil::EndPoint& remote_side, SocketOptions options; options.remote_side = remote_side; options.user = this; +#ifdef IO_URING_ENABLED + options.on_edge_triggered_events = FLAGS_use_io_uring ? OnNewMessagesFromRing : OnNewMessages; +#else options.on_edge_triggered_events = OnNewMessages; +#endif options.health_check_interval_s = health_check_interval_s; if (FLAGS_socket_keepalive) { options.keepalive_options = std::make_shared(); @@ -572,7 +582,12 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) { #else { #endif +#ifdef IO_URING_ENABLED + LOG(INFO) << "InputMessenger::Create, " << boost::stacktrace::stacktrace(); + options.on_edge_triggered_events = FLAGS_use_io_uring ? OnNewMessagesFromRing : OnNewMessages; +#else options.on_edge_triggered_events = OnNewMessages; +#endif } // Enable keepalive by options or Gflag. // Priority: options > Gflag. From de3d7659ef5a673aa2775b1c54befd03f95bf5e8 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Fri, 16 Jan 2026 19:11:29 +0800 Subject: [PATCH 06/14] feat: add memory-BIO TLS support for io_uring - introduce TLS ring helpers (EnsureTlsSessionForRing, ContinueTlsHandshake, ProcessTlsRingData, HandleTlsRingRead) to drive handshake/decrypt via io_uring - add detection cache so TLS is identified before _read_buf is touched; feed ciphertext from ring, flush handshake output, and return EAGAIN while waiting - refactor CopyDataRead to use the new helpers, handle EAGAIN correctly, and add detailed logging for TLS handshake/flush events --- src/brpc/event_dispatcher_epoll.cpp | 4 ++ src/brpc/socket.cpp | 69 ++++++++++++++++++++++++++--- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index a6650d0f..4bd10ccf 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -23,6 +23,7 @@ #include "bthread/task_control.h" #include "bthread/task_group.h" #include +#include extern "C" { extern void bthread_flush(); @@ -145,6 +146,7 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { errno = EINVAL; return -1; } + LOG(INFO) << "AddEpollOut fd: " << fd << boost::stacktrace::stacktrace(); epoll_event evt; evt.data.u64 = socket_id; @@ -169,6 +171,7 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { int EventDispatcher::RemoveEpollOut(SocketId socket_id, int fd, bool pollin) { + LOG(INFO) << "RemoveEpollOut fd: " << fd << ", pollin: " << pollin << boost::stacktrace::stacktrace(); if (pollin) { epoll_event evt; evt.data.u64 = socket_id; @@ -210,6 +213,7 @@ int EventDispatcher::RemoveConsumer(int fd) { // epoll_wait will keep returning events of the fd continuously, making // program abnormal. if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) { + LOG(INFO) << "RemoveConsumer fd: " << fd << " stack trace: " << boost::stacktrace::stacktrace(); PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd; return -1; } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 254624ca..3e3bd77c 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -704,10 +704,13 @@ int Socket::AddMemoryBIO(int fd) { } int Socket::FeedTlsCiphertext(const void* data, size_t len) { + LOG(INFO) << "FeedTlsCiphertext, len: " << len; if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio || len == 0) { + LOG(INFO) << "return 0"; return 0; } const int nw = BIO_write(_tls_ring_ctx->mem_rbio, data, len); + LOG(INFO) << "nw: " << nw; if (nw <= 0) { return -1; } @@ -733,7 +736,9 @@ int Socket::DrainTlsCiphertext() { } int Socket::FlushTlsCiphertext() { + LOG(INFO) << "FlushTlsCiphertext"; if (!_tls_ring_ctx) { + LOG(INFO) << "return 0"; return 0; } ssize_t total = 0; @@ -747,6 +752,7 @@ int Socket::FlushTlsCiphertext() { if (rc < 0) { errno = -rc; iovecs_.clear(); + LOG(INFO) << "return -1"; return -1; } total += rc; @@ -754,10 +760,12 @@ int Socket::FlushTlsCiphertext() { iovecs_.clear(); LOG(INFO) << "Socket=" << *this << " flushed " << rc << " bytes TLS ciphertext via io_uring"; } + LOG(INFO) << "return total: " << total; return total; } ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { + LOG(INFO) << "ConsumeTlsPlaintext, size_hint: " << size_hint; if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio) { return 0; } @@ -789,6 +797,7 @@ int Socket::EnsureTlsSessionForRing() { return -1; } if (_ssl_session == NULL) { + LOG(INFO) << "Socket=" << *this << " creating TLS session"; _ssl_session = CreateSSLSession(_ssl_ctx->raw_ctx, id(), fd(), true); if (_ssl_session == NULL) { LOG(ERROR) << "Socket=" << *this << " failed to create SSL session"; @@ -796,11 +805,15 @@ int Socket::EnsureTlsSessionForRing() { return -1; } SSL_set_accept_state(_ssl_session); + } else { + LOG(INFO) << "Socket=" << *this << " reusing existing TLS session"; } if (AddMemoryBIO(fd()) != 0) { + LOG(ERROR) << "Socket=" << *this << " failed to bind memory BIO"; errno = ESSL; return -1; } + LOG(INFO) << "Socket=" << *this << " initialized TLS session for io_uring"; return 0; } @@ -808,6 +821,7 @@ int Socket::ContinueTlsHandshake() { // Drive SSL handshake until it completes or requires more data. Any // handshake output is drained and submitted via io_uring. while (_ssl_state == SSL_CONNECTING) { + LOG(INFO) << "ContinueTlsHandshake _ssl_state: " << int(_ssl_state); ERR_clear_error(); int rc = SSL_do_handshake(_ssl_session); if (rc == 1) { @@ -815,7 +829,19 @@ int Socket::ContinueTlsHandshake() { break; } const int ssl_err = SSL_get_error(_ssl_session, rc); + LOG(INFO) << "ssl_err: " << int(ssl_err); if (ssl_err == SSL_ERROR_WANT_READ) { + LOG(INFO) << "SSL_ERROR_WANT_READ"; + DrainTlsCiphertext(); + if (!_tls_ring_ctx->pending_cipher_out.empty()) { + if (FlushTlsCiphertext() < 0) { + LOG(WARNING) << "Socket=" << *this + << " failed to flush TLS handshake after WANT_READ: " + << berror(errno); + return -1; + } + LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data after WANT_READ"; + } break; } else if (ssl_err == SSL_ERROR_WANT_WRITE) { DrainTlsCiphertext(); @@ -824,6 +850,7 @@ int Socket::ContinueTlsHandshake() { << " failed to flush TLS handshake: " << berror(errno); return -1; } + LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data via io_uring"; } else { LOG(WARNING) << "Socket=" << *this << " TLS handshake error: " << SSLError(ERR_get_error()); @@ -838,17 +865,24 @@ ssize_t Socket::ProcessTlsRingData(const char* data, size_t len) { // Feed ciphertext into the SSL read BIO and, if handshake is still in // progress, continue driving it. When fully connected, decrypt plaintext // into _read_buf. + LOG(INFO) << "Socket=" << *this << ", ProcessTlsRingData, len: " << len; if (len > 0) { if (FeedTlsCiphertext(data, len) < 0) { errno = ESSL; return -1; } } + LOG(INFO) << "_ssl_state: " << int(_ssl_state); if (_ssl_state == SSL_CONNECTING) { + LOG(INFO) << "SSL_CONNECTING, ContinueTlsHandshake"; if (ContinueTlsHandshake() != 0) { return -1; } + if (_ssl_state == SSL_CONNECTED) { + LOG(INFO) << "Socket=" << *this << " TLS handshake completed"; + } } + LOG(INFO) << "_ssl_state: " << int(_ssl_state); if (_ssl_state != SSL_CONNECTED) { return 0; } @@ -877,11 +911,13 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { // using TLS. The first handshake record needs at least 6 bytes. _tls_detect_buf.append(data, len); if (_tls_detect_buf.length() < 6) { - return len; + errno = EAGAIN; + return -errno; } char header[6]; _tls_detect_buf.copy_to(header, 6); SSLState s = DetectSSLStateInBuffer(header, 6); + LOG(INFO) << "Socket=" << *this << " DetectSSLStateInBuffer, ssl state: " << int(s); if (s == SSL_CONNECTING) { if (EnsureTlsSessionForRing() != 0) { errno = ESSL; @@ -892,7 +928,14 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { cached.resize(_tls_detect_buf.size()); _tls_detect_buf.copy_to(cached.data(), cached.size()); _tls_detect_buf.clear(); - return ProcessTlsRingData(cached.data(), cached.size()); + LOG(INFO) << "Socket=" << *this << " detected TLS over io_uring (cached=" + << cached.size() << " bytes)"; + ssize_t plain = ProcessTlsRingData(cached.data(), cached.size()); + if (plain == 0) { + errno = EAGAIN; + return -errno; + } + return plain; } else { _ssl_state = SSL_OFF; _read_buf.append(_tls_detect_buf); @@ -902,14 +945,23 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { errno = ESSL; return -1; } + LOG(INFO) << "Socket=" << *this << " treated as non-TLS over io_uring," + << " appended " << plain << " bytes"; return plain; } } if (_ssl_state == SSL_CONNECTING || _ssl_state == SSL_CONNECTED) { // Once we know the connection is TLS, all incoming ciphertext goes // through ProcessTlsRingData. - return ProcessTlsRingData(data, len); + LOG(INFO) << "Socket=" << *this << " processing TLS ring data len=" << len; + ssize_t plain = ProcessTlsRingData(data, len); + if (plain == 0) { + errno = EAGAIN; + return -errno; + } + return plain; } + LOG(INFO) << "Socket=" << *this << " treating ring data as plaintext len=" << len; _read_buf.append(data, len); return len; } @@ -967,10 +1019,12 @@ int Socket::ResetFileDescriptor(int fd, size_t bound_gid) { EnableKeepaliveIfNeeded(fd); + LOG(INFO) << "ResetFileDescriptor" << boost::stacktrace::stacktrace(); if (_on_edge_triggered_events) { #ifdef IO_URING_ENABLED if (FLAGS_use_io_uring && _on_edge_triggered_events == InputMessenger::OnNewMessagesFromRing) { + LOG(INFO) << "io_uring SocketRegister"; bthread_attr_t attr; attr = BTHREAD_ATTR_NORMAL; @@ -990,6 +1044,7 @@ int Socket::ResetFileDescriptor(int fd, size_t bound_gid) { } } else { #endif + LOG(INFO) << "GetGlobalEventDispatcher(fd).AddConsumer"; if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) { PLOG(ERROR) << "Fail to add SocketId=" << id() << " into EventDispatcher"; @@ -2704,8 +2759,9 @@ int Socket::SSLHandshake(int fd, bool server_mode) { // as it may confuse the origin event processing code. while (true) { ERR_clear_error(); - LOG(INFO) << "SSLHandshake: " << *this << ", " << boost::stacktrace::stacktrace(); int rc = SSL_do_handshake(_ssl_session); + LOG(INFO) << "SSLHandshake: " << *this << ", " << boost::stacktrace::stacktrace() + << ", rc: " << rc; if (rc == 1) { _ssl_state = SSL_CONNECTED; #ifdef IO_URING_ENABLED @@ -2722,6 +2778,7 @@ int Socket::SSLHandshake(int fd, bool server_mode) { } int ssl_error = SSL_get_error(_ssl_session, rc); + LOG(INFO) << "ssl_error: " << ssl_error; switch (ssl_error) { case SSL_ERROR_WANT_READ: #if defined(OS_LINUX) @@ -3773,6 +3830,7 @@ void Socket::NotifyWaitingNonFixedWrite(int nw) { } int Socket::CopyDataRead() { + LOG(INFO) << "socket: " << *this << ", CopyDataRead"; bthread::TaskGroup *cur_group = bound_g_; CHECK(static_cast(buf_idx_) < in_bufs_.size()); auto &rbuf = in_bufs_[buf_idx_]; @@ -3781,8 +3839,9 @@ int Socket::CopyDataRead() { if (rbuf.bytes_ > 0) { const char *buf_head = cur_group->GetRingReadBuf(rbuf.buf_id_); #ifdef IO_URING_ENABLED - if (FLAGS_use_io_uring && FLAGS_enable_ssl_io_uring) { + if (FLAGS_use_io_uring && FLAGS_enable_ssl_io_uring && _ssl_state != SSL_OFF) { ssize_t plain = HandleTlsRingRead(buf_head, rbuf.bytes_); + LOG(INFO) << "HandleTlsRingRead, plain: " << plain; if (plain < 0) { ret = -errno; goto END; From e1abadd8a34cee7d448adf509ed05f75031bb3f3 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 19 Jan 2026 18:51:34 +0800 Subject: [PATCH 07/14] EpollThread::start use pthread --- src/bthread/fd.cpp | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp index f26cbd07..968ee5a8 100644 --- a/src/bthread/fd.cpp +++ b/src/bthread/fd.cpp @@ -26,6 +26,8 @@ #include // struct kevent #include // kevent(), kqueue() #endif +#include + #include "butil/atomicops.h" #include "butil/time.h" #include "butil/fd_utility.h" // make_non_blocking @@ -39,6 +41,9 @@ namespace bthread { +DEFINE_bool(use_pthread_epoll_thread, true, + "Use separate pthreads as event dispatcher to do epoll."); + extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; template @@ -136,6 +141,15 @@ class EpollThread { PLOG(FATAL) << "Fail to epoll_create/kqueue"; return -1; } + LOG(INFO) << "Start epoll thread, run_this" << boost::stacktrace::stacktrace(); + + if (FLAGS_use_pthread_epoll_thread) { + LOG(INFO) << "Start pthread epoll thread"; + _thd = std::thread(EpollThread::run_this, this); + _tid = 1; + return 0; + } + LOG(INFO) << "Start bthread epoll thread"; if (bthread_start_background( &_tid, NULL, EpollThread::run_this, this) != 0) { close(_epfd); @@ -185,10 +199,15 @@ class EpollThread { return -1; } - const int rc = bthread_join(_tid, NULL); - if (rc) { - LOG(FATAL) << "Fail to join EpollThread, " << berror(rc); - return -1; + if (FLAGS_use_pthread_epoll_thread) { + _thd.join(); + _tid = 0; + } else { + const int rc = bthread_join(_tid, NULL); + if (rc) { + LOG(FATAL) << "Fail to join EpollThread, " << berror(rc); + return -1; + } } close(closing_epoll_pipe[0]); close(closing_epoll_pipe[1]); @@ -392,6 +411,7 @@ class EpollThread { int _epfd; bool _stop; + std::thread _thd; bthread_t _tid; butil::Mutex _start_mutex; }; From 1999ddbb0b59e40a21980163256d314c122a5912 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 19 Jan 2026 18:51:57 +0800 Subject: [PATCH 08/14] add debug logs --- src/brpc/socket.cpp | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 3e3bd77c..e22aa5d0 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -721,7 +721,7 @@ int Socket::DrainTlsCiphertext() { if (!_tls_ring_ctx || !_tls_ring_ctx->mem_wbio) { return 0; } - char buf[16 * 1024]; + char buf[4 * 1024]; int total = 0; while (BIO_pending(_tls_ring_ctx->mem_wbio) > 0) { int rc = BIO_read(_tls_ring_ctx->mem_wbio, buf, sizeof(buf)); @@ -749,6 +749,7 @@ int Socket::FlushTlsCiphertext() { } bthread::TaskGroup* g = bthread::TaskGroup::VolatileTLSTaskGroup(); int rc = g->SocketWaitingNonFixedWrite(this); + LOG(INFO) << "SocketWaitingNonFixedWrite, return: " << rc; if (rc < 0) { errno = -rc; iovecs_.clear(); @@ -767,6 +768,7 @@ int Socket::FlushTlsCiphertext() { ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { LOG(INFO) << "ConsumeTlsPlaintext, size_hint: " << size_hint; if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio) { + LOG(INFO) << "ConsumeTlsPlaintext, !_tls_ring_ctx || !_tls_ring_ctx->mem_rbio, return 0"; return 0; } ssize_t total = 0; @@ -777,14 +779,18 @@ ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { if (rc <= 0) { int ssl_err = SSL_get_error(_ssl_session, rc); if (ssl_err == SSL_ERROR_WANT_READ) { + LOG(INFO) << "ConsumeTlsPlaintext SSL_read, rc: " << rc + << ", ssl_err: " << int(ssl_err); break; } errno = (ssl_err == SSL_ERROR_ZERO_RETURN) ? ECONNRESET : ESSL; + LOG(INFO) << "ConsumeTlsPlaintext, errno: " << int(errno) << ", return -1"; return -1; } _read_buf.append(buf, rc); total += rc; } + LOG(INFO) << "ConsumeTlsPlaintext, return total: " << total; return total; } @@ -825,11 +831,12 @@ int Socket::ContinueTlsHandshake() { ERR_clear_error(); int rc = SSL_do_handshake(_ssl_session); if (rc == 1) { + LOG(INFO) << "SSL_do_handshake return: " << rc << ", ssl_err: " << int(SSL_get_error(_ssl_session, rc)); _ssl_state = SSL_CONNECTED; break; } const int ssl_err = SSL_get_error(_ssl_session, rc); - LOG(INFO) << "ssl_err: " << int(ssl_err); + LOG(INFO) << "SSL_do_handshake return: " << rc << ", ssl_err: " << int(ssl_err); if (ssl_err == SSL_ERROR_WANT_READ) { LOG(INFO) << "SSL_ERROR_WANT_READ"; DrainTlsCiphertext(); @@ -874,7 +881,7 @@ ssize_t Socket::ProcessTlsRingData(const char* data, size_t len) { } LOG(INFO) << "_ssl_state: " << int(_ssl_state); if (_ssl_state == SSL_CONNECTING) { - LOG(INFO) << "SSL_CONNECTING, ContinueTlsHandshake"; + LOG(INFO) << "ProcessTlsRingData SSL_CONNECTING, ContinueTlsHandshake"; if (ContinueTlsHandshake() != 0) { return -1; } @@ -882,11 +889,13 @@ ssize_t Socket::ProcessTlsRingData(const char* data, size_t len) { LOG(INFO) << "Socket=" << *this << " TLS handshake completed"; } } - LOG(INFO) << "_ssl_state: " << int(_ssl_state); + LOG(INFO) << "ProcessTlsRingData _ssl_state: " << int(_ssl_state); if (_ssl_state != SSL_CONNECTED) { + LOG(INFO) << "ProcessTlsRingData, _ssl_state != SSL_CONNECTED, return 0, _ssl_state: " << int(_ssl_state); return 0; } ssize_t plain = ConsumeTlsPlaintext(len); + LOG(INFO) << "ProcessTlsRingData ConsumeTlsPlaintext result: " << plain; return plain < 0 ? -1 : plain; } @@ -911,6 +920,7 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { // using TLS. The first handshake record needs at least 6 bytes. _tls_detect_buf.append(data, len); if (_tls_detect_buf.length() < 6) { + LOG(INFO) << "HandleTlsRingRead _tls_detect_buf.length() < 6, return eagain"; errno = EAGAIN; return -errno; } @@ -921,6 +931,7 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { if (s == SSL_CONNECTING) { if (EnsureTlsSessionForRing() != 0) { errno = ESSL; + LOG(INFO) << "HandleTlsRingRead, return -1"; return -1; } _ssl_state = SSL_CONNECTING; @@ -932,6 +943,7 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { << cached.size() << " bytes)"; ssize_t plain = ProcessTlsRingData(cached.data(), cached.size()); if (plain == 0) { + LOG(INFO) << "HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; errno = EAGAIN; return -errno; } @@ -956,6 +968,7 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { LOG(INFO) << "Socket=" << *this << " processing TLS ring data len=" << len; ssize_t plain = ProcessTlsRingData(data, len); if (plain == 0) { + LOG(INFO) << "HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; errno = EAGAIN; return -errno; } @@ -2028,6 +2041,7 @@ int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) { void Socket::AfterAppConnected(int err, void* data) { WriteRequest* req = static_cast(data); + LOG(INFO) << "socket: " << *req->socket << ", AfterAppConnected, keep write"; if (err == 0) { Socket* const s = req->socket; SharedPart* sp = s->GetSharedPart(); @@ -2098,9 +2112,12 @@ void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) { CHECK_GE(sockfd, 0); if (err == 0 && s->CheckConnected(sockfd) == 0 && s->ResetFileDescriptor(sockfd) == 0) { + LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite 1"; if (s->_app_connect) { + LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite s->_app_connect->StartConnect"; s->_app_connect->StartConnect(req->socket, AfterAppConnected, req); } else { + LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite AfterAppConnected"; // Successfully created a connection AfterAppConnected(0, req); } @@ -2110,6 +2127,7 @@ void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) { if (err == 0) { err = errno ? errno : -1; } + LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite else AfterAppConnected"; AfterAppConnected(err, req); } } From e68f77e86a0b5084c1ed28ef00efc3f88e9e58d0 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 19 Jan 2026 19:50:24 +0800 Subject: [PATCH 09/14] fix compilation --- src/brpc/socket.cpp | 19 ------------------- src/brpc/socket.h | 29 ++++++++++++++++++++++------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index e22aa5d0..2f49ef4c 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -363,25 +363,6 @@ struct RegisteredRingBuffer { }; #endif -#ifdef IO_URING_ENABLED -struct Socket::TlsRingContext { - TlsRingContext() - : mem_rbio(NULL) - , mem_wbio(NULL) {} - - ~TlsRingContext() { - // Actual BIO lifetime is now owned by SSL after SSL_set_bio. - mem_rbio = NULL; - mem_wbio = NULL; - } - - BIO* mem_rbio; - BIO* mem_wbio; - // Ciphertext drained from memory BIO waiting to be submitted via io_uring. - butil::IOBuf pending_cipher_out; -}; -#endif - struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest { static WriteRequest* const UNCONNECTED; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index a7de64b6..67df1eb1 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -1003,13 +1003,6 @@ friend void DereferenceSocket(Socket*); // Storing data that are not flushed into `fd' yet. butil::atomic _write_head; -#ifdef IO_URING_ENABLED - butil::IOBuf _tls_detect_buf; -#endif -#ifdef IO_URING_ENABLED - std::unique_ptr _tls_ring_ctx; - bool _tls_uses_ring{}; -#endif butil::Mutex _stream_mutex; std::set *_stream_set; @@ -1023,6 +1016,24 @@ friend void DereferenceSocket(Socket*); std::shared_ptr _keepalive_options; // These fields are only for io_uring build. + + struct TlsRingContext { + TlsRingContext() + : mem_rbio(NULL) + , mem_wbio(NULL) {} + + ~TlsRingContext() { + // Actual BIO lifetime is now owned by SSL after SSL_set_bio. + mem_rbio = NULL; + mem_wbio = NULL; + } + + BIO* mem_rbio; + BIO* mem_wbio; + // Ciphertext drained from memory BIO waiting to be submitted via io_uring. + butil::IOBuf pending_cipher_out; + }; + WriteRequest *io_uring_write_req_{nullptr}; std::vector iovecs_; int32_t keep_write_nw_; @@ -1039,6 +1050,10 @@ friend void DereferenceSocket(Socket*); int reg_fd_{-1}; uint16_t recv_num_{0}; + butil::IOBuf _tls_detect_buf; + std::unique_ptr _tls_ring_ctx; + bool _tls_uses_ring{}; + #ifdef IO_URING_ENABLED friend class ::RingListener; #endif From 3b4de7e74ee418bad6f00c8e1a10096f5a59933b Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 20 Jan 2026 15:14:33 +0800 Subject: [PATCH 10/14] support ssl iouring in redis protocol --- src/brpc/policy/redis_protocol.cpp | 3 ++- src/brpc/socket.cpp | 11 +++-------- src/brpc/socket.h | 1 - 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/brpc/policy/redis_protocol.cpp b/src/brpc/policy/redis_protocol.cpp index 2f58b7a7..b9706709 100644 --- a/src/brpc/policy/redis_protocol.cpp +++ b/src/brpc/policy/redis_protocol.cpp @@ -38,6 +38,7 @@ #include "bthread/ring_write_buf_pool.h" DECLARE_bool(use_io_uring); +DECLARE_bool(enable_ssl_io_uring); namespace bthread { extern BAIDU_THREAD_LOCAL TaskGroup *tls_task_group; @@ -173,7 +174,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, #ifdef IO_URING_ENABLED char *ring_buf = nullptr; uint16_t ring_buf_idx = UINT16_MAX; - if (FLAGS_use_io_uring) { + if (FLAGS_use_io_uring && !FLAGS_enable_ssl_io_uring) { std::tie(ring_buf, ring_buf_idx) = cur_group->GetRingWriteBuf(); if (ring_buf_idx != UINT16_MAX) { appender.set_ring_buffer(ring_buf, RingWriteBufferPool::buf_length); diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 2f49ef4c..fe199153 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -537,10 +537,7 @@ Socket::Socket(Forbidden) , _unwritten_bytes(0) , _epollout_butex(NULL) , _write_head(NULL) -#ifdef IO_URING_ENABLED , _tls_ring_ctx() - , _tls_uses_ring(false) -#endif , _stream_set(NULL) , _total_streams_unconsumed_size(0) , _ninflight_app_health_check(0) @@ -641,13 +638,11 @@ int Socket::InitTlsRingContext(int /*fd*/) { LOG(ERROR) << "Fail to allocate TlsRingContext"; return -1; } - _tls_uses_ring = true; return 0; } void Socket::DestroyTlsRingContext() { _tls_ring_ctx.reset(); - _tls_uses_ring = false; } int Socket::AddMemoryBIO(int fd) { @@ -666,7 +661,7 @@ int Socket::AddMemoryBIO(int fd) { if (mem_wbio) { BIO_free(mem_wbio); } - LOG(ERROR) << "Fail to create memory BIO for TLS ring"; + LOG(ERROR) << "Fail to create memory BIO for TLS ring, socket: " << *this; return -1; } { @@ -676,8 +671,7 @@ int Socket::AddMemoryBIO(int fd) { BIO_flush(old_wbio); } SSL_set_bio(_ssl_session, mem_rbio, mem_wbio); - LOG_IF(INFO, FLAGS_enable_ssl_io_uring) - << "Socket=" << *this << " switched SSL BIO to memory (fd=" << fd << ")"; + LOG(INFO) << "Socket=" << *this << " switched SSL BIO to memory (fd=" << fd << ")"; } _tls_ring_ctx->mem_rbio = mem_rbio; _tls_ring_ctx->mem_wbio = mem_wbio; @@ -2430,6 +2424,7 @@ void* Socket::KeepWrite(void* void_arg) { s->ReturnSuccessfulWriteRequest(saved_req); } const ssize_t nw = s->DoWrite(req); + LOG(INFO) << "Socket: " << *s << " KeepWrite DoWrite nw: " << nw; if (nw < 0) { if (errno != EAGAIN && errno != EOVERCROWDED) { const int saved_errno = errno; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 67df1eb1..211077b1 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -1052,7 +1052,6 @@ friend void DereferenceSocket(Socket*); butil::IOBuf _tls_detect_buf; std::unique_ptr _tls_ring_ctx; - bool _tls_uses_ring{}; #ifdef IO_URING_ENABLED friend class ::RingListener; From 0c4bcf9c28c53095e90ac4688f04db9ac737d035 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 20 Jan 2026 17:10:16 +0800 Subject: [PATCH 11/14] handle the error case that ssl_error is SSL_ERROR_SYSCALL and errno is zero --- src/brpc/socket.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index fe199153..cd9958bb 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -1796,6 +1796,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) { // health checker which called `SetFailed' before const int expected_val = _epollout_butex->load(butil::memory_order_relaxed); EventDispatcher& edisp = GetGlobalEventDispatcher(fd); + LOG(INFO) << "AddEpollOut socket: " << *this << boost::stacktrace::stacktrace(); if (edisp.AddEpollOut(id(), fd, pollin) != 0) { return -1; } @@ -1870,6 +1871,7 @@ int Socket::Connect(const timespec* abstime, // Add `sockfd' into epoll so that `HandleEpollOutRequest' will // be called with `req' when epoll event reaches + LOG(INFO) << "AddEpollOut socket: " << *this << boost::stacktrace::stacktrace(); if (GetGlobalEventDispatcher(sockfd). AddEpollOut(connect_id, sockfd, false) != 0) { const int saved_errno = errno; @@ -2659,6 +2661,10 @@ ssize_t Socket::DoWrite(WriteRequest* req) { } else { // System error with corresponding errno set PLOG(WARNING) << "Fail to write into ssl_fd=" << fd(); + if (ssl_error == SSL_ERROR_SYSCALL && errno == 0) { + // Connection is reset by remote. + errno = ECONNRESET; + } } break; } @@ -2882,6 +2888,10 @@ ssize_t Socket::DoRead(size_t size_hint) { } else { // System error with corresponding errno set PLOG(WARNING) << "Fail to read from ssl_fd=" << fd(); + if (ssl_error == SSL_ERROR_SYSCALL && errno == 0) { + // Connection is reset by remote. + errno = ECONNRESET; + } } break; } From b834bd872abfa9251fd943a445687ee2afab248f Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 20 Jan 2026 17:30:51 +0800 Subject: [PATCH 12/14] disable some debug logs --- src/brpc/event_dispatcher_epoll.cpp | 2 -- src/brpc/input_messenger.cpp | 6 +++--- src/bthread/fd.cpp | 6 +++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index 4bd10ccf..e9ee5a96 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -146,7 +146,6 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { errno = EINVAL; return -1; } - LOG(INFO) << "AddEpollOut fd: " << fd << boost::stacktrace::stacktrace(); epoll_event evt; evt.data.u64 = socket_id; @@ -171,7 +170,6 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { int EventDispatcher::RemoveEpollOut(SocketId socket_id, int fd, bool pollin) { - LOG(INFO) << "RemoveEpollOut fd: " << fd << ", pollin: " << pollin << boost::stacktrace::stacktrace(); if (pollin) { epoll_event evt; evt.data.u64 = socket_id; diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index 64572ae7..f21bffff 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -421,12 +421,12 @@ void InputMessenger::OnNewMessagesFromRing(Socket *m) { InputMessageClosure last_msg; bool read_eof = false; while (!read_eof && m->buf_idx_ < m->in_bufs_.size()) { - LOG(INFO) << "m->buf_idx: " << m->buf_idx_ << ", m->in_bufs_.size(): " << m->in_bufs_.size(); + // LOG(INFO) << "m->buf_idx: " << m->buf_idx_ << ", m->in_bufs_.size(): " << m->in_bufs_.size(); const int64_t received_us = butil::cpuwide_time_us(); const int64_t base_realtime = butil::gettimeofday_us() - received_us; const ssize_t nr = m->CopyDataRead(); - LOG(INFO) << "CopyDataRead nr: " << nr << ", read_buf size: " << m->_read_buf.size(); + // LOG(INFO) << "CopyDataRead nr: " << nr << ", read_buf size: " << m->_read_buf.size(); if (nr <= 0) { if (0 == nr) { @@ -583,7 +583,7 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) { { #endif #ifdef IO_URING_ENABLED - LOG(INFO) << "InputMessenger::Create, " << boost::stacktrace::stacktrace(); + // LOG(INFO) << "InputMessenger::Create, " << boost::stacktrace::stacktrace(); options.on_edge_triggered_events = FLAGS_use_io_uring ? OnNewMessagesFromRing : OnNewMessages; #else options.on_edge_triggered_events = OnNewMessages; diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp index 968ee5a8..1f740730 100644 --- a/src/bthread/fd.cpp +++ b/src/bthread/fd.cpp @@ -141,15 +141,15 @@ class EpollThread { PLOG(FATAL) << "Fail to epoll_create/kqueue"; return -1; } - LOG(INFO) << "Start epoll thread, run_this" << boost::stacktrace::stacktrace(); + // LOG(INFO) << "Start epoll thread, run_this" << boost::stacktrace::stacktrace(); if (FLAGS_use_pthread_epoll_thread) { - LOG(INFO) << "Start pthread epoll thread"; + // LOG(INFO) << "Start pthread epoll thread"; _thd = std::thread(EpollThread::run_this, this); _tid = 1; return 0; } - LOG(INFO) << "Start bthread epoll thread"; + // LOG(INFO) << "Start bthread epoll thread"; if (bthread_start_background( &_tid, NULL, EpollThread::run_this, this) != 0) { close(_epfd); From e3529694490e09097f875884b3a5072cd04a8ca6 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 20 Jan 2026 18:38:12 +0800 Subject: [PATCH 13/14] disable some debug logs --- src/brpc/socket.cpp | 118 ++++++++++++++++++++++---------------------- 1 file changed, 59 insertions(+), 59 deletions(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index cd9958bb..192c5419 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -671,7 +671,7 @@ int Socket::AddMemoryBIO(int fd) { BIO_flush(old_wbio); } SSL_set_bio(_ssl_session, mem_rbio, mem_wbio); - LOG(INFO) << "Socket=" << *this << " switched SSL BIO to memory (fd=" << fd << ")"; + // LOG(INFO) << "Socket=" << *this << " switched SSL BIO to memory (fd=" << fd << ")"; } _tls_ring_ctx->mem_rbio = mem_rbio; _tls_ring_ctx->mem_wbio = mem_wbio; @@ -679,13 +679,13 @@ int Socket::AddMemoryBIO(int fd) { } int Socket::FeedTlsCiphertext(const void* data, size_t len) { - LOG(INFO) << "FeedTlsCiphertext, len: " << len; + // LOG(INFO) << "FeedTlsCiphertext, len: " << len; if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio || len == 0) { - LOG(INFO) << "return 0"; + // LOG(INFO) << "return 0"; return 0; } const int nw = BIO_write(_tls_ring_ctx->mem_rbio, data, len); - LOG(INFO) << "nw: " << nw; + // LOG(INFO) << "nw: " << nw; if (nw <= 0) { return -1; } @@ -706,14 +706,14 @@ int Socket::DrainTlsCiphertext() { _tls_ring_ctx->pending_cipher_out.append(buf, rc); total += rc; } - LOG_IF(INFO, total > 0) << "Socket=" << *this << " drained " << total << " bytes TLS ciphertext"; + // LOG_IF(INFO, total > 0) << "Socket=" << *this << " drained " << total << " bytes TLS ciphertext"; return total; } int Socket::FlushTlsCiphertext() { - LOG(INFO) << "FlushTlsCiphertext"; + // LOG(INFO) << "FlushTlsCiphertext"; if (!_tls_ring_ctx) { - LOG(INFO) << "return 0"; + // LOG(INFO) << "return 0"; return 0; } ssize_t total = 0; @@ -724,7 +724,7 @@ int Socket::FlushTlsCiphertext() { } bthread::TaskGroup* g = bthread::TaskGroup::VolatileTLSTaskGroup(); int rc = g->SocketWaitingNonFixedWrite(this); - LOG(INFO) << "SocketWaitingNonFixedWrite, return: " << rc; + // LOG(INFO) << "SocketWaitingNonFixedWrite, return: " << rc; if (rc < 0) { errno = -rc; iovecs_.clear(); @@ -734,16 +734,16 @@ int Socket::FlushTlsCiphertext() { total += rc; _tls_ring_ctx->pending_cipher_out.pop_front(rc); iovecs_.clear(); - LOG(INFO) << "Socket=" << *this << " flushed " << rc << " bytes TLS ciphertext via io_uring"; + // LOG(INFO) << "Socket=" << *this << " flushed " << rc << " bytes TLS ciphertext via io_uring"; } - LOG(INFO) << "return total: " << total; + // LOG(INFO) << "return total: " << total; return total; } ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { - LOG(INFO) << "ConsumeTlsPlaintext, size_hint: " << size_hint; + // LOG(INFO) << "ConsumeTlsPlaintext, size_hint: " << size_hint; if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio) { - LOG(INFO) << "ConsumeTlsPlaintext, !_tls_ring_ctx || !_tls_ring_ctx->mem_rbio, return 0"; + // LOG(INFO) << "ConsumeTlsPlaintext, !_tls_ring_ctx || !_tls_ring_ctx->mem_rbio, return 0"; return 0; } ssize_t total = 0; @@ -754,18 +754,18 @@ ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { if (rc <= 0) { int ssl_err = SSL_get_error(_ssl_session, rc); if (ssl_err == SSL_ERROR_WANT_READ) { - LOG(INFO) << "ConsumeTlsPlaintext SSL_read, rc: " << rc - << ", ssl_err: " << int(ssl_err); + // LOG(INFO) << "ConsumeTlsPlaintext SSL_read, rc: " << rc + // << ", ssl_err: " << int(ssl_err); break; } errno = (ssl_err == SSL_ERROR_ZERO_RETURN) ? ECONNRESET : ESSL; - LOG(INFO) << "ConsumeTlsPlaintext, errno: " << int(errno) << ", return -1"; + // LOG(INFO) << "ConsumeTlsPlaintext, errno: " << int(errno) << ", return -1"; return -1; } _read_buf.append(buf, rc); total += rc; } - LOG(INFO) << "ConsumeTlsPlaintext, return total: " << total; + // LOG(INFO) << "ConsumeTlsPlaintext, return total: " << total; return total; } @@ -778,7 +778,7 @@ int Socket::EnsureTlsSessionForRing() { return -1; } if (_ssl_session == NULL) { - LOG(INFO) << "Socket=" << *this << " creating TLS session"; + // LOG(INFO) << "Socket=" << *this << " creating TLS session"; _ssl_session = CreateSSLSession(_ssl_ctx->raw_ctx, id(), fd(), true); if (_ssl_session == NULL) { LOG(ERROR) << "Socket=" << *this << " failed to create SSL session"; @@ -787,14 +787,14 @@ int Socket::EnsureTlsSessionForRing() { } SSL_set_accept_state(_ssl_session); } else { - LOG(INFO) << "Socket=" << *this << " reusing existing TLS session"; + // LOG(INFO) << "Socket=" << *this << " reusing existing TLS session"; } if (AddMemoryBIO(fd()) != 0) { LOG(ERROR) << "Socket=" << *this << " failed to bind memory BIO"; errno = ESSL; return -1; } - LOG(INFO) << "Socket=" << *this << " initialized TLS session for io_uring"; + // LOG(INFO) << "Socket=" << *this << " initialized TLS session for io_uring"; return 0; } @@ -802,18 +802,18 @@ int Socket::ContinueTlsHandshake() { // Drive SSL handshake until it completes or requires more data. Any // handshake output is drained and submitted via io_uring. while (_ssl_state == SSL_CONNECTING) { - LOG(INFO) << "ContinueTlsHandshake _ssl_state: " << int(_ssl_state); + // LOG(INFO) << "ContinueTlsHandshake _ssl_state: " << int(_ssl_state); ERR_clear_error(); int rc = SSL_do_handshake(_ssl_session); if (rc == 1) { - LOG(INFO) << "SSL_do_handshake return: " << rc << ", ssl_err: " << int(SSL_get_error(_ssl_session, rc)); + // LOG(INFO) << "SSL_do_handshake return: " << rc << ", ssl_err: " << int(SSL_get_error(_ssl_session, rc)); _ssl_state = SSL_CONNECTED; break; } const int ssl_err = SSL_get_error(_ssl_session, rc); - LOG(INFO) << "SSL_do_handshake return: " << rc << ", ssl_err: " << int(ssl_err); + // LOG(INFO) << "SSL_do_handshake return: " << rc << ", ssl_err: " << int(ssl_err); if (ssl_err == SSL_ERROR_WANT_READ) { - LOG(INFO) << "SSL_ERROR_WANT_READ"; + // LOG(INFO) << "SSL_ERROR_WANT_READ"; DrainTlsCiphertext(); if (!_tls_ring_ctx->pending_cipher_out.empty()) { if (FlushTlsCiphertext() < 0) { @@ -822,7 +822,7 @@ int Socket::ContinueTlsHandshake() { << berror(errno); return -1; } - LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data after WANT_READ"; + // LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data after WANT_READ"; } break; } else if (ssl_err == SSL_ERROR_WANT_WRITE) { @@ -832,7 +832,7 @@ int Socket::ContinueTlsHandshake() { << " failed to flush TLS handshake: " << berror(errno); return -1; } - LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data via io_uring"; + // LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data via io_uring"; } else { LOG(WARNING) << "Socket=" << *this << " TLS handshake error: " << SSLError(ERR_get_error()); @@ -847,30 +847,30 @@ ssize_t Socket::ProcessTlsRingData(const char* data, size_t len) { // Feed ciphertext into the SSL read BIO and, if handshake is still in // progress, continue driving it. When fully connected, decrypt plaintext // into _read_buf. - LOG(INFO) << "Socket=" << *this << ", ProcessTlsRingData, len: " << len; + // LOG(INFO) << "Socket=" << *this << ", ProcessTlsRingData, len: " << len; if (len > 0) { if (FeedTlsCiphertext(data, len) < 0) { errno = ESSL; return -1; } } - LOG(INFO) << "_ssl_state: " << int(_ssl_state); + // LOG(INFO) << "_ssl_state: " << int(_ssl_state); if (_ssl_state == SSL_CONNECTING) { - LOG(INFO) << "ProcessTlsRingData SSL_CONNECTING, ContinueTlsHandshake"; + // LOG(INFO) << "ProcessTlsRingData SSL_CONNECTING, ContinueTlsHandshake"; if (ContinueTlsHandshake() != 0) { return -1; } if (_ssl_state == SSL_CONNECTED) { - LOG(INFO) << "Socket=" << *this << " TLS handshake completed"; + // LOG(INFO) << "Socket=" << *this << " TLS handshake completed"; } } - LOG(INFO) << "ProcessTlsRingData _ssl_state: " << int(_ssl_state); + // LOG(INFO) << "ProcessTlsRingData _ssl_state: " << int(_ssl_state); if (_ssl_state != SSL_CONNECTED) { - LOG(INFO) << "ProcessTlsRingData, _ssl_state != SSL_CONNECTED, return 0, _ssl_state: " << int(_ssl_state); + // LOG(INFO) << "ProcessTlsRingData, _ssl_state != SSL_CONNECTED, return 0, _ssl_state: " << int(_ssl_state); return 0; } ssize_t plain = ConsumeTlsPlaintext(len); - LOG(INFO) << "ProcessTlsRingData ConsumeTlsPlaintext result: " << plain; + // LOG(INFO) << "ProcessTlsRingData ConsumeTlsPlaintext result: " << plain; return plain < 0 ? -1 : plain; } @@ -902,7 +902,7 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { char header[6]; _tls_detect_buf.copy_to(header, 6); SSLState s = DetectSSLStateInBuffer(header, 6); - LOG(INFO) << "Socket=" << *this << " DetectSSLStateInBuffer, ssl state: " << int(s); + // LOG(INFO) << "Socket=" << *this << " DetectSSLStateInBuffer, ssl state: " << int(s); if (s == SSL_CONNECTING) { if (EnsureTlsSessionForRing() != 0) { errno = ESSL; @@ -914,11 +914,11 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { cached.resize(_tls_detect_buf.size()); _tls_detect_buf.copy_to(cached.data(), cached.size()); _tls_detect_buf.clear(); - LOG(INFO) << "Socket=" << *this << " detected TLS over io_uring (cached=" - << cached.size() << " bytes)"; + // LOG(INFO) << "Socket=" << *this << " detected TLS over io_uring (cached=" + // << cached.size() << " bytes)"; ssize_t plain = ProcessTlsRingData(cached.data(), cached.size()); if (plain == 0) { - LOG(INFO) << "HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; + // LOG(INFO) << "HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; errno = EAGAIN; return -errno; } @@ -932,24 +932,24 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { errno = ESSL; return -1; } - LOG(INFO) << "Socket=" << *this << " treated as non-TLS over io_uring," - << " appended " << plain << " bytes"; + // LOG(INFO) << "Socket=" << *this << " treated as non-TLS over io_uring," + // << " appended " << plain << " bytes"; return plain; } } if (_ssl_state == SSL_CONNECTING || _ssl_state == SSL_CONNECTED) { // Once we know the connection is TLS, all incoming ciphertext goes // through ProcessTlsRingData. - LOG(INFO) << "Socket=" << *this << " processing TLS ring data len=" << len; + // LOG(INFO) << "Socket=" << *this << " processing TLS ring data len=" << len; ssize_t plain = ProcessTlsRingData(data, len); if (plain == 0) { - LOG(INFO) << "HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; + // LOG(INFO) << "HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; errno = EAGAIN; return -errno; } return plain; } - LOG(INFO) << "Socket=" << *this << " treating ring data as plaintext len=" << len; + // LOG(INFO) << "Socket=" << *this << " treating ring data as plaintext len=" << len; _read_buf.append(data, len); return len; } @@ -1007,12 +1007,12 @@ int Socket::ResetFileDescriptor(int fd, size_t bound_gid) { EnableKeepaliveIfNeeded(fd); - LOG(INFO) << "ResetFileDescriptor" << boost::stacktrace::stacktrace(); + // LOG(INFO) << "ResetFileDescriptor" << boost::stacktrace::stacktrace(); if (_on_edge_triggered_events) { #ifdef IO_URING_ENABLED if (FLAGS_use_io_uring && _on_edge_triggered_events == InputMessenger::OnNewMessagesFromRing) { - LOG(INFO) << "io_uring SocketRegister"; + // LOG(INFO) << "io_uring SocketRegister"; bthread_attr_t attr; attr = BTHREAD_ATTR_NORMAL; @@ -1032,7 +1032,7 @@ int Socket::ResetFileDescriptor(int fd, size_t bound_gid) { } } else { #endif - LOG(INFO) << "GetGlobalEventDispatcher(fd).AddConsumer"; + // LOG(INFO) << "GetGlobalEventDispatcher(fd).AddConsumer"; if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) { PLOG(ERROR) << "Fail to add SocketId=" << id() << " into EventDispatcher"; @@ -1796,7 +1796,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) { // health checker which called `SetFailed' before const int expected_val = _epollout_butex->load(butil::memory_order_relaxed); EventDispatcher& edisp = GetGlobalEventDispatcher(fd); - LOG(INFO) << "AddEpollOut socket: " << *this << boost::stacktrace::stacktrace(); + // LOG(INFO) << "AddEpollOut socket: " << *this << boost::stacktrace::stacktrace(); if (edisp.AddEpollOut(id(), fd, pollin) != 0) { return -1; } @@ -1871,7 +1871,7 @@ int Socket::Connect(const timespec* abstime, // Add `sockfd' into epoll so that `HandleEpollOutRequest' will // be called with `req' when epoll event reaches - LOG(INFO) << "AddEpollOut socket: " << *this << boost::stacktrace::stacktrace(); + // LOG(INFO) << "AddEpollOut socket: " << *this << boost::stacktrace::stacktrace(); if (GetGlobalEventDispatcher(sockfd). AddEpollOut(connect_id, sockfd, false) != 0) { const int saved_errno = errno; @@ -2018,7 +2018,7 @@ int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) { void Socket::AfterAppConnected(int err, void* data) { WriteRequest* req = static_cast(data); - LOG(INFO) << "socket: " << *req->socket << ", AfterAppConnected, keep write"; + // LOG(INFO) << "socket: " << *req->socket << ", AfterAppConnected, keep write"; if (err == 0) { Socket* const s = req->socket; SharedPart* sp = s->GetSharedPart(); @@ -2089,12 +2089,12 @@ void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) { CHECK_GE(sockfd, 0); if (err == 0 && s->CheckConnected(sockfd) == 0 && s->ResetFileDescriptor(sockfd) == 0) { - LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite 1"; + // LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite 1"; if (s->_app_connect) { - LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite s->_app_connect->StartConnect"; + // LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite s->_app_connect->StartConnect"; s->_app_connect->StartConnect(req->socket, AfterAppConnected, req); } else { - LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite AfterAppConnected"; + // LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite AfterAppConnected"; // Successfully created a connection AfterAppConnected(0, req); } @@ -2104,7 +2104,7 @@ void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) { if (err == 0) { err = errno ? errno : -1; } - LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite else AfterAppConnected"; + // LOG(INFO) << "socket: " << *s << ", CheckConnectedAndKeepWrite else AfterAppConnected"; AfterAppConnected(err, req); } } @@ -2426,7 +2426,7 @@ void* Socket::KeepWrite(void* void_arg) { s->ReturnSuccessfulWriteRequest(saved_req); } const ssize_t nw = s->DoWrite(req); - LOG(INFO) << "Socket: " << *s << " KeepWrite DoWrite nw: " << nw; + // LOG(INFO) << "Socket: " << *s << " KeepWrite DoWrite nw: " << nw; if (nw < 0) { if (errno != EAGAIN && errno != EOVERCROWDED) { const int saved_errno = errno; @@ -2699,7 +2699,7 @@ ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], siz << berror(errno); return -1; } - LOG(INFO) << "Socket=" << *this << " TLS WANT_WRITE resolved, retrying"; + // LOG(INFO) << "Socket=" << *this << " TLS WANT_WRITE resolved, retrying"; continue; } @@ -2720,7 +2720,7 @@ ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], siz << berror(errno); return -1; } - LOG(INFO) << "Socket=" << *this << " wrote TLS plaintext=" << total_plain; + // LOG(INFO) << "Socket=" << *this << " wrote TLS plaintext=" << total_plain; return total_plain; } @@ -2760,8 +2760,8 @@ int Socket::SSLHandshake(int fd, bool server_mode) { while (true) { ERR_clear_error(); int rc = SSL_do_handshake(_ssl_session); - LOG(INFO) << "SSLHandshake: " << *this << ", " << boost::stacktrace::stacktrace() - << ", rc: " << rc; + // LOG(INFO) << "SSLHandshake: " << *this << ", " << boost::stacktrace::stacktrace() + // << ", rc: " << rc; if (rc == 1) { _ssl_state = SSL_CONNECTED; #ifdef IO_URING_ENABLED @@ -2778,7 +2778,7 @@ int Socket::SSLHandshake(int fd, bool server_mode) { } int ssl_error = SSL_get_error(_ssl_session, rc); - LOG(INFO) << "ssl_error: " << ssl_error; + // LOG(INFO) << "ssl_error: " << ssl_error; switch (ssl_error) { case SSL_ERROR_WANT_READ: #if defined(OS_LINUX) @@ -3834,7 +3834,7 @@ void Socket::NotifyWaitingNonFixedWrite(int nw) { } int Socket::CopyDataRead() { - LOG(INFO) << "socket: " << *this << ", CopyDataRead"; + // LOG(INFO) << "socket: " << *this << ", CopyDataRead"; bthread::TaskGroup *cur_group = bound_g_; CHECK(static_cast(buf_idx_) < in_bufs_.size()); auto &rbuf = in_bufs_[buf_idx_]; @@ -3845,7 +3845,7 @@ int Socket::CopyDataRead() { #ifdef IO_URING_ENABLED if (FLAGS_use_io_uring && FLAGS_enable_ssl_io_uring && _ssl_state != SSL_OFF) { ssize_t plain = HandleTlsRingRead(buf_head, rbuf.bytes_); - LOG(INFO) << "HandleTlsRingRead, plain: " << plain; + // LOG(INFO) << "HandleTlsRingRead, plain: " << plain; if (plain < 0) { ret = -errno; goto END; From bc3c3c7e16ad78dfd6b17607032786e0c68c8b5f Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 5 Feb 2026 15:16:17 +0800 Subject: [PATCH 14/14] debugging --- example/echo_c++/CMakeLists.txt | 12 ++ example/echo_c++/benchmark.cpp | 137 +++++++++++++++++ example/echo_c++/cert.pem | 26 ++++ example/echo_c++/client.cpp | 4 + example/echo_c++/key.pem | 27 ++++ example/echo_c++/server.cpp | 10 ++ src/brpc/socket.cpp | 254 +++++++++++++++++++++++--------- src/brpc/socket.h | 5 +- 8 files changed, 406 insertions(+), 69 deletions(-) create mode 100644 example/echo_c++/benchmark.cpp create mode 100644 example/echo_c++/cert.pem create mode 100644 example/echo_c++/key.pem diff --git a/example/echo_c++/CMakeLists.txt b/example/echo_c++/CMakeLists.txt index badd49e1..5d309257 100644 --- a/example/echo_c++/CMakeLists.txt +++ b/example/echo_c++/CMakeLists.txt @@ -118,6 +118,16 @@ set(DYNAMIC_LIB dl ) +find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h) +find_library(GLOG_LIB NAMES glog VERSION ">=0.6.0" REQUIRED) + +if((NOT GLOG_INCLUDE_PATH) OR(NOT GLOG_LIB)) + message(FATAL_ERROR "Fail to find glog") +endif() + +include_directories(${GLOG_INCLUDE_PATH}) +set(DYNAMIC_LIB ${DYNAMIC_LIB} ${GLOG_LIB}) + if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") set(DYNAMIC_LIB ${DYNAMIC_LIB} pthread @@ -134,6 +144,8 @@ endif() add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) +add_executable(benchmark_echo benchmark.cpp ${PROTO_SRC} ${PROTO_HEADER}) target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB}) target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(benchmark_echo ${BRPC_LIB} ${DYNAMIC_LIB}) diff --git a/example/echo_c++/benchmark.cpp b/example/echo_c++/benchmark.cpp new file mode 100644 index 00000000..80fb828e --- /dev/null +++ b/example/echo_c++/benchmark.cpp @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Benchmark echo-server by multiple threads. + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include "echo.pb.h" + +DEFINE_string(message, "hello world", "Message body sent to server"); +DEFINE_string(attachment, "", "Carry this along with requests"); +DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server, "0.0.0.0:8000", "IP Address of server"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(thread_num, 50, "Number of threads to send requests"); +DEFINE_bool(use_bthread, false, "Use bthread to send requests"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_bool(ssl, true, "Enable ssl"); +DEFINE_bool(dont_fail, false, "Print fatal when some call failed"); +DEFINE_int32(dummy_port, -1, "Launch dummy server at this port"); + +bvar::LatencyRecorder g_latency_recorder("echo_client"); + +static void* sender(void* arg) { + brpc::Channel* channel = static_cast(arg); + example::EchoService_Stub stub(channel); + + while (!brpc::IsAskedToQuit()) { + example::EchoRequest request; + example::EchoResponse response; + brpc::Controller cntl; + + request.set_message(FLAGS_message); + cntl.set_timeout_ms(FLAGS_timeout_ms); + cntl.set_max_retry(FLAGS_max_retry); + cntl.request_attachment().append(FLAGS_attachment); + + stub.Echo(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + g_latency_recorder << cntl.latency_us(); + LOG(INFO) << "Echo response: " << response.message(); + } else { + LOG(WARNING) << "Fail to echo: " << cntl.ErrorText() + << " latency=" << cntl.latency_us(); + CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail) + << "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us(); + bthread_usleep(100000); + } + } + return NULL; +} + +int main(int argc, char* argv[]) { + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + brpc::Channel channel; + brpc::ChannelOptions options; + options.protocol = FLAGS_protocol; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms; + options.max_retry = FLAGS_max_retry; + if (FLAGS_ssl) { + options.mutable_ssl_options(); + } + + if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + std::vector bids; + std::vector pids; + if (!FLAGS_use_bthread) { + pids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (pthread_create(&pids[i], NULL, sender, &channel) != 0) { + LOG(ERROR) << "Fail to create pthread"; + return -1; + } + } + } else { + bids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (bthread_start_background(&bids[i], NULL, sender, &channel) != 0) { + LOG(ERROR) << "Fail to create bthread"; + return -1; + } + } + } + + if (FLAGS_dummy_port >= 0) { + brpc::StartDummyServerAt(FLAGS_dummy_port); + } + + while (!brpc::IsAskedToQuit()) { + sleep(1); + LOG(INFO) << "Sending echo requests at qps=" << g_latency_recorder.qps(1) + << " latency=" << g_latency_recorder.latency(1); + } + + LOG(INFO) << "Echo benchmark is going to quit"; + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (!FLAGS_use_bthread) { + pthread_join(pids[i], NULL); + } else { + bthread_join(bids[i], NULL); + } + } + + return 0; +} diff --git a/example/echo_c++/cert.pem b/example/echo_c++/cert.pem new file mode 100644 index 00000000..28bcc21e --- /dev/null +++ b/example/echo_c++/cert.pem @@ -0,0 +1,26 @@ +-----BEGIN CERTIFICATE----- +MIIEUTCCAzmgAwIBAgIBADANBgkqhkiG9w0BAQQFADB9MQswCQYDVQQGEwJDTjER +MA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5naGFpMQ4wDAYDVQQKEwVC +YWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQxHDAaBgkqhkiG9w0BCQEW +DXNhdEBiYWlkdS5jb20wHhcNMTUwNzE2MDMxOTUxWhcNMTgwNTA1MDMxOTUxWjB9 +MQswCQYDVQQGEwJDTjERMA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5n +aGFpMQ4wDAYDVQQKEwVCYWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQx +HDAaBgkqhkiG9w0BCQEWDXNhdEBiYWlkdS5jb20wggEiMA0GCSqGSIb3DQEBAQUA +A4IBDwAwggEKAoIBAQCqdyAeHY39tqY1RYVbfpqZjZlJDtZb04znxjgQrX+mKmLb +mwvXgJojlfn2Qcgp4NKYFqDFb9tU/Gbb436dRvkHyWOz0RPMspR0TTRU1NIY8wRy +0A1LOCgLHsbRJHqktGjylejALdgsspFWyDY9bEfb4oWsnKGzJqcvIDXrPmMOOY4o +pbA9SufSzwRZN7Yzc5jAedpaF9SK78RQXtvV0+JfCUwBsBWPKevRFFUrN7rQBYjP +cgV/HgDuquPrqnESVSYyfEBKZba6cmNb+xzO3cB1brPTtobSXh+0o/0CtRA+2m63 +ODexxCLntgkPm42IYCJLM15xTatcfVX/3LHQ31DrAgMBAAGjgdswgdgwHQYDVR0O +BBYEFGcd7lA//bSAoSC/NbWRx/H+O1zpMIGoBgNVHSMEgaAwgZ2AFGcd7lA//bSA +oSC/NbWRx/H+O1zpoYGBpH8wfTELMAkGA1UEBhMCQ04xETAPBgNVBAgTCFNoYW5n +aGFpMREwDwYDVQQHEwhTaGFuZ2hhaTEOMAwGA1UEChMFQmFpZHUxDDAKBgNVBAsT +A0lORjEMMAoGA1UEAxMDU0FUMRwwGgYJKoZIhvcNAQkBFg1zYXRAYmFpZHUuY29t +ggEAMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEEBQADggEBAKfoCn8SpLk3uQyT +X+oygcRWfTeJtN3D5J69NCMJ7wB+QPfpEBPwiqMgdbp4bRJ98H7x5UQsHT+EDOT/ +9OmipomHInFY4W1ew11zNKwuENeRrnZwTcCiVLZsxZsAU41ZeI5Yq+2WdtxnePCR +VL1/NjKOq+WoRdb2nLSNDWgYMkLRVlt32hyzryyrBbmaxUl8BxnPqUiWduMwsZUz +HNpXkoa1xTSd+En1SHYWfMg8BOVuV0I0/fjUUG9AXVqYpuogfbjAvibVNWAmxOfo +fOjCPCGoJC1ET3AxYkgXGwioobz0pK/13k2pV+wu7W4g+6iTfz+hwZbPsUk2a/5I +f6vXFB0= +-----END CERTIFICATE----- diff --git a/example/echo_c++/client.cpp b/example/echo_c++/client.cpp index 337aa6e9..1f39b9b0 100644 --- a/example/echo_c++/client.cpp +++ b/example/echo_c++/client.cpp @@ -31,6 +31,7 @@ DEFINE_string(load_balancer, "", "The algorithm for load balancing"); DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests"); +DEFINE_bool(ssl, true, "Enable ssl"); int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. @@ -46,6 +47,9 @@ int main(int argc, char* argv[]) { options.connection_type = FLAGS_connection_type; options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; options.max_retry = FLAGS_max_retry; + if (FLAGS_ssl) { + options.mutable_ssl_options(); + } if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) { LOG(ERROR) << "Fail to initialize channel"; return -1; diff --git a/example/echo_c++/key.pem b/example/echo_c++/key.pem new file mode 100644 index 00000000..e3f64d1e --- /dev/null +++ b/example/echo_c++/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAqncgHh2N/bamNUWFW36amY2ZSQ7WW9OM58Y4EK1/pipi25sL +14CaI5X59kHIKeDSmBagxW/bVPxm2+N+nUb5B8ljs9ETzLKUdE00VNTSGPMEctAN +SzgoCx7G0SR6pLRo8pXowC3YLLKRVsg2PWxH2+KFrJyhsyanLyA16z5jDjmOKKWw +PUrn0s8EWTe2M3OYwHnaWhfUiu/EUF7b1dPiXwlMAbAVjynr0RRVKze60AWIz3IF +fx4A7qrj66pxElUmMnxASmW2unJjW/sczt3AdW6z07aG0l4ftKP9ArUQPtputzg3 +scQi57YJD5uNiGAiSzNecU2rXH1V/9yx0N9Q6wIDAQABAoIBADN3khflnnhKzDXr +To9IU08nRG+dbjT9U16rJ0RJze+SfpSFZHblWiSCZJzoUZHrUkofEt1pn1QyfK/J +KPI9enTSZirlZk/4XwAaS0GNm/1yahZsIIdkZhqtaSO+GtVdrw4HGuXjMZCVPXJx +MocrCSsnYmqyQ9P+SJ3e4Mis5mVllwDiUVlnTIamSSt16qkPdamLSJrxvI4LirQK +9MZWNLoDFpRU1MJxQ/QzrEC3ONTq4j++AfbGzYTmDDtLeM8OSH5o72YXZ2JkaA4c +xCzHFT+NaJYxF7esn/ctzGg50LYl8IF2UQtzOkX2l3l/OktIB1w+jGV6ONb1EWx5 +4zkkzNkCgYEA2EXj7GMsyNE3OYdMw8zrqQKUMON2CNnD+mBseGlr22/bhXtzpqK8 +uNel8WF1ezOnVvNsU8pml/W/mKUu6KQt5JfaDzen3OKjzTABVlbJxwFhPvwAeaIA +q/tmSKyqiCgOMbR7Cq4UEwGf2A9/RII4JEC0/aipRU5srF65OYPUOJcCgYEAycco +DFVG6jUw9w68t/X4f7NT4IYP96hSAqLUPuVz2fWwXKLWEX8JiMI+Ue3PbMz6mPcs +4vMu364u4R3IuzrrI+PRK9iTa/pahBP6eF6ZpbY1ObI8CVLTrqUS9p22rr9lBm8V +EZA9hwcHLYt+PWzaKcsFpbP4+AeY7nBBbL9CAM0CgYAzuJsmeB1ItUgIuQOxu7sM +AzLfcjZTLYkBwreOIGAL7XdJN9nTmw2ZAvGLhWwsF5FIaRSaAUiBxOKaJb7PIhxb +k7kxdHTvjT/xHS7ksAK3VewkvO18KTMR7iBq9ugdgb7LQkc+qZzhYr0QVbxw7Ndy +TAs8sm4wxe2VV13ilFVXZwKBgDfU6ZnwBr1Llo7l/wYQA4CiSDU6IzTt2DNuhrgY +mWPX/cLEM+OHeUXkKYZV/S0n0rd8vWjWzUOLWOFlcmOMPAAkS36MYM5h6aXeOVIR +KwaVUkjyrnYN+xC6EHM41JGp1/RdzECd3sh8A1pw3K92bS9fQ+LD18IZqBFh8lh6 +23KJAoGAe48SwAsaGvqRO61Taww/Wf+YpGc9lnVbCvNFGScYaycPMqaRBUBmz/U3 +QQgpQY8T7JIECbA8sf78SlAZ9x93r0UQ70RekV3WzKAQHfHK8nqTjd3T0+i4aySO +yQpYYCgE24zYO6rQgwrhzI0S4rWe7izDDlg0RmLtQh7Xw+rlkAQ= +-----END RSA PRIVATE KEY----- diff --git a/example/echo_c++/server.cpp b/example/echo_c++/server.cpp index 08b3e9d6..8711383b 100644 --- a/example/echo_c++/server.cpp +++ b/example/echo_c++/server.cpp @@ -28,6 +28,10 @@ DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS." " If this is set, the flag port will be ignored"); DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " "read/write operations during the last `idle_timeout_s'"); +DEFINE_bool(ssl, true, "Enable TLS server"); +DEFINE_string(certificate, "../cert.pem", "Certificate file path to enable SSL"); +DEFINE_string(private_key, "../key.pem", "Private key file path to enable SSL"); +DEFINE_string(ciphers, "", "Cipher suite used for SSL connections"); // Your implementation of example::EchoService // Notice that implementing brpc::Describable grants the ability to put @@ -104,6 +108,12 @@ int main(int argc, char* argv[]) { // Start the server. brpc::ServerOptions options; options.idle_timeout_sec = FLAGS_idle_timeout_s; + if (FLAGS_ssl) { + options.mutable_ssl_options()->default_cert.certificate = FLAGS_certificate; + options.mutable_ssl_options()->default_cert.private_key = FLAGS_private_key; + options.mutable_ssl_options()->ciphers = FLAGS_ciphers; + } + options.num_threads = 1; if (server.Start(point, &options) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 192c5419..808c8f97 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -61,6 +61,11 @@ #include "bthread/ring_listener.h" #endif +#include +#include +#include + + DEFINE_bool(dispatch_lazily, false, "dispatcher lazily creates task"); #ifdef IO_URING_ENABLED DEFINE_bool(use_io_uring, false, "Use IO URING to do the polling."); @@ -646,6 +651,7 @@ void Socket::DestroyTlsRingContext() { } int Socket::AddMemoryBIO(int fd) { + LOG(INFO) << "Socket: " << *this << " AddMemoryBIO fd: " << fd; if (!FLAGS_use_io_uring || !FLAGS_enable_ssl_io_uring) { return 0; } @@ -665,27 +671,65 @@ int Socket::AddMemoryBIO(int fd) { return -1; } { - BAIDU_SCOPED_LOCK(_ssl_session_mutex); + // BAIDU_SCOPED_LOCK(_ssl_session_mutex); BIO* old_wbio = SSL_get_wbio(_ssl_session); if (old_wbio) { BIO_flush(old_wbio); } + // Actual BIO lifetime is now owned by SSL after SSL_set_bio. SSL_set_bio(_ssl_session, mem_rbio, mem_wbio); - // LOG(INFO) << "Socket=" << *this << " switched SSL BIO to memory (fd=" << fd << ")"; + LOG(INFO) << "Socket=" << *this << " switched SSL BIO to memory (fd=" << fd << ")"; } _tls_ring_ctx->mem_rbio = mem_rbio; _tls_ring_ctx->mem_wbio = mem_wbio; return 0; } +static std::string HexDump16(const std::string& s) { + return ""; + std::ostringstream oss; + oss << "len=" << s.size() << "\n"; + + for (size_t i = 0; i < s.size(); i += 16) { + oss << std::setw(6) << std::setfill('0') << i << ": "; + + // hex + oss << std::hex << std::setfill('0'); + for (size_t j = 0; j < 16; j++) { + if (i + j < s.size()) { + unsigned char c = (unsigned char)s[i + j]; + oss << std::setw(2) << (int)c << " "; + } else { + oss << " "; + } + } + + // ascii + oss << " |"; + for (size_t j = 0; j < 16 && i + j < s.size(); j++) { + unsigned char c = (unsigned char)s[i + j]; + oss << (c >= 32 && c <= 126 ? (char)c : '.'); + } + oss << "|\n"; + + oss << std::dec; // 恢复 + } + + return oss.str(); +} + + + int Socket::FeedTlsCiphertext(const void* data, size_t len) { - // LOG(INFO) << "FeedTlsCiphertext, len: " << len; + LOG(INFO) << "Socket: " << *this << " FeedTlsCiphertext, len: " << len; + std::string cipher((char *)data, len); + LOG(INFO) << "Socket: " << *this << " FeedTlsCiphertext, content: " << HexDump16(cipher); if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio || len == 0) { // LOG(INFO) << "return 0"; return 0; } const int nw = BIO_write(_tls_ring_ctx->mem_rbio, data, len); - // LOG(INFO) << "nw: " << nw; + LOG(INFO) << "Socket: " << *this << " FeedTlsCiphertext, BIO_write nw: " << nw; if (nw <= 0) { return -1; } @@ -706,25 +750,30 @@ int Socket::DrainTlsCiphertext() { _tls_ring_ctx->pending_cipher_out.append(buf, rc); total += rc; } - // LOG_IF(INFO, total > 0) << "Socket=" << *this << " drained " << total << " bytes TLS ciphertext"; + LOG(INFO) << "Socket=" << *this << " DrainTlsCiphertext drained " << total << " bytes TLS ciphertext"; return total; } + int Socket::FlushTlsCiphertext() { - // LOG(INFO) << "FlushTlsCiphertext"; + LOG(INFO) << "Socket: " << *this << " FlushTlsCiphertext"; if (!_tls_ring_ctx) { // LOG(INFO) << "return 0"; return 0; } ssize_t total = 0; while (!_tls_ring_ctx->pending_cipher_out.empty()) { + LOG(INFO) << "before cut, iovecs_ size: " << iovecs_.size(); _tls_ring_ctx->pending_cipher_out.cut_into_iovecs(&iovecs_); + LOG(INFO) << "after cut, iovecs_ size: " << iovecs_.size(); + LOG(INFO) << "Socket: " << *this << " FlushTlsCiphertext pending_cipher_out, data: " + << HexDump16(_tls_ring_ctx->pending_cipher_out.to_string()); if (iovecs_.empty()) { break; } bthread::TaskGroup* g = bthread::TaskGroup::VolatileTLSTaskGroup(); int rc = g->SocketWaitingNonFixedWrite(this); - // LOG(INFO) << "SocketWaitingNonFixedWrite, return: " << rc; + LOG(INFO) << "Socket: " << *this << " SocketWaitingNonFixedWrite, return: " << rc; if (rc < 0) { errno = -rc; iovecs_.clear(); @@ -736,36 +785,51 @@ int Socket::FlushTlsCiphertext() { iovecs_.clear(); // LOG(INFO) << "Socket=" << *this << " flushed " << rc << " bytes TLS ciphertext via io_uring"; } - // LOG(INFO) << "return total: " << total; + LOG(INFO) << "Socket: " << *this << " SocketWaitingNonFixedWrite return total: " << total; return total; } -ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { - // LOG(INFO) << "ConsumeTlsPlaintext, size_hint: " << size_hint; +// ssize_t Socket::ConsumeTlsPlaintext(size_t size_hint) { +ssize_t Socket::ConsumeTlsPlaintext() { + // LOG(INFO) << "Socket: " << *this << " ConsumeTlsPlaintext, size_hint: " << size_hint; + LOG(INFO) << "Socket: " << *this << " ConsumeTlsPlaintext..."; if (!_tls_ring_ctx || !_tls_ring_ctx->mem_rbio) { - // LOG(INFO) << "ConsumeTlsPlaintext, !_tls_ring_ctx || !_tls_ring_ctx->mem_rbio, return 0"; + LOG(INFO) << "ConsumeTlsPlaintext, !_tls_ring_ctx || !_tls_ring_ctx->mem_rbio, return 0"; return 0; } ssize_t total = 0; - while (total < (ssize_t)size_hint) { + // while (total < (ssize_t)size_hint) { + // size_t pending = BIO_ctrl_pending(_tls_ring_ctx->mem_rbio); + while (true) { char buf[4 * 1024]; ERR_clear_error(); - int rc = SSL_read(_ssl_session, buf, std::min(sizeof(buf), size_hint - total)); + size_t remaining = BIO_ctrl_pending(_tls_ring_ctx->mem_rbio); + LOG(INFO) << "Before SSL_read, BIO mem_rbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_rbio) + << ", mem_wbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_wbio) << ", n: " << std::min(sizeof(buf), remaining); + + // int rc = SSL_read(_ssl_session, buf, std::min(sizeof(buf), size_hint - total)); + int rc = SSL_read(_ssl_session, buf, std::min(sizeof(buf), remaining)); + LOG(INFO) << "After SSL_read, BIO mem_rbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_rbio) + << ", mem_wbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_wbio); + LOG(INFO) << "Socket: " << *this << ", SSL_read, return rc: " << rc; + // TODO(zkl): if rc == 0, return EOF? if (rc <= 0) { int ssl_err = SSL_get_error(_ssl_session, rc); if (ssl_err == SSL_ERROR_WANT_READ) { - // LOG(INFO) << "ConsumeTlsPlaintext SSL_read, rc: " << rc - // << ", ssl_err: " << int(ssl_err); + LOG(INFO) << "Socket: " << *this << " ConsumeTlsPlaintext SSL_read, rc: " << rc + << ", ssl_err: " << int(ssl_err); break; } errno = (ssl_err == SSL_ERROR_ZERO_RETURN) ? ECONNRESET : ESSL; - // LOG(INFO) << "ConsumeTlsPlaintext, errno: " << int(errno) << ", return -1"; + LOG(INFO) << "Socket: " << *this << " ConsumeTlsPlaintext, errno: " << int(errno) + << " ssl_err: " << int(ssl_err) << ", " << SSLError(ERR_get_error()) << ", return -1"; return -1; } _read_buf.append(buf, rc); total += rc; + LOG(INFO) << "total: " << total; } - // LOG(INFO) << "ConsumeTlsPlaintext, return total: " << total; + LOG(INFO) << "Socket: " << *this << " ConsumeTlsPlaintext, return total: " << total; return total; } @@ -778,7 +842,7 @@ int Socket::EnsureTlsSessionForRing() { return -1; } if (_ssl_session == NULL) { - // LOG(INFO) << "Socket=" << *this << " creating TLS session"; + LOG(INFO) << "Socket=" << *this << " creating SSL session"; _ssl_session = CreateSSLSession(_ssl_ctx->raw_ctx, id(), fd(), true); if (_ssl_session == NULL) { LOG(ERROR) << "Socket=" << *this << " failed to create SSL session"; @@ -786,14 +850,19 @@ int Socket::EnsureTlsSessionForRing() { return -1; } SSL_set_accept_state(_ssl_session); + if (AddMemoryBIO(fd()) != 0) { + LOG(ERROR) << "Socket=" << *this << " failed to bind memory BIO"; + errno = ESSL; + return -1; + } } else { // LOG(INFO) << "Socket=" << *this << " reusing existing TLS session"; } - if (AddMemoryBIO(fd()) != 0) { - LOG(ERROR) << "Socket=" << *this << " failed to bind memory BIO"; - errno = ESSL; - return -1; - } + // if (AddMemoryBIO(fd()) != 0) { + // LOG(ERROR) << "Socket=" << *this << " failed to bind memory BIO"; + // errno = ESSL; + // return -1; + // } // LOG(INFO) << "Socket=" << *this << " initialized TLS session for io_uring"; return 0; } @@ -802,37 +871,60 @@ int Socket::ContinueTlsHandshake() { // Drive SSL handshake until it completes or requires more data. Any // handshake output is drained and submitted via io_uring. while (_ssl_state == SSL_CONNECTING) { - // LOG(INFO) << "ContinueTlsHandshake _ssl_state: " << int(_ssl_state); + LOG(INFO) << "Socket: " << *this << " ContinueTlsHandshake _ssl_state: " << int(_ssl_state); ERR_clear_error(); + LOG(INFO) << "Before SSL_do_handshake, BIO mem_rbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_rbio) + << ", mem_wbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_wbio); int rc = SSL_do_handshake(_ssl_session); + LOG(INFO) << "After SSL_do_handshake, BIO mem_rbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_rbio) + << ", mem_wbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_wbio); if (rc == 1) { - // LOG(INFO) << "SSL_do_handshake return: " << rc << ", ssl_err: " << int(SSL_get_error(_ssl_session, rc)); + LOG(INFO) << "🌟🌟🌟Socket: " << *this << " SSL_do_handshake return: " << rc + << ", ssl_err: " << int(SSL_get_error(_ssl_session, rc)); _ssl_state = SSL_CONNECTED; + DrainTlsCiphertext(); + if (!_tls_ring_ctx->pending_cipher_out.empty()) { + LOG(INFO) << "Socket: " << *this << " FlushTlsCiphertext after SSL connected"; + FlushTlsCiphertext(); + } break; } const int ssl_err = SSL_get_error(_ssl_session, rc); - // LOG(INFO) << "SSL_do_handshake return: " << rc << ", ssl_err: " << int(ssl_err); + LOG(INFO) << "Socket: " << *this << " SSL_do_handshake return: " << rc << ", ssl_err: " << int(ssl_err); + // unsigned long e; + // while ((e = ERR_get_error()) != 0) { + // char buf[256]; + // ERR_error_string_n(e, buf, sizeof(buf)); + // LOG(INFO) << "Socket: " << *this << " SSL_do_handshake ERR: " << buf; + // // fprintf(stderr, "openssl: %s\n", buf); + // } if (ssl_err == SSL_ERROR_WANT_READ) { - // LOG(INFO) << "SSL_ERROR_WANT_READ"; + LOG(INFO) << "Socket: " << *this << " SSL_ERROR_WANT_READ"; + LOG(INFO) << "Before DrainTlsCiphertext, BIO mem_rbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_rbio) + << ", mem_wbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_wbio); DrainTlsCiphertext(); + LOG(INFO) << "After DrainTlsCiphertext, BIO mem_rbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_rbio) +<< ", mem_wbio len: " << BIO_ctrl_pending(_tls_ring_ctx->mem_wbio); if (!_tls_ring_ctx->pending_cipher_out.empty()) { + LOG(INFO) << "Socket=" << *this << " flushing TLS handshake data after WANT_READ..."; if (FlushTlsCiphertext() < 0) { LOG(WARNING) << "Socket=" << *this << " failed to flush TLS handshake after WANT_READ: " << berror(errno); return -1; } - // LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data after WANT_READ"; + LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data after WANT_READ"; } break; } else if (ssl_err == SSL_ERROR_WANT_WRITE) { + LOG(INFO) << "Socket: " << *this << " SSL_ERROR_WANT_WRITE"; DrainTlsCiphertext(); if (FlushTlsCiphertext() < 0) { LOG(WARNING) << "Socket=" << *this << " failed to flush TLS handshake: " << berror(errno); return -1; } - // LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data via io_uring"; + LOG(INFO) << "Socket=" << *this << " flushed TLS handshake data via io_uring"; } else { LOG(WARNING) << "Socket=" << *this << " TLS handshake error: " << SSLError(ERR_get_error()); @@ -847,30 +939,31 @@ ssize_t Socket::ProcessTlsRingData(const char* data, size_t len) { // Feed ciphertext into the SSL read BIO and, if handshake is still in // progress, continue driving it. When fully connected, decrypt plaintext // into _read_buf. - // LOG(INFO) << "Socket=" << *this << ", ProcessTlsRingData, len: " << len; + LOG(INFO) << "Socket=" << *this << ", ProcessTlsRingData, len: " << len; if (len > 0) { if (FeedTlsCiphertext(data, len) < 0) { errno = ESSL; return -1; } } - // LOG(INFO) << "_ssl_state: " << int(_ssl_state); + LOG(INFO) << "Socket: " << *this << " _ssl_state: " << int(_ssl_state); if (_ssl_state == SSL_CONNECTING) { - // LOG(INFO) << "ProcessTlsRingData SSL_CONNECTING, ContinueTlsHandshake"; + LOG(INFO) << "Socket: " << *this << " ProcessTlsRingData SSL_CONNECTING, ContinueTlsHandshake"; if (ContinueTlsHandshake() != 0) { return -1; } if (_ssl_state == SSL_CONNECTED) { - // LOG(INFO) << "Socket=" << *this << " TLS handshake completed"; + LOG(INFO) << "Socket=" << *this << " TLS handshake completed"; } } - // LOG(INFO) << "ProcessTlsRingData _ssl_state: " << int(_ssl_state); + LOG(INFO) << "Socket: " << *this << " ProcessTlsRingData _ssl_state: " << int(_ssl_state); if (_ssl_state != SSL_CONNECTED) { - // LOG(INFO) << "ProcessTlsRingData, _ssl_state != SSL_CONNECTED, return 0, _ssl_state: " << int(_ssl_state); + LOG(INFO) << "Socket: " << *this << " ProcessTlsRingData, _ssl_state != SSL_CONNECTED, return 0, _ssl_state: " << int(_ssl_state); return 0; } - ssize_t plain = ConsumeTlsPlaintext(len); - // LOG(INFO) << "ProcessTlsRingData ConsumeTlsPlaintext result: " << plain; + // ssize_t plain = ConsumeTlsPlaintext(len); + ssize_t plain = ConsumeTlsPlaintext(); + LOG(INFO) << "Socket: " << *this << " ProcessTlsRingData ConsumeTlsPlaintext result: " << plain; return plain < 0 ? -1 : plain; } @@ -890,23 +983,25 @@ static SSLState DetectSSLStateInBuffer(const char* header, size_t len) { #endif ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { + BAIDU_SCOPED_LOCK(_ssl_session_mutex); + LOG(INFO) << "Socket: " << *this << " HandleTlsRingRead, len: " << len; if (_ssl_state == SSL_UNKNOWN) { // Accumulate bytes until we can determine whether this connection is // using TLS. The first handshake record needs at least 6 bytes. _tls_detect_buf.append(data, len); if (_tls_detect_buf.length() < 6) { - LOG(INFO) << "HandleTlsRingRead _tls_detect_buf.length() < 6, return eagain"; + LOG(INFO) << "Socket: " << *this << " HandleTlsRingRead _tls_detect_buf.length() < 6, return eagain"; errno = EAGAIN; return -errno; } char header[6]; _tls_detect_buf.copy_to(header, 6); SSLState s = DetectSSLStateInBuffer(header, 6); - // LOG(INFO) << "Socket=" << *this << " DetectSSLStateInBuffer, ssl state: " << int(s); + LOG(INFO) << "Socket=" << *this << " DetectSSLStateInBuffer, ssl state: " << int(s); if (s == SSL_CONNECTING) { if (EnsureTlsSessionForRing() != 0) { errno = ESSL; - LOG(INFO) << "HandleTlsRingRead, return -1"; + LOG(INFO) << "Socket: " << *this << " HandleTlsRingRead, return -1"; return -1; } _ssl_state = SSL_CONNECTING; @@ -914,11 +1009,11 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { cached.resize(_tls_detect_buf.size()); _tls_detect_buf.copy_to(cached.data(), cached.size()); _tls_detect_buf.clear(); - // LOG(INFO) << "Socket=" << *this << " detected TLS over io_uring (cached=" - // << cached.size() << " bytes)"; + LOG(INFO) << "Socket=" << *this << " detected TLS over io_uring (cached=" + << cached.size() << " bytes)"; ssize_t plain = ProcessTlsRingData(cached.data(), cached.size()); if (plain == 0) { - // LOG(INFO) << "HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; + LOG(INFO) << "Socket: " << *this << " HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; errno = EAGAIN; return -errno; } @@ -932,24 +1027,25 @@ ssize_t Socket::HandleTlsRingRead(const char* data, size_t len) { errno = ESSL; return -1; } - // LOG(INFO) << "Socket=" << *this << " treated as non-TLS over io_uring," - // << " appended " << plain << " bytes"; + LOG(INFO) << "Socket=" << *this << " treated as non-TLS over io_uring," + << " appended " << plain << " bytes"; return plain; } } if (_ssl_state == SSL_CONNECTING || _ssl_state == SSL_CONNECTED) { // Once we know the connection is TLS, all incoming ciphertext goes // through ProcessTlsRingData. - // LOG(INFO) << "Socket=" << *this << " processing TLS ring data len=" << len; + LOG(INFO) << "Socket=" << *this << " processing TLS ring data len=" << len; ssize_t plain = ProcessTlsRingData(data, len); if (plain == 0) { - // LOG(INFO) << "HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; + LOG(INFO) << "Socket: " << *this << " HandleTlsRingRead ProcessTlsRingData 0, return EAGAIN"; errno = EAGAIN; return -errno; } + LOG(INFO) << "HandleTlsRingRead, return plain: " << plain << ", read_buf size: " << _read_buf.size(); return plain; } - // LOG(INFO) << "Socket=" << *this << " treating ring data as plaintext len=" << len; + LOG(INFO) << "Socket=" << *this << " treating ring data as plaintext len=" << len; _read_buf.append(data, len); return len; } @@ -1269,6 +1365,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { _local_side = butil::EndPoint(); if (_ssl_session) { + LOG(INFO) << "Socket: " << *this << " SSL_free(_ssl_session): " << _ssl_session; SSL_free(_ssl_session); _ssl_session = NULL; } @@ -1622,6 +1719,7 @@ void Socket::OnRecycle() { bthread_id_list_destroy(&_id_wait_list); if (_ssl_session) { + LOG(INFO) << "Socket: " << *this << " SSL_free(_ssl_session): " << _ssl_session; SSL_free(_ssl_session); _ssl_session = NULL; } @@ -2628,13 +2726,14 @@ ssize_t Socket::DoWrite(WriteRequest* req) { BAIDU_SCOPED_LOCK(_ssl_session_mutex); return _conn->CutMessageIntoSSLChannel(_ssl_session, data_list, ndata); } + int ssl_error = 0; + ssize_t nw = 0; #ifdef IO_URING_ENABLED if (FLAGS_use_io_uring && FLAGS_enable_ssl_io_uring && _tls_ring_ctx) { - return DoWriteTlsRing(req, data_list, ndata); - } + BAIDU_SCOPED_LOCK(_ssl_session_mutex); + nw = DoWriteTlsRing(req, data_list, ndata, &ssl_error); + } else #endif - int ssl_error = 0; - ssize_t nw = 0; { BAIDU_SCOPED_LOCK(_ssl_session_mutex); nw = butil::IOBuf::cut_multiple_into_SSL_channel(_ssl_session, data_list, ndata, &ssl_error); @@ -2673,44 +2772,50 @@ ssize_t Socket::DoWrite(WriteRequest* req) { } #ifdef IO_URING_ENABLED -ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], size_t ndata) { +ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], size_t ndata, int* ssl_error) { ssize_t total_plain = 0; // SSL objects are not thread-safe. Guard all SSL_write operations. - BAIDU_SCOPED_LOCK(_ssl_session_mutex); + // BAIDU_SCOPED_LOCK(_ssl_session_mutex); + // size_t unwritten = 0; for (size_t i = 0; i < ndata; ++i) { butil::IOBuf* buf = data_list[i]; while (!buf->empty()) { - int ssl_error = SSL_ERROR_NONE; + // int ssl_error = SSL_ERROR_NONE; // cut_into_SSL_channel sends plaintext into SSL_write and removes // consumed bytes from |buf| when successful. - const ssize_t nw = buf->cut_into_SSL_channel(_ssl_session, &ssl_error); + const ssize_t nw = buf->cut_into_SSL_channel(_ssl_session, ssl_error); if (nw > 0) { + // unwritten += nw; total_plain += nw; continue; } - if (ssl_error == SSL_ERROR_WANT_WRITE) { + if (*ssl_error == SSL_ERROR_WANT_WRITE) { // Memory BIO is full. Drain existing ciphertext and push to // io_uring before retrying the remaining plaintext. DrainTlsCiphertext(); + // TODO(zkl): handle partial write if (FlushTlsCiphertext() < 0) { - LOG(WARNING) << "Socket=" << *this + LOG(FATAL) << "Socket=" << *this << " FlushTlsCiphertext failed during WANT_WRITE: " << berror(errno); - return -1; + return total_plain; } + // total_plain += unwritten; + // unwritten = 0; // LOG(INFO) << "Socket=" << *this << " TLS WANT_WRITE resolved, retrying"; continue; } - if (ssl_error == SSL_ERROR_WANT_READ) { + if (*ssl_error == SSL_ERROR_WANT_READ) { + LOG(FATAL) << "ssl_write returns WANT_READ"; errno = EPROTO; } else { errno = ESSL; } LOG(WARNING) << "Socket=" << *this << " TLS write failed: " << SSLError(ERR_get_error()) << " errno=" << errno; - return -1; + return total_plain; } } DrainTlsCiphertext(); @@ -2718,9 +2823,10 @@ ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], siz LOG(WARNING) << "Socket=" << *this << " FlushTlsCiphertext failed after draining: " << berror(errno); - return -1; + return total_plain; } - // LOG(INFO) << "Socket=" << *this << " wrote TLS plaintext=" << total_plain; + // total_plain += unwritten; + LOG(INFO) << "Socket=" << *this << " DoWriteTlsRing wrote TLS plaintext=" << total_plain; return total_plain; } @@ -2728,6 +2834,15 @@ ssize_t Socket::DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], siz int Socket::SSLHandshake(int fd, bool server_mode) { + struct Defer { + Socket *sock; + Defer(Socket *socket): sock(socket) { + LOG(INFO) << "Socket: " << *sock << ", SSLHandshake..." << boost::stacktrace::stacktrace(); + } + ~Defer() { + LOG(INFO) << "Socket: " << *sock << ", SSLHandshake finishes" << boost::stacktrace::stacktrace(); + } + }; if (_ssl_ctx == NULL) { if (server_mode) { LOG(ERROR) << "Lack SSL configuration to handle SSL request"; @@ -2735,9 +2850,11 @@ int Socket::SSLHandshake(int fd, bool server_mode) { } return 0; } + Defer defer(this); // TODO: Reuse ssl session id for client if (_ssl_session) { + LOG(INFO) << "Socket: " << *this << "SSL_free _ssl_session: " << _ssl_session; // Free the last session, which may be deprecated when socket failed SSL_free(_ssl_session); } @@ -2759,9 +2876,9 @@ int Socket::SSLHandshake(int fd, bool server_mode) { // as it may confuse the origin event processing code. while (true) { ERR_clear_error(); + LOG(INFO) << "Socket: " << *this << " SSLHandshake SSL_do_handshake..."; int rc = SSL_do_handshake(_ssl_session); - // LOG(INFO) << "SSLHandshake: " << *this << ", " << boost::stacktrace::stacktrace() - // << ", rc: " << rc; + LOG(INFO) << "🌟Socket: " << *this << " SSLHandshake, rc: " << rc; if (rc == 1) { _ssl_state = SSL_CONNECTED; #ifdef IO_URING_ENABLED @@ -3834,18 +3951,21 @@ void Socket::NotifyWaitingNonFixedWrite(int nw) { } int Socket::CopyDataRead() { - // LOG(INFO) << "socket: " << *this << ", CopyDataRead"; bthread::TaskGroup *cur_group = bound_g_; CHECK(static_cast(buf_idx_) < in_bufs_.size()); auto &rbuf = in_bufs_[buf_idx_]; int nw = rbuf.bytes_; int ret = nw; + LOG(INFO) << "socket: " << *this << ", CopyDataRead, buf_idx_: " << buf_idx_ + << ", in_bufs_ size: " << in_bufs_.size() + << " rbuf bytes: " << rbuf.bytes_; if (rbuf.bytes_ > 0) { const char *buf_head = cur_group->GetRingReadBuf(rbuf.buf_id_); #ifdef IO_URING_ENABLED if (FLAGS_use_io_uring && FLAGS_enable_ssl_io_uring && _ssl_state != SSL_OFF) { ssize_t plain = HandleTlsRingRead(buf_head, rbuf.bytes_); - // LOG(INFO) << "HandleTlsRingRead, plain: " << plain; + LOG(INFO) << "Socket: " << *this << " HandleTlsRingRead, plain: " << plain + << ", read_buf size: " << _read_buf.size(); if (plain < 0) { ret = -errno; goto END; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 211077b1..8ebeb278 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -660,7 +660,8 @@ friend class policy::H2GlobalStreamCreator; // Push pending ciphertext to io_uring (synchronous submit). int FlushTlsCiphertext(); // Pull decrypted plaintext from SSL into _read_buf. - ssize_t ConsumeTlsPlaintext(size_t size_hint); + // ssize_t ConsumeTlsPlaintext(size_t size_hint); + ssize_t ConsumeTlsPlaintext(); #endif private: DISALLOW_COPY_AND_ASSIGN(Socket); @@ -698,7 +699,7 @@ friend void DereferenceSocket(Socket*); // success, -1 otherwise and errno is set ssize_t DoWrite(WriteRequest* req); #ifdef IO_URING_ENABLED - ssize_t DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], size_t ndata); + ssize_t DoWriteTlsRing(WriteRequest* req, butil::IOBuf* data_list[], size_t ndata, int* ssl_error); // io_uring TLS helpers for read path int EnsureTlsSessionForRing(); int ContinueTlsHandshake();