
7973: rt: shard the multi-thread inject queue to reduce remote spawn contention#93

Open
martin-augment wants to merge 5 commits into master from pr-7973-2026-04-13-19-10-04

Conversation

@martin-augment
Owner

7973: To review by AI

alex and others added 5 commits April 12, 2026 13:31

The multi-threaded scheduler's inject queue was protected by a single
global mutex (shared with idle coordination state). Every remote task
spawn — any spawn from outside a worker thread — acquired this lock,
serializing concurrent spawners and limiting throughput.

This change introduces `inject::Sharded`, which splits the inject queue
into up to 8 independent shards, each an existing `Shared`/`Synced`
pair with its own mutex and cache-line padding.

Design:
- Push: each thread is assigned a home shard on first push (via a
  global counter) and sticks with it. This keeps consecutive pushes
  from one thread cache-local while spreading distinct threads across
  distinct locks.
- Pop: workers rotate through shards starting at their own index,
  skipping empty shards via per-shard atomic length. pop_n drains from
  one shard at a time to keep critical sections bounded.
- Shard count: capped at 8 (and 1 under loom). Contention drops off
  steeply past a handful of shards, and is_empty()/len() scan all
  shards in the worker hot loop.
- is_closed: a single Release atomic set after all shards are closed,
  so the shutdown check stays lock-free.
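The sticky push-side assignment described above can be sketched in a standalone form (the names `SHARD_MASK`, `NEXT_SHARD`, and `home_shard` are illustrative; the real implementation lives in `runtime::context` and derives the mask from the runtime's configured shard count):

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in: a power-of-two shard count of 8, as capped in the PR.
const SHARD_MASK: usize = 8 - 1;
static NEXT_SHARD: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // usize::MAX acts as the "unassigned" sentinel, as in the PR.
    static HOME_SHARD: Cell<usize> = Cell::new(usize::MAX);
}

/// Sticky shard selection: assigned once per thread from a global counter,
/// then reused for every subsequent push so the thread stays hot on one
/// mutex and list tail.
fn home_shard() -> usize {
    HOME_SHARD.with(|slot| {
        let mut raw = slot.get();
        if raw == usize::MAX {
            raw = NEXT_SHARD.fetch_add(1, Ordering::Relaxed);
            slot.set(raw);
        }
        raw & SHARD_MASK
    })
}

fn main() {
    let first = home_shard();
    // The same thread always lands on the same shard.
    assert_eq!(home_shard(), first);
    // A distinct thread takes the next counter value, spreading
    // producers across distinct locks.
    let other = std::thread::spawn(home_shard).join().unwrap();
    println!("shards: {first} and {other}");
}
```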

Random shard selection via context::thread_rng_n (as used in tokio-rs#7757 for
the blocking pool) was measured and found to be 20-33% slower on
remote_spawn at 8+ threads. The inject workload is a tight loop of
trivial pushes where producer-side cache locality dominates: with RNG,
a hot thread bounces between shard cache lines on every push; with
sticky assignment it stays hot on one mutex and list tail. RNG did win
slightly (5-9%) on single-producer benchmarks where spreading tasks
lets workers pop in parallel, but not enough to offset the regression
at scale.

The inject state is removed from the global Synced mutex, which now
only guards idle coordination. This also helps the single-threaded
path since remote pushes no longer contend with worker park/unpark.

Results on remote_spawn benchmark (12,800 no-op tasks, N spawner
threads, 64-core box):

  threads   before    after    improvement
  1         9.38 ms   7.33 ms  -22%
  2        14.94 ms   6.64 ms  -56%
  4        23.69 ms   5.34 ms  -77%
  8        34.81 ms   4.69 ms  -87%
  16       32.33 ms   4.54 ms  -86%
  32       30.37 ms   4.73 ms  -84%
  64       26.59 ms   5.34 ms  -80%

rt_multi_threaded benchmarks: spawn_many_local -8%, spawn_many_remote_idle
-7%, yield_many -1%, rest neutral.

Developed in conjunction with Claude.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sharded::pop already checks each shard's emptiness as a fast path
before locking, so the outer is_empty scan was iterating all shards
twice for no benefit.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai bot commented Apr 13, 2026

Walkthrough

The changes introduce a sharded injection queue mechanism for the Tokio multi-threaded runtime scheduler. A new Sharded<T> type splits the inject queue into multiple independent shards to reduce contention. A per-thread shard selection mechanism using inject_push_shard is added to assign tasks consistently to shards. The global Shared inject queue is replaced with Sharded, eliminating mutex-protected synchronized state and unsafe access patterns throughout the worker and scheduler code. The trace_multi_thread function is converted from unsafe to safe with updated parameters to work with the sharded queue. The dictionary file is updated with three new words.


@claude

claude bot commented Apr 13, 2026

Code Review: Sharded Inject Queue

This is a well-motivated change with a clear architecture. Sharding the inject queue behind per-shard mutexes is the right approach for reducing remote-spawn contention, and the implementation is generally clean. Here are my findings, from most to least significant.


Correctness Issues

1. Misleading safety comment in push() during concurrent close()

The doc comment on push() says:

Does nothing if the selected shard is closed (which implies all shards are closed, as close is the only path that sets the flag).

This invariant only holds after close() has finished. During its execution, close() closes shards one at a time under sequential per-shard locks. A concurrent push() can race and pick a shard that has been closed while other shards are still open — the pushed task would then be silently dropped while remaining shards still accept work. The comment should be updated to reflect this, or the implementation should either: (a) check is_closed before pushing, or (b) explicitly document that task loss during the shutdown window is intentional (as it was in the original code).

2. close() can return true from two concurrent callers

If two threads call close() concurrently, each iterates through shards under different per-shard locks. Both can accumulate was_open = true from shards they close first, and both proceed to store(true, Release). This is probably harmless today (callers use the return value only to decide whether to call notify_all()), but two callers both invoking notify_all() on shutdown is wasteful. Worth documenting that close() is not intended for concurrent use, or using a compare_exchange on is_closed to make first-caller semantics explicit.
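Making the first-caller semantics explicit is straightforward with a `compare_exchange` on the flag; a minimal sketch, with a hypothetical `Closer` type standing in for `Sharded`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Closer {
    is_closed: AtomicBool,
}

impl Closer {
    /// Returns true only for the caller that actually performed the
    /// open -> closed transition; every concurrent caller observes false,
    /// so at most one caller goes on to notify_all().
    fn close(&self) -> bool {
        self.is_closed
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

fn main() {
    let c = Closer { is_closed: AtomicBool::new(false) };
    assert!(c.close());   // first caller wins
    assert!(!c.close());  // later callers see "already closed"
}
```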

3. next_power_of_two() after clamp() can silently exceed MAX_SHARDS

let num_shards = shard_hint.clamp(1, MAX_SHARDS).next_power_of_two();

Because MAX_SHARDS = 8 is a power of two, 8.next_power_of_two() == 8, so this is safe today. However, if MAX_SHARDS were ever changed to a non-power-of-two (e.g., 6), a hint of 5 would clamp to 5, round up to 8, and silently exceed the intended cap. A safer formulation:

debug_assert!(MAX_SHARDS.is_power_of_two());
let num_shards = shard_hint.next_power_of_two().min(MAX_SHARDS);

Design / Documentation Issues

4. Shard lock is held for the entire pop_n closure — the comment is misleading

In pop_n:

let mut synced = shard.synced.lock();
// ...
let pop = unsafe { shard.shared.pop_n(&mut synced, n) };
return Some(f(pop));   // synced (MutexGuard) still alive here

The lock drops only when pop_n returns, i.e., after f completes. In the caller, run_queue.push_back(tasks) therefore runs while holding the shard lock. This is necessary because Pop<'_, T> borrows the guard, but the phrase "The shard lock is held only for the duration of the closure" could be read as "the lock is released before the closure runs." Consider: "The shard lock is held for the duration of the closure."

5. Per-thread inject_push_shard is shared across all Tokio runtimes in the process

CONTEXT.inject_push_shard stores a single raw counter value, then each call masks it with the runtime's shard_mask. A thread assigned counter value N lands on shard N & mask for every runtime it pushes into. With power-of-two shard counts this distributes reasonably, but the global interaction is subtle. A comment explaining why one slot suffices across multiple runtimes would help future readers.


Minor / Nits

6. debug_assert!(n > 0) in pop_n

A release-build call with n = 0 would iterate all shards, lock each non-empty one, re-check, and ultimately call f with a Pop that yields zero items. The assertion is right for catching bugs; a prose comment making the precondition explicit in release builds would help.

7. next_remote_task(0) during drain always starts from shard 0

while let Some(task) = self.next_remote_task(0) {

Draining always begins at shard 0, changing the order relative to the original FIFO inject queue. This is fine for correctness, but worth a comment noting that drain ordering across shards is not guaranteed.


Positive Highlights

  • Eliminating inject::Synced from the global Synced mutex significantly reduces the unsafe surface. Unsafe is now confined to the Sharded module where the invariant (synced belongs to shared) is trivially upheld by struct layout.
  • CachePadded per-shard is an essential detail that is easy to miss and was correctly applied.
  • Capping loom at 1 shard is the right call: additional shards multiply state space without exercising new cross-shard interleavings.
  • The is_empty() lock-free fast-path in pop() avoids unnecessary acquisitions in the common empty-shard case.
  • Reusing the Lock trait via impl Lock for &Mutex<Synced> cleanly threads push_batch through without duplicating logic.


@gemini-code-assist bot left a comment


Code Review

This pull request introduces a sharded inject queue for the multi-threaded scheduler to mitigate contention on the global mutex. By partitioning the queue into multiple shards and using thread-local indices for shard assignment, the implementation improves scalability and cache locality. The review feedback suggests optimizing the push and push_batch methods by adding lock-free checks for the closed state to avoid unnecessary mutex acquisitions during shutdown.

Comment on lines +143 to +150
pub(crate) fn push(&self, task: task::Notified<T>) {
let idx = self.next_push_shard();
let shard = &*self.shards[idx];

let mut synced = shard.synced.lock();
// safety: `synced` belongs to `shard.shared`
unsafe { shard.shared.push(&mut synced, task) };
}

medium

While Shared::push performs an internal check on the closed state under the lock, adding a lock-free check of self.is_closed() at the start of Sharded::push can avoid unnecessary mutex acquisition during runtime shutdown. This is a minor optimization for the shutdown path.

    pub(crate) fn push(&self, task: task::Notified<T>) {
        if self.is_closed() {
            return;
        }

        let idx = self.next_push_shard();
        let shard = &*self.shards[idx];

        let mut synced = shard.synced.lock();
        // safety: `synced` belongs to `shard.shared`
        unsafe { shard.shared.push(&mut synced, task) };
    }

Owner Author


value:useful; category:bug; feedback: The Gemini AI reviewer is correct! Checking whether the queue is already closed would avoid acquiring the lock and executing the unsafe code entirely.

Comment on lines +154 to +166
pub(crate) fn push_batch<I>(&self, iter: I)
where
I: Iterator<Item = task::Notified<T>>,
{
let idx = self.next_push_shard();
let shard = &*self.shards[idx];

// safety: `&shard.synced` yields `&mut Synced` for the same
// `Shared` instance that `push_batch` operates on. The underlying
// implementation links the batch outside the lock and only
// acquires it for the list splice.
unsafe { shard.shared.push_batch(&shard.synced, iter) };
}

medium

Similar to push, adding a lock-free check for self.is_closed() here can prevent taking a lock when the queue is already closed during shutdown.

    pub(crate) fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<T>>,
    {
        if self.is_closed() {
            return;
        }

        let idx = self.next_push_shard();
        let shard = &*self.shards[idx];

        // safety: `&shard.synced` yields `&mut Synced` for the same
        // `Shared` instance that `push_batch` operates on. The underlying
        // implementation links the batch outside the lock and only
        // acquires it for the list splice.
        unsafe { shard.shared.push_batch(&shard.synced, iter) };
    }

Owner Author


value:useful; category:bug; feedback: The Gemini AI reviewer is correct! Checking whether the queue is already closed would avoid acquiring the lock and executing the unsafe code entirely.


@coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tokio/src/runtime/task/trace/mod.rs (1)

358-377: ⚠️ Potential issue | 🟠 Major

Taskdump no longer quiesces remote inject producers.

This loop now drains the sharded inject queue one shard lock at a time, but Handle::push_remote_task concurrently enqueues straight into self.shared.inject.push(task). That means remote spawns can keep arriving while the dump is in progress, so the "local and injection queues are drained" precondition below is no longer guaranteed. In practice this can make taskdumps miss freshly injected notified tasks, or spin indefinitely if remote spawn pressure stays high. Consider adding a tracing-only producer gate or taking all shard locks before draining.
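One shape the "take all shard locks" option could have, sketched with a plain `Vec<Mutex<Vec<T>>>` standing in for the real per-shard `Synced` state (names and types are illustrative, not the PR's actual code):

```rust
use std::sync::Mutex;

/// Drains every shard while holding all shard locks at once, so a
/// concurrent producer blocks until the drain finishes instead of
/// slipping a task into an already-drained shard.
fn drain_quiesced<T>(shards: &[Mutex<Vec<T>>]) -> Vec<T> {
    // Acquire every lock first; pushes are quiesced from here on.
    let mut guards: Vec<_> = shards.iter().map(|s| s.lock().unwrap()).collect();
    let mut drained = Vec::new();
    for guard in &mut guards {
        drained.append(guard);
    }
    drained
    // All guards drop here, releasing the shards together.
}

fn main() {
    let shards = vec![Mutex::new(vec![1, 2]), Mutex::new(vec![3])];
    let tasks = drain_quiesced(&shards);
    assert_eq!(tasks, vec![1, 2, 3]);
    assert!(shards.iter().all(|s| s.lock().unwrap().is_empty()));
}
```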

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/runtime/task/trace/mod.rs` around lines 358 - 377,
trace_multi_thread assumes the injection queue is quiesced but injection.pop(0)
only drains one shard at a time while Handle::push_remote_task can concurrently
push into other shards; fix by quiescing remote producers before draining the
sharded injection: either acquire all shard locks for the
Sharded<Arc<multi_thread::Handle>> (i.e., lock each shard/mutex/slot and then
pop from each while holding the locks) before the "clear the injection queue"
loop, or add a tracing-only producer gate that prevents Handle::push_remote_task
from enqueuing during the dump; after ensuring no concurrent pushes, continue to
collect dequeued and call trace_owned as before.


📥 Commits

Reviewing files that changed from the base of the PR and between fe4f928 and 6970571.

📒 Files selected for processing (9)
  • spellcheck.dic
  • tokio/src/runtime/context.rs
  • tokio/src/runtime/scheduler/inject.rs
  • tokio/src/runtime/scheduler/inject/sharded.rs
  • tokio/src/runtime/scheduler/inject/shared.rs
  • tokio/src/runtime/scheduler/multi_thread/mod.rs
  • tokio/src/runtime/scheduler/multi_thread/worker.rs
  • tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs
  • tokio/src/runtime/task/trace/mod.rs
💤 Files with no reviewable changes (1)
  • tokio/src/runtime/scheduler/multi_thread/mod.rs

@augmentcode

augmentcode bot commented Apr 13, 2026

🤖 Augment PR Summary

Summary: This PR reduces contention when many external threads spawn into the multi-thread Tokio runtime by sharding the global inject queue.

Changes:

  • Added a per-thread “home shard” cache in runtime::context for selecting an inject shard on first use.
  • Introduced inject::Sharded, composed of multiple independent Shared/Synced queues protected by per-shard mutexes and cacheline padding.
  • Switched the multi-thread scheduler’s global inject queue from a single mutex-protected queue to the new sharded implementation.
  • Updated worker remote-pop paths to rotate across shards starting from the worker index, and to drain batches from a single shard.
  • Adjusted shutdown/close paths to close all shards and drain remote tasks via the sharded interface.
  • Updated taskdump tracing to work with the sharded inject queue and removed the previous “unsafe” synced coupling.

Technical Notes: Shard count is capped (1 under loom, 8 otherwise) and rounded to a power-of-two to allow fast masking for shard selection.
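The rounding-and-masking scheme can be sketched as follows (`MAX_SHARDS` matches the PR's cap; the function names are illustrative, and the `shard_count` formulation follows the review's suggested `next_power_of_two().min(...)` ordering):

```rust
const MAX_SHARDS: usize = 8; // must stay a power of two for the mask trick

/// Rounds the hint up to a power of two, capped at MAX_SHARDS, so shard
/// selection can use a bit mask instead of an integer modulo.
fn shard_count(hint: usize) -> usize {
    debug_assert!(MAX_SHARDS.is_power_of_two());
    hint.max(1).next_power_of_two().min(MAX_SHARDS)
}

/// Maps a per-thread counter value onto a shard index; valid only
/// because num_shards is a power of two.
fn select(counter: usize, num_shards: usize) -> usize {
    counter & (num_shards - 1)
}

fn main() {
    assert_eq!(shard_count(0), 1);
    assert_eq!(shard_count(5), 8);
    assert_eq!(shard_count(64), 8); // capped
    assert_eq!(select(13, 8), 5);   // 13 & 7
}
```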



@augmentcode bot left a comment


Review completed. 2 suggestions posted.



cfg_rt_multi_thread! {
/// Sentinel indicating the per-thread inject push shard has not been assigned.
const INJECT_SHARD_UNASSIGNED: usize = usize::MAX;

tokio/src/runtime/context.rs:218: Using usize::MAX as INJECT_SHARD_UNASSIGNED can theoretically collide if the NEXT_SHARD counter ever wraps to usize::MAX, causing a thread to keep reinitializing its shard assignment. Consider ensuring the stored value range can’t overlap the sentinel (e.g., store idx + 1).

Severity: low


Owner Author


value:good-but-wont-fix; category:bug; feedback: The Augment AI reviewer is correct! However, spawning usize::MAX remote tasks is highly unlikely because it is such a large number. Any optimization can be left for a follow-up if someone ever needs it.
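For reference, the suggested `idx + 1` encoding could look like this (a minimal sketch; `encode`/`decode` are hypothetical names, and it assumes the counter never actually reaches usize::MAX, per the won't-fix note above):

```rust
/// 0 as the "unassigned" sentinel: real assignments are stored as idx + 1,
/// so the stored range {1, 2, ...} can never collide with the sentinel.
const UNASSIGNED: usize = 0;

fn encode(idx: usize) -> usize {
    // Callers guarantee idx < usize::MAX, so this cannot overflow.
    idx + 1
}

fn decode(raw: usize) -> Option<usize> {
    // The sentinel decodes to None; everything else shifts back down.
    raw.checked_sub(1)
}

fn main() {
    assert_eq!(decode(UNASSIGNED), None);
    assert_eq!(decode(encode(0)), Some(0));
    assert_eq!(decode(encode(41)), Some(41));
}
```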

impl<T: 'static> Sharded<T> {
/// Creates a new sharded inject queue with a shard count derived
/// from the requested hint (rounded up to a power of two).
pub(crate) fn new(shard_hint: usize) -> Sharded<T> {

tokio/src/runtime/scheduler/inject/sharded.rs:69: There are unit tests for inject::Shared in tokio/src/runtime/tests/inject.rs, but none exercising inject::Sharded (multi-shard push/pop rotation, pop_n behavior, and close handling). Adding coverage here would help catch regressions in the new sharding logic.

Severity: low


Owner Author


value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! There are no tests for pop_n() and it would be good to add some to prevent regressions in the future.

@martin-augment
Owner Author

1. Misleading safety comment in push() during concurrent close()

The doc comment on push() says:

Does nothing if the selected shard is closed (which implies all shards are closed, as close is the only path that sets the flag).

This invariant only holds after close() has finished. During its execution, close() closes shards one at a time under sequential per-shard locks. A concurrent push() can race and pick a shard that has been closed while other shards are still open — the pushed task would then be silently dropped while remaining shards still accept work. The comment should be updated to reflect this, or the implementation should either: (a) check is_closed before pushing, or (b) explicitly document that task loss during the shutdown window is intentional (as it was in the original code).

value:good-to-have; category:documentation; feedback: The Claude AI reviewer is correct! The docstring does not hold while the close() method is still running in another thread. In that case some shards will be closed while others may still be open and accepting remote tasks.

@martin-augment
Owner Author

3. next_power_of_two() after clamp() can silently exceed MAX_SHARDS

let num_shards = shard_hint.clamp(1, MAX_SHARDS).next_power_of_two();

Because MAX_SHARDS = 8 is a power of two, 8.next_power_of_two() == 8, so this is safe today. However, if MAX_SHARDS were ever changed to a non-power-of-two (e.g., 6), a hint of 5 would clamp to 5, round up to 8, and silently exceed the intended cap. A safer formulation:

debug_assert!(MAX_SHARDS.is_power_of_two());
let num_shards = shard_hint.next_power_of_two().min(MAX_SHARDS);

value:good-to-have; category:documentation; feedback: The Claude AI reviewer is correct! The docstring of MAX_SHARDS should be updated to mention that its value must be a power of two. Otherwise it would be possible to request a shard_hint that leads to a number of shards greater than MAX_SHARDS.
