diff --git a/Cargo.toml b/Cargo.toml index 1a2e51f6..6454fd6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "manycastr" -version = "1.4.0" +version = "1.4.1" edition = "2021" [dependencies] diff --git a/src/orchestrator/task_distributor.rs b/src/orchestrator/task_distributor.rs index 7e3fb17a..8616873d 100644 --- a/src/orchestrator/task_distributor.rs +++ b/src/orchestrator/task_distributor.rs @@ -66,11 +66,10 @@ pub async fn broadcast_distributor(config: TaskDistributorConfig) { probing_rate_interval.tick().await; } + let cooldown = (config.number_of_probing_workers as u64 * config.worker_interval) + 1; // Wait for the workers to finish their tasks - tokio::time::sleep(Duration::from_secs( - (config.number_of_probing_workers as u64 * config.worker_interval) + 1, - )) - .await; + info!("[Orchestrator] Last tasks being sent, awaiting a {cooldown}-second cooldown."); + tokio::time::sleep(Duration::from_secs(cooldown)).await; info!("[Orchestrator] Task distribution finished"); @@ -177,7 +176,8 @@ pub async fn round_robin_distributor(config: TaskDistributorConfig) { } // end of round-robin loop // Wait for the workers to finish their tasks - tokio::time::sleep(Duration::from_secs(5)).await; + info!("[Orchestrator] All tasks sent, awaiting a 1-second cooldown for replies."); + tokio::time::sleep(Duration::from_secs(1)).await; info!("[Orchestrator] Task distribution finished"); @@ -218,7 +218,8 @@ pub async fn round_robin_distributor(config: TaskDistributorConfig) { /// Also checks the worker stacks for follow-up tasks and sends them to the appropriate workers. /// Ends the measurement when all discovery probes have been sent and all stacks are empty. /// -/// Used for --responsive, --latency, and --traceroute measurements. +/// Used for latency and traceroute measurements. +/// Also used when --responsive is set. /// /// # Arguments /// * `config` - TaskDistributorConfig with all necessary parameters. @@ -347,10 +348,16 @@ pub async fn round_robin_discovery( .expect("Failed to send task to TaskDistributor"); } + let cooldown = if is_responsive { + (config.number_of_probing_workers as u64 * config.worker_interval) + 1 + } else { + 5 + }; + // Check if we finished sending all discovery probes and all stacks are empty if hitlist_is_empty { if let Some(start_time) = cooldown_timer { - if start_time.elapsed() >= Duration::from_secs(5) { + if start_time.elapsed() >= Duration::from_secs(cooldown) { info!("[Orchestrator] Task distribution finished."); break; } @@ -361,7 +368,9 @@ pub async fn round_robin_discovery( stacks_guard.values().all(|queue| queue.is_empty()) }; if all_stacks_empty { - info!("[Orchestrator] No more tasks. Waiting 5 seconds for cooldown.",); + info!( + "[Orchestrator] No more tasks. Awaiting a {cooldown}-second cooldown.", + ); cooldown_timer = Some(Instant::now()); } } diff --git a/src/worker/inbound/mod.rs b/src/worker/inbound/mod.rs index e0a67de8..fc8377c9 100644 --- a/src/worker/inbound/mod.rs +++ b/src/worker/inbound/mod.rs @@ -44,6 +44,8 @@ pub struct InboundConfig { pub origin_id: u32, /// Source port used pub sport: u16, + /// Source address used + pub src: String, } /// Listen for incoming packets @@ -72,15 +74,14 @@ pub fn inbound(config: InboundConfig, tx: UnboundedSender, socket: A // Listen for incoming packets let mut received: u32 = 0; loop { - // Check if we should exit - if rx_f_c.load(Ordering::Relaxed) { - break; - } let (packet, ttl, src) = match get_packet(&socket) { Ok(result) => result, Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // Wait 100ms to check again - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(1)); // TODO improve this using nonblocking and using a read timeout + // Check if we should exit + if rx_f_c.load(Ordering::Relaxed) { + break; + } continue; } Err(e) => panic!("Socket error: {}", e), @@ -119,10 +120,21 @@ pub fn inbound(config: InboundConfig, tx: UnboundedSender, socket: A } } - info!( - "[Worker inbound] Stopped pnet listener (received {} packets)", - received.with_separator() - ); + if config.p_type == ProtocolType::Icmp { + info!( + "[Worker inbound] Stopped ICMP ping listener {} (received {} packets)", + config.src, + received.with_separator(), + ) + } else { + info!( + "[Worker inbound] Stopped {} listener {}:{} (received {} packets)", + config.p_type, + config.src, + config.sport, + received.with_separator(), + ); + } }) .expect("Failed to spawn listener_thread"); @@ -145,47 +157,39 @@ fn get_packet(socket: &Socket) -> Result<(&[u8], u32, SocketAddr), std::io::Erro let mut control_storage = ControlBuffer([MaybeUninit::uninit(); 128]); let control_buf_bytes = &mut control_storage.0; - loop { - let recv_result = { - let mut iov_buf = [MaybeUninitSlice::new(&mut buf)]; - let mut msg = MsgHdrMut::new() - .with_addr(&mut source_storage) - .with_buffers(&mut iov_buf) - .with_control(control_buf_bytes); + let recv_result = { + let mut iov_buf = [MaybeUninitSlice::new(&mut buf)]; + let mut msg = MsgHdrMut::new() + .with_addr(&mut source_storage) + .with_buffers(&mut iov_buf) + .with_control(control_buf_bytes); - socket.recvmsg(&mut msg, 0).map(|n| (n, msg.control_len())) - }; + socket.recvmsg(&mut msg, 0).map(|n| (n, msg.control_len())) + }; - match recv_result { - Ok((bytes_read, control_len)) => { - let source = source_storage - .as_socket() - .ok_or_else(|| std::io::Error::other("invalid source address"))?; + match recv_result { + Ok((bytes_read, control_len)) => { + let source = source_storage + .as_socket() + .ok_or_else(|| std::io::Error::other("invalid source address"))?; - let (packet_data, ancillary_data) = unsafe { - let p = std::slice::from_raw_parts(buf.as_ptr() as *const u8, bytes_read); - let c = std::slice::from_raw_parts( - control_storage.0.as_ptr() as *const u8, - control_len, - ); - (p, c) - }; + let (packet_data, ancillary_data) = unsafe { + let p = std::slice::from_raw_parts(buf.as_ptr() as *const u8, bytes_read); + let c = std::slice::from_raw_parts( + control_storage.0.as_ptr() as *const u8, + control_len, + ); + (p, c) + }; - let hop_limit = if source.is_ipv6() { - parse_hop_limit(ancillary_data).unwrap_or(0) - } else { - packet_data[8] as u32 - }; - return Ok((packet_data, hop_limit, source)); - } - Err(e) => { - if e.kind() == std::io::ErrorKind::WouldBlock { - sleep(Duration::from_millis(1)); // TODO improve this using nonblocking and using a read timeout - continue; - } - return Err(e); - } + let hop_limit = if source.is_ipv6() { + parse_hop_limit(ancillary_data).unwrap_or(0) + } else { + packet_data[8] as u32 + }; + Ok((packet_data, hop_limit, source)) } + Err(e) => Err(e), } } diff --git a/src/worker/measurement.rs b/src/worker/measurement.rs index fe587c80..943c4634 100644 --- a/src/worker/measurement.rs +++ b/src/worker/measurement.rs @@ -61,6 +61,7 @@ impl Worker { is_record: start.is_record, origin_id: rx_origin.origin_id, sport: rx_origin.sport as u16, + src: rx_origin.src.expect("no src").to_string(), }, inbound_tx.clone(), socket.clone(),