64 changes: 64 additions & 0 deletions .cursor_knowledge/standby_snapshot_subscription_barrier_design.md
@@ -0,0 +1,64 @@
# Standby Snapshot Subscription Barrier: Design

## 1. Background
Snapshot sync for standby was previously gated mostly by leader term and
subscribe-id coverage. That was not enough to guarantee that the snapshot
content reflects all transactions that were active when the standby
subscription became effective.

## 2. Goal
Introduce a subscription barrier per standby epoch:
- `barrier_ts = max ActiveTxMaxTs across all local ccshards`
- A snapshot is valid for that epoch only when:
- `current_ckpt_ts > barrier_ts`

This guarantees that the snapshot reflects every transaction that was active
at the subscription success point.
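The two rules above can be reduced to a small sketch (helper names here are illustrative, not from the codebase):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// barrier_ts is the max of per-shard ActiveTxMaxTs samples; a snapshot is
// valid for the epoch only when its checkpoint ts strictly exceeds it.
inline uint64_t ComputeBarrierTs(
    const std::vector<uint64_t> &shard_active_tx_max_ts)
{
    uint64_t barrier_ts = 0;
    for (uint64_t ts : shard_active_tx_max_ts)
    {
        barrier_ts = std::max(barrier_ts, ts);
    }
    return barrier_ts;
}

inline bool SnapshotValidForEpoch(uint64_t current_ckpt_ts,
                                  uint64_t barrier_ts)
{
    return current_ckpt_ts > barrier_ts;
}
```

Note the strict inequality: a checkpoint taken exactly at `barrier_ts` is not sufficient.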

## 3. Key decisions
- Barrier sampling point is **`ResetStandbySequenceId` success path on primary**
(not `StandbyStartFollowing` and not `RequestStorageSnapshotSync`).
- Barrier key is `(standby_node_id, standby_term)`.
- `standby_term = (primary_term << 32) | subscribe_id`.
- Snapshot worker uses a lightweight checkpoint-ts probe before running heavy
checkpoint/flush.
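The `standby_term` encoding from the second bullet can be sketched with hypothetical pack/unpack helpers (not actual codebase functions): the upper 32 bits carry the primary term, the lower 32 bits the subscribe id.

```cpp
#include <cstdint>

inline uint64_t MakeStandbyTerm(uint32_t primary_term, uint32_t subscribe_id)
{
    return (static_cast<uint64_t>(primary_term) << 32) | subscribe_id;
}

inline uint32_t PrimaryTermOf(uint64_t standby_term)
{
    return static_cast<uint32_t>(standby_term >> 32);
}

inline uint32_t SubscribeIdOf(uint64_t standby_term)
{
    return static_cast<uint32_t>(standby_term & 0xFFFFFFFFu);
}
```

Because the primary term occupies the high bits, a newer subscription on a newer primary term always compares greater as a raw `uint64_t`, which is what makes term-ordered pruning of barriers possible.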

## 4. Runtime flow
1. Standby calls `StandbyStartFollowing`, receives `subscribe_id` and start seq.
2. Standby calls `ResetStandbySequenceId`.
3. Primary marks standby as subscribed on all shards and samples
`global_active_tx_max_ts` using `ActiveTxMaxTsCc`.
4. Primary registers barrier in `SnapshotManager`:
- `subscription_barrier_[node_id][standby_term] = barrier_ts`
5. Standby calls `RequestStorageSnapshotSync` with `standby_term`.
6. Primary validates barrier existence and enqueues one pending task per node
with attached `subscription_active_tx_max_ts`.
7. `StandbySyncWorker` loop:
- Probe `current_ckpt_ts` via `GetCurrentCheckpointTs()`
- Select pending tasks that satisfy:
- same primary term
- `subscribe_id < current_subscribe_id`
- `current_ckpt_ts > barrier_ts`
- If no task is eligible, skip heavy checkpoint for this round.
- If at least one task is eligible, run `RunOneRoundCheckpoint`, build/send
snapshot, then notify standby.
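The per-task eligibility check in step 7 can be sketched as a single predicate; the struct fields below are assumed stand-ins for the real pending-task state:

```cpp
#include <cstdint>

struct PendingSnapshotTask
{
    int64_t primary_term;    // term the sync request was issued under
    uint32_t subscribe_id;   // standby's subscribe id
    uint64_t barrier_ts;     // subscription_active_tx_max_ts
};

// A task is selected only when all three step-7 conditions hold.
inline bool IsTaskEligible(const PendingSnapshotTask &task,
                           int64_t current_primary_term,
                           uint32_t current_subscribe_id,
                           uint64_t current_ckpt_ts)
{
    return task.primary_term == current_primary_term &&  // same primary term
           task.subscribe_id < current_subscribe_id &&   // subscribe-id coverage
           current_ckpt_ts > task.barrier_ts;            // checkpoint past barrier
}
```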

## 5. Pending and cleanup model
- Pending tasks blocked by subscribe-id or barrier remain queued for retry.
- The worker applies a retry backoff when the pending queue is non-empty but
  blocked, to avoid a tight loop.
- Superseded standby terms are pruned:
- registering newer barrier can drop older pending task for the same node
- older barriers are removed
- On leader loss, all pending tasks and barriers are cleared.
- On node removal, `EraseSubscriptionBarriersByNode` clears both pending and
barriers for that node.

## 6. Safety properties
- Missing barrier on snapshot-sync request is rejected (safe default).
- Barrier is epoch-scoped and never shared across standby terms.
- All barrier/pending updates are under `standby_sync_mux_`.

## 7. Expected effects
- Prevent early snapshots that miss writes from subscription-time active txns.
- Avoid unnecessary heavy checkpoint rounds when no task is currently eligible.
- Keep existing transport/retry semantics for snapshot send/notify.
@@ -0,0 +1,101 @@
# Standby Snapshot Subscription Barrier: Implementation

## 1. Scope of code changes
- `tx_service/src/remote/cc_node_service.cpp`
- `tx_service/src/fault/cc_node.cpp`
- `tx_service/include/cc/cc_request.h` (`ActiveTxMaxTsCc`)
- `tx_service/include/store/snapshot_manager.h`
- `tx_service/src/store/snapshot_manager.cpp`

## 2. New/updated state

### 2.1 Barrier registry in `SnapshotManager`
Add a map keyed by standby node and standby term:
- `subscription_barrier_[standby_node_id][standby_term] = barrier_ts`

### 2.2 Pending snapshot task extension
Pending value is a task struct:
- `req` (`StorageSnapshotSyncRequest`)
- `subscription_active_tx_max_ts`

## 3. New APIs in `SnapshotManager`
- `RegisterSubscriptionBarrier(standby_node_id, standby_term, barrier_ts)`
- `GetSubscriptionBarrier(standby_node_id, standby_term, uint64_t* out)`
- `EraseSubscriptionBarrier(standby_node_id, standby_term)`
- `EraseSubscriptionBarriersByNode(standby_node_id)`
- `GetCurrentCheckpointTs(node_group) -> uint64_t`
- `RunOneRoundCheckpoint(node_group, leader_term) -> bool`

Barrier/pending updates are protected by `standby_sync_mux_`.
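A minimal sketch of the barrier registry behind these APIs, with `standby_sync_mux_` modeled as a plain `std::mutex` (the real code uses the manager's own synchronization):

```cpp
#include <cstdint>
#include <map>
#include <mutex>

class BarrierRegistry
{
public:
    void RegisterSubscriptionBarrier(uint32_t node_id, uint64_t standby_term,
                                     uint64_t barrier_ts)
    {
        std::lock_guard<std::mutex> lk(mux_);
        subscription_barrier_[node_id][standby_term] = barrier_ts;
    }

    bool GetSubscriptionBarrier(uint32_t node_id, uint64_t standby_term,
                                uint64_t *out) const
    {
        std::lock_guard<std::mutex> lk(mux_);
        auto node_it = subscription_barrier_.find(node_id);
        if (node_it == subscription_barrier_.end())
        {
            return false;
        }
        auto term_it = node_it->second.find(standby_term);
        if (term_it == node_it->second.end())
        {
            return false;
        }
        *out = term_it->second;
        return true;
    }

    void EraseSubscriptionBarrier(uint32_t node_id, uint64_t standby_term)
    {
        std::lock_guard<std::mutex> lk(mux_);
        auto node_it = subscription_barrier_.find(node_id);
        if (node_it == subscription_barrier_.end())
        {
            return;
        }
        node_it->second.erase(standby_term);
        if (node_it->second.empty())
        {
            subscription_barrier_.erase(node_it);
        }
    }

    void EraseSubscriptionBarriersByNode(uint32_t node_id)
    {
        std::lock_guard<std::mutex> lk(mux_);
        subscription_barrier_.erase(node_id);
    }

private:
    mutable std::mutex mux_;
    // node_id -> (standby_term -> barrier_ts)
    std::map<uint32_t, std::map<uint64_t, uint64_t>> subscription_barrier_;
};
```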

## 4. Barrier collection in `ResetStandbySequenceId`
In `CcNodeService::ResetStandbySequenceId` on primary:
1. Move standby from candidate to subscribed on all shards.
2. Validate leader term.
3. If barrier for `(node_id, standby_term)` does not exist:
- run `ActiveTxMaxTsCc` across all shards
- compute global max
- call `SnapshotManager::RegisterSubscriptionBarrier(...)`

This aligns the sampling point with "subscription success".

## 5. `RequestStorageSnapshotSync` path changes
In `SnapshotManager::OnSnapshotSyncRequested`:
1. Parse `(standby_node_id, standby_term)` from request.
2. Query barrier by `(standby_node_id, standby_term)`.
3. If not found: reject request.
4. If found: enqueue task with barrier ts.

Dedup is still term-based per standby node.
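The reject-or-enqueue decision above can be sketched as follows; the task struct and the `std::optional` barrier lookup result are assumptions standing in for the real registry query:

```cpp
#include <cstdint>
#include <deque>
#include <optional>

struct SyncTask
{
    uint32_t node_id;
    uint64_t standby_term;
    uint64_t subscription_active_tx_max_ts;
};

// Returns false (reject) when no barrier exists for (node_id, standby_term);
// otherwise enqueues a pending task carrying the barrier ts.
bool OnSnapshotSyncRequested(uint32_t node_id, uint64_t standby_term,
                             std::optional<uint64_t> barrier,
                             std::deque<SyncTask> &pending)
{
    if (!barrier.has_value())
    {
        return false;  // missing barrier: reject (safe default)
    }
    pending.push_back({node_id, standby_term, *barrier});
    return true;
}
```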

## 6. Snapshot gating logic
`SnapshotManager::SyncWithStandby` now runs in two phases:
1. Lightweight phase:
- `current_ckpt_ts = GetCurrentCheckpointTs(node_group)`
- Select tasks that satisfy:
- term alignment
- `subscribe_id < current_subscribe_id`
- `current_ckpt_ts > subscription_active_tx_max_ts`
- If no task is eligible, return directly.
2. Heavy phase:
- Run `RunOneRoundCheckpoint(...)` (flush)
- Create/send snapshot and notify standby for selected tasks.

## 7. Worker retry pacing
`StandbySyncWorker` keeps the existing wake condition on a non-empty pending
queue and adds a short backoff wait (`200ms`) when requests remain pending
after a round, to avoid tight retry loops.
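A sketch of this pacing, using `std::condition_variable` in place of the bthread primitives the real worker uses; the class and method names are illustrative only. The worker wakes early when new work arrives, or after the backoff when tasks stayed blocked:

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

class WorkerPacer
{
public:
    void NotifyNewWork()
    {
        std::lock_guard<std::mutex> lk(mux_);
        has_new_work_ = true;
        cv_.notify_one();
    }

    // Returns true if woken by new work, false on backoff timeout.
    bool WaitWithBackoff()
    {
        std::unique_lock<std::mutex> lk(mux_);
        bool woken = cv_.wait_for(lk, std::chrono::milliseconds(200),
                                  [this] { return has_new_work_; });
        has_new_work_ = false;
        return woken;
    }

private:
    std::mutex mux_;
    std::condition_variable cv_;
    bool has_new_work_ = false;
};
```

The predicate overload of `wait_for` also guards against spurious wakeups, so a blocked pending queue never spins.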

## 8. Cleanup rules
- On successful snapshot completion for `(node_id, standby_term)`: erase barrier entry.
- On registering newer term for same node: prune older barriers and drop older
pending task.
- On node removal: `EraseSubscriptionBarriersByNode(node_id)` clears both
pending and barrier entries.
- On leader loss in sync loop: clear all pending and barriers.
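The "newer term supersedes older" rule can be sketched over a per-node term map; this assumes older terms for a node are dropped wholesale at registration time, which is my reading of the second bullet:

```cpp
#include <cstdint>
#include <map>

// standby_term -> barrier_ts for a single node
using NodeBarriers = std::map<uint64_t, uint64_t>;

void RegisterSuperseding(NodeBarriers &barriers, uint64_t standby_term,
                         uint64_t barrier_ts)
{
    // Erase every term strictly older than the one being registered.
    barriers.erase(barriers.begin(), barriers.lower_bound(standby_term));
    barriers[standby_term] = barrier_ts;
}
```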

## 9. Failure behavior
- Missing barrier on sync request: reject request (safe default).
- Checkpoint failure: keep task queued for next rounds.
- Snapshot transfer failure: task stays pending and retries in later rounds.

## 10. Standby-side rejection handling
In `CcNode::SubscribePrimaryNode`, if `ResetStandbySequenceId` is rejected by
primary, local standby following state is rolled back:
- unsubscribe per-shard standby sequence groups
- reset standby/candidate standby term if still on the failed term
- guard against clobbering newer resubscribe attempts.

## 11. Tests

### Unit tests
- barrier register/get/erase and supersession behavior
- pending dedup with barrier replacement
- gating boundaries (`current_ckpt_ts == / < / > barrier_ts`)

### Integration tests
- long-running active tx at subscription success blocks snapshot until
`current_ckpt_ts > barrier`
- multiple standbys with independent barriers
- repeated sync-request retries with same standby term
- leader term switch cleanup correctness
59 changes: 59 additions & 0 deletions tx_service/include/cc/cc_request.h
@@ -3225,6 +3225,65 @@ struct CkptTsCc : public CcRequestBase
NodeGroupId cc_ng_id_;
};

struct ActiveTxMaxTsCc : public CcRequestBase
{
public:
ActiveTxMaxTsCc(size_t shard_cnt, NodeGroupId ng_id)
: active_tx_max_ts_(0),
mux_(),
cv_(),
unfinish_cnt_(shard_cnt),
cc_ng_id_(ng_id)
{
}

ActiveTxMaxTsCc() = delete;
ActiveTxMaxTsCc(const ActiveTxMaxTsCc &) = delete;
ActiveTxMaxTsCc(ActiveTxMaxTsCc &&) = delete;

bool Execute(CcShard &ccs) override
{
uint64_t shard_active_tx_max_ts = ccs.ActiveTxMaxTs(cc_ng_id_);

uint64_t old_val = active_tx_max_ts_.load(std::memory_order_relaxed);
while (old_val < shard_active_tx_max_ts &&
!active_tx_max_ts_.compare_exchange_weak(
old_val, shard_active_tx_max_ts, std::memory_order_acq_rel))
;

std::unique_lock lk(mux_);
if (--unfinish_cnt_ == 0)
{
cv_.notify_one();
}

// return false since ActiveTxMaxTsCc is not reused and does not need
// to call CcRequestBase::Free
return false;
}

void Wait()
{
std::unique_lock lk(mux_);
while (unfinish_cnt_ > 0)
{
cv_.wait(lk);
}
}

uint64_t GetActiveTxMaxTs() const
{
return active_tx_max_ts_.load(std::memory_order_relaxed);
}

private:
std::atomic<uint64_t> active_tx_max_ts_;
bthread::Mutex mux_;
bthread::ConditionVariable cv_;
size_t unfinish_cnt_;
NodeGroupId cc_ng_id_;
};

struct ProcessRemoteScanRespCc : public CcRequestBase
{
public:
30 changes: 30 additions & 0 deletions tx_service/include/cc/cc_shard.h
@@ -664,6 +664,24 @@ class CcShard
return min_ts;
}

uint64_t ActiveTxMaxTs(NodeGroupId cc_ng_id) const
{
uint64_t max_ts = 0;
auto it = lock_holding_txs_.find(cc_ng_id);
if (it != lock_holding_txs_.end())
{
for (const auto &tx_pair : it->second)
{
if (!TableName::IsMeta(tx_pair.second->table_type_) &&
tx_pair.second->wlock_ts_ != 0)
{
max_ts = std::max(max_ts, tx_pair.second->wlock_ts_);
}
}
}
return max_ts;
}
Comment on lines +667 to +683
⚠️ Potential issue | 🔴 Critical

wlock_ts_ is not a safe snapshot barrier.

ActiveTxMaxTs() records the max first-write-lock timestamp, but SyncWithStandby() later treats ckpt_ts > barrier_ts as proof that all txns active at subscribe time are represented in the snapshot. That does not hold: an older txn can still commit after a newer txn has already forced ckpt_ts down to newer_tx.wlock_ts - 1, so the snapshot can pass the barrier and still miss the older txn's commit. The barrier needs a completion watermark, not the max lock-acquisition timestamp.


/**
* Try to reduce the size of lock array if it becomes sparse.
*
@@ -1090,6 +1108,18 @@ class CcShard
}
return node_ids;
}

std::vector<uint32_t> GetCandidateStandbys()
{
std::vector<uint32_t> node_ids;
node_ids.reserve(candidate_standby_nodes_.size());
for (const auto &it : candidate_standby_nodes_)
{
node_ids.push_back(it.first);
}
return node_ids;
}

void ResetStandbySequence();

void DecrInflightStandbyReqCount(uint32_t seq_grp);
38 changes: 18 additions & 20 deletions tx_service/include/cc/object_cc_map.h
@@ -1379,22 +1379,12 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
assert(ccp != nullptr);
bool s_obj_exist = (cce->PayloadStatus() == RecordStatus::Normal);

StandbyForwardEntry *forward_entry = nullptr;
auto subscribed_standbys = shard_->GetSubscribedStandbys();
if (!subscribed_standbys.empty())
{
forward_entry = cce->ForwardEntry();
if (forward_entry == nullptr)
{
LOG(ERROR) << "Subscribed standbys exist, but forward_entry is "
"null. Data loss may occur. Notifying standbys "
"to resubscribe.";
for (uint32_t node_id : subscribed_standbys)
{
shard_->NotifyStandbyOutOfSync(node_id);
}
}
}
bool has_subscribed_standby = !subscribed_standbys.empty();
StandbyForwardEntry *forward_entry = cce->ForwardEntry();
LOG_IF(WARNING, has_subscribed_standby && forward_entry == nullptr)
<< "Subscribed standbys exist, but forward_entry is null. "
"Data loss may occur.";
if (commit_ts > 0)
{
RecordStatus dirty_payload_status = cce->DirtyPayloadStatus();
@@ -1436,12 +1426,20 @@
}
if (forward_entry)
{
// Set commit ts and send the msg to standby node
forward_entry->Request().set_commit_ts(commit_ts);
forward_entry->Request().set_schema_version(schema_ts_);
std::unique_ptr<StandbyForwardEntry> entry_ptr =
if (has_subscribed_standby)
{
// Set commit ts and send the msg to standby node.
forward_entry->Request().set_commit_ts(commit_ts);
forward_entry->Request().set_schema_version(schema_ts_);
std::unique_ptr<StandbyForwardEntry> entry_ptr =
cce->ReleaseForwardEntry();
shard_->ForwardStandbyMessage(entry_ptr.release());
}
else
{
// No standby needs this entry anymore.
cce->ReleaseForwardEntry();
shard_->ForwardStandbyMessage(entry_ptr.release());
}
}
bool was_dirty = cce->IsDirty();
cce->SetCommitTsPayloadStatus(commit_ts, payload_status);