Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions core/src/tx_service_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ DEFINE_int32(collect_active_tx_ts_interval_seconds,
"Active transaction timestamp collection interval");
DEFINE_bool(kickout_data_for_test, false, "Kickout data for test");
DEFINE_bool(enable_key_cache, false, "Enable key cache");
// Cache eviction policy selection. Per cc_entry.h, under LO_LRU a large
// object occupies one page exclusively.
// Fix: the two adjacent help-string literals concatenated without a space
// ("LRU|LO_LRU.LO_LRU is"), and "is adapt for" was ungrammatical.
DEFINE_string(cache_evict_policy,
              "LRU",
              "Cache eviction policy. Optional policy includes LRU|LO_LRU. "
              "LO_LRU is a variant of LRU adapted for large objects.");
// Only consulted when cache_evict_policy == LO_LRU.
DEFINE_uint32(lolru_large_obj_threshold_kb,
              1024,
              "LO_LRU policy option. Threshold of large object in KB.");
DEFINE_uint32(max_standby_lag,
400000,
"txservice max msg lag between primary and standby");
Expand Down Expand Up @@ -223,6 +230,35 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
#endif
}

// Resolve the cache eviction policy: an explicitly-set command-line flag
// wins; otherwise fall back to the [local] section of the config file,
// with the flag's default as the final fallback.
txservice::CacheEvictPolicy cache_evict_policy =
    txservice::CacheEvictPolicy::LRU;
uint32_t lolru_large_obj_threshold_kb = 0;
std::string cache_evict_policy_str =
    !CheckCommandLineFlagIsDefault("cache_evict_policy")
        ? FLAGS_cache_evict_policy
        : config_reader.GetString(
              "local", "cache_evict_policy", FLAGS_cache_evict_policy);
if (cache_evict_policy_str == txservice::CacheEvictPolicyLRU::NAME)
{
    cache_evict_policy = txservice::CacheEvictPolicy::LRU;
}
else if (cache_evict_policy_str == txservice::CacheEvictPolicyLoLRU::NAME)
{
    cache_evict_policy = txservice::CacheEvictPolicy::LO_LRU;
    // The large-object threshold only matters for LO_LRU; resolve it with
    // the same flag-then-config precedence as the policy name itself.
    lolru_large_obj_threshold_kb =
        !CheckCommandLineFlagIsDefault("lolru_large_obj_threshold_kb")
            ? FLAGS_lolru_large_obj_threshold_kb
            : config_reader.GetInteger("local",
                                       "lolru_large_obj_threshold_kb",
                                       FLAGS_lolru_large_obj_threshold_kb);
}
else
{
    // Unknown policy name: reject startup. Fix: the message previously
    // read "Invalidate"; it reports an invalid policy value.
    LOG(ERROR) << "Invalid `cache_evict_policy` " << cache_evict_policy_str;
    return false;
}

LOG(INFO) << "Data substrate memory limit: "
<< core_config_.node_memory_limit_mb << "MB";

Expand Down Expand Up @@ -288,6 +324,15 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
store_hd_->SetTxService(tx_service_.get());
}

// Apply the eviction policy chosen during configuration parsing to the tx
// service; this happens before tx_service_->Start() is invoked below.
if (cache_evict_policy == txservice::CacheEvictPolicy::LRU)
{
    tx_service_->SetupPolicyLRU();
}
else if (cache_evict_policy == txservice::CacheEvictPolicy::LO_LRU)
{
    // Threshold is passed in KB; the setter converts it to bytes.
    tx_service_->SetupPolicyLoLRU(lolru_large_obj_threshold_kb);
}

