Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
da9d3ee
update version for new feature
rhendriks Jan 23, 2026
7151bf5
couple worker tasks to specific origin_ids and rename verfploeter to …
rhendriks Jan 23, 2026
8a3734c
remove unimplemented 'live', TODOs, and proto renaming
rhendriks Jan 23, 2026
0e019c8
origin IDs in tasks
rhendriks Jan 23, 2026
9615ae1
couple p_type to origin rather than measurement
rhendriks Jan 23, 2026
42154e6
protocols
rhendriks Jan 23, 2026
8d23d81
conflicts
rhendriks Jan 23, 2026
d6fb8bf
allow for 'multi' protocol filenames
rhendriks Jan 23, 2026
9aa0ba9
print measurement type using origins
rhendriks Jan 23, 2026
2bc1387
include p_type in configuration files
rhendriks Jan 24, 2026
79aa88a
include p_type with origins for csv metadata
rhendriks Jan 24, 2026
7b9f6b4
attached origin_id to all tasks
rhendriks Jan 24, 2026
db3daa5
origin_id handling and encode tcp timestamps as millis
rhendriks Jan 24, 2026
2581ead
fix multi-row writing
rhendriks Jan 26, 2026
6e72a35
attach origin_id to a batch of replies
rhendriks Jan 26, 2026
43f7cea
no longer print timed out hops
rhendriks Jan 26, 2026
7bd88ab
associate origin_id only to batches
rhendriks Jan 26, 2026
d7ec3e7
parse multiple p_types
rhendriks Jan 26, 2026
6a9f686
create multiple configs when using multiple protocols
rhendriks Jan 26, 2026
ae132f7
print number of ALL workers
rhendriks Jan 26, 2026
2c8554b
fix print number of workers
rhendriks Jan 26, 2026
95fdc45
print origin ids in filenames
rhendriks Jan 26, 2026
0c00c05
rename verfploeter to catchment
rhendriks Jan 26, 2026
6b8c86a
rename verfploeter to catchment in doc
rhendriks Jan 26, 2026
e825734
fmt and clippy
rhendriks Jan 26, 2026
aabd931
debug pritns
rhendriks Jan 26, 2026
87b2c08
debug
rhendriks Jan 26, 2026
231f034
debug tcp issue
rhendriks Jan 26, 2026
9eb99b6
print port
rhendriks Jan 26, 2026
d6e78b9
typo
rhendriks Jan 26, 2026
6b64479
verify port values
rhendriks Jan 26, 2026
e7bc6d5
remove prints
rhendriks Jan 26, 2026
dc4e4f5
cargo fmt
rhendriks Jan 26, 2026
09efa10
tcp microseconds
rhendriks Jan 26, 2026
06c09aa
only probe using matching origin IDs
rhendriks Jan 26, 2026
ba14edc
cargo fmt
rhendriks Jan 26, 2026
39923fd
rx_time tcp
rhendriks Jan 26, 2026
c021c98
us instead of ms
rhendriks Jan 26, 2026
1ee05e2
fix round-robin cooldowns
rhendriks Jan 26, 2026
d6a1a75
cargo fmt
rhendriks Jan 26, 2026
adb48a2
timeout sleep 5 seconds
rhendriks Jan 26, 2026
25f6ccb
clippy warning
rhendriks Jan 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "manycastr"
version = "1.3.1"
version = "1.4.0"
edition = "2021"

[dependencies]
Expand All @@ -15,7 +15,7 @@ tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "sync", "
tonic = { version = "0.14.2", features = ["tls-ring", "zstd"] } # gRPC (inter-component communication)
rand = "0.9.2" # generate random numbers
gethostname = "1.1.0" # get hostname
socket2 = { version = "0.6.1", features = ["all"] }
socket2 = { version = "0.6.2", features = ["all"] }
local-ip-address = "0.6.9" # get local unicast address
indicatif = "0.18.3" # progress bar
ratelimit_meter = "5.0.0" # rate limit
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ When creating a measurement you can specify (for more information run --help):

