Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions tx_service/include/cc/cc_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,17 @@ struct VersionedLruEntry : public LruEntry

bool IsPersistent() const;

/**
* @brief Check if the entry is dirty (not yet checkpointed).
* This function only relies on CommitTs and CkptTs or
* commit_ts_and_status_, and does not consider standby node status. Use
* this strictly for dirty size tracking.
*
* NOTE(review): per the review discussion on this change, the implementation
* reportedly treats initial entries (CommitTs() <= 1) as not dirty even when
* the flush bit is not set. The definition is not visible from this header;
* confirm against it before relying on this in metrics or tests.
*
* @return true if the entry is dirty (CommitTs > CkptTs for versioned,
* or flush bit not set for non-versioned), false otherwise. Presumably also
* false whenever CommitTs() <= 1 (see the note above; to be confirmed).
*/
bool IsDirty() const;
Comment on lines +632 to +641
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Document the CommitTs() <= 1 exclusion.

The implementation returns false for initial entries even when the flush bit isn’t set, but the comment doesn’t mention this. Please update the doc to avoid misinterpretation in metrics/tests.

✏️ Doc-only fix
-     * `@return` true if the entry is dirty (CommitTs > CkptTs for versioned,
-     *         or flush bit not set for non-versioned), false otherwise.
+     * `@return` true if the entry is dirty (CommitTs > 1 and CommitTs > CkptTs for
+     *         versioned, or CommitTs > 1 and flush bit not set for non-versioned),
+     *         false otherwise.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/**
* @brief Check if the entry is dirty (not yet checkpointed).
* This function only relies on CommitTs and CkptTs or commit_ts_and_status_,
* and does not consider standby node status. Use this strictly for dirty
* size tracking.
*
* @return true if the entry is dirty (CommitTs > CkptTs for versioned,
* or flush bit not set for non-versioned), false otherwise.
*/
bool IsDirty() const;
/**
* `@brief` Check if the entry is dirty (not yet checkpointed).
* This function only relies on CommitTs and CkptTs or commit_ts_and_status_,
* and does not consider standby node status. Use this strictly for dirty
* size tracking.
*
* `@return` true if the entry is dirty (CommitTs > 1 and CommitTs > CkptTs for
* versioned, or CommitTs > 1 and flush bit not set for non-versioned),
* false otherwise.
*/
bool IsDirty() const;
🤖 Prompt for AI Agents
In `@tx_service/include/cc/cc_entry.h` around lines 632 - 641, Update the
IsDirty() doc to explicitly state that entries with CommitTs() <= 1 are treated
as non-dirty (i.e., IsDirty() returns false) even if the non-versioned flush bit
isn’t set; mention that this special-case mirrors the code path that checks
CommitTs(), CkptTs(), and commit_ts_and_status_ for initial entries and is
intended for dirty size tracking and metrics/tests to avoid misinterpretation.


RecordStatus PayloadStatus() const;

void SetCommitTsPayloadStatus(uint64_t ts, RecordStatus status);
Expand Down
7 changes: 6 additions & 1 deletion tx_service/include/cc/cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,18 @@ class CcMap
virtual bool Execute(ScanSliceDeltaSizeCcForRangePartition &req) = 0;
virtual bool Execute(ScanDeltaSizeCcForHashPartition &req) = 0;

virtual size_t size() const = 0;
// Default implementation of the normal-object counter query.
// It asserts unconditionally, so reaching it in a build with assertions
// enabled aborts; when assertions are compiled out (NDEBUG) it returns 0.
// NOTE(review): presumably only map types that track normal objects are
// expected to override this and be queried - confirm against the overrides.
virtual size_t NormalObjectSize()
{
assert(false);
return 0;
}

// Notify ccmap when an entry's ckpt ts is updated and persistence may have
// changed.
// The base implementation is intentionally a no-op; subclasses that need to
// react to a flush override it.
// @param was_dirty presumably whether the entry was dirty before its ckpt
// ts was updated - TODO confirm with callers.
// @param is_persistent presumably whether the entry is persistent after
// the update - TODO confirm with callers.
virtual void OnEntryFlushed(bool was_dirty, bool is_persistent)
{
}

