From e6b5f10883a8191b87e5609e3107c9a0c29f794b Mon Sep 17 00:00:00 2001 From: lokax Date: Thu, 26 Feb 2026 14:49:47 +0800 Subject: [PATCH 1/6] ckpt --- store_handler/data_store_service_client.cpp | 3 +- .../eloq_store_data_store.cpp | 39 +- tx_service/include/cc/cc_request.h | 52 +- tx_service/include/cc/local_cc_shards.h | 32 +- tx_service/include/cc/template_cc_map.h | 171 +-- tx_service/src/cc/local_cc_shards.cpp | 999 ++++++++++-------- 6 files changed, 673 insertions(+), 623 deletions(-) diff --git a/store_handler/data_store_service_client.cpp b/store_handler/data_store_service_client.cpp index 9a2b8848..f4035c73 100644 --- a/store_handler/data_store_service_client.cpp +++ b/store_handler/data_store_service_client.cpp @@ -306,7 +306,6 @@ bool DataStoreServiceClient::PutAll( DLOG(INFO) << "DataStoreServiceClient::PutAll called with " << flush_task.size() << " tables to flush."; uint64_t now = txservice::LocalCcShards::ClockTsInMillseconds(); - // Create global coordinator SyncPutAllData *sync_putall = sync_putall_data_pool_.NextObject(); PoolableGuard sync_putall_guard(sync_putall); @@ -478,6 +477,7 @@ bool DataStoreServiceClient::PutAll( callback_data->Clear(); callback_data->Free(); } + return false; } } @@ -493,6 +493,7 @@ bool DataStoreServiceClient::PutAll( metrics::kv_meter->Collect( metrics::NAME_KV_FLUSH_ROWS_TOTAL, records_count, "base"); } + return true; } diff --git a/store_handler/eloq_data_store_service/eloq_store_data_store.cpp b/store_handler/eloq_data_store_service/eloq_store_data_store.cpp index 9140a902..54aac848 100644 --- a/store_handler/eloq_data_store_service/eloq_store_data_store.cpp +++ b/store_handler/eloq_data_store_service/eloq_store_data_store.cpp @@ -60,6 +60,12 @@ inline void BuildKey(const WriteRecordsRequest &write_req, uint16_t key_parts, std::string &key_out) { + size_t total_size = 0; + for (uint16_t i = 0; i < key_parts; ++i) + { + total_size += write_req.GetKeyPart(key_first_idx + i).size(); + } + key_out.reserve(key_out.size() + total_size); size_t part_idx = key_first_idx; for (uint16_t i = 0; i < key_parts; ++i, ++part_idx) { @@ -73,6 +79,12 @@ inline void BuildValue(const WriteRecordsRequest &write_req, uint16_t rec_parts, std::string &rec_out) { + size_t total_size = 0; + for (uint16_t i = 0; i < rec_parts; ++i) + { + total_size += write_req.GetRecordPart(rec_first_idx + i).size(); + } + rec_out.reserve(rec_out.size() + total_size); size_t part_idx = rec_first_idx; for (uint16_t i = 0; i < rec_parts; ++i, ++part_idx) { @@ -215,26 +227,28 @@ void EloqStoreDataStore::BatchWriteRecords(WriteRecordsRequest *write_req) ::eloqstore::BatchWriteRequest &kv_write_req = write_op->EloqStoreRequest(); - std::vector<::eloqstore::WriteDataEntry> entries; - size_t rec_cnt = write_req->RecordsCount(); - entries.reserve(rec_cnt); + const size_t rec_cnt = write_req->RecordsCount(); const uint16_t parts_per_key = write_req->PartsCountPerKey(); const uint16_t parts_per_record = write_req->PartsCountPerRecord(); - size_t first_idx = 0; + + std::vector<::eloqstore::WriteDataEntry> entries(rec_cnt); + size_t key_offset = 0; + size_t val_offset = 0; + for (size_t i = 0; i < rec_cnt; ++i) { - ::eloqstore::WriteDataEntry entry; - first_idx = i * parts_per_key; - BuildKey(*write_req, first_idx, parts_per_key, entry.key_); - first_idx = i * parts_per_record; - BuildValue(*write_req, first_idx, parts_per_record, entry.val_); + ::eloqstore::WriteDataEntry &entry = entries[i]; + BuildKey(*write_req, key_offset, parts_per_key, entry.key_); + BuildValue(*write_req, val_offset, parts_per_record, entry.val_); + key_offset += parts_per_key; + val_offset += parts_per_record; + entry.timestamp_ = write_req->GetRecordTs(i); entry.op_ = (write_req->KeyOpType(i) == WriteOpType::PUT ? ::eloqstore::WriteOp::Upsert : ::eloqstore::WriteOp::Delete); - uint64_t ttl = write_req->GetRecordTtl(i); - entry.expire_ts_ = ttl == UINT64_MAX ? 0 : ttl; - entries.emplace_back(std::move(entry)); + const uint64_t ttl = write_req->GetRecordTtl(i); + entry.expire_ts_ = (ttl == UINT64_MAX) ? 0u : ttl; } if (!std::ranges::is_sorted( @@ -251,6 +265,7 @@ void EloqStoreDataStore::BatchWriteRecords(WriteRecordsRequest *write_req) kv_write_req.SetArgs(eloq_store_table_id, std::move(entries)); uint64_t user_data = reinterpret_cast(write_op); + if (!eloq_store_service_->ExecAsyn(&kv_write_req, user_data, OnBatchWrite)) { LOG(ERROR) << "Send write request to EloqStore failed for table: " diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 97e93fae..e759c7a5 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -8991,11 +8991,7 @@ struct ScanSliceDeltaSizeCcForRangePartition : public CcRequestBase struct ScanDeltaSizeCcForHashPartition : public CcRequestBase { - static constexpr size_t ScanBatchSize = 128; - ScanDeltaSizeCcForHashPartition(const TableName &table_name, - uint64_t last_datasync_ts, - uint64_t scan_ts, uint64_t ng_id, int64_t ng_term, uint64_t txn, @@ -9003,8 +8999,6 @@ struct ScanDeltaSizeCcForHashPartition : public CcRequestBase : table_name_(table_name), node_group_id_(ng_id), node_group_term_(ng_term), - last_datasync_ts_(last_datasync_ts), - scan_ts_(scan_ts), schema_version_(schema_version) { tx_number_ = txn; @@ -9096,44 +9090,27 @@ struct ScanDeltaSizeCcForHashPartition : public CcRequestBase return node_group_term_; } - uint64_t LastDataSyncTs() const - { - return last_datasync_ts_; - } - - uint64_t ScanTs() const - { - return scan_ts_; - } - void AbortCcRequest(CcErrorCode err_code) override { assert(err_code != CcErrorCode::NO_ERROR); SetError(err_code); } - TxKey &PausedKey() + void SetKeyCounts(const size_t data_key_count, const size_t dirty_key_count) { - return pause_key_; - } - - void UpdateKeyCount(const bool need_ckpt) - { - ++scanned_key_count_; - if (need_ckpt) - { - ++updated_key_count_; - } + data_key_count_ = data_key_count; + dirty_key_count_ = dirty_key_count; } size_t UpdatedMemory() const { - const uint64_t scanned = scanned_key_count_; - if (scanned == 0) + if (data_key_count_ == 0) return 0; + // Use cc map's data_key_count_ and dirty_key_count_ for estimation. // integer math with rounding up to avoid systematic underestimation return static_cast( - (updated_key_count_ * memory_usage_ + scanned - 1) / scanned); + (dirty_key_count_ * memory_usage_ + data_key_count_ - 1) / + data_key_count_); } void SetMemoryUsage(const uint64_t memory) @@ -9150,20 +9127,11 @@ struct ScanDeltaSizeCcForHashPartition : public CcRequestBase const TableName &table_name_; uint32_t node_group_id_; int64_t node_group_term_; - // It is used as a hint to decide if a page has dirty data since last round - // of checkpoint. It is guaranteed that all entries committed before this ts - // are synced into data store. - uint64_t last_datasync_ts_; - // Target ts. Collect all data changes committed before this ts into data - // sync vec. - uint64_t scan_ts_; - // Position that we left off during last round of scan. - TxKey pause_key_; uint64_t schema_version_; - // Number of keys scanned / updated, and per-core memory usage. - uint64_t scanned_key_count_{0}; - uint64_t updated_key_count_{0}; + // From cc map: total data keys and dirty keys needing checkpoint. + size_t data_key_count_{0}; + size_t dirty_key_count_{0}; uint64_t memory_usage_{0}; CcErrorCode err_{CcErrorCode::NO_ERROR}; diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index 961bee52..6d299392 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -2452,6 +2452,13 @@ class LocalCcShards uint32_t range_cnt, int32_t &out_next_partition_id); + /** + * @brief Map a data_sync_worker index to the fixed flush_data_worker index. + * Used when data_sync_worker count != flush_data_worker count so that each + * data_sync_worker consistently targets one flush_data_worker. + */ + size_t DataSyncWorkerToFlushDataWorker(size_t data_sync_worker_id) const; + /** * @brief Add a flush task entry to the flush task. If the there's no * pending flush task, create a new flush task and add the entry to it. @@ -2463,16 +2470,21 @@ class LocalCcShards WorkerThreadContext flush_data_worker_ctx_; - // The flush task that has not reached the max pending flush size. - // New flush task entry will be added to this buffer. This task will - // be appended to pending_flush_work_ when it reaches the max pending flush - // size, which will then be processed by flush data worker. - FlushDataTask cur_flush_buffer_; - // Flush task queue for flush data worker to process. - std::deque> pending_flush_work_; - - void FlushDataWorker(); - void FlushData(std::unique_lock &flush_worker_lk); + // Per-worker flush buffers. Each DataSyncWorker has its own buffer. + // New flush task entry will be added to the corresponding buffer. This task + // will be appended to pending_flush_work_[worker_idx] when it reaches the + // max pending flush size, which will then be processed by the corresponding + // flush data worker. Store as pointers because FlushDataTask contains a + // bthread::Mutex and is non-movable/non-copyable, which cannot be stored + // directly in a vector that may reallocate. + std::vector> cur_flush_buffers_; + // Per-worker flush task queues. Each FlushDataWorker processes its + // corresponding queue. + std::vector>> pending_flush_work_; + + void FlushDataWorker(size_t worker_idx); + void FlushData(std::unique_lock &flush_worker_lk, + size_t worker_idx); // Memory controller for data sync. DataSyncMemoryController data_sync_mem_controller_; diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 77fb5be1..b343dc9d 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -8404,132 +8404,11 @@ class TemplateCcMap : public CcMap return false; } - auto &paused_key = req.PausedKey(); - - auto deduce_iterator = [this](const KeyT &search_key) -> Iterator - { - Iterator it; - std::pair search_pair = - ForwardScanStart(search_key, true); - it = search_pair.first; - if (search_pair.second == ScanType::ScanGap) - { - ++it; - } - return it; - }; - - auto next_page_it = [this](Iterator &end_it) -> Iterator - { - Iterator it = end_it; - if (it != End()) - { - CcPage *ccp = - it.GetPage(); - assert(ccp != nullptr); - if (ccp->next_page_ == PagePosInf()) - { - it = End(); - } - else - { - it = Iterator(ccp->next_page_, 0, &neg_inf_); - } - } - return it; - }; - - Iterator key_it; - Iterator req_end_it; - - // The key iterator. - const KeyT *const search_start_key = - paused_key.GetKey() == nullptr ? KeyT::NegativeInfinity() - : paused_key.GetKey(); - key_it = deduce_iterator(*search_start_key); - - // The request end iterator - req_end_it = deduce_iterator(*KeyT::PositiveInfinity()); - - // Since we might skip the page that end_it is on if it's not updated - // since last ckpt, it might skip end_it. If the last page is skipped it - // will be set as the first entry on the next page. Also check if - // (key_it == end_next_page_it). - Iterator req_end_next_page_it = next_page_it(req_end_it); - - // The current slice end iterator - Iterator slice_end_it = req_end_it; - Iterator slice_end_next_page_it = req_end_next_page_it; - - // ScanSliceDeltaSizeCcForRangePartition is running on TxProcessor - // thread. To avoid blocking other transaction for a long time, we only - // process ScanBatchSize number of keys in each round. - for (size_t scan_cnt = 0; - scan_cnt < ScanDeltaSizeCcForHashPartition::ScanBatchSize && - key_it != req_end_it && key_it != req_end_next_page_it; - ++scan_cnt) - { - CcEntry *cce = - key_it->second; - CcPage *ccp = - key_it.GetPage(); - assert(ccp); - - if (ccp->last_dirty_commit_ts_ <= req.LastDataSyncTs()) - { - // Skip the pages that have no updates since last data sync. - if (ccp->next_page_ == PagePosInf()) - { - key_it = End(); - } - else - { - key_it = Iterator(ccp->next_page_, 0, &neg_inf_); - } - - // Check the slice iterator. - if (key_it == slice_end_it || key_it == slice_end_next_page_it) - { - // Reset the slice end iterator - slice_end_it = req_end_it; - slice_end_next_page_it = next_page_it(slice_end_it); - } - continue; - } - - const uint64_t commit_ts = cce->CommitTs(); - - // The commit_ts <= 1 means the key is non-existed or a new inserted - // key that the tx has not finished post-processing. - req.UpdateKeyCount(cce->NeedCkpt() && commit_ts > 1 && - commit_ts <= req.ScanTs()); - - // Forward key iterator - ++key_it; - - if (key_it == slice_end_it) - { - // Update the end it. - slice_end_it = req_end_it; - slice_end_next_page_it = next_page_it(slice_end_it); - } - } - - if (key_it == req_end_it || key_it == req_end_next_page_it) - { - int64_t allocated, committed; - mi_thread_stats(&allocated, &committed); - req.SetMemoryUsage(allocated); - req.SetFinish(); - } - else - { - paused_key = key_it->first->CloneTxKey(); - shard_->EnqueueLowPriorityCcRequest(&req); - } - - // Access ScanSliceDeltaSizeCcForRangePartition member variable is - // unsafe after SetFinished(). + req.SetKeyCounts(data_key_count_, dirty_data_key_count_); + int64_t allocated, committed; + mi_thread_stats(&allocated, &committed); + req.SetMemoryUsage(allocated); + req.SetFinish(); return false; } @@ -8666,10 +8545,35 @@ class TemplateCcMap : public CcMap return true; } + void AdjustDataKeyStats(int64_t size_delta, int64_t dirty_delta) + { + if (table_name_.IsMeta()) + { + return; + } + + if (size_delta != 0) + { + assert(size_delta >= 0 || + data_key_count_ >= static_cast(-size_delta)); + data_key_count_ = static_cast( + static_cast(data_key_count_) + size_delta); + } + + if (dirty_delta != 0) + { + assert(dirty_delta >= 0 || + dirty_data_key_count_ >= static_cast(-dirty_delta)); + dirty_data_key_count_ = static_cast( + static_cast(dirty_data_key_count_) + dirty_delta); + } + } + void OnEntryFlushed(bool was_dirty, bool is_persistent) override { if (was_dirty && is_persistent) { + AdjustDataKeyStats(0, -1); shard_->AdjustDataKeyStats(table_name_, 0, -1); } } @@ -8689,6 +8593,7 @@ class TemplateCcMap : public CcMap page_it = ccmp_.find(ccpage->FirstKey()); } const bool was_dirty = cce->IsDirty(); + AdjustDataKeyStats(-1, was_dirty ? -1 : 0); shard_->AdjustDataKeyStats(table_name_, -1, was_dirty ? -1 : 0); ccpage->Remove(cce); @@ -8765,6 +8670,8 @@ class TemplateCcMap : public CcMap if (total_freed > 0 || dirty_freed > 0) { + AdjustDataKeyStats(-static_cast(total_freed), + -static_cast(dirty_freed)); shard_->AdjustDataKeyStats(table_name_, -static_cast(total_freed), -static_cast(dirty_freed)); @@ -8802,6 +8709,8 @@ class TemplateCcMap : public CcMap cce->ClearLocks(*shard_, cc_ng_id_); } + AdjustDataKeyStats(-static_cast(page->Size()), + -static_cast(dirty_freed)); shard_->AdjustDataKeyStats(table_name_, -static_cast(page->Size()), -static_cast(dirty_freed)); @@ -9191,6 +9100,7 @@ class TemplateCcMap : public CcMap { if (!was_dirty && cce->IsDirty()) { + AdjustDataKeyStats(0, +1); shard_->AdjustDataKeyStats(table_name_, 0, +1); } } @@ -10062,6 +9972,7 @@ class TemplateCcMap : public CcMap { normal_obj_sz_ += normal_rec_change; } + AdjustDataKeyStats(+(end_idx - first_index), 0); shard_->AdjustDataKeyStats( table_name_, +(end_idx - first_index), 0); @@ -10100,6 +10011,7 @@ class TemplateCcMap : public CcMap assert(new_key_cnt > 0); // Update CCMap size + AdjustDataKeyStats(+new_key_cnt, 0); shard_->AdjustDataKeyStats(table_name_, +new_key_cnt, 0); } else @@ -10193,6 +10105,7 @@ class TemplateCcMap : public CcMap assert(new_key_cnt > 0); // Update Ccmap size + AdjustDataKeyStats(+new_key_cnt, 0); shard_->AdjustDataKeyStats(table_name_, +new_key_cnt, 0); } @@ -10432,6 +10345,7 @@ class TemplateCcMap : public CcMap // update lru list shard_->UpdateLruList(target_page, true); + AdjustDataKeyStats(+1, 0); shard_->AdjustDataKeyStats(table_name_, +1, 0); return Iterator(target_page, idx_in_page, &neg_inf_); @@ -11608,6 +11522,9 @@ class TemplateCcMap : public CcMap { normal_obj_sz_ -= clean_guard->CleanObjectCount(); } + AdjustDataKeyStats( + -static_cast(clean_guard->FreedCount()), + -static_cast(clean_guard->DirtyFreedCount())); shard_->AdjustDataKeyStats( table_name_, -static_cast(clean_guard->FreedCount()), @@ -11926,6 +11843,8 @@ class TemplateCcMap : public CcMap TemplateCcMapSamplePool *sample_pool_; size_t normal_obj_sz_{ 0}; // The count of all normal status objects, only used for redis + size_t data_key_count_{0}; + size_t dirty_data_key_count_{0}; }; template diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 810b5607..6530fbf6 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -140,10 +140,8 @@ LocalCcShards::LocalCcShards( flush_data_worker_ctx_( std::min(static_cast(conf.at("core_num")), 10)), #endif - - cur_flush_buffer_( - static_cast(MB(conf.at("node_memory_limit_mb")) * - (FLAGS_ckpt_buffer_ratio - 0.025))), + // cur_flush_buffers_ will be resized in StartBackgroudWorkers() + // when worker_num_ is determined data_sync_mem_controller_(static_cast( MB(conf.at("node_memory_limit_mb")) * FLAGS_ckpt_buffer_ratio)), statistics_worker_ctx_(1), @@ -291,11 +289,28 @@ LocalCcShards::~LocalCcShards() void LocalCcShards::StartBackgroudWorkers() { + // Ensure FlushDataWorker count matches DataSyncWorker count + // Note: worker_num_ is const, so we can't modify it. We use + // data_sync_worker_ctx_.worker_num_ to initialize the vectors and assert + // they match. + // Initialize per-worker data structures + const size_t worker_num = flush_data_worker_ctx_.worker_num_; + // Calculate buffer size + const uint64_t buffer_size = + data_sync_mem_controller_.FlushMemoryQuota() / (worker_num); + cur_flush_buffers_.clear(); + for (size_t i = 0; i < worker_num; ++i) + { + cur_flush_buffers_.emplace_back( + std::make_unique(buffer_size)); + } + pending_flush_work_.resize(worker_num); + // Starts flush worker threads firstly. for (int id = 0; id < flush_data_worker_ctx_.worker_num_; id++) { std::thread &thd = flush_data_worker_ctx_.worker_thd_.emplace_back( - [this] { FlushDataWorker(); }); + [this, id] { FlushDataWorker(id); }); std::string thread_name = "flush_data_" + std::to_string(id); pthread_setname_np(thd.native_handle(), thread_name.c_str()); } @@ -4782,16 +4797,58 @@ void LocalCcShards::DataSyncForHashPartition( assert(table_name.Type() == TableType::Primary); + ScanDeltaSizeCcForHashPartition scan_delta_size_cc( + table_name, + ng_id, + ng_term, + data_sync_txm->TxNumber(), + catalog_rec.Schema()->Version()); + + EnqueueToCcShard(worker_idx, &scan_delta_size_cc); + scan_delta_size_cc.Wait(); + + if (scan_delta_size_cc.IsError()) + { + LOG(ERROR) << "DataSync scan slice delta size failed on table " + << table_name.StringView() << " with error code: " + << static_cast(scan_delta_size_cc.ErrorCode()); + + AbortTx(data_sync_txm); + std::lock_guard task_worker_lk(data_sync_worker_ctx_.mux_); + data_sync_task_queue_[worker_idx].emplace_front(data_sync_task); + data_sync_worker_ctx_.cv_.notify_one(); + return; + } + auto core_number = cc_shards_.size(); - auto approximate_partition_number_this_node = std::max( - 1U, total_hash_partitions / Sharder::Instance().NodeGroupCount()); - auto approximate_partition_number_this_core = - std::max(1UL, approximate_partition_number_this_node / core_number); - const size_t flush_buffer_size = cur_flush_buffer_.GetFlushBufferSize(); - constexpr size_t default_data_file_size = 8ULL * 1024 * 1024; - const size_t scan_concurrency = - flush_buffer_size / - (default_data_file_size * approximate_partition_number_this_core); + const size_t partition_number = total_hash_partitions; + auto partition_number_this_core = + partition_number / core_number + + (worker_idx < partition_number % core_number); + std::vector partition_ids; + partition_ids.reserve(partition_number_this_core); + for (size_t i = 0; i < partition_number_this_core; ++i) + { + partition_ids.emplace_back(worker_idx + core_number * i); + } + assert(partition_number_this_core == partition_ids.size()); + const auto updated_memory = scan_delta_size_cc.UpdatedMemory(); + auto updated_memory_per_partition = + partition_number_this_core ? updated_memory / partition_number_this_core + : 0; + const size_t flush_buffer_size = + cur_flush_buffers_[DataSyncWorkerToFlushDataWorker(static_cast( + data_sync_task->id_ % + data_sync_worker_ctx_.worker_num_))] + ->GetFlushBufferSize(); + + const size_t partition_number_per_scan = + std::max(1UL, + updated_memory_per_partition != 0 + ? (flush_buffer_size / updated_memory_per_partition) + : partition_number_this_core); + const size_t scan_concurrency = core_number; + if (scan_concurrency > 0) { bool need_notify = scan_concurrency > @@ -4811,431 +4868,416 @@ void LocalCcShards::DataSyncForHashPartition( data_sync_task->flight_task_cnt_ += 1; } - HashPartitionDataSyncScanCc scan_cc(table_name, - data_sync_task->data_sync_ts_, - ng_id, - ng_term, - DATA_SYNC_SCAN_BATCH_SIZE, - data_sync_txm->TxNumber(), - data_sync_task->forward_cache_, - last_sync_ts, - data_sync_task->filter_lambda_, - catalog_rec.Schema()->Version()); - bool scan_data_drained = false; - - auto data_sync_vec = std::make_unique>(); - auto archive_vec = std::make_unique>(); - auto mv_base_vec = - std::make_unique>>(); - uint64_t vec_mem_usage = 0; - - // Note: `DataSyncScanCc` needs to ensure that no two ckpt_rec with the - // same Key can be generated. Our subsequent algorithms are based on - // this assumption. - - assert(worker_idx < cc_shards_.size()); + for (size_t i = 0; i < partition_number_this_core; + i += partition_number_per_scan) + { + size_t min_partition_id_this_scan = partition_ids[i]; + size_t max_partition_id_this_scan = + partition_ids[std::min(i + partition_number_per_scan, + partition_number_this_core) - + 1]; + std::function filter_lambda = + [min_partition_id_this_scan, + max_partition_id_this_scan, + &filter_func = + data_sync_task->filter_lambda_](const size_t hash_code) + { + return (hash_code % total_hash_partitions) >= + min_partition_id_this_scan && + (hash_code % total_hash_partitions) <= + max_partition_id_this_scan && + (!filter_func || filter_func(hash_code)); + }; - while (!scan_data_drained) - { - EnqueueLowPriorityCcRequestToShard(worker_idx, &scan_cc); - scan_cc.Wait(); + HashPartitionDataSyncScanCc scan_cc(table_name, + data_sync_task->data_sync_ts_, + ng_id, + ng_term, + DATA_SYNC_SCAN_BATCH_SIZE, + data_sync_txm->TxNumber(), + data_sync_task->forward_cache_, + last_sync_ts, + filter_lambda, + catalog_rec.Schema()->Version()); + bool scan_data_drained = false; + + auto data_sync_vec = std::make_unique>(); + auto archive_vec = std::make_unique>(); + auto mv_base_vec = + std::make_unique>>(); + uint64_t vec_mem_usage = 0; - if (scan_cc.IsError() && - scan_cc.ErrorCode() != CcErrorCode::LOG_NOT_TRUNCATABLE) - { - LOG(ERROR) << "DataSync scan failed on table " - << table_name.StringView() << " with error code: " - << static_cast(scan_cc.ErrorCode()); + // Note: `DataSyncScanCc` needs to ensure that no two ckpt_rec with the + // same Key can be generated. Our subsequent algorithms are based on + // this assumption. - PostProcessHashPartitionDataSyncTask( - std::move(data_sync_task), - data_sync_txm, - DataSyncTask::CkptErrorCode::SCAN_ERROR); + assert(worker_idx < cc_shards_.size()); - return; - } - if (scan_cc.ErrorCode() == CcErrorCode::LOG_NOT_TRUNCATABLE) + while (!scan_data_drained) { - data_sync_task->status_->SetEntriesSkippedAndNoTruncateLog(); - } - - scan_data_drained = true; + EnqueueLowPriorityCcRequestToShard(worker_idx, &scan_cc); + scan_cc.Wait(); - // Send cache to target node group if needed. - if (data_sync_task->forward_cache_) - { - std::shared_lock meta_lk(meta_data_mux_); - const auto bucket_infos = GetAllBucketInfosNoLocking(ng_id); - if (bucket_infos == nullptr) + if (scan_cc.IsError() && + scan_cc.ErrorCode() != CcErrorCode::LOG_NOT_TRUNCATABLE) { - // no longer node group owner, abort the task - LOG(ERROR) << "DataSync: Failed to get bucket infos for " - "ng#" - << ng_id; + LOG(ERROR) << "DataSync scan failed on table " + << table_name.StringView() << " with error code: " + << static_cast(scan_cc.ErrorCode()); + PostProcessHashPartitionDataSyncTask( std::move(data_sync_task), data_sync_txm, DataSyncTask::CkptErrorCode::SCAN_ERROR); + return; } + if (scan_cc.ErrorCode() == CcErrorCode::LOG_NOT_TRUNCATABLE) + { + data_sync_task->status_->SetEntriesSkippedAndNoTruncateLog(); + } + + scan_data_drained = true; - std::unordered_map - send_cache_closures; - for (size_t idx = 0; idx < scan_cc.accumulated_scan_cnt_; idx++) + // Send cache to target node group if needed. + if (data_sync_task->forward_cache_) { - FlushRecord &ref = scan_cc.DataSyncVec()[idx]; - uint16_t bucket_id = - Sharder::MapKeyHashToBucketId(ref.Key().Hash()); - NodeGroupId dest_ng = - bucket_infos->at(bucket_id)->DirtyBucketOwner(); - assert(dest_ng != UINT32_MAX); + std::shared_lock meta_lk(meta_data_mux_); + const auto bucket_infos = GetAllBucketInfosNoLocking(ng_id); + if (bucket_infos == nullptr) + { + // no longer node group owner, abort the task + LOG(ERROR) << "DataSync: Failed to get bucket infos for " + "ng#" + << ng_id; + PostProcessHashPartitionDataSyncTask( + std::move(data_sync_task), + data_sync_txm, + DataSyncTask::CkptErrorCode::SCAN_ERROR); + return; + } - // Put the record into the request for this node group. - auto ins_res = send_cache_closures.try_emplace(dest_ng); - remote::UploadBatchRequest *req_ptr = nullptr; - if (ins_res.second) + std::unordered_map + send_cache_closures; + for (size_t idx = 0; idx < scan_cc.accumulated_scan_cnt_; idx++) { - uint32_t node_id = - Sharder::Instance().LeaderNodeId(dest_ng); - std::shared_ptr channel = - Sharder::Instance().GetCcNodeServiceChannel(node_id); - if (channel == nullptr) + FlushRecord &ref = scan_cc.DataSyncVec()[idx]; + uint16_t bucket_id = + Sharder::MapKeyHashToBucketId(ref.Key().Hash()); + NodeGroupId dest_ng = + bucket_infos->at(bucket_id)->DirtyBucketOwner(); + assert(dest_ng != UINT32_MAX); + + // Put the record into the request for this node group. + auto ins_res = send_cache_closures.try_emplace(dest_ng); + remote::UploadBatchRequest *req_ptr = nullptr; + if (ins_res.second) { - // Fail to establish the channel to the tx node. - // Just skip the cache sending since it is a best - // effort try to performance improvement. - LOG(ERROR) << "UploadBatch: Failed to init the " - "channel of ng#" - << dest_ng; - send_cache_closures.erase(ins_res.first); + uint32_t node_id = + Sharder::Instance().LeaderNodeId(dest_ng); + std::shared_ptr channel = + Sharder::Instance().GetCcNodeServiceChannel( + node_id); + if (channel == nullptr) + { + // Fail to establish the channel to the tx node. + // Just skip the cache sending since it is a best + // effort try to performance improvement. + LOG(ERROR) << "UploadBatch: Failed to init the " + "channel of ng#" + << dest_ng; + send_cache_closures.erase(ins_res.first); + } + else + { + // Create a closure for the first time. + UploadBatchClosure *upload_batch_closure = + new UploadBatchClosure( + [this, data_sync_task, data_sync_txm]( + CcErrorCode res_code, + int32_t dest_ng_term) + { + // We don't care if + // the cache send + // was succeed or + // not since it's a + // best effort + // move. Just pass in no error + // so that it won't cause data sync + // failure. + PostProcessHashPartitionDataSyncTask( + std::move(data_sync_task), + data_sync_txm, + DataSyncTask::CkptErrorCode:: + NO_ERROR); + }, + 10000, + false); + + upload_batch_closure->SetChannel(node_id, channel); + + ins_res.first->second = upload_batch_closure; + req_ptr = + upload_batch_closure->UploadBatchRequest(); + req_ptr->set_node_group_id(dest_ng); + req_ptr->set_node_group_term(-1); + req_ptr->set_table_name_str(table_name.String()); + req_ptr->set_table_type( + remote::ToRemoteType::ConvertTableType( + table_name.Type())); + req_ptr->set_table_engine( + remote::ToRemoteType::ConvertTableEngine( + table_name.Engine())); + req_ptr->set_kind( + remote::UploadBatchKind::DIRTY_BUCKET_DATA); + req_ptr->set_batch_size(0); + // keys + req_ptr->clear_keys(); + // records + req_ptr->clear_records(); + // commit_ts + req_ptr->clear_commit_ts(); + // rec_status + req_ptr->clear_rec_status(); + } } else { - // Create a closure for the first time. - UploadBatchClosure *upload_batch_closure = - new UploadBatchClosure( - [this, data_sync_task, data_sync_txm]( - CcErrorCode res_code, int32_t dest_ng_term) - { - // We don't care if - // the cache send - // was succeed or - // not since it's a - // best effort - // move. Just pass in no error - // so that it won't cause data sync - // failure. - PostProcessHashPartitionDataSyncTask( - std::move(data_sync_task), - data_sync_txm, - DataSyncTask::CkptErrorCode::NO_ERROR); - }, - 10000, - false); - - upload_batch_closure->SetChannel(node_id, channel); - - ins_res.first->second = upload_batch_closure; - req_ptr = upload_batch_closure->UploadBatchRequest(); - req_ptr->set_node_group_id(dest_ng); - req_ptr->set_node_group_term(-1); - req_ptr->set_table_name_str(table_name.String()); - req_ptr->set_table_type( - remote::ToRemoteType::ConvertTableType( - table_name.Type())); - req_ptr->set_table_engine( - remote::ToRemoteType::ConvertTableEngine( - table_name.Engine())); - req_ptr->set_kind( - remote::UploadBatchKind::DIRTY_BUCKET_DATA); - req_ptr->set_batch_size(0); - // keys - req_ptr->clear_keys(); - // records - req_ptr->clear_records(); - // commit_ts - req_ptr->clear_commit_ts(); - // rec_status - req_ptr->clear_rec_status(); + req_ptr = ins_res.first->second->UploadBatchRequest(); + } + + if (req_ptr) + { + std::string *keys_str = req_ptr->mutable_keys(); + std::string *rec_status_str = + req_ptr->mutable_rec_status(); + std::string *commit_ts_str = + req_ptr->mutable_commit_ts(); + size_t len_sizeof = sizeof(uint64_t); + const char *val_ptr = nullptr; + ref.Key().Serialize(*keys_str); + if (ref.payload_status_ == RecordStatus::Normal) + { + std::string *recs_str = req_ptr->mutable_records(); + ref.Payload()->Serialize(*recs_str); + } + const char *status_ptr = reinterpret_cast( + &(ref.payload_status_)); + rec_status_str->append(status_ptr, + sizeof(RecordStatus)); + val_ptr = + reinterpret_cast(&(ref.commit_ts_)); + commit_ts_str->append(val_ptr, len_sizeof); + req_ptr->set_batch_size(req_ptr->batch_size() + 1); } } - else + meta_lk.unlock(); + { - req_ptr = ins_res.first->second->UploadBatchRequest(); + std::unique_lock flight_lk( + data_sync_task->flight_task_mux_); + data_sync_task->flight_task_cnt_ += + send_cache_closures.size(); } - - if (req_ptr) + // Send cache to target node groups. + for (auto &[ng, upload_batch_closure] : send_cache_closures) { - std::string *keys_str = req_ptr->mutable_keys(); - std::string *rec_status_str = req_ptr->mutable_rec_status(); - std::string *commit_ts_str = req_ptr->mutable_commit_ts(); - size_t len_sizeof = sizeof(uint64_t); - const char *val_ptr = nullptr; - ref.Key().Serialize(*keys_str); - if (ref.payload_status_ == RecordStatus::Normal) - { - std::string *recs_str = req_ptr->mutable_records(); - ref.Payload()->Serialize(*recs_str); - } - const char *status_ptr = - reinterpret_cast(&(ref.payload_status_)); - rec_status_str->append(status_ptr, sizeof(RecordStatus)); - val_ptr = reinterpret_cast(&(ref.commit_ts_)); - commit_ts_str->append(val_ptr, len_sizeof); - req_ptr->set_batch_size(req_ptr->batch_size() + 1); + remote::CcRpcService_Stub stub( + upload_batch_closure->Channel()); + brpc::Controller *cntl_ptr = + upload_batch_closure->Controller(); + cntl_ptr->set_timeout_ms(10000); + // Asynchronous mode + stub.UploadBatch( + upload_batch_closure->Controller(), + upload_batch_closure->UploadBatchRequest(), + upload_batch_closure->UploadBatchResponse(), + upload_batch_closure); } } - meta_lk.unlock(); + uint64_t flush_data_size = scan_cc.accumulated_flush_data_size_; + + // nothing to flush + if (scan_cc.accumulated_scan_cnt_ == 0) { - std::unique_lock flight_lk( - data_sync_task->flight_task_mux_); - data_sync_task->flight_task_cnt_ += send_cache_closures.size(); - } - // Send cache to target node groups. - for (auto &[ng, upload_batch_closure] : send_cache_closures) - { - remote::CcRpcService_Stub stub(upload_batch_closure->Channel()); - brpc::Controller *cntl_ptr = upload_batch_closure->Controller(); - cntl_ptr->set_timeout_ms(10000); - // Asynchronous mode - stub.UploadBatch(upload_batch_closure->Controller(), - upload_batch_closure->UploadBatchRequest(), - upload_batch_closure->UploadBatchResponse(), - upload_batch_closure); + scan_cc.Reset(); + continue; } - } - - uint64_t flush_data_size = scan_cc.accumulated_flush_data_size_; - - // nothing to flush - if (scan_cc.accumulated_scan_cnt_ == 0) - { - scan_cc.Reset(); - continue; - } - // The cost of FlushRecord also needs to be considered. + // The cost of FlushRecord also needs to be considered. #ifdef WITH_JEMALLOC - flush_data_size += (scan_cc.DataSyncVec().size() * sizeof(FlushRecord) + - scan_cc.ArchiveVec().size() * sizeof(FlushRecord) + - scan_cc.MoveBaseIdxVec().size() * - sizeof(std::pair)); + flush_data_size += + (scan_cc.DataSyncVec().size() * sizeof(FlushRecord) + + scan_cc.ArchiveVec().size() * sizeof(FlushRecord) + + scan_cc.MoveBaseIdxVec().size() * + sizeof(std::pair)); #else - // Check if vectors are empty before calling malloc_usable_size - // to avoid SEGV on nullptr or invalid pointers. - // Use malloc_usable_size when ASan is enabled (vectors may be - // allocated by ASan's allocator), otherwise use - // mi_malloc_usable_size for mimalloc-allocated memory. - { - auto &data_sync_vec_ref = scan_cc.DataSyncVec(); - auto &archive_vec_ref = scan_cc.ArchiveVec(); - auto &move_base_idx_vec_ref = scan_cc.MoveBaseIdxVec(); + // Check if vectors are empty before calling malloc_usable_size + // to avoid SEGV on nullptr or invalid pointers. + // Use malloc_usable_size when ASan is enabled (vectors may be + // allocated by ASan's allocator), otherwise use + // mi_malloc_usable_size for mimalloc-allocated memory. + { + auto &data_sync_vec_ref = scan_cc.DataSyncVec(); + auto &archive_vec_ref = scan_cc.ArchiveVec(); + auto &move_base_idx_vec_ref = scan_cc.MoveBaseIdxVec(); #ifdef __SANITIZE_ADDRESS__ - // When ASan is enabled, use standard malloc_usable_size - flush_data_size += - (data_sync_vec_ref.empty() - ? 0 - : malloc_usable_size(data_sync_vec_ref.data())) + - (archive_vec_ref.empty() - ? 0 - : malloc_usable_size(archive_vec_ref.data())) + - (move_base_idx_vec_ref.empty() - ? 0 - : malloc_usable_size(move_base_idx_vec_ref.data())); + // When ASan is enabled, use standard malloc_usable_size + flush_data_size += + (data_sync_vec_ref.empty() + ? 0 + : malloc_usable_size(data_sync_vec_ref.data())) + + (archive_vec_ref.empty() + ? 0 + : malloc_usable_size(archive_vec_ref.data())) + + (move_base_idx_vec_ref.empty() + ? 0 + : malloc_usable_size(move_base_idx_vec_ref.data())); #else - // When ASan is not enabled, use mimalloc's API - flush_data_size += - (data_sync_vec_ref.empty() - ? 0 - : mi_malloc_usable_size(data_sync_vec_ref.data())) + - (archive_vec_ref.empty() - ? 0 - : mi_malloc_usable_size(archive_vec_ref.data())) + - (move_base_idx_vec_ref.empty() - ? 0 - : mi_malloc_usable_size(move_base_idx_vec_ref.data())); + // When ASan is not enabled, use mimalloc's API + flush_data_size += + (data_sync_vec_ref.empty() + ? 0 + : mi_malloc_usable_size(data_sync_vec_ref.data())) + + (archive_vec_ref.empty() + ? 0 + : mi_malloc_usable_size(archive_vec_ref.data())) + + (move_base_idx_vec_ref.empty() + ? 0 + : mi_malloc_usable_size(move_base_idx_vec_ref.data())); #endif - } + } #endif - // this thread will wait in AllocatePendingFlushDataMemQuota if - // quota is not available - uint64_t old_usage = - data_sync_mem_controller_.AllocateFlushDataMemQuota( - flush_data_size); - DLOG(INFO) << "AllocateFlushDataMemQuota old_usage: " << old_usage - << " new_usage: " << old_usage + flush_data_size - << " quota: " << data_sync_mem_controller_.FlushMemoryQuota() - << " flight_tasks: " << data_sync_task->flight_task_cnt_ - << " record count: " << scan_cc.accumulated_scan_cnt_; - - std::unique_lock heap_lk(hash_partition_ckpt_heap_mux_); - mi_threadid_t prev_thd = - mi_override_thread(hash_partition_main_thread_id_); - mi_heap_t *prev_heap = mi_heap_set_default(hash_partition_ckpt_heap_); + uint64_t old_usage = + data_sync_mem_controller_.AllocateFlushDataMemQuota( + flush_data_size); + DLOG(INFO) << "AllocateFlushDataMemQuota old_usage: " << old_usage + << " new_usage: " << old_usage + flush_data_size + << " quota: " + << data_sync_mem_controller_.FlushMemoryQuota() + << " flight_tasks: " << data_sync_task->flight_task_cnt_ + << " record count: " << scan_cc.accumulated_scan_cnt_; + + std::unique_lock heap_lk(hash_partition_ckpt_heap_mux_); + mi_threadid_t prev_thd = + mi_override_thread(hash_partition_main_thread_id_); + mi_heap_t *prev_heap = + mi_heap_set_default(hash_partition_ckpt_heap_); #if defined(WITH_JEMALLOC) - uint32_t prev_arena; - JemallocArenaSwitcher::ReadCurrentArena(prev_arena); - JemallocArenaSwitcher::SwitchToArena(hash_partition_ckpt_arena_id_); + uint32_t prev_arena; + JemallocArenaSwitcher::ReadCurrentArena(prev_arena); + JemallocArenaSwitcher::SwitchToArena(hash_partition_ckpt_arena_id_); #endif - data_sync_vec->reserve(scan_cc.accumulated_scan_cnt_); - for (size_t j = 0; j < scan_cc.accumulated_scan_cnt_; ++j) - { - auto &rec = scan_cc.DataSyncVec()[j]; - // Note. Clone key instead of move key. The memory of - // rec.Key() will be reused to avoid memory allocation. - if (rec.cce_) + data_sync_vec->reserve(scan_cc.accumulated_scan_cnt_); + for (size_t j = 0; j < scan_cc.accumulated_scan_cnt_; ++j) { - // cce_ is null means the key is already persisted on - // kv, so we don't need to put it into the flush vec. - int32_t part_id = - Sharder::MapKeyHashToHashPartitionId(rec.Key().Hash()); - if (table_name.Engine() == TableEngine::EloqKv) + auto &rec = scan_cc.DataSyncVec()[j]; + // Note. Clone key instead of move key. The memory of + // rec.Key() will be reused to avoid memory allocation. + if (rec.cce_) { - data_sync_vec->emplace_back(rec.Key().Clone(), - rec.GetNonVersionedPayload(), - rec.payload_status_, - rec.commit_ts_, - rec.cce_, - rec.post_flush_size_, - part_id); - } - else - { - data_sync_vec->emplace_back(rec.Key().Clone(), - rec.ReleaseVersionedPayload(), - rec.payload_status_, - rec.commit_ts_, - rec.cce_, - rec.post_flush_size_, - part_id); + // cce_ is null means the key is already persisted on + // kv, so we don't need to put it into the flush vec. + int32_t part_id = + Sharder::MapKeyHashToHashPartitionId(rec.Key().Hash()); + if (table_name.Engine() == TableEngine::EloqKv) + { + data_sync_vec->emplace_back( + rec.Key().Clone(), + rec.GetNonVersionedPayload(), + rec.payload_status_, + rec.commit_ts_, + rec.cce_, + rec.post_flush_size_, + part_id); + } + else + { + data_sync_vec->emplace_back( + rec.Key().Clone(), + rec.ReleaseVersionedPayload(), + rec.payload_status_, + rec.commit_ts_, + rec.cce_, + rec.post_flush_size_, + part_id); + } } } - } - for (size_t j = 0; j < scan_cc.ArchiveVec().size(); ++j) - { - auto &rec = scan_cc.ArchiveVec()[j]; - // Note. We need to ensure the copy constructor of - // FlushRecord could not be called. - rec.SetKey((*data_sync_vec)[rec.GetKeyIndex()].Key()); - } + for (size_t j = 0; j < scan_cc.ArchiveVec().size(); ++j) + { + auto &rec = scan_cc.ArchiveVec()[j]; + // Note. We need to ensure the copy constructor of + // FlushRecord could not be called. + rec.SetKey((*data_sync_vec)[rec.GetKeyIndex()].Key()); + } - for (size_t j = 0; j < scan_cc.MoveBaseIdxVec().size(); ++j) - { - size_t key_idx = scan_cc.MoveBaseIdxVec()[j]; - TxKey key_raw = (*data_sync_vec)[key_idx].Key(); - int32_t part_id = - Sharder::MapKeyHashToHashPartitionId(key_raw.Hash()); - mv_base_vec->emplace_back(std::move(key_raw), part_id); - } - mi_override_thread(prev_thd); - mi_heap_set_default(prev_heap); + for (size_t j = 0; j < scan_cc.MoveBaseIdxVec().size(); ++j) + { + size_t key_idx = scan_cc.MoveBaseIdxVec()[j]; + TxKey key_raw = (*data_sync_vec)[key_idx].Key(); + int32_t part_id = + Sharder::MapKeyHashToHashPartitionId(key_raw.Hash()); + mv_base_vec->emplace_back(std::move(key_raw), part_id); + } + mi_override_thread(prev_thd); + mi_heap_set_default(prev_heap); #if defined(WITH_JEMALLOC) - // override arena id - JemallocArenaSwitcher::SwitchToArena(prev_arena); + // override arena id + JemallocArenaSwitcher::SwitchToArena(prev_arena); #endif - heap_lk.unlock(); + heap_lk.unlock(); - std::move(scan_cc.ArchiveVec().begin(), - scan_cc.ArchiveVec().end(), - std::back_inserter(*archive_vec)); + std::move(scan_cc.ArchiveVec().begin(), + scan_cc.ArchiveVec().end(), + std::back_inserter(*archive_vec)); - vec_mem_usage += flush_data_size; + vec_mem_usage += flush_data_size; - scan_data_drained = scan_cc.IsDrained() && scan_data_drained; + scan_data_drained = scan_cc.IsDrained() && scan_data_drained; - { - std::unique_lock flight_task_lk( - data_sync_task->flight_task_mux_); - if (data_sync_task->ckpt_err_ == - DataSyncTask::CkptErrorCode::FLUSH_ERROR) { - flight_task_lk.unlock(); - - LOG(WARNING) - << "There are error during flush for this data sync: " - << data_sync_txm->TxNumber() << " on worker#" << worker_idx - << ". Terminal this datasync task."; - // 1. Release read intent on paused key - if (!scan_cc.IsDrained()) + std::unique_lock flight_task_lk( + data_sync_task->flight_task_mux_); + if (data_sync_task->ckpt_err_ == + DataSyncTask::CkptErrorCode::FLUSH_ERROR) { - scan_cc.Reset( - HashPartitionDataSyncScanCc::OpType::Terminated); - EnqueueLowPriorityCcRequestToShard(worker_idx, &scan_cc); - scan_cc.Wait(); + flight_task_lk.unlock(); + + LOG(WARNING) + << "There are error during flush for this data sync: " + << data_sync_txm->TxNumber() << " on worker#" + << worker_idx << ". Terminal this datasync task."; + // 1. Release read intent on paused key + if (!scan_cc.IsDrained()) + { + scan_cc.Reset( + HashPartitionDataSyncScanCc::OpType::Terminated); + EnqueueLowPriorityCcRequestToShard(worker_idx, + &scan_cc); + scan_cc.Wait(); + } + // 2. Release memory usage on this datasync worker. + data_sync_vec->clear(); + archive_vec->clear(); + mv_base_vec->clear(); + data_sync_mem_controller_.DeallocateFlushMemQuota( + vec_mem_usage); + break; } - // 2. Release memory usage on this datasync worker. - data_sync_vec->clear(); - archive_vec->clear(); - mv_base_vec->clear(); - data_sync_mem_controller_.DeallocateFlushMemQuota( - vec_mem_usage); - break; - } - - // Flush worker will call PostProcessDataSyncTask() to - // decrement flight task count. - data_sync_task->flight_task_cnt_ += 1; - } - - AddFlushTaskEntry( - std::make_unique(std::move(data_sync_vec), - std::move(archive_vec), - std::move(mv_base_vec), - data_sync_txm, - data_sync_task, - catalog_rec.CopySchema(), - vec_mem_usage)); - - data_sync_vec = std::make_unique>(); - - archive_vec = std::make_unique>(); - mv_base_vec = - std::make_unique>>(); - - vec_mem_usage = 0; - - if (scan_cc.scan_heap_is_full_ == 1) - { - // Clear the FlushRecords' memory of scan cc since the - // DataSyncScan heap is full. - auto &data_sync_vec_ref = scan_cc.DataSyncVec(); - auto &archive_vec_ref = scan_cc.ArchiveVec(); - ReleaseDataSyncScanHeapCc release_scan_heap_cc(&data_sync_vec_ref, - &archive_vec_ref); - EnqueueLowPriorityCcRequestToShard(worker_idx, - &release_scan_heap_cc); - release_scan_heap_cc.Wait(); - } - - scan_cc.Reset(); - } - - // release scan heap memory after scan finish - auto &data_sync_vec_ref = scan_cc.DataSyncVec(); - auto &archive_vec_ref = scan_cc.ArchiveVec(); - ReleaseDataSyncScanHeapCc release_scan_heap_cc(&data_sync_vec_ref, - &archive_vec_ref); - EnqueueLowPriorityCcRequestToShard(worker_idx, &release_scan_heap_cc); - release_scan_heap_cc.Wait(); - - if (!data_sync_vec->empty() || !archive_vec->empty() || - !mv_base_vec->empty()) - { - std::unique_lock flight_task_lk( - data_sync_task->flight_task_mux_); - if (data_sync_task->ckpt_err_ == DataSyncTask::CkptErrorCode::NO_ERROR) - { - data_sync_task->flight_task_cnt_ += 1; - flight_task_lk.unlock(); + // Flush worker will call PostProcessDataSyncTask() to + // decrement flight task count. + data_sync_task->flight_task_cnt_ += 1; + } AddFlushTaskEntry( std::make_unique(std::move(data_sync_vec), @@ -5245,13 +5287,67 @@ void LocalCcShards::DataSyncForHashPartition( data_sync_task, catalog_rec.CopySchema(), vec_mem_usage)); + + data_sync_vec = std::make_unique>(); + + archive_vec = std::make_unique>(); + + mv_base_vec = + std::make_unique>>(); + + vec_mem_usage = 0; + + if (scan_cc.scan_heap_is_full_ == 1) + { + // Clear the FlushRecords' memory of scan cc since the + // DataSyncScan heap is full. + auto &data_sync_vec_ref = scan_cc.DataSyncVec(); + auto &archive_vec_ref = scan_cc.ArchiveVec(); + ReleaseDataSyncScanHeapCc release_scan_heap_cc( + &data_sync_vec_ref, &archive_vec_ref); + EnqueueLowPriorityCcRequestToShard(worker_idx, + &release_scan_heap_cc); + release_scan_heap_cc.Wait(); + } + + scan_cc.Reset(); } - else + + // release scan heap memory after scan finish + auto &data_sync_vec_ref = scan_cc.DataSyncVec(); + auto &archive_vec_ref = scan_cc.ArchiveVec(); + ReleaseDataSyncScanHeapCc release_scan_heap_cc(&data_sync_vec_ref, + &archive_vec_ref); + EnqueueLowPriorityCcRequestToShard(worker_idx, &release_scan_heap_cc); + release_scan_heap_cc.Wait(); + + if (!data_sync_vec->empty() || !archive_vec->empty() || + !mv_base_vec->empty()) { - // There are error during flush, and if we do not put the - // current batch data into flush worker, should release the - // memory usage. - data_sync_mem_controller_.DeallocateFlushMemQuota(vec_mem_usage); + std::unique_lock flight_task_lk( + data_sync_task->flight_task_mux_); + if (data_sync_task->ckpt_err_ == + DataSyncTask::CkptErrorCode::NO_ERROR) + { + data_sync_task->flight_task_cnt_ += 1; + flight_task_lk.unlock(); + AddFlushTaskEntry( + std::make_unique(std::move(data_sync_vec), + std::move(archive_vec), + std::move(mv_base_vec), + data_sync_txm, + data_sync_task, + catalog_rec.CopySchema(), + vec_mem_usage)); + } + else + { + // There are error during flush, and if we do not put the + // current batch data into flush worker, should release the + // memory usage. + data_sync_mem_controller_.DeallocateFlushMemQuota( + vec_mem_usage); + } } } @@ -5721,80 +5817,106 @@ void LocalCcShards::SplitFlushRange( txservice::CommitTx(split_txm); } +size_t LocalCcShards::DataSyncWorkerToFlushDataWorker( + size_t data_sync_worker_id) const +{ + return data_sync_worker_id % flush_data_worker_ctx_.worker_num_; +} + void LocalCcShards::AddFlushTaskEntry(std::unique_ptr &&entry) { - cur_flush_buffer_.AddFlushTaskEntry(std::move(entry)); + assert(cur_flush_buffers_.size() == + static_cast(flush_data_worker_ctx_.worker_num_)); + assert(pending_flush_work_.size() == + static_cast(flush_data_worker_ctx_.worker_num_)); + assert(entry->data_sync_task_ != nullptr); + + // Compute target buffer/queue: map data_sync task id to fixed flush worker + const auto &data_sync_task = entry->data_sync_task_; + size_t target = DataSyncWorkerToFlushDataWorker(static_cast( + data_sync_task->id_ % data_sync_worker_ctx_.worker_num_)); + + std::unique_lock worker_lk(flush_data_worker_ctx_.mux_); + auto &cur_flush_buffer = *cur_flush_buffers_[target]; + + cur_flush_buffer.AddFlushTaskEntry(std::move(entry)); std::unique_ptr flush_data_task = - cur_flush_buffer_.MoveFlushData(false); + cur_flush_buffer.MoveFlushData(false); if (flush_data_task != nullptr) { - std::unique_lock worker_lk(flush_data_worker_ctx_.mux_); + auto &pending_flush_work = pending_flush_work_[target]; // Try to merge with the last task if queue is not empty - if (!pending_flush_work_.empty()) + if (!pending_flush_work.empty()) { - auto &last_task = pending_flush_work_.back(); + auto &last_task = pending_flush_work.back(); if (last_task->MergeFrom(std::move(flush_data_task))) { // Merge successful, task was merged into last_task - flush_data_worker_ctx_.cv_.notify_one(); + flush_data_worker_ctx_.cv_.notify_all(); return; } } // Could not merge, wait if queue is full - while (pending_flush_work_.size() >= - static_cast(flush_data_worker_ctx_.worker_num_)) + while (pending_flush_work.size() >= 2) { flush_data_worker_ctx_.cv_.wait(worker_lk); } // Add as new task - pending_flush_work_.emplace_back(std::move(flush_data_task)); - flush_data_worker_ctx_.cv_.notify_one(); + pending_flush_work.emplace_back(std::move(flush_data_task)); + flush_data_worker_ctx_.cv_.notify_all(); } } void LocalCcShards::FlushCurrentFlushBuffer() { - std::unique_ptr flush_data_task = - cur_flush_buffer_.MoveFlushData(true); - if (flush_data_task != nullptr) + assert(cur_flush_buffers_.size() == flush_data_worker_ctx_.worker_num_); + assert(pending_flush_work_.size() == flush_data_worker_ctx_.worker_num_); + + std::unique_lock worker_lk(flush_data_worker_ctx_.mux_); + // Flush all workers' buffers + for (int worker_idx = 0; worker_idx < flush_data_worker_ctx_.worker_num_; + ++worker_idx) { - std::unique_lock worker_lk(flush_data_worker_ctx_.mux_); + auto &cur_flush_buffer = *cur_flush_buffers_[worker_idx]; + auto &pending_flush_work = pending_flush_work_[worker_idx]; - // Try to merge with the last task if queue is not empty - if (!pending_flush_work_.empty()) + std::unique_ptr flush_data_task = + cur_flush_buffer.MoveFlushData(true); + if (flush_data_task != nullptr) { - auto &last_task = pending_flush_work_.back(); - if (last_task->MergeFrom(std::move(flush_data_task))) + // Try to merge with the last task if queue is not empty + if (!pending_flush_work.empty()) { - // Merge successful, task was merged into last_task - flush_data_worker_ctx_.cv_.notify_one(); - return; + auto &last_task = pending_flush_work.back(); + if (last_task->MergeFrom(std::move(flush_data_task))) + { + // Merge successful, task was merged into last_task + flush_data_worker_ctx_.cv_.notify_all(); + continue; + } } - } - // Could not merge, wait if queue is full - while (pending_flush_work_.size() >= - static_cast(flush_data_worker_ctx_.worker_num_)) - { - flush_data_worker_ctx_.cv_.wait(worker_lk); + // Add as new task + pending_flush_work.emplace_back(std::move(flush_data_task)); + flush_data_worker_ctx_.cv_.notify_all(); } - - // Add as new task - pending_flush_work_.emplace_back(std::move(flush_data_task)); - flush_data_worker_ctx_.cv_.notify_one(); } } -void LocalCcShards::FlushData(std::unique_lock &flush_worker_lk) +void LocalCcShards::FlushData(std::unique_lock &flush_worker_lk, + size_t worker_idx) { + assert(worker_idx < pending_flush_work_.size()); + auto &pending_flush_work = pending_flush_work_[worker_idx]; + // Retrieve first pending work and pop it (FIFO). std::unique_ptr cur_work = - std::move(pending_flush_work_.front()); - pending_flush_work_.pop_front(); + std::move(pending_flush_work.front()); + pending_flush_work.pop_front(); // Notify any threads waiting for queue space flush_data_worker_ctx_.cv_.notify_all(); @@ -5804,7 +5926,7 @@ void LocalCcShards::FlushData(std::unique_lock &flush_worker_lk) auto &flush_task_entries = cur_work->flush_task_entries_; bool succ = true; - // Flushes to the data store + if (EnableMvcc()) { succ = store_hd_->CopyBaseToArchive(flush_task_entries); @@ -5835,7 +5957,6 @@ void LocalCcShards::FlushData(std::unique_lock &flush_worker_lk) } } - // Persist data in kv store if needed if (succ && store_hd_->NeedPersistKV()) { std::vector kv_table_names; @@ -5961,19 +6082,33 @@ void LocalCcShards::FlushData(std::unique_lock &flush_worker_lk) flush_worker_lk.lock(); } -void LocalCcShards::FlushDataWorker() +void LocalCcShards::FlushDataWorker(size_t worker_idx) { + assert(worker_idx < + static_cast(flush_data_worker_ctx_.worker_num_)); + assert(worker_idx < static_cast(pending_flush_work_.size())); + assert(worker_idx < static_cast(cur_flush_buffers_.size())); + + auto &pending_flush_work = pending_flush_work_[worker_idx]; + auto &cur_flush_buffer = *cur_flush_buffers_[worker_idx]; + + using clock = std::chrono::steady_clock; size_t previous_flush_size = 0; - auto previous_size_update_time = std::chrono::steady_clock::now(); + auto previous_size_update_time = clock::now(); + std::unique_lock flush_worker_lk(flush_data_worker_ctx_.mux_); while (flush_data_worker_ctx_.status_ == WorkerStatus::Active) { flush_data_worker_ctx_.cv_.wait_for( flush_worker_lk, 10s, - [this, &previous_flush_size, &previous_size_update_time] + [this, + &pending_flush_work, + &cur_flush_buffer, + &previous_flush_size, + &previous_size_update_time] { - if (!pending_flush_work_.empty() || + if (!pending_flush_work.empty() || flush_data_worker_ctx_.status_ == WorkerStatus::Terminated) { return true; @@ -5982,7 +6117,7 @@ void LocalCcShards::FlushDataWorker() if (current_time - previous_size_update_time > 10s) { size_t current_flush_size = - cur_flush_buffer_.GetPendingFlushSize(); + cur_flush_buffer.GetPendingFlushSize(); bool flush_size_changed = current_flush_size != previous_flush_size; previous_flush_size = current_flush_size; @@ -5994,15 +6129,15 @@ void LocalCcShards::FlushDataWorker() // read lock held by ongoing data sync tx, which might // block the DDL. std::unique_ptr flush_data_task = - cur_flush_buffer_.MoveFlushData(true); + cur_flush_buffer.MoveFlushData(true); if (flush_data_task != nullptr) { // Try to merge with the last task if queue is not // empty Note: flush_worker_lk is already held here // (inside condition variable predicate) - if (!pending_flush_work_.empty()) + if (!pending_flush_work.empty()) { - auto &last_task = pending_flush_work_.back(); + auto &last_task = pending_flush_work.back(); if (last_task->MergeFrom( std::move(flush_data_task))) { @@ -6013,8 +6148,8 @@ void LocalCcShards::FlushDataWorker() } // Add as new task. We just checked that - // pending_flush_work_ is empty, - pending_flush_work_.emplace_back( + // pending_flush_work is empty, + pending_flush_work.emplace_back( std::move(flush_data_task)); return true; } @@ -6024,17 +6159,17 @@ void LocalCcShards::FlushDataWorker() return false; }); - if (pending_flush_work_.empty()) + if (pending_flush_work.empty()) { continue; } - FlushData(flush_worker_lk); + FlushData(flush_worker_lk, worker_idx); } - while (!pending_flush_work_.empty()) + while (!pending_flush_work.empty()) { - FlushData(flush_worker_lk); + FlushData(flush_worker_lk, worker_idx); } } From 1edd2bf72b24f79b741a119dd7da2cf6f7b6a944 Mon Sep 17 00:00:00 2001 From: lokax Date: Sat, 28 Feb 2026 18:09:34 +0800 Subject: [PATCH 2/6] map buffer --- tx_service/src/cc/local_cc_shards.cpp | 121 +++++++++++++------------- 1 file changed, 60 insertions(+), 61 deletions(-) diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 6530fbf6..787a3019 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -289,22 +289,19 @@ LocalCcShards::~LocalCcShards() void LocalCcShards::StartBackgroudWorkers() { - // Ensure FlushDataWorker count matches DataSyncWorker count - // Note: worker_num_ is const, so we can't modify it. We use - // data_sync_worker_ctx_.worker_num_ to initialize the vectors and assert - // they match. - // Initialize per-worker data structures - const size_t worker_num = flush_data_worker_ctx_.worker_num_; - // Calculate buffer size + // cur_flush_buffers_ sized by data_sync_worker_num (one buffer per data + // sync worker). pending_flush_work_ sized by flush worker + // num. + const size_t data_sync_worker_num = data_sync_worker_ctx_.worker_num_; const uint64_t buffer_size = - data_sync_mem_controller_.FlushMemoryQuota() / (worker_num); + data_sync_mem_controller_.FlushMemoryQuota() / data_sync_worker_num; cur_flush_buffers_.clear(); - for (size_t i = 0; i < worker_num; ++i) + for (size_t i = 0; i < data_sync_worker_num; ++i) { cur_flush_buffers_.emplace_back( std::make_unique(buffer_size)); } - pending_flush_work_.resize(worker_num); + pending_flush_work_.resize(flush_data_worker_ctx_.worker_num_); // Starts flush worker threads firstly. for (int id = 0; id < flush_data_worker_ctx_.worker_num_; id++) @@ -4836,11 +4833,10 @@ void LocalCcShards::DataSyncForHashPartition( auto updated_memory_per_partition = partition_number_this_core ? updated_memory / partition_number_this_core : 0; + const size_t data_sync_worker_id = static_cast( + data_sync_task->id_ % data_sync_worker_ctx_.worker_num_); const size_t flush_buffer_size = - cur_flush_buffers_[DataSyncWorkerToFlushDataWorker(static_cast( - data_sync_task->id_ % - data_sync_worker_ctx_.worker_num_))] - ->GetFlushBufferSize(); + cur_flush_buffers_[data_sync_worker_id]->GetFlushBufferSize(); const size_t partition_number_per_scan = std::max(1UL, @@ -5826,18 +5822,19 @@ size_t LocalCcShards::DataSyncWorkerToFlushDataWorker( void LocalCcShards::AddFlushTaskEntry(std::unique_ptr &&entry) { assert(cur_flush_buffers_.size() == - static_cast(flush_data_worker_ctx_.worker_num_)); + static_cast(data_sync_worker_ctx_.worker_num_)); assert(pending_flush_work_.size() == static_cast(flush_data_worker_ctx_.worker_num_)); assert(entry->data_sync_task_ != nullptr); - // Compute target buffer/queue: map data_sync task id to fixed flush worker const auto &data_sync_task = entry->data_sync_task_; - size_t target = DataSyncWorkerToFlushDataWorker(static_cast( - data_sync_task->id_ % data_sync_worker_ctx_.worker_num_)); + const size_t data_sync_worker_id = static_cast( + data_sync_task->id_ % data_sync_worker_ctx_.worker_num_); + const size_t flush_target = + DataSyncWorkerToFlushDataWorker(data_sync_worker_id); std::unique_lock worker_lk(flush_data_worker_ctx_.mux_); - auto &cur_flush_buffer = *cur_flush_buffers_[target]; + auto &cur_flush_buffer = *cur_flush_buffers_[data_sync_worker_id]; cur_flush_buffer.AddFlushTaskEntry(std::move(entry)); @@ -5845,7 +5842,7 @@ void LocalCcShards::AddFlushTaskEntry(std::unique_ptr &&entry) cur_flush_buffer.MoveFlushData(false); if (flush_data_task != nullptr) { - auto &pending_flush_work = pending_flush_work_[target]; + auto &pending_flush_work = pending_flush_work_[flush_target]; // Try to merge with the last task if queue is not empty if (!pending_flush_work.empty()) @@ -5860,10 +5857,12 @@ void LocalCcShards::AddFlushTaskEntry(std::unique_ptr &&entry) } // Could not merge, wait if queue is full + /* while (pending_flush_work.size() >= 2) { flush_data_worker_ctx_.cv_.wait(worker_lk); } + */ // Add as new task pending_flush_work.emplace_back(std::move(flush_data_task)); @@ -5873,16 +5872,18 @@ void LocalCcShards::AddFlushTaskEntry(std::unique_ptr &&entry) void LocalCcShards::FlushCurrentFlushBuffer() { - assert(cur_flush_buffers_.size() == flush_data_worker_ctx_.worker_num_); - assert(pending_flush_work_.size() == flush_data_worker_ctx_.worker_num_); + assert(cur_flush_buffers_.size() == + static_cast(data_sync_worker_ctx_.worker_num_)); + assert(pending_flush_work_.size() == + static_cast(flush_data_worker_ctx_.worker_num_)); std::unique_lock worker_lk(flush_data_worker_ctx_.mux_); - // Flush all workers' buffers - for (int worker_idx = 0; worker_idx < flush_data_worker_ctx_.worker_num_; - ++worker_idx) + for (int i = 0; i < data_sync_worker_ctx_.worker_num_; ++i) { - auto &cur_flush_buffer = *cur_flush_buffers_[worker_idx]; - auto &pending_flush_work = pending_flush_work_[worker_idx]; + auto &cur_flush_buffer = *cur_flush_buffers_[i]; + size_t flush_target = + DataSyncWorkerToFlushDataWorker(static_cast(i)); + auto &pending_flush_work = pending_flush_work_[flush_target]; std::unique_ptr flush_data_task = cur_flush_buffer.MoveFlushData(true); @@ -6087,13 +6088,12 @@ void LocalCcShards::FlushDataWorker(size_t worker_idx) assert(worker_idx < static_cast(flush_data_worker_ctx_.worker_num_)); assert(worker_idx < static_cast(pending_flush_work_.size())); - assert(worker_idx < static_cast(cur_flush_buffers_.size())); auto &pending_flush_work = pending_flush_work_[worker_idx]; - auto &cur_flush_buffer = *cur_flush_buffers_[worker_idx]; using clock = std::chrono::steady_clock; - size_t previous_flush_size = 0; + std::vector previous_flush_sizes(data_sync_worker_ctx_.worker_num_, + 0); auto previous_size_update_time = clock::now(); std::unique_lock flush_worker_lk(flush_data_worker_ctx_.mux_); @@ -6103,9 +6103,9 @@ void LocalCcShards::FlushDataWorker(size_t worker_idx) flush_worker_lk, 10s, [this, + worker_idx, &pending_flush_work, - &cur_flush_buffer, - &previous_flush_size, + &previous_flush_sizes, &previous_size_update_time] { if (!pending_flush_work.empty() || @@ -6116,42 +6116,41 @@ void LocalCcShards::FlushDataWorker(size_t worker_idx) auto current_time = std::chrono::steady_clock::now(); if (current_time - previous_size_update_time > 10s) { - size_t current_flush_size = - cur_flush_buffer.GetPendingFlushSize(); - bool flush_size_changed = - current_flush_size != previous_flush_size; - previous_flush_size = current_flush_size; previous_size_update_time = current_time; - if (!flush_size_changed && current_flush_size > 0) + for (int i = 0; i < data_sync_worker_ctx_.worker_num_; ++i) { - // data sync might be stuck due to lock conflict with - // DDL. Flush current flush buffer to release catalog - // read lock held by ongoing data sync tx, which might - // block the DDL. - std::unique_ptr flush_data_task = - cur_flush_buffer.MoveFlushData(true); - if (flush_data_task != nullptr) + if (DataSyncWorkerToFlushDataWorker( + static_cast(i)) != worker_idx) { - // Try to merge with the last task if queue is not - // empty Note: flush_worker_lk is already held here - // (inside condition variable predicate) - if (!pending_flush_work.empty()) + continue; + } + size_t current_flush_size = + cur_flush_buffers_[i]->GetPendingFlushSize(); + bool flush_size_changed = + current_flush_size != previous_flush_sizes[i]; + previous_flush_sizes[i] = current_flush_size; + if (!flush_size_changed && current_flush_size > 0) + { + // data sync might be stuck due to lock conflict + // with DDL. Flush buffer to release catalog read + // lock held by ongoing data sync tx. + std::unique_ptr flush_data_task = + cur_flush_buffers_[i]->MoveFlushData(true); + if (flush_data_task != nullptr) { - auto &last_task = pending_flush_work.back(); - if (last_task->MergeFrom( - std::move(flush_data_task))) + if (!pending_flush_work.empty()) { - // Merge successful, task was merged into - // last_task - return true; + auto &last_task = pending_flush_work.back(); + if (last_task->MergeFrom( + std::move(flush_data_task))) + { + return true; + } } + pending_flush_work.emplace_back( + std::move(flush_data_task)); + return true; } - - // Add as new task. We just checked that - // pending_flush_work is empty, - pending_flush_work.emplace_back( - std::move(flush_data_task)); - return true; } } } From 988d4fd16e6be25bfdd7fe7b53b6ecdef451d812 Mon Sep 17 00:00:00 2001 From: lokax Date: Fri, 6 Mar 2026 15:46:24 +0800 Subject: [PATCH 3/6] fix --- tx_service/src/cc/local_cc_shards.cpp | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 787a3019..12bcfae1 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -131,15 +131,7 @@ LocalCcShards::LocalCcShards( #endif data_sync_worker_ctx_(conf.at("core_num")), -#ifdef EXT_TX_PROC_ENABLED - flush_data_worker_ctx_( - conf.at("core_num") >= 2 - ? std::min(conf.at("core_num") / 2, (uint32_t) 10) - : 1), -#else - flush_data_worker_ctx_( - std::min(static_cast(conf.at("core_num")), 10)), -#endif + flush_data_worker_ctx_(conf.at("core_num")), // cur_flush_buffers_ will be resized in StartBackgroudWorkers() // when worker_num_ is determined data_sync_mem_controller_(static_cast( From cf0c0970ddcc6546a855f05d1b50a3594774dc0a Mon Sep 17 00:00:00 2001 From: lokax Date: Fri, 6 Mar 2026 17:46:13 +0800 Subject: [PATCH 4/6] update store --- store_handler/eloq_data_store_service/eloqstore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store_handler/eloq_data_store_service/eloqstore b/store_handler/eloq_data_store_service/eloqstore index 695b094d..76b034f2 160000 --- a/store_handler/eloq_data_store_service/eloqstore +++ b/store_handler/eloq_data_store_service/eloqstore @@ -1 +1 @@ -Subproject commit 695b094d6f02148f30589e658efa3536ede0c3c3 +Subproject commit 76b034f2673efb09ddda79665dc71ccad5f082d6 From 75fba6927a84f79f492786e323f14799a92a73a5 Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 9 Mar 2026 12:09:16 +0800 Subject: [PATCH 5/6] debug log --- tx_service/src/cc/cc_req_misc.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index eae335c7..e8d46b70 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ b/tx_service/src/cc/cc_req_misc.cpp @@ -340,6 +340,8 @@ void FetchTableRangesCc::SetFinish(int err) error_code_ = static_cast(CcErrorCode::DATA_STORE_ERR); ranges_vec_.clear(); partition_ranges_vec_.clear(); + partition_scan_states_.clear(); + remaining_partitions_ = 0; }); ccs_.Enqueue(this); } From 3bec81868886f1485f2f72e983f16c52e1ba0950 Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 9 Mar 2026 17:32:59 +0800 Subject: [PATCH 6/6] fix --- tx_service/include/cc/local_cc_shards.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index 6d299392..b9fa85b2 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -1017,6 +1017,7 @@ class LocalCcShards range_entry->UpdateRangeEntry(version, std::move(range_slices)); } + mi_heap_set_default(prev_heap); if (is_override_thd) { mi_override_thread(prev_thd); @@ -1025,7 +1026,6 @@ class LocalCcShards { mi_restore_default_thread_id(); } - mi_heap_set_default(prev_heap); #if defined(WITH_JEMALLOC) JemallocArenaSwitcher::SwitchToArena(prev_arena_id);