if (tx_service_->Start(network_config_.node_id,
network_config_.native_ng_id,
&network_config_.ng_configs,
Expand Down
6 changes: 6 additions & 0 deletions tx_service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,14 @@ if(RUN_TX_SERVICE_TESTS)
target_include_directories(StartTsCollector-Test PUBLIC ${TX_SERVICE_SOURCE_DIR}/tests/include ${INCLUDE_DIR})
target_link_libraries(StartTsCollector-Test PUBLIC txservice Catch2::Catch2)

# LargeObjLRU-Test: Catch2 test binary (presumably covering the LO_LRU
# large-object eviction policy — name-based; confirm against the test file).
add_executable(LargeObjLRU-Test ${TX_SERVICE_SOURCE_DIR}/tests/LargeObjLRU-Test.cpp)
target_include_directories(LargeObjLRU-Test PUBLIC ${TX_SERVICE_SOURCE_DIR}/tests/include ${INCLUDE_DIR})
target_link_libraries(LargeObjLRU-Test PUBLIC txservice Catch2::Catch2)

# Register tests with CTest so they run under `ctest`.
add_test(NAME CcEntry-Test COMMAND CcEntry-Test)
add_test(NAME CcPage-Test COMMAND CcPage-Test)
add_test(NAME StartTsCollector-Test COMMAND StartTsCollector-Test)
add_test(NAME LargeObjLRU-Test COMMAND LargeObjLRU-Test)
endif()
66 changes: 65 additions & 1 deletion tx_service/include/cc/cc_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,6 @@ struct NonVersionedPayload
ValueT tx_obj;
cur_payload_.reset(static_cast<ValueT *>(
tx_obj.DeserializeObject(data, offset).release()));
;
}

std::shared_ptr<ValueT> VersionedCurrentPayload()
Expand Down Expand Up @@ -2041,6 +2040,9 @@ struct LruPage
// without TTL set is UINT64_MAX. This value is used to decide if this page
// needs to be scanned when purging deleted entries in memory.
uint64_t smallest_ttl_{UINT64_MAX};

// Large object occupies one page exclusively.
bool large_obj_page_{false};
};

template <typename KeyT,
Expand Down Expand Up @@ -2283,6 +2285,40 @@ struct CcPage : public LruPage
return insert_pos;
}

