7944: bench: add remote_spawn benchmark for inject queue contention #69
base: master
`benches/remote_spawn.rs` (new file, +95 lines):

```rust
//! Benchmark remote task spawning (push_remote_task) at different concurrency
//! levels on the multi-threaded scheduler.
//!
//! This measures contention on the scheduler's inject queue mutex when multiple
//! external (non-worker) threads spawn tasks into the tokio runtime simultaneously.
//! Every rt.spawn() from an external thread unconditionally goes through
//! push_remote_task, making this a direct measurement of inject queue contention.
//!
//! For each parallelism level N (1, 2, 4, 8, 16, 32, 64, capped at available parallelism):
//! - Spawns N std::threads (external to the runtime)
//! - Each thread spawns TOTAL_TASKS / N tasks into the runtime via rt.spawn()
//! - All threads are synchronized with a barrier to maximize contention
//! - Tasks are trivial no-ops to isolate the push overhead

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use std::sync::Barrier;
use tokio::runtime::{self, Runtime};

/// Total number of tasks spawned across all threads per iteration.
/// Must be divisible by the largest parallelism level (64).
const TOTAL_TASKS: usize = 12_800;

fn remote_spawn_contention(c: &mut Criterion) {
    let parallelism_levels = parallelism_levels();
    let mut group = c.benchmark_group("remote_spawn");

    for num_threads in &parallelism_levels {
        let num_threads = *num_threads;
        group.bench_with_input(
            BenchmarkId::new("threads", num_threads),
            &num_threads,
            |b, &num_threads| {
                let rt = rt();
                let tasks_per_thread = TOTAL_TASKS / num_threads;

                b.iter(|| {
                    let barrier = Barrier::new(num_threads);

                    std::thread::scope(|s| {
                        let handles: Vec<_> = (0..num_threads)
                            .map(|_| {
                                let barrier = &barrier;
                                let rt = &rt;
                                s.spawn(move || {
                                    let mut join_handles = Vec::with_capacity(tasks_per_thread);
                                    barrier.wait();

                                    for _ in 0..tasks_per_thread {
                                        join_handles.push(rt.spawn(async {}));
                                    }
                                    join_handles
                                })
                            })
                            .collect();

                        let all_handles: Vec<_> = handles
                            .into_iter()
                            .flat_map(|h| h.join().unwrap())
                            .collect();

                        rt.block_on(async {
```
Review comment (Augment AI, severity: medium): The timed region also includes collecting all …

Owner/author reply: value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! By using `Bencher::iter_custom()` the measurement could be isolated to include only the spawning and exclude the setup and teardown (awaiting).
```rust
                            for h in all_handles {
                                h.await.unwrap();
                            }
                        });
                    });
                });
```
Comment on lines +36 to +67

Review comment (Gemini AI): The current implementation with … (suggested replacement):

```rust
b.iter_custom(|iters| {
    let mut total_duration = std::time::Duration::ZERO;
    for _ in 0..iters {
        let barrier = Barrier::new(num_threads);
        let start = std::time::Instant::now();
        let all_handles = std::thread::scope(|s| {
            let handles: Vec<_> = (0..num_threads)
                .map(|_| {
                    let barrier = &barrier;
                    let rt = &rt;
                    s.spawn(move || {
                        let mut join_handles =
                            Vec::with_capacity(tasks_per_thread);
                        barrier.wait();
                        for _ in 0..tasks_per_thread {
                            join_handles.push(rt.spawn(async {}));
                        }
                        join_handles
                    })
                })
                .collect();
            handles
                .into_iter()
                .flat_map(|h| h.join().unwrap())
                .collect::<Vec<_>>()
        });
        total_duration += start.elapsed();
        // Cleanup: wait for all tasks to complete before the next iteration.
        rt.block_on(async {
            for h in all_handles {
                h.await.unwrap();
            }
        });
    }
    total_duration
});
```
Owner/author reply: value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! By using `Bencher::iter_custom()` the measurement could be isolated to include only the spawning and exclude the setup and teardown (awaiting).
Comment on lines +36 to +67

Review comment (CodeRabbit AI): Measured region currently mixes thread lifecycle cost with spawn contention. The code structure confirms the issue. (Analysis ran `cat -n benches/remote_spawn.rs | head -80`; repository: martin-augment/tokio.)

Owner/author reply: value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! By using `Bencher::iter_custom()` the measurement could be isolated to include only the spawning and exclude the setup and teardown (awaiting).
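All three replies point at the same fix: time only the hot phase and keep setup and teardown outside the accumulated duration. A minimal std-only sketch of that shape, mirroring what Criterion's `iter_custom` closure returns (the `measure` helper below is hypothetical, not Criterion's actual API or this PR's code):

```rust
use std::time::{Duration, Instant};

// Hypothetical helper mirroring the iter_custom pattern: run `iters`
// repetitions, time only the work phase, and return the total Duration.
fn measure<S>(
    iters: u64,
    mut setup: impl FnMut() -> S,
    mut work: impl FnMut(&mut S),
    mut teardown: impl FnMut(S),
) -> Duration {
    let mut total = Duration::ZERO;
    for _ in 0..iters {
        let mut state = setup(); // untimed setup (e.g. creating the barrier)
        let start = Instant::now();
        work(&mut state); // timed region: only the spawning
        total += start.elapsed();
        teardown(state); // untimed cleanup (e.g. awaiting the join handles)
    }
    total
}
```

The returned total is what the benchmark harness would divide by `iters`, so per-iteration cost no longer includes the cleanup `block_on`.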
```rust
            },
        );
    }

    group.finish();
}

fn parallelism_levels() -> Vec<usize> {
    let max_parallelism = std::thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(1);

    [1, 2, 4, 8, 16, 32, 64]
        .into_iter()
        .filter(|&n| n <= max_parallelism)
        .collect()
}

fn rt() -> Runtime {
    runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
}

criterion_group!(remote_spawn_benches, remote_spawn_contention);

criterion_main!(remote_spawn_benches);
```
Review comment (Augment AI, severity: medium): `b.iter` currently includes creating/joining `num_threads` OS threads each iteration (`std::thread::scope`), which can dominate timings and obscure inject-queue contention. Consider separating thread setup from the timed region if the goal is to measure `push_remote_task` overhead.

Owner/author reply: value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! By using `Bencher::iter_custom()` the measurement could be isolated to include only the spawning and exclude the setup and teardown (awaiting).
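The comment above raises a second, distinct cost: thread creation and join inside the timed region. One way to exclude it is to spawn the external threads once and reuse them across iterations, gating each work phase with barriers. A std-only sketch of that idea (the `timed_iterations` helper and the atomic counter standing in for `rt.spawn()` are hypothetical, not part of this PR):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::time::{Duration, Instant};

// Hypothetical sketch: spawn the external threads once and reuse them for
// every iteration, so thread creation/join cost never enters the timed
// region. The atomic increment stands in for the rt.spawn() calls.
fn timed_iterations(num_threads: usize, iters: u64, counter: Arc<AtomicUsize>) -> Duration {
    // Both barriers include the timing thread (hence + 1).
    let start = Arc::new(Barrier::new(num_threads + 1));
    let done = Arc::new(Barrier::new(num_threads + 1));

    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let (start, done, counter) = (start.clone(), done.clone(), counter.clone());
            std::thread::spawn(move || {
                for _ in 0..iters {
                    start.wait(); // released together; timed work phase begins
                    counter.fetch_add(1, Ordering::Relaxed); // stand-in for spawning tasks
                    done.wait(); // this iteration's work phase is over
                }
            })
        })
        .collect();

    let mut total = Duration::ZERO;
    for _ in 0..iters {
        let t = Instant::now();
        start.wait(); // release all workers at once to maximize contention
        done.wait(); // wait until every worker finished its pushes
        total += t.elapsed();
    }

    for h in handles {
        h.join().unwrap();
    }
    total
}
```

Combined with the `iter_custom` change the owner agrees with above, this would leave only the synchronized `rt.spawn()` burst inside the measurement, which is the inject-queue contention the benchmark is after.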