virtual std::pair<size_t, LruPage *> CleanPageAndReBalance(
LruPage *page,
KickoutCcEntryCc *kickout_cc = nullptr,
Expand Down
11 changes: 11 additions & 0 deletions tx_service/include/cc/cc_page_clean_guard.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ struct CcPageCleanGuard
return free_cnt_;
}

// Number of dirty keys freed.
// Returns the current value of dirty_freed_cnt_, which the clean path of
// this guard increments once for every entry whose IsDirty() is true at the
// time it is marked for cleaning.
size_t DirtyFreedCount() const
{
return dirty_freed_cnt_;
}

// If any valid key is evicted. This is used to update if ccm is still fully
// cached. Note that this does not include
// 1. Deleted/expired keys. Freeing deleted keys does not affect cache
Expand Down Expand Up @@ -317,6 +323,10 @@ struct CcPageCleanGuard
evicted_valid_key_ = true;
}
}
if (cce->IsDirty())
{
++dirty_freed_cnt_;
}

cce->ClearLocks(*cc_shard_, cc_ng_id);
clean_set_.set(idx, true);
Expand All @@ -330,6 +340,7 @@ struct CcPageCleanGuard
CcPage<KeyT, ValueT, VersionedRecord, RangePartitioned> *page_{nullptr};
uint64_t last_commit_ts_{0};
uint64_t free_cnt_{0};
uint64_t dirty_freed_cnt_{0};
bool evicted_valid_key_{false};
uint64_t clean_obj_cnt_{0};

Expand Down
11 changes: 4 additions & 7 deletions tx_service/include/cc/cc_req_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -970,14 +970,12 @@ struct UpdateCceCkptTsCc : public CcRequestBase
UpdateCceCkptTsCc(
NodeGroupId node_group_id,
int64_t term,
absl::flat_hash_map<size_t, std::vector<CkptTsEntry>> &cce_entries,
bool range_partitioned,
bool versioned_payload)
const TableName &table_name,
absl::flat_hash_map<size_t, std::vector<CkptTsEntry>> &cce_entries)
: cce_entries_(cce_entries),
node_group_id_(node_group_id),
term_(term),
range_partitioned_(range_partitioned),
versioned_payload_(versioned_payload)
table_name_(table_name)
{
unfinished_core_cnt_ = cce_entries_.size();
assert(unfinished_core_cnt_ > 0);
Expand Down Expand Up @@ -1026,8 +1024,7 @@ struct UpdateCceCkptTsCc : public CcRequestBase
size_t unfinished_core_cnt_;
NodeGroupId node_group_id_;
int64_t term_;
bool range_partitioned_{false};
bool versioned_payload_{false};
TableName table_name_;
bthread::Mutex mux_;
bthread::ConditionVariable cv_;
};
Expand Down
18 changes: 17 additions & 1 deletion tx_service/include/cc/cc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -3049,6 +3049,8 @@ struct CkptTsCc : public CcRequestBase
memory_committed_vec_.emplace_back(0);
heap_full_vec_.emplace_back(false);
standby_msg_seq_id_vec_.emplace_back(0);
total_key_cnt_vec_.emplace_back(0);
dirty_key_cnt_vec_.emplace_back(0);
}
}

Expand Down Expand Up @@ -3088,6 +3090,9 @@ struct CkptTsCc : public CcRequestBase
memory_allocated_vec_[ccs.LocalCoreId()] = allocated;
memory_committed_vec_[ccs.LocalCoreId()] = committed;
heap_full_vec_[ccs.LocalCoreId()] = full;
auto [total_keys, dirty_keys] = ccs.GetDataKeyStats();
total_key_cnt_vec_[ccs.LocalCoreId()] = total_keys;
dirty_key_cnt_vec_[ccs.LocalCoreId()] = dirty_keys;

std::unique_lock lk(mux_);
if (--unfinish_cnt_ == 0)
Expand Down Expand Up @@ -3149,13 +3154,22 @@ struct CkptTsCc : public CcRequestBase
uint64_t &allocated = memory_allocated_vec_[core_id];
uint64_t &committed = memory_committed_vec_[core_id];
bool heap_full = heap_full_vec_[core_id];
size_t total_keys = total_key_cnt_vec_[core_id];
size_t dirty_keys = dirty_key_cnt_vec_[core_id];
double dirty_ratio = total_keys == 0
? 0.0
: static_cast<double>(dirty_keys) /
static_cast<double>(total_keys);

