Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion core/src/tx_service_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include "sequences/sequences.h"
#include "tx_service.h"
// Checkpointer period, in seconds.
DEFINE_int32(checkpointer_interval, 10, "Checkpointer interval in seconds");
// Lower bound, in seconds, on the spacing between checkpoint requests.
// NOTE(review): the flag is a signed int32, but InitializeTxService stores it
// in a uint64_t, so a negative value would wrap around — confirm whether
// input validation is needed.
DEFINE_int32(checkpointer_min_ckpt_request_interval,
5,
"Minimum checkpoint request interval in seconds, to avoid too "
"frequent checkpoint requests");
// Percentage of memory reserved for range slice info.
DEFINE_int32(range_slice_memory_limit_percent,
10,
"Range slice memory limit percentage");
Expand Down Expand Up @@ -42,6 +46,16 @@ DEFINE_bool(auto_redirect,
false,
"If redirect remote object command to remote node internally");

// Flags controlling checkpoint triggering based on dirty memory size.
// Sampling interval of the dirty memory check: the check runs once every N
// calls to AdjustDataKeyStats. A value of 0 disables the check.
DEFINE_uint64(dirty_memory_check_interval,
1000,
"Check dirty memory every N calls to AdjustDataKeyStats");
// Dirty memory threshold (in MB) above which a checkpoint is requested.
// Default is 0, which means 10% of memory_limit will be used as threshold.
DEFINE_uint64(dirty_memory_size_threshold_mb,
0,
"Trigger checkpoint when dirty memory exceeds this (MB). "
"0 means use 10% of memory_limit as default");

bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
{
uint64_t checkpointer_interval =
Expand All @@ -58,6 +72,14 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
"checkpointer_delay_seconds",
FLAGS_checkpointer_delay_seconds);

uint64_t checkpointer_min_ckpt_request_interval =
!CheckCommandLineFlagIsDefault("checkpointer_min_ckpt_request_interval")
? FLAGS_checkpointer_min_ckpt_request_interval
: config_reader.GetInteger(
"local",
"checkpointer_min_ckpt_request_interval",
FLAGS_checkpointer_min_ckpt_request_interval);

uint64_t collect_active_tx_ts_interval_seconds =
!CheckCommandLineFlagIsDefault("collect_active_tx_ts_interval_seconds")
? FLAGS_collect_active_tx_ts_interval_seconds
Expand Down Expand Up @@ -134,6 +156,20 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
: config_reader.GetBoolean(
"local", "auto_redirect", FLAGS_auto_redirect);

uint64_t dirty_memory_check_interval =
!CheckCommandLineFlagIsDefault("dirty_memory_check_interval")
? FLAGS_dirty_memory_check_interval
: config_reader.GetInteger("local",
"dirty_memory_check_interval",
FLAGS_dirty_memory_check_interval);

uint64_t dirty_memory_size_threshold_mb =
!CheckCommandLineFlagIsDefault("dirty_memory_size_threshold_mb")
? FLAGS_dirty_memory_size_threshold_mb
: config_reader.GetInteger("local",
"dirty_memory_size_threshold_mb",
FLAGS_dirty_memory_size_threshold_mb);

bool fork_hm_process = false;
std::string hm_ip = "";
std::string hm_bin_path = "";
Expand Down Expand Up @@ -193,6 +229,8 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
std::map<std::string, uint32_t> tx_service_conf{
{"core_num", core_config_.core_num},
{"checkpointer_interval", checkpointer_interval},
{"checkpointer_min_ckpt_request_interval",
checkpointer_min_ckpt_request_interval},
{"node_memory_limit_mb", core_config_.node_memory_limit_mb},
{"checkpointer_delay_seconds", checkpointer_delay_seconds},
{"collect_active_tx_ts_interval_seconds",
Expand All @@ -205,7 +243,13 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader)
{"enable_key_cache", enable_key_cache},
{"max_standby_lag", max_standby_lag},
{"kickout_data_for_test", kickout_data_for_test ? 1 : 0},
{"range_slice_memory_limit_percent", range_slice_memory_limit_percent}};
{"range_slice_memory_limit_percent", range_slice_memory_limit_percent},
{"dirty_memory_check_interval",
static_cast<uint32_t>(std::min(dirty_memory_check_interval,
static_cast<uint64_t>(UINT32_MAX)))},
{"dirty_memory_size_threshold_mb",
static_cast<uint32_t>(std::min(dirty_memory_size_threshold_mb,
static_cast<uint64_t>(UINT32_MAX)))}};

txservice::CatalogFactory *catalog_factory[NUM_EXTERNAL_ENGINES] = {
nullptr, nullptr, nullptr};
Expand Down
19 changes: 18 additions & 1 deletion tx_service/include/cc/cc_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ class CcShard
uint64_t cluster_config_version,
metrics::MetricsRegistry *metrics_registry = nullptr,
metrics::CommonLabels common_labels = {},
uint32_t range_slice_memory_limit_percent = 10);
uint32_t range_slice_memory_limit_percent = 10,
uint64_t dirty_memory_check_interval = 1000,
uint64_t dirty_memory_size_threshold_mb = 0);

void Init();

Expand All @@ -319,6 +321,12 @@ class CcShard

std::pair<size_t, size_t> GetDataKeyStats() const;

/**
* @brief Check dirty memory thresholds and trigger checkpoint if exceeded.
* Called periodically from AdjustDataKeyStats based on sampling interval.
*/
void CheckAndTriggerCkptByDirtyMemory();

void InitializeShardHeap()
{
if (shard_heap_thread_id_ == 0)
Expand Down Expand Up @@ -1291,6 +1299,15 @@ class CcShard
size_t data_key_count_{0};
// The number of committed dirty keys in data tables only.
size_t dirty_data_key_count_{0};
// Counter for sampling dirty memory checks in AdjustDataKeyStats.
uint64_t adjust_stats_call_count_{0};

// Config for dirty memory checkpoint triggering.
// The interval (in number of calls to AdjustDataKeyStats) to check whether
// dirty memory exceeds the threshold.
uint64_t dirty_memory_check_interval_{1000};
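// Pre-calculated threshold in bytes. The constructor resolves a configured
// value of 0 to 10% of memory_limit_ (at least 1 MB), so this is non-zero
// once construction completes.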
uint64_t dirty_memory_threshold_bytes_{0};

Checkpointer *ckpter_;

Expand Down
10 changes: 7 additions & 3 deletions tx_service/include/checkpointer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class Checkpointer
store::DataStoreHandler *write_hd,
const uint32_t &checkpoint_interval,
TxLog *log_agent,
uint32_t ckpt_delay_seconds);
uint32_t ckpt_delay_seconds,
uint32_t min_ckpt_request_interval);

~Checkpointer() = default;

Expand Down Expand Up @@ -152,15 +153,18 @@ class Checkpointer
};

LocalCcShards &local_shards_;
// protects request_ckpt_ and status_
// protects status_
std::mutex ckpt_mux_;
std::condition_variable ckpt_cv_;
bool request_ckpt_;
std::atomic<bool> request_ckpt_{false};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this changed to atomic? An atomic flag and mutex + cv do not work well together.

store::DataStoreHandler *store_hd_;
std::thread thd_;
Status ckpt_thd_status_;
const uint32_t checkpoint_interval_;
const uint32_t min_ckpt_request_interval_;
std::chrono::system_clock::time_point last_checkpoint_ts_;
std::atomic<std::chrono::system_clock::time_point>
last_checkpoint_request_ts_;
uint32_t ckpt_delay_time_; // unit: Microsecond
std::atomic<uint64_t> ongoing_data_sync_cnt_{0};
TxService *tx_service_;
Expand Down
3 changes: 2 additions & 1 deletion tx_service/include/tx_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,8 @@ class TxService
store_hd,
conf.at("checkpointer_interval"),
log_hd,
conf.at("checkpointer_delay_seconds"))
conf.at("checkpointer_delay_seconds"),
conf.at("checkpointer_min_ckpt_request_interval"))
{
assert(store_hd != nullptr || skip_kv);
uint32_t core_cnt = conf.at("core_num");
Expand Down
3 changes: 2 additions & 1 deletion tx_service/src/cc/cc_req_misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1499,8 +1499,9 @@ bool ShardCleanCc::Execute(CcShard &ccs)
if (free_count_ == 0 && !shard_heap->IsDefragHeapCcOnFly() &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should keep this as a safety check. It should not normally trigger, but if the user sets some terrible params such as check interval = 1000000 and dirty ratio = 99%, we might get stuck with memory full and be unable to trigger a checkpoint.

!Sharder::Instance().GetCheckpointer()->IsOngoingDataSync())
{
ccs.NotifyCkpt();
ccs.NotifyCkpt(true);
}

free_count_ = 0;
// Return true will set the request as free, which means the
// request is not in working state.
Expand Down
72 changes: 70 additions & 2 deletions tx_service/src/cc/cc_shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <chrono> // std::chrono
#include <cstdint>
#include <iomanip> // std::setprecision
#include <string>

#include "cc/catalog_cc_map.h"
Expand Down Expand Up @@ -72,7 +73,9 @@ CcShard::CcShard(
uint64_t cluster_config_version,
metrics::MetricsRegistry *metrics_registry,
metrics::CommonLabels common_labels,
uint32_t range_slice_memory_limit_percent)
uint32_t range_slice_memory_limit_percent,
uint64_t dirty_memory_check_interval,
uint64_t dirty_memory_size_threshold_mb)
: core_id_(core_id),
core_cnt_(core_cnt),
ng_id_(ng_id),
Expand Down Expand Up @@ -102,7 +105,8 @@ CcShard::CcShard(
catalog_factory[3],
catalog_factory[4]},
system_handler_(system_handler),
active_si_txs_()
active_si_txs_(),
dirty_memory_check_interval_(dirty_memory_check_interval)
{
// Reserve range_slice_memory_limit_percent% for range slice info.
// We update this to dynamically reserve the configured range slice
Expand All @@ -117,6 +121,23 @@ CcShard::CcShard(
static_cast<uint64_t>(MB(node_memory_limit_mb) * memory_usage_ratio);
memory_limit_ /= core_cnt_;

// Pre-calculate dirty memory threshold in bytes
if (dirty_memory_size_threshold_mb == 0)
{
// Default: 10% of memory limit per shard, with minimum floor of 1 MB
dirty_memory_threshold_bytes_ =
static_cast<uint64_t>(memory_limit_ * 0.1);
if (dirty_memory_threshold_bytes_ == 0)
{
dirty_memory_threshold_bytes_ = 1024 * 1024; // 1 MB minimum
}
}
else
{
dirty_memory_threshold_bytes_ =
dirty_memory_size_threshold_mb * 1024 * 1024;
}

// Calculate standby buffer memory limit: 10% of node memory limit per
// shard. This part of memory is calculated together with shard memory so
// no need to subtract it from memory_limit_.
Expand Down Expand Up @@ -419,6 +440,14 @@ void CcShard::AdjustDataKeyStats(const TableName &table_name,
{
dirty_data_key_count_ = static_cast<size_t>(new_dirty);
}

// Check dirty memory thresholds periodically
if (dirty_memory_check_interval_ > 0 &&
++adjust_stats_call_count_ >= dirty_memory_check_interval_)
{
adjust_stats_call_count_ = 0;
CheckAndTriggerCkptByDirtyMemory();
}
}
}

Expand All @@ -427,6 +456,45 @@ std::pair<size_t, size_t> CcShard::GetDataKeyStats() const
return {data_key_count_, dirty_data_key_count_};
}

/**
 * @brief Estimate this shard's dirty memory and notify the checkpointer when
 * the estimate is above dirty_memory_threshold_bytes_.
 *
 * The estimate is the heap's allocated amount scaled by the share of data
 * keys that are dirty (dirty_data_key_count_ / data_key_count_). The function
 * returns without doing anything when there is no memory limit, no data key,
 * no checkpointer, no shard heap, or a non-positive allocated amount.
 */
void CcShard::CheckAndTriggerCkptByDirtyMemory()
{
    // All three must be present for the estimate to be meaningful.
    const bool can_evaluate =
        memory_limit_ != 0 && data_key_count_ != 0 && ckpter_ != nullptr;
    if (!can_evaluate)
    {
        return;
    }

    // The shard heap may not be available; nothing to measure in that case.
    auto *shard_heap = GetShardHeap();
    if (shard_heap == nullptr)
    {
        return;
    }

    // Full() fills in both counters; only the allocated amount is used here.
    // NOTE(review): the values are treated as byte counts below — confirm
    // against the heap implementation.
    int64_t allocated_bytes = 0;
    int64_t committed_bytes = 0;
    shard_heap->Full(&allocated_bytes, &committed_bytes);
    if (allocated_bytes <= 0)
    {
        // Also keeps the unsigned conversion below away from negative input.
        return;
    }

    // Scale allocated memory by the fraction of keys that are dirty.
    const double dirty_fraction = static_cast<double>(dirty_data_key_count_) /
                                  static_cast<double>(data_key_count_);
    const uint64_t dirty_bytes =
        static_cast<uint64_t>(allocated_bytes * dirty_fraction);

    if (dirty_bytes <= dirty_memory_threshold_bytes_)
    {
        // Still within budget.
        return;
    }

    constexpr uint64_t kBytesPerMb = 1024 * 1024;
    DLOG(INFO) << "Shard " << core_id_
               << " triggering checkpoint - dirty_memory="
               << (dirty_bytes / kBytesPerMb) << "MB (threshold="
               << (dirty_memory_threshold_bytes_ / kBytesPerMb)
               << "MB), dirty_keys=" << dirty_data_key_count_ << "/"
               << data_key_count_;

    // Presumably `true` asks for an immediate checkpoint — confirm against
    // NotifyCkpt's declaration.
    NotifyCkpt(true);
}
Comment on lines +459 to +496
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Null-check GetShardHeap() before dereferencing.

GetShardHeap() can return nullptr if InitializeShardHeap() hasn't been called yet. The existing guards (memory_limit_ == 0, data_key_count_ == 0) don't guarantee the heap is initialized.

Additionally, allocated is int64_t — if it is negative (edge case), casting allocated * dirty_key_ratio to uint64_t on Line 444 would produce an incorrect, very large value; if it is zero, there is nothing to measure and the function can return early.

Proposed fix
 void CcShard::CheckAndTriggerCkptByDirtyMemory()
 {
     if (memory_limit_ == 0 || data_key_count_ == 0 || ckpter_ == nullptr)
     {
         return;
     }
 
+    if (GetShardHeap() == nullptr)
+    {
+        return;
+    }
+
     // Get current memory usage
     int64_t allocated = 0, committed = 0;
     GetShardHeap()->Full(&allocated, &committed);
 
+    if (allocated <= 0)
+    {
+        return;
+    }
+
     // Calculate dirty memory
🤖 Prompt for AI Agents
In `@tx_service/src/cc/cc_shard.cpp` around lines 430 - 469, In
CcShard::CheckAndTriggerCkptByDirtyMemory, add a nullptr check for
GetShardHeap() before calling Full() and return early if it is null to avoid
dereferencing an uninitialized heap; also guard the allocated value (returned by
Full()) so that if allocated <= 0 you treat it as 0 (or clamp to 0) before
computing dirty_memory and casting to uint64_t to avoid producing a huge
unsigned value when allocated is negative; update the dirty_memory calculation
to use the clamped allocated value and keep the rest of the threshold logic and
ckpter_->Notify(true) behavior unchanged.


void CcShard::Enqueue(uint32_t thd_id, CcRequestBase *req)
{
// The memory order in enqueue() of the concurrent queue ensures that the
Expand Down
12 changes: 11 additions & 1 deletion tx_service/src/cc/local_cc_shards.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ LocalCcShards::LocalCcShards(
uint64_t node_memory_limit_mb = conf.at("node_memory_limit_mb");
uint32_t range_slice_memory_limit_percent =
conf.at("range_slice_memory_limit_percent");
uint64_t dirty_memory_check_interval =
conf.count("dirty_memory_check_interval") > 0
? conf.at("dirty_memory_check_interval")
: 1000;
uint64_t dirty_memory_size_threshold_mb =
conf.count("dirty_memory_size_threshold_mb") > 0
? conf.at("dirty_memory_size_threshold_mb")
: 0;
uint16_t core_cnt = conf.at("core_num");
for (uint16_t thd_idx = 0; thd_idx < core_cnt; ++thd_idx)
{
Expand All @@ -215,7 +223,9 @@ LocalCcShards::LocalCcShards(
cluster_config_version,
metrics_registry,
common_labels,
range_slice_memory_limit_percent));
range_slice_memory_limit_percent,
dirty_memory_check_interval,
dirty_memory_size_threshold_mb));
}
for (size_t i = 0; i < cc_shards_.size(); ++i)
{
Expand Down
Loading