From 096b3a4a10095bdb5661c6e9faad4603fa228079 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Thu, 5 Mar 2026 12:09:34 -0500 Subject: [PATCH] metrics: add task schedule latency metric This new metric tracks the amount of time between when a task is scheduled and when it is polled, also known as queue delay. This duration is recorded in a histogram, just like the poll time metric. This metric is useful for implementing queue management algorithms in systems using tokio. For example, it could be used to implement a generic http load shedder using the CoDel algorithm. --- tokio/src/runtime/builder.rs | 137 +++++++++++++ tokio/src/runtime/config.rs | 3 + tokio/src/runtime/metrics/batch.rs | 24 ++- tokio/src/runtime/metrics/runtime.rs | 184 ++++++++++++++++++ tokio/src/runtime/metrics/worker.rs | 7 + .../runtime/scheduler/current_thread/mod.rs | 51 +++-- .../runtime/scheduler/multi_thread/stats.rs | 4 +- .../runtime/scheduler/multi_thread/worker.rs | 13 +- tokio/src/runtime/task/core.rs | 27 +++ tokio/src/runtime/task/mod.rs | 14 ++ tokio/tests/rt_unstable_metrics.rs | 47 +++++ 11 files changed, 488 insertions(+), 23 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index aea4de7503e..8519cc2027f 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -133,6 +133,10 @@ pub struct Builder { /// Configures the task poll count histogram pub(super) metrics_poll_count_histogram: HistogramBuilder, + pub(super) metrics_schedule_latency_histogram_enable: bool, + + pub(super) metrics_schedule_latency_histogram: HistogramBuilder, + #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, @@ -323,6 +327,10 @@ impl Builder { metrics_poll_count_histogram: HistogramBuilder::default(), + metrics_schedule_latency_histogram_enable: false, + + metrics_schedule_latency_histogram: HistogramBuilder::default(), + disable_lifo_slot: false, timer_flavor: TimerFlavor::Traditional, @@ -1556,6 +1564,124 @@ impl Builder { 
self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets); self } + + /// Enables tracking the distribution of task schedule latencies. Task + /// schedule latency is the time between when a task is scheduled for + /// execution and when it is polled. + /// + /// Task schedule latencies are not instrumented by default as doing + /// so requires calling [`Instant::now()`] twice per task poll, which + /// could add measurable overhead. Use the [`Handle::metrics()`] to + /// access the metrics data. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`metrics_schedule_latency_histogram_configuration()`] + /// to select [`LogHistogram`] instead. + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap(); + /// # // Test default values here + /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } + /// # let m = rt.handle().metrics(); + /// # assert_eq!(m.schedule_latency_histogram_num_buckets(), 10); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(0), us(0)..us(100)); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(1), us(100)..us(200)); + /// # } + /// ``` + /// + /// [`Handle::metrics()`]: crate::runtime::Handle::metrics + /// [`Instant::now()`]: std::time::Instant::now + /// [`LogHistogram`]: crate::runtime::LogHistogram + /// [`metrics_schedule_latency_histogram_configuration()`]: Builder::metrics_schedule_latency_histogram_configuration + pub fn enable_metrics_schedule_latency_histogram(&mut self) -> &mut Self { + self.metrics_schedule_latency_histogram_enable = true; + self + } + + /// Configure the 
histogram for tracking task schedule latencies. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`LogHistogram`] instead. + /// + /// # Examples + /// Configure a [`LogHistogram`] with [default configuration]: + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::default()) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a linear histogram with 100 buckets, each 10μs wide + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use std::time::Duration; + /// use tokio::runtime::HistogramConfiguration; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::linear(Duration::from_micros(10), 100) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a [`LogHistogram`] with the following settings: + /// - Measure times from 100ns to 120s + /// - Max error of 0.1 + /// - No more than 1024 buckets + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use std::time::Duration; + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::builder() + /// 
.max_value(Duration::from_secs(120)) + /// .min_value(Duration::from_nanos(100)) + /// .max_error(0.1) + /// .max_buckets(1024) + /// .expect("configuration uses 488 buckets") + /// ) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// [`LogHistogram`]: crate::runtime::LogHistogram + pub fn metrics_schedule_latency_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { + self.metrics_schedule_latency_histogram.histogram_type = configuration.inner; + self + } } fn build_current_thread_runtime(&mut self) -> io::Result { @@ -1631,6 +1757,8 @@ impl Builder { disable_lifo_slot: self.disable_lifo_slot, seed_generator: seed_generator_1, metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), + metrics_schedule_latency_histogram: self + .metrics_schedule_latency_histogram_builder(), }, local_tid, ); @@ -1649,6 +1777,14 @@ impl Builder { None } } + + fn metrics_schedule_latency_histogram_builder(&self) -> Option { + if self.metrics_schedule_latency_histogram_enable { + Some(self.metrics_schedule_latency_histogram.clone()) + } else { + None + } + } } cfg_io_driver! { @@ -1812,6 +1948,7 @@ cfg_rt_multi_thread! { disable_lifo_slot: self.disable_lifo_slot, seed_generator: seed_generator_1, metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), + metrics_schedule_latency_histogram: self.metrics_schedule_latency_histogram_builder(), }, self.timer_flavor, ); diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index b79df96e1e2..549af9975ab 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -48,6 +48,9 @@ pub(crate) struct Config { /// How to build poll time histograms pub(crate) metrics_poll_count_histogram: Option, + /// How to build schedule latency histograms + pub(crate) metrics_schedule_latency_histogram: Option, + #[cfg(tokio_unstable)] /// How to respond to unhandled task panics. 
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index fe2f4a9da4e..afab4a3cdab 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -53,6 +53,9 @@ pub(crate) struct MetricsBatch { #[cfg(tokio_unstable)] /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, + + #[cfg(tokio_unstable)] + schedule_latencies: Option, } cfg_unstable_metrics! { @@ -95,6 +98,10 @@ impl MetricsBatch { poll_started_at: now, }) }); + let schedule_latencies = worker_metrics + .schedule_latency_histogram + .as_ref() + .map(HistogramBatch::from_histogram); MetricsBatch { park_count: 0, park_unpark_count: 0, @@ -108,6 +115,7 @@ impl MetricsBatch { busy_duration_total: 0, processing_scheduled_tasks_started_at: maybe_now, poll_timer, + schedule_latencies, } } } @@ -155,6 +163,11 @@ impl MetricsBatch { let dst = worker.poll_count_histogram.as_ref().unwrap(); poll_timer.poll_counts.submit(dst); } + + if let Some(schedule_latencies) = &self.schedule_latencies { + let dst = worker.schedule_latency_histogram.as_ref().unwrap(); + schedule_latencies.submit(dst); + } } } } @@ -206,15 +219,22 @@ impl MetricsBatch { cfg_metrics_variant! 
{ stable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self) {} + pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option) {} }, unstable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self) { + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { self.poll_count += 1; if let Some(poll_timer) = &mut self.poll_timer { poll_timer.poll_started_at = Instant::now(); } + if let Some(task_scheduled_at) = task_scheduled_at { + if let Some(schedule_latencies) = &mut self.schedule_latencies { + let now = self.poll_timer.as_ref().map(|p| p.poll_started_at).unwrap_or_else(Instant::now); + let elapsed = duration_as_u64(now - task_scheduled_at); + schedule_latencies.measure(elapsed, 1); + } + } } } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 8aeb608bd02..187d41ed231 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -571,6 +571,124 @@ impl RuntimeMetrics { pub fn blocking_queue_depth(&self) -> usize { self.handle.inner.blocking_queue_depth() } + + /// Returns `true` if the runtime is tracking the distribution of task + /// schedule latencies. + /// + /// Task schedule latencies are not instrumented by default as doing + /// so requires calling [`Instant::now()`] twice per task poll. The feature + /// is enabled by calling [`enable_metrics_schedule_latency_histogram()`] + /// when building the runtime.
+ /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let enabled = metrics.schedule_latency_histogram_enabled(); + /// + /// println!("Tracking task schedule latency distribution: {:?}", enabled); + /// }); + /// } + /// ``` + /// + /// [`enable_metrics_schedule_latency_histogram()`]: crate::runtime::Builder::enable_metrics_schedule_latency_histogram + /// [`Instant::now()`]: std::time::Instant::now + pub fn schedule_latency_histogram_enabled(&self) -> bool { + self.handle.inner.worker_metrics(0).schedule_latency_histogram.is_some() + } + + /// Returns the number of histogram buckets tracking the distribution of + /// task schedule latencies. + /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// println!("Histogram buckets: {:?}", buckets); + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + pub fn schedule_latency_histogram_num_buckets(&self) -> usize { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| histogram.num_buckets()) + .unwrap_or_default() + } + + /// Returns the range of task schedule latencies tracked by the given bucket. 
+ /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Panics + /// + /// The method panics if `bucket` represents an invalid bucket index, i.e. + /// is greater than or equal to `schedule_latency_histogram_num_buckets()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// println!("Histogram bucket {} range: {:?}", i, range); + /// } + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + pub fn schedule_latency_histogram_bucket_range(&self, bucket: usize) -> Range { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| { + let range = histogram.bucket_range(bucket); + std::ops::Range { + start: Duration::from_nanos(range.start), + end: Duration::from_nanos(range.end), + } + }) + .unwrap_or_default() + } } feature! { @@ -1027,6 +1145,72 @@ impl RuntimeMetrics { .load(Relaxed); Duration::from_nanos(nanos) } + + /// Returns the number of times the given worker polled tasks with a schedule + /// latency within the given bucket's range. + /// + /// Each worker maintains its own histogram and the counts for each bucket + /// starts at zero when the runtime is created. Each time the worker polls a + /// task, it tracks the time elapsed between when the task was scheduled and + /// when it was polled and increments the associated bucket by 1. 
+ /// + /// Each bucket is a monotonically increasing counter. It is never + /// decremented or reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// `bucket` is the index of the bucket being queried. The bucket is scoped + /// to the worker. The range represented by the bucket can be queried by + /// calling [`schedule_latency_histogram_bucket_range()`]. Each worker maintains + /// identical bucket ranges. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()` or if `bucket` represents an + /// invalid bucket. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for worker in 0..metrics.num_workers() { + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// let count = metrics.schedule_latency_histogram_bucket_count(worker, i); + /// println!("{} tasks encountered a scheduling latency between {}us and {}us", count, range.start.as_micros(), range.end.as_micros()); + /// } + /// } + /// }); + /// } + /// ``` + /// + /// [`schedule_latency_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::schedule_latency_histogram_bucket_range + #[track_caller] + pub fn schedule_latency_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .schedule_latency_histogram + 
.as_ref() + .map(|histogram| histogram.get(bucket)) + .unwrap_or_default() + } } feature! { diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 30926b2a6c2..7d3f0ce40cd 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -65,6 +65,9 @@ pub(crate) struct WorkerMetrics { #[cfg(tokio_unstable)] /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, + + #[cfg(tokio_unstable)] + pub(super) schedule_latency_histogram: Option, } impl WorkerMetrics { @@ -93,6 +96,10 @@ impl WorkerMetrics { .metrics_poll_count_histogram .as_ref() .map(|histogram_builder| histogram_builder.build()); + worker_metrics.schedule_latency_histogram = config + .metrics_schedule_latency_histogram + .as_ref() + .map(|histogram_builder| histogram_builder.build()); worker_metrics } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 68ab17f1402..e4e1f8d0f9a 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -3,7 +3,8 @@ use crate::loom::sync::Arc; use crate::runtime::driver::{self, Driver}; use crate::runtime::scheduler::{self, Defer, Inject}; use crate::runtime::task::{ - self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks, + self, JoinHandle, LocalNotified, OwnedTasks, Schedule, SpawnLocation, Task, + TaskHarnessScheduleHooks, }; use crate::runtime::{ blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics, @@ -363,11 +364,28 @@ fn wake_deferred_tasks_and_free(context: &Context) { impl Context { /// Execute the closure with the given scheduler core stored in the /// thread-local context. 
- fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { - core.metrics.start_poll(); - let mut ret = self.enter(core, || crate::task::coop::budget(f)); - ret.0.metrics.end_poll(); - ret + fn run_task(&self, task: LocalNotified>, mut core: Box) -> Box { + #[cfg(tokio_unstable)] + let task_meta = task.task_meta(); + + #[cfg(tokio_unstable)] + core.metrics.start_poll(task.get_scheduled_at()); + #[cfg(not(tokio_unstable))] + core.metrics.start_poll(None); + + let (mut c, ()) = self.enter(core, || { + crate::task::coop::budget(|| { + #[cfg(tokio_unstable)] + self.handle.task_hooks.poll_start_callback(&task_meta); + + task.run(); + + #[cfg(tokio_unstable)] + self.handle.task_hooks.poll_stop_callback(&task_meta); + }) + }); + c.metrics.end_poll(); + c } /// Blocks the current thread until an event is received by the driver, @@ -657,6 +675,14 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. 
+ #[cfg(tokio_unstable)] + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } + context::with_scheduler(|maybe_cx| match maybe_cx { Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { let mut core = cx.core.borrow_mut(); @@ -806,18 +832,7 @@ impl CoreGuard<'_> { let task = context.handle.shared.owned.assert_owner(task); - #[cfg(tokio_unstable)] - let task_meta = task.task_meta(); - - let (c, ()) = context.run_task(core, || { - #[cfg(tokio_unstable)] - context.handle.task_hooks.poll_start_callback(&task_meta); - - task.run(); - - #[cfg(tokio_unstable)] - context.handle.task_hooks.poll_stop_callback(&task_meta); - }); + let c = context.run_task(task, core); core = c; } diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index c59d4373ab8..840d88c66f0 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -113,8 +113,8 @@ impl Stats { } } - pub(crate) fn start_poll(&mut self) { - self.batch.start_poll(); + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { + self.batch.start_poll(task_scheduled_at); self.tasks_polled_in_batch += 1; } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 72bdc2bd31c..c51d7e60aac 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -629,7 +629,10 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. 
- core.stats.start_poll(); + #[cfg(tokio_unstable)] + core.stats.start_poll(task.get_scheduled_at()); + #[cfg(not(tokio_unstable))] + core.stats.start_poll(None); // Make the core available to the runtime context *self.core.borrow_mut() = Some(core); @@ -1271,6 +1274,14 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. + #[cfg(tokio_unstable)] + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } + with_current(|maybe_cx| { if let Some(cx) = maybe_cx { // Make sure the task is part of the **current** scheduler. diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index aa3f61a2217..c94bcc3e991 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -32,6 +32,8 @@ use std::panic::Location; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; +#[cfg(tokio_unstable)] +use std::time::Instant; /// The task cell. Contains the components of the task. /// @@ -191,6 +193,10 @@ pub(crate) struct Header { /// The tracing ID for this instrumented task. #[cfg(all(tokio_unstable, feature = "tracing"))] pub(super) tracing_id: Option, + + /// The last time this task was scheduled. Used to measure schedule latency. + #[cfg(tokio_unstable)] + pub(super) scheduled_at: UnsafeCell>, } unsafe impl Send for Header {} @@ -247,6 +253,8 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, + #[cfg(tokio_unstable)] + scheduled_at: UnsafeCell::new(None), } } @@ -534,6 +542,25 @@ impl Header { pub(super) unsafe fn get_tracing_id(me: &NonNull
) -> Option<&tracing::Id> { me.as_ref().tracing_id.as_ref() } + + /// Updates the last time this task was scheduled. Used to calculate + /// the time elapsed between task scheduling and polling. + /// + /// # Safety + /// + /// The caller must guarantee exclusive access to this field. + #[cfg(tokio_unstable)] + pub(super) unsafe fn set_scheduled_at(&self, now: Instant) { + self.scheduled_at.with_mut(|ptr| *ptr = Some(now)); + } + + /// Gets the last time this task was scheduled. + #[cfg(tokio_unstable)] + pub(super) fn get_scheduled_at(&self) -> Option { + // Safety: If there are concurrent writes, then that write has violated + // the safety requirements on `set_scheduled_at`. + unsafe { self.scheduled_at.with(|ptr| *ptr) } + } } impl Trailer { diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 53c477d52de..2689d6bd1f7 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -225,6 +225,8 @@ use crate::runtime::TaskCallback; use std::marker::PhantomData; use std::panic::Location; use std::ptr::NonNull; +#[cfg(tokio_unstable)] +use std::time::Instant; use std::{fmt, mem}; /// An owned handle to the task, tracked by ref count. @@ -247,6 +249,13 @@ impl Notified { pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> { self.0.task_meta() } + + #[cfg(tokio_unstable)] + pub(crate) unsafe fn set_scheduled_at(&self, now: Instant) { + unsafe { + self.0.header().set_scheduled_at(now); + } + } } // safety: This type cannot be used to touch the task without first verifying @@ -268,6 +277,11 @@ impl LocalNotified { pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> { self.task.task_meta() } + + #[cfg(tokio_unstable)] + pub(crate) fn get_scheduled_at(&self) -> Option { + self.task.header().get_scheduled_at() + } } /// A task that is not owned by any `OwnedTasks`. Used for blocking tasks. 
diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index b6de3159134..1086e14f206 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -800,6 +800,53 @@ fn io_driver_ready_count() { assert_eq!(metrics.io_driver_ready_count(), 1); } +#[test] +fn schedule_latency_counts() { + const N: u64 = 50; + let rts = [ + tokio::runtime::Builder::new_current_thread() + .enable_all() + .enable_metrics_schedule_latency_histogram() + .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( + Duration::from_millis(50), + 3, + )) + .build() + .unwrap(), + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .enable_metrics_schedule_latency_histogram() + .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( + Duration::from_millis(50), + 3, + )) + .build() + .unwrap(), + ]; + + for rt in rts { + let metrics = rt.metrics(); + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async {}).await.unwrap(); + } + }); + drop(rt); + + let num_workers = metrics.num_workers(); + let num_buckets = metrics.schedule_latency_histogram_num_buckets(); + + assert!(metrics.schedule_latency_histogram_enabled()); + assert_eq!(num_buckets, 3); + + let n = (0..num_workers) + .flat_map(|i| (0..num_buckets).map(move |j| (i, j))) + .map(|(worker, bucket)| metrics.schedule_latency_histogram_bucket_count(worker, bucket)) + .sum(); + assert_eq!(N, n); + } +} + async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> { // We use a blocking channel to synchronize the tasks. let (tx, rx) = mpsc::channel();