diff --git a/core/src/tx_service_init.cpp b/core/src/tx_service_init.cpp index ec5883d7..ec8bb7bf 100644 --- a/core/src/tx_service_init.cpp +++ b/core/src/tx_service_init.cpp @@ -8,6 +8,10 @@ #include "sequences/sequences.h" #include "tx_service.h" DEFINE_int32(checkpointer_interval, 10, "Checkpointer interval in seconds"); +DEFINE_int32(checkpointer_min_ckpt_request_interval, + 5, + "Minimum checkpoint request interval in seconds, to avoid too " + "frequent checkpoint requests"); DEFINE_int32(range_slice_memory_limit_percent, 10, "Range slice memory limit percentage"); @@ -42,6 +46,16 @@ DEFINE_bool(auto_redirect, false, "If redirect remote object command to remote node internally"); +// Checkpoint trigger threshold based on dirty memory size +// Default is 0, which means 10% of memory_limit will be used as threshold +DEFINE_uint64(dirty_memory_check_interval, + 1000, + "Check dirty memory every N calls to AdjustDataKeyStats"); +DEFINE_uint64(dirty_memory_size_threshold_mb, + 0, + "Trigger checkpoint when dirty memory exceeds this (MB). " + "0 means use 10% of memory_limit as default"); + bool DataSubstrate::InitializeTxService(const INIReader &config_reader) { uint64_t checkpointer_interval = @@ -58,6 +72,14 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) "checkpointer_delay_seconds", FLAGS_checkpointer_delay_seconds); + uint64_t checkpointer_min_ckpt_request_interval = + !CheckCommandLineFlagIsDefault("checkpointer_min_ckpt_request_interval") + ? FLAGS_checkpointer_min_ckpt_request_interval + : config_reader.GetInteger( + "local", + "checkpointer_min_ckpt_request_interval", + FLAGS_checkpointer_min_ckpt_request_interval); + uint64_t collect_active_tx_ts_interval_seconds = !CheckCommandLineFlagIsDefault("collect_active_tx_ts_interval_seconds") ? 
FLAGS_collect_active_tx_ts_interval_seconds @@ -134,6 +156,20 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) : config_reader.GetBoolean( "local", "auto_redirect", FLAGS_auto_redirect); + uint64_t dirty_memory_check_interval = + !CheckCommandLineFlagIsDefault("dirty_memory_check_interval") + ? FLAGS_dirty_memory_check_interval + : config_reader.GetInteger("local", + "dirty_memory_check_interval", + FLAGS_dirty_memory_check_interval); + + uint64_t dirty_memory_size_threshold_mb = + !CheckCommandLineFlagIsDefault("dirty_memory_size_threshold_mb") + ? FLAGS_dirty_memory_size_threshold_mb + : config_reader.GetInteger("local", + "dirty_memory_size_threshold_mb", + FLAGS_dirty_memory_size_threshold_mb); + bool fork_hm_process = false; std::string hm_ip = ""; std::string hm_bin_path = ""; @@ -193,6 +229,8 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) std::map<std::string, uint64_t> tx_service_conf{ {"core_num", core_config_.core_num}, {"checkpointer_interval", checkpointer_interval}, + {"checkpointer_min_ckpt_request_interval", + checkpointer_min_ckpt_request_interval}, {"node_memory_limit_mb", core_config_.node_memory_limit_mb}, {"checkpointer_delay_seconds", checkpointer_delay_seconds}, {"collect_active_tx_ts_interval_seconds", @@ -205,7 +243,13 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) {"enable_key_cache", enable_key_cache}, {"max_standby_lag", max_standby_lag}, {"kickout_data_for_test", kickout_data_for_test ? 
1 : 0}, - {"range_slice_memory_limit_percent", range_slice_memory_limit_percent}}; + {"range_slice_memory_limit_percent", range_slice_memory_limit_percent}, + {"dirty_memory_check_interval", + static_cast<uint32_t>(std::min(dirty_memory_check_interval, + static_cast<uint64_t>(UINT32_MAX)))}, + {"dirty_memory_size_threshold_mb", + static_cast<uint32_t>(std::min(dirty_memory_size_threshold_mb, + static_cast<uint64_t>(UINT32_MAX)))}}; txservice::CatalogFactory *catalog_factory[NUM_EXTERNAL_ENGINES] = { nullptr, nullptr, nullptr}; diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ee977e68..946dbe49 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -299,7 +299,9 @@ class CcShard uint64_t cluster_config_version, metrics::MetricsRegistry *metrics_registry = nullptr, metrics::CommonLabels common_labels = {}, - uint32_t range_slice_memory_limit_percent = 10); + uint32_t range_slice_memory_limit_percent = 10, + uint64_t dirty_memory_check_interval = 1000, + uint64_t dirty_memory_size_threshold_mb = 0); void Init(); @@ -319,6 +321,12 @@ class CcShard std::pair<size_t, size_t> GetDataKeyStats() const; + /** + * @brief Check dirty memory thresholds and trigger checkpoint if exceeded. + * Called periodically from AdjustDataKeyStats based on sampling interval. + */ + void CheckAndTriggerCkptByDirtyMemory(); + void InitializeShardHeap() { if (shard_heap_thread_id_ == 0) @@ -1291,6 +1299,15 @@ class CcShard size_t data_key_count_{0}; // The number of committed dirty keys in data tables only. size_t dirty_data_key_count_{0}; + // Counter for sampling dirty memory checks in AdjustDataKeyStats. + uint64_t adjust_stats_call_count_{0}; + + // Config for dirty memory checkpoint triggering. + // The interval (in number of calls to AdjustDataKeyStats) to check whether + // dirty memory exceeds the threshold. + uint64_t dirty_memory_check_interval_{1000}; + // Pre-calculated threshold in bytes (0 means use 10% of memory_limit_). 
+ uint64_t dirty_memory_threshold_bytes_{0}; Checkpointer *ckpter_; diff --git a/tx_service/include/checkpointer.h b/tx_service/include/checkpointer.h index 487c3ac8..a4e7a045 100644 --- a/tx_service/include/checkpointer.h +++ b/tx_service/include/checkpointer.h @@ -50,7 +50,8 @@ class Checkpointer store::DataStoreHandler *write_hd, const uint32_t &checkpoint_interval, TxLog *log_agent, - uint32_t ckpt_delay_seconds); + uint32_t ckpt_delay_seconds, + uint32_t min_ckpt_request_interval); ~Checkpointer() = default; @@ -152,15 +153,18 @@ class Checkpointer }; LocalCcShards &local_shards_; - // protects request_ckpt_ and status_ + // protects status_ std::mutex ckpt_mux_; std::condition_variable ckpt_cv_; - bool request_ckpt_; + std::atomic<bool> request_ckpt_{false}; store::DataStoreHandler *store_hd_; std::thread thd_; Status ckpt_thd_status_; const uint32_t checkpoint_interval_; + const uint32_t min_ckpt_request_interval_; std::chrono::system_clock::time_point last_checkpoint_ts_; + std::atomic<std::chrono::system_clock::time_point> + last_checkpoint_request_ts_; uint32_t ckpt_delay_time_; // unit: Microsecond std::atomic<uint32_t> ongoing_data_sync_cnt_{0}; TxService *tx_service_; diff --git a/tx_service/include/tx_service.h b/tx_service/include/tx_service.h index 007d9dbe..1c1aa1f1 100644 --- a/tx_service/include/tx_service.h +++ b/tx_service/include/tx_service.h @@ -1163,7 +1163,8 @@ class TxService store_hd, conf.at("checkpointer_interval"), log_hd, - conf.at("checkpointer_delay_seconds")) + conf.at("checkpointer_delay_seconds"), + conf.at("checkpointer_min_ckpt_request_interval")) { assert(store_hd != nullptr || skip_kv); uint32_t core_cnt = conf.at("core_num"); diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index 95d64614..6b4766bc 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ b/tx_service/src/cc/cc_req_misc.cpp @@ -1499,8 +1499,9 @@ bool ShardCleanCc::Execute(CcShard &ccs) if (free_count_ == 0 && !shard_heap->IsDefragHeapCcOnFly() && 
!Sharder::Instance().GetCheckpointer()->IsOngoingDataSync()) { - ccs.NotifyCkpt(); + ccs.NotifyCkpt(true); } + free_count_ = 0; // Return true will set the request as free, which means the // request is not in working state. diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index 04b7ef18..99e658b0 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -27,6 +27,7 @@ #include <chrono> // std::chrono #include +#include <iomanip> // std::setprecision #include #include "cc/catalog_cc_map.h" @@ -72,7 +73,9 @@ CcShard::CcShard( uint64_t cluster_config_version, metrics::MetricsRegistry *metrics_registry, metrics::CommonLabels common_labels, - uint32_t range_slice_memory_limit_percent) + uint32_t range_slice_memory_limit_percent, + uint64_t dirty_memory_check_interval, + uint64_t dirty_memory_size_threshold_mb) : core_id_(core_id), core_cnt_(core_cnt), ng_id_(ng_id), @@ -102,7 +105,8 @@ CcShard::CcShard( catalog_factory[3], catalog_factory[4]}, system_handler_(system_handler), - active_si_txs_() + active_si_txs_(), + dirty_memory_check_interval_(dirty_memory_check_interval) { // Reserve range_slice_memory_limit_percent% for range slice info. // We update this to dynamically reserve the configured range slice @@ -117,6 +121,23 @@ static_cast<uint64_t>(MB(node_memory_limit_mb) * memory_usage_ratio); memory_limit_ /= core_cnt_; + // Pre-calculate dirty memory threshold in bytes + if (dirty_memory_size_threshold_mb == 0) + { + // Default: 10% of memory limit per shard, with minimum floor of 1 MB + dirty_memory_threshold_bytes_ = + static_cast<uint64_t>(memory_limit_ * 0.1); + if (dirty_memory_threshold_bytes_ == 0) + { + dirty_memory_threshold_bytes_ = 1024 * 1024; // 1 MB minimum + } + } + else + { + dirty_memory_threshold_bytes_ = + dirty_memory_size_threshold_mb * 1024 * 1024; + } + // Calculate standby buffer memory limit: 10% of node memory limit per // shard. 
These part of memory is calculated together with shard memory so // no need to subtract it from memory_limit_. @@ -419,6 +440,14 @@ void CcShard::AdjustDataKeyStats(const TableName &table_name, { dirty_data_key_count_ = static_cast<size_t>(new_dirty); } + + // Check dirty memory thresholds periodically + if (dirty_memory_check_interval_ > 0 && + ++adjust_stats_call_count_ >= dirty_memory_check_interval_) + { + adjust_stats_call_count_ = 0; + CheckAndTriggerCkptByDirtyMemory(); + } } } @@ -427,6 +456,45 @@ std::pair<size_t, size_t> CcShard::GetDataKeyStats() const { return {data_key_count_, dirty_data_key_count_}; } +void CcShard::CheckAndTriggerCkptByDirtyMemory() +{ + if (memory_limit_ == 0 || data_key_count_ == 0 || ckpter_ == nullptr) + { + return; + } + + // Get current memory usage + if (GetShardHeap() == nullptr) + { + return; + } + int64_t allocated = 0, committed = 0; + GetShardHeap()->Full(&allocated, &committed); + + if (allocated <= 0) + { + return; + } + + // Calculate dirty memory + double dirty_key_ratio = static_cast<double>(dirty_data_key_count_) / + static_cast<double>(data_key_count_); + uint64_t dirty_memory = static_cast<uint64_t>(allocated * dirty_key_ratio); + + // Trigger checkpoint when dirty memory exceeds threshold + if (dirty_memory > dirty_memory_threshold_bytes_) + { + DLOG(INFO) << "Shard " << core_id_ + << " triggering checkpoint - dirty_memory=" + << (dirty_memory / (1024 * 1024)) << "MB (threshold=" + << (dirty_memory_threshold_bytes_ / (1024 * 1024)) + << "MB), dirty_keys=" << dirty_data_key_count_ << "/" + << data_key_count_; + + NotifyCkpt(true); // Request immediate checkpoint + } +} + void CcShard::Enqueue(uint32_t thd_id, CcRequestBase *req) { // The memory order in enqueue() of the concurrent queue ensures that the diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index c0d1a06b..f1fa5f16 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -198,6 +198,14 @@ 
uint64_t node_memory_limit_mb = conf.at("node_memory_limit_mb"); uint32_t range_slice_memory_limit_percent = conf.at("range_slice_memory_limit_percent"); + uint64_t dirty_memory_check_interval = + conf.count("dirty_memory_check_interval") > 0 + ? conf.at("dirty_memory_check_interval") + : 1000; + uint64_t dirty_memory_size_threshold_mb = + conf.count("dirty_memory_size_threshold_mb") > 0 + ? conf.at("dirty_memory_size_threshold_mb") + : 0; uint16_t core_cnt = conf.at("core_num"); for (uint16_t thd_idx = 0; thd_idx < core_cnt; ++thd_idx) { @@ -215,7 +223,9 @@ LocalCcShards::LocalCcShards( cluster_config_version, metrics_registry, common_labels, - range_slice_memory_limit_percent)); + range_slice_memory_limit_percent, + dirty_memory_check_interval, + dirty_memory_size_threshold_mb)); } for (size_t i = 0; i < cc_shards_.size(); ++i) { diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index 7de601aa..afac83bc 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -55,15 +55,25 @@ Checkpointer::Checkpointer(LocalCcShards &shards, store::DataStoreHandler *write_hd, const uint32_t &checkpoint_interval, TxLog *log_agent, - uint32_t ckpt_delay_seconds) + uint32_t ckpt_delay_seconds, + uint32_t min_ckpt_request_interval) : local_shards_(shards), ckpt_mux_(), ckpt_cv_(), - request_ckpt_(false), store_hd_(write_hd), ckpt_thd_status_(Status::Active), checkpoint_interval_(checkpoint_interval), + min_ckpt_request_interval_(min_ckpt_request_interval), + last_checkpoint_ts_(std::chrono::system_clock::now()), + // Initialize last_checkpoint_request_ts_ to a time point that is + // sufficiently in the past, so that the first checkpoint request can be + // processed immediately + last_checkpoint_request_ts_( + std::chrono::system_clock::now() - + std::chrono::seconds(2 * min_ckpt_request_interval)), ckpt_delay_time_(ckpt_delay_seconds * 1000000), + ongoing_data_sync_cnt_(0), + tx_service_(nullptr), log_agent_(log_agent) { 
tx_service_ = shards.tx_service_; @@ -80,6 +90,7 @@ Checkpointer::Checkpointer(LocalCcShards &shards, DLOG(INFO) << "checkpointer init, checkpoint_interval_: " << checkpoint_interval_ + << " ,min_ckpt_request_interval_: " << min_ckpt_request_interval_ << " ,ckpt_delay_seconds: " << ckpt_delay_seconds; } @@ -407,7 +418,7 @@ void Checkpointer::Ckpt(bool is_last_ckpt) void Checkpointer::Run() { std::unique_lock<std::mutex> lk(ckpt_mux_); - last_checkpoint_ts_ = std::chrono::high_resolution_clock::now(); + last_checkpoint_ts_ = std::chrono::system_clock::now(); while (ckpt_thd_status_ == Status::Active) { while (!ckpt_cv_.wait_for( @@ -420,11 +431,11 @@ void Checkpointer::Run() return true; } - // Either cc shards are full and have requested a checkpoint, or + // Either have requested a checkpoint, or // we've sleeped for at least checkpoint_interval_ seconds. // Only enqueue new checkpoint task if there's idle worker. - return (request_ckpt_ || - std::chrono::high_resolution_clock::now() >= + return (request_ckpt_.load(std::memory_order_acquire) || + std::chrono::system_clock::now() >= last_checkpoint_ts_ + std::chrono::seconds(checkpoint_interval_)); })) @@ -433,8 +444,8 @@ } CODE_FAULT_INJECTOR("checkpointer_skip_ckpt", { - request_ckpt_ = false; - last_checkpoint_ts_ = std::chrono::high_resolution_clock::now(); + request_ckpt_.store(false, std::memory_order_release); + last_checkpoint_ts_ = std::chrono::system_clock::now(); continue; }); @@ -443,14 +454,14 @@ break; } - last_checkpoint_ts_ = std::chrono::high_resolution_clock::now(); + last_checkpoint_ts_ = std::chrono::system_clock::now(); lk.unlock(); Ckpt(false); lk.lock(); // notify all waiting that one round checkpoint is done. 
ckpt_cv_.notify_all(); - request_ckpt_ = false; + request_ckpt_.store(false, std::memory_order_release); } // ensure normal shutdown execute checkpoint since we could receive @@ -470,11 +481,24 @@ */ void Checkpointer::Notify(bool request_ckpt) { - std::unique_lock<std::mutex> lk(ckpt_mux_); if (request_ckpt) { - request_ckpt_ = true; + auto now = std::chrono::system_clock::now(); + if (now < last_checkpoint_request_ts_.load(std::memory_order_relaxed) + + std::chrono::seconds(min_ckpt_request_interval_)) + { + return; + } + + bool expected = false; + if (!request_ckpt_.compare_exchange_strong(expected, true)) + { + return; + } + + last_checkpoint_request_ts_.store(now); } + std::unique_lock<std::mutex> lk(ckpt_mux_); ckpt_cv_.notify_one(); }