diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000000..f8211090601 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1 @@ +Never link to any issue or a pull request in any GitHub repository diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000000..f8211090601 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +Never link to any issue or a pull request in any GitHub repository diff --git a/spellcheck.dic b/spellcheck.dic index 857962d1a4e..d04cb9de637 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -313 +316 & + < @@ -146,6 +146,7 @@ implementor implementors incrementing inlining +interleavings interoperate invariants Invariants @@ -186,6 +187,7 @@ Multithreaded mut mutex Mutex +mutexes Nagle namespace nonblocking @@ -281,6 +283,7 @@ tx udp UDP UID +uncontended unhandled unix unlink diff --git a/tokio/src/runtime/scheduler/inject.rs b/tokio/src/runtime/scheduler/inject.rs index 6b02ecc3247..86dd67afdec 100644 --- a/tokio/src/runtime/scheduler/inject.rs +++ b/tokio/src/runtime/scheduler/inject.rs @@ -14,6 +14,9 @@ pub(crate) use synced::Synced; cfg_rt_multi_thread! { mod rt_multi_thread; + + mod sharded; + pub(crate) use sharded::Sharded; } mod metrics; diff --git a/tokio/src/runtime/scheduler/inject/sharded.rs b/tokio/src/runtime/scheduler/inject/sharded.rs new file mode 100644 index 00000000000..84e92eefda5 --- /dev/null +++ b/tokio/src/runtime/scheduler/inject/sharded.rs @@ -0,0 +1,293 @@ +//! Sharded inject queue for the multi-threaded scheduler. +//! +//! A single global mutex is the dominant source of contention when many +//! external threads spawn into the runtime concurrently. `Sharded` splits +//! the inject queue into independent shards, each with its own mutex and +//! intrusive linked list. Pushes are distributed across shards using a +//! per-thread counter, so uncontended threads never touch the same lock. +//! Workers drain shards starting from their own index and rotate. 
+ +use super::{Pop, Shared, Synced}; + +use crate::loom::sync::atomic::AtomicBool; +use crate::loom::sync::{Mutex, MutexGuard}; +use crate::runtime::task; +use crate::util::cacheline::CachePadded; + +use std::sync::atomic::Ordering::{Acquire, Release}; + +/// Sharded inject queue. +/// +/// Internally composed of `N` independent [`Shared`] / [`Synced`] pairs, +/// each protected by its own mutex and padded to avoid false sharing. +pub(crate) struct Sharded<T: 'static> { + /// One entry per shard. + shards: Box<[CachePadded<Shard<T>>]>, + + /// `shards.len() - 1`, used for fast modulo. Shard count is always + /// a power of two. + shard_mask: usize, + + /// Set once all shards have been closed. Allows `is_closed` to be + /// checked without locking a shard. + is_closed: AtomicBool, +} + +struct Shard<T: 'static> { + shared: Shared<T>, + synced: Mutex<Synced>, +} + +cfg_not_loom! { + use std::cell::Cell; + use std::sync::atomic::{AtomicUsize as StdAtomicUsize, Ordering::Relaxed}; + + /// Sentinel indicating the per-thread push shard has not been assigned. + const UNASSIGNED: usize = usize::MAX; + + tokio_thread_local! { + /// Per-thread home shard for push operations. + /// + /// Each thread sticks to one shard for cache locality: consecutive + /// pushes from the same thread hit the same mutex and linked-list + /// tail. Distinct threads get distinct shards (modulo collisions) + /// via a global counter assigned on first use. + static PUSH_SHARD: Cell<usize> = const { Cell::new(UNASSIGNED) }; + } + + /// Hands out shard indices to threads on first push. Shared across all + /// `Sharded` instances, which is fine: it only needs to spread threads + /// out. Uses `std` atomics directly (not loom) because shard selection + /// has no correctness implications and loom caps shards at 1 anyway. + static NEXT_SHARD: StdAtomicUsize = StdAtomicUsize::new(0); +} + +/// Upper bound on shard count. 
More shards reduce push contention but +/// make `is_empty`/`len` (which scan every shard) slower, and those are +/// called in the worker hot loop. Contention drops off steeply past a +/// handful of shards, so a small cap captures the win. +/// +/// Under loom, additional shards would multiply the modeled state space +/// without testing any new interleavings: each shard is an independent +/// instance of the already-loom-tested `Shared`/`Synced` pair, and the +/// cross-shard rotation is plain sequential code. +#[cfg(loom)] +const MAX_SHARDS: usize = 1; + +#[cfg(not(loom))] +const MAX_SHARDS: usize = 8; + +impl<T: 'static> Sharded<T> { + /// Creates a new sharded inject queue with a shard count derived + /// from the requested hint (rounded up to a power of two). + pub(crate) fn new(shard_hint: usize) -> Sharded<T> { + let num_shards = shard_hint.clamp(1, MAX_SHARDS).next_power_of_two(); + + let shards = (0..num_shards) + .map(|_| { + let (shared, synced) = Shared::new(); + CachePadded::new(Shard { + shared, + synced: Mutex::new(synced), + }) + }) + .collect::<Vec<_>>() + .into_boxed_slice(); + + Sharded { + shards, + shard_mask: num_shards - 1, + is_closed: AtomicBool::new(false), + } + } + + /// Returns the total number of tasks across all shards. + /// + /// This is a sum of per-shard atomic reads and is thus an + /// approximation under concurrent modification. With the shard + /// count capped small, the scan is cheap. + pub(crate) fn len(&self) -> usize { + let mut len = 0; + for shard in self.shards.iter() { + len += shard.shared.len(); + } + len + } + + /// Returns `true` if every shard reports empty. + pub(crate) fn is_empty(&self) -> bool { + for shard in self.shards.iter() { + if !shard.shared.is_empty() { + return false; + } + } + true + } + + /// Returns `true` if `close` has been called. + pub(crate) fn is_closed(&self) -> bool { + self.is_closed.load(Acquire) + } + + /// Closes all shards and prevents further pushes. 
+ /// + /// Returns `true` if the queue was open when the transition was made. + pub(crate) fn close(&self) -> bool { + // Close each shard under its own lock. After this loop no shard + // will accept a push. + let mut was_open = false; + for shard in self.shards.iter() { + let mut synced = shard.synced.lock(); + was_open |= shard.shared.close(&mut synced); + } + + // Publish the closed state for lock-free observers. + self.is_closed.store(true, Release); + + was_open + } + + /// Pushes a task into the queue. + /// + /// Selects a shard using the calling thread's home-shard index. Does + /// nothing if the selected shard is closed (which implies all shards + /// are closed, as `close` is the only path that sets the flag). + pub(crate) fn push(&self, task: task::Notified<T>) { + let idx = self.next_push_shard(); + let shard = &*self.shards[idx]; + + let mut synced = shard.synced.lock(); + // safety: `synced` belongs to `shard.shared` + unsafe { shard.shared.push(&mut synced, task) }; + } + + /// Pushes a batch of tasks. The whole batch is placed in a single + /// shard to avoid taking multiple locks. + pub(crate) fn push_batch<I>(&self, iter: I) + where + I: Iterator<Item = task::Notified<T>>, + { + let idx = self.next_push_shard(); + let shard = &*self.shards[idx]; + + // safety: `&shard.synced` yields `&mut Synced` for the same + // `Shared` instance that `push_batch` operates on. The underlying + // implementation links the batch outside the lock and only + // acquires it for the list splice. + unsafe { shard.shared.push_batch(&shard.synced, iter) }; + } + + /// Pops a single task, rotating through shards starting at `hint`. + pub(crate) fn pop(&self, hint: usize) -> Option<task::Notified<T>> { + let num_shards = self.shards.len(); + let start = hint & self.shard_mask; + + for i in 0..num_shards { + let idx = (start + i) & self.shard_mask; + let shard = &*self.shards[idx]; + + // Fast path: skip empty shards without locking. 
+ if shard.shared.is_empty() { + continue; + } + + let mut synced = shard.synced.lock(); + // safety: `synced` belongs to `shard.shared` + if let Some(task) = unsafe { shard.shared.pop(&mut synced) } { + return Some(task); + } + } + + None + } + + /// Pops up to `n` tasks from the first non-empty shard, starting the + /// search at `hint`, and passes them to `f`. + /// + /// Draining from a single shard keeps the critical section short and + /// bounded; if that shard has fewer than `n` tasks, fewer are yielded. + /// The caller will return for more on a subsequent tick. + /// + /// Returns `None` (without calling `f`) if no shard has any tasks. + pub(crate) fn pop_n<R>( + &self, + hint: usize, + n: usize, + f: impl FnOnce(Pop<'_, T>) -> R, + ) -> Option<R> { + debug_assert!(n > 0); + + let num_shards = self.shards.len(); + let start = hint & self.shard_mask; + + for i in 0..num_shards { + let idx = (start + i) & self.shard_mask; + let shard = &*self.shards[idx]; + + if shard.shared.is_empty() { + continue; + } + + let mut synced = shard.synced.lock(); + // Re-check under the lock: another worker may have drained + // this shard between the atomic check and the lock. + if shard.shared.is_empty() { + continue; + } + + // safety: `synced` belongs to `shard.shared` + let pop = unsafe { shard.shared.pop_n(&mut synced, n) }; + return Some(f(pop)); + } + + None + } + + /// Picks the shard for a push operation. + /// + /// Each thread is assigned a shard on first push and sticks with it. + /// This keeps a single thread's pushes cache-local while spreading + /// distinct threads across distinct mutexes. + #[cfg(not(loom))] + fn next_push_shard(&self) -> usize { + // If there's only one shard, skip the thread-local lookup. 
+ if self.shard_mask == 0 { + return 0; + } + + PUSH_SHARD + .try_with(|cell| { + let mut idx = cell.get(); + if idx == UNASSIGNED { + idx = NEXT_SHARD.fetch_add(1, Relaxed); + cell.set(idx); + } + idx & self.shard_mask + }) + .unwrap_or(0) + } + + #[cfg(loom)] + fn next_push_shard(&self) -> usize { + // Shard count is capped at 1 under loom. + debug_assert_eq!(self.shard_mask, 0); + 0 + } +} + +// `Shared::push_batch` links the batch before acquiring the lock via the +// `Lock` trait. Implementing `Lock` on a shard's mutex reference lets us +// reuse that machinery, keeping the critical section to just the splice. +impl<'a> super::super::Lock<Synced> for &'a Mutex<Synced> { + type Handle = MutexGuard<'a, Synced>; + + fn lock(self) -> Self::Handle { + self.lock() + } +} + +impl AsMut<Synced> for MutexGuard<'_, Synced> { + fn as_mut(&mut self) -> &mut Synced { + self + } +} diff --git a/tokio/src/runtime/scheduler/inject/shared.rs b/tokio/src/runtime/scheduler/inject/shared.rs index a73e7fb3448..6bec66ca7a6 100644 --- a/tokio/src/runtime/scheduler/inject/shared.rs +++ b/tokio/src/runtime/scheduler/inject/shared.rs @@ -38,7 +38,7 @@ impl<T: 'static> Shared<T> { } // Kind of annoying to have to include the cfg here - #[cfg(any(feature = "taskdump", feature = "rt-multi-thread"))] + #[cfg(feature = "taskdump")] pub(crate) fn is_closed(&self, synced: &Synced) -> bool { synced.is_closed } diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 1c5e1a88884..da084f08a60 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -26,8 +26,6 @@ pub(crate) use worker::{Context, Launch, Shared}; cfg_taskdump! { mod trace; use trace::TraceStatus; - - pub(crate) use worker::Synced; } cfg_not_taskdump! 
{ diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 72bdc2bd31c..74859b6ceeb 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -61,7 +61,7 @@ use crate::runtime; use crate::runtime::scheduler::multi_thread::{ idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker, }; -use crate::runtime::scheduler::{inject, Defer, Lock}; +use crate::runtime::scheduler::{inject, Defer}; use crate::runtime::task::OwnedTasks; use crate::runtime::{ blocking, driver, scheduler, task, Config, SchedulerMetrics, TimerFlavor, WorkerMetrics, @@ -164,7 +164,10 @@ pub(crate) struct Shared { /// Global task queue used for: /// 1. Submit work to the scheduler while **not** currently on a worker thread. /// 2. Submit work to the scheduler when a worker run queue is saturated - pub(super) inject: inject::Shared<Arc<Handle>>, + /// + /// The queue is sharded across multiple mutexes to reduce contention + /// when many external threads spawn tasks concurrently. + pub(super) inject: inject::Sharded<Arc<Handle>>, /// Coordinates idle workers idle: Idle, @@ -201,13 +204,10 @@ } /// Data synchronized by the scheduler mutex -pub(crate) struct Synced { +pub(super) struct Synced { /// Synchronized state for `Idle`. pub(super) idle: idle::Synced, - /// Synchronized state for `Inject`. - pub(crate) inject: inject::Synced, - #[cfg(all(tokio_unstable, feature = "time"))] /// Timers pending to be registered. 
/// This is used to register a timer but the [`Core`] @@ -297,7 +297,7 @@ pub(super) fn create( } let (idle, idle_synced) = Idle::new(size); - let (inject, inject_synced) = inject::Shared::new(); + let inject = inject::Sharded::new(size); let remotes_len = remotes.len(); let handle = Arc::new(Handle { @@ -309,7 +309,6 @@ pub(super) fn create( owned: OwnedTasks::new(size), synced: Mutex::new(Synced { idle: idle_synced, - inject: inject_synced, #[cfg(all(tokio_unstable, feature = "time"))] inject_timers: Vec::new(), }), @@ -1027,7 +1026,7 @@ impl Core { worker .handle - .next_remote_task() + .next_remote_task(worker.index) .or_else(|| self.next_local_task()) } else { let maybe_task = self.next_local_task(); @@ -1061,17 +1060,17 @@ impl Core { // and not pushed onto the local queue. let n = usize::max(1, n); - let mut synced = worker.handle.shared.synced.lock(); - // safety: passing in the correct `inject::Synced`. - let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) }; - - // Pop the first task to return immediately - let ret = tasks.next(); + // Drain up to `n` tasks from one shard. The shard lock is held + // only for the duration of the closure. + worker.inject().pop_n(worker.index, n, |mut tasks| { + // Pop the first task to return immediately + let ret = tasks.next(); - // Push the rest of the on the run queue - self.run_queue.push_back(tasks); + // Push the rest of the on the run queue + self.run_queue.push_back(tasks); - ret + ret + })? 
} } @@ -1111,7 +1110,7 @@ } // Fallback on checking the global queue - worker.handle.next_remote_task() + worker.handle.next_remote_task(worker.index) } fn transition_to_searching(&mut self, worker: &Worker) -> bool { @@ -1211,8 +1210,7 @@ if !self.is_shutdown { // Check if the scheduler has been shutdown - let synced = worker.handle.shared.synced.lock(); - self.is_shutdown = worker.inject().is_closed(&synced.inject); + self.is_shutdown = worker.inject().is_closed(); } if !self.is_traced { @@ -1264,7 +1262,7 @@ impl Core { impl Worker { /// Returns a reference to the scheduler's injection queue. - fn inject(&self) -> &inject::Shared<Arc<Handle>> { + fn inject(&self) -> &inject::Sharded<Arc<Handle>> { &self.handle.shared.inject } } @@ -1330,24 +1328,18 @@ impl Handle { } } - fn next_remote_task(&self) -> Option<Notified> { + fn next_remote_task(&self, hint: usize) -> Option<Notified> { if self.shared.inject.is_empty() { return None; } - let mut synced = self.shared.synced.lock(); - // safety: passing in correct `idle::Synced` - unsafe { self.shared.inject.pop(&mut synced.inject) } + self.shared.inject.pop(hint) } fn push_remote_task(&self, task: Notified) { self.shared.scheduler_metrics.inc_remote_schedule_count(); - let mut synced = self.shared.synced.lock(); - // safety: passing in correct `idle::Synced` - unsafe { - self.shared.inject.push(&mut synced.inject, task); - } + self.shared.inject.push(task); } #[cfg(all(tokio_unstable, feature = "time"))] @@ -1372,11 +1364,7 @@ } pub(super) fn close(&self) { - if self - .shared - .inject - .close(&mut self.shared.synced.lock().inject) - { + if self.shared.inject.close() { self.notify_all(); } } @@ -1444,7 +1432,7 @@ impl Handle { // Drain the injection queue // // We already shut down every task, so we can simply drop the tasks. 
- while let Some(task) = self.next_remote_task() { + while let Some(task) = self.next_remote_task(0) { drop(task); } } @@ -1463,29 +1451,7 @@ impl Overflow<Arc<Handle>> for Handle { where I: Iterator<Item = task::Notified<Arc<Handle>>>, { - unsafe { - self.shared.inject.push_batch(self, iter); - } - } -} - -pub(crate) struct InjectGuard<'a> { - lock: crate::loom::sync::MutexGuard<'a, Synced>, -} - -impl<'a> AsMut<inject::Synced> for InjectGuard<'a> { - fn as_mut(&mut self) -> &mut inject::Synced { - &mut self.lock.inject - } -} - -impl<'a> Lock<inject::Synced> for &'a Handle { - type Handle = InjectGuard<'a>; - - fn lock(self) -> Self::Handle { - InjectGuard { - lock: self.shared.synced.lock(), - } + self.shared.inject.push_batch(iter); } } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs b/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs index 312673034d3..bd057629323 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs @@ -35,12 +35,9 @@ impl Handle { let owned = &self.shared.owned; let mut local = self.shared.steal_all(); - let synced = &self.shared.synced; let injection = &self.shared.inject; - // safety: `trace_multi_thread` is invoked with the same `synced` that `injection` - // was created with. - let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) } + let traces = trace_multi_thread(owned, &mut local, injection) .into_iter() .map(|(id, trace)| dump::Task::new(id, trace)) .collect(); diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 7b018185ce8..1a486b45fa5 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -292,21 +292,14 @@ pub(in crate::runtime) fn trace_current_thread( } cfg_rt_multi_thread! 
{ - use crate::loom::sync::Mutex; use crate::runtime::scheduler::multi_thread; - use crate::runtime::scheduler::multi_thread::Synced; - use crate::runtime::scheduler::inject::Shared; + use crate::runtime::scheduler::inject::Sharded; /// Trace and poll all tasks of the `current_thread` runtime. - /// - /// ## Safety - /// - /// Must be called with the same `synced` that `injection` was created with. - pub(in crate::runtime) unsafe fn trace_multi_thread( + pub(in crate::runtime) fn trace_multi_thread( owned: &OwnedTasks<Arc<multi_thread::Handle>>, local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>, - synced: &Mutex<Synced>, - injection: &Shared<Arc<multi_thread::Handle>>, + injection: &Sharded<Arc<multi_thread::Handle>>, ) -> Vec<(Id, Trace)> { let mut dequeued = Vec::new(); @@ -316,14 +309,10 @@ } // clear the injection queue - let mut synced = synced.lock(); - // Safety: exactly the same safety requirements as `trace_multi_thread` function. - while let Some(notified) = unsafe { injection.pop(&mut synced.inject) } { + while let Some(notified) = injection.pop(0) { dequeued.push(notified); } - drop(synced); - // precondition: we have drained the tasks from the local and injection // queues. trace_owned(owned, dequeued)