-
Notifications
You must be signed in to change notification settings - Fork 9
Fix stuck during recovery #357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -283,6 +283,11 @@ struct TemplatedCcRequest : public CcRequestBase | |
| return node_group_id_; | ||
| } | ||
|
|
||
| int64_t NodeGroupTerm() const | ||
| { | ||
| return ng_term_; | ||
| } | ||
|
|
||
| int64_t TxTerm() const | ||
| { | ||
| return tx_term_; | ||
|
|
@@ -526,6 +531,40 @@ struct AcquireAllCc : public TemplatedCcRequest<AcquireAllCc, AcquireAllResult> | |
| AcquireAllCc(const AcquireAllCc &rhs) = delete; | ||
| AcquireAllCc(AcquireAllCc &&rhs) = delete; | ||
|
|
||
| bool ValidTermCheck() override | ||
| { | ||
| int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_); | ||
| int64_t ng_candidate_term = | ||
| Sharder::Instance().CandidateLeaderTerm(node_group_id_); | ||
| ng_term = std::max(ng_term, ng_candidate_term); | ||
|
|
||
| if (ng_term < 0) | ||
| { | ||
| return false; | ||
| } | ||
| else | ||
| { | ||
| uint32_t tx_ng_id = (Txn() >> 32L) >> 10; | ||
| if (tx_ng_id == node_group_id_ && ng_term != tx_term_) | ||
| { | ||
| // The request is processed on the coordinator candidate leader, | ||
| // but the term is mismatch. | ||
| return false; | ||
| } | ||
| } | ||
| assert(ng_term > 0); | ||
|
|
||
| if (ng_term_ < 0) | ||
| { | ||
| ng_term_ = ng_term; | ||
| } | ||
| else if (ng_term != ng_term_) | ||
| { | ||
| return false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
|
Comment on lines
+534
to
+567
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix potential data race on Proposed fix (serialize `ng_term_` initialization + make term validation non-assert-only) bool ValidTermCheck() override
{
int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_);
int64_t ng_candidate_term =
Sharder::Instance().CandidateLeaderTerm(node_group_id_);
ng_term = std::max(ng_term, ng_candidate_term);
if (ng_term < 0)
{
return false;
}
else
{
- uint32_t tx_ng_id = (Txn() >> 32L) >> 10;
+ // Txn encodes ng_id in the high bits; this is effectively >> 42.
+ uint32_t tx_ng_id = static_cast<uint32_t>(Txn() >> 42);
if (tx_ng_id == node_group_id_ && ng_term != tx_term_)
{
// The request is processed on the coordinator candidate leader,
// but the term is mismatch.
return false;
}
}
- assert(ng_term > 0);
+ // Don't rely on assert for validity.
+ if (ng_term <= 0)
+ {
+ return false;
+ }
- if (ng_term_ < 0)
- {
- ng_term_ = ng_term;
- }
- else if (ng_term != ng_term_)
- {
- return false;
- }
+ // AcquireAllCc runs in parallel across cores; protect ng_term_ init/check.
+ {
+ std::lock_guard<std::mutex> lk(mux_);
+ if (ng_term_ < 0)
+ {
+ ng_term_ = ng_term;
+ }
+ else if (ng_term != ng_term_)
+ {
+ return false;
+ }
+ }
return true;
}🤖 Prompt for AI Agents |
||
| void Reset(const TableName *tname, | ||
| const TxKey *key, | ||
| uint32_t node_group_id, | ||
|
|
@@ -919,6 +958,40 @@ struct PostWriteAllCc | |
| PostWriteAllCc(const PostWriteAllCc &rhs) = delete; | ||
| PostWriteAllCc(PostWriteAllCc &&rhs) = delete; | ||
|
|
||
| bool ValidTermCheck() override | ||
| { | ||
| int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_); | ||
| int64_t ng_candidate_term = | ||
| Sharder::Instance().CandidateLeaderTerm(node_group_id_); | ||
| ng_term = std::max(ng_term, ng_candidate_term); | ||
|
|
||
| if (ng_term < 0) | ||
| { | ||
| return false; | ||
| } | ||
| else | ||
| { | ||
| uint32_t tx_ng_id = (Txn() >> 32L) >> 10; | ||
| if (tx_ng_id == node_group_id_ && ng_term != tx_term_) | ||
| { | ||
| // The request is processed on the coordinator candidate leader, | ||
| // but the term is mismatch. | ||
| return false; | ||
| } | ||
| } | ||
| assert(ng_term > 0); | ||
|
|
||
| if (ng_term_ < 0) | ||
| { | ||
| ng_term_ = ng_term; | ||
| } | ||
| else if (ng_term != ng_term_) | ||
| { | ||
| return false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| void Reset(const TableName *tname, | ||
| const TxKey *key, | ||
| uint32_t node_group_id, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -399,13 +399,6 @@ class RangeCcMap : public TemplateCcMap<KeyT, RangeRecord, true, false> | |
| */ | ||
| bool Execute(PostWriteAllCc &req) override | ||
| { | ||
| int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); | ||
| if (ng_term < 0) | ||
| { | ||
| req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); | ||
| return false; | ||
| } | ||
|
|
||
| // When the commit ts is 0 or the commit type is DowngradeLock, the | ||
| // request commits nothing and only removes the write intents/locks | ||
| // acquired earlier. | ||
|
|
@@ -610,6 +603,9 @@ class RangeCcMap : public TemplateCcMap<KeyT, RangeRecord, true, false> | |
|
|
||
| if (txservice_enable_key_cache && this->table_name_.IsBase()) | ||
| { | ||
| int64_t ng_term = req.NodeGroupTerm(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. introduce compile warning in release build. Use
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. variable |
||
| assert(ng_term > 0); | ||
|
|
||
|
Comment on lines
+606
to
+608
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # First, let's find and examine the PostWriteAllCc class and its ValidTermCheck implementation
rg -n "class PostWriteAllCc" --type cpp -A 20Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Search for ValidTermCheck implementations and overrides
rg -n "ValidTermCheck" --type cpp -B 2 -A 5Repository: eloqdata/tx_service Length of output: 27257 🏁 Script executed: # Search for Execute(PostWriteAllCc in the actual file to understand the context
rg -n "Execute.*PostWriteAllCc" tx_service/include/cc/range_cc_map.h -B 5 -A 20Repository: eloqdata/tx_service Length of output: 1106 🏁 Script executed: # Let's examine the specific file and lines mentioned
head -650 tx_service/include/cc/range_cc_map.h | tail -60Repository: eloqdata/tx_service Length of output: 3031 🏁 Script executed: # Search for TryPinNodeGroupData, PinNodeGroupData, UnpinNodeGroupData in range_cc_map.h
rg -n "TryPinNodeGroupData|PinNodeGroupData|UnpinNodeGroupData" tx_service/include/cc/range_cc_map.h -B 3 -A 3Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Look for the broader context around lines 606-608, particularly the PostCommit method
sed -n '550,700p' tx_service/include/cc/range_cc_map.h | cat -nRepository: eloqdata/tx_service Length of output: 8383 🏁 Script executed: # Search for NodeGroupTerm() usage and how it's validated elsewhere
rg -n "NodeGroupTerm()" tx_service/include/cc/range_cc_map.h -B 2 -A 2Repository: eloqdata/tx_service Length of output: 279 🏁 Script executed: # Search for similar assertions on ng_term or node_group terms in the codebase
rg -n "assert.*ng_term.*[>0]|assert.*term.*[>0]" --type cpp tx_service/include/cc/ | head -20Repository: eloqdata/tx_service Length of output: 921 🏁 Script executed: # Check if ValidTermCheck() is called at the start of PostWriteAllCc::Execute() in range_cc_map.h
sed -n '400,450p' tx_service/include/cc/range_cc_map.h | cat -nRepository: eloqdata/tx_service Length of output: 2619 🏁 Script executed: # Search for local_cc_shards to understand if pin/unpin operations are involved
rg -n "TryPinNodeGroupData|local_shards_|PostProcessFlushTaskEntries" tx_service/include/cc/range_cc_map.h -B 2 -A 2Repository: eloqdata/tx_service Length of output: 4238 🏁 Script executed: # Check PostWriteAllCc::Execute in cc_request.h to see the full implementation
rg -n "class PostWriteAllCc" tx_service/include/cc/cc_request.h -A 100 | head -120Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Find the full Execute() implementation for PostWriteAllCc in cc_request.h
rg -n "class PostWriteAllCc" tx_service/include/cc/cc_request.h -A 200 | grep -A 50 "bool Execute"Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Search for PostWriteAllCc declaration and where ValidTermCheck is called
rg -n "PostWriteAllCc" tx_service/include/cc/cc_request.h | head -20Repository: eloqdata/tx_service Length of output: 694 🏁 Script executed: # Look at the actual PostWriteAllCc class definition to see if it has an Execute that calls ValidTermCheck
sed -n '940,1100p' tx_service/include/cc/cc_request.h | cat -nRepository: eloqdata/tx_service Length of output: 5717 🏁 Script executed: # Check if InitKeyCache uses pin/unpin operations
rg -n "InitKeyCache" --type cpp -A 10 | head -50Repository: eloqdata/tx_service Length of output: 3701 🏁 Script executed: # Search for TemplateCcMap<KeyT, RangeRecord>::Execute to see if it calls ValidTermCheck
rg -n "class TemplateCcMap" tx_service/include/cc/template_cc_map.h -A 5Repository: eloqdata/tx_service Length of output: 292 🏁 Script executed: # Look for Execute method in TemplateCcMap that handles PostWriteAllCc
rg -n "bool Execute\(PostWriteAllCc" tx_service/include/cc/template_cc_map.h -B 5 -A 20Repository: eloqdata/tx_service Length of output: 918 🏁 Script executed: # Check if there's a ValidTermCheck call at the start of the PostWriteAllCc execution path
sed -n '1,100p' tx_service/include/cc/template_cc_map.h | grep -A 10 "Execute.*PostWriteAllCc"Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Continue reading the Execute(PostWriteAllCc) method in template_cc_map.h to find ValidTermCheck call
sed -n '972,1050p' tx_service/include/cc/template_cc_map.h | cat -nRepository: eloqdata/tx_service Length of output: 3466 🏁 Script executed: # Search for where Execute() calls ValidTermCheck in the codebase
rg -n "if.*ValidTermCheck\|ValidTermCheck\(\)" tx_service/include/cc/cc_request.h -B 2 -A 3 | head -40Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Look for the pattern where Execute delegates to parent class for PostWriteAllCc
rg -n "TemplateCcMap.*Execute.*req" tx_service/include/cc/range_cc_map.h -B 5 -A 5Repository: eloqdata/tx_service Length of output: 1587 🏁 Script executed: # Find where ValidTermCheck is called in the CcShard::Execute flow
rg -n "class CcShard" tx_service/include/cc/ --type cpp -A 50 | grep -A 30 "Execute.*CcRequestBase"Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Look for the Execute method that dispatches to cc requests
rg -n "bool Execute\(CcRequestBase" tx_service/include/cc/cc_shard.h -B 5 -A 20Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Search in cc_shard.h for the main Execute dispatcher
rg -n "ValidTermCheck\(\)" tx_service/include/cc/cc_shard.h -B 5 -A 5Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Let me check the actual flow by looking at CcRequestBase and how Execute is called
rg -n "class CcRequestBase" tx_service/include/cc/cc_request.h -A 30 | head -50Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Look for the entry point that calls ValidTermCheck before Execute
rg -n "ValidTermCheck\(\)" tx_service/src/cc/ --type cpp -B 3 -A 8 | head -60Repository: eloqdata/tx_service Length of output: 3739 🏁 Script executed: # Check local_cc_shards.cpp for pin/unpin patterns mentioned in the learning
rg -n "TryPinNodeGroupData|UnpinNodeGroupData" tx_service/src/cc/local_cc_shards.cpp -B 3 -A 5 | head -80Repository: eloqdata/tx_service Length of output: 3021 🏁 Script executed: # Let me check if there are any conditional checks around InitKeyCache in similar code patterns
rg -n "InitKeyCache" tx_service/include/cc/ --type cpp -B 5 -A 5 | grep -A 5 -B 5 "if.*term\|assert.*term"Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: # Check the full context around lines 606-650 in range_cc_map.h to see error handling
sed -n '600,750p' tx_service/include/cc/range_cc_map.h | cat -nRepository: eloqdata/tx_service Length of output: 7870 Replace the assertion with a runtime check to guard the resource-pinning operation. The code pins the range via if (ng_term <= 0) {
// Handle error: term validation failed
return false; // or appropriate error handling
}This ensures the pinning operation is only performed when the term is valid, matching the defensive pattern used elsewhere in the codebase. 🤖 Prompt for AI Agents |
||
| // try to init the key cache for new range if it | ||
| // lands on this ng | ||
| for (auto new_range : new_range_entries) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -700,11 +700,8 @@ class TemplateCcMap : public CcMap | |
| bool will_insert = false; | ||
|
|
||
| uint32_t ng_id = req.NodeGroupId(); | ||
| int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id); | ||
| if (ng_term < 0) | ||
| { | ||
| return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); | ||
| } | ||
| int64_t ng_term = req.NodeGroupTerm(); | ||
| assert(ng_term > 0); | ||
|
|
||
|
Comment on lines
702
to
705
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: rg -n --type=cpp --type=hpp -A 2 "NodeGroupTerm\s*\(" | head -80Repository: eloqdata/tx_service Length of output: 90 🏁 Script executed: rg -n --type=cpp --type=hpp "ValidTermCheck\s*\(" -A 3 | head -60Repository: eloqdata/tx_service Length of output: 90 🏁 Script executed: fd -e hpp -e cpp | xargs rg -l "class\s+AcquireAllCc\b|struct\s+AcquireAllCc\b"Repository: eloqdata/tx_service Length of output: 47 🏁 Script executed: rg -n --type=cpp --type=hpp "int64_t.*NodeGroupTerm|CcRequestBase.*NodeGroupTerm" -A 5 | head -100Repository: eloqdata/tx_service Length of output: 90 🏁 Script executed: rg -n "NodeGroupTerm\s*\(" -A 2 | head -100Repository: eloqdata/tx_service Length of output: 5274 🏁 Script executed: rg -n "ValidTermCheck" -A 3 | head -80Repository: eloqdata/tx_service Length of output: 4720 🏁 Script executed: fd -e h -e cpp | xargs grep -l "class AcquireAllCc\|struct AcquireAllCc" 2>/dev/nullRepository: eloqdata/tx_service Length of output: 128 🏁 Script executed: rg -n "int64_t.*NodeGroupTerm\(\)|NodeGroupTerm.*int64_t" -B 2 -A 5 | head -120Repository: eloqdata/tx_service Length of output: 5127 🏁 Script executed: rg -n "class AcquireAllCc" -A 50 | head -80Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: rg -n "AcquireAllCc::Execute" -A 100 | head -150Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: rg -n "SetCceLock|AcquireCceKeyLock" -B 2 -A 5 | head -120Repository: eloqdata/tx_service Length of output: 8947 🏁 Script executed: rg -n "AcquireAllCc" | head -50Repository: eloqdata/tx_service Length of output: 3795 🏁 Script executed: rg -n "Execute\(CcShard" -B 5 | grep -A 5 "AcquireAllCc"Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: fd -type f -name "*.h" | xargs grep -l "class AcquireAllCc"Repository: eloqdata/tx_service Length of output: 235 🏁 Script executed: cat tx_service/include/cc/template_cc_map.h | sed -n '690,750p'Repository: eloqdata/tx_service Length of output: 2701 🏁 Script executed: rg -n "AcquireAllCc::Reset|NextRequest\(" -B 3 -A 8 | head -100Repository: eloqdata/tx_service Length of output: 7754 🏁 Script executed: rg -n "ng_term_" tx_service/include/cc/cc_request.h -B 2 -A 2 | head -80Repository: eloqdata/tx_service Length of output: 2386 🏁 Script executed: rg -n "struct AcquireAllCc" -A 80 tx_service/include/cc/cc_request.h | head -120Repository: eloqdata/tx_service Length of output: 2811 🏁 Script executed: rg -n "AcquireAllCc.*Reset" -B 2 -A 15 | head -150Repository: eloqdata/tx_service Length of output: 4148 🏁 Script executed: rg -n "ValidTermCheck\|Execute" tx_service/src/cc/local_cc_shard.cpp -B 2 -A 2 | head -150Repository: eloqdata/tx_service Length of output: 139 🏁 Script executed: rg -n "EnqueueCcRequest\|Execute.*AcquireAllCc" -B 5 -A 5 | head -120Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: cat tx_service/include/cc/template_cc_map.h | sed -n '671,690p'Repository: eloqdata/tx_service Length of output: 764 🏁 Script executed: rg -n "class LocalCcHandler" -A 50 tx_service/include/cc/local_cc_handler.h | head -100Repository: eloqdata/tx_service Length of output: 2304 🏁 Script executed: rg -n "EnqueueCcRequest.*AcquireAllCc|ProcessCcRequest" -B 5 -A 10 | head -150Repository: eloqdata/tx_service Length of output: 1086 🏁 Script executed: rg -n "ValidTermCheck.*Execute\|Execute.*ValidTermCheck" -B 3 -A 3Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: rg -n "ValidTermCheck\(\)" -B 5 -A 5 | grep -A 10 -B 10 "AcquireAllCc\|TemplatedCcRequest"Repository: eloqdata/tx_service Length of output: 4234 🏁 Script executed: rg -n "CcShards::ProcessCcRequest\|LocalCcShards::ProcessCcRequest" -A 50 | head -120Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: rg -n "\.Execute\(" tx_service/src/cc/local_cc_shards.cpp -B 10 -A 5 | head -150Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: rg -n "Execute.*AcquireAllCc" tx_service/include/cc/template_cc_map.h -B 2 -A 50 | head -100Repository: eloqdata/tx_service Length of output: 4117 🏁 Script executed: rg -n "bool Execute\(AcquireAllCc" -B 5 -A 80 tx_service/include/cc/template_cc_map.h | head -150Repository: eloqdata/tx_service Length of output: 3863 Replace If Suggested fix uint32_t ng_id = req.NodeGroupId();
int64_t ng_term = req.NodeGroupTerm();
-assert(ng_term > 0);
+if (ng_term <= 0)
+{
+ return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
+}🤖 Prompt for AI Agents |
||
| uint16_t tx_core_id = ((req.Txn() >> 32L) & 0x3FF) % shard_->core_cnt_; | ||
|
|
||
|
|
@@ -994,13 +991,6 @@ class TemplateCcMap : public CcMap | |
| return false; | ||
| } | ||
|
|
||
| int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId()); | ||
| if (ng_term < 0) | ||
| { | ||
| req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER); | ||
| return true; | ||
| } | ||
|
|
||
| const KeyT *target_key = nullptr; | ||
| if (req.Key() != nullptr) | ||
| { | ||
|
|
@@ -5570,6 +5560,21 @@ class TemplateCcMap : public CcMap | |
| it = deduce_iterator(*start_key); | ||
| slice_end_key = typed_slice->EndKey(); | ||
| end_it = deduce_iterator(*slice_end_key); | ||
| if (!(*start_key < *slice_end_key)) | ||
| { | ||
| DLOG(ERROR) | ||
| << "!!!ERROR!!! start key: " << start_key->ToString() | ||
| << ", search key: " << search_key.ToString() | ||
| << ", slice start key: " | ||
| << typed_slice->StartKey()->ToString() | ||
| << ", slice end key: " << slice_end_key->ToString() | ||
| << ", export base table item: " << std::boolalpha | ||
| << req.export_base_table_item_ | ||
| << ", current slice index: " | ||
| << req.curr_slice_index_[shard_->core_id_] | ||
| << " on core: " << shard_->core_id_ | ||
| << ", table: " << table_name_.StringView(); | ||
| } | ||
|
Comment on lines
+5563
to
+5577
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don’t continue scanning when Right now this only emits a At minimum: (1) make this visible in prod (not Safer fallback (treat as empty slice) end_it = deduce_iterator(*slice_end_key);
if (!(*start_key < *slice_end_key))
{
- DLOG(ERROR)
+ LOG(ERROR)
<< "!!!ERROR!!! start key: " << start_key->ToString()
<< ", search key: " << search_key.ToString()
<< ", slice start key: "
<< typed_slice->StartKey()->ToString()
<< ", slice end key: " << slice_end_key->ToString()
<< ", export base table item: " << std::boolalpha
<< req.export_base_table_item_
<< ", current slice index: "
<< req.curr_slice_index_[shard_->core_id_]
<< " on core: " << shard_->core_id_
<< ", table: " << table_name_.StringView();
+ // Prevent scanning out of the slice.
+ it = end_it;
} |
||
|
|
||
| if (it != end_it) | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4172,32 +4172,32 @@ void SplitFlushRangeOp::ClearInfos() | |
| void SplitFlushRangeOp::Forward(TransactionExecution *txm) | ||
| { | ||
| if (txm->TxStatus() == TxnStatus::Recovering && | ||
| Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) < 0) | ||
| (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) != | ||
| txm->TxTerm() && | ||
| Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) != txm->TxTerm())) | ||
| { | ||
| // This is a recovered tx and replay is not done yet. We should wait for | ||
| // replay finish before forwarding tx machine. | ||
| if (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) != | ||
| txm->TxTerm()) | ||
| { | ||
| // Recovered term is invalid. Do not call ForceToFinish as it will | ||
| // cause infinite recursive call. Clean up tx state directly. | ||
| txm->bool_resp_->Finish(false); | ||
| // Recovered term is invalid. Do not call ForceToFinish as it will | ||
| // cause infinite recursive call. Clean up tx state directly. | ||
| txm->bool_resp_->Finish(false); | ||
|
|
||
| ClearInfos(); | ||
| ClearInfos(); | ||
|
|
||
| txm->state_stack_.pop_back(); | ||
| assert(txm->state_stack_.empty()); | ||
| txm->state_stack_.pop_back(); | ||
| assert(txm->state_stack_.empty()); | ||
|
|
||
| assert(this == txm->split_flush_op_.get()); | ||
| LocalCcShards *shards = Sharder::Instance().GetLocalCcShards(); | ||
| std::unique_lock<std::mutex> lk( | ||
| shards->split_flush_range_op_pool_mux_); | ||
| shards->split_flush_range_op_pool_.emplace_back( | ||
| std::move(txm->split_flush_op_)); | ||
| assert(txm->split_flush_op_ == nullptr); | ||
| } | ||
| assert(this == txm->split_flush_op_.get()); | ||
| LocalCcShards *shards = Sharder::Instance().GetLocalCcShards(); | ||
| std::unique_lock<std::mutex> lk(shards->split_flush_range_op_pool_mux_); | ||
| shards->split_flush_range_op_pool_.emplace_back( | ||
| std::move(txm->split_flush_op_)); | ||
| assert(txm->split_flush_op_ == nullptr); | ||
| return; | ||
| } | ||
|
Comment on lines
4174
to
4195
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
set -euo pipefail
# 1) Find all references to split flush op pooling + lifecycle.
echo "=== SPLIT FLUSH OP POOLING & LIFECYCLE ==="
rg -n --hidden --no-heading -S \
'split_flush_op_|split_flush_range_op_pool_|split_flush_range_op_pool_mux_|SplitFlushRangeOp::Forward|NewSplitFlush|SplitFlushRangeOp::Reset' . 2>/dev/null | head -100
echo -e "\n=== TX RESPONSE FINISH & TXM RECYCLING ==="
rg -n --hidden --no-heading -S \
'bool_resp_->Finish\(|Finish\(false\)|Finish\(true\)|TxnStatus::Recovering' . 2>/dev/null | head -100
echo -e "\n=== SPLIT FLUSH OP SUBOPS & ASYNC WORK ==="
rg -n --hidden --no-heading -S \
'data_sync_op_|ds_upsert_range_op_|ds_clean_old_range_op_|update_key_cache_op_|AsyncOp<' tx_service/src/tx_operation.cpp 2>/dev/null | head -60Repository: eloqdata/tx_service Length of output: 11128 🏁 Script executed: #!/bin/bash
# Get file size and read SplitFlushRangeOp class definition
wc -l tx_service/src/tx_operation.cpp
echo -e "\n=== SEARCHING FOR SPLITFLUSHRANGEOP CLASS ==="
rg -n 'class SplitFlushRangeOp' tx_service/src/tx_operation.cppRepository: eloqdata/tx_service Length of output: 146 🏁 Script executed: #!/bin/bash
# Read the relevant section including the questioned code
sed -n '4160,4220p' tx_service/src/tx_operation.cppRepository: eloqdata/tx_service Length of output: 2327 🏁 Script executed: #!/bin/bash
# Find ClearInfos implementation in SplitFlushRangeOp
rg -n -A 20 'SplitFlushRangeOp.*ClearInfos\(|void.*SplitFlushRangeOp.*ClearInfos' tx_service/src/tx_operation.cppRepository: eloqdata/tx_service Length of output: 870 🏁 Script executed: #!/bin/bash
# Find DataMigrationOp invalid-term cleanup pattern for comparison
rg -n -B 5 -A 15 'DataMigrationOp.*Recovering\|Recovered term is invalid.*DataMigration' tx_service/src/tx_operation.cppRepository: eloqdata/tx_service Length of output: 45 🏁 Script executed: #!/bin/bash
# Find DataMigrationOp invalid-term cleanup for comparison
rg -n -B 3 -A 12 'Recovered term is invalid' tx_service/src/tx_operation.cppRepository: eloqdata/tx_service Length of output: 1578 🏁 Script executed: #!/bin/bash
# Check if DataMigrationOp calls txm->Abort() after recycling
rg -n -B 5 -A 20 'class DataMigrationOp' tx_service/src/tx_operation.cpp | head -100Repository: eloqdata/tx_service Length of output: 45 🏁 Script executed: #!/bin/bash
# Find where DataMigrationOp recycling happens and whether Abort is called
rg -n -B 2 -A 10 'txm->Abort()' tx_service/src/tx_operation.cpp | head -80Repository: eloqdata/tx_service Length of output: 2050 🏁 Script executed: #!/bin/bash
# Check AsyncOp::Forward to see if it spawns persistent worker threads
sed -n '3729,3785p' tx_service/src/tx_operation.cppRepository: eloqdata/tx_service Length of output: 1673 Critical UAF risk: subop worker threads not joined before recycling split_flush_op_ The invalid-term cleanup path (line 4174–4193) calls
Additionally,
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes we can only directly recycle slit flush op if op_ == nullptr There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @liunyl Thanks — agreed. I’ll gate the early recycle path so we only directly recycle the SplitFlushRangeOp when no sub-op has started (op_ == nullptr). If a sub-op is already in-flight, we’ll fall through and let the normal state-machine paths unwind safely. [plan_code_changes]
If you’d like, I can open a quick PR amendment with this patch. ✏️ Learnings added
🧠 Learnings used
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When forward SplitFlushRangeOp, there will definitely be no running sub-ops. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| // else: allow forwarding recovered split flush tx on the candidate leader, | ||
| // otherwise, a deadlock might occur in the following scenario: If the | ||
| // replay data cannot be completed due to insufficient memory, the recovered | ||
| // split transaction will have to wait for the leader to complete log replay | ||
| // before it can continue forwarding. | ||
| if (op_ == nullptr) | ||
| { | ||
| // Initialize commit ts as the start time of tx. This value will | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only if the tx is processed on coordinator node group, should we allow cc requests to be processed as candidate leader.