Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "manycastr"
version = "1.4.0"
version = "1.4.1"
edition = "2021"

[dependencies]
Expand Down
25 changes: 17 additions & 8 deletions src/orchestrator/task_distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ pub async fn broadcast_distributor(config: TaskDistributorConfig) {
probing_rate_interval.tick().await;
}

let cooldown = (config.number_of_probing_workers as u64 * config.worker_interval) + 1;
// Wait for the workers to finish their tasks
tokio::time::sleep(Duration::from_secs(
(config.number_of_probing_workers as u64 * config.worker_interval) + 1,
))
.await;
info!("[Orchestrator] Last tasks being sent, awaiting a {cooldown}-second cooldown.");
tokio::time::sleep(Duration::from_secs(cooldown)).await;

info!("[Orchestrator] Task distribution finished");

Expand Down Expand Up @@ -177,7 +176,8 @@ pub async fn round_robin_distributor(config: TaskDistributorConfig) {
} // end of round-robin loop

// Wait for the workers to finish their tasks
tokio::time::sleep(Duration::from_secs(5)).await;
info!("[Orchestrator] All tasks sent, awaiting a 1-second cooldown for replies.");
tokio::time::sleep(Duration::from_secs(1)).await;

info!("[Orchestrator] Task distribution finished");

Expand Down Expand Up @@ -218,7 +218,8 @@ pub async fn round_robin_distributor(config: TaskDistributorConfig) {
/// Also checks the worker stacks for follow-up tasks and sends them to the appropriate workers.
/// Ends the measurement when all discovery probes have been sent and all stacks are empty.
///
/// Used for --responsive, --latency, and --traceroute measurements.
/// Used for latency and traceroute measurements.
/// Also used when --responsive is set.
///
/// # Arguments
/// * `config` - TaskDistributorConfig with all necessary parameters.
Expand Down Expand Up @@ -347,10 +348,16 @@ pub async fn round_robin_discovery(
.expect("Failed to send task to TaskDistributor");
}

let cooldown = if is_responsive {
(config.number_of_probing_workers as u64 * config.worker_interval) + 1
} else {
5
};

// Check if we finished sending all discovery probes and all stacks are empty
if hitlist_is_empty {
if let Some(start_time) = cooldown_timer {
if start_time.elapsed() >= Duration::from_secs(5) {
if start_time.elapsed() >= Duration::from_secs(cooldown) {
info!("[Orchestrator] Task distribution finished.");
break;
}
Expand All @@ -361,7 +368,9 @@ pub async fn round_robin_discovery(
stacks_guard.values().all(|queue| queue.is_empty())
};
if all_stacks_empty {
info!("[Orchestrator] No more tasks. Waiting 5 seconds for cooldown.",);
info!(
"[Orchestrator] No more tasks. Awaiting a {cooldown}-second cooldown.",
);
cooldown_timer = Some(Instant::now());
}
}
Expand Down
96 changes: 50 additions & 46 deletions src/worker/inbound/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct InboundConfig {
pub origin_id: u32,
/// Source port used
pub sport: u16,
/// Source address used
pub src: String,
}

/// Listen for incoming packets
Expand Down Expand Up @@ -72,15 +74,14 @@ pub fn inbound(config: InboundConfig, tx: UnboundedSender<ReplyBatch>, socket: A
// Listen for incoming packets
let mut received: u32 = 0;
loop {
// Check if we should exit
if rx_f_c.load(Ordering::Relaxed) {
break;
}
let (packet, ttl, src) = match get_packet(&socket) {
Ok(result) => result,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// Wait 100ms to check again
sleep(Duration::from_millis(100));
sleep(Duration::from_millis(1)); // TODO improve this using nonblocking and using a read timeout
// Check if we should exit
if rx_f_c.load(Ordering::Relaxed) {
break;
}
continue;
}
Err(e) => panic!("Socket error: {}", e),
Expand Down Expand Up @@ -119,10 +120,21 @@ pub fn inbound(config: InboundConfig, tx: UnboundedSender<ReplyBatch>, socket: A
}
}

info!(
"[Worker inbound] Stopped pnet listener (received {} packets)",
received.with_separator()
);
if config.p_type == ProtocolType::Icmp {
info!(
"[Worker inbound] Stopped ICMP ping listener {} (received {} packets)",
config.src,
received.with_separator(),
)
} else {
info!(
"[Worker inbound] Stopped {} listener {}:{} (received {} packets)",
config.p_type,
config.src,
config.sport,
received.with_separator(),
);
}
})
.expect("Failed to spawn listener_thread");

Expand All @@ -145,47 +157,39 @@ fn get_packet(socket: &Socket) -> Result<(&[u8], u32, SocketAddr), std::io::Erro
let mut control_storage = ControlBuffer([MaybeUninit::uninit(); 128]);
let control_buf_bytes = &mut control_storage.0;

loop {
let recv_result = {
let mut iov_buf = [MaybeUninitSlice::new(&mut buf)];
let mut msg = MsgHdrMut::new()
.with_addr(&mut source_storage)
.with_buffers(&mut iov_buf)
.with_control(control_buf_bytes);
let recv_result = {
let mut iov_buf = [MaybeUninitSlice::new(&mut buf)];
let mut msg = MsgHdrMut::new()
.with_addr(&mut source_storage)
.with_buffers(&mut iov_buf)
.with_control(control_buf_bytes);

socket.recvmsg(&mut msg, 0).map(|n| (n, msg.control_len()))
};
socket.recvmsg(&mut msg, 0).map(|n| (n, msg.control_len()))
};

match recv_result {
Ok((bytes_read, control_len)) => {
let source = source_storage
.as_socket()
.ok_or_else(|| std::io::Error::other("invalid source address"))?;
match recv_result {
Ok((bytes_read, control_len)) => {
let source = source_storage
.as_socket()
.ok_or_else(|| std::io::Error::other("invalid source address"))?;

let (packet_data, ancillary_data) = unsafe {
let p = std::slice::from_raw_parts(buf.as_ptr() as *const u8, bytes_read);
let c = std::slice::from_raw_parts(
control_storage.0.as_ptr() as *const u8,
control_len,
);
(p, c)
};
let (packet_data, ancillary_data) = unsafe {
let p = std::slice::from_raw_parts(buf.as_ptr() as *const u8, bytes_read);
let c = std::slice::from_raw_parts(
control_storage.0.as_ptr() as *const u8,
control_len,
);
(p, c)
};

let hop_limit = if source.is_ipv6() {
parse_hop_limit(ancillary_data).unwrap_or(0)
} else {
packet_data[8] as u32
};
return Ok((packet_data, hop_limit, source));
}
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
sleep(Duration::from_millis(1)); // TODO improve this using nonblocking and using a read timeout
continue;
}
return Err(e);
}
let hop_limit = if source.is_ipv6() {
parse_hop_limit(ancillary_data).unwrap_or(0)
} else {
packet_data[8] as u32
};
Ok((packet_data, hop_limit, source))
}
Err(e) => Err(e),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/worker/measurement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl Worker {
is_record: start.is_record,
origin_id: rx_origin.origin_id,
sport: rx_origin.sport as u16,
src: rx_origin.src.expect("no src").to_string(),
},
inbound_tx.clone(),
socket.clone(),
Expand Down