From e2419a07fa91ac17d576bd81876e12c23554c47f Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 16 Mar 2026 18:31:24 +0800 Subject: [PATCH 1/3] tx_service: gate replay fallback on candidate no-connection timeout --- tx_service/include/fault/log_replay_service.h | 12 +++ tx_service/src/fault/log_replay_service.cpp | 79 +++++++++++++------ 2 files changed, 66 insertions(+), 25 deletions(-) diff --git a/tx_service/include/fault/log_replay_service.h b/tx_service/include/fault/log_replay_service.h index e9fa2fc2..c5cc864f 100644 --- a/tx_service/include/fault/log_replay_service.h +++ b/tx_service/include/fault/log_replay_service.h @@ -161,6 +161,13 @@ class RecoveryService : public brpc::StreamInputHandler, void on_closed(brpc::StreamId id) override; private: + struct CandidateReplayWatch + { + int64_t term{-1}; + uint64_t first_seen_ts{0}; + uint64_t last_replay_ts{0}; + }; + /** * @brief ReplayNow() check whether the replay request can be processed * immediately. When replay error happens, we will put the replay request @@ -228,10 +235,15 @@ class RecoveryService : public brpc::StreamInputHandler, std::deque replay_log_queue_; std::deque delayed_replay_queue_; std::deque recover_tx_queue_; + std::unordered_map candidate_replay_watch_; std::mutex queue_mux_; std::condition_variable queue_cv_; std::atomic finish_; + // Candidate leader has no replay stream for this duration, then trigger + // ReplayLog and keep retrying with the same interval until stream appears. + static constexpr uint64_t kReplayCheckIntervalUs = 5ULL * 1000 * 1000; + // ip and port of log replay server of this node std::string ip_; uint16_t port_; diff --git a/tx_service/src/fault/log_replay_service.cpp b/tx_service/src/fault/log_replay_service.cpp index 739caa70..6e800b91 100644 --- a/tx_service/src/fault/log_replay_service.cpp +++ b/tx_service/src/fault/log_replay_service.cpp @@ -87,7 +87,7 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards, std::unique_lock lk(queue_mux_); bool timed_out = !queue_cv_.wait_for( lk, - std::chrono::seconds(10), + std::chrono::seconds(5), [this] { return !replay_log_queue_.empty() || @@ -98,42 +98,71 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards, // Only check for orphaned ng recovery on timeout, not on notify if (timed_out) { - // Check every 10s if there are any orphaned ng - // recovery. + // Periodically check candidate leaders that still do not + // have replay stream connections. lk.unlock(); + uint64_t now_ts = LocalCcShards::ClockTs(); auto ngs = Sharder::Instance().AllNodeGroups(); for (auto ng_id : *ngs) { int64_t candidate_term = Sharder::Instance().CandidateLeaderTerm(ng_id); - if (candidate_term != -1) + if (candidate_term < 0) { - // ng should be replaying - bool need_replay = true; + candidate_replay_watch_.erase(ng_id); + continue; + } + + CandidateReplayWatch &watch = + candidate_replay_watch_[ng_id]; + if (watch.term != candidate_term) + { + watch.term = candidate_term; + watch.first_seen_ts = now_ts; + watch.last_replay_ts = 0; + } + else if (watch.first_seen_ts == 0) + { + watch.first_seen_ts = now_ts; + } + + bool has_replay_connection = false; + { + std::unique_lock inbound_lk( + inbound_mux_); + for (const auto &entry : inbound_connections_) { - std::unique_lock inbound_lk( - inbound_mux_); - for (const auto &entry : inbound_connections_) + if (entry.second.cc_ng_id_ == ng_id && + entry.second.cc_ng_term_ == candidate_term && + entry.second.recovering_) { - if (entry.second.cc_ng_id_ == ng_id) - { - need_replay = false; - break; - } + has_replay_connection = true; + break; } } - if (need_replay) - { - // ng is not recovering, send replay log - // request - LOG(INFO) - << "Requesting log replay for ng: " << ng_id - << " at term: " << candidate_term - << " as there is no recovery " - "connection"; - ReplayLog(ng_id, candidate_term, -1, 0, false); - } } + if (has_replay_connection) + { + continue; + } + + if (now_ts - watch.first_seen_ts < kReplayCheckIntervalUs) + { + continue; + } + + if (watch.last_replay_ts != 0 && + now_ts - watch.last_replay_ts < + kReplayCheckIntervalUs) + { + continue; + } + + LOG(INFO) << "Requesting log replay for ng: " << ng_id + << " at term: " << candidate_term + << " as there is no recovery connection"; + ReplayLog(ng_id, candidate_term, -1, 0, false); + watch.last_replay_ts = now_ts; } lk.lock(); } From 348b91f2b2130418041e01525e16c41ff3ea0051 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 16 Mar 2026 18:35:21 +0800 Subject: [PATCH 2/3] format --- tx_service/src/fault/log_replay_service.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tx_service/src/fault/log_replay_service.cpp b/tx_service/src/fault/log_replay_service.cpp index 6e800b91..64a49990 100644 --- a/tx_service/src/fault/log_replay_service.cpp +++ b/tx_service/src/fault/log_replay_service.cpp @@ -133,7 +133,8 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards, for (const auto &entry : inbound_connections_) { if (entry.second.cc_ng_id_ == ng_id && - entry.second.cc_ng_term_ == candidate_term && + entry.second.cc_ng_term_ == + candidate_term && entry.second.recovering_) { has_replay_connection = true; @@ -146,7 +147,8 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards, continue; } - if (now_ts - watch.first_seen_ts < kReplayCheckIntervalUs) + if (now_ts - watch.first_seen_ts < + kReplayCheckIntervalUs) { continue; } From 22203da4f7258311e7539f18ea2f3cd46cbebfda Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 17 Mar 2026 17:44:43 +0800 Subject: [PATCH 3/3] tx_service: reset no-connection timer when replay stream is present --- tx_service/src/fault/log_replay_service.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tx_service/src/fault/log_replay_service.cpp b/tx_service/src/fault/log_replay_service.cpp index 64a49990..2c244d91 100644 --- a/tx_service/src/fault/log_replay_service.cpp +++ b/tx_service/src/fault/log_replay_service.cpp @@ -121,10 +121,6 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards, watch.first_seen_ts = now_ts; watch.last_replay_ts = 0; } - else if (watch.first_seen_ts == 0) - { - watch.first_seen_ts = now_ts; - } bool has_replay_connection = false; { @@ -144,9 +140,15 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards, } if (has_replay_connection) { + watch.first_seen_ts = 0; continue; } + if (watch.first_seen_ts == 0) + { + watch.first_seen_ts = now_ts; + } + if (now_ts - watch.first_seen_ts < kReplayCheckIntervalUs) {