Walkthrough
This PR refactors message-sending APIs in CcStreamSender by introducing a structured
Changes
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~20 minutes
Areas requiring extra attention:
Pre-merge checks and finishing touches
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 0
🧹 Nitpick comments (1)
tx_service/include/remote/cc_stream_sender.h (1)
74-95: `SendMessageResult` API looks good; consider small ergonomics tweaks
The `SendMessageResult` struct cleanly exposes the three relevant states (sent, queued, need_reconnect), and updating the three send methods to return it clarifies call-site logic. The dedicated `SendStandbyMessageToNode` is a reasonable specialization, even though it returns a plain `bool`.
If you want to make misuse harder, you could optionally:
- Mark `SendMessageResult` (or the send methods) as `[[nodiscard]]` to catch ignored results.
- Add a brief comment documenting the semantics of `sent`, `queued_for_retry`, and `need_reconnect` so future call sites interpret them consistently.
Otherwise this header-level API change looks consistent.
Also applies to: 103-116
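To make the two suggestions above concrete, here is a minimal sketch of what a documented, `[[nodiscard]]` result type might look like. The field and factory names are taken from the review text; the real header may differ (for example, its factories may take additional parameters), and the `Accepted()` helper is hypothetical, added only for illustration.

```cpp
// Hypothetical reconstruction of the result type described in the review.
// Names follow the review text; the actual header may differ.
struct [[nodiscard]] SendMessageResult
{
    // The message was written to the stream immediately.
    bool sent{false};
    // The message was buffered internally and will be resent in the background.
    bool queued_for_retry{false};
    // The stream is connecting or broken and has been scheduled for reconnect.
    bool need_reconnect{false};

    static SendMessageResult Sent() { return {true, false, false}; }
    static SendMessageResult Queued(bool need_reconnect)
    {
        return {false, true, need_reconnect};
    }
    static SendMessageResult Failed() { return {false, false, false}; }

    // Hypothetical convenience (not in the PR): true when the sender took
    // ownership of the message, either by sending or by queuing it.
    bool Accepted() const { return sent || queued_for_retry; }
};
```

With `[[nodiscard]]` on the type, a call site that drops the return value of a send method would typically produce a compiler warning, which is the misuse the nitpick is aimed at.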
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- tx_service/include/remote/cc_stream_sender.h (2 hunks)
- tx_service/src/cc/cc_shard.cpp (2 hunks)
- tx_service/src/dead_lock_check.cpp (1 hunks)
- tx_service/src/fault/fault_inject.cpp (1 hunks)
- tx_service/src/remote/cc_stream_sender.cpp (17 hunks)
- tx_service/src/remote/remote_cc_handler.cpp (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
Applied to files:
tx_service/src/cc/cc_shard.cpp
🧬 Code graph analysis (2)
tx_service/include/remote/cc_stream_sender.h (1)
tx_service/src/remote/cc_stream_sender.cpp (8)
SendMessageToNg (507-514), SendMessageToNg (507-510), SendMessageToNode (158-320), SendMessageToNode (158-161), SendScanRespToNode (322-458), SendScanRespToNode (322-323), SendStandbyMessageToNode (460-495), SendStandbyMessageToNode (460-461)
tx_service/src/remote/cc_stream_sender.cpp (1)
tx_service/include/remote/cc_stream_sender.h (2)
Queued (85-93), Sent (80-94)
🔇 Additional comments (6)
tx_service/src/remote/remote_cc_handler.cpp (1)
848-872: BroadcastStatistics now treats queued send as success and surfaces hard failures
Using `SendMessageResult` and calling `hres.SetFinished()` only when `sent || queued_for_retry` keeps the fire-and-forget behavior for accepted/queued sends, while allowing genuine send failures to propagate via `CcStreamSender`’s error handling. Verify that callers are fine with this potential behavior change (previously hard failures were effectively ignored here).
tx_service/src/dead_lock_check.cpp (1)
199-218: Deadlock probe send path correctly updated to `SendMessageResult`
Using `send_res.sent || send_res.queued_for_retry` to decide whether to track a node in `reply_map_` preserves the previous intent (only expect replies from nodes that accepted/queued the request) while aligning with the new API.
tx_service/src/fault/fault_inject.cpp (1)
175-213: Remote fault-inject path correctly interprets `SendMessageResult`
`b = (send_res.sent || send_res.queued_for_retry);` keeps the previous meaning of “remote injection accepted” while properly treating queued-for-retry as success and falling back to local injection only on hard failure.
tx_service/src/remote/cc_stream_sender.cpp (2)
158-320: Core send paths correctly refactored to `SendMessageResult`
The refactoring of `SendMessageToNode`, `SendScanRespToNode`, `ResendMessageToNode`, and `ConnectStreams` to use `SendMessageResult` is internally consistent:
- Unknown node → `Failed()` with `REQUESTED_NODE_NOT_LEADER` set on `res` when provided.
- Connecting stream (`stream_ver == -1`) → buffer into appropriate resend list, wake connector, and return `Queued(true)`.
- EAGAIN → buffer into EAGAIN resend list and return `Queued(false)`.
- Non-EAGAIN errors:
  - initial send (`resend == false`) → mark stream for reconnect, buffer into resend list, wake connector, `Queued(true)`;
  - resend (`resend == true`) → `Failed()` and set error on `res`.
- Resend threads treat `sent || queued_for_retry` as success and use `need_reconnect` only to handle races where another thread has already marked the stream for reconnect.
This preserves prior behavior while making states explicit at call sites.
Also applies to: 322-458, 617-763, 766-916
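The case analysis above can be read as a small decision table. The sketch below encodes it as a pure function, assuming a simplified view of the send path: `SendAttempt` and `Classify` are hypothetical names introduced only for illustration, and the real code also buffers messages, wakes the connector, and sets errors on `res`, none of which is modeled here.

```cpp
#include <cerrno>
#include <cstdint>

// Simplified stand-in for the result type; names follow the review text.
struct SendMessageResult
{
    bool sent{false};
    bool queued_for_retry{false};
    bool need_reconnect{false};

    static SendMessageResult Sent() { return {true, false, false}; }
    static SendMessageResult Queued(bool reconnect) { return {false, true, reconnect}; }
    static SendMessageResult Failed() { return {false, false, false}; }
};

// Hypothetical summary of what one send attempt observed.
struct SendAttempt
{
    bool node_known;     // destination found in the node table
    int64_t stream_ver;  // -1 means the stream is still connecting
    int write_errno;     // 0 on success, otherwise the errno of the write
    bool resend;         // true when called from a resend path
};

// Maps an observed attempt to the outcome listed in the review comment.
inline SendMessageResult Classify(const SendAttempt &a)
{
    if (!a.node_known)
        return SendMessageResult::Failed();       // unknown node
    if (a.stream_ver == -1)
        return SendMessageResult::Queued(true);   // stream connecting
    if (a.write_errno == 0)
        return SendMessageResult::Sent();         // immediate success
    if (a.write_errno == EAGAIN)
        return SendMessageResult::Queued(false);  // transient backpressure
    // Non-EAGAIN error: an initial send queues and reconnects, a resend fails.
    return a.resend ? SendMessageResult::Failed()
                    : SendMessageResult::Queued(true);
}
```

Keeping the mapping in one place like this is only a reading aid; whether the real send paths could share such a helper depends on details the review does not show.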
460-495: `SendStandbyMessageToNode` lacks reconnect/resend integration
`SendStandbyMessageToNode` does not trigger reconnect or resend logic on stream failure. If the underlying stream breaks, calls fail and log errors, but CcStreamSender never reconnects the stream unless another code path (e.g., `SendMessageToNode`) touches the same node. This risks leaving standby replicas unreachable.
Consider wiring `SendStandbyMessageToNode` into the reconnect machinery, either by delegating to `SendMessageToNode` or by mirroring its reconnect behavior (flipping stream version to `-1` and populating `to_connect_regular_streams_` on non-EAGAIN errors).
tx_service/src/cc/cc_shard.cpp (1)
2680-2726: Standby forwarding reconnection concern requires verification
The original review identifies a potential issue where `SendStandbyMessageToNode` may not trigger the same reconnection logic as `SendMessageToNode` on stream failures, which could leave standby-only nodes with broken connections. This concern requires examination of the actual `SendStandbyMessageToNode` and `SendMessageToNode` implementations in `cc_stream_sender.cpp`, their error handling paths, and whether they enqueue messages for resend or trigger stream reconnection through version management or `ConnectStreams()`.
Actionable comments posted: 0
🧹 Nitpick comments (2)
tx_service/src/remote/cc_stream_sender.cpp (2)
262-262: Track unresolved design decision about message discarding.
The TODO comments flag an important question: when resend fails (possibly indicating a dead remote node), should the message be definitively discarded? The current behavior returns `Failed()` without explicit discard logic.
Would you like me to open an issue to track this design decision? It requires clarity on retry semantics and failure handling policy.
Also applies to: 422-422
461-500: Consider API consistency or add documentation for different return type.
`SendStandbyMessageToNode` returns `bool` while `SendMessageToNode`, `SendScanRespToNode`, and `SendMessageToNg` all return `SendMessageResult`. The best-effort semantics (no retry/queuing) justify the simpler return type, but the inconsistency might confuse developers.
Consider one of:
- Add a doc comment explaining why standby messages use a simpler `bool` return (best-effort, no retry infrastructure)
- Use `SendMessageResult` for consistency (though `Sent()`/`Failed()` without queuing might be less clear)
```cpp
/**
 * @brief Send standby message with best-effort semantics (no retry/queuing).
 *
 * Unlike SendMessageToNode, this does not queue failed messages for retry.
 * Use for standby replication where occasional message loss is acceptable.
 *
 * @return true if sent immediately, false otherwise
 */
bool SendStandbyMessageToNode(uint32_t dest_node_id, const CcMessage &msg);
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- tx_service/include/remote/cc_stream_sender.h (2 hunks)
- tx_service/src/remote/cc_stream_sender.cpp (17 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tx_service/include/remote/cc_stream_sender.h
🧰 Additional context used
🧬 Code graph analysis (1)
tx_service/src/remote/cc_stream_sender.cpp (1)
tx_service/include/remote/cc_stream_sender.h (2)
Queued (85-93), Sent (80-94)
🔇 Additional comments (3)
tx_service/src/remote/cc_stream_sender.cpp (3)
158-321: Excellent refactor to structured return type.
The change from boolean return to `SendMessageResult` significantly improves clarity by explicitly distinguishing between sent, queued-for-retry, and failed states. Eliminating the output parameter `need_reconnect` in favor of including it in the result struct is a cleaner design.
697-702: Correct interpretation of SendMessageResult.
The logic correctly treats both `sent` and `queued_for_retry` as successful outcomes for the purpose of clearing messages from the EAGAIN resend list. This aligns with the intent that queued messages will be handled by the reconnection/resend infrastructure.
Also applies to: 745-750
829-847: Assertions correctly document subtle invariant.
The assertions at lines 836 and 897 verify that `need_reconnect` implies `queued_for_retry` when `resend=true`. This is a subtle but correct invariant: when resending and `need_reconnect` is true, it can only occur from the stream-connecting case (line 204 / line 365), which sets both flags. The assertions help document this non-obvious relationship.
Also applies to: 890-909
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- tx_service/include/remote/cc_stream_sender.h (2 hunks)
- tx_service/src/cc/cc_shard.cpp (2 hunks)
- tx_service/src/dead_lock_check.cpp (1 hunks)
- tx_service/src/fault/fault_inject.cpp (1 hunks)
- tx_service/src/remote/cc_stream_sender.cpp (17 hunks)
- tx_service/src/remote/remote_cc_handler.cpp (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- tx_service/src/remote/remote_cc_handler.cpp
- tx_service/src/dead_lock_check.cpp
- tx_service/include/remote/cc_stream_sender.h
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
Applied to files:
tx_service/src/cc/cc_shard.cpp
🧬 Code graph analysis (1)
tx_service/src/remote/cc_stream_sender.cpp (1)
tx_service/include/remote/cc_stream_sender.h (2)
Queued (85-93), Sent (80-94)
🔇 Additional comments (12)
tx_service/src/fault/fault_inject.cpp (1)
203-204: LGTM!
The adaptation to the new `SendMessageResult` return type is correct. The success criterion (`sent || queued_for_retry`) aligns with the PR's intent to treat queued messages as successfully handled, with the local fault injection fallback still triggered when the remote send definitively fails.
tx_service/src/cc/cc_shard.cpp (2)
2710-2711: LGTM!
The switch to `SendStandbyMessageToNode` is appropriate for standby forwarding. The boolean return simplifies the logic since standby messages have their own retry mechanism through the message buffer (`history_standby_msg_`) and `retry_fwd_msg_cc_`.
2842-2843: LGTM!
Consistent use of `SendStandbyMessageToNode` in the retry path. The error handling correctly sets `all_msgs_sent = false` and breaks the loop on failure.
tx_service/src/remote/cc_stream_sender.cpp (9)
158-186: LGTM on the API migration.
The return type change from `bool` to `SendMessageResult` provides clearer semantics. The `Failed()` return when the destination node is unknown is appropriate.
189-205: LGTM!
Returning `SendMessageResult::Queued(true)` when the stream is reconnecting correctly indicates that the message is queued and reconnection is needed.
218-256: LGTM!
EAGAIN handling correctly returns `Queued(false)` since the message is queued for background resend but no reconnection is needed (the stream is still valid, just temporarily full).
257-321: LGTM!
The non-EAGAIN error handling correctly distinguishes between resend (where further retry is futile) and initial send (where reconnection and retry are attempted). The `Queued(true)` return for the initial send path correctly signals that reconnection is needed.
512-519: LGTM!
`SendMessageToNg` correctly delegates to `SendMessageToNode` and returns the result unchanged.
697-702: LGTM!
The resend logic correctly treats both `sent` and `queued_for_retry` as success for counting purposes, since either outcome means the message has been handled.
745-750: LGTM!
Consistent handling with the regular message resend logic above.
829-848: LGTM!
The `ConnectStreams` logic correctly handles the new `SendMessageResult`:
- Checks `need_reconnect` to determine if the connection loop should break
- The assertion `assert(send_result.queued_for_retry)` correctly validates that when `need_reconnect` is true, the current message was already re-queued internally
- Remaining messages are re-enqueued for the next connection attempt
890-909: LGTM!
Consistent handling for long message streams with the same pattern as regular messages.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tx_service/src/remote/cc_stream_sender.cpp (1)
713-719: Avoid potential hot-spin when EAGAIN keeps re-queuing.
Right now `(sent || queued_for_retry)` increments `send_cnt`, so if sends repeatedly hit EAGAIN (i.e., `Queued(false)`), the inner resend loop can churn the same node’s queue without backoff. Consider treating `Queued(false)` as “no progress” (break/yield) or add a small backoff on that outcome.
Also applies to: 761-767
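One way to read the "no progress" suggestion is sketched below, under simplifying assumptions: `DrainUntilStalled` is a hypothetical helper, the queue is a plain `std::deque`, and an unsent message simply stays at the front, whereas the real sender re-buffers queued messages internally. Only an actual `sent` counts as progress; any other outcome ends the pass so the caller can yield, back off, or move on to another node.

```cpp
#include <cstddef>
#include <deque>

// Simplified stand-in for the result type; names follow the review text.
struct SendMessageResult
{
    bool sent{false};
    bool queued_for_retry{false};
    bool need_reconnect{false};

    static SendMessageResult Sent() { return {true, false, false}; }
    static SendMessageResult Queued(bool reconnect) { return {false, true, reconnect}; }
    static SendMessageResult Failed() { return {false, false, false}; }
};

// Hypothetical helper: sends from the front of `pending` until one attempt
// makes no real progress. Returns how many messages were actually sent.
template <typename Msg, typename SendFn>
std::size_t DrainUntilStalled(std::deque<Msg> &pending, SendFn send)
{
    std::size_t delivered = 0;
    while (!pending.empty())
    {
        SendMessageResult r = send(pending.front());
        if (!r.sent)
        {
            // Queued(false) on EAGAIN, Queued(true) or Failed(): stop here
            // instead of spinning on the same node's queue.
            break;
        }
        pending.pop_front();
        ++delivered;
    }
    return delivered;
}
```

A caller that sees the function return with `pending` still non-empty could sleep briefly or switch to the next node before retrying; the right backoff policy is a design decision the review leaves open.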
♻️ Duplicate comments (1)
tx_service/src/remote/cc_stream_sender.cpp (1)
461-516: Standby send reconnection-on-non-EAGAIN is implemented (matches prior feedback).
🧹 Nitpick comments (1)
tx_service/include/remote/cc_stream_sender.h (1)
74-127: API shape looks good: `SendMessageResult` + dedicated standby send clarifies semantics.
Optional: consider whether `SendMessageResult` (or the send methods) should be `[[nodiscard]]` to discourage accidental ignoring of `{queued_for_retry, need_reconnect}` states.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- tx_service/include/remote/cc_stream_sender.h (2 hunks)
- tx_service/src/cc/cc_shard.cpp (2 hunks)
- tx_service/src/dead_lock_check.cpp (1 hunks)
- tx_service/src/fault/fault_inject.cpp (1 hunks)
- tx_service/src/remote/cc_stream_sender.cpp (17 hunks)
- tx_service/src/remote/remote_cc_handler.cpp (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tx_service/src/remote/remote_cc_handler.cpp
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
Applied to files:
tx_service/src/cc/cc_shard.cpp
🧬 Code graph analysis (2)
tx_service/include/remote/cc_stream_sender.h (1)
tx_service/src/remote/cc_stream_sender.cpp (8)
SendMessageToNg (528-535), SendMessageToNg (528-531), SendMessageToNode (158-321), SendMessageToNode (158-161), SendScanRespToNode (323-459), SendScanRespToNode (323-324), SendStandbyMessageToNode (461-516), SendStandbyMessageToNode (461-462)
tx_service/src/remote/cc_stream_sender.cpp (1)
tx_service/include/remote/cc_stream_sender.h (2)
Queued (85-93), Sent (80-94)
🔇 Additional comments (7)
tx_service/src/dead_lock_check.cpp (1)
210-216: Good adaptation to `SendMessageResult` (treat queued retry as non-fatal).
This aligns with the new send contract and avoids falsely treating “queued for retry” as an immediate hard failure.
tx_service/src/fault/fault_inject.cpp (1)
203-205: Send result handling matches new API semantics.
Using `(sent || queued_for_retry)` is consistent with the new “queued is not fatal” meaning.
tx_service/src/cc/cc_shard.cpp (2)
2710-2712: Standby forwarding correctly switches to `SendStandbyMessageToNode`.
Only potential footgun: if `stream_sender_` can still be null after lazy init, this would crash; consider a defensive null check if not guaranteed by invariant.
2842-2844: Retry path correctly uses standby-specific send API.
Same note as above: consider guarding against a null `stream_sender_` if that’s possible in practice.
tx_service/src/remote/cc_stream_sender.cpp (3)
158-205: `SendMessageToNode` migration to `SendMessageResult` looks consistent.
Return values line up with the documented meaning (sent vs queued, reconnect-needed encoded via `need_reconnect`).
Also applies to: 255-316, 320-321
323-366: `SendScanRespToNode` migration to `SendMessageResult` looks consistent.
Also applies to: 415-459
845-864: Reconnect-aware resend loop is directionally good; double-check drop semantics on `Failed()`.
These loops only special-case `need_reconnect`; if a resend returns `Failed()` (not queued), the message is effectively dropped after dequeue. If that’s intended, OK; otherwise you may want explicit handling (e.g., re-enqueue remaining batch and/or mark stream for reconnect).
Also applies to: 906-924
Actionable comments posted: 0
🧹 Nitpick comments (2)
tx_service/src/remote/cc_stream_sender.cpp (2)
262-262: Consider documenting the message discard policy.
The TODO comment indicates uncertainty about whether to discard messages that fail after reconnection. This design decision impacts reliability guarantees and should be documented or resolved.
845-865: Consider documenting or enforcing the invariant that `need_reconnect=true` requires `queued_for_retry=true`.
The assertions at lines 853 and 914 assume that when `need_reconnect` is true, `queued_for_retry` must also be true. This invariant holds throughout the current codebase: all paths that set `need_reconnect=true` use `Queued(true)`, which also sets `queued_for_retry=true`. However, the `SendMessageResult` API structure allows `Failed(true)` which would return `{sent=false, queued_for_retry=false, need_reconnect=true}`, creating a theoretically invalid state.
To prevent this type of violation in future code, consider either:
- Adding a comment documenting this invariant on the `SendMessageResult` struct
- Modifying the factory methods to enforce it structurally (e.g., remove the `need_reconnect` parameter from `Failed()`)
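The second option could be sketched roughly as follows. This is not the PR's code: it assumes the type can become a class with a private constructor and read-only accessors, which would change how call sites read the fields (`r.sent()` instead of `r.sent`). The point is only that, with these factories, a result with `need_reconnect` set but `queued_for_retry` clear cannot be constructed.

```cpp
// Hypothetical variant of the result type that enforces
// "need_reconnect implies queued_for_retry" by construction.
class SendMessageResult
{
public:
    static SendMessageResult Sent()
    {
        return SendMessageResult(true, false, false);
    }
    // Only a queued result may carry the reconnect flag.
    static SendMessageResult Queued(bool need_reconnect)
    {
        return SendMessageResult(false, true, need_reconnect);
    }
    // No parameter: a hard failure never reports need_reconnect.
    static SendMessageResult Failed()
    {
        return SendMessageResult(false, false, false);
    }

    bool sent() const { return sent_; }
    bool queued_for_retry() const { return queued_for_retry_; }
    bool need_reconnect() const { return need_reconnect_; }

private:
    // Private so that callers cannot build arbitrary flag combinations.
    SendMessageResult(bool sent, bool queued, bool reconnect)
        : sent_(sent), queued_for_retry_(queued), need_reconnect_(reconnect)
    {
    }

    bool sent_;
    bool queued_for_retry_;
    bool need_reconnect_;
};
```

The trade-off is a slightly heavier type in exchange for no longer needing the runtime assertions to guard the invariant; documenting the invariant in a comment, the first option, is the lighter-weight alternative.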
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- tx_service/include/remote/cc_stream_sender.h (2 hunks)
- tx_service/src/cc/cc_shard.cpp (2 hunks)
- tx_service/src/dead_lock_check.cpp (1 hunks)
- tx_service/src/fault/fault_inject.cpp (1 hunks)
- tx_service/src/remote/cc_stream_sender.cpp (17 hunks)
- tx_service/src/remote/remote_cc_handler.cpp (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tx_service/src/cc/cc_shard.cpp
🧰 Additional context used
🧬 Code graph analysis (2)
tx_service/src/remote/cc_stream_sender.cpp (1)
tx_service/include/remote/cc_stream_sender.h (2)
Queued (85-93), Sent (80-94)
tx_service/include/remote/cc_stream_sender.h (1)
tx_service/src/remote/cc_stream_sender.cpp (8)
SendMessageToNg (528-535), SendMessageToNg (528-531), SendMessageToNode (158-321), SendMessageToNode (158-161), SendScanRespToNode (323-459), SendScanRespToNode (323-324), SendStandbyMessageToNode (461-516), SendStandbyMessageToNode (461-462)
🔇 Additional comments (10)
tx_service/src/remote/remote_cc_handler.cpp (1)
868-873: LGTM! Correctly interprets the new SendMessageResult semantics.
The change properly handles both the `sent` and `queued_for_retry` states as success conditions for the fire-and-forget broadcast statistics operation.
tx_service/src/dead_lock_check.cpp (1)
210-213: LGTM! Consistent with the new result semantics.
The change correctly interprets `sent || queued_for_retry` as successful submission for the deadlock detection coordination message.
tx_service/src/fault/fault_inject.cpp (1)
203-205: LGTM! Proper fallback logic with the new result type.
The change correctly uses `sent || queued_for_retry` to determine whether the remote fault injection succeeded, falling back to local execution only when both are false.
tx_service/include/remote/cc_stream_sender.h (2)
74-94: LGTM! Well-designed result structure.
The `SendMessageResult` struct with static factory methods provides a clear, self-documenting API for the multi-state send outcomes (Sent, Queued, Failed). The three boolean fields effectively capture the necessary state for callers to handle retries and reconnections.
104-127: LGTM! Clear API contracts with good documentation.
The updated signatures returning `SendMessageResult` provide better visibility into send outcomes compared to simple boolean returns. The documentation for `SendStandbyMessageToNode` clearly distinguishes its best-effort semantics from the retry-enabled `SendMessageToNode`.
tx_service/src/remote/cc_stream_sender.cpp (5)
158-321: LGTM! Clear result semantics with proper error handling.
The updated `SendMessageToNode` correctly returns `SendMessageResult` with appropriate state for each error path:
- `Failed()` when destination unknown or resend fails after reconnect
- `Queued(true)` when stream is connecting or needs reconnection
- `Queued(false)` on EAGAIN (transient backpressure)
- `Sent()` on immediate success
The `need_reconnect` field properly distinguishes between transient (EAGAIN) and persistent errors.
323-459: LGTM! Consistent with SendMessageToNode logic.
The implementation correctly applies the same `SendMessageResult` semantics to long message streams, maintaining consistency across the codebase.
461-516: LGTM! Best-effort semantics properly implemented.
The `SendStandbyMessageToNode` correctly implements fire-and-forget semantics without retry queuing. The reconnection logic for non-EAGAIN errors (lines 498-511) properly addresses the past review comment, ensuring prompt stream recovery rather than relying on idle timeout.
528-535: LGTM! Clean delegation pattern.
The wrapper correctly propagates the `SendMessageResult` from `SendMessageToNode`.
713-719: LGTM! Proper resend logic with the new result type.
The background resend thread correctly interprets both `sent` and `queued_for_retry` as progress, and breaks to the next stream when neither succeeds, preventing wasted retries on a persistently broken connection.
Also applies to: 761-767
Related PR:
eloqdata/eloqkv#333
eloqdata/eloqsql#183
eloqdata/eloqdoc#360