From 1f66c3a896c8f357733103e28b5fc07bbaf43b5a Mon Sep 17 00:00:00 2001 From: remi Date: Tue, 27 Jan 2026 09:58:51 +0100 Subject: [PATCH 1/7] adjust measurement finished cooldowns, and listener finished prints --- src/orchestrator/task_distributor.rs | 23 +++++++++++++++-------- src/worker/inbound/mod.rs | 18 ++++++++++++++++-- src/worker/measurement.rs | 1 + 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/orchestrator/task_distributor.rs b/src/orchestrator/task_distributor.rs index 7e3fb17a..52a87c57 100644 --- a/src/orchestrator/task_distributor.rs +++ b/src/orchestrator/task_distributor.rs @@ -66,11 +66,10 @@ pub async fn broadcast_distributor(config: TaskDistributorConfig) { probing_rate_interval.tick().await; } + let cooldown = (config.number_of_probing_workers as u64 * config.worker_interval) + 1; // Wait for the workers to finish their tasks - tokio::time::sleep(Duration::from_secs( - (config.number_of_probing_workers as u64 * config.worker_interval) + 1, - )) - .await; + info!("[Orchestrator] Last tasks being sent, awaiting {cooldown}-seconds cooldown."); + tokio::time::sleep(Duration::from_secs(cooldown)).await; info!("[Orchestrator] Task distribution finished"); @@ -177,7 +176,8 @@ pub async fn round_robin_distributor(config: TaskDistributorConfig) { } // end of round-robin loop // Wait for the workers to finish their tasks - tokio::time::sleep(Duration::from_secs(5)).await; + info!("[Orchestrator] All tasks sent, awaiting 1-second cooldown for replies."); + tokio::time::sleep(Duration::from_secs(1)).await; info!("[Orchestrator] Task distribution finished"); @@ -218,7 +218,8 @@ pub async fn round_robin_distributor(config: TaskDistributorConfig) { /// Also checks the worker stacks for follow-up tasks and sends them to the appropriate workers. /// Ends the measurement when all discovery probes have been sent and all stacks are empty. /// -/// Used for --responsive, --latency, and --traceroute measurements. +/// Used for latency and traceroute measurements. +/// Also used when --responsive is set. /// /// # Arguments /// * `config` - TaskDistributorConfig with all necessary parameters. @@ -347,10 +348,16 @@ pub async fn round_robin_discovery( .expect("Failed to send task to TaskDistributor"); } + let cooldown = if is_responsive { + (config.number_of_probing_workers as u64 * config.worker_interval) + 1 + } else { + 5 + }; + // Check if we finished sending all discovery probes and all stacks are empty if hitlist_is_empty { if let Some(start_time) = cooldown_timer { - if start_time.elapsed() >= Duration::from_secs(5) { + if start_time.elapsed() >= Duration::from_secs(cooldown) { info!("[Orchestrator] Task distribution finished."); break; } @@ -361,7 +368,7 @@ pub async fn round_robin_discovery( stacks_guard.values().all(|queue| queue.is_empty()) }; if all_stacks_empty { - info!("[Orchestrator] No more tasks. Waiting 5 seconds for cooldown.",); + info!("[Orchestrator] No more tasks. Waiting {cooldown}-seconds for cooldown.",); cooldown_timer = Some(Instant::now()); } } diff --git a/src/worker/inbound/mod.rs b/src/worker/inbound/mod.rs index e0a67de8..b798cd7c 100644 --- a/src/worker/inbound/mod.rs +++ b/src/worker/inbound/mod.rs @@ -44,6 +44,8 @@ pub struct InboundConfig { pub origin_id: u32, /// Source port used pub sport: u16, + /// Source address used + pub src: String, } /// Listen for incoming packets @@ -119,9 +121,21 @@ pub fn inbound(config: InboundConfig, tx: UnboundedSender, socket: A } } + // config.sport + if config.p_type == ProtocolType::Icmp { + info!( + "[Worker inbound] Stopped ICMP ping listener {} (received {} packets)", + config.src, + received.with_separator(), + ) + } + info!( - "[Worker inbound] Stopped pnet listener (received {} packets)", - received.with_separator() + "[Worker inbound] Stopped {} listener {}:{} (received {} packets)", + config.p_type, + config.src, + config.sport, + received.with_separator(), ); }) .expect("Failed to spawn listener_thread"); diff --git a/src/worker/measurement.rs b/src/worker/measurement.rs index fe587c80..dbde1e60 100644 --- a/src/worker/measurement.rs +++ b/src/worker/measurement.rs @@ -61,6 +61,7 @@ impl Worker { is_record: start.is_record, origin_id: rx_origin.origin_id, sport: rx_origin.sport as u16, + src: rx_origin.src.unwrap().to_string(), }, inbound_tx.clone(), socket.clone(), From 03df9573e1203ea89f2bee88bdc8e70e8d8f4ddd Mon Sep 17 00:00:00 2001 From: remi Date: Tue, 27 Jan 2026 11:08:50 +0100 Subject: [PATCH 2/7] debug discovery --- src/orchestrator/service.rs | 2 ++ src/worker/outbound/mod.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/src/orchestrator/service.rs b/src/orchestrator/service.rs index 58663715..6c3d46ca 100644 --- a/src/orchestrator/service.rs +++ b/src/orchestrator/service.rs @@ -388,6 +388,7 @@ impl Controller for ControllerService { m_type, MeasurementType::AnycastLatency | MeasurementType::AnycastTraceroute ); + println!("send_discovery {send_discovery}, is_responsive {is_responsive}"); // Distribute tasks round-robin if true let is_round_robing = send_discovery || (m_def.m_type() == MeasurementType::Catchment); @@ -403,6 +404,7 @@ impl Controller for ControllerService { // Convert hitlist targets into the appropriate TaskType let tasks = if send_discovery { // Send Discovery tasks + println!("sending discovery tasks"); hitlist .iter() .map(|addr| Task { diff --git a/src/worker/outbound/mod.rs b/src/worker/outbound/mod.rs index d54b9f82..fd146ffe 100644 --- a/src/worker/outbound/mod.rs +++ b/src/worker/outbound/mod.rs @@ -117,6 +117,7 @@ pub fn outbound( failed += f; } Some(TaskType::Discovery(task)) => { + println!("sending discovery task"); let (s, f) = send_probe( &config, &task.dst.unwrap(), From dbbfe13b4874cfed3ed3494bec87bb608d9d5c4a Mon Sep 17 00:00:00 2001 From: remi Date: Tue, 27 Jan 2026 11:17:59 +0100 Subject: [PATCH 3/7] remove prints --- src/orchestrator/service.rs | 2 -- src/worker/outbound/mod.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/src/orchestrator/service.rs b/src/orchestrator/service.rs index 6c3d46ca..58663715 100644 --- a/src/orchestrator/service.rs +++ b/src/orchestrator/service.rs @@ -388,7 +388,6 @@ impl Controller for ControllerService { m_type, MeasurementType::AnycastLatency | MeasurementType::AnycastTraceroute ); - println!("send_discovery {send_discovery}, is_responsive {is_responsive}"); // Distribute tasks round-robin if true let is_round_robing = send_discovery || (m_def.m_type() == MeasurementType::Catchment); @@ -404,7 +403,6 @@ impl Controller for ControllerService { // Convert hitlist targets into the appropriate TaskType let tasks = if send_discovery { // Send Discovery tasks - println!("sending discovery tasks"); hitlist .iter() .map(|addr| Task { diff --git a/src/worker/outbound/mod.rs b/src/worker/outbound/mod.rs index fd146ffe..d54b9f82 100644 --- a/src/worker/outbound/mod.rs +++ b/src/worker/outbound/mod.rs @@ -117,7 +117,6 @@ pub fn outbound( failed += f; } Some(TaskType::Discovery(task)) => { - println!("sending discovery task"); let (s, f) = send_probe( &config, &task.dst.unwrap(), From 4291eeffc7b744a54d69b69b7ac8e3e787d0bdea Mon Sep 17 00:00:00 2001 From: remi Date: Tue, 27 Jan 2026 11:21:15 +0100 Subject: [PATCH 4/7] bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1a2e51f6..985127a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "manycastr" -version = "1.4.0" +version = "1.4." edition = "2021" [dependencies] From 63e37affaf382e0b4e85ded65696550738ca6525 Mon Sep 17 00:00:00 2001 From: remi Date: Tue, 27 Jan 2026 11:21:50 +0100 Subject: [PATCH 5/7] bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 985127a1..6454fd6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "manycastr" -version = "1.4." +version = "1.4.1" edition = "2021" [dependencies] From 261a53cd0c83859324ea0a9cb3d5db7866cda366 Mon Sep 17 00:00:00 2001 From: remi Date: Tue, 27 Jan 2026 11:42:35 +0100 Subject: [PATCH 6/7] allow for exiting inbound threads and fix print grammar --- src/orchestrator/task_distributor.rs | 6 +++--- src/worker/inbound/mod.rs | 32 +++++++++++----------------- src/worker/measurement.rs | 2 +- 3 files changed, 17 insertions(+), 23 deletions(-) diff --git a/src/orchestrator/task_distributor.rs b/src/orchestrator/task_distributor.rs index 52a87c57..82dee2a2 100644 --- a/src/orchestrator/task_distributor.rs +++ b/src/orchestrator/task_distributor.rs @@ -68,7 +68,7 @@ pub async fn broadcast_distributor(config: TaskDistributorConfig) { let cooldown = (config.number_of_probing_workers as u64 * config.worker_interval) + 1; // Wait for the workers to finish their tasks - info!("[Orchestrator] Last tasks being sent, awaiting {cooldown}-seconds cooldown."); + info!("[Orchestrator] Last tasks being sent, awaiting a {cooldown}-second cooldown."); tokio::time::sleep(Duration::from_secs(cooldown)).await; info!("[Orchestrator] Task distribution finished"); @@ -176,7 +176,7 @@ pub async fn round_robin_distributor(config: TaskDistributorConfig) { } // end of round-robin loop // Wait for the workers to finish their tasks - info!("[Orchestrator] All tasks sent, awaiting 1-second cooldown for replies."); + info!("[Orchestrator] All tasks sent, awaiting a 1-second cooldown for replies."); tokio::time::sleep(Duration::from_secs(1)).await; info!("[Orchestrator] Task distribution finished"); @@ -368,7 +368,7 @@ pub async fn round_robin_discovery( stacks_guard.values().all(|queue| queue.is_empty()) }; if all_stacks_empty { - info!("[Orchestrator] No more tasks. Waiting {cooldown}-seconds for cooldown.",); + info!("[Orchestrator] No more tasks. Awaiting a {cooldown}-second cooldown.",); cooldown_timer = Some(Instant::now()); } } diff --git a/src/worker/inbound/mod.rs b/src/worker/inbound/mod.rs index b798cd7c..13fb8c14 100644 --- a/src/worker/inbound/mod.rs +++ b/src/worker/inbound/mod.rs @@ -74,15 +74,14 @@ pub fn inbound(config: InboundConfig, tx: UnboundedSender, socket: A // Listen for incoming packets let mut received: u32 = 0; loop { - // Check if we should exit - if rx_f_c.load(Ordering::Relaxed) { - break; - } let (packet, ttl, src) = match get_packet(&socket) { Ok(result) => result, Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // Wait 100ms to check again - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(1)); // TODO improve this using nonblocking and using a read timeout + // Check if we should exit + if rx_f_c.load(Ordering::Relaxed) { + break; + } continue; } Err(e) => panic!("Socket error: {}", e), @@ -121,22 +120,21 @@ pub fn inbound(config: InboundConfig, tx: UnboundedSender, socket: A } } - // config.sport if config.p_type == ProtocolType::Icmp { info!( "[Worker inbound] Stopped ICMP ping listener {} (received {} packets)", config.src, received.with_separator(), ) + } else { + info!( + "[Worker inbound] Stopped {} listener {}:{} (received {} packets)", + config.p_type, + config.src, + config.sport, + received.with_separator(), + ); } - - info!( - "[Worker inbound] Stopped {} listener {}:{} (received {} packets)", - config.p_type, - config.src, - config.sport, - received.with_separator(), - ); }) .expect("Failed to spawn listener_thread"); @@ -193,10 +191,6 @@ fn get_packet(socket: &Socket) -> Result<(&[u8], u32, SocketAddr), std::io::Erro return Ok((packet_data, hop_limit, source)); } Err(e) => { - if e.kind() == std::io::ErrorKind::WouldBlock { - sleep(Duration::from_millis(1)); // TODO improve this using nonblocking and using a read timeout - continue; - } return Err(e); } } diff --git a/src/worker/measurement.rs b/src/worker/measurement.rs index dbde1e60..943c4634 100644 --- a/src/worker/measurement.rs +++ b/src/worker/measurement.rs @@ -61,7 +61,7 @@ impl Worker { is_record: start.is_record, origin_id: rx_origin.origin_id, sport: rx_origin.sport as u16, - src: rx_origin.src.unwrap().to_string(), + src: rx_origin.src.expect("no src").to_string(), }, inbound_tx.clone(), socket.clone(), From ec60e41a28d245ca46c2a01fd496ab888ff96b9e Mon Sep 17 00:00:00 2001 From: remi Date: Tue, 27 Jan 2026 11:43:37 +0100 Subject: [PATCH 7/7] clippy and fmt --- src/orchestrator/task_distributor.rs | 4 +- src/worker/inbound/mod.rs | 60 +++++++++++++--------------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/src/orchestrator/task_distributor.rs b/src/orchestrator/task_distributor.rs index 82dee2a2..8616873d 100644 --- a/src/orchestrator/task_distributor.rs +++ b/src/orchestrator/task_distributor.rs @@ -368,7 +368,9 @@ pub async fn round_robin_discovery( stacks_guard.values().all(|queue| queue.is_empty()) }; if all_stacks_empty { - info!("[Orchestrator] No more tasks. Awaiting a {cooldown}-second cooldown.",); + info!( + "[Orchestrator] No more tasks. Awaiting a {cooldown}-second cooldown.", + ); cooldown_timer = Some(Instant::now()); } } diff --git a/src/worker/inbound/mod.rs b/src/worker/inbound/mod.rs index 13fb8c14..fc8377c9 100644 --- a/src/worker/inbound/mod.rs +++ b/src/worker/inbound/mod.rs @@ -157,43 +157,39 @@ fn get_packet(socket: &Socket) -> Result<(&[u8], u32, SocketAddr), std::io::Erro let mut control_storage = ControlBuffer([MaybeUninit::uninit(); 128]); let control_buf_bytes = &mut control_storage.0; - loop { - let recv_result = { - let mut iov_buf = [MaybeUninitSlice::new(&mut buf)]; - let mut msg = MsgHdrMut::new() - .with_addr(&mut source_storage) - .with_buffers(&mut iov_buf) - .with_control(control_buf_bytes); + let recv_result = { + let mut iov_buf = [MaybeUninitSlice::new(&mut buf)]; + let mut msg = MsgHdrMut::new() + .with_addr(&mut source_storage) + .with_buffers(&mut iov_buf) + .with_control(control_buf_bytes); - socket.recvmsg(&mut msg, 0).map(|n| (n, msg.control_len())) - }; + socket.recvmsg(&mut msg, 0).map(|n| (n, msg.control_len())) + }; - match recv_result { - Ok((bytes_read, control_len)) => { - let source = source_storage - .as_socket() - .ok_or_else(|| std::io::Error::other("invalid source address"))?; + match recv_result { + Ok((bytes_read, control_len)) => { + let source = source_storage + .as_socket() + .ok_or_else(|| std::io::Error::other("invalid source address"))?; - let (packet_data, ancillary_data) = unsafe { - let p = std::slice::from_raw_parts(buf.as_ptr() as *const u8, bytes_read); - let c = std::slice::from_raw_parts( - control_storage.0.as_ptr() as *const u8, - control_len, - ); - (p, c) - }; + let (packet_data, ancillary_data) = unsafe { + let p = std::slice::from_raw_parts(buf.as_ptr() as *const u8, bytes_read); + let c = std::slice::from_raw_parts( + control_storage.0.as_ptr() as *const u8, + control_len, + ); + (p, c) + }; - let hop_limit = if source.is_ipv6() { - parse_hop_limit(ancillary_data).unwrap_or(0) - } else { - packet_data[8] as u32 - }; - return Ok((packet_data, hop_limit, source)); - } - Err(e) => { - return Err(e); - } + let hop_limit = if source.is_ipv6() { + parse_hop_limit(ancillary_data).unwrap_or(0) + } else { + packet_data[8] as u32 + }; + Ok((packet_data, hop_limit, source)) } + Err(e) => Err(e), } }