From 8bf49400e0fbd22a4d5ebc8fb53f97d1ca77ea8a Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Thu, 26 Feb 2026 07:07:16 +0800 Subject: [PATCH 01/15] range sizes definition --- tx_service/include/cc/template_cc_map.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index f2a00630..fa5f5d4b 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -38,6 +38,7 @@ #include #include +#include "absl/container/flat_hash_map.h" #include "cc_entry.h" #include "cc_map.h" #include "cc_page_clean_guard.h" @@ -8756,6 +8757,10 @@ class TemplateCcMap : public CcMap } normal_obj_sz_ = 0; + if constexpr (RangePartitioned) + { + range_sizes_.clear(); + } ccmp_.clear(); } @@ -11907,6 +11912,9 @@ class TemplateCcMap : public CcMap TemplateCcMapSamplePool *sample_pool_; size_t normal_obj_sz_{ 0}; // The count of all normal status objects, only used for redis + + // Range id -> total size; only used when RangePartitioned. + absl::flat_hash_map range_sizes_; }; template From ff06fc128e44b38f6e25138764c96502d63d8249 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Thu, 26 Feb 2026 07:36:32 +0800 Subject: [PATCH 02/15] update range size on postwritecc --- tx_service/include/cc/cc_request.h | 6 ++++++ tx_service/include/cc/template_cc_map.h | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 97e93fae..11ac652e 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -877,6 +877,12 @@ struct PostWriteCc : public TemplatedCcRequest return key_shard_code_; } + // Low 10 bits of key_shard_code_: range partition id (when range-partitioned). + uint32_t PartitionId() const + { + return key_shard_code_ & 0x3FF; + } + const void *Key() const { return is_remote_ ? nullptr : key_; diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index fa5f5d4b..3b0f8f7e 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -592,6 +592,8 @@ class TemplateCcMap : public CcMap cce->ArchiveBeforeUpdate(); } + [[maybe_unused]] const size_t old_payload_size = + cce->PayloadSize(); if (is_del) { cce->payload_.SetCurrentPayload(nullptr); @@ -613,6 +615,23 @@ class TemplateCcMap : public CcMap bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, new_status); + if constexpr (RangePartitioned) + { + const int64_t key_delta_size = + (new_status == RecordStatus::Deleted) + ? (-static_cast(write_key->Size() + + old_payload_size)) + : (static_cast(cce->PayloadSize()) - + static_cast(old_payload_size)); + const uint32_t range_id = req.PartitionId(); + auto it = range_sizes_.find(range_id); + assert(it != range_sizes_.end()); + assert(key_delta_size >= 0 || + it->second >= static_cast(-key_delta_size)); + it->second = static_cast( + static_cast(it->second) + key_delta_size); + } + if (req.IsInitialInsert()) { // Updates the ckpt ts after commit ts is set. From 15cb6addec60aeaa82344ab7bdccfcc0712438ae Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Thu, 26 Feb 2026 14:24:22 +0800 Subject: [PATCH 03/15] update store range size after datasync --- store_handler/data_store_service_client.cpp | 23 ++++++++--- store_handler/data_store_service_client.h | 6 ++- .../data_store_service_client_closure.cpp | 17 ++++++--- tx_service/include/catalog_factory.h | 1 + tx_service/include/cc/local_cc_shards.h | 6 ++- .../include/eloq_basic_catalog_factory.h | 2 + tx_service/include/range_record.h | 38 ++++++++++++++++--- tx_service/src/cc/local_cc_shards.cpp | 35 ++++++++++++++--- tx_service/src/eloq_basic_catalog_factory.cpp | 3 +- .../tests/include/mock/mock_catalog_factory.h | 1 + 10 files changed, 108 insertions(+), 24 deletions(-) diff --git a/store_handler/data_store_service_client.cpp b/store_handler/data_store_service_client.cpp index 0609c563..75332faf 100644 --- a/store_handler/data_store_service_client.cpp +++ b/store_handler/data_store_service_client.cpp @@ -1259,11 +1259,13 @@ std::string DataStoreServiceClient::EncodeRangeKey( std::string DataStoreServiceClient::EncodeRangeValue(int32_t range_id, uint64_t range_version, uint64_t version, - uint32_t segment_cnt) + uint32_t segment_cnt, + uint32_t range_size) { std::string kv_range_record; kv_range_record.reserve(sizeof(int32_t) + sizeof(uint64_t) + - sizeof(uint64_t) + sizeof(uint32_t)); + sizeof(uint64_t) + sizeof(uint32_t) + + sizeof(uint32_t)); kv_range_record.append(reinterpret_cast(&range_id), sizeof(int32_t)); kv_range_record.append(reinterpret_cast(&range_version), @@ -1273,6 +1275,8 @@ std::string DataStoreServiceClient::EncodeRangeValue(int32_t range_id, // segment_cnt of slices kv_range_record.append(reinterpret_cast(&segment_cnt), sizeof(uint32_t)); + kv_range_record.append(reinterpret_cast(&range_size), + sizeof(uint32_t)); return kv_range_record; } @@ -1340,6 +1344,7 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches( RangeSliceBatchPlan plan; plan.segment_cnt = 0; plan.version = version; + plan.range_size = 0; // Estimate capacity based on slices size plan.segment_keys.reserve(slices.size() / 10 + 1); // Rough estimate @@ -1388,6 +1393,7 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches( sizeof(uint32_t)); segment_record.append(slice_start_key.Data(), key_size); uint32_t slice_size = static_cast(slices[i]->Size()); + plan.range_size += slice_size; segment_record.append(reinterpret_cast(&slice_size), sizeof(uint32_t)); } @@ -1553,6 +1559,7 @@ void DataStoreServiceClient::EnqueueRangeMetadataRecord( uint64_t range_version, uint64_t version, uint32_t segment_cnt, + uint32_t range_size, RangeMetadataAccumulator &accumulator) { // Compute kv_table_name and kv_partition_id @@ -1563,8 +1570,8 @@ void DataStoreServiceClient::EnqueueRangeMetadataRecord( // Encode key and value std::string key_str = EncodeRangeKey(catalog_factory, table_name, range_start_key); - std::string rec_str = - EncodeRangeValue(partition_id, range_version, version, segment_cnt); + std::string rec_str = EncodeRangeValue( + partition_id, range_version, version, segment_cnt, range_size); // Get or create entry in accumulator auto key = std::make_pair(kv_table_name, kv_partition_id); @@ -1732,6 +1739,7 @@ bool DataStoreServiceClient::UpdateRangeSlices( req.range_slices_, req.partition_id_); uint32_t segment_cnt = slice_plan.segment_cnt; + uint32_t range_size = slice_plan.range_size; int32_t kv_partition_id = KvPartitionIdOfRangeSlices(*req.table_name_, req.partition_id_); auto iter = slice_plans.find(kv_partition_id); @@ -1756,6 +1764,7 @@ bool DataStoreServiceClient::UpdateRangeSlices( req.range_version_, req.ckpt_ts_, segment_cnt, + range_size, meta_acc); } @@ -1957,6 +1966,7 @@ bool DataStoreServiceClient::UpdateRangeSlices( range_version, version, segment_cnt, + slice_plans[0].range_size, meta_acc); SyncConcurrentRequest *meta_sync_concurrent = @@ -2048,6 +2058,7 @@ bool DataStoreServiceClient::UpsertRanges( auto slice_plan = PrepareRangeSliceBatches( table_name, version, range.slices_, range.partition_id_); uint32_t segment_cnt = slice_plan.segment_cnt; + uint32_t range_size = slice_plan.range_size; int32_t kv_partition_id = KvPartitionIdOfRangeSlices(table_name, range.partition_id_); @@ -2071,6 +2082,7 @@ bool DataStoreServiceClient::UpsertRanges( version, // range_version (using version for now) version, segment_cnt, + range_size, meta_acc); } @@ -4651,7 +4663,8 @@ bool DataStoreServiceClient::InitTableRanges( std::string key_str = EncodeRangeKey(catalog_factory, table_name, *neg_inf_key); - std::string rec_str = EncodeRangeValue(init_range_id, version, version, 0); + std::string rec_str = + EncodeRangeValue(init_range_id, version, version, 0, 0); keys.emplace_back(std::string_view(key_str.data(), key_str.size())); records.emplace_back(std::string_view(rec_str.data(), rec_str.size())); diff --git a/store_handler/data_store_service_client.h b/store_handler/data_store_service_client.h index 66f6618b..ad4bd52b 100644 --- a/store_handler/data_store_service_client.h +++ b/store_handler/data_store_service_client.h @@ -66,6 +66,7 @@ struct RangeSliceBatchPlan std::vector segment_keys; // Owned string buffers std::vector segment_records; // Owned string buffers size_t version; + uint32_t range_size{0}; // Clear method for reuse void Clear() @@ -74,6 +75,7 @@ struct RangeSliceBatchPlan segment_keys.clear(); segment_records.clear(); version = 0; + range_size = 0; } }; @@ -339,7 +341,8 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler std::string EncodeRangeValue(int32_t range_id, uint64_t range_version, uint64_t version, - uint32_t segment_cnt); + uint32_t segment_cnt, + uint32_t range_size); std::string EncodeRangeSliceKey(const txservice::TableName &table_name, int32_t range_id, uint32_t segment_id); @@ -642,6 +645,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler uint64_t range_version, uint64_t version, uint32_t segment_cnt, + uint32_t range_size, RangeMetadataAccumulator &accumulator); void DispatchRangeMetadataBatches( diff --git a/store_handler/data_store_service_client_closure.cpp b/store_handler/data_store_service_client_closure.cpp index fbf7d34b..7d84d4da 100644 --- a/store_handler/data_store_service_client_closure.cpp +++ b/store_handler/data_store_service_client_closure.cpp @@ -794,7 +794,8 @@ void FetchTableRangesCallback(void *data, { scan_next_closure->GetItem(i, key, value, ts, ttl); assert(value.size() == (sizeof(int32_t) + sizeof(uint64_t) + - sizeof(uint64_t) + sizeof(uint32_t))); + sizeof(uint64_t) + sizeof(uint32_t) + + sizeof(uint32_t))); const char *buf = value.data(); int32_t partition_id = *(reinterpret_cast(buf)); buf += sizeof(partition_id); @@ -802,6 +803,8 @@ void FetchTableRangesCallback(void *data, buf += sizeof(range_version); uint64_t slice_version = *(reinterpret_cast(buf)); buf += sizeof(slice_version); + buf += sizeof(uint32_t); // segment_cnt + uint32_t range_size = *(reinterpret_cast(buf)); std::string_view start_key_sv( key.data() + (table_name_sv.size() + KEY_SEPARATOR.size()), @@ -813,14 +816,17 @@ void FetchTableRangesCallback(void *data, { txservice::TxKey start_key = catalog_factory->CreateTxKey( start_key_sv.data(), start_key_sv.size()); - range_vec.emplace_back( - std::move(start_key), partition_id, range_version); + range_vec.emplace_back(std::move(start_key), + partition_id, + range_version, + range_size); } else { range_vec.emplace_back(catalog_factory->NegativeInfKey(), partition_id, - range_version); + range_version, + range_size); } } @@ -889,7 +895,8 @@ void FetchTableRangesCallback(void *data, catalog_factory->NegativeInfKey(), txservice::Sequences::InitialRangePartitionIdOf( fetch_range_cc->table_name_), - 1); + 1, + 0); } fetch_range_cc->AppendTableRanges( diff --git a/tx_service/include/catalog_factory.h b/tx_service/include/catalog_factory.h index 014e813f..80d42eb6 100644 --- a/tx_service/include/catalog_factory.h +++ b/tx_service/include/catalog_factory.h @@ -424,6 +424,7 @@ class CatalogFactory TxKey start_key, uint64_t version_ts, int64_t partition_id, + uint32_t store_range_size = 0, std::unique_ptr slices = nullptr) = 0; virtual std::unique_ptr CreatePkCcmScanner( diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index 961bee52..40e089fa 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -985,7 +985,11 @@ class LocalCcShards if (last_sync_ts > 0) { - new_range_ptr->UpdateLastDataSyncTS(last_sync_ts); + StoreRange *store_range = new_range_ptr->RangeSlices(); + size_t range_size = + store_range != nullptr ? store_range->PostCkptSize() : 0; + new_range_ptr->UpdateLastDataSyncTS( + last_sync_ts, static_cast(range_size)); } return new_range_ptr; diff --git a/tx_service/include/eloq_basic_catalog_factory.h b/tx_service/include/eloq_basic_catalog_factory.h index 9cee9ac5..6b7764f2 100644 --- a/tx_service/include/eloq_basic_catalog_factory.h +++ b/tx_service/include/eloq_basic_catalog_factory.h @@ -239,6 +239,7 @@ class EloqHashCatalogFactory : public CatalogFactory TxKey start_key, uint64_t version_ts, int64_t partition_id, + uint32_t store_range_size = 0, std::unique_ptr slices = nullptr) override { // No range partitioning supported for hash catalog factory @@ -311,6 +312,7 @@ class EloqRangeCatalogFactory : public CatalogFactory TxKey start_key, uint64_t version_ts, int64_t partition_id, + uint32_t store_range_size = 0, std::unique_ptr slices = nullptr) override; TxKey NegativeInfKey() const override; diff --git a/tx_service/include/range_record.h b/tx_service/include/range_record.h index 701b904d..1434313b 100644 --- a/tx_service/include/range_record.h +++ b/tx_service/include/range_record.h @@ -71,10 +71,22 @@ struct InitRangeEntry { } + InitRangeEntry(TxKey start_key, + int32_t partition_id, + uint64_t version_ts, + uint32_t size) + : key_(std::move(start_key)), + partition_id_(partition_id), + version_ts_(version_ts), + size_(size) + { + } + InitRangeEntry(InitRangeEntry &&rhs) noexcept : key_(std::move(rhs.key_)), partition_id_(rhs.partition_id_), - version_ts_(rhs.version_ts_) + version_ts_(rhs.version_ts_), + size_(rhs.size_) { } @@ -85,6 +97,7 @@ struct InitRangeEntry key_ = std::move(rhs.key_); partition_id_ = rhs.partition_id_; version_ts_ = rhs.version_ts_; + size_ = rhs.size_; } return *this; @@ -93,6 +106,8 @@ struct InitRangeEntry TxKey key_; int32_t partition_id_{0}; uint64_t version_ts_{0}; + // Store range size + uint32_t size_{0}; }; struct RangeInfo @@ -427,8 +442,12 @@ struct TableRangeEntry TableRangeEntry(const TableRangeEntry &) = delete; TableRangeEntry &operator=(const TableRangeEntry &) = delete; - TableRangeEntry(uint64_t version_ts, int64_t partition_id) - : mux_(), fetch_range_slices_req_(nullptr) + TableRangeEntry(uint64_t version_ts, + int64_t partition_id, + uint32_t store_range_size) + : mux_(), + fetch_range_slices_req_(nullptr), + store_range_size_(store_range_size) { } @@ -491,7 +510,7 @@ struct TableRangeEntry return last_sync_ts_; } - void UpdateLastDataSyncTS(uint64_t last_sync_ts) + void UpdateLastDataSyncTS(uint64_t last_sync_ts, uint32_t range_size) { std::unique_lock lk(mux_); @@ -500,6 +519,12 @@ struct TableRangeEntry // data sync succeeded, update last sync ts last_sync_ts_ = last_sync_ts; } + store_range_size_ = range_size; + } + + uint32_t StoreRangeSize() const + { + return store_range_size_; } void FetchRangeSlices(const TableName &range_tbl_name, @@ -521,6 +546,7 @@ struct TableRangeEntry WritePreferSharedMutex mux_; std::unique_ptr fetch_range_slices_req_{nullptr}; + uint32_t store_range_size_{0}; template friend class RangeCcMap; @@ -536,8 +562,10 @@ struct TemplateTableRangeEntry : public TableRangeEntry const KeyT *start_key, uint64_t version_ts, int64_t partition_id, + uint32_t store_range_size = 0, std::unique_ptr> slices = nullptr) - : range_info_(start_key, version_ts, partition_id), + : TableRangeEntry(version_ts, partition_id, store_range_size), + range_info_(start_key, version_ts, partition_id), range_slices_(std::move(slices)) { } diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 2c1c9a76..39c43fdc 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -1026,7 +1026,11 @@ void LocalCcShards::CreateSplitRangeRecoveryTx( } else { - range_entry->UpdateLastDataSyncTS(0); + StoreRange *store_range = range_entry->RangeSlices(); + assert(store_range); + size_t range_size = store_range->PostCkptSize(); + range_entry->UpdateLastDataSyncTS( + 0, static_cast(range_size)); range_entry->UnPinStoreRange(); txservice::CommitTx(txm); } @@ -1089,7 +1093,8 @@ void LocalCcShards::InitTableRanges(const TableName &range_table_name, GetCatalogFactory(range_table_name.Engine()) ->CreateTableRange(std::move(range_start_key), range_entry.version_ts_, - range_entry.partition_id_); + range_entry.partition_id_, + range_entry.size_); range_it = ranges .try_emplace(new_range->RangeStartTxKey(), std::move(new_range)) @@ -3251,7 +3256,11 @@ void LocalCcShards::PostProcessFlushTaskEntries( { if (!task->during_split_range_) { - range_entry->UpdateLastDataSyncTS(task->data_sync_ts_); + StoreRange *store_range = range_entry->RangeSlices(); + assert(store_range); + size_t range_size = store_range->PostCkptSize(); + range_entry->UpdateLastDataSyncTS( + task->data_sync_ts_, static_cast(range_size)); range_entry->UnPinStoreRange(); // Commit the data sync txm txservice::CommitTx(entry->data_sync_txm_); @@ -3406,7 +3415,11 @@ void LocalCcShards::PostProcessRangePartitionDataSyncTask( { if (!task->during_split_range_) { - range_entry->UpdateLastDataSyncTS(task->data_sync_ts_); + StoreRange *store_range = range_entry->RangeSlices(); + assert(store_range); + size_t range_size = store_range->PostCkptSize(); + range_entry->UpdateLastDataSyncTS( + task->data_sync_ts_, static_cast(range_size)); range_entry->UnPinStoreRange(); // Commit the data sync txm txservice::CommitTx(data_sync_txm); @@ -3912,7 +3925,12 @@ void LocalCcShards::DataSyncForRangePartition( txservice::CommitTx(data_sync_txm); // Update the task status for this range. - range_entry->UpdateLastDataSyncTS(data_sync_task->data_sync_ts_); + StoreRange *store_range = range_entry->RangeSlices(); + assert(store_range); + size_t range_size = store_range->PostCkptSize(); + range_entry->UpdateLastDataSyncTS( + data_sync_task->data_sync_ts_, + static_cast(range_size)); // Generally, the StoreRange will be pinned only when there are data // items in the range that needs to be ckpted. if (scan_delta_size_cc.StoreRangePtr() != nullptr) @@ -5691,7 +5709,12 @@ void LocalCcShards::SplitFlushRange( return; } - range_entry->UpdateLastDataSyncTS(data_sync_task->data_sync_ts_); + StoreRange *store_range = range_entry->RangeSlices(); + assert(store_range); + size_t range_size = store_range->PostCkptSize(); + range_entry->UpdateLastDataSyncTS(data_sync_task->data_sync_ts_, + static_cast(range_size)); + range_entry->UnPinStoreRange(); data_sync_task->SetFinish(); diff --git a/tx_service/src/eloq_basic_catalog_factory.cpp b/tx_service/src/eloq_basic_catalog_factory.cpp index 9983f44e..9b6ae905 100644 --- a/tx_service/src/eloq_basic_catalog_factory.cpp +++ b/tx_service/src/eloq_basic_catalog_factory.cpp @@ -338,6 +338,7 @@ std::unique_ptr EloqRangeCatalogFactory::CreateTableRange( TxKey start_key, uint64_t version_ts, int64_t partition_id, + uint32_t store_range_size, std::unique_ptr slices) { assert(start_key.Type() == KeyType::NegativeInf || start_key.IsOwner()); @@ -354,7 +355,7 @@ std::unique_ptr EloqRangeCatalogFactory::CreateTableRange( range_ptr}; return std::make_unique>( - start, version_ts, partition_id, std::move(typed_range)); + start, version_ts, partition_id, store_range_size, std::move(typed_range)); } TxKey EloqRangeCatalogFactory::NegativeInfKey() const diff --git a/tx_service/tests/include/mock/mock_catalog_factory.h b/tx_service/tests/include/mock/mock_catalog_factory.h index 923b43cf..05669832 100644 --- a/tx_service/tests/include/mock/mock_catalog_factory.h +++ b/tx_service/tests/include/mock/mock_catalog_factory.h @@ -246,6 +246,7 @@ class MockCatalogFactory : public CatalogFactory TxKey start_key, uint64_t version_ts, int64_t partition_id, + uint32_t store_range_size = 0, std::unique_ptr slices = nullptr) override { assert(false); From 575f24d9d78f4ced8cfe17e3c9dcb2573c18040f Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Thu, 26 Feb 2026 15:14:45 +0800 Subject: [PATCH 04/15] clang format --- store_handler/data_store_service_client_closure.cpp | 6 +++--- tx_service/include/cc/cc_request.h | 3 ++- tx_service/src/eloq_basic_catalog_factory.cpp | 6 +++++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/store_handler/data_store_service_client_closure.cpp b/store_handler/data_store_service_client_closure.cpp index 7d84d4da..b4075600 100644 --- a/store_handler/data_store_service_client_closure.cpp +++ b/store_handler/data_store_service_client_closure.cpp @@ -793,9 +793,9 @@ void FetchTableRangesCallback(void *data, for (uint32_t i = 0; i < items_size; i++) { scan_next_closure->GetItem(i, key, value, ts, ttl); - assert(value.size() == (sizeof(int32_t) + sizeof(uint64_t) + - sizeof(uint64_t) + sizeof(uint32_t) + - sizeof(uint32_t))); + assert(value.size() == + (sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + + sizeof(uint32_t) + sizeof(uint32_t))); const char *buf = value.data(); int32_t partition_id = *(reinterpret_cast(buf)); buf += sizeof(partition_id); diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 11ac652e..81f70abf 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -877,7 +877,8 @@ struct PostWriteCc : public TemplatedCcRequest return key_shard_code_; } - // Low 10 bits of key_shard_code_: range partition id (when range-partitioned). + // Low 10 bits of key_shard_code_: range partition id (when + // range-partitioned). uint32_t PartitionId() const { return key_shard_code_ & 0x3FF; diff --git a/tx_service/src/eloq_basic_catalog_factory.cpp b/tx_service/src/eloq_basic_catalog_factory.cpp index 9b6ae905..9c844e5b 100644 --- a/tx_service/src/eloq_basic_catalog_factory.cpp +++ b/tx_service/src/eloq_basic_catalog_factory.cpp @@ -355,7 +355,11 @@ std::unique_ptr EloqRangeCatalogFactory::CreateTableRange( range_ptr}; return std::make_unique>( - start, version_ts, partition_id, store_range_size, std::move(typed_range)); + start, + version_ts, + partition_id, + store_range_size, + std::move(typed_range)); } TxKey EloqRangeCatalogFactory::NegativeInfKey() const From 4a6a18ee0b16e74141c0900f51bd50abced91a85 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Thu, 26 Feb 2026 16:57:16 +0800 Subject: [PATCH 05/15] Init range sizes for ccmap --- tx_service/include/cc/cc_map.h | 18 ++++++++ tx_service/include/cc/cc_request.h | 55 +++++++++++++++++++++++++ tx_service/include/cc/cc_shard.h | 10 +++++ tx_service/include/cc/local_cc_shards.h | 11 +++++ tx_service/include/cc/template_cc_map.h | 18 ++++++++ tx_service/src/cc/cc_shard.cpp | 7 ++++ tx_service/src/cc/local_cc_shards.cpp | 25 +++++++++++ 7 files changed, 144 insertions(+) diff --git a/tx_service/include/cc/cc_map.h b/tx_service/include/cc/cc_map.h index 0d1434b6..2cf133ea 100644 --- a/tx_service/include/cc/cc_map.h +++ b/tx_service/include/cc/cc_map.h @@ -25,6 +25,7 @@ #include #include // std::pair +#include "absl/container/flat_hash_map.h" #include "cc/cc_req_base.h" #include "cc_protocol.h" #include "error_messages.h" // CcErrorCode @@ -260,6 +261,23 @@ class CcMap virtual const txservice::KeySchema *KeySchema() const = 0; virtual const txservice::RecordSchema *RecordSchema() const = 0; + /** + * Whether range_sizes_ has been loaded from store (for range-partitioned + * tables). Default: true (no lazy init needed for non-range-partitioned). + */ + virtual bool RangeSizesInited() const + { + return true; + } + + /** + * Initialize range_sizes_ from store (e.g. + * TableRangeEntry::StoreRangeSize). No-op for non-range-partitioned CcMaps. + */ + virtual void InitRangeSizes(absl::flat_hash_map &&) + { + } + uint64_t SchemaTs() const { return schema_ts_; diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 81f70abf..e6f00492 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -251,6 +251,15 @@ struct TemplatedCcRequest : public CcRequestBase } assert(ccm != nullptr); assert(ccs.core_id_ == ccm->shard_->core_id_); + if (!table_name_->IsMeta() && !ccm->RangeSizesInited()) + { + TableName range_table_name(table_name_->StringView(), + TableType::RangePartition, + table_name_->Engine()); + absl::flat_hash_map range_sizes = + ccs.GetStoreRangeSizes(range_table_name, node_group_id_); + ccm->InitRangeSizes(std::move(range_sizes)); + } return ccm->Execute(*typed_req); } else @@ -259,6 +268,15 @@ struct TemplatedCcRequest : public CcRequestBase // execution blocked by lock. assert(ccm_ != nullptr); assert(ccs.core_id_ == ccm_->shard_->core_id_); + if (!table_name_->IsMeta() && !ccm_->RangeSizesInited()) + { + TableName range_table_name(table_name_->StringView(), + TableType::RangePartition, + table_name_->Engine()); + absl::flat_hash_map range_sizes = + ccs.GetStoreRangeSizes(range_table_name, node_group_id_); + ccm_->InitRangeSizes(std::move(range_sizes)); + } return ccm_->Execute(*typed_req); } } @@ -2539,6 +2557,15 @@ struct ScanSliceCc ccm_ = ccm; } assert(ccm != nullptr); + if (!table_name_->IsMeta() && !ccm->RangeSizesInited()) + { + TableName range_table_name(table_name_->StringView(), + TableType::RangePartition, + table_name_->Engine()); + absl::flat_hash_map range_sizes = + ccs.GetStoreRangeSizes(range_table_name, node_group_id_); + ccm->InitRangeSizes(std::move(range_sizes)); + } return ccm->Execute(*this); } else @@ -2546,6 +2573,15 @@ struct ScanSliceCc // non parallel request which is executed again, e.g. initial // execution blocked by lock. assert(ccm_ != nullptr); + if (!table_name_->IsMeta() && !ccm_->RangeSizesInited()) + { + TableName range_table_name(table_name_->StringView(), + TableType::RangePartition, + table_name_->Engine()); + absl::flat_hash_map range_sizes = + ccs.GetStoreRangeSizes(range_table_name, node_group_id_); + ccm_->InitRangeSizes(std::move(range_sizes)); + } return ccm_->Execute(*this); } } @@ -5016,6 +5052,15 @@ struct ReplayLogCc : public TemplatedCcRequest table_schema_ = ccm_->GetTableSchema(); } } + if (!table_name_->IsMeta() && !ccm_->RangeSizesInited()) + { + TableName range_table_name(table_name_->StringView(), + TableType::RangePartition, + table_name_->Engine()); + absl::flat_hash_map range_sizes = + ccs.GetStoreRangeSizes(range_table_name, node_group_id_); + ccm_->InitRangeSizes(std::move(range_sizes)); + } return ccm_->Execute(*this); } @@ -7851,6 +7896,16 @@ struct UploadBatchCc : public CcRequestBase } assert(ccm != nullptr); + assert(!table_name_->IsMeta()); + if (!ccm->RangeSizesInited()) + { + TableName range_table_name(table_name_->StringView(), + TableType::RangePartition, + table_name_->Engine()); + absl::flat_hash_map range_sizes = + ccs.GetStoreRangeSizes(range_table_name, node_group_id_); + ccm->InitRangeSizes(std::move(range_sizes)); + } return ccm->Execute(*this); } diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ee977e68..7f8cbfc6 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -313,6 +313,16 @@ class CcShard */ CcMap *GetCcm(const TableName &table_name, uint32_t node_group); + /** + * Returns partition_id -> store_range_size for ranges that belong to this + * node group and this shard (core). Used to lazy-init + * TemplateCcMap::range_sizes_. + * @param range_table_name Range table name. + * @param node_group Node group id. + */ + absl::flat_hash_map GetStoreRangeSizes( + const TableName &range_table_name, const NodeGroupId node_group); + void AdjustDataKeyStats(const TableName &table_name, int64_t size_delta, int64_t dirty_delta); diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index 40e089fa..6ba31ded 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -1053,6 +1053,17 @@ class LocalCcShards std::map *GetTableRangesForATable( const TableName &range_table_name, const NodeGroupId ng_id); + /** + * Returns partition_id -> StoreRangeSize() for ranges that belong to ng_id + * and the given core_id (partition_id % Count() == core_id). + * Caller must hold shared lock on table range meta if needed; this method + * takes shared_lock on fast_meta_data_mux_. + */ + absl::flat_hash_map GetStoreRangeSizes( + const TableName &range_table_name, + const NodeGroupId ng_id, + const uint16_t core_id); + /** * @brief Upload new range info into range_info_ in TableRangeEntry * object. diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 3b0f8f7e..b3c5f879 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -173,6 +173,24 @@ class TemplateCcMap : public CcMap neg_inf_.ClearLocks(*shard_, cc_ng_id_); } + bool RangeSizesInited() const override + { + if constexpr (RangePartitioned) + { + return range_sizes_.size() > 0; + } + return true; + } + + void InitRangeSizes( + absl::flat_hash_map &&range_sizes) override + { + if constexpr (RangePartitioned) + { + range_sizes_ = std::move(range_sizes); + } + } + bool Execute(AcquireCc &req) override { TX_TRACE_ACTION_WITH_CONTEXT( diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index c61cc0dc..da86a6cf 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -377,6 +377,13 @@ CcMap *CcShard::GetCcm(const TableName &table_name, uint32_t node_group) } } +absl::flat_hash_map CcShard::GetStoreRangeSizes( + const TableName &range_table_name, const NodeGroupId node_group) +{ + return local_shards_.GetStoreRangeSizes( + range_table_name, node_group, core_id_); +} + void CcShard::AdjustDataKeyStats(const TableName &table_name, int64_t size_delta, int64_t dirty_delta) diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 39c43fdc..f750f9a5 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -1263,6 +1263,31 @@ LocalCcShards::GetTableRangeIdsForATableInternal( return ng_it == table_it->second.end() ? nullptr : &ng_it->second; } +absl::flat_hash_map LocalCcShards::GetStoreRangeSizes( + const TableName &range_table_name, + const NodeGroupId ng_id, + const uint16_t core_id) +{ + absl::flat_hash_map range_sizes; + std::shared_lock lk(fast_meta_data_mux_); + std::unordered_map *range_ids = + GetTableRangeIdsForATableInternal(range_table_name, ng_id); + assert(range_ids != nullptr); + for (const auto &[partition_id, entry] : *range_ids) + { + BucketInfo *owner = + GetRangeOwnerInternal(static_cast(partition_id), ng_id); + NodeGroupId range_owner = owner->BucketOwner(); + uint16_t range_shard_idx = + static_cast(partition_id % Count()); + if (range_owner == ng_id && range_shard_idx == core_id) + { + range_sizes.emplace(partition_id, entry->StoreRangeSize()); + } + } + return range_sizes; +} + void LocalCcShards::CleanTableRange(const TableName &table_name, NodeGroupId ng_id) { From f072cc81f141d025f2c5e09824aaa1cd42c294a4 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Fri, 27 Feb 2026 10:15:51 +0800 Subject: [PATCH 06/15] update --- tx_service/include/cc/template_cc_map.h | 36 ++++++++++++------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index b3c5f879..00325b36 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -173,24 +173,6 @@ class TemplateCcMap : public CcMap neg_inf_.ClearLocks(*shard_, cc_ng_id_); } - bool RangeSizesInited() const override - { - if constexpr (RangePartitioned) - { - return range_sizes_.size() > 0; - } - return true; - } - - void InitRangeSizes( - absl::flat_hash_map &&range_sizes) override - { - if constexpr (RangePartitioned) - { - range_sizes_ = std::move(range_sizes); - } - } - bool Execute(AcquireCc &req) override { TX_TRACE_ACTION_WITH_CONTEXT( @@ -11937,6 +11919,24 @@ class TemplateCcMap : public CcMap return &pos_inf_page_; } + bool RangeSizesInited() const override + { + if constexpr (RangePartitioned) + { + return range_sizes_.size() > 0; + } + return true; + } + + void InitRangeSizes( + absl::flat_hash_map &&range_sizes) override + { + if constexpr (RangePartitioned) + { + range_sizes_ = std::move(range_sizes); + } + } + absl::btree_map< KeyT, std::unique_ptr< From 24d92a9f5fbc4bd54d9e0487b3c0d7aedbf18f03 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Sat, 28 Feb 2026 17:00:59 +0800 Subject: [PATCH 07/15] update the definition of range sizes --- tx_service/include/cc/cc_map.h | 8 ++++++ tx_service/include/cc/template_cc_map.h | 34 +++++++++++++++++++------ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/tx_service/include/cc/cc_map.h b/tx_service/include/cc/cc_map.h index 2cf133ea..1507bf41 100644 --- a/tx_service/include/cc/cc_map.h +++ b/tx_service/include/cc/cc_map.h @@ -21,6 +21,7 @@ */ #pragma once +#include #include #include #include // std::pair @@ -312,6 +313,13 @@ class CcMap uint64_t last_dirty_commit_ts_{0}; protected: + // Range id -> (range_size, delta_range_size). Only used when + // RangePartitioned. + // - first: current range size; RangeSizeState::Loading (-1) = loading from + // store; RangeSizeState::Uninitialized (-2) = not yet loaded. + // - second: delta accumulated during load (first==-1) or split (first>=0). + absl::flat_hash_map> range_sizes_; + /** * @brief After the input request is executed at the current shard, moves * the request to another shard for execution. diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 00325b36..957fb8eb 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -626,10 +626,22 @@ class TemplateCcMap : public CcMap const uint32_t range_id = req.PartitionId(); auto it = range_sizes_.find(range_id); assert(it != range_sizes_.end()); - assert(key_delta_size >= 0 || - it->second >= static_cast(-key_delta_size)); - it->second = static_cast( - static_cast(it->second) + key_delta_size); + int32_t &first = it->second.first; + int32_t &second = it->second.second; + if (first >= 0) + { + assert(key_delta_size >= 0 || + static_cast(first) >= -key_delta_size); + first = static_cast( + static_cast(first) + key_delta_size); + assert(first >= 0); + } + else + { + // Loading (-1) or Uninitialized (-2): accumulate delta + second = static_cast( + static_cast(second) + key_delta_size); + } } if (req.IsInitialInsert()) @@ -11933,7 +11945,16 @@ class TemplateCcMap : public CcMap { if constexpr (RangePartitioned) { - range_sizes_ = std::move(range_sizes); + range_sizes_.clear(); + for (auto &[range_id, size] : range_sizes) + { + constexpr size_t kMaxInt32 = + static_cast(std::numeric_limits::max()); + int32_t clamped = (size > kMaxInt32) + ? std::numeric_limits::max() + : static_cast(size); + range_sizes_[range_id] = {clamped, 0}; + } } } @@ -11949,9 +11970,6 @@ class TemplateCcMap : public CcMap TemplateCcMapSamplePool *sample_pool_; size_t normal_obj_sz_{ 0}; // The count of all normal status objects, only used for redis - - // Range id -> total size; only used when RangePartitioned. - absl::flat_hash_map range_sizes_; }; template From 772a6b122ad20f63bb3a3dfaa607b1904e39135b Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Sat, 28 Feb 2026 17:15:20 +0800 Subject: [PATCH 08/15] modify the interface of update range size --- tx_service/include/cc/cc_request.h | 23 +++++++-- tx_service/include/cc/template_cc_map.h | 67 ++++++++++++++++++------- tx_service/include/type.h | 7 +++ 3 files changed, 75 insertions(+), 22 deletions(-) diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index e6f00492..4eeb433d 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -758,7 +758,8 @@ struct PostWriteCc : public TemplatedCcRequest const TxRecord *rec, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult *res) + CcHandlerResult *res, + bool on_dirty_range = false) { TemplatedCcRequest::Reset( nullptr, res, addr->NodeGroupId(), tx_number, tx_term); @@ -772,6 +773,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = false; ccm_ = nullptr; is_initial_insert_ = false; + on_dirty_range_ = on_dirty_range; } void Reset(const TxKey *key, @@ -785,7 +787,8 @@ struct PostWriteCc : public TemplatedCcRequest uint32_t key_shard_code, CcHandlerResult *res, bool initial_insertion = false, - int64_t ng_term = INIT_TERM) + int64_t ng_term = INIT_TERM, + bool on_dirty_range = false) { TemplatedCcRequest::Reset( &table_name, @@ -806,6 +809,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = false; ccm_ = nullptr; is_initial_insert_ = initial_insertion; + on_dirty_range_ = on_dirty_range; } void Reset(const CcEntryAddr *addr, @@ -815,7 +819,8 @@ struct PostWriteCc : public TemplatedCcRequest const std::string *rec, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult *res) + CcHandlerResult *res, + bool on_dirty_range = false) { TemplatedCcRequest::Reset( nullptr, res, addr->NodeGroupId(), tx_number, tx_term); @@ -829,6 +834,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = true; ccm_ = nullptr; is_initial_insert_ = false; + on_dirty_range_ = on_dirty_range; } void Reset(const TableName *table_name, @@ -842,7 +848,8 @@ struct PostWriteCc : public TemplatedCcRequest uint32_t key_shard_code, CcHandlerResult *res, bool initial_insertion = false, - int64_t ng_term = INIT_TERM) + int64_t ng_term = INIT_TERM, + bool on_dirty_range = false) { TemplatedCcRequest::Reset( table_name, @@ -863,6 +870,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = true; ccm_ = nullptr; is_initial_insert_ = initial_insertion; + on_dirty_range_ = on_dirty_range; } const CcEntryAddr *CceAddr() const @@ -917,6 +925,11 @@ struct PostWriteCc : public TemplatedCcRequest return is_initial_insert_; } + bool OnDirtyRange() const + { + return on_dirty_range_; + } + private: const CcEntryAddr *cce_addr_; uint64_t commit_ts_; @@ -934,6 +947,8 @@ struct PostWriteCc : public TemplatedCcRequest const void *key_; const std::string *key_str_; }; + // True if the range that the key belongs to is being splitting. + bool on_dirty_range_{false}; }; struct PostWriteAllCc diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 957fb8eb..cb3c8081 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -624,24 +624,10 @@ class TemplateCcMap : public CcMap : (static_cast(cce->PayloadSize()) - static_cast(old_payload_size)); const uint32_t range_id = req.PartitionId(); - auto it = range_sizes_.find(range_id); - assert(it != range_sizes_.end()); - int32_t &first = it->second.first; - int32_t &second = it->second.second; - if (first >= 0) - { - assert(key_delta_size >= 0 || - static_cast(first) >= -key_delta_size); - first = static_cast( - static_cast(first) + key_delta_size); - assert(first >= 0); - } - else - { - // Loading (-1) or Uninitialized (-2): accumulate delta - second = static_cast( - static_cast(second) + key_delta_size); - } + // is_dirty: true when range is splitting. + UpdateRangeSize(range_id, + static_cast(key_delta_size), + req.OnDirtyRange()); } if (req.IsInitialInsert()) @@ -11958,6 +11944,51 @@ class TemplateCcMap : public CcMap } } + void UpdateRangeSize(uint32_t partition_id, + int32_t delta_size, + bool is_dirty) + { + if constexpr (RangePartitioned) + { + auto it = range_sizes_.find(partition_id); + if (it == range_sizes_.end()) + { + it = range_sizes_ + .emplace(partition_id, + std::make_pair( + static_cast( + RangeSizeStatus::kNotInitialized), + 0)) + .first; + } + if (it->second.first == + static_cast(RangeSizeStatus::kNotInitialized)) + { + // Init the range size of this range. + it->second.first = + static_cast(RangeSizeStatus::kLoading); + + // TODO: fetch the range size. Implementation in + // range-size-tracking-phase3-persist-implementation-v1.md + return; + } + + if (it->second.first == + static_cast(RangeSizeStatus::kLoading) || + is_dirty) + { + // Loading or split: record delta in delta part (.second). + it->second.second += delta_size; + } + else + { + assert(delta_size >= 0 || + it->second.first >= static_cast(-delta_size)); + it->second.first += delta_size; + } + } // RangePartitioned + } + absl::btree_map< KeyT, std::unique_ptr< diff --git a/tx_service/include/type.h b/tx_service/include/type.h index 2fe288c5..566e4171 100644 --- a/tx_service/include/type.h +++ b/tx_service/include/type.h @@ -167,6 +167,13 @@ enum class TableEngine : uint8_t InternalHash = 5, // eg. Sequence table is a kind of internal hash table. }; +// Status values for range_sizes_.first (range size not yet known). +enum RangeSizeStatus : int32_t +{ + kNotInitialized = -2, // Range size not yet initialized; need to fetch. + kLoading = -1, // Range size is being loaded; delta goes to .second. +}; + inline std::string KvTablePrefixOf(TableEngine engine) { switch (engine) From 4360e6e57c48b48148d520fb1e0389bfb57d50b8 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Sat, 28 Feb 2026 18:19:01 +0800 Subject: [PATCH 09/15] modify logic for load range size --- store_handler/bigtable_handler.cpp | 7 +++ store_handler/bigtable_handler.h | 3 + store_handler/data_store_service_client.cpp | 24 ++++++++ store_handler/data_store_service_client.h | 8 +++ .../data_store_service_client_closure.cpp | 58 ++++++++++++++---- .../data_store_service_client_closure.h | 8 +++ store_handler/dynamo_handler.cpp | 6 ++ store_handler/dynamo_handler.h | 1 + store_handler/rocksdb_handler.cpp | 7 +++ store_handler/rocksdb_handler.h | 3 + tx_service/include/catalog_factory.h | 1 - tx_service/include/cc/cc_map.h | 20 ++----- tx_service/include/cc/cc_req_misc.h | 31 ++++++++++ tx_service/include/cc/cc_request.h | 55 ----------------- tx_service/include/cc/cc_shard.h | 14 ++--- tx_service/include/cc/local_cc_shards.h | 17 +----- tx_service/include/cc/template_cc_map.h | 34 ++--------- .../include/eloq_basic_catalog_factory.h | 2 - tx_service/include/range_record.h | 37 ++---------- tx_service/include/store/data_store_handler.h | 2 + tx_service/src/cc/cc_map.cpp | 25 ++++++++ tx_service/src/cc/cc_req_misc.cpp | 45 ++++++++++++++ tx_service/src/cc/cc_shard.cpp | 23 +++++-- tx_service/src/cc/local_cc_shards.cpp | 60 ++----------------- tx_service/src/eloq_basic_catalog_factory.cpp | 7 +-- .../tests/include/mock/mock_catalog_factory.h | 1 - 26 files changed, 263 insertions(+), 236 deletions(-) diff --git a/store_handler/bigtable_handler.cpp b/store_handler/bigtable_handler.cpp index 52a712de..172c321f 100644 --- a/store_handler/bigtable_handler.cpp +++ b/store_handler/bigtable_handler.cpp @@ -710,6 +710,13 @@ void EloqDS::BigTableHandler::FetchRangeSlices( fetch_cc)); } +void EloqDS::BigTableHandler::FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) +{ + LOG(ERROR) << "BigTableHandler::FetchTableRangeSize not implemented"; + assert(false); +} + void EloqDS::BigTableHandler::OnFetchRangeSlices( google::cloud::future>> f, diff --git a/store_handler/bigtable_handler.h b/store_handler/bigtable_handler.h index 10006bbe..e3ccd39c 100644 --- a/store_handler/bigtable_handler.h +++ b/store_handler/bigtable_handler.h @@ -82,6 +82,9 @@ class BigTableHandler : public txservice::store::DataStoreHandler void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override; + void FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) override; + /** * @brief Read a row from base table or skindex table in datastore with * specified key. Caller should pass in complete primary key or skindex key. diff --git a/store_handler/data_store_service_client.cpp b/store_handler/data_store_service_client.cpp index 75332faf..3d6d6d36 100644 --- a/store_handler/data_store_service_client.cpp +++ b/store_handler/data_store_service_client.cpp @@ -1038,6 +1038,30 @@ void DataStoreServiceClient::FetchRangeSlices( &FetchRangeSlicesCallback); } +void DataStoreServiceClient::FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) +{ + txservice::TableName range_table_name(fetch_cc->table_name_->StringView(), + txservice::TableType::RangePartition, + fetch_cc->table_name_->Engine()); + + int32_t kv_partition_id = + KvPartitionIdOfRangeSlices(range_table_name, fetch_cc->partition_id_); + uint32_t shard_id = GetShardIdByPartitionId(kv_partition_id, false); + + auto catalog_factory = GetCatalogFactory(range_table_name.Engine()); + assert(catalog_factory != nullptr); + fetch_cc->kv_start_key_ = + EncodeRangeKey(catalog_factory, range_table_name, fetch_cc->start_key_); + + Read(kv_range_table_name, + kv_partition_id, + shard_id, + fetch_cc->kv_start_key_, + fetch_cc, + &FetchRangeSizeCallback); +} + /** * @brief Deletes data that is out of the specified range. * diff --git a/store_handler/data_store_service_client.h b/store_handler/data_store_service_client.h index ad4bd52b..503d2512 100644 --- a/store_handler/data_store_service_client.h +++ b/store_handler/data_store_service_client.h @@ -273,6 +273,9 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override; + void FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) override; + bool DeleteOutOfRangeData( const txservice::TableName &table_name, int32_t partition_id, @@ -926,6 +929,11 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler ::google::protobuf::Closure *closure, DataStoreServiceClient &client, const remote::CommonResult &result); + + friend void FetchRangeSizeCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result); }; struct UpsertTableData diff --git a/store_handler/data_store_service_client_closure.cpp b/store_handler/data_store_service_client_closure.cpp index b4075600..b03911f9 100644 --- a/store_handler/data_store_service_client_closure.cpp +++ b/store_handler/data_store_service_client_closure.cpp @@ -803,8 +803,6 @@ void FetchTableRangesCallback(void *data, buf += sizeof(range_version); uint64_t slice_version = *(reinterpret_cast(buf)); buf += sizeof(slice_version); - buf += sizeof(uint32_t); // segment_cnt - uint32_t range_size = *(reinterpret_cast(buf)); std::string_view start_key_sv( key.data() + (table_name_sv.size() + KEY_SEPARATOR.size()), @@ -816,17 +814,14 @@ void FetchTableRangesCallback(void *data, { txservice::TxKey start_key = catalog_factory->CreateTxKey( start_key_sv.data(), start_key_sv.size()); - range_vec.emplace_back(std::move(start_key), - partition_id, - range_version, - range_size); + range_vec.emplace_back( + std::move(start_key), partition_id, range_version); } else { range_vec.emplace_back(catalog_factory->NegativeInfKey(), partition_id, - range_version, - range_size); + range_version); } } @@ -895,8 +890,7 @@ void FetchTableRangesCallback(void *data, catalog_factory->NegativeInfKey(), txservice::Sequences::InitialRangePartitionIdOf( fetch_range_cc->table_name_), - 1, - 0); + 1); } fetch_range_cc->AppendTableRanges( @@ -933,6 +927,45 @@ void FetchTableRangesCallback(void *data, } } +void FetchRangeSizeCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result) +{ + txservice::FetchTableRangeSizeCc *fetch_range_size_cc = + static_cast(data); + + if (result.error_code() == remote::DataStoreError::KEY_NOT_FOUND) + { + fetch_range_size_cc->store_range_size_ = 0; + fetch_range_size_cc->SetFinish( + static_cast(txservice::CcErrorCode::NO_ERROR)); + } + else if (result.error_code() != remote::DataStoreError::NO_ERROR) + { + LOG(ERROR) << "Fetch range size failed with error code: " + << result.error_code(); + fetch_range_size_cc->SetFinish( + static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); + } + else + { + ReadClosure *read_closure = static_cast(closure); + std::string_view read_val = read_closure->Value(); + assert(read_closure->TableName() == kv_range_table_name); + assert(read_val.size() == + (sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + + sizeof(uint32_t) + sizeof(int32_t))); + const char *buf = read_val.data(); + buf += read_val.size() - sizeof(int32_t); + fetch_range_size_cc->store_range_size_ = + *reinterpret_cast(buf); + + fetch_range_size_cc->SetFinish( + static_cast(txservice::CcErrorCode::NO_ERROR)); + } +} + void FetchRangeSlicesCallback(void *data, ::google::protobuf::Closure *closure, DataStoreServiceClient &client, @@ -973,8 +1006,9 @@ void FetchRangeSlicesCallback(void *data, else { assert(read_closure->TableName() == kv_range_table_name); - assert(read_val.size() == (sizeof(int32_t) + sizeof(uint64_t) + - sizeof(uint64_t) + sizeof(uint32_t))); + assert(read_val.size() == + (sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + + sizeof(uint32_t) + sizeof(int32_t))); const char *buf = read_val.data(); int32_t range_partition_id = *(reinterpret_cast(buf)); diff --git a/store_handler/data_store_service_client_closure.h b/store_handler/data_store_service_client_closure.h index 4bb72373..b8c3813c 100644 --- a/store_handler/data_store_service_client_closure.h +++ b/store_handler/data_store_service_client_closure.h @@ -3102,6 +3102,14 @@ void FetchTableRangesCallback(void *data, DataStoreServiceClient &client, const remote::CommonResult &result); +/** + * Callback for fetching range size from table_ranges. + */ +void FetchRangeSizeCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result); + /** * Callback for fetching range slices. * diff --git a/store_handler/dynamo_handler.cpp b/store_handler/dynamo_handler.cpp index 0aa7ef78..5bfa9029 100644 --- a/store_handler/dynamo_handler.cpp +++ b/store_handler/dynamo_handler.cpp @@ -2534,6 +2534,12 @@ void EloqDS::DynamoHandler::FetchRangeSlices(FetchRangeSlicesReq *fetch_cc) assert(false); } +void EloqDS::DynamoHandler::FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc) +{ + LOG(ERROR) << "DynamoHandler::FetchTableRangeSize not implemented"; + assert(false); +} + void EloqDS::DynamoHandler::OnFetchRangeSlices( const Aws::DynamoDB::DynamoDBClient *client, const Aws::DynamoDB::Model::GetItemRequest &request, diff --git a/store_handler/dynamo_handler.h b/store_handler/dynamo_handler.h index f2fc9ba5..704200e6 100644 --- a/store_handler/dynamo_handler.h +++ b/store_handler/dynamo_handler.h @@ -158,6 +158,7 @@ class DynamoHandler : public txservice::store::DataStoreHandler //-- range partition void FetchTableRanges(FetchTableRangesCc *fetch_cc) override; void FetchRangeSlices(FetchRangeSlicesReq *fetch_cc) override; + void FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc) override; bool DeleteOutOfRangeData( const txservice::TableName &table_name, diff --git a/store_handler/rocksdb_handler.cpp b/store_handler/rocksdb_handler.cpp index fad24c4b..4916a2d3 100644 --- a/store_handler/rocksdb_handler.cpp +++ b/store_handler/rocksdb_handler.cpp @@ -1110,6 +1110,13 @@ void RocksDBHandler::FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) assert(false); } +void RocksDBHandler::FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) +{ + LOG(ERROR) << "RocksDBHandler::FetchTableRangeSize not implemented"; + assert(false); +} + bool DeleteOutOfRangeDataInternal(std::string delete_from_partition_sql, int32_t partition_id, const txservice::TxKey *start_k) diff --git a/store_handler/rocksdb_handler.h b/store_handler/rocksdb_handler.h index c8717a49..8742b064 100644 --- a/store_handler/rocksdb_handler.h +++ b/store_handler/rocksdb_handler.h @@ -346,6 +346,9 @@ class RocksDBHandler : public txservice::store::DataStoreHandler void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override; + void FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) override; + bool DeleteOutOfRangeDataInternal(std::string delete_from_partition_sql, int32_t partition_id, const txservice::TxKey *start_k); diff --git a/tx_service/include/catalog_factory.h b/tx_service/include/catalog_factory.h index 80d42eb6..014e813f 100644 --- a/tx_service/include/catalog_factory.h +++ b/tx_service/include/catalog_factory.h @@ -424,7 +424,6 @@ class CatalogFactory TxKey start_key, uint64_t version_ts, int64_t partition_id, - uint32_t store_range_size = 0, std::unique_ptr slices = nullptr) = 0; virtual std::unique_ptr CreatePkCcmScanner( diff --git a/tx_service/include/cc/cc_map.h b/tx_service/include/cc/cc_map.h index 1507bf41..3a8ea675 100644 --- a/tx_service/include/cc/cc_map.h +++ b/tx_service/include/cc/cc_map.h @@ -263,21 +263,13 @@ class CcMap virtual const txservice::RecordSchema *RecordSchema() const = 0; /** - * Whether range_sizes_ has been loaded from store (for range-partitioned - * tables). Default: true (no lazy init needed for non-range-partitioned). + * Called by FetchTableRangeSizeCc::Execute when async load completes. + * Merges loaded size with accumulated delta (second), or resets to + * kNotInitialized on failure. */ - virtual bool RangeSizesInited() const - { - return true; - } - - /** - * Initialize range_sizes_ from store (e.g. - * TableRangeEntry::StoreRangeSize). No-op for non-range-partitioned CcMaps. - */ - virtual void InitRangeSizes(absl::flat_hash_map &&) - { - } + void InitRangeSize(uint32_t partition_id, + int32_t persisted_size, + bool succeed = true); uint64_t SchemaTs() const { diff --git a/tx_service/include/cc/cc_req_misc.h b/tx_service/include/cc/cc_req_misc.h index ed9ea8b9..f19e7a95 100644 --- a/tx_service/include/cc/cc_req_misc.h +++ b/tx_service/include/cc/cc_req_misc.h @@ -1148,4 +1148,35 @@ struct ShardCleanCc : public CcRequestBase private: size_t free_count_{0}; }; + +struct FetchTableRangeSizeCc : public CcRequestBase +{ +public: + FetchTableRangeSizeCc() = default; + ~FetchTableRangeSizeCc() = default; + + void Reset(const TableName &table_name, + int32_t partition_id, + const TxKey &start_key, + CcShard *ccs, + NodeGroupId ng_id, + int64_t ng_term); + + bool ValidTermCheck(); + bool Execute(CcShard &ccs) override; + void SetFinish(uint32_t error); + + const TableName *table_name_; + int32_t partition_id_{0}; + TxKey start_key_{}; + NodeGroupId node_group_id_{0}; + int64_t node_group_term_{-1}; + CcShard *ccs_{nullptr}; + + uint32_t error_code_{0}; + int32_t store_range_size_{0}; + + // Only used in DataStoreHandler + std::string kv_start_key_; +}; } // namespace txservice diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 4eeb433d..0bf15347 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -251,15 +251,6 @@ struct TemplatedCcRequest : public CcRequestBase } assert(ccm != nullptr); assert(ccs.core_id_ == ccm->shard_->core_id_); - if (!table_name_->IsMeta() && !ccm->RangeSizesInited()) - { - TableName range_table_name(table_name_->StringView(), - TableType::RangePartition, - table_name_->Engine()); - absl::flat_hash_map range_sizes = - ccs.GetStoreRangeSizes(range_table_name, node_group_id_); - ccm->InitRangeSizes(std::move(range_sizes)); - } return ccm->Execute(*typed_req); } else @@ -268,15 +259,6 @@ struct TemplatedCcRequest : public CcRequestBase // execution blocked by lock. assert(ccm_ != nullptr); assert(ccs.core_id_ == ccm_->shard_->core_id_); - if (!table_name_->IsMeta() && !ccm_->RangeSizesInited()) - { - TableName range_table_name(table_name_->StringView(), - TableType::RangePartition, - table_name_->Engine()); - absl::flat_hash_map range_sizes = - ccs.GetStoreRangeSizes(range_table_name, node_group_id_); - ccm_->InitRangeSizes(std::move(range_sizes)); - } return ccm_->Execute(*typed_req); } } @@ -2572,15 +2554,6 @@ struct ScanSliceCc ccm_ = ccm; } assert(ccm != nullptr); - if (!table_name_->IsMeta() && !ccm->RangeSizesInited()) - { - TableName range_table_name(table_name_->StringView(), - TableType::RangePartition, - table_name_->Engine()); - absl::flat_hash_map range_sizes = - ccs.GetStoreRangeSizes(range_table_name, node_group_id_); - ccm->InitRangeSizes(std::move(range_sizes)); - } return ccm->Execute(*this); } else @@ -2588,15 +2561,6 @@ struct ScanSliceCc // non parallel request which is executed again, e.g. initial // execution blocked by lock. assert(ccm_ != nullptr); - if (!table_name_->IsMeta() && !ccm_->RangeSizesInited()) - { - TableName range_table_name(table_name_->StringView(), - TableType::RangePartition, - table_name_->Engine()); - absl::flat_hash_map range_sizes = - ccs.GetStoreRangeSizes(range_table_name, node_group_id_); - ccm_->InitRangeSizes(std::move(range_sizes)); - } return ccm_->Execute(*this); } } @@ -5067,15 +5031,6 @@ struct ReplayLogCc : public TemplatedCcRequest table_schema_ = ccm_->GetTableSchema(); } } - if (!table_name_->IsMeta() && !ccm_->RangeSizesInited()) - { - TableName range_table_name(table_name_->StringView(), - TableType::RangePartition, - table_name_->Engine()); - absl::flat_hash_map range_sizes = - ccs.GetStoreRangeSizes(range_table_name, node_group_id_); - ccm_->InitRangeSizes(std::move(range_sizes)); - } return ccm_->Execute(*this); } @@ -7911,16 +7866,6 @@ struct UploadBatchCc : public CcRequestBase } assert(ccm != nullptr); - assert(!table_name_->IsMeta()); - if (!ccm->RangeSizesInited()) - { - TableName range_table_name(table_name_->StringView(), - TableType::RangePartition, - table_name_->Engine()); - absl::flat_hash_map range_sizes = - ccs.GetStoreRangeSizes(range_table_name, node_group_id_); - ccm->InitRangeSizes(std::move(range_sizes)); - } return ccm->Execute(*this); } diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index 7f8cbfc6..8bcf330a 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -313,15 +313,10 @@ class CcShard */ CcMap *GetCcm(const TableName &table_name, uint32_t node_group); - /** - * Returns partition_id -> store_range_size for ranges that belong to this - * node group and this shard (core). Used to lazy-init - * TemplateCcMap::range_sizes_. - * @param range_table_name Range table name. - * @param node_group Node group id. - */ - absl::flat_hash_map GetStoreRangeSizes( - const TableName &range_table_name, const NodeGroupId node_group); + void FetchTableRangeSize(const TableName &table_name, + int32_t partition_id, + NodeGroupId cc_ng_id, + int64_t cc_ng_term); void AdjustDataKeyStats(const TableName &table_name, int64_t size_delta, @@ -1223,6 +1218,7 @@ class CcShard CcRequestPool fill_store_slice_cc_pool_; CcRequestPool init_key_cache_cc_pool_; + CcRequestPool fetch_range_size_cc_pool_; // CcRequest queue on this shard/core. moodycamel::ConcurrentQueue cc_queue_; diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index 6ba31ded..961bee52 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -985,11 +985,7 @@ class LocalCcShards if (last_sync_ts > 0) { - StoreRange *store_range = new_range_ptr->RangeSlices(); - size_t range_size = - store_range != nullptr ? store_range->PostCkptSize() : 0; - new_range_ptr->UpdateLastDataSyncTS( - last_sync_ts, static_cast(range_size)); + new_range_ptr->UpdateLastDataSyncTS(last_sync_ts); } return new_range_ptr; @@ -1053,17 +1049,6 @@ class LocalCcShards std::map *GetTableRangesForATable( const TableName &range_table_name, const NodeGroupId ng_id); - /** - * Returns partition_id -> StoreRangeSize() for ranges that belong to ng_id - * and the given core_id (partition_id % Count() == core_id). - * Caller must hold shared lock on table range meta if needed; this method - * takes shared_lock on fast_meta_data_mux_. - */ - absl::flat_hash_map GetStoreRangeSizes( - const TableName &range_table_name, - const NodeGroupId ng_id, - const uint16_t core_id); - /** * @brief Upload new range info into range_info_ in TableRangeEntry * object. diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index cb3c8081..f51b0926 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -11917,33 +11917,6 @@ class TemplateCcMap : public CcMap return &pos_inf_page_; } - bool RangeSizesInited() const override - { - if constexpr (RangePartitioned) - { - return range_sizes_.size() > 0; - } - return true; - } - - void InitRangeSizes( - absl::flat_hash_map &&range_sizes) override - { - if constexpr (RangePartitioned) - { - range_sizes_.clear(); - for (auto &[range_id, size] : range_sizes) - { - constexpr size_t kMaxInt32 = - static_cast(std::numeric_limits::max()); - int32_t clamped = (size > kMaxInt32) - ? std::numeric_limits::max() - : static_cast(size); - range_sizes_[range_id] = {clamped, 0}; - } - } - } - void UpdateRangeSize(uint32_t partition_id, int32_t delta_size, bool is_dirty) @@ -11968,8 +11941,11 @@ class TemplateCcMap : public CcMap it->second.first = static_cast(RangeSizeStatus::kLoading); - // TODO: fetch the range size. Implementation in - // range-size-tracking-phase3-persist-implementation-v1.md + int64_t ng_term = Sharder::Instance().LeaderTerm(cc_ng_id_); + shard_->FetchTableRangeSize(table_name_, + static_cast(partition_id), + cc_ng_id_, + ng_term); return; } diff --git a/tx_service/include/eloq_basic_catalog_factory.h b/tx_service/include/eloq_basic_catalog_factory.h index 6b7764f2..9cee9ac5 100644 --- a/tx_service/include/eloq_basic_catalog_factory.h +++ b/tx_service/include/eloq_basic_catalog_factory.h @@ -239,7 +239,6 @@ class EloqHashCatalogFactory : public CatalogFactory TxKey start_key, uint64_t version_ts, int64_t partition_id, - uint32_t store_range_size = 0, std::unique_ptr slices = nullptr) override { // No range partitioning supported for hash catalog factory @@ -312,7 +311,6 @@ class EloqRangeCatalogFactory : public CatalogFactory TxKey start_key, uint64_t version_ts, int64_t partition_id, - uint32_t store_range_size = 0, std::unique_ptr slices = nullptr) override; TxKey NegativeInfKey() const override; diff --git a/tx_service/include/range_record.h b/tx_service/include/range_record.h index 1434313b..ca187c01 100644 --- a/tx_service/include/range_record.h +++ b/tx_service/include/range_record.h @@ -71,22 +71,10 @@ struct InitRangeEntry { } - InitRangeEntry(TxKey start_key, - int32_t partition_id, - uint64_t version_ts, - uint32_t size) - : key_(std::move(start_key)), - partition_id_(partition_id), - version_ts_(version_ts), - size_(size) - { - } - InitRangeEntry(InitRangeEntry &&rhs) noexcept : key_(std::move(rhs.key_)), partition_id_(rhs.partition_id_), - version_ts_(rhs.version_ts_), - size_(rhs.size_) + version_ts_(rhs.version_ts_) { } @@ -97,7 +85,6 @@ struct InitRangeEntry key_ = std::move(rhs.key_); partition_id_ = rhs.partition_id_; version_ts_ = rhs.version_ts_; - size_ = rhs.size_; } return *this; @@ -106,8 +93,6 @@ struct InitRangeEntry TxKey key_; int32_t partition_id_{0}; uint64_t version_ts_{0}; - // Store range size - uint32_t size_{0}; }; struct RangeInfo @@ -442,12 +427,8 @@ struct TableRangeEntry TableRangeEntry(const TableRangeEntry &) = delete; TableRangeEntry &operator=(const TableRangeEntry &) = delete; - TableRangeEntry(uint64_t version_ts, - int64_t partition_id, - uint32_t store_range_size) - : mux_(), - fetch_range_slices_req_(nullptr), - store_range_size_(store_range_size) + TableRangeEntry(uint64_t version_ts, int64_t partition_id) + : mux_(), fetch_range_slices_req_(nullptr) { } @@ -510,7 +491,7 @@ struct TableRangeEntry return last_sync_ts_; } - void UpdateLastDataSyncTS(uint64_t last_sync_ts, uint32_t range_size) + void UpdateLastDataSyncTS(uint64_t last_sync_ts) { std::unique_lock lk(mux_); @@ -519,12 +500,6 @@ struct TableRangeEntry // data sync succeeded, update last sync ts last_sync_ts_ = last_sync_ts; } - store_range_size_ = range_size; - } - - uint32_t StoreRangeSize() const - { - return store_range_size_; } void FetchRangeSlices(const TableName &range_tbl_name, @@ -546,7 +521,6 @@ struct TableRangeEntry WritePreferSharedMutex mux_; std::unique_ptr fetch_range_slices_req_{nullptr}; - uint32_t store_range_size_{0}; template friend class RangeCcMap; @@ -562,9 +536,8 @@ struct TemplateTableRangeEntry : public TableRangeEntry const KeyT *start_key, uint64_t version_ts, int64_t partition_id, - uint32_t store_range_size = 0, std::unique_ptr> slices = nullptr) - : TableRangeEntry(version_ts, partition_id, store_range_size), + : TableRangeEntry(version_ts, partition_id), range_info_(start_key, version_ts, partition_id), range_slices_(std::move(slices)) { diff --git a/tx_service/include/store/data_store_handler.h b/tx_service/include/store/data_store_handler.h index 298d0871..344c0fe1 100644 --- a/tx_service/include/store/data_store_handler.h +++ b/tx_service/include/store/data_store_handler.h @@ -134,6 +134,8 @@ class DataStoreHandler virtual void FetchRangeSlices(FetchRangeSlicesReq *fetch_cc) = 0; + virtual void FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc) = 0; + /** * @brief Read a row from base table or skindex table in datastore with * specified key. Caller should pass in complete primary key or skindex key. diff --git a/tx_service/src/cc/cc_map.cpp b/tx_service/src/cc/cc_map.cpp index 52443b45..dffe93af 100644 --- a/tx_service/src/cc/cc_map.cpp +++ b/tx_service/src/cc/cc_map.cpp @@ -27,6 +27,7 @@ #include "cc/local_cc_shards.h" #include "cc_entry.h" #include "tx_trace.h" +#include "type.h" namespace txservice { @@ -461,4 +462,28 @@ void CcMap::DecrReadIntent(NonBlockingLock *lock, } } +void CcMap::InitRangeSize(uint32_t partition_id, + int32_t persisted_size, + bool succeed) +{ + auto it = range_sizes_.find(partition_id); + if (it == range_sizes_.end()) + { + // Should not happen: UpdateRangeSize already inserted entry. + return; + } + if (succeed) + { + int32_t final_size = persisted_size + it->second.second; + it->second.first = final_size < 0 ? 0 : final_size; + it->second.second = 0; + } + else + { + // Load range size failed; reset to not-initialized for retry. + it->second.first = + static_cast(RangeSizeStatus::kNotInitialized); + } +} + } // namespace txservice diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index ead420ac..26f7bc57 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ b/tx_service/src/cc/cc_req_misc.cpp @@ -1531,4 +1531,49 @@ bool ShardCleanCc::Execute(CcShard &ccs) } } +void FetchTableRangeSizeCc::Reset(const TableName &table_name, + int32_t partition_id, + const TxKey &start_key, + CcShard *ccs, + NodeGroupId ng_id, + int64_t ng_term) +{ + table_name_ = &table_name; + partition_id_ = partition_id; + start_key_ = start_key.GetShallowCopy(); + node_group_id_ = ng_id; + node_group_term_ = ng_term; + ccs_ = ccs; + error_code_ = 0; + store_range_size_ = 0; +} + +bool FetchTableRangeSizeCc::ValidTermCheck() +{ + int64_t ng_leader_term = Sharder::Instance().LeaderTerm(node_group_id_); + return ng_leader_term == node_group_term_; +} + +bool FetchTableRangeSizeCc::Execute(CcShard &ccs) +{ + if (!ValidTermCheck()) + { + error_code_ = static_cast(CcErrorCode::NG_TERM_CHANGED); + } + + bool succ = (error_code_ == 0); + CcMap *ccm = ccs.GetCcm(*table_name_, node_group_id_); + assert(ccm != nullptr); + ccm->InitRangeSize( + static_cast(partition_id_), store_range_size_, succ); + + return true; +} + +void FetchTableRangeSizeCc::SetFinish(uint32_t error) +{ + error_code_ = error; + ccs_->Enqueue(this); +} + } // namespace txservice diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index da86a6cf..258e27e3 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -377,11 +377,24 @@ CcMap *CcShard::GetCcm(const TableName &table_name, uint32_t node_group) } } -absl::flat_hash_map CcShard::GetStoreRangeSizes( - const TableName &range_table_name, const NodeGroupId node_group) -{ - return local_shards_.GetStoreRangeSizes( - range_table_name, node_group, core_id_); +void CcShard::FetchTableRangeSize(const TableName &table_name, + int32_t partition_id, + NodeGroupId cc_ng_id, + int64_t cc_ng_term) +{ + FetchTableRangeSizeCc *fetch_cc = fetch_range_size_cc_pool_.NextRequest(); + + const TableName range_table_name(table_name.StringView(), + TableType::RangePartition, + table_name.Engine()); + const TableRangeEntry *range_entry = + GetTableRangeEntry(range_table_name, cc_ng_id, partition_id); + assert(range_entry != nullptr); + TxKey start_key = range_entry->GetRangeInfo()->StartTxKey(); + + fetch_cc->Reset( + table_name, partition_id, start_key, this, cc_ng_id, cc_ng_term); + local_shards_.store_hd_->FetchTableRangeSize(fetch_cc); } void CcShard::AdjustDataKeyStats(const TableName &table_name, diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index f750f9a5..2c1c9a76 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -1026,11 +1026,7 @@ void LocalCcShards::CreateSplitRangeRecoveryTx( } else { - StoreRange *store_range = range_entry->RangeSlices(); - assert(store_range); - size_t range_size = store_range->PostCkptSize(); - range_entry->UpdateLastDataSyncTS( - 0, static_cast(range_size)); + range_entry->UpdateLastDataSyncTS(0); range_entry->UnPinStoreRange(); txservice::CommitTx(txm); } @@ -1093,8 +1089,7 @@ void LocalCcShards::InitTableRanges(const TableName &range_table_name, GetCatalogFactory(range_table_name.Engine()) ->CreateTableRange(std::move(range_start_key), range_entry.version_ts_, - range_entry.partition_id_, - range_entry.size_); + range_entry.partition_id_); range_it = ranges .try_emplace(new_range->RangeStartTxKey(), std::move(new_range)) @@ -1263,31 +1258,6 @@ LocalCcShards::GetTableRangeIdsForATableInternal( return ng_it == table_it->second.end() ? nullptr : &ng_it->second; } -absl::flat_hash_map LocalCcShards::GetStoreRangeSizes( - const TableName &range_table_name, - const NodeGroupId ng_id, - const uint16_t core_id) -{ - absl::flat_hash_map range_sizes; - std::shared_lock lk(fast_meta_data_mux_); - std::unordered_map *range_ids = - GetTableRangeIdsForATableInternal(range_table_name, ng_id); - assert(range_ids != nullptr); - for (const auto &[partition_id, entry] : *range_ids) - { - BucketInfo *owner = - GetRangeOwnerInternal(static_cast(partition_id), ng_id); - NodeGroupId range_owner = owner->BucketOwner(); - uint16_t range_shard_idx = - static_cast(partition_id % Count()); - if (range_owner == ng_id && range_shard_idx == core_id) - { - range_sizes.emplace(partition_id, entry->StoreRangeSize()); - } - } - return range_sizes; -} - void LocalCcShards::CleanTableRange(const TableName &table_name, NodeGroupId ng_id) { @@ -3281,11 +3251,7 @@ void LocalCcShards::PostProcessFlushTaskEntries( { if (!task->during_split_range_) { - StoreRange *store_range = range_entry->RangeSlices(); - assert(store_range); - size_t range_size = store_range->PostCkptSize(); - range_entry->UpdateLastDataSyncTS( - task->data_sync_ts_, static_cast(range_size)); + range_entry->UpdateLastDataSyncTS(task->data_sync_ts_); range_entry->UnPinStoreRange(); // Commit the data sync txm txservice::CommitTx(entry->data_sync_txm_); @@ -3440,11 +3406,7 @@ void LocalCcShards::PostProcessRangePartitionDataSyncTask( { if (!task->during_split_range_) { - StoreRange *store_range = range_entry->RangeSlices(); - assert(store_range); - size_t range_size = store_range->PostCkptSize(); - range_entry->UpdateLastDataSyncTS( - task->data_sync_ts_, static_cast(range_size)); + range_entry->UpdateLastDataSyncTS(task->data_sync_ts_); range_entry->UnPinStoreRange(); // Commit the data sync txm txservice::CommitTx(data_sync_txm); @@ -3950,12 +3912,7 @@ void LocalCcShards::DataSyncForRangePartition( txservice::CommitTx(data_sync_txm); // Update the task status for this range. - StoreRange *store_range = range_entry->RangeSlices(); - assert(store_range); - size_t range_size = store_range->PostCkptSize(); - range_entry->UpdateLastDataSyncTS( - data_sync_task->data_sync_ts_, - static_cast(range_size)); + range_entry->UpdateLastDataSyncTS(data_sync_task->data_sync_ts_); // Generally, the StoreRange will be pinned only when there are data // items in the range that needs to be ckpted. if (scan_delta_size_cc.StoreRangePtr() != nullptr) @@ -5734,12 +5691,7 @@ void LocalCcShards::SplitFlushRange( return; } - StoreRange *store_range = range_entry->RangeSlices(); - assert(store_range); - size_t range_size = store_range->PostCkptSize(); - range_entry->UpdateLastDataSyncTS(data_sync_task->data_sync_ts_, - static_cast(range_size)); - + range_entry->UpdateLastDataSyncTS(data_sync_task->data_sync_ts_); range_entry->UnPinStoreRange(); data_sync_task->SetFinish(); diff --git a/tx_service/src/eloq_basic_catalog_factory.cpp b/tx_service/src/eloq_basic_catalog_factory.cpp index 9c844e5b..9983f44e 100644 --- a/tx_service/src/eloq_basic_catalog_factory.cpp +++ b/tx_service/src/eloq_basic_catalog_factory.cpp @@ -338,7 +338,6 @@ std::unique_ptr EloqRangeCatalogFactory::CreateTableRange( TxKey start_key, uint64_t version_ts, int64_t partition_id, - uint32_t store_range_size, std::unique_ptr slices) { assert(start_key.Type() == KeyType::NegativeInf || start_key.IsOwner()); @@ -355,11 +354,7 @@ std::unique_ptr EloqRangeCatalogFactory::CreateTableRange( range_ptr}; return std::make_unique>( - start, - version_ts, - partition_id, - store_range_size, - std::move(typed_range)); + start, version_ts, partition_id, std::move(typed_range)); } TxKey EloqRangeCatalogFactory::NegativeInfKey() const diff --git a/tx_service/tests/include/mock/mock_catalog_factory.h b/tx_service/tests/include/mock/mock_catalog_factory.h index 05669832..923b43cf 100644 --- a/tx_service/tests/include/mock/mock_catalog_factory.h +++ b/tx_service/tests/include/mock/mock_catalog_factory.h @@ -246,7 +246,6 @@ class MockCatalogFactory : public CatalogFactory TxKey start_key, uint64_t version_ts, int64_t partition_id, - uint32_t store_range_size = 0, std::unique_ptr slices = nullptr) override { assert(false); From 83e6a2ea5889ac6b13f4787827fc7036a09e788f Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Sat, 28 Feb 2026 18:32:31 +0800 Subject: [PATCH 10/15] update --- store_handler/data_store_service_client.cpp | 15 ++++++++------- store_handler/data_store_service_client.h | 6 +++--- .../data_store_service_client_closure.cpp | 2 +- tx_service/include/range_record.h | 3 +-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/store_handler/data_store_service_client.cpp b/store_handler/data_store_service_client.cpp index 3d6d6d36..4f468eeb 100644 --- a/store_handler/data_store_service_client.cpp +++ b/store_handler/data_store_service_client.cpp @@ -1278,18 +1278,19 @@ std::string DataStoreServiceClient::EncodeRangeKey( * @param range_version The version of the range. * @param version The general version number. * @param segment_cnt The number of segments in the range. + * @param range_size The size of the range. * @return Binary string containing the encoded range value. */ std::string DataStoreServiceClient::EncodeRangeValue(int32_t range_id, uint64_t range_version, uint64_t version, uint32_t segment_cnt, - uint32_t range_size) + int32_t range_size) { std::string kv_range_record; kv_range_record.reserve(sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint32_t) + - sizeof(uint32_t)); + sizeof(int32_t)); kv_range_record.append(reinterpret_cast(&range_id), sizeof(int32_t)); kv_range_record.append(reinterpret_cast(&range_version), @@ -1300,7 +1301,7 @@ std::string DataStoreServiceClient::EncodeRangeValue(int32_t range_id, kv_range_record.append(reinterpret_cast(&segment_cnt), sizeof(uint32_t)); kv_range_record.append(reinterpret_cast(&range_size), - sizeof(uint32_t)); + sizeof(int32_t)); return kv_range_record; } @@ -1417,7 +1418,7 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches( sizeof(uint32_t)); segment_record.append(slice_start_key.Data(), key_size); uint32_t slice_size = static_cast(slices[i]->Size()); - plan.range_size += slice_size; + plan.range_size += static_cast(slice_size); segment_record.append(reinterpret_cast(&slice_size), sizeof(uint32_t)); } @@ -1583,7 +1584,7 @@ void DataStoreServiceClient::EnqueueRangeMetadataRecord( uint64_t range_version, uint64_t version, uint32_t segment_cnt, - uint32_t range_size, + int32_t range_size, RangeMetadataAccumulator &accumulator) { // Compute kv_table_name and kv_partition_id @@ -1763,7 +1764,7 @@ bool DataStoreServiceClient::UpdateRangeSlices( req.range_slices_, req.partition_id_); uint32_t segment_cnt = slice_plan.segment_cnt; - uint32_t range_size = slice_plan.range_size; + int32_t range_size = slice_plan.range_size; int32_t kv_partition_id = KvPartitionIdOfRangeSlices(*req.table_name_, req.partition_id_); auto iter = slice_plans.find(kv_partition_id); @@ -2082,7 +2083,7 @@ bool DataStoreServiceClient::UpsertRanges( auto slice_plan = PrepareRangeSliceBatches( table_name, version, range.slices_, range.partition_id_); uint32_t segment_cnt = slice_plan.segment_cnt; - uint32_t range_size = slice_plan.range_size; + int32_t range_size = slice_plan.range_size; int32_t kv_partition_id = KvPartitionIdOfRangeSlices(table_name, range.partition_id_); diff --git a/store_handler/data_store_service_client.h b/store_handler/data_store_service_client.h index 503d2512..269d0666 100644 --- a/store_handler/data_store_service_client.h +++ b/store_handler/data_store_service_client.h @@ -66,7 +66,7 @@ struct RangeSliceBatchPlan std::vector segment_keys; // Owned string buffers std::vector segment_records; // Owned string buffers size_t version; - uint32_t range_size{0}; + int32_t range_size{0}; // Clear method for reuse void Clear() @@ -345,7 +345,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler uint64_t range_version, uint64_t version, uint32_t segment_cnt, - uint32_t range_size); + int32_t range_size); std::string EncodeRangeSliceKey(const txservice::TableName &table_name, int32_t range_id, uint32_t segment_id); @@ -648,7 +648,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler uint64_t range_version, uint64_t version, uint32_t segment_cnt, - uint32_t range_size, + int32_t range_size, RangeMetadataAccumulator &accumulator); void DispatchRangeMetadataBatches( diff --git a/store_handler/data_store_service_client_closure.cpp b/store_handler/data_store_service_client_closure.cpp index b03911f9..8f000004 100644 --- a/store_handler/data_store_service_client_closure.cpp +++ b/store_handler/data_store_service_client_closure.cpp @@ -795,7 +795,7 @@ void FetchTableRangesCallback(void *data, scan_next_closure->GetItem(i, key, value, ts, ttl); assert(value.size() == (sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + - sizeof(uint32_t) + sizeof(uint32_t))); + sizeof(uint32_t) + sizeof(int32_t))); const char *buf = value.data(); int32_t partition_id = *(reinterpret_cast(buf)); buf += sizeof(partition_id); diff --git a/tx_service/include/range_record.h b/tx_service/include/range_record.h index ca187c01..701b904d 100644 --- a/tx_service/include/range_record.h +++ b/tx_service/include/range_record.h @@ -537,8 +537,7 @@ struct TemplateTableRangeEntry : public TableRangeEntry uint64_t version_ts, int64_t partition_id, std::unique_ptr> slices = nullptr) - : TableRangeEntry(version_ts, partition_id), - range_info_(start_key, version_ts, partition_id), + : range_info_(start_key, version_ts, partition_id), range_slices_(std::move(slices)) { } From 05a1c3ac9e04ea30669e129dfb3edf67ddd8f185 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Mon, 2 Mar 2026 00:51:37 +0800 Subject: [PATCH 11/15] reset range size after range split --- tx_service/include/cc/cc_map.h | 5 ++- tx_service/include/cc/range_cc_map.h | 46 ++++++++++++++++++++++++++++ tx_service/src/cc/cc_map.cpp | 10 ++++-- 3 files changed, 57 insertions(+), 4 deletions(-) diff --git a/tx_service/include/cc/cc_map.h b/tx_service/include/cc/cc_map.h index 3a8ea675..b0822ae8 100644 --- a/tx_service/include/cc/cc_map.h +++ b/tx_service/include/cc/cc_map.h @@ -266,10 +266,13 @@ class CcMap * Called by FetchTableRangeSizeCc::Execute when async load completes. * Merges loaded size with accumulated delta (second), or resets to * kNotInitialized on failure. + * When emplace is true and partition_id is absent, inserts (partition_id, + * (0,0)) before merging; used for new ranges after split. */ void InitRangeSize(uint32_t partition_id, int32_t persisted_size, - bool succeed = true); + bool succeed = true, + bool emplace = false); uint64_t SchemaTs() const { diff --git a/tx_service/include/cc/range_cc_map.h b/tx_service/include/cc/range_cc_map.h index 5b39fd71..9291478e 100644 --- a/tx_service/include/cc/range_cc_map.h +++ b/tx_service/include/cc/range_cc_map.h @@ -709,9 +709,55 @@ class RangeCcMap : public TemplateCcMap // update previous cce's end key cce->SetCommitTsPayloadStatus(new_range_info->version_ts_, RecordStatus::Normal); + + // Reset new range size on the data table ccmap (emplace if + // absent). + int32_t new_range_id = new_range_info->PartitionId(); + NodeGroupId new_range_owner = + shard_->GetRangeOwner(new_range_id, this->cc_ng_id_) + ->BucketOwner(); + if (new_range_owner == this->cc_ng_id_ && + static_cast(new_range_id % shard_->core_cnt_) == + shard_->core_id_) + { + TableType data_table_type = + TableName::Type(this->table_name_.StringView()); + TableName data_table_name(this->table_name_.StringView(), + data_table_type, + this->table_name_.Engine()); + CcMap *ccm = + shard_->GetCcm(data_table_name, this->cc_ng_id_); + assert(ccm != nullptr); + size_t range_size = new_range_entries.at(idx) + ->TypedStoreRange() + ->PostCkptSize(); + ccm->InitRangeSize(static_cast(new_range_id), + static_cast(range_size), + true, + true); + } } // range_owner_rec_ needs to be reset on each core since they point // to bucket records on different cores. + // Reset old range size on the data table ccmap (no emplace). + int32_t old_partition_id = + upload_range_rec->GetRangeInfo()->PartitionId(); + if (range_owner == this->cc_ng_id_ && + static_cast(old_partition_id % shard_->core_cnt_) == + shard_->core_id_) + { + TableType data_table_type = + TableName::Type(this->table_name_.StringView()); + TableName data_table_name(this->table_name_.StringView(), + data_table_type, + this->table_name_.Engine()); + CcMap *ccm = shard_->GetCcm(data_table_name, this->cc_ng_id_); + assert(ccm != nullptr); + size_t old_range_size = + old_entry->TypedStoreRange()->PostCkptSize(); + ccm->InitRangeSize(static_cast(old_partition_id), + static_cast(old_range_size)); + } upload_range_rec->range_owner_rec_ = target_cce->payload_.cur_payload_->range_owner_rec_; diff --git a/tx_service/src/cc/cc_map.cpp b/tx_service/src/cc/cc_map.cpp index dffe93af..fc4cf4dc 100644 --- a/tx_service/src/cc/cc_map.cpp +++ b/tx_service/src/cc/cc_map.cpp @@ -464,13 +464,17 @@ void CcMap::DecrReadIntent(NonBlockingLock *lock, void CcMap::InitRangeSize(uint32_t partition_id, int32_t persisted_size, - bool succeed) + bool succeed, + bool emplace) { auto it = range_sizes_.find(partition_id); if (it == range_sizes_.end()) { - // Should not happen: UpdateRangeSize already inserted entry. - return; + if (!emplace) + { + return; + } + it = range_sizes_.emplace(partition_id, std::make_pair(0, 0)).first; } if (succeed) { From 76fa3d1297c5ccd41501253f23990acd6607101d Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Mon, 2 Mar 2026 01:17:26 +0800 Subject: [PATCH 12/15] set range size flags for postwrite request --- tx_service/include/cc/cc_handler.h | 3 +- tx_service/include/cc/cc_request.h | 28 +++++++++++-------- tx_service/include/cc/local_cc_handler.h | 3 +- tx_service/include/cc/template_cc_map.h | 25 +++++++++-------- tx_service/include/proto/cc_request.proto | 1 + tx_service/include/read_write_entry.h | 7 ++++- tx_service/include/remote/remote_cc_handler.h | 3 +- tx_service/src/cc/local_cc_handler.cpp | 9 ++++-- tx_service/src/remote/remote_cc_handler.cpp | 4 ++- tx_service/src/remote/remote_cc_request.cpp | 3 +- tx_service/src/tx_execution.cpp | 24 ++++++++-------- tx_service/src/tx_operation.cpp | 4 +++ 12 files changed, 72 insertions(+), 42 deletions(-) diff --git a/tx_service/include/cc/cc_handler.h b/tx_service/include/cc/cc_handler.h index 3d4640b8..ed494d60 100644 --- a/tx_service/include/cc/cc_handler.h +++ b/tx_service/include/cc/cc_handler.h @@ -166,7 +166,8 @@ class CcHandler const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) = 0; + CcHandlerResult &hres, + uint8_t range_size_flags = 0x10) = 0; /** * @briefPost-processes a read/scan key. Post-processing clears the read diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 0bf15347..d03bcfdc 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -741,7 +741,7 @@ struct PostWriteCc : public TemplatedCcRequest OperationType operation_type, uint32_t key_shard_code, CcHandlerResult *res, - bool on_dirty_range = false) + uint8_t range_size_flags = 0x10) { TemplatedCcRequest::Reset( nullptr, res, addr->NodeGroupId(), tx_number, tx_term); @@ -755,7 +755,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = false; ccm_ = nullptr; is_initial_insert_ = false; - on_dirty_range_ = on_dirty_range; + range_size_flags_ = range_size_flags; } void Reset(const TxKey *key, @@ -770,7 +770,7 @@ struct PostWriteCc : public TemplatedCcRequest CcHandlerResult *res, bool initial_insertion = false, int64_t ng_term = INIT_TERM, - bool on_dirty_range = false) + uint8_t range_size_flags = 0x10) { TemplatedCcRequest::Reset( &table_name, @@ -791,7 +791,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = false; ccm_ = nullptr; is_initial_insert_ = initial_insertion; - on_dirty_range_ = on_dirty_range; + range_size_flags_ = range_size_flags; } void Reset(const CcEntryAddr *addr, @@ -802,7 +802,7 @@ struct PostWriteCc : public TemplatedCcRequest OperationType operation_type, uint32_t key_shard_code, CcHandlerResult *res, - bool on_dirty_range = false) + uint8_t range_size_flags = 0x10) { TemplatedCcRequest::Reset( nullptr, res, addr->NodeGroupId(), tx_number, tx_term); @@ -816,7 +816,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = true; ccm_ = nullptr; is_initial_insert_ = false; - on_dirty_range_ = on_dirty_range; + range_size_flags_ = range_size_flags; } void Reset(const TableName *table_name, @@ -831,7 +831,7 @@ struct PostWriteCc : public TemplatedCcRequest CcHandlerResult *res, bool initial_insertion = false, int64_t ng_term = INIT_TERM, - bool on_dirty_range = false) + uint8_t range_size_flags = 0x10) { TemplatedCcRequest::Reset( table_name, @@ -852,7 +852,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = true; ccm_ = nullptr; is_initial_insert_ = initial_insertion; - on_dirty_range_ = on_dirty_range; + range_size_flags_ = range_size_flags; } const CcEntryAddr *CceAddr() const @@ -909,7 +909,12 @@ struct PostWriteCc : public TemplatedCcRequest bool OnDirtyRange() const { - return on_dirty_range_; + return (range_size_flags_ & 0x0F) != 0; + } + + bool NeedUpdateRangeSize() const + { + return (range_size_flags_ >> 4) != 0; } private: @@ -929,8 +934,9 @@ struct PostWriteCc : public TemplatedCcRequest const void *key_; const std::string *key_str_; }; - // True if the range that the key belongs to is being splitting. - bool on_dirty_range_{false}; + // High 4 bits: need update range size; low 4 bits: on dirty (splitting) + // range. + uint8_t range_size_flags_{0x10}; }; struct PostWriteAllCc diff --git a/tx_service/include/cc/local_cc_handler.h b/tx_service/include/cc/local_cc_handler.h index eae6ba46..7c2ff2fa 100644 --- a/tx_service/include/cc/local_cc_handler.h +++ b/tx_service/include/cc/local_cc_handler.h @@ -103,7 +103,8 @@ class LocalCcHandler : public CcHandler const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) override; + CcHandlerResult &hres, + uint8_t range_size_flags = 0x10) override; CcReqStatus PostRead( uint64_t tx_number, diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index f51b0926..0db500ed 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -617,17 +617,20 @@ class TemplateCcMap : public CcMap if constexpr (RangePartitioned) { - const int64_t key_delta_size = - (new_status == RecordStatus::Deleted) - ? (-static_cast(write_key->Size() + - old_payload_size)) - : (static_cast(cce->PayloadSize()) - - static_cast(old_payload_size)); - const uint32_t range_id = req.PartitionId(); - // is_dirty: true when range is splitting. - UpdateRangeSize(range_id, - static_cast(key_delta_size), - req.OnDirtyRange()); + if (req.NeedUpdateRangeSize()) + { + const int64_t key_delta_size = + (new_status == RecordStatus::Deleted) + ? (-static_cast(write_key->Size() + + old_payload_size)) + : (static_cast(cce->PayloadSize()) - + static_cast(old_payload_size)); + const uint32_t range_id = req.PartitionId(); + // is_dirty: true when range is splitting. + UpdateRangeSize(range_id, + static_cast(key_delta_size), + req.OnDirtyRange()); + } } if (req.IsInitialInsert()) diff --git a/tx_service/include/proto/cc_request.proto b/tx_service/include/proto/cc_request.proto index d02a679f..ae877163 100644 --- a/tx_service/include/proto/cc_request.proto +++ b/tx_service/include/proto/cc_request.proto @@ -918,6 +918,7 @@ message PostCommitRequest { bytes record = 5; uint32 operation_type = 6; uint32 key_shard_code = 7; + uint32 range_size_flags = 8; } message ForwardPostCommitRequest { diff --git a/tx_service/include/read_write_entry.h b/tx_service/include/read_write_entry.h index 4d86c34c..ffd53533 100644 --- a/tx_service/include/read_write_entry.h +++ b/tx_service/include/read_write_entry.h @@ -49,7 +49,8 @@ struct WriteSetEntry op_(other.op_), cce_addr_(other.cce_addr_), key_shard_code_(other.key_shard_code_), - forward_addr_(std::move(other.forward_addr_)) + forward_addr_(std::move(other.forward_addr_)), + range_size_flags_(other.range_size_flags_) { } @@ -60,6 +61,7 @@ struct WriteSetEntry cce_addr_ = other.cce_addr_; key_shard_code_ = other.key_shard_code_; forward_addr_ = std::move(other.forward_addr_); + range_size_flags_ = other.range_size_flags_; return *this; } @@ -70,6 +72,9 @@ struct WriteSetEntry uint32_t key_shard_code_{}; // Used in double write scenarios during online DDL. std::unordered_map forward_addr_; + // High 4 bits: need update range size; low 4 bits: on dirty (splitting) + // range. + uint8_t range_size_flags_{0x10}; }; /** diff --git a/tx_service/include/remote/remote_cc_handler.h b/tx_service/include/remote/remote_cc_handler.h index 83695f21..4ef859d4 100644 --- a/tx_service/include/remote/remote_cc_handler.h +++ b/tx_service/include/remote/remote_cc_handler.h @@ -84,7 +84,8 @@ class RemoteCcHandler const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres); + CcHandlerResult &hres, + uint8_t range_size_flags = 0x10); void PostWriteAll(uint32_t src_node_id, const TableName &table_name, diff --git a/tx_service/src/cc/local_cc_handler.cpp b/tx_service/src/cc/local_cc_handler.cpp index 9dd7962d..fc753167 100644 --- a/tx_service/src/cc/local_cc_handler.cpp +++ b/tx_service/src/cc/local_cc_handler.cpp @@ -274,7 +274,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostWrite( const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) + CcHandlerResult &hres, + uint8_t range_size_flags) { uint32_t ng_id = cce_addr.NodeGroupId(); uint32_t dest_node_id = Sharder::Instance().LeaderNodeId(ng_id); @@ -293,7 +294,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostWrite( record, operation_type, key_shard_code, - &hres); + &hres, + range_size_flags); TX_TRACE_ACTION(this, req); TX_TRACE_DUMP(req); cc_shards_.EnqueueCcRequest(thd_id_, cce_addr.CoreId(), req); @@ -312,7 +314,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostWrite( record, operation_type, key_shard_code, - hres); + hres, + range_size_flags); } return req_status; } diff --git a/tx_service/src/remote/remote_cc_handler.cpp b/tx_service/src/remote/remote_cc_handler.cpp index 848ae8f7..517fe421 100644 --- a/tx_service/src/remote/remote_cc_handler.cpp +++ b/tx_service/src/remote/remote_cc_handler.cpp @@ -159,7 +159,8 @@ void txservice::remote::RemoteCcHandler::PostWrite( const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) + CcHandlerResult &hres, + uint8_t range_size_flags) { CcMessage send_msg; @@ -194,6 +195,7 @@ void txservice::remote::RemoteCcHandler::PostWrite( post_commit->set_commit_ts(commit_ts); post_commit->set_operation_type(static_cast(operation_type)); post_commit->set_key_shard_code(key_shard_code); + post_commit->set_range_size_flags(range_size_flags); stream_sender_.SendMessageToNg(cce_addr.NodeGroupId(), send_msg, &hres); } diff --git a/tx_service/src/remote/remote_cc_request.cpp b/tx_service/src/remote/remote_cc_request.cpp index 32fbb935..cf9c7f5c 100644 --- a/tx_service/src/remote/remote_cc_request.cpp +++ b/tx_service/src/remote/remote_cc_request.cpp @@ -594,7 +594,8 @@ void txservice::remote::RemotePostWrite::Reset( rec_str, static_cast(post_commit.operation_type()), post_commit.key_shard_code(), - &cc_res_); + &cc_res_, + static_cast(post_commit.range_size_flags())); } else { diff --git a/tx_service/src/tx_execution.cpp b/tx_service/src/tx_execution.cpp index 24553f56..9d7b4183 100644 --- a/tx_service/src/tx_execution.cpp +++ b/tx_service/src/tx_execution.cpp @@ -5297,22 +5297,24 @@ void TransactionExecution::Process(PostProcessOp &post_process) write_entry.rec_.get(), write_entry.op_, write_entry.key_shard_code_, - post_process.hd_result_); + post_process.hd_result_, + write_entry.range_size_flags_); update_post_cnt(ret); for (auto &[forward_shard_code, cce_addr] : write_entry.forward_addr_) { - CcReqStatus ret = - cc_handler_->PostWrite(tx_number, - tx_term_, - command_id, - commit_ts_, - cce_addr, - write_entry.rec_.get(), - write_entry.op_, - forward_shard_code, - post_process.hd_result_); + CcReqStatus ret = cc_handler_->PostWrite( + tx_number, + tx_term_, + command_id, + commit_ts_, + cce_addr, + write_entry.rec_.get(), + write_entry.op_, + forward_shard_code, + post_process.hd_result_, + (0x10 | write_entry.range_size_flags_)); update_post_cnt(ret); } } diff --git a/tx_service/src/tx_operation.cpp b/tx_service/src/tx_operation.cpp index 663d50c1..25574868 100644 --- a/tx_service/src/tx_operation.cpp +++ b/tx_service/src/tx_operation.cpp @@ -739,6 +739,8 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) WriteSetEntry &write_entry = write_key_it_->second; size_t hash = write_tx_key.Hash(); write_entry.key_shard_code_ = (range_ng << 10) | (hash & 0x3FF); + write_entry.range_size_flags_ = + 0x10 | static_cast(range_info->IsDirty()); // If current range is migrating, forward to new range owner. if (new_bucket_ng != UINT32_MAX) { @@ -765,6 +767,8 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) { write_entry.forward_addr_.try_emplace((new_range_ng << 10) | (hash & 0x3FF)); + write_entry.range_size_flags_ = + 0x0F & write_entry.range_size_flags_; } // If the new range is migrating, forward to the new owner of // new range. From 46948183c2bb89482603a8a61135636e669f6b44 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Mon, 2 Mar 2026 17:34:12 +0800 Subject: [PATCH 13/15] update --- tx_service/include/cc/range_cc_map.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tx_service/include/cc/range_cc_map.h b/tx_service/include/cc/range_cc_map.h index 9291478e..41d60d81 100644 --- a/tx_service/include/cc/range_cc_map.h +++ b/tx_service/include/cc/range_cc_map.h @@ -737,8 +737,6 @@ class RangeCcMap : public TemplateCcMap true); } } - // range_owner_rec_ needs to be reset on each core since they point - // to bucket records on different cores. // Reset old range size on the data table ccmap (no emplace). int32_t old_partition_id = upload_range_rec->GetRangeInfo()->PartitionId(); @@ -758,6 +756,9 @@ class RangeCcMap : public TemplateCcMap ccm->InitRangeSize(static_cast(old_partition_id), static_cast(old_range_size)); } + + // range_owner_rec_ needs to be reset on each core since they point + // to bucket records on different cores. upload_range_rec->range_owner_rec_ = target_cce->payload_.cur_payload_->range_owner_rec_; From 2330266d3cccfaf278107855428a27b0927465c7 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Wed, 4 Mar 2026 14:15:10 +0800 Subject: [PATCH 14/15] update range size during create secondary index --- tx_service/include/cc/cc_request.h | 40 ++++--- tx_service/include/cc/object_cc_map.h | 6 +- tx_service/include/cc/template_cc_map.h | 80 +++++++++++--- tx_service/include/proto/cc_request.proto | 4 + tx_service/include/sk_generator.h | 29 +++--- tx_service/src/cc/local_cc_shards.cpp | 1 + tx_service/src/remote/cc_node_service.cpp | 20 +++- tx_service/src/sk_generator.cpp | 121 +++++++++++++--------- 8 files changed, 212 insertions(+), 89 deletions(-) diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index d03bcfdc..54610a36 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -7742,7 +7742,9 @@ struct CollectMemStatsCc : public CcRequestBase struct UploadBatchCc : public CcRequestBase { + // keys, records, commit_ts, rec_status, range_size_flags using WriteEntryTuple = std::tuple; @@ -7759,10 +7761,10 @@ struct UploadBatchCc : public CcRequestBase void Reset(const TableName &table_name, txservice::NodeGroupId ng_id, int64_t &ng_term, - size_t core_cnt, + int32_t partition_id, size_t batch_size, size_t start_key_idx, - const std::vector &entry_vec, + const std::vector> &entry_vec, bthread::Mutex &req_mux, bthread::ConditionVariable &req_cv, size_t &finished_req_cnt, @@ -7773,6 +7775,7 @@ struct UploadBatchCc : public CcRequestBase node_group_id_ = ng_id; node_group_term_ = &ng_term; is_remote_ = false; + partition_id_ = partition_id; batch_size_ = batch_size; start_key_idx_ = start_key_idx; entry_vector_ = &entry_vec; @@ -7780,16 +7783,17 @@ struct UploadBatchCc : public CcRequestBase req_cv_ = &req_cv; finished_req_cnt_ = &finished_req_cnt; req_result_ = &req_result; - unfinished_cnt_.store(core_cnt, std::memory_order_relaxed); + unfinished_cnt_.store(1, std::memory_order_relaxed); err_code_.store(CcErrorCode::NO_ERROR, std::memory_order_relaxed); paused_pos_.clear(); - paused_pos_.resize(core_cnt, {}); + paused_pos_.resize(1, {}); data_type_ = data_type; } void Reset(const TableName &table_name, txservice::NodeGroupId ng_id, int64_t &ng_term, + int32_t partition_id, size_t core_cnt, uint32_t batch_size, const WriteEntryTuple &entry_tuple, @@ -7802,6 +7806,7 @@ struct UploadBatchCc : public CcRequestBase node_group_id_ = ng_id; node_group_term_ = &ng_term; is_remote_ = true; + partition_id_ = partition_id; batch_size_ = batch_size; start_key_idx_ = 0; entry_tuples_ = &entry_tuple; @@ -7944,7 +7949,12 @@ struct UploadBatchCc : public CcRequestBase return batch_size_; } - const std::vector *EntryVector() const + int32_t PartitionId() const + { + return partition_id_; + } + + const std::vector> *EntryVector() const { return is_remote_ ? nullptr : entry_vector_; } @@ -7959,7 +7969,8 @@ struct UploadBatchCc : public CcRequestBase size_t key_off, size_t rec_off, size_t ts_off, - size_t status_off) + size_t status_off, + size_t flags_off) { auto &key_pos = paused_pos_.at(core_id); std::get<0>(key_pos) = key_index; @@ -7967,10 +7978,11 @@ struct UploadBatchCc : public CcRequestBase std::get<2>(key_pos) = rec_off; std::get<3>(key_pos) = ts_off; std::get<4>(key_pos) = status_off; + std::get<5>(key_pos) = flags_off; } - const std::tuple &GetPausedPosition( - uint16_t core_id) const + const std::tuple & + GetPausedPosition(uint16_t core_id) const { return paused_pos_.at(core_id); } @@ -7995,12 +8007,14 @@ struct UploadBatchCc : public CcRequestBase uint32_t node_group_id_{0}; int64_t *node_group_term_{nullptr}; bool is_remote_{false}; + // -1 means broadcast to all shards(used by hash partition) + int32_t partition_id_{-1}; uint32_t batch_size_{0}; size_t start_key_idx_{0}; union { - // for local request - const std::vector *entry_vector_; + // for local request: (range_size_flags, WriteEntry*) + const std::vector> *entry_vector_; // for remote request const WriteEntryTuple *entry_tuples_; }; @@ -8012,8 +8026,10 @@ struct UploadBatchCc : public CcRequestBase // This two variables may be accessed by multi-cores. std::atomic unfinished_cnt_{0}; std::atomic err_code_{CcErrorCode::NO_ERROR}; - // key index, key offset, record offset, ts offset, record status offset - std::vector> paused_pos_; + // key index, key offset, record offset, ts offset, record status offset, + // range_size_flags offset + std::vector> + paused_pos_; UploadBatchType data_type_{UploadBatchType::SkIndexData}; }; diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index 8f692e2c..0e1d6644 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1558,7 +1558,8 @@ class ObjectCcMap : public TemplateCcMap next_ts_offset = ts_offset; next_status_offset = status_offset; - auto [key_str, rec_str, ts_str, status_str] = *entry_tuples; + auto [key_str, rec_str, ts_str, status_str, flags_str] = + *entry_tuples; // deserialize key decoded_key.Deserialize( key_str.data(), next_key_offset, KeySchema()); @@ -1726,7 +1727,8 @@ class ObjectCcMap : public TemplateCcMap key_offset, rec_offset, ts_offset, - status_offset); + status_offset, + 0); shard_->Enqueue(shard_->LocalCoreId(), &req); return false; } diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 0db500ed..68eeb6d8 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -7638,6 +7638,7 @@ class TemplateCcMap : public CcMap auto entry_tuples = req.EntryTuple(); size_t batch_size = req.BatchSize(); size_t start_key_index = req.StartKeyIndex(); + const int32_t partition_id = req.PartitionId(); const TxRecord *req_rec = nullptr; @@ -7647,6 +7648,7 @@ class TemplateCcMap : public CcMap ValueT decoded_rec; uint64_t commit_ts = 0; RecordStatus rec_status = RecordStatus::Normal; + uint8_t range_size_flags = 0; auto &resume_pos = req.GetPausedPosition(shard_->core_id_); size_t key_pos = std::get<0>(resume_pos); @@ -7654,6 +7656,7 @@ class TemplateCcMap : public CcMap size_t rec_offset = std::get<2>(resume_pos); size_t ts_offset = std::get<3>(resume_pos); size_t status_offset = std::get<4>(resume_pos); + size_t flags_offset = std::get<5>(resume_pos); size_t hash = 0; Iterator it; @@ -7666,6 +7669,7 @@ class TemplateCcMap : public CcMap size_t next_rec_offset = 0; size_t next_ts_offset = 0; size_t next_status_offset = 0; + size_t next_flags_offset = 0; for (size_t cnt = 0; key_pos < batch_size && cnt < UploadBatchCc::UploadBatchBatchSize; ++key_pos, ++cnt) @@ -7674,13 +7678,16 @@ class TemplateCcMap : public CcMap next_rec_offset = rec_offset; next_ts_offset = ts_offset; next_status_offset = status_offset; + next_flags_offset = flags_offset; + if (entry_vec != nullptr) { key_idx = start_key_index + key_pos; - // get key - key = entry_vec->at(key_idx)->key_.GetKey(); - // get record - req_rec = entry_vec->at(key_idx)->rec_.get(); + const auto &pair = entry_vec->at(key_idx); + range_size_flags = pair.first; + const WriteEntry *we = pair.second; + key = we->key_.GetKey(); + req_rec = we->rec_.get(); if (req_rec) { rec_status = RecordStatus::Normal; @@ -7692,11 +7699,12 @@ class TemplateCcMap : public CcMap commit_val = nullptr; } // get commit ts - commit_ts = entry_vec->at(key_idx)->commit_ts_; + commit_ts = we->commit_ts_; } else { - auto [key_str, rec_str, ts_str, status_str] = *entry_tuples; + auto [key_str, rec_str, ts_str, status_str, flags_str] = + *entry_tuples; // deserialize key decoded_key.Deserialize( key_str.data(), next_key_offset, KeySchema()); @@ -7719,21 +7727,43 @@ class TemplateCcMap : public CcMap // deserialize commit ts commit_ts = *((uint64_t *) (ts_str.data() + next_ts_offset)); next_ts_offset += sizeof(uint64_t); + if (RangePartitioned) + { + range_size_flags = + static_cast(flags_str[next_flags_offset]); + next_flags_offset += sizeof(uint8_t); + } } - hash = key->Hash(); - size_t core_idx = (hash & 0x3FF) % shard_->core_cnt_; - if (!(core_idx == shard_->core_id_) || commit_ts <= 1) + if (commit_ts <= 1) { - // Skip the key that does not belong to this core or - // commit ts does not greater than 1. Move to next key. + // skip the key that commit ts does not greater than 1. key_offset = next_key_offset; rec_offset = next_rec_offset; ts_offset = next_ts_offset; status_offset = next_status_offset; + if constexpr (RangePartitioned) + { + flags_offset = next_flags_offset; + } continue; } + if constexpr (!RangePartitioned) + { + hash = key->Hash(); + size_t core_idx = (hash & 0x3FF) % shard_->core_cnt_; + if (core_idx != shard_->core_id_) + { + // skip the key that does not belong to this core. + key_offset = next_key_offset; + rec_offset = next_rec_offset; + ts_offset = next_ts_offset; + status_offset = next_status_offset; + continue; + } + } + it = FindEmplace(*key); cce = it->second; cc_page = it.GetPage(); @@ -7765,6 +7795,10 @@ class TemplateCcMap : public CcMap rec_offset = next_rec_offset; ts_offset = next_ts_offset; status_offset = next_status_offset; + if constexpr (RangePartitioned) + { + flags_offset = next_flags_offset; + } continue; } @@ -7800,6 +7834,23 @@ class TemplateCcMap : public CcMap } cce->SetCkptTs(commit_ts); } + + if constexpr (RangePartitioned) + { + if ((range_size_flags >> 4) != 0) + { + int32_t delta = + (rec_status == RecordStatus::Deleted) + ? -static_cast(write_key->Size()) + : static_cast( + write_key->Size() + + (commit_val ? commit_val->Size() : 0)); + UpdateRangeSize(static_cast(partition_id), + delta, + (range_size_flags & 0x0F) != 0); + } + } + OnCommittedUpdate(cce, was_dirty); OnFlushed(cce, was_dirty); DLOG_IF(INFO, TRACE_OCC_ERR) @@ -7826,6 +7877,10 @@ class TemplateCcMap : public CcMap rec_offset = next_rec_offset; ts_offset = next_ts_offset; status_offset = next_status_offset; + if constexpr (RangePartitioned) + { + flags_offset = next_flags_offset; + } } if (key_pos < batch_size) { @@ -7837,7 +7892,8 @@ class TemplateCcMap : public CcMap key_offset, rec_offset, ts_offset, - status_offset); + status_offset, + flags_offset); shard_->Enqueue(shard_->LocalCoreId(), &req); return false; } diff --git a/tx_service/include/proto/cc_request.proto b/tx_service/include/proto/cc_request.proto index ae877163..812bf830 100644 --- a/tx_service/include/proto/cc_request.proto +++ b/tx_service/include/proto/cc_request.proto @@ -176,6 +176,10 @@ message UploadBatchRequest bytes commit_ts = 9; bytes rec_status = 10; UploadBatchKind kind = 11; + // Target range partition; + int32 partition_id = 12; + // Per-key one byte: [uint8_t, ...] + bytes range_size_flags = 13; } message UploadBatchSlicesRequest diff --git a/tx_service/include/sk_generator.h b/tx_service/include/sk_generator.h index 050d6b27..b33941e8 100644 --- a/tx_service/include/sk_generator.h +++ b/tx_service/include/sk_generator.h @@ -40,8 +40,11 @@ class UploadIndexContext public: using TableIndexSet = std::unordered_map>; - using NGIndexSet = - std::unordered_map>; + // ng_id -> (range_id -> vector of (range_size_flags, WriteEntry*)) + using NGIndexSet = std::unordered_map< + NodeGroupId, + std::unordered_map>>>; private: enum struct UploadTaskStatus @@ -101,16 +104,18 @@ class UploadIndexContext CcErrorCode UploadEncodedIndex(UploadIndexTask &upload_task); CcErrorCode UploadIndexInternal( std::unordered_map &ng_index_set); - void SendIndexes(const TableName &table_name, - NodeGroupId dest_ng_id, - int64_t &ng_term, - const std::vector &write_entry_vec, - size_t batch_size, - size_t start_key_idx, - bthread::Mutex &req_mux, - bthread::ConditionVariable &req_cv, - size_t &finished_req_cnt, - CcErrorCode &res_code); + void SendIndexes( + const TableName &table_name, + NodeGroupId dest_ng_id, + int64_t &ng_term, + int32_t partition_id, + const std::vector> &write_entry_vec, + size_t batch_size, + size_t start_key_idx, + bthread::Mutex &req_mux, + bthread::ConditionVariable &req_cv, + size_t &finished_req_cnt, + CcErrorCode &res_code); // Acquire and release range read lock. CcErrorCode AcquireRangeReadLocks( TransactionExecution *acq_lock_txm, diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 2c1c9a76..daf203a6 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -4925,6 +4925,7 @@ void LocalCcShards::DataSyncForHashPartition( req_ptr = upload_batch_closure->UploadBatchRequest(); req_ptr->set_node_group_id(dest_ng); req_ptr->set_node_group_term(-1); + req_ptr->set_partition_id(-1); req_ptr->set_table_name_str(table_name.String()); req_ptr->set_table_type( remote::ToRemoteType::ConvertTableType( diff --git a/tx_service/src/remote/cc_node_service.cpp b/tx_service/src/remote/cc_node_service.cpp index 872ed273..9d2da758 100644 --- a/tx_service/src/remote/cc_node_service.cpp +++ b/tx_service/src/remote/cc_node_service.cpp @@ -1172,6 +1172,7 @@ void CcNodeService::UploadBatch( NodeGroupId ng_id = request->node_group_id(); int64_t ng_term = request->node_group_term(); + int32_t partition_id = request->partition_id(); std::string_view table_name_sv{request->table_name_str()}; TableType table_type = @@ -1199,14 +1200,15 @@ void CcNodeService::UploadBatch( << " for table:" << table_name.Trace(); LocalCcShards *cc_shards = Sharder::Instance().GetLocalCcShards(); - size_t core_cnt = cc_shards->Count(); + size_t core_cnt = (partition_id >= 0) ? 1 : cc_shards->Count(); uint32_t batch_size = request->batch_size(); auto write_entry_tuple = UploadBatchCc::WriteEntryTuple(request->keys(), request->records(), request->commit_ts(), - request->rec_status()); + request->rec_status(), + request->range_size_flags()); size_t finished_req = 0; bthread::Mutex req_mux; @@ -1217,6 +1219,7 @@ void CcNodeService::UploadBatch( req.Reset(table_name, ng_id, ng_term, + partition_id, core_cnt, batch_size, write_entry_tuple, @@ -1224,9 +1227,18 @@ void CcNodeService::UploadBatch( req_cv, finished_req, data_type); - for (size_t core = 0; core < core_cnt; ++core) + if (partition_id >= 0) { - cc_shards->EnqueueToCcShard(core, &req); + uint16_t dest_core = + static_cast(partition_id % cc_shards->Count()); + cc_shards->EnqueueToCcShard(dest_core, &req); + } + else + { + for (size_t core = 0; core < cc_shards->Count(); ++core) + { + cc_shards->EnqueueToCcShard(core, &req); + } } { diff --git a/tx_service/src/sk_generator.cpp b/tx_service/src/sk_generator.cpp index e3fc928e..9bf3b40f 100644 --- a/tx_service/src/sk_generator.cpp +++ b/tx_service/src/sk_generator.cpp @@ -680,37 +680,41 @@ CcErrorCode UploadIndexContext::UploadIndexInternal( size_t finished_upload_count = 0; CcErrorCode upload_res_code = CcErrorCode::NO_ERROR; size_t upload_req_count = 0; + for (auto &[table_name, ng_entries] : ng_index_set) { - for (auto &[ng_id, entry_vec] : ng_entries) + for (auto &[ng_id, range_entries] : ng_entries) { - entry_vec_size = entry_vec.size(); - batch_req_cnt = (entry_vec_size / upload_batch_size_ + - (entry_vec_size % upload_batch_size_ ? 1 : 0)); - int64_t &expected_term = leader_terms_.at(ng_id); - size_t start_idx = 0; - size_t end_idx = - (batch_req_cnt > 1 ? upload_batch_size_ : entry_vec_size); - for (size_t idx = 0; idx < batch_req_cnt; ++idx) + for (auto &[range_id, entry_vec] : range_entries) { - SendIndexes(table_name, - ng_id, - expected_term, - entry_vec, - (end_idx - start_idx), - start_idx, - req_mux, - req_cv, - finished_upload_count, - upload_res_code); - ++upload_req_count; - // Next batch - start_idx = end_idx; - end_idx = ((start_idx + upload_batch_size_) > entry_vec_size - ? entry_vec_size - : (start_idx + upload_batch_size_)); + entry_vec_size = entry_vec.size(); + batch_req_cnt = (entry_vec_size / upload_batch_size_ + + (entry_vec_size % upload_batch_size_ ? 1 : 0)); + + size_t start_idx = 0; + size_t end_idx = + (batch_req_cnt > 1 ? upload_batch_size_ : entry_vec_size); + for (size_t idx = 0; idx < batch_req_cnt; ++idx) + { + SendIndexes(table_name, + ng_id, + expected_term, + range_id, + entry_vec, + (end_idx - start_idx), + start_idx, + req_mux, + req_cv, + finished_upload_count, + upload_res_code); + ++upload_req_count; + start_idx = end_idx; + end_idx = ((start_idx + upload_batch_size_) > entry_vec_size + ? entry_vec_size + : (start_idx + upload_batch_size_)); + } } } } @@ -730,7 +734,8 @@ void UploadIndexContext::SendIndexes( const TableName &table_name, NodeGroupId dest_ng_id, int64_t &ng_term, - const std::vector &write_entry_vec, + int32_t partition_id, + const std::vector> &write_entry_vec, size_t batch_size, size_t start_key_idx, bthread::Mutex &req_mux, @@ -740,14 +745,13 @@ void UploadIndexContext::SendIndexes( { uint32_t dest_node_id = Sharder::Instance().LeaderNodeId(dest_ng_id); LocalCcShards *cc_shards = Sharder::Instance().GetLocalCcShards(); - size_t core_cnt = cc_shards->Count(); if (dest_node_id == cc_shards->NodeId()) { UploadBatchCc *req_ptr = NextRequest(); req_ptr->Reset(table_name, dest_ng_id, ng_term, - core_cnt, + partition_id, batch_size, start_key_idx, write_entry_vec, @@ -757,10 +761,9 @@ void UploadIndexContext::SendIndexes( res_code, UploadBatchType::SkIndexData); - for (size_t core = 0; core < core_cnt; ++core) - { - cc_shards->EnqueueToCcShard(core, req_ptr); - } + uint16_t dest_core = + static_cast(partition_id % cc_shards->Count()); + cc_shards->EnqueueToCcShard(dest_core, req_ptr); } else { @@ -834,6 +837,7 @@ void UploadIndexContext::SendIndexes( remote::ToRemoteType::ConvertTableType(table_name.Type())); req_ptr->set_table_engine( remote::ToRemoteType::ConvertTableEngine(table_name.Engine())); + req_ptr->set_partition_id(partition_id); size_t end_key_idx = start_key_idx + batch_size; req_ptr->set_kind(remote::UploadBatchKind::SK_DATA); req_ptr->set_batch_size(batch_size); @@ -853,15 +857,24 @@ void UploadIndexContext::SendIndexes( std::string *rec_status_str = req_ptr->mutable_rec_status(); // All generated sk should be normal status. const RecordStatus rec_status = RecordStatus::Normal; + // range_size_flags + req_ptr->clear_range_size_flags(); + std::string *range_size_flags_str = req_ptr->mutable_range_size_flags(); + for (size_t idx = start_key_idx; idx < end_key_idx; ++idx) { - write_entry_vec.at(idx)->key_.Serialize(*keys_str); - write_entry_vec.at(idx)->rec_->Serialize(*recs_str); - val_ptr = reinterpret_cast( - &(write_entry_vec.at(idx)->commit_ts_)); + uint8_t range_size_flags = write_entry_vec.at(idx).first; + WriteEntry *write_entry = write_entry_vec.at(idx).second; + write_entry->key_.Serialize(*keys_str); + write_entry->rec_->Serialize(*recs_str); + val_ptr = + reinterpret_cast(&(write_entry->commit_ts_)); commit_ts_str->append(val_ptr, len_sizeof); rec_status_str->append(reinterpret_cast(&rec_status), sizeof(rec_status)); + range_size_flags_str->append( + reinterpret_cast(&range_size_flags), + sizeof(range_size_flags)); } brpc::Controller *cntl_ptr = upload_batch_closure->Controller(); @@ -989,17 +1002,24 @@ void UploadIndexContext::AdvanceWriteEntryForRangeInfo( size_t new_range_idx = 0; auto *range_info = range_record.GetRangeInfo(); + const int32_t range_id = range_info->PartitionId(); + const uint8_t default_flags = + 0x10 | static_cast(range_info->IsDirty()); while (cur_write_entry_it != next_range_start) { WriteEntry &write_entry = *cur_write_entry_it; - auto ng_it = ng_write_entrys.try_emplace(range_ng); - ng_it.first->second.push_back(&write_entry); + auto &range_vec = ng_write_entrys[range_ng][range_id]; + range_vec.emplace_back(default_flags, &write_entry); + uint8_t *old_range_flags_ptr = &range_vec.back().first; + + uint8_t *new_bucket_flags_ptr = nullptr; // If current range is migrating, forward to new range owner. if (new_bucket_ng != UINT32_MAX) { - ng_write_entrys.try_emplace(new_bucket_ng) - .first->second.push_back(&write_entry); + auto &new_bucket_vec = ng_write_entrys[new_bucket_ng][range_id]; + new_bucket_vec.emplace_back(default_flags, &write_entry); + new_bucket_flags_ptr = &new_bucket_vec.back().first; } // If range is splitting and the key will fall on a new range after @@ -1016,18 +1036,25 @@ void UploadIndexContext::AdvanceWriteEntryForRangeInfo( } if (new_range_ng != UINT32_MAX) { - if (new_range_ng != range_ng) - { - ng_write_entrys.try_emplace(new_range_ng) - .first->second.push_back(&write_entry); - } + const int32_t new_range_id = + range_info->NewPartitionId()->at(new_range_idx - 1); + + ng_write_entrys[new_range_ng][new_range_id].emplace_back( + default_flags, &write_entry); + // Only update range size on the new range + *old_range_flags_ptr &= 0x0F; + // If the new range is migrating, forward to the new owner of new // range. if (new_range_new_bucket_ng != UINT32_MAX && new_range_new_bucket_ng != range_ng) { - ng_write_entrys.try_emplace(new_range_new_bucket_ng) - .first->second.push_back(&write_entry); + ng_write_entrys[new_range_new_bucket_ng][new_range_id] + .emplace_back(default_flags, &write_entry); + if (new_bucket_flags_ptr) + { + *new_bucket_flags_ptr &= 0x0F; + } } } From 6a2d770f4cc736bd642879acd76aa4b47f079629 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Wed, 4 Mar 2026 14:46:21 +0800 Subject: [PATCH 15/15] update delta size of ccentry --- tx_service/include/cc/template_cc_map.h | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 68eeb6d8..68dc7de7 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -623,8 +623,13 @@ class TemplateCcMap : public CcMap (new_status == RecordStatus::Deleted) ? (-static_cast(write_key->Size() + old_payload_size)) - : (static_cast(cce->PayloadSize()) - - static_cast(old_payload_size)); + : (cce_old_status != RecordStatus::Normal + ? static_cast( + write_key->Size() + + cce->PayloadSize()) + : static_cast( + cce->PayloadSize() - + old_payload_size)); const uint32_t range_id = req.PartitionId(); // is_dirty: true when range is splitting. UpdateRangeSize(range_id, @@ -7802,6 +7807,7 @@ class TemplateCcMap : public CcMap continue; } + [[maybe_unused]] const size_t old_payload_size = cce->PayloadSize(); // Now, all versions of non-unique SecondaryIndex key shared // the unpack info in current version's payload, though the // unpack info will not be used for deleted key, we must not @@ -7821,6 +7827,8 @@ class TemplateCcMap : public CcMap } bool was_dirty = cce->IsDirty(); + [[maybe_unused]] const RecordStatus cce_old_status = + cce->PayloadStatus(); cce->SetCommitTsPayloadStatus(commit_ts, rec_status); if (req.Kind() == UploadBatchType::DirtyBucketData) { @@ -7841,10 +7849,13 @@ class TemplateCcMap : public CcMap { int32_t delta = (rec_status == RecordStatus::Deleted) - ? -static_cast(write_key->Size()) - : static_cast( - write_key->Size() + - (commit_val ? commit_val->Size() : 0)); + ? -(static_cast(write_key->Size() + + old_payload_size)) + : (cce_old_status != RecordStatus::Normal + ? static_cast(write_key->Size() + + cce->PayloadSize()) + : static_cast(cce->PayloadSize() - + old_payload_size)); UpdateRangeSize(static_cast(partition_id), delta, (range_size_flags & 0x0F) != 0);