diff --git a/tx_service/include/cc/catalog_cc_map.h b/tx_service/include/cc/catalog_cc_map.h index 302c81a2..9670f967 100644 --- a/tx_service/include/cc/catalog_cc_map.h +++ b/tx_service/include/cc/catalog_cc_map.h @@ -90,12 +90,7 @@ class CatalogCcMap uint32_t ng_id = req.NodeGroupId(); if (shard_->IsNative(ng_id) && req.CcOp() == CcOperation::ReadForWrite) { - int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id); CcHandlerResult *hd_res = req.Result(); - if (ng_term < 0) - { - return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - } const CatalogKey *catalog_key = nullptr; if (req.Key() != nullptr) @@ -199,19 +194,17 @@ class CatalogCcMap } }); - int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); + int64_t ng_term = req.NodeGroupTerm(); + assert(ng_term > 0); CODE_FAULT_INJECTOR("term_CatalogCcMap_Execute_PostWriteAllCc", { LOG(INFO) << "FaultInject term_CatalogCcMap_Execute_PostWriteAllCc"; ng_term = -1; FaultInject::Instance().InjectFault( "term_CatalogCcMap_Execute_PostWriteAllCc", "remove"); - }); - if (ng_term < 0) - { req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); return true; - } + }); const CatalogKey *table_key = nullptr; if (req.Key() != nullptr) @@ -1148,28 +1141,31 @@ class CatalogCcMap assert(req.IsLocal()); uint32_t ng_id = req.NodeGroupId(); - int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id); - ng_term = std::max(ng_term, Sharder::Instance().StandbyNodeTerm()); - - if (req.IsInRecovering()) + int64_t ng_term = req.NodeGroupTerm(); + if (ng_term < 0) { - ng_term = ng_term > 0 - ? 
ng_term - : Sharder::Instance().CandidateLeaderTerm(ng_id); + if (req.AllowRunOnCandidate()) + { + ng_term = Sharder::Instance().CandidateLeaderTerm(ng_id); + } + if (ng_term < 0) + { + ng_term = Sharder::Instance().LeaderTerm(ng_id); + int64_t standby_node_term = + Sharder::Instance().StandbyNodeTerm(); + ng_term = std::max(ng_term, standby_node_term); + } } + assert(ng_term > 0); CODE_FAULT_INJECTOR("term_CatalogCcMap_Execute_ReadCc", { - LOG(INFO) << "FaultInject term_CatalogCcMap_Execute_ReadCc"; + LOG(INFO) << "FaultInject term_CatalogCcMap_Execute_ReadCc"; ng_term = -1; FaultInject::Instance().InjectFault( "term_CatalogCcMap_Execute_ReadCc", "remove"); - }); - - if (ng_term < 0) - { req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); return true; - } + }); const CatalogKey *table_key = static_cast(req.Key()); diff --git a/tx_service/include/cc/cc_handler.h b/tx_service/include/cc/cc_handler.h index 3d4640b8..67418635 100644 --- a/tx_service/include/cc/cc_handler.h +++ b/tx_service/include/cc/cc_handler.h @@ -99,7 +99,8 @@ class CcHandler uint32_t hd_res_idx, CcProtocol proto, IsolationLevel iso_level, - bool abort_if_oom) = 0; + bool abort_if_oom, + bool allow_run_on_candidate) = 0; /** * @brief Acquires write locks for the input key in all shards. This method @@ -166,7 +167,8 @@ class CcHandler const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) = 0; + CcHandlerResult &hres, + bool allow_run_on_candidate) = 0; /** * @briefPost-processes a read/scan key. Post-processing clears the read @@ -202,7 +204,8 @@ class CcHandler CcHandlerResult &hres, bool is_local = false, bool need_remote_resp = true, - PostReadType post_read_type = PostReadType::Release) = 0; + PostReadType post_read_type = PostReadType::Release, + bool allow_run_on_candidate = false) = 0; /** * @brief Reads the input key and returns the key's record. 
The request puts @@ -239,6 +242,7 @@ class CcHandler CcProtocol proto = CcProtocol::OCC, bool is_for_write = false, bool is_covering_keys = false, + bool allow_run_on_candidate = false, bool point_read_on_miss = false, int32_t partition_id = -1, bool abort_if_oom = false) = 0; @@ -297,7 +301,7 @@ class CcHandler IsolationLevel iso_level = IsolationLevel::RepeatableRead, CcProtocol proto = CcProtocol::Locking, bool is_for_write = false, - bool is_recovring = false, + bool allow_run_on_candidate = false, bool execute_immediately = true) = 0; virtual bool ReadLocal( @@ -313,7 +317,7 @@ class CcHandler IsolationLevel iso_level = IsolationLevel::RepeatableRead, CcProtocol proto = CcProtocol::Locking, bool is_for_write = false, - bool is_recovring = false) = 0; + bool allow_run_on_candidate = false) = 0; virtual void ScanOpen( const TableName &table_name, @@ -404,7 +408,8 @@ class CcHandler virtual void NewTxn(CcHandlerResult &hres, IsolationLevel iso_level, NodeGroupId tx_ng_id, - uint32_t log_group_id) = 0; + uint32_t log_group_id, + bool allow_run_on_candidate) = 0; /// /// Sets the commit timestamp of the input tx. 
diff --git a/tx_service/include/cc/cc_req_misc.h b/tx_service/include/cc/cc_req_misc.h index ed9ea8b9..504d1c08 100644 --- a/tx_service/include/cc/cc_req_misc.h +++ b/tx_service/include/cc/cc_req_misc.h @@ -437,7 +437,8 @@ struct FillStoreSliceCc : public CcRequestBase { assert(err_code != CcErrorCode::NO_ERROR); DLOG(ERROR) << "Abort this FillStoreSliceCc request with error: " - << CcErrorMessage(err_code); + << CcErrorMessage(err_code) + << ", table name: " << table_name_->StringView(); bool finish_all = SetError(err_code); // Recycle request if (finish_all) @@ -989,6 +990,8 @@ struct UpdateCceCkptTsCc : public CcRequestBase UpdateCceCkptTsCc(const UpdateCceCkptTsCc &) = delete; UpdateCceCkptTsCc &operator=(const UpdateCceCkptTsCc &) = delete; + bool ValidTermCheck() const; + bool Execute(CcShard &ccs) override; void SetFinished() diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 97e93fae..bb00b86b 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -283,6 +283,11 @@ struct TemplatedCcRequest : public CcRequestBase return node_group_id_; } + int64_t NodeGroupTerm() const + { + return ng_term_; + } + int64_t TxTerm() const { return tx_term_; @@ -366,6 +371,34 @@ struct AcquireCc AcquireCc(const AcquireCc &rhs) = delete; AcquireCc(AcquireCc &&rhs) = delete; + bool ValidTermCheck() override + { + int64_t cc_ng_term = -1; + if (allow_run_on_candidate_) + { + cc_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + } + if (cc_ng_term < 0) + { + cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + } + + if (ng_term_ < 0) + { + ng_term_ = cc_ng_term; + } + + if (cc_ng_term < 0 || cc_ng_term != ng_term_) + { + return false; + } + else + { + return true; + } + } + void Reset(const TableName *tname, const uint64_t schema_version, const TxKey *key, @@ -378,7 +411,8 @@ struct AcquireCc uint32_t hd_res_idx, CcProtocol proto, IsolationLevel iso_level, - bool 
abort_if_oom) + bool abort_if_oom, + bool allow_run_on_candidate) { uint32_t ng_id = Sharder::Instance().ShardToCcNodeGroup(key_shard_code); TemplatedCcRequest>::Reset( @@ -395,6 +429,7 @@ struct AcquireCc is_local_ = true; block_by_lock_ = false; abort_if_oom_ = abort_if_oom; + allow_run_on_candidate_ = allow_run_on_candidate; } void Reset(const TableName *tname, @@ -512,6 +547,7 @@ struct AcquireCc bool is_local_{true}; bool block_by_lock_{false}; bool abort_if_oom_{false}; + bool allow_run_on_candidate_{false}; }; struct AcquireAllCc : public TemplatedCcRequest @@ -526,6 +562,28 @@ struct AcquireAllCc : public TemplatedCcRequest AcquireAllCc(const AcquireAllCc &rhs) = delete; AcquireAllCc(AcquireAllCc &&rhs) = delete; + bool ValidTermCheck() override + { + int64_t cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + cc_ng_term = std::max(cc_ng_term, candidate_ng_term); + + if (ng_term_ < 0) + { + ng_term_ = cc_ng_term; + } + + if (cc_ng_term < 0 || cc_ng_term != ng_term_) + { + return false; + } + else + { + return true; + } + } + void Reset(const TableName *tname, const TxKey *key, uint32_t node_group_id, @@ -689,7 +747,17 @@ struct PostWriteCc : public TemplatedCcRequest bool ValidTermCheck() override { - int64_t cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t cc_ng_term = -1; + if (allow_run_on_candidate_) + { + cc_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + } + if (cc_ng_term < 0) + { + cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + } + if (cce_addr_ != nullptr) { if (cce_addr_->Term() != cc_ng_term) @@ -740,7 +808,8 @@ struct PostWriteCc : public TemplatedCcRequest const TxRecord *rec, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult *res) + CcHandlerResult *res, + bool allow_run_on_candidate) { TemplatedCcRequest::Reset( nullptr, res, addr->NodeGroupId(), tx_number, 
tx_term); @@ -754,6 +823,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = false; ccm_ = nullptr; is_initial_insert_ = false; + allow_run_on_candidate_ = allow_run_on_candidate; } void Reset(const TxKey *key, @@ -788,6 +858,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = false; ccm_ = nullptr; is_initial_insert_ = initial_insertion; + allow_run_on_candidate_ = false; } void Reset(const CcEntryAddr *addr, @@ -811,6 +882,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = true; ccm_ = nullptr; is_initial_insert_ = false; + allow_run_on_candidate_ = false; } void Reset(const TableName *table_name, @@ -845,6 +917,7 @@ struct PostWriteCc : public TemplatedCcRequest is_remote_ = true; ccm_ = nullptr; is_initial_insert_ = initial_insertion; + allow_run_on_candidate_ = false; } const CcEntryAddr *CceAddr() const @@ -909,6 +982,7 @@ struct PostWriteCc : public TemplatedCcRequest const void *key_; const std::string *key_str_; }; + bool allow_run_on_candidate_{false}; }; struct PostWriteAllCc @@ -919,6 +993,28 @@ struct PostWriteAllCc PostWriteAllCc(const PostWriteAllCc &rhs) = delete; PostWriteAllCc(PostWriteAllCc &&rhs) = delete; + bool ValidTermCheck() override + { + int64_t cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + cc_ng_term = std::max(cc_ng_term, candidate_ng_term); + + if (ng_term_ < 0) + { + ng_term_ = cc_ng_term; + } + + if (cc_ng_term < 0 || cc_ng_term != ng_term_) + { + return false; + } + else + { + return true; + } + } + void Reset(const TableName *tname, const TxKey *key, uint32_t node_group_id, @@ -1117,9 +1213,29 @@ struct PostReadCc : public TemplatedCcRequest bool ValidTermCheck() override { - int64_t cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); - int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); - cc_ng_term = std::max(cc_ng_term, standby_node_term); + int64_t cc_ng_term = 
-1; + if (allow_run_on_candidate_) + { + cc_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + } + + if (cc_ng_term < 0) + { + cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); + cc_ng_term = std::max(cc_ng_term, standby_node_term); + } + + if (ng_term_ < 0) + { + ng_term_ = cc_ng_term; + } + + if (cc_ng_term < 0 || cc_ng_term != ng_term_) + { + return false; + } assert(cce_addr_ != nullptr); if (cce_addr_->Term() != cc_ng_term) @@ -1152,7 +1268,8 @@ struct PostReadCc : public TemplatedCcRequest uint64_t key_ts, uint64_t gap_ts, PostReadType post_read_type, - CcHandlerResult *res) + CcHandlerResult *res, + bool allow_run_on_candidate = false) { TemplatedCcRequest::Reset( nullptr, res, addr->NodeGroupId(), tx_number, tx_term); @@ -1163,6 +1280,7 @@ struct PostReadCc : public TemplatedCcRequest gap_ts_ = gap_ts; post_read_type_ = post_read_type; ccm_ = nullptr; + allow_run_on_candidate_ = allow_run_on_candidate; } const CcEntryAddr *CceAddr() const @@ -1196,6 +1314,7 @@ struct PostReadCc : public TemplatedCcRequest uint64_t key_ts_; uint64_t gap_ts_; PostReadType post_read_type_; + bool allow_run_on_candidate_{false}; }; struct ReadCc : public TemplatedCcRequest @@ -1233,7 +1352,7 @@ struct ReadCc : public TemplatedCcRequest bool ValidTermCheck() override { int64_t cc_ng_term = -1; - if (is_in_recovering_) + if (allow_run_on_candidate_) { cc_ng_term = Sharder::Instance().CandidateLeaderTerm(node_group_id_); @@ -1304,7 +1423,7 @@ struct ReadCc : public TemplatedCcRequest bool is_for_write = false, bool is_covering_keys = false, std::vector *archives = nullptr, - bool is_in_recovering = false, + bool allow_run_on_candidate = false, bool point_read_on_miss = false, int32_t partition_id = -1, bool abort_if_oom = false) @@ -1326,7 +1445,7 @@ struct ReadCc : public TemplatedCcRequest cce_ptr_ = nullptr; archives_ = archives; is_local_ = true; - is_in_recovering_ = 
is_in_recovering; + allow_run_on_candidate_ = allow_run_on_candidate; is_covering_keys_ = is_covering_keys; point_read_on_cache_miss_ = point_read_on_miss; blk_type_ = NotBlocked; @@ -1359,6 +1478,7 @@ struct ReadCc : public TemplatedCcRequest bool is_for_write = false, bool is_covering_keys = false, std::vector *archives = nullptr, + bool allow_run_on_candidate = false, bool point_read_on_miss = false, int32_t partition_id = -1, bool abort_if_oom = false) @@ -1380,7 +1500,7 @@ struct ReadCc : public TemplatedCcRequest cce_ptr_ = nullptr; archives_ = archives; is_local_ = false; - is_in_recovering_ = false; + allow_run_on_candidate_ = allow_run_on_candidate; is_covering_keys_ = is_covering_keys; point_read_on_cache_miss_ = point_read_on_miss; blk_type_ = NotBlocked; @@ -1434,7 +1554,7 @@ struct ReadCc : public TemplatedCcRequest cce_ptr_ = nullptr; archives_ = archives; is_local_ = true; - is_in_recovering_ = false; + allow_run_on_candidate_ = false; is_covering_keys_ = is_covering_keys; point_read_on_cache_miss_ = point_read_on_miss; blk_type_ = NotBlocked; @@ -1533,9 +1653,9 @@ struct ReadCc : public TemplatedCcRequest return is_local_; } - bool IsInRecovering() const + bool AllowRunOnCandidate() const { - return is_in_recovering_; + return allow_run_on_candidate_; } bool IsCoveringKeys() const @@ -1610,8 +1730,9 @@ struct ReadCc : public TemplatedCcRequest LruEntry *cce_ptr_{nullptr}; bool is_local_{true}; - // Is issued in a recovering process - bool is_in_recovering_{false}; + // True if this ccrequest is allowed to run on candidate node, such as + // issued in a recovering process + bool allow_run_on_candidate_{false}; // Reserved for unique sk read bool is_covering_keys_{false}; bool point_read_on_cache_miss_{false}; @@ -3783,8 +3904,11 @@ struct HashPartitionDataSyncScanCc : public CcRequestBase bool ValidTermCheck() { int64_t cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + 
Sharder::Instance().CandidateLeaderTerm(node_group_id_); int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); - int64_t current_term = std::max(cc_ng_term, standby_node_term); + int64_t current_term = + std::max({cc_ng_term, standby_node_term, candidate_ng_term}); if (node_group_term_ < 0) { @@ -3840,6 +3964,14 @@ struct HashPartitionDataSyncScanCc : public CcRequestBase return false; } + int64_t GetNodeGroupLeaderTerm() const + { + int64_t leader_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + return std::max(leader_term, candidate_term); + } + bool IsDrained() const { return pause_pos_.second; @@ -4047,7 +4179,8 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase bool export_base_table_item_only = false, StoreRange *store_range = nullptr, const std::map *old_slices_delta_size = nullptr, - uint64_t schema_version = 0) + uint64_t schema_version = 0, + bool run_on_candidate_node = false) : scan_heap_is_full_(false), table_name_(&table_name), node_group_id_(node_group_id), @@ -4066,7 +4199,8 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase slice_coordinator_(export_base_table_item_, &slices_to_scan_), export_base_table_item_only_(export_base_table_item_only), store_range_(store_range), - schema_version_(schema_version) + schema_version_(schema_version), + run_on_candidate_node_(run_on_candidate_node) { tx_number_ = txn; assert(scan_batch_size_ > DataSyncScanBatchSize); @@ -4105,8 +4239,11 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase bool ValidTermCheck() { int64_t cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); - int64_t current_term = std::max(cc_ng_term, standby_node_term); + int64_t current_term = + std::max({cc_ng_term, standby_node_term, 
candidate_ng_term}); if (node_group_term_ < 0) { @@ -4133,7 +4270,6 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); return false; } - scan_count_++; CcMap *ccm = ccs.GetCcm(*table_name_, node_group_id_); if (ccm == nullptr) { @@ -4185,7 +4321,7 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase cv_.wait(lk, [this] { return unfinished_cnt_ == 0; }); } - void Reset(OpType op_type = OpType::Normal) + void Reset() { std::lock_guard lk(mux_); unfinished_cnt_ = 1; @@ -4215,7 +4351,6 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase } err_ = CcErrorCode::NO_ERROR; - op_type_ = op_type; slice_coordinator_.Reset(); } @@ -4302,11 +4437,6 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase err_ = CcErrorCode::LOG_NOT_TRUNCATABLE; } - bool IsTerminated() const - { - return op_type_ == OpType::Terminated; - } - StoreRange *StoreRangePtr() const { return store_range_; @@ -4427,14 +4557,17 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase return last_data_sync_ts_; } + bool RunOnCandidateNode() const + { + return run_on_candidate_node_; + } + std::vector accumulated_scan_cnt_; std::vector accumulated_flush_data_size_; // std::vector is not safe to use in multi-threaded environment, std::vector scan_heap_is_full_{0}; - size_t scan_count_{0}; - private: struct SliceCoordinator { @@ -4603,7 +4736,7 @@ struct RangePartitionDataSyncScanCc : public CcRequestBase // TODO(xxx) general solution for #1130 const uint64_t schema_version_{0}; - OpType op_type_{OpType::Normal}; + bool run_on_candidate_node_{false}; template bool Execute(CcShard &ccs) override { int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + ng_term = std::max(ng_term, candidate_ng_term); if (ng_term < 0 && clean_type_ == CleanType::CleanDeletedData) { // Purge deleted data is the only type of kickout cc that 
will @@ -6036,7 +6172,7 @@ struct KickoutCcEntryCc : public TemplatedCcRequest if (ng_term < 0) { - return SetError(CcErrorCode::TX_NODE_NOT_LEADER); + return SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); } if (clean_type_ == CleanType::CleanCcm) @@ -6672,6 +6808,9 @@ struct UpdateKeyCacheCc : public CcRequestBase bool Execute(CcShard &ccs) override { int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + ng_term = std::max(ng_term, candidate_ng_term); if (ng_term < 0 || ng_term != ng_term_) { return SetFinish(); @@ -8783,6 +8922,9 @@ struct ScanSliceDeltaSizeCcForRangePartition : public CcRequestBase bool ValidTermCheck() const { int64_t cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + cc_ng_term = std::max(cc_ng_term, candidate_ng_term); assert(node_group_term_ > 0); return cc_ng_term >= 0 && cc_ng_term == node_group_term_; @@ -9240,6 +9382,9 @@ struct SampleSubRangeKeysCc : public CcRequestBase bool ValidTermCheck() { int64_t cc_ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + cc_ng_term = std::max(cc_ng_term, candidate_ng_term); assert(node_group_term_ > 0); if (cc_ng_term < 0 || cc_ng_term != node_group_term_) diff --git a/tx_service/include/cc/cc_shard.h b/tx_service/include/cc/cc_shard.h index ee977e68..39561f0c 100644 --- a/tx_service/include/cc/cc_shard.h +++ b/tx_service/include/cc/cc_shard.h @@ -600,6 +600,9 @@ class CcShard uint64_t min_ts = UINT64_MAX; int64_t cc_ng_term = Sharder::Instance().LeaderTerm(cc_ng_id); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(cc_ng_id); + cc_ng_term = std::max(cc_ng_term, candidate_ng_term); if (cc_ng_term < 0) { cc_ng_term = Sharder::Instance().StandbyNodeTerm(); diff --git 
a/tx_service/include/cc/cluster_config_cc_map.h b/tx_service/include/cc/cluster_config_cc_map.h index 3e55acb9..9af4be24 100644 --- a/tx_service/include/cc/cluster_config_cc_map.h +++ b/tx_service/include/cc/cluster_config_cc_map.h @@ -67,13 +67,8 @@ class ClusterConfigCcMap hd_res->ClearRefCnt(); AcquireAllResult &acquire_all_result = hd_res->Value(); uint32_t ng_id = req.NodeGroupId(); - int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id); - - if (ng_term < 0) - { - hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return true; - } + int64_t ng_term = req.NodeGroupTerm(); + assert(ng_term > 0); LockType acquired_lock = LockType::NoLock; CcErrorCode err_code = CcErrorCode::NO_ERROR; diff --git a/tx_service/include/cc/local_cc_handler.h b/tx_service/include/cc/local_cc_handler.h index eae6ba46..8513a85f 100644 --- a/tx_service/include/cc/local_cc_handler.h +++ b/tx_service/include/cc/local_cc_handler.h @@ -55,7 +55,8 @@ class LocalCcHandler : public CcHandler uint32_t hd_res_idx, CcProtocol proto, IsolationLevel iso_level, - bool abort_if_oom) override; + bool abort_if_oom, + bool allow_run_on_candidate) override; void AcquireWriteAll(const TableName &table_name, const TxKey &key, @@ -103,20 +104,21 @@ class LocalCcHandler : public CcHandler const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) override; + CcHandlerResult &hres, + bool allow_run_on_candidate) override; - CcReqStatus PostRead( - uint64_t tx_number, - int64_t tx_term, - uint16_t command_id, - uint64_t key_ts, - uint64_t gap_ts, - uint64_t commit_ts, - const CcEntryAddr &ccentry_addr, - CcHandlerResult &hres, - bool is_local = false, - bool need_remote_resp = true, - PostReadType post_read_type = PostReadType::Release) override; + CcReqStatus PostRead(uint64_t tx_number, + int64_t tx_term, + uint16_t command_id, + uint64_t key_ts, + uint64_t gap_ts, + uint64_t commit_ts, + const CcEntryAddr &ccentry_addr, + CcHandlerResult &hres, + bool 
is_local = false, + bool need_remote_resp = true, + PostReadType post_read_type = PostReadType::Release, + bool allow_run_on_candidate = false) override; /// /// Starts concurrency control for the input key and returns the key's @@ -146,6 +148,7 @@ class LocalCcHandler : public CcHandler CcProtocol proto = CcProtocol::OCC, bool is_for_write = false, bool is_covering_keys = false, + bool allow_run_on_candidate = false, bool point_read_on_miss = false, int32_t partition_id = -1, bool abort_if_oom = false) override; @@ -171,7 +174,7 @@ class LocalCcHandler : public CcHandler IsolationLevel iso_level = IsolationLevel::RepeatableRead, CcProtocol proto = CcProtocol::Locking, bool is_for_write = false, - bool is_recovering = false, + bool allow_run_on_candidate = false, bool execute_immediately = true) override; std::tuple @@ -194,7 +197,7 @@ class LocalCcHandler : public CcHandler IsolationLevel iso_level = IsolationLevel::RepeatableRead, CcProtocol proto = CcProtocol::Locking, bool is_for_write = false, - bool is_recovring = false) override; + bool allow_run_on_candidate = false) override; void ScanOpen(const TableName &table_name, const uint64_t schema_version, @@ -281,7 +284,8 @@ class LocalCcHandler : public CcHandler void NewTxn(CcHandlerResult &hres, IsolationLevel iso_level, NodeGroupId tx_ng_id, - uint32_t log_group_id) override; + uint32_t log_group_id, + bool allow_run_on_candidate) override; /// /// Sets the commit timestamp of the input tx. 
diff --git a/tx_service/include/cc/local_cc_shards.h b/tx_service/include/cc/local_cc_shards.h index 961bee52..c0cca56d 100644 --- a/tx_service/include/cc/local_cc_shards.h +++ b/tx_service/include/cc/local_cc_shards.h @@ -2449,6 +2449,7 @@ class LocalCcShards bool GetNextRangePartitionId(const TableName &tablename, const TableSchema *table_schema, + NodeGroupId ng_id, uint32_t range_cnt, int32_t &out_next_partition_id); diff --git a/tx_service/include/cc/range_bucket_cc_map.h b/tx_service/include/cc/range_bucket_cc_map.h index 6805aa5f..189f5c68 100644 --- a/tx_service/include/cc/range_bucket_cc_map.h +++ b/tx_service/include/cc/range_bucket_cc_map.h @@ -89,22 +89,23 @@ class RangeBucketCcMap TX_TRACE_DUMP(&req); assert(req.IsLocal()); - uint32_t ng_id = req.NodeGroupId(); - int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id); - if (req.IsInRecovering()) - { - ng_term = Sharder::Instance().CandidateLeaderTerm(ng_id); - } - else - { - ng_term = std::max(ng_term, Sharder::Instance().StandbyNodeTerm()); - } - + int64_t ng_term = req.NodeGroupTerm(); if (ng_term < 0) { - req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return true; + if (req.AllowRunOnCandidate()) + { + ng_term = + Sharder::Instance().CandidateLeaderTerm(req.NodeGroupId()); + } + if (ng_term < 0) + { + ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); + int64_t standby_node_term = + Sharder::Instance().StandbyNodeTerm(); + ng_term = std::max(ng_term, standby_node_term); + } } + assert(ng_term > 0); const RangeBucketKey *bucket_key = static_cast(req.Key()); diff --git a/tx_service/include/cc/range_cc_map.h b/tx_service/include/cc/range_cc_map.h index 5b39fd71..88bd2d28 100644 --- a/tx_service/include/cc/range_cc_map.h +++ b/tx_service/include/cc/range_cc_map.h @@ -217,12 +217,23 @@ class RangeCcMap : public TemplateCcMap assert(this->table_name_ == *req.GetTableName()); CcHandlerResult *hd_result = req.Result(); - int64_t ng_term = 
Sharder::Instance().LeaderTerm(req.NodeGroupId()); + int64_t ng_term = req.NodeGroupTerm(); if (ng_term < 0) { - hd_result->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return true; + if (req.AllowRunOnCandidate()) + { + ng_term = + Sharder::Instance().CandidateLeaderTerm(req.NodeGroupId()); + } + if (ng_term < 0) + { + ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); + int64_t standby_node_term = + Sharder::Instance().StandbyNodeTerm(); + ng_term = std::max(ng_term, standby_node_term); + } } + assert(ng_term > 0); // For range cc maps, we assume that all of a table's ranges are loaded // into memory for caching when the range cc map is initialized. There @@ -399,13 +410,6 @@ class RangeCcMap : public TemplateCcMap */ bool Execute(PostWriteAllCc &req) override { - int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); - if (ng_term < 0) - { - req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return false; - } - // When the commit ts is 0 or the commit type is DowngradeLock, the // request commits nothing and only removes the write intents/locks // acquired earlier. 
@@ -610,6 +614,8 @@ class RangeCcMap : public TemplateCcMap if (txservice_enable_key_cache && this->table_name_.IsBase()) { + int64_t ng_term = req.NodeGroupTerm(); + assert(ng_term > 0); // try to init the key cache for new range if it // lands on this ng for (auto new_range : new_range_entries) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index f2a00630..9ccdeda4 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -199,16 +199,14 @@ class TemplateCcMap : public CcMap const KeyT *target_key = nullptr; KeyT decoded_key; - int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); + int64_t ng_term = req.NodeGroupTerm(); + assert(ng_term > 0); CODE_FAULT_INJECTOR("term_TemplateCcMap_Execute_AcquireCc", { - LOG(INFO) << "FaultInject term_TemplateCcMap_Execute_AcquireCc"; + LOG(INFO) << "FaultInject term_TemplateCcMap_Execute_AcquireCc"; ng_term = -1; - }); - if (ng_term < 0) - { hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); return true; - } + }); if (req.SchemaVersion() != 0 && req.SchemaVersion() != schema_ts_) { @@ -511,14 +509,7 @@ class TemplateCcMap : public CcMap req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); return true; } - }); - - if (!Sharder::Instance().CheckLeaderTerm(cce_addr->NodeGroupId(), - cce_addr->Term())) - { - req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return true; - } + }); const ValueT *commit_val = static_cast(req.Payload()); TxNumber txn = req.Txn(); @@ -702,11 +693,8 @@ class TemplateCcMap : public CcMap bool will_insert = false; uint32_t ng_id = req.NodeGroupId(); - int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id); - if (ng_term < 0) - { - return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - } + int64_t ng_term = req.NodeGroupTerm(); + assert(ng_term > 0); uint16_t tx_core_id = ((req.Txn() >> 32L) & 0x3FF) % shard_->core_cnt_; @@ -996,13 +984,6 @@ class 
TemplateCcMap : public CcMap return false; } - int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); - if (ng_term < 0) - { - req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return true; - } - const KeyT *target_key = nullptr; if (req.Key() != nullptr) { @@ -1241,19 +1222,6 @@ class TemplateCcMap : public CcMap } }); - int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); - - if (!Sharder::Instance().CheckLeaderTerm(cce_addr.NodeGroupId(), - cce_addr.Term()) && - (standby_node_term < 0 || standby_node_term != cce_addr.Term())) - { - LOG(INFO) << "PostReadCc, node_group(#" << cce_addr.NodeGroupId() - << ") term < 0, tx:" << req.Txn() - << " ,cce: " << cce_addr.ExtractCce(); - hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return true; - } - uint64_t key_ts = req.KeyTs(); uint64_t gap_ts = req.GapTs(); uint64_t commit_ts = req.CommitTs(); @@ -1419,15 +1387,20 @@ class TemplateCcMap : public CcMap }); uint32_t ng_id = req.NodeGroupId(); - int64_t ng_term = -1; - if (req.IsInRecovering()) - { - ng_term = Sharder::Instance().CandidateLeaderTerm(ng_id); - } - else + int64_t ng_term = req.NodeGroupTerm(); + if (ng_term < 0) { - ng_term = Sharder::Instance().LeaderTerm(ng_id); - ng_term = std::max(ng_term, Sharder::Instance().StandbyNodeTerm()); + if (req.AllowRunOnCandidate()) + { + ng_term = Sharder::Instance().CandidateLeaderTerm(ng_id); + } + if (ng_term < 0) + { + ng_term = Sharder::Instance().LeaderTerm(ng_id); + int64_t standby_node_term = + Sharder::Instance().StandbyNodeTerm(); + ng_term = std::max(ng_term, standby_node_term); + } } if (ng_term < 0) @@ -5756,6 +5729,15 @@ class TemplateCcMap : public CcMap // round ckpt. need_export = false; } + else if (req.RunOnCandidateNode()) + { + // If the request is running on candidate node(such as + // during ccnode recovery), during the `DataSync` process, + // there may be some ccentry keys that have just been + // replayed from the log. 
These keys can simply be skipped + // during this round of datasync. + need_export = false; + } else { LOG(ERROR) @@ -5786,7 +5768,6 @@ class TemplateCcMap : public CcMap req.accumulated_flush_data_size_[shard_->core_id_] += flush_size; - if (export_result.second) { is_scan_mem_full = true; @@ -5903,15 +5884,6 @@ class TemplateCcMap : public CcMap return false; } - int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); - int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); - int64_t current_term = std::max(ng_term, standby_node_term); - - if (current_term < 0 || current_term != req.node_group_term_) - { - req.SetError(CcErrorCode::TX_NODE_NOT_LEADER); - return false; - } Iterator it; Iterator end_it = End(); @@ -6083,6 +6055,10 @@ class TemplateCcMap : public CcMap << ", data_sync_ts_: " << req.data_sync_ts_; replay_cmds_notnull = true; + int64_t current_term = req.NodeGroupTerm(); + assert(current_term > 0); + int64_t leader_term = req.GetNodeGroupLeaderTerm(); + // The fetch record may failed when the // cce is touch at 1st place, so the record status can // be Unknown. 
If the data is owned by this ng, fetch @@ -6107,12 +6083,12 @@ class TemplateCcMap : public CcMap TxKey(key), cce, cc_ng_id_, - ng_term, + current_term, nullptr, part_id); } } - else if (ng_term > 0) + else if (leader_term > 0) { // After node escalate to leader, and we've loaded // from kv, there should be no gap in the buffered diff --git a/tx_service/include/data_sync_task.h b/tx_service/include/data_sync_task.h index 70972f35..fd6add41 100644 --- a/tx_service/include/data_sync_task.h +++ b/tx_service/include/data_sync_task.h @@ -138,7 +138,8 @@ struct DataSyncTask is_dirty_(is_dirty), sync_ts_adjustable_(need_adjust_ts), task_res_(hres), - need_update_ckpt_ts_(true) + need_update_ckpt_ts_(true), + run_on_leader_node_(true) { } @@ -182,6 +183,11 @@ struct DataSyncTask sync_ts_adjustable_ = false; } + void SetRunOnLeaderNode(bool run_on_leader_node) + { + run_on_leader_node_ = run_on_leader_node; + } + const TableName table_name_; int32_t id_; uint64_t range_version_; @@ -238,6 +244,7 @@ struct DataSyncTask cce_entries_; bool need_update_ckpt_ts_{true}; + bool run_on_leader_node_{true}; }; struct FlushTaskEntry diff --git a/tx_service/include/proto/cc_request.proto b/tx_service/include/proto/cc_request.proto index d02a679f..7ce1f6ee 100644 --- a/tx_service/include/proto/cc_request.proto +++ b/tx_service/include/proto/cc_request.proto @@ -888,6 +888,7 @@ message ReadRequest { uint64 schema_version = 15; int32 partition_id = 16; bool abort_if_oom = 17; + bool allow_run_on_candidate = 18; } message ReadResponse { diff --git a/tx_service/include/remote/remote_cc_handler.h b/tx_service/include/remote/remote_cc_handler.h index 83695f21..8ae9e01b 100644 --- a/tx_service/include/remote/remote_cc_handler.h +++ b/tx_service/include/remote/remote_cc_handler.h @@ -130,6 +130,7 @@ class RemoteCcHandler CcProtocol proto = CcProtocol::OCC, bool is_for_write = false, bool is_covering_keys = false, + bool allow_run_on_candidate = false, bool point_read_on_miss = false, int32_t 
partition_id = -1, bool abort_if_oom = false); diff --git a/tx_service/include/sequences/sequences.h b/tx_service/include/sequences/sequences.h index c459d85b..b5416059 100644 --- a/tx_service/include/sequences/sequences.h +++ b/tx_service/include/sequences/sequences.h @@ -121,6 +121,7 @@ class Sequences int16_t thd_group_id); static bool ApplyIdOfTableRangePartition(const TableName &table, + NodeGroupId ng_id, int64_t desired_vals, int64_t &first_reserved_id, int64_t &reserved_vals, diff --git a/tx_service/include/sharder.h b/tx_service/include/sharder.h index 4586505d..80b33e33 100644 --- a/tx_service/include/sharder.h +++ b/tx_service/include/sharder.h @@ -320,10 +320,13 @@ class Sharder * * @param ng_id The cc node group ID. * @param term The expected leader term. + * @param check_candidate Whether to check the candidate leader term. * @return true, if the current leader is the expected one. * @return false, otherwise. */ - bool CheckLeaderTerm(uint32_t ng_id, int64_t term) const; + bool CheckLeaderTerm(uint32_t ng_id, + int64_t term, + bool check_candidate = false) const; void SetLeaderTerm(NodeGroupId ng_id, int64_t term) { diff --git a/tx_service/include/tx_execution.h b/tx_service/include/tx_execution.h index dc1f81f9..75d2a16c 100644 --- a/tx_service/include/tx_execution.h +++ b/tx_service/include/tx_execution.h @@ -183,7 +183,8 @@ class TransactionExecution : public LinkedTransaction NodeGroupId tx_ng_id = UINT32_MAX, bool start_now = false, const std::function *yield_func = nullptr, - const std::function *resume_func = nullptr); + const std::function *resume_func = nullptr, + bool allow_run_on_candidate = false); std::unique_ptr init_tx_req_; bool CommitTx(CommitTxRequest &commit_req); std::unique_ptr commit_tx_req_; @@ -268,7 +269,8 @@ class TransactionExecution : public LinkedTransaction bool CheckLeaderTerm() const { NodeGroupId ng_id = TxCcNodeId(); - if (Sharder::Instance().CheckLeaderTerm(ng_id, TxTerm()) || + if 
(Sharder::Instance().CheckLeaderTerm( + ng_id, TxTerm(), allow_run_on_candidate_) || (TxStatus() == TxnStatus::Recovering && Sharder::Instance().CandidateLeaderTerm(ng_id) >= 0)) { @@ -686,6 +688,8 @@ class TransactionExecution : public LinkedTransaction bool bind_to_ext_proc_{false}; + bool allow_run_on_candidate_{false}; + // Initialization phase. InitTxnOperation init_txn_; diff --git a/tx_service/include/tx_request.h b/tx_service/include/tx_request.h index 579d24da..121bb08b 100644 --- a/tx_service/include/tx_request.h +++ b/tx_service/include/tx_request.h @@ -193,12 +193,14 @@ struct InitTxRequest : public TemplateTxRequest const std::function *resume_fptr = nullptr, TransactionExecution *txm = nullptr, uint32_t tx_ng_id = UINT32_MAX, - uint32_t log_group_id = UINT32_MAX) + uint32_t log_group_id = UINT32_MAX, + bool allow_run_on_candidate = false) : TemplateTxRequest(yield_fptr, resume_fptr, txm), iso_level_(level), protocol_(proto), tx_ng_id_(tx_ng_id), - log_group_id_(log_group_id) + log_group_id_(log_group_id), + allow_run_on_candidate_(allow_run_on_candidate) { } @@ -208,6 +210,7 @@ struct InitTxRequest : public TemplateTxRequest CcProtocol protocol_{CcProtocol::OCC}; uint32_t tx_ng_id_{UINT32_MAX}; uint32_t log_group_id_{UINT32_MAX}; + bool allow_run_on_candidate_{false}; }; struct ReadTxRequest @@ -223,7 +226,6 @@ struct ReadTxRequest bool read_local = false, uint64_t ts = 0, bool is_covering_keys = false, - bool is_recovering = false, bool point_read_on_cache_miss = false, const std::function *yield_fptr = nullptr, const std::function *resume_fptr = nullptr, @@ -239,7 +241,6 @@ struct ReadTxRequest ts_(ts), schema_version_(schema_version), is_covering_keys_(is_covering_keys), - is_recovering_(is_recovering), point_read_on_cache_miss_(point_read_on_cache_miss) { } @@ -253,7 +254,6 @@ struct ReadTxRequest bool read_local = false, uint64_t ts = 0, bool is_covering_keys = false, - bool is_recovering = false, bool point_read_on_cache_miss = false) { 
tab_name_ = tab_name; @@ -266,7 +266,6 @@ struct ReadTxRequest ts_ = ts; schema_version_ = schema_version; is_covering_keys_ = is_covering_keys; - is_recovering_ = is_recovering; point_read_on_cache_miss_ = point_read_on_cache_miss; } @@ -279,7 +278,6 @@ struct ReadTxRequest bool read_local = false, uint64_t ts = 0, bool is_covering_keys = false, - bool is_recovering = false, bool point_read_on_cache_miss = false) { tab_name_ = tab_name; @@ -292,7 +290,6 @@ struct ReadTxRequest ts_ = ts; schema_version_ = schema_version; is_covering_keys_ = is_covering_keys; - is_recovering_ = is_recovering; point_read_on_cache_miss_ = point_read_on_cache_miss; } @@ -333,11 +330,6 @@ struct ReadTxRequest // For unique_sk point query bool is_covering_keys_; - // If this is a read request for recovering. If true we should - // rely on candidate leader term instead of current term to decide - // if node is valid leader. - bool is_recovering_; - bool point_read_on_cache_miss_; }; diff --git a/tx_service/include/tx_util.h b/tx_service/include/tx_util.h index 5e599b8b..f2d51a93 100644 --- a/tx_service/include/tx_util.h +++ b/tx_service/include/tx_util.h @@ -66,7 +66,8 @@ static inline TransactionExecution *NewTxInit( int16_t group_id = -1, bool start_now = false, const std::function *yield_fptr = nullptr, - const std::function *resume_fptr = nullptr) + const std::function *resume_fptr = nullptr, + bool allow_run_on_candidate = false) { assert(tx_service != nullptr); txservice::TransactionExecution *txm = nullptr; @@ -75,7 +76,13 @@ static inline TransactionExecution *NewTxInit( #else txm = tx_service->NewTx(); #endif - txm->InitTx(level, proto, tx_owner, start_now, yield_fptr, resume_fptr); + txm->InitTx(level, + proto, + tx_owner, + start_now, + yield_fptr, + resume_fptr, + allow_run_on_candidate); return txm; } diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index ead420ac..669eab80 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ 
b/tx_service/src/cc/cc_req_misc.cpp @@ -1168,8 +1168,36 @@ bool RunOnTxProcessorCc::Execute(CcShard &ccs) return true; } +bool UpdateCceCkptTsCc::ValidTermCheck() const +{ + int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t candidate_ng_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + ng_term = std::max(ng_term, candidate_ng_term); + int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); + int64_t current_term = std::max(ng_term, standby_node_term); + + if (current_term < 0 || current_term != term_) + { + LOG(INFO) + << "UpdateCceCkptTsCc::ValidTermCheck failed with current term: " + << current_term << ", term_: " << term_; + return false; + } + + return true; +} + bool UpdateCceCkptTsCc::Execute(CcShard &ccs) { + if (!ValidTermCheck()) + { + LOG(INFO) << "UpdateCceCkptTsCc::ValidTermCheck failed on shard: " + << ccs.core_id_; + SetFinished(); + return false; + } + assert(indices_.count(ccs.core_id_) > 0); auto &index = indices_[ccs.core_id_]; @@ -1183,16 +1211,6 @@ bool UpdateCceCkptTsCc::Execute(CcShard &ccs) return false; } - int64_t ng_leader_term = Sharder::Instance().LeaderTerm(node_group_id_); - int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); - int64_t current_term = std::max(ng_leader_term, standby_node_term); - - if (current_term < 0 || current_term != term_) - { - SetFinished(); - return false; - } - size_t last_index = std::min(index + SCAN_BATCH_SIZE, records.size()); CcMap *ccm = ccs.GetCcm(table_name_, node_group_id_); diff --git a/tx_service/src/cc/local_cc_handler.cpp b/tx_service/src/cc/local_cc_handler.cpp index 9dd7962d..632ea213 100644 --- a/tx_service/src/cc/local_cc_handler.cpp +++ b/tx_service/src/cc/local_cc_handler.cpp @@ -62,7 +62,8 @@ void txservice::LocalCcHandler::AcquireWrite( uint32_t hd_res_idx, CcProtocol proto, IsolationLevel iso_level, - bool abort_if_oom) + bool abort_if_oom, + bool allow_run_on_candidate) { uint32_t ng_id = 
Sharder::Instance().ShardToCcNodeGroup(key_shard_code); AcquireKeyResult &acquire_result = hres.Value()[hd_res_idx]; @@ -92,7 +93,8 @@ void txservice::LocalCcHandler::AcquireWrite( hd_res_idx, proto, iso_level, - abort_if_oom); + abort_if_oom, + allow_run_on_candidate); TX_TRACE_ACTION(this, req); TX_TRACE_DUMP(req); @@ -274,7 +276,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostWrite( const TxRecord *record, OperationType operation_type, uint32_t key_shard_code, - CcHandlerResult &hres) + CcHandlerResult &hres, + bool allow_run_on_candidate) { uint32_t ng_id = cce_addr.NodeGroupId(); uint32_t dest_node_id = Sharder::Instance().LeaderNodeId(ng_id); @@ -293,7 +296,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostWrite( record, operation_type, key_shard_code, - &hres); + &hres, + allow_run_on_candidate); TX_TRACE_ACTION(this, req); TX_TRACE_DUMP(req); cc_shards_.EnqueueCcRequest(thd_id_, cce_addr.CoreId(), req); @@ -328,7 +332,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostRead( CcHandlerResult &hres, bool is_local, bool need_remote_resp, - PostReadType post_read_type) + PostReadType post_read_type, + bool allow_run_on_candidate) { if (IsStandbyTx(tx_term)) { @@ -369,7 +374,8 @@ txservice::CcReqStatus txservice::LocalCcHandler::PostRead( key_ts, gap_ts, post_read_type, - &hres); + &hres, + allow_run_on_candidate); TX_TRACE_ACTION(this, req); TX_TRACE_DUMP(req); if (thd_id_ == cce_addr.CoreId()) @@ -434,6 +440,7 @@ void txservice::LocalCcHandler::Read(const TableName &table_name, CcProtocol proto, bool is_for_write, bool is_covering_keys, + bool allow_run_on_candidate, bool point_read_on_miss, int32_t partition_id, bool abort_if_oom) @@ -469,7 +476,7 @@ void txservice::LocalCcHandler::Read(const TableName &table_name, is_for_write, is_covering_keys, nullptr, - false, + allow_run_on_candidate, point_read_on_miss, partition_id, abort_if_oom); @@ -498,6 +505,7 @@ void txservice::LocalCcHandler::Read(const TableName &table_name, proto, 
is_for_write, is_covering_keys, + allow_run_on_candidate, point_read_on_miss, partition_id, abort_if_oom); @@ -587,7 +595,7 @@ bool txservice::LocalCcHandler::ReadLocal(const TableName &table_name, IsolationLevel iso_level, CcProtocol proto, bool is_for_write, - bool is_recovering, + bool allow_run_on_candidate, bool execute_immediately) { ReadKeyResult &read_result = hres.Value(); @@ -609,14 +617,15 @@ bool txservice::LocalCcHandler::ReadLocal(const TableName &table_name, { ccs = cc_shards_.cc_shards_[thd_id_].get(); } - int64_t term; + int64_t term = -1; uint32_t shard_code = tx_number >> 32L; uint32_t cc_ng_id = shard_code >> 10; - if (is_recovering) + if (allow_run_on_candidate) { term = Sharder::Instance().CandidateLeaderTerm(cc_ng_id); } - else + + if (term < 0) { int64_t ng_leader_term = Sharder::Instance().LeaderTerm(cc_ng_id); int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); @@ -659,7 +668,7 @@ bool txservice::LocalCcHandler::ReadLocal(const TableName &table_name, is_for_write, false, nullptr, - is_recovering); + allow_run_on_candidate); TX_TRACE_ACTION(this, read_req); TX_TRACE_DUMP(read_req); @@ -732,7 +741,7 @@ bool txservice::LocalCcHandler::ReadLocal(const TableName &table_name, IsolationLevel iso_level, CcProtocol proto, bool is_for_write, - bool is_recovering) + bool allow_run_on_candidate) { ReadKeyResult &read_result = hres.Value(); read_result.rec_ = &record; @@ -756,7 +765,7 @@ bool txservice::LocalCcHandler::ReadLocal(const TableName &table_name, int64_t term = -1; uint32_t shard_code = tx_number >> 32L; uint32_t cc_ng_id = shard_code >> 10; - if (is_recovering) + if (allow_run_on_candidate) { term = Sharder::Instance().CandidateLeaderTerm(cc_ng_id); } @@ -1379,11 +1388,17 @@ void txservice::LocalCcHandler::ScanClose(const TableName &table_name, void txservice::LocalCcHandler::NewTxn(CcHandlerResult &hres, IsolationLevel iso_level, NodeGroupId tx_ng_id, - uint32_t log_group_id) + uint32_t log_group_id, + bool 
allow_run_on_candidate) { CcShard &ccs = *(cc_shards_.cc_shards_[thd_id_]); int64_t term = Sharder::Instance().LeaderTerm(tx_ng_id); + if (term < 0 && allow_run_on_candidate) + { + term = Sharder::Instance().CandidateLeaderTerm(tx_ng_id); + } + if (term < 0) { term = Sharder::Instance().StandbyNodeTerm(); diff --git a/tx_service/src/cc/local_cc_shards.cpp b/tx_service/src/cc/local_cc_shards.cpp index 2c1c9a76..9316fa19 100644 --- a/tx_service/src/cc/local_cc_shards.cpp +++ b/tx_service/src/cc/local_cc_shards.cpp @@ -716,8 +716,7 @@ void LocalCcShards::CreateSchemaRecoveryTx( false, true, 0, - false, - true); + false); txm->Execute(&read_req); read_req.Wait(); if (read_req.IsError()) @@ -820,8 +819,7 @@ void LocalCcShards::CreateSplitRangeRecoveryTx( false, true, 0, - false, - true); + false); txm->Execute(&read_req); read_req.Wait(); // Only case we fail here is that leader gone before replay @@ -3201,7 +3199,9 @@ void LocalCcShards::PostProcessFlushTaskEntries( } if (!Sharder::Instance().CheckLeaderTerm( - task->node_group_id_, task->node_group_term_)) + task->node_group_id_, + task->node_group_term_, + true)) { err_code = CcErrorCode::REQUESTED_NODE_NOT_LEADER; } @@ -3251,7 +3251,9 @@ void LocalCcShards::PostProcessFlushTaskEntries( { if (!task->during_split_range_) { - range_entry->UpdateLastDataSyncTS(task->data_sync_ts_); + uint64_t last_sync_ts = + task->run_on_leader_node_ ? 
task->data_sync_ts_ : 0; + range_entry->UpdateLastDataSyncTS(last_sync_ts); range_entry->UnPinStoreRange(); // Commit the data sync txm txservice::CommitTx(entry->data_sync_txm_); @@ -3287,7 +3289,7 @@ void LocalCcShards::PostProcessFlushTaskEntries( } if (!Sharder::Instance().CheckLeaderTerm( - task->node_group_id_, task->node_group_term_)) + task->node_group_id_, task->node_group_term_, true)) { err_code = CcErrorCode::REQUESTED_NODE_NOT_LEADER; } @@ -3390,7 +3392,7 @@ void LocalCcShards::PostProcessRangePartitionDataSyncTask( << task->table_name_.Trace() << ", retrying."; std::this_thread::sleep_for(1s); if (!Sharder::Instance().CheckLeaderTerm( - task->node_group_id_, task->node_group_term_)) + task->node_group_id_, task->node_group_term_, true)) { LOG(ERROR) << "Leader term changed during store slice update"; @@ -3406,7 +3408,9 @@ void LocalCcShards::PostProcessRangePartitionDataSyncTask( { if (!task->during_split_range_) { - range_entry->UpdateLastDataSyncTS(task->data_sync_ts_); + uint64_t last_sync_ts = + task->run_on_leader_node_ ? task->data_sync_ts_ : 0; + range_entry->UpdateLastDataSyncTS(last_sync_ts); range_entry->UnPinStoreRange(); // Commit the data sync txm txservice::CommitTx(data_sync_txm); @@ -3461,8 +3465,8 @@ void LocalCcShards::PostProcessRangePartitionDataSyncTask( task->id_); } - if (!Sharder::Instance().CheckLeaderTerm(task->node_group_id_, - task->node_group_term_)) + if (!Sharder::Instance().CheckLeaderTerm( + task->node_group_id_, task->node_group_term_, true)) { err_code = CcErrorCode::REQUESTED_NODE_NOT_LEADER; } @@ -3627,6 +3631,7 @@ void LocalCcShards::DataSyncForRangePartition( // might miss the data that has not been recovered yet. 
data_sync_task->SetErrorCode( CcErrorCode::REQUESTED_NODE_NOT_LEADER); + data_sync_task->SetRunOnLeaderNode(false); } defer_unpin = std::shared_ptr( @@ -3638,7 +3643,12 @@ void LocalCcShards::DataSyncForRangePartition( data_sync_txm = txservice::NewTxInit(tx_service_, IsolationLevel::RepeatableRead, CcProtocol::Locking, - ng_id); + ng_id, + -1, + false, + nullptr, + nullptr, + true); if (data_sync_txm == nullptr) { @@ -3700,7 +3710,8 @@ void LocalCcShards::DataSyncForRangePartition( { LOG(ERROR) << "DataSync add read lock on table failed, " "table name: " - << table_key.Name().StringView(); + << table_key.Name().StringView() << ", error code: " + << static_cast(read_req.ErrorCode()); // If read lock acquire failed, retry next time. // Put back into the beginning. @@ -3838,6 +3849,10 @@ void LocalCcShards::DataSyncForRangePartition( assert(store_range); last_sync_ts = is_dirty ? 0 : range_entry->GetLastSyncTs(); + if (Sharder::Instance().CandidateLeaderTerm(ng_id) > 0) + { + data_sync_task->SetRunOnLeaderNode(false); + } } if (table_name.IsBase()) @@ -3912,7 +3927,10 @@ void LocalCcShards::DataSyncForRangePartition( txservice::CommitTx(data_sync_txm); // Update the task status for this range. - range_entry->UpdateLastDataSyncTS(data_sync_task->data_sync_ts_); + uint64_t last_sync_ts = data_sync_task->run_on_leader_node_ + ? data_sync_task->data_sync_ts_ + : 0; + range_entry->UpdateLastDataSyncTS(last_sync_ts); // Generally, the StoreRange will be pinned only when there are data // items in the range that needs to be ckpted. 
if (scan_delta_size_cc.StoreRangePtr() != nullptr) @@ -4069,7 +4087,8 @@ void LocalCcShards::DataSyncForRangePartition( false, store_range, &slices_delta_size, - schema_version); + schema_version, + !data_sync_task->run_on_leader_node_); { // DataSync Worker will call PostProcessDataSyncTask() to decrement @@ -4468,7 +4487,7 @@ void LocalCcShards::PostProcessHashPartitionDataSyncTask( // Make sure that the term has not changed so that catalog entry // is still valid. if (!Sharder::Instance().CheckLeaderTerm( - task->node_group_id_, task->node_group_term_)) + task->node_group_id_, task->node_group_term_, true)) { err_code = CcErrorCode::NG_TERM_CHANGED; } @@ -4492,7 +4511,9 @@ void LocalCcShards::PostProcessHashPartitionDataSyncTask( // term has not changed. catalog entry should not be // nullptr. assert(catalog_entry); - catalog_entry->UpdateLastDataSyncTS(task->data_sync_ts_, + uint64_t last_sync_ts = + task->run_on_leader_node_ ? task->data_sync_ts_ : 0; + catalog_entry->UpdateLastDataSyncTS(last_sync_ts, task->id_); } } @@ -4673,6 +4694,7 @@ void LocalCcShards::DataSyncForHashPartition( // might miss the data that has not been recovered yet. data_sync_task->SetErrorCode( CcErrorCode::REQUESTED_NODE_NOT_LEADER); + data_sync_task->SetRunOnLeaderNode(false); } } @@ -4698,7 +4720,12 @@ void LocalCcShards::DataSyncForHashPartition( txservice::NewTxInit(tx_service_, IsolationLevel::RepeatableRead, CcProtocol::Locking, - ng_id); + ng_id, + -1, + false, + nullptr, + nullptr, + true); if (data_sync_txm == nullptr) { @@ -5451,6 +5478,19 @@ void LocalCcShards::UpdateSlices(const TableName &table_name, paused_subslice_post_ckpt_size < avg_subslice_size ? paused_subslice_post_ckpt_size : 0; + // For very large slices that require multiple rounds of data sync + // scan, each round flushes exported keys to storage and updates the + // checkpoint timestamp, allowing data to be evicted from memory. + // When a scan round finishes, the pinned slice is unpinned. 
In + // subsequent rounds, the slice is re-pinned, triggering a load + // operation that updates `StoreSlice::size_` with the flushed keys' + // size. This can cause a mismatch with the slice size from previous + // batches. + if (slice_split_keys.size() > 0 && + slice_split_keys.back().cur_size_ != subslice_size) + { + subslice_size = slice_split_keys.back().cur_size_; + } } else { @@ -5613,8 +5653,11 @@ void LocalCcShards::SplitFlushRange( // by data store are always unique. std::vector> new_range_ids; int32_t new_part_id; - if (!GetNextRangePartitionId( - table_name, table_schema.get(), split_keys.size(), new_part_id)) + if (!GetNextRangePartitionId(table_name, + table_schema.get(), + node_group, + split_keys.size(), + new_part_id)) { LOG(ERROR) << "Split range failed due to unable to get next " "partition id. table_name = " @@ -5691,7 +5734,9 @@ void LocalCcShards::SplitFlushRange( return; } - range_entry->UpdateLastDataSyncTS(data_sync_task->data_sync_ts_); + uint64_t last_sync_ts = + data_sync_task->run_on_leader_node_ ? 
data_sync_task->data_sync_ts_ : 0; + range_entry->UpdateLastDataSyncTS(last_sync_ts); range_entry->UnPinStoreRange(); data_sync_task->SetFinish(); @@ -6683,6 +6728,7 @@ void LocalCcShards::ReportHashPartitionCkptHeapUsage() bool LocalCcShards::GetNextRangePartitionId(const TableName &tablename, const TableSchema *table_schema, + NodeGroupId ng_id, uint32_t range_cnt, int32_t &out_next_partition_id) { @@ -6698,8 +6744,12 @@ bool LocalCcShards::GetNextRangePartitionId(const TableName &tablename, } int64_t first_reserved_id = -1; - bool res = Sequences::ApplyIdOfTableRangePartition( - tablename, range_cnt, first_reserved_id, reserved_cnt, key_schema_ts); + bool res = Sequences::ApplyIdOfTableRangePartition(tablename, + ng_id, + range_cnt, + first_reserved_id, + reserved_cnt, + key_schema_ts); if (!res) { diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index 7de601aa..ce7acde6 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -251,8 +251,8 @@ void Checkpointer::Ckpt(bool is_last_ckpt) for (auto it = tables.begin(); it != tables.end(); ++it) { // Check leader term for leader node - if (!is_standby_node && - Sharder::Instance().LeaderTerm(node_group) != leader_term) + if (!is_standby_node && !Sharder::Instance().CheckLeaderTerm( + node_group, leader_term, true)) { break; } @@ -307,7 +307,7 @@ void Checkpointer::Ckpt(bool is_last_ckpt) // Check leadter term for leader node if (!is_standby_node && - Sharder::Instance().LeaderTerm(node_group) != leader_term) + !Sharder::Instance().CheckLeaderTerm(node_group, leader_term, true)) { continue; } diff --git a/tx_service/src/data_sync_task.cpp b/tx_service/src/data_sync_task.cpp index d0028412..727e5aae 100644 --- a/tx_service/src/data_sync_task.cpp +++ b/tx_service/src/data_sync_task.cpp @@ -79,7 +79,8 @@ DataSyncTask::DataSyncTask(const TableName &table_name, range_entry_(range_entry), during_split_range_(true), export_base_table_items_(export_base_table_items), 
- tx_number_(txn) + tx_number_(txn), + run_on_leader_node_(true) { assert(!table_name_.IsHashPartitioned()); if (start_key_.KeyPtr() == diff --git a/tx_service/src/remote/remote_cc_handler.cpp b/tx_service/src/remote/remote_cc_handler.cpp index 848ae8f7..075707c8 100644 --- a/tx_service/src/remote/remote_cc_handler.cpp +++ b/tx_service/src/remote/remote_cc_handler.cpp @@ -326,6 +326,7 @@ void txservice::remote::RemoteCcHandler::Read( CcProtocol proto, bool is_for_write, bool is_covering_keys, + bool allow_run_on_candidate, bool point_read_on_miss, int32_t partition_id, bool abort_if_oom) @@ -381,6 +382,7 @@ void txservice::remote::RemoteCcHandler::Read( read->set_ts(ts); read->set_schema_version(schema_version); read->set_abort_if_oom(abort_if_oom); + read->set_allow_run_on_candidate(allow_run_on_candidate); stream_sender_.SendMessageToNg(dest_ng_id, send_msg, &hres); } diff --git a/tx_service/src/remote/remote_cc_request.cpp b/tx_service/src/remote/remote_cc_request.cpp index 32fbb935..dfa95b25 100644 --- a/tx_service/src/remote/remote_cc_request.cpp +++ b/tx_service/src/remote/remote_cc_request.cpp @@ -462,6 +462,7 @@ void txservice::remote::RemoteRead::Reset(std::unique_ptr input_msg) req.is_for_write(), req.is_covering_keys(), nullptr, + req.allow_run_on_candidate(), req.point_read_on_miss(), req.partition_id(), req.abort_if_oom()); diff --git a/tx_service/src/sequences/sequences.cpp b/tx_service/src/sequences/sequences.cpp index 3154bcb4..d8e01897 100644 --- a/tx_service/src/sequences/sequences.cpp +++ b/tx_service/src/sequences/sequences.cpp @@ -74,7 +74,6 @@ bool Sequences::DeleteSequenceInternal(const std::string &seq_name, 0, false, false, - false, nullptr, nullptr, txm); @@ -276,7 +275,6 @@ int Sequences::ApplySequenceBatch( 0, false, false, - false, coro_functors.first, coro_functors.second, txm); @@ -303,7 +301,6 @@ int Sequences::ApplySequenceBatch( false, 0, false, - false, true, coro_functors.first, coro_functors.second, @@ -472,7 +469,6 @@ int 
Sequences::UpdateAutoIncrement( false, 0, false, - false, true, coro_functors.first, coro_functors.second, @@ -548,6 +544,7 @@ int Sequences::UpdateAutoIncrement( } bool Sequences::ApplyIdOfTableRangePartition(const TableName &table, + NodeGroupId ng_id, int64_t desired_vals, int64_t &first_reserved_id, int64_t &reserved_vals, @@ -558,7 +555,13 @@ bool Sequences::ApplyIdOfTableRangePartition(const TableName &table, TransactionExecution *txm = NewTxInit(instance_->tx_service_, txservice::IsolationLevel::RepeatableRead, - txservice::CcProtocol::Locking); + txservice::CcProtocol::Locking, + ng_id, + -1, + false, + nullptr, + nullptr, + true); if (txm == nullptr) { @@ -578,7 +581,6 @@ bool Sequences::ApplyIdOfTableRangePartition(const TableName &table, 0, false, false, - false, nullptr, nullptr, txm); @@ -605,7 +607,6 @@ bool Sequences::ApplyIdOfTableRangePartition(const TableName &table, false, 0, false, - false, true, nullptr, nullptr, @@ -709,7 +710,6 @@ bool Sequences::InitIdOfTableRangePartition(const TableName &table, 0, false, false, - false, nullptr, nullptr, txm); @@ -736,7 +736,6 @@ bool Sequences::InitIdOfTableRangePartition(const TableName &table, false, 0, false, - false, true, nullptr, nullptr, @@ -829,7 +828,6 @@ bool Sequences::InitIdOfAutoIncrementColumn(const TableName &table_name) 0, false, false, - false, nullptr, nullptr, txm); diff --git a/tx_service/src/sharder.cpp b/tx_service/src/sharder.cpp index b1e5df45..998b9b33 100644 --- a/tx_service/src/sharder.cpp +++ b/tx_service/src/sharder.cpp @@ -614,9 +614,15 @@ std::shared_ptr Sharder::UpdateCcNodeServiceChannel( } } -bool Sharder::CheckLeaderTerm(uint32_t ng_id, int64_t term) const +bool Sharder::CheckLeaderTerm(uint32_t ng_id, + int64_t term, + bool check_candidate) const { int64_t node_term = LeaderTerm(ng_id); + if (node_term < 0 && check_candidate) + { + node_term = CandidateLeaderTerm(ng_id); + } if (node_term < 0) { diff --git a/tx_service/src/tx_execution.cpp 
b/tx_service/src/tx_execution.cpp index 24553f56..be14d6b7 100644 --- a/tx_service/src/tx_execution.cpp +++ b/tx_service/src/tx_execution.cpp @@ -179,6 +179,7 @@ void TransactionExecution::Reset() tx_req_queue_.Reset(); req_queue_lock_.Unlock(); } + allow_run_on_candidate_ = false; ClearCachedBucketInfos(); lock_cluster_config_op_.Reset(); @@ -244,6 +245,7 @@ void TransactionExecution::SetRecoverTxState(uint64_t txn, tx_number_.store(txn, std::memory_order_relaxed); tx_term_ = tx_term; commit_ts_ = commit_ts; + allow_run_on_candidate_ = true; tx_status_.store(TxnStatus::Recovering, std::memory_order_relaxed); } @@ -551,12 +553,19 @@ void TransactionExecution::InitTx(IsolationLevel iso_level, NodeGroupId tx_ng_id, bool start_now, const std::function *yield_func, - const std::function *resume_func) + const std::function *resume_func, + bool allow_run_on_candidate) { if (start_now) { - InitTxRequest init_tx_req{ - iso_level, protocol, yield_func, resume_func, this, tx_ng_id}; + InitTxRequest init_tx_req{iso_level, + protocol, + yield_func, + resume_func, + this, + tx_ng_id, + UINT32_MAX, + allow_run_on_candidate}; Execute(&init_tx_req); init_tx_req.Wait(); } @@ -567,6 +576,7 @@ void TransactionExecution::InitTx(IsolationLevel iso_level, init_tx_req_->protocol_ = protocol; init_tx_req_->tx_ng_id_ = tx_ng_id; init_tx_req_->txm_ = this; + init_tx_req_->allow_run_on_candidate_ = allow_run_on_candidate; Execute(init_tx_req_.get()); } } @@ -735,6 +745,7 @@ void TransactionExecution::ProcessTxRequest(InitTxRequest &init_txn_req) : init_txn_req.tx_ng_id_; init_txn_.log_group_id_ = init_txn_req.log_group_id_; + allow_run_on_candidate_ = init_txn_req.allow_run_on_candidate_; PushOperation(&init_txn_); Process(init_txn_); } @@ -1743,7 +1754,8 @@ void TransactionExecution::Process(InitTxnOperation &init_txn) cc_handler_->NewTxn(init_txn.hd_result_, iso_level_, init_txn.tx_ng_id_, - init_txn.log_group_id_); + init_txn.log_group_id_, + allow_run_on_candidate_); } void 
TransactionExecution::PostProcess(InitTxnOperation &init_txn) @@ -1866,7 +1878,7 @@ void TransactionExecution::Process(ReadOperation &read) read.iso_level_, read.protocol_, read.read_tx_req_->is_for_write_, - read.read_tx_req_->is_recovering_); + allow_run_on_candidate_); } else { @@ -1883,7 +1895,7 @@ void TransactionExecution::Process(ReadOperation &read) read.iso_level_, read.protocol_, read.read_tx_req_->is_for_write_, - read.read_tx_req_->is_recovering_); + allow_run_on_candidate_); } if (finished) @@ -1996,6 +2008,9 @@ void TransactionExecution::Process(ReadOperation &read) uint32_t residual = key_hash & 0x3FF; NodeGroupId bucket_ng = bucket_info->BucketOwner(); key_shard_code = bucket_ng << 10 | residual; + + // Set the lock range bucket result + lock_range_bucket_result_.SetFinished(); } else if (!lock_range_bucket_result_.IsFinished()) { @@ -2076,6 +2091,7 @@ void TransactionExecution::Process(ReadOperation &read) read.protocol_, read.read_tx_req_->is_for_write_, is_covering_keys, + allow_run_on_candidate_, read.read_tx_req_->point_read_on_cache_miss_, partition_id, HoldingRangeReadLock()); @@ -2309,7 +2325,7 @@ void TransactionExecution::Process(ReadLocalOperation &lock_local) IsolationLevel::RepeatableRead, CcProtocol::Locking, false, - false, + allow_run_on_candidate_, lock_local.execute_immediately_); if (finished) { @@ -4047,7 +4063,8 @@ void TransactionExecution::Process(AcquireWriteOperation &acquire_write) res_idx++, protocol_, iso_level_, - abort_if_oom); + abort_if_oom, + allow_run_on_candidate_); for (auto &[forward_shard_code, cce_addr] : write_entry.forward_addr_) { @@ -4065,7 +4082,8 @@ void TransactionExecution::Process(AcquireWriteOperation &acquire_write) res_idx++, protocol_, iso_level_, - abort_if_oom); + abort_if_oom, + allow_run_on_candidate_); } } } @@ -4358,7 +4376,11 @@ void TransactionExecution::Process(ValidateOperation &validate) 0, commit_ts_, cce_addr, - validate.hd_result_); + validate.hd_result_, + false, + true, + 
PostReadType::Release, + allow_run_on_candidate_); if (ret == CcReqStatus::SentLocal) { @@ -5297,7 +5319,8 @@ void TransactionExecution::Process(PostProcessOp &post_process) write_entry.rec_.get(), write_entry.op_, write_entry.key_shard_code_, - post_process.hd_result_); + post_process.hd_result_, + allow_run_on_candidate_); update_post_cnt(ret); for (auto &[forward_shard_code, cce_addr] : @@ -5312,7 +5335,8 @@ void TransactionExecution::Process(PostProcessOp &post_process) write_entry.rec_.get(), write_entry.op_, forward_shard_code, - post_process.hd_result_); + post_process.hd_result_, + allow_run_on_candidate_); update_post_cnt(ret); } } @@ -5336,7 +5360,8 @@ void TransactionExecution::Process(PostProcessOp &post_process) nullptr, OperationType::CommitCommands, 0, - post_process.hd_result_); + post_process.hd_result_, + allow_run_on_candidate_); update_post_cnt(ret); if (cmd_set_entry.forward_entry_ != nullptr && @@ -5388,7 +5413,8 @@ void TransactionExecution::Process(PostProcessOp &post_process) nullptr, write_entry.op_, write_entry.key_shard_code_, - post_process.hd_result_); + post_process.hd_result_, + allow_run_on_candidate_); update_post_cnt(ret); } // Keys that were not successfully locked in the cc @@ -5409,7 +5435,8 @@ void TransactionExecution::Process(PostProcessOp &post_process) nullptr, write_entry.op_, forward_shard_code, - post_process.hd_result_); + post_process.hd_result_, + allow_run_on_candidate_); update_post_cnt(ret); } } @@ -5435,7 +5462,8 @@ void TransactionExecution::Process(PostProcessOp &post_process) nullptr, OperationType::CommitCommands, 0, - post_process.hd_result_); + post_process.hd_result_, + allow_run_on_candidate_); update_post_cnt(ret); if (cmd_set_entry.forward_entry_ != nullptr && @@ -5450,7 +5478,8 @@ void TransactionExecution::Process(PostProcessOp &post_process) nullptr, OperationType::CommitCommands, 0, - post_process.hd_result_); + post_process.hd_result_, + allow_run_on_candidate_); update_post_cnt(ret); } } @@ 
-5473,7 +5502,8 @@ void TransactionExecution::Process(PostProcessOp &post_process) 0, 0, cce_addr, - post_process.hd_result_); + post_process.hd_result_, + allow_run_on_candidate_); update_post_cnt(ret); } } @@ -5758,7 +5788,10 @@ void TransactionExecution::ReleaseMetaDataReadLock( commit_ts_, cce_addr, meta_data_hd_result, - true); + true, + true, + PostReadType::Release, + allow_run_on_candidate_); --ref_cnt; if (ret == CcReqStatus::SentLocal) @@ -6342,7 +6375,11 @@ void TransactionExecution::Process(PostReadOperation &post_read_operation) 0, commit_ts_, *post_read_operation.cce_addr_, - post_read_operation.hd_result_); + post_read_operation.hd_result_, + false, + true, + PostReadType::Release, + allow_run_on_candidate_); if (ret == CcReqStatus::Processed) { AdvanceCommand(); @@ -7415,7 +7452,8 @@ void TransactionExecution::Process(CmdForwardAcquireWriteOp &forward_acquire) res_idx++, protocol_, iso_level_, - abort_if_oom); + abort_if_oom, + allow_run_on_candidate_); } } @@ -7740,6 +7778,7 @@ void TransactionExecution::Process(BatchReadOperation &batch_read_op) { partition_id = batch_read_op.range_ids_[idx]; } + cc_handler_->Read( table_name, batch_read_op.batch_read_tx_req_->schema_version_, @@ -7757,6 +7796,7 @@ void TransactionExecution::Process(BatchReadOperation &batch_read_op) protocol_, batch_read_op.batch_read_tx_req_->is_for_write_, false, // is_covering_keys + allow_run_on_candidate_, batch_read_op.batch_read_tx_req_->point_read_on_cache_miss_, partition_id, abort_if_oom); diff --git a/tx_service/src/tx_operation.cpp b/tx_service/src/tx_operation.cpp index 663d50c1..d991b9a9 100644 --- a/tx_service/src/tx_operation.cpp +++ b/tx_service/src/tx_operation.cpp @@ -143,7 +143,6 @@ void ReadOperation::Forward(TransactionExecution *txm) { if (!read_tx_req_->read_local_) { - // Just returned from lock_bucket_op_, check lock_bucket_result_. 
assert(lock_range_bucket_result_->IsFinished()); if (lock_range_bucket_result_->IsError()) { @@ -4221,7 +4220,9 @@ void SplitFlushRangeOp::Forward(TransactionExecution *txm) if (lock_cluster_config_op_.hd_result_->IsError()) { LOG(ERROR) << "Split Flush read cluster config failed, tx_number:" - << txm->TxNumber(); + << txm->TxNumber() << ", error code: " + << static_cast( + lock_cluster_config_op_.hd_result_->ErrorCode()); ForceToFinish(txm); return; }