Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions tx_service/include/cc/object_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,21 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
decoded_key.Deserialize(key_str->data(), offset, KeySchema());
const KeyT *look_key = &decoded_key;

if (Sharder::Instance().StandbyNodeTerm() >= 0 &&
Sharder::Instance().GetDataStoreHandler()->IsSharedStorage() &&
commit_ts < Sharder::Instance().NativeNodeGroupCkptTs())
{
auto it = Find(*look_key);
if (it == End())
{
// Discard the forward message since it has already been
// checkpointed. And the checkpointed data will be fetched when
// a forward message with bigger commit_ts than ckpt_ts is
// received.
return req.SetFinish(*shard_);
}
}

// first time the request is processed
auto it = FindEmplace(*look_key);
cce = it->second;
Expand Down Expand Up @@ -2533,16 +2548,19 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
if (status == RecordStatus::Unknown)
{
// fetch record fails.
if (cce->IsFree())
if (cce->PayloadStatus() == RecordStatus::Unknown && cce->IsFree())
{
// Remove cce if it is not referenced by anyone.
CleanEntry(entry, ccp);
}
return true;
}
// It's possible that first ReplayLogCc triggers FetchRecord and the
// second ReplayLogCc has_overwrite and overrides the cce.
if (cce->PayloadStatus() == RecordStatus::Unknown)
// It's possible that first ReplayLogCc/StandbyForwardCc triggers
// FetchRecord and the second ReplayLogCc/StandbyForwardCc has_overwrite
// and overrides the cce. Overrides the cce if the BackFilled version is
// newer.
if (cce->PayloadStatus() == RecordStatus::Unknown &&
cce->CommitTs() < commit_ts)
{
cce->SetCommitTsPayloadStatus(commit_ts, status);
cce->SetCkptTs(commit_ts);
Expand Down
13 changes: 12 additions & 1 deletion tx_service/include/cc/template_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -7961,10 +7961,12 @@ class TemplateCcMap : public CcMap
auto end_it = End();
uint64_t ckpt_ts = req.PrimaryCkptTs();
uint64_t now_ts = shard_->Now();
bool update_any = false;
while (it != end_it)
{
CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned> *cce =
it->second;
auto *ccp = it.GetPage();
if (txservice_skip_wal)
{
// If wal log is disabled, we need to flush all in memory cache
Expand All @@ -7978,8 +7980,13 @@ class TemplateCcMap : public CcMap
{
bool was_dirty = cce->IsDirty();
cce->SetCommitTsPayloadStatus(now_ts, status);
update_any = true;
OnCommittedUpdate(cce, was_dirty);
assert(!cce->HasBufferedCommandList());
if (now_ts > ccp->last_dirty_commit_ts_)
{
ccp->last_dirty_commit_ts_ = now_ts;
}
}
assert(!cce->HasBufferedCommandList() ||
status == RecordStatus::Unknown);
Expand All @@ -7998,6 +8005,10 @@ class TemplateCcMap : public CcMap
it++;
}

if (update_any && now_ts > last_dirty_commit_ts_)
{
last_dirty_commit_ts_ = now_ts;
}
return true;
}

Expand Down Expand Up @@ -10442,7 +10453,7 @@ class TemplateCcMap : public CcMap
// fetch record fails. Remove the pin on the cce.
cce->GetKeyGapLockAndExtraData()->ReleasePin();
cce->RecycleKeyLock(*shard_);
if (cce->IsFree())
if (cce->PayloadStatus() == RecordStatus::Unknown && cce->IsFree())
{
CleanEntry(cce, ccp);
}
Expand Down
2 changes: 2 additions & 0 deletions tx_service/include/sharder.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ class Sharder

uint64_t GetNodeGroupCkptTs(uint32_t cc_ng_id);

uint64_t NativeNodeGroupCkptTs();

Comment on lines 533 to +536
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find where native_cc_node_ is declared, set, and used
rg -n "native_cc_node_" --type cpp --type h -C 2

Repository: eloqdata/tx_service

Length of output: 1009


🏁 Script executed:

# Find all call sites of NativeNodeGroupCkptTs()
rg -n "NativeNodeGroupCkptTs\s*\(" --type cpp --type h -B 2 -A 2

Repository: eloqdata/tx_service

Length of output: 1394


🏁 Script executed:

# Locate the implementation of NativeNodeGroupCkptTs in the .cpp file
fd -e cpp | xargs grep -l "NativeNodeGroupCkptTs" | head -1 | xargs cat | head -100

Repository: eloqdata/tx_service

Length of output: 3332


Make NativeNodeGroupCkptTs() const and guard against uninitialized native_cc_node_.

The assert() at line 1015 won't protect against null deref in release builds. In production, calling NativeNodeGroupCkptTs() before Init() or when the native node group isn't configured will crash. Additionally, this accessor should be const since it has no side effects and is called from const contexts like IsPersistent().

♻️ Suggested header tweak
-    uint64_t NativeNodeGroupCkptTs();
+    uint64_t NativeNodeGroupCkptTs() const;

Consider either: (1) adding explicit initialization checks with a fallback value, (2) documenting that this method must only be called after Init() completes and verifying that precondition at all call sites, or (3) using a guard pattern to safely access the underlying checkpoint timestamp.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
uint64_t GetNodeGroupCkptTs(uint32_t cc_ng_id);
uint64_t NativeNodeGroupCkptTs();
uint64_t GetNodeGroupCkptTs(uint32_t cc_ng_id);
uint64_t NativeNodeGroupCkptTs() const;
🤖 Prompt for AI Agents
In `@tx_service/include/sharder.h` around lines 533 - 536, Change the
NativeNodeGroupCkptTs accessor to be a const method and make it null-safe:
update its declaration to uint64_t NativeNodeGroupCkptTs() const and in the
implementation check whether native_cc_node_ is initialized before
dereferencing; if native_cc_node_ is null return a safe fallback (e.g., 0) and
optionally emit a diagnostic/log in debug builds (retain the assert only for
debug). Ensure callers like IsPersistent() continue to work with the new const
signature.

bool UpdateNodeGroupCkptTs(uint32_t cc_ng_id, uint64_t ckpt_ts);

TxWorkerPool *GetTxWorkerPool()
Expand Down
5 changes: 2 additions & 3 deletions tx_service/src/cc/cc_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ bool VersionedLruEntry<Versioned, RangePartitioned>::IsPersistent() const
if (Sharder::Instance().StandbyNodeTerm() >= 0 &&
Sharder::Instance().GetDataStoreHandler()->IsSharedStorage())
{
// If this is a follower with shared kv, all cce is treated as persisted
// since primary node will write them to kv.
return true;
// If this is a follower with shared kv, check the ng leader's ckpt_ts.
return CommitTs() <= Sharder::Instance().NativeNodeGroupCkptTs();
}

if (Versioned)
Expand Down
5 changes: 5 additions & 0 deletions tx_service/src/sharder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,11 @@ uint64_t Sharder::GetNodeGroupCkptTs(uint32_t cc_ng_id)
return 0;
}

uint64_t Sharder::NativeNodeGroupCkptTs()
{
return GetNodeGroupCkptTs(native_ng_);
}

bool Sharder::UpdateNodeGroupCkptTs(uint32_t cc_ng_id, uint64_t ckpt_ts)
{
std::shared_lock<std::shared_mutex> lk(cluster_cnf_mux_);
Expand Down