Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 19 additions & 23 deletions tx_service/include/cc/catalog_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,7 @@ class CatalogCcMap
uint32_t ng_id = req.NodeGroupId();
if (shard_->IsNative(ng_id) && req.CcOp() == CcOperation::ReadForWrite)
{
int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id);
CcHandlerResult<AcquireAllResult> *hd_res = req.Result();
if (ng_term < 0)
{
return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
}

const CatalogKey *catalog_key = nullptr;
if (req.Key() != nullptr)
Expand Down Expand Up @@ -199,19 +194,17 @@ class CatalogCcMap
}
});

int64_t ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId());
int64_t ng_term = req.NodeGroupTerm();
assert(ng_term > 0);
CODE_FAULT_INJECTOR("term_CatalogCcMap_Execute_PostWriteAllCc", {
Comment on lines +197 to 199
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard invalid NodeGroupTerm() instead of assert‑only.

If a request arrives with a non-positive term (zero or negative), this assert aborts debug builds; in release builds, where the assert is compiled out, the invalid term is used downstream. Prefer a runtime error path (optionally mirroring the ReadCc fallback) so invalid terms are rejected cleanly.

🔧 Proposed fix (robust guard + fallback)
-        int64_t ng_term = req.NodeGroupTerm();
-        assert(ng_term > 0);
+        int64_t ng_term = req.NodeGroupTerm();
+        if (ng_term < 0)
+        {
+            if (req.AllowRunOnCandidate())
+            {
+                ng_term = Sharder::Instance().CandidateLeaderTerm(req.NodeGroupId());
+            }
+            if (ng_term < 0)
+            {
+                ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId());
+                int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm();
+                ng_term = std::max(ng_term, standby_node_term);
+            }
+        }
+        if (ng_term <= 0)
+        {
+            req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
+            return true;
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
int64_t ng_term = req.NodeGroupTerm();
assert(ng_term > 0);
CODE_FAULT_INJECTOR("term_CatalogCcMap_Execute_PostWriteAllCc", {
int64_t ng_term = req.NodeGroupTerm();
if (ng_term < 0)
{
if (req.AllowRunOnCandidate())
{
ng_term = Sharder::Instance().CandidateLeaderTerm(req.NodeGroupId());
}
if (ng_term < 0)
{
ng_term = Sharder::Instance().LeaderTerm(req.NodeGroupId());
int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm();
ng_term = std::max(ng_term, standby_node_term);
}
}
if (ng_term <= 0)
{
req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
return true;
}
CODE_FAULT_INJECTOR("term_CatalogCcMap_Execute_PostWriteAllCc", {
🤖 Prompt for AI Agents
In `@tx_service/include/cc/catalog_cc_map.h` around lines 197 - 199, Replace the
assert on ng_term with a runtime guard: check req.NodeGroupTerm() into ng_term
and if ng_term <= 0 return a proper error (or take the same fallback path used
by ReadCc) instead of crashing; update the block around ng_term,
CODE_FAULT_INJECTOR("term_CatalogCcMap_Execute_PostWriteAllCc", ...) and
subsequent logic to handle the invalid term via the error/fallback path so
downstream code never receives a negative term.

LOG(INFO)
<< "FaultInject term_CatalogCcMap_Execute_PostWriteAllCc";
ng_term = -1;
FaultInject::Instance().InjectFault(
"term_CatalogCcMap_Execute_PostWriteAllCc", "remove");
});
if (ng_term < 0)
{
req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
return true;
}
});

const CatalogKey *table_key = nullptr;
if (req.Key() != nullptr)
Expand Down Expand Up @@ -1148,28 +1141,31 @@ class CatalogCcMap
assert(req.IsLocal());

uint32_t ng_id = req.NodeGroupId();
int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id);
ng_term = std::max(ng_term, Sharder::Instance().StandbyNodeTerm());

if (req.IsInRecovering())
int64_t ng_term = req.NodeGroupTerm();
if (ng_term < 0)
{
ng_term = ng_term > 0
? ng_term
: Sharder::Instance().CandidateLeaderTerm(ng_id);
if (req.AllowRunOnCandidate())
{
ng_term = Sharder::Instance().CandidateLeaderTerm(ng_id);
}
if (ng_term < 0)
{
ng_term = Sharder::Instance().LeaderTerm(ng_id);
int64_t standby_node_term =
Sharder::Instance().StandbyNodeTerm();
ng_term = std::max(ng_term, standby_node_term);
}
}
assert(ng_term > 0);

Comment on lines 1143 to 1160
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle unresolved term after fallback instead of asserting.

If the leader/standby/candidate lookup still yields a non-positive term, the current assert aborts debug builds, and in release builds the invalid term can leak into lock metadata. Return a proper error instead.

🔧 Proposed fix (runtime error on invalid term)
-        assert(ng_term > 0);
+        if (ng_term <= 0)
+        {
+            req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
+            return true;
+        }
🤖 Prompt for AI Agents
In `@tx_service/include/cc/catalog_cc_map.h` around lines 1143 - 1160, The code
currently asserts ng_term > 0 after falling back via Sharder::Instance() calls;
replace this assertion with a runtime error path: if ng_term <= 0, log a clear
error referencing req.NodeGroupId() and the failed lookups
(NodeGroupTerm/StandbyNodeTerm/CandidateLeaderTerm), set the function's
error/Status return (or populate the response error fields used by this module)
and return early instead of proceeding to use ng_term; remove the assert(ng_term
> 0) and ensure no lock metadata is written when ng_term is invalid.

CODE_FAULT_INJECTOR("term_CatalogCcMap_Execute_ReadCc", {
LOG(INFO) << "FaultInject term_CatalogCcMap_Execute_ReadCc";
LOG(INFO) << "FaultInject term_CatalogCcMap_Execute_ReadCc";
ng_term = -1;
FaultInject::Instance().InjectFault(
"term_CatalogCcMap_Execute_ReadCc", "remove");
});

if (ng_term < 0)
{
req.Result()->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
return true;
}
});

