From 387fd00f206a50ec10ee48aae70dccfe2c971499 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 10 Dec 2025 19:05:00 +0800 Subject: [PATCH 1/7] add dedicated SendStandbyMessageToNode --- tx_service/include/remote/cc_stream_sender.h | 4 +- tx_service/src/cc/cc_shard.cpp | 8 +- tx_service/src/remote/cc_stream_sender.cpp | 81 +++++++++----------- 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/tx_service/include/remote/cc_stream_sender.h b/tx_service/include/remote/cc_stream_sender.h index 9299c8e5..5157c588 100644 --- a/tx_service/include/remote/cc_stream_sender.h +++ b/tx_service/include/remote/cc_stream_sender.h @@ -87,13 +87,13 @@ class CcStreamSender const CcMessage &msg, CcHandlerResultBase *res = nullptr, bool resend = false, - bool resend_on_eagain = true, - bool log_verbose = false, bool *need_reconnect = nullptr); bool SendScanRespToNode(uint32_t dest_node_id, const ScanSliceResponse &msg, bool resend = false, bool *need_reconnect = nullptr); + bool SendStandbyMessageToNode(uint32_t dest_node_id, + const CcMessage &msg); void UpdateRemoteNodes( const std::unordered_map &nodes_configs); diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index da23101a..15b9701f 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -2707,8 +2707,8 @@ void CcShard::ForwardStandbyMessage(StandbyForwardEntry *entry) } }); - write_succ = stream_sender_->SendMessageToNode( - node_id, entry_ptr->Message(), nullptr, false, false); + write_succ = stream_sender_->SendStandbyMessageToNode( + node_id, entry_ptr->Message()); if (write_succ) { last_sent_seq_id++; @@ -2839,8 +2839,8 @@ bool CcShard::ResendFailedForwardMessages() if (entry_it != seq_id_to_entry_map_.end()) { StandbyForwardEntry *entry = entry_it->second; - bool succ = stream_sender_->SendMessageToNode( - node_id, entry->Message(), nullptr, false, false); + bool succ = stream_sender_->SendStandbyMessageToNode( + node_id, entry->Message()); if (!succ) { all_msgs_sent = false; diff --git a/tx_service/src/remote/cc_stream_sender.cpp b/tx_service/src/remote/cc_stream_sender.cpp index 0de8bd74..0a24c43e 100644 --- a/tx_service/src/remote/cc_stream_sender.cpp +++ b/tx_service/src/remote/cc_stream_sender.cpp @@ -161,8 +161,6 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, const CcMessage &msg, CcHandlerResultBase *res, bool resend, - bool resend_on_eagain, - bool log_verbose, bool *need_reconnect) { TX_TRACE_ACTION_WITH_CONTEXT( @@ -176,10 +174,6 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, .append("}"); })); TX_TRACE_DUMP(&msg); - if (log_verbose) - { - LOG(INFO) << "SendMessageToNode " << dest_node_id; - } std::shared_lock outbound_lk(outbound_mux_); auto stream_it = outbound_streams_.find(dest_node_id); if (stream_it == outbound_streams_.end()) @@ -205,11 +199,6 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, } // resend the message if stream is connecting std::lock_guard lk(to_connect_mux_); - if (log_verbose) - { - LOG(INFO) << "CC stream is connecting, buffer the message for " - "resend"; - } auto resend_message_list = resend_message_list_.try_emplace( dest_node_id, moodycamel::ConcurrentQueue()); resend_message_list.first->second.enqueue( @@ -233,31 +222,10 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, int error_code = brpc::StreamWrite(stream_id, iobuf, &stream_write_options_); - if (log_verbose) - { - LOG(INFO) << "cc_stream_sender: do stream write with stream id: " - << stream_id << " dest_node_id: " << dest_node_id - << " print response error code: " << error_code; - } - if (error_code != 0) { if (error_code == EAGAIN) { - if (!resend_on_eagain) - { - return false; - } - if (log_verbose) - { - LOG(INFO) - << "cc_stream_sender: retry stream write again on the " - "backgroud thread" - "with stream id: " - << stream_id - << " print response error code: " << error_code; - } - std::lock_guard resend_lk(resend_mux_); auto bg_resend_msg_list = @@ -300,11 +268,6 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, // remote node is dead. We should skip resend the message again. if (resend) { - if (log_verbose) - { - LOG(INFO) << "cc_stream_sender: resend message and break"; - } - // SendMessage error return -1 to indicate the request needs // retry. if (res != nullptr) @@ -331,12 +294,6 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, } else { - if (log_verbose) - { - LOG(INFO) << "cc_stream_sender: check stream version: " - << stream_ver << " and need resend message"; - } - std::lock_guard lk(to_connect_mux_); // If the stream version is -1, a separate thread has notified // the connecting thread to reconnect the stream. If the stream @@ -518,6 +475,42 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, } } +bool CcStreamSender::SendStandbyMessageToNode(uint32_t dest_node_id, + const CcMessage &msg) +{ + std::shared_lock outbound_lk(outbound_mux_); + auto stream_it = outbound_streams_.find(dest_node_id); + if (stream_it == outbound_streams_.end()) + { + LOG_EVERY_SECOND(ERROR) + << "SendStandbyMessageToNode: unknown node " << dest_node_id; + return false; + } + + std::atomic &stream_version = std::get<1>(stream_it->second); + if (stream_version.load(std::memory_order_acquire) == -1) + { + return false; + } + + brpc::StreamId stream_id = std::get<0>(stream_it->second); + outbound_lk.unlock(); + + butil::IOBuf iobuf; + butil::IOBufAsZeroCopyOutputStream wrapper(&iobuf); + msg.SerializeToZeroCopyStream(&wrapper); + + int error_code = brpc::StreamWrite(stream_id, iobuf, &stream_write_options_); + if (error_code != 0) + { + DLOG(INFO) << "SendStandbyMessageToNode failed, node: " << dest_node_id + << ", err: " << error_code; + return false; + } + + return true; +} + /** * @brief Send a message to a node group leader. Failed message will * be put into a retry list once the stream to the node group leader is @@ -850,8 +843,6 @@ void CcStreamSender::ConnectStreams() messages[i]->msg_, messages[i]->res_, true, - true, - false, &need_reconnect); if (need_reconnect) { From 8147aa65c3c48fa11c2705cc2bf1e3e3b33b49d7 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 10 Dec 2025 20:07:27 +0800 Subject: [PATCH 2/7] clarify SendMessageResult return type for cc stream sends --- tx_service/include/remote/cc_stream_sender.h | 46 +++++--- tx_service/src/dead_lock_check.cpp | 4 +- tx_service/src/fault/fault_inject.cpp | 3 +- tx_service/src/remote/cc_stream_sender.cpp | 105 ++++++++----------- tx_service/src/remote/remote_cc_handler.cpp | 4 +- 5 files changed, 81 insertions(+), 81 deletions(-) diff --git a/tx_service/include/remote/cc_stream_sender.h b/tx_service/include/remote/cc_stream_sender.h index 5157c588..a1c93956 100644 --- a/tx_service/include/remote/cc_stream_sender.h +++ b/tx_service/include/remote/cc_stream_sender.h @@ -71,6 +71,28 @@ struct ResendScanSliceResp ScanSliceResponse msg_; }; +struct SendMessageResult +{ + bool sent{false}; + bool queued_for_retry{false}; + bool need_reconnect{false}; + + static SendMessageResult Sent() + { + return SendMessageResult{true, false, false}; + } + + static SendMessageResult Queued(bool need_reconnect) + { + return SendMessageResult{false, true, need_reconnect}; + } + + static SendMessageResult Failed(bool need_reconnect = false) + { + return SendMessageResult{false, false, need_reconnect}; + } +}; + class CcStreamSender { public: @@ -79,19 +101,17 @@ class CcStreamSender ~CcStreamSender(); void RecycleCcMsg(std::unique_ptr msg); - bool SendMessageToNg(uint32_t node_group_id, - const CcMessage &msg, - CcHandlerResultBase *res = nullptr, - bool resend = false); - bool SendMessageToNode(uint32_t dest_node_id, - const CcMessage &msg, - CcHandlerResultBase *res = nullptr, - bool resend = false, - bool *need_reconnect = nullptr); - bool SendScanRespToNode(uint32_t dest_node_id, - const ScanSliceResponse &msg, - bool resend = false, - bool *need_reconnect = nullptr); + SendMessageResult SendMessageToNg(uint32_t node_group_id, + const CcMessage &msg, + CcHandlerResultBase *res = nullptr, + bool resend = false); + SendMessageResult SendMessageToNode(uint32_t dest_node_id, + const CcMessage &msg, + CcHandlerResultBase *res = nullptr, + bool resend = false); + SendMessageResult SendScanRespToNode(uint32_t dest_node_id, + const ScanSliceResponse &msg, + bool resend = false); bool SendStandbyMessageToNode(uint32_t dest_node_id, const CcMessage &msg); void UpdateRemoteNodes( diff --git a/tx_service/src/dead_lock_check.cpp b/tx_service/src/dead_lock_check.cpp index e6e1a6dc..fd951c1f 100644 --- a/tx_service/src/dead_lock_check.cpp +++ b/tx_service/src/dead_lock_check.cpp @@ -207,10 +207,10 @@ void DeadLockCheck::GatherLockDependancy() tr::DeadLockRequest *dl = send_msg.mutable_dead_lock_request(); dl->set_src_node_id(Sharder::Instance().NodeId()); dl->set_check_round(check_round_); - bool hr = + auto send_res = Sharder::Instance().GetCcStreamSender()->SendMessageToNode( node_id, send_msg); - if (hr) + if (send_res.sent || send_res.queued_for_retry) { reply_map_.try_emplace(node_id, false); } diff --git a/tx_service/src/fault/fault_inject.cpp b/tx_service/src/fault/fault_inject.cpp index d713a284..fd8b39db 100644 --- a/tx_service/src/fault/fault_inject.cpp +++ b/tx_service/src/fault/fault_inject.cpp @@ -200,7 +200,8 @@ void FaultInject::TriggerAction(FaultEntry *entry) fi_req->set_fault_name(fault_name); fi_req->set_fault_paras(fault_paras); - b = ss->SendMessageToNode(dest_node_id, send_msg); + auto send_res = ss->SendMessageToNode(dest_node_id, send_msg); + b = (send_res.sent || send_res.queued_for_retry); } // If CcStreamSender == nullptr or failed to send, send to local diff --git a/tx_service/src/remote/cc_stream_sender.cpp b/tx_service/src/remote/cc_stream_sender.cpp index 0a24c43e..dd1b6988 100644 --- a/tx_service/src/remote/cc_stream_sender.cpp +++ b/tx_service/src/remote/cc_stream_sender.cpp @@ -154,14 +154,11 @@ bool CcStreamSender::UpdateStreamIP(uint32_t node_id, * @param msg * @param res * @param resend - * @return true - * @return false */ -bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, - const CcMessage &msg, - CcHandlerResultBase *res, - bool resend, - bool *need_reconnect) +SendMessageResult CcStreamSender::SendMessageToNode(uint32_t dest_node_id, + const CcMessage &msg, + CcHandlerResultBase *res, + bool resend) { TX_TRACE_ACTION_WITH_CONTEXT( this, @@ -186,18 +183,13 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, LOG(ERROR) << "Trying to connect to an unknown remote node. Node Id: " << dest_node_id; - return false; + return SendMessageResult::Failed(); } std::atomic &stream_version = std::get<1>(stream_it->second); int64_t stream_ver = stream_version.load(std::memory_order_acquire); if (stream_ver == -1) { - if (need_reconnect != nullptr) - { - *need_reconnect = true; - } - // resend the message if stream is connecting std::lock_guard lk(to_connect_mux_); auto resend_message_list = resend_message_list_.try_emplace( dest_node_id, moodycamel::ConcurrentQueue()); @@ -208,7 +200,7 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, // resend messages. to_connect_flag_ = true; to_connect_cv_.notify_one(); - return true; + return SendMessageResult::Queued(true); } brpc::StreamId stream_id = std::get<0>(stream_it->second); @@ -259,13 +251,14 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, } } - return true; + return SendMessageResult::Queued(false); } else { // for resend message, we have already reconnect the stream, if it // still failed to send the message, it possibly means that the // remote node is dead. We should skip resend the message again. + // TODO: discard the message? needs thorough thinking. if (resend) { // SendMessage error return -1 to indicate the request needs @@ -290,7 +283,7 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, } } - return false; + return SendMessageResult::Failed(); } else { @@ -318,20 +311,16 @@ bool CcStreamSender::SendMessageToNode(uint32_t dest_node_id, to_connect_flag_ = true; to_connect_cv_.notify_one(); - return true; + return SendMessageResult::Queued(true); } } } - else - { - return true; - } + + return SendMessageResult::Sent(); } -bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, - const ScanSliceResponse &msg, - bool resend, - bool *need_reconnect) +SendMessageResult CcStreamSender::SendScanRespToNode( + uint32_t dest_node_id, const ScanSliceResponse &msg, bool resend) { TX_TRACE_ACTION_WITH_CONTEXT( this, @@ -351,17 +340,13 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, { LOG(ERROR) << "Trying to connect to an unknown remote node. Node Id: " << dest_node_id; - return false; + return SendMessageResult::Failed(); } std::atomic &stream_version = std::get<1>(stream_it->second); int64_t stream_ver = stream_version.load(std::memory_order_acquire); if (stream_ver == -1) { - if (need_reconnect != nullptr) - { - *need_reconnect = true; - } // resend the message if stream is connecting std::lock_guard lk(to_connect_mux_); DLOG(INFO) << "CC stream is connecting, buffer the message for resend"; @@ -376,7 +361,7 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, // resend messages. to_connect_flag_ = true; to_connect_cv_.notify_one(); - return true; + return SendMessageResult::Queued(true); } brpc::StreamId &stream_id = std::get<0>(stream_it->second); @@ -389,7 +374,6 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, brpc::StreamWrite(stream_id, iobuf, &stream_write_options_); if (error_code != 0) - { if (error_code == EAGAIN) { @@ -427,16 +411,17 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, } } - return true; + return SendMessageResult::Queued(false); } else { // for resend message, we have already reconnect the stream, if it // still failed to send the message, it possibly means that the // remote node is dead. We should skip resend the message again. + // TODO: discard the message? needs thorough thinking. if (resend) { - return false; + return SendMessageResult::Failed(); } else { @@ -465,14 +450,11 @@ bool CcStreamSender::SendScanRespToNode(uint32_t dest_node_id, to_connect_flag_ = true; to_connect_cv_.notify_one(); - return true; + return SendMessageResult::Queued(true); } } } - else - { - return true; - } + return SendMessageResult::Sent(); } bool CcStreamSender::SendStandbyMessageToNode(uint32_t dest_node_id, @@ -500,7 +482,8 @@ bool CcStreamSender::SendStandbyMessageToNode(uint32_t dest_node_id, butil::IOBufAsZeroCopyOutputStream wrapper(&iobuf); msg.SerializeToZeroCopyStream(&wrapper); - int error_code = brpc::StreamWrite(stream_id, iobuf, &stream_write_options_); + int error_code = + brpc::StreamWrite(stream_id, iobuf, &stream_write_options_); if (error_code != 0) { DLOG(INFO) << "SendStandbyMessageToNode failed, node: " << dest_node_id @@ -520,13 +503,11 @@ bool CcStreamSender::SendStandbyMessageToNode(uint32_t dest_node_id, * @param msg * @param res * @param resend - * @return true - * @return false */ -bool CcStreamSender::SendMessageToNg(uint32_t node_group_id, - const CcMessage &msg, - CcHandlerResultBase *res, - bool resend) +SendMessageResult CcStreamSender::SendMessageToNg(uint32_t node_group_id, + const CcMessage &msg, + CcHandlerResultBase *res, + bool resend) { uint32_t dest_node_id = Sharder::Instance().LeaderNodeId(node_group_id); return SendMessageToNode(dest_node_id, msg, res, resend); @@ -708,10 +689,9 @@ void CcStreamSender::ResendMessageToNode() for (size_t idx = 0; idx < msg_cnt; ++idx) { - if (SendMessageToNode(nid, - messages[idx]->msg_, - messages[idx]->res_, - false)) + auto send_result = SendMessageToNode( + nid, messages[idx]->msg_, messages[idx]->res_, false); + if (send_result.sent || send_result.queued_for_retry) { send_cnt += 1; } @@ -757,7 +737,9 @@ void CcStreamSender::ResendMessageToNode() for (size_t idx = 0; idx < msg_cnt; ++idx) { - if (SendScanRespToNode(nid, messages[idx]->msg_, false)) + auto send_result = + SendScanRespToNode(nid, messages[idx]->msg_, false); + if (send_result.sent || send_result.queued_for_retry) { send_cnt += 1; } @@ -839,16 +821,14 @@ void CcStreamSender::ConnectStreams() lk.unlock(); for (size_t i = 0; i < msg_cnt; ++i) { - SendMessageToNode(nid, - messages[i]->msg_, - messages[i]->res_, - true, - &need_reconnect); - if (need_reconnect) + auto send_result = SendMessageToNode( + nid, messages[i]->msg_, messages[i]->res_, true); + if (send_result.need_reconnect) { // re-enqueue messages from i+1 to msg_cnt-1 // (message at i was already enqueued in // SendMessageToNode) + assert(send_result.queued_for_retry); for (size_t left_idx = i + 1; left_idx < msg_cnt; ++left_idx) { @@ -858,6 +838,7 @@ void CcStreamSender::ConnectStreams() std::move(messages[left_idx])); } } + need_reconnect = true; break; } } @@ -901,13 +882,10 @@ void CcStreamSender::ConnectStreams() lk.unlock(); for (size_t i = 0; i < msg_cnt; ++i) { - SendScanRespToNode( - nid, messages[i]->msg_, true, &need_reconnect); - if (need_reconnect) + auto send_result = + SendScanRespToNode(nid, messages[i]->msg_, true); + if (send_result.need_reconnect) { - // re-enqueue messages from i+1 to msg_cnt-1 - // (message at i was already enqueued in - // SendMessageToNode) for (size_t left_idx = i + 1; left_idx < msg_cnt; ++left_idx) { @@ -917,6 +895,7 @@ void CcStreamSender::ConnectStreams() std::move(messages[left_idx])); } } + need_reconnect = true; break; } } diff --git a/tx_service/src/remote/remote_cc_handler.cpp b/tx_service/src/remote/remote_cc_handler.cpp index 96e8569f..848ae8f7 100644 --- a/tx_service/src/remote/remote_cc_handler.cpp +++ b/tx_service/src/remote/remote_cc_handler.cpp @@ -865,8 +865,8 @@ void txservice::remote::RemoteCcHandler::BroadcastStatistics( broadcast_stat_req->set_schema_version(schema_ts); broadcast_stat_req->mutable_node_group_sample_pool()->CopyFrom(sample_pool); - bool send = stream_sender_.SendMessageToNg(ng_id, send_msg, &hres); - if (send) + auto send_res = stream_sender_.SendMessageToNg(ng_id, send_msg, &hres); + if (send_res.sent || send_res.queued_for_retry) { hres.SetFinished(); // Don't wait for remote result. } From 0ef3d94bee18d1ec78642062f5ea6f5ea408e549 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 10 Dec 2025 20:12:30 +0800 Subject: [PATCH 3/7] add comment and format --- tx_service/include/remote/cc_stream_sender.h | 3 +-- tx_service/src/remote/cc_stream_sender.cpp | 5 +++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tx_service/include/remote/cc_stream_sender.h b/tx_service/include/remote/cc_stream_sender.h index a1c93956..e38a24f8 100644 --- a/tx_service/include/remote/cc_stream_sender.h +++ b/tx_service/include/remote/cc_stream_sender.h @@ -112,8 +112,7 @@ class CcStreamSender SendMessageResult SendScanRespToNode(uint32_t dest_node_id, const ScanSliceResponse &msg, bool resend = false); - bool SendStandbyMessageToNode(uint32_t dest_node_id, - const CcMessage &msg); + bool SendStandbyMessageToNode(uint32_t dest_node_id, const CcMessage &msg); void UpdateRemoteNodes( const std::unordered_map &nodes_configs); diff --git a/tx_service/src/remote/cc_stream_sender.cpp b/tx_service/src/remote/cc_stream_sender.cpp index dd1b6988..cef5c9dc 100644 --- a/tx_service/src/remote/cc_stream_sender.cpp +++ b/tx_service/src/remote/cc_stream_sender.cpp @@ -190,6 +190,7 @@ SendMessageResult CcStreamSender::SendMessageToNode(uint32_t dest_node_id, int64_t stream_ver = stream_version.load(std::memory_order_acquire); if (stream_ver == -1) { + // resend the message if stream is connecting std::lock_guard lk(to_connect_mux_); auto resend_message_list = resend_message_list_.try_emplace( dest_node_id, moodycamel::ConcurrentQueue()); @@ -886,6 +887,10 @@ void CcStreamSender::ConnectStreams() SendScanRespToNode(nid, messages[i]->msg_, true); if (send_result.need_reconnect) { + // re-enqueue messages from i+1 to msg_cnt-1 + // (message at i was already enqueued in + // SendMessageToNode) + assert(send_result.queued_for_retry); for (size_t left_idx = i + 1; left_idx < msg_cnt; ++left_idx) { From 14ee4eb3afd8dd362efcd62c60f4ae490df801bc Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 10 Dec 2025 20:14:55 +0800 Subject: [PATCH 4/7] notify ConnectStreams thread in SendStandbyMessageToNode --- tx_service/src/remote/cc_stream_sender.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tx_service/src/remote/cc_stream_sender.cpp b/tx_service/src/remote/cc_stream_sender.cpp index cef5c9dc..fdcdf7f7 100644 --- a/tx_service/src/remote/cc_stream_sender.cpp +++ b/tx_service/src/remote/cc_stream_sender.cpp @@ -473,6 +473,10 @@ bool CcStreamSender::SendStandbyMessageToNode(uint32_t dest_node_id, std::atomic &stream_version = std::get<1>(stream_it->second); if (stream_version.load(std::memory_order_acquire) == -1) { + std::lock_guard lk(to_connect_mux_); + // wake up connector thread to reconnect streams + to_connect_flag_ = true; + to_connect_cv_.notify_one(); return false; } From 180d34de29d45cef9f5a3834635c69c5b9f2941c Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 11 Dec 2025 12:38:41 +0800 Subject: [PATCH 5/7] add comments --- tx_service/include/remote/cc_stream_sender.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tx_service/include/remote/cc_stream_sender.h b/tx_service/include/remote/cc_stream_sender.h index e38a24f8..5bb2f1d3 100644 --- a/tx_service/include/remote/cc_stream_sender.h +++ b/tx_service/include/remote/cc_stream_sender.h @@ -112,7 +112,20 @@ class CcStreamSender SendMessageResult SendScanRespToNode(uint32_t dest_node_id, const ScanSliceResponse &msg, bool resend = false); + + /** + * @brief Send standby message with best-effort semantics (no + * retry/queuing). + * + * Unlike SendMessageToNode, this does not queue failed messages for retry. + * Use for standby replication which has a dedicated resend logic. The + * caller only cares whether the message is successfully sent into stream or + * not. + * + * @return true if sent immediately, false otherwise + */ bool SendStandbyMessageToNode(uint32_t dest_node_id, const CcMessage &msg); + void UpdateRemoteNodes( const std::unordered_map &nodes_configs); From 17f2ea6630a06bd16f1815776df1e976a4adb612 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Fri, 12 Dec 2025 12:07:01 +0800 Subject: [PATCH 6/7] reconnect stream when StreamWrite fails in SendStandbyMessageToNode --- tx_service/src/remote/cc_stream_sender.cpp | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/tx_service/src/remote/cc_stream_sender.cpp b/tx_service/src/remote/cc_stream_sender.cpp index fdcdf7f7..c28237c9 100644 --- a/tx_service/src/remote/cc_stream_sender.cpp +++ b/tx_service/src/remote/cc_stream_sender.cpp @@ -471,7 +471,8 @@ bool CcStreamSender::SendStandbyMessageToNode(uint32_t dest_node_id, } std::atomic &stream_version = std::get<1>(stream_it->second); - if (stream_version.load(std::memory_order_acquire) == -1) + int64_t stream_ver = stream_version.load(std::memory_order_acquire); + if (stream_ver == -1) { std::lock_guard lk(to_connect_mux_); // wake up connector thread to reconnect streams @@ -493,6 +494,21 @@ bool CcStreamSender::SendStandbyMessageToNode(uint32_t dest_node_id, { DLOG(INFO) << "SendStandbyMessageToNode failed, node: " << dest_node_id << ", err: " << error_code; + // For non-EAGAIN errors, trigger stream reconnection + if (error_code != EAGAIN) + { + std::lock_guard lk(to_connect_mux_); + // Set stream version to -1 to mark it as broken and trigger + // reconnection + if (stream_version.compare_exchange_strong( + stream_ver, -1, std::memory_order_release)) + { + to_connect_regular_streams_.try_emplace(dest_node_id, + stream_ver + 1); + } + to_connect_flag_ = true; + to_connect_cv_.notify_one(); + } return false; } From 50a63e6fd7b2111efb764e1ac8c79d3facb07d60 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Fri, 12 Dec 2025 12:16:49 +0800 Subject: [PATCH 7/7] format --- tx_service/src/fault/fault_inject.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tx_service/src/fault/fault_inject.cpp b/tx_service/src/fault/fault_inject.cpp index fd8b39db..3504cbcd 100644 --- a/tx_service/src/fault/fault_inject.cpp +++ b/tx_service/src/fault/fault_inject.cpp @@ -200,7 +200,8 @@ void FaultInject::TriggerAction(FaultEntry *entry) fi_req->set_fault_name(fault_name); fi_req->set_fault_paras(fault_paras); - auto send_res = ss->SendMessageToNode(dest_node_id, send_msg); + auto send_res = + ss->SendMessageToNode(dest_node_id, send_msg); b = (send_res.sent || send_res.queued_for_retry); }