feat: finish the implementation of the block fetch protocol #643
Walkthrough
Adds `ReadOnlyChainStore::get_range` for inclusive parent-linked header traversal, implements responder-side block streaming (`BlockStreaming`, `current_range`, `load_first_block`), updates related exports/types and tests, and adds a RocksDB unit test for `get_range`.
Sequence Diagram(s)
sequenceDiagram
participant Net as Network
participant Resp as BlockFetchResponder
participant Store as ReadOnlyChainStore
Net->>Resp: RequestRange(from, through)
activate Resp
Resp->>Store: get_range(from, through)
activate Store
Store-->>Resp: Vec<HeaderHash>
deactivate Store
Resp->>Resp: current_range = VecDeque(hashes)
Resp->>Resp: load_first_block()
alt block available
Resp->>Store: load block bytes for header
activate Store
Store-->>Resp: RawBlock bytes
deactivate Store
Resp-->>Net: BlockStreaming::SendBlock(bytes)
Resp->>Resp: continue streaming or emit Done
else no block
Resp-->>Net: BlockStreaming::Done / NoBlocks
end
deactivate Resp
Estimated code review effort: 3 (Moderate), ~22 minutes
@coderabbitai review
✅ Actions performed: review triggered.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@crates/amaru-ouroboros-traits/src/stores/consensus/mod.rs`:
- Around line 96-116: The get_range implementation can return a partial path
when the walk hits a root or a missing parent before reaching from_inclusive;
update get_range (and its use of load_header and header.parent()) to return an
empty Vec if the ancestry chain breaks before reaching from_inclusive: while
walking from to_inclusive toward from_inclusive, detect any missing header
(load_header returns None) or missing parent that prevents reaching
from_inclusive and immediately return Vec::new(); only if the loop finishes with
current_hash == *from_inclusive should you collect and reverse the headers and
return them.
In `@crates/amaru-protocols/src/blockfetch/responder.rs`:
- Around line 54-56: Update the docstring for the function that "loads the first
existing block in the current range" to accurately reflect the loop behavior:
replace the sentence that says "the first element of the current_range is
consumed" with wording that clarifies multiple elements may be consumed (e.g.,
"elements of current_range are consumed until a block is found or the range is
exhausted"), keeping the rest of the description unchanged; locate the doc
comment immediately above the block-loading function in responder.rs to apply
this change.
In `@crates/amaru-protocols/src/tests.rs`:
- Around line 65-69: The test soak was reduced to 3s causing flakiness; restore
the original 5s delay by changing the sleep in the tokio::select! branch from
Duration::from_secs(3) back to Duration::from_secs(5) so the responder.join() /
initiator.join() stability check retains the intended buffer.
🧹 Nitpick comments (1)
crates/amaru-protocols/src/blockfetch/responder.rs (1)
95-107: Avoid `block` shadowing for clarity.
The inner `block` hides the outer `block`, which makes the flow a bit harder to follow. Renaming keeps things crystal clear.
🎬 Tiny readability tweak
- if let Some(block) = self.load_first_block(eff).await {
+ if let Some(next_block) = self.load_first_block(eff).await {
      eff.send(
          eff.me_ref(),
-         Inputs::Local(BlockStreaming::SendBlock(block.to_vec())),
+         Inputs::Local(BlockStreaming::SendBlock(next_block.to_vec())),
      )
      .await;
  } else {
592f873 to 659047b
659047b to 4b99e80
Signed-off-by: etorreborre <etorreborre@yahoo.com>
4b99e80 to 823d0cb
rkuhn left a comment
nice! just some minor improvements and a possible flakiness issue
fn get_range(&self, from_inclusive: &HeaderHash, to_inclusive: &HeaderHash) -> Vec<HeaderHash> {
    let mut headers = vec![];
    let mut current_hash = *to_inclusive;
    while current_hash != *from_inclusive {
This loop will run until genesis if from_inclusive is not an ancestor of to_inclusive — this is an attack vector. We should probably set an upper bound on how many hashes are returned at maximum. What does the Haskell node do?
This one is a naive strategy indeed. The Haskell node returns an iterator on blocks when they are on the best chain, only if from starts before the chain anchor (before k blocks). Then, there's no limitation that I can see. Otherwise we are on a fork and the amount of streamed blocks is limited.
I propose to tackle this in a follow-up PR if that's ok with you.
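One way the bounded walk discussed here could look is sketched below. This is only an illustration under stated assumptions: `HeaderHash` is replaced by a `u32` stand-in, `MemStore` is a hypothetical in-memory parent map rather than the real chain store, and the `max_len` parameter is an invented bound, not what the Haskell node does. The sketch also returns an empty `Vec` when the ancestry chain breaks before reaching `from_inclusive`, as the bot comment on `get_range` suggests.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the real `HeaderHash` type.
type HeaderHash = u32;

/// Hypothetical in-memory store: maps each header hash to its parent hash
/// (`None` marks a root header).
struct MemStore {
    parents: HashMap<HeaderHash, Option<HeaderHash>>,
}

impl MemStore {
    /// Walks parent links from `to_inclusive` back to `from_inclusive`.
    /// Returns the hashes oldest first, or an empty `Vec` if a header is
    /// unknown, a root is reached before `from_inclusive`, or the range
    /// would contain more than `max_len` hashes.
    fn get_range_bounded(
        &self,
        from_inclusive: &HeaderHash,
        to_inclusive: &HeaderHash,
        max_len: usize,
    ) -> Vec<HeaderHash> {
        let mut hashes = Vec::new();
        let mut current = *to_inclusive;
        loop {
            // An unknown header means the chain is broken: give up.
            let Some(parent) = self.parents.get(&current) else {
                return Vec::new();
            };
            // Stop before exceeding the bound instead of walking to genesis.
            if hashes.len() == max_len {
                return Vec::new();
            }
            hashes.push(current);
            if current == *from_inclusive {
                break;
            }
            match parent {
                Some(p) => current = *p,
                // Reached a root without meeting `from_inclusive`.
                None => return Vec::new(),
            }
        }
        hashes.reverse();
        hashes
    }
}
```

With a bound in place, a request whose `from` is not an ancestor of `through` costs at most `max_len` header lookups before it is rejected.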
State::Idle,
Self {
    muxer,
    current_range: VecDeque::default(),
since we only consume from one end and don’t add to the other, perhaps a Vec is enough (with parent after child, so we can pop off the end)
This might go away entirely in a follow-up PR.
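For reference, the `Vec`-based alternative suggested above might look roughly like this. It is a sketch only: `HeaderHash` is a `u32` stand-in and `PendingRange` is a hypothetical wrapper, not a type from the PR. The hashes are stored child before parent, so the oldest hash sits at the end and `pop` returns it in O(1).

```rust
/// Hypothetical stand-in for the real `HeaderHash` type.
type HeaderHash = u32;

/// Pending hashes stored newest first, so the next hash to stream
/// (the oldest one) is always the last element.
struct PendingRange {
    reversed: Vec<HeaderHash>,
}

impl PendingRange {
    /// Takes hashes in chain order (oldest first) and reverses them once.
    fn from_chain_order(mut hashes: Vec<HeaderHash>) -> Self {
        hashes.reverse();
        Self { reversed: hashes }
    }

    /// Returns the next hash in chain order, or `None` when exhausted.
    fn next_hash(&mut self) -> Option<HeaderHash> {
        self.reversed.pop()
    }
}
```

If `get_range` skipped its final reversal, its result would presumably already be in this order and the extra `reverse` call could go away.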
let store = Store::new(eff.clone());
let range = store.get_range(&from.hash(), &through.hash());
self.current_range = VecDeque::from(range);
if let Some(block) = self.load_first_block(eff).await {
I think it would be cleaner and easier to read if there was a check_block_exists function to decide this case here and then only have a Streaming message without further content that is used in local() to send the next block or end the stream. Your proposal is correct, though, and avoids one call to the database. I’m on the fence — have you tried the other way to see which one you like better?
I haven't tried both, but I indeed wanted to avoid one call.
  }
  IntersectFound(point, tip) => {
-     tracing::info!(peer = %msg.peer, ?point, ?tip, "intersect found");
+     tracing::info!(peer = %msg.peer, %point, %tip, "intersect found");
- tracing::info!(peer = %msg.peer, %point, %tip, "intersect found");
+ tracing::info!(peer = %msg.peer, %point, tip_point = %tip.point(), "intersect found");
The issue with %tip is that it doesn’t show the slot number, which makes it harder to see the relation to %point.
Why don't we change the Tip Display instance to include the slot?
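A `Display` instance that includes the slot could be sketched as follows. The `Point` and `Tip` definitions here are simplified, hypothetical stand-ins (the field names, the `block_height` field and the output format are all assumptions), so this only illustrates the idea of delegating to the point's own `Display`.

```rust
use std::fmt;

/// Hypothetical, simplified point: a slot number and a header hash.
struct Point {
    slot: u64,
    hash: String,
}

/// Hypothetical, simplified tip: a point plus a block height.
struct Tip {
    point: Point,
    block_height: u64,
}

impl fmt::Display for Point {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}", self.slot, self.hash)
    }
}

impl fmt::Display for Tip {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Reuse the point's formatting so the slot shows up in logs.
        write!(f, "{} (height {})", self.point, self.block_height)
    }
}
```

With something like this in place, `%tip` and `%point` would print the slot in the same position, which makes the two log fields directly comparable.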
chain_store.set_best_chain_hash(&header.hash())?;

// Add a block for each header
// We skip one block to test that the initiator can try to fetch missing blocks
this is unclear to me: it was my understanding that a range can only be returned if all blocks are present, otherwise NoBlocks. did I read wrong?
It was my impression that we could start streaming from several peers at the same time, where some peers might not yet have all the blocks on the required chain. So we respond in a best-effort manner. This is definitely something that we could contribute to the blueprint because the specification is not clear-cut on the subject.
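The best-effort behaviour described here can be illustrated with a synchronous simplification of `load_first_block`. The real function is async and goes through the effects handle and the chain store; in this sketch `HeaderHash` is a `u32` stand-in and the `blocks` map is a hypothetical replacement for the block store.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for the real `HeaderHash` type.
type HeaderHash = u32;

struct Responder {
    /// Hashes still to be streamed, oldest first.
    current_range: VecDeque<HeaderHash>,
    /// Hypothetical block store: raw block bytes by header hash.
    blocks: HashMap<HeaderHash, Vec<u8>>,
}

impl Responder {
    /// Pops hashes from the front of `current_range` until one has a
    /// stored block, and returns that block's bytes. Hashes without a
    /// block are skipped. Returns `None` once the range is exhausted.
    fn load_first_block(&mut self) -> Option<Vec<u8>> {
        while let Some(hash) = self.current_range.pop_front() {
            if let Some(bytes) = self.blocks.get(&hash) {
                return Some(bytes.clone());
            }
        }
        None
    }
}
```

Under this reading, a header whose block is missing does not end the batch: the responder skips it and streams the next block it has.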
Signed-off-by: etorreborre <etorreborre@yahoo.com>
f668d9c to e95cb50
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@crates/amaru-protocols/src/tests.rs`:
- Around line 146-156: The current wait_for_termination function drops the
Result from tokio::time::timeout so a timeout is ignored; update
wait_for_termination to check the timeout result and return an error when the
timeout elapses (e.g., match or use ? on the timeout call and convert the
Elapsed into an anyhow::Error), while still awaiting responder_done.notified()
and initiator_done.notified(); refer to the function name wait_for_termination
and the notifier variables responder_done and initiator_done to locate where to
perform the change.
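The shape of that fix can be shown with a std-only analogue. This is not the test's code: the real `wait_for_termination` uses `tokio::time::timeout` and `Notify`, whereas this sketch swaps in `std::sync::mpsc` receivers and `recv_timeout`, and returns a `String` error instead of `anyhow::Error`. The point it illustrates is the same: the timeout result is inspected and turned into an error instead of being dropped.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Waits until both receivers yield a "done" signal, sharing one overall
/// deadline. A timeout is reported as an error rather than ignored.
fn wait_for_termination(
    responder_done: &Receiver<()>,
    initiator_done: &Receiver<()>,
    timeout: Duration,
) -> Result<(), String> {
    let deadline = Instant::now() + timeout;
    for (name, done) in [("responder", responder_done), ("initiator", initiator_done)] {
        // Time left until the shared deadline (zero if already past it).
        let remaining = deadline.saturating_duration_since(Instant::now());
        match done.recv_timeout(remaining) {
            Ok(()) => {}
            Err(RecvTimeoutError::Timeout) => {
                return Err(format!("timed out waiting for the {name}"));
            }
            Err(RecvTimeoutError::Disconnected) => {
                return Err(format!("{name} dropped without signalling"));
            }
        }
    }
    Ok(())
}
```

A test calling this would then fail loudly when one side never terminates, instead of passing after the timeout silently elapses.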
🧹 Nitpick comments (1)
crates/amaru-protocols/src/tests.rs (1)
195-258: Public API surface is a bit mismatched.
`AcceptState` is public and `accept_stage` is public, but external callers can’t construct `AcceptState` because `new` is private and its fields aren’t public. Either make the constructor public or reduce visibility to `pub(crate)` to avoid a “can’t call this” API.
🎛️ Option A: make the constructor public
-impl AcceptState {
-    fn new(manager_stage: StageRef<ManagerMessage>, notify: Arc<Notify>) -> Self {
+impl AcceptState {
+    pub fn new(manager_stage: StageRef<ManagerMessage>, notify: Arc<Notify>) -> Self {
         Self {
             manager_stage,
             notify,
         }
     }
 }
…he responder are done Signed-off-by: etorreborre <etorreborre@yahoo.com>
e95cb50 to 01c80f9
This PR completes (on the responder side) and tests the retrieval of blocks with the blockfetch protocol:
`StartBatch` / `Block` / `BatchDone` messages.
`ChainStore`.