// Patch summary (unified diff of C++ sources; four files under tx_service/).
// This preamble sits above the first "diff --git" header; every hunk below
// is unchanged.
//
// include/cc/cc_request.h
//   * Adds TemplatedCcRequest::NodeGroupTerm(), a const accessor that
//     returns ng_term_.
//   * Adds textually identical ValidTermCheck() overrides to AcquireAllCc
//     and PostWriteAllCc. Each override:
//       - takes the larger of Sharder's LeaderTerm() and
//         CandidateLeaderTerm() for node_group_id_ and returns false if
//         that value is negative;
//       - returns false when tx_ng_id, computed as (Txn() >> 32) >> 10,
//         equals node_group_id_ but the term differs from tx_term_;
//       - stores the term in ng_term_ when ng_term_ is still negative,
//         otherwise returns false if the term differs from ng_term_;
//       - returns true in all remaining cases.
//     NOTE(review): the guard rejects only ng_term < 0 while the assert
//     that follows requires ng_term > 0, so a term of exactly 0 would trip
//     the assert when asserts are enabled -- confirm 0 is never a valid
//     term.
//
// include/cc/range_cc_map.h
//   * Execute(PostWriteAllCc &) no longer looks up LeaderTerm() itself and
//     no longer sets REQUESTED_NODE_NOT_LEADER at its top. The key-cache
//     branch now reads req.NodeGroupTerm() and asserts it is positive.
//     NOTE(review): presumably ValidTermCheck() has populated the term
//     before Execute() runs -- confirm against the caller.
//
// include/cc/template_cc_map.h
//   * Hunk at old line 700: the LeaderTerm(ng_id) lookup and its
//     REQUESTED_NODE_NOT_LEADER error return are replaced by
//     req.NodeGroupTerm() plus assert(ng_term > 0).
//   * Hunk at old line 994: a LeaderTerm() lookup and its
//     REQUESTED_NODE_NOT_LEADER error path are removed with no replacement.
//   * Hunk at old line 5570: adds a DLOG(ERROR) that fires when
//     !(*start_key < *slice_end_key) and prints the start key, search key,
//     slice bounds, export_base_table_item_, current slice index, core id
//     and table name. The added block only logs; the code after it still
//     runs.
//
// src/tx_operation.cpp
//   * SplitFlushRangeOp::Forward(): for a tx in TxnStatus::Recovering, the
//     cleanup-and-return path is now taken only when neither
//     CandidateLeaderTerm() nor LeaderTerm() equals txm->TxTerm().
//     Previously any Recovering tx with LeaderTerm() < 0 returned early,
//     with cleanup only if the candidate term mismatched. When either term
//     matches, execution now continues past the early return; the comment
//     added in the hunk gives deadlock avoidance as the reason.
diff --git a/tx_service/include/cc/cc_request.h b/tx_service/include/cc/cc_request.h index 618f0ca9..a0327921 100644 --- a/tx_service/include/cc/cc_request.h +++ b/tx_service/include/cc/cc_request.h @@ -283,6 +283,11 @@ struct TemplatedCcRequest : public CcRequestBase return node_group_id_; } + int64_t NodeGroupTerm() const + { + return ng_term_; + } + int64_t TxTerm() const { return tx_term_; @@ -526,6 +531,40 @@ struct AcquireAllCc : public TemplatedCcRequest AcquireAllCc(const AcquireAllCc &rhs) = delete; AcquireAllCc(AcquireAllCc &&rhs) = delete; + bool ValidTermCheck() override + { + int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t ng_candidate_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + ng_term = std::max(ng_term, ng_candidate_term); + + if (ng_term < 0) + { + return false; + } + else + { + uint32_t tx_ng_id = (Txn() >> 32L) >> 10; + if (tx_ng_id == node_group_id_ && ng_term != tx_term_) + { + // The request is processed on the coordinator candidate leader, + // but the term is mismatch. + return false; + } + } + assert(ng_term > 0); + + if (ng_term_ < 0) + { + ng_term_ = ng_term; + } + else if (ng_term != ng_term_) + { + return false; + } + return true; + } + void Reset(const TableName *tname, const TxKey *key, uint32_t node_group_id, @@ -919,6 +958,40 @@ struct PostWriteAllCc PostWriteAllCc(const PostWriteAllCc &rhs) = delete; PostWriteAllCc(PostWriteAllCc &&rhs) = delete; + bool ValidTermCheck() override + { + int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_); + int64_t ng_candidate_term = + Sharder::Instance().CandidateLeaderTerm(node_group_id_); + ng_term = std::max(ng_term, ng_candidate_term); + + if (ng_term < 0) + { + return false; + } + else + { + uint32_t tx_ng_id = (Txn() >> 32L) >> 10; + if (tx_ng_id == node_group_id_ && ng_term != tx_term_) + { + // The request is processed on the coordinator candidate leader, + // but the term is mismatch. 
+ return false; + } + } + assert(ng_term > 0); + + if (ng_term_ < 0) + { + ng_term_ = ng_term; + } + else if (ng_term != ng_term_) + { + return false; + } + return true; + } + void Reset(const TableName *tname, const TxKey *key, uint32_t node_group_id, diff --git a/tx_service/include/cc/range_cc_map.h b/tx_service/include/cc/range_cc_map.h index 5b39fd71..f331f4a5 100644 --- a/tx_service/include/cc/range_cc_map.h +++ b/tx_service/include/cc/range_cc_map.h @@ -399,13 +399,6 @@ class RangeCcMap : public TemplateCcMap */ bool Execute(PostWriteAllCc &req) override { - int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); - if (ng_term < 0) - { - req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return false; - } - // When the commit ts is 0 or the commit type is DowngradeLock, the // request commits nothing and only removes the write intents/locks // acquired earlier. @@ -610,6 +603,9 @@ class RangeCcMap : public TemplateCcMap if (txservice_enable_key_cache && this->table_name_.IsBase()) { + int64_t ng_term = req.NodeGroupTerm(); + assert(ng_term > 0); + // try to init the key cache for new range if it // lands on this ng for (auto new_range : new_range_entries) diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index 8a0b5c53..a18227f7 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -700,11 +700,8 @@ class TemplateCcMap : public CcMap bool will_insert = false; uint32_t ng_id = req.NodeGroupId(); - int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id); - if (ng_term < 0) - { - return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - } + int64_t ng_term = req.NodeGroupTerm(); + assert(ng_term > 0); uint16_t tx_core_id = ((req.Txn() >> 32L) & 0x3FF) % shard_->core_cnt_; @@ -994,13 +991,6 @@ class TemplateCcMap : public CcMap return false; } - int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); - if (ng_term < 0) - { 
- req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); - return true; - } - const KeyT *target_key = nullptr; if (req.Key() != nullptr) { @@ -5570,6 +5560,21 @@ class TemplateCcMap : public CcMap it = deduce_iterator(*start_key); slice_end_key = typed_slice->EndKey(); end_it = deduce_iterator(*slice_end_key); + if (!(*start_key < *slice_end_key)) + { + DLOG(ERROR) + << "!!!ERROR!!! start key: " << start_key->ToString() + << ", search key: " << search_key.ToString() + << ", slice start key: " + << typed_slice->StartKey()->ToString() + << ", slice end key: " << slice_end_key->ToString() + << ", export base table item: " << std::boolalpha + << req.export_base_table_item_ + << ", current slice index: " + << req.curr_slice_index_[shard_->core_id_] + << " on core: " << shard_->core_id_ + << ", table: " << table_name_.StringView(); + } if (it != end_it) { diff --git a/tx_service/src/tx_operation.cpp b/tx_service/src/tx_operation.cpp index 1b24516b..ade2694a 100644 --- a/tx_service/src/tx_operation.cpp +++ b/tx_service/src/tx_operation.cpp @@ -4172,32 +4172,32 @@ void SplitFlushRangeOp::ClearInfos() void SplitFlushRangeOp::Forward(TransactionExecution *txm) { if (txm->TxStatus() == TxnStatus::Recovering && - Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) < 0) + (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) != + txm->TxTerm() && + Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) != txm->TxTerm())) { - // This is a recovered tx and replay is not done yet. We should wait for - // replay finish before forwarding tx machine. - if (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) != - txm->TxTerm()) - { - // Recovered term is invalid. Do not call ForceToFinish as it will - // cause infinite recursive call. Clean up tx state directly. - txm->bool_resp_->Finish(false); + // Recovered term is invalid. Do not call ForceToFinish as it will + // cause infinite recursive call. Clean up tx state directly. 
+ txm->bool_resp_->Finish(false); - ClearInfos(); + ClearInfos(); - txm->state_stack_.pop_back(); - assert(txm->state_stack_.empty()); + txm->state_stack_.pop_back(); + assert(txm->state_stack_.empty()); - assert(this == txm->split_flush_op_.get()); - LocalCcShards *shards = Sharder::Instance().GetLocalCcShards(); - std::unique_lock lk( - shards->split_flush_range_op_pool_mux_); - shards->split_flush_range_op_pool_.emplace_back( - std::move(txm->split_flush_op_)); - assert(txm->split_flush_op_ == nullptr); - } + assert(this == txm->split_flush_op_.get()); + LocalCcShards *shards = Sharder::Instance().GetLocalCcShards(); + std::unique_lock lk(shards->split_flush_range_op_pool_mux_); + shards->split_flush_range_op_pool_.emplace_back( + std::move(txm->split_flush_op_)); + assert(txm->split_flush_op_ == nullptr); return; } + // else: allow forwarding recovered split flush tx on the candidate leader, + // otherwise, a deadlock might occur in the following scenario: If the + // replay data cannot be completed due to insufficient memory, the recovered + // split transaction will have to wait for the leader to complete log replay + // before it can continue forwarding. if (op_ == nullptr) { // Initialize commit ts as the start time of tx. This value will