diff --git a/src/network/mod.rs b/src/network/mod.rs index 47478b46..6c95c446 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -12,7 +12,7 @@ use bitcoin::{ io::Read, key::rand, p2p::{address::AddrV2, message::CommandString, Magic}, - Wtxid, + BlockHash, Wtxid, }; use socks::create_socks5; use tokio::{net::TcpStream, time::Instant}; @@ -41,6 +41,8 @@ const TWO_HOUR: Duration = Duration::from_secs(60 * 60 * 2); const TCP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(2); // Ping the peer if we have not exchanged messages for two minutes const SEND_PING: Duration = Duration::from_secs(60 * 2); +// An absolute maximum timeout to respond to a batch filter request +const MAX_FILTER_RESPONSE_TIME_SEC: Duration = Duration::from_secs(20); // These are the parameters of the "tried" and "new" tables const B_TRIED: usize = 64; @@ -180,6 +182,7 @@ struct MessageState { sent_txs: HashSet, timed_message_state: HashMap, ping_state: PingState, + filter_rate: FilterRate, } impl MessageState { @@ -191,6 +194,7 @@ impl MessageState { sent_txs: Default::default(), timed_message_state: Default::default(), ping_state: PingState::default(), + filter_rate: FilterRate::default(), } } @@ -329,6 +333,35 @@ impl Default for PingState { } } +#[derive(Debug, Clone, Default)] +struct FilterRate { + waiting_for: Option<(BlockHash, Instant)>, +} + +impl FilterRate { + fn batch_requested(&mut self, stop_hash: BlockHash) { + self.waiting_for = Some((stop_hash, Instant::now())) + } + + fn filter_received(&mut self, block_hash: BlockHash) { + if let Some((hash, _)) = self.waiting_for { + if hash.eq(&block_hash) { + self.waiting_for = None; + } + } + } + + fn slow_peer(&self) -> bool { + if let Some((_, then)) = self.waiting_for { + let elapsed = then.elapsed(); + if elapsed > MAX_FILTER_RESPONSE_TIME_SEC { + return true; + } + } + false + } +} + pub(crate) struct V1Header { magic: Magic, _command: CommandString, @@ -440,10 +473,12 @@ impl AddressBook { mod tests { use std::time::Duration; - use bitcoin::{consensus::deserialize, Transaction}; + use bitcoin::{consensus::deserialize, hashes::Hash, BlockHash, Transaction}; use crate::network::{LastBlockMonitor, MessageState, PingState}; + use super::FilterRate; + #[tokio::test(start_paused = true)] async fn test_version_message_state() { let timeout = Duration::from_secs(1); @@ -536,4 +571,17 @@ mod tests { last_block.reset(); assert!(!last_block.stale()); } + + #[tokio::test(start_paused = true)] + async fn test_filter_rate_stale() { + let mut filter_rate = FilterRate::default(); + let block_hash_bytes = [1; 32]; + let block_hash = BlockHash::from_byte_array(block_hash_bytes); + filter_rate.batch_requested(block_hash); + assert!(!filter_rate.slow_peer()); + tokio::time::sleep(Duration::from_secs(15)).await; + assert!(!filter_rate.slow_peer()); + tokio::time::sleep(Duration::from_secs(21)).await; + assert!(filter_rate.slow_peer()); + } } diff --git a/src/network/peer.rs b/src/network/peer.rs index d99dda08..93612e58 100644 --- a/src/network/peer.rs +++ b/src/network/peer.rs @@ -12,7 +12,7 @@ use tokio::{ mpsc::{self, Receiver, Sender}, Mutex, }, - time::Instant, + time::{Instant, MissedTickBehavior}, }; use crate::{ @@ -33,7 +33,7 @@ use super::{ AddressBook, MessageState, PeerId, PeerTimeoutConfig, }; -const LOOP_TIMEOUT: Duration = Duration::from_secs(2); +const LOOP_TIMEOUT: Duration = Duration::from_millis(500); const V2_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4); pub(crate) struct Peer { @@ -112,6 +112,8 @@ impl Peer { self.write_bytes(&mut writer, message).await?; self.message_state.start_version_handshake(); let read_handle = tokio::spawn(async move { peer_reader.read_from_remote().await }); + let mut interval = tokio::time::interval(LOOP_TIMEOUT); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { if read_handle.is_finished() { return Ok(()); @@ -128,6 +130,10 @@ impl Peer { self.dialog.send_warning(Warning::PeerTimedOut); return Ok(()); } + if self.message_state.filter_rate.slow_peer() { + self.dialog.send_warning(Warning::PeerTimedOut); + return Ok(()); + } if Instant::now().duration_since(start_time) > self.timeout_config.max_connection_time { crate::debug!(format!( "The connection to peer {} has been maintained for over {} seconds, finding a new peer", @@ -137,23 +143,21 @@ impl Peer { } select! { // The peer sent us a message - peer_message = tokio::time::timeout(LOOP_TIMEOUT, rx.recv()) => { - if let Ok(peer_message) = peer_message { - match peer_message { - Some(message) => { - match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await { - Ok(()) => continue, - Err(e) => { - match e { - // We were told by the reader thread to disconnect from this peer - PeerError::DisconnectCommand => return Ok(()), - _ => continue, - } - }, - } - }, - None => continue, - } + peer_message = rx.recv() => { + match peer_message { + Some(message) => { + match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await { + Ok(()) => continue, + Err(e) => { + match e { + // We were told by the reader thread to disconnect from this peer + PeerError::DisconnectCommand => return Ok(()), + _ => continue, + } + }, + } + }, + None => continue, } } // The main thread sent us a message @@ -174,6 +178,7 @@ impl Peer { None => continue, } } + _ = interval.tick() => continue, } } } @@ -228,6 +233,9 @@ impl Peer { Ok(()) } ReaderMessage::Filter(filter) => { + self.message_state + .filter_rate + .filter_received(filter.block_hash); self.main_thread_sender .send(PeerThreadMessage { nonce: self.nonce, @@ -355,6 +363,9 @@ impl Peer { self.write_bytes(writer, message).await?; } MainThreadMessage::GetFilters(config) => { + self.message_state + .filter_rate + .batch_requested(config.stop_hash); let message = message_generator.filters(config)?; self.write_bytes(writer, message).await?; } diff --git a/src/node.rs b/src/node.rs index de73d684..b3cf4a8b 100644 --- a/src/node.rs +++ b/src/node.rs @@ -176,7 +176,7 @@ impl Node { PeerMessage::Filter(filter) => { match self.handle_filter(peer_thread.nonce, filter).await { Some(response) => { - self.peer_map.broadcast(response).await; + self.peer_map.send_message(peer_thread.nonce, response).await; } None => continue, }