From cf5902be51a9ea789c19df5c9dcb91ab887d2355 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 4 Feb 2026 16:40:34 +0800 Subject: [PATCH 1/2] notify standby to resubscribe when forward entry missing --- tx_service/include/cc/cc_shard.h | 1 + tx_service/include/cc/object_cc_map.h | 13 ++- tx_service/src/cc/cc_shard.cpp | 116 +++++++++++++++----------- 3 files changed, 79 insertions(+), 51 deletions(-) diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ee977e68..20023351 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -1061,6 +1061,7 @@ class CcShard // Try to send previous failed message to standby nodes. bool ResendFailedForwardMessages(); + void NotifyStandbyOutOfSync(uint32_t node_id); void CollectStandbyMetrics(); diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index b07c7408..fca0f59b 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1378,9 +1378,20 @@ class ObjectCcMap : public TemplateCcMap bool s_obj_exist = (cce->PayloadStatus() == RecordStatus::Normal); StandbyForwardEntry *forward_entry = nullptr; - if (!shard_->GetSubscribedStandbys().empty()) + auto subscribed_standbys = shard_->GetSubscribedStandbys(); + if (!subscribed_standbys.empty()) { forward_entry = cce->ForwardEntry(); + if (forward_entry == nullptr) + { + LOG(ERROR) << "Subscribed standbys exist, but forward_entry is " + "null. Data loss may occur. Notifying standbys " + "to resubscribe."; + for (uint32_t node_id : subscribed_standbys) + { + shard_->NotifyStandbyOutOfSync(node_id); + } + } } if (commit_ts > 0) { diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index 04b7ef18..a1b62ad6 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -3078,56 +3078,7 @@ bool CcShard::ResendFailedForwardMessages() } else { - // Message not found in map - it has been evicted - // This message is already lost and this standby needs to - // resubscribe to the primary node again. Notify standby that - // it has already fallen behind. Use the latest seq id so that - // standby knows which epoch this out of sync msg belongs to. - remote::CcMessage cc_msg; - cc_msg.set_type( - remote::CcMessage_MessageType:: - CcMessage_MessageType_KeyObjectStandbyForwardRequest); - auto req = cc_msg.mutable_key_obj_standby_forward_req(); - req->set_forward_seq_grp(core_id_); - req->set_forward_seq_id(next_forward_sequence_id_ - 1); - req->set_primary_leader_term(Sharder::Instance().LeaderTerm( - Sharder::Instance().NativeNodeGroup())); - req->set_out_of_sync(true); - stream_sender_->SendMessageToNode(node_id, cc_msg); - - seq_id = UINT64_MAX; - // Remove heartbeat target node - local_shards_.RemoveHeartbeatTargetNode(node_id, - seq_id_and_term.second); - - for (size_t core_idx = 0; core_idx < core_cnt_; ++core_idx) - { - if (core_idx != core_id_) - { - DispatchTask( - core_idx, - [node_id = node_id, - unsubscribe_standby_term = - seq_id_and_term.second](CcShard &ccs) -> bool - { - auto subscribe_node_iter = - ccs.subscribed_standby_nodes_.find(node_id); - if (subscribe_node_iter != - ccs.subscribed_standby_nodes_.end()) - { - if (subscribe_node_iter->second.second <= - unsubscribe_standby_term) - { - // erase ? - subscribe_node_iter->second.first = - UINT64_MAX; - } - } - - return true; - }); - } - } + NotifyStandbyOutOfSync(node_id); break; } } @@ -3144,6 +3095,71 @@ bool CcShard::ResendFailedForwardMessages() return all_msgs_sent; } +void CcShard::NotifyStandbyOutOfSync(uint32_t node_id) +{ + auto seq_node_iter = subscribed_standby_nodes_.find(node_id); + if (seq_node_iter == subscribed_standby_nodes_.end()) + { + return; + } + + if (!stream_sender_) + { + stream_sender_ = Sharder::Instance().GetCcStreamSender(); + } + if (!stream_sender_) + { + LOG(WARNING) << "Failed to notify standby " << node_id + << " of out of sync state because stream sender is null"; + return; + } + + // Message not found in map - it has been evicted. Notify standby that it + // has already fallen behind so it can resubscribe to the primary node. + remote::CcMessage cc_msg; + cc_msg.set_type(remote::CcMessage_MessageType:: + CcMessage_MessageType_KeyObjectStandbyForwardRequest); + auto req = cc_msg.mutable_key_obj_standby_forward_req(); + req->set_forward_seq_grp(core_id_); + req->set_forward_seq_id(next_forward_sequence_id_ - 1); + req->set_primary_leader_term( + Sharder::Instance().LeaderTerm(Sharder::Instance().NativeNodeGroup())); + req->set_out_of_sync(true); + stream_sender_->SendMessageToNode(node_id, cc_msg); + + auto &seq_id_and_term = seq_node_iter->second; + seq_id_and_term.first = UINT64_MAX; + // Remove heartbeat target node + local_shards_.RemoveHeartbeatTargetNode(node_id, seq_id_and_term.second); + + int64_t unsubscribe_standby_term = seq_id_and_term.second; + for (size_t core_idx = 0; core_idx < core_cnt_; ++core_idx) + { + if (core_idx != core_id_) + { + DispatchTask( + core_idx, + [node_id, unsubscribe_standby_term](CcShard &ccs) + -> bool + { + auto subscribe_node_iter = + ccs.subscribed_standby_nodes_.find(node_id); + if (subscribe_node_iter != + ccs.subscribed_standby_nodes_.end()) + { + if (subscribe_node_iter->second.second <= + unsubscribe_standby_term) + { + subscribe_node_iter->second.first = UINT64_MAX; + } + } + + return true; + }); + } + } +} + void CcShard::CollectStandbyMetrics() { assert(metrics::enable_standby_metrics); From ae93f8a691a3d8b86ac76996a5dd6c37c6eab81c Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 4 Feb 2026 19:37:17 +0800 Subject: [PATCH 2/2] format --- tx_service/src/cc/cc_shard.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index a1b62ad6..b21df686 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -3139,8 +3139,7 @@ void CcShard::NotifyStandbyOutOfSync(uint32_t node_id) { DispatchTask( core_idx, - [node_id, unsubscribe_standby_term](CcShard &ccs) - -> bool + [node_id, unsubscribe_standby_term](CcShard &ccs) -> bool { auto subscribe_node_iter = ccs.subscribed_standby_nodes_.find(node_id);