Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions tx_service/include/store/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "store/data_store_handler.h"
Expand Down Expand Up @@ -128,6 +129,11 @@ class SnapshotManager
*/
void EraseSubscriptionBarrierLocked(uint32_t standby_node_id,
int64_t standby_node_term);
bool IsSnapshotSyncCompletedLocked(uint32_t standby_node_id,
int64_t standby_node_term) const;
void MarkSnapshotSyncCompletedLocked(uint32_t standby_node_id,
int64_t standby_node_term);
void EraseSnapshotSyncCompletedByNodeLocked(uint32_t standby_node_id);

struct PendingSnapshotSyncTask
{
Expand Down Expand Up @@ -158,6 +164,9 @@ class SnapshotManager
// max ts)
std::unordered_map<uint32_t, std::unordered_map<int64_t, uint64_t>>
subscription_barrier_;
// standby node id -> completed standby terms
std::unordered_map<uint32_t, std::unordered_set<int64_t>>
completed_snapshot_terms_;
bool terminated_{false};

const std::string backup_path_;
Expand Down
135 changes: 129 additions & 6 deletions tx_service/src/store/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ bool SnapshotManager::OnSnapshotSyncRequested(
auto node_it = subscription_barrier_.find(req->standby_node_id());
if (node_it == subscription_barrier_.end())
{
if (IsSnapshotSyncCompletedLocked(req->standby_node_id(),
req->standby_node_term()))
{
DLOG(INFO) << "Ignore duplicate snapshot sync request for "
"completed standby node #"
<< req->standby_node_id()
<< ", standby term: " << req->standby_node_term();
return true;
}
LOG(WARNING) << "No subscription barrier found for standby node #"
<< req->standby_node_id()
<< ", standby term: " << req->standby_node_term();
Expand All @@ -111,6 +120,15 @@ bool SnapshotManager::OnSnapshotSyncRequested(
auto barrier_it = node_it->second.find(req->standby_node_term());
if (barrier_it == node_it->second.end())
{
if (IsSnapshotSyncCompletedLocked(req->standby_node_id(),
req->standby_node_term()))
{
DLOG(INFO) << "Ignore duplicate snapshot sync request for "
"completed standby node #"
<< req->standby_node_id()
<< ", standby term: " << req->standby_node_term();
return true;
}
LOG(WARNING) << "No barrier found for standby node #"
<< req->standby_node_id()
<< " at standby term: " << req->standby_node_term();
Expand Down Expand Up @@ -150,6 +168,31 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id,
uint64_t active_tx_max_ts)
{
std::unique_lock<std::mutex> lk(standby_sync_mux_);
auto completed_it = completed_snapshot_terms_.find(standby_node_id);
if (completed_it != completed_snapshot_terms_.end())
{
for (int64_t completed_term : completed_it->second)
{
if (completed_term > standby_node_term)
{
DLOG(INFO) << "Ignore stale subscription barrier registration "
"for standby node #"
<< standby_node_id << ", term " << standby_node_term
<< " because completed term " << completed_term
<< " is newer";
return;
}
}

if (completed_it->second.find(standby_node_term) !=
completed_it->second.end())
{
DLOG(INFO) << "Skip barrier registration for already completed "
"standby node #"
<< standby_node_id << ", term " << standby_node_term;
return;
}
}

// Ignore out-of-order old barrier registrations when a newer standby term
// is already known for this standby node.
Expand Down Expand Up @@ -191,6 +234,26 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id,
pending_req_.erase(pending_it);
}

if (completed_it != completed_snapshot_terms_.end())
{
auto term_it = completed_it->second.begin();
while (term_it != completed_it->second.end())
{
if (*term_it < standby_node_term)
{
term_it = completed_it->second.erase(term_it);
}
else
{
++term_it;
}
}
if (completed_it->second.empty())
{
completed_snapshot_terms_.erase(completed_it);
}
}

auto &node_barriers = subscription_barrier_[standby_node_id];

// Keep only current and newer terms for this node.
Expand Down Expand Up @@ -253,6 +316,7 @@ void SnapshotManager::EraseSubscriptionBarriersByNode(uint32_t standby_node_id)
std::unique_lock<std::mutex> lk(standby_sync_mux_);
pending_req_.erase(standby_node_id);
subscription_barrier_.erase(standby_node_id);
EraseSnapshotSyncCompletedByNodeLocked(standby_node_id);
}

void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id,
Expand All @@ -271,6 +335,43 @@ void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id,
}
}

bool SnapshotManager::IsSnapshotSyncCompletedLocked(
    uint32_t standby_node_id, int64_t standby_node_term) const
{
    // Caller must hold standby_sync_mux_ (Locked suffix convention).
    // True iff a snapshot sync for this exact (node, term) pair has already
    // been recorded as completed.
    const auto completed_it = completed_snapshot_terms_.find(standby_node_id);
    return completed_it != completed_snapshot_terms_.end() &&
           completed_it->second.count(standby_node_term) > 0;
}

void SnapshotManager::MarkSnapshotSyncCompletedLocked(uint32_t standby_node_id,
                                                      int64_t standby_node_term)
{
    // Caller must hold standby_sync_mux_ (Locked suffix convention).
    // Record this term as fully synced for the standby node, then drop any
    // recorded terms older than it so the set only keeps the newest state.
    auto &terms = completed_snapshot_terms_[standby_node_id];
    terms.insert(standby_node_term);

    for (auto term_it = terms.begin(); term_it != terms.end();)
    {
        if (*term_it >= standby_node_term)
        {
            ++term_it;
        }
        else
        {
            term_it = terms.erase(term_it);
        }
    }
}

void SnapshotManager::EraseSnapshotSyncCompletedByNodeLocked(
    uint32_t standby_node_id)
{
    // Caller must hold standby_sync_mux_ (Locked suffix convention).
    // Forget every completed-sync record kept for this standby node.
    auto node_it = completed_snapshot_terms_.find(standby_node_id);
    if (node_it != completed_snapshot_terms_.end())
    {
        completed_snapshot_terms_.erase(node_it);
    }
}

// If kvstore is enabled, we must flush data in-memory to kvstore firstly.
// For non-shared kvstore, also we create and send the snapshot to standby
// nodes.
Expand All @@ -297,6 +398,7 @@ void SnapshotManager::SyncWithStandby()
// clear barriers as well, all queued sync states are stale when this
// leader term is no longer valid.
subscription_barrier_.clear();
completed_snapshot_terms_.clear();
return;
}

