From c639dd78de91178b6f7ac9348fffaf50e8dbd941 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Thu, 19 Feb 2026 15:28:38 -0500 Subject: [PATCH 1/3] runtime: add `worker_id()` getter Add `tokio::runtime::worker_id()` which returns the 0-based index of the current runtime worker thread. The index matches the worker indices used by `RuntimeMetrics` (e.g. `worker_total_busy_duration`), making it possible to correlate per-worker metrics with specific tasks. Returns `None` from blocking threads, the `block_on` caller thread (on the multi-thread runtime), and non-Tokio threads. For the current-thread runtime, always returns `Some(0)`. --- tokio/src/runtime/context.rs | 5 + tokio/src/runtime/mod.rs | 34 +++++++ tokio/src/runtime/scheduler/mod.rs | 9 ++ .../runtime/scheduler/multi_thread/worker.rs | 5 + tokio/tests/rt_worker_id.rs | 98 +++++++++++++++++++ 5 files changed, 151 insertions(+) create mode 100644 tokio/tests/rt_worker_id.rs diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index d78935e7243..5c0d9d816b6 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -159,6 +159,11 @@ cfg_rt! { CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None) } + #[cfg(tokio_unstable)] + pub(crate) fn worker_id() -> Option { + with_scheduler(|ctx| ctx.and_then(|c| c.worker_id())) + } + #[track_caller] pub(crate) fn defer(waker: &Waker) { with_scheduler(|maybe_scheduler| { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 77cf183dc56..bdce5dd1217 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -569,6 +569,40 @@ cfg_rt! { mod local_runtime; pub use local_runtime::{LocalRuntime, LocalOptions}; + + /// Returns the index of the current worker thread, if called from a + /// runtime worker thread. + /// + /// The returned value is a 0-based index matching the worker indices + /// used by [`RuntimeMetrics`] methods such as + /// [`worker_total_busy_duration`](RuntimeMetrics::worker_total_busy_duration). + /// + /// Returns `None` when called from outside a runtime worker thread + /// (for example, from a blocking thread or a non-Tokio thread). On the + /// multi-thread runtime, the thread that calls [`Runtime::block_on`] is + /// not a worker thread, so this also returns `None` there. + /// + /// For the current-thread runtime, this always returns `Some(0)` + /// (including inside `block_on`, since the calling thread *is* the + /// worker thread). + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// #[tokio::main(flavor = "multi_thread", worker_threads = 4)] + /// async fn main() { + /// let id = tokio::spawn(async { + /// tokio::runtime::worker_id() + /// }).await.unwrap(); + /// println!("Task ran on worker {:?}", id); + /// } + /// # } + /// ``` + pub fn worker_id() -> Option { + context::worker_id() + } } cfg_taskdump! { diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 3f142120d33..2db10b709f5 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -293,6 +293,15 @@ cfg_rt! { match_flavor!(self, Context(context) => context.defer(waker)); } + #[cfg(tokio_unstable)] + pub(crate) fn worker_id(&self) -> Option { + match self { + Context::CurrentThread(_) => Some(0), + #[cfg(feature = "rt-multi-thread")] + Context::MultiThread(context) => Some(context.worker_index()), + } + } + #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))] pub(crate) fn with_time_temp_local_context(&self, f: F) -> R where diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index f48e6ba5271..72bdc2bd31c 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -1006,6 +1006,11 @@ impl Context { None => f(None), }) } + + #[cfg(tokio_unstable)] + pub(crate) fn worker_index(&self) -> usize { + self.worker.index + } } impl Core { diff --git a/tokio/tests/rt_worker_id.rs b/tokio/tests/rt_worker_id.rs new file mode 100644 index 00000000000..2cb4f3206e3 --- /dev/null +++ b/tokio/tests/rt_worker_id.rs @@ -0,0 +1,98 @@ +#![warn(rust_2018_idioms)] +#![cfg(all( + feature = "full", + tokio_unstable, + not(target_os = "wasi"), + target_has_atomic = "64" +))] + +use tokio::runtime::{self, Runtime}; + +#[test] +fn worker_id_multi_thread() { + let rt = Runtime::new().unwrap(); + rt.block_on(async { + let id = tokio::task::spawn(async { runtime::worker_id() }) + .await + .unwrap(); + let num_workers = rt.metrics().num_workers(); + let id = id.expect("should be Some on worker thread"); + assert!( + id < num_workers, + "worker_id {id} >= num_workers {num_workers}" + ); + }); +} + +#[test] +fn worker_id_current_thread() { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let id = runtime::worker_id(); + assert_eq!(id, Some(0)); + }); +} + +#[test] +fn worker_id_outside_runtime() { + assert_eq!(runtime::worker_id(), None); +} + +#[test] +fn worker_id_matches_metrics_worker_thread_id() { + let rt = runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap(); + let metrics = rt.metrics(); + + rt.block_on(async { + // Spawn a task and verify the worker_id matches the metrics index + tokio::task::spawn(async move { + let wid = runtime::worker_id().expect("should be on worker thread"); + let current_thread = std::thread::current().id(); + let metrics_thread = metrics.worker_thread_id(wid); + assert_eq!( + metrics_thread, + Some(current_thread), + "worker_id() returned {wid} but metrics.worker_thread_id({wid}) \ + does not match current thread" + ); + }) + .await + .unwrap(); + }); +} + +#[test] +fn worker_id_from_spawn_blocking() { + let rt = Runtime::new().unwrap(); + rt.block_on(async { + let id = tokio::task::spawn_blocking(runtime::worker_id) + .await + .unwrap(); + assert_eq!(id, None, "spawn_blocking should not be on a worker thread"); + }); +} + +#[test] +fn worker_id_block_on_multi_thread() { + let rt = Runtime::new().unwrap(); + // block_on runs on the calling thread, not a worker thread + let id = rt.block_on(async { runtime::worker_id() }); + assert_eq!( + id, None, + "block_on thread is not a worker thread on multi-thread runtime" + ); +} + +#[tokio::main(flavor = "current_thread")] +#[test] +async fn worker_id_tokio_main_current_thread() { + // current_thread block_on runs on the worker, so this is Some(0) + assert_eq!(runtime::worker_id(), Some(0)); +} From 57d798c86cb20e537850bafe4b5bd096a7740ef7 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Sat, 21 Feb 2026 20:15:28 -0500 Subject: [PATCH 2/3] Rename worker_id to worker_index --- tokio/src/runtime/context.rs | 4 +- tokio/src/runtime/mod.rs | 10 ++-- tokio/src/runtime/scheduler/mod.rs | 2 +- .../{rt_worker_id.rs => rt_worker_index.rs} | 46 +++++++++---------- 4 files changed, 31 insertions(+), 31 deletions(-) rename tokio/tests/{rt_worker_id.rs => rt_worker_index.rs} (54%) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 5c0d9d816b6..3bf9919404f 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -160,8 +160,8 @@ cfg_rt! { } #[cfg(tokio_unstable)] - pub(crate) fn worker_id() -> Option { - with_scheduler(|ctx| ctx.and_then(|c| c.worker_id())) + pub(crate) fn worker_index() -> Option { + with_scheduler(|ctx| ctx.and_then(|c| c.worker_index())) } #[track_caller] diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index bdce5dd1217..f1cc39b3d92 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -593,15 +593,15 @@ cfg_rt! { /// # { /// #[tokio::main(flavor = "multi_thread", worker_threads = 4)] /// async fn main() { - /// let id = tokio::spawn(async { - /// tokio::runtime::worker_id() + /// let index = tokio::spawn(async { + /// tokio::runtime::worker_index() /// }).await.unwrap(); - /// println!("Task ran on worker {:?}", id); + /// println!("Task ran on worker {:?}", index); /// } /// # } /// ``` - pub fn worker_id() -> Option { - context::worker_id() + pub fn worker_index() -> Option { + context::worker_index() } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 2db10b709f5..f991e8abda7 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -294,7 +294,7 @@ cfg_rt! { } #[cfg(tokio_unstable)] - pub(crate) fn worker_id(&self) -> Option { + pub(crate) fn worker_index(&self) -> Option { match self { Context::CurrentThread(_) => Some(0), #[cfg(feature = "rt-multi-thread")] diff --git a/tokio/tests/rt_worker_id.rs b/tokio/tests/rt_worker_index.rs similarity index 54% rename from tokio/tests/rt_worker_id.rs rename to tokio/tests/rt_worker_index.rs index 2cb4f3206e3..116bc2f8d2a 100644 --- a/tokio/tests/rt_worker_id.rs +++ b/tokio/tests/rt_worker_index.rs @@ -9,40 +9,40 @@ use tokio::runtime::{self, Runtime}; #[test] -fn worker_id_multi_thread() { +fn worker_index_multi_thread() { let rt = Runtime::new().unwrap(); rt.block_on(async { - let id = tokio::task::spawn(async { runtime::worker_id() }) + let index = tokio::task::spawn(async { runtime::worker_index() }) .await .unwrap(); let num_workers = rt.metrics().num_workers(); - let id = id.expect("should be Some on worker thread"); + let index = index.expect("should be Some on worker thread"); assert!( - id < num_workers, - "worker_id {id} >= num_workers {num_workers}" + index < num_workers, + "worker_index {index} >= num_workers {num_workers}" ); }); } #[test] -fn worker_id_current_thread() { +fn worker_index_current_thread() { let rt = runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); rt.block_on(async { - let id = runtime::worker_id(); - assert_eq!(id, Some(0)); + let index = runtime::worker_index(); + assert_eq!(index, Some(0)); }); } #[test] -fn worker_id_outside_runtime() { - assert_eq!(runtime::worker_id(), None); +fn worker_index_outside_runtime() { + assert_eq!(runtime::worker_index(), None); } #[test] -fn worker_id_matches_metrics_worker_thread_id() { +fn worker_index_matches_metrics_worker_thread_id() { let rt = runtime::Builder::new_multi_thread() .worker_threads(4) .enable_all() @@ -51,15 +51,15 @@ fn worker_id_matches_metrics_worker_thread_id() { let metrics = rt.metrics(); rt.block_on(async { - // Spawn a task and verify the worker_id matches the metrics index + // Spawn a task and verify the worker_index matches the metrics index tokio::task::spawn(async move { - let wid = runtime::worker_id().expect("should be on worker thread"); + let index = runtime::worker_index().expect("should be on worker thread"); let current_thread = std::thread::current().id(); - let metrics_thread = metrics.worker_thread_id(wid); + let metrics_thread = metrics.worker_thread_id(index); assert_eq!( metrics_thread, Some(current_thread), - "worker_id() returned {wid} but metrics.worker_thread_id({wid}) \ + "worker_index() returned {index} but metrics.worker_thread_id({index}) \ does not match current thread" ); }) @@ -69,30 +69,30 @@ fn worker_id_matches_metrics_worker_thread_id() { } #[test] -fn worker_id_from_spawn_blocking() { +fn worker_index_from_spawn_blocking() { let rt = Runtime::new().unwrap(); rt.block_on(async { - let id = tokio::task::spawn_blocking(runtime::worker_id) + let index = tokio::task::spawn_blocking(runtime::worker_index) .await .unwrap(); - assert_eq!(id, None, "spawn_blocking should not be on a worker thread"); + assert_eq!(index, None, "spawn_blocking should not be on a worker thread"); }); } #[test] -fn worker_id_block_on_multi_thread() { +fn worker_index_block_on_multi_thread() { let rt = Runtime::new().unwrap(); // block_on runs on the calling thread, not a worker thread - let id = rt.block_on(async { runtime::worker_id() }); + let index = rt.block_on(async { runtime::worker_index() }); assert_eq!( - id, None, + index, None, "block_on thread is not a worker thread on multi-thread runtime" ); } #[tokio::main(flavor = "current_thread")] #[test] -async fn worker_id_tokio_main_current_thread() { +async fn worker_index_tokio_main_current_thread() { // current_thread block_on runs on the worker, so this is Some(0) - assert_eq!(runtime::worker_id(), Some(0)); + assert_eq!(runtime::worker_index(), Some(0)); } From f6c57e4456d4e87ee810645b690e42a0aa989d4a Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Wed, 25 Feb 2026 09:41:59 -0500 Subject: [PATCH 3/3] Format code --- tokio/tests/rt_worker_index.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tokio/tests/rt_worker_index.rs b/tokio/tests/rt_worker_index.rs index 116bc2f8d2a..873c5247d4a 100644 --- a/tokio/tests/rt_worker_index.rs +++ b/tokio/tests/rt_worker_index.rs @@ -75,7 +75,10 @@ fn worker_index_from_spawn_blocking() { let index = tokio::task::spawn_blocking(runtime::worker_index) .await .unwrap(); - assert_eq!(index, None, "spawn_blocking should not be on a worker thread"); + assert_eq!( + index, None, + "spawn_blocking should not be on a worker thread" + ); }); } @@ -89,10 +92,3 @@ fn worker_index_block_on_multi_thread() { "block_on thread is not a worker thread on multi-thread runtime" ); } - -#[tokio::main(flavor = "current_thread")] -#[test] -async fn worker_index_tokio_main_current_thread() { - // current_thread block_on runs on the worker, so this is Some(0) - assert_eq!(runtime::worker_index(), Some(0)); -}