Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions core/src/tx_service_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ DEFINE_int32(collect_active_tx_ts_interval_seconds,
"Active transaction timestamp collection interval");
DEFINE_bool(kickout_data_for_test, false, "Kickout data for test");
DEFINE_bool(enable_key_cache, false, "Enable key cache");
DEFINE_string(cache_evict_policy,
"LRU",
"Cache eviction policy. Optional policy includes LRU|LO_LRU."
"LO_LRU is a variant LRU which is adapt for large objects.");
DEFINE_uint32(lolru_large_obj_threshold_kb,
1024,
"LO_LRU policy option. Threshold of large object in KB.");
DEFINE_uint32(max_standby_lag,
400000,
"txservice max msg lag between primary and standby");
Expand Down Expand Up @@ -223,6 +230,35 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
#endif
}

txservice::CacheEvictPolicy cache_evict_policy =
txservice::CacheEvictPolicy::LRU;
uint32_t lolru_large_obj_threshold_kb = 0;
std::string cache_evict_policy_str =
!CheckCommandLineFlagIsDefault("cache_evict_policy")
? FLAGS_cache_evict_policy
: config_reader.GetString(
"local", "cache_evict_policy", FLAGS_cache_evict_policy);
if (cache_evict_policy_str == txservice::CacheEvictPolicyLRU::NAME)
{
cache_evict_policy = txservice::CacheEvictPolicy::LRU;
}
else if (cache_evict_policy_str == txservice::CacheEvictPolicyLoLRU::NAME)
{
cache_evict_policy = txservice::CacheEvictPolicy::LO_LRU;
lolru_large_obj_threshold_kb =
!CheckCommandLineFlagIsDefault("lolru_large_obj_threshold_kb")
? FLAGS_lolru_large_obj_threshold_kb
: config_reader.GetInteger("local",
"lolru_large_obj_threshold_kb",
FLAGS_lolru_large_obj_threshold_kb);
}
else
{
LOG(ERROR) << "Invalid `cache_evict_policy` "
<< cache_evict_policy_str;
return false;
}

LOG(INFO) << "Data substrate memory limit: "
<< core_config_.node_memory_limit_mb << "MB";

Expand Down Expand Up @@ -288,6 +324,15 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
store_hd_->SetTxService(tx_service_.get());
}

if (cache_evict_policy == txservice::CacheEvictPolicy::LRU)
{
tx_service_->SetupPolicyLRU();
}
else if (cache_evict_policy == txservice::CacheEvictPolicy::LO_LRU)
{
tx_service_->SetupPolicyLoLRU(lolru_large_obj_threshold_kb);
}

if (tx_service_->Start(network_config_.node_id,
network_config_.native_ng_id,
&network_config_.ng_configs,
Expand Down
6 changes: 6 additions & 0 deletions tx_service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,14 @@ if(RUN_TX_SERVICE_TESTS)
target_include_directories(StartTsCollector-Test PUBLIC ${TX_SERVICE_SOURCE_DIR}/tests/include ${INCLUDE_DIR})
target_link_libraries(StartTsCollector-Test PUBLIC txservice Catch2::Catch2)

# LargeObjLRU-Test
add_executable(LargeObjLRU-Test ${TX_SERVICE_SOURCE_DIR}/tests/LargeObjLRU-Test.cpp)
target_include_directories(LargeObjLRU-Test PUBLIC ${TX_SERVICE_SOURCE_DIR}/tests/include ${INCLUDE_DIR})
target_link_libraries(LargeObjLRU-Test PUBLIC txservice Catch2::Catch2)

# Register tests with CTest
add_test(NAME CcEntry-Test COMMAND CcEntry-Test)
add_test(NAME CcPage-Test COMMAND CcPage-Test)
add_test(NAME StartTsCollector-Test COMMAND StartTsCollector-Test)
add_test(NAME LargeObjLRU-Test COMMAND LargeObjLRU-Test)
endif()
66 changes: 65 additions & 1 deletion tx_service/include/cc/cc_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,6 @@ struct NonVersionedPayload
ValueT tx_obj;
cur_payload_.reset(static_cast<ValueT *>(
tx_obj.DeserializeObject(data, offset).release()));
;
}

std::shared_ptr<ValueT> VersionedCurrentPayload()
Expand Down Expand Up @@ -2041,6 +2040,9 @@ struct LruPage
// without TTL set is UINT64_MAX. This value is used to decide if this page
// needs to be scanned when purging deleted entries in memory.
uint64_t smallest_ttl_{UINT64_MAX};

// Large object occupies one page exclusively.
bool large_obj_page_{false};
};

template <typename KeyT,
Expand Down Expand Up @@ -2283,6 +2285,40 @@ struct CcPage : public LruPage
return insert_pos;
}

