1 change: 1 addition & 0 deletions AGENTS.md
@@ -0,0 +1 @@
Never link to any issue or a pull request in any GitHub repository

Unrelated AI instruction files committed with code change

Low Severity

AGENTS.md and CLAUDE.md were added containing only the directive "Never link to any issue or a pull request in any GitHub repository." These files are unrelated to the inject queue sharding change described in the PR title and appear to be AI prompt-manipulation instructions that were bundled into an unrelated PR.

Additional Locations (1)


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if CLAUDE.md was also added and examine any related context

fd -t f 'CLAUDE.md|AGENTS.md' -x cat {}

Repository: martin-augment/tokio

Length of output: 197


🏁 Script executed:

git log --oneline --follow -- AGENTS.md CLAUDE.md | head -20

Repository: martin-augment/tokio

Length of output: 109


🏁 Script executed:

git show HEAD:AGENTS.md 2>/dev/null || echo "File not in HEAD"
git show HEAD^:AGENTS.md 2>/dev/null || echo "File not in parent"

Repository: martin-augment/tokio

Length of output: 149


🏁 Script executed:

# Check if there's a PR description or related documentation
find . -maxdepth 2 -type f \( -name "*.md" -o -name "*.txt" \) | head -20

Repository: martin-augment/tokio

Length of output: 512


🏁 Script executed:

# Look for any references to AGENTS.md or CLAUDE.md in the codebase
rg -i "agents|claude" --type md --type txt | head -30

Repository: martin-augment/tokio

Length of output: 46


Clarify the instruction and provide rationale.

Both AGENTS.md and CLAUDE.md contain an ambiguous, one-line instruction with no context or examples. The instruction needs clarification:

  • What does "link" mean—hyperlinks only, or any reference to issues/PRs?
  • Does "any GitHub repository" include this repository, or only external ones?
  • What contexts does this apply to (commit messages, code comments, documentation)?

Add documentation explaining the rationale for this guideline and specific examples to help contributors understand when and how it applies.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AGENTS.md` at line 1, Update AGENTS.md (and mirror in CLAUDE.md) to replace
the one-line rule with a clarified guideline: define "link" to include both
clickable URLs and plain-text references to GitHub issues/PRs, specify that the
restriction applies to external repositories but allows internal references when
necessary with justification, list the contexts covered (commit messages, code
comments, docs, PR descriptions), provide a short rationale
(privacy/security/irrelevance) and include 3 concrete examples showing a
disallowed external GitHub link, an allowed internal reference with
justification, and a suggested alternative (e.g., summarize the issue instead of
linking).

1 change: 1 addition & 0 deletions CLAUDE.md
@@ -0,0 +1 @@
Never link to any issue or a pull request in any GitHub repository
5 changes: 4 additions & 1 deletion spellcheck.dic
@@ -1,4 +1,4 @@
313
316
&
+
<
@@ -146,6 +146,7 @@ implementor
implementors
incrementing
inlining
interleavings
interoperate
invariants
Invariants
@@ -186,6 +187,7 @@ Multithreaded
mut
mutex
Mutex
mutexes
Nagle
namespace
nonblocking
@@ -281,6 +283,7 @@ tx
udp
UDP
UID
uncontended
unhandled
unix
unlink
3 changes: 3 additions & 0 deletions tokio/src/runtime/scheduler/inject.rs
@@ -14,6 +14,9 @@ pub(crate) use synced::Synced;

cfg_rt_multi_thread! {
    mod rt_multi_thread;

    mod sharded;
    pub(crate) use sharded::Sharded;
}

mod metrics;
293 changes: 293 additions & 0 deletions tokio/src/runtime/scheduler/inject/sharded.rs
@@ -0,0 +1,293 @@
//! Sharded inject queue for the multi-threaded scheduler.
//!
//! A single global mutex is the dominant source of contention when many
//! external threads spawn into the runtime concurrently. `Sharded` splits
//! the inject queue into independent shards, each with its own mutex and
//! intrusive linked list. Pushes are distributed across shards using a
//! per-thread counter, so uncontended threads never touch the same lock.
//! Workers drain shards starting from their own index and rotate.

use super::{Pop, Shared, Synced};

use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::runtime::task;
use crate::util::cacheline::CachePadded;

use std::sync::atomic::Ordering::{Acquire, Release};

/// Sharded inject queue.
///
/// Internally composed of `N` independent [`Shared`] / [`Synced`] pairs,
/// each protected by its own mutex and padded to avoid false sharing.
pub(crate) struct Sharded<T: 'static> {
    /// One entry per shard.
    shards: Box<[CachePadded<Shard<T>>]>,

    /// `shards.len() - 1`, used for fast modulo. Shard count is always
    /// a power of two.
    shard_mask: usize,

    /// Set once all shards have been closed. Allows `is_closed` to be
    /// checked without locking a shard.
    is_closed: AtomicBool,
}

struct Shard<T: 'static> {
    shared: Shared<T>,
    synced: Mutex<Synced>,
}

cfg_not_loom! {
    use std::cell::Cell;
    use std::sync::atomic::{AtomicUsize as StdAtomicUsize, Ordering::Relaxed};

    /// Sentinel indicating the per-thread push shard has not been assigned.
    const UNASSIGNED: usize = usize::MAX;

    tokio_thread_local! {
        /// Per-thread home shard for push operations.
        ///
        /// Each thread sticks to one shard for cache locality: consecutive
        /// pushes from the same thread hit the same mutex and linked-list
        /// tail. Distinct threads get distinct shards (modulo collisions)
        /// via a global counter assigned on first use.
        static PUSH_SHARD: Cell<usize> = const { Cell::new(UNASSIGNED) };
    }

    /// Hands out shard indices to threads on first push. Shared across all
    /// `Sharded` instances, which is fine: it only needs to spread threads
    /// out. Uses `std` atomics directly (not loom) because shard selection
    /// has no correctness implications and loom caps shards at 1 anyway.
    static NEXT_SHARD: StdAtomicUsize = StdAtomicUsize::new(0);
}

/// Upper bound on shard count. More shards reduce push contention but
/// make `is_empty`/`len` (which scan every shard) slower, and those are
/// called in the worker hot loop. Contention drops off steeply past a
/// handful of shards, so a small cap captures the win.
///
/// Under loom, additional shards would multiply the modeled state space
/// without testing any new interleavings: each shard is an independent
/// instance of the already-loom-tested `Shared`/`Synced` pair, and the
/// cross-shard rotation is plain sequential code.
#[cfg(loom)]
const MAX_SHARDS: usize = 1;

#[cfg(not(loom))]
const MAX_SHARDS: usize = 8;

impl<T: 'static> Sharded<T> {
    /// Creates a new sharded inject queue with a shard count derived
    /// from the requested hint (rounded up to a power of two).
    pub(crate) fn new(shard_hint: usize) -> Sharded<T> {
        let num_shards = shard_hint.clamp(1, MAX_SHARDS).next_power_of_two();

        let shards = (0..num_shards)
            .map(|_| {
                let (shared, synced) = Shared::new();
                CachePadded::new(Shard {
                    shared,
                    synced: Mutex::new(synced),
                })
            })
            .collect::<Vec<_>>()
            .into_boxed_slice();

        Sharded {
            shards,
            shard_mask: num_shards - 1,
            is_closed: AtomicBool::new(false),
        }
    }

    /// Returns the total number of tasks across all shards.
    ///
    /// This is a sum of per-shard atomic reads and is thus an
    /// approximation under concurrent modification. With the shard
    /// count capped small, the scan is cheap.
    pub(crate) fn len(&self) -> usize {
        let mut len = 0;
        for shard in self.shards.iter() {
            len += shard.shared.len();
        }
        len
    }

    /// Returns `true` if every shard reports empty.
    pub(crate) fn is_empty(&self) -> bool {
        for shard in self.shards.iter() {
            if !shard.shared.is_empty() {
                return false;
            }
        }
        true
    }

    /// Returns `true` if `close` has been called.
    pub(crate) fn is_closed(&self) -> bool {
        self.is_closed.load(Acquire)
    }

    /// Closes all shards and prevents further pushes.
    ///
    /// Returns `true` if the queue was open when the transition was made.
    pub(crate) fn close(&self) -> bool {
        // Close each shard under its own lock. After this loop no shard
        // will accept a push.
        let mut was_open = false;
        for shard in self.shards.iter() {
            let mut synced = shard.synced.lock();
            was_open |= shard.shared.close(&mut synced);
        }

        // Publish the closed state for lock-free observers.
        self.is_closed.store(true, Release);

        was_open
    }

    /// Pushes a task into the queue.
    ///
    /// Selects a shard using the calling thread's home-shard index. Does
    /// nothing if the selected shard is closed (which implies all shards
    /// are closed, as `close` is the only path that sets the flag).
    pub(crate) fn push(&self, task: task::Notified<T>) {
        let idx = self.next_push_shard();
        let shard = &*self.shards[idx];

        let mut synced = shard.synced.lock();
        // safety: `synced` belongs to `shard.shared`
        unsafe { shard.shared.push(&mut synced, task) };
    }

    /// Pushes a batch of tasks. The whole batch is placed in a single
    /// shard to avoid taking multiple locks.
    pub(crate) fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<T>>,
    {
        let idx = self.next_push_shard();
        let shard = &*self.shards[idx];

        // safety: `&shard.synced` yields `&mut Synced` for the same
        // `Shared` instance that `push_batch` operates on. The underlying
        // implementation links the batch outside the lock and only
        // acquires it for the list splice.
        unsafe { shard.shared.push_batch(&shard.synced, iter) };
    }

    /// Pops a single task, rotating through shards starting at `hint`.
    pub(crate) fn pop(&self, hint: usize) -> Option<task::Notified<T>> {
        let num_shards = self.shards.len();
        let start = hint & self.shard_mask;

        for i in 0..num_shards {
            let idx = (start + i) & self.shard_mask;
            let shard = &*self.shards[idx];

            // Fast path: skip empty shards without locking.
            if shard.shared.is_empty() {
                continue;
            }

            let mut synced = shard.synced.lock();
            // safety: `synced` belongs to `shard.shared`
            if let Some(task) = unsafe { shard.shared.pop(&mut synced) } {
                return Some(task);
            }
        }

        None
    }

    /// Pops up to `n` tasks from the first non-empty shard, starting the
    /// search at `hint`, and passes them to `f`.
    ///
    /// Draining from a single shard keeps the critical section short and
    /// bounded; if that shard has fewer than `n` tasks, fewer are yielded.
    /// The caller will return for more on a subsequent tick.
    ///
    /// Returns `None` (without calling `f`) if no shard has any tasks.
    pub(crate) fn pop_n<R>(
        &self,
        hint: usize,
        n: usize,
        f: impl FnOnce(Pop<'_, T>) -> R,
    ) -> Option<R> {
        debug_assert!(n > 0);

        let num_shards = self.shards.len();
        let start = hint & self.shard_mask;

        for i in 0..num_shards {
            let idx = (start + i) & self.shard_mask;
            let shard = &*self.shards[idx];

            if shard.shared.is_empty() {
                continue;
            }

            let mut synced = shard.synced.lock();
            // Re-check under the lock: another worker may have drained
            // this shard between the atomic check and the lock.
            if shard.shared.is_empty() {
                continue;
            }

            // safety: `synced` belongs to `shard.shared`
            let pop = unsafe { shard.shared.pop_n(&mut synced, n) };
            return Some(f(pop));
        }

        None
    }

    /// Picks the shard for a push operation.
    ///
    /// Each thread is assigned a shard on first push and sticks with it.
    /// This keeps a single thread's pushes cache-local while spreading
    /// distinct threads across distinct mutexes.
    #[cfg(not(loom))]
    fn next_push_shard(&self) -> usize {
        // If there's only one shard, skip the thread-local lookup.
        if self.shard_mask == 0 {
            return 0;
        }

        PUSH_SHARD
            .try_with(|cell| {
                let mut idx = cell.get();
                if idx == UNASSIGNED {
                    idx = NEXT_SHARD.fetch_add(1, Relaxed);
                    cell.set(idx);
                }
                idx & self.shard_mask
            })
            .unwrap_or(0)
    }

    #[cfg(loom)]
    fn next_push_shard(&self) -> usize {
        // Shard count is capped at 1 under loom.
        debug_assert_eq!(self.shard_mask, 0);
        0
    }
}

// `Shared::push_batch` links the batch before acquiring the lock via the
// `Lock` trait. Implementing `Lock` on a shard's mutex reference lets us
// reuse that machinery, keeping the critical section to just the splice.
impl<'a> super::super::Lock<Synced> for &'a Mutex<Synced> {
    type Handle = MutexGuard<'a, Synced>;

    fn lock(self) -> Self::Handle {
        self.lock()
    }
}

impl AsMut<Synced> for MutexGuard<'_, Synced> {
    fn as_mut(&mut self) -> &mut Synced {
        self
    }
}
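The constructor and push path above lean on the shard count being a power of two, so `idx & shard_mask` stands in for a modulo. A minimal standalone sketch of that arithmetic (the names `shard_count` and `home_shard` are illustrative, not tokio's API):

```rust
// Standalone sketch of the shard-count rounding and index masking used in
// `Sharded::new` and `next_push_shard` above. `MAX_SHARDS`, `shard_count`,
// and `home_shard` mirror the PR's logic but are hypothetical helpers.
const MAX_SHARDS: usize = 8;

fn shard_count(hint: usize) -> usize {
    // Clamp the hint, then round up to a power of two so a bitmask can
    // replace `%` on the hot path.
    hint.clamp(1, MAX_SHARDS).next_power_of_two()
}

fn home_shard(thread_seq: usize, num_shards: usize) -> usize {
    // With num_shards a power of two, `x & (n - 1) == x % n`.
    thread_seq & (num_shards - 1)
}

fn main() {
    assert_eq!(shard_count(0), 1); // clamped up to 1
    assert_eq!(shard_count(3), 4); // rounded up to a power of two
    assert_eq!(shard_count(64), 8); // clamped down to MAX_SHARDS

    // Sequential thread IDs spread evenly across the shards, then wrap.
    let n = shard_count(3);
    let spread: Vec<usize> = (0..8).map(|t| home_shard(t, n)).collect();
    assert_eq!(spread, vec![0, 1, 2, 3, 0, 1, 2, 3]);
    println!("ok");
}
```

The wrap-around is why a globally incrementing `NEXT_SHARD` counter is enough to spread threads out: collisions only occur once more threads than shards have pushed.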
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/inject/shared.rs
@@ -38,7 +38,7 @@ impl<T: 'static> Shared<T> {
    }

    // Kind of annoying to have to include the cfg here
    #[cfg(any(feature = "taskdump", feature = "rt-multi-thread"))]
    #[cfg(feature = "taskdump")]
    pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
        synced.is_closed
    }
Comment on lines 40 to 44

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Verify that Shared::is_closed is not called from rt-multi-thread paths

# Search for calls to shared.is_closed or Shared::is_closed patterns
rg -n 'shared\.is_closed|Shared::is_closed' --type rust

# Also check for is_closed calls on inject::Shared specifically
rg -nC3 'inject::Shared' --type rust | head -100

Repository: martin-augment/tokio

Length of output: 1353


🏁 Script executed:

cat -n tokio/src/runtime/scheduler/inject.rs | head -60

Repository: martin-augment/tokio

Length of output: 2006


🏁 Script executed:

# Search more comprehensively for all is_closed calls
rg 'is_closed\(' --type rust -n

# Also check for any usage in rt-multi-thread specific code
rg -n 'is_closed' tokio/src/runtime/scheduler/rt_multi_thread.rs 2>/dev/null || echo "File not found or no matches"

Repository: martin-augment/tokio

Length of output: 9450


🏁 Script executed:

# Check multi-thread worker context
sed -n '1205,1220p' tokio/src/runtime/scheduler/multi_thread/worker.rs

# Check current-thread context
sed -n '550,560p' tokio/src/runtime/scheduler/current_thread/mod.rs

Repository: martin-augment/tokio

Length of output: 918


🏁 Script executed:

# Get wider context for multi_thread/worker.rs maintenance function
sed -n '1195,1225p' tokio/src/runtime/scheduler/multi_thread/worker.rs

# Get wider context for current_thread/mod.rs
sed -n '540,570p' tokio/src/runtime/scheduler/current_thread/mod.rs

Repository: martin-augment/tokio

Length of output: 2246


🏁 Script executed:

# Check if maintenance() has cfg guards
grep -B10 'fn maintenance' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -20

# Check if pre_shutdown() has cfg guards
grep -B10 'fn pre_shutdown' tokio/src/runtime/scheduler/current_thread/mod.rs | head -20

# More broadly, check what's around current_thread pre_shutdown
sed -n '500,560p' tokio/src/runtime/scheduler/current_thread/mod.rs | grep -E '#\[cfg|fn pre_shutdown|trace'

Repository: martin-augment/tokio

Length of output: 764


🏁 Script executed:

# Get the full function signature for pre_shutdown with its cfg guards
sed -n '520,580p' tokio/src/runtime/scheduler/current_thread/mod.rs

Repository: martin-augment/tokio

Length of output: 1954


🏁 Script executed:

# Check where maintenance() is called from in multi_thread/worker.rs
rg -n 'maintenance\(' tokio/src/runtime/scheduler/multi_thread/worker.rs

# Get context around those calls
rg -B5 'self.maintenance\(' tokio/src/runtime/scheduler/multi_thread/worker.rs

Repository: martin-augment/tokio

Length of output: 532


🏁 Script executed:

# Check the context around line 566 where maintenance is called
sed -n '555,575p' tokio/src/runtime/scheduler/multi_thread/worker.rs

# Check the context around line 742 fn maintenance definition
sed -n '735,760p' tokio/src/runtime/scheduler/multi_thread/worker.rs

Repository: martin-augment/tokio

Length of output: 1602


🏁 Script executed:

# Check what module Core is in and if it's in rt_multi_thread
grep -n 'struct Core' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -5

# Check line 1207 more carefully - what's the full context
sed -n '1195,1220p' tokio/src/runtime/scheduler/multi_thread/worker.rs

Repository: martin-augment/tokio

Length of output: 870


🏁 Script executed:

# Check if maintenance method or surrounding code has cfg guards
sed -n '1200,1230p' tokio/src/runtime/scheduler/multi_thread/worker.rs

# More importantly, check if Core::maintenance is used conditionally
rg -B5 -A15 'fn maintenance\(&mut self, worker: &Worker' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -30

Repository: martin-augment/tokio

Length of output: 1838


🏁 Script executed:

# Check if Inject::is_closed is called from Core::maintenance in multi_thread
# and whether Core::maintenance itself is guarded
grep -n 'impl.*Core' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -5

# More importantly, verify the actual file structure
head -50 tokio/src/runtime/scheduler/multi_thread/worker.rs

Repository: martin-augment/tokio

Length of output: 2381


🏁 Script executed:

# Check if Core::maintenance is guarded by cfg
sed -n '1005,1030p' tokio/src/runtime/scheduler/multi_thread/worker.rs

# Also check if the entire multi_thread module is guarded
head -30 tokio/src/runtime/scheduler/multi_thread/worker.rs

# Check if there's any cfg around the maintenance call at line 753
sed -n '750,760p' tokio/src/runtime/scheduler/multi_thread/worker.rs

Repository: martin-augment/tokio

Length of output: 2504


🏁 Script executed:

# Check if there's a cfg guard around the is_closed call at line 1213
sed -n '1205,1220p' tokio/src/runtime/scheduler/multi_thread/worker.rs | cat -A

# Check Cargo.toml for feature dependencies
grep -A20 '\[features\]' tokio/Cargo.toml | head -40

Repository: martin-augment/tokio

Length of output: 865


🏁 Script executed:

# Check if is_traced is also a taskdump-only field, which might indicate the whole code path is guarded
grep -n 'is_traced' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -20

# Check if is_traced is set conditionally
rg 'is_traced.*=' tokio/src/runtime/scheduler/multi_thread/worker.rs

Repository: martin-augment/tokio

Length of output: 475


🏁 Script executed:

# Check if there are multiple is_closed definitions - maybe one without cfg
rg -n 'pub.*fn is_closed' tokio/src/runtime/scheduler/inject.rs

# Check if cfg_rt_multi_thread macro has any relationship to taskdump
grep -n 'cfg_rt_multi_thread' tokio/src/runtime/scheduler/inject.rs

# More importantly, check if there's conditional compilation around the is_closed call
rg -B10 'worker.inject\(\).is_closed' tokio/src/runtime/scheduler/multi_thread/worker.rs

Repository: martin-augment/tokio

Length of output: 511


🏁 Script executed:

# Check if maybe is_closed is conditionally compiled elsewhere
rg -n '#\[cfg' tokio/src/runtime/scheduler/inject.rs

# Check if there's any guard around the call itself
sed -n '1210,1220p' tokio/src/runtime/scheduler/multi_thread/worker.rs

# Most importantly: check if this code compiles by looking at test results or if there are feature combinations
grep -n 'rt-multi-thread' tokio/Cargo.toml | head -10

Repository: martin-augment/tokio

Length of output: 491


The cfg narrowing appears incorrect: worker.inject().is_closed() is called unconditionally from Core::maintenance() in the rt-multi-thread scheduler.

The review assumes no multi-thread code path calls Shared::is_closed, but evidence shows Core::maintenance() at tokio/src/runtime/scheduler/multi_thread/worker.rs:1213 calls worker.inject().is_closed() without any cfg guard. Since Inject::is_closed() is guarded by #[cfg(feature = "taskdump")], this would cause a compilation failure when rt-multi-thread is enabled without the taskdump feature. Verify whether there's a second unguarded implementation of is_closed() or whether this call should also be guarded by taskdump.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/runtime/scheduler/inject/shared.rs` around lines 40 - 44, The
method visibility mismatch: Shared::is_closed is compiled only with
#[cfg(feature = "taskdump")] but Core::maintenance() calls
worker.inject().is_closed() unconditionally (see Core::maintenance,
worker.inject(), Inject::is_closed), causing build failures when rt-multi-thread
is enabled without taskdump; fix by making the API unconditional or guarding the
call. Either remove the cfg from Shared::is_closed (and the corresponding
Inject::is_closed) so the method always exists and simply return
synced.is_closed, or keep the cfg and add #[cfg(feature = "taskdump")] to the
call site in Core::maintenance (worker.inject().is_closed()), ensuring both
declaration and use are consistently guarded.

Owner Author


value:annoying; category:bug; feedback: The CodeRabbit AI reviewer is not correct. The PR changed worker.rs:1213 to use the new Sharded::is_closed(), which uses an AtomicBool to track whether the queue is closed, so there is no need for the cfg gating.
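The pattern the owner describes can be sketched in isolation: each shard is closed under its own lock, then a single atomic flag is published with Release ordering so that a lock-free Acquire load suffices at the call site. This is a simplified stand-in (a `Mutex<bool>` per shard instead of tokio's `Synced`), not the PR's actual types:

```rust
use std::sync::atomic::{AtomicBool, Ordering::{Acquire, Release}};
use std::sync::Mutex;

// Simplified model of the Sharded close/is_closed pair. `Queue` and its
// fields are hypothetical; each Mutex<bool> stands in for a shard's Synced.
struct Queue {
    shards: Vec<Mutex<bool>>, // `true` = shard closed
    is_closed: AtomicBool,
}

impl Queue {
    fn close(&self) -> bool {
        let mut was_open = false;
        for shard in &self.shards {
            let mut closed = shard.lock().unwrap();
            was_open |= !*closed;
            *closed = true;
        }
        // Release pairs with the Acquire in `is_closed`: once a reader
        // observes `true`, every per-shard close is visible to it.
        self.is_closed.store(true, Release);
        was_open
    }

    fn is_closed(&self) -> bool {
        // Lock-free check, which is why no feature-gated locking path
        // (and hence no cfg gate) is needed at the call site.
        self.is_closed.load(Acquire)
    }
}

fn main() {
    let q = Queue {
        shards: (0..4).map(|_| Mutex::new(false)).collect(),
        is_closed: AtomicBool::new(false),
    };
    assert!(!q.is_closed());
    assert!(q.close()); // queue was open
    assert!(q.is_closed());
    assert!(!q.close()); // already closed
    println!("ok");
}
```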

2 changes: 0 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
@@ -26,8 +26,6 @@ pub(crate) use worker::{Context, Launch, Shared};
cfg_taskdump! {
    mod trace;
    use trace::TraceStatus;

    pub(crate) use worker::Synced;
}

cfg_not_taskdump! {