Expand Down Expand Up @@ -456,6 +558,7 @@ void SnapshotManager::SyncWithStandby()
<< req.standby_node_term()
<< ", channel: " << (channel ? "valid" : "null");

bool notify_succ = false;
if (channel)
{
// needs retry if failed
Expand Down Expand Up @@ -484,19 +587,39 @@ void SnapshotManager::SyncWithStandby()
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
else
if (on_sync_resp.error())
{
break;
LOG(WARNING)
<< "OnSnapshotSynced to standby node #"
<< req.standby_node_id()
<< " returned error response at standby term "
<< req.standby_node_term();
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}

notify_succ = true;
break;
}
}
else
{
LOG(WARNING) << "OnSnapshotSynced channel is nullptr for "
<< "standby node #" << req.standby_node_id()
<< " at standby term " << req.standby_node_term();
}

if (!notify_succ)
{
// Keep pending/barrier so standby can retry with the same
// standby term.
continue;
}

            // The notification succeeded (notify_succ is true here), so it is
            // now safe to record completion and erase the matching pending
            // request; failed notifications were retried or skipped above so
            // the standby node can resend with the same standby term.
{
std::unique_lock<std::mutex> lk(standby_sync_mux_);
MarkSnapshotSyncCompletedLocked(req.standby_node_id(),
req.standby_node_term());
auto pending_req_iter =
pending_req_.find(req.standby_node_id());
if (pending_req_iter != pending_req_.end())
Expand Down
Loading