Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ pub struct Builder {
enable_io: bool,
nevents: usize,

#[cfg(all(tokio_unstable, feature = "io-uring"))]
uring_setup_sqpoll: Option<u32>,

/// Whether or not to enable the time driver
enable_time: bool,

Expand Down Expand Up @@ -275,6 +278,9 @@ impl Builder {
enable_io: false,
nevents: 1024,

#[cfg(all(tokio_unstable, feature = "io-uring"))]
uring_setup_sqpoll: None,

// Time defaults to "off"
enable_time: false,

Expand Down Expand Up @@ -1598,6 +1604,13 @@ impl Builder {
cfg.timer_flavor = TimerFlavor::Traditional;
let (driver, driver_handle) = driver::Driver::new(cfg)?;

#[cfg(all(tokio_unstable, feature = "io-uring", target_os = "linux"))]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cfg here doesn’t match the conditions under which the io_uring pieces (including setup_uring_sqpoll) are actually compiled (cfg_io_uring! also requires feature = "rt" and feature = "fs"). As written, building with tokio_unstable + io-uring on Linux but without fs can try to compile this call path and fail due to missing symbols.

Severity: high

Other Locations
  • tokio/src/runtime/builder.rs:1819

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The Augment AI reviewer is correct! All io_uring related functionalities should be gated behind the same cfg!. Otherwise some methods may be visible in some config but not usable due to others being missing in this config.

if let Some(idle) = self.uring_setup_sqpoll {
if let Some(io) = driver_handle.io.as_ref() {
io.setup_uring_sqpoll(idle);
}
}

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Expand Down Expand Up @@ -1742,6 +1755,28 @@ cfg_io_uring! {
self.enable_io = true;
self
}

/// Enables SQPOLL for the io_uring driver and sets the idle timeout.
///
/// When SQPOLL is enabled, a kernel thread is created to poll the
/// submission queue. This can reduce syscall overhead.
///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_multi_thread()
/// .enable_io_uring()
/// .uring_setup_sqpoll(2000)
/// .build()
/// .unwrap();
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
pub fn uring_setup_sqpoll(&mut self, idle: u32) -> &mut Self {
self.uring_setup_sqpoll = Some(idle);
self
}
}
}

Expand Down Expand Up @@ -1781,6 +1816,13 @@ cfg_rt_multi_thread! {

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

#[cfg(all(tokio_unstable, feature = "io-uring", target_os = "linux"))]
if let Some(idle) = self.uring_setup_sqpoll {
if let Some(io) = driver_handle.io.as_ref() {
io.setup_uring_sqpoll(idle);
}
}
Comment on lines +1819 to +1824
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of code is identical to the one in build_current_thread_runtime_components (lines 1607-1612). To avoid duplication and improve maintainability, consider extracting this logic into a private helper function.

For example, you could add a helper method like this:

#[cfg(all(tokio_unstable, feature = "io-uring", target_os = "linux"))]
fn setup_uring_sqpoll(&self, driver_handle: &driver::Handle) {
    if let Some(idle) = self.uring_setup_sqpoll {
        if let Some(io) = driver_handle.io.as_ref() {
            io.setup_uring_sqpoll(idle);
        }
    }
}

And then call it from both build_current_thread_runtime_components and build_threaded_runtime.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The same snippet of code is used in two places. It would be good to extract it to a helper function and reuse it. This would prevent any eventual regressions in the future if a bug is fixed in only one of the duplicated snippets.


// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub(crate) struct Cfg {

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
let (io_stack, io_handle, signal_handle) = create_io_stack(&cfg)?;

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

Expand Down Expand Up @@ -146,12 +146,12 @@ cfg_io_driver! {
Disabled(UnparkThread),
}

fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
#[cfg(loom)]
assert!(!enabled);
assert!(!cfg.enable_io);

let ret = if enabled {
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
let ret = if cfg.enable_io {
let (io_driver, io_handle) = crate::runtime::io::Driver::new(cfg.nevents)?;

let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);
Expand Down Expand Up @@ -212,7 +212,7 @@ cfg_not_io_driver! {
#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);

fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(_cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((IoStack(park_thread), unpark_thread, Default::default()))
Expand Down
21 changes: 20 additions & 1 deletion tokio/src/runtime/io/driver/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ const DEFAULT_RING_SIZE: u32 = 256;
pub(crate) struct UringContext {
pub(crate) uring: Option<io_uring::IoUring>,
pub(crate) ops: slab::Slab<Lifecycle>,
pub(crate) sqpoll_idle: Option<u32>,
}

impl UringContext {
pub(crate) fn new() -> Self {
Self {
ops: Slab::new(),
uring: None,
sqpoll_idle: None,
}
}

Expand All @@ -44,7 +46,11 @@ impl UringContext {
return Ok(false);
}

let uring = IoUring::new(DEFAULT_RING_SIZE)?;
let uring = if let Some(idle) = self.sqpoll_idle {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setup_sqpoll can fail with EPERM on systems that allow regular io_uring but don’t permit SQPOLL; since check_and_init treats EPERM as “io_uring unavailable”, this would disable io_uring entirely when SQPOLL is requested. Consider distinguishing SQPOLL permission errors from “io_uring blocked” errors so non-SQPOLL io_uring can still be used.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:valid-but-wont-fix; category:bug; feedback: The Augment AI reviewer is нот correct! If the SQPOLL setup fails then the error should be returned to the caller. Then it can try to setup io_uring without SQPOLL. The Tokio library should not try to do this automatically without letting know the developer.

IoUring::builder().setup_sqpoll(idle).build(DEFAULT_RING_SIZE)?
} else {
IoUring::new(DEFAULT_RING_SIZE)?
};

match uring.submitter().register_probe(probe) {
Ok(_) => {}
Expand Down Expand Up @@ -97,6 +103,14 @@ impl UringContext {
}

pub(crate) fn submit(&mut self) -> io::Result<()> {
if self.sqpoll_idle.is_some() {
let mut sq = self.ring_mut().submission();
sq.sync();
if !sq.need_wakeup() {
return Ok(());
}
}

loop {
// Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
match self.ring().submit() {
Expand Down Expand Up @@ -164,6 +178,11 @@ impl Handle {
&self.uring_context
}

pub(crate) fn setup_uring_sqpoll(&self, idle: u32) {
let mut guard = self.get_uring().lock();
guard.sqpoll_idle = Some(idle);
}

/// Check if the io_uring context is initialized. If not, it will try to initialize it.
/// Then, check if the provided opcode is supported.
///
Expand Down
83 changes: 83 additions & 0 deletions tokio/tests/fs_uring_sqpoll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#![cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]

use std::io::{Read, Seek, SeekFrom};
use tempfile::NamedTempFile;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::runtime::Builder;

#[test]
fn test_sqpoll_current_thread() {
let rt = Builder::new_current_thread()
.enable_all()
.uring_setup_sqpoll(1000)
.build()
.unwrap();

rt.block_on(async {
let mut temp = NamedTempFile::new().unwrap();
let path = temp.path().to_path_buf();

let mut file = tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.await
.unwrap();

file.write_all(b"hello").await.unwrap();
file.flush().await.unwrap();

// Check if data was actually written to the underlying file
let mut buf = vec![0; 5];
temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap();
temp.as_file_mut().read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");

file.seek(std::io::SeekFrom::Start(0)).await.unwrap();
let mut buf = vec![0; 5];
file.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
});
}

#[test]
fn test_sqpoll_multi_thread() {
let rt = Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.uring_setup_sqpoll(1000)
.build()
.unwrap();

rt.block_on(async {
let mut temp = NamedTempFile::new().unwrap();
let path = temp.path().to_path_buf();

let mut file = tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.await
.unwrap();

file.write_all(b"world").await.unwrap();
file.flush().await.unwrap();

// Check if data was actually written to the underlying file
let mut buf = vec![0; 5];
temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap();
temp.as_file_mut().read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"world");

file.seek(std::io::SeekFrom::Start(0)).await.unwrap();
let mut buf = vec![0; 5];
file.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"world");
});
}