size_t Emplace(
const KeyT &key,
std::unique_ptr<
CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned>> entry)
{
// append check
auto insert_it =
keys_.size() > 0 && keys_.back() < key
? keys_.end()
: std::lower_bound(keys_.begin(), keys_.end(), key);
assert(insert_it == keys_.end() || *insert_it != key);
assert(Size() < split_threshold_);

entry->UpdateCcPage(this);

if (entry->payload_.cur_payload_->HasTTL())
{
uint64_t ttl = entry->payload_.cur_payload_->GetTTL();
if (ttl < smallest_ttl_)
{
smallest_ttl_ = ttl;
}
}
Comment on lines +2303 to +2310
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle tombstones and null payloads when updating smallest_ttl_.

Line 2303 and Line 2588 dereference cur_payload_ unconditionally, but deleted entries are represented with a null payload. The full recompute path also never applies the documented deleted-entry sentinel (smallest_ttl_ = 0), so after re-homing a page with tombstones can either crash here or be skipped by purge logic.

Proposed fix
-        if (entry->payload_.cur_payload_->HasTTL())
+        if (entry->PayloadStatus() == RecordStatus::Deleted)
+        {
+            smallest_ttl_ = 0;
+        }
+        else if (smallest_ttl_ != 0 &&
+                 entry->PayloadStatus() == RecordStatus::Normal &&
+                 entry->payload_.cur_payload_ != nullptr &&
+                 entry->payload_.cur_payload_->HasTTL())
         {
-            uint64_t ttl = entry->payload_.cur_payload_->GetTTL();
-            if (ttl < smallest_ttl_)
-            {
-                smallest_ttl_ = ttl;
-            }
+            smallest_ttl_ =
+                std::min(smallest_ttl_,
+                         entry->payload_.cur_payload_->GetTTL());
         }
@@
     void UpdateSmallestTTL()
     {
         smallest_ttl_ = UINT64_MAX;
         for (std::unique_ptr<
                  CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned>>
                  &entry : entries_)
         {
-            if (entry->payload_.cur_payload_->HasTTL())
+            if (smallest_ttl_ == 0)
+            {
+                break;
+            }
+            if (entry->PayloadStatus() == RecordStatus::Deleted)
+            {
+                smallest_ttl_ = 0;
+            }
+            else if (entry->PayloadStatus() == RecordStatus::Normal &&
+                     entry->payload_.cur_payload_ != nullptr &&
+                     entry->payload_.cur_payload_->HasTTL())
             {
-                uint64_t ttl = entry->payload_.cur_payload_->GetTTL();
-                if (ttl < smallest_ttl_)
-                {
-                    smallest_ttl_ = ttl;
-                }
+                smallest_ttl_ =
+                    std::min(smallest_ttl_,
+                             entry->payload_.cur_payload_->GetTTL());
             }
         }
     }

Also applies to: 2581-2597

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_entry.h` around lines 2303 - 2310, The code
dereferences entry->payload_.cur_payload_ without checking for null or
tombstone; update the logic that updates smallest_ttl_ to first check if
entry->payload_.cur_payload_ is non-null and not a tombstone before calling
HasTTL/GetTTL, and when walking the full-recompute path ensure the deleted-entry
sentinel is applied (set smallest_ttl_ = 0) if any entry has a null/tombstone
payload; apply the same null/tombstone guard to the other occurrence around
lines referencing entry->payload_.cur_payload_ (the block at 2581-2597) so you
never dereference a null payload and you correctly set smallest_ttl_ = 0 for
deleted entries.

if (entry->CommitTs() > last_dirty_commit_ts_)
{
last_dirty_commit_ts_ = entry->CommitTs();
}

size_t insert_pos = insert_it - keys_.begin();
keys_.emplace(insert_it, key);
entries_.emplace(entries_.begin() + insert_pos, std::move(entry));
return insert_pos;
}

/**
 * Find the lower bound of the key. Return the lower bound of the key, or
 * return keys_.size() if all keys are less than key.
Expand Down Expand Up @@ -2532,6 +2568,34 @@ struct CcPage : public LruPage
return Remove(idx);
}

/**
 * @brief Transfer ownership of the entry at position @p idx out of the page.
 *
 * The vector slot in entries_ is left holding nullptr after the move; the
 * caller is expected to take care of the vacated slot and matching key.
 */
std::unique_ptr<CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned>>
MoveEntry(size_t idx)
{
    auto moved_entry = std::move(entries_[idx]);
    assert(moved_entry != nullptr);
    return moved_entry;
}

/**
 * @brief Recompute smallest_ttl_ across all entries on this page.
 *
 * smallest_ttl_ decides whether this page must be scanned when purging
 * deleted entries in memory, so it must be a lower bound of every entry's
 * TTL. Deleted entries (tombstones) carry a null cur_payload_; the original
 * code dereferenced cur_payload_ unconditionally (crash on tombstones) and
 * never applied the deleted-entry sentinel (smallest_ttl_ = 0), which could
 * make the purge pass skip this page.
 */
void UpdateSmallestTTL()
{
    smallest_ttl_ = UINT64_MAX;
    for (std::unique_ptr<
             CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned>>
             &entry : entries_)
    {
        const auto *payload = entry->payload_.cur_payload_.get();
        if (payload == nullptr)
        {
            // Tombstone: apply the deleted-entry sentinel. Zero is the
            // minimum possible value, so the scan can stop early.
            smallest_ttl_ = 0;
            break;
        }
        if (payload->HasTTL())
        {
            uint64_t ttl = payload->GetTTL();
            if (ttl < smallest_ttl_)
            {
                smallest_ttl_ = ttl;
            }
        }
    }
}

size_t Size() const
{
return keys_.size();
Expand Down
36 changes: 24 additions & 12 deletions tx_service/include/cc/cc_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ class CcShard
return catalog_factory_[static_cast<int>(table_engine) - 1];
}

CacheEvictPolicy GetCacheEvictPolicy() const;

uint64_t LargeObjThresholdBytes() const;

/**
* Insert page at the end of the lru list as the most-recently accessed
* page.
Expand Down Expand Up @@ -983,13 +987,6 @@ class CcShard
return ng_id == ng_id_;
}

// native node group
const uint16_t core_id_;
const uint16_t core_cnt_;
const NodeGroupId ng_id_;
std::atomic<int32_t> meta_data_mux_{};
LocalCcShards &local_shards_;

bool EnableMvcc() const;
void AddActiveSiTx(TxNumber txn, uint64_t start_ts);
void RemoveActiveSiTx(TxNumber txn);
Expand All @@ -1006,11 +1003,6 @@ class CcShard
size_t ActiveBlockingTxSize() const;
void RemoveExpiredActiveBlockingTxs();

// shard level memory limit.
uint64_t memory_limit_{0};

const bool realtime_sampling_{true};

// Search lock_holding_txs_, find the entrys with waited transactions and
// save them into CheckDeadLockResult.
void CollectLockWaitingInfo(CheckDeadLockResult &dlr);
Expand Down Expand Up @@ -1190,6 +1182,19 @@ class CcShard
void CollectCacheHit();
void CollectCacheMiss();

public:
// native node group
const uint16_t core_id_;
const uint16_t core_cnt_;
const NodeGroupId ng_id_;
std::atomic<int32_t> meta_data_mux_{};
LocalCcShards &local_shards_;

// shard level memory limit.
uint64_t memory_limit_{0};

const bool realtime_sampling_{true};

private:
void SetTxProcNotifier(std::atomic<TxProcessorStatus> *tx_proc_status,
TxProcCoordinator *tx_coordi)
Expand Down Expand Up @@ -1310,6 +1315,13 @@ class CcShard
// Reserved head and tail for the double-linked list of cc entries, which
// simplifies handling of empty and one-element lists.
LruPage head_ccp_, tail_ccp_;

// head --- [small pages] --- protected_head_ --- [large pages] -- tail
//
// Declare as a pointer instead of as a dummy page. Eviction policies(SLRU)
// might share it.
LruPage *protected_head_page_;

/**
* @brief Each time a page is accessed and moved to the tail of the LRU
* list, the counter is incremented and assigned to the page. Since in a
Expand Down
19 changes: 19 additions & 0 deletions tx_service/include/cc/local_cc_shards.h
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,18 @@ class LocalCcShards

void FlushCurrentFlushBuffer();

// Select the plain LRU cache eviction policy.
// NOTE(review): this is an unsynchronized plain write to
// cache_evict_policy_, which shard code reads at runtime — intended to be
// called only before the service starts; confirm callers enforce this.
void SetupPolicyLRU()
{
    cache_evict_policy_ = CacheEvictPolicy::LRU;
}

// Select the large-object LRU (LO_LRU) eviction variant.
// @param large_obj_threshold_kb large-object threshold in KB; stored
//        internally in bytes (uint32 KB * 1024ull cannot overflow uint64).
// NOTE(review): two unsynchronized writes to shared policy state — a
// concurrent reader could observe LO_LRU before the threshold write is
// visible; intended for pre-startup use only, confirm against callers.
void SetupPolicyLoLRU(uint32_t large_obj_threshold_kb)
{
    cache_evict_policy_ = CacheEvictPolicy::LO_LRU;
    u_cache_evict_policy_.lo_lru.large_obj_threshold_bytes =
        large_obj_threshold_kb * 1024ull;
}
Comment on lines +1816 to +1826
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Make policy switches synchronized or init-only.

SetupPolicyLRU() / SetupPolicyLoLRU() mutate cache_evict_policy_ and large_obj_threshold_bytes with plain writes, while shard code reads the same fields during request processing. Since TxService exposes these setters directly, calling them after startup is a read/write data race, and readers can observe LO_LRU before the threshold update is visible. Either guard this state with synchronization or explicitly restrict these setters to pre-start initialization.

Also applies to: 2060-2065

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/local_cc_shards.h` around lines 1816 - 1826, The
setters SetupPolicyLRU and SetupPolicyLoLRU mutate shared state
(cache_evict_policy_ and u_cache_evict_policy_.lo_lru.large_obj_threshold_bytes)
that shard readers access concurrently; make these operations safe by either (A)
guarding the fields with a mutex used by readers and writers (protect
cache_evict_policy_ and u_cache_evict_policy_.lo_lru.large_obj_threshold_bytes
with a single std::mutex or std::shared_mutex, lock for write in
SetupPolicyLRU/SetupPolicyLoLRU and lock for read where shards consult the
policy), or (B) restrict the setters to init-only by asserting the service is
not started (add a started_ flag check in SetupPolicyLRU/SetupPolicyLoLRU and
document that they must be called before startup). Update all reader sites that
access
cache_evict_policy_/u_cache_evict_policy_.lo_lru.large_obj_threshold_bytes to
use the same synchronization (or skip if you choose the init-only path and
enforce the assert).


store::DataStoreHandler *const store_hd_;

/*
Expand Down Expand Up @@ -2045,6 +2057,13 @@ class LocalCcShards

bool realtime_sampling_;

CacheEvictPolicy cache_evict_policy_{CacheEvictPolicy::LRU};
union
{
CacheEvictPolicyLRU lru;
CacheEvictPolicyLoLRU lo_lru;
} u_cache_evict_policy_;

struct RangeSplitTask
{
RangeSplitTask(std::shared_ptr<DataSyncTask> data_sync_task,
Expand Down
Loading
Loading