diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 616ef33252..d7d0230767 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -985,11 +985,15 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { } butil::subtle::MemoryBarrier(); - _sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed); - if (_remote_rq_window_size.load(butil::memory_order_relaxed) >= - _local_window_capacity / 8) { + // Update sq window. + uint16_t old_sq_window_size = + _sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed); + bool is_remote_rq_ready = + _remote_rq_window_size.load(butil::memory_order_relaxed) >= + _local_window_capacity / 8; + if (0 == old_sq_window_size && is_remote_rq_ready) { // Do not wake up writing thread right after polling IBV_WC_SEND. - // Otherwise the writing thread may switch to background too quickly. + // Otherwise, the writing thread may switch to background too quickly. _socket->WakeAsEpollOut(); } return 0; @@ -1009,15 +1013,16 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { } } if (0 != (wc.wc_flags & IBV_WC_WITH_IMM) && wc.imm_data > 0) { - // Update window + // Update remote sq window. uint32_t acks = butil::NetToHost32(wc.imm_data); - uint32_t wnd_thresh = _local_window_capacity / 8; - uint32_t remote_rq_window_size = + uint32_t remote_sq_window_thresh = _local_window_capacity / 8; + uint32_t old_remote_rq_window_size = _remote_rq_window_size.fetch_add(acks, butil::memory_order_relaxed); if (_sq_window_size.load(butil::memory_order_relaxed) > 0 && - (remote_rq_window_size >= wnd_thresh || acks >= wnd_thresh)) { + old_remote_rq_window_size < remote_sq_window_thresh && + old_remote_rq_window_size + acks >= remote_sq_window_thresh) { // Do not wake up writing thread right after _remote_rq_window_size > 0. - // Otherwise the writing thread may switch to background too quickly. + // Otherwise, the writing thread may switch to background too quickly. _socket->WakeAsEpollOut(); } } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 9490650b78..269b8a1e2b 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -99,6 +99,9 @@ DEFINE_int32(connect_timeout_as_unreachable, 3, "times *continuously*, the error is changed to ENETUNREACH which " "fails the main socket as well when this socket is pooled."); +DEFINE_int32(wait_output_event_timeout_ms, 50, + "Timeout for waiting output event, default: 50ms."); + DECLARE_bool(usercode_in_coroutine); static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) { @@ -107,8 +110,6 @@ static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) { BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable, validate_connect_timeout_as_unreachable); -const int WAIT_EPOLLOUT_TIMEOUT_MS = 50; - class BAIDU_CACHELINE_ALIGNMENT SocketPool { friend class Socket; public: @@ -718,7 +719,7 @@ int Socket::OnCreated(const SocketOptions& options) { return -1; } _io_event.set_bthread_tag(options.bthread_tag); - auto guard = butil::MakeScopeGuard([this] { + auto io_event_guard = butil::MakeScopeGuard([this] { _io_event.Reset(); }); @@ -759,19 +760,12 @@ int Socket::OnCreated(const SocketOptions& options) { #if BRPC_WITH_RDMA CHECK(_rdma_ep == NULL); if (options.use_rdma) { - _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(this); - if (!_rdma_ep) { - const int saved_errno = errno; - PLOG(ERROR) << "Fail to create RdmaEndpoint"; - SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s", - berror(saved_errno)); - return -1; - } + _rdma_ep = new rdma::RdmaEndpoint(this); _rdma_state = RDMA_UNKNOWN; } else { _rdma_state = RDMA_OFF; } -#endif +#endif // BRPC_WITH_RDMA _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN; _controller_released_socket.store(false, butil::memory_order_relaxed); _overcrowded = false; @@ -813,12 +807,12 @@ int Socket::OnCreated(const SocketOptions& options) { if (ResetFileDescriptor(fd) != 0) { const int saved_errno = errno; PLOG(ERROR) << "Fail to ResetFileDescriptor"; - SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", - berror(saved_errno)); + SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", berror(saved_errno)); return -1; } + HoldHCRelatedRef(); - guard.dismiss(); + io_event_guard.dismiss(); return 0; } @@ -866,7 +860,7 @@ void Socket::BeforeRecycled() { _rdma_ep = NULL; _rdma_state = RDMA_UNKNOWN; } -#endif +#endif // BRPC_WITH_RDMA reset_parsing_context(NULL); _read_buf.clear(); @@ -953,7 +947,7 @@ std::string Socket::OnDescription() const { butil::string_appendf(&result, "fd=%d ", saved_fd); } butil::string_appendf(&result, "addr=%s", - butil::endpoint2str(remote_side()).c_str()); + butil::endpoint2str(remote_side()).c_str()); const int local_port = local_side().port; if (local_port > 0) { butil::string_appendf(&result, ":%d", local_port); @@ -997,7 +991,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { << " was abandoned during health checking"; return -1; } else { - // nobody holds a health-checking-related reference, + // Nobody holds a health-checking-related reference, // so no need to do health checking. if (!_is_hc_related_ref_held) { RPC_VLOG << "Nobody holds a health-checking-related reference" @@ -1026,7 +1020,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { _rdma_ep->Reset(); _rdma_state = RDMA_UNKNOWN; } -#endif +#endif // BRPC_WITH_RDMA _local_side = butil::EndPoint(); if (_ssl_session) { @@ -1493,8 +1487,7 @@ void Socket::AfterAppConnected(int err, void* data) { bthread_t th; bthread_attr_t attr = BTHREAD_ATTR_NORMAL; bthread_attr_set_name(&attr, "KeepWrite"); - if (bthread_start_background( - &th, &attr, KeepWrite, req) != 0) { + if (bthread_start_background(&th, &attr, KeepWrite, req) != 0) { PLOG(WARNING) << "Fail to start KeepWrite"; KeepWrite(req); } @@ -1825,6 +1818,13 @@ void* Socket::KeepWrite(void* void_arg) { break; } +#if BRPC_WITH_RDMA + int expected_val = 0; + if (s->_rdma_state == RDMA_ON) { + expected_val = s->_epollout_butex->load(butil::memory_order_acquire); + } +#endif // BRPC_WITH_RDMA + const ssize_t nw = s->DoWrite(req); if (nw < 0) { if (errno != EAGAIN && errno != EOVERCROWDED) { @@ -1862,27 +1862,24 @@ void* Socket::KeepWrite(void* void_arg) { // KeepWrite to check and setup pending WriteRequests periodically, // which may turn on _overcrowded to stop pending requests from // growing infinitely. - const timespec duetime = - butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS); + const timespec duetime = butil::milliseconds_from_now( + std::max(1, FLAGS_wait_output_event_timeout_ms)); #if BRPC_WITH_RDMA if (s->_rdma_state == RDMA_ON) { - const int expected_val = s->_epollout_butex - ->load(butil::memory_order_acquire); CHECK(s->_rdma_ep != NULL); if (!s->_rdma_ep->IsWritable()) { g_vars->nwaitepollout << 1; - if (bthread::butex_wait(s->_epollout_butex, - expected_val, &duetime) < 0) { - if (errno != EAGAIN && errno != ETIMEDOUT) { + if (bthread::butex_wait(s->_epollout_butex, expected_val, &duetime) < 0) { + if (errno != EAGAIN && errno != ETIMEDOUT && errno != EWOULDBLOCK) { const int saved_errno = errno; PLOG(WARNING) << "Fail to wait rdma window of " << *s; s->SetFailed(saved_errno, "Fail to wait rdma window of %s: %s", - s->description().c_str(), berror(saved_errno)); + s->description().c_str(), berror(saved_errno)); } if (s->Failed()) { // NOTE: // Different from TCP, we cannot find the RDMA channel - // failed by writing to it. Thus we must check if it + // failed by writing to it. Thus, we must check if it // is already failed here. break; } @@ -1891,7 +1888,7 @@ void* Socket::KeepWrite(void* void_arg) { } else { #else { -#endif +#endif // BRPC_WITH_RDMA g_vars->nwaitepollout << 1; bool pollin = (s->_on_edge_triggered_events != NULL); const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime); @@ -2281,7 +2278,7 @@ int Socket::OnInputEvent(void* user_data, uint32_t events, LOG(FATAL) << "Fail to start ProcessEvent"; ProcessEvent(p); } -#endif +#endif // BRPC_WITH_RDMA } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) { LOG(FATAL) << "Fail to start ProcessEvent"; ProcessEvent(p); @@ -2591,7 +2588,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { if (ptr->_rdma_state == RDMA_ON && ptr->_rdma_ep) { ptr->_rdma_ep->DebugInfo(os); } -#endif +#endif // BRPC_WITH_RDMA { os << "\nbthread_tag=" << ptr->_io_event.bthread_tag(); } } diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 14bea71db5..66843555cd 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -235,6 +235,8 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, std::unique_lock mu(_mutex); SingleConnection* sc = _map.seek(key); if (sc) { + // The `_mutex' guarantees the consistent state + // of `_is_hc_related_ref_held' in SocketMap. if (!sc->socket->Failed() || sc->socket->HCEnabled()) { ++sc->ref_count; *id = sc->socket->id(); @@ -255,7 +257,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, opt.use_rdma = use_rdma; opt.hc_option = hc_option; if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) { - PLOG(FATAL) << "Fail to create socket to " << key.peer; + PLOG(ERROR) << "Fail to create socket to " << key.peer; return -1; } // Add a reference to make sure that sc->socket is always accessible. Not @@ -266,13 +268,18 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, if (rc < 0) { LOG(FATAL) << "Fail to address SocketId=" << tmp_id; return -1; - } else if (rc > 0 && !ptr->HCEnabled()) { - LOG(FATAL) << "Failed socket is not HC-enabled"; + } else if (rc > 0 && + // The `_mutex' guarantees the consistent state + // of `_is_hc_related_ref_held' in SocketMap. + !ptr->HCEnabled()) { + LOG(ERROR) << "Failed socket is not HC-enabled"; return -1; } // If health check is enabled, a health-checking-related reference // is hold in Socket::Create. // If health check is disabled, hold a reference in SocketMap. + // The `_mutex' guarantees the consistent state + // of `_is_hc_related_ref_held' in SocketMap. SingleConnection new_sc = { 1, ptr->HCEnabled() ? ptr.get() : ptr.release(), 0 }; _map[key] = new_sc; *id = tmp_id; @@ -317,6 +324,10 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, } void SocketMap::ReleaseReference(Socket* s) { + // The release fence in Dereference() of ReleaseHCRelatedReference() + // pairs with acquire fence in versioned_ref() of WaitAndReset() to + // avoid inconsistent states of `_is_hc_related_ref_held' to be seen + // by others. if (s->HCEnabled()) { s->ReleaseHCRelatedReference(); } else { diff --git a/src/bvar/detail/sampler.cpp b/src/bvar/detail/sampler.cpp index dd6271e7ba..bd95ea0ff0 100644 --- a/src/bvar/detail/sampler.cpp +++ b/src/bvar/detail/sampler.cpp @@ -139,7 +139,7 @@ void SamplerCollector::run() { // NOTE: // * Following vars can't be created on thread's stack since this thread // may be abandoned at any time after forking. - // * They can't created inside the constructor of SamplerCollector as well, + // * They can't be created inside the constructor of SamplerCollector as well, // which results in deadlock. if (s_cumulated_time_bvar == NULL) { s_cumulated_time_bvar = diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index 8e9f90e833..750757a5a8 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -1495,8 +1495,44 @@ TEST_F(SocketTest, tcp_user_timeout) { } #endif +TEST_F(SocketTest, connect_on_creation) { + brpc::Acceptor* messenger = new brpc::Acceptor; + int listening_fd = -1; + butil::EndPoint point(butil::IP_ANY, 7878); + for (int i = 0; i < 100; ++i) { + point.port += i; + listening_fd = tcp_listen(point); + if (listening_fd >= 0) { + break; + } + } + ASSERT_GT(listening_fd, 0) << berror(); + ASSERT_EQ(0, butil::make_non_blocking(listening_fd)); + ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false)); + + { + brpc::SocketOptions options; + options.remote_side = point; + options.connect_on_create = true; + brpc::SocketId id = brpc::INVALID_SOCKET_ID; + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); + ASSERT_GT(ptr->fd(), 0); + } + + { + point.port = 7879; + brpc::SocketOptions options; + options.remote_side = point; + options.connect_on_create = true; + brpc::SocketId id = brpc::INVALID_SOCKET_ID; + ASSERT_EQ(-1, brpc::get_or_new_client_side_messenger()->Create(options, &id)); + } +} + int HandleSocketSuccessWrite(bthread_id_t id, void* data, int error_code, - const std::string& error_text) { + const std::string& error_text) { auto success_count = static_cast(data); EXPECT_NE(nullptr, success_count); EXPECT_EQ(0, error_code);