diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index 8f692e2c..b5926e4a 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1876,6 +1876,21 @@ class ObjectCcMap : public TemplateCcMap decoded_key.Deserialize(key_str->data(), offset, KeySchema()); const KeyT *look_key = &decoded_key; + if (Sharder::Instance().StandbyNodeTerm() >= 0 && + Sharder::Instance().GetDataStoreHandler()->IsSharedStorage() && + commit_ts < Sharder::Instance().NativeNodeGroupCkptTs()) + { + auto it = Find(*look_key); + if (it == End()) + { + // Discard the forward message since it has already been + // checkpointed. And the checkpointed data will be fetched when + // a forward message with bigger commit_ts than ckpt_ts is + // received. + return req.SetFinish(*shard_); + } + } + // first time the request is processed auto it = FindEmplace(*look_key); cce = it->second; @@ -2533,16 +2548,19 @@ class ObjectCcMap : public TemplateCcMap if (status == RecordStatus::Unknown) { // fetch record fails. - if (cce->IsFree()) + if (cce->PayloadStatus() == RecordStatus::Unknown && cce->IsFree()) { // Remove cce if it is not referenced by anyone. CleanEntry(entry, ccp); } return true; } - // It's possible that first ReplayLogCc triggers FetchRecord and the - // second ReplayLogCc has_overwrite and overrides the cce. - if (cce->PayloadStatus() == RecordStatus::Unknown) + // It's possible that first ReplayLogCc/StandbyForwardCc triggers + // FetchRecord and the second ReplayLogCc/StandbyForwardCc has_overwrite + // and overrides the cce. Overrides the cce if the BackFilled version is + // newer. + if (cce->PayloadStatus() == RecordStatus::Unknown && + cce->CommitTs() < commit_ts) { cce->SetCommitTsPayloadStatus(commit_ts, status); cce->SetCkptTs(commit_ts); diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index f2a00630..ba458bf2 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -7961,10 +7961,12 @@ class TemplateCcMap : public CcMap auto end_it = End(); uint64_t ckpt_ts = req.PrimaryCkptTs(); uint64_t now_ts = shard_->Now(); + bool update_any = false; while (it != end_it) { CcEntry *cce = it->second; + auto *ccp = it.GetPage(); if (txservice_skip_wal) { // If wal log is disabled, we need to flush all in memory cache @@ -7978,8 +7980,13 @@ class TemplateCcMap : public CcMap { bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(now_ts, status); + update_any = true; OnCommittedUpdate(cce, was_dirty); assert(!cce->HasBufferedCommandList()); + if (now_ts > ccp->last_dirty_commit_ts_) + { + ccp->last_dirty_commit_ts_ = now_ts; + } } assert(!cce->HasBufferedCommandList() || status == RecordStatus::Unknown); @@ -7998,6 +8005,10 @@ class TemplateCcMap : public CcMap it++; } + if (update_any && now_ts > last_dirty_commit_ts_) + { + last_dirty_commit_ts_ = now_ts; + } return true; } @@ -10442,7 +10453,7 @@ class TemplateCcMap : public CcMap // fetch record fails. Remove the pin on the cce. cce->GetKeyGapLockAndExtraData()->ReleasePin(); cce->RecycleKeyLock(*shard_); - if (cce->IsFree()) + if (cce->PayloadStatus() == RecordStatus::Unknown && cce->IsFree()) { CleanEntry(cce, ccp); } diff --git a/tx_service/include/sharder.h b/tx_service/include/sharder.h index 4586505d..4b3ce895 100644 --- a/tx_service/include/sharder.h +++ b/tx_service/include/sharder.h @@ -532,6 +532,8 @@ class Sharder uint64_t GetNodeGroupCkptTs(uint32_t cc_ng_id); + uint64_t NativeNodeGroupCkptTs(); + bool UpdateNodeGroupCkptTs(uint32_t cc_ng_id, uint64_t ckpt_ts); TxWorkerPool *GetTxWorkerPool() diff --git a/tx_service/src/cc/cc_entry.cpp b/tx_service/src/cc/cc_entry.cpp index 27e8c861..4610841b 100644 --- a/tx_service/src/cc/cc_entry.cpp +++ b/tx_service/src/cc/cc_entry.cpp @@ -63,9 +63,8 @@ bool VersionedLruEntry::IsPersistent() const if (Sharder::Instance().StandbyNodeTerm() >= 0 && Sharder::Instance().GetDataStoreHandler()->IsSharedStorage()) { - // If this is a follower with shared kv, all cce is treated as persisted - // since primary node will write them to kv. - return true; + // If this is a follower with shared kv, check the ng leader's ckpt_ts. + return CommitTs() <= Sharder::Instance().NativeNodeGroupCkptTs(); } if (Versioned) diff --git a/tx_service/src/sharder.cpp b/tx_service/src/sharder.cpp index b1e5df45..0c761d6c 100644 --- a/tx_service/src/sharder.cpp +++ b/tx_service/src/sharder.cpp @@ -1006,6 +1006,11 @@ uint64_t Sharder::GetNodeGroupCkptTs(uint32_t cc_ng_id) return 0; } +uint64_t Sharder::NativeNodeGroupCkptTs() +{ + return GetNodeGroupCkptTs(native_ng_); +} + bool Sharder::UpdateNodeGroupCkptTs(uint32_t cc_ng_id, uint64_t ckpt_ts) { std::shared_lock lk(cluster_cnf_mux_);