Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions tx_service/include/cc/cc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -4026,7 +4026,7 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase
uint64_t txn,
const TxKey *target_start_key,
const TxKey *target_end_key,
bool include_persisted_data,
uint64_t last_data_sync_ts = 0,
bool export_base_table_item = false,
bool export_base_table_item_only = false,
StoreRange *store_range = nullptr,
Expand All @@ -4037,6 +4037,7 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase
node_group_id_(node_group_id),
node_group_term_(node_group_term),
core_cnt_(core_cnt),
last_data_sync_ts_(last_data_sync_ts),
data_sync_ts_(data_sync_ts),
start_key_(target_start_key),
end_key_(target_end_key),
Expand All @@ -4045,7 +4046,6 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase
unfinished_cnt_(core_cnt_),
mux_(),
cv_(),
include_persisted_data_(include_persisted_data),
export_base_table_item_(export_base_table_item),
slice_coordinator_(export_base_table_item_, &slices_to_scan_),
export_base_table_item_only_(export_base_table_item_only),
Expand Down Expand Up @@ -4332,6 +4332,7 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase
{
assert(!export_base_table_item_);
size_t curr_slice_index = curr_slice_index_[core_id];
assert(curr_slice_index < slices_to_scan_.size());
return slices_to_scan_[curr_slice_index].first;
}

Expand Down Expand Up @@ -4405,6 +4406,11 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase
return slice_coordinator_.IsEndSlice();
}

uint64_t LastDataSyncTs() const
{
return last_data_sync_ts_;
}

std::vector<size_t> accumulated_scan_cnt_;
std::vector<uint64_t> accumulated_flush_data_size_;

Expand Down Expand Up @@ -4532,6 +4538,10 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase
uint32_t node_group_id_;
int64_t node_group_term_;
uint16_t core_cnt_;
// It is used as a hint to decide if a page has dirty data since last round
// of checkpoint. It is guaranteed that all entries committed before this ts
// are synced into data store.
uint64_t last_data_sync_ts_;
// Target ts. Collect all data changes committed before this ts into data
// sync vec.
uint64_t data_sync_ts_;
Expand All @@ -4554,9 +4564,6 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase
uint32_t unfinished_cnt_;
std::mutex mux_;
std::condition_variable cv_;
// True means If no larger version exists, we need to export the data which
// commit_ts same as ckpt_ts.
bool include_persisted_data_{false};

// True means we need to export the data in memory and in kv to ckpt vec.
// Note: This is only used in range partition.
Expand Down
100 changes: 90 additions & 10 deletions tx_service/include/cc/template_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -5429,6 +5429,26 @@ class TemplateCcMap : public CcMap
slice->PostCkptSize() > slice->Size());
};

auto next_page_it = [this](Iterator &end_it) -> Iterator
{
Iterator it = end_it;
if (it != End())
{
CcPage<KeyT, ValueT, VersionedRecord, RangePartitioned> *ccp =
it.GetPage();
assert(ccp != nullptr);
if (ccp->next_page_ == PagePosInf())
{
it = End();
}
else
{
it = Iterator(ccp->next_page_, 0, &neg_inf_);
}
}
return it;
};

const KeyT *const req_start_key = req.start_key_ != nullptr
? req.start_key_->GetKey<KeyT>()
: KeyT::NegativeInfinity();
Expand Down Expand Up @@ -5609,6 +5629,12 @@ class TemplateCcMap : public CcMap
std::tie(key_it, slice_end_it, slice_end_key) =
find_non_empty_slice(*search_start_key);

// Since we might skip the page that slice_end_it is located on if it's
// not updated since last ckpt, it might skip slice_end_it. If the last
// page is skipped, the key_it will be set as the first entry on the
// next page. Also check if (key_it == slice_end_next_page_it).
Iterator slice_end_next_page_it = next_page_it(slice_end_it);

