From b9936bcea0067d4f540a12f2facedd214ad87765 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 29 Jan 2026 18:23:32 +0800 Subject: [PATCH 1/8] * fix BackFill; * fix standby node ccentry eviction; * discard forward msg with commit_ts smaller than leader's ckpt_ts --- tx_service/include/cc/object_cc_map.h | 9 ++++++--- tx_service/include/cc/template_cc_map.h | 3 ++- tx_service/include/sharder.h | 3 +++ tx_service/src/cc/cc_entry.cpp | 5 ++--- tx_service/src/sharder.cpp | 12 +++++++++++- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index 8f692e2c..f34a8d2d 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1887,9 +1887,11 @@ class ObjectCcMap : public TemplateCcMap assert(cce); ccp = it.GetPage(); - if (commit_ts <= cce->CommitTs()) + if (commit_ts <= cce->CommitTs() || + commit_ts <= Sharder::Instance().NativeNodeGroupCkptTs()) { - // Discard message since cce has a newer version. + // Discard message since cce has a newer version or has been + // checkpointed by leader node. return req.SetFinish(*shard_); } else @@ -2530,7 +2532,8 @@ class ObjectCcMap : public TemplateCcMap cce->GetKeyGapLockAndExtraData()->ReleasePin(); cce->RecycleKeyLock(*shard_); - if (status == RecordStatus::Unknown) + if (status == RecordStatus::Unknown && + cce->PayloadStatus() == RecordStatus::Unknown) { // fetch record fails. if (cce->IsFree()) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index f2a00630..0381990c 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -10437,7 +10437,8 @@ class TemplateCcMap : public CcMap CcPage *>( cce->GetCcPage()); assert(ccp != nullptr); - if (status == RecordStatus::Unknown) + if (status == RecordStatus::Unknown && + cce->PayloadStatus() == RecordStatus::Unknown) { // fetch record fails. Remove the pin on the cce. cce->GetKeyGapLockAndExtraData()->ReleasePin(); diff --git a/tx_service/include/sharder.h b/tx_service/include/sharder.h index 4586505d..47537464 100644 --- a/tx_service/include/sharder.h +++ b/tx_service/include/sharder.h @@ -532,6 +532,8 @@ class Sharder uint64_t GetNodeGroupCkptTs(uint32_t cc_ng_id); + uint64_t NativeNodeGroupCkptTs(); + bool UpdateNodeGroupCkptTs(uint32_t cc_ng_id, uint64_t ckpt_ts); TxWorkerPool *GetTxWorkerPool() @@ -784,6 +786,7 @@ class Sharder private: uint32_t node_id_; uint32_t native_ng_; + fault::CcNode *native_cc_node_{}; std::string host_name_; uint16_t port_; // Whether is candidate of native node group. diff --git a/tx_service/src/cc/cc_entry.cpp b/tx_service/src/cc/cc_entry.cpp index 27e8c861..4610841b 100644 --- a/tx_service/src/cc/cc_entry.cpp +++ b/tx_service/src/cc/cc_entry.cpp @@ -63,9 +63,8 @@ bool VersionedLruEntry::IsPersistent() const if (Sharder::Instance().StandbyNodeTerm() >= 0 && Sharder::Instance().GetDataStoreHandler()->IsSharedStorage()) { - // If this is a follower with shared kv, all cce is treated as persisted - // since primary node will write them to kv. - return true; + // If this is a follower with shared kv, check the ng leader's ckpt_ts. + return CommitTs() <= Sharder::Instance().NativeNodeGroupCkptTs(); } if (Versioned) diff --git a/tx_service/src/sharder.cpp b/tx_service/src/sharder.cpp index b1e5df45..2ba05378 100644 --- a/tx_service/src/sharder.cpp +++ b/tx_service/src/sharder.cpp @@ -254,7 +254,7 @@ int Sharder::Init( if (cluster_config_.ng_configs_.at(ng_id).at(idx).node_id_ == node_id_) { - cluster_config_.cc_nodes_.try_emplace( + auto [it, _] = cluster_config_.cc_nodes_.try_emplace( ng_id, std::make_shared( ng_id, @@ -262,6 +262,10 @@ int Sharder::Init( *local_shards_, log_agent_ != nullptr ? log_agent_->LogGroupCount() : 0)); + if (ng_id == native_ng_) + { + native_cc_node_ = it->second.get(); + } } } } @@ -1006,6 +1010,12 @@ uint64_t Sharder::GetNodeGroupCkptTs(uint32_t cc_ng_id) return 0; } +uint64_t Sharder::NativeNodeGroupCkptTs() +{ + assert(native_cc_node_ != nullptr); + return native_cc_node_->GetCkptTs(); +} + bool Sharder::UpdateNodeGroupCkptTs(uint32_t cc_ng_id, uint64_t ckpt_ts) { std::shared_lock lk(cluster_cnf_mux_); From bd1f315fe8b93cfa50d9c6a0087bcf1934d70a3f Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 29 Jan 2026 18:53:43 +0800 Subject: [PATCH 2/8] fix --- tx_service/include/cc/object_cc_map.h | 5 ++--- tx_service/include/cc/template_cc_map.h | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index f34a8d2d..45cb9f39 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -2532,11 +2532,10 @@ class ObjectCcMap : public TemplateCcMap cce->GetKeyGapLockAndExtraData()->ReleasePin(); cce->RecycleKeyLock(*shard_); - if (status == RecordStatus::Unknown && - cce->PayloadStatus() == RecordStatus::Unknown) + if (status == RecordStatus::Unknown) { // fetch record fails. - if (cce->IsFree()) + if (cce->PayloadStatus() == RecordStatus::Unknown && cce->IsFree()) { // Remove cce if it is not referenced by anyone. CleanEntry(entry, ccp); diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 0381990c..0630c025 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -10437,13 +10437,12 @@ class TemplateCcMap : public CcMap CcPage *>( cce->GetCcPage()); assert(ccp != nullptr); - if (status == RecordStatus::Unknown && - cce->PayloadStatus() == RecordStatus::Unknown) + if (status == RecordStatus::Unknown) { // fetch record fails. Remove the pin on the cce. cce->GetKeyGapLockAndExtraData()->ReleasePin(); cce->RecycleKeyLock(*shard_); - if (cce->IsFree()) + if (cce->PayloadStatus() == RecordStatus::Unknown && cce->IsFree()) { CleanEntry(cce, ccp); } From e5491a529a89e34a11e9d3dad06bb5131dd0ae91 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 29 Jan 2026 19:02:49 +0800 Subject: [PATCH 3/8] fix --- tx_service/include/cc/object_cc_map.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index 45cb9f39..65945291 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1887,8 +1887,7 @@ class ObjectCcMap : public TemplateCcMap assert(cce); ccp = it.GetPage(); - if (commit_ts <= cce->CommitTs() || - commit_ts <= Sharder::Instance().NativeNodeGroupCkptTs()) + if (commit_ts <= cce->CommitTs()) { // Discard message since cce has a newer version or has been // checkpointed by leader node. From a1bea624e57fdaad04e0fad5bc267701d33f301a Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Thu, 29 Jan 2026 19:32:58 +0800 Subject: [PATCH 4/8] discard the forward msg if the cce does not exist and the msg has been checkpointed; override the cce if a newer version is Backfilled --- tx_service/include/cc/object_cc_map.h | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index 65945291..b5926e4a 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1876,6 +1876,21 @@ class ObjectCcMap : public TemplateCcMap decoded_key.Deserialize(key_str->data(), offset, KeySchema()); const KeyT *look_key = &decoded_key; + if (Sharder::Instance().StandbyNodeTerm() >= 0 && + Sharder::Instance().GetDataStoreHandler()->IsSharedStorage() && + commit_ts < Sharder::Instance().NativeNodeGroupCkptTs()) + { + auto it = Find(*look_key); + if (it == End()) + { + // Discard the forward message since it has already been + // checkpointed. And the checkpointed data will be fetched when + // a forward message with bigger commit_ts than ckpt_ts is + // received. + return req.SetFinish(*shard_); + } + } + // first time the request is processed auto it = FindEmplace(*look_key); cce = it->second; @@ -1889,8 +1904,7 @@ class ObjectCcMap : public TemplateCcMap if (commit_ts <= cce->CommitTs()) { - // Discard message since cce has a newer version or has been - // checkpointed by leader node. + // Discard message since cce has a newer version. return req.SetFinish(*shard_); } else @@ -2541,9 +2555,12 @@ class ObjectCcMap : public TemplateCcMap } return true; } - // It's possible that first ReplayLogCc triggers FetchRecord and the - // second ReplayLogCc has_overwrite and overrides the cce. - if (cce->PayloadStatus() == RecordStatus::Unknown) + // It's possible that first ReplayLogCc/StandbyForwardCc triggers + // FetchRecord and the second ReplayLogCc/StandbyForwardCc has_overwrite + // and overrides the cce. Overrides the cce if the BackFilled version is + // newer. + if (cce->PayloadStatus() == RecordStatus::Unknown && + cce->CommitTs() < commit_ts) { cce->SetCommitTsPayloadStatus(commit_ts, status); cce->SetCkptTs(commit_ts); From f5b67651d8a5e43f9698ffc29bd87f80bef5bd96 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 2 Feb 2026 15:49:47 +0800 Subject: [PATCH 5/8] fix Sharder::NativeNodeGroupCkptTs --- tx_service/include/sharder.h | 1 - tx_service/src/sharder.cpp | 9 ++------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/tx_service/include/sharder.h b/tx_service/include/sharder.h index 47537464..4b3ce895 100644 --- a/tx_service/include/sharder.h +++ b/tx_service/include/sharder.h @@ -786,7 +786,6 @@ class Sharder private: uint32_t node_id_; uint32_t native_ng_; - fault::CcNode *native_cc_node_{}; std::string host_name_; uint16_t port_; // Whether is candidate of native node group. diff --git a/tx_service/src/sharder.cpp b/tx_service/src/sharder.cpp index 2ba05378..0c761d6c 100644 --- a/tx_service/src/sharder.cpp +++ b/tx_service/src/sharder.cpp @@ -254,7 +254,7 @@ int Sharder::Init( if (cluster_config_.ng_configs_.at(ng_id).at(idx).node_id_ == node_id_) { - auto [it, _] = cluster_config_.cc_nodes_.try_emplace( + cluster_config_.cc_nodes_.try_emplace( ng_id, std::make_shared( ng_id, @@ -262,10 +262,6 @@ int Sharder::Init( *local_shards_, log_agent_ != nullptr ? log_agent_->LogGroupCount() : 0)); - if (ng_id == native_ng_) - { - native_cc_node_ = it->second.get(); - } } } } @@ -1012,8 +1008,7 @@ uint64_t Sharder::GetNodeGroupCkptTs(uint32_t cc_ng_id) uint64_t Sharder::NativeNodeGroupCkptTs() { - assert(native_cc_node_ != nullptr); - return native_cc_node_->GetCkptTs(); + return GetNodeGroupCkptTs(native_ng_); } bool Sharder::UpdateNodeGroupCkptTs(uint32_t cc_ng_id, uint64_t ckpt_ts) From 57059013a25f46802e2630ccf302c813a1a3d470 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 2 Feb 2026 18:22:22 +0800 Subject: [PATCH 6/8] EscalateStandbyCcmCc updates ccmap and ccpage's dirty ts --- tx_service/include/cc/template_cc_map.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 0630c025..595653fd 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -7965,6 +7965,7 @@ class TemplateCcMap : public CcMap { CcEntry *cce = it->second; + auto *ccp = it.current_page_; if (txservice_skip_wal) { // If wal log is disabled, we need to flush all in memory cache @@ -7980,6 +7981,10 @@ class TemplateCcMap : public CcMap cce->SetCommitTsPayloadStatus(now_ts, status); OnCommittedUpdate(cce, was_dirty); assert(!cce->HasBufferedCommandList()); + if (now_ts > ccp->last_dirty_commit_ts_) + { + ccp->last_dirty_commit_ts_ = now_ts; + } } assert(!cce->HasBufferedCommandList() || status == RecordStatus::Unknown); @@ -7998,6 +8003,10 @@ class TemplateCcMap : public CcMap it++; } + if (now_ts > last_dirty_commit_ts_) + { + last_dirty_commit_ts_ = now_ts; + } return true; } From 6d3e470f82279e583de5cca61ba143126c20b045 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 2 Feb 2026 19:02:56 +0800 Subject: [PATCH 7/8] fix --- tx_service/include/cc/template_cc_map.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 595653fd..d1f9e269 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -7965,7 +7965,7 @@ class TemplateCcMap : public CcMap { CcEntry *cce = it->second; - auto *ccp = it.current_page_; + auto *ccp = it.GetPage(); if (txservice_skip_wal) { // If wal log is disabled, we need to flush all in memory cache From d0ee088e9b2b5e1a82b16e4d7708c9b66f24de18 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 4 Feb 2026 16:10:43 +0800 Subject: [PATCH 8/8] fix update ccm last_dirty_commit_ts_ --- tx_service/include/cc/template_cc_map.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index d1f9e269..ba458bf2 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -7961,6 +7961,7 @@ class TemplateCcMap : public CcMap auto end_it = End(); uint64_t ckpt_ts = req.PrimaryCkptTs(); uint64_t now_ts = shard_->Now(); + bool update_any = false; while (it != end_it) { CcEntry *cce = @@ -7979,6 +7980,7 @@ class TemplateCcMap : public CcMap { bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(now_ts, status); + update_any = true; OnCommittedUpdate(cce, was_dirty); assert(!cce->HasBufferedCommandList()); if (now_ts > ccp->last_dirty_commit_ts_) @@ -8003,7 +8005,7 @@ class TemplateCcMap : public CcMap it++; } - if (now_ts > last_dirty_commit_ts_) + if (update_any && now_ts > last_dirty_commit_ts_) { last_dirty_commit_ts_ = now_ts; }