From f2c88c2f8fbfab3096968f80809a31bb6158ba2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E6=99=93=E9=98=B3?= Date: Mon, 2 Mar 2026 15:28:31 +0800 Subject: [PATCH 1/2] Large object LRU --- core/src/tx_service_init.cpp | 44 ++++++++++++ tx_service/include/cc/cc_entry.h | 32 ++++++++- tx_service/include/cc/cc_shard.h | 36 ++++++---- tx_service/include/cc/local_cc_shards.h | 19 ++++++ tx_service/include/cc/object_cc_map.h | 81 +++++++++++++++++++++- tx_service/include/tx_service.h | 13 +++- tx_service/include/type.h | 15 +++++ tx_service/src/cc/cc_shard.cpp | 90 ++++++++++++++++++++++--- 8 files changed, 302 insertions(+), 28 deletions(-) diff --git a/core/src/tx_service_init.cpp b/core/src/tx_service_init.cpp index ec8bb7bf..c5bff0db 100644 --- a/core/src/tx_service_init.cpp +++ b/core/src/tx_service_init.cpp @@ -21,6 +21,13 @@ DEFINE_int32(collect_active_tx_ts_interval_seconds, "Active transaction timestamp collection interval"); DEFINE_bool(kickout_data_for_test, false, "Kickout data for test"); DEFINE_bool(enable_key_cache, false, "Enable key cache"); +DEFINE_string(cache_evict_policy, + "LRU", + "Cache eviction policy. Optional policy includes LRU|LO_LRU." + "LO_LRU is a variant LRU which is adapt for large objects."); +DEFINE_uint32(lolru_large_obj_threshold_kb, + 1024, + "LO_LRU policy option. Threshold of large object in KB."); DEFINE_uint32(max_standby_lag, 400000, "txservice max msg lag between primary and standby"); @@ -223,6 +230,34 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) #endif } + txservice::CacheEvictPolicy cache_evict_policy; + uint32_t lolru_large_obj_threshold_kb; + std::string cache_evict_policy_str = + !CheckCommandLineFlagIsDefault("cache_evict_policy") + ? FLAGS_cache_evict_policy + : config_reader.GetString( + "local", "cache_evict_policy", FLAGS_cache_evict_policy); + if (cache_evict_policy_str == txservice::CacheEvictPolicyLRU::NAME) + { + cache_evict_policy = txservice::CacheEvictPolicy::LRU; + } + else if (cache_evict_policy_str == txservice::CacheEvictPolicyLoLRU::NAME) + { + cache_evict_policy = txservice::CacheEvictPolicy::LO_LRU; + lolru_large_obj_threshold_kb = + !CheckCommandLineFlagIsDefault("lolru_large_obj_threshold_kb") + ? FLAGS_lolru_large_obj_threshold_kb + : config_reader.GetInteger("local", + "lolru_large_obj_threshold_kb", + FLAGS_lolru_large_obj_threshold_kb); + } + else + { + LOG(ERROR) << "Invalidate `cache_evict_policy` " + << cache_evict_policy_str; + return false; + } + LOG(INFO) << "Data substrate memory limit: " << core_config_.node_memory_limit_mb << "MB"; @@ -288,6 +323,15 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) store_hd_->SetTxService(tx_service_.get()); } + if (cache_evict_policy == txservice::CacheEvictPolicy::LRU) + { + tx_service_->SetupPolicyLRU(); + } + else if (cache_evict_policy == txservice::CacheEvictPolicy::LO_LRU) + { + tx_service_->SetupPolicyLoLRU(lolru_large_obj_threshold_kb); + } + if (tx_service_->Start(network_config_.node_id, network_config_.native_ng_id, &network_config_.ng_configs, diff --git a/tx_service/include/cc/cc_entry.h b/tx_service/include/cc/cc_entry.h index 725a5a43..cad4087a 100644 --- a/tx_service/include/cc/cc_entry.h +++ b/tx_service/include/cc/cc_entry.h @@ -902,7 +902,6 @@ struct NonVersionedPayload ValueT tx_obj; cur_payload_.reset(static_cast( tx_obj.DeserializeObject(data, offset).release())); - ; } std::shared_ptr VersionedCurrentPayload() @@ -2041,6 +2040,9 @@ struct LruPage // without TTL set is UINT64_MAX. This value is used to decide if this page // needs to be scanned when purging deleted entries in memory. uint64_t smallest_ttl_{UINT64_MAX}; + + // Large object occupies one page exclusively. + bool large_obj_page_ : 1 {false}; }; template > entry) + { + // append check + auto insert_it = + keys_.size() > 0 && keys_.back() < key + ? keys_.end() + : std::lower_bound(keys_.begin(), keys_.end(), key); + assert(insert_it == keys_.end() || *insert_it != key); + + size_t insert_pos = insert_it - keys_.begin(); + keys_.emplace(insert_it, key); + entries_.emplace(entries_.begin() + insert_pos, entry); + return insert_pos; + } + /** * Find lower bound of the key. Return lower bound the the key, or return * keys_.size() if all keys is less than key. @@ -2532,6 +2552,16 @@ struct CcPage : public LruPage return Remove(idx); } + std::unique_ptr> + MoveEntry(size_t idx) + { + std::unique_ptr< + CcEntry> + entry = std::move(entries_[idx]); + assert(entry != nullptr); + return entry; + } + size_t Size() const { return keys_.size(); diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ef1ed057..af72d89f 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -536,6 +536,10 @@ class CcShard return catalog_factory_[static_cast(table_engine) - 1]; } + CacheEvictPolicy GetCacheEvictPolicy() const; + + uint64_t LargeObjThresholdBytes() const; + /** * Insert page at the end of the lru list as the most-recently accessed * page. @@ -983,13 +987,6 @@ class CcShard return ng_id == ng_id_; } - // native node group - const uint16_t core_id_; - const uint16_t core_cnt_; - const NodeGroupId ng_id_; - std::atomic meta_data_mux_{}; - LocalCcShards &local_shards_; - bool EnableMvcc() const; void AddActiveSiTx(TxNumber txn, uint64_t start_ts); void RemoveActiveSiTx(TxNumber txn); @@ -1006,11 +1003,6 @@ class CcShard size_t ActiveBlockingTxSize() const; void RemoveExpiredActiveBlockingTxs(); - // shard level memory limit. - uint64_t memory_limit_{0}; - - const bool realtime_sampling_{true}; - // Search lock_holding_txs_, find the entrys with waited transactions and // save them into CheckDeadLockResult. void CollectLockWaitingInfo(CheckDeadLockResult &dlr); @@ -1190,6 +1182,19 @@ class CcShard void CollectCacheHit(); void CollectCacheMiss(); +public: + // native node group + const uint16_t core_id_; + const uint16_t core_cnt_; + const NodeGroupId ng_id_; + std::atomic meta_data_mux_{}; + LocalCcShards &local_shards_; + + // shard level memory limit. + uint64_t memory_limit_{0}; + + const bool realtime_sampling_{true}; + private: void SetTxProcNotifier(std::atomic *tx_proc_status, TxProcCoordinator *tx_coordi) @@ -1310,6 +1315,13 @@ class CcShard // Reserved head and tail for the double-linked list of cc entries, which // simplifies handling of empty and one-element lists. LruPage head_ccp_, tail_ccp_; + + // head --- [small pages] --- protected_head_ --- [large pages] -- tail + // + // Declare as a pointer instead of as a dummy page. Eviction policies(SLRU) + // might share it. + LruPage *protected_head_page_; + /** * @brief Each time a page is accessed and moved to the tail of the LRU * list, the counter is incremented and assigned to the page. Since in a diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index 961bee52..38e94d83 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -1813,6 +1813,18 @@ class LocalCcShards void FlushCurrentFlushBuffer(); + void SetupPolicyLRU() + { + cache_evict_policy_ = CacheEvictPolicy::LRU; + } + + void SetupPolicyLoLRU(uint32_t large_obj_threshold_kb) + { + cache_evict_policy_ = CacheEvictPolicy::LO_LRU; + u_cache_evict_policy_.lo_lru.large_obj_threshold_bytes = + large_obj_threshold_kb * 1024ul; + } + store::DataStoreHandler *const store_hd_; /* @@ -2045,6 +2057,13 @@ class LocalCcShards bool realtime_sampling_; + CacheEvictPolicy cache_evict_policy_{CacheEvictPolicy::LRU}; + union + { + CacheEvictPolicyLRU lru; + CacheEvictPolicyLoLRU lo_lru; + } u_cache_evict_policy_; + struct RangeSplitTask { RangeSplitTask(std::shared_ptr data_sync_task, diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index a4888664..b4ac6f19 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -95,6 +95,7 @@ class ObjectCcMap : public TemplateCcMap using CcMap::shard_; using CcMap::table_name_; using CcMap::table_schema_; + using TemplateCcMap::ccmp_; using TemplateCcMap::Find; using TemplateCcMap::FindEmplace; using TemplateCcMap::End; @@ -103,6 +104,7 @@ class ObjectCcMap : public TemplateCcMap using TemplateCcMap::RecordSchema; using TemplateCcMap::Type; using TemplateCcMap::CleanEntry; + using TemplateCcMap::TryUpdatePageKey; bool Execute(ApplyCc &req) override { @@ -1046,6 +1048,10 @@ class ObjectCcMap : public TemplateCcMap } CommitCommandOnDirtyPayload( dirty_payload, dirty_payload_status, *cmd); + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } // if cmd.ExecuteOn() telling ttl reset is not going to happen else if (obj_result.ttl_reset_ == true) @@ -1201,6 +1207,11 @@ class ObjectCcMap : public TemplateCcMap { CommitCommandOnPayload( cce->payload_.cur_payload_, status, *cmd); + if (shard_->GetCacheEvictPolicy() == + CacheEvictPolicy::LO_LRU) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } // Reset the dirty status. @@ -1418,6 +1429,11 @@ class ObjectCcMap : public TemplateCcMap CommitCommandOnPayload(cce->payload_.cur_payload_, payload_status, *pending_cmd); + if (shard_->GetCacheEvictPolicy() == + CacheEvictPolicy::LO_LRU) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } else { @@ -2551,7 +2567,8 @@ class ObjectCcMap : public TemplateCcMap CcEntry *cce = static_cast *>(entry); - LruPage *ccp = cce->GetCcPage(); + CcPage *ccp = + static_cast *>(cce->GetCcPage()); cce->GetKeyGapLockAndExtraData()->ReleasePin(); cce->RecycleKeyLock(*shard_); @@ -2690,6 +2707,11 @@ class ObjectCcMap : public TemplateCcMap { ccp->smallest_ttl_ = cce->payload_.cur_payload_->GetTTL(); } + + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } else { @@ -2775,6 +2797,14 @@ class ObjectCcMap : public TemplateCcMap // Commit the pending command. CommitCommandOnDirtyPayload( dirty_payload, dirty_payload_status, *pending_cmd); + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + CcPage *ccp = + static_cast *>( + cce->GetCcPage()); + EnsureLargeObjOccupyPageAlone(ccp, cce); + } + cce->SetDirtyPayload(std::move(dirty_payload)); cce->SetDirtyPayloadStatus(dirty_payload_status); cce->SetPendingCmd(nullptr); @@ -2880,5 +2910,54 @@ class ObjectCcMap : public TemplateCcMap ? cce->payload_.cur_payload_->GetObjectType() : -1; } + + void EnsureLargeObjOccupyPageAlone(CcPage *ccp, + CcEntry *cce) + { + assert(cce->PayloadStatus() == RecordStatus::Normal); + if (cce->payload_.cur_payload_.SerializedLength() > + shard_->LargeObjThresholdBytes()) + { + if (ccp->Size() == 1) + { + ccp->large_obj_page_ = true; + } + else + { + assert(ccp->large_obj_page_ == false); + size_t idx_in_ccp = ccp->FindEntry(cce); + const KeyT *key = ccp->Key(idx_in_ccp); + + auto large_obj_ccp_uptr = + std::make_unique>( + this, ccp, ccp->next_page_); + CcPage *large_obj_ccp_ptr = + large_obj_ccp_uptr.get(); + + large_obj_ccp_ptr->prev_page_ = ccp; + large_obj_ccp_ptr->next_page_ = ccp->next_page_; + ccp->next_page_->prev_page_ = large_obj_ccp_ptr; + ccp->next_page_ = large_obj_ccp_ptr; + + large_obj_ccp_ptr->large_obj_ccp = true; + large_obj_ccp_ptr->Emplace(*key, ccp->MoveEntry(idx_in_ccp)); + large_obj_ccp_ptr->smallest_ttl_ = + cce->payload_.cur_payload_->GetTTL(); + large_obj_ccp_ptr->last_dirty_commit_ts_ = cce->CommitTs(); + large_obj_ccp_ptr->last_access_ts_ = 0; + + auto ccp_it = ccmp_.upper_bound(*key); + if (ccp_it == ccmp_.end() || *key < ccp_it->first) + { + --ccp_it; + } + + ccp->Remove(idx_in_ccp); + TryUpdatePageKey(ccp_it); + + ccmp_.try_emplace(*key, std::move(large_obj_ccp_uptr)); + } + } + } }; } // namespace txservice diff --git a/tx_service/include/tx_service.h b/tx_service/include/tx_service.h index 1c1aa1f1..36a6c6be 100644 --- a/tx_service/include/tx_service.h +++ b/tx_service/include/tx_service.h @@ -89,9 +89,6 @@ class TxServiceModule; class TxProcessor { public: - static const int64_t t1sec = 1000000L; - static const int64_t t2sec = 4000000L; - TxProcessor(size_t thd_id, LocalCcShards &shards, TxLog *txlog_hd, @@ -1303,6 +1300,16 @@ class TxService return 0; } + void SetupPolicyLRU() + { + local_cc_shards_.SetupPolicyLRU(); + } + + void SetupPolicyLoLRU(uint32_t large_obj_threshold_kb) + { + local_cc_shards_.SetupPolicyLoLRU(large_obj_threshold_kb); + } + void WaitClusterReady() { Sharder::Instance().WaitClusterReady(); diff --git a/tx_service/include/type.h b/tx_service/include/type.h index 2fe288c5..bae4ebe1 100644 --- a/tx_service/include/type.h +++ b/tx_service/include/type.h @@ -659,6 +659,21 @@ enum struct TxProcessorStatus Standby }; +enum CacheEvictPolicy +{ + LRU = 0, + LO_LRU, +}; +struct CacheEvictPolicyLRU +{ + static constexpr std::string_view NAME = "LRU"; +}; +struct CacheEvictPolicyLoLRU +{ + static constexpr std::string_view NAME = "LO_LRU"; + uint64_t large_obj_threshold_bytes; +}; + enum struct WorkerStatus { Active, diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index 2036d569..95b2134e 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -96,6 +96,7 @@ CcShard::CcShard( next_forward_sequence_id_(1), head_ccp_(nullptr), tail_ccp_(nullptr), + protected_head_page_(&tail_ccp_), clean_start_ccp_(nullptr), size_(0), dirty_memory_check_interval_(dirty_memory_check_interval), @@ -903,6 +904,17 @@ TEntry *CcShard::LocateTx(TxNumber tx_number) return nullptr; } +CacheEvictPolicy CcShard::GetCacheEvictPolicy() const +{ + return local_shards_.cache_evict_policy_; +} + +uint64_t CcShard::LargeObjThresholdBytes() const +{ + assert(local_shards_.cache_evict_policy_ == CacheEvictPolicy::LO_LRU); + return local_shards_.u_cache_evict_policy_.lo_lru.large_obj_threshold_bytes; +} + void CcShard::DetachLru(LruPage *page) { LruPage *prev = page->lru_prev_; @@ -913,6 +925,17 @@ void CcShard::DetachLru(LruPage *page) { clean_start_ccp_ = page->lru_next_; } + + if (GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + // If page is the head of the protected list, advance the protected list + // head to the next page. + if (protected_head_page_ == page) + { + protected_head_page_ = page->lru_next_; + } + } + assert(prev != nullptr && next != nullptr); prev->lru_next_ = next; next->lru_prev_ = prev; @@ -928,12 +951,23 @@ void CcShard::ReplaceLru(LruPage *old_page, LruPage *new_page) old_page->lru_prev_ = nullptr; LruPage *lru_next = old_page->lru_next_; old_page->lru_next_ = nullptr; - // If page is the head to start looking for cc entry to kickout, move - // the clean head to the next page + // If old page is the head to start looking for cc entry to kickout, move + // the clean head to the new page if (clean_start_ccp_ == old_page) { clean_start_ccp_ = new_page; } + + if (GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + // If old page is the head of the protected list, move the protected + // list head to the new page. + if (protected_head_page_ == old_page) + { + protected_head_page_ = new_page; + } + } + lru_prev->lru_next_ = new_page; lru_next->lru_prev_ = new_page; new_page->lru_next_ = lru_next; @@ -958,23 +992,57 @@ void CcShard::UpdateLruList(LruPage *page, bool is_emplace) page->last_access_ts_ = access_counter_; return; } - // Removes the page from the list, if it's already in the list. This is - // used to keep the updated page at the end(tail) of the LRU list. A - // page's prev and post are both not-null when the page is in the - // list. This is because we have a reserved head and tail for the list. + + LruPage *insert_before = nullptr; + if (GetCacheEvictPolicy() == CacheEvictPolicy::LRU) + { + insert_before = &tail_ccp_; + } + else if (GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + // Determine insertion point depending on whether the page holds large + // object. + insert_before = + page->large_obj_page_ ? &tail_ccp_ : protected_head_page_; + } + else + { + assert(false); + } + + if (page->lru_next_ == insert_before && insert_before->lru_prev_ == page) + { + ++access_counter_; + page->last_access_ts_ = access_counter_; + return; + } + + // Remove the page from its current position in the list (if present). if (page->lru_next_ != nullptr) { DetachLru(page); } - LruPage *second_tail = tail_ccp_.lru_prev_; - second_tail->lru_next_ = page; - tail_ccp_.lru_prev_ = page; - page->lru_next_ = &tail_ccp_; - page->lru_prev_ = second_tail; + + LruPage *insert_after = insert_before->lru_prev_; + insert_after->lru_next_ = page; + insert_before->lru_prev_ = page; + page->lru_next_ = insert_before; + page->lru_prev_ = insert_after; ++access_counter_; page->last_access_ts_ = access_counter_; + if (GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + // Maintain protected_head_page_: when a large object page is inserted + // and the protected list was empty (protected_head_page_ == + // &tail_ccp_), the new page becomes the protected head. + if (page->large_obj_page_ && protected_head_page_ == &tail_ccp_) + { + protected_head_page_ = page; + } + } + // If the update is a emplace update, these new loaded data might be // kickable from cc map. Usually if the clean_start_page is at tail we're // not able to load new data into memory, except some special case where we From 0f82e1a1b201758a17ed89a221528e211d4b437d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E6=99=93=E9=98=B3?= Date: Wed, 4 Mar 2026 18:05:06 +0800 Subject: [PATCH 2/2] save --- core/src/tx_service_init.cpp | 5 +- tx_service/CMakeLists.txt | 6 + tx_service/include/cc/cc_entry.h | 38 +- tx_service/include/cc/local_cc_shards.h | 2 +- tx_service/include/cc/object_cc_map.h | 119 ++-- tx_service/include/cc/template_cc_map.h | 124 +++++ tx_service/include/store/int_mem_store.h | 5 + tx_service/src/cc/cc_shard.cpp | 70 ++- tx_service/tests/CMakeLists.txt | 1 + tx_service/tests/CcPage-Test.cpp | 50 +- tx_service/tests/LargeObjLRU-Test.cpp | 506 ++++++++++++++++++ tx_service/tests/StartTsCollector-Test.cpp | 4 + .../tests/include/mock/mock_catalog_factory.h | 27 +- 13 files changed, 842 insertions(+), 115 deletions(-) create mode 100644 tx_service/tests/LargeObjLRU-Test.cpp diff --git a/core/src/tx_service_init.cpp b/core/src/tx_service_init.cpp index c5bff0db..3da74597 100644 --- a/core/src/tx_service_init.cpp +++ b/core/src/tx_service_init.cpp @@ -230,8 +230,9 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) #endif } - txservice::CacheEvictPolicy cache_evict_policy; - uint32_t lolru_large_obj_threshold_kb; + txservice::CacheEvictPolicy cache_evict_policy = + txservice::CacheEvictPolicy::LRU; + uint32_t lolru_large_obj_threshold_kb = 0; std::string cache_evict_policy_str = !CheckCommandLineFlagIsDefault("cache_evict_policy") ? FLAGS_cache_evict_policy diff --git a/tx_service/CMakeLists.txt b/tx_service/CMakeLists.txt index c589992f..10836759 100644 --- a/tx_service/CMakeLists.txt +++ b/tx_service/CMakeLists.txt @@ -329,8 +329,14 @@ if(RUN_TX_SERVICE_TESTS) target_include_directories(StartTsCollector-Test PUBLIC ${TX_SERVICE_SOURCE_DIR}/tests/include ${INCLUDE_DIR}) target_link_libraries(StartTsCollector-Test PUBLIC txservice Catch2::Catch2) + # LargeObjLRU-Test + add_executable(LargeObjLRU-Test ${TX_SERVICE_SOURCE_DIR}/tests/LargeObjLRU-Test.cpp) + target_include_directories(LargeObjLRU-Test PUBLIC ${TX_SERVICE_SOURCE_DIR}/tests/include ${INCLUDE_DIR}) + target_link_libraries(LargeObjLRU-Test PUBLIC txservice Catch2::Catch2) + # Register tests with CTest add_test(NAME CcEntry-Test COMMAND CcEntry-Test) add_test(NAME CcPage-Test COMMAND CcPage-Test) add_test(NAME StartTsCollector-Test COMMAND StartTsCollector-Test) + add_test(NAME LargeObjLRU-Test COMMAND LargeObjLRU-Test) endif() diff --git a/tx_service/include/cc/cc_entry.h b/tx_service/include/cc/cc_entry.h index cad4087a..011c884a 100644 --- a/tx_service/include/cc/cc_entry.h +++ b/tx_service/include/cc/cc_entry.h @@ -2042,7 +2042,7 @@ struct LruPage uint64_t smallest_ttl_{UINT64_MAX}; // Large object occupies one page exclusively. - bool large_obj_page_ : 1 {false}; + bool large_obj_page_{false}; }; template UpdateCcPage(this); + + if (entry->payload_.cur_payload_->HasTTL()) + { + uint64_t ttl = entry->payload_.cur_payload_->GetTTL(); + if (ttl < smallest_ttl_) + { + smallest_ttl_ = ttl; + } + } + if (entry->CommitTs() > last_dirty_commit_ts_) + { + last_dirty_commit_ts_ = entry->CommitTs(); + } size_t insert_pos = insert_it - keys_.begin(); keys_.emplace(insert_it, key); - entries_.emplace(entries_.begin() + insert_pos, entry); + entries_.emplace(entries_.begin() + insert_pos, std::move(entry)); return insert_pos; } @@ -2562,6 +2578,24 @@ struct CcPage : public LruPage return entry; } + void UpdateSmallestTTL() + { + smallest_ttl_ = UINT64_MAX; + for (std::unique_ptr< + CcEntry> + &entry : entries_) + { + if (entry->payload_.cur_payload_->HasTTL()) + { + uint64_t ttl = entry->payload_.cur_payload_->GetTTL(); + if (ttl < smallest_ttl_) + { + smallest_ttl_ = ttl; + } + } + } + } + size_t Size() const { return keys_.size(); diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index 38e94d83..808f2a87 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -1822,7 +1822,7 @@ class LocalCcShards { cache_evict_policy_ = CacheEvictPolicy::LO_LRU; u_cache_evict_policy_.lo_lru.large_obj_threshold_bytes = - large_obj_threshold_kb * 1024ul; + large_obj_threshold_kb * 1024ull; } store::DataStoreHandler *const store_hd_; diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index b4ac6f19..48916e97 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -105,6 +105,8 @@ class ObjectCcMap : public TemplateCcMap using TemplateCcMap::Type; using TemplateCcMap::CleanEntry; using TemplateCcMap::TryUpdatePageKey; + using TemplateCcMap:: + EnsureLargeObjOccupyPageAlone; bool Execute(ApplyCc &req) override { @@ -1048,10 +1050,6 @@ class ObjectCcMap : public TemplateCcMap } CommitCommandOnDirtyPayload( dirty_payload, dirty_payload_status, *cmd); - if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) - { - EnsureLargeObjOccupyPageAlone(ccp, cce); - } } // if cmd.ExecuteOn() telling ttl reset is not going to happen else if (obj_result.ttl_reset_ == true) @@ -1207,11 +1205,6 @@ class ObjectCcMap : public TemplateCcMap { CommitCommandOnPayload( cce->payload_.cur_payload_, status, *cmd); - if (shard_->GetCacheEvictPolicy() == - CacheEvictPolicy::LO_LRU) - { - EnsureLargeObjOccupyPageAlone(ccp, cce); - } } // Reset the dirty status. @@ -1270,6 +1263,11 @@ class ObjectCcMap : public TemplateCcMap ccp->smallest_ttl_ = 0; } } + + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } else { @@ -1429,11 +1427,6 @@ class ObjectCcMap : public TemplateCcMap CommitCommandOnPayload(cce->payload_.cur_payload_, payload_status, *pending_cmd); - if (shard_->GetCacheEvictPolicy() == - CacheEvictPolicy::LO_LRU) - { - EnsureLargeObjOccupyPageAlone(ccp, cce); - } } else { @@ -1523,6 +1516,12 @@ class ObjectCcMap : public TemplateCcMap LockType::WriteLock, true, cce->payload_.cur_payload_.get()); + + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } + if (cce->PayloadStatus() == RecordStatus::Unknown && cce->IsFree()) { // If the finished cmd ignores kv value and the tx aborts, we will @@ -1866,6 +1865,12 @@ class ObjectCcMap : public TemplateCcMap ccp->smallest_ttl_ = 0; } } + + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU && + payload_status == RecordStatus::Normal) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } ReleaseCceLock(lk, cce, txn, req.NodeGroupId(), LockType::WriteLock); @@ -2093,6 +2098,12 @@ class ObjectCcMap : public TemplateCcMap } } + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU && + cce->PayloadStatus() == RecordStatus::Normal) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } + return req.SetFinish(*shard_); } @@ -2538,6 +2549,12 @@ class ObjectCcMap : public TemplateCcMap true, cce->payload_.cur_payload_.get()); } + + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU && + payload_status == RecordStatus::Normal) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } if (next_core != UINT16_MAX) @@ -2707,17 +2724,17 @@ class ObjectCcMap : public TemplateCcMap { ccp->smallest_ttl_ = cce->payload_.cur_payload_->GetTTL(); } - - if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) - { - EnsureLargeObjOccupyPageAlone(ccp, cce); - } } else { assert(cce->PayloadStatus() == RecordStatus::Deleted); ccp->smallest_ttl_ = 0; } + + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } return true; @@ -2797,18 +2814,19 @@ class ObjectCcMap : public TemplateCcMap // Commit the pending command. CommitCommandOnDirtyPayload( dirty_payload, dirty_payload_status, *pending_cmd); - if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) - { - CcPage *ccp = - static_cast *>( - cce->GetCcPage()); - EnsureLargeObjOccupyPageAlone(ccp, cce); - } cce->SetDirtyPayload(std::move(dirty_payload)); cce->SetDirtyPayloadStatus(dirty_payload_status); cce->SetPendingCmd(nullptr); } + + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + CcPage *ccp = + static_cast *>( + cce->GetCcPage()); + EnsureLargeObjOccupyPageAlone(ccp, cce); + } } void CommitCommandOnPayload(std::unique_ptr &payload, @@ -2910,54 +2928,5 @@ class ObjectCcMap : public TemplateCcMap ? cce->payload_.cur_payload_->GetObjectType() : -1; } - - void EnsureLargeObjOccupyPageAlone(CcPage *ccp, - CcEntry *cce) - { - assert(cce->PayloadStatus() == RecordStatus::Normal); - if (cce->payload_.cur_payload_.SerializedLength() > - shard_->LargeObjThresholdBytes()) - { - if (ccp->Size() == 1) - { - ccp->large_obj_page_ = true; - } - else - { - assert(ccp->large_obj_page_ == false); - size_t idx_in_ccp = ccp->FindEntry(cce); - const KeyT *key = ccp->Key(idx_in_ccp); - - auto large_obj_ccp_uptr = - std::make_unique>( - this, ccp, ccp->next_page_); - CcPage *large_obj_ccp_ptr = - large_obj_ccp_uptr.get(); - - large_obj_ccp_ptr->prev_page_ = ccp; - large_obj_ccp_ptr->next_page_ = ccp->next_page_; - ccp->next_page_->prev_page_ = large_obj_ccp_ptr; - ccp->next_page_ = large_obj_ccp_ptr; - - large_obj_ccp_ptr->large_obj_ccp = true; - large_obj_ccp_ptr->Emplace(*key, ccp->MoveEntry(idx_in_ccp)); - large_obj_ccp_ptr->smallest_ttl_ = - cce->payload_.cur_payload_->GetTTL(); - large_obj_ccp_ptr->last_dirty_commit_ts_ = cce->CommitTs(); - large_obj_ccp_ptr->last_access_ts_ = 0; - - auto ccp_it = ccmp_.upper_bound(*key); - if (ccp_it == ccmp_.end() || *key < ccp_it->first) - { - --ccp_it; - } - - ccp->Remove(idx_in_ccp); - TryUpdatePageKey(ccp_it); - - ccmp_.try_emplace(*key, std::move(large_obj_ccp_uptr)); - } - } - } }; } // namespace txservice diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 77fb5be1..9eb7c27d 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -8899,6 +8899,13 @@ class TemplateCcMap : public CcMap ValueT, VersionedRecord, RangePartitioned>::split_threshold_; + if (shard_->GetCacheEvictPolicy() == + CacheEvictPolicy::LO_LRU && + *borrow_from_prev) + { + borrow_from_prev = page->large_obj_page_ == false && + prev->large_obj_page_ == false; + } } return *borrow_from_prev; }; @@ -8914,6 +8921,13 @@ class TemplateCcMap : public CcMap ValueT, VersionedRecord, RangePartitioned>::split_threshold_; + if (shard_->GetCacheEvictPolicy() == + CacheEvictPolicy::LO_LRU && + *borrow_from_next) + { + borrow_from_next = page->large_obj_page_ == false && + next->large_obj_page_ == false; + } } return *borrow_from_next; }; @@ -8929,6 +8943,13 @@ class TemplateCcMap : public CcMap ValueT, VersionedRecord, RangePartitioned>::split_threshold_; + if (shard_->GetCacheEvictPolicy() == + CacheEvictPolicy::LO_LRU && + *merge_with_prev) + { + merge_with_prev = page->large_obj_page_ == false && + prev->large_obj_page_ == false; + } } return *merge_with_prev; }; @@ -8944,6 +8965,13 @@ class TemplateCcMap : public CcMap ValueT, VersionedRecord, RangePartitioned>::split_threshold_; + if (shard_->GetCacheEvictPolicy() == + CacheEvictPolicy::LO_LRU && + *merge_with_next) + { + merge_with_next = page->large_obj_page_ == false && + next->large_obj_page_ == false; + } } return *merge_with_next; }; @@ -10315,6 +10343,37 @@ class TemplateCcMap : public CcMap return End(); } + if (shard_->GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) + { + if (target_page->large_obj_page_) + { + target_it++; + bool emplace_page = false; + if (target_it == ccmp_.end()) + { + emplace_page = true; + } + else + { + target_page = target_it->second.get(); + emplace_page = + target_page->large_obj_page_ || target_page->Full(); + } + if (emplace_page) + { + target_it = ccmp_.try_emplace( + target_it, + key, + std::make_unique>( + this, target_page, target_page->next_page_)); + target_page = target_it->second.get(); + } + } + } + // not found, emplace key into target page, split the page if // it's full if (target_page->Full()) @@ -11645,6 +11704,8 @@ class TemplateCcMap : public CcMap assert(page1.next_page_ == &page2 && &page1 == page2.prev_page_); assert(page1_it->first == page1.FirstKey()); assert(page2_it->first == page2.FirstKey()); + assert(page1.large_obj_page_ == false); + assert(page2.large_obj_page_ == false); if (page1.Size() > page2.Size()) { @@ -11788,6 +11849,9 @@ class TemplateCcMap : public CcMap CcPage *page2 = page2_it->second.get(); + assert(page1->large_obj_page_ == false); + assert(page2->large_obj_page_ == false); + auto merged_page_it = page1_it; auto discarded_page_it = page2_it; CcPage *merged_page = @@ -11904,6 +11968,66 @@ class TemplateCcMap : public CcMap page2_it = ccmp_.end(); } + void EnsureLargeObjOccupyPageAlone( + CcPage *ccp, + CcEntry *cce) + { + if (cce->PayloadStatus() == RecordStatus::Deleted) + { + ccp->large_obj_page_ = false; + } + else + { + assert(cce->PayloadStatus() == RecordStatus::Normal); + if (cce->payload_.cur_payload_->SerializedLength() > + shard_->LargeObjThresholdBytes()) + { + if (ccp->Size() <= 1) + { + ccp->large_obj_page_ = true; + } + else + { + assert(ccp->large_obj_page_ == false); + size_t idx_in_ccp = ccp->FindEntry(cce); + const KeyT *key = ccp->Key(idx_in_ccp); + + auto large_obj_ccp_uptr = + std::make_unique>( + this, ccp, ccp->next_page_); + large_obj_ccp_uptr->large_obj_page_ = true; + large_obj_ccp_uptr->Emplace(*key, + ccp->MoveEntry(idx_in_ccp)); + ccp->Remove(idx_in_ccp); + ccp->UpdateSmallestTTL(); + key = large_obj_ccp_uptr->Key(0); + + auto ccp_it = ccmp_.upper_bound(*key); + if (ccp_it == ccmp_.end() || *key < ccp_it->first) + { + --ccp_it; + } + + TryUpdatePageKey(ccp_it); + + CcPage + *large_obj_ccp_ptr = large_obj_ccp_uptr.get(); + ccmp_.try_emplace(*key, std::move(large_obj_ccp_uptr)); + shard_->UpdateLruList(large_obj_ccp_ptr, false); + } + } + else + { + ccp->large_obj_page_ = false; + } + } + + shard_->UpdateLruList(ccp, false); + } + CcPage *PageNegInf() { return &neg_inf_page_; diff --git a/tx_service/include/store/int_mem_store.h b/tx_service/include/store/int_mem_store.h index 1ac2d819..d9c0b836 100644 --- a/tx_service/include/store/int_mem_store.h +++ b/tx_service/include/store/int_mem_store.h @@ -148,6 +148,7 @@ class IntMemoryStore : public DataStoreHandler } bool DiscoverAllTableNames( + TableEngine table_engine, std::vector &norm_name_vec, const std::function *yield_fptr = nullptr, const std::function *resume_fptr = nullptr) override @@ -158,6 +159,7 @@ class IntMemoryStore : public DataStoreHandler //-- database bool UpsertDatabase( + TableEngine table_engine, std::string_view db, std::string_view definition, const std::function *yield_fptr = nullptr, @@ -167,6 +169,7 @@ class IntMemoryStore : public DataStoreHandler return false; } bool DropDatabase( + TableEngine table_engine, std::string_view db, const std::function *yield_fptr = nullptr, const std::function *resume_fptr = nullptr) override @@ -175,6 +178,7 @@ class IntMemoryStore : public DataStoreHandler return false; } bool FetchDatabase( + TableEngine table_engine, std::string_view db, std::string &definition, bool &found, @@ -185,6 +189,7 @@ class IntMemoryStore : public DataStoreHandler return false; } bool FetchAllDatabase( + TableEngine table_engine, std::vector &dbnames, const std::function *yield_fptr = nullptr, const std::function *resume_fptr = nullptr) override diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index 95b2134e..56995e34 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -932,7 +932,10 @@ void CcShard::DetachLru(LruPage *page) // head to the next page. if (protected_head_page_ == page) { - protected_head_page_ = page->lru_next_; + assert(protected_head_page_ != &tail_ccp_); + protected_head_page_ = protected_head_page_->lru_next_; + assert(protected_head_page_ == &tail_ccp_ || + protected_head_page_->large_obj_page_ == true); } } @@ -960,11 +963,20 @@ void CcShard::ReplaceLru(LruPage *old_page, LruPage *new_page) if (GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) { + assert(old_page->large_obj_page_ == new_page->large_obj_page_); // If old page is the head of the protected list, move the protected // list head to the new page. - if (protected_head_page_ == old_page) + if (protected_head_page_ != &tail_ccp_) { - protected_head_page_ = new_page; + if (protected_head_page_ == old_page) + { + protected_head_page_ = new_page; + } + assert(protected_head_page_->large_obj_page_ == true); + } + else + { + assert(old_page->large_obj_page_ == false); } } @@ -985,32 +997,43 @@ void CcShard::UpdateLruList(LruPage *page, bool is_emplace) assert(page->lru_next_ == nullptr && page->lru_prev_ == nullptr); return; } - // page already at the tail, do nothing - if (page->lru_next_ == &tail_ccp_ && tail_ccp_.lru_prev_ == page) - { - ++access_counter_; - page->last_access_ts_ = access_counter_; - return; - } - LruPage *insert_before = nullptr; + LruPage *tail_cpp_ptr = &tail_ccp_; + LruPage **insert_before = nullptr; if (GetCacheEvictPolicy() == CacheEvictPolicy::LRU) { - insert_before = &tail_ccp_; + insert_before = &tail_cpp_ptr; } else if (GetCacheEvictPolicy() == CacheEvictPolicy::LO_LRU) { // Determine insertion point depending on whether the page holds large // object. - insert_before = - page->large_obj_page_ ? &tail_ccp_ : protected_head_page_; + if (page->large_obj_page_) + { + if (protected_head_page_ == &tail_ccp_) + { + protected_head_page_ = page; + } + insert_before = &tail_cpp_ptr; + } + else + { + if (protected_head_page_ == page) + { + assert(page != &tail_ccp_); + protected_head_page_ = page->lru_next_; + } + insert_before = &protected_head_page_; + } } else { assert(false); } - if (page->lru_next_ == insert_before && insert_before->lru_prev_ == page) + // page already at the tail/protected_head_page_, do nothing + if (page->lru_next_ == *insert_before && + (*insert_before)->lru_prev_ == page) { ++access_counter_; page->last_access_ts_ = access_counter_; @@ -1023,10 +1046,10 @@ void CcShard::UpdateLruList(LruPage *page, bool is_emplace) DetachLru(page); } - LruPage *insert_after = insert_before->lru_prev_; + LruPage *insert_after = (*insert_before)->lru_prev_; insert_after->lru_next_ = page; - insert_before->lru_prev_ = page; - page->lru_next_ = insert_before; + (*insert_before)->lru_prev_ = page; + page->lru_next_ = *insert_before; page->lru_prev_ = insert_after; ++access_counter_; @@ -1037,9 +1060,16 @@ void CcShard::UpdateLruList(LruPage *page, bool is_emplace) // Maintain protected_head_page_: when a large object page is inserted // and the protected list was empty (protected_head_page_ == // &tail_ccp_), the new page becomes the protected head. - if (page->large_obj_page_ && protected_head_page_ == &tail_ccp_) + if (protected_head_page_ == &tail_ccp_) + { + if (page->large_obj_page_) + { + protected_head_page_ = page; + } + } + else { - protected_head_page_ = page; + assert(protected_head_page_->large_obj_page_ == true); } } diff --git a/tx_service/tests/CMakeLists.txt b/tx_service/tests/CMakeLists.txt index 32c5d47a..9cb9d78e 100644 --- a/tx_service/tests/CMakeLists.txt +++ b/tx_service/tests/CMakeLists.txt @@ -31,6 +31,7 @@ set( SOURCES_SINGLE_FILE CcEntry-Test.cpp CcPage-Test.cpp StartTsCollector-Test.cpp + LargeObjLRU-Test.cpp ) set( SOURCES_ALL diff --git a/tx_service/tests/CcPage-Test.cpp b/tx_service/tests/CcPage-Test.cpp index 56c4e189..e917e9f0 100644 --- a/tx_service/tests/CcPage-Test.cpp +++ b/tx_service/tests/CcPage-Test.cpp @@ -25,6 +25,7 @@ #include "cc_entry.h" #include "cc_shard.h" +#include "include/mock/mock_catalog_factory.h" #include "local_cc_shards.h" #include "template_cc_map.h" #include "tx_key.h" // CompositeKey @@ -33,6 +34,7 @@ namespace txservice { +static MockCatalogFactory mock_catalog_factory{}; void PrepareCcMap(TemplateCcMap, CompositeRecord, @@ -74,25 +76,47 @@ TEST_CASE("CcPage clean tests", "[cc-page]") { std::unordered_map> ng_configs{ {0, {NodeConfig(0, "127.0.0.1", 8600)}}}; - std::map tx_cnf{{"node_memory_limit_mb", 1000}, - {"enable_key_cache", 0}, - {"reltime_sampling", 0}, - {"range_split_worker_num", 1}, - {"core_num", 1}, - {"realtime_sampling", 0}, - {"checkpointer_interval", 10}, - {"enable_shard_heap_defragment", 0}, - {"node_log_limit_mb", 1000}}; - LocalCcShards local_cc_shards( - 0, 0, tx_cnf, nullptr, nullptr, &ng_configs, 2, nullptr, nullptr, true); + std::map tx_cnf{ + {"node_memory_limit_mb", 1000}, + {"enable_key_cache", 0}, + {"reltime_sampling", 0}, + {"range_split_worker_num", 1}, + {"range_slice_memory_limit_percent", 20}, + {"core_num", 1}, + {"realtime_sampling", 0}, + {"checkpointer_interval", 10}, + {"checkpointer_delay_seconds", 0}, + {"checkpointer_min_ckpt_request_interval", 5}, + {"enable_shard_heap_defragment", 0}, + {"node_log_limit_mb", 1000}, + {"collect_active_tx_ts_interval_seconds", 2}, + {"rep_group_cnt", 1}, + }; + + CatalogFactory *catalog_factory[5] = { + &mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory, + }; + LocalCcShards local_cc_shards(0, + 0, + tx_cnf, + catalog_factory, + nullptr, + &ng_configs, + 2, + nullptr, + nullptr, + true); CcShard shard(0, 1, 10000, - 10000, false, 0, local_cc_shards, - nullptr, + catalog_factory, nullptr, &ng_configs, 2); diff --git a/tx_service/tests/LargeObjLRU-Test.cpp b/tx_service/tests/LargeObjLRU-Test.cpp new file mode 100644 index 00000000..53f66820 --- /dev/null +++ b/tx_service/tests/LargeObjLRU-Test.cpp @@ -0,0 +1,506 @@ +/** + * Basic migration consistency test for moving an entry between pages. + */ +#include + +#define protected public +#define private public + +#include "cc/cc_entry.h" +#include "eloq_string_key_record.h" +#include "include/mock/mock_catalog_factory.h" +#include "tx_key.h" +#include "tx_record.h" + +namespace txservice +{ + +static MockCatalogFactory mock_catalog_factory{}; + +TEST_CASE("Large object migration basic consistency", "[large-obj-lru]") +{ + using KeyT = CompositeKey; + using ValT = CompositeRecord; + + // Create sentinel pages and two normal pages linked between them so that + // both prev_page_ and next_page_ are non-null (not sentinels). + CcPage neg_inf(nullptr); + CcPage pos_inf(nullptr); + + CcPage old_page(nullptr, &neg_inf, &pos_inf); + CcPage new_page(nullptr, &old_page, &pos_inf); + + // Prepare two keys and emplace them into old_page + KeyT k1(std::make_tuple(std::string("t"), 1)); + KeyT k2(std::make_tuple(std::string("t"), 2)); + + size_t idx1 = old_page.Emplace(k1); + size_t idx2 = old_page.Emplace(k2); + + REQUIRE(old_page.Size() == 2); + + // Populate payloads for entries to simulate normal objects + old_page.Entry(idx1)->payload_.cur_payload_ = std::make_unique(10); + old_page.Entry(idx2)->payload_.cur_payload_ = std::make_unique(20); + + // Move the second entry to new_page using MoveEntry + Emplace(unique_ptr) + size_t move_idx = old_page.Find(k2); + REQUIRE(move_idx < old_page.Size()); + + auto moved_uptr = old_page.MoveEntry(move_idx); + old_page.Remove(move_idx); + new_page.Emplace(k2, std::move(moved_uptr)); + new_page.large_obj_page_ = true; + + // After move, old page size should have decreased and key k2 no longer + // be present in old_page. + REQUIRE(old_page.Size() == 1); + REQUIRE(old_page.Find(k2) == old_page.Size()); + + // New page contains the moved key + const KeyT *nk = new_page.Key(0); + REQUIRE(nk != nullptr); + REQUIRE(*nk == k2); +} + +// Helper to create shard and local shards configured for LO_LRU. +static std::pair, std::unique_ptr> +make_shard() +{ + std::unordered_map> ng_configs{ + {0, {NodeConfig(0, "127.0.0.1", 8600)}}}; + std::map tx_cnf{ + {"node_memory_limit_mb", 1000}, + {"enable_key_cache", 0}, + {"reltime_sampling", 0}, + {"range_split_worker_num", 1}, + {"range_slice_memory_limit_percent", 20}, + {"core_num", 1}, + {"realtime_sampling", 0}, + {"checkpointer_interval", 10}, + {"checkpointer_delay_seconds", 0}, + {"checkpointer_min_ckpt_request_interval", 5}, + {"enable_shard_heap_defragment", 0}, + {"node_log_limit_mb", 1000}, + {"collect_active_tx_ts_interval_seconds", 2}, + {"rep_group_cnt", 1}, + }; + + CatalogFactory *catalog_factory[5] = {&mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory}; + + auto local = std::make_unique(0, + 0, + tx_cnf, + catalog_factory, + nullptr, + &ng_configs, + 2, + nullptr, + nullptr, + true); + // Use LO_LRU for this shard + local->SetupPolicyLoLRU(1); + + auto shard = std::make_unique(0, + 1, + 10000, + false, + 0, + *local, + catalog_factory, + nullptr, + &ng_configs, + 2); + shard->Init(); + std::string raft_path(""); + Sharder::Instance( + 0, &ng_configs, 0, nullptr, nullptr, local.get(), nullptr, &raft_path); + return {std::move(local), std::move(shard)}; +} + +// Configurable shard factory for integration test +static std::pair, std::unique_ptr> +make_shard_config(uint32_t node_memory_limit_mb, + uint32_t large_obj_threshold_kb) +{ + std::unordered_map> ng_configs{ + {0, {NodeConfig(0, "127.0.0.1", 8600)}}}; + std::map tx_cnf{ + {"node_memory_limit_mb", node_memory_limit_mb}, + {"enable_key_cache", 0}, + {"reltime_sampling", 0}, + {"range_split_worker_num", 1}, + {"range_slice_memory_limit_percent", 20}, + {"core_num", 1}, + {"realtime_sampling", 0}, + {"checkpointer_interval", 10}, + {"checkpointer_delay_seconds", 0}, + {"checkpointer_min_ckpt_request_interval", 5}, + {"enable_shard_heap_defragment", 0}, + {"node_log_limit_mb", 1000}, + {"collect_active_tx_ts_interval_seconds", 2}, + {"rep_group_cnt", 1}, + }; + + CatalogFactory *catalog_factory[5] = {&mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory, + &mock_catalog_factory}; + + auto local = std::make_unique(0, + 0, + tx_cnf, + catalog_factory, + nullptr, + &ng_configs, + 2, + nullptr, + nullptr, + true); + // Use LO_LRU for this shard with provided threshold + local->SetupPolicyLoLRU(large_obj_threshold_kb); + + auto shard = std::make_unique(0, + 1, + 10000, + false, + 0, + *local, + catalog_factory, + nullptr, + &ng_configs, + 2); + shard->Init(); + std::string raft_path(""); + Sharder::Instance( + 0, &ng_configs, 0, nullptr, nullptr, local.get(), nullptr, &raft_path); + return {std::move(local), std::move(shard)}; +} + +// Helper to create a TemplateCcMap and four pages (neg_inf, pos_inf, p1, p2). +template +static std::tuple *, + CcPage *, + CcPage *, + CcPage *> +make_ccmap_and_pages( + CcShard &sh, + std::unique_ptr> &ccmap_uptr) +{ + TableName tname( + std::string("tbl"), TableType::Primary, TableEngine::EloqSql); + ccmap_uptr = std::make_unique>( + &sh, 0, tname, 1, nullptr, true); + auto *cc_map = ccmap_uptr.get(); + auto *neg_inf = new CcPage(cc_map); + auto *pos_inf = new CcPage(cc_map); + auto *p1 = new CcPage(cc_map, neg_inf, pos_inf); + auto *p2 = new CcPage(cc_map, p1, pos_inf); + return {neg_inf, pos_inf, p1, p2}; +} + +TEST_CASE("LRU scenario A: small then large", "[large-obj-lru][lru-A]") +{ + auto [local, shard_uptr] = make_shard(); + CcShard &shard = *shard_uptr; + + std::unique_ptr, + CompositeRecord, + true, + true>> + ccmap_uptr; + CcPage, CompositeRecord, true, true> + *neg_inf, *pos_inf, *p1, *p2; + std::tie(neg_inf, pos_inf, p1, p2) = + make_ccmap_and_pages, + CompositeRecord>(shard, ccmap_uptr); + + p2->large_obj_page_ = true; // mark p2 as large + + shard.UpdateLruList(p1, false); + uint64_t t1 = p1->last_access_ts_; + REQUIRE(t1 > 0); + shard.UpdateLruList(p2, false); + uint64_t t2 = p2->last_access_ts_; + REQUIRE(t2 > t1); + REQUIRE(p2->lru_prev_ == p1); + + delete neg_inf; + delete pos_inf; + delete p1; + delete p2; +} + +TEST_CASE("LRU scenario B: large then small", "[large-obj-lru][lru-B]") +{ + auto [local, shard_uptr] = make_shard(); + CcShard &shard = *shard_uptr; + + std::unique_ptr, + CompositeRecord, + true, + true>> + ccmap_uptr; + CcPage, CompositeRecord, true, true> + *neg_inf, *pos_inf, *p1, *p2; + std::tie(neg_inf, pos_inf, p1, p2) = + make_ccmap_and_pages, + CompositeRecord>(shard, ccmap_uptr); + p2->large_obj_page_ = true; // p2 large + + shard.UpdateLruList(p2, false); + uint64_t tL = p2->last_access_ts_; + REQUIRE(tL > 0); + + shard.UpdateLruList(p1, false); + uint64_t tS = p1->last_access_ts_; + REQUIRE(tS > tL); + + // small page should be inserted before protected head (which is p2) + REQUIRE(p1->lru_next_ == p2); + + delete neg_inf; + delete pos_inf; + delete p1; + delete p2; +} + +TEST_CASE("LRU scenario C: two larges then small", "[large-obj-lru][lru-C]") +{ + auto [local, shard_uptr] = make_shard(); + CcShard &shard = *shard_uptr; + + std::unique_ptr, + CompositeRecord, + true, + true>> + ccmap_uptr; + CcPage, CompositeRecord, true, true> + *neg_inf, *pos_inf, *l1, *l2; + std::tie(neg_inf, pos_inf, l1, l2) = + make_ccmap_and_pages, + CompositeRecord>(shard, ccmap_uptr); + l1->large_obj_page_ = true; + l2->large_obj_page_ = true; + + shard.UpdateLruList(l1, false); + shard.UpdateLruList(l2, false); + + // now insert small page s + CcPage, CompositeRecord, true, true> + *s = new CcPage, + CompositeRecord, + true, + true>(ccmap_uptr.get(), neg_inf, pos_inf); + s->large_obj_page_ = false; + shard.UpdateLruList(s, false); + + // expected order: s -> l1 -> l2 (s before protected_head which is l1) + REQUIRE(s->lru_next_ == l1); + REQUIRE(l1->lru_next_ == l2); + + delete neg_inf; + delete pos_inf; + delete s; + delete l1; + delete l2; +} + +TEST_CASE("Test A: EnsureLargeObjOccupyPageAlone basic migration", + "[large-obj-lru][ensure-migrate]") +{ + auto [local, shard_uptr] = make_shard_config(64, 1); // 1KB threshold + CcShard &shard = *shard_uptr; + + using KeyT = EloqStringKey; + using ValT = EloqStringRecord; + + std::unique_ptr> ccmap_uptr; + TableName tname( + std::string("tbl_mig"), TableType::Primary, TableEngine::EloqSql); + ccmap_uptr = std::make_unique>( + &shard, 0, tname, 1, nullptr, false); + auto *cc_map = ccmap_uptr.get(); + + // Create first key via FindEmplace to ensure page is present in ccmp_ + EloqStringKey k1("ka", 2); + auto it1 = cc_map->FindEmplace(k1); + auto *cce1 = it1->second; + auto *page = it1.GetPage(); + cce1->payload_.cur_payload_ = std::make_unique(); + cce1->SetCommitTsPayloadStatus(1, RecordStatus::Normal); + cce1->SetCkptTs(100); + + // Emplace a second key via cc_map->FindEmplace to ensure map bookkeeping + EloqStringKey k2("kb", 2); + auto it_k2 = cc_map->FindEmplace(k2); + auto *cce2 = it_k2->second; + auto *page2 = it_k2.GetPage(); + REQUIRE(page2 == page); + REQUIRE(page->Size() > 1); + REQUIRE(cce2 != nullptr); + auto rec = std::make_unique(); + std::string big(shard.LargeObjThresholdBytes() + 512, 'X'); + rec->SetEncodedBlob(reinterpret_cast(big.data()), + big.size()); + cce2->payload_.cur_payload_ = std::move(rec); + cce2->SetCommitTsPayloadStatus(1, RecordStatus::Normal); + cce1->SetCkptTs(100); + + // Call EnsureLargeObjOccupyPageAlone to migrate the large entry + ccmap_uptr->EnsureLargeObjOccupyPageAlone(page, cce2); + // original page should have only one entry after migration + REQUIRE(page->Size() == 1); + REQUIRE(page->Entry(0)->PayloadStatus() == RecordStatus::Normal); + + // After migration, FindEmplace on k2 should return a page marked large + auto it2 = cc_map->FindEmplace(k2); + auto *new_page = it2.GetPage(); + REQUIRE(new_page->Size() == 1); + REQUIRE(new_page->Entry(0)->PayloadStatus() == RecordStatus::Normal); + REQUIRE(new_page->large_obj_page_ == true); + + // Ensure k2 is no longer present in original page + size_t found = page->Find(k2); + REQUIRE(found == page->Size()); + + // New page should contain the key + const KeyT *nk = new_page->Key(0); + REQUIRE(nk != nullptr); + REQUIRE(*nk == k2); +} + +// Test C: tail short-circuit sets protected head +TEST_CASE("LRU tail short-circuit sets protected head", + "[large-obj-lru][tail-short]") +{ + auto [local, shard_uptr] = make_shard(); + CcShard &shard = *shard_uptr; + + std::unique_ptr, + CompositeRecord, + true, + true>> + ccmap_uptr; + CcPage, CompositeRecord, true, true> + *neg_inf, *pos_inf, *p1, *p2; + std::tie(neg_inf, pos_inf, p1, p2) = + make_ccmap_and_pages, + CompositeRecord>(shard, ccmap_uptr); + + // Mark tail page as large and update LRU. UpdateLruList should + // short-circuit and set protected_head_page_ to this page. + p2->large_obj_page_ = true; + shard.UpdateLruList(p2, false); + + REQUIRE(shard.protected_head_page_ == p2); + + delete neg_inf; + delete pos_inf; + delete p1; + delete p2; +} + +TEST_CASE("Test D: LRU partition invariant", "[large-obj-lru][partition-inv]") +{ + auto [local, shard_uptr] = make_shard(); + CcShard &shard = *shard_uptr; + + std::unique_ptr, + CompositeRecord, + true, + true>> + ccmap_uptr; + CcPage, CompositeRecord, true, true> + *neg_inf, *pos_inf, *p1, *p2; + std::tie(neg_inf, pos_inf, p1, p2) = + make_ccmap_and_pages, + CompositeRecord>(shard, ccmap_uptr); + + // Create additional pages: two smalls and two larges + auto *s1 = new CcPage, + CompositeRecord, + true, + true>(ccmap_uptr.get(), neg_inf, pos_inf); + auto *s2 = new CcPage, + CompositeRecord, + true, + true>(ccmap_uptr.get(), neg_inf, pos_inf); + auto *l1 = new CcPage, + CompositeRecord, + true, + true>(ccmap_uptr.get(), neg_inf, pos_inf); + auto *l2 = new CcPage, + CompositeRecord, + true, + true>(ccmap_uptr.get(), neg_inf, pos_inf); + auto VerifyLruList = [&shard] + { + // protected_head_page_ must be either tail or a large page + REQUIRE(((shard.protected_head_page_ == &shard.tail_ccp_) || + (shard.protected_head_page_->large_obj_page_))); + + // Verify partition invariant: pages before protected head are small, + // pages from protected head to tail are large. + bool reached_prot = false; + for (LruPage *cur = shard.head_ccp_.lru_next_; cur != &shard.tail_ccp_; + cur = cur->lru_next_) + { + if (cur == shard.protected_head_page_) + { + reached_prot = true; + } + if (!reached_prot) + { + REQUIRE(cur->large_obj_page_ == false); + } + else + { + REQUIRE(cur->large_obj_page_ == true); + } + } + }; + + s1->large_obj_page_ = false; + s2->large_obj_page_ = false; + l1->large_obj_page_ = true; + l2->large_obj_page_ = true; + + // Build LRU order: s1 -> s2 -> l1 -> l2 + shard.UpdateLruList(s1, false); + shard.UpdateLruList(s2, false); + shard.UpdateLruList(l1, false); + shard.UpdateLruList(l2, false); + VerifyLruList(); + + // Shuffle LRU order + shard.UpdateLruList(l2, false); + shard.UpdateLruList(l1, false); + shard.UpdateLruList(s2, false); + shard.UpdateLruList(s1, false); + + VerifyLruList(); + + delete neg_inf; + delete pos_inf; + delete s1; + delete s2; + delete l1; + delete l2; + delete p1; + delete p2; +} + +} // namespace txservice + +int main(int argc, char **argv) +{ + gflags::ParseCommandLineFlags(&argc, &argv, true); + int ret = Catch::Session().run(argc, argv); + return ret; +} diff --git a/tx_service/tests/StartTsCollector-Test.cpp b/tx_service/tests/StartTsCollector-Test.cpp index c927a03e..f2a8a5f8 100644 --- a/tx_service/tests/StartTsCollector-Test.cpp +++ b/tx_service/tests/StartTsCollector-Test.cpp @@ -66,6 +66,8 @@ TEST_CASE("TxStartTsCollector GlobalMinSiTxStartTs unit test", std::pair("core_num", core_num)); tx_service_conf.insert( std::pair("range_split_worker_num", 0)); + tx_service_conf.insert(std::pair( + "range_slice_memory_limit_percent", 20)); tx_service_conf.insert( std::pair("node_memory_limit_mb", 1000)); tx_service_conf.insert( @@ -76,6 +78,8 @@ TEST_CASE("TxStartTsCollector GlobalMinSiTxStartTs unit test", std::pair("checkpointer_interval", 10)); tx_service_conf.insert( std::pair("checkpointer_delay_seconds", 0)); + tx_service_conf.insert(std::pair( + "checkpointer_min_ckpt_request_interval", 5)); tx_service_conf.insert( std::pair("enable_shard_heap_defragment", 0)); tx_service_conf.insert( diff --git a/tx_service/tests/include/mock/mock_catalog_factory.h b/tx_service/tests/include/mock/mock_catalog_factory.h index 923b43cf..c35e28b7 100644 --- a/tx_service/tests/include/mock/mock_catalog_factory.h +++ b/tx_service/tests/include/mock/mock_catalog_factory.h @@ -293,17 +293,40 @@ class MockCatalogFactory : public CatalogFactory return nullptr; } - TxKey NegativeInfKey() override + TxKey NegativeInfKey() const override { assert(false); return TxKey(); } - TxKey PositiveInfKey() override + TxKey PositiveInfKey() const override { assert(false); return TxKey(); } + + TxKey CreateTxKey() const override + { + assert(false); + return TxKey(); + } + + TxKey CreateTxKey(const char *data, size_t size) const override + { + assert(false); + return TxKey(); + } + + const TxKey *PackedNegativeInfinity() const override + { + assert(false); + return nullptr; + } + + std::unique_ptr CreateTxRecord() const override + { + return nullptr; + } }; class MockSystemHandler : public txservice::SystemHandler