diff --git a/store_handler/bigtable_handler.cpp b/store_handler/bigtable_handler.cpp index 52a712de..172c321f 100644 --- a/store_handler/bigtable_handler.cpp +++ b/store_handler/bigtable_handler.cpp @@ -710,6 +710,13 @@ void EloqDS::BigTableHandler::FetchRangeSlices( fetch_cc)); } +void EloqDS::BigTableHandler::FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) +{ + LOG(ERROR) << "BigTableHandler::FetchTableRangeSize not implemented"; + assert(false); +} + void EloqDS::BigTableHandler::OnFetchRangeSlices( google::cloud::future>> f, diff --git a/store_handler/bigtable_handler.h b/store_handler/bigtable_handler.h index 10006bbe..e3ccd39c 100644 --- a/store_handler/bigtable_handler.h +++ b/store_handler/bigtable_handler.h @@ -82,6 +82,9 @@ class BigTableHandler : public txservice::store::DataStoreHandler void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override; + void FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) override; + /** * @brief Read a row from base table or skindex table in datastore with * specified key. Caller should pass in complete primary key or skindex key. 
diff --git a/store_handler/data_store_service_client.cpp b/store_handler/data_store_service_client.cpp index 0609c563..4f468eeb 100644 --- a/store_handler/data_store_service_client.cpp +++ b/store_handler/data_store_service_client.cpp @@ -1038,6 +1038,30 @@ void DataStoreServiceClient::FetchRangeSlices( &FetchRangeSlicesCallback); } +void DataStoreServiceClient::FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) +{ + txservice::TableName range_table_name(fetch_cc->table_name_->StringView(), + txservice::TableType::RangePartition, + fetch_cc->table_name_->Engine()); + + int32_t kv_partition_id = + KvPartitionIdOfRangeSlices(range_table_name, fetch_cc->partition_id_); + uint32_t shard_id = GetShardIdByPartitionId(kv_partition_id, false); + + auto catalog_factory = GetCatalogFactory(range_table_name.Engine()); + assert(catalog_factory != nullptr); + fetch_cc->kv_start_key_ = + EncodeRangeKey(catalog_factory, range_table_name, fetch_cc->start_key_); + + Read(kv_range_table_name, + kv_partition_id, + shard_id, + fetch_cc->kv_start_key_, + fetch_cc, + &FetchRangeSizeCallback); +} + /** * @brief Deletes data that is out of the specified range. * @@ -1254,16 +1278,19 @@ std::string DataStoreServiceClient::EncodeRangeKey( * @param range_version The version of the range. * @param version The general version number. * @param segment_cnt The number of segments in the range. + * @param range_size The size of the range. * @return Binary string containing the encoded range value. 
*/ std::string DataStoreServiceClient::EncodeRangeValue(int32_t range_id, uint64_t range_version, uint64_t version, - uint32_t segment_cnt) + uint32_t segment_cnt, + int32_t range_size) { std::string kv_range_record; kv_range_record.reserve(sizeof(int32_t) + sizeof(uint64_t) + - sizeof(uint64_t) + sizeof(uint32_t)); + sizeof(uint64_t) + sizeof(uint32_t) + + sizeof(int32_t)); kv_range_record.append(reinterpret_cast<const char *>(&range_id), sizeof(int32_t)); kv_range_record.append(reinterpret_cast<const char *>(&range_version), @@ -1273,6 +1300,8 @@ std::string DataStoreServiceClient::EncodeRangeValue(int32_t range_id, // segment_cnt of slices kv_range_record.append(reinterpret_cast<const char *>(&segment_cnt), sizeof(uint32_t)); + kv_range_record.append(reinterpret_cast<const char *>(&range_size), + sizeof(int32_t)); return kv_range_record; } @@ -1340,6 +1369,7 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches( RangeSliceBatchPlan plan; plan.segment_cnt = 0; plan.version = version; + plan.range_size = 0; // Estimate capacity based on slices size plan.segment_keys.reserve(slices.size() / 10 + 1); // Rough estimate @@ -1388,6 +1418,7 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches( sizeof(uint32_t)); segment_record.append(slice_start_key.Data(), key_size); uint32_t slice_size = static_cast<uint32_t>(slices[i]->Size()); + plan.range_size += static_cast<int32_t>(slice_size); segment_record.append(reinterpret_cast<const char *>(&slice_size), sizeof(uint32_t)); } @@ -1553,6 +1584,7 @@ void DataStoreServiceClient::EnqueueRangeMetadataRecord( uint64_t range_version, uint64_t version, uint32_t segment_cnt, + int32_t range_size, RangeMetadataAccumulator &accumulator) { // Compute kv_table_name and kv_partition_id @@ -1563,8 +1595,8 @@ void DataStoreServiceClient::EnqueueRangeMetadataRecord( // Encode key and value std::string key_str = EncodeRangeKey(catalog_factory, table_name, range_start_key); - std::string rec_str = - EncodeRangeValue(partition_id, range_version, version, segment_cnt); + std::string rec_str =
EncodeRangeValue( + partition_id, range_version, version, segment_cnt, range_size); // Get or create entry in accumulator auto key = std::make_pair(kv_table_name, kv_partition_id); @@ -1732,6 +1764,7 @@ bool DataStoreServiceClient::UpdateRangeSlices( req.range_slices_, req.partition_id_); uint32_t segment_cnt = slice_plan.segment_cnt; + int32_t range_size = slice_plan.range_size; int32_t kv_partition_id = KvPartitionIdOfRangeSlices(*req.table_name_, req.partition_id_); auto iter = slice_plans.find(kv_partition_id); @@ -1756,6 +1789,7 @@ bool DataStoreServiceClient::UpdateRangeSlices( req.range_version_, req.ckpt_ts_, segment_cnt, + range_size, meta_acc); } @@ -1957,6 +1991,7 @@ bool DataStoreServiceClient::UpdateRangeSlices( range_version, version, segment_cnt, + slice_plans[0].range_size, meta_acc); SyncConcurrentRequest *meta_sync_concurrent = @@ -2048,6 +2083,7 @@ bool DataStoreServiceClient::UpsertRanges( auto slice_plan = PrepareRangeSliceBatches( table_name, version, range.slices_, range.partition_id_); uint32_t segment_cnt = slice_plan.segment_cnt; + int32_t range_size = slice_plan.range_size; int32_t kv_partition_id = KvPartitionIdOfRangeSlices(table_name, range.partition_id_); @@ -2071,6 +2107,7 @@ bool DataStoreServiceClient::UpsertRanges( version, // range_version (using version for now) version, segment_cnt, + range_size, meta_acc); } @@ -4651,7 +4688,8 @@ bool DataStoreServiceClient::InitTableRanges( std::string key_str = EncodeRangeKey(catalog_factory, table_name, *neg_inf_key); - std::string rec_str = EncodeRangeValue(init_range_id, version, version, 0); + std::string rec_str = + EncodeRangeValue(init_range_id, version, version, 0, 0); keys.emplace_back(std::string_view(key_str.data(), key_str.size())); records.emplace_back(std::string_view(rec_str.data(), rec_str.size())); diff --git a/store_handler/data_store_service_client.h b/store_handler/data_store_service_client.h index 66f6618b..269d0666 100644 --- a/store_handler/data_store_service_client.h 
+++ b/store_handler/data_store_service_client.h @@ -66,6 +66,7 @@ struct RangeSliceBatchPlan std::vector segment_keys; // Owned string buffers std::vector segment_records; // Owned string buffers size_t version; + int32_t range_size{0}; // Clear method for reuse void Clear() @@ -74,6 +75,7 @@ struct RangeSliceBatchPlan segment_keys.clear(); segment_records.clear(); version = 0; + range_size = 0; } }; @@ -271,6 +273,9 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override; + void FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) override; + bool DeleteOutOfRangeData( const txservice::TableName &table_name, int32_t partition_id, @@ -339,7 +344,8 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler std::string EncodeRangeValue(int32_t range_id, uint64_t range_version, uint64_t version, - uint32_t segment_cnt); + uint32_t segment_cnt, + int32_t range_size); std::string EncodeRangeSliceKey(const txservice::TableName &table_name, int32_t range_id, uint32_t segment_id); @@ -642,6 +648,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler uint64_t range_version, uint64_t version, uint32_t segment_cnt, + int32_t range_size, RangeMetadataAccumulator &accumulator); void DispatchRangeMetadataBatches( @@ -922,6 +929,11 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler ::google::protobuf::Closure *closure, DataStoreServiceClient &client, const remote::CommonResult &result); + + friend void FetchRangeSizeCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result); }; struct UpsertTableData diff --git a/store_handler/data_store_service_client_closure.cpp b/store_handler/data_store_service_client_closure.cpp index fbf7d34b..8f000004 100644 --- a/store_handler/data_store_service_client_closure.cpp +++ 
b/store_handler/data_store_service_client_closure.cpp @@ -793,8 +793,9 @@ void FetchTableRangesCallback(void *data, for (uint32_t i = 0; i < items_size; i++) { scan_next_closure->GetItem(i, key, value, ts, ttl); - assert(value.size() == (sizeof(int32_t) + sizeof(uint64_t) + - sizeof(uint64_t) + sizeof(uint32_t))); + assert(value.size() == + (sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + + sizeof(uint32_t) + sizeof(int32_t))); const char *buf = value.data(); int32_t partition_id = *(reinterpret_cast<const int32_t *>(buf)); buf += sizeof(partition_id); @@ -926,6 +927,45 @@ void FetchTableRangesCallback(void *data, } } +void FetchRangeSizeCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result) +{ + txservice::FetchTableRangeSizeCc *fetch_range_size_cc = + static_cast<txservice::FetchTableRangeSizeCc *>(data); + + if (result.error_code() == remote::DataStoreError::KEY_NOT_FOUND) + { + fetch_range_size_cc->store_range_size_ = 0; + fetch_range_size_cc->SetFinish( + static_cast<uint32_t>(txservice::CcErrorCode::NO_ERROR)); + } + else if (result.error_code() != remote::DataStoreError::NO_ERROR) + { + LOG(ERROR) << "Fetch range size failed with error code: " + << result.error_code(); + fetch_range_size_cc->SetFinish( + static_cast<uint32_t>(txservice::CcErrorCode::DATA_STORE_ERR)); + } + else + { + ReadClosure *read_closure = static_cast<ReadClosure *>(closure); + std::string_view read_val = read_closure->Value(); + assert(read_closure->TableName() == kv_range_table_name); + assert(read_val.size() == + (sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + + sizeof(uint32_t) + sizeof(int32_t))); + const char *buf = read_val.data(); + buf += read_val.size() - sizeof(int32_t); + fetch_range_size_cc->store_range_size_ = + *reinterpret_cast<const int32_t *>(buf); + + fetch_range_size_cc->SetFinish( + static_cast<uint32_t>(txservice::CcErrorCode::NO_ERROR)); + } +} + void FetchRangeSlicesCallback(void *data, ::google::protobuf::Closure *closure, DataStoreServiceClient &client, const remote::CommonResult &result) @@ -966,8 +1006,9 @@ void
FetchRangeSlicesCallback(void *data, else { assert(read_closure->TableName() == kv_range_table_name); - assert(read_val.size() == (sizeof(int32_t) + sizeof(uint64_t) + - sizeof(uint64_t) + sizeof(uint32_t))); + assert(read_val.size() == + (sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + + sizeof(uint32_t) + sizeof(int32_t))); const char *buf = read_val.data(); int32_t range_partition_id = *(reinterpret_cast(buf)); diff --git a/store_handler/data_store_service_client_closure.h b/store_handler/data_store_service_client_closure.h index 4bb72373..b8c3813c 100644 --- a/store_handler/data_store_service_client_closure.h +++ b/store_handler/data_store_service_client_closure.h @@ -3102,6 +3102,14 @@ void FetchTableRangesCallback(void *data, DataStoreServiceClient &client, const remote::CommonResult &result); +/** + * Callback for fetching range size from table_ranges. + */ +void FetchRangeSizeCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result); + /** * Callback for fetching range slices. 
* diff --git a/store_handler/dynamo_handler.cpp b/store_handler/dynamo_handler.cpp index 0aa7ef78..5bfa9029 100644 --- a/store_handler/dynamo_handler.cpp +++ b/store_handler/dynamo_handler.cpp @@ -2534,6 +2534,12 @@ void EloqDS::DynamoHandler::FetchRangeSlices(FetchRangeSlicesReq *fetch_cc) assert(false); } +void EloqDS::DynamoHandler::FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc) +{ + LOG(ERROR) << "DynamoHandler::FetchTableRangeSize not implemented"; + assert(false); +} + void EloqDS::DynamoHandler::OnFetchRangeSlices( const Aws::DynamoDB::DynamoDBClient *client, const Aws::DynamoDB::Model::GetItemRequest &request, diff --git a/store_handler/dynamo_handler.h b/store_handler/dynamo_handler.h index f2fc9ba5..704200e6 100644 --- a/store_handler/dynamo_handler.h +++ b/store_handler/dynamo_handler.h @@ -158,6 +158,7 @@ class DynamoHandler : public txservice::store::DataStoreHandler //-- range partition void FetchTableRanges(FetchTableRangesCc *fetch_cc) override; void FetchRangeSlices(FetchRangeSlicesReq *fetch_cc) override; + void FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc) override; bool DeleteOutOfRangeData( const txservice::TableName &table_name, diff --git a/store_handler/rocksdb_handler.cpp b/store_handler/rocksdb_handler.cpp index fad24c4b..4916a2d3 100644 --- a/store_handler/rocksdb_handler.cpp +++ b/store_handler/rocksdb_handler.cpp @@ -1110,6 +1110,13 @@ void RocksDBHandler::FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) assert(false); } +void RocksDBHandler::FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) +{ + LOG(ERROR) << "RocksDBHandler::FetchTableRangeSize not implemented"; + assert(false); +} + bool DeleteOutOfRangeDataInternal(std::string delete_from_partition_sql, int32_t partition_id, const txservice::TxKey *start_k) diff --git a/store_handler/rocksdb_handler.h b/store_handler/rocksdb_handler.h index c8717a49..8742b064 100644 --- a/store_handler/rocksdb_handler.h +++ b/store_handler/rocksdb_handler.h @@ 
-346,6 +346,9 @@ class RocksDBHandler : public txservice::store::DataStoreHandler void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override; + void FetchTableRangeSize( + txservice::FetchTableRangeSizeCc *fetch_cc) override; + bool DeleteOutOfRangeDataInternal(std::string delete_from_partition_sql, int32_t partition_id, const txservice::TxKey *start_k); diff --git a/tx_service/include/cc/cc_handler.h b/tx_service/include/cc/cc_handler.h index 3d4640b8..ed494d60 100644 --- a/tx_service/include/cc/cc_handler.h +++ b/tx_service/include/cc/cc_handler.h @@ -166,7 +166,8 @@ class CcHandler const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) = 0; + CcHandlerResult &hres, + uint8_t range_size_flags = 0x10) = 0; /** * @briefPost-processes a read/scan key. Post-processing clears the read diff --git a/tx_service/include/cc/cc_map.h b/tx_service/include/cc/cc_map.h index 0d1434b6..b0822ae8 100644 --- a/tx_service/include/cc/cc_map.h +++ b/tx_service/include/cc/cc_map.h @@ -21,10 +21,12 @@ */ #pragma once +#include #include #include #include // std::pair +#include "absl/container/flat_hash_map.h" #include "cc/cc_req_base.h" #include "cc_protocol.h" #include "error_messages.h" // CcErrorCode @@ -260,6 +262,18 @@ class CcMap virtual const txservice::KeySchema *KeySchema() const = 0; virtual const txservice::RecordSchema *RecordSchema() const = 0; + /** + * Called by FetchTableRangeSizeCc::Execute when async load completes. + * Merges loaded size with accumulated delta (second), or resets to + * kNotInitialized on failure. + * When emplace is true and partition_id is absent, inserts (partition_id, + * (0,0)) before merging; used for new ranges after split. 
+ */ + void InitRangeSize(uint32_t partition_id, + int32_t persisted_size, + bool succeed = true, + bool emplace = false); + uint64_t SchemaTs() const { return schema_ts_; @@ -294,6 +308,13 @@ class CcMap uint64_t last_dirty_commit_ts_{0}; protected: + // Range id -> (range_size, delta_range_size). Only used when + // RangePartitioned. + // - first: current range size; RangeSizeState::Loading (-1) = loading from + // store; RangeSizeState::Uninitialized (-2) = not yet loaded. + // - second: delta accumulated during load (first==-1) or split (first>=0). + absl::flat_hash_map> range_sizes_; + /** * @brief After the input request is executed at the current shard, moves * the request to another shard for execution. diff --git a/tx_service/include/cc/cc_req_misc.h b/tx_service/include/cc/cc_req_misc.h index ed9ea8b9..f19e7a95 100644 --- a/tx_service/include/cc/cc_req_misc.h +++ b/tx_service/include/cc/cc_req_misc.h @@ -1148,4 +1148,35 @@ struct ShardCleanCc : public CcRequestBase private: size_t free_count_{0}; }; + +struct FetchTableRangeSizeCc : public CcRequestBase +{ +public: + FetchTableRangeSizeCc() = default; + ~FetchTableRangeSizeCc() = default; + + void Reset(const TableName &table_name, + int32_t partition_id, + const TxKey &start_key, + CcShard *ccs, + NodeGroupId ng_id, + int64_t ng_term); + + bool ValidTermCheck(); + bool Execute(CcShard &ccs) override; + void SetFinish(uint32_t error); + + const TableName *table_name_; + int32_t partition_id_{0}; + TxKey start_key_{}; + NodeGroupId node_group_id_{0}; + int64_t node_group_term_{-1}; + CcShard *ccs_{nullptr}; + + uint32_t error_code_{0}; + int32_t store_range_size_{0}; + + // Only used in DataStoreHandler + std::string kv_start_key_; +}; } // namespace txservice diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 97e93fae..54610a36 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -740,7 +740,8 @@ struct PostWriteCc : public 
TemplatedCcRequest const TxRecord *rec, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult *res) + CcHandlerResult *res, + uint8_t range_size_flags = 0x10) { TemplatedCcRequest::Reset( nullptr, res, addr->NodeGroupId(), tx_number, tx_term); @@ -754,6 +755,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = false; ccm_ = nullptr; is_initial_insert_ = false; + range_size_flags_ = range_size_flags; } void Reset(const TxKey *key, @@ -767,7 +769,8 @@ struct PostWriteCc : public TemplatedCcRequest uint32_t key_shard_code, CcHandlerResult *res, bool initial_insertion = false, - int64_t ng_term = INIT_TERM) + int64_t ng_term = INIT_TERM, + uint8_t range_size_flags = 0x10) { TemplatedCcRequest::Reset( &table_name, @@ -788,6 +791,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = false; ccm_ = nullptr; is_initial_insert_ = initial_insertion; + range_size_flags_ = range_size_flags; } void Reset(const CcEntryAddr *addr, @@ -797,7 +801,8 @@ struct PostWriteCc : public TemplatedCcRequest const std::string *rec, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult *res) + CcHandlerResult *res, + uint8_t range_size_flags = 0x10) { TemplatedCcRequest::Reset( nullptr, res, addr->NodeGroupId(), tx_number, tx_term); @@ -811,6 +816,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = true; ccm_ = nullptr; is_initial_insert_ = false; + range_size_flags_ = range_size_flags; } void Reset(const TableName *table_name, @@ -824,7 +830,8 @@ struct PostWriteCc : public TemplatedCcRequest uint32_t key_shard_code, CcHandlerResult *res, bool initial_insertion = false, - int64_t ng_term = INIT_TERM) + int64_t ng_term = INIT_TERM, + uint8_t range_size_flags = 0x10) { TemplatedCcRequest::Reset( table_name, @@ -845,6 +852,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = true; ccm_ = nullptr; is_initial_insert_ = initial_insertion; + range_size_flags_ = range_size_flags; } const CcEntryAddr *CceAddr() const 
@@ -877,6 +885,13 @@ struct PostWriteCc : public TemplatedCcRequest return key_shard_code_; } + // Low 10 bits of key_shard_code_: range partition id (when + // range-partitioned). + uint32_t PartitionId() const + { + return key_shard_code_ & 0x3FF; + } + const void *Key() const { return is_remote_ ? nullptr : key_; @@ -892,6 +907,16 @@ struct PostWriteCc : public TemplatedCcRequest return is_initial_insert_; } + bool OnDirtyRange() const + { + return (range_size_flags_ & 0x0F) != 0; + } + + bool NeedUpdateRangeSize() const + { + return (range_size_flags_ >> 4) != 0; + } + private: const CcEntryAddr *cce_addr_; uint64_t commit_ts_; @@ -909,6 +934,9 @@ struct PostWriteCc : public TemplatedCcRequest const void *key_; const std::string *key_str_; }; + // High 4 bits: need update range size; low 4 bits: on dirty (splitting) + // range. + uint8_t range_size_flags_{0x10}; }; struct PostWriteAllCc @@ -7714,7 +7742,9 @@ struct CollectMemStatsCc : public CcRequestBase struct UploadBatchCc : public CcRequestBase { + // keys, records, commit_ts, rec_status, range_size_flags using WriteEntryTuple = std::tuple; @@ -7731,10 +7761,10 @@ struct UploadBatchCc : public CcRequestBase void Reset(const TableName &table_name, txservice::NodeGroupId ng_id, int64_t &ng_term, - size_t core_cnt, + int32_t partition_id, size_t batch_size, size_t start_key_idx, - const std::vector &entry_vec, + const std::vector> &entry_vec, bthread::Mutex &req_mux, bthread::ConditionVariable &req_cv, size_t &finished_req_cnt, @@ -7745,6 +7775,7 @@ struct UploadBatchCc : public CcRequestBase node_group_id_ = ng_id; node_group_term_ = &ng_term; is_remote_ = false; + partition_id_ = partition_id; batch_size_ = batch_size; start_key_idx_ = start_key_idx; entry_vector_ = &entry_vec; @@ -7752,16 +7783,17 @@ struct UploadBatchCc : public CcRequestBase req_cv_ = &req_cv; finished_req_cnt_ = &finished_req_cnt; req_result_ = &req_result; - unfinished_cnt_.store(core_cnt, std::memory_order_relaxed); + 
unfinished_cnt_.store(1, std::memory_order_relaxed); err_code_.store(CcErrorCode::NO_ERROR, std::memory_order_relaxed); paused_pos_.clear(); - paused_pos_.resize(core_cnt, {}); + paused_pos_.resize(1, {}); data_type_ = data_type; } void Reset(const TableName &table_name, txservice::NodeGroupId ng_id, int64_t &ng_term, + int32_t partition_id, size_t core_cnt, uint32_t batch_size, const WriteEntryTuple &entry_tuple, @@ -7774,6 +7806,7 @@ struct UploadBatchCc : public CcRequestBase node_group_id_ = ng_id; node_group_term_ = &ng_term; is_remote_ = true; + partition_id_ = partition_id; batch_size_ = batch_size; start_key_idx_ = 0; entry_tuples_ = &entry_tuple; @@ -7916,7 +7949,12 @@ struct UploadBatchCc : public CcRequestBase return batch_size_; } - const std::vector *EntryVector() const + int32_t PartitionId() const + { + return partition_id_; + } + + const std::vector> *EntryVector() const { return is_remote_ ? nullptr : entry_vector_; } @@ -7931,7 +7969,8 @@ struct UploadBatchCc : public CcRequestBase size_t key_off, size_t rec_off, size_t ts_off, - size_t status_off) + size_t status_off, + size_t flags_off) { auto &key_pos = paused_pos_.at(core_id); std::get<0>(key_pos) = key_index; @@ -7939,10 +7978,11 @@ struct UploadBatchCc : public CcRequestBase std::get<2>(key_pos) = rec_off; std::get<3>(key_pos) = ts_off; std::get<4>(key_pos) = status_off; + std::get<5>(key_pos) = flags_off; } - const std::tuple &GetPausedPosition( - uint16_t core_id) const + const std::tuple & + GetPausedPosition(uint16_t core_id) const { return paused_pos_.at(core_id); } @@ -7967,12 +8007,14 @@ struct UploadBatchCc : public CcRequestBase uint32_t node_group_id_{0}; int64_t *node_group_term_{nullptr}; bool is_remote_{false}; + // -1 means broadcast to all shards(used by hash partition) + int32_t partition_id_{-1}; uint32_t batch_size_{0}; size_t start_key_idx_{0}; union { - // for local request - const std::vector *entry_vector_; + // for local request: (range_size_flags, WriteEntry*) + const 
std::vector> *entry_vector_; // for remote request const WriteEntryTuple *entry_tuples_; }; @@ -7984,8 +8026,10 @@ struct UploadBatchCc : public CcRequestBase // This two variables may be accessed by multi-cores. std::atomic unfinished_cnt_{0}; std::atomic err_code_{CcErrorCode::NO_ERROR}; - // key index, key offset, record offset, ts offset, record status offset - std::vector> paused_pos_; + // key index, key offset, record offset, ts offset, record status offset, + // range_size_flags offset + std::vector> + paused_pos_; UploadBatchType data_type_{UploadBatchType::SkIndexData}; }; diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ee977e68..8bcf330a 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -313,6 +313,11 @@ class CcShard */ CcMap *GetCcm(const TableName &table_name, uint32_t node_group); + void FetchTableRangeSize(const TableName &table_name, + int32_t partition_id, + NodeGroupId cc_ng_id, + int64_t cc_ng_term); + void AdjustDataKeyStats(const TableName &table_name, int64_t size_delta, int64_t dirty_delta); @@ -1213,6 +1218,7 @@ class CcShard CcRequestPool fill_store_slice_cc_pool_; CcRequestPool init_key_cache_cc_pool_; + CcRequestPool fetch_range_size_cc_pool_; // CcRequest queue on this shard/core. 
moodycamel::ConcurrentQueue cc_queue_; diff --git a/tx_service/include/cc/local_cc_handler.h b/tx_service/include/cc/local_cc_handler.h index eae6ba46..7c2ff2fa 100644 --- a/tx_service/include/cc/local_cc_handler.h +++ b/tx_service/include/cc/local_cc_handler.h @@ -103,7 +103,8 @@ class LocalCcHandler : public CcHandler const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) override; + CcHandlerResult &hres, + uint8_t range_size_flags = 0x10) override; CcReqStatus PostRead( uint64_t tx_number, diff --git a/tx_service/include/cc/object_cc_map.h b/tx_service/include/cc/object_cc_map.h index 8f692e2c..0e1d6644 100644 --- a/tx_service/include/cc/object_cc_map.h +++ b/tx_service/include/cc/object_cc_map.h @@ -1558,7 +1558,8 @@ class ObjectCcMap : public TemplateCcMap next_ts_offset = ts_offset; next_status_offset = status_offset; - auto [key_str, rec_str, ts_str, status_str] = *entry_tuples; + auto [key_str, rec_str, ts_str, status_str, flags_str] = + *entry_tuples; // deserialize key decoded_key.Deserialize( key_str.data(), next_key_offset, KeySchema()); @@ -1726,7 +1727,8 @@ class ObjectCcMap : public TemplateCcMap key_offset, rec_offset, ts_offset, - status_offset); + status_offset, + 0); shard_->Enqueue(shard_->LocalCoreId(), &req); return false; } diff --git a/tx_service/include/cc/range_cc_map.h b/tx_service/include/cc/range_cc_map.h index 5b39fd71..41d60d81 100644 --- a/tx_service/include/cc/range_cc_map.h +++ b/tx_service/include/cc/range_cc_map.h @@ -709,7 +709,54 @@ class RangeCcMap : public TemplateCcMap // update previous cce's end key cce->SetCommitTsPayloadStatus(new_range_info->version_ts_, RecordStatus::Normal); + + // Reset new range size on the data table ccmap (emplace if + // absent). 
+ int32_t new_range_id = new_range_info->PartitionId(); + NodeGroupId new_range_owner = + shard_->GetRangeOwner(new_range_id, this->cc_ng_id_) + ->BucketOwner(); + if (new_range_owner == this->cc_ng_id_ && + static_cast(new_range_id % shard_->core_cnt_) == + shard_->core_id_) + { + TableType data_table_type = + TableName::Type(this->table_name_.StringView()); + TableName data_table_name(this->table_name_.StringView(), + data_table_type, + this->table_name_.Engine()); + CcMap *ccm = + shard_->GetCcm(data_table_name, this->cc_ng_id_); + assert(ccm != nullptr); + size_t range_size = new_range_entries.at(idx) + ->TypedStoreRange() + ->PostCkptSize(); + ccm->InitRangeSize(static_cast(new_range_id), + static_cast(range_size), + true, + true); + } } + // Reset old range size on the data table ccmap (no emplace). + int32_t old_partition_id = + upload_range_rec->GetRangeInfo()->PartitionId(); + if (range_owner == this->cc_ng_id_ && + static_cast(old_partition_id % shard_->core_cnt_) == + shard_->core_id_) + { + TableType data_table_type = + TableName::Type(this->table_name_.StringView()); + TableName data_table_name(this->table_name_.StringView(), + data_table_type, + this->table_name_.Engine()); + CcMap *ccm = shard_->GetCcm(data_table_name, this->cc_ng_id_); + assert(ccm != nullptr); + size_t old_range_size = + old_entry->TypedStoreRange()->PostCkptSize(); + ccm->InitRangeSize(static_cast(old_partition_id), + static_cast(old_range_size)); + } + // range_owner_rec_ needs to be reset on each core since they point // to bucket records on different cores. 
upload_range_rec->range_owner_rec_ = diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index f2a00630..68dc7de7 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -38,6 +38,7 @@ #include #include +#include "absl/container/flat_hash_map.h" #include "cc_entry.h" #include "cc_map.h" #include "cc_page_clean_guard.h" @@ -591,6 +592,8 @@ class TemplateCcMap : public CcMap cce->ArchiveBeforeUpdate(); } + [[maybe_unused]] const size_t old_payload_size = + cce->PayloadSize(); if (is_del) { cce->payload_.SetCurrentPayload(nullptr); @@ -612,6 +615,29 @@ class TemplateCcMap : public CcMap bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(commit_ts, new_status); + if constexpr (RangePartitioned) + { + if (req.NeedUpdateRangeSize()) + { + const int64_t key_delta_size = + (new_status == RecordStatus::Deleted) + ? (-static_cast(write_key->Size() + + old_payload_size)) + : (cce_old_status != RecordStatus::Normal + ? static_cast( + write_key->Size() + + cce->PayloadSize()) + : static_cast( + cce->PayloadSize() - + old_payload_size)); + const uint32_t range_id = req.PartitionId(); + // is_dirty: true when range is splitting. + UpdateRangeSize(range_id, + static_cast(key_delta_size), + req.OnDirtyRange()); + } + } + if (req.IsInitialInsert()) { // Updates the ckpt ts after commit ts is set. 
@@ -7617,6 +7643,7 @@ class TemplateCcMap : public CcMap auto entry_tuples = req.EntryTuple(); size_t batch_size = req.BatchSize(); size_t start_key_index = req.StartKeyIndex(); + const int32_t partition_id = req.PartitionId(); const TxRecord *req_rec = nullptr; @@ -7626,6 +7653,7 @@ class TemplateCcMap : public CcMap ValueT decoded_rec; uint64_t commit_ts = 0; RecordStatus rec_status = RecordStatus::Normal; + uint8_t range_size_flags = 0; auto &resume_pos = req.GetPausedPosition(shard_->core_id_); size_t key_pos = std::get<0>(resume_pos); @@ -7633,6 +7661,7 @@ class TemplateCcMap : public CcMap size_t rec_offset = std::get<2>(resume_pos); size_t ts_offset = std::get<3>(resume_pos); size_t status_offset = std::get<4>(resume_pos); + size_t flags_offset = std::get<5>(resume_pos); size_t hash = 0; Iterator it; @@ -7645,6 +7674,7 @@ class TemplateCcMap : public CcMap size_t next_rec_offset = 0; size_t next_ts_offset = 0; size_t next_status_offset = 0; + size_t next_flags_offset = 0; for (size_t cnt = 0; key_pos < batch_size && cnt < UploadBatchCc::UploadBatchBatchSize; ++key_pos, ++cnt) @@ -7653,13 +7683,16 @@ class TemplateCcMap : public CcMap next_rec_offset = rec_offset; next_ts_offset = ts_offset; next_status_offset = status_offset; + next_flags_offset = flags_offset; + if (entry_vec != nullptr) { key_idx = start_key_index + key_pos; - // get key - key = entry_vec->at(key_idx)->key_.GetKey(); - // get record - req_rec = entry_vec->at(key_idx)->rec_.get(); + const auto &pair = entry_vec->at(key_idx); + range_size_flags = pair.first; + const WriteEntry *we = pair.second; + key = we->key_.GetKey(); + req_rec = we->rec_.get(); if (req_rec) { rec_status = RecordStatus::Normal; @@ -7671,11 +7704,12 @@ class TemplateCcMap : public CcMap commit_val = nullptr; } // get commit ts - commit_ts = entry_vec->at(key_idx)->commit_ts_; + commit_ts = we->commit_ts_; } else { - auto [key_str, rec_str, ts_str, status_str] = *entry_tuples; + auto [key_str, rec_str, ts_str, status_str, 
flags_str] = + *entry_tuples; // deserialize key decoded_key.Deserialize( key_str.data(), next_key_offset, KeySchema()); @@ -7698,21 +7732,43 @@ class TemplateCcMap : public CcMap // deserialize commit ts commit_ts = *((uint64_t *) (ts_str.data() + next_ts_offset)); next_ts_offset += sizeof(uint64_t); + if (RangePartitioned) + { + range_size_flags = + static_cast(flags_str[next_flags_offset]); + next_flags_offset += sizeof(uint8_t); + } } - hash = key->Hash(); - size_t core_idx = (hash & 0x3FF) % shard_->core_cnt_; - if (!(core_idx == shard_->core_id_) || commit_ts <= 1) + if (commit_ts <= 1) { - // Skip the key that does not belong to this core or - // commit ts does not greater than 1. Move to next key. + // skip the key that commit ts does not greater than 1. key_offset = next_key_offset; rec_offset = next_rec_offset; ts_offset = next_ts_offset; status_offset = next_status_offset; + if constexpr (RangePartitioned) + { + flags_offset = next_flags_offset; + } continue; } + if constexpr (!RangePartitioned) + { + hash = key->Hash(); + size_t core_idx = (hash & 0x3FF) % shard_->core_cnt_; + if (core_idx != shard_->core_id_) + { + // skip the key that does not belong to this core. 
+ key_offset = next_key_offset; + rec_offset = next_rec_offset; + ts_offset = next_ts_offset; + status_offset = next_status_offset; + continue; + } + } + it = FindEmplace(*key); cce = it->second; cc_page = it.GetPage(); @@ -7744,9 +7800,14 @@ class TemplateCcMap : public CcMap rec_offset = next_rec_offset; ts_offset = next_ts_offset; status_offset = next_status_offset; + if constexpr (RangePartitioned) + { + flags_offset = next_flags_offset; + } continue; } + [[maybe_unused]] const size_t old_payload_size = cce->PayloadSize(); // Now, all versions of non-unique SecondaryIndex key shared // the unpack info in current version's payload, though the // unpack info will not be used for deleted key, we must not @@ -7766,6 +7827,8 @@ class TemplateCcMap : public CcMap } bool was_dirty = cce->IsDirty(); + [[maybe_unused]] const RecordStatus cce_old_status = + cce->PayloadStatus(); cce->SetCommitTsPayloadStatus(commit_ts, rec_status); if (req.Kind() == UploadBatchType::DirtyBucketData) { @@ -7779,6 +7842,26 @@ class TemplateCcMap : public CcMap } cce->SetCkptTs(commit_ts); } + + if constexpr (RangePartitioned) + { + if ((range_size_flags >> 4) != 0) + { + int32_t delta = + (rec_status == RecordStatus::Deleted) + ? -(static_cast(write_key->Size() + + old_payload_size)) + : (cce_old_status != RecordStatus::Normal + ? 
static_cast(write_key->Size() + + cce->PayloadSize()) + : static_cast(cce->PayloadSize() - + old_payload_size)); + UpdateRangeSize(static_cast(partition_id), + delta, + (range_size_flags & 0x0F) != 0); + } + } + OnCommittedUpdate(cce, was_dirty); OnFlushed(cce, was_dirty); DLOG_IF(INFO, TRACE_OCC_ERR) @@ -7805,6 +7888,10 @@ class TemplateCcMap : public CcMap rec_offset = next_rec_offset; ts_offset = next_ts_offset; status_offset = next_status_offset; + if constexpr (RangePartitioned) + { + flags_offset = next_flags_offset; + } } if (key_pos < batch_size) { @@ -7816,7 +7903,8 @@ class TemplateCcMap : public CcMap key_offset, rec_offset, ts_offset, - status_offset); + status_offset, + flags_offset); shard_->Enqueue(shard_->LocalCoreId(), &req); return false; } @@ -8756,6 +8844,10 @@ class TemplateCcMap : public CcMap } normal_obj_sz_ = 0; + if constexpr (RangePartitioned) + { + range_sizes_.clear(); + } ccmp_.clear(); } @@ -11895,6 +11987,54 @@ class TemplateCcMap : public CcMap return &pos_inf_page_; } + void UpdateRangeSize(uint32_t partition_id, + int32_t delta_size, + bool is_dirty) + { + if constexpr (RangePartitioned) + { + auto it = range_sizes_.find(partition_id); + if (it == range_sizes_.end()) + { + it = range_sizes_ + .emplace(partition_id, + std::make_pair( + static_cast( + RangeSizeStatus::kNotInitialized), + 0)) + .first; + } + if (it->second.first == + static_cast(RangeSizeStatus::kNotInitialized)) + { + // Init the range size of this range. + it->second.first = + static_cast(RangeSizeStatus::kLoading); + + int64_t ng_term = Sharder::Instance().LeaderTerm(cc_ng_id_); + shard_->FetchTableRangeSize(table_name_, + static_cast(partition_id), + cc_ng_id_, + ng_term); + return; + } + + if (it->second.first == + static_cast(RangeSizeStatus::kLoading) || + is_dirty) + { + // Loading or split: record delta in delta part (.second). 
+ it->second.second += delta_size; + } + else + { + assert(delta_size >= 0 || + it->second.first >= static_cast(-delta_size)); + it->second.first += delta_size; + } + } // RangePartitioned + } + absl::btree_map< KeyT, std::unique_ptr< diff --git a/tx_service/include/proto/cc_request.proto b/tx_service/include/proto/cc_request.proto index d02a679f..812bf830 100644 --- a/tx_service/include/proto/cc_request.proto +++ b/tx_service/include/proto/cc_request.proto @@ -176,6 +176,10 @@ message UploadBatchRequest bytes commit_ts = 9; bytes rec_status = 10; UploadBatchKind kind = 11; + // Target range partition; + int32 partition_id = 12; + // Per-key one byte: [uint8_t, ...] + bytes range_size_flags = 13; } message UploadBatchSlicesRequest @@ -918,6 +922,7 @@ message PostCommitRequest { bytes record = 5; uint32 operation_type = 6; uint32 key_shard_code = 7; + uint32 range_size_flags = 8; } message ForwardPostCommitRequest { diff --git a/tx_service/include/read_write_entry.h b/tx_service/include/read_write_entry.h index 4d86c34c..ffd53533 100644 --- a/tx_service/include/read_write_entry.h +++ b/tx_service/include/read_write_entry.h @@ -49,7 +49,8 @@ struct WriteSetEntry op_(other.op_), cce_addr_(other.cce_addr_), key_shard_code_(other.key_shard_code_), - forward_addr_(std::move(other.forward_addr_)) + forward_addr_(std::move(other.forward_addr_)), + range_size_flags_(other.range_size_flags_) { } @@ -60,6 +61,7 @@ struct WriteSetEntry cce_addr_ = other.cce_addr_; key_shard_code_ = other.key_shard_code_; forward_addr_ = std::move(other.forward_addr_); + range_size_flags_ = other.range_size_flags_; return *this; } @@ -70,6 +72,9 @@ struct WriteSetEntry uint32_t key_shard_code_{}; // Used in double write scenarios during online DDL. std::unordered_map forward_addr_; + // High 4 bits: need update range size; low 4 bits: on dirty (splitting) + // range. 
+ uint8_t range_size_flags_{0x10}; }; /** diff --git a/tx_service/include/remote/remote_cc_handler.h b/tx_service/include/remote/remote_cc_handler.h index 83695f21..4ef859d4 100644 --- a/tx_service/include/remote/remote_cc_handler.h +++ b/tx_service/include/remote/remote_cc_handler.h @@ -84,7 +84,8 @@ class RemoteCcHandler const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres); + CcHandlerResult &hres, + uint8_t range_size_flags = 0x10); void PostWriteAll(uint32_t src_node_id, const TableName &table_name, diff --git a/tx_service/include/sk_generator.h b/tx_service/include/sk_generator.h index 050d6b27..b33941e8 100644 --- a/tx_service/include/sk_generator.h +++ b/tx_service/include/sk_generator.h @@ -40,8 +40,11 @@ class UploadIndexContext public: using TableIndexSet = std::unordered_map>; - using NGIndexSet = - std::unordered_map>; + // ng_id -> (range_id -> vector of (range_size_flags, WriteEntry*)) + using NGIndexSet = std::unordered_map< + NodeGroupId, + std::unordered_map>>>; private: enum struct UploadTaskStatus @@ -101,16 +104,18 @@ class UploadIndexContext CcErrorCode UploadEncodedIndex(UploadIndexTask &upload_task); CcErrorCode UploadIndexInternal( std::unordered_map &ng_index_set); - void SendIndexes(const TableName &table_name, - NodeGroupId dest_ng_id, - int64_t &ng_term, - const std::vector &write_entry_vec, - size_t batch_size, - size_t start_key_idx, - bthread::Mutex &req_mux, - bthread::ConditionVariable &req_cv, - size_t &finished_req_cnt, - CcErrorCode &res_code); + void SendIndexes( + const TableName &table_name, + NodeGroupId dest_ng_id, + int64_t &ng_term, + int32_t partition_id, + const std::vector> &write_entry_vec, + size_t batch_size, + size_t start_key_idx, + bthread::Mutex &req_mux, + bthread::ConditionVariable &req_cv, + size_t &finished_req_cnt, + CcErrorCode &res_code); // Acquire and release range read lock. 
CcErrorCode AcquireRangeReadLocks( TransactionExecution *acq_lock_txm, diff --git a/tx_service/include/store/data_store_handler.h b/tx_service/include/store/data_store_handler.h index 298d0871..344c0fe1 100644 --- a/tx_service/include/store/data_store_handler.h +++ b/tx_service/include/store/data_store_handler.h @@ -134,6 +134,8 @@ class DataStoreHandler virtual void FetchRangeSlices(FetchRangeSlicesReq *fetch_cc) = 0; + virtual void FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc) = 0; + /** * @brief Read a row from base table or skindex table in datastore with * specified key. Caller should pass in complete primary key or skindex key. diff --git a/tx_service/include/type.h b/tx_service/include/type.h index 2fe288c5..566e4171 100644 --- a/tx_service/include/type.h +++ b/tx_service/include/type.h @@ -167,6 +167,13 @@ enum class TableEngine : uint8_t InternalHash = 5, // eg. Sequence table is a kind of internal hash table. }; +// Status values for range_sizes_.first (range size not yet known). +enum RangeSizeStatus : int32_t +{ + kNotInitialized = -2, // Range size not yet initialized; need to fetch. + kLoading = -1, // Range size is being loaded; delta goes to .second. 
+}; + inline std::string KvTablePrefixOf(TableEngine engine) { switch (engine) diff --git a/tx_service/src/cc/cc_map.cpp b/tx_service/src/cc/cc_map.cpp index 52443b45..fc4cf4dc 100644 --- a/tx_service/src/cc/cc_map.cpp +++ b/tx_service/src/cc/cc_map.cpp @@ -27,6 +27,7 @@ #include "cc/local_cc_shards.h" #include "cc_entry.h" #include "tx_trace.h" +#include "type.h" namespace txservice { @@ -461,4 +462,32 @@ void CcMap::DecrReadIntent(NonBlockingLock *lock, } } +void CcMap::InitRangeSize(uint32_t partition_id, + int32_t persisted_size, + bool succeed, + bool emplace) +{ + auto it = range_sizes_.find(partition_id); + if (it == range_sizes_.end()) + { + if (!emplace) + { + return; + } + it = range_sizes_.emplace(partition_id, std::make_pair(0, 0)).first; + } + if (succeed) + { + int32_t final_size = persisted_size + it->second.second; + it->second.first = final_size < 0 ? 0 : final_size; + it->second.second = 0; + } + else + { + // Load range size failed; reset to not-initialized for retry. 
+ it->second.first = + static_cast(RangeSizeStatus::kNotInitialized); + } +} + } // namespace txservice diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index ead420ac..26f7bc57 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ b/tx_service/src/cc/cc_req_misc.cpp @@ -1531,4 +1531,49 @@ bool ShardCleanCc::Execute(CcShard &ccs) } } +void FetchTableRangeSizeCc::Reset(const TableName &table_name, + int32_t partition_id, + const TxKey &start_key, + CcShard *ccs, + NodeGroupId ng_id, + int64_t ng_term) +{ + table_name_ = &table_name; + partition_id_ = partition_id; + start_key_ = start_key.GetShallowCopy(); + node_group_id_ = ng_id; + node_group_term_ = ng_term; + ccs_ = ccs; + error_code_ = 0; + store_range_size_ = 0; +} + +bool FetchTableRangeSizeCc::ValidTermCheck() +{ + int64_t ng_leader_term = Sharder::Instance().LeaderTerm(node_group_id_); + return ng_leader_term == node_group_term_; +} + +bool FetchTableRangeSizeCc::Execute(CcShard &ccs) +{ + if (!ValidTermCheck()) + { + error_code_ = static_cast(CcErrorCode::NG_TERM_CHANGED); + } + + bool succ = (error_code_ == 0); + CcMap *ccm = ccs.GetCcm(*table_name_, node_group_id_); + assert(ccm != nullptr); + ccm->InitRangeSize( + static_cast(partition_id_), store_range_size_, succ); + + return true; +} + +void FetchTableRangeSizeCc::SetFinish(uint32_t error) +{ + error_code_ = error; + ccs_->Enqueue(this); +} + } // namespace txservice diff --git a/tx_service/src/cc/cc_shard.cpp b/tx_service/src/cc/cc_shard.cpp index c61cc0dc..258e27e3 100644 --- a/tx_service/src/cc/cc_shard.cpp +++ b/tx_service/src/cc/cc_shard.cpp @@ -377,6 +377,26 @@ CcMap *CcShard::GetCcm(const TableName &table_name, uint32_t node_group) } } +void CcShard::FetchTableRangeSize(const TableName &table_name, + int32_t partition_id, + NodeGroupId cc_ng_id, + int64_t cc_ng_term) +{ + FetchTableRangeSizeCc *fetch_cc = fetch_range_size_cc_pool_.NextRequest(); + + const TableName range_table_name(table_name.StringView(), + 
TableType::RangePartition, + table_name.Engine()); + const TableRangeEntry *range_entry = + GetTableRangeEntry(range_table_name, cc_ng_id, partition_id); + assert(range_entry != nullptr); + TxKey start_key = range_entry->GetRangeInfo()->StartTxKey(); + + fetch_cc->Reset( + table_name, partition_id, start_key, this, cc_ng_id, cc_ng_term); + local_shards_.store_hd_->FetchTableRangeSize(fetch_cc); +} + void CcShard::AdjustDataKeyStats(const TableName &table_name, int64_t size_delta, int64_t dirty_delta) diff --git a/tx_service/src/cc/local_cc_handler.cpp b/tx_service/src/cc/local_cc_handler.cpp index 9dd7962d..fc753167 100644 --- a/tx_service/src/cc/local_cc_handler.cpp +++ b/tx_service/src/cc/local_cc_handler.cpp @@ -274,7 +274,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostWrite( const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) + CcHandlerResult &hres, + uint8_t range_size_flags) { uint32_t ng_id = cce_addr.NodeGroupId(); uint32_t dest_node_id = Sharder::Instance().LeaderNodeId(ng_id); @@ -293,7 +294,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostWrite( record, operation_type, key_shard_code, - &hres); + &hres, + range_size_flags); TX_TRACE_ACTION(this, req); TX_TRACE_DUMP(req); cc_shards_.EnqueueCcRequest(thd_id_, cce_addr.CoreId(), req); @@ -312,7 +314,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostWrite( record, operation_type, key_shard_code, - hres); + hres, + range_size_flags); } return req_status; } diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 2c1c9a76..daf203a6 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -4925,6 +4925,7 @@ void LocalCcShards::DataSyncForHashPartition( req_ptr = upload_batch_closure->UploadBatchRequest(); req_ptr->set_node_group_id(dest_ng); req_ptr->set_node_group_term(-1); + req_ptr->set_partition_id(-1); 
req_ptr->set_table_name_str(table_name.String()); req_ptr->set_table_type( remote::ToRemoteType::ConvertTableType( diff --git a/tx_service/src/remote/cc_node_service.cpp b/tx_service/src/remote/cc_node_service.cpp index 872ed273..9d2da758 100644 --- a/tx_service/src/remote/cc_node_service.cpp +++ b/tx_service/src/remote/cc_node_service.cpp @@ -1172,6 +1172,7 @@ void CcNodeService::UploadBatch( NodeGroupId ng_id = request->node_group_id(); int64_t ng_term = request->node_group_term(); + int32_t partition_id = request->partition_id(); std::string_view table_name_sv{request->table_name_str()}; TableType table_type = @@ -1199,14 +1200,15 @@ void CcNodeService::UploadBatch( << " for table:" << table_name.Trace(); LocalCcShards *cc_shards = Sharder::Instance().GetLocalCcShards(); - size_t core_cnt = cc_shards->Count(); + size_t core_cnt = (partition_id >= 0) ? 1 : cc_shards->Count(); uint32_t batch_size = request->batch_size(); auto write_entry_tuple = UploadBatchCc::WriteEntryTuple(request->keys(), request->records(), request->commit_ts(), - request->rec_status()); + request->rec_status(), + request->range_size_flags()); size_t finished_req = 0; bthread::Mutex req_mux; @@ -1217,6 +1219,7 @@ void CcNodeService::UploadBatch( req.Reset(table_name, ng_id, ng_term, + partition_id, core_cnt, batch_size, write_entry_tuple, @@ -1224,9 +1227,18 @@ void CcNodeService::UploadBatch( req_cv, finished_req, data_type); - for (size_t core = 0; core < core_cnt; ++core) + if (partition_id >= 0) { - cc_shards->EnqueueToCcShard(core, &req); + uint16_t dest_core = + static_cast(partition_id % cc_shards->Count()); + cc_shards->EnqueueToCcShard(dest_core, &req); + } + else + { + for (size_t core = 0; core < cc_shards->Count(); ++core) + { + cc_shards->EnqueueToCcShard(core, &req); + } } { diff --git a/tx_service/src/remote/remote_cc_handler.cpp b/tx_service/src/remote/remote_cc_handler.cpp index 848ae8f7..517fe421 100644 --- a/tx_service/src/remote/remote_cc_handler.cpp +++ 
b/tx_service/src/remote/remote_cc_handler.cpp @@ -159,7 +159,8 @@ void txservice::remote::RemoteCcHandler::PostWrite( const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) + CcHandlerResult &hres, + uint8_t range_size_flags) { CcMessage send_msg; @@ -194,6 +195,7 @@ void txservice::remote::RemoteCcHandler::PostWrite( post_commit->set_commit_ts(commit_ts); post_commit->set_operation_type(static_cast(operation_type)); post_commit->set_key_shard_code(key_shard_code); + post_commit->set_range_size_flags(range_size_flags); stream_sender_.SendMessageToNg(cce_addr.NodeGroupId(), send_msg, &hres); } diff --git a/tx_service/src/remote/remote_cc_request.cpp b/tx_service/src/remote/remote_cc_request.cpp index 32fbb935..cf9c7f5c 100644 --- a/tx_service/src/remote/remote_cc_request.cpp +++ b/tx_service/src/remote/remote_cc_request.cpp @@ -594,7 +594,8 @@ void txservice::remote::RemotePostWrite::Reset( rec_str, static_cast(post_commit.operation_type()), post_commit.key_shard_code(), - &cc_res_); + &cc_res_, + static_cast(post_commit.range_size_flags())); } else { diff --git a/tx_service/src/sk_generator.cpp b/tx_service/src/sk_generator.cpp index e3fc928e..9bf3b40f 100644 --- a/tx_service/src/sk_generator.cpp +++ b/tx_service/src/sk_generator.cpp @@ -680,37 +680,41 @@ CcErrorCode UploadIndexContext::UploadIndexInternal( size_t finished_upload_count = 0; CcErrorCode upload_res_code = CcErrorCode::NO_ERROR; size_t upload_req_count = 0; + for (auto &[table_name, ng_entries] : ng_index_set) { - for (auto &[ng_id, entry_vec] : ng_entries) + for (auto &[ng_id, range_entries] : ng_entries) { - entry_vec_size = entry_vec.size(); - batch_req_cnt = (entry_vec_size / upload_batch_size_ + - (entry_vec_size % upload_batch_size_ ? 1 : 0)); - int64_t &expected_term = leader_terms_.at(ng_id); - size_t start_idx = 0; - size_t end_idx = - (batch_req_cnt > 1 ? 
upload_batch_size_ : entry_vec_size); - for (size_t idx = 0; idx < batch_req_cnt; ++idx) + for (auto &[range_id, entry_vec] : range_entries) { - SendIndexes(table_name, - ng_id, - expected_term, - entry_vec, - (end_idx - start_idx), - start_idx, - req_mux, - req_cv, - finished_upload_count, - upload_res_code); - ++upload_req_count; - // Next batch - start_idx = end_idx; - end_idx = ((start_idx + upload_batch_size_) > entry_vec_size - ? entry_vec_size - : (start_idx + upload_batch_size_)); + entry_vec_size = entry_vec.size(); + batch_req_cnt = (entry_vec_size / upload_batch_size_ + + (entry_vec_size % upload_batch_size_ ? 1 : 0)); + + size_t start_idx = 0; + size_t end_idx = + (batch_req_cnt > 1 ? upload_batch_size_ : entry_vec_size); + for (size_t idx = 0; idx < batch_req_cnt; ++idx) + { + SendIndexes(table_name, + ng_id, + expected_term, + range_id, + entry_vec, + (end_idx - start_idx), + start_idx, + req_mux, + req_cv, + finished_upload_count, + upload_res_code); + ++upload_req_count; + start_idx = end_idx; + end_idx = ((start_idx + upload_batch_size_) > entry_vec_size + ? 
entry_vec_size + : (start_idx + upload_batch_size_)); + } } } } @@ -730,7 +734,8 @@ void UploadIndexContext::SendIndexes( const TableName &table_name, NodeGroupId dest_ng_id, int64_t &ng_term, - const std::vector &write_entry_vec, + int32_t partition_id, + const std::vector> &write_entry_vec, size_t batch_size, size_t start_key_idx, bthread::Mutex &req_mux, @@ -740,14 +745,13 @@ void UploadIndexContext::SendIndexes( { uint32_t dest_node_id = Sharder::Instance().LeaderNodeId(dest_ng_id); LocalCcShards *cc_shards = Sharder::Instance().GetLocalCcShards(); - size_t core_cnt = cc_shards->Count(); if (dest_node_id == cc_shards->NodeId()) { UploadBatchCc *req_ptr = NextRequest(); req_ptr->Reset(table_name, dest_ng_id, ng_term, - core_cnt, + partition_id, batch_size, start_key_idx, write_entry_vec, @@ -757,10 +761,9 @@ void UploadIndexContext::SendIndexes( res_code, UploadBatchType::SkIndexData); - for (size_t core = 0; core < core_cnt; ++core) - { - cc_shards->EnqueueToCcShard(core, req_ptr); - } + uint16_t dest_core = + static_cast(partition_id % cc_shards->Count()); + cc_shards->EnqueueToCcShard(dest_core, req_ptr); } else { @@ -834,6 +837,7 @@ void UploadIndexContext::SendIndexes( remote::ToRemoteType::ConvertTableType(table_name.Type())); req_ptr->set_table_engine( remote::ToRemoteType::ConvertTableEngine(table_name.Engine())); + req_ptr->set_partition_id(partition_id); size_t end_key_idx = start_key_idx + batch_size; req_ptr->set_kind(remote::UploadBatchKind::SK_DATA); req_ptr->set_batch_size(batch_size); @@ -853,15 +857,24 @@ void UploadIndexContext::SendIndexes( std::string *rec_status_str = req_ptr->mutable_rec_status(); // All generated sk should be normal status. 
const RecordStatus rec_status = RecordStatus::Normal; + // range_size_flags + req_ptr->clear_range_size_flags(); + std::string *range_size_flags_str = req_ptr->mutable_range_size_flags(); + for (size_t idx = start_key_idx; idx < end_key_idx; ++idx) { - write_entry_vec.at(idx)->key_.Serialize(*keys_str); - write_entry_vec.at(idx)->rec_->Serialize(*recs_str); - val_ptr = reinterpret_cast( - &(write_entry_vec.at(idx)->commit_ts_)); + uint8_t range_size_flags = write_entry_vec.at(idx).first; + WriteEntry *write_entry = write_entry_vec.at(idx).second; + write_entry->key_.Serialize(*keys_str); + write_entry->rec_->Serialize(*recs_str); + val_ptr = + reinterpret_cast(&(write_entry->commit_ts_)); commit_ts_str->append(val_ptr, len_sizeof); rec_status_str->append(reinterpret_cast(&rec_status), sizeof(rec_status)); + range_size_flags_str->append( + reinterpret_cast(&range_size_flags), + sizeof(range_size_flags)); } brpc::Controller *cntl_ptr = upload_batch_closure->Controller(); @@ -989,17 +1002,24 @@ void UploadIndexContext::AdvanceWriteEntryForRangeInfo( size_t new_range_idx = 0; auto *range_info = range_record.GetRangeInfo(); + const int32_t range_id = range_info->PartitionId(); + const uint8_t default_flags = + 0x10 | static_cast(range_info->IsDirty()); while (cur_write_entry_it != next_range_start) { WriteEntry &write_entry = *cur_write_entry_it; - auto ng_it = ng_write_entrys.try_emplace(range_ng); - ng_it.first->second.push_back(&write_entry); + auto &range_vec = ng_write_entrys[range_ng][range_id]; + range_vec.emplace_back(default_flags, &write_entry); + uint8_t *old_range_flags_ptr = &range_vec.back().first; + + uint8_t *new_bucket_flags_ptr = nullptr; // If current range is migrating, forward to new range owner. 
if (new_bucket_ng != UINT32_MAX) { - ng_write_entrys.try_emplace(new_bucket_ng) - .first->second.push_back(&write_entry); + auto &new_bucket_vec = ng_write_entrys[new_bucket_ng][range_id]; + new_bucket_vec.emplace_back(default_flags, &write_entry); + new_bucket_flags_ptr = &new_bucket_vec.back().first; } // If range is splitting and the key will fall on a new range after @@ -1016,18 +1036,25 @@ void UploadIndexContext::AdvanceWriteEntryForRangeInfo( } if (new_range_ng != UINT32_MAX) { - if (new_range_ng != range_ng) - { - ng_write_entrys.try_emplace(new_range_ng) - .first->second.push_back(&write_entry); - } + const int32_t new_range_id = + range_info->NewPartitionId()->at(new_range_idx - 1); + + ng_write_entrys[new_range_ng][new_range_id].emplace_back( + default_flags, &write_entry); + // Only update range size on the new range + *old_range_flags_ptr &= 0x0F; + // If the new range is migrating, forward to the new owner of new // range. if (new_range_new_bucket_ng != UINT32_MAX && new_range_new_bucket_ng != range_ng) { - ng_write_entrys.try_emplace(new_range_new_bucket_ng) - .first->second.push_back(&write_entry); + ng_write_entrys[new_range_new_bucket_ng][new_range_id] + .emplace_back(default_flags, &write_entry); + if (new_bucket_flags_ptr) + { + *new_bucket_flags_ptr &= 0x0F; + } } } diff --git a/tx_service/src/tx_execution.cpp b/tx_service/src/tx_execution.cpp index 24553f56..9d7b4183 100644 --- a/tx_service/src/tx_execution.cpp +++ b/tx_service/src/tx_execution.cpp @@ -5297,22 +5297,24 @@ void TransactionExecution::Process(PostProcessOp &post_process) write_entry.rec_.get(), write_entry.op_, write_entry.key_shard_code_, - post_process.hd_result_); + post_process.hd_result_, + write_entry.range_size_flags_); update_post_cnt(ret); for (auto &[forward_shard_code, cce_addr] : write_entry.forward_addr_) { - CcReqStatus ret = - cc_handler_->PostWrite(tx_number, - tx_term_, - command_id, - commit_ts_, - cce_addr, - write_entry.rec_.get(), - write_entry.op_, - 
forward_shard_code, - post_process.hd_result_); + CcReqStatus ret = cc_handler_->PostWrite( + tx_number, + tx_term_, + command_id, + commit_ts_, + cce_addr, + write_entry.rec_.get(), + write_entry.op_, + forward_shard_code, + post_process.hd_result_, + (0x10 | write_entry.range_size_flags_)); update_post_cnt(ret); } } diff --git a/tx_service/src/tx_operation.cpp b/tx_service/src/tx_operation.cpp index 663d50c1..25574868 100644 --- a/tx_service/src/tx_operation.cpp +++ b/tx_service/src/tx_operation.cpp @@ -739,6 +739,8 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) WriteSetEntry &write_entry = write_key_it_->second; size_t hash = write_tx_key.Hash(); write_entry.key_shard_code_ = (range_ng << 10) | (hash & 0x3FF); + write_entry.range_size_flags_ = + 0x10 | static_cast(range_info->IsDirty()); // If current range is migrating, forward to new range owner. if (new_bucket_ng != UINT32_MAX) { @@ -765,6 +767,8 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) { write_entry.forward_addr_.try_emplace((new_range_ng << 10) | (hash & 0x3FF)); + write_entry.range_size_flags_ = + 0x0F & write_entry.range_size_flags_; } // If the new range is migrating, forward to the new owner of // new range.