uint64_t recycle_ts = 1U;
if (shard_->EnableMvcc() && !req.export_base_table_item_only_)
{
Expand All @@ -5633,7 +5659,7 @@ class TemplateCcMap : public CcMap
// blocking other transaction for a long time, we only process
// CkptScanBatch number of pages in each round.
for (size_t scan_cnt = 0;
key_it != slice_end_it &&
key_it != slice_end_it && key_it != slice_end_next_page_it &&
scan_cnt < RangePartitionDataSyncScanCc::DataSyncScanBatchSize &&
req.accumulated_scan_cnt_.at(shard_->core_id_) <
req.scan_batch_size_;
Expand All @@ -5642,6 +5668,54 @@ class TemplateCcMap : public CcMap
const KeyT *key = key_it->first;
CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned> *cce =
key_it->second;
CcPage<KeyT, ValueT, VersionedRecord, RangePartitioned> *ccp =
key_it.GetPage();
assert(ccp);
// Only in the following scenarios, we can skip the ccpage if the
// page has no dirty keys since last datasync: 1) The scan is in the
// normal checkpoint mode(export_base_table_item_ = false). And 2)
// The slice is no need to split(slice_pinned = false).
if (!req.export_base_table_item_ && !slice_pinned &&
ccp->last_dirty_commit_ts_ <= req.LastDataSyncTs())
{
// There is no dirty keys in this ccpage since last datasync.
// Just skip this ccpage.
assert(!cce->NeedCkpt());
if (ccp->next_page_ == PagePosInf())
{
key_it = End();
}
else
{
key_it = Iterator(ccp->next_page_, 0, &neg_inf_);
}

// Check the slice iterator.
if (key_it == slice_end_it || key_it == slice_end_next_page_it)
{
// Reach to the end of current slice.
// Move to the next slice.
req.MoveToNextSlice(shard_->core_id_);
if (!req.TheBatchEnd(shard_->core_id_))
{
search_start_key = slice_end_key;
std::tie(key_it, slice_end_it, slice_end_key) =
find_non_empty_slice(*search_start_key);

slice_end_next_page_it = next_page_it(slice_end_it);

// If reach to the batch end, it means there are no
// slices that need to be scanned.
slice_pinned =
req.TheBatchEnd(shard_->core_id_)
? false
: req.IsSlicePinned(shard_->core_id_);
export_persisted_key_only =
!req.export_base_table_item_ && slice_pinned;
}
}
continue;
}

if (shard_->EnableMvcc() && !req.export_base_table_item_only_)
{
Expand All @@ -5652,8 +5726,7 @@ class TemplateCcMap : public CcMap
// size.
bool need_export = cce->CommitTs() <= req.data_sync_ts_ &&
(slice_pinned || cce->NeedCkpt());
if (RangePartitioned &&
cce->entry_info_.DataStoreSize() == INT32_MAX && need_export)
if (cce->entry_info_.DataStoreSize() == INT32_MAX && need_export)
{
if (slice_pinned)
{
Expand Down Expand Up @@ -5727,6 +5800,8 @@ class TemplateCcMap : public CcMap
std::tie(key_it, slice_end_it, slice_end_key) =
find_non_empty_slice(*search_start_key);

slice_end_next_page_it = next_page_it(slice_end_it);

// If reach to the batch end, it means there are no slices
// that need to be scanned.
slice_pinned = req.TheBatchEnd(shard_->core_id_)
Expand All @@ -5741,17 +5816,22 @@ class TemplateCcMap : public CcMap
// The conditions for exiting the main loop are: reaching the maximum
// scan batch size, or reach to the end slice of the current batch
// slices.
assert(key_it != slice_end_it || req.TheBatchEnd(shard_->core_id_));
assert((key_it != slice_end_it && key_it != slice_end_next_page_it) ||
req.TheBatchEnd(shard_->core_id_));
// 4. Check whether the request is finished.
TxKey next_pause_key;
bool no_more_data = (key_it == slice_end_it) && req.IsLastBatch();
bool no_more_data =
(key_it == slice_end_it || key_it == slice_end_next_page_it) &&
req.IsLastBatch();
if (!no_more_data)
{
// If key_it == slice_end_it, it means the current slice is
// completed. The paused key should be the slice end key.
next_pause_key = key_it != slice_end_it
? key_it->first->CloneTxKey()
: slice_end_key->CloneTxKey();
// If key_it == slice_end_it or key_it == slice_end_next_page_it,
// it means the current slice is completed. The paused key should be
// the slice end key.
next_pause_key =
(key_it != slice_end_it && key_it != slice_end_next_page_it)
? key_it->first->CloneTxKey()
: slice_end_key->CloneTxKey();
}

// Set the pause_pos_ to mark resume position.
Expand Down
2 changes: 1 addition & 1 deletion tx_service/src/cc/local_cc_shards.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4064,7 +4064,7 @@ void LocalCcShards::DataSyncForRangePartition(
tx_number,
&start_tx_key,
&end_tx_key,
false,
last_sync_ts,
export_base_table_items,
false,
store_range,
Expand Down
2 changes: 1 addition & 1 deletion tx_service/src/sk_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ void SkGenerator::ScanAndEncodeIndex(const TxKey *start_key,
tx_number,
start_key,
end_key,
false,
0,
true,
true);

Expand Down