Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bitcoin::{
io::Read,
key::rand,
p2p::{address::AddrV2, message::CommandString, Magic},
Wtxid,
BlockHash, Wtxid,
};
use socks::create_socks5;
use tokio::{net::TcpStream, time::Instant};
Expand Down Expand Up @@ -41,6 +41,8 @@ const TWO_HOUR: Duration = Duration::from_secs(60 * 60 * 2);
const TCP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(2);
// Ping the peer if we have not exchanged messages for two minutes
const SEND_PING: Duration = Duration::from_secs(60 * 2);
// An absolute maximum timeout to respond to a batch filter request
const MAX_FILTER_RESPONSE_TIME_SEC: Duration = Duration::from_secs(20);

// These are the parameters of the "tried" and "new" tables
const B_TRIED: usize = 64;
Expand Down Expand Up @@ -180,6 +182,7 @@ struct MessageState {
sent_txs: HashSet<Wtxid>,
timed_message_state: HashMap<TimeSensitiveId, Instant>,
ping_state: PingState,
filter_rate: FilterRate,
}

impl MessageState {
Expand All @@ -191,6 +194,7 @@ impl MessageState {
sent_txs: Default::default(),
timed_message_state: Default::default(),
ping_state: PingState::default(),
filter_rate: FilterRate::default(),
}
}

Expand Down Expand Up @@ -329,6 +333,35 @@ impl Default for PingState {
}
}

#[derive(Debug, Clone, Default)]
struct FilterRate {
waiting_for: Option<(BlockHash, Instant)>,
}

impl FilterRate {
fn batch_requested(&mut self, stop_hash: BlockHash) {
self.waiting_for = Some((stop_hash, Instant::now()))
}

fn filter_received(&mut self, block_hash: BlockHash) {
if let Some((hash, _)) = self.waiting_for {
if hash.eq(&block_hash) {
self.waiting_for = None;
}
}
}

fn slow_peer(&self) -> bool {
if let Some((_, then)) = self.waiting_for {
let elapsed = then.elapsed();
if elapsed > MAX_FILTER_RESPONSE_TIME_SEC {
return true;
}
}
false
}
}

pub(crate) struct V1Header {
magic: Magic,
_command: CommandString,
Expand Down Expand Up @@ -440,10 +473,12 @@ impl AddressBook {
mod tests {
use std::time::Duration;

use bitcoin::{consensus::deserialize, Transaction};
use bitcoin::{consensus::deserialize, hashes::Hash, BlockHash, Transaction};

use crate::network::{LastBlockMonitor, MessageState, PingState};

use super::FilterRate;

#[tokio::test(start_paused = true)]
async fn test_version_message_state() {
let timeout = Duration::from_secs(1);
Expand Down Expand Up @@ -536,4 +571,17 @@ mod tests {
last_block.reset();
assert!(!last_block.stale());
}

#[tokio::test(start_paused = true)]
async fn test_filter_rate_stale() {
let mut filter_rate = FilterRate::default();
let block_hash_bytes = [1; 32];
let block_hash = BlockHash::from_byte_array(block_hash_bytes);
filter_rate.batch_requested(block_hash);
assert!(!filter_rate.slow_peer());
tokio::time::sleep(Duration::from_secs(15)).await;
assert!(!filter_rate.slow_peer());
tokio::time::sleep(Duration::from_secs(21)).await;
assert!(filter_rate.slow_peer());
}
}
49 changes: 30 additions & 19 deletions src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
mpsc::{self, Receiver, Sender},
Mutex,
},
time::Instant,
time::{Instant, MissedTickBehavior},
};

use crate::{
Expand All @@ -33,7 +33,7 @@ use super::{
AddressBook, MessageState, PeerId, PeerTimeoutConfig,
};

const LOOP_TIMEOUT: Duration = Duration::from_secs(2);
const LOOP_TIMEOUT: Duration = Duration::from_millis(500);
const V2_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4);

pub(crate) struct Peer {
Expand Down Expand Up @@ -112,6 +112,8 @@ impl Peer {
self.write_bytes(&mut writer, message).await?;
self.message_state.start_version_handshake();
let read_handle = tokio::spawn(async move { peer_reader.read_from_remote().await });
let mut interval = tokio::time::interval(LOOP_TIMEOUT);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
if read_handle.is_finished() {
return Ok(());
Expand All @@ -128,6 +130,10 @@ impl Peer {
self.dialog.send_warning(Warning::PeerTimedOut);
return Ok(());
}
if self.message_state.filter_rate.slow_peer() {
self.dialog.send_warning(Warning::PeerTimedOut);
return Ok(());
}
if Instant::now().duration_since(start_time) > self.timeout_config.max_connection_time {
crate::debug!(format!(
"The connection to peer {} has been maintained for over {} seconds, finding a new peer",
Expand All @@ -137,23 +143,21 @@ impl Peer {
}
select! {
// The peer sent us a message
peer_message = tokio::time::timeout(LOOP_TIMEOUT, rx.recv()) => {
if let Ok(peer_message) = peer_message {
match peer_message {
Some(message) => {
match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await {
Ok(()) => continue,
Err(e) => {
match e {
// We were told by the reader thread to disconnect from this peer
PeerError::DisconnectCommand => return Ok(()),
_ => continue,
}
},
}
},
None => continue,
}
peer_message = rx.recv() => {
match peer_message {
Some(message) => {
match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await {
Ok(()) => continue,
Err(e) => {
match e {
// We were told by the reader thread to disconnect from this peer
PeerError::DisconnectCommand => return Ok(()),
_ => continue,
}
},
}
},
None => continue,
}
}
// The main thread sent us a message
Expand All @@ -174,6 +178,7 @@ impl Peer {
None => continue,
}
}
_ = interval.tick() => continue,
}
}
}
Expand Down Expand Up @@ -228,6 +233,9 @@ impl Peer {
Ok(())
}
ReaderMessage::Filter(filter) => {
self.message_state
.filter_rate
.filter_received(filter.block_hash);
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand Down Expand Up @@ -355,6 +363,9 @@ impl Peer {
self.write_bytes(writer, message).await?;
}
MainThreadMessage::GetFilters(config) => {
self.message_state
.filter_rate
.batch_requested(config.stop_hash);
let message = message_generator.filters(config)?;
self.write_bytes(writer, message).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Node {
PeerMessage::Filter(filter) => {
match self.handle_filter(peer_thread.nonce, filter).await {
Some(response) => {
self.peer_map.broadcast(response).await;
self.peer_map.send_message(peer_thread.nonce, response).await;
}
None => continue,
}
Expand Down
Loading