From 108bca1cc95e8258e3f3a49161802a07b4ccab90 Mon Sep 17 00:00:00 2001 From: githubzilla Date: Tue, 10 Feb 2026 18:12:52 +0800 Subject: [PATCH 01/15] Initial commit --- tx_service/include/cc/cc_shard.h | 8 +++++ tx_service/src/cc/cc_req_misc.cpp | 10 ++---- tx_service/src/cc/cc_shard.cpp | 59 +++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 8 deletions(-) diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ee977e68..9c8398cf 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -319,6 +319,12 @@ class CcShard std::pair GetDataKeyStats() const; + /** + * @brief Check dirty memory thresholds and trigger checkpoint if exceeded. + * Called periodically from AdjustDataKeyStats based on sampling interval. + */ + void CheckAndTriggerCkptByDirtyMemory(); + void InitializeShardHeap() { if (shard_heap_thread_id_ == 0) @@ -1291,6 +1297,8 @@ class CcShard size_t data_key_count_{0}; // The number of committed dirty keys in data tables only. size_t dirty_data_key_count_{0}; + // Counter for sampling dirty memory checks in AdjustDataKeyStats. + uint64_t adjust_stats_call_count_{0}; Checkpointer *ckpter_; diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index 95d64614..a52f2acf 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ b/tx_service/src/cc/cc_req_misc.cpp @@ -1493,14 +1493,8 @@ bool ShardCleanCc::Execute(CcShard &ccs) // waiting ccrequests. ccs.AbortRequestsAfterMemoryFree(); - // Notify the checkpointer thread to do checkpoint if there - // is not freeable entries to be kicked out from ccmap and - // if the shard is not doing defrag. - if (free_count_ == 0 && !shard_heap->IsDefragHeapCcOnFly() && - !Sharder::Instance().GetCheckpointer()->IsOngoingDataSync()) - { - ccs.NotifyCkpt(); - } + // Note: Checkpoint is now triggered based on dirty memory + // thresholds in AdjustDataKeyStats, not on memory-full events. free_count_ = 0; // Return true will set the request as free, which means the // request is not in working state. diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index 04b7ef18..f17b371b 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -27,6 +27,7 @@ #include // std::chrono #include +#include // std::setprecision #include #include "cc/catalog_cc_map.h" @@ -59,6 +60,17 @@ DECLARE_bool(cmd_read_catalog); namespace txservice { DECLARE_double(ckpt_buffer_ratio); + +// Checkpoint trigger thresholds based on dirty memory +DEFINE_uint64(dirty_memory_check_interval, + 1000, + "Check dirty memory every N calls to AdjustDataKeyStats"); +DEFINE_double(dirty_memory_ratio_threshold, + 0.20, + "Trigger checkpoint when dirty memory / memory_limit exceeds this"); +DEFINE_uint64(dirty_memory_size_threshold_mb, + 50, + "Trigger checkpoint when dirty memory exceeds this (MB)"); CcShard::CcShard( uint16_t core_id, uint32_t core_cnt, @@ -419,6 +431,13 @@ void CcShard::AdjustDataKeyStats(const TableName &table_name, { dirty_data_key_count_ = static_cast(new_dirty); } + + // Check dirty memory thresholds periodically + if (FLAGS_dirty_memory_check_interval > 0 && + ++adjust_stats_call_count_ % FLAGS_dirty_memory_check_interval == 0) + { + CheckAndTriggerCkptByDirtyMemory(); + } } } @@ -427,6 +446,46 @@ std::pair CcShard::GetDataKeyStats() const return {data_key_count_, dirty_data_key_count_}; } +void CcShard::CheckAndTriggerCkptByDirtyMemory() +{ + if (memory_limit_ == 0 || data_key_count_ == 0 || ckpter_ == nullptr) + { + return; + } + + // Get current memory usage + int64_t allocated = 0, committed = 0; + GetShardHeap()->Full(&allocated, &committed); + + // Calculate dirty memory + double dirty_key_ratio = static_cast(dirty_data_key_count_) / + static_cast(data_key_count_); + uint64_t dirty_memory = static_cast(allocated * dirty_key_ratio); + double dirty_memory_ratio = + static_cast(dirty_memory) / static_cast(memory_limit_); + + // Check thresholds + uint64_t size_threshold = + FLAGS_dirty_memory_size_threshold_mb * 1024 * 1024; + bool ratio_exceeded = dirty_memory_ratio > FLAGS_dirty_memory_ratio_threshold; + bool size_exceeded = dirty_memory > size_threshold; + + if (ratio_exceeded || size_exceeded) + { + DLOG(INFO) << "Shard " << core_id_ + << " triggering checkpoint - dirty_memory_ratio=" << std::fixed + << std::setprecision(1) << (dirty_memory_ratio * 100) << "%" + << " (threshold=" + << (FLAGS_dirty_memory_ratio_threshold * 100) + << "%), dirty_memory=" << (dirty_memory / (1024 * 1024)) + << "MB (threshold=" << FLAGS_dirty_memory_size_threshold_mb + << "MB), dirty_keys=" << dirty_data_key_count_ << "/" + << data_key_count_; + + ckpter_->Notify(true); // Request immediate checkpoint + } +} + void CcShard::Enqueue(uint32_t thd_id, CcRequestBase *req) { // The memory order in enqueue() of the concurrent queue ensures that the From d88a237dc057b07288cb04db240c7a61bb0e84da Mon Sep 17 00:00:00 2001 From: githubzilla Date: Tue, 10 Feb 2026 18:54:05 +0800 Subject: [PATCH 02/15] Trigger ckpt only when both ratio and size exceeded --- tx_service/src/cc/cc_shard.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index f17b371b..e882fe8c 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -470,7 +470,8 @@ void CcShard::CheckAndTriggerCkptByDirtyMemory() bool ratio_exceeded = dirty_memory_ratio > FLAGS_dirty_memory_ratio_threshold; bool size_exceeded = dirty_memory > size_threshold; - if (ratio_exceeded || size_exceeded) + // Trigger checkpoint only when BOTH thresholds are exceeded + if (ratio_exceeded && size_exceeded) { DLOG(INFO) << "Shard " << core_id_ << " triggering checkpoint - dirty_memory_ratio=" << std::fixed From 4235db2a7b05a30c9baccd71dca8379cdbec7231 Mon Sep 17 00:00:00 2001 From: githubzilla Date: Tue, 10 Feb 2026 19:21:16 +0800 Subject: [PATCH 03/15] Only use dirty memory size to trigger ckpt --- tx_service/src/cc/cc_shard.cpp | 42 +++++++++++++++++----------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index e882fe8c..ce224703 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -61,16 +61,15 @@ namespace txservice { DECLARE_double(ckpt_buffer_ratio); -// Checkpoint trigger thresholds based on dirty memory +// Checkpoint trigger threshold based on dirty memory size +// Default is 0, which means 10% of memory_limit will be used as threshold DEFINE_uint64(dirty_memory_check_interval, 1000, "Check dirty memory every N calls to AdjustDataKeyStats"); -DEFINE_double(dirty_memory_ratio_threshold, - 0.20, - "Trigger checkpoint when dirty memory / memory_limit exceeds this"); DEFINE_uint64(dirty_memory_size_threshold_mb, - 50, - "Trigger checkpoint when dirty memory exceeds this (MB)"); + 0, + "Trigger checkpoint when dirty memory exceeds this (MB). " + "0 means use 10% of memory_limit as default"); CcShard::CcShard( uint16_t core_id, uint32_t core_cnt, @@ -461,25 +460,26 @@ void CcShard::CheckAndTriggerCkptByDirtyMemory() double dirty_key_ratio = static_cast(dirty_data_key_count_) / static_cast(data_key_count_); uint64_t dirty_memory = static_cast(allocated * dirty_key_ratio); - double dirty_memory_ratio = - static_cast(dirty_memory) / static_cast(memory_limit_); - // Check thresholds - uint64_t size_threshold = - FLAGS_dirty_memory_size_threshold_mb * 1024 * 1024; - bool ratio_exceeded = dirty_memory_ratio > FLAGS_dirty_memory_ratio_threshold; - bool size_exceeded = dirty_memory > size_threshold; + // Determine threshold: use configured value or default to 10% of memory_limit + uint64_t threshold; + if (FLAGS_dirty_memory_size_threshold_mb == 0) + { + // Default: 10% of memory limit + threshold = static_cast(memory_limit_ * 0.1); + } + else + { + threshold = FLAGS_dirty_memory_size_threshold_mb * 1024 * 1024; + } - // Trigger checkpoint only when BOTH thresholds are exceeded - if (ratio_exceeded && size_exceeded) + // Trigger checkpoint when dirty memory exceeds threshold + if (dirty_memory > threshold) { DLOG(INFO) << "Shard " << core_id_ - << " triggering checkpoint - dirty_memory_ratio=" << std::fixed - << std::setprecision(1) << (dirty_memory_ratio * 100) << "%" - << " (threshold=" - << (FLAGS_dirty_memory_ratio_threshold * 100) - << "%), dirty_memory=" << (dirty_memory / (1024 * 1024)) - << "MB (threshold=" << FLAGS_dirty_memory_size_threshold_mb + << " triggering checkpoint - dirty_memory=" + << (dirty_memory / (1024 * 1024)) << "MB (threshold=" + << (threshold / (1024 * 1024)) << "MB), dirty_keys=" << dirty_data_key_count_ << "/" << data_key_count_; From 7ecd5127f376c49342283e1fc60f8ff99d03253d Mon Sep 17 00:00:00 2001 From: githubzilla Date: Tue, 10 Feb 2026 19:26:54 +0800 Subject: [PATCH 04/15] fix clang-format --- tx_service/src/cc/cc_shard.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index ce224703..3d56f97f 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -461,7 +461,8 @@ void CcShard::CheckAndTriggerCkptByDirtyMemory() static_cast(data_key_count_); uint64_t dirty_memory = static_cast(allocated * dirty_key_ratio); - // Determine threshold: use configured value or default to 10% of memory_limit + // Determine threshold: use configured value or default to 10% of + // memory_limit uint64_t threshold; if (FLAGS_dirty_memory_size_threshold_mb == 0) { @@ -478,8 +479,8 @@ void CcShard::CheckAndTriggerCkptByDirtyMemory() { DLOG(INFO) << "Shard " << core_id_ << " triggering checkpoint - dirty_memory=" - << (dirty_memory / (1024 * 1024)) << "MB (threshold=" - << (threshold / (1024 * 1024)) + << (dirty_memory / (1024 * 1024)) + << "MB (threshold=" << (threshold / (1024 * 1024)) << "MB), dirty_keys=" << dirty_data_key_count_ << "/" << data_key_count_; From 13dbcc16829a7e2b47b5bd593be952d1eed20d73 Mon Sep 17 00:00:00 2001 From: githubzilla Date: Wed, 11 Feb 2026 17:16:24 +0800 Subject: [PATCH 05/15] Move dirty_memory config to tx_service_init.cpp --- core/src/tx_service_init.cpp | 28 ++++++++++++++++++++++++++- tx_service/include/cc/cc_shard.h | 8 +++++++- tx_service/src/cc/cc_shard.cpp | 26 ++++++++++--------------- tx_service/src/cc/local_cc_shards.cpp | 12 +++++++++++- 4 files changed, 55 insertions(+), 19 deletions(-) diff --git a/core/src/tx_service_init.cpp b/core/src/tx_service_init.cpp index ec5883d7..53e8a897 100644 --- a/core/src/tx_service_init.cpp +++ b/core/src/tx_service_init.cpp @@ -42,6 +42,16 @@ DEFINE_bool(auto_redirect, false, "If redirect remote object command to remote node internally"); +// Checkpoint trigger threshold based on dirty memory size +// Default is 0, which means 10% of memory_limit will be used as threshold +DEFINE_uint64(dirty_memory_check_interval, + 1000, + "Check dirty memory every N calls to AdjustDataKeyStats"); +DEFINE_uint64(dirty_memory_size_threshold_mb, + 0, + "Trigger checkpoint when dirty memory exceeds this (MB). " + "0 means use 10% of memory_limit as default"); + bool DataSubstrate::InitializeTxService(const INIReader &config_reader) { uint64_t checkpointer_interval = @@ -134,6 +144,20 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) : config_reader.GetBoolean( "local", "auto_redirect", FLAGS_auto_redirect); + uint64_t dirty_memory_check_interval = + !CheckCommandLineFlagIsDefault("dirty_memory_check_interval") + ? FLAGS_dirty_memory_check_interval + : config_reader.GetInteger("local", + "dirty_memory_check_interval", + FLAGS_dirty_memory_check_interval); + + uint64_t dirty_memory_size_threshold_mb = + !CheckCommandLineFlagIsDefault("dirty_memory_size_threshold_mb") + ? FLAGS_dirty_memory_size_threshold_mb + : config_reader.GetInteger("local", + "dirty_memory_size_threshold_mb", + FLAGS_dirty_memory_size_threshold_mb); + bool fork_hm_process = false; std::string hm_ip = ""; std::string hm_bin_path = ""; @@ -205,7 +229,9 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) {"enable_key_cache", enable_key_cache}, {"max_standby_lag", max_standby_lag}, {"kickout_data_for_test", kickout_data_for_test ? 1 : 0}, - {"range_slice_memory_limit_percent", range_slice_memory_limit_percent}}; + {"range_slice_memory_limit_percent", range_slice_memory_limit_percent}, + {"dirty_memory_check_interval", dirty_memory_check_interval}, + {"dirty_memory_size_threshold_mb", dirty_memory_size_threshold_mb}}; txservice::CatalogFactory *catalog_factory[NUM_EXTERNAL_ENGINES] = { nullptr, nullptr, nullptr}; diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index 9c8398cf..ecac68d4 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -299,7 +299,9 @@ class CcShard uint64_t cluster_config_version, metrics::MetricsRegistry *metrics_registry = nullptr, metrics::CommonLabels common_labels = {}, - uint32_t range_slice_memory_limit_percent = 10); + uint32_t range_slice_memory_limit_percent = 10, + uint64_t dirty_memory_check_interval = 1000, + uint64_t dirty_memory_size_threshold_mb = 0); void Init(); @@ -1300,6 +1302,10 @@ class CcShard // Counter for sampling dirty memory checks in AdjustDataKeyStats. uint64_t adjust_stats_call_count_{0}; + // Config for dirty memory checkpoint triggering. + uint64_t dirty_memory_check_interval_{1000}; + uint64_t dirty_memory_size_threshold_mb_{0}; + Checkpointer *ckpter_; /** diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index 3d56f97f..f2d08691 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -60,16 +60,6 @@ DECLARE_bool(cmd_read_catalog); namespace txservice { DECLARE_double(ckpt_buffer_ratio); - -// Checkpoint trigger threshold based on dirty memory size -// Default is 0, which means 10% of memory_limit will be used as threshold -DEFINE_uint64(dirty_memory_check_interval, - 1000, - "Check dirty memory every N calls to AdjustDataKeyStats"); -DEFINE_uint64(dirty_memory_size_threshold_mb, - 0, - "Trigger checkpoint when dirty memory exceeds this (MB). " - "0 means use 10% of memory_limit as default"); CcShard::CcShard( uint16_t core_id, uint32_t core_cnt, @@ -83,7 +73,9 @@ CcShard::CcShard( uint64_t cluster_config_version, metrics::MetricsRegistry *metrics_registry, metrics::CommonLabels common_labels, - uint32_t range_slice_memory_limit_percent) + uint32_t range_slice_memory_limit_percent, + uint64_t dirty_memory_check_interval, + uint64_t dirty_memory_size_threshold_mb) : core_id_(core_id), core_cnt_(core_cnt), ng_id_(ng_id), @@ -113,7 +105,9 @@ CcShard::CcShard( catalog_factory[3], catalog_factory[4]}, system_handler_(system_handler), - active_si_txs_() + active_si_txs_(), + dirty_memory_check_interval_(dirty_memory_check_interval), + dirty_memory_size_threshold_mb_(dirty_memory_size_threshold_mb) { // Reserve range_slice_memory_limit_percent% for range slice info. // We update this to dynamically reserve the configured range slice @@ -432,8 +426,8 @@ void CcShard::AdjustDataKeyStats(const TableName &table_name, } // Check dirty memory thresholds periodically - if (FLAGS_dirty_memory_check_interval > 0 && - ++adjust_stats_call_count_ % FLAGS_dirty_memory_check_interval == 0) + if (dirty_memory_check_interval_ > 0 && + ++adjust_stats_call_count_ % dirty_memory_check_interval_ == 0) { CheckAndTriggerCkptByDirtyMemory(); } @@ -464,14 +458,14 @@ void CcShard::CheckAndTriggerCkptByDirtyMemory() // Determine threshold: use configured value or default to 10% of // memory_limit uint64_t threshold; - if (FLAGS_dirty_memory_size_threshold_mb == 0) + if (dirty_memory_size_threshold_mb_ == 0) { // Default: 10% of memory limit threshold = static_cast(memory_limit_ * 0.1); } else { - threshold = FLAGS_dirty_memory_size_threshold_mb * 1024 * 1024; + threshold = dirty_memory_size_threshold_mb_ * 1024 * 1024; } // Trigger checkpoint when dirty memory exceeds threshold diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index c0d1a06b..f1fa5f16 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -198,6 +198,14 @@ LocalCcShards::LocalCcShards( uint64_t node_memory_limit_mb = conf.at("node_memory_limit_mb"); uint32_t range_slice_memory_limit_percent = conf.at("range_slice_memory_limit_percent"); + uint64_t dirty_memory_check_interval = + conf.count("dirty_memory_check_interval") > 0 + ? conf.at("dirty_memory_check_interval") + : 1000; + uint64_t dirty_memory_size_threshold_mb = + conf.count("dirty_memory_size_threshold_mb") > 0 + ? conf.at("dirty_memory_size_threshold_mb") + : 0; uint16_t core_cnt = conf.at("core_num"); for (uint16_t thd_idx = 0; thd_idx < core_cnt; ++thd_idx) { @@ -215,7 +223,9 @@ LocalCcShards::LocalCcShards( cluster_config_version, metrics_registry, common_labels, - range_slice_memory_limit_percent)); + range_slice_memory_limit_percent, + dirty_memory_check_interval, + dirty_memory_size_threshold_mb)); } for (size_t i = 0; i < cc_shards_.size(); ++i) { From df9bfb680e84314abb7e0dc1dba086ca25efef3b Mon Sep 17 00:00:00 2001 From: githubzilla Date: Thu, 12 Feb 2026 19:21:12 +0800 Subject: [PATCH 06/15] Fix comments --- core/src/tx_service_init.cpp | 8 +++-- tx_service/include/cc/cc_shard.h | 2 ++ tx_service/include/checkpointer.h | 14 ++++++-- tx_service/src/cc/cc_shard.cpp | 54 ++++++++++++++++++++----------- tx_service/src/checkpointer.cpp | 13 ++++---- 5 files changed, 62 insertions(+), 29 deletions(-) diff --git a/core/src/tx_service_init.cpp b/core/src/tx_service_init.cpp index 53e8a897..afd28eb9 100644 --- a/core/src/tx_service_init.cpp +++ b/core/src/tx_service_init.cpp @@ -230,8 +230,12 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) {"max_standby_lag", max_standby_lag}, {"kickout_data_for_test", kickout_data_for_test ? 1 : 0}, {"range_slice_memory_limit_percent", range_slice_memory_limit_percent}, - {"dirty_memory_check_interval", dirty_memory_check_interval}, - {"dirty_memory_size_threshold_mb", dirty_memory_size_threshold_mb}}; + {"dirty_memory_check_interval", + static_cast(std::min(dirty_memory_check_interval, + static_cast(UINT32_MAX)))}, + {"dirty_memory_size_threshold_mb", + static_cast(std::min(dirty_memory_size_threshold_mb, + static_cast(UINT32_MAX)))}}; txservice::CatalogFactory *catalog_factory[NUM_EXTERNAL_ENGINES] = { nullptr, nullptr, nullptr}; diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ecac68d4..ac52fa4a 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -1305,6 +1305,8 @@ class CcShard // Config for dirty memory checkpoint triggering. uint64_t dirty_memory_check_interval_{1000}; uint64_t dirty_memory_size_threshold_mb_{0}; + // Pre-calculated threshold in bytes (0 means use 10% of memory_limit_). + uint64_t dirty_memory_threshold_bytes_{0}; Checkpointer *ckpter_; diff --git a/tx_service/include/checkpointer.h b/tx_service/include/checkpointer.h index 487c3ac8..21a7b5f6 100644 --- a/tx_service/include/checkpointer.h +++ b/tx_service/include/checkpointer.h @@ -143,6 +143,16 @@ class Checkpointer return ongoing_data_sync_cnt_.load(std::memory_order_relaxed) > 0; } + /** + * @brief Check if a checkpoint has been requested but not yet completed. + * This is useful to avoid spamming checkpoint notifications when a + * checkpoint is already queued. + */ + bool IsCheckpointRequested() const + { + return request_ckpt_.load(std::memory_order_relaxed); + } + private: enum struct Status { @@ -152,10 +162,10 @@ class Checkpointer }; LocalCcShards &local_shards_; - // protects request_ckpt_ and status_ + // protects status_ std::mutex ckpt_mux_; std::condition_variable ckpt_cv_; - bool request_ckpt_; + std::atomic request_ckpt_{false}; store::DataStoreHandler *store_hd_; std::thread thd_; Status ckpt_thd_status_; diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index f2d08691..381527cd 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -122,6 +122,23 @@ CcShard::CcShard( static_cast(MB(node_memory_limit_mb) * memory_usage_ratio); memory_limit_ /= core_cnt_; + // Pre-calculate dirty memory threshold in bytes + if (dirty_memory_size_threshold_mb_ == 0) + { + // Default: 10% of memory limit per shard, with minimum floor of 1 MB + dirty_memory_threshold_bytes_ = + static_cast(memory_limit_ * 0.1); + if (dirty_memory_threshold_bytes_ == 0) + { + dirty_memory_threshold_bytes_ = 1024 * 1024; // 1 MB minimum + } + } + else + { + dirty_memory_threshold_bytes_ = + dirty_memory_size_threshold_mb_ * 1024 * 1024; + } + // Calculate standby buffer memory limit: 10% of node memory limit per // shard. These part of memory is calculated together with shard memory so // no need to subtract it from memory_limit_. @@ -427,8 +444,9 @@ void CcShard::AdjustDataKeyStats(const TableName &table_name, // Check dirty memory thresholds periodically if (dirty_memory_check_interval_ > 0 && - ++adjust_stats_call_count_ % dirty_memory_check_interval_ == 0) + ++adjust_stats_call_count_ >= dirty_memory_check_interval_) { + adjust_stats_call_count_ = 0; CheckAndTriggerCkptByDirtyMemory(); } } @@ -447,38 +465,38 @@ void CcShard::CheckAndTriggerCkptByDirtyMemory() } // Get current memory usage + if (GetShardHeap() == nullptr) + { + return; + } int64_t allocated = 0, committed = 0; GetShardHeap()->Full(&allocated, &committed); + if (allocated <= 0) + { + return; + } + // Calculate dirty memory double dirty_key_ratio = static_cast(dirty_data_key_count_) / static_cast(data_key_count_); uint64_t dirty_memory = static_cast(allocated * dirty_key_ratio); - // Determine threshold: use configured value or default to 10% of - // memory_limit - uint64_t threshold; - if (dirty_memory_size_threshold_mb_ == 0) - { - // Default: 10% of memory limit - threshold = static_cast(memory_limit_ * 0.1); - } - else - { - threshold = dirty_memory_size_threshold_mb_ * 1024 * 1024; - } - // Trigger checkpoint when dirty memory exceeds threshold - if (dirty_memory > threshold) + if (dirty_memory > dirty_memory_threshold_bytes_) { DLOG(INFO) << "Shard " << core_id_ << " triggering checkpoint - dirty_memory=" - << (dirty_memory / (1024 * 1024)) - << "MB (threshold=" << (threshold / (1024 * 1024)) + << (dirty_memory / (1024 * 1024)) << "MB (threshold=" + << (dirty_memory_threshold_bytes_ / (1024 * 1024)) << "MB), dirty_keys=" << dirty_data_key_count_ << "/" << data_key_count_; - ckpter_->Notify(true); // Request immediate checkpoint + // Only notify if no checkpoint is already requested to avoid spamming + if (!ckpter_->IsCheckpointRequested()) + { + ckpter_->Notify(true); // Request immediate checkpoint + } } } diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index 7de601aa..aecfedf4 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -59,7 +59,6 @@ Checkpointer::Checkpointer(LocalCcShards &shards, : local_shards_(shards), ckpt_mux_(), ckpt_cv_(), - request_ckpt_(false), store_hd_(write_hd), ckpt_thd_status_(Status::Active), checkpoint_interval_(checkpoint_interval), @@ -420,10 +419,10 @@ void Checkpointer::Run() return true; } - // Either cc shards are full and have requested a checkpoint, or + // Either have requested a checkpoint, or // we've sleeped for at least checkpoint_interval_ seconds. // Only enqueue new checkpoint task if there's idle worker. - return (request_ckpt_ || + return (request_ckpt_.load(std::memory_order_relaxed) || std::chrono::high_resolution_clock::now() >= last_checkpoint_ts_ + std::chrono::seconds(checkpoint_interval_)); @@ -433,7 +432,7 @@ void Checkpointer::Run() } CODE_FAULT_INJECTOR("checkpointer_skip_ckpt", { - request_ckpt_ = false; + request_ckpt_.store(false, std::memory_order_relaxed); last_checkpoint_ts_ = std::chrono::high_resolution_clock::now(); continue; }); @@ -450,7 +449,7 @@ void Checkpointer::Run() // notify all waiting that one round checkpoint is done. ckpt_cv_.notify_all(); - request_ckpt_ = false; + request_ckpt_.store(false, std::memory_order_relaxed); } // ensure normal shutdown execute checkpoint since we could receive @@ -470,11 +469,11 @@ void Checkpointer::Run() */ void Checkpointer::Notify(bool request_ckpt) { - std::unique_lock lk(ckpt_mux_); if (request_ckpt) { - request_ckpt_ = true; + request_ckpt_.store(true, std::memory_order_relaxed); } + std::unique_lock lk(ckpt_mux_); ckpt_cv_.notify_one(); } From 4e454e2d5ba1c696486396f24c60b563d6a4f02e Mon Sep 17 00:00:00 2001 From: githubzilla Date: Fri, 13 Feb 2026 12:17:05 +0800 Subject: [PATCH 07/15] Leverage CcShard::NotifyCkpt() --- tx_service/src/cc/cc_shard.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index 381527cd..ec09256f 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -492,11 +492,7 @@ void CcShard::CheckAndTriggerCkptByDirtyMemory() << "MB), dirty_keys=" << dirty_data_key_count_ << "/" << data_key_count_; - // Only notify if no checkpoint is already requested to avoid spamming - if (!ckpter_->IsCheckpointRequested()) - { - ckpter_->Notify(true); // Request immediate checkpoint - } + NotifyCkpt(true); // Request immediate checkpoint } } @@ -1322,7 +1318,7 @@ bool CcShard::FlushEntryForTest( void CcShard::NotifyCkpt(bool request_ckpt) { - if (ckpter_ != nullptr) + if (ckpter_ != nullptr && !ckpter_->IsCheckpointRequested()) { ckpter_->Notify(request_ckpt); } From a34a21da64ce275f9afba0bf3aa2a805fca68e40 Mon Sep 17 00:00:00 2001 From: githubzilla Date: Fri, 13 Feb 2026 18:04:39 +0800 Subject: [PATCH 08/15] Guarante memory order of request_ckpt_ and notify --- tx_service/include/checkpointer.h | 10 ---------- tx_service/src/cc/cc_shard.cpp | 2 +- tx_service/src/checkpointer.cpp | 13 +++++++++---- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/tx_service/include/checkpointer.h b/tx_service/include/checkpointer.h index 21a7b5f6..8508d0fb 100644 --- a/tx_service/include/checkpointer.h +++ b/tx_service/include/checkpointer.h @@ -143,16 +143,6 @@ class Checkpointer return ongoing_data_sync_cnt_.load(std::memory_order_relaxed) > 0; } - /** - * @brief Check if a checkpoint has been requested but not yet completed. - * This is useful to avoid spamming checkpoint notifications when a - * checkpoint is already queued. - */ - bool IsCheckpointRequested() const - { - return request_ckpt_.load(std::memory_order_relaxed); - } - private: enum struct Status { diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index ec09256f..ff0e8d75 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -1318,7 +1318,7 @@ bool CcShard::FlushEntryForTest( void CcShard::NotifyCkpt(bool request_ckpt) { - if (ckpter_ != nullptr && !ckpter_->IsCheckpointRequested()) + if (ckpter_ != nullptr) { ckpter_->Notify(request_ckpt); } diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index aecfedf4..ab9c030b 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -422,7 +422,7 @@ void Checkpointer::Run() // Either have requested a checkpoint, or // we've sleeped for at least checkpoint_interval_ seconds. // Only enqueue new checkpoint task if there's idle worker. - return (request_ckpt_.load(std::memory_order_relaxed) || + return (request_ckpt_.load(std::memory_order_acquire) || std::chrono::high_resolution_clock::now() >= last_checkpoint_ts_ + std::chrono::seconds(checkpoint_interval_)); @@ -432,7 +432,7 @@ void Checkpointer::Run() } CODE_FAULT_INJECTOR("checkpointer_skip_ckpt", { - request_ckpt_.store(false, std::memory_order_relaxed); + request_ckpt_.store(false, std::memory_order_release); last_checkpoint_ts_ = std::chrono::high_resolution_clock::now(); continue; }); @@ -449,7 +449,7 @@ void Checkpointer::Run() // notify all waiting that one round checkpoint is done. ckpt_cv_.notify_all(); - request_ckpt_.store(false, std::memory_order_relaxed); + request_ckpt_.store(false, std::memory_order_release); } // ensure normal shutdown execute checkpoint since we could receive @@ -471,7 +471,12 @@ void Checkpointer::Notify(bool request_ckpt) { if (request_ckpt) { - request_ckpt_.store(true, std::memory_order_relaxed); + bool expected = false; + if (!request_ckpt_.compare_exchange_strong( + expected, true, std::memory_order_release)) + { + return; + } } std::unique_lock lk(ckpt_mux_); ckpt_cv_.notify_one(); From 87efd560aae779a9da60a148d95ea20b2ed133bc Mon Sep 17 00:00:00 2001 From: githubzilla Date: Fri, 13 Feb 2026 19:24:44 +0800 Subject: [PATCH 09/15] Add checkpoint_min_interval --- core/src/tx_service_init.cpp | 19 +++++++++++++++++++ tx_service/include/checkpointer.h | 4 +++- tx_service/include/tx_service.h | 3 ++- tx_service/src/checkpointer.cpp | 12 +++++++++++- 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/core/src/tx_service_init.cpp b/core/src/tx_service_init.cpp index afd28eb9..e9f52ee8 100644 --- a/core/src/tx_service_init.cpp +++ b/core/src/tx_service_init.cpp @@ -8,6 +8,9 @@ #include "sequences/sequences.h" #include "tx_service.h" DEFINE_int32(checkpointer_interval, 10, "Checkpointer interval in seconds"); +DEFINE_int32(checkpointer_min_interval, + 10, + "Minimum checkpointer interval in seconds"); DEFINE_int32(range_slice_memory_limit_percent, 10, "Range slice memory limit percentage"); @@ -68,6 +71,21 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) "checkpointer_delay_seconds", FLAGS_checkpointer_delay_seconds); + uint64_t checkpointer_min_interval = + !CheckCommandLineFlagIsDefault("checkpointer_min_interval") + ? FLAGS_checkpointer_min_interval + : config_reader.GetInteger("local", + "checkpointer_min_interval", + FLAGS_checkpointer_min_interval); + + if (checkpointer_min_interval >= checkpointer_interval) + { + LOG(ERROR) << "checkpointer_min_interval (" << checkpointer_min_interval + << ") must be smaller than checkpointer_interval (" + << checkpointer_interval << ")"; + return false; + } + uint64_t collect_active_tx_ts_interval_seconds = !CheckCommandLineFlagIsDefault("collect_active_tx_ts_interval_seconds") ? FLAGS_collect_active_tx_ts_interval_seconds @@ -217,6 +235,7 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) std::map tx_service_conf{ {"core_num", core_config_.core_num}, {"checkpointer_interval", checkpointer_interval}, + {"checkpointer_min_interval", checkpointer_min_interval}, {"node_memory_limit_mb", core_config_.node_memory_limit_mb}, {"checkpointer_delay_seconds", checkpointer_delay_seconds}, {"collect_active_tx_ts_interval_seconds", diff --git a/tx_service/include/checkpointer.h b/tx_service/include/checkpointer.h index 8508d0fb..93c652b6 100644 --- a/tx_service/include/checkpointer.h +++ b/tx_service/include/checkpointer.h @@ -50,7 +50,8 @@ class Checkpointer store::DataStoreHandler *write_hd, const uint32_t &checkpoint_interval, TxLog *log_agent, - uint32_t ckpt_delay_seconds); + uint32_t ckpt_delay_seconds, + uint32_t min_checkpoint_interval); ~Checkpointer() = default; @@ -160,6 +161,7 @@ class Checkpointer std::thread thd_; Status ckpt_thd_status_; const uint32_t checkpoint_interval_; + const uint32_t min_checkpoint_interval_; std::chrono::system_clock::time_point last_checkpoint_ts_; uint32_t ckpt_delay_time_; // unit: Microsecond std::atomic ongoing_data_sync_cnt_{0}; diff --git a/tx_service/include/tx_service.h b/tx_service/include/tx_service.h index 007d9dbe..3d29958a 100644 --- a/tx_service/include/tx_service.h +++ b/tx_service/include/tx_service.h @@ -1163,7 +1163,8 @@ class TxService store_hd, conf.at("checkpointer_interval"), log_hd, - conf.at("checkpointer_delay_seconds")) + conf.at("checkpointer_delay_seconds"), + conf.at("checkpointer_min_interval")) { assert(store_hd != nullptr || skip_kv); uint32_t core_cnt = conf.at("core_num"); diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index ab9c030b..0ec11a1d 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -55,13 +55,15 @@ Checkpointer::Checkpointer(LocalCcShards &shards, store::DataStoreHandler *write_hd, const uint32_t &checkpoint_interval, TxLog *log_agent, - uint32_t ckpt_delay_seconds) + uint32_t ckpt_delay_seconds, + uint32_t min_checkpoint_interval) : local_shards_(shards), ckpt_mux_(), ckpt_cv_(), store_hd_(write_hd), ckpt_thd_status_(Status::Active), checkpoint_interval_(checkpoint_interval), + min_checkpoint_interval_(min_checkpoint_interval), ckpt_delay_time_(ckpt_delay_seconds * 1000000), log_agent_(log_agent) { @@ -79,6 +81,7 @@ Checkpointer::Checkpointer(LocalCcShards &shards, DLOG(INFO) << "checkpointer init, checkpoint_interval_: " << checkpoint_interval_ + << " ,min_checkpoint_interval_: " << min_checkpoint_interval_ << " ,ckpt_delay_seconds: " << ckpt_delay_seconds; } @@ -471,6 +474,13 @@ void Checkpointer::Notify(bool request_ckpt) { if (request_ckpt) { + if (std::chrono::high_resolution_clock::now() < + last_checkpoint_ts_ + + std::chrono::seconds(min_checkpoint_interval_)) + { + return; + } + bool expected = false; if (!request_ckpt_.compare_exchange_strong( expected, true, std::memory_order_release)) From bd6508c2fdbc1d89607ff1e153cebe5f83a05ef3 Mon Sep 17 00:00:00 2001 From: githubzilla Date: Fri, 13 Feb 2026 19:31:11 +0800 Subject: [PATCH 10/15] Add comments --- tx_service/include/cc/cc_shard.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ac52fa4a..d1195258 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -1303,7 +1303,10 @@ class CcShard uint64_t adjust_stats_call_count_{0}; // Config for dirty memory checkpoint triggering. + // The interval (in number of calls to AdjustDataKeyStats) to check whether + // dirty memory exceeds the threshold. uint64_t dirty_memory_check_interval_{1000}; + // The threshold of dirty memory size (in MB) to trigger checkpoint. uint64_t dirty_memory_size_threshold_mb_{0}; // Pre-calculated threshold in bytes (0 means use 10% of memory_limit_). uint64_t dirty_memory_threshold_bytes_{0}; From 5619eb6581455314ee84d75a2e05f1597f52c63a Mon Sep 17 00:00:00 2001 From: githubzilla Date: Sat, 14 Feb 2026 17:52:50 +0800 Subject: [PATCH 11/15] Make ckpt min interval default value smaller than the de-facto ckpt interval used when test --- core/src/tx_service_init.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/tx_service_init.cpp b/core/src/tx_service_init.cpp index e9f52ee8..730ad2ad 100644 --- a/core/src/tx_service_init.cpp +++ b/core/src/tx_service_init.cpp @@ -9,7 +9,7 @@ #include "tx_service.h" DEFINE_int32(checkpointer_interval, 10, "Checkpointer interval in seconds"); DEFINE_int32(checkpointer_min_interval, - 10, + 9, "Minimum checkpointer interval in seconds"); DEFINE_int32(range_slice_memory_limit_percent, 10, From 9b336ebd7bd2a71b7837d6a7c8a3923289833bca Mon Sep 17 00:00:00 2001 From: githubzilla Date: Fri, 20 Feb 2026 12:46:07 +0800 Subject: [PATCH 12/15] Seperate the ckpt request interval with ckpt interval --- core/src/tx_service_init.cpp | 31 +++++++++++++------------------ tx_service/include/checkpointer.h | 6 ++++-- tx_service/include/tx_service.h | 2 +- tx_service/src/checkpointer.cpp | 17 +++++++++++------ 4 files changed, 29 insertions(+), 27 deletions(-) diff --git a/core/src/tx_service_init.cpp b/core/src/tx_service_init.cpp index 730ad2ad..ec8bb7bf 100644 --- a/core/src/tx_service_init.cpp +++ b/core/src/tx_service_init.cpp @@ -8,9 +8,10 @@ #include "sequences/sequences.h" #include "tx_service.h" DEFINE_int32(checkpointer_interval, 10, "Checkpointer interval in seconds"); -DEFINE_int32(checkpointer_min_interval, - 9, - "Minimum checkpointer interval in seconds"); +DEFINE_int32(checkpointer_min_ckpt_request_interval, + 5, + "Minimum checkpoint request interval in seconds, to avoid too " + "frequent checkpoint requests"); DEFINE_int32(range_slice_memory_limit_percent, 10, "Range slice memory limit percentage"); @@ -71,20 +72,13 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) "checkpointer_delay_seconds", FLAGS_checkpointer_delay_seconds); - uint64_t checkpointer_min_interval = - !CheckCommandLineFlagIsDefault("checkpointer_min_interval") - ? FLAGS_checkpointer_min_interval - : config_reader.GetInteger("local", - "checkpointer_min_interval", - FLAGS_checkpointer_min_interval); - - if (checkpointer_min_interval >= checkpointer_interval) - { - LOG(ERROR) << "checkpointer_min_interval (" << checkpointer_min_interval - << ") must be smaller than checkpointer_interval (" - << checkpointer_interval << ")"; - return false; - } + uint64_t checkpointer_min_ckpt_request_interval = + !CheckCommandLineFlagIsDefault("checkpointer_min_ckpt_request_interval") + ? FLAGS_checkpointer_min_ckpt_request_interval + : config_reader.GetInteger( + "local", + "checkpointer_min_ckpt_request_interval", + FLAGS_checkpointer_min_ckpt_request_interval); uint64_t collect_active_tx_ts_interval_seconds = !CheckCommandLineFlagIsDefault("collect_active_tx_ts_interval_seconds") @@ -235,7 +229,8 @@ bool DataSubstrate::InitializeTxService(const INIReader &config_reader) std::map tx_service_conf{ {"core_num", core_config_.core_num}, {"checkpointer_interval", checkpointer_interval}, - {"checkpointer_min_interval", checkpointer_min_interval}, + {"checkpointer_min_ckpt_request_interval", + checkpointer_min_ckpt_request_interval}, {"node_memory_limit_mb", core_config_.node_memory_limit_mb}, {"checkpointer_delay_seconds", checkpointer_delay_seconds}, {"collect_active_tx_ts_interval_seconds", diff --git a/tx_service/include/checkpointer.h b/tx_service/include/checkpointer.h index 93c652b6..a5f1aacb 100644 --- a/tx_service/include/checkpointer.h +++ b/tx_service/include/checkpointer.h @@ -51,7 +51,7 @@ class Checkpointer const uint32_t &checkpoint_interval, TxLog *log_agent, uint32_t ckpt_delay_seconds, - uint32_t min_checkpoint_interval); + uint32_t min_ckpt_request_interval); ~Checkpointer() = default; @@ -161,8 +161,10 @@ class Checkpointer std::thread thd_; Status ckpt_thd_status_; const uint32_t checkpoint_interval_; - const uint32_t min_checkpoint_interval_; + const uint32_t min_ckpt_request_interval_; std::chrono::system_clock::time_point last_checkpoint_ts_; + std::atomic + last_checkpoint_request_ts_; uint32_t ckpt_delay_time_; // unit: Microsecond std::atomic ongoing_data_sync_cnt_{0}; TxService *tx_service_; diff --git a/tx_service/include/tx_service.h b/tx_service/include/tx_service.h index 3d29958a..1c1aa1f1 100644 --- a/tx_service/include/tx_service.h +++ b/tx_service/include/tx_service.h @@ -1164,7 +1164,7 @@ class TxService conf.at("checkpointer_interval"), log_hd, conf.at("checkpointer_delay_seconds"), - conf.at("checkpointer_min_interval")) + conf.at("checkpointer_min_ckpt_request_interval")) { assert(store_hd != nullptr || skip_kv); uint32_t core_cnt = conf.at("core_num"); diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index 0ec11a1d..3817d3e5 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -56,16 +56,17 @@ Checkpointer::Checkpointer(LocalCcShards &shards, const uint32_t &checkpoint_interval, TxLog *log_agent, uint32_t ckpt_delay_seconds, - uint32_t min_checkpoint_interval) + uint32_t min_ckpt_request_interval) : local_shards_(shards), ckpt_mux_(), ckpt_cv_(), store_hd_(write_hd), ckpt_thd_status_(Status::Active), checkpoint_interval_(checkpoint_interval), - min_checkpoint_interval_(min_checkpoint_interval), + min_ckpt_request_interval_(min_ckpt_request_interval), ckpt_delay_time_(ckpt_delay_seconds * 1000000), - log_agent_(log_agent) + log_agent_(log_agent), + last_checkpoint_request_ts_(std::chrono::high_resolution_clock::now()) { tx_service_ = shards.tx_service_; for (std::unique_ptr &ccs : shards.cc_shards_) @@ -81,7 +82,7 @@ Checkpointer::Checkpointer(LocalCcShards &shards, DLOG(INFO) << "checkpointer init, checkpoint_interval_: " << checkpoint_interval_ - << " ,min_checkpoint_interval_: " << min_checkpoint_interval_ + << " ,min_ckpt_request_interval_: " << min_ckpt_request_interval_ << " ,ckpt_delay_seconds: " << ckpt_delay_seconds; } @@ -475,8 +476,8 @@ void Checkpointer::Notify(bool request_ckpt) if (request_ckpt) { if (std::chrono::high_resolution_clock::now() < - last_checkpoint_ts_ + - std::chrono::seconds(min_checkpoint_interval_)) + last_checkpoint_request_ts_.load(std::memory_order_acquire) + + std::chrono::seconds(min_ckpt_request_interval_)) { return; } @@ -487,6 +488,10 @@ void Checkpointer::Notify(bool request_ckpt) { return; } + + last_checkpoint_request_ts_.store( + std::chrono::high_resolution_clock::now(), + std::memory_order_release); } std::unique_lock lk(ckpt_mux_); ckpt_cv_.notify_one(); From 4aabbe8dda1a556278d5563502ef5302d7423867 Mon Sep 17 00:00:00 2001 From: githubzilla Date: Fri, 20 Feb 2026 21:42:51 +0800 Subject: [PATCH 13/15] Fix comments --- tx_service/include/cc/cc_shard.h | 2 -- tx_service/include/checkpointer.h | 2 +- tx_service/src/cc/cc_req_misc.cpp | 11 +++++++++-- tx_service/src/cc/cc_shard.cpp | 7 +++---- tx_service/src/checkpointer.cpp | 23 ++++++++++------------- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index d1195258..946dbe49 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -1306,8 +1306,6 @@ class CcShard // The interval (in number of calls to AdjustDataKeyStats) to check whether // dirty memory exceeds the threshold. uint64_t dirty_memory_check_interval_{1000}; - // The threshold of dirty memory size (in MB) to trigger checkpoint. - uint64_t dirty_memory_size_threshold_mb_{0}; // Pre-calculated threshold in bytes (0 means use 10% of memory_limit_). uint64_t dirty_memory_threshold_bytes_{0}; diff --git a/tx_service/include/checkpointer.h b/tx_service/include/checkpointer.h index a5f1aacb..a4e7a045 100644 --- a/tx_service/include/checkpointer.h +++ b/tx_service/include/checkpointer.h @@ -163,7 +163,7 @@ class Checkpointer const uint32_t checkpoint_interval_; const uint32_t min_ckpt_request_interval_; std::chrono::system_clock::time_point last_checkpoint_ts_; - std::atomic + std::atomic last_checkpoint_request_ts_; uint32_t ckpt_delay_time_; // unit: Microsecond std::atomic ongoing_data_sync_cnt_{0}; diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index a52f2acf..6b4766bc 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ b/tx_service/src/cc/cc_req_misc.cpp @@ -1493,8 +1493,15 @@ bool ShardCleanCc::Execute(CcShard &ccs) // waiting ccrequests. ccs.AbortRequestsAfterMemoryFree(); - // Note: Checkpoint is now triggered based on dirty memory - // thresholds in AdjustDataKeyStats, not on memory-full events. + // Notify the checkpointer thread to do checkpoint if there + // is not freeable entries to be kicked out from ccmap and + // if the shard is not doing defrag. + if (free_count_ == 0 && !shard_heap->IsDefragHeapCcOnFly() && + !Sharder::Instance().GetCheckpointer()->IsOngoingDataSync()) + { + ccs.NotifyCkpt(true); + } + free_count_ = 0; // Return true will set the request as free, which means the // request is not in working state. diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index ff0e8d75..99e658b0 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -106,8 +106,7 @@ CcShard::CcShard( catalog_factory[4]}, system_handler_(system_handler), active_si_txs_(), - dirty_memory_check_interval_(dirty_memory_check_interval), - dirty_memory_size_threshold_mb_(dirty_memory_size_threshold_mb) + dirty_memory_check_interval_(dirty_memory_check_interval) { // Reserve range_slice_memory_limit_percent% for range slice info. // We update this to dynamically reserve the configured range slice @@ -123,7 +122,7 @@ CcShard::CcShard( memory_limit_ /= core_cnt_; // Pre-calculate dirty memory threshold in bytes - if (dirty_memory_size_threshold_mb_ == 0) + if (dirty_memory_size_threshold_mb == 0) { // Default: 10% of memory limit per shard, with minimum floor of 1 MB dirty_memory_threshold_bytes_ = @@ -136,7 +135,7 @@ CcShard::CcShard( else { dirty_memory_threshold_bytes_ = - dirty_memory_size_threshold_mb_ * 1024 * 1024; + dirty_memory_size_threshold_mb * 1024 * 1024; } // Calculate standby buffer memory limit: 10% of node memory limit per diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index 3817d3e5..425efa68 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -66,7 +66,7 @@ Checkpointer::Checkpointer(LocalCcShards &shards, min_ckpt_request_interval_(min_ckpt_request_interval), ckpt_delay_time_(ckpt_delay_seconds * 1000000), log_agent_(log_agent), - last_checkpoint_request_ts_(std::chrono::high_resolution_clock::now()) + last_checkpoint_request_ts_(std::chrono::system_clock::now()) { tx_service_ = shards.tx_service_; for (std::unique_ptr &ccs : shards.cc_shards_) @@ -410,7 +410,7 @@ void Checkpointer::Ckpt(bool is_last_ckpt) void Checkpointer::Run() { std::unique_lock lk(ckpt_mux_); - last_checkpoint_ts_ = std::chrono::high_resolution_clock::now(); + last_checkpoint_ts_ = std::chrono::system_clock::now(); while (ckpt_thd_status_ == Status::Active) { while (!ckpt_cv_.wait_for( @@ -427,7 +427,7 @@ void Checkpointer::Run() // we've sleeped for at least checkpoint_interval_ seconds. // Only enqueue new checkpoint task if there's idle worker. return (request_ckpt_.load(std::memory_order_acquire) || - std::chrono::high_resolution_clock::now() >= + std::chrono::system_clock::now() >= last_checkpoint_ts_ + std::chrono::seconds(checkpoint_interval_)); })) @@ -437,7 +437,7 @@ void Checkpointer::Run() CODE_FAULT_INJECTOR("checkpointer_skip_ckpt", { request_ckpt_.store(false, std::memory_order_release); - last_checkpoint_ts_ = std::chrono::high_resolution_clock::now(); + last_checkpoint_ts_ = std::chrono::system_clock::now(); continue; }); @@ -446,7 +446,7 @@ void Checkpointer::Run() break; } - last_checkpoint_ts_ = std::chrono::high_resolution_clock::now(); + last_checkpoint_ts_ = std::chrono::system_clock::now(); lk.unlock(); Ckpt(false); lk.lock(); @@ -475,23 +475,20 @@ void Checkpointer::Notify(bool request_ckpt) { if (request_ckpt) { - if (std::chrono::high_resolution_clock::now() < - last_checkpoint_request_ts_.load(std::memory_order_acquire) + - std::chrono::seconds(min_ckpt_request_interval_)) + auto now = std::chrono::system_clock::now(); + if (now < last_checkpoint_request_ts_.load(std::memory_order_relaxed) + + std::chrono::seconds(min_ckpt_request_interval_)) { return; } bool expected = false; - if (!request_ckpt_.compare_exchange_strong( - expected, true, std::memory_order_release)) + if (!request_ckpt_.compare_exchange_strong(expected, true)) { return; } - last_checkpoint_request_ts_.store( - std::chrono::high_resolution_clock::now(), - std::memory_order_release); + last_checkpoint_request_ts_.store(now); } std::unique_lock lk(ckpt_mux_); ckpt_cv_.notify_one(); From 75885c182f1c2eb456daa9bae286776f532684b9 Mon Sep 17 00:00:00 2001 From: githubzilla Date: Fri, 20 Feb 2026 21:54:43 +0800 Subject: [PATCH 14/15] Fix compile warning --- tx_service/src/checkpointer.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index 425efa68..76d7b7aa 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -64,9 +64,12 @@ Checkpointer::Checkpointer(LocalCcShards &shards, ckpt_thd_status_(Status::Active), checkpoint_interval_(checkpoint_interval), min_ckpt_request_interval_(min_ckpt_request_interval), + last_checkpoint_ts_(std::chrono::system_clock::now()), + last_checkpoint_request_ts_(std::chrono::system_clock::now()), ckpt_delay_time_(ckpt_delay_seconds * 1000000), - log_agent_(log_agent), - last_checkpoint_request_ts_(std::chrono::system_clock::now()) + ongoing_data_sync_cnt_(0), + tx_service_(nullptr), + log_agent_(log_agent) { tx_service_ = shards.tx_service_; for (std::unique_ptr &ccs : shards.cc_shards_) From 51c683ef84e6eeeb3f56c6185ad9cb8fae5c2846 Mon Sep 17 00:00:00 2001 From: githubzilla Date: Sat, 21 Feb 2026 22:19:41 +0800 Subject: [PATCH 15/15] Init last_checkpoint_request_ts_ --- tx_service/src/checkpointer.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index 76d7b7aa..afac83bc 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -65,7 +65,12 @@ Checkpointer::Checkpointer(LocalCcShards &shards, checkpoint_interval_(checkpoint_interval), min_ckpt_request_interval_(min_ckpt_request_interval), last_checkpoint_ts_(std::chrono::system_clock::now()), - last_checkpoint_request_ts_(std::chrono::system_clock::now()), + // Initialize last_checkpoint_request_ts_ to a time point that is + // sufficiently in the past, so that the first checkpoint request can be + // processed immediately + last_checkpoint_request_ts_( + std::chrono::system_clock::now() - + std::chrono::seconds(2 * min_ckpt_request_interval)), ckpt_delay_time_(ckpt_delay_seconds * 1000000), ongoing_data_sync_cnt_(0), tx_service_(nullptr),