Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ pub struct Builder {
pub(super) unhandled_panic: UnhandledPanic,

timer_flavor: TimerFlavor,

/// Whether or not to enable eager hand-off for the I/O and time drivers (in
/// `tokio_unstable`).
enable_eager_driver_handoff: bool,
}

cfg_unstable! {
Expand Down Expand Up @@ -334,6 +338,9 @@ impl Builder {
disable_lifo_slot: false,

timer_flavor: TimerFlavor::Traditional,

// Eager driver handoff is disabled by default.
enable_eager_driver_handoff: false,
}
}

Expand Down Expand Up @@ -414,6 +421,40 @@ impl Builder {
self
}

/// Enable eager hand-off of the I/O and time drivers for multi-threaded
/// runtimes, which is disabled by default.
///
/// When this option is enabled, a worker thread which has parked on the I/O
/// or time driver will notify another worker thread once it is preparing to
/// begin polling a task from the run queue, so that the notified worker can
/// begin polling the I/O or time driver. This can reduce the latency with
/// which I/O and timer notifications are processed, especially when some
/// tasks have polls that take a long time to complete. In addition, it can
/// reduce the risk of a deadlock which may occur when a task blocks the
/// worker thread which is holding the I/O or time driver until some other
/// task, which is waiting for a notification from *that* driver, unblocks
/// it.
///
/// This option is disabled by default, as enabling it may potentially
/// increase contention due to extra synchronization in cross-driver
/// wakeups.
///
/// This option only applies to multi-threaded runtimes. Attempting to use
/// this option with any other runtime type will have no effect.
///
/// **Note**: This is an [unstable API][unstable]. Eager driver hand-off is
/// an experimental feature whose behavior may be removed or changed in 1.x
/// releases. See [the documentation on unstable features][unstable] for
/// details.
///
/// [unstable]: crate#unstable-features
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt-multi-thread"))))]
pub fn enable_eager_driver_handoff(&mut self) -> &mut Self {
self.enable_eager_driver_handoff = true;
self
}

