diff --git a/tx_service/include/remote/cc_stream_sender.h b/tx_service/include/remote/cc_stream_sender.h index 9299c8e5..5bb2f1d3 100644 --- a/tx_service/include/remote/cc_stream_sender.h +++ b/tx_service/include/remote/cc_stream_sender.h @@ -71,6 +71,28 @@ struct ResendScanSliceResp ScanSliceResponse msg_; }; +struct SendMessageResult +{ + bool sent{false}; + bool queued_for_retry{false}; + bool need_reconnect{false}; + + static SendMessageResult Sent() + { + return SendMessageResult{true, false, false}; + } + + static SendMessageResult Queued(bool need_reconnect) + { + return SendMessageResult{false, true, need_reconnect}; + } + + static SendMessageResult Failed(bool need_reconnect = false) + { + return SendMessageResult{false, false, need_reconnect}; + } +}; + class CcStreamSender { public: @@ -79,21 +101,31 @@ class CcStreamSender ~CcStreamSender(); void RecycleCcMsg(std::unique_ptr msg); - bool SendMessageToNg(uint32_t node_group_id, - const CcMessage &msg, - CcHandlerResultBase *res = nullptr, - bool resend = false); - bool SendMessageToNode(uint32_t dest_node_id, - const CcMessage &msg, - CcHandlerResultBase *res = nullptr, - bool resend = false, - bool resend_on_eagain = true, - bool log_verbose = false, - bool *need_reconnect = nullptr); - bool SendScanRespToNode(uint32_t dest_node_id, - const ScanSliceResponse &msg, - bool resend = false, - bool *need_reconnect = nullptr); + SendMessageResult SendMessageToNg(uint32_t node_group_id, + const CcMessage &msg, + CcHandlerResultBase *res = nullptr, + bool resend = false); + SendMessageResult SendMessageToNode(uint32_t dest_node_id, + const CcMessage &msg, + CcHandlerResultBase *res = nullptr, + bool resend = false); + SendMessageResult SendScanRespToNode(uint32_t dest_node_id, + const ScanSliceResponse &msg, + bool resend = false); + + /** + * @brief Send standby message with best-effort semantics (no + * retry/queuing). + * + * Unlike SendMessageToNode, this does not queue failed messages for retry. + * Use for standby replication which has a dedicated resend logic. The + * caller only cares whether the message is successfully sent into stream or + * not. + * + * @return true if sent immediately, false otherwise + */ + bool SendStandbyMessageToNode(uint32_t dest_node_id, const CcMessage &msg); + void UpdateRemoteNodes( const std::unordered_map &nodes_configs); diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index da23101a..15b9701f 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -2707,8 +2707,8 @@ void CcShard::ForwardStandbyMessage(StandbyForwardEntry *entry) } }); - write_succ = stream_sender_->SendMessageToNode( - node_id, entry_ptr->Message(), nullptr, false, false); + write_succ = stream_sender_->SendStandbyMessageToNode( + node_id, entry_ptr->Message()); if (write_succ) { last_sent_seq_id++; @@ -2839,8 +2839,8 @@ bool CcShard::ResendFailedForwardMessages() if (entry_it != seq_id_to_entry_map_.end()) { StandbyForwardEntry *entry = entry_it->second; - bool succ = stream_sender_->SendMessageToNode( - node_id, entry->Message(), nullptr, false, false); + bool succ = stream_sender_->SendStandbyMessageToNode( + node_id, entry->Message()); if (!succ) { all_msgs_sent = false; diff --git a/tx_service/src/dead_lock_check.cpp b/tx_service/src/dead_lock_check.cpp index e6e1a6dc..fd951c1f 100644 --- a/tx_service/src/dead_lock_check.cpp +++ b/tx_service/src/dead_lock_check.cpp @@ -207,10 +207,10 @@ void DeadLockCheck::GatherLockDependancy() tr::DeadLockRequest *dl = send_msg.mutable_dead_lock_request(); dl->set_src_node_id(Sharder::Instance().NodeId()); dl->set_check_round(check_round_); - bool hr = + auto send_res = Sharder::Instance().GetCcStreamSender()->SendMessageToNode( node_id, send_msg); - if (hr) + if (send_res.sent || send_res.queued_for_retry) { reply_map_.try_emplace(node_id, false); } diff --git a/tx_service/src/fault/fault_inject.cpp b/tx_service/src/fault/fault_inject.cpp index d713a284..3504cbcd 100644 --- a/tx_service/src/fault/fault_inject.cpp +++ b/tx_service/src/fault/fault_inject.cpp @@ -200,7 +200,9 @@ void FaultInject::TriggerAction(FaultEntry *entry) fi_req->set_fault_name(fault_name); fi_req->set_fault_paras(fault_paras); - b = ss->SendMessageToNode(dest_node_id, send_msg); + auto send_res = + ss->SendMessageToNode(dest_node_id, send_msg); + b = (send_res.sent || send_res.queued_for_retry); } // If CcStreamSender == nullptr or failed to send, send to local diff --git a/tx_service/src/remote/cc_stream_sender.cpp b/tx_service/src/remote/cc_stream_sender.cpp index 0de8bd74..c28237c9 100644 --- a/tx_service/src/remote/cc_stream_sender.cpp +++ b/tx_service/src/remote/cc_stream_sender.cpp @@ -154,16 +154,11 @@ bool CcStreamSender::UpdateStreamIP(uint32_t node_id, * @param msg * @param res * @param resend - * @return true - * @return false */ -bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, - const CcMessage &msg, - CcHandlerResultBase *res, - bool resend, - bool resend_on_eagain, - bool log_verbose, - bool *need_reconnect) +SendMessageResult CcStreamSender::SendMessageToNode(uint32_t dest_node_id, + const CcMessage &msg, + CcHandlerResultBase *res, + bool resend) { TX_TRACE_ACTION_WITH_CONTEXT( this, @@ -176,10 +171,6 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, .append("}"); })); TX_TRACE_DUMP(&msg); - if (log_verbose) - { - LOG(INFO) << "SendMessageToNode " << dest_node_id; - } std::shared_lock outbound_lk(outbound_mux_); auto stream_it = outbound_streams_.find(dest_node_id); if (stream_it == outbound_streams_.end()) @@ -192,24 +183,15 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, LOG(ERROR) << "Trying to connect to an unknown remote node. Node Id: " << dest_node_id; - return false; + return SendMessageResult::Failed(); } std::atomic &stream_version = std::get<1>(stream_it->second); int64_t stream_ver = stream_version.load(std::memory_order_acquire); if (stream_ver == -1) { - if (need_reconnect != nullptr) - { - *need_reconnect = true; - } // resend the message if stream is connecting std::lock_guard lk(to_connect_mux_); - if (log_verbose) - { - LOG(INFO) << "CC stream is connecting, buffer the message for " - "resend"; - } auto resend_message_list = resend_message_list_.try_emplace( dest_node_id, moodycamel::ConcurrentQueue()); resend_message_list.first->second.enqueue( @@ -219,7 +201,7 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, // resend messages. to_connect_flag_ = true; to_connect_cv_.notify_one(); - return true; + return SendMessageResult::Queued(true); } brpc::StreamId stream_id = std::get<0>(stream_it->second); @@ -233,31 +215,10 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, int error_code = brpc::StreamWrite(stream_id, iobuf, &stream_write_options_); - if (log_verbose) - { - LOG(INFO) << "cc_stream_sender: do stream write with stream id: " - << stream_id << " dest_node_id: " << dest_node_id - << " print response error code: " << error_code; - } - if (error_code != 0) { if (error_code == EAGAIN) { - if (!resend_on_eagain) - { - return false; - } - if (log_verbose) - { - LOG(INFO) - << "cc_stream_sender: retry stream write again on the " - "backgroud thread" - "with stream id: " - << stream_id - << " print response error code: " << error_code; - } - std::lock_guard resend_lk(resend_mux_); auto bg_resend_msg_list = @@ -291,20 +252,16 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, } } - return true; + return SendMessageResult::Queued(false); } else { // for resend message, we have already reconnect the stream, if it // still failed to send the message, it possibly means that the // remote node is dead. We should skip resend the message again. + // TODO: discard the message? needs thorough thinking. if (resend) { - if (log_verbose) - { - LOG(INFO) << "cc_stream_sender: resend message and break"; - } - // SendMessage error return -1 to indicate the request needs // retry. if (res != nullptr) @@ -327,16 +284,10 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, } } - return false; + return SendMessageResult::Failed(); } else { - if (log_verbose) - { - LOG(INFO) << "cc_stream_sender: check stream version: " - << stream_ver << " and need resend message"; - } - std::lock_guard lk(to_connect_mux_); // If the stream version is -1, a separate thread has notified // the connecting thread to reconnect the stream. If the stream @@ -361,20 +312,16 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, to_connect_flag_ = true; to_connect_cv_.notify_one(); - return true; + return SendMessageResult::Queued(true); } } } - else - { - return true; - } + + return SendMessageResult::Sent(); } -bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, - const ScanSliceResponse &msg, - bool resend, - bool *need_reconnect) +SendMessageResult CcStreamSender::SendScanRespToNode( + uint32_t dest_node_id, const ScanSliceResponse &msg, bool resend) { TX_TRACE_ACTION_WITH_CONTEXT( this, @@ -394,17 +341,13 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, { LOG(ERROR) << "Trying to connect to an unknown remote node. Node Id: " << dest_node_id; - return false; + return SendMessageResult::Failed(); } std::atomic &stream_version = std::get<1>(stream_it->second); int64_t stream_ver = stream_version.load(std::memory_order_acquire); if (stream_ver == -1) { - if (need_reconnect != nullptr) - { - *need_reconnect = true; - } // resend the message if stream is connecting std::lock_guard lk(to_connect_mux_); DLOG(INFO) << "CC stream is connecting, buffer the message for resend"; @@ -419,7 +362,7 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, // resend messages. to_connect_flag_ = true; to_connect_cv_.notify_one(); - return true; + return SendMessageResult::Queued(true); } brpc::StreamId &stream_id = std::get<0>(stream_it->second); @@ -432,7 +375,6 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, brpc::StreamWrite(stream_id, iobuf, &stream_write_options_); if (error_code != 0) - { if (error_code == EAGAIN) { @@ -470,16 +412,17 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, } } - return true; + return SendMessageResult::Queued(false); } else { // for resend message, we have already reconnect the stream, if it // still failed to send the message, it possibly means that the // remote node is dead. We should skip resend the message again. + // TODO: discard the message? needs thorough thinking. if (resend) { - return false; + return SendMessageResult::Failed(); } else { @@ -508,14 +451,68 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, to_connect_flag_ = true; to_connect_cv_.notify_one(); - return true; + return SendMessageResult::Queued(true); } } } - else + return SendMessageResult::Sent(); +} + +bool CcStreamSender::SendStandbyMessageToNode(uint32_t dest_node_id, + const CcMessage &msg) +{ + std::shared_lock outbound_lk(outbound_mux_); + auto stream_it = outbound_streams_.find(dest_node_id); + if (stream_it == outbound_streams_.end()) { - return true; + LOG_EVERY_SECOND(ERROR) + << "SendStandbyMessageToNode: unknown node " << dest_node_id; + return false; + } + + std::atomic &stream_version = std::get<1>(stream_it->second); + int64_t stream_ver = stream_version.load(std::memory_order_acquire); + if (stream_ver == -1) + { + std::lock_guard lk(to_connect_mux_); + // wake up connector thread to reconnect streams + to_connect_flag_ = true; + to_connect_cv_.notify_one(); + return false; + } + + brpc::StreamId stream_id = std::get<0>(stream_it->second); + outbound_lk.unlock(); + + butil::IOBuf iobuf; + butil::IOBufAsZeroCopyOutputStream wrapper(&iobuf); + msg.SerializeToZeroCopyStream(&wrapper); + + int error_code = + brpc::StreamWrite(stream_id, iobuf, &stream_write_options_); + if (error_code != 0) + { + DLOG(INFO) << "SendStandbyMessageToNode failed, node: " << dest_node_id + << ", err: " << error_code; + // For non-EAGAIN errors, trigger stream reconnection + if (error_code != EAGAIN) + { + std::lock_guard lk(to_connect_mux_); + // Set stream version to -1 to mark it as broken and trigger + // reconnection + if (stream_version.compare_exchange_strong( + stream_ver, -1, std::memory_order_release)) + { + to_connect_regular_streams_.try_emplace(dest_node_id, + stream_ver + 1); + } + to_connect_flag_ = true; + to_connect_cv_.notify_one(); + } + return false; } + + return true; } /** @@ -527,13 +524,11 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, * @param msg * @param res * @param resend - * @return true - * @return false */ -bool CcStreamSender::SendMessageToNg(uint32_t node_group_id, - const CcMessage &msg, - CcHandlerResultBase *res, - bool resend) +SendMessageResult CcStreamSender::SendMessageToNg(uint32_t node_group_id, + const CcMessage &msg, + CcHandlerResultBase *res, + bool resend) { uint32_t dest_node_id = Sharder::Instance().LeaderNodeId(node_group_id); return SendMessageToNode(dest_node_id, msg, res, resend); @@ -715,10 +710,9 @@ void CcStreamSender::ResendMessageToNode() for (size_t idx = 0; idx < msg_cnt; ++idx) { - if (SendMessageToNode(nid, - messages[idx]->msg_, - messages[idx]->res_, - false)) + auto send_result = SendMessageToNode( + nid, messages[idx]->msg_, messages[idx]->res_, false); + if (send_result.sent || send_result.queued_for_retry) { send_cnt += 1; } @@ -764,7 +758,9 @@ void CcStreamSender::ResendMessageToNode() for (size_t idx = 0; idx < msg_cnt; ++idx) { - if (SendScanRespToNode(nid, messages[idx]->msg_, false)) + auto send_result = + SendScanRespToNode(nid, messages[idx]->msg_, false); + if (send_result.sent || send_result.queued_for_retry) { send_cnt += 1; } @@ -846,18 +842,14 @@ void CcStreamSender::ConnectStreams() lk.unlock(); for (size_t i = 0; i < msg_cnt; ++i) { - SendMessageToNode(nid, - messages[i]->msg_, - messages[i]->res_, - true, - true, - false, - &need_reconnect); - if (need_reconnect) + auto send_result = SendMessageToNode( + nid, messages[i]->msg_, messages[i]->res_, true); + if (send_result.need_reconnect) { // re-enqueue messages from i+1 to msg_cnt-1 // (message at i was already enqueued in // SendMessageToNode) + assert(send_result.queued_for_retry); for (size_t left_idx = i + 1; left_idx < msg_cnt; ++left_idx) { @@ -867,6 +859,7 @@ void CcStreamSender::ConnectStreams() std::move(messages[left_idx])); } } + need_reconnect = true; break; } } @@ -910,13 +903,14 @@ void CcStreamSender::ConnectStreams() lk.unlock(); for (size_t i = 0; i < msg_cnt; ++i) { - SendScanRespToNode( - nid, messages[i]->msg_, true, &need_reconnect); - if (need_reconnect) + auto send_result = + SendScanRespToNode(nid, messages[i]->msg_, true); + if (send_result.need_reconnect) { // re-enqueue messages from i+1 to msg_cnt-1 // (message at i was already enqueued in // SendMessageToNode) + assert(send_result.queued_for_retry); for (size_t left_idx = i + 1; left_idx < msg_cnt; ++left_idx) { @@ -926,6 +920,7 @@ void CcStreamSender::ConnectStreams() std::move(messages[left_idx])); } } + need_reconnect = true; break; } } diff --git a/tx_service/src/remote/remote_cc_handler.cpp b/tx_service/src/remote/remote_cc_handler.cpp index 96e8569f..848ae8f7 100644 --- a/tx_service/src/remote/remote_cc_handler.cpp +++ b/tx_service/src/remote/remote_cc_handler.cpp @@ -865,8 +865,8 @@ void txservice::remote::RemoteCcHandler::BroadcastStatistics( broadcast_stat_req->set_schema_version(schema_ts); broadcast_stat_req->mutable_node_group_sample_pool()->CopyFrom(sample_pool); - bool send = stream_sender_.SendMessageToNg(ng_id, send_msg, &hres); - if (send) + auto send_res = stream_sender_.SendMessageToNg(ng_id, send_msg, &hres); + if (send_res.sent || send_res.queued_for_retry) { hres.SetFinished(); // Don't wait for remote result. }