diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index c5c462d5..47ebfce8 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -6373,6 +6373,11 @@ struct KickoutCcEntryCc : public TemplatedCcRequest return clean_type_; } + int32_t GetPartitionId() const + { + return range_id_; + } + private: CleanType clean_type_; // Target buckets to be cleaned if clean type is CleanBucketData. diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 136b9b00..26cffd1c 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -6899,10 +6899,11 @@ class TemplateCcMap : public CcMap uint16_t pause_idx = shard_->core_id_; CleanType clean_type = req.GetCleanType(); if (clean_type == CleanType::CleanBucketData || - clean_type == CleanType::CleanRangeData) + clean_type == CleanType::CleanRangeData || + clean_type == CleanType::CleanRangeDataForMigration) { - // For clean bucket data and range data, cc req is only sent to 1 - // core. + // For clean bucket data and range data (for data migration), cc req + // is only sent to 1 core. pause_idx = 0; } if (req.ResumeKey(pause_idx)->KeyPtr() != nullptr) @@ -6982,6 +6983,14 @@ class TemplateCcMap : public CcMap if (ccp == &pos_inf_page_ || !(ccp->FirstKey() < *end_key)) { + if (req.GetCleanType() == CleanType::CleanRangeDataForMigration) + { + // For data migration, we need to delete the range size info + // for the range that has been migrated to other node group. 
+ int32_t partition_id = req.GetPartitionId(); + assert(partition_id > 0 && partition_id != INT32_MAX); + RemoveRangeSize(static_cast<uint32_t>(partition_id)); + } return req.SetFinish(); } else @@ -11551,6 +11560,14 @@ class TemplateCcMap : public CcMap return false; } + void RemoveRangeSize(uint32_t partition_id) + { + if constexpr (RangePartitioned) + { + range_sizes_.erase(partition_id); + } + } + absl::btree_map< KeyT, std::unique_ptr< diff --git a/tx_service/include/read_write_entry.h b/tx_service/include/read_write_entry.h index 36463be1..87eb97c0 100644 --- a/tx_service/include/read_write_entry.h +++ b/tx_service/include/read_write_entry.h @@ -80,6 +80,7 @@ struct WriteSetEntry // Used in double write scenarios during online DDL. // key shard code -> (partition id, cce addr) std::unordered_map<uint32_t, std::pair<int32_t, CcEntryAddr>> forward_addr_; + // True if the key is located in a splitting/migrating range. bool on_dirty_range_{false}; }; diff --git a/tx_service/src/cc/local_cc_handler.cpp b/tx_service/src/cc/local_cc_handler.cpp index 60c5a33e..d0b2284c 100644 --- a/tx_service/src/cc/local_cc_handler.cpp +++ b/tx_service/src/cc/local_cc_handler.cpp @@ -1901,10 +1901,12 @@ void txservice::LocalCcHandler::KickoutData(const TableName &table_name, KickoutCcEntryCc *req = kickout_ccentry_pool_.NextRequest(); // For hash partition, all data in a single bucket should be hashed to // the same core. - uint16_t core_cnt = (clean_type == CleanType::CleanBucketData || - clean_type == CleanType::CleanRangeData) - ? 1 - : Sharder::Instance().GetLocalCcShardsCount(); + uint16_t core_cnt = + (clean_type == CleanType::CleanBucketData || + clean_type == CleanType::CleanRangeData || + clean_type == CleanType::CleanRangeDataForMigration) + ? 
1 + : Sharder::Instance().GetLocalCcShardsCount(); req->Reset(table_name, ng_id, &hres, @@ -1929,7 +1931,8 @@ Sharder::Instance().ShardBucketIdToCoreIdx((*bucket_id)[0]), req); } - else if (clean_type == CleanType::CleanRangeData) + else if (clean_type == CleanType::CleanRangeData || + clean_type == CleanType::CleanRangeDataForMigration) { assert(range_id != INT32_MAX); uint16_t dest_core = static_cast<uint16_t>( diff --git a/tx_service/src/tx_operation.cpp b/tx_service/src/tx_operation.cpp index 275309ae..01334517 100644 --- a/tx_service/src/tx_operation.cpp +++ b/tx_service/src/tx_operation.cpp @@ -723,21 +723,25 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) auto *range_info = txm->range_rec_.GetRangeInfo(); int32_t range_id = range_info->PartitionId(); uint32_t residual = static_cast<uint32_t>(range_id & 0x3FF); - bool on_dirty_range = range_info->IsDirty(); + bool range_splitting = range_info->IsDirty(); while (write_key_it_ != next_range_start) { const TxKey &write_tx_key = write_key_it_->first; WriteSetEntry &write_entry = write_key_it_->second; write_entry.key_shard_code_ = (range_ng << 10) | residual; write_entry.partition_id_ = range_id; - write_entry.on_dirty_range_ = on_dirty_range; + write_entry.on_dirty_range_ = range_splitting; + + uint32_t new_bucket_forward_key = UINT32_MAX; // If current range is migrating, forward to new range owner. 
if (new_bucket_ng != UINT32_MAX) { assert(new_bucket_ng != range_ng); + new_bucket_forward_key = (new_bucket_ng << 10) | residual; write_entry.forward_addr_.try_emplace( - ((new_bucket_ng << 10) | residual), + new_bucket_forward_key, std::make_pair(range_id, CcEntryAddr())); + write_entry.on_dirty_range_ = true; } // If range is splitting and the key will fall on a new range after @@ -765,7 +769,10 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) static_cast<uint16_t>(new_residual % core_cnt); uint16_t range_shard = static_cast<uint16_t>(residual % core_cnt); - if (new_range_ng != range_ng || new_range_shard != range_shard) + if ((new_range_ng != range_ng && + (new_bucket_ng == UINT32_MAX || + new_range_ng != new_bucket_ng)) || + new_range_shard != range_shard) { write_entry.forward_addr_.try_emplace( ((new_range_ng << 10) | new_residual), @@ -773,28 +780,65 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm) // There is no need to update the range size of the old // range. write_entry.partition_id_ = -1; + if (new_bucket_forward_key != UINT32_MAX) + { + auto fwd_it = write_entry.forward_addr_.find( + new_bucket_forward_key); + assert(fwd_it != write_entry.forward_addr_.end()); + fwd_it->second.first = -1; + } } - else if (new_range_ng == range_ng && - new_range_shard == range_shard) + else if (new_range_ng == range_ng) { + assert(new_range_shard == range_shard); // Only update the range size on the new range id in case of // the new range and the old range are located on the same // shard. write_entry.partition_id_ = new_range_id; } + else if (new_bucket_ng != UINT32_MAX && + new_range_ng == new_bucket_ng) + { + assert(new_range_shard == range_shard); + assert(new_bucket_forward_key != UINT32_MAX); + auto fwd_it = + write_entry.forward_addr_.find(new_bucket_forward_key); + assert(fwd_it != write_entry.forward_addr_.end()); + fwd_it->second.first = new_range_id; + } // If the new range is migrating, forward to the new owner of // new range. 
- // TODO(ysw): double check the logic here. if (new_range_new_bucket_ng != UINT32_MAX) { assert(new_range_new_bucket_ng != new_range_ng); - if (new_range_new_bucket_ng != range_ng || + if ((new_range_new_bucket_ng != range_ng && + (new_bucket_ng == UINT32_MAX || + new_range_new_bucket_ng != new_bucket_ng)) || new_range_shard != range_shard) { write_entry.forward_addr_.try_emplace( ((new_range_new_bucket_ng << 10) | new_residual), std::make_pair(new_range_id, CcEntryAddr())); + write_entry.on_dirty_range_ = true; + } + else if (new_range_new_bucket_ng == range_ng) + { + assert(new_range_shard == range_shard); + // Only update the range size on the new range id in + // case of the new range and the old range are located + // on the same shard. + write_entry.partition_id_ = new_range_id; + } + else if (new_bucket_ng != UINT32_MAX && + new_range_new_bucket_ng == new_bucket_ng) + { + assert(new_range_shard == range_shard); + assert(new_bucket_forward_key != UINT32_MAX); + auto fwd_it = write_entry.forward_addr_.find( + new_bucket_forward_key); + assert(fwd_it != write_entry.forward_addr_.end()); + fwd_it->second.first = new_range_id; } } } @@ -7847,20 +7891,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm) // Table name is ranges_in_bucket_snapshot_ is of type range // partition, need to convert it first. 
- TableType type; - if (TableName::IsBase(kickout_range_tbl_it_->first.StringView())) - { - type = TableType::Primary; - } - else if (TableName::IsUniqueSecondary( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::UniqueSecondary; - } - else - { - type = TableType::Secondary; - } + TableType type = + TableName::Type(kickout_range_tbl_it_->first.StringView()); kickout_range_table_ = TableName{kickout_range_tbl_it_->first.StringView(), type, @@ -7892,21 +7924,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm) break; } - TableType type; - if (TableName::IsBase( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::Primary; - } - else if (TableName::IsUniqueSecondary( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::UniqueSecondary; - } - else - { - type = TableType::Secondary; - } + TableType type = TableName::Type( + kickout_range_tbl_it_->first.StringView()); kickout_range_table_ = TableName{kickout_range_tbl_it_->first.StringView(), type, @@ -7962,9 +7981,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm) if (kickout_data_op_.hd_result_.IsError()) { - LOG(ERROR) << "Data migration: fail to kickout range data" - << ", table name " - << kickout_range_tbl_it_->first.StringView() + LOG(ERROR) << "Data migration: fail to kickout data, table name: " + << kickout_data_op_.table_name_->StringView() << ", tx_number:" << txm->TxNumber() << ", keep retrying"; RetrySubOperation(txm, &kickout_data_op_); @@ -7978,28 +7996,31 @@ void DataMigrationOp::Forward(TransactionExecution *txm) if (++kickout_range_tbl_it_ == ranges_in_bucket_snapshot_.cend()) { - LOG(INFO) << "Data migration: post write all" - << ", txn: " << txm->TxNumber(); - post_all_bucket_lock_op_.write_type_ = - PostWriteType::PostCommit; - ForwardToSubOperation(txm, &post_all_bucket_lock_op_); + // Try handle hash partitioned tables + if (kickout_hash_partitioned_tbl_it_ == + hash_partitioned_tables_snapshot_.cend()) + { + LOG(INFO) << 
"Data migration: post write all" + << ", txn: " << txm->TxNumber(); + post_all_bucket_lock_op_.write_type_ = + PostWriteType::PostCommit; + ForwardToSubOperation(txm, &post_all_bucket_lock_op_); + return; + } + kickout_data_op_.node_group_ = txm->TxCcNodeId(); + kickout_data_op_.table_name_ = + &(*kickout_hash_partitioned_tbl_it_); + kickout_data_op_.start_key_ = TxKey(); + kickout_data_op_.end_key_ = TxKey(); + kickout_data_op_.bucket_ids_ = + &status_->bucket_ids_[migrate_bucket_idx_]; + // Check if the key is hashed to this bucket + kickout_data_op_.clean_type_ = CleanType::CleanBucketData; + ForwardToSubOperation(txm, &kickout_data_op_); return; } - TableType type; - if (TableName::IsBase( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::Primary; - } - else if (TableName::IsUniqueSecondary( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::UniqueSecondary; - } - else - { - type = TableType::Secondary; - } + TableType type = + TableName::Type(kickout_range_tbl_it_->first.StringView()); kickout_range_table_ = TableName{kickout_range_tbl_it_->first.StringView(), type, @@ -8029,23 +8050,32 @@ void DataMigrationOp::Forward(TransactionExecution *txm) ranges_in_bucket_snapshot_.cend()) { // Move to hash partitioned tables - break; - } - TableType type; - if (TableName::IsBase( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::Primary; - } - else if (TableName::IsUniqueSecondary( - kickout_range_tbl_it_->first.StringView())) - { - type = TableType::UniqueSecondary; - } - else - { - type = TableType::Secondary; + if (kickout_hash_partitioned_tbl_it_ == + hash_partitioned_tables_snapshot_.cend()) + { + LOG(INFO) << "Data migration: post write all" + << ", txn: " << txm->TxNumber(); + post_all_bucket_lock_op_.write_type_ = + PostWriteType::PostCommit; + ForwardToSubOperation(txm, + &post_all_bucket_lock_op_); + return; + } + kickout_data_op_.node_group_ = txm->TxCcNodeId(); + kickout_data_op_.table_name_ = + 
&(*kickout_hash_partitioned_tbl_it_); + kickout_data_op_.start_key_ = TxKey(); + kickout_data_op_.end_key_ = TxKey(); + kickout_data_op_.bucket_ids_ = + &status_->bucket_ids_[migrate_bucket_idx_]; + // Check if the key is hashed to this bucket + kickout_data_op_.clean_type_ = + CleanType::CleanBucketData; + ForwardToSubOperation(txm, &kickout_data_op_); + return; } + TableType type = TableName::Type( + kickout_range_tbl_it_->first.StringView()); kickout_range_table_ = TableName{kickout_range_tbl_it_->first.StringView(), type,