diff --git a/tx_service/include/fault/log_replay_service.h b/tx_service/include/fault/log_replay_service.h
index e9fa2fc2..c5cc864f 100644
--- a/tx_service/include/fault/log_replay_service.h
+++ b/tx_service/include/fault/log_replay_service.h
@@ -161,6 +161,13 @@ class RecoveryService : public brpc::StreamInputHandler,
     void on_closed(brpc::StreamId id) override;
 
 private:
+    struct CandidateReplayWatch
+    {
+        int64_t term{-1};
+        uint64_t first_seen_ts{0};
+        uint64_t last_replay_ts{0};
+    };
+
     /**
      * @brief ReplayNow() check whether the replay request can be processed
      * immediately. When replay error happens, we will put the replay request
@@ -228,10 +235,15 @@
     std::deque replay_log_queue_;
     std::deque delayed_replay_queue_;
     std::deque recover_tx_queue_;
+    std::unordered_map candidate_replay_watch_;
     std::mutex queue_mux_;
     std::condition_variable queue_cv_;
     std::atomic finish_;
 
+    // If a candidate leader has no replay stream for this duration, trigger
+    // ReplayLog; keep retrying at this interval until a stream appears.
+    static constexpr uint64_t kReplayCheckIntervalUs = 5ULL * 1000 * 1000;
+
     // ip and port of log replay server of this node
     std::string ip_;
     uint16_t port_;
diff --git a/tx_service/src/fault/log_replay_service.cpp b/tx_service/src/fault/log_replay_service.cpp
index 739caa70..2c244d91 100644
--- a/tx_service/src/fault/log_replay_service.cpp
+++ b/tx_service/src/fault/log_replay_service.cpp
@@ -87,7 +87,7 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards,
         std::unique_lock lk(queue_mux_);
         bool timed_out = !queue_cv_.wait_for(
             lk,
-            std::chrono::seconds(10),
+            std::chrono::seconds(5),
             [this] {
                 return !replay_log_queue_.empty() ||
@@ -98,42 +98,75 @@
         // Only check for orphaned ng recovery on timeout, not on notify
         if (timed_out)
         {
-            // Check every 10s if there are any orphaned ng
-            // recovery.
+            // Periodically check candidate leaders that still do not
+            // have replay stream connections.
             lk.unlock();
+            uint64_t now_ts = LocalCcShards::ClockTs();
             auto ngs = Sharder::Instance().AllNodeGroups();
             for (auto ng_id : *ngs)
             {
                 int64_t candidate_term =
                     Sharder::Instance().CandidateLeaderTerm(ng_id);
-                if (candidate_term != -1)
+                if (candidate_term < 0)
                 {
-                    // ng should be replaying
-                    bool need_replay = true;
+                    candidate_replay_watch_.erase(ng_id);
+                    continue;
+                }
+
+                CandidateReplayWatch &watch =
+                    candidate_replay_watch_[ng_id];
+                if (watch.term != candidate_term)
+                {
+                    watch.term = candidate_term;
+                    watch.first_seen_ts = now_ts;
+                    watch.last_replay_ts = 0;
+                }
+
+                bool has_replay_connection = false;
+                {
+                    std::unique_lock inbound_lk(
+                        inbound_mux_);
+                    for (const auto &entry : inbound_connections_)
                     {
-                        std::unique_lock inbound_lk(
-                            inbound_mux_);
-                        for (const auto &entry : inbound_connections_)
+                        if (entry.second.cc_ng_id_ == ng_id &&
+                            entry.second.cc_ng_term_ ==
+                                candidate_term &&
+                            entry.second.recovering_)
                         {
-                            if (entry.second.cc_ng_id_ == ng_id)
-                            {
-                                need_replay = false;
-                                break;
-                            }
+                            has_replay_connection = true;
+                            break;
                         }
                     }
-                    if (need_replay)
-                    {
-                        // ng is not recovering, send replay log
-                        // request
-                        LOG(INFO)
-                            << "Requesting log replay for ng: " << ng_id
-                            << " at term: " << candidate_term
-                            << " as there is no recovery "
-                               "connection";
-                        ReplayLog(ng_id, candidate_term, -1, 0, false);
-                    }
                 }
+                if (has_replay_connection)
+                {
+                    watch.first_seen_ts = 0;
+                    continue;
+                }
+
+                if (watch.first_seen_ts == 0)
+                {
+                    watch.first_seen_ts = now_ts;
+                }
+
+                if (now_ts - watch.first_seen_ts <
+                    kReplayCheckIntervalUs)
+                {
+                    continue;
+                }
+
+                if (watch.last_replay_ts != 0 &&
+                    now_ts - watch.last_replay_ts <
+                        kReplayCheckIntervalUs)
+                {
+                    continue;
+                }
+
+                LOG(INFO) << "Requesting log replay for ng: " << ng_id
+                          << " at term: " << candidate_term
+                          << " as there is no recovery connection";
+                ReplayLog(ng_id, candidate_term, -1, 0, false);
+                watch.last_replay_ts = now_ts;
             }
             lk.lock();
         }
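
The hunk above is, in essence, a small per-node-group watchdog: remember when a
candidate leader was first seen without a replay stream, fire ReplayLog only
after a full quiet interval has elapsed, and throttle retries to that same
interval. The standalone sketch below restates that logic outside
RecoveryService; NodeGroupId, ReplayWatchdog, Observe, now_us, has_stream, and
replay_log are illustrative names and signatures, not part of the tx_service
API.

#include <cstdint>
#include <functional>
#include <unordered_map>

using NodeGroupId = uint32_t;  // assumed width of a node-group id

class ReplayWatchdog
{
public:
    static constexpr uint64_t kCheckIntervalUs = 5ULL * 1000 * 1000;

    // Called once per polling pass for each node group.
    //   term < 0   -> the node is no longer a candidate leader; forget it.
    //   has_stream -> a replay stream for this term is already connected.
    //   replay_log -> action to run once the group has been quiet too long.
    void Observe(NodeGroupId ng_id,
                 int64_t term,
                 bool has_stream,
                 uint64_t now_us,
                 const std::function<void()> &replay_log)
    {
        if (term < 0)
        {
            watches_.erase(ng_id);
            return;
        }

        Watch &w = watches_[ng_id];
        if (w.term != term)
        {
            // A new candidacy restarts the quiet-period clock.
            w = Watch{term, now_us, 0};
        }

        if (has_stream)
        {
            // The stream is up; re-arm so a later drop starts a fresh wait.
            w.first_seen_us = 0;
            return;
        }

        if (w.first_seen_us == 0)
        {
            w.first_seen_us = now_us;
        }

        // Fire only after one full quiet interval, and at most once per
        // interval thereafter.
        if (now_us - w.first_seen_us < kCheckIntervalUs ||
            (w.last_fire_us != 0 &&
             now_us - w.last_fire_us < kCheckIntervalUs))
        {
            return;
        }

        replay_log();
        w.last_fire_us = now_us;
    }

private:
    struct Watch
    {
        int64_t term{-1};
        uint64_t first_seen_us{0};
        uint64_t last_fire_us{0};
    };

    std::unordered_map<NodeGroupId, Watch> watches_;
};

In this framing, the 5-second wait_for timeout in the constructor is only the
polling cadence; kReplayCheckIntervalUs is the real debounce, so a replay is
requested no earlier than one full interval after a candidacy is first
observed without a stream.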