diff --git a/store_handler/data_store_service_client_closure.cpp b/store_handler/data_store_service_client_closure.cpp index efc27dce..09deb168 100644 --- a/store_handler/data_store_service_client_closure.cpp +++ b/store_handler/data_store_service_client_closure.cpp @@ -1276,17 +1276,16 @@ void LoadRangeSliceCallback(void *data, std::string key_str, value_str; uint64_t ts, ttl; + uint64_t snapshot_ts = fill_store_slice_req->SnapshotTs(); for (uint32_t i = 0; i < items_size; i++) { scan_next_closure->GetItem(i, key_str, value_str, ts, ttl); - txservice::TxKey key = - catalog_factory->CreateTxKey(key_str.data(), key_str.size()); - std::unique_ptr record = - catalog_factory->CreateTxRecord(); bool is_deleted = false; + std::unique_ptr record = nullptr; if (table_name.Engine() == txservice::TableEngine::EloqKv) { // Hash partition + record = catalog_factory->CreateTxRecord(); txservice::TxObject *tx_object = reinterpret_cast(record.get()); size_t offset = 0; @@ -1308,18 +1307,47 @@ void LoadRangeSliceCallback(void *data, std::abort(); } + if ((snapshot_ts == 0 && is_deleted) || + (snapshot_ts > 0 && snapshot_ts > ts && is_deleted)) + { + // if it is not a snapshot read, there is no need to return + // deleted keys. + // If it is a snapshot read and the latest version is newer than + // snapshot read ts, we need to backfill deleted key since there + // might be visible archive version of this key for snapshot ts. + // The caller will decide if reading archive table is necessary + // based on the deleted key version. + if (i == items_size - 1 && + scan_next_closure->ItemsSize() == 1000) + { + // Record the last key of the current batch as the start key + // of the next batch. kv_start_key_ is a string view, so we + // need to create a new TxKey object as its owner. + txservice::TxKey key = catalog_factory->CreateTxKey( + key_str.data(), key_str.size()); + fill_store_slice_req->kv_start_key_ = + std::string_view(key.Data(), key.Size()); + fill_store_slice_req->kv_start_key_owner_ = std::move(key); + } + continue; + } + + record = catalog_factory->CreateTxRecord(); if (!is_deleted) { record->Deserialize(value_str.data(), offset); } } - if (i == items_size - 1) + txservice::TxKey key = + catalog_factory->CreateTxKey(key_str.data(), key_str.size()); + if (i == items_size - 1 && scan_next_closure->ItemsSize() == 1000) { + // Record the last key of the current batch as the start key of the + // next batch. fill_store_slice_req->kv_start_key_ = std::string_view(key.Data(), key.Size()); } - fill_store_slice_req->AddDataItem( std::move(key), std::move(record), ts, is_deleted); } diff --git a/store_handler/eloq_data_store_service/eloqstore b/store_handler/eloq_data_store_service/eloqstore index a94159ab..67c86296 160000 --- a/store_handler/eloq_data_store_service/eloqstore +++ b/store_handler/eloq_data_store_service/eloqstore @@ -1 +1 @@ -Subproject commit a94159ab298a8b13d20dcd07b4c66336267746ec +Subproject commit 67c86296b89a3d677d877053237d187d1c2ce0d7 diff --git a/tx_service/include/cc/cc_req_misc.h b/tx_service/include/cc/cc_req_misc.h index d3fa883c..2208153a 100644 --- a/tx_service/include/cc/cc_req_misc.h +++ b/tx_service/include/cc/cc_req_misc.h @@ -550,6 +550,7 @@ struct FillStoreSliceCc : public CcRequestBase public: // These variables only be used in DataStoreHandler const std::string *kv_table_name_{nullptr}; + TxKey kv_start_key_owner_; std::string_view kv_start_key_; std::string_view kv_end_key_; std::string kv_session_id_; diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 1a809f1c..8a0b5c53 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -5422,7 +5422,11 @@ class TemplateCcMap : public CcMap range_ptr->FindSlice(slice_key); assert(slice->PostCkptSize() != UINT64_MAX); - return slice->PostCkptSize() > StoreSlice::slice_upper_bound; + // Only need to split the slice when the post ckpt size of the slice + // is greater than the current size and greater than the slice upper + // bound. + return (slice->PostCkptSize() > StoreSlice::slice_upper_bound && + slice->PostCkptSize() > slice->Size()); }; const KeyT *const req_start_key = req.start_key_ != nullptr @@ -11723,6 +11727,11 @@ class TemplateCcMap : public CcMap // remove discarded page from the map // note that all iterators are invalid after the erasion ccmp_.erase(discarded_page_it); + // Update page1_it and page2_it since iterator is invalid after erase + page1_it = ccmp_.find(merged_page->FirstKey()); + assert(page1_it != ccmp_.end()); + assert(page1_it->second.get() == merged_page); + page2_it = ccmp_.end(); } CcPage *PageNegInf() diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 73dddef8..d0e7c01d 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -5397,8 +5397,11 @@ void LocalCcShards::UpdateSlices(const TableName &table_name, : false; uint64_t slice_post_ckpt_size = curr_slice->PostCkptSize(); + // If post ckpt size of the slice is less than or equal to the current + // size, there is no need to split the slice. if (slice_post_ckpt_size == UINT64_MAX || - slice_post_ckpt_size <= StoreSlice::slice_upper_bound) + slice_post_ckpt_size <= StoreSlice::slice_upper_bound || + slice_post_ckpt_size <= curr_slice->Size()) { // Case 1: There is no unpersisted data in the current slice, so no // need to split the slice. Only to migrate this data from the old