diff --git a/tx_service/include/cc/cc_entry.h b/tx_service/include/cc/cc_entry.h index 12169d9d..725a5a43 100644 --- a/tx_service/include/cc/cc_entry.h +++ b/tx_service/include/cc/cc_entry.h @@ -629,6 +629,17 @@ struct VersionedLruEntry : public LruEntry bool IsPersistent() const; + /** + * @brief Check if the entry is dirty (not yet checkpointed). + * This function only relies on CommitTs and CkptTs or + * commit_ts_and_status_, and does not consider standby node status. Use + * this strictly for dirty size tracking. + * + * @return true if the entry is dirty (CommitTs > CkptTs for versioned, + * or flush bit not set for non-versioned), false otherwise. + */ + bool IsDirty() const; + RecordStatus PayloadStatus() const; void SetCommitTsPayloadStatus(uint64_t ts, RecordStatus status); diff --git a/tx_service/include/cc/cc_map.h b/tx_service/include/cc/cc_map.h index 1443b94d..0d1434b6 100644 --- a/tx_service/include/cc/cc_map.h +++ b/tx_service/include/cc/cc_map.h @@ -207,13 +207,18 @@ class CcMap virtual bool Execute(ScanSliceDeltaSizeCcForRangePartition &req) = 0; virtual bool Execute(ScanDeltaSizeCcForHashPartition &req) = 0; - virtual size_t size() const = 0; virtual size_t NormalObjectSize() { assert(false); return 0; } + // Notify ccmap when an entry's ckpt ts is updated and persistence may have + // changed. + virtual void OnEntryFlushed(bool was_dirty, bool is_persistent) + { + } + virtual std::pair CleanPageAndReBalance( LruPage *page, KickoutCcEntryCc *kickout_cc = nullptr, diff --git a/tx_service/include/cc/cc_page_clean_guard.h b/tx_service/include/cc/cc_page_clean_guard.h index c7efecd3..39c1c316 100644 --- a/tx_service/include/cc/cc_page_clean_guard.h +++ b/tx_service/include/cc/cc_page_clean_guard.h @@ -134,6 +134,12 @@ struct CcPageCleanGuard return free_cnt_; } + // Number of dirty keys freed. + size_t DirtyFreedCount() const + { + return dirty_freed_cnt_; + } + // If any valid key is evicted. This is used to update if ccm is still fully // cached. Note that this does not include // 1. Deleted/expired keys. Freeing deleted keys does not affect cache @@ -317,6 +323,10 @@ struct CcPageCleanGuard evicted_valid_key_ = true; } } + if (cce->IsDirty()) + { + ++dirty_freed_cnt_; + } cce->ClearLocks(*cc_shard_, cc_ng_id); clean_set_.set(idx, true); @@ -330,6 +340,7 @@ struct CcPageCleanGuard CcPage *page_{nullptr}; uint64_t last_commit_ts_{0}; uint64_t free_cnt_{0}; + uint64_t dirty_freed_cnt_{0}; bool evicted_valid_key_{false}; uint64_t clean_obj_cnt_{0}; diff --git a/tx_service/include/cc/cc_req_misc.h b/tx_service/include/cc/cc_req_misc.h index c17427d2..ed9ea8b9 100644 --- a/tx_service/include/cc/cc_req_misc.h +++ b/tx_service/include/cc/cc_req_misc.h @@ -970,14 +970,12 @@ struct UpdateCceCkptTsCc : public CcRequestBase UpdateCceCkptTsCc( NodeGroupId node_group_id, int64_t term, - absl::flat_hash_map> &cce_entries, - bool range_partitioned, - bool versioned_payload) + const TableName &table_name, + absl::flat_hash_map> &cce_entries) : cce_entries_(cce_entries), node_group_id_(node_group_id), term_(term), - range_partitioned_(range_partitioned), - versioned_payload_(versioned_payload) + table_name_(table_name) { unfinished_core_cnt_ = cce_entries_.size(); assert(unfinished_core_cnt_ > 0); @@ -1026,8 +1024,7 @@ struct UpdateCceCkptTsCc : public CcRequestBase size_t unfinished_core_cnt_; NodeGroupId node_group_id_; int64_t term_; - bool range_partitioned_{false}; - bool versioned_payload_{false}; + TableName table_name_; bthread::Mutex mux_; bthread::ConditionVariable cv_; }; diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index a1f3c0b8..97e93fae 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -3049,6 +3049,8 @@ struct CkptTsCc : public CcRequestBase memory_committed_vec_.emplace_back(0); heap_full_vec_.emplace_back(false); standby_msg_seq_id_vec_.emplace_back(0); + total_key_cnt_vec_.emplace_back(0); + dirty_key_cnt_vec_.emplace_back(0); } } @@ -3088,6 +3090,9 @@ struct CkptTsCc : public CcRequestBase memory_allocated_vec_[ccs.LocalCoreId()] = allocated; memory_committed_vec_[ccs.LocalCoreId()] = committed; heap_full_vec_[ccs.LocalCoreId()] = full; + auto [total_keys, dirty_keys] = ccs.GetDataKeyStats(); + total_key_cnt_vec_[ccs.LocalCoreId()] = total_keys; + dirty_key_cnt_vec_[ccs.LocalCoreId()] = dirty_keys; std::unique_lock lk(mux_); if (--unfinish_cnt_ == 0) @@ -3149,13 +3154,22 @@ struct CkptTsCc : public CcRequestBase uint64_t &allocated = memory_allocated_vec_[core_id]; uint64_t &committed = memory_committed_vec_[core_id]; bool heap_full = heap_full_vec_[core_id]; + size_t total_keys = total_key_cnt_vec_[core_id]; + size_t dirty_keys = dirty_key_cnt_vec_[core_id]; + double dirty_ratio = total_keys == 0 + ? 0.0 + : static_cast(dirty_keys) / + static_cast(total_keys); LOG(INFO) << "ccs " << core_id << " memory usage report, committed " << committed << ", allocated " << allocated << ", frag ratio " << std::setprecision(2) << 100 * (static_cast(committed - allocated) / committed) - << " , heap full: " << heap_full; + << " , heap full: " << heap_full + << ", dirty key ratio: " << std::setprecision(4) + << dirty_ratio << " (" << dirty_keys << "/" << total_keys + << ")"; } } @@ -3203,6 +3217,8 @@ struct CkptTsCc : public CcRequestBase size_t unfinish_cnt_; std::vector memory_allocated_vec_; std::vector memory_committed_vec_; + std::vector total_key_cnt_vec_; + std::vector dirty_key_cnt_vec_; std::vector standby_msg_seq_id_vec_; std::vector subscribed_node_ids_; std::vector heap_full_vec_; diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index 2f04a475..ee977e68 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -313,6 +313,12 @@ class CcShard */ CcMap *GetCcm(const TableName &table_name, uint32_t node_group); + void AdjustDataKeyStats(const TableName &table_name, + int64_t size_delta, + int64_t dirty_delta); + + std::pair GetDataKeyStats() const; + void InitializeShardHeap() { if (shard_heap_thread_id_ == 0) @@ -1281,6 +1287,11 @@ class CcShard // The number of ccentry in all the ccmap of this ccshard. uint64_t size_; + // The number of keys in data tables only (meta tables excluded). + size_t data_key_count_{0}; + // The number of committed dirty keys in data tables only. + size_t dirty_data_key_count_{0}; + Checkpointer *ckpter_; /** diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index a0967685..961bee52 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -352,43 +352,45 @@ class LocalCcShards void PrintCcMap() { - std::unordered_map - mapsizes; // not string owner, sv -> native_ccms_ - for (const auto &cc_shard : cc_shards_) - { - CcShard &shard = *cc_shard; - - for (auto map_iter = shard.native_ccms_.begin(); - map_iter != shard.native_ccms_.end(); - ++map_iter) - { - const TableName &tab_name = map_iter->first; - auto find_iter = mapsizes.find(tab_name); - size_t cnt = 0; - if (find_iter != mapsizes.end()) - { - cnt = find_iter->second; - cnt += map_iter->second->size(); - find_iter->second = cnt; - } - else - { - mapsizes.emplace( - std::piecewise_construct, - std::forward_as_tuple(tab_name.StringView(), - tab_name.Type(), - tab_name.Engine()), - std::forward_as_tuple(map_iter->second->size())); - } - - std::cout << "Table '" << tab_name.StringView() << "' core ID " - << shard.core_id_ << ": " << map_iter->second->size() - << std::endl; - - assert(map_iter->second->VerifyOrdering() == - map_iter->second->size()); - } - } + // std::unordered_map + // mapsizes; // not string owner, sv -> native_ccms_ + // for (const auto &cc_shard : cc_shards_) + // { + // CcShard &shard = *cc_shard; + + // for (auto map_iter = shard.native_ccms_.begin(); + // map_iter != shard.native_ccms_.end(); + // ++map_iter) + // { + // const TableName &tab_name = map_iter->first; + // auto find_iter = mapsizes.find(tab_name); + // size_t cnt = 0; + // if (find_iter != mapsizes.end()) + // { + // cnt = find_iter->second; + // cnt += map_iter->second->size(); + // find_iter->second = cnt; + // } + // else + // { + // mapsizes.emplace( + // std::piecewise_construct, + // std::forward_as_tuple(tab_name.StringView(), + // tab_name.Type(), + // tab_name.Engine()), + // std::forward_as_tuple(map_iter->second->size())); + // } + + // std::cout << "Table '" << tab_name.StringView() << "' core ID + // " + // << shard.core_id_ << ": " << + // map_iter->second->size() + // << std::endl; + + // assert(map_iter->second->VerifyOrdering() == + // map_iter->second->size()); + // } + // } /*for (auto map_iter = mapsizes.begin(); map_iter != mapsizes.end(); ++map_iter) diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index 03617dc8..8f692e2c 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -959,8 +959,10 @@ class ObjectCcMap : public TemplateCcMap cce->ReleaseForwardEntry(); shard_->ForwardStandbyMessage(entry_ptr.release()); } + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, RecordStatus::Deleted); + this->OnCommittedUpdate(cce, was_dirty); // Release and try to recycle the lock. assert(acquired_lock != LockType::NoLock); ReleaseCceLock( @@ -1214,7 +1216,9 @@ class ObjectCcMap : public TemplateCcMap // PostWriteCc if apply_and_commit_. const uint64_t commit_ts = std::max({cce->CommitTs() + 1, req.TxTs(), shard_->Now()}); + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, status); + this->OnCommittedUpdate(cce, was_dirty); if (forward_entry) { @@ -1426,7 +1430,9 @@ class ObjectCcMap : public TemplateCcMap cce->ReleaseForwardEntry(); shard_->ForwardStandbyMessage(entry_ptr.release()); } + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, payload_status); + this->OnCommittedUpdate(cce, was_dirty); // It's possible that the cce HasBufferedCommandList and is still in // unknown status (because FetchRecord fails) and this command // ignores kv value. Need to clear the buffered commands when a new @@ -1647,6 +1653,7 @@ class ObjectCcMap : public TemplateCcMap ttl = 0; } + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, rec_status); if (req.Kind() == UploadBatchType::DirtyBucketData) { @@ -1682,6 +1689,10 @@ class ObjectCcMap : public TemplateCcMap { cce->SetCommitTsPayloadStatus(commit_ts, RecordStatus::Deleted); } + // Since we have updated both ckpt ts and commit ts, we need to call + // OnFlushed to update the dirty size. + this->OnFlushed(cce, was_dirty); + this->OnCommittedUpdate(cce, was_dirty); DLOG_IF(INFO, TRACE_OCC_ERR) << "UploadBatchCc, txn:" << req.Txn() << " ,cce: " << cce << " ,commit_ts: " << commit_ts; @@ -1775,8 +1786,10 @@ class ObjectCcMap : public TemplateCcMap assert(txn_cmd.new_version_ > cce->CommitTs()); int64_t buffered_cmd_cnt_old = buffered_cmd_list.Size(); + bool was_dirty = cce->IsDirty(); cce->EmplaceAndCommitBufferedTxnCommand( txn_cmd, shard_->NowInMilliseconds()); + this->OnCommittedUpdate(cce, was_dirty); int64_t buffered_cmd_cnt_new = buffered_cmd_list.Size(); shard_->UpdateBufferedCommandCnt(buffered_cmd_cnt_new - buffered_cmd_cnt_old); @@ -1942,7 +1955,9 @@ class ObjectCcMap : public TemplateCcMap cce->payload_.cur_payload_ == nullptr ? RecordStatus::Deleted : RecordStatus::Normal; + bool was_dirty = (cce->CommitTs() > 1 && !cce->IsPersistent()); cce->SetCommitTsPayloadStatus(commit_ts, payload_status); + this->OnCommittedUpdate(cce, was_dirty); if (s_obj_exist && payload_status != RecordStatus::Normal) { TemplateCcMap::normal_obj_sz_--; @@ -1976,8 +1991,10 @@ class ObjectCcMap : public TemplateCcMap // Emplace txn_cmd and try to commit all pending commands. int64_t buffered_cmd_cnt_old = buffered_cmd_list.Size(); + bool was_dirty = (cce->CommitTs() > 1 && !cce->IsPersistent()); cce->EmplaceAndCommitBufferedTxnCommand( txn_cmd, shard_->NowInMilliseconds()); + this->OnCommittedUpdate(cce, was_dirty); RecordStatus new_status = cce->PayloadStatus(); if (s_obj_exist && new_status != RecordStatus::Normal) { @@ -2243,6 +2260,7 @@ class ObjectCcMap : public TemplateCcMap uint64_t current_version = cce->CommitTs(); RecordStatus payload_status = cce->PayloadStatus(); bool s_obj_exist = (payload_status == RecordStatus::Normal); + bool was_dirty = cce->IsDirty(); if (commit_ts <= current_version) { // If the log record's commit ts is smaller than or equal to the @@ -2409,6 +2427,8 @@ class ObjectCcMap : public TemplateCcMap ++TemplateCcMap::normal_obj_sz_; } + this->OnCommittedUpdate(cce, was_dirty); + // Must update dirty_commit_ts. Otherwise, this entry may be // skipped by checkpointer. if (commit_ts > last_dirty_commit_ts_) @@ -2559,6 +2579,7 @@ class ObjectCcMap : public TemplateCcMap } uint64_t commit_version = commit_ts; + bool was_dirty = cce->IsDirty(); cce->TryCommitBufferedCommands(commit_version, shard_->NowInMilliseconds()); int64_t buffered_cmd_cnt_new = buffered_cmd_list.Size(); @@ -2569,6 +2590,7 @@ class ObjectCcMap : public TemplateCcMap ? RecordStatus::Deleted : RecordStatus::Normal; cce->SetCommitTsPayloadStatus(commit_version, commit_status); + this->OnCommittedUpdate(cce, was_dirty); if (buffered_cmd_list.Empty()) { diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 249908d6..f2a00630 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -609,6 +609,7 @@ class TemplateCcMap : public CcMap RecordStatus cce_old_status = cce->PayloadStatus(); RecordStatus new_status = is_del ? RecordStatus::Deleted : RecordStatus::Normal; + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, new_status); if (req.IsInitialInsert()) @@ -616,6 +617,7 @@ class TemplateCcMap : public CcMap // Updates the ckpt ts after commit ts is set. cce->SetCkptTs(1U); } + OnCommittedUpdate(cce, was_dirty); DLOG_IF(INFO, TRACE_OCC_ERR) << "PostWriteCc, txn:" << txn << " ,cce: " << cce @@ -1125,7 +1127,9 @@ class TemplateCcMap : public CcMap ? RecordStatus::Deleted : RecordStatus::Normal; + bool was_dirty = cce_ptr->IsDirty(); cce_ptr->SetCommitTsPayloadStatus(commit_ts, status); + OnCommittedUpdate(cce_ptr, was_dirty); } else if (req.CommitType() == PostWriteType::PrepareCommit) { @@ -2022,6 +2026,7 @@ class TemplateCcMap : public CcMap tmp_payload_status = RecordStatus::Deleted; } + bool was_dirty = cce->IsDirty(); if (cce->PayloadStatus() == RecordStatus::Unknown) { cce->payload_.PassInCurrentPayload(std::move(tmp_payload)); @@ -2041,6 +2046,8 @@ class TemplateCcMap : public CcMap // Updates the ckpt timestamp such that it is no smaller than the // backfill version. cce->SetCkptTs(req.ReadTimestamp()); + OnFlushed(cce, was_dirty); + OnCommittedUpdate(cce, was_dirty); // Refill mvcc archives if (shard_->EnableMvcc() && @@ -2215,6 +2222,7 @@ class TemplateCcMap : public CcMap CcEntry *>( cce_addr.ExtractCce()); + bool was_dirty = cce->IsDirty(); if (cce->PayloadStatus() == RecordStatus::Unknown) { assert(cce->CommitTs() == 1); @@ -2242,6 +2250,7 @@ class TemplateCcMap : public CcMap // Updates the ckpt timestamp such that it is no smaller than the // backfill version. cce->SetCkptTs(req.CommitTs()); + OnFlushed(cce, was_dirty); // Refill mvcc archives. if (shard_->EnableMvcc()) @@ -7004,7 +7013,9 @@ class TemplateCcMap : public CcMap rec_status = RecordStatus::Deleted; } const uint64_t commit_ts = req.CommitTs(); + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, rec_status); + OnCommittedUpdate(cce, was_dirty); if (commit_ts > last_dirty_commit_ts_) { @@ -7754,6 +7765,7 @@ class TemplateCcMap : public CcMap } } + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, rec_status); if (req.Kind() == UploadBatchType::DirtyBucketData) { @@ -7767,6 +7779,8 @@ class TemplateCcMap : public CcMap } cce->SetCkptTs(commit_ts); } + OnCommittedUpdate(cce, was_dirty); + OnFlushed(cce, was_dirty); DLOG_IF(INFO, TRACE_OCC_ERR) << "UploadBatchCc, txn:" << req.Txn() << " ,cce: " << cce << " ,commit_ts: " << commit_ts; @@ -7962,7 +7976,9 @@ class TemplateCcMap : public CcMap if (status == RecordStatus::Normal || status == RecordStatus::Deleted) { + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(now_ts, status); + OnCommittedUpdate(cce, was_dirty); assert(!cce->HasBufferedCommandList()); } assert(!cce->HasBufferedCommandList() || @@ -7974,7 +7990,9 @@ class TemplateCcMap : public CcMap // last ckpt ts. if (cce->CommitTs() <= ckpt_ts) { + bool was_dirty = cce->IsDirty(); cce->SetCkptTs(cce->CommitTs()); + OnFlushed(cce, was_dirty); } } it++; @@ -8633,9 +8651,12 @@ class TemplateCcMap : public CcMap return true; } - size_t size() const override + void OnEntryFlushed(bool was_dirty, bool is_persistent) override { - return size_; + if (was_dirty && is_persistent) + { + shard_->AdjustDataKeyStats(table_name_, 0, -1); + } } void CleanEntry(LruEntry *entry, LruPage *page) override @@ -8652,8 +8673,9 @@ class TemplateCcMap : public CcMap // position to update the map. page_it = ccmp_.find(ccpage->FirstKey()); } + const bool was_dirty = cce->IsDirty(); + shard_->AdjustDataKeyStats(table_name_, -1, was_dirty ? -1 : 0); ccpage->Remove(cce); - size_--; RebalancePage(ccpage, page_it, true); } @@ -8704,6 +8726,8 @@ class TemplateCcMap : public CcMap void Clean() override { + size_t total_freed = 0; + size_t dirty_freed = 0; for (auto it = ccmp_.begin(); it != ccmp_.end(); it++) { CcPage &page = @@ -8712,15 +8736,26 @@ class TemplateCcMap : public CcMap { shard_->DetachLru(&page); } + total_freed += page.Size(); for (auto &cce : page.entries_) { + if (cce->IsDirty()) + { + ++dirty_freed; + } cce->ClearLocks(*shard_, cc_ng_id_); } } + if (total_freed > 0 || dirty_freed > 0) + { + shard_->AdjustDataKeyStats(table_name_, + -static_cast(total_freed), + -static_cast(dirty_freed)); + } + normal_obj_sz_ = 0; - size_ = 0; ccmp_.clear(); } @@ -8737,8 +8772,13 @@ class TemplateCcMap : public CcMap shard_->DetachLru(page); } + size_t dirty_freed = 0; for (auto &cce : page->entries_) { + if (cce->IsDirty()) + { + ++dirty_freed; + } if (!VersionedRecord && cce->PayloadStatus() == RecordStatus::Normal) { @@ -8747,7 +8787,9 @@ class TemplateCcMap : public CcMap cce->ClearLocks(*shard_, cc_ng_id_); } - size_ -= page->Size(); + shard_->AdjustDataKeyStats(table_name_, + -static_cast(page->Size()), + -static_cast(dirty_freed)); cnt += 1; it = ccmp_.erase(it); @@ -8759,7 +8801,6 @@ class TemplateCcMap : public CcMap return false; } - assert(size_ == 0); assert(VersionedRecord || normal_obj_sz_ == 0); return true; } @@ -9116,9 +9157,12 @@ class TemplateCcMap : public CcMap CcPage *ccp = it.GetPage(); // randomly set ckpt_ts and commit_ts + bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(distribution(generator), RecordStatus::Normal); cce->SetCkptTs(distribution(generator)); + OnFlushed(cce, was_dirty); + OnCommittedUpdate(cce, was_dirty); ccp->last_dirty_commit_ts_ = std::max(cce->CommitTs(), ccp->last_dirty_commit_ts_); } @@ -9126,6 +9170,23 @@ class TemplateCcMap : public CcMap } protected: + void OnCommittedUpdate( + const CcEntry *cce, + bool was_dirty) + { + if (!was_dirty && cce->IsDirty()) + { + shard_->AdjustDataKeyStats(table_name_, 0, +1); + } + } + + void OnFlushed( + const CcEntry *cce, + bool was_dirty) + { + OnEntryFlushed(was_dirty, cce->IsPersistent()); + } + // The CcEntry defragment result enum DefragResult { @@ -9986,7 +10047,8 @@ class TemplateCcMap : public CcMap { normal_obj_sz_ += normal_rec_change; } - size_ += (end_idx - first_index); + shard_->AdjustDataKeyStats( + table_name_, +(end_idx - first_index), 0); return true; } @@ -10023,7 +10085,7 @@ class TemplateCcMap : public CcMap assert(new_key_cnt > 0); // Update CCMap size - size_ += new_key_cnt; + shard_->AdjustDataKeyStats(table_name_, +new_key_cnt, 0); } else { @@ -10112,7 +10174,7 @@ class TemplateCcMap : public CcMap assert(new_key_cnt > 0); // Update Ccmap size - size_ += new_key_cnt; + shard_->AdjustDataKeyStats(table_name_, +new_key_cnt, 0); } if (!VersionedRecord) @@ -10351,7 +10413,7 @@ class TemplateCcMap : public CcMap // update lru list shard_->UpdateLruList(target_page, true); - size_++; + shard_->AdjustDataKeyStats(table_name_, +1, 0); return Iterator(target_page, idx_in_page, &neg_inf_); } @@ -10388,7 +10450,9 @@ class TemplateCcMap : public CcMap } const uint64_t cce_version = cce->CommitTs(); + bool was_dirty = cce->IsDirty(); cce->SetCkptTs(commit_ts); + OnFlushed(cce, was_dirty); if (cce_version < commit_ts) { @@ -11525,7 +11589,10 @@ class TemplateCcMap : public CcMap { normal_obj_sz_ -= clean_guard->CleanObjectCount(); } - size_ -= clean_guard->FreedCount(); + shard_->AdjustDataKeyStats( + table_name_, + -static_cast(clean_guard->FreedCount()), + -static_cast(clean_guard->DirtyFreedCount())); if (clean_guard->EvictedValidKeys()) { ccm_has_full_entries_ = false; @@ -11838,8 +11905,6 @@ class TemplateCcMap : public CcMap CcEntry neg_inf_, pos_inf_; TemplateCcMapSamplePool *sample_pool_; - size_t - size_{}; // The count of all records, including ones in deleted status. size_t normal_obj_sz_{ 0}; // The count of all normal status objects, only used for redis }; diff --git a/tx_service/src/cc/cc_entry.cpp b/tx_service/src/cc/cc_entry.cpp index 07822716..27e8c861 100644 --- a/tx_service/src/cc/cc_entry.cpp +++ b/tx_service/src/cc/cc_entry.cpp @@ -79,6 +79,28 @@ bool VersionedLruEntry::IsPersistent() const } } +template +bool VersionedLruEntry::IsDirty() const +{ + // Only check CommitTs > 1 to exclude initial entries + if (CommitTs() <= 1) + { + return false; + } + + if (Versioned) + { + // For versioned records, dirty means CommitTs > CkptTs + return CommitTs() > CkptTs(); + } + else + { + // For non-versioned records, dirty means the flush bit (5th bit) is not + // set + return !(entry_info_.commit_ts_and_status_ & 0x10); + } +} + template bool VersionedLruEntry::IsFree() const { diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index cd3433e3..ead420ac 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ b/tx_service/src/cc/cc_req_misc.cpp @@ -1195,47 +1195,62 @@ bool UpdateCceCkptTsCc::Execute(CcShard &ccs) size_t last_index = std::min(index + SCAN_BATCH_SIZE, records.size()); + CcMap *ccm = ccs.GetCcm(table_name_, node_group_id_); + assert(ccm != nullptr); + + bool range_partitioned = !table_name_.IsHashPartitioned(); + bool versioned_payload = table_name_.Engine() != TableEngine::EloqKv; + for (; index < last_index; ++index) { const CkptTsEntry &ref = records[index]; - if (range_partitioned_) + if (range_partitioned) { - if (versioned_payload_) + if (versioned_payload) { VersionedLruEntry *v_entry = static_cast *>(ref.cce_); + + assert(v_entry->CommitTs() > 1 && !v_entry->IsPersistent()); v_entry->entry_info_.SetDataStoreSize(ref.post_flush_size_); v_entry->SetCkptTs(ref.commit_ts_); v_entry->ClearBeingCkpt(); + ccm->OnEntryFlushed(true, v_entry->IsPersistent()); } else { VersionedLruEntry *v_entry = static_cast *>(ref.cce_); + assert(v_entry->CommitTs() > 1 && !v_entry->IsPersistent()); v_entry->entry_info_.SetDataStoreSize(ref.post_flush_size_); v_entry->SetCkptTs(ref.commit_ts_); v_entry->ClearBeingCkpt(); + ccm->OnEntryFlushed(true, v_entry->IsPersistent()); } } else { - if (versioned_payload_) + if (versioned_payload) { VersionedLruEntry *v_entry = static_cast *>(ref.cce_); + assert(v_entry->CommitTs() > 1 && !v_entry->IsPersistent()); v_entry->SetCkptTs(ref.commit_ts_); v_entry->ClearBeingCkpt(); + ccm->OnEntryFlushed(true, v_entry->IsPersistent()); } else { VersionedLruEntry *v_entry = static_cast *>(ref.cce_); + assert(v_entry->CommitTs() > 1 && !v_entry->IsPersistent()); v_entry->SetCkptTs(ref.commit_ts_); v_entry->ClearBeingCkpt(); + ccm->OnEntryFlushed(true, v_entry->IsPersistent()); } } } diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index 5d98d73e..c61cc0dc 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -377,6 +377,37 @@ CcMap *CcShard::GetCcm(const TableName &table_name, uint32_t node_group) } } +void CcShard::AdjustDataKeyStats(const TableName &table_name, + int64_t size_delta, + int64_t dirty_delta) +{ + if (table_name.IsMeta()) + { + return; + } + + if (size_delta != 0) + { + assert(size_delta >= 0 || + data_key_count_ >= static_cast(-size_delta)); + data_key_count_ = static_cast( + static_cast(data_key_count_) + size_delta); + } + + if (dirty_delta != 0) + { + assert(dirty_delta >= 0 || + dirty_data_key_count_ >= static_cast(-dirty_delta)); + dirty_data_key_count_ = static_cast( + static_cast(dirty_data_key_count_) + dirty_delta); + } +} + +std::pair CcShard::GetDataKeyStats() const +{ + return {data_key_count_, dirty_data_key_count_}; +} + void CcShard::Enqueue(uint32_t thd_id, CcRequestBase *req) { // The memory order in enqueue() of the concurrent queue ensures that the diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index ee41e929..392644ef 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -5869,9 +5869,8 @@ void LocalCcShards::FlushData(std::unique_lock &flush_worker_lk) UpdateCceCkptTsCc update_cce_req( entry->data_sync_task_->node_group_id_, entry->data_sync_task_->node_group_term_, - cce_entries_map, - !table_name.IsHashPartitioned(), - table_name.Engine() != TableEngine::EloqKv); + table_name, + cce_entries_map); for (auto &[core_idx, cce_entries] : cce_entries_map) { updated_ckpt_ts_core_ids.insert(core_idx);