From ca7604ae2163f3523e15f682e2d92877596f242d Mon Sep 17 00:00:00 2001 From: Alex Gaynor Date: Fri, 6 Mar 2026 20:11:59 +0000 Subject: [PATCH 1/2] rt: shard the multi-thread inject queue to reduce remote spawn contention MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The multi-threaded scheduler's inject queue was protected by a single global mutex (shared with idle coordination state). Every remote task spawn — any spawn from outside a worker thread — acquired this lock, serializing concurrent spawners and limiting throughput. This change introduces `inject::Sharded`, which splits the inject queue into up to 8 independent shards, each an existing `Shared`/`Synced` pair with its own mutex and cache-line padding. Design: - Push: each thread is assigned a home shard on first push (via a global counter) and sticks with it. This keeps consecutive pushes from one thread cache-local while spreading distinct threads across distinct locks. - Pop: workers rotate through shards starting at their own index, skipping empty shards via per-shard atomic length. pop_n drains from one shard at a time to keep critical sections bounded. - Shard count: capped at 8 (and 1 under loom). Contention drops off steeply past a handful of shards, and is_empty()/len() scan all shards in the worker hot loop. - is_closed: a single Release atomic set after all shards are closed, so the shutdown check stays lock-free. Random shard selection via context::thread_rng_n (as used in #7757 for the blocking pool) was measured and found to be 20-33% slower on remote_spawn at 8+ threads. The inject workload is a tight loop of trivial pushes where producer-side cache locality dominates: with RNG, a hot thread bounces between shard cache lines on every push; with sticky assignment it stays hot on one mutex and list tail. 
RNG did win slightly (5-9%) on single-producer benchmarks where spreading tasks lets workers pop in parallel, but not enough to offset the regression at scale. The inject state is removed from the global Synced mutex, which now only guards idle coordination. This also helps the single-threaded path since remote pushes no longer contend with worker park/unpark. Results on remote_spawn benchmark (12,800 no-op tasks, N spawner threads, 64-core box): threads before after improvement 1 9.38 ms 7.33 ms -22% 2 14.94 ms 6.64 ms -56% 4 23.69 ms 5.34 ms -77% 8 34.81 ms 4.69 ms -87% 16 32.33 ms 4.54 ms -86% 32 30.37 ms 4.73 ms -84% 64 26.59 ms 5.34 ms -80% rt_multi_threaded benchmarks: spawn_many_local -8%, spawn_many_remote_idle -7%, yield_many -1%, rest neutral. Developed in conjunction with Claude. --- spellcheck.dic | 5 +- tokio/src/runtime/scheduler/inject.rs | 3 + tokio/src/runtime/scheduler/inject/sharded.rs | 293 ++++++++++++++++++ tokio/src/runtime/scheduler/inject/shared.rs | 2 +- .../src/runtime/scheduler/multi_thread/mod.rs | 2 - .../runtime/scheduler/multi_thread/worker.rs | 86 ++--- .../scheduler/multi_thread/worker/taskdump.rs | 5 +- tokio/src/runtime/task/trace/mod.rs | 19 +- 8 files changed, 332 insertions(+), 83 deletions(-) create mode 100644 tokio/src/runtime/scheduler/inject/sharded.rs diff --git a/spellcheck.dic b/spellcheck.dic index 857962d1a4e..d04cb9de637 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -313 +316 & + < @@ -146,6 +146,7 @@ implementor implementors incrementing inlining +interleavings interoperate invariants Invariants @@ -186,6 +187,7 @@ Multithreaded mut mutex Mutex +mutexes Nagle namespace nonblocking @@ -281,6 +283,7 @@ tx udp UDP UID +uncontended unhandled unix unlink diff --git a/tokio/src/runtime/scheduler/inject.rs b/tokio/src/runtime/scheduler/inject.rs index 6b02ecc3247..86dd67afdec 100644 --- a/tokio/src/runtime/scheduler/inject.rs +++ b/tokio/src/runtime/scheduler/inject.rs @@ -14,6 +14,9 @@ pub(crate) use 
synced::Synced; cfg_rt_multi_thread! { mod rt_multi_thread; + + mod sharded; + pub(crate) use sharded::Sharded; } mod metrics; diff --git a/tokio/src/runtime/scheduler/inject/sharded.rs b/tokio/src/runtime/scheduler/inject/sharded.rs new file mode 100644 index 00000000000..84e92eefda5 --- /dev/null +++ b/tokio/src/runtime/scheduler/inject/sharded.rs @@ -0,0 +1,293 @@ +//! Sharded inject queue for the multi-threaded scheduler. +//! +//! A single global mutex is the dominant source of contention when many +//! external threads spawn into the runtime concurrently. `Sharded` splits +//! the inject queue into independent shards, each with its own mutex and +//! intrusive linked list. Pushes are distributed across shards using a +//! per-thread counter, so uncontended threads never touch the same lock. +//! Workers drain shards starting from their own index and rotate. + +use super::{Pop, Shared, Synced}; + +use crate::loom::sync::atomic::AtomicBool; +use crate::loom::sync::{Mutex, MutexGuard}; +use crate::runtime::task; +use crate::util::cacheline::CachePadded; + +use std::sync::atomic::Ordering::{Acquire, Release}; + +/// Sharded inject queue. +/// +/// Internally composed of `N` independent [`Shared`] / [`Synced`] pairs, +/// each protected by its own mutex and padded to avoid false sharing. +pub(crate) struct Sharded<T: 'static> { + /// One entry per shard. + shards: Box<[CachePadded<Shard<T>>]>, + + /// `shards.len() - 1`, used for fast modulo. Shard count is always + /// a power of two. + shard_mask: usize, + + /// Set once all shards have been closed. Allows `is_closed` to be + /// checked without locking a shard. + is_closed: AtomicBool, +} + +struct Shard<T: 'static> { + shared: Shared<T>, + synced: Mutex<Synced>, +} + +cfg_not_loom! { + use std::cell::Cell; + use std::sync::atomic::{AtomicUsize as StdAtomicUsize, Ordering::Relaxed}; + + /// Sentinel indicating the per-thread push shard has not been assigned. + const UNASSIGNED: usize = usize::MAX; + + tokio_thread_local!
{ + /// Per-thread home shard for push operations. + /// + /// Each thread sticks to one shard for cache locality: consecutive + /// pushes from the same thread hit the same mutex and linked-list + /// tail. Distinct threads get distinct shards (modulo collisions) + /// via a global counter assigned on first use. + static PUSH_SHARD: Cell<usize> = const { Cell::new(UNASSIGNED) }; + } + + /// Hands out shard indices to threads on first push. Shared across all + /// `Sharded` instances, which is fine: it only needs to spread threads + /// out. Uses `std` atomics directly (not loom) because shard selection + /// has no correctness implications and loom caps shards at 1 anyway. + static NEXT_SHARD: StdAtomicUsize = StdAtomicUsize::new(0); +} + +/// Upper bound on shard count. More shards reduce push contention but +/// make `is_empty`/`len` (which scan every shard) slower, and those are +/// called in the worker hot loop. Contention drops off steeply past a +/// handful of shards, so a small cap captures the win. +/// +/// Under loom, additional shards would multiply the modeled state space +/// without testing any new interleavings: each shard is an independent +/// instance of the already-loom-tested `Shared`/`Synced` pair, and the +/// cross-shard rotation is plain sequential code. +#[cfg(loom)] +const MAX_SHARDS: usize = 1; + +#[cfg(not(loom))] +const MAX_SHARDS: usize = 8; + +impl<T: 'static> Sharded<T> { + /// Creates a new sharded inject queue with a shard count derived + /// from the requested hint (rounded up to a power of two).
+ pub(crate) fn new(shard_hint: usize) -> Sharded<T> { + let num_shards = shard_hint.clamp(1, MAX_SHARDS).next_power_of_two(); + + let shards = (0..num_shards) + .map(|_| { + let (shared, synced) = Shared::new(); + CachePadded::new(Shard { + shared, + synced: Mutex::new(synced), + }) + }) + .collect::<Vec<_>>() + .into_boxed_slice(); + + Sharded { + shards, + shard_mask: num_shards - 1, + is_closed: AtomicBool::new(false), + } + } + + /// Returns the total number of tasks across all shards. + /// + /// This is a sum of per-shard atomic reads and is thus an + /// approximation under concurrent modification. With the shard + /// count capped small, the scan is cheap. + pub(crate) fn len(&self) -> usize { + let mut len = 0; + for shard in self.shards.iter() { + len += shard.shared.len(); + } + len + } + + /// Returns `true` if every shard reports empty. + pub(crate) fn is_empty(&self) -> bool { + for shard in self.shards.iter() { + if !shard.shared.is_empty() { + return false; + } + } + true + } + + /// Returns `true` if `close` has been called. + pub(crate) fn is_closed(&self) -> bool { + self.is_closed.load(Acquire) + } + + /// Closes all shards and prevents further pushes. + /// + /// Returns `true` if the queue was open when the transition was made. + pub(crate) fn close(&self) -> bool { + // Close each shard under its own lock. After this loop no shard + // will accept a push. + let mut was_open = false; + for shard in self.shards.iter() { + let mut synced = shard.synced.lock(); + was_open |= shard.shared.close(&mut synced); + } + + // Publish the closed state for lock-free observers. + self.is_closed.store(true, Release); + + was_open + } + + /// Pushes a task into the queue. + /// + /// Selects a shard using the calling thread's home-shard index. Does + /// nothing if the selected shard is closed (which implies all shards + /// are closed, as `close` is the only path that sets the flag).
+ pub(crate) fn push(&self, task: task::Notified<T>) { + let idx = self.next_push_shard(); + let shard = &*self.shards[idx]; + + let mut synced = shard.synced.lock(); + // safety: `synced` belongs to `shard.shared` + unsafe { shard.shared.push(&mut synced, task) }; + } + + /// Pushes a batch of tasks. The whole batch is placed in a single + /// shard to avoid taking multiple locks. + pub(crate) fn push_batch<I>(&self, iter: I) + where + I: Iterator<Item = task::Notified<T>>, + { + let idx = self.next_push_shard(); + let shard = &*self.shards[idx]; + + // safety: `&shard.synced` yields `&mut Synced` for the same + // `Shared` instance that `push_batch` operates on. The underlying + // implementation links the batch outside the lock and only + // acquires it for the list splice. + unsafe { shard.shared.push_batch(&shard.synced, iter) }; + } + + /// Pops a single task, rotating through shards starting at `hint`. + pub(crate) fn pop(&self, hint: usize) -> Option<task::Notified<T>> { + let num_shards = self.shards.len(); + let start = hint & self.shard_mask; + + for i in 0..num_shards { + let idx = (start + i) & self.shard_mask; + let shard = &*self.shards[idx]; + + // Fast path: skip empty shards without locking. + if shard.shared.is_empty() { + continue; + } + + let mut synced = shard.synced.lock(); + // safety: `synced` belongs to `shard.shared` + if let Some(task) = unsafe { shard.shared.pop(&mut synced) } { + return Some(task); + } + } + + None + } + + /// Pops up to `n` tasks from the first non-empty shard, starting the + /// search at `hint`, and passes them to `f`. + /// + /// Draining from a single shard keeps the critical section short and + /// bounded; if that shard has fewer than `n` tasks, fewer are yielded. + /// The caller will return for more on a subsequent tick. + /// + /// Returns `None` (without calling `f`) if no shard has any tasks.
+ pub(crate) fn pop_n<R>( + &self, + hint: usize, + n: usize, + f: impl FnOnce(Pop<'_, T>) -> R, + ) -> Option<R> { + debug_assert!(n > 0); + + let num_shards = self.shards.len(); + let start = hint & self.shard_mask; + + for i in 0..num_shards { + let idx = (start + i) & self.shard_mask; + let shard = &*self.shards[idx]; + + if shard.shared.is_empty() { + continue; + } + + let mut synced = shard.synced.lock(); + // Re-check under the lock: another worker may have drained + // this shard between the atomic check and the lock. + if shard.shared.is_empty() { + continue; + } + + // safety: `synced` belongs to `shard.shared` + let pop = unsafe { shard.shared.pop_n(&mut synced, n) }; + return Some(f(pop)); + } + + None + } + + /// Picks the shard for a push operation. + /// + /// Each thread is assigned a shard on first push and sticks with it. + /// This keeps a single thread's pushes cache-local while spreading + /// distinct threads across distinct mutexes. + #[cfg(not(loom))] + fn next_push_shard(&self) -> usize { + // If there's only one shard, skip the thread-local lookup. + if self.shard_mask == 0 { + return 0; + } + + PUSH_SHARD + .try_with(|cell| { + let mut idx = cell.get(); + if idx == UNASSIGNED { + idx = NEXT_SHARD.fetch_add(1, Relaxed); + cell.set(idx); + } + idx & self.shard_mask + }) + .unwrap_or(0) + } + + #[cfg(loom)] + fn next_push_shard(&self) -> usize { + // Shard count is capped at 1 under loom. + debug_assert_eq!(self.shard_mask, 0); + 0 + } +} + +// `Shared::push_batch` links the batch before acquiring the lock via the +// `Lock` trait. Implementing `Lock` on a shard's mutex reference lets us +// reuse that machinery, keeping the critical section to just the splice.
+impl<'a> super::super::Lock<Synced> for &'a Mutex<Synced> { + type Handle = MutexGuard<'a, Synced>; + + fn lock(self) -> Self::Handle { + self.lock() + } +} + +impl AsMut<Synced> for MutexGuard<'_, Synced> { + fn as_mut(&mut self) -> &mut Synced { + self + } +} diff --git a/tokio/src/runtime/scheduler/inject/shared.rs b/tokio/src/runtime/scheduler/inject/shared.rs index a73e7fb3448..6bec66ca7a6 100644 --- a/tokio/src/runtime/scheduler/inject/shared.rs +++ b/tokio/src/runtime/scheduler/inject/shared.rs @@ -38,7 +38,7 @@ impl<T: 'static> Shared<T> { } // Kind of annoying to have to include the cfg here - #[cfg(any(feature = "taskdump", feature = "rt-multi-thread"))] + #[cfg(feature = "taskdump")] pub(crate) fn is_closed(&self, synced: &Synced) -> bool { synced.is_closed } diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 1c5e1a88884..da084f08a60 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -26,8 +26,6 @@ pub(crate) use worker::{Context, Launch, Shared}; cfg_taskdump! { mod trace; use trace::TraceStatus; - - pub(crate) use worker::Synced; } cfg_not_taskdump! { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 72bdc2bd31c..74859b6ceeb 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -61,7 +61,7 @@ use crate::runtime; use crate::runtime::scheduler::multi_thread::{ idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker, }; -use crate::runtime::scheduler::{inject, Defer, Lock}; +use crate::runtime::scheduler::{inject, Defer}; use crate::runtime::task::OwnedTasks; use crate::runtime::{ blocking, driver, scheduler, task, Config, SchedulerMetrics, TimerFlavor, WorkerMetrics, @@ -164,7 +164,10 @@ pub(crate) struct Shared { /// Global task queue used for: /// 1.
Submit work to the scheduler while **not** currently on a worker thread. /// 2. Submit work to the scheduler when a worker run queue is saturated - pub(super) inject: inject::Shared<Arc<Handle>>, + /// + /// The queue is sharded across multiple mutexes to reduce contention + /// when many external threads spawn tasks concurrently. + pub(super) inject: inject::Sharded<Arc<Handle>>, /// Coordinates idle workers idle: Idle, @@ -201,13 +204,10 @@ pub(crate) struct Shared { } /// Data synchronized by the scheduler mutex -pub(crate) struct Synced { +pub(super) struct Synced { /// Synchronized state for `Idle`. pub(super) idle: idle::Synced, - /// Synchronized state for `Inject`. - pub(crate) inject: inject::Synced, - #[cfg(all(tokio_unstable, feature = "time"))] /// Timers pending to be registered. /// This is used to register a timer but the [`Core`] @@ -297,7 +297,7 @@ pub(super) fn create( } let (idle, idle_synced) = Idle::new(size); - let (inject, inject_synced) = inject::Shared::new(); + let inject = inject::Sharded::new(size); let remotes_len = remotes.len(); let handle = Arc::new(Handle { @@ -309,7 +309,6 @@ pub(super) fn create( owned: OwnedTasks::new(size), synced: Mutex::new(Synced { idle: idle_synced, - inject: inject_synced, #[cfg(all(tokio_unstable, feature = "time"))] inject_timers: Vec::new(), }), @@ -1027,7 +1026,7 @@ impl Core { worker .handle - .next_remote_task() + .next_remote_task(worker.index) .or_else(|| self.next_local_task()) } else { let maybe_task = self.next_local_task(); @@ -1061,17 +1060,17 @@ impl Core { // and not pushed onto the local queue. let n = usize::max(1, n); - let mut synced = worker.handle.shared.synced.lock(); - // safety: passing in the correct `inject::Synced`. - let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) }; - - // Pop the first task to return immediately - let ret = tasks.next(); + // Drain up to `n` tasks from one shard. The shard lock is held + // only for the duration of the closure.
+ worker.inject().pop_n(worker.index, n, |mut tasks| { + // Pop the first task to return immediately + let ret = tasks.next(); - // Push the rest of the on the run queue - self.run_queue.push_back(tasks); + // Push the rest of the on the run queue + self.run_queue.push_back(tasks); - ret + ret + })? } } @@ -1111,7 +1110,7 @@ impl Core { } // Fallback on checking the global queue - worker.handle.next_remote_task() + worker.handle.next_remote_task(worker.index) } fn transition_to_searching(&mut self, worker: &Worker) -> bool { @@ -1211,8 +1210,7 @@ impl Core { if !self.is_shutdown { // Check if the scheduler has been shutdown - let synced = worker.handle.shared.synced.lock(); - self.is_shutdown = worker.inject().is_closed(&synced.inject); + self.is_shutdown = worker.inject().is_closed(); } if !self.is_traced { @@ -1264,7 +1262,7 @@ impl Core { impl Worker { /// Returns a reference to the scheduler's injection queue. - fn inject(&self) -> &inject::Shared<Arc<Handle>> { + fn inject(&self) -> &inject::Sharded<Arc<Handle>> { &self.handle.shared.inject } } @@ -1330,24 +1328,18 @@ impl Handle { } } - fn next_remote_task(&self) -> Option<Notified> { + fn next_remote_task(&self, hint: usize) -> Option<Notified> { if self.shared.inject.is_empty() { return None; } - let mut synced = self.shared.synced.lock(); - // safety: passing in correct `idle::Synced` - unsafe { self.shared.inject.pop(&mut synced.inject) } + self.shared.inject.pop(hint) } fn push_remote_task(&self, task: Notified) { self.shared.scheduler_metrics.inc_remote_schedule_count(); - let mut synced = self.shared.synced.lock(); - // safety: passing in correct `idle::Synced` - unsafe { - self.shared.inject.push(&mut synced.inject, task); - } + self.shared.inject.push(task); } #[cfg(all(tokio_unstable, feature = "time"))] @@ -1372,11 +1364,7 @@ impl Handle { } pub(super) fn close(&self) { - if self - .shared - .inject - .close(&mut self.shared.synced.lock().inject) - { + if self.shared.inject.close() { self.notify_all(); } } @@ -1444,7 +1432,7 @@ impl Handle {
// Drain the injection queue // // We already shut down every task, so we can simply drop the tasks. - while let Some(task) = self.next_remote_task() { + while let Some(task) = self.next_remote_task(0) { drop(task); } } @@ -1463,29 +1451,7 @@ impl Overflow<Arc<Handle>> for Handle { where I: Iterator<Item = task::Notified<Arc<Handle>>>, { - unsafe { - self.shared.inject.push_batch(self, iter); - } - } -} - -pub(crate) struct InjectGuard<'a> { - lock: crate::loom::sync::MutexGuard<'a, Synced>, -} - -impl<'a> AsMut<inject::Synced> for InjectGuard<'a> { - fn as_mut(&mut self) -> &mut inject::Synced { - &mut self.lock.inject - } -} - -impl<'a> Lock<inject::Synced> for &'a Handle { - type Handle = InjectGuard<'a>; - - fn lock(self) -> Self::Handle { - InjectGuard { - lock: self.shared.synced.lock(), - } + self.shared.inject.push_batch(iter); } } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs b/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs index 312673034d3..bd057629323 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs @@ -35,12 +35,9 @@ impl Handle { let owned = &self.shared.owned; let mut local = self.shared.steal_all(); - let synced = &self.shared.synced; let injection = &self.shared.inject; - // safety: `trace_multi_thread` is invoked with the same `synced` that `injection` - // was created with. - let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) } + let traces = trace_multi_thread(owned, &mut local, injection) .into_iter() .map(|(id, trace)| dump::Task::new(id, trace)) .collect(); diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 7b018185ce8..1a486b45fa5 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -292,21 +292,14 @@ pub(in crate::runtime) fn trace_current_thread( } cfg_rt_multi_thread!
{ - use crate::loom::sync::Mutex; use crate::runtime::scheduler::multi_thread; - use crate::runtime::scheduler::multi_thread::Synced; - use crate::runtime::scheduler::inject::Shared; + use crate::runtime::scheduler::inject::Sharded; /// Trace and poll all tasks of the `current_thread` runtime. - /// - /// ## Safety - /// - /// Must be called with the same `synced` that `injection` was created with. - pub(in crate::runtime) unsafe fn trace_multi_thread( + pub(in crate::runtime) fn trace_multi_thread( owned: &OwnedTasks<Arc<multi_thread::Handle>>, local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>, - synced: &Mutex<Synced>, - injection: &Shared<Arc<multi_thread::Handle>>, + injection: &Sharded<Arc<multi_thread::Handle>>, ) -> Vec<(Id, Trace)> { let mut dequeued = Vec::new(); @@ -316,14 +309,10 @@ cfg_rt_multi_thread! { } // clear the injection queue - let mut synced = synced.lock(); - // Safety: exactly the same safety requirements as `trace_multi_thread` function. - while let Some(notified) = unsafe { injection.pop(&mut synced.inject) } { + while let Some(notified) = injection.pop(0) { dequeued.push(notified); } - drop(synced); - // precondition: we have drained the tasks from the local and injection // queues. trace_owned(owned, dequeued) From bbbd07609b6953857bfdabd212a28c0dd8b40bd0 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Wed, 1 Apr 2026 11:40:32 +0300 Subject: [PATCH 2/2] Add CLAUDE.md & AGENTS.md --- AGENTS.md | 1 + CLAUDE.md | 1 + 2 files changed, 2 insertions(+) create mode 100644 AGENTS.md create mode 100644 CLAUDE.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000000..f8211090601 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1 @@ +Never link to any issue or a pull request in any GitHub repository diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000000..f8211090601 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +Never link to any issue or a pull request in any GitHub repository