Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions tx_service/include/fault/log_replay_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ class RecoveryService : public brpc::StreamInputHandler,
void on_closed(brpc::StreamId id) override;

private:
// Per-node-group bookkeeping used by the periodic check that looks for
// candidate leaders which still have no replay stream connection. Entries
// are stored in candidate_replay_watch_, keyed by node group id.
struct CandidateReplayWatch
{
    // Candidate leader term this entry tracks. Stays -1 until a candidate
    // term is recorded; when the observed candidate term differs, the other
    // fields are reset.
    int64_t term = -1;

    // Clock timestamp at which the node group was first observed without a
    // replay connection for `term`. Reset to 0 whenever a matching replay
    // connection is found. Compared against kReplayCheckIntervalUs, so
    // presumably in microseconds -- TODO confirm against
    // LocalCcShards::ClockTs().
    uint64_t first_seen_ts = 0;

    // Clock timestamp of the most recent ReplayLog request issued for
    // `term`; 0 means no request has been issued yet for this term.
    uint64_t last_replay_ts = 0;
};

/**
* @brief ReplayNow() check whether the replay request can be processed
* immediately. When replay error happens, we will put the replay request
Expand Down Expand Up @@ -228,10 +235,15 @@ class RecoveryService : public brpc::StreamInputHandler,
std::deque<ReplayLogTask> replay_log_queue_;
std::deque<ReplayLogTask> delayed_replay_queue_;
std::deque<RecoverTxTask> recover_tx_queue_;
std::unordered_map<uint32_t, CandidateReplayWatch> candidate_replay_watch_;
std::mutex queue_mux_;
std::condition_variable queue_cv_;
std::atomic<bool> finish_;

// If a candidate leader has had no replay stream for this duration, trigger
// ReplayLog and keep retrying at the same interval until a stream appears.
static constexpr uint64_t kReplayCheckIntervalUs = 5ULL * 1000 * 1000;

// ip and port of log replay server of this node
std::string ip_;
uint16_t port_;
Expand Down
83 changes: 58 additions & 25 deletions tx_service/src/fault/log_replay_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards,
std::unique_lock<std::mutex> lk(queue_mux_);
bool timed_out = !queue_cv_.wait_for(
lk,
std::chrono::seconds(10),
std::chrono::seconds(5),
[this]
{
return !replay_log_queue_.empty() ||
Expand All @@ -98,42 +98,75 @@ RecoveryService::RecoveryService(LocalCcShards &local_shards,
// Only check for orphaned ng recovery on timeout, not on notify
if (timed_out)
{
// Check every 10s if there are any orphaned ng
// recovery.
// Periodically check candidate leaders that still do not
// have replay stream connections.
lk.unlock();
uint64_t now_ts = LocalCcShards::ClockTs();
auto ngs = Sharder::Instance().AllNodeGroups();
for (auto ng_id : *ngs)
{
int64_t candidate_term =
Sharder::Instance().CandidateLeaderTerm(ng_id);
if (candidate_term != -1)
if (candidate_term < 0)
{
// ng should be replaying
bool need_replay = true;
candidate_replay_watch_.erase(ng_id);
continue;
}

CandidateReplayWatch &watch =
candidate_replay_watch_[ng_id];
if (watch.term != candidate_term)
{
watch.term = candidate_term;
watch.first_seen_ts = now_ts;
watch.last_replay_ts = 0;
}

bool has_replay_connection = false;
{
std::unique_lock<bthread::Mutex> inbound_lk(
inbound_mux_);
for (const auto &entry : inbound_connections_)
{
std::unique_lock<bthread::Mutex> inbound_lk(
inbound_mux_);
for (const auto &entry : inbound_connections_)
if (entry.second.cc_ng_id_ == ng_id &&
entry.second.cc_ng_term_ ==
candidate_term &&
entry.second.recovering_)
{
if (entry.second.cc_ng_id_ == ng_id)
{
need_replay = false;
break;
}
has_replay_connection = true;
break;
}
}
if (need_replay)
{
// ng is not recovering, send replay log
// request
LOG(INFO)
<< "Requesting log replay for ng: " << ng_id
<< " at term: " << candidate_term
<< " as there is no recovery "
"connection";
ReplayLog(ng_id, candidate_term, -1, 0, false);
}
}
if (has_replay_connection)
{
watch.first_seen_ts = 0;
continue;
}

if (watch.first_seen_ts == 0)
{
watch.first_seen_ts = now_ts;
}

if (now_ts - watch.first_seen_ts <
kReplayCheckIntervalUs)
{
continue;
}

if (watch.last_replay_ts != 0 &&
now_ts - watch.last_replay_ts <
kReplayCheckIntervalUs)
{
continue;
}

LOG(INFO) << "Requesting log replay for ng: " << ng_id
<< " at term: " << candidate_term
<< " as there is no recovery connection";
ReplayLog(ng_id, candidate_term, -1, 0, false);
watch.last_replay_ts = now_ts;
}
lk.lock();
}
Expand Down
Loading