diff --git a/tx_service/include/store/snapshot_manager.h b/tx_service/include/store/snapshot_manager.h
index f7619525..c0428539 100644
--- a/tx_service/include/store/snapshot_manager.h
+++ b/tx_service/include/store/snapshot_manager.h
@@ -28,6 +28,7 @@
 #include <...>
 #include <...>
 #include <...>
+#include <unordered_set>
 #include <...>
 
 #include "store/data_store_handler.h"
@@ -128,6 +129,11 @@ class SnapshotManager
      */
     void EraseSubscriptionBarrierLocked(uint32_t standby_node_id,
                                         int64_t standby_node_term);
+    bool IsSnapshotSyncCompletedLocked(uint32_t standby_node_id,
+                                       int64_t standby_node_term) const;
+    void MarkSnapshotSyncCompletedLocked(uint32_t standby_node_id,
+                                         int64_t standby_node_term);
+    void EraseSnapshotSyncCompletedByNodeLocked(uint32_t standby_node_id);
 
     struct PendingSnapshotSyncTask
     {
@@ -158,6 +164,9 @@ class SnapshotManager
     // max ts)
     std::unordered_map<...>
         subscription_barrier_;
+    // standby node id -> completed standby terms
+    std::unordered_map<uint32_t, std::unordered_set<int64_t>>
+        completed_snapshot_terms_;
 
     bool terminated_{false};
     const std::string backup_path_;
diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp
index e925006b..33736e77 100644
--- a/tx_service/src/store/snapshot_manager.cpp
+++ b/tx_service/src/store/snapshot_manager.cpp
@@ -102,6 +102,15 @@ bool SnapshotManager::OnSnapshotSyncRequested(
     auto node_it = subscription_barrier_.find(req->standby_node_id());
     if (node_it == subscription_barrier_.end())
     {
+        if (IsSnapshotSyncCompletedLocked(req->standby_node_id(),
+                                          req->standby_node_term()))
+        {
+            DLOG(INFO) << "Ignore duplicate snapshot sync request for "
+                          "completed standby node #"
+                       << req->standby_node_id()
+                       << ", standby term: " << req->standby_node_term();
+            return true;
+        }
         LOG(WARNING) << "No subscription barrier found for standby node #"
                      << req->standby_node_id()
                      << ", standby term: " << req->standby_node_term();
@@ -111,6 +120,15 @@
     auto barrier_it = node_it->second.find(req->standby_node_term());
     if (barrier_it == node_it->second.end())
     {
+        if (IsSnapshotSyncCompletedLocked(req->standby_node_id(),
+                                          req->standby_node_term()))
+        {
+            DLOG(INFO) << "Ignore duplicate snapshot sync request for "
+                          "completed standby node #"
+                       << req->standby_node_id()
+                       << ", standby term: " << req->standby_node_term();
+            return true;
+        }
         LOG(WARNING) << "No barrier found for standby node #"
                      << req->standby_node_id()
                      << " at standby term: " << req->standby_node_term();
@@ -150,6 +168,31 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id,
                                                    uint64_t active_tx_max_ts)
 {
     std::unique_lock lk(standby_sync_mux_);
+    auto completed_it = completed_snapshot_terms_.find(standby_node_id);
+    if (completed_it != completed_snapshot_terms_.end())
+    {
+        for (int64_t completed_term : completed_it->second)
+        {
+            if (completed_term > standby_node_term)
+            {
+                DLOG(INFO) << "Ignore stale subscription barrier registration "
+                              "for standby node #"
+                           << standby_node_id << ", term " << standby_node_term
+                           << " because completed term " << completed_term
+                           << " is newer";
+                return;
+            }
+        }
+
+        if (completed_it->second.find(standby_node_term) !=
+            completed_it->second.end())
+        {
+            DLOG(INFO) << "Skip barrier registration for already completed "
+                          "standby node #"
+                       << standby_node_id << ", term " << standby_node_term;
+            return;
+        }
+    }
 
     // Ignore out-of-order old barrier registrations when a newer standby term
     // is already known for this standby node.
@@ -191,6 +234,26 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id,
         pending_req_.erase(pending_it);
     }
 
+    if (completed_it != completed_snapshot_terms_.end())
+    {
+        auto term_it = completed_it->second.begin();
+        while (term_it != completed_it->second.end())
+        {
+            if (*term_it < standby_node_term)
+            {
+                term_it = completed_it->second.erase(term_it);
+            }
+            else
+            {
+                ++term_it;
+            }
+        }
+        if (completed_it->second.empty())
+        {
+            completed_snapshot_terms_.erase(completed_it);
+        }
+    }
+
     auto &node_barriers = subscription_barrier_[standby_node_id];
 
     // Keep only current and newer terms for this node.
@@ -253,6 +316,7 @@ void SnapshotManager::EraseSubscriptionBarriersByNode(uint32_t standby_node_id)
     std::unique_lock lk(standby_sync_mux_);
     pending_req_.erase(standby_node_id);
     subscription_barrier_.erase(standby_node_id);
+    EraseSnapshotSyncCompletedByNodeLocked(standby_node_id);
 }
 
 void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id,
@@ -271,6 +335,43 @@ void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id,
     }
 }
 
+bool SnapshotManager::IsSnapshotSyncCompletedLocked(
+    uint32_t standby_node_id, int64_t standby_node_term) const
+{
+    auto node_it = completed_snapshot_terms_.find(standby_node_id);
+    if (node_it == completed_snapshot_terms_.end())
+    {
+        return false;
+    }
+    return node_it->second.find(standby_node_term) != node_it->second.end();
+}
+
+void SnapshotManager::MarkSnapshotSyncCompletedLocked(uint32_t standby_node_id,
+                                                      int64_t standby_node_term)
+{
+    auto &completed_terms = completed_snapshot_terms_[standby_node_id];
+    completed_terms.insert(standby_node_term);
+
+    auto it = completed_terms.begin();
+    while (it != completed_terms.end())
+    {
+        if (*it < standby_node_term)
+        {
+            it = completed_terms.erase(it);
+        }
+        else
+        {
+            ++it;
+        }
+    }
+}
+
+void SnapshotManager::EraseSnapshotSyncCompletedByNodeLocked(
+    uint32_t standby_node_id)
+{
+    completed_snapshot_terms_.erase(standby_node_id);
+}
+
 // If kvstore is enabled, we must flush data in-memory to kvstore firstly.
 // For non-shared kvstore, also we create and send the snapshot to standby
 // nodes.
@@ -297,6 +398,7 @@ void SnapshotManager::SyncWithStandby()
         // clear barriers as well, all queued sync states are stale when this
         // leader term is no longer valid.
         subscription_barrier_.clear();
+        completed_snapshot_terms_.clear();
         return;
     }
 
@@ -456,6 +558,7 @@ void SnapshotManager::SyncWithStandby()
                    << req.standby_node_term()
                    << ", channel: " << (channel ? "valid" : "null");
 
+        bool notify_succ = false;
         if (channel)
         {
             // needs retry if failed
@@ -484,19 +587,39 @@
                     std::this_thread::sleep_for(std::chrono::seconds(1));
                     continue;
                 }
-                else
+                if (on_sync_resp.error())
                 {
-                    break;
+                    LOG(WARNING)
+                        << "OnSnapshotSynced to standby node #"
+                        << req.standby_node_id()
+                        << " returned error response at standby term "
+                        << req.standby_node_term();
+                    std::this_thread::sleep_for(std::chrono::seconds(1));
+                    continue;
                 }
+
+                notify_succ = true;
+                break;
             }
         }
+        else
+        {
+            LOG(WARNING) << "OnSnapshotSynced channel is nullptr for "
+                         << "standby node #" << req.standby_node_id()
+                         << " at standby term " << req.standby_node_term();
+        }
+
+        if (!notify_succ)
+        {
+            // Keep pending/barrier so standby can retry with the same
+            // standby term.
+            continue;
+        }
 
-        // We don't care if the notification is successful or not.
-        // We just need to erase the same request. Even if the notification
-        // fails, after a while, the standby node will resend the
-        // request.
         {
             std::unique_lock lk(standby_sync_mux_);
+            MarkSnapshotSyncCompletedLocked(req.standby_node_id(),
+                                            req.standby_node_term());
             auto pending_req_iter = pending_req_.find(req.standby_node_id());
             if (pending_req_iter != pending_req_.end())
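
The bookkeeping this patch adds boils down to a small amount of per-node state. Below is a minimal, self-contained sketch of that logic; `CompletedTermTracker`, `IsCompleted`, `MarkCompleted`, `EraseNode`, and the `main` driver are illustrative names only, the container types are inferred from the usage in the diff (standby node id -> set of completed standby terms), and the real methods run under `standby_sync_mux_` rather than unsynchronized.

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <unordered_set>

// Standalone model of the completed-snapshot-term bookkeeping.
class CompletedTermTracker
{
public:
    // Mirrors IsSnapshotSyncCompletedLocked: true if this (node, term)
    // pair already finished a snapshot sync.
    bool IsCompleted(uint32_t node_id, int64_t term) const
    {
        auto it = completed_.find(node_id);
        return it != completed_.end() && it->second.count(term) > 0;
    }

    // Mirrors MarkSnapshotSyncCompletedLocked: record the term, then prune
    // every older completed term for the same node so the set stays small.
    void MarkCompleted(uint32_t node_id, int64_t term)
    {
        auto &terms = completed_[node_id];
        terms.insert(term);
        for (auto it = terms.begin(); it != terms.end();)
        {
            if (*it < term)
            {
                // erase() returns the next valid iterator, which keeps the
                // loop safe while mutating the set.
                it = terms.erase(it);
            }
            else
            {
                ++it;
            }
        }
    }

    // Mirrors EraseSnapshotSyncCompletedByNodeLocked.
    void EraseNode(uint32_t node_id)
    {
        completed_.erase(node_id);
    }

private:
    // standby node id -> completed standby terms (types inferred from usage).
    std::unordered_map<uint32_t, std::unordered_set<int64_t>> completed_;
};

int main()
{
    CompletedTermTracker tracker;
    tracker.MarkCompleted(/*node_id=*/7, /*term=*/3);
    tracker.MarkCompleted(7, 5);  // prunes the older term 3

    std::cout << tracker.IsCompleted(7, 5) << '\n';  // 1: duplicate request is a no-op
    std::cout << tracker.IsCompleted(7, 3) << '\n';  // 0: older term was pruned
    return 0;
}
```

Because `MarkCompleted` prunes every older term on insertion, each node retains only its newest completed term(s): enough to short-circuit duplicate requests for the current term while keeping the map bounded.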
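The second behavioral change is in `SyncWithStandby`: the removed comment ("We don't care if the notification is successful or not") described the old fire-and-forget behavior, whereas the new code retries until the standby acknowledges `OnSnapshotSynced` and only then records completion and erases the pending request. The sketch below models that control flow under stated assumptions: `NotifyStandby` and `NotifyUntilAcked` are hypothetical stand-ins for the brpc call and its retry loop (the real code separately checks `cntl.Failed()` and `on_sync_resp.error()`), the loop here is unbounded for brevity, and the sleep is shortened from the diff's one second.

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

// Hypothetical stand-in for the OnSnapshotSynced RPC; both transport failure
// and an error response collapse into a single bool for illustration.
bool NotifyStandby(uint32_t /*node_id*/, int64_t /*term*/)
{
    static int attempts = 0;
    return ++attempts >= 3;  // simulate two failures, then success
}

// Sketch of the reworked notification step for one pending request. Returns
// true once the standby has acknowledged; on failure the caller keeps the
// pending request and barrier so the same standby term can retry later.
bool NotifyUntilAcked(uint32_t node_id, int64_t term, bool channel_valid)
{
    bool notify_succ = false;
    if (channel_valid)
    {
        while (!notify_succ)
        {
            if (!NotifyStandby(node_id, term))
            {
                // Back off and retry, as the diff does with a 1s sleep.
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                continue;
            }
            notify_succ = true;
        }
    }
    // A null channel falls through with notify_succ == false, so the
    // pending state survives for a later retry.
    return notify_succ;
}

int main()
{
    if (NotifyUntilAcked(/*node_id=*/7, /*term=*/5, /*channel_valid=*/true))
    {
        // Only at this point does the real code call
        // MarkSnapshotSyncCompletedLocked and erase the pending request
        // under standby_sync_mux_.
        std::cout << "acked; safe to mark term completed\n";
    }
    return 0;
}
```

Keeping the pending request and barrier on failure is what makes the duplicate-request check safe: the standby can re-send under the same term, and once the term is marked completed the leader answers those duplicates idempotently.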