### Variables
* **Hitlist** - addresses to be probed (IP-addresses or -numbers seperated by newlines) (supports gzipped files)
* **Protocol** - ICMP, DNS, TCP, or CHAOS
* **Measurement Type** - `laces`, `verfploeter`, `unicast`, `latency`, or `anycast-traceroute`
* **Protocol** - ICMP, DNS, TCP, or CHAOS (multiple allowed)
* **Measurement Type** - `laces`, `catchment`, `unicast`, `latency`, or `anycast-traceroute`
* **Rate** - the rate (packets / second) at which each worker will send out probes (default: 1000)
* **Selective** - specify which workers have to send out probes (all connected workers will listen for packets)
* **Worker-interval** - interval between separate worker's probes to the same target (default: 1s)
Expand Down Expand Up @@ -98,10 +98,10 @@ cli -a [ORC ADDRESS] start [parameters]

### Examples

#### Verfploeter catchment mapping
#### Catchment mapping

```
cli -a [::1]:50001 start -m verfploeter -h hitlist.txt -t icmp -a 10.0.0.0 -o results.csv.gz -r 1000
cli -a [::1]:50001 start -m catchment -h hitlist.txt -t icmp -a 10.0.0.0 -o results.csv.gz -r 1000
```

All workers probe the targets in hitlist.txt using ICMPv4, using source address 10.0.0.0, results are stored in results.csv.gz
Expand All @@ -113,7 +113,7 @@ Hitlist is divided amongst workers, each worker sends out 1,000 packets per seco
### Anycast latency measurement using TCPv4

```
cli -a [::1]:50001 start hitlist.txt -t tcp -a 10.0.0.0 -m verfploeter
cli -a [::1]:50001 start hitlist.txt -t tcp -a 10.0.0.0 -m latency
```

Similar as above, except the RTT between each hitlist target and the anycast deployment is also measured.
Expand Down
18 changes: 8 additions & 10 deletions example.conf
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
# This file provides an example configuration file
# A configuration file allows for setting the (source address, source port, destination port) 'origins' used by clients in a task
# Clients may have multiple origins, and will probe with each unique origin
# Origins must use anycast addresses
# Clients will listen for every origin defined (regardless of whether they are probing with the defined origin)
# A configuration consists of a client_id: (source address, source port, and destination port)
# A hostname can be used instead of a client_id (e.g., nl-ams: (...))
# Workers may have multiple origins, and will probe with each unique origin
# Workers will listen for every origin defined (regardless of whether they are probing with the defined origin)
# A hostname can be used instead of a worker_id (e.g., nl-ams: (...))
# 'ALL' means all clients will probe using the defined origin
# In this example all clients probe using Origin (1.1.1.1, 1234, 4321)
# And client 17 additionally probes using Origin (1.1.1.2, 1234, 4321)
ALL - 1.1.1.1, 1234, 4321
17 - 1.1.1.2, 1234, 4321
# Worker, src_addr, src_port, dst_port, protocol
ALL, 1.1.1.1, 1234, 4321, tcp
17, 1.1.1.2, 1234, 4321, ICMP
nl-ams, 1.1.1.3, 189, 53, DNS
de-fra, 1.1.1.3, 189, 53, chaos
102 changes: 37 additions & 65 deletions proto/manycastr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,14 @@ service Controller {
rpc MeasurementFinished(Finished) returns (Ack) {}
// Worker connects with the Orc, which returns a stream for future instructions
rpc WorkerConnect(Worker) returns (stream Instruction) {}
// CLI instructs Orc to perform a measurement and establishes a stream for the Orc to send back replies
// CLI instructs Orc to perform a hitlist-based measurement and establishes a stream for the Orc to send back replies
rpc DoMeasurement(ScheduleMeasurement) returns (stream ReplyBatch) {}
// CLI instructs Orc to perform a live measurement and establishes a stream for the Orc to send back replies
rpc LiveMeasurement(stream LiveMeasurementMessage) returns (stream ReplyBatch) {}
// CLI asks Orc its current status (list of connected workers and measurements)
rpc ListWorkers(Empty) returns (Status) {}
// Worker sends measurement replies to the Orc
rpc SendResult(ReplyBatch) returns (Ack) {}
}

