Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,15 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
butil::subtle::MemoryBarrier();

_sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed);
if (_remote_rq_window_size.load(butil::memory_order_relaxed) >=
_local_window_capacity / 8) {
// Update sq window.
uint16_t old_sq_window_size =
_sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed);
bool is_remote_rq_ready =
_remote_rq_window_size.load(butil::memory_order_relaxed) >=
_local_window_capacity / 8;
if (0 == old_sq_window_size && is_remote_rq_ready) {
// Do not wake up writing thread right after polling IBV_WC_SEND.
// Otherwise the writing thread may switch to background too quickly.
// Otherwise, the writing thread may switch to background too quickly.
_socket->WakeAsEpollOut();
}
return 0;
Expand All @@ -1009,15 +1013,16 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
}
if (0 != (wc.wc_flags & IBV_WC_WITH_IMM) && wc.imm_data > 0) {
// Update window
// Update remote sq window.
uint32_t acks = butil::NetToHost32(wc.imm_data);
uint32_t wnd_thresh = _local_window_capacity / 8;
uint32_t remote_rq_window_size =
uint32_t remote_sq_window_thresh = _local_window_capacity / 8;
uint32_t old_remote_rq_window_size =
_remote_rq_window_size.fetch_add(acks, butil::memory_order_relaxed);
if (_sq_window_size.load(butil::memory_order_relaxed) > 0 &&
(remote_rq_window_size >= wnd_thresh || acks >= wnd_thresh)) {
old_remote_rq_window_size < remote_sq_window_thresh &&
old_remote_rq_window_size + acks >= remote_sq_window_thresh) {
// Do not wake up writing thread right after _remote_rq_window_size > 0.
// Otherwise the writing thread may switch to background too quickly.
// Otherwise, the writing thread may switch to background too quickly.
_socket->WakeAsEpollOut();
}
}
Expand Down
63 changes: 30 additions & 33 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ DEFINE_int32(connect_timeout_as_unreachable, 3,
"times *continuously*, the error is changed to ENETUNREACH which "
"fails the main socket as well when this socket is pooled.");

// Timeout, in milliseconds, used by Socket::KeepWrite when it waits for the
// connection to become writable again. The use site clamps the value with
// std::max(1, ...), so values below 1 behave as 1.
DEFINE_int32(wait_output_event_timeout_ms, 50,
"Timeout for waiting output event, default: 50ms.");

DECLARE_bool(usercode_in_coroutine);

static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
Expand All @@ -107,8 +110,6 @@ static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable,
validate_connect_timeout_as_unreachable);

const int WAIT_EPOLLOUT_TIMEOUT_MS = 50;

