diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index aea4de7503e..73f7e2ebacf 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -58,6 +58,16 @@ pub struct Builder {
     enable_io: bool,
     nevents: usize,
 
+    /// SQPOLL idle timeout in milliseconds for io_uring; `None` leaves SQPOLL off.
+    #[cfg(all(
+        tokio_unstable,
+        feature = "io-uring",
+        feature = "rt",
+        feature = "fs",
+        target_os = "linux"
+    ))]
+    uring_setup_sqpoll: Option<u32>,
+
     /// Whether or not to enable the time driver
     enable_time: bool,
 
@@ -275,6 +285,15 @@ impl Builder {
             enable_io: false,
             nevents: 1024,
 
+            #[cfg(all(
+                tokio_unstable,
+                feature = "io-uring",
+                feature = "rt",
+                feature = "fs",
+                target_os = "linux"
+            ))]
+            uring_setup_sqpoll: None,
+
             // Time defaults to "off"
             enable_time: false,
 
@@ -1598,6 +1617,19 @@ impl Builder {
         cfg.timer_flavor = TimerFlavor::Traditional;
         let (driver, driver_handle) = driver::Driver::new(cfg)?;
 
+        #[cfg(all(
+            tokio_unstable,
+            feature = "io-uring",
+            feature = "rt",
+            feature = "fs",
+            target_os = "linux"
+        ))]
+        if let Some(idle) = self.uring_setup_sqpoll {
+            if let Some(io) = driver_handle.io.as_ref() {
+                io.setup_uring_sqpoll(idle);
+            }
+        }
+
         // Blocking pool
         let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
         let blocking_spawner = blocking_pool.spawner().clone();
@@ -1742,6 +1774,34 @@ cfg_io_uring! {
             self.enable_io = true;
             self
         }
+
+        /// Enables SQPOLL for the io_uring driver and sets the idle timeout.
+        ///
+        /// When SQPOLL is enabled, a kernel thread is created to poll the
+        /// submission queue. This can reduce syscall overhead.
+        ///
+        /// `idle` is the time, in milliseconds, that the kernel thread keeps
+        /// polling without new submissions before it goes to sleep.
+        ///
+        /// The value is forwarded to the I/O driver when the runtime is built, so
+        /// it has no effect unless an I/O driver is present (e.g. `enable_io_uring`).
+        ///
+        /// # Examples
+        ///
+        /// ```
+        /// use tokio::runtime;
+        ///
+        /// let rt = runtime::Builder::new_multi_thread()
+        ///     .enable_io_uring()
+        ///     .uring_setup_sqpoll(2000)
+        ///     .build()
+        ///     .unwrap();
+        /// ```
+        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
+        pub fn uring_setup_sqpoll(&mut self, idle: u32) -> &mut Self {
+            self.uring_setup_sqpoll = Some(idle);
+            self
+        }
     }
 }
 
@@ -1781,6 +1841,19 @@ cfg_rt_multi_thread! {
 
             let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
 
+            #[cfg(all(
+                tokio_unstable,
+                feature = "io-uring",
+                feature = "rt",
+                feature = "fs",
+                target_os = "linux"
+            ))]
+            if let Some(idle) = self.uring_setup_sqpoll {
+                if let Some(io) = driver_handle.io.as_ref() {
+                    io.setup_uring_sqpoll(idle);
+                }
+            }
+
             // Create the blocking pool
             let blocking_pool =
                 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs
index 92b2350db9d..6256d783bce 100644
--- a/tokio/src/runtime/driver.rs
+++ b/tokio/src/runtime/driver.rs
@@ -45,7 +45,7 @@ pub(crate) struct Cfg {
 
 impl Driver {
     pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
-        let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
+        let (io_stack, io_handle, signal_handle) = create_io_stack(&cfg)?;
 
         let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
 
@@ -146,12 +146,12 @@ cfg_io_driver! {
         Disabled(UnparkThread),
     }
 
-    fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
+    fn create_io_stack(cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
         #[cfg(loom)]
-        assert!(!enabled);
+        assert!(!cfg.enable_io);
 
-        let ret = if enabled {
-            let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
+        let ret = if cfg.enable_io {
+            let (io_driver, io_handle) = crate::runtime::io::Driver::new(cfg.nevents)?;
 
             let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
             let process_driver = create_process_driver(signal_driver);
@@ -212,7 +212,7 @@ cfg_not_io_driver! {
     #[derive(Debug)]
     pub(crate) struct IoStack(ParkThread);
 
-    fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
+    fn create_io_stack(_cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
         let park_thread = ParkThread::new();
         let unpark_thread = park_thread.unpark();
         Ok((IoStack(park_thread), unpark_thread, Default::default()))
diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs
index 89c97826bdf..3b7b6381a85 100644
--- a/tokio/src/runtime/io/driver/uring.rs
+++ b/tokio/src/runtime/io/driver/uring.rs
@@ -15,6 +15,7 @@ const DEFAULT_RING_SIZE: u32 = 256;
 pub(crate) struct UringContext {
     pub(crate) uring: Option<io_uring::IoUring>,
     pub(crate) ops: slab::Slab<Lifecycle>,
+    pub(crate) sqpoll_idle: Option<u32>,
 }
 
 impl UringContext {
@@ -22,6 +23,7 @@ impl UringContext {
         Self {
             ops: Slab::new(),
             uring: None,
+            sqpoll_idle: None,
         }
     }
 
@@ -44,7 +46,11 @@ impl UringContext {
             return Ok(false);
         }
 
-        let uring = IoUring::new(DEFAULT_RING_SIZE)?;
+        let uring = if let Some(idle) = self.sqpoll_idle {
+            IoUring::builder().setup_sqpoll(idle).build(DEFAULT_RING_SIZE)?
+        } else {
+            IoUring::new(DEFAULT_RING_SIZE)?
+        };
 
         match uring.submitter().register_probe(probe) {
             Ok(_) => {}
@@ -97,6 +103,14 @@ impl UringContext {
     }
 
     pub(crate) fn submit(&mut self) -> io::Result<()> {
+        if self.sqpoll_idle.is_some() {
+            let mut sq = self.ring_mut().submission();
+            sq.sync();
+            if !sq.need_wakeup() {
+                return Ok(());
+            }
+        }
+
         loop {
             // Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
             match self.ring().submit() {
@@ -164,6 +178,15 @@ impl Handle {
         &self.uring_context
     }
 
+    /// Records the SQPOLL idle timeout to use when the ring is created.
+    ///
+    /// The value is read when the `IoUring` is built, so it must be set
+    /// before the ring is initialized.
+    pub(crate) fn setup_uring_sqpoll(&self, idle: u32) {
+        let mut guard = self.get_uring().lock();
+        guard.sqpoll_idle = Some(idle);
+    }
+
     /// Check if the io_uring context is initialized. If not, it will try to initialize it.
     /// Then, check if the provided opcode is supported.
     ///
diff --git a/tokio/tests/fs_uring_sqpoll.rs b/tokio/tests/fs_uring_sqpoll.rs
new file mode 100644
index 00000000000..5bdadfb9150
--- /dev/null
+++ b/tokio/tests/fs_uring_sqpoll.rs
@@ -0,0 +1,63 @@
+#![cfg(all(
+    tokio_unstable,
+    feature = "io-uring",
+    feature = "rt",
+    feature = "fs",
+    target_os = "linux"
+))]
+
+use std::io::{Read, Seek, SeekFrom};
+use tempfile::NamedTempFile;
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
+use tokio::runtime::Builder;
+
+/// Writes `payload` through `tokio::fs`, then checks it both through the temp
+/// file's own std handle and by reading it back through `tokio::fs`.
+async fn write_then_read_back(payload: &[u8]) {
+    let mut temp = NamedTempFile::new().unwrap();
+    let path = temp.path().to_path_buf();
+
+    let mut file = tokio::fs::OpenOptions::new()
+        .read(true)
+        .write(true)
+        .open(&path)
+        .await
+        .unwrap();
+
+    file.write_all(payload).await.unwrap();
+    file.flush().await.unwrap();
+
+    // Check if data was actually written to the underlying file
+    let mut buf = vec![0; payload.len()];
+    temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap();
+    temp.as_file_mut().read_exact(&mut buf).unwrap();
+    assert_eq!(buf, payload);
+
+    file.seek(SeekFrom::Start(0)).await.unwrap();
+    let mut buf = vec![0; payload.len()];
+    file.read_exact(&mut buf).await.unwrap();
+    assert_eq!(buf, payload);
+}
+
+#[test]
+fn test_sqpoll_current_thread() {
+    let rt = Builder::new_current_thread()
+        .enable_all()
+        .uring_setup_sqpoll(1000)
+        .build()
+        .unwrap();
+
+    rt.block_on(write_then_read_back(b"hello"));
+}
+
+#[test]
+fn test_sqpoll_multi_thread() {
+    let rt = Builder::new_multi_thread()
+        .worker_threads(2)
+        .enable_all()
+        .uring_setup_sqpoll(1000)
+        .build()
+        .unwrap();
+
+    rt.block_on(write_then_read_back(b"world"));
+}