From 0cea0f64e593306b110c53480cb5afc8de0ff096 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 9 Mar 2026 17:25:42 +0800 Subject: [PATCH 01/14] feat(tx_service): add ActiveTxMaxTs aggregation request and candidate-aware forward-entry creation - add CcShard::ActiveTxMaxTs(NodeGroupId) for per-shard max active write-tx ts - add ActiveTxMaxTsCc (CkptTsCc-style) to aggregate max ts across all CcShards - create standby forward entry when either subscribed or candidate standby exists - add CcShard::HasCandidateStandbys() helper --- tx_service/include/cc/cc_request.h | 59 +++++++++++++++++++++++++++ tx_service/include/cc/cc_shard.h | 23 +++++++++++ tx_service/include/cc/object_cc_map.h | 3 +- 3 files changed, 84 insertions(+), 1 deletion(-) diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 97e93fae..6ddd1be5 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -3225,6 +3225,65 @@ struct CkptTsCc : public CcRequestBase NodeGroupId cc_ng_id_; }; +struct ActiveTxMaxTsCc : public CcRequestBase +{ +public: + ActiveTxMaxTsCc(size_t shard_cnt, NodeGroupId ng_id) + : active_tx_max_ts_(0), + mux_(), + cv_(), + unfinish_cnt_(shard_cnt), + cc_ng_id_(ng_id) + { + } + + ActiveTxMaxTsCc() = delete; + ActiveTxMaxTsCc(const ActiveTxMaxTsCc &) = delete; + ActiveTxMaxTsCc(ActiveTxMaxTsCc &&) = delete; + + bool Execute(CcShard &ccs) override + { + uint64_t shard_active_tx_max_ts = ccs.ActiveTxMaxTs(cc_ng_id_); + + uint64_t old_val = active_tx_max_ts_.load(std::memory_order_relaxed); + while (old_val < shard_active_tx_max_ts && + !active_tx_max_ts_.compare_exchange_weak( + old_val, shard_active_tx_max_ts, std::memory_order_acq_rel)) + ; + + std::unique_lock lk(mux_); + if (--unfinish_cnt_ == 0) + { + cv_.notify_one(); + } + + // return false since ActiveTxMaxTsCc is not reused and does not need + // to call CcRequestBase::Free + return false; + } + + void Wait() + { + std::unique_lock lk(mux_); + while 
(unfinish_cnt_ > 0) + { + cv_.wait(lk); + } + } + + uint64_t GetActiveTxMaxTs() const + { + return active_tx_max_ts_.load(std::memory_order_relaxed); + } + +private: + std::atomic active_tx_max_ts_; + bthread::Mutex mux_; + bthread::ConditionVariable cv_; + size_t unfinish_cnt_; + NodeGroupId cc_ng_id_; +}; + struct ProcessRemoteScanRespCc : public CcRequestBase { public: diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index 09e4081d..e1e1b14c 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -664,6 +664,24 @@ class CcShard return min_ts; } + uint64_t ActiveTxMaxTs(NodeGroupId cc_ng_id) const + { + uint64_t max_ts = 0; + auto it = lock_holding_txs_.find(cc_ng_id); + if (it != lock_holding_txs_.end()) + { + for (const auto &tx_pair : it->second) + { + if (!TableName::IsMeta(tx_pair.second->table_type_) && + tx_pair.second->wlock_ts_ != 0) + { + max_ts = std::max(max_ts, tx_pair.second->wlock_ts_); + } + } + } + return max_ts; + } + /** * Try to reduce the size of lock array if it becomes sparse. 
* @@ -1090,6 +1108,11 @@ class CcShard } return node_ids; } + + bool HasCandidateStandbys() const + { + return !candidate_standby_nodes_.empty(); + } void ResetStandbySequence(); void DecrInflightStandbyReqCount(uint32_t seq_grp); diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index a2b31c8e..322a6849 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -866,7 +866,8 @@ class ObjectCcMap : public TemplateCcMap StandbyForwardEntry *forward_entry = nullptr; remote::KeyObjectStandbyForwardRequest *forward_req = nullptr; - if (!shard_->GetSubscribedStandbys().empty()) + if (!shard_->GetSubscribedStandbys().empty() || + shard_->HasCandidateStandbys()) { forward_entry = cce->ForwardEntry(); if (!forward_entry) From 7bb8418d2ed372bff570701163f7d0fb19a2f627 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 9 Mar 2026 17:26:00 +0800 Subject: [PATCH 02/14] docs(cursor_knowledge): split standby snapshot barrier docs into design and implementation (EN/ZH) - add standalone design docs in English and Chinese - add standalone implementation docs in English and Chinese - separate process/architecture from code-level implementation details --- ...snapshot_subscription_barrier_design_en.md | 61 ++++++++++++ ...snapshot_subscription_barrier_design_zh.md | 62 ++++++++++++ ..._subscription_barrier_implementation_en.md | 96 +++++++++++++++++++ ..._subscription_barrier_implementation_zh.md | 94 ++++++++++++++++++ 4 files changed, 313 insertions(+) create mode 100644 .cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md create mode 100644 .cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md create mode 100644 .cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md create mode 100644 .cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md 
b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md new file mode 100644 index 00000000..014427d3 --- /dev/null +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md @@ -0,0 +1,61 @@ +# Standby Snapshot Subscription Barrier: Design + +## 1. Background +Standby snapshot sync runs after standby subscribes to primary. +Existing snapshot eligibility checks are mostly term and subscribe-id coverage. + +This can still produce a snapshot that does not include all writes from transactions that were already active when standby started subscribing. + +## 2. Goal +Introduce a **subscription-time active transaction barrier**: +- At subscription start, record `global_max_active_tx_ts` on primary. +- A snapshot for that subscription epoch is valid only if: + - `ckpt_ts > subscription_time_global_max_active_tx_ts` + +This guarantees snapshot visibility is strictly after all transactions active at subscription time. + +## 3. Non-goals +- No change to standby forward-stream message protocol. +- No change to transaction execution semantics. +- No redesign of checkpoint algorithm. + +## 4. Capture point +Capture barrier at primary RPC **`StandbyStartFollowing`**. +Do not capture it in `RequestStorageSnapshotSync`. + +Why: +- `StandbyStartFollowing` is the actual subscription start moment. +- `RequestStorageSnapshotSync` may be delayed/retried and does not represent subscription time. + +## 5. Conceptual model +Each subscription epoch (identified by `standby_term`) has a fixed barrier: +- `standby_term = (primary_term << 32) | subscribe_id` +- `barrier_ts = max ActiveTxMaxTs across all local shards at subscription time` + +Snapshot for this epoch is sendable only when checkpoint advances beyond this barrier. + +## 6. Runtime flow +1. Standby calls `StandbyStartFollowing`. +2. Primary computes `barrier_ts` and allocates `subscribe_id`, forming `standby_term`. +3. Primary stores `(standby_node_id, standby_term) -> barrier_ts`. +4. 
Standby later calls `RequestStorageSnapshotSync` with `standby_term`. +5. Primary loads barrier and attaches it to pending snapshot-sync task. +6. In snapshot worker loop, primary checks existing conditions plus: + - `ckpt_ts > barrier_ts` +7. If false, task remains pending; if true, snapshot send + notify proceed. + +## 7. Safety and lifecycle principles +- Missing barrier for incoming snapshot-sync request is treated as invalid request. +- Barrier is scoped to subscription epoch and must be cleaned after completion or term supersession. +- Concurrency control follows existing standby-sync mutex boundary in `SnapshotManager`. + +## 8. Expected effects +- Prevents early snapshots that miss subscription-time in-flight transaction writes. +- Keeps retry behavior and snapshot transport model unchanged. +- Adds controlled waiting under heavy long-running-transaction scenarios. + +## 9. Observability (design level) +Need logs/metrics for barrier gating: +- blocked reason: `ckpt_ts <= barrier_ts` +- dimensions: `node_id`, `standby_term`, `barrier_ts`, `ckpt_ts` +- counters: blocked rounds, barrier map size diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md new file mode 100644 index 00000000..9de831ea --- /dev/null +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md @@ -0,0 +1,62 @@ +# Standby Snapshot 订阅屏障:设计文档 + +## 1. 背景 +当前 standby 在订阅 primary 后再触发 snapshot 同步。 +现有 snapshot 可发送判定主要是 term 与 subscribe-id 覆盖。 + +该机制可能出现一个问题: +订阅时刻已经活跃的事务,其修改不一定全部被当轮 snapshot 覆盖。 + +## 2. 目标 +引入“**订阅时刻活动事务屏障**”: +- 在订阅开始时,于主节点记录 `global_max_active_tx_ts`; +- 该订阅 epoch 的 snapshot 仅在满足 + - `ckpt_ts > subscription_time_global_max_active_tx_ts` + 时才有效。 + +这样可保证 snapshot 位于“订阅时刻所有活跃事务”之后。 + +## 3. 非目标 +- 不改 standby forward 流协议。 +- 不改事务执行语义。 +- 不重写 checkpoint 算法。 + +## 4. 
屏障采样时机 +应在主节点 **`StandbyStartFollowing`** RPC 中采样, +不应在 `RequestStorageSnapshotSync` 里采样。 + +原因: +- `StandbyStartFollowing` 才是订阅开始时刻; +- `RequestStorageSnapshotSync` 可能延迟/重试,语义不等价于订阅瞬间。 + +## 5. 设计模型 +每个订阅 epoch(由 `standby_term` 标识)绑定一个固定屏障值: +- `standby_term = (primary_term << 32) | subscribe_id` +- `barrier_ts = 订阅时所有本地 shard 的 ActiveTxMaxTs 最大值` + +只有当 checkpoint 前进超过该屏障,才允许发送该 epoch 的 snapshot。 + +## 6. 运行流程 +1. standby 调用 `StandbyStartFollowing`。 +2. primary 计算 `barrier_ts`,分配 `subscribe_id`,组成 `standby_term`。 +3. primary 保存 `(standby_node_id, standby_term) -> barrier_ts`。 +4. standby 调用 `RequestStorageSnapshotSync`(携带 `standby_term`)。 +5. primary 查到 barrier,并附着到 pending snapshot 任务。 +6. snapshot worker 在现有判定外新增: + - `ckpt_ts > barrier_ts` +7. 若不满足则继续等待下一轮 checkpoint;满足才发送 snapshot。 + +## 7. 安全与生命周期原则 +- 收到 snapshot-sync 请求但找不到 barrier,按无效请求处理。 +- barrier 作用域是订阅 epoch,完成后或 term 超前后要清理。 +- 并发控制沿用 `SnapshotManager` 既有 standby-sync 互斥边界。 + +## 8. 预期效果 +- 避免“过早 snapshot”遗漏订阅时活跃事务写入。 +- 不改变现有重试和传输机制。 +- 长事务场景下会引入可预期的等待。 + +## 9. 可观测性(设计层) +建议增加: +- 阻塞日志:`ckpt_ts <= barrier_ts`,包含 node/term/barrier/ckpt +- 指标:阻塞轮次、barrier map 大小 diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md new file mode 100644 index 00000000..66572221 --- /dev/null +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md @@ -0,0 +1,96 @@ +# Standby Snapshot Subscription Barrier: Implementation + +## 1. Scope of code changes +- `tx_service/src/remote/cc_node_service.cpp` +- `tx_service/include/store/snapshot_manager.h` +- `tx_service/src/store/snapshot_manager.cpp` +- `tx_service/include/cc/cc_shard.h` (if `ActiveTxMaxTs` is missing) +- Optional helper request class for cross-shard max-ts aggregation + +## 2. 
New/updated state + +### 2.1 Barrier registry in `SnapshotManager` +Add a map keyed by standby node and standby term: +- `subscription_barrier_[standby_node_id][standby_term] = barrier_ts` + +### 2.2 Pending snapshot task extension +Replace pending value from raw request to task struct, e.g.: +- `req` (`StorageSnapshotSyncRequest`) +- `subscription_active_tx_max_ts` +- optional `created_at` + +## 3. New APIs in `SnapshotManager` +- `RegisterSubscriptionBarrier(ng_id, standby_node_id, standby_term, barrier_ts)` +- `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)` +- `EraseSubscriptionBarrier(standby_node_id, standby_term)` +- optional cleanup utility for old terms + +All above plus pending-task updates should be protected by `standby_sync_mux_`. + +## 4. Barrier collection in `StandbyStartFollowing` +In `CcNodeService::StandbyStartFollowing` on primary: +1. Validate leader term. +2. Compute `global_max_active_tx_ts = max(shard.ActiveTxMaxTs(ng_id))`. +3. Generate `subscribe_id`. +4. Form `standby_term`. +5. Call `SnapshotManager::RegisterSubscriptionBarrier(...)`. + +Implementation note: +- Keep collection in shard-safe context (same pattern as existing cross-shard requests). + +## 5. `RequestStorageSnapshotSync` path changes +In `SnapshotManager::OnSnapshotSyncRequested`: +1. Parse `standby_term` from request. +2. Query barrier by `(standby_node_id, standby_term)`. +3. If not found: reject / do not enqueue. +4. If found: enqueue task with barrier ts. + +Dedup logic remains term-based; when task is replaced by newer term, barrier ts is replaced accordingly. + +## 6. Snapshot gating logic +In `SnapshotManager::SyncWithStandby` keep existing checks and add barrier check: +- Obtain current-round `ckpt_ts`. +- For each candidate pending task, require: + - `ckpt_ts > task.subscription_active_tx_max_ts` + +If condition fails, leave task pending for next round. + +## 7. Checkpoint API adjustment +Current `RunOneRoundCheckpoint` returns `bool` only. 
+Prefer changing signature to: +- `bool RunOneRoundCheckpoint(uint32_t node_group, int64_t term, uint64_t* out_ckpt_ts)` + +Set `*out_ckpt_ts` from round checkpoint request result (`CkptTsCc::GetCkptTs()`). + +## 8. `ActiveTxMaxTs` helper +If missing, add in `CcShard`: +- `uint64_t ActiveTxMaxTs(NodeGroupId cc_ng_id)` + +Expected behavior: +- Traverse `lock_holding_txs_[cc_ng_id]` +- Use write-lock timestamp domain aligned with existing checkpoint min-ts logic +- Exclude meta-table tx entries (same scope policy as min-ts path) + +## 9. Cleanup rules +- On successful snapshot completion for `(node_id, standby_term)`: erase barrier entry. +- On registering newer term for same node: prune older-term barriers. +- On standby reset/unsubscribe: remove all barriers for that node (if hook available). +- Optional TTL sweep as fallback. + +## 10. Failure behavior +- Missing barrier on sync request: reject request (safe default). +- Checkpoint failure: keep task queued. +- Snapshot transfer or callback failure: preserve existing retry behavior. + +## 11. Tests + +### Unit tests +- barrier register/get/erase and supersession behavior +- pending dedup with barrier replacement +- gating predicate boundaries (`ckpt_ts == barrier_ts`, `ckpt_ts < barrier_ts`, `ckpt_ts > barrier_ts`) + +### Integration tests +- long-running active tx at subscription time blocks snapshot until ckpt passes barrier +- multiple standbys with independent barriers +- repeated sync-request retries with same standby term +- leader term switch cleanup correctness diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md new file mode 100644 index 00000000..88cb0c86 --- /dev/null +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md @@ -0,0 +1,94 @@ +# Standby Snapshot 订阅屏障:实现文档 + +## 1. 
代码改动范围 +- `tx_service/src/remote/cc_node_service.cpp` +- `tx_service/include/store/snapshot_manager.h` +- `tx_service/src/store/snapshot_manager.cpp` +- `tx_service/include/cc/cc_shard.h`(若缺少 `ActiveTxMaxTs`) +- 可选:新增跨 shard 聚合请求类 + +## 2. 新增/调整状态 + +### 2.1 `SnapshotManager` 屏障表 +新增按 standby node + standby term 索引的屏障映射: +- `subscription_barrier_[standby_node_id][standby_term] = barrier_ts` + +### 2.2 pending 任务结构扩展 +将 pending value 从单一 request 扩展为任务结构,至少包括: +- `req: StorageSnapshotSyncRequest` +- `subscription_active_tx_max_ts` +- 可选 `created_at` + +## 3. `SnapshotManager` 新接口 +- `RegisterSubscriptionBarrier(ng_id, standby_node_id, standby_term, barrier_ts)` +- `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)` +- `EraseSubscriptionBarrier(standby_node_id, standby_term)` +- 可选:清理旧 term 的辅助方法 + +以上接口及 pending 操作统一受 `standby_sync_mux_` 保护。 + +## 4. `StandbyStartFollowing` 中采样并注册 +在 primary 侧 `CcNodeService::StandbyStartFollowing` 中: +1. term 校验通过后; +2. 计算 `global_max_active_tx_ts = max(shard.ActiveTxMaxTs(ng_id))`; +3. 生成 `subscribe_id`; +4. 组合 `standby_term`; +5. 注册屏障到 `SnapshotManager`。 + +实现注意: +- `ActiveTxMaxTs` 聚合应在 shard 安全上下文执行(参考现有跨 shard 请求模式)。 + +## 5. `RequestStorageSnapshotSync` 路径调整 +在 `SnapshotManager::OnSnapshotSyncRequested` 中: +1. 取 `standby_node_id + standby_node_term`; +2. 查询 barrier; +3. 查询失败则拒绝入队; +4. 查询成功则将 barrier 附加到 pending 任务。 + +去重语义保持 term 优先;任务被新 term 覆盖时同步替换 barrier。 + +## 6. `SyncWithStandby` 判定扩展 +保留现有判定,新增 barrier 条件: +- 取本轮 `ckpt_ts`; +- 要求 `ckpt_ts > task.subscription_active_tx_max_ts`。 + +不满足则任务保留到下一轮。 + +## 7. `RunOneRoundCheckpoint` 输出 ckpt_ts +当前只返回 bool,建议改为: +- `bool RunOneRoundCheckpoint(uint32_t node_group, int64_t term, uint64_t *out_ckpt_ts)` + +并在函数内将本轮 `CkptTsCc::GetCkptTs()` 写入 `out_ckpt_ts`。 + +## 8. 
`ActiveTxMaxTs` 方法 +若当前无此方法,在 `CcShard` 中新增: +- `uint64_t ActiveTxMaxTs(NodeGroupId cc_ng_id)` + +建议规则: +- 遍历 `lock_holding_txs_[cc_ng_id]` +- 与 min-ts 逻辑同口径(仅非 meta / 写锁相关) +- 返回该 shard 的活动事务最大时间戳 + +## 9. 清理策略 +- 同一 `(node_id, standby_term)` snapshot 成功后删除 barrier; +- 同节点新 term 注册时删除旧 term barrier; +- standby 重置/取消订阅时清理该节点 barrier; +- 可选 TTL 扫描兜底。 + +## 10. 失败处理 +- 请求到达但 barrier 缺失:拒绝(安全优先); +- checkpoint 失败:任务继续排队等待下一轮; +- snapshot 发送/回调失败:沿用既有重试机制。 + +## 11. 测试建议 + +### 单测 +- barrier 注册/查询/删除 +- pending 去重与 barrier 替换 +- 判定边界:`ckpt_ts == / < / > barrier_ts` + +### 集成 +- 长事务阻塞场景:checkpoint 超过 barrier 前不发送 snapshot +- 多 standby 独立 barrier +- 同 standby term 重试请求 +- term 切换后的清理与隔离 From 179176ea5900d7af3474bddccce5a8d711a1dc63 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 9 Mar 2026 17:34:05 +0800 Subject: [PATCH 03/14] feat(snapshot_manager): add subscription barrier registry APIs - add RegisterSubscriptionBarrier/GetSubscriptionBarrier/EraseSubscriptionBarrier with API comments - add in-memory subscription barrier map keyed by standby node id and standby node term - guard barrier operations with standby_sync_mux_ and prune older terms on register --- tx_service/include/store/snapshot_manager.h | 38 ++++++++++++ tx_service/src/store/snapshot_manager.cpp | 67 +++++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/tx_service/include/store/snapshot_manager.h b/tx_service/include/store/snapshot_manager.h index 8004d53d..16232d3a 100644 --- a/tx_service/include/store/snapshot_manager.h +++ b/tx_service/include/store/snapshot_manager.h @@ -60,6 +60,40 @@ class SnapshotManager void OnSnapshotSyncRequested( const txservice::remote::StorageSnapshotSyncRequest *req); + /** + * @brief Register the active-tx barrier captured at standby subscription + * time. + * @param standby_node_id Standby's node id. + * @param standby_node_term Standby's term ((primary_term << 32) | + * subscribe_id). 
+ * @param active_tx_max_ts Maximum active write-tx timestamp observed at + * subscription time. + */ + void RegisterSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t active_tx_max_ts); + + /** + * @brief Lookup subscription barrier by standby node id and standby term. + * @param standby_node_id Standby's node id. + * @param standby_node_term Standby's term ((primary_term << 32) | + * subscribe_id). + * @param active_tx_max_ts Output barrier timestamp if found. + * @return true if barrier exists, false otherwise. + */ + bool GetSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t *active_tx_max_ts); + + /** + * @brief Remove an existing subscription barrier. + * @param standby_node_id Standby's node id. + * @param standby_node_term Standby's term ((primary_term << 32) | + * subscribe_id). + */ + void EraseSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term); + txservice::remote::BackupTaskStatus CreateBackup( const txservice::remote::CreateBackupRequest *req); @@ -95,6 +129,10 @@ class SnapshotManager std::condition_variable standby_sync_cv_; std::unordered_map pending_req_; + // standby node id -> (standby node term -> subscription-time active tx + // max ts) + std::unordered_map> + subscription_barrier_; bool terminated_{false}; const std::string backup_path_; diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index 179d915c..f4533c8b 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -107,6 +107,73 @@ void SnapshotManager::OnSnapshotSyncRequested( } } +void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t active_tx_max_ts) +{ + std::unique_lock lk(standby_sync_mux_); + auto &node_barriers = subscription_barrier_[standby_node_id]; + + // Keep only current and newer terms for this node. 
+ auto it = node_barriers.begin(); + while (it != node_barriers.end()) + { + if (it->first < standby_node_term) + { + it = node_barriers.erase(it); + } + else + { + ++it; + } + } + + node_barriers[standby_node_term] = active_tx_max_ts; +} + +bool SnapshotManager::GetSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t *active_tx_max_ts) +{ + if (active_tx_max_ts == nullptr) + { + return false; + } + + std::unique_lock lk(standby_sync_mux_); + auto node_it = subscription_barrier_.find(standby_node_id); + if (node_it == subscription_barrier_.end()) + { + return false; + } + + auto barrier_it = node_it->second.find(standby_node_term); + if (barrier_it == node_it->second.end()) + { + return false; + } + + *active_tx_max_ts = barrier_it->second; + return true; +} + +void SnapshotManager::EraseSubscriptionBarrier(uint32_t standby_node_id, + int64_t standby_node_term) +{ + std::unique_lock lk(standby_sync_mux_); + auto node_it = subscription_barrier_.find(standby_node_id); + if (node_it == subscription_barrier_.end()) + { + return; + } + + node_it->second.erase(standby_node_term); + if (node_it->second.empty()) + { + subscription_barrier_.erase(node_it); + } +} + // If kvstore is enabled, we must flush data in-memory to kvstore firstly. // For non-shared kvstore, also we create and send the snapshot to standby // nodes. 
From efce676f118c62cdc959f4d2455ff776c67e54d9 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 9 Mar 2026 18:33:11 +0800 Subject: [PATCH 04/14] refactor(snapshot_manager): extend pending snapshot task to carry barrier state - introduce PendingSnapshotSyncTask (request + subscription_active_tx_max_ts) - migrate pending_req_ map value type to PendingSnapshotSyncTask - update SyncWithStandby/OnSnapshotSyncRequested request field access to task.req - keep runtime behavior unchanged; barrier field is reserved for next integration step --- tx_service/include/store/snapshot_manager.h | 9 +++++++-- tx_service/src/store/snapshot_manager.cpp | 14 ++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/tx_service/include/store/snapshot_manager.h b/tx_service/include/store/snapshot_manager.h index 16232d3a..be69fae8 100644 --- a/tx_service/include/store/snapshot_manager.h +++ b/tx_service/include/store/snapshot_manager.h @@ -109,6 +109,12 @@ class SnapshotManager bool RunOneRoundCheckpoint(uint32_t node_group, int64_t ng_leader_term); private: + struct PendingSnapshotSyncTask + { + txservice::remote::StorageSnapshotSyncRequest req; + uint64_t subscription_active_tx_max_ts{0}; + }; + SnapshotManager() = default; ~SnapshotManager() = default; @@ -127,8 +133,7 @@ class SnapshotManager std::thread standby_sync_worker_; std::mutex standby_sync_mux_; std::condition_variable standby_sync_cv_; - std::unordered_map - pending_req_; + std::unordered_map pending_req_; // standby node id -> (standby node term -> subscription-time active tx // max ts) std::unordered_map> diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index f4533c8b..ac347cdc 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -92,7 +92,8 @@ void SnapshotManager::OnSnapshotSyncRequested( // check if the queued task is newer than the new received req. 
If // so, discard the new req, otherwise, update the task. auto &cur_task = ins_pair.first->second; - int64_t cur_task_standby_node_term = cur_task.standby_node_term(); + int64_t cur_task_standby_node_term = + cur_task.req.standby_node_term(); int64_t req_standby_node_term = req->standby_node_term(); if (cur_task_standby_node_term >= req_standby_node_term) @@ -102,7 +103,7 @@ void SnapshotManager::OnSnapshotSyncRequested( } } - ins_pair.first->second.CopyFrom(*req); + ins_pair.first->second.req.CopyFrom(*req); standby_sync_cv_.notify_all(); } } @@ -224,7 +225,7 @@ void SnapshotManager::SyncWithStandby() } } - std::vector tasks; + std::vector tasks; // Dequeue all pending tasks that can be covered by this snapshot. { @@ -234,7 +235,7 @@ void SnapshotManager::SyncWithStandby() { auto &pending_task = it->second; int64_t pending_task_standby_node_term = - pending_task.standby_node_term(); + pending_task.req.standby_node_term(); int64_t pending_task_primary_term = PrimaryTermFromStandbyTerm(pending_task_standby_node_term); @@ -283,8 +284,9 @@ void SnapshotManager::SyncWithStandby() } } - for (auto &req : tasks) + for (auto &task : tasks) { + auto &req = task.req; uint32_t node_id = req.standby_node_id(); std::string ip; uint16_t port; @@ -362,7 +364,7 @@ void SnapshotManager::SyncWithStandby() // Check again to see if the request has been updated. 
auto &next_pending_task = pending_req_iter->second; int64_t next_pending_task_standby_term = - next_pending_task.standby_node_term(); + next_pending_task.req.standby_node_term(); int64_t next_pending_task_primary_term = PrimaryTermFromStandbyTerm(next_pending_task_standby_term); From 44ee99bb14893159832063be787123f40379c1fc Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 9 Mar 2026 18:54:28 +0800 Subject: [PATCH 05/14] feat(snapshot_sync): enforce subscription barrier when syncing standby snapshot - register subscription barrier in StandbyStartFollowing using aggregated ActiveTxMaxTs - make OnSnapshotSyncRequested validate barrier existence and return acceptance status - extend SyncWithStandby gating: checkpoint ts must be greater than subscription barrier ts - extend RunOneRoundCheckpoint with optional out checkpoint ts for barrier validation - erase barrier entry after successful snapshot sync --- tx_service/include/store/snapshot_manager.h | 12 ++- tx_service/src/remote/cc_node_service.cpp | 18 +++- tx_service/src/store/snapshot_manager.cpp | 106 ++++++++++++++------ 3 files changed, 103 insertions(+), 33 deletions(-) diff --git a/tx_service/include/store/snapshot_manager.h b/tx_service/include/store/snapshot_manager.h index be69fae8..076c9b76 100644 --- a/tx_service/include/store/snapshot_manager.h +++ b/tx_service/include/store/snapshot_manager.h @@ -57,7 +57,9 @@ class SnapshotManager void Start(); void Shutdown(); - void OnSnapshotSyncRequested( + // Handle snapshot sync request from standby node. Returns true if the + // request is accepted (queued or safely deduped), false otherwise. + bool OnSnapshotSyncRequested( const txservice::remote::StorageSnapshotSyncRequest *req); /** @@ -105,8 +107,12 @@ class SnapshotManager void TerminateBackup(txservice::NodeGroupId ng_id, const std::string &backup_name); - // Run one round checkpoint to flush data in memory to kvstore. 
- bool RunOneRoundCheckpoint(uint32_t node_group, int64_t ng_leader_term); + // Run one round checkpoint to flush data in memory to kvstore. When + // out_ckpt_ts is not nullptr, it is filled with current round checkpoint + // ts. + bool RunOneRoundCheckpoint(uint32_t node_group, + int64_t ng_leader_term, + uint64_t *out_ckpt_ts = nullptr); private: struct PendingSnapshotSyncTask diff --git a/tx_service/src/remote/cc_node_service.cpp b/tx_service/src/remote/cc_node_service.cpp index 123cd440..3001cf29 100644 --- a/tx_service/src/remote/cc_node_service.cpp +++ b/tx_service/src/remote/cc_node_service.cpp @@ -1656,7 +1656,20 @@ void CcNodeService::StandbyStartFollowing( response->add_start_sequence_id(start_seq); } + ActiveTxMaxTsCc active_tx_max_ts_cc(local_shards_.Count(), + request->node_group_id()); + for (uint16_t core_id = 0; core_id < local_shards_.Count(); core_id++) + { + local_shards_.EnqueueCcRequest(core_id, &active_tx_max_ts_cc); + } + active_tx_max_ts_cc.Wait(); + uint64_t global_active_tx_max_ts = active_tx_max_ts_cc.GetActiveTxMaxTs(); + auto subscribe_id = Sharder::Instance().GetNextSubscribeId(); + int64_t standby_node_term = + (request->ng_term() << 32) | static_cast(subscribe_id); + store::SnapshotManager::Instance().RegisterSubscriptionBarrier( + request->node_id(), standby_node_term, global_active_tx_max_ts); response->set_subscribe_id(subscribe_id); response->set_error(false); @@ -1776,8 +1789,9 @@ void CcNodeService::RequestStorageSnapshotSync( // Then, notify standby nodes that data committed before subscribe timepoint // has been flushed to kvstore. (standby nodes begin fetch record from // kvstore on cache miss). 
- store::SnapshotManager::Instance().OnSnapshotSyncRequested(request); - response->set_error(false); + bool accepted = + store::SnapshotManager::Instance().OnSnapshotSyncRequested(request); + response->set_error(!accepted); } void CcNodeService::OnSnapshotSynced( diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index ac347cdc..cdca0594 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -69,7 +69,7 @@ void SnapshotManager::StandbySyncWorker() } } -void SnapshotManager::OnSnapshotSyncRequested( +bool SnapshotManager::OnSnapshotSyncRequested( const txservice::remote::StorageSnapshotSyncRequest *req) { DLOG(INFO) << "Received snapshot sync request from standby node #" @@ -79,13 +79,33 @@ void SnapshotManager::OnSnapshotSyncRequested( if (store_hd_ == nullptr) { LOG(ERROR) << "Store handler is nullptr but standby feature enabled."; - return; + return false; } std::unique_lock lk(standby_sync_mux_); if (!terminated_) { + auto node_it = subscription_barrier_.find(req->standby_node_id()); + if (node_it == subscription_barrier_.end()) + { + LOG(WARNING) << "No subscription barrier found for standby node #" + << req->standby_node_id() + << ", standby term: " << req->standby_node_term(); + return false; + } + + auto barrier_it = node_it->second.find(req->standby_node_term()); + if (barrier_it == node_it->second.end()) + { + LOG(WARNING) << "No barrier found for standby node #" + << req->standby_node_id() + << " at standby term: " << req->standby_node_term(); + return false; + } + + uint64_t active_tx_max_ts = barrier_it->second; + auto ins_pair = pending_req_.try_emplace(req->standby_node_id()); if (!ins_pair.second) { @@ -99,13 +119,17 @@ void SnapshotManager::OnSnapshotSyncRequested( if (cur_task_standby_node_term >= req_standby_node_term) { // discard the task. 
- return; + return true; } } ins_pair.first->second.req.CopyFrom(*req); + ins_pair.first->second.subscription_active_tx_max_ts = active_tx_max_ts; standby_sync_cv_.notify_all(); + return true; } + + return false; } void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, @@ -203,7 +227,9 @@ void SnapshotManager::SyncWithStandby() uint32_t current_subscribe_id = Sharder::Instance().GetCurrentSubscribeId(); - bool ckpt_res = this->RunOneRoundCheckpoint(node_group, leader_term); + uint64_t ckpt_ts = 0; + bool ckpt_res = + this->RunOneRoundCheckpoint(node_group, leader_term, &ckpt_ts); if (!ckpt_res) { @@ -266,6 +292,18 @@ void SnapshotManager::SyncWithStandby() << pending_task_subscribe_id << ". Wait for next round"; covered = false; } + else if (ckpt_ts <= pending_task.subscription_active_tx_max_ts) + { + LOG(INFO) << "Snapshot checkpoint ts " << ckpt_ts + << " does not pass subscription barrier ts " + << pending_task.subscription_active_tx_max_ts + << " for standby node #" + << pending_task.req.standby_node_id() + << ", standby term " + << pending_task.req.standby_node_term() + << ". Wait for next round"; + covered = false; + } if (!covered) { @@ -357,43 +395,50 @@ void SnapshotManager::SyncWithStandby() // We just need to erase the same request. Even if the notification // fails, after a while, the standby node will resend the // request. - std::unique_lock lk(standby_sync_mux_); - auto pending_req_iter = pending_req_.find(req.standby_node_id()); - if (pending_req_iter != pending_req_.end()) { - // Check again to see if the request has been updated. 
- auto &next_pending_task = pending_req_iter->second; - int64_t next_pending_task_standby_term = - next_pending_task.req.standby_node_term(); - int64_t next_pending_task_primary_term = - PrimaryTermFromStandbyTerm(next_pending_task_standby_term); + std::unique_lock lk(standby_sync_mux_); + auto pending_req_iter = pending_req_.find(req.standby_node_id()); + if (pending_req_iter != pending_req_.end()) + { + // Check again to see if the request has been updated. + auto &next_pending_task = pending_req_iter->second; + int64_t next_pending_task_standby_term = + next_pending_task.req.standby_node_term(); + int64_t next_pending_task_primary_term = + PrimaryTermFromStandbyTerm(next_pending_task_standby_term); - assert(PrimaryTermFromStandbyTerm(req.standby_node_term()) == - leader_term); + assert(PrimaryTermFromStandbyTerm(req.standby_node_term()) == + leader_term); - if (next_pending_task_primary_term < leader_term) - { - pending_req_.erase(pending_req_iter); - } - else if (next_pending_task_primary_term == leader_term) - { - uint32_t next_pending_task_subscribe_id = - SubscribeIdFromStandbyTerm( - next_pending_task_standby_term); - uint32_t cur_task_subscribe_id = - SubscribeIdFromStandbyTerm(req.standby_node_term()); - if (next_pending_task_subscribe_id <= cur_task_subscribe_id) + if (next_pending_task_primary_term < leader_term) { pending_req_.erase(pending_req_iter); } + else if (next_pending_task_primary_term == leader_term) + { + uint32_t next_pending_task_subscribe_id = + SubscribeIdFromStandbyTerm( + next_pending_task_standby_term); + uint32_t cur_task_subscribe_id = + SubscribeIdFromStandbyTerm(req.standby_node_term()); + if (next_pending_task_subscribe_id <= + cur_task_subscribe_id) + { + pending_req_.erase(pending_req_iter); + } + } } } + + EraseSubscriptionBarrier(req.standby_node_id(), + req.standby_node_term()); } } } bool SnapshotManager::RunOneRoundCheckpoint(uint32_t node_group, - int64_t ng_leader_term) + int64_t ng_leader_term, + uint64_t *out_ckpt_ts) 
{ using namespace txservice; auto &local_shards = *Sharder::Instance().GetLocalCcShards(); @@ -422,6 +467,11 @@ bool SnapshotManager::RunOneRoundCheckpoint(uint32_t node_group, } ckpt_req.Wait(); + if (out_ckpt_ts != nullptr) + { + *out_ckpt_ts = ckpt_req.GetCkptTs(); + } + // Iterate all the tables and execute CkptScanCc requests on this node // group's ccmaps on each ccshard. The result of CkptScanCc is stored in // ckpt_vec. From cf6f55913aa19bb226456676f30b46766bc82acf Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 9 Mar 2026 19:27:14 +0800 Subject: [PATCH 06/14] fix(snapshot_sync): complete standby cleanup paths --- tx_service/include/cc/cc_shard.h | 11 ++++++++ tx_service/include/store/snapshot_manager.h | 14 ++++++++++ tx_service/src/sharder.cpp | 27 +++++++++++++++++++ tx_service/src/store/snapshot_manager.cpp | 30 ++++++++++++++++++++- 4 files changed, 81 insertions(+), 1 deletion(-) diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index e1e1b14c..430e7af4 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -1109,6 +1109,17 @@ class CcShard return node_ids; } + std::vector GetCandidateStandbys() + { + std::vector node_ids; + node_ids.reserve(candidate_standby_nodes_.size()); + for (const auto &it : candidate_standby_nodes_) + { + node_ids.push_back(it.first); + } + return node_ids; + } + bool HasCandidateStandbys() const { return !candidate_standby_nodes_.empty(); diff --git a/tx_service/include/store/snapshot_manager.h b/tx_service/include/store/snapshot_manager.h index 076c9b76..adfbff62 100644 --- a/tx_service/include/store/snapshot_manager.h +++ b/tx_service/include/store/snapshot_manager.h @@ -96,6 +96,13 @@ class SnapshotManager void EraseSubscriptionBarrier(uint32_t standby_node_id, int64_t standby_node_term); + /** + * @brief Remove all subscription barriers and queued sync task for one + * standby node. + * @param standby_node_id Standby's node id. 
+ */ + void EraseSubscriptionBarriersByNode(uint32_t standby_node_id); + txservice::remote::BackupTaskStatus CreateBackup( const txservice::remote::CreateBackupRequest *req); @@ -115,6 +122,13 @@ class SnapshotManager uint64_t *out_ckpt_ts = nullptr); private: + /** + * @brief Remove one subscription barrier entry while caller already holds + * standby_sync_mux_. + */ + void EraseSubscriptionBarrierLocked(uint32_t standby_node_id, + int64_t standby_node_term); + struct PendingSnapshotSyncTask { txservice::remote::StorageSnapshotSyncRequest req; diff --git a/tx_service/src/sharder.cpp b/tx_service/src/sharder.cpp index 0c761d6c..69550de8 100644 --- a/tx_service/src/sharder.cpp +++ b/tx_service/src/sharder.cpp @@ -39,6 +39,7 @@ #include "remote/cc_node_service.h" #include "remote/cc_stream_receiver.h" #include "remote/cc_stream_sender.h" +#include "store/snapshot_manager.h" #include "tx_command.h" #include "tx_service.h" #include "tx_worker_pool.h" @@ -1299,6 +1300,7 @@ void Sharder::UpdateInMemoryClusterConfig( std::shared_ptr> new_nodes_sptr, uint64_t version) { + std::vector removed_node_ids; { std::unique_lock lk(cluster_cnf_mux_); if (cluster_config_.version_ >= version) @@ -1368,6 +1370,16 @@ void Sharder::UpdateInMemoryClusterConfig( cluster_config_.ng_configs_.try_emplace(ng_pair.first); ng_cnf_it.first->second = ng_pair.second; } + + // Nodes removed from cluster config in this update. 
+ for (const auto &[node_id, _] : *cluster_config_.nodes_configs_) + { + if (new_nodes_sptr->find(node_id) == new_nodes_sptr->end()) + { + removed_node_ids.emplace_back(node_id); + } + } + cluster_config_.version_ = version; cluster_config_.nodes_configs_ = std::move(new_nodes_sptr); if (cluster_config_.ng_ids_.use_count() == 1) @@ -1402,6 +1414,15 @@ void Sharder::UpdateInMemoryClusterConfig( ccs.RemoveSubscribedStandby(node_id); } } + std::vector candidate_standby_nodes = + ccs.GetCandidateStandbys(); + for (auto node_id : candidate_standby_nodes) + { + if (nodes_configs->find(node_id) == nodes_configs->end()) + { + ccs.RemoveCandidateStandby(node_id); + } + } return true; }, @@ -1413,6 +1434,12 @@ void Sharder::UpdateInMemoryClusterConfig( } remove_subscribed_standby_cc.Wait(); + + for (auto node_id : removed_node_ids) + { + store::SnapshotManager::Instance().EraseSubscriptionBarriersByNode( + node_id); + } } void Sharder::UpdateClusterConfig( diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index cdca0594..b7307f7d 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -186,6 +186,19 @@ void SnapshotManager::EraseSubscriptionBarrier(uint32_t standby_node_id, int64_t standby_node_term) { std::unique_lock lk(standby_sync_mux_); + EraseSubscriptionBarrierLocked(standby_node_id, standby_node_term); +} + +void SnapshotManager::EraseSubscriptionBarriersByNode(uint32_t standby_node_id) +{ + std::unique_lock lk(standby_sync_mux_); + pending_req_.erase(standby_node_id); + subscription_barrier_.erase(standby_node_id); +} + +void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id, + int64_t standby_node_term) +{ auto node_it = subscription_barrier_.find(standby_node_id); if (node_it == subscription_barrier_.end()) { @@ -222,6 +235,9 @@ void SnapshotManager::SyncWithStandby() std::unique_lock lk(standby_sync_mux_); // clear all requests 
pending_req_.clear(); + // clear barriers as well, all queued sync states are stale when this + // leader term is no longer valid. + subscription_barrier_.clear(); return; } @@ -259,6 +275,7 @@ void SnapshotManager::SyncWithStandby() auto it = pending_req_.begin(); while (it != pending_req_.end()) { + uint32_t pending_node_id = it->first; auto &pending_task = it->second; int64_t pending_task_standby_node_term = pending_task.req.standby_node_term(); @@ -268,6 +285,8 @@ void SnapshotManager::SyncWithStandby() if (pending_task_primary_term < leader_term) { // discard the task. + EraseSubscriptionBarrierLocked(pending_node_id, + pending_task_standby_node_term); it = pending_req_.erase(it); continue; } @@ -312,7 +331,10 @@ void SnapshotManager::SyncWithStandby() it++; continue; } - tasks.emplace_back(std::move(pending_task)); + // Keep a copy for current round execution. Do not move out of + // pending_req_, because completion logic still needs the original + // standby term to decide whether the queued task is stale. 
+ tasks.emplace_back(pending_task); // Keep the request entry so completion logic can check whether it // needs to stay queued, but make sure to advance the iterator so we @@ -412,6 +434,9 @@ void SnapshotManager::SyncWithStandby() if (next_pending_task_primary_term < leader_term) { + EraseSubscriptionBarrierLocked( + req.standby_node_id(), + next_pending_task_standby_term); pending_req_.erase(pending_req_iter); } else if (next_pending_task_primary_term == leader_term) @@ -424,6 +449,9 @@ void SnapshotManager::SyncWithStandby() if (next_pending_task_subscribe_id <= cur_task_subscribe_id) { + EraseSubscriptionBarrierLocked( + req.standby_node_id(), + next_pending_task_standby_term); pending_req_.erase(pending_req_iter); } } From 2ff6a9702943650ebe8025132a5626f97dbf96ae Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 10 Mar 2026 16:36:50 +0800 Subject: [PATCH 07/14] refactor(standby): move barrier registration to reset and drop candidate forward-entry path --- tx_service/include/cc/cc_shard.h | 4 -- tx_service/include/cc/object_cc_map.h | 40 +++++++++++--------- tx_service/src/fault/cc_node.cpp | 11 +++++- tx_service/src/remote/cc_node_service.cpp | 45 ++++++++++++++++------- tx_service/src/store/snapshot_manager.cpp | 7 +++- 5 files changed, 69 insertions(+), 38 deletions(-) diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index 430e7af4..ef1ed057 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -1120,10 +1120,6 @@ class CcShard return node_ids; } - bool HasCandidateStandbys() const - { - return !candidate_standby_nodes_.empty(); - } void ResetStandbySequence(); void DecrInflightStandbyReqCount(uint32_t seq_grp); diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index 322a6849..d2b46d80 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -866,8 +866,7 @@ class ObjectCcMap : public TemplateCcMap 
StandbyForwardEntry *forward_entry = nullptr; remote::KeyObjectStandbyForwardRequest *forward_req = nullptr; - if (!shard_->GetSubscribedStandbys().empty() || - shard_->HasCandidateStandbys()) + if (!shard_->GetSubscribedStandbys().empty()) { forward_entry = cce->ForwardEntry(); if (!forward_entry) @@ -1380,20 +1379,17 @@ class ObjectCcMap : public TemplateCcMap assert(ccp != nullptr); bool s_obj_exist = (cce->PayloadStatus() == RecordStatus::Normal); - StandbyForwardEntry *forward_entry = nullptr; auto subscribed_standbys = shard_->GetSubscribedStandbys(); - if (!subscribed_standbys.empty()) + bool has_subscribed_standby = !subscribed_standbys.empty(); + StandbyForwardEntry *forward_entry = cce->ForwardEntry(); + if (has_subscribed_standby && forward_entry == nullptr) { - forward_entry = cce->ForwardEntry(); - if (forward_entry == nullptr) + LOG(ERROR) << "Subscribed standbys exist, but forward_entry is " + "null. Data loss may occur. Notifying standbys " + "to resubscribe."; + for (uint32_t node_id : subscribed_standbys) { - LOG(ERROR) << "Subscribed standbys exist, but forward_entry is " - "null. Data loss may occur. Notifying standbys " - "to resubscribe."; - for (uint32_t node_id : subscribed_standbys) - { - shard_->NotifyStandbyOutOfSync(node_id); - } + shard_->NotifyStandbyOutOfSync(node_id); } } if (commit_ts > 0) @@ -1437,12 +1433,20 @@ class ObjectCcMap : public TemplateCcMap } if (forward_entry) { - // Set commit ts and send the msg to standby node - forward_entry->Request().set_commit_ts(commit_ts); - forward_entry->Request().set_schema_version(schema_ts_); - std::unique_ptr entry_ptr = + if (has_subscribed_standby) + { + // Set commit ts and send the msg to standby node. + forward_entry->Request().set_commit_ts(commit_ts); + forward_entry->Request().set_schema_version(schema_ts_); + std::unique_ptr entry_ptr = + cce->ReleaseForwardEntry(); + shard_->ForwardStandbyMessage(entry_ptr.release()); + } + else + { + // No standby needs this entry anymore. 
cce->ReleaseForwardEntry(); - shard_->ForwardStandbyMessage(entry_ptr.release()); + } } bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, payload_status); diff --git a/tx_service/src/fault/cc_node.cpp b/tx_service/src/fault/cc_node.cpp index a54ba8f4..4b9f8f00 100644 --- a/tx_service/src/fault/cc_node.cpp +++ b/tx_service/src/fault/cc_node.cpp @@ -1075,8 +1075,17 @@ void CcNode::SubscribePrimaryNode(uint32_t leader_node_id, stub->ResetStandbySequenceId(&cntl, &reset_req, &reset_resp, nullptr); // Check rpc result - while (cntl.Failed()) + while (cntl.Failed() || reset_resp.error()) { + if (reset_resp.error()) + { + LOG(WARNING) + << "ResetStandbySequenceId rejected by primary, ng_id: " + << ng_id_ << ", standby_term: " << standby_term + << ", primary_node_id: " << leader_node_id; + return; + } + // We only need to retry if the message is not delivered. if (Sharder::Instance().LeaderTerm(ng_id_) > 0 || Sharder::Instance().CandidateLeaderTerm(ng_id_) > 0 || diff --git a/tx_service/src/remote/cc_node_service.cpp b/tx_service/src/remote/cc_node_service.cpp index 3001cf29..e1e0160c 100644 --- a/tx_service/src/remote/cc_node_service.cpp +++ b/tx_service/src/remote/cc_node_service.cpp @@ -1656,21 +1656,7 @@ void CcNodeService::StandbyStartFollowing( response->add_start_sequence_id(start_seq); } - ActiveTxMaxTsCc active_tx_max_ts_cc(local_shards_.Count(), - request->node_group_id()); - for (uint16_t core_id = 0; core_id < local_shards_.Count(); core_id++) - { - local_shards_.EnqueueCcRequest(core_id, &active_tx_max_ts_cc); - } - active_tx_max_ts_cc.Wait(); - uint64_t global_active_tx_max_ts = active_tx_max_ts_cc.GetActiveTxMaxTs(); - auto subscribe_id = Sharder::Instance().GetNextSubscribeId(); - int64_t standby_node_term = - (request->ng_term() << 32) | static_cast(subscribe_id); - store::SnapshotManager::Instance().RegisterSubscriptionBarrier( - request->node_id(), standby_node_term, global_active_tx_max_ts); - 
response->set_subscribe_id(subscribe_id); response->set_error(false); } @@ -1898,6 +1884,37 @@ void CcNodeService::ResetStandbySequenceId( } reset_seq_cc.Wait(); + if (!Sharder::Instance().CheckLeaderTerm( + ng_id, PrimaryTermFromStandbyTerm(standby_node_term))) + { + LOG(WARNING) + << "Reject ResetStandbySequenceId due to leader term mismatch, " + << "ng_id: " << ng_id << ", node_id: " << node_id + << ", standby_term: " << standby_node_term; + response->set_error(true); + return; + } + + uint64_t existing_barrier_ts = 0; + bool has_existing_barrier = + store::SnapshotManager::Instance().GetSubscriptionBarrier( + node_id, standby_node_term, &existing_barrier_ts); + (void) existing_barrier_ts; + if (!has_existing_barrier) + { + ActiveTxMaxTsCc active_tx_max_ts_cc(local_shards_.Count(), ng_id); + for (uint16_t core_id = 0; core_id < local_shards_.Count(); core_id++) + { + local_shards_.EnqueueCcRequest(core_id, &active_tx_max_ts_cc); + } + active_tx_max_ts_cc.Wait(); + uint64_t global_active_tx_max_ts = + active_tx_max_ts_cc.GetActiveTxMaxTs(); + + store::SnapshotManager::Instance().RegisterSubscriptionBarrier( + node_id, standby_node_term, global_active_tx_max_ts); + } + response->set_error(false); } diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index b7307f7d..f99d5258 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -153,7 +153,12 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, } } - node_barriers[standby_node_term] = active_tx_max_ts; + // Keep the first registered barrier for the same standby term to make + // duplicate reset/subscribe retries idempotent. 
+ if (node_barriers.find(standby_node_term) == node_barriers.end()) + { + node_barriers[standby_node_term] = active_tx_max_ts; + } } bool SnapshotManager::GetSubscriptionBarrier(uint32_t standby_node_id, From 1c55066a5d21480da7dd0e3492aa402ca9ceda47 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 10 Mar 2026 16:45:16 +0800 Subject: [PATCH 08/14] chore(standby): downgrade missing forward-entry alert to warning --- tx_service/include/cc/object_cc_map.h | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index d2b46d80..a4888664 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1382,16 +1382,9 @@ class ObjectCcMap : public TemplateCcMap auto subscribed_standbys = shard_->GetSubscribedStandbys(); bool has_subscribed_standby = !subscribed_standbys.empty(); StandbyForwardEntry *forward_entry = cce->ForwardEntry(); - if (has_subscribed_standby && forward_entry == nullptr) - { - LOG(ERROR) << "Subscribed standbys exist, but forward_entry is " - "null. Data loss may occur. Notifying standbys " - "to resubscribe."; - for (uint32_t node_id : subscribed_standbys) - { - shard_->NotifyStandbyOutOfSync(node_id); - } - } + LOG_IF(WARNING, has_subscribed_standby && forward_entry == nullptr) + << "Subscribed standbys exist, but forward_entry is null. 
" + "Data loss may occur."; if (commit_ts > 0) { RecordStatus dirty_payload_status = cce->DirtyPayloadStatus(); From e4a6597454d5ab2dd25e9ece8707a417ddba651c Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 11 Mar 2026 16:16:08 +0800 Subject: [PATCH 09/14] fix(snapshot): drop superseded pending sync tasks and ignore stale barriers --- ..._subscription_barrier_implementation_en.md | 2 +- ..._subscription_barrier_implementation_zh.md | 2 +- tx_service/src/store/snapshot_manager.cpp | 58 ++++++++++++++++++- 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md index 66572221..c558a144 100644 --- a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md @@ -20,7 +20,7 @@ Replace pending value from raw request to task struct, e.g.: - optional `created_at` ## 3. New APIs in `SnapshotManager` -- `RegisterSubscriptionBarrier(ng_id, standby_node_id, standby_term, barrier_ts)` +- `RegisterSubscriptionBarrier(standby_node_id, standby_term, barrier_ts)` - `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)` - `EraseSubscriptionBarrier(standby_node_id, standby_term)` - optional cleanup utility for old terms diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md index 88cb0c86..9e1bc9fd 100644 --- a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md @@ -20,7 +20,7 @@ - 可选 `created_at` ## 3. 
`SnapshotManager` 新接口 -- `RegisterSubscriptionBarrier(ng_id, standby_node_id, standby_term, barrier_ts)` +- `RegisterSubscriptionBarrier(standby_node_id, standby_term, barrier_ts)` - `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)` - `EraseSubscriptionBarrier(standby_node_id, standby_term)` - 可选:清理旧 term 的辅助方法 diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index f99d5258..d5f2077f 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -137,6 +137,48 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, uint64_t active_tx_max_ts) { std::unique_lock lk(standby_sync_mux_); + + // Ignore out-of-order old barrier registrations when a newer standby term + // is already known for this standby node. + auto node_it = subscription_barrier_.find(standby_node_id); + if (node_it != subscription_barrier_.end()) + { + for (const auto &entry : node_it->second) + { + int64_t existing_term = entry.first; + if (existing_term > standby_node_term) + { + DLOG(INFO) + << "Ignore stale subscription barrier registration for " + "standby node #" + << standby_node_id << ", term " << standby_node_term + << " because newer term " << existing_term + << " already exists"; + return; + } + } + } + + // Drop queued work from older standby terms. They are superseded by this + // new subscription barrier and should not be synced anymore. 
+ auto pending_it = pending_req_.find(standby_node_id); + if (pending_it != pending_req_.end() && + pending_it->second.req.standby_node_term() > standby_node_term) + { + DLOG(INFO) << "Ignore stale barrier registration for standby node #" + << standby_node_id << ", term " << standby_node_term + << " because queued pending task term " + << pending_it->second.req.standby_node_term() + << " is newer"; + return; + } + + if (pending_it != pending_req_.end() && + pending_it->second.req.standby_node_term() < standby_node_term) + { + pending_req_.erase(pending_it); + } + auto &node_barriers = subscription_barrier_[standby_node_id]; // Keep only current and newer terms for this node. @@ -353,11 +395,25 @@ void SnapshotManager::SyncWithStandby() { auto &req = task.req; uint32_t node_id = req.standby_node_id(); + int64_t req_standby_node_term = req.standby_node_term(); + + // Skip stale copied tasks that have already been superseded/removed + // after this round snapshot task list was built. + { + std::unique_lock lk(standby_sync_mux_); + auto pending_it = pending_req_.find(node_id); + if (pending_it == pending_req_.end() || + pending_it->second.req.standby_node_term() != + req_standby_node_term) + { + continue; + } + } + std::string ip; uint16_t port; Sharder::Instance().GetNodeAddress(node_id, ip, port); std::string remote_dest = req.user() + "@" + ip + ":" + req.dest_path(); - int64_t req_standby_node_term = req.standby_node_term(); int64_t req_primary_term = PrimaryTermFromStandbyTerm(req_standby_node_term); From d913057978836b7fa25b7074b336d923ab2d9605 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 11 Mar 2026 17:04:36 +0800 Subject: [PATCH 10/14] fix(standby): rollback local follow state when reset is rejected --- tx_service/src/fault/cc_node.cpp | 76 +++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/tx_service/src/fault/cc_node.cpp b/tx_service/src/fault/cc_node.cpp index 4b9f8f00..cf5dbabe 100644 --- 
a/tx_service/src/fault/cc_node.cpp +++ b/tx_service/src/fault/cc_node.cpp @@ -1074,15 +1074,89 @@ void CcNode::SubscribePrimaryNode(uint32_t leader_node_id, cntl.Reset(); stub->ResetStandbySequenceId(&cntl, &reset_req, &reset_resp, nullptr); + auto rollback_failed_following_state = [this, standby_term, primary_term]() + { + // If a newer subscribe term has already taken over, do not touch local + // standby state here. + if (requested_subscribe_primary_term_.load(std::memory_order_acquire) != + primary_term) + { + return false; + } + + if (Sharder::Instance().StandbyNodeTerm() != standby_term && + Sharder::Instance().CandidateStandbyNodeTerm() != standby_term) + { + return false; + } + + // Reset per-shard subscribe state only when this failed standby term is + // still the active local follow term. + WaitableCc rollback_cc( + [this, standby_term, primary_term](CcShard &ccs) + { + if (requested_subscribe_primary_term_.load( + std::memory_order_acquire) != primary_term) + { + return true; + } + + if (Sharder::Instance().StandbyNodeTerm() != standby_term && + Sharder::Instance().CandidateStandbyNodeTerm() != + standby_term) + { + return true; + } + + auto &grps = ccs.GetStandbysequenceGrps(); + for (auto &entry : grps) + { + auto &grp_info = entry.second; + if (grp_info.subscribed_) + { + grp_info.Unsubscribe(); + } + } + + return true; + }, + local_cc_shards_.Count()); + for (uint32_t core_id = 0; core_id < local_cc_shards_.Count(); + core_id++) + { + local_cc_shards_.EnqueueCcRequest(core_id, &rollback_cc); + } + rollback_cc.Wait(); + + if (requested_subscribe_primary_term_.load(std::memory_order_acquire) != + primary_term) + { + return false; + } + + if (Sharder::Instance().StandbyNodeTerm() == standby_term) + { + Sharder::Instance().SetStandbyNodeTerm(-1); + } + if (Sharder::Instance().CandidateStandbyNodeTerm() == standby_term) + { + Sharder::Instance().SetCandidateStandbyNodeTerm(-1); + } + + return true; + }; + // Check rpc result while (cntl.Failed() || 
reset_resp.error()) { if (reset_resp.error()) { + bool rolled_back = rollback_failed_following_state(); LOG(WARNING) << "ResetStandbySequenceId rejected by primary, ng_id: " << ng_id_ << ", standby_term: " << standby_term - << ", primary_node_id: " << leader_node_id; + << ", primary_node_id: " << leader_node_id + << ", rolled_back_local_state: " << rolled_back; return; } From 646417e2aaa590cbb7c3a4125e296f661d8bf9b9 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 11 Mar 2026 17:51:14 +0800 Subject: [PATCH 11/14] refactor(snapshot): gate standby sync by current checkpoint ts --- tx_service/include/store/snapshot_manager.h | 12 +-- tx_service/src/store/snapshot_manager.cpp | 107 ++++++++++++-------- 2 files changed, 69 insertions(+), 50 deletions(-) diff --git a/tx_service/include/store/snapshot_manager.h b/tx_service/include/store/snapshot_manager.h index adfbff62..f7619525 100644 --- a/tx_service/include/store/snapshot_manager.h +++ b/tx_service/include/store/snapshot_manager.h @@ -114,12 +114,12 @@ class SnapshotManager void TerminateBackup(txservice::NodeGroupId ng_id, const std::string &backup_name); - // Run one round checkpoint to flush data in memory to kvstore. When - // out_ckpt_ts is not nullptr, it is filled with current round checkpoint - // ts. - bool RunOneRoundCheckpoint(uint32_t node_group, - int64_t ng_leader_term, - uint64_t *out_ckpt_ts = nullptr); + // Run one round checkpoint to flush data in memory to kvstore. + bool RunOneRoundCheckpoint(uint32_t node_group, int64_t ng_leader_term); + + // Collect current checkpoint ts from cc shards without triggering data + // flush. 
+ uint64_t GetCurrentCheckpointTs(uint32_t node_group); private: /** diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index d5f2077f..10fd34ce 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -52,6 +52,7 @@ void SnapshotManager::Shutdown() void SnapshotManager::StandbySyncWorker() { + constexpr auto kBlockedTaskRetryInterval = std::chrono::milliseconds(200); while (true) { std::unique_lock lk(standby_sync_mux_); @@ -66,6 +67,18 @@ void SnapshotManager::StandbySyncWorker() lk.unlock(); SyncWithStandby(); lk.lock(); + + if (terminated_) + { + return; + } + + if (!pending_req_.empty()) + { + // Pending requests are still blocked by subscribe/barrier checks. + // Back off to avoid tight checkpoint retry loops. + standby_sync_cv_.wait_for(lk, kBlockedTaskRetryInterval); + } } } @@ -290,33 +303,11 @@ void SnapshotManager::SyncWithStandby() uint32_t current_subscribe_id = Sharder::Instance().GetCurrentSubscribeId(); - uint64_t ckpt_ts = 0; - bool ckpt_res = - this->RunOneRoundCheckpoint(node_group, leader_term, &ckpt_ts); - - if (!ckpt_res) - { - // data flush failed. Retry on next run. - LOG(ERROR) << "Failed to do checkpoint on SyncWithStandby"; - return; - } - - // Now take a snapshot for non-shared storage, and then send to standby - // node. - std::vector snapshot_files; - if (!store_hd_->IsSharedStorage()) - { - bool res = store_hd_->CreateSnapshotForStandby(snapshot_files); - if (!res) - { - LOG(ERROR) << "Failed to create snpashot for sync with standby"; - return; - } - } + uint64_t cur_ckpt_ts = GetCurrentCheckpointTs(node_group); std::vector tasks; - // Dequeue all pending tasks that can be covered by this snapshot. + // Pick all pending tasks that can be covered by current checkpoint ts. { std::unique_lock lk(standby_sync_mux_); auto it = pending_req_.begin(); @@ -358,9 +349,9 @@ void SnapshotManager::SyncWithStandby() << pending_task_subscribe_id << ". 
Wait for next round";
                covered = false;
            }
-            else if (ckpt_ts <= pending_task.subscription_active_tx_max_ts)
+            else if (cur_ckpt_ts <= pending_task.subscription_active_tx_max_ts)
            {
-                LOG(INFO) << "Snapshot checkpoint ts " << ckpt_ts
+                LOG(INFO) << "Current checkpoint ts " << cur_ckpt_ts
                          << " does not pass subscription barrier ts "
                          << pending_task.subscription_active_tx_max_ts
                          << " for standby node #"
@@ -391,6 +382,33 @@ void SnapshotManager::SyncWithStandby()
        }
    }
+    if (tasks.empty())
+    {
+        return;
+    }
+
+    bool ckpt_res = this->RunOneRoundCheckpoint(node_group, leader_term);
+
+    if (!ckpt_res)
+    {
+        // Data flush failed. Retry on next run.
+        LOG(ERROR) << "Failed to do checkpoint on SyncWithStandby";
+        return;
+    }
+
+    // Now take a snapshot for non-shared storage, and then send to standby
+    // node.
+    std::vector<std::string> snapshot_files;
+    if (!store_hd_->IsSharedStorage())
+    {
+        bool res = store_hd_->CreateSnapshotForStandby(snapshot_files);
+        if (!res)
+        {
+            LOG(ERROR) << "Failed to create snapshot for sync with standby";
+            return;
+        }
+    }
+
    for (auto &task : tasks)
    {
        auto &req = task.req;
        uint32_t node_id = req.standby_node_id();
@@ -525,9 +543,26 @@ void SnapshotManager::SyncWithStandby()
        }
    }
 
+uint64_t SnapshotManager::GetCurrentCheckpointTs(uint32_t node_group)
+{
+    auto local_shards = Sharder::Instance().GetLocalCcShards();
+    assert(local_shards != nullptr);
+    if (local_shards == nullptr)
+    {
+        return 0;
+    }
+
+    CkptTsCc ckpt_req(local_shards->Count(), node_group);
+    for (size_t i = 0; i < local_shards->Count(); i++)
+    {
+        local_shards->EnqueueCcRequest(i, &ckpt_req);
+    }
+    ckpt_req.Wait();
+    return ckpt_req.GetCkptTs();
+}
+
 bool SnapshotManager::RunOneRoundCheckpoint(uint32_t node_group,
-                                            int64_t ng_leader_term,
-                                            uint64_t *out_ckpt_ts)
+                                            int64_t ng_leader_term)
 {
     using namespace txservice;
     auto &local_shards = *Sharder::Instance().GetLocalCcShards();
@@ -544,22 +579,6 @@ bool SnapshotManager::RunOneRoundCheckpoint(uint32_t node_group,
     bool can_be_skipped = false;
     uint64_t last_ckpt_ts = 
Sharder::Instance().GetNodeGroupCkptTs(node_group); - size_t core_cnt = local_shards.Count(); - CkptTsCc ckpt_req(core_cnt, node_group); - - // Find minimum ckpt_ts from all the ccshards in parallel. ckpt_ts is - // the minimum timestamp minus 1 among all the active transactions, thus - // it's safe to flush all the entries smaller than or equal to ckpt_ts. - for (size_t i = 0; i < local_shards.Count(); i++) - { - local_shards.EnqueueCcRequest(i, &ckpt_req); - } - ckpt_req.Wait(); - - if (out_ckpt_ts != nullptr) - { - *out_ckpt_ts = ckpt_req.GetCkptTs(); - } // Iterate all the tables and execute CkptScanCc requests on this node // group's ccmaps on each ccshard. The result of CkptScanCc is stored in From 408f87d813eadce9e168337e60c9bc3acee34984 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 11 Mar 2026 18:01:08 +0800 Subject: [PATCH 12/14] docs(standby): align snapshot barrier docs with latest implementation --- ...snapshot_subscription_barrier_design_en.md | 115 +++++++++--------- ...snapshot_subscription_barrier_design_zh.md | 108 ++++++++-------- ..._subscription_barrier_implementation_en.md | 105 ++++++++-------- ..._subscription_barrier_implementation_zh.md | 97 ++++++++------- 4 files changed, 215 insertions(+), 210 deletions(-) diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md index 014427d3..7882c9ee 100644 --- a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md @@ -1,61 +1,64 @@ # Standby Snapshot Subscription Barrier: Design ## 1. Background -Standby snapshot sync runs after standby subscribes to primary. -Existing snapshot eligibility checks are mostly term and subscribe-id coverage. - -This can still produce a snapshot that does not include all writes from transactions that were already active when standby started subscribing. 
+Snapshot sync for standby was previously gated mostly by leader term and +subscribe-id coverage. That was not enough to ensure snapshot content is after +all transactions that were active when standby subscription became effective. ## 2. Goal -Introduce a **subscription-time active transaction barrier**: -- At subscription start, record `global_max_active_tx_ts` on primary. -- A snapshot for that subscription epoch is valid only if: - - `ckpt_ts > subscription_time_global_max_active_tx_ts` - -This guarantees snapshot visibility is strictly after all transactions active at subscription time. - -## 3. Non-goals -- No change to standby forward-stream message protocol. -- No change to transaction execution semantics. -- No redesign of checkpoint algorithm. - -## 4. Capture point -Capture barrier at primary RPC **`StandbyStartFollowing`**. -Do not capture it in `RequestStorageSnapshotSync`. - -Why: -- `StandbyStartFollowing` is the actual subscription start moment. -- `RequestStorageSnapshotSync` may be delayed/retried and does not represent subscription time. - -## 5. Conceptual model -Each subscription epoch (identified by `standby_term`) has a fixed barrier: -- `standby_term = (primary_term << 32) | subscribe_id` -- `barrier_ts = max ActiveTxMaxTs across all local shards at subscription time` - -Snapshot for this epoch is sendable only when checkpoint advances beyond this barrier. - -## 6. Runtime flow -1. Standby calls `StandbyStartFollowing`. -2. Primary computes `barrier_ts` and allocates `subscribe_id`, forming `standby_term`. -3. Primary stores `(standby_node_id, standby_term) -> barrier_ts`. -4. Standby later calls `RequestStorageSnapshotSync` with `standby_term`. -5. Primary loads barrier and attaches it to pending snapshot-sync task. -6. In snapshot worker loop, primary checks existing conditions plus: - - `ckpt_ts > barrier_ts` -7. If false, task remains pending; if true, snapshot send + notify proceed. - -## 7. 
Safety and lifecycle principles -- Missing barrier for incoming snapshot-sync request is treated as invalid request. -- Barrier is scoped to subscription epoch and must be cleaned after completion or term supersession. -- Concurrency control follows existing standby-sync mutex boundary in `SnapshotManager`. - -## 8. Expected effects -- Prevents early snapshots that miss subscription-time in-flight transaction writes. -- Keeps retry behavior and snapshot transport model unchanged. -- Adds controlled waiting under heavy long-running-transaction scenarios. - -## 9. Observability (design level) -Need logs/metrics for barrier gating: -- blocked reason: `ckpt_ts <= barrier_ts` -- dimensions: `node_id`, `standby_term`, `barrier_ts`, `ckpt_ts` -- counters: blocked rounds, barrier map size +Introduce a subscription barrier per standby epoch: +- `barrier_ts = max ActiveTxMaxTs across all local ccshards` +- A snapshot is valid for that epoch only when: + - `current_ckpt_ts > barrier_ts` + +This guarantees the snapshot is after all transactions active at the +subscription success point. + +## 3. Key decisions +- Barrier sampling point is **`ResetStandbySequenceId` success path on primary** + (not `StandbyStartFollowing` and not `RequestStorageSnapshotSync`). +- Barrier key is `(standby_node_id, standby_term)`. +- `standby_term = (primary_term << 32) | subscribe_id`. +- Snapshot worker uses a lightweight checkpoint-ts probe before running heavy + checkpoint/flush. + +## 4. Runtime flow +1. Standby calls `StandbyStartFollowing`, receives `subscribe_id` and start seq. +2. Standby calls `ResetStandbySequenceId`. +3. Primary marks standby as subscribed on all shards and samples + `global_active_tx_max_ts` using `ActiveTxMaxTsCc`. +4. Primary registers barrier in `SnapshotManager`: + - `subscription_barrier_[node_id][standby_term] = barrier_ts` +5. Standby calls `RequestStorageSnapshotSync` with `standby_term`. +6. 
Primary validates barrier existence and enqueues one pending task per node + with attached `subscription_active_tx_max_ts`. +7. `StandbySyncWorker` loop: + - Probe `current_ckpt_ts` via `GetCurrentCheckpointTs()` + - Select pending tasks that satisfy: + - same primary term + - `subscribe_id < current_subscribe_id` + - `current_ckpt_ts > barrier_ts` + - If no task is eligible, skip heavy checkpoint for this round. + - If at least one task is eligible, run `RunOneRoundCheckpoint`, build/send + snapshot, then notify standby. + +## 5. Pending and cleanup model +- Pending tasks blocked by subscribe-id or barrier remain queued for retry. +- Worker uses retry backoff wait when pending queue is non-empty but blocked, to + avoid tight loop. +- Superseded standby terms are pruned: + - registering newer barrier can drop older pending task for the same node + - older barriers are removed +- On leader loss, all pending tasks and barriers are cleared. +- On node removal, `EraseSubscriptionBarriersByNode` clears both pending and + barriers for that node. + +## 6. Safety properties +- Missing barrier on snapshot-sync request is rejected (safe default). +- Barrier is epoch-scoped and never shared across standby terms. +- All barrier/pending updates are under `standby_sync_mux_`. + +## 7. Expected effects +- Prevent early snapshots that miss writes from subscription-time active txns. +- Avoid unnecessary heavy checkpoint rounds when no task is currently eligible. +- Keep existing transport/retry semantics for snapshot send/notify. diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md index 9de831ea..d2798498 100644 --- a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md @@ -1,62 +1,56 @@ # Standby Snapshot 订阅屏障:设计文档 ## 1. 
背景 -当前 standby 在订阅 primary 后再触发 snapshot 同步。 -现有 snapshot 可发送判定主要是 term 与 subscribe-id 覆盖。 - -该机制可能出现一个问题: -订阅时刻已经活跃的事务,其修改不一定全部被当轮 snapshot 覆盖。 +此前 standby 的 snapshot 同步主要靠 term 和 subscribe-id 覆盖判定, +无法严格保证 snapshot 一定在“订阅生效时刻所有活跃事务”之后。 ## 2. 目标 -引入“**订阅时刻活动事务屏障**”: -- 在订阅开始时,于主节点记录 `global_max_active_tx_ts`; -- 该订阅 epoch 的 snapshot 仅在满足 - - `ckpt_ts > subscription_time_global_max_active_tx_ts` - 时才有效。 - -这样可保证 snapshot 位于“订阅时刻所有活跃事务”之后。 - -## 3. 非目标 -- 不改 standby forward 流协议。 -- 不改事务执行语义。 -- 不重写 checkpoint 算法。 - -## 4. 屏障采样时机 -应在主节点 **`StandbyStartFollowing`** RPC 中采样, -不应在 `RequestStorageSnapshotSync` 里采样。 - -原因: -- `StandbyStartFollowing` 才是订阅开始时刻; -- `RequestStorageSnapshotSync` 可能延迟/重试,语义不等价于订阅瞬间。 - -## 5. 设计模型 -每个订阅 epoch(由 `standby_term` 标识)绑定一个固定屏障值: -- `standby_term = (primary_term << 32) | subscribe_id` -- `barrier_ts = 订阅时所有本地 shard 的 ActiveTxMaxTs 最大值` - -只有当 checkpoint 前进超过该屏障,才允许发送该 epoch 的 snapshot。 - -## 6. 运行流程 -1. standby 调用 `StandbyStartFollowing`。 -2. primary 计算 `barrier_ts`,分配 `subscribe_id`,组成 `standby_term`。 -3. primary 保存 `(standby_node_id, standby_term) -> barrier_ts`。 -4. standby 调用 `RequestStorageSnapshotSync`(携带 `standby_term`)。 -5. primary 查到 barrier,并附着到 pending snapshot 任务。 -6. snapshot worker 在现有判定外新增: - - `ckpt_ts > barrier_ts` -7. 若不满足则继续等待下一轮 checkpoint;满足才发送 snapshot。 - -## 7. 安全与生命周期原则 -- 收到 snapshot-sync 请求但找不到 barrier,按无效请求处理。 -- barrier 作用域是订阅 epoch,完成后或 term 超前后要清理。 -- 并发控制沿用 `SnapshotManager` 既有 standby-sync 互斥边界。 - -## 8. 预期效果 -- 避免“过早 snapshot”遗漏订阅时活跃事务写入。 -- 不改变现有重试和传输机制。 -- 长事务场景下会引入可预期的等待。 - -## 9. 可观测性(设计层) -建议增加: -- 阻塞日志:`ckpt_ts <= barrier_ts`,包含 node/term/barrier/ckpt -- 指标:阻塞轮次、barrier map 大小 +为每个 standby 订阅 epoch 引入屏障: +- `barrier_ts = 所有本地 ccshard 的 ActiveTxMaxTs 最大值` +- 只有当 `current_ckpt_ts > barrier_ts` 时,该 epoch 的 snapshot 才允许发送 + +这样可保证 snapshot 至少覆盖订阅成功时刻之前的活跃事务影响。 + +## 3. 
关键设计决策 +- 屏障采样时机是主节点 **`ResetStandbySequenceId` 成功路径**,不是 + `StandbyStartFollowing`,也不是 `RequestStorageSnapshotSync`。 +- 屏障键为 `(standby_node_id, standby_term)`。 +- `standby_term = (primary_term << 32) | subscribe_id`。 +- snapshot worker 在跑重 checkpoint 之前,先做一次轻量 ckpt-ts 探测。 + +## 4. 运行流程 +1. standby 调用 `StandbyStartFollowing`,拿到 `subscribe_id` 和起始序列。 +2. standby 调用 `ResetStandbySequenceId`。 +3. primary 在所有 shard 上将该 standby 标记为 subscribed,并通过 + `ActiveTxMaxTsCc` 计算 `global_active_tx_max_ts`。 +4. primary 在 `SnapshotManager` 注册屏障: + - `subscription_barrier_[node_id][standby_term] = barrier_ts` +5. standby 调用 `RequestStorageSnapshotSync`(带 `standby_term`)。 +6. primary 校验屏障存在后,将请求入 pending,并附上 + `subscription_active_tx_max_ts`。 +7. `StandbySyncWorker` 周期处理: + - 先通过 `GetCurrentCheckpointTs()` 读取当前 ckpt_ts + - 只挑选满足以下条件的任务: + - primary term 一致 + - `subscribe_id < current_subscribe_id` + - `current_ckpt_ts > barrier_ts` + - 若本轮无可发送任务,跳过重 checkpoint + - 若有可发送任务,执行 `RunOneRoundCheckpoint`,再创建/发送 snapshot, + 并通知 standby + +## 5. Pending 与清理策略 +- subscribe-id 或 barrier 不满足的任务会保留在 pending 中等待后续轮次。 +- 为避免空转,worker 在 pending 非空但阻塞时使用退避等待。 +- 同节点新 term 注册会淘汰旧 term 的 pending/barrier。 +- leader 失效时清空全部 pending 和 barrier。 +- 节点移除时,按 node_id 清空对应 pending 与 barrier。 + +## 6. 安全性 +- snapshot-sync 请求找不到 barrier 一律拒绝(安全默认)。 +- 屏障按订阅 epoch 隔离,不跨 term 复用。 +- pending/barrier 的所有读写都在 `standby_sync_mux_` 保护下。 + +## 7. 预期效果 +- 避免过早 snapshot 遗漏订阅时在途事务写入。 +- 在无可发送任务时避免不必要的重 checkpoint 开销。 +- 保持现有 snapshot 传输和重试语义不变。 diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md index c558a144..ff758599 100644 --- a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md @@ -2,10 +2,10 @@ ## 1. 
Scope of code changes - `tx_service/src/remote/cc_node_service.cpp` +- `tx_service/src/fault/cc_node.cpp` +- `tx_service/include/cc/cc_request.h` (`ActiveTxMaxTsCc`) - `tx_service/include/store/snapshot_manager.h` - `tx_service/src/store/snapshot_manager.cpp` -- `tx_service/include/cc/cc_shard.h` (if `ActiveTxMaxTs` is missing) -- Optional helper request class for cross-shard max-ts aggregation ## 2. New/updated state @@ -14,83 +14,88 @@ Add a map keyed by standby node and standby term: - `subscription_barrier_[standby_node_id][standby_term] = barrier_ts` ### 2.2 Pending snapshot task extension -Replace pending value from raw request to task struct, e.g.: +Pending value is a task struct: - `req` (`StorageSnapshotSyncRequest`) - `subscription_active_tx_max_ts` -- optional `created_at` ## 3. New APIs in `SnapshotManager` - `RegisterSubscriptionBarrier(standby_node_id, standby_term, barrier_ts)` - `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)` - `EraseSubscriptionBarrier(standby_node_id, standby_term)` -- optional cleanup utility for old terms +- `EraseSubscriptionBarriersByNode(standby_node_id)` +- `GetCurrentCheckpointTs(node_group) -> uint64_t` +- `RunOneRoundCheckpoint(node_group, leader_term) -> bool` -All above plus pending-task updates should be protected by `standby_sync_mux_`. +Barrier/pending updates are protected by `standby_sync_mux_`. -## 4. Barrier collection in `StandbyStartFollowing` -In `CcNodeService::StandbyStartFollowing` on primary: -1. Validate leader term. -2. Compute `global_max_active_tx_ts = max(shard.ActiveTxMaxTs(ng_id))`. -3. Generate `subscribe_id`. -4. Form `standby_term`. -5. Call `SnapshotManager::RegisterSubscriptionBarrier(...)`. +## 4. Barrier collection in `ResetStandbySequenceId` +In `CcNodeService::ResetStandbySequenceId` on primary: +1. Move standby from candidate to subscribed on all shards. +2. Validate leader term. +3. 
If barrier for `(node_id, standby_term)` does not exist: + - run `ActiveTxMaxTsCc` across all shards + - compute global max + - call `SnapshotManager::RegisterSubscriptionBarrier(...)` -Implementation note: -- Keep collection in shard-safe context (same pattern as existing cross-shard requests). +This makes the sampling point aligned with "subscription success". ## 5. `RequestStorageSnapshotSync` path changes In `SnapshotManager::OnSnapshotSyncRequested`: -1. Parse `standby_term` from request. +1. Parse `(standby_node_id, standby_term)` from request. 2. Query barrier by `(standby_node_id, standby_term)`. -3. If not found: reject / do not enqueue. +3. If not found: reject request. 4. If found: enqueue task with barrier ts. -Dedup logic remains term-based; when task is replaced by newer term, barrier ts is replaced accordingly. +Dedup is still term-based per standby node. ## 6. Snapshot gating logic -In `SnapshotManager::SyncWithStandby` keep existing checks and add barrier check: -- Obtain current-round `ckpt_ts`. -- For each candidate pending task, require: - - `ckpt_ts > task.subscription_active_tx_max_ts` - -If condition fails, leave task pending for next round. - -## 7. Checkpoint API adjustment -Current `RunOneRoundCheckpoint` returns `bool` only. -Prefer changing signature to: -- `bool RunOneRoundCheckpoint(uint32_t node_group, int64_t term, uint64_t* out_ckpt_ts)` - -Set `*out_ckpt_ts` from round checkpoint request result (`CkptTsCc::GetCkptTs()`). - -## 8. `ActiveTxMaxTs` helper -If missing, add in `CcShard`: -- `uint64_t ActiveTxMaxTs(NodeGroupId cc_ng_id)` - -Expected behavior: -- Traverse `lock_holding_txs_[cc_ng_id]` -- Use write-lock timestamp domain aligned with existing checkpoint min-ts logic -- Exclude meta-table tx entries (same scope policy as min-ts path) - -## 9. Cleanup rules +`SnapshotManager::SyncWithStandby` now runs in two phases: +1. 
Lightweight phase: + - `current_ckpt_ts = GetCurrentCheckpointTs(node_group)` + - Select tasks that satisfy: + - term alignment + - `subscribe_id < current_subscribe_id` + - `current_ckpt_ts > subscription_active_tx_max_ts` + - If no task is eligible, return directly. +2. Heavy phase: + - Run `RunOneRoundCheckpoint(...)` (flush) + - Create/send snapshot and notify standby for selected tasks. + +## 7. Worker retry pacing +`StandbySyncWorker` keeps existing wake condition on non-empty pending queue, and +adds short wait-for backoff (`200ms`) when requests remain pending after a +round, to avoid tight retry loops. + +## 8. Cleanup rules - On successful snapshot completion for `(node_id, standby_term)`: erase barrier entry. -- On registering newer term for same node: prune older-term barriers. -- On standby reset/unsubscribe: remove all barriers for that node (if hook available). -- Optional TTL sweep as fallback. +- On registering newer term for same node: prune older barriers and drop older + pending task. +- On node removal: `EraseSubscriptionBarriersByNode(node_id)` clears both + pending and barrier entries. +- On leader loss in sync loop: clear all pending and barriers. -## 10. Failure behavior +## 9. Failure behavior - Missing barrier on sync request: reject request (safe default). -- Checkpoint failure: keep task queued. -- Snapshot transfer or callback failure: preserve existing retry behavior. +- Checkpoint failure: keep task queued for next rounds. +- Snapshot transfer failure: task stays pending and retries in later rounds. + +## 10. Standby-side rejection handling +In `CcNode::SubscribePrimaryNode`, if `ResetStandbySequenceId` is rejected by +primary, local standby following state is rolled back: +- unsubscribe per-shard standby sequence groups +- reset standby/candidate standby term if still on the failed term +- guard against clobbering newer resubscribe attempts. ## 11. 
Tests ### Unit tests - barrier register/get/erase and supersession behavior - pending dedup with barrier replacement -- gating predicate boundaries (`ckpt_ts == barrier_ts`, `ckpt_ts < barrier_ts`, `ckpt_ts > barrier_ts`) +- gating boundaries (`current_ckpt_ts == / < / > barrier_ts`) ### Integration tests -- long-running active tx at subscription time blocks snapshot until ckpt passes barrier +- long-running active tx at subscription success blocks snapshot until + `current_ckpt_ts > barrier` - multiple standbys with independent barriers - repeated sync-request retries with same standby term - leader term switch cleanup correctness diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md index 9e1bc9fd..465ce1d5 100644 --- a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md +++ b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md @@ -2,10 +2,10 @@ ## 1. 代码改动范围 - `tx_service/src/remote/cc_node_service.cpp` +- `tx_service/src/fault/cc_node.cpp` +- `tx_service/include/cc/cc_request.h`(`ActiveTxMaxTsCc`) - `tx_service/include/store/snapshot_manager.h` - `tx_service/src/store/snapshot_manager.cpp` -- `tx_service/include/cc/cc_shard.h`(若缺少 `ActiveTxMaxTs`) -- 可选:新增跨 shard 聚合请求类 ## 2. 新增/调整状态 @@ -14,81 +14,84 @@ - `subscription_barrier_[standby_node_id][standby_term] = barrier_ts` ### 2.2 pending 任务结构扩展 -将 pending value 从单一 request 扩展为任务结构,至少包括: +pending value 为任务结构,包含: - `req: StorageSnapshotSyncRequest` - `subscription_active_tx_max_ts` -- 可选 `created_at` ## 3. 
`SnapshotManager` 新接口 - `RegisterSubscriptionBarrier(standby_node_id, standby_term, barrier_ts)` - `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)` - `EraseSubscriptionBarrier(standby_node_id, standby_term)` -- 可选:清理旧 term 的辅助方法 +- `EraseSubscriptionBarriersByNode(standby_node_id)` +- `GetCurrentCheckpointTs(node_group) -> uint64_t` +- `RunOneRoundCheckpoint(node_group, leader_term) -> bool` -以上接口及 pending 操作统一受 `standby_sync_mux_` 保护。 +以上 barrier/pending 相关操作统一受 `standby_sync_mux_` 保护。 -## 4. `StandbyStartFollowing` 中采样并注册 -在 primary 侧 `CcNodeService::StandbyStartFollowing` 中: -1. term 校验通过后; -2. 计算 `global_max_active_tx_ts = max(shard.ActiveTxMaxTs(ng_id))`; -3. 生成 `subscribe_id`; -4. 组合 `standby_term`; -5. 注册屏障到 `SnapshotManager`。 +## 4. 在 `ResetStandbySequenceId` 中采样并注册 +在 primary 侧 `CcNodeService::ResetStandbySequenceId` 中: +1. 先在各 shard 将 standby 从 candidate 切换为 subscribed; +2. 做 term 校验; +3. 若 `(node_id, standby_term)` 尚无 barrier: + - 通过 `ActiveTxMaxTsCc` 聚合所有 shard 的活动事务上界 + - 调用 `SnapshotManager::RegisterSubscriptionBarrier(...)` 注册。 -实现注意: -- `ActiveTxMaxTs` 聚合应在 shard 安全上下文执行(参考现有跨 shard 请求模式)。 +该时机对应“订阅真正成功”。 ## 5. `RequestStorageSnapshotSync` 路径调整 在 `SnapshotManager::OnSnapshotSyncRequested` 中: 1. 取 `standby_node_id + standby_node_term`; 2. 查询 barrier; -3. 查询失败则拒绝入队; +3. 查询失败则拒绝请求; 4. 查询成功则将 barrier 附加到 pending 任务。 -去重语义保持 term 优先;任务被新 term 覆盖时同步替换 barrier。 +去重语义保持“同节点按 term 优先”。 ## 6. `SyncWithStandby` 判定扩展 -保留现有判定,新增 barrier 条件: -- 取本轮 `ckpt_ts`; -- 要求 `ckpt_ts > task.subscription_active_tx_max_ts`。 - -不满足则任务保留到下一轮。 - -## 7. `RunOneRoundCheckpoint` 输出 ckpt_ts -当前只返回 bool,建议改为: -- `bool RunOneRoundCheckpoint(uint32_t node_group, int64_t term, uint64_t *out_ckpt_ts)` - -并在函数内将本轮 `CkptTsCc::GetCkptTs()` 写入 `out_ckpt_ts`。 - -## 8. 
`ActiveTxMaxTs` 方法 -若当前无此方法,在 `CcShard` 中新增: -- `uint64_t ActiveTxMaxTs(NodeGroupId cc_ng_id)` - -建议规则: -- 遍历 `lock_holding_txs_[cc_ng_id]` -- 与 min-ts 逻辑同口径(仅非 meta / 写锁相关) -- 返回该 shard 的活动事务最大时间戳 - -## 9. 清理策略 +`SnapshotManager::SyncWithStandby` 采用两阶段: +1. 轻量阶段: + - `current_ckpt_ts = GetCurrentCheckpointTs(node_group)` + - 筛选满足以下条件的任务: + - term 对齐 + - `subscribe_id < current_subscribe_id` + - `current_ckpt_ts > subscription_active_tx_max_ts` + - 若无可发送任务,直接返回 +2. 重量阶段: + - 执行 `RunOneRoundCheckpoint(...)` 做 flush + - 对可发送任务创建/发送 snapshot,并通知 standby。 + +## 7. Worker 重试节流 +`StandbySyncWorker` 在 pending 非空但任务仍阻塞时,增加短暂退避等待 +(`200ms`),避免紧循环。 + +## 8. 清理策略 - 同一 `(node_id, standby_term)` snapshot 成功后删除 barrier; -- 同节点新 term 注册时删除旧 term barrier; -- standby 重置/取消订阅时清理该节点 barrier; -- 可选 TTL 扫描兜底。 +- 同节点新 term 注册时,清理旧 term barrier,并可淘汰旧 pending; +- 节点移除时通过 `EraseSubscriptionBarriersByNode(node_id)` 同时清理 + pending 与 barrier; +- leader 失效时清空全部 pending/barrier。 -## 10. 失败处理 +## 9. 失败处理 - 请求到达但 barrier 缺失:拒绝(安全优先); -- checkpoint 失败:任务继续排队等待下一轮; -- snapshot 发送/回调失败:沿用既有重试机制。 +- checkpoint 失败:任务保留,后续轮次重试; +- snapshot 发送失败:任务保留,后续轮次重试。 + +## 10. Standby 侧 reset 拒绝回滚 +在 `CcNode::SubscribePrimaryNode` 中,如果 primary 拒绝 +`ResetStandbySequenceId`: +- 回滚本地 per-shard standby 订阅状态(`Unsubscribe`) +- 若仍是失败 term,则清理 `standby/candidate standby term` +- 并发保护避免误伤更新的重订阅流程。 ## 11. 
测试建议 ### 单测 - barrier 注册/查询/删除 - pending 去重与 barrier 替换 -- 判定边界:`ckpt_ts == / < / > barrier_ts` +- 判定边界:`current_ckpt_ts == / < / > barrier_ts` ### 集成 -- 长事务阻塞场景:checkpoint 超过 barrier 前不发送 snapshot +- 长事务阻塞场景:`current_ckpt_ts` 未超过 barrier 前不发送 snapshot - 多 standby 独立 barrier - 同 standby term 重试请求 - term 切换后的清理与隔离 From 8dc5c4f6512143914a45d24c2efaea0943eb6f5a Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 11 Mar 2026 18:04:59 +0800 Subject: [PATCH 13/14] format --- tx_service/src/store/snapshot_manager.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index 10fd34ce..e925006b 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -181,8 +181,7 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, DLOG(INFO) << "Ignore stale barrier registration for standby node #" << standby_node_id << ", term " << standby_node_term << " because queued pending task term " - << pending_it->second.req.standby_node_term() - << " is newer"; + << pending_it->second.req.standby_node_term() << " is newer"; return; } @@ -498,7 +497,8 @@ void SnapshotManager::SyncWithStandby() // request. { std::unique_lock lk(standby_sync_mux_); - auto pending_req_iter = pending_req_.find(req.standby_node_id()); + auto pending_req_iter = + pending_req_.find(req.standby_node_id()); if (pending_req_iter != pending_req_.end()) { // Check again to see if the request has been updated. 
@@ -506,10 +506,11 @@ void SnapshotManager::SyncWithStandby() int64_t next_pending_task_standby_term = next_pending_task.req.standby_node_term(); int64_t next_pending_task_primary_term = - PrimaryTermFromStandbyTerm(next_pending_task_standby_term); + PrimaryTermFromStandbyTerm( + next_pending_task_standby_term); - assert(PrimaryTermFromStandbyTerm(req.standby_node_term()) == - leader_term); + assert(PrimaryTermFromStandbyTerm( + req.standby_node_term()) == leader_term); if (next_pending_task_primary_term < leader_term) { From 4d63bdfac35fac2af19336db7d4ce44022c489d1 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 16 Mar 2026 15:31:35 +0800 Subject: [PATCH 14/14] update doc --- ...y_snapshot_subscription_barrier_design.md} | 0 ...snapshot_subscription_barrier_design_zh.md | 56 ----------- ...ot_subscription_barrier_implementation.md} | 0 ..._subscription_barrier_implementation_zh.md | 97 ------------------- 4 files changed, 153 deletions(-) rename .cursor_knowledge/{standby_snapshot_subscription_barrier_design_en.md => standby_snapshot_subscription_barrier_design.md} (100%) delete mode 100644 .cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md rename .cursor_knowledge/{standby_snapshot_subscription_barrier_implementation_en.md => standby_snapshot_subscription_barrier_implementation.md} (100%) delete mode 100644 .cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_design.md similarity index 100% rename from .cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md rename to .cursor_knowledge/standby_snapshot_subscription_barrier_design.md diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md deleted file mode 100644 index d2798498..00000000 --- 
a/.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md +++ /dev/null @@ -1,56 +0,0 @@ -# Standby Snapshot 订阅屏障:设计文档 - -## 1. 背景 -此前 standby 的 snapshot 同步主要靠 term 和 subscribe-id 覆盖判定, -无法严格保证 snapshot 一定在“订阅生效时刻所有活跃事务”之后。 - -## 2. 目标 -为每个 standby 订阅 epoch 引入屏障: -- `barrier_ts = 所有本地 ccshard 的 ActiveTxMaxTs 最大值` -- 只有当 `current_ckpt_ts > barrier_ts` 时,该 epoch 的 snapshot 才允许发送 - -这样可保证 snapshot 至少覆盖订阅成功时刻之前的活跃事务影响。 - -## 3. 关键设计决策 -- 屏障采样时机是主节点 **`ResetStandbySequenceId` 成功路径**,不是 - `StandbyStartFollowing`,也不是 `RequestStorageSnapshotSync`。 -- 屏障键为 `(standby_node_id, standby_term)`。 -- `standby_term = (primary_term << 32) | subscribe_id`。 -- snapshot worker 在跑重 checkpoint 之前,先做一次轻量 ckpt-ts 探测。 - -## 4. 运行流程 -1. standby 调用 `StandbyStartFollowing`,拿到 `subscribe_id` 和起始序列。 -2. standby 调用 `ResetStandbySequenceId`。 -3. primary 在所有 shard 上将该 standby 标记为 subscribed,并通过 - `ActiveTxMaxTsCc` 计算 `global_active_tx_max_ts`。 -4. primary 在 `SnapshotManager` 注册屏障: - - `subscription_barrier_[node_id][standby_term] = barrier_ts` -5. standby 调用 `RequestStorageSnapshotSync`(带 `standby_term`)。 -6. primary 校验屏障存在后,将请求入 pending,并附上 - `subscription_active_tx_max_ts`。 -7. `StandbySyncWorker` 周期处理: - - 先通过 `GetCurrentCheckpointTs()` 读取当前 ckpt_ts - - 只挑选满足以下条件的任务: - - primary term 一致 - - `subscribe_id < current_subscribe_id` - - `current_ckpt_ts > barrier_ts` - - 若本轮无可发送任务,跳过重 checkpoint - - 若有可发送任务,执行 `RunOneRoundCheckpoint`,再创建/发送 snapshot, - 并通知 standby - -## 5. Pending 与清理策略 -- subscribe-id 或 barrier 不满足的任务会保留在 pending 中等待后续轮次。 -- 为避免空转,worker 在 pending 非空但阻塞时使用退避等待。 -- 同节点新 term 注册会淘汰旧 term 的 pending/barrier。 -- leader 失效时清空全部 pending 和 barrier。 -- 节点移除时,按 node_id 清空对应 pending 与 barrier。 - -## 6. 安全性 -- snapshot-sync 请求找不到 barrier 一律拒绝(安全默认)。 -- 屏障按订阅 epoch 隔离,不跨 term 复用。 -- pending/barrier 的所有读写都在 `standby_sync_mux_` 保护下。 - -## 7. 
预期效果 -- 避免过早 snapshot 遗漏订阅时在途事务写入。 -- 在无可发送任务时避免不必要的重 checkpoint 开销。 -- 保持现有 snapshot 传输和重试语义不变。 diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation.md similarity index 100% rename from .cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md rename to .cursor_knowledge/standby_snapshot_subscription_barrier_implementation.md diff --git a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md b/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md deleted file mode 100644 index 465ce1d5..00000000 --- a/.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md +++ /dev/null @@ -1,97 +0,0 @@ -# Standby Snapshot 订阅屏障:实现文档 - -## 1. 代码改动范围 -- `tx_service/src/remote/cc_node_service.cpp` -- `tx_service/src/fault/cc_node.cpp` -- `tx_service/include/cc/cc_request.h`(`ActiveTxMaxTsCc`) -- `tx_service/include/store/snapshot_manager.h` -- `tx_service/src/store/snapshot_manager.cpp` - -## 2. 新增/调整状态 - -### 2.1 `SnapshotManager` 屏障表 -新增按 standby node + standby term 索引的屏障映射: -- `subscription_barrier_[standby_node_id][standby_term] = barrier_ts` - -### 2.2 pending 任务结构扩展 -pending value 为任务结构,包含: -- `req: StorageSnapshotSyncRequest` -- `subscription_active_tx_max_ts` - -## 3. `SnapshotManager` 新接口 -- `RegisterSubscriptionBarrier(standby_node_id, standby_term, barrier_ts)` -- `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)` -- `EraseSubscriptionBarrier(standby_node_id, standby_term)` -- `EraseSubscriptionBarriersByNode(standby_node_id)` -- `GetCurrentCheckpointTs(node_group) -> uint64_t` -- `RunOneRoundCheckpoint(node_group, leader_term) -> bool` - -以上 barrier/pending 相关操作统一受 `standby_sync_mux_` 保护。 - -## 4. 在 `ResetStandbySequenceId` 中采样并注册 -在 primary 侧 `CcNodeService::ResetStandbySequenceId` 中: -1. 先在各 shard 将 standby 从 candidate 切换为 subscribed; -2. 做 term 校验; -3. 
若 `(node_id, standby_term)` 尚无 barrier: - - 通过 `ActiveTxMaxTsCc` 聚合所有 shard 的活动事务上界 - - 调用 `SnapshotManager::RegisterSubscriptionBarrier(...)` 注册。 - -该时机对应“订阅真正成功”。 - -## 5. `RequestStorageSnapshotSync` 路径调整 -在 `SnapshotManager::OnSnapshotSyncRequested` 中: -1. 取 `standby_node_id + standby_node_term`; -2. 查询 barrier; -3. 查询失败则拒绝请求; -4. 查询成功则将 barrier 附加到 pending 任务。 - -去重语义保持“同节点按 term 优先”。 - -## 6. `SyncWithStandby` 判定扩展 -`SnapshotManager::SyncWithStandby` 采用两阶段: -1. 轻量阶段: - - `current_ckpt_ts = GetCurrentCheckpointTs(node_group)` - - 筛选满足以下条件的任务: - - term 对齐 - - `subscribe_id < current_subscribe_id` - - `current_ckpt_ts > subscription_active_tx_max_ts` - - 若无可发送任务,直接返回 -2. 重量阶段: - - 执行 `RunOneRoundCheckpoint(...)` 做 flush - - 对可发送任务创建/发送 snapshot,并通知 standby。 - -## 7. Worker 重试节流 -`StandbySyncWorker` 在 pending 非空但任务仍阻塞时,增加短暂退避等待 -(`200ms`),避免紧循环。 - -## 8. 清理策略 -- 同一 `(node_id, standby_term)` snapshot 成功后删除 barrier; -- 同节点新 term 注册时,清理旧 term barrier,并可淘汰旧 pending; -- 节点移除时通过 `EraseSubscriptionBarriersByNode(node_id)` 同时清理 - pending 与 barrier; -- leader 失效时清空全部 pending/barrier。 - -## 9. 失败处理 -- 请求到达但 barrier 缺失:拒绝(安全优先); -- checkpoint 失败:任务保留,后续轮次重试; -- snapshot 发送失败:任务保留,后续轮次重试。 - -## 10. Standby 侧 reset 拒绝回滚 -在 `CcNode::SubscribePrimaryNode` 中,如果 primary 拒绝 -`ResetStandbySequenceId`: -- 回滚本地 per-shard standby 订阅状态(`Unsubscribe`) -- 若仍是失败 term,则清理 `standby/candidate standby term` -- 并发保护避免误伤更新的重订阅流程。 - -## 11. 测试建议 - -### 单测 -- barrier 注册/查询/删除 -- pending 去重与 barrier 替换 -- 判定边界:`current_ckpt_ts == / < / > barrier_ts` - -### 集成 -- 长事务阻塞场景:`current_ckpt_ts` 未超过 barrier 前不发送 snapshot -- 多 standby 独立 barrier -- 同 standby term 重试请求 -- term 切换后的清理与隔离