class BAIDU_CACHELINE_ALIGNMENT SocketPool {
friend class Socket;
public:
Expand Down Expand Up @@ -718,7 +719,7 @@ int Socket::OnCreated(const SocketOptions& options) {
return -1;
}
_io_event.set_bthread_tag(options.bthread_tag);
auto guard = butil::MakeScopeGuard([this] {
auto io_event_guard = butil::MakeScopeGuard([this] {
_io_event.Reset();
});

Expand Down Expand Up @@ -759,19 +760,12 @@ int Socket::OnCreated(const SocketOptions& options) {
#if BRPC_WITH_RDMA
CHECK(_rdma_ep == NULL);
if (options.use_rdma) {
_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(this);
if (!_rdma_ep) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to create RdmaEndpoint";
SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
berror(saved_errno));
return -1;
}
_rdma_ep = new rdma::RdmaEndpoint(this);
_rdma_state = RDMA_UNKNOWN;
} else {
_rdma_state = RDMA_OFF;
}
#endif
#endif // BRPC_WITH_RDMA
_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
_controller_released_socket.store(false, butil::memory_order_relaxed);
_overcrowded = false;
Expand Down Expand Up @@ -813,12 +807,12 @@ int Socket::OnCreated(const SocketOptions& options) {
if (ResetFileDescriptor(fd) != 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to ResetFileDescriptor";
SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
berror(saved_errno));
SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", berror(saved_errno));
return -1;
}

HoldHCRelatedRef();
guard.dismiss();
io_event_guard.dismiss();

return 0;
}
Expand Down Expand Up @@ -866,7 +860,7 @@ void Socket::BeforeRecycled() {
_rdma_ep = NULL;
_rdma_state = RDMA_UNKNOWN;
}
#endif
#endif // BRPC_WITH_RDMA

reset_parsing_context(NULL);
_read_buf.clear();
Expand Down Expand Up @@ -953,7 +947,7 @@ std::string Socket::OnDescription() const {
butil::string_appendf(&result, "fd=%d ", saved_fd);
}
butil::string_appendf(&result, "addr=%s",
butil::endpoint2str(remote_side()).c_str());
butil::endpoint2str(remote_side()).c_str());
const int local_port = local_side().port;
if (local_port > 0) {
butil::string_appendf(&result, ":%d", local_port);
Expand Down Expand Up @@ -997,7 +991,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
<< " was abandoned during health checking";
return -1;
} else {
// nobody holds a health-checking-related reference,
// Nobody holds a health-checking-related reference,
// so no need to do health checking.
if (!_is_hc_related_ref_held) {
RPC_VLOG << "Nobody holds a health-checking-related reference"
Expand Down Expand Up @@ -1026,7 +1020,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
_rdma_ep->Reset();
_rdma_state = RDMA_UNKNOWN;
}
#endif
#endif // BRPC_WITH_RDMA

_local_side = butil::EndPoint();
if (_ssl_session) {
Expand Down Expand Up @@ -1493,8 +1487,7 @@ void Socket::AfterAppConnected(int err, void* data) {
bthread_t th;
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "KeepWrite");
if (bthread_start_background(
&th, &attr, KeepWrite, req) != 0) {
if (bthread_start_background(&th, &attr, KeepWrite, req) != 0) {
PLOG(WARNING) << "Fail to start KeepWrite";
KeepWrite(req);
}
Expand Down Expand Up @@ -1825,6 +1818,13 @@ void* Socket::KeepWrite(void* void_arg) {
break;
}

#if BRPC_WITH_RDMA
int expected_val = 0;
if (s->_rdma_state == RDMA_ON) {
expected_val = s->_epollout_butex->load(butil::memory_order_acquire);
}
#endif // BRPC_WITH_RDMA

const ssize_t nw = s->DoWrite(req);
if (nw < 0) {
if (errno != EAGAIN && errno != EOVERCROWDED) {
Expand Down Expand Up @@ -1862,27 +1862,24 @@ void* Socket::KeepWrite(void* void_arg) {
// KeepWrite to check and setup pending WriteRequests periodically,
// which may turn on _overcrowded to stop pending requests from
// growing infinitely.
const timespec duetime =
butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
const timespec duetime = butil::milliseconds_from_now(
std::max(1, FLAGS_wait_output_event_timeout_ms));
#if BRPC_WITH_RDMA
if (s->_rdma_state == RDMA_ON) {
const int expected_val = s->_epollout_butex
->load(butil::memory_order_acquire);
CHECK(s->_rdma_ep != NULL);
if (!s->_rdma_ep->IsWritable()) {
g_vars->nwaitepollout << 1;
if (bthread::butex_wait(s->_epollout_butex,
expected_val, &duetime) < 0) {
if (errno != EAGAIN && errno != ETIMEDOUT) {
if (bthread::butex_wait(s->_epollout_butex, expected_val, &duetime) < 0) {
if (errno != EAGAIN && errno != ETIMEDOUT && errno != EWOULDBLOCK) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to wait rdma window of " << *s;
s->SetFailed(saved_errno, "Fail to wait rdma window of %s: %s",
s->description().c_str(), berror(saved_errno));
s->description().c_str(), berror(saved_errno));
}
if (s->Failed()) {
// NOTE:
// Different from TCP, we cannot find the RDMA channel
// failed by writing to it. Thus we must check if it
// failed by writing to it. Thus, we must check if it
// is already failed here.
break;
}
Expand All @@ -1891,7 +1888,7 @@ void* Socket::KeepWrite(void* void_arg) {
} else {
#else
{
#endif
#endif // BRPC_WITH_RDMA
g_vars->nwaitepollout << 1;
bool pollin = (s->_on_edge_triggered_events != NULL);
const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
Expand Down Expand Up @@ -2281,7 +2278,7 @@ int Socket::OnInputEvent(void* user_data, uint32_t events,
LOG(FATAL) << "Fail to start ProcessEvent";
ProcessEvent(p);
}
#endif
#endif // BRPC_WITH_RDMA
} else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
LOG(FATAL) << "Fail to start ProcessEvent";
ProcessEvent(p);
Expand Down Expand Up @@ -2591,7 +2588,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
if (ptr->_rdma_state == RDMA_ON && ptr->_rdma_ep) {
ptr->_rdma_ep->DebugInfo(os);
}
#endif
#endif // BRPC_WITH_RDMA
{ os << "\nbthread_tag=" << ptr->_io_event.bthread_tag(); }
}

Expand Down
17 changes: 14 additions & 3 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
std::unique_lock<butil::Mutex> mu(_mutex);
SingleConnection* sc = _map.seek(key);
if (sc) {
// The `_mutex' guarantees the consistent state
// of `_is_hc_related_ref_held' in SocketMap.
if (!sc->socket->Failed() || sc->socket->HCEnabled()) {
++sc->ref_count;
*id = sc->socket->id();
Expand All @@ -255,7 +257,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
opt.use_rdma = use_rdma;
opt.hc_option = hc_option;
if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) {
PLOG(FATAL) << "Fail to create socket to " << key.peer;
PLOG(ERROR) << "Fail to create socket to " << key.peer;
return -1;
}
// Add a reference to make sure that sc->socket is always accessible. Not
Expand All @@ -266,13 +268,18 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
if (rc < 0) {
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
} else if (rc > 0 && !ptr->HCEnabled()) {
LOG(FATAL) << "Failed socket is not HC-enabled";
} else if (rc > 0 &&
// The `_mutex' guarantees the consistent state
// of `_is_hc_related_ref_held' in SocketMap.
!ptr->HCEnabled()) {
LOG(ERROR) << "Failed socket is not HC-enabled";
return -1;
}
// If health check is enabled, a health-checking-related reference
// is held in Socket::Create.
// If health check is disabled, hold a reference in SocketMap.
// The `_mutex' guarantees the consistent state
// of `_is_hc_related_ref_held' in SocketMap.
SingleConnection new_sc = { 1, ptr->HCEnabled() ? ptr.get() : ptr.release(), 0 };
_map[key] = new_sc;
*id = tmp_id;
Expand Down Expand Up @@ -317,6 +324,10 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
}

void SocketMap::ReleaseReference(Socket* s) {
// The release fence in Dereference() of ReleaseHCRelatedReference()
// pairs with acquire fence in versioned_ref() of WaitAndReset() to
// avoid inconsistent states of `_is_hc_related_ref_held' to be seen
// by others.
if (s->HCEnabled()) {
s->ReleaseHCRelatedReference();
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/bvar/detail/sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void SamplerCollector::run() {
// NOTE:
// * Following vars can't be created on thread's stack since this thread
// may be abandoned at any time after forking.
// * They can't created inside the constructor of SamplerCollector as well,
// * They can't be created inside the constructor of SamplerCollector as well,
// which results in deadlock.
if (s_cumulated_time_bvar == NULL) {
s_cumulated_time_bvar =
Expand Down
38 changes: 37 additions & 1 deletion test/brpc_socket_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1495,8 +1495,44 @@ TEST_F(SocketTest, tcp_user_timeout) {
}
#endif

// Checks Socket creation with `connect_on_create' set:
//  * creating a socket towards a listening port is expected to succeed and
//    yield a socket with a valid fd;
//  * creating a socket towards a port nobody listens on is expected to fail.
TEST_F(SocketTest, connect_on_creation) {
    // NOTE(review): `messenger' is never stopped or deleted in this test;
    // confirm whether leaking it here is intentional.
    brpc::Acceptor* messenger = new brpc::Acceptor;
    int listening_fd = -1;
    const int base_port = 7878;
    butil::EndPoint point(butil::IP_ANY, base_port);
    // Probe consecutive ports starting from `base_port' until one can be
    // listened on. Assigning (rather than accumulating with +=) keeps the
    // probed ports contiguous.
    for (int i = 0; i < 100; ++i) {
        point.port = base_port + i;
        listening_fd = tcp_listen(point);
        if (listening_fd >= 0) {
            break;
        }
    }
    ASSERT_GT(listening_fd, 0) << berror();
    ASSERT_EQ(0, butil::make_non_blocking(listening_fd));
    ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false));

    {
        // The peer is listening: creation should connect successfully.
        brpc::SocketOptions options;
        options.remote_side = point;
        options.connect_on_create = true;
        brpc::SocketId id = brpc::INVALID_SOCKET_ID;
        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
        brpc::SocketUniquePtr ptr;
        ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
        ASSERT_GT(ptr->fd(), 0);
    }

    {
        // Target the port right after the one actually being listened on.
        // A hard-coded 7879 would collide with the listener whenever 7878
        // was busy and the loop above fell back to 7879.
        // Assumes nothing else on the host listens on that port.
        point.port += 1;
        brpc::SocketOptions options;
        options.remote_side = point;
        options.connect_on_create = true;
        brpc::SocketId id = brpc::INVALID_SOCKET_ID;
        ASSERT_EQ(-1, brpc::get_or_new_client_side_messenger()->Create(options, &id));
    }
}

int HandleSocketSuccessWrite(bthread_id_t id, void* data, int error_code,
const std::string& error_text) {
const std::string& error_text) {
auto success_count = static_cast<size_t*>(data);
EXPECT_NE(nullptr, success_count);
EXPECT_EQ(0, error_code);
Expand Down
Loading