From 6b5c40e94090657b4195b6c7ad52b89917731333 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Tue, 17 Mar 2026 15:31:28 +0800 Subject: [PATCH 1/6] Update data migration operation forward 1. Adapt kickoutdata operation with new key sharding logic for range partition 2. Fix the execution flow: after processing the range partition table, the hash partition table needs to be checked. 3. Simplify code using a unified interface. --- tx_service/include/cc/template_cc_map.h | 7 +- tx_service/src/cc/local_cc_handler.cpp | 13 ++-- tx_service/src/tx_operation.cpp | 93 ++++++++----------------- 3 files changed, 41 insertions(+), 72 deletions(-) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 136b9b00..9cc814a5 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -6899,10 +6899,11 @@ class TemplateCcMap : public CcMap uint16_t pause_idx = shard_->core_id_; CleanType clean_type = req.GetCleanType(); if (clean_type == CleanType::CleanBucketData || - clean_type == CleanType::CleanRangeData) + clean_type == CleanType::CleanRangeData || + clean_type == CleanType::CleanRangeDataForMigration) { - // For clean bucket data and range data, cc req is only sent to 1 - // core. + // For clean bucket data and range data (for data migration), cc req + // is only sent to 1 core. pause_idx = 0; } if (req.ResumeKey(pause_idx)->KeyPtr() != nullptr) diff --git a/tx_service/src/cc/local_cc_handler.cpp b/tx_service/src/cc/local_cc_handler.cpp index 60c5a33e..d0b2284c 100644 --- a/tx_service/src/cc/local_cc_handler.cpp +++ b/tx_service/src/cc/local_cc_handler.cpp @@ -1901,10 +1901,12 @@ void txservice::LocalCcHandler::KickoutData(const TableName &table_name, KickoutCcEntryCc *req = kickout_ccentry_pool_.NextRequest(); // For hash partition, all data in a single bucket should be hashed to // the same core.
- uint16_t core_cnt = (clean_type == CleanType::CleanBucketData || - clean_type == CleanType::CleanRangeData) - ? 1 - : Sharder::Instance().GetLocalCcShardsCount(); + uint16_t core_cnt = + (clean_type == CleanType::CleanBucketData || + clean_type == CleanType::CleanRangeData || + clean_type == CleanType::CleanRangeDataForMigration) + ? 1 + : Sharder::Instance().GetLocalCcShardsCount(); req->Reset(table_name, ng_id, &hres, @@ -1929,7 +1931,8 @@ void txservice::LocalCcHandler::KickoutData(const TableName &table_name, Sharder::Instance().ShardBucketIdToCoreIdx((*bucket_id)[0]), req); } - else if (clean_type == CleanType::CleanRangeData) + else if (clean_type == CleanType::CleanRangeData || + clean_type == CleanType::CleanRangeDataForMigration) { assert(range_id != INT32_MAX); uint16_t dest_core = static_cast( diff --git a/tx_service/src/tx_operation.cpp b/tx_service/src/tx_operation.cpp index 275309ae..b45d3316 100644 --- a/tx_service/src/tx_operation.cpp +++ b/tx_service/src/tx_operation.cpp @@ -7847,20 +7847,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm) // Table name is ranges_in_bucket_snapshot_ is of type range // partition, need to convert it first. 
- TableType type; - if (TableName::IsBase(kickout_range_tbl_it_->first.StringView())) - { - type = TableType::Primary; - } - else if (TableName::IsUniqueSecondary( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::UniqueSecondary; - } - else - { - type = TableType::Secondary; - } + TableType type = + TableName::Type(kickout_range_tbl_it_->first.StringView()); kickout_range_table_ = TableName{kickout_range_tbl_it_->first.StringView(), type, @@ -7892,21 +7880,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm) break; } - TableType type; - if (TableName::IsBase( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::Primary; - } - else if (TableName::IsUniqueSecondary( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::UniqueSecondary; - } - else - { - type = TableType::Secondary; - } + TableType type = TableName::Type( + kickout_range_tbl_it_->first.StringView()); kickout_range_table_ = TableName{kickout_range_tbl_it_->first.StringView(), type, @@ -7978,28 +7953,31 @@ void DataMigrationOp::Forward(TransactionExecution *txm) if (++kickout_range_tbl_it_ == ranges_in_bucket_snapshot_.cend()) { - LOG(INFO) << "Data migration: post write all" - << ", txn: " << txm->TxNumber(); - post_all_bucket_lock_op_.write_type_ = - PostWriteType::PostCommit; - ForwardToSubOperation(txm, &post_all_bucket_lock_op_); + // Try handle hash partitioned tables + if (kickout_hash_partitioned_tbl_it_ == + hash_partitioned_tables_snapshot_.cend()) + { + LOG(INFO) << "Data migration: post write all" + << ", txn: " << txm->TxNumber(); + post_all_bucket_lock_op_.write_type_ = + PostWriteType::PostCommit; + ForwardToSubOperation(txm, &post_all_bucket_lock_op_); + return; + } + kickout_data_op_.node_group_ = txm->TxCcNodeId(); + kickout_data_op_.table_name_ = + &(*kickout_hash_partitioned_tbl_it_); + kickout_data_op_.start_key_ = TxKey(); + kickout_data_op_.end_key_ = TxKey(); + kickout_data_op_.bucket_ids_ = + 
&status_->bucket_ids_[migrate_bucket_idx_]; + // Check if the key is hashed to this bucket + kickout_data_op_.clean_type_ = CleanType::CleanBucketData; + ForwardToSubOperation(txm, &kickout_data_op_); return; } - TableType type; - if (TableName::IsBase( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::Primary; - } - else if (TableName::IsUniqueSecondary( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::UniqueSecondary; - } - else - { - type = TableType::Secondary; - } + TableType type = + TableName::Type(kickout_range_tbl_it_->first.StringView()); kickout_range_table_ = TableName{kickout_range_tbl_it_->first.StringView(), type, @@ -8031,21 +8009,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm) // Move to hash partitioned tables break; } - TableType type; - if (TableName::IsBase( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::Primary; - } - else if (TableName::IsUniqueSecondary( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::UniqueSecondary; - } - else - { - type = TableType::Secondary; - } + TableType type = TableName::Type( + kickout_range_tbl_it_->first.StringView()); kickout_range_table_ = TableName{kickout_range_tbl_it_->first.StringView(), type, From 769953f8aba1293e0955d76a8d128a51fa9a2f80 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Tue, 17 Mar 2026 16:08:04 +0800 Subject: [PATCH 2/6] Remove range size info during kickoutdata for data migration --- tx_service/include/cc/cc_request.h | 5 +++++ tx_service/include/cc/template_cc_map.h | 16 ++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index c5c462d5..47ebfce8 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -6373,6 +6373,11 @@ struct KickoutCcEntryCc : public TemplatedCcRequest return clean_type_; } + int32_t GetPartitionId() const + { + return range_id_; + } + private: CleanType 
clean_type_; // Target buckets to be cleaned if clean type is CleanBucketData. diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 9cc814a5..985fef6f 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -6983,6 +6983,14 @@ class TemplateCcMap : public CcMap if (ccp == &pos_inf_page_ || !(ccp->FirstKey() < *end_key)) { + if (req.GetCleanType() == CleanType::CleanRangeDataForMigration) + { + // For data migration, we need to delete the range size info + // for the range that has been migrated to other node group. + int32_t partition_id = req.GetPartitionId(); + assert(partition_id != INT32_MAX); + RemoveRangeSize(static_cast(partition_id)); + } return req.SetFinish(); } else @@ -11552,6 +11560,14 @@ class TemplateCcMap : public CcMap return false; } + void RemoveRangeSize(uint32_t partition_id) + { + if constexpr (RangePartitioned) + { + range_sizes_.erase(partition_id); + } + } + absl::btree_map< KeyT, std::unique_ptr< From d686176ffbc56b8f8113c8e275971916586211cc Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Tue, 17 Mar 2026 17:26:46 +0800 Subject: [PATCH 3/6] Load range size after the data migration finished. 
--- tx_service/include/cc/cc_map.h | 2 + tx_service/include/cc/range_bucket_cc_map.h | 46 +++++++++++++++++++- tx_service/include/range_bucket_key_record.h | 23 ++++++++++ tx_service/src/cc/cc_map.cpp | 28 ++++++++++++ 4 files changed, 98 insertions(+), 1 deletion(-) diff --git a/tx_service/include/cc/cc_map.h b/tx_service/include/cc/cc_map.h index 9aaa8c58..234c9332 100644 --- a/tx_service/include/cc/cc_map.h +++ b/tx_service/include/cc/cc_map.h @@ -276,6 +276,8 @@ class CcMap void ResetRangeStatus(uint32_t partition_id); + void LoadStoreRangeSize(uint32_t partition_id); + uint64_t SchemaTs() const { return schema_ts_; diff --git a/tx_service/include/cc/range_bucket_cc_map.h b/tx_service/include/cc/range_bucket_cc_map.h index 6805aa5f..a950d097 100644 --- a/tx_service/include/cc/range_bucket_cc_map.h +++ b/tx_service/include/cc/range_bucket_cc_map.h @@ -356,8 +356,52 @@ class RangeBucketCcMap } } upload_bucket_rec->SetBucketInfo(bucket_info); + + // Get ranges in this bucket id. + auto range_ids = shard_->local_shards_.GetRangesInBucket( + target_key->bucket_id_, this->cc_ng_id_); + const_cast(bucket_info) + ->SetRangesInBucket(std::move(range_ids)); } - } + + if (upload_bucket_rec->GetBucketInfo()->BucketOwner() == + this->cc_ng_id_) + { + // Init the rane size info for the ranges that are being + // migrated to the new owner node group. + auto ranges_in_bucket = + upload_bucket_rec->GetBucketInfo()->GetRangesInBucket(); + for (auto &[tbl, range_ids] : ranges_in_bucket) + { + CcMap *data_ccm = shard_->GetCcm(tbl, this->cc_ng_id_); + if (data_ccm == nullptr) + { + continue; + } + + for (auto range_id : range_ids) + { + if ((range_id & 0x3FF) % shard_->core_cnt_ == + shard_->core_id_) + { + // The shard is the owner of the range. We should + // load the store range size as the baseline for + // this range. 
+ data_ccm->LoadStoreRangeSize( + static_cast(range_id)); + } + } + } + } // range partition + + if (shard_->core_id_ == shard_->core_cnt_ - 1) + { + // All ranges in this bucket have been migrated to the new owner + // node group. We can now clear the ranges in bucket info. + const_cast(upload_bucket_rec->GetBucketInfo()) + ->ClearRangesInBucket(); + } + } // post commit or commit else { assert(req.CommitType() == PostWriteType::DowngradeLock); diff --git a/tx_service/include/range_bucket_key_record.h b/tx_service/include/range_bucket_key_record.h index 30bfbb29..57c5d1f5 100644 --- a/tx_service/include/range_bucket_key_record.h +++ b/tx_service/include/range_bucket_key_record.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -147,6 +148,24 @@ struct BucketInfo dirty_version_ = 0; } + void SetRangesInBucket( + std::unordered_map> + &&ranges_in_bucket) + { + ranges_in_bucket_snapshot_ = std::move(ranges_in_bucket); + } + + const std::unordered_map> & + GetRangesInBucket() const + { + return ranges_in_bucket_snapshot_; + } + + void ClearRangesInBucket() + { + ranges_in_bucket_snapshot_.clear(); + } + private: NodeGroupId bucket_owner_{UINT32_MAX}; uint64_t version_{0}; @@ -163,6 +182,10 @@ struct BucketInfo // We use atomic here since it might be updated by any tx processor without // bucket write lock. std::atomic_bool accepts_upload_batch_{false}; + // The ranges in this bucket. Only used during the post commit phase of the + // data migration. 
+ std::unordered_map> + ranges_in_bucket_snapshot_; friend struct RangeBucketRecord; friend class RangeBucketCcMap; }; diff --git a/tx_service/src/cc/cc_map.cpp b/tx_service/src/cc/cc_map.cpp index ede1962c..194106ba 100644 --- a/tx_service/src/cc/cc_map.cpp +++ b/tx_service/src/cc/cc_map.cpp @@ -515,4 +515,32 @@ void CcMap::ResetRangeStatus(uint32_t partition_id) << " status: " << std::boolalpha << std::get<2>(it->second); } +void CcMap::LoadStoreRangeSize(uint32_t partition_id) +{ + auto it = range_sizes_.find(partition_id); + if (it == range_sizes_.end()) + { + it = range_sizes_ + .emplace(partition_id, + std::make_tuple(static_cast( + RangeSizeStatus::kNotInitialized), + 0, + false)) + .first; + } + + if (std::get<0>(it->second) == + static_cast(RangeSizeStatus::kLoading)) + { + // Another request is already loading the range size. + return; + } + + std::get<0>(it->second) = static_cast(RangeSizeStatus::kLoading); + + int64_t ng_term = Sharder::Instance().LeaderTerm(cc_ng_id_); + shard_->FetchTableRangeSize( + table_name_, static_cast(partition_id), cc_ng_id_, ng_term); +} + } // namespace txservice From 92f2a35d7da4f6e5c3310ec5789a508e753766da Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Tue, 17 Mar 2026 17:58:42 +0800 Subject: [PATCH 4/6] Revert "Load range size after the data migration finished." This reverts commit 866931f4fd1e179108c69fc233e661b1c9dd8caa. 
--- tx_service/include/cc/cc_map.h | 2 - tx_service/include/cc/range_bucket_cc_map.h | 46 +------------------- tx_service/include/range_bucket_key_record.h | 23 ---------- tx_service/src/cc/cc_map.cpp | 28 ------------ 4 files changed, 1 insertion(+), 98 deletions(-) diff --git a/tx_service/include/cc/cc_map.h b/tx_service/include/cc/cc_map.h index 234c9332..9aaa8c58 100644 --- a/tx_service/include/cc/cc_map.h +++ b/tx_service/include/cc/cc_map.h @@ -276,8 +276,6 @@ class CcMap void ResetRangeStatus(uint32_t partition_id); - void LoadStoreRangeSize(uint32_t partition_id); - uint64_t SchemaTs() const { return schema_ts_; diff --git a/tx_service/include/cc/range_bucket_cc_map.h b/tx_service/include/cc/range_bucket_cc_map.h index a950d097..6805aa5f 100644 --- a/tx_service/include/cc/range_bucket_cc_map.h +++ b/tx_service/include/cc/range_bucket_cc_map.h @@ -356,52 +356,8 @@ class RangeBucketCcMap } } upload_bucket_rec->SetBucketInfo(bucket_info); - - // Get ranges in this bucket id. - auto range_ids = shard_->local_shards_.GetRangesInBucket( - target_key->bucket_id_, this->cc_ng_id_); - const_cast(bucket_info) - ->SetRangesInBucket(std::move(range_ids)); - } - - if (upload_bucket_rec->GetBucketInfo()->BucketOwner() == - this->cc_ng_id_) - { - // Init the rane size info for the ranges that are being - // migrated to the new owner node group. - auto ranges_in_bucket = - upload_bucket_rec->GetBucketInfo()->GetRangesInBucket(); - for (auto &[tbl, range_ids] : ranges_in_bucket) - { - CcMap *data_ccm = shard_->GetCcm(tbl, this->cc_ng_id_); - if (data_ccm == nullptr) - { - continue; - } - - for (auto range_id : range_ids) - { - if ((range_id & 0x3FF) % shard_->core_cnt_ == - shard_->core_id_) - { - // The shard is the owner of the range. We should - // load the store range size as the baseline for - // this range. 
- data_ccm->LoadStoreRangeSize( - static_cast(range_id)); - } - } - } - } // range partition - - if (shard_->core_id_ == shard_->core_cnt_ - 1) - { - // All ranges in this bucket have been migrated to the new owner - // node group. We can now clear the ranges in bucket info. - const_cast(upload_bucket_rec->GetBucketInfo()) - ->ClearRangesInBucket(); } - } // post commit or commit + } else { assert(req.CommitType() == PostWriteType::DowngradeLock); diff --git a/tx_service/include/range_bucket_key_record.h b/tx_service/include/range_bucket_key_record.h index 57c5d1f5..30bfbb29 100644 --- a/tx_service/include/range_bucket_key_record.h +++ b/tx_service/include/range_bucket_key_record.h @@ -24,7 +24,6 @@ #include #include #include -#include #include #include @@ -148,24 +147,6 @@ struct BucketInfo dirty_version_ = 0; } - void SetRangesInBucket( - std::unordered_map> - &&ranges_in_bucket) - { - ranges_in_bucket_snapshot_ = std::move(ranges_in_bucket); - } - - const std::unordered_map> & - GetRangesInBucket() const - { - return ranges_in_bucket_snapshot_; - } - - void ClearRangesInBucket() - { - ranges_in_bucket_snapshot_.clear(); - } - private: NodeGroupId bucket_owner_{UINT32_MAX}; uint64_t version_{0}; @@ -182,10 +163,6 @@ struct BucketInfo // We use atomic here since it might be updated by any tx processor without // bucket write lock. std::atomic_bool accepts_upload_batch_{false}; - // The ranges in this bucket. Only used during the post commit phase of the - // data migration. 
- std::unordered_map> - ranges_in_bucket_snapshot_; friend struct RangeBucketRecord; friend class RangeBucketCcMap; }; diff --git a/tx_service/src/cc/cc_map.cpp b/tx_service/src/cc/cc_map.cpp index 194106ba..ede1962c 100644 --- a/tx_service/src/cc/cc_map.cpp +++ b/tx_service/src/cc/cc_map.cpp @@ -515,32 +515,4 @@ void CcMap::ResetRangeStatus(uint32_t partition_id) << " status: " << std::boolalpha << std::get<2>(it->second); } -void CcMap::LoadStoreRangeSize(uint32_t partition_id) -{ - auto it = range_sizes_.find(partition_id); - if (it == range_sizes_.end()) - { - it = range_sizes_ - .emplace(partition_id, - std::make_tuple(static_cast( - RangeSizeStatus::kNotInitialized), - 0, - false)) - .first; - } - - if (std::get<0>(it->second) == - static_cast(RangeSizeStatus::kLoading)) - { - // Another request is already loading the range size. - return; - } - - std::get<0>(it->second) = static_cast(RangeSizeStatus::kLoading); - - int64_t ng_term = Sharder::Instance().LeaderTerm(cc_ng_id_); - shard_->FetchTableRangeSize( - table_name_, static_cast(partition_id), cc_ng_id_, ng_term); -} - } // namespace txservice From 94a578621f8ea51f076f7582c7efe42d3d65d846 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Tue, 17 Mar 2026 19:14:02 +0800 Subject: [PATCH 5/6] update forward logic --- tx_service/include/read_write_entry.h | 1 + tx_service/src/tx_operation.cpp | 57 ++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/tx_service/include/read_write_entry.h b/tx_service/include/read_write_entry.h index 36463be1..87eb97c0 100644 --- a/tx_service/include/read_write_entry.h +++ b/tx_service/include/read_write_entry.h @@ -80,6 +80,7 @@ struct WriteSetEntry // Used in double write scenarios during online DDL. // key shard code -> (partition id, cce addr) std::unordered_map> forward_addr_; + // True if the keys is located in a splitting/migrating range. 
bool on_dirty_range_{false}; }; diff --git a/tx_service/src/tx_operation.cpp b/tx_service/src/tx_operation.cpp index b45d3316..9d600ed0 100644 --- a/tx_service/src/tx_operation.cpp +++ b/tx_service/src/tx_operation.cpp @@ -723,21 +723,27 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) auto *range_info = txm->range_rec_.GetRangeInfo(); int32_t range_id = range_info->PartitionId(); uint32_t residual = static_cast(range_id & 0x3FF); - bool on_dirty_range = range_info->IsDirty(); + bool range_splitting = range_info->IsDirty(); while (write_key_it_ != next_range_start) { const TxKey &write_tx_key = write_key_it_->first; WriteSetEntry &write_entry = write_key_it_->second; write_entry.key_shard_code_ = (range_ng << 10) | residual; write_entry.partition_id_ = range_id; - write_entry.on_dirty_range_ = on_dirty_range; + write_entry.on_dirty_range_ = range_splitting; + + int32_t *new_bucket_range_id_ptr = nullptr; // If current range is migrating, forward to new range owner. if (new_bucket_ng != UINT32_MAX) { assert(new_bucket_ng != range_ng); - write_entry.forward_addr_.try_emplace( - ((new_bucket_ng << 10) | residual), - std::make_pair(range_id, CcEntryAddr())); + auto it = + write_entry.forward_addr_ + .try_emplace(((new_bucket_ng << 10) | residual), + std::make_pair(range_id, CcEntryAddr())) + .first; + new_bucket_range_id_ptr = &it->second.first; + write_entry.on_dirty_range_ = true; } // If range is splitting and the key will fall on a new range after @@ -765,7 +771,10 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) static_cast(new_residual % core_cnt); uint16_t range_shard = static_cast(residual % core_cnt); - if (new_range_ng != range_ng || new_range_shard != range_shard) + if ((new_range_ng != range_ng && + (new_bucket_ng == UINT32_MAX || + new_range_ng != new_bucket_ng)) || + new_range_shard != range_shard) { write_entry.forward_addr_.try_emplace( ((new_range_ng << 10) | new_residual), @@ -773,28 +782,56 @@ void 
LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) // There is no need to update the range size of the old // range. write_entry.partition_id_ = -1; + if (new_bucket_range_id_ptr != nullptr) + { + *new_bucket_range_id_ptr = -1; + } } - else if (new_range_ng == range_ng && - new_range_shard == range_shard) + else if (new_range_ng == range_ng) { + assert(new_range_shard == range_shard); // Only update the range size on the new range id in case of // the new range and the old range are located on the same // shard. write_entry.partition_id_ = new_range_id; } + else if (new_bucket_ng != UINT32_MAX && + new_range_ng == new_bucket_ng) + { + assert(new_range_shard == range_shard); + assert(new_bucket_range_id_ptr != nullptr); + *new_bucket_range_id_ptr = new_range_id; + } // If the new range is migrating, forward to the new owner of // new range. - // TODO(ysw): double check the logic here. if (new_range_new_bucket_ng != UINT32_MAX) { assert(new_range_new_bucket_ng != new_range_ng); - if (new_range_new_bucket_ng != range_ng || + if ((new_range_new_bucket_ng != range_ng && + (new_bucket_ng == UINT32_MAX || + new_range_new_bucket_ng != new_bucket_ng)) || new_range_shard != range_shard) { write_entry.forward_addr_.try_emplace( ((new_range_new_bucket_ng << 10) | new_residual), std::make_pair(new_range_id, CcEntryAddr())); + write_entry.on_dirty_range_ = true; + } + else if (new_range_new_bucket_ng == range_ng) + { + assert(new_range_shard == range_shard); + // Only update the range size on the new range id in + // case of the new range and the old range are located + // on the same shard. 
+ write_entry.partition_id_ = new_range_id; + } + else if (new_bucket_ng != UINT32_MAX && + new_range_new_bucket_ng == new_bucket_ng) + { + assert(new_range_shard == range_shard); + assert(new_bucket_range_id_ptr != nullptr); + *new_bucket_range_id_ptr = new_range_id; } } } From 48e581739a5e253dd51ba93dd46feff353d509b2 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Wed, 18 Mar 2026 12:13:52 +0800 Subject: [PATCH 6/6] update comment update comment --- tx_service/include/cc/template_cc_map.h | 2 +- tx_service/src/tx_operation.cpp | 62 ++++++++++++++++++------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 985fef6f..26cffd1c 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -6988,7 +6988,7 @@ class TemplateCcMap : public CcMap // For data migration, we need to delete the range size info // for the range that has been migrated to other node group. int32_t partition_id = req.GetPartitionId(); - assert(partition_id != INT32_MAX); + assert(partition_id > 0 && partition_id != INT32_MAX); RemoveRangeSize(static_cast(partition_id)); } return req.SetFinish(); diff --git a/tx_service/src/tx_operation.cpp b/tx_service/src/tx_operation.cpp index 9d600ed0..01334517 100644 --- a/tx_service/src/tx_operation.cpp +++ b/tx_service/src/tx_operation.cpp @@ -732,17 +732,15 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) write_entry.partition_id_ = range_id; write_entry.on_dirty_range_ = range_splitting; - int32_t *new_bucket_range_id_ptr = nullptr; + uint32_t new_bucket_forward_key = UINT32_MAX; // If current range is migrating, forward to new range owner. 
if (new_bucket_ng != UINT32_MAX) { assert(new_bucket_ng != range_ng); - auto it = - write_entry.forward_addr_ - .try_emplace(((new_bucket_ng << 10) | residual), - std::make_pair(range_id, CcEntryAddr())) - .first; - new_bucket_range_id_ptr = &it->second.first; + new_bucket_forward_key = (new_bucket_ng << 10) | residual; + write_entry.forward_addr_.try_emplace( + new_bucket_forward_key, + std::make_pair(range_id, CcEntryAddr())); write_entry.on_dirty_range_ = true; } @@ -782,9 +780,12 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) // There is no need to update the range size of the old // range. write_entry.partition_id_ = -1; - if (new_bucket_range_id_ptr != nullptr) + if (new_bucket_forward_key != UINT32_MAX) { - *new_bucket_range_id_ptr = -1; + auto fwd_it = write_entry.forward_addr_.find( + new_bucket_forward_key); + assert(fwd_it != write_entry.forward_addr_.end()); + fwd_it->second.first = -1; } } else if (new_range_ng == range_ng) @@ -799,8 +800,11 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) new_range_ng == new_bucket_ng) { assert(new_range_shard == range_shard); - assert(new_bucket_range_id_ptr != nullptr); - *new_bucket_range_id_ptr = new_range_id; + assert(new_bucket_forward_key != UINT32_MAX); + auto fwd_it = + write_entry.forward_addr_.find(new_bucket_forward_key); + assert(fwd_it != write_entry.forward_addr_.end()); + fwd_it->second.first = new_range_id; } // If the new range is migrating, forward to the new owner of @@ -830,8 +834,11 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) new_range_new_bucket_ng == new_bucket_ng) { assert(new_range_shard == range_shard); - assert(new_bucket_range_id_ptr != nullptr); - *new_bucket_range_id_ptr = new_range_id; + assert(new_bucket_forward_key != UINT32_MAX); + auto fwd_it = write_entry.forward_addr_.find( + new_bucket_forward_key); + assert(fwd_it != write_entry.forward_addr_.end()); + fwd_it->second.first = new_range_id; } } } @@ -7974,9 +7981,8 @@ 
void DataMigrationOp::Forward(TransactionExecution *txm) if (kickout_data_op_.hd_result_.IsError()) { - LOG(ERROR) << "Data migration: fail to kickout range data" - << ", table name " - << kickout_range_tbl_it_->first.StringView() + LOG(ERROR) << "Data migration: fail to kickout data, table name: " + << kickout_data_op_.table_name_->StringView() << ", tx_number:" << txm->TxNumber() << ", keep retrying"; RetrySubOperation(txm, &kickout_data_op_); @@ -8044,7 +8050,29 @@ void DataMigrationOp::Forward(TransactionExecution *txm) ranges_in_bucket_snapshot_.cend()) { // Move to hash partitioned tables - break; + if (kickout_hash_partitioned_tbl_it_ == + hash_partitioned_tables_snapshot_.cend()) + { + LOG(INFO) << "Data migration: post write all" + << ", txn: " << txm->TxNumber(); + post_all_bucket_lock_op_.write_type_ = + PostWriteType::PostCommit; + ForwardToSubOperation(txm, + &post_all_bucket_lock_op_); + return; + } + kickout_data_op_.node_group_ = txm->TxCcNodeId(); + kickout_data_op_.table_name_ = + &(*kickout_hash_partitioned_tbl_it_); + kickout_data_op_.start_key_ = TxKey(); + kickout_data_op_.end_key_ = TxKey(); + kickout_data_op_.bucket_ids_ = + &status_->bucket_ids_[migrate_bucket_idx_]; + // Check if the key is hashed to this bucket + kickout_data_op_.clean_type_ = + CleanType::CleanBucketData; + ForwardToSubOperation(txm, &kickout_data_op_); + return; } TableType type = TableName::Type( kickout_range_tbl_it_->first.StringView());