Skip to content

Commit 3016069

Browse files
committed
fix(crawler): iron out deadlock scenarios
1 parent a329d3b commit 3016069

1 file changed

Lines changed: 61 additions & 23 deletions

File tree

crawler/src/crawler.rs

Lines changed: 61 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use bitcoin_peers_connection::{
77
};
88
use log::{debug, info};
99
use std::net::IpAddr;
10+
use std::sync::Arc;
1011
use std::time::{Duration, Instant};
1112
use std::{collections::HashSet, fmt};
1213
use tokio::sync::mpsc::{self, Receiver};
@@ -27,15 +28,17 @@ trait PeerConnection {
2728
///
2829
/// * `peer_tx` - Channel to send discovered peers through.
2930
/// * `peer_timeout` - Maximum duration to wait for responses.
31+
/// * `tested_peers` - Shared set of already tested peers for filtering.
3032
///
3133
/// # Returns
3234
///
3335
/// * `Ok(usize)` - The number of peer addresses sent through the channel.
3436
/// * `Err(ConnectionError)` - If an error occurs during the exchange.
3537
async fn get_peers(
3638
&mut self,
37-
peer_tx: &mpsc::Sender<Vec<Peer>>,
39+
peer_tx: &mpsc::UnboundedSender<Vec<Peer>>,
3840
peer_timeout: Duration,
41+
tested_peers: &Arc<tokio::sync::RwLock<HashSet<Peer>>>,
3942
) -> Result<usize, ConnectionError> {
4043
self.send(NetworkMessage::GetAddr).await?;
4144
debug!("Sent getaddr message to peer");
@@ -107,10 +110,24 @@ trait PeerConnection {
107110
}
108111
}
109112

110-
// Send the batch if we collected any peers
111-
if !peers_batch.is_empty() && peer_tx.send(peers_batch).await.is_err() {
112-
// Receiver dropped, stop processing
113-
break;
113+
// Send the batch if we collected any peers, but filter first
114+
if !peers_batch.is_empty() {
115+
// Filter out peers that have already been tested (memory optimization)
116+
let mut filtered_peers = Vec::new();
117+
{
118+
let tested = tested_peers.read().await;
119+
for peer in peers_batch {
120+
if !tested.contains(&peer) {
121+
filtered_peers.push(peer);
122+
}
123+
}
124+
}
125+
126+
// Only send peers that haven't been tested yet
127+
if !filtered_peers.is_empty() && peer_tx.send(filtered_peers).is_err() {
128+
// Receiver dropped, stop processing
129+
break;
130+
}
114131
}
115132
}
116133

@@ -243,6 +260,7 @@ impl Crawler {
243260
let session = CrawlSession {
244261
crawler: self.clone(),
245262
crawl_tx,
263+
tested_peers: Arc::new(tokio::sync::RwLock::new(HashSet::new())),
246264
};
247265

248266
tokio::spawn(async move {
@@ -290,6 +308,8 @@ struct CrawlSession {
290308
crawler: Crawler,
291309
/// Channel for sending discovery results back to the caller.
292310
crawl_tx: mpsc::Sender<CrawlerMessage>,
311+
/// Shared set of peers that have already been tested, used for deduplication.
312+
tested_peers: Arc<tokio::sync::RwLock<HashSet<Peer>>>,
293313
}
294314

295315
impl CrawlSession {
@@ -320,7 +340,11 @@ impl CrawlSession {
320340
///
321341
/// * `peer` - The peer to test and potentially discover addresses from.
322342
/// * `peer_discovery_tx` - Channel to send discovered peers through.
323-
async fn process(&self, peer: Peer, peer_discovery_tx: mpsc::Sender<Vec<Peer>>) -> TaskResult {
343+
async fn process(
344+
&self,
345+
peer: Peer,
346+
peer_discovery_tx: mpsc::UnboundedSender<Vec<Peer>>,
347+
) -> TaskResult {
324348
debug!("Processing peer {peer:?}");
325349

326350
let mut conn = match timeout(
@@ -369,7 +393,11 @@ impl CrawlSession {
369393

370394
// Request peers and send them directly through the discovery channel
371395
match conn
372-
.get_peers(&peer_discovery_tx, self.crawler.peer_timeout)
396+
.get_peers(
397+
&peer_discovery_tx,
398+
self.crawler.peer_timeout,
399+
&self.tested_peers,
400+
)
373401
.await
374402
{
375403
Ok(peer_count) => {
@@ -398,19 +426,18 @@ impl CrawlSession {
398426
/// 2. **Channel Closure** - Receiver dropped, indicating caller no longer interested.
399427
async fn coordinate(&self, seed: Peer) {
400428
// Channel to track discovered peers to process.
401-
let (peer_discovery_tx, mut peer_discovery_rx) = mpsc::channel(1000);
429+
// Use unbounded channel to prevent tasks from blocking on peer discovery
430+
let (peer_discovery_tx, mut peer_discovery_rx) = mpsc::unbounded_channel();
402431
// Channel to track task completion.
403432
let (task_done_tx, mut task_done_rx) =
404433
mpsc::channel::<TaskResult>(self.crawler.max_concurrent_tasks);
405434

406435
// Prime the pump with the seed peer.
407-
if peer_discovery_tx.send(vec![seed]).await.is_err() {
436+
if peer_discovery_tx.send(vec![seed]).is_err() {
408437
debug!("Failed to send seed peer");
409438
return;
410439
}
411440

412-
// Track which peers we have asked.
413-
let mut tested_peers = HashSet::new();
414441
// Number of in-flight tasks.
415442
let mut active_tasks = 0;
416443

@@ -426,11 +453,10 @@ impl CrawlSession {
426453

427454
// Periodic status logging, but can be crowded out by large peer batches.
428455
if last_log_time.elapsed() >= log_interval {
456+
let tested_count = self.tested_peers.read().await.len();
429457
info!(
430458
"{} active tasks (max: {}), {} unique peers tested",
431-
active_tasks,
432-
self.crawler.max_concurrent_tasks,
433-
tested_peers.len()
459+
active_tasks, self.crawler.max_concurrent_tasks, tested_count
434460
);
435461
last_log_time = Instant::now();
436462
}
@@ -441,7 +467,7 @@ impl CrawlSession {
441467
Some(peers) = peer_discovery_rx.recv() => {
442468
for peer in peers {
443469
// Skip if already tested
444-
if !tested_peers.insert(peer.clone()) {
470+
if !self.tested_peers.write().await.insert(peer.clone()) {
445471
continue;
446472
}
447473

@@ -599,8 +625,11 @@ mod tests {
599625
),
600626
]);
601627

602-
let (tx, mut rx) = mpsc::channel(10);
603-
let result = mock_conn.get_peers(&tx, Duration::from_millis(100)).await;
628+
let (tx, mut rx) = mpsc::unbounded_channel();
629+
let tested_peers = Arc::new(tokio::sync::RwLock::new(HashSet::new()));
630+
let result = mock_conn
631+
.get_peers(&tx, Duration::from_millis(100), &tested_peers)
632+
.await;
604633

605634
assert!(result.is_ok());
606635
let peer_count = result.unwrap();
@@ -632,8 +661,11 @@ mod tests {
632661
let mut mock_conn = MockPeerConnection::new();
633662
// Don't add any messages - should timeout
634663

635-
let (tx, mut rx) = mpsc::channel(10);
636-
let result = mock_conn.get_peers(&tx, Duration::from_millis(50)).await;
664+
let (tx, mut rx) = mpsc::unbounded_channel();
665+
let tested_peers = Arc::new(tokio::sync::RwLock::new(HashSet::new()));
666+
let result = mock_conn
667+
.get_peers(&tx, Duration::from_millis(50), &tested_peers)
668+
.await;
637669

638670
assert!(result.is_ok());
639671
let peer_count = result.unwrap();
@@ -662,8 +694,11 @@ mod tests {
662694
"Connection lost",
663695
)));
664696

665-
let (tx, _rx) = mpsc::channel(10);
666-
let result = mock_conn.get_peers(&tx, Duration::from_millis(100)).await;
697+
let (tx, _rx) = mpsc::unbounded_channel();
698+
let tested_peers = Arc::new(tokio::sync::RwLock::new(HashSet::new()));
699+
let result = mock_conn
700+
.get_peers(&tx, Duration::from_millis(100), &tested_peers)
701+
.await;
667702

668703
// Should return the error
669704
assert!(result.is_err());
@@ -738,8 +773,11 @@ mod tests {
738773
), // Self-reference
739774
]);
740775

741-
let (tx, mut rx) = mpsc::channel(10);
742-
let result = mock_conn.get_peers(&tx, Duration::from_millis(100)).await;
776+
let (tx, mut rx) = mpsc::unbounded_channel();
777+
let tested_peers = Arc::new(tokio::sync::RwLock::new(HashSet::new()));
778+
let result = mock_conn
779+
.get_peers(&tx, Duration::from_millis(100), &tested_peers)
780+
.await;
743781

744782
assert!(result.is_ok());
745783
let peer_count = result.unwrap();

0 commit comments

Comments
 (0)