Enforce subscription barrier for standby snapshot sync#436
Conversation
Note: Reviews paused. This branch is under active development, so CodeRabbit automatically paused this review to avoid an influx of comments from new commits. This behavior is configurable in the settings.
Walkthrough

Registers per-subscription barriers at follow time using the global max `ActiveTxMaxTs`, stores barriers keyed by `(standby_node, standby_term)`, attaches `barrier_ts` to pending snapshot-sync tasks, and gates snapshot delivery on `checkpoint_ts > barrier_ts`; barriers are pruned on completion, term supersession, node removal, or leader loss.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Standby as StandbyClient
    participant Primary as PrimaryService
    participant Shards as LocalShards
    participant SnapshotMgr as SnapshotManager
    participant Checkpoint as CheckpointEngine
    rect rgba(200,200,255,0.5)
        Standby->>Primary: StandbyStartFollowing
        Primary->>Shards: collect ActiveTxMaxTs (ActiveTxMaxTsCc)
        Shards-->>Primary: shard maxes
        Primary->>Primary: compute global_max_active_tx_ts, standby_term
        Primary->>SnapshotMgr: RegisterSubscriptionBarrier(standby_node, standby_term, barrier_ts)
        Primary-->>Standby: StartFollowing response
    end
    rect rgba(200,255,200,0.5)
        Standby->>SnapshotMgr: RequestStorageSnapshotSync(standby_term,...)
        SnapshotMgr->>SnapshotMgr: GetSubscriptionBarrier(standby_node, standby_term)
        alt barrier exists
            SnapshotMgr->>SnapshotMgr: enqueue PendingSnapshotSyncTask(with barrier_ts)
            SnapshotMgr-->>Standby: accept
        else missing
            SnapshotMgr-->>Standby: reject
        end
    end
    rect rgba(255,200,200,0.5)
        loop checkpoint rounds
            SnapshotMgr->>Checkpoint: GetCurrentCheckpointTs(node_group)
            Checkpoint-->>SnapshotMgr: ckpt_ts
            SnapshotMgr->>SnapshotMgr: for each task: if ckpt_ts > barrier_ts then create/send snapshot and EraseSubscriptionBarrier else keep pending
        end
    end
```
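The gating step in the checkpoint loop above can be reduced to a small decision: ship the snapshot only once a checkpoint timestamp has moved strictly past the barrier recorded at subscribe time, then erase the barrier. The following is a hedged, self-contained sketch of that rule; `BarrierTable`, `Register`, and `TryPass` are illustrative names, not the PR's actual `SnapshotManager` API.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>

// Illustrative sketch of the barrier gate: a snapshot for a standby may
// only ship once a checkpoint has advanced strictly past the barrier
// timestamp recorded when that standby subscribed.
using BarrierKey = std::pair<uint32_t, int64_t>;  // (standby_node, standby_term)

struct BarrierTable
{
    std::map<BarrierKey, uint64_t> barriers;  // key -> barrier_ts

    void Register(uint32_t node, int64_t term, uint64_t barrier_ts)
    {
        barriers[{node, term}] = barrier_ts;
    }

    // Returns true (and erases the barrier) once ckpt_ts has passed it;
    // a missing barrier means the sync request should be rejected.
    bool TryPass(uint32_t node, int64_t term, uint64_t ckpt_ts)
    {
        auto it = barriers.find({node, term});
        if (it == barriers.end() || ckpt_ts <= it->second)
        {
            return false;
        }
        barriers.erase(it);
        return true;
    }
};
```

Note the strict inequality: a checkpoint exactly at the barrier timestamp is not sufficient, matching the `ckpt_ts > barrier_ts` condition in the walkthrough.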
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly Related PRs

Suggested Reviewers

Poem
🚥 Pre-merge checks: ❌ 2 failed (1 warning, 1 inconclusive) | ✅ 1 passed
Actionable comments posted: 4
🧹 Nitpick comments (2)
tx_service/src/store/snapshot_manager.cpp (2)
185-200: Consider adding TTL-based cleanup for barriers and pending tasks.

The current implementation relies on explicit `EraseSubscriptionBarrier` calls for cleanup. If a standby disconnects or a long-running transaction prevents the checkpoint from passing the barrier, entries could accumulate indefinitely. Consider adding:

- A `created_at` timestamp to `PendingSnapshotSyncTask` for TTL-based task expiration
- Periodic sweep of stale barriers (e.g., barriers older than N minutes without a pending task)
- A maximum retry count before discarding tasks

This aligns with the design doc's mention of "optional TTL sweep as fallback" (line 78 of the design doc).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tx_service/src/store/snapshot_manager.cpp` around lines 185 - 200, SnapshotManager::EraseSubscriptionBarrier currently only removes barriers on explicit calls, which can leak entries; add TTL-based cleanup by (1) extending PendingSnapshotSyncTask with a created_at timestamp and a retry_count field, (2) updating code that enqueues tasks to set created_at and initialize retry_count, and (3) implementing a periodic sweep function in SnapshotManager (protected by standby_sync_mux_) that removes subscription_barrier_ entries older than N minutes when there is no corresponding active PendingSnapshotSyncTask or when retry_count exceeds a configured max; ensure the sweep also increments retry_count or expires tasks and call it from the existing maintenance loop or a new background timer to enforce TTL-based expiration.
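The TTL sweep suggested above amounts to a periodic pass that drops barrier entries older than a cutoff. A minimal sketch under stated assumptions (`BarrierEntry`, `created_at`, and `SweepStaleBarriers` are hypothetical names; the real sweep would run under `standby_sync_mux_` and also consult `pending_req_` before dropping an entry):

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <unordered_map>

using Clock = std::chrono::steady_clock;

struct BarrierEntry
{
    uint64_t barrier_ts;
    Clock::time_point created_at;  // hypothetical field added for TTL expiry
};

// Removes barriers older than ttl; returns how many were swept.
inline std::size_t SweepStaleBarriers(
    std::unordered_map<uint32_t, BarrierEntry> &barriers,
    Clock::duration ttl,
    Clock::time_point now)
{
    std::size_t swept = 0;
    for (auto it = barriers.begin(); it != barriers.end();)
    {
        if (now - it->second.created_at > ttl)
        {
            it = barriers.erase(it);  // stale: standby never came back
            ++swept;
        }
        else
        {
            ++it;
        }
    }
    return swept;
}
```

Calling this from an existing maintenance loop or a background timer would bound the lifetime of leaked entries without changing the happy-path erase-on-completion behavior.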
315-322: Consider explicit handling of moved-from entries for clarity.

After `std::move(pending_task)`, the entry remains in `pending_req_` with its `req` in a cleared state. The completion logic (lines 399-430) relies on the moved-from protobuf returning default values (term = 0) to trigger cleanup. While this works, it's implicit and fragile. Consider either:

- Marking entries as "in-flight" explicitly (e.g., a separate set of in-flight node IDs)
- Erasing entries when dequeuing and re-inserting only if they shouldn't complete

Alternative approach: track in-flight tasks explicitly

```diff
+ std::unordered_set<uint32_t> in_flight_nodes;
  std::vector<PendingSnapshotSyncTask> tasks;
  // Dequeue all pending tasks that can be covered by this snapshot.
  {
      std::unique_lock<std::mutex> lk(standby_sync_mux_);
      auto it = pending_req_.begin();
      while (it != pending_req_.end())
      {
          // ... existing validation ...
          if (!covered)
          {
              it++;
              continue;
          }
+         in_flight_nodes.insert(it->first);
          tasks.emplace_back(std::move(pending_task));
-         it++;
+         it = pending_req_.erase(it);
      }
  }
```

Then in completion logic, only check `pending_req_` if the node_id exists there (indicating a new request arrived during processing).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tx_service/src/store/snapshot_manager.cpp` around lines 315 - 322, The code moves pending_task into tasks but leaves an entry in pending_req_ with a moved-from req, which relies implicitly on protobuf defaults in the completion logic; fix by explicitly marking entries as in-flight or removing them when dequeuing: either maintain an in_flight set keyed by node_id and insert node_id when you emplace_back(std::move(pending_task)) and consult that set in the completion logic (so only entries that remain in pending_req_ represent new arrivals), or erase the pending_req_ entry when you dequeue and only re-insert it if completion logic determines the request should remain queued; update the dequeueing site (where pending_task is moved) and the completion routine to use the chosen mechanism (pending_req_, tasks, in_flight) so moved-from protobufs are never relied upon implicitly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
@.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md:
- Around line 22-26: The docs list RegisterSubscriptionBarrier with an extra
ng_id parameter which mismatches the implementation; update the documentation to
show the actual three-argument signature used in code:
RegisterSubscriptionBarrier(uint32_t standby_node_id, int64_t standby_node_term,
uint64_t active_tx_max_ts), and ensure GetSubscriptionBarrier and
EraseSubscriptionBarrier doc entries likewise reflect the correct parameters
(standby_node_id, standby_term, and out/return types) so the API docs match the
implemented functions.
In
@.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md:
- Around line 22-26: The documented signature for RegisterSubscriptionBarrier is
outdated: remove the extra ng_id parameter so the interface matches the
implementation; update the line that currently lists
RegisterSubscriptionBarrier(ng_id, standby_node_id, standby_term, barrier_ts) to
RegisterSubscriptionBarrier(standby_node_id, standby_term, barrier_ts) and
ensure any related descriptions or examples reference the three-argument form
used by tx_service/src/remote/cc_node_service.cpp.
In `@tx_service/src/remote/cc_node_service.cpp`:
- Around line 1671-1672: You register a subscription barrier via
SnapshotManager::Instance().RegisterSubscriptionBarrier(request->node_id(), ...)
but never remove it if the standby disconnects or this node steps down; add
calls to remove the barrier in the standby unsubscribe/reset and leader-stepdown
paths by invoking the SnapshotManager erase/unregister API (e.g.,
SnapshotManager::Instance().EraseSubscriptionBarrier(request->node_id()) or
UnregisterSubscriptionBarrier) from the Standby reset/unsubscribe handler and
the leader stepdown/demote code path so barriers are cleaned when a standby
aborts or the node loses leadership.
- Around line 1659-1675: After collecting global_active_tx_max_ts but before
calling Sharder::Instance().GetNextSubscribeId() and
RegisterSubscriptionBarrier, re-check leadership by invoking
CheckLeaderTerm(request->node_group_id(), request->ng_term()); if that check
fails, do NOT mint a subscribe_id or call RegisterSubscriptionBarrier — instead
roll back the per-shard candidate state by issuing a rollback request via
local_shards_ (e.g., use local_shards_.EnqueueCcRequest to enqueue a dedicated
rollback/undo CC request matching the earlier ActiveTxMaxTsCc work) and return
an error response (set response->set_error(true)) so no subscribe_id is
returned. Ensure this prevents RegisterSubscriptionBarrier and leaves no
candidate state for the new leader.
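The pattern described above — re-validate leadership after a slow collection step, and only then mint state — is easiest to see in isolation. A hedged sketch, where `still_leader`, `rollback_shards`, `next_subscribe_id`, and `register_barrier` stand in for `CheckLeaderTerm`, the per-shard rollback request, `GetNextSubscribeId`, and `RegisterSubscriptionBarrier` (all names here are illustrative, not the real signatures):

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

struct FollowResult
{
    bool ok = false;
    uint64_t subscribe_id = 0;
};

// Only mint a subscribe id and register the barrier if leadership still
// holds after the (potentially slow) per-shard collection; otherwise roll
// back the candidate state so the new leader sees nothing half-done.
inline FollowResult FinishStartFollowing(
    uint64_t global_active_tx_max_ts,
    const std::function<bool()> &still_leader,
    const std::function<void()> &rollback_shards,
    const std::function<uint64_t()> &next_subscribe_id,
    const std::function<void(uint64_t)> &register_barrier)
{
    if (!still_leader())
    {
        rollback_shards();  // undo the ActiveTxMaxTsCc-style work
        return {};          // caller should set an error response
    }
    FollowResult res;
    res.subscribe_id = next_subscribe_id();
    register_barrier(global_active_tx_max_ts);
    res.ok = true;
    return res;
}
```

The key ordering property: the barrier is never registered, and no subscribe id is handed out, on a node that has already lost its term.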
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b5ce1da7-fde3-46f9-8374-c73d20aae7db
📒 Files selected for processing (10)
.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md
.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md
.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md
.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md
tx_service/include/cc/cc_request.h
tx_service/include/cc/cc_shard.h
tx_service/include/cc/object_cc_map.h
tx_service/include/store/snapshot_manager.h
tx_service/src/remote/cc_node_service.cpp
tx_service/src/store/snapshot_manager.cpp
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tx_service/src/store/snapshot_manager.cpp`:
- Around line 135-157: RegisterSubscriptionBarrier currently prunes
subscription_barrier_ but leaves already-queued pending sync tasks in
pending_req_[standby_node_id], which can cause stale snapshots to be shipped;
update RegisterSubscriptionBarrier so that if standby_node_term is not newer
than the node's current max term we ignore the registration, and if it is newer
we first prune older subscription_barrier_ entries (as now) and then
remove/evict any queued pending_req_ entries for the same standby_node_id whose
term < standby_node_term so they cannot be processed by SyncWithStandby;
reference RegisterSubscriptionBarrier, subscription_barrier_, pending_req_,
standby_node_id, standby_node_term and current_subscribe_id when locating where
to add the older-requests eviction and the early-ignore for out-of-order/older
registrations.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3e97341e-ed5b-46e3-a0b2-04e9f6f02092
📒 Files selected for processing (4)
tx_service/include/cc/cc_shard.h
tx_service/include/store/snapshot_manager.h
tx_service/src/sharder.cpp
tx_service/src/store/snapshot_manager.cpp
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tx_service/include/cc/object_cc_map.h (1)
1382-1442: ⚠️ Potential issue | 🔴 Critical

Don't drop forward entries when only candidate standbys exist.

`ForwardStandbyMessage()` is also the path that assigns a sequence ID and keeps entries in history for `candidate_standby_nodes_` (see tx_service/src/cc/cc_shard.cpp:2961-3050). Releasing the entry when `has_subscribed_standby` is false means commits made during the candidate-only phase never enter that history, so a standby can miss post-snapshot deltas after it finishes syncing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tx_service/include/cc/object_cc_map.h` around lines 1382 - 1442, The code currently releases forward_entry whenever has_subscribed_standby is false, which drops entries needed for candidate standbys; change the condition so that if forward_entry exists and either has_subscribed_standby is true OR there are candidate standbys (check shard_/candidate_standby_nodes_ via an existing accessor such as HasCandidateStandbys() or equivalent), you set commit_ts/schema_version, take the entry with cce->ReleaseForwardEntry(), and call shard_->ForwardStandbyMessage(entry_ptr.release()); only call cce->ReleaseForwardEntry() without forwarding when no subscribed or candidate standbys exist. Reference symbols: has_subscribed_standby, forward_entry, cce->ReleaseForwardEntry(), shard_->ForwardStandbyMessage(), and candidate_standby_nodes_/HasCandidateStandbys().
♻️ Duplicate comments (1)
tx_service/src/store/snapshot_manager.cpp (1)
135-162: ⚠️ Potential issue | 🟠 Major

Prune stale queued work when a newer barrier registers.

This still removes older barriers without evicting an already-queued `pending_req_[standby_node_id]` for an older term, and it still accepts an out-of-order older registration after a newer term already exists. That leaves `SyncWithStandby()` free to keep processing superseded work until a later request overwrites it. Please ignore older registrations here and drop any pending task whose term is older than `standby_node_term`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tx_service/src/store/snapshot_manager.cpp` around lines 135 - 162, RegisterSubscriptionBarrier currently prunes old barriers but still accepts out-of-order older registrations and leaves pending_req_ entries for superseded terms; fix by first checking subscription_barrier_[standby_node_id] for any existing term greater than standby_node_term and return early to ignore stale registrations, then prune barriers < standby_node_term as before, insert the new barrier if absent, and if pending_req_[standby_node_id] exists and its term is less than standby_node_term, erase/clear that pending_req_ while holding standby_sync_mux_ so SyncWithStandby() cannot process superseded work.
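The term-ordering rules requested above — ignore out-of-order older registrations, and evict queued work superseded by a newer term — can be sketched in isolation. This is an illustrative model, not the PR's code: `NodeBarriers`, `pending_term`, and `RegisterBarrier` are hypothetical names, and the real implementation holds `standby_sync_mux_` around equivalent state.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <unordered_map>

struct NodeBarriers
{
    std::map<int64_t, uint64_t> term_to_barrier;  // per-standby barriers
    int64_t pending_term = -1;                    // term of queued sync task, -1 if none
};

// Returns false when the registration is older than what we already track.
inline bool RegisterBarrier(std::unordered_map<uint32_t, NodeBarriers> &state,
                            uint32_t node, int64_t term, uint64_t barrier_ts)
{
    auto &nb = state[node];
    // Ignore out-of-order registrations for terms we have already superseded.
    if (!nb.term_to_barrier.empty() && nb.term_to_barrier.rbegin()->first > term)
    {
        return false;
    }
    // Prune barriers belonging to older, now-superseded terms.
    nb.term_to_barrier.erase(nb.term_to_barrier.begin(),
                             nb.term_to_barrier.lower_bound(term));
    nb.term_to_barrier[term] = barrier_ts;
    // Evict a queued task whose term this registration supersedes, so the
    // sync worker can never ship a snapshot for a stale term.
    if (nb.pending_term >= 0 && nb.pending_term < term)
    {
        nb.pending_term = -1;
    }
    return true;
}
```

The invariant after any call: at most the newest term's barrier survives, and no queued task for an older term can still be processed.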
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tx_service/src/fault/cc_node.cpp`:
- Around line 1078-1087: The reset rejection branch currently returns without
undoing local standby state; before returning from the ResetStandbySequenceId
rejection path (where you check cntl.Failed() || reset_resp.error()), explicitly
roll back the candidate/following state: clear/reset the
CandidateStandbyNodeTerm and unset any per-shard subscribe/subscribe-state that
may have been installed (or call the node's existing helpers such as
ClearCandidateStandbyState() or RestartSubscription()/UnsubscribeShard() for
each shard), then return; this ensures the local subscription state and
CandidateStandbyNodeTerm are cleared when reset_resp.error() is true and
prevents stale local standby state from persisting.
In `@tx_service/src/store/snapshot_manager.cpp`:
- Around line 339-343: The queued copy of pending_task into tasks leaves
pending_req_ non-empty and causes StandbySyncWorker() (which waits on
!pending_req_.empty()) to spin; change the logic in the SyncWithStandby()
handling so that if you intend to schedule the task for execution you remove it
from pending_req_ (e.g., move the task out and erase the pending_req_ entry)
instead of just copying, otherwise skip queuing it while it is blocked by
current_subscribe_id or subscription_active_tx_max_ts; update the code that uses
pending_task/tasks to use the moved task and ensure any completion logic that
relied on the original standby term reads it before erasure (or store the needed
term on the moved task) so you do not leave pending_req_ populated and avoid
tight checkpoint loops.
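The "move the task out and erase the entry" dequeue suggested above keeps the worker's `!pending_req_.empty()` wait condition meaningful, because no moved-from residue is ever left in the map. A hedged sketch with simplified types (`Task` and `DequeueCovered` are illustrative stand-ins for `PendingSnapshotSyncTask` and the real `SyncWithStandby` loop):

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct Task
{
    int64_t term = 0;
    std::string payload;
};

// Dequeue every pending task whose barrier the checkpoint has passed:
// move the task out AND erase the map entry, instead of leaving a
// moved-from husk behind.
inline std::vector<Task> DequeueCovered(
    std::unordered_map<uint32_t, Task> &pending,
    uint64_t ckpt_ts,
    const std::unordered_map<uint32_t, uint64_t> &barrier_ts)
{
    std::vector<Task> tasks;
    for (auto it = pending.begin(); it != pending.end();)
    {
        auto b = barrier_ts.find(it->first);
        bool covered = (b != barrier_ts.end() && ckpt_ts > b->second);
        if (covered)
        {
            tasks.emplace_back(std::move(it->second));
            it = pending.erase(it);  // no moved-from residue left behind
        }
        else
        {
            ++it;  // blocked by its barrier; keep it queued
        }
    }
    return tasks;
}
```

Anything completion logic needs from the original entry (such as the standby term) should be read from the moved task, since the map entry no longer exists.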
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4695088b-8a7c-41a9-a9f5-8a7504de667b
📒 Files selected for processing (5)
tx_service/include/cc/cc_shard.h
tx_service/include/cc/object_cc_map.h
tx_service/src/fault/cc_node.cpp
tx_service/src/remote/cc_node_service.cpp
tx_service/src/store/snapshot_manager.cpp
```cpp
uint64_t ActiveTxMaxTs(NodeGroupId cc_ng_id) const
{
    uint64_t max_ts = 0;
    auto it = lock_holding_txs_.find(cc_ng_id);
    if (it != lock_holding_txs_.end())
    {
        for (const auto &tx_pair : it->second)
        {
            if (!TableName::IsMeta(tx_pair.second->table_type_) &&
                tx_pair.second->wlock_ts_ != 0)
            {
                max_ts = std::max(max_ts, tx_pair.second->wlock_ts_);
            }
        }
    }
    return max_ts;
}
```
`wlock_ts_` is not a safe snapshot barrier.

`ActiveTxMaxTs()` records the max first-write-lock timestamp, but `SyncWithStandby()` later treats `ckpt_ts > barrier_ts` as proof that all txns active at subscribe time are represented in the snapshot. That does not hold: an older txn can still commit after a newer txn has already forced `ckpt_ts` down to `newer_tx.wlock_ts - 1`, so the snapshot can pass the barrier and still miss the older txn's commit. The barrier needs a completion watermark, not the max lock-acquisition timestamp.
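One way to read "completion watermark" here: instead of the maximum lock timestamp, track which transactions are still in flight, and consider a timestamp safe only once every transaction at or below it has finished. This is a hedged, self-contained illustration of the distinction (not the PR's code); `CompletionWatermark` and its methods are hypothetical names:

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// A completion watermark: a timestamp w is "safe" only when no transaction
// with start ts <= w is still active. Contrast with max-lock-ts, which can
// report "done" while an older transaction is still running.
struct CompletionWatermark
{
    std::set<uint64_t> active_start_ts;  // start ts of in-flight txns

    void Begin(uint64_t ts) { active_start_ts.insert(ts); }
    void Finish(uint64_t ts) { active_start_ts.erase(ts); }

    bool AllCompletedUpTo(uint64_t w) const
    {
        return active_start_ts.empty() || *active_start_ts.begin() > w;
    }
};
```

In the race described above, a txn at ts 10 can still be running while a txn at ts 50 has already committed; a max-ts barrier of 50 looks satisfied, but the watermark correctly reports that ts-10 work is outstanding.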
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tx_service/src/store/snapshot_manager.cpp (1)
460-541: ⚠️ Potential issue | 🟠 Major

Do not erase the barrier before the standby actually learns the snapshot is ready.

The new barrier is now required for every `RequestStorageSnapshotSync`, but this path erases it even when the channel is null or every `OnSnapshotSynced` retry fails. In that case the standby's retry request is rejected by `OnSnapshotSyncRequested`, so this subscription term can get stuck permanently.

Possible fix

```diff
- if (channel)
+ bool notified = false;
+ if (channel)
  {
      // needs retry if failed
      // since the standby node may be still spinning up.
      remote::CcRpcService_Stub stub(channel.get());
      int retry_times = 5;
@@
-     else
+     else if (!on_sync_resp.error())
      {
+         notified = true;
          break;
      }
  }
  }
+
+ if (!notified)
+ {
+     // Keep the barrier/request so the standby can retry this term.
+     continue;
+ }
  // We don't care if the notification is successful or not.
  // We just need to erase the same request. Even if the notification
  // fails, after a while, the standby node will resend the
  // request.
```
Verify each finding against the current code and only fix it if needed. In `@tx_service/src/store/snapshot_manager.cpp` around lines 460 - 541, The code currently erases the subscription barrier and pending request unconditionally even when the standby was never successfully notified (channel null or all OnSnapshotSynced retries failed); change the flow so that you only call EraseSubscriptionBarrierLocked / pending_req_.erase(...) and the final EraseSubscriptionBarrier(...) when the notification actually succeeded. Concretely, in the block that creates remote::CcRpcService_Stub and calls stub.OnSnapshotSynced(...) track a boolean (e.g., notification_succeeded) set true only if cntl.Failed() is false and break; then wrap the subsequent pending_req_ removal logic and the final EraseSubscriptionBarrier(req.standby_node_id(), req.standby_node_term()) behind a check that notification_succeeded is true (and also skip erasure when channel is null), leaving pending_req_ intact so the standby can retry; use the existing function names OnSnapshotSynced, EraseSubscriptionBarrierLocked, pending_req_, and EraseSubscriptionBarrier to locate and adjust the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
@.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md:
- Around line 58-63: Update the documentation to reflect the implemented
checkpoint API: state that RunOneRoundCheckpoint(uint32_t, int64_t) still
returns bool (do not mention an out_ckpt_ts parameter), and describe how
snapshot eligibility is gated by GetCurrentCheckpointTs(uint32_t) rather than an
out parameter; remove the instruction to set *out_ckpt_ts from
CkptTsCc::GetCkptTs(), and instead document that CkptTsCc::GetCkptTs() is used
internally by GetCurrentCheckpointTs to obtain the checkpoint timestamp.
In
@.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md:
- Around line 57-61: Update the checkpoint API documentation and implementation
to use the revised signature bool RunOneRoundCheckpoint(uint32_t node_group,
int64_t term, uint64_t *out_ckpt_ts): change the doc text to describe the
out_ckpt_ts parameter, modify RunOneRoundCheckpoint implementation to write the
current checkpoint timestamp into *out_ckpt_ts using CkptTsCc::GetCkptTs(), and
remove or deprecate the separate GetCurrentCheckpointTs(uint32_t) usage in
barrier logic so callers rely on the out parameter for the barrier decision.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 80cbe0e1-24c5-4ede-b085-d76416bf2c5e
📒 Files selected for processing (5)
.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md
.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md
tx_service/include/store/snapshot_manager.h
tx_service/src/fault/cc_node.cpp
tx_service/src/store/snapshot_manager.cpp
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tx_service/src/store/snapshot_manager.cpp (1)
459-542: ⚠️ Potential issue | 🟠 Major

Keep the barrier until the standby actually acknowledges the snapshot.

Line 443 only treats file transfer as success. If `OnSnapshotSynced` exhausts its retries, or otherwise fails to confirm completion, lines 499-542 still clear the pending entry and erase the barrier. After that, a resend for the same `standby_term` will be rejected by `OnSnapshotSyncRequested()` because the barrier is gone, so a transient notification failure can leave the standby stuck permanently.

Possible fix

```diff
  if (succ)
  {
+     bool notify_succeeded = false;
      auto channel = Sharder::Instance().GetCcNodeServiceChannel(
          req.standby_node_id());
      DLOG(INFO) << "Notifying standby node #" << req.standby_node_id()
                 << " for snapshot synced at term " << req.standby_node_term()
@@
      while (retry_times-- > 0)
      {
          brpc::Controller cntl;
          cntl.set_timeout_ms(1000);
          remote::OnSnapshotSyncedRequest on_synced_req;
@@
          if (cntl.Failed())
          {
              LOG(WARNING) << "OnSnapshotSynced to standby node #"
                           << req.standby_node_id() << " failed, "
                           << " error: " << cntl.ErrorText()
                           << " error code: " << cntl.ErrorCode();
              // sleep 1 second and retry
              std::this_thread::sleep_for(std::chrono::seconds(1));
              continue;
          }
          else
          {
+             notify_succeeded = true;
              break;
          }
      }
  }
- // We don't care if the notification is successful or not.
- // We just need to erase the same request. Even if the notification
- // fails, after a while, the standby node will resend the
- // request.
+ if (!notify_succeeded)
+ {
+     LOG(WARNING) << "Snapshot sent but standby was not notified; "
+                  << "keep pending request and barrier for retry";
+     continue;
+ }
+
  {
      std::unique_lock<std::mutex> lk(standby_sync_mux_);
      auto pending_req_iter = pending_req_.find(req.standby_node_id());
```
Verify each finding against the current code and only fix it if needed. In `@tx_service/src/store/snapshot_manager.cpp` around lines 459 - 542, The code currently always removes pending_req_ and erases the subscription barrier regardless of whether the remote notification via CcRpcService_Stub::OnSnapshotSynced actually succeeded; change the flow so that you only clear pending_req_ and call EraseSubscriptionBarrierLocked/EraseSubscriptionBarrier when the OnSnapshotSynced call succeeded (use a bool like notified or check on_sync_resp/status and cntl.Failed() to determine success after retries). Locate the notification loop around remote::CcRpcService_Stub::OnSnapshotSynced and add a success flag that is checked before the block that acquires standby_sync_mux_ and before the final EraseSubscriptionBarrier call; if notification failed after retries, leave pending_req_ and the barrier in place (or schedule a retry) instead of erasing them.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8fcae895-fdef-4a4f-b979-d83cdd3db5e6
📒 Files selected for processing (5)
.cursor_knowledge/standby_snapshot_subscription_barrier_design_en.md
.cursor_knowledge/standby_snapshot_subscription_barrier_design_zh.md
.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_en.md
.cursor_knowledge/standby_snapshot_subscription_barrier_implementation_zh.md
tx_service/src/store/snapshot_manager.cpp
Commits:

- …-aware forward-entry creation
  - add CcShard::ActiveTxMaxTs(NodeGroupId) for per-shard max active write-tx ts
  - add ActiveTxMaxTsCc (CkptTsCc-style) to aggregate max ts across all CcShards
  - create standby forward entry when either subscribed or candidate standby exists
  - add CcShard::HasCandidateStandbys() helper
- …gn and implementation (EN/ZH)
  - add standalone design docs in English and Chinese
  - add standalone implementation docs in English and Chinese
  - separate process/architecture from code-level implementation details
- subscription barrier API
  - add RegisterSubscriptionBarrier/GetSubscriptionBarrier/EraseSubscriptionBarrier with API comments
  - add in-memory subscription barrier map keyed by standby node id and standby node term
  - guard barrier operations with standby_sync_mux_ and prune older terms on register
- …rier state
  - introduce PendingSnapshotSyncTask (request + subscription_active_tx_max_ts)
  - migrate pending_req_ map value type to PendingSnapshotSyncTask
  - update SyncWithStandby/OnSnapshotSyncRequested request field access to task.req
  - keep runtime behavior unchanged; barrier field is reserved for next integration step
- …y snapshot
  - register subscription barrier in StandbyStartFollowing using aggregated ActiveTxMaxTs
  - make OnSnapshotSyncRequested validate barrier existence and return acceptance status
  - extend SyncWithStandby gating: checkpoint ts must be greater than subscription barrier ts
  - extend RunOneRoundCheckpoint with optional out checkpoint ts for barrier validation
  - erase barrier entry after successful snapshot sync
…ate forward-entry path
PR Description
This PR adds a subscription-time barrier to make standby snapshot sync safer.
What changed
- Record `active_tx_max_ts` when a standby starts following.
- Key barriers by `(standby_node_id, standby_term)` in `SnapshotManager`.
- Gate snapshot delivery on `ckpt_ts > barrier_ts`.
This avoids sending a snapshot too early, before writes from transactions active at subscription time are fully covered.
Fixes: https://github.com/eloqdata/project_tracker/issues/218, https://github.com/eloqdata/project_tracker/issues/190.
Summary by CodeRabbit
New Features
Bug Fixes
Observability