Skip to content

Commit 4680308

Browse files
authored
Merge pull request #506 from rustaceanrob/9-22-filter-rate
net: Add timeout rates for filter messages
2 parents c959b5f + 39e0252 commit 4680308

File tree

3 files changed

+81
-22
lines changed

3 files changed

+81
-22
lines changed

src/network/mod.rs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use bitcoin::{
1212
io::Read,
1313
key::rand,
1414
p2p::{address::AddrV2, message::CommandString, Magic},
15-
Wtxid,
15+
BlockHash, Wtxid,
1616
};
1717
use socks::create_socks5;
1818
use tokio::{net::TcpStream, time::Instant};
@@ -41,6 +41,8 @@ const TWO_HOUR: Duration = Duration::from_secs(60 * 60 * 2);
4141
const TCP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(2);
4242
// Ping the peer if we have not exchanged messages for two minutes
4343
const SEND_PING: Duration = Duration::from_secs(60 * 2);
44+
// An absolute maximum timeout to respond to a batch filter request
45+
const MAX_FILTER_RESPONSE_TIME_SEC: Duration = Duration::from_secs(20);
4446

4547
// These are the parameters of the "tried" and "new" tables
4648
const B_TRIED: usize = 64;
@@ -180,6 +182,7 @@ struct MessageState {
180182
sent_txs: HashSet<Wtxid>,
181183
timed_message_state: HashMap<TimeSensitiveId, Instant>,
182184
ping_state: PingState,
185+
filter_rate: FilterRate,
183186
}
184187

185188
impl MessageState {
@@ -191,6 +194,7 @@ impl MessageState {
191194
sent_txs: Default::default(),
192195
timed_message_state: Default::default(),
193196
ping_state: PingState::default(),
197+
filter_rate: FilterRate::default(),
194198
}
195199
}
196200

@@ -329,6 +333,35 @@ impl Default for PingState {
329333
}
330334
}
331335

