Skip to content

Commit 2deb978

Browse files
committed
feat(crawler): add ping support just in case
1 parent db45174 commit 2deb978

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

crawler/src/crawler.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ trait PeerConnection {
9797
peer_count += 1;
9898
}
9999
}
100+
// Respond to pings in case staying responsive improves the odds of receiving more addresses.
101+
NetworkMessage::Ping(nonce) => {
102+
debug!("Received ping during get_peers, responding with pong");
103+
self.send(NetworkMessage::Pong(nonce)).await?
104+
}
100105
_ => {
101106
debug!("Received unexpected message in get_peers: {message:?}, ignoring");
102107
}
@@ -419,37 +424,35 @@ impl CrawlSession {
419424
break;
420425
}
421426

422-
// Periodic status logging.
427+
// Periodic status logging; this can be delayed while a large peer batch is being processed below.
423428
if last_log_time.elapsed() >= log_interval {
424429
info!(
425-
"{} active tasks (max: {}), {} messages queue'd",
430+
"{} active tasks (max: {}), {} unique peers tested",
426431
active_tasks,
427432
self.crawler.max_concurrent_tasks,
428-
peer_discovery_rx.len()
433+
tested_peers.len()
429434
);
430435
last_log_time = Instant::now();
431436
}
432437

433438
// Wait for something to happen
434439
tokio::select! {
435-
// New peers discovered.
440+
// New peers discovered; a single batch can contain up to 1,000 peers.
436441
Some(peers) = peer_discovery_rx.recv() => {
437442
for peer in peers {
438443
// Skip if already tested
439444
if !tested_peers.insert(peer.clone()) {
440445
continue;
441446
}
442447

443-
// Wait if we're at capacity
448+
// Wait if we're at capacity.
444449
while active_tasks >= self.crawler.max_concurrent_tasks {
445-
// Wait for a task to complete
446450
if let Some(result) = task_done_rx.recv().await {
447451
active_tasks -= 1;
448452
debug!("Task completed with result: {result:?}");
449453
}
450454
}
451455

452-
// Spawn a new task
453456
let session = self.clone();
454457
let discovery_tx = peer_discovery_tx.clone();
455458
let done_tx = task_done_tx.clone();
@@ -466,7 +469,7 @@ impl CrawlSession {
466469
active_tasks -= 1;
467470
debug!("Task completed with result: {result:?}");
468471

469-
// Check if we're done: no more tasks running
472+
// Check if we're done: no more tasks running.
470473
if active_tasks == 0 {
471474
info!("Crawler exhausted - all peers processed");
472475
break;

0 commit comments

Comments
 (0)