LOG(INFO) << "ccs " << core_id << " memory usage report, committed "
<< committed << ", allocated " << allocated
<< ", frag ratio " << std::setprecision(2)
<< 100 * (static_cast<float>(committed - allocated) /
committed)
<< " , heap full: " << heap_full;
<< " , heap full: " << heap_full
<< ", dirty key ratio: " << std::setprecision(4)
<< dirty_ratio << " (" << dirty_keys << "/" << total_keys
<< ")";
}
}

Expand Down Expand Up @@ -3203,6 +3217,8 @@ struct CkptTsCc : public CcRequestBase
size_t unfinish_cnt_;
std::vector<uint64_t> memory_allocated_vec_;
std::vector<uint64_t> memory_committed_vec_;
std::vector<size_t> total_key_cnt_vec_;
std::vector<size_t> dirty_key_cnt_vec_;
std::vector<uint64_t> standby_msg_seq_id_vec_;
std::vector<uint32_t> subscribed_node_ids_;
std::vector<bool> heap_full_vec_;
Expand Down
11 changes: 11 additions & 0 deletions tx_service/include/cc/cc_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ class CcShard
*/
CcMap *GetCcm(const TableName &table_name, uint32_t node_group);

// Adjusts this shard's data-table key statistics.
// NOTE(review): the definition is not visible here. Presumably size_delta
// and dirty_delta are signed changes applied to data_key_count_ and
// dirty_data_key_count_ respectively, and table_name is used to skip
// non-data (meta) tables - confirm against the implementation.
// @param table_name table whose ccmap the change originates from.
// @param size_delta signed change in the number of keys.
// @param dirty_delta signed change in the number of dirty keys.
void AdjustDataKeyStats(const TableName &table_name,
int64_t size_delta,
int64_t dirty_delta);

// Returns this shard's key statistics as a pair. The checkpoint-ts request
// unpacks the result as (total key count, dirty key count), in that order.
// NOTE(review): presumably these are data_key_count_ and
// dirty_data_key_count_ (data tables only) - confirm against the definition.
std::pair<size_t, size_t> GetDataKeyStats() const;

