From 096b3a4a10095bdb5661c6e9faad4603fa228079 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Thu, 5 Mar 2026 12:09:34 -0500 Subject: [PATCH 1/8] metrics: add task schedule latency metric This new metric tracks the amount of time between when a task is scheduled and when it is polled, also known as queue delay. This duration is recorded in a histogram, just like the poll time metric. This metric is useful for implementing queue management algorithms in systems using tokio. For example, it could be used to implement a generic http load shedder using the CoDel algorithm. --- tokio/src/runtime/builder.rs | 137 +++++++++++++ tokio/src/runtime/config.rs | 3 + tokio/src/runtime/metrics/batch.rs | 24 ++- tokio/src/runtime/metrics/runtime.rs | 184 ++++++++++++++++++ tokio/src/runtime/metrics/worker.rs | 7 + .../runtime/scheduler/current_thread/mod.rs | 51 +++-- .../runtime/scheduler/multi_thread/stats.rs | 4 +- .../runtime/scheduler/multi_thread/worker.rs | 13 +- tokio/src/runtime/task/core.rs | 27 +++ tokio/src/runtime/task/mod.rs | 14 ++ tokio/tests/rt_unstable_metrics.rs | 47 +++++ 11 files changed, 488 insertions(+), 23 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index aea4de7503e..8519cc2027f 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -133,6 +133,10 @@ pub struct Builder { /// Configures the task poll count histogram pub(super) metrics_poll_count_histogram: HistogramBuilder, + pub(super) metrics_schedule_latency_histogram_enable: bool, + + pub(super) metrics_schedule_latency_histogram: HistogramBuilder, + #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, @@ -323,6 +327,10 @@ impl Builder { metrics_poll_count_histogram: HistogramBuilder::default(), + metrics_schedule_latency_histogram_enable: false, + + metrics_schedule_latency_histogram: HistogramBuilder::default(), + disable_lifo_slot: false, timer_flavor: TimerFlavor::Traditional, @@ -1556,6 +1564,124 @@ impl Builder { self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets); self } + + /// Enables tracking the distribution of task schedule latencies. Task + /// schedule latency is the time between when a task is scheduled for + /// execution and when it is polled. + /// + /// Task schedule latencies are not instrumented by default as doing + /// so requires calling [`Instant::now()`] twice per task poll, which + /// could add measurable overhead. Use the [`Handle::metrics()`] to + /// access the metrics data. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`metrics_schedule_latency_histogram_configuration()`] + /// to select [`LogHistogram`] instead. + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap(); + /// # // Test default values here + /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } + /// # let m = rt.handle().metrics(); + /// # assert_eq!(m.schedule_latency_histogram_num_buckets(), 10); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(0), us(0)..us(100)); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(1), us(100)..us(200)); + /// # } + /// ``` + /// + /// [`Handle::metrics()`]: crate::runtime::Handle::metrics + /// [`Instant::now()`]: std::time::Instant::now + /// [`LogHistogram`]: crate::runtime::LogHistogram + /// [`metrics_schedule_latency_histogram_configuration()`]: Builder::metrics_schedule_latency_histogram_configuration + pub fn enable_metrics_schedule_latency_histogram(&mut self) -> &mut Self { + self.metrics_schedule_latency_histogram_enable = true; + self + } + + /// Configure the histogram for tracking task schedule latencies. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`LogHistogram`] instead. + /// + /// # Examples + /// Configure a [`LogHistogram`] with [default configuration]: + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::default()) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a linear histogram with 100 buckets, each 10μs wide + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use std::time::Duration; + /// use tokio::runtime::HistogramConfiguration; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::linear(Duration::from_micros(10), 100) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a [`LogHistogram`] with the following settings: + /// - Measure times from 100ns to 120s + /// - Max error of 0.1 + /// - No more than 1024 buckets + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use std::time::Duration; + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::builder() + /// .max_value(Duration::from_secs(120)) + /// .min_value(Duration::from_nanos(100)) + /// .max_error(0.1) + /// .max_buckets(1024) + /// .expect("configuration uses 488 buckets") + /// ) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// [`LogHistogram`]: crate::runtime::LogHistogram + pub fn metrics_schedule_latency_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { + self.metrics_schedule_latency_histogram.histogram_type = configuration.inner; + self + } } fn build_current_thread_runtime(&mut self) -> io::Result { @@ -1631,6 +1757,8 @@ impl Builder { disable_lifo_slot: self.disable_lifo_slot, seed_generator: seed_generator_1, metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), + metrics_schedule_latency_histogram: self + .metrics_schedule_latency_histogram_builder(), }, local_tid, ); @@ -1649,6 +1777,14 @@ impl Builder { None } } + + fn metrics_schedule_latency_histogram_builder(&self) -> Option { + if self.metrics_schedule_latency_histogram_enable { + Some(self.metrics_schedule_latency_histogram.clone()) + } else { + None + } + } } cfg_io_driver! { @@ -1812,6 +1948,7 @@ cfg_rt_multi_thread! { disable_lifo_slot: self.disable_lifo_slot, seed_generator: seed_generator_1, metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), + metrics_schedule_latency_histogram: self.metrics_schedule_latency_histogram_builder(), }, self.timer_flavor, ); diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index b79df96e1e2..549af9975ab 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -48,6 +48,9 @@ pub(crate) struct Config { /// How to build poll time histograms pub(crate) metrics_poll_count_histogram: Option, + /// How to build schedule latency histograms + pub(crate) metrics_schedule_latency_histogram: Option, + #[cfg(tokio_unstable)] /// How to respond to unhandled task panics. pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index fe2f4a9da4e..afab4a3cdab 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -53,6 +53,9 @@ pub(crate) struct MetricsBatch { #[cfg(tokio_unstable)] /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, + + #[cfg(tokio_unstable)] + schedule_latencies: Option, } cfg_unstable_metrics! { @@ -95,6 +98,10 @@ impl MetricsBatch { poll_started_at: now, }) }); + let schedule_latencies = worker_metrics + .schedule_latency_histogram + .as_ref() + .map(HistogramBatch::from_histogram); MetricsBatch { park_count: 0, park_unpark_count: 0, @@ -108,6 +115,7 @@ impl MetricsBatch { busy_duration_total: 0, processing_scheduled_tasks_started_at: maybe_now, poll_timer, + schedule_latencies, } } } @@ -155,6 +163,11 @@ impl MetricsBatch { let dst = worker.poll_count_histogram.as_ref().unwrap(); poll_timer.poll_counts.submit(dst); } + + if let Some(schedule_latencies) = &self.schedule_latencies { + let dst = worker.schedule_latency_histogram.as_ref().unwrap(); + schedule_latencies.submit(dst); + } } } } @@ -206,15 +219,22 @@ impl MetricsBatch { cfg_metrics_variant! { stable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self) {} + pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option) {} }, unstable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self) { + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { self.poll_count += 1; if let Some(poll_timer) = &mut self.poll_timer { poll_timer.poll_started_at = Instant::now(); } + if let Some(task_scheduled_at) = task_scheduled_at { + if let Some(schedule_latencies) = &mut self.schedule_latencies { + let now = self.poll_timer.as_ref().map(|p| p.poll_started_at).unwrap_or_else(Instant::now); + let elapsed = duration_as_u64(now - task_scheduled_at); + schedule_latencies.measure(elapsed, 1); + } + } } } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 8aeb608bd02..187d41ed231 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -571,6 +571,124 @@ impl RuntimeMetrics { pub fn blocking_queue_depth(&self) -> usize { self.handle.inner.blocking_queue_depth() } + + /// Returns `true` if the runtime is tracking the distribution of task + /// schedule latencies. + /// + /// Task schedule latencies times are not instrumented by default as doing + /// so requires calling [`Instant::now()`] twice per task poll. The feature + /// is enabled by calling [`enable_metrics_schedule_latency_histogram()`] + /// when building the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let enabled = metrics.schedule_latency_histogram_enabled(); + /// + /// println!("Tracking task schedule latency distribution: {:?}", enabled); + /// }); + /// } + /// ``` + /// + /// [`enable_metrics_schedule_latency_histogram()`]: crate::runtime::Builder::enable_metrics_schedule_latency_histogram + /// [`Instant::now()`]: std::time::Instant::now + pub fn schedule_latency_histogram_enabled(&self) -> bool { + self.handle.inner.worker_metrics(0).schedule_latency_histogram.is_some() + } + + /// Returns the number of histogram buckets tracking the distribution of + /// task schedule latencies. + /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// println!("Histogram buckets: {:?}", buckets); + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + pub fn schedule_latency_histogram_num_buckets(&self) -> usize { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| histogram.num_buckets()) + .unwrap_or_default() + } + + /// Returns the range of task schedule latencies tracked by the given bucket. + /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Panics + /// + /// The method panics if `bucket` represents an invalid bucket index, i.e. + /// is greater than or equal to `schedule_latency_histogram_num_buckets()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// println!("Histogram bucket {} range: {:?}", i, range); + /// } + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + pub fn schedule_latency_histogram_bucket_range(&self, bucket: usize) -> Range { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| { + let range = histogram.bucket_range(bucket); + std::ops::Range { + start: Duration::from_nanos(range.start), + end: Duration::from_nanos(range.end), + } + }) + .unwrap_or_default() + } } feature! { @@ -1027,6 +1145,72 @@ impl RuntimeMetrics { .load(Relaxed); Duration::from_nanos(nanos) } + + /// Returns the number of times the given worker polled tasks with a schedule + /// latency within the given bucket's range. + /// + /// Each worker maintains its own histogram and the counts for each bucket + /// starts at zero when the runtime is created. Each time the worker polls a + /// task, it tracks the time elapsed between when the task was scheduled and + /// when it was polled and increments the associated bucket by 1. + /// + /// Each bucket is a monotonically increasing counter. It is never + /// decremented or reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// `bucket` is the index of the bucket being queried. The bucket is scoped + /// to the worker. The range represented by the bucket can be queried by + /// calling [`schedule_latency_histogram_bucket_range()`]. Each worker maintains + /// identical bucket ranges. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()` or if `bucket` represents an + /// invalid bucket. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for worker in 0..metrics.num_workers() { + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// let count = metrics.schedule_latency_histogram_bucket_count(worker, i); + /// println!("{} tasks encountered a scheduling latency between {}us and {}us", count, range.start.as_micros(), range.end.as_micros()); + /// } + /// } + /// }); + /// } + /// ``` + /// + /// [`schedule_latency_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::schedule_latency_histogram_bucket_range + #[track_caller] + pub fn schedule_latency_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .schedule_latency_histogram + .as_ref() + .map(|histogram| histogram.get(bucket)) + .unwrap_or_default() + } } feature! { diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 30926b2a6c2..7d3f0ce40cd 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -65,6 +65,9 @@ pub(crate) struct WorkerMetrics { #[cfg(tokio_unstable)] /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, + + #[cfg(tokio_unstable)] + pub(super) schedule_latency_histogram: Option, } impl WorkerMetrics { @@ -93,6 +96,10 @@ impl WorkerMetrics { .metrics_poll_count_histogram .as_ref() .map(|histogram_builder| histogram_builder.build()); + worker_metrics.schedule_latency_histogram = config + .metrics_schedule_latency_histogram + .as_ref() + .map(|histogram_builder| histogram_builder.build()); worker_metrics } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 68ab17f1402..e4e1f8d0f9a 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -3,7 +3,8 @@ use crate::loom::sync::Arc; use crate::runtime::driver::{self, Driver}; use crate::runtime::scheduler::{self, Defer, Inject}; use crate::runtime::task::{ - self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks, + self, JoinHandle, LocalNotified, OwnedTasks, Schedule, SpawnLocation, Task, + TaskHarnessScheduleHooks, }; use crate::runtime::{ blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics, @@ -363,11 +364,28 @@ fn wake_deferred_tasks_and_free(context: &Context) { impl Context { /// Execute the closure with the given scheduler core stored in the /// thread-local context. - fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { - core.metrics.start_poll(); - let mut ret = self.enter(core, || crate::task::coop::budget(f)); - ret.0.metrics.end_poll(); - ret + fn run_task(&self, task: LocalNotified>, mut core: Box) -> Box { + #[cfg(tokio_unstable)] + let task_meta = task.task_meta(); + + #[cfg(tokio_unstable)] + core.metrics.start_poll(task.get_scheduled_at()); + #[cfg(not(tokio_unstable))] + core.metrics.start_poll(None); + + let (mut c, ()) = self.enter(core, || { + crate::task::coop::budget(|| { + #[cfg(tokio_unstable)] + self.handle.task_hooks.poll_start_callback(&task_meta); + + task.run(); + + #[cfg(tokio_unstable)] + self.handle.task_hooks.poll_stop_callback(&task_meta); + }) + }); + c.metrics.end_poll(); + c } /// Blocks the current thread until an event is received by the driver, @@ -657,6 +675,14 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. + #[cfg(tokio_unstable)] + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } + context::with_scheduler(|maybe_cx| match maybe_cx { Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { let mut core = cx.core.borrow_mut(); @@ -806,18 +832,7 @@ impl CoreGuard<'_> { let task = context.handle.shared.owned.assert_owner(task); - #[cfg(tokio_unstable)] - let task_meta = task.task_meta(); - - let (c, ()) = context.run_task(core, || { - #[cfg(tokio_unstable)] - context.handle.task_hooks.poll_start_callback(&task_meta); - - task.run(); - - #[cfg(tokio_unstable)] - context.handle.task_hooks.poll_stop_callback(&task_meta); - }); + let c = context.run_task(task, core); core = c; } diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index c59d4373ab8..840d88c66f0 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -113,8 +113,8 @@ impl Stats { } } - pub(crate) fn start_poll(&mut self) { - self.batch.start_poll(); + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { + self.batch.start_poll(task_scheduled_at); self.tasks_polled_in_batch += 1; } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 72bdc2bd31c..c51d7e60aac 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -629,7 +629,10 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. - core.stats.start_poll(); + #[cfg(tokio_unstable)] + core.stats.start_poll(task.get_scheduled_at()); + #[cfg(not(tokio_unstable))] + core.stats.start_poll(None); // Make the core available to the runtime context *self.core.borrow_mut() = Some(core); @@ -1271,6 +1274,14 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. + #[cfg(tokio_unstable)] + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } + with_current(|maybe_cx| { if let Some(cx) = maybe_cx { // Make sure the task is part of the **current** scheduler. diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index aa3f61a2217..c94bcc3e991 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -32,6 +32,8 @@ use std::panic::Location; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; +#[cfg(tokio_unstable)] +use std::time::Instant; /// The task cell. Contains the components of the task. /// @@ -191,6 +193,10 @@ pub(crate) struct Header { /// The tracing ID for this instrumented task. #[cfg(all(tokio_unstable, feature = "tracing"))] pub(super) tracing_id: Option, + + /// The last time this task was scheduled. Used to measure schedule latency. + #[cfg(tokio_unstable)] + pub(super) scheduled_at: UnsafeCell>, } unsafe impl Send for Header {} @@ -247,6 +253,8 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, + #[cfg(tokio_unstable)] + scheduled_at: UnsafeCell::new(None), } } @@ -534,6 +542,25 @@ impl Header { pub(super) unsafe fn get_tracing_id(me: &NonNull
) -> Option<&tracing::Id> { me.as_ref().tracing_id.as_ref() } + + /// Updates the last time this task was scheduled. Used to calculate + /// the time elapsed between task scheduling and polling. + /// + /// # Safety + /// + /// The caller must guarantee exclusive access to this field. + #[cfg(tokio_unstable)] + pub(super) unsafe fn set_scheduled_at(&self, now: Instant) { + self.scheduled_at.with_mut(|ptr| *ptr = Some(now)); + } + + /// Gets the last time this task was scheduled. + #[cfg(tokio_unstable)] + pub(super) fn get_scheduled_at(&self) -> Option { + // Safety: If there are concurrent writes, then that write has violated + // the safety requirements on `set_scheduled_at`. + unsafe { self.scheduled_at.with(|ptr| *ptr) } + } } impl Trailer { diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 53c477d52de..2689d6bd1f7 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -225,6 +225,8 @@ use crate::runtime::TaskCallback; use std::marker::PhantomData; use std::panic::Location; use std::ptr::NonNull; +#[cfg(tokio_unstable)] +use std::time::Instant; use std::{fmt, mem}; /// An owned handle to the task, tracked by ref count. @@ -247,6 +249,13 @@ impl Notified { pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> { self.0.task_meta() } + + #[cfg(tokio_unstable)] + pub(crate) unsafe fn set_scheduled_at(&self, now: Instant) { + unsafe { + self.0.header().set_scheduled_at(now); + } + } } // safety: This type cannot be used to touch the task without first verifying @@ -268,6 +277,11 @@ impl LocalNotified { pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> { self.task.task_meta() } + + #[cfg(tokio_unstable)] + pub(crate) fn get_scheduled_at(&self) -> Option { + self.task.header().get_scheduled_at() + } } /// A task that is not owned by any `OwnedTasks`. Used for blocking tasks. diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index b6de3159134..1086e14f206 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -800,6 +800,53 @@ fn io_driver_ready_count() { assert_eq!(metrics.io_driver_ready_count(), 1); } +#[test] +fn schedule_latency_counts() { + const N: u64 = 50; + let rts = [ + tokio::runtime::Builder::new_current_thread() + .enable_all() + .enable_metrics_schedule_latency_histogram() + .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( + Duration::from_millis(50), + 3, + )) + .build() + .unwrap(), + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .enable_metrics_schedule_latency_histogram() + .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( + Duration::from_millis(50), + 3, + )) + .build() + .unwrap(), + ]; + + for rt in rts { + let metrics = rt.metrics(); + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async {}).await.unwrap(); + } + }); + drop(rt); + + let num_workers = metrics.num_workers(); + let num_buckets = metrics.schedule_latency_histogram_num_buckets(); + + assert!(metrics.schedule_latency_histogram_enabled()); + assert_eq!(num_buckets, 3); + + let n = (0..num_workers) + .flat_map(|i| (0..num_buckets).map(move |j| (i, j))) + .map(|(worker, bucket)| metrics.schedule_latency_histogram_bucket_count(worker, bucket)) + .sum(); + assert_eq!(N, n); + } +} + async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> { // We use a blocking channel to synchronize the tasks. let (tx, rx) = mpsc::channel(); From a355889cba25b8e98b649b18dc55d44b1b29ca40 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 25 Mar 2026 11:12:09 -0400 Subject: [PATCH 2/8] metrics: clarify schedule latency docs --- tokio/src/runtime/builder.rs | 11 ++++++++--- tokio/src/runtime/metrics/runtime.rs | 8 ++++---- tokio/src/runtime/metrics/worker.rs | 1 + 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 8519cc2027f..c04a44a9d6e 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1570,9 +1570,9 @@ impl Builder { /// execution and when it is polled. /// /// Task schedule latencies are not instrumented by default as doing - /// so requires calling [`Instant::now()`] twice per task poll, which - /// could add measurable overhead. Use the [`Handle::metrics()`] to - /// access the metrics data. + /// so requires calling [`Instant::now()`] when a task is scheduled + /// and when it is polled, which could add measurable overhead. Use + /// the [`Handle::metrics()`] to access the metrics data. /// /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. /// This has an extremely low memory footprint, but may not provide enough granularity. For @@ -1610,6 +1610,10 @@ impl Builder { /// Configure the histogram for tracking task schedule latencies. /// + /// Tracking of task schedule latencies must be enabled with + /// [`enable_metrics_schedule_latency_histogram()`] for this function + /// to have any effect. + /// /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. /// This has an extremely low memory footprint, but may not provide enough granularity. For /// better granularity with low memory usage, use [`LogHistogram`] instead. @@ -1678,6 +1682,7 @@ impl Builder { /// ``` /// /// [`LogHistogram`]: crate::runtime::LogHistogram + /// [`enable_metrics_schedule_latency_histogram()`]: Builder::enable_metrics_schedule_latency_histogram pub fn metrics_schedule_latency_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { self.metrics_schedule_latency_histogram.histogram_type = configuration.inner; self diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 187d41ed231..e14eddfc68e 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -575,10 +575,10 @@ impl RuntimeMetrics { /// Returns `true` if the runtime is tracking the distribution of task /// schedule latencies. /// - /// Task schedule latencies times are not instrumented by default as doing - /// so requires calling [`Instant::now()`] twice per task poll. The feature - /// is enabled by calling [`enable_metrics_schedule_latency_histogram()`] - /// when building the runtime. + /// Task schedule latencies are not instrumented by default as doing so + /// requires calling [`Instant::now()`] when a task is scheduled and when + /// it is polled. The feature is enabled by calling + /// [`enable_metrics_schedule_latency_histogram()`] when building the runtime. /// /// # Examples /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 7d3f0ce40cd..e65c6e335de 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -67,6 +67,7 @@ pub(crate) struct WorkerMetrics { pub(super) poll_count_histogram: Option, #[cfg(tokio_unstable)] + /// If `Some`, tracks the number of times tasks were scheduled by duration range. pub(super) schedule_latency_histogram: Option, } From eb3cd6e6101ef6553c36732da6bf258cbcccc56c Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 25 Mar 2026 11:13:26 -0400 Subject: [PATCH 3/8] metrics: restrict supported platforms for schedule latency --- tokio/src/runtime/metrics/batch.rs | 14 +++++++---- .../runtime/scheduler/current_thread/mod.rs | 23 ++++++++++++------- .../runtime/scheduler/multi_thread/worker.rs | 23 ++++++++++++------- tokio/src/runtime/task/core.rs | 11 +++++---- tokio/src/runtime/task/mod.rs | 4 ++-- tokio/tests/rt_unstable_metrics.rs | 1 + 6 files changed, 49 insertions(+), 27 deletions(-) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index afab4a3cdab..acace513fa5 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -98,10 +98,13 @@ impl MetricsBatch { poll_started_at: now, }) }); - let schedule_latencies = worker_metrics + // Schedule latencies cannot be tracked if `Instant::now()` is unavailable + let schedule_latencies = maybe_now.and_then(|_| { + worker_metrics .schedule_latency_histogram .as_ref() - .map(HistogramBatch::from_histogram); + .map(HistogramBatch::from_histogram) + }); MetricsBatch { park_count: 0, park_unpark_count: 0, @@ -230,9 +233,10 @@ impl MetricsBatch { } if let Some(task_scheduled_at) = task_scheduled_at { if let Some(schedule_latencies) = &mut self.schedule_latencies { - let now = self.poll_timer.as_ref().map(|p| p.poll_started_at).unwrap_or_else(Instant::now); - let elapsed = duration_as_u64(now - task_scheduled_at); - schedule_latencies.measure(elapsed, 1); + if let Some(now) = self.poll_timer.as_ref().map(|p| p.poll_started_at).or_else(now) { + let elapsed = duration_as_u64(now.saturating_duration_since(task_scheduled_at)); + schedule_latencies.measure(elapsed, 1); + } } } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index e4e1f8d0f9a..98ef69d0b34 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -368,9 +368,9 @@ impl Context { #[cfg(tokio_unstable)] let task_meta = task.task_meta(); - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] core.metrics.start_poll(task.get_scheduled_at()); - #[cfg(not(tokio_unstable))] + #[cfg(not(all(tokio_unstable, target_has_atomic = "64")))] core.metrics.start_poll(None); let (mut c, ()) = self.enter(core, || { @@ -675,12 +675,19 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; - // SAFETY: There are no concurrent writes because tasks cannot be scheduled in - // multiple places concurrently. There are no concurrent reads because this field - // is only read when polling the task, which can only happen after it's scheduled. - #[cfg(tokio_unstable)] - unsafe { - task.set_scheduled_at(std::time::Instant::now()); + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + if self + .shared + .config + .metrics_schedule_latency_histogram + .is_some() + { + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } } context::with_scheduler(|maybe_cx| match maybe_cx { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index c51d7e60aac..e03793ee9f3 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -629,9 +629,9 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] core.stats.start_poll(task.get_scheduled_at()); - #[cfg(not(tokio_unstable))] + #[cfg(not(all(tokio_unstable, target_has_atomic = "64")))] core.stats.start_poll(None); // Make the core available to the runtime context @@ -1274,12 +1274,19 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { - // SAFETY: There are no concurrent writes because tasks cannot be scheduled in - // multiple places concurrently. There are no concurrent reads because this field - // is only read when polling the task, which can only happen after it's scheduled. - #[cfg(tokio_unstable)] - unsafe { - task.set_scheduled_at(std::time::Instant::now()); + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + if self + .shared + .config + .metrics_schedule_latency_histogram + .is_some() + { + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } } with_current(|maybe_cx| { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index c94bcc3e991..04871f372e1 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -195,7 +195,10 @@ pub(crate) struct Header { pub(super) tracing_id: Option, /// The last time this task was scheduled. Used to measure schedule latency. - #[cfg(tokio_unstable)] + /// + /// Only enabled when the target supports 64-bit atomics because the metric + /// that uses this field also requires 64-bit atomics. + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(super) scheduled_at: UnsafeCell>, } @@ -253,7 +256,7 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] scheduled_at: UnsafeCell::new(None), } } @@ -549,13 +552,13 @@ impl Header { /// # Safety /// /// The caller must guarantee exclusive access to this field. - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(super) unsafe fn set_scheduled_at(&self, now: Instant) { self.scheduled_at.with_mut(|ptr| *ptr = Some(now)); } /// Gets the last time this task was scheduled. - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(super) fn get_scheduled_at(&self) -> Option { // Safety: If there are concurrent writes, then that write has violated // the safety requirements on `set_scheduled_at`. diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 2689d6bd1f7..0da8d13d4d6 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -250,7 +250,7 @@ impl Notified { self.0.task_meta() } - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(crate) unsafe fn set_scheduled_at(&self, now: Instant) { unsafe { self.0.header().set_scheduled_at(now); @@ -278,7 +278,7 @@ impl LocalNotified { self.task.task_meta() } - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(crate) fn get_scheduled_at(&self) -> Option { self.task.header().get_scheduled_at() } diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 1086e14f206..05ef9f05fcb 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -814,6 +814,7 @@ fn schedule_latency_counts() { .build() .unwrap(), tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) .enable_all() .enable_metrics_schedule_latency_histogram() .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( From 46db3a285c9153017fab946eca9e367f59ea0b03 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 25 Mar 2026 11:55:56 -0400 Subject: [PATCH 4/8] fix unused import on targets without 64-bit atomics --- tokio/src/runtime/task/core.rs | 2 +- tokio/src/runtime/task/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 04871f372e1..72c35c1c7cb 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -32,7 +32,7 @@ use std::panic::Location; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; -#[cfg(tokio_unstable)] +#[cfg(all(tokio_unstable, target_has_atomic = "64"))] use std::time::Instant; /// The task cell. Contains the components of the task. diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 0da8d13d4d6..a2fe47b378c 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -225,7 +225,7 @@ use crate::runtime::TaskCallback; use std::marker::PhantomData; use std::panic::Location; use std::ptr::NonNull; -#[cfg(tokio_unstable)] +#[cfg(all(tokio_unstable, target_has_atomic = "64"))] use std::time::Instant; use std::{fmt, mem}; From 804b42a38fefff9edc8bea40223e6117980f4dc7 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 25 Mar 2026 13:24:56 -0400 Subject: [PATCH 5/8] make Notified::set_scheduled_at safe --- tokio/src/runtime/scheduler/current_thread/mod.rs | 7 +------ tokio/src/runtime/scheduler/multi_thread/worker.rs | 7 +------ tokio/src/runtime/task/mod.rs | 5 ++++- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 98ef69d0b34..a917ee48790 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -682,12 +682,7 @@ impl Schedule for Arc { .metrics_schedule_latency_histogram .is_some() { - // SAFETY: There are no concurrent writes because tasks cannot be scheduled in - // multiple places concurrently. There are no concurrent reads because this field - // is only read when polling the task, which can only happen after it's scheduled. - unsafe { - task.set_scheduled_at(std::time::Instant::now()); - } + task.set_scheduled_at(std::time::Instant::now()); } context::with_scheduler(|maybe_cx| match maybe_cx { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index e03793ee9f3..8e3bd88db20 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -1281,12 +1281,7 @@ impl Handle { .metrics_schedule_latency_histogram .is_some() { - // SAFETY: There are no concurrent writes because tasks cannot be scheduled in - // multiple places concurrently. There are no concurrent reads because this field - // is only read when polling the task, which can only happen after it's scheduled. - unsafe { - task.set_scheduled_at(std::time::Instant::now()); - } + task.set_scheduled_at(std::time::Instant::now()); } with_current(|maybe_cx| { diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index a2fe47b378c..aa514a0b21f 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -251,7 +251,10 @@ impl Notified { } #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(crate) unsafe fn set_scheduled_at(&self, now: Instant) { + pub(crate) fn set_scheduled_at(&self, now: Instant) { + // SAFETY: There are no concurrent writes because there is only ever one `Notified` + // reference per task. There are no concurrent reads because this field is only read + // when polling the task, which can only happen after it's scheduled. unsafe { self.0.header().set_scheduled_at(now); } From f409f157d0d34a055114a87c6c9dc356b06b6f8f Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Fri, 27 Mar 2026 14:17:45 -0400 Subject: [PATCH 6/8] use u64 for scheduled_at instead of Instant --- tokio/src/runtime/builder.rs | 2 ++ tokio/src/runtime/metrics/batch.rs | 16 +++++++--- tokio/src/runtime/metrics/runtime.rs | 7 +++++ .../runtime/scheduler/current_thread/mod.rs | 27 +++++++++++++---- .../runtime/scheduler/multi_thread/stats.rs | 2 +- .../runtime/scheduler/multi_thread/worker.rs | 29 +++++++++++++++---- tokio/src/runtime/task/core.rs | 29 +++++++++++-------- tokio/src/runtime/task/mod.rs | 14 ++++----- 8 files changed, 92 insertions(+), 34 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index c04a44a9d6e..487bb82b1e8 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1569,6 +1569,8 @@ impl Builder { /// schedule latency is the time between when a task is scheduled for /// execution and when it is polled. /// + /// **This feature is only supported on 64-bit targets.** + /// /// Task schedule latencies are not instrumented by default as doing /// so requires calling [`Instant::now()`] when a task is scheduled /// and when it is polled, which could add measurable overhead. Use diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index acace513fa5..d6247c8120d 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -222,19 +222,27 @@ impl MetricsBatch { cfg_metrics_variant! { stable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option) {} + pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option<(Instant, u64)>) {} }, unstable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { + /// + /// # Arguments + /// + /// `task_scheduled_at` is an optional tuple containing the Instant the scheduler + /// was started and the number of nanoseconds elapsed between that instant and + /// the time the task being polled was scheduled. + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option<(Instant, u64)>) { self.poll_count += 1; if let Some(poll_timer) = &mut self.poll_timer { poll_timer.poll_started_at = Instant::now(); } - if let Some(task_scheduled_at) = task_scheduled_at { + if let Some((runtime_started_at, task_scheduled_at)) = task_scheduled_at { if let Some(schedule_latencies) = &mut self.schedule_latencies { if let Some(now) = self.poll_timer.as_ref().map(|p| p.poll_started_at).or_else(now) { - let elapsed = duration_as_u64(now.saturating_duration_since(task_scheduled_at)); + // `u64::MAX` as nanoseconds is equal to 584 years + let nanos_since_start = now.saturating_duration_since(runtime_started_at).as_nanos() as u64; + let elapsed = nanos_since_start.saturating_sub(task_scheduled_at); schedule_latencies.measure(elapsed, 1); } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index e14eddfc68e..c96b70cc6e6 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1145,7 +1145,14 @@ impl RuntimeMetrics { .load(Relaxed); Duration::from_nanos(nanos) } + } + feature! { + #![all( + tokio_unstable, + target_has_atomic = "64", + target_pointer_width = "64" + )] /// Returns the number of times the given worker polled tasks with a schedule /// latency within the given bucket's range. /// diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index a917ee48790..f28154b7d7d 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -21,6 +21,8 @@ use std::task::Poll::{Pending, Ready}; use std::task::Waker; use std::thread::ThreadId; use std::time::Duration; +#[cfg(all(tokio_unstable, target_pointer_width = "64"))] +use std::time::Instant; use std::{fmt, thread}; /// Executes tasks on the current thread @@ -98,6 +100,12 @@ struct Shared { /// This scheduler only has one worker. worker_metrics: WorkerMetrics, + + /// Startup time of this scheduler. + /// + /// This instant is used as the basis of task `scheduled_at` measurements. + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + started_at: Instant, } /// Thread-local context. @@ -158,6 +166,8 @@ impl CurrentThread { config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics, + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + started_at: Instant::now(), }, driver: driver_handle, blocking_spawner, @@ -368,9 +378,12 @@ impl Context { #[cfg(tokio_unstable)] let task_meta = task.task_meta(); - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - core.metrics.start_poll(task.get_scheduled_at()); - #[cfg(not(all(tokio_unstable, target_has_atomic = "64")))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + core.metrics.start_poll( + task.get_scheduled_at() + .map(|t| (self.handle.shared.started_at, t.get())), + ); + #[cfg(not(all(tokio_unstable, target_pointer_width = "64")))] core.metrics.start_poll(None); let (mut c, ()) = self.enter(core, || { @@ -675,14 +688,18 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] if self .shared .config .metrics_schedule_latency_histogram .is_some() { - task.set_scheduled_at(std::time::Instant::now()); + // SAFETY: `.max(1)` ensures the value can never be 0. + let scheduled_at = unsafe { + NonZeroU64::new_unchecked(self.shared.started_at.elapsed().as_nanos().max(1) as u64) + }; + task.set_scheduled_at(scheduled_at); } context::with_scheduler(|maybe_cx| match maybe_cx { diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 840d88c66f0..ab3aa3400ba 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -113,7 +113,7 @@ impl Stats { } } - pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option<(Instant, u64)>) { self.batch.start_poll(task_scheduled_at); self.tasks_polled_in_batch += 1; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 8e3bd88db20..62831762ee4 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -75,6 +75,8 @@ use std::cell::RefCell; use std::task::Waker; use std::thread; use std::time::Duration; +#[cfg(all(tokio_unstable, target_pointer_width = "64"))] +use std::time::Instant; mod metrics; @@ -193,6 +195,12 @@ pub(crate) struct Shared { pub(super) worker_metrics: Box<[WorkerMetrics]>, + /// Startup time of this scheduler. + /// + /// This instant is used as the basis of task `scheduled_at` measurements. + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + started_at: Instant, + /// Only held to trigger some code on drop. This is used to get internal /// runtime metrics that can be useful when doing performance /// investigations. This does nothing (empty struct, no drop impl) unless @@ -318,6 +326,8 @@ pub(super) fn create( config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: worker_metrics.into_boxed_slice(), + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + started_at: Instant::now(), _counters: Counters, }, driver: driver_handle, @@ -629,9 +639,12 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - core.stats.start_poll(task.get_scheduled_at()); - #[cfg(not(all(tokio_unstable, target_has_atomic = "64")))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + core.stats.start_poll( + task.get_scheduled_at() + .map(|t| (self.worker.handle.shared.started_at, t.get())), + ); + #[cfg(not(all(tokio_unstable, target_pointer_width = "64")))] core.stats.start_poll(None); // Make the core available to the runtime context @@ -1274,14 +1287,20 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] if self .shared .config .metrics_schedule_latency_histogram .is_some() { - task.set_scheduled_at(std::time::Instant::now()); + // SAFETY: `.max(1)` ensures the value can never be 0. + let scheduled_at = unsafe { + std::num::NonZeroU64::new_unchecked( + self.shared.started_at.elapsed().as_nanos().max(1) as u64, + ) + }; + task.set_scheduled_at(scheduled_at); } with_current(|maybe_cx| { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 72c35c1c7cb..9624e20cef7 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -32,8 +32,6 @@ use std::panic::Location; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; -#[cfg(all(tokio_unstable, target_has_atomic = "64"))] -use std::time::Instant; /// The task cell. Contains the components of the task. /// @@ -195,11 +193,12 @@ pub(crate) struct Header { pub(super) tracing_id: Option, /// The last time this task was scheduled. Used to measure schedule latency. + /// Stored as the number of nanoseconds since scheduler startup. /// - /// Only enabled when the target supports 64-bit atomics because the metric - /// that uses this field also requires 64-bit atomics. - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(super) scheduled_at: UnsafeCell>, + /// Only enabled on 64-bit targets because this field extends the size of this + /// struct beyond the size of one cache line on 32-bit targets. + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(super) scheduled_at: UnsafeCell>, } unsafe impl Send for Header {} @@ -256,7 +255,7 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] scheduled_at: UnsafeCell::new(None), } } @@ -549,17 +548,23 @@ impl Header { /// Updates the last time this task was scheduled. Used to calculate /// the time elapsed between task scheduling and polling. /// + /// + /// # Arguments + /// + /// `nanos` is the number of nanoseconds elapsed since the scheduler + /// was started. + /// /// # Safety /// /// The caller must guarantee exclusive access to this field. - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(super) unsafe fn set_scheduled_at(&self, now: Instant) { - self.scheduled_at.with_mut(|ptr| *ptr = Some(now)); + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(super) unsafe fn set_scheduled_at(&self, nanos: NonZeroU64) { + self.scheduled_at.with_mut(|ptr| *ptr = Some(nanos)); } /// Gets the last time this task was scheduled. - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(super) fn get_scheduled_at(&self) -> Option { + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(super) fn get_scheduled_at(&self) -> Option { // Safety: If there are concurrent writes, then that write has violated // the safety requirements on `set_scheduled_at`. unsafe { self.scheduled_at.with(|ptr| *ptr) } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index aa514a0b21f..dfce849561b 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -223,10 +223,10 @@ use crate::util::sharded_list; use crate::runtime::TaskCallback; use std::marker::PhantomData; +#[cfg(all(tokio_unstable, target_pointer_width = "64"))] +use std::num::NonZeroU64; use std::panic::Location; use std::ptr::NonNull; -#[cfg(all(tokio_unstable, target_has_atomic = "64"))] -use std::time::Instant; use std::{fmt, mem}; /// An owned handle to the task, tracked by ref count. @@ -250,13 +250,13 @@ impl Notified { self.0.task_meta() } - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(crate) fn set_scheduled_at(&self, now: Instant) { + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(crate) fn set_scheduled_at(&self, nanos: NonZeroU64) { // SAFETY: There are no concurrent writes because there is only ever one `Notified` // reference per task. There are no concurrent reads because this field is only read // when polling the task, which can only happen after it's scheduled. unsafe { - self.0.header().set_scheduled_at(now); + self.0.header().set_scheduled_at(nanos); } } } @@ -281,8 +281,8 @@ impl LocalNotified { self.task.task_meta() } - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(crate) fn get_scheduled_at(&self) -> Option { + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(crate) fn get_scheduled_at(&self) -> Option { self.task.header().get_scheduled_at() } } From cc3ba46d80197e0040345da2915096c5bf01913e Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Mon, 30 Mar 2026 10:21:43 -0400 Subject: [PATCH 7/8] Only run schedule latency test on 64-bit targets --- tokio/tests/rt_unstable_metrics.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 05ef9f05fcb..a238b78ca02 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -800,6 +800,8 @@ fn io_driver_ready_count() { assert_eq!(metrics.io_driver_ready_count(), 1); } +// Schedule latency tracking is only supported on 64-bit targets +#[cfg(target_pointer_width = "64")] #[test] fn schedule_latency_counts() { const N: u64 = 50; From 5b521164b52149c3470aa23e7ddc866da03cd06f Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Tue, 31 Mar 2026 12:02:46 +0300 Subject: [PATCH 8/8] Add CLAUDE.md --- CLAUDE.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000000..c7b7755897a --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +Never link to any issue or a pull request in any Github repository