Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 47 additions & 15 deletions tx_service/include/remote/cc_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ struct ResendScanSliceResp
ScanSliceResponse msg_;
};

struct SendMessageResult
{
bool sent{false};
bool queued_for_retry{false};
bool need_reconnect{false};

static SendMessageResult Sent()
{
return SendMessageResult{true, false, false};
}

static SendMessageResult Queued(bool need_reconnect)
{
return SendMessageResult{false, true, need_reconnect};
}

static SendMessageResult Failed(bool need_reconnect = false)
{
return SendMessageResult{false, false, need_reconnect};
}
};

class CcStreamSender
{
public:
Expand All @@ -79,21 +101,31 @@ class CcStreamSender
~CcStreamSender();

void RecycleCcMsg(std::unique_ptr<CcMessage> msg);
bool SendMessageToNg(uint32_t node_group_id,
const CcMessage &msg,
CcHandlerResultBase *res = nullptr,
bool resend = false);
bool SendMessageToNode(uint32_t dest_node_id,
const CcMessage &msg,
CcHandlerResultBase *res = nullptr,
bool resend = false,
bool resend_on_eagain = true,
bool log_verbose = false,
bool *need_reconnect = nullptr);
bool SendScanRespToNode(uint32_t dest_node_id,
const ScanSliceResponse &msg,
bool resend = false,
bool *need_reconnect = nullptr);
SendMessageResult SendMessageToNg(uint32_t node_group_id,
const CcMessage &msg,
CcHandlerResultBase *res = nullptr,
bool resend = false);
SendMessageResult SendMessageToNode(uint32_t dest_node_id,
const CcMessage &msg,
CcHandlerResultBase *res = nullptr,
bool resend = false);
SendMessageResult SendScanRespToNode(uint32_t dest_node_id,
const ScanSliceResponse &msg,
bool resend = false);

/**
* @brief Send standby message with best-effort semantics (no
* retry/queuing).
*
* Unlike SendMessageToNode, this does not queue failed messages for retry.
* Use for standby replication which has a dedicated resend logic. The
* caller only cares whether the message is successfully sent into stream or
* not.
*
* @return true if sent immediately, false otherwise
*/
bool SendStandbyMessageToNode(uint32_t dest_node_id, const CcMessage &msg);

void UpdateRemoteNodes(
const std::unordered_map<NodeId, NodeConfig> &nodes_configs);

Expand Down
8 changes: 4 additions & 4 deletions tx_service/src/cc/cc_shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2707,8 +2707,8 @@ void CcShard::ForwardStandbyMessage(StandbyForwardEntry *entry)
}
});

write_succ = stream_sender_->SendMessageToNode(
node_id, entry_ptr->Message(), nullptr, false, false);
write_succ = stream_sender_->SendStandbyMessageToNode(
node_id, entry_ptr->Message());
if (write_succ)
{
last_sent_seq_id++;
Expand Down Expand Up @@ -2839,8 +2839,8 @@ bool CcShard::ResendFailedForwardMessages()
if (entry_it != seq_id_to_entry_map_.end())
{
StandbyForwardEntry *entry = entry_it->second;
bool succ = stream_sender_->SendMessageToNode(
node_id, entry->Message(), nullptr, false, false);
bool succ = stream_sender_->SendStandbyMessageToNode(
node_id, entry->Message());
if (!succ)
{
all_msgs_sent = false;
Expand Down
4 changes: 2 additions & 2 deletions tx_service/src/dead_lock_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ void DeadLockCheck::GatherLockDependancy()
tr::DeadLockRequest *dl = send_msg.mutable_dead_lock_request();
dl->set_src_node_id(Sharder::Instance().NodeId());
dl->set_check_round(check_round_);
bool hr =
auto send_res =
Sharder::Instance().GetCcStreamSender()->SendMessageToNode(
node_id, send_msg);
if (hr)
if (send_res.sent || send_res.queued_for_retry)
{
reply_map_.try_emplace(node_id, false);
}
Expand Down
4 changes: 3 additions & 1 deletion tx_service/src/fault/fault_inject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ void FaultInject::TriggerAction(FaultEntry *entry)
fi_req->set_fault_name(fault_name);
fi_req->set_fault_paras(fault_paras);

b = ss->SendMessageToNode(dest_node_id, send_msg);
auto send_res =
ss->SendMessageToNode(dest_node_id, send_msg);
b = (send_res.sent || send_res.queued_for_retry);
}

// If CcStreamSender == nullptr or failed to send, send to local
Expand Down
Loading