From a43202554b0c40a68b7743c87b56505cc63bdff0 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Tue, 20 Jan 2026 14:39:21 +0800 Subject: [PATCH] Skip ccpage that has no dirty keys since last datasync during datasyncscan remove useless variable --- tx_service/include/cc/cc_request.h | 17 ++-- tx_service/include/cc/template_cc_map.h | 100 +++++++++++++++++++++--- tx_service/src/cc/local_cc_shards.cpp | 2 +- tx_service/src/sk_generator.cpp | 2 +- 4 files changed, 104 insertions(+), 17 deletions(-) diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index b7385571..de3a6d7a 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -4026,7 +4026,7 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase uint64_t txn, const TxKey *target_start_key, const TxKey *target_end_key, - bool include_persisted_data, + uint64_t last_data_sync_ts = 0, bool export_base_table_item = false, bool export_base_table_item_only = false, StoreRange *store_range = nullptr, @@ -4037,6 +4037,7 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase node_group_id_(node_group_id), node_group_term_(node_group_term), core_cnt_(core_cnt), + last_data_sync_ts_(last_data_sync_ts), data_sync_ts_(data_sync_ts), start_key_(target_start_key), end_key_(target_end_key), @@ -4045,7 +4046,6 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase unfinished_cnt_(core_cnt_), mux_(), cv_(), - include_persisted_data_(include_persisted_data), export_base_table_item_(export_base_table_item), slice_coordinator_(export_base_table_item_, &slices_to_scan_), export_base_table_item_only_(export_base_table_item_only), @@ -4332,6 +4332,7 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase { assert(!export_base_table_item_); size_t curr_slice_index = curr_slice_index_[core_id]; + assert(curr_slice_index < slices_to_scan_.size()); return slices_to_scan_[curr_slice_index].first; } @@ -4405,6 +4406,11 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase return slice_coordinator_.IsEndSlice(); } + uint64_t LastDataSyncTs() const + { + return last_data_sync_ts_; + } + std::vector accumulated_scan_cnt_; std::vector accumulated_flush_data_size_; @@ -4532,6 +4538,10 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase uint32_t node_group_id_; int64_t node_group_term_; uint16_t core_cnt_; + // It is used as a hint to decide if a page has dirty data since last round + // of checkpoint. It is guaranteed that all entries committed before this ts + // are synced into data store. + uint64_t last_data_sync_ts_; // Target ts. Collect all data changes committed before this ts into data // sync vec. uint64_t data_sync_ts_; @@ -4554,9 +4564,6 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase uint32_t unfinished_cnt_; std::mutex mux_; std::condition_variable cv_; - // True means If no larger version exists, we need to export the data which - // commit_ts same as ckpt_ts. - bool include_persisted_data_{false}; // True means we need to export the data in memory and in kv to ckpt vec. // Note: This is only used in range partition. diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index dc0a9075..249908d6 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -5429,6 +5429,26 @@ class TemplateCcMap : public CcMap slice->PostCkptSize() > slice->Size()); }; + auto next_page_it = [this](Iterator &end_it) -> Iterator + { + Iterator it = end_it; + if (it != End()) + { + CcPage *ccp = + it.GetPage(); + assert(ccp != nullptr); + if (ccp->next_page_ == PagePosInf()) + { + it = End(); + } + else + { + it = Iterator(ccp->next_page_, 0, &neg_inf_); + } + } + return it; + }; + const KeyT *const req_start_key = req.start_key_ != nullptr ? req.start_key_->GetKey() : KeyT::NegativeInfinity(); @@ -5609,6 +5629,12 @@ class TemplateCcMap : public CcMap std::tie(key_it, slice_end_it, slice_end_key) = find_non_empty_slice(*search_start_key); + // Since we might skip the page that slice_end_it is located on if it's + // not updated since last ckpt, it might skip slice_end_it. If the last + // page is skipped, the key_it will be set as the first entry on the + // next page. Also check if (key_it == slice_end_next_page_it). + Iterator slice_end_next_page_it = next_page_it(slice_end_it); + uint64_t recycle_ts = 1U; if (shard_->EnableMvcc() && !req.export_base_table_item_only_) { @@ -5633,7 +5659,7 @@ class TemplateCcMap : public CcMap // blocking other transaction for a long time, we only process // CkptScanBatch number of pages in each round. for (size_t scan_cnt = 0; - key_it != slice_end_it && + key_it != slice_end_it && key_it != slice_end_next_page_it && scan_cnt < RangePartitionDataSyncScanCc::DataSyncScanBatchSize && req.accumulated_scan_cnt_.at(shard_->core_id_) < req.scan_batch_size_; @@ -5642,6 +5668,54 @@ class TemplateCcMap : public CcMap const KeyT *key = key_it->first; CcEntry *cce = key_it->second; + CcPage *ccp = + key_it.GetPage(); + assert(ccp); + // Only in the following scenarios, we can skip the ccpage if the + // page has no dirty keys since last datasync: 1) The scan is in the + // normal checkpoint mode(export_base_table_item_ = false). And 2) + // The slice is no need to split(slice_pinned = false). + if (!req.export_base_table_item_ && !slice_pinned && + ccp->last_dirty_commit_ts_ <= req.LastDataSyncTs()) + { + // There is no dirty keys in this ccpage since last datasync. + // Just skip this ccpage. + assert(!cce->NeedCkpt()); + if (ccp->next_page_ == PagePosInf()) + { + key_it = End(); + } + else + { + key_it = Iterator(ccp->next_page_, 0, &neg_inf_); + } + + // Check the slice iterator. + if (key_it == slice_end_it || key_it == slice_end_next_page_it) + { + // Reach to the end of current slice. + // Move to the next slice. + req.MoveToNextSlice(shard_->core_id_); + if (!req.TheBatchEnd(shard_->core_id_)) + { + search_start_key = slice_end_key; + std::tie(key_it, slice_end_it, slice_end_key) = + find_non_empty_slice(*search_start_key); + + slice_end_next_page_it = next_page_it(slice_end_it); + + // If reach to the batch end, it means there are no + // slices that need to be scanned. + slice_pinned = + req.TheBatchEnd(shard_->core_id_) + ? false + : req.IsSlicePinned(shard_->core_id_); + export_persisted_key_only = + !req.export_base_table_item_ && slice_pinned; + } + } + continue; + } if (shard_->EnableMvcc() && !req.export_base_table_item_only_) { @@ -5652,8 +5726,7 @@ class TemplateCcMap : public CcMap // size. bool need_export = cce->CommitTs() <= req.data_sync_ts_ && (slice_pinned || cce->NeedCkpt()); - if (RangePartitioned && - cce->entry_info_.DataStoreSize() == INT32_MAX && need_export) + if (cce->entry_info_.DataStoreSize() == INT32_MAX && need_export) { if (slice_pinned) { @@ -5727,6 +5800,8 @@ class TemplateCcMap : public CcMap std::tie(key_it, slice_end_it, slice_end_key) = find_non_empty_slice(*search_start_key); + slice_end_next_page_it = next_page_it(slice_end_it); + // If reach to the batch end, it means there are no slices // that need to be scanned. slice_pinned = req.TheBatchEnd(shard_->core_id_) @@ -5741,17 +5816,22 @@ class TemplateCcMap : public CcMap // The conditions for exiting the main loop are: reaching the maximum // scan batch size, or reach to the end slice of the current batch // slices. - assert(key_it != slice_end_it || req.TheBatchEnd(shard_->core_id_)); + assert((key_it != slice_end_it && key_it != slice_end_next_page_it) || + req.TheBatchEnd(shard_->core_id_)); // 4. Check whether the request is finished. TxKey next_pause_key; - bool no_more_data = (key_it == slice_end_it) && req.IsLastBatch(); + bool no_more_data = + (key_it == slice_end_it || key_it == slice_end_next_page_it) && + req.IsLastBatch(); if (!no_more_data) { - // If key_it == slice_end_it, it means the current slice is - // completed. The paused key should be the slice end key. - next_pause_key = key_it != slice_end_it - ? key_it->first->CloneTxKey() - : slice_end_key->CloneTxKey(); + // If key_it == slice_end_it or key_it == slice_end_next_page_it, + // it means the current slice is completed. The paused key should be + // the slice end key. + next_pause_key = + (key_it != slice_end_it && key_it != slice_end_next_page_it) + ? key_it->first->CloneTxKey() + : slice_end_key->CloneTxKey(); } // Set the pause_pos_ to mark resume position. diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index cff3739d..ee41e929 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -4064,7 +4064,7 @@ void LocalCcShards::DataSyncForRangePartition( tx_number, &start_tx_key, &end_tx_key, - false, + last_sync_ts, export_base_table_items, false, store_range, diff --git a/tx_service/src/sk_generator.cpp b/tx_service/src/sk_generator.cpp index cdbea6f6..e3fc928e 100644 --- a/tx_service/src/sk_generator.cpp +++ b/tx_service/src/sk_generator.cpp @@ -329,7 +329,7 @@ void SkGenerator::ScanAndEncodeIndex(const TxKey *start_key, tx_number, start_key, end_key, - false, + 0, true, true);