size_t Emplace(
const KeyT &key,
std::unique_ptr<
CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned>> entry)
{
// append check
auto insert_it =
keys_.size() > 0 && keys_.back() < key
? keys_.end()
: std::lower_bound(keys_.begin(), keys_.end(), key);
assert(insert_it == keys_.end() || *insert_it != key);
assert(Size() < split_threshold_);

entry->UpdateCcPage(this);

if (entry->payload_.cur_payload_->HasTTL())
{
uint64_t ttl = entry->payload_.cur_payload_->GetTTL();
if (ttl < smallest_ttl_)
{
smallest_ttl_ = ttl;
}
}
if (entry->CommitTs() > last_dirty_commit_ts_)
{
last_dirty_commit_ts_ = entry->CommitTs();
}

size_t insert_pos = insert_it - keys_.begin();
keys_.emplace(insert_it, key);
entries_.emplace(entries_.begin() + insert_pos, std::move(entry));
return insert_pos;
}

/**
* Find lower bound of the key. Return lower bound the the key, or return
* keys_.size() if all keys is less than key.
Expand Down Expand Up @@ -2532,6 +2568,34 @@ struct CcPage : public LruPage
return Remove(idx);
}

std::unique_ptr<CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned>>
MoveEntry(size_t idx)
{
std::unique_ptr<
CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned>>
entry = std::move(entries_[idx]);
assert(entry != nullptr);
return entry;
}

void UpdateSmallestTTL()
{
smallest_ttl_ = UINT64_MAX;
for (std::unique_ptr<
CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned>>
&entry : entries_)
{
if (entry->payload_.cur_payload_->HasTTL())
{
uint64_t ttl = entry->payload_.cur_payload_->GetTTL();
if (ttl < smallest_ttl_)
{
smallest_ttl_ = ttl;
}
}
}
}

size_t Size() const
{
return keys_.size();
Expand Down
36 changes: 24 additions & 12 deletions tx_service/include/cc/cc_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ class CcShard
return catalog_factory_[static_cast<int>(table_engine) - 1];
}

CacheEvictPolicy GetCacheEvictPolicy() const;

uint64_t LargeObjThresholdBytes() const;

/**
* Insert page at the end of the lru list as the most-recently accessed
* page.
Expand Down Expand Up @@ -965,13 +969,6 @@ class CcShard
return ng_id == ng_id_;
}

// native node group
const uint16_t core_id_;
const uint16_t core_cnt_;
const NodeGroupId ng_id_;
std::atomic<int32_t> meta_data_mux_{};
LocalCcShards &local_shards_;

bool EnableMvcc() const;
void AddActiveSiTx(TxNumber txn, uint64_t start_ts);
void RemoveActiveSiTx(TxNumber txn);
Expand All @@ -988,11 +985,6 @@ class CcShard
size_t ActiveBlockingTxSize() const;
void RemoveExpiredActiveBlockingTxs();

// shard level memory limit.
uint64_t memory_limit_{0};

const bool realtime_sampling_{true};

// Search lock_holding_txs_, find the entrys with waited transactions and
// save them into CheckDeadLockResult.
void CollectLockWaitingInfo(CheckDeadLockResult &dlr);
Expand Down Expand Up @@ -1160,6 +1152,19 @@ class CcShard
void CollectCacheHit();
void CollectCacheMiss();

public:
// native node group
const uint16_t core_id_;
const uint16_t core_cnt_;
const NodeGroupId ng_id_;
std::atomic<int32_t> meta_data_mux_{};
LocalCcShards &local_shards_;

// shard level memory limit.
uint64_t memory_limit_{0};

const bool realtime_sampling_{true};

private:
void SetTxProcNotifier(std::atomic<TxProcessorStatus> *tx_proc_status,
TxProcCoordinator *tx_coordi)
Expand Down Expand Up @@ -1280,6 +1285,13 @@ class CcShard
// Reserved head and tail for the double-linked list of cc entries, which
// simplifies handling of empty and one-element lists.
LruPage head_ccp_, tail_ccp_;

// head --- [small pages] --- protected_head_ --- [large pages] -- tail
//
// Declare as a pointer instead of as a dummy page. Eviction policies(SLRU)
// might share it.
LruPage *protected_head_page_;

/**
* @brief Each time a page is accessed and moved to the tail of the LRU
* list, the counter is incremented and assigned to the page. Since in a
Expand Down
19 changes: 19 additions & 0 deletions tx_service/include/cc/local_cc_shards.h
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,18 @@ class LocalCcShards

void FlushCurrentFlushBuffer();

void SetupPolicyLRU()
{
cache_evict_policy_ = CacheEvictPolicy::LRU;
}

void SetupPolicyLoLRU(uint32_t large_obj_threshold_kb)
{
cache_evict_policy_ = CacheEvictPolicy::LO_LRU;
u_cache_evict_policy_.lo_lru.large_obj_threshold_bytes =
large_obj_threshold_kb * 1024ull;
}

store::DataStoreHandler *const store_hd_;

/*
Expand Down Expand Up @@ -2045,6 +2057,13 @@ class LocalCcShards

bool realtime_sampling_;

CacheEvictPolicy cache_evict_policy_{CacheEvictPolicy::LRU};
union
{
CacheEvictPolicyLRU lru;
CacheEvictPolicyLoLRU lo_lru;
} u_cache_evict_policy_;

struct RangeSplitTask
{
RangeSplitTask(std::shared_ptr<DataSyncTask> data_sync_task,
Expand Down
Loading