const CatalogKey *table_key =
static_cast<const CatalogKey *>(req.Key());
Expand Down
17 changes: 11 additions & 6 deletions tx_service/include/cc/cc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class CcHandler
uint32_t hd_res_idx,
CcProtocol proto,
IsolationLevel iso_level,
bool abort_if_oom) = 0;
bool abort_if_oom,
bool allow_run_on_candidate) = 0;

/**
* @brief Acquires write locks for the input key in all shards. This method
Expand Down Expand Up @@ -166,7 +167,8 @@ class CcHandler
const TxRecord *record,
OperationType operation_type,
uint32_t key_shard_code,
CcHandlerResult<PostProcessResult> &hres) = 0;
CcHandlerResult<PostProcessResult> &hres,
bool allow_run_on_candidate) = 0;

/**
* @brief Post-processes a read/scan key. Post-processing clears the read
Expand Down Expand Up @@ -202,7 +204,8 @@ class CcHandler
CcHandlerResult<PostProcessResult> &hres,
bool is_local = false,
bool need_remote_resp = true,
PostReadType post_read_type = PostReadType::Release) = 0;
PostReadType post_read_type = PostReadType::Release,
bool allow_run_on_candidate = false) = 0;

/**
* @brief Reads the input key and returns the key's record. The request puts
Expand Down Expand Up @@ -239,6 +242,7 @@ class CcHandler
CcProtocol proto = CcProtocol::OCC,
bool is_for_write = false,
bool is_covering_keys = false,
bool allow_run_on_candidate = false,
bool point_read_on_miss = false,
int32_t partition_id = -1,
bool abort_if_oom = false) = 0;
Expand Down Expand Up @@ -297,7 +301,7 @@ class CcHandler
IsolationLevel iso_level = IsolationLevel::RepeatableRead,
CcProtocol proto = CcProtocol::Locking,
bool is_for_write = false,
bool is_recovring = false,
bool allow_run_on_candidate = false,
bool execute_immediately = true) = 0;

virtual bool ReadLocal(
Expand All @@ -313,7 +317,7 @@ class CcHandler
IsolationLevel iso_level = IsolationLevel::RepeatableRead,
CcProtocol proto = CcProtocol::Locking,
bool is_for_write = false,
bool is_recovring = false) = 0;
bool allow_run_on_candidate = false) = 0;

virtual void ScanOpen(
const TableName &table_name,
Expand Down Expand Up @@ -404,7 +408,8 @@ class CcHandler
virtual void NewTxn(CcHandlerResult<InitTxResult> &hres,
IsolationLevel iso_level,
NodeGroupId tx_ng_id,
uint32_t log_group_id) = 0;
uint32_t log_group_id,
bool allow_run_on_candidate) = 0;

/// <summary>
/// Sets the commit timestamp of the input tx.
Expand Down
5 changes: 4 additions & 1 deletion tx_service/include/cc/cc_req_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ struct FillStoreSliceCc : public CcRequestBase
{
assert(err_code != CcErrorCode::NO_ERROR);
DLOG(ERROR) << "Abort this FillStoreSliceCc request with error: "
<< CcErrorMessage(err_code);
<< CcErrorMessage(err_code)
<< ", table name: " << table_name_->StringView();
bool finish_all = SetError(err_code);
// Recycle request
if (finish_all)
Expand Down Expand Up @@ -989,6 +990,8 @@ struct UpdateCceCkptTsCc : public CcRequestBase
UpdateCceCkptTsCc(const UpdateCceCkptTsCc &) = delete;
UpdateCceCkptTsCc &operator=(const UpdateCceCkptTsCc &) = delete;

bool ValidTermCheck() const;

bool Execute(CcShard &ccs) override;

void SetFinished()
Expand Down
Loading