Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tx_service/include/cc/cc_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,7 @@ class CcShard

// Try to send previous failed message to standby nodes.
bool ResendFailedForwardMessages();
void NotifyStandbyOutOfSync(uint32_t node_id);

void CollectStandbyMetrics();

Expand Down
13 changes: 12 additions & 1 deletion tx_service/include/cc/object_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -1378,9 +1378,20 @@ class ObjectCcMap : public TemplateCcMap<KeyT, ValueT, false, false>
bool s_obj_exist = (cce->PayloadStatus() == RecordStatus::Normal);

StandbyForwardEntry *forward_entry = nullptr;
if (!shard_->GetSubscribedStandbys().empty())
auto subscribed_standbys = shard_->GetSubscribedStandbys();
if (!subscribed_standbys.empty())
{
forward_entry = cce->ForwardEntry();
if (forward_entry == nullptr)
{
LOG(ERROR) << "Subscribed standbys exist, but forward_entry is "
"null. Data loss may occur. Notifying standbys "
"to resubscribe.";
for (uint32_t node_id : subscribed_standbys)
{
shard_->NotifyStandbyOutOfSync(node_id);
}
}
}
if (commit_ts > 0)
{
Expand Down
115 changes: 65 additions & 50 deletions tx_service/src/cc/cc_shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3078,56 +3078,7 @@ bool CcShard::ResendFailedForwardMessages()
}
else
{
// Message not found in map - it has been evicted
// This message is already lost and this standby needs to
// resubscribe to the primary node again. Notify standby that
// it has already fallen behind. Use the latest seq id so that
// standby knows which epoch this out of sync msg belongs to.
remote::CcMessage cc_msg;
cc_msg.set_type(
remote::CcMessage_MessageType::
CcMessage_MessageType_KeyObjectStandbyForwardRequest);
auto req = cc_msg.mutable_key_obj_standby_forward_req();
req->set_forward_seq_grp(core_id_);
req->set_forward_seq_id(next_forward_sequence_id_ - 1);
req->set_primary_leader_term(Sharder::Instance().LeaderTerm(
Sharder::Instance().NativeNodeGroup()));
req->set_out_of_sync(true);
stream_sender_->SendMessageToNode(node_id, cc_msg);

seq_id = UINT64_MAX;
// Remove heartbeat target node
local_shards_.RemoveHeartbeatTargetNode(node_id,
seq_id_and_term.second);

for (size_t core_idx = 0; core_idx < core_cnt_; ++core_idx)
{
if (core_idx != core_id_)
{
DispatchTask(
core_idx,
[node_id = node_id,
unsubscribe_standby_term =
seq_id_and_term.second](CcShard &ccs) -> bool
{
auto subscribe_node_iter =
ccs.subscribed_standby_nodes_.find(node_id);
if (subscribe_node_iter !=
ccs.subscribed_standby_nodes_.end())
{
if (subscribe_node_iter->second.second <=
unsubscribe_standby_term)
{
// erase ?
subscribe_node_iter->second.first =
UINT64_MAX;
}
}

return true;
});
}
}
NotifyStandbyOutOfSync(node_id);
break;
}
}
Expand All @@ -3144,6 +3095,70 @@ bool CcShard::ResendFailedForwardMessages()
return all_msgs_sent;
}

void CcShard::NotifyStandbyOutOfSync(uint32_t node_id)
{
auto seq_node_iter = subscribed_standby_nodes_.find(node_id);
if (seq_node_iter == subscribed_standby_nodes_.end())
{
return;
}

if (!stream_sender_)
{
stream_sender_ = Sharder::Instance().GetCcStreamSender();
}
if (!stream_sender_)
{
LOG(WARNING) << "Failed to notify standby " << node_id
<< " of out of sync state because stream sender is null";
return;
}

// Message not found in map - it has been evicted. Notify standby that it
// has already fallen behind so it can resubscribe to the primary node.
remote::CcMessage cc_msg;
cc_msg.set_type(remote::CcMessage_MessageType::
CcMessage_MessageType_KeyObjectStandbyForwardRequest);
auto req = cc_msg.mutable_key_obj_standby_forward_req();
req->set_forward_seq_grp(core_id_);
req->set_forward_seq_id(next_forward_sequence_id_ - 1);
req->set_primary_leader_term(
Sharder::Instance().LeaderTerm(Sharder::Instance().NativeNodeGroup()));
req->set_out_of_sync(true);
stream_sender_->SendMessageToNode(node_id, cc_msg);

auto &seq_id_and_term = seq_node_iter->second;
seq_id_and_term.first = UINT64_MAX;
// Remove heartbeat target node
local_shards_.RemoveHeartbeatTargetNode(node_id, seq_id_and_term.second);

int64_t unsubscribe_standby_term = seq_id_and_term.second;
for (size_t core_idx = 0; core_idx < core_cnt_; ++core_idx)
{
if (core_idx != core_id_)
{
DispatchTask(
core_idx,
[node_id, unsubscribe_standby_term](CcShard &ccs) -> bool
{
auto subscribe_node_iter =
ccs.subscribed_standby_nodes_.find(node_id);
if (subscribe_node_iter !=
ccs.subscribed_standby_nodes_.end())
{
if (subscribe_node_iter->second.second <=
unsubscribe_standby_term)
{
subscribe_node_iter->second.first = UINT64_MAX;
}
}

return true;
});
}
}
}
Comment on lines +3098 to +3160
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find SendMessageToNode method definition and its return type
rg -n "SendMessageToNode" -t cpp -t h -t hh -C 3