void InitializeShardHeap()
{
if (shard_heap_thread_id_ == 0)
Expand Down Expand Up @@ -1281,6 +1287,11 @@ class CcShard
// The number of ccentry in all the ccmap of this ccshard.
uint64_t size_;

// The number of keys in data tables only (meta tables excluded).
size_t data_key_count_{0};
// The number of committed dirty keys in data tables only.
size_t dirty_data_key_count_{0};

Checkpointer *ckpter_;

/**
Expand Down
76 changes: 39 additions & 37 deletions tx_service/include/cc/local_cc_shards.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,43 +352,45 @@ class LocalCcShards

void PrintCcMap()
{
std::unordered_map<TableName, size_t>
mapsizes; // not string owner, sv -> native_ccms_
for (const auto &cc_shard : cc_shards_)
{
CcShard &shard = *cc_shard;

for (auto map_iter = shard.native_ccms_.begin();
map_iter != shard.native_ccms_.end();
++map_iter)
{
const TableName &tab_name = map_iter->first;
auto find_iter = mapsizes.find(tab_name);
size_t cnt = 0;
if (find_iter != mapsizes.end())
{
cnt = find_iter->second;
cnt += map_iter->second->size();
find_iter->second = cnt;
}
else
{
mapsizes.emplace(
std::piecewise_construct,
std::forward_as_tuple(tab_name.StringView(),
tab_name.Type(),
tab_name.Engine()),
std::forward_as_tuple(map_iter->second->size()));
}

std::cout << "Table '" << tab_name.StringView() << "' core ID "
<< shard.core_id_ << ": " << map_iter->second->size()
<< std::endl;

assert(map_iter->second->VerifyOrdering() ==
map_iter->second->size());
}
}
// std::unordered_map<TableName, size_t>
// mapsizes; // not string owner, sv -> native_ccms_
// for (const auto &cc_shard : cc_shards_)
// {
// CcShard &shard = *cc_shard;

// for (auto map_iter = shard.native_ccms_.begin();
// map_iter != shard.native_ccms_.end();
// ++map_iter)
// {
// const TableName &tab_name = map_iter->first;
// auto find_iter = mapsizes.find(tab_name);
// size_t cnt = 0;
// if (find_iter != mapsizes.end())
// {
// cnt = find_iter->second;
// cnt += map_iter->second->size();
// find_iter->second = cnt;
// }
// else
// {
// mapsizes.emplace(
// std::piecewise_construct,
// std::forward_as_tuple(tab_name.StringView(),
// tab_name.Type(),
// tab_name.Engine()),
// std::forward_as_tuple(map_iter->second->size()));
// }

// std::cout << "Table '" << tab_name.StringView() << "' core ID
// "
// << shard.core_id_ << ": " <<
// map_iter->second->size()
// << std::endl;

// assert(map_iter->second->VerifyOrdering() ==
// map_iter->second->size());
// }
// }

/*for (auto map_iter = mapsizes.begin(); map_iter != mapsizes.end();
++map_iter)
Expand Down
22 changes: 22 additions & 0 deletions tx_service/include/cc/object_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -959,8 +959,10 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
cce->ReleaseForwardEntry();
shard_->ForwardStandbyMessage(entry_ptr.release());
}
bool was_dirty = cce->IsDirty();
cce->SetCommitTsPayloadStatus(commit_ts,
RecordStatus::Deleted);
this->OnCommittedUpdate(cce, was_dirty);
// Release and try to recycle the lock.
assert(acquired_lock != LockType::NoLock);
ReleaseCceLock(
Expand Down Expand Up @@ -1214,7 +1216,9 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
// PostWriteCc if apply_and_commit_.
const uint64_t commit_ts =
std::max({cce->CommitTs() + 1, req.TxTs(), shard_->Now()});
bool was_dirty = cce->IsDirty();
cce->SetCommitTsPayloadStatus(commit_ts, status);
this->OnCommittedUpdate(cce, was_dirty);

if (forward_entry)
{
Expand Down Expand Up @@ -1426,7 +1430,9 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
cce->ReleaseForwardEntry();
shard_->ForwardStandbyMessage(entry_ptr.release());
}
bool was_dirty = cce->IsDirty();
cce->SetCommitTsPayloadStatus(commit_ts, payload_status);
this->OnCommittedUpdate(cce, was_dirty);
// It's possible that the cce HasBufferedCommandList and is still in
// unknown status (because FetchRecord fails) and this command
// ignores kv value. Need to clear the buffered commands when a new
Expand Down Expand Up @@ -1647,6 +1653,7 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
ttl = 0;
}

bool was_dirty = cce->IsDirty();
cce->SetCommitTsPayloadStatus(commit_ts, rec_status);
if (req.Kind() == UploadBatchType::DirtyBucketData)
{
Expand Down Expand Up @@ -1682,6 +1689,10 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
{
cce->SetCommitTsPayloadStatus(commit_ts, RecordStatus::Deleted);
}
// Since we have updated both ckpt ts and commit ts, we need to call
// OnFlushed to update the dirty size.
this->OnFlushed(cce, was_dirty);
this->OnCommittedUpdate(cce, was_dirty);
Comment on lines +1692 to +1695
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

OnFlushed called unconditionally, but SetCkptTs is conditional.

The comment states "we have updated both ckpt ts and commit ts," but SetCkptTs at line 1660 is only called when req.Kind() == UploadBatchType::DirtyBucketData. Calling OnFlushed when the checkpoint timestamp wasn't updated will cause incorrect dirty-size accounting.

🐛 Proposed fix
-            // Since we have updated both ckpt ts and commit ts, we need to call
-            // OnFlushed to update the dirty size.
-            this->OnFlushed(cce, was_dirty);
+            if (req.Kind() == UploadBatchType::DirtyBucketData)
+            {
+                // Since we have updated both ckpt ts and commit ts, we need to call
+                // OnFlushed to update the dirty size.
+                this->OnFlushed(cce, was_dirty);
+            }
             this->OnCommittedUpdate(cce, was_dirty);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Since we have updated both ckpt ts and commit ts, we need to call
// OnFlushed to update the dirty size.
this->OnFlushed(cce, was_dirty);
this->OnCommittedUpdate(cce, was_dirty);
if (req.Kind() == UploadBatchType::DirtyBucketData)
{
// Since we have updated both ckpt ts and commit ts, we need to call
// OnFlushed to update the dirty size.
this->OnFlushed(cce, was_dirty);
}
this->OnCommittedUpdate(cce, was_dirty);
🤖 Prompt for AI Agents
In `@tx_service/include/cc/object_cc_map.h` around lines 1692 - 1695, The code
calls this->OnFlushed(cce, was_dirty) unconditionally even though SetCkptTs is
only invoked when req.Kind() == UploadBatchType::DirtyBucketData; change the
logic so OnFlushed is only called when the checkpoint timestamp was actually
updated (i.e., guard the OnFlushed(cce, was_dirty) call behind the same
condition that calls SetCkptTs), while leaving this->OnCommittedUpdate(cce,
was_dirty) executed as before; reference SetCkptTs, OnFlushed,
OnCommittedUpdate, req.Kind(), UploadBatchType::DirtyBucketData, cce, and
was_dirty to locate and update the code path.

DLOG_IF(INFO, TRACE_OCC_ERR)
<< "UploadBatchCc, txn:" << req.Txn() << " ,cce: " << cce
<< " ,commit_ts: " << commit_ts;
Expand Down Expand Up @@ -1775,8 +1786,10 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>

assert(txn_cmd.new_version_ > cce->CommitTs());
int64_t buffered_cmd_cnt_old = buffered_cmd_list.Size();
bool was_dirty = cce->IsDirty();
cce->EmplaceAndCommitBufferedTxnCommand(
txn_cmd, shard_->NowInMilliseconds());
this->OnCommittedUpdate(cce, was_dirty);
int64_t buffered_cmd_cnt_new = buffered_cmd_list.Size();
shard_->UpdateBufferedCommandCnt(buffered_cmd_cnt_new -
buffered_cmd_cnt_old);
Expand Down Expand Up @@ -1942,7 +1955,9 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
cce->payload_.cur_payload_ == nullptr
? RecordStatus::Deleted
: RecordStatus::Normal;
bool was_dirty = (cce->CommitTs() > 1 && !cce->IsPersistent());
cce->SetCommitTsPayloadStatus(commit_ts, payload_status);
this->OnCommittedUpdate(cce, was_dirty);
if (s_obj_exist && payload_status != RecordStatus::Normal)
{
TemplateCcMap<KeyT, ValueT, false, false>::normal_obj_sz_--;
Expand Down Expand Up @@ -1976,8 +1991,10 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>

// Emplace txn_cmd and try to commit all pending commands.
int64_t buffered_cmd_cnt_old = buffered_cmd_list.Size();
bool was_dirty = (cce->CommitTs() > 1 && !cce->IsPersistent());
cce->EmplaceAndCommitBufferedTxnCommand(
txn_cmd, shard_->NowInMilliseconds());
this->OnCommittedUpdate(cce, was_dirty);
RecordStatus new_status = cce->PayloadStatus();
if (s_obj_exist && new_status != RecordStatus::Normal)
{
Expand Down Expand Up @@ -2243,6 +2260,7 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
uint64_t current_version = cce->CommitTs();
RecordStatus payload_status = cce->PayloadStatus();
bool s_obj_exist = (payload_status == RecordStatus::Normal);
bool was_dirty = cce->IsDirty();
if (commit_ts <= current_version)
{
// If the log record's commit ts is smaller than or equal to the
Expand Down Expand Up @@ -2409,6 +2427,8 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
++TemplateCcMap<KeyT, ValueT, false, false>::normal_obj_sz_;
}

this->OnCommittedUpdate(cce, was_dirty);

// Must update dirty_commit_ts. Otherwise, this entry may be
// skipped by checkpointer.
if (commit_ts > last_dirty_commit_ts_)
Expand Down Expand Up @@ -2559,6 +2579,7 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
}

uint64_t commit_version = commit_ts;
bool was_dirty = cce->IsDirty();
cce->TryCommitBufferedCommands(commit_version,
shard_->NowInMilliseconds());
int64_t buffered_cmd_cnt_new = buffered_cmd_list.Size();
Expand All @@ -2569,6 +2590,7 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
? RecordStatus::Deleted
: RecordStatus::Normal;
cce->SetCommitTsPayloadStatus(commit_version, commit_status);
this->OnCommittedUpdate(cce, was_dirty);

if (buffered_cmd_list.Empty())
{
Expand Down
Loading