rt: shard the multi-thread inject queue to reduce remote spawn contention (#89)

martin-augment wants to merge 2 commits into `master`.
Conversation

The multi-threaded scheduler's inject queue was protected by a single global mutex (shared with idle coordination state). Every remote task spawn (any spawn from outside a worker thread) acquired this lock, serializing concurrent spawners and limiting throughput.

This change introduces `inject::Sharded`, which splits the inject queue into up to 8 independent shards, each an existing `Shared`/`Synced` pair with its own mutex and cache-line padding.

Design:

- Push: each thread is assigned a home shard on first push (via a global counter) and sticks with it. This keeps consecutive pushes from one thread cache-local while spreading distinct threads across distinct locks.
- Pop: workers rotate through shards starting at their own index, skipping empty shards via a per-shard atomic length. `pop_n` drains from one shard at a time to keep critical sections bounded.
- Shard count: capped at 8 (and 1 under loom). Contention drops off steeply past a handful of shards, and `is_empty()`/`len()` scan all shards in the worker hot loop.
- `is_closed`: a single Release atomic set after all shards are closed, so the shutdown check stays lock-free.

Random shard selection via `context::thread_rng_n` (as used in tokio-rs#7757 for the blocking pool) was measured and found to be 20-33% slower on remote_spawn at 8+ threads. The inject workload is a tight loop of trivial pushes where producer-side cache locality dominates: with RNG, a hot thread bounces between shard cache lines on every push; with sticky assignment it stays hot on one mutex and list tail. RNG did win slightly (5-9%) on single-producer benchmarks, where spreading tasks lets workers pop in parallel, but not enough to offset the regression at scale.

The inject state is removed from the global `Synced` mutex, which now only guards idle coordination. This also helps the single-threaded path, since remote pushes no longer contend with worker park/unpark.
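The sticky home-shard assignment described above can be sketched as a standalone model. This is a simplified illustration, not the actual Tokio code: the constant names `PUSH_SHARD`, `NEXT_SHARD`, and `UNASSIGNED` mirror the review excerpts below, while the `home_shard` helper and its signature are assumptions.

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Illustrative names; the real implementation lives in inject/sharded.rs.
const UNASSIGNED: usize = usize::MAX;
static NEXT_SHARD: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // Each thread remembers the raw counter value it was assigned.
    static PUSH_SHARD: Cell<usize> = Cell::new(UNASSIGNED);
}

/// Return this thread's home shard, assigning one on the first push.
/// `shard_mask` is `num_shards - 1` for a power-of-two shard count.
fn home_shard(shard_mask: usize) -> usize {
    PUSH_SHARD
        .try_with(|cell| {
            let mut idx = cell.get();
            if idx == UNASSIGNED {
                // First push from this thread: take the next counter value,
                // spreading distinct threads across distinct shards.
                idx = NEXT_SHARD.fetch_add(1, Relaxed);
                cell.set(idx);
            }
            idx & shard_mask
        })
        // Thread-local unavailable (e.g. thread teardown): fall back to shard 0.
        .unwrap_or(0)
}

fn main() {
    let mask = 8 - 1; // 8 shards
    let first = home_shard(mask);
    // Sticky: every later push from this thread lands on the same shard.
    assert_eq!(home_shard(mask), first);
    assert!(first < 8);
    println!("home shard: {first}");
}
```

The masking step is why the shard count stays a power of two: `idx & shard_mask` maps the unbounded counter onto a shard index without a division.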
Results on the remote_spawn benchmark (12,800 no-op tasks, N spawner threads, 64-core box):

| threads | before | after | improvement |
|--------:|---------:|--------:|------------:|
| 1 | 9.38 ms | 7.33 ms | -22% |
| 2 | 14.94 ms | 6.64 ms | -56% |
| 4 | 23.69 ms | 5.34 ms | -77% |
| 8 | 34.81 ms | 4.69 ms | -87% |
| 16 | 32.33 ms | 4.54 ms | -86% |
| 32 | 30.37 ms | 4.73 ms | -84% |
| 64 | 26.59 ms | 5.34 ms | -80% |

rt_multi_threaded benchmarks: spawn_many_local -8%, spawn_many_remote_idle -7%, yield_many -1%, rest neutral.

Developed in conjunction with Claude.
Walkthrough

This pull request introduces documentation files (AGENTS.md, CLAUDE.md) with linking instructions, updates the spellcheck dictionary with new terms, and refactors Tokio's multi-threaded runtime scheduler's task injection queue. The changes replace a single shared injection queue with the new sharded `inject::Sharded` implementation.
Code Review

This is a well-motivated change with a clean design. Sharding the inject queue is the right approach for reducing remote-spawn contention, and the implementation is generally sound. Here are my observations:

Correctness
```rust
let mut synced = shard.synced.lock();
if shard.shared.is_empty() { // re-check under lock
    continue;
}
if let Some(task) = unsafe { shard.shared.pop(&mut synced) } {
    return Some(task);
}
```
On a second `close()` call, every shard is already closed, so the final store is skipped:

```rust
if was_open {
    self.is_closed.store(true, Release);
}
```

Design Concerns
The docstring acknowledges this, but the implication is worth spelling out: if external threads have a non-uniform push pattern (e.g., all landing on one shard due to affinity), the workers whose hint places them far from that shard will scan through up to …

For non-power-of-two worker counts (3, 5, 6, 7 workers), …

Global …

The comment correctly explains this is intentional. Worth noting one subtle edge case: threads that are alive when the TLS value is first assigned from one runtime's …

Test Coverage

No direct unit tests for … The new type has no dedicated tests. The loom tests cap … The new …

Minor Nits

Summary

The core approach (sharding + per-thread home shard + worker-index-hinted drain) is sound and the implementation correctly handles the close/push/pop lifecycle. The main gaps are the missing unit tests for the …
Code Review
This pull request introduces a sharded injection queue for the multi-threaded scheduler to reduce contention on the global task queue. By partitioning the queue into multiple shards, each protected by its own mutex, the implementation allows concurrent task submissions from multiple external threads with significantly reduced lock contention. The scheduler's internal structures were updated to remove the global injection queue from the main synchronization lock, and worker logic was adjusted to rotate through shards when popping tasks. I have no feedback to provide as the review comments were purely explanatory.
🤖 Augment PR Summary

Summary: This PR reduces multi-thread runtime contention by sharding the global inject queue used for remote task scheduling.

Changes: …
Technical Notes: Shard count is capped (and forced to 1 under loom) to keep hot-loop scanning cost bounded while still reducing remote-spawn mutex contention.
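One plausible way to satisfy those constraints (power of two so `count - 1` works as a bit mask, capped at 8, forced to 1 under loom) is sketched below. The exact formula is an assumption, not necessarily Tokio's; `shard_count` and its `loom` flag are illustrative:

```rust
/// Pick the number of inject shards. Assumed policy: round the worker
/// count up to a power of two so `count - 1` works as a bit mask, cap
/// at 8 (contention drops off past a handful of shards, and
/// is_empty()/len() scan every shard in the worker hot loop), and
/// force a single shard under loom to keep model checking tractable.
fn shard_count(workers: usize, loom: bool) -> usize {
    if loom {
        return 1;
    }
    workers.max(1).next_power_of_two().min(8)
}

fn main() {
    assert_eq!(shard_count(1, false), 1);
    assert_eq!(shard_count(3, false), 4);  // rounded up to a power of two
    assert_eq!(shard_count(64, false), 8); // capped at 8
    assert_eq!(shard_count(64, true), 1);  // single shard under loom
    // The push-side mask is then:
    let mask = shard_count(6, false) - 1;
    assert_eq!(mask, 7);
}
```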
```diff
 let mut synced = synced.lock();
 // Safety: exactly the same safety requirements as `trace_multi_thread` function.
-while let Some(notified) = unsafe { injection.pop(&mut synced.inject) } {
+while let Some(notified) = injection.pop(0) {
```
This loop drains the sharded injection queue while external threads can still enqueue into it (remote spawns no longer share the scheduler mutex), so under sustained remote spawning the task dump could become a moving target or potentially never complete.
Severity: medium
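One hedged mitigation sketch for the moving-target concern above: bound the drain by a per-shard length snapshot taken up front, so tasks pushed after the dump begins are simply left for a later pass. The queue types are simplified stand-ins and `bounded_drain` is an illustrative name, not Tokio's API:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Drain at most the tasks that were present when the dump started.
/// Concurrent pushes land behind the snapshot and are not drained,
/// so the loop always terminates.
fn bounded_drain(shards: &[Mutex<VecDeque<u32>>]) -> Vec<u32> {
    // Snapshot per-shard lengths before draining anything.
    let budgets: Vec<usize> = shards
        .iter()
        .map(|s| s.lock().unwrap().len())
        .collect();

    let mut drained = Vec::new();
    for (shard, budget) in shards.iter().zip(budgets) {
        let mut queue = shard.lock().unwrap();
        for _ in 0..budget {
            match queue.pop_front() {
                Some(task) => drained.push(task),
                None => break, // another popper got there first
            }
        }
    }
    drained
}

fn main() {
    let shards = vec![
        Mutex::new(VecDeque::from(vec![1, 2])),
        Mutex::new(VecDeque::from(vec![3])),
    ];
    assert_eq!(bounded_drain(&shards), vec![1, 2, 3]);
    // A second drain sees only whatever arrived since the snapshot.
    assert!(bounded_drain(&shards).is_empty());
}
```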
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```diff
@@ -0,0 +1 @@
+Never link to any issue or a pull request in any GitHub repository
```
Unrelated AI instruction files committed with code change
Low Severity
AGENTS.md and CLAUDE.md were added containing only the directive "Never link to any issue or a pull request in any GitHub repository." These files are unrelated to the inject queue sharding change described in the PR title and appear to be AI prompt-manipulation instructions that were bundled into an unrelated PR.
Additional Locations (1)
Actionable comments posted: 2
🧹 Nitpick comments (2)
tokio/src/runtime/scheduler/inject/sharded.rs (2)
251-268: Consider documenting the thread-local fallback behavior.

When `PUSH_SHARD.try_with()` fails (e.g., during thread shutdown), the fallback to shard 0 is safe but could cause temporary contention. This is acceptable, but a brief comment explaining the fallback would aid future maintainers.

📝 Optional documentation improvement:

```diff
 PUSH_SHARD
     .try_with(|cell| {
         let mut idx = cell.get();
         if idx == UNASSIGNED {
             idx = NEXT_SHARD.fetch_add(1, Relaxed);
             cell.set(idx);
         }
         idx & self.shard_mask
     })
+    // If the thread-local is inaccessible (e.g., during thread
+    // shutdown), fall back to shard 0. This may cause brief
+    // contention but is safe.
     .unwrap_or(0)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tokio/src/runtime/scheduler/inject/sharded.rs` around lines 251 - 268, Add a brief inline comment inside fn next_push_shard explaining why PUSH_SHARD.try_with unwraps to 0: note that when thread-local access fails (e.g., during thread shutdown or teardown) we intentionally fall back to shard 0 to remain safe, that this may cause temporary contention but is acceptable, and mention the roles of PUSH_SHARD, UNASSIGNED, and NEXT_SHARD in assigning per-thread shard indices so future maintainers understand the rationale. Place the comment near the unwrap_or(0) fallback and reference shard_mask behavior for the single-shard fast-path.
132-148: Verify ordering between `close()` and concurrent `push_batch()` calls.

The `close()` implementation iterates through shards sequentially, closing each under its own lock. A concurrent `push_batch()` could theoretically select an already-iterated (still open) shard before `close()` reaches it, then acquire the lock after that shard is closed, but this is actually fine since the per-shard `synced.is_closed` flag is checked under the lock.

However, the comment "After this loop no shard will accept a push" is accurate only after the loop completes. Consider whether the `is_closed.store(true, Release)` should precede the shard iteration to allow early bail-out in push paths (though the current implementation is still correct).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tokio/src/runtime/scheduler/inject/sharded.rs` around lines 132 - 148, Move the publication of the closed flag earlier so concurrent push paths can observe the global closed state and bail out: in close() call self.is_closed.store(true, Release) before iterating shards, then iterate self.shards, take each shard.synced.lock(), and call shard.shared.close(&mut synced) to compute was_open as before; keep using shard.shared.close and shard.synced.lock so per-shard state is still set under the lock and push_batch() still checks per-shard synced.is_closed under the lock.
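The reordering suggested above can be sketched as follows. The types are simplified stand-ins (a mutex-guarded boolean models each shard's `synced.is_closed`), and as the review itself notes the change is optional since the current ordering is already correct:

```rust
use std::sync::atomic::{AtomicBool, Ordering::{Acquire, Release}};
use std::sync::Mutex;

struct Shard {
    // Stands in for the per-shard `synced.is_closed` flag.
    closed: Mutex<bool>,
}

struct Sharded {
    shards: Vec<Shard>,
    is_closed: AtomicBool,
}

impl Sharded {
    /// Close variant that publishes the global flag *before* walking
    /// the shards, so concurrent push paths can bail out early.
    fn close(&self) -> bool {
        self.is_closed.store(true, Release);
        let mut was_open = false;
        for shard in &self.shards {
            let mut closed = shard.closed.lock().unwrap();
            if !*closed {
                was_open = true;
                *closed = true; // per-shard state still set under the lock
            }
        }
        was_open
    }

    /// Push path: lock-free early bail-out, then the per-shard flag
    /// remains the authoritative check under the shard lock.
    fn push(&self, shard_idx: usize) -> Result<(), ()> {
        if self.is_closed.load(Acquire) {
            return Err(());
        }
        let closed = self.shards[shard_idx].closed.lock().unwrap();
        if *closed {
            return Err(());
        }
        Ok(())
    }
}

fn main() {
    let q = Sharded {
        shards: (0..4).map(|_| Shard { closed: Mutex::new(false) }).collect(),
        is_closed: AtomicBool::new(false),
    };
    assert!(q.push(0).is_ok());
    assert!(q.close());  // first close reports it was open
    assert!(!q.close()); // second close is a no-op
    assert!(q.push(0).is_err());
}
```

The trade-off: storing the flag first lets pushers skip the lock sooner during shutdown, at the cost of `is_closed` briefly reading true while some shards are technically still open.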
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e6775ae4-2543-4479-9111-7cb1f5fc7757
📒 Files selected for processing (10)

- AGENTS.md
- CLAUDE.md
- spellcheck.dic
- tokio/src/runtime/scheduler/inject.rs
- tokio/src/runtime/scheduler/inject/sharded.rs
- tokio/src/runtime/scheduler/inject/shared.rs
- tokio/src/runtime/scheduler/multi_thread/mod.rs
- tokio/src/runtime/scheduler/multi_thread/worker.rs
- tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs
- tokio/src/runtime/task/trace/mod.rs
💤 Files with no reviewable changes (1)
- tokio/src/runtime/scheduler/multi_thread/mod.rs
```diff
@@ -0,0 +1 @@
+Never link to any issue or a pull request in any GitHub repository
```
🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Check if CLAUDE.md was also added and examine any related context
fd -t f 'CLAUDE.md|AGENTS.md' -x cat {}
```

Repository: martin-augment/tokio
Length of output: 197

🏁 Script executed:

```shell
git log --oneline --follow -- AGENTS.md CLAUDE.md | head -20
```

Repository: martin-augment/tokio
Length of output: 109

🏁 Script executed:

```shell
git show HEAD:AGENTS.md 2>/dev/null || echo "File not in HEAD"
git show HEAD^:AGENTS.md 2>/dev/null || echo "File not in parent"
```

Repository: martin-augment/tokio
Length of output: 149

🏁 Script executed:

```shell
# Check if there's a PR description or related documentation
find . -maxdepth 2 -type f \( -name "*.md" -o -name "*.txt" \) | head -20
```

Repository: martin-augment/tokio
Length of output: 512

🏁 Script executed:

```shell
# Look for any references to AGENTS.md or CLAUDE.md in the codebase
rg -i "agents|claude" --type md --type txt | head -30
```

Repository: martin-augment/tokio
Length of output: 46
Clarify the instruction and provide rationale.
Both AGENTS.md and CLAUDE.md contain an ambiguous, one-line instruction with no context or examples. The instruction needs clarification:
- What does "link" mean—hyperlinks only, or any reference to issues/PRs?
- Does "any GitHub repository" include this repository, or only external ones?
- What contexts does this apply to (commit messages, code comments, documentation)?
Add documentation explaining the rationale for this guideline and specific examples to help contributors understand when and how it applies.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@AGENTS.md` at line 1, Update AGENTS.md (and mirror in CLAUDE.md) to replace
the one-line rule with a clarified guideline: define "link" to include both
clickable URLs and plain-text references to GitHub issues/PRs, specify that the
restriction applies to external repositories but allows internal references when
necessary with justification, list the contexts covered (commit messages, code
comments, docs, PR descriptions), provide a short rationale
(privacy/security/irrelevance) and include 3 concrete examples showing a
disallowed external GitHub link, an allowed internal reference with
justification, and a suggested alternative (e.g., summarize the issue instead of
linking).
```diff
 // Kind of annoying to have to include the cfg here
-#[cfg(any(feature = "taskdump", feature = "rt-multi-thread"))]
+#[cfg(feature = "taskdump")]
 pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
     synced.is_closed
 }
```
🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Description: Verify that Shared::is_closed is not called from rt-multi-thread paths
# Search for calls to shared.is_closed or Shared::is_closed patterns
rg -n 'shared\.is_closed|Shared::is_closed' --type rust
# Also check for is_closed calls on inject::Shared specifically
rg -nC3 'inject::Shared' --type rust | head -100
```

Repository: martin-augment/tokio
Length of output: 1353

🏁 Script executed:

```shell
cat -n tokio/src/runtime/scheduler/inject.rs | head -60
```

Repository: martin-augment/tokio
Length of output: 2006

🏁 Script executed:

```shell
# Search more comprehensively for all is_closed calls
rg 'is_closed\(' --type rust -n
# Also check for any usage in rt-multi-thread specific code
rg -n 'is_closed' tokio/src/runtime/scheduler/rt_multi_thread.rs 2>/dev/null || echo "File not found or no matches"
```

Repository: martin-augment/tokio
Length of output: 9450

🏁 Script executed:

```shell
# Check multi-thread worker context
sed -n '1205,1220p' tokio/src/runtime/scheduler/multi_thread/worker.rs
# Check current-thread context
sed -n '550,560p' tokio/src/runtime/scheduler/current_thread/mod.rs
```

Repository: martin-augment/tokio
Length of output: 918

🏁 Script executed:

```shell
# Get wider context for multi_thread/worker.rs maintenance function
sed -n '1195,1225p' tokio/src/runtime/scheduler/multi_thread/worker.rs
# Get wider context for current_thread/mod.rs
sed -n '540,570p' tokio/src/runtime/scheduler/current_thread/mod.rs
```

Repository: martin-augment/tokio
Length of output: 2246

🏁 Script executed:

```shell
# Check if maintenance() has cfg guards
grep -B10 'fn maintenance' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -20
# Check if pre_shutdown() has cfg guards
grep -B10 'fn pre_shutdown' tokio/src/runtime/scheduler/current_thread/mod.rs | head -20
# More broadly, check what's around current_thread pre_shutdown
sed -n '500,560p' tokio/src/runtime/scheduler/current_thread/mod.rs | grep -E '#\[cfg|fn pre_shutdown|trace'
```

Repository: martin-augment/tokio
Length of output: 764

🏁 Script executed:

```shell
# Get the full function signature for pre_shutdown with its cfg guards
sed -n '520,580p' tokio/src/runtime/scheduler/current_thread/mod.rs
```

Repository: martin-augment/tokio
Length of output: 1954

🏁 Script executed:

```shell
# Check where maintenance() is called from in multi_thread/worker.rs
rg -n 'maintenance\(' tokio/src/runtime/scheduler/multi_thread/worker.rs
# Get context around those calls
rg -B5 'self.maintenance\(' tokio/src/runtime/scheduler/multi_thread/worker.rs
```

Repository: martin-augment/tokio
Length of output: 532

🏁 Script executed:

```shell
# Check the context around line 566 where maintenance is called
sed -n '555,575p' tokio/src/runtime/scheduler/multi_thread/worker.rs
# Check the context around line 742 fn maintenance definition
sed -n '735,760p' tokio/src/runtime/scheduler/multi_thread/worker.rs
```

Repository: martin-augment/tokio
Length of output: 1602

🏁 Script executed:

```shell
# Check what module Core is in and if it's in rt_multi_thread
grep -n 'struct Core' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -5
# Check line 1207 more carefully - what's the full context
sed -n '1195,1220p' tokio/src/runtime/scheduler/multi_thread/worker.rs
```

Repository: martin-augment/tokio
Length of output: 870

🏁 Script executed:

```shell
# Check if maintenance method or surrounding code has cfg guards
sed -n '1200,1230p' tokio/src/runtime/scheduler/multi_thread/worker.rs
# More importantly, check if Core::maintenance is used conditionally
rg -B5 -A15 'fn maintenance\(&mut self, worker: &Worker' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -30
```

Repository: martin-augment/tokio
Length of output: 1838

🏁 Script executed:

```shell
# Check if Inject::is_closed is called from Core::maintenance in multi_thread
# and whether Core::maintenance itself is guarded
grep -n 'impl.*Core' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -5
# More importantly, verify the actual file structure
head -50 tokio/src/runtime/scheduler/multi_thread/worker.rs
```

Repository: martin-augment/tokio
Length of output: 2381

🏁 Script executed:

```shell
# Check if Core::maintenance is guarded by cfg
sed -n '1005,1030p' tokio/src/runtime/scheduler/multi_thread/worker.rs
# Also check if the entire multi_thread module is guarded
head -30 tokio/src/runtime/scheduler/multi_thread/worker.rs
# Check if there's any cfg around the maintenance call at line 753
sed -n '750,760p' tokio/src/runtime/scheduler/multi_thread/worker.rs
```

Repository: martin-augment/tokio
Length of output: 2504

🏁 Script executed:

```shell
# Check if there's a cfg guard around the is_closed call at line 1213
sed -n '1205,1220p' tokio/src/runtime/scheduler/multi_thread/worker.rs | cat -A
# Check Cargo.toml for feature dependencies
grep -A20 '\[features\]' tokio/Cargo.toml | head -40
```

Repository: martin-augment/tokio
Length of output: 865

🏁 Script executed:

```shell
# Check if is_traced is also a taskdump-only field, which might indicate the whole code path is guarded
grep -n 'is_traced' tokio/src/runtime/scheduler/multi_thread/worker.rs | head -20
# Check if is_traced is set conditionally
rg 'is_traced.*=' tokio/src/runtime/scheduler/multi_thread/worker.rs
```

Repository: martin-augment/tokio
Length of output: 475

🏁 Script executed:

```shell
# Check if there are multiple is_closed definitions - maybe one without cfg
rg -n 'pub.*fn is_closed' tokio/src/runtime/scheduler/inject.rs
# Check if cfg_rt_multi_thread macro has any relationship to taskdump
grep -n 'cfg_rt_multi_thread' tokio/src/runtime/scheduler/inject.rs
# More importantly, check if there's conditional compilation around the is_closed call
rg -B10 'worker.inject\(\).is_closed' tokio/src/runtime/scheduler/multi_thread/worker.rs
```

Repository: martin-augment/tokio
Length of output: 511

🏁 Script executed:

```shell
# Check if maybe is_closed is conditionally compiled elsewhere
rg -n '#\[cfg' tokio/src/runtime/scheduler/inject.rs
# Check if there's any guard around the call itself
sed -n '1210,1220p' tokio/src/runtime/scheduler/multi_thread/worker.rs
# Most importantly: check if this code compiles by looking at test results or if there are feature combinations
grep -n 'rt-multi-thread' tokio/Cargo.toml | head -10
```

Repository: martin-augment/tokio
Length of output: 491
The cfg narrowing appears incorrect—worker.inject().is_closed() is called unconditionally from Core::maintenance() in the rt-multi-thread scheduler.
The review assumes no multi-thread code path calls Shared::is_closed, but evidence shows Core::maintenance() at tokio/src/runtime/scheduler/multi_thread/worker.rs:1213 calls worker.inject().is_closed() without any cfg guard. Since Inject::is_closed() is guarded by #[cfg(feature = "taskdump")], this would cause a compilation failure when rt-multi-thread is enabled without the taskdump feature. Verify whether there's a second unguarded implementation of is_closed() or whether this call should also be guarded by taskdump.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tokio/src/runtime/scheduler/inject/shared.rs` around lines 40 - 44, The
method visibility mismatch: Shared::is_closed is compiled only with
#[cfg(feature = "taskdump")] but Core::maintenance() calls
worker.inject().is_closed() unconditionally (see Core::maintenance,
worker.inject(), Inject::is_closed), causing build failures when rt-multi-thread
is enabled without taskdump; fix by making the API unconditional or guarding the
call. Either remove the cfg from Shared::is_closed (and the corresponding
Inject::is_closed) so the method always exists and simply return
synced.is_closed, or keep the cfg and add #[cfg(feature = "taskdump")] to the
call site in Core::maintenance (worker.inject().is_closed()), ensuring both
declaration and use are consistently guarded.
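Per the PR description, the sharded queue's shutdown check is backed by a single atomic set with Release ordering after all shards are closed, rather than the mutex-guarded `Synced` flag, which is why the new call site needs no feature gating. A minimal model of that lock-free check (simplified type, illustrative `publish_closed` name):

```rust
use std::sync::atomic::{AtomicBool, Ordering::{Acquire, Release}};

// Minimal model of the sharded queue's lock-free shutdown check.
struct Sharded {
    is_closed: AtomicBool,
}

impl Sharded {
    /// Called once all shards have been closed under their own locks.
    fn publish_closed(&self) {
        self.is_closed.store(true, Release);
    }

    /// Lock-free check usable from the worker hot loop, with no
    /// `cfg(feature = "taskdump")` gating required.
    fn is_closed(&self) -> bool {
        self.is_closed.load(Acquire)
    }
}

fn main() {
    let q = Sharded { is_closed: AtomicBool::new(false) };
    assert!(!q.is_closed());
    q.publish_closed();
    assert!(q.is_closed());
}
```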
value:annoying; category:bug; feedback: The CodeRabbit AI reviewer is not correct! The PR changed worker.rs:1213 to use the new Sharded::is_closed(), which uses an AtomicBool to track whether the queue is closed, so there is no need for the cfg gating.
value:annoying; category:bug; feedback: The Claude AI reviewer is not correct! Both …
value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct! Adding the same empty-queue check will save a few CPU cycles for the pop() operation as well, preventing extra work that would accomplish nothing.
value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct! By using the local boolean flag, the access to the atomic boolean could be avoided. It is a minor CPU-usage optimization.