Repository: eloqdata/tx_service

Length of output: 89


🏁 Script executed:

#!/bin/bash
# Find SendMessageToNode method definition and its return type
rg -n "SendMessageToNode" -t cpp -t h -C 3

Repository: eloqdata/tx_service

Length of output: 25623


🏁 Script executed:

#!/bin/bash
# Find SendMessageResult struct definition
rg -n "struct SendMessageResult|class SendMessageResult" -t cpp -t h -A 10

Repository: eloqdata/tx_service

Length of output: 809


🏁 Script executed:

#!/bin/bash
# Get more details about the SendMessageResult and understand the implementation
rg -n "SendMessageResult" tx_service/include/remote/cc_stream_sender.h -A 30 | head -60

Repository: eloqdata/tx_service

Length of output: 2387


🏁 Script executed:

#!/bin/bash
# Check the NotifyStandbyOutOfSync context and see if SendStandbyMessageToNode could be used
rg -n "SendStandbyMessageToNode" -t cpp -t h -C 2 | head -50

Repository: eloqdata/tx_service

Length of output: 2445


🏁 Script executed:

#!/bin/bash
# Check the implementation of SendMessageToNode to see what happens on failure
cat -n tx_service/src/remote/cc_stream_sender.cpp | sed -n '158,250p'

Repository: eloqdata/tx_service

Length of output: 4286


🏁 Script executed:

#!/bin/bash
# Continue reading SendMessageToNode implementation
cat -n tx_service/src/remote/cc_stream_sender.cpp | sed -n '250,290p'

Repository: eloqdata/tx_service

Length of output: 1948


🏁 Script executed:

#!/bin/bash
# Continue reading to see what happens on non-resend failures
cat -n tx_service/src/remote/cc_stream_sender.cpp | sed -n '290,310p'

Repository: eloqdata/tx_service

Length of output: 1443


🏁 Script executed:

#!/bin/bash
# See the final return value
cat -n tx_service/src/remote/cc_stream_sender.cpp | sed -n '310,325p'

Repository: eloqdata/tx_service

Length of output: 633


🏁 Script executed:

#!/bin/bash
# Check the exact context around line 3128 in cc_shard.cpp
cat -n tx_service/src/cc/cc_shard.cpp | sed -n '3115,3165p'

Repository: eloqdata/tx_service

Length of output: 2377


Check the send result before marking standby as out-of-sync.

NotifyStandbyOutOfSync modifies state (setting seq_id to UINT64_MAX and removing heartbeat targets) without verifying the notification was successfully sent or queued for retry. SendMessageToNode returns a SendMessageResult struct with fields sent and queued_for_retry. If the result indicates the message failed and wasn't queued (both false), the standby will never resubscribe and will silently lag behind.

Check the result before modifying state:

const auto send_result = stream_sender_->SendMessageToNode(node_id, cc_msg);
if (!send_result.sent && !send_result.queued_for_retry)
{
    LOG(WARNING) << "Failed to notify standby " << node_id
                 << " of out-of-sync state; will retry later";
    return;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/cc/cc_shard.cpp` around lines 3098 - 3160,
NotifyStandbyOutOfSync currently updates state (setting seq_id to UINT64_MAX and
removing heartbeat targets) immediately after calling
stream_sender_->SendMessageToNode without checking the SendMessageResult; change
it to capture the return value from stream_sender_->SendMessageToNode
(SendMessageResult), and if both sent and queued_for_retry are false, log a
warning and return early so you don't mark the standby out-of-sync; only proceed
to modify seq_node_iter->second, call local_shards_.RemoveHeartbeatTargetNode,
and dispatch the unsubscribe updates to other cores after the send_result
indicates success or queued retry.


void CcShard::CollectStandbyMetrics()
{
assert(metrics::enable_standby_metrics);
Expand Down