/// Sets the number of worker threads the `Runtime` will use.
///
/// This can be any number above 0 though it is advised to keep this value
Expand Down Expand Up @@ -1661,6 +1702,10 @@ impl Builder {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
// This setting never makes sense for a current-thread runtime,
// as it only configures how the I/O driver is handed off between
// workers.
enable_eager_driver_handoff: false,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
Expand Down Expand Up @@ -1843,6 +1888,7 @@ cfg_rt_multi_thread! {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
enable_eager_driver_handoff: self.enable_eager_driver_handoff,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
Expand Down Expand Up @@ -1880,7 +1926,11 @@ impl fmt::Debug for Builder {
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
.field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
.field("before_park", &self.before_park.as_ref().map(|_| "..."))
.field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."));
.field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
.field(
"enable_eager_driver_handoff",
&self.enable_eager_driver_handoff,
);

if self.name.is_none() {
debug.finish_non_exhaustive()
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ pub(crate) struct Config {
#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,

/// If `true`, an idle worker is woken whenever a worker thread transitions
/// from polling the I/O driver to polling its own tasks (requires
/// `tokio_unstable`).
pub(crate) enable_eager_driver_handoff: bool,
}
36 changes: 26 additions & 10 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ pub(crate) struct Unparker {
inner: Arc<Inner>,
}

/// Records whether a worker thread held the I/O/time driver while it was
/// parked.
///
/// Returned by `Parker::park` / `Parker::park_timeout` so the worker can
/// decide, on wake-up, whether it should eagerly hand the driver off to
/// another worker before polling tasks. `Yes` means the park acquired the
/// driver lock and parked on the driver; `No` means it parked on the
/// condvar (or consumed a pending notification without parking at all).
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum HadDriver {
    Yes,
    No,
}

struct Inner {
/// Avoids entering the park if possible
state: AtomicUsize,
Expand Down Expand Up @@ -66,27 +73,33 @@ impl Parker {
}
}

pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.inner.park(handle);
pub(crate) fn park(&mut self, handle: &driver::Handle) -> HadDriver {
self.inner.park(handle)
}

/// Parks the current thread for up to `duration`.
///
/// This function tries to acquire the driver lock. If it succeeds, it
/// parks using the driver. Otherwise, it falls back to using a condvar,
/// unless the duration is zero, in which case it returns immediately.
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
pub(crate) fn park_timeout(
&mut self,
handle: &driver::Handle,
duration: Duration,
) -> HadDriver {
if let Some(mut driver) = self.inner.shared.driver.try_lock() {
self.inner.park_driver(&mut driver, handle, Some(duration));
self.inner.park_driver(&mut driver, handle, Some(duration))
} else if !duration.is_zero() {
self.inner.park_condvar(Some(duration));
HadDriver::No
} else {
// https://github.com/tokio-rs/tokio/issues/6536
// Hacky, but it's just for loom tests. The counter gets incremented during
// `park_timeout`, but we still have to increment the counter if we can't acquire the
// lock.
#[cfg(loom)]
CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
HadDriver::No
}
}

Expand Down Expand Up @@ -116,21 +129,22 @@ impl Unparker {

impl Inner {
/// Parks the current thread for at most `dur`.
fn park(&self, handle: &driver::Handle) {
fn park(&self, handle: &driver::Handle) -> HadDriver {
// If we were previously notified then we consume this notification and
// return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
return HadDriver::No;
}

if let Some(mut driver) = self.shared.driver.try_lock() {
self.park_driver(&mut driver, handle, None);
self.park_driver(&mut driver, handle, None)
} else {
self.park_condvar(None);
HadDriver::No
}
}

Expand Down Expand Up @@ -216,12 +230,12 @@ impl Inner {
driver: &mut Driver,
handle: &driver::Handle,
duration: Option<Duration>,
) {
) -> HadDriver {
if duration.as_ref().is_some_and(Duration::is_zero) {
// zero duration doesn't actually park the thread, it just
// polls the I/O events, timers, etc.
driver.park_timeout(handle, Duration::ZERO);
return;
return HadDriver::Yes;
}

match self
Expand All @@ -239,7 +253,7 @@ impl Inner {
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");

return;
return HadDriver::No;
}
Err(actual) => panic!("inconsistent park state; actual = {actual}"),
}
Expand All @@ -256,6 +270,8 @@ impl Inner {
PARKED_DRIVER => {} // no notification, alas
n => panic!("inconsistent park_timeout state: {n}"),
}

HadDriver::Yes
}

fn unpark(&self, driver: &driver::Handle) {
Expand Down
69 changes: 57 additions & 12 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::scheduler::multi_thread::{
idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
idle, park, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
};
use crate::runtime::scheduler::{inject, Defer, Lock};
use crate::runtime::task::OwnedTasks;
Expand Down Expand Up @@ -132,6 +132,15 @@ struct Core {
/// True if the scheduler is being traced
is_traced: bool,

/// Whether or not the worker has just returned from a park in which we
/// parked on the I/O driver.
had_driver: park::HadDriver,

/// If `true`, the worker should eagerly notify another worker when polling
/// the first task after returning from a park in which it parked on the I/O
/// or time driver.
enable_eager_driver_handoff: bool,

/// Parker
///
/// Stored in an `Option` as the parker is added / removed to make the
Expand Down Expand Up @@ -280,6 +289,8 @@ pub(super) fn create(
is_searching: false,
is_shutdown: false,
is_traced: false,
enable_eager_driver_handoff: config.enable_eager_driver_handoff,
had_driver: park::HadDriver::No,
park: Some(park),
global_queue_interval: stats.tuned_global_queue_interval(&config),
stats,
Expand Down Expand Up @@ -616,7 +627,30 @@ impl Context {

// Make sure the worker is not in the **searching** state. This enables
// another idle worker to try to steal work.
core.transition_from_searching(&self.worker);
let notified_parked_worker = core.transition_from_searching(&self.worker);

// If the setting to wake eagerly when releasing the I/O driver is
// enabled, and this worker had the driver, wake a parked worker to come
// grab it from us.
//
// Note that this is only done when we are *actually* about to poll a
// task, rather than whenever the worker has unparked. When the worker
// has been unparked, it may not actually have any tasks to poll, and if
// it's still holding the I/O driver, it should just go back to polling
// the driver again, rather than trying to wake someone else spuriously.
//
// Note that this explicitly checks `cfg!(tokio_unstable)` in addition,
// as that should result in this whole expression being eliminated at
// compile-time when unstable features are disabled.
if cfg!(tokio_unstable)
&& core.enable_eager_driver_handoff
&& core.had_driver == park::HadDriver::Yes
&& !notified_parked_worker
// don't do it a second time
{
core.had_driver = park::HadDriver::No;
self.worker.handle.notify_parked_local();
}
Comment on lines +645 to +653
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The had_driver state should be reset to No even if notified_parked_worker is true. If a worker transitions from searching and notifies another worker, it has already fulfilled the "handoff" requirement. If had_driver is not reset in this case, the worker might trigger a redundant notification when polling the next task in its queue, as notified_parked_worker will be false for subsequent tasks while had_driver remains Yes.

Suggested change
if cfg!(tokio_unstable)
&& core.enable_eager_driver_handoff
&& core.had_driver == park::HadDriver::Yes
&& !notified_parked_worker
// don't do it a second time
{
core.had_driver = park::HadDriver::No;
self.worker.handle.notify_parked_local();
}
if cfg!(tokio_unstable)
&& core.enable_eager_driver_handoff
&& core.had_driver == park::HadDriver::Yes
{
core.had_driver = park::HadDriver::No;
if !notified_parked_worker {
self.worker.handle.notify_parked_local();
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

had_driver Not Reset When Searching Transition Notifies

Medium Severity

When transition_from_searching already notified a parked worker (notified_parked_worker == true), the eager-handoff block is skipped — but core.had_driver is never reset to HadDriver::No. On the very next call to run_task for the following task, is_searching is already false, so transition_from_searching returns false, making !notified_parked_worker true, and the condition fires an extra spurious notify_parked_local(). The had_driver reset should happen unconditionally whenever had_driver == HadDriver::Yes, with the additional notify_parked_local() call guarded only by !notified_parked_worker.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit e277211. Configure here.

Comment on lines +630 to +653
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reset had_driver on the first post-park task, even when searching already woke someone.

If transition_from_searching() returns true, this first task already consumed the post-driver state, but core.had_driver stays Yes. The next run_task can then call notify_parked_local() again from the same park cycle, which defeats the intended one-shot handoff and adds extra wakeups.

💡 Suggested fix
         let notified_parked_worker = core.transition_from_searching(&self.worker);
+        let had_driver = core.had_driver == park::HadDriver::Yes;
+        core.had_driver = park::HadDriver::No;
 
         // If the setting to wake eagerly when releasing the I/O driver is
         // enabled, and this worker had the driver, wake a parked worker to come
         // grab it from us.
@@
         if cfg!(tokio_unstable)
             && core.enable_eager_driver_handoff
-            && core.had_driver == park::HadDriver::Yes
+            && had_driver
             && !notified_parked_worker
-        // don't do it a second time
         {
-            core.had_driver = park::HadDriver::No;
             self.worker.handle.notify_parked_local();
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let notified_parked_worker = core.transition_from_searching(&self.worker);
// If the setting to wake eagerly when releasing the I/O driver is
// enabled, and this worker had the driver, wake a parked worker to come
// grab it from us.
//
// Note that this is only done when we are *actually* about to poll a
// task, rather than whenever the worker has unparked. When the worker
// has been unparked, it may not actually have any tasks to poll, and if
// it's still holding the I/O driver, it should just go back to polling
// the driver again, rather than trying to wake someone else spuriously.
//
// Note that this explicitly checks `cfg!(tokio_unstable)` in addition,
// as that should result in this whole expression being eliminated at
// compile-time when unstable features are disabled.
if cfg!(tokio_unstable)
&& core.enable_eager_driver_handoff
&& core.had_driver == park::HadDriver::Yes
&& !notified_parked_worker
// don't do it a second time
{
core.had_driver = park::HadDriver::No;
self.worker.handle.notify_parked_local();
}
let notified_parked_worker = core.transition_from_searching(&self.worker);
let had_driver = core.had_driver == park::HadDriver::Yes;
core.had_driver = park::HadDriver::No;
// If the setting to wake eagerly when releasing the I/O driver is
// enabled, and this worker had the driver, wake a parked worker to come
// grab it from us.
//
// Note that this is only done when we are *actually* about to poll a
// task, rather than whenever the worker has unparked. When the worker
// has been unparked, it may not actually have any tasks to poll, and if
// it's still holding the I/O driver, it should just go back to polling
// the driver again, rather than trying to wake someone else spuriously.
//
// Note that this explicitly checks `cfg!(tokio_unstable)` in addition,
// as that should result in this whole expression being eliminated at
// compile-time when unstable features are disabled.
if cfg!(tokio_unstable)
&& core.enable_eager_driver_handoff
&& had_driver
&& !notified_parked_worker
{
self.worker.handle.notify_parked_local();
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/runtime/scheduler/multi_thread/worker.rs` around lines 630 - 653,
transition_from_searching() can return true meaning the post-driver state was
already consumed, but core.had_driver remains Yes and can cause a second notify
later; change the logic so that when cfg!(tokio_unstable) &&
core.enable_eager_driver_handoff && core.had_driver == park::HadDriver::Yes you
always reset core.had_driver = park::HadDriver::No immediately, and only call
self.worker.handle.notify_parked_local() conditionally when
notified_parked_worker is false. In other words, move or duplicate the
core.had_driver = No assignment out of the inner conditional so the had_driver
flag is cleared on the first post-park task, and keep notify_parked_local
guarded by !notified_parked_worker; refer to transition_from_searching,
core.had_driver, core.enable_eager_driver_handoff, notified_parked_worker, and
self.worker.handle.notify_parked_local.


self.assert_lifo_enabled_is_correct(&core);

Expand Down Expand Up @@ -825,11 +859,11 @@ impl Context {
};

// Park thread
if let Some(timeout) = duration {
park.park_timeout(&self.worker.handle.driver, timeout);
let had_driver = if let Some(timeout) = duration {
park.park_timeout(&self.worker.handle.driver, timeout)
} else {
park.park(&self.worker.handle.driver);
}
park.park(&self.worker.handle.driver)
};

self.defer.wake();

Expand All @@ -854,6 +888,8 @@ impl Context {

// Place `park` back in `core`
core.park = Some(park);
core.had_driver = had_driver;

if core.should_notify_others() {
self.worker.handle.notify_parked_local();
}
Expand Down Expand Up @@ -1117,13 +1153,13 @@ impl Core {
self.is_searching
}

fn transition_from_searching(&mut self, worker: &Worker) {
fn transition_from_searching(&mut self, worker: &Worker) -> bool {
if !self.is_searching {
return;
return false;
}

self.is_searching = false;
worker.handle.transition_worker_from_searching();
worker.handle.transition_worker_from_searching()
}

fn has_tasks(&self) -> bool {
Expand Down Expand Up @@ -1370,12 +1406,18 @@ impl Handle {
}
}

fn notify_parked_local(&self) {
/// Notify a parked worker.
///
/// Returns `true` if a worker was notified, `false` otherwise.
fn notify_parked_local(&self) -> bool {
super::counters::inc_num_inc_notify_local();

if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
super::counters::inc_num_unparks_local();
self.shared.remotes[index].unpark.unpark(&self.driver);
true
} else {
false
}
}

Expand Down Expand Up @@ -1404,11 +1446,14 @@ impl Handle {
}
}

fn transition_worker_from_searching(&self) {
/// Returns `true` if another parked worker was notified, `false` otherwise.
fn transition_worker_from_searching(&self) -> bool {
if self.shared.idle.transition_worker_from_searching() {
// We are the final searching worker. Because work was found, we
// need to notify another worker.
self.notify_parked_local();
self.notify_parked_local()
} else {
false
}
}

Expand Down
Loading
Loading