// Metadata for the measurement session (sent once at the start)
message LiveMeasurementMetadata {
string url = 1; // URL to encode in probes
}

// Per-task information (streamed for each task)
message LiveTask {
uint32 worker_id = 1; // Worker from which to send the probe (0 for any worker)
Origin origin = 2; // Origin to send the probe from
Address dst = 3; // Destination to send the probe to
uint32 m_type = 4; // Measurement type (ICMP, UDP, TCP, DNS)
bool is_ipv6 = 5; // Whether to use IPv6
optional string record = 6; // Record to use for DNS queries (TXT/A/AAAA)
bool is_latency = 7; // Whether the measurement is for latency
}

// Wrapper message for streaming tasks or sending metadata
message LiveMeasurementMessage {
oneof msg {
LiveMeasurementMetadata metadata = 1; // Sent once
LiveTask task = 2; // Sent multiple times
}
}

// Empty message
message Empty {}

Expand All @@ -58,7 +32,7 @@ message Finished {

enum MeasurementType {
LACES = 0;
VERFPLOETER = 1;
CATCHMENT = 1;
ANYCAST_LATENCY = 2;
UNICAST_LATENCY = 3;
ANYCAST_TRACEROUTE = 4;
Expand All @@ -75,18 +49,17 @@ enum ProtocolType {
message ScheduleMeasurement {
uint32 probing_rate = 1; // Probing rate (probes per second)
repeated Configuration configurations = 2; // Configuration of worker and origin combinations
ProtocolType p_type = 3; // Measurement type (ICMP, UDP, TCP, DNS)
uint32 worker_interval = 4; // interval between probes from different workers
MeasurementType m_type = 5;
repeated Address hitlist = 6; // target addresses to probe
optional string record = 7; // Record to send CHAOS (TXT) or A/AAAA requests for
optional string url = 8; // URL to encode in probes (e.g., opt-out link)
uint32 probe_interval = 9; // interval between probes from/to the same origin,dst pair
uint32 number_of_probes = 10; // number of probes to send per origin,dst pair
bool is_ipv6 = 11; // whether the measurement is for IPv6
bool is_record = 12; // whether to send IPv4 probes with the Record Route options
optional TraceOptions trace_options = 13; // Optional traceroute options
bool is_responsive = 14; // Whether to probe a target once for responsiveness before sending measurement probes
uint32 worker_interval = 3; // interval between probes from different workers
MeasurementType m_type = 4;
repeated Address hitlist = 5; // target addresses to probe
optional string record = 6; // Record to send CHAOS (TXT) or A/AAAA requests for
optional string url = 7; // URL to encode in probes (e.g., opt-out link)
uint32 probe_interval = 8; // interval between probes from/to the same origin,dst pair
uint32 number_of_probes = 9; // number of probes to send per origin,dst pair
bool is_ipv6 = 10; // whether the measurement is for IPv6
bool is_record = 11; // whether to send IPv4 probes with the Record Route options
optional TraceOptions trace_options = 12; // Optional traceroute options
bool is_responsive = 13; // Whether to probe a target once for responsiveness before sending measurement probes
}

// A list of workers
Expand Down Expand Up @@ -141,6 +114,7 @@ message Task {
Trace trace = 2; // A traceroute task (to a specific TTL)
Probe discovery = 3; // A discovery task (e.g., to check for responsiveness or catching PoP)
}
uint32 origin_id = 4; // Origin from which to send this probe
}

// Initial message to worker, contains the worker ID assigned by the orchestrator
Expand All @@ -152,14 +126,13 @@ message Init {
message Start {
uint32 rate = 1;
uint32 m_id = 2;
ProtocolType p_type = 3; // Measurement type (ICMP, UDP, TCP, DNS)
repeated Origin tx_origins = 4; // origins to send with
repeated Origin rx_origins = 5; // origins to listen for
optional string record = 6; // domain to use for DNS queries (TXT/A/AAAA)
optional string url = 7; // URL encoded in probes
bool is_ipv6 = 8; // whether the measurement is for IPv6
bool is_record = 9; // send IPv4 probes with the Record Route option
MeasurementType m_type = 10; // Measurement type
repeated Origin tx_origins = 3; // origins to send with
repeated Origin rx_origins = 4; // origins to listen for
optional string record = 5; // domain to use for DNS queries (TXT/A/AAAA)
optional string url = 6; // URL encoded in probes
bool is_ipv6 = 7; // whether the measurement is for IPv6
bool is_record = 8; // send IPv4 probes with the Record Route option
MeasurementType m_type = 9; // Measurement type
}

message TraceOptions {
Expand All @@ -174,12 +147,13 @@ message End {
uint32 code = 1; // 0 -> finished, 1 -> CLI disconnect
}

// Origin: the source address and port combinations used by workers in a measurement, alongside an identifier
// Origin: mapping of source address, port values, protocol type, and an identifier
message Origin {
Address src = 1;
fixed32 sport = 2;
fixed32 dport = 3;
fixed32 origin_id = 4;
ProtocolType p_type = 5; // Protocol type to use for this origin
}

// Configuration: mapping of worker ID to origin
Expand All @@ -188,24 +162,25 @@ message Configuration {
uint32 worker_id = 2;
}

// A probing task, containing the target address to probe
// A probing task, containing the target address to probe and the origin ID with which to send the probe
message Probe {
Address dst = 1; // target address of the probe
}

// A traceroute task, containing a destination addres, the TTL value to use, the protocol type, and the origin to use
// A traceroute task, containing a destination address, the TTL value to use, the protocol type, and the origin to use
message Trace {
Address dst = 1; // target address of the traceroute probe
uint32 ttl = 2; // TTL to use for the traceroute probe
uint32 origin_id = 3; // Origin to send the traceroute probe from [None if a single origin is used]
}

// A ReplyBatch, contains the worker that received the measurement replies, and the replies themselves
message ReplyBatch {
uint32 rx_id = 1;
repeated Reply results = 2; // forwarded to CLI by the Orchestrator
fixed32 origin_id = 3; // Origin associated with the replies
}

// Single reply
message Reply {
oneof reply_data {
MeasurementReply measurement = 1; // result
Expand All @@ -217,31 +192,28 @@ message Reply {
// Contains discovery reply data (used for follow-up tasks)
message DiscoveryReply {
Address src = 1;
fixed32 origin_id = 2; // ID of the destination origin (ip, ports) of the reply
}

// Contains measured reply data (forwarded to the CLI and written to file)
message MeasurementReply {
Address src = 1;
fixed32 ttl = 2;
fixed32 origin_id = 3; // ID of the destination origin (ip, ports) of the reply
uint64 rx_time = 4;
uint64 tx_time = 5;
fixed32 tx_id = 6;
optional string chaos = 7;
optional RecordedHops recorded_hops = 8;
uint64 rx_time = 3;
uint64 tx_time = 4;
fixed32 tx_id = 5;
optional string chaos = 6;
optional RecordedHops recorded_hops = 7;
}

// Traceroute reply
message TraceReply {
Address hop_addr = 1;
fixed32 ttl = 2;
fixed32 origin_id = 3; // ID of the destination origin (ip, ports) of the reply
uint64 rx_time = 4; // 32 bit microseconds time
uint64 tx_time = 5; // 16 bit microseconds time
fixed32 tx_id = 6;
Address trace_dst = 7;
uint32 hop_count = 8; // TTL used to trigger this reply
uint64 rx_time = 3; // 32 bit microseconds time
uint64 tx_time = 4; // 16 bit microseconds time
fixed32 tx_id = 5;
Address trace_dst = 6;
uint32 hop_count = 7; // TTL used to trigger this reply
}

// Record Route hops found
Expand Down
60 changes: 44 additions & 16 deletions src/cli/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use crate::cli::commands::start::MeasurementExecutionArgs;
use crate::cli::writer::parquet_writer::write_results_parquet;
use crate::cli::writer::{write_results_csv, MetadataArgs, WriteConfig};
use crate::custom_module::manycastr::controller_client::ControllerClient;
use crate::custom_module::manycastr::ProtocolType::{ChaosDns, Tcp};
use crate::custom_module::manycastr::{MeasurementType, ReplyBatch, ScheduleMeasurement};
use crate::custom_module::Separated;
use crate::ALL_WORKERS;
use crate::{ALL_WORKERS, SINGLE_ORIGIN};
use chrono::Local;
use indicatif::{ProgressBar, ProgressStyle};
use log::{error, info, warn};
Expand All @@ -30,12 +31,11 @@ impl CliClient {
/// # Arguments
/// * `m_def` - measurement definition for the orchestrator created from the command-line arguments
/// * `args` - contains additional arguments for the measurement execution
/// * `is_ipv6` - boolean whether the measurement is IPv6 or not
/// * `m_type` - Measurement type
pub(crate) async fn do_measurement_to_server(
&mut self,
m_def: ScheduleMeasurement,
args: MeasurementExecutionArgs<'_>,
is_ipv6: bool,
m_type: MeasurementType,
) -> Result<(), Box<dyn Error>> {
let probing_rate = m_def.probing_rate;
Expand All @@ -58,15 +58,15 @@ impl CliClient {
};

let m_time = match m_def.m_type() {
MeasurementType::Verfploeter | MeasurementType::AnycastLatency => {
MeasurementType::Catchment | MeasurementType::AnycastLatency => {
((args.hitlist_length as f32 / (probing_rate as f32 * number_of_probers as f32))
+ 1.0)
+ 5.0)
/ 60.0
}
_ => {
(((number_of_probers - 1) as f32 * worker_interval as f32) // Last worker starts probing
+ (args.hitlist_length as f32 / probing_rate as f32) // Time to probe all addresses
+ 1.0) // Time to wait for last replies
+ 5.0) // Time to wait for last replies
/ 60.0 // Convert to minutes
}
};
Expand Down Expand Up @@ -123,11 +123,20 @@ impl CliClient {
let (tx_r, rx_r) = unbounded_channel();

// Get protocol and IP version
let type_str = format!(
"{}{}",
m_def.p_type().as_str(),
if is_ipv6 { "v6" } else { "v4" }
);
let proto_str = {
let mut it = m_def
.configurations
.iter()
.map(|c| c.origin.expect("none origin").p_type());
let first = it.next().unwrap();
if it.all(|p| p == first) {
// A single protocol type is used
first.as_str()
} else {
// Multiple protocol types are used
"multi"
}
};

// Determine traceroute
let is_record = args.is_record;
Expand All @@ -141,7 +150,7 @@ impl CliClient {
let file_path = if path.ends_with('/') {
// User provided a path, use default naming convention for file
format!(
"{path}{}-{type_str}-{timestamp_start_str}{extension}",
"{path}{}-{proto_str}-{timestamp_start_str}{extension}",
m_type.as_str()
)
} else {
Expand All @@ -165,25 +174,44 @@ impl CliClient {
configurations: &m_def.configurations,
is_responsive,
m_type: m_def.m_type(),
p_type: m_def.p_type(),
};

// Check if any configuration has origin_id that is not 0 or u32::MAX -> multi origin
// Check if any configuration has an origin ID
let is_multi_origin = m_def.configurations.iter().any(|conf| {
conf.origin
.as_ref()
.is_some_and(|origin| origin.origin_id != 0 && origin.origin_id != u32::MAX)
.is_some_and(|origin| origin.origin_id != SINGLE_ORIGIN)
});

// Check if any configuration sends CHAOS probes
let is_chaos = m_def.configurations.iter().any(|conf| {
conf.origin
.as_ref()
.is_some_and(|origin| origin.p_type() == ChaosDns)
});

// List of all origin IDs that are TCP
let tcp_origin_ids = m_def
.configurations
.iter()
.filter_map(|conf| {
conf.origin
.as_ref()
.filter(|origin| origin.p_type() == Tcp)
.map(|origin| origin.origin_id)
})
.collect::<Vec<u32>>();

let config = WriteConfig {
print_to_cli: args.is_cli,
output_file: file,
metadata_args,
p_type: m_def.p_type(),
m_type: m_def.m_type(),
is_multi_origin,
worker_map: args.worker_map.clone(),
is_record,
is_chaos,
tcp_origins: tcp_origin_ids,
};

// Start thread that writes results to file
Expand Down
Loading