diff --git a/src/chain/block_queue.rs b/src/chain/block_queue.rs index f7c8c166..2b0effa1 100644 --- a/src/chain/block_queue.rs +++ b/src/chain/block_queue.rs @@ -94,49 +94,32 @@ impl BlockQueue { #[derive(Debug)] pub(crate) struct Request { hash: BlockHash, - recipient: BlockRecipient, + recipient: oneshot::Sender>, } impl Request { - fn new(hash: BlockHash) -> Self { - Self { - hash, - recipient: BlockRecipient::Event, - } - } - fn from_block_request( block_request: ClientRequest>, ) -> Self { let (hash, oneshot) = block_request.into_values(); Self { hash, - recipient: BlockRecipient::Client(oneshot), + recipient: oneshot, } } } -impl From for Request { - fn from(value: BlockHash) -> Self { - Request::new(value) - } -} - impl From>> for Request { fn from(value: ClientRequest>) -> Self { Request::from_block_request(value) } } -#[derive(Debug)] -pub(crate) enum BlockRecipient { - Client(oneshot::Sender>), - Event, -} - #[derive(Debug)] pub(crate) enum ProcessBlockResponse { - Accepted { block_recipient: BlockRecipient }, + Accepted { + block_recipient: oneshot::Sender>, + }, LateResponse, UnknownHash, } @@ -161,14 +144,26 @@ mod test { [hash_1, hash_2, hash_3] } + trait DummyRequestExt { + fn dummy_request(&self) -> Request; + } + + impl DummyRequestExt for BlockHash { + fn dummy_request(&self) -> Request { + let (tx, _rx) = oneshot::channel(); + let client_request = ClientRequest::new(*self, tx); + Request::from_block_request(client_request) + } + } + #[test] fn test_block_queue() { let [hash_1, hash_2, hash_3] = three_block_hashes(); let mut queue = BlockQueue::new(); - queue.add(hash_1); - queue.add(hash_2); - queue.add(hash_3); - queue.add(hash_1); + queue.add(hash_1.dummy_request()); + queue.add(hash_2.dummy_request()); + queue.add(hash_3.dummy_request()); + queue.add(hash_1.dummy_request()); assert_eq!(queue.queue.len(), 4); assert_eq!(queue.pop(), Some(hash_1)); assert_eq!(queue.pop(), None); @@ -201,9 +196,9 @@ mod test { async fn test_laggy_peer() { let [hash_1, hash_2, hash_3] = three_block_hashes(); let mut queue = BlockQueue::new(); - queue.add(hash_1); - queue.add(hash_2); - queue.add(hash_3); + queue.add(hash_1.dummy_request()); + queue.add(hash_2.dummy_request()); + queue.add(hash_3.dummy_request()); assert_eq!(queue.queue.len(), 3); assert_eq!(queue.pop(), Some(hash_1)); tokio::time::sleep(Duration::from_secs(6)).await; @@ -241,10 +236,10 @@ mod test { fn test_blocks_removed() { let [hash_1, hash_2, hash_3] = three_block_hashes(); let mut queue = BlockQueue::new(); - queue.add(hash_1); - queue.add(hash_2); - queue.add(hash_3); - queue.add(hash_1); + queue.add(hash_1.dummy_request()); + queue.add(hash_2.dummy_request()); + queue.add(hash_3.dummy_request()); + queue.add(hash_1.dummy_request()); assert_eq!(queue.queue.len(), 4); assert_eq!(queue.pop(), Some(hash_1)); assert_eq!( diff --git a/src/messages.rs b/src/messages.rs index d8812834..7021c859 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -43,8 +43,6 @@ impl core::fmt::Display for Info { /// Data and structures useful for a consumer, such as a wallet. #[derive(Debug, Clone)] pub enum Event { - /// A [`Block`](crate) that was requested. - Block(IndexedBlock), /// The chain of block headers has been altered in some way. ChainUpdate(BlockHeaderChanges), /// The node is fully synced, having scanned the requested range. diff --git a/src/node.rs b/src/node.rs index 8e8d226e..94bd4293 100644 --- a/src/node.rs +++ b/src/node.rs @@ -25,7 +25,7 @@ use tokio::{ use crate::{ chain::{ - block_queue::{BlockQueue, BlockRecipient, ProcessBlockResponse}, + block_queue::{BlockQueue, ProcessBlockResponse}, chain::Chain, checkpoints::HeaderCheckpoint, error::HeaderSyncError, @@ -552,18 +552,12 @@ impl Node { self.dialog .send_info(Info::BlockReceived(block.block_hash())) .await; - match block_recipient { - BlockRecipient::Client(sender) => { - let send_err = sender.send(Ok(IndexedBlock::new(height, block))).is_err(); - if send_err { - self.dialog.send_warning(Warning::ChannelDropped); - }; - } - BlockRecipient::Event => { - self.dialog - .send_event(Event::Block(IndexedBlock::new(height, block))); - } - } + let send_err = block_recipient + .send(Ok(IndexedBlock::new(height, block))) + .is_err(); + if send_err { + self.dialog.send_warning(Warning::ChannelDropped); + }; } ProcessBlockResponse::LateResponse => { crate::debug!(format!(