diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_design.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_design.md new file mode 100644 index 00000000..7882c9ee --- /dev/null +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_design.md @@ -0,0 +1,64 @@ +# Standby Snapshot Subscription Barrier: Design + +## 1. Background +Snapshot sync for standby was previously gated mostly by leader term and +subscribe-id coverage. That was not enough to ensure snapshot content is after +all transactions that were active when standby subscription became effective. + +## 2. Goal +Introduce a subscription barrier per standby epoch: +- `barrier_ts = max ActiveTxMaxTs across all local ccshards` +- A snapshot is valid for that epoch only when: + - `current_ckpt_ts > barrier_ts` + +This guarantees the snapshot is after all transactions active at the +subscription success point. + +## 3. Key decisions +- Barrier sampling point is **`ResetStandbySequenceId` success path on primary** + (not `StandbyStartFollowing` and not `RequestStorageSnapshotSync`). +- Barrier key is `(standby_node_id, standby_term)`. +- `standby_term = (primary_term << 32) | subscribe_id`. +- Snapshot worker uses a lightweight checkpoint-ts probe before running heavy + checkpoint/flush. + +## 4. Runtime flow +1. Standby calls `StandbyStartFollowing`, receives `subscribe_id` and start seq. +2. Standby calls `ResetStandbySequenceId`. +3. Primary marks standby as subscribed on all shards and samples + `global_active_tx_max_ts` using `ActiveTxMaxTsCc`. +4. Primary registers barrier in `SnapshotManager`: + - `subscription_barrier_[node_id][standby_term] = barrier_ts` +5. Standby calls `RequestStorageSnapshotSync` with `standby_term`. +6. Primary validates barrier existence and enqueues one pending task per node + with attached `subscription_active_tx_max_ts`. +7. 
`StandbySyncWorker` loop: + - Probe `current_ckpt_ts` via `GetCurrentCheckpointTs()` + - Select pending tasks that satisfy: + - same primary term + - `subscribe_id < current_subscribe_id` + - `current_ckpt_ts > barrier_ts` + - If no task is eligible, skip heavy checkpoint for this round. + - If at least one task is eligible, run `RunOneRoundCheckpoint`, build/send + snapshot, then notify standby. + +## 5. Pending and cleanup model +- Pending tasks blocked by subscribe-id or barrier remain queued for retry. +- Worker uses retry backoff wait when pending queue is non-empty but blocked, to + avoid tight loop. +- Superseded standby terms are pruned: + - registering newer barrier can drop older pending task for the same node + - older barriers are removed +- On leader loss, all pending tasks and barriers are cleared. +- On node removal, `EraseSubscriptionBarriersByNode` clears both pending and + barriers for that node. + +## 6. Safety properties +- Missing barrier on snapshot-sync request is rejected (safe default). +- Barrier is epoch-scoped and never shared across standby terms. +- All barrier/pending updates are under `standby_sync_mux_`. + +## 7. Expected effects +- Prevent early snapshots that miss writes from subscription-time active txns. +- Avoid unnecessary heavy checkpoint rounds when no task is currently eligible. +- Keep existing transport/retry semantics for snapshot send/notify. diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation.md new file mode 100644 index 00000000..ff758599 --- /dev/null +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation.md @@ -0,0 +1,101 @@ +# Standby Snapshot Subscription Barrier: Implementation + +## 1. 
Scope of code changes +- `tx_service/src/remote/cc_node_service.cpp` +- `tx_service/src/fault/cc_node.cpp` +- `tx_service/include/cc/cc_request.h` (`ActiveTxMaxTsCc`) +- `tx_service/include/store/snapshot_manager.h` +- `tx_service/src/store/snapshot_manager.cpp` + +## 2. New/updated state + +### 2.1 Barrier registry in `SnapshotManager` +Add a map keyed by standby node and standby term: +- `subscription_barrier_[standby_node_id][standby_term] = barrier_ts` + +### 2.2 Pending snapshot task extension +Pending value is a task struct: +- `req` (`StorageSnapshotSyncRequest`) +- `subscription_active_tx_max_ts` + +## 3. New APIs in `SnapshotManager` +- `RegisterSubscriptionBarrier(standby_node_id, standby_term, barrier_ts)` +- `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)` +- `EraseSubscriptionBarrier(standby_node_id, standby_term)` +- `EraseSubscriptionBarriersByNode(standby_node_id)` +- `GetCurrentCheckpointTs(node_group) -> uint64_t` +- `RunOneRoundCheckpoint(node_group, leader_term) -> bool` + +Barrier/pending updates are protected by `standby_sync_mux_`. + +## 4. Barrier collection in `ResetStandbySequenceId` +In `CcNodeService::ResetStandbySequenceId` on primary: +1. Move standby from candidate to subscribed on all shards. +2. Validate leader term. +3. If barrier for `(node_id, standby_term)` does not exist: + - run `ActiveTxMaxTsCc` across all shards + - compute global max + - call `SnapshotManager::RegisterSubscriptionBarrier(...)` + +This makes the sampling point aligned with "subscription success". + +## 5. `RequestStorageSnapshotSync` path changes +In `SnapshotManager::OnSnapshotSyncRequested`: +1. Parse `(standby_node_id, standby_term)` from request. +2. Query barrier by `(standby_node_id, standby_term)`. +3. If not found: reject request. +4. If found: enqueue task with barrier ts. + +Dedup is still term-based per standby node. + +## 6. Snapshot gating logic +`SnapshotManager::SyncWithStandby` now runs in two phases: +1. 
Lightweight phase: + - `current_ckpt_ts = GetCurrentCheckpointTs(node_group)` + - Select tasks that satisfy: + - term alignment + - `subscribe_id < current_subscribe_id` + - `current_ckpt_ts > subscription_active_tx_max_ts` + - If no task is eligible, return directly. +2. Heavy phase: + - Run `RunOneRoundCheckpoint(...)` (flush) + - Create/send snapshot and notify standby for selected tasks. + +## 7. Worker retry pacing +`StandbySyncWorker` keeps existing wake condition on non-empty pending queue, and +adds short wait-for backoff (`200ms`) when requests remain pending after a +round, to avoid tight retry loops. + +## 8. Cleanup rules +- On successful snapshot completion for `(node_id, standby_term)`: erase barrier entry. +- On registering newer term for same node: prune older barriers and drop older + pending task. +- On node removal: `EraseSubscriptionBarriersByNode(node_id)` clears both + pending and barrier entries. +- On leader loss in sync loop: clear all pending and barriers. + +## 9. Failure behavior +- Missing barrier on sync request: reject request (safe default). +- Checkpoint failure: keep task queued for next rounds. +- Snapshot transfer failure: task stays pending and retries in later rounds. + +## 10. Standby-side rejection handling +In `CcNode::SubscribePrimaryNode`, if `ResetStandbySequenceId` is rejected by +primary, local standby following state is rolled back: +- unsubscribe per-shard standby sequence groups +- reset standby/candidate standby term if still on the failed term +- guard against clobbering newer resubscribe attempts. + +## 11. 
Tests + +### Unit tests +- barrier register/get/erase and supersession behavior +- pending dedup with barrier replacement +- gating boundaries (`current_ckpt_ts == / < / > barrier_ts`) + +### Integration tests +- long-running active tx at subscription success blocks snapshot until + `current_ckpt_ts > barrier` +- multiple standbys with independent barriers +- repeated sync-request retries with same standby term +- leader term switch cleanup correctness diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 97e93fae..6ddd1be5 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -3225,6 +3225,65 @@ struct CkptTsCc : public CcRequestBase NodeGroupId cc_ng_id_; }; +struct ActiveTxMaxTsCc : public CcRequestBase +{ +public: + ActiveTxMaxTsCc(size_t shard_cnt, NodeGroupId ng_id) + : active_tx_max_ts_(0), + mux_(), + cv_(), + unfinish_cnt_(shard_cnt), + cc_ng_id_(ng_id) + { + } + + ActiveTxMaxTsCc() = delete; + ActiveTxMaxTsCc(const ActiveTxMaxTsCc &) = delete; + ActiveTxMaxTsCc(ActiveTxMaxTsCc &&) = delete; + + bool Execute(CcShard &ccs) override + { + uint64_t shard_active_tx_max_ts = ccs.ActiveTxMaxTs(cc_ng_id_); + + uint64_t old_val = active_tx_max_ts_.load(std::memory_order_relaxed); + while (old_val < shard_active_tx_max_ts && + !active_tx_max_ts_.compare_exchange_weak( + old_val, shard_active_tx_max_ts, std::memory_order_acq_rel)) + ; + + std::unique_lock lk(mux_); + if (--unfinish_cnt_ == 0) + { + cv_.notify_one(); + } + + // return false since ActiveTxMaxTsCc is not reused and does not need + // to call CcRequestBase::Free + return false; + } + + void Wait() + { + std::unique_lock lk(mux_); + while (unfinish_cnt_ > 0) + { + cv_.wait(lk); + } + } + + uint64_t GetActiveTxMaxTs() const + { + return active_tx_max_ts_.load(std::memory_order_relaxed); + } + +private: + std::atomic active_tx_max_ts_; + bthread::Mutex mux_; + bthread::ConditionVariable cv_; + size_t unfinish_cnt_; + NodeGroupId 
cc_ng_id_; +}; + struct ProcessRemoteScanRespCc : public CcRequestBase { public: diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index 09e4081d..ef1ed057 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -664,6 +664,24 @@ class CcShard return min_ts; } + uint64_t ActiveTxMaxTs(NodeGroupId cc_ng_id) const + { + uint64_t max_ts = 0; + auto it = lock_holding_txs_.find(cc_ng_id); + if (it != lock_holding_txs_.end()) + { + for (const auto &tx_pair : it->second) + { + if (!TableName::IsMeta(tx_pair.second->table_type_) && + tx_pair.second->wlock_ts_ != 0) + { + max_ts = std::max(max_ts, tx_pair.second->wlock_ts_); + } + } + } + return max_ts; + } + /** * Try to reduce the size of lock array if it becomes sparse. * @@ -1090,6 +1108,18 @@ class CcShard } return node_ids; } + + std::vector GetCandidateStandbys() + { + std::vector node_ids; + node_ids.reserve(candidate_standby_nodes_.size()); + for (const auto &it : candidate_standby_nodes_) + { + node_ids.push_back(it.first); + } + return node_ids; + } + void ResetStandbySequence(); void DecrInflightStandbyReqCount(uint32_t seq_grp); diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index a2b31c8e..a4888664 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1379,22 +1379,12 @@ class ObjectCcMap : public TemplateCcMap assert(ccp != nullptr); bool s_obj_exist = (cce->PayloadStatus() == RecordStatus::Normal); - StandbyForwardEntry *forward_entry = nullptr; auto subscribed_standbys = shard_->GetSubscribedStandbys(); - if (!subscribed_standbys.empty()) - { - forward_entry = cce->ForwardEntry(); - if (forward_entry == nullptr) - { - LOG(ERROR) << "Subscribed standbys exist, but forward_entry is " - "null. Data loss may occur. 
Notifying standbys " - "to resubscribe."; - for (uint32_t node_id : subscribed_standbys) - { - shard_->NotifyStandbyOutOfSync(node_id); - } - } - } + bool has_subscribed_standby = !subscribed_standbys.empty(); + StandbyForwardEntry *forward_entry = cce->ForwardEntry(); + LOG_IF(WARNING, has_subscribed_standby && forward_entry == nullptr) + << "Subscribed standbys exist, but forward_entry is null. " + "Data loss may occur."; if (commit_ts > 0) { RecordStatus dirty_payload_status = cce->DirtyPayloadStatus(); @@ -1436,12 +1426,20 @@ class ObjectCcMap : public TemplateCcMap } if (forward_entry) { - // Set commit ts and send the msg to standby node - forward_entry->Request().set_commit_ts(commit_ts); - forward_entry->Request().set_schema_version(schema_ts_); - std::unique_ptr entry_ptr = + if (has_subscribed_standby) + { + // Set commit ts and send the msg to standby node. + forward_entry->Request().set_commit_ts(commit_ts); + forward_entry->Request().set_schema_version(schema_ts_); + std::unique_ptr entry_ptr = + cce->ReleaseForwardEntry(); + shard_->ForwardStandbyMessage(entry_ptr.release()); + } + else + { + // No standby needs this entry anymore. cce->ReleaseForwardEntry(); - shard_->ForwardStandbyMessage(entry_ptr.release()); + } } bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, payload_status); diff --git a/tx_service/include/store/snapshot_manager.h b/tx_service/include/store/snapshot_manager.h index 8004d53d..f7619525 100644 --- a/tx_service/include/store/snapshot_manager.h +++ b/tx_service/include/store/snapshot_manager.h @@ -57,9 +57,52 @@ class SnapshotManager void Start(); void Shutdown(); - void OnSnapshotSyncRequested( + // Handle snapshot sync request from standby node. Returns true if the + // request is accepted (queued or safely deduped), false otherwise. 
+ bool OnSnapshotSyncRequested( const txservice::remote::StorageSnapshotSyncRequest *req); + /** + * @brief Register the active-tx barrier captured at standby subscription + * time. + * @param standby_node_id Standby's node id. + * @param standby_node_term Standby's term ((primary_term << 32) | + * subscribe_id). + * @param active_tx_max_ts Maximum active write-tx timestamp observed at + * subscription time. + */ + void RegisterSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t active_tx_max_ts); + + /** + * @brief Lookup subscription barrier by standby node id and standby term. + * @param standby_node_id Standby's node id. + * @param standby_node_term Standby's term ((primary_term << 32) | + * subscribe_id). + * @param active_tx_max_ts Output barrier timestamp if found. + * @return true if barrier exists, false otherwise. + */ + bool GetSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t *active_tx_max_ts); + + /** + * @brief Remove an existing subscription barrier. + * @param standby_node_id Standby's node id. + * @param standby_node_term Standby's term ((primary_term << 32) | + * subscribe_id). + */ + void EraseSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term); + + /** + * @brief Remove all subscription barriers and queued sync task for one + * standby node. + * @param standby_node_id Standby's node id. + */ + void EraseSubscriptionBarriersByNode(uint32_t standby_node_id); + txservice::remote::BackupTaskStatus CreateBackup( const txservice::remote::CreateBackupRequest *req); @@ -74,7 +117,24 @@ class SnapshotManager // Run one round checkpoint to flush data in memory to kvstore. bool RunOneRoundCheckpoint(uint32_t node_group, int64_t ng_leader_term); + // Collect current checkpoint ts from cc shards without triggering data + // flush. 
+ uint64_t GetCurrentCheckpointTs(uint32_t node_group); + private: + /** + * @brief Remove one subscription barrier entry while caller already holds + * standby_sync_mux_. + */ + void EraseSubscriptionBarrierLocked(uint32_t standby_node_id, + int64_t standby_node_term); + + struct PendingSnapshotSyncTask + { + txservice::remote::StorageSnapshotSyncRequest req; + uint64_t subscription_active_tx_max_ts{0}; + }; + SnapshotManager() = default; ~SnapshotManager() = default; @@ -93,8 +153,11 @@ class SnapshotManager std::thread standby_sync_worker_; std::mutex standby_sync_mux_; std::condition_variable standby_sync_cv_; - std::unordered_map - pending_req_; + std::unordered_map pending_req_; + // standby node id -> (standby node term -> subscription-time active tx + // max ts) + std::unordered_map> + subscription_barrier_; bool terminated_{false}; const std::string backup_path_; diff --git a/tx_service/src/fault/cc_node.cpp b/tx_service/src/fault/cc_node.cpp index a54ba8f4..cf5dbabe 100644 --- a/tx_service/src/fault/cc_node.cpp +++ b/tx_service/src/fault/cc_node.cpp @@ -1074,9 +1074,92 @@ void CcNode::SubscribePrimaryNode(uint32_t leader_node_id, cntl.Reset(); stub->ResetStandbySequenceId(&cntl, &reset_req, &reset_resp, nullptr); + auto rollback_failed_following_state = [this, standby_term, primary_term]() + { + // If a newer subscribe term has already taken over, do not touch local + // standby state here. + if (requested_subscribe_primary_term_.load(std::memory_order_acquire) != + primary_term) + { + return false; + } + + if (Sharder::Instance().StandbyNodeTerm() != standby_term && + Sharder::Instance().CandidateStandbyNodeTerm() != standby_term) + { + return false; + } + + // Reset per-shard subscribe state only when this failed standby term is + // still the active local follow term. 
+ WaitableCc rollback_cc( + [this, standby_term, primary_term](CcShard &ccs) + { + if (requested_subscribe_primary_term_.load( + std::memory_order_acquire) != primary_term) + { + return true; + } + + if (Sharder::Instance().StandbyNodeTerm() != standby_term && + Sharder::Instance().CandidateStandbyNodeTerm() != + standby_term) + { + return true; + } + + auto &grps = ccs.GetStandbysequenceGrps(); + for (auto &entry : grps) + { + auto &grp_info = entry.second; + if (grp_info.subscribed_) + { + grp_info.Unsubscribe(); + } + } + + return true; + }, + local_cc_shards_.Count()); + for (uint32_t core_id = 0; core_id < local_cc_shards_.Count(); + core_id++) + { + local_cc_shards_.EnqueueCcRequest(core_id, &rollback_cc); + } + rollback_cc.Wait(); + + if (requested_subscribe_primary_term_.load(std::memory_order_acquire) != + primary_term) + { + return false; + } + + if (Sharder::Instance().StandbyNodeTerm() == standby_term) + { + Sharder::Instance().SetStandbyNodeTerm(-1); + } + if (Sharder::Instance().CandidateStandbyNodeTerm() == standby_term) + { + Sharder::Instance().SetCandidateStandbyNodeTerm(-1); + } + + return true; + }; + // Check rpc result - while (cntl.Failed()) + while (cntl.Failed() || reset_resp.error()) { + if (reset_resp.error()) + { + bool rolled_back = rollback_failed_following_state(); + LOG(WARNING) + << "ResetStandbySequenceId rejected by primary, ng_id: " + << ng_id_ << ", standby_term: " << standby_term + << ", primary_node_id: " << leader_node_id + << ", rolled_back_local_state: " << rolled_back; + return; + } + // We only need to retry if the message is not delivered. 
if (Sharder::Instance().LeaderTerm(ng_id_) > 0 || Sharder::Instance().CandidateLeaderTerm(ng_id_) > 0 || diff --git a/tx_service/src/remote/cc_node_service.cpp b/tx_service/src/remote/cc_node_service.cpp index 123cd440..e1e0160c 100644 --- a/tx_service/src/remote/cc_node_service.cpp +++ b/tx_service/src/remote/cc_node_service.cpp @@ -1657,7 +1657,6 @@ void CcNodeService::StandbyStartFollowing( } auto subscribe_id = Sharder::Instance().GetNextSubscribeId(); - response->set_subscribe_id(subscribe_id); response->set_error(false); } @@ -1776,8 +1775,9 @@ void CcNodeService::RequestStorageSnapshotSync( // Then, notify standby nodes that data committed before subscribe timepoint // has been flushed to kvstore. (standby nodes begin fetch record from // kvstore on cache miss). - store::SnapshotManager::Instance().OnSnapshotSyncRequested(request); - response->set_error(false); + bool accepted = + store::SnapshotManager::Instance().OnSnapshotSyncRequested(request); + response->set_error(!accepted); } void CcNodeService::OnSnapshotSynced( @@ -1884,6 +1884,37 @@ void CcNodeService::ResetStandbySequenceId( } reset_seq_cc.Wait(); + if (!Sharder::Instance().CheckLeaderTerm( + ng_id, PrimaryTermFromStandbyTerm(standby_node_term))) + { + LOG(WARNING) + << "Reject ResetStandbySequenceId due to leader term mismatch, " + << "ng_id: " << ng_id << ", node_id: " << node_id + << ", standby_term: " << standby_node_term; + response->set_error(true); + return; + } + + uint64_t existing_barrier_ts = 0; + bool has_existing_barrier = + store::SnapshotManager::Instance().GetSubscriptionBarrier( + node_id, standby_node_term, &existing_barrier_ts); + (void) existing_barrier_ts; + if (!has_existing_barrier) + { + ActiveTxMaxTsCc active_tx_max_ts_cc(local_shards_.Count(), ng_id); + for (uint16_t core_id = 0; core_id < local_shards_.Count(); core_id++) + { + local_shards_.EnqueueCcRequest(core_id, &active_tx_max_ts_cc); + } + active_tx_max_ts_cc.Wait(); + uint64_t global_active_tx_max_ts = + 
active_tx_max_ts_cc.GetActiveTxMaxTs(); + + store::SnapshotManager::Instance().RegisterSubscriptionBarrier( + node_id, standby_node_term, global_active_tx_max_ts); + } + response->set_error(false); } diff --git a/tx_service/src/sharder.cpp b/tx_service/src/sharder.cpp index 0c761d6c..69550de8 100644 --- a/tx_service/src/sharder.cpp +++ b/tx_service/src/sharder.cpp @@ -39,6 +39,7 @@ #include "remote/cc_node_service.h" #include "remote/cc_stream_receiver.h" #include "remote/cc_stream_sender.h" +#include "store/snapshot_manager.h" #include "tx_command.h" #include "tx_service.h" #include "tx_worker_pool.h" @@ -1299,6 +1300,7 @@ void Sharder::UpdateInMemoryClusterConfig( std::shared_ptr> new_nodes_sptr, uint64_t version) { + std::vector removed_node_ids; { std::unique_lock lk(cluster_cnf_mux_); if (cluster_config_.version_ >= version) @@ -1368,6 +1370,16 @@ void Sharder::UpdateInMemoryClusterConfig( cluster_config_.ng_configs_.try_emplace(ng_pair.first); ng_cnf_it.first->second = ng_pair.second; } + + // Nodes removed from cluster config in this update. 
+ for (const auto &[node_id, _] : *cluster_config_.nodes_configs_) + { + if (new_nodes_sptr->find(node_id) == new_nodes_sptr->end()) + { + removed_node_ids.emplace_back(node_id); + } + } + cluster_config_.version_ = version; cluster_config_.nodes_configs_ = std::move(new_nodes_sptr); if (cluster_config_.ng_ids_.use_count() == 1) @@ -1402,6 +1414,15 @@ void Sharder::UpdateInMemoryClusterConfig( ccs.RemoveSubscribedStandby(node_id); } } + std::vector candidate_standby_nodes = + ccs.GetCandidateStandbys(); + for (auto node_id : candidate_standby_nodes) + { + if (nodes_configs->find(node_id) == nodes_configs->end()) + { + ccs.RemoveCandidateStandby(node_id); + } + } return true; }, @@ -1413,6 +1434,12 @@ void Sharder::UpdateInMemoryClusterConfig( } remove_subscribed_standby_cc.Wait(); + + for (auto node_id : removed_node_ids) + { + store::SnapshotManager::Instance().EraseSubscriptionBarriersByNode( + node_id); + } } void Sharder::UpdateClusterConfig( diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index 179d915c..e925006b 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -52,6 +52,7 @@ void SnapshotManager::Shutdown() void SnapshotManager::StandbySyncWorker() { + constexpr auto kBlockedTaskRetryInterval = std::chrono::milliseconds(200); while (true) { std::unique_lock lk(standby_sync_mux_); @@ -66,10 +67,22 @@ void SnapshotManager::StandbySyncWorker() lk.unlock(); SyncWithStandby(); lk.lock(); + + if (terminated_) + { + return; + } + + if (!pending_req_.empty()) + { + // Pending requests are still blocked by subscribe/barrier checks. + // Back off to avoid tight checkpoint retry loops. 
+ standby_sync_cv_.wait_for(lk, kBlockedTaskRetryInterval); + } } } -void SnapshotManager::OnSnapshotSyncRequested( +bool SnapshotManager::OnSnapshotSyncRequested( const txservice::remote::StorageSnapshotSyncRequest *req) { DLOG(INFO) << "Received snapshot sync request from standby node #" @@ -79,31 +92,182 @@ void SnapshotManager::OnSnapshotSyncRequested( if (store_hd_ == nullptr) { LOG(ERROR) << "Store handler is nullptr but standby feature enabled."; - return; + return false; } std::unique_lock lk(standby_sync_mux_); if (!terminated_) { + auto node_it = subscription_barrier_.find(req->standby_node_id()); + if (node_it == subscription_barrier_.end()) + { + LOG(WARNING) << "No subscription barrier found for standby node #" + << req->standby_node_id() + << ", standby term: " << req->standby_node_term(); + return false; + } + + auto barrier_it = node_it->second.find(req->standby_node_term()); + if (barrier_it == node_it->second.end()) + { + LOG(WARNING) << "No barrier found for standby node #" + << req->standby_node_id() + << " at standby term: " << req->standby_node_term(); + return false; + } + + uint64_t active_tx_max_ts = barrier_it->second; + auto ins_pair = pending_req_.try_emplace(req->standby_node_id()); if (!ins_pair.second) { // check if the queued task is newer than the new received req. If // so, discard the new req, otherwise, update the task. auto &cur_task = ins_pair.first->second; - int64_t cur_task_standby_node_term = cur_task.standby_node_term(); + int64_t cur_task_standby_node_term = + cur_task.req.standby_node_term(); int64_t req_standby_node_term = req->standby_node_term(); if (cur_task_standby_node_term >= req_standby_node_term) { // discard the task. 
- return; + return true; } } - ins_pair.first->second.CopyFrom(*req); + ins_pair.first->second.req.CopyFrom(*req); + ins_pair.first->second.subscription_active_tx_max_ts = active_tx_max_ts; standby_sync_cv_.notify_all(); + return true; + } + + return false; +} + +void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t active_tx_max_ts) +{ + std::unique_lock lk(standby_sync_mux_); + + // Ignore out-of-order old barrier registrations when a newer standby term + // is already known for this standby node. + auto node_it = subscription_barrier_.find(standby_node_id); + if (node_it != subscription_barrier_.end()) + { + for (const auto &entry : node_it->second) + { + int64_t existing_term = entry.first; + if (existing_term > standby_node_term) + { + DLOG(INFO) + << "Ignore stale subscription barrier registration for " + "standby node #" + << standby_node_id << ", term " << standby_node_term + << " because newer term " << existing_term + << " already exists"; + return; + } + } + } + + // Drop queued work from older standby terms. They are superseded by this + // new subscription barrier and should not be synced anymore. + auto pending_it = pending_req_.find(standby_node_id); + if (pending_it != pending_req_.end() && + pending_it->second.req.standby_node_term() > standby_node_term) + { + DLOG(INFO) << "Ignore stale barrier registration for standby node #" + << standby_node_id << ", term " << standby_node_term + << " because queued pending task term " + << pending_it->second.req.standby_node_term() << " is newer"; + return; + } + + if (pending_it != pending_req_.end() && + pending_it->second.req.standby_node_term() < standby_node_term) + { + pending_req_.erase(pending_it); + } + + auto &node_barriers = subscription_barrier_[standby_node_id]; + + // Keep only current and newer terms for this node. 
+ auto it = node_barriers.begin(); + while (it != node_barriers.end()) + { + if (it->first < standby_node_term) + { + it = node_barriers.erase(it); + } + else + { + ++it; + } + } + + // Keep the first registered barrier for the same standby term to make + // duplicate reset/subscribe retries idempotent. + if (node_barriers.find(standby_node_term) == node_barriers.end()) + { + node_barriers[standby_node_term] = active_tx_max_ts; + } +} + +bool SnapshotManager::GetSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t *active_tx_max_ts) +{ + if (active_tx_max_ts == nullptr) + { + return false; + } + + std::unique_lock lk(standby_sync_mux_); + auto node_it = subscription_barrier_.find(standby_node_id); + if (node_it == subscription_barrier_.end()) + { + return false; + } + + auto barrier_it = node_it->second.find(standby_node_term); + if (barrier_it == node_it->second.end()) + { + return false; + } + + *active_tx_max_ts = barrier_it->second; + return true; +} + +void SnapshotManager::EraseSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term) +{ + std::unique_lock lk(standby_sync_mux_); + EraseSubscriptionBarrierLocked(standby_node_id, standby_node_term); +} + +void SnapshotManager::EraseSubscriptionBarriersByNode(uint32_t standby_node_id) +{ + std::unique_lock lk(standby_sync_mux_); + pending_req_.erase(standby_node_id); + subscription_barrier_.erase(standby_node_id); +} + +void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id, + int64_t standby_node_term) +{ + auto node_it = subscription_barrier_.find(standby_node_id); + if (node_it == subscription_barrier_.end()) + { + return; + } + + node_it->second.erase(standby_node_term); + if (node_it->second.empty()) + { + subscription_barrier_.erase(node_it); } } @@ -130,50 +294,36 @@ void SnapshotManager::SyncWithStandby() std::unique_lock lk(standby_sync_mux_); // clear all requests pending_req_.clear(); + // clear barriers as well, all queued sync 
states are stale when this + // leader term is no longer valid. + subscription_barrier_.clear(); return; } uint32_t current_subscribe_id = Sharder::Instance().GetCurrentSubscribeId(); - bool ckpt_res = this->RunOneRoundCheckpoint(node_group, leader_term); + uint64_t cur_ckpt_ts = GetCurrentCheckpointTs(node_group); - if (!ckpt_res) - { - // data flush failed. Retry on next run. - LOG(ERROR) << "Failed to do checkpoint on SyncWithStandby"; - return; - } - - // Now take a snapshot for non-shared storage, and then send to standby - // node. - std::vector snapshot_files; - if (!store_hd_->IsSharedStorage()) - { - bool res = store_hd_->CreateSnapshotForStandby(snapshot_files); - if (!res) - { - LOG(ERROR) << "Failed to create snpashot for sync with standby"; - return; - } - } - - std::vector tasks; + std::vector tasks; - // Dequeue all pending tasks that can be covered by this snapshot. + // Pick all pending tasks that can be covered by current checkpoint ts. { std::unique_lock lk(standby_sync_mux_); auto it = pending_req_.begin(); while (it != pending_req_.end()) { + uint32_t pending_node_id = it->first; auto &pending_task = it->second; int64_t pending_task_standby_node_term = - pending_task.standby_node_term(); + pending_task.req.standby_node_term(); int64_t pending_task_primary_term = PrimaryTermFromStandbyTerm(pending_task_standby_node_term); if (pending_task_primary_term < leader_term) { // discard the task. + EraseSubscriptionBarrierLocked(pending_node_id, + pending_task_standby_node_term); it = pending_req_.erase(it); continue; } @@ -198,6 +348,18 @@ void SnapshotManager::SyncWithStandby() << pending_task_subscribe_id << ". 
Wait for next round"; covered = false; } + else if (cur_ckpt_ts <= pending_task.subscription_active_tx_max_ts) + { + LOG(INFO) << "Current checkpoint ts " << cur_ckpt_ts + << " does not pass subscription barrier ts " + << pending_task.subscription_active_tx_max_ts + << " for standby node #" + << pending_task.req.standby_node_id() + << ", standby term " + << pending_task.req.standby_node_term() + << ". Wait for next round"; + covered = false; + } if (!covered) { @@ -206,7 +368,10 @@ void SnapshotManager::SyncWithStandby() it++; continue; } - tasks.emplace_back(std::move(pending_task)); + // Keep a copy for current round execution. Do not move out of + // pending_req_, because completion logic still needs the original + // standby term to decide whether the queued task is stale. + tasks.emplace_back(pending_task); // Keep the request entry so completion logic can check whether it // needs to stay queued, but make sure to advance the iterator so we @@ -216,14 +381,56 @@ void SnapshotManager::SyncWithStandby() } } - for (auto &req : tasks) + if (tasks.empty()) + { + return; + } + + bool ckpt_res = this->RunOneRoundCheckpoint(node_group, leader_term); + + if (!ckpt_res) + { + // Data flush failed. Retry on next run. + LOG(ERROR) << "Failed to do checkpoint on SyncWithStandby"; + return; + } + + // Now take a snapshot for non-shared storage, and then send to standby + // node. + std::vector snapshot_files; + if (!store_hd_->IsSharedStorage()) + { + bool res = store_hd_->CreateSnapshotForStandby(snapshot_files); + if (!res) + { + LOG(ERROR) << "Failed to create snpashot for sync with standby"; + return; + } + } + + for (auto &task : tasks) { + auto &req = task.req; uint32_t node_id = req.standby_node_id(); + int64_t req_standby_node_term = req.standby_node_term(); + + // Skip stale copied tasks that have already been superseded/removed + // after this round snapshot task list was built. 
+        {
+            std::unique_lock lk(standby_sync_mux_);
+            auto pending_it = pending_req_.find(node_id);
+            if (pending_it == pending_req_.end() ||
+                pending_it->second.req.standby_node_term() !=
+                    req_standby_node_term)
+            {
+                continue;
+            }
+        }
+
         std::string ip;
         uint16_t port;
         Sharder::Instance().GetNodeAddress(node_id, ip, port);
         std::string remote_dest = req.user() + "@" + ip + ":" + req.dest_path();
-        int64_t req_standby_node_term = req.standby_node_term();
         int64_t req_primary_term =
             PrimaryTermFromStandbyTerm(req_standby_node_term);
@@ -288,41 +495,73 @@ void SnapshotManager::SyncWithStandby()
             // We just need to erase the same request. Even if the notification
             // fails, after a while, the standby node will resend the
             // request.
-            std::unique_lock lk(standby_sync_mux_);
-            auto pending_req_iter = pending_req_.find(req.standby_node_id());
-            if (pending_req_iter != pending_req_.end())
             {
-                // Check again to see if the request has been updated.
-                auto &next_pending_task = pending_req_iter->second;
-                int64_t next_pending_task_standby_term =
-                    next_pending_task.standby_node_term();
-                int64_t next_pending_task_primary_term =
-                    PrimaryTermFromStandbyTerm(next_pending_task_standby_term);
-
-                assert(PrimaryTermFromStandbyTerm(req.standby_node_term()) ==
-                       leader_term);
-
-                if (next_pending_task_primary_term < leader_term)
-                {
-                    pending_req_.erase(pending_req_iter);
-                }
-                else if (next_pending_task_primary_term == leader_term)
+                std::unique_lock lk(standby_sync_mux_);
+                auto pending_req_iter =
+                    pending_req_.find(req.standby_node_id());
+                if (pending_req_iter != pending_req_.end())
                 {
-                    uint32_t next_pending_task_subscribe_id =
-                        SubscribeIdFromStandbyTerm(
+                    // Check again to see if the request has been updated.
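The skip-stale guard added above is one half of a copy-then-recheck pattern: the heavy round works on copies while `pending_req_` stays authoritative under `standby_sync_mux_`, so a task superseded mid-round is silently dropped. A minimal sketch of that pattern with illustrative names (not the codebase's types):

```cpp
#include <cstdint>
#include <map>
#include <mutex>

// Illustrative stand-ins: the live pending map is keyed by standby node id,
// and each queued task carries the standby term it was registered under.
struct PendingTask
{
    int64_t standby_term{0};
};

class PendingQueue
{
public:
    // Registering a task for a node overwrites any older one, which is how
    // a newer standby term supersedes a stale queued task.
    void Upsert(uint32_t node_id, int64_t standby_term)
    {
        std::lock_guard<std::mutex> lk(mux_);
        pending_[node_id] = PendingTask{standby_term};
    }

    // Re-validation step: a copy taken earlier in the round is acted on
    // only if the queued task still carries the same standby term.
    bool StillCurrent(uint32_t node_id, int64_t copied_term)
    {
        std::lock_guard<std::mutex> lk(mux_);
        auto it = pending_.find(node_id);
        return it != pending_.end() &&
               it->second.standby_term == copied_term;
    }

private:
    std::mutex mux_;
    std::map<uint32_t, PendingTask> pending_;
};
```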
+                    auto &next_pending_task = pending_req_iter->second;
+                    int64_t next_pending_task_standby_term =
+                        next_pending_task.req.standby_node_term();
+                    int64_t next_pending_task_primary_term =
+                        PrimaryTermFromStandbyTerm(
                             next_pending_task_standby_term);
-                    uint32_t cur_task_subscribe_id =
-                        SubscribeIdFromStandbyTerm(req.standby_node_term());
-                    if (next_pending_task_subscribe_id <= cur_task_subscribe_id)
+
+                    assert(PrimaryTermFromStandbyTerm(
+                        req.standby_node_term()) == leader_term);
+
+                    if (next_pending_task_primary_term < leader_term)
                     {
+                        EraseSubscriptionBarrierLocked(
+                            req.standby_node_id(),
+                            next_pending_task_standby_term);
                         pending_req_.erase(pending_req_iter);
                     }
+                    else if (next_pending_task_primary_term == leader_term)
+                    {
+                        uint32_t next_pending_task_subscribe_id =
+                            SubscribeIdFromStandbyTerm(
+                                next_pending_task_standby_term);
+                        uint32_t cur_task_subscribe_id =
+                            SubscribeIdFromStandbyTerm(req.standby_node_term());
+                        if (next_pending_task_subscribe_id <=
+                            cur_task_subscribe_id)
+                        {
+                            EraseSubscriptionBarrierLocked(
+                                req.standby_node_id(),
+                                next_pending_task_standby_term);
+                            pending_req_.erase(pending_req_iter);
+                        }
+                    }
                 }
             }
+
+            EraseSubscriptionBarrier(req.standby_node_id(),
+                                     req.standby_node_term());
         }
     }
 }

+uint64_t SnapshotManager::GetCurrentCheckpointTs(uint32_t node_group)
+{
+    auto local_shards = Sharder::Instance().GetLocalCcShards();
+    assert(local_shards != nullptr);
+    if (local_shards == nullptr)
+    {
+        return 0;
+    }
+
+    CkptTsCc ckpt_req(local_shards->Count(), node_group);
+    for (size_t i = 0; i < local_shards->Count(); i++)
+    {
+        local_shards->EnqueueCcRequest(i, &ckpt_req);
+    }
+    ckpt_req.Wait();
+    return ckpt_req.GetCkptTs();
+}
+
 bool SnapshotManager::RunOneRoundCheckpoint(uint32_t node_group,
                                             int64_t ng_leader_term)
 {
@@ -341,17 +580,6 @@ bool SnapshotManager::RunOneRoundCheckpoint(uint32_t node_group,
     bool can_be_skipped = false;
     uint64_t last_ckpt_ts = Sharder::Instance().GetNodeGroupCkptTs(node_group);
-    size_t core_cnt = local_shards.Count();
-    CkptTsCc ckpt_req(core_cnt, node_group);
-
-    // Find minimum ckpt_ts from all the ccshards in parallel. ckpt_ts is
-    // the minimum timestamp minus 1 among all the active transactions, thus
-    // it's safe to flush all the entries smaller than or equal to ckpt_ts.
-    for (size_t i = 0; i < local_shards.Count(); i++)
-    {
-        local_shards.EnqueueCcRequest(i, &ckpt_req);
-    }
-    ckpt_req.Wait();

     // Iterate all the tables and execute CkptScanCc requests on this node
     // group's ccmaps on each ccshard. The result of CkptScanCc is stored in