336+
#[derive(Debug, Clone, Default)]
337+
struct FilterRate {
338+
waiting_for: Option<(BlockHash, Instant)>,
339+
}
340+
341+
impl FilterRate {
342+
fn batch_requested(&mut self, stop_hash: BlockHash) {
343+
self.waiting_for = Some((stop_hash, Instant::now()))
344+
}
345+
346+
fn filter_received(&mut self, block_hash: BlockHash) {
347+
if let Some((hash, _)) = self.waiting_for {
348+
if hash.eq(&block_hash) {
349+
self.waiting_for = None;
350+
}
351+
}
352+
}
353+
354+
fn slow_peer(&self) -> bool {
355+
if let Some((_, then)) = self.waiting_for {
356+
let elapsed = then.elapsed();
357+
if elapsed > MAX_FILTER_RESPONSE_TIME_SEC {
358+
return true;
359+
}
360+
}
361+
false
362+
}
363+
}
364+
332365
pub(crate) struct V1Header {
333366
magic: Magic,
334367
_command: CommandString,
@@ -440,10 +473,12 @@ impl AddressBook {
440473
mod tests {
441474
use std::time::Duration;
442475

443-
use bitcoin::{consensus::deserialize, Transaction};
476+
use bitcoin::{consensus::deserialize, hashes::Hash, BlockHash, Transaction};
444477

445478
use crate::network::{LastBlockMonitor, MessageState, PingState};
446479

480+
use super::FilterRate;
481+
447482
#[tokio::test(start_paused = true)]
448483
async fn test_version_message_state() {
449484
let timeout = Duration::from_secs(1);
@@ -536,4 +571,17 @@ mod tests {
536571
last_block.reset();
537572
assert!(!last_block.stale());
538573
}
574+
575+
#[tokio::test(start_paused = true)]
576+
async fn test_filter_rate_stale() {
577+
let mut filter_rate = FilterRate::default();
578+
let block_hash_bytes = [1; 32];
579+
let block_hash = BlockHash::from_byte_array(block_hash_bytes);
580+
filter_rate.batch_requested(block_hash);
581+
assert!(!filter_rate.slow_peer());
582+
tokio::time::sleep(Duration::from_secs(15)).await;
583+
assert!(!filter_rate.slow_peer());
584+
tokio::time::sleep(Duration::from_secs(21)).await;
585+
assert!(filter_rate.slow_peer());
586+
}
539587
}

src/network/peer.rs

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use tokio::{
1212
mpsc::{self, Receiver, Sender},
1313
Mutex,
1414
},
15-
time::Instant,
15+
time::{Instant, MissedTickBehavior},
1616
};
1717

1818
use crate::{
@@ -33,7 +33,7 @@ use super::{
3333
AddressBook, MessageState, PeerId, PeerTimeoutConfig,
3434
};
3535

36-
const LOOP_TIMEOUT: Duration = Duration::from_secs(2);
36+
const LOOP_TIMEOUT: Duration = Duration::from_millis(500);
3737
const V2_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4);
3838

3939
pub(crate) struct Peer {
@@ -112,6 +112,8 @@ impl Peer {
112112
self.write_bytes(&mut writer, message).await?;
113113
self.message_state.start_version_handshake();
114114
let read_handle = tokio::spawn(async move { peer_reader.read_from_remote().await });
115+
let mut interval = tokio::time::interval(LOOP_TIMEOUT);
116+
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
115117
loop {
116118
if read_handle.is_finished() {
117119
return Ok(());
@@ -128,6 +130,10 @@ impl Peer {
128130
self.dialog.send_warning(Warning::PeerTimedOut);
129131
return Ok(());
130132
}
133+
if self.message_state.filter_rate.slow_peer() {
134+
self.dialog.send_warning(Warning::PeerTimedOut);
135+
return Ok(());
136+
}
131137
if Instant::now().duration_since(start_time) > self.timeout_config.max_connection_time {
132138
crate::debug!(format!(
133139
"The connection to peer {} has been maintained for over {} seconds, finding a new peer",
@@ -137,23 +143,21 @@ impl Peer {
137143
}
138144
select! {
139145
// The peer sent us a message
140-
peer_message = tokio::time::timeout(LOOP_TIMEOUT, rx.recv()) => {
141-
if let Ok(peer_message) = peer_message {
142-
match peer_message {
143-
Some(message) => {
144-
match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await {
145-
Ok(()) => continue,
146-
Err(e) => {
147-
match e {
148-
// We were told by the reader thread to disconnect from this peer
149-
PeerError::DisconnectCommand => return Ok(()),
150-
_ => continue,
151-
}
152-
},
153-
}
154-
},
155-
None => continue,
156-
}
146+
peer_message = rx.recv() => {
147+
match peer_message {
148+
Some(message) => {
149+
match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await {
150+
Ok(()) => continue,
151+
Err(e) => {
152+
match e {
153+
// We were told by the reader thread to disconnect from this peer
154+
PeerError::DisconnectCommand => return Ok(()),
155+
_ => continue,
156+
}
157+
},
158+
}
159+
},
160+
None => continue,
157161
}
158162
}
159163
// The main thread sent us a message
@@ -174,6 +178,7 @@ impl Peer {
174178
None => continue,
175179
}
176180
}
181+
_ = interval.tick() => continue,
177182
}
178183
}
179184
}
@@ -228,6 +233,9 @@ impl Peer {
228233
Ok(())
229234
}
230235
ReaderMessage::Filter(filter) => {
236+
self.message_state
237+
.filter_rate
238+
.filter_received(filter.block_hash);
231239
self.main_thread_sender
232240
.send(PeerThreadMessage {
233241
nonce: self.nonce,
@@ -355,6 +363,9 @@ impl Peer {
355363
self.write_bytes(writer, message).await?;
356364
}
357365
MainThreadMessage::GetFilters(config) => {
366+
self.message_state
367+
.filter_rate
368+
.batch_requested(config.stop_hash);
358369
let message = message_generator.filters(config)?;
359370
self.write_bytes(writer, message).await?;
360371
}

src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl Node {
176176
PeerMessage::Filter(filter) => {
177177
match self.handle_filter(peer_thread.nonce, filter).await {
178178
Some(response) => {
179-
self.peer_map.broadcast(response).await;
179+
self.peer_map.send_message(peer_thread.nonce, response).await;
180180
}
181181
None => continue,
182182
}

0 commit comments

Comments
 (0)